Skip to content

Commit

Permalink
Merge branch 'release/2024.7.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexa committed Dec 3, 2024
2 parents d830370 + 11f989f commit d874179
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 163 deletions.
9 changes: 9 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@

### misc

## 2024.7.0

- [FIX] サービスへの接続が成功してもリトライカウントがリセットされない不具合を修正する
- @Hexa
- [FIX] 解析結果だけでなくエラーメッセージの送信時にもリトライカウントをリセットしていたため、リトライ処理によってカウントがリセットされていた不具合を修正する
- @Hexa
- [FIX] リトライ待ち時にクライアントから切断しようとすると、リトライ待ちで処理がブロックされているため切断までに時間がかかる不具合を修正する
- @Hexa

## 2024.6.0

- [CHANGE] aws の再接続条件の exception に InternalFailureException を追加する
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2024.6.0
2024.7.0
5 changes: 4 additions & 1 deletion amazon_transcribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/transcribestreamingservice"
zlog "github.com/rs/zerolog/log"
)

type AmazonTranscribe struct {
Expand Down Expand Up @@ -89,7 +90,7 @@ func NewAmazonTranscribeClient(config Config) *transcribestreamingservice.Transc
return transcribestreamingservice.New(sess, cfg)
}

func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribestreamingservice.StartStreamTranscriptionEventStream, error) {
func (at *AmazonTranscribe) Start(ctx context.Context, r io.ReadCloser) (*transcribestreamingservice.StartStreamTranscriptionEventStream, error) {
config := at.Config
client := NewAmazonTranscribeClient(config)
input := NewStartStreamTranscriptionInput(at)
Expand Down Expand Up @@ -117,9 +118,11 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe
stream := resp.GetStream()

go func() {
defer r.Close()
defer stream.Close()

if err := transcribestreamingservice.StreamAudioFromReader(ctx, stream, FrameSize, r); err != nil {
zlog.Error().Err(err).Send()
return
}
}()
Expand Down
46 changes: 7 additions & 39 deletions amazon_transcribe_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,31 +95,19 @@ func (h *AmazonTranscribeHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) {
func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) {
at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount))

oggReader, oggWriter := io.Pipe()
go func() {
defer oggWriter.Close()
if err := opus2ogg(ctx, reader, oggWriter, h.SampleRate, h.ChannelCount, h.Config); err != nil {
if !errors.Is(err, io.EOF) {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()
}

oggWriter.CloseWithError(err)
return
}
}()
packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config)

stream, err := at.Start(ctx, oggReader)
stream, err := at.Start(ctx, packetReader)
if err != nil {
return nil, err
}

// リクエストが成功した時点でリトライカウントをリセットする
h.ResetRetryCount()

r, w := io.Pipe()

go func() {
Expand Down Expand Up @@ -195,33 +183,13 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader)
switch err.(type) {
case *transcribestreamingservice.LimitExceededException,
*transcribestreamingservice.InternalFailureException:
// リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if (at.Config.MaxRetry < 1) || (at.Config.MaxRetry <= h.GetRetryCount()) {
if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()
}
}

err = ErrServerDisconnected
err = errors.Join(err, ErrServerDisconnected)
default:
// 再接続を想定している以外のエラーの場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()
}
}

w.CloseWithError(err)
return
}

w.Close()
}()

Expand Down
Loading

0 comments on commit d874179

Please sign in to comment.