FileSocket.php 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. <?php
  2. namespace Pheanstalk\Socket;
  3. use Pheanstalk\Contract\SocketInterface;
  4. use Pheanstalk\Exception\SocketException;
  5. /**
  6. * A Socket implementation using the standard file functions.
  7. */
  8. abstract class FileSocket implements SocketInterface
  9. {
  10. /** @var ?resource */
  11. protected $socket;
  12. /**
  13. * Writes data to the socket.
  14. *
  15. * @param string $data
  16. *
  17. * @return void
  18. */
  19. public function write(string $data): void
  20. {
  21. $this->checkClosed();
  22. $retries = 0;
  23. error_clear_last();
  24. while (!empty($data) && $retries < 10) {
  25. $written = fwrite($this->socket, $data);
  26. if ($written === false) {
  27. $this->throwException();
  28. } elseif ($written === 0) {
  29. $retries++;
  30. continue;
  31. }
  32. $data = substr($data, $written);
  33. }
  34. if (!empty($data)) {
  35. throw new SocketException('Write failed');
  36. }
  37. }
  38. private function throwException()
  39. {
  40. if (null === $error = error_get_last()) {
  41. throw new SocketException('Unknown error');
  42. }
  43. throw new SocketException($error['message'], $error['type']);
  44. }
  45. private function checkClosed()
  46. {
  47. if (!isset($this->socket)) {
  48. throw new SocketException('The connection was closed');
  49. }
  50. }
  51. /**
  52. * Reads up to $length bytes from the socket.
  53. *
  54. * @return string
  55. */
  56. public function read(int $length): string
  57. {
  58. $this->checkClosed();
  59. $buffer = '';
  60. while (mb_strlen($buffer, '8BIT') < $length) {
  61. $result = fread($this->socket, $length - mb_strlen($buffer, '8BIT'));
  62. if ($result === false) {
  63. $this->throwException();
  64. }
  65. $buffer .= $result;
  66. }
  67. return $buffer;
  68. }
  69. /**
  70. * Reads up to the next new-line.
  71. * Trailing whitespace is trimmed.
  72. *
  73. * @param int
  74. */
  75. public function getLine(): string
  76. {
  77. $this->checkClosed();
  78. $result = fgets($this->socket, 8192);
  79. if ($result === false) {
  80. $this->throwException();
  81. }
  82. return rtrim($result);
  83. }
  84. /**
  85. * Disconnect the socket; subsequent usage of the socket will fail.
  86. */
  87. public function disconnect(): void
  88. {
  89. $this->checkClosed();
  90. fclose($this->socket);
  91. $this->socket = null;
  92. }
  93. }