ActiveMQ connection pooling using Weld

Last week I once again had to create a connection pool for a Camel route that sends messages to an ActiveMQ broker. This time I had to configure it in a project that uses Weld as its injection framework. I based my configuration on the ActiveMQ documentation.

The number of connections and concurrent consumers is taken from the standard documentation. The documentation states that you can safely use those numbers without any major impact. Increasing them is of course possible, but it will increase memory usage, so make sure your application actually needs it before changing them. I decided to go with the “standard” settings (8 connections and 10 concurrent consumers in the code below) and change them if the application's usage exceeds my expectations.

The documentation also states that a connection factory needs to be started and stopped for it to work correctly. Because I was using CDI, I had to find a way to do this on startup and shutdown of the application. I found the “Initialized” and “Destroyed” annotations, used in combination with the “Observes” annotation. The observer makes sure that the method is called whenever the application scope is either initialized or destroyed.


package nl.janssen.coolproject.mq;

import io.fabric8.annotations.Alias;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.apache.camel.component.jms.JmsConfiguration;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Destroyed;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.inject.Named;

public class ActiveMQComponentFactory {

  public void init(@Observes @Initialized(ApplicationScoped.class) Object init, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.start();
  }

  public void destroy(@Observes @Destroyed(ApplicationScoped.class) Object init, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.stop();
  }

  @Produces
  @Named("connectionFactory")
  @ApplicationScoped
  public ActiveMQConnectionFactory jmsConnectionFactory(){
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    factory.setUserName("admin");
    factory.setPassword("admin");

    return factory;
  }

  @Produces
  @Named("pooledConnectionFactory")
  @ApplicationScoped
  public PooledConnectionFactory createPooledConnectionFactory(@Named("connectionFactory") ActiveMQConnectionFactory factory){
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setMaxConnections(8);
    pooledConnectionFactory.setConnectionFactory(factory);
    return pooledConnectionFactory;
  }

  @Produces
  @Named("jmsConfiguration")
  @ApplicationScoped
  public JmsConfiguration createJmsConfiguration(@Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    JmsConfiguration jmsConfiguration = new JmsConfiguration();
    jmsConfiguration.setConnectionFactory(pooledConnectionFactory);
    jmsConfiguration.setConcurrentConsumers(10);
    return jmsConfiguration;
  }

  @Produces
  @Named
  @Alias("jms")
  @ApplicationScoped
  public ActiveMQComponent createActiveMQComponent(@Named("jmsConfiguration") JmsConfiguration jmsConfiguration){
    ActiveMQComponent component = new ActiveMQComponent();
    component.setConfiguration(jmsConfiguration);
    return component;
  }

}

And of course, to use the connection pool in your route you have to refer to the ActiveMQComponent. In my case I referred to the alias “jms”.


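import org.apache.camel.builder.RouteBuilder;

public class IndexerApi extends RouteBuilder {

  @Override
  public void configure() throws Exception {
    // "jms" resolves to the ActiveMQComponent registered under that alias
    from("direct:somewhere")
        .to("jms:queue:randomQueueName");
  }
}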

DeltaSpike with CDI and Camel

Last week I was looking for an easy way to inject property values into my code. In my project I was using Camel in combination with Weld (CDI). One of the main requirements was that properties could be injected from multiple “locations”: a property file or a system / environment property.

Because we use OpenShift to run our applications, it is easy to use environment variables to inject configuration. I did not want to write my own code to resolve the properties, so I started looking for a library. Eventually I found DeltaSpike. With DeltaSpike you can easily inject properties into your code, and it works seamlessly with CDI.

You can inject properties from the following four locations:

  • System properties (ordinal = 400)
  • Environment properties (ordinal = 300)
  • JNDI values (ordinal = 200, the base name is “java:comp/env/deltaspike/”)
  • Properties file values (apache-deltaspike.properties) (ordinal = 100, default filename is “META-INF/apache-deltaspike.properties”)

When the same property is configured in two or more locations, the value from the location with the highest ordinal is used. So system properties override any configuration in the property file.
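
The precedence rule can be modelled in a few lines of plain Java. This is only a sketch of the rule as described here, not DeltaSpike's implementation: the `OrdinalLookup` and `Source` classes and the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of ordinal-based lookup; not part of DeltaSpike.
final class OrdinalLookup {

  // One configuration location with its ordinal and its key/value pairs.
  static final class Source {
    final String name;
    final int ordinal;
    final Map<String, String> values = new HashMap<>();

    Source(String name, int ordinal) {
      this.name = name;
      this.ordinal = ordinal;
    }

    // Adds a key/value pair and returns this source for chaining.
    Source put(String key, String value) {
      values.put(key, value);
      return this;
    }
  }

  private final List<Source> sources = new ArrayList<>();

  void add(Source source) {
    sources.add(source);
    // Keep the list sorted so the highest ordinal is consulted first.
    sources.sort(Comparator.comparingInt((Source s) -> s.ordinal).reversed());
  }

  // Returns the value from the highest-ordinal source that defines the key,
  // or the default when no source defines it.
  String resolve(String key, String defaultValue) {
    for (Source source : sources) {
      String value = source.values.get(key);
      if (value != null) {
        return value;
      }
    }
    return defaultValue;
  }
}
```

If a source with ordinal 100 defines `loglevel` as `WARN` and a source with ordinal 400 defines it as `DEBUG`, `resolve("loglevel", "INFO")` returns `DEBUG`. A key that no source defines falls back to the supplied default.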

Because I did not want to use the default property filename I implemented the following class in order to load my own property file:


import org.apache.deltaspike.core.api.config.PropertyFileConfig;

import javax.inject.Named;

@Named
public class MyCustomPropertyConfig implements PropertyFileConfig {

    @Override
    public String getPropertyFileName() {
        return "badasspropertyFile.properties";
    }

    @Override
    public boolean isOptional() {
        return true;
    }
}

There are four ways to load a property in your class.

  • ConfigResolver methods for easy programmatic access to values
  • TypedResolver API for typed configuration values and precise control over resolution
  • @ConfigProperty for injection of configured values into beans
  • interface based configuration

When using DeltaSpike in combination with CDI, you can inject property values in much the same way as you inject beans. For this you can use the “@ConfigProperty” annotation:


@Inject
@ConfigProperty(name = "loglevel", defaultValue = "INFO")
private String logLevel;

Alternative class in Weld

In a Camel project with Weld I ran into an issue while trying to unit test a route that called a remote system (Elasticsearch). The call was made through a custom bean that handles the connection and the actual calls to the system. I did not want to call the real system during my unit test, so I created a stub. However, to create the context and start the route I used CamelCdiRunner, which meant that the original client bean was being loaded. So I had to replace the actual bean with my stub. Luckily, Weld supports alternative beans: in essence you can replace a bean with an alternative, in my case a stub.

The first thing you need to do is make sure that your class implements an interface. This enables Weld to replace your bean with an alternative. Inject the bean by its interface and not by the class itself.

@Inject
@Named("elasticsearchCamelClient")
ElasticsearchCamelClientInterface client;
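
Why injecting by interface matters can be shown without any container. The sketch below is plain Java with hypothetical names (`SearchClient`, `RemoteSearchClient`, `StubSearchClient`, `Indexer`) that stand in for the real classes. It only illustrates that code written against the interface accepts either implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface standing in for ElasticsearchCamelClientInterface.
interface SearchClient {
  String index(String document);
}

// Stand-in for the real client; a real one would call the remote system here.
final class RemoteSearchClient implements SearchClient {
  @Override
  public String index(String document) {
    throw new IllegalStateException("remote system not reachable in a unit test");
  }
}

// Stub that records the documents instead of sending them anywhere.
final class StubSearchClient implements SearchClient {
  final List<String> indexed = new ArrayList<>();

  @Override
  public String index(String document) {
    indexed.add(document);
    return "stubbed";
  }
}

// Depends only on the interface, so either implementation can be supplied.
final class Indexer {
  private final SearchClient client;

  Indexer(SearchClient client) {
    this.client = client;
  }

  String store(String document) {
    return client.index(document);
  }
}
```

A test can construct `new Indexer(new StubSearchClient())` and inspect the stub's `indexed` list afterwards. Had `Indexer` been written against `RemoteSearchClient` directly, the stub could not be passed in.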

Next create your mock and make sure that it implements the interface of the bean you want to replace. Add the Alternative annotation.

@Alternative
@Named("elasticsearchCamelClient")
public class MockElasticsearchCamelClient implements ElasticsearchCamelClientInterface {

  public MockElasticsearchCamelClient() {
  }

  // Stub code.....
}

Most of the documentation I found shows how to enable an alternative bean through the beans.xml file. However, it is not possible to simply add a second beans.xml file to the test resources; you can only specify one for your project. You could replace it dynamically with your build tool, but that is, imho, not a nice solution. Luckily, you can also specify an alternative class in your test class by adding the “@Beans” annotation, which allows you to specify one or more alternative beans.

@RunWith(CamelCdiRunner.class)
@Beans(
  alternatives = {MockElasticsearchCamelClient.class}
)
public class RouteTest { ...

The “alternatives” attribute indicates that you want to run the CamelCdiRunner with the stub. When you now start the unit test you will see that both beans are loaded, but that the mock is used to run your tests.
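
The selection rule can be pictured with a small plain-Java model. It is heavily simplified and is not Weld's resolution algorithm: `MiniContainer` and its methods are hypothetical names. Both implementations are registered under the same name, and an alternative is only returned once it has been enabled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model of alternative selection; not Weld code.
final class MiniContainer {
  private final Map<String, Object> defaults = new HashMap<>();
  private final Map<String, Object> alternatives = new HashMap<>();
  private final Set<Class<?>> enabled = new HashSet<>();

  // Registers the regular bean under a name.
  void register(String name, Object bean) {
    defaults.put(name, bean);
  }

  // Registers an alternative for the same name; it stays inactive until enabled.
  void registerAlternative(String name, Object bean) {
    alternatives.put(name, bean);
  }

  // Plays the role of listing a class under "alternatives" in the test.
  void enableAlternative(Class<?> type) {
    enabled.add(type);
  }

  // An enabled alternative wins; otherwise the regular bean is returned.
  Object lookup(String name) {
    Object alternative = alternatives.get(name);
    if (alternative != null && enabled.contains(alternative.getClass())) {
      return alternative;
    }
    return defaults.get(name);
  }
}
```

In this model the regular bean is still registered after the alternative has been enabled. It is simply no longer the one that `lookup` returns, which mirrors seeing both beans loaded while only the mock is used.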

Happy testing!