Implementing Nested JSON/GeoJSON Flattening in Standardized ETL Pipelines
Geospatial data ingestion routinely encounters deeply nested payloads from municipal APIs, federal open-data portals, and third-party survey platforms. Normalizing these hierarchical structures into flat, relational, or columnar formats is a prerequisite for spatial indexing, downstream analysis, and compliance auditing. Nested JSON/GeoJSON Flattening operates as a discrete transformation stage within broader Automated Attribute Transformation & ETL Workflows, and it requires deterministic mapping logic, strict schema enforcement, and predictable fallback routing. This guide details the production implementation of a flattening module, emphasizing configuration-as-code, compliance alignment, and resource-efficient execution.
Configuration-Driven Architecture & Mapping Manifests
Hardcoded parsing routines introduce maintenance debt and schema drift vulnerabilities. The flattening stage must be driven by version-controlled YAML manifests that externalize all transformation logic. Each mapping rule defines a source JSONPath or dot-notation trajectory, a target column identifier, a delimiter convention, an expected data type, and explicit mandatory/optional status.
# mapping_manifest.yaml
schema_version: "1.0"
delimiter: "__"
rules:
  - source_path: "properties.admin.zoning.code"
    target_column: "zoning_code"
    type: "string"
    mandatory: true
    fallback: null
  - source_path: "properties.survey.responses"
    target_column: "survey_responses"
    type: "json_string"
    mandatory: false
    fallback: "[]"
    array_policy: "serialize"
  - source_path: "properties.metadata.last_updated"
    target_column: "record_timestamp"
    type: "iso8601"
    mandatory: true
    fallback: "1970-01-01T00:00:00Z"
Pipeline initialization must validate the manifest against a central JSON Schema before execution. Invalid path definitions, circular references, or conflicting target names trigger immediate pipeline halts with structured diagnostic output. This declarative architecture aligns directly with established Field Renaming & Type Coercion Rules, ensuring that attribute transformations remain auditable, reproducible, and environment-agnostic.
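The pre-flight checks described above can be sketched with the standard library alone. This is an illustrative stand-in, not the pipeline's actual validator: in practice the structural checks would more likely live in a JSON Schema document. The names `validate_manifest` and `ManifestError`, the required-key list, and the path pattern are all assumptions made for this example.

```python
import re
from typing import Any, Dict, List

# Assumption: source paths are dot-separated segments of letters, digits, and underscores.
PATH_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*$")
# Assumption: these four keys are required on every rule.
REQUIRED_RULE_KEYS = ("source_path", "target_column", "type", "mandatory")


class ManifestError(ValueError):
    """Raised when the manifest fails pre-flight validation."""


def validate_manifest(manifest: Dict[str, Any]) -> None:
    """Collect every problem, then raise once so the diagnostics are complete."""
    rules = manifest.get("rules")
    if not isinstance(rules, list) or not rules:
        raise ManifestError("manifest must contain a non-empty 'rules' list")

    problems: List[str] = []
    seen_targets: Dict[str, int] = {}
    for index, rule in enumerate(rules):
        if not isinstance(rule, dict):
            problems.append(f"rule {index}: expected a mapping")
            continue
        missing = [key for key in REQUIRED_RULE_KEYS if key not in rule]
        if missing:
            problems.append(f"rule {index}: missing keys {missing}")
            continue
        if not PATH_PATTERN.match(str(rule["source_path"])):
            problems.append(f"rule {index}: invalid source_path {rule['source_path']!r}")
        target = rule["target_column"]
        if target in seen_targets:
            # Two rules writing to the same column would silently overwrite each other.
            problems.append(
                f"rule {index}: target_column {target!r} already used by rule {seen_targets[target]}"
            )
        else:
            seen_targets[target] = index

    if problems:
        raise ManifestError("; ".join(problems))
```

Collecting all problems before raising means a single failed run reports every defect in the manifest rather than only the first one.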
Deterministic Flattening Logic & GeoJSON Preservation
GeoJSON imposes structural constraints that generic JSON flattening routines frequently violate. RFC 7946 defines the Feature members type, geometry, properties, id, and bbox; crs is a legacy member from the 2008 GeoJSON specification that RFC 7946 removed, although older producers may still emit it. The implementation must pass type, bbox, geometry, id, and any legacy crs through untouched and restrict traversal exclusively to the properties object.
# geojson_flattener.py
import json
from typing import Any, Dict, List

RESERVED_KEYS = {"type", "bbox", "geometry", "id", "crs"}


def flatten_geojson(
    feature: Dict[str, Any],
    rules: List[Dict[str, Any]],
    delimiter: str = "__",
) -> Dict[str, Any]:
    """Flattens a single GeoJSON feature according to manifest rules."""
    output = {k: v for k, v in feature.items() if k in RESERVED_KEYS}
    # "properties" may be JSON null, so fall back to an empty dict
    properties = feature.get("properties") or {}

    # Iterative stack avoids recursion depth limits
    stack = [(properties, "")]
    flat_props: Dict[str, Any] = {}
    while stack:
        current, prefix = stack.pop()
        for key, val in current.items():
            new_key = f"{prefix}{delimiter}{key}" if prefix else key
            if isinstance(val, dict):
                stack.append((val, new_key))
            else:
                # Lists stay whole as leaf values so array_policy can see them
                flat_props[new_key] = val

    # Apply manifest rules
    for rule in rules:
        target = rule["target_column"]
        source = rule["source_path"]
        fallback = rule.get("fallback")
        mandatory = rule.get("mandatory", False)

        # Manifest paths are dot-notation rooted at "properties"; translate
        # them into the delimiter-joined keys built above (Python 3.9+)
        flat_key = source.removeprefix("properties.").replace(".", delimiter)
        value = flat_props.get(flat_key)
        if value is None:
            if mandatory:
                raise ValueError(f"Mandatory field missing: {source}")
            output[target] = fallback
            continue

        # Array policy: serialize lists compactly; other values pass through
        if rule.get("array_policy") == "serialize" and isinstance(value, list):
            output[target] = json.dumps(value, separators=(",", ":"))
        else:
            output[target] = value
    return output
For complex municipal datasets containing nested administrative boundaries, multi-level zoning attributes, or survey arrays, deterministic traversal prevents structural corruption. Detailed strategies for handling irregular geometries and preserving spatial topology during flattening are covered in Flattening deeply nested GeoJSON feature collections safely.
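When spot-checking what a manifest rule will pick up from a sample feature, it can help to resolve a dot-notation `source_path` directly against the nested structure. The following is a minimal sketch under stated assumptions: `resolve_path` is a hypothetical helper, paths contain only dictionary keys (no array indices or JSONPath filters), and the sample feature is invented for illustration.

```python
from typing import Any, Dict

_MISSING = object()  # sentinel that distinguishes "absent" from an explicit null


def resolve_path(feature: Dict[str, Any], source_path: str, default: Any = None) -> Any:
    """Walk a dot-notation path through nested dicts; lists are returned whole."""
    current: Any = feature
    for segment in source_path.split("."):
        if not isinstance(current, dict):
            # The path continues past a scalar or a list, so it cannot resolve.
            return default
        current = current.get(segment, _MISSING)
        if current is _MISSING:
            return default
    return current


# Invented sample feature, shaped like the manifest's source paths.
feature = {
    "type": "Feature",
    "geometry": {"type": "Point", "coordinates": [-122.4, 37.8]},
    "properties": {
        "admin": {"zoning": {"code": "R-2"}},
        "survey": {"responses": [{"q": 1, "a": "yes"}]},
    },
}

print(resolve_path(feature, "properties.admin.zoning.code"))
print(resolve_path(feature, "properties.metadata.last_updated"))
```

Because the walk follows the path segment by segment, the result does not depend on dictionary iteration order, and the geometry member is never touched.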
Schema Enforcement & Compliance Routing
Strict compliance alignment requires explicit handling of mandatory versus optional fields at runtime. Mandatory fields trigger pipeline failures or quarantine routing when absent or malformed. Optional fields default to configurable fallback values without interrupting execution.
| Field Attribute | Behavior | Compliance Action |
|---|---|---|
| `mandatory: true` | Must resolve to non-null value | Halt & quarantine on missing/invalid |
| `mandatory: false` | May be absent or null | Apply fallback or leave null |
| `type: iso8601` | Validates datetime format | Reject non-conforming strings |
| `type: json_string` | Serializes nested structures | Compact serialization for archival |
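The type rows in the table above could be enforced with a small coercion function along these lines. This is a sketch, not the pipeline's confirmed behavior: `coerce_value` is a hypothetical name, `sort_keys=True` is an added assumption to keep archival output stable, and `datetime.fromisoformat` accepts only a subset of ISO 8601, so a stricter parser may be needed for unusual timestamp forms.

```python
import json
from datetime import datetime
from typing import Any


def coerce_value(value: Any, type_name: str) -> Any:
    """Coerce one resolved value to its manifest type, or raise ValueError."""
    if type_name == "string":
        return str(value)
    if type_name == "json_string":
        # Compact separators keep rows small; sorted keys (an assumption here)
        # make the serialized text reproducible across runs.
        return json.dumps(value, separators=(",", ":"), sort_keys=True)
    if type_name == "iso8601":
        if not isinstance(value, str):
            raise ValueError(f"expected ISO 8601 string, got {type(value).__name__}")
        # fromisoformat() rejects a trailing "Z" before Python 3.11, so normalize it.
        normalized = value[:-1] + "+00:00" if value.endswith("Z") else value
        datetime.fromisoformat(normalized)  # raises ValueError on malformed input
        return value
    raise ValueError(f"unknown manifest type: {type_name}")
```

Returning the original string for `iso8601` keeps the source representation intact for auditing; a pipeline that needs timezone-aware objects downstream could return the parsed `datetime` instead.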
This routing model integrates seamlessly with Batch Schema Processing Pipelines, enabling parallel validation across feature collections while maintaining strict audit trails for regulatory reporting.
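Quarantine routing can be illustrated with a function that applies a per-feature transform and sets failures aside instead of aborting the batch. The sketch below is sequential for clarity and makes several assumptions: `route_features` and `require_zoning` are hypothetical names, the quarantine record layout is invented, and a real deployment would persist quarantined records rather than hold them in memory.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple

Feature = Dict[str, Any]


def route_features(
    features: Iterable[Feature],
    transform: Callable[[Feature], Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Apply `transform` to each feature; failures are quarantined with a reason."""
    accepted: List[Dict[str, Any]] = []
    quarantined: List[Dict[str, Any]] = []
    for index, feature in enumerate(features):
        try:
            accepted.append(transform(feature))
        except (ValueError, KeyError, TypeError) as exc:
            # Keep the original payload plus context for the audit trail.
            quarantined.append(
                {"index": index, "reason": f"{type(exc).__name__}: {exc}", "feature": feature}
            )
    return accepted, quarantined


def require_zoning(feature: Feature) -> Dict[str, Any]:
    """Toy transform: treats zoning_code as a mandatory field."""
    code = (feature.get("properties") or {}).get("zoning_code")
    if code is None:
        raise ValueError("Mandatory field missing: zoning_code")
    return {"id": feature.get("id"), "zoning_code": code}


batch = [
    {"id": 1, "properties": {"zoning_code": "R-2"}},
    {"id": 2, "properties": None},
]
ok, bad = route_features(batch, require_zoning)
print(len(ok), len(bad))
```

Storing the exception type and message next to the untouched feature gives reviewers enough context to fix the source record and replay it later.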
Error Handling & Resource Optimization
Production ETL systems must handle malformed payloads, network timeouts, and memory constraints gracefully. Implement exponential backoff for transient API failures and enforce strict memory boundaries when processing large feature collections.
# retry_handler.py
import time
from functools import wraps
from typing import Any, Callable


def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    exceptions: tuple = (ConnectionError, TimeoutError),
) -> Callable:
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            delay = base_delay
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    # Re-raise once the final attempt has failed
                    if attempt == max_retries - 1:
                        raise
                    time.sleep(delay)
                    delay *= 2  # double the wait before the next attempt
        return wrapper
    return decorator
For datasets exceeding available RAM, stream features incrementally using generator-based parsers and write flattened rows directly to Parquet or PostGIS via batch inserts. Comprehensive techniques for managing heap allocation and preventing garbage collection bottlenecks are detailed in Optimizing memory usage for large GeoJSON flattening tasks.
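The streaming approach can be sketched with two generators: one that yields features lazily and one that groups them into fixed-size batches for the writer. The sketch assumes newline-delimited GeoJSON input (one feature per line); a single large FeatureCollection document would need an incremental parser instead, for example the third-party ijson package. The names `iter_features` and `batched` are hypothetical, and the write step is left as a comment.

```python
import io
import json
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List, TextIO


def iter_features(stream: TextIO) -> Iterator[Dict[str, Any]]:
    """Yield one feature per non-empty line of newline-delimited GeoJSON."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


def batched(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Group an iterable into lists of at most `size` items without loading it all."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# In-memory stand-in for a large file opened with open(path, encoding="utf-8").
sample = io.StringIO(
    '{"type":"Feature","geometry":null,"properties":{"n":1}}\n'
    '{"type":"Feature","geometry":null,"properties":{"n":2}}\n'
    '\n'
    '{"type":"Feature","geometry":null,"properties":{"n":3}}\n'
)
for batch in batched(iter_features(sample), size=2):
    # A real pipeline would flatten each feature here and pass the batch
    # to a Parquet writer or a PostGIS bulk insert.
    print(len(batch))
```

Only one batch is held in memory at a time, so peak usage is bounded by the batch size rather than the size of the input file.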
CI/CD Validation Pipeline
Configuration manifests and flattening logic must be validated before deployment. A minimal GitHub Actions workflow checks the manifest against a schema and runs the unit tests on every push and pull request; a linting step can be added in the same way.
# .github/workflows/validate-flattener.yml
name: Validate Flattening Pipeline
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - name: Install dependencies
        run: pip install pyyaml jsonschema pytest
      - name: Validate YAML Manifest
        run: |
          python -c "
          import yaml, jsonschema
          with open('mapping_manifest.yaml') as f: manifest = yaml.safe_load(f)
          schema = {'type': 'object', 'properties': {'rules': {'type': 'array'}}}
          jsonschema.validate(manifest, schema)
          print('Manifest valid.')
          "
      - name: Run Unit Tests
        run: pytest tests/ --tb=short
This workflow rejects malformed manifests and failing tests before they merge, which keeps every manifest change under review and reduces the risk of schema drift between staging and production environments.