Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,19 @@ client = Schematic::SchematicClient.new(
client.close
```

You can also adjust the log level of the built-in console logger:
You can also adjust the log level of the built-in console logger via the `log_level` option:

```ruby
require "schematichq"

api_key = ENV["SCHEMATIC_API_KEY"]
client = Schematic::SchematicClient.new(
api_key: api_key,
logger: Schematic::ConsoleLogger.new(level: :debug)
log_level: :debug
)
```

If no logger is provided, the client will use a default console logger at the `:info` level that outputs to stderr.
If no logger is provided, the client will use a default console logger at the `:warn` level that outputs to stderr. The `log_level` option only applies to this built-in logger; when you supply your own `logger`, its level is left untouched.

## Usage Examples

Expand Down Expand Up @@ -216,6 +216,36 @@ client.track({
})
```

#### Event options

Both `track` and `identify` accept an `options:` keyword for optional event metadata. Only fields you set are sent.

```ruby
client.track(
{ event: "query-tokens", company: { "id" => "your-company-id" }, quantity: 1500 },
options: {
# Dedupe key. Duplicate events with the same key are dropped
# server-side for 24 hours.
idempotency_key: "evt_2026_05_21_query_tokens",
# Timestamp the event occurred (Time or ISO 8601 String). Required
# when trusted_client_clock is true.
sent_at: Time.now.utc,
# Use sent_at as the effective event time instead of server receipt
# time. Requires a secret API key and sent_at.
trusted_client_clock: true,
# Import historical data without affecting billing. Requires a secret
# API key and trusted_client_clock.
backfill: false
}
)

client.identify(
{ keys: { "user_id" => "your-user-id" }, name: "Wile E. Coyote" },
# identify only supports idempotency_key.
options: { idempotency_key: "ident_your-user-id" }
)
```

### Creating and updating companies

Although it is faster to create companies and users via identify events, if you need to handle a response, you can use the companies API to upsert companies. Because you use your own identifiers to identify companies, rather than a Schematic company ID, creating and updating companies are both done via the same upsert operation:
Expand Down
71 changes: 71 additions & 0 deletions lib/schematic/datastream/merge.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,42 @@ def deep_copy(obj)
COMPANY_MAP_FIELDS = %i[credit_balances keys traits].freeze
COMPANY_ARRAY_FIELDS = %i[billing_product_ids entitlements plan_ids plan_version_ids rules].freeze

# Partials don't carry refreshed entitlements, so when their derived
# fields change in another part of the company we sync them here to match
# server behavior (see schematic-python merge.partial_company):
# - credit_remaining <- credit_balances[credit_id]
# - usage <- metric value matching (event_name, metric_period, month_reset)
# Both are skipped when the partial also sends entitlements wholesale.
def partial_company(existing, partial_data)
return existing unless partial_data.is_a?(Hash)

result = deep_copy(existing)
entitlements_in_partial = partial_data.key?(:entitlements) || partial_data.key?("entitlements")
updated_balances = nil
metrics_updated = false

partial_data.each do |key, value|
sym_key = key.to_sym
if COMPANY_MAP_FIELDS.include?(sym_key)
result[sym_key] ||= {}
result[sym_key] = result[sym_key].merge(value) if value.is_a?(Hash)
updated_balances = (value.is_a?(Hash) ? value : {}) if sym_key == :credit_balances
elsif COMPANY_ARRAY_FIELDS.include?(sym_key)
result[sym_key] = value if value.is_a?(Array)
elsif sym_key == :metrics
result[sym_key] = upsert_metrics(result[sym_key] || [], value || [])
metrics_updated = true
else
result[sym_key] = value
end
end

if (updated_balances&.any? || metrics_updated) && !entitlements_in_partial
result[:entitlements] = sync_entitlements(
result[:entitlements], result[:metrics], updated_balances, metrics_updated
)
end

result
end

Expand Down Expand Up @@ -90,6 +107,60 @@ def metrics_match?(left, right)
def get_metric_field(metric, field)
metric[field] || metric[field.to_s]
end

# Re-derive entitlement usage / credit_remaining from the merged metrics
# and the just-updated credit balances. Mirrors schematic-python so that
# entitlement usage reflects DataStream track events immediately.
def sync_entitlements(entitlements, metrics, updated_balances, metrics_updated)
return entitlements unless entitlements.is_a?(Array) && !entitlements.empty?

metrics_lookup = {}
if metrics_updated && metrics.is_a?(Array)
metrics.each do |metric|
next unless metric.is_a?(Hash)

key = [
get_metric_field(metric, :event_subtype) || "",
get_metric_field(metric, :period) || "",
get_metric_field(metric, :month_reset) || ""
]
value = get_metric_field(metric, :value)
metrics_lookup[key] = value.nil? ? 0 : value
end
end

entitlements.map do |ent|
next ent unless ent.is_a?(Hash)

new_ent = deep_copy(ent)

credit_id = get_metric_field(ent, :credit_id)
if updated_balances && credit_id
present, balance = fetch_balance(updated_balances, credit_id)
new_ent[:credit_remaining] = balance if present
end

event_name = get_metric_field(ent, :event_name)
unless metrics_lookup.empty? || event_name.nil?
period = get_metric_field(ent, :metric_period) || "all_time"
month_reset = get_metric_field(ent, :month_reset) || "first_of_month"
matched = metrics_lookup[[event_name, period, month_reset]]
new_ent[:usage] = matched unless matched.nil?
end

new_ent
end
end

# The partial's credit_balances may be keyed by string or symbol depending
# on how the message was parsed, while an entitlement's credit_id is a
# string value, so check both key forms.
def fetch_balance(balances, credit_id)
return [true, balances[credit_id]] if balances.key?(credit_id)
return [true, balances[credit_id.to_sym]] if balances.key?(credit_id.to_sym)

[false, nil]
end
end
end
end
8 changes: 7 additions & 1 deletion lib/schematic/event_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,19 @@ def calculate_backoff(attempt)
# Transform to capture service payload format (differs from Fern API):
# - Field is `type` not `event_type`
# - Each event includes `api_key`
# Optional metadata (idempotency_key, trusted_client_clock, backfill) is
# only included when the caller set it, so we never send explicit nulls.
def event_to_capture_payload(event)
{
payload = {
api_key: @api_key,
type: event[:event_type],
body: event[:body],
sent_at: event[:sent_at] || Time.now.utc.iso8601
}
payload[:idempotency_key] = event[:idempotency_key] unless event[:idempotency_key].nil?
payload[:trusted_client_clock] = event[:trusted_client_clock] unless event[:trusted_client_clock].nil?
payload[:backfill] = event[:backfill] unless event[:backfill].nil?
payload
end
end
end
2 changes: 1 addition & 1 deletion lib/schematic/logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class ConsoleLogger

attr_accessor :level

def initialize(level: :info)
def initialize(level: :warn)
@level = level
@mutex = Mutex.new
@io = $stderr
Expand Down
59 changes: 46 additions & 13 deletions lib/schematic/schematic_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ class SchematicClient
DEFAULT_CACHE_MAX_SIZE = 1000
DEFAULT_EVENT_BUFFER_PERIOD = 5.0 # seconds (canonical Go value)

# Optional event metadata accepted via the `options:` keyword on track/identify.
# identify only honors :idempotency_key; track also honors :sent_at,
# :trusted_client_clock, and :backfill. Fields are only sent when set.
TRACK_OPTION_KEYS = %i[idempotency_key sent_at trusted_client_clock backfill].freeze
IDENTIFY_OPTION_KEYS = %i[idempotency_key].freeze

def initialize(
api_key: nil,
base_url: nil,
Expand All @@ -67,7 +73,7 @@ def initialize(
use_data_stream: false,
datastream_options: {},
logger: nil,
log_level: :info
log_level: :warn
)
@api_key = api_key
@base_url = base_url || DEFAULT_BASE_URL
Expand Down Expand Up @@ -251,26 +257,18 @@ def check_flags(company: nil, user: nil, keys: nil)

# --- Event Submission ---

def identify(body)
def identify(body, options: nil)
return if @offline

@event_buffer.push({
event_type: "identify",
body: body,
sent_at: Time.now.utc.iso8601
})
@event_buffer.push(build_event("identify", body, options, IDENTIFY_OPTION_KEYS))
rescue StandardError => e
@logger.error("Error sending identify event: #{e.message}")
end

def track(body)
def track(body, options: nil)
return if @offline

@event_buffer.push({
event_type: "track",
body: body,
sent_at: Time.now.utc.iso8601
})
@event_buffer.push(build_event("track", body, options, TRACK_OPTION_KEYS))

# Update company metrics locally if DataStream is active and connected
if @datastream_client&.connected? && body[:company]
Expand Down Expand Up @@ -495,6 +493,41 @@ def check_flags_via_datastream(keys, company, user)
results
end

# Build the buffered event hash, applying any caller-supplied options.
# Only keys in allowed_keys are honored, and only when explicitly set, so
# unset fields never appear on the wire.
def build_event(event_type, body, options, allowed_keys)
event = {
event_type: event_type,
body: body,
sent_at: Time.now.utc.iso8601
}
return event unless options.is_a?(Hash)

allowed_keys.each do |key|
value = option_value(options, key)
next if value.nil?

event[key] = key == :sent_at ? normalize_sent_at(value) : value
end

event
end

def option_value(options, key)
return options[key] if options.key?(key)

options[key.to_s]
end

def normalize_sent_at(value)
return value if value.is_a?(String)
return value.utc.iso8601 if value.respond_to?(:utc)
return value.iso8601 if value.respond_to?(:iso8601)

value
end

def enqueue_flag_check_event(flag_key, response, company, user)
body = {
flag_key: flag_key,
Expand Down
Loading