diff --git a/src/execution/__tests__/subscribe-test.ts b/src/execution/__tests__/subscribe-test.ts index e6faca31e5..e3e7f6042f 100644 --- a/src/execution/__tests__/subscribe-test.ts +++ b/src/execution/__tests__/subscribe-test.ts @@ -21,7 +21,11 @@ import { import { GraphQLSchema } from '../../type/schema.js'; import type { ExecutionArgs } from '../execute.js'; -import { createSourceEventStream, subscribe } from '../execute.js'; +import { + createSourceEventStream, + executeSubscriptionEvent, + subscribe, +} from '../execute.js'; import type { ExecutionResult } from '../types.js'; import { SimplePubSub } from './simplePubSub.js'; @@ -335,6 +339,45 @@ describe('Subscription Initialization Phase', () => { }); }); + it('uses a custom default perEventExecutor', async () => { + const schema = new GraphQLSchema({ + query: DummyQueryType, + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + foo: { type: GraphQLString }, + }, + }), + }); + + async function* fooGenerator() { + yield { foo: 'FooValue' }; + } + + let count = 0; + const subscription = subscribe({ + schema, + document: parse('subscription { foo }'), + rootValue: { foo: fooGenerator }, + perEventExecutor: (validatedArgs) => { + count++; + return executeSubscriptionEvent(validatedArgs); + }, + }); + assert(isAsyncIterable(subscription)); + + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { data: { foo: 'FooValue' } }, + }); + + expect(await subscription.next()).to.deep.equal({ + done: true, + value: undefined, + }); + expect(count).to.equal(1); + }); + it('should only resolve the first field of invalid multi-field', async () => { async function* fooGenerator() { yield { foo: 'FooValue' }; diff --git a/src/execution/execute.ts b/src/execution/execute.ts index b153fe2e51..080d5aa8a7 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -147,6 +147,9 @@ export interface ValidatedExecutionArgs { fieldResolver: GraphQLFieldResolver; typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; + perEventExecutor: ( + validatedExecutionArgs: ValidatedExecutionArgs, + ) => PromiseOrValue; enableEarlyExecution: boolean; } @@ -171,6 +174,11 @@ export interface ExecutionArgs { fieldResolver?: Maybe>; typeResolver?: Maybe>; subscribeFieldResolver?: Maybe>; + perEventExecutor?: Maybe< + ( + validatedExecutionArgs: ValidatedExecutionArgs, + ) => PromiseOrValue + >; enableEarlyExecution?: Maybe; } @@ -210,23 +218,28 @@ export function execute(args: ExecutionArgs): PromiseOrValue { } const result = experimentalExecuteIncrementally(args); - if (!isPromise(result)) { - if ('initialResult' in result) { - // This can happen if the operation contains @defer or @stream directives - // and is not validated prior to execution - throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS); - } - return result; - } + // Multiple payloads could be encountered if the operation contains @defer or + // @stream directives and is not validated prior to execution + return ensureSinglePayload(result); +} - return result.then((incrementalResult) => { - if ('initialResult' in incrementalResult) { - // This can happen if the operation contains @defer or @stream directives - // and is not validated prior to execution - throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS); - } - return incrementalResult; - }); +function ensureSinglePayload( + result: PromiseOrValue< + ExecutionResult | ExperimentalIncrementalExecutionResults + >, +): PromiseOrValue { + if (isPromise(result)) { + return result.then((resolved) => { + if ('initialResult' in resolved) { + throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS); + } + return resolved; + }); + } + if ('initialResult' in result) { + throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS); + } + return result; } /** @@ -253,7 +266,7 @@ export function experimentalExecuteIncrementally( return { errors: validatedExecutionArgs }; } - return executeOperation(validatedExecutionArgs); + return executeQueryOrMutationOrSubscriptionEvent(validatedExecutionArgs); } /** @@ -271,7 +284,7 @@ export function experimentalExecuteIncrementally( * at which point we still log the error and null the parent field, which * in this case is the entire response. */ -function executeOperation( +function executeQueryOrMutationOrSubscriptionEvent( validatedExecutionArgs: ValidatedExecutionArgs, ): PromiseOrValue { const exeContext: ExecutionContext = { @@ -471,6 +484,7 @@ export function validateExecutionArgs( fieldResolver, typeResolver, subscribeFieldResolver, + perEventExecutor, enableEarlyExecution, } = args; @@ -544,6 +558,7 @@ export function validateExecutionArgs( fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, + perEventExecutor: perEventExecutor ?? executeSubscriptionEvent, enableEarlyExecution: enableEarlyExecution === true, }; } @@ -1938,21 +1953,25 @@ function mapSourceToResponse( // For each payload yielded from a subscription, map it over the normal // GraphQL `execute` function, with `payload` as the rootValue. // This implements the "MapSourceToResponseEvent" algorithm described in - // the GraphQL specification. The `execute` function provides the - // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the - // "ExecuteQuery" algorithm, for which `execute` is also used. + // the GraphQL specification.. return mapAsyncIterable(resultOrStream, (payload: unknown) => { const perEventExecutionArgs: ValidatedExecutionArgs = { ...validatedExecutionArgs, rootValue: payload, }; - // typecast to ExecutionResult, not possible to return - // ExperimentalIncrementalExecutionResults when - // exeContext.operation is 'subscription'. - return executeOperation(perEventExecutionArgs) as ExecutionResult; + return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs); }); } +export function executeSubscriptionEvent( + validatedExecutionArgs: ValidatedExecutionArgs, +): PromiseOrValue { + const result = executeQueryOrMutationOrSubscriptionEvent( + validatedExecutionArgs, + ); + return ensureSinglePayload(result); +} + /** * Implements the "CreateSourceEventStream" algorithm described in the * GraphQL specification, resolving the subscription source event stream. diff --git a/src/execution/index.ts b/src/execution/index.ts index 702a214dc2..98b870a8a5 100644 --- a/src/execution/index.ts +++ b/src/execution/index.ts @@ -3,6 +3,7 @@ export { pathToArray as responsePathAsArray } from '../jsutils/Path.js'; export { createSourceEventStream, execute, + executeSubscriptionEvent, experimentalExecuteIncrementally, executeSync, defaultFieldResolver, diff --git a/src/index.ts b/src/index.ts index 96ecb2d39a..a9d282d8a4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -317,6 +317,7 @@ export type { // Execute GraphQL queries. export { execute, + executeSubscriptionEvent, experimentalExecuteIncrementally, executeSync, defaultFieldResolver,