Axiomatic[Dev]

Creating a Kafka Consumer with Kafka, Kotlin, PostgreSQL, and Debezium

Continuing with tools of the trade, we’re going to explore writing a consumer for a Kafka topic in Kotlin. We’ll generate these messages with Debezium via an HTTP endpoint so that the process is end to end. Lucky for us, we can build on some of what we’ve already been working on to implement this solution.

We’ll do our work in the winch project.

What’s Required

Overview

winch

We’ll set up instrumentation and polymer to send a continuous flow of messages into fast-data-dev (Kafka), where winch can consume them and print a line each time a new message arrives.

Getting Started

Let’s make sure we can get messages into fast-data-dev in an automated fashion first.

./scripts/startup

This should start up all the required containers and register the Debezium connector with the Kafka Connect instance inside fast-data-dev. Now we can navigate to localhost:3030 and check the connector UI to make sure the connector registered correctly.
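For reference, registering a Debezium connector by hand comes down to a single POST against Kafka Connect’s REST API, which fast-data-dev exposes on port 8083. Here’s a minimal sketch; the connector name, hostnames, credentials, and database name are assumptions, and the real values live in the startup script:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "winch-postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "winch",
      "database.server.name": "winch"
    }
  }'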

topic-ui

Now that we’ve verified the connector is attached correctly, let’s move on to kicking off instrumentation with polymer.

./gradlew runenv

To get the most control out of our test environment, we’ll run instrumentation locally. To do this, clone instrumentation and run the following command from the root directory:

make run

instrumentation

OK, now we can see messages being generated by instrumentation via polymer, so next we can move on to actually writing the consumer. We’re going to start with the simplest, most straightforward example from around the internet, using a blog post from the folks over at Confluent as a starting point.
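Before adding the listener, winch needs the Spring for Apache Kafka library on the classpath. A minimal sketch of the relevant build.gradle.kts entry, assuming the project already applies the Spring Boot and dependency-management plugins so the version is resolved for us:

dependencies {
    // Brings in @KafkaListener and Spring Boot's Kafka auto-configuration
    implementation("org.springframework.kafka:spring-kafka")
}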

We create a service to consume messages from the simple-message-topic topic:

package com.jstone28.winch.services

import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Service

@Service
class SimpleMessageTopicConsumer {

    // Spring creates a listener container for this method and invokes it
    // for every record that arrives on simple-message-topic
    @KafkaListener(topics = ["simple-message-topic"], groupId = "group_id")
    fun processMessage(message: String) {
        println("got message: $message")
    }
}

With this consumer in place, we can take advantage of Spring Boot’s built-in Kafka support to process each message found on the topic.
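That built-in functionality is driven by configuration: Spring Boot auto-configures the consumer from spring.kafka.* properties. A minimal sketch of an application.properties for this setup, assuming the broker is reachable on localhost:9092 (the port fast-data-dev exposes by default):

# Kafka broker exposed by fast-data-dev
spring.kafka.bootstrap-servers=localhost:9092
# Default consumer group (the groupId on @KafkaListener takes precedence for that listener)
spring.kafka.consumer.group-id=group_id
# Read from the beginning of the topic the first time this group connects
spring.kafka.consumer.auto-offset-reset=earliest
# The listener treats the payload as a plain String, so String deserializers are enough
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer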

kafka consumer