diff --git a/README.md b/README.md index 2f42258..20197fb 100644 --- a/README.md +++ b/README.md @@ -127,7 +127,7 @@ client = Schematic::SchematicClient.new( client.close ``` -You can also adjust the log level of the built-in console logger: +You can also adjust the log level of the built-in console logger via the `log_level` option: ```ruby require "schematichq" @@ -135,11 +135,11 @@ require "schematichq" api_key = ENV["SCHEMATIC_API_KEY"] client = Schematic::SchematicClient.new( api_key: api_key, - logger: Schematic::ConsoleLogger.new(level: :debug) + log_level: :debug ) ``` -If no logger is provided, the client will use a default console logger at the `:info` level that outputs to stderr. +If no logger is provided, the client will use a default console logger at the `:warn` level that outputs to stderr. The `log_level` option only applies to this built-in logger; when you supply your own `logger`, its level is left untouched. ## Usage Examples @@ -216,6 +216,36 @@ client.track({ }) ``` +#### Event options + +Both `track` and `identify` accept an `options:` keyword for optional event metadata. Only fields you set are sent. + +```ruby +client.track( + { event: "query-tokens", company: { "id" => "your-company-id" }, quantity: 1500 }, + options: { + # Dedupe key. Duplicate events with the same key are dropped + # server-side for 24 hours. + idempotency_key: "evt_2026_05_21_query_tokens", + # Timestamp the event occurred (Time or ISO 8601 String). Required + # when trusted_client_clock is true. + sent_at: Time.now.utc, + # Use sent_at as the effective event time instead of server receipt + # time. Requires a secret API key and sent_at. + trusted_client_clock: true, + # Import historical data without affecting billing. Requires a secret + # API key and trusted_client_clock. + backfill: false + } +) + +client.identify( + { keys: { "user_id" => "your-user-id" }, name: "Wile E. Coyote" }, + # identify only supports idempotency_key. + options: { idempotency_key: "ident_your-user-id" } +) +``` + ### Creating and updating companies Although it is faster to create companies and users via identify events, if you need to handle a response, you can use the companies API to upsert companies. Because you use your own identifiers to identify companies, rather than a Schematic company ID, creating and updating companies are both done via the same upsert operation: diff --git a/lib/schematic/datastream/merge.rb b/lib/schematic/datastream/merge.rb index 0ab68fe..f4dd4e5 100644 --- a/lib/schematic/datastream/merge.rb +++ b/lib/schematic/datastream/merge.rb @@ -16,25 +16,42 @@ def deep_copy(obj) COMPANY_MAP_FIELDS = %i[credit_balances keys traits].freeze COMPANY_ARRAY_FIELDS = %i[billing_product_ids entitlements plan_ids plan_version_ids rules].freeze + # Partials don't carry refreshed entitlements, so when their derived + # fields change in another part of the company we sync them here to match + # server behavior (see schematic-python merge.partial_company): + # - credit_remaining <- credit_balances[credit_id] + # - usage <- metric value matching (event_name, metric_period, month_reset) + # Both are skipped when the partial also sends entitlements wholesale. def partial_company(existing, partial_data) return existing unless partial_data.is_a?(Hash) result = deep_copy(existing) + entitlements_in_partial = partial_data.key?(:entitlements) || partial_data.key?("entitlements") + updated_balances = nil + metrics_updated = false partial_data.each do |key, value| sym_key = key.to_sym if COMPANY_MAP_FIELDS.include?(sym_key) result[sym_key] ||= {} result[sym_key] = result[sym_key].merge(value) if value.is_a?(Hash) + updated_balances = (value.is_a?(Hash) ? value : {}) if sym_key == :credit_balances elsif COMPANY_ARRAY_FIELDS.include?(sym_key) result[sym_key] = value if value.is_a?(Array) elsif sym_key == :metrics result[sym_key] = upsert_metrics(result[sym_key] || [], value || []) + metrics_updated = true else result[sym_key] = value end end + if (updated_balances&.any? || metrics_updated) && !entitlements_in_partial + result[:entitlements] = sync_entitlements( + result[:entitlements], result[:metrics], updated_balances, metrics_updated + ) + end + result end @@ -90,6 +107,60 @@ def metrics_match?(left, right) def get_metric_field(metric, field) metric[field] || metric[field.to_s] end + + # Re-derive entitlement usage / credit_remaining from the merged metrics + # and the just-updated credit balances. Mirrors schematic-python so that + # entitlement usage reflects DataStream track events immediately. + def sync_entitlements(entitlements, metrics, updated_balances, metrics_updated) + return entitlements unless entitlements.is_a?(Array) && !entitlements.empty? + + metrics_lookup = {} + if metrics_updated && metrics.is_a?(Array) + metrics.each do |metric| + next unless metric.is_a?(Hash) + + key = [ + get_metric_field(metric, :event_subtype) || "", + get_metric_field(metric, :period) || "", + get_metric_field(metric, :month_reset) || "" + ] + value = get_metric_field(metric, :value) + metrics_lookup[key] = value.nil? ? 0 : value + end + end + + entitlements.map do |ent| + next ent unless ent.is_a?(Hash) + + new_ent = deep_copy(ent) + + credit_id = get_metric_field(ent, :credit_id) + if updated_balances && credit_id + present, balance = fetch_balance(updated_balances, credit_id) + new_ent[:credit_remaining] = balance if present + end + + event_name = get_metric_field(ent, :event_name) + unless metrics_lookup.empty? || event_name.nil? + period = get_metric_field(ent, :metric_period) || "all_time" + month_reset = get_metric_field(ent, :month_reset) || "first_of_month" + matched = metrics_lookup[[event_name, period, month_reset]] + new_ent[:usage] = matched unless matched.nil? + end + + new_ent + end + end + + # The partial's credit_balances may be keyed by string or symbol depending + # on how the message was parsed, while an entitlement's credit_id is a + # string value, so check both key forms. + def fetch_balance(balances, credit_id) + return [true, balances[credit_id]] if balances.key?(credit_id) + return [true, balances[credit_id.to_sym]] if balances.key?(credit_id.to_sym) + + [false, nil] + end end end end diff --git a/lib/schematic/event_buffer.rb b/lib/schematic/event_buffer.rb index 6890459..b482ce9 100644 --- a/lib/schematic/event_buffer.rb +++ b/lib/schematic/event_buffer.rb @@ -176,13 +176,19 @@ def calculate_backoff(attempt) # Transform to capture service payload format (differs from Fern API): # - Field is `type` not `event_type` # - Each event includes `api_key` + # Optional metadata (idempotency_key, trusted_client_clock, backfill) is + # only included when the caller set it, so we never send explicit nulls. def event_to_capture_payload(event) - { + payload = { api_key: @api_key, type: event[:event_type], body: event[:body], sent_at: event[:sent_at] || Time.now.utc.iso8601 } + payload[:idempotency_key] = event[:idempotency_key] unless event[:idempotency_key].nil? + payload[:trusted_client_clock] = event[:trusted_client_clock] unless event[:trusted_client_clock].nil? + payload[:backfill] = event[:backfill] unless event[:backfill].nil? + payload end end end diff --git a/lib/schematic/logger.rb b/lib/schematic/logger.rb index a22cb86..e50ff68 100644 --- a/lib/schematic/logger.rb +++ b/lib/schematic/logger.rb @@ -26,7 +26,7 @@ class ConsoleLogger attr_accessor :level - def initialize(level: :info) + def initialize(level: :warn) @level = level @mutex = Mutex.new @io = $stderr diff --git a/lib/schematic/schematic_client.rb b/lib/schematic/schematic_client.rb index 4ec2c53..10fad27 100644 --- a/lib/schematic/schematic_client.rb +++ b/lib/schematic/schematic_client.rb @@ -56,6 +56,12 @@ class SchematicClient DEFAULT_CACHE_MAX_SIZE = 1000 DEFAULT_EVENT_BUFFER_PERIOD = 5.0 # seconds (canonical Go value) + # Optional event metadata accepted via the `options:` keyword on track/identify. + # identify only honors :idempotency_key; track also honors :sent_at, + # :trusted_client_clock, and :backfill. Fields are only sent when set. + TRACK_OPTION_KEYS = %i[idempotency_key sent_at trusted_client_clock backfill].freeze + IDENTIFY_OPTION_KEYS = %i[idempotency_key].freeze + def initialize( api_key: nil, base_url: nil, @@ -67,7 +73,7 @@ def initialize( use_data_stream: false, datastream_options: {}, logger: nil, - log_level: :info + log_level: :warn ) @api_key = api_key @base_url = base_url || DEFAULT_BASE_URL @@ -251,26 +257,18 @@ def check_flags(company: nil, user: nil, keys: nil) # --- Event Submission --- - def identify(body) + def identify(body, options: nil) return if @offline - @event_buffer.push({ - event_type: "identify", - body: body, - sent_at: Time.now.utc.iso8601 - }) + @event_buffer.push(build_event("identify", body, options, IDENTIFY_OPTION_KEYS)) rescue StandardError => e @logger.error("Error sending identify event: #{e.message}") end - def track(body) + def track(body, options: nil) return if @offline - @event_buffer.push({ - event_type: "track", - body: body, - sent_at: Time.now.utc.iso8601 - }) + @event_buffer.push(build_event("track", body, options, TRACK_OPTION_KEYS)) # Update company metrics locally if DataStream is active and connected if @datastream_client&.connected? && body[:company] @@ -495,6 +493,41 @@ def check_flags_via_datastream(keys, company, user) results end + # Build the buffered event hash, applying any caller-supplied options. + # Only keys in allowed_keys are honored, and only when explicitly set, so + # unset fields never appear on the wire. + def build_event(event_type, body, options, allowed_keys) + event = { + event_type: event_type, + body: body, + sent_at: Time.now.utc.iso8601 + } + return event unless options.is_a?(Hash) + + allowed_keys.each do |key| + value = option_value(options, key) + next if value.nil? + + event[key] = key == :sent_at ? normalize_sent_at(value) : value + end + + event + end + + def option_value(options, key) + return options[key] if options.key?(key) + + options[key.to_s] + end + + def normalize_sent_at(value) + return value if value.is_a?(String) + return value.utc.iso8601 if value.respond_to?(:utc) + return value.iso8601 if value.respond_to?(:iso8601) + + value + end + def enqueue_flag_check_event(flag_key, response, company, user) body = { flag_key: flag_key, diff --git a/test/custom.test.rb b/test/custom.test.rb index eaca6ff..413398b 100644 --- a/test/custom.test.rb +++ b/test/custom.test.rb @@ -223,6 +223,26 @@ def stop = @inner.stop assert_equal :debug, logger.level end + + it "defaults to the warn level" do + assert_equal :warn, Schematic::ConsoleLogger.new.level + end + + it "creates a default logger at the warn level when none is provided" do + client = Schematic::SchematicClient.new(offline: true) + + assert_equal :warn, client.instance_variable_get(:@logger).level + client.close + end + + it "leaves a custom logger's level untouched, ignoring log_level" do + custom = Schematic::ConsoleLogger.new(level: :debug) + client = Schematic::SchematicClient.new(offline: true, logger: custom, log_level: :error) + + assert_same custom, client.instance_variable_get(:@logger) + assert_equal :debug, custom.level + client.close + end end # ============================================================================= @@ -489,6 +509,64 @@ def stop = @inner.stop buffer.stop end + it "excludes unset optional fields from the capture payload" do + request_body = nil + stub_request(:post, CAPTURE_URL).to_return do |req| + request_body = JSON.parse(req.body, symbolize_names: true) + { status: 200 } + end + + buffer = Schematic::EventBuffer.new( + api_key: "test_key", + logger: Schematic::ConsoleLogger.new(level: :error), + interval: 100, + offline: false + ) + + buffer.push({ event_type: "track", body: { event: "page_view" }, sent_at: "2024-01-01T00:00:00Z" }) + buffer.flush + + event = request_body[:events][0] + + refute event.key?(:idempotency_key) + refute event.key?(:trusted_client_clock) + refute event.key?(:backfill) + buffer.stop + end + + it "includes set optional fields in the capture payload" do + request_body = nil + stub_request(:post, CAPTURE_URL).to_return do |req| + request_body = JSON.parse(req.body, symbolize_names: true) + { status: 200 } + end + + buffer = Schematic::EventBuffer.new( + api_key: "test_key", + logger: Schematic::ConsoleLogger.new(level: :error), + interval: 100, + offline: false + ) + + buffer.push({ + event_type: "track", + body: { event: "page_view" }, + sent_at: "2024-01-01T00:00:00Z", + idempotency_key: "evt_xyz", + trusted_client_clock: true, + backfill: false + }) + buffer.flush + + event = request_body[:events][0] + + assert_equal "evt_xyz", event[:idempotency_key] + assert event[:trusted_client_clock] + assert event.key?(:backfill) + refute event[:backfill] + buffer.stop + end + it "includes X-Schematic-Api-Key header" do stub = stub_request(:post, CAPTURE_URL).with( headers: { "X-Schematic-Api-Key" => "test_key" } @@ -630,6 +708,142 @@ def stop = @inner.stop assert_equal 2, result.size end + + it "partial_company syncs entitlement usage from updated metrics" do + existing = { + id: "comp_1", + metrics: [ + { event_subtype: "api_call", period: "current_month", month_reset: "first_of_month", value: 5 } + ], + entitlements: [ + { + feature_id: "feat_1", + feature_key: "api", + event_name: "api_call", + metric_period: "current_month", + month_reset: "first_of_month", + usage: 5 + } + ] + } + + partial = { + metrics: [ + { event_subtype: "api_call", period: "current_month", month_reset: "first_of_month", value: 12 } + ] + } + + result = Schematic::DataStream::Merge.partial_company(existing, partial) + + assert_equal 12, result[:entitlements][0][:usage] + end + + it "partial_company defaults metric_period and month_reset when matching usage" do + existing = { + id: "comp_1", + metrics: [ + { event_subtype: "api_call", period: "all_time", month_reset: "first_of_month", value: 1 } + ], + entitlements: [ + { feature_id: "feat_1", feature_key: "api", event_name: "api_call", usage: 1 } + ] + } + + partial = { + metrics: [ + { event_subtype: "api_call", period: "all_time", month_reset: "first_of_month", value: 9 } + ] + } + + result = Schematic::DataStream::Merge.partial_company(existing, partial) + + assert_equal 9, result[:entitlements][0][:usage] + end + + it "partial_company syncs credit_remaining from updated credit_balances" do + existing = { + id: "comp_1", + credit_balances: { "credit_abc" => 100 }, + entitlements: [ + { feature_id: "feat_1", feature_key: "ai", credit_id: "credit_abc", credit_remaining: 100 } + ] + } + + partial = { + credit_balances: { "credit_abc" => 40 } + } + + result = Schematic::DataStream::Merge.partial_company(existing, partial) + + assert_equal 40, result[:entitlements][0][:credit_remaining] + end + + it "partial_company syncs credit_remaining for every entitlement sharing a credit_id" do + existing = { + id: "comp_1", + credit_balances: { "credit_abc" => 100 }, + entitlements: [ + { feature_id: "feat_1", feature_key: "ai-chat", credit_id: "credit_abc", credit_remaining: 100 }, + { feature_id: "feat_2", feature_key: "ai-image", credit_id: "credit_abc", credit_remaining: 100 }, + { feature_id: "feat_3", feature_key: "other", credit_id: "credit_xyz", credit_remaining: 7 } + ] + } + + partial = { + credit_balances: { "credit_abc" => 40 } + } + + result = Schematic::DataStream::Merge.partial_company(existing, partial) + + assert_equal 40, result[:entitlements][0][:credit_remaining] + assert_equal 40, result[:entitlements][1][:credit_remaining] + # untouched: its credit wasn't in the partial + assert_equal 7, result[:entitlements][2][:credit_remaining] + end + + it "partial_company does not derive entitlements when partial sends them wholesale" do + existing = { + id: "comp_1", + metrics: [ + { event_subtype: "api_call", period: "current_month", month_reset: "first_of_month", value: 5 } + ], + entitlements: [ + { feature_id: "feat_1", feature_key: "api", event_name: "api_call", + metric_period: "current_month", month_reset: "first_of_month", usage: 5 } + ] + } + + partial = { + metrics: [ + { event_subtype: "api_call", period: "current_month", month_reset: "first_of_month", value: 12 } + ], + entitlements: [ + { feature_id: "feat_1", feature_key: "api", event_name: "api_call", + metric_period: "current_month", month_reset: "first_of_month", usage: 99 } + ] + } + + result = Schematic::DataStream::Merge.partial_company(existing, partial) + + # entitlements were sent wholesale, so the explicit value wins (no derivation) + assert_equal 99, result[:entitlements][0][:usage] + end + + it "partial_company leaves entitlements untouched when neither metrics nor balances change" do + existing = { + id: "comp_1", + traits: { "plan" => "pro" }, + entitlements: [ + { feature_id: "feat_1", feature_key: "api", event_name: "api_call", usage: 5 } + ] + } + + partial = { traits: { "seats" => "10" } } + + result = Schematic::DataStream::Merge.partial_company(existing, partial) + + assert_equal 5, result[:entitlements][0][:usage] + end end # ============================================================================= @@ -1456,6 +1670,113 @@ def build_client(api_key: "test_api_key") end end +# ============================================================================= +# SchematicClient - Track/Identify Event Options +# ============================================================================= +describe "SchematicClient - Event Options" do + def capture_event(&block) + request_body = nil + stub_request(:post, CAPTURE_URL).to_return do |req| + request_body = JSON.parse(req.body, symbolize_names: true) + { status: 200 } + end + + client = Schematic::SchematicClient.new(api_key: "api_test_key_123", cache_providers: []) + block.call(client) + client.instance_variable_get(:@event_buffer).flush + client.close + + request_body[:events][0] + end + + after { WebMock.reset! } + + it "passes idempotency_key from track options to the wire payload" do + event = capture_event do |client| + client.track({ event: "credit-consumed", company: { "id" => "company_id" } }, + options: { idempotency_key: "evt_abc123" }) + end + + assert_equal "evt_abc123", event[:idempotency_key] + refute event.key?(:trusted_client_clock) + refute event.key?(:backfill) + end + + it "passes all track options to the wire payload" do + sent_at = Time.utc(2026, 5, 21, 12, 0, 0) + event = capture_event do |client| + client.track({ event: "historical-import", company: { "id" => "company_id" } }, + options: { + idempotency_key: "evt_xyz", + sent_at: sent_at, + trusted_client_clock: true, + backfill: true + }) + end + + assert_equal "evt_xyz", event[:idempotency_key] + assert_equal "2026-05-21T12:00:00Z", event[:sent_at] + assert event[:trusted_client_clock] + assert event[:backfill] + end + + it "omits unset optional fields when track has no options" do + event = capture_event do |client| + client.track({ event: "some-event", company: { "id" => "company_id" } }) + end + + refute event.key?(:idempotency_key) + refute event.key?(:trusted_client_clock) + refute event.key?(:backfill) + end + + it "sends explicitly false backfill on the wire" do + event = capture_event do |client| + client.track({ event: "some-event" }, options: { backfill: false, trusted_client_clock: false }) + end + + assert event.key?(:backfill) + refute event[:backfill] + assert event.key?(:trusted_client_clock) + refute event[:trusted_client_clock] + end + + it "accepts a string sent_at without modification" do + event = capture_event do |client| + client.track({ event: "some-event" }, options: { sent_at: "2024-01-01T00:00:00Z" }) + end + + assert_equal "2024-01-01T00:00:00Z", event[:sent_at] + end + + it "passes idempotency_key from identify options to the wire payload" do + event = capture_event do |client| + client.identify({ keys: { "id" => "user_id" } }, options: { idempotency_key: "ident_123" }) + end + + assert_equal "ident_123", event[:idempotency_key] + end + + it "ignores track-only options on identify" do + event = capture_event do |client| + client.identify({ keys: { "id" => "user_id" } }, + options: { idempotency_key: "ident_123", trusted_client_clock: true, backfill: true }) + end + + assert_equal "ident_123", event[:idempotency_key] + refute event.key?(:trusted_client_clock) + refute event.key?(:backfill) + end + + it "omits idempotency_key when identify has no options" do + event = capture_event do |client| + client.identify({ keys: { "id" => "user_id" }, name: "User Name" }) + end + + refute event.key?(:idempotency_key) + end +end + # ============================================================================= # EventBuffer - Concurrent Push # =============================================================================