diff --git a/proxygen/lib/http/webtransport/CMakeLists.txt b/proxygen/lib/http/webtransport/CMakeLists.txt index d057b3ca7d..1e7f03e7e8 100644 --- a/proxygen/lib/http/webtransport/CMakeLists.txt +++ b/proxygen/lib/http/webtransport/CMakeLists.txt @@ -4,6 +4,33 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. +add_library( + wt_stream_manager + WtStreamManager.cpp + WtEgressContainer.cpp +) +add_dependencies( + wt_stream_manager + proxygen +) +target_include_directories( + wt_stream_manager PUBLIC + $ + $ + $ +) +target_compile_options( + wt_stream_manager PRIVATE + ${_PROXYGEN_COMMON_COMPILE_OPTIONS} +) +target_link_libraries(wt_stream_manager PUBLIC proxygen) +install( + TARGETS wt_stream_manager + EXPORT proxygen-exports + ARCHIVE DESTINATION ${LIB_INSTALL_DIR} + LIBRARY DESTINATION ${LIB_INSTALL_DIR} +) + add_library( quicwebtransport QuicWebTransport.cpp diff --git a/proxygen/lib/http/webtransport/WtEgressContainer.cpp b/proxygen/lib/http/webtransport/WtEgressContainer.cpp index 4bda9582ef..733c09c57e 100644 --- a/proxygen/lib/http/webtransport/WtEgressContainer.cpp +++ b/proxygen/lib/http/webtransport/WtEgressContainer.cpp @@ -23,10 +23,31 @@ WtBufferedStreamData::FcRes WtBufferedStreamData::enqueue( WtBufferedStreamData::DequeueResult WtBufferedStreamData::dequeue( uint64_t atMost) noexcept { // min of maxBytes and how many bytes remaining in egress window - atMost = std::min({atMost, window_.getAvailable(), data_.chainLength()}); + atMost = std::min(atMost, + std::min(uint64_t(window_.getAvailable()), + uint64_t(data_.chainLength()))); DequeueResult res; res.data = data_.splitAtMost(atMost); - res.fin = data_.empty() && std::exchange(fin_, false); + // Send FIN only if data is empty, fin is pending, and we haven't sent it yet + res.fin = data_.empty() && fin_ && !finSent_; + if (res.fin) { + // IMPORTANT: To maintain stable comparison key for + // WtStreamManager::Compare, we need onlyFinPending() = data_.empty() && + // (fin_ || finSent_) to remain constant during this dequeue call. When we + // send last chunk + fin together: + // - Before: data_.empty()=false -> key=false + // - After: data_.empty()=true -> would become key=true + // To keep key=false, clear both fin_ and finSent_ after sending last chunk + // + fin. For fin-only streams, keep fin_=true, finSent_=true -> key remains + // true. + if (res.data && res.data->computeChainDataLength() > 0) { + // Last chunk + fin case: preserve key=false (finSent_ already false) + fin_ = false; + } else { + // Fin only: mark as sent to preserve key=true + finSent_ = true; + } + } window_.commit(atMost); return res; } diff --git a/proxygen/lib/http/webtransport/WtEgressContainer.h b/proxygen/lib/http/webtransport/WtEgressContainer.h index a85783f2f0..6096d35e8d 100644 --- a/proxygen/lib/http/webtransport/WtEgressContainer.h +++ b/proxygen/lib/http/webtransport/WtEgressContainer.h @@ -109,23 +109,27 @@ class WtBufferedStreamData { } bool hasData() const { - return !data_.empty() || fin_; + return !data_.empty() || (fin_ && !finSent_); } // we can send data if there is either data & available stream fc, or only fin // pending bool canSendData() const { - return (hasData() && window_.getAvailable()) || (data_.empty() && fin_); + return hasData() && (window_.getAvailable() || data_.empty()); } - // returns true if there's only a pending fin + // returns true if there's only a pending fin (or fin already sent) + // NOTE: This is used by Compare function in WtStreamManager - must remain + // stable during dequeue to avoid corrupting the std::set bool onlyFinPending() const { - return data_.empty() && fin_; + return data_.empty() && (fin_ || finSent_); } private: folly::IOBufQueue data_{folly::IOBufQueue::cacheChainLength()}; bool fin_{false}; + bool finSent_{false}; // Track if FIN has been sent (keep fin_ stable for + // Compare) BufferedFlowController window_; }; diff --git a/proxygen/lib/http/webtransport/WtStreamManager.cpp b/proxygen/lib/http/webtransport/WtStreamManager.cpp index 002820b96e..5345d14e4c 100644 --- a/proxygen/lib/http/webtransport/WtStreamManager.cpp +++ b/proxygen/lib/http/webtransport/WtStreamManager.cpp @@ -565,15 +565,18 @@ Result WtStreamManager::enqueue(WtReadHandle& rh, StreamData data) noexcept { StreamData WtStreamManager::dequeue(WtWriteHandle& wh, uint64_t atMost) noexcept { + // Save stream ID before dequeue, in case wh gets removed from writableStreams + uint64_t streamId = wh.getID(); // we're limited by conn egress fc - atMost = std::min(atMost, connSendFc_.getAvailable()); + uint64_t connAvail = connSendFc_.getAvailable(); + atMost = std::min(atMost, connAvail); auto res = writehandle_ref_cast(wh).dequeue(atMost); // TODO(@damlaj): return len to elide unnecessarily computing chain len auto len = computeChainLength(res.data); // commit len bytes to conn window connSendFc_.commit(len); - XLOG(DBG8) << __func__ << "; atMost=" << atMost << "; len=" << len - << "; fin=" << res.fin; + XLOG(DBG8) << __func__ << "; stream=" << streamId << "; len=" << len + << "; fin=" << res.fin << "; atMost=" << atMost; return res; } @@ -581,18 +584,26 @@ WtStreamManager::WtWriteHandle* WtStreamManager::nextWritable() const noexcept { WriteHandle* wh = !writableStreams_.empty() ? writehandle_ptr_cast(*writableStreams_.begin()) : nullptr; + uint64_t connAvail = connSendFc_.getAvailable(); + bool onlyFin = wh && wh->bufferedSendData_.onlyFinPending(); // streams with only a pending fin should be yielded even if connection-level // flow control window is blocked - return (wh && (connSendFc_.getAvailable() > 0 || - wh->bufferedSendData_.onlyFinPending())) - ? wh - : nullptr; + auto result = (wh && (connAvail > 0 || onlyFin)) ? wh : nullptr; + XLOG(DBG6) << __func__ << "; size=" << writableStreams_.size() + << "; connAvail=" << connAvail + << "; result=" << (result ? result->getID() : 999999) + << "; onlyFin=" << onlyFin; + return result; } void WtStreamManager::onStreamWritable(WtWriteHandle& wh) noexcept { bool wasEmpty = !hasEvent(); writableStreams_.insert(&wh); + XLOG(DBG6) << __func__ << "; stream=" << wh.getID() + << "; wasEmpty=" << wasEmpty + << "; size=" << writableStreams_.size(); if (wasEmpty && hasEvent()) { + XLOG(DBG6) << __func__ << "; calling eventsAvailable()"; egressCb_.eventsAvailable(); } } @@ -821,8 +832,8 @@ WriteHandle::writeStreamData(std::unique_ptr data, XLOG_IF(ERR, !(len || fin)) << "no-op writeStreamData"; bool connBlocked = smAccessor_.connSend().buffer(len); bool streamBlocked = bufferedSendData_.enqueue(std::move(data), fin); - XLOG(DBG6) << __func__ << "; len=" << len << "; fin=" << fin - << "; connBlocked=" << connBlocked + XLOG(DBG6) << __func__ << "; stream=" << getID() << "; len=" << len + << "; fin=" << fin << "; connBlocked=" << connBlocked << "; streamBlocked=" << streamBlocked; if (bufferedSendData_.canSendData()) { smAccessor_.onStreamWritable(*this); // stream is now writable @@ -840,7 +851,7 @@ folly::Expected WriteHandle::resetStream( folly::Expected WriteHandle::setPriority( uint8_t level, uint32_t order, bool incremental) { - XLOG(FATAL) << "not implemented"; + return folly::unit; } Result WriteHandle::onMaxData(uint64_t offset) { @@ -874,12 +885,17 @@ StreamData WriteHandle::dequeue(uint64_t atMost) noexcept { XCHECK_NE(state_, Closed) << "dequeue after close"; auto res = bufferedSendData_.dequeue(atMost); const auto bufferAvailable = bufferedSendData_.window().getBufferAvailable(); + XLOG(DBG6) << __func__ << "; stream=" << getID() + << "; len=" << computeChainLength(res.data) << "; fin=" << res.fin + << "; bufferAvail=" << bufferAvailable; if (bufferAvailable > 0) { if (auto p = resetPromise(); p.valid()) { p.setValue(bufferAvailable); } } if (!bufferedSendData_.canSendData()) { + XLOG(DBG6) << __func__ << "; stream=" << getID() + << "; removing from writableStreams"; smAccessor_.writableStreams().erase(this); } finish(res.fin);