Camel and XPath 2.0

Camel lets you create predicates and expressions using XPath. This can be quite convenient when you want to route a message based on the content of an XML payload or extract a value from it. XPath itself is a powerful tool that allows you to run complex queries against XML. By default Camel uses the XPath implementation that ships with Java, which unfortunately only supports XPath 1.0, not 2.0. This means that you miss out on some powerful functions.
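
For example, routing on the content of an XML payload with a plain XPath 1.0 predicate already works out of the box. The element names below are made up purely for illustration:

// inside a RouteBuilder's configure() method
from("direct:orders")
    .choice()
        .when(xpath("/order/customer[@type = 'gold']"))
            .to("direct:priorityHandling")
        .otherwise()
            .to("direct:normalHandling");

Functions such as lower-case(), however, were only introduced in XPath 2.0.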

Luckily it is easy to load the Saxon XPath factory, which contains the XPath 2.0+ functionality. First add the correct dependency:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-saxon</artifactId>
    <version>2.19.0</version>
</dependency>

After this you have to add the following to your route builder class:

import net.sf.saxon.xpath.*;

…

// Saxon's XPathFactory implementation (pulled in via the camel-saxon dependency)
XPathFactory fac = new XPathFactoryImpl();

from("direct:xmlSource")
        .setProperty("serviceID")
            .xpath("/service/service[lower-case(name) = 'lowercasename']/id", String.class);

Start your route and you will see a log statement saying that the Saxon XPathFactory is being used, and you won’t get an exception stating that the lower-case function does not exist.


Enable CORS with Camel Rest DSL

When creating a public REST API you have to take into account how your API is used and by whom. This is important not only for how you structure your API but also for how you expose it. One of the issues I’ve faced when hosting a public API is CORS. Users of the API got the following error, or a variation of it, when calling the API from JavaScript.

Cross-Origin Request Blocked: ….

In short, it means that you have to enable CORS, or “Cross-Origin Resource Sharing”. When a website wants to access a public API that is hosted on a different domain, the call is normally blocked by the “same-origin policy”. This policy, implemented by your browser, blocks any call that does not go to the same origin (hostname and port) as the website shown in your browser. For APIs that belong to the website itself this is usually not a problem, because you can simply serve the API from the same domain. For public APIs, however, this is not possible. CORS is the mechanism that lets your browser make such cross-domain requests.

When a browser has to get data from a public API hosted on a different domain, it uses CORS to check whether the request is allowed. The browser first sends an OPTIONS (preflight) request to the API, announcing the request it wants to make, including any custom headers you have specified for the call. The public API has to accept this OPTIONS request; once it has been accepted, the browser executes the actual request.

Since the REST API must accept the preflight request, your API has to contain some configuration that handles CORS requests. With the Camel Rest DSL this is relatively easy. First you have to enable CORS; for simple requests this is enough. I, however, also had to allow some custom headers, which meant whitelisting them.

restConfiguration()
        .component("jetty")
        .host("0.0.0.0")
        .port(8888)
        .bindingMode(RestBindingMode.json)
        .enableCORS(true)
        .corsHeaderProperty("Access-Control-Allow-Headers", "Origin, Accept, X-Requested-With, Content-Type, Access-Control-Request-Method, Access-Control-Request-Headers,CustomHeader1, CustomHeader2")
        ;

ActiveMQ connection pooling using Weld

Last week I, again, had to create a connection pool for a Camel route that was sending messages to an ActiveMQ broker. This time I had to set it up in a project that uses Weld as the injection framework. I based my configuration on the ActiveMQ documentation.

The number of connections and concurrent consumers is based on the standard documentation, which states that you can easily use these numbers without any major impact. Increasing them is of course possible; it will, however, have an impact on your memory usage, so make sure it is required for your application before changing it. I decided to go with the “standard” settings and change them whenever the application usage exceeds my expectations.

The documentation also states that a connection factory needs to be started and stopped for it to work correctly. Because I was using CDI I had to look for a way to do this on startup and shutdown of the application. I found the “@Initialized”/“@Destroyed” annotations in combination with the “@Observes” annotation. The observer makes sure that the method is called whenever the application scope is either initialized or destroyed.


package nl.janssen.coolproject.mq;

import io.fabric8.annotations.ServiceName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.apache.camel.component.jms.JmsConfiguration;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Destroyed;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.inject.Named;

public class ActiveMQComponentFactory {

  public void init(@Observes @Initialized(ApplicationScoped.class) Object init, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.start();
  }

  public void destroy(@Observes @Destroyed(ApplicationScoped.class) Object init, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.stop();
  }

  @Produces
  @Named("connectionFactory")
  @ApplicationScoped
  public ActiveMQConnectionFactory jmsConnectionFactory(){
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    factory.setUserName("admin");
    factory.setPassword("admin");

    return factory;
  }

  @Produces
  @Named("pooledConnectionFactory")
  @ApplicationScoped
  public PooledConnectionFactory createPooledConnectionFactory(@Named("connectionFactory") ActiveMQConnectionFactory factory){
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setMaxConnections(8);
    pooledConnectionFactory.setConnectionFactory(factory);
    return pooledConnectionFactory;
  }

  @Produces
  @Named("jmsConfiguration")
  @ApplicationScoped
  public JmsConfiguration createJmsConfiguration(@Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    JmsConfiguration jmsConfiguration =  new JmsConfiguration();
    jmsConfiguration.setConnectionFactory(pooledConnectionFactory);
    jmsConfiguration.setConcurrentConsumers(10);
    return  jmsConfiguration;
  }

  @Produces
  @Named
  @Alias("jms")
  @ApplicationScoped
  public ActiveMQComponent createActiveMQComponent(@Named("jmsConfiguration") JmsConfiguration jmsConfiguration){
    ActiveMQComponent component = new ActiveMQComponent();
    component.setConfiguration(jmsConfiguration);
    return component;
  }

}

And of course, to use the connection pool in your route, you have to refer to the ActiveMQComponent. In my case I referred to the alias “jms”.


public class IndexerApi extends RouteBuilder {

  @Override
  public void configure() throws Exception {

    from("direct:somewhere")
        .to("jms:queue:randomQueueName")
  }
}

DeltaSpike with CDI and Camel

Last week I was looking for an easy way to inject property values into my code. In my project I was using Camel in combination with Weld (CDI). One of the main requirements was that properties could be injected from multiple “locations”: from a property file or from a system or environment property.

Because we are using OpenShift to run our applications, it is easy to use environment variables to inject your configuration. I did not want to write my own code to resolve the properties, so I started looking around. Eventually I found DeltaSpike; with DeltaSpike you can easily inject properties into your code and it works seamlessly with CDI.

You can inject properties from the following four locations:

  • System properties (ordinal = 400)
  • Environment properties (ordinal = 300)
  • JNDI values (ordinal = 200, the base name is “java:comp/env/deltaspike/”)
  • Properties file values (apache-deltaspike.properties) (ordinal = 100, default filename is “META-INF/apache-deltaspike.properties”)

When the same property is configured in two or more locations, the value from the source with the highest ordinal is used. So system properties will overwrite any configuration done in the property file.

Because I did not want to use the default property filename I implemented the following class in order to load my own property file:


import org.apache.deltaspike.core.api.config.PropertyFileConfig;

import javax.inject.Named;

@Named
public class MyCustomPropertyConfig implements PropertyFileConfig {

    @Override public String getPropertyFileName() {
      return "badasspropertyFile.properties";
    }

    @Override public boolean isOptional() {
      return true;
    }
}

There are four ways to load a property in your class (the first two are shown in the sketch after this list):

  • ConfigResolver methods for easy programmatic access to values
  • TypedResolver API for typed configuration values and precise control over resolution
  • @ConfigProperty for injection of configured values into beans
  • Interface-based configuration
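
A minimal sketch of the first two options (ConfigResolver and its TypedResolver are DeltaSpike APIs; the property names here are just examples):

import org.apache.deltaspike.core.api.config.ConfigResolver;

// programmatic lookup; the value comes from the source with the highest ordinal
String logLevel = ConfigResolver.getPropertyValue("loglevel", "INFO");

// typed lookup with a default, giving precise control over resolution
Integer maxRetries = ConfigResolver.resolve("max.retries")
        .as(Integer.class)
        .withDefault(3)
        .getValue();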

When using DeltaSpike in combination with CDI, you can inject property values in a similar way to how you inject beans. For this you use the “@ConfigProperty” annotation:


@Inject
@ConfigProperty(name = "loglevel", defaultValue = "INFO")
private String logLevel;

Alternative class in Weld

In a Camel project with Weld I faced an issue while trying to unit test a route that called a remote system (Elasticsearch). The calls were done with the help of a custom bean that handles the connection and the actual calls to the system. I did not want to call the actual system during my unit test, so I created a stub. However, to create the context and start the route I used CamelCdiRunner, which meant that the original client bean was being loaded. So I had to replace the actual bean with my stub. Luckily Weld supports the use of alternative beans: in essence you can replace a bean with an alternative, in my case a stub.

The first thing you need to do is make sure that your class is based on an interface. This enables Weld to replace your bean with an alternative. Inject the bean based on the interface and not on the class itself.

@Inject
@Named("elasticsearchCamelClient")
ElasticsearchCamelClientInterface client;
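
The interface itself is not shown here; a minimal, hypothetical version could be as simple as the following (the search method is just an illustration of the kind of operation it might declare):

public interface ElasticsearchCamelClientInterface {

  // example operation; the real interface mirrors whatever the route needs
  String search(String query);
}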

Next, create your mock and make sure that it implements the interface of the bean you want to replace. Add the @Alternative annotation.

@Alternative
@Named("elasticsearchCamelClient")
public class MockElasticsearchCamelClient implements ElasticsearchCamelClientInterface {

  public MockElasticsearchCamelClient() {
  }

  // stub implementations of the interface methods go here
}

Most of the documentation I found shows you how to use an alternative bean via the beans.xml file. However, it is not possible to simply add a beans.xml file to the test resources; you can only specify one for your project. You could replace it dynamically with your build tool, but that is, imho, not a nice solution. Luckily you can also specify an alternative class in your test class by adding the “@Beans” annotation, which allows you to specify one or more alternative beans.

@RunWith(CamelCdiRunner.class)
@Beans(
  alternatives = {MockElasticsearchCamelClient.class}
)
public class RouteTest { ...

The alternatives attribute indicates that you want to run the CamelCdiRunner with the stub. When you now start the unit test you will see that both beans are loaded, but that the mock is used to run your tests.
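
A complete test class could then look roughly like this; the direct:start endpoint and the empty test body are placeholders for your own route and assertions:

import javax.inject.Inject;

import org.apache.camel.ProducerTemplate;
import org.apache.camel.cdi.Uri;
import org.apache.camel.test.cdi.Beans;
import org.apache.camel.test.cdi.CamelCdiRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(CamelCdiRunner.class)
@Beans(alternatives = {MockElasticsearchCamelClient.class})
public class RouteTest {

  @Inject
  @Uri("direct:start") // placeholder for the real entry point of the route under test
  private ProducerTemplate template;

  @Test
  public void routeUsesTheStub() {
    // the exchange is handled by MockElasticsearchCamelClient instead of the real client
    template.sendBody("test payload");
  }
}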

Happy testing!

Kafka with Camel

Kafka is a popular open-source distributed streaming platform which prides itself on key features such as fault tolerance and replay options. My colleague Jan van Zoggel wrote a nice “getting started” blog post about Kafka, which can be found here.

In this blog post I will show you, in a few very easy steps, how you can start producing and consuming Kafka messages with Apache Camel.

First of all you need to install Kafka itself (https://kafka.apache.org/quickstart). After you have started the Kafka service you need to create a topic.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic theBigKafkaTestTopic

When the topic is created you can start producing messages on it. Add the camel-kafka dependency to your pom file.

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-kafka</artifactId>
</dependency>

The following code snippet shows how to send a message to a Kafka topic. Note that a header (KafkaConstants.KEY) is set; this is the key of the Kafka message.

from("timer:trigger")
 .transform().simple("ref:myBean")
 .setHeader(KafkaConstants.KEY,simple("bean:generateUUID?method=getGuid") )
 .log("${header.kafka.KEY}")
 .to("kafka:localhost:9092?topic=theBigKafkaTestTopic");

Consuming can be done by using a similar Kafka endpoint URI, with some consumer options, in your from component.


from("kafka:localhost:9092?topic=theBigKafkaTestTopic&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
 .log("${body}")
 .end();

Contract first Rest API

As an integration consultant I have an extensive background in web services, to be specific SOAP-based web services. The way I used to create a service was by first defining the contract: the WSDL and the XSD files. Based on those files I could generate a service without having to manually type any code.

Lately, however, I have started working with REST services, both JSON and XML. I have written a post (Rest just got easy) on how to create a service using the Rest DSL from Camel. In some situations, though, you might want to use a JAX-RS implementation, for example CXFRS. This means you first have to create the interface definition using POJOs. You could do this the manual way, like me in my first couple of implementations, or you could generate the code based on an interface design (hooray for the lazy!). In this blog post I will show you how to generate your REST interface implementation from Swagger documentation (Swagger in a nutshell).

Using a Maven plugin you can easily generate the required code based on a YAML-style Swagger specification file. The plugin can be found through this dependency:

<dependency>
   <groupId>io.swagger</groupId>
   <artifactId>swagger-codegen-maven-plugin</artifactId>
   <version>2.1.6</version>
</dependency>

Because I am using CXFRS I added the dependency for that as well to my pom file:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-cxf</artifactId>
</dependency>

After you have added the dependencies to the pom file you can configure the plugin.

<plugin>
  <groupId>io.swagger</groupId>
  <artifactId>swagger-codegen-maven-plugin</artifactId>
  <version>2.2.0-SNAPSHOT</version>
  <executions>
    <execution>
      <goals>
        <goal>generate</goal>
      </goals>
      <configuration>
        <!-- specify the swagger yaml -->
        <inputSpec>src/main/resources/yaml/swagger.yaml</inputSpec>
        <!-- target to generate -->
        <language>jaxrs-cxf</language>
        <apiPackage>nl.rubix.api</apiPackage>
        <modelPackage>nl.rubix.api.model</modelPackage>
        <configOptions>
          <sourceFolder>src/main/java</sourceFolder>
        </configOptions>
      </configuration>
    </execution>
  </executions>
</plugin>

The plugin has several configuration options.

  • inputSpec – OpenAPI Spec file path
  • language – target generation language
  • output – target output path (default is ${project.build.directory}/generated-sources/swagger)
  • templateDirectory – directory with mustache templates
  • addCompileSourceRoot – add the output directory to the project as a source root (true by default)
  • modelPackage – the package to use for generated model objects/classes
  • apiPackage – the package to use for generated api objects/classes
  • invokerPackage – the package to use for the generated invoker objects
  • configOptions – a map of language-specific parameters (see below)
  • configHelp – dumps the configuration help for the specified library (generates no sources)

As you can see in my example, I specified the language to be “jaxrs-cxf”, meaning that the generated sources will be specific to JAX-RS with CXF. The modelPackage will contain the actual model objects; if specified, the plugin will add the XML declarations to these model POJOs. The apiPackage will contain the interface specifications.

Create the yaml file on the specified location. My yaml file looks like this:

---
swagger: '2.0'
info:
  version: 0.0.0
  title: Simple API
paths:
  /:
    get:
      responses:
        200:
          description: OK

Meaning: a very simple interface that listens on the root (“/”) of the service path and returns an HTTP 200 if everything went as planned.

Now that the configuration is done, you can generate the sources with “mvn compile”. You will see the generated sources in the packages you specified.
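
For the swagger.yaml above, the generated interface in the apiPackage will look roughly like the sketch below; the exact class and method names depend on the codegen version, so treat this as an illustration rather than the literal output:

package nl.rubix.api;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

@Path("/")
public interface DefaultApi {

    // corresponds to the single GET operation on "/" in the swagger.yaml
    @GET
    Response rootGet();
}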

If everything went correctly you can now specify your service. In my example I am using Blueprint in combination with the Java DSL.

The blueprint definition for the cxfrs component:

<cxf:rsServer id="api" address="http://localhost:9092/test"
               serviceClass="nl.rubix.api.contractfirst.rest.DefaultApi"
               loggingFeatureEnabled="false">
   <cxf:providers>
     <bean class="org.apache.cxf.jaxrs.provider.json.JSONProvider" />
   </cxf:providers>
</cxf:rsServer>

And the route configuration:

from("cxfrs:bean:api?bindingStyle=SimpleConsumer").log("you did it!");

My example project can be found here: Download sources

Basic authentication in Camel

I need to call a web service that uses basic authentication. All the solutions and suggestions I could find via Google were either complicated or a lot of work, until I looked at the CXF manual (http://camel.apache.org/cxf.html). As of version 2.12.3, you can do it by simply adding the username and password to your endpoint in your Camel route.

.to("cxf:bean:myCxfEndpoint?username=<username>&password=<password>")