Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions include/livekit/room.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include <chrono>
#include <cstdint>
#include <future>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -72,21 +74,53 @@ struct RoomOptions {
/// This is CRITICAL. Without auto_subscribe, you will never receive:
/// - `track_subscribed` events
/// - remote audio/video frames
///
/// @see https://docs.livekit.io/transport/media/subscribe/#selective-subscription
bool auto_subscribe = true;

/// Enable adaptive stream for subscribed video tracks.
///
/// When enabled, the SDK tells the server it may adjust the video layers sent
/// to this client based on what the application is currently rendering. This
/// lets the server pause or downscale subscribed video that is off-screen,
/// hidden, or only needed at a smaller size, reducing downstream bandwidth and
/// decode work. This affects media received by this room; use @ref dynacast
/// to control how this client publishes layers to others.
///
/// @see https://docs.livekit.io/transport/media/subscribe/#adaptive-stream
///
/// If unset, the Rust SDK default is used.
std::optional<bool> adaptive_stream;
Comment thread
alan-george-lk marked this conversation as resolved.

Comment thread
stephen-derosa marked this conversation as resolved.
/// Enable dynacast (server sends optimal layers depending on subscribers).
///
/// @see https://docs.livekit.io/transport/media/publish/#dynacast
bool dynacast = false;

/// Optional end-to-end encryption settings.
///
/// @see https://docs.livekit.io/transport/encryption/
std::optional<E2EEOptions> encryption;

/// Optional WebRTC configuration (ICE policy, servers, etc.)
///
/// @see https://docs.livekit.io/intro/basics/connect/#connection-reliability
std::optional<RtcConfig> rtc_config;

/// Number of retries for the initial room join after the first attempt.
///
/// If unset, the Rust SDK default is used.
std::optional<std::uint32_t> join_retries;

/// Enable single peer connection mode. When true, uses one RTCPeerConnection
/// for both publishing and subscribing instead of two separate connections.
/// Falls back to dual peer connection if the server doesn't support single PC.
bool single_peer_connection = true;

/// Optional WebRTC configuration (ICE policy, servers, etc.)
std::optional<RtcConfig> rtc_config;

/// Optional end-to-end encryption settings.
std::optional<E2EEOptions> encryption;
/// Timeout for each individual signal connection attempt.
///
/// If unset, the Rust SDK default is used.
std::optional<std::chrono::milliseconds> connect_timeout;
};

/// Represents a LiveKit room session.
Expand Down
68 changes: 24 additions & 44 deletions src/ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
#include <cassert>
#include <csignal>
#include <cstdio>
#include <string>
#include <type_traits>

#include "data_track.pb.h"
#include "e2ee.pb.h"
#include "ffi.pb.h"
#include "livekit/build.h"
#include "livekit/data_track_error.h"
#include "livekit/e2ee.h"
#include "livekit/ffi_handle.h"
#include "livekit/room.h"
#include "livekit/rpc_error.h"
Expand All @@ -37,11 +37,27 @@
namespace livekit {

namespace {

inline void logAndThrow(const std::string& error_msg) {
LK_LOG_ERROR("LiveKit SDK Error: {}", error_msg);
throw std::runtime_error(error_msg);
}

// Helper for debug logging of optional values
const auto optional_to_string = [](const auto& value) -> std::string {
if (!value) {
return "<unset>";
}
using Value = std::decay_t<decltype(*value)>;
if constexpr (std::is_same_v<Value, bool>) {
return *value ? "true" : "false";
} else if constexpr (std::is_same_v<Value, std::chrono::milliseconds>) {
return std::to_string(value->count());
} else {
return std::to_string(*value);
}
};

Result<proto::OwnedDataTrackStream, SubscribeDataTrackError> subscribeDataTrackFailure(SubscribeDataTrackErrorCode code,
const std::string& message) {
LK_LOG_WARN("Subscribe data track failed: code={} message={}", static_cast<std::uint32_t>(code), message);
Expand Down Expand Up @@ -471,50 +487,14 @@ std::future<proto::ConnectCallback> FfiClient::connectAsync(const std::string& u
connect->set_url(url);
connect->set_token(token);
connect->set_request_async_id(async_id);
auto* opts = connect->mutable_options();
opts->set_auto_subscribe(options.auto_subscribe);
opts->set_dynacast(options.dynacast);
opts->set_single_peer_connection(options.single_peer_connection);
connect->mutable_options()->CopyFrom(toProto(options));

LK_LOG_DEBUG(
"[FfiClient] connectAsync: auto_subscribe={}, dynacast={}, "
"single_peer_connection={}",
options.auto_subscribe, options.dynacast, options.single_peer_connection);

// --- E2EE / encryption (optional) ---
if (options.encryption.has_value()) {
const E2EEOptions& e2ee = *options.encryption;
const auto& kpo = e2ee.key_provider_options;

auto* enc = opts->mutable_encryption();
enc->set_encryption_type(static_cast<proto::EncryptionType>(e2ee.encryption_type));
enc->mutable_key_provider_options()->CopyFrom(toProto(kpo));
}

// --- RTC configuration (optional) ---
if (options.rtc_config.has_value()) {
const RtcConfig& rc = *options.rtc_config;
auto* rtc = opts->mutable_rtc_config();

rtc->set_ice_transport_type(static_cast<proto::IceTransportType>(rc.ice_transport_type));
rtc->set_continual_gathering_policy(static_cast<proto::ContinualGatheringPolicy>(rc.continual_gathering_policy));

for (const IceServer& ice : rc.ice_servers) {
auto* s = rtc->add_ice_servers();

// proto: repeated string urls = 1
if (!ice.url.empty()) {
s->add_urls(ice.url);
}
if (!ice.username.empty()) {
s->set_username(ice.username);
}
if (!ice.credential.empty()) {
// proto: password = 3
s->set_password(ice.credential);
}
}
}
"[FfiClient] connectAsync: auto_subscribe={}, adaptive_stream={}, dynacast={}, "
"single_peer_connection={}, join_retries={}, connect_timeout_ms={}",
options.auto_subscribe, optional_to_string(options.adaptive_stream), options.dynacast,
options.single_peer_connection, optional_to_string(options.join_retries),
optional_to_string(options.connect_timeout));

try {
const proto::FfiResponse resp = sendRequest(req);
Expand Down
45 changes: 45 additions & 0 deletions src/room_proto_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "room_proto_converter.h"

#include "livekit/data_stream.h"
#include "livekit/room.h"
#include "room.pb.h"

namespace livekit {
Expand Down Expand Up @@ -402,6 +403,50 @@ proto::KeyProviderOptions toProto(const KeyProviderOptions& in) {
return out;
}

proto::RoomOptions toProto(const RoomOptions& in) {
proto::RoomOptions out;
out.set_auto_subscribe(in.auto_subscribe);
if (in.adaptive_stream) {
out.set_adaptive_stream(*in.adaptive_stream);
}
out.set_dynacast(in.dynacast);

if (in.encryption) {
auto* encryption = out.mutable_encryption();
encryption->set_encryption_type(static_cast<proto::EncryptionType>(in.encryption->encryption_type));
encryption->mutable_key_provider_options()->CopyFrom(toProto(in.encryption->key_provider_options));
}

if (in.rtc_config) {
auto* rtc = out.mutable_rtc_config();
rtc->set_ice_transport_type(static_cast<proto::IceTransportType>(in.rtc_config->ice_transport_type));
rtc->set_continual_gathering_policy(
static_cast<proto::ContinualGatheringPolicy>(in.rtc_config->continual_gathering_policy));

for (const IceServer& ice : in.rtc_config->ice_servers) {
auto* server = rtc->add_ice_servers();
if (!ice.url.empty()) {
server->add_urls(ice.url);
}
if (!ice.username.empty()) {
server->set_username(ice.username);
}
if (!ice.credential.empty()) {
server->set_password(ice.credential);
}
}
}

if (in.join_retries) {
out.set_join_retries(*in.join_retries);
}
out.set_single_peer_connection(in.single_peer_connection);
if (in.connect_timeout) {
out.set_connect_timeout_ms(static_cast<std::uint64_t>(in.connect_timeout->count()));
}
return out;
}

proto::AudioEncoding toProto(const AudioEncodingOptions& in) {
proto::AudioEncoding msg;
msg.set_max_bitrate(in.max_bitrate);
Expand Down
2 changes: 2 additions & 0 deletions src/room_proto_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace livekit {
enum class RpcErrorCode;
class RemoteParticipant;
struct ByteStreamInfo;
struct RoomOptions;
struct TextStreamInfo;

// --------- basic helper conversions ---------
Expand Down Expand Up @@ -73,6 +74,7 @@ LIVEKIT_INTERNAL_API RoomMovedEvent roomMovedFromProto(const proto::RoomInfo& in
// --------- room options conversions ---------

LIVEKIT_INTERNAL_API proto::KeyProviderOptions toProto(const KeyProviderOptions& in);
LIVEKIT_INTERNAL_API proto::RoomOptions toProto(const RoomOptions& in);

LIVEKIT_INTERNAL_API proto::AudioEncoding toProto(const AudioEncodingOptions& in);
LIVEKIT_INTERNAL_API AudioEncodingOptions fromProto(const proto::AudioEncoding& in);
Expand Down
105 changes: 104 additions & 1 deletion src/tests/unit/test_room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
#include <gtest/gtest.h>
#include <livekit/livekit.h>

#include <chrono>
#include <string>

#include "ffi.pb.h"
#include "room_proto_converter.h"

namespace livekit::test {

class RoomTest : public ::testing::Test {
Expand All @@ -31,6 +37,8 @@ TEST_F(RoomTest, ConnectWithoutInitialize) {
livekit::shutdown();

Room room;

// Default room options okay here, will return before FFI layer since not initialized
bool result = room.connect("wss://localhost:7880", "test", livekit::RoomOptions());
EXPECT_FALSE(result) << "Connecting without initializing should return false";
EXPECT_TRUE(room.localParticipant().expired()) << "Local participant should be empty after failed connect";
Expand All @@ -47,9 +55,104 @@ TEST_F(RoomTest, RoomOptionsDefaults) {
RoomOptions options;

EXPECT_TRUE(options.auto_subscribe) << "auto_subscribe should default to true";
EXPECT_FALSE(options.adaptive_stream.has_value()) << "adaptive_stream should defer to Rust default";
EXPECT_FALSE(options.dynacast) << "dynacast should default to false";
EXPECT_FALSE(options.rtc_config.has_value()) << "rtc_config should not have a value by default";
EXPECT_FALSE(options.encryption.has_value()) << "encryption should not have a value by default";
EXPECT_FALSE(options.rtc_config.has_value()) << "rtc_config should not have a value by default";
EXPECT_FALSE(options.join_retries.has_value()) << "join_retries should defer to Rust default";
EXPECT_TRUE(options.single_peer_connection) << "single_peer_connection should default to true";
EXPECT_FALSE(options.connect_timeout.has_value()) << "connect_timeout should defer to Rust default";
}

TEST_F(RoomTest, RoomOptionsToProtoSerializesDefaults) {
const proto::RoomOptions proto_options = toProto(RoomOptions{});

EXPECT_TRUE(proto_options.has_auto_subscribe());
EXPECT_TRUE(proto_options.auto_subscribe());
EXPECT_FALSE(proto_options.has_adaptive_stream());
EXPECT_TRUE(proto_options.has_dynacast());
EXPECT_FALSE(proto_options.dynacast());
EXPECT_FALSE(proto_options.has_encryption());
EXPECT_FALSE(proto_options.has_rtc_config());
EXPECT_FALSE(proto_options.has_join_retries());
EXPECT_TRUE(proto_options.has_single_peer_connection());
EXPECT_TRUE(proto_options.single_peer_connection());
EXPECT_FALSE(proto_options.has_connect_timeout_ms());
}

TEST_F(RoomTest, RoomOptionsProtoConverter) {
RoomOptions options;
options.auto_subscribe = false;
options.adaptive_stream = true;
options.dynacast = true;
E2EEOptions encryption;
encryption.key_provider_options.shared_key = std::vector<std::uint8_t>{'s', 'e', 'c', 'r', 'e', 't'};
options.encryption = encryption;
RtcConfig rtc_config;
rtc_config.ice_transport_type = proto::TRANSPORT_ALL;
rtc_config.continual_gathering_policy = proto::GATHER_CONTINUALLY;
rtc_config.ice_servers.push_back({"stun:stun.l.google.com:19302", "", ""});
rtc_config.ice_servers.push_back({"turn:turn.example.com:3478", "user", "pass"});
options.rtc_config = rtc_config;
options.join_retries = 8;
options.single_peer_connection = false;
options.connect_timeout = std::chrono::milliseconds(750);

const proto::RoomOptions proto_options = toProto(options);

EXPECT_TRUE(proto_options.has_auto_subscribe());
EXPECT_FALSE(proto_options.auto_subscribe());
EXPECT_TRUE(proto_options.has_adaptive_stream());
EXPECT_TRUE(proto_options.adaptive_stream());
EXPECT_TRUE(proto_options.has_dynacast());
EXPECT_TRUE(proto_options.dynacast());
ASSERT_TRUE(proto_options.has_encryption());
EXPECT_EQ(proto_options.encryption().encryption_type(),
static_cast<proto::EncryptionType>(encryption.encryption_type));
ASSERT_TRUE(proto_options.encryption().has_key_provider_options());
EXPECT_EQ(proto_options.encryption().key_provider_options().shared_key(), "secret");
ASSERT_TRUE(proto_options.has_rtc_config());
EXPECT_EQ(proto_options.rtc_config().ice_transport_type(), proto::TRANSPORT_ALL);
EXPECT_EQ(proto_options.rtc_config().continual_gathering_policy(), proto::GATHER_CONTINUALLY);
ASSERT_EQ(proto_options.rtc_config().ice_servers_size(), 2);
EXPECT_EQ(proto_options.rtc_config().ice_servers(0).urls(0), "stun:stun.l.google.com:19302");
EXPECT_EQ(proto_options.rtc_config().ice_servers(1).urls(0), "turn:turn.example.com:3478");
EXPECT_EQ(proto_options.rtc_config().ice_servers(1).username(), "user");
EXPECT_EQ(proto_options.rtc_config().ice_servers(1).password(), "pass");
EXPECT_TRUE(proto_options.has_join_retries());
EXPECT_EQ(proto_options.join_retries(), 8U);
EXPECT_TRUE(proto_options.has_single_peer_connection());
EXPECT_FALSE(proto_options.single_peer_connection());
EXPECT_TRUE(proto_options.has_connect_timeout_ms());
EXPECT_EQ(proto_options.connect_timeout_ms(), 750U);
}

TEST(RoomOptionsProtoTest, ConnectRequestSerializesRetryOptions) {
RoomOptions options;
options.join_retries = 8;
options.connect_timeout = std::chrono::milliseconds(750);

proto::FfiRequest request;
auto* connect = request.mutable_connect();
connect->set_url("ws://localhost:7880");
connect->set_token("test-token");
connect->mutable_options()->CopyFrom(toProto(options));

ASSERT_TRUE(connect->options().has_join_retries());
EXPECT_EQ(connect->options().join_retries(), 8U);
ASSERT_TRUE(connect->options().has_connect_timeout_ms());
EXPECT_EQ(connect->options().connect_timeout_ms(), 750U);

ASSERT_TRUE(request.IsInitialized()) << request.InitializationErrorString();

std::string serialized;
ASSERT_TRUE(request.SerializeToString(&serialized));
EXPECT_FALSE(serialized.empty());

proto::FfiRequest decoded;
ASSERT_TRUE(decoded.ParseFromString(serialized));
EXPECT_EQ(decoded.connect().options().join_retries(), 8U);
EXPECT_EQ(decoded.connect().options().connect_timeout_ms(), 750U);
}

TEST_F(RoomTest, RtcConfigDefaults) {
Expand Down
Loading