Using custom containers with Dataflow flex templates

Travis Webb
4 min readOct 9, 2022

--

It’s easier than it looks

Dataflow custom containers are awesome. As an example, custom containers enables geobeam to pre-install GDAL, PROJ, and other system dependencies onto dataflow workers to process spatial data at scale. Dataflow templates allow you to templatize a pipeline so that you can easily kick off a job from a Cloud Function, or take advantage of the form UI that is generated for a template.

Product documentation has to cover everything, so there isn’t always a well-paved happy path for every use case. This article describes how to use custom containers with Dataflow templates.

Custom containers are useful if your workers need a bunch of extra OS libraries or other dependencies. If you need OS dependencies, custom containers are your best option. Just create your Dockerfile using one of the Apache Beam base images, such as apache/beam_python3.8_sdk:2.41.0. (geobeam’s big ugly Dockerfile is available as an example). If you have a ton of language dependencies, custom containers can also reduce worker startup time and make your pipeline run faster.

First, get your custom container job working by itself, without using a template. The command to start a job might look something like this:

python nfhl_pipeline.py 
--runner DataflowRunner
--sdk_container_image gcr.io/dataflow-geobeam/base
--project <project>
--temp_location gs://<somewhere>
--service_account_email <sa>@<project>.iam.gserviceaccount.com
--region us-central1
--dataset nfhl_staging
--worker_machine_type c2-standard-4
--max_num_workers 12
--gcs_url gs://geo-demos/ark-demo/sources/nfhl/NFHL_12_20220411.zip

This pipeline imports flood zone and risk data from the National Flood Hazard Layer (NFHL), reprojects the geometries, casts some field types, and inserts the resulting records into BigQuery for analysis.

But let’s say you only want to pass in gcs_url and have everything else built-in. You might want to only allow a pipeline to use a fixed number workers, or a particular container image, no matter who’s starting the job. Because the container image is coupled tightly to the pipeline code, it can make sense to hardcode that value in the pipeline itself.

You can configure these values directly in your pipeline code by using the PipelineOptions object:

pipeline_options = PipelineOptions(
pipeline_args,
experiments=[‘use_runner_v2’],
temp_location=’gs://<somewhere>’,
sdk_container_image=’gcr.io/dataflow-geobeam/base’,
project=’<project>’,
region=’us-central1',
worker_machine_type=’c2-standard-4',
max_num_workers=12,
dataset='nfhl_staging'
)
with beam.Pipeline(options=pipeline_options) as p:
# ... etc

Now, even running the job normally, you don’t need to set as many command-line flags:

python nfhl_pipeline.py 
--runner DataflowRunner
--service_account_email <sa>@<project>.iam.gserviceaccount.com
--gcs_url gs://geo-demos/ark-demo/sources/nfhl/NFHL_12_20220411.zip

This can be beneficial by making the pipeline more accessible and easier to use by others, particularly if you’re using a custom container that works best on a particular machine size. Some GDAL operations can be memory and CPU-intensive which is why I want to default to a c2 family machine.

Now, templatize

Dataflow flex templates consist of a “launcher” container, and a metadata file that specifies the parameters the pipeline should expect. (If you’re used to firing up Dataflow jobs directly for your laptop or Cloud Shell, then in that scenario your laptop or Cloud Shell is the “launcher”)

The launcher container image contains only your pipeline code (nfhl_pipeline.py in this case) and any Python dependencies that your main pipeline code requires (e.g. requirements.txt for a Python pipeline, or pom.xml for a Java pipeline). The launcher container doesn’t do any work for your job — it only generates the execution graph that Dataflow uses to farm out work to the workers.

In this case, my custom container is running Python 3.8, so I use the Python 3.8 version of the launcher template base image: python38-template-launcher-base.

FROM gcr.io/dataflow-templates-base/python38-template-launcher-baseARG DIR=/dataflow/template
RUN mkdir -p ${DIR}
WORKDIR ${DIR}
COPY requirements.txt .
COPY nfhl_pipeline.py .
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=”${DIR}/requirements.txt”
ENV FLEX_TEMPLATE_PYTHON_PY_FILE=”${DIR}/nfhl_pipeline.py”
RUN pip install --upgrade pip
RUN pip install apache-beam[gcp]
RUN pip install -U -r ./requirements.txt

Now build the launcher container, and docker push it to Google Container Registry, e.g.

docker build -t gcr.io/<project>/nfhl_template_launcher
docker push gcr.io/<project>/nfhl_template_launcher

The metadata file is a json file that identifies the template and specifies its parameters. For the example pipeline above, the metadata.json file is straightforward:

{
“name”: “nfhl-template”,
“description”: “load all nfhl layers into bigquery”,
“parameters”: [
{
“name”: “gcs_url”,
“label”: “gcs path to nfhl state geodatabase”,
“helpText”: “gcs path to nfhl state geodatabase”
}
]
}

Now that you have the launcher container and the metadata file, we can deploy the new template using the gcloud CLI:

gcloud dataflow flex-template build gs://<bucket>/nfhl-template.json
--image gcr.io/geo-solution-demos/nfhl_pipeline_template
--sdk-language PYTHON
--metadata-file metadata.json

This creates a template using the specified metadata file and container image, and upload the metadata file to the specified GCS location. Now, it’s ready to run, either from the command-line:

gcloud dataflow flex-template run “nfhl-import”
--template-file-gcs-location gs://<bucket>/nfhl-template.json
--parameters “gcs_url=gs://geo-demos/ark-demo/sources/nfhl/NFHL_45_20220329.zip”

or programmatically, from a Cloud Function or any other application:

def run(data, context):
from googleapiclient.discovery import build
project = ‘<project>’
job = ‘nfhl-load’
template = ‘gs://geo-demos/ark-demo/templates/nfhl-template.json’
inputFile = ‘gs://’+str(data[‘bucket’]) + ‘/’ + str(data[‘name’])
parameters = {
‘gcs_url’: inputFile,
}
environment = {‘temp_location’: ‘gs://gsd-pipeline-temp’}
service = build(‘dataflow’, ‘v1b3’, cache_discovery=False) request = service.projects().locations().flexTemplates().launch(
projectId=project,
location=’us-central1',
body={
‘launchParameter’: {
‘jobName’: job,
‘parameters’: parameters,
‘environment’: environment,
‘containerSpecGcsPath’: template
}
}
)
response = request.execute()

Now you have a working Dataflow flex template using a custom container that can be launched from Cloud Functions or any other Python program. You can deploy a cloud function that runs when a new file is uploaded using this command:

gcloud functions deploy nfhl-pipeline-launcher
--region us-central1
--runtime python38
--entry-point run
--trigger-bucket nfhl-uploads
--source .

--

--