Skip to content

Commit

Permalink
Introduce QuicWebTransport
Browse files Browse the repository at this point in the history
Summary: This is an implementation of WebTransport that uses raw QUIC rather than HTTP

Reviewed By: hanidamlaj

Differential Revision: D59586164

fbshipit-source-id: 504085dc905f959a3fd344b032ac53e46625e80a
  • Loading branch information
afrind authored and facebook-github-bot committed Oct 24, 2024
1 parent 6c0225d commit 07df503
Show file tree
Hide file tree
Showing 9 changed files with 629 additions and 0 deletions.
1 change: 1 addition & 0 deletions proxygen/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ add_subdirectory(http/connpool/test)
add_subdirectory(http/codec/test)
add_subdirectory(http/codec/compress/test)
add_subdirectory(http/session/test)
add_subdirectory(http/webtransport)
add_subdirectory(http/webtransport/test)
add_subdirectory(sampling/test)
add_subdirectory(services/test)
Expand Down
3 changes: 3 additions & 0 deletions proxygen/lib/http/session/test/MockQuicSocketDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,9 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
cb->onConnectionError(std::move(error));
}
}
auto& connState = streams_[kConnectionStreamId];
connState.readState = CLOSED;
connState.writeState = CLOSED;
}

void deliverWriteError(quic::StreamId id,
Expand Down
33 changes: 33 additions & 0 deletions proxygen/lib/http/webtransport/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

add_library(
quicwebtransport
QuicWebTransport.cpp
)
add_dependencies(
quicwebtransport
proxygen
)
target_include_directories(
quicwebtransport PUBLIC
$<BUILD_INTERFACE:${PROXYGEN_FBCODE_ROOT}>
$<BUILD_INTERFACE:${PROXYGEN_GENERATED_ROOT}>
$<INSTALL_INTERFACE:include/>
${LIBGMOCK_INCLUDE_DIR}
${LIBGTEST_INCLUDE_DIR}
)
target_compile_options(
quicwebtransport PRIVATE
${_PROXYGEN_COMMON_COMPILE_OPTIONS}
)
target_link_libraries(quicwebtransport PUBLIC proxygen)
install(
TARGETS quicwebtransport
EXPORT proxygen-exports
ARCHIVE DESTINATION ${LIB_INSTALL_DIR}
LIBRARY DESTINATION ${LIB_INSTALL_DIR}
)
256 changes: 256 additions & 0 deletions proxygen/lib/http/webtransport/QuicWebTransport.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <proxygen/lib/http/webtransport/QuicWebTransport.h>

using FCState = proxygen::WebTransportImpl::TransportProvider::FCState;

namespace proxygen {

void QuicWebTransport::onFlowControlUpdate(quic::StreamId /*id*/) noexcept {
}

void QuicWebTransport::onNewBidirectionalStream(quic::StreamId id) noexcept {
XCHECK(quicSocket_);
if (!handler_) {
resetWebTransportEgress(id, WebTransport::kInternalError);
stopReadingWebTransportIngress(id, WebTransport::kInternalError);
return;
}
auto handle = WebTransportImpl::onWebTransportBidiStream(id);
handler_->onNewBidiStream(
WebTransport::BidiStreamHandle({handle.readHandle, handle.writeHandle}));
quicSocket_->setReadCallback(id, handle.readHandle);
}

void QuicWebTransport::onNewUnidirectionalStream(quic::StreamId id) noexcept {
XCHECK(quicSocket_);
if (!handler_) {
LOG(ERROR) << "Handler not set";
stopReadingWebTransportIngress(id, WebTransport::kInternalError);
return;
}
auto readHandle = WebTransportImpl::onWebTransportUniStream(id);
handler_->onNewUniStream(readHandle);
quicSocket_->setReadCallback(id, readHandle);
}

void QuicWebTransport::onStopSending(
quic::StreamId id, quic::ApplicationErrorCode errorCode) noexcept {
onWebTransportStopSending(id, static_cast<uint32_t>(errorCode));
}

void QuicWebTransport::onConnectionEnd() noexcept {
onConnectionEndImpl(folly::none);
}

void QuicWebTransport::onConnectionError(quic::QuicError error) noexcept {
onConnectionEndImpl(error);
}
void QuicWebTransport::onConnectionEnd(quic::QuicError error) noexcept {
onConnectionEndImpl(error);
}

void QuicWebTransport::onConnectionEndImpl(
folly::Optional<quic::QuicError> error) {
destroy();
folly::Optional<uint32_t> wtError;
if (error) {
if (error->code.type() == quic::QuicErrorCode::Type::ApplicationErrorCode) {
wtError = static_cast<uint32_t>(*error->code.asApplicationErrorCode());
} else {
XLOG(ERR) << "QUIC Connection Error: " << *error;
wtError = std::numeric_limits<uint32_t>::max();
}
}
quicSocket_.reset();
handler_->onSessionEnd(wtError);
}

folly::Expected<HTTPCodec::StreamID, WebTransport::ErrorCode>
QuicWebTransport::newWebTransportBidiStream() {
XCHECK(quicSocket_);
auto id = quicSocket_->createBidirectionalStream();
if (id.hasError()) {
return folly::makeUnexpected(ErrorCode::GENERIC_ERROR);
}
return id.value();
}

folly::Expected<HTTPCodec::StreamID, WebTransport::ErrorCode>
QuicWebTransport::newWebTransportUniStream() {
XCHECK(quicSocket_);
auto id = quicSocket_->createUnidirectionalStream();
if (id.hasError()) {
return folly::makeUnexpected(ErrorCode::GENERIC_ERROR);
}
return id.value();
}

folly::Expected<WebTransportImpl::TransportProvider::FCState,
WebTransport::ErrorCode>
QuicWebTransport::sendWebTransportStreamData(HTTPCodec::StreamID id,
std::unique_ptr<folly::IOBuf> data,
bool eof,
quic::StreamWriteCallback* wcb) {
XCHECK(quicSocket_);
auto res = quicSocket_->writeChain(id, std::move(data), eof);
if (!res) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
auto flowControl = quicSocket_->getStreamFlowControl(id);
if (!flowControl) {
LOG(ERROR) << "Failed to get flow control";
return folly::makeUnexpected(WebTransport::ErrorCode::SEND_ERROR);
}
if (!eof && flowControl->sendWindowAvailable == 0) {
quicSocket_->notifyPendingWriteOnStream(id, wcb);
VLOG(4) << "Closing fc window";
return FCState::BLOCKED;
} else {
return FCState::UNBLOCKED;
}
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
QuicWebTransport::resetWebTransportEgress(HTTPCodec::StreamID id,
uint32_t errorCode) {
XCHECK(quicSocket_);
auto res = quicSocket_->resetStream(id, errorCode);
if (!res) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return folly::unit;
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
QuicWebTransport::setWebTransportStreamPriority(HTTPCodec::StreamID id,
HTTPPriority pri) {

XCHECK(quicSocket_);
auto res = quicSocket_->setStreamPriority(
id, quic::Priority(pri.urgency, pri.incremental, pri.orderId));
if (res.hasError()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}

return folly::unit;
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
QuicWebTransport::pauseWebTransportIngress(HTTPCodec::StreamID id) {
XCHECK(quicSocket_);
auto res = quicSocket_->pauseRead(id);
if (res.hasError()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return folly::unit;
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
QuicWebTransport::resumeWebTransportIngress(HTTPCodec::StreamID id) {
XCHECK(quicSocket_);
auto res = quicSocket_->resumeRead(id);
if (res.hasError()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return folly::unit;
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
QuicWebTransport::stopReadingWebTransportIngress(HTTPCodec::StreamID id,
uint32_t errorCode) {
XCHECK(quicSocket_);
auto res = quicSocket_->setReadCallback(
id, nullptr, quic::ApplicationErrorCode(errorCode));
if (res.hasError()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return folly::unit;
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
QuicWebTransport::sendDatagram(std::unique_ptr<folly::IOBuf> datagram) {
XCHECK(quicSocket_);
auto writeRes = quicSocket_->writeDatagram(std::move(datagram));
if (writeRes.hasError()) {
LOG(ERROR) << "Failed to send datagram";
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return folly::unit;
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
QuicWebTransport::closeSession(folly::Optional<uint32_t> error) {
if (quicSocket_) {
if (error) {
quicSocket_->close(quic::QuicError(quic::ApplicationErrorCode(*error)));
} else {
quicSocket_->close(quic::QuicError(quic::ApplicationErrorCode(0)));
}
quicSocket_.reset();
} // else we came from connectionEnd/Error and quicSocket_ is reset
return folly::unit;
}

void QuicWebTransport::onUnidirectionalStreamsAvailable(
uint64_t numStreamsAvailable) noexcept {
if (numStreamsAvailable > 0 && waitingForUniStreams_) {
waitingForUniStreams_->setValue(folly::unit);
waitingForUniStreams_.reset();
}
}

folly::SemiFuture<folly::Unit> QuicWebTransport::awaitUniStreamCredit() {
XCHECK(quicSocket_);
auto numOpenable = quicSocket_->getNumOpenableUnidirectionalStreams();
if (numOpenable > 0) {
return folly::makeFuture(folly::unit);
}
CHECK(!waitingForUniStreams_);
auto [promise, future] = folly::makePromiseContract<folly::Unit>();
waitingForUniStreams_ = std::move(promise);
return std::move(future);
}

void QuicWebTransport::onBidirectionalStreamsAvailable(
uint64_t numStreamsAvailable) noexcept {
if (numStreamsAvailable > 0 && waitingForBidiStreams_) {
waitingForBidiStreams_->setValue(folly::unit);
waitingForBidiStreams_.reset();
}
}

folly::SemiFuture<folly::Unit> QuicWebTransport::awaitBidiStreamCredit() {
XCHECK(quicSocket_);
auto numOpenable = quicSocket_->getNumOpenableBidirectionalStreams();
if (numOpenable > 0) {
return folly::makeFuture(folly::unit);
}
CHECK(!waitingForBidiStreams_);
auto [promise, future] = folly::makePromiseContract<folly::Unit>();
waitingForBidiStreams_ = std::move(promise);
return std::move(future);
}

void QuicWebTransport::onDatagramsAvailable() noexcept {
XCHECK(quicSocket_);
auto result = quicSocket_->readDatagramBufs();
if (result.hasError()) {
LOG(ERROR) << "Got error while reading datagrams: error="
<< toString(result.error());
closeSession(0);
return;
}
VLOG(4) << "Received " << result.value().size() << " datagrams";
for (auto& datagram : result.value()) {
handler_->onDatagram(std::move(datagram));
}
}

} // namespace proxygen
Loading

0 comments on commit 07df503

Please sign in to comment.