From c0035deed364805eda2866c0e996088686baec68 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Wed, 20 Nov 2024 15:56:07 +0900 Subject: [PATCH 01/22] =?UTF-8?q?JSON=20=E3=81=A7=E3=81=AF=E3=81=AA?= =?UTF-8?q?=E3=81=8F=E3=82=A8=E3=83=A9=E3=83=BC=E3=82=92=E8=BF=94=E3=81=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index e374d3f..b7ab055 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -195,27 +195,8 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) switch err.(type) { case *transcribestreamingservice.LimitExceededException, *transcribestreamingservice.InternalFailureException: - // リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if (at.Config.MaxRetry < 1) || (at.Config.MaxRetry <= h.GetRetryCount()) { - if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Send() - } - } - err = ErrServerDisconnected default: - // 再接続を想定している以外のエラーの場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Send() - } } w.CloseWithError(err) From d94ffaae5e0cfe69bca71468beb8cd6c05c8bce5 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Thu, 21 Nov 2024 15:49:25 +0900 Subject: [PATCH 02/22] =?UTF-8?q?-=20=E3=83=AA=E3=83=88=E3=83=A9=E3=82=A4?= =?UTF-8?q?=E6=99=82=E3=81=AB=20ctx=20=E3=82=92=20cancel=20=E3=81=99?= =?UTF-8?q?=E3=82=8B=20-=20=E3=83=91=E3=82=B1=E3=83=83=E3=83=88=E8=AA=AD?= =?UTF-8?q?=E3=81=BF=E8=BE=BC=E3=81=BF=E3=81=A7=E3=83=96=E3=83=AD=E3=83=83?= =?UTF-8?q?=E3=82=AF=E3=81=97=E3=81=A6=E3=81=84=E3=82=8B=E3=81=9F=E3=82=81?= =?UTF-8?q?=E3=80=81ogg=20=E5=A4=89=E6=8F=9B=E3=81=A8=E5=87=A6=E7=90=86?= =?UTF-8?q?=E3=82=92=E5=88=86=E3=81=91=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 58 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/handler.go b/handler.go index 17683c2..c50e3c7 100644 --- a/handler.go +++ b/handler.go @@ -137,6 +137,10 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Int("retry_count", serviceHandler.GetRetryCount()). Msg("NEW-REQUEST") + // リトライ時にこれ以降の処理のみを cancel する + ctx, cancel := context.WithCancel(ctx) + defer cancel() + reader, err := serviceHandler.Handle(ctx, r) if err != nil { zlog.Error(). @@ -196,7 +200,13 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte serviceHandler.UpdateRetryCount() // TODO: 必要な場合は連続のリトライを避けるために少し待つ処理を追加する + zlog.Debug().Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Int("retry_count", serviceHandler.GetRetryCount()). + Msg("RETRYING") + cancel() break } else { zlog.Error(). @@ -215,6 +225,11 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte // 1 度でも接続結果を受け取れた場合はリトライ回数をリセットする serviceHandler.ResetRetryCount() + zlog.Debug(). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Int("retry_count", serviceHandler.GetRetryCount()). + Msg("RESET_RETRY_COUNT") // メッセージが空でない場合はクライアントに結果を送信する if n > 0 { @@ -336,19 +351,44 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa r = opusReader } - for { - buf := make([]byte, FrameSize) - n, err := r.Read(buf) - if err != nil { - if w, ok := oggWriter.(*io.PipeWriter); ok { - w.CloseWithError(err) + ch := make(chan []byte) + + go func() { + defer close(ch) + + for { + buf := make([]byte, FrameSize) + n, err := r.Read(buf) + if err != nil { + if w, ok := oggWriter.(*io.PipeWriter); ok { + w.CloseWithError(err) + } + return + } + + if n > 0 { + ch <- buf[:n] } - return err } + }() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case buf, ok := <-ch: + if !ok { + return nil + } + + if !ok { + if w, ok := oggWriter.(*io.PipeWriter); ok { + w.CloseWithError(err) + } + } - if n > 0 { opus := codecs.OpusPacket{} - _, err := opus.Unmarshal(buf[:n]) + _, err := opus.Unmarshal(buf) if err != nil { if w, ok := oggWriter.(*io.PipeWriter); ok { w.CloseWithError(err) From b8a17ec84070d9fcce8a245445f688db608516b2 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 22 Nov 2024 12:30:53 +0900 Subject: [PATCH 03/22] =?UTF-8?q?=E3=82=A8=E3=83=A9=E3=83=BC=E3=82=92?= =?UTF-8?q?=E8=BF=94=E3=81=99=E3=82=88=E3=81=86=E3=81=AB=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- speech_to_text_handler.go | 47 +++++++++------------------------------ 1 file changed, 11 insertions(+), 36 deletions(-) diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index 5c85fd6..33be247 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -147,53 +147,28 @@ func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io w.CloseWithError(err) return } + if status := resp.Error; status != nil { // 音声の長さの上限値に達した場合 + err := fmt.Errorf("%s", status.GetMessage()) code := codes.Code(status.GetCode()) - if code == codes.OutOfRange || - code == codes.InvalidArgument || - code == codes.ResourceExhausted { - - err := fmt.Errorf(status.GetMessage()) - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Int32("code", status.GetCode()). - Send() - - // リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if (stt.Config.MaxRetry < 1) || (stt.Config.MaxRetry <= h.GetRetryCount()) { - if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Send() - } - } - - w.CloseWithError(ErrServerDisconnected) - return - } - errMessage := status.GetMessage() zlog.Error(). + Err(err). Str("channel_id", h.ChannelID). Str("connection_id", h.ConnectionID). Int32("code", status.GetCode()). - Msg(errMessage) + Send() - err := fmt.Errorf(errMessage) - if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Send() + if code == codes.OutOfRange || + code == codes.InvalidArgument || + code == codes.ResourceExhausted { + + w.CloseWithError(ErrServerDisconnected) + return } - w.Close() + w.CloseWithError(err) return } From da9251add2951c2a646dc5fbf156a66266c39fc3 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 22 Nov 2024 12:32:13 +0900 Subject: [PATCH 04/22] =?UTF-8?q?=E3=82=B5=E3=83=BC=E3=83=93=E3=82=B9?= =?UTF-8?q?=E3=81=B8=E3=81=AE=E6=8E=A5=E7=B6=9A=E3=81=8C=E6=88=90=E5=8A=9F?= =?UTF-8?q?=E3=81=97=E3=81=9F=E6=99=82=E7=82=B9=E3=81=A7=E3=83=AA=E3=83=88?= =?UTF-8?q?=E3=83=A9=E3=82=A4=E5=9B=9E=E6=95=B0=E3=82=92=E3=83=AA=E3=82=BB?= =?UTF-8?q?=E3=83=83=E3=83=88=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 4 ++++ handler.go | 8 -------- speech_to_text_handler.go | 2 ++ 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index b7ab055..b9d375c 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -120,6 +120,9 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) return nil, err } + // リクエストが成功した時点でリトライカウントをリセットする + h.ResetRetryCount() + r, w := io.Pipe() go func() { @@ -195,6 +198,7 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) switch err.(type) { case *transcribestreamingservice.LimitExceededException, *transcribestreamingservice.InternalFailureException: + // TODO: 元の err を送信する err = ErrServerDisconnected default: } diff --git a/handler.go b/handler.go index c50e3c7..c119723 100644 --- a/handler.go +++ b/handler.go @@ -223,14 +223,6 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte return err } - // 1 度でも接続結果を受け取れた場合はリトライ回数をリセットする - serviceHandler.ResetRetryCount() - zlog.Debug(). - Str("channel_id", h.SoraChannelID). - Str("connection_id", h.SoraConnectionID). - Int("retry_count", serviceHandler.GetRetryCount()). - Msg("RESET_RETRY_COUNT") - // メッセージが空でない場合はクライアントに結果を送信する if n > 0 { if _, err := c.Response().Write(buf[:n]); err != nil { diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index 33be247..7141f9d 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -115,6 +115,8 @@ func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io return nil, err } + h.ResetRetryCount() + r, w := io.Pipe() go func() { From 084539d4acc22627e3dc0e9d3005e31a602e6dd4 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 22 Nov 2024 16:40:40 +0900 Subject: [PATCH 05/22] =?UTF-8?q?=E5=85=83=E3=81=AE=E3=82=A8=E3=83=A9?= =?UTF-8?q?=E3=83=BC=E3=82=92=E3=82=AF=E3=83=A9=E3=82=A4=E3=82=A2=E3=83=B3?= =?UTF-8?q?=E3=83=88=E3=81=AB=E9=80=81=E4=BF=A1=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 3 +- handler.go | 75 +++++++++++++++++++++++++++++++++++- speech_to_text_handler.go | 3 +- 3 files changed, 77 insertions(+), 4 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index b9d375c..0093266 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -198,8 +198,7 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) switch err.(type) { case *transcribestreamingservice.LimitExceededException, *transcribestreamingservice.InternalFailureException: - // TODO: 元の err を送信する - err = ErrServerDisconnected + err = errors.Join(err, ErrServerDisconnected) default: } diff --git a/handler.go b/handler.go index c119723..0250279 100644 --- a/handler.go +++ b/handler.go @@ -3,6 +3,7 @@ package suzu import ( "context" "encoding/binary" + "encoding/json" "errors" "fmt" "io" @@ -184,14 +185,39 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Send() return err } else if errors.Is(err, ErrServerDisconnected) { + errs := err.(interface{ Unwrap() []error }).Unwrap() + // 元の err を取得する + err := errs[0] + if s.config.MaxRetry < 1 { // サーバから切断されたが再接続させない設定の場合 zlog.Error(). + Err(ErrServerDisconnected). Err(err). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). Send() - return err + + errMessage, err := json.Marshal(NewSuzuErrorResponse(err)) + if err != nil { + zlog.Error(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Send() + return err + } + + if _, err := c.Response().Write(errMessage); err != nil { + zlog.Error(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Send() + return err + } + c.Response().Flush() + return ErrServerDisconnected } if s.config.MaxRetry > serviceHandler.GetRetryCount() { @@ -214,11 +240,58 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). Send() + + errMessage, err := json.Marshal(NewSuzuErrorResponse(err)) + if err != nil { + zlog.Error(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Send() + return err + } + + if _, err := c.Response().Write(errMessage); err != nil { + zlog.Error(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Send() + return err + } + c.Response().Flush() + // max_retry を超えた場合は終了 return c.NoContent(http.StatusOK) } } + zlog.Debug(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Send() + + errMessage, err := json.Marshal(NewSuzuErrorResponse(err)) + if err != nil { + zlog.Error(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Send() + return err + } + + if _, err := c.Response().Write(errMessage); err != nil { + zlog.Error(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Send() + return err + } + c.Response().Flush() + // サーバから切断されたが再度の接続が期待できない場合 return err } diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index 7141f9d..0a55b9d 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -166,7 +166,8 @@ func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io code == codes.InvalidArgument || code == codes.ResourceExhausted { - w.CloseWithError(ErrServerDisconnected) + err := errors.Join(err, ErrServerDisconnected) + w.CloseWithError(err) return } From 0681e4e2ecb9551268adfd49a1c0959a1745db26 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 26 Nov 2024 12:08:19 +0900 Subject: [PATCH 06/22] =?UTF-8?q?=E3=83=AA=E3=83=88=E3=83=A9=E3=82=A4?= =?UTF-8?q?=E5=BE=85=E3=81=A1=E3=81=AE=E9=96=93=E3=81=AB=E3=82=AF=E3=83=A9?= =?UTF-8?q?=E3=82=A4=E3=82=A2=E3=83=B3=E3=83=88=E3=81=8B=E3=82=89=E5=88=87?= =?UTF-8?q?=E6=96=AD=E3=81=95=E3=82=8C=E3=81=9F=E5=A0=B4=E5=90=88=E3=81=AB?= =?UTF-8?q?=E3=80=81=E3=83=AA=E3=83=88=E3=83=A9=E3=82=A4=E5=BE=85=E3=81=A1?= =?UTF-8?q?=E3=81=A7=E3=83=96=E3=83=AD=E3=83=83=E3=82=AF=E3=81=95=E3=82=8C?= =?UTF-8?q?=E3=81=AA=E3=81=84=E3=82=88=E3=81=86=E3=81=AB=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 71 +++++++++++++++++++++++++++++++++++-------------- handler_test.go | 3 +-- 2 files changed, 52 insertions(+), 22 deletions(-) diff --git a/handler.go b/handler.go index 0250279..78781b2 100644 --- a/handler.go +++ b/handler.go @@ -116,8 +116,15 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte channelCount := uint16(s.config.ChannelCount) d := time.Duration(s.config.TimeToWaitForOpusPacketMs) * time.Millisecond - r := NewOpusReader(*s.config, d, c.Request().Body) - defer r.Close() + opusReader := NewOpusReader(*s.config, d, c.Request().Body) + defer opusReader.Close() + + var r io.Reader + if s.config.AudioStreamingHeader { + r = readPacketWithHeader(opusReader) + } else { + r = opusReader + } serviceHandler, err := getServiceHandler(serviceType, *s.config, h.SoraChannelID, h.SoraConnectionID, sampleRate, channelCount, languageCode, onResultFunc) if err != nil { @@ -154,10 +161,44 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte if s.config.MaxRetry > serviceHandler.GetRetryCount() { serviceHandler.UpdateRetryCount() + // 切断検知のために、クライアントから送られてくるパケットは受信し続ける + errCh := make(chan error) + ctx, cancelPacketDiscard := context.WithCancel(ctx) + defer cancelPacketDiscard() + + // TODO: 関数化 + go func(ctx context.Context) { + defer close(errCh) + + // サービス側には接続していないため、パケットは破棄する + buf := make([]byte, HeaderLength+MaxPayloadLength) + for { + select { + case <-ctx.Done(): + return + default: + if _, err := r.Read(buf); err != nil { + errCh <- err + return + } + } + } + }(ctx) + // 連続のリトライを避けるために少し待つ - time.Sleep(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) + retryTimer := time.NewTimer(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) + select { + case <-retryTimer.C: + retryTimer.Stop() + cancelPacketDiscard() + // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする + break + case err := <-errCh: + retryTimer.Stop() + // リトライする前にクライアントとの接続でエラーが発生した場合は終了する + return err + } - // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする continue } } @@ -272,6 +313,8 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Str("connection_id", h.SoraConnectionID). Send() + orgErr := err + errMessage, err := json.Marshal(NewSuzuErrorResponse(err)) if err != nil { zlog.Error(). @@ -293,7 +336,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte c.Response().Flush() // サーバから切断されたが再度の接続が期待できない場合 - return err + return orgErr } // メッセージが空でない場合はクライアントに結果を送信する @@ -313,9 +356,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte } } -const () - -func readPacketWithHeader(reader io.Reader) (io.Reader, error) { +func readPacketWithHeader(reader io.Reader) io.Reader { r, w := io.Pipe() go func() { @@ -393,7 +434,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { } }() - return r, nil + return r } func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error { @@ -406,16 +447,6 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa } defer o.Close() - var r io.Reader - if c.AudioStreamingHeader { - r, err = readPacketWithHeader(opusReader) - if err != nil { - return err - } - } else { - r = opusReader - } - ch := make(chan []byte) go func() { @@ -423,7 +454,7 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa for { buf := make([]byte, FrameSize) - n, err := r.Read(buf) + n, err := opusReader.Read(buf) if err != nil { if w, ok := oggWriter.(*io.PipeWriter); ok { w.CloseWithError(err) diff --git a/handler_test.go b/handler_test.go index b7192ad..39e1913 100644 --- a/handler_test.go +++ b/handler_test.go @@ -288,8 +288,7 @@ func TestReadPacketWithHeader(t *testing.T) { } }() - r, err := readPacketWithHeader(reader) - assert.NoError(t, err) + r := readPacketWithHeader(reader) i := 0 for { From 0b02fef14ece77b21e991a93c414bc962570e157 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 26 Nov 2024 15:29:16 +0900 Subject: [PATCH 07/22] =?UTF-8?q?=E3=83=91=E3=82=B1=E3=83=83=E3=83=88?= =?UTF-8?q?=E7=A0=B4=E6=A3=84=E5=87=A6=E7=90=86=E3=82=92=E9=96=A2=E6=95=B0?= =?UTF-8?q?=E5=8C=96=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/handler.go b/handler.go index 78781b2..822c56b 100644 --- a/handler.go +++ b/handler.go @@ -162,28 +162,11 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte serviceHandler.UpdateRetryCount() // 切断検知のために、クライアントから送られてくるパケットは受信し続ける - errCh := make(chan error) ctx, cancelPacketDiscard := context.WithCancel(ctx) defer cancelPacketDiscard() - // TODO: 関数化 - go func(ctx context.Context) { - defer close(errCh) - - // サービス側には接続していないため、パケットは破棄する - buf := make([]byte, HeaderLength+MaxPayloadLength) - for { - select { - case <-ctx.Done(): - return - default: - if _, err := r.Read(buf); err != nil { - errCh <- err - return - } - } - } - }(ctx) + errCh := make(chan error) + go discardPacket(ctx, r, errCh) // 連続のリトライを避けるために少し待つ retryTimer := time.NewTimer(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) @@ -192,14 +175,12 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte retryTimer.Stop() cancelPacketDiscard() // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする - break + continue case err := <-errCh: retryTimer.Stop() // リトライする前にクライアントとの接続でエラーが発生した場合は終了する return err } - - continue } } // SuzuError の場合はその Status Code を返す @@ -356,6 +337,24 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte } } +func discardPacket(ctx context.Context, r io.Reader, errCh chan error) { + defer close(errCh) + + // サービス側には接続していないため、パケットは破棄する + buf := make([]byte, HeaderLength+MaxPayloadLength) + for { + select { + case <-ctx.Done(): + return + default: + if _, err := r.Read(buf); err != nil { + errCh <- err + return + } + } + } +} + func readPacketWithHeader(reader io.Reader) io.Reader { r, w := io.Pipe() From eea6b1fffd2f7ad299c34c6ecaa15f1aeb04fd69 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 26 Nov 2024 16:10:16 +0900 Subject: [PATCH 08/22] =?UTF-8?q?context,=20cancel=20=E3=82=92=E5=88=86?= =?UTF-8?q?=E3=81=91=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/handler.go b/handler.go index 822c56b..4bc7559 100644 --- a/handler.go +++ b/handler.go @@ -146,8 +146,8 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Msg("NEW-REQUEST") // リトライ時にこれ以降の処理のみを cancel する - ctx, cancel := context.WithCancel(ctx) - defer cancel() + ctx1, cancel1 := context.WithCancel(ctx) + defer cancel1() reader, err := serviceHandler.Handle(ctx, r) if err != nil { @@ -162,11 +162,11 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte serviceHandler.UpdateRetryCount() // 切断検知のために、クライアントから送られてくるパケットは受信し続ける - ctx, cancelPacketDiscard := context.WithCancel(ctx) + ctx2, cancelPacketDiscard := context.WithCancel(ctx1) defer cancelPacketDiscard() errCh := make(chan error) - go discardPacket(ctx, r, errCh) + go discardPacket(ctx2, r, errCh) // 連続のリトライを避けるために少し待つ retryTimer := time.NewTimer(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) @@ -248,13 +248,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte serviceHandler.UpdateRetryCount() // TODO: 必要な場合は連続のリトライを避けるために少し待つ処理を追加する - zlog.Debug().Err(err). - Str("channel_id", h.SoraChannelID). - Str("connection_id", h.SoraConnectionID). - Int("retry_count", serviceHandler.GetRetryCount()). - Msg("RETRYING") - - cancel() + cancel1() break } else { zlog.Error(). From 8ae84edd133e69795dc3830af8dac941889954c3 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 26 Nov 2024 17:18:31 +0900 Subject: [PATCH 09/22] =?UTF-8?q?=E3=83=A1=E3=82=BD=E3=83=83=E3=83=89?= =?UTF-8?q?=E3=81=AB=E6=B8=A1=E3=81=99=20context=20=E3=82=92=E4=BF=AE?= =?UTF-8?q?=E6=AD=A3=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handler.go b/handler.go index 4bc7559..99eb115 100644 --- a/handler.go +++ b/handler.go @@ -149,7 +149,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte ctx1, cancel1 := context.WithCancel(ctx) defer cancel1() - reader, err := serviceHandler.Handle(ctx, r) + reader, err := serviceHandler.Handle(ctx1, r) if err != nil { zlog.Error(). Err(err). From 853817adb75bf26d0b0f8a0412f8ba5f68bfbb97 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Wed, 27 Nov 2024 11:40:58 +0900 Subject: [PATCH 10/22] =?UTF-8?q?=E3=82=B3=E3=82=B9=E3=83=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/handler.go b/handler.go index 99eb115..3cb3a60 100644 --- a/handler.go +++ b/handler.go @@ -146,10 +146,10 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Msg("NEW-REQUEST") // リトライ時にこれ以降の処理のみを cancel する - ctx1, cancel1 := context.WithCancel(ctx) - defer cancel1() + serviceHandlerCtx, cancelServiceHandler := context.WithCancel(ctx) + defer cancelServiceHandler() - reader, err := serviceHandler.Handle(ctx1, r) + reader, err := serviceHandler.Handle(serviceHandlerCtx, r) if err != nil { zlog.Error(). Err(err). @@ -162,11 +162,11 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte serviceHandler.UpdateRetryCount() // 切断検知のために、クライアントから送られてくるパケットは受信し続ける - ctx2, cancelPacketDiscard := context.WithCancel(ctx1) + packetDiscardCtx, cancelPacketDiscard := context.WithCancel(serviceHandlerCtx) defer cancelPacketDiscard() errCh := make(chan error) - go discardPacket(ctx2, r, errCh) + go discardPacket(packetDiscardCtx, r, errCh) // 連続のリトライを避けるために少し待つ retryTimer := time.NewTimer(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) @@ -248,7 +248,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte serviceHandler.UpdateRetryCount() // TODO: 必要な場合は連続のリトライを避けるために少し待つ処理を追加する - cancel1() + cancelServiceHandler() break } else { zlog.Error(). From fe3b43f4bb10148ce60ea3f4f27c045ed27b0a7c Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Thu, 28 Nov 2024 16:43:54 +0900 Subject: [PATCH 11/22] =?UTF-8?q?-=20Read=20=E6=99=82=E3=81=AE=E5=87=A6?= =?UTF-8?q?=E7=90=86=E3=82=92=20channel=20=E3=81=AB=E7=BD=AE=E3=81=8D?= =?UTF-8?q?=E6=8F=9B=E3=81=88=E3=81=A6=E5=87=A6=E7=90=86=E3=82=92=E4=B8=AD?= =?UTF-8?q?=E6=96=AD=E3=81=A7=E3=81=8D=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB?= =?UTF-8?q?=E3=81=99=E3=82=8B=20-=20opus=20=E3=81=8B=E3=82=89=20ogg=20?= =?UTF-8?q?=E3=81=B8=E3=81=AE=E5=A4=89=E6=8F=9B=E5=87=A6=E7=90=86=E3=82=92?= =?UTF-8?q?=E5=85=B1=E9=80=9A=E5=8C=96=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 22 +---- handler.go | 163 +++++++++++++++++++++++++++-------- speech_to_text_handler.go | 20 +---- 3 files changed, 131 insertions(+), 74 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index 0093266..4244f56 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -95,27 +95,10 @@ func (h *AmazonTranscribeHandler) ResetRetryCount() int { return h.RetryCount } -func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { +func (h *AmazonTranscribeHandler) Handle(ctx context.Context, packetReader io.Reader) (*io.PipeReader, error) { at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount)) - oggReader, oggWriter := io.Pipe() - go func() { - defer oggWriter.Close() - if err := opus2ogg(ctx, reader, oggWriter, h.SampleRate, h.ChannelCount, h.Config); err != nil { - if !errors.Is(err, io.EOF) { - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Send() - } - - oggWriter.CloseWithError(err) - return - } - }() - - stream, err := at.Start(ctx, oggReader) + stream, err := at.Start(ctx, packetReader) if err != nil { return nil, err } @@ -205,7 +188,6 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) w.CloseWithError(err) return } - w.Close() }() diff --git a/handler.go b/handler.go index 3cb3a60..525e1d0 100644 --- a/handler.go +++ b/handler.go @@ -123,9 +123,12 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte if s.config.AudioStreamingHeader { r = readPacketWithHeader(opusReader) } else { + // ヘッダー処理なし r = opusReader } + opusCh := readOpus(ctx, r) + serviceHandler, err := getServiceHandler(serviceType, *s.config, h.SoraChannelID, h.SoraConnectionID, sampleRate, channelCount, languageCode, onResultFunc) if err != nil { zlog.Error(). @@ -149,7 +152,11 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte serviceHandlerCtx, cancelServiceHandler := context.WithCancel(ctx) defer cancelServiceHandler() - reader, err := serviceHandler.Handle(serviceHandlerCtx, r) + oggCh := opus2ogg2(serviceHandlerCtx, opusCh, sampleRate, channelCount, *s.config) + + packetReader := readOgg(serviceHandlerCtx, oggCh) + + reader, err := serviceHandler.Handle(serviceHandlerCtx, packetReader) if err != nil { zlog.Error(). Err(err). @@ -161,25 +168,22 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte if s.config.MaxRetry > serviceHandler.GetRetryCount() { serviceHandler.UpdateRetryCount() - // 切断検知のために、クライアントから送られてくるパケットは受信し続ける - packetDiscardCtx, cancelPacketDiscard := context.WithCancel(serviceHandlerCtx) - defer cancelPacketDiscard() - - errCh := make(chan error) - go discardPacket(packetDiscardCtx, r, errCh) - - // 連続のリトライを避けるために少し待つ retryTimer := time.NewTimer(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) + + retry: select { case <-retryTimer.C: retryTimer.Stop() - cancelPacketDiscard() // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする continue - case err := <-errCh: + case _, ok := <-oggCh: + if ok { + // エラー、または、リトライのタイマーが発火するま繰り返す + goto retry + } retryTimer.Stop() // リトライする前にクライアントとの接続でエラーが発生した場合は終了する - return err + return fmt.Errorf("retry error") } } } @@ -331,24 +335,6 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte } } -func discardPacket(ctx context.Context, r io.Reader, errCh chan error) { - defer close(errCh) - - // サービス側には接続していないため、パケットは破棄する - buf := make([]byte, HeaderLength+MaxPayloadLength) - for { - select { - case <-ctx.Done(): - return - default: - if _, err := r.Read(buf); err != nil { - errCh <- err - return - } - } - } -} - func readPacketWithHeader(reader io.Reader) io.Reader { r, w := io.Pipe() @@ -430,6 +416,117 @@ func readPacketWithHeader(reader io.Reader) io.Reader { return r } +func readOpus(ctx context.Context, reader io.Reader) chan []byte { + opusCh := make(chan []byte) + + go func() { + defer close(opusCh) + + for { + select { + case <-ctx.Done(): + return + default: + buf := make([]byte, FrameSize) + n, err := reader.Read(buf) + if err != nil { + return + } + + if n > 0 { + opusCh <- buf[:n] + } + } + } + }() + + return opusCh +} + +func readOgg(ctx context.Context, oggCh chan []byte) io.Reader { + pr, pw := io.Pipe() + + go func() { + defer pw.Close() + for { + select { + case <-ctx.Done(): + pw.CloseWithError(ctx.Err()) + return + case buf, ok := <-oggCh: + if !ok { + pw.CloseWithError(fmt.Errorf("channel closed")) + return + } + + if _, err := pw.Write(buf); err != nil { + pw.CloseWithError(err) + return + } + } + } + }() + + return pr +} + +func opus2ogg2(ctx context.Context, opusCh chan []byte, sampleRate uint32, channelCount uint16, c Config) chan []byte { + oggReader, oggWriter := io.Pipe() + oggCh := make(chan []byte) + + go func() { + defer close(oggCh) + + for { + buf := make([]byte, FrameSize) + n, err := oggReader.Read(buf) + if err != nil { + oggWriter.CloseWithError(err) + return + } + if n > 0 { + oggCh <- buf[:n] + } + } + }() + + go func() { + o, err := NewWith(oggWriter, sampleRate, channelCount) + if err != nil { + oggWriter.CloseWithError(err) + return + } + defer o.Close() + + for { + select { + case <-ctx.Done(): + oggWriter.CloseWithError(ctx.Err()) + return + case buf, ok := <-opusCh: + if !ok { + oggWriter.CloseWithError(fmt.Errorf("channel closed")) + return + } + + opus := codecs.OpusPacket{} + _, err := opus.Unmarshal(buf) + if err != nil { + oggWriter.CloseWithError(err) + return + } + + if err := o.Write(&opus); err != nil { + oggWriter.CloseWithError(err) + return + } + } + } + }() + + return oggCh +} + func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error { o, err := NewWith(oggWriter, sampleRate, channelCount) if err != nil { @@ -470,12 +567,6 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa return nil } - if !ok { - if w, ok := oggWriter.(*io.PipeWriter); ok { - w.CloseWithError(err) - } - } - opus := codecs.OpusPacket{} _, err := opus.Unmarshal(buf) if err != nil { diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index 0a55b9d..00b3ccb 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -91,26 +91,10 @@ func (h *SpeechToTextHandler) ResetRetryCount() int { return h.RetryCount } -func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { +func (h *SpeechToTextHandler) Handle(ctx context.Context, packetReader io.Reader) (*io.PipeReader, error) { stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount)) - oggReader, oggWriter := io.Pipe() - go func() { - defer oggWriter.Close() - if err := opus2ogg(ctx, reader, oggWriter, h.SampleRate, h.ChannelCount, h.Config); err != nil { - if !errors.Is(err, io.EOF) { - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Send() - } - oggWriter.CloseWithError(err) - return - } - }() - - stream, err := stt.Start(ctx, oggReader) + stream, err := stt.Start(ctx, packetReader) if err != nil { return nil, err } From d321c2d29c7e64e0313add05af4338155bdb467f Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 29 Nov 2024 13:16:42 +0900 Subject: [PATCH 12/22] =?UTF-8?q?opus=20=E5=8F=96=E5=BE=97=E3=81=8B?= =?UTF-8?q?=E3=82=89=20ogg=20=E3=81=B8=E3=81=AE=E5=A4=89=E6=8F=9B=E5=87=A6?= =?UTF-8?q?=E7=90=86=E3=82=92=E5=88=86=E9=9B=A2=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe.go | 6 +- amazon_transcribe_handler.go | 4 +- handler.go | 160 ++++++++++------------------------- packet_dump_handler.go | 4 +- service_handler.go | 2 +- speech_to_text_handler.go | 4 +- test_handler.go | 4 +- 7 files changed, 64 insertions(+), 120 deletions(-) diff --git a/amazon_transcribe.go b/amazon_transcribe.go index ce61cbc..bd8a5f6 100644 --- a/amazon_transcribe.go +++ b/amazon_transcribe.go @@ -9,6 +9,7 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/transcribestreamingservice" + zlog "github.com/rs/zerolog/log" ) type AmazonTranscribe struct { @@ -22,6 +23,7 @@ type AmazonTranscribe struct { Region string Debug bool Config Config + Count int } func NewAmazonTranscribe(config Config, languageCode string, sampleRateHertz, audioChannelCount int64) *AmazonTranscribe { @@ -89,7 +91,7 @@ func NewAmazonTranscribeClient(config Config) *transcribestreamingservice.Transc return transcribestreamingservice.New(sess, cfg) } -func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribestreamingservice.StartStreamTranscriptionEventStream, error) { +func (at *AmazonTranscribe) Start(ctx context.Context, r io.ReadCloser) (*transcribestreamingservice.StartStreamTranscriptionEventStream, error) { config := at.Config client := NewAmazonTranscribeClient(config) input := NewStartStreamTranscriptionInput(at) @@ -117,9 +119,11 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe stream := resp.GetStream() go func() { + defer r.Close() defer stream.Close() if err := transcribestreamingservice.StreamAudioFromReader(ctx, stream, FrameSize, r); err != nil { + zlog.Error().Err(err).Send() return } }() diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index 4244f56..0531ce9 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -95,9 +95,11 @@ func (h *AmazonTranscribeHandler) ResetRetryCount() int { return h.RetryCount } -func (h *AmazonTranscribeHandler) Handle(ctx context.Context, packetReader io.Reader) (*io.PipeReader, error) { +func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount)) + packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config) + stream, err := at.Start(ctx, packetReader) if err != nil { return nil, err diff --git a/handler.go b/handler.go index 525e1d0..9bee125 100644 --- a/handler.go +++ b/handler.go @@ -152,11 +152,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte serviceHandlerCtx, cancelServiceHandler := context.WithCancel(ctx) defer cancelServiceHandler() - oggCh := opus2ogg2(serviceHandlerCtx, opusCh, sampleRate, channelCount, *s.config) - - packetReader := readOgg(serviceHandlerCtx, oggCh) - - reader, err := serviceHandler.Handle(serviceHandlerCtx, packetReader) + reader, err := serviceHandler.Handle(serviceHandlerCtx, opusCh) if err != nil { zlog.Error(). Err(err). @@ -168,22 +164,34 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte if s.config.MaxRetry > serviceHandler.GetRetryCount() { serviceHandler.UpdateRetryCount() + // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする retryTimer := time.NewTimer(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) retry: select { case <-retryTimer.C: retryTimer.Stop() - // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする + zlog.Info(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Msg("retry") + cancelServiceHandler() continue - case _, ok := <-oggCh: + case _, ok := <-opusCh: if ok { - // エラー、または、リトライのタイマーが発火するま繰り返す + // channel が閉じるか、または、リトライのタイマーが発火するまで繰り返す goto retry } retryTimer.Stop() + zlog.Info(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Msg("retry interrupted") + cancelServiceHandler() // リトライする前にクライアントとの接続でエラーが発生した場合は終了する - return fmt.Errorf("retry error") + return fmt.Errorf("%s", "retry interrupted") } } } @@ -443,52 +451,8 @@ func readOpus(ctx context.Context, reader io.Reader) chan []byte { return opusCh } -func readOgg(ctx context.Context, oggCh chan []byte) io.Reader { - pr, pw := io.Pipe() - - go func() { - defer pw.Close() - for { - select { - case <-ctx.Done(): - pw.CloseWithError(ctx.Err()) - return - case buf, ok := <-oggCh: - if !ok { - pw.CloseWithError(fmt.Errorf("channel closed")) - return - } - - if _, err := pw.Write(buf); err != nil { - pw.CloseWithError(err) - return - } - } - } - }() - - return pr -} - -func opus2ogg2(ctx context.Context, opusCh chan []byte, sampleRate uint32, channelCount uint16, c Config) chan []byte { +func opus2ogg(ctx context.Context, opusCh chan []byte, sampleRate uint32, channelCount uint16, c Config) io.ReadCloser { oggReader, oggWriter := io.Pipe() - oggCh := make(chan []byte) - - go func() { - defer close(oggCh) - - for { - buf := make([]byte, FrameSize) - n, err := oggReader.Read(buf) - if err != nil { - oggWriter.CloseWithError(err) - return - } - if n > 0 { - oggCh <- buf[:n] - } - } - }() go func() { o, err := NewWith(oggWriter, sampleRate, channelCount) @@ -524,66 +488,7 @@ func opus2ogg2(ctx context.Context, opusCh chan []byte, sampleRate uint32, chann } }() - return oggCh -} - -func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error { - o, err := NewWith(oggWriter, sampleRate, channelCount) - if err != nil { - if w, ok := oggWriter.(*io.PipeWriter); ok { - w.CloseWithError(err) - } - return err - } - defer o.Close() - - ch := make(chan []byte) - - go func() { - defer close(ch) - - for { - buf := make([]byte, FrameSize) - n, err := opusReader.Read(buf) - if err != nil { - if w, ok := oggWriter.(*io.PipeWriter); ok { - w.CloseWithError(err) - } - return - } - - if n > 0 { - ch <- buf[:n] - } - } - }() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case buf, ok := <-ch: - if !ok { - return nil - } - - opus := codecs.OpusPacket{} - _, err := opus.Unmarshal(buf) - if err != nil { - if w, ok := oggWriter.(*io.PipeWriter); ok { - w.CloseWithError(err) - } - return err - } - - if err := o.Write(&opus); err != nil { - if w, ok := oggWriter.(*io.PipeWriter); ok { - w.CloseWithError(err) - } - return err - } - } - } + return oggReader } type opusRequest struct { @@ -685,3 +590,30 @@ func silentPacket(audioStreamingHeader bool) []byte { return packet } + +func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser { + r, w := io.Pipe() + + go func() { + defer w.Close() + + for { + select { + case <-ctx.Done(): + w.CloseWithError(ctx.Err()) + return + case buf, ok := <-ch: + if !ok { + w.CloseWithError(fmt.Errorf("channel closed")) + return + } + if _, err := w.Write(buf); err != nil { + w.CloseWithError(err) + return + } + } + } + }() + + return r +} diff --git a/packet_dump_handler.go b/packet_dump_handler.go index 1ed7e94..91f7381 100644 --- a/packet_dump_handler.go +++ b/packet_dump_handler.go @@ -67,7 +67,7 @@ func (h *PacketDumpHandler) ResetRetryCount() int { return h.RetryCount } -func (h *PacketDumpHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { +func (h *PacketDumpHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { c := h.Config filename := c.DumpFile channelID := h.ChannelID @@ -75,6 +75,8 @@ func (h *PacketDumpHandler) Handle(ctx context.Context, reader io.Reader) (*io.P r, w := io.Pipe() + reader := channelToIOReadCloser(ctx, opusCh) + go func() { f, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { diff --git a/service_handler.go b/service_handler.go index 0a8b722..c836aa7 100644 --- a/service_handler.go +++ b/service_handler.go @@ -15,7 +15,7 @@ var ( ) type serviceHandlerInterface interface { - Handle(context.Context, io.Reader) (*io.PipeReader, error) + Handle(context.Context, chan []byte) (*io.PipeReader, error) UpdateRetryCount() int GetRetryCount() int ResetRetryCount() int diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index 00b3ccb..e23cb1f 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -91,9 +91,11 @@ func (h *SpeechToTextHandler) ResetRetryCount() int { return h.RetryCount } -func (h *SpeechToTextHandler) Handle(ctx context.Context, packetReader io.Reader) (*io.PipeReader, error) { +func (h *SpeechToTextHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount)) + packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config) + stream, err := stt.Start(ctx, packetReader) if err != nil { return nil, err diff --git a/test_handler.go b/test_handler.go index e264439..9fe1728 100644 --- a/test_handler.go +++ b/test_handler.go @@ -73,9 +73,11 @@ func (h *TestHandler) ResetRetryCount() int { return h.RetryCount } -func (h *TestHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { +func (h *TestHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { r, w := io.Pipe() + reader := channelToIOReadCloser(ctx, opusCh) + go func() { encoder := json.NewEncoder(w) From a834f6179c552696c8d5b2a2da6c345dd5404cf1 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 29 Nov 2024 14:51:02 +0900 Subject: [PATCH 13/22] =?UTF-8?q?=E3=81=99=E3=81=90=E3=81=AB=20EOF=20?= =?UTF-8?q?=E3=81=AB=E3=81=AA=E3=82=89=E3=81=AA=E3=81=84=E3=82=88=E3=81=86?= =?UTF-8?q?=E3=81=AB=E3=83=95=E3=82=A1=E3=82=A4=E3=83=AB=E8=AA=AD=E3=81=BF?= =?UTF-8?q?=E8=BE=BC=E3=81=BF=E5=AE=8C=E4=BA=86=E5=BE=8C=E3=81=AB=E5=BE=85?= =?UTF-8?q?=E3=81=9F=E3=81=9B=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test_handler_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test_handler_test.go b/test_handler_test.go index 506a6c4..8986a7f 100644 --- a/test_handler_test.go +++ b/test_handler_test.go @@ -63,6 +63,9 @@ func readDumpFile(t *testing.T, filename string, d time.Duration) *io.PipeReader t.Error(err.Error()) return } + + // 閉じてしまうとすぐに EOF で切断状態になるため少し待つ + time.Sleep(2 * time.Second) }() return r From e2500932d29c71e215487eecb9ff6f40a2133361 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 29 Nov 2024 15:42:10 +0900 Subject: [PATCH 14/22] =?UTF-8?q?channel=20=E3=81=A7=E3=82=A8=E3=83=A9?= =?UTF-8?q?=E3=83=BC=E3=82=92=E4=BC=9D=E6=90=AC=E3=81=95=E3=81=9B=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 2 +- handler.go | 52 +++++++++++++++++++++++++++--------- packet_dump_handler.go | 2 +- service_handler.go | 2 +- speech_to_text_handler.go | 2 +- test_handler.go | 2 +- 6 files changed, 44 insertions(+), 18 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index 0531ce9..3080874 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -95,7 +95,7 @@ func (h *AmazonTranscribeHandler) ResetRetryCount() int { return h.RetryCount } -func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { +func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) { at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount)) packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config) diff --git a/handler.go b/handler.go index 9bee125..7f4f018 100644 --- a/handler.go +++ b/handler.go @@ -424,8 +424,8 @@ func readPacketWithHeader(reader io.Reader) io.Reader { return r } -func readOpus(ctx context.Context, reader io.Reader) chan []byte { - opusCh := make(chan []byte) +func readOpus(ctx context.Context, reader io.Reader) chan opusChannel { + opusCh := make(chan opusChannel) go func() { defer close(opusCh) @@ -433,16 +433,25 @@ func readOpus(ctx context.Context, reader io.Reader) chan []byte { for { select { case <-ctx.Done(): + opusCh <- opusChannel{ + Error: ctx.Err(), + } return default: buf := make([]byte, FrameSize) n, err := reader.Read(buf) if err != nil { + opusCh <- opusChannel{ + Error: err, + } return } if n > 0 { - opusCh <- buf[:n] + opusCh <- opusChannel{ + Payload: buf[:n], + } + } } } @@ -451,7 +460,7 @@ func readOpus(ctx context.Context, reader io.Reader) chan []byte { return opusCh } -func opus2ogg(ctx context.Context, opusCh chan []byte, sampleRate uint32, channelCount uint16, c Config) io.ReadCloser { +func opus2ogg(ctx context.Context, opusCh chan opusChannel, sampleRate uint32, channelCount uint16, c Config) io.ReadCloser { oggReader, oggWriter := io.Pipe() go func() { @@ -467,20 +476,25 @@ func opus2ogg(ctx context.Context, opusCh chan []byte, sampleRate uint32, channe case <-ctx.Done(): oggWriter.CloseWithError(ctx.Err()) return - case buf, ok := <-opusCh: + case opus, ok := <-opusCh: if !ok { - oggWriter.CloseWithError(fmt.Errorf("channel closed")) + oggWriter.CloseWithError(fmt.Errorf("channel closed1")) return } - opus := codecs.OpusPacket{} - _, err := opus.Unmarshal(buf) + if err := opus.Error; err != nil { + oggWriter.CloseWithError(err) + return + } + + opusPacket := codecs.OpusPacket{} + _, err := opusPacket.Unmarshal(opus.Payload) if err != nil { oggWriter.CloseWithError(err) return } - if err := o.Write(&opus); err != nil { + if err := o.Write(&opusPacket); err != nil { oggWriter.CloseWithError(err) return } @@ -591,7 +605,13 @@ func silentPacket(audioStreamingHeader bool) []byte { return packet } -func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser { +type opusChannel struct { + Payload []byte + Error error +} + +// func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser { +func channelToIOReadCloser(ctx context.Context, ch chan opusChannel) io.ReadCloser { r, w := io.Pipe() go func() { @@ -602,12 +622,18 @@ func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser { case <-ctx.Done(): w.CloseWithError(ctx.Err()) return - case buf, ok := <-ch: + case opus, ok := <-ch: if !ok { - w.CloseWithError(fmt.Errorf("channel closed")) + w.CloseWithError(io.EOF) + return + } + + if err := opus.Error; err != nil { + w.CloseWithError(err) return } - if _, err := w.Write(buf); err != nil { + + if _, err := w.Write(opus.Payload); err != nil { w.CloseWithError(err) return } diff --git a/packet_dump_handler.go b/packet_dump_handler.go index 91f7381..1af4cb4 100644 --- a/packet_dump_handler.go +++ b/packet_dump_handler.go @@ -67,7 +67,7 @@ func (h *PacketDumpHandler) ResetRetryCount() int { return h.RetryCount } -func (h *PacketDumpHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { +func (h *PacketDumpHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) { c := h.Config filename := c.DumpFile channelID := h.ChannelID diff --git a/service_handler.go b/service_handler.go index c836aa7..05a3cf2 100644 --- a/service_handler.go +++ b/service_handler.go @@ -15,7 +15,7 @@ var ( ) type serviceHandlerInterface interface { - Handle(context.Context, chan []byte) (*io.PipeReader, error) + Handle(context.Context, chan opusChannel) (*io.PipeReader, error) UpdateRetryCount() int GetRetryCount() int ResetRetryCount() int diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index e23cb1f..7d0649f 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -91,7 +91,7 @@ func (h *SpeechToTextHandler) ResetRetryCount() int { return h.RetryCount } -func (h *SpeechToTextHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { +func (h *SpeechToTextHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) { stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount)) packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config) diff --git a/test_handler.go b/test_handler.go index 9fe1728..e22f3af 100644 --- a/test_handler.go +++ b/test_handler.go @@ -73,7 +73,7 @@ func (h *TestHandler) ResetRetryCount() int { return h.RetryCount } -func (h *TestHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { +func (h *TestHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) { r, w := io.Pipe() reader := channelToIOReadCloser(ctx, opusCh) From c6c26fbd7dd6f9fe876c367b61ffbaa680cdd766 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 29 Nov 2024 15:45:12 +0900 Subject: [PATCH 15/22] =?UTF-8?q?=E3=83=86=E3=82=B9=E3=83=88=E3=81=AE=20si?= =?UTF-8?q?lent=20packet=20=E9=80=81=E4=BF=A1=E3=81=BE=E3=81=A7=E3=81=AE?= =?UTF-8?q?=E6=99=82=E9=96=93=E3=82=92=E4=BC=B8=E3=81=B0=E3=81=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/handler_test.go b/handler_test.go index 39e1913..ce3d365 100644 --- a/handler_test.go +++ b/handler_test.go @@ -34,7 +34,7 @@ func TestOpusPacketReader(t *testing.T) { } t.Run("success", func(t *testing.T) { - d := time.Duration(100) * time.Millisecond + d := time.Duration(3000) * time.Millisecond r := readDumpFile(t, "testdata/000.jsonl", 0) defer r.Close() @@ -47,12 +47,12 @@ func TestOpusPacketReader(t *testing.T) { assert.ErrorIs(t, err, io.EOF) break } - assert.Equal(t, buf[:n], []byte{0, 0, 0}) + assert.Equal(t, []byte{0, 0, 0}, buf[:n]) } }) t.Run("read error", func(t *testing.T) { - d := time.Duration(100) * time.Millisecond + d := time.Duration(3000) * time.Millisecond errPacketRead := errors.New("packet read error") r := NewErrReadCloser(errPacketRead) @@ -66,12 +66,12 @@ func TestOpusPacketReader(t *testing.T) { assert.ErrorIs(t, err, errPacketRead) break } - assert.Equal(t, buf[:n], []byte{255, 255, 254}) + assert.Equal(t, []byte{255, 255, 254}, buf[:n]) } }) t.Run("closed reader", func(t *testing.T) { - d := time.Duration(100) * time.Millisecond + d := time.Duration(3000) * time.Millisecond r := readDumpFile(t, "testdata/dump.jsonl", 0) r.Close() @@ -81,17 +81,17 @@ func TestOpusPacketReader(t *testing.T) { buf := make([]byte, FrameSize) _, err := reader.Read(buf) if err != nil { - assert.ErrorIs(t, err, io.ErrClosedPipe) + assert.ErrorIs(t, io.ErrClosedPipe, err) break } } }) t.Run("close reader", func(t *testing.T) { - d := time.Duration(100) * time.Millisecond + d := time.Duration(3000) * time.Millisecond r := readDumpFile(t, "testdata/dump.jsonl", 0) go func() { - time.Sleep(100 * time.Millisecond) + time.Sleep(3000 * time.Millisecond) r.Close() }() @@ -101,7 +101,7 @@ func TestOpusPacketReader(t *testing.T) { buf := make([]byte, FrameSize) _, err := reader.Read(buf) if err != nil { - assert.ErrorIs(t, err, io.EOF) + assert.ErrorIs(t, io.EOF, err) break } } From 491e173e07d2bb62955aebfa1896106143802831 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 29 Nov 2024 16:57:01 +0900 Subject: [PATCH 16/22] =?UTF-8?q?=E3=83=98=E3=83=83=E3=83=80=E3=82=92?= =?UTF-8?q?=E3=81=99=E3=81=90=E3=81=AB=E8=BF=94=E3=81=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/handler.go b/handler.go index 7f4f018..5e97464 100644 --- a/handler.go +++ b/handler.go @@ -105,6 +105,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte c.Response().Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON) // すぐにヘッダを送信したい場合はここで c.Response().Flush() を実行する + c.Response().Flush() ctx := c.Request().Context() // TODO: context.WithCancelCause(ctx) に変更する @@ -170,7 +171,6 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte retry: select { case <-retryTimer.C: - retryTimer.Stop() zlog.Info(). Err(err). Str("channel_id", h.SoraChannelID). @@ -189,7 +189,6 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). Msg("retry interrupted") - cancelServiceHandler() // リトライする前にクライアントとの接続でエラーが発生した場合は終了する return fmt.Errorf("%s", "retry interrupted") } @@ -478,7 +477,7 @@ func opus2ogg(ctx context.Context, opusCh chan opusChannel, sampleRate uint32, c return case opus, ok := <-opusCh: if !ok { - oggWriter.CloseWithError(fmt.Errorf("channel closed1")) + oggWriter.CloseWithError(io.EOF) return } From 42bf69a18567b5e6f7ab50ea2c9267460475a0f3 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 29 Nov 2024 17:00:57 +0900 Subject: [PATCH 17/22] =?UTF-8?q?=E4=B8=8D=E8=A6=81=E3=81=AA=E3=81=9F?= =?UTF-8?q?=E3=82=81=E5=89=8A=E9=99=A4=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe.go | 1 - 1 file changed, 1 deletion(-) diff --git a/amazon_transcribe.go b/amazon_transcribe.go index bd8a5f6..fa8c67d 100644 --- a/amazon_transcribe.go +++ b/amazon_transcribe.go @@ -23,7 +23,6 @@ type AmazonTranscribe struct { Region string Debug bool Config Config - Count int } func NewAmazonTranscribe(config Config, languageCode string, sampleRateHertz, audioChannelCount int64) *AmazonTranscribe { From 9e08fa108c2f56b72935262cf1f1deac065ece30 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 29 Nov 2024 17:12:30 +0900 Subject: [PATCH 18/22] =?UTF-8?q?=E5=A4=89=E6=9B=B4=E5=B1=A5=E6=AD=B4?= =?UTF-8?q?=E3=82=92=E6=9B=B4=E6=96=B0=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGES.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 86082e3..14f08bc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,13 @@ ## develop +- [FIX] サービスへの接続が成功してもリトライカウントがリセットされない不具合を修正する + - @Hexa +- [FIX] 解析結果だけでなくエラーメッセージの送信時にもリトライカウントをリセットしていたため、リトライ処理によってカウントがリセットされていた不具合を修正する + - @Hexa +- [FIX] リトライ待ち時にクライアントから切断しようとすると、リトライ待ちで処理がブロックされているため切断までに時間がかかる不具合を修正する + - @Hexa + ### misc ## 2024.6.0 From cb10a6ced02e6cf40bec1cfbf05ab1b782498b98 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 2 Dec 2024 11:40:26 +0900 Subject: [PATCH 19/22] =?UTF-8?q?=E3=82=B3=E3=82=B9=E3=83=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 3 +-- packet_dump_handler.go | 2 +- test_handler.go | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/handler.go b/handler.go index 5e97464..490aef0 100644 --- a/handler.go +++ b/handler.go @@ -609,8 +609,7 @@ type opusChannel struct { Error error } -// func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser { -func channelToIOReadCloser(ctx context.Context, ch chan opusChannel) io.ReadCloser { +func opusChannelToIOReadCloser(ctx context.Context, ch chan opusChannel) io.ReadCloser { r, w := io.Pipe() go func() { diff --git a/packet_dump_handler.go b/packet_dump_handler.go index 1af4cb4..e35e855 100644 --- a/packet_dump_handler.go +++ b/packet_dump_handler.go @@ -75,7 +75,7 @@ func (h *PacketDumpHandler) Handle(ctx context.Context, opusCh chan opusChannel) r, w := io.Pipe() - reader := channelToIOReadCloser(ctx, opusCh) + reader := opusChannelToIOReadCloser(ctx, opusCh) go func() { f, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) diff --git a/test_handler.go b/test_handler.go index e22f3af..005ea90 100644 --- a/test_handler.go +++ b/test_handler.go @@ -76,7 +76,7 @@ func (h *TestHandler) ResetRetryCount() int { func (h *TestHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) { r, w := io.Pipe() - reader := channelToIOReadCloser(ctx, opusCh) + reader := opusChannelToIOReadCloser(ctx, opusCh) go func() { encoder := json.NewEncoder(w) From 5e553b629fe176b23d1d3066e8d17ed61300732d Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 2 Dec 2024 12:46:42 +0900 Subject: [PATCH 20/22] =?UTF-8?q?sleep=20=E3=82=92=E5=89=8A=E9=99=A4?= =?UTF-8?q?=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test_handler_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/test_handler_test.go b/test_handler_test.go index 8986a7f..506a6c4 100644 --- a/test_handler_test.go +++ b/test_handler_test.go @@ -63,9 +63,6 @@ func readDumpFile(t *testing.T, filename string, d time.Duration) *io.PipeReader t.Error(err.Error()) return } - - // 閉じてしまうとすぐに EOF で切断状態になるため少し待つ - time.Sleep(2 * time.Second) }() return r From 206e9e2b9780184d3bfd8d455039c9aa96c7b41d Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 2 Dec 2024 16:51:21 +0900 Subject: [PATCH 21/22] =?UTF-8?q?=E3=82=B3=E3=83=A1=E3=83=B3=E3=83=88?= =?UTF-8?q?=E3=82=92=E4=BF=AE=E6=AD=A3=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handler.go b/handler.go index 490aef0..9f571f1 100644 --- a/handler.go +++ b/handler.go @@ -104,7 +104,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Msg("CONNECTED") c.Response().Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON) - // すぐにヘッダを送信したい場合はここで c.Response().Flush() を実行する + // すぐにヘッダを送信したいので c.Response().Flush() を実行する c.Response().Flush() ctx := c.Request().Context() From d49c28f4071701fcf77907a5c4b0d2ade7e28c45 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 2 Dec 2024 16:57:33 +0900 Subject: [PATCH 22/22] =?UTF-8?q?=E3=83=AD=E3=82=B0=E3=83=AC=E3=83=99?= =?UTF-8?q?=E3=83=AB=E3=82=92=E5=A4=89=E6=9B=B4=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/handler.go b/handler.go index 9f571f1..9586524 100644 --- a/handler.go +++ b/handler.go @@ -171,7 +171,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte retry: select { case <-retryTimer.C: - zlog.Info(). + zlog.Debug(). Err(err). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). @@ -184,7 +184,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte goto retry } retryTimer.Stop() - zlog.Info(). + zlog.Debug(). Err(err). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID).