小言_互联网的博客

不会用java api对kafka topic进行创建、查询和删除,你也太out了(建议收藏)

314人阅读  评论(0)

大家好,我是道哥,专注于后端java开发,喜欢写作和分享。如果觉得文章对你有用,那就点个赞呗!如果能转发那是对道哥最大的支持!

相信很多后端程序员对kafka都不陌生,作为主流消息队列,而随着高并发业务的需求,消息队列在业务中的使用可以说是标配了,如果是你没听说过kafka,摆脱,好好补补课吧。道哥接下来会以通俗的语言介绍下kafka,不属于本篇讨论的。

控制台操作topic方法

topic最主流的操作方式当然是登录上控制台,通过控制台命令去操作。

创建topic

控制台创建

bin/kafka-topics.sh --create --topic topicname --replication-factor 1 --partitions 1 --zookeeper localhost:2181 

–topic 指定topic名字
–replication-factor 指定副本数,集群环境,这里副本数就为3或以上,具体由你的集群情况而定。
–partitions 指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好。

这种创建方式有个弊端就是,你需要连接到kafka所有在的集群,或者需要登录到某个broker,本机环境还好,一切很自然,如果是线上环境呢?你需要记住好多不同broker list,好麻烦。

开启自动创建

配置:auto.create.topics.enable=true

使用程序直接往kafka中相应的topic发送数据,如果topic不存在就会按默认配置进行创建。

这个弊端就更明显了,如果人家集群默认开启了这种方式还好,如果人家没有开启呢?你需要去修改配置啊。而且开启这个参数,可能会导致topic泛滥。

最好的方式是因需而建topic,目前主流的编程语言是java,我们就演示一下如何通过java api进行kafka topic的创建、查询和删除。

查看topic列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

删除topic

删除topic命令

 bin/kafka-topics.sh --delete --zookeeper 192.168.242.131:2181 --topic aaa

注:此命令如果要生效,还需在server.properties中添加:
delete.topic.enable=true
控制台也会有返回提示信息:
Topic aaa is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

通过java api操作topics

利其器

工欲善其事必先利其器,这种需求,我们想到了,kafka的编写大神们肯定也想到了啊,人家给我们提供了现成的包可以用,我们只需要通过maven把依赖引入进来,进行简单的调用即可。
这里我们使用kafka-clients,0.11.0.3版本。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.3</version>
        </dependency>

善其事

像通过bash控制台命令操作kafka集群一样,通过api,首先你也得连接上目标服务器啊。

  1. 连接bootstrap.servers
    其中kafkaBroker是你实际kafka集群接入点。
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaBroker);
  1. 创建adminClient
adminClient = KafkaAdminClient.create(properties);

创建topic

注意kafka topic的名字不可以包含中文字符,不然是无法创建的。分区数,通常建议为12,其初期最好评估好消费节点数和能力来设置此值;副本数,建议值是2-3,可根据业务需求和发送延迟需要进行设置,影响数据可靠性。

重复创建已有的topic会抛出异常。

createTopics() throws ExecutionException, InterruptedException {
        try {
            NewTopic newTopic = new NewTopic("topic-name", /*topic名称,不建议过长*/
                12 , 
                (short)2 
            );
            adminClient.createTopics(Collections.singleton(newTopic), new CreateTopicsOptions().timeoutMs(10000))
                .all()
                .get();
        } catch (ExecutionException e) {
            if (e.getMessage().startsWith("org.apache.kafka.common.errors.TopicExistsException")) {
                System.out.println("topic is exist !! " + e.getMessage());
            } else {
                throw e;
            }
        }
    }

查询topic列表

List<String> queryCurrentTopics() throws InterruptedException, ExecutionException, TimeoutException {
        List<String> list = new ArrayList<>();
        adminClient.listTopics()
            .listings()
            .get(1, TimeUnit.MINUTES)
            .forEach(topicListing -> {
                System.out.println("currentTopic is: " + topicListing.name());
                list.add(topicListing.name());
            });
        return list;
    }

删除topic

void deleteTopics() throws ExecutionException, InterruptedException {
        adminClient.deleteTopics(Collections.singleton("hello"))
            .all()
            .get();
    }

能够看到这里的都是真爱学习啊,觉得有用的话,就点个赞。转发是对道哥最大的鼓励,点个关注,就不会错过道哥的更新了。
程序员的小伙伴们,学习之路,同行的人越多才可以走的更远,加入公众号[程序员之道],一起交流沟通,走出我们的程序员之道!


转载:https://blog.csdn.net/chanllenge/article/details/105170090
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场