Skip to content

Commit

Permalink
Merge pull request #192 from shiguredo/feature/fix-retry-processing
Browse files Browse the repository at this point in the history
リトライ処理の改善
  • Loading branch information
Hexa authored Dec 3, 2024
2 parents f0c043b + d49c28f commit e7fb3ba
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 162 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@

## develop

- [FIX] サービスへの接続が成功してもリトライカウントがリセットされない不具合を修正する
- @Hexa
- [FIX] 解析結果だけでなくエラーメッセージの送信時にもリトライカウントをリセットしていたため、リトライ処理によってカウントがリセットされていた不具合を修正する
- @Hexa
- [FIX] リトライ待ち時にクライアントから切断しようとすると、リトライ待ちで処理がブロックされているため切断までに時間がかかる不具合を修正する
- @Hexa

### misc

## 2024.6.0
Expand Down
5 changes: 4 additions & 1 deletion amazon_transcribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/transcribestreamingservice"
zlog "github.com/rs/zerolog/log"
)

type AmazonTranscribe struct {
Expand Down Expand Up @@ -89,7 +90,7 @@ func NewAmazonTranscribeClient(config Config) *transcribestreamingservice.Transc
return transcribestreamingservice.New(sess, cfg)
}

func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribestreamingservice.StartStreamTranscriptionEventStream, error) {
func (at *AmazonTranscribe) Start(ctx context.Context, r io.ReadCloser) (*transcribestreamingservice.StartStreamTranscriptionEventStream, error) {
config := at.Config
client := NewAmazonTranscribeClient(config)
input := NewStartStreamTranscriptionInput(at)
Expand Down Expand Up @@ -117,9 +118,11 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe
stream := resp.GetStream()

go func() {
defer r.Close()
defer stream.Close()

if err := transcribestreamingservice.StreamAudioFromReader(ctx, stream, FrameSize, r); err != nil {
zlog.Error().Err(err).Send()
return
}
}()
Expand Down
46 changes: 7 additions & 39 deletions amazon_transcribe_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,31 +95,19 @@ func (h *AmazonTranscribeHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) {
func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) {
at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount))

oggReader, oggWriter := io.Pipe()
go func() {
defer oggWriter.Close()
if err := opus2ogg(ctx, reader, oggWriter, h.SampleRate, h.ChannelCount, h.Config); err != nil {
if !errors.Is(err, io.EOF) {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()
}

oggWriter.CloseWithError(err)
return
}
}()
packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config)

stream, err := at.Start(ctx, oggReader)
stream, err := at.Start(ctx, packetReader)
if err != nil {
return nil, err
}

// リクエストが成功した時点でリトライカウントをリセットする
h.ResetRetryCount()

r, w := io.Pipe()

go func() {
Expand Down Expand Up @@ -195,33 +183,13 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader)
switch err.(type) {
case *transcribestreamingservice.LimitExceededException,
*transcribestreamingservice.InternalFailureException:
// リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if (at.Config.MaxRetry < 1) || (at.Config.MaxRetry <= h.GetRetryCount()) {
if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()
}
}

err = ErrServerDisconnected
err = errors.Join(err, ErrServerDisconnected)
default:
// 再接続を想定している以外のエラーの場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()
}
}

w.CloseWithError(err)
return
}

w.Close()
}()

Expand Down
Loading

0 comments on commit e7fb3ba

Please sign in to comment.