diff --git a/CHANGES.md b/CHANGES.md index 3d10411..d0d0cc8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,12 @@ ## develop +- [ADD] audio streaming header に対応する + - @Hexa +- [ADD] クライアントから送られてくるデータにヘッダーが付与されている場合に対応する audio_streaming_header 設定を追加する + - デフォルト値: false + - @Hexa + ### misc ## 2024.3.0 diff --git a/config.go b/config.go index e0e6e98..3b12e60 100644 --- a/config.go +++ b/config.go @@ -41,6 +41,8 @@ type Config struct { ListenAddr string `ini:"listen_addr"` ListenPort int `ini:"listen_port"` + AudioStreamingHeader bool `ini:"audio_streaming_header"` + TLSFullchainFile string `ini:"tls_fullchain_file"` TLSPrivkeyFile string `ini:"tls_privkey_file"` TLSVerifyCacertPath string `ini:"tls_verify_cacert_path"` // クライアント認証用 diff --git a/config_example.ini b/config_example.ini index 2d5b5d2..19b71db 100644 --- a/config_example.ini +++ b/config_example.ini @@ -11,6 +11,9 @@ exporter_https = false exporter_listen_addr = 0.0.0.0 exporter_listen_port = 48081 +# クライアントから受信する音声データにヘッダーが含まれている想定かどうかです +audio_streaming_header = false + # Suzu のサーバ証明書ファイルです # tls_fullchain_file = # Suzu の秘密鍵ファイルです diff --git a/handler.go b/handler.go index cec4245..abe6eab 100644 --- a/handler.go +++ b/handler.go @@ -2,6 +2,7 @@ package suzu import ( "context" + "encoding/binary" "errors" "fmt" "io" @@ -230,6 +231,107 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte } } +func readPacketWithHeader(reader io.Reader) (io.Reader, error) { + r, w := io.Pipe() + + go func() { + length := 0 + payloadLength := 0 + var payload []byte + + for { + buf := make([]byte, 20+0xffff) + n, err := reader.Read(buf) + if err != nil { + w.CloseWithError(err) + return + } + + payload = append(payload, buf[:n]...) + length += n + + if length > 20 { + // timestamp(64), sequence number(64), length(32) + h := payload[0:20] + p := payload[20:length] + + payloadLength = int(binary.BigEndian.Uint32(h[16:20])) + + if length == (20 + payloadLength) { + if _, err := w.Write(p); err != nil { + w.CloseWithError(err) + return + } + payload = []byte{} + length = 0 + continue + } + + // payload が足りないのでさらに読み込む + if length < (20 + payloadLength) { + // 前の payload へ追加して次へ + payload = append(payload, p...) + continue + } + + // 次の frame が含まれている場合 + if length > (20 + payloadLength) { + if _, err := w.Write(p[:payloadLength]); err != nil { + w.CloseWithError(err) + return + } + // 次の payload 処理へ + payload = p[payloadLength:] + length = len(payload) + + // 次の payload がすでにある場合の処理 + for { + if length > 20 { + h = payload[0:20] + p = payload[20:length] + + payloadLength = int(binary.BigEndian.Uint32(h[16:20])) + + // すでに次の payload が全てある場合 + if length == (20 + payloadLength) { + if _, err := w.Write(p); err != nil { + w.CloseWithError(err) + return + } + payload = []byte{} + length = 0 + continue + } + + if length > (20 + payloadLength) { + if _, err := w.Write(p[:payloadLength]); err != nil { + w.CloseWithError(err) + return + } + + // 次の payload 処理へ + payload = p[payloadLength:] + length = len(payload) + continue + } + } else { + // payload が足りないので、次の読み込みへ + break + } + } + + continue + } + } else { + // ヘッダー分に足りなければ次の読み込みへ + continue + } + } + }() + + return r, nil +} + func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error { o, err := NewWith(oggWriter, sampleRate, channelCount) if err != nil { @@ -247,15 +349,26 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa return err } + var r io.Reader + if c.AudioStreamingHeader { + r, err = readPacketWithHeader(opusReader) + if err != nil { + return err + } + } else { + r = opusReader + } + for { buf := make([]byte, FrameSize) - n, err := opusReader.Read(buf) + n, err := r.Read(buf) if err != nil { if w, ok := oggWriter.(*io.PipeWriter); ok { w.CloseWithError(err) } return err } + if n > 0 { opus := codecs.OpusPacket{} _, err := opus.Unmarshal(buf[:n])