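Now that Apache Pulsar has become an Apache top-level project, Apache BookKeeper, its storage layer, is once again drawing wide attention across the industry. BookKeeper had many successful uses before Pulsar. It has been used to implement the HA mechanism of the HDFS NameNode, although most companies probably still use the Quorum Journal Manager approach. It also underpins DistributedLog, the system open-sourced by Twitter (see the article on Twitter's open-source distributed high-performance log replication service). BookKeeper describes itself as "a scalable, fault-tolerant, and low-latency storage service optimized for real-time workloads". In effect it turns the storage layer into a service: BookKeeper is a lower-level storage service, comparable to the storage layer of Kafka. Distributed storage systems built on BookKeeper, including distributed message queues, can therefore focus on their application and feature layers, while the hard storage-layer problems such as consistency and fault tolerance are already solved by BookKeeper. In that sense BookKeeper does address real problems in the industry, and because its data is organized into Ledgers (a Ledger is roughly equivalent to a Kafka segment) it is naturally suited to cloud deployment, so it still has a lot of potential. I recently did some research on BookKeeper and summarized my findings. This article introduces Apache BookKeeper mainly from the perspective of cluster deployment and usage; a follow-up article will go deeper into its architecture and implementation.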

Introduction to BookKeeper

Let's start with BookKeeper's basic concepts. The figure below shows the BookKeeper architecture (image from Introduction to Apache BookKeeper):

Apache BookKeeper architecture diagram

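In BookKeeper a node (server) is called a Bookie, similar to a Broker in Kafka or a DataNode in HDFS. Unlike those systems, however, BookKeeper has no master node; it is a typical slave/slave architecture. Data is stored on Bookies in the form of Ledgers (similar to a Segment in Kafka or a Block in HDFS). The basic BookKeeper concepts are: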

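  1. Cluster: all Bookies together form a cluster (the set of Bookies connected to the same ZooKeeper address);
  2. Bookie: a BookKeeper storage node, i.e. a server node;
  3. Ledger: an abstraction of a log file. It is essentially a sequence of Entries (comparable to individual messages in Kafka), and clients write data to BookKeeper by writing to a Ledger;
  4. Entry: a single piece of data, identified by an id;
  5. Journal: the write-ahead log. Data is written to the Journal first; this is part of the mechanism BookKeeper uses to separate reads from writes, which will be analyzed in detail later;
  6. Ensemble: "Set of Bookies across which a ledger is striped", i.e. the set of Bookies a Ledger is spread over. When a Ledger is created, you specify how many Bookies it can be stored on;
  7. Write Quorum Size: "Number of replicas", the number of replicas each entry is written to;
  8. Ack Quorum Size: "Number of responses needed before client's write is satisfied". Success is returned to the client only after this many replicas have been written. For example, with 3 replicas and an ack quorum of 2, the client writes to all three replicas in parallel and considers the write successful once it has received two successful responses;
  9. LAC: Last Add Confirmed, the entry id of the most recent confirmed entry in the Ledger.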
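To make the relationship between Ensemble, Write Quorum and Ack Quorum concrete, here is a small toy model. It is a sketch under stated assumptions, not the BookKeeper client API: the class QuorumModel is hypothetical, and the striping formula (entry e is written to ensemble positions (e + i) % ensembleSize for i from 0 to writeQuorumSize - 1) is my assumption of a round-robin layout, not something taken from this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not part of the BookKeeper client API.
class QuorumModel {
    final int ensembleSize;
    final int writeQuorumSize;
    final int ackQuorumSize;

    QuorumModel(int ensembleSize, int writeQuorumSize, int ackQuorumSize) {
        // The sizes must satisfy ensemble >= write quorum >= ack quorum >= 1.
        if (ackQuorumSize < 1 || writeQuorumSize < ackQuorumSize || ensembleSize < writeQuorumSize) {
            throw new IllegalArgumentException("require ensemble >= writeQuorum >= ackQuorum >= 1");
        }
        this.ensembleSize = ensembleSize;
        this.writeQuorumSize = writeQuorumSize;
        this.ackQuorumSize = ackQuorumSize;
    }

    // Assumed round-robin striping: 0-based ensemble positions that store entryId.
    List<Integer> writeSet(long entryId) {
        List<Integer> positions = new ArrayList<>();
        for (int i = 0; i < writeQuorumSize; i++) {
            positions.add((int) ((entryId + i) % ensembleSize));
        }
        return positions;
    }

    // The client treats a write as successful once ackQuorumSize replicas have answered OK.
    boolean isAcked(int okResponses) {
        return okResponses >= ackQuorumSize;
    }

    public static void main(String[] args) {
        // The example from item 8: 3 replicas, success after 2 responses.
        QuorumModel threeReplicas = new QuorumModel(3, 3, 2);
        System.out.println(threeReplicas.writeSet(0)); // [0, 1, 2]
        System.out.println(threeReplicas.isAcked(1));  // false
        System.out.println(threeReplicas.isAcked(2));  // true

        // Ensemble of 3 but only 2 replicas per entry: entries are striped across the bookies.
        QuorumModel striped = new QuorumModel(3, 2, 2);
        System.out.println(striped.writeSet(0)); // [0, 1]
        System.out.println(striped.writeSet(1)); // [1, 2]
        System.out.println(striped.writeSet(2)); // [2, 0]
    }
}
```

When the write quorum is smaller than the ensemble, each Bookie in this model holds only part of the Ledger, which is what "striped" refers to in the Ensemble definition.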

Setting up a BookKeeper cluster

For setting up a BookKeeper cluster, see the Apache BookKeeper Manual deployment guide.

Before you start

A BookKeeper cluster requires:

  1. A ZooKeeper cluster;
  2. Some Bookie nodes (for a cluster deployment, preferably three);
  3. JDK 8.

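First, a look at BookKeeper's directory layout, which is similar to other distributed systems: commands are in bin, configuration files are in conf, and lib holds the jar files it depends on: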

[matt@XXX2 bookkeeper]$ ll
total 64
drwxr-xr-x 2 matt matt 4096 Sep 20 18:35 bin
drwxr-xr-x 2 matt matt 4096 Sep 20 18:35 conf
drwxrwxr-x 9 matt matt 4096 Oct 9 21:41 deps
drwxrwxr-x 2 matt matt 12288 Oct 9 21:41 lib
-rw-r--r-- 1 matt matt 24184 Sep 20 18:35 LICENSE
-rw-r--r-- 1 matt matt 5114 Sep 20 18:35 NOTICE
-rw-r--r-- 1 matt matt 4267 Sep 20 18:35 README.md

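The bin directory provides BookKeeper's operational commands. The ones used here are mainly bin/bookkeeper* (bookkeeper-daemon.sh runs the Bookie process in the background). Common settings such as JAVA_HOME can be configured in bin/common.sh. For the usage of the bookkeeper command, see bookkeeper cli: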

[matt@XXX2 bookkeeper]$ ll bin/
total 56
-rwxr-xr-x 1 matt matt 2319 Sep 20 18:35 bkctl
-rwxr-xr-x 1 matt matt 5874 Sep 20 18:35 bookkeeper
-rwxr-xr-x 1 matt matt 2869 Sep 20 18:35 bookkeeper-cluster.sh
-rwxr-xr-x 1 matt matt 4590 Sep 20 18:35 bookkeeper-daemon.sh
-rwxr-xr-x 1 matt matt 7785 Sep 20 18:35 common.sh
-rwxr-xr-x 1 matt matt 4575 Sep 20 18:35 dlog
-rwxr-xr-x 1 matt matt 1738 Sep 20 18:35 standalone
-rwxr-xr-x 1 matt matt 5128 Sep 20 18:35 standalone.docker-compose
-rwxr-xr-x 1 matt matt 1854 Sep 20 18:35 standalone.process

The bookkeeper command also provides a shell subcommand with a very rich set of commands (see BookKeeper Shell):

[matt@XXX2 bookkeeper]$ bin/bookkeeper shell
Usage: bookkeeper shell [-localbookie [<host:port>]] [-ledgeridformat <hex/long/uuid>] [-entryformat <hex/string>] [-conf configuration] <command>
where command is one of:
autorecovery [-enable|-disable]
bookieformat [-nonInteractive] [-force] [-deleteCookie]
bookieinfo
bookiesanity [-entries N] [-timeout N]
convert-to-db-storage
convert-to-interleaved-storage
decommissionbookie [-bookieid <bookieaddress>]
deleteledger -ledgerid <ledgerid> [-force]
help [COMMAND]
initbookie
initnewcluster
lastmark
ledger [-m] <ledger_id>
ledgermetadata -ledgerid <ledgerid>
listbookies [-readwrite|-readonly] [-hostnames]
listfilesondisc [-journal|-entrylog|-index]
listledgers [-meta] [-bookieid <bookieaddress>]
listunderreplicated [[-missingreplica <bookieaddress>] [-excludingmissingreplica <bookieaddress>]] [-printmissingreplica] [-printreplicationworkerid]
lostbookierecoverydelay [-get|-set <value>]
metaformat [-nonInteractive] [-force]
nukeexistingcluster -zkledgersrootpath <zkledgersrootpath> [-instanceid <instanceid> | -force]
readjournal [-dir] [-msg] <journal_id | journal_file_name>
readledger [-bookie <address:port>] [-msg] -ledgerid <ledgerid> [-firstentryid <firstentryid> [-lastentryid <lastentryid>]] [-force-recovery]
readlog [-msg] <entry_log_id | entry_log_file_name> [-ledgerid <ledgerid> [-entryid <entryid>]] [-startpos <startEntryLogBytePos> [-endpos <endEntryLogBytePos>]]
readlogmetadata <entry_log_id | entry_log_file_name>
rebuild-db-ledger-locations-index
recover [-deleteCookie] <bookieSrc[:bookieSrc]>
simpletest [-ensemble N] [-writeQuorum N] [-ackQuorum N] [-numEntries N]
triggeraudit
updatecookie [-bookieId <hostname|ip>] [-expandstorage] [-list] [-delete <force>]
updateledgers -bookieId <hostname|ip> [-updatespersec N] [-limit N] [-verbose true/false] [-printprogress N]
whatisinstanceid
whoisauditor

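The conf directory contains BookKeeper's configuration, shown below. Most settings live in bk_server.conf, which offers a very large number of options; for the full list of parameters see BookKeeper Config: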

[matt@XXX2 bookkeeper]$ ll conf/
total 84
-rw-r--r-- 1 matt matt 1804 Sep 20 18:35 bk_cli_env.sh
-rw-r--r-- 1 matt matt 2448 Sep 20 18:35 bkenv.sh
-rwxr-xr-x 1 matt matt 42269 Sep 20 18:35 bk_server.conf
-rw-r--r-- 1 matt matt 1211 Sep 20 18:35 jaas_example.conf
-rw-r--r-- 1 matt matt 2311 Sep 20 18:35 log4j.cli.properties
-rw-r--r-- 1 matt matt 2881 Sep 20 18:35 log4j.properties
-rw-r--r-- 1 matt matt 1810 Sep 20 18:35 log4j.shell.properties
-rw-r--r-- 1 matt matt 1117 Sep 20 18:35 nettyenv.sh
-rwxr-xr-x 1 matt matt 1300 Sep 20 18:35 standalone.conf
-rw-r--r-- 1 matt matt 3275 Sep 20 18:35 zookeeper.conf
-rw-r--r-- 1 matt matt 843 Sep 20 18:35 zookeeper.conf.dynamic

Deploying the cluster

Download the latest BookKeeper package from Apache BookKeeper Releases (bookkeeper-server-4.8.0-bin.tar.gz is used as the example here).

After extracting the package into the target directory, starting the cluster takes the following steps:

  1. Edit the relevant settings (zkServers, bookiePort, journalDirectories, ledgerDirectories, etc.);
  2. Start the Bookie process on each machine (with ./bin/bookkeeper-daemon.sh start bookie);
  3. Once all Bookies are up, pick any one of them and initialize the cluster metadata with ./bin/bookkeeper shell metaformat (this only needs to be done once).
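Step 1 is easy to get wrong, so it can help to check the configuration before starting a Bookie. The helper below is hypothetical and not shipped with BookKeeper. It assumes bk_server.conf can be read as a plain key=value properties file, and it checks zkServers, bookiePort, journalDirectories and ledgerDirectories; verify these key names against the bk_server.conf of your version. The values in main are placeholders.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight helper; not part of BookKeeper.
class ConfPreflight {
    // Key names to check; verify them against the bk_server.conf of your version.
    static final String[] REQUIRED = {"zkServers", "bookiePort", "journalDirectories", "ledgerDirectories"};

    // Returns the required keys that are absent or blank in a key=value config text.
    static List<String> missingKeys(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory StringReader
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder values; ledgerDirectories is deliberately left out.
        String conf = "zkServers=zk01:2181,zk02:2181,zk03:2181\n"
                + "bookiePort=3181\n"
                + "journalDirectories=/tmp/bk-txn\n";
        System.out.println(missingKeys(conf)); // [ledgerDirectories]
    }
}
```

Against a real installation, the text could be read from conf/bk_server.conf with java.nio.file.Files before calling missingKeys().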

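Note that a running Bookie process does not by itself prove that startup succeeded: if there are exception logs, it may have failed. When the Bookie starts normally, the log contains a line like the following: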

2018-10-15 11:24:49,549 - INFO  [main:ComponentStarter@81] - Started component bookie-server.

Admin REST API

The BookKeeper service provides a REST API for administrators; see BookKeeper Admin REST API for details. To use it, first set httpServerEnabled to true in the Bookie's bk_server.conf; for the related settings see Http server settings.

Pitfalls during installation

Setting up the BookKeeper cluster was not as smooth as I expected; I ran into a few small problems, recorded below:

Problem 1: the Bookie fails to restart after changing the configuration

I stopped the Bookie process with ./bin/bookkeeper-daemon.sh stop bookie. When I tried to start it again afterwards, it failed with the following error:

2018-10-13 21:05:40,674 - ERROR [main:Main@221] - Failed to build bookie server
org.apache.bookkeeper.bookie.BookieException$InvalidCookieException: instanceId 406a08e5-911e-4ab6-b97b-40e4a56279a8 is not matching with null
at org.apache.bookkeeper.bookie.Cookie.verifyInternal(Cookie.java:142)
at org.apache.bookkeeper.bookie.Cookie.verify(Cookie.java:147)
at org.apache.bookkeeper.bookie.Bookie.verifyAndGetMissingDirs(Bookie.java:381)
at org.apache.bookkeeper.bookie.Bookie.checkEnvironmentWithStorageExpansion(Bookie.java:444)
at org.apache.bookkeeper.bookie.Bookie.checkEnvironment(Bookie.java:262)
at org.apache.bookkeeper.bookie.Bookie.<init>(Bookie.java:646)
at org.apache.bookkeeper.proto.BookieServer.newBookie(BookieServer.java:133)
at org.apache.bookkeeper.proto.BookieServer.<init>(BookieServer.java:102)
at org.apache.bookkeeper.server.service.BookieService.<init>(BookieService.java:43)
at org.apache.bookkeeper.server.Main.buildBookieServer(Main.java:299)
at org.apache.bookkeeper.server.Main.doMain(Main.java:219)
at org.apache.bookkeeper.server.Main.main(Main.java:201)

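Roughly, this means that the instanceId currently registered in ZooKeeper is 406a08e5-911e-4ab6-b97b-40e4a56279a8, while the Cookie being checked has an instanceId of null, so verification fails and the process cannot start. The instanceId is created in step 3 of the cluster setup (where the cluster metadata is initialized).

If the test client program is run at this point, it throws the exception below. Only 2 Bookies in the cluster are currently available, while the default ensSize is 3 (with writeQuorumSize 2 and ackQuorumSize 2). When the test client creates a new Ledger, the 2 available Bookies cannot satisfy this requirement, so the following exception is thrown (the trailing NullPointerException presumably comes from the test program itself, which goes on to use the null LedgerHandle returned when creation fails):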

org.apache.bookkeeper.client.BKException$BKNotEnoughBookiesException: Not enough non-faulty bookies available
at org.apache.bookkeeper.client.SyncCallbackUtils.finish(SyncCallbackUtils.java:83)
at org.apache.bookkeeper.client.SyncCallbackUtils$SyncCreateCallback.createComplete(SyncCallbackUtils.java:106)
at org.apache.bookkeeper.client.LedgerCreateOp.createComplete(LedgerCreateOp.java:238)
at org.apache.bookkeeper.client.LedgerCreateOp.initiate(LedgerCreateOp.java:142)
at org.apache.bookkeeper.client.BookKeeper.asyncCreateLedger(BookKeeper.java:891)
at org.apache.bookkeeper.client.BookKeeper.createLedger(BookKeeper.java:975)
at org.apache.bookkeeper.client.BookKeeper.createLedger(BookKeeper.java:930)
at org.apache.bookkeeper.client.BookKeeper.createLedger(BookKeeper.java:911)
at com.matt.test.bookkeeper.ledger.LedgerTest.createLedgerSync(LedgerTest.java:110)
at com.matt.test.bookkeeper.ledger.LedgerTest.main(LedgerTest.java:25)
Exception in thread "main" java.lang.NullPointerException
at com.matt.test.bookkeeper.ledger.LedgerTest.main(LedgerTest.java:26)
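The failure above boils down to a simple precondition: a new Ledger needs at least ensSize writable Bookies. The sketch below illustrates that check; it is hypothetical and is not BookKeeper's placement policy. It picks candidates in list order for determinism, whereas a real placement policy may choose randomly or by rack, and the bookie addresses are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of ensemble selection; not BookKeeper's placement policy.
class EnsembleSelection {
    static List<String> selectEnsemble(List<String> writableBookies, Set<String> excluded, int ensembleSize) {
        List<String> candidates = new ArrayList<>();
        for (String bookie : writableBookies) {
            if (!excluded.contains(bookie)) {
                candidates.add(bookie);
            }
        }
        if (candidates.size() < ensembleSize) {
            // The real client surfaces this case as BKNotEnoughBookiesException.
            throw new IllegalStateException("Not enough bookies: need " + ensembleSize
                    + ", have " + candidates.size());
        }
        // Deterministic choice for illustration only.
        return new ArrayList<>(candidates.subList(0, ensembleSize));
    }

    public static void main(String[] args) {
        // One of three bookies failed to start, as in Problem 1.
        List<String> up = Arrays.asList("bk1:3181", "bk2:3181");
        try {
            selectEnsemble(up, new HashSet<>(), 3); // ensSize 3, the default used above
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Not enough bookies: need 3, have 2
        }
    }
}
```

Until the third Bookie is back, creating the Ledger with a smaller ensemble (for example through the asyncCreateLedger(ensSize, qSize, ...) overload shown later) would be the only way to avoid this error.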

Googling this BookieException$InvalidCookieException turned up no solution, so I went straight to the code. The exception is thrown here:

private void verifyInternal(Cookie c, boolean checkIfSuperSet) throws BookieException.InvalidCookieException {
    String errMsg;
    if (c.layoutVersion < 3 && c.layoutVersion != layoutVersion) {
        errMsg = "Cookie is of too old version " + c.layoutVersion;
        LOG.error(errMsg);
        throw new BookieException.InvalidCookieException(errMsg);
    } else if (!(c.layoutVersion >= 3 && c.bookieHost.equals(bookieHost)
            && c.journalDirs.equals(journalDirs) && verifyLedgerDirs(c, checkIfSuperSet))) {
        errMsg = "Cookie [" + this + "] is not matching with [" + c + "]";
        throw new BookieException.InvalidCookieException(errMsg);
    } else if ((instanceId == null && c.instanceId != null)
            || (instanceId != null && !instanceId.equals(c.instanceId))) {
        // instanceId should be same in both cookies
        errMsg = "instanceId " + instanceId
                + " is not matching with " + c.instanceId;
        throw new BookieException.InvalidCookieException(errMsg); // the instanceIds differ, so this exception is thrown
    }
}

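Here the instanceId obtained from ZooKeeper is 406a08e5-911e-4ab6-b97b-40e4a56279a8, while the instanceId in Cookie instance c is null. So how is this Cookie initialized? Tracing the code upward shows that when a Bookie is initialized it checks its runtime environment, and at that point it builds Cookie objects from current/VERSION in journalDirectories and ledgerDirectories. Because this machine had been started before, these files already existed, with the following contents: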

[matt@XXX2 bookkeeper]$ cat /tmp/bk-data/current/VERSION
4
bookieHost: "XXX:3181"
journalDir: "/tmp/bk-txn"
ledgerDirs: "1\t/tmp/bk-data"
[matt@XXX2 bookkeeper]$ cat /tmp/bk-txn/current/VERSION
4
bookieHost: "XXX:3181"
journalDir: "/tmp/bk-txn"
ledgerDirs: "1\t/tmp/bk-data"

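The code that parses a Cookie is shown below. The entry point here, readFromRegistrationManager(), reads the cookie stored in the registration manager (ZooKeeper); as far as I can tell, the cookie in a directory's current/VERSION file goes through the same parse() method: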

/**
 * Read cookie from registration manager for a given bookie <i>address</i>.
 *
 * @param rm registration manager
 * @param address bookie address
 * @return versioned cookie object
 * @throws BookieException when fail to read cookie
 */
public static Versioned<Cookie> readFromRegistrationManager(RegistrationManager rm,
        BookieSocketAddress address) throws BookieException {
    Versioned<byte[]> cookieData = rm.readCookie(address.toString());
    try {
        try (BufferedReader reader = new BufferedReader(
                new StringReader(new String(cookieData.getValue(), UTF_8)))) {
            Builder builder = parse(reader);
            Cookie cookie = builder.build();
            return new Versioned<Cookie>(cookie, cookieData.getVersion());
        }
    } catch (IOException ioe) {
        throw new InvalidCookieException(ioe);
    }
}

private static Builder parse(BufferedReader reader) throws IOException {
    Builder cBuilder = Cookie.newBuilder();
    int layoutVersion = 0;
    String line = reader.readLine();
    if (null == line) {
        throw new EOFException("Exception in parsing cookie");
    }
    try {
        layoutVersion = Integer.parseInt(line.trim());
        cBuilder.setLayoutVersion(layoutVersion);
    } catch (NumberFormatException e) {
        throw new IOException("Invalid string '" + line.trim()
                + "', cannot parse cookie.");
    }
    if (layoutVersion == 3) {
        cBuilder.setBookieHost(reader.readLine());
        cBuilder.setJournalDirs(reader.readLine());
        cBuilder.setLedgerDirs(reader.readLine());
    } else if (layoutVersion >= 4) { // the layout version here is 4 (see the VERSION files above)
        CookieFormat.Builder cfBuilder = CookieFormat.newBuilder();
        TextFormat.merge(reader, cfBuilder);
        CookieFormat data = cfBuilder.build();
        cBuilder.setBookieHost(data.getBookieHost());
        cBuilder.setJournalDirs(data.getJournalDir());
        cBuilder.setLedgerDirs(data.getLedgerDirs());
        // Since InstanceId is optional
        if (null != data.getInstanceId() && !data.getInstanceId().isEmpty()) {
            // only reached if the file has a non-empty instanceId field;
            // otherwise the Cookie's instanceId stays null
            cBuilder.setInstanceId(data.getInstanceId());
        }
    }
    return cBuilder;
}
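parse() only sets instanceId when the field is present and non-empty, which is why the Cookie built from these VERSION files carries a null instanceId. The failure can be reproduced without a cluster using the simplified, hypothetical reader below. VersionFileSketch is my own name. The real Cookie.parse() uses protobuf TextFormat, whereas this sketch only splits flat `key: "value"` lines like the VERSION files shown earlier, and then applies the same "both null or both equal" instanceId rule as verifyInternal().

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Simplified, hypothetical reader for the cookie text in current/VERSION.
// The real Cookie.parse() uses protobuf TextFormat; this only handles flat `key: "value"` lines.
class VersionFileSketch {
    static Map<String, String> parse(String text) {
        String[] lines = text.split("\n");
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("layoutVersion", lines[0].trim()); // first line, e.g. "4"
        for (int i = 1; i < lines.length; i++) {
            String line = lines[i].trim();
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue;
            }
            String key = line.substring(0, colon).trim();
            String value = line.substring(colon + 1).trim();
            if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
                value = value.substring(1, value.length() - 1); // drop the surrounding quotes
            }
            // Like the real parser, treat an empty instanceId as "not set".
            if (key.equals("instanceId") && value.isEmpty()) {
                continue;
            }
            fields.put(key, value);
        }
        return fields;
    }

    // Same rule as verifyInternal(): the ids match only if both are null or both are equal.
    static boolean instanceIdMatches(String fromZk, String fromFile) {
        return Objects.equals(fromZk, fromFile);
    }

    public static void main(String[] args) {
        String version = "4\n"
                + "bookieHost: \"XXX:3181\"\n"
                + "journalDir: \"/tmp/bk-txn\"\n"
                + "ledgerDirs: \"1\\t/tmp/bk-data\"\n";
        String zkInstanceId = "406a08e5-911e-4ab6-b97b-40e4a56279a8";
        Map<String, String> cookie = parse(version);
        System.out.println(cookie.get("instanceId"));                                  // null
        System.out.println(instanceIdMatches(zkInstanceId, cookie.get("instanceId"))); // false

        // Appending the cluster's instanceId makes the check pass.
        String fixed = version + "instanceId: \"" + zkInstanceId + "\"\n";
        System.out.println(instanceIdMatches(zkInstanceId, parse(fixed).get("instanceId"))); // true
    }
}
```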

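The fix is simple: add the corresponding instanceId field to current/VERSION (the cluster's value appears in the error message, and bin/bookkeeper shell whatisinstanceid should also print it), and the Bookie then starts successfully. Some questions still need to be considered, though: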

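  • What is instanceId for? It is set when the cluster is initialized. My guess is that it serves as a simple authentication step for nodes coming online: to add a new Bookie to the cluster, you need to know the current instanceId before it can join;
  • What does the Bookie startup flow look like? Answering this requires reading the code and tracing the Bookie startup process.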

Using the BookKeeper API

BookKeeper provides three APIs in total:

  1. "The ledger API is a lower-level API that enables you to interact with ledgers directly": a relatively low-level API that works with Ledgers directly; see The Ledger API;
  2. "The Ledger Advanced API is an advanced extension to Ledger API to provide more flexibilities to applications": a more advanced API that offers some additional features; see The Advanced Ledger API;
  3. "The DistributedLog API is a higher-level API that provides convenient abstractions": the API for working with DistributedLog; see DistributedLog.

This section focuses on the first API, briefly covers the second, and does not cover the third.

The Ledger API

The basic operations of the Ledger API are:

  1. Create a Ledger;
  2. Write data (Entries) to a Ledger;
  3. Close a Ledger: once a Ledger is closed, no more data can be written to it and its data becomes immutable;
  4. Read data from a Ledger;
  5. Delete a Ledger.
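The lifecycle above can be captured in a few lines with an in-memory toy. ToyLedgerStore is hypothetical and is not the BookKeeper API: there is no replication, password or LAC here. It only mirrors the rules listed above: entry ids are assigned 0, 1, 2, ..., a closed Ledger rejects writes but can still be read, and a deleted Ledger is gone.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory toy of create/add/close/read/delete; not the BookKeeper API.
class ToyLedgerStore {
    private static class Ledger {
        final List<byte[]> entries = new ArrayList<>();
        boolean closed;
    }

    private final Map<Long, Ledger> ledgers = new HashMap<>();
    private long nextLedgerId = 0;

    long create() {
        long id = nextLedgerId++;
        ledgers.put(id, new Ledger());
        return id;
    }

    // Appends an entry and returns its entry id (0, 1, 2, ...).
    long add(long ledgerId, byte[] data) {
        Ledger ledger = get(ledgerId);
        if (ledger.closed) {
            throw new IllegalStateException("ledger " + ledgerId + " is closed");
        }
        ledger.entries.add(data.clone());
        return ledger.entries.size() - 1;
    }

    // After close() the ledger's contents are immutable.
    void close(long ledgerId) {
        get(ledgerId).closed = true;
    }

    byte[] read(long ledgerId, long entryId) {
        return get(ledgerId).entries.get((int) entryId).clone();
    }

    void delete(long ledgerId) {
        if (ledgers.remove(ledgerId) == null) {
            throw new IllegalArgumentException("no ledger " + ledgerId);
        }
    }

    private Ledger get(long ledgerId) {
        Ledger ledger = ledgers.get(ledgerId);
        if (ledger == null) {
            throw new IllegalArgumentException("no ledger " + ledgerId);
        }
        return ledger;
    }

    public static void main(String[] args) {
        ToyLedgerStore store = new ToyLedgerStore();
        long lid = store.create();
        store.add(lid, "hello".getBytes(StandardCharsets.UTF_8));            // entry 0
        long e1 = store.add(lid, "world".getBytes(StandardCharsets.UTF_8));  // entry 1
        store.close(lid);
        System.out.println(e1);                                                     // 1
        System.out.println(new String(store.read(lid, 0), StandardCharsets.UTF_8)); // hello
        try {
            store.add(lid, "late".getBytes(StandardCharsets.UTF_8));
        } catch (IllegalStateException expected) {
            System.out.println("rejected: " + expected.getMessage()); // rejected: ledger 0 is closed
        }
        store.delete(lid);
    }
}
```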

All of these operations require a BookKeeper client to be initialized first, so let's walk through them step by step.

Initializing the BookKeeper client

The BK client is initialized with the ZooKeeper address and connects to the BK cluster through ZooKeeper:

// bkClient is assumed to be a field of type BookKeeper, used by the examples below.

// Option 1: create the client from a ZooKeeper connection string
try {
    String connectionString = zkAddr; // e.g. "localhost:2181" for a single-node, local ZooKeeper cluster
    bkClient = new BookKeeper(connectionString);
    logger.info("BookKeeper client init success.");
} catch (InterruptedException | IOException | BKException e) {
    throw new RuntimeException(
            "An exception was thrown while creating the BookKeeper client.", e);
}

// Option 2: create the client from a ClientConfiguration
try {
    ClientConfiguration config = new ClientConfiguration();
    config.setZkServers(zkAddr);
    config.setAddEntryTimeout(2000);
    bkClient = new BookKeeper(config);
    logger.info("BookKeeper client init success.");
} catch (InterruptedException | IOException | BKException e) {
    throw new RuntimeException(
            "An exception was thrown while creating the BookKeeper client.", e);
}

Creating a Ledger

A Ledger can be created synchronously or asynchronously (a password must be supplied at creation time). The two implementations are:

/**
 * Create a ledger synchronously. createLedger(digestType, passwd) uses the defaults:
 * ensemble size 3, write quorum size 2, ack quorum size 2.
 *
 * @param pw password
 * @return the LedgerHandle, or null if creation failed
 */
public LedgerHandle createLedgerSync(String pw) {
    byte[] password = pw.getBytes();
    try {
        return bkClient.createLedger(BookKeeper.DigestType.MAC, password);
    } catch (BKException | InterruptedException e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * Create a ledger asynchronously.
 *
 * @param pw password
 */
public void createLedgerAsync(String pw) {

    class LedgerCreationCallback implements AsyncCallback.CreateCallback {

        @Override
        public void createComplete(int returnCode, LedgerHandle handle, Object ctx) {
            // The callback is also invoked on failure, so check the return code first.
            if (returnCode == BKException.Code.OK) {
                logger.info("Ledger " + handle.getId() + " successfully created async.");
            } else {
                logger.error("Ledger creation failed: " + BKException.getMessage(returnCode));
            }
        }
    }

    bkClient.asyncCreateLedger(
            3, // ensSize
            2, // writeQuorumSize and ackQuorumSize
            BookKeeper.DigestType.MAC,
            pw.getBytes(),
            new LedgerCreationCallback(),
            "some context"
    );
}

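Creating a Ledger returns a LedgerHandle instance, and all operations on the Ledger go through this object. LedgerHandle.getId() returns the Ledger's id, which identifies a specific Ledger; to read the data later, open a LedgerHandle for that ledger id (for example with bkClient.openLedger(ledgerId, digestType, password)).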

Writing data to a Ledger

With the Ledger's LedgerHandle, you can write data to it directly with addEntry():

/**
 * Append an entry to the ledger.
 *
 * @return the entry id assigned to the new entry, or -1 if the write failed
 */
public long addEntry(LedgerHandle ledgerHandle, String msg) {
    try {
        return ledgerHandle.addEntry(msg.getBytes());
    } catch (InterruptedException | BKException e) {
        e.printStackTrace();
    }
    return -1;
}

Reading data from a Ledger

Reading from a Ledger is also done through methods of the LedgerHandle instance, in one of three ways:

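  1. Read a specified range of entry ids;
  2. Read from a given entry id up to the LAC (LastAddConfirmed, the most recent confirmed entry in the Ledger);
  3. Read from a given entry id up to lastEntryIdExpectedToRead, which may be beyond the LAC, provided that entries already exist at those positions.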
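The difference between the three read modes comes down to where the upper bound may lie. The model below is hypothetical and is not the LedgerHandle API. It assumes what the list above describes: ranges are inclusive, the confirmed read path must not go past the LAC, and the unconfirmed path may go past it only as far as entries actually exist.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the three read modes; not the LedgerHandle API.
class ReadRangeModel {
    private final List<String> stored = new ArrayList<>(); // entries already written to bookies
    private long lastAddConfirmed = -1;                     // -1 until something is confirmed

    void append(String entry) {
        stored.add(entry);
    }

    void confirmUpTo(long entryId) {
        if (entryId >= stored.size()) {
            throw new IllegalArgumentException("entry " + entryId + " has not been written");
        }
        lastAddConfirmed = Math.max(lastAddConfirmed, entryId); // the LAC only moves forward
    }

    long getLastAddConfirmed() {
        return lastAddConfirmed;
    }

    // Modes 1 and 2: inclusive range [first, last] that must not go past the LAC.
    List<String> readConfirmed(long first, long last) {
        checkRange(first, last);
        if (last > lastAddConfirmed) {
            throw new IllegalArgumentException("entry " + last + " is beyond LAC " + lastAddConfirmed);
        }
        return new ArrayList<>(stored.subList((int) first, (int) last + 1));
    }

    // Mode 3: may go past the LAC, but every entry in the range must already exist.
    List<String> readUnconfirmed(long first, long last) {
        checkRange(first, last);
        if (last >= stored.size()) {
            throw new IllegalArgumentException("entry " + last + " does not exist yet");
        }
        return new ArrayList<>(stored.subList((int) first, (int) last + 1));
    }

    private static void checkRange(long first, long last) {
        if (first < 0 || first > last) {
            throw new IllegalArgumentException("invalid range [" + first + ", " + last + "]");
        }
    }

    public static void main(String[] args) {
        ReadRangeModel ledger = new ReadRangeModel();
        ledger.append("e0");
        ledger.append("e1");
        ledger.append("e2");
        ledger.confirmUpTo(1); // e2 is written but not yet confirmed
        System.out.println(ledger.readConfirmed(0, ledger.getLastAddConfirmed())); // [e0, e1]
        System.out.println(ledger.readUnconfirmed(0, 2));                            // [e0, e1, e2]
    }
}
```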

The implementations are:

/**
 * Read entries from startId to endId (both inclusive).
 *
 * @param ledgerHandle the ledger
 * @param startId first entry id
 * @param endId last entry id, must not be greater than the LAC
 * @return the entries, or null if an exception occurred
 */
public Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle, long startId, long endId) {
    try {
        return ledgerHandle.readEntries(startId, endId);
    } catch (InterruptedException | BKException e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * Read entries from 0 to the LAC.
 *
 * @param ledgerHandle the ledger
 * @return the entries, or null if an exception occurred
 */
public Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle) {
    long lac = ledgerHandle.getLastAddConfirmed();
    if (lac < 0) {
        // The LAC is -1 while no entry has been confirmed, so there is nothing to read.
        return java.util.Collections.emptyEnumeration();
    }
    try {
        return ledgerHandle.readEntries(0, lac);
    } catch (InterruptedException | BKException e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * Read entries from 0 to lastEntryIdExpectedToRead, which can be larger than the LastAddConfirmed.
 *
 * @param ledgerHandle the handle
 * @param lastEntryIdExpectedToRead the last entry id
 * @return the entries, or null if an exception occurred
 */
public Enumeration<LedgerEntry> readEntry(LedgerHandle ledgerHandle,
        long lastEntryIdExpectedToRead) {
    try {
        return ledgerHandle.readUnconfirmedEntries(0, lastEntryIdExpectedToRead);
    } catch (InterruptedException | BKException e) {
        e.printStackTrace();
    }
    return null;
}

Deleting a Ledger

Deleting a Ledger is just as concise:

/**
 * Delete the ledger.
 *
 * @param ledgerId the ledger id
 * @return true on success, false if an exception occurred
 */
public boolean deleteLedger(long ledgerId) {
    try {
        bkClient.deleteLedger(ledgerId);
        return true;
    } catch (InterruptedException | BKException e) {
        e.printStackTrace();
    }
    return false;
}

The Ledger Advanced API

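The Ledger Advanced API is used much like the API above, but it gives applications more flexibility: for example, an application can specify the LedgerId when creating a Ledger and the EntryId when writing an Entry.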

Creating a Ledger

For creating Ledgers, the Advanced API can create a Ledger with a specified LedgerId, as in the third method of the example below.

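Suppose the LedgerIds in the BK cluster have reached 5. If no LedgerId is specified when creating a new Ledger, the next LedgerId used will be 6; if the application specifies 7, the new Ledger gets id 7, and id 6 will be used the next time an id is generated.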
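The id example above can be replayed with a toy allocator. ToyLedgerIdAllocator is hypothetical and is not BookKeeper's ledger id generator. It reproduces only what the paragraph states: an explicit id does not consume the counter, and requesting an id that already exists fails, as the createLedger(password, ledgerId) Javadoc below notes. The rule that automatic allocation skips ids claimed explicitly is my own assumption, added so the model never hands out a duplicate.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy allocator reproducing the example above; not BookKeeper's id generator.
class ToyLedgerIdAllocator {
    private final Set<Long> used = new HashSet<>();
    private long nextAutoId;

    ToyLedgerIdAllocator(long nextAutoId) {
        this.nextAutoId = nextAutoId;
        for (long id = 0; id < nextAutoId; id++) {
            used.add(id); // ids below nextAutoId were already handed out
        }
    }

    // createLedger(...) without an id: take the next value of the counter.
    long createAuto() {
        // Assumption: skip ids an application already claimed explicitly.
        while (used.contains(nextAutoId)) {
            nextAutoId++;
        }
        used.add(nextAutoId);
        return nextAutoId++;
    }

    // createLedgerAdv(ledgerId, ...): use the requested id, which must not exist yet.
    long createWithId(long ledgerId) {
        if (!used.add(ledgerId)) {
            // The real client reports this case as BKLedgerExistException.
            throw new IllegalStateException("ledger " + ledgerId + " already exists");
        }
        return ledgerId;
    }

    public static void main(String[] args) {
        ToyLedgerIdAllocator ids = new ToyLedgerIdAllocator(6); // ledgers 0..5 already exist
        System.out.println(ids.createWithId(7)); // 7
        System.out.println(ids.createAuto());    // 6, the id that was waiting
    }
}
```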

/**
 * Create a ledger with the Advanced API.
 *
 * @param password pw
 * @return LedgerHandleAdv, or null if creation failed
 */
public LedgerHandleAdv createLedger(String password) {
    byte[] passwd = password.getBytes();
    try {
        return (LedgerHandleAdv) bkClient.createLedgerAdv(
                3, 3, 2, // ensemble size, write quorum size, ack quorum size
                BookKeeper.DigestType.CRC32,
                passwd);
    } catch (InterruptedException | BKException e) {
        e.printStackTrace();
    }
    return null;
}

/**
 * Create a ledger asynchronously with the Advanced API.
 *
 * @param password the ledger password
 */
public void createLedgerAsync(String password) {
    class LedgerCreationCallback implements AsyncCallback.CreateCallback {
        @Override
        public void createComplete(int returnCode, LedgerHandle handle, Object ctx) {
            // The callback is also invoked on failure, so check the return code first.
            if (returnCode == BKException.Code.OK) {
                System.out.println("Ledger " + handle.getId() + " successfully created");
            } else {
                System.out.println("Ledger creation failed: " + BKException.getMessage(returnCode));
            }
        }
    }
    bkClient.asyncCreateLedgerAdv(
            3, // ensemble size
            3, // write quorum size
            2, // ack quorum size
            BookKeeper.DigestType.CRC32,
            password.getBytes(),
            new LedgerCreationCallback(),
            "some context",
            null); // no custom metadata
}

/**
 * Create a ledger with a specific ledgerId.
 *
 * @param password pw
 * @param ledgerId the ledger id; if a ledger with this id already exists,
 *                 creation fails with BKLedgerExistException and null is returned
 * @return LedgerHandleAdv, or null if creation failed
 */
public LedgerHandleAdv createLedger(String password, long ledgerId) {
    byte[] passwd = password.getBytes();
    try {
        return (LedgerHandleAdv) bkClient.createLedgerAdv(
                ledgerId,
                3, 3, 2, // ensemble size, write quorum size, ack quorum size
                BookKeeper.DigestType.CRC32,
                passwd,
                null); // no custom metadata
    } catch (InterruptedException | BKException e) {
        e.printStackTrace();
    }
    return null;
}

Adding Entries to a Ledger

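What attracted me most in the Advanced API's add-entry methods is the ability to write at a specified EntryId. (Anyone familiar with Kafka knows that you can choose the partition when writing to Kafka, but not the offset. If you could write at a specified offset, then for disaster recovery you could keep topics fully in sync, and downstream consumers could switch data sources at any time based on the committed offset.) An example follows; note that with the Advanced API, specifying the entryId is mandatory when writing: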

/**
 * Add the msg to the ledger at the specified entryId.
 *
 * @param ledgerHandleAdv ledgerHandleAdv
 * @param entryId the entry id (must be non-negative)
 * @param msg msg
 */
public void addEntry(LedgerHandleAdv ledgerHandleAdv, long entryId, String msg) {
    try {
        ledgerHandleAdv.addEntry(entryId, msg.getBytes());
    } catch (InterruptedException | BKException e) {
        e.printStackTrace();
    }
}

The official documentation describes this API as follows:

  1. The entry id has to be non-negative.
  2. Clients are okay to add entries out of order.
  3. However, the entries are only acknowledged in a monotonic order starting from 0.

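My reading of the above: entry ids must be non-negative, and clients may add entries out of order, but entries are only acknowledged in monotonically increasing order starting from 0. At first I assumed that any monotonically increasing sequence of entry ids would do, so I ran a test in which the first entry had id 0 and the second had id 2. The program simply hung, with the following log: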

2018-10-19 16:58:34  [ BookKeeperClientWorker-OrderedExecutor-0-0:662 ] - [ DEBUG ]  Got Add response from bookie:XXX.230:3181 rc:EOK, ledger:8:entry:0
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:663 ] - [ DEBUG ] Got Add response from bookie:XXX.247:3181 rc:EOK, ledger:8:entry:0
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:663 ] - [ DEBUG ] Submit callback (lid:8, eid: 0). rc:0
2018-10-19 16:58:34 [ main:663 ] - [ DEBUG ] Adding entry [50, 32, 109, 97, 116, 116, 32, 116, 101, 115, 116]
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Got Add response from bookie:XXX.247:3181 rc:EOK, ledger:8:entry:2
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Got Add response from bookie:XXX.230:3181 rc:EOK, ledger:8:entry:2
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Head of the queue entryId: 2 is not the expected value: 1
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Got Add response from bookie:XXX.146:3181 rc:EOK, ledger:8:entry:0
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:673 ] - [ DEBUG ] Head of the queue entryId: 2 is not the expected value: 1
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:681 ] - [ DEBUG ] Got Add response from bookie:XXX.146:3181 rc:EOK, ledger:8:entry:2
2018-10-19 16:58:34 [ BookKeeperClientWorker-OrderedExecutor-0-0:681 ] - [ DEBUG ] Head of the queue entryId: 2 is not the expected value: 1
2018-10-19 16:58:37 [ main-SendThread(zk01:2181):3702 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms
2018-10-19 16:58:40 [ main-SendThread(zk01:2181):7039 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms
2018-10-19 16:58:43 [ main-SendThread(zk01:2181):10374 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms
2018-10-19 16:58:47 [ main-SendThread(zk01:2181):13710 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms
2018-10-19 16:58:50 [ main-SendThread(zk01:2181):17043 ] - [ DEBUG ] Got ping response for sessionid: 0x3637dbff9e7486c after 0ms

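Note the log line "Head of the queue entryId: 2 is not the expected value: 1": the expected entry id is 1, but the head of the queue is 2, so the entries are out of sequence and the program hangs (presumably because this Entry is never acked). The message is logged here: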

void sendAddSuccessCallbacks() {
    // Start from the head of the queue and proceed while there are
    // entries that have had all their responses come back
    PendingAddOp pendingAddOp;

    while ((pendingAddOp = pendingAddOps.peek()) != null
            && blockAddCompletions.get() == 0) {
        if (!pendingAddOp.completed) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("pending add not completed: {}", pendingAddOp);
            }
            return;
        }
        // Check if it is the next entry in the sequence.
        if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead + 1) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId,
                        pendingAddsSequenceHead + 1);
            }
            return;
        }

        pendingAddOps.remove();
        explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
        pendingAddsSequenceHead = pendingAddOp.entryId;
        if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
            this.lastAddConfirmed = pendingAddsSequenceHead;
        }
        pendingAddOp.submitCallback(BKException.Code.OK);
    }
}
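The acknowledgement rule in sendAddSuccessCallbacks() can be reproduced without a cluster. The model below is hypothetical and is not BookKeeper's PendingAddOp machinery. It assumes that completed adds wait in a queue ordered by entry id and, like pendingAddsSequenceHead, that only the head of the queue is acknowledged, and only when it is exactly the next expected id. It reproduces the hang from the test above (0 and then 2) and shows what happens once the gap is filled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of the acknowledgement rule; not BookKeeper's PendingAddOp code.
class AckOrderModel {
    // Completed adds waiting to be acked, ordered by entry id (an assumption of this model).
    private final PriorityQueue<Long> completedPending = new PriorityQueue<>();
    private long sequenceHead = -1; // like pendingAddsSequenceHead: last acked entry id
    private final List<Long> acked = new ArrayList<>();

    // Called when entryId has received enough bookie responses (ack quorum met).
    void onQuorumReached(long entryId) {
        completedPending.add(entryId);
        // Ack from the head of the queue while it is exactly the next expected id.
        while (!completedPending.isEmpty() && completedPending.peek() == sequenceHead + 1) {
            sequenceHead = completedPending.poll();
            acked.add(sequenceHead);
        }
    }

    List<Long> acked() {
        return new ArrayList<>(acked);
    }

    public static void main(String[] args) {
        AckOrderModel m = new AckOrderModel();
        m.onQuorumReached(0);
        m.onQuorumReached(2);          // gap: entry 1 has not been written
        System.out.println(m.acked()); // [0]  (entry 2 waits, like the hang above)
        m.onQuorumReached(1);          // filling the gap releases 1 and then 2
        System.out.println(m.acked()); // [0, 1, 2]
    }
}
```

Under this model, out-of-order submission is harmless as long as every id from 0 upward is eventually written; a gap that is never filled blocks every later entry.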

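So when entry ids are out of sequence, the add operation is never completed. But if entry ids are still required to start from 0 and be sequential, how does this API differ from the previous one? I haven't figured that out yet; I have emailed the community and am waiting for a reply.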