Subscriptions
GraphQL Subscriptions enable a client to receive updates for a query from the server over time. Pushing update notifications from the server is a good example.
The DGS framework supports subscriptions out of the box.
The Server Side Programming Model¶
In the DGS framework a Subscription is implemented as a data fetcher with the @DgsSubscription
annotation.
The @DgsSubscription
is just short-hand for @DgsData(parentType = "Subscription")
.
The difference with a normal data fetcher is that a subscription must return a org.reactivestreams.Publisher
.
import reactor.core.publisher.Flux;
import org.reactivestreams.Publisher;
@DgsSubscription
public Publisher<Stock> stocks() {
//Create a never-ending Flux that emits an item every second
return Flux.interval(Duration.ofSeconds(1)).map({ t -> Stock("NFLX", 500 + t) })
}
The Publisher
interface is from Reactive Streams.
The Spring Framework comes with the Reactor library to work with Reactive Streams.
A complete example can be found in SubscriptionDatafetcher.java
.
WebSockets¶
Spring for GraphQL provides the WebSockets transport layer for subscriptions.
The framework supports the graphql-ws
library which uses the graphql-transport-ws
sub-protocol for Websockets for both WebMVC and Webflux stacks.
To enable WebSockets you need to explicitly set the spring.graphql.websocket.path
property.
On WebMVC you also have to add the following dependency. No extra dependency is needed for Webflux.
implementation 'org.springframework.boot:spring-boot-starter-websocket'
Server Sent Events¶
Server Sent Events are also supported by the Spring for GraphQL transport layer, as an alternative to Websockets.
This can be useful for environments where only HTTP is supported, and Websockets is not an option.
No extra configuration or dependencies are needed for SSE.
To sent SSE requests you use the regular /graphql
endpoint, but with the text/event-stream
media type.
Unit Testing Subscriptions¶
Similar to a "normal" data fetcher test, you use the DgsQueryExecutor
to execute a query.
Just like a normal query, this results in a ExecutionResult
.
Instead of returning a result directly in the getData()
method, a subscription query returns a Publisher
.
A Publisher
can be asserted using the testing capabilities from Reactor.
Each onNext
of the Publisher
is another ExecutionResult
.
This ExecutionResult
contains the actual data!
It might take a minute to wrap your head around the concept of this nested ExecutionResult
, but it gives an excellent way to test Subscriptions, including corner cases.
The following is a simple example of such a test.
The example tests the stocks
subscription from above.
The stocks
subscription produces a result every second, so the test uses VirtualTime
to fast-forward time, without needing to wait in the test.
Also note that the emitted ExecutionResult
returns a Map<String, Object>
, and not the Java type that your data fetcher returns.
Use the Jackson Objectmapper
to convert the map to a Java object.
@SpringBootTest(classes = {SubscriptionDataFetcher.class, DgsExtendedScalarsAutoConfiguration.class, DgsPaginationAutoConfiguration.class, UploadScalar.class})
@EnableDgsTest
class SubscriptionDataFetcherTest {
@Autowired
DgsQueryExecutor queryExecutor;
ObjectMapper objectMapper = new ObjectMapper();
@Test
void stocks() {
ExecutionResult executionResult = queryExecutor.execute("subscription Stocks { stocks { name, price } }");
Publisher<ExecutionResult> publisher = executionResult.getData();
VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
StepVerifier.withVirtualTime(() -> publisher, 3)
.expectSubscription()
.thenRequest(3)
.assertNext(result -> assertThat(toStock(result).getPrice()).isEqualTo(500))
.assertNext(result -> assertThat(toStock(result).getPrice()).isEqualTo(501))
.assertNext(result -> assertThat(toStock(result).getPrice()).isEqualTo(502))
.thenCancel()
.verify();
}
private Stock toStock(ExecutionResult result) {
Map<String, Object> data = result.getData();
return objectMapper.convertValue(data.get("stocks"), Stock.class);
}
}
In this example the subscription works in isolation; it just emits a result every second. In other scenarios a subscription could depend on something else happening in the system, such as the processing of a mutation. Such scenarios are easy to set up in a unit test, simply run multiple queries/mutations in your test to see it all work together.
Notice that the unit tests really only test your code. It doesn't care about transport protocols. This is exactly what you need for your tests, because your tests should focus on testing your code, not the framework code.
Integration testing subscriptions¶
Although most subscription logic should be tested in unit tests, it can be useful to test end-to-end with a client.
This can be achieved with the DGS client, and works well in a @SpringBootTest
with a random port, and the WebSocketGraphQLTester
.
The example below starts a subscription, and sends to mutations that should result in updates on the subscription.
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class SubscriptionsGraphQlTesterTest {
@LocalServerPort
private int port;
@Value("http://localhost:${local.server.port}/graphql")
private String baseUrl;
private GraphQlTester graphQlTester;
@BeforeEach
void setUp() {
URI url = URI.create(baseUrl);
this.graphQlTester = WebSocketGraphQlTester.builder(url, new ReactorNettyWebSocketClient()).build();
}
@Test
void stocks() {
Flux<Stock> result = graphQlTester.document("subscription Stocks { stocks { name, price } }").executeSubscription().toFlux("stocks", Stock.class);
StepVerifier.create(result)
.assertNext(res -> Assertions.assertThat(res.getPrice()).isEqualTo(500))
.assertNext(res -> Assertions.assertThat(res.getPrice()).isEqualTo(501))
.assertNext(res -> Assertions.assertThat(res.getPrice()).isEqualTo(502))
.thenCancel()
.verify();
}
}