Skip to content

Commit

Permalink
Implement S/G IO for non-batched sends and eliminate more data copies (
Browse files Browse the repository at this point in the history
  • Loading branch information
cgutman authored Jul 18, 2024
1 parent b93756a commit 81c6e61
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 65 deletions.
6 changes: 4 additions & 2 deletions src/platform/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,10 @@ namespace platf {
send_batch(batched_send_info_t &send_info);

struct send_info_t {
const char *buffer;
size_t size;
const char *header;
size_t header_size;
const char *payload;
size_t payload_size;

std::uintptr_t native_socket;
boost::asio::ip::address &target_address;
Expand Down
19 changes: 13 additions & 6 deletions src/platform/linux/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,19 @@ namespace platf {
memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo));
}

struct iovec iov = {};
iov.iov_base = (void *) send_info.buffer;
iov.iov_len = send_info.size;

msg.msg_iov = &iov;
msg.msg_iovlen = 1;
struct iovec iovs[2] = {};
int iovlen = 0;
if (send_info.header) {
iovs[iovlen].iov_base = (void *) send_info.header;
iovs[iovlen].iov_len = send_info.header_size;
iovlen++;
}
iovs[iovlen].iov_base = (void *) send_info.payload;
iovs[iovlen].iov_len = send_info.payload_size;
iovlen++;

msg.msg_iov = iovs;
msg.msg_iovlen = iovlen;

msg.msg_controllen = cmbuflen;

Expand Down
17 changes: 12 additions & 5 deletions src/platform/macos/misc.mm
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,19 @@
memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo));
}

struct iovec iov = {};
iov.iov_base = (void *) send_info.buffer;
iov.iov_len = send_info.size;
struct iovec iovs[2] = {};
int iovlen = 0;
if (send_info.header) {
iovs[iovlen].iov_base = (void *) send_info.header;
iovs[iovlen].iov_len = send_info.header_size;
iovlen++;
}
iovs[iovlen].iov_base = (void *) send_info.payload;
iovs[iovlen].iov_len = send_info.payload_size;
iovlen++;

msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_iov = iovs;
msg.msg_iovlen = iovlen;

msg.msg_controllen = cmbuflen;

Expand Down
19 changes: 13 additions & 6 deletions src/platform/windows/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1535,12 +1535,19 @@ namespace platf {
msg.namelen = sizeof(taddr_v4);
}

WSABUF buf;
buf.buf = (char *) send_info.buffer;
buf.len = send_info.size;

msg.lpBuffers = &buf;
msg.dwBufferCount = 1;
WSABUF bufs[2];
DWORD bufcount = 0;
if (send_info.header) {
bufs[bufcount].buf = (char *) send_info.header;
bufs[bufcount].len = send_info.header_size;
bufcount++;
}
bufs[bufcount].buf = (char *) send_info.payload;
bufs[bufcount].len = send_info.payload_size;
bufcount++;

msg.lpBuffers = bufs;
msg.dwBufferCount = bufcount;
msg.dwFlags = 0;

char cmbuf[std::max(WSA_CMSG_SPACE(sizeof(IN6_PKTINFO)), WSA_CMSG_SPACE(sizeof(IN_PKTINFO)))] = {};
Expand Down
82 changes: 36 additions & 46 deletions src/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,7 @@ namespace stream {
std::uint8_t tag[16];
};

struct audio_packet_raw_t {
uint8_t *
payload() {
return (uint8_t *) (this + 1);
}

struct audio_packet_t {
RTP_PACKET rtp;
};

Expand Down Expand Up @@ -219,12 +214,7 @@ namespace stream {
// encrypted control_header_v2 and payload data follow
} *control_encrypted_p;

struct audio_fec_packet_raw_t {
uint8_t *
payload() {
return (uint8_t *) (this + 1);
}

struct audio_fec_packet_t {
RTP_PACKET rtp;
AUDIO_FEC_HEADER fecHeader;
};
Expand All @@ -238,8 +228,6 @@ namespace stream {
constexpr std::size_t MAX_AUDIO_PACKET_SIZE = 1400;

using video_packet_t = util::c_ptr<video_packet_raw_t>;
using audio_packet_t = util::c_ptr<audio_packet_raw_t>;
using audio_fec_packet_t = util::c_ptr<audio_fec_packet_raw_t>;
using audio_aes_t = std::array<char, round_to_pkcs7_padded(MAX_AUDIO_PACKET_SIZE)>;

using av_session_id_t = std::variant<asio::ip::address, std::string>; // IP address or SS-Ping-Payload from RTSP handshake
Expand All @@ -249,14 +237,14 @@ namespace stream {
// return bytes written on success
// return -1 on error
static inline int
encode_audio(bool encrypted, const audio::buffer_t &plaintext, audio_packet_t &destination, crypto::aes_t &iv, crypto::cipher::cbc_t &cbc) {
encode_audio(bool encrypted, const audio::buffer_t &plaintext, uint8_t *destination, crypto::aes_t &iv, crypto::cipher::cbc_t &cbc) {
// If encryption isn't enabled
if (!encrypted) {
std::copy(std::begin(plaintext), std::end(plaintext), destination->payload());
std::copy(std::begin(plaintext), std::end(plaintext), destination);
return plaintext.size();
}

return cbc.encrypt(std::string_view { (char *) std::begin(plaintext), plaintext.size() }, destination->payload(), &iv);
return cbc.encrypt(std::string_view { (char *) std::begin(plaintext), plaintext.size() }, destination, &iv);
}

static inline void
Expand Down Expand Up @@ -755,6 +743,7 @@ namespace stream {
std::vector<uint8_t>
replace(const std::string_view &original, const std::string_view &old, const std::string_view &_new) {
std::vector<uint8_t> replaced;
replaced.reserve(original.size() + _new.size() - old.size());

auto begin = std::begin(original);
auto end = std::end(original);
Expand Down Expand Up @@ -1531,6 +1520,8 @@ namespace stream {
BOOST_LOG(verbose) << "Falling back to unbatched send"sv;
for (auto y = 0; y < current_batch_size; y++) {
auto send_info = platf::send_info_t {
nullptr,
0,
shards.prefix(next_shard_to_send + y),
shards.prefixsize + shards.blocksize,
(uintptr_t) sock.native_handle(),
Expand Down Expand Up @@ -1584,9 +1575,7 @@ namespace stream {
auto shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
auto packets = mail::man->queue<audio::packet_t>(mail::audio_packets);

constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048);

audio_packet_t audio_packet { (audio_packet_raw_t *) malloc(sizeof(audio_packet_raw_t) + max_block_size) };
audio_packet_t audio_packet;
fec::rs_t rs { reed_solomon_new(RTPA_DATA_SHARDS, RTPA_FEC_SHARDS) };
crypto::aes_t iv(16);

Expand All @@ -1598,9 +1587,9 @@ namespace stream {
const unsigned char parity[] = { 0x77, 0x40, 0x38, 0x0e, 0xc7, 0xa7, 0x0d, 0x6c };
memcpy(rs.get()->p, parity, sizeof(parity));

audio_packet->rtp.header = 0x80;
audio_packet->rtp.packetType = 97;
audio_packet->rtp.ssrc = 0;
audio_packet.rtp.header = 0x80;
audio_packet.rtp.packetType = 97;
audio_packet.rtp.ssrc = 0;

// Audio traffic is sent on this thread
platf::adjust_thread_priority(platf::thread_priority_e::high);
Expand All @@ -1618,26 +1607,28 @@ namespace stream {

*(std::uint32_t *) iv.data() = util::endian::big<std::uint32_t>(session->audio.avRiKeyId + sequenceNumber);

auto bytes = encode_audio(session->config.encryptionFlagsEnabled & SS_ENC_AUDIO, packet_data, audio_packet, iv, session->audio.cipher);
auto &shards_p = session->audio.shards_p;

auto bytes = encode_audio(session->config.encryptionFlagsEnabled & SS_ENC_AUDIO, packet_data,
shards_p[sequenceNumber % RTPA_DATA_SHARDS], iv, session->audio.cipher);
if (bytes < 0) {
BOOST_LOG(error) << "Couldn't encode audio packet"sv;
break;
}

audio_packet->rtp.sequenceNumber = util::endian::big(sequenceNumber);
audio_packet->rtp.timestamp = util::endian::big(timestamp);
audio_packet.rtp.sequenceNumber = util::endian::big(sequenceNumber);
audio_packet.rtp.timestamp = util::endian::big(timestamp);

session->audio.sequenceNumber++;
session->audio.timestamp += session->config.audio.packetDuration;

auto &shards_p = session->audio.shards_p;

std::copy_n(audio_packet->payload(), bytes, shards_p[sequenceNumber % RTPA_DATA_SHARDS]);
auto peer_address = session->audio.peer.address();
try {
auto send_info = platf::send_info_t {
(const char *) audio_packet.get(),
sizeof(audio_packet_raw_t) + bytes,
(const char *) &audio_packet,
sizeof(audio_packet),
(const char *) shards_p[sequenceNumber % RTPA_DATA_SHARDS],
(size_t) bytes,
(uintptr_t) sock.native_handle(),
peer_address,
session->audio.peer.port(),
Expand All @@ -1649,22 +1640,23 @@ namespace stream {
auto &fec_packet = session->audio.fec_packet;
// initialize the FEC header at the beginning of the FEC block
if (sequenceNumber % RTPA_DATA_SHARDS == 0) {
fec_packet->fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber);
fec_packet->fecHeader.baseTimestamp = util::endian::big(timestamp);
fec_packet.fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber);
fec_packet.fecHeader.baseTimestamp = util::endian::big(timestamp);
}

// generate parity shards at the end of the FEC block
if ((sequenceNumber + 1) % RTPA_DATA_SHARDS == 0) {
reed_solomon_encode(rs.get(), shards_p.begin(), RTPA_TOTAL_SHARDS, bytes);

for (auto x = 0; x < RTPA_FEC_SHARDS; ++x) {
fec_packet->rtp.sequenceNumber = util::endian::big<std::uint16_t>(sequenceNumber + x + 1);
fec_packet->fecHeader.fecShardIndex = x;
memcpy(fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes);
fec_packet.rtp.sequenceNumber = util::endian::big<std::uint16_t>(sequenceNumber + x + 1);
fec_packet.fecHeader.fecShardIndex = x;

auto send_info = platf::send_info_t {
(const char *) fec_packet.get(),
sizeof(audio_fec_packet_raw_t) + bytes,
(const char *) &fec_packet,
sizeof(fec_packet),
(const char *) shards_p[RTPA_DATA_SHARDS + x],
(size_t) bytes,
(uintptr_t) sock.native_handle(),
peer_address,
session->audio.peer.port(),
Expand Down Expand Up @@ -2030,15 +2022,13 @@ namespace stream {
session->audio.shards = std::move(shards);
session->audio.shards_p = std::move(shards_p);

session->audio.fec_packet.reset((audio_fec_packet_raw_t *) malloc(sizeof(audio_fec_packet_raw_t) + max_block_size));

session->audio.fec_packet->rtp.header = 0x80;
session->audio.fec_packet->rtp.packetType = 127;
session->audio.fec_packet->rtp.timestamp = 0;
session->audio.fec_packet->rtp.ssrc = 0;
session->audio.fec_packet.rtp.header = 0x80;
session->audio.fec_packet.rtp.packetType = 127;
session->audio.fec_packet.rtp.timestamp = 0;
session->audio.fec_packet.rtp.ssrc = 0;

session->audio.fec_packet->fecHeader.payloadType = 97;
session->audio.fec_packet->fecHeader.ssrc = 0;
session->audio.fec_packet.fecHeader.payloadType = 97;
session->audio.fec_packet.fecHeader.ssrc = 0;

session->audio.cipher = crypto::cipher::cbc_t {
launch_session.gcm_key, true
Expand Down

0 comments on commit 81c6e61

Please sign in to comment.