Airflow is a commonly used orchestrator that helps you schedule, run and monitor all kinds of workflows. Thanks to Python, it offers lots of freedom of expression to define all kinds of tasks and connect APIs in different domains. It also provides many extensions to schedule workloads on other platforms such as Kubernetes.
Thanks to those qualities, Airflow is often also used in machine learning projects to manage tasks there. However, there is another tool that additionally provides pipeline abstraction and excels at organizing ML related tasks - Kedro.
Can the two be used together? Yes, they can! In this article I'll show you how to do this and how to make it efficient and seamlessly automated!
For this task I decided to use the official Kedro plugin, as it recently received an update that lets you customize DAG generation with your own jinja templates.
The plugin is very simple in structure. In fact, it does one job only: it generates DAG code for a given Kedro pipeline and passes parameters to it using a jinja template.
Here's everything it has to offer:
kedro airflow create
- Create an Airflow DAG for a project
-p, --pipeline TEXT
- Name of the registered pipeline to convert. If not set, the '__default__' pipeline is used.
-e, --env TEXT
- Kedro configuration environment name. Defaults to local.
-t, --target-dir DIRECTORY
- The directory path to store the generated Airflow dags
-j, --jinja-file FILE
- The template file for the generated Airflow dags
--params TEXT
- Specify extra parameters that you want to pass to the context initializer. Items must be separated by a comma, keys - by colon or equals sign, example: param1=value1, param2=value2. Each parameter is split by the first comma, so parameter values are allowed to contain colons, parameter keys are not. To pass a nested dictionary as a parameter, separate keys by '.', example: param_group.param1:value1.
The first thing I did was to read the documentation and go through the quick start steps on a local Airflow environment set up with docker-compose. The provided example template creates KedroOperator, which is just another variant of PythonOperator that executes Kedro nodes in separate processes spawned from session.run(). I quickly formed an opinion about the quick start setup: the example given there is impractical, as it is flawed in a few ways that I'd like to avoid in my solution.
So at first I had in mind using either DockerOperator or KubernetesPodOperator to achieve isolation and scalability of execution. As my target was Google Cloud, I decided to work with managed Airflow (GCP Composer) backed by a GKE Autopilot cluster, as they are native to GCP. Naturally, that led to the choice of GKEPodOperator, provided by Google to work with GKE. GKEPodOperator inherits from KubernetesPodOperator and provides the same functionality, with the added bonus of handling and hiding the Google authentication mechanisms needed to authorize with the GKE cluster. It was funny to me that this plugin also provides create/delete cluster operators, as if creating a computation cluster just to run one task were a good idea... I guess it could be if you run one big task infrequently, as the costs for provisioning the cluster are 0 in GCP, while you pay for the upkeep time. Well, it's helpful in the tutorial, as it handles GKE clusters for you.
While I had no need for Airflow requirements in the Kedro project, it was useful to have another environment with the apache-airflow[google,kubernetes] and kubernetes Python packages installed to check the validity of generated DAGs. The kubernetes package is used here to define and validate Kubernetes object descriptions, mainly machine resource specifications.
The next step, after choosing the tools for the job, was to set up the environment for Composer and GKE. At first I did it in the GCP Console UI, but I wanted easily reproducible results, so I wrote some terraform code to quickly provision and destroy the environment. I also wanted to set up MLflow for experiment tracking; you can check out this other blog post on how to set it up on Google Cloud Run. I used the official Google modules; however, given their complexity and warnings, I'd seriously consider whether to use them outside of demo purposes.
Allowing communication between MLflow and the GKE cluster required some additional effort, which is outside the scope of this article. To keep it short, MLflow is secured with an IAP proxy, so we needed some service accounts that have access to it, and we had to make the Airflow executors (here: GKE Pods) use those service accounts (Workload Identity is the go-to mechanism here). You can find more details on this in the repository README of this demo.
Here I'll show you how to use the plugin and how to customize it to your needs in detail. We have some inputs handled for us by the plugin. Here's how it calls our jinja template to fill it:
template.stream(
    dag_name=package_name,
    dependencies=dependencies,
    env=env,
    pipeline_name=pipeline_name,
    package_name=package_name,
    pipeline=pipeline,
    **dag_config,
).dump(str(target_path))
Where env is the Kedro environment, pipeline is the Kedro pipeline object, dag_config is the dictionary of parameters passed to the template and dependencies is a dictionary of parent-child relationships between nodes defined by the pipeline. Parameters can be passed either via the command line when invoking the create command or using an airflow params config file. The config file is loaded by the plugin's _load_config function (here we can also see the default file patterns it uses to look for the config file):
def _load_config(context: KedroContext, pipeline_name: str) -> dict[str, Any]:
    # Set the default pattern for `airflow` if not provided in `settings.py`
    if "airflow" not in context.config_loader.config_patterns.keys():
        context.config_loader.config_patterns.update(  # pragma: no cover
            {"airflow": ["airflow*", "airflow/**"]}
        )
    ...
    try:
        config_airflow = context.config_loader["airflow"]
    ...
All those parameters will be visible as variables in jinja, available to use in our template. Let's get down to it and configure it!
Here's my conf/base/airflow.yml defining the parameters:
default:
  grouping_prefix: "airflow:"
  resources_tag_prefix: "machine:"
  # When grouping is enabled, nodes tagged with grouping prefix get grouped together at the same node of Airflow DAG for shared execution
  # Make sure the grouping_prefix is not a prefix for any node names and that every node has only one of tags with such prefix and that they are not disjoint
  grouping: true
  gcp_project_id: "gid-labs-mlops-sandbox"
  gcp_region: "europe-west1"
  gcp_gke_cluster_name: "europe-west1-test-environme-d1ea8bdc-gke"
  k8s_namespace: "airflow-ml-jobs"
  k8s_service_account: "composer-airflow"
  docker_image: "europe-west1-docker.pkg.dev/gid-labs-mlops-sandbox/images/spaceflights-airflow"
  start_date: [2023, 1, 1]
  max_active_runs: 2
  # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
  schedule_interval: "@once" # null
  catchup: false
  # Default settings applied to all tasks
  owner: "airflow"
  depends_on_past: false
  email_on_failure: false
  email_on_retry: false
  retries: 0
  retry_delay: 5
# Arguments specific to the pipeline (overrides the parameters above)
data_science:
  owner: "airflow-ds"
In this config we can define any custom variables we want and the context of using them will become clear once we view the jinja template. The parameters defined here configure Airflow behavior, point to GKE cluster location, define parameters in the k8s pod template and supplement pipelines with additional informative tags.
Sets of parameters can be defined as defaults used for all pipelines, with pipeline-specific sets keyed by the pipeline's name overriding the defaults. We use the Spaceflights starter as a starting point, so we have the __default__, data_science and data_processing pipelines.
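The override behaviour can be sketched in a few lines of Python. This is a minimal illustration, assuming the loaded config is a plain dictionary shaped like conf/base/airflow.yml; the helper name resolve_dag_config is hypothetical and not part of the plugin.

```python
from typing import Any, Dict


def resolve_dag_config(config_airflow: Dict[str, Any], pipeline_name: str) -> Dict[str, Any]:
    """Hypothetical helper: start from `default`, then apply pipeline-specific overrides."""
    dag_config: Dict[str, Any] = {}
    dag_config.update(config_airflow.get("default", {}))
    dag_config.update(config_airflow.get(pipeline_name, {}))
    return dag_config


# A trimmed-down version of the config shown in this article
config_airflow = {
    "default": {"owner": "airflow", "retries": 0, "grouping": True},
    "data_science": {"owner": "airflow-ds"},
}

print(resolve_dag_config(config_airflow, "data_science")["owner"])     # airflow-ds
print(resolve_dag_config(config_airflow, "data_processing")["owner"])  # airflow
```

A pipeline without its own section, such as data_processing here, simply ends up with the defaults.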
The DAG template can be found here. The main points are:
- GKEPodOperator for each node, passing a docker command to run only selected nodes in each step - we use the same docker image built from our Kedro project repository,
- slugify to sanitize strings to be accepted, regardless of the character restrictions of the Kubernetes API,
- "machine:..." (as a convention),
- airflow_xcom mechanism.
By default, every Kedro node is translated to one node of another framework, here an Airflow DAG node. As of version 0.18.13, Kedro still does not support any encapsulation of nodes into groups (neither do most of its plugins). In Kedro pipelines you want high node granularity, so that each node is responsible for only one thing and is easily testable and reusable. Fine-grained task division within a single process has almost no overhead, as memory can be shared between the nodes. However, in Airflow (using docker images) you want to have as few nodes as possible to reduce the overhead of pod creation and destruction. More nodes also mean more time wasted on data serialization and communication between them. So how should you handle that?
What could give us more control over how the pipeline is structured?
Tags! Tags are a great way to group nodes together and define their properties.
We've got all the pieces of solution at hand. In Kedro we can execute only selected nodes using tag filtering mechanisms, e.g.:
kedro run --tags data_processing
We're going to use a convention of special tags for this purpose. By default we'll consider tags starting with "airflow:" as grouping tags, with the name of the group being the text after the prefix.
As each node is run in a docker container, all that's left is to determine whether nodes are grouped or not and pass the correct command: either run a single node or a group of nodes with a tag. Ideally, this processing would happen at DAG generation in a hook to the plugin, but the plugin doesn't offer such an option yet. The next best thing would be to program it in jinja, but that would result in a complex and unmaintainable template. So the remaining solution is to utilize the power of Python and Airflow and embed the code doing that work inside the DAG definition. I've implemented a few functions that create a new node mapping and update the tags based on the grouping tags.
The code is as follows:
from typing import Tuple

def group_nodes_with_tags(node_tags: dict, grouping_prefix: str = "airflow:") -> Tuple[dict, dict]:
    # Helper dictionary that says which group/node each node is part of
    group_translator = {k: k for k in node_tags.keys()}
    # Dict of groups and the nodes they consist of
    tag_groups = dict()
    for node, tags in node_tags.items():
        for tag in tags:
            if tag.startswith(grouping_prefix):
                if tag not in tag_groups:
                    tag_groups[tag] = set()
                tag_groups[tag].add(node)
                group_translator[node] = tag
    return group_translator, tag_groups

def get_tasks_from_dependencies(node_dependencies: dict, group_translator: dict) -> Tuple[set, dict]:
    # Calculating graph structure after grouping nodes by grouping tags
    group_dependencies = {}
    task_names = set()
    for parent, children in node_dependencies.items():
        if group_translator[parent] not in group_dependencies:
            group_dependencies[group_translator[parent]] = set()
        this_group_deps = group_dependencies[group_translator[parent]]
        task_names.add(group_translator[parent])
        for child in children:
            # Edges between nodes of the same group disappear after grouping
            if group_translator[child] != group_translator[parent]:
                this_group_deps.add(group_translator[child])
                task_names.add(group_translator[child])
    return task_names, group_dependencies

def update_node_tags(node_tags: dict, tag_groups: dict) -> dict:
    # Tags of each new group node are the union of its member nodes' tags
    node_tags.update({group: set([tag for node in tag_groups[group] for tag in node_tags[node]]) for group in tag_groups})
    return node_tags

...

group_translator, tag_groups = group_nodes_with_tags(node_tags)
task_names, group_dependencies = get_tasks_from_dependencies(node_dependencies, group_translator)
update_node_tags(node_tags, tag_groups)
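To make the effect of grouping concrete, here is a self-contained sketch applied to the Spaceflights nodes and tags used later in this article. The function group_graph is a condensed, hypothetical restatement of the grouping step rather than the code from the DAG template, and it assumes each node carries at most one grouping tag.

```python
from typing import Dict, Set, Tuple

GROUPING_PREFIX = "airflow:"

# Tags and parent -> children edges of the Spaceflights nodes used in this article
node_tags: Dict[str, Set[str]] = {
    "preprocess_companies_node": {"airflow:companies"},
    "preprocess_shuttles_node": {"airflow:shuttles", "machine:medium"},
    "create_model_input_table_node": {"airflow:split", "machine:medium"},
    "split_data_node": {"airflow:split", "machine:medium"},
    "train_model_node": {"machine:medium"},
    "evaluate_model_node": {"machine:medium"},
}
node_dependencies: Dict[str, Set[str]] = {
    "preprocess_companies_node": {"create_model_input_table_node"},
    "preprocess_shuttles_node": {"create_model_input_table_node"},
    "create_model_input_table_node": {"split_data_node"},
    "split_data_node": {"train_model_node", "evaluate_model_node"},
    "train_model_node": {"evaluate_model_node"},
}


def group_graph(
    tags: Dict[str, Set[str]],
    deps: Dict[str, Set[str]],
    prefix: str = GROUPING_PREFIX,
) -> Tuple[Set[str], Dict[str, Set[str]]]:
    """Collapse nodes that share a grouping tag into a single task."""
    # Map every node to its grouping tag, or to itself when it has none
    translator = {
        node: next((t for t in sorted(tag_set) if t.startswith(prefix)), node)
        for node, tag_set in tags.items()
    }
    tasks: Set[str] = set()
    edges: Dict[str, Set[str]] = {}
    for parent, children in deps.items():
        tasks.add(translator[parent])
        group_edges = edges.setdefault(translator[parent], set())
        for child in children:
            tasks.add(translator[child])
            if translator[child] != translator[parent]:  # drop edges inside a group
                group_edges.add(translator[child])
    return tasks, edges


tasks, edges = group_graph(node_tags, node_dependencies)
print(sorted(tasks))
print({parent: sorted(children) for parent, children in sorted(edges.items())})
```

The six Kedro nodes collapse into five Airflow tasks: the model input table creation and the data split share the "airflow:split" task, and the edge between them disappears.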
I've decided to make this mechanism optional and, for convenience, to use the prefix defined in the config file. Then, based on the results of grouping, the DAG chooses whether to run a single node or a group of them:
# The prefix is cut off by slicing: str.lstrip would strip a set of characters, not a prefix
task_id=name[len(GROUPING_PREFIX):] if name.startswith(GROUPING_PREFIX) else name,
cmds=["python", "-m", "kedro", "run", "--pipeline", pipeline_name, "--tags", name, "--env", "{{ env | default('local') }}"] if name.startswith(GROUPING_PREFIX)
    else ["python", "-m", "kedro", "run", "--pipeline", pipeline_name, "--nodes", name, "--env", "{{ env | default('local') }}"],
name=f"pod-{ slugify(pipeline_name) }-{ slugify(name[len(GROUPING_PREFIX):] if name.startswith(GROUPING_PREFIX) else name) }",
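The same choice can be written as a small standalone function, which is easier to test than an inline expression. This is only a sketch: strip_prefix and build_task_args are hypothetical helpers, and the prefix is removed by slicing because str.lstrip treats its argument as a set of characters rather than as a prefix.

```python
from typing import Dict, List, Union

GROUPING_PREFIX = "airflow:"


def strip_prefix(name: str, prefix: str = GROUPING_PREFIX) -> str:
    """Remove `prefix` from the start of `name`, if present."""
    return name[len(prefix):] if name.startswith(prefix) else name


def build_task_args(name: str, pipeline_name: str, env: str = "local") -> Dict[str, Union[str, List[str]]]:
    """Hypothetical helper: use `--tags` for a group and `--nodes` for a single node."""
    selector = "--tags" if name.startswith(GROUPING_PREFIX) else "--nodes"
    return {
        "task_id": strip_prefix(name),
        "cmds": ["python", "-m", "kedro", "run", "--pipeline", pipeline_name, selector, name, "--env", env],
    }


print(build_task_args("airflow:split", "__default__"))
print(build_task_args("train_model_node", "__default__"))

# lstrip removes any leading characters found in "airflow:", so it can eat too much
print("airflow:alpha".lstrip(GROUPING_PREFIX))  # pha
print(strip_prefix("airflow:alpha"))            # alpha
```

With the group names used in this article both approaches give the same result, but a group name starting with one of the letters of the prefix would be silently truncated by lstrip.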
And that's it!
... or is it? What if we make a mistake in our tagging and the DAG stops being a DAG (a cycle is introduced)? Well, then Airflow's DAG validation would shout at us for defining an incorrect DAG. But the mistake can also be obscured, as Kedro's dependencies are hidden inside grouped nodes and are not visible from Airflow's perspective after translation. So to lessen the burden of debugging, it would be nice to add tag validation code to the DAG creation process. As I've mentioned before, we don't have hooks available for this plugin (as of Kedro 0.18.13), so the next best place for it is the register_pipelines function.
Now the kedro airflow create command will result in an error or warning with the following message, should we make a mistake in tagging:
[09/29/23 18:48:00] INFO Validating pipelines tagging...
WARNING Group airflow:split has multiple machine tags, this may cause unexpected behavior in which machine is used for the group, please use only one machine tag per group
ERROR Pipeline __default__ has invalid grouping that creates a cycle in its grouping tags regarding nodes: {'train_model_node', '__start__', 'airflow:split'}
(“__start__” being the virtual node here that points to all other nodes added for simplicity of the algorithm)
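The core of such a validation is a cycle check on the grouped dependency graph. Below is a minimal sketch using the standard library's graphlib module (Python 3.9+). It is not the validation code from the demo repository: find_grouping_cycle is a hypothetical helper, and the broken example assumes a made-up mistake where evaluate_model_node is also tagged "airflow:split".

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Optional, Set


def find_grouping_cycle(group_dependencies: Dict[str, Set[str]]) -> Optional[List[str]]:
    """Return the nodes of one cycle in a parent -> children graph, or None if it is a DAG."""
    sorter: TopologicalSorter = TopologicalSorter()
    for parent, children in group_dependencies.items():
        sorter.add(parent)
        for child in children:
            sorter.add(child, parent)  # child must run after parent
    try:
        sorter.prepare()  # raises CycleError when the graph is not acyclic
    except CycleError as err:
        return err.args[1]  # graphlib stores the detected cycle as the second argument
    return None


# Grouped graph for the tagging used in this article
valid = {
    "airflow:companies": {"airflow:split"},
    "airflow:shuttles": {"airflow:split"},
    "airflow:split": {"train_model_node", "evaluate_model_node"},
    "train_model_node": {"evaluate_model_node"},
}
# Hypothetical mistake: evaluate_model_node is pulled into the "airflow:split" group,
# so the group now both feeds train_model_node and depends on it
broken = {
    "airflow:companies": {"airflow:split"},
    "airflow:shuttles": {"airflow:split"},
    "airflow:split": {"train_model_node"},
    "train_model_node": {"airflow:split"},
}

print(find_grouping_cycle(valid))   # None
print(find_grouping_cycle(broken))  # a list of nodes whose first and last element are the same
```

Running the check on the grouped graph, rather than on the original Kedro nodes, is what catches mistakes that only appear after grouping.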
If you've made it this far, thanks for reading. Now you get to see the images of this solution in action. You can find more detailed instructions in the project's README file. Here's the pipeline definition for reference:
Kedro Spaceflights pipelines:
[  # data processing pipeline
    node(
        func=preprocess_companies,
        inputs="companies",
        outputs="preprocessed_companies",
        name="preprocess_companies_node",
        tags=["airflow:companies"]
    ),
    node(
        func=preprocess_shuttles,
        inputs="shuttles",
        outputs="preprocessed_shuttles",
        name="preprocess_shuttles_node",
        tags=["airflow:shuttles", "machine:medium"]
    ),
    node(
        func=create_model_input_table,
        inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
        outputs="model_input_table",
        name="create_model_input_table_node",
        tags=["airflow:split", "machine:medium"]
    ),
]
...
[  # data science pipeline
    node(
        func=split_data,
        inputs=["model_input_table", "params:model_options"],
        outputs=["X_train", "X_test", "y_train", "y_test"],
        name="split_data_node",
        tags=["airflow:split", "machine:medium"]
    ),
    node(
        func=train_model,
        inputs=["X_train", "y_train"],
        outputs="regressor",
        name="train_model_node",
        tags=["machine:medium"]
    ),
    node(
        func=evaluate_model,
        inputs=["regressor", "X_test", "y_test"],
        outputs=None,
        name="evaluate_model_node",
        tags=["machine:medium"]
    ),
]
Pay special attention to the tags. It translates to the following Kedro pipeline:
We need the docker image, so we build & ship it to the docker registry in gcp in one go:
Then we create the DAG using the plugin and copy it to the Composer's DAGs bucket:
After a few minutes we should see our DAG in the Composer UI. We can trigger it manually and see the following results:
This was done with grouping disabled. Now let's enable it and see the difference:
In this example we use the grouping feature to rename single nodes (each of them defines a one-node group) with "airflow:companies" and "airflow:shuttles". Then we group the model input table creation and the data split into one node with an "airflow:split" tag.
Here's a side-by-side comparison of the generated tags with and without grouping:
Zooming in on one node, let's observe how the node's tags, name and machine tag translate to the pod's parameters.
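The translation can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from the DAG template: the resource presets are made-up values, simple_slugify is a simplified stand-in for slugify, and pod_parameters is a hypothetical helper. Only the "machine:" prefix convention and the pod naming scheme come from this article.

```python
import re
from typing import Dict, Iterable

RESOURCES_TAG_PREFIX = "machine:"

# Hypothetical presets: the real values depend on your workloads and cluster
MACHINE_PRESETS: Dict[str, Dict[str, str]] = {
    "default": {"cpu": "1", "memory": "2Gi"},
    "medium": {"cpu": "2", "memory": "8Gi"},
}


def simple_slugify(text: str) -> str:
    """Simplified stand-in for slugify: lowercase, other characters collapsed to '-'."""
    return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")


def pod_parameters(pipeline_name: str, task_name: str, tags: Iterable[str]) -> Dict[str, object]:
    """Derive a pod name and resource requests from a task's name and tags."""
    machines = sorted(
        tag[len(RESOURCES_TAG_PREFIX):] for tag in tags if tag.startswith(RESOURCES_TAG_PREFIX)
    )
    # More than one machine tag is ambiguous; this sketch just takes the first one
    machine = machines[0] if machines else "default"
    return {
        "name": f"pod-{simple_slugify(pipeline_name)}-{simple_slugify(task_name)}",
        "resources": MACHINE_PRESETS.get(machine, MACHINE_PRESETS["default"]),
    }


print(pod_parameters("__default__", "split", {"airflow:split", "machine:medium"}))
```

Keeping the presets in one mapping means that node tags stay short and the actual CPU and memory figures can be tuned in a single place.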
Let's have a look at GKE to observe how nodes translate to pods:
We can see that the airflow xcom node is sometimes left running for a long time - that's due to how slow the xcom mechanism can be in KubernetesPodOperator. The exporter sidecar container essentially waits for a signal to get killed after the data is read, but it can be left hanging for longer. This mechanism is also the reason why, in this small example, the node that creates a session in MLflow takes the longest to process (around 5 minutes). In many scenarios this is not an issue. In retrospect, this could be improved: the MLflow session creation could either be done directly in Airflow with PythonOperator (accepting the MLflow library as an external dependency in Airflow), or the communication mechanism could be replaced with a bucket as the medium instead of xcom.
The Kedro-Airflow plugin, with some effort, can automate your chores away and let your engineers forget about DAG translation and stay focused on more creative tasks! The only step left is to include the DAG creation and upload process in your CI tools.
We hope you've enjoyed this article and found it useful. If you have any questions or suggestions, please contact us at hello@getindata.com