Reactive Micro Services

Microservices are a hot topic these days. A lot has been written about them and a lot of people and companies have started building them. Some say the concept is nothing new, and I have yet to find a clear and widely accepted definition of a microservice. The one I like comes from a book written by Jonas Bonér (link):

“Microservices-Based Architecture is a simple concept: it advocates creating a system from a collection of small, isolated services, each of which owns their data, and is independently isolated, scalable and resilient to failure.”

I am not saying that the definition is perfect, but I like it because it focuses on scalability and resilience. The last few services I built had to be able to handle a high load, which meant they had to be scalable and resilient. We built the APIs on the OpenShift platform using Apache Camel (Fuse) as the framework. Because of the stateless implementation and the ease with which OpenShift can spin up extra containers, this setup scales very well. Still, the problem of scalability and responsiveness under high load got me looking for other ways of building similar interfaces. While doing my research I came across the terms “reactive microservices” and “reactive programming”.

Reactive Programming

What is reactive programming? I found a nice page explaining “reactive programming” to my liking: link

In short it states that:

“Reactive programming is programming with asynchronous data streams.”

This is not something new; it can be done with a hub-and-spoke system in JMS: you subscribe to a queue and you “react” as soon as a message has been placed on it. Reactive programming, however, goes further than that. It sees everything as a stream of data, from a call to a database to the simple retrieval of a value from a property file. Everything is handled as a stream of data. This way of programming has its pros and cons. In this post I try to show you why it is beneficial for microservices.
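To make the push-based idea concrete, here is a minimal, dependency-free sketch in plain Java (the class names `Source` and `StreamSketch` are mine, not from any reactive library): subscribers register a callback and react the moment a value is pushed, even if only one value ever flows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class StreamSketch {

    // A minimal "stream": subscribers register a callback and react
    // when a value is pushed; nothing is pulled.
    static class Source<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void push(T value) {
            // every subscriber reacts to the value as it arrives
            subscribers.forEach(s -> s.accept(value));
        }
    }

    public static void main(String[] args) {
        Source<String> propertyValues = new Source<>();
        propertyValues.subscribe(v -> System.out.println("received: " + v));
        // even a single property lookup is modelled as a stream of one value
        propertyValues.push("timeout=5000");
    }
}
```

Note that the subscriber never asks for the value; it simply reacts when the source pushes it.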

One of the first hits I came across while looking for a good definition and explanation of reactive programming was the reactive manifesto. Although I find that it is more targeted at reactive systems, it is a nice place to start when first looking into “Reactive”, as the manifesto simplifies the concept, and thus the explanation, a lot.

Reactive manifesto

According to the reactive manifesto reactive systems are Responsive, Resilient, Message Driven and Elastic. A reactive service is in essence a reactive system and it must adhere to the same principles.



A reactive service is always responsive. This means that a service must always give a response, no matter what state the service is in. It shouldn’t be possible for the service to time out or not respond at all. This allows dependent systems to react accordingly and in a timely fashion.

Being responsive even when the service is failing means that the service needs to be resilient: resilient to failure and thus to change. Errors should be handled as fast, and thus as close, as possible to their source. The service should also be able to restore itself. For example, when a database connection is no longer available the service should still respond, and when the connection is restored the service should immediately function properly again.

A reactive service should be able to handle a varying load. This means that it should be able to scale up or down depending on the number of requests coming in at a given point. The service should be able to scale by getting more resources, or it should be possible to run multiple instances of the same service. In other words, the service should be elastic when it comes to handling load.

Reactive services are message or data driven; by passing messages between the components of a service you guarantee loose coupling and isolation, both convenient when you need to scale out the application.

So how does all this translate to a reactive microservice and reactive programming?
In order to explain it properly I am simplifying the concept a bit. The most important part of reactive programming is that everything is a stream of data. All data that is received arrives as a stream, and like a stream it is not pulled but pushed. It does not matter if something happens only once; it is still a stream of data. The subscriber of the stream decides for itself what to do with the event or message: it can alter the data, call a new service, or just forward it to the requester.

Because the subscriber is decoupled from the source of the stream, you can subscribe more than once. This means that you can easily scale up your subscribers if needed. An important part of this is that all messages are processed in a non-blocking fashion: retrieving the data and processing it are decoupled from each other, so the one cannot block the other. This means that by default your transactions are handled asynchronously.

That being said, when a reactive microservice calls a backend service it does not wait for the result and does not block any further calls; it simply subscribes to the result and continues processing other requests. As soon as the result comes in via the stream, it processes that result and returns it to the actual client.
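The same pattern can be sketched with plain Java's CompletableFuture, no reactive library needed (the backend call here is simulated, and the names are mine): the caller subscribes to the result and is immediately free to do other work.

```java
import java.util.concurrent.CompletableFuture;

public class NonBlockingCall {

    // Simulated backend call: returns a future immediately instead of
    // blocking the calling thread until the result is ready.
    static CompletableFuture<String> callBackend() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100); // pretend the backend is slow
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "backend-result";
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = callBackend()
                .thenAccept(result -> System.out.println("processed: " + result));
        // the service is free to handle other requests here
        System.out.println("not blocked while waiting");
        done.join(); // only to keep the demo alive until the callback fires
    }
}
```

The `thenAccept` callback plays the role of the subscriber: it runs whenever the result arrives, while the calling thread moves on.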

All this means that building a microservice according to the reactive principles gives you a scalable and resilient service.

Reactive components

These days there are a lot of libraries, toolkits and programming languages that support reactive programming. Some of the more popular ones are:

  • Vert.x;
  • Ratpack;
  • ReactiveX (with libraries for a multitude of languages like .NET, Java, JavaScript, etc.);
  • Akka Streams;
  • React;
  • etc.

These frameworks and toolkits can be used to create reactive microservices. The ones that interested me the most were ReactiveX and Vert.x.


First off, Vert.x. According to its website, Vert.x is event driven and non-blocking. It is a toolkit that developers can use to implement network-driven applications. You can create a complete application with Vert.x or use only portions of it. But how does this work?

The smallest unit of deployment in Vert.x is called a verticle. A verticle is a piece of code that processes events. A Vert.x application consists of one or more of these verticles. A single verticle receives a specific type of event and processes it. An event can be based on almost anything: a network buffer, a timer, an incoming REST call or a message from another verticle.

In short, a Vert.x application consists of verticles that process events, which makes it event or message driven. These verticles run on an event loop, meaning that their events are processed on that loop.


When starting the application, Vert.x creates several event loops (the default is 2 per CPU core). An event loop runs in a single thread. This means that when an event takes too long to process, it blocks the loop, delaying or even blocking the processing of all other events. When you block the event loop, the application will slow down significantly or, in the worst case, stop working correctly. That is why the golden rule of Vert.x is: “Don’t block the event loop”. When a verticle takes up too much time on the event loop, Vert.x will automatically log this as an error.
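You can see why the golden rule matters with a plain-Java sketch (this is an illustration of the mechanism, not Vert.x itself; the 200 ms delay is arbitrary): an event loop is essentially a single-threaded executor, so one slow handler delays every event after it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EventLoopSketch {

    public static void main(String[] args) throws Exception {
        // an event loop processes events one at a time on a single thread
        ExecutorService loop = Executors.newSingleThreadExecutor();

        long start = System.nanoTime();
        loop.submit(EventLoopSketch::blockingHandler); // a handler that blocks the loop...
        Future<?> second = loop.submit(() -> { });     // ...delays all later events
        second.get();

        System.out.printf("second event had to wait ~%d ms%n",
                (System.nanoTime() - start) / 1_000_000);
        loop.shutdown();
    }

    static void blockingHandler() {
        try {
            Thread.sleep(200); // breaking the golden rule on purpose
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The second, trivial event cannot run until the blocking handler finishes; in Vert.x the cure is to offload such work (for example with worker verticles) instead of blocking the loop.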



ReactiveX is an API that has been implemented in several programming languages, one of them being Java (RxJava). Several reactive libraries have some form of support for or integration with ReactiveX; Vert.x is one of the libraries that has integrated it.

ReactiveX implements the observer pattern: a pattern in which an object (in ReactiveX the observable) “observes” a stream of data and distributes it among its subscribers (zero or more). An observable can emit three types of events:

  • onNext;
  • onCompletion;
  • onError.

onCompletion and onError are emitted at most once. Once either of them has been emitted, the observable is closed. An onNext event, the actual data you are interested in, can be emitted N times or never. This means that when subscribing to an observable you do not know whether you will ever get an onNext (data) event; you might receive an onError or onCompletion event before you ever receive one. Your code must therefore be able to handle any of the three event types.
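RxJava itself is an extra dependency, but the JDK ships an analogous API since Java 9 (java.util.concurrent.Flow, where onCompletion is called onComplete). A subscriber that handles all three event types looks like this:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ThreeEventTypes {

    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<String> observable = new SubmissionPublisher<>();

        observable.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE); // ask for all events
            }

            @Override
            public void onNext(String item) {          // zero or more times
                System.out.println("onNext: " + item);
            }

            @Override
            public void onError(Throwable throwable) { // at most once, then closed
                System.out.println("onError: " + throwable);
            }

            @Override
            public void onComplete() {                 // at most once, then closed
                System.out.println("onComplete");
            }
        });

        observable.submit("the data you are interested in");
        observable.close();  // triggers onComplete
        Thread.sleep(200);   // delivery is asynchronous; wait for it
    }
}
```

Whichever of onError or onComplete arrives first terminates the stream, which is exactly why the subscriber must be prepared for all three callbacks.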

An observable can be based on anything, from a simple list to a call to a remote REST API or a database. By implementing the observer pattern you decouple the call from the response, which means that no thread is explicitly blocked while waiting for a response. When you implement a microservice according to this principle you can create a highly flexible and responsive service. Even under high load the API will be able to handle your calls; if the backend is not responding, however, you might not like the answer.

In this blog post I tried to explain reactive microservices, and how you can implement them, at a very high level. In the coming blog posts I will show you with small code examples how you can actually implement them.



Part 1 Creating a RestFull Interface in Vert.x

In this series of blog posts I will be explaining how you can create a reactive microservice using Vert.x and ReactiveX. With these types of posts it is common to use an API that has some fictitious purpose. Because I have worked on the Flight APIs of Schiphol (the Dutch airport), and the required APIs are free, I will be creating a simple API that determines at what time, and how, you need to travel to Schiphol so you don't miss your flight.

These days creating a REST interface in Java is easy; a lot of frameworks exist that handle the boilerplate for you. Vert.x is no different: creating a REST interface in Vert.x can be done with just a couple of lines of code.

As mentioned in the previous article (link), a Vert.x application consists of one or more verticles, which are deployed on the Vert.x event loop. So without further ado, let’s get cracking.

First, we need to create a verticle that implements a REST interface. Vert.x has a neat library for that, so besides the normal Vert.x dependencies we must add the vert.x web dependency as well.

    <properties>
        <vertx.version><Latest and greatest></vertx.version>
    </properties>

    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-core</artifactId>
        <version>${vertx.version}</version>
    </dependency>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-web</artifactId>
        <version>${vertx.version}</version>
    </dependency>



So, we have two dependencies, vertx-core and vertx-web. These are all the dependencies we need for now.

You can create a verticle by implementing the Verticle interface. The easiest way to do this is by extending the AbstractVerticle class. There are several other verticle types, but the AbstractVerticle will do for our rest service.

The AbstractVerticle contains the start method, which you need to implement. In this method you dictate what the verticle does on startup. For our REST verticle this means starting an HTTP server with some routing options.

public class VertxRestService extends AbstractVerticle { 

    @Override
    public void start(Future<Void> future) { 
        vertx.createHttpServer()
            .requestHandler(createRouter()::accept)
            .listen(8080, httpServerAsyncResult -> { 
                if (httpServerAsyncResult.succeeded()) { 
                    future.complete();
                } else { 
                    future.fail(httpServerAsyncResult.cause());
                }
            });
    }
}

In the example above I created an HTTP server that listens on port 8080. When this verticle is deployed it will start listening on that port. You will also notice that Vert.x uses a Future class in the signature.

Because everything is loosely coupled in Vert.x, even the deployment of the verticles themselves, you do not know when something is completed or deployed. However, in some scenarios you want to control what is started when; sometimes you want to wait for a deployment to finish. For this the Future class exists. You can start a verticle with the Future class in the signature (as seen in the example). The deployment is then only considered done when the future.complete() method is called: in this case, when the HTTP server has been started.

The second part of creating a REST interface is the routing. When creating a REST service you must define the methods that the service offers: the GET, POST, PUT, etc. methods. For this you use the Vert.x Router class.

private Router createRouter() { 

    Router router = Router.router(vertx); 
    router.get("/flight/:id/car").handler(routingContext -> { 
        routingContext.response().end("Here be dragons!"); 
    });
    router.get("/flight/:id/train").handler(routingContext -> { 
        routingContext.response().end("Here be dragons!"); 
    });
    return router; 
}

This service supports two operations: it can calculate the time you need to leave when traveling by car or by train.

Now we are almost done; we have all that is required to run a REST service in Vert.x. The only thing left is the deployment.

Deploying and starting a Vert.x application is easy. The easiest way, I find, is to use a main method that deploys the verticles.

public class VertxMain  { 

    private static final Logger logger = LogManager.getLogger(VertxMain.class); 

    public static void main(String... args){ 
        VertxOptions options = new VertxOptions(); 
        Vertx vertx = Vertx.vertx(options); 
        Consumer<Class> runner = clazz -> { 
            vertx.deployVerticle(clazz.getName(), stringAsyncResult -> { 
                if (stringAsyncResult.succeeded()){ 
                    logger.log(Level.INFO, "Successfully deployed " + clazz.getSimpleName()); 
                } else { 
                    logger.log(Level.ERROR, "Failed to deploy " + stringAsyncResult.cause()); 
                }
            }); 
        };

        Class[] clazzes = {VertxRestService.class}; 
        Arrays.stream(clazzes).forEach(runner);
    }
}



The main method first instantiates an instance of Vertx. This is the core component and the starting point of every Vert.x application. You can instantiate the Vertx object in two ways: regular and clustered. This means that you can create a clustered Vert.x application for extra flexibility and scalability. With the VertxOptions object you can configure your application.

A verticle can be deployed by calling the vertx.deployVerticle method. In the example I pass the class and an async result handler. With the result handler you can verify whether the verticle has been deployed successfully. This is done with the above-mentioned Future mechanism. Each verticle that you create needs to be deployed in the Vertx instance.

That is it: we have our verticle with the REST interface and the HTTP server, and we have a main method that can deploy the verticle. The only thing left to do is run the main method. This can of course be done by creating the jar and running it in your JVM, but you can also run it from your IDE. Once it is started, you can test your REST interface by calling the GET method.

The sources of this post can be found in my git repository here

WELD-001409 – Ambiguous dependencies

Weld/CDI resolves bean injection mainly based on type or name. This is sufficient for most cases and works like a charm. You can either specify a name yourself or simply let CDI resolve the type for you. The following snippet shows how easy it is.

public class SomeClass implements SomeInterface {
 //Your class implementation
}

public class YourImplementation {

 //Inject by interface type
 @Inject
 SomeInterface someInterface;

 //Inject by concrete class type
 @Inject
 SomeClass someClass;
}

Now, in my case I had multiple classes that all implemented the same interface. Based on the name I wanted to inject one or the other class. However, when letting CDI resolve the injection by type or name I got an error stating that the bean could not be injected because multiple options exist. The error message looked something like this:

org.jboss.weld.exceptions.DeploymentException: WELD-001409: Ambiguous dependencies for type AuthorizationInterface with qualifiers @Default
  at injection point [BackedAnnotatedParameter] Parameter 1 of [BackedAnnotatedConstructor] @Inject @Named public[SomeInterface]
  at nl.janssen.theClassInWhichYouTryTheInjection
  Possible dependencies: 
  - Managed Bean [ class nl.janssen.interfaceBasedClassOne] with qualifiers [@Default @Named @Any],
  - Managed Bean [ class nl.janssen.interfaceBasedClassTwo] with qualifiers [@Default @Named @Any],

It states that CDI cannot inject the class because it has more than one candidate; it cannot decide which one to pick. The solution is to create your own qualifiers. CDI already has some default qualifiers (@Default, @Any, etc.), which you can see in the error, but they are not sufficient in this case. You have to create a custom qualifier for each implementation of the interface that you want to inject. With these qualifiers you can tell CDI that you want to inject a bean of type X with qualifier Y. By adding a custom qualifier to each implementation I can tell CDI to inject a specific class. A custom qualifier looks like this:

import javax.inject.Qualifier;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Qualifier
@Retention(RUNTIME)
@Target({TYPE, PARAMETER})
public @interface One {
}

The @Target indicates the locations in which you can use the qualifier; this one can be used when injecting a parameter or when annotating a type.

Now you have to specify your implementations with the correct qualifiers:

import javax.enterprise.inject.Default;

//Interface implementations with the custom qualifiers
@One
public class SomeClassOne implements SomeInterface {
 //Your class implementation
}

@Two
@Default
public class SomeClassTwo implements SomeInterface {
 //Your class implementation
}

//Injection of the qualified beans
public class YourImplementation {

 @Inject
 @One
 SomeInterface classOne;

 @Inject
 @Two
 SomeInterface classTwo;

 //No qualifier: resolves to the @Default implementation
 @Inject
 SomeInterface classTwoIsDefault;
}

CDI will now resolve the correct implementation and no longer complain about ambiguous dependencies. Note that when you do not provide a qualifier at the injection point, CDI will raise the same ambiguity error again, because it doesn’t know which implementation is the default. You can specify this by adding the “@Default” annotation to one of your classes.

Camel and Xpath 2.0

Camel lets you create predicates and expressions using XPath. This can be quite convenient when you want to route a message based on the content of an XML payload, or extract a value from it. XPath itself is a powerful and useful tool that allows you to write complex queries against XML. Camel uses the default XPath package that comes with Java. Unfortunately this implements XPath 1.0 and not 2.0, which means you miss some powerful functions.

Luckily it is easy to load the Saxon XPath factory, which contains the XPath 2.0+ functionality. First add the correct dependency:
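The dependency in question is the Saxon library; assuming Maven and the open-source Saxon-HE artifact, the snippet would look something like this (the version was not given in the original, so pick one yourself):

```xml
<dependency>
    <groupId>net.sf.saxon</groupId>
    <artifactId>Saxon-HE</artifactId>
    <version><!-- latest Saxon-HE version --></version>
</dependency>
```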


After this you have to add the following to your route builder class:

import net.sf.saxon.xpath.*;

// instantiate the Saxon implementation of the XPath factory
XPathFactory fac = new XPathFactoryImpl();

// XPath 2.0 functions, like lower-case(), are now available:
from("direct:example")
    .transform()
        .xpath("/service/service[lower-case(name) = 'lowercasename']/id", String.class);

Start your route and you will see a log statement saying that the Saxon XPathFactory is being used, and you won’t get an exception stating that the lower-case function does not exist.

Enable CORS with Camel Rest DSL

When creating a public REST API you have to take into account how, and by whom, your API is used. This is not only important for how you structure your API but also for how you expose it. One of the issues I’ve faced with hosting a public API is CORS. Users of the API got the following error, or a variation of it, when calling the API from JavaScript:

Cross-Origin Request Blocked: ….

In short, it means that you have to enable CORS, or “Cross-origin resource sharing”. When a website wants to access a public API that is hosted on a different domain, it is normally blocked by the “same-origin policy”. This policy, implemented by your browser, blocks any call that is not in the same domain (hostname and port) as the website shown in your browser. For APIs that belong to the website this is normally not a real problem, because you can simply serve the API via the same domain. With public APIs, however, this is not possible. CORS lets the browser make such cross-domain requests anyway.

When a browser has to get data from a public API that is hosted on a different domain, it uses CORS to check whether the request is allowed. The browser sends an OPTIONS request to the API stating that it wants to make a request, including any custom headers you have specified for the call. The public API has to accept this OPTIONS request; only after it has been accepted will the browser execute the actual request.
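What the server-side check boils down to can be sketched in plain Java (this is an illustration of the mechanism, not Camel's actual implementation; the class and header names are mine): compare the headers the browser announces against a white list and only answer the preflight positively when they all match.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class PreflightSketch {

    // the white-listed headers, lower-cased for comparison
    static final Set<String> ALLOWED_HEADERS = new HashSet<>(Arrays.asList(
            "origin", "accept", "content-type", "customheader1", "customheader2"));

    // Returns the Access-Control-Allow-Headers value when the preflight is
    // acceptable, or empty when a requested header is not white-listed.
    static Optional<String> handlePreflight(String accessControlRequestHeaders) {
        boolean allAllowed = Arrays
                .stream(accessControlRequestHeaders.toLowerCase().split("\\s*,\\s*"))
                .allMatch(ALLOWED_HEADERS::contains);
        return allAllowed ? Optional.of(String.join(", ", ALLOWED_HEADERS))
                          : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(handlePreflight("Content-Type, CustomHeader1").isPresent()); // allowed
        System.out.println(handlePreflight("X-Not-White-Listed").isPresent());          // blocked
    }
}
```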

Since the REST API must accept the preflight request, your API has to contain some configuration that handles CORS. With the Camel Rest DSL this is relatively easy. First you have to enable CORS; for simple requests this is enough. I, however, had to allow some custom headers as well, which meant I had to white-list them.

        restConfiguration()
            .enableCORS(true)
            .corsHeaderProperty("Access-Control-Allow-Headers", "Origin, Accept, X-Requested-With, Content-Type, Access-Control-Request-Method, Access-Control-Request-Headers, CustomHeader1, CustomHeader2");

ActiveMQ connection pooling using Weld

Last week I, again, had to create a connection pool for a Camel route that was sending messages to an ActiveMQ broker. This time I had to configure it in a project that uses Weld as its injection framework. I based my configuration on the ActiveMQ documentation.

The number of connections and concurrent consumers is based on the standard documentation, which states that you can safely use these numbers without any major impact. Increasing them is of course possible; it will however impact your memory usage, so before changing them make sure your application requires it. I decided to go for the “standard” settings and change them whenever the application usage exceeds my expectations.

The documentation also states that a connection factory needs to be started and stopped for it to work correctly. Because I was using CDI, I had to look for a way to do this on startup and shutdown of the application. I found the “@Initialized” and “@Destroyed” annotations in combination with the “@Observes” annotation. The observer makes sure that the method is called whenever the application scope is either initialized or destroyed.


import io.fabric8.annotations.ServiceName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.apache.camel.component.jms.JmsConfiguration;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Destroyed;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.inject.Named;

public class ActiveMQComponentFactory {

  public void init(@Observes @Initialized(ApplicationScoped.class) Object init, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.start();
  }

  public void destroy(@Observes @Destroyed(ApplicationScoped.class) Object init, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.stop();
  }

  @Produces
  @Named("connectionFactory")
  @ApplicationScoped
  public ActiveMQConnectionFactory jmsConnectionFactory(){
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    return factory;
  }

  @Produces
  @Named("pooledConnectionFactory")
  @ApplicationScoped
  public PooledConnectionFactory createPooledConnectionFactory(@Named("connectionFactory") ActiveMQConnectionFactory factory){
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setMaxConnections(8);
    pooledConnectionFactory.setConnectionFactory(factory);
    return pooledConnectionFactory;
  }

  @Produces
  @Named("jmsConfiguration")
  @ApplicationScoped
  public JmsConfiguration createJmsConfiguration(@Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    JmsConfiguration jmsConfiguration = new JmsConfiguration();
    jmsConfiguration.setConnectionFactory(pooledConnectionFactory);
    jmsConfiguration.setConcurrentConsumers(10);
    return jmsConfiguration;
  }

  @Produces
  @Named("jms")
  public ActiveMQComponent createActiveMQComponent(@Named("jmsConfiguration") JmsConfiguration jmsConfiguration){
    ActiveMQComponent component = new ActiveMQComponent();
    component.setConfiguration(jmsConfiguration);
    return component;
  }
}

And of course, to use the connection pool in your route you have to refer to the ActiveMQComponent. In my case I referred to it by the alias “jms”.

public class IndexerApi extends RouteBuilder {

  public void configure() throws Exception {
    // endpoint names are illustrative; the "jms" prefix refers to the pooled component
    from("direct:index")
      .to("jms:queue:indexQueue");
  }
}

DeltaSpike with CDI and Camel

Last week I was looking for an easy way to inject property values into my code. In my project I was using Camel in combination with Weld (CDI). One of the main requirements was that properties could be injected from multiple “locations”: from a property file or from a system or environment property.

Because we are using OpenShift to run our applications, it is easy to use environment variables for configuration. I did not want to write my own code to resolve the properties, so I started looking around. Eventually I found DeltaSpike; with DeltaSpike you can easily inject properties into your code and it works seamlessly with CDI.

You can inject properties from the following four locations:

  • System properties (ordinal = 400)
  • Environment properties (ordinal = 300)
  • JNDI values (ordinal = 200, the base name is “java:comp/env/deltaspike/”)
  • Properties file values (ordinal = 100, default filename is “META-INF/”)

When the same property is configured in two or more locations, the value with the highest ordinal is used. System properties will thus overwrite any configuration done in the property file.
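The precedence rule is easy to sketch in plain Java (the class and field names here are mine, not DeltaSpike's): each source carries an ordinal, and for a given key the value from the source with the highest ordinal wins.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class OrdinalSketch {

    static class ConfigSource {
        final int ordinal;
        final Map<String, String> values;

        ConfigSource(int ordinal, Map<String, String> values) {
            this.ordinal = ordinal;
            this.values = values;
        }
    }

    // For a given key: among the sources that define it, pick the
    // value from the source with the highest ordinal.
    static Optional<String> resolve(String key, List<ConfigSource> sources) {
        return sources.stream()
                .filter(s -> s.values.containsKey(key))
                .max(Comparator.comparingInt((ConfigSource s) -> s.ordinal))
                .map(s -> s.values.get(key));
    }

    public static void main(String[] args) {
        List<ConfigSource> sources = List.of(
                new ConfigSource(400, Map.of("loglevel", "INFO")),   // system property
                new ConfigSource(100, Map.of("loglevel", "DEBUG"))); // property file
        // the system property (ordinal 400) overrides the property file (100)
        System.out.println(resolve("loglevel", sources).orElse("n/a")); // prints INFO
    }
}
```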

Because I did not want to use the default property filename, I implemented the following class to load my own property file:

import org.apache.deltaspike.core.api.config.PropertyFileConfig;

import javax.inject.Named;

public class MyCustomPropertyConfig implements PropertyFileConfig {

    @Override
    public String getPropertyFileName() {
      return ""; // the name of your custom property file
    }

    @Override
    public boolean isOptional() {
      return true;
    }
}

There are four ways to load a property in your class:

  • ConfigResolver methods for easy programmatic access to values
  • TypedResolver API for typed configuration values and precise control over resolution
  • @ConfigProperty for injection of configured values into beans
  • interface based configuration

When using DeltaSpike in combination with CDI you can inject property values in a similar way you are used to when injecting beans. For this you can use the “@ConfigProperty” annotation:

@Inject
@ConfigProperty(name = "loglevel", defaultValue = "INFO")
private String logLevel;

Custom (un)Marshalling with the Camel Rest DSL

Sometimes you want to use a custom marshaller to marshal or unmarshal your messages. In our case we needed to format all date fields in a specific way when marshalling our output message. We noticed that we could set a custom jsonDataFormat or a custom xmlDataFormat. These options allow you to specify your own DataFormat class: in essence your customized marshaller.

We needed to send out XML or JSON, depending on the client’s request (HTTP header: Accept). Because both implement the same interface, I will show only the JSON DataFormat. The dataFormat option expects a class that implements the “org.apache.camel.spi.DataFormat” interface. The interface defines two methods, marshal and unmarshal. When you only use the DataFormat for outgoing messages, implementing the marshal method is sufficient.

public class JsonDataFormat implements DataFormat {

    private final ObjectMapper jacksonMapper;
    private Class<?> view; // optional Jackson view to apply (null = none)

    public JsonDataFormat() {
        jacksonMapper = new ObjectMapper();
    }

    @Override
    public void marshal(Exchange exchange, Object obj, OutputStream stream) throws Exception {

        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss");
        jacksonMapper.setDateFormat(dateFormat);

        if (view != null) {
            ObjectWriter w = jacksonMapper.writerWithView(view);
            w.writeValue(stream, obj);
        } else {
            jacksonMapper.writeValue(stream, obj);
        }
    }

    @Override
    public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
        return null; // only marshalling is needed for outgoing messages
    }
}

Now, you cannot simply add the class as a bean and refer to it; you need to register it as a DataFormat in the Camel context. This can be done by adding a file to the META-INF/services/org/apache/camel/dataformat/ directory on the classpath. The name of the file is the name under which your data format is registered (in my case JsonFormat), and it contains a single line of the form class=<fully qualified name of your DataFormat class>.


The last thing you need to do is tell your restConfiguration that you want to use your DataFormat. When setting the jsonDataFormat you have to use the file name you specified on the classpath, in my case JsonFormat:

    restConfiguration()
        .jsonDataFormat("JsonFormat");


If you want a custom xmlDataFormat you simply implement the DataFormat interface again, but now with an XML (un)marshalling implementation.

Alternative class in Weld

In a Camel project with Weld I faced an issue while trying to unit test a route that called a remote system (Elasticsearch). The call was done with the help of a custom bean that handles the connection and the actual calls to the system. I did not want to call the actual system during my unit test, so I created a stub. However, to create the context and start the route I used the CamelCdiRunner, which meant that the original client bean was being loaded. So I had to replace the actual bean with my stub. Luckily Weld supports the use of alternative beans: in essence you can replace a bean with an alternative, in my case a stub.

The first thing you need to do is make sure that your bean is based on an interface. This enables Weld to successfully replace it with an alternative. Inject the bean based on the interface, not on the class itself.

@Inject
ElasticsearchCamelClientInterface client;

Next, create your mock and make sure that it implements the interface of the bean you want to replace, and add the @Alternative annotation.

@Alternative
public class MockElasticsearchCamelClient implements ElasticsearchCamelClientInterface {

  public MockElasticsearchCamelClient() {
  }

  // Stub code.....
}

Most of the documentation I found shows how to use an alternative bean via the beans.xml file. However, it is not possible to simply add a beans.xml file to the test resources; you can only specify one for your project. You could replace it dynamically with your build tool, but this is, imho, not a nice solution. Luckily you can also specify an alternative class in your test class by adding the “@Beans” annotation, which allows you to specify one or more alternative beans.

@RunWith(CamelCdiRunner.class)
@Beans(
  alternatives = {MockElasticsearchCamelClient.class}
)
public class RouteTest { ...

The alternatives attribute indicates that you want to run the CamelCdiRunner with the stub. When you now start the unit test you will see that both beans are loaded, but the mock is used to run your tests.

Happy testing!