Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

audio streaming header に対応する #188

Merged
merged 7 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@

## develop

- [ADD] audio streaming header に対応する
- @Hexa
- [ADD] クライアントから送られてくるデータにヘッダーが付与されている場合に対応する audio_streaming_header 設定を追加する
- デフォルト値: false
- @Hexa

### misc

## 2024.3.0
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Config struct {
ListenAddr string `ini:"listen_addr"`
ListenPort int `ini:"listen_port"`

AudioStreamingHeader bool `ini:"audio_streaming_header"`

TLSFullchainFile string `ini:"tls_fullchain_file"`
TLSPrivkeyFile string `ini:"tls_privkey_file"`
TLSVerifyCacertPath string `ini:"tls_verify_cacert_path"` // クライアント認証用
Expand Down
3 changes: 3 additions & 0 deletions config_example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ exporter_https = false
exporter_listen_addr = 0.0.0.0
exporter_listen_port = 48081

# クライアントから受信する音声データにヘッダーが含まれている想定かどうかです
audio_streaming_header = false

# Suzu のサーバ証明書ファイルです
# tls_fullchain_file =
# Suzu の秘密鍵ファイルです
Expand Down
115 changes: 114 additions & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package suzu

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -230,6 +231,107 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
}
}

func readPacketWithHeader(reader io.Reader) (io.Reader, error) {
r, w := io.Pipe()

go func() {
length := 0
payloadLength := 0
var payload []byte

for {
buf := make([]byte, 20+0xffff)
n, err := reader.Read(buf)
if err != nil {
w.CloseWithError(err)
return
}

payload = append(payload, buf[:n]...)
length += n

if length > 20 {
// timestamp(64), sequence number(64), length(32)
h := payload[0:20]
p := payload[20:length]

payloadLength = int(binary.BigEndian.Uint32(h[16:20]))

if length == (20 + payloadLength) {
if _, err := w.Write(p); err != nil {
w.CloseWithError(err)
return
}
payload = []byte{}
length = 0
continue
}

// payload が足りないのでさらに読み込む
if length < (20 + payloadLength) {
// 前の payload へ追加して次へ
payload = append(payload, p...)
continue
}

// 次の frame が含まれている場合
if length > (20 + payloadLength) {
if _, err := w.Write(p[:payloadLength]); err != nil {
w.CloseWithError(err)
return
}
// 次の payload 処理へ
payload = p[payloadLength:]
length = len(payload)

// 次の payload がすでにある場合の処理
for {
if length > 20 {
h = payload[0:20]
p = payload[20:length]

payloadLength = int(binary.BigEndian.Uint32(h[16:20]))

// すでに次の payload が全てある場合
if length == (20 + payloadLength) {
if _, err := w.Write(p); err != nil {
w.CloseWithError(err)
return
}
payload = []byte{}
length = 0
continue
}

if length > (20 + payloadLength) {
if _, err := w.Write(p[:payloadLength]); err != nil {
w.CloseWithError(err)
return
}

// 次の payload 処理へ
payload = p[payloadLength:]
length = len(payload)
continue
}
} else {
// payload が足りないので、次の読み込みへ
break
}
}

continue
}
} else {
// ヘッダー分に足りなければ次の読み込みへ
continue
}
}
}()

return r, nil
}

func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error {
o, err := NewWith(oggWriter, sampleRate, channelCount)
if err != nil {
Expand All @@ -247,15 +349,26 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa
return err
}

var r io.Reader
if c.AudioStreamingHeader {
r, err = readPacketWithHeader(opusReader)
if err != nil {
return err
}
} else {
r = opusReader
}

for {
buf := make([]byte, FrameSize)
n, err := opusReader.Read(buf)
n, err := r.Read(buf)
if err != nil {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
}
return err
}

if n > 0 {
opus := codecs.OpusPacket{}
_, err := opus.Unmarshal(buf[:n])
Expand Down