Date: November 18 2023
1. Combining Multiple Reactive Streams: zip and flatMap
In real apps, data often comes from multiple reactive sources that you need to combine.
Example: Combine User Info and User Orders
Suppose you have two reactive services:
Mono<User> getUserById(Long id);
Flux<Order> getOrdersForUser(Long userId);
You want to create a combined DTO:
public class UserOrders {
    private User user;
    private List<Order> orders;
    // constructors, getters, setters
}
Reactive Combination Using zip and flatMap
public Mono<UserOrders> getUserOrders(Long userId) {
    Mono<User> userMono = getUserById(userId);
    Flux<Order> ordersFlux = getOrdersForUser(userId);
    return userMono.flatMap(user ->
        ordersFlux.collectList()
            .map(orders -> new UserOrders(user, orders))
    );
}
- flatMap unwraps the user Mono.
- collectList() aggregates the Flux of orders into a List<Order>.
- Finally, a new UserOrders object is returned wrapped in a Mono.
Alternatively, when both sides are Monos, you can use Mono.zip, which subscribes to both sources at once instead of waiting for the user before fetching the orders:
Mono<User> userMono = getUserById(userId);
Mono<List<Order>> ordersMono = getOrdersForUser(userId).collectList();
return Mono.zip(userMono, ordersMono)
    .map(tuple -> new UserOrders(tuple.getT1(), tuple.getT2()));
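To make the zip variant concrete, here is a minimal, self-contained sketch that depends only on reactor-core. The User, Order, and UserOrders records and the two stub services are hypothetical stand-ins for the article's types, not a real API. It uses zipWith with a combinator, which avoids unpacking a tuple.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CombineSketch {
    // Hypothetical stand-ins for the article's domain types.
    record User(long id, String name) {}
    record Order(long id, String item) {}
    record UserOrders(User user, List<Order> orders) {}

    // Stub service (assumption): only user 1 exists.
    static Mono<User> getUserById(long id) {
        return id == 1 ? Mono.just(new User(1, "Alice")) : Mono.empty();
    }

    // Stub service (assumption): every user has the same two orders.
    static Flux<Order> getOrdersForUser(long userId) {
        return Flux.just(new Order(10, "book"), new Order(11, "pen"));
    }

    // zipWith takes a combinator, so no Tuple2 unpacking is needed.
    static Mono<UserOrders> getUserOrders(long userId) {
        return getUserById(userId)
                .zipWith(getOrdersForUser(userId).collectList(), UserOrders::new);
    }

    public static void main(String[] args) {
        System.out.println(getUserOrders(1).block());
        // The user Mono is empty here, so the zipped Mono is empty and block() returns null.
        System.out.println(getUserOrders(2).block());
    }
}
```

One thing worth noticing: if either source completes empty, the zipped Mono completes empty as well, so a missing user yields no UserOrders at all rather than one with a null user.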
2. Advanced Error Handling Strategies
Graceful Fallbacks with onErrorResume
Instead of failing the whole stream, fall back to a default value or an alternative flow:
@GetMapping("/user/{id}")
public Mono<User> getUserSafe(@PathVariable Long id) {
    return getUserById(id)
        .onErrorResume(e -> {
            // Log the error and return default user
            log.error("User fetch failed", e);
            return Mono.just(new User(-1L, "Guest User"));
        });
}
Conditional Error Handling
onErrorResume also accepts an exception type, so the fallback applies only to matching errors and everything else still propagates:
.onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
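The following sketch shows the typed overload in isolation, using only reactor-core. NotFoundException and fetchUserName are hypothetical placeholders (standing in for WebClientResponseException.NotFound and a real lookup) so the example runs without Spring.

```java
import reactor.core.publisher.Mono;

class ErrorSketch {
    // Hypothetical exception standing in for WebClientResponseException.NotFound.
    static class NotFoundException extends RuntimeException {
        NotFoundException(String message) { super(message); }
    }

    // Stub lookup (assumption): id 1 succeeds, id 404 is "not found",
    // and any other id fails with an unexpected error.
    static Mono<String> fetchUserName(long id) {
        if (id == 1) return Mono.just("Alice");
        if (id == 404) return Mono.error(new NotFoundException("no user " + id));
        return Mono.error(new IllegalStateException("backend down"));
    }

    static Mono<String> findUserName(long id) {
        return fetchUserName(id)
                // Only NotFoundException is translated into an empty result.
                .onErrorResume(NotFoundException.class, e -> Mono.empty());
    }

    public static void main(String[] args) {
        System.out.println(findUserName(1).block());   // the found name
        System.out.println(findUserName(404).block()); // null, because the Mono is empty
        try {
            findUserName(500).block();
        } catch (IllegalStateException e) {
            // Errors of other types are not handled by the typed fallback.
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

Keeping the fallback narrow like this means a genuine outage still surfaces as an error instead of being silently reported as "not found".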
3. Controlling Backpressure
Backpressure lets a slow consumer signal how much data it can handle, which keeps a fast producer from overwhelming it.
For example, when consuming a large stream of events:
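@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> streamEvents() {
    return eventService.getEvents()
        // Buffer at most 100 events; once the buffer is full, drop the newest event and log it.
        // Chaining onBackpressureDrop after onBackpressureBuffer would bypass the buffer,
        // because the drop operator requests unbounded demand from it.
        // BufferOverflowStrategy comes from reactor.core.publisher.
        .onBackpressureBuffer(100,
            event -> log.warn("Dropped event: " + event),
            BufferOverflowStrategy.DROP_LATEST);
}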
You can:
- Buffer a limited number of events (onBackpressureBuffer)
- Drop events when overwhelmed (onBackpressureDrop)
- Switch to latest values only (onBackpressureLatest)
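To see how these strategies behave, the sketch below pairs a fast source (Flux.range) with a subscriber that only ever requests three elements. It depends only on reactor-core; SlowSubscriber and Result are hypothetical helper types introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;

class BackpressureSketch {
    // Hypothetical holder for what the subscriber saw and what was discarded.
    record Result(List<Integer> received, List<Integer> dropped) {}

    // A subscriber that asks for exactly three elements and never requests more.
    static class SlowSubscriber extends BaseSubscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        @Override protected void hookOnSubscribe(Subscription s) { request(3); }
        @Override protected void hookOnNext(Integer value) { received.add(value); }
    }

    // Drop: elements that arrive without downstream demand are discarded.
    static Result withDrop() {
        List<Integer> dropped = new ArrayList<>();
        SlowSubscriber slow = new SlowSubscriber();
        Flux.range(1, 10).onBackpressureDrop(dropped::add).subscribe(slow);
        return new Result(slow.received, dropped);
    }

    // Bounded buffer: park up to 2 elements, then discard the newest arrivals.
    static Result withBoundedBuffer() {
        List<Integer> dropped = new ArrayList<>();
        SlowSubscriber slow = new SlowSubscriber();
        Flux.range(1, 10)
            .onBackpressureBuffer(2, dropped::add, BufferOverflowStrategy.DROP_LATEST)
            .subscribe(slow);
        return new Result(slow.received, dropped);
    }

    public static void main(String[] args) {
        System.out.println("drop:   " + withDrop());
        System.out.println("buffer: " + withBoundedBuffer());
    }
}
```

In both runs the subscriber receives 1, 2, and 3. With onBackpressureDrop, elements 4 through 10 are dropped. With the bounded buffer, 4 and 5 stay parked waiting for demand that never comes, and 6 through 10 are dropped.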
4. Integrating with External Reactive Services
Spring WebFlux includes WebClient, a non-blocking HTTP client that returns Mono and Flux.
Example: Calling an External API Reactively
// Assumes a WebClient bean is defined elsewhere; Spring Boot auto-configures only WebClient.Builder.
@Autowired
private WebClient webClient;

public Mono<Weather> getWeather(String city) {
    return webClient.get()
        .uri("https://api.weather.com/{city}", city)
        .retrieve()
        .bodyToMono(Weather.class)
        .timeout(Duration.ofSeconds(2)) // Fail an attempt that takes longer than 2 seconds
        .retry(3) // Resubscribe up to 3 times on any error
        .onErrorResume(e -> Mono.just(new Weather("Unavailable", 0)));
}
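- timeout() sets the maximum wait time. Because it sits before retry(), each attempt gets its own 2-second limit.
- retry(3) resubscribes up to three times on any error, not only transient ones.
- onErrorResume() provides fallback data once the retries are exhausted.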
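The same timeout, retry, and fallback chain can be exercised without a real HTTP call. The sketch below is a rough illustration using only reactor-core: flaky is a hypothetical stand-in for the remote call, and the 200 ms timeout is an arbitrary value chosen to keep the demo fast.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class ResilienceSketch {
    // Wraps a source with a timeout -> retry -> fallback chain.
    static Mono<String> resilient(Mono<String> source) {
        return source
                .timeout(Duration.ofMillis(200))  // limit applies to each attempt
                .retry(3)                         // up to 3 resubscriptions on any error
                .onErrorReturn("Unavailable");    // fallback after the last failure
    }

    // Hypothetical flaky call: fails until the given attempt number, then succeeds.
    static Mono<String> flaky(AtomicInteger attempts, int succeedOnAttempt) {
        return Mono.fromCallable(() -> {
            if (attempts.incrementAndGet() < succeedOnAttempt) {
                throw new IllegalStateException("transient failure");
            }
            return "Sunny";
        });
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Fails twice, succeeds on the third attempt.
        System.out.println(resilient(flaky(attempts, 3)).block() + " after " + attempts.get() + " attempts");

        attempts.set(0);
        // Never succeeds within 1 initial attempt + 3 retries, so the fallback is used.
        System.out.println(resilient(flaky(attempts, 10)).block() + " after " + attempts.get() + " attempts");
    }
}
```

The first call prints "Sunny after 3 attempts" and the second prints "Unavailable after 4 attempts". Operator order matters here: moving timeout() after retry() would instead put a single time limit on the whole retry sequence.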
5. Functional Endpoints (Alternative to Annotations)
Spring WebFlux supports a functional programming model for defining routes and handlers:
// GET and POST are static imports from RequestPredicates.
@Bean
public RouterFunction<ServerResponse> route(UserHandler handler) {
    return RouterFunctions.route(GET("/users/{id}"), handler::getUserById)
        .andRoute(POST("/users"), handler::createUser);
}
@Component
public class UserHandler {
    private final UserRepository repo;

    public UserHandler(UserRepository repo) {
        this.repo = repo;
    }

    public Mono<ServerResponse> getUserById(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        return repo.findById(id)
            .flatMap(user -> ServerResponse.ok().bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> createUser(ServerRequest request) {
        Mono<User> userMono = request.bodyToMono(User.class);
        return userMono.flatMap(user -> repo.save(user))
            .flatMap(saved -> ServerResponse.ok().bodyValue(saved));
    }
}
Functional endpoints keep routing and request handling in plain Java code, which makes them lightweight and easy to test without starting a server. This suits small APIs and microservices.
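As a rough illustration of the testability point, a router function can be bound directly to WebTestClient, with no running server. This sketch assumes spring-webflux and spring-test are on the classpath; the inline handler and the "Alice" response are hypothetical placeholders for the UserHandler above.

```java
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;

import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

class RouterSketch {
    // Hypothetical route: only user "1" exists, everything else is 404.
    static RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route(GET("/users/{id}"),
                request -> "1".equals(request.pathVariable("id"))
                        ? ServerResponse.ok().bodyValue("Alice")
                        : ServerResponse.notFound().build());
    }

    public static void main(String[] args) {
        // Binds the client to the router function through a mock request/response exchange.
        WebTestClient client = WebTestClient.bindToRouterFunction(routes()).build();

        client.get().uri("/users/1")
                .exchange()
                .expectStatus().isOk()
                .expectBody(String.class).isEqualTo("Alice");

        client.get().uri("/users/2")
                .exchange()
                .expectStatus().isNotFound();

        System.out.println("routes behave as expected");
    }
}
```

Each expectation throws an AssertionError on mismatch, so the same calls can be moved into a JUnit test method unchanged.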
Summary
By mastering these advanced concepts, you can build truly scalable, resilient reactive apps with Spring WebFlux:
- Combining streams elegantly with flatMap and zip.
- Handling errors gracefully to improve robustness.
- Controlling backpressure to maintain system stability.
- Integrating with external reactive APIs via WebClient.
- Choosing between annotation-based and functional routing.
Ready to take your Spring WebFlux skills further? Let me know if you’d like a deep dive into reactive security, testing reactive streams, or integrating with Kafka and messaging systems next!