Building Advanced Reactive Web Apps with Spring WebFlux – Part II

Date: November 18, 2023


1. Combining Multiple Reactive Streams: zip and flatMap

In real apps, data often comes from multiple reactive sources that you need to combine.

Example: Combine User Info and User Orders

Suppose you have two reactive services:

Mono<User> getUserById(Long id);
Flux<Order> getOrdersForUser(Long userId);

You want to create a combined DTO:

public class UserOrders {
    private User user;
    private List<Order> orders;

    // constructors, getters, setters
}

Reactive Combination Using zip and flatMap

public Mono<UserOrders> getUserOrders(Long userId) {
    Mono<User> userMono = getUserById(userId);
    Flux<Order> ordersFlux = getOrdersForUser(userId);

    return userMono.flatMap(user ->
        ordersFlux.collectList()
                  .map(orders -> new UserOrders(user, orders))
    );
}
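  • flatMap waits for the user to arrive and only then subscribes to the orders stream, so the two calls run one after the other.
  • collectList() aggregates the Flux of orders into a List<Order>, buffering all of them in memory until the Flux completes.
  • Finally, a new UserOrders object is returned wrapped in a Mono. If no user is found (an empty Mono), the result is empty too.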

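Alternatively, collect the orders into a Mono<List<Order>> and combine the two Monos with Mono.zip. Unlike the flatMap version, zip subscribes to both sources at once, so the two calls can run concurrently. Note that zip completes empty if either source is empty: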

Mono<User> userMono = getUserById(userId);
Mono<List<Order>> ordersMono = getOrdersForUser(userId).collectList();

return Mono.zip(userMono, ordersMono)
           .map(tuple -> new UserOrders(tuple.getT1(), tuple.getT2()));
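
To make the difference between the two approaches concrete, here is a minimal, self-contained sketch that times both. The stub services, the 200 ms delays, and the record types are assumptions made for illustration; they stand in for the article's User, Order, and UserOrders classes and for real remote calls.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ZipVersusFlatMap {
    // Hypothetical stand-ins for the article's User, Order and UserOrders types.
    record User(Long id, String name) {}
    record Order(Long id, Long userId) {}
    record UserOrders(User user, List<Order> orders) {}

    // Stub service: answers after 200 ms to imitate a remote call.
    static Mono<User> getUserById(Long id) {
        return Mono.delay(Duration.ofMillis(200)).map(tick -> new User(id, "Alice"));
    }

    // Stub service: starts emitting two orders after 200 ms.
    static Flux<Order> getOrdersForUser(Long userId) {
        return Flux.just(new Order(1L, userId), new Order(2L, userId))
                   .delaySubscription(Duration.ofMillis(200));
    }

    // Sequential: the orders call is subscribed only after the user arrives.
    static Mono<UserOrders> sequential(Long userId) {
        return getUserById(userId).flatMap(user ->
                getOrdersForUser(userId).collectList()
                        .map(orders -> new UserOrders(user, orders)));
    }

    // Concurrent: zip subscribes to both sources at once.
    static Mono<UserOrders> zipped(Long userId) {
        return Mono.zip(getUserById(userId), getOrdersForUser(userId).collectList())
                   .map(tuple -> new UserOrders(tuple.getT1(), tuple.getT2()));
    }

    public static void main(String[] args) {
        // elapsed() pairs the result with the milliseconds since subscription.
        System.out.println("sequential ms: " + sequential(42L).elapsed().block().getT1());
        System.out.println("zipped ms:     " + zipped(42L).elapsed().block().getT1());
    }
}
```

With these stubs, the sequential version should take roughly the sum of the two delays and the zipped version roughly the longer of the two. Prefer flatMap when the second call needs data from the first; prefer zip when the calls are independent.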

2. Advanced Error Handling Strategies

Graceful Fallbacks with onErrorResume

Instead of failing the whole stream, fall back to a default value or an alternative flow:

@GetMapping("/user/{id}")
public Mono<User> getUserSafe(@PathVariable Long id) {
    return getUserById(id)
        .onErrorResume(e -> {
            // Log the error and return default user
            log.error("User fetch failed", e);
            return Mono.just(new User(-1L, "Guest User"));
        });
}

Conditional Error Handling

You can filter on exception types:

.onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
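
The typed overload only handles the exception class you name; every other error keeps propagating. The sketch below shows that behaviour in isolation, together with onErrorMap for translating a second error type. The two exception classes and the lookup method are hypothetical and exist only for this illustration.

```java
import reactor.core.publisher.Mono;

final class ConditionalFallback {
    // Hypothetical exception types standing in for real client errors.
    static class UserNotFoundException extends RuntimeException {}
    static class ServiceDownException extends RuntimeException {}

    static Mono<String> lookup(Mono<String> source) {
        return source
            // "Not found" is treated as a normal outcome: complete empty.
            .onErrorResume(UserNotFoundException.class, e -> Mono.empty())
            // An outage is translated into a different exception, not swallowed.
            .onErrorMap(ServiceDownException.class,
                        e -> new IllegalStateException("user service unavailable", e));
    }

    public static void main(String[] args) {
        // block() returns null for an empty Mono.
        System.out.println(lookup(Mono.error(new UserNotFoundException())).block());
        try {
            lookup(Mono.error(new ServiceDownException())).block();
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

Ordering matters: each handler only sees errors that the operators above it let through, so place the most specific handlers first and any catch-all onErrorResume last.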

3. Controlling Backpressure

Backpressure lets a slow consumer signal how much data it can handle, so a fast producer cannot overwhelm it.

For example, when consuming a large stream of events:

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> streamEvents() {
    return eventService.getEvents()
                       // Hold up to 100 events; once the buffer is full, log and drop each new arrival
                       .onBackpressureBuffer(100,
                               event -> log.warn("Dropped event: " + event),
                               BufferOverflowStrategy.DROP_LATEST);
}

Pick one overflow strategy per stream. You can:

  • Buffer a limited number of events (onBackpressureBuffer)
  • Drop events when overwhelmed (onBackpressureDrop)
  • Switch to latest values only (onBackpressureLatest)
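
To see a bounded buffer at work without a web server, the following sketch pairs a fast synchronous producer with a subscriber that requests only 10 items. The class name, the sizes, and the choice of BufferOverflowStrategy.DROP_LATEST are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;

final class BoundedBufferDemo {
    final List<Integer> received = new ArrayList<>();
    final AtomicInteger dropped = new AtomicInteger();

    // A subscriber that asks for only 10 items up front, imitating a slow consumer.
    final BaseSubscriber<Integer> slowConsumer = new BaseSubscriber<>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(10);
        }

        @Override
        protected void hookOnNext(Integer value) {
            received.add(value);
        }
    };

    void run() {
        Flux.range(1, 1_000)
            // Keep at most 100 undelivered items; count every item dropped on overflow.
            .onBackpressureBuffer(100, value -> dropped.incrementAndGet(),
                                  BufferOverflowStrategy.DROP_LATEST)
            .subscribe(slowConsumer);
    }

    public static void main(String[] args) {
        BoundedBufferDemo demo = new BoundedBufferDemo();
        demo.run();
        System.out.println("received so far: " + demo.received.size());
        System.out.println("dropped:         " + demo.dropped.get());
    }
}
```

The consumer gets exactly what it requested, the buffer absorbs a bounded surplus, and the overflow callback observes everything else. Swapping in DROP_OLDEST keeps the newest items instead, and ERROR terminates the stream on overflow.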

4. Integrating with External Reactive Services

Spring WebFlux pairs naturally with WebClient, its non-blocking HTTP client.

Example: Calling an External API Reactively

@Autowired
private WebClient webClient;

public Mono<Weather> getWeather(String city) {
    return webClient.get()
                    .uri("https://api.weather.com/{city}", city)
                    .retrieve()
                    .bodyToMono(Weather.class)
                    .timeout(Duration.ofSeconds(2))  // Timeout if slow
                    .retry(3)                        // Retry up to 3 times
                    .onErrorResume(e -> Mono.just(new Weather("Unavailable", 0)));
}
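  • timeout() fails an attempt with a TimeoutException if no response arrives within 2 seconds.
  • retry(3) resubscribes up to 3 times after any error, including timeouts and 4xx/5xx responses, so the call may be attempted 4 times in total.
  • onErrorResume() supplies fallback data once the retries are exhausted.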
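
Because a plain retry(3) repeats the call after every kind of error, it is often worth retrying only failures that are likely to be transient and spacing the attempts out. The sketch below is one way to do that with Reactor's retryWhen and Retry.backoff. The helper name resilient, the 100 ms initial backoff, and the decision to treat only TimeoutException as transient are assumptions made for this illustration.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

final class SelectiveRetry {
    // Per-attempt timeout, exponential backoff for timeouts only, then a fallback value.
    static <T> Mono<T> resilient(Mono<T> call, T fallback) {
        return call
            .timeout(Duration.ofSeconds(2))
            .retryWhen(Retry.backoff(3, Duration.ofMillis(100))
                            // Assumption: only timeouts count as transient here.
                            .filter(error -> error instanceof TimeoutException))
            .onErrorResume(error -> Mono.just(fallback));
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // A call that always fails with a non-transient error.
        Mono<String> alwaysBadRequest = Mono.defer(() -> {
            attempts.incrementAndGet();
            return Mono.<String>error(new IllegalArgumentException("bad request"));
        });
        String result = resilient(alwaysBadRequest, "fallback").block();
        System.out.println(result + " after " + attempts.get() + " attempt(s)");
    }
}
```

With WebClient, the filter could also accept server-side failures, for example by checking whether the error is a WebClientResponseException with a 5xx status, while still letting 4xx responses fall straight through to the fallback.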

5. Functional Endpoints (Alternative to Annotations)

Spring WebFlux supports a functional programming model for defining routes and handlers:

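// GET and POST are static imports from
// org.springframework.web.reactive.function.server.RequestPredicates
@Bean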
public RouterFunction<ServerResponse> route(UserHandler handler) {
    return RouterFunctions.route(GET("/users/{id}"), handler::getUserById)
                          .andRoute(POST("/users"), handler::createUser);
}

@Component
public class UserHandler {
    private final UserRepository repo;

    public UserHandler(UserRepository repo) {
        this.repo = repo;
    }

    public Mono<ServerResponse> getUserById(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        return repo.findById(id)
                   .flatMap(user -> ServerResponse.ok().bodyValue(user))
                   .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> createUser(ServerRequest request) {
        Mono<User> userMono = request.bodyToMono(User.class);
        return userMono.flatMap(user -> repo.save(user))
                       .flatMap(saved -> ServerResponse.ok().bodyValue(saved));
    }
}

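Functional endpoints keep routing and request handling in plain Java objects, which makes them lightweight and easy to test without starting a server. That suits small APIs and microservices particularly well.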
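
As a rough illustration of that testability, a router function can be exercised directly with WebTestClient, with no running server. This sketch assumes spring-webflux and spring-test are on the classpath; the in-memory route and the class name are hypothetical and replace the article's UserHandler and UserRepository to keep the example self-contained.

```java
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;

import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

final class RouterFunctionSmokeTest {
    // Hypothetical in-memory route: every id other than "1" is treated as missing.
    static RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route(GET("/users/{id}"), request ->
                "1".equals(request.pathVariable("id"))
                        ? ServerResponse.ok().bodyValue("Alice")
                        : ServerResponse.notFound().build());
    }

    public static void main(String[] args) {
        // Bind the client straight to the router function; no HTTP server is started.
        WebTestClient client = WebTestClient.bindToRouterFunction(routes()).build();

        client.get().uri("/users/1").exchange()
              .expectStatus().isOk()
              .expectBody(String.class).isEqualTo("Alice");

        client.get().uri("/users/2").exchange()
              .expectStatus().isNotFound();
    }
}
```

Each expectation throws an AssertionError on mismatch, so the same calls drop into a JUnit test method unchanged.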


Summary

With these techniques you can build scalable, resilient reactive apps with Spring WebFlux:

  • Combining streams elegantly with flatMap and zip.
  • Handling errors gracefully to improve robustness.
  • Controlling backpressure to maintain system stability.
  • Integrating with external reactive APIs via WebClient.
  • Choosing between annotation-based and functional routing.

Ready to take your Spring WebFlux skills further? Let me know if you’d like a deep dive into reactive security, testing reactive streams, or integrating with Kafka and messaging systems next!
