
Getting Started with GCP Dataflow for Batch Pipelines: A Practical Guide

When I first started working with GCP, I kept hearing about Dataflow but wasn’t quite sure where it fit. We had Cloud Functions for light event-driven work, Composer (managed Airflow) for orchestration, and BigQuery for SQL analytics. Why reach for Dataflow?

In this article, I will walk through what Dataflow is, when you would use it for batch pipelines, and how to get a simple pipeline running. By the end, you should have a working pipeline that reads from a GCS file, does a basic transformation, and writes the output somewhere useful.

What is Dataflow and when should you use it?

Dataflow is GCP’s managed service for running Apache Beam pipelines. Apache Beam is an open-source programming model that lets you define both batch and streaming data processing jobs using a unified API. The nice part is that Beam abstracts away the execution details — you write your pipeline logic once, and Dataflow handles the scaling, worker provisioning, and fault tolerance.

So when would you pick Dataflow over, say, a Cloud Function or a BigQuery query?

| Approach | Good for | Not great for |
|---|---|---|
| Cloud Functions | Light transformations, event-driven triggers | Large datasets, long-running jobs (event-driven functions time out after 9 minutes) |
| BigQuery SQL | Aggregations, joins over structured data | Complex per-row logic, custom I/O formats |
| Dataflow (batch) | Heavy ETL, custom transformations, large file processing | Quick one-off queries, very small datasets |

If your batch job needs to process gigabytes of data, do non-trivial row-level transformations, or work with file formats that BigQuery doesn’t natively handle well, Dataflow is a solid choice. If you can do it in a BigQuery query, do it there — it will be simpler and cheaper.

Setting up your first batch pipeline

Let us build a pipeline that reads a CSV file from GCS, cleans up the rows, and writes the result as Parquet files back to GCS. This is a common pattern — taking raw ingested data and landing it in a more queryable format.

Prerequisites

  1. A GCP project with billing enabled
  2. Dataflow API enabled
  3. A GCS bucket for input, output, and temporary files
  4. Python 3.9+ on your local machine

Enable the APIs:

gcloud services enable dataflow.googleapis.com
gcloud services enable compute.googleapis.com
gcloud services enable cloudresourcemanager.googleapis.com

Create a bucket if you don’t have one, ideally in the region you will run the job in:

gsutil mb -l us-central1 gs://your-bucket-name/

Install Apache Beam

pip install 'apache-beam[gcp]'

The pipeline code

Create a file called batch_pipeline.py:

import argparse

import apache_beam as beam
import pyarrow as pa
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Parquet schema for the output files; WriteToParquet expects a pyarrow.Schema.
OUTPUT_SCHEMA = pa.schema([
    ('user_id', pa.string()),
    ('event_type', pa.string()),
    ('amount', pa.float64()),
    ('timestamp', pa.string()),
])

def parse_csv_line(line):
    """Parse a CSV line into a dict, or return None if it is malformed."""
    fields = [field.strip() for field in line.split(',')]
    if len(fields) < 4:
        return None
    try:
        amount = float(fields[2])
    except ValueError:
        # Non-numeric amount: treat the row as invalid instead of failing the job.
        return None
    return {
        'user_id': fields[0],
        'event_type': fields[1],
        'amount': amount,
        'timestamp': fields[3],
    }

def filter_valid_records(record):
    """Keep only records with a positive amount, an event type, and a timestamp."""
    if record is None:
        return False
    if record['amount'] <= 0:
        return False
    if not record['event_type'] or not record['timestamp']:
        return False
    return True

def format_for_output(record):
    """Select the fields written to Parquet, in schema order."""
    return {
        'user_id': record['user_id'],
        'event_type': record['event_type'],
        'amount': record['amount'],
        'timestamp': record['timestamp'],
    }

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True, help='GCS path to input CSV')
    parser.add_argument('--output', required=True,
                        help='GCS path prefix for output Parquet files')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Remaining flags (--runner, --project, --region, ...) are passed to Beam.
    options = PipelineOptions(pipeline_args)
    # Ship the main module's global state (e.g. imports) to Dataflow workers.
    options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=options) as p:
        (
            p
            # skip_header_lines=1 drops the CSV header so float() never sees 'amount'.
            | 'Read CSV' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
            | 'Parse lines' >> beam.Map(parse_csv_line)
            | 'Filter invalid' >> beam.Filter(filter_valid_records)
            | 'Format output' >> beam.Map(format_for_output)
            | 'Write Parquet' >> beam.io.WriteToParquet(
                known_args.output,
                schema=OUTPUT_SCHEMA,
                file_name_suffix='.parquet',
            )
        )

if __name__ == '__main__':
    run()

This pipeline does four things: reads from a GCS text file, parses each line into a structured record, filters out invalid data, and writes the result as Parquet.

Creating some test data

Save the following as sample_data.csv so you can try things out before running on Dataflow:

user_id,event_type,amount,timestamp
u1,purchase,29.99,2025-01-15T10:30:00Z
u2,view,-1.00,2025-01-15T11:00:00Z
u3,purchase,45.00,2025-01-15T12:00:00Z
u4,,15.50,
u5,purchase,0.00,2025-01-15T14:00:00Z

Note the bad rows: u2 has a negative amount, u4 is missing an event type and timestamp, and u5 has a zero amount. Our filter should catch all of these, leaving only u1 and u3 in the output.
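The parsing and filtering steps are plain Python functions, so you can check that claim without Beam at all. The sketch below uses simplified, self-contained copies of parse_csv_line and filter_valid_records. They are written for this check and apply these rules: at least four fields, a numeric positive amount, and a non-empty event type and timestamp. The check also skips the header row, the way ReadFromText does when given skip_header_lines=1.

```python
SAMPLE_CSV = """user_id,event_type,amount,timestamp
u1,purchase,29.99,2025-01-15T10:30:00Z
u2,view,-1.00,2025-01-15T11:00:00Z
u3,purchase,45.00,2025-01-15T12:00:00Z
u4,,15.50,
u5,purchase,0.00,2025-01-15T14:00:00Z
"""

def parse_csv_line(line):
    """Simplified copy of the parser: returns a dict, or None for malformed rows."""
    fields = [f.strip() for f in line.split(',')]
    if len(fields) < 4:
        return None
    try:
        amount = float(fields[2])
    except ValueError:
        return None  # non-numeric amount
    return {'user_id': fields[0], 'event_type': fields[1],
            'amount': amount, 'timestamp': fields[3]}

def filter_valid_records(record):
    """Keep records with a positive amount and a non-empty event type and timestamp."""
    return (record is not None
            and record['amount'] > 0
            and bool(record['event_type'])
            and bool(record['timestamp']))

# Skip the header row, then parse and filter the remaining lines.
lines = SAMPLE_CSV.strip().splitlines()[1:]
kept = [r for r in map(parse_csv_line, lines) if filter_valid_records(r)]
print([r['user_id'] for r in kept])  # prints ['u1', 'u3']
```

Keeping row-level logic in small, Beam-free functions like these makes them easy to unit-test with plain pytest.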

Upload it:

gsutil cp sample_data.csv gs://your-bucket-name/input/

Running locally first

Before deploying to Dataflow, running locally with DirectRunner helps catch issues quickly:

python batch_pipeline.py \
  --input gs://your-bucket-name/input/sample_data.csv \
  --output gs://your-bucket-name/output/parquet

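If you do not pass --runner, Beam defaults to the DirectRunner, which runs the entire pipeline on your local machine: great for testing, not so great for 50 GB files. Because the input and output here are gs:// paths, the local run also needs Google Cloud credentials, for example from gcloud auth application-default login.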

Running on Dataflow

To run on Dataflow, you need to specify the runner and your GCP project:

python batch_pipeline.py \
  --input gs://your-bucket-name/input/sample_data.csv \
  --output gs://your-bucket-name/output/parquet \
  --runner DataflowRunner \
  --project your-gcp-project-id \
  --region us-central1 \
  --temp_location gs://your-bucket-name/tmp \
  --staging_location gs://your-bucket-name/staging \
  --job_name simple-batch-pipeline-001

A few things to note about those flags:

  • temp_location: Dataflow uses this for temporary files during execution. Keep it in the same region as your data to avoid cross-region charges.
  • staging_location: Where your pipeline code and dependencies get staged. Also best in the same region.
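  • job_name: Only one active job with a given name can run in a project and region at a time. The name may contain only lowercase letters, digits, and hyphens, and must start with a letter. Include a timestamp or version number if you will run this repeatedly.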
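One way to get a fresh, valid job name on every run is to append a UTC timestamp. The helper below is a hypothetical sketch, not part of Beam or Dataflow. It normalizes a base name to lowercase letters, digits, and hyphens, makes sure the result starts with a letter, and checks it against the pattern [a-z]([-a-z0-9]*[a-z0-9])?, which encodes those naming rules.

```python
import re
from datetime import datetime, timezone

# Dataflow job-name rules: lowercase letters, digits and hyphens,
# starting with a letter and ending with a letter or digit.
JOB_NAME_PATTERN = re.compile(r'[a-z]([-a-z0-9]*[a-z0-9])?')

def make_job_name(base, now=None):
    """Hypothetical helper: return '<base>-YYYYMMDD-HHMMSS' (UTC) as a valid job name."""
    now = now or datetime.now(timezone.utc)
    # Lowercase, then collapse any run of disallowed characters into one hyphen.
    slug = re.sub(r'[^a-z0-9-]+', '-', base.lower()).strip('-')
    if not slug or not slug[0].isalpha():
        slug = 'job-' + slug  # names must start with a letter
    name = f"{slug}-{now:%Y%m%d-%H%M%S}"
    if not JOB_NAME_PATTERN.fullmatch(name):
        raise ValueError(f'invalid job name: {name!r}')
    return name

print(make_job_name('Simple_Batch Pipeline',
                    datetime(2025, 1, 15, 10, 30, tzinfo=timezone.utc)))
# prints simple-batch-pipeline-20250115-103000
```

Pass the result to --job_name. Because the timestamp changes on every call, repeated runs do not collide with a job that is still active.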

Once submitted, go to the Dataflow section in the GCP Console. You should see your job listed, and its job graph shows each step of the pipeline and how many elements passed through it.

Things I learned the hard way

Worker machine type matters more than you think. The default for batch jobs, n1-standard-1, is fine for light work. If you are doing CPU-heavy transformations (decompression, complex parsing, encryption), moving to n1-standard-4 can cut your job time significantly, and an n1-highmem type helps when workers run short of memory. Experiment with the --worker_machine_type flag.

Shuffle is expensive. If you use GroupByKey or CoGroupByKey, Dataflow needs to shuffle data between workers. This is where costs add up. Try to avoid grouping unless you really need it. For many batch use cases, you can do everything with Map and Filter.

Streaming vs. batch — the line is blurry but the cost is not. Batch jobs spin up workers, do the work, and shut down. Streaming jobs keep workers running indefinitely. If you run a streaming job with only a few hundred events per hour, you are paying for idle resources. Make sure streaming is actually what you need.

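Watch the temporary bucket size. Dataflow does not remove everything it writes to the temp and staging locations; staged job files in particular stay behind after each run. If you run many jobs, those paths can accumulate gigabytes of old files. Set a lifecycle rule that deletes objects older than a few days. If temp files share a bucket with your input and output, as they do in this walkthrough, limit the rule to the tmp/ and staging/ prefixes.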
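Lifecycle rules apply to a whole bucket unless you narrow them. This walkthrough keeps input, output, and temp files in one bucket, so the sketch below scopes the Delete rule with a matchesPrefix condition. It prints a lifecycle config in the JSON format used by gsutil lifecycle set. The tmp/ and staging/ prefixes match the paths in the Dataflow command above; the three-day age is an arbitrary choice.

```python
import json

def temp_cleanup_rule(prefixes, max_age_days=3):
    """Lifecycle config deleting objects under `prefixes` older than max_age_days."""
    return {
        'rule': [
            {
                'action': {'type': 'Delete'},
                # age counts days since object creation; matchesPrefix restricts
                # the rule to object names that start with one of the prefixes.
                'condition': {'age': max_age_days, 'matchesPrefix': list(prefixes)},
            }
        ]
    }

config = temp_cleanup_rule(['tmp/', 'staging/'])
print(json.dumps(config, indent=2))
```

To apply it, redirect the output to a file, for example python make_lifecycle.py > lifecycle.json (the script name is hypothetical). Then run gsutil lifecycle set lifecycle.json gs://your-bucket-name.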

Production considerations

Here is what changes when moving a pipeline like this from demo to production:

  1. Templating: Instead of running the script directly each time, use Dataflow templates. This lets you trigger jobs via the API, Cloud Scheduler, or Cloud Composer without needing the source code on hand.

  2. Error handling: Our simple pipeline drops invalid rows silently. In production, you want a dead-letter queue: write bad records to a separate GCS path or Pub/Sub topic so you can inspect and recover them.

  3. Monitoring: Set up Cloud Monitoring alerts for pipeline failures and data freshness (e.g., if no output appears in the expected window).

  4. Idempotency: Make sure your pipeline can be re-run on the same data without producing duplicates. This usually means overwriting the output (for example, with BigQueryDisposition.WRITE_TRUNCATE when the sink is BigQuery) or writing each run's output under a path partitioned by run date.

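  5. VPC and private IPs: For anything touching internal systems, run workers on private IPs only (--no_use_public_ips). The workers' subnetwork (set with --subnetwork) then needs Private Google Access enabled so the workers can still reach Google APIs such as Cloud Storage.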

  6. IAM: Give your Dataflow worker service account the minimum permissions it needs — read access to the input bucket, write access to the output bucket, and nothing more.

Wrapping up

Dataflow is a powerful tool in the GCP ecosystem, and once you get past the initial setup, it becomes a reliable workhorse for batch ETL. The Beam programming model takes a little getting used to, especially if you are coming from pure SQL or procedural scripting, but the separation of logic from execution details is genuinely useful when you later decide to move a pipeline from batch to streaming.

Start with DirectRunner locally, move to DataflowRunner when you are confident your logic is correct, and keep an eye on worker specs and temp storage. The rest is just iteration.

This post is licensed under CC BY 4.0 by the author.