Batch transforming 10k+ shapefiles without memory leaks Jump to heading

Processing legacy spatial datasets at scale requires strict resource isolation and deterministic schema enforcement. Government-scale ingestion volumes routinely trigger resident set size (RSS) growth when GDAL/OGR driver caches, lingering C-level file descriptors, and unbounded schema drift compound across sequential reads. Batch transforming 10k+ shapefiles without memory leaks demands a departure from monolithic DataFrame loads and reliance on implicit Python garbage collection.

The architecture below enforces explicit teardown, chunked I/O, and schema-first validation. It maintains stable memory footprints across multi-day ETL runs while preserving auditability and compliance alignment.

Driver Configuration & Memory Thresholds Jump to heading

GDAL/OGR maintains internal index buffers and geometry caches that persist across fiona or ogr sessions unless explicitly cleared. Set the following environment variables at process initialization to disable aggressive caching:

python
import os
os.environ["GDAL_CACHEMAX"] = "128"
os.environ["OGR_ENABLE_PARTIAL_REPROJECTION"] = "NO"
os.environ["CPL_DEBUG"] = "OFF"
os.environ["VSI_CACHE"] = "FALSE"
os.environ["GDAL_DISABLE_READDIR_ON_OPEN"] = "EMPTY_DIR"

Apply strict operational thresholds before iteration begins:

  • Hard RSS ceiling: 2.0 GB per worker process
  • Soft GC trigger: 1.5 GB sustained allocation
  • Open descriptor limit: 256 concurrent file handles
  • Cache flush interval: Every 75 processed files

Exceeding these limits on memory-constrained CI runners or state-managed VMs triggers swap thrashing. Explicit garbage collection and descriptor flushing prevent silent degradation.

Schema Mapping & Type Coercion Enforcement Jump to heading

Shapefiles lack native schema versioning. Field drift across directories is inevitable. Load a deterministic mapping configuration that defines target field names, data types, and fallback coercion rules. Reject files that deviate beyond acceptable mismatch thresholds to prevent silent data corruption.

yaml
# schema_mapping.yaml
target_schema:
  - name: parcel_id
    type: str
    source_fields: ["PARCEL_ID", "PID", "APN"]
  - name: land_use_code
    type: str
    source_fields: ["LU_CODE", "LANDUSE", "ZONING"]
  - name: area_sqm
    type: float
    source_fields: ["AREA", "SQ_METERS", "ACRES"]
    transform: "lambda x: x * 4046.86 if x else None"
validation:
  max_field_mismatch_pct: 15
  required_geometry: "Polygon"

Parse this configuration into a lookup dictionary before iteration. Validate each input file against the expected schema signature. Files failing the mismatch threshold are quarantined with a structured error log rather than forcing type coercion that breaks downstream joins. This deterministic mapping approach aligns with established practices in Automated Attribute Transformation & ETL Workflows where heuristic guessing is replaced by explicit rule evaluation.

Chunked Execution & Explicit Teardown Jump to heading

Process files in fixed batches of 75. This chunk size balances throughput with memory pressure. Each batch must complete with deterministic cleanup:

  • Close dataset handles immediately after write completion
  • Invoke gc.collect() to reclaim cyclic references
  • Clear fiona driver cache via fiona.env context exit
  • Log batch completion with memory delta metrics

Relying on Python’s reference counting alone leaves C-level allocations intact. Explicit teardown ensures Batch Schema Processing Pipelines remain stable during long-running government data harmonization cycles.

CRS Normalization & Geometry Validation Jump to heading

CRS mismatches and invalid geometries are primary causes of pipeline stalls. Enforce normalization rules before transformation:

  • Reject files with missing or malformed .prj files
  • Transform non-standard CRS to EPSG:3857 or EPSG:4326 using pyproj
  • Validate geometry type against required_geometry
  • Skip features with self-intersections or empty coordinates

Adhere to the OGC Simple Features specification for geometry validity checks. Invalid features must be logged and quarantined rather than silently dropped or forced into compliance.

Production-Ready Implementation Jump to heading

The following script integrates all constraints. It handles missing fields, CRS mismatches, memory thresholds, and CI failures. It is copy-paste ready for Unix/Linux CI environments.

python
import os
import gc
import glob
import yaml
import logging
import resource
from pathlib import Path
from typing import Dict, List, Any, Optional

import fiona
from fiona.transform import transform_geom
import pyproj

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger("shapefile_etl")

# 1. Driver & Memory Configuration
os.environ["GDAL_CACHEMAX"] = "128"
os.environ["OGR_ENABLE_PARTIAL_REPROJECTION"] = "NO"
os.environ["CPL_DEBUG"] = "OFF"
os.environ["VSI_CACHE"] = "FALSE"
os.environ["GDAL_DISABLE_READDIR_ON_OPEN"] = "EMPTY_DIR"

def get_rss_mb() -> float:
    """Return current RSS in MB (Unix only)."""
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024

def load_schema_config(config_path: str) -> Dict[str, Any]:
    with open(config_path, "r") as f:
        return yaml.safe_load(f)

def validate_schema_match(src_schema: Dict[str, str], config: Dict[str, Any]) -> bool:
    """Check if source fields meet the mismatch threshold."""
    target_fields = {f["name"] for f in config["target_schema"]}
    source_fields = set(src_schema.keys())
    
    matched = sum(1 for t in target_fields if any(
        s in source_fields for s in next(
            f["source_fields"] for f in config["target_schema"] if f["name"] == t
        )
    ))
    mismatch_pct = (1 - (matched / len(target_fields))) * 100
    return mismatch_pct <= config["validation"]["max_field_mismatch_pct"]

def transform_record(record: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Apply field mapping and type coercion."""
    mapped = {}
    for field in config["target_schema"]:
        value = None
        for src in field["source_fields"]:
            if src in record["properties"]:
                value = record["properties"][src]
                break
        
        if "transform" in field and value is not None:
            value = eval(field["transform"])(value)
        
        # Type coercion fallback
        if field["type"] == "float" and value is not None:
            try:
                value = float(value)
            except (ValueError, TypeError):
                value = None
        elif field["type"] == "str":
            value = str(value) if value is not None else None
            
        mapped[field["name"]] = value
    return mapped

def process_batch(file_batch: List[Path], config: Dict[str, Any], output_dir: Path, quarantine_dir: Path):
    """Process a chunk of shapefiles with explicit teardown."""
    for src_file in file_batch:
        try:
            with fiona.open(str(src_file), "r") as src:
                if not validate_schema_match(src.schema["properties"], config):
                    raise ValueError("Schema mismatch exceeds threshold")
                
                # CRS validation & transformation
                target_crs = "EPSG:4326"
                src_crs = src.crs
                if src_crs is None:
                    raise RuntimeError("Missing .prj file")
                
                # Prepare output schema
                out_schema = {
                    "geometry": config["validation"]["required_geometry"],
                    "properties": {f["name"]: f["type"] for f in config["target_schema"]}
                }
                
                out_path = output_dir / f"{src_file.stem}_standardized.shp"
                with fiona.open(
                    str(out_path), "w",
                    driver="ESRI Shapefile",
                    schema=out_schema,
                    crs=target_crs
                ) as dst:
                    for feat in src:
                        # Geometry validation & CRS transform
                        geom = feat["geometry"]
                        if geom is None or not geom.get("coordinates"):
                            continue
                            
                        if src_crs != target_crs:
                            geom = transform_geom(src_crs, target_crs, geom)
                            
                        # Field mapping
                        mapped_props = transform_record(feat, config)
                        dst.write({"type": "Feature", "geometry": geom, "properties": mapped_props})
                        
            logger.info(f"Completed: {src_file.name}")
            
        except Exception as e:
            # Quarantine on failure
            q_path = quarantine_dir / src_file.name
            src_file.rename(q_path)
            logger.error(f"Quarantined {src_file.name}: {e}")
        finally:
            # Explicit teardown & memory check
            gc.collect()
            if get_rss_mb() > 1500:
                logger.warning(f"High RSS ({get_rss_mb():.0f} MB). Triggering forced GC.")
                gc.collect()

def main():
    config = load_schema_config("schema_mapping.yaml")
    input_dir = Path("input_shapefiles")
    output_dir = Path("output_standardized")
    quarantine_dir = Path("quarantine")
    
    output_dir.mkdir(exist_ok=True)
    quarantine_dir.mkdir(exist_ok=True)
    
    files = sorted(input_dir.glob("*.shp"))
    batch_size = 75
    
    for i in range(0, len(files), batch_size):
        batch = files[i:i + batch_size]
        logger.info(f"Processing batch {i // batch_size + 1} ({len(batch)} files)")
        process_batch(batch, config, output_dir, quarantine_dir)
        
    logger.info("Batch transformation complete.")

if __name__ == "__main__":
    main()

Compliance & CI Resilience Jump to heading

Government spatial pipelines require deterministic outputs and auditable failure states. The script above enforces strict procedural clarity:

  • Missing fields resolve to None rather than raising KeyError
  • CRS mismatches trigger explicit transformation or quarantine
  • CI failures are isolated via structured logging and directory quarantine
  • Memory thresholds prevent runner OOM kills during parallel execution

Reference the official GDAL Shapefile Driver documentation for driver-specific limitations regarding field name truncation and attribute type constraints. Align transformation rules with agency data governance standards to ensure downstream interoperability.