体验ZooKeeper

Apache ZooKeeper 是 Apache 软件基金会的一个软件项目,他为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册,是主要用于分布式中一致性处理的框架。ZooKeeper 曾经是 Hadoop 的一个子项目,但现在是一个独立的顶级项目。
ZooKeeper 的架构通过冗余服务实现高可用性。ZooKeeper 节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。更新是全序的。

Zookeeper 从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。

所谓的一致性,实际上就是围绕着“看见”来的。谁能看见?能否看见?什么时候看见?
也对应着:强一致性、弱一致性、最终一致性。

ZK特点

  • 一个领导者(Leader),多个跟随者(Follower)组成的集群。
  • 集群中只要有半数以上(不包括一半)节点存活,Zookeeper 集群就能正常服务(所以集群最少 3 推荐奇数个)。
  • 全局数据一致:每个 Server 保存一份相同的数据副本,Client 无论连接到哪个 Server,数据都是一致的。
  • 更新请求顺序进行,来自同一个 Client 的更新请求按其发送顺序依次执行。
  • 数据更新原子性,一次数据更新要么成功,要么失败
  • 实时性,在一定时间范围内,Client 能读到最新数据。

ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识

需要注意的是 ZK 的选举过程是非常复杂的,也就是耗时比较长,选举过程整个集群不可用,所以说 ZK 是 CP 的,相对的 Eureka 则是 AP 设计的,他们的比较看这里就够了。

为什么要保证半数以上?
从概念上来说,ZooKeeper 它所做的就是确保对 Znode 树的每一个修改都会被复制到集合体中超过半数的机器上。如果少于半数的机器出现故障,则最少有一台机器会保存最新的状态,其余的副本最终也会更新到这个状态。

应用场景

ZK 提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

  • 统一命名
    在分布式场景下,经常需要对应用/服务进行统一命名,便于识别,例如 ip 和域名的关系。
  • 统一配置管理
    分布式中配置文件同步是常见的,一般要求在一个集群中,所有节点的配置是一致的。
    对某一个节点的配置修改后也会快速同步到其他节点上。
    因为客户端在监听,一旦 ZNode 中的数据变化,ZK 将通知各个客户端。
  • 统一集群管理
    分布式中,实时掌握每个节点的状态是必要的,可以根据节点的状态做出一些调整。
    ZK 会将节点的相关信息一起写入到 ZNode 中。
  • 服务器动态上下线
    客户端能够实时的洞察服务器的上下线情况。
    因为服务器注册的时候创建的是临时节点,服务器下线后会自动删除,ZK 会通知监听的客户端,然后客户端会去重新获取列表。
  • 软负载均衡
    ZK 可以记录每台服务器的访问次数,让访问最少的来处理最新的客户端请求。

安装ZK

本地安装

windows 版的没啥好说的,就说说 linux 版,正常情况应该是搞集群的,不过测试用机器性能有限,开一个就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 解压
tar -zxvf zookeeper-3.x.tar.gz -C /opt/module/
# 进入 conf 文件夹修改配置文件名
# 进入配置文件将 dataDir= 修改到指定的目录,需要自己创建文件夹,例如 zkData
mv zoo_sample.cfg zoo.cfg

# 启动
bin/zkServer.sh start
# 查看进程使用 jps
# 查看状态
bin/zkServer.sh status

# 启动客户端,退出 quit
bin/zkCli.sh

其他常用配置参数:

  • tickTime
    通信心跳数,Zookeeper 服务器与客户端心跳时间,单位毫秒。
    它用于心跳机制,并且设置最小的 session 超时时间为两倍心跳时间。
  • initLimit
    集群中的 Follower(跟随者)服务器与 Leader(领导者)服务器之间初始连接时能容忍的最多心跳数(总时间就是它乘以 tickTime),用它来限定集群中的 Zookeeper 服务器连接到 Leader 的时限。
  • syncLimit
    同步通信时限,集群中 Leader 与 Follower 之间的最大响应时间单位,假如响应超过 syncLimit * tickTime,Leader 认为 Follwer 死掉,从服务器列表中删除 Follwer。
  • clientPort
    客户端连接的端口

客户端操作的常用命令:

命令基本语法功能描述
help显示所有操作命令
ls path [watch]使用 ls 命令来查看当前 znode 中所包含的内容
ls2 path [watch]查看当前节点数据并能看到更新次数等数据
create普通创建 -s 含有序列 -e 临时(重启或者超时消失)
get path [watch]获得节点的值
set设置节点的具体值
stat查看节点状态
delete删除节点
rmr递归删除节点

集群安装

解压跟上面一样,如果安装了 rsync 远程同步工具,可以使用 xsync 命令同步过去,然后在设置的数据文件夹目录下创建一个 myid 的文件,在里面写上对应的编号。
然后在所有的配置文件中加入:

1
2
3
4
5
6
7
8
9
10
11
#######################cluster##########################
server.2=hostname102:2888:3888
server.3=hostname103:2888:3888
server.4=hostname104:2888:3888

# server.A=B:C:D
# A 是一个数字,表示这个是第几号服务器,就是 myid 文件里写的那个数字,
# Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server。
# B 是这个服务器的 ip 地址;
# C 是这个服务器与集群中的 Leader 服务器交换信息的端口;
# D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

最后分别启动就可以了,可以使用查看状态来查看。

结构体

客户端使用 get 或者 ls2 命令获取到的信息解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
czxid - 创建节点的事务 zxid
每次修改 ZooKeeper 状态都会收到一个 zxid 形式的时间戳,也就是 ZooKeeper 事务 ID。
事务 ID 是 ZooKeeper 中所有修改总的次序。每个修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。

ctime - znode被创建的毫秒数(从1970年开始)

mzxid - znode最后更新的事务zxid

mtime - znode最后修改的毫秒数(从1970年开始)

pZxid - znode最后更新的子节点zxid

cversion - znode子节点变化号,znode子节点修改次数

dataversion - znode数据变化号

aclVersion - znode访问控制列表的变化号

ephemeralOwner - 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。

dataLength - znode的数据长度

numChildren - znode子节点数量

主要留意最后两个就好了。

重点内容

选举机制

半数机制:集群中半数以上机器存活,集群可用。所以 Zookeeper 适合安装奇数台服务器
Zookeeper 虽然在配置文件中并没有指定 Master 和 Slave。但是,Zookeeper 工作时,是有一个节点为 Leader,其他则为 Follower,Leader 是通过内部的选举机制临时产生的。

选举机制简单说就是每个服务都首先选自己,如果超过了集群数量的半数以上,那么选举结果即确定,否则放弃选自己继而选择 id 比自己大的一个。
假设集群里有五台服务器,id 编号 1-5,依次启动他们。

  1. 服务器 1 启动,此时只有它一台服务器启动了,它首先选择自己,但是发出去的报文没有任何响应,所以它的选举状态一直是 LOOKING 状态。
  2. 服务器 2 启动,它与最开始启动的服务器 1 进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以 id 值较大的服务器 2 胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是 3),所以服务器 1、2 还是继续保持 LOOKING 状态。
  3. 服务器 3 启动,根据前面的理论分析,服务器 3 成为服务器 1、2、3 中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的 Leader。
  4. 服务器 4 启动,根据前面的分析,理论上服务器 4 应该是服务器 1、2、3、4 中最大的,但是由于前面已经有半数以上的服务器选举了服务器 3,所以只能作为 Follower,第五个也是类似。

真实的选举机制是非常复杂的,所以耗时长,Leader 选举是保证分布式数据一致性的关键所在,期间还需要对投票是否有效等进行检查,一般会进行多轮投票才会选出。

非初始化的情况下,通常那台服务器上的数据越新(ZXID 会越大),其成为 Leader 的可能性越大,也就越能够保证数据的恢复。如果 ZXID 相同,则 SID 越大机会越大。


Zookeeper 的核心是原子广播,这个机制保证了各个 Server 之间的同步。实现这个机制的协议叫做 Zab 协议(Zookeeper Atomic broadcast protocol)。
Zab 协议有两种模式,它们分别是恢复模式(选主)和广播模式(同步)。

当服务启动或者在领导者崩溃后,Zab 就进入了恢复模式,当领导者被选举出来,且大多数 Server 完成了和 leader 的状态同步以后,恢复模式就结束了。
状态同步保证了 leader 和 Server 具有相同的系统状态。

仅靠 Paxos 不能满足 ZooKeeper 需求。ZooKeeper 是一个树形结构,很多操作都要先检查才能确定能不能执行,例如创建 /a/b 之前要先确定 /a 的存在,我们就能从此看出 Paxos 的一致性达不到 ZooKeeper 一致性的要求,所以就有了 Zab 协议,它保证了:

  • 同一个 leader 的发起的事务要按顺序被 apply,同时还要保证只有先前的 leader 的所有事务都被 apply 之后,新选的 leader 才能在发起事务。
  • 一些已经 Skip 的消息,需要仍然被 Skip。

当 leader 崩溃或者 leader 失去大多数的 follower,这时候 zk 进入恢复模式,恢复模式需要重新选举出一个新的 leader,让所有的 Server 都恢复到一个正确的状态(包括数据的恢复)。
Zk 的选举算法有两种:一种是基于 basic paxos 实现的,另外一种是基于 fast paxos 算法实现的;系统默认的选举算法为 fast paxos。
选完 leader 以后,zk 就进入状态同步过程。

关于恢复模式。

选好新 leader 后它通过一个多数派获得老 leader 提交的最新数据;
老 leader 重启后,可能还会认为自己是 leader,可能会继续发送未完成的请求,从而因为两个 leader 同时存在导致算法过程失败;
解决办法是把 leader 信息加入每条消息的 id 中,Zookeeper 中称为 zxid 为一 64 位数字,高 32 位为 leader 信息又称为 epoch,每次 leader 转换时递增;低 32 位为消息编号,leader 转换时应该从 0 重新开始编号。
通过 zxid,follower 能很容易发现请求是否来自老 leader,从而拒绝老 leader 的请求。新 leader 首先要获得大多数节点的支持,然后从状态最新的节点同步事务,完成后才可正式成为 leader 发起事务。

  1. 新 Leader 等待 server 连接;
  2. Follower 连接 leader,将最大的 zxid 发送给 leader;
  3. Leader 根据 follower 的 zxid 确定同步点;
  4. 完成同步后通知 follower 已经成为 uptodate 状态;
  5. Follower 收到 uptodate 消息后,又可以重新接受 client 的请求进行服务了。

详细分析见:
https://www.cnblogs.com/binyue/p/4270393.html
http://www.cnblogs.com/leesf456/p/6107600.html
https://www.cnblogs.com/sunddenly/p/4138580.html
https://www.jianshu.com/p/e126bb01331c

节点类型

节点的类型大体可分为两类:

  • 持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
  • 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除

其中每一种里又细分为两类,目录节点和顺序编号目录节点,就如同名字一类会在后面加一个递增的序号,由父节点负责维护,可以用于排序,记录事件的顺序。

监听器

当 ZK 客户端运行后,会创建两个线程,一个负责网络连接通信(connect)一个负责监听(listener);
通过 connect 线程将注册监听事件发给 ZK,ZK 收到后会将其加入到监听列表,类似 ip:port:监听内容 这样的形式。
满足条件后,ZK 就会按照监听列表告诉客户端,然后 listener 线程会回调 process 方法。
监听一般分为两类,一种是数据变化的监听,一种是子节点变化的监听。

写数据流程

首先 Client 向 ZooKeeper 的 Server1 上写数据,发送一个写请求。
如果 Server1 不是 Leader,那么 Server1 会把接受到的请求进一步转发给 Leader,这个 Leader 会将写请求广播给各个 Server,各个 Server 写成功后就会通知 Leader。
当 Leader 收到大多数 Server 数据写成功了,那么就说明数据写成功了。如果这里三个节点的话,只要有两个节点数据写成功了,那么就认为数据写成功了。
认为写成功之后,Leader 会告诉 Server1 数据写成功了,Server1 会进一步通知 Client 数据写成功了,这时就认为整个写操作成功。

所以,你可以知道 Zookeeper 并不保证读取的是最新数据,也就是强一致性,它只能保证最终一致性,同时它具有原子性:更新操作要么成功要么失败,没有中间状态。
而只需要一半以上写入完成即可认为成功也就好理解了,首先写操作只能由 Leader 发起,那么它肯定是最新的,后续可以同步给其他的 Follower,即使还没同步 Leader 就挂掉了,那么依然能保证新的 Leader 是有最新数据的,这就是靠 Zab 协议中的恢复模式了。

代码示例

使用原生 API 的基本操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class SimpleZkClient {
private static final String connectString = "192.168.169.129:2181,192.168.169.130:2181,192.168.169.131:2181";
private static final int sessionTimeout = 2000;

ZooKeeper zkClient = null;

@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知后的回调函数
System.out.println(event.getType() + "---" + event.getPath());
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
}
}
});
}

// 获取子节点
@Test
public void getChildren() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}

// 创建数据节点到 zk 中
@Test
public void testCreate() throws KeeperException, InterruptedException {
// 参数1:要创建的节点的路径
// 参数2:节点的数据
// 参数3:节点的权限
// 参数4:节点的类型
String nodeCreated = zkClient.create("/servers",
"hellozk".getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}

// 判断节点是否存在
@Test
public void testExist() throws KeeperException, InterruptedException {
Stat stat = zkClient.exists("/java6", false);
System.out.println(stat == null ? "not exist" : "exist");
}

// 获取节点的数据
@Test
public void getData() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData("/java6", false, null);
System.out.println(new String(data));
}

// 删除节点
@Test
public void deleteZnode() throws InterruptedException, KeeperException {
// 参数2:指定要删除的版本,-1 表示删除所有版本
zkClient.delete("/java6", -1);
}

// 更新节点数据
@Test
public void setData() throws KeeperException, InterruptedException {
zkClient.setData("/java6", "I Miss you".getBytes(), -1);
byte[] data = zkClient.getData("/java6", false, null);
System.out.println(new String(data));
}
}

服务器节点动态上下线示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* 服务端代码
*/
public class DistributedServer {
private static final String connectString = "192.168.169.129:2181,192.168.169.130:2181,192.168.169.131:2181";
private static final int sessionTimeout = 2000;
private static final String parentNode = "/servers";

ZooKeeper zkClient = null;

public static void main(String[] args) throws Exception{
// 获取zk连接
DistributedServer server = new DistributedServer();
server.getConnect();
// 利用zk连接注册服务器信息
server.registerServer(args[0]);
// 启动业务功能
server.handleBussiness(args[0]);
}

/**
* 创建到zk的客户端连接
*/
public void getConnect() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {}
});
}

/**
* 向zk集群注册服务器信息
*/
public void registerServer(String hostname) throws Exception{
// 创建临时序号节点
String create = zkClient.create(parentNode + "/server",
hostname.getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + "is online.." + create);
}

/**
* 业务功能
*/
public void handleBussiness(String hostname) throws InterruptedException {
System.out.println(hostname + "start working.....");
Thread.sleep(Long.MAX_VALUE);
}
}


/**
* 客户端代码
*/
public class DistributeClient {
private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";

public static void main(String[] args) throws Exception {
// 获取zk连接
DistributeClient client = new DistributeClient();
client.getConnect();

// 获取servers的子节点信息,从中获取服务器信息列表
client.getServerList();

// 业务进程启动
client.business();
}

// 创建到zk的客户端连接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 需要再次启动监听
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}

// 获取服务器列表信息
public void getServerList() throws Exception {
// 1.获取服务器子节点信息,并且对父节点进行监听
List<String> children = zk.getChildren(parentNode, true);

ArrayList<String> servers = new ArrayList<>();
// 2.遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
byte[] data = zk.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
}
System.out.println(servers);
}

// 业务功能
public void business() throws Exception{
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
}

对于集群来说,无论是服务器还是消费端,都是客户端;服务端就是向集群添加信息,消费端就是监听信息。

节点权限

ZK 的节点有 5 种操作权限:
CREATE、READ、WRITE、DELETE、ADMIN 也就是 增、删、改、查、管理权限,这 5 种权限简写为 crwda 。
注:这 5 种权限中,delete 是指对子节点的删除权限,其它 4 种权限指对自身节点的操作权限。

身份的认证有 4 种方式:
world:默认方式,相当于全世界都能访问
auth:代表已经认证通过的用户(cli 中可以通过 addauth digest user:pwd 来添加当前上下文中的授权用户)
digest:即 用户名:密码 这种方式认证,这也是业务系统中最常用的
ip:使用 Ip 地址认证。

Java API 中,Ids.OPEN_ACL_UNSAFE :默认匿名权限;Ids.READ_ACL_UNSAFE :只读权限; CREATOR_ALL_ACL :给创建该 znode 连接所有权限。 事实上这里是采用了 auth 验证模式,使用 sessionID 做验证,所以创建该 znode 的连接可以对该 znode 做任何修改。

关于服务发现

相比 ZK,现在也有了不少其他优秀的选择,感兴趣的可以了解下 etcd、consul、Eureka。

FeatureConsulzookeeperetcdeuerka
服务健康检查服务状态,内存,硬盘等(弱)长连接,keepalive连接心跳可配支持
多数据中心支持
kv 存储服务支持支持支持
一致性raftpaxosraft
CAPCACPCPAP
使用接口(多语言能力)支持http和dns客户端http/grpchttp(sidecar)
watch支持全量/支持long polling支持支持 long polling支持 long polling/大部分增量
自身监控metricsmetricsmetrics
安全acl /httpsaclhttps支持(弱)
SpringCloud集成已支持已支持已支持已支持

真的是百家争鸣 o( ̄▽ ̄)ゞ)) ̄▽ ̄)o

参考

https://blog.51cto.com/zero01/2108483
https://tonydeng.github.io/2015/10/19/etcd-application-scenarios/
https://luyiisme.github.io/2017/04/22/spring-cloud-service-discovery-products/

喜欢就请我吃包辣条吧!

评论框加载失败,无法访问 Disqus

你可能需要魔法上网~~