Deploying Prefect Server with AWS ECS and Docker Storage

How to orchestrate and automate workflows with Prefect running on ECS Fargate with a private Docker registry

Emma Rizzi
Towards Data Science

--

This article is a step by step guide of how I deployed a Prefect Server with a Fargate ECS Cluster using Docker Storage on a private registry. A few articles covered deployment with Prefect Cloud and different combinations of runners and storage. As I ran through a few issues when deploying this specific architecture, I decided to detail the steps in this article.

Photo by Joshua Aragon on Unsplash

By the end of this tutorial, we will have :

  • Prefect Server running on EC2
  • a ECSAgent running on EC2
  • packaged our flows in docker images in a private registry
  • run a flow in an ECS cluster through the UI and GraphQL API

You can choose to use different Agents and Storage options, this article is only a base guideline to the many options of Prefect. Although I will not detail Prefect and AWS concepts, you will find references to relevant documentation to help you.

Prerequisites :

  • AWS Account
  • A Docker registry
  • Basic Python skills

If it is your first experience with Prefect, I recommend trying to deploy it locally first to grasp Prefect concepts easier.

Deploying Prefect Server

The first step is to deploy Prefect Server on an EC2 instance. Create and launch an EC2 instance. I used a g4dn.2xlarge instance as I deployed multiple services on it, but a small one can be enough if you plan to use only Prefect on it. You will need to update the security rules to allow TCP connections on ports 8080 to access the UI, and 4200 for the backend.

You may need to install docker-compose if it’s not already on your instance.

Next, SSH on your instance and install prefect and run :

pip install “prefect[aws]”

See other options of prefect installation here.

prefect backend server

Create the file ~/.prefect/config.toml containing the following information :

[server]
[server.ui]
apollo_url = “http://X.X.X.X:4200/graphql"

Replace X.X.X.X with your instance public adress as it allows to connect the UI to the backend.

You can now start Prefect with :

prefect server start
Prefect Server started

From now, you can connect to the UI from any machine (authorized by your security rules) by going on http://X.X.X.X:8080

Deploy AWS Agent

The agents are responsible for starting and monitoring your runs. Here, we deploy it on the same EC2 instance as Prefect Server. You can choose to deploy it on a second instance by providing the Prefect Server’s instance IP address in the config.toml file or throught the command line.

In both cases, you need to add :

[cloud]
api = “http://X.X.X.X:4200"

to the config.toml file, with X.X.X.X the public IP adress to the Prefect Host.

This variable is needed for AWS tasks to connect to the backend (even if we run Prefect Server and not Cloud). You can follow this issue for more information on this endpoint configuration.

If you don’t have an ECS fargate cluster already in place, you should create it now. There is no particular configuration needed to work with Prefect, so choose accordingly to your tasks’ needs. Keep the cluster ARN close as you will need it soon.

Then, create an AWS user with the permission AmazonECS_FullAccess, this is needed to give the agent permissions to start tasks in your ECS cluster on your behalf. Store the access key id and secret access key in the config file on the instance that will host your agent :

Create a file ~/.aws/config containing :

[default]
aws_access_key_id=SECRET_KEY_ID
aws_secret_access_key=SECRET_ACCESS_KEY
region=REGION

Now, deploy the agent :

prefect agent ecs start -cluster arn:your-cluster-arn -label your-label
Prefect Agent started

The cluster option allows you to specify which cluster will be used to run your tasks. The label option lets you provide labels for your agent. Note that agents can pick up flows only if their labels match exactly, this is useful to control which agent will run your flows. Learn more about agent configurations here.

Check on the UI, you can see your Agent is running. Now Prefect is ready to run your flows!

Agent Registered on Prefect UI

Create a flow

Let’s create a flow. Here is a example Hello world job :

import prefect
from prefect.run_configs import ECSRun
from prefect.storage import Docker
from prefect import task, Flow, Parameter
@task
def hello_world(name):
logger = prefect.context.get(“logger”)
logger.info(“Hello “+name)
with Flow(“Hello”) as flow:
name = Parameter(‘name’, default = ‘Bob’)
hello_world(name)
flow.storage = Docker(
registry_url=’your-registry-url’,
image_name=”hello_container”,
base_image=”python:3.8"
)
flow.run_config = ECSRun(
task_definition={
‘containerDefinitions’:[
{
‘name’: ‘flow’,
‘image’: ‘your-registry-url/hello_container:latest’,
‘repositoryCredentials’: {
‘credentialsParameter’: ‘arn:aws:secretsmanager:your-secret-arn’
}
}
],
‘networkMode’ : ‘awsvpc’,
‘memory’:’2 GB’,
‘cpu’:’1024'
},
execution_role_arn=’arn:aws:iam::your-arn-role’,
labels=[‘your-label’]
)
flow.register(project_name=”Hello_Project”)

This script defines a flow that executes one task and takes one input Parameter ‘name’.

flow.storage defines the storage strategy. Docker Storage is useful when you want to control which files to include in your flow, as other storage options only store the flow file. You can use a public repository, in which case you only need to specify the registry address. Here I detail the configuration needed for a private registry :

To allow Prefect to push the image, SSH on the Agent’s instance and run

docker login <registry_address>

and follow the authentication steps. Then, we need to allow the ECS cluster to pull the images: Create a secret on AWS to store your registry login, this ARN is given in the credentialsParameter argument.

Now, go to the role management section and create a new role with read permission on this secret, the ARN of this role is then given in execution_role_arn.

ECS Agents allow 2 types of role : execution and task role. Depending on your flows, you might need more roles. For informations about the roles, see prefect’s documentation.

The task_definition parameter is needed here as the only way to provide credentials for the private registry. ‘networkMode’, ‘memory’ and ‘cpu’ fields are required, read this documentation for more information

If you use a public registry, you can simply provide the registry and image name.

I experienced timeouts when pushing the image to the registry as my image size was quite big (2GB in my case due to machine learning dependencies). A quick workaround was to push manually the first image so the common layers with big dependencies are already available on my registry, next pushes can reuse them.

Finally, add labels accordingly to your Agent’s labels, and your flow is ready to be registered.

To perform Prefect commands on your local machine, you can install Prefect and create the ~/.prefect/config.toml such as the file on your EC2 instance. Every Prefect command will now connect to your Prefect Server instance.

You can run :

prefect create project ‘Hello_Project’

or create the project throught the UI. Then, execute your python script :

python flow.py

You’ll see the image being built and pushed to your registry.

Registering a flow

Run the flow

You can then start your flow directly from the UI, the Quick Run allow you to run the flow with default parameters.

Overview of a task

You can trigger it through the command line :

prefect run flow -name “Hello” -project “Hello_Project”

We can also launch the flow through the GraphQL API. This method allows you to trigger a flow from anywhere (allowed by your security rules on your Prefect Server EC2 instance!) with just an HTTP request. Here is a sample script using the flow ID :

import json
import urllib.parse
import urllib.request
PREFECT__FLOW_ID = 'your-flow-id'
PREFECT__CLOUD__API='http://X.X.X.X:4200/graphql'
def trigger_flow(flow_id, host):
create_mutation = """
mutation {
create_flow_run(input: { flow_id: "%s" }) {
id
}
}
""" % (flow_id)
inputs = dict(flowId=flow_id)
variables = dict(input=inputs)
data = json.dumps(
dict(query=create_mutation, variables=json.dumps(variables))
).encode("utf-8")
req = urllib.request.Request(host, data=data)
req.add_header("Content-Type", "application/json")
resp = urllib.request.urlopen(req) return json.loads(resp.read().decode())if __name__ == "__main__":
print(trigger_flow(PREFECT__FLOW_ID, PREFECT__CLOUD__API))

More options here. Here is the resulting logs after running this script :

Flow logs

Conclusion

We detailed in this article how to deploy a Prefect Server on EC2 and run tasks on an ECS cluster using Docker storage with a private registry. Now you’re ready to create your own tasks!

Ressources :

--

--