热门IT资讯网

ZooKeeper实现生产-消费者队列

发表于:2024-11-29 作者:热门IT资讯网编辑
编辑最后更新 2024年11月29日,生产-消费者队列,用于多节点的分布式数据结构,生产和消费数据。生产者创建一个数据对象,并放到队列中;消费者从队列中取出一个数据对象并进行处理。在ZooKeeper中,队列可以使用一个容器节点下创建多个

生产-消费者队列,用于多节点的分布式数据结构,生产和消费数据。生产者创建一个数据对象,并放到队列中;消费者从队列中取出一个数据对象并进行处理。在ZooKeeper中,队列可以使用一个容器节点下创建多个子节点来实现;创建子节点时,CreateMode使用 PERSISTENT_SEQUENTIAL,ZooKeeper会自动在节点名称后面添加唯一序列号。EPHEMERAL_SEQUENTIAL也有同样的特点,区别在于会话结束后是否会自动删除。

敲小黑板:*_SEQUENTIAL是ZooKeeper的一个很重要的特性,分布式锁、选举制度都依靠这个特性实现的。

1 对前续代码的重构

之前的文章,我们已经用实现了Watcher和Barrier,创建ZooKeeper连接的代码已经复制了一遍。后续还需要类似的工作,因此先对原有代码做一下重构,让代码味道干净一点。

以下是 process(WatchedEvent)的代码

final public void process(WatchedEvent event) {  if (Event.EventType.None.equals(event.getType())) {    // 连接状态发生变化    if (Event.KeeperState.SyncConnected.equals(event.getState())) {      // 连接建立成功      connectedSemaphore.countDown();    }  } else if (Event.EventType.NodeCreated.equals(event.getType())) {    processNodeCreated(event);  } else if (Event.EventType.NodeDeleted.equals(event.getType())) {    processNodeDeleted(event);  } else if (Event.EventType.NodeDataChanged.equals(event.getType())) {    processNodeDataChanged(event);  } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {    processNodeChildrenChanged(event);  }}

以ZooKeeperBarrier为例,看看重构之后的构造函数和监听Event的代码

ZooKeeperBarrier(String address, String tableSerial, int tableCapacity, String customerName)    throws IOException {  super(address);  this.tableSerial = createRootNode(tableSerial);  this.tableCapacity = tableCapacity;  this.customerName = customerName;}protected void processNodeChildrenChanged(WatchedEvent event) {  log.info("{} 接收到了通知 : {}", customerName, event.getType());  // 子节点有变化  synchronized (mutex) {    mutex.notify();  }}

2 队列的生产者

生产者的关键代码

String elementName = queueName + "/element";ArrayList ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;getZooKeeper().create(elementName, value, ids, createMode);

注意,重点是PERSISTENT_SEQUENTIAL,PERSISTENT是表示永久存储直到有命令删除,SEQUENTIAL表示自动在后面添加自增的唯一序列号。这样,尽管elementName都一样,但实际生成的zNode名字在 "element"后面会添加格式为0d的10个数字,如0000000001。如一个完整的zNode名可能为/queue/element0000000021。

3 队列的消费者

消费者尝试从子节点列表获取zNode名最小的一个子节点,如果队列为空则等待NodeChildrenChanged事件。关键代码

/** 队列的同步信号 */private static Integer queueMutex = Integer.valueOf(1);@Overrideprotected void processNodeChildrenChanged(WatchedEvent event) {  synchronized (queueMutex) {    queueMutex.notify();  }}/** * 从队列中删除第一个对象 * * @return * @throws KeeperException * @throws InterruptedException */int consume() throws KeeperException, InterruptedException {  while (true) {    synchronized (queueMutex) {      List list = getZooKeeper().getChildren(queueName, true);      if (list.size() == 0) {        queueMutex.wait();      } else {        // 获取第一个子节点的名称        String firstNodeName = getFirstElementName(list);        // 删除节点,并返回节点的值        return deleteNodeAndReturnValue(firstNodeName);      }    }  }}

4 测试日志

把测试结果放源码前面,免得大家被无聊的代码晃晕。

测试代码创建了两个线程,一个线程是生产者,按随机间隔往队列中添加对象;一个线程是消费者,随机间隔尝试从队列中取出第一个,如果当时队列为空,会等到直到新的数据。

两个进程都加上随机间隔,是为了模拟生产可能比消费更快的情况。以下是测试日志,为了更突出,生产和消费的日志我增加了不同的文字样式。

49:47.866 [INFO] ZooKeeperQueueTest.testQueue(29) 开始ZooKeeper队列测试,本次将测试 10 个数据49:48.076 [DEBUG] ZooKeeperQueue.log(201)+ Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 连接到ZooKeeper]|-- elapsed time                   [开始链接]   119.863 milliseconds.|-- elapsed time           [等待连接成功的Event]    40.039 milliseconds.|-- Total        [tech.codestory.zookeeper.queue.ZooKeeperQueue 连接到ZooKeeper]   159.911 milliseconds.49:48.082 [DEBUG] ZooKeeperQueue.log(201)+ Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 连接到ZooKeeper]|-- elapsed time                   [开始链接]   103.795 milliseconds.|-- elapsed time           [等待连接成功的Event]    65.899 milliseconds.|-- Total        [tech.codestory.zookeeper.queue.ZooKeeperQueue 连接到ZooKeeper]   170.263 milliseconds.49:48.102 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 1 , 然后等待 1700 毫秒49:48.134 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 1 , 然后等待 4000 毫秒49:49.814 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 2 , 然后等待 900 毫秒49:50.717 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 3 , 然后等待 1300 毫秒49:52.020 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 4 , 然后等待 3700 毫秒49:52.139 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 2 , 然后等待 2800 毫秒49:54.947 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 3 , 然后等待 4500 毫秒49:55.724 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 5 , 然后等待 3500 毫秒49:59.228 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 6 , 然后等待 4200 毫秒49:59.454 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 4 , 然后等待 2400 毫秒50:01.870 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 5 , 然后等待 4900 毫秒50:03.435 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 7 , 然后等待 4500 毫秒50:06.776 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 6 , 然后等待 3600 毫秒50:07.938 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 8 , 然后等待 1900 毫秒50:09.846 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 9 , 然后等待 3200 毫秒50:10.388 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 7 , 然后等待 2900 毫秒50:13.051 [INFO] ZooKeeperQueueTest.run(51) 生产对象 : 10 , 然后等待 4900 毫秒50:13.294 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 8 , 然后等待 300 毫秒50:13.600 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 9 , 然后等待 4800 毫秒50:18.407 [INFO] ZooKeeperQueueTest.run(80) 消费对象: 10 , 然后等待 2400 毫秒
0