Chapter 5. Developing reactive applications using Spring Boot with Eclipse Vert.x
This section provides an introduction to developing reactive applications using Spring Boot starters based on Eclipse Vert.x. The following examples demonstrate how you can use the starters to create reactive applications.
5.1. Introduction to Spring Boot with Eclipse Vert.x
The Spring reactive stack is built on Project Reactor, a reactive library that implements backpressure and is compliant with the Reactive Streams specification. Reactor provides the Flux and Mono functional API types that enable asynchronous event stream processing.
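The following minimal sketch (not part of the product examples; the class name is illustrative) shows how Mono represents at most one asynchronous value and Flux represents a stream of values, with nothing emitted until a subscriber requests data:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorSketch {

    public static void main(String[] args) {
        // A Mono completes with at most one value.
        Mono<String> greeting = Mono.just("Hello")
                .map(value -> value + ", Reactor!");

        // A Flux emits zero or more values and supports backpressure.
        Flux<Integer> squares = Flux.range(1, 5)
                .map(i -> i * i);

        // Nothing happens until a subscriber requests data.
        greeting.subscribe(System.out::println);
        squares.subscribe(System.out::println);
    }
}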
On top of Project Reactor, Spring provides WebFlux, an asynchronous event-driven web application framework. While WebFlux is designed to work primarily with Reactor Netty, it can also operate with other reactive HTTP servers, such as Eclipse Vert.x.
Spring WebFlux and Reactor enable you to create applications that are:
- Non-blocking: The application continues to handle further requests while waiting for a response from a remote component or service that is required to complete the current request.
- Asynchronous: The application responds to events from an event stream by generating response events and publishing them back to the event stream, where they can be picked up by other clients in the application.
- Event-driven: The application responds to events generated by the user or by another service, such as mouse clicks, HTTP requests, or new files being added to storage.
- Scalable: Increasing the number of publishers or subscribers to match the required event-processing capacity of an application results in only a slight increase in the complexity of routing requests between individual clients in the application. Reactive applications can handle large numbers of events using fewer computing and networking resources compared to other application programming models.
- Resilient: The application can handle failure of services it depends on without a negative impact on its overall quality of service.
Additional advantages of using Spring WebFlux include:
- Similarity with Spring MVC
- The Spring MVC API types and WebFlux API types are similar, and it is easy for developers to apply their knowledge of Spring MVC to programming applications with WebFlux.
The Spring Reactive offering by Red Hat brings the benefits of Reactor and WebFlux to OpenShift and stand-alone RHEL, and introduces a set of Eclipse Vert.x extensions for the WebFlux framework. This allows you to retain the level of abstraction and rapid prototyping capabilities of Spring Boot, and provides an asynchronous I/O API that handles the network communication between the services in your application in a fully reactive manner.
- Annotated controller support
- WebFlux retains the endpoint controller annotations introduced by Spring MVC (both Spring MVC and WebFlux support reactive RxJava 2 and Reactor return types).
- Functional programming support
- Reactor interacts with the Java 8 functional API, as well as the CompletableFuture and Stream APIs. In addition to annotation-based endpoints, WebFlux also supports functional endpoints.
Additional resources
See the following resources for additional in-depth information on the implementation details of technologies that are part of the Spring Reactive stack:
- The Reactive Manifesto
- Reactive Streams specification
- Spring Framework reference documentation: Web Applications on Reactive Stack
- Reactor Netty documentation
- API Reference page for the Mono class in the Project Reactor documentation
- API Reference page for the Flux class in the Project Reactor documentation
5.2. Reactive Spring Web
The spring-web module provides the foundational elements of the reactive capabilities of Spring WebFlux, including:
- HTTP abstractions provided by the HttpHandler API (see the sketch after this list)
- Reactive Streams adapters for supported servers (Eclipse Vert.x, Undertow, and others)
- Codecs for encoding and decoding event stream data. This includes:
  - DataBuffer, an abstraction for different types of byte buffer representations (Netty ByteBuf, java.nio.ByteBuffer, as well as others)
  - Low-level contracts to encode and decode content independent of HTTP
  - HttpMessageReader and HttpMessageWriter contracts to encode and decode HTTP message content
- The WebHandler API (a counterpart to the Servlet 3.1 I/O API that uses non-blocking contracts).
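For illustration only, the following sketch (not part of the product examples) shows the HttpHandler contract mentioned in the list above. A typical WebFlux application rarely implements HttpHandler directly, because WebFlux builds the higher-level annotation and router abstractions on top of it:
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.core.publisher.Mono;

public class PlainTextHttpHandler implements HttpHandler {

    @Override
    public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
        // Wrap the payload in a DataBuffer, the byte buffer abstraction used by spring-web.
        DataBuffer buffer = response.bufferFactory()
                .wrap("Hello from HttpHandler".getBytes(StandardCharsets.UTF_8));

        // Complete the exchange by writing a single buffer to the response.
        return response.writeWith(Mono.just(buffer));
    }
}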
When designing your web application, you can choose between two programming models that Spring WebFlux provides:
- Annotated Controllers
- Annotated controllers in Spring WebFlux are consistent with Spring MVC, and are based on the same annotations from the spring-web module. In addition to the spring-web module from Spring MVC, its WebFlux counterpart also supports reactive @RequestBody arguments.
- Functional Endpoints
- Functional endpoints provided by Spring WebFlux are based on Java 8 lambda expressions and functional APIs. This programming model relies on a dedicated library (Reactor, in this case) to route and handle requests. As opposed to annotation-based endpoint controllers, which rely on declaring intent and using callbacks to complete an activity, the model based on functional endpoints allows request handling to be fully controlled by the application. Both models are illustrated in the sketch that follows this list.
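The following sketch contrasts the two models using the same APIs as the examples in Sections 5.3 and 5.4; the class names and paths are illustrative and are not part of the product examples:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import static org.springframework.web.reactive.function.BodyInserters.fromObject;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

// Annotated controller: intent is declared through annotations and resolved by the framework.
@RestController
class AnnotatedGreetingController {

    @GetMapping("/greeting-annotated")
    public Mono<String> greet() {
        return Mono.just("Hello from an annotated controller");
    }
}

// Functional endpoint: the application itself routes the request to a handler function.
@Configuration
class GreetingRouterConfiguration {

    @Bean
    public RouterFunction<ServerResponse> greetingRouter() {
        return route()
            .GET("/greeting-functional", request -> ok().body(fromObject("Hello from a functional endpoint")))
            .build();
    }
}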
5.3. Creating a reactive Spring Boot HTTP service with WebFlux
Create a basic reactive Hello World HTTP web service using Spring Boot and WebFlux.
Prerequisites
- JDK 8 or JDK 11 installed
- Maven installed
- A Maven-based application project configured to use Spring Boot
Procedure
Add vertx-spring-boot-starter-http as a dependency in the pom.xml file of your project.
pom.xml
<project>
  ...
  <dependencies>
    ...
    <dependency>
      <groupId>dev.snowdrop</groupId>
      <artifactId>vertx-spring-boot-starter-http</artifactId>
    </dependency>
    ...
  </dependencies>
  ...
</project>
Create a main class for your application and define the router and handler methods.
HttpSampleApplication.java
package dev.snowdrop.vertx.sample.http; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Mono; import static org.springframework.web.reactive.function.BodyInserters.fromObject; import static org.springframework.web.reactive.function.server.RouterFunctions.route; import static org.springframework.web.reactive.function.server.ServerResponse.ok; @SpringBootApplication public class HttpSampleApplication { public static void main(String[] args) { SpringApplication.run(HttpSampleApplication.class, args); } @Bean public RouterFunction<ServerResponse> helloRouter() { return route() .GET("/hello", this::helloHandler) .build(); } private Mono<ServerResponse> helloHandler(ServerRequest request) { String name = request .queryParam("name") .orElse("World"); String message = String.format("Hello, %s!", name); return ok() .body(fromObject(message)); } }
OPTIONAL: Run and test your application locally:
Navigate to the root directory of your Maven project:
$ cd myApp
Package your application:
$ mvn clean package
Start your application from the command line:
$ java -jar target/vertx-spring-boot-sample-http.jar
In a new terminal window, issue an HTTP request on the /hello endpoint:
$ curl localhost:8080/hello
Hello, World!
Provide a custom name with your request to get a personalized response:
$ curl http://localhost:8080/hello?name=John
Hello, John!
Additional resources
- You can deploy your application to an OpenShift cluster using Fabric8 Maven Plugin.
- You can also configure your application for deployment on stand-alone Red Hat Enterprise Linux.
- For more detail on creating reactive web services with Spring Boot, see the reactive REST service development guide in the Spring community documentation.
5.4. Using basic authentication in a reactive Spring Boot WebFlux application
Create a reactive Hello World HTTP web service with basic form-based authentication using Spring Security and WebFlux starters.
Prerequisites
- JDK 8 or JDK 11 installed
- Maven installed
- A Maven-based application project configured to use Spring Boot
Procedure
Add vertx-spring-boot-starter-http and spring-boot-starter-security as dependencies in the pom.xml file of your project.
pom.xml
<project>
  ...
  <dependencies>
    ...
    <dependency>
      <groupId>dev.snowdrop</groupId>
      <artifactId>vertx-spring-boot-starter-http</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-security</artifactId>
    </dependency>
    ...
  </dependencies>
  ...
</project>
Create an endpoint controller class for your application:
HelloController.java
package dev.snowdrop.vertx.sample.http.security; import java.security.Principal; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; @RestController public class HelloController { @GetMapping("/") public Mono<String> hello(Mono<Principal> principal) { return principal .map(Principal::getName) .map(this::helloMessage); } private String helloMessage(String username) { return "Hello, " + username + "!"; } }
Create the main class of your application:
HttpSecuritySampleApplication.java
package dev.snowdrop.vertx.sample.http.security; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class HttpSecuritySampleApplication { public static void main(String[] args) { SpringApplication.run(HttpSecuritySampleApplication.class, args); } }
Create a SecurityConfiguration class that stores the user credentials for accessing the / endpoint.
SecurityConfiguration.java
package dev.snowdrop.vertx.sample.http.security; import org.springframework.context.annotation.Bean; import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity; import org.springframework.security.core.userdetails.MapReactiveUserDetailsService; import org.springframework.security.core.userdetails.User; import org.springframework.security.core.userdetails.UserDetails; @EnableWebFluxSecurity public class SecurityConfiguration { @Bean public MapReactiveUserDetailsService userDetailsService() { UserDetails user = User.withDefaultPasswordEncoder() .username("user") .password("user") .roles("USER") .build(); return new MapReactiveUserDetailsService(user); } }
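The configuration above relies on the default Spring Security WebFlux filter chain, which enables form login and HTTP basic authentication. If you need to customize which exchanges require authentication, you can declare a SecurityWebFilterChain bean. The following is a minimal sketch, not part of the example; it reproduces the default behavior explicitly and can be added to the SecurityConfiguration class above instead of a separate class:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;

@Configuration
public class SecurityFilterChainConfiguration {

    @Bean
    public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
        return http
            .authorizeExchange()
                .anyExchange().authenticated() // every exchange requires an authenticated user
                .and()
            .httpBasic()
                .and()                         // keep HTTP basic authentication enabled
            .formLogin()
                .and()                         // keep the default login form enabled
            .build();
    }
}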
OPTIONAL: Run and test your application locally:
Navigate to the root directory of your Maven project:
$ cd myApp
Package your application:
$ mvn clean package
Start your application from the command line:
$ java -jar target/vertx-spring-boot-sample-http-security.jar
- Navigate to http://localhost:8080 using a browser to access the login screen. Log in using the credentials below:
- username: user
- password: user
You receive a customized greeting when you are logged in:
Hello, user!
- Navigate to http://localhost:8080/logout using a web browser and use the Log out button to log out of your application.
Alternatively, use a terminal to make an unauthenticated HTTP request on localhost:8080. You receive an HTTP 401 Unauthorized response from your application.
$ curl -I http://localhost:8080
HTTP/1.1 401 Unauthorized
WWW-Authenticate: Basic realm="Realm"
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
Pragma: no-cache
Expires: 0
X-Content-Type-Options: nosniff
X-Frame-Options: DENY
X-XSS-Protection: 1 ; mode=block
Referrer-Policy: no-referrer
Issue an authenticated request using the example user credentials. You receive a personalized response.
$ curl -u user:user http://localhost:8080
Hello, user!
Additional resources
- You can deploy your application to an OpenShift cluster using Fabric8 Maven Plugin.
- You can also configure your application for deployment on stand-alone Red Hat Enterprise Linux.
- For the full specification of the Basic HTTP authentication scheme, see document RFC-7617.
- For the full specification of HTTP authentication extensions for interactive clients, including form-based authentication, see document RFC-8053.
5.5. Using OAuth2 authentication in a reactive Spring Boot application
Set up OAuth2 authentication for your reactive Spring Boot application and authenticate using your client ID and client secret.
Prerequisites
- JDK 8 or JDK 11 installed
- Maven installed
- A Maven-based application project configured to use Spring Boot
- A GitHub account
Procedure
Register a new OAuth2 application on your GitHub account. Ensure that you provide the following values in the registration form:
- Homepage URL: http://localhost:8080
- Authorization callback URL: http://localhost:8080/login/oauth2/code/github
Ensure that you save the client ID and the client secret that you receive upon completing the registration.
Add the following dependencies to the pom.xml file of your project:
- vertx-spring-boot-starter-http
- spring-boot-starter-security
- spring-boot-starter-oauth2-client
- reactor-netty
Note that the reactor-netty client is required to ensure that spring-boot-starter-oauth2-client works properly.
pom.xml
<project>
  ...
  <dependencies>
    ...
    <dependency>
      <groupId>dev.snowdrop</groupId>
      <artifactId>vertx-spring-boot-starter-http</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-security</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-oauth2-client</artifactId>
    </dependency>
    <!-- Spring OAuth2 client only works with the Reactor Netty client -->
    <dependency>
      <groupId>io.projectreactor.netty</groupId>
      <artifactId>reactor-netty</artifactId>
    </dependency>
    ...
  </dependencies>
  ...
</project>
Create an endpoint controller class for your application:
HelloController.java
package dev.snowdrop.vertx.sample.http.oauth; import org.springframework.security.core.annotation.AuthenticationPrincipal; import org.springframework.security.oauth2.core.user.OAuth2User; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; @RestController public class HelloController { @GetMapping public Mono<String> hello(@AuthenticationPrincipal OAuth2User oauth2User) { return Mono.just("Hello, " + oauth2User.getAttributes().get("name") + "!"); } }
Create the main class of your application:
OAuthSampleApplication.java
package dev.snowdrop.vertx.sample.http.oauth; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class OAuthSampleApplication { public static void main(String[] args) { SpringApplication.run(OAuthSampleApplication.class, args); } }
Create a YAML configuration file to store the OAuth2 client ID and client secret you received from GitHub upon registering your application.
src/main/resources/application.yml
spring:
  security:
    oauth2:
      client:
        registration:
          github:
            client-id: YOUR_GITHUB_CLIENT_ID
            client-secret: YOUR_GITHUB_CLIENT_SECRET
OPTIONAL: Run and test your application locally:
Navigate to the root directory of your Maven project:
$ cd myApp
Package your application:
$ mvn clean package
Start your application from the command line:
$ java -jar target/vertx-spring-boot-sample-http-oauth.jar
- Navigate to http://localhost:8080 using a web browser. You are redirected to an OAuth2 application authorization screen on GitHub. If prompted, log in using your GitHub account credentials.
- Click Authorize to confirm. You are redirected to a screen showing a personalized greeting message.
Additional resources
- You can deploy your application to an OpenShift cluster using Fabric8 Maven Plugin.
- You can also configure your application for deployment on stand-alone Red Hat Enterprise Linux.
- For more information, see the OAuth2 tutorial in the Spring community documentation. Alternatively, see the tutorial on using OAuth2 with Spring Security.
- For the full OAuth2 authentication framework specification, see document RFC-6749.
5.6. Creating a reactive Spring Boot SMTP mail application
Create a reactive SMTP email service with Spring Boot and Eclipse Vert.x.
Prerequisites
- JDK 8 or JDK 11 installed
- Maven installed
- A Maven-based application project configured to use Spring Boot
- An SMTP mail server configured on your machine
Procedure
Add vertx-spring-boot-starter-http and vertx-spring-boot-starter-mail as dependencies in the pom.xml file of your project.
pom.xml
<project>
  ...
  <dependencies>
    ...
    <dependency>
      <groupId>dev.snowdrop</groupId>
      <artifactId>vertx-spring-boot-starter-http</artifactId>
    </dependency>
    <dependency>
      <groupId>dev.snowdrop</groupId>
      <artifactId>vertx-spring-boot-starter-mail</artifactId>
    </dependency>
    ...
  </dependencies>
  ...
</project>
Create a mail handler class for your application:
MailHandler.java
package dev.snowdrop.vertx.sample.mail; import dev.snowdrop.vertx.mail.MailClient; import dev.snowdrop.vertx.mail.MailMessage; import dev.snowdrop.vertx.mail.SimpleMailMessage; import org.springframework.stereotype.Component; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Mono; import static org.springframework.web.reactive.function.server.ServerResponse.noContent; @Component public class MailHandler { private final MailClient mailClient; public MailHandler(MailClient mailClient) { this.mailClient = mailClient; } public Mono<ServerResponse> send(ServerRequest request) { return request.formData() .log() .map(this::formToMessage) .flatMap(mailClient::send) .flatMap(result -> noContent().build()); } private MailMessage formToMessage(MultiValueMap<String, String> form) { return new SimpleMailMessage() .setFrom(form.getFirst("from")) .setTo(form.get("to")) .setSubject(form.getFirst("subject")) .setText(form.getFirst("text")); } }
Create the main class of your application:
MailSampleApplication.java
package dev.snowdrop.vertx.sample.mail; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.core.io.ClassPathResource; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.ServerResponse; import static org.springframework.http.MediaType.APPLICATION_FORM_URLENCODED; import static org.springframework.web.reactive.function.server.RequestPredicates.accept; import static org.springframework.web.reactive.function.server.RouterFunctions.resources; import static org.springframework.web.reactive.function.server.RouterFunctions.route; @SpringBootApplication public class MailSampleApplication { public static void main(String[] args) { SpringApplication.run(MailSampleApplication.class, args); } @Bean public RouterFunction<ServerResponse> mailRouter(MailHandler mailHandler) { return route() .POST("/mail", accept(APPLICATION_FORM_URLENCODED), mailHandler::send) .build(); } @Bean public RouterFunction<ServerResponse> staticResourceRouter() { return resources("/**", new ClassPathResource("static/")); } }
Create an application.properties file to store your SMTP server credentials:
application.properties
vertx.mail.host=YOUR_SMTP_SERVER_HOSTNAME
vertx.mail.username=YOUR_SMTP_SERVER_USERNAME
vertx.mail.password=YOUR_SMTP_SERVER_PASSWORD
Create a src/main/resources/static/index.html file that serves as the front end of your application. Alternatively, use the example HTML email form available for this procedure.
OPTIONAL: Run and test your application locally:
Navigate to the root directory of your Maven project:
$ cd myApp
Package your application:
$ mvn clean package
Start your application from the command line.
$ java -jar target/vertx-spring-boot-sample-mail.jar
Navigate to http://localhost:8080/index.html using a web browser to access the email form.
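You can also exercise the /mail endpoint directly from a terminal by posting URL-encoded form data. The field names match the ones read by MailHandler; the addresses below are placeholders, and the 204 No Content response assumes the message was accepted by your SMTP server:
$ curl -i \
    -d 'from=sender@example.com' \
    -d 'to=recipient@example.com' \
    -d 'subject=Test message' \
    -d 'text=Hello from the reactive mail example' \
    http://localhost:8080/mail
HTTP/1.1 204 No Content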
Additional resources
- For more information on setting up an SMTP mail server on RHEL 7, see the Mail Transport Agent Configuration section in the RHEL 7 documentation.
- You can deploy your application to an OpenShift cluster using Fabric8 Maven Plugin
- You can also configure your application for deployment on stand-alone Red Hat Enterprise Linux.
5.7. Server-sent events
Server-sent events (SSE) is a push technology that allows an HTTP server to send unidirectional updates to the client. SSE works by establishing a connection between the event source and the client. The event source uses this connection to push events to the client side. After the server pushes the events, the connection remains open and can be used to push subsequent events. When the client terminates the request on the server, the connection is closed. SSE is a more resource-efficient alternative to polling, where a new connection must be established each time the client polls the event source for updates. As opposed to WebSockets, SSE pushes events in one direction only, that is, from the source to the client. It does not handle bidirectional communication between the event source and the client.
The specification for SSE is incorporated into HTML5, and is widely supported by web browsers, including their legacy versions. SSE can be used from the command line, and is relatively simple to set up compared to other protocols.
SSE is suitable for use cases that require frequent updates from the server to the client, while updates from the client to the server are expected to be less frequent. Updates from the client to the server can then be handled over a different protocol, such as REST. Examples of such use cases include social media feed updates or notifications sent to a client when new files are uploaded to a file server.
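On the wire, an SSE response is a long-lived HTTP response with the text/event-stream content type, in which each event is one or more data: lines followed by a blank line. The following is an illustrative exchange, not output from the example application:
HTTP/1.1 200 OK
Content-Type: text/event-stream;charset=UTF-8

data:1404187823

data:-666543077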
5.8. Using Server-sent events in a reactive Spring Boot application
Create a simple service that accepts HTTP requests and returns a stream of server-sent events (SSE). When the client establishes a connection to the server and the streaming starts, the connection remains open. The server reuses the connection to continuously push new events to the client. Canceling the request closes the connection and stops the stream, causing the client to stop receiving updates from the server.
Prerequisites
- JDK 8 or JDK 11 installed
- Maven installed
- A Maven-based application project configured to use Spring Boot
Procedure
Add vertx-spring-boot-starter-http as a dependency in the pom.xml file of your project.
pom.xml
<project>
  ...
  <dependencies>
    ...
    <dependency>
      <groupId>dev.snowdrop</groupId>
      <artifactId>vertx-spring-boot-starter-http</artifactId>
    </dependency>
    ...
  </dependencies>
  ...
</project>
Create the main class of your application:
SseSampleApplication.java
package dev.snowdrop.vertx.sample.sse; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SseSampleApplication { public static void main(String[] args) { SpringApplication.run(SseSampleApplication.class, args); } }
Create a Server-sent Event controller class for your application. In this example, the class generates a stream of random integers and prints them to a terminal application.
SseController.java
package dev.snowdrop.vertx.sample.sse; import java.time.Duration; import java.util.Random; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; @RestController public class SseController { @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<Integer> getRandomNumberStream() { Random random = new Random(); return Flux.interval(Duration.ofSeconds(1)) .map(i -> random.nextInt()) .log(); } }
OPTIONAL: Run and test your application locally:
Navigate to the root directory of your Maven project:
$ cd myApp
Package your application:
$ mvn clean package
Start your application from the command line:
$ java -jar target/vertx-spring-boot-sample-sse.jar
In a new terminal window, issue an HTTP request to localhost. You start receiving a continuous stream of random integers from the server-sent event controller:
$ curl localhost:8080
data:-2126721954
data:-573499422
data:1404187823
data:1338766210
data:-666543077
...
Press Ctrl+C to cancel your HTTP request and terminate the stream of responses.
Additional resources
- You can deploy your application to an OpenShift cluster using Fabric8 Maven Plugin.
- You can also configure your application for deployment on stand-alone Red Hat Enterprise Linux.
5.9. WebSocket Protocol
The WebSocket protocol upgrades a standard HTTP connection to make it persistent, and subsequently uses that connection to pass specially formatted messages between the client and the server of your application. While the protocol relies on an HTTP-like handshake to establish the initial connection between client and server over TCP, it uses a special message format for communication between client and server.
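For illustration, the handshake is an ordinary HTTP GET request that asks the server to upgrade the connection, answered with a 101 Switching Protocols response. The header values below are illustrative; the /echo-upper path matches the example in Section 5.10:
GET /echo-upper HTTP/1.1
Host: localhost:8080
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=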
Unlike a standard HTTP connection, a WebSocket connection:
- can be used to send messages in both directions
- remains open after the initial request is completed,
- uses special framing headers in messages, which allows you to send non-HTTP-formatted message payloads (for example control data) inside an HTTP request.
As a result, the WebSocket protocol extends the possibilities of a standard HTTP connection while requiring fewer networking resources and decreasing the risk of services failing due to network timeouts (compared to alternative methods of providing real-time messaging functionality, such as HTTP long polling).
WebSocket connections are supported by default in most currently available web browsers across different operating systems and hardware architectures, which makes WebSockets a suitable choice for writing cross-platform web-based applications that you can connect to using only a web browser.
5.10. Using WebSockets in a reactive application based on WebFlux
The following example demonstrates how you can use the WebSocket protocol in an application that provides a backend service that you can connect to using a web browser. When you access the web front end URL of your application using a web browser, the front-end initiates a WebSocket connection to a backend service. You can use the web form available on the website to send values formatted as text strings to the back-end service using the WebSocket connection. The application processes the received value by converting all characters to uppercase and sends the result to the front end using the same WebSocket connection.
Create an application using Spring on Reactive Stack that consists of:
- a back end Java-based service with a WebSocket handler
- a web front end based on HTML and JavaScript.
Prerequisites
- A Maven-based Java application project that uses Spring Boot
- JDK 8 or JDK 11 installed
- Maven installed
Procedure
Add vertx-spring-boot-starter-http as a dependency in the pom.xml file of your application project:
pom.xml
... <dependencies> ... <dependency> <groupId>dev.snowdrop</groupId> <artifactId>vertx-spring-boot-starter-http</artifactId> </dependency> ... </dependencies> ...
Create the class file containing the back-end application code:
/src/main/java/WebSocketSampleApplication.java
package dev.snowdrop.WebSocketSampleApplication; import java.util.Collections; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.web.reactive.HandlerMapping; import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SpringBootApplication public class WebSocketSampleApplication { public static void main(String[] args) { SpringApplication.run(WebSocketSampleApplication.class, args); } @Bean public HandlerMapping handlerMapping() { // Define URL mapping for the socket handlers Map<String, WebSocketHandler> handlers = Collections.singletonMap("/echo-upper", this::toUppercaseHandler); SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping(); handlerMapping.setUrlMap(handlers); // Set a higher precedence than annotated controllers (smaller value means higher precedence) handlerMapping.setOrder(-1); return handlerMapping; } private Mono<Void> toUppercaseHandler(WebSocketSession session) { Flux<WebSocketMessage> messages = session.receive() // Get incoming messages stream .filter(message -> message.getType() == WebSocketMessage.Type.TEXT) // Filter out non-text messages .map(message -> message.getPayloadAsText().toUpperCase()) // Execute service logic .map(session::textMessage); // Create a response message return session.send(messages); // Send response messages } }
Create the HTML document that serves as a front end for the application. Note that in the following example, the <script> element contains the JavaScript code that handles the communication with the back end of your application:
/src/main/resources/static/index.html
<!doctype html> <html> <head> <meta charset="utf-8"/> <title>WebSocket Example</title> <script> const socket = new WebSocket("ws://localhost:8080/echo-upper"); socket.onmessage = function(e) { console.log("Received a value: " + e.data); const messages = document.getElementById("messages"); const message = document.createElement("li"); message.innerHTML = e.data; messages.append(message); } window.onbeforeunload = function(e) { console.log("Closing socket"); socket.close(); } function send(event) { event.preventDefault(); const value = document.getElementById("value-to-send").value.trim(); if (value.length > 0) { console.log("Sending value to socket: " + value); socket.send(value); } } </script> </head> <body> <div> <h1>Vert.x Spring Boot WebSocket example</h1> <p> Enter a value to the form below and click submit. The value will be sent via socket to a backend service. The service will then uppercase the value and send it back via the same socket. </p> </div> <div> <form onsubmit="send(event)"> <input type="text" id="value-to-send" placeholder="A value to be sent"/> <input type="submit"/> </form> </div> <div> <ol id="messages"></ol> </div> </body> </html>
OPTIONAL: Run and test your application locally:
Navigate to the root directory of your Maven project:
$ cd myApp
Package your application:
$ mvn clean package
Start your application from the command line:
$ java -jar target/vertx-spring-boot-sample-websocket.jar
Navigate to http://localhost:8080/index.html using a web browser. The website shows a web interface that contains:
- an input text box,
- a list of processed results,
- a Submit button.
- Enter a string value into the text box and select Submit.
- View the resulting value rendered in uppercase in the list below the input text box.
Additional resources
- You can deploy your application to an OpenShift cluster using Fabric8 Maven Plugin.
- You can also configure your application for deployment on stand-alone Red Hat Enterprise Linux.
5.11. Advanced Message Queuing Protocol
The Advanced Message Queuing Protocol (AMQP) is a communication protocol designed to move messages between applications in a non-blocking way. Standardized as AMQP 1.0, the protocol provides interoperability and messaging integration between new and legacy applications across different network topologies and environments. AMQP works with multiple broker architectures and provides a range of ways to deliver, receive, queue, and route messages. AMQP can also work peer-to-peer when you are not using a broker. In a hybrid cloud environment, you can use AMQP to integrate your services with legacy applications without having to deal with processing a variety of different message formats. AMQP supports real-time asynchronous message processing and is therefore suitable for use in reactive applications.
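As an illustration of how this looks in code, the following minimal sketch uses the AmqpClient API from the AMQP Client Starter, with the same calls used by the example in Section 5.13. The class name and the queue handling around it are illustrative, not part of the example:
import dev.snowdrop.vertx.amqp.AmqpClient;
import dev.snowdrop.vertx.amqp.AmqpMessage;
import dev.snowdrop.vertx.amqp.AmqpSender;
import reactor.core.publisher.Mono;

public class AmqpSketch {

    private final AmqpClient client;

    public AmqpSketch(AmqpClient client) {
        this.client = client;
    }

    // Publish a single message to a queue and close the sender when done.
    public Mono<Void> publish(String queue, String body) {
        AmqpMessage message = AmqpMessage.create()
                .withBody(body)
                .build();

        return client.createSender(queue)
                .map(sender -> sender.send(message))
                .flatMap(AmqpSender::close);
    }

    // Subscribe to a queue and print each incoming message body.
    public void consume(String queue) {
        client.createReceiver(queue)
                .flatMapMany(receiver -> receiver.flux())
                .map(AmqpMessage::bodyAsString)
                .subscribe(System.out::println);
    }
}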
5.12. How the AMQP reactive example works
The messaging integration pattern that this example features is a Publisher-Subscriber pattern that uses two queues and a broker.
- The Request queue stores HTTP requests containing the strings that you enter using the web interface, to be processed by the text string processor.
- The Result queue stores responses containing the strings that have been converted to uppercase and are ready to be displayed.
The application consists of the following components:
- A front-end service that you can use to submit a text string to the application.
- A back-end service that converts the string to uppercase characters.
- An HTTP controller that is configured and provided by the Spring Boot HTTP Starter
- An embedded Artemis AMQP broker instance that routes messages between two messaging queues:
The request queue passes messages containing text strings from the front end to the text string processor service. When you submit a string for processing:
- The front-end service sends an HTTP POST request containing your string as the payload to the HTTP controller.
- The request is picked up by the messaging manager, which routes the message to the AMQP broker.
- The broker routes the message to the text string processor service. If the processor service is unavailable to pick up the request, the broker routes the message to the next available processor instance, if one exists. Otherwise, the broker waits and resends the request to the same instance when it becomes available again.
- The text string processor service picks up the message and converts the characters in the string to uppercase. The processor service sends a request with the processed result in uppercase to the AMQP Broker.
- The AMQP broker routes the request with the processed results to the messaging manager.
- The messaging manager stores the request with the processed results in the outgoing queue where it can be accessed by the front end service.
The response queue stores HTTP responses that contain results processed by the string processor service. The front end application polls this queue at regular intervals to retrieve the results. When the processed result is ready to be displayed:
- The front-end service sends an HTTP GET request to the HTTP controller provided by the Spring Boot HTTP Starter.
- The HTTP controller routes the request to the messaging manager.
- When a result for a request previously submitted by the front end is available in the outgoing queue, the messaging manager sends the result as a response to the HTTP GET request back to the HTTP controller.
- The HTTP controller routes the response back to the front-end service, which displays the result.
5.13. Using AMQP in a reactive application
Develop a simple reactive messaging application using the AMQP Client Starter with a Spring Boot HTTP controller. This example application integrates two services in a Publisher-Subscriber messaging integration pattern that uses two messaging queues and a broker.
This example shows how you can create a basic application with Spring Boot and Eclipse Vert.x on Reactor Netty that consists of two services integrated using AMQP messaging. The application consists of the following components:
- A front-end service that you can use to submit text strings to the application
- A back-end service that converts strings to uppercase characters
- An Artemis AMQP broker that routes messages between the services and manages the request queue and the response queue.
- An HTTP controller provided by the Spring Boot HTTP Starter
Prerequisites
- A Maven-based Java application project configured to use Spring Boot
- JDK 8 or JDK 11 installed
- Maven installed
Procedure
Add the following dependencies to the pom.xml file of your application project:
pom.xml
... <dependencies> ... <dependency> <groupId>dev.snowdrop</groupId> <artifactId>vertx-spring-boot-starter-http</artifactId> </dependency> <dependency> <groupId>dev.snowdrop</groupId> <artifactId>vertx-spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-artemis</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-jms-server</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-amqp-protocol</artifactId> <exclusions> <exclusion> <groupId>org.apache.qpid</groupId> <artifactId>proton-j</artifactId> </exclusion> </exclusions> </dependency> ... </dependencies> ...
Create the main class file of the example application. This class defines the processing queues for requests and results:
/src/main/java/AmqpSampleApplication.java
package dev.snowdrop.vertx.sample.amqp;

import java.util.HashMap;
import java.util.Map;

import dev.snowdrop.vertx.amqp.AmqpProperties;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class AmqpSampleApplication {

    final static String PROCESSING_REQUESTS_QUEUE = "processing-requests";
    final static String PROCESSING_RESULTS_QUEUE = "processing-results";

    public static void main(String[] args) {
        SpringApplication.run(AmqpSampleApplication.class, args);
    }

    /**
     * Add a Netty acceptor to the embedded Artemis server.
     */
    @Bean
    public ArtemisConfigurationCustomizer artemisConfigurationCustomizer(AmqpProperties properties) {
        Map<String, Object> params = new HashMap<>();
        params.put("host", properties.getHost());
        params.put("port", properties.getPort());

        return configuration -> configuration
            .addAcceptorConfiguration(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));
    }
}
Create the class file containing the code for the HTTP REST controller that manages the request queue and the response queue by exposing REST endpoints that handle your GET and POST requests:
/src/main/java/Controller.java
package dev.snowdrop.vertx.sample.amqp; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE; /** * Rest controller exposing GET and POST resources to receive processed messages and submit messages for processing. */ @RestController public class Controller { private final MessagesManager messagesManager; public Controller(MessagesManager messagesManager) { this.messagesManager = messagesManager; } /** * Get a flux of messages processed up to this point. */ @GetMapping(produces = TEXT_EVENT_STREAM_VALUE) public Flux<String> getProcessedMessages() { return Flux.fromIterable(messagesManager.getProcessedMessages()); } /** * Submit a message for processing by publishing it to a processing requests queue. */ @PostMapping public Mono<Void> submitMessageForProcessing(@RequestBody String body) { return messagesManager.processMessage(body.trim()); } }
Create the class file containing the messaging manager. The manager controls how application components publish requests to the request queue and subsequently subscribe to the response queue to obtain processed results:
/src/main/java/MessagesManager.java
package dev.snowdrop.vertx.sample.amqp; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import dev.snowdrop.vertx.amqp.AmqpClient; import dev.snowdrop.vertx.amqp.AmqpMessage; import dev.snowdrop.vertx.amqp.AmqpSender; import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import reactor.core.Disposable; import reactor.core.publisher.Mono; import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_REQUESTS_QUEUE; import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_RESULTS_QUEUE; /** * Processor client submits messages to the requests queue and subscribes to the results queue for processed messages. */ @Component public class MessagesManager implements InitializingBean, DisposableBean { private final Logger logger = LoggerFactory.getLogger(MessagesManager.class); private final List<String> processedMessages = new CopyOnWriteArrayList<>(); private final AmqpClient client; private Disposable receiverDisposer; // Injecting EmbeddedActiveMQ to make sure it has started before creating this component. public MessagesManager(AmqpClient client, EmbeddedActiveMQ server) { this.client = client; } /** * Create a processed messages receiver and subscribe to its messages publisher. */ @Override public void afterPropertiesSet() { receiverDisposer = client.createReceiver(PROCESSING_RESULTS_QUEUE) .flatMapMany(receiver -> receiver.flux() .doOnCancel(() -> receiver.close().block())) // Close the receiver once subscription is disposed .subscribe(this::handleMessage); } /** * Cancel processed messages publisher subscription. */ @Override public void destroy() { if (receiverDisposer != null) { receiverDisposer.dispose(); } } /** * Get messages which were processed up to this moment. * * @return List of processed messages. */ public List<String> getProcessedMessages() { return processedMessages; } /** * Submit a message for processing by publishing it to a processing requests queue. * * @param body Message body to be processed. * @return Mono which is completed once the message is sent. */ public Mono<Void> processMessage(String body) { logger.info("Sending message '{}' for processing", body); AmqpMessage message = AmqpMessage.create() .withBody(body) .build(); return client.createSender(PROCESSING_REQUESTS_QUEUE) .map(sender -> sender.send(message)) .flatMap(AmqpSender::close); } private void handleMessage(AmqpMessage message) { String body = message.bodyAsString(); logger.info("Received processed message '{}'", body); processedMessages.add(body); } }
Create the class file containing the uppercase processor that receives text strings from the request queue and converts them to uppercase characters. The processor subsequently publishes the results to the response queue:
/src/main/java/UppercaseProcessor.java
package dev.snowdrop.vertx.sample.amqp; import dev.snowdrop.vertx.amqp.AmqpClient; import dev.snowdrop.vertx.amqp.AmqpMessage; import dev.snowdrop.vertx.amqp.AmqpSender; import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import reactor.core.Disposable; import reactor.core.publisher.Mono; import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_REQUESTS_QUEUE; import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_RESULTS_QUEUE; /** * Uppercase processor subscribes to the requests queue, converts each received message to uppercase and send it to the * results queue. */ @Component public class UppercaseProcessor implements InitializingBean, DisposableBean { private final Logger logger = LoggerFactory.getLogger(UppercaseProcessor.class); private final AmqpClient client; private Disposable receiverDisposer; // Injecting EmbeddedActiveMQ to make sure it has started before creating this component. public UppercaseProcessor(AmqpClient client, EmbeddedActiveMQ server) { this.client = client; } /** * Create a processing requests receiver and subscribe to its messages publisher. */ @Override public void afterPropertiesSet() { receiverDisposer = client.createReceiver(PROCESSING_REQUESTS_QUEUE) .flatMapMany(receiver -> receiver.flux() .doOnCancel(() -> receiver.close().block())) // Close the receiver once subscription is disposed .flatMap(this::handleMessage) .subscribe(); } /** * Cancel processing requests publisher subscription. */ @Override public void destroy() { if (receiverDisposer != null) { receiverDisposer.dispose(); } } /** * Convert the message body to uppercase and send it to the results queue. */ private Mono<Void> handleMessage(AmqpMessage originalMessage) { logger.info("Processing '{}'", originalMessage.bodyAsString()); AmqpMessage processedMessage = AmqpMessage.create() .withBody(originalMessage.bodyAsString().toUpperCase()) .build(); return client.createSender(PROCESSING_RESULTS_QUEUE) .map(sender -> sender.send(processedMessage)) .flatMap(AmqpSender::close); } }
OPTIONAL: Run and test your application locally:
Navigate to the root directory of your Maven project:
$ cd myApp
Package your application:
$ mvn clean package
Start your application from the command line:
$ java -jar target/vertx-spring-boot-sample-amqp.jar
In a new terminal window, send a number of HTTP POST requests that contain text strings to be processed to localhost:
$ curl -H "Content-Type: text/plain" -d 'Hello, World' -X POST http://localhost:8080
$ curl -H "Content-Type: text/plain" -d 'Hello again' -X POST http://localhost:8080
Send an HTTP GET request to localhost. You receive an HTTP response with the strings in uppercase.
$ curl http://localhost:8080
HTTP/1.1 200 OK
Content-Type: text/event-stream;charset=UTF-8
transfer-encoding: chunked

data:HELLO, WORLD

data:HELLO AGAIN
Additional resources
- You can deploy your application to an OpenShift cluster using Fabric8 Maven Plugin.
- You can also configure your application for deployment on stand-alone Red Hat Enterprise Linux.
5.14. Apache Kafka
Apache Kafka is a scalable messaging integration system that is designed to exchange messages between processes, applications, and services. Kafka is based on clusters with one or more brokers that maintain a set of topics. Essentially, topics are categories that can be defined for every cluster using topic IDs. Each topic contains pieces of data called records that contain information about the events taking place in your application. Applications connected to the system can add records to these topics, or process and reprocess messages added earlier.
The broker is responsible for handling the communication with client applications and for managing the records in the topic. To ensure that no records are lost, the broker tracks all records in a commit log and keeps track of an offset value for each application. The offset is similar to a pointer that indicates the most recently added record.
Applications can pull the latest records from the topic, or they can change the offset to read records that were added earlier. This functionality prevents client applications from becoming overwhelmed with incoming requests when they cannot process them in real time. When this happens, Kafka prevents loss of data by storing records that cannot be processed in real time in the commit log. When the client application is able to catch up with the incoming requests, it resumes processing records in real time.
A broker can manage records in multiple topics by sorting them into topic partitions. Apache Kafka replicates these partitions to allow records from a single topic to be handled by multiple brokers in parallel, allowing you to scale the rate at which your applications process records in a topic. The replicated topic partitions (also called followers) are synchronized with the original topic partition (also called the leader) to avoid redundancy in processing records. New records are committed to the leader partition; followers only replicate the changes made to the leader.
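As an illustration of how an application interacts with a topic, the following minimal sketch uses the reactive producer and consumer API that the Kafka starter example in Section 5.16 relies on. The class name and the log topic wiring are illustrative, and the dev.snowdrop.vertx.kafka import package is an assumption based on the starter's artifact name:
// Package of the starter API is assumed to be dev.snowdrop.vertx.kafka.
import dev.snowdrop.vertx.kafka.ConsumerRecord;
import dev.snowdrop.vertx.kafka.KafkaConsumer;
import dev.snowdrop.vertx.kafka.KafkaProducer;
import dev.snowdrop.vertx.kafka.ProducerRecord;
import reactor.core.publisher.Mono;

public class KafkaSketch {

    private final KafkaProducer<String, String> producer;
    private final KafkaConsumer<String, String> consumer;

    public KafkaSketch(KafkaProducer<String, String> producer, KafkaConsumer<String, String> consumer) {
        this.producer = producer;
        this.consumer = consumer;
    }

    // Publish a record to the "log" topic.
    public Mono<Void> publish(String body) {
        ProducerRecord<String, String> record = ProducerRecord.<String, String>builder("log", body).build();
        return producer.send(record).then();
    }

    // Subscribe to the "log" topic and print each record value as it arrives.
    public void consume() {
        consumer.subscribe("log")
                .thenMany(consumer.flux())
                .map(ConsumerRecord::value)
                .subscribe(System.out::println);
    }
}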
5.15. How the Apache Kafka reactive example works
This example application is based on a Publisher-Subscriber message streaming pattern implemented using Apache Kafka. The application consists of the following components:
- The KafkaExampleApplication class that instantiates the log message producer and consumer
- A WebFlux HTTP controller that is configured and provided by the Spring Boot HTTP Starter. The controller provides REST resources used to publish and read messages.
- A KafkaLogger class that defines how the producer publishes messages to the log topic on Kafka.
- A KafkaLog class that displays messages that the example application receives from the log topic on Kafka.
Publishing messages:
- You make an HTTP POST request to the example application with the log message as the payload.
- The HTTP controller routes the message to the REST endpoint used for publishing messages, and passes the message to the logger instance.
- The HTTP controller publishes the received message to the log topic on Kafka.
- The KafkaLog instance receives the log message from the Kafka topic.
Reading messages:
- You send an HTTP GET request to the example application URL.
- The controller gets the messages from the KafkaLog instance and returns them as the body of the HTTP response.
5.16. Using Kafka in a reactive application
This example shows how you can create an example messaging application that uses Apache Kafka with Spring Boot and Eclipse Vert.x on Reactor Netty. The application publishes messages to a Kafka topic and then retrieves them and displays them when you send a request.
The Kafka configuration properties for message topics, URLs, and metadata used by the Kafka cluster are stored in src/main/resources/application.yml.
Prerequisites
- A Maven-based Java application project configured to use Spring Boot
- JDK 8 or JDK 11 installed
- Maven installed
Procedure
Add the WebFlux HTTP Starter and the Apache Kafka Starter as dependencies in the pom.xml file of your application project:
pom.xml
... <dependencies> ... <!-- Vert.x WebFlux starter used to handle HTTP requests --> <dependency> <groupId>dev.snowdrop</groupId> <artifactId>vertx-spring-boot-starter-http</artifactId> </dependency> <!-- Vert.x Kafka starter used to send and receive messages to/from Kafka cluster --> <dependency> <groupId>dev.snowdrop</groupId> <artifactId>vertx-spring-boot-starter-kafka</artifactId> </dependency> ... </dependencies> ...
Create the KafkaLogger class. This class functions as a producer and defines how messages (also called records) are published to the topic:
/src/main/java/KafkaLogger.java
... final class KafkaLogger { private final KafkaProducer<String, String> producer; KafkaLogger(KafkaProducer<String, String> producer) { this.producer = producer; } public Mono<Void> logMessage(String body) { // Generic key and value types can be inferred if both key and value are used to create a builder ProducerRecord<String, String> record = ProducerRecord.<String, String>builder(LOG_TOPIC, body).build(); return producer.send(record) .log("Kafka logger producer") .then(); } } ...
Create the KafkaLog class. This class functions as the consumer of Kafka messages. KafkaLog retrieves messages from the topic and displays them in your terminal:
/src/main/java/KafkaLog.java
... final class KafkaLog implements InitializingBean, DisposableBean { private final List<String> messages = new CopyOnWriteArrayList<>(); private final KafkaConsumer<String, String> consumer; private Disposable consumerDisposer; KafkaLog(KafkaConsumer<String, String> consumer) { this.consumer = consumer; } @Override public void afterPropertiesSet() { consumerDisposer = consumer.subscribe(LOG_TOPIC) .thenMany(consumer.flux()) .log("Kafka log consumer") .map(ConsumerRecord::value) .subscribe(messages::add); } @Override public void destroy() { if (consumerDisposer != null) { consumerDisposer.dispose(); } consumer.unsubscribe() .block(Duration.ofSeconds(2)); } public List<String> getMessages() { return messages; } } ...
Create the class file that contains the HTTP REST controller. The controller exposes the REST resources that your application uses to log and read messages.
/src/main/java/Controller.java
package dev.snowdrop.vertx.sample.kafka; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE; /** * HTTP controller exposes GET and POST resources to log messages and to receive the previously logged ones. */ @RestController public class Controller { private final KafkaLogger logger; private final KafkaLog log; public Controller(KafkaLogger logger, KafkaLog log) { this.logger = logger; this.log = log; } /** * Get a Flux of previously logged messages. */ @GetMapping(produces = TEXT_EVENT_STREAM_VALUE) public Flux<String> getMessages() { return Flux.fromIterable(log.getMessages()); } /** * Log a message. */ @PostMapping public Mono<Void> logMessage(@RequestBody String body) { return logger.logMessage(body.trim()); } }
Create the YAML template that contains the URLs that producers and consumers in your Apache Kafka cluster use to log and read messages. In this example, the consumer and producer in your Apache Kafka cluster communicate using port 9092 on localhost by default. Note that you must configure the producers and consumers separately, as the following example shows:
/src/main/resources/application.yml
vertx:
  kafka:
    producer:
      bootstrap:
        # The producer in your cluster uses this URL to publish messages to the log.
        servers: localhost:9092
      key:
        # This class assigns the mandatory key attribute that is assigned to each message.
        serializer: org.apache.kafka.common.serialization.StringSerializer
      value:
        # This class assigns the mandatory value attribute that is assigned to each message.
        serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap:
        # The consumer in your cluster uses this URL to read messages from the log.
        servers: localhost:9092
      group:
        # The consumer group ID defines a group of consumers that subscribe to the same topic. In this example, all consumers belong to the same consumer group.
        id: log
      key:
        # This class generates the mandatory key attribute that is assigned to each message.
        deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value:
        # This class generates the mandatory value attribute that is assigned to each message.
        deserializer: org.apache.kafka.common.serialization.StringDeserializer
OPTIONAL: Run and test your application locally:
Navigate to the root directory of your Maven project:
$ cd vertx-spring-boot-sample-kafka
Package your application:
$ mvn clean package
Start your application from the command line:
$ java -jar target/vertx-spring-boot-sample-kafka.jar
In a new terminal window, send a number of HTTP POST requests that contain messages formatted as text strings to localhost. The messages are all published to the log topic.
$ curl -H "Content-Type: text/plain" -d 'Hello, World' -X POST http://localhost:8080
$ curl -H "Content-Type: text/plain" -d 'Hello again' -X POST http://localhost:8080
...
Send an HTTP GET request to localhost. You receive an HTTP response that contains all the messages in the topic that your consumers subscribe to.
$ curl http://localhost:8080
HTTP/1.1 200 OK
Content-Type: text/event-stream;charset=UTF-8
transfer-encoding: chunked

data:Hello, World

data:Hello again
...
Additional resources
- You can deploy your application to an OpenShift cluster using Fabric8 Maven Plugin.
- You can also configure your application for deployment on stand-alone Red Hat Enterprise Linux.
In addition to using the examples, you can also use Spring Boot with Eclipse Vert.x to create new Spring Boot applications from scratch and deploy them to OpenShift.