
Building Your First Airflow DAG for ETL: A Practical Walkthrough

In this article, we will build a simple but real ETL pipeline using Apache Airflow. If you have been writing cron jobs or one-off scripts to move data around and are looking for something more manageable, Airflow is a solid choice. We will go from setting up Airflow locally to writing a DAG that extracts data from an API, transforms it, and loads it into a PostgreSQL table. By the end, you will have a working pipeline and a decent understanding of how Airflow DAGs are structured.

Let us get into it.

Setting Up Airflow Locally

The quickest way to get Airflow running on your machine is with the Astro CLI. It wraps Docker Compose and sets up the scheduler, webserver, and a Postgres metadata database in one go.

brew install astro
mkdir my-first-dag && cd my-first-dag
astro dev init
astro dev start

Once the containers are up, the Airflow UI will be available at http://localhost:8080. The default credentials are admin / admin.

If you do not want to use Astro, you can run Airflow standalone with pip install apache-airflow and then airflow standalone, but Astro is what I use for local development because it mirrors a production-like setup pretty closely.

What a DAG Actually Is

A DAG (Directed Acyclic Graph) is just a collection of tasks with defined dependencies and a schedule. Each task is an instance of an operator, which wraps one unit of work: running a Python function, a SQL query, a bash command, or a call to an external system. The DAG defines the order those tasks run in.

Here is the simplest possible DAG:

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(
    dag_id="my_first_dag",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
):
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")
    start >> end

You drop this into your dags/ folder, and Airflow picks it up. The >> operator means “run start, then run end.” That is the core idea — everything builds on that.
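To see why chaining with >> reads so naturally, here is a toy model in plain Python. It is not Airflow's implementation, only a sketch of the idea, and the Task class is hypothetical. Each >> records a dependency edge and returns the right-hand task, which is what lets a >> b >> c chain. A topological sort (stdlib graphlib, Python 3.9+) then yields a valid run order.

```python
from graphlib import TopologicalSorter


class Task:
    """Toy stand-in for an Airflow task (hypothetical, for illustration only)."""

    def __init__(self, task_id: str, graph: dict[str, set[str]]):
        self.task_id = task_id
        self.graph = graph
        graph.setdefault(task_id, set())  # register the node with no upstream tasks yet

    def __rshift__(self, other: "Task") -> "Task":
        # "self >> other" means: other runs after self
        self.graph[other.task_id].add(self.task_id)
        return other  # returning the right-hand side is what makes a >> b >> c work


graph: dict[str, set[str]] = {}
extract = Task("extract", graph)
transform = Task("transform", graph)
load = Task("load", graph)
extract >> transform >> load  # evaluated left to right: (extract >> transform) >> load

# graph maps each task to its upstream tasks; static_order() yields a valid run order
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['extract', 'transform', 'load']
```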

Our ETL Scenario

Let us build something closer to real work. We will:

  1. Extract weather data from the Open-Meteo API (free, no API key needed)
  2. Transform the JSON response into a flat structure
  3. Load the results into a Postgres table

I picked Open-Meteo because it is free and lets you focus on the Airflow part instead of fighting with API authentication for a demo. In production you would swap this out for whatever API or database you are pulling from.

Step 1: Extract — The API Call

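We will use the HttpOperator from Airflow's HTTP provider (apache-airflow-providers-http). Older provider versions call it SimpleHttpOperator. First, add an HTTP connection in the Airflow UI under Admin → Connections. Set the Conn ID to open_meteo_api, the Connection Type to HTTP, and the Host to https://api.open-meteo.com.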

For production, you would define this connection as an environment variable or in a secrets backend, not through the UI.
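Here is what the environment-variable route can look like. Airflow resolves a connection ID by looking for a variable named AIRFLOW_CONN_ followed by the upper-cased conn_id. Since Airflow 2.3 that variable can hold JSON as well as the older URI form.

The helper below is a hypothetical convenience that only builds the name/value pair. How you inject it (an Astro project's .env file, a Kubernetes secret, your CI system) depends on your setup. Connections defined this way do not appear in the UI's Connections list.

```python
import json


def connection_env_var(conn_id: str, conn: dict) -> tuple[str, str]:
    """Return (name, value) for an Airflow connection supplied via environment variable.

    Hypothetical helper for illustration; Airflow only cares about the resulting variable.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(conn)  # JSON form (Airflow 2.3+); older versions need a URI instead
    return name, value


name, value = connection_env_var(
    "open_meteo_api",
    {"conn_type": "http", "host": "https://api.open-meteo.com"},
)
print(f"{name}='{value}'")
# AIRFLOW_CONN_OPEN_METEO_API='{"conn_type": "http", "host": "https://api.open-meteo.com"}'
```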

Here is our extract task:

from airflow.providers.http.operators.http import HttpOperator  # SimpleHttpOperator in older provider versions

extract_weather = HttpOperator(
    task_id="extract_weather",
    http_conn_id="open_meteo_api",
    # Berlin coordinates, hourly air temperature at 2 m
    endpoint="/v1/forecast?latitude=52.52&longitude=13.41&hourly=temperature_2m",
    method="GET",
    # Whatever this returns becomes the task's return-value XCom
    response_filter=lambda response: response.json(),
    log_response=True,
)

The response_filter is handy because it reshapes the API response before the next task sees it. Whatever the filter returns becomes the operator's return value. Airflow stores that value as an XCom in its metadata database, and the next task can pull it out.
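The lambda above keeps the whole JSON body. A leaner filter can drop everything except the arrays the transform step reads, which keeps the XCom small. This is a sketch: keep_hourly is a hypothetical name, and FakeResponse only mimics the .json() method of the requests.Response that the operator passes in.

```python
def keep_hourly(response) -> dict:
    """response_filter that keeps only the hourly arrays the transform step needs.

    `response` is expected to have a .json() method (e.g. requests.Response).
    """
    hourly = response.json().get("hourly", {})
    return {
        "hourly": {
            "time": hourly.get("time", []),
            "temperature_2m": hourly.get("temperature_2m", []),
        }
    }


class FakeResponse:
    """Hypothetical stand-in for requests.Response, for trying the filter locally."""

    def json(self):
        return {
            "latitude": 52.52,
            "hourly_units": {"temperature_2m": "°C"},
            "hourly": {"time": ["2025-04-01T00:00"], "temperature_2m": [7.5]},
        }


print(keep_hourly(FakeResponse()))
# {'hourly': {'time': ['2025-04-01T00:00'], 'temperature_2m': [7.5]}}
```

You would pass it as response_filter=keep_hourly. The output keeps the raw["hourly"]["time"] shape, so the transform function works unchanged.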

A word of caution: XComs are stored in the Airflow metadata DB. For large payloads (say, a multi-megabyte API response), this gets expensive and slow. For those cases, push the data to S3 or GCS and pass the path through XCom instead.
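Here is a minimal sketch of that "pass a reference, not the data" pattern. It makes two assumptions. First, a local temp directory stands in for the bucket so the snippet runs anywhere. Second, offload_payload and fetch_payload are hypothetical helpers.

In a real DAG you would write through your storage provider's hook (for example the Amazon provider's S3Hook) and key the object by run ID.

```python
import json
import tempfile
import uuid
from pathlib import Path

# Local directory standing in for an S3/GCS bucket (assumption for this sketch).
STORAGE_ROOT = Path(tempfile.gettempdir()) / "etl-staging"


def offload_payload(payload: object, run_id: str) -> str:
    """Upstream task: write the payload to storage and return a short reference for XCom."""
    STORAGE_ROOT.mkdir(parents=True, exist_ok=True)
    path = STORAGE_ROOT / f"{run_id}-{uuid.uuid4().hex}.json"
    path.write_text(json.dumps(payload))
    return str(path)  # only this string goes through XCom


def fetch_payload(ref: str) -> object:
    """Downstream task: resolve the reference back into the payload."""
    return json.loads(Path(ref).read_text())


payload = {"hourly": {"time": ["2025-04-01T00:00"] * 1000, "temperature_2m": [7.5] * 1000}}
ref = offload_payload(payload, run_id="manual__2025-04-01")
print(len(json.dumps(payload)), "bytes in storage;", len(ref), "characters in XCom")
```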

Step 2: Transform — Shape the Data

The Open-Meteo API returns a nested JSON with hourly time and temperature arrays. We want each row to be {time, temperature} so it maps neatly to a database table.

from airflow.operators.python import PythonOperator

def transform_weather_data(ti):
    raw = ti.xcom_pull(task_ids="extract_weather")
    hourly = raw.get("hourly", {})
    times = hourly.get("time", [])
    temps = hourly.get("temperature_2m", [])

    rows = [
        {"recorded_at": t, "temperature_c": temp}
        for t, temp in zip(times, temps)
    ]
    
    ti.xcom_push(key="transformed_rows", value=rows)
    return len(rows)

transform_data = PythonOperator(
    task_id="transform_data",
    python_callable=transform_weather_data,
)

ti.xcom_pull grabs the output of the extract task. We parse out the arrays, zip them together, and push the result back. Returning the row count gives us a quick sanity check in the logs.
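The callable only talks to Airflow through ti, so you can check it without a running scheduler by handing it a stand-in object. FakeTI below is hypothetical and mimics just the two methods the function calls. The transform is copied from above so the snippet runs on its own, for example under pytest.

```python
class FakeTI:
    """Minimal stand-in for Airflow's TaskInstance (hypothetical, for local testing only)."""

    def __init__(self, upstream: dict):
        self.upstream = upstream  # what xcom_pull should hand back
        self.pushed: dict[str, object] = {}

    def xcom_pull(self, task_ids=None, key="return_value"):
        return self.upstream

    def xcom_push(self, key, value):
        self.pushed[key] = value


def transform_weather_data(ti):
    # Same logic as the transform task above
    raw = ti.xcom_pull(task_ids="extract_weather")
    hourly = raw.get("hourly", {})
    rows = [
        {"recorded_at": t, "temperature_c": temp}
        for t, temp in zip(hourly.get("time", []), hourly.get("temperature_2m", []))
    ]
    ti.xcom_push(key="transformed_rows", value=rows)
    return len(rows)


ti = FakeTI({"hourly": {"time": ["2025-04-01T00:00", "2025-04-01T01:00"], "temperature_2m": [7.5, 7.1]}})
print(transform_weather_data(ti))  # 2
print(ti.pushed["transformed_rows"][0])  # {'recorded_at': '2025-04-01T00:00', 'temperature_c': 7.5}
```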

In a production ETL you would probably add validation here — check for nulls, validate schema, maybe flag anomalies. For a demo this is enough to get the idea across.
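A minimal validation pass might look like the sketch below. validate_rows is a hypothetical helper, and the plausible temperature range is an assumption you would tune for your data.

Inside transform_weather_data you could call it and raise an exception when the list is non-empty. That fails the task, and triggers any configured retries, instead of loading bad rows.

```python
from datetime import datetime


def validate_rows(rows: list[dict], min_c: float = -60.0, max_c: float = 60.0) -> list[str]:
    """Return a list of problems in the transformed rows; an empty list means OK."""
    problems = []
    if not rows:
        problems.append("no rows returned")
    for i, row in enumerate(rows):
        missing = [f for f in ("recorded_at", "temperature_c") if f not in row]
        if missing:
            problems.append(f"row {i}: missing fields {missing}")
            continue
        temp = row["temperature_c"]
        if temp is None:
            problems.append(f"row {i}: null temperature")
        elif not min_c <= temp <= max_c:  # assumed plausibility range for air temperature
            problems.append(f"row {i}: temperature {temp} outside [{min_c}, {max_c}]")
        try:
            datetime.fromisoformat(row["recorded_at"])  # e.g. "2025-04-01T00:00"
        except (TypeError, ValueError):
            problems.append(f"row {i}: bad timestamp {row['recorded_at']!r}")
    return problems


rows = [
    {"recorded_at": "2025-04-01T00:00", "temperature_c": 7.5},
    {"recorded_at": "2025-04-01T01:00", "temperature_c": None},
]
print(validate_rows(rows))  # ['row 1: null temperature']
```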

Step 3: Load — Write to Postgres

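We could hand-write one INSERT per row, but the PostgresHook already has an insert_rows method. It takes a list of rows plus the target columns. By default it still sends one INSERT per row and commits every 1,000 rows (commit_every). Recent versions of the common SQL provider also accept executemany=True to send rows in batches, which is usually faster for large loads.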

from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_to_postgres(ti):
    rows = ti.xcom_pull(task_ids="transform_data", key="transformed_rows")
    
    pg_hook = PostgresHook(postgres_conn_id="my_postgres")
    
    create_sql = """
    CREATE TABLE IF NOT EXISTS weather_hourly (
        id SERIAL PRIMARY KEY,
        recorded_at TIMESTAMP,
        temperature_c FLOAT,
        ingested_at TIMESTAMP DEFAULT NOW()
    );
    """
    pg_hook.run(create_sql)
    
    pg_hook.insert_rows(
        table="weather_hourly",
        rows=[(r["recorded_at"], r["temperature_c"]) for r in rows],
        target_fields=["recorded_at", "temperature_c"],
    )

load_data = PythonOperator(
    task_id="load_data",
    python_callable=load_to_postgres,
)

You need a Postgres connection configured in Airflow with conn_id set to my_postgres. For local development you can add a Postgres container to your Astro docker-compose.override.yml or point to one running on your machine.

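A rough guide to choosing an approach for pulling data from external systems:

Approach | When to use it
--- | ---
HttpOperator (SimpleHttpOperator in older provider versions) | Quick API calls with minimal auth
PythonOperator with requests | Complex auth, pagination, retry logic
Custom Airflow operators | Reusable logic shared across teams
Sensors + triggers | Waiting for an external file or data to arrive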
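For the "PythonOperator with requests" route, the pagination loop is usually the part worth isolating. This is a minimal sketch that assumes a hypothetical API returning a page of items plus a cursor for the next page, with None on the last page. fetch_page is also hypothetical; it would wrap your actual HTTP call (auth headers, retries) inside the PythonOperator's callable.

```python
from typing import Callable, Iterator, Optional

# fetch_page(cursor) -> (items, next_cursor); next_cursor is None on the last page.
FetchPage = Callable[[Optional[str]], tuple[list[dict], Optional[str]]]


def iter_all_items(fetch_page: FetchPage, max_pages: int = 1000) -> Iterator[dict]:
    """Follow cursors until the API reports no next page, yielding every item."""
    cursor: Optional[str] = None
    for _ in range(max_pages):  # guard against an API that never stops paginating
        items, cursor = fetch_page(cursor)
        yield from items
        if cursor is None:
            return
    raise RuntimeError(f"gave up after {max_pages} pages")


# Canned pages standing in for real HTTP responses
pages = {None: ([{"id": 1}, {"id": 2}], "p2"), "p2": ([{"id": 3}], None)}
print(list(iter_all_items(lambda cursor: pages[cursor])))
# [{'id': 1}, {'id': 2}, {'id': 3}]
```

Keeping the HTTP call behind fetch_page means the loop can be tested with canned pages like these, with no network access.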

Tying It All Together

Here is the full DAG with all three tasks wired up:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import HttpOperator  # SimpleHttpOperator in older provider versions
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

def transform_weather_data(ti):
    raw = ti.xcom_pull(task_ids="extract_weather")
    hourly = raw.get("hourly", {})
    times = hourly.get("time", [])
    temps = hourly.get("temperature_2m", [])
    rows = [
        {"recorded_at": t, "temperature_c": temp}
        for t, temp in zip(times, temps)
    ]
    ti.xcom_push(key="transformed_rows", value=rows)
    return len(rows)

def load_to_postgres(ti):
    rows = ti.xcom_pull(task_ids="transform_data", key="transformed_rows")
    pg_hook = PostgresHook(postgres_conn_id="my_postgres")
    pg_hook.run("""
        CREATE TABLE IF NOT EXISTS weather_hourly (
            id SERIAL PRIMARY KEY,
            recorded_at TIMESTAMP,
            temperature_c FLOAT,
            ingested_at TIMESTAMP DEFAULT NOW()
        );
    """)
    pg_hook.insert_rows(
        table="weather_hourly",
        rows=[(r["recorded_at"], r["temperature_c"]) for r in rows],
        target_fields=["recorded_at", "temperature_c"],
    )

with DAG(
    dag_id="weather_etl",
    start_date=datetime(2025, 4, 1),
    schedule="@hourly",
    catchup=False,
    tags=["etl", "weather"],
):
    extract_weather = HttpOperator(
        task_id="extract_weather",
        http_conn_id="open_meteo_api",
        endpoint="/v1/forecast?latitude=52.52&longitude=13.41&hourly=temperature_2m",
        method="GET",
        response_filter=lambda response: response.json(),
        log_response=True,
    )

    transform_data = PythonOperator(
        task_id="transform_data",
        python_callable=transform_weather_data,
    )

    load_data = PythonOperator(
        task_id="load_data",
        python_callable=load_to_postgres,
    )

    extract_weather >> transform_data >> load_data

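Drop this file into your dags/ folder and Airflow will pick it up within a few minutes, because by default it scans the folder for new files every five minutes. The DAG then shows up in the UI's DAGs list. New DAGs start paused by default, so unpause it with the toggle before expecting any runs.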

Things to Watch Out For

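XCom size limits. XCom values live in the metadata database, so the hard ceiling depends on that database's column type rather than on Airflow itself. Postgres, for example, caps a single field value at 1 GB. Long before you reach any hard limit, large XComs bloat the metadata database and slow down every task that pushes or pulls them. For anything sizable, use a custom XCom backend (S3, GCS), or write files to object storage and pass the path instead. I learned this the hard way when a DAG silently dropped data.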

Idempotency. Your load step should handle re-runs gracefully. Ours uses CREATE TABLE IF NOT EXISTS and does a plain insert — fine for a demo, but in production you would add deduplication logic or use a merge pattern. If someone clears the task and re-runs it, you do not want duplicate rows.
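One way to make the load idempotent is an upsert keyed on the timestamp. This sketch assumes you add a unique constraint on recorded_at, which the table above does not have. It uses Python's built-in sqlite3 so it runs anywhere. SQLite (3.24+) accepts the same INSERT ... ON CONFLICT ... DO UPDATE syntax as Postgres, so the statement should carry over to Postgres with %s placeholders instead of ?.

```python
import sqlite3

# Same shape as a Postgres upsert; Postgres would use %s placeholders instead of ?
UPSERT_SQL = """
INSERT INTO weather_hourly (recorded_at, temperature_c)
VALUES (?, ?)
ON CONFLICT (recorded_at) DO UPDATE SET temperature_c = excluded.temperature_c
"""


def load_rows(conn: sqlite3.Connection, rows: list[dict]) -> None:
    """Insert new rows and overwrite existing ones, so re-runs never create duplicates."""
    conn.executemany(UPSERT_SQL, [(r["recorded_at"], r["temperature_c"]) for r in rows])
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE weather_hourly (
        id INTEGER PRIMARY KEY,
        recorded_at TEXT NOT NULL UNIQUE,  -- the unique key the upsert relies on
        temperature_c REAL
    )"""
)
rows = [{"recorded_at": "2025-04-01T00:00", "temperature_c": 7.5}]
load_rows(conn, rows)
load_rows(conn, rows)  # simulated re-run after someone clears the task
print(conn.execute("SELECT COUNT(*) FROM weather_hourly").fetchone()[0])  # 1
```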

Connection management. Each PostgresHook call (run, insert_rows) opens its own database connection and closes it afterwards. That is fine for smaller workloads. Once you hit hundreds of parallel tasks, though, the connection count adds up. At that point you will want a pooler like PgBouncer, or an Airflow pool to cap how many tasks hit the database at once.

Secrets. Do not hardcode API keys or database passwords in your DAG files. Use Airflow connections and variables, or better yet, a secrets backend like HashiCorp Vault or AWS Secrets Manager.

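Monitoring. Airflow does not retry a failed task unless you tell it to. The retries setting defaults to 0; you set it (plus retry_delay) per task, or for the whole DAG through default_args. You also want alerting, such as Slack webhooks or PagerDuty integrations, so you know when something breaks. Otherwise failures and retries only show up in the UI, and nobody notices until downstream data is missing.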
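Here is a sketch of both pieces, assuming you pass the dict to the DAG with DAG(..., default_args=default_args). retries, retry_delay, retry_exponential_backoff and on_failure_callback are standard task arguments. Airflow calls on_failure_callback with the task context once the task has failed for good; on_retry_callback covers the intermediate retries.

The Slack part is a plain incoming-webhook POST, and SLACK_WEBHOOK_URL is a hypothetical environment variable name. The Slack provider package also ships a webhook hook if you prefer not to hand-roll the request.

```python
import json
import os
import urllib.request
from datetime import timedelta


def format_failure_message(context: dict) -> str:
    """Build a short alert text from the context Airflow passes to failure callbacks."""
    ti = context["task_instance"]
    return (
        f"Airflow task failed: {ti.dag_id}.{ti.task_id} "
        f"(try {ti.try_number}): {context.get('exception')}"
    )


def notify_failure(context: dict) -> None:
    """on_failure_callback that posts to a Slack incoming webhook.

    SLACK_WEBHOOK_URL is a hypothetical variable name chosen for this sketch.
    """
    url = os.environ.get("SLACK_WEBHOOK_URL")
    if not url:
        return  # no webhook configured; skip quietly rather than erroring in the callback
    body = json.dumps({"text": format_failure_message(context)}).encode()
    req = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req, timeout=10):
        pass


default_args = {
    "retries": 3,  # the default is 0, i.e. no retries
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,  # wait longer between successive retries
    "on_failure_callback": notify_failure,  # fires after the final failed attempt
}
```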

What Changes in Production

This demo uses the Astro CLI and local Postgres, which is fine for development. In production you would:

  • Run Airflow on Kubernetes (Astro’s managed offering or self-hosted with the official Helm chart)
  • Use a managed Postgres or Cloud SQL for the metadata database
  • Store DAGs in a Git repository synced via git-sync or a CI/CD pipeline
  • Replace the local Postgres target with Redshift, BigQuery, or Snowflake depending on your stack
  • Add data quality checks with something like Great Expectations or Soda before loading
  • Configure proper retry policies and SLAs on your DAGs

The DAG structure does not change much — operators and dependencies stay the same. Most of the difference is in the infrastructure around it.

Wrapping Up

You now have a working Airflow DAG that pulls data from an API, transforms it, and loads it into a database. It is a simple pattern but it covers the fundamentals: extract via operators, transform in Python, load through hooks, and wire everything together with >>.

The code in this article gives you a starting point. The real learning happens when you add your own data sources, handle failures, and figure out what monitoring you need. Airflow has its quirks — the scheduler can be finicky, the DAG parsing can get slow if you have hundreds of files — but for most ETL workloads it does the job well.

Pick a small pipeline you run manually today and try moving it into Airflow. That is the fastest way to get comfortable with it.

This post is licensed under CC BY 4.0 by the author.