From 0662c695b1af4ccbd29557d53fcca563a4e7717f Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:17:45 +0500 Subject: [PATCH 1/6] cmake preset vcpkg Signed-off-by: turuslan --- CMakePresets.json | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 CMakePresets.json diff --git a/CMakePresets.json b/CMakePresets.json new file mode 100644 index 000000000..e0445e306 --- /dev/null +++ b/CMakePresets.json @@ -0,0 +1,14 @@ +{ + "version": 2, + "configurePresets": [ + { + "name": "vcpkg", + "generator": "Ninja", + "binaryDir": "${sourceDir}/build", + "cacheVariables": { + "CMAKE_TOOLCHAIN_FILE": "$env{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake", + "CMAKE_BUILD_TYPE": "Debug" + } + } + ] +} From 5edd59786005573390c8c40bc2a99857f4ee864f Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:17:59 +0500 Subject: [PATCH 2/6] update vcpkg baseline Signed-off-by: turuslan --- vcpkg-configuration.json | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json index 1ad42d251..0073f1a14 100644 --- a/vcpkg-configuration.json +++ b/vcpkg-configuration.json @@ -1,16 +1,9 @@ { "default-registry": { "kind": "git", - "baseline": "fe1cde61e971d53c9687cf9a46308f8f55da19fa", + "baseline": "897ba2ab4c4c776b985ab1f599548fcf3ae598ba", "repository": "https://github.com/microsoft/vcpkg" }, - "registries": [ - { - "kind": "artifact", - "location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip", - "name": "microsoft" - } - ], "overlay-ports": [ "vcpkg-overlay" ] From 8d733af65d6c4a371beb64430eb854f8de626a9f Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:20:34 +0500 Subject: [PATCH 3/6] simplify read Signed-off-by: turuslan --- include/libp2p/transport/quic/engine.hpp | 9 +-------- src/transport/quic/engine.cpp | 15 +++++---------- src/transport/quic/stream.cpp | 10 +++++++++- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index f29151d9a..6b73b2049 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -75,14 +75,7 @@ namespace libp2p::transport::lsquic { Engine *engine; lsquic_stream_t *ls_stream; std::weak_ptr stream{}; - /** - * Stream read operation arguments. - */ - struct Reading { - BytesOut out; - std::function)> cb; - }; - std::optional reading{}; + std::optional> reading{}; }; using OnAccept = std::function)>; diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 267a8d761..43b2e3997 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -147,12 +147,12 @@ namespace libp2p::transport::lsquic { +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) auto stream_ctx = reinterpret_cast(_stream_ctx); - if (auto op = qtils::optionTake(stream_ctx->reading)) { - op->cb(QuicError::STREAM_CLOSED); - } if (auto stream = stream_ctx->stream.lock()) { stream->onClose(); } + if (auto reading = qtils::optionTake(stream_ctx->reading)) { + reading.value()(); + } // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) delete stream_ctx; }; @@ -161,14 +161,9 @@ namespace libp2p::transport::lsquic { lsquic_stream_wantread(stream, 0); // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) auto stream_ctx = reinterpret_cast(_stream_ctx); - auto op = qtils::optionTake(stream_ctx->reading).value(); - auto n = lsquic_stream_read(stream, op.out.data(), op.out.size()); - outcome::result r = QuicError::STREAM_CLOSED; - if (n > 0) { - r = n; + if (auto reading = qtils::optionTake(stream_ctx->reading)) { + reading.value()(); } - post(*stream_ctx->engine->io_context_, - [cb{std::move(op.cb)}, r] { cb(r); }); }; lsquic_engine_api api{}; diff --git a/src/transport/quic/stream.cpp b/src/transport/quic/stream.cpp index 1adb3f527..f200139d9 100644 --- a/src/transport/quic/stream.cpp +++ b/src/transport/quic/stream.cpp @@ -34,7 +34,15 @@ namespace libp2p::connection { } auto n = lsquic_stream_read(stream_ctx_->ls_stream, out.data(), out.size()); if (n == -1 and errno == EWOULDBLOCK) { - stream_ctx_->reading.emplace(StreamCtx::Reading{out, std::move(cb)}); + stream_ctx_->reading.emplace( + [weak_self{weak_from_this()}, out, cb{std::move(cb)}]() mutable { + auto self = weak_self.lock(); + if (not self) { + cb(QuicError::STREAM_CLOSED); + return; + } + self->readSome(out, std::move(cb)); + }); lsquic_stream_wantread(stream_ctx_->ls_stream, 1); return; } From db84f1a7d2f15f114f1c7227d08290a2c434bfba Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:20:55 +0500 Subject: [PATCH 4/6] fix quic version Signed-off-by: turuslan --- src/transport/quic/engine.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 43b2e3997..cadb3d5c2 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -45,6 +45,7 @@ namespace libp2p::transport::lsquic { lsquic_engine_settings settings{}; lsquic_engine_init_settings(&settings, flags); + settings.es_versions = 1 << LSQVER_I001; settings.es_init_max_stream_data_bidi_remote = mux_config.maximum_window_size; settings.es_init_max_stream_data_bidi_local = From e25bdb657d2fe2c5abf6e10b31d291da142206e2 Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:22:05 +0500 Subject: [PATCH 5/6] batch defer process Signed-off-by: turuslan --- include/libp2p/transport/quic/engine.hpp | 4 +++- src/transport/quic/engine.cpp | 15 ++++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index 6b73b2049..c60f34d2e 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -111,9 +111,10 @@ namespace libp2p::transport::lsquic { void onAccept(OnAccept cb) { on_accept_ = std::move(cb); } - void process(); + void wantProcess(); private: + void process(); void readLoop(); std::shared_ptr io_context_; @@ -127,6 +128,7 @@ namespace libp2p::transport::lsquic { lsquic_engine_t *engine_ = nullptr; OnAccept on_accept_; bool started_ = false; + bool want_process_ = false; std::optional connecting_; struct Reading { static constexpr size_t kMaxUdpPacketSize = 64 << 10; diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index cadb3d5c2..38d7492ef 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -257,7 +257,7 @@ namespace libp2p::transport::lsquic { if (auto op = qtils::optionTake(connecting_)) { op->cb(QuicError::CANT_CREATE_CONNECTION); } - process(); + wantProcess(); } outcome::result> Engine::newStream( @@ -277,7 +277,20 @@ namespace libp2p::transport::lsquic { return stream; } + void Engine::wantProcess() { + if (want_process_) { + return; + } + want_process_ = true; + boost::asio::post(*io_context_, [weak_self{weak_from_this()}] { + if (auto self = weak_self.lock()) { + self->process(); + } + }); + } + void Engine::process() { + want_process_ = false; lsquic_engine_process_conns(engine_); int us = 0; if (not lsquic_engine_earliest_adv_tick(engine_, &us)) { From 404dc0dc7bf22644e2aed0701887f5ecfd1be4f6 Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:22:20 +0500 Subject: [PATCH 6/6] fix write Signed-off-by: turuslan --- include/libp2p/transport/quic/engine.hpp | 4 +++ include/libp2p/transport/quic/stream.hpp | 2 ++ src/transport/quic/engine.cpp | 36 ++++++++++++++++++++++++ src/transport/quic/stream.cpp | 24 ++++++++++++++-- 4 files changed, 64 insertions(+), 2 deletions(-) diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index c60f34d2e..105bcef2f 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -76,6 +76,8 @@ namespace libp2p::transport::lsquic { lsquic_stream_t *ls_stream; std::weak_ptr stream{}; std::optional> reading{}; + std::optional> writing{}; + bool want_flush = false; }; using OnAccept = std::function)>; @@ -112,6 +114,7 @@ namespace libp2p::transport::lsquic { on_accept_ = std::move(cb); } void wantProcess(); + void wantFlush(StreamCtx *stream_ctx); private: void process(); @@ -128,6 +131,7 @@ namespace libp2p::transport::lsquic { lsquic_engine_t *engine_ = nullptr; OnAccept on_accept_; bool started_ = false; + std::deque> want_flush_; bool want_process_ = false; std::optional connecting_; struct Reading { diff --git a/include/libp2p/transport/quic/stream.hpp b/include/libp2p/transport/quic/stream.hpp index 453a443a8..e45bf2062 100644 --- a/include/libp2p/transport/quic/stream.hpp +++ b/include/libp2p/transport/quic/stream.hpp @@ -19,6 +19,8 @@ namespace libp2p::transport::lsquic { namespace libp2p::connection { class QuicStream : public Stream, public std::enable_shared_from_this { + friend libp2p::transport::lsquic::Engine; + public: QuicStream(std::shared_ptr conn, transport::lsquic::StreamCtx *stream_ctx, diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 38d7492ef..f1e046052 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -154,6 +154,9 @@ namespace libp2p::transport::lsquic { if (auto reading = qtils::optionTake(stream_ctx->reading)) { reading.value()(); } + if (auto writing = qtils::optionTake(stream_ctx->writing)) { + writing.value()(); + } // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) delete stream_ctx; }; @@ -166,6 +169,15 @@ namespace libp2p::transport::lsquic { reading.value()(); } }; + stream_if.on_write = + +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { + lsquic_stream_wantwrite(stream, 0); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + auto stream_ctx = reinterpret_cast(_stream_ctx); + if (auto writing = qtils::optionTake(stream_ctx->writing)) { + writing.value()(); + } + }; lsquic_engine_api api{}; api.ea_settings = &settings; @@ -289,8 +301,32 @@ namespace libp2p::transport::lsquic { }); } + void Engine::wantFlush(StreamCtx *stream_ctx) { + if (stream_ctx->want_flush) { + return; + } + stream_ctx->want_flush = true; + if (stream_ctx->stream.expired()) { + return; + } + want_flush_.emplace_back(stream_ctx->stream); + wantProcess(); + } + void Engine::process() { want_process_ = false; + auto want_flush = std::exchange(want_flush_, {}); + for (auto &weak_stream : want_flush) { + auto stream = weak_stream.lock(); + if (not stream) { + continue; + } + if (stream->stream_ctx_->ls_stream == nullptr) { + continue; + } + stream->stream_ctx_->want_flush = false; + lsquic_stream_flush(stream->stream_ctx_->ls_stream); + } lsquic_engine_process_conns(engine_); int us = 0; if (not lsquic_engine_earliest_adv_tick(engine_, &us)) { diff --git a/src/transport/quic/stream.cpp b/src/transport/quic/stream.cpp index f200139d9..4c923c8d0 100644 --- a/src/transport/quic/stream.cpp +++ b/src/transport/quic/stream.cpp @@ -62,11 +62,31 @@ namespace libp2p::connection { if (not stream_ctx_) { return cb(r); } + if (stream_ctx_->writing) { + throw std::logic_error{"QuicStream::writeSome already in progress"}; + } + // Missing from `lsquic_stream_write` documentation comment. + // Return value 0 means buffer is full. + // Call `lsquic_stream_wantwrite` and wait for `stream_if.on_write` + // callback, before calling `lsquic_stream_write` again. auto n = lsquic_stream_write(stream_ctx_->ls_stream, in.data(), in.size()); - if (n > 0 and lsquic_stream_flush(stream_ctx_->ls_stream) == 0) { + if (n == 0) { + stream_ctx_->writing.emplace( + [weak_self{weak_from_this()}, in, cb{std::move(cb)}]() mutable { + auto self = weak_self.lock(); + if (not self) { + cb(QuicError::STREAM_CLOSED); + return; + } + self->writeSome(in, std::move(cb)); + }); + lsquic_stream_wantwrite(stream_ctx_->ls_stream, 1); + return; + } + if (n > 0) { r = n; + stream_ctx_->engine->wantFlush(stream_ctx_); } - stream_ctx_->engine->process(); deferReadCallback(r, std::move(cb)); }