diff --git a/src/Event/WsEvent.php b/src/Event/WsEvent.php index 24859a71..5cb552fc 100644 --- a/src/Event/WsEvent.php +++ b/src/Event/WsEvent.php @@ -8,8 +8,9 @@ */ final class WsEvent { - const ON_HANDSHAKE = 'handshake'; - const ON_OPEN = 'open'; - const ON_MESSAGE = 'message'; - const ON_CLOSE = 'close'; + const ON_HANDSHAKE = 'ws.handshake'; + const ON_OPEN = 'ws.open'; + const ON_MESSAGE = 'ws.message'; + const ON_CLOSE = 'ws.close'; + const ON_ERROR = 'ws.error'; } diff --git a/src/Exception/ContextLostException.php b/src/Exception/ContextLostException.php new file mode 100644 index 00000000..32845a0f --- /dev/null +++ b/src/Exception/ContextLostException.php @@ -0,0 +1,12 @@ +getUri()->getPath(); - list($className, ) = $this->getHandler($path); + list($className,) = $this->getHandler($path); } catch (\Throwable $e) { /* @var ErrorHandler $errorHandler */ // $errorHandler = \bean(ErrorHandler::class); @@ -39,7 +41,7 @@ public function handshake(Request $request, Response $response): array if ($e instanceof WsRouteException) { return [ HandlerInterface::HANDSHAKE_FAIL, - $response->withStatus(404) + $response->withStatus(404)->withAddedHeader('Failed-Reason', 'Route not found') ]; } @@ -70,7 +72,7 @@ public function handshake(Request $request, Response $response): array public function open(Server $server, Request $request, int $fd) { $path = $request->getUri()->getPath(); - list($className, ) = $this->getHandler($path); + list($className,) = $this->getHandler($path); /** @var HandlerInterface $handler */ $handler = \bean($className); @@ -86,21 +88,34 @@ public function open(Server $server, Request $request, int $fd) * @param Frame $frame * @throws \InvalidArgumentException * @throws \Swoft\WebSocket\Server\Exception\WsRouteException - * @throws \Swoft\WebSocket\Server\Exception\WsMessageException + * @throws \Swoft\WebSocket\Server\Exception\ContextLostException */ public function message(Server $server, Frame $frame) { $fd = $frame->fd; - if (!$path = WebSocketContext::getMeta('path', $fd)) { - throw new WsMessageException("The connection info has lost of the fd $fd"); - } + try { + if (!$path = WebSocketContext::getMeta('path', $fd)) { + throw new ContextLostException("The connection info has lost of the fd#$fd, on message"); + } - list($className, ) = $this->getHandler($path); + $className = $this->getHandler($path)[0]; - /** @var HandlerInterface $handler */ - $handler = \bean($className); - $handler->onMessage($server, $frame); + /** @var HandlerInterface $handler */ + $handler = \bean($className); + $handler->onMessage($server, $frame); + } catch (\Throwable $e) { + /** @var \Swoft\Event\EventManager $em */ + $em = App::getBean('eventManager'); + + if ($em->hasListenerQueue(WsEvent::ON_ERROR)) { + App::trigger(WsEvent::ON_ERROR, $frame, $e); + } else { + App::error($e->getMessage(), ['fd' => $fd, 'data' => $frame->data]); + // close connection + $server->close($fd); + } + } } /** @@ -109,21 +124,27 @@ public function message(Server $server, Frame $frame) * @param int $fd * @throws \InvalidArgumentException * @throws \Swoft\WebSocket\Server\Exception\WsRouteException - * @throws \Swoft\WebSocket\Server\Exception\WsMessageException + * @throws \Swoft\WebSocket\Server\Exception\ContextLostException */ public function close(Server $server, int $fd) { - if (!$path = WebSocketContext::getMeta('path', $fd)) { - throw new WsMessageException("The connection info has lost of the fd $fd"); - } + try { + if (!$path = WebSocketContext::getMeta('path', $fd)) { + throw new ContextLostException( + "The connection info has lost of the fd#$fd, on connection closed" + ); + } - list($className, ) = $this->getHandler($path); + $className = $this->getHandler($path)[0]; - /** @var HandlerInterface $handler */ - $handler = \bean($className); + /** @var HandlerInterface $handler */ + $handler = \bean($className); - if (\method_exists($handler, 'onClose')) { - $handler->onClose($server, $fd); + if (\method_exists($handler, 'onClose')) { + $handler->onClose($server, $fd); + } + } catch (\Throwable $e) { + App::error($e->getMessage(), ['fd' => $fd]); } } @@ -141,7 +162,7 @@ protected function getHandler(string $path): array if ($status !== HandlerMapping::FOUND) { throw new WsRouteException(sprintf( - 'The requested websocket route "%s" path is not exist! ', + 'The requested websocket route "%s" path is not exist!', $path )); } diff --git a/src/Router/HandlerMapping.php b/src/Router/HandlerMapping.php index 7048fcae..bb894a43 100644 --- a/src/Router/HandlerMapping.php +++ b/src/Router/HandlerMapping.php @@ -24,7 +24,7 @@ class HandlerMapping implements HandlerMappingInterface * ... * ] */ - private $routes = []; + protected $routes = []; /** * @param string $path @@ -59,7 +59,7 @@ public function getHandler(...$params): array */ public function match(string $path): array { - $path = \rtrim($path, '/ '); + $path = $path === '/' ? $path : \rtrim($path, '/ '); if (!isset($this->routes[$path])) { return [self::NOT_FOUND, $path]; diff --git a/src/WebSocketContext.php b/src/WebSocketContext.php index 68ad86fd..28a79c96 100644 --- a/src/WebSocketContext.php +++ b/src/WebSocketContext.php @@ -21,7 +21,7 @@ class WebSocketContext * fd => [ // metadata * 'meta' => [ - * 'id' => fd, + * 'fd' => fd, * 'path' => request path, * ... * ], @@ -48,7 +48,7 @@ class WebSocketContext * ] * @param Request $request */ - public static function set(int $fd, array $meta, Request $request) + public static function init(int $fd, array $meta, Request $request) { self::$connections[$fd][self::META_KEY] = $meta; self::$connections[$fd][self::REQUEST_KEY] = $request; @@ -56,16 +56,27 @@ public static function set(int $fd, array $meta, Request $request) /** * @param int $fd + * @param string $ctxKey + * @param mixed $ctxValue + */ + public static function set(int $fd, string $ctxKey, $ctxValue) + { + self::$connections[$fd][$ctxKey] = $ctxValue; + } + + /** + * @param int $fd + * @param string|null $ctxKey * @return array|null */ - public static function get(int $fd = null) + public static function get(int $fd = null, string $ctxKey = null) { - if ($fd === null) { - $fd = self::getFdByCoId(); + if ($fd === null && !($fd = self::getFdByCoId())) { + return null; + } - if ($fd === null) { - return null; - } + if ($ctxKey) { + return self::getContext($ctxKey, $fd); } return self::$connections[$fd] ?? null; @@ -86,12 +97,8 @@ public static function has(int $fd): bool */ public static function del(int $fd = null) { - if ($fd === null) { - $fd = self::getFdByCoId(); - - if ($fd === null) { - return false; - } + if ($fd === null && !($fd = self::getFdByCoId())) { + return false; } if (isset(self::$connections[$fd])) { @@ -102,6 +109,34 @@ public static function del(int $fd = null) return false; } + /** + * @param string $ctxKey + * @param int|null $fd + * @return mixed|null + */ + public static function getContext(string $ctxKey, int $fd = null) + { + if ($fd === null && !($fd = self::getFdByCoId())) { + return null; + } + + return self::$connections[$fd][$ctxKey] ?? null; + } + + /** + * @param string $ctxKey + * @param int $fd + * @return bool + */ + public static function hasContext(string $ctxKey, int $fd = null): bool + { + if ($fd === null && !($fd = self::getFdByCoId())) { + return false; + } + + return isset(self::$connections[$fd][$ctxKey]); + } + /** * @return int */ @@ -224,11 +259,12 @@ public static function getFdByCoId() /** * delete coId to fd mapping + * @param int|null $cid * @return bool */ - public static function delFdToCoId(): bool + public static function delFdToCoId(int $cid = null): bool { - $cid = self::getCoroutineId(); + $cid = $cid > -1 ? $cid : self::getCoroutineId(); if (isset(self::$map[$cid])) { unset(self::$map[$cid]); diff --git a/src/WebSocketEventTrait.php b/src/WebSocketEventTrait.php index 35c4fb38..9d2ca8b3 100644 --- a/src/WebSocketEventTrait.php +++ b/src/WebSocketEventTrait.php @@ -47,7 +47,7 @@ public function onHandShake(Request $request, Response $response): bool $psr7Res = new \Swoft\Http\Message\Server\Response($response); // Initialize client information - WebSocketContext::set($fd, $meta, $psr7Req); + WebSocketContext::init($fd, $meta, $psr7Req); // init fd and coId mapping WebSocketContext::setFdToCoId($fd); @@ -75,9 +75,7 @@ public function onHandShake(Request $request, Response $response): bool } // setting response - $psr7Res = $psr7Res - ->withStatus(101) - ->withHeaders(WebSocket::handshakeHeaders($secWSKey)); + $psr7Res = $psr7Res->withStatus(101)->withHeaders(WebSocket::handshakeHeaders($secWSKey)); if (isset($request->header['sec-websocket-protocol'])) { $psr7Res = $psr7Res->withHeader('Sec-WebSocket-Protocol', $request->header['sec-websocket-protocol']); @@ -90,11 +88,16 @@ public function onHandShake(Request $request, Response $response): bool WebSocketContext::setMeta($fd, true, 'handshake'); - $this->log("Handshake: Client #{$fd} handshake successful! path {$meta['path']}, co Id #$cid, Meta:", $meta, 'debug'); + $this->log( + "Handshake: Client #{$fd} handshake successful! path {$meta['path']}, co Id #$cid, Meta:", + WebSocketContext::getMeta(null, $fd), + 'debug' + ); // Handshaking successful, Manually triggering the open event $this->server->defer(function () use ($psr7Req, $fd) { $this->onWsOpen($this->server, $psr7Req, $fd); + }); // delete coId to fd mapping @@ -116,7 +119,7 @@ protected function buildConnectionMetadata(int $fd, Request $request): array $this->log("onHandShake: Client #{$fd} send handshake request to {$path}, client info: ", $info, 'debug'); return [ - 'id' => $fd, + 'fd' => $fd, 'ip' => $info['remote_ip'], 'port' => $info['remote_port'], 'path' => $path, @@ -186,7 +189,7 @@ public function onClose(Server $server, int $fd) if ($fdInfo['websocket_status'] > 0) { $total = $this->count(); - $this->log("onClose: Client #{$fd} connection will close. client count $total, client info:", $fdInfo, 'debug'); + $this->log("onClose: Client #{$fd} connection has been closed. client count $total, client info:", $fdInfo, 'debug'); if (!$meta = WebSocketContext::getMeta(null, $fd)) { $this->log("onClose: Client #{$fd} connection meta info has been lost"); @@ -208,41 +211,4 @@ public function onClose(Server $server, int $fd) WebSocketContext::del($fd); } } - - /** - * @param Request $request - * @param Response $response - * @return bool - */ - protected function simpleHandshake(Request $request, Response $response): bool - { - $this->log("received handshake request from fd #{$request->fd}, co ID #" . Coroutine::tid()); - - // websocket握手连接算法验证 - $secWSKey = $request->header['sec-websocket-key']; - - if (WebSocket::isInvalidSecWSKey($secWSKey)) { - $response->end(); - - return false; - } - - $headers = WebSocket::handshakeHeaders($secWSKey); - - // WebSocket connection to 'ws://127.0.0.1:9502/' - // failed: Error during WebSocket handshake: - // Response must not include 'Sec-WebSocket-Protocol' header if not present in request: websocket - if (isset($request->header['sec-websocket-protocol'])) { - $headers['Sec-WebSocket-Protocol'] = $request->header['sec-websocket-protocol']; - } - - foreach ($headers as $key => $val) { - $response->header($key, $val); - } - - $response->status(101); - $response->end(); - - return true; - } } diff --git a/src/WebSocketServer.php b/src/WebSocketServer.php index 66db909b..96e2fbc2 100644 --- a/src/WebSocketServer.php +++ b/src/WebSocketServer.php @@ -100,44 +100,61 @@ public function log(string $msg, array $data = [], string $type = 'info') return; } - ConsoleUtil::log($msg, $data, $type); + if (\config('debug')) { + ConsoleUtil::log($msg, $data, $type); + } } /***************************************************************************** * some methods for send message ****************************************************************************/ + /** + * @param string $fd + * @param string $data + * @param bool $isBinary + * @param bool $finish + * @return bool + */ + public function push(string $fd, string $data, $isBinary = false, bool $finish = true): bool + { + if (!$this->server->exist($fd)) { + return false; + } + + return $this->server->push($fd, $data, $isBinary, $finish); + } + /** * send message to client(s) * @param string $data * @param int|array $receivers - * @param int|array $expected + * @param int|array $excluded * @param int $sender + * @param int $pageSize * @return int */ - public function send(string $data, $receivers = 0, $expected = 0, int $sender = 0): int + public function send(string $data, $receivers = 0, $excluded = 0, int $sender = 0, int $pageSize = 50): int { if (!$data) { return 0; } $receivers = (array)$receivers; - $expected = (array)$expected; + $excluded = (array)$excluded; // only one receiver if (1 === \count($receivers)) { - return $this->sendTo(array_shift($receivers), $data, $sender); + return $this->sendTo((int)\array_shift($receivers), $data, $sender); } // to all - if (!$expected && !$receivers) { - $this->sendToAll($data, $sender); - // to some - } else { - $this->sendToSome($data, $receivers, $expected, $sender); + if (!$excluded && !$receivers) { + return $this->sendToAll($data, $sender, $pageSize); } - return $this->getErrorNo(); + // to some + return $this->sendToSome($data, $receivers, $excluded, $sender, $pageSize); } /** @@ -155,18 +172,18 @@ public function sendTo(int $receiver, string $data, int $sender = 0): int $this->log("(private)The #{$fromUser} send message to the user #{$receiver}. Data: {$data}"); - return $this->server->push($receiver, $data, $opcode, $finish) ? 0 : -500; + return $this->server->push($receiver, $data, $opcode, $finish) ? 1 : 0; } /** - * broadcast message 广播消息 + * broadcast message, will exclude self. * @param string $data 消息数据 * @param int $sender 发送者 * @param int[] $receivers 指定接收者们 - * @param int[] $expected 要排除的接收者 + * @param int[] $excluded 要排除的接收者 * @return int Return socket last error number code. gt 0 on failure, eq 0 on success */ - public function broadcast(string $data, array $receivers = [], array $expected = [], int $sender = 0): int + public function broadcast(string $data, array $receivers = [], array $excluded = [], int $sender = 0): int { if (!$data) { return 0; @@ -174,26 +191,30 @@ public function broadcast(string $data, array $receivers = [], array $expected = // only one receiver if (1 === \count($receivers)) { - return $this->sendTo(\array_shift($receivers), $data, $sender); + return $this->sendTo((int)\array_shift($receivers), $data, $sender); } // to all - if (!$expected && !$receivers) { - $this->sendToAll($data, $sender); - // to some - } else { - $this->sendToSome($data, $receivers, $expected, $sender); + if (!$excluded && !$receivers) { + return $this->sendToAll($data, $sender); + } + + if ($sender) { + $excluded[] = $sender; } - return $this->getErrorNo(); + // to some + return $this->sendToSome($data, $receivers, $excluded, $sender); } /** + * send message to all connections * @param string $data * @param int $sender + * @param int $pageSize * @return int */ - public function sendToAll(string $data, int $sender = 0): int + public function sendToAll(string $data, int $sender = 0, int $pageSize = 50): int { $startFd = 0; $count = 0; @@ -201,17 +222,17 @@ public function sendToAll(string $data, int $sender = 0): int $this->log("(broadcast)The #{$fromUser} send a message to all users. Data: {$data}"); while (true) { - $connList = $this->server->connection_list($startFd, 50); + $fdList = $this->server->connection_list($startFd, $pageSize); - if ($connList === false || ($num = \count($connList)) === 0) { + if ($fdList === false || ($num = \count($fdList)) === 0) { break; } $count += $num; - $startFd = \end($connList); + $startFd = \end($fdList); - /** @var $connList array */ - foreach ($connList as $fd) { + /** @var $fdList array */ + foreach ($fdList as $fd) { $info = $this->getClientInfo($fd); if ($info && $info['websocket_status'] > 0) { @@ -226,15 +247,14 @@ public function sendToAll(string $data, int $sender = 0): int /** * @param string $data * @param array $receivers - * @param array $expected + * @param array $excluded * @param int $sender + * @param int $pageSize * @return int */ - public function sendToSome(string $data, array $receivers = [], array $expected = [], int $sender = 0): int + public function sendToSome(string $data, array $receivers = [], array $excluded = [], int $sender = 0, int $pageSize = 50): int { $count = 0; - $res = $data; - $len = \strlen($res); $fromUser = $sender < 1 ? 'SYSTEM' : $sender; // to receivers @@ -242,9 +262,9 @@ public function sendToSome(string $data, array $receivers = [], array $expected $this->log("(broadcast)The #{$fromUser} gave some specified user sending a message. Data: {$data}"); foreach ($receivers as $receiver) { - if (WebSocketContext::has($receiver)) { + if ($this->exist($receiver)) { $count++; - $this->server->push($receiver, $res, $len); + $this->server->push($receiver, $data); } } @@ -253,25 +273,23 @@ public function sendToSome(string $data, array $receivers = [], array $expected // to special users $startFd = 0; + $excluded = $excluded ? (array)\array_flip($excluded) : []; + $this->log("(broadcast)The #{$fromUser} send the message to everyone except some people. Data: {$data}"); while (true) { - $connList = $this->server->connection_list($startFd, 50); + $fdList = $this->server->connection_list($startFd, $pageSize); - if ($connList === false || ($num = \count($connList)) === 0) { + if ($fdList === false || ($num = \count($fdList)) === 0) { break; } $count += $num; - $startFd = \end($connList); - - /** @var $connList array */ - foreach ($connList as $fd) { - if (isset($expected[$fd])) { - continue; - } + $startFd = \end($fdList); - if ($receivers && !isset($receivers[$fd])) { + /** @var $fdList array */ + foreach ($fdList as $fd) { + if (isset($excluded[$fd])) { continue; }