diff --git a/CHANGES.md b/CHANGES.md index a75760a..8a7175e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,22 @@ ## develop +- [CHANGE] retry 設定を削除し、リトライ回数を指定する max_retry 設定を追加する + - リトライしない場合は、max_retry を設定ファイルから削除するか、または、max_retry = 0 を設定する + - デフォルト値: 0 (リトライ無し) + - @Hexa +- [ADD] サービス接続時のエラーによるリトライまでの時間間隔を指定する retry_interval_ms 設定(ミリ秒間隔)を追加する + - デフォルト値: 100 (100 ms) + - @Hexa +- [ADD] サービス接続時の特定のエラー発生時に、リトライする仕組みを追加する + - @Hexa +- [ADD] ハンドラーにリトライ回数を管理するメソッドを追加する + - @Hexa +- [CHANGE] aws への接続時に、時間をおいて再接続できる可能性がある HTTP ステータスコードが 429 の応答の場合は、指定されたリトライ設定に応じて、再接続を試みるように変更する + - @Hexa +- [CHANGE] aws、または、gcp への接続後にリトライ回数が max_retry を超えた場合は、{"type": "error", "reason": string} をクライアントへ送信する + - @Hexa + ## 2024.1.0 diff --git a/amazon_transcribe.go b/amazon_transcribe.go index 0b0e4da..ce61cbc 100644 --- a/amazon_transcribe.go +++ b/amazon_transcribe.go @@ -99,9 +99,16 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe if reqErr, ok := err.(awserr.RequestFailure); ok { code := reqErr.StatusCode() message := reqErr.Message() + + var retry bool + if code == http.StatusTooManyRequests { + retry = true + } + return nil, &SuzuError{ Code: code, Message: message, + Retry: retry, } } return nil, err diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index 32214b2..dd3717d 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "io" + "sync" "github.com/aws/aws-sdk-go/service/transcribestreamingservice" zlog "github.com/rs/zerolog/log" @@ -22,6 +23,8 @@ type AmazonTranscribeHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -34,6 +37,7 @@ func NewAmazonTranscribeHandler(config Config, channelID, connectionID string, s SampleRate: sampleRate, ChannelCount: channelCount, LanguageCode: languageCode, + RetryCount: 0, OnResultFunc: onResultFunc.(func(context.Context, io.WriteCloser, string, string, string, any) error), } } @@ -67,6 +71,24 @@ func (ar *AwsResult) SetMessage(message string) *AwsResult { return ar } +func (h *AmazonTranscribeHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *AmazonTranscribeHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *AmazonTranscribeHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount)) @@ -153,17 +175,18 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) } if err := stream.Err(); err != nil { + zlog.Error(). + Err(err). + Str("channel_id", h.ChannelID). + Str("connection_id", h.ConnectionID). + Int("retry_count", h.GetRetryCount()). + Send() + // 復帰が不可能なエラー以外は再接続を試みる switch err.(type) { case *transcribestreamingservice.LimitExceededException: - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Send() - - // リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if !*at.Config.Retry { + // リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する + if (at.Config.MaxRetry < 1) || (at.Config.MaxRetry <= h.GetRetryCount()) { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { zlog.Error(). Err(err). diff --git a/config.go b/config.go index 4fee353..45e47e8 100644 --- a/config.go +++ b/config.go @@ -27,6 +27,9 @@ const ( // 100ms DefaultTimeToWaitForOpusPacketMs = 100 + + // リトライ間隔 100ms + DefaultRetryIntervalMs = 100 ) type Config struct { @@ -46,7 +49,8 @@ type Config struct { HTTP2MaxReadFrameSize uint32 `ini:"http2_max_read_frame_size"` HTTP2IdleTimeout uint32 `ini:"http2_idle_timeout"` - Retry *bool `ini:"retry"` + MaxRetry int `ini:"max_retry"` + RetryIntervalMs int `ini:"retry_interval_ms"` ExporterHTTPS bool `ini:"exporter_https"` ExporterListenAddr string `ini:"exporter_listen_addr"` @@ -160,12 +164,11 @@ func setDefaultsConfig(config *Config) { config.TimeToWaitForOpusPacketMs = DefaultTimeToWaitForOpusPacketMs } - // 未指定の場合は true - if config.Retry == nil { - defaultRetry := true - config.Retry = &defaultRetry + if config.RetryIntervalMs == 0 { + config.RetryIntervalMs = DefaultRetryIntervalMs } } + func validateConfig(config *Config) error { var err error // アドレスとして正しいことを確認する @@ -213,4 +216,6 @@ func ShowConfig(config *Config) { zlog.Info().Str("exporter_listen_addr", config.ExporterListenAddr).Msg("CONF") zlog.Info().Int("exporter_listen_port", config.ExporterListenPort).Msg("CONF") + zlog.Info().Int("max_retry", config.MaxRetry).Msg("CONF") + zlog.Info().Int("retry_interval_ms", config.RetryIntervalMs).Msg("CONF") } diff --git a/config_example.ini b/config_example.ini index be8bf5b..90ee00d 100644 --- a/config_example.ini +++ b/config_example.ini @@ -44,8 +44,10 @@ audio_channel_count = 1 # 受信した音声データの保存先ファイルです dump_file = ./dump.jsonl -# サーバからの切断時に再接続を試みます -retry = true +# サーバからの切断時またはハンドラー個別で指定した条件でのリトライ回数を指定します +max_retry = 0 +# リトライ間隔(ミリ秒)です +retry_interval_ms = 100 # aws の場合は IsPartial が false, gcp の場合は IsFinal が true の場合の最終的な結果のみを返す指定 final_result_only = true diff --git a/errors.go b/errors.go index 34a3551..ce7ecd8 100644 --- a/errors.go +++ b/errors.go @@ -3,8 +3,13 @@ package suzu type SuzuError struct { Code int Message string + Retry bool } func (e *SuzuError) Error() string { return e.Message } + +func (e *SuzuError) IsRetry() bool { + return e.Retry +} diff --git a/handler.go b/handler.go index c348a42..cec4245 100644 --- a/handler.go +++ b/handler.go @@ -125,14 +125,13 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte return echo.NewHTTPError(http.StatusInternalServerError) } - retryCount := 0 - // サーバへの接続・結果の送信処理 // サーバへの再接続が期待できる限りは、再接続を試みる for { zlog.Info(). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). + Int("retry_count", serviceHandler.GetRetryCount()). Msg("NEW-REQUEST") reader, err := serviceHandler.Handle(ctx, r) @@ -143,10 +142,20 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Str("connection_id", h.SoraConnectionID). Send() if err, ok := err.(*SuzuError); ok { + if err.IsRetry() { + if s.config.MaxRetry > serviceHandler.GetRetryCount() { + serviceHandler.UpdateRetryCount() + + // 連続のリトライを避けるために少し待つ + time.Sleep(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) + + // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする + continue + } + } // SuzuError の場合はその Status Code を返す return c.NoContent(err.Code) } - // SuzuError 以外の場合は 500 を返す return echo.NewHTTPError(http.StatusInternalServerError, err) } @@ -168,37 +177,42 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Send() return err } else if errors.Is(err, ErrServerDisconnected) { - if *s.config.Retry { - // サーバから切断されたが再度接続できる可能性があるため、接続を試みる - retryCount += 1 - - zlog.Debug(). + if s.config.MaxRetry < 1 { + // サーバから切断されたが再接続させない設定の場合 + zlog.Error(). Err(err). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). - Int("retry_count", retryCount). Send() + return err + } + + if s.config.MaxRetry > serviceHandler.GetRetryCount() { + // サーバから切断されたが再度接続できる可能性があるため、接続を試みる + + serviceHandler.UpdateRetryCount() + + // TODO: 必要な場合は連続のリトライを避けるために少し待つ処理を追加する + break } else { - // サーバから切断されたが再接続させない設定の場合 zlog.Error(). Err(err). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). Send() - return err + // max_retry を超えた場合は終了 + return c.NoContent(http.StatusOK) } } - zlog.Error(). - Err(err). - Str("channel_id", h.SoraChannelID). - Str("connection_id", h.SoraConnectionID). - Send() // サーバから切断されたが再度の接続が期待できない場合 return err } + // 1 度でも接続結果を受け取れた場合はリトライ回数をリセットする + serviceHandler.ResetRetryCount() + // メッセージが空でない場合はクライアントに結果を送信する if n > 0 { if _, err := c.Response().Write(buf[:n]); err != nil { diff --git a/packet_dump_handler.go b/packet_dump_handler.go index 3d8b1a7..1ed7e94 100644 --- a/packet_dump_handler.go +++ b/packet_dump_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "io" "os" + "sync" "time" ) @@ -20,6 +21,8 @@ type PacketDumpHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -46,6 +49,24 @@ type PacketDumpResult struct { Payload []byte `json:"payload"` } +func (h *PacketDumpHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *PacketDumpHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *PacketDumpHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *PacketDumpHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { c := h.Config filename := c.DumpFile diff --git a/service_handler.go b/service_handler.go index 0064f9f..0a8b722 100644 --- a/service_handler.go +++ b/service_handler.go @@ -16,6 +16,9 @@ var ( type serviceHandlerInterface interface { Handle(context.Context, io.Reader) (*io.PipeReader, error) + UpdateRetryCount() int + GetRetryCount() int + ResetRetryCount() int } type newServiceHandlerFunc func(Config, string, string, uint32, uint16, string, any) serviceHandlerInterface diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index e49dea0..5c85fd6 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "strings" + "sync" zlog "github.com/rs/zerolog/log" @@ -25,6 +26,8 @@ type SpeechToTextHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -70,6 +73,24 @@ func (gr *GcpResult) SetMessage(message string) *GcpResult { return gr } +func (h *SpeechToTextHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *SpeechToTextHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *SpeechToTextHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount)) @@ -141,8 +162,8 @@ func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io Int32("code", status.GetCode()). Send() - // リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if !*stt.Config.Retry { + // リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する + if (stt.Config.MaxRetry < 1) || (stt.Config.MaxRetry <= h.GetRetryCount()) { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { zlog.Error(). Err(err). diff --git a/test_handler.go b/test_handler.go index a8c3912..e264439 100644 --- a/test_handler.go +++ b/test_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "sync" zlog "github.com/rs/zerolog/log" ) @@ -21,6 +22,8 @@ type TestHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -52,6 +55,24 @@ func NewTestResult(channelID, message string) TestResult { } } +func (h *TestHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *TestHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *TestHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *TestHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { r, w := io.Pipe() diff --git a/test_handler_test.go b/test_handler_test.go index 57541ff..506a6c4 100644 --- a/test_handler_test.go +++ b/test_handler_test.go @@ -243,17 +243,6 @@ func TestSpeechHandler(t *testing.T) { }) t.Run("packet read error", func(t *testing.T) { - logger := log.Logger - defer func() { - log.Logger = logger - }() - - pr, pw, err := os.Pipe() - if err != nil { - t.Fatal(err) - } - log.Logger = zerolog.New(pw).With().Caller().Timestamp().Logger() - r := iotest.ErrReader(errors.New("packet read error")) e := echo.New() @@ -271,14 +260,11 @@ func TestSpeechHandler(t *testing.T) { assert.Equal(t, "packet read error", err.Error()) } - pw.Close() - - var buf bytes.Buffer - n, err := buf.ReadFrom(pr) + line, err := rec.Body.ReadBytes([]byte("\n")[0]) if err != nil { t.Fatal(err) } - assert.Contains(t, buf.String()[:n], "packet read error") + assert.Contains(t, string(line), "packet read error") }) t.Run("silent packet", func(t *testing.T) {