Reserved Instance Mapping Logic

Pipeline Architecture & Positioning

Reserved Instance mapping logic operates at the normalization and allocation layer of a cloud cost data pipeline. Raw billing exports deliver unattributed hourly usage records, while commitment inventories (Reserved Instances, Savings Plans, Committed Use Discounts, and Azure Reservations) exist as separate financial instruments. The mapping engine must deterministically correlate usage with active discount scopes, applying vendor-specific precedence rules before costs are distributed to business units. This stage executes after schema standardization and before the chargeback/showback distribution described in FinOps Architecture & Billing Fundamentals.

In production, mapping must handle multi-account hierarchies, cross-tenant sharing, and temporal misalignments between usage ingestion and commitment activation. The engine typically runs as a daily batch job, consuming normalized Parquet/CSV exports and outputting a coverage matrix with utilization metrics, effective rate adjustments, and unallocated on-demand deltas. Accurate mapping prevents cost leakage, ensures compliance with procurement contracts, and provides the foundational data required for capacity forecasting and rightsizing initiatives.

Core Mapping Principles

Vendor Precedence Hierarchies

Cloud providers apply discount instruments using strict, non-interchangeable consumption orders. Misapplying these rules results in phantom utilization or inflated on-demand spend.

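  • AWS: Reserved Instances are applied to eligible usage first, with zonal Reserved Instances consumed before regional ones. Savings Plans then cover the remaining usage, with EC2 Instance Savings Plans applied before Compute Savings Plans. Zonal Reserved Instances require an exact instance_type match, while regional Reserved Instances match on the family prefix (e.g., c5, m6g) where instance size flexibility applies.
  • GCP: Committed use discounts are purchased for a region and apply to eligible usage across all zones within that region. Whether a commitment can cover more than one machine series (e.g., N2 and N2D) depends on the commitment type and contract terms, so treat cross-series matching as configurable rather than assumed.
  • Azure: Reservation utilization maps to specific VM series within the reservation's scope, such as a single subscription or resource group. Scope boundaries and Azure Hybrid Benefit licensing overlays (see Azure Hybrid Benefit Cost Tracking) must be resolved prior to assignment.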
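
One way to keep precedence rules out of the matching loop is to express them as data. The sketch below is a minimal illustration, not a vendor API: the function name, the `kind` labels, and the example ordering are all hypothetical, and the ordering should be confirmed against each vendor's billing documentation before use.

```python
from typing import Dict, List


def order_by_precedence(instruments: List[Dict], precedence: List[str]) -> List[Dict]:
    """Sort instruments so kinds listed earlier in `precedence` are consumed first."""
    rank = {kind: position for position, kind in enumerate(precedence)}
    # Unknown kinds sort last instead of raising, so new instrument types stay visible.
    return sorted(
        instruments,
        key=lambda inst: (rank.get(inst["kind"], len(rank)), inst["start_utc"]),
    )


# Illustrative ordering only (hypothetical labels); confirm with vendor documentation.
EXAMPLE_PRECEDENCE = ["zonal_ri", "regional_ri", "savings_plan"]

instruments = [
    {"id": "sp-1", "kind": "savings_plan", "start_utc": "2024-01-01T00:00:00+00:00"},
    {"id": "ri-2", "kind": "regional_ri", "start_utc": "2024-01-01T00:00:00+00:00"},
    {"id": "ri-1", "kind": "zonal_ri", "start_utc": "2024-02-01T00:00:00+00:00"},
]
ordered_ids = [inst["id"] for inst in order_by_precedence(instruments, EXAMPLE_PRECEDENCE)]
print(ordered_ids)  # ['ri-1', 'ri-2', 'sp-1']
```

Keeping one precedence list per vendor makes a rule change a configuration edit rather than a code change.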

Temporal Alignment & Boundary Resolution

Billing exports rarely align perfectly with commitment activation windows. The engine must:

  1. Convert all timestamps to UTC.
  2. Handle partial-hour boundaries by prorating usage to the nearest billing increment.
  3. Account for activation delays (typically 15–60 minutes post-purchase).
  4. Generate a time-indexed usage matrix keyed by account_id, instance_type, availability_zone, and hour_utc.
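
The UTC conversion and partial-hour proration steps can be sketched as follows. This is a simplified illustration that assumes usage arrives as timezone-aware start and end timestamps; the function name `split_usage_by_hour` is hypothetical, and sub-second precision is truncated.

```python
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Dict


def split_usage_by_hour(start: datetime, end: datetime) -> Dict[datetime, Decimal]:
    """Prorate a usage interval across UTC hour buckets as fractional hours."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("timestamps must be timezone-aware")
    start, end = start.astimezone(timezone.utc), end.astimezone(timezone.utc)

    buckets: Dict[datetime, Decimal] = {}
    cursor = start
    while cursor < end:
        hour_start = cursor.replace(minute=0, second=0, microsecond=0)
        segment_end = min(end, hour_start + timedelta(hours=1))
        # Whole seconds only; fractions of a second are dropped.
        seconds = Decimal(int((segment_end - cursor).total_seconds()))
        buckets[hour_start] = buckets.get(hour_start, Decimal("0")) + seconds / Decimal(3600)
        cursor = segment_end
    return buckets


# Usage reported in UTC-5 from 05:30 to 07:15 local time (10:30 to 12:15 UTC).
local = timezone(timedelta(hours=-5))
hourly = split_usage_by_hour(
    datetime(2024, 1, 15, 5, 30, tzinfo=local),
    datetime(2024, 1, 15, 7, 15, tzinfo=local),
)
for hour, fraction in sorted(hourly.items()):
    print(hour.isoformat(), fraction)
```

Each resulting bucket can then be joined with account_id, instance_type, and availability_zone to form the usage matrix key.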

Scope Resolution & Cross-Account Sharing

Organizations frequently share commitments across organizational units via consolidated billing or resource sharing. The mapping layer must respect sharing hierarchies: payer accounts apply commitments to linked accounts only when explicitly enabled, and zonal commitments cannot satisfy usage in different availability zones unless regional flexibility is purchased.
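
A scope check along these lines can gate every candidate match. The sketch assumes each commitment records its owning account, its payer account, and a sharing flag; the class and field names are hypothetical and do not correspond to any vendor schema.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CommitmentScope:
    owner_account_id: str
    payer_account_id: str
    sharing_enabled: bool
    region: str
    availability_zone: Optional[str] = None  # None means the commitment is regional


@dataclass(frozen=True)
class UsageLocation:
    account_id: str
    payer_account_id: str
    region: str
    availability_zone: str


def is_in_scope(commitment: CommitmentScope, usage: UsageLocation) -> bool:
    """Return True when the commitment may be applied to the usage record."""
    same_account = commitment.owner_account_id == usage.account_id
    shared = commitment.sharing_enabled and commitment.payer_account_id == usage.payer_account_id
    if not (same_account or shared):
        return False
    if commitment.region != usage.region:
        return False
    # Zonal commitments only satisfy usage in their own availability zone.
    return commitment.availability_zone is None or commitment.availability_zone == usage.availability_zone


zonal = CommitmentScope("acc-01", "payer-01", True, "us-east-1", "us-east-1a")
linked_same_zone = UsageLocation("acc-02", "payer-01", "us-east-1", "us-east-1a")
linked_other_zone = UsageLocation("acc-02", "payer-01", "us-east-1", "us-east-1b")
print(is_in_scope(zonal, linked_same_zone), is_in_scope(zonal, linked_other_zone))  # True False
```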

Step-by-Step Implementation Workflow

  1. Commitment Inventory Sync: Query vendor billing APIs to retrieve active discount instruments. Normalize fields to a canonical schema: commitment_id, start_utc, end_utc, scope (regional/zonal/account), instance_family, os_license, tenancy, and purchased_hours. Pagination and rate-limit handling are mandatory at this stage.
  2. Usage Normalization: Ingest hourly usage exports and map vendor-specific SKUs to a unified family taxonomy. Filter to eligible instance types (compute, memory-optimized, GPU) and exclude non-eligible workloads (spot, free tier, data transfer, storage).
  3. Temporal Alignment: Convert all timestamps to UTC. Handle partial-hour boundaries, commitment activation delays, and overlapping scopes. Generate a time-indexed usage matrix keyed by account_id, instance_type, availability_zone, and hour_utc.
  4. Priority-Based Assignment: Apply vendor matching hierarchies. For AWS, Reserved Instances consume first (zonal before regional), followed by EC2 Instance Savings Plans, then Compute Savings Plans. For GCP, committed use discounts apply across all zones of the region in which they were purchased. For Azure, reservation utilization maps to specific VM series within the reservation's scope, such as a resource group or subscription.
  5. Coverage & Utilization Calculation: Compute effective_coverage = discounted_hours / total_eligible_hours and utilization = consumed_hours / purchased_hours. Track unallocated on-demand hours and flag commitments with utilization below threshold (typically <70%).
  6. Idempotent State Persistence: Write mapping results to a data warehouse or object store with deterministic run IDs. Implement upsert logic to prevent duplicate allocation during pipeline retries.
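
The two ratios in step 5 can be worked through with concrete numbers. The figures below are invented for illustration, and the helper name `ratio` is hypothetical; the 0.70 threshold mirrors the typical value mentioned above.

```python
from decimal import Decimal, ROUND_HALF_UP


def ratio(numerator: Decimal, denominator: Decimal) -> Decimal:
    """Divide safely and round to four decimal places."""
    if denominator == 0:
        return Decimal("0.0000")
    return (numerator / denominator).quantize(Decimal("0.0001"), rounding=ROUND_HALF_UP)


# Illustrative month: 720 eligible usage hours, 744 purchased commitment hours.
discounted_hours = Decimal("480")
total_eligible_hours = Decimal("720")
consumed_hours = Decimal("480")
purchased_hours = Decimal("744")

effective_coverage = ratio(discounted_hours, total_eligible_hours)
utilization = ratio(consumed_hours, purchased_hours)
unallocated_on_demand_hours = total_eligible_hours - discounted_hours
underutilized = utilization < Decimal("0.70")

print(effective_coverage, utilization, unallocated_on_demand_hours, underutilized)
# 0.6667 0.6452 240 True
```

Coverage and utilization answer different questions: coverage measures how much usage was discounted, while utilization measures how much of the purchased commitment was actually used.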

Production-Grade Python Engine

The following implementation demonstrates a retry-aware mapping engine. It consumes a paginated commitments API through a generator and retries throttled or transient server errors with exponential backoff. Credential rotation and chunked usage processing are not implemented in this listing: commitments and usage are materialized in memory, so large inputs need the streaming approach described under Operational Hardening. Financial calculations leverage Python’s decimal module to prevent floating-point drift.

import os
import uuid
import logging
import requests
from decimal import Decimal, ROUND_HALF_UP
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Optional, Generator, Tuple
from dataclasses import dataclass, field
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

@dataclass
class Commitment:
    commitment_id: str
    start_utc: datetime
    end_utc: datetime
    scope: str  # regional | zonal | account
    instance_family: str
    os_license: str
    tenancy: str
    purchased_hours: Decimal
    consumed_hours: Decimal = Decimal("0")
    utilization_pct: Decimal = Decimal("0")
    # Zone name for zonal commitments; None for regional or account scope.
    availability_zone: Optional[str] = None

@dataclass
class UsageRecord:
    account_id: str
    instance_type: str
    availability_zone: str
    hour_utc: datetime
    eligible_hours: Decimal
    on_demand_rate: Decimal

class CommitmentMapper:
    def __init__(self, api_base_url: str, auth_headers: Dict[str, str], chunk_size: int = 5000):
        self.api_base_url = api_base_url
        self.headers = auth_headers
        self.chunk_size = chunk_size
        self.session = self._build_retry_session()

    def _build_retry_session(self) -> requests.Session:
        session = requests.Session()
        # Retry throttling (429) and transient server errors with exponential backoff.
        retry_strategy = Retry(
            total=5,
            backoff_factor=1.5,
            status_forcelist=[429, 500, 502, 503, 504],
            # Only idempotent GET requests are retried; this class issues no POSTs.
            allowed_methods=["GET"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def fetch_commitments(self) -> Generator[Commitment, None, None]:
        """Paginated commitment inventory sync; yields one Commitment at a time."""
        next_token = None
        while True:
            params = {"limit": self.chunk_size}
            if next_token:
                params["next_token"] = next_token

            resp = self.session.get(
                f"{self.api_base_url}/commitments",
                headers=self.headers,
                params=params,
                timeout=30
            )
            resp.raise_for_status()
            payload = resp.json()

            for item in payload.get("items", []):
                yield Commitment(
                    commitment_id=item["id"],
                    start_utc=datetime.fromisoformat(item["start_utc"].replace("Z", "+00:00")),
                    end_utc=datetime.fromisoformat(item["end_utc"].replace("Z", "+00:00")),
                    scope=item["scope"],
                    instance_family=item["family"],
                    os_license=item.get("os_license", "linux"),
                    tenancy=item.get("tenancy", "shared"),
                    purchased_hours=Decimal(str(item["purchased_hours"])),
                    # Assumed field name; expected to be present only for zonal commitments.
                    availability_zone=item.get("availability_zone")
                )

            # Follow the pagination token until the API stops returning one.
            next_token = payload.get("next_token")
            if not next_token:
                break

            next_token = payload.get("next_token")
            if not next_token:
                break

    def normalize_usage(self, raw_records: List[Dict]) -> List[UsageRecord]:
        """Filter and map vendor SKUs to canonical taxonomy."""
        normalized = []
        for rec in raw_records:
            # Skip usage that commitments cannot cover (labels are pipeline-specific).
            if rec.get("usage_type") in ("Spot", "FreeTier", "DataTransfer", "Storage"):
                continue
            normalized.append(UsageRecord(
                account_id=rec["account_id"],
                instance_type=rec["instance_type"],
                availability_zone=rec["az"],
                hour_utc=datetime.fromisoformat(rec["hour_utc"].replace("Z", "+00:00")),
                eligible_hours=Decimal(str(rec["usage_hours"])),
                on_demand_rate=Decimal(str(rec["on_demand_rate"]))
            ))
        return normalized

    def assign_commitments(self, commitments: List[Commitment], usage: List[UsageRecord]) -> List[Dict]:
        """Greedy priority-based assignment; updates consumed_hours on each commitment."""
        results = []

        # Process chronologically without reordering the caller's list.
        for u in sorted(usage, key=lambda rec: rec.hour_utc):
            family = u.instance_type.split(".")[0]
            allocated_hours = Decimal("0")
            # Region and account-scope matching are not modeled in this listing.
            applicable = [
                c for c in commitments
                if c.instance_family == family
                and c.start_utc <= u.hour_utc < c.end_utc
                and (
                    c.scope == "regional"
                    # A zonal commitment must name the same zone as the usage record.
                    or (c.scope == "zonal"
                        and getattr(c, "availability_zone", None) == u.availability_zone)
                )
                and c.consumed_hours < c.purchased_hours
            ]

            # Most restrictive scope first (zonal before regional), then oldest commitment.
            # Adjust this key if the vendor applies a different order.
            applicable.sort(key=lambda c: (0 if c.scope == "zonal" else 1, c.start_utc))

            for c in applicable:
                if allocated_hours >= u.eligible_hours:
                    break
                remaining_capacity = c.purchased_hours - c.consumed_hours
                consume = min(u.eligible_hours - allocated_hours, remaining_capacity)
                c.consumed_hours += consume
                allocated_hours += consume

            on_demand_hours = u.eligible_hours - allocated_hours
            # Covered hours add no on-demand charge. The amortized commitment cost is not
            # modeled here, so this is the residual on-demand rate per eligible hour.
            if u.eligible_hours > 0:
                effective_rate = on_demand_hours * u.on_demand_rate / u.eligible_hours
            else:
                effective_rate = Decimal("0")

            results.append({
                "account_id": u.account_id,
                "instance_type": u.instance_type,
                "az": u.availability_zone,
                "hour_utc": u.hour_utc.isoformat(),
                "covered_hours": allocated_hours,
                "on_demand_hours": on_demand_hours,
                "effective_rate": effective_rate
            })
        return results

    def calculate_utilization(self, commitments: List[Commitment]) -> List[Dict]:
        """Compute per-commitment utilization and flag commitments below 70%."""
        metrics = []
        for c in commitments:
            if c.purchased_hours > 0:
                c.utilization_pct = (c.consumed_hours / c.purchased_hours * 100).quantize(
                    Decimal("0.01"), rounding=ROUND_HALF_UP
                )
            else:
                # Avoid division by zero for commitments with no purchased capacity.
                c.utilization_pct = Decimal("0.00")
            metrics.append({
                "commitment_id": c.commitment_id,
                "purchased_hours": c.purchased_hours,
                "consumed_hours": c.consumed_hours,
                "utilization_pct": c.utilization_pct,
                "status": "underutilized" if c.utilization_pct < 70 else "optimal"
            })
        return metrics

    def persist_results(self, run_id: str, mapping_results: List[Dict], utilization_metrics: List[Dict]) -> None:
        """Idempotent state persistence with deterministic run IDs."""
        # In production, replace with Parquet write to S3/GCS or warehouse upsert
        logger.info(f"Persisting run_id={run_id} | records={len(mapping_results)} | metrics={len(utilization_metrics)}")
        # Example: df.to_parquet(f"s3://finops-data/mapping/run_id={run_id}/")

def run_mapping_pipeline():
    # Fail fast with a KeyError if the token is missing instead of sending "Bearer None".
    auth_headers = {"Authorization": f"Bearer {os.environ['BILLING_API_TOKEN']}"}
    mapper = CommitmentMapper(
        api_base_url="https://billing-api.cloudprovider.internal/v1",
        auth_headers=auth_headers,
        chunk_size=10000
    )

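    # Deterministic run ID: the same execution window always yields the same ID,
    # so a retried run overwrites its earlier output instead of duplicating it.
    execution_window = "2024-01-15"  # matches the sample usage below
    run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, f"ri-mapping/{execution_window}"))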
    logger.info(f"Starting mapping pipeline | run_id={run_id}")

    commitments = list(mapper.fetch_commitments())
    logger.info(f"Synced {len(commitments)} active commitments")

    # Simulate raw usage ingestion
    raw_usage = [
        {"account_id": "acc-01", "instance_type": "c5.xlarge", "az": "us-east-1a",
         "hour_utc": "2024-01-15T10:00:00Z", "usage_type": "OnDemand", "usage_hours": "1.0", "on_demand_rate": "0.170"}
    ]
    normalized = mapper.normalize_usage(raw_usage)

    mapping_results = mapper.assign_commitments(commitments, normalized)
    utilization = mapper.calculate_utilization(commitments)

    mapper.persist_results(run_id, mapping_results, utilization)
    logger.info("Pipeline execution complete")

if __name__ == "__main__":
    run_mapping_pipeline()

Operational Hardening

Memory & Throughput Optimization

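Processing millions of hourly records requires streaming architectures. The engine above consumes the commitments API through a generator, but it still materializes commitments and usage as in-memory lists and sorts usage in a single pass, so memory grows with input size. To bound memory, feed usage through the mapper in fixed-size chunks ordered by hour. For datasets exceeding 100M rows, offload temporal joins to a columnar engine (e.g., DuckDB, Apache Spark, or BigQuery) and use the Python layer strictly for orchestration and vendor-rule application.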
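
Fixed-size chunking over an iterator is one simple way to cap memory in the Python layer. The helper below is a minimal sketch using only the standard library; the name `chunked` is hypothetical, and it assumes the upstream reader already yields records in hour order.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` items without loading the whole input."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# Stand-in for a streaming reader; only one chunk is held in memory at a time.
usage_stream = ({"hour_index": i, "usage_hours": 1} for i in range(10))
chunk_sizes = [len(chunk) for chunk in chunked(usage_stream, 4)]
print(chunk_sizes)  # [4, 4, 2]
```

Because commitment capacity is consumed across chunks, the commitment list must persist between chunks while each usage chunk is discarded after assignment.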

Idempotency & Retry Safety

Cloud billing APIs occasionally return partial payloads or transient 5xx errors. The mapping pipeline must:

  • Generate a deterministic run_id per execution window.
  • Write intermediate state to a staging partition before committing to the production table.
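  • Implement INSERT ... ON CONFLICT DO UPDATE or equivalent upsert semantics keyed on (account_id, instance_type, availability_zone, hour_utc, run_id), so that records for different zones in the same hour do not overwrite each other.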
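
The deterministic run ID and upsert can be sketched with SQLite from the standard library. The table name, column layout, and the `ri-mapping/` naming prefix are assumptions made for this example, and availability_zone is included in the key because the usage matrix is keyed by zone. A production warehouse would use its own upsert or MERGE syntax.

```python
import sqlite3
import uuid
from decimal import Decimal
from typing import Dict, List


def deterministic_run_id(window: str) -> str:
    """UUIDv5 is name-based, so the same window always produces the same ID."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"ri-mapping/{window}"))


def upsert_mapping(conn: sqlite3.Connection, run_id: str, rows: List[Dict]) -> None:
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS mapping_results (
            account_id TEXT, instance_type TEXT, availability_zone TEXT,
            hour_utc TEXT, run_id TEXT,
            covered_hours TEXT, on_demand_hours TEXT,
            PRIMARY KEY (account_id, instance_type, availability_zone, hour_utc, run_id)
        )
        """
    )
    conn.executemany(
        """
        INSERT INTO mapping_results VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT (account_id, instance_type, availability_zone, hour_utc, run_id)
        DO UPDATE SET covered_hours = excluded.covered_hours,
                      on_demand_hours = excluded.on_demand_hours
        """,
        [
            # Decimals are stored as text to avoid binary floating-point conversion.
            (r["account_id"], r["instance_type"], r["az"], r["hour_utc"], run_id,
             str(r["covered_hours"]), str(r["on_demand_hours"]))
            for r in rows
        ],
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
run_id = deterministic_run_id("2024-01-15")
row = {"account_id": "acc-01", "instance_type": "c5.xlarge", "az": "us-east-1a",
       "hour_utc": "2024-01-15T10:00:00+00:00",
       "covered_hours": Decimal("0.5"), "on_demand_hours": Decimal("0.5")}
upsert_mapping(conn, run_id, [row])
# Simulated retry of the same window with corrected values.
upsert_mapping(conn, run_id, [{**row, "covered_hours": Decimal("1.0"), "on_demand_hours": Decimal("0.0")}])
row_count = conn.execute("SELECT COUNT(*) FROM mapping_results").fetchone()[0]
print(row_count)  # 1
```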

Monitoring & Drift Detection

Deploy automated alerts when:

  • Commitment utilization drops below 70% for three consecutive billing cycles.
  • Unallocated on-demand spend exceeds 15% of total compute cost.
  • Temporal gaps exceed 2 hours between usage ingestion and commitment activation.
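
The first two alert conditions reduce to small predicates over the mapping output. The following sketch assumes utilization is stored as a percentage per billing cycle, oldest first; both function names are hypothetical, and the defaults mirror the thresholds listed above.

```python
from decimal import Decimal
from typing import Sequence


def below_threshold_streak(
    utilization_history: Sequence[Decimal],
    threshold: Decimal = Decimal("70"),
    cycles: int = 3,
) -> bool:
    """True when the most recent `cycles` billing cycles are all below the threshold."""
    if len(utilization_history) < cycles:
        return False
    return all(value < threshold for value in utilization_history[-cycles:])


def on_demand_share_exceeded(
    on_demand_cost: Decimal,
    total_compute_cost: Decimal,
    limit: Decimal = Decimal("0.15"),
) -> bool:
    """True when unallocated on-demand spend exceeds `limit` of total compute cost."""
    if total_compute_cost == 0:
        return False
    return on_demand_cost / total_compute_cost > limit


history = [Decimal("82"), Decimal("69"), Decimal("65"), Decimal("60")]
print(below_threshold_streak(history))                               # True
print(on_demand_share_exceeded(Decimal("1800"), Decimal("10000")))   # True
```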

Integrate mapping outputs into dashboards alongside AWS Cost Explorer data (see AWS Cost Explorer Architecture) to validate vendor-reported coverage against internal calculations. Discrepancies often indicate misconfigured sharing scopes or untracked cross-account usage.

Financial Precision

Cloud billing requires exact decimal arithmetic. The implementation above uses Python’s decimal.Decimal to prevent IEEE-754 floating-point accumulation errors. Always quantize final rates to 4–6 decimal places before writing to financial reporting systems, and validate monthly against vendor-provided GCP billing exports (see GCP Billing Export Configuration) and AWS cost allocation reports.
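
A short comparison shows why binary floats are unsuitable here and how quantization is applied. The rate and hour figures are invented for illustration.

```python
from decimal import Decimal, ROUND_HALF_UP

# Binary floats cannot represent 0.1 or 0.2 exactly, so the sum misses 0.3.
float_matches = (0.1 + 0.2) == 0.3
decimal_matches = (Decimal("0.1") + Decimal("0.2")) == Decimal("0.3")
print(float_matches, decimal_matches)  # False True

# Blended rate: 3 on-demand hours at 0.170 spread over 7 eligible hours.
blended_rate = Decimal("3") * Decimal("0.170") / Decimal("7")
reported_rate = blended_rate.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP)
print(reported_rate)  # 0.072857
```

Constructing Decimal values from strings rather than floats matters: Decimal(0.1) would inherit the float's representation error.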
