正文开始 github参考:
https://github.com/php-zookeeper/php-zookeeper/blob/master/examples/watch.php
https://www.cnblogs.com/lalalagq/p/9973679.html
https://blog.csdn.net/apanious/article/details/51075899
### 常用函数的api
~~~
zookeeper = new Zookeeper($address);
}
/**
* Set a node to a value. If the node doesn't exist yet, it is created.
* Existing values of the node are overwritten
*
* @param string $path The path to the node
* @param mixed $value The new value for the node
*
* @return mixed previous value if set, or null
*/
public function set($path, $value) {
if (!$this->zookeeper->exists($path)) {
$this->makePath($path);
$this->makeNode($path, $value);
} else {
$this->zookeeper->set($path, $value);
}
}
/**
* Equivalent of "mkdir -p" on ZooKeeper
*
* @param string $path The path to the node
* @param string $value The value to assign to each new node along the path
*
* @return bool
*/
public function makePath($path, $value = '') {
$parts = explode('/', $path);
$parts = array_filter($parts);
$subpath = '';
while (count($parts) > 1) {
$subpath .= '/' . array_shift($parts);
if (!$this->zookeeper->exists($subpath)) {
$this->makeNode($subpath, $value);
}
}
}
/**
* Create a node on ZooKeeper at the given path
*
* @param string $path The path to the node
* @param string $value The value to assign to the new node
* @param array $params Optional parameters for the Zookeeper node.
* By default, a public node is created
*
* @return string the path to the newly created node or null on failure
*/
public function makeNode($path, $value, array $params = array()) {
if (empty($params)) {
$params = array(
array(
'perms' => Zookeeper::PERM_ALL,
'scheme' => 'world',
'id' => 'anyone',
)
);
}
return $this->zookeeper->create($path, $value, $params);
}
/**
* Get the value for the node
*
* @param string $path the path to the node
*
* @return string|null
*/
public function get($path) {
if (!$this->zookeeper->exists($path)) {
return null;
}
return $this->zookeeper->get($path);
}
/**
* List the children of the given path, i.e. the name of the directories
* within the current node, if any
*
* @param string $path the path to the node
*
* @return array the subpaths within the given node
*/
public function getChildren($path) {
if (strlen($path) > 1 && preg_match('@/$@', $path)) {
// remove trailing /
$path = substr($path, 0, -1);
}
return $this->zookeeper->getChildren($path);
}
/**
* Delete the node if it does not have any children
*
* @param string $path the path to the node
*
* @return true if node is deleted else null
*/
public function deleteNode($path)
{
if(!$this->zookeeper->exists($path))
{
return null;
}
else
{
return $this->zookeeper->delete($path);
}
}
/**
* Wath a given path
* @param string $path the path to node
* @param callable $callback callback function
* @return string|null
*/
public function watch($path, $callback)
{
if (!is_callable($callback)) {
return null;
}
if ($this->zookeeper->exists($path)) {
if (!isset($this->callback[$path])) {
$this->callback[$path] = array();
}
if (!in_array($callback, $this->callback[$path])) {
$this->callback[$path][] = $callback;
return $this->zookeeper->get($path, array($this, 'watchCallback'));
}
}
}
/**
* Wath event callback warper
* @param int $event_type
* @param int $stat
* @param string $path
* @return the return of the callback or null
*/
public function watchCallback($event_type, $stat, $path)
{
if (!isset($this->callback[$path])) {
return null;
}
foreach ($this->callback[$path] as $callback) {
$this->zookeeper->get($path, array($this, 'watchCallback'));
return call_user_func($callback);
}
}
/**
* Delete watch callback on a node, delete all callback when $callback is null
* @param string $path
* @param callable $callback
* @return boolean|NULL
*/
public function cancelWatch($path, $callback = null)
{
if (isset($this->callback[$path])) {
if (empty($callback)) {
unset($this->callback[$path]);
$this->zookeeper->get($path); //reset the callback
return true;
} else {
$key = array_search($callback, $this->callback[$path]);
if ($key !== false) {
unset($this->callback[$path][$key]);
return true;
} else {
return null;
}
}
} else {
return null;
}
}
}
// $zk = new ZookeeperApi('localhost:2181');
//var_dump($zk->get('/zookeeper'));
// var_dump($zk->getChildren('/foo'));
// public function watcher($i, $type, $key) {
// echo "Insider Watcher\n";
// Watcher gets consumed so we need to set a new one
// ZooKeeper提供了可以绑定在znode的监视器。如果监视器发现znode发生变化,该service会立即通知所有相关的客户端。这就是PH//P脚本如何知道变化的。Zookeeper::get方法的第二个参数是回调函数。\\
// 当触发事件时,监视器会被消费掉,所以我们需要在回调函 数中再次设置监视器。
// $this->get($key, array($this, 'watcher' ) );
// }
// $ret = $zk->watch('/test', 'callback');
// var_dump($zk->get('/'));
// var_dump($zk->getChildren('/'));
// var_dump($zk->set('/test', 'abc'));
// var_dump($zk->get('/test'));
// var_dump($zk->getChildren('/'));
// var_dump($zk->set('/foo/001', 'bar1'));
// var_dump($zk->set('/foo/002', 'bar2'));
// var_dump($zk->get('/'));
// var_dump($zk->getChildren('/'));
// var_dump($zk->getChildren('/foo'));
//$zk->set('/test', 1);
//$zk->set('/test', 2);//在终端执行
// while (true) {
// sleep(1);
// }
//
// $zoo = new ZookeeperDemo('127.0.0.1:2181');
// $zoo->get( '/test', array($zoo, 'watcher' ) );
//
// while( true ) {
// echo '.';
// sleep(2);
// }
~~~
### 分布式应用
~~~
Zookeeper::PERM_ALL,
'scheme' => 'world',
'id' => 'anyone' ) );
private $isLeader = false;
private $znode;
public function __construct( $host = '', $watcher_cb = null, $recv_timeout = 10000 ) {
parent::__construct( $host, $watcher_cb, $recv_timeout );
}
public function register() {
if( ! $this->exists( self::CONTAINER ) ) {
$this->create( self::CONTAINER, null, $this->acl );
}
$this->znode = $this->create( self::CONTAINER . '/w-',
null,
$this->acl,
Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE );
$this->znode = str_replace( self::CONTAINER .'/', '', $this->znode );
printf( "I'm registred as: %s\n", $this->znode );
$watching = $this->watchPrevious();
if( $watching == $this->znode ) {
printf( "Nobody here, I'm the leader\n" );
$this->setLeader( true );
}
else {
printf( "I'm watching %s\n", $watching );
}
}
public function watchPrevious() {
$workers = $this->getChildren( self::CONTAINER );
sort( $workers );
$size = sizeof( $workers );
for( $i = 0 ; $i < $size ; $i++ ) {
if( $this->znode == $workers[ $i ] ) {
if( $i > 0 ) {
$this->get( self::CONTAINER . '/' . $workers[ $i - 1 ], array( $this, 'watchNode' ) );
return $workers[ $i - 1 ];
}
return $workers[ $i ];
}
}
throw new Exception( sprintf( "Something went very wrong! I can't find myself: %s/%s",
self::CONTAINER,
$this->znode ) );
}
public function watchNode( $i, $type, $name ) {
$watching = $this->watchPrevious();
if( $watching == $this->znode ) {
printf( "I'm the new leader!\n" );
$this->setLeader( true );
}
else {
printf( "Now I'm watching %s\n", $watching );
}
}
public function isLeader() {
return $this->isLeader;
}
public function setLeader($flag) {
$this->isLeader = $flag;
}
public function run() {
$this->register();
while( true ) {
if( $this->isLeader() ) {
$this->doLeaderJob();
}
else {
$this->doWorkerJob();
}
sleep( 2 );
}
}
public function doLeaderJob() {
echo "Leading\n";
}
public function doWorkerJob() {
echo "Working\n";
}
}
// $worker = new Worker( '127.0.0.1:2181' );
// $worker->run();
// 打开至少3个终端,在每个终端中运行以下脚本:
// php worker.php
//现在模拟Leader崩溃的情形。使用Ctrl+c或其他方法退出第一个脚本。刚开始不会有任何变化,worker可以继续工作。后来,ZooKeeper会发现超时,并选举出新的leader。
//
//虽然这些脚本很容易理解,但是还是有必要对已使用的Zookeeper标志作注释:
//$this->znode = $this->create( self::CONTAINER . '/w-',
// null,
// $this->acl,
// Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE );
//每个znode都是EPHEMERAL和SEQUENCE的。
//
//EPHEMRAL代表当客户端失去连接时移除该znode。这就是为何PHP脚本会知道超时。SEQUENCE代表在每个znode名称后添加顺序标识。我们通过这些唯一标识来标记worker。
//
//在PHP部分还有些问题要注意。该扩展目前还是beta版,如果使用不当很容易发生segmentation fault。比如,不能传入普通函数作为回调函数,传入的必须为方法。我希望更多PHP社区的同仁可以看到Apache ZooKeeper的好,同时该扩展也会获得更多的支持。
//
//ZooKeeper是一个强大的软件,拥有简洁和简单的API。由于文档和示例都做的很好,任何人都可以很容易的编写分布式软件。让我们开始吧,这会很有趣的。
~~~
正文结束 |
PHP接口(interface)和抽象类(abstract) | php获取文件mintype类型 |