Reactive Micro Services

Microservices are a hot topic these days. A lot has been written about them and a lot of people and companies have started building them. Some say they the concept is nothing new and I have yet to find a clear and widely accepted definition of a microservice. The one I like I have found in a book written by Jonas Bonér (link):

“Microservices-Based Architecture is a simple concept: it advocates creating a system from a collection of small, isolated services, each of which owns their data, and is independently isolated, scalable and resilient to failure.”

I am not saying that the definition is perfect however I liked it because it focuses on scalability and resilience. The last few services I built lately had to be able to handle a high load. This meant that the service had to be scalable and resilient. We build the APIs on the Openshift platform using apache camel (Fuse) as framework. Because of the stateless implementation and the ability of Openshift to ramp up containers with great ease it scales perfectly. However the problem of scalability and responsiveness under high load got me looking for other ways of building similar interfaces. While doing my research I found the term “reactive microservices” and “reactive programming”.

Reactive Programming

What is reactive programming? I found a nice page explaining “reactive programming” to my liking: link

In short it states that:

“Reactive programming is programming with asynchronous data streams.”

This is not something new, it can be done with a Hub and Spoke system in JMS. You subscribe to a queue and you “react” as soon as some message has been placed on the queue. However reactive programming goes further than that. It sees everything as a stream of data. From the call to a database to the simple retrieval of a value from a property file. All is handled as a stream of data. This way of programming has it pros and cons. In post I try to let you see why it is beneficial for micro services.

One of the first hits I came across while looking for a good definition / explanation of reactive programming was the reactive manifesto. Although I find that it is more targeted at reactive systems it is a nice place to start when first looking into “Reactive” as the manifesto simplifies the concept and thus the explanation a lot.

Reactive manifesto

According to the reactive manifesto reactive systems are Responsive, Resilient, Message Driven and Elastic. A reactive service is in essence a reactive system and it must adhere to the same principles.

 

reactive-traits

A reactive service is always responsive. This means that a service must always give a response, no matter what the state of the service is. It shouldn’t be possible for a system to give a time out or not respond at all. This allows dependent system to react accordingly in a timely fashion.

Being responsive even when the service is failing means that the service needs to be resilient. Resilient to failure and thus change. Errors should be handled as fast and thus close as possible to the source. The service should also be able to restore itself. For example, when a database connection is no longer available the system should still be responding. When the connection is restored the service should immediately function properly again.

A reactive service should be able to handle a varying load. This means that it should be able to scale up or down given the amount of requests being sent at a given point. By definition the service should be able to scale by getting more resources or it should be possible to run multiple instances of the same service. The service should be elastic when it comes to handling the load.

Reactive services are message or data driven, by passing messages between the components of a service you can guarantee loose coupling and isolation. Both convenient when you need to scale out the application.

So how does it all translate to a reactive microservice, and reactive programming.
In order to explain it properly I am simplifying the concept a bit. The most important part of reactive programming is that everything is a stream of data. All data that is being received is received as a stream of data. This data can be anything and like a stream it is not pulled but pushed. It does not matter if something is only done once, it is still a stream of data. The subscriber of the stream decides for itself what to do with the “event / message”. It can alter the data, call a new service or just forwards it to the requester.

Because the subscriber is decoupled from the source of the stream one can subscribe more than once. This means that you can easily scale up your subscribers if needed. And an important part in this, is that all messages are processed in a non-blocking fashion. Retrieving the data and processing it is decoupled from each other so the one cannot block the other. This means that by default your transactions are handled in an asynchronous way.

That being sad when a reactive microservice calls a backend service it does not wait for the result, it does not block any further calls, it simply subscribes to the result and continuous processing other requests. As soon as the result comes in via the stream it processes that result and returns it back to the actual client.

All this means that building a micro service conform the reactive principles means that you build a scalable and resilient service.

Reactive components

These days there are a lot of libraries, tool-kits, programming languages that support reactive programming. Some of the more populars once are:

  • Vert.x;
  • Ratpack;
  • ReactiveX (with libraries for a multitude of languages like, .net java, javascript etc.);
  • Akka Streams;
  • React;
  • etc.

These frameworks / toolkists can be used to create reactive microservices. The ones that interested me the most were ReactiveX and Vert.X.

Vert.x

First off, Vert.x. According to the website Vert.x is event driven and non-blocking. It is a toolkit which developers can use to implement network driven applications. You can create a complete application with Vert.x or only use portions of it. But how does this work?

The smallest unit of deployment of Vert.x is called a verticle. A verticle is a piece of code that processes events. A Vert.x application consists out of one or more of these verticles. A single verticle receives a specific type of event and processes this event. An event can be based on almost anything, a network buffer, a timer, an incoming rest call or a message from another verticle.

In short a Vert.x application consists out of verticles that process events. This makes a Vert.x application event or message driven. These verticles run on an event loop. This means that the events are processed on that event loop.

Vert.X-event-loop

When starting the application Vert.x creates several event loops (the default is 2 per cpu core). A event loop runs in a single thread. This means that when an event is taking too long to being processed it is blocking the loop. This will result in delaying or even blocking the processing of all other events. When you do block the event loop the application will slow down significantly or in worst case scenario it will stop working correctly. That is why the most important reason that the golden rule of Vert.x is: “Don’t block the event loop”. When an verticle takes up to much time of the event loop, Vert.x will automatically log this as an error.

ReactiveX

reactivex_logo

ReactiveX is an API that has been implemented in several programming languages, one of them being Java (RxJava). Several reactive libraries have some form of support for or integration with reactiveX. Vert.x is one of the libraries that has integrated reactiveX.

ReactiveX implements the observer pattern, this is a pattern in which an object (in reactive the observable) “observes” a stream of data and distributes it among its subscribers (can be zero or more). A observable can emit three types of events:

  • onNext;
  • onCompletion;
  • onError.

onCompletion and onError are emitted only once or never. Once they have been emitted the observable is closed. An onNext event, the actual data you are interested in, can be emitted N times or never. This means that when subscribing to an observable you do not know if you ever get your onNext (data) event. You might as well receive an onError or onCompletion event before you ever receive the onNext event. This means that your code must act accordingly. It must be able to handle any of the three even types and act accordingly.

An observable can be based on anything, from a simple list to a service call to a remote rest API or a database call. By implementing the observable pattern you decouple the call from the response. This means that no thread will be explicitly blocked when it is waiting for a response. When you implement a micro service according to this principle you can create a highly flexible and responsive service. Even under high load the API will be able to handle your calls, if the backend however is not responding you might not like the answer.

In this blogpost I tried to explain Reactive microservices and how you can implement them on a very high overview. In the coming blogposts I will try to show you with small code examples how you can actually implement them.

 

Advertisements

Part 1 Creating a RestFull Interface in Vert.x

In this series of blog posts I will be explaining how you can create a reactive microservice using Vert.x and ReactiveX. With these types of posts, it is common to use a API that has some fictitious purpose. Because I worked on the Flight APIs of Schiphol (Dutch airport) and the required APIs are free. I will be creating a simple API that determines at what time and how you need to go to Schiphil to not miss your flight.

These days creating a rest interface in Java is easy, a lot of frameworks exist that do a lot of boiler plaiting for you. Vert.x is just like that, creating a rest interface in Vert.x is easy. It can be done with just a couple lines of code.

As mentioned in the article (link) a Vert.x application exists out of one or more verticles. These verticles are deployed on the event loop of vert.x. So without further ado let’s get cracking.

First, we need to create a Verticle that implements a rest interface. Vert.x has a need library for that so besides the normal Vert.x dependencies we must add the vert.x web dependency as well.

 
<properties> 
    <vertx.version><Latest and greatest></vertx.version> 
</properties> 
 

<dependencies> 
  <dependency> 
    <groupId>io.vertx</groupId> 
    <artifactId>vertx-core</artifactId> 
    <version>${vertx.version}</version> 
  </dependency> 

  <dependency> 
    <groupId>io.vertx</groupId> 
    <artifactId>vertx-web</artifactId> 
    <version>${vertx.version}</version> 
  </dependency> 
</dependencies> 

So, we have to dependencies, the vertx-core and vertx-web. These are all the dependencies we need for now.

You can create a verticle by implementing the Verticle interface. The easiest way to do this is by extending the AbstractVerticle class. There are several other verticle types, but the AbstractVerticle will do for our rest service.

The AbstractVerticle contains the start method which you need to implement. In this method you dictate what the verticle needs to do on startup. For our rest verticle this means that we need to define that the verticle is going to start a HTTP server with some routing options.

public class VertxRestService extends AbstractVerticle { 

  @Override 
  public void start(Future future) { 

      vertx.createHttpServer() 
          .requestHandler(createRouter()::accept) 
          .listen(8080, httpServerAsyncResult -> { 
              if (httpServerAsyncResult.succeeded()) { 
                  future.complete(); 
              } else { 
                  future.fail(httpServerAsyncResult.cause()); 
          } 		

      }); 

  } 

    .... 

}   

In the example above, I created a http server that listens to the port 8080. When this verticle is deployed it will start listening on that port. You will also notice that Vert.x uses a Future class in the signature.

Because everything is loosly coupled in Vert.x, even the deployment of the verticles itself you do not know when something is completed / deployed. However, in some scenarios you want to control what is being started when. Sometimes you want to wait for the deployment to be finished. For this the Future.class exists. You can start a verticle with the Future class in the signature (as seen in the example). When deploying the verticle the deployment is only done when the future.complete() method is called. In this case, when the httpserver has been started.

The second part of creating a Rest interface is the routing part. When creating a rest service, you must define the methods that the service offers, the GET, POST, PUT, etc methods. For this you need to implement the vert.x Router class.

private Router createRouter() { 

        Router router = Router.router(vertx); 
        router.get("/flight/:id/car").handler(routingContext -> { 
            routingContext.response().end("Here be dragons!"); 
        }); 
        router.get("/flight/:id/train").handler(routingContext -> { 
            routingContext.response().end("Here be dragons!"); 
        }); 
        return router; 
} 

This service supports 2 methods, it can calculate the time you need to leave when traveling by car or by train.

Now we are almost done, we have all that is required to run a rest service in Vert.x. The only thing that leaves us is the deployment.

Deploying and starting a Vert.x application is easy. The way I find the easiest is by using a main method that deploys the verticles.

public class VertxMain  { 
 

    private static final Logger logger = LogManager.getLogger(VertxMain.class); 
 

    public static void main(String... args){ 
        VertxOptions options = new VertxOptions(); 
        Vertx vertx = Vertx.vertx(options); 
 
        Consumer runner = clazz -> { 
            vertx.deployVerticle(clazz.getName(), stringAsyncResult -> { 
                if (stringAsyncResult.succeeded()){ 
                    logger.log(Level.INFO, "Succesfully deployed " + clazz.getSimpleName()); 
                } else { 
                    logger.log(Level.ERROR, "Failed to deploy" + stringAsyncResult.cause()); 
                } 
            }); 
        }; 

        Class[] clazzes = {VertxRestService.class}; 
 
        Stream.of(clazzes).forEach(runner); 

    } 

} 

The main method first instantiates an instance of Vertx. This is the core component and the start of every vertx application. You can instantiate the Vertx object in two ways. The regular and clustered. This means that you can create a clustered vertx application for extra flexibility and scalability. With the VertxOptions object you can configure your application.

A verticle can be deployed by calling the vertx.deployVerticle method. In the example I pass the class and a async result handler. With the result handler you can verify if the verticle has been deployed sucesfully. This is done with the above mentioned Future object. Each verticle that you create needs to be deployed in the Vertx instance.

That is it, we have our verticle with the rest interface and the httpserver. We have a main method that can deploy the verticle. The only thing we need to do is run the main method. This can ofcourse be done by creating the jar and running it in your JVM. But you can also run it from your IDE. once it is started you can test your rest interface by calling the get method.

The sources of this post can be found in my git repository here

Enable CORS with Camel Rest DSL

When creating a public rest API you have to take into account how and who is using your API. This is not only important in how you structure your API but also how you expose your API. One of the issues I’ve faced with hosting a public API is CORS. Users of the API got the following error, or a variation of, it when calling the API from javascript.

Cross-Origin Request Blocked: ….

In short, it means that you have to enable CORS or “Cross-origin resource sharing”. When a website wants to access a public API that is hosted on a different domain it is normally blocked by the “Same-origin policy”. This policy, implemented by your browser blocks any call that is not in the same domain (hostname and port) as the website that is shown in your browser. For APIs that belong to the website this is normally not a real problem because you can simply serve the API via the same domain. However with public APIs this is not possible. With CORS your browser allows you to make cross domain requests.

When a browser has to get data from a public API that is hosted on a different domain it uses CORS to checks if the request is allowed. The browser sends an OPTIONS request to the API stating it wants to make a request. Part of the request are the custom headers you have specified for the call. The public API has to accept this OPTIONS request. After the request has been accepted the browser will execute the actual request.

As the rest API must accept the request it means that your API has to contain some configuration that handles the CORS requests. With the camel rest DSL this is relatively easy. First you have to enable the CORS, for simple requests this is enough. I however had to allow some custom headers as well. This meant that I had to white list my custom headers.

restConfiguration()
        .component("jetty")
        .host("0.0.0.0")
        .port(8888)
        .bindingMode(RestBindingMode.json)
        .enableCORS(true)
        .corsHeaderProperty("Access-Control-Allow-Headers", "Origin, Accept, X-Requested-With, Content-Type, Access-Control-Request-Method, Access-Control-Request-Headers,CustomHeader1, CustomHeader2")
        ;

Custom (un)Marshalling with the Camel Rest DSL

Sometimes you want to use a custom marshaller to marshal or unmarshal your messages. In our case we needed to parse all date fields to a specific format when marshalling our output message. We noticed that we could set a custom jsonDataFormat or a custom xmlDataFormat. These options allow you to specify your own DataFormat class, in essence your customized marshaller.

We needed to send out XML or JSON, depending on the clients request (http header: Accept). Because you implement the same interface I will show only the JSON DataFormat. The dataFormat options expects a class that implements the “org.apache.camel.spi.DataFormat” interface. The interface defines two methods, marshal and unmarshal. When you only use the DataFormat for outgoing messages implementing the marshal method is sufficient.

public class JsonDataFormat implements DataFormat {

    private final ObjectMapper jacksonMapper;

    public JsonDataFormat() {
        jacksonMapper = new ObjectMapper();
    }

    @Override
    public void marshal(Exchange exchange, Object obj, OutputStream stream) throws Exception {

        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss");
        jacksonMapper.setDateFormat(dateFormat);

        if (view != null) {
            ObjectWriter w = jacksonMapper.writerWithView(view);
            w.writeValue(stream, obj);
        } else {
            stream.write(jacksonMapper.writeValueAsBytes(obj));
        }
    }

    @Override
    public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
        return null;
    }

Now you cannot simply add the class as a bean and refer to it. You need to add it as a DataFormat to the camel context. This can be done by adding the following file to your META-INF directory: in the classpath: services.org.apache.camel.dataformat add a file which contains the following line:

class=nl.janssen.dataformats.JsonDataFormat

The last thing you need to do is tell your restConfiguration that you want to use your dataFormat. When setting the dataFormat you have to use the file name you specified in the above mentioned classpath. In my case JsonFormat.

restConfiguration()
      .component("jetty")
      .host("0.0.0.0")
      .port("8080")
      .bindingMode(RestBindingMode.auto)
      .xmlDataFormat("XmlFormat")
      .jsonDataFormat("JsonFormat");

If you want a custom xmlDataFormat you simply have to implement the DataFormat interface again but now with an XML (un)marshalling implementation.

Contract first Rest API

As an integration consultant I have an extensive background in web services. To be specific SOAP enabled web services. The way I used to create a service was by first defining the contract, the WSDL and the XSD files. Based on those files I could generate a service without the need for me to manually type any code.

However lately I started working with Rest services, both JSON and XML. I have written a post (Rest just got easy) on how to create a service using the rest DSL from camel. However in some situations you might want to use a JAX-RS implementation, for example using CXFRS. This means you have to first create the interface definition using POJO’s. You could do this the manual way, like me in my first couple of implementations. Or you could generate the code based on an interface design (Hooray for the lazy!). In this blog post I will show you how to generate your rest interface implementation using swagger documentation (Swagger in a nutshell).

Using a maven plugin you can easily generate the required code based on a yaml styled swagger specification file. The plugin can be found through this dependency:

<dependency>
   <groupId>io.swagger</groupId>
   <artifactId>swagger-codegen-maven-plugin</artifactId>
   <version>2.1.6</version>
</dependency>

Because I am using CXFRS I added the dependency for that as well to my pom file:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-cxf</artifactId>
</dependency>

After you have added the dependencies to the pom file you can configure the plugin.

<plugin>
   <groupId>io.swagger</groupId>
   <artifactId>swagger-codegen-maven-plugin</artifactId>
   <version>2.2.0-SNAPSHOT</version>
   <executions>
      <execution>
        <goals>
          <goal>generate</goal>
        </goals>
        <configuration>
		<!-- specify the swagger yaml -->
		<inputSpec>src/main/resources/yaml/swagger.yaml</inputSpec>
		<!-- target to generate -->
		<language>jaxrs-cxf</language>
		<apiPackage>nl.rubix.api</apiPackage>
		<modelPackage>nl.rubix.api.model</modelPackage>
		<configOptions>
		<sourceFolder>src/main/java</sourceFolder>
		</configOptions>
		</configuration>
     </execution>
   </executions>
</plugin>

The plugin has several configuration options.

  • inputSpec – OpenAPI Spec file path
  • language – target generation language
  • output – target output path (default is ${project.build.directory}/generated-sources/swagger)
  • templateDirectory – directory with mustache templates
  • addCompileSourceRoot – add the output directory to the project as a source root (true by default)
  • modelPackage – the package to use for generated model objects/classes
  • apiPackage – the package to use for generated api objects/classes
  • invokerPackage – the package to use for the generated invoker objects
  • configOptions – a map of language-specific parameters (see below)
  • configHelp – dumps the configuration help for the specified library (generates no sources)

As you can see in my example I specified the language to be “jaxrs-cxf”. Meaning that the generated sources will be specific for jaxrs-cxf. The modelPackage will contain the actual objects. If specified the plugin will add the XML declarations to the model POJO’s. The apiPackage will contain the interface specifications.

Create the yaml file on the specified location. My yaml file looks like this:

---
swagger: '2.0'
info:
  version: 0.0.0
  title: Simple API
paths:
  /:
    get:
      responses:
        200:
          description: OK

Meaning, a very simple interface that listens to the root (“/”) of servic epath and will return a http 200 if everything went as planned.

Now that the configuration is done, you can generate the sources with “mvn compile”. You will see the generated sources in the packages you specified.

If everything went correct you can now specify your service. In my example I am using blueprint in combination with the java DSL.

The blueprint definition for the cxfrs component:

<cxf:rsServer id="api" address="http://localhost:9092/test"
               serviceClass="nl.rubix.api.contractfirst.rest.DefaultApi"
               loggingFeatureEnabled="false">
   <cxf:providers>
     <bean class="org.apache.cxf.jaxrs.provider.json.JSONProvider" />
   </cxf:providers>
</cxf:rsServer>

And the route configuration:

from("cxfrs:bean:api?bindingStyle=SimpleConsumer").log("you did it!");

My example project can be found here: Download sources

Rest just got easy

As of version 2.14 camel supports an easy alternative for the implementation of rest interfaces, RedHat implemented it in Fuse 6.2 You can now use the rest DSL as an alternative for CXFRS. This means you no longer have to define the interface in a separate class. With the rest DSL you can specify the interface definition inside the route builder itself. This means that you can link your rest interface directly to your camel route implementation. No extra injection etc. is required. You can have a rest interface up and running in just a few easy steps. In this blog post I will describe these steps and some extra options you might find useful.

The code I use in the examples can be found here. I have used blueprint in combination with the java DSL.

The rest DSL is similar to the standard DSL you use when defining your camel routes. The configuration globally consists out of two factors. First there is the rest configuration.
In the configuration part you tell which Camel component you want to use to host your rest service. This component will handle the actual rest transportation. For my example I used the jetty component.

Besides the camel component you have to configure some other stuff. The most important ones are listed below:
• host – the location on which the endpoint needs to be hosted;
• port – the port on which the endpoint is hosted;
• bindingmode – dictates the way the input of the rest calls is binded / mapped. Here you can specify that a post method with a specific type must be cast to the corresponding pojo. In my example I set it to JSON. This means that the JSON input is mapped to the pojo automatically.

restConfiguration()
	.component("jetty")
	.host("localhost")
	.port(9091)
	.dataFormatProperty("prettyPrint", "true")
	.bindingMode(RestBindingMode.json);

The second part of the rest configuration is the interface definition. Here you will see the biggest advantage when using the rest DSL.

Normally you start a camel route with a simple “from()”, using the rest DSL you start with “rest()”.

The first thing you can set in the interface is the path on which your interface can be found. After that you can define your methods. The rest DSL supports the standard rest methods (post, get, delete etc.)

rest("/restDsl/")
	.get("/getOperation/{id}").id("getOperationRest").to("direct:getOperation")
	.post("/postOperatoin").id("postOperationRest").type(OrderInputPojo.class).to("direct:postOperation");

from("direct:getOperation").routeId("getOperation")
	.log("body: ${header.id}"
	.processRef("orderInputProcessor");

When defining a post operation you have the possibility to define the input type. By setting the POJO as the input type you can automatically cast (bindingmode) the input (json or XML) to the POJO class.

When defining an operation you have two options. You can either use the route functionality inside your rest definition or you can use the to() method to call another route. By calling another route you separate your interface definition from the orchestration which keeps your code clean.

References:
Camel RestDSL documentation

Removing camel HTTP Headers

When trying to invoke a json rest service from a route in my CXF soap web service using HTTP4. I got the following error:

“If you are forwarding/bridging http endpoints, then enable the bridgeEndpoint option on the endpoint:”

It appeared that camel was trying to send more information then required. This because CXF uses / generates some HTTP headers. These headers are fine for CXF but when using HTTP4 it gave me the error.

In order to solve this I stumbled upon this post of RedHat.

By simply removing all the camel HTTP headers I was able to successfully call my json service. You can remove the headers with the following statement in your route:

.removeHeaders(“CamelHttp*”)