RedisQueue.php 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. <?php
  2. /**
  3. * Queue implementation backed by Redis.
  4. */
  5. namespace app\common\util\QueueHelp;
  /**
   * Simple queue on top of a Redis list, using the phpredis `\Redis` client.
   *
   * Objects are JSON-encoded and appended with RPUSH; consumers take items
   * from the opposite end with LPOP, so items come out in insertion order.
   *
   * Every public method reports its outcome as an array with the keys
   * `ack` (bool) and `msg` (string); `queueSize()` and `pull()` add `data`.
   */
  class RedisQueue
  {
      /**
       * Ready-to-use client, or null when no usable connection could be set up.
       *
       * Kept public because earlier versions created it as a dynamic (and
       * therefore public) property; it is declared here so the class no longer
       * relies on dynamic properties.
       *
       * @var \Redis|null
       */
      public $redis = null;

      /**
       * Connects to the server named by the `redis.hostname` / `redis.hostport`
       * settings (read through the env() helper), authenticates when
       * `redis.password` is non-empty, and selects the configured database.
       *
       * If the Redis class is missing, or connect/auth/select report failure,
       * `$this->redis` stays null and every operation answers
       * "redis not support" instead of talking to a half-initialised client.
       * Exceptions thrown by the client itself are not caught here.
       *
       * @param array $config Options; `db` is the database index (defaults to 0).
       */
      public function __construct($config)
      {
          if (!class_exists('Redis')) {
              return;
          }

          $client = new \Redis();
          if (!$client->connect(env('redis.hostname'), env('redis.hostport'))) {
              return;
          }

          // Only authenticate when a password is actually configured.
          $password = env('redis.password');
          if ($password !== null && $password !== false && $password !== '') {
              if (!$client->auth($password)) {
                  return;
              }
          }

          // A failed SELECT would leave us on the default database, so treat
          // it as an unusable connection rather than continuing silently.
          $db = isset($config['db']) ? (int) $config['db'] : 0;
          if (!$client->select($db)) {
              return;
          }

          $this->redis = $client;
      }

      /**
       * Appends an object to the tail of a queue.
       *
       * @param string $queue Queue (list key) name; must be non-empty.
       * @param object $obj   Payload; stored as its JSON encoding.
       *
       * @return array `['ack' => bool, 'msg' => string]`
       */
      public function push($queue, $obj)
      {
          if (!$queue || !is_object($obj)) {
              return ['ack' => false, 'msg' => 'parameter error'];
          }
          if (!$this->redis) {
              return ['ack' => false, 'msg' => 'redis not support'];
          }

          // json_encode() yields false on failure; pushing that would enqueue
          // an empty payload, so stop here instead.
          $payload = json_encode($obj);
          if ($payload === false) {
              return ['ack' => false, 'msg' => 'json encode fail'];
          }

          // A positive return value is taken as success, as before.
          $length = $this->redis->rpush($queue, $payload);
          $ack = $length > 0;

          return ['ack' => $ack, 'msg' => $ack ? 'ok' : 'redis push fail'];
      }

      /**
       * Reports how many items are waiting in a queue.
       *
       * @param string $queue Queue (list key) name; must be non-empty.
       *
       * @return array `['ack' => bool, 'msg' => string, 'data' => int]`;
       *               `data` is -1 when the size could not be read. The
       *               "parameter error" result carries no `data` key.
       */
      public function queueSize($queue)
      {
          if (!$queue) {
              return ['ack' => false, 'msg' => 'parameter error'];
          }
          if (!$this->redis) {
              return ['ack' => false, 'msg' => 'redis not support', 'data' => -1];
          }

          $size = $this->redis->llen($queue);
          if ($size === false) {
              // The client reported a failure; do not present it as a size.
              return ['ack' => false, 'msg' => 'redis llen fail', 'data' => -1];
          }

          return ['ack' => true, 'msg' => 'ok', 'data' => $size];
      }

      /**
       * Removes and returns the item at the head of a queue.
       *
       * @param string $queue Queue (list key) name; must be non-empty.
       *
       * @return array `['ack' => bool, 'msg' => string, 'data' => mixed]`;
       *               `data` is the JSON-decoded item, or null when nothing
       *               was pulled (or the item was not valid JSON).
       */
      public function pull($queue)
      {
          if (!$queue) {
              return ['ack' => false, 'msg' => 'parameter error', 'data' => null];
          }
          if (!$this->redis) {
              return ['ack' => false, 'msg' => 'redis not support', 'data' => null];
          }

          $raw = $this->redis->lpop($queue);

          // Compare against the failure values explicitly: a truthiness test
          // would report a popped "0" or "" as a failure even though the item
          // has already been removed from the list.
          if ($raw === false || $raw === null) {
              return ['ack' => false, 'msg' => 'redis pull fail', 'data' => null];
          }

          return ['ack' => true, 'msg' => 'ok', 'data' => json_decode($raw)];
      }
  }