Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 87 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@ The execution algorithm is proven at scale in production. This implementation st

* Uses GraphQL Ruby schemas.
* Currently no built-in validation or analysis, do it ahead of time.
* No support for subscription, defer, or stream.
* Currently no defer or stream.
* Supports input validations, but NOT input transformations (ie: "prepare" hooks).

# Usage

## Execute a query

```ruby
executor = GraphQL::BreadthExec::Executor.build(
schema: MyGraphQLSchema,
document: GraphQL.parse(document),
executor = GraphQL::BreadthExec::Executor.new(
MyGraphQLSchema,
GraphQL.parse(document),
root_object: { ... },
operation_name: "OperationName",
variables: { ... },
context: { ... },
tracers: [ ... ],
Expand Down Expand Up @@ -539,7 +538,9 @@ class MyAuthorization < GraphQL::BreadthExec::Authorization
# ... detail auth behaviors
end

GraphQL::BreadthExec::Executor.build(
GraphQL::BreadthExec::Executor.new(
MyGraphQLSchema,
GraphQL.parse(document),
authorization: MyAuthorization,
)
```
Expand Down Expand Up @@ -654,3 +655,83 @@ end
```

This architecture makes cascading resolvers run repeatedly on every field in a subtree, rather than just once at the top of the owning field's subtree. This pattern is more granular and generally safer for isolation and parallelism, though has more resolver churn than a typical depth traversal so should be used accordingly.

## Subscriptions

Query and mutation execution use `result`, which always returns a normal GraphQL result hash. If a controller accepts subscription operations, use `result_or_subscribe` instead. For query and mutation operations it returns the same result hash as `result`; for subscription operations it returns a `GraphQL::BreadthExec::SubscriptionResponseStream` on successful source setup, or a normal GraphQL result hash for public setup errors. Calling `result` for a subscription operation raises an implementation error.

```ruby
executor = GraphQL::BreadthExec::Executor.new(
MyGraphQLSchema,
GraphQL.parse(document),
variables: { ... },
context: { ... },
)

response = executor.result_or_subscribe

if response.is_a?(GraphQL::BreadthExec::SubscriptionResponseStream)
response.each do |event_result|
deliver(event_result)
end
else
deliver(response)
end
```

Subscription root fields use two field resolver hooks:

* `subscribe(exec_field, context)` runs once during subscription setup and must return an `Enumerable` or `Enumerator` of source events.
* `resolve(exec_field, context)` runs once per yielded source event. The source event is used as the root object for that event's GraphQL execution.

```ruby
class OnWriteValueResolver < GraphQL::BreadthExec::FieldResolver
def subscribe(exec_field, context)
context[:write_value_events]
end

def resolve(exec_field, context)
exec_field.map_objects(&:itself)
end
end

class WriteValuePayload < BaseObject
field :value, String, null: true
end

class Subscription < BaseObject
field :on_write_value, WriteValuePayload, null: true do |field|
field.breadth_resolver = OnWriteValueResolver.new
end
end
```

For a small in-process source stream, any Ruby enumerator is enough:

```ruby
write_value_events = Enumerator.new do |events|
events << { value: "first" }
events << { value: "second" }
end

executor = GraphQL::BreadthExec::Executor.new(
MyGraphQLSchema,
GraphQL.parse(%|
subscription WatchWrites {
onWriteValue {
value
}
}
|),
context: { write_value_events: write_value_events },
)

stream = executor.result_or_subscribe
stream.each do |event_result|
# {"data"=>{"onWriteValue"=>{"value"=>"first"}}}
# {"data"=>{"onWriteValue"=>{"value"=>"second"}}}
deliver(event_result)
end
```

Each source event is fulfilled through normal breadth execution, so field resolvers, lazy loading, authorization, directives, abstract type resolution, and error formatting all work as they do for query execution. Errors raised while enumerating the source stream are allowed to propagate to the stream consumer. Promise-backed subscription setup is not supported; `subscribe` should return the source stream synchronously. Returning a promise or any non-enumerable value is an implementation error.
14 changes: 7 additions & 7 deletions benchmark/run.rb
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def benchmark_execution
resolvers: BREADTH_RESOLVERS,
root_object: data_source,
tracers: [CARDINAL_TRACER],
).perform
).result
end

x.compare!
Expand Down Expand Up @@ -396,7 +396,7 @@ def benchmark_lazy_execution
resolvers: BREADTH_DEFERRED_RESOLVERS,
root_object: data_source,
tracers: [CARDINAL_TRACER],
).perform
).result
end

x.compare!
Expand All @@ -419,7 +419,7 @@ def benchmark_introspection
resolvers: BREADTH_RESOLVERS,
root_object: {},
tracers: [CARDINAL_TRACER],
).perform
).result
end

x.compare!
Expand Down Expand Up @@ -451,7 +451,7 @@ def benchmark_resolve_batch
resolvers: RESOLVE_BATCH_RESOLVERS,
root_object: RESOLVE_BATCH_DATA,
tracers: [CARDINAL_TRACER],
).perform
).result
end

x.compare!
Expand All @@ -471,7 +471,7 @@ def benchmark_lazy_scalars
resolvers: LAZY_SCALAR_BREADTH_RESOLVERS,
root_object: RESOLVE_BATCH_DATA,
tracers: [CARDINAL_TRACER],
).perform
).result
end

x.report("graphql-ruby graphql-batch: #{RESOLVE_BATCH_OBJECT_COUNT} x 3 scalars") do
Expand Down Expand Up @@ -523,7 +523,7 @@ def benchmark_lazy_field_batch
resolvers: LAZY_SCALAR_BREADTH_RESOLVERS,
root_object: RESOLVE_BATCH_DATA,
tracers: [CARDINAL_TRACER],
).perform
).result
end

x.compare!
Expand Down Expand Up @@ -552,7 +552,7 @@ def memory_profile
resolvers: BREADTH_RESOLVERS,
root_object: data_source,
tracers: [CARDINAL_TRACER],
).perform
).result
end

puts "\n\ngraphql-cardinal memory profile: #{num_objects} resolvers"
Expand Down
1 change: 1 addition & 0 deletions lib/graphql/breadth_exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ExecutionField; end
require_relative "breadth_exec/tracer"
require_relative "breadth_exec/field_resolvers"
require_relative "breadth_exec/directive_resolvers"
require_relative "breadth_exec/subscription_response_stream"
require_relative "breadth_exec/has_breadth_resolver"
require_relative "breadth_exec/introspection"
require_relative "breadth_exec/executor"
Expand Down
15 changes: 0 additions & 15 deletions lib/graphql/breadth_exec/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -179,21 +179,6 @@ def initialize(message = nil, exec_field:, list_item: false)
end
end

class OperationTypeUnsupportedError < ExecutionError
#: (String) -> void
def initialize(operation_type)
@operation_type = operation_type
super("Unsupported operation type")
end

#: () -> error_hash
def to_h
super.tap do |hash|
hash["path"] = [@operation_type]
end
end
end

class InputError < ExecutionError
#: (
#| String message,
Expand Down
114 changes: 103 additions & 11 deletions lib/graphql/breadth_exec/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ def initialize(schema, document, resolvers: EMPTY_OBJECT, root_object: nil, vari
@input = InputFormatter.new(@context)
@planner = ExecutionPlanner.new(executor: self, resolvers: @resolvers)
@authorization = authorization.new
@authorization_class = authorization
@document = document
@root_object = root_object
@context_value = context
@tracers = tracers
@result = {}
@data = {}
Expand Down Expand Up @@ -98,9 +100,41 @@ def paths
end

#: -> graphql_result
def perform
execute unless executed?
@result
def result
operation = @query.selected_operation
if operation.operation_type == ExecutionPlanner::SUBSCRIPTION_OPERATION
raise ImplementationError, "Use result_or_subscribe for subscription operations"
end

return @result if executed?

execute
end

#: -> (SubscriptionResponseStream | graphql_result)
def result_or_subscribe
return @result if executed?

operation = @query.selected_operation
@result = if operation.operation_type == ExecutionPlanner::SUBSCRIPTION_OPERATION
subscribe(operation)
else
execute
end
end

#: (untyped) -> graphql_result
def execute_subscription_event(object)
self.class.new(
@schema,
@document,
resolvers: @resolvers,
root_object: object,
variables: @provided_variables,
context: @context_value,
tracers: @tracers,
authorization: @authorization_class,
).execute
end

#: (singleton(LazyLoader), ?loader_args?) -> LazyLoader[untyped]
Expand Down Expand Up @@ -173,7 +207,7 @@ def executed?
@executed
end

private
protected

#: -> graphql_result
def execute
Expand Down Expand Up @@ -211,11 +245,7 @@ def execute
@tracers.each { _1.before_execute(self, @context) }
end

root_scopes = begin
@planner.root_scopes_for_operation(operation, root_object: @root_object, result: @data)
rescue OperationTypeUnsupportedError => op_err
return build_result(errors: [op_err.to_h])
end
root_scopes = @planner.root_scopes_for_operation(operation, root_object: @root_object, result: @data)

@planner.plan_scopes(root_scopes).each do |exec_scope|
@exec_queue << exec_scope
Expand Down Expand Up @@ -274,6 +304,8 @@ def execute
end
end

private

#: (
#| Array[ExecutionDirective],
#| ?current_field: ExecutionField[untyped]?,
Expand Down Expand Up @@ -528,10 +560,10 @@ def execute_field(exec_field)
if !pre_authorized
exec_field.result = exec_field.resolve_all(FieldAuthorizationError.new(exec_field: exec_field))
elsif exec_field.directives.empty?
exec_field.result = exec_field.resolver.resolve_field(exec_field, @context)
exec_field.result = exec_field.resolver.resolve(exec_field, @context)
else
execute_with_directives(exec_field.directives, current_field: exec_field) do
exec_field.result = exec_field.resolver.resolve_field(exec_field, @context)
exec_field.result = exec_field.resolver.resolve(exec_field, @context)
end
end
rescue StandardError => e
Expand Down Expand Up @@ -831,6 +863,52 @@ def build_abstract_scopes(exec_field, return_type, next_objects, next_results)
@exec_queue.concat(@planner.plan_scopes(scopes))
end

#: (GraphQL::Language::Nodes::OperationDefinition) -> (SubscriptionResponseStream | graphql_result)
def subscribe(operation)
@executed = true

unless @context.errors.empty?
return build_result(errors: @context.errors.map(&:to_h))
end

begin
@input.coerce_variable_values(operation.variables, @query.provided_variables || EMPTY_OBJECT)
rescue InputValidationErrorSet => input_error
return build_result(errors: serialize_errors(input_error))
end

execute_with_directives(@planner.root_directives_for_operation(operation)) do
root_scopes = @planner.root_scopes_for_operation(operation, root_object: @root_object, result: @data)
exec_scope = @planner.plan_scopes(root_scopes).fetch(0)
exec_field = exec_scope.fields.each_value.first #: as !nil

exec_field.validate!
exec_field.lazy_state_executing!

pre_authorized = @authorization.authorized_field?(exec_field, @context)
pre_authorized &&= @authorization.authorized_type?(exec_field.type.unwrap, @context, exec_field: exec_field)
raise FieldAuthorizationError.new(exec_field: exec_field) unless pre_authorized

source_stream = if exec_field.directives.empty?
exec_field.resolver.subscribe(exec_field, @context)
else
execute_with_directives(exec_field.directives, current_field: exec_field) do
exec_field.resolver.subscribe(exec_field, @context)
end
end

unless source_stream.is_a?(Enumerable)
raise ImplementationError, "Subscription source must return an Enumerable"
end

exec_field.lazy_state_locked!
SubscriptionResponseStream.new(executor: self, source_stream: source_stream)
rescue StandardError => e
handled_error = handle_or_reraise(e, exec_field: exec_field)
build_result(errors: serialize_errors(handled_error, exec_field: exec_field))
end
end

#: (?data: Util::NilLike | graphql_result | nil, ?errors: Array[error_hash]) -> graphql_result
def build_result(data: UNDEFINED, errors: EMPTY_ARRAY)
# Truncate errors but preserve GraphQL-Ruby's validation error limit message when present.
Expand Down Expand Up @@ -862,6 +940,20 @@ def build_result(data: UNDEFINED, errors: EMPTY_ARRAY)
@result
end

#: (ExecutionError, ?exec_field: ExecutionField[untyped]?) -> Array[error_hash]
def serialize_errors(error, exec_field: nil)
errors = [] #: Array[error_hash]
error.each do |err|
next if err.equal?(UNREPORTED_ERROR)

hash = err.to_h
hash["path"] ||= exec_field.path if exec_field
errors << hash
end

errors
end

#: (?Float?) -> Float
def timer(start_time = nil)
now = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
Expand Down
Loading
Loading