
Using AWS Step Functions to Orchestrate a Data Pipeline

In this article, let us see how to use AWS Step Functions to orchestrate a simple data pipeline, and why this approach is useful when you want something more structured than a set of Lambda triggers. Many teams start with a few Lambda functions and some EventBridge schedules, but over time the flow becomes harder to understand, retry, and monitor. Step Functions gives us a way to make the workflow visible and manageable.

For our use case, let us assume we receive a CSV file into S3 every day. Once the file arrives, we want to validate it, load the raw data into a staging table, transform it, and finally send a notification if the load succeeds or fails. This is not a huge pipeline, but it is enough to show where Step Functions fits well.

Why use Step Functions

When the pipeline has multiple steps and each step can fail in a different way, having the workflow represented as a state machine becomes useful. Instead of burying the control flow inside one large Lambda function, we can keep each step separate and let Step Functions handle sequencing, retries, and branching.

A simple comparison looks like this:

| Approach | Good for | Limitation |
| --- | --- | --- |
| Lambda only | Very small flows with 1 or 2 steps | Logic becomes messy when retries and branching increase |
| Step Functions | Event-driven pipelines with clear stages | Long-running heavy compute still needs other services |
| Airflow | Complex DAGs, scheduling, dependencies across many jobs | More infrastructure and operational effort |

From my experience, Step Functions is a good middle ground when the workflow is not large enough to justify a full Airflow setup, but still needs better control than chaining Lambdas together.

Pipeline architecture

The simple flow we will build is below:

  1. A file lands in S3
  2. EventBridge or S3 notification starts the state machine
  3. A Lambda validates the file structure
  4. A Lambda loads the data into a raw table
  5. A Lambda runs a transformation step
  6. A Choice state checks if row counts are acceptable
  7. SNS sends a success or failure notification

You could also replace some Lambda tasks with Glue jobs, ECS tasks, or AWS Batch if the processing is heavier. The nice thing is that Step Functions does not force every step to be Lambda.

Create the state machine

A Step Functions workflow is defined using Amazon States Language, which is JSON based. Below is a simplified definition:

{
  "Comment": "Daily CSV pipeline",
  "StartAt": "ValidateFile",
  "States": {
    "ValidateFile": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-southeast-2:123456789012:function:validate-file",
      "Retry": [
        {
          "ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Next": "LoadRawData"
    },
    "LoadRawData": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-southeast-2:123456789012:function:load-raw-data",
      "Next": "TransformData"
    },
    "TransformData": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-southeast-2:123456789012:function:transform-data",
      "Next": "CheckQuality"
    },
    "CheckQuality": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.bad_records",
          "NumericEquals": 0,
          "Next": "NotifySuccess"
        }
      ],
      "Default": "NotifyFailure"
    },
    "NotifySuccess": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:ap-southeast-2:123456789012:data-pipeline-status",
        "Message": "Pipeline completed successfully"
      },
      "End": true
    },
    "NotifyFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:ap-southeast-2:123456789012:data-pipeline-status",
        "Message": "Pipeline failed quality checks"
      },
      "End": true
    }
  }
}

Even in this small example, the benefits are already visible. We can see the steps clearly, define retry behavior on only the states that need it, and branch based on output without writing a lot of control logic ourselves.
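One gap in the definition above is that only a few retriable Lambda errors are handled; any other task failure ends the execution without a notification. A Catch block on the task states can route unexpected errors to the failure notification. For example, the transform step could be written as:

```json
"TransformData": {
  "Type": "Task",
  "Resource": "arn:aws:lambda:ap-southeast-2:123456789012:function:transform-data",
  "Catch": [
    {
      "ErrorEquals": ["States.ALL"],
      "ResultPath": "$.error",
      "Next": "NotifyFailure"
    }
  ],
  "Next": "CheckQuality"
}
```

The `ResultPath` keeps the error details alongside the original input, so the failure notification step can include them in the message if needed.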

Example Lambda for validation

The first Lambda could validate whether the file exists, whether the header matches the expected columns, and whether the file is not empty.

import csv
import boto3

s3 = boto3.client("s3")

def handler(event, context):
    bucket = event["bucket"]
    key = event["key"]

    obj = s3.get_object(Bucket=bucket, Key=key)
    rows = obj["Body"].read().decode("utf-8").splitlines()

    # An empty file should fail validation before we even look at the header
    if not rows:
        raise ValueError(f"File is empty: s3://{bucket}/{key}")

    reader = csv.reader(rows)
    header = next(reader)

    expected = ["customer_id", "order_id", "amount", "order_ts"]
    if header != expected:
        raise ValueError(f"Invalid header: {header}")

    return {
        "bucket": bucket,
        "key": key,
        "header_valid": True
    }

In a simple demo, this is enough. In a production use case, I would also validate file size, file naming convention, duplicate delivery, and maybe whether the file was already processed earlier.
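As one example of those extra checks, a naming-convention validation can be a small pure function. The pattern below is a hypothetical convention (daily files named `orders_YYYYMMDD.csv`), so adjust it to whatever your upstream system actually delivers:

```python
import re

# Hypothetical naming convention: orders_YYYYMMDD.csv
FILENAME_PATTERN = re.compile(r"^orders_(\d{8})\.csv$")

def is_valid_filename(key: str) -> bool:
    """Check that the S3 key's basename matches the expected daily pattern."""
    basename = key.rsplit("/", 1)[-1]
    return FILENAME_PATTERN.match(basename) is not None
```

Keeping checks like this as pure functions also makes them easy to unit test without touching S3.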

Loading and transforming the data

The next step could load the CSV into a raw table in RDS, Redshift, or even S3 backed tables through Athena depending on the design. For a warehouse style flow, I usually prefer a raw table first and then a transformed table after validations.

A transform query may look like this:

insert into analytics.orders_clean
select
    cast(customer_id as bigint) as customer_id,
    cast(order_id as bigint) as order_id,
    cast(amount as decimal(10,2)) as amount,
    cast(order_ts as timestamp) as order_ts,
    current_timestamp as loaded_at
from staging.orders_raw
where amount is not null;

If the transformation is small, a Lambda calling the database might be okay. If the transformation is larger, I would consider Glue, EMR, ECS, or a warehouse native job instead of pushing too much work into Lambda.
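As a rough sketch, swapping the transform Lambda for a Glue job only changes the task state in the state machine; the `.sync` integration pattern makes Step Functions wait for the job to finish before moving on (the job name here is a placeholder):

```json
"TransformData": {
  "Type": "Task",
  "Resource": "arn:aws:states:::glue:startJobRun.sync",
  "Parameters": {
    "JobName": "transform-orders-job"
  },
  "Next": "CheckQuality"
}
```

The rest of the workflow does not need to change, which is one of the reasons I like keeping orchestration and compute separate.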

Triggering the workflow

There are multiple ways to start the state machine. If the file arrival is the trigger, then S3 notifications can call Lambda and that Lambda can start the execution. Another option is EventBridge if you want a more flexible event routing pattern.

A Python snippet for starting a state machine execution is below:

import json
import boto3

sfn = boto3.client("stepfunctions")

def handler(event, context):
    response = sfn.start_execution(
        stateMachineArn="arn:aws:states:ap-southeast-2:123456789012:stateMachine:daily-csv-pipeline",
        input=json.dumps({
            "bucket": event["bucket"],
            "key": event["key"]
        })
    )
    # The raw response contains a datetime, which is not JSON serializable,
    # so return only the execution ARN
    return {"executionArn": response["executionArn"]}
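If you go with EventBridge instead, you can target the state machine directly and skip the intermediate Lambda. A minimal event pattern for object creation looks like the snippet below (this assumes EventBridge notifications are enabled on the bucket, and the bucket name is a placeholder):

```json
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": { "name": ["my-ingest-bucket"] }
  }
}
```

You would then use an input transformer on the rule to map the event's bucket and key fields into the input shape the state machine expects.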

Things to be careful about

There are a few practical limitations with this approach.

  1. Step Functions is great for orchestration, not for heavy data processing itself. The actual compute still has to happen somewhere else.
  2. State input and output size is limited, so do not try to pass large datasets between steps. Pass file locations, IDs, or metadata instead.
  3. If you use many state transitions at large scale, the cost can add up. For small and medium pipelines this is usually acceptable, but it is still worth estimating.
  4. Debugging becomes harder if each Lambda logs differently. It is better to standardize logging and include execution IDs in every log line.
  5. Idempotency matters. If a retry happens, your load step should not create duplicates.

This last point is the one I would pay extra attention to. Retries are useful, but if your downstream step is not safe to run again, retries can make the problem worse instead of better.
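One simple guard on the trigger side is a deterministic execution name. Standard workflows reject a duplicate execution name within the 90-day retention window, so a repeated delivery of the same object cannot start the pipeline twice. A minimal sketch of deriving such a name:

```python
import hashlib

def execution_name(bucket: str, key: str) -> str:
    """Derive a deterministic, valid Step Functions execution name
    from the file location. Starting an execution with a name that
    was already used fails, which blocks duplicate deliveries from
    kicking off a second run."""
    digest = hashlib.sha256(f"{bucket}/{key}".encode()).hexdigest()[:16]
    return f"csv-{digest}"
```

If you do want to allow deliberate reprocessing of the same key, include something like the object's ETag or version ID in the hashed string instead. This only protects the start of the workflow, though; the load step itself still needs to be safe to re-run.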

Production considerations

For a real project, I would add a few more things beyond the simple demo:

  • Dead-letter handling for repeated failures
  • CloudWatch dashboards and alerts
  • Separate dev and prod state machines
  • IAM roles with only the needed permissions
  • A metadata table to track file processing status
  • Better data quality checks than just row counts

I would also think carefully about whether Standard or Express workflows make more sense. Standard workflows are usually easier for pipelines where you want durable execution history. Express can be useful for high-volume event processing, but for many data engineering cases Standard is the safer starting point.

Conclusion

Step Functions is a good option when your pipeline needs visible orchestration, retries, and simple branching without bringing in a heavier scheduler. It will not replace every orchestration tool, but for AWS-based pipelines with a few clear stages, it gives a clean and maintainable way to control the flow. If you already have Lambda based ingestion jobs becoming difficult to manage, this is one of the first services I would try.

This post is licensed under CC BY 4.0 by the author.