redis = new \Redis(); $this->redis->connect(env('redis.hostname'), env('redis.hostport')); $this->redis->auth(env('redis.password')); $this->redis->select($config['db']); }

    /**
     * Append an object to the tail of a Redis list, serialized as JSON.
     *
     * @param mixed  $queue Redis list key; any falsy value is rejected.
     * @param object $obj   Payload; anything that is not an object is rejected.
     *
     * @return array array('ack' => bool, 'msg' => string)
     */
    public function push($queue, $obj)
    {
        if (!$queue || !is_object($obj)) {
            return array('ack' => false, 'msg' => 'parameter error');
        }
        if (!$this->redis) {
            return array('ack' => false, 'msg' => 'redis not support');
        }

        $payload = json_encode($obj);
        if ($payload === false) {
            // json_encode() returns false on failure (e.g. malformed UTF-8).
            // Passing that on to rpush would enqueue a junk entry.
            return array('ack' => false, 'msg' => 'json encode fail');
        }

        // rpush yields the new list length, or false on failure.
        $length = $this->redis->rpush($queue, $payload);
        $ack = $length > 0;

        return array('ack' => $ack, 'msg' => $ack ? 'ok' : 'redis push fail');
    }

    /**
     * Report the number of elements currently stored in a Redis list.
     *
     * @param mixed $queue Redis list key; any falsy value is rejected.
     *
     * @return array array('ack' => bool, 'msg' => string, 'data' => int),
     *               where 'data' is -1 on every failure path.
     */
    public function queueSize($queue)
    {
        if (!$queue) {
            return array('ack' => false, 'msg' => 'parameter error', 'data' => -1);
        }
        if (!$this->redis) {
            return array('ack' => false, 'msg' => 'redis not support', 'data' => -1);
        }

        $size = $this->redis->llen($queue);
        if ($size === false) {
            // llen yields false when the key holds a non-list value; do not
            // report that as a successful size lookup.
            return array('ack' => false, 'msg' => 'redis llen fail', 'data' => -1);
        }

        return array('ack' => true, 'msg' => 'ok', 'data' => $size);
    }

    /**
     * Remove the head element of a Redis list and return it JSON-decoded.
     *
     * @param mixed $queue Redis list key; any falsy value is rejected.
     *
     * @return array array('ack' => bool, 'msg' => string, 'data' => mixed).
     *               'data' is the json_decode() result of the popped element
     *               (null if that element is not valid JSON), null when nothing
     *               was popped, and an empty stdClass when Redis is unavailable.
     */
    public function pull($queue)
    {
        if (!$queue) {
            return array('ack' => false, 'msg' => 'parameter error', 'data' => null);
        }
        if (!$this->redis) {
            return array('ack' => false, 'msg' => 'redis not support', 'data' => new \stdClass());
        }

        $raw = $this->redis->lpop($queue);
        if (!is_string($raw)) {
            // Nothing was popped (empty or missing list). 'data' stays null, as
            // callers have always observed, without feeding false to json_decode().
            return array('ack' => false, 'msg' => 'redis pull fail', 'data' => null);
        }

        // A string result means an element really was removed from the list, so
        // acknowledge it even when the payload itself is falsy (e.g. "0").
        return array('ack' => true, 'msg' => 'ok', 'data' => json_decode($raw));
    }
}