БайтМеханик
Блог программиста. $ sed -e s/программиста/работяги/g

Буферизованные и не буферизованные каналы (buffered / unbuffered channels) в Go (Golang)

Время на чтение: 30 мин. Категория: Go (Golang)

Каналы служат для коммуникации между горутинами. Эта концепция противиположна концепции разделяемой памяти (shared memory).

В концепции разделяемой памяти мы используем одну и ту же переменную, чтобы передать данные между потоками. Таким образом возникает состояние гонки, когда чтение и запись в одну и ту же область памяти (где хранится наша общая переменная) может производиться одновременно и это приведет к неопределенному / неожиданному значению в этой переменной. Чтобы решить эту проблему, обычно используются мьютексы, синхронизации и прочее.

Каналы предлагают альтернативный подход: вместо того, чтобы писать и читать в общую переменную, мы посылаем данные в канал в одной горутине (напомню, что main() тоже запускается в своей горутине), а другая горутина читает из этого канала.

Каналы делятся на два типа: буферизированные и небуферизированные (это слитно пишется по-русски, не?).

Небуферизированные каналы (unbuffered channels)

Небуферизированный канал создается следующим образом:

ch := make(chan int)

Особенность такого канала в том, что он может работать как симафор: посылающая сторона остановит выполнение до тех пор, пока кто-то не примет данные из канала. Например:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
    ch := make(chan int)

    // WaitGroup нужна только, чтобы дождаться завершения всех горутин (вывода на STDOUT)
    var wg sync.WaitGroup
    wg.Add(1)

    // Хитрость программы в том, что мы СНАЧАЛА запускаем горутину, которая будет
    // ждать значение из канала, а ПОТОМ записываем значение в этот канал
    go func() {
        fmt.Println("I'm waiting for value...")
        result := <- ch
        fmt.Println("Value received!")
        fmt.Println(result)
        wg.Done()
    }()

    time.Sleep(time.Second * 5)
    fmt.Println("I'm sending the value...")
    ch <- 1

    wg.Wait()
}

Если бы мы попытались сначала читать из канала, а потом запустить горутину, которая кладет данные в канал, у нас ничего бы не вышло. Горутина, которая читает данные, "заснула" бы и просто не дошла до выполнения кода, который запускает горутину, которая кладет данные в канал:

package main

import (
	"fmt"
	"sync"
)

func main() {
    ch := make(chan int)

    var wg sync.WaitGroup
    wg.Add(1)

    result := <- ch
    fmt.Println(result)

    go func() {
        ch <- 1
        wg.Done()
    }()

    wg.Wait()
}

Таким образом мы получили бы следующую ошибку (все горутины спят, фатальная ошибка):

$ go run main.go
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
        main.go:14 +0x5d
exit status 2

При этом в стек-трейсе было бы указано, что засыпание произошло в функции main(), на 14ой строке (), проблема в приеме из канала (chan receive). И в этой строке, как видно, как раз происходит result := <- ch.

Важные замечания:

  1. Небуферизированный канал не имеет ёмкости (вместимости, размера). То есть предать через него можно только одно значение за раз.

  2. Как было сказано, когда отправитель посылает что-то в канал, он останавливается, пока из канала не будет прочитано значение.

  3. Небуферизированные каналы обычно используются для синхронного выполнения каких-то операций. Например, создания конвейеров: первая горутина перемножает два числа, а вторая добавляет к ним единицу; мы не можем добавить единицу до перемножения и нам нужна точка синхронизации. Пример может быть каким-то таким:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int)

    a := 2
    b := 2

    go func() {
        c := a * b
        ch <- c
    }()

    go func() {
        c := <- ch
        c = c + 1
        fmt.Println(c)
    }()

    time.Sleep(time.Second*5)
}

Напомню, что анонимная функция (go func(){}()) имеет общую область видимости при инициализации (то есть a и b не обязательно передавать в нее через аргументы), но это не работает "в обратную сторону" (инициализированная внутри нее переменная c не будет видна снаружи и, следовательно, в другой анонимной функции для второй горутины).

  1. Отправляющая и принимающая сторона должны быть готовы одновременно (и отправить и принять). В противном случае будет дедлок, как мы видели в примере выше. Это довольно непонятный момент, т.к. рантайм, видимо, в состоянии понять только формальные признаки того, готовы ли обе стороны. На практике запросто можно написать программу, которая зависнет навсегда, но будет соблюдать эти формальные признаки. Например, вот так:
package main

import (
    "time"
    "fmt"
)

func main() {
    ch := make(chan int)

    go func() {
        for {
            time.Sleep(time.Second*1)
        }
        ch <- 10
    }()

    c := <- ch
    fmt.Println(c)
}

Здесь мы выполняем бесконченый цикл, а после него отправляем значение в канал. Очевидно, мы никогда не отправим значение в канал. Но рантайм не сообщает нам о дедлоке (all goroutines are asleep - deadlock!), с его точки зрения всё идет хорошо.

Вывод по небуферизированным каналам

В основном они используются как точка синхронизации (буквально: останавливают выполнение горутин в месте записи или приема из канала) и передают данные между ними.

Буферизированные каналы

Создать буферизованный канал можно следующим образом:

ch := make(chan int, 3)

Буферизированные каналы очень похожи на не буферизированные каналы. Для себя я выбрал такую метафору. Не буферизированный канал, это как если бы два человека стояли рядом и передавали друг другу коробку. Предположим, они оба работают на складе и один упаковывает товар, а другой расставляет на полки.

Расставляльщик протянул руки и ждет (программа замирает в ожидании во время приема из канала), пока упаковщик передаст ему коробку. Как только ему передают коробку, он идет делать свои дела дальше. С другой стороны, если упаковщик не видит раставляльщика, которые еще не пришел забрать коробку, то он ждет, пока раставляльщик придет и возьмет коробку у него из рук (программа замирает во время отправки в канал).

Буферизированный же канал скорее похож на какую-то ленту, конвеер. Тот, кто хочет передать коробку, кладет ее на конвеер, а не отдает человеку сразу. Тот, кто принимает коробку, берет ее с конвеера, а не от конкретного человека. У конвеера есть некая вместительность (вместимость?). Например, туда можно поставить только пять коробок (размер буфера канала). И есть еще одна особенность: если коробка поставлена, она не упадет с конвеера (предположим, он остановится, если она доедет до конца; вообще говоря, лучше не думать об этом так, как будто там и в самом деле что-то куда-то едет).

В такой метафоре понятно, что если упаковщик попытается положить еще одну коробку на конвеер, но там уже и так слишком много коробок (канал заполнен), то положить ему не удастся и он будет ждать, пока освободится место (кто-то заберет коробку "на том конце" конвеера). Здесь программа зависнет в ожидании, когда в буферизованный канал можно будет отправить еще одно сообщение.

С другой стороны, если расставляльщик видит, что на конвеере коробок больше нет (он все забрал и расставил), то забирать ему больше нечего и он будет ждать, пока к нему не приедет по конвееру еще одна коробка. Здесь программа зависнет в ожидании, когда в пустой буферизованный канал придет хотя бы еще одно сообщение.

Таким образом у буферизованных каналов есть ряд особенностей и отличий от не буферизованных каналов, хотя из концепции очень похожи.

Важное отличие: буферизированный канал с размером буфера в единицу это не то же самое, что не буферизированный канал. Я не уверен, что с технической точки зрения так и есть, но не буферизированный канал - это буферизированный канал с размером буфера ноль. Поэтому в моем сравнении выше люди передавали посылки прямо из рук в руки.

Ряд отличий буферизованных каналов от не буферизованных каналов:

  1. Буферизованный канал имеет определенный размер (capacity);

  2. Отправитель блокируется, если канал уже полон;

  3. Получатель блокируется, если канал пуст;

  4. Буферизованные каналы отлично подходят, если вы хотите "развязать" тайминги получателя и отправителя (например, кто-то работает то быстро, то медленно, нет смысла ждать его каждый раз, когда он отрабатывает медленно);

  5. Буферизованные каналы предоставляют некоторую асинхронность, т.к. отправителю не обязательно постоянно (синхронно) ждать получателя;

Если вы помните, у нас не вышло отправить в небуферизованный канал значение и сразу же получить его внутри одной и той же горутины main. Это было связано с тем, что слушатель еще не был готов в момент, когда мы записывали значение в канал (мы еще не запустили дополнительную горутину для слушания канала). В случае с буферизованным каналом эта проблема "исчезает" по указанным выше причинам:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int, 3)
   
    ch <- 10
    ch <- 20
    ch <- 30
    
    fmt.Println(<-ch)
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

Но нужно быть осторожным. Если вместимость канала меньше количества значений, которые мы в него записываем (поменяйте 3 на 2 в make(chan int, 3)), то мы сразу же получим ошибку fatal error: all goroutines are asleep - deadlock!. Это логично вытекает из сказанного выше: как только мы пишем туда третье значение, горутина, в которой запущена функция main(), начинает ждать, когда канал освободится. Но он не освободится никогда, т.к. и читать мы пытаемся в этой же горутине, а до чтения дело не доходит (мы "зависли" на этапе "положи в канал значение 30"). Решить эту проблему можно следующим способом (читая из канала в другой горутине):

package main

import (
	"fmt"
	"sync"
)

func main() {
    ch := make(chan int, 2)

    // WaitGroup нужна только чтобы дождаться выполнения всех горутин!
    var wg sync.WaitGroup
    wg.Add(1)

    go func() {
        fmt.Println(<-ch)
        fmt.Println(<-ch)
        fmt.Println(<-ch)
        wg.Done()
    }()

    ch <- 10
    ch <- 20
    ch <- 30
    
    wg.Wait()
}

То есть мы встретились с той же проблемой, что и с небуферизированным каналом. Даже решение такое же. И если мы запустим горутину (go func()) после того, как начнем записывать в канал, мы также получим ошибку о дедлоке (напомню: только в том случае, если записей в канал будет больше, чем емкость канала).

Емкость и количество элементов в буферизированном канале

Чтобы получить емкость и текущее количество элементов в канале, используйте функции cap() и len() следующим образом:

package main

import (
	"fmt"
)

func main() {
    ch := make(chan int, 10)
    ch <- 1
    ch <- 2 
    ch <- 3

    fmt.Printf("Capacity: %d\n", cap(ch))
    fmt.Printf("Current length: %d\n", len(ch))
}

Закрытие канала

Канал может быть закрыт следующим образом:

close(ch)

В закрытый канал нельзя писать (будет паника) и прочитать из него можно только нулевое значение для данного типа переменной:

package main

import (
	"fmt"
)

type Job struct {
    ID int
}

func main() {
    ch := make(chan Job, 10)
    close(ch)

    // close (ch) // Ошибка "panic: close of closed channel"

    fmt.Printf("%#v", <-ch) // Получим "main.Job{ID:0}"

    ch<-Job{ID: 100} // Ошибка "panic: send on closed channel"
}

Оговорка: канал может быть закрыт (обычно отправителем), но при этом мы еще можем читать из него значения, пока они не кончатся. И уже только далее чтение начнет возвращать нулевые значения типа переменной канала.

Например, попробуем записать в буферизованный канал 3 значения, закрыть его и прочитать из него значения 10 раз. На четвертый раз мы узнаем, что второй параметр чрения из канала стал false:

package main

import (
	"fmt"
    "time"
)

type Job struct {
    ID int
}

func main() {
    ch := make(chan Job, 3)

    go func() {
        ch <- Job{ID: 1000}
        ch <- Job{ID: 1100}
        ch <- Job{ID: 1200}
        close(ch)
    }()

    time.Sleep(time.Second * 2)

    for i := 0; i < 10; i++ {
        job, canProcess := <-ch
        fmt.Println(canProcess)

        if canProcess {
            fmt.Printf("%#v\n", job)
        }
    }
}

Важное замечание состоит в том, что канал никогда не будет автоматически закрыт. Он может быть собран сборщиком мусора, если на него больше нигде нет ссылок, но это не то же самое.

Чтение из "одного из каналов" (select)

Выражение select позволяет прочитать из "одного из каналов". То есть чтение произойдет из первого канала, куда придут данные и на этом прекратится. Пример:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)

	go func() {
		time.Sleep(time.Second * 1)
		ch1 <- 100
	}()

	go func() {
		time.Sleep(time.Second * 20)
		ch2 <- 200
	}()

	select {
	case data := <-ch1:
		fmt.Println("Channel 1: ", data)
	case data := <-ch2:
		fmt.Println("Channel 2: ", data)
	}
}

Как только данные будут записаны в первый канал ch1, select прочитает из него данные и программа завершится, несмотря на то, что во второй горутине слип на 20 секунд.

Если данные пришли в оба канала одновременно, то чтение произойдет из случайного канала (если я правильно перевел фразу из документации "It chooses one at random if multiple are ready").

Каналы только для чтения и только для записи

Если мы передаем канал в функцию, то в аргументах мы можем указать, является ли канал только для чтения или только для записи. Это удобно потому, что сохраняет логику нашей программы и страхует нас от несчастных случаев. Например, другой разработчик попытается добавить код, которые пишет данные в канал, который нужен в функции только для чтения - будет ошибка на этапе компиляции.

Вот так можно указать, что канал предназначен только для того, чтобы в него писать:

func my(ch chan<- int) {

Вот так можно указать, что из этого канала можно только читать:

func my2(ch2 <-chan string) {

То есть стрелочка слева от "chan" говорит, что можно только читать (она как бы "наружу"). Стрелочка справа от "chan" говорит, что можно только писать (она как бы "внутрь" слова "chan").

Пример программы, которая использует и каналы только для чтения и каналы только для записи (создание канала, однако, одинаковое):

package main

import (
	"fmt"
    "time"
)

func send(ch chan<- string) {
    ch <- "Hello!"
}

func print(ch <-chan string) {
    fmt.Println(<-ch)
}

func main() {
    ch := make(chan string)

    go func() {
        print(ch)
    }()

    send(ch)

    time.Sleep(time.Second*5)
}

Прелесть в том, что если мы добавим в send() попытку записать в канал, то заругается компилятор:

.\main11.go:9:12: invalid operation: cannot receive from send-only channel ch (variable of type chan<- string)

Если бы у нас такого не было и мы использовали бы обычную нотацию канала (и на чтение и на запись), то мы воткнулись бы в дедлок, а компилятор бы не ругался (это и есть та самая безопасность, о которой я говорил выше):

func send(ch chan string) {
    _ = <-ch
    ch <- "Hello!"
}

Конец.