Skip to content

Commit

Permalink
- リトライ時に ctx を cancel する
Browse files Browse the repository at this point in the history
- パケット読み込みでブロックしているため、ogg 変換と処理を分ける
  • Loading branch information
Hexa committed Nov 21, 2024
1 parent c0035de commit d94ffaa
Showing 1 changed file with 49 additions and 9 deletions.
58 changes: 49 additions & 9 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
Int("retry_count", serviceHandler.GetRetryCount()).
Msg("NEW-REQUEST")

// リトライ時にこれ以降の処理のみを cancel する
ctx, cancel := context.WithCancel(ctx)
defer cancel()

reader, err := serviceHandler.Handle(ctx, r)
if err != nil {
zlog.Error().
Expand Down Expand Up @@ -196,7 +200,13 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
serviceHandler.UpdateRetryCount()

// TODO: 必要な場合は連続のリトライを避けるために少し待つ処理を追加する
zlog.Debug().Err(err).
Str("channel_id", h.SoraChannelID).
Str("connection_id", h.SoraConnectionID).
Int("retry_count", serviceHandler.GetRetryCount()).
Msg("RETRYING")

cancel()
break
} else {
zlog.Error().
Expand All @@ -215,6 +225,11 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte

// 1 度でも接続結果を受け取れた場合はリトライ回数をリセットする
serviceHandler.ResetRetryCount()
zlog.Debug().
Str("channel_id", h.SoraChannelID).
Str("connection_id", h.SoraConnectionID).
Int("retry_count", serviceHandler.GetRetryCount()).
Msg("RESET_RETRY_COUNT")

// メッセージが空でない場合はクライアントに結果を送信する
if n > 0 {
Expand Down Expand Up @@ -336,19 +351,44 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa
r = opusReader
}

for {
buf := make([]byte, FrameSize)
n, err := r.Read(buf)
if err != nil {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
ch := make(chan []byte)

go func() {
defer close(ch)

for {
buf := make([]byte, FrameSize)
n, err := r.Read(buf)
if err != nil {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
}
return
}

if n > 0 {
ch <- buf[:n]
}
return err
}
}()

for {
select {
case <-ctx.Done():
return ctx.Err()
case buf, ok := <-ch:
if !ok {
return nil
}

if !ok {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
}
}

if n > 0 {
opus := codecs.OpusPacket{}
_, err := opus.Unmarshal(buf[:n])
_, err := opus.Unmarshal(buf)
if err != nil {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
Expand Down

0 comments on commit d94ffaa

Please sign in to comment.