diff --git a/CMakeLists.txt b/CMakeLists.txt index be4a8e5d..023366b4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -376,6 +376,7 @@ add_library(livekit SHARED src/data_track_frame.cpp src/data_stream.cpp src/data_track_error.cpp + src/data_track_schema.cpp src/data_track_stream.cpp src/e2ee.cpp src/ffi_handle.cpp diff --git a/client-sdk-rust b/client-sdk-rust index 175cf276..a271c9d5 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit 175cf276a8aa6770dbc795404fa91dc55dd27f10 +Subproject commit a271c9d558c655b7d12a545929ce015230c122c8 diff --git a/include/livekit/data_track_info.h b/include/livekit/data_track_info.h index 27310269..25efaccb 100644 --- a/include/livekit/data_track_info.h +++ b/include/livekit/data_track_info.h @@ -16,8 +16,11 @@ #pragma once +#include #include +#include "livekit/data_track_schema.h" + namespace livekit { /// Metadata about a published data track. @@ -33,6 +36,12 @@ struct DataTrackInfo { /// Whether frames on this track use end-to-end encryption. bool uses_e2ee = false; + + /// Schema associated with frames sent on the track, if any. + std::optional schema; + + /// Encoding of frames sent on the track, if specified. + std::optional frame_encoding; }; } // namespace livekit diff --git a/include/livekit/data_track_options.h b/include/livekit/data_track_options.h new file mode 100644 index 00000000..ba6a612e --- /dev/null +++ b/include/livekit/data_track_options.h @@ -0,0 +1,45 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "livekit/data_track_schema.h" + +namespace livekit { + +/** + * Options for publishing a data track. + * + * The schema and frame encoding are optional metadata advertised to + * subscribers; they are surfaced on the subscriber side via DataTrackInfo. + */ +struct DataTrackPublishOptions { + /// Track name used to identify the track to other participants. + /// + /// Must not be empty and must be unique per publisher. + std::string name; + + /// Schema describing frames sent on the track, if any. + std::optional schema; + + /// Encoding of frames sent on the track, if any. + std::optional frame_encoding; +}; + +} // namespace livekit diff --git a/include/livekit/data_track_schema.h b/include/livekit/data_track_schema.h new file mode 100644 index 00000000..8354cefc --- /dev/null +++ b/include/livekit/data_track_schema.h @@ -0,0 +1,90 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace livekit { + +/** + * Encoding used to interpret a data track schema definition. + * + * Identifies the interface definition language the schema is written in (e.g. a + * `.proto` file for \ref DataTrackSchemaEncoding::Protobuf), which in turn + * dictates the wire format of the frames the schema describes. + */ +enum class DataTrackSchemaEncoding { + /** Protocol Buffer IDL. */ + Protobuf, + /** FlatBuffer IDL. */ + Flatbuffer, + /** ROS 1 Message. */ + Ros1Msg, + /** ROS 2 Message. */ + Ros2Msg, + /** ROS 2 IDL. */ + Ros2Idl, + /** OMG IDL. */ + OmgIdl, + /** JSON Schema. */ + JsonSchema, + /** Another encoding not known to this client version. */ + Other, +}; + +/** + * Encoding used for frames sent on a data track. + * + * The serialization format of the frame bytes (e.g. + * \ref DataTrackFrameEncoding::Protobuf); the structure of those bytes is + * described by a schema (see \ref DataTrackSchemaEncoding). + */ +enum class DataTrackFrameEncoding { + /** ROS 1, described by a Ros1Msg schema. */ + Ros1, + /** CDR, described by a Ros2Msg, Ros2Idl, or OmgIdl schema. */ + Cdr, + /** Protocol Buffer, described by a Protobuf schema. */ + Protobuf, + /** FlatBuffer, described by a Flatbuffer schema. */ + Flatbuffer, + /** CBOR, self-describing. */ + Cbor, + /** MessagePack, self-describing. */ + Msgpack, + /** JSON, self-describing or described by a JsonSchema schema. */ + Json, + /** Another encoding not known to this client version. */ + Other, +}; + +/** + * Uniquely identifies a data track schema. + * + * A compound identifier with two components: a name and an encoding. Two IDs are + * equal only if both components match; the same name with a different encoding + * refers to a distinct schema. + */ +struct DataTrackSchemaId { + /** Name component of the schema identifier. */ + std::string name; + + /** Encoding of the schema definition. */ + DataTrackSchemaEncoding encoding = DataTrackSchemaEncoding::Other; +}; + +} // namespace livekit diff --git a/include/livekit/local_participant.h b/include/livekit/local_participant.h index 9369a914..17f921ab 100644 --- a/include/livekit/local_participant.h +++ b/include/livekit/local_participant.h @@ -26,6 +26,8 @@ #include #include +#include "livekit/data_track_options.h" +#include "livekit/data_track_schema.h" #include "livekit/ffi_handle.h" #include "livekit/local_audio_track.h" #include "livekit/local_data_track.h" @@ -172,6 +174,18 @@ class LIVEKIT_API LocalParticipant : public Participant { /// publication failed. Result, PublishDataTrackError> publishDataTrack(const std::string& name); + /// Publish a data track to the room with explicit options. + /// + /// Like publishDataTrack(const std::string&), but also lets the publisher + /// advertise an optional schema and frame encoding as metadata. These are + /// surfaced to subscribers via the remote DataTrackInfo. + /// + /// @param options Track name plus optional schema / frame encoding. + /// @return The published track on success, or a typed error describing why + /// publication failed. + Result, PublishDataTrackError> publishDataTrack( + const DataTrackPublishOptions& options); + /// Unpublish a data track from the room. /// /// Delegates to LocalDataTrack::unpublishDataTrack(). After this call, @@ -180,6 +194,39 @@ class LIVEKIT_API LocalParticipant : public Participant { /// @param track The data track to unpublish. Null is ignored. void unpublishDataTrack(const std::shared_ptr& track); + /// Store the definition of a data track schema. + /// + /// Called by a publisher to make a schema available to subscribers, who can + /// later look up its definition via getSchema(). Define a schema before + /// publishing any data track that references it, so subscribers can resolve + /// the schema by its ID. + /// + /// A schema can only be defined once. Attempting to redefine an existing + /// schema fails. + /// + /// @param id Identifies the schema (name and encoding). + /// @param definition The schema definition, stored as-is. It is neither + /// parsed nor validated against its encoding, so the + /// caller is responsible for ensuring it is well-formed. + /// + /// @throws std::runtime_error If the schema could not be defined or the FFI + /// call fails. + void defineSchema(const DataTrackSchemaId& id, const std::string& definition); + + /// Retrieve the definition for a data track schema. + /// + /// Called by a subscriber that wants to inspect the schema a participant + /// defined (see defineSchema()) for a data track it is publishing. + /// + /// @param id Identifies the schema to retrieve. + /// @param participant_identity Identity of the participant that defined the + /// schema. + /// @return The schema definition. + /// + /// @throws std::runtime_error If the participant has not defined a schema + /// with this ID or the FFI call fails. + std::string getSchema(const DataTrackSchemaId& id, const std::string& participant_identity); + /// Initiate an RPC call to a remote participant. /// /// @param destination_identity Identity of the destination participant. diff --git a/src/data_track_proto_converter.h b/src/data_track_proto_converter.h new file mode 100644 index 00000000..5c04f869 --- /dev/null +++ b/src/data_track_proto_converter.h @@ -0,0 +1,37 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "data_track.pb.h" +#include "livekit/data_track_info.h" +#include "livekit/data_track_schema.h" + +namespace livekit { + +proto::DataTrackSchemaEncoding toProto(DataTrackSchemaEncoding in); +DataTrackSchemaEncoding fromProto(proto::DataTrackSchemaEncoding in); + +proto::DataTrackFrameEncoding toProto(DataTrackFrameEncoding in); +DataTrackFrameEncoding fromProto(proto::DataTrackFrameEncoding in); + +proto::DataTrackSchemaId toProto(const DataTrackSchemaId& in); +DataTrackSchemaId fromProto(const proto::DataTrackSchemaId& in); + +// Converts an FFI data track info message into the public DataTrackInfo struct. +DataTrackInfo fromProto(const proto::DataTrackInfo& in); + +} // namespace livekit diff --git a/src/data_track_schema.cpp b/src/data_track_schema.cpp new file mode 100644 index 00000000..77afb89b --- /dev/null +++ b/src/data_track_schema.cpp @@ -0,0 +1,137 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "data_track_proto_converter.h" + +namespace livekit { + +proto::DataTrackSchemaEncoding toProto(DataTrackSchemaEncoding in) { + switch (in) { + case DataTrackSchemaEncoding::Protobuf: + return proto::DATA_TRACK_SCHEMA_ENCODING_PROTOBUF; + case DataTrackSchemaEncoding::Flatbuffer: + return proto::DATA_TRACK_SCHEMA_ENCODING_FLATBUFFER; + case DataTrackSchemaEncoding::Ros1Msg: + return proto::DATA_TRACK_SCHEMA_ENCODING_ROS1_MSG; + case DataTrackSchemaEncoding::Ros2Msg: + return proto::DATA_TRACK_SCHEMA_ENCODING_ROS2_MSG; + case DataTrackSchemaEncoding::Ros2Idl: + return proto::DATA_TRACK_SCHEMA_ENCODING_ROS2_IDL; + case DataTrackSchemaEncoding::OmgIdl: + return proto::DATA_TRACK_SCHEMA_ENCODING_OMG_IDL; + case DataTrackSchemaEncoding::JsonSchema: + return proto::DATA_TRACK_SCHEMA_ENCODING_JSON_SCHEMA; + case DataTrackSchemaEncoding::Other: + return proto::DATA_TRACK_SCHEMA_ENCODING_OTHER; + } + return proto::DATA_TRACK_SCHEMA_ENCODING_OTHER; +} + +DataTrackSchemaEncoding fromProto(proto::DataTrackSchemaEncoding in) { + switch (in) { + case proto::DATA_TRACK_SCHEMA_ENCODING_PROTOBUF: + return DataTrackSchemaEncoding::Protobuf; + case proto::DATA_TRACK_SCHEMA_ENCODING_FLATBUFFER: + return DataTrackSchemaEncoding::Flatbuffer; + case proto::DATA_TRACK_SCHEMA_ENCODING_ROS1_MSG: + return DataTrackSchemaEncoding::Ros1Msg; + case proto::DATA_TRACK_SCHEMA_ENCODING_ROS2_MSG: + return DataTrackSchemaEncoding::Ros2Msg; + case proto::DATA_TRACK_SCHEMA_ENCODING_ROS2_IDL: + return DataTrackSchemaEncoding::Ros2Idl; + case proto::DATA_TRACK_SCHEMA_ENCODING_OMG_IDL: + return DataTrackSchemaEncoding::OmgIdl; + case proto::DATA_TRACK_SCHEMA_ENCODING_JSON_SCHEMA: + return DataTrackSchemaEncoding::JsonSchema; + case proto::DATA_TRACK_SCHEMA_ENCODING_OTHER: + return DataTrackSchemaEncoding::Other; + } + return DataTrackSchemaEncoding::Other; +} + +proto::DataTrackFrameEncoding toProto(DataTrackFrameEncoding in) { + switch (in) { + case DataTrackFrameEncoding::Ros1: + return proto::DATA_TRACK_FRAME_ENCODING_ROS1; + case DataTrackFrameEncoding::Cdr: + return proto::DATA_TRACK_FRAME_ENCODING_CDR; + case DataTrackFrameEncoding::Protobuf: + return proto::DATA_TRACK_FRAME_ENCODING_PROTOBUF; + case DataTrackFrameEncoding::Flatbuffer: + return proto::DATA_TRACK_FRAME_ENCODING_FLATBUFFER; + case DataTrackFrameEncoding::Cbor: + return proto::DATA_TRACK_FRAME_ENCODING_CBOR; + case DataTrackFrameEncoding::Msgpack: + return proto::DATA_TRACK_FRAME_ENCODING_MSGPACK; + case DataTrackFrameEncoding::Json: + return proto::DATA_TRACK_FRAME_ENCODING_JSON; + case DataTrackFrameEncoding::Other: + return proto::DATA_TRACK_FRAME_ENCODING_OTHER; + } + return proto::DATA_TRACK_FRAME_ENCODING_OTHER; +} + +DataTrackFrameEncoding fromProto(proto::DataTrackFrameEncoding in) { + switch (in) { + case proto::DATA_TRACK_FRAME_ENCODING_ROS1: + return DataTrackFrameEncoding::Ros1; + case proto::DATA_TRACK_FRAME_ENCODING_CDR: + return DataTrackFrameEncoding::Cdr; + case proto::DATA_TRACK_FRAME_ENCODING_PROTOBUF: + return DataTrackFrameEncoding::Protobuf; + case proto::DATA_TRACK_FRAME_ENCODING_FLATBUFFER: + return DataTrackFrameEncoding::Flatbuffer; + case proto::DATA_TRACK_FRAME_ENCODING_CBOR: + return DataTrackFrameEncoding::Cbor; + case proto::DATA_TRACK_FRAME_ENCODING_MSGPACK: + return DataTrackFrameEncoding::Msgpack; + case proto::DATA_TRACK_FRAME_ENCODING_JSON: + return DataTrackFrameEncoding::Json; + case proto::DATA_TRACK_FRAME_ENCODING_OTHER: + return DataTrackFrameEncoding::Other; + } + return DataTrackFrameEncoding::Other; +} + +proto::DataTrackSchemaId toProto(const DataTrackSchemaId& in) { + proto::DataTrackSchemaId out; + out.set_name(in.name); + out.set_encoding(toProto(in.encoding)); + return out; +} + +DataTrackSchemaId fromProto(const proto::DataTrackSchemaId& in) { + DataTrackSchemaId out; + out.name = in.name(); + out.encoding = fromProto(in.encoding()); + return out; +} + +DataTrackInfo fromProto(const proto::DataTrackInfo& in) { + DataTrackInfo out; + out.name = in.name(); + out.sid = in.sid(); + out.uses_e2ee = in.uses_e2ee(); + if (in.has_schema()) { + out.schema = fromProto(in.schema()); + } + if (in.has_frame_encoding()) { + out.frame_encoding = fromProto(in.frame_encoding()); + } + return out; +} + +} // namespace livekit diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 89d6b0a8..0d24e880 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -23,6 +23,7 @@ #include #include "data_track.pb.h" +#include "data_track_proto_converter.h" #include "ffi.pb.h" #include "livekit/build.h" #include "livekit/data_track_error.h" @@ -786,7 +787,7 @@ std::future FfiClient::publishDataAsync(std::uint64_t local_participant_ha } std::future> FfiClient::publishDataTrackAsync( - std::uint64_t local_participant_handle, const std::string& track_name) { + std::uint64_t local_participant_handle, const DataTrackPublishOptions& options) { const AsyncId async_id = generateAsyncId(); auto fut = registerAsync>( @@ -812,7 +813,14 @@ std::future> FfiClient proto::FfiRequest req; auto* msg = req.mutable_publish_data_track(); msg->set_local_participant_handle(local_participant_handle); - msg->mutable_options()->set_name(track_name); + auto* opts = msg->mutable_options(); + opts->set_name(options.name); + if (options.schema.has_value()) { + *opts->mutable_schema() = toProto(*options.schema); + } + if (options.frame_encoding.has_value()) { + opts->set_frame_encoding(toProto(*options.frame_encoding)); + } msg->set_request_async_id(async_id); try { @@ -1173,4 +1181,87 @@ std::future FfiClient::sendStreamTrailerAsync(std::uint64_t local_particip return fut; } +std::future FfiClient::defineSchemaAsync(std::uint64_t local_participant_handle, const DataTrackSchemaId& schema_id, + const std::string& definition) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, + [async_id](const proto::FfiEvent& event) { + return event.has_define_schema() && event.define_schema().async_id() == async_id; + }, + [](const proto::FfiEvent& event, std::promise& pr) { + const auto& cb = event.define_schema(); + if (cb.has_error() && !cb.error().empty()) { + pr.set_exception(std::make_exception_ptr(std::runtime_error(cb.error()))); + return; + } + pr.set_value(); + }); + + // Build and send the request + proto::FfiRequest req; + auto* msg = req.mutable_define_schema(); + msg->set_local_participant_handle(local_participant_handle); + *msg->mutable_schema_id() = toProto(schema_id); + msg->set_definition(definition); + msg->set_request_async_id(async_id); + + try { + const proto::FfiResponse resp = sendRequest(req); + if (!resp.has_define_schema()) { + logAndThrow("FfiResponse missing define_schema"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; +} + +std::future FfiClient::getSchemaAsync(std::uint64_t local_participant_handle, + const DataTrackSchemaId& schema_id, + const std::string& participant_identity) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, + [async_id](const proto::FfiEvent& event) { + return event.has_get_schema() && event.get_schema().async_id() == async_id; + }, + [](const proto::FfiEvent& event, std::promise& pr) { + const auto& cb = event.get_schema(); + if (cb.has_error() && !cb.error().empty()) { + pr.set_exception(std::make_exception_ptr(std::runtime_error(cb.error()))); + return; + } + pr.set_value(cb.definition()); + }); + + // Build and send the request + proto::FfiRequest req; + auto* msg = req.mutable_get_schema(); + msg->set_local_participant_handle(local_participant_handle); + *msg->mutable_schema_id() = toProto(schema_id); + msg->set_participant_identity(participant_identity); + msg->set_request_async_id(async_id); + + try { + const proto::FfiResponse resp = sendRequest(req); + if (!resp.has_get_schema()) { + logAndThrow("FfiResponse missing get_schema"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; +} + } // namespace livekit diff --git a/src/ffi_client.h b/src/ffi_client.h index 9dc840e4..86d0f94e 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -31,6 +31,8 @@ #include "data_track.pb.h" #include "livekit/data_track_error.h" +#include "livekit/data_track_options.h" +#include "livekit/data_track_schema.h" #include "livekit/result.h" #include "livekit/room_event_types.h" #include "livekit/stats.h" @@ -123,9 +125,15 @@ class LIVEKIT_INTERNAL_API FfiClient { const std::string& payload, std::optional response_timeout_ms = std::nullopt); + // Data Track schema APIs + std::future defineSchemaAsync(std::uint64_t local_participant_handle, const DataTrackSchemaId& schema_id, + const std::string& definition); + std::future getSchemaAsync(std::uint64_t local_participant_handle, const DataTrackSchemaId& schema_id, + const std::string& participant_identity); + // Data Track APIs std::future> publishDataTrackAsync( - std::uint64_t local_participant_handle, const std::string& track_name); + std::uint64_t local_participant_handle, const DataTrackPublishOptions& options); Result subscribeDataTrack( std::uint64_t track_handle, std::optional buffer_size = std::nullopt); diff --git a/src/local_data_track.cpp b/src/local_data_track.cpp index 6173ab28..5986e283 100644 --- a/src/local_data_track.cpp +++ b/src/local_data_track.cpp @@ -17,6 +17,7 @@ #include "livekit/local_data_track.h" #include "data_track.pb.h" +#include "data_track_proto_converter.h" #include "ffi.pb.h" #include "ffi_client.h" #include "livekit/data_track_error.h" @@ -25,12 +26,7 @@ namespace livekit { LocalDataTrack::LocalDataTrack(const proto::OwnedLocalDataTrack& owned) - : handle_(static_cast(owned.handle().id())) { - const auto& pi = owned.info(); - info_.name = pi.name(); - info_.sid = pi.sid(); - info_.uses_e2ee = pi.uses_e2ee(); -} + : handle_(static_cast(owned.handle().id())), info_(fromProto(owned.info())) {} Result LocalDataTrack::tryPush(const DataTrackFrame& frame) { if (!handle_.valid()) { diff --git a/src/local_participant.cpp b/src/local_participant.cpp index 7fda68ac..8f3755c1 100644 --- a/src/local_participant.cpp +++ b/src/local_participant.cpp @@ -263,6 +263,13 @@ LocalParticipant::PublicationMap LocalParticipant::trackPublications() const { Result, PublishDataTrackError> LocalParticipant::publishDataTrack( const std::string& name) { + DataTrackPublishOptions options; + options.name = name; + return publishDataTrack(options); +} + +Result, PublishDataTrackError> LocalParticipant::publishDataTrack( + const DataTrackPublishOptions& options) { auto handle_id = ffiHandleId(); if (handle_id == 0) { return Result, PublishDataTrackError>::failure( @@ -271,7 +278,7 @@ Result, PublishDataTrackError> LocalParticipant: "handle"}); } - auto fut = FfiClient::instance().publishDataTrackAsync(static_cast(handle_id), name); + auto fut = FfiClient::instance().publishDataTrackAsync(static_cast(handle_id), options); auto result = fut.get(); if (!result) { @@ -290,6 +297,26 @@ void LocalParticipant::unpublishDataTrack(const std::shared_ptr& track->unpublishDataTrack(); } +void LocalParticipant::defineSchema(const DataTrackSchemaId& id, const std::string& definition) { + auto handle_id = ffiHandleId(); + if (handle_id == 0) { + throw std::runtime_error("LocalParticipant::defineSchema: invalid FFI handle"); + } + + auto fut = FfiClient::instance().defineSchemaAsync(static_cast(handle_id), id, definition); + fut.get(); +} + +std::string LocalParticipant::getSchema(const DataTrackSchemaId& id, const std::string& participant_identity) { + auto handle_id = ffiHandleId(); + if (handle_id == 0) { + throw std::runtime_error("LocalParticipant::getSchema: invalid FFI handle"); + } + + auto fut = FfiClient::instance().getSchemaAsync(static_cast(handle_id), id, participant_identity); + return fut.get(); +} + std::string LocalParticipant::performRpc(const std::string& destination_identity, const std::string& method, const std::string& payload, const std::optional& response_timeout) { auto handle_id = ffiHandleId(); diff --git a/src/remote_data_track.cpp b/src/remote_data_track.cpp index 7c48c560..f02a9e70 100644 --- a/src/remote_data_track.cpp +++ b/src/remote_data_track.cpp @@ -19,6 +19,7 @@ #include #include "data_track.pb.h" +#include "data_track_proto_converter.h" #include "ffi.pb.h" #include "ffi_client.h" #include "lk_log.h" @@ -27,10 +28,7 @@ namespace livekit { RemoteDataTrack::RemoteDataTrack(const proto::OwnedRemoteDataTrack& owned) : handle_(static_cast(owned.handle().id())), publisher_identity_(owned.publisher_identity()) { - const auto& pi = owned.info(); - info_.name = pi.name(); - info_.sid = pi.sid(); - info_.uses_e2ee = pi.uses_e2ee(); + info_ = fromProto(owned.info()); } bool RemoteDataTrack::isPublished() const { diff --git a/src/tests/integration/test_data_track.cpp b/src/tests/integration/test_data_track.cpp index b575c410..2ca856bb 100644 --- a/src/tests/integration/test_data_track.cpp +++ b/src/tests/integration/test_data_track.cpp @@ -21,8 +21,11 @@ // and run: // ./build-debug/bin/livekit_integration_tests +#include +#include #include #include +#include #include #include @@ -51,6 +54,7 @@ constexpr std::uint8_t kTransportPayloadValue = 0xFA; constexpr char kE2EESharedSecret[] = "password"; constexpr int kE2EEFrameCount = 5; constexpr int kTimestampFrameAttempts = 200; +constexpr std::size_t kMaxSchemaDefinitionBytes = 60000; std::string makeTrackName(const std::string& suffix) { return std::string(kTrackNamePrefix) + "_" + suffix + "_" + std::to_string(getTimestampUs()); @@ -516,6 +520,98 @@ TEST_F(DataTrackE2ETest, PublishDuplicateName) { first_track->unpublishDataTrack(); } +TEST_F(DataTrackE2ETest, DefineAndGetSchema) { + auto rooms = testRooms(2); + auto& publisher_room = rooms[0]; + auto& subscriber_room = rooms[1]; + + const auto publisher_identity = lockLocalParticipant(*publisher_room)->identity(); + const DataTrackSchemaId schema_id{"some_schema", DataTrackSchemaEncoding::JsonSchema}; + const std::string definition(kMaxSchemaDefinitionBytes, 'a'); + + ASSERT_NO_THROW(lockLocalParticipant(*publisher_room)->defineSchema(schema_id, definition)); + + std::string retrieved; + ASSERT_NO_THROW(retrieved = lockLocalParticipant(*subscriber_room)->getSchema(schema_id, publisher_identity)); + EXPECT_EQ(retrieved, definition); +} + +TEST_F(DataTrackE2ETest, DefineSchemaOverLimitFails) { + auto rooms = testRooms(1); + auto& room = rooms[0]; + + const DataTrackSchemaId schema_id{"some_schema", DataTrackSchemaEncoding::JsonSchema}; + // Deliberately exceed the maximum allowed schema definition size. + const std::string definition(2 * kMaxSchemaDefinitionBytes, 'a'); + + EXPECT_THROW(lockLocalParticipant(*room)->defineSchema(schema_id, definition), std::runtime_error); +} + +TEST_F(DataTrackE2ETest, DefineDuplicateSchemaFails) { + auto rooms = testRooms(1); + auto& room = rooms[0]; + + const DataTrackSchemaId schema_id{"some_schema", DataTrackSchemaEncoding::JsonSchema}; + const std::string definition(kMaxSchemaDefinitionBytes, 'a'); + + ASSERT_NO_THROW(lockLocalParticipant(*room)->defineSchema(schema_id, definition)); + // Defining the same schema again must fail. + EXPECT_THROW(lockLocalParticipant(*room)->defineSchema(schema_id, definition), std::runtime_error); +} + +TEST_F(DataTrackE2ETest, GetUndefinedSchemaFails) { + auto rooms = testRooms(1); + auto& room = rooms[0]; + + const auto identity = lockLocalParticipant(*room)->identity(); + const DataTrackSchemaId schema_id{"undefined", DataTrackSchemaEncoding::JsonSchema}; + + EXPECT_THROW((void)lockLocalParticipant(*room)->getSchema(schema_id, identity), std::runtime_error); +} + +TEST_F(DataTrackE2ETest, PublishWithSchemaAndFrameEncodingMetadata) { + const auto track_name = makeTrackName("schema_meta"); + + DataTrackPublishedDelegate subscriber_delegate; + std::vector room_configs(2); + room_configs[1].delegate = &subscriber_delegate; + + auto rooms = testRooms(room_configs); + auto& publisher_room = rooms[0]; + + DataTrackPublishOptions options; + options.name = track_name; + options.schema = DataTrackSchemaId{"sensor_msgs/Image", DataTrackSchemaEncoding::Ros2Msg}; + options.frame_encoding = DataTrackFrameEncoding::Cdr; + + auto publish_result = lockLocalParticipant(*publisher_room)->publishDataTrack(options); + if (!publish_result) { + FAIL() << describeDataTrackError(publish_result.error()); + } + auto local_track = publish_result.value(); + ASSERT_TRUE(local_track->isPublished()); + + const auto& local_info = local_track->info(); + ASSERT_TRUE(local_info.schema.has_value()); + EXPECT_EQ(local_info.schema->name, "sensor_msgs/Image"); + EXPECT_EQ(local_info.schema->encoding, DataTrackSchemaEncoding::Ros2Msg); + ASSERT_TRUE(local_info.frame_encoding.has_value()); + EXPECT_EQ(*local_info.frame_encoding, DataTrackFrameEncoding::Cdr); + + auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; + EXPECT_EQ(remote_track->info().name, track_name); + + const auto& remote_info = remote_track->info(); + ASSERT_TRUE(remote_info.schema.has_value()); + EXPECT_EQ(remote_info.schema->name, "sensor_msgs/Image"); + EXPECT_EQ(remote_info.schema->encoding, DataTrackSchemaEncoding::Ros2Msg); + ASSERT_TRUE(remote_info.frame_encoding.has_value()); + EXPECT_EQ(*remote_info.frame_encoding, DataTrackFrameEncoding::Cdr); + + local_track->unpublishDataTrack(); +} + TEST_F(DataTrackE2ETest, CanResubscribeToRemoteDataTrack) { const auto track_name = makeTrackName("resubscribe");