Last updated: July 05, 2025

Airflow Wait for Job Operator with Example DAG

Read this guide to understand how to start data quality jobs from Apache Airflow DAG, how to wait for long-running data quality jobs, and how to handle errors.

Overview

The wait for job operator is designed to release computation resources from Airflow while waiting for a job to complete.

Instead of running an Airflow task for an hour, which would occupy an Airflow worker the whole time, the tracking task based on DqopsWaitForJobOperator periodically polls DQOps for the status of the running job until it finishes.

First, the long-running task is started and allowed to finish or time out. Then the wait for job operator tracks the status of the long-running job over a longer period. This period is configured by the user with two Airflow variables inherited from BaseOperator: retries and retry_delay.

Total time of wait for job task

The retries variable sets the number of task instantiations, each making a single DQOps API call. The retry_delay is simply the delay before the next try. The overall time that the wait for job operator remains active is the product of these two values.
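
For example, with retries=200 and retry_delay=30 seconds (the values used in the usage example below), the tracking task remains active for up to 200 × 30 = 6,000 seconds, which is 100 minutes.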

The wait for job operator is based on a unique identifier of the tracked job, which remains unchanged throughout the application's lifetime. Therefore, the status of a specific job can always be pulled by its identifier.

There are two forms of identifiers that can be used. The first is the Job ID, which is automatically generated by the DQOps platform when the job is created. The second option is a job business key set by the user, which allows for custom identifiers.

Uniqueness of the job business key

Be careful when using the job business key because the identifier has to be unique.

Operator parameters

The table presents the parameters that are supported by the wait for job operator. All parameters are optional.

| Name | Description | Type |
|------|-------------|------|
| task_id_to_wait_for | The ID of the task that the operator will wait for. | str |
| base_url | The base URL to the DQOps application. | str [optional, default="http://localhost:8888/"] |
| job_business_key | A user-assigned unique job ID, used to check the job status by looking up the job by the user-assigned identifier instead of the DQOps-assigned job identifier. | Union[Unset, None, str] = UNSET |
| wait_timeout | Execution timeout value in seconds. It prevents hanging tasks when an action never completes. If not set, the default timeout is 120 seconds. | int |
| fail_on_timeout | By default, a timeout sets the task status to Failed. Set this flag to False to mark a timed-out task as Success instead. | bool [optional, default=True] |
| fail_at_severity | (Used only when tracking a run checks task.) The threshold rule severity level that causes the Airflow task to finish with a failed status. | RuleSeverityLevel [optional, default=RuleSeverityLevel.FATAL] |

The operator inherits from BaseOperator and adds the parameters listed above. For the complete list of BaseOperator parameters, visit the official Airflow documentation at https://airflow.apache.org/.

Set up the task to be tracked

To track a task, configure the operator in one of three ways:

  • Automatically passed job ID
  • Setting the task_id_to_wait_for parameter
  • Setting the job_business_key parameter

With the first approach, the operator uses the return_value pushed to Airflow's XCom to retrieve the job ID. For this to work, the wait for job task must be placed directly after the long-running task, as its downstream task. Otherwise, the automatic job ID passing will not work, or the job ID received by the wait for job task will come from a different upstream task rather than the long-running one. The usage example below uses this approach.

The second approach uses the task_id_to_wait_for parameter. The value of the parameter has to be the name of the task to be tracked.
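
For illustration, here is a minimal sketch of the second approach; the task name "dqops_run_checks" is borrowed from the usage example below, and imports are the same as in that example.

wait_for_job = DqopsWaitForJobOperator(
    task_id="dqops_wait_for_job",
    base_url="http://host.docker.internal:8888",
    # the Airflow task name of the long-running task to be tracked
    task_id_to_wait_for="dqops_run_checks",
    retries=200,
    retry_delay=30
)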

The third approach uses the job_business_key parameter. The same value has to be set on both the job and the tracking task. The value of job_business_key has to be unique in the system for each execution.
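
The following sketch of the third approach assumes that DqopsRunChecksOperator also accepts the job_business_key parameter, as the paragraph above implies; the key value itself is hypothetical, and imports are the same as in the usage example below.

# The same user-assigned key is set on both the job and the tracking task.
run_checks_task = DqopsRunChecksOperator(
    task_id="dqops_run_checks",
    base_url="http://host.docker.internal:8888",
    connection="example_connection",
    check_type=CheckType.MONITORING,
    job_business_key="example_connection_run_checks_1"  # assumed parameter
)

wait_for_job = DqopsWaitForJobOperator(
    task_id="dqops_wait_for_job",
    base_url="http://host.docker.internal:8888",
    job_business_key="example_connection_run_checks_1"
)

run_checks_task >> wait_for_job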

Usage example

Before starting any task in Airflow, it is important to determine whether the task can complete within the default 120-second timeout.

A point of interest is the run checks operator, since the execution time of checks depends on the number of checks and the size of the data.

import pendulum
from airflow import DAG
from dqops.airflow.run_checks.dqops_run_checks_operator import DqopsRunChecksOperator
from dqops.airflow.wait_for_job.dqops_wait_for_job_operator import DqopsWaitForJobOperator
from dqops.client.models.check_type import CheckType
from dqops.client.models.rule_severity_level import RuleSeverityLevel

with DAG(
    dag_id="example_connection_wait_for_job_run_checks",
    schedule="@once",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["dqops_example"]
) as dag:
    # Start the long-running run checks job in DQOps.
    run_checks_task = DqopsRunChecksOperator(
        task_id="dqops_run_checks",
        base_url="http://host.docker.internal:8888",
        connection="example_connection",
        fail_at_severity=RuleSeverityLevel.WARNING,
        check_type=CheckType.MONITORING
    )

    # Track the job started above. Placed directly downstream, the operator
    # receives the job ID automatically through XCom. With retries=200 and
    # retry_delay=30 seconds, it tracks the job for up to 100 minutes.
    # trigger_rule="all_done" runs the tracking task even when the upstream
    # task has failed or timed out.
    wait_for_job = DqopsWaitForJobOperator(
        task_id="dqops_wait_for_job",
        base_url="http://host.docker.internal:8888",
        retries=200,
        retry_delay=30,
        trigger_rule="all_done"
    )

    run_checks_task >> wait_for_job

Troubleshooting

The job_business_key value is different among the tasks in an Airflow DAG.

Make sure that the job_business_key is a constant string literal rather than a dynamically generated value assigned to a variable in the DAG. The Python code that defines the DAG should be treated as a static declaration file or a configuration script, similar to an XML file. Airflow manages the execution of the tasks defined in the DAG, which may be distributed across multiple Airflow workers. Each part of the DAG can be executed on a different machine in a different environment.
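
For illustration, a minimal sketch of the difference; the variable and key names are hypothetical.

import uuid

# Problematic: the DAG file is parsed independently by the scheduler and by
# every worker, so each parse can produce a different value, and the job and
# the tracking task may end up with different keys.
business_key = str(uuid.uuid4())

# Safe: a constant string literal evaluates to the same value everywhere.
business_key = "example_connection_daily_checks"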

Missing a wait for job task

It is important to ensure that each wait for job task appears only once as a node in the DAG and tracks only a single long-running task. A single wait for job task should not be used to track multiple long-running tasks: reusing one tracking node for several tasks breaks the expected one-to-one structure, and the automatically passed job ID becomes ambiguous because it may come from any of the upstream tasks.

To avoid this issue, make sure that the DAG uses a separate wait for job task for each of the tracked tasks.
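
To illustrate, here is a minimal sketch of the correct wiring with one wait for job task per tracked task; the task and connection names are hypothetical, most operator arguments are omitted for brevity, and imports are the same as in the usage example above.

run_checks_a = DqopsRunChecksOperator(
    task_id="dqops_run_checks_a",
    connection="connection_a",
    check_type=CheckType.MONITORING
)
wait_for_job_a = DqopsWaitForJobOperator(task_id="dqops_wait_for_job_a")

run_checks_b = DqopsRunChecksOperator(
    task_id="dqops_run_checks_b",
    connection="connection_b",
    check_type=CheckType.MONITORING
)
wait_for_job_b = DqopsWaitForJobOperator(task_id="dqops_wait_for_job_b")

# Each long-running task is paired with its own tracking task.
run_checks_a >> wait_for_job_a
run_checks_b >> wait_for_job_b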
