博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zookeeper适用场景:分布式锁实现
阅读量:6263 次
发布时间:2019-06-22

本文共 4240 字,大约阅读时间需要 14 分钟。

问题导读:

1.zookeeper如何实现分布式锁?
2.什么是羊群效应?
3.zookeeper如何释放锁?
在有关于分布式集群配置文件同步问题的描述,设想一下如果有100台机器同时对同一台机器上某个文件进行修改,如何才能保证文本不会被写乱,这就是最简单的分布式锁,本文介绍利用zk实现分布式锁。下面是写锁的实现步骤
分布式写锁
create一个PERSISTENT类型的znode,/Locks/write_lock

  • 创建SEQUENCE|EPHEMERAL类型的znode,名字是lockid开头,创建的znode是/Locks/write_lock/lockid0000000001
  • 调用getChildren()不要设置Watcher获取/Locks/write_lock下的znode列表
  • 判断自己步骤2创建znode是不是znode列表中最小的一个,如果是就代表获得了锁,如果不是往下走
  • 调用exists()判断步骤2自己创建的节点编号小1的znode节点(也就是获取的znode节点列表中最小的znode),并且设置Watcher,如果exists()返回false,执行步骤3
  • 如果exists()返回true,那么等待zk通知,从而在回掉函数里返回执行步骤3

释放锁就是删除znode节点或者断开连接就行
*注意:上面步骤2中getChildren()不设置Watcher的原因是,防止羊群效应,如果getChildren()设置了Watcher,那么集群一抖动都会收到通知。在整个分布式锁的竞争过程中,大量重复运行,并且绝大多数的运行结果都是判断出自己并非是序号最小的节点,从而继续等待下一次通知—,这个显然看起来不怎么科学。客户端无端的接受到过多的和自己不相关的事件通知,这如果在集群规模大的时候,会对Server造成很大的性能影响,并且如果一旦同一时间有多个节点的客户端断开连接,这个时候,就会像其余客户端发送大量的事件通知——这就是所谓的羊群效应。
下面是代码实现

import sysclass GJZookeeper(object):    ZK_HOST = "localhost:2181"    ROOT = "/Locks"    WORKERS_PATH = join(ROOT,"write_lock")    MASTERS_NUM = 1    TIMEOUT = 10000    def __init__(self, verbose = True):        self.VERBOSE = verbose        self.masters = []        self.is_master = False        self.path = None        self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)        self.say("login ok!")        # init        self.__init_zk()        # register        self.register()    def __init_zk(self):        """        create the zookeeper node if not exist        |--Locks                |--write_lock        """        nodes = (self.ROOT, self.WORKERS_PATH)        for node in nodes:             if not self.zk.exists(node):                try:                    self.zk.create(node, "")                except:                    pass    def register(self):        """        register a node for this worker        |--Locks                |--write_lock                            |--lockid000000000x ==> hostname        """        import socket        hostname = socket.gethostname()        self.path = self.zk.create(self.WORKERS_PATH + "/lockid", hostname, flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)        self.lockid = basename(self.path)        self.say("register ok! I'm %s" % self.path)        # check who is the master        self.get_lock()    def get_lock(self):        """        get children znode try to get lock        """        @watchmethod        def watcher(event):            self.say("child changed, try to get lock again.")            self.get_lock()        children = self.zk.get_children(self.WORKERS_PATH)        children.sort()        min_lock_id = children[0]        self.say("%s's children: %s" % (self.WORKERS_PATH, children))         if cmp(self.lockid,min_lock_id) == 0:            self.get_lock_success()            return True        elif cmp(self.lockid,min_lock_id) > 0:            index = children.index(self.lockid)            new_lockid_watch = join(self.WORKERS_PATH,children[index-1])            self.say("Add watch on %s"%new_lockid_watch)            res = self.zk.exists(new_lockid_watch,watcher)            if not res :                """代表没有存在之前小的锁,但是这并不意味着能拿到锁了,因为还可能有比当前还小的锁,还没轮到,要重新执行一遍"""#               self.get_lock_success()                return False            else :                """现在的锁有人在使用,等他释放了再抢"""                self.say("I can not get the lock this time,wait for the next time")                return False    def get_lock_success(self):        self.say("I get the lock !!!")        self.write_file()        self.zk.delete(join(self.WORKERS_PATH,self.lockid))        self.say("I release the lock !!!")        sys.exit(1)    def write_file(self):        fd = open("lock.log",'a')        fd.write("%s\n"%self.lockid)        fd.close()    def say(self, msg):        """        print messages to screen        """        if self.VERBOSE:            if self.path:                log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))            else:                log.info(msg)def start_get_lock():    gj_zookeeper = GJZookeeper()def main():    th1 = threading.Thread(target = start_get_lock, name = "thread_1", args = ())    th1.start()    th1.join()    if __name__ == "__main__":    main()    time.sleep(1000)

  文章转自:http://www.aboutyun.com/forum.php?mod=viewthread&tid=9267&ctid=16

 

转载地址:http://vfzpa.baihongyu.com/

你可能感兴趣的文章
PHP表单提交验证各种方式
查看>>
ASP.net获取当前页面的文件名,参数,域名等方法
查看>>
Java反射内部类
查看>>
vxlan和vlan数据报文
查看>>
jQuery中其他
查看>>
(十四) Java B2B2C多用户商城 springboot架构- Spring Cloud构建分布式电子商务平台
查看>>
Spss统计描述分析
查看>>
快排-C语言实现
查看>>
Oracle11完全卸载方法
查看>>
实例变量和属性(转)
查看>>
HTML笔记
查看>>
php7安装步骤
查看>>
c# WPF客户端调用WebAPI并转换成List
查看>>
洛谷 2634&&BZOJ 2152: 聪聪可可【点分治学习+超详细注释】
查看>>
loadrunner
查看>>
JavaScript数组去重
查看>>
LeetCode:20. Valid Parentheses(Easy)
查看>>
2017-5-16 类
查看>>
loadView的用法
查看>>
5只蚂蚁走木棍问题
查看>>