Debezium CDC with Camel Connector

Debezium Change Data Capture without Apache Kafka Cluster

Posted on December 8, 2022 (updated August 11, 2023) by CK Gan

Table of Contents

  • Overview
    • What is Debezium Engine?
    • What is Apache Camel?
    • What is Quarkus?
  • Implementing Debezium Engine in Quarkus
    • Overview
    • Implementing Camel Quarkus Project
      • Bootstrap the Quarkus Project
      • Defining the Debezium PostgreSQL Connector Settings
      • Enriching Data with Database Connection
      • Converting Java Object to JSON
      • Complete Camel Code
  • Running the Samples
  • Summary

Overview

Debezium is developed based on the Kafka Connect framework, and an Apache Kafka cluster is needed to store the event data captured from source databases. Kafka Connect provides excellent fault tolerance and scalability by making sure there are always connectors running in distributed mode in the cluster. Sometimes a database use case does not require this level of tolerance and reliability, or it may not be justifiable to implement an Apache Kafka cluster just to capture data from a small set of tables, but you still need Change Data Capture (CDC). This is where the Debezium Engine comes into the picture.

What is Debezium Engine?

The Debezium Engine is a set of Java API modules that you can embed into your application so that database changes can be captured using the same approach, by monitoring the database's committed transaction log. The Debezium Engine API allows your application to perform CDC and data processing before the event data is sent to downstream consumers such as messaging platforms, databases, or other applications, using your choice of protocols and data formats.

In order to use the Debezium Engine, you will need to add the required dependencies. The details of how to implement against the Debezium Engine API are not the focus of this post; instead we will look at how to perform CDC using Apache Camel.

For details on Debezium Engine, please refer to the Debezium Engine product documentation.
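To give a flavor of the API, below is a minimal, hedged sketch of embedding the engine directly. The property names follow the Debezium documentation; the file paths, hostnames, and credentials are placeholders, and this sketch is illustrative rather than the code used later in this post.

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EmbeddedEngineSketch {
    public static void main(String[] args) throws Exception {
        // Connector configuration; all values below are placeholders
        Properties props = new Properties();
        props.setProperty("name", "embedded-engine");
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
        props.setProperty("offset.flush.interval.ms", "10000");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "5432");
        props.setProperty("database.user", "postgres");
        props.setProperty("database.password", "postgres");
        props.setProperty("database.dbname", "dbzdemo");
        props.setProperty("database.server.name", "embedded-engine-demo");

        // The engine delivers each committed change event to the notifying() callback
        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> System.out.println(record.value()))
                .build()) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            // A real application would block here until shutdown
            Thread.sleep(60_000);
        }
    }
}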

To implement using the Debezium Engine API directly, you need to write your own code and logic. For CDC and data processing use cases, we often need to enrich the existing database event data. Data enrichment can be done by retrieving data from the same database or by integrating with other systems (or databases). The integration implementation can be challenging and require a lot of effort. This can be simplified by using the Debezium connector extensions provided by Apache Camel.

What is Apache Camel?

Apache Camel is an open source framework for message-oriented middleware with features such as rule-based routing, mediation, and transformation. It supports a wide range of integration protocols and data formats, and provides integration best practices via the industry-proven Enterprise Integration Patterns.

Nowadays, in most cases, we want to be able to implement solutions for containers, microservices, and serverless. This is where we can implement Apache Camel using the Apache Camel extensions for Quarkus.

In other words, you can use the Quarkus framework to implement CDC using the Debezium connector extensions, while at the same time being able to integrate with third-party systems using the Apache Camel extensions for Quarkus.

All-in-one meal package!

What is Quarkus?

Quarkus is a Java framework tailored for deployment on Kubernetes. Key technology components surrounding it are OpenJDK HotSpot and GraalVM. The goal of Quarkus is to make Java a leading platform in Kubernetes and serverless environments while offering developers a unified reactive and imperative programming model to optimally address a wider range of distributed application architectures.

Benchmarks show that Quarkus can perform better than other Java frameworks, which makes it suitable not only for microservices but also for serverless implementations. You can find more Quarkus performance information on this blog.

Implementing Debezium Engine in Quarkus

Overview

The scenario in this post looks at how we can capture database changes for a specific table in a PostgreSQL database and perform a simple data enrichment before the enriched data is converted into JSON format and stored in a plain text file.

CDC using Debezium Connector for Camel Quarkus

The Quarkus application is implemented using the Debezium PostgreSQL connector, and it captures the new records created in the braindose.orders table. New orders are created by the Simulator application. Each new order creates a new entry in the braindose.orders and braindose.customerorders tables. In this case, we are only interested in the data in the orders table.
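For context, here is a hypothetical sketch of the orders table implied by the captured fields; the actual DDL ships with the GitHub project.

CREATE TABLE braindose.orders (
    orderid     VARCHAR(64) PRIMARY KEY, -- e.g. B-20221208-061618027-81537
    orderdate   TIMESTAMPTZ,             -- surfaced by Debezium as an ISO-8601 string
    sku         VARCHAR(32),
    description VARCHAR(255),
    amount      DOUBLE PRECISION
);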

The event data is further enriched with the customer name and custid in the Camel integration flow. This is done by performing a SQL query against the braindose.customer and braindose.customerorders tables, using the Apache Camel SQL extension.

In real-life scenarios, the integrations and data enrichment can come from external third-party systems located in the same data centre or at remote locations.

Note: This is a simplified demo showcase. Many implementation details, such as circuit breakers and exception handling, are omitted for simplicity.

Now, let’s look at the details in the next sections.

Implementing Camel Quarkus Project

Bootstrap the Quarkus Project

You can quickly bootstrap a Quarkus project at code.quarkus.io to include all the required dependencies.

Using code.quarkus.io to bootstrap the Quarkus project

You can also bootstrap the Quarkus project using the quarkus or mvn command-line interface.
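For example, here is a hedged sketch using the Quarkus CLI; the group and artifact IDs are placeholders, and the extension list mirrors the dependencies below.

$ quarkus create app blog.braindose.demo:camel-dbz \
    -x camel-quarkus-core,camel-quarkus-language,camel-quarkus-bean \
    -x camel-quarkus-debezium-postgres,camel-quarkus-direct \
    -x camel-quarkus-sql,camel-quarkus-file,camel-quarkus-jackson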

You need to include the following dependencies.

    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-language</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-bean</artifactId>
    </dependency>
    <!-- Dependency for the Debezium PostgreSQL connector -->
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-debezium-postgres</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-direct</artifactId>
    </dependency>
    <!-- Dependency for using the SQL extension to connect to the database -->
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-sql</artifactId>
    </dependency>
    <!-- Dependency for creating and reading files -->
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-file</artifactId>
    </dependency>
    <!-- Dependency for JSON -->
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-jackson</artifactId>
    </dependency>

Defining the Debezium PostgreSQL Connector Settings

You need to create a Java class to get started. This Java class needs to extend the org.apache.camel.builder.RouteBuilder class. You can refer to Routes.java on GitHub for the complete sample code used in this post.

package blog.braindose.demo.dbz.camel;

import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.kafka.connect.data.Struct;

import blog.braindose.demo.dbz.camel.model.CustomerOrder;

import java.util.List;
import java.util.Map;

public class Routes extends RouteBuilder {
    // ...
    // Omitted code
    // ...
    @Override
    public void configure() throws Exception {
        // ...
        // Omitted code
        // ...
    }
}

Next, we need to define the settings for the Debezium PostgreSQL connector. In this case, we are going to configure the values via environment variables. For example, dbz.offset.file is configured via the DBZ_OFFSET_FILE environment variable.

This approach allows us to define the values dynamically when we run this project as a container later. We will look into this when we configure the Docker compose file.
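As a sketch, the {{...}} placeholders used in the connector URI below could be backed by application.properties entries in the same style as the data source configuration shown later, with each value injected from the corresponding environment variable:

dbz.offset.file=${DBZ_OFFSET_FILE}
db.name=${DB_NAME}
db.host=${DB_HOST}
db.port=${DB_PORT}
db.user=${DB_USER}
db.password=${DB_PASSWORD}
dbz.dbservername=${DBZ_DBSERVERNAME}
dbz.dbhistoryfile=${DBZ_DBHISTORYFILE}
dbz.schemaincludelist=${DBZ_SCHEMAINCLUDELIST}
dbz.tableincludelist=${DBZ_TABLEINCLUDELIST}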

For a complete list of configurations, please refer to the connector documentation on the Apache Camel website.

String DBZ_SETTINGS = "debezium-postgres:dbz-camel?offsetStorageFileName={{dbz.offset.file}}" +
                     "&databaseDbname={{db.name}}" +
                     "&databaseHostname={{db.host}}" +
                     "&databasePort={{db.port}}" +
                     "&databaseUser={{db.user}}" +
                     "&databasePassword={{db.password}}" +
                     "&databaseServerName={{dbz.dbservername}}" +
                     "&databaseHistoryFileFilename={{dbz.dbhistoryfile}}" +
                     "&schemaIncludeList={{dbz.schemaincludelist}}" +
                     "&tableIncludeList={{dbz.tableincludelist}}";

Next, we are going to implement the Camel logic to read committed transactions from the PostgreSQL database transaction log.

The first thing we need to do is configure the Debezium PostgreSQL connector using the DBZ_SETTINGS variable from the code above.

In the above example, we also define which database schemas and tables to include in the CDC via the schemaIncludeList and tableIncludeList configurations.

from(DBZ_SETTINGS)

Next, the CDC result is populated into a CustomerOrder POJO.

   .process(
        exchange -> {
            // Create a CustomerOrder POJO
            Message in = exchange.getIn();
            final Struct body = in.getBody(Struct.class);
            CustomerOrder order = new CustomerOrder(
                body.getString("orderid"),
                body.getString("orderdate"),
                body.getString("sku"),
                body.getString("description"),
                body.getFloat64("amount"));
            in.setBody(order);
    })
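For reference, here is a minimal sketch of the CustomerOrder POJO implied by the constructor above, the ${body.orderId} expression, and the setters used during enrichment; the actual class in the GitHub project may differ.

public class CustomerOrder {

    private String orderId;
    private String orderdate;
    private String sku;
    private String description;
    private Double amount;
    private Integer custId;
    private String custName;

    public CustomerOrder(String orderId, String orderdate, String sku,
            String description, Double amount) {
        this.orderId = orderId;
        this.orderdate = orderdate;
        this.sku = sku;
        this.description = description;
        this.amount = amount;
    }

    // Needed by the simple("${body.orderId}") expression in the route
    public String getOrderId() {
        return orderId;
    }

    // Called during enrichment with the SQL query result
    public void setCustName(String custName) {
        this.custName = custName;
    }

    public void setCustId(Integer custId) {
        this.custId = custId;
    }

    // Remaining getters, setters, and toString() omitted for brevity
}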

Enriching Data with Database Connection

With the event data from the orders table, we use the orderId to query the same database to retrieve the customer name and custid. This is done via the Camel SQL extension. The SQL query details can be found in the customer.sql file located in the project's resources directory.
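As a hypothetical sketch (the actual query ships in the project's resources), customer.sql could join the customer and customerorders tables on the captured order ID, using the Camel SQL component's named-parameter syntax:

-- Hypothetical sketch of customer.sql; table and column names follow the
-- ones referenced in this post, and the real file may differ.
SELECT c.custid, c.name
FROM braindose.customer c
JOIN braindose.customerorders co ON co.custid = c.custid
WHERE co.orderid = :#${exchangeProperty.orderId}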

The SQL query result is then updated into the existing CustomerOrder object.

    .setProperty("orderDetail", simple("${body}"))
    .setProperty("orderId", simple("${body.orderId}"))
    // Connect to the database to query the customer name and custid using the orderid
    .to("sql:classpath:sql/customer.sql")
    .log("SQL Query from customer table: ${body}")
    .process(
        exchange -> {
            CustomerOrder order = (CustomerOrder) exchange.getProperty("orderDetail");
            Message in = exchange.getIn();
            String custname = (String) ((Map) ((List) in.getBody()).get(0)).get("name");
            Integer custId = (Integer) ((Map) ((List) in.getBody()).get(0)).get("custId");
            order.setCustName(custname);
            order.setCustId(custId);
            in.setBody(order);
    })

In order to connect to the PostgreSQL database, we need to configure the data source in the Quarkus project.

This is done via the application.properties configuration as follows. You can refer to the Quarkus Application Configuration guide for more details on how to utilize the application configuration features provided by the Quarkus framework.

We are configuring these settings using environment variables. We will look at this when we configure the Docker compose file in a later section.

quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=${DB_USER}
quarkus.datasource.password=${DB_PASSWORD}
quarkus.datasource.jdbc.url=jdbc:postgresql://${DB_HOST}:${DB_PORT}/${DB_NAME}
quarkus.datasource.jdbc.max-size=16
quarkus.native.resources.includes=sql/*.sql

Note: The Apache Camel integrations are provided by extensions and components. There are around 306 extensions and 288 components available today in Camel Extensions for Quarkus.

Converting Java Object to JSON

At the end of the implementation, the CustomerOrder object is converted into JSON and appended to the preconfigured plain text file.

    // convert the customerOrder Java object into JSON formatted string
    .marshal().json()
    .log("Transform Order Object to Json: ${body}")
    // append to the external file
    .to("file:{{output.dir}}?fileName={{output.filename}}&appendChars=\n&fileExist=Append")
    .log("JSON data saved into file.")

Note: Other than JSON, there are around 43 data formats supported by Camel Extensions for Quarkus.

Complete Camel Code

The following shows the complete Camel implementation.

// Capture database event from PostgresSQL database
from(DBZ_SETTINGS)
    .log("Event received from Debezium : ${body}")
    .process(
        exchange -> {
            // Create a CustomerOrder POJO
            Message in = exchange.getIn();
            final Struct body = in.getBody(Struct.class);
            CustomerOrder order = new CustomerOrder(
                body.getString("orderid"),
                body.getString("orderdate"),
                body.getString("sku"),
                body.getString("description"),
                body.getFloat64("amount"));
            in.setBody(order);
    })
    .log("Captured order: ${body}")
    .setProperty("orderDetail", simple("${body}"))
    .setProperty("orderId", simple("${body.orderId}"))
    // Connect to the database to query the customer name and custid using the orderid
    .to("sql:classpath:sql/customer.sql")
    .log("SQL Query from customer table: ${body}")
    .process(
        exchange -> {
            CustomerOrder order = (CustomerOrder) exchange.getProperty("orderDetail");
            Message in = exchange.getIn();
            String custname = (String) ((Map) ((List) in.getBody()).get(0)).get("name");
            Integer custId = (Integer) ((Map) ((List) in.getBody()).get(0)).get("custId");
            order.setCustName(custname);
            order.setCustId(custId);
            in.setBody(order);
    })
    .log("Enriched Order: ${body}")
    // convert the customerOrder Java object into JSON formatted string
    .marshal().json()
    .log("Transform Order Object to Json: ${body}")
    // append to the external file
    .to("file:{{output.dir}}?fileName={{output.filename}}&appendChars=\n&fileExist=Append")
    .log("JSON data saved into file.")
;

Running the Samples

You can run the demo by using the Docker compose file provided in the GitHub project. Clone the GitHub project onto your local disk and run the following command in the camel-dbz project folder.

You will need the JDK and Apache Maven installed on your local machine in order to build the project before packaging it to run as containers. The build process will take a while if this is the first time you perform the build.

$ docker compose up --build

In the Docker compose file, note that the following environment variables are configured for the camel service; these are used to configure the Debezium PostgreSQL connector from the earlier section.

    environment:
     - DB_NAME=dbzdemo
     - DBZ_OFFSET_FILE=/deployments/data/offset-file-1.dat
     - DB_HOST=postgres
     - DB_PORT=5432
     - DB_USER=camel
     - DB_PASSWORD=camel
     - DBZ_DBSERVERNAME=camel-dbz-connector
     - DBZ_DBHISTORYFILE=/deployments/data/history-file-1.dat
     - DBZ_SCHEMAINCLUDELIST=braindose
     - DBZ_TABLEINCLUDELIST=braindose.orders
     - OUTPUT_DIR=/tmp/dbzdemo
     - OUTPUT_FILENAME=dbz-camel-order.output

The Simulator application is written with the Quarkus framework and uses a cron job to randomly create new orders in the braindose.orders table, producing log output like the following.

camel-dbz-simulator-1  | 2022-12-08 06:20:00,008 INFO  [blo.bra.dem.deb.cam.sim.DBSimulator] (executor-thread-0) Simulating new order now ...
camel-dbz-simulator-1  | 2022-12-08 06:20:00,011 INFO  [blo.bra.dem.deb.cam.sim.DBSimulator] (executor-thread-0) The lucky customer is: Customer [custId=14, name=Anderson Hilton]
camel-dbz-simulator-1  | 2022-12-08 06:20:00,011 INFO  [blo.bra.dem.deb.cam.sim.DBSimulator] (executor-thread-0) The lucky SKU is: Sku [sku = MW11287, decription=iPad 9th Generation,price = 329.0]
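As a hypothetical sketch (the actual simulator lives in the GitHub project), the scheduling could look like the following with the Quarkus scheduler extension; the class name matches the logs above, while the package, property name, and method body are inferred and illustrative.

package blog.braindose.demo.debezium.camel.simulator;

import javax.enterprise.context.ApplicationScoped;

import io.quarkus.scheduler.Scheduled;

@ApplicationScoped
public class DBSimulator {

    // Fires on the cron expression resolved from configuration,
    // e.g. simulator.cron=0/5 * * * * ?
    @Scheduled(cron = "{simulator.cron}")
    void simulateNewOrder() {
        // Pick a random customer and SKU, then insert new rows into the
        // braindose.orders and braindose.customerorders tables
    }
}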

You will observe the following output at the command prompt when the Camel service is processing the CDC event data.

camel-dbz-camel-1      | 2022-12-08 06:16:18,922 INFO  [route1] (Camel (camel-1) thread #1 - DebeziumConsumer) Event received from Debezium : Struct{orderid=B-20221208-061618027-81537,orderdate=2022-12-08T06:16:18.027Z,sku=MW84287,description=Green Alpine Loop,amount=99.0}
camel-dbz-camel-1      | 2022-12-08 06:16:18,925 INFO  [route2] (Camel (camel-1) thread #1 - DebeziumConsumer) Captured order: Order [orderId=B-20221208-061618027-81537, custId=0, custName=null, orderdate=Thu Dec 08 06:16:27 GMT 2022, sku=MW84287]
camel-dbz-camel-1      | 2022-12-08 06:16:18,969 INFO  [route2] (Camel (camel-1) thread #1 - DebeziumConsumer) SQL Query from customer table: [{custid=29, name=Alfred Hugo}]
camel-dbz-camel-1      | 2022-12-08 06:16:18,969 INFO  [route2] (Camel (camel-1) thread #1 - DebeziumConsumer) Enriched Order: Order [orderId=B-20221208-061618027-81537, custId=29, custName=Alfred Hugo, orderdate=Thu Dec 08 06:16:27 GMT 2022, sku=MW84287]
camel-dbz-camel-1      | 2022-12-08 06:16:18,971 INFO  [route2] (Camel (camel-1) thread #1 - DebeziumConsumer) Transform Order Object to Json: {"custId":29,"orderId":"B-20221208-061618027-81537","custName":"Alfred Hugo","orderdate":1670480187000,"sku":"MW84287","description":"Green Alpine Loop","amount":99.0}
camel-dbz-camel-1      | 2022-12-08 06:16:18,983 INFO  [route2] (Camel (camel-1) thread #1 - DebeziumConsumer) JSON data saved into file.

The results are appended to the output file configured via the environment variables OUTPUT_DIR and OUTPUT_FILENAME in the Docker compose file.

 $ tail -f /tmp/dbzdemo/dbz-camel-order.output 

{"custId":20,"orderId":"B-20221208-061553010-96134","custName":"Clinton Hill","orderdate":1670480101000,"sku":"MW2729","description":"Sunglow Solo Loop","amount":49.0}
{"custId":22,"orderId":"B-20221208-061558020-46718","custName":"Sky High","orderdate":1670480121000,"sku":"MW11284","description":"iPad 10th Generation","amount":499.0}
{"custId":14,"orderId":"B-20221208-061603026-75192","custName":"Anderson Hilton","orderdate":1670480186000,"sku":"MW11118","description":"Apple Macbook Pro 16\"","amount":249

Summary

In this post, we have quickly gone through how to use Debezium CDC in Apache Camel for Quarkus. The Debezium connector extensions provided by Apache Camel are implemented based on the Debezium Engine framework. This provides the CDC feature on top of the many other capabilities offered by Apache Camel.

Last but not least, Red Hat provides commercially supported solutions via the Red Hat Integration portfolio. Correction: the Camel Debezium Connector is community supported today; it was my mistake to assume it is supported by Red Hat. Refer to code.quarkus.redhat.com for supportability. Components supported by Red Hat are indicated with a “SUPPORTED” label.

