diff --git a/README.md b/README.md index 8335c1d..eedef8a 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ The execution algorithm is proven at scale in production. This implementation st * Uses GraphQL Ruby schemas. * Currently no built-in validation or analysis, do it ahead of time. -* No support for subscription, defer, or stream. +* Currently no defer or stream. * Supports input validations, but NOT input transformations (ie: "prepare" hooks). # Usage @@ -16,11 +16,10 @@ The execution algorithm is proven at scale in production. This implementation st ## Execute a query ```ruby -executor = GraphQL::BreadthExec::Executor.build( - schema: MyGraphQLSchema, - document: GraphQL.parse(document), +executor = GraphQL::BreadthExec::Executor.new( + MyGraphQLSchema, + GraphQL.parse(document), root_object: { ... }, - operation_name: "OperationName", variables: { ... }, context: { ... }, tracers: [ ... ], @@ -539,7 +538,9 @@ class MyAuthorization < GraphQL::BreadthExec::Authorization # ... detail auth behaviors end -GraphQL::BreadthExec::Executor.build( +GraphQL::BreadthExec::Executor.new( + MyGraphQLSchema, + GraphQL.parse(document), authorization: MyAuthorization, ) ``` @@ -654,3 +655,83 @@ end ``` This architecture makes cascading resolvers run repeatedly on every field in a subtree, rather than just once at the top of the owning field's subtree. This pattern is more granular and generally safer for isolation and parallelism, though has more resolver churn than a typical depth traversal so should be used accordingly. + +## Subscriptions + +Query and mutation execution use `result`, which always returns a normal GraphQL result hash. If a controller accepts subscription operations, use `result_or_subscribe` instead. For query and mutation operations it returns the same result hash as `result`; for subscription operations it returns a `GraphQL::BreadthExec::SubscriptionResponseStream` on successful source setup, or a normal GraphQL result hash for public setup errors. Calling `result` for a subscription operation raises an implementation error. + +```ruby +executor = GraphQL::BreadthExec::Executor.new( + MyGraphQLSchema, + GraphQL.parse(document), + variables: { ... }, + context: { ... }, +) + +response = executor.result_or_subscribe + +if response.is_a?(GraphQL::BreadthExec::SubscriptionResponseStream) + response.each do |event_result| + deliver(event_result) + end +else + deliver(response) +end +``` + +Subscription root fields use two field resolver hooks: + +* `subscribe(exec_field, context)` runs once during subscription setup and must return an `Enumerable` or `Enumerator` of source events. +* `resolve(exec_field, context)` runs once per yielded source event. The source event is used as the root object for that event's GraphQL execution. + +```ruby +class OnWriteValueResolver < GraphQL::BreadthExec::FieldResolver + def subscribe(exec_field, context) + context[:write_value_events] + end + + def resolve(exec_field, context) + exec_field.map_objects(&:itself) + end +end + +class WriteValuePayload < BaseObject + field :value, String, null: true +end + +class Subscription < BaseObject + field :on_write_value, WriteValuePayload, null: true do |field| + field.breadth_resolver = OnWriteValueResolver.new + end +end +``` + +For a small in-process source stream, any Ruby enumerator is enough: + +```ruby +write_value_events = Enumerator.new do |events| + events << { value: "first" } + events << { value: "second" } +end + +executor = GraphQL::BreadthExec::Executor.new( + MyGraphQLSchema, + GraphQL.parse(%| + subscription WatchWrites { + onWriteValue { + value + } + } + |), + context: { write_value_events: write_value_events }, +) + +stream = executor.result_or_subscribe +stream.each do |event_result| + # {"data"=>{"onWriteValue"=>{"value"=>"first"}}} + # {"data"=>{"onWriteValue"=>{"value"=>"second"}}} + deliver(event_result) +end +``` + +Each source event is fulfilled through normal breadth execution, so field resolvers, lazy loading, authorization, directives, abstract type resolution, and error formatting all work as they do for query execution. Errors raised while enumerating the source stream are allowed to propagate to the stream consumer. Promise-backed subscription setup is not supported; `subscribe` should return the source stream synchronously. Returning a promise or any non-enumerable value is an implementation error. diff --git a/benchmark/run.rb b/benchmark/run.rb index 99601d0..85318dd 100644 --- a/benchmark/run.rb +++ b/benchmark/run.rb @@ -363,7 +363,7 @@ def benchmark_execution resolvers: BREADTH_RESOLVERS, root_object: data_source, tracers: [CARDINAL_TRACER], - ).perform + ).result end x.compare! @@ -396,7 +396,7 @@ def benchmark_lazy_execution resolvers: BREADTH_DEFERRED_RESOLVERS, root_object: data_source, tracers: [CARDINAL_TRACER], - ).perform + ).result end x.compare! @@ -419,7 +419,7 @@ def benchmark_introspection resolvers: BREADTH_RESOLVERS, root_object: {}, tracers: [CARDINAL_TRACER], - ).perform + ).result end x.compare! @@ -451,7 +451,7 @@ def benchmark_resolve_batch resolvers: RESOLVE_BATCH_RESOLVERS, root_object: RESOLVE_BATCH_DATA, tracers: [CARDINAL_TRACER], - ).perform + ).result end x.compare! @@ -471,7 +471,7 @@ def benchmark_lazy_scalars resolvers: LAZY_SCALAR_BREADTH_RESOLVERS, root_object: RESOLVE_BATCH_DATA, tracers: [CARDINAL_TRACER], - ).perform + ).result end x.report("graphql-ruby graphql-batch: #{RESOLVE_BATCH_OBJECT_COUNT} x 3 scalars") do @@ -523,7 +523,7 @@ def benchmark_lazy_field_batch resolvers: LAZY_SCALAR_BREADTH_RESOLVERS, root_object: RESOLVE_BATCH_DATA, tracers: [CARDINAL_TRACER], - ).perform + ).result end x.compare! @@ -552,7 +552,7 @@ def memory_profile resolvers: BREADTH_RESOLVERS, root_object: data_source, tracers: [CARDINAL_TRACER], - ).perform + ).result end puts "\n\ngraphql-cardinal memory profile: #{num_objects} resolvers" diff --git a/lib/graphql/breadth_exec.rb b/lib/graphql/breadth_exec.rb index b1dc4bc..945c74c 100644 --- a/lib/graphql/breadth_exec.rb +++ b/lib/graphql/breadth_exec.rb @@ -44,6 +44,7 @@ class ExecutionField; end require_relative "breadth_exec/tracer" require_relative "breadth_exec/field_resolvers" require_relative "breadth_exec/directive_resolvers" +require_relative "breadth_exec/subscription_response_stream" require_relative "breadth_exec/has_breadth_resolver" require_relative "breadth_exec/introspection" require_relative "breadth_exec/executor" diff --git a/lib/graphql/breadth_exec/errors.rb b/lib/graphql/breadth_exec/errors.rb index 6f76fdc..5894cf2 100644 --- a/lib/graphql/breadth_exec/errors.rb +++ b/lib/graphql/breadth_exec/errors.rb @@ -179,21 +179,6 @@ def initialize(message = nil, exec_field:, list_item: false) end end - class OperationTypeUnsupportedError < ExecutionError - #: (String) -> void - def initialize(operation_type) - @operation_type = operation_type - super("Unsupported operation type") - end - - #: () -> error_hash - def to_h - super.tap do |hash| - hash["path"] = [@operation_type] - end - end - end - class InputError < ExecutionError #: ( #| String message, diff --git a/lib/graphql/breadth_exec/executor.rb b/lib/graphql/breadth_exec/executor.rb index 3e469ae..36d381e 100644 --- a/lib/graphql/breadth_exec/executor.rb +++ b/lib/graphql/breadth_exec/executor.rb @@ -66,8 +66,10 @@ def initialize(schema, document, resolvers: EMPTY_OBJECT, root_object: nil, vari @input = InputFormatter.new(@context) @planner = ExecutionPlanner.new(executor: self, resolvers: @resolvers) @authorization = authorization.new + @authorization_class = authorization @document = document @root_object = root_object + @context_value = context @tracers = tracers @result = {} @data = {} @@ -98,9 +100,41 @@ def paths end #: -> graphql_result - def perform - execute unless executed? - @result + def result + operation = @query.selected_operation + if operation.operation_type == ExecutionPlanner::SUBSCRIPTION_OPERATION + raise ImplementationError, "Use result_or_subscribe for subscription operations" + end + + return @result if executed? + + execute + end + + #: -> (SubscriptionResponseStream | graphql_result) + def result_or_subscribe + return @result if executed? + + operation = @query.selected_operation + @result = if operation.operation_type == ExecutionPlanner::SUBSCRIPTION_OPERATION + subscribe(operation) + else + execute + end + end + + #: (untyped) -> graphql_result + def execute_subscription_event(object) + self.class.new( + @schema, + @document, + resolvers: @resolvers, + root_object: object, + variables: @provided_variables, + context: @context_value, + tracers: @tracers, + authorization: @authorization_class, + ).execute end #: (singleton(LazyLoader), ?loader_args?) -> LazyLoader[untyped] @@ -173,7 +207,7 @@ def executed? @executed end - private + protected #: -> graphql_result def execute @@ -211,11 +245,7 @@ def execute @tracers.each { _1.before_execute(self, @context) } end - root_scopes = begin - @planner.root_scopes_for_operation(operation, root_object: @root_object, result: @data) - rescue OperationTypeUnsupportedError => op_err - return build_result(errors: [op_err.to_h]) - end + root_scopes = @planner.root_scopes_for_operation(operation, root_object: @root_object, result: @data) @planner.plan_scopes(root_scopes).each do |exec_scope| @exec_queue << exec_scope @@ -274,6 +304,8 @@ def execute end end + private + #: ( #| Array[ExecutionDirective], #| ?current_field: ExecutionField[untyped]?, @@ -528,10 +560,10 @@ def execute_field(exec_field) if !pre_authorized exec_field.result = exec_field.resolve_all(FieldAuthorizationError.new(exec_field: exec_field)) elsif exec_field.directives.empty? - exec_field.result = exec_field.resolver.resolve_field(exec_field, @context) + exec_field.result = exec_field.resolver.resolve(exec_field, @context) else execute_with_directives(exec_field.directives, current_field: exec_field) do - exec_field.result = exec_field.resolver.resolve_field(exec_field, @context) + exec_field.result = exec_field.resolver.resolve(exec_field, @context) end end rescue StandardError => e @@ -831,6 +863,52 @@ def build_abstract_scopes(exec_field, return_type, next_objects, next_results) @exec_queue.concat(@planner.plan_scopes(scopes)) end + #: (GraphQL::Language::Nodes::OperationDefinition) -> (SubscriptionResponseStream | graphql_result) + def subscribe(operation) + @executed = true + + unless @context.errors.empty? + return build_result(errors: @context.errors.map(&:to_h)) + end + + begin + @input.coerce_variable_values(operation.variables, @query.provided_variables || EMPTY_OBJECT) + rescue InputValidationErrorSet => input_error + return build_result(errors: serialize_errors(input_error)) + end + + execute_with_directives(@planner.root_directives_for_operation(operation)) do + root_scopes = @planner.root_scopes_for_operation(operation, root_object: @root_object, result: @data) + exec_scope = @planner.plan_scopes(root_scopes).fetch(0) + exec_field = exec_scope.fields.each_value.first #: as !nil + + exec_field.validate! + exec_field.lazy_state_executing! + + pre_authorized = @authorization.authorized_field?(exec_field, @context) + pre_authorized &&= @authorization.authorized_type?(exec_field.type.unwrap, @context, exec_field: exec_field) + raise FieldAuthorizationError.new(exec_field: exec_field) unless pre_authorized + + source_stream = if exec_field.directives.empty? + exec_field.resolver.subscribe(exec_field, @context) + else + execute_with_directives(exec_field.directives, current_field: exec_field) do + exec_field.resolver.subscribe(exec_field, @context) + end + end + + unless source_stream.is_a?(Enumerable) + raise ImplementationError, "Subscription source must return an Enumerable" + end + + exec_field.lazy_state_locked! + SubscriptionResponseStream.new(executor: self, source_stream: source_stream) + rescue StandardError => e + handled_error = handle_or_reraise(e, exec_field: exec_field) + build_result(errors: serialize_errors(handled_error, exec_field: exec_field)) + end + end + #: (?data: Util::NilLike | graphql_result | nil, ?errors: Array[error_hash]) -> graphql_result def build_result(data: UNDEFINED, errors: EMPTY_ARRAY) # Truncate errors but preserve GraphQL-Ruby's validation error limit message when present. @@ -862,6 +940,20 @@ def build_result(data: UNDEFINED, errors: EMPTY_ARRAY) @result end + #: (ExecutionError, ?exec_field: ExecutionField[untyped]?) -> Array[error_hash] + def serialize_errors(error, exec_field: nil) + errors = [] #: Array[error_hash] + error.each do |err| + next if err.equal?(UNREPORTED_ERROR) + + hash = err.to_h + hash["path"] ||= exec_field.path if exec_field + errors << hash + end + + errors + end + #: (?Float?) -> Float def timer(start_time = nil) now = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) diff --git a/lib/graphql/breadth_exec/executor/execution_planner.rb b/lib/graphql/breadth_exec/executor/execution_planner.rb index e847780..67e0715 100644 --- a/lib/graphql/breadth_exec/executor/execution_planner.rb +++ b/lib/graphql/breadth_exec/executor/execution_planner.rb @@ -83,8 +83,16 @@ def root_scopes_for_operation(operation, root_object:, result:) results: [result], ) end - else - raise OperationTypeUnsupportedError.new(operation.operation_type) + when SUBSCRIPTION_OPERATION + [ + ExecutionScope.new( + executor: @executor, + parent_type: root_type_for_operation(SUBSCRIPTION_OPERATION), + selections: operation.selections, + objects: [root_object], + results: [result], + ), + ] end end @@ -176,7 +184,6 @@ def parent_type_possible?(fragment_type, parent_type) #| ?ordered_fields: Array[ExecutionField[untyped]], #| ) -> Array[ExecutionField[untyped]] def build_execution_tree(exec_scope, ordered_fields = []) - exec_scope.fields ||= {} @has_runtime_directives = false selections_by_key = selections_grouped_by_key(exec_scope.parent_type, exec_scope.selections) has_runtime_directives = @has_runtime_directives diff --git a/lib/graphql/breadth_exec/executor/execution_scope.rb b/lib/graphql/breadth_exec/executor/execution_scope.rb index 75db38f..7f15036 100644 --- a/lib/graphql/breadth_exec/executor/execution_scope.rb +++ b/lib/graphql/breadth_exec/executor/execution_scope.rb @@ -35,7 +35,7 @@ class ExecutionScope attr_reader :parent #: Hash[String, ExecutionField[untyped]] - attr_accessor :fields + attr_reader :fields #: bool attr_writer :executed @@ -72,7 +72,7 @@ def initialize( @abstraction = abstraction @path = (parent_field ? parent_field.path : path).freeze @parent = parent_field ? parent_field.scope : parent - @fields = nil + @fields = {} @executed = false @root = nil @planning_root = nil diff --git a/lib/graphql/breadth_exec/field_resolvers.rb b/lib/graphql/breadth_exec/field_resolvers.rb index 55a2b1a..64e8b07 100644 --- a/lib/graphql/breadth_exec/field_resolvers.rb +++ b/lib/graphql/breadth_exec/field_resolvers.rb @@ -11,13 +11,8 @@ def plan(_exec_field, _ctx) end #: (Executor::ExecutionField[untyped], ContextType) -> (Array[untyped] | ExecutionPromise) - def resolve(*) - raise NotImplementedError, "Resolver#resolve must be implemented." - end - - #: (Executor::ExecutionField[untyped], ContextType) -> (Array[untyped] | ExecutionPromise) - def resolve_field(exec_field, ctx) - resolve(exec_field, ctx) + def resolve(exec_field, ctx) + raise NotImplementedError, "FieldResolver#resolve must be implemented." end #: (Array[untyped] | ExecutionPromise) { (Array[untyped]) -> Array[untyped] } -> (Array[untyped] | ExecutionPromise) @@ -28,6 +23,11 @@ def handle_resolved(result) yield(result) end end + + #: (Executor::ExecutionField[untyped], ContextType) -> Enumerable + def subscribe(_exec_field, _ctx) + raise NotImplementedError, "FieldResolver#subscribe must be implemented." + end end #: [ContextType = GraphQL::Query::Context] diff --git a/lib/graphql/breadth_exec/subscription_response_stream.rb b/lib/graphql/breadth_exec/subscription_response_stream.rb new file mode 100644 index 0000000..379eef5 --- /dev/null +++ b/lib/graphql/breadth_exec/subscription_response_stream.rb @@ -0,0 +1,33 @@ +# typed: true +# frozen_string_literal: true + +module GraphQL + module BreadthExec + class SubscriptionResponseStream + include Enumerable + + #: Executor + attr_reader :executor + + #: Enumerable + attr_reader :source_stream + + #: (executor: Executor, source_stream: Enumerable) -> void + def initialize(executor:, source_stream:) + @executor = executor + @source_stream = source_stream + end + + #: () ?{ (graphql_result) -> void } -> Enumerator? + def each(&block) + return enum_for(:each) unless block + + @source_stream.each do |source_event| + block.call(@executor.execute_subscription_event(source_event)) + end + + nil + end + end + end +end diff --git a/test/graphql/breadth_exec/executor/abstracts_test.rb b/test/graphql/breadth_exec/executor/abstracts_test.rb index 3d03a06..e7d0e08 100644 --- a/test/graphql/breadth_exec/executor/abstracts_test.rb +++ b/test/graphql/breadth_exec/executor/abstracts_test.rb @@ -105,7 +105,7 @@ def test_unresolved_abstract_type_reports_basic_schema_type_error root_object: { "node" => { "id" => "Product/1" } }, ) - assert_raises(GraphQL::BreadthExec::ImplementationError) { executor.perform } + assert_raises(GraphQL::BreadthExec::ImplementationError) { executor.result } error, context = TypeErrorSchema.type_errors.first assert_kind_of GraphQL::UnresolvedTypeError, error diff --git a/test/graphql/breadth_exec/executor/authorization_test.rb b/test/graphql/breadth_exec/executor/authorization_test.rb index 7d72961..eb86097 100644 --- a/test/graphql/breadth_exec/executor/authorization_test.rb +++ b/test/graphql/breadth_exec/executor/authorization_test.rb @@ -79,7 +79,7 @@ def execute(document) resolvers: AUTH_RESOLVERS, root_object: SOURCE, authorization: RecordingAuthorization, - ).perform + ).result end def test_checks_field_authorization_before_resolving_field diff --git a/test/graphql/breadth_exec/executor/directives_test.rb b/test/graphql/breadth_exec/executor/directives_test.rb index 8dd3be7..c5761be 100644 --- a/test/graphql/breadth_exec/executor/directives_test.rb +++ b/test/graphql/breadth_exec/executor/directives_test.rb @@ -74,7 +74,7 @@ def test_resolves_operation_and_field_directives } }|) - result = GraphQL::BreadthExec::Executor.new(DIRECTIVE_SCHEMA, document, resolvers: resolvers, root_object: SOURCE).perform + result = GraphQL::BreadthExec::Executor.new(DIRECTIVE_SCHEMA, document, resolvers: resolvers, root_object: SOURCE).result assert_equal "Original", result.dig("data", "widget", "title") assert_equal [ @@ -103,7 +103,7 @@ def test_wrapping_field_directive_can_short_circuit_resolution } }|) - result = GraphQL::BreadthExec::Executor.new(DIRECTIVE_SCHEMA, document, resolvers: resolvers, root_object: SOURCE).perform + result = GraphQL::BreadthExec::Executor.new(DIRECTIVE_SCHEMA, document, resolvers: resolvers, root_object: SOURCE).result assert_equal "Overridden", result.dig("data", "widget", "title") end @@ -119,7 +119,7 @@ def test_invalid_directive_arguments_are_reported_then_raised_during_execution GraphQL.parse(%|{ widget { title @replace } }|), resolvers: resolvers, root_object: SOURCE, - ).perform + ).result end assert_equal ["Argument \"value\" of required type \"String!\" was not provided."], reported.errors.map(&:message) @@ -151,7 +151,7 @@ def test_cascading_field_directive_applies_to_descendants } }|) - result = GraphQL::BreadthExec::Executor.new(DIRECTIVE_SCHEMA, document, resolvers: resolvers, root_object: SOURCE).perform + result = GraphQL::BreadthExec::Executor.new(DIRECTIVE_SCHEMA, document, resolvers: resolvers, root_object: SOURCE).result assert_equal "Original", result.dig("data", "widget", "title") assert_equal "Nested", result.dig("data", "widget", "nested", "title") diff --git a/test/graphql/breadth_exec/executor/error_formatter_test.rb b/test/graphql/breadth_exec/executor/error_formatter_test.rb index a9b5972..74e236b 100644 --- a/test/graphql/breadth_exec/executor/error_formatter_test.rb +++ b/test/graphql/breadth_exec/executor/error_formatter_test.rb @@ -199,7 +199,7 @@ def exec_test(schema, query, source) root_object: source, ) - result = executor.perform + result = executor.result yield executor if block_given? result end diff --git a/test/graphql/breadth_exec/executor/errors_test.rb b/test/graphql/breadth_exec/executor/errors_test.rb index 5fd93ba..b8be10b 100644 --- a/test/graphql/breadth_exec/executor/errors_test.rb +++ b/test/graphql/breadth_exec/executor/errors_test.rb @@ -134,7 +134,7 @@ def test_handled_build_result_error_stays_field_local } executor = GraphQL::BreadthExec::Executor.new(BUILD_ERROR_SCHEMA, document, resolvers: BUILD_ERROR_RESOLVERS, root_object: source) - assert_equal expected, executor.perform + assert_equal expected, executor.result end def test_nullable_scalar_result_coerced_to_nil_builds_missing_value_without_error @@ -254,6 +254,6 @@ def execute_result_coercion_query(document, source) ResultCoercionSchema, GraphQL.parse(document), root_object: source, - ).perform + ).result end end diff --git a/test/graphql/breadth_exec/executor/introspection_test.rb b/test/graphql/breadth_exec/executor/introspection_test.rb index d3c005b..93ab497 100644 --- a/test/graphql/breadth_exec/executor/introspection_test.rb +++ b/test/graphql/breadth_exec/executor/introspection_test.rb @@ -537,6 +537,6 @@ def test_introspect_argument_default_values private def execute_query(document, schema: TEST_SCHEMA) - GraphQL::BreadthExec::Executor.new(schema, GraphQL.parse(document), resolvers: TEST_RESOLVERS, root_object: {}).perform + GraphQL::BreadthExec::Executor.new(schema, GraphQL.parse(document), resolvers: TEST_RESOLVERS, root_object: {}).result end end diff --git a/test/graphql/breadth_exec/executor/lists_test.rb b/test/graphql/breadth_exec/executor/lists_test.rb index 49f939d..01e1b0c 100644 --- a/test/graphql/breadth_exec/executor/lists_test.rb +++ b/test/graphql/breadth_exec/executor/lists_test.rb @@ -202,6 +202,6 @@ def test_resolves_nested_leaf_lists private def execute_query(document, source, schema: TEST_SCHEMA, resolvers: TEST_RESOLVERS) - GraphQL::BreadthExec::Executor.new(schema, GraphQL.parse(document), resolvers: resolvers, root_object: source).perform + GraphQL::BreadthExec::Executor.new(schema, GraphQL.parse(document), resolvers: resolvers, root_object: source).result end end diff --git a/test/graphql/breadth_exec/executor/loaders_test.rb b/test/graphql/breadth_exec/executor/loaders_test.rb index fc7a161..1345f6d 100644 --- a/test/graphql/breadth_exec/executor/loaders_test.rb +++ b/test/graphql/breadth_exec/executor/loaders_test.rb @@ -153,7 +153,7 @@ def test_splits_loaders_by_group_across_fields } executor = GraphQL::BreadthExec::Executor.new(LOADER_SCHEMA, document, resolvers: LOADER_RESOLVERS, root_object: source) - assert_equal expected, executor.perform + assert_equal expected, executor.result assert_equal [["Apple", "Banana"], ["Coconut"]], FancyLoader.perform_keys end @@ -187,7 +187,7 @@ def test_maintains_ordered_selections_around_object_fields } executor = GraphQL::BreadthExec::Executor.new(LOADER_SCHEMA, document, resolvers: LOADER_RESOLVERS, root_object: source) - result = executor.perform + result = executor.result assert_equal expected, result assert_equal result.dig("data", "widget").keys, expected.dig("data", "widget").keys end @@ -222,7 +222,7 @@ def test_maintains_ordered_selections_around_leaf_fields } executor = GraphQL::BreadthExec::Executor.new(LOADER_SCHEMA, document, resolvers: LOADER_RESOLVERS, root_object: source) - result = executor.perform + result = executor.result assert_equal expected, result assert_equal result.dig("data", "widget").keys, expected.dig("data", "widget").keys end @@ -256,7 +256,7 @@ def test_lazy_field_uses_eager_values_without_loading_them } executor = GraphQL::BreadthExec::Executor.new(LOADER_SCHEMA, document, resolvers: resolvers, root_object: source) - assert_equal expected, executor.perform + assert_equal expected, executor.result assert_equal [["Banana"]], FancyLoader.perform_keys end @@ -286,7 +286,7 @@ def test_resumes_field_execution_after_lazy_preloads } executor = GraphQL::BreadthExec::Executor.new(LOADER_SCHEMA, document, resolvers: resolvers, root_object: source) - assert_equal expected, executor.perform + assert_equal expected, executor.result assert_equal [["Apple"]], FancyLoader.perform_keys end @@ -319,7 +319,7 @@ def test_resumes_scope_execution_after_lazy_preloads } executor = GraphQL::BreadthExec::Executor.new(LOADER_SCHEMA, document, resolvers: resolvers, root_object: source) - assert_equal expected, executor.perform + assert_equal expected, executor.result assert_equal [["Apple"], ["Banana"]], FancyLoader.perform_keys end end diff --git a/test/graphql/breadth_exec/executor/subscriptions_test.rb b/test/graphql/breadth_exec/executor/subscriptions_test.rb new file mode 100644 index 0000000..6877f15 --- /dev/null +++ b/test/graphql/breadth_exec/executor/subscriptions_test.rb @@ -0,0 +1,313 @@ +# frozen_string_literal: true + +require "timeout" + +require "test_helper" + +class GraphQL::BreadthExec::Executor::SubscriptionsTest < Minitest::Test + SUBSCRIPTION_SCHEMA = GraphQL::Schema.from_definition(%| + schema { + query: Query + subscription: Subscription + } + + type Query { + noop: String + } + + type WriteValuePayload { + value: String + lazyValue: String + error: String + } + + type Subscription { + onWriteValue(value: String): WriteValuePayload + } + |) + + class SourceResolver < GraphQL::BreadthExec::FieldResolver + attr_reader :resolve_calls + attr_reader :subscribe_calls + + def initialize(subscribe: nil, resolve: nil) + @subscribe = subscribe + @resolve = resolve + @subscribe_calls = [] + @resolve_calls = [] + end + + def subscribe(exec_field, ctx) + @subscribe_calls << [exec_field.arguments, ctx[:test_context]] + return @subscribe.call(exec_field, ctx) if @subscribe + + exec_field.objects.first.fetch("events") + end + + def resolve(exec_field, ctx) + @resolve_calls << [exec_field.objects, ctx[:test_context]] + return @resolve.call(exec_field, ctx) if @resolve + + exec_field.map_objects(&:itself) + end + end + + class EventErrorResolver < GraphQL::BreadthExec::FieldResolver + def resolve(exec_field, _ctx) + exec_field.resolve_all(GraphQL::ExecutionError.new("Event failed")) + end + end + + def test_result_raises_for_subscription_operation + error = assert_raises(GraphQL::BreadthExec::ImplementationError) do + subscription_executor(%|subscription { onWriteValue { value } }|).result + end + + assert_equal "Use result_or_subscribe for subscription operations", error.message + end + + def test_result_or_subscribe_returns_subscription_response_stream_for_subscription_operation + result = subscription_executor( + %|subscription { onWriteValue { value } }|, + root_object: { "events" => [{ "value" => "direct" }] }, + ).result_or_subscribe + + assert_instance_of GraphQL::BreadthExec::SubscriptionResponseStream, result + assert_equal [{ "data" => { "onWriteValue" => { "value" => "direct" } } }], result.to_a + end + + def test_subscribe_reports_graphql_execution_error_from_source_resolver + resolver = SourceResolver.new(subscribe: ->(_exec_field, _ctx) { raise GraphQL::ExecutionError, "Cannot subscribe" }) + + result = subscription_executor(%|subscription { onWriteValue { value } }|, source_resolver: resolver).result_or_subscribe + + assert_equal "Cannot subscribe", result.dig("errors", 0, "message") + assert_equal ["onWriteValue"], result.dig("errors", 0, "path") + end + + def test_subscribe_raises_for_plain_field_resolver_without_subscription_hook + error = assert_raises(NotImplementedError) do + subscription_executor( + %|subscription { onWriteValue { value } }|, + source_resolver: GraphQL::BreadthExec::SelfResolver.new, + ).result_or_subscribe + end + + assert_equal "FieldResolver#subscribe must be implemented.", error.message + end + + def test_subscribe_raises_for_non_enumerable_source + resolver = SourceResolver.new(subscribe: ->(_exec_field, _ctx) { Object.new }) + + error = assert_raises(GraphQL::BreadthExec::ImplementationError) do + subscription_executor(%|subscription { onWriteValue { value } }|, source_resolver: resolver).result_or_subscribe + end + + assert_equal "Subscription source must return an Enumerable", error.message + end + + def test_subscribe_raises_for_lazy_source_setup + resolver = SourceResolver.new(subscribe: ->(_exec_field, _ctx) { GraphQL::BreadthExec::ExecutionPromise.new }) + + error = assert_raises(GraphQL::BreadthExec::ImplementationError) do + subscription_executor(%|subscription { onWriteValue { value } }|, source_resolver: resolver).result_or_subscribe + end + + assert_equal "Subscription source must return an Enumerable", error.message + end + + def test_stream_yields_one_result_for_one_source_event + stream = subscription_executor( + %|subscription { onWriteValue { value } }|, + root_object: { "events" => [{ "value" => "first" }] }, + ).result_or_subscribe + + assert_instance_of GraphQL::BreadthExec::SubscriptionResponseStream, stream + assert_equal( + [{ "data" => { "onWriteValue" => { "value" => "first" } } }], + stream.to_a, + ) + end + + def test_stream_preserves_multiple_event_order + stream = subscription_executor( + %|subscription { onWriteValue { value } }|, + root_object: { + "events" => [ + { "value" => "first" }, + { "value" => "second" }, + { "value" => "third" }, + ], + }, + ).result_or_subscribe + + assert_equal( + [ + { "data" => { "onWriteValue" => { "value" => "first" } } }, + { "data" => { "onWriteValue" => { "value" => "second" } } }, + { "data" => { "onWriteValue" => { "value" => "third" } } }, + ], + stream.to_a, + ) + end + + def test_stream_yields_events_published_after_subscription_result_is_created + events = Queue.new + source_stream = Enumerator.new do |yielder| + loop do + event = events.pop + break if event == :done + + yielder << event + end + end + + stream = subscription_executor( + %|subscription { onWriteValue { value } }|, + root_object: { "events" => source_stream }, + ).result_or_subscribe + + deliveries = Queue.new + stream_errors = Queue.new + consumer = Thread.new do + stream.each { deliveries << _1 } + rescue StandardError => e + stream_errors << e + end + + assert_equal( + { "data" => { "onWriteValue" => { "value" => "after subscribe" } } }, + begin + events << { "value" => "after subscribe" } + Timeout.timeout(1) { deliveries.pop } + end, + ) + + assert_equal( + { "data" => { "onWriteValue" => { "value" => "after that" } } }, + begin + events << { "value" => "after that" } + Timeout.timeout(1) { deliveries.pop } + end, + ) + + events << :done + assert consumer.join(1), "Expected subscription stream consumer to finish" + flunk("Expected no stream errors, got #{stream_errors.pop.message}") unless stream_errors.empty? + ensure + consumer&.kill + end + + def test_event_execution_uses_subscription_root_resolver + resolver = SourceResolver.new( + resolve: ->(exec_field, _ctx) { exec_field.map_objects { _1.fetch("payload") } }, + ) + + stream = subscription_executor( + %|subscription { onWriteValue { value } }|, + source_resolver: resolver, + root_object: { "events" => [{ "payload" => { "value" => "from resolver" } }] }, + context: { test_context: "ctx" }, + ).result_or_subscribe + + assert_equal [{ "data" => { "onWriteValue" => { "value" => "from resolver" } } }], stream.to_a + assert_equal [[{}, "ctx"]], resolver.subscribe_calls + assert_equal [[[{ "payload" => { "value" => "from resolver" } }], "ctx"]], resolver.resolve_calls + end + + def test_event_execution_supports_lazy_fields + stream = subscription_executor( + %|subscription { onWriteValue { lazyValue } }|, + root_object: { "events" => [{ "lazyValue" => "loaded" }] }, + ).result_or_subscribe + + assert_equal( + [{ "data" => { "onWriteValue" => { "lazyValue" => "loaded" } } }], + stream.to_a, + ) + end + + def test_event_execution_errors_are_returned_in_result + stream = subscription_executor( + %|subscription { onWriteValue { error } }|, + root_object: { "events" => [{ "value" => "bad" }] }, + ).result_or_subscribe + + assert_equal( + [{ + "errors" => [{ + "message" => "Event failed", + "locations" => [{ "line" => 1, "column" => 31 }], + "path" => ["onWriteValue", "error"], + }], + "data" => { "onWriteValue" => { "error" => nil } }, + }], + stream.to_a, + ) + end + + def test_source_stream_errors_propagate_to_consumer + source_stream = Enumerator.new do |yielder| + yielder << { "value" => "before error" } + raise RuntimeError, "source failed" + end + + stream = subscription_executor( + %|subscription { onWriteValue { value } }|, + root_object: { "events" => source_stream }, + ).result_or_subscribe + + responses = [] + error = assert_raises(RuntimeError) do + stream.each { responses << _1 } + end + + assert_equal "source failed", error.message + assert_equal [{ "data" => { "onWriteValue" => { "value" => "before error" } } }], responses + end + + def test_result_or_subscribe_returns_same_stream_when_called_twice + executor = subscription_executor(%|subscription { onWriteValue { value } }|) + + stream = executor.result_or_subscribe + + assert_same stream, executor.result_or_subscribe + end + + def test_result_raises_after_subscription_response_stream_was_created + executor = subscription_executor(%|subscription { onWriteValue { value } }|) + + executor.result_or_subscribe + error = assert_raises(GraphQL::BreadthExec::ImplementationError) { executor.result } + + assert_equal "Use result_or_subscribe for subscription operations", error.message + end + + private + + def subscription_executor(document, source_resolver: SourceResolver.new, root_object: { "events" => [] }, context: {}) + GraphQL::BreadthExec::Executor.new( + SUBSCRIPTION_SCHEMA, + GraphQL.parse(document), + resolvers: subscription_resolvers(source_resolver), + root_object: root_object, + context: context, + ) + end + + def subscription_resolvers(source_resolver) + { + "Query" => { + "noop" => GraphQL::BreadthExec::ValueResolver.new("noop"), + }, + "Subscription" => { + "onWriteValue" => source_resolver, + }, + "WriteValuePayload" => { + "value" => GraphQL::BreadthExec::HashKeyResolver.new("value"), + "lazyValue" => DeferredHashResolver.new("lazyValue"), + "error" => EventErrorResolver.new, + }, + }.freeze + end +end diff --git a/test/graphql/breadth_exec/executor/tracers_test.rb b/test/graphql/breadth_exec/executor/tracers_test.rb index 7990589..9d9b2db 100644 --- a/test/graphql/breadth_exec/executor/tracers_test.rb +++ b/test/graphql/breadth_exec/executor/tracers_test.rb @@ -158,7 +158,7 @@ def test_calls_on_exception_for_unhandled_errors resolvers: resolvers, root_object: @source, tracers: [@tracer], - ).perform + ).result end assert_equal 1, @tracer.on_exception_calls.size diff --git a/test/graphql/breadth_exec/executor_test.rb b/test/graphql/breadth_exec/executor_test.rb index 93559e5..34a5331 100644 --- a/test/graphql/breadth_exec/executor_test.rb +++ b/test/graphql/breadth_exec/executor_test.rb @@ -60,21 +60,6 @@ def test_mutations_run_serially assert_equal expected, breadth_exec(document, source).dig("data") end - def test_subscriptions_not_supported - document = %|subscription { - onWriteValue { value } - }| - - expected = { - "errors" => [{ - "message" => "Unsupported operation type", - "path" => ["subscription"], - }], - } - - assert_equal expected, breadth_exec(document, {}) - end - def test_raises_not_implemented_for_missing_resolvers document = %|{ noResolver }| diff --git a/test/graphql/breadth_exec/field_resolvers_test.rb b/test/graphql/breadth_exec/field_resolvers_test.rb index d63b4c6..d0c59bf 100644 --- a/test/graphql/breadth_exec/field_resolvers_test.rb +++ b/test/graphql/breadth_exec/field_resolvers_test.rb @@ -130,6 +130,6 @@ def execute(document_string, root_object, resolvers) GraphQL.parse(document_string), resolvers: resolvers, root_object: root_object, - ).perform + ).result end end diff --git a/test/graphql/breadth_exec/has_breadth_resolver_test.rb b/test/graphql/breadth_exec/has_breadth_resolver_test.rb index f8f595c..9ed8ae8 100644 --- a/test/graphql/breadth_exec/has_breadth_resolver_test.rb +++ b/test/graphql/breadth_exec/has_breadth_resolver_test.rb @@ -155,6 +155,6 @@ def execute(schema, document, source, resolvers: GraphQL::BreadthExec::EMPTY_OBJ GraphQL.parse(document), resolvers: resolvers, root_object: source, - ).perform + ).result end end diff --git a/test/star_wars_fixtures.rb b/test/star_wars_fixtures.rb index 0f5ea74..74519af 100644 --- a/test/star_wars_fixtures.rb +++ b/test/star_wars_fixtures.rb @@ -181,5 +181,5 @@ def execute_star_wars(query, variables: {}) GraphQL.parse(query), resolvers: STAR_WARS_RESOLVERS, variables: variables, - ).perform + ).result end diff --git a/test/test_helper.rb b/test/test_helper.rb index 345878b..e4bcd56 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -28,7 +28,7 @@ def breadth_exec(query, source, variables: {}, context: {}, tracers: [GraphQL::B tracers: tracers, variables: variables, context: context, - ).perform + ).result end def assert_error_reported(expected_class, &block)