Airflow Fargate

AWS Fargate is a technology that lets you run containers without having to manage servers or clusters of EC2 instances.

Now let’s look at two patterns for batch processing in Fargate: running scheduled tasks and integrating with a workflow system.

Scheduled batch processing

We often need to load data from an external site, and SFTP is still commonly used as a transfer mechanism. In this lab we’ll run a scheduled task that processes files from an SFTP server.

Testing the SFTP connection

We created an SFTP server for you in Lab 1. You should see three outputs from the CDK stack in Lab 1; note these down for this lab.

To find the DNS address to use for the SFTP endpoint, go to the Transfer console and click on the VPC endpoint ID. Copy any of the IP addresses under the Subnets tab.

Go to the AWS Transfer for SFTP console and add your SSH public key to the sftpuser account. Then, back on the command line, connect to the SFTP endpoint:

Once connected, you should be able to go into the bucket directory and run commands.
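
If you’d rather script this connectivity check in Python instead of using the sftp client, a minimal sketch with paramiko might look like the following; the endpoint IP, key path, and user name are placeholders for the values you noted above.

```
# Sketch: verify SFTP connectivity with paramiko (pip install paramiko).
# The endpoint IP and key path are placeholders for the values you noted
# from the Transfer console and your Cloud9 environment.
import paramiko

ENDPOINT = "10.0.1.23"                      # one of the VPC endpoint IPs
KEY_PATH = "/home/ec2-user/.ssh/id_rsa"     # your SSH private key

key = paramiko.RSAKey.from_private_key_file(KEY_PATH)
transport = paramiko.Transport((ENDPOINT, 22))
transport.connect(username="sftpuser", pkey=key)

sftp = paramiko.SFTPClient.from_transport(transport)
print(sftp.listdir("."))                    # should list the bucket directory
sftp.close()
transport.close()
```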

Go ahead and upload a file into the SFTP site. We’ll process this file later in this lab.

Create a Git repository

Let’s create a Git repository. We’ll do this manually as setting up development tools is often done by hand.

Follow the guide to create a new repository in CodeCommit. Name it FargateSftpProcessor.

Follow the instructions on the console to clone your new repo into a local folder in Cloud9.

Check in source code

Go to your local copy of the FargateSftpProcessor repo:

Store your SSH key as a secret

Follow the instructions to store a new secret in Secrets Manager. Choose Other type of secret, and store the contents of ~/.ssh/id_rsa as a plaintext secret. Name the secret sftpkey. Accept all other default values.
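
For reference, the task can read this secret back at runtime with boto3. A minimal sketch, assuming the secret is named sftpkey and was stored as plain text:

```
# Sketch: fetch the SFTP private key from Secrets Manager at runtime.
# Assumes a plain-text secret named "sftpkey"; the region is a placeholder.
import boto3

def get_sftp_key(secret_name="sftpkey", region="us-east-1"):
    client = boto3.client("secretsmanager", region_name=region)
    response = client.get_secret_value(SecretId=secret_name)
    return response["SecretString"]     # the raw private key text

if __name__ == "__main__":
    print(get_sftp_key()[:40])
```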

Push image

On the CLI:

Note the value in the output, which is our ECR repository URI. Use this in the following commands:
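
If you’d rather look up the repository URI programmatically than copy it from the CLI output, a boto3 sketch follows; the repository name is an assumption, so substitute the one you just created.

```
# Sketch: look up an ECR repository URI with boto3.
# The repository name is an assumption; use the repository created above.
import boto3

ecr = boto3.client("ecr")
response = ecr.describe_repositories(repositoryNames=["fargate-sftp-processor"])
print(response["repositories"][0]["repositoryUri"])
```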

Deploy the scheduled task

Deploy our task and scheduler using the CDK:

This will set up a CloudWatch Events rule that kicks off the task every 5 minutes. You can verify this by looking at the CloudWatch logs for the task.
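
Under the hood, a CDK pattern like the one sketched below (Python, with illustrative names and sizes rather than the workshop’s exact stack) ties a container image to a 5-minute schedule:

```
# Sketch: a Fargate task scheduled every 5 minutes using the CDK (Python).
# The construct names, image reference, and sizes are illustrative placeholders.
from aws_cdk import Duration, Stack
from aws_cdk import aws_applicationautoscaling as appscaling
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecs as ecs
from aws_cdk import aws_ecs_patterns as ecs_patterns
from constructs import Construct

class SftpProcessorStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        vpc = ec2.Vpc(self, "Vpc", max_azs=2)
        cluster = ecs.Cluster(self, "Cluster", vpc=vpc)

        ecs_patterns.ScheduledFargateTask(
            self, "SftpProcessorTask",
            cluster=cluster,
            schedule=appscaling.Schedule.rate(Duration.minutes(5)),
            scheduled_fargate_task_image_options=ecs_patterns.ScheduledFargateTaskImageOptions(
                image=ecs.ContainerImage.from_registry("<ecr-repository-uri>:latest"),
                memory_limit_mib=512,
                cpu=256,
            ),
        )
```

The ScheduledFargateTask pattern creates the event rule and task definition for you.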

The task right now is a stub that just prints out the names of any files in the SFTP site. As a challenge:

  • Enhance the task to download files and store them in another S3 bucket (a starting-point sketch follows this list). This will require not only changes to the Python code, but also changes to the permission policy for the task.
  • Set up a CI/CD pipeline to deploy the task automatically. You can use the pipelines from Lab 3 as a template.
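
Here’s a hedged starting point for the first challenge: pull each remote file with paramiko and copy it to a destination bucket with boto3. The endpoint, key path, and bucket name are placeholders, and the task role also needs s3:PutObject on the destination bucket.

```
# Sketch: download every file from the SFTP site and copy it to S3.
# ENDPOINT, KEY_PATH, and DEST_BUCKET are placeholders; the task role must
# also be granted s3:PutObject on the destination bucket.
import os

import boto3
import paramiko

ENDPOINT = os.environ.get("SFTP_ENDPOINT", "10.0.1.23")
KEY_PATH = "/tmp/sftpkey"        # e.g. written from the sftpkey secret
DEST_BUCKET = os.environ.get("DEST_BUCKET", "my-processed-files-bucket")

def main():
    key = paramiko.RSAKey.from_private_key_file(KEY_PATH)
    transport = paramiko.Transport((ENDPOINT, 22))
    transport.connect(username="sftpuser", pkey=key)
    sftp = paramiko.SFTPClient.from_transport(transport)

    s3 = boto3.client("s3")
    for name in sftp.listdir("."):
        local_path = f"/tmp/{name}"
        sftp.get(name, local_path)                      # download from SFTP
        s3.upload_file(local_path, DEST_BUCKET, name)   # store in S3
        print(f"Copied {name} to s3://{DEST_BUCKET}/{name}")

    sftp.close()
    transport.close()

if __name__ == "__main__":
    main()
```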

Integrating with a workflow system

Let’s consider a case where you need to convert a batch of small XML files to JSON as part of a larger batch processing pipeline. While generally you can do this sort of operation using Spark or some other tool from the Hadoop ecosystem, a Spark job might be overkill if the number of XML files is small. Let’s look at how to tackle this operation using Fargate tasks orchestrated by a workflow system, AWS Step Functions.

Create a Git repository

Let’s create a Git repository. We’ll do this manually as setting up development tools is often done by hand.

Follow the guide to create a new repository in CodeCommit. Name it FargateBatchProcessor.

Follow the instructions on the console to clone your new repo into a local folder in Cloud9.

Check in source code

Go to your local copy of the FargateBatchProcessor repo:

If you look at the code in app.py, you’ll see that this is a very simple task. It downloads one file from S3, converts it from XML to JSON, and then uploads the output to S3. You could do this sort of task equally well in Lambda, but do note that Fargate tasks have more storage available (10 GB for layer storage and 4 GB for volume storage) and don’t have any limitations on processing time.
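
A minimal sketch of that kind of conversion (using the third-party xmltodict package; the bucket and key names come from environment variables and are placeholders, so this is not the workshop’s exact app.py):

```
# Sketch: download an XML object from S3, convert it to JSON, upload the result.
# BUCKET and INPUT_KEY are placeholder environment variables; this is an
# illustration, not the workshop's exact app.py.
import json
import os

import boto3
import xmltodict    # pip install xmltodict

BUCKET = os.environ["BUCKET"]

def convert(input_key):
    s3 = boto3.client("s3")

    # Download and parse the XML document.
    body = s3.get_object(Bucket=BUCKET, Key=input_key)["Body"].read()
    document = xmltodict.parse(body)

    # Upload the JSON version next to the original.
    output_key = input_key.rsplit(".", 1)[0] + ".json"
    s3.put_object(Bucket=BUCKET, Key=output_key,
                  Body=json.dumps(document).encode("utf-8"))
    return output_key

if __name__ == "__main__":
    print(convert(os.environ["INPUT_KEY"]))
```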

Push image

On the CLI:

Note the value in the output, which is our ECR repository URI. Use this in the following commands:

Deploy the task

Deploy our batch processing task using the CDK:
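
The stack for this step essentially registers a Fargate task definition that points at the image you just pushed. A pared-down Python CDK sketch (resource names, sizes, and the image reference are illustrative placeholders):

```
# Sketch: a Fargate task definition for the batch converter (CDK, Python).
# Names, sizes, and the image reference are illustrative placeholders.
from aws_cdk import Stack
from aws_cdk import aws_ecs as ecs
from constructs import Construct

class BatchProcessorStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        task_def = ecs.FargateTaskDefinition(
            self, "BatchTaskDef", cpu=256, memory_limit_mib=512)

        task_def.add_container(
            "converter",
            image=ecs.ContainerImage.from_registry("<ecr-repository-uri>:latest"),
            environment={"BUCKET": "<data-bucket-name>"},
            logging=ecs.AwsLogDriver(stream_prefix="batch-converter"),
        )
```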

Invoke the task

Now we’re ready to invoke this task as part of a data pipeline. There are several workflow tools available to manage data pipelines, including Apache Airflow and Glue Workflows. But in this lab we’ll create a Step Function workflow. Step Functions are general purpose state machines and can coordinate any number of activities. Plus, Step Functions integrate natively with ECS.

Your task is to create a Step Function that will:

  • Accept a list of XML files in S3 as input
  • For each file, invoke our Fargate task to process it

Once you’ve created your Step Function, invoke it with a list of XML files and make sure you see a corresponding JSON file after the workflow completes.
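
One way to start an execution with a file list, once the state machine exists, is the boto3 call sketched below; the state machine ARN is a placeholder, and the input shape assumes your workflow iterates over a top-level array of S3 keys.

```
# Sketch: start the Step Function with a list of XML keys as input.
# The state machine ARN is a placeholder, and the "files" input key is an
# assumption about how your workflow's Map state is written.
import json

import boto3

sfn = boto3.client("stepfunctions")
response = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:XmlToJsonBatch",
    input=json.dumps({"files": ["data/file1.xml", "data/file2.xml"]}),
)
print(response["executionArn"])
```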

A possible solution is available in ../fargate-workshop/labs/batch-processing/solutions/batch_sfn.json.

Here are some hints:

  • You’ll need to provide some information like the Fargate cluster ID. In the provided solution, the fields you need to fill in are surrounded by brackets.
  • The input to the Step Function should be a list of paths (without the bucket name) in the S3 bucket we created in the last step. The task already has the bucket name set as an environment variable. There are two example XML files in the ../fargate-workshop/labs/batch-processing/data directory, or you can try some XML files of your own. In either case you’ll need to upload these files to S3.
  • The role you use to run the Step Function must have permission to invoke your task.

Next steps

Although this was a simple example of batch processing, you can easily extend it to more realistic use cases. Your pipeline (Step Function) should have a step to gather the input file list, and then after the XML-to-JSON conversion is done, you can send those JSON files to another system for further processing.

Also, think about how you’d automate the creation of the Step Function using the CDK and CodePipeline, similar to some of our previous labs.

To use Airflow’s AWS operators, such as ECSOperator, you must do a few things:

  • Create necessary resources using AWS Console or AWS CLI.

  • Install API libraries via pip.

    Detailed information is available in the Installation documentation.

  • Set up an Airflow Connection.

Use the ECSOperator to run a task defined in AWS ECS.

In the following example, the task “hello_world” runs the hello-world task definition in the c cluster. It overrides the command in the hello-world-container container.

Before using ECSOperator, the cluster and task definition need to be created.
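
A sketch reconstructing that example is shown below; the import path varies with the Amazon provider version (newer releases rename the operator to EcsRunTaskOperator), and the subnet and security group IDs are placeholders.

```
# Sketch of the documented example: run the "hello-world" task definition in
# the "c" cluster and override the command in the "hello-world-container"
# container. Subnet and security group IDs are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import ECSOperator

with DAG(
    dag_id="ecs_fargate_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    hello_world = ECSOperator(
        task_id="hello_world",
        aws_conn_id="aws_default",
        cluster="c",
        task_definition="hello-world",
        launch_type="FARGATE",
        overrides={
            "containerOverrides": [
                {
                    "name": "hello-world-container",
                    "command": ["echo", "hello", "world"],
                },
            ],
        },
        network_configuration={
            "awsvpcConfiguration": {
                "subnets": ["<subnet-id>"],
                "securityGroups": ["<security-group-id>"],
                "assignPublicIp": "ENABLED",
            },
        },
    )
```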

For further information, look at the documentation of the run_task() method in boto3.