Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

リトライ処理の改善 #172

Merged
merged 15 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@

## develop

- [CHANGE] retry 設定を削除し、リトライ回数を指定する max_retry 設定を追加する
- リトライしない場合は、max_retry を設定ファイルから削除するか、または、max_retry = 0 を設定する
voluntas marked this conversation as resolved.
Show resolved Hide resolved
- @Hexa
- [ADD] サービス接続時のエラーによるリトライまでの時間間隔を指定する retry_interval_ms 設定(ミリ秒間隔)を追加する
voluntas marked this conversation as resolved.
Show resolved Hide resolved
- @Hexa
- [ADD] サービス接続時の特定のエラー発生時に、リトライする仕組みを追加する
- @Hexa
- [ADD] ハンドラーにリトライ回数を管理するメソッドを追加する
- @Hexa
- [CHANGE] aws への接続時に HTTP ステータスコードが 429 の応答の場合は、指定されたリトライ設定に応じて、再接続を試みるように変更する
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

少し解説追加してほしい、なぜ 429 なのかを簡単にでいいので

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fa9448e

こちらで追記しました

- @Hexa
- [CHANGE] aws、または、gcp への接続後にリトライ回数が max_retry を超えた場合は、{"type": "error", "reason": string} をクライアントへ送信する
- @Hexa


## 2024.1.0

Expand Down
7 changes: 7 additions & 0 deletions amazon_transcribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,16 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe
if reqErr, ok := err.(awserr.RequestFailure); ok {
code := reqErr.StatusCode()
message := reqErr.Message()

var retry bool
if code == http.StatusTooManyRequests {
retry = true
}

return nil, &SuzuError{
Code: code,
Message: message,
Retry: retry,
}
}
return nil, err
Expand Down
39 changes: 31 additions & 8 deletions amazon_transcribe_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"io"
"sync"

"github.com/aws/aws-sdk-go/service/transcribestreamingservice"
zlog "github.com/rs/zerolog/log"
Expand All @@ -22,6 +23,8 @@ type AmazonTranscribeHandler struct {
SampleRate uint32
ChannelCount uint16
LanguageCode string
RetryCount int
mu sync.Mutex

OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error
}
Expand All @@ -34,6 +37,7 @@ func NewAmazonTranscribeHandler(config Config, channelID, connectionID string, s
SampleRate: sampleRate,
ChannelCount: channelCount,
LanguageCode: languageCode,
RetryCount: 0,
OnResultFunc: onResultFunc.(func(context.Context, io.WriteCloser, string, string, string, any) error),
}
}
Expand Down Expand Up @@ -67,6 +71,24 @@ func (ar *AwsResult) SetMessage(message string) *AwsResult {
return ar
}

func (h *AmazonTranscribeHandler) UpdateRetryCount() int {
defer h.mu.Unlock()
h.mu.Lock()
h.RetryCount++
return h.RetryCount
}

func (h *AmazonTranscribeHandler) GetRetryCount() int {
return h.RetryCount
}

func (h *AmazonTranscribeHandler) ResetRetryCount() int {
defer h.mu.Unlock()
h.mu.Lock()
h.RetryCount = 0
return h.RetryCount
}

func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) {
at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount))

Expand Down Expand Up @@ -153,17 +175,18 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader)
}

if err := stream.Err(); err != nil {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Int("retry_count", h.GetRetryCount()).
Send()

// 復帰が不可能なエラー以外は再接続を試みる
switch err.(type) {
case *transcribestreamingservice.LimitExceededException:
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()

// リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if !*at.Config.Retry {
// リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if (at.Config.MaxRetry < 1) || (at.Config.MaxRetry <= h.GetRetryCount()) {
if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil {
zlog.Error().
Err(err).
Expand Down
15 changes: 10 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ const (

// 100ms
DefaultTimeToWaitForOpusPacketMs = 100

// リトライ間隔 100ms
DefaultRetryIntervalMs = 100
)

type Config struct {
Expand All @@ -46,7 +49,8 @@ type Config struct {
HTTP2MaxReadFrameSize uint32 `ini:"http2_max_read_frame_size"`
HTTP2IdleTimeout uint32 `ini:"http2_idle_timeout"`

Retry *bool `ini:"retry"`
MaxRetry int `ini:"max_retry"`
RetryIntervalMs int `ini:"retry_interval_ms"`

ExporterHTTPS bool `ini:"exporter_https"`
ExporterListenAddr string `ini:"exporter_listen_addr"`
Expand Down Expand Up @@ -160,12 +164,11 @@ func setDefaultsConfig(config *Config) {
config.TimeToWaitForOpusPacketMs = DefaultTimeToWaitForOpusPacketMs
}

// 未指定の場合は true
if config.Retry == nil {
defaultRetry := true
config.Retry = &defaultRetry
if config.RetryIntervalMs == 0 {
config.RetryIntervalMs = DefaultRetryIntervalMs
}
}

func validateConfig(config *Config) error {
var err error
// アドレスとして正しいことを確認する
Expand Down Expand Up @@ -213,4 +216,6 @@ func ShowConfig(config *Config) {
zlog.Info().Str("exporter_listen_addr", config.ExporterListenAddr).Msg("CONF")
zlog.Info().Int("exporter_listen_port", config.ExporterListenPort).Msg("CONF")

zlog.Info().Int("max_retry", config.MaxRetry).Msg("CONF")
zlog.Info().Int("retry_interval_ms", config.RetryIntervalMs).Msg("CONF")
}
6 changes: 4 additions & 2 deletions config_example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ audio_channel_count = 1
# 受信した音声データの保存先ファイルです
dump_file = ./dump.jsonl

# サーバからの切断時に再接続を試みます
retry = true
# サーバからの切断時またはハンドラー個別で指定した条件でのリトライ回数を指定します
max_retry = 0
# リトライ間隔(ミリ秒)です
retry_interval_ms = 100

# aws の場合は IsPartial が false, gcp の場合は IsFinal が true の場合の最終的な結果のみを返す指定
final_result_only = true
Expand Down
5 changes: 5 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ package suzu
type SuzuError struct {
Code int
Message string
Retry bool
}

func (e *SuzuError) Error() string {
return e.Message
}

func (e *SuzuError) IsRetry() bool {
return e.Retry
}
46 changes: 30 additions & 16 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,13 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
return echo.NewHTTPError(http.StatusInternalServerError)
}

retryCount := 0

// サーバへの接続・結果の送信処理
// サーバへの再接続が期待できる限りは、再接続を試みる
for {
zlog.Info().
Str("channel_id", h.SoraChannelID).
Str("connection_id", h.SoraConnectionID).
Int("retry_count", serviceHandler.GetRetryCount()).
Msg("NEW-REQUEST")

reader, err := serviceHandler.Handle(ctx, r)
Expand All @@ -143,10 +142,20 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
Str("connection_id", h.SoraConnectionID).
Send()
if err, ok := err.(*SuzuError); ok {
if err.IsRetry() {
if s.config.MaxRetry > serviceHandler.GetRetryCount() {
serviceHandler.UpdateRetryCount()

// 連続のリトライを避けるために少し待つ
time.Sleep(time.Duration(s.config.RetryIntervalMs) * time.Millisecond)

// リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする
continue
}
}
// SuzuError の場合はその Status Code を返す
return c.NoContent(err.Code)
}

// SuzuError 以外の場合は 500 を返す
return echo.NewHTTPError(http.StatusInternalServerError, err)
}
Expand All @@ -168,37 +177,42 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
Send()
return err
} else if errors.Is(err, ErrServerDisconnected) {
if *s.config.Retry {
// サーバから切断されたが再度接続できる可能性があるため、接続を試みる
retryCount += 1

zlog.Debug().
if s.config.MaxRetry < 1 {
// サーバから切断されたが再接続させない設定の場合
zlog.Error().
Err(err).
Str("channel_id", h.SoraChannelID).
Str("connection_id", h.SoraConnectionID).
Int("retry_count", retryCount).
Send()
return err
}

if s.config.MaxRetry > serviceHandler.GetRetryCount() {
// サーバから切断されたが再度接続できる可能性があるため、接続を試みる

serviceHandler.UpdateRetryCount()

// TODO: 必要な場合は連続のリトライを避けるために少し待つ処理を追加する

break
} else {
// サーバから切断されたが再接続させない設定の場合
zlog.Error().
Err(err).
Str("channel_id", h.SoraChannelID).
Str("connection_id", h.SoraConnectionID).
Send()
return err
// max_retry を超えた場合は終了
return c.NoContent(http.StatusOK)
}
}

zlog.Error().
Err(err).
Str("channel_id", h.SoraChannelID).
Str("connection_id", h.SoraConnectionID).
Send()
// サーバから切断されたが再度の接続が期待できない場合
return err
}

// 1 度でも接続結果を受け取れた場合はリトライ回数をリセットする
serviceHandler.ResetRetryCount()

// メッセージが空でない場合はクライアントに結果を送信する
if n > 0 {
if _, err := c.Response().Write(buf[:n]); err != nil {
Expand Down
21 changes: 21 additions & 0 deletions packet_dump_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"io"
"os"
"sync"
"time"
)

Expand All @@ -20,6 +21,8 @@ type PacketDumpHandler struct {
SampleRate uint32
ChannelCount uint16
LanguageCode string
RetryCount int
mu sync.Mutex

OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error
}
Expand All @@ -46,6 +49,24 @@ type PacketDumpResult struct {
Payload []byte `json:"payload"`
}

func (h *PacketDumpHandler) UpdateRetryCount() int {
defer h.mu.Unlock()
h.mu.Lock()
h.RetryCount++
return h.RetryCount
}

func (h *PacketDumpHandler) GetRetryCount() int {
return h.RetryCount
}

func (h *PacketDumpHandler) ResetRetryCount() int {
defer h.mu.Unlock()
h.mu.Lock()
h.RetryCount = 0
return h.RetryCount
}

func (h *PacketDumpHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) {
c := h.Config
filename := c.DumpFile
Expand Down
3 changes: 3 additions & 0 deletions service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ var (

type serviceHandlerInterface interface {
Handle(context.Context, io.Reader) (*io.PipeReader, error)
UpdateRetryCount() int
GetRetryCount() int
ResetRetryCount() int
}

type newServiceHandlerFunc func(Config, string, string, uint32, uint16, string, any) serviceHandlerInterface
Expand Down
25 changes: 23 additions & 2 deletions speech_to_text_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"strings"
"sync"

zlog "github.com/rs/zerolog/log"

Expand All @@ -25,6 +26,8 @@ type SpeechToTextHandler struct {
SampleRate uint32
ChannelCount uint16
LanguageCode string
RetryCount int
mu sync.Mutex

OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error
}
Expand Down Expand Up @@ -70,6 +73,24 @@ func (gr *GcpResult) SetMessage(message string) *GcpResult {
return gr
}

func (h *SpeechToTextHandler) UpdateRetryCount() int {
defer h.mu.Unlock()
h.mu.Lock()
h.RetryCount++
return h.RetryCount
}

func (h *SpeechToTextHandler) GetRetryCount() int {
return h.RetryCount
}

func (h *SpeechToTextHandler) ResetRetryCount() int {
defer h.mu.Unlock()
h.mu.Lock()
h.RetryCount = 0
return h.RetryCount
}

func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) {
stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount))

Expand Down Expand Up @@ -141,8 +162,8 @@ func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io
Int32("code", status.GetCode()).
Send()

// リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if !*stt.Config.Retry {
// リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if (stt.Config.MaxRetry < 1) || (stt.Config.MaxRetry <= h.GetRetryCount()) {
if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil {
zlog.Error().
Err(err).
Expand Down
Loading
Loading