Last updated: July 05, 2025
Airflow Data Quality Testing Example
Read this guide to learn how to run data quality checks in an Apache Airflow DAG, and how to stop the pipeline if data quality issues are detected.
Overview
This page shows how to use the run checks operator in a non-blocking DAG configuration in Airflow.
The example also demonstrates how to stop the data loading stage when a data quality issue is detected.
Prerequisites
To run the example, you need:
- Installation of the dqops Python package from PyPI
- Configuration of a data source and checks in DQOps
DAG configuration
The DAG below uses three tasks:
- run checks,
- wait for run checks execution,
- load data.
The code snippet does not contain the configuration of the data loading operator. You have to configure it before using the example.
import datetime
import pendulum
from airflow import DAG
from dqops.airflow.run_checks.dqops_run_checks_operator import DqopsRunChecksOperator
from dqops.airflow.wait_for_job.dqops_wait_for_job_operator import DqopsWaitForJobOperator
from dqops.client.models.check_type import CheckType
from dqops.client.models.rule_severity_level import RuleSeverityLevel
... # (1)!
with DAG(
    dag_id="example_connection_run_checks_wait_and_load",
    schedule=datetime.timedelta(hours=12),  # (2)!
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["dqops_example"]
) as dag:
    run_checks = DqopsRunChecksOperator(
        task_id="dqops_run_checks_operator_task",
        base_url="http://host.docker.internal:8888",  # (3)!
        connection="example_connection",
        check_type=CheckType.MONITORING,
        fail_on_timeout=False,  # (4)!
        wait_timeout=10  # (5)!
    )

    wait_for_job = DqopsWaitForJobOperator(
        task_id="dqops_wait_for_job",
        base_url="http://host.docker.internal:8888",
        retries=120,  # (6)!
        retry_delay=60
    )

    """ This section contains the business loading task
    load_new_data = # first configure the loading operator used in your project
    """

    run_checks >> wait_for_job >> load_new_data  # (7)!
1. The import of the operator used by the load_new_data task.
2. The DAG is set to run twice a day, every 12 hours.
3. The example uses a dockerized Airflow environment. It connects to a local DQOps instance running on the host, which can be reached from Docker containers by using "host.docker.internal" in place of "localhost".
4. The parameter that prevents the task from failing when the long-running job times out. Alternatively, setting the trigger_rule of the wait for job operator to "all_done" allows its task to be executed even when the upstream run checks task fails (see the sketch after this list).
5. Running all the checks is expected to take more than 10 minutes. The run checks task is only responsible for calling the DQOps API, so the default timeout (120 seconds) is reduced to 10 seconds.
6. The total time in seconds that the operator waits is the product of the retries number and the retry_delay value in seconds.
7. The example uses the automatically passed job ID. For this configuration, it is important to place the wait for job task directly after the run checks task.
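Annotation 4 mentions an alternative to fail_on_timeout=False: letting the wait for job task run even when the upstream run checks task fails. A minimal sketch, assuming DqopsWaitForJobOperator forwards the standard Airflow BaseOperator keyword arguments such as trigger_rule:

from airflow.utils.trigger_rule import TriggerRule
from dqops.airflow.wait_for_job.dqops_wait_for_job_operator import DqopsWaitForJobOperator

# The wait for job task runs even if the upstream run checks task failed,
# for example after a timeout. Passing trigger_rule through the operator
# constructor is an assumption of this sketch.
wait_for_job = DqopsWaitForJobOperator(
    task_id="dqops_wait_for_job",
    base_url="http://host.docker.internal:8888",
    retries=120,
    retry_delay=60,
    trigger_rule=TriggerRule.ALL_DONE
)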
First, the run checks task is executed. Running the checks is expected to be a long-running activity that will not finish quickly, and there is no reason to keep an Airflow worker allocated while it runs. To make the run checks task non-blocking, the DAG uses the specially designed wait for job task. The responsibility for receiving the API response with the finished execution details is moved from the run checks task to the downstream wait for job task. The run checks task is now only responsible for calling the DQOps API to start running the checks.
The example releases the worker that executes the run checks task before the default timeout (120 seconds) passes by setting the wait_timeout parameter to 10 seconds. Ten seconds is enough for the task to call the API and submit the job.
The wait for job task will track the run checks job for up to two hours, calling the DQOps API every 60 seconds, 120 times in total.
Wait for job operator usage
The wait for job operator can be used with other job types as well.
The overall tracking time is the product of two Airflow operator parameters: retries and retry_delay.
When a data quality issue is detected, the tracking task fails, which blocks the execution of the load new data task.
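As a worked example, the retries and retry_delay values used in the example DAG give the following total tracking time:

# Tracking-time arithmetic for the values used in the example DAG.
retries = 120      # number of retries of the wait for job task
retry_delay = 60   # delay between retries, in seconds

total_tracking_seconds = retries * retry_delay
print(total_tracking_seconds)         # 7200 seconds
print(total_tracking_seconds / 3600)  # 2.0 hours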
Be informed by notifications
Consider the notification integration available in DQOps.
It allows you to receive the data quality status of your resources as quickly as possible.
In the above case, you will be informed that the run checks activity failed and that the new data has not been loaded.
Working example in Airflow
An issue becomes visible in the Airflow web UI: one of the recent task circles in the DAG turns red. The failure is shown in the DAG details. The load new data task has not been started because of the issue. The default value "all_success" of the operator's trigger_rule parameter requires that all directly upstream tasks have succeeded.
The load new data task will not start in the next scheduled DAG executions until the issue is resolved.
For more details that will help you resolve the issue, open the Incidents screen in the DQOps UI and filter by the connection on the left side.
To see more information about the issue, click the link in the Data quality issue grouping in the top menu.
The issue in the example points to the Completeness dimension. The check that verifies the expected number of rows failed. This means the data is incomplete, and the issue has to be investigated and resolved.
Checks verification and adjustments
The actual number of rows verified by the check might in fact be expected, for example as a result of planned work on the data source. In this case, the rules of the check have to be adjusted to the change that occurred.
After fixing the issue, the task can be executed manually to rerun the data loading. Because the checks are executed again, you can verify that the fix did not corrupt the data in another data quality dimension.
Finally, the DAG execution should finish uninterrupted, as presented on the screenshot.
Observe the table after loading
Consider adding another run checks task as a downstream task of the loading task.
This approach allows you to observe the state of the table and its data quality both before and after the loading phase. A sketch of this extended task order is shown below.
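A minimal sketch of the extended task order, reusing the operator configuration from the example DAG above. The *_after_load task IDs are illustrative names, and load_new_data is the loading task configured in your project.

    # Added inside the "with DAG(...) as dag:" block of the example above.
    # A second pair of DQOps tasks verifies data quality after loading.
    run_checks_after_load = DqopsRunChecksOperator(
        task_id="dqops_run_checks_after_load",
        base_url="http://host.docker.internal:8888",
        connection="example_connection",
        check_type=CheckType.MONITORING,
        fail_on_timeout=False,
        wait_timeout=10
    )

    wait_for_job_after_load = DqopsWaitForJobOperator(
        task_id="dqops_wait_for_job_after_load",
        base_url="http://host.docker.internal:8888",
        retries=120,
        retry_delay=60
    )

    run_checks >> wait_for_job >> load_new_data >> run_checks_after_load >> wait_for_job_after_load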