ActiveMQ connection pooling using Weld

Last week I once again had to create a connection pool for a Camel route that sends messages to an ActiveMQ broker. This time I had to configure it in a project that uses Weld as its injection framework. I based my configuration on the ActiveMQ documentation.

The number of connections and concurrent consumers is taken from the standard documentation, which states that you can use these values without any major impact. Increasing them is of course possible, but it will increase memory usage, so make sure your application really needs it before changing them. I decided to go with the “standard” settings and to revisit them when the application usage exceeds my expectations.
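To make the connection limit a bit more concrete, here is a toy model in plain Java of what a capped pool does. It is only a sketch and not the ActiveMQ implementation: I assume that the pool creates a new connection on each request until the maximum is reached and then hands out the existing ones in turn (the round-robin reuse is my own simplification). The class name CappedPoolSketch and its methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model of a capped connection pool; NOT the ActiveMQ implementation.
class CappedPoolSketch<C> {

    private final int maxConnections;
    private final Supplier<C> connectionSupplier;
    private final List<C> connections = new ArrayList<>();
    private int next = 0;

    CappedPoolSketch(int maxConnections, Supplier<C> connectionSupplier) {
        if (maxConnections < 1) {
            throw new IllegalArgumentException("maxConnections must be at least 1");
        }
        this.maxConnections = maxConnections;
        this.connectionSupplier = connectionSupplier;
    }

    // Creates a new connection until the cap is reached,
    // then reuses the existing connections in turn (assumed strategy).
    synchronized C getConnection() {
        if (connections.size() < maxConnections) {
            C created = connectionSupplier.get();
            connections.add(created);
            return created;
        }
        C reused = connections.get(next);
        next = (next + 1) % connections.size();
        return reused;
    }

    // Number of underlying connections that were actually created.
    synchronized int openConnections() {
        return connections.size();
    }

    public static void main(String[] args) {
        CappedPoolSketch<Object> pool = new CappedPoolSketch<>(8, Object::new);
        for (int i = 0; i < 100; i++) {
            pool.getConnection();
        }
        System.out.println(pool.openConnections()); // prints 8
    }
}
```

However many times a connection is requested, the number of underlying connections never exceeds the cap, which is why a higher cap mainly costs memory.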

The documentation also states that a connection factory (here the PooledConnectionFactory) needs to be started and stopped for it to work correctly. Because I was using CDI, I had to find a way to do this on startup and shutdown of the application. I found the “@Initialized” and “@Destroyed” annotations, used in combination with the “@Observes” annotation. The observer makes sure that the method is called whenever the application scope is either initialized or destroyed.


package nl.janssen.coolproject.mq;

import io.fabric8.annotations.Alias;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.apache.camel.component.jms.JmsConfiguration;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Destroyed;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.inject.Named;

public class ActiveMQComponentFactory {

  // Start the pool once the application scope has been initialized.
  public void init(@Observes @Initialized(ApplicationScoped.class) Object event, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.start();
  }

  // Stop the pool when the application scope is destroyed.
  public void destroy(@Observes @Destroyed(ApplicationScoped.class) Object event, @Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    pooledConnectionFactory.stop();
  }

  @Produces
  @Named("connectionFactory")
  @ApplicationScoped
  public ActiveMQConnectionFactory jmsConnectionFactory(){
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    factory.setUserName("admin");
    factory.setPassword("admin");

    return factory;
  }

  @Produces
  @Named("pooledConnectionFactory")
  @ApplicationScoped
  public PooledConnectionFactory createPooledConnectionFactory(@Named("connectionFactory") ActiveMQConnectionFactory factory){
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setMaxConnections(8);
    pooledConnectionFactory.setConnectionFactory(factory);
    return pooledConnectionFactory;
  }

  @Produces
  @Named("jmsConfiguration")
  @ApplicationScoped
  public JmsConfiguration createJmsConfiguration(@Named("pooledConnectionFactory") PooledConnectionFactory pooledConnectionFactory){
    JmsConfiguration jmsConfiguration = new JmsConfiguration();
    jmsConfiguration.setConnectionFactory(pooledConnectionFactory);
    jmsConfiguration.setConcurrentConsumers(10);
    return jmsConfiguration;
  }

  @Produces
  @Named
  @Alias("jms")
  @ApplicationScoped
  public ActiveMQComponent createActiveMQComponent(@Named("jmsConfiguration") JmsConfiguration jmsConfiguration){
    ActiveMQComponent component = new ActiveMQComponent();
    component.setConfiguration(jmsConfiguration);
    return component;
  }

}

To use the connection pool in your route, you have to refer to the ActiveMQComponent. In my case I referred to it by the alias “jms”.


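import org.apache.camel.builder.RouteBuilder;

public class IndexerApi extends RouteBuilder {

  @Override
  public void configure() throws Exception {

    // "jms" resolves to the pooled ActiveMQComponent produced above.
    from("direct:somewhere")
        .to("jms:queue:randomQueueName");
  }
}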


DeltaSpike with CDI and Camel

Last week I was looking for an easy way to inject property values into my code. In my project I was using Camel in combination with Weld (CDI). One of the main requirements was that properties could be injected from multiple “locations”: from a property file or from a system / environment property.

Because we run our applications on OpenShift, it is easy to use environment variables to inject configuration. I did not want to write my own code to resolve the properties, so I started looking for a library. Eventually I found DeltaSpike. With DeltaSpike you can easily inject properties into your code, and it works seamlessly with CDI.

You can inject properties from the following four locations:

  • System properties (ordinal = 400)
  • Environment properties (ordinal = 300)
  • JNDI values (ordinal = 200, the base name is “java:comp/env/deltaspike/”)
  • Properties file values (apache-deltaspike.properties) (ordinal = 100, default filename is “META-INF/apache-deltaspike.properties”)

When the same property is configured in two or more locations, the value from the location with the highest ordinal is used. So system properties will override any configuration done in the property file.
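To illustrate the ordinal rule, here is a minimal sketch in plain Java. It is a toy model and not how DeltaSpike is implemented; the class OrdinalConfigSketch, its Source type and the sample keys and values are made up for illustration. Only the ordinals 100 and 400 are taken from the list above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy model of ordinal-based lookup; NOT DeltaSpike's implementation.
class OrdinalConfigSketch {

    // A named source of key/value pairs with a priority (ordinal).
    static final class Source {
        final String name;
        final int ordinal;
        final Map<String, String> values;

        Source(String name, int ordinal, Map<String, String> values) {
            this.name = name;
            this.ordinal = ordinal;
            this.values = values;
        }
    }

    private final List<Source> sources = new ArrayList<>();

    void addSource(Source source) {
        sources.add(source);
        // Keep the highest ordinal first so a lookup can stop at the first hit.
        sources.sort(Comparator.comparingInt((Source s) -> s.ordinal).reversed());
    }

    // Returns the value from the highest-ordinal source that defines the key.
    Optional<String> resolve(String key) {
        for (Source source : sources) {
            String value = source.values.get(key);
            if (value != null) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    // Hypothetical sample data: a property file (100) and system properties (400).
    static OrdinalConfigSketch example() {
        Map<String, String> fileValues = new HashMap<>();
        fileValues.put("loglevel", "INFO");
        fileValues.put("queue", "randomQueueName");

        Map<String, String> systemValues = new HashMap<>();
        systemValues.put("loglevel", "DEBUG");

        OrdinalConfigSketch config = new OrdinalConfigSketch();
        config.addSource(new Source("property file", 100, fileValues));
        config.addSource(new Source("system properties", 400, systemValues));
        return config;
    }

    public static void main(String[] args) {
        OrdinalConfigSketch config = example();
        System.out.println(config.resolve("loglevel").orElse("unset")); // DEBUG
        System.out.println(config.resolve("queue").orElse("unset"));    // randomQueueName
        System.out.println(config.resolve("missing").orElse("unset"));  // unset
    }
}
```

In this sketch "loglevel" is defined in both sources, so the value with ordinal 400 wins, while "queue" is only defined in the property file and is therefore still found there.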

Because I did not want to use the default property filename I implemented the following class in order to load my own property file:


import org.apache.deltaspike.core.api.config.PropertyFileConfig;

import javax.inject.Named;

@Named
public class MyCustomPropertyConfig implements PropertyFileConfig {

    @Override
    public String getPropertyFileName() {
        return "badasspropertyFile.properties";
    }

    @Override
    public boolean isOptional() {
        return true;
    }
}

There are four ways to load a property in your class:

  • ConfigResolver methods for easy programmatic access to values
  • TypedResolver API for typed configuration values and precise control over resolution
  • @ConfigProperty for injection of configured values into beans
  • interface based configuration

When using DeltaSpike in combination with CDI, you can inject property values in much the same way as you inject beans. For this you can use the “@ConfigProperty” annotation:


@Inject
@ConfigProperty(name = "loglevel", defaultValue = "INFO")
private String logLevel;