Kafka with Camel

Kafka is a popular open-source distributed streaming platform that prides itself on key features such as fault tolerance and replay options. My colleague Jan van Zoggel wrote a nice “getting started” blog post about Kafka, which can be found here.

In this blog post I will show you, in a few easy steps, how to start producing and consuming Kafka messages with Apache Camel.

First of all you need to install Kafka itself (https://kafka.apache.org/quickstart). After you have started the Kafka service, you need to create a topic.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic theBigKafkaTestTopic

When the topic is created you can start producing messages on the topic. Add the dependency to your pom file.

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-kafka</artifactId>
</dependency>

The following code snippet shows how to send a message to a Kafka topic. Note that a header (KafkaConstants.KEY) is set; this is the key of the Kafka message, filled here with a generated unique identifier.

from("timer:trigger")
 .transform().simple("ref:myBean")
 .setHeader(KafkaConstants.KEY,simple("bean:generateUUID?method=getGuid") )
 .log("${header.kafka.KEY}")
 .to("kafka:localhost:9092?topic=theBigKafkaTestTopic");
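The route above fills the key through a bean named generateUUID with a method getGuid, which is not shown in this post. A minimal sketch of what such a bean might look like, assuming it simply returns a random UUID as text (the class name GenerateUuid is hypothetical):

```java
import java.util.UUID;

// Hypothetical bean behind "bean:generateUUID?method=getGuid".
// Assumption: the message key is just a random UUID rendered as a string.
public class GenerateUuid {

    // Returns a new random (version 4) UUID string on every call.
    public String getGuid() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        GenerateUuid bean = new GenerateUuid();
        System.out.println(bean.getGuid());
    }
}
```

For the route to find it, the bean would have to be registered under the id generateUUID, for example in the blueprint file.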

Consuming is done by using the Kafka endpoint URI, extended with the consumer options, in the from() of your route.


from("kafka:localhost:9092?topic=theBigKafkaTestTopic&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
 .log("${body}")
 .end();
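The consumer options are plain query parameters on the endpoint URI. When the values come from configuration, it can help to assemble the URI in one place. A small sketch, assuming a hypothetical helper class KafkaUri (not part of Camel) and the same option names as in the route above:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Camel: builds an endpoint URI in the
// "kafka:host:port?option=value&..." form used in this post.
// Values are not URL-encoded, so this only suits simple values.
public class KafkaUri {

    public static String build(String brokers, Map<String, String> options) {
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + brokers + (query.isEmpty() ? "" : "?" + query);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("topic", "theBigKafkaTestTopic");
        options.put("groupId", "testing");
        options.put("autoOffsetReset", "earliest");
        options.put("consumersCount", "1");
        System.out.println(build("localhost:9092", options));
    }
}
```

The resulting string can be passed directly to from(...).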


Contract first Rest API

As an integration consultant I have an extensive background in web services, specifically SOAP web services. The way I used to create a service was by first defining the contract: the WSDL and the XSD files. Based on those files I could generate a service without having to type any code manually.

Lately, however, I started working with REST services, both JSON and XML. I have written a post (Rest just got easy) on how to create a service using the REST DSL from Camel. In some situations, though, you might want to use a JAX-RS implementation, for example CXFRS. This means you first have to create the interface definition using POJOs. You could do this manually, as I did in my first couple of implementations, or you could generate the code based on an interface design (hooray for the lazy!). In this blog post I will show you how to generate your REST interface implementation from Swagger documentation (Swagger in a nutshell).

Using a Maven plugin you can easily generate the required code based on a Swagger specification file written in YAML. The plugin can be found through this dependency:

<dependency>
   <groupId>io.swagger</groupId>
   <artifactId>swagger-codegen-maven-plugin</artifactId>
   <version>2.1.6</version>
</dependency>

Because I am using CXFRS I added the dependency for that as well to my pom file:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-cxf</artifactId>
</dependency>

After you have added the dependencies to the pom file you can configure the plugin.

<plugin>
   <groupId>io.swagger</groupId>
   <artifactId>swagger-codegen-maven-plugin</artifactId>
   <version>2.2.0-SNAPSHOT</version>
   <executions>
      <execution>
        <goals>
          <goal>generate</goal>
        </goals>
        <configuration>
          <!-- specify the swagger yaml -->
          <inputSpec>src/main/resources/yaml/swagger.yaml</inputSpec>
          <!-- target to generate -->
          <language>jaxrs-cxf</language>
          <apiPackage>nl.rubix.api</apiPackage>
          <modelPackage>nl.rubix.api.model</modelPackage>
          <configOptions>
            <sourceFolder>src/main/java</sourceFolder>
          </configOptions>
        </configuration>
      </execution>
   </executions>
</plugin>

The plugin has several configuration options.

  • inputSpec – OpenAPI Spec file path
  • language – target generation language
  • output – target output path (default is ${project.build.directory}/generated-sources/swagger)
  • templateDirectory – directory with mustache templates
  • addCompileSourceRoot – add the output directory to the project as a source root (true by default)
  • modelPackage – the package to use for generated model objects/classes
  • apiPackage – the package to use for generated api objects/classes
  • invokerPackage – the package to use for the generated invoker objects
  • configOptions – a map of language-specific parameters (see below)
  • configHelp – dumps the configuration help for the specified library (generates no sources)

As you can see in my example, I specified the language as “jaxrs-cxf”, meaning that the generated sources will be specific to jaxrs-cxf. The modelPackage will contain the actual objects. If specified, the plugin will add the XML declarations to the model POJOs. The apiPackage will contain the interface specifications.

Create the YAML file at the specified location. My YAML file looks like this:

---
swagger: '2.0'
info:
  version: 0.0.0
  title: Simple API
paths:
  /:
    get:
      responses:
        200:
          description: OK

This describes a very simple interface that listens on the root (“/”) of the service path and returns an HTTP 200 if everything went as planned.

Now that the configuration is done, you can generate the sources with “mvn compile”. You will see the generated sources in the packages you specified.

If everything went correctly you can now specify your service. In my example I am using blueprint in combination with the Java DSL.

The blueprint definition for the cxfrs component:

<cxf:rsServer id="api" address="http://localhost:9092/test"
               serviceClass="nl.rubix.api.contractfirst.rest.DefaultApi"
               loggingFeatureEnabled="false">
   <cxf:providers>
     <bean class="org.apache.cxf.jaxrs.provider.json.JSONProvider" />
   </cxf:providers>
</cxf:rsServer>

And the route configuration:

from("cxfrs:bean:api?bindingStyle=SimpleConsumer").log("you did it!");

My example project can be found here: Download sources

Basic authentication in Camel

I needed to call a web service that uses basic authentication. All the solutions and suggestions I could find via Google were either complicated or a lot of work, until I looked at the CXF manual (http://camel.apache.org/cxf.html). As of version 2.12.3, you can do it by simply adding the username and password to the endpoint in your Camel route.

.to("cxf:bean:myCxfEndpoint?username=<username>&password=<password>")
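For background: HTTP basic authentication boils down to an Authorization header that carries the Base64-encoded username:password pair (RFC 7617). The sketch below shows that encoding with the JDK only; the class name BasicAuthHeader is hypothetical and nothing in it is specific to Camel or CXF.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustration only: builds the value of an HTTP "Authorization" header
// for basic authentication from a username and password.
public class BasicAuthHeader {

    public static String value(String username, String password) {
        String pair = username + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(pair.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Example credentials taken from the RFC, not from this project.
        System.out.println(value("Aladdin", "open sesame"));
        // prints "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="
    }
}
```

Because the credentials are only encoded and not encrypted, basic authentication is normally combined with HTTPS.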

JBoss Fuse on a Raspberry Pi 3

I was looking for a small server to run my personal Fuse installation on. After a quick search I found the Raspberry Pi to be a likely candidate: a cheap and energy-efficient device. Although it runs on an ARM processor, you can run an (ARM) JDK on it, which allows you to run a JBoss server and thus a Fuse server.

First install an operating system on your Pi. I chose Raspbian (lite) because of its simplicity and the fact that it is a Debian distro, which I like. You can get the latest version here. Installing it is simple and the site provides a good installation guide.

After the installation you will want to add a USB drive or an external HDD. The SD card is required for the installation of your OS, but it is not wise to install the rest of your software on it. More information about this can be found here. Make sure that your drive is correctly formatted (for example ext4). Check the location of your drive and (auto) mount it by editing fstab.

sudo fdisk -l
sudo vi /etc/fstab
# line to add to /etc/fstab:
/dev/sda1		/apps		ext4	user,exec	0	1

After rebooting your Pi, confirm that the mount has succeeded. When you run “mount -l” you should see “/dev/sda1” mounted on the “/apps” location. Change the ownership of the directory with “sudo chown pi:pi /apps”. Note that if you made a mistake in fstab, you may no longer be able to reach your Pi over SSH, because it will start in “safe mode”. In that case you have to access the Pi directly and fix the problem.

Now you are all set up to install the JDK and Fuse. Because the Raspberry Pi has an ARM processor you cannot install the usual JDK. You have to install one for the ARM processor; you can find the installer here.

The installation is quite straightforward. After you have unzipped the JDK you have to set the Java version. This can be done in the .profile file.

vi ~/.profile
# lines to add to ~/.profile:
export JAVA_HOME=<location>
export PATH=$PATH:$JAVA_HOME/bin

Confirm that Java is working (java -version).
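If you also want to check from within Java which runtime and architecture are actually picked up, a tiny program like the one below can help. It is only a sketch; the class name JavaCheck is made up for this example.

```java
// Prints the Java version and CPU architecture as seen by the running JVM.
public class JavaCheck {

    static String describe() {
        return "Java " + System.getProperty("java.version")
                + " on " + System.getProperty("os.arch");
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Compile and run it with “javac JavaCheck.java” followed by “java JavaCheck”; on the Pi the architecture part should mention ARM.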

Install the fuse server (installation guide).

The last step is simple: start the Fuse server. Mind that it will take a while before it has started, especially the first time.

Rest just got easy

As of version 2.14, Camel supports an easy alternative for implementing REST interfaces; Red Hat included it in Fuse 6.2. You can now use the REST DSL as an alternative to CXFRS. This means you no longer have to define the interface in a separate class. With the REST DSL you can specify the interface definition inside the route builder itself, so you can link your REST interface directly to your Camel route implementation. No extra injection etc. is required. You can have a REST interface up and running in just a few easy steps. In this blog post I will describe these steps and some extra options you might find useful.

The code I use in the examples can be found here. I have used blueprint in combination with the java DSL.

The REST DSL is similar to the standard DSL you use when defining your Camel routes. The configuration broadly consists of two parts. First there is the REST configuration.
In the configuration part you specify which Camel component you want to use to host your REST service. This component will handle the actual REST transport. For my example I used the jetty component.

Besides the Camel component you have to configure a few other things. The most important ones are listed below:
• host – the host on which the endpoint is hosted;
• port – the port on which the endpoint is hosted;
• bindingMode – dictates how the input of the REST calls is bound / mapped. Here you can specify that the body of a post method with a specific type must be converted to the corresponding POJO. In my example I set it to JSON, which means that JSON input is mapped to the POJO automatically.

restConfiguration()
	.component("jetty")
	.host("localhost")
	.port(9091)
	.dataFormatProperty("prettyPrint", "true")
	.bindingMode(RestBindingMode.json);

The second part of the rest configuration is the interface definition. Here you will see the biggest advantage when using the rest DSL.

Normally you start a Camel route with a simple “from()”; using the REST DSL you start with “rest()”.

The first thing you can set in the interface is the path on which your interface can be found. After that you can define your methods. The rest DSL supports the standard rest methods (post, get, delete etc.)

rest("/restDsl/")
	.get("/getOperation/{id}").id("getOperationRest").to("direct:getOperation")
	.post("/postOperatoin").id("postOperationRest").type(OrderInputPojo.class).to("direct:postOperation");

from("direct:getOperation").routeId("getOperation")
	.log("body: ${header.id}")
	.processRef("orderInputProcessor");
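In the get operation above, the {id} placeholder in the path is made available as a header, which is why the route logs ${header.id}. The following plain Java sketch only illustrates that idea of matching a path against a template; the PathTemplate class is hypothetical and this is not how Camel implements it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: extracts {placeholder} values from a request path.
public class PathTemplate {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\{(\\w+)\\}");

    // Returns the placeholder values by name, or empty if the path does not match.
    public static Optional<Map<String, String>> match(String template, String path) {
        // Build a regex: literal parts are quoted, each {name} matches one path segment.
        StringBuilder regex = new StringBuilder();
        List<String> names = new ArrayList<>();
        Matcher placeholder = PLACEHOLDER.matcher(template);
        int last = 0;
        while (placeholder.find()) {
            regex.append(Pattern.quote(template.substring(last, placeholder.start())));
            regex.append("([^/]+)");
            names.add(placeholder.group(1));
            last = placeholder.end();
        }
        regex.append(Pattern.quote(template.substring(last)));

        Matcher matcher = Pattern.compile(regex.toString()).matcher(path);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            headers.put(names.get(i), matcher.group(i + 1));
        }
        return Optional.of(headers);
    }

    public static void main(String[] args) {
        System.out.println(match("/restDsl/getOperation/{id}", "/restDsl/getOperation/42"));
    }
}
```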

When defining a post operation you can define the input type. By setting the POJO as the input type, the input (JSON or XML) is automatically bound to the POJO class, according to the configured bindingMode.

When defining an operation you have two options. You can either use the route functionality inside your rest definition or you can use the to() method to call another route. By calling another route you separate your interface definition from the orchestration which keeps your code clean.

References:
Camel RestDSL documentation

Calling a Soap Service with camel

Invoking a SOAP service in Fuse (Camel) is similar to exposing a SOAP service. The first steps are the same: you have to create a CXF endpoint. This is explained in full in this blog post.

You first generate the source classes based on the WSDL. After that you register it as a CXF endpoint in (in my case) the blueprint file of your project. The main difference is that you need to specify the address / endpoint of the service.

<cxf:cxfEndpoint id="newOrderEndpoint"  
address="http://localhost:8088/mockNewOrder" 
serviceClass="nl.rubix.service.neworder.NewOrderServiceOperationsPortType"/>

Calling the service from your route can be done by simply setting the CXF endpoint in your “to” statement.

.to("cxf:bean:theRefToYourCXFBean")

If you called the service now, CXF would choose the first operation in the WSDL. So we have to tell CXF which operation it needs to call. This is done by setting the header “OperationName” to the name of the operation you want.

.setHeader("OperationName", simple("getTheSpecialOrder"))

The last part is creating the body for the request message. One way to do this is in a processor. In my case the code looks like this:

ObjectFactory objFac = new ObjectFactory();
GetTheSpecialOrderRequest requestMsg = objFac.createGetTheSpecialOrderRequest();
requestMsg.setOrderNumber(BigInteger.valueOf(123));
exchange.getOut().setBody(requestMsg);		

If everything is set correctly you should be able to call the service. In order to test it properly I created a mock server in SoapUI, which is included in the sources of this project (project sources).

Removing camel HTTP Headers

When trying to invoke a JSON REST service from a route in my CXF SOAP web service using HTTP4, I got the following error:

“If you are forwarding/bridging http endpoints, then enable the bridgeEndpoint option on the endpoint:”

It appeared that Camel was trying to send more information than required. This is because CXF uses / generates some HTTP headers. These headers are fine for CXF, but when using HTTP4 they gave me the error.

While looking for a solution I stumbled upon this post from Red Hat.

By simply removing all the camel HTTP headers I was able to successfully call my json service. You can remove the headers with the following statement in your route:

.removeHeaders("CamelHttp*")
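The pattern CamelHttp* ends in a wildcard, so every header whose name starts with CamelHttp is removed while all other headers stay in place. A plain Java illustration of that prefix matching on a map of headers follows. The HeaderFilter class is hypothetical; it mimics the idea rather than Camel's actual implementation, which supports more pattern forms.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: removes headers by exact name or by "prefix*" wildcard.
public class HeaderFilter {

    public static void removeHeaders(Map<String, Object> headers, String pattern) {
        if (pattern.endsWith("*")) {
            String prefix = pattern.substring(0, pattern.length() - 1);
            headers.keySet().removeIf(name -> name.startsWith(prefix));
        } else {
            headers.remove(pattern);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("CamelHttpUri", "http://localhost:8181/cxf/newOrder");
        headers.put("CamelHttpPath", "/newOrder");
        headers.put("Content-Type", "application/json");

        removeHeaders(headers, "CamelHttp*");
        System.out.println(headers.keySet()); // only Content-Type is left
    }
}
```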

Fuse contract first webservice

In this blog post I will explain how you can create a SOAP web service in Fuse. You can implement a SOAP interface with the help of CXF. Creating a SOAP web service with CXF can be done in two ways: contract first or code based. Code based means that the contract is derived from the Java code. Contract first means that you first specify the contract (WSDL, XSD) and base the implementation on that contract. Because contract first gives you complete control over the interface contract, I think it is the best way to go.

In this example I have chosen to use blueprint in combination with the java DSL.

So the first thing, and in my opinion the hardest part, is creating the WSDL. You can find mine, together with the sources of the project, here.

The next thing you need to do is generate the code based on the WSDL. This is done with Maven. By adding the following snippets to your pom.xml file, Maven will generate the Java code that represents the WSDL.

First add the CXF dependency:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-cxf</artifactId>
    <version><your camel version></version>
</dependency>

Add the plugin that generates the java source code for the contract. Make sure it points to the location of your WSDL and that the WSDL is inside the package:

<plugin>
	<groupId>org.apache.cxf</groupId>
	<artifactId>cxf-codegen-plugin</artifactId>
	<version>3.0.4.redhat-620133</version>
	<executions>
		<execution>
			<id>generate-sources</id>
			<phase>generate-sources</phase>
			<configuration>
				<sourceRoot>target/generated/src/main/java</sourceRoot>
				<wsdlOptions>
					<wsdlOption>
						<wsdl>${project.basedir}/src/main/resources/wsdl/newOrder.wsdl</wsdl>
						<extraargs>
							<extraarg>-impl</extraarg>
						</extraargs>
					</wsdlOption>
				</wsdlOptions>
				<fork>true</fork>
				<additionalJvmArgs>-Djavax.xml.accessExternalSchema=jar:file,file</additionalJvmArgs>
			</configuration>
			<goals>
				<goal>wsdl2java</goal>
			</goals>
		</execution>
	</executions>
</plugin>

After these changes are made you have to run “mvn generate-sources” in order to generate the sources. After this is done, you will probably need to update your project inside your IDE. Otherwise you might not see your generated sources in the project structure.

After the sources have been generated you can add the endpoint to the blueprint of your project. Do not forget to add the CXF namespace. By specifying the endpoint inside your blueprint you can later use it inside your route.

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:cxf="http://camel.apache.org/schema/blueprint/cxf"
           xmlns:camel="http://camel.apache.org/schema/blueprint"
           xsi:schemaLocation="
                http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
                http://camel.apache.org/schema/blueprint/cxf http://camel.apache.org/schema/blueprint/cxf/camel-cxf.xsd
                http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">

    <cxf:cxfEndpoint id="newOrderEndpoint" address="/newOrder/" serviceClass="nl.rubix.service.neworder.NewOrderServiceOperationsPortType"/>

    <!-- the rest of your blueprint (camelContext, beans) goes here -->
</blueprint>

Now you are ready to create your endpoint in the Camel route. Add the following statement to your route (in my case in the Java DSL):

from("cxf:bean:newOrderEndpoint")

If you want to see the results of your work you could deploy the bundle to your server. After this is done you should be able to see the service (and the contract) listed at http://localhost:8181/cxf.

Now that the configuration is done we can actually start implementing the service. The contract in this example specifies two operations with, for the sake of simplicity, the same response (XSD element). So we need to change the route so that it can process the two operations.

One way of doing this is by using the recipientList. This allows you to dynamically call another route. If you base the recipientList on the operationName header, Camel will send the message to the corresponding route. All the HTTP / SOAP headers are accessible in the route, so it is quite straightforward.

from("cxf:bean:newOrderEndpoint")
.recipientList(simple("direct:${header.operationName}"));

from("direct:getOrder")
.log("operation: getTheOrder")
.processRef("contractFirstProcessor");

from("direct:getTheSpecialOrder")
.log("operation: GetTheSpecialOrder")
.processRef("getTheSpecialOrderProcessor");

Each operation in a WSDL normally has a different implementation behind it. Within Fuse I solved this by creating sub routes and a separate processor for each operation. By adding the sub routes you can create specific orchestration per operation, for example calling a different database procedure or underlying web service.
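The recipientList effectively turns the operation name into a lookup: the header value selects the sub route. The same idea in plain Java, as a sketch with a hypothetical OperationDispatcher class and stub handlers standing in for the two processors (none of this is Camel API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustration only: routes a message to a handler based on the operation name,
// comparable to recipientList(simple("direct:${header.operationName}")).
public class OperationDispatcher {

    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    public void register(String operationName, Function<String, String> handler) {
        handlers.put(operationName, handler);
    }

    public String dispatch(String operationName, String body) {
        Function<String, String> handler = handlers.get(operationName);
        if (handler == null) {
            // Comparable to sending to a "direct:" endpoint that has no route behind it.
            throw new IllegalArgumentException("No handler for operation: " + operationName);
        }
        return handler.apply(body);
    }

    public static void main(String[] args) {
        OperationDispatcher dispatcher = new OperationDispatcher();
        // Stub handlers; the real work would be done by the processors.
        dispatcher.register("getOrder", body -> "order " + body);
        dispatcher.register("getTheSpecialOrder", body -> "special order " + body);

        System.out.println(dispatcher.dispatch("getOrder", "123"));
        System.out.println(dispatcher.dispatch("getTheSpecialOrder", "123"));
    }
}
```

One thing this makes visible is that an operation without a matching handler (or sub route) fails at runtime, so every operation in the WSDL needs its own direct route.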

The final piece of the puzzle is of course the transformation of the input message and generating the output message. In this case I made a stub that maps part of the input message to the output message. This is done in the processors.

Accessing the input message in the processor can be done in one statement. The generated code contains the class that specifies the input message of the operation, in this case GetOrderRequest. The only thing you need to do is retrieve the body as that class. This can be done with the following statement:

GetOrderRequest input = exchange.getIn().getBody(GetOrderRequest.class);

In order to create the output message you can also use the classes generated by CXF. If everything has been generated successfully you should have an ObjectFactory class with which you can create the output. I mapped the input to the output (OrderNumber) and made sure the generated XML conformed to the contract specifications.

ObjectFactory objFac = new ObjectFactory();
GetOrderResponse outbody = objFac.createGetOrderResponse();
outbody.setOrderNumber(input.getOrderNumber());
CustomerType newCustomer = new CustomerType();
newCustomer.setName("Dirk");
newCustomer.setLastname("Janssen");
outbody.setCustomer(newCustomer);
OrderType order = new OrderType();
order.setPrice(12);
order.setProductName("TV");
OrderListType newOrderList = new OrderListType();
newOrderList.getOrder().add(order);

outbody.setOrderList(newOrderList);

After this you only have to pass the output message back to CXF:

exchange.getOut().setBody(outbody);

Generate a similar processor for the other operation and you are done. Deploy the service to the server and test it (for example, with soapUI).
