Camel and XPath 2.0

Camel lets you create predicates and expressions using XPath. This can be quite convenient when you want to route a message based on the content of an XML message or extract a value from an XML payload. XPath itself is a powerful and useful tool that allows you to run complex queries against XML. Camel uses the default XPath implementation that comes with Java. Unfortunately this implements XPath 1.0 and not 2.0, which means you miss out on some powerful functions.

Luckily it is easy to load the Saxon XPath factory, which contains the XPath 2.0+ functionality. First add the correct dependency:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-saxon</artifactId>
    <version>2.19.0</version>
</dependency>

After this you have to add the following to your route builder class:

import net.sf.saxon.xpath.*;

…

// With camel-saxon on the classpath, the JAXP service loader resolves Saxon
// as the default XPathFactory, which Camel's XPath language then picks up.
XPathFactory fac = new XPathFactoryImpl();

from("direct:xmlSource")
        .setProperty("serviceID")
            .xpath("/service/service[lower-case(name) = 'lowercasename']/id", String.class);

Start your route and you will see a log statement saying that the Saxon XPathFactory is being used, and you won’t get an exception stating that the lower-case function does not exist.
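
To illustrate, the expression above would extract the id of the matching service from a payload along these lines (a made-up example; the element names follow from the XPath itself):

<service>
    <service>
        <name>LowerCaseName</name>
        <id>42</id>
    </service>
</service>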

Enable CORS with Camel Rest DSL

When creating a public REST API you have to take into account how, and by whom, your API is used. This is important not only for how you structure your API but also for how you expose it. One of the issues I’ve faced when hosting a public API is CORS. Users of the API got the following error, or a variation of it, when calling the API from JavaScript.

Cross-Origin Request Blocked: ….

In short, it means that you have to enable CORS, or “Cross-Origin Resource Sharing”. When a website wants to access a public API that is hosted on a different domain, the call is normally blocked by the “same-origin policy”. This policy, implemented by your browser, blocks any call that is not in the same domain (hostname and port) as the website shown in your browser. For APIs that belong to the website this is normally not a problem, because you can simply serve the API via the same domain. With public APIs, however, this is not possible. CORS allows your browser to make cross-domain requests.

When a browser has to get data from a public API that is hosted on a different domain, it uses CORS to check whether the request is allowed. The browser sends an OPTIONS request (the “preflight”) to the API stating that it wants to make a request; the custom headers you have specified for the call are part of this request. The public API has to accept this OPTIONS request. Only after the request has been accepted will the browser execute the actual request.

As the REST API must accept the preflight request, your API has to contain some configuration that handles CORS requests. With the Camel Rest DSL this is relatively easy: first you have to enable CORS, which is enough for simple requests. I, however, had to allow some custom headers as well, which meant I had to white-list them.

restConfiguration()
        .component("jetty")
        .host("0.0.0.0")
        .port(8888)
        .bindingMode(RestBindingMode.json)
        .enableCORS(true)
        .corsHeaderProperty("Access-Control-Allow-Headers", "Origin, Accept, X-Requested-With, Content-Type, Access-Control-Request-Method, Access-Control-Request-Headers,CustomHeader1, CustomHeader2")
        ;
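
The configuration applies to every endpoint you subsequently define with the Rest DSL. As a quick way to verify the CORS behaviour, a minimal, hypothetical endpoint could look like this (the path and route are made up for illustration):

rest("/api")
    .get("/status")
        .to("direct:status");

from("direct:status")
    .transform().constant("OK");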

ActiveMQ connection pooling using Weld

Last week I, again, had to create a connection pool for a Camel route that was sending messages to an ActiveMQ broker. This time I had to set it up in a project that uses Weld as its injection framework. I based my configuration on the ActiveMQ documentation.

The number of connections and concurrent consumers is based on the standard documentation, which states that you can easily use those numbers without any major impact. Increasing them is of course possible, but it will have an impact on your memory usage, so before changing them make sure your application requires it. I decided to go for the “standard” settings and change them whenever the application usage exceeds my expectations.

The documentation also states that a connection factory needs to be started and stopped for it to work correctly. Because I was using CDI, I had to look for a way to do this on startup and shutdown of the application. I found the “Initialized”/“Destroyed” annotations in combination with the “Observes” annotation. The observer makes sure that the method is called whenever the application scope is either initialized or destroyed.


package nl.janssen.coolproject.mq;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.apache.camel.component.jms.JmsConfiguration;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Destroyed;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.inject.Named;

public class ActiveMQComponentFactory {

  public void init(@Observes @Initialized(ApplicationScoped.class) Object init, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.start();
  }

  public void destroy(@Observes @Destroyed(ApplicationScoped.class) Object init, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.stop();
  }

  @Produces
  @Named("connectionFactory")
  @ApplicationScoped
  public ActiveMQConnectionFactory jmsConnectionFactory(){
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    factory.setUserName("admin");
    factory.setPassword("admin");

    return factory;
  }

  @Produces
  @Named("pooledConnectionFactory")
  @ApplicationScoped
  public PooledConnectionFactory createPooledConnectionFactory(@Named("connectionFactory") ActiveMQConnectionFactory factory){
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setMaxConnections(8);
    pooledConnectionFactory.setConnectionFactory(factory);
    return pooledConnectionFactory;
  }

  @Produces
  @Named("jmsConfiguration")
  @ApplicationScoped
  public JmsConfiguration createJmsConfiguration(@Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    JmsConfiguration jmsConfiguration =  new JmsConfiguration();
    jmsConfiguration.setConnectionFactory(pooledConnectionFactory);
    jmsConfiguration.setConcurrentConsumers(10);
    return  jmsConfiguration;
  }

  @Produces
  @Named("jms")
  @ApplicationScoped
  public ActiveMQComponent createActiveMQComponent(@Named("jmsConfiguration") JmsConfiguration jmsConfiguration){
    ActiveMQComponent component = new ActiveMQComponent();
    component.setConfiguration(jmsConfiguration);
    return component;
  }

}

And of course, to use the connection pool in your route you have to refer to the ActiveMQComponent; in my case it is bound under the name “jms”.


public class IndexerApi extends RouteBuilder {

  @Override
  public void configure() throws Exception {

    from("direct:somewhere")
        .to("jms:queue:randomQueueName")
  }
}

DeltaSpike with CDI and Camel

Last week I was looking for an easy way to inject property values into my code. In my project I was using Camel in combination with Weld (CDI). One of the main requirements was that properties could be injected from multiple “locations”: from a property file or from a system or environment property.

Because we are using OpenShift to run our applications it is easy to use environment variables to inject your configuration. I did not want to write my own code to resolve the properties, so I started looking around. Eventually I found DeltaSpike; with DeltaSpike you can easily inject properties into your code and it works seamlessly with CDI.

You can inject properties from the following four locations:

  • System properties (ordinal = 400)
  • Environment properties (ordinal = 300)
  • JNDI values (ordinal = 200, the base name is “java:comp/env/deltaspike/”)
  • Properties file values (ordinal = 100, the default filename is “META-INF/apache-deltaspike.properties”)

When the same property is configured in two or more locations, the value with the highest ordinal is used. So system properties will overwrite any configuration done in the properties file.
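
For example, suppose META-INF/apache-deltaspike.properties contains the (made-up) entry loglevel=INFO. Starting the JVM with the corresponding system property then wins, because ordinal 400 beats 100:

java -Dloglevel=DEBUG -jar myapp.jar   # resolves loglevel to DEBUG, not INFO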

Because I did not want to use the default property filename I implemented the following class in order to load my own property file:


import org.apache.deltaspike.core.api.config.PropertyFileConfig;

import javax.inject.Named;

@Named
public class MyCustomPropertyConfig implements PropertyFileConfig {

    @Override public String getPropertyFileName() {
      return "badasspropertyFile.properties";
    }

    @Override public boolean isOptional() {
      return true;
    }
}

There are four ways to load a property in your class (the first two are sketched below).

  • ConfigResolver methods for easy programmatic access to values
  • TypedResolver API for typed configuration values and precise control over resolution
  • @ConfigProperty for injection of configured values into beans
  • Interface-based configuration
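
A minimal sketch of the first two options, assuming made-up property names (“loglevel” and “timeout”):

import org.apache.deltaspike.core.api.config.ConfigResolver;

// ConfigResolver: plain programmatic access; returns null when the property is not set.
String logLevel = ConfigResolver.getPropertyValue("loglevel");

// TypedResolver: typed access with a default value.
Integer timeout = ConfigResolver.resolve("timeout")
        .as(Integer.class)
        .withDefault(30)
        .getValue();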

When using DeltaSpike in combination with CDI you can inject property values in a way similar to how you inject beans, using the “@ConfigProperty” annotation:


@Inject
@ConfigProperty(name = "loglevel", defaultValue = "INFO")
private String logLevel;

Custom (un)Marshalling with the Camel Rest DSL

Sometimes you want to use a custom marshaller to marshal or unmarshal your messages. In our case we needed to write all date fields in a specific format when marshalling our output message. We noticed that we could set a custom jsonDataFormat or a custom xmlDataFormat. These options allow you to specify your own DataFormat class, in essence your customized marshaller.

We needed to send out XML or JSON, depending on the client’s request (HTTP header: Accept). Because both formats implement the same interface, I will show only the JSON DataFormat. The dataFormat option expects a class that implements the “org.apache.camel.spi.DataFormat” interface. The interface defines two methods, marshal and unmarshal. When you only use the DataFormat for outgoing messages, implementing the marshal method is sufficient.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.spi.DataFormat;

import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;

public class JsonDataFormat implements DataFormat {

    private final ObjectMapper jacksonMapper;

    public JsonDataFormat() {
        jacksonMapper = new ObjectMapper();
    }

    @Override
    public void marshal(Exchange exchange, Object obj, OutputStream stream) throws Exception {
        // Write all date fields in the desired format ('SSS' is the millisecond pattern).
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        jacksonMapper.setDateFormat(dateFormat);

        stream.write(jacksonMapper.writeValueAsBytes(obj));
    }

    @Override
    public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
        // Only used for outgoing messages, so unmarshalling is not implemented.
        return null;
    }
}

Now you cannot simply add the class as a bean and refer to it; you need to register it as a DataFormat with the Camel context. This is done by adding a file to the META-INF directory on the classpath: under META-INF/services/org/apache/camel/dataformat, add a file named after your data format which contains the following line:

class=nl.janssen.dataformats.JsonDataFormat

The last thing you need to do is tell your restConfiguration that you want to use your DataFormat. When setting the dataFormat you have to use the file name you specified in the above-mentioned classpath directory, in my case JsonFormat.

restConfiguration()
      .component("jetty")
      .host("0.0.0.0")
      .port("8080")
      .bindingMode(RestBindingMode.auto)
      .xmlDataFormat("XmlFormat")
      .jsonDataFormat("JsonFormat");

If you want a custom xmlDataFormat, you simply implement the DataFormat interface again, now with an XML (un)marshalling implementation.
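
A minimal sketch of such an XML variant, assuming JAXB-annotated classes (an illustration, not the implementation from our project):

import org.apache.camel.Exchange;
import org.apache.camel.spi.DataFormat;

import javax.xml.bind.JAXBContext;
import java.io.InputStream;
import java.io.OutputStream;

public class XmlDataFormat implements DataFormat {

    @Override
    public void marshal(Exchange exchange, Object obj, OutputStream stream) throws Exception {
        // Create a context for the outgoing type and write the object as XML.
        JAXBContext context = JAXBContext.newInstance(obj.getClass());
        context.createMarshaller().marshal(obj, stream);
    }

    @Override
    public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
        // Only used for outgoing messages, so unmarshalling is not implemented.
        return null;
    }
}

It is registered the same way, via a file under META-INF/services/org/apache/camel/dataformat named after the data format (XmlFormat in the restConfiguration above).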

Alternative class in Weld

In a Camel project with Weld, I faced an issue while trying to unit test a route that called a remote system (Elasticsearch). The call was done with the help of a custom bean that handles the connection and the actual calls to the system. I did not want to call the actual system during my unit test, so I created a stub. However, to create the context and start the route I used CamelCdiRunner, which meant that the original client bean was being loaded. So I had to replace the actual bean with my stub. Luckily, Weld supports the use of alternative beans: in essence you can replace a bean with an alternative, in my case a stub.

The first thing you need to do is make sure that your class is based on an interface; this enables Weld to successfully replace your bean with an alternative. Inject the bean based on the interface and not on the class itself:

@Inject
@Named("elasticsearchCamelClient")
ElasticsearchCamelClientInterface client;
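
The interface itself can stay minimal; a hypothetical sketch (the method is made up for illustration):

public interface ElasticsearchCamelClientInterface {

    // Whatever operations the route needs from the client.
    String search(String query);
}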

Next, create your mock and make sure that it implements the interface of the bean you want to replace. Add the Alternative annotation:

@Alternative
@Named("elasticsearchCamelClient")
public class MockElasticsearchCamelClient implements ElasticsearchCamelClientInterface {

  public MockElasticsearchCamelClient() {
  }

  // Stub code.....

Most of the documentation I found shows how to use an alternative bean via the beans.xml file. However, it is not possible to simply add a beans.xml file to the test resources: you can only specify one for your project. You could replace it dynamically with your build tool, but that is, imho, not a nice solution. Luckily you can also specify an alternative class in your test class by adding the “@Beans” annotation, which allows you to specify one or more alternative beans.

@RunWith(CamelCdiRunner.class)
@Beans(
  alternatives = {MockElasticsearchCamelClient.class}
)
public class RouteTest { ...

The alternatives attribute indicates that you want to run the CamelCdiRunner with the stub. When you now start the unit test you will see that both beans are loaded, but that the mock is used to run your tests.

Happy testing!

Teams in Fabric8

One of the first things you will probably do when starting to work with Fabric8 is the creation of a Team:

A team is a collection of people working together on one or more microservices.

A team has a number of environments such as Dev, Test, Staging, Production. Each team can decide whatever environments it requires. A team always has a Development environment where the development tools run such as Jenkins, Nexus and the developer console.

A team contains a namespace, which must contain a Jenkins, Nexus and Gogs installation in order to get the full benefit of Fabric8. However, when you create a new team from the web console, you will notice that not everything is generated correctly. In the version of Fabric8 I was working with some parts were missing: key components such as Fabric8 itself were not deployed in my namespace, which prevented the installation of the “CD-pipeline” that contains the Jenkins, Nexus and Gogs pods. After some research I found that the workaround can be done from the command line. With the help of “kubectl”, the Kubernetes CLI, you first have to create a new namespace.

kubectl create namespace <namespace>
kubectl config set-context $(kubectl config current-context) --namespace=<namespace>
gofabric8 deploy

After the above commands have completed you will see a new namespace in your Fabric8 console. Select the “Create Team” button, choose the option “Existing Namespace” and pick the namespace you just created. Finish the installation of the team and you are ready to go.

[Screenshot: the Create Team dialog]

When running the “CD-pipeline” installation I also ran into the persistence claim issue mentioned in my previous blog post. In order to create the persistent volume claims you have to specify the namespace as well; otherwise the claims won’t be created.

gofabric8 volumes --namespace=<namespace>

Getting started with Fabric8

Last week I started to look into Fabric8. The first step I took was, of course, the installation of Fabric8. This post is a short walk-through of my installation and the problems that I faced. You have several options for installing Fabric8, as can be seen here. For simplicity I chose to install it on my Linux laptop (link), using the KVM driver. The installation of Fabric8 is quite simple. When you are using Fabric8 you can easily create a lot of small applications, and I noticed that the performance was not optimal when starting it with the default settings, so I used the following command:

gofabric8 start --cpus=4 --memory=8144

After running the “gofabric8 start” command you will be redirected to your browser:
[Screenshot: the clean Fabric8 workspace]
The first thing I tried to do was to create an application. The first time you do this, Fabric8 will ask you to set up a Gogs secret; just select the default entry “default-gogs-git”. Next, Fabric8 will ask you to run a CD pipeline, because in order to create a new application Fabric8 requires several applications to be running in your domain, including Jenkins, Gogs and Nexus.
At this stage I found out that the installation was not completely successful: the Gogs, Nexus and Jenkins pods did not start correctly. It appeared that the latest version of Kubernetes was not yet working correctly in combination with Fabric8, so in order to get all the pods up and running I had to downgrade my Kubernetes.

First stop your fabric8 instance and delete your minikube instance.

gofabric8 stop
minikube delete
minikube start --vm-driver=kvm --memory=8144 --cpus=1 --disk-size=50g --kubernetes-version v1.4.5
gofabric8 start

After this you will see that the pods try to start. In my case, however, they wouldn’t start correctly; I got the following error:

StorageClass "standard" not found

The persistent volume claim is either not made correctly or not made at all. The solution is simple: run the following command:

gofabric8 volumes

This command will create the persistent volume claims. When you go back to the Fabric8 console you will see that the pods start correctly.
[Screenshot: all pods up and running]

When you try to create an application you will no longer run into these problems. Complete the installation of the CD Pipeline and you can create your first application.

After that you can create a new application. Fabric8 provides a list of example projects; I, of course, chose the integration project (Camel with Spring Boot).

Note that the current version of Fabric8 dictates that you specify the project name in all lower-case. If you do not do this, the project will only be partially created.

[Screenshot: the new project wizard]

The next step is to select the pipeline you want to use. By default Fabric8 supplies several options, ranging from a simple deploy to a complete canary release with staging and approval steps, which automatically creates a test, staging and production environment.

[Screenshot: the available pipeline options]

Just select your choice, then sit back and wait until everything is created. You will notice that a Maven job is triggered and the code of your application is added to Gogs. If you go to the Jenkins instance of your environment, you will see that the Jenkins job has been created as well. After a while the first pod (the test environment) will be created and, in my case, eventually a staging environment as well.

[Screenshot: the project running with two environments]

That is it; I have shown you how to install Fabric8 and create your first project in it. My first impression after the installation is that Fabric8 is not yet finished: the GUI feels a bit flaky, sometimes you have to hit the refresh button to see some data, and not everything works out of the box. However, the options and potential that Fabric8 provides make it an extremely interesting solution for running your applications. The out-of-the-box provisioning, creation of environments, build pipelines etc. look really promising.

Kafka with Camel

Kafka is a popular open-source distributed streaming platform which prides itself on key features such as fault tolerance and replay options. My colleague Jan van Zoggel wrote a nice “getting started” blog post about Kafka, which can be found here.

In this blog post I will show you, in a few very easy steps, how you can start producing and consuming Kafka messages with Apache Camel.

First of all you need to install Kafka itself (https://kafka.apache.org/quickstart). After you have started the Kafka service you need to create a topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic theBigKafkaTestTopic

When the topic is created you can start producing messages on it. First, add the dependency to your pom file:

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-kafka</artifactId>
</dependency>

The following code snippet shows how to send a message to a Kafka topic. Note that a header is set (KafkaConstants.KEY); this is the key of the Kafka message, which determines the partition the message is written to.

from("timer:trigger")
 .transform().simple("ref:myBean")
 .setHeader(KafkaConstants.KEY,simple("bean:generateUUID?method=getGuid") )
 .log("${header.kafka.KEY}")
 .to("kafka:localhost:9092?topic=theBigKafkaTestTopic");

Consuming can be done by adding the configuration URL to your from() endpoint:


from("kafka:localhost:9092?topic=theBigKafkaTestTopic&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
 .log("${body}")
 .end();