Zookeeper的重要性及应用的广泛性,这里就不再叙述了,本文是学习Hadoop权威指南的基础上进行的总结,当然本文大部分内容来自此书,中间会穿插一些个人的理解。本文主要分以下几块进行详述。

  • ZooKeeper介绍
  • Zookeeper安装与运行
  • ZooKeeper组成员关系
  • ZooKeeper服务
  • ZooKeeper应用

ZooKeeper介绍

官网对其介绍的原话如下:

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them ,which make them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.

总结一下就是,Zookeeper分布式服务框架是一个用来解决分布式应用中经常遇到的一些数据管理问题(如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等)的中央服务。

对于一个分布式系统最困难的事情之一就是如何处理部分失败(partial failure)。当一条message在网络中的两个节点之间传送时,如果出现了网络错误,发送者无法知道接收者是否已经接收到了这条message。接收者有可能在发生错误之前收到这个message,也有可能没有收到,还有可能接收者已经挂掉。发送者获得真实情况的一般解决方案就是:重新连接接收者,然后发起询问。这就是部分失败:即我们甚至不知道一个操作是否已经完成。

Zookeeper正是为了解决这个问题而应运而生的,当然Zookeeper并不能完全根除部分失败,当然它也不会隐藏这部分的失败。ZooKeeper具有以下几个特点:

  • 简单:它的核心是一个精简的文件系统,它提供一些简单的操作和一些额外的抽象操作;
  • 富有表现力:ZooKeeper可以用于实现多种协议和数据结构;
  • 高可用性:可避免单点故障;
  • 采用耦合交互方式:在交互过程中,参与者不需要彼此了解,进程在不了解其他进程的情况下就能够彼此发现并进行交互;
  • 是一个资源库:它是一个开源共享存储库,能使程序员免于编写这类通用的协议。
  • 高性能:对于写操作而言,Zookeeper的基准测试吞吐量已经超过每秒10000个操作,对于常规的读操作,吞吐量更高。

Zookeeper的安装与运行

安装

Zookeeper镜像上下载Zookeeper安装包(这里以zookeeper-3.4.6.tar.gz为例)。这里给出一般Zookeeper的安装与运行的方法,很多实际生成环境中,我们都是使用CDH集成的Zookeeper,这样的话安装与运行就完全可以通过图形化界面操作了。

1
2
3
4
# 解压
tar -zxvf zookeeper-3.4.6.tar.gz -C /opt
# 复制配置文件
cp /opt/zookeeper/zoo_sample.cfg /opt/zookeeper/zoo.cfg

修改配置文件zoo.cfg.

1
2
3
4
5
6
7
8
9
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=5
syncLimit=2
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888

首先需要在dataDir目录下,新建一个名为myid的文件,这个文件的作用是指定这个服务器的ID,服务器ID在集合体中是唯一的,并且取值范围在1到255之间。下面再分别介绍一下其他几个参数的意义:

  • dataDir:数据目录;
  • dataLogDir:日志目录;
  • clientPort:客户端连接端口;
  • tickTime:Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳(它也是Zookeeper中的基本时间单位);
  • initLimit:设定了允许所有follower(下面会介绍)与leader进行连接并同步的时间,它是tickTime的整数倍;
  • syncLimit:设定了一个follower与leader进行同步的时间,也是tickTime的整数倍;
  • server.n=hostname:port1:port2:n的值就是服务器的ID,port1是follower用来连接leader的端口,port2是用于leader选举。总结起来就是,2181用于客户端连接,对于leader来说,2888端口用于follower连接,3888端口用于leader选举阶段的其他服务器连接。

启动与停止

启动:

1
/opt/zookeeper-3.4.6/bin/zkServer.sh start

停止:

1
/opt/zookeeper-3.4.6/bin/zkServer.sh stop

ZooKeeper组成员关系

Zookeeper是一个具有高可用性的高性能协调服务。

组成员关系

Zookeeper 会维护一个具有层次关系的数据结构,它非常类似于一个标准的文件系统,但是这个文件系统中没有文件和目录,而是统一使用节点(node)的概念,成为znode。znode既可以作为保存数据的容器(如:文件),也可以作为保存其他znode的容器(如:目录)。所有的znode构成一个层次化的命名空间。一种自然的建立组成员列表的方式就是利用这个层次结构,如下图所示,首先创建一个以组名(/zk)为节点的znode作为父节点,然后以组成员(/zk/node1/zk/node2/zk/node3)为节点名来创建作为子节点的znode。

group

示例

本例通过一个小项目来介绍Zookeeper的API使用。工程项目参见ZooKeeperGroupExample.

这里是使用maven建立的工程,pom文件中jar包的依赖内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5-cdh5.4.8</version>
</dependency>
</dependencies>

创建组

本程序是在Zookeeper中新建表示组的znode,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class CreateGroup implements Watcher {
private static final int SESSION_TIMEOUT = 5000;
private ZooKeeper zk;
private CountDownLatch connectedSignal = new CountDownLatch(1);

public void connect(String hosts) throws IOException, InterruptedException {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
connectedSignal.await();
}

@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}

public void create(String groupName) throws KeeperException, InterruptedException {
String path = "/" + groupName;
String createdPath = zk.create(path, null/*data*/, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("CreateGroup: Created" + createdPath);
}

public void close() throws InterruptedException {
zk.close();
}

public static void main(String[] args) throws Exception {
CreateGroup createGroup = new CreateGroup();
createGroup.connect(args[0]);
createGroup.create(args[1]);
createGroup.close();
}
}

程序的主要接口有:

  • new ZooKeeper():实例化一个新的Zookeeper类的对象,这个类负责维护客户端和Zookeeper服务之间的联系。它有三个参数
    1. Zookeeper服务的主机地址(可指定端口,默认是2181);
    2. 以毫秒为单位的会话超时参数;
    3. 一个Watcher对象的实例,Watcher对象接收来自Zookeeper的回调,以获得各种事件的通知。
  • zk.create():创建一个新的Zookeeper的znode。它有四个参数:
    1. 路径(字符串表示);
    2. znode的内容(字节数组,本例中都使用null值);
    3. ACL(访问控制列表);
    4. 创建znode的类型,有短暂和持久两种。

当一个Zookeeper实例新建时,会启动一个线程连接到Zookeeper服务,它对构造函数是立即返回的,因此在新建的Zookeeper对象之前一定要等待其与Zookeeper服务之间连接成功。这里使用CountDownLatch来阻止使用的Zookeeper对象。当客户端与Zookeeper建立连接之后,Watcherprocess()方法会被调用,参数表示一个连接的事件。在接收到一个连接事件(以Watcher.Event.KeeperState的枚举类型值SyncConnected来表示)时,我们通过调用CountDownLatchcountDown()方法来递减它的计数器。锁存器(latch)被创建时带有一个值为1的计数器,用于表示它在释放所有线程之前需要发生的事件数。在调用一次countDown()方法之后,计数器的值变为0,则await()方法返回。

输入以下命令运行:

1
java -cp zookeeperexample.jar groupexample.CreateGroup zkIP matt

输出:

1
CreateGroup: Created/matt

创建组成员

下面我们编写一个用于注册组成员的程序,每个组成员将作为一个程序运行,并且加入到组中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 用于等待建立与Zookeeper连接的辅助类

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ConnectionWatcher implements Watcher{
private static final int SESSION_TIMEOUT=5000;
protected ZooKeeper zk;
private CountDownLatch connectedSignal=new CountDownLatch(1);

public void connect(String hosts) throws IOException, InterruptedException{
zk=new ZooKeeper(hosts,SESSION_TIMEOUT,this);
connectedSignal.await();
}

@Override
public void process(WatchedEvent event){
if(event.getState()== Event.KeeperState.SyncConnected){
connectedSignal.countDown();
}
}

public void close() throws InterruptedException{
zk.close();
}
}

上述代码与CreateGroup的很类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 用于将组成员加入到组中

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;

public class JoinGroup extends ConnectionWatcher {
public void join(String groupName, String memberName) throws KeeperException, InterruptedException {
String path = "/" + groupName + "/" + memberName;
String createdPath = zk.create(path, null/*data*/, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("Created " + createdPath);
}

public static void main(String[] args) throws Exception {
JoinGroup joinGroup = new JoinGroup();
joinGroup.connect(args[0]);
joinGroup.join(args[1], args[2]);
}
}

这里的CreateMode.PERSISTENT也可以设置为CreateMode.EPHEMERAL,当设置为EPHEMERAL时,也就意味着这个znode是一个短暂的znode,一旦关闭客户端,子节点的znode就会从父节点的znode中删除。

输入以下命令运行:

1
2
3
java -cp zookeeperexample.jar groupexample.JoinGroup 192.168.80.23 matt wm1
java -cp zookeeperexample.jar groupexample.JoinGroup 192.168.80.23 matt wm2
java -cp zookeeperexample.jar groupexample.JoinGroup 192.168.80.23 matt wm3

输出:

1
2
3
Created /matt/wm1
Created /matt/wm2
Created /matt/wm3

列出组成员

这段程序的目标是,在给出Zookeeper地址和父节点znode的情况下,列出该父节点znode的子节点znode。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.apache.zookeeper.KeeperException;
import java.util.List;

public class ListGroup extends ConnectionWatcher {

public void list(String groupName) throws KeeperException, InterruptedException {
String path = "/" + groupName;
try {
List<String> children = zk.getChildren(path, false);
if (children.isEmpty()) {
System.out.printf("No members in group %s\n", groupName);
System.exit(1);
}
for (String child : children) {
System.out.println(child);
}
} catch (KeeperException.NoNodeException e) {
System.out.printf("Group %s does not exist\n", groupName);
System.exit(1);
}
}

public static void main(String[] args) throws Exception {
ListGroup listGroup = new ListGroup();
listGroup.connect(args[0]);
listGroup.list(args[1]);
listGroup.close();
}
}

这里主要是调用了zk.getChildren()来打印出一个znode的子节点列表,调用参数为该znode的路径和设为false的观察标志。如果在一个znode上设置了观察标志,那么一旦该znode的状态改变,关联的观察(Watcher)会被触发。在这里我们没有使用观察,但是在查看一个znode的子节点时,也可以设置观察,让应用程序接收到组成员加入、退出和组被删除的有关通知。

KeeperException.NoNodeException代表了组znode不存在的异常。

输入以下命令运行:

1
java -cp zookeeperexample.jar groupexample.ListGroup 192.168.80.23 matt

输出:

1
2
3
wm1
wm2
wm3

删除组

这里给出一个删除znode的程序,它需要支持一级目录的递归删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.zookeeper.KeeperException;
import java.util.List;

public class DeleteGroup extends ConnectionWatcher{
public void delete(String groupName) throws KeeperException,InterruptedException{
String path="/"+groupName;

try{
List<String> children=zk.getChildren(path,false);
for(String child: children){
zk.delete(path+"/"+child,-1);
}
zk.delete(path,-1);
}catch (KeeperException.NoNodeException e){
System.out.printf("Group %s does not exist\n", groupName);
System.exit(1);
}
}

public static void main(String[] args) throws Exception{
DeleteGroup deleteGroup=new DeleteGroup();
deleteGroup.connect(args[0]);
deleteGroup.delete(args[1]);
deleteGroup.close();
}
}

zookeeper对象提供了delete()的方法,该方法有两个参数:

  1. 路径;
  2. 版本号:如果所提供的版本号与znode的版本号一致,则Zookeeper会删除这个znode,这是一种乐观枷锁方式,使客户端能够检测出对znode的修改冲突,这里将版本号设置为-1,可以绕过这个版本检测机制,不管znode的版本号是什么而直接将其删除。

Zookeeper不支持递归的删除操作,所以在删除父节点之前必须删除其子节点。

输入以下命令运行:

1
java -cp zookeeperexample.jar groupexample.DeleteGroup 192.168.80.23 matt

通过Zookeeper客户端看到的变化如下图(处理过之后的图)所示:

zk

ZooKeeper服务

这里主要通过数据模型、操作、实现、一致性、会话和状态来介绍。

数据模型

Zookeeper维护着一个树形层次结构,树中的节点被称为znode。znode可以用与存储数据,并且有一个与之关联的ACL。

  1. Zookeeper被设计用来协调服务(通常是小数据文件),而不是用于大容量数据存储,因此一个znode能存储的数据被限制在1MB以内;
  2. znode的数据访问具有原子性:客户端在读取一个znode数据时,要么读取到所有的数据,要么读操作失败,不会只读到部分数据。同样,写操作将替换znode存储的所有数据(Zookeeper不支持添加操作);
  3. znode通过路径被引用:Zookeeper中使用的路径必须是绝对路径,而且所有的路径必须是规范的,即每条路径只有唯一的一种表示方式,不支持路径解析;
  4. Zookeeper的路径与URI不同,前者在Java API中通过(java.lang.String)来使用,而后者通过Hadoop Path类(或java.net.URI)来使用。

短暂znode

znode有两种类型,znode的类型在创建时确定并且之后不能再修改。

  1. 短暂的:在创建短暂znode的客户端会话结束时,Zookeeper会将该短暂znode删除(短暂的znode不能有子节点);
    应用:对于那些需要知道特定时刻有哪些分布式资源可用的应用来说,使用短暂znode是一种理想的选择。
  2. 持久的:持久znode不依赖于客户端会话,只有当客户端明确要删除该持久znode时才会被删除。

顺序号

  • 概念
    • 顺序(sequential)znode是指名称中包含ZooKeeper指定顺序号的znode。
  • 设置
    • 如果在创建znode时设置了顺序标识,那么该znode名称之后便会附加一个值,这个值由一个单调递增的计数器(由父节点维护)所添加的。
  • 举例
    • 如果一个客户端请求创建一个名为/a/b-的顺序znode,则所创建znode的名字可能是/a/b-3。如果稍后,另外一个名为/a/b-的顺序znode被创建,计数器会给出一个更大的值来保证znode名称的唯一性,例如:/a/b-5。在 Java 的 API 中,顺序 znode 的实际路径会作为 create() 调用的返回值被传回到客户端。
  • 应用
    • 在一个分布式系统中,顺序号可以被用于为所有的时间进行全局排序,这样客户端就可以通过顺序号来推断事件的顺序。今后的共享锁就是利用该原理。

观察

znode以某种方式发生变化时,观察(Watch)机制可以让客户端得到通知。可以针对Zookeeper服务的操作来设置观察,该服务的其他操作可以触发观察。

注意:

  • 观察只触发一次,为了得到多次收到通知,客户端需要重新注册所需的观察。

操作

如下表,Zookeeper中有9种基本操作。

操作 描述
create 创建一个 znode (必须要有父节点)
delete 删除一个 znode (该 znode不能有任何子节点)
exists 测试一个 znode 是否存在并且查询它的元数据
getACL,setACL 获取/设置一个 znode 的 ACL
getChildren 获取一个 znode 的子节点列表
getData,setData 获取/设置一个 znode 所保存的数据
sync 将客户端的 znode 视图与 Zookeeper 同步

Zookeeper 中的更新操作时有条件的,在使用deletesetData操作时必须提供被更新 znode 的版本号(可以通过 exists 操作获得)。如果版本号不匹配,则更新操作会失败。更新操作时非阻塞操作,因此一个更新失败的客户端(由于其他进程同时在更新同一个 znode)可以决定是否重试,或执行其他操作,并不会因此而阻塞其他进程的执行。

虽然 Zookeeper 可以被看作是一个文件系统,但出于简单性的需求,有一些文件系统的基本操作被它摒弃了。由于 Zookeeper 中的文件较小并且总是被整体读写,因此没有必要提供打开、关闭或查找操作。

API

对于 Zookeeper 客户端来说,主要由两种语言绑定 (binging) 可以使用:Java 和 C;当然也可以使用 Perl、Python 和 REST 的 contrib 绑定。对于每一种绑定语言来说,在执行操作时都可以选择同步执行或异步执行(提供两种不同的API)。

同步API与异步API的区别:

  • 同步API:使用同步API每个线程都会阻塞进程,知道该操作返回;
  • 异步API:允许以流水线方式处理请求,这在某些情况下可以提供更好的吞吐量。

观察触发器

existsgetChildrengetData这些读操作上可以设置观察,这些观察可以被写操作createdeletesetData 触发。ACL 相关的操作不参与触发任何观察。当一个观察被触发时会产生一个观察事件,这个观察和触发它的操作共同决定着观察事件的类型。

  • 当所观察的znode被创建子节点、删除或其他数据更新时,设置在exists操作上的观察将会被触发。
  • 当所观察的znode被删除或其更新时,设置在getData上的观察将会被触发,创建znode不会触发getData上的观察,因为getData操作成功执行的前提是znode必须已经在。
  • 当所观察的znode的一个子节点被创建或删除时,或观察的znode自己被删时,设置在getChildren操作上的观察将会被触发。

设置监视器的操作及对应的触发器

watch

  • NodeCreated:节点创建事件;
  • NodeDeleted:代表znode被删除事件;
  • NodeDataChanged:节点数据改变事件;
  • NodeChildrenChanged:节点的子节点改变事件;

注意:

  • 对于NodeCreated和NodeDeleted事件,可以通过路径来判断哪一个节点被创建或删除;
  • 对于NodeChildrenChanged事件,需要重新调用getChildren来获取新的子节点列表来判断哪一个子节点被修改;
  • 对于NodeDataChanged事件,需要调用getData来获取最新的数据;
  • 对于上述第二、三种情况,从收到观察事件到执行操作期间,znode的状态可能会发生变化。

ACL 列表

每个 znode 被创建时都会有一个 ACL 列表,用于决定谁可以对它执行何种操作。ACL 依赖于 Zookeeper 的客户端身份验证机制。Zookeeper 提供了一下几种身份验证方式:

  • digest :通过用户名和密码来识别客户端;
  • host:通过客户端的主机名(hostname)来识别客户端;
  • ip : 通过客户端的 IP 地址来识别客户端。

在建议一个 Zookeeper 会话之后,客户端可以对自己进行身份验证。虽然 znode 的 ACL 列表会要求所有的客户端是经过验证的,但 Zookeeper 的身份验证过程却是可选的,客户端必须自己进行身份验证来支持对 znode 的访问。

1
2
3
4
5
//使用digest模式(用户和密码)进行身份验证
zk.addAuthInfo("digest","tom:secret".getBytes());

//给域example.com下的客户端对某个znode的读权限,可以使用host模式、example.com的ID和READ权限在该znode上设置一个ACL
new ACL(Perms.READ,new Id("host","example.com"));

ACL权限如下表:

ACL权限 允许的操作
CREATE create(子节点)
READ getChildren/getData
WRITE setData
DELETE delete(子节点)
ADMIN setACL

实现

这里先介绍一下Zookeeper在实际环境中使用时两种不同的运行模式:

  1. 独立模式(standalone mode)
    • 只有一个ZooKeeper服务器,这种模式比较简单,适用于测试环境,但是不能保证高可用性和恢复性;
  2. 复制模式(replicated mode)
    • 运行于一个计算机集群上,这个计算机集群被称为一个”集合体“(ensemble),ZooKeeper通过复制模式来实现高可用性,只要集合体中有半数以上的机器处于可用状态,他就可以提供服务;
    • 对于一个有5个节点的集合体中,最多可以容忍两台机器出现故障,这里要注意的是对于6个节点的集合体也是只能够容忍2台机器出现故障。

ZooKeeper要做的事情就是:确保对znode树的每一个修改都会被复制到集合体中超过半数的机器上。如果少于半数的机器出现故障,则最少有一台机器会保存最新的状态,其余的副本最终也会更新到这个状态。

Zookeeper使用了Zab协议,该协议包括两个可以无限重复的阶段:

  1. 阶段1:leader选举
    • 集合体中的所有机器通过一个选择过程来选出一台被称为“领导者”(leader)的机器,其他的机器被称为”跟随者“(follower)。一旦半数以上(或指定数量)的follower已经将其状态与leader同步,则标明这个阶段已经完成。
  2. 阶段2: 原子广播
    1. 所有的写请求都会被转发给leader,再由leader将更新广播给follwer;
    2. 当半数以上的follower已经将修改持久化之后,leader才会提交这个更新,然后客户端才会收到一个更新成功的响应。
    3. 这个用来打成共识的协议被设计成具有原子性,因此每个修改要么成功要么失败。

注意:

  • 如果leader出现故障,其余的机器会选出另外一个leader,并和新的leader继续提供服务。之后,如果之前的leader恢复正常,它就变成了一个follower(leader选举工程很快,根据目前的结果,大概只需要200ms);
  • 在更新内存中的znode树之前,集合体中的所有机器都会被先将更新写入磁盘。

一致性

理解 Zookeeper 的实现基础有助于理解其服务所提供的一致性保证。在集合体中所使用的术语leader和follower是恰当的,它们表名了一点,即一个follower可能滞后于leader几个更新。这也表名了一个现实情况,在一个修改被提交之前,只需要集合体中半数以上机器已经将该修改持久化即可。对 Zookeeper 来说,理想的情况就是将客户端都连接到与leader状态一致的服务器上,每个客户端都有连接到leader,但客户端对此无法控制,甚至它自己都无法知道是否连接到leader。参见下图

service

每一个对 znode 树的更新都被赋予一个全局唯一的 ID,称为zxid (代表 “Zookeeper Transaction ID”)。Zookeeper决定了分布式系统中的顺序,它对所有的更新进行排序,如果 zxid z1 小于 z2,则 z1 一定发生在 z2 之前。

在 Zookeeper 的设计中,以下几点考虑保证了数据的一致性。

  1. 顺序一致性
    • 来自任意特定客户端的更新都会按其发送顺序被提交。也就是说,如果一个客户端将 znode z 的值更新为 a,在之后的操作中,它又将 z 的值更新为 b ,则没有客户端能够在看到 z 的值是 b 之后再看到值 a(如果没有其他对于 z 的更新)。
  2. 原子性
    • 更新要么成功,要么失败,不会存在部分成功或失败的结果。如果失败了,则不会有客户端看到这个更新的结果。
  3. 单一系统映像
    • 一个客户端无论连接到具体哪一台服务器上,它看到的都是同样的系统视图。这意味着,如果一个客户端在同一个会话中连接到一台新的服务器,它所看到的系统状态不会比在之前服务器上所看到的更老。当一台服务器出故障,导致它的一个客户端需要尝试连接集合体中其他的服务器时,所有状态滞后于故障服务器的服务器都不会接受该连接请求,除非这些服务器将状态赶上故障服务器。
  4. 持久性(可靠性)
    • 一个更改一旦成功,其结果就会被持久化并且不会被撤。这表明更新不会受到服务器故障的影响。
  5. 及时性
    • 任何客户端所看到的系统视图的滞后都是有限的,不会超过几十秒,这意味着与其允许一个客户端看到非常陈旧的数据,还不如将服务器关闭,强迫该客户端连接到到一个状态较新的服务器。

由于性能的原因,所有的读操作都是从 Zookeeper 服务器的内存获得数据,它们不参与写操作的全局排序。如果客户端之间通过 Zookeeper 之外的机制进行通信,则客户端可能会发现它们所看到的 Zookeeper 状态是不一致的。

可以使用sync操作,保证任何后续的操作都在服务器的sync操作完成之后才执行。客户端使用sync操作来使自己保持最新的状态。

会话

每个 Zookeeper 客户端的配置中都包括集合体中服务器的列表。在启动时,客户端会尝试连接到列表中的一台服务器。如果连接失败,它会尝试连接另一台服务器,以此类推,直到成功与一台服务器建立连接或因为所有 Zookeeper 服务器都不可用而失败。

一旦客户端与一台 Zookeeper 服务器建立连接,这台服务器就会为该客户端创建一个新的会话。每个会话都会有一个超时的时间设置,这个设置由创建会话的应用来设定。如果服务器在超过时间段内没有收到任何请求,则相应的会话会过期。一旦一个会话已经过期,就无法重新被打开,并且任何与该会话相关联的短暂 znode 都会丢失。会话通常会长期存在,而会话过期则是一种比较罕见的事件,但对于应用来说,如何处理会话过期仍是非常重要的。

只要一个会话空闲超过一定时间,都可以通过客户端发送 ping 请求(也称为心跳)来保持会话不过期。(ping 请求是由 Zookeeper 的客户端库自动发送,因此在你的代码中不需要考虑如何维护会话)。这个时间长度的设置应当足够低,以便能够检测出服务器故障(由读超时体现),并且能够在会话超时的时间段内重新连接到另外一台服务器。

Zookeeper 客户端可以自动地进行故障切换,切换至另一台 Zookeeper 服务器,并且关键的是,在另一台服务器接替故障服务器之后,所有的会话(和相关的短暂 znode)仍然是有效的。

在故障切换过程中,应用程序将收到断开连接和连接至服务的通知。当客户端断开连接时,观察通知将无法发送;但是当客户端成功恢复连接后,这些延迟的通知还会被发送。当然,在客户端重新连接至另一台服务器的过程中,如果应用程序试图执行一个操作,这个操作将会失败。这充分说明在真实的 Zookeeper 应用中处理连接丢失异常的重要性。

时间

在 Zookeeper 中有几个时间参数。滴答 (tickTime) 参数定义了 ZooKeeper 中的基本时间周期。

其他设置都是根据 滴答 (tickTime) 参数来定义的,或至少受它的限制。例如,会话超时 (session timeout) 参数的值不可以小于 2 个 滴答 (tickTime) 并且不可以大于 20 个 滴答 (tickTime)。如果你试图将会话超时参数设置在这个范围之外,它将会被自动修改到这个范围之内。

通常将 滴答 (tickTime) 参数设置为 2 秒 (2000毫秒),对应于允许的会话超时范围是 4 到 40 秒。在选择会话超时设置时有几点需要考虑。

状态

ZooKeeper 对象在其生命周期中会经历几种不同的状态,如下图。你可以在任何时刻通过 getState() 方法来查询对象的状态。

1
public Status getState()

Status被定义为代表Zookeeper对象在不同状态的枚举类型值(一个Zookeeper的实例在一个时刻只能处于一种状态)。

status

  • 一个新建的Zookeeper实例处于CONNECTING状态。
  • 一旦建立连接,他就会进入CONNECTED状态。
  • 一个对象在进入CONNECTED状态时,观察对象会收到一个WatchedEvent通知,其中KeeperState的值是SyncConnected
  • Zookeeper实例可以断开,然后重新连接到Zookeeper服务,此时它的状态就在CONNECTEDCONNECTING之间转换。
  • 如果close()方法被调用或出现会话超时,Zookeeper实例就会转换到第三个状态CLOSED。一旦处于CLOSED状态,Zookeeper对象就不再被认为是活跃的,并且不能再用。

Zookeeper的观察对象有两个作用:

  • 它可以用来获得Zookeeper状态变化的相关通知;
  • 它还可以用来获得znode变化的相关通知。

ZooKeeper应用

配置服务示例

配置服务是分布式系统应用所需要的基本服务之一,它可以使集群中的机器共享配置信息中的那些公共部分。也就是说,Zookeeper可以作为一个具有高可用性的配置服务存储器,允许分布式应用的参与者检索和更新配置文件。

这里我们编写这样一个应用示例(完整代码参考Zookeeper Update Example),这里有两个假设来简化我们的示例:

  1. 所需存储的配置数据是字符串,关键字是znode的路径,因此我们在znode上存储了一个键值对;
  2. 在任何时候只有一个客户端会执行更新操作。

首先我们在ActiveKeyValueStore的类中编写如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import groupexample.ConnectionWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.Charset;

public class ActiveKeyValueStore extends ConnectionWatcher {
private static final Charset CHARSET = Charset.forName("UTF-8");

public void write(String path, String value) throws InterruptedException, KeeperException {
Stat stat = zk.exists(path, false);

if (stat == null) {
zk.create(path, value.getBytes(CHARSET), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
zk.setData(path, value.getBytes(CHARSET), -1);
}
}

public String read(String path, Watcher watcher) throws InterruptedException, KeeperException {
byte[] data = zk.getData(path, watcher, null/*stat*/);
return new String(data, CHARSET);
}
}

这里面有两个关键方法:

  • write():将一个关键字及其值写入Zookeeper;
  • read():读取Zookeeper中的配置属性。

Zookeeper的getData()方法有三个参数:

  1. 路径;
  2. 一个观察对象;
  3. 一个Stat对象.

其中,Stat对象由getData()方法返回的值填充,用来将信息传回给调用者,通过这个方法,调用者可以获得一个znode的数据和元数据,但在本例中,由于我们对元数据不感兴趣,因此将Stat参数设为null。

下面我们编写一个用于更新配置属性值的类ConfigUpdater.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class ConfigUpdater {
public static final String PATH = "/matt";

private ActiveKeyValueStore store;
private Random random = new Random();

public ConfigUpdater(String hosts) throws IOException, InterruptedException {
store = new ActiveKeyValueStore();
store.connect(hosts);
}

public void run() throws InterruptedException, KeeperException {
while (true) {
String value = random.nextInt(100) + " ";
store.write(PATH, value);
System.out.printf("Set %s to %s \n", PATH, value);
TimeUnit.SECONDS.sleep(random.nextInt(10));
}
}
public static void main(String[] args) throws Exception{
ConfigUpdater configUpdater=new ConfigUpdater(args[0]);
configUpdater.run();
}
}

run()方法在随机时间将随机值更新到/mattznode中。

下面我们通过一个ConfigWatcher类初始化一个实例,然后在dirplayConfig()方法中调用read()显示它所读取到的配置信息的初始值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.io.IOException;

public class ConfigWatcher implements Watcher {
private ActiveKeyValueStore store;

public ConfigWatcher(String hosts) throws IOException, InterruptedException {
store = new ActiveKeyValueStore();
store.connect(hosts);
}

public void displayConfig() throws InterruptedException, KeeperException {
String value = store.read(ConfigUpdater.PATH, this);
System.out.printf("Read %s as %s.\n", ConfigUpdater.PATH, value);
}

@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
displayConfig();
} catch (InterruptedException e) {
System.out.println("Interrupted. exiting.");
Thread.currentThread().interrupt();
} catch (KeeperException e) {
System.out.printf("KeeperException: %s. Exiting.\n", e);
}
}
}

public static void main(String[] args) throws Exception {
ConfigWatcher configWatcher = new ConfigWatcher(args[0]);
configWatcher.displayConfig();

Thread.sleep(Long.MAX_VALUE);
}
}

运行命令:

1
2
3
# 两个控制台分别运行以下命令
java -cp zookeeperexample.jar updateexample.ConfigUpdater zkIP
java -cp zookeeperexample.jar updateexample.ConfigWatcher zkIp

这里要注意ConfigWatcher只能收到最近的一个更新,而不是收到所有的更新,每当ConfigWatcher调用时,就会收到最近的一个更新。

异常处理

在前面的两个例子,我们经常会看到InterruptedExceptionKeeperException这两种类型的异常,下面,我们就详细讲述一下。

InterruptedException异常

如果操作被中断,则会有一个InterruptedException异常。在Java中,有一个取消阻塞方法的标准机制,即针对存在阻塞方法的线程调用interrupt()。一个成功的取消操作将产生一个InterruptedException异常。Zookeeper也遵循这一机制,因此你可以使用这种方法来取消一个Zookeeper操作。使用了Zookeeper的类或者库时,通常就会传播InterruptedException异常,使客户端取消它们的操作。

InterruptedException异常并不意味着故障,只是表明相应的操作被取消了而已。

KeeperException异常

如果ZooKeeper服务器发出一个错误信号或与服务器存在通信问题,抛出的则是KeeperException异常。

  • 针对不同的错误情况,KeeperException异常存在不同的子类。
    例如: KeeperException.NoNodeExceptionKeeperException的一个子类,如果你试图针对一个不存在的znode执行操作,抛出的则是该异常。
  • 每一个KeeperException异常的子类都对应一个关于错误类型信息的代码。
    例如: KeeperException.NoNodeException异常的代码是KeeperException.Code.NONODE.

有两种方法被用来处理KeeperException异常:

  1. 捕捉KeeperException异常,并且通过检测它的代码来决定采取何种补救措施;
  2. 另一种是捕捉等价的KeeperException子类,并且在每段捕捉代码中执行相应的操作。

KeeperException异常分为三大类

1.状态异常

当一个操作因不能被应用于znode树而导致失败时,就会出现状态异常。状态异常产生的原因通常是在同一时间有另外一个进程正在修改znode。例如,如果一个znode先被另外一个进程更新了,根据版本号执行setData()操作的进程就会失败,并收到一个KeeperException.BadVersionException异常,这是因为版本号不匹配。程序员通常都知道这种冲突总是存在的,也都会编写代码来进行处理。

一些状态异常会指出程序中的错误,例如KeeperException.NoChildrenForEphemeralsException异常,试图在短暂znode下创建子节点时就会抛出该异常。

2.可恢复异常

可恢复的异常是指那些应用程序能够在同一个ZooKeeper会话中恢复的异常。一个可恢复的异常是通过KeeperException.ConnectionLossException来表示的,它意味着已经丢失了与ZooKeeper的连接。ZooKeeper会尝试重新连接,并且在大多数情况下重新连接会成功,并确保会话是完整的。

但是ZooKeeper不能判断与KeeperException.ConnectionLossException异常相关的操作是否成功执行。这种情况就是部分失败的一个例子。这时程序员有责任来解决这种不确定性,并且根据应用的情况来采取适当的操作。在这一点上,就需要对幂等(idempotent)操作非幂等(Nonidempotent)操作进行区分。

  • 幂等操作:指那些一次或多次执行都会产生相同结果的操作,例如读请求或无条件执行的setData操作。对于幂等操作,只需要简单地进行重试即可。
  • 非幂等操作:就不能盲目地进行重试,因为它们多次执行的结果与一次执行是完全不同的。程序可以通过在znode的路径和它的数据中编码信息来检测是否非幂等操怍的更新已经完成。

3.不可恢复的异常

在某些情况下,ZooKeeper会话会失效——也许因为超时或因为会话被关闭,两种情况下都会收到KeeperException.SessionExpiredException异常,或因为身份验证失败,KeeperException.AuthFailedException异常。无论上述哪种情况,所有与会话相关联的短暂znode都将丢失,因此应用程序需要在重新连接到ZooKeeper之前重建它的状态。

到这里,对Zookeeper的主要内容已经讲述差不多了,希望对大家能有所帮助。


参考: