Skip to content

Commit 84c9720

Browse files
authored
GraphQL-WS crate and Warp subscriptions update (#721)
* update pre-existing juniper_warp::subscriptions * initial draft * finish up, update example * polish + timing test * fix pre-existing bug * rebase updates * address comments * add release.toml * makefile and initial changelog * add new Cargo.toml to juniper/release.toml
1 parent dc309b8 commit 84c9720

File tree

15 files changed

+1695
-237
lines changed

15 files changed

+1695
-237
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ members = [
1515
"juniper_rocket",
1616
"juniper_rocket_async",
1717
"juniper_subscriptions",
18+
"juniper_graphql_ws",
1819
"juniper_warp",
1920
"juniper_actix",
2021
]

examples/warp_subscriptions/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ serde_json = "1.0"
1313
tokio = { version = "0.2", features = ["rt-core", "macros"] }
1414
warp = "0.2.1"
1515

16-
juniper = { git = "https://github.com/graphql-rust/juniper" }
17-
juniper_subscriptions = { git = "https://github.com/graphql-rust/juniper" }
18-
juniper_warp = { git = "https://github.com/graphql-rust/juniper", features = ["subscriptions"] }
16+
juniper = { path = "../../juniper" }
17+
juniper_graphql_ws = { path = "../../juniper_graphql_ws" }
18+
juniper_warp = { path = "../../juniper_warp", features = ["subscriptions"] }

examples/warp_subscriptions/src/main.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
33
use std::{env, pin::Pin, sync::Arc, time::Duration};
44

5-
use futures::{Future, FutureExt as _, Stream};
5+
use futures::{FutureExt as _, Stream};
66
use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode};
7-
use juniper_subscriptions::Coordinator;
8-
use juniper_warp::{playground_filter, subscriptions::graphql_subscriptions};
7+
use juniper_graphql_ws::ConnectionConfig;
8+
use juniper_warp::{playground_filter, subscriptions::serve_graphql_ws};
99
use warp::{http::Response, Filter};
1010

1111
#[derive(Clone)]
@@ -151,30 +151,24 @@ async fn main() {
151151
let qm_state = warp::any().map(move || Context {});
152152
let qm_graphql_filter = juniper_warp::make_graphql_filter(qm_schema, qm_state.boxed());
153153

154-
let sub_state = warp::any().map(move || Context {});
155-
let coordinator = Arc::new(juniper_subscriptions::Coordinator::new(schema()));
154+
let root_node = Arc::new(schema());
156155

157156
log::info!("Listening on 127.0.0.1:8080");
158157

159158
let routes = (warp::path("subscriptions")
160159
.and(warp::ws())
161-
.and(sub_state.clone())
162-
.and(warp::any().map(move || Arc::clone(&coordinator)))
163-
.map(
164-
|ws: warp::ws::Ws,
165-
ctx: Context,
166-
coordinator: Arc<Coordinator<'static, _, _, _, _, _>>| {
167-
ws.on_upgrade(|websocket| -> Pin<Box<dyn Future<Output = ()> + Send>> {
168-
graphql_subscriptions(websocket, coordinator, ctx)
169-
.map(|r| {
170-
if let Err(e) = r {
171-
println!("Websocket error: {}", e);
172-
}
173-
})
174-
.boxed()
175-
})
176-
},
177-
))
160+
.map(move |ws: warp::ws::Ws| {
161+
let root_node = root_node.clone();
162+
ws.on_upgrade(move |websocket| async move {
163+
serve_graphql_ws(websocket, root_node, ConnectionConfig::new(Context {}))
164+
.map(|r| {
165+
if let Err(e) = r {
166+
println!("Websocket error: {}", e);
167+
}
168+
})
169+
.await
170+
})
171+
}))
178172
.map(|reply| {
179173
// TODO#584: remove this workaround
180174
warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")

juniper/release.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ pre-release-replacements = [
3030
{file="../juniper_warp/Cargo.toml", search="\\[dev-dependencies\\.juniper\\]\nversion = \"[^\"]+\"", replace="[dev-dependencies.juniper]\nversion = \"{{version}}\""},
3131
# Subscriptions
3232
{file="../juniper_subscriptions/Cargo.toml", search="juniper = \\{ version = \"[^\"]+\"", replace="juniper = { version = \"{{version}}\""},
33+
# GraphQL-WS
34+
{file="../juniper_graphql_ws/Cargo.toml", search="juniper = \\{ version = \"[^\"]+\"", replace="juniper = { version = \"{{version}}\""},
3335
# Actix-Web
3436
{file="../juniper_actix/Cargo.toml", search="juniper = \\{ version = \"[^\"]+\"", replace="juniper = { version = \"{{version}}\""},
3537
{file="../juniper_actix/Cargo.toml", search="\\[dev-dependencies\\.juniper\\]\nversion = \"[^\"]+\"", replace="[dev-dependencies.juniper]\nversion = \"{{version}}\""},

juniper_graphql_ws/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# master
2+
3+
- Initial Release

juniper_graphql_ws/Cargo.toml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[package]
2+
name = "juniper_graphql_ws"
3+
version = "0.1.0"
4+
authors = ["Christopher Brown <[email protected]>"]
5+
license = "BSD-2-Clause"
6+
description = "Graphql-ws protocol implementation for Juniper"
7+
documentation = "https://docs.rs/juniper_graphql_ws"
8+
repository = "https://github.com/graphql-rust/juniper"
9+
keywords = ["graphql-ws", "juniper", "graphql", "apollo"]
10+
edition = "2018"
11+
12+
[dependencies]
13+
juniper = { version = "0.14.2", path = "../juniper", default-features = false }
14+
juniper_subscriptions = { path = "../juniper_subscriptions" }
15+
serde = { version = "1.0.8", features = ["derive"] }
16+
tokio = { version = "0.2", features = ["macros", "rt-core", "time"] }
17+
18+
[dev-dependencies]
19+
serde_json = { version = "1.0.2" }

juniper_graphql_ws/Makefile.toml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[env]
2+
CARGO_MAKE_CARGO_ALL_FEATURES = ""
3+
4+
[tasks.build-verbose]
5+
condition = { rust_version = { min = "1.29.0" } }
6+
7+
[tasks.build-verbose.windows]
8+
condition = { rust_version = { min = "1.29.0" }, env = { "TARGET" = "x86_64-pc-windows-msvc" } }
9+
10+
[tasks.test-verbose]
11+
condition = { rust_version = { min = "1.29.0" } }
12+
13+
[tasks.test-verbose.windows]
14+
condition = { rust_version = { min = "1.29.0" }, env = { "TARGET" = "x86_64-pc-windows-msvc" } }
15+
16+
[tasks.ci-coverage-flow]
17+
condition = { rust_version = { min = "1.29.0" } }
18+
19+
[tasks.ci-coverage-flow.windows]
20+
disabled = true

juniper_graphql_ws/release.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
no-dev-version = true
2+
pre-release-commit-message = "Release {{crate_name}} {{version}}"
3+
pro-release-commit-message = "Bump {{crate_name}} version to {{next_version}}"
4+
tag-message = "Release {{crate_name}} {{version}}"
5+
upload-doc = false
6+
pre-release-replacements = [
7+
{file="src/lib.rs", search="docs.rs/juniper_graphql_ws/[a-z0-9\\.-]+", replace="docs.rs/juniper_graphql_ws/{{version}}"},
8+
]
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use juniper::{ScalarValue, Variables};
2+
3+
/// The payload for a client's "start" message. This triggers execution of a query, mutation, or
4+
/// subscription.
5+
#[derive(Debug, Deserialize, PartialEq)]
6+
#[serde(bound(deserialize = "S: ScalarValue"))]
7+
#[serde(rename_all = "camelCase")]
8+
pub struct StartPayload<S: ScalarValue> {
9+
/// The document body.
10+
pub query: String,
11+
12+
/// The optional variables.
13+
#[serde(default)]
14+
pub variables: Variables<S>,
15+
16+
/// The optional operation name (required if the document contains multiple operations).
17+
pub operation_name: Option<String>,
18+
}
19+
20+
/// ClientMessage defines the message types that clients can send.
21+
#[derive(Debug, Deserialize, PartialEq)]
22+
#[serde(bound(deserialize = "S: ScalarValue"))]
23+
#[serde(rename_all = "snake_case")]
24+
#[serde(tag = "type")]
25+
pub enum ClientMessage<S: ScalarValue> {
26+
/// ConnectionInit is sent by the client upon connecting.
27+
ConnectionInit {
28+
/// Optional parameters of any type sent from the client. These are often used for
29+
/// authentication.
30+
#[serde(default)]
31+
payload: Variables<S>,
32+
},
33+
/// Start messages are used to execute a GraphQL operation.
34+
Start {
35+
/// The id of the operation. This can be anything, but must be unique. If there are other
36+
/// in-flight operations with the same id, the message will be ignored or cause an error.
37+
id: String,
38+
39+
/// The query, variables, and operation name.
40+
payload: StartPayload<S>,
41+
},
42+
/// Stop messages are used to unsubscribe from a subscription.
43+
Stop {
44+
/// The id of the operation to stop.
45+
id: String,
46+
},
47+
/// ConnectionTerminate is used to terminate the connection.
48+
ConnectionTerminate,
49+
}
50+
51+
#[cfg(test)]
52+
mod test {
53+
use super::*;
54+
use juniper::{DefaultScalarValue, InputValue};
55+
56+
#[test]
57+
fn test_deserialization() {
58+
type ClientMessage = super::ClientMessage<DefaultScalarValue>;
59+
60+
assert_eq!(
61+
ClientMessage::ConnectionInit {
62+
payload: [("foo".to_string(), InputValue::scalar("bar"))]
63+
.iter()
64+
.cloned()
65+
.collect(),
66+
},
67+
serde_json::from_str(r##"{"type": "connection_init", "payload": {"foo": "bar"}}"##)
68+
.unwrap(),
69+
);
70+
71+
assert_eq!(
72+
ClientMessage::ConnectionInit {
73+
payload: Variables::default(),
74+
},
75+
serde_json::from_str(r##"{"type": "connection_init"}"##).unwrap(),
76+
);
77+
78+
assert_eq!(
79+
ClientMessage::Start {
80+
id: "foo".to_string(),
81+
payload: StartPayload {
82+
query: "query MyQuery { __typename }".to_string(),
83+
variables: [("foo".to_string(), InputValue::scalar("bar"))]
84+
.iter()
85+
.cloned()
86+
.collect(),
87+
operation_name: Some("MyQuery".to_string()),
88+
},
89+
},
90+
serde_json::from_str(
91+
r##"{"type": "start", "id": "foo", "payload": {
92+
"query": "query MyQuery { __typename }",
93+
"variables": {
94+
"foo": "bar"
95+
},
96+
"operationName": "MyQuery"
97+
}}"##
98+
)
99+
.unwrap(),
100+
);
101+
102+
assert_eq!(
103+
ClientMessage::Start {
104+
id: "foo".to_string(),
105+
payload: StartPayload {
106+
query: "query MyQuery { __typename }".to_string(),
107+
variables: Variables::default(),
108+
operation_name: None,
109+
},
110+
},
111+
serde_json::from_str(
112+
r##"{"type": "start", "id": "foo", "payload": {
113+
"query": "query MyQuery { __typename }"
114+
}}"##
115+
)
116+
.unwrap(),
117+
);
118+
119+
assert_eq!(
120+
ClientMessage::Stop {
121+
id: "foo".to_string()
122+
},
123+
serde_json::from_str(r##"{"type": "stop", "id": "foo"}"##).unwrap(),
124+
);
125+
126+
assert_eq!(
127+
ClientMessage::ConnectionTerminate,
128+
serde_json::from_str(r##"{"type": "connection_terminate"}"##).unwrap(),
129+
);
130+
}
131+
}

0 commit comments

Comments
 (0)