From 49f46969a4e2a845c9869ee963eba735916ab741 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Sun, 27 Aug 2017 15:12:45 +0800 Subject: [PATCH] exit when encounter error --- conn.go | 7 +++--- examples/config.client.yml | 2 +- gota/cmd/server.go | 17 ++++++++----- misc.go | 6 +++++ tunnel.go | 49 +++++++++++++++++++++++++------------- 5 files changed, 55 insertions(+), 26 deletions(-) diff --git a/conn.go b/conn.go index 77e0ef2..95da42e 100644 --- a/conn.go +++ b/conn.go @@ -469,6 +469,8 @@ func (ch *ConnHandler) CreateFastOpenConn() { } func (ch *ConnHandler) readFromTunnel() { + defer Recover() + drop := func(c chan *GotaFrame) { for gf := range c { log.Warnf("CH: Connection %d closed, Gota Frame dropped", gf.ConnID) @@ -492,8 +494,6 @@ func (ch *ConnHandler) readFromTunnel() { } }() - defer Recover() - var seq uint32 seq = 0 cache := make(map[uint32][]byte) @@ -560,6 +560,8 @@ func (ch *ConnHandler) readFromTunnel() { } func (ch *ConnHandler) writeToTunnel() { + defer Recover() + defer func() { ch.mutex.Lock() defer ch.mutex.Unlock() @@ -575,7 +577,6 @@ func (ch *ConnHandler) writeToTunnel() { } }() - defer Recover() // read io.EOF and send CloseWrite signal var seq uint32 seq = 0 diff --git a/examples/config.client.yml b/examples/config.client.yml index 8adb8a4..122cb97 100644 --- a/examples/config.client.yml +++ b/examples/config.client.yml @@ -17,5 +17,5 @@ fastopen: true # Gota server addresses with port tunnel: - remote: 127.0.0.1:12333 # connect server directly - #- remote: 127.0.0.1:12336 + - remote: 127.0.0.1:12336 # proxy: http://gota:gota@127.0.0.1:3128 # connect server using proxy, currently Gota support http/https/socks5 proxy diff --git a/gota/cmd/server.go b/gota/cmd/server.go index ea1b659..ff5714c 100644 --- a/gota/cmd/server.go +++ b/gota/cmd/server.go @@ -26,6 +26,7 @@ import ( _ "net/http/pprof" "os" "os/signal" + "sync" "syscall" ) @@ -84,22 +85,26 @@ var serverCmd = &cobra.Command{ } sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + wg := sync.WaitGroup{} go func() { sig := <-sigs log.Infof("Received signal: %s, exit gota server", sig) - cmClosed := make(chan struct{}) + wg.Add(2) go func() { client.ConnManager.Stop() - close(cmClosed) + wg.Done() + }() + go func() { + client.TunnelManager.Stop() + wg.Done() }() - client.TunnelManager.Stop() - <-cmClosed - os.Exit(0) }() client.Serve(viper.GetString("remote")) + wg.Wait() + } else { log.Error("Gota: Can not parse tunnel config") } diff --git a/misc.go b/misc.go index fe3c49e..0a64f40 100644 --- a/misc.go +++ b/misc.go @@ -6,6 +6,7 @@ import ( "errors" log "github.com/Sirupsen/logrus" "io" + "os" "strings" "time" ) @@ -142,3 +143,8 @@ func SetLogLevel(l string) { log.SetLevel(log.DebugLevel) } } + +func ShutdownGota() { + process, _ := os.FindProcess(os.Getpid()) + process.Signal(os.Interrupt) +} diff --git a/tunnel.go b/tunnel.go index 32af265..66215af 100644 --- a/tunnel.go +++ b/tunnel.go @@ -267,7 +267,7 @@ func (tm *TunnelManager) listenAndServe(config TunnelPassiveConfig) { tm.poolLock.Unlock() tm.poolLock.RLock() - t := NewTunnelTransport(tm.writePool[client], tm.readPool[client], conn, client) + t := NewTunnelTransport(tm.writePool[client], tm.readPool[client], conn, PassiveMode, client) tm.poolLock.RUnlock() t.SetCCIDChannel(tm.newCCIDChannel) @@ -367,7 +367,7 @@ func (tm *TunnelManager) connectAndServe(config TunnelActiveConfig, client Clien tm.poolLock.Unlock() tm.poolLock.RLock() - t := NewTunnelTransport(tm.writePool[client], tm.readPool[client], conn, client) + t := NewTunnelTransport(tm.writePool[client], tm.readPool[client], conn, ActiveMode, client) tm.poolLock.RUnlock() tm.ttPool = append(tm.ttPool, t) @@ -427,6 +427,7 @@ type TunnelTransport struct { clientID ClientID quit chan struct{} + mode int mutex sync.Locker writeStopped bool @@ -445,7 +446,7 @@ type TunnelTransport struct { rw io.ReadWriteCloser } -func NewTunnelTransport(wp, rp chan<- chan *GotaFrame, rw io.ReadWriteCloser, clientID ...ClientID) *TunnelTransport { +func NewTunnelTransport(wp, rp chan<- chan *GotaFrame, rw io.ReadWriteCloser, mode int, clientID ...ClientID) *TunnelTransport { var c ClientID if clientID != nil { c = clientID[0] @@ -462,6 +463,7 @@ func NewTunnelTransport(wp, rp chan<- chan *GotaFrame, rw io.ReadWriteCloser, cl mutex: &sync.Mutex{}, clientID: c, + mode: mode, newCCIDChannel: nil, @@ -497,7 +499,14 @@ func (t *TunnelTransport) readFromPeerTunnel() { for { gf, err := ReadGotaFrame(t.rw) if err != nil { - log.Errorf("TT: Received gota frame error, stop this worker, error: %s", err) + log.Errorf("TT: Error: %s", err) + select { + case <-t.quit: + return + default: + log.Errorf("TT: Received gota frame error, stop this worker, error: %s", err) + t.Stop() + } return } gf.clientID = t.clientID @@ -509,7 +518,7 @@ func (t *TunnelTransport) readFromPeerTunnel() { case TMHeartBeatPingSeq: go t.sendHeartBeatResponse() case TMHeartBeatPongSeq: - log.Info("TT: Received Hearbeat Pong") + log.Info("TT: Received Heartbeat Pong") case TMCreateConnSeq: log.Debug("TT: Received Create Connection Signal") @@ -602,14 +611,14 @@ Loop: log.Errorf("TT: Send heartbeat failed, stop this worker, error: \"%s\"", err) break Loop } - log.Info("TT: Sent Hearbeat Ping") + log.Info("TT: Sent Heartbeat Ping") case <-tick.C: err := t.sendHeartBeatRequest() if err != nil { log.Errorf("TT: Send heartbeat failed, stop this worker, error: \"%s\"", err) break Loop } - log.Info("TT: Sent Hearbeat Ping") + log.Info("TT: Sent Heartbeat Ping") case <-t.quit: // TODO if the read channel already registered to the read pool @@ -632,15 +641,16 @@ func (t *TunnelTransport) sendHeartBeatRequest() error { func (t *TunnelTransport) sendHeartBeatResponse() { t.readChannel <- TMHeartBeatPongGotaFrame - log.Info("TT: Sent Hearbeat Pong") + log.Info("TT: Sent Heartbeat Pong") } func (t *TunnelTransport) sendCloseTunnelRequest() error { + log.Info("TT: Sent Close Tunnel request") err := WriteNBytes(t.rw, HeaderLength, TMCloseTunnelBytes) if err != nil { - return err + log.Errorf("TT: Sent Close Tunnel request error: %s", err) } - return nil + return err } func (t *TunnelTransport) sendCloseTunnelResponse() { @@ -651,6 +661,7 @@ func (t *TunnelTransport) sendCloseTunnelResponse() { log.Info("TT: Sent Close Tunnel response") t.readStopped = true close(t.quit) + t.Stop() } // Start method starts the run loop for the worker, listening for a quit channel in @@ -665,23 +676,29 @@ func (t *TunnelTransport) Stop() { t.mutex.Lock() defer t.mutex.Unlock() + defer func() { + if t.mode == ActiveMode { + ShutdownGota() + } + }() + if t.readStopped && t.writeStopped { return } + // close a channel will trigger all reading from the channel to return immediately close(t.quit) timeout := 0 for { + if t.readStopped && t.writeStopped { + return + } if timeout < TMHeartBeatSecond { time.Sleep(time.Second) timeout++ } else { - if t.readStopped && t.writeStopped { - return - } else { - t.rw.Close() - return - } + t.rw.Close() + return } } }