Ver Fonte

add: 增加支持 beanstalkd 的異步任務處理,wechat機器人消息切到新任務組件上

joe há 4 anos atrás
pai
commit
ccf7def2d8

+ 3 - 1
config/app.php

@@ -44,7 +44,9 @@ return [
     'show_error_msg'   => false,
 
     /// 企业微信机器人
-    'redis_robot_msg_key' => 'qywechatpush',
+    // redis key @deprecated.
+    'redis_robot_msg_key'   => 'qywechatpush',
+    'beanstalk_tube'        => 'twong',
     // 亚里士多得
     'qy_weixin_robot_aristotle' => 'a17b9d56-b566-404b-904e-4476b95dc8d7',
 

+ 6 - 0
config/beanstalk.php

@@ -0,0 +1,6 @@
+<?php
+
+return [
+    'addr' => '127.0.0.1',
+    'port' => 11300,
+];

+ 1 - 0
config/console.php

@@ -19,5 +19,6 @@ return [
     'commands' => [
         'workerman'=>\crmeb\command\Workerman::class,
         'timer'=>\crmeb\command\Timer::class,
+        'task' =>\crmeb\command\Task::class,
     ],
 ];

+ 101 - 0
crmeb/command/Task.php

@@ -0,0 +1,101 @@
+<?php
+
+namespace crmeb\command;
+
+use crmeb\services\async\task\WechatNotify;
+use think\console\Command;
+use think\console\Input;
+use think\console\input\Argument;
+use think\console\input\Option;
+use think\console\Output;
+use think\facade\Config;
+use think\facade\Log;
+use Workerman\Worker;
+use crmeb\utils\Beanstalk;
+
+/**
+ * 用于执行异步任务,基于 beanstalkd
+ * 用 redis 做异步任务的方式逐步废除
+ * 
+ * tube: sms
+ * {
+ *  "cmd": "sms", 
+ *  "params": {
+ *     "phone": '', 
+ *     "template": '', 
+ *     "content": ''
+ *  }
+ * }
+ * tube: sql
+ * {"static":true, "class":"UserModel", "func": "insert", "args":{}}
+ * 
+ */
+class Task extends Command
+{
+    protected function configure()
+    {
+        $this->setName('task')
+            ->addArgument('status', Argument::REQUIRED, 'start/stop/status')
+            ->addOption('d', null, Option::VALUE_NONE, 'run as daemon')
+            ->setDescription('start/stop/status async task executor');
+    }
+
+    protected function init(Input $input, Output $output)
+    {
+        global $argv;
+
+        $argv[1] = $input->getArgument('status')?:'start';
+
+        if ($input->hasOption('d')) {
+            $argv[2] = '-d';
+        } else {
+            unset($argv[2]);
+        }
+    }
+
+    protected function execute(Input $input, Output $output)
+    {
+        $this->init($input, $output);
+        Worker::$pidFile = app()->getRootPath() . 'task.pid';
+        $task = new Worker();
+        $task->count = 1;
+        $task->onWorkerStart = [$this, 'start'];
+        $task->onWorkerStop = [$this, 'stop'];
+        $task->runAll();
+    }
+
+    public function start()
+    {
+        $tube = Config::get('app.beanstalk_tube');
+        Beanstalk::watch($tube);
+        while(true) {
+            $job = Beanstalk::reserveWithTimeout(30);
+            if (is_null($job)) {
+                continue;
+            }
+
+            try {
+                $str_payload = $job->getData();
+                $payload = json_decode($str_payload, true);
+                
+                switch($payload['cmd']) {
+                    case WechatNotify::CMD:
+                        WechatNotify::exec($payload);   // TODO: NOT yet check return value
+                        break;
+                    default:
+                        Log::error('task unsupported cmd:' . $str_payload);
+                }
+                Beanstalk::delete($job);
+            } catch (\Exception $e) {
+                Log::error('task exception:' . $e->getMessage() . ' data:' . $job->getData());
+                Beanstalk::release($job, 1024, 30);
+            }
+        } // while
+    }
+
+    public function stop()
+    {
+        echo 'task stopped';
+    }
+}
+

+ 0 - 1
crmeb/command/Workerman.php

@@ -65,7 +65,6 @@ class Workerman extends Command
         } else {
             unset($argv[2]);
         }
-
         $this->config = config('workerman');
 
         return $server;

+ 16 - 2
crmeb/services/QyWeixinService.php

@@ -29,12 +29,18 @@ class QyWeixinService {
         return self::$inst;
     }
 
+    /**
+     * 指定 key, 一个 key 对应一个机器人
+     */
     public function key(string $key)
     {
         $this->key = $key;
         return $this;
     }
 
+    /**
+     * 发送文本消息
+     */
     public function text(string $text)
     {
         $this->msgtype = 'text';
@@ -69,16 +75,24 @@ class QyWeixinService {
             ],
         ];
         $url = $this->addr . $this->key;
-        $ret = json_decode(QyWeixinService::post_json($url, $data), true);
+        $ret = json_decode(self::post_json($url, $data), true);
+        //$ret = json_decode(HttpService::postRequest($url, $data), true);
         return isset($ret['errcode']) && $ret['errcode'] == 0;
     }
 
-    public function post_json(string $url, array $data, array $headers=[], int $timeout=15)
+    public static function post_json(string $url, array $data, array $headers=[], int $timeout=15)
     {
         $h = curl_init();
         curl_setopt($h, CURLOPT_URL, $url);
         curl_setopt($h, CURLOPT_RETURNTRANSFER, true);
         curl_setopt($h, CURLOPT_POST, true);
+        if ($headers && count($headers) > 0) {
+            curl_setopt($h, CURLOPT_HTTPHEADER, $headers);
+        }
+        if ($timeout > 0) {
+            curl_setopt($h, CURLOPT_CONNECTTIMEOUT, $timeout);
+            curl_setopt($h, CURLOPT_TIMEOUT, $timeout);
+        }
         $js = json_encode($data);
         curl_setopt($h, CURLOPT_POSTFIELDS, $js);
         $ret = curl_exec($h);

+ 0 - 82
crmeb/services/async/WechatNotify.php

@@ -1,82 +0,0 @@
-<?php
-
-namespace crmeb\services\async;
-
-use crmeb\utils\Redis;
-use crmeb\services\QyWeixinService;
-use think\facade\Config;
-
-/**
- * 异步机器人通知.
- * 消息格式:
- * {"type":"type_name", "key":"1234", "desc":{}}
- * Class WechatNotify
- */
-class WechatNotify
-{
-    public static $TYPE_ORDER = 'order';
-    public static $TYPE_REFUND = 'refund';
-    public static $TYPE_WITHDRAW = 'withdraw';
-    public static $TYPE_MESSAGE = 'message';
-    public static $TYPE_COMPLIANT = 'compliant';
-    public static $TYPE_RECHARGE = 'recharge';
-    public static $TYPE_COMMENT = 'comment';
-
-    public static function push(string $type, string $key, string $desc = '')
-    {
-        $r = Redis::instance();
-        $rk = Config::get('app.redis_robot_msg_key');
-        $arr = [
-            'type' => $type,
-        ];
-        if ($key) {
-            $arr['key'] = $key;
-        }
-        if ($desc) {
-            $arr['desc'] = $desc;
-        }
-
-        $json = json_encode($arr);
-
-        $r->lpush($rk, $json);
-    }
-
-    /**
-     * this function should be called in timer process.
-     */
-    public static function notify()
-    {
-        $r = Redis::instance();
-        $k = Config::get('app.redis_robot_msg_key');
-        $aristotle = Config::get('app.qy_weixin_robot_aristotle');
-        $supported_types = [
-            'refund' => '退款',
-            'order' => '订单',
-            'compliant' => '反馈建议',
-            'withdraw' => '提现请求',
-            'message' => '客服消息',
-            'recharge' => '充值',
-            'comment' => '商品评论',
-        ];
-        $total = $r->llen($k);
-        while ($total > 0) {
-            $json = $r->rpop($k);
-            $arr = json_decode($json, true);
-            if (!isset($arr['type']) || !is_string($arr['type'])) {
-                continue;
-            }
-            $type = $arr['type'];
-            $key = isset($arr['key']) ? $arr['key'] : '';
-            if (!isset($supported_types[$type])) {
-                continue;
-            }
-            $desc = isset($arr['desc']) ? $arr['desc'] : '';
-            $msg = $supported_types[$type];
-            $md = "### 新" . $msg . "\n>key: " . $key
-                . "\n备注:" . $desc;
-            QyWeixinService::instance()->key($aristotle)
-                ->markdown($md)->post();
-            $total = $r->llen($k);
-        }// while
-    }//
-}

+ 95 - 0
crmeb/services/async/task/WechatNotify.php

@@ -0,0 +1,95 @@
+<?php
+
+namespace crmeb\services\async\task;
+
+use crmeb\utils\Beanstalk;
+use crmeb\services\QyWeixinService;
+use think\facade\Config;
+use think\facade\Log;
+
+/**
+ * 异步机器人通知.
+ * 消息格式:
+ * {"type":"type_name", "key":"1234", "desc":''}
+ * Class WechatNotify
+ */
+class WechatNotify
+{
+    const CMD = 'wechat';
+
+    const TYPE_ORDER        = 'order';
+    const TYPE_REFUND       = 'refund';
+    const TYPE_WITHDRAW     = 'withdraw';
+    const TYPE_MESSAGE      = 'message';
+    const TYPE_COMPLIANT    = 'compliant';
+    const TYPE_RECHARGE     = 'recharge';
+    const TYPE_COMMENT      = 'comment';
+    const TYPE_ERROR        = 'error';
+
+    public static function push(string $type, string $key, string $desc = '')
+    {
+        $bean = Beanstalk::instance();
+        $tube = Config::get('app.beanstalk_tube', 'twong');
+        $arr = [
+            'cmd' => self::CMD,
+            'ts'  => time(),
+            'sender' => '0',
+            'params' => [
+                'type' => $type,
+                'key'  => $key,
+                'desc' => $desc,
+            ],
+        ];
+
+        $json = json_encode($arr);
+        try {
+            $bean::useTube($tube)->put($json);
+        } catch (\Exception $e) {
+            Log::error('push failed: ' . $json);
+        }
+    }
+
+    /**
+     * Pop task body from beanstalk and deliver to wechat robot.
+     * @task: array
+     * @return boolean
+     */
+    public static function exec(array $task)
+    {
+        if (!$task || !isset($task['cmd']) || $task['cmd'] != self::CMD) {
+            Log::error('invalid cmd');
+            return false;
+        }
+        // must has value
+        $params = $task['params'];
+
+        $supported_types = [
+            self::TYPE_REFUND       => '退款',
+            self::TYPE_ORDER        => '订单',
+            self::TYPE_COMPLIANT    => '反馈建议',
+            self::TYPE_WITHDRAW     => '提现请求',
+            self::TYPE_MESSAGE      => '客服消息',
+            self::TYPE_RECHARGE     => '充值',
+            self::TYPE_COMMENT      => '商品评论',
+            self::TYPE_ERROR        => '故障',
+        ];
+        if (!isset($params['type']) || !is_string($params['type'])) {
+            Log::error('invalid type: bad format.');
+            return false;
+        }
+        $type = $params['type'];
+        $key = isset($params['key']) ? $params['key'] : '';
+        if (!isset($supported_types[$type])) {
+            Log::error('unsupported type:' . $type);
+            return false;
+        }
+        $desc = isset($params['desc']) ? $params['desc'] : '';
+        $msg = $supported_types[$type];
+        $md = "### 新" . $msg . "\n "
+            . "> 描述:" . $desc;
+        QyWeixinService::instance()->key($key)
+            ->markdown($md)->post();
+            
+        return true;
+    }//
+}

+ 0 - 2
crmeb/subscribes/TimerSubscribe.php

@@ -13,7 +13,6 @@ use crmeb\services\async\ClearanceCalc;
 use crmeb\services\async\LuckyCalc;
 use think\facade\Db;
 use think\facade\Log;
-use crmeb\services\async\WechatNotify;
 use app\admin\model\system\SystemLog;
 
 /**
@@ -47,7 +46,6 @@ class TimerSubscribe
      */
     public function onTimer_10()
     {
-        WechatNotify::notify();
     }
 
     /**

+ 35 - 0
crmeb/utils/Beanstalk.php

@@ -0,0 +1,35 @@
+<?php
+
+namespace crmeb\utils;
+
+use Pheanstalk\Pheanstalk;
+
+/**
+ * Beanstalkd client
+ */
+class Beanstalk {
+    protected static $inst;
+
+    protected $talk;
+
+    protected $config = [];
+
+    protected function __construct()
+    {
+        $this->config = config('beanstalk');
+        $this->talk = Pheanstalk::create($this->config['addr'], $this->config['port']);
+    }
+
+    public static function instance()
+    {
+        if (is_null(self::$inst)) {
+            self::$inst = new static();
+        }
+        return self::$inst;
+    }
+
+    public static function __callStatic($name, $arguments)
+    {
+        return self::instance()->talk->{$name}(...$arguments);
+    }
+}