栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > PHP

php-beanstalkd消息队列类实例分享

PHP 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

php-beanstalkd消息队列类实例分享

本文实例为大家分享了php beanstalkd消息队列类的具体代码,供大家参考,具体内容如下

 true,
      'host' => '127.0.0.1',
      'port' => 11300,
      'timeout' => 1,
      'logger' => null
    ];
    $this->_config = $config + $defaults;
  }
 
  
  public function __destruct() {
    $this->disconnect();
  }
 
  
  public function connect() {
    if (isset($this->_connection)) {
      $this->disconnect();
    }
    $errNum = '';
    $errStr = '';
    $function = $this->_config['persistent'] ? 'pfsockopen' : 'fsockopen';
    $params = [$this->_config['host'], $this->_config['port'], &$errNum, &$errStr];
 
    if ($this->_config['timeout']) {
      $params[] = $this->_config['timeout'];
    }
    $this->_connection = @call_user_func_array($function, $params);
 
    if (!empty($errNum) || !empty($errStr)) {
      $this->_error("{$errNum}: {$errStr}");
    }
 
    $this->connected = is_resource($this->_connection);
 
    if ($this->connected) {
      stream_set_timeout($this->_connection, -1);
    }
    return $this->connected;
  }
 
  
  public function disconnect() {
    if (!is_resource($this->_connection)) {
      $this->connected = false;
    } else {
      $this->_write('quit');
      $this->connected = !fclose($this->_connection);
 
      if (!$this->connected) {
 $this->_connection = null;
      }
    }
    return !$this->connected;
  }
 
  
  protected function _error($message) {
    if ($this->_config['logger']) {
      $this->_config['logger']->error($message);
    }
  }
 
  public function errors()
  {
    return $this->_config['logger'];
  }
  
  protected function _write($data) {
    if (!$this->connected) {
      $message = 'No connecting found while writing data to socket.';
      throw new RuntimeException($message);
    }
 
    $data .= "rn";
    return fwrite($this->_connection, $data, strlen($data));
  }
 
  
  protected function _read($length = null) {
    if (!$this->connected) {
      $message = 'No connection found while reading data from socket.';
      throw new RuntimeException($message);
    }
    if ($length) {
      if (feof($this->_connection)) {
 return false;
      }
      $data = stream_get_contents($this->_connection, $length + 2);
      $meta = stream_get_meta_data($this->_connection);
 
      if ($meta['timed_out']) {
 $message = 'Connection timed out while reading data from socket.';
 throw new RuntimeException($message);
      }
      $packet = rtrim($data, "rn");
    } else {
      $packet = stream_get_line($this->_connection, 16384, "rn");
    }
    return $packet;
  }
 
  
 
  
  public function put($pri, $delay, $ttr, $data) {
    $this->_write(sprintf("put %d %d %d %drn%s", $pri, $delay, $ttr, strlen($data), $data));
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'INSERTED':
      case 'BURIED':
 return (integer) strtok(' '); // job id
      case 'EXPECTED_CRLF':
      case 'JOB_TOO_BIG':
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function useTube($tube) {
    $this->_write(sprintf('use %s', $tube));
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'USING':
 return strtok(' ');
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function pauseTube($tube, $delay) {
    $this->_write(sprintf('pause-tube %s %d', $tube, $delay));
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'PAUSED':
 return true;
      case 'NOT_FOUND':
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
 
  
  public function reserve($timeout = null) {
    if (isset($timeout)) {
      $this->_write(sprintf('reserve-with-timeout %d', $timeout));
    } else {
      $this->_write('reserve');
    }
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'RESERVED':
 return [
   'id' => (integer) strtok(' '),
   'body' => $this->_read((integer) strtok(' '))
 ];
      case 'DEADLINE_SOON':
      case 'TIMED_OUT':
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function delete($id) {
    $this->_write(sprintf('delete %d', $id));
    $status = $this->_read();
 
    switch ($status) {
      case 'DELETED':
 return true;
      case 'NOT_FOUND':
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function release($id, $pri, $delay) {
    $this->_write(sprintf('release %d %d %d', $id, $pri, $delay));
    $status = $this->_read();
 
    switch ($status) {
      case 'RELEASED':
      case 'BURIED':
 return true;
      case 'NOT_FOUND':
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function bury($id, $pri) {
    $this->_write(sprintf('bury %d %d', $id, $pri));
    $status = $this->_read();
 
    switch ($status) {
      case 'BURIED':
 return true;
      case 'NOT_FOUND':
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function touch($id) {
    $this->_write(sprintf('touch %d', $id));
    $status = $this->_read();
 
    switch ($status) {
      case 'TOUCHED':
 return true;
      case 'NOT_TOUCHED':
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function watch($tube) {
    $this->_write(sprintf('watch %s', $tube));
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'WATCHING':
 return (integer) strtok(' ');
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function ignore($tube) {
    $this->_write(sprintf('ignore %s', $tube));
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'WATCHING':
 return (integer) strtok(' ');
      case 'NOT_IGNORED':
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
 
  
  public function peek($id) {
    $this->_write(sprintf('peek %d', $id));
    return $this->_peekRead();
  }
 
  
  public function peekReady() {
    $this->_write('peek-ready');
    return $this->_peekRead();
  }
 
  
  public function peekDelayed() {
    $this->_write('peek-delayed');
    return $this->_peekRead();
  }
 
  
  public function peekBuried() {
    $this->_write('peek-buried');
    return $this->_peekRead();
  }
 
  
  protected function _peekRead() {
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'FOUND':
 return [
   'id' => (integer) strtok(' '),
   'body' => $this->_read((integer) strtok(' '))
 ];
      case 'NOT_FOUND':
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function kick($bound) {
    $this->_write(sprintf('kick %d', $bound));
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'KICKED':
 return (integer) strtok(' ');
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function kickJob($id) {
    $this->_write(sprintf('kick-job %d', $id));
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'KICKED':
 return true;
      case 'NOT_FOUND':
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
 
  
  public function statsJob($id) {
    $this->_write(sprintf('stats-job %d', $id));
    return $this->_statsRead();
  }
 
  
  public function statsTube($tube) {
    $this->_write(sprintf('stats-tube %s', $tube));
    return $this->_statsRead();
  }
 
  
  public function stats() {
    $this->_write('stats');
    return $this->_statsRead();
  }
 
  
  public function listTubes() {
    $this->_write('list-tubes');
    return $this->_statsRead();
  }
 
  
  public function listTubeUsed() {
    $this->_write('list-tube-used');
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'USING':
 return strtok(' ');
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  public function listTubesWatched() {
    $this->_write('list-tubes-watched');
    return $this->_statsRead();
  }
 
  
  protected function _statsRead($decode = true) {
    $status = strtok($this->_read(), ' ');
 
    switch ($status) {
      case 'OK':
 $data = $this->_read((integer) strtok(' '));
 return $decode ? $this->_decode($data) : $data;
      default:
 $this->_error($status);
 return false;
    }
  }
 
  
  protected function _decode($data) {
    $data = array_slice(explode("n", $data), 1);
    $result = [];
 
    foreach ($data as $key => $value) {
      if ($value[0] === '-') {
 $value = ltrim($value, '- ');
      } elseif (strpos($value, ':') !== false) {
 list($key, $value) = explode(':', $value);
 $value = ltrim($value, ' ');
      }
      if (is_numeric($value)) {
 $value = (integer) $value == $value ? (integer) $value : (float) $value;
      }
      $result[$key] = $value;
    }
    return $result;
  }
}
 
?>

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/40686.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号