前言
配置一词,想必大家都不陌生。毕竟程序员总是收到这样的叮嘱:不要写死。也就是要把一些不经常变化的,可以统一维护的信息放在配置文件中,这样当配置发生变化时就不用重新编译代码。
在单机系统中,往往几个配置文件足矣。自然就是放在固定的目录下,程序启动时加载即可。即使修改配置,也不需要修改很多地方。但是,分布式系统是大势所趋,未来会作为程序员的基本技能。如果原先的系统被拆分成了很多个节点来部署,每次配置文件发生了变化,就需要把每个节点上的配置文件都修改一次。作为一个有追求的程序员,我们肯定不能干这事。
于是,我们把配置文件统一放在一个地方,配置文件修改了之后,能够及时通知所有节点重新加载。这样即使再多的节点,也只需要修改一次配置文件。这个管理统一配置的系统就是统一配置中心。
本文主要用zookeeper的watch
机制结合响应式API,实现简易的统一配置中心。
实现
可选的统一配置中心也不少,这里列出几个主要的,有兴趣的同学自己研究下
统一配置中心的核心功能之一就是配置变化后,及时通知所有的节点。zookeeper的watch
机制正好可以满足这样的需求,所以本文利用zookeeper的watch
机制来实现。不熟悉zookeeper异步非阻塞API的朋友可以先看这篇练练手:Java操作Zookeeper
整个实现过程基于响应式编程模型,所以一步步讲解不是很方便。有兴趣的同学可以把源码拷下来多运行几次,自然会慢慢弄懂。
获取对象
首先肯定要获取zookeeper对象
package com.sicimike.zk;
import com.google.common.base.Joiner;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* @author sicimike
* @create 2020-05-03 21:15
*/
public class ZookeeperUtil {
private static final String[] ZK_SERVER = {
"192.168.1.101:2181", "192.168.1.102:2181",
"192.168.1.103:2181", "192.168.1.104:2181",
};
private static final int SESSION_TIMEOUT = 3000;
private static final CountDownLatch latch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static ZooKeeper newInstance() throws IOException, InterruptedException {
zooKeeper = new ZooKeeper(Joiner.on(",").join(ZK_SERVER), SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getState()) {
case SyncConnected:
latch.countDown();
break;
case Expired:
break;
}
}
});
latch.await();
return zooKeeper;
}
}
实现监听器
本文是利用zookeeper的watch
机制来实现配置的更新通知,所以需要实现监听器来注册事件。因为这个监听器需要同时监听Znode创建、删除、以及Znode数据被修改等事件,所以需要实现三个接口。
package com.sicimike.zk;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
* @author sicimike
* @create 2020-05-03 21:18
*/
public class CommonWatcher implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private CountDownLatch latch = new CountDownLatch(1);
private DF17Config config;
private ZooKeeper zooKeeper;
public void await() throws InterruptedException {
zooKeeper.exists(config.getPath(), this, this, "");
latch.await();
}
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
System.out.println("None even");
break;
case NodeCreated:
System.out.println("NodeCreated even");
zooKeeper.getData(config.getPath(), this, this, "");
break;
case NodeDeleted:
System.out.println("NodeDeleted even");
config.setValue("");
latch = new CountDownLatch(1);
break;
case NodeDataChanged:
System.out.println("NodeDataChanged even");
zooKeeper.getData(config.getPath(), this, this, "");
break;
case NodeChildrenChanged:
System.out.println("NodeChildrenChanged even");
break;
}
}
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (stat != null) {
zooKeeper.getData(path, this, this, "");
}
}
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if (data != null) {
config.setValue(new String(data));
latch.countDown();
}
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
public void setConfig(DF17Config config) {
this.config = config;
}
}
核心配置
核心配置类也就是业务真正需要的类,包含了业务需要的所有配置,此处的配置内容抽象成一个String
。
package com.sicimike.zk;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.Objects;
/**
* 配置文件
*
* @author sicimike
* @create 2020-05-03 21:21
*/
public class DF17Config {
private static final String CONFIG_PATH = "/DF-config";
private String path;
private String value;
public static void getConfig() throws InterruptedException, IOException {
ZooKeeper zooKeeper = ZookeeperUtil.newInstance();
DF17Config config = new DF17Config();
config.setPath(CONFIG_PATH);
CommonWatcher commonWatcher = new CommonWatcher();
commonWatcher.setZooKeeper(zooKeeper);
commonWatcher.setConfig(config);
commonWatcher.await();
while (true) {
if (Objects.equals(config.getValue(), "")) {
commonWatcher.await();
}
System.out.println("config: " + config.getValue());
Thread.sleep(3000);
}
}
public void setPath(String path) {
this.path = path;
}
public String getPath() {
return path;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
测试类
package com.sicimike.zk;
import java.io.IOException;
/**
* @author sicimike
* @create 2020-05-03 12:05
*/
public class ZookeeperDemo {
public static void main(String[] args) throws IOException, InterruptedException {
DF17Config.getConfig();
}
}
执行
执行ZookeeperDemo
类,等待程序连上zookeeper之后,再利用zkCli
客户端操作产生事件,查看程序的输出。操作的目录为/DF-config
。
NodeCreated even
config: hello DF-17
config: hello DF-17
config: hello DF-17
NodeDataChanged even
config: hello DF-18
config: hello DF-18
config: hello DF-18
NodeDeleted even
NodeCreated even
config: hello DF-17
config: hello DF-17
config: hello DF-17
此处博主先执行
create /DF-config "hello DF-17"
可以看到程序检测到了NodeCreated even
事件,并且输出了最新的配置。然后执行
set /DF-config "hello DF-18"
程序检测到了NodeDataChanged even
事件,并输出了最新的配置。接着执行
delete /DF-config
程序检测到了NodeDeleted even
事件,并被阻塞,停止了输出,因为删除节点,相当于配置也丢失了。
最后再次创建这个Znode,程序依然可以继续执行。
总结
本文利用zookeeper简单实现了配置中心的核心功能,主要是为了熟悉zookeeper的核心API以及初步尝试响应式编程。
转载:https://blog.csdn.net/Baisitao_/article/details/105908681