SocketSocket.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. <?php
  2. namespace Pheanstalk\Socket;
  3. use Pheanstalk\Contract\SocketInterface;
  4. use Pheanstalk\Exception\ConnectionException;
  5. use Pheanstalk\Exception\SocketException;
  /**
   * A Socket implementation using the Sockets extension
   *
   * Opens a blocking TCP connection in the constructor and exposes byte-oriented
   * write/read/getLine operations on it. Failures reported by the extension are
   * surfaced as SocketException (I/O) or ConnectionException (connection setup).
   */
  class SocketSocket implements SocketInterface
  {
      /** @var resource|\Socket Socket handle; unset once the connection is closed */
      private $socket;

      /**
       * Creates the socket and connects it to the first address $host resolves to.
       *
       * @param string $host           Hostname or IPv4 address to connect to
       * @param int    $port           TCP port
       * @param int    $connectTimeout Timeout in seconds applied while connecting
       *
       * @throws \Exception          When the sockets extension is not loaded
       * @throws SocketException     When the socket cannot be created
       * @throws ConnectionException When the host cannot be resolved or connected to
       */
      public function __construct(
          string $host,
          int $port,
          int $connectTimeout
      ) {
          if (!extension_loaded('sockets')) {
              throw new \Exception('Sockets extension not found');
          }

          $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
          if ($socket === false) {
              // There is no socket to query yet, so the error has to be read
              // from the extension's global "last error" state.
              $error = socket_last_error();
              throw new SocketException(socket_strerror($error), $error);
          }
          $this->socket = $socket;

          try {
              $this->connect($host, $port, $connectTimeout);
          } catch (\Throwable $exception) {
              // Do not leave a half-configured socket open when setup fails.
              socket_close($this->socket);
              unset($this->socket);
              throw $exception;
          }
      }

      /**
       * Configures the freshly created socket and connects it.
       *
       * The send/receive timeouts are only overridden for the duration of the
       * connect call; the values that were in effect before are restored afterwards.
       *
       * @throws ConnectionException When blocking mode, name resolution or connect fails
       */
      private function connect(string $host, int $port, int $connectTimeout): void
      {
          $timeout = [
              'sec' => $connectTimeout,
              'usec' => 0,
          ];

          // Remember the current timeouts so they can be restored after connecting.
          $sendTimeout = socket_get_option($this->socket, SOL_SOCKET, SO_SNDTIMEO);
          $receiveTimeout = socket_get_option($this->socket, SOL_SOCKET, SO_RCVTIMEO);

          socket_set_option($this->socket, SOL_SOCKET, SO_KEEPALIVE, 1);
          socket_set_option($this->socket, SOL_SOCKET, SO_SNDTIMEO, $timeout);
          socket_set_option($this->socket, SOL_SOCKET, SO_RCVTIMEO, $timeout);

          if (socket_set_block($this->socket) === false) {
              throw new ConnectionException(0, "Failed to set socket to blocking mode");
          }

          $addresses = gethostbynamel($host);
          if ($addresses === false || $addresses === []) {
              throw new ConnectionException(0, "Could not resolve hostname $host");
          }

          // The warning is suppressed because the failure is reported through
          // the exception below instead.
          if (@socket_connect($this->socket, $addresses[0], $port) === false) {
              $error = socket_last_error($this->socket);
              throw new ConnectionException($error, socket_strerror($error));
          }

          // Only restore values that were actually read; socket_get_option()
          // yields false on failure, which is not a valid timeout.
          if ($sendTimeout !== false) {
              socket_set_option($this->socket, SOL_SOCKET, SO_SNDTIMEO, $sendTimeout);
          }
          if ($receiveTimeout !== false) {
              socket_set_option($this->socket, SOL_SOCKET, SO_RCVTIMEO, $receiveTimeout);
          }
      }

      /**
       * Writes data to the socket.
       *
       * Keeps writing until every byte of $data has been handed to the socket,
       * since a single socket_write() call may accept only part of the buffer.
       *
       * @param string $data
       *
       * @return void
       *
       * @throws SocketException When the connection was closed or the write fails
       */
      public function write(string $data): void
      {
          $this->checkClosed();

          // Compare against '' rather than using empty(): empty('0') is true,
          // which would silently drop a payload (or unsent tail) of exactly "0".
          while ($data !== '') {
              $written = socket_write($this->socket, $data);
              if ($written === false) {
                  $this->throwException();
              }
              $data = (string) substr($data, $written);
          }
      }

      /**
       * Throws a SocketException describing the last error on the socket.
       *
       * @throws SocketException Always
       */
      private function throwException()
      {
          $error = socket_last_error($this->socket);
          throw new SocketException(socket_strerror($error), $error);
      }

      /**
       * Ensures the socket has not been disconnected.
       *
       * @throws SocketException When the socket handle has been unset
       */
      private function checkClosed()
      {
          if (!isset($this->socket)) {
              throw new SocketException('The connection was closed');
          }
      }

      /**
       * Reads exactly $length bytes from the socket.
       *
       * Loops until $length bytes have been collected; a non-positive $length
       * yields an empty string without touching the socket.
       *
       * @param int $length Number of bytes to read
       *
       * @return string
       *
       * @throws SocketException When the connection was closed, the read fails,
       *                         or the remote end closes before $length bytes arrive
       */
      public function read(int $length): string
      {
          $this->checkClosed();

          $buffer = '';
          // '8BIT' makes mb_strlen count bytes rather than characters.
          while (($remaining = $length - mb_strlen($buffer, '8BIT')) > 0) {
              $result = socket_read($this->socket, $remaining);
              if ($result === false) {
                  $this->throwException();
              }
              if ($result === '') {
                  // An empty read means no more data will arrive; without this
                  // check the loop would spin forever.
                  throw new SocketException('The connection was closed by the remote host');
              }
              $buffer .= $result;
          }

          return $buffer;
      }

      /**
       * Reads one line from the socket and returns it without trailing whitespace.
       *
       * @return string
       *
       * @throws SocketException When the connection was closed, the read fails,
       *                         or the remote end closes before a full line arrives
       */
      public function getLine(): string
      {
          $this->checkClosed();

          $buffer = '';
          // Reading stops at \r or \n. In case it stopped at \r we must continue reading.
          while (substr($buffer, -1, 1) !== "\n") {
              $result = socket_read($this->socket, 1024, PHP_NORMAL_READ);
              if ($result === false) {
                  $this->throwException();
              }
              if ($result === '') {
                  // An empty read means no more data will arrive; without this
                  // check the loop would spin forever.
                  throw new SocketException('The connection was closed by the remote host');
              }
              $buffer .= $result;
          }

          return rtrim($buffer);
      }

      /**
       * Disconnect the socket; subsequent usage of the socket will fail.
       *
       * @throws SocketException When the socket was already disconnected
       */
      public function disconnect(): void
      {
          $this->checkClosed();
          socket_close($this->socket);
          unset($this->socket);
      }
  }