The Airbyte 0.50 release has brought some exciting changes to the platform:
- checkpointing (so that you don’t have to start from scratch in case of interruptions during sync)
- automatic schema propagation (so that you don’t have to take care of it manually whenever the source schema changes)
- column selection (so that you just sync the fields you need from the source).
The last feature - column selection - had been highly sought after by the community. If you open the issues in Airbyte’s repository and sort them by thumbs-up reactions, "Allow a user to filter fields/columns from a table" comes out on top.
This article describes two small tests of the new feature that we performed to answer the following questions:
- How does the column selection mechanism behave when we have a relational database with CDC (change data capture) enabled on the source side of synchronization?
- Does Airbyte still grab all the data from the source (select * from table_foo) in the case of relational databases?
Let’s quickly go through the possible reasons for using column filtering in your data ingestion processes.
Column selection - why should you even care?
- Compliance & data security - we often work in regulated environments that impose various restrictions on how data should be handled. GDPR’s data minimisation principle states that personal data must be "adequate, relevant and limited to what is necessary in relation to the purposes for which they are processed". Non-compliance in this regard may cause both financial and reputational damage to organizations. The lack of column selection was also a blocker to adopting Airbyte itself within companies.
- Cost - less data means lower compute and storage costs. This is especially true when you’re using a cloud data warehouse, which usually bills you based on the resources consumed.
- Time - sending only a subset of a dataset over the network is faster. It also speeds up loading and normalizing data in the destination.
But column selection was possible before…
Well, it was. Kind of.
There were some workarounds that might have worked for you. One of them was using database views. You could create a view exposing only the columns you needed for synchronization and that was it! However, views do not always solve the problem:
- If you don’t own the source system, you might not be allowed to create any database objects in there
- Views are not supported when using incremental synchronization with the CDC mechanism (change data capture)
- You won’t be able to use views when working with a non-database type of source system (like HTTP APIs for example).
The other way you could approach the problem was to hide the redundant columns during normalization (by removing them from the select statements in a custom DBT project). While this solved the problem for normalized tables, Airbyte raw tables (the ones with the _airbyte_raw_ prefix) would still contain all the fields from the source. Moreover, raw tables store each source record as a JSON blob, which is usually much bigger than the source table itself (field names are repeated in every record, and it is difficult to compress data efficiently when each record is kept as a single JSON value).
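To get a rough feel for why the raw JSON representation tends to be bulky, the sketch below serializes a few rows shaped like `source.table_a` (used later in this article) in two ways. First, as self-describing JSON objects. Second, as plain value lists. It only illustrates the per-record overhead of repeating field names. It says nothing about how Postgres actually stores or compresses JSONB. The helper names are hypothetical.

```python
import json

# Hypothetical sample rows shaped like source.table_a from this article.
ROWS = [
    {"id": 1, "column_1": "foo1", "column_2": "foo2", "column_3": "foo3"},
    {"id": 2, "column_1": "bar1", "column_2": "bar2", "column_3": "bar3"},
    {"id": 3, "column_1": "baz1", "column_2": "baz2", "column_3": "baz3"},
]


def json_size(rows):
    """Bytes needed when every row is a self-describing JSON object."""
    return sum(len(json.dumps(row).encode("utf-8")) for row in rows)


def values_size(rows):
    """Bytes needed for the values alone (field names would be stored once, elsewhere)."""
    return sum(len(json.dumps(list(row.values())).encode("utf-8")) for row in rows)


if __name__ == "__main__":
    print("as JSON objects:", json_size(ROWS))
    print("values only:   ", values_size(ROWS))
```

Even for this tiny table, most of each JSON object is spent on repeated keys rather than on the data itself.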
As a last resort, if you’re knowledgeable enough, you could try implementing your own custom connector with column selection.
Let’s get our hands dirty - preparing the environment
We’ll use Docker and docker-compose to spin up a container with the PostgreSQL 15 image. To keep things simple, the source and destination will be the same Postgres database (with the data moved between different schemas). If you’re not familiar with incremental ingestion with CDC, we encourage you to check Airbyte’s documentation on the topic.
Enabling CDC requires a few extra steps:
- Setting the wal_level variable to logical
- Creating a logical replication slot
- Creating a publication for the table we’d like to sync
We’ll take care of them in the docker-compose file and the database init SQL script, so everything will be ready upon database startup.
Here’s the docker-compose file:
version: '3.7'
services:
  db:
    container_name: airbyte-column-selection-db
    image: postgres:15
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      # host port 5555 -> container port 5432
      - "5555:5432"
    volumes:
      - postgres_airbyte_volume:/var/lib/postgresql/data
      # scripts in this directory run only when the data directory is empty (first start)
      - ./init_db.sql:/docker-entrypoint-initdb.d/init_db.sql
    # logical WAL level is required for CDC
    command: ["postgres", "-c", "wal_level=logical"]
volumes:
  postgres_airbyte_volume:
And here’s the init_db.sql file:
CREATE SCHEMA source;
CREATE SCHEMA destination;
CREATE TABLE source.table_a (
    id serial primary key,
    column_1 varchar,
    column_2 varchar,
    column_3 varchar
);
-- CDC prerequisites: a logical replication slot and a publication for the synced table
SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');
CREATE PUBLICATION airbyte_publication FOR TABLE source.table_a;
INSERT INTO source.table_a (column_1, column_2, column_3)
VALUES
    ('foo1', 'foo2', 'foo3'),
    ('bar1', 'bar2', 'bar3'),
    ('baz1', 'baz2', 'baz3');
You can also find these files in the GitHub repository created for this article.
The environment can be started with the command:
docker-compose up -d
Let’s run some queries to confirm everything was created as expected.
docker exec -ti airbyte-column-selection-db psql -U postgres -d postgres
Everything seems correct! It’s time to perform the first test.
Test #1 - How does Airbyte handle updates to deselected columns when using CDC incremental ingestion?
Oops, we have duplicates!
Before column selection was introduced, I tried to overcome the problem of synchronizing only a subset of columns by adding a custom DBT transformation that contained only the columns I wanted to sync (bearing in mind that such an approach only removes columns from the normalized tables). There’s an article in the official documentation that walks through generating Airbyte’s normalization DBT project, so you quickly get boilerplate code that you can modify to suit your needs.
The problem with this approach is that, when using CDC, raw tables contain changes to all columns (including the ones we’d like to exclude from synchronization). If you simply remove the unwanted columns from the select statements in the normalization SQL, the normalized table will contain duplicated records. The diagram below depicts such a situation. We have a table with columns (id, col_1, col_2, col_3) and we’d like to sync only the first three. Column col_3 will be removed in the normalization SQL.
The first synchronization pulls a full snapshot of the data. The second one captures an update to the value of col_2, so a new version of the record is appended to the target table. During the third sync, an update to col_3 is pulled, and another version of the record is appended to the destination table. However, since we’ve removed the column with the updated value, we’ll see a duplicate of the record with id=1, col_1=val 1, col_2=val 2_.
Keep in mind that Airbyte adds some extra metadata columns related to the CDC process (_ab_cdc_lsn - log sequence number, _ab_cdc_updated_at - timestamp of the update event, _ab_cdc_deleted_at - timestamp of the record deletion), and their values will differ between versions. Yet from the point of view of the source data, we’re getting duplicates.
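The scenario from the diagram can be reproduced with a toy model in Python. This is not Airbyte code. It treats every CDC change event as a full new version of the row, which is what "Incremental | Append" writes to the destination. It then applies a normalization-style projection that drops col_3. The column names follow the diagram, but the values and LSNs are made up for illustration.

```python
# Toy model of "Incremental | Append" with CDC: every change event appends
# a full new version of the row. Values and LSNs below are made up.
EVENTS = [
    # sync 1: initial snapshot
    {"id": 1, "col_1": "val 1", "col_2": "val 2", "col_3": "val 3", "_ab_cdc_lsn": 100},
    # sync 2: col_2 changed
    {"id": 1, "col_1": "val 1", "col_2": "val 2 updated", "col_3": "val 3", "_ab_cdc_lsn": 200},
    # sync 3: only col_3 changed
    {"id": 1, "col_1": "val 1", "col_2": "val 2 updated", "col_3": "val 3 updated", "_ab_cdc_lsn": 300},
]

SELECTED = ("id", "col_1", "col_2")  # col_3 is removed in the normalization SQL


def normalize(events, selected):
    """Project every appended version onto the selected source columns."""
    return [tuple(event[c] for c in selected) for event in events]


def duplicated_versions(rows):
    """Return projected rows that have already appeared earlier in the list."""
    seen, dupes = set(), []
    for row in rows:
        if row in seen:
            dupes.append(row)
        seen.add(row)
    return dupes


if __name__ == "__main__":
    rows = normalize(EVENTS, SELECTED)
    for row in rows:
        print(row)
    print("duplicates:", duplicated_versions(rows))
```

The third event differs from the second only in col_3. After the projection, the two versions become indistinguishable.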
When we stumbled upon this issue, we added some extra code to the normalization DBT project to deduplicate the normalized tables. We were curious whether the new feature would also produce duplicated data. Let’s find out!
Testing Airbyte’s behavior
After adding a source and a destination pointing to the appropriate schemas, we created a connection between them. In the field selection pane, we deselected column_3. The sync mode was set to “Incremental | Append”, and the default normalization was used.
Let’s run the first sync and copy all of the data to the destination.
Here are the results from the target table:
The column selection feature works as expected: we don’t see column_3 in the table. Let’s also make sure this column is not loaded into the raw table. Here’s the formatted JSON of the first record in the _airbyte_raw_table_a table:
{
  "id": 1,
  "column_1": "foo1",
  "column_2": "foo2",
  "_ab_cdc_lsn": 23242280,
  "_ab_cdc_deleted_at": null,
  "_ab_cdc_updated_at": "2023-06-18T21:07:07.873Z"
}
The raw table does not contain deselected fields. Cool!
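The check we just did by eye could be automated with a small helper that flags raw records carrying fields outside the selection. This sketch assumes you have already fetched each raw record’s JSON payload as a string. In raw tables of this Airbyte generation that payload lives in the _airbyte_data column, but treat that as an assumption to verify against your own destination. The record used below is the one shown above. The function name is hypothetical.

```python
import json

# Columns selected in the connection, plus the CDC metadata Airbyte adds.
SELECTED = {"id", "column_1", "column_2"}
CDC_METADATA = {"_ab_cdc_lsn", "_ab_cdc_deleted_at", "_ab_cdc_updated_at"}

RAW_RECORD = """
{"id": 1, "column_1": "foo1", "column_2": "foo2",
 "_ab_cdc_lsn": 23242280, "_ab_cdc_deleted_at": null,
 "_ab_cdc_updated_at": "2023-06-18T21:07:07.873Z"}
"""


def unexpected_fields(raw_json, selected, metadata):
    """Return the set of fields that are neither selected nor CDC metadata."""
    record = json.loads(raw_json)
    return set(record) - selected - metadata


if __name__ == "__main__":
    extra = unexpected_fields(RAW_RECORD, SELECTED, CDC_METADATA)
    print("unexpected fields:", extra or "none")
```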
Now, let’s run two updates on the source database: one for column_1 and another for column_3 (which is not selected in our synchronization):
update source.table_a
set column_1 = column_1 || ' updated'
where id = 1;
update source.table_a
set column_3 = column_3 || ' updated'
where id = 1;
Then we run the synchronization again.
As you can see from the logs, two records were emitted. Filtering the destination table by id=1 shows that there are three versions of the record, and the last two have the same values in the id, column_1 and column_2 columns.
The conclusion of this test is that Airbyte does not deduplicate extra rows generated by updates on columns excluded from a connection (in a scenario with a CDC-enabled source). It is your responsibility to handle such scenarios.
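One possible way to handle this is to drop "no-op" versions. For each primary key, versions are ordered by LSN, and a version is kept only if its selected columns (or its deletion marker) differ from the previously kept version. The sketch below shows the idea in Python. It is our illustration, not what Airbyte does, and not necessarily identical to the DBT code we used. The data mirrors the shape of this test’s result, but the LSN values are made up.

```python
from itertools import groupby
from operator import itemgetter

# Versions of id=1 shaped like the result of Test #1. LSN values are made up.
VERSIONS = [
    {"id": 1, "column_1": "foo1", "column_2": "foo2", "_ab_cdc_lsn": 1000},
    {"id": 1, "column_1": "foo1 updated", "column_2": "foo2", "_ab_cdc_lsn": 2000},
    {"id": 1, "column_1": "foo1 updated", "column_2": "foo2", "_ab_cdc_lsn": 3000},
]

SELECTED = ("id", "column_1", "column_2")


def drop_noop_versions(rows, selected, key="id", order_by="_ab_cdc_lsn"):
    """Keep a version only if its selected columns (or deletion marker) differ
    from the previously kept version of the same key, ordered by LSN."""
    result = []
    ordered = sorted(rows, key=itemgetter(key, order_by))
    for _, versions in groupby(ordered, key=itemgetter(key)):
        previous = None
        for row in versions:
            # Include _ab_cdc_deleted_at so delete events are never collapsed.
            current = tuple(row[c] for c in selected) + (row.get("_ab_cdc_deleted_at"),)
            if current != previous:
                result.append(row)
                previous = current
    return result


if __name__ == "__main__":
    for row in drop_noop_versions(VERSIONS, SELECTED):
        print(row)
```

This approach keeps the earliest of the identical versions, so the surviving row’s CDC timestamp reflects when the selected values last actually changed. In a DBT model, the same comparison could be expressed with a `LAG()` window function partitioned by the primary key and ordered by `_ab_cdc_lsn`.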
Test #2 - Does Airbyte extract all columns from the source (PostgreSQL)?
When reading the article about the implementation details of column selection, we noticed that the feature was implemented at the worker level rather than in the source connectors. Our understanding of that paragraph was that, despite deselecting columns, Airbyte would still pull all of the data from the source (basically running select * from source_table) and the worker would drop the deselected columns later.
While we totally get the justification for such an implementation (removing fields in the worker does not require any changes to the source connectors), we hoped that the column selection feature would also reduce the number of bytes transferred over the network (which directly affects the synchronization time).
We decided to run a small test to see which query Airbyte actually executes against the source database. Again, we used a PostgreSQL 15 -> PostgreSQL 15 connection. The source table (named foobar) contained 11 fields (id, column_1, column_2, … up to column_10). Fields column_5 to column_10 were deselected in the connection settings. We populated this table with more records than in the previous test, so the sync took longer and we were able to catch Airbyte’s query in the pg_stat_activity view (which lists currently running queries).
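To make the concern concrete, here is a simplified Python sketch of worker-side filtering. It is not Airbyte’s implementation. It only follows the shape of Airbyte protocol RECORD messages (`type`, `record.stream`, `record.data`). The stream-to-fields mapping and the function name are hypothetical. Keeping fields prefixed with `_ab_cdc_` is our assumption, mirroring the raw record shown earlier, where the CDC metadata survived. The point is that the deselected value has already travelled from the source to the worker by the time it is dropped.

```python
import json

# Hypothetical mapping of stream name -> selected top-level fields.
SELECTED_FIELDS = {"table_a": {"id", "column_1", "column_2"}}


def filter_message(line, selected_fields):
    """Drop deselected top-level fields from a RECORD message.

    Non-RECORD messages (e.g. STATE) and unknown streams pass through untouched.
    Fields prefixed with _ab_cdc_ are kept (assumption: CDC metadata survives).
    """
    message = json.loads(line)
    if message.get("type") == "RECORD":
        record = message["record"]
        allowed = selected_fields.get(record["stream"])
        if allowed is not None:
            record["data"] = {
                k: v for k, v in record["data"].items()
                if k in allowed or k.startswith("_ab_cdc_")
            }
    return json.dumps(message)


if __name__ == "__main__":
    # The full row, including column_3, arrives from the source first...
    source_line = json.dumps({
        "type": "RECORD",
        "record": {
            "stream": "table_a",
            "data": {"id": 1, "column_1": "foo1", "column_2": "foo2", "column_3": "foo3"},
            "emitted_at": 1687122427873,
        },
    })
    # ...and is only trimmed here, after it has crossed the network.
    print(filter_message(source_line, SELECTED_FIELDS))
```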
We used the psql interactive terminal along with the \watch command to re-run the select statement every second:
select query from pg_stat_activity where query ilike '%foobar%';
\watch 1
And the result was a surprise - in a good way :-):
select query from pg_stat_activity where query ilike '%foobar%';
SELECT "id","col_1","col_2","col_3","col_4","col_5" FROM "public"."foobar"
As you can see, the select statement did not use an asterisk and enumerated only the columns selected in the connection settings (the first row is just our own monitoring query, which matches the pattern too). This was cool!
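The shape of the observed query can be reproduced with a few lines of Python. Each identifier is double-quoted, with embedded quotes doubled as PostgreSQL requires, and the selected columns are joined into an explicit list. This is our reconstruction of the query’s shape from what pg_stat_activity showed, not the source connector’s actual code. The function names are hypothetical.

```python
def quote_ident(name):
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_select(schema, table, columns):
    """Build a SELECT that enumerates only the given columns (no asterisk)."""
    column_list = ",".join(quote_ident(c) for c in columns)
    return f"SELECT {column_list} FROM {quote_ident(schema)}.{quote_ident(table)}"


if __name__ == "__main__":
    print(build_select("public", "foobar", ["id", "col_1", "col_2", "col_3", "col_4", "col_5"]))
```

Quoting every identifier keeps the query valid even for mixed-case or reserved-word column names.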
Wrapping it up
Summing up the tests:
- If you’re synchronizing data from CDC-enabled sources and would like to exclude some columns from a connection with the new column selection feature, remember to handle possible duplicates.
- Airbyte does not necessarily select all the fields from the source (at least not with the Postgres source). If you deselect many columns from a source table, it may speed up synchronization.