Ver código fonte

change async task code

joe 4 anos atrás
pai
commit
e330776e78

+ 25 - 9
crmeb/command/Task.php

@@ -3,6 +3,8 @@
 namespace crmeb\command;
 
 use crmeb\services\async\task\WechatNotify;
+use crmeb\services\async\task\AsyncSms;
+use crmeb\services\async\task\AsyncClass;
 use think\console\Command;
 use think\console\Input;
 use think\console\input\Argument;
@@ -73,22 +75,36 @@ class Task extends Command
             if (is_null($job)) {
                 continue;
             }
-
+            
+            $executed = false;
             try {
                 $str_payload = $job->getData();
                 $payload = json_decode($str_payload, true);
-                
-                switch($payload['cmd']) {
-                    case WechatNotify::CMD:
-                        WechatNotify::exec($payload);   // TODO: NOT yet check return value
-                        break;
-                    default:
-                        Log::error('task unsupported cmd:' . $str_payload);
+                $asyncTasks = [
+                    new WechatNotify(),
+                    new AsyncClass(),
+                    new AsyncSms(),
+                ];
+
+                foreach($asyncTasks as $t) {
+                    if ($t->getCmd() == $payload['cmd']) {
+                        if (!$t->exec($payload)) {
+                            Log::warning('async task executed return false:' . $str_payload);
+                        }
+                        $executed = true;
+                    }
                 }
+                if (!$executed) {
+                    Log::error('task unsupported cmd:' . $str_payload);
+                    Beanstalk::release($job, 1024, 30);
+                    continue;
+                }
+
                 Beanstalk::delete($job);
             } catch (\Exception $e) {
                 Log::error('task exception:' . $e->getMessage() . ' data:' . $job->getData());
-                Beanstalk::release($job, 1024, 30);
+                Beanstalk::release($job, 1024, 30); 
+                // Beanstalk::delete($job); // testing
             }
         } // while
     }

+ 39 - 0
crmeb/services/async/task/AsyncClass.php

@@ -0,0 +1,39 @@
+<?php
+namespace crmeb\services\async\task;
+
+use ReflectionClass;
+
+class AsyncClass extends Task 
+{
+    public function getCmd():string
+    {
+        return 'async_class';
+    }
+
+    protected function _exec(array $params)
+    {
+        $className = $params['class'] ?? '';
+        $classArgs = $params['classArgs'] ?? [];
+        $methodName = $params['method'] ?? '';
+        $methodArgs = $params['methodArgs'] ?? [];
+
+        $class = new ReflectionClass($className);
+        $method = $class->getMethod($methodName);
+        if ($method->isStatic()) {
+            return $method->invokeArgs(null, $methodArgs);
+        }
+        $inst = $class->newInstanceArgs($classArgs);
+        return $method->invokeArgs($inst, $methodArgs);
+    }
+
+    public static function push(string $className, array $classArgs, string $methodName, array $methodArgs)
+    {
+        $inst = new self();
+        return $inst->put([
+            'class' => $className,
+            'classArgs' => $classArgs,
+            'method' => $methodName,
+            'methodArgs' => $methodArgs,
+        ]);
+    }
+}

+ 5 - 12
crmeb/services/async/task/AsyncSms.php

@@ -9,27 +9,20 @@ use crmeb\repositories\ShortLetterRepositories;
  */
 class AsyncSms extends Task {
 
-    const CMD = 'sms';
-
-    protected static function getCmd()
+    public function getCmd():string
     {
-        return self::CMD;
+        return 'sms';
     }
 
-    public static function exec(array $task)
+    protected function _exec(array $params)
     {
-        if (!self::checkTask($task)) {
-            return false;
-        }
-
-        $params = $task['params'];
-
         return ShortLetterRepositories::send(true, $params['phone'], $params['data'], $params['template']);
     }
 
     public static function push(string $phone, array $data, string $template)
     {
-        return self::put([
+        $inst = new self();
+        return $inst->put([
             'phone' => $phone,
             'data'  => $data,
             'template' => $template,

+ 21 - 11
crmeb/services/async/task/Task.php

@@ -15,7 +15,8 @@ use think\facade\Log;
  * 首先根据特定的任务,实现一个子类, 比如 SomeTask
  * 
  * class SomeTask extends Task {
- *      public static function push($param1, $param2, $param3) {
+ *      //客户端调用,异步执行
+ *      public function push($param1, $param2, $param3) {
  *          self::put([
  *              'param1' => $param1,
  *              'param2' => $param2,
@@ -23,11 +24,11 @@ use think\facade\Log;
  *          ]);
  *      }
  * 
- *      protected static function getCmd() {
+ *      protected function getCmd() {
  *          return 'some_task';
  *      }
  * 
- *      protected static function exec(array $param) {
+ *      protected function exec(array $param) {
  *          //Do something
  *      }
  * }
@@ -44,24 +45,33 @@ abstract class Task {
      * 
      * @return string
      */
-    abstract protected static function getCmd();
+    abstract public function getCmd() : string;
 
     /**
      * 虚函数。执行从 beanstalkd 中取出的任务数据 $task.
      * 
      * @return boolean
      */
-    abstract public static function exec(array $task);
+    abstract protected function _exec(array $params);
+
+    public function exec($task)
+    {
+        if (!$this->checkTask($task)) {
+            return false;
+        }
+
+        return $this->_exec($task['params']);
+    }
 
     /**
      * 任务自由的数据都叫 params
      * 这个函数打包成任务数据
      * @return array
      */
-    protected static function formatJob(array $params)
+    protected function formatJob(array $params)
     {
         return [
-            'cmd' => static::getCmd(),
+            'cmd' => $this->getCmd(),
             'ts'  => time(),
             'sender' => '0',
             'params' => $params,
@@ -74,11 +84,11 @@ abstract class Task {
      * @params 具体某个任务需要的参数
      * @return boolean
      */
-    protected static function put(array $params)
+    protected function put(array $params)
     {
         $bean = Beanstalk::instance();
         $tube = Config::get('app.beanstalk_tube', 'twong');
-        $arr = self::formatJob($params);
+        $arr = $this->formatJob($params);
         $json = json_encode($arr);
         try {
             $bean::useTube($tube)->put($json);
@@ -94,9 +104,9 @@ abstract class Task {
      * 
      * @return boolean
      */
-    protected static function checkTask($task)
+    protected function checkTask($task)
     {
-        if (!$task || !isset($task['cmd']) || $task['cmd'] != self::getCmd()) {
+        if (!$task || !isset($task['cmd']) || $task['cmd'] != $this->getCmd()) {
             Log::error('task format error: null or invalid cmd');
             return false;
         }

+ 5 - 13
crmeb/services/async/task/WechatNotify.php

@@ -11,8 +11,6 @@ use think\facade\Log;
  */
 class WechatNotify extends Task
 {
-    const CMD = 'wechat';
-
     const TYPE_ORDER        = 'order';
     const TYPE_REFUND       = 'refund';
     const TYPE_WITHDRAW     = 'withdraw';
@@ -24,16 +22,17 @@ class WechatNotify extends Task
 
     public static function push(string $type, string $key, string $desc = '')
     {
-        self::put([
+        $inst = new self();
+        return $inst->put([
             'type' => $type,
             'key'  => $key,
             'desc' => $desc,
         ]);
     }
 
-    protected static function getCmd()
+    public function getCmd():string
     {
-        return self::CMD;
+        return 'wechat';
     }
 
     /**
@@ -41,15 +40,8 @@ class WechatNotify extends Task
      * @task: array
      * @return boolean
      */
-    public static function exec(array $task)
+    protected function _exec(array $params)
     {
-        if (!self::checkTask($task)) {
-            return false;
-        }
-        
-        // must has value
-        $params = $task['params'];
-
         $supported_types = [
             self::TYPE_REFUND       => '退款',
             self::TYPE_ORDER        => '订单',