diff --git a/lib/schematic/datastream/websocket_client.rb b/lib/schematic/datastream/websocket_client.rb index 2c90178..9a4c627 100644 --- a/lib/schematic/datastream/websocket_client.rb +++ b/lib/schematic/datastream/websocket_client.rb @@ -6,6 +6,8 @@ require "json" require "websocket" +require_relative "../version" + module Schematic module DataStream WRITE_WAIT = 10 # seconds @@ -27,6 +29,14 @@ module DataStream MESSAGE_TYPE_DELETE = "delete" MESSAGE_TYPE_ERROR = "error" + # Headers attached to the WebSocket handshake so the backend can distinguish + # direct-SDK connections from the schematic-datastream-replicator and + # correlate either to a specific release. Mode is always "direct" here — + # replicator mode in this SDK doesn't open a WebSocket at all. + CLIENT_NAME = "schematic-ruby" + DATASTREAM_MODE_DIRECT = "direct" + UNKNOWN_VERSION = "unknown" + class WebSocketClient attr_reader :url, :connected, :ready @@ -154,7 +164,7 @@ def connect def perform_handshake @handshake = WebSocket::Handshake::Client.new( url: @url, - headers: { "X-Schematic-Api-Key" => @api_key } + headers: handshake_headers ) @socket.write(@handshake.to_s) @@ -310,6 +320,27 @@ def write_frame(type, data) ) @write_mutex.synchronize { @socket&.write(frame.to_s) } end + + # Headers attached to the WebSocket handshake. The mode/client headers let + # the backend tell direct-SDK connections apart from the + # schematic-datastream-replicator and correlate them to a release. + def handshake_headers + { + "X-Schematic-Api-Key" => @api_key, + "X-Schematic-Datastream-Mode" => DATASTREAM_MODE_DIRECT, + "X-Schematic-Client" => CLIENT_NAME, + "X-Schematic-Client-Version" => sdk_version + } + end + + # Resolves the SDK version reported in handshake headers. Schematic::VERSION + # is stamped into the Fern-generated lib/schematic/version.rb on each release; + # fall back to "unknown" if the constant is missing (e.g. a partial checkout). + def sdk_version + return Schematic::VERSION if defined?(Schematic::VERSION) && !Schematic::VERSION.to_s.empty? + + UNKNOWN_VERSION + end end end end diff --git a/lib/schematic/event_buffer.rb b/lib/schematic/event_buffer.rb index 1fb2b35..6890459 100644 --- a/lib/schematic/event_buffer.rb +++ b/lib/schematic/event_buffer.rb @@ -26,6 +26,7 @@ def initialize(api_key:, logger:, interval: DEFAULT_FLUSH_INTERVAL, max_batch_si @stopped = false @flushing = false @flush_done = ConditionVariable.new + @stop_cv = ConditionVariable.new start_periodic_flush unless @offline end @@ -77,6 +78,10 @@ def stop @mutex.synchronize do @stopped = true + # Wake the periodic flush thread so it sees @stopped immediately + # instead of sleeping out the rest of @interval. + @stop_cv.broadcast + # Wait for any in-flight flush to complete before our final flush, # so we don't skip events that arrived during the in-flight batch. @flush_done.wait(@mutex, 30) if @flushing @@ -92,8 +97,11 @@ def stop def start_periodic_flush @flush_thread = Thread.new do loop do - sleep(@interval) - break if @stopped + should_break = @mutex.synchronize do + @stop_cv.wait(@mutex, @interval) unless @stopped + @stopped + end + break if should_break begin flush diff --git a/test/custom.test.rb b/test/custom.test.rb index 2601c28..eaca6ff 100644 --- a/test/custom.test.rb +++ b/test/custom.test.rb @@ -659,6 +659,40 @@ def stop = @inner.stop end end +describe "DataStream WebSocket handshake headers" do + def build_client(api_key: "test_api_key") + Schematic::DataStream::WebSocketClient.new( + base_url: "https://api.schematichq.com", + api_key: api_key, + logger: Schematic::ConsoleLogger.new(level: :error), + message_handler: ->(_msg) {} + ) + end + + it "includes the api key, datastream mode, client name, and client version" do + headers = build_client(api_key: "test_api_key").send(:handshake_headers) + + assert_equal "test_api_key", headers["X-Schematic-Api-Key"] + assert_equal "direct", headers["X-Schematic-Datastream-Mode"] + assert_equal "schematic-ruby", headers["X-Schematic-Client"] + assert_equal Schematic::VERSION, headers["X-Schematic-Client-Version"] + end + + it "falls back to 'unknown' when Schematic::VERSION is empty" do + client = build_client + original_version = Schematic::VERSION + Schematic.send(:remove_const, :VERSION) + Schematic.const_set(:VERSION, "") + + begin + assert_equal "unknown", client.send(:handshake_headers)["X-Schematic-Client-Version"] + ensure + Schematic.send(:remove_const, :VERSION) + Schematic.const_set(:VERSION, original_version) + end + end +end + # ============================================================================= # CheckFlagResponse Tests # =============================================================================