Integration Patterns
Integrate OilPriceAPI into larger systems: microservices architecture, event-driven updates, data pipelines, and real-time streaming.
Problem
Embedding price data into a single script is easy. Doing it reliably across a distributed system is harder: multiple services need the same data, updates must propagate without every service polling the API independently, and pipelines need a clean, governed source of truth. You want patterns that keep API usage bounded while making fresh prices available everywhere they are needed.
Solution Architecture
Centralize ingestion behind a single price service, then distribute internally:
┌──> Cache / read API ──> Service A
OilPriceAPI ──> Price Service ──> Event bus ──> Service B
(webhook + (1 ingest (fan-out) Service C
backfill) point)
└──> Data pipeline ──> Warehouse
- One ingestion service is the only thing that talks to OilPriceAPI.
- It publishes updates onto an event bus so other services react instead of polling.
- A data pipeline lands the same events in your warehouse for analytics.
Code Implementation
Microservices architecture
Run a dedicated price service that owns the OilPriceAPI credentials and exposes a small internal API. Other microservices call your price service, not OilPriceAPI directly. This gives you one place to manage rate limits, caching, retries, and the API key, and prevents quota fragmentation across teams.
# price-service: the only service holding the OilPriceAPI key
from fastapi import FastAPI
import requests
app = FastAPI()
BASE_URL = "https://api.oilpriceapi.com/v1"
@app.get("/internal/prices/{commodity}")
def latest(commodity: str):
resp = requests.get(
f"{BASE_URL}/prices/latest",
headers={"Authorization": "Token YOUR_API_KEY"},
params={"commodity": commodity},
timeout=10,
)
resp.raise_for_status()
return resp.json()
Event-driven updates
Instead of every consumer polling, have the ingestion service publish a message whenever prices change (driven by an OilPriceAPI webhook). Downstream services subscribe and react.
import json
def on_webhook(event, publisher):
"""Translate an OilPriceAPI webhook into an internal domain event."""
message = {
"type": "price.updated",
"commodity": event["commodity"],
"price": event["price"],
"observed_at": event["created_at"],
}
publisher.publish("prices", json.dumps(message)) # Kafka/SNS/RabbitMQ
Data pipeline integration
Land the same events into a warehouse for analytics. A consumer reads from the bus and batches writes to your store:
def pipeline_consumer(stream, warehouse):
batch = []
for message in stream:
batch.append(json.loads(message))
if len(batch) >= 500:
warehouse.bulk_insert("commodity_prices", batch)
batch.clear()
Pair the live stream with a periodic backfill job that uses the historical endpoint to fill any gaps (see the Data Analysis tutorial).
Real-time streaming
For sub-minute latency, consume the WebSocket feed instead of polling. See the WebSocket API and its architecture guide. The ingestion service holds a single WebSocket connection and republishes ticks onto your internal event bus, so consumers get streaming updates without each opening their own connection.
Best Practices
- Single ingestion point. Only one service should hold the API key and talk to OilPriceAPI; everything else consumes internally.
- Push, don't poll, between services. Use an event bus so consumers scale independently of API usage.
- Make consumers idempotent. Key events by ID so re-delivery is harmless.
- Pair streaming with backfill. Live events for freshness, historical endpoint for completeness and gap recovery.
- Centralize retries and circuit breaking in the ingestion service so a provider hiccup does not cascade.
Common Pitfalls
- Every service calling OilPriceAPI directly, multiplying quota usage and scattering the API key. Centralize ingestion instead.
- Treating webhooks/streams as exactly-once. Delivery is at-least-once; design consumers to dedupe.
- No backfill path. If the stream drops, gaps stay forever unless a historical backfill job reconciles them.
- Tight coupling to payload shape. Translate provider payloads into a stable internal event schema so a provider change does not ripple through every service.