TG Telegram Group & Channel
Библиотека Go-разработчика | Golang | United States America (US)
Create: Update:

🛠 Как использовать кафку в Go

Представьте, что у вас в руках инструмент, который позволяет строить крутой обмен сообщениями между сервисами. Представили? Давайте попробуем его применить.

1. Установка

Для начала проверьте, что у вас установлен Go версии 1.15 и выше и введите команду:

go get github.com/segmentio/kafka-go


Это подтянет для вас либу kafka-go после чего её можно будет использовать в проекте:
import "github.com/segmentio/kafka-go"


2. Используем низкоуровневый коннект

Когда нужно отправить сообщение:
ctx := context.Background()
conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0)
if err != nil {
log.Fatal(err)
}
defer conn.Close()

// Отправляем два сообщения подряд
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("Первое сообщение")},
kafka.Message{Value: []byte("Второе сообщение")},
)
if err != nil {
log.Fatal(err)
}


Когда сообщение нужно принять:
batch := conn.ReadBatch(10e3, 1e6)
defer batch.Close()

buf := make([]byte, 10e3)
for {
n, err := batch.Read(buf)
if err != nil {
break
}
fmt.Println("→", string(buf[:n]))
}


3. Используем высокоуровневый Reader

Reader сам следит за смещениями, повторными подключениями и балансировкой в группе:
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093"},
GroupID: "example-group",
Topic: "events",
MinBytes: 10e3,
MaxBytes: 10e6,
CommitInterval: time.Second,
})
defer r.Close()

for {
m, err := r.ReadMessage(context.Background())
if err != nil {
log.Println("Завершение чтения:", err)
break
}
fmt.Printf("Получено: ключ=%s, значение=%s, смещение=%d\n", m.Key, m.Value, m.Offset)
}


Если нужно ручное управление, замените ReadMessage на FetchMessage + CommitMessages.

4. Просто пишем с высокоуровневым Writer

Writer сам позаботится о повторных попытках, балансировке и «гладком» завершении:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092", "localhost:9093"},
Topic: "events",
Balancer: &kafka.LeastBytes{},
})
defer w.Close()

messages := []kafka.Message{
{Key: []byte("order123"), Value: []byte(`{"status":"created"}`)},
{Key: []byte("order124"), Value: []byte(`{"status":"paid"}`)},
}
if err := w.WriteMessages(context.Background(), messages...); err != nil {
log.Fatal("Ошибка при записи:", err)
}


Бонус: советы по использованию

— всегда context.WithTimeout или WithCancel — это ваша страховка от «зависаний».

— ловите SIGINT/SIGTERM и аккуратно закрывайте Reader и Writer.

— подключайте Prometheus/OpenTelemetry — мониторьте throughput, задержки и ошибки.

Запустите пару консьюмеров и продюсеров — и почувствуйте, как работают Kafka и Go.

🐸Библиотека Go-разработчика #буст

🛠 Как использовать кафку в Go

Представьте, что у вас в руках инструмент, который позволяет строить крутой обмен сообщениями между сервисами. Представили? Давайте попробуем его применить.

1. Установка

Для начала проверьте, что у вас установлен Go версии 1.15 и выше и введите команду:
go get github.com/segmentio/kafka-go


Это подтянет для вас либу kafka-go после чего её можно будет использовать в проекте:
import "github.com/segmentio/kafka-go"


2. Используем низкоуровневый коннект

Когда нужно отправить сообщение:
ctx := context.Background()
conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0)
if err != nil {
log.Fatal(err)
}
defer conn.Close()

// Отправляем два сообщения подряд
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("Первое сообщение")},
kafka.Message{Value: []byte("Второе сообщение")},
)
if err != nil {
log.Fatal(err)
}


Когда сообщение нужно принять:
batch := conn.ReadBatch(10e3, 1e6)
defer batch.Close()

buf := make([]byte, 10e3)
for {
n, err := batch.Read(buf)
if err != nil {
break
}
fmt.Println("→", string(buf[:n]))
}


3. Используем высокоуровневый Reader

Reader сам следит за смещениями, повторными подключениями и балансировкой в группе:
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093"},
GroupID: "example-group",
Topic: "events",
MinBytes: 10e3,
MaxBytes: 10e6,
CommitInterval: time.Second,
})
defer r.Close()

for {
m, err := r.ReadMessage(context.Background())
if err != nil {
log.Println("Завершение чтения:", err)
break
}
fmt.Printf("Получено: ключ=%s, значение=%s, смещение=%d\n", m.Key, m.Value, m.Offset)
}


Если нужно ручное управление, замените ReadMessage на FetchMessage + CommitMessages.

4. Просто пишем с высокоуровневым Writer

Writer сам позаботится о повторных попытках, балансировке и «гладком» завершении:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092", "localhost:9093"},
Topic: "events",
Balancer: &kafka.LeastBytes{},
})
defer w.Close()

messages := []kafka.Message{
{Key: []byte("order123"), Value: []byte(`{"status":"created"}`)},
{Key: []byte("order124"), Value: []byte(`{"status":"paid"}`)},
}
if err := w.WriteMessages(context.Background(), messages...); err != nil {
log.Fatal("Ошибка при записи:", err)
}


Бонус: советы по использованию

— всегда context.WithTimeout или WithCancel — это ваша страховка от «зависаний».

— ловите SIGINT/SIGTERM и аккуратно закрывайте Reader и Writer.

— подключайте Prometheus/OpenTelemetry — мониторьте throughput, задержки и ошибки.

Запустите пару консьюмеров и продюсеров — и почувствуйте, как работают Kafka и Go.

🐸Библиотека Go-разработчика #буст
Please open Telegram to view this post
VIEW IN TELEGRAM
7👍4🥱3🔥2


>>Click here to continue<<

Библиотека Go-разработчика | Golang






Share with your best friend
VIEW MORE

United States America Popular Telegram Group (US)