大家好,我是道哥,专注于后端java开发,喜欢写作和分享。如果觉得文章对你有用,那就点个赞呗!如果能转发那是对道哥最大的支持!
相信很多后端程序员对kafka都不陌生,作为主流消息队列,而随着高并发业务的需求,消息队列在业务中的使用可以说是标配了,如果是你没听说过kafka,摆脱,好好补补课吧。道哥接下来会以通俗的语言介绍下kafka,不属于本篇讨论的。
控制台操作topic方法
topic最主流的操作方式当然是登录上控制台,通过控制台命令去操作。
创建topic
控制台创建
bin/kafka-topics.sh --create --topic topicname --replication-factor 1 --partitions 1 --zookeeper localhost:2181
–topic 指定topic名字
–replication-factor 指定副本数,集群环境,这里副本数就为3或以上,具体由你的集群情况而定。
–partitions 指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好。
这种创建方式有个弊端就是,你需要连接到kafka所有在的集群,或者需要登录到某个broker,本机环境还好,一切很自然,如果是线上环境呢?你需要记住好多不同broker list,好麻烦。
开启自动创建
配置:auto.create.topics.enable=true
使用程序直接往kafka中相应的topic发送数据,如果topic不存在就会按默认配置进行创建。
这个弊端就更明显了,如果人家集群默认开启了这种方式还好,如果人家没有开启呢?你需要去修改配置啊。而且开启这个参数,可能会导致topic泛滥。
最好的方式是因需而建topic,目前主流的编程语言是java,我们就演示一下如何通过java api进行kafka topic的创建、查询和删除。
查看topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
删除topic
删除topic命令
bin/kafka-topics.sh --delete --zookeeper 192.168.242.131:2181 --topic aaa
注:此命令如果要生效,还需在server.properties中添加:
delete.topic.enable=true
控制台也会有返回提示信息:
Topic aaa is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
通过java api操作topics
利其器
工欲善其事必先利其器,这种需求,我们想到了,kafka的编写大神们肯定也想到了啊,人家给我们提供了现成的包可以用,我们只需要通过maven把依赖引入进来,进行简单的调用即可。
这里我们使用kafka-clients,0.11.0.3版本。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
善其事
像通过bash控制台命令操作kafka集群一样,通过api,首先你也得连接上目标服务器啊。
- 连接bootstrap.servers
其中kafkaBroker是你实际kafka集群接入点。
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaBroker);
- 创建adminClient
adminClient = KafkaAdminClient.create(properties);
创建topic
注意kafka topic的名字不可以包含中文字符,不然是无法创建的。分区数,通常建议为12,其初期最好评估好消费节点数和能力来设置此值;副本数,建议值是2-3,可根据业务需求和发送延迟需要进行设置,影响数据可靠性。
重复创建已有的topic会抛出异常。
createTopics() throws ExecutionException, InterruptedException {
try {
NewTopic newTopic = new NewTopic("topic-name", /*topic名称,不建议过长*/
12 ,
(short)2
);
adminClient.createTopics(Collections.singleton(newTopic), new CreateTopicsOptions().timeoutMs(10000))
.all()
.get();
} catch (ExecutionException e) {
if (e.getMessage().startsWith("org.apache.kafka.common.errors.TopicExistsException")) {
System.out.println("topic is exist !! " + e.getMessage());
} else {
throw e;
}
}
}
查询topic列表
List<String> queryCurrentTopics() throws InterruptedException, ExecutionException, TimeoutException {
List<String> list = new ArrayList<>();
adminClient.listTopics()
.listings()
.get(1, TimeUnit.MINUTES)
.forEach(topicListing -> {
System.out.println("currentTopic is: " + topicListing.name());
list.add(topicListing.name());
});
return list;
}
删除topic
void deleteTopics() throws ExecutionException, InterruptedException {
adminClient.deleteTopics(Collections.singleton("hello"))
.all()
.get();
}
能够看到这里的都是真爱学习啊,觉得有用的话,就点个赞。转发是对道哥最大的鼓励,点个关注,就不会错过道哥的更新了。
程序员的小伙伴们,学习之路,同行的人越多才可以走的更远,加入公众号[程序员之道],一起交流沟通,走出我们的程序员之道!
转载:https://blog.csdn.net/chanllenge/article/details/105170090