Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions runtime-light/coroutine/await-set.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "runtime-light/coroutine/coroutine-state.h"
#include "runtime-light/coroutine/detail/await-set.h"
#include "runtime-light/coroutine/type-traits.h"
#include "runtime-light/stdlib/diagnostics/logs.h"

namespace kphp::coro {

Expand Down Expand Up @@ -44,10 +45,12 @@ class await_set {
template<typename awaitable_type>
requires kphp::coro::concepts::awaitable<awaitable_type> && std::is_same_v<typename awaitable_traits<awaitable_type>::awaiter_return_type, return_type>
void push(awaitable_type awaitable) noexcept {
kphp::log::assertion(m_await_broker != nullptr);
m_await_broker->start_task(detail::await_set::make_await_set_task(std::move(awaitable)), m_coroutine_stack_root, STACK_RETURN_ADDRESS);
}

auto next() noexcept {
kphp::log::assertion(m_await_broker != nullptr);
return detail::await_set::await_set_awaitable<return_type>{*m_await_broker};
}

Expand All @@ -56,12 +59,9 @@ class await_set {
}

size_t size() const noexcept {
kphp::log::assertion(m_await_broker != nullptr);
return m_await_broker->size();
}

~await_set() {
m_await_broker.release();
}
};

} // namespace kphp::coro
20 changes: 20 additions & 0 deletions runtime-light/state/instance-state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "runtime-common/core/std/containers.h"
#include "runtime-light/core/globals/php-init-scripts.h"
#include "runtime-light/core/globals/php-script-globals.h"
#include "runtime-light/coroutine/await-set.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/k2-platform/k2-api.h"
#include "runtime-light/server/cli/init-functions.h"
Expand All @@ -27,6 +28,7 @@
#include "runtime-light/stdlib/diagnostics/logs.h"
#include "runtime-light/stdlib/fork/fork-functions.h"
#include "runtime-light/stdlib/fork/fork-state.h"
#include "runtime-light/stdlib/rpc/rpc-client-state.h"
#include "runtime-light/stdlib/time/time-functions.h"
#include "runtime-light/streams/read-ext.h"
#include "runtime-light/streams/stream.h"
Expand Down Expand Up @@ -214,4 +216,22 @@ kphp::coro::task<> InstanceState::run_instance_epilogue() noexcept {
web_state.session_is_finished = true;
web_state.session.reset();
}

/*
* Unlike regular RPC requests whose results the user code waits for via rpc_fetch_responses,
* thereby guaranteeing they are sent, the user code does not wait for requests sent with the
* ignore_answer flag. Therefore, we can’t guarantee that the coroutines responsible for
* sending ignore_answer requests have finished. This means the requests might not be sent
* if the instance terminates.
*
* This await suspends the current coroutine until all pending ignore_answer requests are
* fully sent. While suspended, other forks and coroutines may continue running.
*
* After this call completes, delivery of all ignore_answer requests is guaranteed.
*/
auto& rpc_client_instance_st{RpcClientInstanceState::get()};
auto ignore_answer_request_await_set{std::exchange(rpc_client_instance_st.ignore_answer_request_awaiter_tasks, kphp::coro::await_set<void>{})};
while (!ignore_answer_request_await_set.empty()) {
co_await ignore_answer_request_await_set.next();
}
}
16 changes: 13 additions & 3 deletions runtime-light/stdlib/rpc/rpc-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,11 @@ kphp::coro::task<kphp::rpc::query_info> send_request(std::string_view actor, std
co_return std::move(opt_response);
}};

static constexpr auto ignore_answer_awaiter_coroutine{[](kphp::component::stream stream, std::chrono::milliseconds timeout) -> kphp::coro::shared_task<void> {
auto fetch_task{kphp::component::fetch_response(stream, [](std::span<const std::byte>) noexcept {})};
std::ignore = co_await kphp::coro::io_scheduler::get().schedule(std::move(fetch_task), timeout);
}};

// normalize timeout
using namespace std::chrono_literals;
static constexpr auto DEFAULT_TIMEOUT{300ms};
Expand All @@ -356,13 +361,18 @@ kphp::coro::task<kphp::rpc::query_info> send_request(std::string_view actor, std
})
.value_or(DEFAULT_TIMEOUT),
MIN_TIMEOUT, MAX_TIMEOUT)};
if (ignore_answer) {
// start ignore answer awaiter task
auto ignore_answer_awaiter_task{ignore_answer_awaiter_coroutine(std::move(stream), timeout)};
kphp::log::assertion(kphp::coro::io_scheduler::get().start(ignore_answer_awaiter_task));

rpc_client_instance_st.ignore_answer_request_awaiter_tasks.push(std::move(ignore_answer_awaiter_task));
co_return kphp::rpc::query_info{.id = kphp::rpc::IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp};
}
// start awaiter task
auto awaiter_task{awaiter_coroutine(query_id, std::move(stream), timeout, collect_responses_extra_info)};
kphp::log::assertion(kphp::coro::io_scheduler::get().start(awaiter_task));

if (ignore_answer) {
co_return kphp::rpc::query_info{.id = kphp::rpc::IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp};
}
rpc_client_instance_st.response_awaiter_tasks.emplace(query_id, std::move(awaiter_task));
co_return kphp::rpc::query_info{.id = query_id, .request_size = request_size, .timestamp = timestamp};
}
Expand Down
2 changes: 2 additions & 0 deletions runtime-light/stdlib/rpc/rpc-client-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "runtime-common/core/allocator/script-allocator.h"
#include "runtime-common/core/runtime-core.h"
#include "runtime-common/core/std/containers.h"
#include "runtime-light/coroutine/await-set.h"
#include "runtime-light/coroutine/shared-task.h"
#include "runtime-light/stdlib/rpc/rpc-constants.h"
#include "runtime-light/stdlib/rpc/rpc-extra-info.h"
Expand All @@ -26,6 +27,7 @@ struct RpcClientInstanceState final : private vk::not_copyable {
kphp::stl::unordered_map<int64_t, class_instance<RpcTlQuery>, kphp::memory::script_allocator> response_fetcher_instances;
kphp::stl::unordered_map<int64_t, std::pair<kphp::rpc::response_extra_info_status, kphp::rpc::response_extra_info>, kphp::memory::script_allocator>
rpc_responses_extra_info;
kphp::coro::await_set<void> ignore_answer_request_awaiter_tasks;

RpcClientInstanceState() noexcept = default;

Expand Down
Loading