From e2500932d29c71e215487eecb9ff6f40a2133361 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Fri, 29 Nov 2024 15:42:10 +0900 Subject: [PATCH] =?UTF-8?q?channel=20=E3=81=A7=E3=82=A8=E3=83=A9=E3=83=BC?= =?UTF-8?q?=E3=82=92=E4=BC=9D=E6=90=AC=E3=81=95=E3=81=9B=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 2 +- handler.go | 52 +++++++++++++++++++++++++++--------- packet_dump_handler.go | 2 +- service_handler.go | 2 +- speech_to_text_handler.go | 2 +- test_handler.go | 2 +- 6 files changed, 44 insertions(+), 18 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index 0531ce9..3080874 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -95,7 +95,7 @@ func (h *AmazonTranscribeHandler) ResetRetryCount() int { return h.RetryCount } -func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { +func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) { at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount)) packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config) diff --git a/handler.go b/handler.go index 9bee125..7f4f018 100644 --- a/handler.go +++ b/handler.go @@ -424,8 +424,8 @@ func readPacketWithHeader(reader io.Reader) io.Reader { return r } -func readOpus(ctx context.Context, reader io.Reader) chan []byte { - opusCh := make(chan []byte) +func readOpus(ctx context.Context, reader io.Reader) chan opusChannel { + opusCh := make(chan opusChannel) go func() { defer close(opusCh) @@ -433,16 +433,25 @@ func readOpus(ctx context.Context, reader io.Reader) chan []byte { for { select { case <-ctx.Done(): + opusCh <- opusChannel{ + Error: ctx.Err(), + } return default: buf := make([]byte, FrameSize) n, err := reader.Read(buf) if err != nil { + opusCh <- opusChannel{ + Error: err, + } return } if n > 0 { - opusCh <- buf[:n] + opusCh <- opusChannel{ + Payload: buf[:n], + } + } } } @@ -451,7 +460,7 @@ func readOpus(ctx context.Context, reader io.Reader) chan []byte { return opusCh } -func opus2ogg(ctx context.Context, opusCh chan []byte, sampleRate uint32, channelCount uint16, c Config) io.ReadCloser { +func opus2ogg(ctx context.Context, opusCh chan opusChannel, sampleRate uint32, channelCount uint16, c Config) io.ReadCloser { oggReader, oggWriter := io.Pipe() go func() { @@ -467,20 +476,25 @@ func opus2ogg(ctx context.Context, opusCh chan []byte, sampleRate uint32, channe case <-ctx.Done(): oggWriter.CloseWithError(ctx.Err()) return - case buf, ok := <-opusCh: + case opus, ok := <-opusCh: if !ok { - oggWriter.CloseWithError(fmt.Errorf("channel closed")) + oggWriter.CloseWithError(fmt.Errorf("channel closed1")) return } - opus := codecs.OpusPacket{} - _, err := opus.Unmarshal(buf) + if err := opus.Error; err != nil { + oggWriter.CloseWithError(err) + return + } + + opusPacket := codecs.OpusPacket{} + _, err := opusPacket.Unmarshal(opus.Payload) if err != nil { oggWriter.CloseWithError(err) return } - if err := o.Write(&opus); err != nil { + if err := o.Write(&opusPacket); err != nil { oggWriter.CloseWithError(err) return } @@ -591,7 +605,13 @@ func silentPacket(audioStreamingHeader bool) []byte { return packet } -func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser { +type opusChannel struct { + Payload []byte + Error error +} + +// func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser { +func channelToIOReadCloser(ctx context.Context, ch chan opusChannel) io.ReadCloser { r, w := io.Pipe() go func() { @@ -602,12 +622,18 @@ func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser { case <-ctx.Done(): w.CloseWithError(ctx.Err()) return - case buf, ok := <-ch: + case opus, ok := <-ch: if !ok { - w.CloseWithError(fmt.Errorf("channel closed")) + w.CloseWithError(io.EOF) + return + } + + if err := opus.Error; err != nil { + w.CloseWithError(err) return } - if _, err := w.Write(buf); err != nil { + + if _, err := w.Write(opus.Payload); err != nil { w.CloseWithError(err) return } diff --git a/packet_dump_handler.go b/packet_dump_handler.go index 91f7381..1af4cb4 100644 --- a/packet_dump_handler.go +++ b/packet_dump_handler.go @@ -67,7 +67,7 @@ func (h *PacketDumpHandler) ResetRetryCount() int { return h.RetryCount } -func (h *PacketDumpHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { +func (h *PacketDumpHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) { c := h.Config filename := c.DumpFile channelID := h.ChannelID diff --git a/service_handler.go b/service_handler.go index c836aa7..05a3cf2 100644 --- a/service_handler.go +++ b/service_handler.go @@ -15,7 +15,7 @@ var ( ) type serviceHandlerInterface interface { - Handle(context.Context, chan []byte) (*io.PipeReader, error) + Handle(context.Context, chan opusChannel) (*io.PipeReader, error) UpdateRetryCount() int GetRetryCount() int ResetRetryCount() int diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index e23cb1f..7d0649f 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -91,7 +91,7 @@ func (h *SpeechToTextHandler) ResetRetryCount() int { return h.RetryCount } -func (h *SpeechToTextHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { +func (h *SpeechToTextHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) { stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount)) packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config) diff --git a/test_handler.go b/test_handler.go index 9fe1728..e22f3af 100644 --- a/test_handler.go +++ b/test_handler.go @@ -73,7 +73,7 @@ func (h *TestHandler) ResetRetryCount() int { return h.RetryCount } -func (h *TestHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) { +func (h *TestHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) { r, w := io.Pipe() reader := channelToIOReadCloser(ctx, opusCh)