Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion lib/schematic/datastream/websocket_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
require "json"
require "websocket"

require_relative "../version"

module Schematic
module DataStream
WRITE_WAIT = 10 # seconds
Expand All @@ -27,6 +29,14 @@ module DataStream
MESSAGE_TYPE_DELETE = "delete"
MESSAGE_TYPE_ERROR = "error"

# Headers attached to the WebSocket handshake so the backend can distinguish
# direct-SDK connections from the schematic-datastream-replicator and
# correlate either to a specific release. Mode is always "direct" here —
# replicator mode in this SDK doesn't open a WebSocket at all.
CLIENT_NAME = "schematic-ruby"
DATASTREAM_MODE_DIRECT = "direct"
UNKNOWN_VERSION = "unknown"

class WebSocketClient
attr_reader :url, :connected, :ready

Expand Down Expand Up @@ -154,7 +164,7 @@ def connect
def perform_handshake
@handshake = WebSocket::Handshake::Client.new(
url: @url,
headers: { "X-Schematic-Api-Key" => @api_key }
headers: handshake_headers
)

@socket.write(@handshake.to_s)
Expand Down Expand Up @@ -310,6 +320,27 @@ def write_frame(type, data)
)
@write_mutex.synchronize { @socket&.write(frame.to_s) }
end

# Headers attached to the WebSocket handshake. The mode/client headers let
# the backend tell direct-SDK connections apart from the
# schematic-datastream-replicator and correlate them to a release.
def handshake_headers
{
"X-Schematic-Api-Key" => @api_key,
"X-Schematic-Datastream-Mode" => DATASTREAM_MODE_DIRECT,
"X-Schematic-Client" => CLIENT_NAME,
"X-Schematic-Client-Version" => sdk_version
}
end

# Resolves the SDK version reported in handshake headers. Schematic::VERSION
# is stamped into the Fern-generated lib/schematic/version.rb on each release;
# fall back to "unknown" if the constant is missing (e.g. a partial checkout).
def sdk_version
return Schematic::VERSION if defined?(Schematic::VERSION) && !Schematic::VERSION.to_s.empty?

UNKNOWN_VERSION
end
end
end
end
12 changes: 10 additions & 2 deletions lib/schematic/event_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def initialize(api_key:, logger:, interval: DEFAULT_FLUSH_INTERVAL, max_batch_si
@stopped = false
@flushing = false
@flush_done = ConditionVariable.new
@stop_cv = ConditionVariable.new

start_periodic_flush unless @offline
end
Expand Down Expand Up @@ -77,6 +78,10 @@ def stop
@mutex.synchronize do
@stopped = true

# Wake the periodic flush thread so it sees @stopped immediately
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unrelated but I'll assume there's a good reason for this to be included

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is unrelated but the sleep below was adding like 3 minutes to the test suite run.

# instead of sleeping out the rest of @interval.
@stop_cv.broadcast

# Wait for any in-flight flush to complete before our final flush,
# so we don't skip events that arrived during the in-flight batch.
@flush_done.wait(@mutex, 30) if @flushing
Expand All @@ -92,8 +97,11 @@ def stop
def start_periodic_flush
@flush_thread = Thread.new do
loop do
sleep(@interval)
break if @stopped
should_break = @mutex.synchronize do
@stop_cv.wait(@mutex, @interval) unless @stopped
@stopped
end
break if should_break

begin
flush
Expand Down
34 changes: 34 additions & 0 deletions test/custom.test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,40 @@ def stop = @inner.stop
end
end

describe "DataStream WebSocket handshake headers" do
def build_client(api_key: "test_api_key")
Schematic::DataStream::WebSocketClient.new(
base_url: "https://api.schematichq.com",
api_key: api_key,
logger: Schematic::ConsoleLogger.new(level: :error),
message_handler: ->(_msg) {}
)
end

it "includes the api key, datastream mode, client name, and client version" do
headers = build_client(api_key: "test_api_key").send(:handshake_headers)

assert_equal "test_api_key", headers["X-Schematic-Api-Key"]
assert_equal "direct", headers["X-Schematic-Datastream-Mode"]
assert_equal "schematic-ruby", headers["X-Schematic-Client"]
assert_equal Schematic::VERSION, headers["X-Schematic-Client-Version"]
end

it "falls back to 'unknown' when Schematic::VERSION is empty" do
client = build_client
original_version = Schematic::VERSION
Schematic.send(:remove_const, :VERSION)
Schematic.const_set(:VERSION, "")

begin
assert_equal "unknown", client.send(:handshake_headers)["X-Schematic-Client-Version"]
ensure
Schematic.send(:remove_const, :VERSION)
Schematic.const_set(:VERSION, original_version)
end
end
end

# =============================================================================
# CheckFlagResponse Tests
# =============================================================================
Expand Down