Zookeeper的PHP实践

正文开始

github参考:

- https://github.com/php-zookeeper/php-zookeeper/blob/master/examples/watch.php
- https://www.cnblogs.com/lalalagq/p/9973679.html
- https://blog.csdn.net/apanious/article/details/51075899

### 常用函数的api

~~~
<?php

/**
 * Thin convenience wrapper around the Zookeeper extension client: create,
 * read, list and delete nodes, and register several callbacks per node.
 *
 * NOTE(review): the opening of this class (PHP open tag, class line, property
 * declarations and constructor signature) was lost when the article was
 * extracted. It is reconstructed from the surviving constructor body, the
 * `$this->zookeeper` / `$this->callback` usages below and the
 * `new ZookeeperApi('localhost:2181')` example further down. The property
 * visibilities are an assumption -- confirm against the original.
 */
class ZookeeperApi
{
    /**
     * Underlying extension client.
     *
     * @var Zookeeper
     */
    private $zookeeper;

    /**
     * Registered watch callbacks, keyed by node path.
     *
     * @var array
     */
    private $callback = array();

    /**
     * Create the underlying Zookeeper client.
     *
     * @param string $address Connection string passed to the Zookeeper
     *                        constructor, e.g. "localhost:2181"
     */
    public function __construct($address)
    {
        $this->zookeeper = new Zookeeper($address);
    }

    /**
     * Set a node to a value. If the node doesn't exist yet, it is created
     * together with any missing parent nodes. Existing values of the node
     * are overwritten.
     *
     * @param string $path  The path to the node
     * @param mixed  $value The new value for the node
     *
     * @return void Nothing is returned (callers see null)
     */
    public function set($path, $value)
    {
        if (!$this->zookeeper->exists($path)) {
            $this->makePath($path);
            $this->makeNode($path, $value);
        } else {
            $this->zookeeper->set($path, $value);
        }
    }

    /**
     * Equivalent of "mkdir -p" on ZooKeeper: creates every missing parent of
     * $path. The last segment itself is NOT created here.
     *
     * @param string $path  The path to the node
     * @param string $value The value to assign to each new node along the path
     *
     * @return void Nothing is returned (callers see null)
     */
    public function makePath($path, $value = '')
    {
        // Keep every non-empty segment. A bare array_filter() would also drop
        // a segment named "0", because PHP treats the string "0" as falsy.
        $parts = array_filter(explode('/', $path), 'strlen');
        $subpath = '';

        // Stop while one segment is left: the leaf is created by the caller.
        while (count($parts) > 1) {
            $subpath .= '/' . array_shift($parts);
            if (!$this->zookeeper->exists($subpath)) {
                $this->makeNode($subpath, $value);
            }
        }
    }

    /**
     * Create a node on ZooKeeper at the given path.
     *
     * @param string $path   The path to the node
     * @param string $value  The value to assign to the new node
     * @param array  $params Optional ACL entries for the Zookeeper node.
     *                       By default, a public node is created
     *
     * @return string the path to the newly created node or null on failure
     */
    public function makeNode($path, $value, array $params = array())
    {
        if (empty($params)) {
            // Default ACL: anyone may do anything with the node.
            $params = array(
                array(
                    'perms'  => Zookeeper::PERM_ALL,
                    'scheme' => 'world',
                    'id'     => 'anyone',
                )
            );
        }

        return $this->zookeeper->create($path, $value, $params);
    }

    /**
     * Get the value for the node.
     *
     * @param string $path the path to the node
     *
     * @return string|null null when the node does not exist
     */
    public function get($path)
    {
        if (!$this->zookeeper->exists($path)) {
            return null;
        }

        return $this->zookeeper->get($path);
    }

    /**
     * List the children of the given path, i.e. the name of the directories
     * within the current node, if any.
     *
     * @param string $path the path to the node
     *
     * @return array the subpaths within the given node
     */
    public function getChildren($path)
    {
        if (strlen($path) > 1 && preg_match('@/$@', $path)) {
            // remove trailing /
            $path = substr($path, 0, -1);
        }

        return $this->zookeeper->getChildren($path);
    }

    /**
     * Delete the node if it does not have any children.
     *
     * @param string $path the path to the node
     *
     * @return mixed null when the node does not exist, otherwise the result
     *               of the underlying delete() call
     */
    public function deleteNode($path)
    {
        if (!$this->zookeeper->exists($path)) {
            return null;
        }

        return $this->zookeeper->delete($path);
    }

    /**
     * Watch a given path.
     *
     * The callback is stored per path and a watch is set on the node with
     * watchCallback() as the handler.
     *
     * @param string   $path     the path to node
     * @param callable $callback callback function
     *
     * @return string|null the node value read while setting the watch; null
     *                     when $callback is not callable, the node does not
     *                     exist, or the callback is already registered
     */
    public function watch($path, $callback)
    {
        if (!is_callable($callback)) {
            return null;
        }

        if (!$this->zookeeper->exists($path)) {
            return null;
        }

        if (!isset($this->callback[$path])) {
            $this->callback[$path] = array();
        }

        // Strict comparison: with a loose one, array($a, 'm') and
        // array($b, 'm') count as the same callback whenever $a == $b,
        // even if they are different objects.
        if (in_array($callback, $this->callback[$path], true)) {
            return null;
        }

        $this->callback[$path][] = $callback;

        return $this->zookeeper->get($path, array($this, 'watchCallback'));
    }

    /**
     * Watch event callback wrapper: re-arms the watch and runs every callback
     * registered for the path.
     *
     * @param int    $event_type
     * @param int    $stat
     * @param string $path
     *
     * @return mixed the return value of the first registered callback, or
     *               null when no callback is registered for $path
     */
    public function watchCallback($event_type, $stat, $path)
    {
        if (empty($this->callback[$path])) {
            return null;
        }

        // The watcher gets consumed when it fires, so set a new one first.
        $this->zookeeper->get($path, array($this, 'watchCallback'));

        // Run ALL callbacks. Returning from inside the loop would stop after
        // the first one and silently skip the rest.
        $result = null;
        $isFirst = true;
        foreach ($this->callback[$path] as $callback) {
            $value = call_user_func($callback);
            if ($isFirst) {
                $result = $value;
                $isFirst = false;
            }
        }

        return $result;
    }

    /**
     * Delete watch callback on a node, delete all callbacks when $callback
     * is null.
     *
     * @param string   $path
     * @param callable $callback
     *
     * @return boolean|NULL true when something was removed, null otherwise
     */
    public function cancelWatch($path, $callback = null)
    {
        if (!isset($this->callback[$path])) {
            return null;
        }

        if (empty($callback)) {
            unset($this->callback[$path]);
            // Original author's note: "reset the callback".
            // NOTE(review): a plain read may not remove a watch that is
            // already set; watchCallback() ignores paths without callbacks.
            $this->zookeeper->get($path);

            return true;
        }

        // Strict search for the same reason as in watch().
        $key = array_search($callback, $this->callback[$path], true);
        if ($key === false) {
            return null;
        }

        unset($this->callback[$path][$key]);

        return true;
    }
}

// $zk = new ZookeeperApi('localhost:2181');
//var_dump($zk->get('/zookeeper'));
// var_dump($zk->getChildren('/foo'));

// public function watcher($i, $type, $key) {
//     echo "Insider Watcher\n";
//     // Watcher gets consumed so we need to set a new one
//     // ZooKeeper provides watchers that can be attached to a znode. When a
//     // watcher detects a change to the znode, the service immediately
//     // notifies all interested clients. This is how the PHP script learns
//     // about changes. The second argument of Zookeeper::get is the callback.
//     // When the event fires the watcher is consumed, so it has to be set
//     // again inside the callback.
//     $this->get($key, array($this, 'watcher' ) );
// }

// $ret = $zk->watch('/test', 'callback');
// var_dump($zk->get('/'));
// var_dump($zk->getChildren('/'));
// var_dump($zk->set('/test', 'abc'));
// var_dump($zk->get('/test'));
// var_dump($zk->getChildren('/'));
// var_dump($zk->set('/foo/001', 'bar1'));
// var_dump($zk->set('/foo/002', 'bar2'));
// var_dump($zk->get('/'));
// var_dump($zk->getChildren('/'));
// var_dump($zk->getChildren('/foo'));
//$zk->set('/test', 1);
//$zk->set('/test', 2); // run in a terminal

// while (true) {
//     sleep(1);
// }
//
// $zoo = new ZookeeperDemo('127.0.0.1:2181');
// $zoo->get( '/test', array($zoo, 'watcher' ) );
//
// while( true ) {
//     echo '.';
//     sleep(2);
// }
~~~

### 分布式应用

~~~
<?php

/**
 * Leader-election worker: each process registers an ephemeral sequential
 * node and watches the node just before its own; the process with the
 * lowest node is the leader.
 *
 * NOTE(review): the opening of this class (PHP open tag, class line, the
 * CONTAINER constant and the start of the $acl declaration) was lost when
 * the article was extracted. `extends Zookeeper` follows from the
 * parent::__construct() call and the inherited create()/get() calls. The
 * '/cluster' value and the visibility of $acl are taken from the upstream
 * example this article reproduces -- confirm against the original.
 */
class Worker extends Zookeeper
{
    const CONTAINER = '/cluster';

    protected $acl = array(
        array(
            'perms'  => Zookeeper::PERM_ALL,
            'scheme' => 'world',
            'id'     => 'anyone'
        )
    );

    private $isLeader = false;

    private $znode;

    /**
     * Forward all arguments unchanged to the Zookeeper constructor.
     *
     * @param string        $host
     * @param callable|null $watcher_cb
     * @param int           $recv_timeout
     */
    public function __construct($host = '', $watcher_cb = null, $recv_timeout = 10000)
    {
        parent::__construct($host, $watcher_cb, $recv_timeout);
    }

    /**
     * Register this worker under the container node and find out whether it
     * is the leader.
     *
     * @return void
     */
    public function register()
    {
        if (!$this->exists(self::CONTAINER)) {
            $this->create(self::CONTAINER, null, $this->acl);
        }

        $this->znode = $this->create(
            self::CONTAINER . '/w-',
            null,
            $this->acl,
            Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE
        );

        // Keep only the node name so it can be compared with getChildren().
        $this->znode = str_replace(self::CONTAINER . '/', '', $this->znode);

        printf("I'm registred as: %s\n", $this->znode);

        $watching = $this->watchPrevious();

        if ($watching === $this->znode) {
            printf("Nobody here, I'm the leader\n");
            $this->setLeader(true);
        } else {
            printf("I'm watching %s\n", $watching);
        }
    }

    /**
     * Set a watch on the worker node that sorts immediately before this one.
     *
     * @return string name of the watched node, or this worker's own node
     *                name when it sorts first (no watch is set in that case)
     *
     * @throws Exception when this worker's node is not among the children
     */
    public function watchPrevious()
    {
        $workers = $this->getChildren(self::CONTAINER);
        sort($workers);

        $size = count($workers);
        for ($i = 0; $i < $size; $i++) {
            if ($this->znode !== $workers[$i]) {
                continue;
            }

            if ($i > 0) {
                $this->get(
                    self::CONTAINER . '/' . $workers[$i - 1],
                    array($this, 'watchNode')
                );

                return $workers[$i - 1];
            }

            return $workers[$i];
        }

        throw new Exception(
            sprintf(
                "Something went very wrong! I can't find myself: %s/%s",
                self::CONTAINER,
                $this->znode
            )
        );
    }

    /**
     * Watcher callback registered by watchPrevious(): looks for the previous
     * node again and takes over leadership when there is none left. The
     * arguments are not used.
     *
     * @param mixed $i
     * @param mixed $type
     * @param mixed $name
     *
     * @return void
     */
    public function watchNode($i, $type, $name)
    {
        $watching = $this->watchPrevious();

        if ($watching === $this->znode) {
            printf("I'm the new leader!\n");
            $this->setLeader(true);
        } else {
            printf("Now I'm watching %s\n", $watching);
        }
    }

    /**
     * @return bool whether this worker currently considers itself the leader
     */
    public function isLeader()
    {
        return $this->isLeader;
    }

    /**
     * @param bool $flag new leader state
     *
     * @return void
     */
    public function setLeader($flag)
    {
        $this->isLeader = $flag;
    }

    /**
     * Register, then loop forever doing the leader or the worker job every
     * two seconds.
     *
     * @return void
     */
    public function run()
    {
        $this->register();

        while (true) {
            if ($this->isLeader()) {
                $this->doLeaderJob();
            } else {
                $this->doWorkerJob();
            }

            sleep(2);
        }
    }

    /**
     * Placeholder for the work done while being the leader.
     *
     * @return void
     */
    public function doLeaderJob()
    {
        echo "Leading\n";
    }

    /**
     * Placeholder for the work done while not being the leader.
     *
     * @return void
     */
    public function doWorkerJob()
    {
        echo "Working\n";
    }
}

// $worker = new Worker( '127.0.0.1:2181' );
// $worker->run();

// Open at least 3 terminals and run the following script in each of them:
// php worker.php
// Now simulate the leader crashing. Quit the first script with Ctrl+C or by
// any other means. Nothing changes at first and the workers keep working.
// Later ZooKeeper notices the timeout and a new leader is elected.
//
// Although these scripts are easy to follow, the Zookeeper flags that are
// used deserve a comment:
//$this->znode = $this->create( self::CONTAINER . '/w-',
//                              null,
//                              $this->acl,
//                              Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE );
// Every znode is both EPHEMERAL and SEQUENCE.
//
// EPHEMERAL means the znode is removed when the client loses its connection.
// This is how the PHP script learns about the timeout. SEQUENCE means a
// sequence identifier is appended to each znode name. We use these unique
// identifiers to label the workers.
//
// There are a few more things to watch out for on the PHP side. The extension
// is still in beta, and a segmentation fault is easy to trigger if it is
// misused. For example, a plain function cannot be passed as the callback; it
// must be a method. I hope more people in the PHP community come to see how
// good Apache ZooKeeper is, and that the extension gets more support as a
// result.
//
// ZooKeeper is a powerful piece of software with a clean and simple API.
// Because the documentation and examples are well done, anyone can easily
// write distributed software. Let's get started, it will be fun.
~~~

正文结束

PHP接口(interface)和抽象类(abstract) php获取文件mimetype类型