These days, Big Data and Business Intelligence platforms are among the fastest-growing areas of computer science. Companies want to extract knowledge from their data and analyze it in real time to make data-driven business decisions. At GetInData, we tackle these challenges every day to tame our clients' data and provide best-in-class solutions for extracting knowledge from data in real time.
Complex event processing is an innovative approach that opens new opportunities for companies that want to monitor, analyze and respond to events occurring throughout the organization. Today I would like to share our experience with CEP by presenting a new Flink connector developed by GetInData. In the example, we will create a simple Flink job that captures changes from a SQL database and runs pattern recognition on data streams from many sources using FlinkSQL.
Before we start, I will define some concepts used in the article.
Overview
Flink and FlinkSQL
Flink is an open-source stream processing framework that is well suited to complex event processing. It supports low-latency stream processing on a large scale. Furthermore, FlinkSQL is a language provided by Flink that allows you to write complex data pipelines without a single line of Java or Scala code. If you know SQL, you will be able to learn FlinkSQL and build your pipelines quickly. Flink provides connectors, acting as sources and sinks, which allow us to treat data from external systems such as Kafka, Elasticsearch or PostgreSQL as tables that FlinkSQL can process.
Complex Event Processing (CEP)
The term "complex event processing" refers to methods of detecting patterns and relationships among streamed events. When done in real time, it can provide deeper insight into the data flowing through the processing system.
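To make the idea concrete, here is a minimal pure-Python sketch of one of the simplest CEP patterns, "event A followed by event B for the same key within a time window", applied to the kind of events used later in this article. It is an illustration only: the function name and the (key, event_type, timestamp) tuple layout are assumptions, and a real engine such as Flink additionally handles state management, out-of-order events and scale.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def detect_followed_by(events, first, second, within):
    """Find `first` events followed by a `second` event for the same key
    no later than `within` afterwards. `events` is an iterable of
    (key, event_type, timestamp) tuples sorted by timestamp."""
    pending = defaultdict(list)  # key -> timestamps of still-unmatched `first` events
    matches = []
    for key, event_type, ts in events:
        if event_type == first:
            pending[key].append(ts)
        elif event_type == second:
            # Discard candidates whose window has expired, then pair the
            # oldest remaining one with this event.
            alive = [t for t in pending[key] if ts - t <= within]
            if alive:
                matches.append((key, alive[0], ts))
                alive = alive[1:]
            pending[key] = alive
    return matches


events = [
    ("happy", "trx", datetime(2021, 4, 1)),
    ("other", "trx", datetime(2021, 4, 1)),
    ("happy", "click", datetime(2021, 4, 2, 12)),
    ("other", "click", datetime(2021, 4, 10)),
]
# Only "happy" matches: its click comes 1.5 days after the transaction,
# while the click of "other" comes 9 days later.
print(detect_followed_by(events, "trx", "click", timedelta(days=3)))
```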
Change Data Capture (CDC)
CDC is a method of recognizing when data in a source system has changed and capturing these changes for further processing. For example, you can use CDC to capture data changes in your SQL database and produce a stream of events describing them: a sequence of insert, update and delete operations performed on database rows. Such a CDC stream can then be processed by Flink, which allows us to run complex analytics jobs like complex event processing or pattern recognition.
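To show what such a change stream describes, here is a minimal sketch that derives insert, update and delete events by comparing two snapshots of a table keyed by primary key. It only illustrates the shape of the events; log-based tools such as Debezium read the database transaction log instead of comparing snapshots. The function name and the sample rows are made up for illustration.

```python
def diff_snapshots(old, new):
    """Compare two snapshots of a table, each a dict {primary_key: row},
    and return the change events that turn `old` into `new`."""
    events = []
    for key, row in new.items():
        if key not in old:
            events.append(("insert", key, row))
        elif old[key] != row:
            events.append(("update", key, row))
    for key, row in old.items():
        if key not in new:
            # A deleted row is reported with its last known values.
            events.append(("delete", key, row))
    return events


old_snapshot = {1: {"customer_id": "3", "account_id": "A1"},
                2: {"customer_id": "4", "account_id": "A2"}}
new_snapshot = {1: {"customer_id": "3", "account_id": "A9"},
                3: {"customer_id": "5", "account_id": "A3"}}
changes = diff_snapshots(old_snapshot, new_snapshot)
# [('update', 1, ...), ('insert', 3, ...), ('delete', 2, ...)]
print(changes)
```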
Use Case
I believe that the easiest way to understand an approach is to show it in action. Therefore, I created a simple use case inspired by real business needs from a product owner of a mobile banking application.
Let's start with a user story:
As a business analyst, I want to check the marketing campaign's effectiveness in promoting quick loans using the mobile application.
I want to have a view of users who:
- carried out a transaction with an amount greater than 1000
- used the mobile application within 3 days of the transaction
- didn't take a loan
- used the mobile application again within 7 days of the previous usage
Data are stored in many sources:
- Kafka has topics with transactions and mobile application events
- PostgreSQL contains information about user loans
The view has to be updated in near real-time.
We will use Flink and pattern recognition in FlinkSQL to build a solution that meets these business requirements. Flink provides a Kafka connector that treats a topic as a table in FlinkSQL, which allows us to process information about transactions and mobile application events. Capturing changes from the database, however, is a more challenging problem: we need to turn data changes in the SQL database into a stream of events. Several tools on the market can help us with the CDC problem, so let's have a look at them.
Debezium
This is one of the most popular open-source CDC tools, maintained by Red Hat. Debezium captures data changes from database transaction logs and publishes the corresponding events to Kafka.
Pros:
- has little impact on database performance, because it reads the transaction log instead of querying tables
- captures data in real time
- can capture deletes
- never misses an event (it captures every change in the table)
Cons:
- difficult setup - it requires using Kafka
- needs access to the database transaction log (e.g. the MySQL binlog)
- supports only a few databases
- some old versions of databases do not support CDC
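Debezium publishes each change as an event envelope whose main fields are `before` (the row before the change), `after` (the row after it), `op` (the operation code), `source` (metadata such as the database and table) and `ts_ms`. A minimal consumer-side helper that turns such an event into an (operation, row) pair could look like the sketch below, assuming JSON serialization; `describe_change` is a hypothetical name and the sample message is made up.

```python
import json

# Debezium operation codes: c = create, u = update, d = delete,
# r = read (a row emitted during the initial snapshot).
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot"}


def describe_change(message: str):
    """Return (operation, row) for a Debezium change-event value serialized as JSON."""
    event = json.loads(message)
    # With the JSON converter's schemas enabled, the envelope is nested under "payload".
    envelope = event["payload"] if "schema" in event and "payload" in event else event
    op = envelope["op"]
    # Deletes carry the old row in "before"; the other operations carry the new row in "after".
    row = envelope["before"] if op == "d" else envelope["after"]
    return OPERATIONS.get(op, "unknown"), row


insert_event = json.dumps({
    "before": None,
    "after": {"id": 1, "customer_id": "3", "account_id": "3"},
    "source": {"table": "v_loan"},
    "op": "c",
    "ts_ms": 1620000000000,
})
print(describe_change(insert_event))
# ('insert', {'id': 1, 'customer_id': '3', 'account_id': '3'})
```

By default, Debezium also emits a tombstone record (a message with a null value) after each delete so that Kafka log compaction can remove the key; a real consumer has to skip those before parsing.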
Ververica Flink CDC Connectors
Ververica provides flink-cdc-connectors, which can easily be used with Flink to capture data changes. In addition, the connector has integrated Debezium as a CDC engine, so it doesn't require extra effort to set up a full Debezium stack.
Pros:
- provides the features of Debezium without setting up the full Debezium environment
Cons:
- supports only MySQL (5.7, 8.0.x), PostgreSQL (9.6, 10, 11, 12) and MongoDB (4.0, 4.2, 5.0)
- needs access to the database transaction log (e.g. the MySQL binlog)
- some old versions of databases do not support CDC
Flink Connector JDBC
A connector that allows us to read data from and write data to SQL databases directly in FlinkSQL. It is one of the official connectors maintained by the Apache Flink project.
Pros:
- allows us to write results into SQL databases
- official Flink connector: only the connector JAR and a JDBC driver need to be added
Cons:
- reads data from a table only once - the connector does not provide CDC
- supports only a few DBs (MySQL, PostgreSQL, Derby)
GetInData CDC by JDBC Connector
A connector developed by GetInData for CDC purposes. It captures changes in SQL databases by periodically querying the tables.
Pros:
- does not require additional components, unlike Debezium.
- can easily be reused with any database that has a JDBC driver: just provide the driver and the SQL queries used for CDC.
- uses pure SQL for CDC, so it doesn't require any special database configuration.
Cons:
- does not support delete operations.
- some CDC strategies require additional columns, e.g. last_modify_date.
- may put extra load on the database, because it reads data with periodically run SQL queries.
- refreshes data only in near real time, because the CDC logic runs at fixed intervals.
- may miss CDC events for frequently changing rows (e.g. a row inserted and deleted between two refreshes).
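The last two cons can be illustrated with a hypothetical timestamp-based strategy that relies on an extra `last_modify_date` column (an assumed column; the `v_loan` table used in this article does not have one). The sketch uses Python's built-in `sqlite3` instead of PostgreSQL, and `poll_modified` is a made-up name, not part of the connector. Unlike a strategy based only on an increasing `id`, it picks up updates, but a row that is inserted and deleted between two polls is never observed, and a delete leaves nothing behind to query.

```python
import sqlite3


def poll_modified(conn, since):
    """Hypothetical timestamp-based CDC: return rows whose last_modify_date is
    later than `since`, together with the watermark for the next poll."""
    rows = conn.execute(
        "SELECT id, customer_id, last_modify_date FROM v_loan "
        "WHERE last_modify_date > ? ORDER BY last_modify_date",
        (since,),
    ).fetchall()
    watermark = rows[-1][2] if rows else since
    return rows, watermark


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE v_loan (id INTEGER PRIMARY KEY, customer_id TEXT, last_modify_date TEXT)")
conn.execute("INSERT INTO v_loan VALUES (1, 'happy', '2021-04-01 10:00:00')")
first, watermark = poll_modified(conn, "")  # "" sorts before any timestamp string

# Between two polls: row 1 is updated, row 2 is inserted and deleted again.
conn.execute("UPDATE v_loan SET customer_id = 'happy-2', "
             "last_modify_date = '2021-04-02 09:00:00' WHERE id = 1")
conn.execute("INSERT INTO v_loan VALUES (2, 'short-lived', '2021-04-02 09:30:00')")
conn.execute("DELETE FROM v_loan WHERE id = 2")
second, watermark = poll_modified(conn, watermark)
print(second)  # only the update of row 1; row 2 was never observed
```

Timestamps are stored as ISO-formatted strings here, so comparing them as text preserves their chronological order.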
Custom connector overview
We used Flink's Table API to develop our CDC connector. Flink provides interfaces that a connector implements with its own logic so that an external data source can be treated as a table, which can then be processed with FlinkSQL. Flink does not own the table's data, which stays in the external system. Instead, the table definition is stored in a catalog as a CatalogTable, and during query execution the Flink engine uses this definition to read the data from the source.
For more details on how to write custom connectors, please check the Flink documentation.
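The connector's source code is not part of this article, so the following is only a hypothetical sketch of the general idea behind query-based CDC with an ordering column, presumably the kind of strategy configured later with `'cdc.strategy' = 'SIMPLE'` and `'cdc.simple-strategy.ordering-columns' = 'id'`: remember the largest `id` read so far and, on each poll, fetch only the rows above it. It uses Python's built-in `sqlite3` in place of PostgreSQL so that it runs anywhere, and `make_poller` is a made-up name, not part of the connector.

```python
import sqlite3


def make_poller(conn, table, ordering_column):
    """Hypothetical query-based CDC: each call to poll() returns the rows whose
    ordering column is greater than the largest value seen so far.
    `table` and `ordering_column` are formatted into the SQL, so they must come
    from trusted configuration, never from user input."""
    query = (f"SELECT * FROM {table} "
             f"WHERE ? IS NULL OR {ordering_column} > ? "
             f"ORDER BY {ordering_column}")
    last_seen = None  # None on the first poll, so the whole table is read

    def poll():
        nonlocal last_seen
        cursor = conn.execute(query, (last_seen, last_seen))
        columns = [col[0] for col in cursor.description]
        rows = [dict(zip(columns, values)) for values in cursor.fetchall()]
        if rows:
            last_seen = rows[-1][ordering_column]
        return rows

    return poll


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE v_loan (id INTEGER PRIMARY KEY, customer_id TEXT, account_id TEXT)")
conn.execute("INSERT INTO v_loan (customer_id, account_id) VALUES ('happy', 'acc-1')")
poll = make_poller(conn, "v_loan", "id")
print(poll())  # the row that already existed
conn.execute("INSERT INTO v_loan (customer_id, account_id) VALUES ('other', 'acc-2')")
print(poll())  # only the newly inserted row
```

A real connector would call something like `poll()` at a fixed interval. Because this strategy only asks for larger `id` values, updates to existing rows and deletes never show up, which is consistent with the cons listed for the connector.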
Example
In this example, I want to show you how to use the GetInData CDC by JDBC Connector with pattern recognition in FlinkSQL to meet our user story's business requirements.
Before we start building Flink jobs, I want to define the data model used in the example.
In Kafka, we are going to use the following topics:
Topic | Example payload | Description |
---|---|---|
trx | {"cif": "3", "amount": 200, "ts": "2021-05-03 00:00:00"} | Contains information about transactions carried out by the user. |
clickstream | {"cif": "3", "type": "click", "ts": "2021-05-03 00:00:05"} | Contains information about user behaviour in the mobile application. |
In PostgreSQL, we are going to have the following table:
Table | Schema | Description |
---|---|---|
v_loan | CREATE TABLE v_loan (id serial constraint v_loan_pk primary key, customer_id varchar(10), account_id varchar(30), decision_dttm timestamp); | Contains information about loans taken by the user. |
In this example, I set up the environment with docker-compose. The following compose file starts a Flink cluster, Kafka and PostgreSQL in Docker containers.
version: "3"
services:
  jobmanager:
    image: flink:1.13.2-scala_2.12-java11
    hostname: jobmanager
    expose:
      - "6123"
    ports:
      - "8081:8081"   # Flink web UI
    command: jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: "jobmanager"
    networks:
      - flink_jdbc_connector
  taskmanager:
    image: flink:1.13.2-scala_2.12-java11
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: "jobmanager"
    networks:
      - flink_jdbc_connector
  postgres:
    image: postgres
    environment:
      POSTGRES_PASSWORD: example
    volumes:
      # init.sql is expected to create the v_loan table on first start
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    networks:
      - flink_jdbc_connector
  # Kafka below connects to zookeeper:2181, so the stack needs this service
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - flink_jdbc_connector
  kafka:
    image: wurstmeister/kafka:2.12-2.4.0
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - flink_jdbc_connector
networks:
  flink_jdbc_connector:
    driver: bridge
It might be challenging to set up a complete environment with an example mobile application, so I prepared a small Python script to simulate user behaviour. The script simulates two scenarios: the happy path, which meets the user story, and the unhappy path, which should not match the pattern recognition query.
import json
from datetime import datetime, timezone

from kafka import KafkaProducer
from sqlalchemy import create_engine
from sqlalchemy.sql import text

kafka_server = "local-dev_kafka_1:9092"
producer = KafkaProducer(bootstrap_servers=kafka_server)

# PostgreSQL instance holding the v_loan table read by the Flink job
engine = create_engine("postgresql+pg8000://postgres:example@postgres:5432/postgres")


def send_to_kafka(topic: str, record: bytes):
    producer.send(topic, record)
    producer.flush()  # block until the record has actually been sent


def format_ts(ts: datetime) -> str:
    return ts.replace(tzinfo=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")


def generate_clickstream_record(cif: str, event_type: str, ts: datetime):
    payload = {"cif": cif, "type": event_type, "ts": format_ts(ts)}
    send_to_kafka("clickstream", json.dumps(payload).encode("utf-8"))


def generate_trx_record(cif: str, amount_pln: float, ts: datetime):
    payload = {"cif": cif, "amount": amount_pln, "ts": format_ts(ts)}
    send_to_kafka("trx", json.dumps(payload).encode("utf-8"))


def generate_loan(cif: str, day: datetime):
    stmt_loan = text("""
        INSERT INTO v_loan
        VALUES (default, :cif, :account, :day)
    """)
    data = {"cif": cif, "account": cif, "day": day}
    # engine.begin() commits the transaction when the block exits
    with engine.begin() as con:
        con.execute(stmt_loan, data)


def generate_scenario_happy_path(cif: str):
    # Transaction above 1000, app used within 3 days, then again within 7 days
    generate_trx_record(cif, 2000, datetime(2021, 4, 1, 0, 0, 0))
    generate_clickstream_record(cif, "click", datetime(2021, 4, 2, 12, 0, 0))
    generate_clickstream_record(cif, "click", datetime(2021, 4, 4, 12, 0, 0))


def generate_scenario_not_happy_path(cif: str):
    # Transaction below the threshold and a single, much later app visit
    generate_trx_record(cif, 400, datetime(2021, 4, 1, 0, 0, 0))
    generate_clickstream_record(cif, "click", datetime(2021, 5, 2, 12, 0, 0))


generate_scenario_happy_path("happy")
generate_scenario_not_happy_path("nothapp")
The first thing we need to do before building a pattern recognition pipeline is define data sources in FlinkSQL. To connect to data sources, we use connectors. Connectors allow us to treat data stored in PostgreSQL and Kafka as tables.
CREATE TABLE v_loan
(
id INT,
customer_id VARCHAR,
account_id VARCHAR,
decision_dttm AS PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc-cdc',
'url' = 'jdbc:postgresql://local-dev_postgres_1:5432/postgres',
'table-name' = 'v_loan',
'username' = 'postgres',
'password' = 'example',
'cdc.strategy' = 'SIMPLE',
'cdc.simple-strategy.ordering-columns' = 'id'
);
CREATE TABLE trx (
cif STRING,
amount DOUBLE,
ts AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'trx',
'properties.bootstrap.servers' = 'local-dev_kafka_1:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE clickstream (
cif STRING,
type STRING,
ts AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'clickstream',
'properties.bootstrap.servers' = 'local-dev_kafka_1:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE VIEW events AS
SELECT UUID() AS event_id,
customer_id AS customer_id,
'loan_event' AS type,
CAST(account_id AS STRING) AS payload,
decision_dttm AS ts
FROM v_loan
UNION ALL
SELECT UUID() AS event_id,
cif AS customer_id,
'trx_event' AS type,
CAST(amount AS STRING) AS payload,
ts AS ts
FROM trx
UNION ALL
SELECT UUID() AS event_id,
cif AS customer_id,
'clickstream_event' AS type,
type AS payload,
ts AS ts
FROM clickstream;
This example shows how to define pattern recognition by using FlinkSQL.
Firstly, we need to define our events.
- TRX - an event describing a transaction with an amount greater than 1000.
- APP_1 - an event representing the user's interaction with the application within 3 days after the transaction.
- NO_LOAN - any event that is not a loan event, i.e. it tells us that the user didn't take a loan.
- APP_2 - an event representing another interaction with the application within 7 days after APP_1.
Secondly, we need to specify the order of events and the expected output, following the pattern from the user story. To define the pattern, we use the MATCH_RECOGNIZE pattern syntax described in the Flink documentation, which is quite similar to regular expression syntax.
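To see the regular-expression analogy, a customer's ordered events can be encoded as one character each and matched with Python's `re` module. This sketch mirrors only the structure of the PATTERN clause in the query below; it ignores the amount and time conditions from DEFINE and the WITHIN clause, and the one-letter encoding is an assumption made for illustration.

```python
import re

# One character per event: T = transaction (assumed to already satisfy
# amount > 1000), A = mobile application event, L = loan event.
SYMBOLS = {"trx_event": "T", "clickstream_event": "A", "loan_event": "L"}

# TRX APP_1 NO_LOAN*? APP_2, where NO_LOAN is any event that is not a loan.
PATTERN = re.compile(r"TA[^L]*?A")


def find_match(event_types):
    """Return the matched symbols for one customer's ordered events, or None."""
    symbols = "".join(SYMBOLS[t] for t in event_types)
    match = PATTERN.search(symbols)
    return match.group() if match else None


print(find_match(["trx_event", "clickstream_event", "clickstream_event", "clickstream_event"]))
print(find_match(["trx_event", "clickstream_event", "loan_event", "clickstream_event"]))
```

As with the reluctant quantifier in Flink, `*?` makes the match end at the first application event after APP_1 (the first call returns `TAA`, not `TAAA`), and a loan event between APP_1 and APP_2 prevents the match altogether (the second call returns `None`).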
SELECT *
FROM events
MATCH_RECOGNIZE(
PARTITION BY customer_id
ORDER BY ts
MEASURES
TRX.event_id AS trx_event_id,
TRX.customer_id AS trx_customer_id,
TRX.type AS trx_type,
TRX.payload AS trx_payload,
TRX.ts AS trx_ts,
APP_1.event_id AS app_1_event_id,
APP_1.customer_id AS app_1_customer_id,
APP_1.type AS app_1_type,
APP_1.payload AS app_1_payload,
APP_1.ts AS app_1_ts
ONE ROW PER MATCH
PATTERN (TRX APP_1 NO_LOAN*? APP_2) WITHIN INTERVAL '10' DAY
DEFINE
TRX AS TRX.type = 'trx_event' AND CAST(TRX.payload AS DOUBLE) > 1000,
APP_1 AS APP_1.type = 'clickstream_event' AND APP_1.ts < TRX.ts + INTERVAL '3' DAY,
APP_2 AS APP_2.type = 'clickstream_event' AND APP_2.ts > APP_1.ts AND APP_2.ts < APP_1.ts + INTERVAL '7' DAY,
NO_LOAN AS NO_LOAN.type <> 'loan_event'
) MR;
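When run against the events produced by the simulation script, the query should return a match only for the "happy" customer: the transaction of the "nothapp" customer (400) does not satisfy the TRX condition.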
Conclusions - Complex Event Processing with Flink
Flink is a powerful engine for building real-time data processing platforms fed from many sources. With the GetInData CDC by JDBC connector, we can start extracting knowledge from legacy applications and build a "data-driven culture" in an organization.
Data is one of your company's most valuable assets, and when skillfully used, it allows you to take your business to a new level.
We plan to publish the connector code as an open-source project, so stay tuned!
If you would like to know more about Complex Event Processing, check our CEP Platform.