From 05b28a9be58579df0be5fcb3a2f64511de02b7a7 Mon Sep 17 00:00:00 2001 From: vadimdidenkogs Date: Fri, 27 Sep 2019 03:57:13 +0300 Subject: [PATCH 1/3] Update client listening flow --- sdk/websocket/client.go | 326 +++++++++++++++++++++++----------------- 1 file changed, 185 insertions(+), 141 deletions(-) diff --git a/sdk/websocket/client.go b/sdk/websocket/client.go index a4e22e11..8e23adac 100644 --- a/sdk/websocket/client.go +++ b/sdk/websocket/client.go @@ -20,11 +20,9 @@ import ( "github.com/proximax-storage/go-xpx-chain-sdk/sdk/websocket/subscribers" ) -const pathWS = "ws" - -type Path string - const ( + pathWS = "ws" + pathBlock Path = "block" pathConfirmedAdded Path = "confirmedAdded" pathUnconfirmedAdded Path = "unconfirmedAdded" @@ -39,138 +37,222 @@ var ( ErrUnsupportedMessageType = errors.New("unsupported message type") ) -func NewClient(ctx context.Context, cfg *sdk.Config) (CatapultClient, error) { - ctx, cancelFunc := context.WithCancel(ctx) +type ( + // Subscribe path + Path string - conn, uid, err := connect(cfg) - if err != nil { - return nil, err + CatapultWebsocketClientImpl struct { + ctx context.Context + cancelFunc context.CancelFunc + + UID string + config *sdk.Config + + conn *websocket.Conn + + blockSubscriber subscribers.Block + statusSubscribers subscribers.Status + cosignatureSubscribers subscribers.Cosignature + partialAddedSubscribers subscribers.PartialAdded + partialRemovedSubscribers subscribers.PartialRemoved + confirmedAddedSubscribers subscribers.ConfirmedAdded + unconfirmedAddedSubscribers subscribers.UnconfirmedAdded + unconfirmedRemovedSubscribers subscribers.UnconfirmedRemoved + + messageRouter Router + topicHandlers TopicHandlersStorage + messagePublisher MessagePublisher + + connectionStatusCh chan bool + listenCh chan bool // channel for manage current listen status for connection + reconnectCh chan *websocket.Conn // channel for connection with we will close, and open new connection + connectionCh chan *websocket.Conn // channel for new opened connection + + connectFn func(cfg *sdk.Config) (*websocket.Conn, string, error) } - topicHandlers := make(topicHandlers) - messagePublisher := newMessagePublisher(conn) - messageRouter := NewRouter(uid, messagePublisher, topicHandlers) + Client interface { + io.Closer + + Listen() + } - return &CatapultWebsocketClientImpl{ - config: cfg, - conn: conn, + CatapultClient interface { + Client + + AddBlockHandlers(handlers ...subscribers.BlockHandler) error + AddConfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.ConfirmedAddedHandler) error + AddUnconfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedAddedHandler) error + AddUnconfirmedRemovedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedRemovedHandler) error + AddPartialAddedHandlers(address *sdk.Address, handlers ...subscribers.PartialAddedHandler) error + AddPartialRemovedHandlers(address *sdk.Address, handlers ...subscribers.PartialRemovedHandler) error + AddStatusHandlers(address *sdk.Address, handlers ...subscribers.StatusHandler) error + AddCosignatureHandlers(address *sdk.Address, handlers ...subscribers.CosignatureHandler) error + } +) + +func NewClient(ctx context.Context, cfg *sdk.Config) (CatapultClient, error) { + ctx, cancelFunc := context.WithCancel(ctx) + + socketClient := &CatapultWebsocketClientImpl{ ctx: ctx, cancelFunc: cancelFunc, - UID: uid, + + config: cfg, blockSubscriber: subscribers.NewBlock(), + statusSubscribers: subscribers.NewStatus(), + cosignatureSubscribers: subscribers.NewCosignature(), + partialAddedSubscribers: subscribers.NewPartialAdded(), + partialRemovedSubscribers: subscribers.NewPartialRemoved(), confirmedAddedSubscribers: subscribers.NewConfirmedAdded(), unconfirmedAddedSubscribers: subscribers.NewUnconfirmedAdded(), unconfirmedRemovedSubscribers: subscribers.NewUnconfirmedRemoved(), - partialAddedSubscribers: subscribers.NewPartialAdded(), - partialRemovedSubscribers: subscribers.NewPartialRemoved(), - statusSubscribers: subscribers.NewStatus(), - cosignatureSubscribers: subscribers.NewCosignature(), - topicHandlers: topicHandlers, - messageRouter: messageRouter, - messagePublisher: messagePublisher, - }, nil + topicHandlers: make(topicHandlers), + + connectionStatusCh: make(chan bool), + listenCh: make(chan bool), + reconnectCh: make(chan *websocket.Conn), + connectionCh: make(chan *websocket.Conn), + } + socketClient.connectFn = connect + go socketClient.handleSignal() + return socketClient, socketClient.initNewConnection() } -type Client interface { - io.Closer +func (c *CatapultWebsocketClientImpl) handleSignal() { + for { + select { + case conn := <-c.connectionCh: + c.conn = conn + c.connectionStatusCh <- true + case conn := <-c.reconnectCh: + c.closeConnection(conn) + go func() { + c.listenCh <- true + }() + + case isListen := <-c.listenCh: + if !isListen { + break + } - Listen() -} + if c.conn == nil { + err := c.initNewConnection() + if err != nil { + fmt.Println("websocket: connection is failed. Try again after wait period") + select { + case <-time.NewTicker(c.config.WsReconnectionTimeout).C: + go func() { + c.listenCh <- true + }() + continue + } + } + } -type CatapultClient interface { - Client - - AddBlockHandlers(handlers ...subscribers.BlockHandler) error - AddConfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.ConfirmedAddedHandler) error - AddUnconfirmedAddedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedAddedHandler) error - AddUnconfirmedRemovedHandlers(address *sdk.Address, handlers ...subscribers.UnconfirmedRemovedHandler) error - AddPartialAddedHandlers(address *sdk.Address, handlers ...subscribers.PartialAddedHandler) error - AddPartialRemovedHandlers(address *sdk.Address, handlers ...subscribers.PartialRemovedHandler) error - AddStatusHandlers(address *sdk.Address, handlers ...subscribers.StatusHandler) error - AddCosignatureHandlers(address *sdk.Address, handlers ...subscribers.CosignatureHandler) error -} + err := c.updateHandlers() + if err != nil { + fmt.Println("websocket: update handles is failed. Try again after wait period") + select { + case <-time.NewTicker(c.config.WsReconnectionTimeout).C: + continue + } -type CatapultWebsocketClientImpl struct { - config *sdk.Config - conn *websocket.Conn - ctx context.Context - cancelFunc context.CancelFunc - UID string - - blockSubscriber subscribers.Block - confirmedAddedSubscribers subscribers.ConfirmedAdded - unconfirmedAddedSubscribers subscribers.UnconfirmedAdded - unconfirmedRemovedSubscribers subscribers.UnconfirmedRemoved - partialAddedSubscribers subscribers.PartialAdded - partialRemovedSubscribers subscribers.PartialRemoved - statusSubscribers subscribers.Status - cosignatureSubscribers subscribers.Cosignature - - topicHandlers TopicHandlersStorage - messageRouter Router - messagePublisher MessagePublisher - - connectFn func(cfg *sdk.Config) (*websocket.Conn, string, error) - alreadyListening bool -} + } + fmt.Println(fmt.Sprintf("websocket: connection established: %s", c.config.UsedBaseUrl.String())) -func (c *CatapultWebsocketClientImpl) Listen() { - if c.alreadyListening { - return + go func() { + + }() + } } +} - c.alreadyListening = true +func (c *CatapultWebsocketClientImpl) removeHandlers() { + c.blockSubscriber = nil + c.confirmedAddedSubscribers = nil + c.unconfirmedAddedSubscribers = nil + c.unconfirmedRemovedSubscribers = nil + c.partialAddedSubscribers = nil + c.partialRemovedSubscribers = nil + c.statusSubscribers = nil + c.cosignatureSubscribers = nil - messagesChan := make(chan []byte) + c.topicHandlers = nil +} - go func() { - defer c.cancelFunc() - - ReadMessageLoop: - for { - _, resp, e := c.conn.ReadMessage() - if e != nil { - if _, ok := e.(*net.OpError); ok { - // Stop ReadMessage goroutine if user called Close function for websocket client - return - } +func (c *CatapultWebsocketClientImpl) closeConnection(conn *websocket.Conn) { + if conn != nil { + if err := conn.Close(); err != nil { + fmt.Println(fmt.Sprintf("websocket: disconnection error: %s", err)) + } + } - if _, ok := e.(*websocket.CloseError); ok { - // Start websocket reconnect processing if connection was closed - for range time.NewTicker(c.config.WsReconnectionTimeout).C { - if err := c.reconnect(); err != nil { - continue - } +} - continue ReadMessageLoop - } - } +func (c *CatapultWebsocketClientImpl) startListener() { + for { + _, resp, e := c.conn.ReadMessage() + if e != nil { + if _, ok := e.(*net.OpError); ok { + // Stop ReadMessage if user called Close function for websocket client return } - messagesChan <- resp + if _, ok := e.(*websocket.CloseError); ok { + go func() { + c.reconnectCh <- c.conn + }() + break + } } + + go func() { + c.messageRouter.RouteMessage(resp) + }() + } +} + +func (c *CatapultWebsocketClientImpl) initNewConnection() error { + conn, uid, err := c.connectFn(c.config) + if err != nil { + return err + } + + topicHandlers := make(topicHandlers) + messagePublisher := newMessagePublisher(conn) + messageRouter := NewRouter(uid, messagePublisher, topicHandlers) + + c.UID = uid + c.topicHandlers = topicHandlers + c.messageRouter = messageRouter + c.messagePublisher = messagePublisher + go func() { + c.connectionCh <- conn }() + return nil +} - for { - select { - case <-c.ctx.Done(): - if c.conn != nil { - if err := c.conn.Close(); err != nil { - panic(err) - } - c.conn = nil - } - return - case msg := <-messagesChan: - go c.messageRouter.RouteMessage(msg) - } +func (c *CatapultWebsocketClientImpl) Listen() { + + <-c.connectionStatusCh + + go c.startListener() + + select { + case <-c.ctx.Done(): + c.closeConnection(c.conn) + c.removeHandlers() } } +func (c *CatapultWebsocketClientImpl) Close() error { + c.cancelFunc() + return nil +} func (c *CatapultWebsocketClientImpl) AddBlockHandlers(handlers ...subscribers.BlockHandler) error { if len(handlers) == 0 { return nil @@ -377,18 +459,7 @@ func (c *CatapultWebsocketClientImpl) AddCosignatureHandlers(address *sdk.Addres return nil } -func (c *CatapultWebsocketClientImpl) reconnect() error { - - conn, uid, err := c.connectFn(c.config) - if err != nil { - return err - } - - c.conn = conn - c.UID = uid - - c.messagePublisher.SetConn(conn) - c.messageRouter.SetUid(uid) +func (c *CatapultWebsocketClientImpl) updateHandlers() error { if c.blockSubscriber.HasHandlers() { if err := c.messagePublisher.PublishSubscribeMessage(c.UID, Path(fmt.Sprintf("%s", pathBlock))); err != nil { @@ -441,33 +512,6 @@ func (c *CatapultWebsocketClientImpl) reconnect() error { return nil } -func (c *CatapultWebsocketClientImpl) Close() error { - if c.conn != nil { - if err := c.conn.Close(); err != nil { - return err - } - - c.conn = nil - } - - c.cancelFunc() - - c.alreadyListening = false - - c.blockSubscriber = nil - c.confirmedAddedSubscribers = nil - c.unconfirmedAddedSubscribers = nil - c.unconfirmedRemovedSubscribers = nil - c.partialAddedSubscribers = nil - c.partialRemovedSubscribers = nil - c.statusSubscribers = nil - c.cosignatureSubscribers = nil - - c.topicHandlers = nil - - return nil -} - func connect(cfg *sdk.Config) (*websocket.Conn, string, error) { var conn *websocket.Conn var err error From 543d56520e0dd29ef6415bb54638cc44003c5819 Mon Sep 17 00:00:00 2001 From: vadimdidenkogs Date: Fri, 27 Sep 2019 12:24:44 +0300 Subject: [PATCH 2/3] Remove deprecated reconnect test --- sdk/websocket/client_test.go | 260 ----------------------------------- 1 file changed, 260 deletions(-) diff --git a/sdk/websocket/client_test.go b/sdk/websocket/client_test.go index 83b98cb7..15d27b4c 100644 --- a/sdk/websocket/client_test.go +++ b/sdk/websocket/client_test.go @@ -5,16 +5,12 @@ package websocket import ( - "context" - "fmt" "testing" - "github.com/gorilla/websocket" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - mocks2 "github.com/proximax-storage/go-xpx-chain-sdk/mocks" mocks "github.com/proximax-storage/go-xpx-chain-sdk/mocks/subscribers" "github.com/proximax-storage/go-xpx-chain-sdk/sdk" "github.com/proximax-storage/go-xpx-chain-sdk/sdk/websocket/subscribers" @@ -969,259 +965,3 @@ func TestCatapultWebsocketClientImpl_AddCosignatureHandlers(t *testing.T) { }) } } - -func TestCatapultWebsocketClientImpl_reconnect(t *testing.T) { - type fields struct { - config *sdk.Config - conn *websocket.Conn - ctx context.Context - cancelFunc context.CancelFunc - UID string - blockSubscriber subscribers.Block - confirmedAddedSubscribers subscribers.ConfirmedAdded - unconfirmedAddedSubscribers subscribers.UnconfirmedAdded - unconfirmedRemovedSubscribers subscribers.UnconfirmedRemoved - partialAddedSubscribers subscribers.PartialAdded - partialRemovedSubscribers subscribers.PartialRemoved - statusSubscribers subscribers.Status - cosignatureSubscribers subscribers.Cosignature - topicHandlers TopicHandlersStorage - messageRouter Router - messagePublisher MessagePublisher - connectFn func(cfg *sdk.Config) (*websocket.Conn, string, error) - alreadyListening bool - } - - errorConnectFn := func(cfg *sdk.Config) (*websocket.Conn, string, error) { - return nil, "", errors.New("test error") - } - - successConnectFn := func(cfg *sdk.Config) (*websocket.Conn, string, error) { - return nil, "test-uid", nil - } - - mockRouter := new(mocks2.Router) - mockRouter.On("SetUid", mock.Anything).Return(nil) - - publisherError := errors.New("test publisher error") - mockMessagePublisher := new(MockMessagePublisher) - mockMessagePublisher. - On("SetConn", mock.Anything).Return(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s", pathBlock))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s", pathBlock))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathConfirmedAdded, "test confirmed added address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathConfirmedAdded, "test confirmed added address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathCosignature, "test cosignature address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathCosignature, "test cosignature address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathPartialAdded, "test partial added address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathPartialAdded, "test partial added address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathPartialRemoved, "test partial removed address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathPartialRemoved, "test partial removed address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathStatus, "test status address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathStatus, "test status address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathUnconfirmedAdded, "test unconfirmed added address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathUnconfirmedAdded, "test unconfirmed added address"))).Return(nil). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathUnconfirmedRemoved, "test unconfirmed removed address"))).Return(publisherError).Once(). - On("PublishSubscribeMessage", mock.Anything, Path(fmt.Sprintf("%s/%s", pathUnconfirmedRemoved, "test unconfirmed removed address"))).Return(nil) - - mockBlockSubscriber := new(mocks.Block) - mockBlockSubscriber.On("HasHandlers").Return(true) - - mockConfirmedAddedSubscriber := new(mocks.ConfirmedAdded) - mockConfirmedAddedSubscriber.On("GetAddresses").Return([]string{"test confirmed added address"}) - - mockCosignatureSubscriber := new(mocks.Cosignature) - mockCosignatureSubscriber.On("GetAddresses").Return([]string{"test cosignature address"}) - - mockPartialAddedSubscriber := new(mocks.PartialAdded) - mockPartialAddedSubscriber.On("GetAddresses").Return([]string{"test partial added address"}) - - mockPartialRemovedSubscriber := new(mocks.PartialRemoved) - mockPartialRemovedSubscriber.On("GetAddresses").Return([]string{"test partial removed address"}) - - mockStatusSubscriber := new(mocks.Status) - mockStatusSubscriber.On("GetAddresses").Return([]string{"test status address"}) - - mockUnconfirmedAddedSubscriber := new(mocks.UnconfirmedAdded) - mockUnconfirmedAddedSubscriber.On("GetAddresses").Return([]string{"test unconfirmed added address"}) - - mockUnconfirmedRemovedSubscriber := new(mocks.UnconfirmedRemoved) - mockUnconfirmedRemovedSubscriber.On("GetAddresses").Return([]string{"test unconfirmed removed address"}) - - tests := []struct { - name string - fields fields - wantErr bool - }{ - { - name: "connection error", - fields: fields{ - config: &sdk.Config{}, - connectFn: errorConnectFn, - }, - wantErr: true, - }, - { - name: "block message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "confirmed added message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "confirmed added message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "partial added message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "partial removed message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - partialRemovedSubscribers: mockPartialRemovedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "status message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - partialRemovedSubscribers: mockPartialRemovedSubscriber, - statusSubscribers: mockStatusSubscriber, - unconfirmedAddedSubscribers: mockUnconfirmedAddedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "unconfirmed added message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - partialRemovedSubscribers: mockPartialRemovedSubscriber, - statusSubscribers: mockStatusSubscriber, - unconfirmedAddedSubscribers: mockUnconfirmedAddedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "unconfirmed removed message publisher error", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - partialRemovedSubscribers: mockPartialRemovedSubscriber, - statusSubscribers: mockStatusSubscriber, - unconfirmedAddedSubscribers: mockUnconfirmedAddedSubscriber, - unconfirmedRemovedSubscribers: mockUnconfirmedRemovedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: true, - }, - { - name: "success", - fields: fields{ - config: &sdk.Config{}, - connectFn: successConnectFn, - blockSubscriber: mockBlockSubscriber, - confirmedAddedSubscribers: mockConfirmedAddedSubscriber, - cosignatureSubscribers: mockCosignatureSubscriber, - partialAddedSubscribers: mockPartialAddedSubscriber, - partialRemovedSubscribers: mockPartialRemovedSubscriber, - statusSubscribers: mockStatusSubscriber, - unconfirmedAddedSubscribers: mockUnconfirmedAddedSubscriber, - unconfirmedRemovedSubscribers: mockUnconfirmedRemovedSubscriber, - messagePublisher: mockMessagePublisher, - messageRouter: mockRouter, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &CatapultWebsocketClientImpl{ - config: tt.fields.config, - conn: tt.fields.conn, - ctx: tt.fields.ctx, - cancelFunc: tt.fields.cancelFunc, - UID: tt.fields.UID, - blockSubscriber: tt.fields.blockSubscriber, - confirmedAddedSubscribers: tt.fields.confirmedAddedSubscribers, - unconfirmedAddedSubscribers: tt.fields.unconfirmedAddedSubscribers, - unconfirmedRemovedSubscribers: tt.fields.unconfirmedRemovedSubscribers, - partialAddedSubscribers: tt.fields.partialAddedSubscribers, - partialRemovedSubscribers: tt.fields.partialRemovedSubscribers, - statusSubscribers: tt.fields.statusSubscribers, - cosignatureSubscribers: tt.fields.cosignatureSubscribers, - topicHandlers: tt.fields.topicHandlers, - messageRouter: tt.fields.messageRouter, - messagePublisher: tt.fields.messagePublisher, - alreadyListening: tt.fields.alreadyListening, - connectFn: tt.fields.connectFn, - } - - err := c.reconnect() - assert.Equal(t, err != nil, tt.wantErr) - }) - } -} From a0f68091323dda3d9ced26f7217d9d67fd04f6c0 Mon Sep 17 00:00:00 2001 From: vadimdidenkogs Date: Fri, 27 Sep 2019 14:10:57 +0300 Subject: [PATCH 3/3] Update client --- sdk/websocket/client.go | 245 ++++++++++++++++++++-------------------- 1 file changed, 120 insertions(+), 125 deletions(-) diff --git a/sdk/websocket/client.go b/sdk/websocket/client.go index 8e23adac..e9c8b1c9 100644 --- a/sdk/websocket/client.go +++ b/sdk/websocket/client.go @@ -63,10 +63,10 @@ type ( topicHandlers TopicHandlersStorage messagePublisher MessagePublisher - connectionStatusCh chan bool - listenCh chan bool // channel for manage current listen status for connection - reconnectCh chan *websocket.Conn // channel for connection with we will close, and open new connection - connectionCh chan *websocket.Conn // channel for new opened connection + // connectionStatusCh chan bool + listenCh chan bool // channel for manage current listen status for connection + reconnectCh chan *websocket.Conn // channel for connection with we will close, and open new connection + connectionCh chan *websocket.Conn // channel for new opened connection connectFn func(cfg *sdk.Config) (*websocket.Conn, string, error) } @@ -111,136 +111,25 @@ func NewClient(ctx context.Context, cfg *sdk.Config) (CatapultClient, error) { topicHandlers: make(topicHandlers), - connectionStatusCh: make(chan bool), - listenCh: make(chan bool), - reconnectCh: make(chan *websocket.Conn), - connectionCh: make(chan *websocket.Conn), - } - socketClient.connectFn = connect - go socketClient.handleSignal() - return socketClient, socketClient.initNewConnection() -} - -func (c *CatapultWebsocketClientImpl) handleSignal() { - for { - select { - case conn := <-c.connectionCh: - c.conn = conn - c.connectionStatusCh <- true - case conn := <-c.reconnectCh: - c.closeConnection(conn) - go func() { - c.listenCh <- true - }() - - case isListen := <-c.listenCh: - if !isListen { - break - } - - if c.conn == nil { - err := c.initNewConnection() - if err != nil { - fmt.Println("websocket: connection is failed. Try again after wait period") - select { - case <-time.NewTicker(c.config.WsReconnectionTimeout).C: - go func() { - c.listenCh <- true - }() - continue - } - } - } + listenCh: make(chan bool), + reconnectCh: make(chan *websocket.Conn), + connectionCh: make(chan *websocket.Conn), - err := c.updateHandlers() - if err != nil { - fmt.Println("websocket: update handles is failed. Try again after wait period") - select { - case <-time.NewTicker(c.config.WsReconnectionTimeout).C: - continue - } - - } - fmt.Println(fmt.Sprintf("websocket: connection established: %s", c.config.UsedBaseUrl.String())) - - go func() { - - }() - } + connectFn: connect, } -} -func (c *CatapultWebsocketClientImpl) removeHandlers() { - c.blockSubscriber = nil - c.confirmedAddedSubscribers = nil - c.unconfirmedAddedSubscribers = nil - c.unconfirmedRemovedSubscribers = nil - c.partialAddedSubscribers = nil - c.partialRemovedSubscribers = nil - c.statusSubscribers = nil - c.cosignatureSubscribers = nil - - c.topicHandlers = nil -} - -func (c *CatapultWebsocketClientImpl) closeConnection(conn *websocket.Conn) { - if conn != nil { - if err := conn.Close(); err != nil { - fmt.Println(fmt.Sprintf("websocket: disconnection error: %s", err)) - } - } - -} - -func (c *CatapultWebsocketClientImpl) startListener() { - - for { - _, resp, e := c.conn.ReadMessage() - if e != nil { - if _, ok := e.(*net.OpError); ok { - // Stop ReadMessage if user called Close function for websocket client - return - } - - if _, ok := e.(*websocket.CloseError); ok { - go func() { - c.reconnectCh <- c.conn - }() - break - } - } - - go func() { - c.messageRouter.RouteMessage(resp) - }() - } -} + go socketClient.handleSignal() -func (c *CatapultWebsocketClientImpl) initNewConnection() error { - conn, uid, err := c.connectFn(c.config) - if err != nil { - return err + if err := socketClient.initNewConnection(); err != nil { + return socketClient, err } - topicHandlers := make(topicHandlers) - messagePublisher := newMessagePublisher(conn) - messageRouter := NewRouter(uid, messagePublisher, topicHandlers) - - c.UID = uid - c.topicHandlers = topicHandlers - c.messageRouter = messageRouter - c.messagePublisher = messagePublisher - go func() { - c.connectionCh <- conn - }() - return nil + return socketClient, nil } func (c *CatapultWebsocketClientImpl) Listen() { - <-c.connectionStatusCh - - go c.startListener() + c.listenCh <- true select { case <-c.ctx.Done(): @@ -459,9 +348,115 @@ func (c *CatapultWebsocketClientImpl) AddCosignatureHandlers(address *sdk.Addres return nil } +func (c *CatapultWebsocketClientImpl) handleSignal() { + for { + select { + case conn := <-c.connectionCh: + c.conn = conn + case conn := <-c.reconnectCh: + c.closeConnection(conn) + go func() { + c.listenCh <- false + }() + + case <-c.listenCh: + + if c.conn == nil { + err := c.initNewConnection() + if err != nil { + fmt.Println("websocket: connection is failed. Try again after wait period") + select { + case <-time.NewTicker(c.config.WsReconnectionTimeout).C: + go func() { + c.listenCh <- true + }() + continue + } + } + } + + err := c.updateHandlers() + if err != nil { + fmt.Println("websocket: update handles is failed. Try again after timeout period") + select { + case <-time.NewTicker(c.config.WsReconnectionTimeout).C: + continue + } + + } + fmt.Println(fmt.Sprintf("websocket: connection established: %s", c.config.UsedBaseUrl.String())) + c.startListener() + } + } +} + +func (c *CatapultWebsocketClientImpl) removeHandlers() { + c.blockSubscriber = nil + c.confirmedAddedSubscribers = nil + c.unconfirmedAddedSubscribers = nil + c.unconfirmedRemovedSubscribers = nil + c.partialAddedSubscribers = nil + c.partialRemovedSubscribers = nil + c.statusSubscribers = nil + c.cosignatureSubscribers = nil + + c.topicHandlers = nil +} + +func (c *CatapultWebsocketClientImpl) closeConnection(conn *websocket.Conn) { + if conn != nil { + if err := conn.Close(); err != nil { + fmt.Println(fmt.Sprintf("websocket: disconnection error: %s", err)) + } + } + c.conn = nil +} + +func (c *CatapultWebsocketClientImpl) startListener() { + + for { + _, resp, e := c.conn.ReadMessage() + if e != nil { + if _, ok := e.(*net.OpError); ok { + // Stop ReadMessage if user called Close function for websocket client + return + } + + if _, ok := e.(*websocket.CloseError); ok { + go func() { + c.reconnectCh <- c.conn + }() + return + } + } + + go func() { + c.messageRouter.RouteMessage(resp) + }() + } +} + +func (c *CatapultWebsocketClientImpl) initNewConnection() error { + conn, uid, err := c.connectFn(c.config) + if err != nil { + return err + } + + c.UID = uid + c.conn = conn + + messagePublisher := newMessagePublisher(c.conn) + messageRouter := NewRouter(c.UID, messagePublisher, c.topicHandlers) + + c.messageRouter = messageRouter + c.messagePublisher = messagePublisher + + return nil +} + func (c *CatapultWebsocketClientImpl) updateHandlers() error { - if c.blockSubscriber.HasHandlers() { + if c.topicHandlers.HasHandler(pathBlock) { if err := c.messagePublisher.PublishSubscribeMessage(c.UID, Path(fmt.Sprintf("%s", pathBlock))); err != nil { return err }