上一节,我们实现了搭建kafka集群。本节我们将从0开始,使用Java,搭建kafka客户端生产消费模型。
[TOC]
1.创建maven项目
首先我们使用idea创建项目。

这里我们使用maven来管理jar包,所以创建的是一个maven项目。

然后输入GroupId和ArtifactId即可,这两个id在maven中相当于“坐标”,其中ArtifactId是你的项目名。

这时候,一个maven项目就创建完成了。但是maven还需要配置,我们在idea中找到Preferences(mac系统快捷键:command + ,),搜索maven,接着配置maven home directory(maven安装路径),User settings file(maven 配置文件所在位置 settings.xml), Local repository(本地仓库位置,在setting.xml中配置),配置完成后,点击APPLY即可。到此,一个maven项目就配置完成了。
最后,我们需要在pom.xml中配置kafka依赖的坐标
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
</dependencies>
2.kafka producer
接下来,我们要对kafka中的生产者进行开发。在开发之前,要保证我们kafka服务处于可用的状态。
生产者程序如下:
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 232840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>(
"my-topic", Integer.toString(i), Integer.toString(i)), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println("消息发送成功");
} else {
System.out.println(String.format("消息发送失败: %s", e.getMessage()));
}
}
});
}
producer.close();
}
}
3.kafka consumer
接下来是kafka中的消费者代码。
public class Consumer {
public static void main(String[] args) {
String topicName = "my-topic";
String groupId = "test-group";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList(topicName));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
4.结果
最后是对代码进行测试。依次启动生产者、消费者实例,观察控制台输出接口。
生产者:
消息发送成功
...
消息发送成功
消费者:
offset = 0, key = 0, value = 0
...
offset = 99, key = 99, value = 99
可能遇到的坑:
如果你的程序出错,请首先检查代码props是否正确,其次应该确认你的kafka client和kafka server 版本相同,最后bootstrap.servers这个参数的配置值,要与kafka中server.properties中一致,否则将会出现获取不到元数据信息的异常。(消息发送失败: Failed to update metadata after 3000 ms.)
最后:
上述代码,只是实现了一个最小的生产消费模型,写法上并不规范(比如配置应该写在配置文件中、对于异常应该有处理方法,不能只是输出日志),不能直接使用在生产环境中。只能用于kafka入门学习。
最后,期待您的订阅和点赞,专栏每周都会更新,希望可以和您一起进步,同时也期待您的批评与指正!

上一节,我们实现了搭建kafka集群。本节我们将从0开始,使用Java,搭建kafka客户端生产消费模型。
[TOC]
1.创建maven项目
首先我们使用idea创建项目。

这里我们使用maven来管理jar包,所以创建的是一个maven项目。

然后输入GroupId和ArtifactId即可,这两个id在maven中相当于“坐标”,其中ArtifactId是你的项目名。

这时候,一个maven项目就创建完成了。但是maven还需要配置,我们在idea中找到Preferences(mac系统快捷键:command + ,),搜索maven,接着配置maven home directory(maven安装路径),User settings file(maven 配置文件所在位置 settings.xml), Local repository(本地仓库位置,在setting.xml中配置),配置完成后,点击APPLY即可。到此,一个maven项目就配置完成了。
最后,我们需要在pom.xml中配置kafka依赖的坐标
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
</dependencies>
2.kafka producer
接下来,我们要对kafka中的生产者进行开发。在开发之前,要保证我们kafka服务处于可用的状态。
生产者程序如下:
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 232840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>(
"my-topic", Integer.toString(i), Integer.toString(i)), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println("消息发送成功");
} else {
System.out.println(String.format("消息发送失败: %s", e.getMessage()));
}
}
});
}
producer.close();
}
}
3.kafka consumer
接下来是kafka中的消费者代码。
public class Consumer {
public static void main(String[] args) {
String topicName = "my-topic";
String groupId = "test-group";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList(topicName));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
4.结果
最后是对代码进行测试。依次启动生产者、消费者实例,观察控制台输出接口。
生产者:
消息发送成功
...
消息发送成功
消费者:
offset = 0, key = 0, value = 0
...
offset = 99, key = 99, value = 99
可能遇到的坑:
如果你的程序出错,请首先检查代码props是否正确,其次应该确认你的kafka client和kafka server 版本相同,最后bootstrap.servers这个参数的配置值,要与kafka中server.properties中一致,否则将会出现获取不到元数据信息的异常。(消息发送失败: Failed to update metadata after 3000 ms.)
最后:
上述代码,只是实现了一个最小的生产消费模型,写法上并不规范(比如配置应该写在配置文件中、对于异常应该有处理方法,不能只是输出日志),不能直接使用在生产环境中。只能用于kafka入门学习。
最后,期待您的订阅和点赞,专栏每周都会更新,希望可以和您一起进步,同时也期待您的批评与指正!

上一节,我们实现了搭建kafka集群。本节我们将从0开始,使用Java,搭建kafka客户端生产消费模型。
[TOC]
1.创建maven项目
首先我们使用idea创建项目。

这里我们使用maven来管理jar包,所以创建的是一个maven项目。

然后输入GroupId和ArtifactId即可,这两个id在maven中相当于“坐标”,其中ArtifactId是你的项目名。

这时候,一个maven项目就创建完成了。但是maven还需要配置,我们在idea中找到Preferences(mac系统快捷键:command + ,),搜索maven,接着配置maven home directory(maven安装路径),User settings file(maven 配置文件所在位置 settings.xml), Local repository(本地仓库位置,在setting.xml中配置),配置完成后,点击APPLY即可。到此,一个maven项目就配置完成了。
最后,我们需要在pom.xml中配置kafka依赖的坐标
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
</dependencies>
2.kafka producer
接下来,我们要对kafka中的生产者进行开发。在开发之前,要保证我们kafka服务处于可用的状态。
生产者程序如下:
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 232840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>(
"my-topic", Integer.toString(i), Integer.toString(i)), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println("消息发送成功");
} else {
System.out.println(String.format("消息发送失败: %s", e.getMessage()));
}
}
});
}
producer.close();
}
}
3.kafka consumer
接下来是kafka中的消费者代码。
public class Consumer {
public static void main(String[] args) {
String topicName = "my-topic";
String groupId = "test-group";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList(topicName));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
4.结果
最后是对代码进行测试。依次启动生产者、消费者实例,观察控制台输出接口。
生产者:
消息发送成功
...
消息发送成功
消费者:
offset = 0, key = 0, value = 0
...
offset = 99, key = 99, value = 99
可能遇到的坑:
如果你的程序出错,请首先检查代码props是否正确,其次应该确认你的kafka client和kafka server 版本相同,最后bootstrap.servers这个参数的配置值,要与kafka中server.properties中一致,否则将会出现获取不到元数据信息的异常。(消息发送失败: Failed to update metadata after 3000 ms.)
最后:
上述代码,只是实现了一个最小的生产消费模型,写法上并不规范(比如配置应该写在配置文件中、对于异常应该有处理方法,不能只是输出日志),不能直接使用在生产环境中。只能用于kafka入门学习。
最后,期待您的订阅和点赞,专栏每周都会更新,希望可以和您一起进步,同时也期待您的批评与指正!
