Task.php 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. <?php
  2. namespace crmeb\services\async\task;
  3. use crmeb\utils\Beanstalk;
  4. use think\facade\Config;
  5. use think\facade\Log;
  /**
   * Base class for asynchronous tasks that are queued through beanstalk.
   *
   * A concrete task supplies its cmd identifier through getCmd() and its
   * processing logic through exec(). This class contributes the shared job
   * envelope (formatJob), the enqueue helper (put) and a basic sanity check
   * for jobs handed to exec() (checkTask).
   *
   * Native return types are intentionally not declared on these methods:
   * adding them here could break subclasses that override the methods
   * without declaring the same return type.
   */
  abstract class Task
  {
      /**
       * Get the cmd identifier of the task, i.e. the task category.
       *
       * @return string
       */
      abstract protected static function getCmd();

      /**
       * Execute the task data $task that was taken out of beanstalkd.
       *
       * @param array $task job data; checkTask() can be used to validate it
       * @return boolean
       */
      abstract public static function exec(array $task);

      /**
       * Pack the task-specific data ("params") into a job array.
       *
       * @param array $params data specific to the concrete task
       * @return array job with the keys 'cmd', 'ts', 'sender' and 'params'
       */
      protected static function formatJob(array $params)
      {
          return [
              // Late static binding: the cmd of the concrete subclass.
              'cmd' => static::getCmd(),
              'ts' => time(),
              'sender' => '0',
              'params' => $params,
          ];
      }

      /**
       * Insert a task into beanstalk.
       *
       * The job is JSON-encoded and put into the tube read from the config
       * key 'app.beanstalk_tube' ('twong' is passed as the default value).
       * Failures are logged instead of being propagated.
       *
       * @param array $params parameters required by the concrete task
       * @return boolean true when the job was handed to beanstalk, false when
       *                 encoding failed or an \Exception was thrown
       */
      protected static function put(array $params)
      {
          $bean = Beanstalk::instance();
          $tube = Config::get('app.beanstalk_tube', 'twong');
          $arr = self::formatJob($params);
          $json = json_encode($arr);

          // json_encode() returns false on failure (e.g. invalid UTF-8 in
          // $params); never enqueue such a payload as if it were a valid job.
          if ($json === false) {
              Log::error('beanstalk put failed: json_encode error: ' . json_last_error_msg());
              return false;
          }

          try {
              $bean::useTube($tube)->put($json);
          } catch (\Exception $e) {
              Log::error('beanstalk put failed: ' . $json . ' error:' . $e->getMessage());
              return false;
          }

          return true;
      }

      /**
       * Basic format check of a job.
       *
       * A job passes when it is non-empty, its 'cmd' equals the cmd of the
       * called class and its 'params' entry is an array. Every rejection is
       * logged.
       *
       * @param mixed $task decoded job data
       * @return boolean true when the job passes, false otherwise
       */
      protected static function checkTask($task)
      {
          // static:: (not self::) is required here: getCmd() is abstract in
          // this class, and self::getCmd() fails with
          // "Cannot call abstract method Task::getCmd()".
          if (!$task || !isset($task['cmd']) || $task['cmd'] != static::getCmd()) {
              Log::error('task format error: null or invalid cmd');
              return false;
          }

          if (!isset($task['params']) || !is_array($task['params'])) {
              Log::error('task format error: without params or not an array.');
              return false;
          }

          return true;
      }
  }