diff --git a/CMakePresets.json b/CMakePresets.json new file mode 100644 index 000000000..e0445e306 --- /dev/null +++ b/CMakePresets.json @@ -0,0 +1,14 @@ +{ + "version": 2, + "configurePresets": [ + { + "name": "vcpkg", + "generator": "Ninja", + "binaryDir": "${sourceDir}/build", + "cacheVariables": { + "CMAKE_TOOLCHAIN_FILE": "$env{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake", + "CMAKE_BUILD_TYPE": "Debug" + } + } + ] +} diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index f29151d9a..105bcef2f 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -75,14 +75,9 @@ namespace libp2p::transport::lsquic { Engine *engine; lsquic_stream_t *ls_stream; std::weak_ptr stream{}; - /** - * Stream read operation arguments. - */ - struct Reading { - BytesOut out; - std::function)> cb; - }; - std::optional reading{}; + std::optional> reading{}; + std::optional> writing{}; + bool want_flush = false; }; using OnAccept = std::function)>; @@ -118,9 +113,11 @@ namespace libp2p::transport::lsquic { void onAccept(OnAccept cb) { on_accept_ = std::move(cb); } - void process(); + void wantProcess(); + void wantFlush(StreamCtx *stream_ctx); private: + void process(); void readLoop(); std::shared_ptr io_context_; @@ -134,6 +131,8 @@ namespace libp2p::transport::lsquic { lsquic_engine_t *engine_ = nullptr; OnAccept on_accept_; bool started_ = false; + std::deque> want_flush_; + bool want_process_ = false; std::optional connecting_; struct Reading { static constexpr size_t kMaxUdpPacketSize = 64 << 10; diff --git a/include/libp2p/transport/quic/stream.hpp b/include/libp2p/transport/quic/stream.hpp index 453a443a8..e45bf2062 100644 --- a/include/libp2p/transport/quic/stream.hpp +++ b/include/libp2p/transport/quic/stream.hpp @@ -19,6 +19,8 @@ namespace libp2p::transport::lsquic { namespace libp2p::connection { class QuicStream : public Stream, public std::enable_shared_from_this { + friend libp2p::transport::lsquic::Engine; + public: QuicStream(std::shared_ptr conn, transport::lsquic::StreamCtx *stream_ctx, diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 267a8d761..f1e046052 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -45,6 +45,7 @@ namespace libp2p::transport::lsquic { lsquic_engine_settings settings{}; lsquic_engine_init_settings(&settings, flags); + settings.es_versions = 1 << LSQVER_I001; settings.es_init_max_stream_data_bidi_remote = mux_config.maximum_window_size; settings.es_init_max_stream_data_bidi_local = @@ -147,12 +148,15 @@ namespace libp2p::transport::lsquic { +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) auto stream_ctx = reinterpret_cast(_stream_ctx); - if (auto op = qtils::optionTake(stream_ctx->reading)) { - op->cb(QuicError::STREAM_CLOSED); - } if (auto stream = stream_ctx->stream.lock()) { stream->onClose(); } + if (auto reading = qtils::optionTake(stream_ctx->reading)) { + reading.value()(); + } + if (auto writing = qtils::optionTake(stream_ctx->writing)) { + writing.value()(); + } // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) delete stream_ctx; }; @@ -161,14 +165,18 @@ namespace libp2p::transport::lsquic { lsquic_stream_wantread(stream, 0); // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) auto stream_ctx = reinterpret_cast(_stream_ctx); - auto op = qtils::optionTake(stream_ctx->reading).value(); - auto n = lsquic_stream_read(stream, op.out.data(), op.out.size()); - outcome::result r = QuicError::STREAM_CLOSED; - if (n > 0) { - r = n; + if (auto reading = qtils::optionTake(stream_ctx->reading)) { + reading.value()(); + } + }; + stream_if.on_write = + +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { + lsquic_stream_wantwrite(stream, 0); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + auto stream_ctx = reinterpret_cast(_stream_ctx); + if (auto writing = qtils::optionTake(stream_ctx->writing)) { + writing.value()(); } - post(*stream_ctx->engine->io_context_, - [cb{std::move(op.cb)}, r] { cb(r); }); }; lsquic_engine_api api{}; @@ -261,7 +269,7 @@ namespace libp2p::transport::lsquic { if (auto op = qtils::optionTake(connecting_)) { op->cb(QuicError::CANT_CREATE_CONNECTION); } - process(); + wantProcess(); } outcome::result> Engine::newStream( @@ -281,7 +289,44 @@ namespace libp2p::transport::lsquic { return stream; } + void Engine::wantProcess() { + if (want_process_) { + return; + } + want_process_ = true; + boost::asio::post(*io_context_, [weak_self{weak_from_this()}] { + if (auto self = weak_self.lock()) { + self->process(); + } + }); + } + + void Engine::wantFlush(StreamCtx *stream_ctx) { + if (stream_ctx->want_flush) { + return; + } + stream_ctx->want_flush = true; + if (stream_ctx->stream.expired()) { + return; + } + want_flush_.emplace_back(stream_ctx->stream); + wantProcess(); + } + void Engine::process() { + want_process_ = false; + auto want_flush = std::exchange(want_flush_, {}); + for (auto &weak_stream : want_flush) { + auto stream = weak_stream.lock(); + if (not stream) { + continue; + } + if (stream->stream_ctx_->ls_stream == nullptr) { + continue; + } + stream->stream_ctx_->want_flush = false; + lsquic_stream_flush(stream->stream_ctx_->ls_stream); + } lsquic_engine_process_conns(engine_); int us = 0; if (not lsquic_engine_earliest_adv_tick(engine_, &us)) { diff --git a/src/transport/quic/stream.cpp b/src/transport/quic/stream.cpp index 1adb3f527..4c923c8d0 100644 --- a/src/transport/quic/stream.cpp +++ b/src/transport/quic/stream.cpp @@ -34,7 +34,15 @@ namespace libp2p::connection { } auto n = lsquic_stream_read(stream_ctx_->ls_stream, out.data(), out.size()); if (n == -1 and errno == EWOULDBLOCK) { - stream_ctx_->reading.emplace(StreamCtx::Reading{out, std::move(cb)}); + stream_ctx_->reading.emplace( + [weak_self{weak_from_this()}, out, cb{std::move(cb)}]() mutable { + auto self = weak_self.lock(); + if (not self) { + cb(QuicError::STREAM_CLOSED); + return; + } + self->readSome(out, std::move(cb)); + }); lsquic_stream_wantread(stream_ctx_->ls_stream, 1); return; } @@ -54,11 +62,31 @@ namespace libp2p::connection { if (not stream_ctx_) { return cb(r); } + if (stream_ctx_->writing) { + throw std::logic_error{"QuicStream::writeSome already in progress"}; + } + // Missing from `lsquic_stream_write` documentation comment. + // Return value 0 means buffer is full. + // Call `lsquic_stream_wantwrite` and wait for `stream_if.on_write` + // callback, before calling `lsquic_stream_write` again. auto n = lsquic_stream_write(stream_ctx_->ls_stream, in.data(), in.size()); - if (n > 0 and lsquic_stream_flush(stream_ctx_->ls_stream) == 0) { + if (n == 0) { + stream_ctx_->writing.emplace( + [weak_self{weak_from_this()}, in, cb{std::move(cb)}]() mutable { + auto self = weak_self.lock(); + if (not self) { + cb(QuicError::STREAM_CLOSED); + return; + } + self->writeSome(in, std::move(cb)); + }); + lsquic_stream_wantwrite(stream_ctx_->ls_stream, 1); + return; + } + if (n > 0) { r = n; + stream_ctx_->engine->wantFlush(stream_ctx_); } - stream_ctx_->engine->process(); deferReadCallback(r, std::move(cb)); } diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json index 1ad42d251..0073f1a14 100644 --- a/vcpkg-configuration.json +++ b/vcpkg-configuration.json @@ -1,16 +1,9 @@ { "default-registry": { "kind": "git", - "baseline": "fe1cde61e971d53c9687cf9a46308f8f55da19fa", + "baseline": "897ba2ab4c4c776b985ab1f599548fcf3ae598ba", "repository": "https://github.com/microsoft/vcpkg" }, - "registries": [ - { - "kind": "artifact", - "location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip", - "name": "microsoft" - } - ], "overlay-ports": [ "vcpkg-overlay" ]