Quarkus, Kafka, Camel Servlet, Wiring it Together

July 04, 2019 ( last updated : July 05, 2019 )
quarkus kubernetes camel kafka smallrye

https://github.com/ngtranminhtuan/quarkus-kafka-camel-servlet


Abstract

When using Apache Camel with Quarkus as of today, only a limited number of Camel connectors are available. One important missing connector is the one for Apache Kafka. A Kafka connector is available for Quarkus, though, through the SmallRye Kafka extension. So in this article, I will show how to wire the SmallRye Kafka connector and Camel together. We will also use the camel-servlet component to reuse the Undertow HTTP endpoint provided by Quarkus and, to spice things up a little, we will use Camel's XML DSL. All of this will be compiled to a native executable in the end.

Setup a project

In a previous article I showed how to create a hello world using Camel and Quarkus. Here we will try something more advanced. Let's start by setting up the project.

mkdir quarkus-kafka-camel-servlet
cd quarkus-kafka-camel-servlet
mvn io.quarkus:quarkus-maven-plugin:0.18.0:create \
    -DprojectGroupId=agilabs \
    -DprojectArtifactId=quarkus-kafka-camel-servlet \
    -DclassName="agilabs.GreetingResource" \
    -Dpath="/hello"

Add the following dependencies to the pom.xml file.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-camel-core</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-camel-servlet</artifactId>
</dependency>

Configure Camel

Application properties

Configure application properties in src/main/resources/application.properties

quarkus.camel.servlet.url-patterns = /camel/*
%dev.quarkus.camel.routes-uris = camel/routes.xml
quarkus.camel.routes-uris=file:/camel/routes.xml
quarkus.camel.defer-init-phase = true

All calls to /camel/* are mapped to the Camel servlet component. In dev mode, Camel routes are loaded from the file routes.xml on the classpath, in the resource folder camel.

When launched in "production mode" with the native executable, resources are not included in the binary by default. I'm externalizing the routes.xml file to the folder /camel so that it can be set and changed dynamically. This is actually quite interesting, because we get a configurable native executable that we can fine-tune by switching the routes at run time. For some reason, when I tried to change the property quarkus.camel.routes-uris while running the native executable, its new value didn't seem to be taken into account. So this file path is effectively hard-coded right now.

Edit July 6, 2019: I discovered that there are different configuration phases: BUILD_TIME, BUILD_AND_RUN_TIME_FIXED and RUN_TIME. It turns out this property is set to BUILD_AND_RUN_TIME_FIXED, so it is not read dynamically when the native executable runs, for the moment. More details here.

The quarkus.camel.defer-init-phase property is set to true because it seems that the Camel context is created before the CDI bean wiring and dependency injection happen. I need to do some further digging to understand entirely how this works.

Camel Routes XML Files

Let's set up the routes in the src/main/resources/camel/routes.xml file.

<?xml version="1.0" encoding="UTF-8" ?>
<routes
  xmlns="http://camel.apache.org/schema/spring"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"
>
  <route id="receive-kafka-events">
    <from uri="direct:receive-events"></from>
    <log message="Received from kafka broker : ${body}"></log>
  </route>

  <route id="send-kafka-events">
    <from uri="servlet:send"></from>
    <log message="Sending"></log>
    <convertBodyTo type="java.lang.String"></convertBodyTo>
    <bean ref="smallrye-producer" method="produce(${body})"></bean>
    <setBody>
      <constant>Message sent</constant>
    </setBody>
  </route>
</routes>

The first route consumes messages coming from Kafka. Later on, we will wire a Camel ProducerTemplate so that any other class in the application can send messages to this route.

The second route listens for HTTP requests on the path http://0.0.0.0:8080/camel/send. It uses a bean that wraps an emitter to send messages.
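
To make the relation between the servlet URL pattern and the Camel endpoint concrete, here is a minimal plain-Java sketch. It is a simplified model and not the actual camel-servlet or Undertow code: the class name ServletPathSketch and the method endpointFor are hypothetical, and only prefix patterns of the form /prefix/* are handled.

```java
import java.util.Optional;

public class ServletPathSketch {

    // Hypothetical helper: given a url-pattern such as "/camel/*" and a request
    // path, return the Camel servlet endpoint URI that would match, if any.
    public static Optional<String> endpointFor(String urlPattern, String requestPath) {
        if (!urlPattern.endsWith("/*")) {
            return Optional.empty(); // this sketch only models prefix patterns
        }
        // Drop the trailing '*' so that "/camel/*" becomes "/camel/".
        String prefix = urlPattern.substring(0, urlPattern.length() - 1);
        if (!requestPath.startsWith(prefix) || requestPath.length() == prefix.length()) {
            return Optional.empty();
        }
        // Whatever follows the prefix is the path of the servlet endpoint.
        return Optional.of("servlet:" + requestPath.substring(prefix.length()));
    }

    public static void main(String[] args) {
        System.out.println(endpointFor("/camel/*", "/camel/send"));
        System.out.println(endpointFor("/camel/*", "/hello"));
    }
}
```

With the pattern /camel/* from application.properties, a request to /camel/send resolves to the servlet:send endpoint used in the second route, while /hello is left to the regular REST resource.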

Configure sinks for Kafka topics

Application properties

mp.messaging.incoming.in-events.connector=smallrye-kafka
mp.messaging.incoming.in-events.topic=events
mp.messaging.incoming.in-events.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

mp.messaging.outgoing.out-events.connector=smallrye-kafka
mp.messaging.outgoing.out-events.topic=events
mp.messaging.outgoing.out-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer

This configures one smallrye-kafka connector for incoming messages and one for outgoing messages.
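
Note that both channels point at the same topic, events, so anything sent through out-events comes back through in-events. The sketch below uses only java.util.Properties to parse the same keys and list the channel-to-topic mapping. The class name ChannelTopics and the method topicsByChannel are hypothetical, and this is not how SmallRye reads its configuration; it only illustrates the key layout mp.messaging.<direction>.<channel>.<attribute>.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class ChannelTopics {

    // Same channel keys as in application.properties (serializers omitted).
    public static final String CONFIG = String.join("\n",
        "mp.messaging.incoming.in-events.connector=smallrye-kafka",
        "mp.messaging.incoming.in-events.topic=events",
        "mp.messaging.outgoing.out-events.connector=smallrye-kafka",
        "mp.messaging.outgoing.out-events.topic=events");

    // Collects "<direction>.<channel>" -> topic for every key ending in ".topic".
    public static Map<String, String> topicsByChannel(String config) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(config));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String prefix = "mp.messaging.";
        String suffix = ".topic";
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix) && key.endsWith(suffix)) {
                String channel = key.substring(prefix.length(), key.length() - suffix.length());
                result.put(channel, props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Both entries map to the same topic, so a sent message is received again.
        topicsByChannel(CONFIG).forEach((channel, topic) ->
            System.out.println(channel + " -> " + topic));
    }
}
```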

Create a consumer

Now let's go ahead and create a consumer with the @Incoming annotation. We need to inject a Camel ProducerTemplate instance into the consumer so that each message can be forwarded to our Camel direct endpoint.

package agilabs;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.apache.camel.ProducerTemplate;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class Consumer {

    @Inject
    ProducerTemplate camelProducer;

    @Incoming("in-events")
    public void consume(String message) {
        camelProducer.sendBody("direct:receive-events", message);
    }
}

Create a producer

Now let's create a producer. Note that this producer will be used from the Camel XML context, which relies on reflection to find its methods dynamically. We therefore need to register this class for reflection so that it still works once compiled to native code.

package agilabs;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import io.quarkus.runtime.annotations.RegisterForReflection;
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.Stream;

@ApplicationScoped
@RegisterForReflection
public class Producer {

    // Emitter bound to the outgoing channel declared in application.properties
    @Inject
    @Stream("out-events")
    Emitter<String> emitter;

    // Called from the Camel XML route through the bean reference
    public void produce(String message) {
        emitter.send(message);
    }
}

Wiring it all together

Now that we have everything configured, we need to wire it all together: register the SmallRye producer in the Camel context and make a Camel producer template available in the Quarkus CDI context.

As mentioned earlier, it seems that the Camel context is started earlier than the CDI context, so when Camel tries to look for a bean called smallrye-producer it will fail to start because it can't find it. This also comes from the fact that ApplicationScoped beans are instantiated in a lazy fashion. Since the smallrye-producer is never called in the CDI context, it will never be initialized. For these reasons we need to observe the Camel InitializedEvent and register the SmallRye producer at that specific time.

The @Produces annotation provides the Camel producer template to the CDI context.

package agilabs;

import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;

import io.quarkus.camel.core.runtime.InitializedEvent;

public class CamelWiring {

    @Inject
    Producer smallRyeProducer;

    @Inject
    CamelContext camelContext;

    // Bind the producer under the name referenced by <bean ref="smallrye-producer"> in routes.xml
    public void registerSmallryeEmitter(@Observes InitializedEvent ev) throws Exception {
        camelContext.getRegistry().bind("smallrye-producer", smallRyeProducer);
    }

    // Expose a Camel ProducerTemplate to the CDI context
    @Produces
    public ProducerTemplate buildCamelProducerTemplate() throws Exception {
        return camelContext.createProducerTemplate();
    }
}
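
The ordering problem described above can be illustrated with a small plain-Java model. This is only a sketch under simplifying assumptions: SimpleRegistry stands in for the Camel registry, Lazy stands in for a lazily instantiated @ApplicationScoped bean, and both are hypothetical classes that share nothing with the real Camel or CDI implementations.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class RegistryOrderingSketch {

    // Stand-in for the Camel registry: a plain name -> bean map.
    public static final class SimpleRegistry {
        private final Map<String, Object> beans = new HashMap<>();

        public void bind(String name, Object bean) {
            beans.put(name, bean);
        }

        public Object lookup(String name) {
            return beans.get(name); // null when nothing was bound under that name
        }
    }

    // Stand-in for a lazily created bean: the instance is built on first access only.
    public static final class Lazy<T> implements Supplier<T> {
        private final Supplier<T> factory;
        private T instance;

        public Lazy(Supplier<T> factory) {
            this.factory = factory;
        }

        public synchronized boolean isCreated() {
            return instance != null;
        }

        @Override
        public synchronized T get() {
            if (instance == null) {
                instance = factory.get();
            }
            return instance;
        }
    }

    public static void main(String[] args) {
        SimpleRegistry registry = new SimpleRegistry();
        Lazy<Object> producer = new Lazy<>(Object::new);

        // Before the init event: nothing is bound and the bean was never touched.
        System.out.println("bound before init:   " + (registry.lookup("smallrye-producer") != null));
        System.out.println("created before init: " + producer.isCreated());

        // What the observer does: force creation and bind the bean by name.
        registry.bind("smallrye-producer", producer.get());
        System.out.println("bound after init:    " + (registry.lookup("smallrye-producer") != null));
    }
}
```

In this model a lookup made before the bind step finds nothing, which mirrors why the binding is done explicitly in the InitializedEvent observer instead of relying on the bean already being present.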

Compiling and testing the project

Now let's compile it to native code. The compilation is quite memory and CPU intensive. I guess this is due to the costly compilation of the XML support libraries. At peak I observed 9.8 GB of memory consumption. The whole compilation process took about 7 to 8 minutes.

git clone https://github.com/ngtranminhtuan/quarkus-kafka-camel-servlet.git
cd quarkus-kafka-camel-servlet
mvn package -Pnative

If you are in a rush, I have already built a container image for you. You can run everything with a single command using the docker-compose.yml file. It boots a ZooKeeper instance and a Kafka broker using images provided by the Strimzi project, plus a container with the current project.

docker-compose up

You can also get the container image directly here :
https://hub.docker.com/r/ngtranminhtuan/quarkus-kafka-camel-servlet

Test it!

curl http://localhost:8080/camel/send -X POST -H "Content-Type: text/plain" -d 'Hello there!'

The output should be

quarkus-kafka-camel-servlet_1  | 2019-07-05 13:12:58,025 INFO  [send-kafka-events] (executor-thread-1) Sending
quarkus-kafka-camel-servlet_1  | 2019-07-05 13:12:58,027 INFO  [io.sma.rea.mes.kaf.KafkaSink] (vert.x-eventloop-thread-1) Message org.eclipse.microprofile.reactive.messaging.Message$$Lambda$5e1799c60041209b57937765b33186671a72f6f8@7f35f7425e98 sent successfully to Kafka topic 'events'
quarkus-kafka-camel-servlet_1  | 2019-07-05 13:12:58,027 INFO  [receive-kafka-events] (vert.x-eventloop-thread-0) Received from kafka broker : Hello there!
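
If you prefer to send the test request from Java instead of curl, the following sketch uses the JDK's built-in HTTP client (Java 11 or later). The class name SendEvent and the method buildRequest are hypothetical, and the base URL assumes the service is running locally on port 8080 as in the curl example.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SendEvent {

    // Builds the same POST as the curl command: a text/plain body sent to /camel/send.
    public static HttpRequest buildRequest(String baseUrl, String body) {
        return HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/camel/send"))
            .header("Content-Type", "text/plain")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
            buildRequest("http://localhost:8080", "Hello there!"),
            HttpResponse.BodyHandlers.ofString());
        // The route replaces the body with a constant after handing the message to the producer.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```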

Thanks for reading !
