🛠 Как использовать кафку в Go
Представьте, что у вас в руках инструмент, который позволяет строить крутой обмен сообщениями между сервисами. Представили? Давайте попробуем его применить.
1. Установка
Для начала проверьте, что у вас установлен Go версии 1.15 и выше и введите команду:
go get github.com/segmentio/kafka-go
Это подтянет для вас либу kafka-go после чего её можно будет использовать в проекте:
import "github.com/segmentio/kafka-go"
2. Используем низкоуровневый коннект
Когда нужно отправить сообщение:
ctx := context.Background()
conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// Отправляем два сообщения подряд
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("Первое сообщение")},
kafka.Message{Value: []byte("Второе сообщение")},
)
if err != nil {
log.Fatal(err)
}
Когда сообщение нужно принять:
batch := conn.ReadBatch(10e3, 1e6)
defer batch.Close()
buf := make([]byte, 10e3)
for {
n, err := batch.Read(buf)
if err != nil {
break
}
fmt.Println("→", string(buf[:n]))
}
3. Используем высокоуровневый Reader
Reader сам следит за смещениями, повторными подключениями и балансировкой в группе:
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093"},
GroupID: "example-group",
Topic: "events",
MinBytes: 10e3,
MaxBytes: 10e6,
CommitInterval: time.Second,
})
defer r.Close()
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
log.Println("Завершение чтения:", err)
break
}
fmt.Printf("Получено: ключ=%s, значение=%s, смещение=%d\n", m.Key, m.Value, m.Offset)
}
Если нужно ручное управление, замените
ReadMessage
на FetchMessage
+ CommitMessages
.4. Просто пишем с высокоуровневым Writer
Writer сам позаботится о повторных попытках, балансировке и «гладком» завершении:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092", "localhost:9093"},
Topic: "events",
Balancer: &kafka.LeastBytes{},
})
defer w.Close()
messages := []kafka.Message{
{Key: []byte("order123"), Value: []byte(`{"status":"created"}`)},
{Key: []byte("order124"), Value: []byte(`{"status":"paid"}`)},
}
if err := w.WriteMessages(context.Background(), messages...); err != nil {
log.Fatal("Ошибка при записи:", err)
}
Бонус: советы по использованию
— всегда
context.WithTimeout
или WithCancel
— это ваша страховка от «зависаний».— ловите
SIGINT
/SIGTERM
и аккуратно закрывайте Reader и Writer.— подключайте Prometheus/OpenTelemetry — мониторьте
throughput
, задержки и ошибки.Запустите пару консьюмеров и продюсеров — и почувствуйте, как работают Kafka и Go.