Skip to content

GraphQL-WS crate and Warp subscriptions update #721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jul 29, 2020

Conversation

ccbrown
Copy link
Contributor

@ccbrown ccbrown commented Jul 26, 2020

This add a new crate: juniper_graphql_ws. This crate implements the Apollo graphql-ws protocol.

This is a replacement for the implementation that was previously in juniper_warp. A few of the differences:

  • Simplifies SubscriptionConnection. There is a dedicated PR for this here: Simplify SubscriptionConnection #719. This can be reviewed separately, or that PR can be closed and everything can just be reviewed at once. (Edit: This was merged separately.)
  • Errors are now correctly emitted for field resolution errors within subscription event streams. Previously they were just being silently dropped inside of juniper_subscriptions::whole_responses_stream. This and the first bullet are the only changes outside of the new crate, juniper_warp::subscriptions mod, and the warp_subscriptions example.
  • Types for all graphql-ws message types have been added. This makes a host of things much easier than before, and makes serialization and deserialization much safer and more robust. For example, previously, if a client sent a "}" in a message id, we would do bad things because the implementation was crafting responses using format!.
  • The server now responds to "connection_init" with the appropriate "ack" + "keep-alive".
  • The server now sends keep-alives. These default to a 30 second interval, but are configurable.
  • Queries and mutations can now be executed over the same WebSocket connection.
  • Stopping a subscription now has well-defined timing. All work is immediately dropped and the "complete" message is not sent until we can guarantee that no other events will be sent.
  • We now have more reasonable behavior for "weird things" such as clients using the same id for multiple subscriptions, not sending "init" packets, etc. Clients must send an "init" packet before anything else happens, and multiple subscriptions with the same id do not start multiple unstoppable background streams.
  • We now correctly send "error" messages for pre-execution errors and "data" packets for errors that happen during execution.
  • We now correctly send "connection_error" messages for issues not associated with operations such as malformed client messages.
  • There is now a configurable maximum number of operations a single connection can have at any time.
  • Parameters from the client's "connection_init" message are now parsed and can be used as parameters for context creation. Because we actually require the "connection_init" message now, no context is required until this point. This enables support for authentication via this message.
  • Serialization and deserialization are defined outside of the crate, so users can serialize in the way that's most optimal for them. You don't even have to use JSON.
  • There are now unit tests, and lots of them. Because the entire implementation is contained by a single Sink + Stream and there are proper message types, writing unit tests is a breeze.

I suspect there are more. The previous implementation was really rough. But on to the usage.

Usage

I'm just going to copy a unit test:

#[tokio::test]
async fn test_query() {
    let mut conn = Connection::new(
        new_test_schema(),
        ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
    );

    conn.send(ClientMessage::ConnectionInit {
        payload: Variables::default(),
    })
    .await
    .unwrap();

    assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());

    conn.send(ClientMessage::Start {
        id: "foo".to_string(),
        payload: StartPayload {
            query: "{context}".to_string(),
            variables: Variables::default(),
            operation_name: None,
        },
    })
    .await
    .unwrap();

    assert_eq!(
        ServerMessage::Data {
            id: "foo".to_string(),
            payload: DataPayload {
                data: Value::Object(
                    [("context", Value::Scalar(DefaultScalarValue::Int(1)))]
                        .iter()
                        .cloned()
                        .collect()
                ),
                errors: vec![],
            },
        },
        conn.next().await.unwrap()
    );

    assert_eq!(
        ServerMessage::Complete {
            id: "foo".to_string(),
        },
        conn.next().await.unwrap()
    );
}

It couldn't really be any easier.

The second argument to Connection::new is generic. Instead of ConnectionConfig, you can provide a closure:

let mut conn = Connection::new(new_test_schema(), |params: Variables| async move {
    Ok(ConnectionConfig::new(Context(1))) as Result<_, Infallible>
});

This allows you to defer the configuration until you receive the "connection_init" parameters. Any error returned by this closure is sent back the client.

The entire juniper_warp::subscriptions module is now just a few lines to compose the sinks and streams:

/// Serves the graphql-ws protocol over a WebSocket connection.
///
/// The `init` argument is used to provide the context and additional configuration for
/// connections. This can be a `juniper_graphql_ws::ConnectionConfig` if the context and
/// configuration are already known, or it can be a closure that gets executed asynchronously
/// when the client sends the ConnectionInit message. Using a closure allows you to perform
/// authentication based on the parameters provided by the client.
pub async fn serve_graphql_ws<Query, Mutation, Subscription, CtxT, S, I>(
    websocket: warp::ws::WebSocket,
    root_node: Arc<RootNode<'static, Query, Mutation, Subscription, S>>,
    init: I,
) -> Result<(), Error>
where
    Query: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
    Query::TypeInfo: Send + Sync,
    Mutation: GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
    Mutation::TypeInfo: Send + Sync,
    Subscription: GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
    Subscription::TypeInfo: Send + Sync,
    CtxT: Unpin + Send + Sync + 'static,
    S: ScalarValue + Send + Sync + 'static,
    I: Init<S, CtxT> + Send,
{
    let (ws_tx, ws_rx) = websocket.split();
    let (s_tx, s_rx) = Connection::new(ArcSchema(root_node), init).split();

    let ws_rx = ws_rx.map(|r| r.map(|msg| Message(msg)));
    let s_rx = s_rx.map(|msg| {
        serde_json::to_string(&msg)
            .map(|t| warp::ws::Message::text(t))
            .map_err(|e| Error::Serde(e))
    });

    match future::select(
        ws_rx.forward(s_tx.sink_err_into()),
        s_rx.forward(ws_tx.sink_err_into()),
    )
    .await
    {
        Either::Left((r, _)) => r.map_err(|e| e.into()),
        Either::Right((r, _)) => r,
    }
}

So adding support for the other frameworks should be pretty trivial now and they can all share one implementation.

Related Issues

Copy link
Contributor

@mihai-dinculescu mihai-dinculescu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great!

},
))
.map(move |ws: warp::ws::Ws| {
let root_node = root_node.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to put this on the route?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seemed simpler to me, but if someone with more Warp expertise tells me it would be better for whatever reason (demonstration purposes?), I'm happy to change it.

@LegNeato
Copy link
Member

LegNeato commented Jul 29, 2020

Thanks for this! Will look in-depth in a bit. One thing I am worried about is you can use subscriptions without graphql-ws...it is not (yet?) part of the spec.

So I want to make sure we have "open-ended" subscription support for people to roll their own protocol as well as practical opinionated support like graphql-ws.

@ccbrown
Copy link
Contributor Author

ccbrown commented Jul 29, 2020

Yep, I'm definitely not reducing functionality of the general purpose crates. You can still use juniper_subscriptions and juniper_warp without graphql-ws.

Copy link
Member

@LegNeato LegNeato left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great, just some small comments.

I invited you as a maintainer as this is high-quality work (thank you! Love the tests) and I don't personally use subscriptions. I don't want to be a blocker for you making improvements here!

.boxed();
s = s
.chain(stream::unfold((), move |_| async move {
tokio::time::delay_for(keep_alive_interval).await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a huge bummer to use tokio just for a delay...AFAICT everything else is executor-agnostic. Is there a generic delay that only depends on futures?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I looked for a bit before adding the tokio dependency, but couldn't find anything. :(

async fn start(id: String, params: ExecutionParams<S>) -> BoxStream<'static, Reaction<S>> {
// TODO: This could be made more efficient if juniper exposed functionality to allow us to
// parse and validate the query, determine whether it's a subscription, and then execute
// it. For now, the query gets parsed and validated twice.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you file an issue to track?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing. Will do once merged.

stream::iter(vec![
Reaction::ServerMessage(ServerMessage::Error {
id: id.clone(),
payload: GraphQLError::ValidationError(vec![
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a new error type here? I guess it is technically validation...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷 I don't really feel strongly either way.

@ccbrown
Copy link
Contributor Author

ccbrown commented Jul 29, 2020

@LegNeato mind taking another look over the last few commits? I added what appeared necessary to make CI happy, but I'm not super confident as I don't know much about the repo's release process.

If everything looks good to you, I think this can be merged now.

@LegNeato LegNeato merged commit 84c9720 into graphql-rust:master Jul 29, 2020
@LegNeato
Copy link
Member

Looks great! 🚀 🍾

@tyranron
Copy link
Member

tyranron commented Jul 29, 2020

@ccbrown does this GraphQL over WebSocket protocol implementation support queries and mutations too? Or just subscriptions?

@ccbrown
Copy link
Contributor Author

ccbrown commented Jul 29, 2020

@ccbrown does this GraphQL over WebSocket protocol implementation support queries and mutations too? Or just subscriptions?

It supports queries and mutations too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

juniper_warp::subscriptions rewrite wishlist
4 participants