Skip to content

Commit

Permalink
channel でエラーを伝搬させる
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexa committed Nov 29, 2024
1 parent a834f61 commit e250093
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 18 deletions.
2 changes: 1 addition & 1 deletion amazon_transcribe_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (h *AmazonTranscribeHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) {
func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) {
at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount))

packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config)
Expand Down
52 changes: 39 additions & 13 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,25 +424,34 @@ func readPacketWithHeader(reader io.Reader) io.Reader {
return r
}

func readOpus(ctx context.Context, reader io.Reader) chan []byte {
opusCh := make(chan []byte)
func readOpus(ctx context.Context, reader io.Reader) chan opusChannel {
opusCh := make(chan opusChannel)

go func() {
defer close(opusCh)

for {
select {
case <-ctx.Done():
opusCh <- opusChannel{
Error: ctx.Err(),
}
return
default:
buf := make([]byte, FrameSize)
n, err := reader.Read(buf)
if err != nil {
opusCh <- opusChannel{
Error: err,
}
return
}

if n > 0 {
opusCh <- buf[:n]
opusCh <- opusChannel{
Payload: buf[:n],
}

}
}
}
Expand All @@ -451,7 +460,7 @@ func readOpus(ctx context.Context, reader io.Reader) chan []byte {
return opusCh
}

func opus2ogg(ctx context.Context, opusCh chan []byte, sampleRate uint32, channelCount uint16, c Config) io.ReadCloser {
func opus2ogg(ctx context.Context, opusCh chan opusChannel, sampleRate uint32, channelCount uint16, c Config) io.ReadCloser {
oggReader, oggWriter := io.Pipe()

go func() {
Expand All @@ -467,20 +476,25 @@ func opus2ogg(ctx context.Context, opusCh chan []byte, sampleRate uint32, channe
case <-ctx.Done():
oggWriter.CloseWithError(ctx.Err())
return
case buf, ok := <-opusCh:
case opus, ok := <-opusCh:
if !ok {
oggWriter.CloseWithError(fmt.Errorf("channel closed"))
oggWriter.CloseWithError(fmt.Errorf("channel closed1"))
return
}

opus := codecs.OpusPacket{}
_, err := opus.Unmarshal(buf)
if err := opus.Error; err != nil {
oggWriter.CloseWithError(err)
return
}

opusPacket := codecs.OpusPacket{}
_, err := opusPacket.Unmarshal(opus.Payload)
if err != nil {
oggWriter.CloseWithError(err)
return
}

if err := o.Write(&opus); err != nil {
if err := o.Write(&opusPacket); err != nil {
oggWriter.CloseWithError(err)
return
}
Expand Down Expand Up @@ -591,7 +605,13 @@ func silentPacket(audioStreamingHeader bool) []byte {
return packet
}

func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser {
type opusChannel struct {
Payload []byte
Error error
}

// func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser {
func channelToIOReadCloser(ctx context.Context, ch chan opusChannel) io.ReadCloser {
r, w := io.Pipe()

go func() {
Expand All @@ -602,12 +622,18 @@ func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser {
case <-ctx.Done():
w.CloseWithError(ctx.Err())
return
case buf, ok := <-ch:
case opus, ok := <-ch:
if !ok {
w.CloseWithError(fmt.Errorf("channel closed"))
w.CloseWithError(io.EOF)
return
}

if err := opus.Error; err != nil {
w.CloseWithError(err)
return
}
if _, err := w.Write(buf); err != nil {

if _, err := w.Write(opus.Payload); err != nil {
w.CloseWithError(err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion packet_dump_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (h *PacketDumpHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *PacketDumpHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) {
func (h *PacketDumpHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) {
c := h.Config
filename := c.DumpFile
channelID := h.ChannelID
Expand Down
2 changes: 1 addition & 1 deletion service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
)

type serviceHandlerInterface interface {
Handle(context.Context, chan []byte) (*io.PipeReader, error)
Handle(context.Context, chan opusChannel) (*io.PipeReader, error)
UpdateRetryCount() int
GetRetryCount() int
ResetRetryCount() int
Expand Down
2 changes: 1 addition & 1 deletion speech_to_text_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (h *SpeechToTextHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *SpeechToTextHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) {
func (h *SpeechToTextHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) {
stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount))

packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config)
Expand Down
2 changes: 1 addition & 1 deletion test_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (h *TestHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *TestHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) {
func (h *TestHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) {
r, w := io.Pipe()

reader := channelToIOReadCloser(ctx, opusCh)
Expand Down

0 comments on commit e250093

Please sign in to comment.