Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions proxygen/lib/http/webtransport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,33 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

add_library(
wt_stream_manager
WtStreamManager.cpp
WtEgressContainer.cpp
)
add_dependencies(
wt_stream_manager
proxygen
)
target_include_directories(
wt_stream_manager PUBLIC
$<BUILD_INTERFACE:${PROXYGEN_FBCODE_ROOT}>
$<BUILD_INTERFACE:${PROXYGEN_GENERATED_ROOT}>
$<INSTALL_INTERFACE:include/>
)
target_compile_options(
wt_stream_manager PRIVATE
${_PROXYGEN_COMMON_COMPILE_OPTIONS}
)
target_link_libraries(wt_stream_manager PUBLIC proxygen)
install(
TARGETS wt_stream_manager
EXPORT proxygen-exports
ARCHIVE DESTINATION ${LIB_INSTALL_DIR}
LIBRARY DESTINATION ${LIB_INSTALL_DIR}
)

add_library(
quicwebtransport
QuicWebTransport.cpp
Expand Down
25 changes: 23 additions & 2 deletions proxygen/lib/http/webtransport/WtEgressContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,31 @@ WtBufferedStreamData::FcRes WtBufferedStreamData::enqueue(
WtBufferedStreamData::DequeueResult WtBufferedStreamData::dequeue(
uint64_t atMost) noexcept {
// min of maxBytes and how many bytes remaining in egress window
atMost = std::min({atMost, window_.getAvailable(), data_.chainLength()});
atMost = std::min(atMost,
std::min(uint64_t(window_.getAvailable()),
uint64_t(data_.chainLength())));
DequeueResult res;
res.data = data_.splitAtMost(atMost);
res.fin = data_.empty() && std::exchange(fin_, false);
// Send FIN only if data is empty, fin is pending, and we haven't sent it yet
res.fin = data_.empty() && fin_ && !finSent_;
if (res.fin) {
// IMPORTANT: To maintain stable comparison key for
// WtStreamManager::Compare, we need onlyFinPending() = data_.empty() &&
// (fin_ || finSent_) to remain constant during this dequeue call. When we
// send last chunk + fin together:
// - Before: data_.empty()=false -> key=false
// - After: data_.empty()=true -> would become key=true
// To keep key=false, clear both fin_ and finSent_ after sending last chunk
// + fin. For fin-only streams, keep fin_=true, finSent_=true -> key remains
// true.
if (res.data && res.data->computeChainDataLength() > 0) {
// Last chunk + fin case: preserve key=false (finSent_ already false)
fin_ = false;
} else {
// Fin only: mark as sent to preserve key=true
finSent_ = true;
}
}
window_.commit(atMost);
return res;
}
Expand Down
12 changes: 8 additions & 4 deletions proxygen/lib/http/webtransport/WtEgressContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,27 @@ class WtBufferedStreamData {
}

bool hasData() const {
return !data_.empty() || fin_;
return !data_.empty() || (fin_ && !finSent_);
}

// we can send data if there is either data & available stream fc, or only fin
// pending
bool canSendData() const {
return (hasData() && window_.getAvailable()) || (data_.empty() && fin_);
return hasData() && (window_.getAvailable() || data_.empty());
}

// returns true if there's only a pending fin
// returns true if there's only a pending fin (or fin already sent)
// NOTE: This is used by Compare function in WtStreamManager - must remain
// stable during dequeue to avoid corrupting the std::set
bool onlyFinPending() const {
return data_.empty() && fin_;
return data_.empty() && (fin_ || finSent_);
}

private:
folly::IOBufQueue data_{folly::IOBufQueue::cacheChainLength()};
bool fin_{false};
bool finSent_{false}; // Track if FIN has been sent (keep fin_ stable for
// Compare)
BufferedFlowController window_;
};

Expand Down
36 changes: 26 additions & 10 deletions proxygen/lib/http/webtransport/WtStreamManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,34 +565,45 @@ Result WtStreamManager::enqueue(WtReadHandle& rh, StreamData data) noexcept {

StreamData WtStreamManager::dequeue(WtWriteHandle& wh,
uint64_t atMost) noexcept {
// Save stream ID before dequeue, in case wh gets removed from writableStreams
uint64_t streamId = wh.getID();
// we're limited by conn egress fc
atMost = std::min(atMost, connSendFc_.getAvailable());
uint64_t connAvail = connSendFc_.getAvailable();
atMost = std::min(atMost, connAvail);
auto res = writehandle_ref_cast(wh).dequeue(atMost);
// TODO(@damlaj): return len to elide unnecessarily computing chain len
auto len = computeChainLength(res.data);
// commit len bytes to conn window
connSendFc_.commit(len);
XLOG(DBG8) << __func__ << "; atMost=" << atMost << "; len=" << len
<< "; fin=" << res.fin;
XLOG(DBG8) << __func__ << "; stream=" << streamId << "; len=" << len
<< "; fin=" << res.fin << "; atMost=" << atMost;
return res;
}

WtStreamManager::WtWriteHandle* WtStreamManager::nextWritable() const noexcept {
WriteHandle* wh = !writableStreams_.empty()
? writehandle_ptr_cast(*writableStreams_.begin())
: nullptr;
uint64_t connAvail = connSendFc_.getAvailable();
bool onlyFin = wh && wh->bufferedSendData_.onlyFinPending();
// streams with only a pending fin should be yielded even if connection-level
// flow control window is blocked
return (wh && (connSendFc_.getAvailable() > 0 ||
wh->bufferedSendData_.onlyFinPending()))
? wh
: nullptr;
auto result = (wh && (connAvail > 0 || onlyFin)) ? wh : nullptr;
XLOG(DBG6) << __func__ << "; size=" << writableStreams_.size()
<< "; connAvail=" << connAvail
<< "; result=" << (result ? result->getID() : 999999)
<< "; onlyFin=" << onlyFin;
return result;
}

void WtStreamManager::onStreamWritable(WtWriteHandle& wh) noexcept {
bool wasEmpty = !hasEvent();
writableStreams_.insert(&wh);
XLOG(DBG6) << __func__ << "; stream=" << wh.getID()
<< "; wasEmpty=" << wasEmpty
<< "; size=" << writableStreams_.size();
if (wasEmpty && hasEvent()) {
XLOG(DBG6) << __func__ << "; calling eventsAvailable()";
egressCb_.eventsAvailable();
}
}
Expand Down Expand Up @@ -821,8 +832,8 @@ WriteHandle::writeStreamData(std::unique_ptr<folly::IOBuf> data,
XLOG_IF(ERR, !(len || fin)) << "no-op writeStreamData";
bool connBlocked = smAccessor_.connSend().buffer(len);
bool streamBlocked = bufferedSendData_.enqueue(std::move(data), fin);
XLOG(DBG6) << __func__ << "; len=" << len << "; fin=" << fin
<< "; connBlocked=" << connBlocked
XLOG(DBG6) << __func__ << "; stream=" << getID() << "; len=" << len
<< "; fin=" << fin << "; connBlocked=" << connBlocked
<< "; streamBlocked=" << streamBlocked;
if (bufferedSendData_.canSendData()) {
smAccessor_.onStreamWritable(*this); // stream is now writable
Expand All @@ -840,7 +851,7 @@ folly::Expected<folly::Unit, WriteHandle::ErrCode> WriteHandle::resetStream(

folly::Expected<folly::Unit, WriteHandle::ErrCode> WriteHandle::setPriority(
uint8_t level, uint32_t order, bool incremental) {
XLOG(FATAL) << "not implemented";
return folly::unit;
}

Result WriteHandle::onMaxData(uint64_t offset) {
Expand Down Expand Up @@ -874,12 +885,17 @@ StreamData WriteHandle::dequeue(uint64_t atMost) noexcept {
XCHECK_NE(state_, Closed) << "dequeue after close";
auto res = bufferedSendData_.dequeue(atMost);
const auto bufferAvailable = bufferedSendData_.window().getBufferAvailable();
XLOG(DBG6) << __func__ << "; stream=" << getID()
<< "; len=" << computeChainLength(res.data) << "; fin=" << res.fin
<< "; bufferAvail=" << bufferAvailable;
if (bufferAvailable > 0) {
if (auto p = resetPromise(); p.valid()) {
p.setValue(bufferAvailable);
}
}
if (!bufferedSendData_.canSendData()) {
XLOG(DBG6) << __func__ << "; stream=" << getID()
<< "; removing from writableStreams";
smAccessor_.writableStreams().erase(this);
}
finish(res.fin);
Expand Down
Loading