version: '3.8'
services:
zookeeper1:
image: zookeeper:3.7
container_name: zookeeper1
restart: always
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zookeeper1:2888:3888;2181 server.2=zookeeper2:2888:3888;2181 server.3=zookeeper3:2888:3888;2181
volumes:
- ./zookeeper1/data:/data
- ./zookeeper1/datalog:/datalog
zookeeper2:
image: zookeeper:3.7
container_name: zookeeper2
restart: always
ports:
- "2182:2181"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zookeeper1:2888:3888;2181 server.2=zookeeper2:2888:3888;2181 server.3=zookeeper3:2888:3888;2181
volumes:
- ./zookeeper2/data:/data
- ./zookeeper2/datalog:/datalog
zookeeper3:
image: zookeeper:3.7
container_name: zookeeper3
restart: always
ports:
- "2183:2181"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zookeeper1:2888:3888;2181 server.2=zookeeper2:2888:3888;2181 server.3=zookeeper3:2888:3888;2181
volumes:
- ./zookeeper3/data:/data
- ./zookeeper3/datalog:/datalog
# 创建目录
mkdir -p {zookeeper1,zookeeper2,zookeeper3}/{data,datalog}
# 启动集群
docker-compose up -d
# 查看运行状态
docker-compose ps
# 查看日志
docker-compose logs -f
# 测试集群状态
echo stat | nc localhost 2181
echo stat | nc localhost 2182
echo stat | nc localhost 2183
version: '3.8'
services:
kafka1:
image: confluentinc/cp-kafka:7.3.0
container_name: kafka1
restart: always
depends_on:
- zookeeper1
- zookeeper2
- zookeeper3
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://localhost:9092
KAFKA_LISTENERS: INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LOG_SEGMENT_BYTES: 1073741824
KAFKA_NUM_PARTITIONS: 3
volumes:
- ./kafka1/data:/var/lib/kafka/data
kafka2:
image: confluentinc/cp-kafka:7.3.0
container_name: kafka2
restart: always
depends_on:
- zookeeper1
- zookeeper2
- zookeeper3
ports:
- "9093:9093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://localhost:9093
KAFKA_LISTENERS: INTERNAL://0.0.0.0:19093,EXTERNAL://0.0.0.0:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LOG_SEGMENT_BYTES: 1073741824
KAFKA_NUM_PARTITIONS: 3
volumes:
- ./kafka2/data:/var/lib/kafka/data
kafka3:
image: confluentinc/cp-kafka:7.3.0
container_name: kafka3
restart: always
depends_on:
- zookeeper1
- zookeeper2
- zookeeper3
ports:
- "9094:9094"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://localhost:9094
KAFKA_LISTENERS: INTERNAL://0.0.0.0:19094,EXTERNAL://0.0.0.0:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LOG_SEGMENT_BYTES: 1073741824
KAFKA_NUM_PARTITIONS: 3
volumes:
- ./kafka3/data:/var/lib/kafka/data
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
restart: always
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka1:19092,kafka2:19093,kafka3:19094"
KAFKA_CLUSTERS_0_ZOOKEEPER: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
depends_on:
- kafka1
- kafka2
- kafka3
# 创建目录
mkdir -p kafka1 kafka2 kafka3
# 启动Kafka集群
docker-compose -f docker-compose-kafka.yml up -d
# 查看运行状态
docker-compose -f docker-compose-kafka.yml ps
# 测试Kafka
# 进入容器
docker exec -it kafka1 /bin/bash
# 创建topic
kafka-topics --create --topic test-topic --partitions 3 --replication-factor 3 --bootstrap-server kafka1:19092
# 查看topic
kafka-topics --describe --topic test-topic --bootstrap-server kafka1:19092
# 生产消息
kafka-console-producer --topic test-topic --bootstrap-server kafka1:19092
# 消费消息(新终端)
kafka-console-consumer --topic test-topic --from-beginning --bootstrap-server kafka1:19092
<dependencies>
<!-- Kafka客户端 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<!-- Slf4j日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
</dependency>
<!-- JSON处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
</dependencies>
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class KafkaProducerExample {
private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
private static final String TOPIC_NAME = "test-topic";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) {
// 1. 创建配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2. 生产者配置优化
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 保证顺序
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 压缩
props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // 延迟发送
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // 批量大小
// 3. 创建生产者
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// 4. 发送同步消息
System.out.println("开始发送同步消息...");
for (int i = 1; i <= 10; i++) {
User user = new User("user-" + i, "email-" + i + "@test.com", i * 10);
String message = objectMapper.writeValueAsString(user);
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC_NAME,
String.valueOf(i % 3), // 按用户ID分区的key
message
);
// 同步发送
RecordMetadata metadata = producer.send(record).get();
System.out.printf("同步发送成功 - Topic: %s, Partition: %d, Offset: %d, Key: %s%n",
metadata.topic(), metadata.partition(), metadata.offset(), record.key());
Thread.sleep(100);
}
// 5. 发送异步消息
System.out.println("\n开始发送异步消息...");
for (int i = 11; i <= 20; i++) {
User user = new User("user-" + i, "email-" + i + "@test.com", i * 10);
String message = objectMapper.writeValueAsString(user);
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC_NAME,
String.valueOf(i % 3),
message
);
// 异步发送
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("异步发送成功 - Topic: %s, Partition: %d, Offset: %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
System.err.println("异步发送失败: " + exception.getMessage());
}
});
Thread.sleep(100);
}
// 6. 发送带回调的消息
System.out.println("\n开始发送带回调的消息...");
for (int i = 21; i <= 30; i++) {
User user = new User("user-" + i, "email-" + i + "@test.com", i * 10);
String message = objectMapper.writeValueAsString(user);
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC_NAME,
String.valueOf(i % 3),
message
);
producer.send(record, new CustomCallback(i));
Thread.sleep(100);
}
producer.flush();
System.out.println("\n所有消息发送完成!");
} catch (Exception e) {
e.printStackTrace();
}
}
static class CustomCallback implements Callback {
private final int messageId;
public CustomCallback(int messageId) {
this.messageId = messageId;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.printf("回调消息%d发送成功 - Partition: %d, Offset: %d%n",
messageId, metadata.partition(), metadata.offset());
} else {
System.err.printf("回调消息%d发送失败: %s%n", messageId, exception.getMessage());
}
}
}
static class User {
private String username;
private String email;
private int age;
public User(String username, String email, int age) {
this.username = username;
this.email = email;
this.age = age;
}
// getters and setters
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
}
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class KafkaConsumerExample {
private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
private static final String TOPIC_NAME = "test-topic";
private static final String GROUP_ID = "test-consumer-group";
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final AtomicBoolean running = new AtomicBoolean(true);
public static void main(String[] args) {
// 1. 创建消费者配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 2. 消费者配置优化
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早开始消费
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 每次poll最大记录数
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // poll间隔
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); // 会话超时
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // 心跳间隔
// 3. 创建消费者
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("正在关闭消费者...");
running.set(false);
consumer.wakeup();
}));
// 4. 订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
System.out.println("消费者已启动,开始消费消息...");
// 5. 消费消息
while (running.get()) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty()) {
System.out.printf("收到 %d 条消息%n", records.count());
// 按分区处理消息
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
System.out.printf("分区 %d 有 %d 条消息%n",
partition.partition(), partitionRecords.size());
for (ConsumerRecord<String, String> record : partitionRecords) {
processMessage(record);
}
}
// 手动提交偏移量(异步提交)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("提交偏移量失败: " + exception.getMessage());
}
});
}
} catch (WakeupException e) {
// 忽略,用于关闭消费者
} catch (Exception e) {
System.err.println("消费消息异常: " + e.getMessage());
}
}
// 最终提交
try {
consumer.commitSync();
System.out.println("最终提交偏移量完成");
} catch (Exception e) {
System.err.println("最终提交偏移量失败: " + e.getMessage());
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void processMessage(ConsumerRecord<String, String> record) {
try {
User user = objectMapper.readValue(record.value(), User.class);
System.out.printf("收到消息 - Topic: %s, Partition: %d, Offset: %d, Key: %s%n",
record.topic(), record.partition(), record.offset(), record.key());
System.out.printf("消息内容 - User: %s, Email: %s, Age: %d%n%n",
user.getUsername(), user.getEmail(), user.getAge());
} catch (Exception e) {
System.err.println("解析消息失败: " + e.getMessage());
}
}
// 重分配监听器示例
static class CustomRebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("分区被撤销: " + partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("分区被分配: " + partitions);
}
}
static class User {
private String username;
private String email;
private int age;
// 必须有无参构造函数
public User() {}
public User(String username, String email, int age) {
this.username = username;
this.email = email;
this.age = age;
}
// getters and setters
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
}
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
public class KafkaAdvancedFeatures {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092,localhost:9093,localhost:9094");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// 1. 获取主题分区信息
System.out.println("=== 获取主题分区信息 ===");
List<PartitionInfo> partitions = producer.partitionsFor("test-topic");
partitions.forEach(p -> System.out.printf("分区: %d, Leader: %s, Replicas: %s%n",
p.partition(), p.leader(), p.replicas()));
// 2. 事务生产者示例
System.out.println("\n=== 事务生产者示例 ===");
Properties txProps = new Properties();
txProps.putAll(props);
txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
txProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
try (KafkaProducer<String, String> txProducer = new KafkaProducer<>(txProps)) {
// 初始化事务
txProducer.initTransactions();
try {
// 开始事务
txProducer.beginTransaction();
// 发送事务消息
for (int i = 1; i <= 3; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic", "key-" + i, "transaction-message-" + i);
txProducer.send(record);
System.out.println("发送事务消息: " + i);
}
// 提交事务
txProducer.commitTransaction();
System.out.println("事务提交成功");
} catch (Exception e) {
// 回滚事务
txProducer.abortTransaction();
System.err.println("事务回滚: " + e.getMessage());
throw e;
}
}
// 3. 自定义分区器示例(如果需要)
System.out.println("\n=== 自定义路由示例 ===");
// Kafka默认使用hash(key) % partitions进行分区
} catch (Exception e) {
e.printStackTrace();
}
}
}
#!/bin/bash
# cluster-test.sh
echo "=== 检查Zookeeper集群 ==="
echo "stat | nc localhost 2181"
echo "stat | nc localhost 2182"
echo "stat | nc localhost 2183"
echo -e "\n=== 检查Kafka集群 ==="
echo "列出所有主题:"
docker exec kafka1 kafka-topics --list --bootstrap-server localhost:9092
echo -e "\n=== 查看test-topic详情 ==="
docker exec kafka1 kafka-topics --describe --topic test-topic --bootstrap-server localhost:9092
echo -e "\n=== 查看消费者组 ==="
docker exec kafka1 kafka-consumer-groups --list --bootstrap-server localhost:9092
echo -e "\n=== Kafka UI访问地址 ==="
echo "http://localhost:8080"
#!/bin/bash
# kafka-performance-test.sh
echo "=== Kafka性能测试 ==="
echo "1. 创建测试主题"
docker exec kafka1 kafka-topics --create \
--topic performance-test \
--partitions 3 \
--replication-factor 3 \
--bootstrap-server localhost:9092
echo "2. 生产者性能测试"
docker exec kafka1 kafka-producer-perf-test \
--topic performance-test \
--num-records 100000 \
--record-size 1024 \
--throughput 10000 \
--producer-props bootstrap.servers=localhost:9092
echo "3. 消费者性能测试"
docker exec kafka1 kafka-consumer-perf-test \
--topic performance-test \
--messages 100000 \
--bootstrap-server localhost:9092
// 如果遇到连接问题,可以尝试以下配置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 1000);
props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 10000);
// 消费者配置
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 1000);
props.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 10000);
# 在docker-compose中调整Kafka内存
environment:
KAFKA_HEAP_OPTS: "-Xmx2G -Xms2G"
KAFKA_JVM_PERFORMANCE_OPTS: "-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
访问 http://localhost:8080 可以查看:
# 在docker-compose中添加JMX配置
environment:
JMX_PORT: 9999
KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost"
这个完整的Docker集群搭建和Java测试方案提供了:
完整的集群配置:3节点Zookeeper + 3节点Kafka 高可用性:所有组件都有冗余 Java客户端示例:包含生产者和消费者的完整实现 高级特性:事务、异步回调、自定义分区等 监控工具:Kafka UI用于可视化监控 故障恢复:配置了持久化存储你可以根据需要调整配置参数,如分区数、副本因子、内存设置等,以适应不同的业务场景。