| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- <?php
- namespace tw\async\tasks;
- use crmeb\utils\Beanstalk;
- use think\facade\Config;
- use think\facade\Log;
- /**
- * 这个 Task 表示异步任务的基类, 借助于 beanstalkd 等服务来异步执行或延后执行
- *
- * HOWTO
- *
- * 首先根据特定的任务,实现一个子类, 比如 SomeTask
- *
- * class SomeTask extends Task {
- * //客户端调用,异步执行
- * public function push($param1, $param2, $param3) {
- * self::put([
- * 'param1' => $param1,
- * 'param2' => $param2,
- * 'param3' => $param3
- * ]);
- * }
- *
- * protected function getCmd() {
- * return 'some_task';
- * }
- *
- * protected function exec(array $param) {
- * //Do something
- * }
- * }
- *
- * // 消费任务的代码实现
- * // read from beanstalk
- * SomeTask::exec($param)
- *
- * 增加任务到 beanstalkd 代码
- */
- abstract class Task {
- /**
- * 虚函数。返回任务 cmd 标识, 任务类别
- *
- * @return string
- */
- abstract public function getCmd() : string;
- /**
- * 虚函数。执行从 beanstalkd 中取出的任务数据 $task.
- *
- * @return boolean
- */
- abstract protected function _exec(array $params);
- public function exec($task)
- {
- if (!$this->checkTask($task)) {
- return false;
- }
- return $this->_exec($task['params']);
- }
- /**
- * 任务自由的数据都叫 params
- * 这个函数打包成任务数据
- * @return array
- */
- protected function formatJob(array $params): array
- {
- return [
- 'cmd' => $this->getCmd(),
- 'ts' => time(),
- 'sender' => '0',
- 'params' => $params,
- ];
- }
- /**
- * 插入任务到 beanstalk
- *
- * @params 具体某个任务需要的参数
- * @return boolean
- */
- protected function put(array $params): bool
- {
- $bean = Beanstalk::instance();
- $tube = Config::get('app.beanstalk_tube', 'twong');
- $arr = $this->formatJob($params);
- $json = json_encode($arr);
- try {
- $bean::useTube($tube)->put($json);
- } catch (\Exception $e) {
- Log::error('beanstalk put failed: ' . $json . ' error:' . $e->getMessage());
- return false;
- }
- return true;
- }
- /**
- * job 格式基本检查
- *
- * @return boolean
- */
- protected function checkTask($task)
- {
- if (!$task || !isset($task['cmd']) || $task['cmd'] != $this->getCmd()) {
- Log::error('task format error: null or invalid cmd');
- return false;
- }
- if (!isset($task['params']) || !is_array($task['params'])) {
- Log::error('task format error: without params or not an array.');
- return false;
- }
- return true;
- }
- }
|