diff --git a/sdk/websocket/client.go b/sdk/websocket/client.go index a4e22e11..e9c8b1c9 100644 --- a/sdk/websocket/client.go +++ b/sdk/websocket/client.go @@ -20,11 +20,9 @@ import ( "github.com/proximax-storage/go-xpx-chain-sdk/sdk/websocket/subscribers" ) -const pathWS = "ws" - -type Path string - const ( + pathWS = "ws" + pathBlock Path = "block" pathConfirmedAdded Path = "confirmedAdded" pathUnconfirmedAdded Path = "unconfirmedAdded" @@ -39,138 +37,111 @@ var ( ErrUnsupportedMessageType = errors.New("unsupported message type") ) -func NewClient(ctx context.Context, cfg *sdk.Config) (CatapultClient, error) { - ctx, cancelFunc := context.WithCancel(ctx) +type ( + // Subscribe path + Path string - conn, uid, err := connect(cfg) - if err != nil { - return nil, err + CatapultWebsocketClientImpl struct { + ctx context.Context + cancelFunc context.CancelFunc + + UID string + config *sdk.Config + + conn *websocket.Conn + + blockSubscriber subscribers.Block + statusSubscribers subscribers.Status + cosignatureSubscribers subscribers.Cosignature + partialAddedSubscribers subscribers.PartialAdded + partialRemovedSubscribers subscribers.PartialRemoved + confirmedAddedSubscribers subscribers.ConfirmedAdded + unconfirmedAddedSubscribers subscribers.UnconfirmedAdded + unconfirmedRemovedSubscribers subscribers.UnconfirmedRemoved + + messageRouter Router + topicHandlers TopicHandlersStorage + messagePublisher MessagePublisher + + // connectionStatusCh chan bool + listenCh chan bool // channel for manage current listen status for connection + reconnectCh chan *websocket.Conn // channel for connection with we will close, and open new connection + connectionCh chan *websocket.Conn // channel for new opened connection + + connectFn func(cfg *sdk.Config) (*websocket.Conn, string, error) + } + + Client interface { + io.Closer + + Listen() } - topicHandlers := make(topicHandlers) - messagePublisher := newMessagePublisher(conn) - messageRouter := NewRouter(uid, messagePublisher, topicHandlers) + CatapultClient interface { + Client - return &CatapultWebsocketClientImpl{ - config: cfg, - conn: conn, + AddBlockHandlers(handlers ...subscribers.BlockHandler) error + AddConfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.ConfirmedAddedHandler) error + AddUnconfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedAddedHandler) error + AddUnconfirmedRemovedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedRemovedHandler) error + AddPartialAddedHandlers(address *sdk.Address, handlers ...subscribers.PartialAddedHandler) error + AddPartialRemovedHandlers(address *sdk.Address, handlers ...subscribers.PartialRemovedHandler) error + AddStatusHandlers(address *sdk.Address, handlers ...subscribers.StatusHandler) error + AddCosignatureHandlers(address *sdk.Address, handlers ...subscribers.CosignatureHandler) error + } +) + +func NewClient(ctx context.Context, cfg *sdk.Config) (CatapultClient, error) { + ctx, cancelFunc := context.WithCancel(ctx) + + socketClient := &CatapultWebsocketClientImpl{ ctx: ctx, cancelFunc: cancelFunc, - UID: uid, + + config: cfg, blockSubscriber: subscribers.NewBlock(), + statusSubscribers: subscribers.NewStatus(), + cosignatureSubscribers: subscribers.NewCosignature(), + partialAddedSubscribers: subscribers.NewPartialAdded(), + partialRemovedSubscribers: subscribers.NewPartialRemoved(), confirmedAddedSubscribers: subscribers.NewConfirmedAdded(), unconfirmedAddedSubscribers: subscribers.NewUnconfirmedAdded(), unconfirmedRemovedSubscribers: subscribers.NewUnconfirmedRemoved(), - partialAddedSubscribers: subscribers.NewPartialAdded(), - partialRemovedSubscribers: subscribers.NewPartialRemoved(), - statusSubscribers: subscribers.NewStatus(), - cosignatureSubscribers: subscribers.NewCosignature(), - - topicHandlers: topicHandlers, - messageRouter: messageRouter, - messagePublisher: messagePublisher, - }, nil -} -type Client interface { - io.Closer + topicHandlers: make(topicHandlers), - Listen() -} + listenCh: make(chan bool), + reconnectCh: make(chan *websocket.Conn), + connectionCh: make(chan *websocket.Conn), -type CatapultClient interface { - Client - - AddBlockHandlers(handlers ...subscribers.BlockHandler) error - AddConfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.ConfirmedAddedHandler) error - AddUnconfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedAddedHandler) error - AddUnconfirmedRemovedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedRemovedHandler) error - AddPartialAddedHandlers(address *sdk.Address, handlers ...subscribers.PartialAddedHandler) error - AddPartialRemovedHandlers(address *sdk.Address, handlers ...subscribers.PartialRemovedHandler) error - AddStatusHandlers(address *sdk.Address, handlers ...subscribers.StatusHandler) error - AddCosignatureHandlers(address *sdk.Address, handlers ...subscribers.CosignatureHandler) error -} - -type CatapultWebsocketClientImpl struct { - config *sdk.Config - conn *websocket.Conn - ctx context.Context - cancelFunc context.CancelFunc - UID string - - blockSubscriber subscribers.Block - confirmedAddedSubscribers subscribers.ConfirmedAdded - unconfirmedAddedSubscribers subscribers.UnconfirmedAdded - unconfirmedRemovedSubscribers subscribers.UnconfirmedRemoved - partialAddedSubscribers subscribers.PartialAdded - partialRemovedSubscribers subscribers.PartialRemoved - statusSubscribers subscribers.Status - cosignatureSubscribers subscribers.Cosignature - - topicHandlers TopicHandlersStorage - messageRouter Router - messagePublisher MessagePublisher - - connectFn func(cfg *sdk.Config) (*websocket.Conn, string, error) - alreadyListening bool -} - -func (c *CatapultWebsocketClientImpl) Listen() { - if c.alreadyListening { - return + connectFn: connect, } - c.alreadyListening = true - - messagesChan := make(chan []byte) - - go func() { - defer c.cancelFunc() - - ReadMessageLoop: - for { - _, resp, e := c.conn.ReadMessage() - if e != nil { - if _, ok := e.(*net.OpError); ok { - // Stop ReadMessage goroutine if user called Close function for websocket client - return - } + go socketClient.handleSignal() - if _, ok := e.(*websocket.CloseError); ok { - // Start websocket reconnect processing if connection was closed - for range time.NewTicker(c.config.WsReconnectionTimeout).C { - if err := c.reconnect(); err != nil { - continue - } + if err := socketClient.initNewConnection(); err != nil { + return socketClient, err + } - continue ReadMessageLoop - } - } + return socketClient, nil +} - return - } +func (c *CatapultWebsocketClientImpl) Listen() { - messagesChan <- resp - } - }() + c.listenCh <- true - for { - select { - case <-c.ctx.Done(): - if c.conn != nil { - if err := c.conn.Close(); err != nil { - panic(err) - } - c.conn = nil - } - return - case msg := <-messagesChan: - go c.messageRouter.RouteMessage(msg) - } + select { + case <-c.ctx.Done(): + c.closeConnection(c.conn) + c.removeHandlers() } } +func (c *CatapultWebsocketClientImpl) Close() error { + c.cancelFunc() + return nil +} func (c *CatapultWebsocketClientImpl) AddBlockHandlers(handlers ...subscribers.BlockHandler) error { if len(handlers) == 0 { return nil @@ -377,20 +348,115 @@ func (c *CatapultWebsocketClientImpl) AddCosignatureHandlers(address *sdk.Addres return nil } -func (c *CatapultWebsocketClientImpl) reconnect() error { +func (c *CatapultWebsocketClientImpl) handleSignal() { + for { + select { + case conn := <-c.connectionCh: + c.conn = conn + case conn := <-c.reconnectCh: + c.closeConnection(conn) + go func() { + c.listenCh <- false + }() + + case <-c.listenCh: + + if c.conn == nil { + err := c.initNewConnection() + if err != nil { + fmt.Println("websocket: connection is failed. Try again after wait period") + select { + case <-time.NewTicker(c.config.WsReconnectionTimeout).C: + go func() { + c.listenCh <- true + }() + continue + } + } + } + + err := c.updateHandlers() + if err != nil { + fmt.Println("websocket: update handles is failed. Try again after timeout period") + select { + case <-time.NewTicker(c.config.WsReconnectionTimeout).C: + continue + } + + } + fmt.Println(fmt.Sprintf("websocket: connection established: %s", c.config.UsedBaseUrl.String())) + c.startListener() + } + } +} + +func (c *CatapultWebsocketClientImpl) removeHandlers() { + c.blockSubscriber = nil + c.confirmedAddedSubscribers = nil + c.unconfirmedAddedSubscribers = nil + c.unconfirmedRemovedSubscribers = nil + c.partialAddedSubscribers = nil + c.partialRemovedSubscribers = nil + c.statusSubscribers = nil + c.cosignatureSubscribers = nil + c.topicHandlers = nil +} + +func (c *CatapultWebsocketClientImpl) closeConnection(conn *websocket.Conn) { + if conn != nil { + if err := conn.Close(); err != nil { + fmt.Println(fmt.Sprintf("websocket: disconnection error: %s", err)) + } + } + c.conn = nil +} + +func (c *CatapultWebsocketClientImpl) startListener() { + + for { + _, resp, e := c.conn.ReadMessage() + if e != nil { + if _, ok := e.(*net.OpError); ok { + // Stop ReadMessage if user called Close function for websocket client + return + } + + if _, ok := e.(*websocket.CloseError); ok { + go func() { + c.reconnectCh <- c.conn + }() + return + } + } + + go func() { + c.messageRouter.RouteMessage(resp) + }() + } +} + +func (c *CatapultWebsocketClientImpl) initNewConnection() error { conn, uid, err := c.connectFn(c.config) if err != nil { return err } - c.conn = conn c.UID = uid + c.conn = conn + + messagePublisher := newMessagePublisher(c.conn) + messageRouter := NewRouter(c.UID, messagePublisher, c.topicHandlers) + + c.messageRouter = messageRouter + c.messagePublisher = messagePublisher + + return nil +} - c.messagePublisher.SetConn(conn) - c.messageRouter.SetUid(uid) +func (c *CatapultWebsocketClientImpl) updateHandlers() error { - if c.blockSubscriber.HasHandlers() { + if c.topicHandlers.HasHandler(pathBlock) { if err := c.messagePublisher.PublishSubscribeMessage(c.UID, Path(fmt.Sprintf("%s", pathBlock))); err != nil { return err } @@ -441,33 +507,6 @@ func (c *CatapultWebsocketClientImpl) reconnect() error { return nil } -func (c *CatapultWebsocketClientImpl) Close() error { - if c.conn != nil { - if err := c.conn.Close(); err != nil { - return err - } - - c.conn = nil - } - - c.cancelFunc() - - c.alreadyListening = false - - c.blockSubscriber = nil - c.confirmedAddedSubscribers = nil - c.unconfirmedAddedSubscribers = nil - c.unconfirmedRemovedSubscribers = nil - c.partialAddedSubscribers = nil - c.partialRemovedSubscribers = nil - c.statusSubscribers = nil - c.cosignatureSubscribers = nil - - c.topicHandlers = nil - - return nil -} - func connect(cfg *sdk.Config) (*websocket.Conn, string, error) { var conn *websocket.Conn var err error diff --git a/sdk/websocket/client_test.go b/sdk/websocket/client_test.go index 83b98cb7..15d27b4c 100644 --- a/sdk/websocket/client_test.go +++ b/sdk/websocket/client_test.go @@ -5,16 +5,12 @@ package websocket import ( - "context" - "fmt" "testing" - "github.com/gorilla/websocket" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - mocks2 "github.com/proximax-storage/go-xpx-chain-sdk/mocks" mocks "github.com/proximax-storage/go-xpx-chain-sdk/mocks/subscribers" "github.com/proximax-storage/go-xpx-chain-sdk/sdk" "github.com/proximax-storage/go-xpx-chain-sdk/sdk/websocket/subscribers" @@ -969,259 +965,3 @@ func TestCatapultWebsocketClientImpl_AddCosignatureHandlers(t *testing.T) { }) } } - -func TestCatapultWebsocketClientImpl_reconnect(t *testing.T) { - type fields struct { - config *sdk.Config - conn *websocket.Conn - ctx context.Context - cancelFunc context.CancelFunc - UID string - blockSubscriber subscribers.Block - confirmedAddedSubscribers subscribers.ConfirmedAdded - unconfirmedAddedSubscribers subscribers.UnconfirmedAdded - unconfirmedRemovedSubscribers subscribers.UnconfirmedRemoved - partialAddedSubscribers subscribers.PartialAdded - partialRemovedSubscribers subscribers.PartialRemoved - statusSubscribers subscribers.Status - cosignatureSubscribers subscribers.Cosignature - topicHandlers TopicHandlersStorage - messageRouter Router - messagePublisher MessagePublisher - connectFn func(cfg *sdk.Config) (*websocket.Conn, string, error) - alreadyListening bool - } - - errorConnectFn := func(cfg *sdk.Config) (*websocket.Conn, string, error) { - return nil, "", errors.New("test error") - } - - successConnectFn := func(cfg *sdk.Config) (*websocket.Conn, string, error) { - return nil, "test-uid", nil - } - - mockRouter := new(mocks2.Router) - mockRouter.On("SetUid", mock.Anything).Return(nil) - - publisherError := errors.New("test publisher error") - mockMessagePublisher := new(MockMessagePublisher) - mockMessagePublisher. - On("SetConn", mock.Anything).Return(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s", pathBlock))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s", pathBlock))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathConfirmedAdded, "test confirmed added address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathConfirmedAdded, "test confirmed added address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathCosignature, "test cosignature address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathCosignature, "test cosignature address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathPartialAdded, "test partial added address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathPartialAdded, "test partial added address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathPartialRemoved, "test partial removed address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathPartialRemoved, "test partial removed address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathStatus, "test status address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathStatus, "test status address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathUnconfirmedAdded, "test unconfirmed added address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathUnconfirmedAdded, "test unconfirmed added address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathUnconfirmedRemoved, "test unconfirmed removed address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathUnconfirmedRemoved, "test unconfirmed removed address"))).Return(nil) - - mockBlockSubscriber := new(mocks.Block) - mockBlockSubscriber.On("HasHandlers").Return(true) - - mockConfirmedAddedSubscriber := new(mocks.ConfirmedAdded) - mockConfirmedAddedSubscriber.On("GetAddresses").Return([]string{"test confirmed added address"}) - - mockCosignatureSubscriber := new(mocks.Cosignature) - mockCosignatureSubscriber.On("GetAddresses").Return([]string{"test cosignature address"}) - - mockPartialAddedSubscriber := new(mocks.PartialAdded) - mockPartialAddedSubscriber.On("GetAddresses").Return([]string{"test partial added address"}) - - mockPartialRemovedSubscriber := new(mocks.PartialRemoved) - mockPartialRemovedSubscriber.On("GetAddresses").Return([]string{"test partial removed address"}) - - mockStatusSubscriber := new(mocks.Status) - mockStatusSubscriber.On("GetAddresses").Return([]string{"test status address"}) - - mockUnconfirmedAddedSubscriber := new(mocks.UnconfirmedAdded) - mockUnconfirmedAddedSubscriber.On("GetAddresses").Return([]string{"test unconfirmed added address"}) - - mockUnconfirmedRemovedSubscriber := new(mocks.UnconfirmedRemoved) - mockUnconfirmedRemovedSubscriber.On("GetAddresses").Return([]string{"test unconfirmed removed address"}) - - tests := []struct { - name string - fields fields - wantErr bool - }{ - { - name: "connection error", - fields: fields{ - config: &sdk.Config{}, - connectFn: errorConnectFn, - }, - wantErr: true, - }, - { - name: "block message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "confirmed added message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "confirmed added message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "partial added message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "partial removed message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - partialRemovedSubscribers: mockPartialRemovedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "status message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - partialRemovedSubscribers: mockPartialRemovedSubscriber, - statusSubscribers: mockStatusSubscriber, - unconfirmedAddedSubscribers: mockUnconfirmedAddedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "unconfirmed added message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - partialRemovedSubscribers: mockPartialRemovedSubscriber, - statusSubscribers: mockStatusSubscriber, - unconfirmedAddedSubscribers: mockUnconfirmedAddedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "unconfirmed removed message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - partialRemovedSubscribers: mockPartialRemovedSubscriber, - statusSubscribers: mockStatusSubscriber, - unconfirmedAddedSubscribers: mockUnconfirmedAddedSubscriber, - unconfirmedRemovedSubscribers: mockUnconfirmedRemovedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "success", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - partialRemovedSubscribers: mockPartialRemovedSubscriber, - statusSubscribers: mockStatusSubscriber, - unconfirmedAddedSubscribers: mockUnconfirmedAddedSubscriber, - unconfirmedRemovedSubscribers: mockUnconfirmedRemovedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &CatapultWebsocketClientImpl{ - config: tt.fields.config, - conn: tt.fields.conn, - ctx: tt.fields.ctx, - cancelFunc: tt.fields.cancelFunc, - UID: tt.fields.UID, - blockSubscriber: tt.fields.blockSubscriber, - confirmedAddedSubscribers: tt.fields.confirmedAddedSubscribers, - unconfirmedAddedSubscribers: tt.fields.unconfirmedAddedSubscribers, - unconfirmedRemovedSubscribers: tt.fields.unconfirmedRemovedSubscribers, - partialAddedSubscribers: tt.fields.partialAddedSubscribers, - partialRemovedSubscribers: tt.fields.partialRemovedSubscribers, - statusSubscribers: tt.fields.statusSubscribers, - cosignatureSubscribers: tt.fields.cosignatureSubscribers, - topicHandlers: tt.fields.topicHandlers, - messageRouter: tt.fields.messageRouter, - messagePublisher: tt.fields.messagePublisher, - alreadyListening: tt.fields.alreadyListening, - connectFn: tt.fields.connectFn, - } - - err := c.reconnect() - assert.Equal(t, err != nil, tt.wantErr) - }) - } -}