Skip to content

Commit

Permalink
リトライ待ちの間にクライアントから切断された場合に、リトライ待ちでブロックされないようにする
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexa committed Nov 26, 2024
1 parent 084539d commit 0681e4e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 22 deletions.
71 changes: 51 additions & 20 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,15 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
channelCount := uint16(s.config.ChannelCount)

d := time.Duration(s.config.TimeToWaitForOpusPacketMs) * time.Millisecond
r := NewOpusReader(*s.config, d, c.Request().Body)
defer r.Close()
opusReader := NewOpusReader(*s.config, d, c.Request().Body)
defer opusReader.Close()

var r io.Reader
if s.config.AudioStreamingHeader {
r = readPacketWithHeader(opusReader)
} else {
r = opusReader
}

serviceHandler, err := getServiceHandler(serviceType, *s.config, h.SoraChannelID, h.SoraConnectionID, sampleRate, channelCount, languageCode, onResultFunc)
if err != nil {
Expand Down Expand Up @@ -154,10 +161,44 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
if s.config.MaxRetry > serviceHandler.GetRetryCount() {
serviceHandler.UpdateRetryCount()

// 切断検知のために、クライアントから送られてくるパケットは受信し続ける
errCh := make(chan error)
ctx, cancelPacketDiscard := context.WithCancel(ctx)
defer cancelPacketDiscard()

// TODO: 関数化
go func(ctx context.Context) {
defer close(errCh)

// サービス側には接続していないため、パケットは破棄する
buf := make([]byte, HeaderLength+MaxPayloadLength)
for {
select {
case <-ctx.Done():
return
default:
if _, err := r.Read(buf); err != nil {
errCh <- err
return
}
}
}
}(ctx)

// 連続のリトライを避けるために少し待つ
time.Sleep(time.Duration(s.config.RetryIntervalMs) * time.Millisecond)
retryTimer := time.NewTimer(time.Duration(s.config.RetryIntervalMs) * time.Millisecond)
select {
case <-retryTimer.C:
retryTimer.Stop()
cancelPacketDiscard()
// リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする
break
case err := <-errCh:
retryTimer.Stop()
// リトライする前にクライアントとの接続でエラーが発生した場合は終了する
return err
}

// リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする
continue
}
}
Expand Down Expand Up @@ -272,6 +313,8 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
Str("connection_id", h.SoraConnectionID).
Send()

orgErr := err

errMessage, err := json.Marshal(NewSuzuErrorResponse(err))
if err != nil {
zlog.Error().
Expand All @@ -293,7 +336,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
c.Response().Flush()

// サーバから切断されたが再度の接続が期待できない場合
return err
return orgErr
}

// メッセージが空でない場合はクライアントに結果を送信する
Expand All @@ -313,9 +356,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
}
}

const ()

func readPacketWithHeader(reader io.Reader) (io.Reader, error) {
func readPacketWithHeader(reader io.Reader) io.Reader {
r, w := io.Pipe()

go func() {
Expand Down Expand Up @@ -393,7 +434,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) {
}
}()

return r, nil
return r
}

func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error {
Expand All @@ -406,24 +447,14 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa
}
defer o.Close()

var r io.Reader
if c.AudioStreamingHeader {
r, err = readPacketWithHeader(opusReader)
if err != nil {
return err
}
} else {
r = opusReader
}

ch := make(chan []byte)

go func() {
defer close(ch)

for {
buf := make([]byte, FrameSize)
n, err := r.Read(buf)
n, err := opusReader.Read(buf)
if err != nil {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
Expand Down
3 changes: 1 addition & 2 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,7 @@ func TestReadPacketWithHeader(t *testing.T) {
}
}()

r, err := readPacketWithHeader(reader)
assert.NoError(t, err)
r := readPacketWithHeader(reader)

i := 0
for {
Expand Down

0 comments on commit 0681e4e

Please sign in to comment.