Airflow Schedule DAG

This is where the branching Operators come in. The BranchPythonOperator is much like the PythonOperator, except that it expects a python_callable that returns a task_id (or a list of task_ids). The path of the returned task_id is followed, and all of the other paths are skipped. It can also return None to skip all downstream tasks. The task_id returned by the Python function has to reference a task directly downstream of the BranchPythonOperator task.

When a task is downstream of both the branching operator and downstream of one or more of the selected tasks, it will not be skipped. Say the paths out of the branching task are branch_a, join and branch_b: since join is also a downstream task of branch_a, it will still be run, even though it was not returned as part of the branch decision. The BranchPythonOperator can also be used with XComs, allowing the branching callable to decide dynamically which branch to follow based on the output of upstream tasks. A minimal sketch appears at the end of this section.

If you wish to implement your own operators with branching functionality, you can inherit from BaseBranchOperator, which behaves similarly to BranchPythonOperator but expects you to provide an implementation of the method choose_branch. As with the callable for BranchPythonOperator, this method can return the ID of a downstream task, or a list of task IDs, which will be run while all others are skipped. It can also return None to skip all downstream tasks.

Airflow's DAG Runs are often run for a date that is not the same as the current date - for example, running one copy of a DAG for every day in the last month to backfill some data. There are situations, though, where you don't want to let some (or all) parts of a DAG run for a previous date; in this case, you can use the LatestOnlyOperator. This special Operator skips all tasks downstream of itself if you are not on the "latest" DAG run - that is, if the wall-clock time right now is between its execution_time and the next scheduled execution_time, and it was not an externally-triggered run. In the DAG sketched at the end of this section:

- task1 is directly downstream of latest_only and will be skipped for all runs except the latest.
- task2 is entirely independent of latest_only and will run in all scheduled periods.
- task3 is downstream of task1 and task2, and because the default trigger rule is all_success, it will receive a cascaded skip from task1.
- task4 is downstream of task1 and task2, but it will not be skipped, since its trigger_rule is set to all_done.

You can also say a task can only run if the previous run of the task in the previous DAG Run succeeded. To use this, you just need to set the depends_on_past argument on your task to True. Note that if you are running the DAG at the very start of its life - specifically, its first ever automated run - then the task will still run, as there is no previous run to depend on.
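Here are minimal sketches of these patterns, assuming Airflow 2.x (on newer releases, EmptyOperator replaces DummyOperator). First, the branch_a/join/branch_b layout described above; the dag_id, schedule, and the trivial choose() callable are illustrative assumptions, not code from this article:

```python
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

with DAG(
    dag_id="branch_example",  # illustrative name
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    def choose(**context):
        # Any Python logic can drive the decision; an upstream XCom works too:
        #   value = context["ti"].xcom_pull(task_ids="some_upstream_task")
        return "branch_a"  # or a list of task_ids, or None to skip everything

    branching = BranchPythonOperator(task_id="branching", python_callable=choose)

    branch_a = DummyOperator(task_id="branch_a")
    branch_b = DummyOperator(task_id="branch_b")
    join = DummyOperator(task_id="join")

    # join is directly downstream of the branch *and* of branch_a, so it still
    # runs when branch_a is chosen, even though it was not returned.
    branching >> [branch_a, join, branch_b]
    branch_a >> join
```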
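Next, a custom branch operator built on BaseBranchOperator; the task IDs (daily_load, monthly_rollup) are hypothetical:

```python
from airflow.operators.branch import BaseBranchOperator

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """Run the monthly path on the first of the month, the daily path otherwise."""
        if context["execution_date"].day == 1:
            return ["daily_load", "monthly_rollup"]
        return "daily_load"  # returning None would skip all downstream tasks

# Used like any other operator inside a DAG:
#   branching = MyBranchOperator(task_id="branching")
```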
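Next, a reconstruction of the latest_only DAG walked through above, in the spirit of the Airflow documentation's sample; the dag_id and the four-hour schedule are assumptions:

```python
import datetime

import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="latest_only_with_trigger",  # illustrative name
    schedule_interval=datetime.timedelta(hours=4),
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    latest_only = LatestOnlyOperator(task_id="latest_only")
    task1 = DummyOperator(task_id="task1")
    task2 = DummyOperator(task_id="task2")
    task3 = DummyOperator(task_id="task3")  # default trigger_rule: all_success
    task4 = DummyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)

    latest_only >> task1 >> [task3, task4]
    task2 >> [task3, task4]
```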
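Finally, depends_on_past needs no special operator; it is just a flag on a task. A minimal, hypothetical example:

```python
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="depends_on_past_example",  # hypothetical
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    load = BashOperator(
        task_id="load",
        bash_command="echo loading",
        depends_on_past=True,  # only runs if this task succeeded in the previous DAG Run
    )
```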
By default, Airflow will wait for all upstream (direct parent) tasks of a task to succeed before it runs that task. However, this is just the default behaviour, and you can control it using the trigger_rule argument to a task. The options for trigger_rule are:

- all_success (default): All upstream tasks have succeeded.
- all_failed: All upstream tasks are in a failed or upstream_failed state.
- all_done: All upstream tasks are done with their execution.
- all_skipped: All upstream tasks are in a skipped state.
- one_failed: At least one upstream task has failed (does not wait for all upstream tasks to be done).
- one_success: At least one upstream task has succeeded (does not wait for all upstream tasks to be done).
- none_failed: No upstream task has failed or is upstream_failed - that is, all upstream tasks have succeeded or been skipped.
- none_failed_min_one_success: No upstream task has failed or is upstream_failed, and at least one upstream task has succeeded.
- none_skipped: No upstream task is in a skipped state - that is, all upstream tasks are in a success, failed, or upstream_failed state.
- always: No dependencies at all; run this task at any time.

You can also combine this with the Depends On Past functionality if you wish.

It's important to be aware of the interaction between trigger rules and skipped tasks, especially tasks that are skipped as part of a branching operation. You almost never want to use all_success or all_failed downstream of a branching operation: skipped tasks cascade through those two rules and cause the tasks that use them to be skipped as well. Consider a DAG in which join is downstream of follow_branch_a and branch_false. The join task will show up as skipped, because its trigger_rule is set to all_success by default, and the skip caused by the branching operation cascades down. By setting trigger_rule to none_failed_min_one_success on the join task, we get the intended behaviour instead; a sketch appears at the end of this section.

Since a DAG is defined by Python code, there is no need for it to be purely declarative; you are free to use loops, functions, and more to define your DAG. For example, a DAG can use a for loop to define some tasks (also sketched at the end of this section). In general, we advise you to try to keep the topology (the layout) of your DAG tasks relatively stable; dynamic DAGs are usually better used for dynamically loading configuration options or changing operator options.

If you want to see a visual representation of a DAG, you have two options: you can load up the Airflow UI, navigate to your DAG, and select "Graph", or you can run airflow dags show, which renders it out as an image file. We generally recommend the Graph view, as it will also show you the state of all the Task Instances within any DAG Run you select. Of course, as you develop out your DAGs they are going to get increasingly complex, so we provide a few ways to modify these DAG views to make them easier to understand.

A TaskGroup can be used to organize tasks into hierarchical groups in the Graph view. It is useful for creating repeating patterns and cutting down visual clutter. Unlike SubDAGs, TaskGroups are purely a UI grouping concept: tasks in TaskGroups live on the same original DAG and honor all the DAG settings and pool configurations. Dependency relationships can be applied across all tasks in a TaskGroup with the >> and << operators. For example, the code sketched at the end of this section puts task1 and task2 in TaskGroup group1 and then puts both tasks upstream of task3. A TaskGroup also supports default_args like a DAG does; they overwrite any default_args set at the DAG level. If you want to see a more advanced use of TaskGroup, you can look at the example_task_group.py example DAG that comes with Airflow.

By default, child tasks and TaskGroups have their IDs prefixed with the group_id of their parent TaskGroup. This helps ensure the uniqueness of group_id and task_id throughout the DAG. To disable the prefixing, pass prefix_group_id=False when creating the TaskGroup, but note that you will then be responsible for ensuring every single task and group has a unique ID of its own.
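Here are sketches for this section. First, the join fix described above; the task names follow the prose, while the dag_id, schedule, and the trivial callable are assumptions (none_failed_min_one_success requires Airflow 2.1+):

```python
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="branch_join_example",  # illustrative name
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=lambda: "follow_branch_a",  # always picks branch a here
    )
    follow_branch_a = DummyOperator(task_id="follow_branch_a")
    branch_false = DummyOperator(task_id="branch_false")  # skipped by the branch

    # With the default all_success, join would be skipped too; this rule
    # lets it run once follow_branch_a succeeds.
    join = DummyOperator(
        task_id="join",
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    branching >> [follow_branch_a, branch_false] >> join
```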
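Next, a loop-defined DAG; the branch names are arbitrary placeholders:

```python
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id="loop_example",  # illustrative name
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval=None,
) as dag:
    first = DummyOperator(task_id="first")
    last = DummyOperator(task_id="last")

    for option in ["branch_a", "branch_b", "branch_c", "branch_d"]:
        # One task per option, fanned out between first and last.
        first >> DummyOperator(task_id=option) >> last
```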
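And the TaskGroup arrangement just described; the task names come from the prose, while the retry values are made up and the TaskGroup-level default_args parameter assumes Airflow 2.2+:

```python
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id="task_group_example",  # illustrative name
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval=None,
    default_args={"retries": 1},
) as dag:
    # Tasks inside the group get IDs like "group1.task1" unless
    # prefix_group_id=False is passed to the TaskGroup.
    with TaskGroup("group1", default_args={"retries": 3}) as group1:
        task1 = DummyOperator(task_id="task1")
        task2 = DummyOperator(task_id="task2")

    task3 = DummyOperator(task_id="task3")
    group1 >> task3  # both task1 and task2 become upstream of task3
```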
As well as grouping tasks, you can also label the dependency edges between different tasks in the Graph view - this can be especially useful for branching areas of your DAG, where you can label the conditions under which certain branches might run. To add labels, you can use them directly inline with the >> and << operators, or you can pass a Label object to set_upstream/set_downstream; both styles are sketched at the end of this section.

It's possible to add documentation or notes to your DAGs and task objects that are visible in the web interface ("Graph" and "Tree" for DAGs, "Task Instance Details" for tasks). A set of special task attributes - doc, doc_json, doc_yaml, doc_md and doc_rst - get rendered as rich content if defined. Please note that for DAGs, doc_md is the only attribute interpreted. This is especially useful if your tasks are built dynamically from configuration files, as it allows you to expose in Airflow the configuration that led to the related tasks; see the sketch at the end of this section.

Sometimes you will find that you are regularly adding exactly the same set of tasks to every DAG, or you want to group a lot of tasks into a single, logical unit. This is what SubDAGs are for. For example, given a DAG with a lot of parallel tasks in two sections, we can combine all of the parallel task-* operators into a single SubDAG, so that the main graph shows one node per section. Note that SubDAG operators should contain a factory method that returns a DAG object. This prevents the SubDAG from being treated as a separate DAG in the main UI - remember, if Airflow sees a DAG at the top level of a Python file, it will load it as its own DAG. The SubDAG can then be referenced in your main DAG file (a sketch appears at the end of this section), and you can zoom into a SubDagOperator from the Graph view of the main DAG to show the tasks it contains.

Some other tips when using SubDAGs:

- By convention, a SubDAG's dag_id should be prefixed by the name of its parent DAG and a dot (parent.child).
- You should share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated in the sketch below).
- SubDAGs must have a schedule and be enabled. If the SubDAG's schedule is set to None or @once, the SubDAG will succeed without having done anything.
- Clearing a SubDagOperator also clears the state of the tasks within it.
- Marking success on a SubDagOperator does not affect the state of the tasks within it.
- Refrain from using Depends On Past in tasks within the SubDAG, as this can be confusing.
- You can specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic, as it may over-subscribe your worker, running multiple tasks in a single slot. See airflow/example_dags for a demonstration.
- Parallelism is not honored by SubDagOperator, so resources could be consumed by SubDagOperators beyond any limits you may have set.
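Here are sketches for this section as well. First, both edge-labeling styles; the task names are hypothetical, and Label comes from airflow.utils.edgemodifier:

```python
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.edgemodifier import Label

with DAG(
    dag_id="edge_label_example",  # illustrative name
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval=None,
) as dag:
    ingest = DummyOperator(task_id="ingest")
    check = DummyOperator(task_id="check")
    error = DummyOperator(task_id="error")

    # Inline with >>:
    ingest >> Label("data ok") >> check
    # Or via set_downstream/set_upstream:
    ingest.set_downstream(error, edge_modifier=Label("errors found"))
```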
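Next, the documentation attributes; the content strings are placeholders:

```python
import textwrap

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="docs_example",  # illustrative name
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval=None,
) as dag:
    dag.doc_md = "DAG-level notes; for DAGs, only doc_md is interpreted."

    load = BashOperator(task_id="load", bash_command="echo load")
    load.doc_md = textwrap.dedent(
        """\
        #### Task Documentation
        Rendered as Markdown in Task Instance Details - for example, the
        configuration that generated this task.
        """
    )
```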
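Finally, the SubDAG factory pattern and how the main DAG references it, close in spirit to Airflow's example_subdag_operator; the IDs, schedule, and retry count are assumptions:

```python
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.subdag import SubDagOperator

def subdag(parent_dag_name, child_dag_name, args):
    """Factory: returns the DAG to nest, rather than defining it at top level."""
    dag_subdag = DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",  # parent.child convention
        default_args=args,  # share arguments with the parent DAG
        schedule_interval="@daily",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
    )
    for i in range(5):
        DummyOperator(task_id=f"{child_dag_name}-task-{i + 1}", dag=dag_subdag)
    return dag_subdag

with DAG(
    dag_id="parent_dag",  # illustrative name
    default_args={"retries": 2},
    schedule_interval="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    start = DummyOperator(task_id="start")
    section_1 = SubDagOperator(
        task_id="section-1",
        subdag=subdag("parent_dag", "section-1", dag.default_args),
    )
    end = DummyOperator(task_id="end")
    start >> section_1 >> end
```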
SubDAGs, while serving a similar purpose to TaskGroups, introduce both performance and functional issues due to their implementation:

- The SubDagOperator starts a BackfillJob, which ignores existing parallelism configurations and can oversubscribe the worker environment.
- SubDAGs have their own DAG attributes, and when those attributes are inconsistent with the parent DAG's, unexpected behavior can occur.
- You cannot see the "full" DAG in one view, since SubDAGs exist as full-fledged DAGs of their own.
- SubDAGs introduce all sorts of edge cases and caveats, which can disrupt user experience and expectations.

TaskGroups, on the other hand, are the better option, given that they are purely a UI grouping concept.