Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CMakePresets.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"version": 2,
"configurePresets": [
{
"name": "vcpkg",
"generator": "Ninja",
"binaryDir": "${sourceDir}/build",
"cacheVariables": {
"CMAKE_TOOLCHAIN_FILE": "$env{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake",
"CMAKE_BUILD_TYPE": "Debug"
}
}
]
}
17 changes: 8 additions & 9 deletions include/libp2p/transport/quic/engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,9 @@ namespace libp2p::transport::lsquic {
Engine *engine;
lsquic_stream_t *ls_stream;
std::weak_ptr<QuicStream> stream{};
/**
* Stream read operation arguments.
*/
struct Reading {
BytesOut out;
std::function<void(outcome::result<size_t>)> cb;
};
std::optional<Reading> reading{};
std::optional<std::function<void()>> reading{};
std::optional<std::function<void()>> writing{};
bool want_flush = false;
};

using OnAccept = std::function<void(std::shared_ptr<QuicConnection>)>;
Expand Down Expand Up @@ -118,9 +113,11 @@ namespace libp2p::transport::lsquic {
void onAccept(OnAccept cb) {
on_accept_ = std::move(cb);
}
void process();
void wantProcess();
void wantFlush(StreamCtx *stream_ctx);

private:
void process();
void readLoop();

std::shared_ptr<boost::asio::io_context> io_context_;
Expand All @@ -134,6 +131,8 @@ namespace libp2p::transport::lsquic {
lsquic_engine_t *engine_ = nullptr;
OnAccept on_accept_;
bool started_ = false;
std::deque<std::weak_ptr<connection::QuicStream>> want_flush_;
bool want_process_ = false;
std::optional<Connecting> connecting_;
struct Reading {
static constexpr size_t kMaxUdpPacketSize = 64 << 10;
Expand Down
2 changes: 2 additions & 0 deletions include/libp2p/transport/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ namespace libp2p::transport::lsquic {
namespace libp2p::connection {
class QuicStream : public Stream,
public std::enable_shared_from_this<QuicStream> {
friend libp2p::transport::lsquic::Engine;

public:
QuicStream(std::shared_ptr<transport::QuicConnection> conn,
transport::lsquic::StreamCtx *stream_ctx,
Expand Down
67 changes: 56 additions & 11 deletions src/transport/quic/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace libp2p::transport::lsquic {

lsquic_engine_settings settings{};
lsquic_engine_init_settings(&settings, flags);
settings.es_versions = 1 << LSQVER_I001;
settings.es_init_max_stream_data_bidi_remote =
mux_config.maximum_window_size;
settings.es_init_max_stream_data_bidi_local =
Expand Down Expand Up @@ -147,12 +148,15 @@ namespace libp2p::transport::lsquic {
+[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
if (auto op = qtils::optionTake(stream_ctx->reading)) {
op->cb(QuicError::STREAM_CLOSED);
}
if (auto stream = stream_ctx->stream.lock()) {
stream->onClose();
}
if (auto reading = qtils::optionTake(stream_ctx->reading)) {
reading.value()();
}
if (auto writing = qtils::optionTake(stream_ctx->writing)) {
writing.value()();
}
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
delete stream_ctx;
};
Expand All @@ -161,14 +165,18 @@ namespace libp2p::transport::lsquic {
lsquic_stream_wantread(stream, 0);
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
auto op = qtils::optionTake(stream_ctx->reading).value();
auto n = lsquic_stream_read(stream, op.out.data(), op.out.size());
outcome::result<size_t> r = QuicError::STREAM_CLOSED;
if (n > 0) {
r = n;
if (auto reading = qtils::optionTake(stream_ctx->reading)) {
reading.value()();
}
};
stream_if.on_write =
+[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) {
lsquic_stream_wantwrite(stream, 0);
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
if (auto writing = qtils::optionTake(stream_ctx->writing)) {
writing.value()();
}
post(*stream_ctx->engine->io_context_,
[cb{std::move(op.cb)}, r] { cb(r); });
};

lsquic_engine_api api{};
Expand Down Expand Up @@ -261,7 +269,7 @@ namespace libp2p::transport::lsquic {
if (auto op = qtils::optionTake(connecting_)) {
op->cb(QuicError::CANT_CREATE_CONNECTION);
}
process();
wantProcess();
}

outcome::result<std::shared_ptr<QuicStream>> Engine::newStream(
Expand All @@ -281,7 +289,44 @@ namespace libp2p::transport::lsquic {
return stream;
}

void Engine::wantProcess() {
if (want_process_) {
return;
}
want_process_ = true;
boost::asio::post(*io_context_, [weak_self{weak_from_this()}] {
if (auto self = weak_self.lock()) {
self->process();
}
});
}

void Engine::wantFlush(StreamCtx *stream_ctx) {
if (stream_ctx->want_flush) {
return;
}
stream_ctx->want_flush = true;
if (stream_ctx->stream.expired()) {
return;
}
want_flush_.emplace_back(stream_ctx->stream);
wantProcess();
}

void Engine::process() {
want_process_ = false;
auto want_flush = std::exchange(want_flush_, {});
for (auto &weak_stream : want_flush) {
auto stream = weak_stream.lock();
if (not stream) {
continue;
}
if (stream->stream_ctx_->ls_stream == nullptr) {
continue;
}
stream->stream_ctx_->want_flush = false;
lsquic_stream_flush(stream->stream_ctx_->ls_stream);
}
lsquic_engine_process_conns(engine_);
int us = 0;
if (not lsquic_engine_earliest_adv_tick(engine_, &us)) {
Expand Down
34 changes: 31 additions & 3 deletions src/transport/quic/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ namespace libp2p::connection {
}
auto n = lsquic_stream_read(stream_ctx_->ls_stream, out.data(), out.size());
if (n == -1 and errno == EWOULDBLOCK) {
stream_ctx_->reading.emplace(StreamCtx::Reading{out, std::move(cb)});
stream_ctx_->reading.emplace(
[weak_self{weak_from_this()}, out, cb{std::move(cb)}]() mutable {
auto self = weak_self.lock();
if (not self) {
cb(QuicError::STREAM_CLOSED);
return;
}
self->readSome(out, std::move(cb));
});
lsquic_stream_wantread(stream_ctx_->ls_stream, 1);
return;
}
Expand All @@ -54,11 +62,31 @@ namespace libp2p::connection {
if (not stream_ctx_) {
return cb(r);
}
if (stream_ctx_->writing) {
throw std::logic_error{"QuicStream::writeSome already in progress"};
}
// Missing from `lsquic_stream_write` documentation comment.
// Return value 0 means buffer is full.
// Call `lsquic_stream_wantwrite` and wait for `stream_if.on_write`
// callback, before calling `lsquic_stream_write` again.
auto n = lsquic_stream_write(stream_ctx_->ls_stream, in.data(), in.size());
if (n > 0 and lsquic_stream_flush(stream_ctx_->ls_stream) == 0) {
if (n == 0) {
stream_ctx_->writing.emplace(
[weak_self{weak_from_this()}, in, cb{std::move(cb)}]() mutable {
auto self = weak_self.lock();
if (not self) {
cb(QuicError::STREAM_CLOSED);
return;
}
self->writeSome(in, std::move(cb));
});
lsquic_stream_wantwrite(stream_ctx_->ls_stream, 1);
return;
}
if (n > 0) {
r = n;
stream_ctx_->engine->wantFlush(stream_ctx_);
}
stream_ctx_->engine->process();
deferReadCallback(r, std::move(cb));
}

Expand Down
9 changes: 1 addition & 8 deletions vcpkg-configuration.json
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
{
"default-registry": {
"kind": "git",
"baseline": "fe1cde61e971d53c9687cf9a46308f8f55da19fa",
"baseline": "897ba2ab4c4c776b985ab1f599548fcf3ae598ba",
"repository": "https://github.com/microsoft/vcpkg"
},
"registries": [
{
"kind": "artifact",
"location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip",
"name": "microsoft"
}
],
"overlay-ports": [
"vcpkg-overlay"
]
Expand Down
Loading