Getting started with GCP Dataflow for simple batch pipelines

In this article, let us see how to get started with GCP Dataflow for a simple batch pipeline, and why you might choose it when you want Google to manage most of the heavy lifting of running Apache Beam jobs. If you are coming from cron scripts, Airflow operators, or even Spark jobs, Dataflow feels different at first. You write a Beam pipeline, package it, and let Dataflow run and scale it for you. For small teams this is useful because you spend less time managing clusters and more time building the actual pipeline.

For our use case, let us assume we receive CSV files in a GCS bucket every day and we want to clean the records and write them to BigQuery. We could do this in many ways in GCP, but Dataflow is a good fit when the logic is more than just a simple load and when we want the same Beam code pattern to grow later.

When Dataflow makes sense

Before building anything, it helps to know where Dataflow fits. I usually think of it like this:

| Tool | Good for | Not ideal for |
| --- | --- | --- |
| BigQuery load jobs | Simple file loads into tables | Complex row-level transformations |
| Cloud Functions | Small event-driven processing | Large batch workloads or heavy transforms |
| Dataflow | Managed batch and streaming pipelines | Very small jobs where the setup cost is not worth it |
| Dataproc/Spark | More control over the cluster and Spark ecosystem | Teams that do not want cluster management |

If the file just needs to be copied into BigQuery as-is, Dataflow may be too much. But if you need parsing, validation, enrichment, deduplication, or multiple outputs, it starts to make more sense.

What we need before starting

For a basic demo, we need the following items in GCP:

  1. A GCS bucket for input files
  2. A BigQuery dataset and destination table
  3. A service account with permissions for Dataflow, GCS, and BigQuery
  4. Dataflow API enabled
  5. A local environment with Python and the Apache Beam SDK

You can also use Java for Beam, but for a first pipeline I feel Python is easier to get running and easier to read.

Install the Beam SDK

On the local system, create a virtual environment and install Apache Beam with GCP extras.

python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install 'apache-beam[gcp]==2.61.0'

The exact Beam version can change over time. It is better to pin it because Dataflow runner behavior can differ between versions.
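One simple way to keep the pin in one place is a requirements file, which also becomes useful later when the pipeline grows extra dependencies (the version here is just the one used in this article):

```
apache-beam[gcp]==2.61.0
```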

Create a sample input file

Let us create a small CSV file that we will place in GCS.

customer_id,customer_name,country,spend
1,Asha,AU,125.50
2,Ravi,IN,88.10
3,Mina,AU,210.00
4,Leo,SG,not_available

The last row is useful because it gives us one bad value to handle. In real projects, source files always have some surprises.
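You can see why that last value is awkward with a quick Python check: `float()` raises a `ValueError` on it, which is exactly the failure the pipeline will catch later. The sample line below is just the bad row from the file above.

```python
import csv

line = "4,Leo,SG,not_available"
row = next(csv.reader([line]))  # split the line the same way the pipeline will

try:
    spend = float(row[3])
except ValueError:
    spend = None  # a bad value; the demo pipeline will simply skip such rows

print(row[:3], spend)
```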

Upload it to GCS:

gsutil cp customers.csv gs://my-dataflow-demo-bucket/input/customers.csv

Create the BigQuery table

For the destination, let us keep the schema simple.

create schema if not exists demo_dw;

create table if not exists demo_dw.customer_spend (
  customer_id int64,
  customer_name string,
  country string,
  spend numeric,
  load_ts timestamp
);

You can create this in the BigQuery UI as well, but I prefer keeping the schema in SQL so it is easier to review and reuse.

Write the Beam pipeline

Now let us create a simple pipeline file called main.py.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime
import csv

class ParseCsvRow(beam.DoFn):
    """Parse one CSV line into a dict matching the BigQuery table schema."""

    def process(self, line):
        # Skip the header row.
        if line.startswith('customer_id'):
            return

        # csv.reader handles quoting and embedded commas correctly.
        row = next(csv.reader([line]))
        try:
            yield {
                'customer_id': int(row[0]),
                'customer_name': row[1],
                'country': row[2],
                'spend': float(row[3]),
                'load_ts': datetime.utcnow().isoformat()
            }
        except (ValueError, IndexError):
            # Drop rows that fail parsing. See the note on dead-letter
            # handling below before copying this into production.
            return

def run():
    # Runner, project, regions, and locations all come from command-line flags.
    options = PipelineOptions()

    with beam.Pipeline(options=options) as p:
        (
            p
            | 'Read CSV' >> beam.io.ReadFromText('gs://my-dataflow-demo-bucket/input/customers.csv')
            | 'Parse Rows' >> beam.ParDo(ParseCsvRow())
            | 'Write To BigQuery' >> beam.io.WriteToBigQuery(
                'my-project:demo_dw.customer_spend',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
            )
        )

if __name__ == '__main__':
    run()

This is intentionally small. We read the file, skip the header, parse rows, ignore bad rows, and append the output to BigQuery. For a demo, this is enough to understand the flow.

One thing to notice is that silently dropping bad rows is usually not a good production design. In production, I would send failed rows to a dead-letter location in GCS or another BigQuery table so they can be reviewed later.
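As a sketch of that idea (the helper name here is hypothetical), the parsing step can be written as a plain function that returns either a parsed record or a reject reason. In a Beam pipeline, the same logic would live in a DoFn that emits bad rows via `beam.pvalue.TaggedOutput` and is split with `ParDo.with_outputs`, so the reject output can be written to its own sink.

```python
import csv

def parse_line(line):
    """Return ('ok', record) or ('reject', reason) for one CSV data line.

    Hypothetical helper: in Beam, a DoFn would yield good rows on the main
    output and bad rows as beam.pvalue.TaggedOutput('reject', ...), then the
    pipeline would split them with ParDo.with_outputs('reject').
    """
    row = next(csv.reader([line]))
    if len(row) != 4:
        return ('reject', f'expected 4 fields, got {len(row)}')
    try:
        return ('ok', {
            'customer_id': int(row[0]),
            'customer_name': row[1],
            'country': row[2],
            'spend': float(row[3]),
        })
    except ValueError as exc:
        return ('reject', str(exc))

good = parse_line("2,Ravi,IN,88.10")
bad = parse_line("4,Leo,SG,not_available")
```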

Run the job on Dataflow

We can now submit the pipeline to Dataflow using the Dataflow runner.

python main.py \
  --runner DataflowRunner \
  --project my-project \
  --region australia-southeast1 \
  --temp_location gs://my-dataflow-demo-bucket/temp \
  --staging_location gs://my-dataflow-demo-bucket/staging \
  --job_name customer-spend-batch-job \
  --save_main_session

Once the command starts, Dataflow creates the managed resources needed for the job. From the GCP console you can monitor the job graph, worker logs, and execution details. For beginner teams, this is one of the nicer parts of Dataflow. You do not have to build your own execution UI.

What to check after the job runs

After completion, run a quick query in BigQuery.

select customer_id, customer_name, country, spend
from demo_dw.customer_spend
order by customer_id;

You should see the valid rows loaded and the bad not_available row excluded.

This is also a good place to verify the worker logs in Cloud Logging. If the data volume is larger, logs become important because many pipeline issues only show up clearly there.

Practical things to be careful about

From my experience, a few things trip people up when they first use Dataflow:

  1. Permissions: missing IAM roles are very common. The Dataflow worker service account needs access to read from GCS and write to BigQuery.
  2. Region mismatch: keep your Dataflow job, GCS bucket, and BigQuery dataset in the same region where possible. Cross-region setups can create latency and cost surprises.
  3. Dependency packaging: once the pipeline grows, external Python dependencies need to be packaged properly.
  4. Bad record handling: demos skip this, but production pipelines need a clear reject strategy.
  5. Cost awareness: Dataflow is managed, not free. For very tiny daily jobs, a simpler tool may cost less and be easier to maintain.

What I would change in production

For a real production pipeline, I would make a few changes immediately:

  • Parameterize the input path instead of hardcoding one file
  • Write invalid rows to a separate dead-letter sink
  • Add unit tests for the parsing logic
  • Use Terraform or another IaC tool for buckets, service accounts, and datasets
  • Add CI/CD so deployments are repeatable
  • Add monitoring and alerting for failed jobs
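For the first of these, Beam's `PipelineOptions` supports custom arguments, so the input path can arrive as a flag instead of a hardcoded string. A minimal sketch with plain `argparse` shows the shape; in the real pipeline you would subclass `PipelineOptions` and add the same argument in `_add_argparse_args` so it is parsed alongside the runner flags. The flag name and paths here are hypothetical.

```python
import argparse

# Hypothetical flag; with Beam you would declare it in a PipelineOptions
# subclass via _add_argparse_args so it coexists with --runner, --project, etc.
parser = argparse.ArgumentParser()
parser.add_argument('--input_path',
                    default='gs://my-dataflow-demo-bucket/input/*.csv')

# parse_known_args ignores the runner flags, just as Beam's option parsing does.
args, _ = parser.parse_known_args(
    ['--input_path', 'gs://my-dataflow-demo-bucket/input/2024-06-01/*.csv'])
print(args.input_path)
```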

I would also think carefully about whether batch is still the right mode. If files arrive every few minutes, a streaming design or event-based ingestion pattern might be better than running many small batch jobs.

Conclusion

Dataflow is a good starting point when you need more than a basic file load but do not want to manage your own processing cluster. In this article we saw a small batch example that reads a CSV from GCS, transforms the rows, and writes them to BigQuery. The demo is simple, but the same Beam pattern can be expanded for validation, enrichment, joins, and more reliable production handling once the pipeline grows.

This post is licensed under CC BY 4.0 by the author.