Skip to content

Commit e3542b2

Browse files
authored
add ability to use perEventExecutor (#4211)
by exporting executeSubscriptionEvent() and adding option for to provide a custom fn addresses #894 cf. #2485 , #3071
1 parent a1d22a2 commit e3542b2

File tree

4 files changed

+90
-26
lines changed

4 files changed

+90
-26
lines changed

src/execution/__tests__/subscribe-test.ts

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ import {
2121
import { GraphQLSchema } from '../../type/schema.js';
2222

2323
import type { ExecutionArgs } from '../execute.js';
24-
import { createSourceEventStream, subscribe } from '../execute.js';
24+
import {
25+
createSourceEventStream,
26+
executeSubscriptionEvent,
27+
subscribe,
28+
} from '../execute.js';
2529
import type { ExecutionResult } from '../types.js';
2630

2731
import { SimplePubSub } from './simplePubSub.js';
@@ -335,6 +339,45 @@ describe('Subscription Initialization Phase', () => {
335339
});
336340
});
337341

342+
it('uses a custom default perEventExecutor', async () => {
343+
const schema = new GraphQLSchema({
344+
query: DummyQueryType,
345+
subscription: new GraphQLObjectType({
346+
name: 'Subscription',
347+
fields: {
348+
foo: { type: GraphQLString },
349+
},
350+
}),
351+
});
352+
353+
async function* fooGenerator() {
354+
yield { foo: 'FooValue' };
355+
}
356+
357+
let count = 0;
358+
const subscription = subscribe({
359+
schema,
360+
document: parse('subscription { foo }'),
361+
rootValue: { foo: fooGenerator },
362+
perEventExecutor: (validatedArgs) => {
363+
count++;
364+
return executeSubscriptionEvent(validatedArgs);
365+
},
366+
});
367+
assert(isAsyncIterable(subscription));
368+
369+
expect(await subscription.next()).to.deep.equal({
370+
done: false,
371+
value: { data: { foo: 'FooValue' } },
372+
});
373+
374+
expect(await subscription.next()).to.deep.equal({
375+
done: true,
376+
value: undefined,
377+
});
378+
expect(count).to.equal(1);
379+
});
380+
338381
it('should only resolve the first field of invalid multi-field', async () => {
339382
async function* fooGenerator() {
340383
yield { foo: 'FooValue' };

src/execution/execute.ts

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ export interface ValidatedExecutionArgs {
147147
fieldResolver: GraphQLFieldResolver<any, any>;
148148
typeResolver: GraphQLTypeResolver<any, any>;
149149
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
150+
perEventExecutor: (
151+
validatedExecutionArgs: ValidatedExecutionArgs,
152+
) => PromiseOrValue<ExecutionResult>;
150153
enableEarlyExecution: boolean;
151154
}
152155

@@ -171,6 +174,11 @@ export interface ExecutionArgs {
171174
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
172175
typeResolver?: Maybe<GraphQLTypeResolver<any, any>>;
173176
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
177+
perEventExecutor?: Maybe<
178+
(
179+
validatedExecutionArgs: ValidatedExecutionArgs,
180+
) => PromiseOrValue<ExecutionResult>
181+
>;
174182
enableEarlyExecution?: Maybe<boolean>;
175183
}
176184

@@ -210,23 +218,28 @@ export function execute(args: ExecutionArgs): PromiseOrValue<ExecutionResult> {
210218
}
211219

212220
const result = experimentalExecuteIncrementally(args);
213-
if (!isPromise(result)) {
214-
if ('initialResult' in result) {
215-
// This can happen if the operation contains @defer or @stream directives
216-
// and is not validated prior to execution
217-
throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
218-
}
219-
return result;
220-
}
221+
// Multiple payloads could be encountered if the operation contains @defer or
222+
// @stream directives and is not validated prior to execution
223+
return ensureSinglePayload(result);
224+
}
221225

222-
return result.then((incrementalResult) => {
223-
if ('initialResult' in incrementalResult) {
224-
// This can happen if the operation contains @defer or @stream directives
225-
// and is not validated prior to execution
226-
throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
227-
}
228-
return incrementalResult;
229-
});
226+
function ensureSinglePayload(
227+
result: PromiseOrValue<
228+
ExecutionResult | ExperimentalIncrementalExecutionResults
229+
>,
230+
): PromiseOrValue<ExecutionResult> {
231+
if (isPromise(result)) {
232+
return result.then((resolved) => {
233+
if ('initialResult' in resolved) {
234+
throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
235+
}
236+
return resolved;
237+
});
238+
}
239+
if ('initialResult' in result) {
240+
throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
241+
}
242+
return result;
230243
}
231244

232245
/**
@@ -253,7 +266,7 @@ export function experimentalExecuteIncrementally(
253266
return { errors: validatedExecutionArgs };
254267
}
255268

256-
return executeOperation(validatedExecutionArgs);
269+
return executeQueryOrMutationOrSubscriptionEvent(validatedExecutionArgs);
257270
}
258271

259272
/**
@@ -271,7 +284,7 @@ export function experimentalExecuteIncrementally(
271284
* at which point we still log the error and null the parent field, which
272285
* in this case is the entire response.
273286
*/
274-
function executeOperation(
287+
function executeQueryOrMutationOrSubscriptionEvent(
275288
validatedExecutionArgs: ValidatedExecutionArgs,
276289
): PromiseOrValue<ExecutionResult | ExperimentalIncrementalExecutionResults> {
277290
const exeContext: ExecutionContext = {
@@ -471,6 +484,7 @@ export function validateExecutionArgs(
471484
fieldResolver,
472485
typeResolver,
473486
subscribeFieldResolver,
487+
perEventExecutor,
474488
enableEarlyExecution,
475489
} = args;
476490

@@ -544,6 +558,7 @@ export function validateExecutionArgs(
544558
fieldResolver: fieldResolver ?? defaultFieldResolver,
545559
typeResolver: typeResolver ?? defaultTypeResolver,
546560
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
561+
perEventExecutor: perEventExecutor ?? executeSubscriptionEvent,
547562
enableEarlyExecution: enableEarlyExecution === true,
548563
};
549564
}
@@ -1938,21 +1953,25 @@ function mapSourceToResponse(
19381953
// For each payload yielded from a subscription, map it over the normal
19391954
// GraphQL `execute` function, with `payload` as the rootValue.
19401955
// This implements the "MapSourceToResponseEvent" algorithm described in
1941-
// the GraphQL specification. The `execute` function provides the
1942-
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
1943-
// "ExecuteQuery" algorithm, for which `execute` is also used.
1956+
// the GraphQL specification..
19441957
return mapAsyncIterable(resultOrStream, (payload: unknown) => {
19451958
const perEventExecutionArgs: ValidatedExecutionArgs = {
19461959
...validatedExecutionArgs,
19471960
rootValue: payload,
19481961
};
1949-
// typecast to ExecutionResult, not possible to return
1950-
// ExperimentalIncrementalExecutionResults when
1951-
// exeContext.operation is 'subscription'.
1952-
return executeOperation(perEventExecutionArgs) as ExecutionResult;
1962+
return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs);
19531963
});
19541964
}
19551965

1966+
export function executeSubscriptionEvent(
1967+
validatedExecutionArgs: ValidatedExecutionArgs,
1968+
): PromiseOrValue<ExecutionResult> {
1969+
const result = executeQueryOrMutationOrSubscriptionEvent(
1970+
validatedExecutionArgs,
1971+
);
1972+
return ensureSinglePayload(result);
1973+
}
1974+
19561975
/**
19571976
* Implements the "CreateSourceEventStream" algorithm described in the
19581977
* GraphQL specification, resolving the subscription source event stream.

src/execution/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export { pathToArray as responsePathAsArray } from '../jsutils/Path.js';
33
export {
44
createSourceEventStream,
55
execute,
6+
executeSubscriptionEvent,
67
experimentalExecuteIncrementally,
78
executeSync,
89
defaultFieldResolver,

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ export type {
317317
// Execute GraphQL queries.
318318
export {
319319
execute,
320+
executeSubscriptionEvent,
320321
experimentalExecuteIncrementally,
321322
executeSync,
322323
defaultFieldResolver,

0 commit comments

Comments
 (0)