Operational Guide: Configuring Spatial Pipeline Health Checks in Apache Airflow for MTTR Reduction
Spatial ETL pipelines degrade differently than standard tabular workflows. Geometry corruption, coordinate reference system (CRS) mismatches, spatial index bloat, and topology violations compound silently until downstream GIS services begin returning null geometries or misaligned features. For SREs, GIS platform administrators, and compliance teams, the operational priority is immediate mean time to resolution (MTTR) reduction through deterministic health checks that fail fast, route precisely, and isolate spatial faults before they cascade. This guide details the exact configuration, threshold tuning, and instrumentation required to deploy production-grade spatial pipeline health checks in Apache Airflow.
1. DAG Architecture & Execution Isolation
Align your DAG architecture with established Geospatial Observability Architecture & Fundamentals principles by strictly separating structural validation from semantic business logic. In Airflow, spatial health checks must run immediately after raw ingestion and before downstream transformation tasks. Isolate them in a dedicated DAG with strict concurrency controls, such as max_active_runs and max_active_tasks, to prevent lock contention during heavy spatial index rebuilds.
```mermaid
flowchart TD
    ING["Raw ingestion"] --> HC["Spatial health checks"]
    HC --> V{"Validity ratio within 0.5%?"}
    V -- "no" --> SKIP["AirflowSkipException · pause downstream"]
    V -- "yes" --> C{"Single SRID and stable extent?"}
    C -- "no" --> HALT["Raise · realign projection"]
    C -- "yes" --> DOWN["Trigger downstream transforms and tiles"]
```
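The decision flow above can be sketched as a pure function that is testable without a database or a scheduler. The name `classify_batch`, the `Outcome` labels, and the `extent_stable` flag are illustrative assumptions, not part of Airflow; the 0.5% cut-off is the validity threshold used throughout this guide.

```python
from enum import Enum


class Outcome(Enum):
    PROCEED = "trigger_downstream"
    SKIP = "skip_downstream"        # corresponds to AirflowSkipException in a task
    HALT = "halt_for_realignment"   # corresponds to a raised error in a task


def classify_batch(total, invalid, unique_srids, extent_stable=True,
                   max_invalid_ratio=0.005):
    """Mirror the flowchart: validity gate first, then the SRID/extent gate."""
    invalid_ratio = (invalid / total) if total else 0.0
    if invalid_ratio > max_invalid_ratio:
        return Outcome.SKIP
    if unique_srids > 1 or not extent_stable:
        return Outcome.HALT
    return Outcome.PROCEED


print(classify_batch(total=10_000, invalid=12, unique_srids=1))
```

Keeping the decision separate from the database call lets the thresholds be unit-tested, while the Airflow task only translates the outcome into a skip or an exception.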
```python
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def log_failure(context):
    """Failure callback: record which task instance failed."""
    logging.critical(f"Spatial health check failed: {context['task_instance']}")


# Written for Airflow 2.x: `schedule_interval` and `sla` were removed in Airflow 3.
default_args = {
    "owner": "sre-gis-ops",
    "retries": 2,
    "retry_delay": timedelta(seconds=30),
    "retry_exponential_backoff": True,
    "sla": timedelta(hours=2),
    "on_failure_callback": log_failure,
}

dag = DAG(
    dag_id="spatial_ingestion_health_checks",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    tags=["spatial-observability", "mttr-reduction", "postgis"],
    catchup=False,
    max_active_runs=1,   # one DAG run at a time
    max_active_tasks=1,  # serialize tasks to avoid lock contention
)
```
2. Spatial Trust Boundaries & Validation Queries
Define spatial data trust boundaries at the ingestion layer to prevent invalid geometries from propagating into analytical stores. Construct validation tasks that query the target PostGIS table using deterministic spatial predicates. The following PythonOperator executes a batch validation against raw features, calculating validity ratios and CRS consistency.
```python
def validate_spatial_integrity(**kwargs):
    """Check geometry validity and SRID uniformity for the current batch."""
    hook = PostgresHook(postgres_conn_id="gis_warehouse")
    conn = hook.get_conn()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT
                    COUNT(*) AS total_features,
                    SUM(CASE WHEN NOT ST_IsValid(geom) OR ST_IsEmpty(geom) THEN 1 ELSE 0 END) AS invalid_features,
                    COUNT(DISTINCT ST_SRID(geom)) AS unique_srids,
                    MIN(ST_SRID(geom)) AS expected_srid
                FROM raw_ingestion_layer
                WHERE ingestion_batch_id = %s;
                """,
                (kwargs["dag_run"].run_id,),
            )
            total, invalid, unique_srids, expected = cursor.fetchone()
    finally:
        # Close the connection even if the query fails.
        conn.close()

    # SUM() is NULL for an empty batch, so guard before dividing.
    invalid_ratio = invalid / total if total > 0 else 0.0

    if invalid_ratio > 0.005:
        # A skip marks the task "skipped", not "failed", so
        # on_failure_callback does not fire on this branch.
        raise AirflowSkipException(
            f"Geometry validity threshold breached: {invalid_ratio:.4%} invalid. "
            f"Skipping downstream consumers to prevent topology corruption."
        )

    if unique_srids > 1:
        raise ValueError(
            f"CRS mismatch detected: {unique_srids} distinct SRIDs found. "
            f"Expected uniform {expected}. Halt pipeline for projection realignment."
        )

    logging.info(
        f"Spatial trust boundaries validated: {total} features, {invalid_ratio:.4%} invalid ratio."
    )


validate_task = PythonOperator(
    task_id="validate_spatial_integrity",
    python_callable=validate_spatial_integrity,
    dag=dag,
)
```
3. Threshold Mapping & Alert Routing
Threshold configuration drives MTTR reduction. Map each spatial metric to a precise alerting tier to ensure the right team receives actionable context. Reference the Geospatial Metric Taxonomy for ETL to standardize metric naming across your observability stack. Configure routing rules that bypass generic SRE queues when faults are strictly spatial.
| Metric | Threshold | Alert Tier | Routing Target |
|---|---|---|---|
| Geometry Validity Ratio (share of invalid features) | > 0.005 | P1 Critical | GIS Platform Admins |
| Spatial Index Fragmentation (idx_scan / (idx_scan + seq_scan)) | < 0.65 | P2 Warning | Database Reliability Engineers |
| Topology Violation Count (ST_Relate failures) | > 50 | P1 Critical | Compliance & QA Ops |
| Ingestion Latency (PostGIS pg_stat_activity) | > 300s | P2 Warning | Data Engineering |
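The table can be encoded as data so that routing is decided in one place. This is a minimal sketch: the metric keys and the `route_alert` helper are hypothetical names, while the thresholds, tiers, and targets are the ones listed above.

```python
import operator
from typing import Callable, NamedTuple, Optional, Tuple


class Rule(NamedTuple):
    breached: Callable[[float, float], bool]  # comparison that signals a breach
    threshold: float
    tier: str
    target: str


# Metric keys are illustrative; align them with your own metric taxonomy.
RULES = {
    # "Geometry Validity Ratio" row: share of invalid features in the batch.
    "geometry_invalid_ratio": Rule(operator.gt, 0.005, "P1 Critical", "GIS Platform Admins"),
    "index_utilization": Rule(operator.lt, 0.65, "P2 Warning", "Database Reliability Engineers"),
    "topology_violation_count": Rule(operator.gt, 50, "P1 Critical", "Compliance & QA Ops"),
    "ingestion_latency_seconds": Rule(operator.gt, 300, "P2 Warning", "Data Engineering"),
}


def route_alert(metric: str, value: float) -> Optional[Tuple[str, str]]:
    """Return (tier, target) when the metric breaches its threshold, else None."""
    rule = RULES[metric]
    if rule.breached(value, rule.threshold):
        return rule.tier, rule.target
    return None


print(route_alert("index_utilization", 0.41))
```

Storing the comparison operator with each rule keeps "lower is worse" metrics, such as index utilization, in the same structure as "higher is worse" metrics.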
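Implement index fragmentation monitoring using PostgreSQL statistics views. The check below reads pg_stat_user_tables and treats a low share of index scans on a busy table as a proxy for index degradation; it does not measure physical bloat directly. Route alerts via Airflow's on_failure_callback or a dedicated Slack webhook: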
```python
def check_index_fragmentation(**kwargs):
    """Flag busy tables whose share of index scans has dropped below 0.65."""
    hook = PostgresHook(postgres_conn_id="gis_warehouse")
    conn = hook.get_conn()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT
                    relname,
                    idx_scan,
                    seq_scan,
                    idx_scan::float / NULLIF(idx_scan + seq_scan, 0) AS index_utilization
                FROM pg_stat_user_tables
                WHERE idx_scan + seq_scan > 1000
                  -- NULLIF avoids division by zero regardless of predicate order.
                  AND idx_scan::float / NULLIF(idx_scan + seq_scan, 0) < 0.65;
                """
            )
            degraded_tables = cursor.fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()

    if degraded_tables:
        table_names = [row[0] for row in degraded_tables]
        raise RuntimeError(
            f"Spatial index degradation detected: {', '.join(table_names)}. "
            "Initiate REINDEX CONCURRENTLY."
        )
```
4. Observability Scoping & Extent Drift Detection
Scope health checks to specific bounding boxes using ST_Extent comparisons against known historical extents. Parcel boundaries, cadastral layers, and LiDAR-derived footprints require different tolerance thresholds than administrative boundaries. Flag any extent drift where the bounding-box centroid shifts by more than 5.0 meters or the bounding-box area changes by more than 5%, as this typically indicates CRS misassignment or projection transformation errors during ingestion.
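```python
def detect_extent_drift(**kwargs):
    """Compare the batch bounding box with the validated cadastral extent."""
    hook = PostgresHook(postgres_conn_id="gis_warehouse")
    conn = hook.get_conn()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                WITH current_extent AS (
                    -- ST_Extent returns a box2d without an SRID, so restore it
                    -- before transforming (assumes one SRID per layer).
                    SELECT ST_SetSRID(ST_Extent(geom)::geometry, MIN(ST_SRID(geom))) AS bbox
                    FROM raw_ingestion_layer
                    WHERE ingestion_batch_id = %s
                ),
                historical_extent AS (
                    SELECT ST_SetSRID(ST_Extent(geom)::geometry, MIN(ST_SRID(geom))) AS bbox
                    FROM validated_master_layer
                    WHERE feature_type = 'cadastral'
                )
                SELECT
                    -- Geography distance is in meters; Web Mercator (3857)
                    -- distances are inflated away from the equator.
                    ST_Distance(
                        ST_Transform(ST_Centroid(c.bbox), 4326)::geography,
                        ST_Transform(ST_Centroid(h.bbox), 4326)::geography
                    ) AS centroid_drift_meters,
                    -- Areas are in native units; both layers must share an SRID.
                    ST_Area(c.bbox) / NULLIF(ST_Area(h.bbox), 0) AS area_ratio
                FROM current_extent c, historical_extent h;
                """,
                (kwargs["dag_run"].run_id,),
            )
            drift_m, area_ratio = cursor.fetchone()
    finally:
        conn.close()

    if drift_m is None:
        # One of the extents is empty, so there is nothing to compare.
        raise ValueError("Extent comparison returned no geometry; check the batch id and master layer.")

    area_drifted = area_ratio is not None and abs(area_ratio - 1.0) > 0.05
    if drift_m > 5.0 or area_drifted:
        ratio_text = "n/a" if area_ratio is None else f"{area_ratio:.2f}"
        raise AirflowSkipException(
            f"Spatial extent drift detected: {drift_m:.2f}m centroid shift, "
            f"{ratio_text} area ratio. Halt for CRS verification."
        )
```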
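Because tolerances differ by layer type, the drift rule can be factored out of the task and parameterized. This is a sketch under stated assumptions: only the cadastral values (5.0 m and 5%) come from this guide, while the `administrative` entry and the helper name `extent_drift_exceeded` are placeholders to replace with tolerances agreed for your own layers.

```python
from typing import NamedTuple, Optional


class DriftTolerance(NamedTuple):
    max_centroid_shift_m: float
    max_area_change: float  # fraction, e.g. 0.05 means 5%


TOLERANCES = {
    "cadastral": DriftTolerance(5.0, 0.05),        # values from this guide
    "administrative": DriftTolerance(50.0, 0.10),  # hypothetical placeholder
}


def extent_drift_exceeded(feature_type: str, drift_m: float,
                          area_ratio: Optional[float]) -> bool:
    """True when centroid shift or bounding-box area change exceeds tolerance."""
    tol = TOLERANCES[feature_type]
    if drift_m > tol.max_centroid_shift_m:
        return True
    # area_ratio is None when the historical extent has zero area.
    return area_ratio is not None and abs(area_ratio - 1.0) > tol.max_area_change


print(extent_drift_exceeded("cadastral", drift_m=1.2, area_ratio=1.08))
```

An unknown feature type raises `KeyError` here, which is deliberate: a layer without an agreed tolerance should fail loudly rather than inherit another layer's limits.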
5. OpenTelemetry Instrumentation & Advanced Debugging
Instrument spatial validation tasks with OpenTelemetry to trace metric lag and isolate bottlenecks in geometry processing. Attach custom attributes to spans for feature count, validation duration, and CRS codes. This enables precise correlation between Airflow task execution and downstream GIS API latency.
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace import Status, StatusCode

# No span processor or exporter is attached here, so spans are created but
# not exported; add one to the provider that targets your collector.
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("spatial-etl-health")


def otel_instrumented_validation(**kwargs):
    """Run the integrity check inside a span and record its outcome."""
    with tracer.start_as_current_span("spatial_integrity_check") as span:
        span.set_attribute("spatial.feature_type", "cadastral")
        span.set_attribute("spatial.validation_engine", "postgis")
        try:
            validate_spatial_integrity(**kwargs)
            span.set_status(Status(StatusCode.OK))
        except Exception as e:
            # Skips and hard failures are both recorded, then re-raised for Airflow.
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise
```
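The wrapper above sets only static attributes. The following sketch shows one way the feature count, validation duration, and CRS codes could be assembled for a span; the attribute keys and the `build_span_attributes` helper are assumptions, not an OpenTelemetry convention. The resulting dict can be passed to `span.set_attributes(...)`.

```python
import time


def build_span_attributes(total_features, invalid_features, srids, started, finished):
    """Collect per-run validation facts as flat, span-compatible values."""
    return {
        "spatial.feature_count": int(total_features),
        "spatial.invalid_count": int(invalid_features),
        # Attribute values must be primitives or homogeneous sequences of them.
        "spatial.srids": sorted(int(s) for s in srids),
        "spatial.validation_duration_ms": round((finished - started) * 1000.0, 3),
    }


started = time.monotonic()
# ... run the validation query here ...
finished = time.monotonic()
attrs = build_span_attributes(12_000, 7, {4326}, started, finished)
print(attrs)
```

Using a monotonic clock for the duration avoids negative values if the wall clock is adjusted while the task runs.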
For advanced debugging of spatial metric lag, correlate Airflow execution timestamps with PostGIS pg_stat_statements and query planner outputs. Use EXPLAIN (ANALYZE, BUFFERS) on validation queries to identify missing spatial indexes or inefficient ST_Intersects joins that cause pipeline timeouts.
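One way to automate the plan review is to request the plan as JSON and walk it for sequential scans. This is a minimal sketch: `find_seq_scans` is a hypothetical helper, and the sample plan is hand-written to follow the shape of `EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)` output rather than captured from a real database.

```python
def find_seq_scans(plan_document):
    """Return relation names that the planner read with a sequential scan."""
    found = []

    def walk(node):
        if node.get("Node Type") == "Seq Scan":
            found.append(node.get("Relation Name"))
        for child in node.get("Plans", []):
            walk(child)

    # FORMAT JSON yields a one-element list holding {"Plan": {...}}.
    for entry in plan_document:
        walk(entry["Plan"])
    return found


sample_plan = [{
    "Plan": {
        "Node Type": "Aggregate",
        "Plans": [{
            "Node Type": "Nested Loop",
            "Plans": [
                {"Node Type": "Seq Scan", "Relation Name": "raw_ingestion_layer"},
                {"Node Type": "Index Scan", "Relation Name": "validated_master_layer"},
            ],
        }],
    },
}]

print(find_seq_scans(sample_plan))
```

A sequential scan is not always a problem on small tables, so treat the result as a list of candidates to inspect, not as a failure condition by itself.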
6. Incident Playbook & Fallback Execution
When health checks trigger, execute the following deterministic runbook to minimize blast radius and restore pipeline stability:
- Acknowledge & Isolate: Confirm `AirflowSkipException` propagation. Verify downstream DAGs are paused via Airflow UI or CLI: `airflow dags pause <downstream_dag_id>`.
- Diagnose Fault Class:
  - CRS Mismatch: Query `ST_SRID` distribution. Run `ST_Transform(geom, <target_srid>)` in a staging table. Validate with `ST_IsValid`.
  - Topology Violations: Execute `ST_MakeValid()` or `ST_SnapToGrid()` on offending geometries. Log feature IDs for compliance review.
  - Index Bloat: Trigger `REINDEX CONCURRENTLY` on spatial indexes. Monitor `pg_stat_progress_create_index` for lock contention.
- Activate Fallback Chains: If primary spatial APIs fail validation, route requests to cached GeoPackage snapshots stored in S3. Implement circuit breakers using Airflow's `BranchPythonOperator` to switch between live PostGIS and fallback static stores.
- Verify & Resume: Re-run health checks against the remediated dataset. Once thresholds pass, unpause downstream DAGs and clear skipped task instances: `airflow tasks clear <dag_id> --start-date <date> --end-date <date>`.
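The fallback switch in the runbook reduces to a callable that returns the task_id to follow, which is the contract `BranchPythonOperator` expects from its `python_callable`. This is a sketch with hypothetical task ids (`read_live_postgis`, `read_geopackage_snapshot`) and a hypothetical failure counter; how failures are counted and stored is left to your deployment.

```python
LIVE_TASK_ID = "read_live_postgis"             # hypothetical downstream task
FALLBACK_TASK_ID = "read_geopackage_snapshot"  # hypothetical downstream task


def choose_spatial_source(consecutive_failures, failure_limit=3):
    """Open the circuit (use the snapshot) once failures reach the limit."""
    if consecutive_failures >= failure_limit:
        return FALLBACK_TASK_ID
    return LIVE_TASK_ID


# Wiring inside a DAG (not executed here), assuming the counter is supplied
# through op_kwargs or read from your own state store:
#   BranchPythonOperator(
#       task_id="select_spatial_source",
#       python_callable=choose_spatial_source,
#       op_kwargs={"consecutive_failures": 0},
#   )
print(choose_spatial_source(consecutive_failures=4))
```

Returning a task id instead of raising keeps the run green while the fallback serves requests, so pair it with an alert that reports the circuit is open.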
Maintain strict adherence to validation thresholds and observability scoping rules to prevent silent degradation. By embedding deterministic spatial checks directly into the Airflow execution graph, teams achieve predictable MTTR, enforce data quality at the boundary, and maintain compliance across multi-region GIS deployments.