zookeeper从小白到精通

博客 分享
0 251
张三
张三 2022-03-27 17:57:13
悬赏:0 积分 收藏

zookeeper从小白到精通

目录
  • 1.介绍
    • 1.1概念
    • 1.2特点
    • 1.3主要的集群步骤
    • 1.4数据结构
    • 1.5应用场景
  • 2.本地安装
    • 2.1安装jdk
    • 2.2下载安装
    • 2.3配置文件修改
    • 2.4启动服务端
    • 2.5启动客户端
    • 2.6zookeeper常用命令
    • 2.7配置文件解读
  • 3.集群安装
    • 3.1集群规划
    • 3.2安装
    • 3.3配置
    • 3.4启动zookeeper集群
  • 4.选举机制
    • 4.1触发选举时机
    • 4.2zookeeper选举机制---第一次启动
    • 4.2zookeeper选举机制---非第一次启动
    • 4.3选举机制总结
  • 5.客户端命令行操作
    • 5.1常用命令整合
    • 5.2启动客户端
    • 5.3znode节点数据信息
    • 5.4节点类型(持久/短暂/有序号/无序号)
    • 5.5节点操作
      • 5.5.1创建/删除节点
      • 5.5.2获取/查看的值
    • 5.6节点监听
      • 5.6.1监听节点值改变
      • 5.6.2监听节点下子节点数量改变
  • 6.goang操作zookeeper
    • 6.1创建节点create(增)
    • 6.2查看节点get(查)
    • 6.3修改节点set(改)
    • 6.4删除节点delete(删)
    • 6.5查看节点的子节点Children
    • 6.6遍历节点Children后再get
    • 6.7判断节点是否存在-conn.Exists
  • 7.监听/watch
    • 7.1监听节点-全局监听
    • 7.2监听部分事件
  • 8.微服务动态上下线监听(服务注册/发现)
    • 8.1需求实现
    • 8.2服务端创建代码-(注册服务)
    • 8.3客户端监听代码-(服务发现)
  • 9.分布式锁
    • 9.1分布式锁案例
    • 9.2监控锁案例

1.介绍

1.1概念

zookeeper作用:用于维护配置信息、命名、提供分布式同步和提供组服务zookeeper主要是文件系统和通知机制文件系统主要是用来存储数据通知机制主要是服务器或者客户端进行通知,并且监督基于观察者模式设计的分布式服务管理框架,开源的分布式框架

1.2特点

1):一个leader,多个follower的集群2):集群只要有半数以上包括半数就可正常服务,一般安装奇数台服务器3):全局数据一致,每个服务器都保存同样的数据,实时更新4):更新的请求顺序保持顺序(来自同一个服务器)5):数据更新的原子性,数据要么成功要么失败(大事务)6):数据实时更新性很快(因为数据量很小)

1.3主要的集群步骤

1):服务端启动时去注册信息(创建都是临时节点)2):获取到当前在线服务器列表,并且注册监听3):服务器节点下线4):服务器节点上下线事件通知5):process(){重新再去获取服务器列表,并注册监听}

1.4数据结构

与 Unix 文件系统很类似,可看成树形结构,每个节点称做一个ZNode。每一个ZNode默认能够存储 1MB 的数据。也就是只能存储小数据(一般配置信息,注册信息等)

1.5应用场景

1):统一命名服务(域名服务)    在分布式环境下,经常需要对应用/服务进行统一命名,便于识别,eg:ip不容易记住,而域名容易记住2):统一配置管理(一个集群中的所有配置都一致,且也要实时更新同步)3):将配置信息写入ZooKeeper上的一个Znode,各个客户端服务器监听这个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器4):统一集群管理(掌握实时状态)5):将节点信息写入ZooKeeper上的一个ZNode。监听ZNode获取实时状态变化6):服务器节点动态上下线7):软负载均衡(根据每个节点的访问数,让访问数最少的服务器处理最新的数据需求)

2.本地安装

2.1安装jdk

1.查看是否已安装JDKyum list installed |grep java 2.卸载CentOS系统Java环境# yum -y remove java-1.8.0-openjdk*   *表示卸载所有openjdk相关文件输入 # yum -y remove tzdata-java.noarch       卸载tzdata-java  3.查看JDK软件包版本# yum -y list java*      或者使用# yum searchjava | grep -i --color JDK 4.安装yum install java-1.8.0-openjdk*  //安装java1.8.0所有程序java -version //查看java版本信息注意:使用yum安装环境变量自动就配好了

2.2下载安装

官网:https://zookeeper.apache.org/下载3.5.7稳定版本下载:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz解压:tar -xf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/改名:mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7配置文件改名:mv zoo_sample.cfg zoo.cfg
bin目录 框架启动停止,客户端和服务端的conf 配置文件信息docs文档lib 配置文档的依赖

2.3配置文件修改

配置文件:/opt/module/zookeeper-3.5.7/conf/zoo.cfg

修改:dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径

2.4启动服务端

cd /opt/module/zookeeper-3.5.7/bin[root@sg-15 bin]# ./zkServer.sh start  // 启动服务端//查看进程[root@sg-15 bin]# jps -l21172 sun.tools.jps.Jps21110 org.apache.zookeeper.server.quorum.QuorumPeerMain

2.5启动客户端

[root@sg-15 bin]# ./zkCli.sh  // 启动客户端[zk: localhost:2181(CONNECTED) 4] ls /[zookeeper][zk: localhost:2181(CONNECTED) 5] quit //退出客户端[root@sg-15 bin]# ./zkServer.sh status // 查看zookeeper状态/usr/bin/javaZooKeeper JMX enabled by defaultUsing config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfgClient port found: 2181. Client address: localhost.Mode: standalone

2.6zookeeper常用命令

[root@sg-15 bin]# ./zkServer.sh start  // 启动服务端[root@sg-15 bin]# ./zkServer.sh stop   // 停止服务端[root@sg-15 bin]# jps -l               // 查看进程[root@sg-15 bin]# ./zkCli.sh           // 启动客户端[zk: localhost:2181(CONNECTED) 5] quit // 退出客户端[root@sg-15 bin]# ./zkServer.sh status // 查看zookeeper状态

2.7配置文件解读

配置文件的5大参数:tickTime = 2000  //通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒initLimit = 10   //Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)syncLimit = 5    //Leader和Follower之间通信时间如果超过5个心跳时间,Leader认为Follwer死掉,从服务器列表中删除Follwer。dataDir =/tmp/zookeeper //保存zookeeper的数据,这是默认值,会定时被系统清除dataDir保存zookeeper的数据,默认是temp会被系统定期清除,通常改为自己的路径dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径clientPort = 2181  //客户端的连接端口,一般不需要修改

3.集群安装

3.1集群规划

sg15,sg16,sg17三台机器部署Zookeeper

3.2安装

解压:tar -xf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/改名:mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7配置文件改名:mv zoo_sample.cfg zoo.cfg

3.3配置

[root@sg-15 zookeeper-3.5.7]# mkdir zkData  // 创建目录zkData[root@sg-15 zkData]# vi myid                // 创建一个 myid 的文件在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格)5    // 192.168.0.215,这里设置服务器尾号,其他的不重复就行[root@sg-15 conf]# mv zoo_sample.cfg zoo.cfg  // 配置文件改名[root@sg-15 conf]# vi zoo.cfg                //修改配置文件dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径// 添加配置#######################cluster##########################server.5=192.168.0.215:2888:3888server.6=192.168.0.216:2888:3888server.7=192.168.0.217:2888:3888##配置参数解读server.A=B:C:DA 是一个数字,表示这个是第几号服务器,就是myid中的值B 是这个服务器的地址C 是这个服务器Follower 与集群中的 Leader 服务器交换信息的端口;D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。注意:一台机器弄好之后,打包发送到其他机器。其他机器修改myid值tar -cf module.tar ./module  // 打包scp -r module.tar root@192.168.0.216:/opt/  //发送tar -xf module.tar          // 解包

3.4启动zookeeper集群

注意:集群只要有半数以上包括半数就可正常服务。三台机器启动:[root@sg-15 bin]# ./zkServer.sh start    //启动[root@sg-15 bin]# ./zkServer.sh restart  //重启[root@sg-17 bin]# ./zkServer.sh status   //检查状态/usr/bin/javaZooKeeper JMX enabled by defaultUsing config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfgClient port found: 2181. Client address: localhost.Mode: leader                  //本机器为leader[root@sg-15 bin]# ./zkServer.sh status/usr/bin/javaZooKeeper JMX enabled by defaultUsing config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfgClient port found: 2181. Client address: localhost.Mode: follower           //本机器为follower

4.选举机制

选举谁当leader介绍:SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻, 集群中的每台机器的ZXID值不一定完全一样Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是

4.1触发选举时机

1.服务器刚启动2.服务器运行期间无法和leader保持连接当一台机器进入leader选举流程时,当前集群出现两种状态: 1.集群中已经存在一个leader(此机器和leader建立连接,并状态同步) 2.集群中不存在leader(触发选举),looking状态

4.2zookeeper选举机制---第一次启动

服务器1:myid=1服务器2:myid=2服务器3:myid=3(1)	服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;(2)	服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:服务器1和服务器2比较谁的myid大,更改选票为推举服务器myid大的。此时服务器1票数0票,服务器2票数2票,大于半数以上结果,选举完成。服务器1为follower,2状态为leader(3)	服务器3启动,发起一次选举。此时服务器1,2已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为1票,此时服务器3服从多数,更改选票信息为服务器2,并更改状态为FOLLOWING;

4.2zookeeper选举机制---非第一次启动

服务器1:myid=1服务器2:myid=2服务器3:myid=3服务器运行期间无法和leader保持连接触发重新选举:假设zookeeper由5台服务器组成,SID分别为1,2,3,4,5,ZXID分别为8,8,8,7,7,并且此时SID为3的服务器此时时leader。某时刻3和5出现故障,因此开始进行leader选举: SID为1的机器:(1,8,1) SID为2的机器:(1,8,2) SID为4的机器:(1,7,4)选举规则:优先比较SID,再比较ZXID,其次比较Epoch。大的直接胜出当选leader

4.3选举机制总结

半数机制,超过半数的投票通过,即通过。第一次启动选举规则:投票过半数时,服务器 myid 大的胜出当leader第二次启动选举规则:①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,任期代号id大的胜出

5.客户端命令行操作

5.1常用命令整合

登陆客户端操作:很多命令和linux命令相似,比如ls,history等jps : 查看zookeeper运行的进程help :显示所有操作命令ls / :查看当前znode中包含的内容ls -s / :查看当前节点详细数据create : 创建普通节点create -s :创建带序号节点create -e :创建临时普通节点create -e -s :创建临时有序节点delete:删除节点create /wangzhe "this is wangzhe"  //新建节点(永久节点,不带序号)create /wangzhe/fashi "this is fashi" // //新建节点(永久节点,不带序号)get /wangzhe  // this is wangzhe  取值get -s /wangzhe  //节点详情set set /wangzhe "this is wangzherongyao" //修改值get -w :监听值ls -w /wangzhe :监听数量监听注册一个生效一次

5.2启动客户端

[root@sg-15 bin]# ./zkCli.sh -server 192.168.0.215:2181quit //退出

5.3znode节点数据信息

命令:ls -s /[zk: 192.168.0.215:2181(CONNECTED) 3] ls -s /[zookeeper]cZxid = 0x0ctime = Thu Jan 01 08:00:00 CST 1970mZxid = 0x0mtime = Thu Jan 01 08:00:00 CST 1970pZxid = 0x0cversion = -1dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 0numChildren = 1##################################1):czxid:创建节点的事务id zxid  每次修改ZooKeeper 状态都会产生一个ZooKeeper 事务 ID。事务 ID 是ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么zxid1 在 zxid2 之前发生。2):ctime:znode 被创建的毫秒数(从 1970 年开始)3):mzxid:znode 最后更新的事务zxid4):mtime:znode 最后更新的毫秒数(从 1970 年开始)5):pZxid:znode 最后更新的子节点zxid6):cversion:znode 子节点变化号,znode 子节点修改次数7):dataversion:znode 数据变化号8):aclVersion:znode 访问控制列表的变化号9):ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。10):dataLength:znode 的数据长度11):numChildren:znode 子节点数量

5.4节点类型(持久/短暂/有序号/无序号)

持久:客户端和服务端端开连接后,创建的节点不删除序号:在分布式系统中,顺序号可以被用于所有事件排序,这样客户端可以通过顺序号推断事件的顺序{  持久有序号:客户端和服务端端开连接后,创建的节点不删除。且节点名称顺序编号  持久无序号:客户端和服务端端开连接后,创建的节点不删除。无序号}短暂:客户端和服务端端开连接后,创建的节点自己删除{  短暂有序号:客户端和服务端端开连接后,创建的节点自己删除。且节点名称顺序编号  短暂无序号:客户端和服务端端开连接后,创建的节点自己删除。无序号}

5.5节点操作

5.5.1创建/删除节点

create : 创建普通节点create -s :创建带序号节点create -e :创建临时普通节点create -e -s :创建临时有序节点delete:删除一个节点(如果这个节点下有子节点,删除失败)deleteall:递归删除所有节点(如果这个节点下有子节点,全部删除)eg:delete /wangzhe/fashi //只删除fashi节点deleteall /wangzhe  //递归删除wangzhe所有节点//1.创建节点:普通永久节点[zk: 192.168.0.215:2181(CONNECTED) 27] create /wangzhe "this is wangzhe"Created /wangzhe[zk: 192.168.0.215:2181(CONNECTED) 40] create /wangzhe/fashi "this is fashi"Created /wangzhe/fashi//2.创建节点:有序永久节点[zk: 192.168.0.215:2181(CONNECTED) 66] create -s /wangzhe/fashi/daji "daji"Created /wangzhe/fashi/daji0000000000[zk: 192.168.0.215:2181(CONNECTED) 70] create -s /wangzhe/fashi/daji "daji"Created /wangzhe/fashi/daji0000000001 //再次创建daji,序号+1//3.创建节点:普通临时节点[zk: 192.168.0.215:2181(CONNECTED) 71] create -e /wangzhe/fashi/ganjiang "ganjiang"Created /wangzhe/fashi/ganjiang//4.创建节点:有序临时节点[zk: 192.168.0.215:2181(CONNECTED) 73] create -e -s /wangzhe/fashi/anqila "anqila"Created /wangzhe/fashi/anqila0000000003//查看[zk: 192.168.0.215:2181(CONNECTED) 74] ls /wangzhe/fashi[anqila0000000003, daji0000000000, daji0000000001, ganjiang]//delete删除[zk: 192.168.0.215:2181(CONNECTED) 19] delete /wangzhe/fashi/xiaoqiaoxiaoqiao             xiaoqiao0000000007

5.5.2获取/查看的值

注意:get获取有序节点的值时:必须获取最后一个序号的值,比如获取下面ganjiang0000000004报错,获取ganjiang0000000005正常。lsgetget -s//查看[zk: 192.168.0.215:2181(CONNECTED) 10] ls /wangzhe/fashi[daji0000000000, daji0000000001, ganjiang0000000004, ganjiang0000000005, xiaoqiao]//获取值[zk: 192.168.0.215:2181(CONNECTED) 12] get /wangzhe/fashi/xiaoqiao0000000007xiaoqiao// 获取详细[zk: 192.168.0.215:2181(CONNECTED) 13] get -s /wangzhe/fashi/xiaoqiao0000000007xiaoqiaocZxid = 0x30000002fctime = Sat Mar 26 18:37:35 CST 2022mZxid = 0x30000002fmtime = Sat Mar 26 18:37:35 CST 2022pZxid = 0x30000002fcversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 8numChildren = 0[zk: 192.168.0.215:2181(CONNECTED) 14] get -s /wangzhe/fashithis is fashicZxid = 0x300000020ctime = Sat Mar 26 17:31:12 CST 2022mZxid = 0x300000020mtime = Sat Mar 26 17:31:12 CST 2022pZxid = 0x30000002fcversion = 10dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 13numChildren = 6

5.6节点监听

get -w :监听值ls -w /wangzhe :监听数量监听注册一个生效一次

5.6.1监听节点值改变

[zk: 192.168.0.215:2181(CONNECTED) 25] get -w /wangzhe //开启一次监听this is wangzhe[zk: 192.168.0.215:2181(CONNECTED) 26] set /wangzhe "jianting:this is wangzhe"  //修改值WATCHER::  //监听值发生变化WatchedEvent state:SyncConnected type:NodeDataChanged path:/wangzhe

5.6.2监听节点下子节点数量改变

[zk: 192.168.0.215:2181(CONNECTED) 27] ls -w /wangzhe  ////开启一次监听[fashi, fashi0000000001][zk: 192.168.0.215:2181(CONNECTED) 28] delete /wangzhe/fashi0000000001  //删除一个节点WATCHER::  //监听数量发生变化WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/wangzhe

6.goang操作zookeeper

6.1创建节点create(增)

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func main() {  // 任意一个ip都可以,但是建议放主节点ip	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second) 	if err != nil {		panic(err)	}	defer conn.Close()	// 1.创建的普通永久节点	path, err := conn.Create("/hello", []byte("world"), 0, zk.WorldACL(zk.PermAll))	if err != nil {		fmt.Println("err:",err)	}	fmt.Println("创建的普通永久节点:", path)    // 2.创建的普通临时节点,创建此节点的会话结束后立即清除此节点	ephemeral, err := conn.Create("/ephemeral", []byte("world"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))	if err != nil {		fmt.Println("err:",err)	}	fmt.Println("创建的普通临时节点:", ephemeral)    // 3.创建的有序永久节点	sequence, err := conn.Create("/sequence", []byte("world"), zk.FlagSequence, zk.WorldACL(zk.PermAll))	if err != nil {		panic(err)	}	fmt.Println("创建的有序永久节点:", sequence)    // 4.创建的有序临时节点,创建此节点的会话结束后立即清除此节点	ephemeralSequence, err := conn.Create("/ephemeralSequence", []byte("world"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))	if err != nil {		panic(err)	}	fmt.Println("创建的有序临时节点:", ephemeralSequence)}

6.2查看节点get(查)

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	result, state, err := conn.Get("/wangzhe/fashi")	if err != nil {		fmt.Println("err:",err)		return	}	fmt.Println("result: ", string(result)) // this is fashi	fmt.Printf("%#v",state)  //状态结果:&zk.Stat{Czxid:12884901920, Mzxid:12884901920, Ctime:1648287072539, Mtime:1648287072539, Version:0, Cversion:11, Aversion:0, EphemeralOwner:0, DataLength:13, NumChildren:5, Pzxid:12884901936}}

6.3修改节点set(改)

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	path := "/wangzhe/fashi"	_, state, _ := conn.Get(path) // 先查询,拿到当前版本	state, err = conn.Set(path, []byte("hello"), state.Version)	if err != nil {		fmt.Println("err:",err)		return	}	fmt.Printf("%#v",state)    //结果:&zk.Stat{Czxid:12884901920, Mzxid:12884902012, Ctime:1648287072539, Mtime:1648305221598, Version:1, Cversion:11, Aversion:0, EphemeralOwner:0, DataLength:5, NumChildren:5, Pzxid:12884901936}2022/03/26 22:33:39 recv loop }

6.4删除节点delete(删)

注意:此方法不能递归删除节点

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	path := "/hello"	exists, state, err := conn.Exists(path)  //判断是否存在和查询版本	if exists{		err = conn.Delete(path, state.Version)		if err != nil {			fmt.Println("err:",err)			return		}		fmt.Println("节点删除成功!!!")	}else {		fmt.Println("节点不存在!!!")	}}

6.5查看节点的子节点Children

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	childrenList, state, err := conn.Children("/wangzhe/fashi")	if err != nil {		fmt.Println("err:",err)		return	}	fmt.Println(childrenList)	fmt.Printf("%#v",state)}

6.6遍历节点Children后再get

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	childrenList, _, err := conn.Children("/wangzhe/fashi")	if err != nil {		fmt.Println("err:",err)		return	}	for _,children:=range childrenList{		childrenPath:= fmt.Sprintf("/wangzhe/fashi/%s",children)		result, state, err := conn.Get(childrenPath)		if err != nil {			fmt.Println("err:",err)			return		}		fmt.Println(string(result))		fmt.Println(state)	}}

6.7判断节点是否存在-conn.Exists

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	path := "/hello"	exists, _, err := conn.Exists(path)  if err != nil {		fmt.Println("err:",err)		return	}    if exists{    fmt.Println("节点存在")  }else{    fmt.Println("节点不存在")  }}

7.监听/watch

7.1监听节点-全局监听

将监听器放到Connect函数中,如果有监听事件发生,会一直执行监听器的回调函数。监听执行了一次之后要重新注册监听。

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func callback(e zk.Event) {	fmt.Println("========================")	fmt.Println("path:", e.Path)	fmt.Println("type:", e.Type.String())	fmt.Println("state:", e.State.String())}func main() {	eventCallbackOption := zk.WithEventCallback(callback)  // 经过测试,在连接时会执行3次回调函数	conn, _, err := zk.Connect([]string{"192.168.0.215","192.168.0.216","192.168.0.217"}, time.Second,eventCallbackOption)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	// 注册一个 watch	exists, state, _, err := conn.ExistsW("/watch")	if err != nil {		fmt.Println("err:",err)		return	}	fmt.Println("exists:",exists)	if !exists {		// 创建 /watch 时,触发监听事件,watch 失效		_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))		if err != nil {			fmt.Println("err:",err)			return		}		// 再注册一个 watch		_, state, _, err = conn.ExistsW("/watch")		if err != nil {			fmt.Println("err:",err)			return		}	}	// 删除 /watch 时,触发监听事件,watch 失效	err = conn.Delete("/watch", state.Version)	if err != nil {		fmt.Println("err:",err)		return	}}

结果:

========================path: type: EventSessionstate: StateConnecting========================path: type: EventSessionstate: StateConnected2022/03/27 11:09:27 connected to 192.168.0.215:2181========================path: type: EventSessionstate: StateHasSession2022/03/27 11:09:27 authenticated: id=360292329056632906, timeout=40002022/03/27 11:09:27 re-submitting `0` credentials after reconnectexists: false========================path: /watchtype: EventNodeCreatedstate: Unknown========================path: /watchtype: EventNodeDeletedstate: Unknown

7.2监听部分事件

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215","192.168.0.216","192.168.0.217"}, time.Second)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	// 注册一个 watch	exists, _, eventChannel, err := conn.ExistsW("/watch")	if err != nil {		fmt.Println("err:",err)		return	}	go func() {		// 从事件 channel 中取出事件		e := <-eventChannel		fmt.Println("========================")		fmt.Println("path:", e.Path)		fmt.Println("type:", e.Type.String())		fmt.Println("state:", e.State.String())	}()	if !exists {		// 创建 临时节点/watch 时,触发监听事件,watch 失效		_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))		if err != nil {			fmt.Println("err:",err)			return		}	}}

8.微服务动态上下线监听(服务注册/发现)

8.1需求实现

微服务分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。需求实现:服务端:服务端启动时,在zookeeper中创建临时有序节点,服务关闭时,临时节点自动删除了(zookeeper临时节点机制)客户端:监听节点的变化

8.2服务端创建代码-(注册服务)

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	//创建的有序临时节点,创建此节点的会话结束后立即清除此节点 create -e -s	ephemeralSequence, err := conn.Create("/servers/bikesvc", []byte("bikesvc"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))	if err != nil {		fmt.Println("err:",err)		return	}	fmt.Println("创建的有序临时节点:", ephemeralSequence)	time.Sleep(time.Second*10)}

8.3客户端监听代码-(服务发现)

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func mirror(conn *zk.Conn, path string) (chan []string, chan error) {	snapshots := make(chan []string)	errors := make(chan error)	go func() {		for {			snapshot, _, events, err := conn.ChildrenW(path)			if err != nil {				errors <- err				return			}			snapshots <- snapshot			evt := <-events			if evt.Err != nil {				errors <- evt.Err				return			}		}	}()	return snapshots, errors}func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181", "192.168.0.216:2181", "192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:", err)		return	}	defer conn.Close()	snapshots, errors := mirror(conn, "/servers") //监控的根节点,根节点不能删除	go func() {		for {			select {			case snapshot := <-snapshots:				fmt.Println("监控变化:", snapshot)			case err := <-errors:				fmt.Println("err:", err)			}		}	}()	for {	}}
结果:服务端:创建的有序临时节点: /servers/bikesvc0000000010客户端:监控变化: []监控变化: [bikesvc0000000009]监控变化: []

9.分布式锁

加锁进行资源保护go-zookeeper 添加分布式锁的方法为NewLock(c *Conn, path string, acl []ACL)。锁的结构体为:type Lock struct {	c        *Conn	path     string	acl      []ACL	lockPath string	seq      int}这个结构体实现了三个方法:Lock(),LockWithData(data []byte)和Unlock()

9.1分布式锁案例

根节点“/root”判断是否存在,不存在则创建
package mainimport (	"fmt"	"sync"	"time"	"github.com/go-zookeeper/zk")func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	var wg sync.WaitGroup	for i := 0; i < 2; i++ {		wg.Add(1)		go func(n int) {			defer wg.Done()			lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll)) //加锁			err = lock.LockWithData([]byte("it is a lock"))			if err != nil {				panic(err)			}			fmt.Println("第", n, "个 goroutine 获取到了锁")			time.Sleep(time.Second*1) // 1 秒后释放锁			lock.Unlock()  //解锁		}(i)	}	wg.Wait()}
这里给了两个进程抢锁,ls查看一下锁:解释:把所有进程按有序排列,当成节点放入lock节点中,按照最小的序号执行。解锁一个删除一个。直到节点为空,进程执行完毕。[zk: localhost:2181(CONNECTED) 32] ls /root/lock[_c_1dbbc1ec75b285ef10a6d6154627335c-lock-0000000153, _c_793a837ded040d01608395e5eac65979-lock-0000000152]先执行152,再执行153

9.2监控锁案例

监控锁节点变化

监控代码

package mainimport (	"fmt"	"time"	"github.com/go-zookeeper/zk")func mirror(conn *zk.Conn, path string) (chan []string, chan error) {	snapshots := make(chan []string)	errors := make(chan error)	go func() {		for {			snapshot, _, events, err := conn.ChildrenW(path)			if err != nil {				errors <- err				return			}			snapshots <- snapshot			evt := <-events			if evt.Err != nil {				errors <- evt.Err				return			}		}	}()	return snapshots, errors}func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181", "192.168.0.216:2181", "192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:", err)		return	}	defer conn.Close()	snapshots, errors := mirror(conn, "/root/lock") //监控的根节点,根节点不能删除	go func() {		for {			select {			case snapshot := <-snapshots:				fmt.Println("监控变化:", snapshot)			case err := <-errors:				fmt.Println("err:", err)			}		}	}()	for {	}}

分布式锁代码

package mainimport (	"fmt"	"sync"	"time"	"github.com/go-zookeeper/zk")func main() {	conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)	if err != nil {		fmt.Println("err:",err)		return	}	defer conn.Close()	var wg sync.WaitGroup	for i := 0; i < 2; i++ {		wg.Add(1)		go func(n int) {			defer wg.Done()			lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll)) //加锁			err = lock.LockWithData([]byte("it is a lock"))			if err != nil {				panic(err)			}			fmt.Println("第", n, "个 goroutine 获取到了锁")			time.Sleep(time.Second*1) // 1 秒后释放锁			lock.Unlock()  //解锁		}(i)	}	wg.Wait()}

结果:

posted @ 2022-03-27 16:52 Jeff的技术栈 阅读(6) 评论(0) 编辑 收藏 举报
回帖
    张三

    张三 (王者 段位)

    821 积分 (2)粉丝 (41)源码

     

    温馨提示

    亦奇源码

    最新会员