Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update websocket client connection flow #52

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
327 changes: 183 additions & 144 deletions sdk/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"github.com/proximax-storage/go-xpx-chain-sdk/sdk/websocket/subscribers"
)

const pathWS = "ws"

type Path string

const (
pathWS = "ws"

pathBlock Path = "block"
pathConfirmedAdded Path = "confirmedAdded"
pathUnconfirmedAdded Path = "unconfirmedAdded"
Expand All @@ -39,138 +37,111 @@ var (
ErrUnsupportedMessageType = errors.New("unsupported message type")
)

func NewClient(ctx context.Context, cfg *sdk.Config) (CatapultClient, error) {
ctx, cancelFunc := context.WithCancel(ctx)
type (
// Subscribe path
Path string

conn, uid, err := connect(cfg)
if err != nil {
return nil, err
CatapultWebsocketClientImpl struct {
ctx context.Context
cancelFunc context.CancelFunc

UID string
config *sdk.Config

conn *websocket.Conn

blockSubscriber subscribers.Block
statusSubscribers subscribers.Status
cosignatureSubscribers subscribers.Cosignature
partialAddedSubscribers subscribers.PartialAdded
partialRemovedSubscribers subscribers.PartialRemoved
confirmedAddedSubscribers subscribers.ConfirmedAdded
unconfirmedAddedSubscribers subscribers.UnconfirmedAdded
unconfirmedRemovedSubscribers subscribers.UnconfirmedRemoved

messageRouter Router
topicHandlers TopicHandlersStorage
messagePublisher MessagePublisher

// connectionStatusCh chan bool
listenCh chan bool // channel for manage current listen status for connection
reconnectCh chan *websocket.Conn // channel for connection with we will close, and open new connection
connectionCh chan *websocket.Conn // channel for new opened connection

connectFn func(cfg *sdk.Config) (*websocket.Conn, string, error)
}

Client interface {
io.Closer

Listen()
}

topicHandlers := make(topicHandlers)
messagePublisher := newMessagePublisher(conn)
messageRouter := NewRouter(uid, messagePublisher, topicHandlers)
CatapultClient interface {
Client

return &CatapultWebsocketClientImpl{
config: cfg,
conn: conn,
AddBlockHandlers(handlers ...subscribers.BlockHandler) error
AddConfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.ConfirmedAddedHandler) error
AddUnconfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedAddedHandler) error
AddUnconfirmedRemovedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedRemovedHandler) error
AddPartialAddedHandlers(address *sdk.Address, handlers ...subscribers.PartialAddedHandler) error
AddPartialRemovedHandlers(address *sdk.Address, handlers ...subscribers.PartialRemovedHandler) error
AddStatusHandlers(address *sdk.Address, handlers ...subscribers.StatusHandler) error
AddCosignatureHandlers(address *sdk.Address, handlers ...subscribers.CosignatureHandler) error
}
)

func NewClient(ctx context.Context, cfg *sdk.Config) (CatapultClient, error) {
ctx, cancelFunc := context.WithCancel(ctx)

socketClient := &CatapultWebsocketClientImpl{
ctx: ctx,
cancelFunc: cancelFunc,
UID: uid,

config: cfg,

blockSubscriber: subscribers.NewBlock(),
statusSubscribers: subscribers.NewStatus(),
cosignatureSubscribers: subscribers.NewCosignature(),
partialAddedSubscribers: subscribers.NewPartialAdded(),
partialRemovedSubscribers: subscribers.NewPartialRemoved(),
confirmedAddedSubscribers: subscribers.NewConfirmedAdded(),
unconfirmedAddedSubscribers: subscribers.NewUnconfirmedAdded(),
unconfirmedRemovedSubscribers: subscribers.NewUnconfirmedRemoved(),
partialAddedSubscribers: subscribers.NewPartialAdded(),
partialRemovedSubscribers: subscribers.NewPartialRemoved(),
statusSubscribers: subscribers.NewStatus(),
cosignatureSubscribers: subscribers.NewCosignature(),

topicHandlers: topicHandlers,
messageRouter: messageRouter,
messagePublisher: messagePublisher,
}, nil
}

type Client interface {
io.Closer
topicHandlers: make(topicHandlers),

Listen()
}
listenCh: make(chan bool),
reconnectCh: make(chan *websocket.Conn),
connectionCh: make(chan *websocket.Conn),

type CatapultClient interface {
Client

AddBlockHandlers(handlers ...subscribers.BlockHandler) error
AddConfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.ConfirmedAddedHandler) error
AddUnconfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedAddedHandler) error
AddUnconfirmedRemovedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedRemovedHandler) error
AddPartialAddedHandlers(address *sdk.Address, handlers ...subscribers.PartialAddedHandler) error
AddPartialRemovedHandlers(address *sdk.Address, handlers ...subscribers.PartialRemovedHandler) error
AddStatusHandlers(address *sdk.Address, handlers ...subscribers.StatusHandler) error
AddCosignatureHandlers(address *sdk.Address, handlers ...subscribers.CosignatureHandler) error
}

type CatapultWebsocketClientImpl struct {
config *sdk.Config
conn *websocket.Conn
ctx context.Context
cancelFunc context.CancelFunc
UID string

blockSubscriber subscribers.Block
confirmedAddedSubscribers subscribers.ConfirmedAdded
unconfirmedAddedSubscribers subscribers.UnconfirmedAdded
unconfirmedRemovedSubscribers subscribers.UnconfirmedRemoved
partialAddedSubscribers subscribers.PartialAdded
partialRemovedSubscribers subscribers.PartialRemoved
statusSubscribers subscribers.Status
cosignatureSubscribers subscribers.Cosignature

topicHandlers TopicHandlersStorage
messageRouter Router
messagePublisher MessagePublisher

connectFn func(cfg *sdk.Config) (*websocket.Conn, string, error)
alreadyListening bool
}

func (c *CatapultWebsocketClientImpl) Listen() {
if c.alreadyListening {
return
connectFn: connect,
}

c.alreadyListening = true

messagesChan := make(chan []byte)

go func() {
defer c.cancelFunc()

ReadMessageLoop:
for {
_, resp, e := c.conn.ReadMessage()
if e != nil {
if _, ok := e.(*net.OpError); ok {
// Stop ReadMessage goroutine if user called Close function for websocket client
return
}
go socketClient.handleSignal()

if _, ok := e.(*websocket.CloseError); ok {
// Start websocket reconnect processing if connection was closed
for range time.NewTicker(c.config.WsReconnectionTimeout).C {
if err := c.reconnect(); err != nil {
continue
}
if err := socketClient.initNewConnection(); err != nil {
return socketClient, err
}

continue ReadMessageLoop
}
}
return socketClient, nil
}

return
}
func (c *CatapultWebsocketClientImpl) Listen() {

messagesChan <- resp
}
}()
c.listenCh <- true

for {
select {
case <-c.ctx.Done():
if c.conn != nil {
if err := c.conn.Close(); err != nil {
panic(err)
}
c.conn = nil
}
return
case msg := <-messagesChan:
go c.messageRouter.RouteMessage(msg)
}
select {
case <-c.ctx.Done():
c.closeConnection(c.conn)
c.removeHandlers()
}
}

func (c *CatapultWebsocketClientImpl) Close() error {
c.cancelFunc()
return nil
}
func (c *CatapultWebsocketClientImpl) AddBlockHandlers(handlers ...subscribers.BlockHandler) error {
if len(handlers) == 0 {
return nil
Expand Down Expand Up @@ -377,20 +348,115 @@ func (c *CatapultWebsocketClientImpl) AddCosignatureHandlers(address *sdk.Addres
return nil
}

func (c *CatapultWebsocketClientImpl) reconnect() error {
func (c *CatapultWebsocketClientImpl) handleSignal() {
for {
select {
case conn := <-c.connectionCh:
c.conn = conn
case conn := <-c.reconnectCh:
c.closeConnection(conn)
go func() {
c.listenCh <- false
}()

case <-c.listenCh:

if c.conn == nil {
err := c.initNewConnection()
if err != nil {
fmt.Println("websocket: connection is failed. Try again after wait period")
select {
case <-time.NewTicker(c.config.WsReconnectionTimeout).C:
go func() {
c.listenCh <- true
}()
continue
}
}
}

err := c.updateHandlers()
if err != nil {
fmt.Println("websocket: update handles is failed. Try again after timeout period")
select {
case <-time.NewTicker(c.config.WsReconnectionTimeout).C:
continue
}

}
fmt.Println(fmt.Sprintf("websocket: connection established: %s", c.config.UsedBaseUrl.String()))
c.startListener()
}
}
}

func (c *CatapultWebsocketClientImpl) removeHandlers() {
c.blockSubscriber = nil
c.confirmedAddedSubscribers = nil
c.unconfirmedAddedSubscribers = nil
c.unconfirmedRemovedSubscribers = nil
c.partialAddedSubscribers = nil
c.partialRemovedSubscribers = nil
c.statusSubscribers = nil
c.cosignatureSubscribers = nil

c.topicHandlers = nil
}

func (c *CatapultWebsocketClientImpl) closeConnection(conn *websocket.Conn) {
if conn != nil {
if err := conn.Close(); err != nil {
fmt.Println(fmt.Sprintf("websocket: disconnection error: %s", err))
}
}
c.conn = nil
}

func (c *CatapultWebsocketClientImpl) startListener() {

for {
_, resp, e := c.conn.ReadMessage()
if e != nil {
if _, ok := e.(*net.OpError); ok {
// Stop ReadMessage if user called Close function for websocket client
return
}

if _, ok := e.(*websocket.CloseError); ok {
go func() {
c.reconnectCh <- c.conn
}()
return
}
}

go func() {
c.messageRouter.RouteMessage(resp)
}()
}
}

func (c *CatapultWebsocketClientImpl) initNewConnection() error {
conn, uid, err := c.connectFn(c.config)
if err != nil {
return err
}

c.conn = conn
c.UID = uid
c.conn = conn

messagePublisher := newMessagePublisher(c.conn)
messageRouter := NewRouter(c.UID, messagePublisher, c.topicHandlers)

c.messageRouter = messageRouter
c.messagePublisher = messagePublisher

return nil
}

c.messagePublisher.SetConn(conn)
c.messageRouter.SetUid(uid)
func (c *CatapultWebsocketClientImpl) updateHandlers() error {

if c.blockSubscriber.HasHandlers() {
if c.topicHandlers.HasHandler(pathBlock) {
if err := c.messagePublisher.PublishSubscribeMessage(c.UID, Path(fmt.Sprintf("%s", pathBlock))); err != nil {
return err
}
Expand Down Expand Up @@ -441,33 +507,6 @@ func (c *CatapultWebsocketClientImpl) reconnect() error {
return nil
}

func (c *CatapultWebsocketClientImpl) Close() error {
if c.conn != nil {
if err := c.conn.Close(); err != nil {
return err
}

c.conn = nil
}

c.cancelFunc()

c.alreadyListening = false

c.blockSubscriber = nil
c.confirmedAddedSubscribers = nil
c.unconfirmedAddedSubscribers = nil
c.unconfirmedRemovedSubscribers = nil
c.partialAddedSubscribers = nil
c.partialRemovedSubscribers = nil
c.statusSubscribers = nil
c.cosignatureSubscribers = nil

c.topicHandlers = nil

return nil
}

func connect(cfg *sdk.Config) (*websocket.Conn, string, error) {
var conn *websocket.Conn
var err error
Expand Down
Loading