Skip to content

Commit

Permalink
backend: properly close Read & Send pumps
Browse files Browse the repository at this point in the history
  • Loading branch information
mytja committed Dec 28, 2024
1 parent be8edc8 commit fa76dcb
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 47 deletions.
67 changes: 44 additions & 23 deletions backend/internal/lobby/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ import (
)

const (
writeTimeout = 5 * time.Second
writeTimeout = 5 * time.Second
SEND_FAILURE_LIMIT = 25
)

type clientImpl struct {
clientId string
user sql.User
addr net.Addr
conn *websocket.Conn
clientId string
user sql.User
addr net.Addr
conn *websocket.Conn
sendFailure int

logger *zap.SugaredLogger

Expand All @@ -37,12 +39,13 @@ type clientImpl struct {
// and allows server to communicate with him
func NewClient(user sql.User, conn *websocket.Conn, serv Server, logger *zap.Logger) Client {
return &clientImpl{
clientId: uuid.NewString(),
user: user,
addr: conn.RemoteAddr(),
conn: conn,
logger: logger.Sugar(),
server: serv,
clientId: uuid.NewString(),
user: user,
addr: conn.RemoteAddr(),
conn: conn,
logger: logger.Sugar(),
server: serv,
sendFailure: 0,

send: make(chan *lobby_messages.LobbyMessage),
}
Expand All @@ -68,19 +71,36 @@ func (c *clientImpl) GetRemoteAddr() net.Addr {

// Close closes the client
func (c *clientImpl) Close() {
//close(c.send)
defer func() {
if r := recover(); r != nil {
c.logger.Debugw("recovered after a panic", "err", r)
}
}()
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
c.conn.Close()
// TODO: Server needs to be aware that we disconnected as well
close(c.send)
}

// SendClientCloseToServer initiates the client close procedure
// by sending close message to server
func (c *clientImpl) SendClientCloseToServer() {
c.logger.Debugw("sending lobby.disconnect message", "client", c)
events.Publish("lobby.disconnect", c)
}

// Send sends lobby_messages
func (c *clientImpl) Send(msg *lobby_messages.LobbyMessage) {
c.logger.Debugw("sending message to c.send", "msg", msg)
c.logger.Debugw("sending message to c.send", "clientId", c.clientId, "userId", c.user.ID)
select {
case c.send <- msg:
c.logger.Debugw("sent message to c.send", "msg", msg)
c.logger.Debugw("sent message to c.send", "clientId", c.clientId, "userId", c.user.ID)
c.sendFailure = 0
case <-time.After(5 * time.Second):
c.logger.Warnw("timeout sending message to c.send", "msg", msg)
c.logger.Warnw("timeout sending message to c.send", "clientId", c.clientId, "userId", c.user.ID)
c.sendFailure++
if c.sendFailure >= SEND_FAILURE_LIMIT {
c.SendClientCloseToServer()
}
}
}

Expand All @@ -99,7 +119,7 @@ func (c *clientImpl) ReadPump() {

authenticated := false

defer c.Close()
defer c.SendClientCloseToServer()
for {
_, msg, err := c.conn.ReadMessage()
if err != nil {
Expand Down Expand Up @@ -178,18 +198,20 @@ func (c *clientImpl) ReadPump() {
break
}
}

c.logger.Debugw("sending server.disconnect message", "client", c)
events.Publish("lobby.disconnect", c)
//events.Publish("server.disconnect", c.user.ID)
}

// SendPump sends lobby_messages to client and checks if there is an error and returns it
func (c *clientImpl) SendPump() {
c.logger.Debugw("started send pump for client",
"id", c.user.ID, "remoteAddr", c.addr)

for message := range c.send {
for {
message, open := <-c.send
if !open {
c.logger.Infow("gracefully closing SendPump", "id", c.user.ID, "clientId", c.clientId, "remoteAddr", c.addr)
break
}

c.logger.Debugw("sending message", "msg", message, "client", c.clientId)

// So we don't wait for too long before we send
Expand Down Expand Up @@ -227,5 +249,4 @@ func (c *clientImpl) SendPump() {
}

c.logger.Debugw("exiting client send pump", "id", c.user.ID, "remoteAddr", c.addr)
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
}
1 change: 1 addition & 0 deletions backend/internal/lobby/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Server interface {
AddNewFriend(userId string, friendEmail string)
IncomingFriendRequestAcceptDeny(userId string, friendRequestId string, accept bool)
RemoveFriend(userId string, relationshipId string)
Disconnect(client Client)
}

// Client contains all the methods we need for recognising and working with the Client
Expand Down
69 changes: 45 additions & 24 deletions backend/internal/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ import (
)

const (
writeTimeout = 5 * time.Second
writeTimeout = 5 * time.Second
SEND_FAILURE_LIMIT = 25
)

type clientImpl struct {
clientId string
user sql.User
addr net.Addr
position int32
conn *websocket.Conn
game string
clientId string
user sql.User
addr net.Addr
position int32
conn *websocket.Conn
game string
sendFailure int

logger *zap.SugaredLogger

Expand Down Expand Up @@ -73,21 +75,39 @@ func (c *clientImpl) GetRemoteAddr() net.Addr {
return c.addr
}

// Close closes the client
// Close closes the client.
// Should be called only from the server
func (c *clientImpl) Close() {
//close(c.send)
defer func() {
if r := recover(); r != nil {
c.logger.Debugw("recovered after a panic", "err", r)
}
}()
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
c.conn.Close()
// TODO: Server needs to be aware that we disconnected as well
close(c.send)
}

// SendClientCloseToServer initiates the client close procedure
// by sending close message to server
func (c *clientImpl) SendClientCloseToServer() {
c.logger.Debugw("sending server.disconnect message", "client", c)
events.Publish("server.disconnect", c)
}

// Send sends messages
func (c *clientImpl) Send(msg *messages.Message) {
c.logger.Debugw("sending message to c.send", "msg", msg)
c.logger.Debugw("sending message to c.send", "clientId", c.clientId, "userId", c.user.ID)
select {
case c.send <- msg:
c.logger.Debugw("sent message to c.send", "msg", msg)
c.logger.Debugw("sent message to c.send", "clientId", c.clientId, "userId", c.user.ID)
c.sendFailure = 0
case <-time.After(5 * time.Second):
c.logger.Warnw("timeout sending message to c.send", "msg", msg)
c.logger.Warnw("timeout sending message to c.send", "clientId", c.clientId, "userId", c.user.ID)
c.sendFailure++
if c.sendFailure >= SEND_FAILURE_LIMIT {
c.SendClientCloseToServer()
}
}
}

Expand All @@ -104,7 +124,7 @@ func (c *clientImpl) ReadPump() {
c.logger.Debugw("started read pump for client",
"id", c.user.ID, "remoteAddr", c.addr)

defer c.Close()
defer c.SendClientCloseToServer()
for {
_, msg, err := c.conn.ReadMessage()
if err != nil {
Expand Down Expand Up @@ -253,43 +273,45 @@ func (c *clientImpl) ReadPump() {
break
}
}

c.logger.Debugw("sending server.disconnect message", "client", c)
events.Publish("server.disconnect", c)
//events.Publish("server.disconnect", c.user.ID)
}

// SendPump sends messages to client and checks if there is an error and returns it
func (c *clientImpl) SendPump() {
c.logger.Debugw("started send pump for client",
"id", c.user.ID, "remoteAddr", c.addr)

for message := range c.send {
for {
message, open := <-c.send
if !open {
c.logger.Debugw("gracefully closing SendPump", "id", c.user.ID, "clientId", c.clientId, "remoteAddr", c.addr)
break
}

c.logger.Debugw("sending message", "msg", message, "client", c.clientId)

// So we don't wait for too long before we send
err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err != nil {
c.logger.Errorw("error while setting write deadline", "err", err)
return
break
}

writer, err := c.conn.NextWriter(websocket.BinaryMessage)
if err != nil {
c.logger.Warnw("error while getting NextWriter for client",
"id", c.user.ID, "remoteAddr", c.addr, zap.Error(err))
return
break
}

rawMessage, err := proto.Marshal(message)
if err != nil {
c.logger.Errorw("error while marshalling protobuf message", "err", err)
return
break
}
_, err = writer.Write(rawMessage)
if err != nil {
c.logger.Errorw("error while writing to the writer", "err", err)
return
break
}
// We need to close the writer so that our message
// gets flushed to the client
Expand All @@ -300,5 +322,4 @@ func (c *clientImpl) SendPump() {
}

c.logger.Debugw("exiting client send pump", "id", c.user.ID, "remoteAddr", c.addr)
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
}

0 comments on commit fa76dcb

Please sign in to comment.