From 0681e4e2ecb9551268adfd49a1c0959a1745db26 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 26 Nov 2024 12:08:19 +0900 Subject: [PATCH] =?UTF-8?q?=E3=83=AA=E3=83=88=E3=83=A9=E3=82=A4=E5=BE=85?= =?UTF-8?q?=E3=81=A1=E3=81=AE=E9=96=93=E3=81=AB=E3=82=AF=E3=83=A9=E3=82=A4?= =?UTF-8?q?=E3=82=A2=E3=83=B3=E3=83=88=E3=81=8B=E3=82=89=E5=88=87=E6=96=AD?= =?UTF-8?q?=E3=81=95=E3=82=8C=E3=81=9F=E5=A0=B4=E5=90=88=E3=81=AB=E3=80=81?= =?UTF-8?q?=E3=83=AA=E3=83=88=E3=83=A9=E3=82=A4=E5=BE=85=E3=81=A1=E3=81=A7?= =?UTF-8?q?=E3=83=96=E3=83=AD=E3=83=83=E3=82=AF=E3=81=95=E3=82=8C=E3=81=AA?= =?UTF-8?q?=E3=81=84=E3=82=88=E3=81=86=E3=81=AB=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 71 +++++++++++++++++++++++++++++++++++-------------- handler_test.go | 3 +-- 2 files changed, 52 insertions(+), 22 deletions(-) diff --git a/handler.go b/handler.go index 0250279..78781b2 100644 --- a/handler.go +++ b/handler.go @@ -116,8 +116,15 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte channelCount := uint16(s.config.ChannelCount) d := time.Duration(s.config.TimeToWaitForOpusPacketMs) * time.Millisecond - r := NewOpusReader(*s.config, d, c.Request().Body) - defer r.Close() + opusReader := NewOpusReader(*s.config, d, c.Request().Body) + defer opusReader.Close() + + var r io.Reader + if s.config.AudioStreamingHeader { + r = readPacketWithHeader(opusReader) + } else { + r = opusReader + } serviceHandler, err := getServiceHandler(serviceType, *s.config, h.SoraChannelID, h.SoraConnectionID, sampleRate, channelCount, languageCode, onResultFunc) if err != nil { @@ -154,10 +161,44 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte if s.config.MaxRetry > serviceHandler.GetRetryCount() { serviceHandler.UpdateRetryCount() + // 切断検知のために、クライアントから送られてくるパケットは受信し続ける + errCh := make(chan error) + ctx, cancelPacketDiscard := context.WithCancel(ctx) + defer cancelPacketDiscard() + + // TODO: 関数化 + go func(ctx context.Context) { + defer close(errCh) + + // サービス側には接続していないため、パケットは破棄する + buf := make([]byte, HeaderLength+MaxPayloadLength) + for { + select { + case <-ctx.Done(): + return + default: + if _, err := r.Read(buf); err != nil { + errCh <- err + return + } + } + } + }(ctx) + // 連続のリトライを避けるために少し待つ - time.Sleep(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) + retryTimer := time.NewTimer(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) + select { + case <-retryTimer.C: + retryTimer.Stop() + cancelPacketDiscard() + // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする + break + case err := <-errCh: + retryTimer.Stop() + // リトライする前にクライアントとの接続でエラーが発生した場合は終了する + return err + } - // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする continue } } @@ -272,6 +313,8 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Str("connection_id", h.SoraConnectionID). Send() + orgErr := err + errMessage, err := json.Marshal(NewSuzuErrorResponse(err)) if err != nil { zlog.Error(). @@ -293,7 +336,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte c.Response().Flush() // サーバから切断されたが再度の接続が期待できない場合 - return err + return orgErr } // メッセージが空でない場合はクライアントに結果を送信する @@ -313,9 +356,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte } } -const () - -func readPacketWithHeader(reader io.Reader) (io.Reader, error) { +func readPacketWithHeader(reader io.Reader) io.Reader { r, w := io.Pipe() go func() { @@ -393,7 +434,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { } }() - return r, nil + return r } func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error { @@ -406,16 +447,6 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa } defer o.Close() - var r io.Reader - if c.AudioStreamingHeader { - r, err = readPacketWithHeader(opusReader) - if err != nil { - return err - } - } else { - r = opusReader - } - ch := make(chan []byte) go func() { @@ -423,7 +454,7 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa for { buf := make([]byte, FrameSize) - n, err := r.Read(buf) + n, err := opusReader.Read(buf) if err != nil { if w, ok := oggWriter.(*io.PipeWriter); ok { w.CloseWithError(err) diff --git a/handler_test.go b/handler_test.go index b7192ad..39e1913 100644 --- a/handler_test.go +++ b/handler_test.go @@ -288,8 +288,7 @@ func TestReadPacketWithHeader(t *testing.T) { } }() - r, err := readPacketWithHeader(reader) - assert.NoError(t, err) + r := readPacketWithHeader(reader) i := 0 for {