diff --git a/src/platform/common.h b/src/platform/common.h index 86a7385681e..595b0cbecb0 100644 --- a/src/platform/common.h +++ b/src/platform/common.h @@ -620,8 +620,10 @@ namespace platf { send_batch(batched_send_info_t &send_info); struct send_info_t { - const char *buffer; - size_t size; + const char *header; + size_t header_size; + const char *payload; + size_t payload_size; std::uintptr_t native_socket; boost::asio::ip::address &target_address; diff --git a/src/platform/linux/misc.cpp b/src/platform/linux/misc.cpp index 25ea197046e..81b88f134a0 100644 --- a/src/platform/linux/misc.cpp +++ b/src/platform/linux/misc.cpp @@ -606,12 +606,19 @@ namespace platf { memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo)); } - struct iovec iov = {}; - iov.iov_base = (void *) send_info.buffer; - iov.iov_len = send_info.size; - - msg.msg_iov = &iov; - msg.msg_iovlen = 1; + struct iovec iovs[2] = {}; + int iovlen = 0; + if (send_info.header) { + iovs[iovlen].iov_base = (void *) send_info.header; + iovs[iovlen].iov_len = send_info.header_size; + iovlen++; + } + iovs[iovlen].iov_base = (void *) send_info.payload; + iovs[iovlen].iov_len = send_info.payload_size; + iovlen++; + + msg.msg_iov = iovs; + msg.msg_iovlen = iovlen; msg.msg_controllen = cmbuflen; diff --git a/src/platform/macos/misc.mm b/src/platform/macos/misc.mm index ae5f266da6d..f03dd3bfb5c 100644 --- a/src/platform/macos/misc.mm +++ b/src/platform/macos/misc.mm @@ -363,12 +363,19 @@ memcpy(CMSG_DATA(pktinfo_cm), &pktInfo, sizeof(pktInfo)); } - struct iovec iov = {}; - iov.iov_base = (void *) send_info.buffer; - iov.iov_len = send_info.size; + struct iovec iovs[2] = {}; + int iovlen = 0; + if (send_info.header) { + iovs[iovlen].iov_base = (void *) send_info.header; + iovs[iovlen].iov_len = send_info.header_size; + iovlen++; + } + iovs[iovlen].iov_base = (void *) send_info.payload; + iovs[iovlen].iov_len = send_info.payload_size; + iovlen++; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; + msg.msg_iov = iovs; + msg.msg_iovlen = iovlen; msg.msg_controllen = cmbuflen; diff --git a/src/platform/windows/misc.cpp b/src/platform/windows/misc.cpp index 82632537ee2..455771c6723 100644 --- a/src/platform/windows/misc.cpp +++ b/src/platform/windows/misc.cpp @@ -1535,12 +1535,19 @@ namespace platf { msg.namelen = sizeof(taddr_v4); } - WSABUF buf; - buf.buf = (char *) send_info.buffer; - buf.len = send_info.size; - - msg.lpBuffers = &buf; - msg.dwBufferCount = 1; + WSABUF bufs[2]; + DWORD bufcount = 0; + if (send_info.header) { + bufs[bufcount].buf = (char *) send_info.header; + bufs[bufcount].len = send_info.header_size; + bufcount++; + } + bufs[bufcount].buf = (char *) send_info.payload; + bufs[bufcount].len = send_info.payload_size; + bufcount++; + + msg.lpBuffers = bufs; + msg.dwBufferCount = bufcount; msg.dwFlags = 0; char cmbuf[std::max(WSA_CMSG_SPACE(sizeof(IN6_PKTINFO)), WSA_CMSG_SPACE(sizeof(IN_PKTINFO)))] = {}; diff --git a/src/stream.cpp b/src/stream.cpp index 93da58e7eb4..40adab09bd3 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -136,12 +136,7 @@ namespace stream { std::uint8_t tag[16]; }; - struct audio_packet_raw_t { - uint8_t * - payload() { - return (uint8_t *) (this + 1); - } - + struct audio_packet_t { RTP_PACKET rtp; }; @@ -219,12 +214,7 @@ namespace stream { // encrypted control_header_v2 and payload data follow } *control_encrypted_p; - struct audio_fec_packet_raw_t { - uint8_t * - payload() { - return (uint8_t *) (this + 1); - } - + struct audio_fec_packet_t { RTP_PACKET rtp; AUDIO_FEC_HEADER fecHeader; }; @@ -238,8 +228,6 @@ namespace stream { constexpr std::size_t MAX_AUDIO_PACKET_SIZE = 1400; using video_packet_t = util::c_ptr; - using audio_packet_t = util::c_ptr; - using audio_fec_packet_t = util::c_ptr; using audio_aes_t = std::array; using av_session_id_t = std::variant; // IP address or SS-Ping-Payload from RTSP handshake @@ -249,14 +237,14 @@ namespace stream { // return bytes written on success // return -1 on error static inline int - encode_audio(bool encrypted, const audio::buffer_t &plaintext, audio_packet_t &destination, crypto::aes_t &iv, crypto::cipher::cbc_t &cbc) { + encode_audio(bool encrypted, const audio::buffer_t &plaintext, uint8_t *destination, crypto::aes_t &iv, crypto::cipher::cbc_t &cbc) { // If encryption isn't enabled if (!encrypted) { - std::copy(std::begin(plaintext), std::end(plaintext), destination->payload()); + std::copy(std::begin(plaintext), std::end(plaintext), destination); return plaintext.size(); } - return cbc.encrypt(std::string_view { (char *) std::begin(plaintext), plaintext.size() }, destination->payload(), &iv); + return cbc.encrypt(std::string_view { (char *) std::begin(plaintext), plaintext.size() }, destination, &iv); } static inline void @@ -755,6 +743,7 @@ namespace stream { std::vector replace(const std::string_view &original, const std::string_view &old, const std::string_view &_new) { std::vector replaced; + replaced.reserve(original.size() + _new.size() - old.size()); auto begin = std::begin(original); auto end = std::end(original); @@ -1531,6 +1520,8 @@ namespace stream { BOOST_LOG(verbose) << "Falling back to unbatched send"sv; for (auto y = 0; y < current_batch_size; y++) { auto send_info = platf::send_info_t { + nullptr, + 0, shards.prefix(next_shard_to_send + y), shards.prefixsize + shards.blocksize, (uintptr_t) sock.native_handle(), @@ -1584,9 +1575,7 @@ namespace stream { auto shutdown_event = mail::man->event(mail::broadcast_shutdown); auto packets = mail::man->queue(mail::audio_packets); - constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048); - - audio_packet_t audio_packet { (audio_packet_raw_t *) malloc(sizeof(audio_packet_raw_t) + max_block_size) }; + audio_packet_t audio_packet; fec::rs_t rs { reed_solomon_new(RTPA_DATA_SHARDS, RTPA_FEC_SHARDS) }; crypto::aes_t iv(16); @@ -1598,9 +1587,9 @@ namespace stream { const unsigned char parity[] = { 0x77, 0x40, 0x38, 0x0e, 0xc7, 0xa7, 0x0d, 0x6c }; memcpy(rs.get()->p, parity, sizeof(parity)); - audio_packet->rtp.header = 0x80; - audio_packet->rtp.packetType = 97; - audio_packet->rtp.ssrc = 0; + audio_packet.rtp.header = 0x80; + audio_packet.rtp.packetType = 97; + audio_packet.rtp.ssrc = 0; // Audio traffic is sent on this thread platf::adjust_thread_priority(platf::thread_priority_e::high); @@ -1618,26 +1607,28 @@ namespace stream { *(std::uint32_t *) iv.data() = util::endian::big(session->audio.avRiKeyId + sequenceNumber); - auto bytes = encode_audio(session->config.encryptionFlagsEnabled & SS_ENC_AUDIO, packet_data, audio_packet, iv, session->audio.cipher); + auto &shards_p = session->audio.shards_p; + + auto bytes = encode_audio(session->config.encryptionFlagsEnabled & SS_ENC_AUDIO, packet_data, + shards_p[sequenceNumber % RTPA_DATA_SHARDS], iv, session->audio.cipher); if (bytes < 0) { BOOST_LOG(error) << "Couldn't encode audio packet"sv; break; } - audio_packet->rtp.sequenceNumber = util::endian::big(sequenceNumber); - audio_packet->rtp.timestamp = util::endian::big(timestamp); + audio_packet.rtp.sequenceNumber = util::endian::big(sequenceNumber); + audio_packet.rtp.timestamp = util::endian::big(timestamp); session->audio.sequenceNumber++; session->audio.timestamp += session->config.audio.packetDuration; - auto &shards_p = session->audio.shards_p; - - std::copy_n(audio_packet->payload(), bytes, shards_p[sequenceNumber % RTPA_DATA_SHARDS]); auto peer_address = session->audio.peer.address(); try { auto send_info = platf::send_info_t { - (const char *) audio_packet.get(), - sizeof(audio_packet_raw_t) + bytes, + (const char *) &audio_packet, + sizeof(audio_packet), + (const char *) shards_p[sequenceNumber % RTPA_DATA_SHARDS], + (size_t) bytes, (uintptr_t) sock.native_handle(), peer_address, session->audio.peer.port(), @@ -1649,8 +1640,8 @@ namespace stream { auto &fec_packet = session->audio.fec_packet; // initialize the FEC header at the beginning of the FEC block if (sequenceNumber % RTPA_DATA_SHARDS == 0) { - fec_packet->fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber); - fec_packet->fecHeader.baseTimestamp = util::endian::big(timestamp); + fec_packet.fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber); + fec_packet.fecHeader.baseTimestamp = util::endian::big(timestamp); } // generate parity shards at the end of the FEC block @@ -1658,13 +1649,14 @@ namespace stream { reed_solomon_encode(rs.get(), shards_p.begin(), RTPA_TOTAL_SHARDS, bytes); for (auto x = 0; x < RTPA_FEC_SHARDS; ++x) { - fec_packet->rtp.sequenceNumber = util::endian::big(sequenceNumber + x + 1); - fec_packet->fecHeader.fecShardIndex = x; - memcpy(fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes); + fec_packet.rtp.sequenceNumber = util::endian::big(sequenceNumber + x + 1); + fec_packet.fecHeader.fecShardIndex = x; auto send_info = platf::send_info_t { - (const char *) fec_packet.get(), - sizeof(audio_fec_packet_raw_t) + bytes, + (const char *) &fec_packet, + sizeof(fec_packet), + (const char *) shards_p[RTPA_DATA_SHARDS + x], + (size_t) bytes, (uintptr_t) sock.native_handle(), peer_address, session->audio.peer.port(), @@ -2030,15 +2022,13 @@ namespace stream { session->audio.shards = std::move(shards); session->audio.shards_p = std::move(shards_p); - session->audio.fec_packet.reset((audio_fec_packet_raw_t *) malloc(sizeof(audio_fec_packet_raw_t) + max_block_size)); - - session->audio.fec_packet->rtp.header = 0x80; - session->audio.fec_packet->rtp.packetType = 127; - session->audio.fec_packet->rtp.timestamp = 0; - session->audio.fec_packet->rtp.ssrc = 0; + session->audio.fec_packet.rtp.header = 0x80; + session->audio.fec_packet.rtp.packetType = 127; + session->audio.fec_packet.rtp.timestamp = 0; + session->audio.fec_packet.rtp.ssrc = 0; - session->audio.fec_packet->fecHeader.payloadType = 97; - session->audio.fec_packet->fecHeader.ssrc = 0; + session->audio.fec_packet.fecHeader.payloadType = 97; + session->audio.fec_packet.fecHeader.ssrc = 0; session->audio.cipher = crypto::cipher::cbc_t { launch_session.gcm_key, true