Outbox Pattern using Change Data Capture (CDC): Kafka, Outbox, Debezium, Postgres, Spring Boot

Gajendrakhandelwal
Mar 1, 2022

In this article, we walk through implementing the outbox pattern with Debezium and Kafka.

Prerequisites:

Basic Docker knowledge (a must for this article)

Basic idea of CDC, the write-ahead log (WAL), and the outbox pattern

Basic idea of Debezium and Kafka

Requirements:

  1.) Spring Boot app
  2.) Docker

Running All Dependent Services:

First, we start all dependent services using docker-compose.

Services defined in the Docker Compose file:
1.) Kafka
2.) Postgres
3.) Debezium (Kafka Connect)
4.) Zookeeper
5.) Kafdrop (Kafka UI)

The docker-compose.yml file is shown below:

version: '3.9'
services:
  db:
    image: postgres:13
    ports:
      - "5438:5432"
    environment:
      - POSTGRES_PASSWORD=test
      - POSTGRES_USER=postgres
      - POSTGRES_DB=test

  zookeeper:
    image: debezium/zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"

  kafka:
    image: debezium/kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT

  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafka-drop
    ports:
      - "9011:9010"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
      JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
      server.port: "9010"
    depends_on:
      - "kafka"

  connect:
    image: debezium/connect
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - CONNECT_PRODUCER_MAX_REQUEST_SIZE=3145728
    depends_on:
      - zookeeper
      - kafka

To start these services, run docker-compose up in the directory that contains the file.

The steps are as follows:

  1.) Creating the outbox table in the shape Debezium expects
  2.) Enabling logical WAL in Postgres
  3.) Creating the Debezium connector using the REST API
  4.) Connector configuration explanation
  5.) Sample producer implementation
  6.) Working explanation

Step 1: Creating the Outbox table as per Debezium

1.) Connect your Spring Boot app to the Postgres service using the configuration below.

Postgres DB Connection Configuration
+-------------+--------------+
| Config Name | Config Value |
+-------------+--------------+
| db host     | localhost    |
| db port     | 5438         |
| db name     | test         |
| db user     | postgres     |
| db password | test         |
+-------------+--------------+
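As a quick way to see how these values fit together, here is a minimal plain-JDBC sketch. The class name OutboxDbConfig is hypothetical and not part of the original project; in a Spring Boot app the same values would normally go into spring.datasource.url, spring.datasource.username and spring.datasource.password instead. Opening a connection assumes the PostgreSQL JDBC driver is on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical helper that only illustrates the connection values from the table above.
final class OutboxDbConfig {
    static final String HOST = "localhost";
    static final int PORT = 5438;          // host port mapped to 5432 inside the container
    static final String DB_NAME = "test";
    static final String USER = "postgres";
    static final String PASSWORD = "test";

    // Builds the JDBC URL in the form the PostgreSQL driver expects.
    static String jdbcUrl() {
        return "jdbc:postgresql://" + HOST + ":" + PORT + "/" + DB_NAME;
    }

    // Opens a connection; needs the PostgreSQL JDBC driver and a running db service.
    static Connection open() throws SQLException {
        return DriverManager.getConnection(jdbcUrl(), USER, PASSWORD);
    }

    public static void main(String[] args) {
        // Prints: jdbc:postgresql://localhost:5438/test
        System.out.println(jdbcUrl());
    }
}
```

Note that the port is 5438 because the app runs on the host; containers on the compose network reach the database at db:5432.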

2.) Create the outbox table with the following query.

CREATE TABLE outbox (
    id            uuid         NOT NULL PRIMARY KEY,
    payload       jsonb        NOT NULL,
    aggregatetype varchar(255) NOT NULL,
    aggregateid   varchar(255) NOT NULL,
    type          varchar(255) NOT NULL
);

3.) Column definitions

+---------------+--------------------------------------------------------+
| Column Name   | Column Definition                                      |
+---------------+--------------------------------------------------------+
| id            | Unique UUID of the event                               |
| aggregatetype | Suffix of the Kafka topic name                         |
| aggregateid   | Kafka message key, used to keep events of one          |
|               | aggregate in order                                     |
| type          | Event type                                             |
| payload       | Event payload                                          |
+---------------+--------------------------------------------------------+
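To make the column roles concrete, the sketch below models one outbox row as a plain Java record. OutboxRow and its create method are hypothetical names used only for illustration; a real Spring Boot app would more likely map the table with a JPA entity (such as the OutboxEntity used later in this article). The payload is kept as a JSON string here to avoid any library dependency.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical plain-Java view of one row of the outbox table.
record OutboxRow(UUID id, String aggregateType, String aggregateId, String type, String payload) {

    // Mirror the NOT NULL constraints of the table definition.
    OutboxRow {
        Objects.requireNonNull(id, "id");
        Objects.requireNonNull(aggregateType, "aggregateType");
        Objects.requireNonNull(aggregateId, "aggregateId");
        Objects.requireNonNull(type, "type");
        Objects.requireNonNull(payload, "payload");
    }

    // Convenience factory that generates the unique event id.
    static OutboxRow create(String aggregateType, String aggregateId, String type, String payloadJson) {
        return new OutboxRow(UUID.randomUUID(), aggregateType, aggregateId, type, payloadJson);
    }

    public static void main(String[] args) {
        OutboxRow row = create("student", "42", "CREATE", "{\"name\":\"Asha\"}");
        System.out.println(row);
    }
}
```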

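Step 2: Enabling logical WAL in Postgres

Debezium reads changes through logical decoding, which requires the Postgres setting wal_level to be logical. The stock postgres image does not use that value by default (the default is replica on version 10 and later), so check it with SHOW wal_level; and, if it is not logical, change it as follows:

1.) docker exec -it <dbcontainerid> bash
2.) psql -U postgres
3.) alter system set wal_level to 'logical';
4.) docker restart <dbcontainerid>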

Step 3: Creating the Debezium Connector using the REST API

URL: http://127.0.0.1:8083/connectors
Method: POST
Body:
{
  "name": "arctype-connector-test",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "test",
    "database.dbname": "test",
    "database.server.name": "ARCTYPE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "table.whitelist": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.fields.additional.placement": "type:header:type",
    "transforms.outbox.table.expand.json.payload": "true",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 10,
    "topic.creation.default.compression.type": "lz4",
    "slot.name": "testpgoutput"
  }
}

The database.password value must match the POSTGRES_PASSWORD set in docker-compose.yml (test in this setup).

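Note: If the Postgres version is below 10, pgoutput is not available; set plugin.name to wal2json instead (the wal2json plugin must be installed on the database server).

The connector has many more properties; modify them as your requirements dictate. Property names also depend on the Debezium version: newer releases use table.include.list instead of table.whitelist, and from Debezium 2.0 topic.prefix replaces database.server.name.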
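If you prefer to register the connector from code rather than from a REST client, the request can be sketched with the JDK's built-in java.net.http client (Java 11+). RegisterConnector and its method names are hypothetical; the URL is the one from Step 3, and the JSON body is expected to be the connector definition shown above, saved to a file.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: posts a connector definition to the Kafka Connect REST API.
final class RegisterConnector {
    static final String CONNECT_URL = "http://127.0.0.1:8083/connectors";

    // Builds the POST request without sending it.
    static HttpRequest buildRequest(String connectorJson) {
        return HttpRequest.newBuilder(URI.create(CONNECT_URL))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    // Sends the request and returns the HTTP status code.
    static int register(String connectorJson) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(connectorJson), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("usage: RegisterConnector <path-to-connector.json>");
            return;
        }
        String json = Files.readString(Path.of(args[0]));
        System.out.println("HTTP " + register(json));
    }
}
```

Kafka Connect normally answers 201 Created when the connector is registered and 409 Conflict when a connector with the same name already exists.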

Step 4: Debezium Connector configuration explanation

https://airtable.com/shrnHO8YtuWPid413/tblVxt7EFbR0WfsbD

For further detailed information on the properties, please read
https://debezium.io/documentation/reference/stable/connectors/postgresql.html

Step 5: Producer-side sample implementation

1.) Save the entity in the main table (i.e. student).
2.) In the same transaction, save the student event payload in the outbox table.
3.) Delete the saved outbox table entry.

// Needs: java.util.UUID, com.fasterxml.jackson.databind.JsonNode,
// org.springframework.transaction.annotation.Transactional
@Transactional
public void saveStudent(StudentEntity student) {
    // 1. Save the business entity.
    studentRepository.save(student);

    // 2. Build the event and convert it to a JSON tree for the jsonb payload column.
    StudentEventMessage studentEvent = createEventMessage(student);
    JsonNode payload = objectMapper.valueToTree(studentEvent);

    OutboxEntity outboxEntity = new OutboxEntity();
    outboxEntity.setId(UUID.randomUUID());
    outboxEntity.setAggregateType("student");                      // topic suffix
    outboxEntity.setAggregateId(String.valueOf(student.getId()));  // Kafka message key
    outboxEntity.setType("CREATE");                                // sent as the "type" header
    outboxEntity.setPayload(payload);
    outboxRepository.save(outboxEntity);

    // 3. Delete right away: Debezium reads the insert from the WAL,
    //    and only the insert is turned into an event.
    outboxRepository.delete(outboxEntity);
}

Note: You can delete the outbox row immediately after saving it. Debezium reads the insert from the WAL, not from the table, so the record does not need to stay in the outbox table.

Make sure to perform all of these operations in the same transaction.

Step 6: Working explanation

1.) Your code saves the row to the outbox table and deletes it within one transaction.

2.) After the transaction commits successfully, Debezium reads the change from the WAL.

3.) The event is published to the outbox.event.student topic (the topic name suffix is taken from the aggregatetype column).

4.) The event message is taken from the payload column of the outbox table.

5.) The value of the type column (CREATE) is sent as a header because of this configuration: "transforms.outbox.table.fields.additional.placement": "type:header:type".

Note: Debezium automatically adds outbox.event. as a prefix to the aggregatetype column value to form the topic name.
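The mapping described in the steps above can be sketched in a few lines of plain Java. This is only an illustration of the routing as described in this article, not Debezium's actual code; OutboxRouting and RoutedEvent are hypothetical names, and the sketch covers only the fields discussed here (topic, key, type header, payload).

```java
import java.util.Map;

// Illustration only: shows how one outbox row maps to a Kafka record
// under the connector configuration used in this article.
final class OutboxRouting {
    static final String TOPIC_PREFIX = "outbox.event.";

    // Hypothetical holder for the parts of the resulting Kafka record.
    record RoutedEvent(String topic, String key, Map<String, String> headers, String value) {}

    static RoutedEvent route(String aggregateType, String aggregateId, String type, String payloadJson) {
        return new RoutedEvent(
                TOPIC_PREFIX + aggregateType,  // topic suffix comes from aggregatetype
                aggregateId,                   // message key comes from aggregateid
                Map.of("type", type),          // type column placed in a header
                payloadJson);                  // message value comes from payload
    }

    public static void main(String[] args) {
        RoutedEvent event = route("student", "42", "CREATE", "{\"name\":\"Asha\"}");
        System.out.println(event.topic() + " key=" + event.key() + " headers=" + event.headers());
    }
}
```

Because the key is the aggregateid, all events for the same student land in the same partition, which is what keeps them in order.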

This is a sample implementation of the outbox pattern with Debezium and Kafka. For production, you need to configure several more things, such as heartbeats.


Team Members:

Pranjay Singh, Alok Mishra, Ravendra Patel


Gajendrakhandelwal

Senior Software Engineer, Hilti Technology Solutions India