体验ZooKeeper

Apache ZooKeeper 是 Apache 软件基金会的一个软件项目,他为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册,是主要用于分布式中一致性处理的框架。ZooKeeper 曾经是 Hadoop 的一个子项目,但现在是一个独立的顶级项目。
ZooKeeper 的架构通过冗余服务实现高可用性。ZooKeeper 节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。更新是全序的。

Zookeeper 从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。

所谓的一致性,实际上就是围绕着“看见”来的。谁能看见?能否看见?什么时候看见?
也对应着:强一致性、弱一致性、最终一致性。

ZK特点

  • 一个领导者(Leader),多个跟随者(Follower)组成的集群。
  • 集群中只要有半数以上(不包括一半)节点存活,Zookeeper 集群就能正常服务(所以集群最少 3 推荐奇数个)。
  • 全局数据一致:每个 Server 保存一份相同的数据副本,Client 无论连接到哪个 Server,数据都是一致的。
  • 更新请求顺序进行,来自同一个 Client 的更新请求按其发送顺序依次执行。
  • 数据更新原子性,一次数据更新要么成功,要么失败
  • 实时性,在一定时间范围内,Client 能读到最新数据。

ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识

需要注意的是 ZK 的选举过程是非常复杂的,也就是耗时比较长,选举过程整个集群不可用,所以说 ZK 是 CP 的,相对的 Eureka 则是 AP 设计的,他们的比较看这里就够了。

为什么要保证半数以上?
从概念上来说,ZooKeeper 它所做的就是确保对 Znode 树的每一个修改都会被复制到集合体中超过半数的机器上。如果少于半数的机器出现故障,则最少有一台机器会保存最新的状态,其余的副本最终也会更新到这个状态。

应用场景

ZK 提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

  • 统一命名
    在分布式场景下,经常需要对应用/服务进行统一命名,便于识别,例如 ip 和域名的关系。
  • 统一配置管理
    分布式中配置文件同步是常见的,一般要求在一个集群中,所有节点的配置是一致的。
    对某一个节点的配置修改后也会快速同步到其他节点上。
    因为客户端在监听,一旦 ZNode 中的数据变化,ZK 将通知各个客户端。
  • 统一集群管理
    分布式中,实时掌握每个节点的状态是必要的,可以根据节点的状态做出一些调整。
    ZK 会将节点的相关信息一起写入到 ZNode 中。
  • 服务器动态上下线
    客户端能够实时的洞察服务器的上下线情况。
    因为服务器注册的时候创建的是临时节点,服务器下线后会自动删除,ZK 会通知监听的客户端,然后客户端会去重新获取列表。
  • 软负载均衡
    ZK 可以记录每台服务器的访问次数,让访问最少的来处理最新的客户端请求。

安装ZK

本地安装

windows 版的没啥好说的,就说说 linux 版,正常情况应该是搞集群的,不过测试用机器性能有限,开一个就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 解压
tar -zxvf zookeeper-3.x.tar.gz -C /opt/module/
# 进入 conf 文件夹修改配置文件名
# 进入配置文件将 dataDir= 修改到指定的目录,需要自己创建文件夹,例如 zkData
mv zoo_sample.cfg zoo.cfg

# 启动
bin/zkServer.sh start
# 查看进程使用 jps
# 查看状态
bin/zkServer.sh status

# 启动客户端,退出 quit
bin/zkCli.sh

其他常用配置参数:

  • tickTime
    通信心跳数,Zookeeper 服务器与客户端心跳时间,单位毫秒。
    它用于心跳机制,并且设置最小的 session 超时时间为两倍心跳时间。
  • initLimit
    集群中的 Follower(跟随者)服务器与 Leader(领导者)服务器之间初始连接时能容忍的最多心跳数(总时间就是它乘以 tickTime),用它来限定集群中的 Zookeeper 服务器连接到 Leader 的时限。
  • syncLimit
    同步通信时限,集群中 Leader 与 Follower 之间的最大响应时间单位,假如响应超过 syncLimit * tickTime,Leader 认为 Follwer 死掉,从服务器列表中删除 Follwer。
  • clientPort
    客户端连接的端口

客户端操作的常用命令:

命令基本语法 功能描述
help 显示所有操作命令
ls path [watch] 使用 ls 命令来查看当前 znode 中所包含的内容
ls2 path [watch] 查看当前节点数据并能看到更新次数等数据
create 普通创建 -s 含有序列 -e 临时(重启或者超时消失)
get path [watch] 获得节点的值
set 设置节点的具体值
stat 查看节点状态
delete 删除节点
rmr 递归删除节点

集群安装

解压跟上面一样,如果安装了 rsync 远程同步工具,可以使用 xsync 命令同步过去,然后在设置的数据文件夹目录下创建一个 myid 的文件,在里面写上对应的编号。
然后在所有的配置文件中加入:

1
2
3
4
5
6
7
8
9
10
11
#######################cluster##########################
server.2=hostname102:2888:3888
server.3=hostname103:2888:3888
server.4=hostname104:2888:3888

# server.A=B:C:D
# A 是一个数字,表示这个是第几号服务器,就是 myid 文件里写的那个数字,
# Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server。
# B 是这个服务器的 ip 地址;
# C 是这个服务器与集群中的 Leader 服务器交换信息的端口;
# D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

最后分别启动就可以了,可以使用查看状态来查看。

结构体

客户端使用 get 或者 ls2 命令获取到的信息解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
czxid - 创建节点的事务 zxid
每次修改 ZooKeeper 状态都会收到一个 zxid 形式的时间戳,也就是 ZooKeeper 事务 ID。
事务 ID 是 ZooKeeper 中所有修改总的次序。每个修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。

ctime - znode被创建的毫秒数(从1970年开始)

mzxid - znode最后更新的事务zxid

mtime - znode最后修改的毫秒数(从1970年开始)

pZxid - znode最后更新的子节点zxid

cversion - znode子节点变化号,znode子节点修改次数

dataversion - znode数据变化号

aclVersion - znode访问控制列表的变化号

ephemeralOwner - 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。

dataLength - znode的数据长度

numChildren - znode子节点数量

主要留意最后两个就好了。

重点内容

选举机制

半数机制:集群中半数以上机器存活,集群可用。所以 Zookeeper 适合安装奇数台服务器
Zookeeper 虽然在配置文件中并没有指定 Master 和 Slave。但是,Zookeeper 工作时,是有一个节点为 Leader,其他则为 Follower,Leader 是通过内部的选举机制临时产生的。

选举机制简单说就是每个服务都首先选自己,如果超过了集群数量的半数以上,那么选举结果即确定,否则放弃选自己继而选择 id 比自己大的一个。
假设集群里有五台服务器,id 编号 1-5,依次启动他们。

  1. 服务器 1 启动,此时只有它一台服务器启动了,它首先选择自己,但是发出去的报文没有任何响应,所以它的选举状态一直是 LOOKING 状态。
  2. 服务器 2 启动,它与最开始启动的服务器 1 进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以 id 值较大的服务器 2 胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是 3),所以服务器 1、2 还是继续保持 LOOKING 状态。
  3. 服务器 3 启动,根据前面的理论分析,服务器 3 成为服务器 1、2、3 中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的 Leader。
  4. 服务器 4 启动,根据前面的分析,理论上服务器 4 应该是服务器 1、2、3、4 中最大的,但是由于前面已经有半数以上的服务器选举了服务器 3,所以只能作为 Follower,第五个也是类似。

真实的选举机制是非常复杂的,所以耗时长,Leader 选举是保证分布式数据一致性的关键所在,期间还需要对投票是否有效等进行检查,一般会进行多轮投票才会选出。

非初始化的情况下,通常那台服务器上的数据越新(ZXID 会越大),其成为 Leader 的可能性越大,也就越能够保证数据的恢复。如果 ZXID 相同,则 SID 越大机会越大。


Zookeeper 的核心是原子广播,这个机制保证了各个 Server 之间的同步。实现这个机制的协议叫做 Zab 协议(Zookeeper Atomic broadcast protocol)。
Zab 协议有两种模式,它们分别是恢复模式(选主)和广播模式(同步)。

当服务启动或者在领导者崩溃后,Zab 就进入了恢复模式,当领导者被选举出来,且大多数 Server 完成了和 leader 的状态同步以后,恢复模式就结束了。
状态同步保证了 leader 和 Server 具有相同的系统状态。

仅靠 Paxos 不能满足 ZooKeeper 需求。ZooKeeper 是一个树形结构,很多操作都要先检查才能确定能不能执行,例如创建 /a/b 之前要先确定 /a 的存在,我们就能从此看出 Paxos 的一致性达不到 ZooKeeper 一致性的要求,所以就有了 Zab 协议,它保证了:

  • 同一个 leader 的发起的事务要按顺序被 apply,同时还要保证只有先前的 leader 的所有事务都被 apply 之后,新选的 leader 才能在发起事务。
  • 一些已经 Skip 的消息,需要仍然被 Skip。

当 leader 崩溃或者 leader 失去大多数的 follower,这时候 zk 进入恢复模式,恢复模式需要重新选举出一个新的 leader,让所有的 Server 都恢复到一个正确的状态(包括数据的恢复)。
Zk 的选举算法有两种:一种是基于 basic paxos 实现的,另外一种是基于 fast paxos 算法实现的;系统默认的选举算法为 fast paxos。
选完 leader 以后,zk 就进入状态同步过程。

关于恢复模式。

选好新 leader 后它通过一个多数派获得老 leader 提交的最新数据;
老 leader 重启后,可能还会认为自己是 leader,可能会继续发送未完成的请求,从而因为两个 leader 同时存在导致算法过程失败;
解决办法是把 leader 信息加入每条消息的 id 中,Zookeeper 中称为 zxid 为一 64 位数字,高 32 位为 leader 信息又称为 epoch,每次 leader 转换时递增;低 32 位为消息编号,leader 转换时应该从 0 重新开始编号。
通过 zxid,follower 能很容易发现请求是否来自老 leader,从而拒绝老 leader 的请求。新 leader 首先要获得大多数节点的支持,然后从状态最新的节点同步事务,完成后才可正式成为 leader 发起事务。

  1. 新 Leader 等待 server 连接;
  2. Follower 连接 leader,将最大的 zxid 发送给 leader;
  3. Leader 根据 follower 的 zxid 确定同步点;
  4. 完成同步后通知 follower 已经成为 uptodate 状态;
  5. Follower 收到 uptodate 消息后,又可以重新接受 client 的请求进行服务了。

详细分析见:
https://www.cnblogs.com/binyue/p/4270393.html
http://www.cnblogs.com/leesf456/p/6107600.html
https://www.cnblogs.com/sunddenly/p/4138580.html
https://www.jianshu.com/p/e126bb01331c

节点类型

节点的类型大体可分为两类:

  • 持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
  • 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除

其中每一种里又细分为两类,目录节点和顺序编号目录节点,就如同名字一类会在后面加一个递增的序号,由父节点负责维护,可以用于排序,记录事件的顺序。

监听器

当 ZK 客户端运行后,会创建两个线程,一个负责网络连接通信(connect)一个负责监听(listener);
通过 connect 线程将注册监听事件发给 ZK,ZK 收到后会将其加入到监听列表,类似 ip:port:监听内容 这样的形式。
满足条件后,ZK 就会按照监听列表告诉客户端,然后 listener 线程会回调 process 方法。
监听一般分为两类,一种是数据变化的监听,一种是子节点变化的监听。

写数据流程

首先 Client 向 ZooKeeper 的 Server1 上写数据,发送一个写请求。
如果 Server1 不是 Leader,那么 Server1 会把接受到的请求进一步转发给 Leader,这个 Leader 会将写请求广播给各个 Server,各个 Server 写成功后就会通知 Leader。
当 Leader 收到大多数 Server 数据写成功了,那么就说明数据写成功了。如果这里三个节点的话,只要有两个节点数据写成功了,那么就认为数据写成功了。
认为写成功之后,Leader 会告诉 Server1 数据写成功了,Server1 会进一步通知 Client 数据写成功了,这时就认为整个写操作成功。

所以,你可以知道 Zookeeper 并不保证读取的是最新数据,也就是强一致性,它只能保证最终一致性,同时它具有原子性:更新操作要么成功要么失败,没有中间状态。
而只需要一半以上写入完成即可认为成功也就好理解了,首先写操作只能由 Leader 发起,那么它肯定是最新的,后续可以同步给其他的 Follower,即使还没同步 Leader 就挂掉了,那么依然能保证新的 Leader 是有最新数据的,这就是靠 Zab 协议中的恢复模式了。

代码示例

使用原生 API 的基本操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class SimpleZkClient {
private static final String connectString = "192.168.169.129:2181,192.168.169.130:2181,192.168.169.131:2181";
private static final int sessionTimeout = 2000;

ZooKeeper zkClient = null;

@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知后的回调函数
System.out.println(event.getType() + "---" + event.getPath());
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
}
}
});
}

// 获取子节点
@Test
public void getChildren() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}

// 创建数据节点到 zk 中
@Test
public void testCreate() throws KeeperException, InterruptedException {
// 参数1:要创建的节点的路径
// 参数2:节点的数据
// 参数3:节点的权限
// 参数4:节点的类型
String nodeCreated = zkClient.create("/servers",
"hellozk".getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}

// 判断节点是否存在
@Test
public void testExist() throws KeeperException, InterruptedException {
Stat stat = zkClient.exists("/java6", false);
System.out.println(stat == null ? "not exist" : "exist");
}

// 获取节点的数据
@Test
public void getData() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData("/java6", false, null);
System.out.println(new String(data));
}

// 删除节点
@Test
public void deleteZnode() throws InterruptedException, KeeperException {
// 参数2:指定要删除的版本,-1 表示删除所有版本
zkClient.delete("/java6", -1);
}

// 更新节点数据
@Test
public void setData() throws KeeperException, InterruptedException {
zkClient.setData("/java6", "I Miss you".getBytes(), -1);
byte[] data = zkClient.getData("/java6", false, null);
System.out.println(new String(data));
}
}

服务器节点动态上下线示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* 服务端代码
*/
public class DistributedServer {
private static final String connectString = "192.168.169.129:2181,192.168.169.130:2181,192.168.169.131:2181";
private static final int sessionTimeout = 2000;
private static final String parentNode = "/servers";

ZooKeeper zkClient = null;

public static void main(String[] args) throws Exception{
// 获取zk连接
DistributedServer server = new DistributedServer();
server.getConnect();
// 利用zk连接注册服务器信息
server.registerServer(args[0]);
// 启动业务功能
server.handleBussiness(args[0]);
}

/**
* 创建到zk的客户端连接
*/
public void getConnect() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {}
});
}

/**
* 向zk集群注册服务器信息
*/
public void registerServer(String hostname) throws Exception{
// 创建临时序号节点
String create = zkClient.create(parentNode + "/server",
hostname.getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + "is online.." + create);
}

/**
* 业务功能
*/
public void handleBussiness(String hostname) throws InterruptedException {
System.out.println(hostname + "start working.....");
Thread.sleep(Long.MAX_VALUE);
}
}


/**
* 客户端代码
*/
public class DistributeClient {
private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";

public static void main(String[] args) throws Exception {
// 获取zk连接
DistributeClient client = new DistributeClient();
client.getConnect();

// 获取servers的子节点信息,从中获取服务器信息列表
client.getServerList();

// 业务进程启动
client.business();
}

// 创建到zk的客户端连接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 需要再次启动监听
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}

// 获取服务器列表信息
public void getServerList() throws Exception {
// 1.获取服务器子节点信息,并且对父节点进行监听
List<String> children = zk.getChildren(parentNode, true);

ArrayList<String> servers = new ArrayList<>();
// 2.遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
byte[] data = zk.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
}
System.out.println(servers);
}

// 业务功能
public void business() throws Exception{
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
}

对于集群来说,无论是服务器还是消费端,都是客户端;服务端就是向集群添加信息,消费端就是监听信息。

节点权限

ZK 的节点有 5 种操作权限:
CREATE、READ、WRITE、DELETE、ADMIN 也就是 增、删、改、查、管理权限,这 5 种权限简写为 crwda 。
注:这 5 种权限中,delete 是指对子节点的删除权限,其它 4 种权限指对自身节点的操作权限。

身份的认证有 4 种方式:
world:默认方式,相当于全世界都能访问
auth:代表已经认证通过的用户(cli 中可以通过 addauth digest user:pwd 来添加当前上下文中的授权用户)
digest:即 用户名:密码 这种方式认证,这也是业务系统中最常用的
ip:使用 Ip 地址认证。

Java API 中,Ids.OPEN_ACL_UNSAFE :默认匿名权限;Ids.READ_ACL_UNSAFE :只读权限; CREATOR_ALL_ACL :给创建该 znode 连接所有权限。 事实上这里是采用了 auth 验证模式,使用 sessionID 做验证,所以创建该 znode 的连接可以对该 znode 做任何修改。

关于服务发现

相比 ZK,现在也有了不少其他优秀的选择,感兴趣的可以了解下 etcd、consul、Eureka。

Feature Consul zookeeper etcd euerka
服务健康检查 服务状态,内存,硬盘等 (弱)长连接,keepalive 连接心跳 可配支持
多数据中心 支持
kv 存储服务 支持 支持 支持
一致性 raft paxos raft
CAP CA CP CP AP
使用接口(多语言能力) 支持http和dns 客户端 http/grpc http(sidecar)
watch支持 全量/支持long polling 支持 支持 long polling 支持 long polling/大部分增量
自身监控 metrics metrics metrics
安全 acl /https acl https支持(弱)
SpringCloud集成 已支持 已支持 已支持 已支持

真的是百家争鸣 o( ̄▽ ̄)ゞ)) ̄▽ ̄)o

参考

https://blog.51cto.com/zero01/2108483
https://tonydeng.github.io/2015/10/19/etcd-application-scenarios/
https://luyiisme.github.io/2017/04/22/spring-cloud-service-discovery-products/

喜欢就请我吃包辣条吧!

评论框加载失败,无法访问 Disqus

你可能需要魔法上网~~