Subscriptions
GraphQL Subscriptions enable a client to receive updates for a query from the server over time. Pushing update notifications from the server is a good example.
The DGS framework supports subscriptions out of the box.
The Server Side Programming Model¶
In the DGS framework a Subscription is implemented as a data fetcher with the @DgsSubscription
annotation.
The @DgsSubscription
is just short-hand for @DgsData(parentType = "Subscription")
.
The difference with a normal data fetcher is that a subscription must return a org.reactivestreams.Publisher
.
import reactor.core.publisher.Flux;
import org.reactivestreams.Publisher;
@DgsSubscription
public Publisher<Stock> stocks() {
//Create a never-ending Flux that emits an item every second
return Flux.interval(Duration.ofSeconds(1)).map({ t -> Stock("NFLX", 500 + t) })
}
The Publisher
interface is from Reactive Streams.
The Spring Framework comes with the Reactor library to work with Reactive Streams.
A complete example can be found in SubscriptionDatafetcher.java
.
WebSockets¶
The GraphQL specification doesn't specify a transport protocol. WebSockets are the most popular transport protocol however, and are supported by the DGS Framework.
The framework now supports the graphql-ws
library which uses the graphql-transport-ws
sub-protocol for websockets for both webmvc and webflux stacks.
Apollo now supports the client for this newer protocol as well.
Note that the newer graphql-ws
library name is confusing since the deprecated sub-protocol is also named graphql-ws
.
Note
The deprecated subscriptions-transport-ws
library, which uses the graphql-ws
sub-protocol is functional for backwards compatibility.
However, this implementation will no longer be actively maintained in the framework and we will be dropping support in a future release.
To enable WebSockets support for the WebMVC stack, add the following module to your build.gradle
:
implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:latest.release'
For WebFlux, the starter already comes with support for websocket subscriptions, so no additional configuration is required.
implementation 'com.netflix.graphql.dgs:graphql-dgs-webflux-starter:latest.release'
The subscription endpoint is on /subscriptions
.
Normal GraphQL queries can be sent to /graphql
, while subscription requests go to /subscriptions
.
Apollo client supports WebSockets through a link.
Typically, you want to configure Apollo Client with both an HTTP link and a WS link, and split between them based on the query type.
A simple example of using the Apollo client can be found in the example project of the DGS Framework repository.
Unit Testing Subscriptions¶
Similar to a "normal" data fetcher test, you use the DgsQueryExecutor
to execute a query.
Just like a normal query, this results in a ExecutionResult
.
Instead of returning a result directly in the getData()
method, a subscription query returns a Publisher
.
A Publisher
can be asserted using the testing capabilities from Reactor.
Each onNext
of the Publisher
is another ExecutionResult
.
This ExecutionResult
contains the actual data!
It might take a minute to wrap your head around the concept of this nested ExecutionResult
, but it gives an excellent way to test Subscriptions, including corner cases.
The following is a simple example of such a test.
The example tests the stocks
subscription from above.
The stocks
subscription produces a result every second, so the test uses VirtualTime
to fast-forward time, without needing to wait in the test.
Also note that the emitted ExecutionResult
returns a Map<String, Object>
, and not the Java type that your data fetcher returns.
Use the Jackson Objectmapper
to convert the map to a Java object.
@SpringBootTest(classes = {DgsAutoConfiguration.class, SubscriptionDataFetcher.class})
class SubscriptionDataFetcherTest {
@Autowired
DgsQueryExecutor queryExecutor;
ObjectMapper objectMapper = new ObjectMapper();
@Test
void stocks() {
ExecutionResult executionResult = queryExecutor.execute("subscription Stocks { stocks { name, price } }");
Publisher<ExecutionResult> publisher = executionResult.getData();
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
StepVerifier.withVirtualTime(() -> publisher, 3)
.expectSubscription()
.thenRequest(3)
.assertNext(result -> assertThat(toStock(result).getPrice()).isEqualTo(500))
.assertNext(result -> assertThat(toStock(result).getPrice()).isEqualTo(501))
.assertNext(result -> assertThat(toStock(result).getPrice()).isEqualTo(502))
.thenCancel()
.verify();
}
private Stock toStock(ExecutionResult result) {
Map<String, Object> data = result.getData();
return objectMapper.convertValue(data.get("stocks"), Stock.class);
}
}
In this example the subscription works in isolation; it just emits a result every second. In other scenarios a subscription could depend on something else happening in the system, such as the processing of a mutation. Such scenarios are easy to set up in a unit test, simply run multiple queries/mutations in your test to see it all work together.
Notice that the unit tests really only test your code. It doesn't care about transport protocols. This is exactly what you need for your tests, because your tests should focus on testing your code, not the framework code.
Integration testing subscriptions¶
Although most subscription logic should be tested in unit tests, it can be useful to test end-to-end with a client.
This can be achieved with the DGS client, and works well in a @SpringBootTest
with a random port.
The example below starts a subscription, and sends to mutations that should result in updates on the subscription.
The example uses Websockets, but the same can be done for SSE.
The code for this example can be found in the example project.
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ReviewSubscriptionIntegrationTest {
@LocalServerPort
private Integer port;
private WebSocketGraphQLClient webSocketGraphQLClient;
private MonoGraphQLClient graphQLClient;
private MonoRequestExecutor requestExecutor = (url, headers, body) -> WebClient.create(url)
.post()
.bodyValue(body)
.headers(consumer -> headers.forEach(consumer::addAll))
.exchangeToMono(r -> r.bodyToMono(String.class).map(responseBody -> new HttpResponse(r.rawStatusCode(), responseBody, r.headers().asHttpHeaders())));
@BeforeEach
public void setup() {
webSocketGraphQLClient = new WebSocketGraphQLClient("ws://localhost:" + port + "/subscriptions", new ReactorNettyWebSocketClient());
graphQLClient = new DefaultGraphQLClient("http://localhost:" + port + "/graphql");
}
@Test
public void testWebSocketSubscription() {
GraphQLQueryRequest subscriptionRequest = new GraphQLQueryRequest(
ReviewAddedGraphQLQuery.newRequest().showId(1).build(),
new ReviewAddedProjectionRoot().starScore()
);
GraphQLQueryRequest addReviewMutation1 = new GraphQLQueryRequest(
AddReviewGraphQLQuery.newRequest().review(SubmittedReview.newBuilder().showId(1).starScore(5).username("DGS User").build()).build(),
new AddReviewProjectionRoot().starScore()
);
GraphQLQueryRequest addReviewMutation2 = new GraphQLQueryRequest(
AddReviewGraphQLQuery.newRequest().review(SubmittedReview.newBuilder().showId(1).starScore(3).username("DGS User").build()).build(),
new AddReviewProjectionRoot().starScore()
);
Flux<Integer> starScore = webSocketGraphQLClient.reactiveExecuteQuery(subscriptionRequest.serialize(), Collections.emptyMap()).map(r -> r.extractValue("reviewAdded.starScore"));
StepVerifier.create(starScore)
.thenAwait(Duration.ofSeconds(1)) //This await is necessary because of issue [#657](https://github.com/Netflix/dgs-framework/issues/657)
.then(() -> {
graphQLClient.reactiveExecuteQuery(addReviewMutation1.serialize(), Collections.emptyMap(), requestExecutor).block();
})
.then(() ->
graphQLClient.reactiveExecuteQuery(addReviewMutation2.serialize(), Collections.emptyMap(), requestExecutor).block())
.expectNext(5)
.expectNext(3)
.thenCancel()
.verify();
}
}