Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement FETCH streams #8

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion moxygen/MoQCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ void MoQObjectStreamCodec::onIngress(
case StreamType::STREAM_HEADER_SUBGROUP:
parseState_ = ParseState::OBJECT_STREAM;
break;
// CONTROL doesn't have a wire type yet.
case StreamType::FETCH_HEADER:
parseState_ = ParseState::FETCH_HEADER;
break;
// CONTROL doesn't have a wire type yet.
default:
XLOG(DBG4) << "Stream not allowed: 0x" << std::setfill('0')
<< std::setw(sizeof(uint64_t) * 2) << std::hex
Expand All @@ -154,6 +157,19 @@ void MoQObjectStreamCodec::onIngress(
}
break;
}
case ParseState::FETCH_HEADER: {
auto newCursor = cursor;
auto res = parseFetchHeader(newCursor);
if (res.hasError()) {
XLOG(DBG6) << __func__ << " " << uint32_t(res.error());
connError_ = res.error();
break;
}
curObjectHeader_.trackIdentifier = SubscribeID(res.value());
parseState_ = ParseState::MULTI_OBJECT_HEADER;
cursor = newCursor;
break;
}
case ParseState::OBJECT_STREAM: {
auto newCursor = cursor;
auto res = parseStreamHeader(newCursor, streamType_);
Expand Down
2 changes: 2 additions & 0 deletions moxygen/MoQCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class MoQObjectStreamCodec : public MoQCodec {
public:
~ObjectCallback() override = default;

virtual void onFetchHeader(uint64_t subscribeID) = 0;
virtual void onObjectHeader(ObjectHeader objectHeader) = 0;

virtual void onObjectPayload(
Expand Down Expand Up @@ -166,6 +167,7 @@ class MoQObjectStreamCodec : public MoQCodec {
STREAM_HEADER_TYPE,
DATAGRAM,
OBJECT_STREAM,
FETCH_HEADER,
MULTI_OBJECT_HEADER,
OBJECT_PAYLOAD,
// OBJECT_PAYLOAD_NO_LENGTH
Expand Down
45 changes: 41 additions & 4 deletions moxygen/MoQFramer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ folly::Expected<ServerSetup, ErrorCode> parseServerSetup(
return serverSetup;
}

folly::Expected<uint64_t, ErrorCode> parseFetchHeader(
folly::io::Cursor& cursor) noexcept {
auto subscribeID = quic::decodeQuicInteger(cursor);
if (!subscribeID) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
return subscribeID->first;
}

folly::Expected<ObjectHeader, ErrorCode> parseObjectHeader(
folly::io::Cursor& cursor,
size_t length) noexcept {
Expand Down Expand Up @@ -288,10 +297,13 @@ folly::Expected<ObjectHeader, ErrorCode> parseMultiObjectHeader(
const ObjectHeader& headerTemplate) noexcept {
DCHECK(
streamType == StreamType::STREAM_HEADER_TRACK ||
streamType == StreamType::STREAM_HEADER_SUBGROUP);
streamType == StreamType::STREAM_HEADER_SUBGROUP ||
streamType == StreamType::FETCH_HEADER);
// TODO get rid of this
auto length = cursor.totalLength();
ObjectHeader objectHeader = headerTemplate;
if (streamType == StreamType::STREAM_HEADER_TRACK) {
if (streamType == StreamType::STREAM_HEADER_TRACK ||
streamType == StreamType::FETCH_HEADER) {
auto group = quic::decodeQuicInteger(cursor, length);
if (!group) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
Expand All @@ -302,12 +314,28 @@ folly::Expected<ObjectHeader, ErrorCode> parseMultiObjectHeader(
} else {
objectHeader.forwardPreference = ForwardPreference::Subgroup;
}
if (streamType == StreamType::FETCH_HEADER) {
objectHeader.forwardPreference = ForwardPreference::Fetch;
auto subgroup = quic::decodeQuicInteger(cursor, length);
if (!subgroup) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
length -= subgroup->second;
objectHeader.subgroup = subgroup->first;
}
auto id = quic::decodeQuicInteger(cursor, length);
if (!id) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
length -= id->second;
objectHeader.id = id->first;
if (streamType == StreamType::FETCH_HEADER) {
if (length < 2) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
objectHeader.priority = cursor.readBE<uint8_t>();
length--;
}
auto payloadLength = quic::decodeQuicInteger(cursor, length);
if (!payloadLength) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
Expand Down Expand Up @@ -1154,6 +1182,9 @@ WriteResult writeStreamHeader(
folly::to_underlying(StreamType::STREAM_HEADER_SUBGROUP),
size,
error);
} else if (objectHeader.forwardPreference == ForwardPreference::Fetch) {
writeVarint(
writeBuf, folly::to_underlying(StreamType::FETCH_HEADER), size, error);
} else {
LOG(FATAL) << "Unsupported forward preference to stream header";
}
Expand All @@ -1162,7 +1193,9 @@ WriteResult writeStreamHeader(
writeVarint(writeBuf, objectHeader.group, size, error);
writeVarint(writeBuf, objectHeader.subgroup, size, error);
}
writeVarint(writeBuf, objectHeader.priority, size, error);
if (objectHeader.forwardPreference != ForwardPreference::Fetch) {
writeVarint(writeBuf, objectHeader.priority, size, error);
}
if (error) {
return folly::makeUnexpected(quic::TransportErrorCode::INTERNAL_ERROR);
}
Expand Down Expand Up @@ -1199,12 +1232,16 @@ WriteResult writeObject(
if (objectHeader.forwardPreference != ForwardPreference::Subgroup) {
writeVarint(writeBuf, objectHeader.group, size, error);
}
if (objectHeader.forwardPreference == ForwardPreference::Fetch) {
writeVarint(writeBuf, objectHeader.subgroup, size, error);
}
writeVarint(writeBuf, objectHeader.id, size, error);
CHECK(
objectHeader.status != ObjectStatus::NORMAL ||
(objectHeader.length && *objectHeader.length > 0))
<< "Normal objects require non-zero length";
if (objectHeader.forwardPreference == ForwardPreference::Datagram) {
if (objectHeader.forwardPreference == ForwardPreference::Datagram ||
objectHeader.forwardPreference == ForwardPreference::Fetch) {
writeBuf.append(&objectHeader.priority, 1);
size += 1;
}
Expand Down
6 changes: 5 additions & 1 deletion moxygen/MoQFramer.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ enum class StreamType : uint64_t {
OBJECT_DATAGRAM = 1,
STREAM_HEADER_TRACK = 0x2,
STREAM_HEADER_SUBGROUP = 0x4,
FETCH_HEADER = 0x5,
CONTROL = 100000000
};

Expand Down Expand Up @@ -168,7 +169,7 @@ folly::Expected<ServerSetup, ErrorCode> parseServerSetup(
folly::io::Cursor& cursor,
size_t length) noexcept;

enum class ForwardPreference : uint8_t { Track, Subgroup, Datagram };
enum class ForwardPreference : uint8_t { Track, Subgroup, Datagram, Fetch };

enum class ObjectStatus : uint64_t {
NORMAL = 0,
Expand Down Expand Up @@ -264,6 +265,9 @@ folly::Expected<ObjectHeader, ErrorCode> parseObjectHeader(
folly::io::Cursor& cursor,
size_t length) noexcept;

folly::Expected<uint64_t, ErrorCode> parseFetchHeader(
folly::io::Cursor& cursor) noexcept;

folly::Expected<ObjectHeader, ErrorCode> parseStreamHeader(
folly::io::Cursor& cursor,
StreamType streamType) noexcept;
Expand Down
8 changes: 0 additions & 8 deletions moxygen/MoQServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,6 @@ void MoQServer::ControlVisitor::operator()(FetchCancel fetchCancel) const {
XLOG(INFO) << "FetchCancel id=" << fetchCancel.subscribeID;
}

void MoQServer::ControlVisitor::operator()(FetchOk fetchOk) const {
XLOG(INFO) << "FetchOk id=" << fetchOk.subscribeID;
}

void MoQServer::ControlVisitor::operator()(FetchError fetchError) const {
XLOG(INFO) << "FetchError id=" << fetchError.subscribeID;
}

void MoQServer::ControlVisitor::operator()(SubscribeDone subscribeDone) const {
XLOG(INFO) << "SubscribeDone id=" << subscribeDone.subscribeID
<< " code=" << folly::to_underlying(subscribeDone.statusCode)
Expand Down
2 changes: 0 additions & 2 deletions moxygen/MoQServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ class MoQServer {
void operator()(MaxSubscribeId maxSubscribeId) const override;
void operator()(Fetch fetch) const override;
void operator()(FetchCancel fetchCancel) const override;
void operator()(FetchOk fetchOk) const override;
void operator()(FetchError fetchError) const override;
void operator()(Unannounce unannounce) const override;
void operator()(AnnounceCancel announceCancel) const override;
void operator()(SubscribeAnnounces subscribeAnnounces) const override;
Expand Down
Loading
Loading