Curator Recipes是netfix开源的zookeeper客户端框架,因为zookeeper客户端在使用上很不方便,因此curator recipes对其进行了封装,并提供了十分丰富的功能。如下图所示。
基本涵盖了常用的分布式调度功能。那么这块他们是怎么做的。考虑到好的代码肯定做了很多优化,里边会有很多设计模式。但是作者目前还达不到那种一眼就看出其代码的精髓锁着,因此这块作者还是按照老样子。小了解其大概得轮廓。以后再复习设计模式的时候。再去思考这些能够真正提升自身功力的东西。
首先要使用curator提供的功能,需要导入相关的包
-
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
-
<dependency>
-
<groupId>org.apache.curator
</groupId>
-
<artifactId>curator-framework
</artifactId>
-
<version>5.1.0
</version>
-
</dependency>
-
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
-
<dependency>
-
<groupId>org.apache.curator
</groupId>
-
<artifactId>curator-recipes
</artifactId>
-
<version>5.1.0
</version>
-
</dependency>
这里我们将zookeeper客户端交给spring进行管理。
-
-
-
@Configuration
-
public
class TestCurd {
-
-
-
-
-
@Bean
-
public CuratorFramework main() {
-
RetryPolicy retryPolicy =
new ExponentialBackoffRetry(
1000,
3);
-
//创建连接对象
-
CuratorFramework client = CuratorFrameworkFactory.builder()
-
//IP地址端口号
-
.connectString(
"127.0.0.1:2181")
-
//客户端与服务器之间的会话超时时间
-
.sessionTimeoutMs(
1000000)
-
//当客户端与服务器之间会话超时3s后,进行一次重连
-
.retryPolicy(retryPolicy)
-
//命名空间,当我们创建节点的时候,以/create为父节点
-
.namespace(
"create")
-
//构建连接对象
-
.build();
-
//打开连接
-
client.start();
-
//是否成功建立连接,true :建立, false:没有建立
-
System.out.println(client.isStarted());
-
return client;
-
}
-
}
-
-
编写相关测试方法
-
@GetMapping(
value =
"/lock2")
-
public void lock2() throws Exception {
-
// 读写锁
-
InterProcessReadWriteLock interProcessReadWriteLock=
new InterProcessReadWriteLock(client,
"/lock1");
-
// 获取读锁对象
-
InterProcessLock interProcessLock=interProcessReadWriteLock.readLock();
-
System.
out.println(
"等待获取锁对象!");
-
// 获取锁
-
interProcessLock.acquire();
-
for (
int i =
1; i <=
10; i++) {
-
Thread.sleep(
3000);
-
System.
out.println(i);
-
}
-
// 释放锁
-
interProcessLock.release();
-
System.
out.println(
"等待释放锁!");
-
}
-
-
-
@GetMapping(
value =
"/lock3")
-
public void lock3() throws Exception {
-
// 读写锁
-
InterProcessReadWriteLock interProcessReadWriteLock=
new InterProcessReadWriteLock(client,
"/lock1");
-
// 获取写锁对象
-
InterProcessLock interProcessLock=interProcessReadWriteLock.writeLock();
-
System.
out.println(
"等待获取锁对象!");
-
// 获取锁
-
interProcessLock.acquire();
-
for (
int i =
1; i <=
10; i++) {
-
Thread.sleep(
3000);
-
System.
out.println(i);
-
}
-
// 释放锁
-
interProcessLock.release();
-
System.
out.println(
"等待释放锁!");
-
}
这块我们我们看到curator提供了读写锁。我们发现在初始化的时候。curator就已经将读锁和写锁进行了初始化。而我们真正在使用的时候也就是直接使用。
-
public InterProcessReadWriteLock(CuratorFramework client, String basePath) {
-
this(client, basePath, (
byte[])
null);
-
}
-
-
-
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
-
lockData = lockData ==
null ?
null : Arrays.copyOf(lockData, lockData.length);
-
//写锁
-
this.writeMutex =
new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath,
"__WRIT__", lockData,
1,
new InterProcessReadWriteLock.SortingLockInternalsDriver() {
-
public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception {
-
return
super.getsTheLock(client, children, sequenceNodeName, maxLeases);
-
}
-
});
-
//读锁
-
this.readMutex =
new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath,
"__READ__", lockData,
2147483647,
new InterProcessReadWriteLock.SortingLockInternalsDriver() {
-
public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception {
-
return InterProcessReadWriteLock.
this.readLockPredicate(children, sequenceNodeName);
-
}
-
});
-
}
获取读锁
-
//获取锁
-
public void acquire() throws Exception {
-
if (!
this.internalLock(-
1L, (TimeUnit)
null)) {
-
throw new IOException(
"Lost connection while trying to acquire lock: " +
this.basePath);
-
}
-
}
-
//获取锁
-
private boolean internalLock(long time, TimeUnit unit) throws Exception {
-
Thread currentThread = Thread.currentThread();
-
//通过绑定thread的方式对该线程重入的次数进行记录。
-
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)
this.threadData.
get(currentThread);
-
if (lockData !=
null) {
-
//如果发生了重入,那么这里就将重入的次数进行加一操作
-
lockData.lockCount.incrementAndGet();
-
//表示获取到锁
-
return
true;
-
}
else {
-
//如果第一次加锁,或者中途获取锁失败。那么进行尝试
-
String lockPath =
this.internals.attemptLock(time, unit,
this.getLockNodeBytes());
-
if (lockPath !=
null) {
-
InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
-
this.threadData.put(currentThread, newLockData);
-
return
true;
-
}
else {
-
return
false;
-
}
-
}
-
}
-
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
-
long startMillis = System.currentTimeMillis();
-
Long millisToWait = unit !=
null ? unit.toMillis(time) :
null;
-
byte[] localLockNodeBytes =
this.revocable.get() !=
null ?
new
byte[
0] : lockNodeBytes;
-
int retryCount =
0;
-
String ourPath =
null;
-
boolean hasTheLock =
false;
-
boolean isDone =
false;
-
-
-
while(!isDone) {
-
isDone =
true;
-
-
-
try {
-
//通过初始化的driver获取锁
-
ourPath =
this.driver.createsTheLock(
this.client,
this.path, localLockNodeBytes);
-
//判断是否拿到锁,这里对读锁和写锁进行兼容。
-
hasTheLock =
this.internalLockLoop(startMillis, millisToWait, ourPath);
-
}
catch (NoNodeException var14) {
-
if (!
this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
-
throw var14;
-
}
-
-
-
isDone =
false;
-
}
-
}
-
-
-
return hasTheLock ? ourPath :
null;
-
}
获取锁
-
public
String createsTheLock(CuratorFramework client,
String path, byte[] lockNodeBytes) throws Exception {
-
String ourPath;
-
//通过我们注入到spring ioc中的client操作zk,通过判断是否存在该路径进行加锁
-
if (lockNodeBytes !=
null) {
-
ourPath = (
String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);
-
}
else {
-
ourPath = (
String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);
-
}
-
//拿到路径之后,就返回
-
return ourPath;
判断是否拿到锁的根据是这里的maxLeasse,写锁这里为1,读锁为2147483647
-
public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception {
-
int ourIndex = children.indexOf(sequenceNodeName);
-
validateOurIndex(sequenceNodeName, ourIndex);
-
boolean getsTheLock = ourIndex < maxLeases;
-
String pathToWatch = getsTheLock ?
null : (String)children.get(ourIndex - maxLeases);
-
return
new PredicateResults(pathToWatch, getsTheLock);
-
}
通过上述分析,我们大概了解了curator做分布式锁的基本过程,通过对path路径的是否存在进行加锁。锁的重入是针对于线程本身来说的。在单个jvm中线程的中断对其他线程的轮询没有任何影响。只有当当前线程运行完毕并删除zk中的节点,其他线程才可以进行加锁。相反在读锁中,通过与数字2147483647进行对比来判断是否可以加锁。这里的2147483647就是读锁的上线。
在锁释放的这个问题上。我们看到也是通过从lackdata中获取重入的次数,然后进行递减的。因为这个lockdata和线程进行绑定。所以在线程轮转中是没有数据消失的问题的。
-
public void release() throws Exception {
-
Thread currentThread = Thread.currentThread();
-
//拿到当前线程的重入数据
-
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)
this.threadData.
get(currentThread);
-
if (lockData ==
null) {
-
throw
new IllegalMonitorStateException(
"You do not own the lock: " +
this.basePath);
-
}
else {
-
//进行锁的重入次数的释放
-
int newLockCount = lockData.lockCount.decrementAndGet();
-
if (newLockCount <=
0) {
-
if (newLockCount <
0) {
-
throw
new IllegalMonitorStateException(
"Lock count has gone negative for lock: " +
this.basePath);
-
}
else {
-
try {
-
//如果锁被释放完毕。那么就开始真正的释放
-
this.internals.releaseLock(lockData.lockPath);
-
}
finally {
-
this.threadData.
remove(currentThread);
-
}
-
-
-
}
-
}
-
}
-
}
锁的释放也很简单,直接删除
-
final void releaseLock(String lockPath) throws Exception {
-
this.client.removeWatchers();
-
this.revocable.
set((Object)
null);
-
this.deleteOurPath(lockPath);
-
}
-
-
总结:通过分析,curator读写锁是通过对zk节点的存在与否进行判断的从而进行加锁的,对于读锁来说只有在不存在的时候线程才能加锁成功。通过将线程和重入次数的绑定,来实现的锁重入机制。当锁被释放之后,通过删除节点来通知其他线程进行加锁。对于读锁来说,单个线程最大或者的读锁数量也是有限制的。通过序列号的方式与写锁进行区别。读锁这块的详细实现作者还没有想明白,以后想明白了再补上。
转载:https://blog.csdn.net/tianjingle_blog/article/details/113533329