How to run a Checkpoint in Airflow
This guide will help you run a Great Expectations checkpoint in Apache Airflow, which allows you to trigger validation of a data asset using an Expectation Suite directly within an Airflow DAG.
Prerequisites: This how-to guide assumes you have:
- Completed the Getting Started Tutorial
- Installed Great Expectations
- Set up a working deployment of Great Expectations
- Created an Expectation Suite
- Created a checkpoint for that Expectation Suite and a data asset
- Created an Airflow DAG file
Using checkpoints is the most straightforward way to trigger a validation run from within Airflow.
The following sections describe two alternative approaches to accomplishing this:
Steps
Option 1: Running a checkpoint with a BashOperator
You can use a simple BashOperator in Airflow to trigger the checkpoint run. The following snippet shows an Airflow task for an Airflow DAG named dag that triggers the run of a checkpoint we named my_checkpoint:
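    # Airflow 2 import path; on Airflow 1.10 use airflow.operators.bash_operator
    from airflow.operators.bash import BashOperator

    # Runs the checkpoint through the Great Expectations CLI
    validation_task = BashOperator(
        task_id='validation_task',
        bash_command='great_expectations checkpoint run my_checkpoint',
        dag=dag,
    )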
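The bash_command string above can also be assembled programmatically. The following is a minimal sketch, not part of Great Expectations or Airflow: build_checkpoint_command and the project_root argument are hypothetical names introduced here, and the sketch assumes the CLI locates the project relative to the working directory, which may not be where an Airflow worker runs the command.

```python
import shlex


def build_checkpoint_command(checkpoint_name, project_root=None):
    """Return a shell command string that runs the named checkpoint.

    project_root is a hypothetical path to the directory that contains the
    great_expectations/ folder; when given, the command changes into it first.
    """
    # Quote the name so unusual characters cannot break the shell command
    command = "great_expectations checkpoint run " + shlex.quote(checkpoint_name)
    if project_root is not None:
        # Assumption: the CLI finds the project from the working directory
        command = "cd " + shlex.quote(project_root) + " && " + command
    return command


# Example path is illustrative only
print(build_checkpoint_command("my_checkpoint", "/opt/airflow/ge_project"))
```

The returned string could then be passed as the bash_command argument of the BashOperator task.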
Option 2: Running the checkpoint script output with a PythonOperator
Another option is to use the output of the great_expectations checkpoint script command and paste it into a method that is called from a PythonOperator in the DAG. This gives you more fine-grained control over how to respond to Validation Results:
1. Run great_expectations checkpoint script:

    great_expectations checkpoint script my_checkpoint
    ...
    A Python script was created that runs the checkpoint named: `my_checkpoint`
    - The script is located in `great_expectations/uncommitted/my_checkpoint.py`
    - The script can be run with `python great_expectations/uncommitted/my_checkpoint.py`

2. Navigate to the generated Python script and copy the content.
3. Create a method in your Airflow DAG file and call it from a PythonOperator:

    # Airflow 2 import path; on Airflow 1.10 use airflow.operators.python_operator
    from airflow.operators.python import PythonOperator

    def run_checkpoint():
        # paste content from the checkpoint script here
        ...

    task_run_checkpoint = PythonOperator(
        task_id='run_checkpoint',
        python_callable=run_checkpoint,
        dag=dag,
    )
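One way to use the finer control this option offers is to decide explicitly when the task should fail. The sketch below is illustrative only: ValidationFailed and raise_on_failed_validation are hypothetical names, and it assumes the pasted script produces a results object whose "success" entry reports the overall outcome, which you should confirm against your generated script. An exception raised inside the python_callable causes Airflow to mark the task as failed.

```python
class ValidationFailed(Exception):
    """Hypothetical exception; a real DAG might raise AirflowException instead."""


def raise_on_failed_validation(results):
    """Raise if the validation results report failure, otherwise return them."""
    # Assumption: results["success"] holds the overall pass/fail outcome
    if not results["success"]:
        raise ValidationFailed("Checkpoint validation failed")
    return results


# Stand-in results for illustration; real results come from the pasted script
checked = raise_on_failed_validation({"success": True})
print(checked["success"])
```

Calling such a helper at the end of run_checkpoint keeps the failure policy in one place, so it can later be extended, for example to log details before raising.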
Additional Resources
Please see How to configure a New Checkpoint using "test_yaml_config" for additional Checkpoint configuration and DataContext.run_checkpoint() examples.