|
@@ -0,0 +1,66 @@
|
|
|
+<?php
|
|
|
+/**
|
|
|
+ * for redis implement
|
|
|
+ */
|
|
|
+namespace app\common\util\QueueHelp;
|
|
|
+
|
|
|
+class RedisQueue {
|
|
|
+
|
|
|
+ public function __construct($config) {
|
|
|
+ $this->redis = new \Redis();
|
|
|
+ $this->redis->connect(env('redis.hostname'), env('redis.hostport'));
|
|
|
+ $this->redis->auth(env('redis.password'));
|
|
|
+ $this->redis->select($config['db']);
|
|
|
+ }
|
|
|
+
|
|
|
+ function push($queue, $obj)
|
|
|
+ {
|
|
|
+ if (!$queue || !is_object($obj)) {
|
|
|
+ return array('ack'=>false, 'msg'=>'parameter error');
|
|
|
+ }
|
|
|
+ if ($this->redis) {
|
|
|
+ $ack = $this->redis->rpush($queue, json_encode($obj));
|
|
|
+ $ack = ($ack>0)?true:false;
|
|
|
+ $msg = ($ack)?'ok':'redis push fail';
|
|
|
+ } else {
|
|
|
+ $ack = false;
|
|
|
+ $msg = 'redis not support';
|
|
|
+ }
|
|
|
+ return array('ack'=>$ack, 'msg'=>$msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ function queueSize($queue)
|
|
|
+ {
|
|
|
+ if (!$queue){
|
|
|
+ return array('ack'=>false, 'msg'=>'parameter error');
|
|
|
+ }
|
|
|
+ if ($this->redis){
|
|
|
+ $size = $this->redis->llen($queue);
|
|
|
+ $ack = true;
|
|
|
+ $msg = 'ok';
|
|
|
+ }else{
|
|
|
+ $ack = false;
|
|
|
+ $msg = 'redis not support';
|
|
|
+ $size = -1;
|
|
|
+ }
|
|
|
+ return array('ack'=>$ack, 'msg'=>$msg, 'data'=>$size);
|
|
|
+ }
|
|
|
+
|
|
|
+ function pull($queue)
|
|
|
+ {
|
|
|
+ $req = new \stdClass();
|
|
|
+ if ($this->redis) {
|
|
|
+ $ack = false;
|
|
|
+ $data = $this->redis->lpop($queue);
|
|
|
+ if ($data) {
|
|
|
+ $ack = true;
|
|
|
+ }
|
|
|
+ $req = json_decode($data);
|
|
|
+ $msg = ($ack)?'ok':'redis pull fail';
|
|
|
+ } else {
|
|
|
+ $ack = false;
|
|
|
+ $msg = 'redis not support';
|
|
|
+ }
|
|
|
+ return array('ack'=>$ack, 'msg'=>$msg, 'data'=>$req);
|
|
|
+ }
|
|
|
+}
|