Connection.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. <?php
  2. namespace Pheanstalk;
  3. use Pheanstalk\Contract\CommandInterface;
  4. use Pheanstalk\Contract\ResponseInterface;
  5. use Pheanstalk\Contract\SocketFactoryInterface;
  6. use Pheanstalk\Contract\SocketInterface;
  7. use Pheanstalk\Exception\ServerBadFormatException;
  8. use Pheanstalk\Exception\ServerDrainingException;
  9. use Pheanstalk\Exception\ServerInternalErrorException;
  10. use Pheanstalk\Exception\ServerOutOfMemoryException;
  11. use Pheanstalk\Exception\ServerUnknownCommandException;
  12. use Pheanstalk\Response\ArrayResponse;
  13. /**
  14. * A connection to a beanstalkd server, backed by any type of socket.
  15. *
  16. */
  17. class Connection
  18. {
  19. const CRLF = "\r\n";
  20. const CRLF_LENGTH = 2;
  21. const DEFAULT_CONNECT_TIMEOUT = 2;
  22. // responses which are global errors, mapped to their exception classes
  23. private static $errorResponses = [
  24. ResponseInterface::RESPONSE_OUT_OF_MEMORY => ServerOutOfMemoryException::class,
  25. ResponseInterface::RESPONSE_INTERNAL_ERROR => ServerInternalErrorException::class,
  26. ResponseInterface::RESPONSE_DRAINING => ServerDrainingException::class,
  27. ResponseInterface::RESPONSE_BAD_FORMAT => ServerBadFormatException::class,
  28. ResponseInterface::RESPONSE_UNKNOWN_COMMAND => ServerUnknownCommandException::class,
  29. ];
  30. // responses which are followed by data
  31. private static $dataResponses = [
  32. ResponseInterface::RESPONSE_RESERVED,
  33. ResponseInterface::RESPONSE_FOUND,
  34. ResponseInterface::RESPONSE_OK,
  35. ];
  36. /**
  37. * @var SocketFactoryInterface
  38. */
  39. private $factory;
  40. /**
  41. * @var ?SocketInterface
  42. */
  43. private $socket;
  44. public function __construct(SocketFactoryInterface $factory)
  45. {
  46. $this->factory = $factory;
  47. }
  48. /**
  49. * Disconnect the socket.
  50. * Subsequent socket operations will create a new connection.
  51. */
  52. public function disconnect()
  53. {
  54. if (isset($this->socket)) {
  55. $this->socket->disconnect();
  56. $this->socket = null;
  57. }
  58. }
  59. /**
  60. * @throws Exception\ClientException
  61. */
  62. public function dispatchCommand(CommandInterface $command): ArrayResponse
  63. {
  64. $socket = $this->getSocket();
  65. $to_send = $command->getCommandLine().self::CRLF;
  66. if ($command->hasData()) {
  67. $to_send .= $command->getData().self::CRLF;
  68. }
  69. $socket->write($to_send);
  70. $responseLine = $socket->getLine();
  71. $responseName = preg_replace('#^(\S+).*$#s', '$1', $responseLine);
  72. if (isset(self::$errorResponses[$responseName])) {
  73. $exceptionClass = self::$errorResponses[$responseName];
  74. throw new $exceptionClass(sprintf(
  75. "%s in response to '%s'",
  76. $responseName,
  77. $command->getCommandLine()
  78. ));
  79. }
  80. if (in_array($responseName, self::$dataResponses)) {
  81. $dataLength = preg_replace('#^.*\b(\d+)$#', '$1', $responseLine);
  82. $data = $socket->read((int) $dataLength);
  83. $crlf = $socket->read(self::CRLF_LENGTH);
  84. if ($crlf !== self::CRLF) {
  85. throw new Exception\ClientException(sprintf(
  86. 'Expected %u bytes of CRLF after %u bytes of data',
  87. self::CRLF_LENGTH,
  88. $dataLength
  89. ));
  90. }
  91. } else {
  92. $data = null;
  93. }
  94. return $command
  95. ->getResponseParser()
  96. ->parseResponse($responseLine, $data);
  97. }
  98. // ----------------------------------------
  99. /**
  100. * Socket handle for the connection to beanstalkd.
  101. *
  102. * @throws Exception\ConnectionException
  103. *
  104. * @return SocketInterface
  105. */
  106. private function getSocket()
  107. {
  108. if (!isset($this->socket)) {
  109. $this->socket = $this->factory->create();
  110. }
  111. return $this->socket;
  112. }
  113. }