---
# Local Kafka development stack: one ZooKeeper node, one Kafka broker and the
# kafka-ui web console.
version: '2.1'

services:
  # ZooKeeper node with server id 1; the server list contains only this node.
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  # Kafka broker with id 1, connected to zoo1:2181.
  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      # Three advertised listeners, all mapped to PLAINTEXT below:
      #   INTERNAL - kafka1:19092 (also used for inter-broker traffic)
      #   EXTERNAL - $DOCKER_HOST_IP (default 127.0.0.1), port 9092
      #   DOCKER   - host.docker.internal:29092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      # Replication factors and min ISR are 1 because there is a single broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      # JMX uses port 9999, which is also published in `ports` above.
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      # The ACL authorizer is configured, with access allowed when no ACL is
      # found for a resource.
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  # Web console, published on host port 8080 and started after kafka1.
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    depends_on:
      - kafka1
    environment:
      DYNAMIC_CONFIG_ENABLED: "TRUE"

go get github.com/segmentio/kafka-go
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
w := &kafka.Writer{
Addr: kafka.TCP(
"localhost:9092"),
Topic:
"topic-A",
Balancer: &kafka.LeastBytes{},
RequiredAcks: kafka.RequireAll,
Async:
true,
}
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []
byte(
"Key-A"),
Value: []
byte(
"Hello World!"),
},
kafka.Message{
Key: []
byte(
"Key-B"),
Value: []
byte(
"One!"),
},
kafka.Message{
Key: []
byte(
"Key-C"),
Value: []
byte(
"Two!"),
})
if err !=
nil {
log.Fatalf(
"could not write message: %v", err)
}
if err := w.Close(); err !=
nil {
log.Fatalf(
"could not close writer: %v", err)
}
}
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
w := &kafka.Writer{
Addr: kafka.TCP(
"localhost:9092"),
Topic:
"topic-A",
Balancer: &kafka.LeastBytes{},
RequiredAcks: kafka.RequireAll,
Async:
true,
}
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []
byte(
"Key-A"),
Value: []
byte(
"Hello World!"),
},
kafka.Message{
Key: []
byte(
"Key-B"),
Value: []
byte(
"One!"),
},
kafka.Message{
Key: []
byte(
"Key-C"),
Value: []
byte(
"Two!"),
})
if err !=
nil {
log.Fatalf(
"could not write message: %v", err)
}
if err := w.Close(); err !=
nil {
log.Fatalf(
"could not close writer: %v", err)
}
}
root@pzj:/code/demo/kafka/reader# go run main.go
message at offset 0 key: Key-A value: Hello World!
message at offset 1 key: Key-B value: One!
message at offset 2 key: Key-C value: Two!