Skip to content

add ability to use perEventExecutor #4211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/execution/__tests__/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import {
import { GraphQLSchema } from '../../type/schema.js';

import type { ExecutionArgs } from '../execute.js';
import { createSourceEventStream, subscribe } from '../execute.js';
import {
createSourceEventStream,
executeSubscriptionEvent,
subscribe,
} from '../execute.js';
import type { ExecutionResult } from '../types.js';

import { SimplePubSub } from './simplePubSub.js';
Expand Down Expand Up @@ -335,6 +339,45 @@ describe('Subscription Initialization Phase', () => {
});
});

it('uses a custom default perEventExecutor', async () => {
const schema = new GraphQLSchema({
query: DummyQueryType,
subscription: new GraphQLObjectType({
name: 'Subscription',
fields: {
foo: { type: GraphQLString },
},
}),
});

async function* fooGenerator() {
yield { foo: 'FooValue' };
}

let count = 0;
const subscription = subscribe({
schema,
document: parse('subscription { foo }'),
rootValue: { foo: fooGenerator },
perEventExecutor: (validatedArgs) => {
count++;
return executeSubscriptionEvent(validatedArgs);
},
});
assert(isAsyncIterable(subscription));

expect(await subscription.next()).to.deep.equal({
done: false,
value: { data: { foo: 'FooValue' } },
});

expect(await subscription.next()).to.deep.equal({
done: true,
value: undefined,
});
expect(count).to.equal(1);
});

it('should only resolve the first field of invalid multi-field', async () => {
async function* fooGenerator() {
yield { foo: 'FooValue' };
Expand Down
69 changes: 44 additions & 25 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ export interface ValidatedExecutionArgs {
fieldResolver: GraphQLFieldResolver<any, any>;
typeResolver: GraphQLTypeResolver<any, any>;
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
perEventExecutor: (
validatedExecutionArgs: ValidatedExecutionArgs,
) => PromiseOrValue<ExecutionResult>;
enableEarlyExecution: boolean;
}

Expand All @@ -171,6 +174,11 @@ export interface ExecutionArgs {
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
typeResolver?: Maybe<GraphQLTypeResolver<any, any>>;
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
perEventExecutor?: Maybe<
(
validatedExecutionArgs: ValidatedExecutionArgs,
) => PromiseOrValue<ExecutionResult>
>;
enableEarlyExecution?: Maybe<boolean>;
}

Expand Down Expand Up @@ -210,23 +218,28 @@ export function execute(args: ExecutionArgs): PromiseOrValue<ExecutionResult> {
}

const result = experimentalExecuteIncrementally(args);
if (!isPromise(result)) {
if ('initialResult' in result) {
// This can happen if the operation contains @defer or @stream directives
// and is not validated prior to execution
throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
}
return result;
}
// Multiple payloads could be encountered if the operation contains @defer or
// @stream directives and is not validated prior to execution
return ensureSinglePayload(result);
}

return result.then((incrementalResult) => {
if ('initialResult' in incrementalResult) {
// This can happen if the operation contains @defer or @stream directives
// and is not validated prior to execution
throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
}
return incrementalResult;
});
function ensureSinglePayload(
result: PromiseOrValue<
ExecutionResult | ExperimentalIncrementalExecutionResults
>,
): PromiseOrValue<ExecutionResult> {
if (isPromise(result)) {
return result.then((resolved) => {
if ('initialResult' in resolved) {
throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
}
return resolved;
});
}
if ('initialResult' in result) {
throw new Error(UNEXPECTED_MULTIPLE_PAYLOADS);
}
return result;
}

/**
Expand All @@ -253,7 +266,7 @@ export function experimentalExecuteIncrementally(
return { errors: validatedExecutionArgs };
}

return executeOperation(validatedExecutionArgs);
return executeQueryOrMutationOrSubscriptionEvent(validatedExecutionArgs);
}

/**
Expand All @@ -271,7 +284,7 @@ export function experimentalExecuteIncrementally(
* at which point we still log the error and null the parent field, which
* in this case is the entire response.
*/
function executeOperation(
function executeQueryOrMutationOrSubscriptionEvent(
validatedExecutionArgs: ValidatedExecutionArgs,
): PromiseOrValue<ExecutionResult | ExperimentalIncrementalExecutionResults> {
const exeContext: ExecutionContext = {
Expand Down Expand Up @@ -471,6 +484,7 @@ export function validateExecutionArgs(
fieldResolver,
typeResolver,
subscribeFieldResolver,
perEventExecutor,
enableEarlyExecution,
} = args;

Expand Down Expand Up @@ -544,6 +558,7 @@ export function validateExecutionArgs(
fieldResolver: fieldResolver ?? defaultFieldResolver,
typeResolver: typeResolver ?? defaultTypeResolver,
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
perEventExecutor: perEventExecutor ?? executeSubscriptionEvent,
enableEarlyExecution: enableEarlyExecution === true,
};
}
Expand Down Expand Up @@ -1938,21 +1953,25 @@ function mapSourceToResponse(
// For each payload yielded from a subscription, map it over the normal
// GraphQL `execute` function, with `payload` as the rootValue.
// This implements the "MapSourceToResponseEvent" algorithm described in
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
// the GraphQL specification..
return mapAsyncIterable(resultOrStream, (payload: unknown) => {
const perEventExecutionArgs: ValidatedExecutionArgs = {
...validatedExecutionArgs,
rootValue: payload,
};
// typecast to ExecutionResult, not possible to return
// ExperimentalIncrementalExecutionResults when
// exeContext.operation is 'subscription'.
return executeOperation(perEventExecutionArgs) as ExecutionResult;
return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs);
});
}

export function executeSubscriptionEvent(
validatedExecutionArgs: ValidatedExecutionArgs,
): PromiseOrValue<ExecutionResult> {
const result = executeQueryOrMutationOrSubscriptionEvent(
validatedExecutionArgs,
);
return ensureSinglePayload(result);
}

/**
* Implements the "CreateSourceEventStream" algorithm described in the
* GraphQL specification, resolving the subscription source event stream.
Expand Down
1 change: 1 addition & 0 deletions src/execution/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export { pathToArray as responsePathAsArray } from '../jsutils/Path.js';
export {
createSourceEventStream,
execute,
executeSubscriptionEvent,
experimentalExecuteIncrementally,
executeSync,
defaultFieldResolver,
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ export type {
// Execute GraphQL queries.
export {
execute,
executeSubscriptionEvent,
experimentalExecuteIncrementally,
executeSync,
defaultFieldResolver,
Expand Down
Loading