Task.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. <?php
  2. namespace tw\async\tasks;
  3. use crmeb\utils\Beanstalk;
  4. use think\facade\Config;
  5. /**
  6. * 这个 Task 表示异步任务的基类, 借助于 beanstalkd 等服务来异步执行或延后执行
  7. *
  8. * HOWTO
  9. *
  10. * 首先根据特定的任务,实现一个子类, 比如 SomeTask
  11. *
  12. * class SomeTask extends Task {
  13. * //客户端调用,异步执行
  14. * public function push($param1, $param2, $param3) {
  15. * self::put([
  16. * 'param1' => $param1,
  17. * 'param2' => $param2,
  18. * 'param3' => $param3
  19. * ]);
  20. * }
  21. *
  22. * protected function getCmd() {
  23. * return 'some_task';
  24. * }
  25. *
  26. * protected function exec(array $param) {
  27. * //Do something
  28. * }
  29. * }
  30. *
  31. * // 消费任务的代码实现
  32. * // read from beanstalk
  33. * SomeTask::exec($param)
  34. *
  35. * 增加任务到 beanstalkd 代码
  36. */
  37. abstract class Task
  38. {
  39. /**
  40. * 虚函数。返回任务 cmd 标识, 任务类别
  41. *
  42. * @return string
  43. */
  44. abstract public function getCmd(): string;
  45. /**
  46. * 虚函数。执行从 beanstalkd 中取出的任务数据 $task.
  47. *
  48. * @return boolean
  49. */
  50. abstract protected function _exec(array $params);
  51. public function exec($task)
  52. {
  53. if (!$this->checkTask($task)) {
  54. return false;
  55. }
  56. return $this->_exec($task['params']);
  57. }
  58. /**
  59. * 任务自由的数据都叫 params
  60. * 这个函数打包成任务数据
  61. * @return array
  62. */
  63. protected function formatJob(array $params): array
  64. {
  65. return [
  66. 'cmd' => $this->getCmd(),
  67. 'ts' => time(),
  68. 'sender' => '0',
  69. 'params' => $params,
  70. ];
  71. }
  72. /**
  73. * 插入任务到 beanstalk
  74. *
  75. * @params 具体某个任务需要的参数
  76. * @return boolean
  77. */
  78. protected function put(array $params): bool
  79. {
  80. $bean = Beanstalk::instance();
  81. $tube = Config::get('app.beanstalk_tube', 'twong');
  82. $arr = $this->formatJob($params);
  83. $json = json_encode($arr);
  84. try {
  85. $bean::useTube($tube)->put($json);
  86. } catch (\Exception $e) {
  87. errlog('beanstalk put failed: ' . $json . ' error:' . $e->getMessage());
  88. return false;
  89. }
  90. return true;
  91. }
  92. /**
  93. * job 格式基本检查
  94. *
  95. * @return boolean
  96. */
  97. protected function checkTask($task)
  98. {
  99. if (!$task || !isset($task['cmd']) || $task['cmd'] != $this->getCmd()) {
  100. errlog('task format error: null or invalid cmd');
  101. return false;
  102. }
  103. if (!isset($task['params']) || !is_array($task['params'])) {
  104. errlog('task format error: without params or not an array.');
  105. return false;
  106. }
  107. return true;
  108. }
  109. }