Operational Guide: Configuring Spatial Pipeline Health Checks in Apache Airflow for MTTR Reduction

Spatial ETL pipelines degrade differently than standard tabular workflows. Geometry corruption, coordinate reference system (CRS) mismatches, spatial index bloat, and topology violations compound silently until downstream GIS services begin returning null geometries or misaligned features. For SREs, GIS platform administrators, and compliance teams, the operational priority is immediate mean time to resolution (MTTR) reduction through deterministic health checks that fail fast, route precisely, and isolate spatial faults before they cascade. This guide details the exact configuration, threshold tuning, and instrumentation required to deploy production-grade spatial pipeline health checks in Apache Airflow.

1. DAG Architecture & Execution Isolation

Align your DAG architecture with established Geospatial Observability Architecture & Fundamentals principles by strictly separating structural validation from semantic business logic. In Airflow, spatial health checks must run immediately after raw ingestion and before any downstream transformation task. Isolate them in a dedicated DAG with strict concurrency controls (one active run, one active task) to prevent lock contention during heavy spatial index rebuilds.

flowchart TD
  ING["Raw ingestion"] --> HC["Spatial health checks"]
  HC --> V{"Validity ratio within 0.5%?"}
  V -- "no" --> SKIP["AirflowSkipException · pause downstream"]
  V -- "yes" --> C{"Single SRID and stable extent?"}
  C -- "no" --> HALT["Raise · realign projection"]
  C -- "yes" --> DOWN["Trigger downstream transforms and tiles"]

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from airflow.exceptions import AirflowSkipException
from airflow.providers.postgres.hooks.postgres import PostgresHook
import logging

default_args = {
    "owner": "sre-gis-ops",
    "retries": 2,
    "retry_delay": timedelta(seconds=30),
    "retry_exponential_backoff": True,
    "sla": timedelta(hours=2),
    "on_failure_callback": lambda context: logging.critical(
        f"Spatial health check failed: {context['task_instance']}"
    )
}

dag = DAG(
    dag_id="spatial_ingestion_health_checks",
    default_args=default_args,
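    schedule="@daily",  # Airflow 2.4+ name; schedule_interval is deprecated
    start_date=datetime(2024, 1, 1),
    tags=["spatial-observability", "mttr-reduction", "postgis"],
    catchup=False,
    max_active_runs=1,   # never overlap runs against the same tables
    max_active_tasks=1,  # serialize checks to avoid lock contention during index rebuilds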
)
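The gating logic in the flowchart can be kept as a pure function so the thresholds are unit-testable without a database or a scheduler. This is a minimal sketch, assuming the metrics have already been fetched; the `BatchMetrics` type, the `Gate` enum, and `evaluate_gate` are hypothetical names, and the thresholds mirror the 0.5% validity, single-SRID, 5.0 m, and 5% extent rules used in this guide.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Gate(Enum):
    PROCEED = "trigger_downstream"
    SKIP = "skip_and_pause_downstream"        # validity breach
    HALT = "halt_for_projection_realignment"  # SRID or extent breach


@dataclass
class BatchMetrics:
    total_features: int
    invalid_features: int
    unique_srids: int
    centroid_drift_m: Optional[float] = None  # None when no baseline exists
    area_ratio: Optional[float] = None


def evaluate_gate(m: BatchMetrics,
                  max_invalid_ratio: float = 0.005,
                  max_drift_m: float = 5.0,
                  max_area_change: float = 0.05) -> Gate:
    """Mirror the flowchart: validity first, then CRS uniformity and extent stability."""
    # An empty batch yields a 0.0 ratio, matching the validation task.
    invalid_ratio = m.invalid_features / m.total_features if m.total_features else 0.0
    if invalid_ratio > max_invalid_ratio:
        return Gate.SKIP
    if m.unique_srids > 1:
        return Gate.HALT
    if m.centroid_drift_m is not None and m.centroid_drift_m > max_drift_m:
        return Gate.HALT
    if m.area_ratio is not None and abs(m.area_ratio - 1.0) > max_area_change:
        return Gate.HALT
    return Gate.PROCEED


print(evaluate_gate(BatchMetrics(10_000, 12, 1, 1.2, 1.01)))  # Gate.PROCEED
print(evaluate_gate(BatchMetrics(10_000, 80, 1)))             # Gate.SKIP
print(evaluate_gate(BatchMetrics(10_000, 0, 2)))              # Gate.HALT
```

Keeping the decision separate from the Airflow callable means each operator only has to translate a `Gate` value into the matching exception.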

2. Spatial Trust Boundaries & Validation Queries

Define spatial data trust boundaries at the ingestion layer to prevent invalid geometries from propagating into analytical stores. Construct validation tasks that query the target PostGIS table using deterministic spatial predicates. The following PythonOperator task runs a batch validation against raw features, calculating the invalid-geometry ratio and CRS consistency. It assumes the ingestion job writes the Airflow run_id into each row's ingestion_batch_id.

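from airflow.exceptions import AirflowFailException, AirflowSkipException

MAX_INVALID_RATIO = 0.005  # 0.5% invalid, empty, or NULL geometries (P1 threshold)


def validate_spatial_integrity(**kwargs):
    hook = PostgresHook(postgres_conn_id="gis_warehouse")
    conn = hook.get_conn()
    cursor = conn.cursor()
    try:
        # Batches are keyed by the Airflow run_id; NULL geometries count as invalid.
        cursor.execute("""
            SELECT
                COUNT(*) AS total_features,
                SUM(CASE WHEN geom IS NULL OR NOT ST_IsValid(geom) OR ST_IsEmpty(geom)
                         THEN 1 ELSE 0 END) AS invalid_features,
                COUNT(DISTINCT ST_SRID(geom)) AS unique_srids,
                MIN(ST_SRID(geom)) AS min_srid
            FROM raw_ingestion_layer
            WHERE ingestion_batch_id = %s;
        """, (kwargs["dag_run"].run_id,))
        total, invalid, unique_srids, min_srid = cursor.fetchone()
    finally:
        # Always release the connection, even when the query fails.
        cursor.close()
        conn.close()

    invalid_ratio = invalid / total if total > 0 else 0.0

    if invalid_ratio > MAX_INVALID_RATIO:
        # Skipped, not failed: downstream tasks with the default all_success
        # trigger rule are skipped as well.
        raise AirflowSkipException(
            f"Geometry validity threshold breached: {invalid_ratio:.4%} invalid. "
            f"Skipping downstream consumers to prevent topology corruption."
        )

    if unique_srids > 1:
        # AirflowFailException fails the task without consuming retries;
        # a CRS mismatch will not resolve itself on retry.
        raise AirflowFailException(
            f"CRS mismatch detected: {unique_srids} distinct SRIDs found "
            f"(lowest observed: {min_srid}). Halt pipeline for projection realignment."
        )

    logging.info(f"Spatial trust boundaries validated: {total} features, {invalid_ratio:.4%} invalid ratio.")
    # The return value is pushed to XCom for downstream tasks and instrumentation.
    return {"total_features": total, "invalid_ratio": invalid_ratio, "srid": min_srid}


validate_task = PythonOperator(
    task_id="validate_spatial_integrity",
    python_callable=validate_spatial_integrity,
    dag=dag,
)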

3. Threshold Mapping & Alert Routing

Threshold configuration drives MTTR reduction. Map each spatial metric to a precise alerting tier to ensure the right team receives actionable context. Reference the Geospatial Metric Taxonomy for ETL to standardize metric naming across your observability stack. Configure routing rules that bypass generic SRE queues when faults are strictly spatial.

| Metric | Threshold | Alert Tier | Routing Target |
| --- | --- | --- | --- |
| Invalid geometry ratio (invalid or empty / total features) | > 0.005 (0.5%) | P1 Critical | GIS Platform Admins |
| Spatial index fragmentation proxy: idx_scan / (idx_scan + seq_scan) | < 0.65 | P2 Warning | Database Reliability Engineers |
| Topology violation count (features failing ST_Relate pattern checks) | > 50 | P1 Critical | Compliance & QA Ops |
| Ingestion latency (PostgreSQL pg_stat_activity) | > 300 s | P2 Warning | Data Engineering |
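The routing table can also live in code as data, so a failure callback or webhook handler looks up tier and target instead of hard-coding them. This is a minimal sketch; the metric keys, the `Rule` type, and the `route_alert` helper are hypothetical names, while the thresholds, tiers, and targets are copied from the table.

```python
import operator
from dataclasses import dataclass
from typing import Callable, Optional, Tuple


@dataclass(frozen=True)
class Rule:
    threshold: float
    breached: Callable[[float, float], bool]  # operator.gt or operator.lt
    tier: str
    target: str


# Hypothetical metric keys; thresholds, tiers, and targets follow the table.
ROUTING_RULES = {
    "invalid_geometry_ratio": Rule(0.005, operator.gt, "P1 Critical", "GIS Platform Admins"),
    "index_utilization": Rule(0.65, operator.lt, "P2 Warning", "Database Reliability Engineers"),
    "topology_violation_count": Rule(50, operator.gt, "P1 Critical", "Compliance & QA Ops"),
    "ingestion_latency_s": Rule(300, operator.gt, "P2 Warning", "Data Engineering"),
}


def route_alert(metric: str, value: float) -> Optional[Tuple[str, str]]:
    """Return (tier, target) when the metric breaches its threshold, else None."""
    rule = ROUTING_RULES[metric]
    if rule.breached(value, rule.threshold):
        return rule.tier, rule.target
    return None


print(route_alert("index_utilization", 0.42))       # ('P2 Warning', 'Database Reliability Engineers')
print(route_alert("topology_violation_count", 12))  # None
```

Storing the comparison operator per rule keeps "greater than" and "less than" thresholds in one structure.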

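Approximate index health from PostgreSQL's cumulative statistics view pg_stat_user_tables: on a heavily scanned table, a low share of index scans suggests the planner is bypassing a missing, bloated, or unusable index. Route alerts via Airflow's on_failure_callback or a dedicated Slack webhook. Note that a task ending in AirflowSkipException is marked skipped rather than failed, so on_failure_callback does not fire for the validity-threshold skip; route that outcome separately.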

def check_index_fragmentation(**kwargs):
    hook = PostgresHook(postgres_conn_id="gis_warehouse")
    conn = hook.get_conn()
    cursor = conn.cursor()
    try:
        # idx_scan is NULL for tables without any index, so COALESCE keeps them
        # in scope; NULLIF guards the division. Counters accumulate since the
        # last statistics reset.
        cursor.execute("""
            WITH scan_stats AS (
                SELECT relname,
                       COALESCE(idx_scan, 0) AS idx_scan,
                       seq_scan
                FROM pg_stat_user_tables
            )
            SELECT
                relname,
                idx_scan,
                seq_scan,
                idx_scan::float / NULLIF(idx_scan + seq_scan, 0) AS index_utilization
            FROM scan_stats
            WHERE idx_scan + seq_scan > 1000
              AND idx_scan::float / NULLIF(idx_scan + seq_scan, 0) < 0.65;
        """)
        degraded_tables = cursor.fetchall()
    finally:
        # Close before raising so the alert path does not leak connections.
        cursor.close()
        conn.close()

    if degraded_tables:
        table_names = [row[0] for row in degraded_tables]
        raise RuntimeError(
            f"Spatial index degradation detected: {', '.join(table_names)}. "
            "Initiate REINDEX CONCURRENTLY."
        )
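Because this SQL filter is easy to get subtly wrong (NULL idx_scan on unindexed tables, division by zero on idle tables), the same selection logic can be mirrored in plain Python and unit-tested against rows shaped like pg_stat_user_tables output. This is a sketch; the `find_degraded_tables` name and the `(relname, idx_scan, seq_scan)` tuple shape are assumptions.

```python
from typing import Iterable, List, Optional, Tuple

# (relname, idx_scan, seq_scan); idx_scan is NULL/None for tables without indexes
Row = Tuple[str, Optional[int], int]


def find_degraded_tables(rows: Iterable[Row],
                         min_scans: int = 1000,
                         min_utilization: float = 0.65) -> List[Tuple[str, float]]:
    """Return (relname, utilization) for busy tables whose index-scan share is too low."""
    degraded = []
    for relname, idx_scan, seq_scan in rows:
        idx = idx_scan or 0            # same effect as COALESCE(idx_scan, 0)
        total = idx + seq_scan
        if total <= min_scans:         # skip quiet tables; also avoids division by zero
            continue
        utilization = idx / total
        if utilization < min_utilization:
            degraded.append((relname, round(utilization, 3)))
    return degraded


stats = [
    ("parcels", 9_000, 1_000),        # 0.90, healthy
    ("footprints", 2_000, 6_000),     # 0.25, degraded
    ("scratch_import", None, 5_000),  # no indexes at all
    ("lookup", 10, 20),               # too few scans to judge
]
print(find_degraded_tables(stats))
# [('footprints', 0.25), ('scratch_import', 0.0)]
```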

4. Observability Scoping & Extent Drift Detection

Scope health checks to specific bounding boxes using ST_Extent comparisons against known historical extents. Parcel boundaries, cadastral layers, and LiDAR-derived footprints require different tolerance thresholds than administrative boundaries. Flag any extent drift exceeding 5.0 meters of centroid shift or a 5% change in bounding-box area, as either typically indicates CRS misassignment or a projection transformation error during ingestion.

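from airflow.exceptions import AirflowFailException


def detect_extent_drift(**kwargs):
    hook = PostgresHook(postgres_conn_id="gis_warehouse")
    conn = hook.get_conn()
    cursor = conn.cursor()
    try:
        # ST_Extent returns an SRID-less box2d, so the SRID is re-attached before
        # ST_Transform. Comparing as geography gives true metres on the spheroid.
        # Assumes each batch is a full snapshot of the cadastral layer; incremental
        # batches need a per-region baseline instead.
        cursor.execute("""
            WITH current_extent AS (
                SELECT ST_SetSRID(ST_Extent(geom)::geometry, MIN(ST_SRID(geom))) AS bbox
                FROM raw_ingestion_layer
                WHERE ingestion_batch_id = %s
            ),
            historical_extent AS (
                SELECT ST_SetSRID(ST_Extent(geom)::geometry, MIN(ST_SRID(geom))) AS bbox
                FROM validated_master_layer
                WHERE feature_type = 'cadastral'
            )
            SELECT
                ST_Distance(
                    ST_Transform(ST_Centroid(c.bbox), 4326)::geography,
                    ST_Transform(ST_Centroid(h.bbox), 4326)::geography
                ) AS centroid_drift_meters,
                ST_Area(ST_Transform(c.bbox, 4326)::geography)
                    / NULLIF(ST_Area(ST_Transform(h.bbox, 4326)::geography), 0) AS area_ratio
            FROM current_extent c, historical_extent h;
        """, (kwargs["dag_run"].run_id,))
        drift_m, area_ratio = cursor.fetchone()
    finally:
        cursor.close()
        conn.close()

    if drift_m is None:
        raise AirflowFailException("No extent for this batch or baseline; cannot assess drift.")

    area_breach = area_ratio is not None and abs(area_ratio - 1.0) > 0.05
    if drift_m > 5.0 or area_breach:
        ratio_txt = f"{area_ratio:.2f}" if area_ratio is not None else "n/a"
        # Fail (not skip) so the run halts and failure alerting fires.
        raise AirflowFailException(
            f"Spatial extent drift detected: {drift_m:.2f}m centroid shift, "
            f"{ratio_txt} area ratio. Halt for CRS verification."
        )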
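The two drift signals can be prototyped without PostGIS to sanity-check thresholds against known extents. This sketch assumes both bounding boxes are already in WGS 84 longitude/latitude (EPSG:4326). It uses a spherical haversine distance with a mean Earth radius of 6,371,008.8 m, which differs slightly from PostGIS's spheroidal geography distance, and a planar square-degree area ratio that is only adequate for small extents at similar latitudes. The function names are hypothetical.

```python
import math
from typing import Optional, Tuple

BBox = Tuple[float, float, float, float]  # (min_lon, min_lat, max_lon, max_lat), EPSG:4326

EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius; spherical approximation


def bbox_centroid(b: BBox) -> Tuple[float, float]:
    return ((b[0] + b[2]) / 2.0, (b[1] + b[3]) / 2.0)


def haversine_m(p: Tuple[float, float], q: Tuple[float, float]) -> float:
    """Great-circle distance in metres between two (lon, lat) points."""
    lon1, lat1, lon2, lat2 = map(math.radians, (*p, *q))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def bbox_area_deg2(b: BBox) -> float:
    return (b[2] - b[0]) * (b[3] - b[1])


def extent_drift(current: BBox, baseline: BBox,
                 max_drift_m: float = 5.0,
                 max_area_change: float = 0.05) -> Tuple[float, Optional[float], bool]:
    """Return (drift_m, area_ratio, breached) using the 5.0 m / 5% rules."""
    drift_m = haversine_m(bbox_centroid(current), bbox_centroid(baseline))
    base_area = bbox_area_deg2(baseline)
    area_ratio = bbox_area_deg2(current) / base_area if base_area else None
    breached = drift_m > max_drift_m or (
        area_ratio is not None and abs(area_ratio - 1.0) > max_area_change)
    return drift_m, area_ratio, breached


baseline = (-122.52, 37.70, -122.35, 37.83)
shifted = (-122.52, 37.7001, -122.35, 37.8301)  # ~11 m northward shift
drift_m, ratio, breached = extent_drift(shifted, baseline)
print(f"{drift_m:.1f} m, ratio {ratio:.3f}, breached={breached}")
# 11.1 m, ratio 1.000, breached=True
```

A 0.0001° latitude offset already exceeds the 5.0 m rule, which illustrates how tight that tolerance is for cadastral data.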

5. OpenTelemetry Instrumentation & Advanced Debugging

Instrument spatial validation tasks with OpenTelemetry to trace metric lag and isolate bottlenecks in geometry processing. Attach custom span attributes for feature count and CRS codes; validation duration is captured by the span's own start and end timestamps. This enables precise correlation between Airflow task execution and downstream GIS API latency.

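from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import Status, StatusCode
from airflow.exceptions import AirflowSkipException

# A provider without a span processor creates spans but never exports them.
# Swap ConsoleSpanExporter for an OTLP exporter in production; the global
# provider can only be set once per process.
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("spatial-etl-health")

def otel_instrumented_validation(**kwargs):
    # Exceptions are handled explicitly below, so disable the automatic recording.
    with tracer.start_as_current_span(
        "spatial_integrity_check", record_exception=False, set_status_on_exception=False
    ) as span:
        span.set_attribute("spatial.feature_type", "cadastral")
        span.set_attribute("spatial.validation_engine", "postgis")

        try:
            stats = validate_spatial_integrity(**kwargs) or {}
        except AirflowSkipException as e:
            # A threshold skip is an expected outcome, not a span error.
            span.set_attribute("spatial.outcome", "skipped")
            span.add_event("threshold_skip", {"reason": str(e)})
            raise
        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

        # Feature count and SRID come from the validation return value, if any.
        for key in ("total_features", "invalid_ratio", "srid"):
            if stats.get(key) is not None:
                span.set_attribute(f"spatial.{key}", stats[key])
        span.set_status(Status(StatusCode.OK))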

For advanced debugging of spatial metric lag, correlate Airflow execution timestamps with PostgreSQL's pg_stat_statements extension and query planner output. Run validation queries under EXPLAIN (ANALYZE, BUFFERS) to identify missing spatial indexes or inefficient ST_Intersects joins that cause pipeline timeouts; note that ANALYZE actually executes the statement.
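To make that planner review repeatable, the validation query can be run with EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) and the resulting plan tree scanned for sequential scans. This sketch walks PostgreSQL's JSON plan format (keys such as "Node Type", "Relation Name", "Plans", and "Actual Total Time"); the sample plan is hand-written for illustration, not captured from a real server, and the helper names are hypothetical.

```python
import json
from typing import Iterator, List, Tuple


def walk(node: dict) -> Iterator[dict]:
    """Yield a plan node and all of its children, depth-first."""
    yield node
    for child in node.get("Plans", []):
        yield from walk(child)


def seq_scans(explain_json: str) -> List[Tuple[str, float]]:
    """Return (relation, actual_total_time_ms) for every Seq Scan node."""
    plan_root = json.loads(explain_json)[0]["Plan"]
    return [(n.get("Relation Name", "?"), n.get("Actual Total Time", 0.0))
            for n in walk(plan_root) if n.get("Node Type") == "Seq Scan"]


# Hand-written sample shaped like EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) output.
sample = json.dumps([{
    "Plan": {
        "Node Type": "Aggregate", "Actual Total Time": 812.4,
        "Plans": [{
            "Node Type": "Seq Scan", "Relation Name": "raw_ingestion_layer",
            "Actual Total Time": 790.1, "Shared Read Blocks": 48211,
        }],
    },
    "Execution Time": 813.0,
}])
print(seq_scans(sample))  # [('raw_ingestion_layer', 790.1)]
```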

6. Incident Playbook & Fallback Execution

When health checks trigger, execute the following deterministic runbook to minimize blast radius and restore pipeline stability:

  1. Acknowledge & Isolate: Confirm that the AirflowSkipException propagated (the health-check task and its downstream tasks show as skipped). Pause downstream DAGs via the Airflow UI or CLI: airflow dags pause <downstream_dag_id>.
  2. Diagnose Fault Class:
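    • CRS Mismatch: Query the ST_SRID distribution. If the coordinates are correct but tagged with the wrong SRID, reassign the tag with ST_SetSRID(geom, <correct_srid>); if they are genuinely in another CRS, reproject with ST_Transform(geom, <target_srid>). Do either in a staging table, then validate with ST_IsValid.
    • Topology Violations: Execute ST_MakeValid() or ST_SnapToGrid() on offending geometries; snapping can itself introduce invalidity, so re-check with ST_IsValid afterwards. Log feature IDs for compliance review.
    • Index Bloat: Trigger REINDEX CONCURRENTLY on spatial indexes. Track progress in pg_stat_progress_create_index (PostgreSQL 12+) and watch pg_stat_activity wait events for lock contention.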
  3. Activate Fallback Chains: If primary spatial APIs fail validation, route requests to cached GeoPackage snapshots stored in S3. Implement circuit breakers using Airflow’s BranchPythonOperator to switch between live PostGIS and fallback static stores.
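  4. Verify & Resume: Re-run health checks against the remediated dataset. Once thresholds pass, unpause downstream DAGs (airflow dags unpause <downstream_dag_id>) and clear the skipped task instances so they run again: airflow tasks clear <dag_id> --start-date <date> --end-date <date>. Without -t <task_regex>, this clears every task instance in the date range.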
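A BranchPythonOperator callable only needs to return the task_id of the branch to follow, so the circuit-breaker decision from step 3 can be a plain function. This is a minimal sketch; the task IDs `serve_live_postgis` and `serve_geopackage_fallback`, the `CircuitBreaker` class, and the three-failure / 15-minute cooldown policy are hypothetical choices, not values from this guide. In a real DAG the breaker state would need to persist between runs, for example in an Airflow Variable or a database row.

```python
import time
from dataclasses import dataclass
from typing import Optional

LIVE_TASK = "serve_live_postgis"             # hypothetical task_id
FALLBACK_TASK = "serve_geopackage_fallback"  # hypothetical task_id


@dataclass
class CircuitBreaker:
    failure_threshold: int = 3    # consecutive failed health checks before opening
    cooldown_s: float = 900.0     # stay on the fallback this long once open
    failures: int = 0
    opened_at: Optional[float] = None

    def record(self, health_check_passed: bool, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        if health_check_passed:
            self.failures = 0
            self.opened_at = None  # close the breaker
        else:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                # Open, or re-open after a failed trial run; restarts the cooldown.
                self.opened_at = now

    def choose_branch(self, now: Optional[float] = None) -> str:
        """Return the task_id a BranchPythonOperator callable would hand back."""
        now = time.time() if now is None else now
        if self.opened_at is not None and now - self.opened_at < self.cooldown_s:
            return FALLBACK_TASK
        return LIVE_TASK  # closed, or cooldown elapsed: try live PostGIS again


breaker = CircuitBreaker()
for i in range(3):
    breaker.record(False, now=1_000.0 + i)
print(breaker.choose_branch(now=1_010.0))  # serve_geopackage_fallback
print(breaker.choose_branch(now=1_902.0))  # serve_live_postgis
```

After the cooldown the breaker lets one live run through; if its health check fails again, the breaker re-opens immediately.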

Maintain strict adherence to validation thresholds and observability scoping rules to prevent silent degradation. By embedding deterministic spatial checks directly into the Airflow execution graph, teams achieve predictable MTTR, enforce data quality at the boundary, and maintain compliance across multi-region GIS deployments.