A True Atomic Microservices Implementation with Debezium to Ensure Data Consistency

Overview

You probably have known that one of the best practices of microservices design is to implement an application service with it’s own private data service (in case of stateful application), as described in database per service pattern. Eventually when you have grown your application landscape, you will have many microservices and data services, if not thousands could be hundreds. These services run as independent deployable modules but it is very likely they will need to exchange data among themselves or with 3rd party systems.

For example, when a payment request is submitted to the CASA service, it will need to atomically update it’s own database, and at the same time send the transaction data to the other interested consumer services such as the core banking. At most of the time, you also need to log a copy of audit trail for the transaction at each of the processing stages (CASA and core banking). You would not want the CASA service to responsible to update the audit trail data service or calling the core banking service directly, instead of atomicity, they now tightly dependance on each others.

You may think of updating the data services across both services but there is no clean approach for microservices (automated approach like traditional 2PC transaction management) to span the transaction session across multiple systems, which in most cases the systems may not supporting 2PC (two-phase commits) such as messaging platform, file systems and etc. Situation can become complex very quickly that may results in data consistency and integrity problem.

This is where the Outbox Pattern comes to rescue.

Outbox Pattern

According to Outbox pattern, instead of letting the application performs the CRUD operations on multiple data services or calling other services to do so, the application service should just performing the CRUD operations on it’s own database. In order to share the data with other consumer services, it should insert the relevant data into an outbox table in the same database or schema. This will ensure an all-fail or all-success scenario because all these CRUD operations are performed in the same local transaction session.

A separate independent mechanism is required to relay these data from the outbox table to the respective consumer services. In our example, the core banking that need to consume the data for further processing.

Talking about this message delivery mechanism, one of the option is none of other than the popular event driven platform – Apache Kafka, which will be used in the example outlined in this article, as shown on the following diagram.

Outbox Pattern based on Apache Kafka and Debezium

The above diagram illustrates the Outbox pattern implementation using Apache Kafka and Debezium.

Note that each of the application service has it’s own database (in this case PostgreSQL database), and each of the database has it’s own business data table and corresponding outbox event table. The Debezium is responsible to consume these outbox table messages and produces them to the Apache Kafka.

You can see this is the ideal microservices architecture which promote atomicity but at the same time preventing data consistency issues. Most importantly the Debezium Outbox Pattern together with Outbox Quarkus Extension eliminates many manual steps and let the developer focus on business logic implementation.

Let’s take a quick look on Debezium before we jump into the implementation detail.

What is Debezium?

Debezium is an open source distributed platform for change data capture(CDC). It provides none-intrusive CDC to capture database inserts, updates and deletes that are committed by applications, and streams these changes to Apache Kafka.

Debezium is built on top of Apache Kafka. It is implemented using Kafka Connect framework. Each database integration is a Kafka source connector implementation that captures the database changes.

Debezium currently provides the following connectors:

You can apply declarative message transformation using Simple Message Transformation (SMT). SMT allows your to define predicate for Kafka message transformation. The predicate specifies how to apply the transformation conditionally to a subset of the messages that the connector processes. You can assign predicates to transformations that you configure for source connectors, such as Debezium, or to sink connectors.

Debezium provides several SMTs that you can use to modify event records before Kafka Connect saves the records to Kafka topics. The following are the list of the SMTs provided by Debezium.

TransformDescription
Topic RoutingRe-routes records to different topics based on a regular expression applied to the original topic name.
Content-Based RoutingReroute selected events to other topics, based on the event content.
New Record State ExtractionExtracts the flat structure of field names and values from Debezium change events, facilitating sink connectors which cannot process Debezium’s complex event structure.
MongoDB New Document State ExtractionThe MongoDB-specific counter-part to the New Record State Extraction SMT.
Outbox Event RouterProvides a way to safely and reliably exchange data between multiple (micro) services.
Message FilteringApplies a filter to the change events emitted by the connectors, based on their content. This lets you propagate only those records that are relevant to you.
List of SMTs provided by Debezium

There are a number of other interesting features in Debezium. You should get started by looking at the Debezium product documentations, FAQ and the blog.

Outbox Implementation Using Debezium

Debezium introduces Outbox Event Router in the attempt to provide the Outbox Pattern implementation from the beginning of very early version.

Gunnar Morling from Red Hat wrote an interesting and inspiring article on this concept in Reliable Microservices Data Exchange with the Outbox Pattern, which provides the blueprint of how this Outbox Pattern should be implemented using Debezium.

In the article, Gunnar outlined the manual approach of doing this using Debezium alone. Today we will look into how can we automate most of this manual steps using the Debezium Outbox Router and Debezium Quarkus Extension. With this new Quarkus extension, it provides a declarative approach to make developer life easier, such as the auto-creation of outbox table, event based routing and etc.

The Debezium Quarkus Extension feature is currently in incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedbacks received.

Let’s dive into detail by going through the sample that I have created based on the following illustration.

A fictional payment transaction implementation using Debezium outbox pattern

As illustrated, the casa-service does 2 things:

  • Provides REST interfaces for CASA transaction requests and queries. It performs the necessary database inserts and queries (casa table) on it’s own database called casa-postgres.
  • Consumes the response messages from core-service via the Kafka Topic (casa.response.events) and updates the casa table based on the response received.

The core-service consumes messages from the Kafka Topic (casa.events) and perform account balancing on the respective accounts in it’s own casa table in the core-postgres database.

As you can see these 2 business services are purely performing their own business logic processing and maintaining their own private database.

There are outbox table (CasaOutboxEvent and ResponseOutboxEvent) in each of the database. Debezium with the help of the Quarkus extension provides the outbox table implementation, persistency and event streaming out of the box. As a developer, your hand is free to focus on your business logic implementation.

So how does these outbox tables and messages being created?

There are a few things you need to do. Let’s look at the casa-service as an example.

Maven Dependency

Firstly, enable your Quarkus application to use the Debezium Outbox Quarkus Extension by inserting the following maven dependency in your pom.xml, among other dependencies.

<dependency>
   <groupId>io.debezium</groupId>
   <artifactId>debezium-quarkus-outbox</artifactId>
   <version>1.7.0.Alpha1</version>
</dependency>

Event Data Model

Create a data model to represent the data you wish to send to the consumers, in this case, the consumer is core-service. The following is how the POJO CasaEventData.java code snipplet looks like. This POJO data model will be used by the CasaEvent.java that we are going to create later.

/**
 * Provide the event data model for Casa outbox event implementation.
 * Provides the implementation for Debezium Outbox Pattern.
 */
@RegisterForReflection
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class CasaEventData {
    /**
     * Casa unique transaction id.
     */
    private String id;
    private String recipientAccountNo;
    private String sourceAccountNo;
    private double amount;
    /**
     * Casa created timestamp to indicates the timestamp when the Casa create event is being fired.
     */
    private Instant createdTimestamp;
    /**
     * Core processed timestamp to indicates the timestamp when the core processed the transaction.
     */
    private Instant coreProcessedTimestamp;
    /**
     * Audit timestamp to indicates the timestamp when the audit event is being fired.
     */
    private Instant auditTimestamp;
    /**
     * Response received timestamp to indicates the timestamp when the response from core is received by origination.
     */
    private Instant responseReceivedTimestamp;
    /**
     * Referecense information for recipient.
     */
    private String recipientReference;
    /**
     * Payment type. @see blog.braindose.paygate.model.PaymentTypes
     */
    private PaymentTypes paymentType;
    /**
     * Status of casa processing. @see blog.braindose.paygate.model.Status
     */
    private Status status;
    /**
     * Messages response from core backend if any. This could be error message when processing failed.
     */
    private String responseMessages;

    /**
     * Kafka header id
     * This is an unique id for each kafka message. Can be used to perform deduplication in the event of duplicated message sent by producer in the event of message resent due to unforeseen failure.
     */
    private String messageId;

    /**
     * Event source. This should be unique identifiable value for auditing purpose. Suggest to use Class.getName()
     */
    private String eventSources;
    /**
     * Event timestamp to provides timestamp when the event is being fired.
     */
    private Instant eventTimestamp;

   /// ... more omitted codes

Outbox Event Implementation

CasaEvent.java is the Java class that implements the io.debezium.outbox.quarkus.ExportedEvent. It provides the necessary implementation for the event information such as the payload, aggregation, type of event, etc. The payload is expected to be in JsonNode type, this is where we decide what data to be sent as payload.

/**
 * Casa Event for Outbox pattern implementation using Debezium.
 */
@Immutable
public class CasaEvent implements ExportedEvent<String, JsonNode> {

    private static ObjectMapper mapper = new ObjectMapper();

    /**
     * Unique Casa transaction id. @see blog.braindose.opay.casa.Casa#id
     */
    private final String id;
    /**
     * Payload to be sent to Kafka in JSON format.
     */
    private final JsonNode casa;
    /**
     * Timestamp for outbox pattern implementation. Defaulted to Casa transactionTimestamp. @see blog.braindose.opay.casa.Casa#transactionTimestamp
     */
    private final Instant timestamp;

    public CasaEvent(CasaEventData casa) {
        this.id = casa.getId();
        this.timestamp = casa.getEventTimestamp();
        this.casa = convertToJson(casa);
    }

    private JsonNode convertToJson(CasaEventData casa) {
        ObjectNode asJson = mapper.createObjectNode()
                .put("id", casa.getId())
                .put("recipientAccountNo", casa.getRecipientAccountNo())
                .put("sourceAccountNo", casa.getSourceAccountNo())
                .put("amount", casa.getAmount())
                .put("recipientReference", casa.getRecipientReference())
                .put("paymentType", casa.getPaymentType().toString())
                .put("createdTimestamp", casa.getCreatedTimestamp().toString())
                .put("eventSources", casa.getEventSources().toString())
                .put("eventTimestamp", casa.getEventTimestamp().toString())
                .put("status", casa.getStatus().toString());
        return asJson;
    }

    @Override
    public String getAggregateId() {
        return id;
    }

    @Override
    public String getAggregateType() {
        return "casa";
    }

    @Override
    public JsonNode getPayload() {
        return casa;
    }

    @Override
    public String getType() {
        return "payment";
    }

    @Override
    public Instant getTimestamp() {
        return timestamp;
    }

}

The CasaEvent will be deserialised and populated into the following table structure by the Quarkus Extension API. The data represented by CasaEvent will be inserted into the this table when the fire(ExportedEvent) method from javax.enterprise.event.Event is called in your codes. We will come to this later.

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |

The following information is taken from the Debezium documentation. These information provided through the CasaEvent.java implementation will be use by the SMT to construct the proper messages to be sent to the Apache Kafka.

ColumnEffect
idContains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages.

To obtain the unique ID of the event from a different outbox table column, set the table.field.event.id SMT option in the connector configuration.
aggregatetypeContains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default ${routedByValue} variable in the route.topic.replacement SMT option.

For example, in a default configuration, the route.by.field SMT option is set to aggregatetype and the route.topic.replacement SMT option is set to outbox.event.${routedByValue}. Suppose that your application adds two records to the outbox table. In the first record, the value in the aggregatetype column is customers. In the second record, the value in the aggregatetype column is orders. The connector emits the first record to the outbox.event.customers topic. The connector emits the second record to the outbox.event.orders topic.

To obtain this value from a different outbox table column, set the route.by.field SMT option in the connector configuration.
aggregateidContains the event key, which provides an ID for the payload. The SMT uses this value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.

To obtain the event key from a different outbox table column, set the table.field.event.key SMT option in the connector configuration.
payloadA representation of the outbox change event. The default structure is JSON. By default, the Kafka message value is solely comprised of the payload value. However, if the outbox event is configured to include additional fields, the Kafka message value contains an envelope encapsulating both payload and the additional fields, and each field is represented separately. For more information, see Emitting messages with additional fields.

To obtain the event payload from a different outbox table column, set the table.field.event.payload SMT option in the connector configuration.
Additional custom columnsAny additional columns from the outbox table can be added to outbox events either within the payload section or as a message header.

One example could be a column eventType which conveys a user-defined value that helps to categorize or organize events.

Fire The Outbox Event

The next thing you need to do is to fire the Outbox event in your application code. Let’s take a look in the CasaResource.java which provides the REST interface for creating a CASA request.

/**
 * Provides REST interfaces for Casa services.
 */
@Path("casa")
public class CasaResource {

    @Inject
    Event<ExportedEvent<?, ?>> event;
    
    private static final Logger LOGGER = Logger.getLogger(CasaResource.class);
    private CasaEventData casaEventData = null;
    private boolean failed = false;

    /**
     * Create a new Casa transaction
     * @param casa
     * @return
     */
    @POST
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    @Transactional
    public Casa add(Casa casa) {
        try{
            casa.id = GenTxnId.id(TxnTypes.CASA);
            casa.createdTimestamp = Instant.now();
            casa.status = Status.SUBMITTED;
            casaEventData = new CasaEventData(casa.id, casa.recipientAccountNo, casa.sourceAccountNo, casa.amount, casa.createdTimestamp, casa.recipientReference, casa.paymentType, casa.status);
            casaEventData.setEventTimestamp(casa.createdTimestamp);
            casaEventData.setEventSources(Casa.class.getName());
            casa.persistAndFlush();
            event.fire(new CasaEvent(casaEventData));
        }
        catch(PersistenceException e){
            failed = true;
            if (casaEventData != null){
                casaEventData.setStatus(Status.FAILED);
                casaEventData.setResponseMessages("Error creating the Casa record in database.");
            }
            LOGGER.error("Error creating the Casa record in database.", e);
            throw e;
        }
        finally{
            if (casaEventData != null){
                event.fire(new CasaAuditEvent(casaEventData, EventTypes.PAYMENT, AggregateTypes.AUDIT_CASA));
                if (failed) event.fire(new CasaFailedEvent(casaEventData));
            }
        }
        return casa;
    }
    /// more codes omitted
    ...
    ...
    ...

}

As you can see from the above codes, you inject javax.enterprise.event.Event as event

@Inject
Event<ExportedEvent<?, ?>> event;

and then use it to fire the Outbox event with the CasaEventData as the parameter. That’s all you need to do from the coding perspective. Simple and clean!

event.fire(new CasaEvent(casaEventData));

Application.Properties

A final thing that we need to do is to configure the application.properties. This is the place to specify the name for your Outbox table that you want it to be named. You can also configure whether you want to remove the event data in the outbox table once it is inserted. For production, you probably want to do this to save the database storage usage. You do not need to keep the event data in the outbox table once it is inserted because Debezium already captured the insert events and they are not needed after that.

# Debezium outbox
quarkus.debezium-outbox.table-name=CasaOutboxEvent
%dev.quarkus.debezium-outbox.remove-after-insert=false
%prod.quarkus.debezium-outbox.remove-after-insert=true

There are many other configurations that you can use for Quarkus implementation to customise the outbox behaviours.

Kafka Connect Cluster

Once the application implementation is ready. You need to have a running Kafka Connect cluster with the proper Debezium connector plug-ins. You can configure the connector plug-ins, in this example is the Debezium connector for PostgresQL, with some extra configurations as per the following for the casa-service example.

{
    "name": "outbox-connector-casa",
    "config" : {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "casa-postgres",
        "database.port": "5432",
        "database.user": "casa",
        "database.password": "casa",
        "database.dbname": "casa",
        "database.server.name": "casa-event",
        "schema.include.list": "payment",
        "table.include.list": "payment.CasaOutboxEvent",
        "tombstones.on.delete": "false",
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
        "transforms.outbox.table.field.event.timestamp": "timestamp",
        "transforms.outbox.table.field.event.id": "aggregateid",
        "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
    }
}

The Kafka Connector container is available in the Docker Hub. You can also get a copy of the Dockerfile from GitHub and build a version yourself.

We are using outbox transformation as specified by the transforms key. Follows that setting is others Debezium Outbox SMT configuration which can be easily identified with key started with “transforms.outbox.“. The details of these SMT settings can be found here. The others are standard Kafka Connector configuration such as the database server related settings.

Notice the value configured for transforms.outbox.route.topic.replacement. ${routedByValue} is referring to the aggregateType in the CasaEvent.java. This allows us to dynamically route the respective messages for different types of transaction to different Kafka Topics. In this case, the value is “casa” and the Kafka Topic become casa.events.

The transforms.outbox.table.field.event.id is configured to the aggregateId, which is intended. With the unique transaction ID as the Kafka message key, we can easily perform transformation on these messages from different topics using Kafka Streams in the later part of this example.

Notice the transforms.outbox.table.fields.additional.placement which specify to place additional header fields into the Kafka message header by injecting the eventType from CasaEvent.java. This is pretty useful tweak where you may have scenario to use single Kafka Topic to capture same transaction type events but for different status, such as the example given by Gunnar Morling. In my case, I am now using it and just keep it as it is.

Some Additional Implementations

With the above, I have replicated the similar approach for core-service and I got it done in pretty short time frame.

To make things more complicated (which is not actually), I also use the same approach to implement the audit trail events for both casa-service and core-service as you can see from the following codes snipplet. Since I am using the same outbox table structure, I can basically reuse many of my Java classes and completed the event trails event implementation in a blink.

if (casaEventData != null){
   event.fire(new CasaAuditEvent(casaEventData, EventTypes.PAYMENT, AggregateTypes.AUDIT_CASA));
   if (failed) event.fire(new CasaFailedEvent(casaEventData));
}

As it has became easier, I also implement to capture FailedEvent so that these failed transactions can be captured into another Kafka topic for possible human intervention or maybe some automated processing using workflow. In this case casa-service and core-service will not need to worry how to handle those failed transactions. Neat!

Audit Trails Aggregation Using Kafka Streams

The audit events are pieces of information from both casa-service and core-service. We need to aggregate these disconnected information to have a complete single audit trail entry per each transaction and store this into database for safe keeping and tempered proof (not covered in this example). This is where the Kafka Streams come in handy. I basically do a simple join() and followed by a reduce() to do this. This can be easily done because my Kafka Message key is the unique transaction id I have configured earlier.

There is only one catch here. The payload is using JsonNode which I notice it is deserialised into a JSON string with extra double quotes, when it is being captured during the initial stage. It will fail to be direct serialised into Java object using the ObjectMapperSerde and I have no choice but to serialised it into String format. I hope this can be improved when it is GA in the future.

builder.stream(
            KAFKA_TOPIC_CASA_AUDIT,
            Consumed.with(Serdes.String(), Serdes.String()))        // Serialized JSON string from JsonNode creates extra double quotes, causing it is not possible to use Jackson to deserialize into Java object
            .join(
                coreAuditStream,
                (casaAudit, coreAudit) -> {
                    // Multiple audit entries since Casa service received the response from core service.
                    List<AuditEntry> auditEntries = new ArrayList<>();
                    try {
                        LOGGER.debug("Processing casa audit trail...");
                        CasaEventData casaAuditObj = createCasaEventData(casaAudit);
                        auditEntries.add(createAuditEntry(casaAuditObj));
                        
                        LOGGER.debug("Processing core audit trail for casa transaction ...");

                        auditEntries.add(createAuditEntry(createCasaEventData(coreAudit)));
                        
                        AuditData<Casa> auditData = new AuditData<>(
                            casaAuditObj.getId(), 
                            auditEntries, 
                            new Casa(casaAuditObj.getRecipientAccountNo(), casaAuditObj.getSourceAccountNo(), casaAuditObj.getAmount(), casaAuditObj.getRecipientReference()), 
                            Instant.now().toString(), 
                            casaAuditObj.getStatus().toString());

                        String jsonInString = mapper.writeValueAsString(auditData);
                        LOGGER.debug("Joined result = " + jsonInString);
                        return jsonInString;
                        
                    } catch (JsonProcessingException e) {
                        LOGGER.error("Problem parsing Kafka message into JSON.");
                        throw new RuntimeException("Problem parsing Kafka message into JSON", e);
                    }
                },
                JoinWindows.of(Duration.ofMinutes(KAFKA_STREAMS_JOINWINDOW_DURATION)),
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
            )
            .groupByKey()
            .reduce(            // deduplication of audit trail ... 
                (value1, value2) -> AuditData.reduce(value1, value2),
                Materialized.with(Serdes.String(), Serdes.String())
            )
            .toStream()
            //.print(org.apache.kafka.streams.kstream.Printed.toSysOut())
            .to(KAFKA_TOPIC_PAYMENT_AUDIT, Produced.with(Serdes.String(), Serdes.String()))
        ;

To Run The Example

Please head to the GitHub and clone a copy of the projects in your local disk. In the command prompt navigate to the modules directory. Build the modules as instructed in the README.md and then run the following docker compose command to start all the services as containers.

docker compose up --build

Check the status of all the container using docker ps, wait for them to become healthy.

b4567be0b926   a3ae174cc61e   "/deployments/run-ja…"   About a minute ago   Up 32 seconds (healthy)       0.0.0.0:8080->8080/tcp, :::8080->8080/tcp                                                     modules_casa-service_1
6deb333ad96b   999d39c422f1   "/opt/kafka/kafka_co…"   About a minute ago   Up 32 seconds                 0.0.0.0:9080->9080/tcp, :::9080->9080/tcp                                                     modules_kafka-connect_1
3f162df090f3   aae19c8c41da   "/deployments/run-ja…"   About a minute ago   Up 32 seconds (healthy)       0.0.0.0:8081->8080/tcp, :::8081->8080/tcp                                                     modules_core-service_1
010142200cd3   80aa3991c152   "/deployments/run-ja…"   About a minute ago   Up 33 seconds (healthy)       0.0.0.0:8082->8080/tcp, :::8082->8080/tcp                                                     modules_audit-aggregator-streams_1
81ce03c1387a   b70c0caa0c55   "docker-entrypoint.s…"   About a minute ago   Up About a minute (healthy)   0.0.0.0:5433->5432/tcp, :::5433->5432/tcp                                                     modules_core-postgres_1
66a6f4bc810a   618f3ff31b20   "/bin/sh -c ${KAFKA_…"   About a minute ago   Up About a minute (healthy)   2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:9092-9093->9092-9093/tcp, :::9092-9093->9092-9093/tcp   modules_kafka_1
0bb3fcf88769   b70c0caa0c55   "docker-entrypoint.s…"   About a minute ago   Up About a minute (healthy)   0.0.0.0:5432->5432/tcp, :::5432->5432/tcp                                                     modules_casa-postgres_1
1aca37d67ecf   be15b71c7789   "/bin/sh -c ${MONGOD…"   About a minute ago   Up About a minute (healthy)   0.0.0.0:27017->27017/tcp, :::27017->27017/tcp                                                 modules_audit-mongodb_1

Create the necessary Kafka connectors using the following configurations. There are 3 connectors to register here.

  • outbox-connector-casa – This is the Debezium connector for outbox table in casa-postgres database
  • outbox-connector-core – This is the Debezium connector for outbox table in core-postgres database
  • mongodb-sink – This is the Kafka MongoDB connector for the audit table.
{
    "name": "outbox-connector-casa",
    "config" : {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "casa-postgres",
        "database.port": "5432",
        "database.user": "casa",
        "database.password": "casa",
        "database.dbname": "casa",
        "database.server.name": "casa-event",
        "schema.include.list": "payment",
        "table.include.list": "payment.CasaOutboxEvent",
        "tombstones.on.delete": "false",
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
        "transforms.outbox.table.field.event.timestamp": "timestamp",
        "transforms.outbox.table.field.event.id": "aggregateid",
        "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
    }
}
{
    "name": "outbox-connector-core",
    "config" : {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "core-postgres",
        "database.port": "5432",
        "database.user": "core",
        "database.password": "core",
        "database.dbname": "core",
        "database.server.name": "core-event",
        "schema.include.list": "core",
        "table.include.list": "core.ResponseOutboxEvent",
        "tombstones.on.delete": "false",
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
        "transforms.outbox.table.field.event.timestamp": "timestamp",
        "transforms.outbox.table.field.event.id": "aggregateid",
        "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
    }
}
{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": 1,
        "topics": "payment.audit.events",
        "connection.uri": "mongodb://audit:audit@audit-mongodb:27017",
        "database": "audit",
        "collection": "payment",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "max.num.retries": 3
    }
}

The Kafka Connect cluster url is http://localhost:9080/connectors. Do a HTTP post to create the above connectors either using curl or Postman, for each of the connectors mentioned earlier.

Register Kafka Connectors

Try to send a CASA request via the casa-service

Post to create a CASA transaction request

Query the result of the CASA processing

Query the CASA transaction status

The following is what you will see as a result in the audit table.

rs0:PRIMARY> use audit
switched to db audit
rs0:PRIMARY> db.payment.find().pretty();
{
	"_id" : ObjectId("613981a793f9aa4f64c33eb6"),
	"payload" : {
		"amount" : 50.58,
		"recipientAccountNo" : "1-987654-1234-4569",
		"recipientReference" : "Payment for lunch",
		"sourceAccountNo" : "1-234567-4321-9876"
	},
	"id" : "1-20210909-033739893-17839",
	"lastStatus" : "COMPLETED",
	"auditEntries" : [
		{
			"eventSource" : "blog.braindose.opay.casa.Casa",
			"eventTimestamp" : "2021-09-09T03:37:39.905947Z",
			"responseMessages" : null,
			"status" : "SUBMITTED"
		},
		{
			"eventSource" : "blog.braindose.opay.core.casa.ConsumeCasa",
			"eventTimestamp" : "2021-09-09T03:37:42.180891Z",
			"responseMessages" : null,
			"status" : "COMPLETED"
		},
		{
			"eventSource" : "blog.braindose.opay.casa.ConsumeCasaResponse",
			"eventTimestamp" : "2021-09-09T03:37:43.271790Z",
			"responseMessages" : null,
			"status" : "COMPLETED"
		}
	],
	"eventTimestamp" : "2021-09-09T03:37:45.720514Z"
}

You can watch the Youtube video for a complete end to end explanation and demo.

Summary

Typical microservices outbox pattern implementation requires developers to manually create the outbox event table and write the codes to send the data from the outbox table to the respective messaging platform. Debezium Outbox Event Router together with Outbox Quarkus Extension take cares of this and enforce a standard approach to do this with declarative implementation. This allows the developers to focus on business logic implementation and achieve faster application delivery.

Let me know if this something relevant that could benefit you and your application team.

References

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s