Follow Us on Twitter

Integratie met Apache Kafka

April 2017 - In de integratiewereld hoor ik de laatste tijd steeds vaker mensen over Apache Kafka spreken. Ook het aantal referenties naar artikelen en blogposts m.b.t. Apache Kafka laat een stijgende lijn zien op mijn twitter feed. Tijdens het recente Oracle Fusion Middleware Forum in Split bleek dat ook Oracle het product heeft omarmd om het te gebruiken in haar almaar uitdijende Cloud portfolio.

Hoog tijd om er zelf eens wat meer over te weten. In dit Whitebook wordt uitgelegd wat Apache Kafka is en waar het voor toegepast kan worden.

Daarnaast wordt er met een integratieblik gekeken naar manieren om met Apache Kafka te kunnen integreren. Hierbij worden een aantal technieken aan de hand van (code-)voorbeelden gedemonstreerd. Zo wordt gedemonstreerd hoe je middels Java en Mule met Kafka kan integreren. De codevoorbeelden zijn te downloaden via Github.

Apache Kafka

Kafka is een product dat initieel door LinkedIn is ontwikkeld. LinkedIn gebruikt Kafka voor het monitoren en verwerken van activiteiten op hun websites. Denk hierbij aan zoekvragen, pageviews, getoonde advertenties, etc. Gezien de enorme hoeveelheden van dit soort gebeurtenissen die dagelijks worden gegenereerd, is er de behoefte aan een product dat grote hoeveelheden van deze gebeurtenissen kan verwerken en schaalbaar is met een minimum aan overhead. Ik het kader hiervan is Kafka ontwikkeld. In 2011 is het beschikbaar gesteld als open source.

De website van Apache Kafka beschrijft het product simpelweg als een "distributed streaming platform", hetgeen wil zeggen dat Kafka een manier biedt om schaalbaar real-time data te streamen op basis van Publish-Subscribe (pub/sub) waarbij hoge volumes bereikt kunnen worden. Met name die hoge volumes is wat het product onderscheidt van traditionele message broker oplossingen, die ook pub/sub ondersteunen.

Mogelijke toepassingen van Apache Kafka zijn o.a.:

  • Messaging (vergelijkbaar met traditionele message brokers);
  • Website activity tracking (dit is waar LinkedIn het initieel voor heeft ontwikkeld);
  • Business activity monitoring (vergelijkbaar met wat een product als Oracle BAM levert);
  • Stream processing, waarbij ruwe data verrijkt, geaggregeerd of getransformeerd wordt.

Voor uitgebreide informatie, zie: kafka.apache.org. Daar staat ook een complete lijst van toepassingsmogelijkheden.

Messaging

Een van de toepassingen van Apache Kafka is voor data- en/of berichtuitwisseling tussen systemen en applicaties middels het Publish-Subscribe integratiepatroon. In die zin is het vergelijkbaar met traditionele message brokers zoals ActiveMQ, Weblogic JMS en Websphere MQ. Het grote verschil is dat Apache Kafka van de grond af aan is opgebouwd met schaalbaarheid in het achterhoofd en dat het mede daardoor grote hoeveelheden throughput kan verwerken (vergelijking Kafka en ActiveMQ / RabbitMQ).

Net als bij de genoemde message brokers maakt Kafka gebruik van topics. Vergeleken met een traditionele topic is een Kafka topic eerder een combinatie van een queue en een topic, waarbij de voordelen van beiden - schaalbaarheid én multi-consumer - zijn gecombineerd.

Voor integratie biedt Kafka de zgn. Producer API (voor het vastleggen van data op een topic) en Consumer API (voor het consumeren van de data op een topic) aan.

Plaats van Apache Kafka binnen een integratie landschap
Plaats van Apache Kafka binnen een integratie landschap.

Streaming

Apache Kafka kan ook worden toegepast om datastromen te transformeren. Data wordt van een of meerdere topics geconsumeerd en getransformeerd, waarna de output vervolgens naar een of meerdere output topics gestuurd wordt. Om hier gebruik van te maken biedt Kafka de .Installatie

Installeren van Kafka is vrij rechttoe rechtaan. Kijk op How to install Apache Kafka on Ubuntu voor een stap voor stap beschrijving van de installatie op Ubuntu 14.04.

Na installatie kan de Kafka server middels een command prompt gestart worden:

~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties

Shellscripts

Kafka's installatie bevat twee handige shell scripts waarmee data op een topic geproduceerd en geconsumeerd kan worden.

Via een command prompt kan middels onderstaand commando het consumeren van een topic (genaamd ShellTopic in dit voorbeeld) gestart worden.

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ShellTopic --from-beginning

De topic ShellTopic hoeft niet eerst gecreëerd te worden (je krijgt hier initieel wel een melding over, maar daarna is de topic aanwezig).

Via een tweede command prompt kan vervolgens wat data gepubliceerd worden op diezelfde topic:

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ShellTopic > /dev/null

Elk bericht dat via het producer script wordt aangeboden komt vervolgens in het consumer window tevoorschijn:

Kafka Consumer Window

Een lijst met aanwezige topics is ook via een shellscript te raadplegen:

~/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

Met hetzelfde shell script kunnen topics gecreëerd, bijgewerkt en verwijderd worden. Om de topic die hierboven gecreëerd is, weer te verwijderen kan bv. het volgende commando worden gebruikt:

~/kafka/bin/kafka-topics.sh --delete --topic ShellTopic --zookeeper localhost:2181

Integratie met Java

Pub/Sub

Een van de mogelijkheden om vanuit applicaties te integreren met Kafka, is plain old Java. Hoewel publish en subscribe op een Kafka topic heel veel weg heeft van integratie met JMS, biedt Kafka echter geen ondersteuning voor de JMS API.

Kafka biedt een eigen set aan APIs aan waarmee berichten op topics geproduceerd en geconsumeerd kunnen worden. Het heeft er alle schijn van dat deze APIs nog volop in ontwikkeling zijn. De codevoorbeelden die op internet voorhanden zijn, blijken vaak al achterhaald en gebruik te maken van deprecated code. Onderstaande codevoorbeelden zijn succesvol getest met Kafka versie 0.10.2 .

Zorg dat onderstaande Maven dependency aanwezig is in je project om de consumer en producer code te laten compileren.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

Producer API

Onderstaand code voorbeeld kan worden gebruikt om de String "Hello World" een aantal malen (gelijk aan de waarde van de events parameter) te produceren richting een topic.

public class DataProducer {
  private static final String BROKERHOST = "127.0.0.1";
  private static final String BROKERPORT = "9092";

  private final String topic;

  public DataProducer(String topic) {
    this.topic = topic;
  }
   
  public void produce(long events) {
    Properties producerProps = new Properties();
    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
    producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
      for (long nEvents = 0; nEvents < events; nEvents++) {
        String key = "Producer";
        String value = "Hello world " + nEvents;

        ProducerRecord<String, String> data = new ProducerRecord<>(topic, key, value);
        producer.send(data);
      }
    }
  }
}

Het ProducerRecord is in bovenstaande voorbeeld van het type String (key), String (value). Dit kan echter ook bv. een Integer of een ByteArray zijn. Let er dan wel op dat de properties key.serializer en value.serializer de juiste serializer class moeten krijgen. Keys in een record kunnen gebruikt worden voor message correlatie en hoeven niet uniek te zijn.

Kafka levert een aantal (de-)serializers voor standaard datatypes, maar biedt je ook de mogelijkheid om zelf een serializer te schrijven waarmee je bv. custum Java objecten kunt gebruiken voor data-uitwisseling.

Indien een consumer shell is gestart die naar dezelfde topic (PubSubTopic) luistert, kun je in bijbehorend command window de message voorbij zien komen om te testen of het werkt:

Kafka Consumer Shell met Producer code output
Output van de consumer shell nadat de producer code is uitgevoerd

Consumer API

Het consumeren van data van een topic heeft nauwelijks meer om handen dan het produceren. Zie onderstaande code.

public class DataConsumer {

  private static final String BROKERHOST = "127.0.0.1";
  private static final String BROKERPORT = "9092";

  private final String topic;
  private final String group;

  public DataConsumer(String topic, String group) {
    this.topic = topic;
    this.group = group;
  }

  public void consume() {
    Properties consumerProps = new Properties();
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group);
    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
    consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
      consumer.subscribe(Arrays.asList(new String[]{topic}));

      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(group + " " + record.offset() + ": " + record.key() + ":" + record.value());
        }
      }
    }
  }
}

Bovenstaande code luistert naar dezelfde topic als waar het producer voorbeeld naartoe produceert. Als de consumer draait en de producer code uitgevoerd wordt, is te zien dat de consumer code zijn werk doet.

Output van de Kafka consumer code, nadat de producer code is uitgevoerd
Output van de consumer code, nadat de producer code is uitgevoerd

Als ook de shell consumer nog draait, zie je daar - zoals verwacht in een Pub-Sub integratie - ook de berichten binnenkomen.

Een interessant aspect bij het consumeren is het zgn. group id. Indien meerdere consumer processen hetzelfde group id gebruiken wordt het record maar door 1 van deze consumers opgepakt. Bij verschillende group id’s wordt het record per groep eenmaal verwerkt.

Streaming API

Een interessant onderdeel van Kafka is de streaming API. Data welke binnenkomt op een queue, kan hiermee middels streaming direct worden getransformeerd en daarna worden afgeleverd op een andere queue.

Voor de streaming API is een andere Maven dependency nodig:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

De Java code in onderstaand voorbeeld, transformeert de (string) records die binnenkomen op een input queue naar upper case. Een simpel voorbeeld, maar het laat wel zien hoe krachtig Kafka streams zijn. In de tranformatiestap kun je analoog aan Java 8 streams helemaal los gaan met transformaties, filters, aggregaties en meer.

public class DataStreamer {

private static final String BROKERHOST = "127.0.0.1";
private static final String BROKERPORT = "9092";

private final String inputTopic;
private final String outputTopic;

  public DataStreamer(String inputTopic, String outputTopic) {
    this.inputTopic = inputTopic;
    this.outputTopic = outputTopic;
  }
  
  public void stream() {
    Properties streamerProps = new Properties();
    streamerProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-capitalize");
    streamerProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
    streamerProps.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamerProps.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    final Serde<String> stringSerde = Serdes.String();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> values = builder.stream(stringSerde, stringSerde, inputTopic);
    values.mapValues(String::toUpperCase).to(stringSerde, stringSerde, outputTopic);

    KafkaStreams streams = new KafkaStreams(builder, streamerProps);
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}

Zie hieronder het resultaat van de consumer als zowel de streamer als de consumer gestart zijn, waarbij die laatste luistert naar de output queue van de streamer en de producer wordt gestart die op zijn beurt de input queue van de streamer bedient.

Kafka consumer met zowel de streamer de consumer gestart 

Integratie met een platform - Mule

Inmiddels zijn er al een aantal middleware platformen die integratie met Kafka ondersteunen. Zo kan er bv. via de Oracle Service Bus - zonder officiële ondersteuning van Oracle overigens - data met Kafka uitgewisseld worden. Zie hiervoor de blogpost Oracle Service Bus Transport for Apache Kafka (Part 1) van het Oracle A-team. 

In dit Whitebook werpen we een blik op integratie met Mule.

Installatie Apache Kafka connector

Ten behoeve van integratie met Mule is inmiddels een connector beschikbaar voor Kafka. Deze is niet standaard in Anypoint Studio beschikbaar, maar kan vrij eenvoudig geïnstalleerd worden.

Voer onderstaande stappen uit om de connector beschikbaar te maken in Anypoint Studio:

  • Install new software
  • Selecteer Anypoint Connectors Update Site
  • Zoek op Apache Kafka Connector
  • Installeer de connector

Integratie Mule en Kafka

Nu we de beschikking hebben over de Kafka connector kunnen we een eenvoudig Mule project optuigen om naar een topic te produceren en van een topic te consumeren. Dit project ziet er uiteindelijk als volgt uit (ook beschikbaar in de GitHub repository):

Kafka project in MuleSoft 

Middels een PUT request op een REST endpoint (bv. een tool als Postman) kan een bericht worden aangeboden. Hiervan wordt vervolgens de payload op een Kafka topic genaamd MuleTopic geplaatst. Dit is uit gemodelleerd in de mule-kafkaProducerFlow.

De mule-kafkaConsumerFlow zorgt ervoor dat vervolgens de berichten uit de topic MuleTopic worden opgepakt waarna tenslotte de payload wordt gelogd.

In XML vorm (namespace declaraties zijn weggelaten voor de leesbaarheid) ziet bovenstaand project er als volgt uit:

<?xml version="1.0" encoding="UTF-8"?>
<mule>
  <http:listener-config name="HTTP_Listener_Configuration" host="localhost" port="8081" doc:name="HTTP Listener Configuration"/>
  <apachekafka:config name="Apache_Kafka__Configuration" bootstrapServers="localhost:9092" consumerPropertiesFile="consumer.properties" doc:name="Apache Kafka: Configuration" producerPropertiesFile="producer.properties"/>
  <flow name="mule-kafkaProducerFlow">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/mule-kafka" allowedMethods="PUT" doc:name="HTTP"/>
    <apachekafka:producer config-ref="Apache_Kafka__Configuration" topic="MuleTopic" key=""MuleProducer"" doc:name="Apache Kafka"/>
  </flow>
  <flow name="mule-kafkaConsumerFlow">
    <apachekafka:consumer config-ref="Apache_Kafka__Configuration" topic="MuleTopic" partitions="1" doc:name="Apache Kafka (Streaming)"/>
    <logger message="#[payload]" level="INFO" doc:name="Logger"/>
  </flow>
</mule>

De belangrijkste configuratiestappen zijn de verwijzing naar de Kafka server (localhost:9092) en de property files voor producing en consuming.

In ons voorbeeld is de producer file leeg, de consumer file bevat alleen het group id als property:

group.id=mule

Na opstarten van dit project levert het uitvoeren van een PUT request met body "Hello Mule with consumer!" op adres http://localhost:8081/mule-kafka de volgende logregel op in de Anypoint Studio console:

Output Anypoint Studio
Output Anypoint Studio na het sturen van een PUT request

Op de website van MuleSoft vind je meer informatie over (de configuratie van) de Mule Apache Kafka connector.

Conclusie

Apache Kafka lijkt heel erg op traditionele message brokers, de meerwaarde zit met name in de schaalbaarheid en extreem hoge performance. Indien je een message broker zoekt met bijzonder hoge throughput die bovendien weinig beslag op resources legt, is Kafka een zeer goede keuze.

In dit Whitebook hebben we een aantal technieken gedemonstreerd voor integratie met Apache Kafka. We hebben laten zien hoe je door middel van de Java APIs met Kafka kan integreren en hoe je d.m.v. een integratie tool als Mule met Kafka kan integreren. Bij de integratie middels de Java API viel op dat deze nog volop in ontwikkeling is (getuige ook dat de huidige versie 0.12 is). De kans is dus aanwezig dat de codevoorbeelden over een aantal maanden achterhaald zijn.

Ook valt op dat er geen ondersteuning is voor de JMS API. Ongetwijfeld is hiervoor gekozen omdat Kafka toch wel afwijkt van standaard JMS en allicht is ook om performance redenen afgeweken van de JMS specificatie. Dit maakt het wel wat lastiger om met Kafka aan de slag te gaan, aangezien je eventuele JMS API kennis geen toegevoegde waarde meer heeft. Desalniettemin hebben de code voorbeelden aangetoond dat de API vrij toegankelijk is en het niet bijster ingewikkeld is om hem te gebruiken.

Naast Java zijn er overigens voor de meeste andere gangbare programmeertalen ook clients beschikbaar. Denk onder andere aan Python, .Net en C/C++. Een complete lijst vind je op de hier.

Integratie met Mule is middels de Mule Apache Kafka connector ook een fluitje van een cent gebleken. Mule specialisten zullen vrij snel met Kafka aan de slag kunnen gaan.

Dit Whitebook heeft maar een tipje van de sluier opgelicht van de mogelijkheden en toepasbaarheid van Apache Kafka. Over onderdelen als clustering en scaling bv. zou ook een compleet Whitebook geschreven kunnen worden.

Waardering:
 
Tags:

Reacties

Nieuwe reactie inzenden

De inhoud van dit veld is privé en zal niet openbaar worden gemaakt.

Meer informatie over formaatmogelijkheden

CAPTCHA
Deze vraag is om te testen of u een persoon bent en om spam te voorkomen
Image CAPTCHA
Enter the characters shown in the image.