Oil Price API Documentation - Quick Start in 5 Minutes | REST API
GitHub
GitHub

Webhook Cookbook

This cookbook provides production-ready recipes for common webhook integrations. Each recipe includes complete code in JavaScript and Python, tested patterns, and deployment considerations.

Recipe 1: Slack Price Alerts

Post price alerts to a Slack channel when oil exceeds a threshold.

Overview

flowchart LR
    OPA[OilPriceAPI] -->|price.updated| WH[Your Webhook]
    WH -->|Check threshold| LOGIC{Price > $80?}
    LOGIC -->|Yes| SLACK[Slack API]
    LOGIC -->|No| DROP[Ignore]
    SLACK --> CH[#oil-alerts]

JavaScript Implementation

const express = require("express");
const crypto = require("crypto");

const app = express();
app.use(express.json());

// Configuration
const WEBHOOK_SECRET = process.env.OILPRICE_WEBHOOK_SECRET;
const SLACK_WEBHOOK_URL = process.env.SLACK_WEBHOOK_URL;
const PRICE_THRESHOLD = 80.0;
const ALERT_COMMODITIES = ["BRENT_CRUDE_USD", "WTI_USD"];

// Signature verification
function verifySignature(payload, signature, timestamp) {
  if (Date.now() - parseInt(timestamp) * 1000 > 300000) {
    return false;
  }
  const expected = crypto
    .createHmac("sha256", WEBHOOK_SECRET)
    .update(`${JSON.stringify(payload)}.${timestamp}`)
    .digest("hex");
  return crypto.timingSafeEqual(Buffer.from(expected), Buffer.from(signature));
}

// Format Slack message
function formatSlackMessage(price) {
  const emoji =
    price.change_percent >= 0
      ? ":chart_with_upwards_trend:"
      : ":chart_with_downwards_trend:";
  const color = price.change_percent >= 0 ? "#36a64f" : "#dc3545";

  return {
    attachments: [
      {
        color: color,
        blocks: [
          {
            type: "header",
            text: {
              type: "plain_text",
              text: `${emoji} Price Alert: ${price.name}`,
              emoji: true,
            },
          },
          {
            type: "section",
            fields: [
              {
                type: "mrkdwn",
                text: `*Current Price:*\n$${price.value.toFixed(2)}`,
              },
              {
                type: "mrkdwn",
                text: `*Change:*\n${price.change_percent >= 0 ? "+" : ""}${price.change_percent.toFixed(2)}%`,
              },
              {
                type: "mrkdwn",
                text: `*Threshold:*\n$${PRICE_THRESHOLD.toFixed(2)}`,
              },
              {
                type: "mrkdwn",
                text: `*Time:*\n${new Date(price.timestamp).toLocaleString()}`,
              },
            ],
          },
        ],
      },
    ],
  };
}

// Send to Slack
async function sendSlackAlert(price) {
  const message = formatSlackMessage(price);

  const response = await fetch(SLACK_WEBHOOK_URL, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(message),
  });

  if (!response.ok) {
    throw new Error(`Slack API error: ${response.status}`);
  }
}

// Webhook endpoint
app.post("/webhook/prices", async (req, res) => {
  // Verify signature
  const signature = req.headers["x-oilpriceapi-signature"];
  const timestamp = req.headers["x-oilpriceapi-signature-timestamp"];

  if (!verifySignature(req.body, signature, timestamp)) {
    return res.status(401).send("Invalid signature");
  }

  // Acknowledge immediately
  res.status(200).send("OK");

  // Process asynchronously
  try {
    const { type, data } = req.body;

    if (
      type === "price.updated" &&
      ALERT_COMMODITIES.includes(data.commodity) &&
      data.value >= PRICE_THRESHOLD
    ) {
      await sendSlackAlert(data);
      console.log(`Alert sent for ${data.commodity}: $${data.value}`);
    }
  } catch (error) {
    console.error("Error processing webhook:", error);
  }
});

app.listen(3000, () => console.log("Webhook server running on port 3000"));

Python Implementation

from flask import Flask, request, jsonify
import hmac
import hashlib
import time
import json
import requests
import os

app = Flask(__name__)

# Configuration
WEBHOOK_SECRET = os.environ['OILPRICE_WEBHOOK_SECRET']
SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
PRICE_THRESHOLD = 80.00
ALERT_COMMODITIES = ['BRENT_CRUDE_USD', 'WTI_USD']

def verify_signature(payload, signature, timestamp):
    """Verify OilPriceAPI webhook signature."""
    if int(time.time()) - int(timestamp) > 300:
        return False

    payload_string = json.dumps(payload, separators=(',', ':'))
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        f"{payload_string}.{timestamp}".encode(),
        hashlib.sha256
    ).hexdigest()

    return hmac.compare_digest(expected, signature)

def format_slack_message(price):
    """Format price data as Slack message."""
    emoji = ':chart_with_upwards_trend:' if price['change_percent'] >= 0 else ':chart_with_downwards_trend:'
    color = '#36a64f' if price['change_percent'] >= 0 else '#dc3545'

    return {
        'attachments': [{
            'color': color,
            'blocks': [
                {
                    'type': 'header',
                    'text': {
                        'type': 'plain_text',
                        'text': f"{emoji} Price Alert: {price['name']}",
                        'emoji': True
                    }
                },
                {
                    'type': 'section',
                    'fields': [
                        {'type': 'mrkdwn', 'text': f"*Current Price:*\n${price['value']:.2f}"},
                        {'type': 'mrkdwn', 'text': f"*Change:*\n{'+' if price['change_percent'] >= 0 else ''}{price['change_percent']:.2f}%"},
                        {'type': 'mrkdwn', 'text': f"*Threshold:*\n${PRICE_THRESHOLD:.2f}"},
                        {'type': 'mrkdwn', 'text': f"*Time:*\n{price['timestamp']}"}
                    ]
                }
            ]
        }]
    }

def send_slack_alert(price):
    """Send alert to Slack."""
    message = format_slack_message(price)
    response = requests.post(SLACK_WEBHOOK_URL, json=message)
    response.raise_for_status()

@app.route('/webhook/prices', methods=['POST'])
def handle_webhook():
    # Verify signature
    signature = request.headers.get('X-OilPriceAPI-Signature')
    timestamp = request.headers.get('X-OilPriceAPI-Signature-Timestamp')

    if not verify_signature(request.json, signature, timestamp):
        return 'Invalid signature', 401

    # Process webhook
    event = request.json
    event_type = event.get('type')
    data = event.get('data', {})

    if (event_type == 'price.updated' and
        data.get('commodity') in ALERT_COMMODITIES and
        data.get('value', 0) >= PRICE_THRESHOLD):
        try:
            send_slack_alert(data)
            print(f"Alert sent for {data['commodity']}: ${data['value']}")
        except Exception as e:
            print(f"Error sending Slack alert: {e}")

    return 'OK', 200

if __name__ == '__main__':
    app.run(port=3000)

Recipe 2: Trading Signal Generator

Generate buy/sell signals based on price movements and send to your trading system.

Overview

flowchart LR
    OPA[OilPriceAPI] -->|price.updated| WH[Signal Generator]
    WH -->|Calculate indicators| CALC[RSI, Moving Avg]
    CALC -->|Generate signal| SIG{Buy/Sell/Hold}
    SIG -->|Buy| BUY[Place Buy Order]
    SIG -->|Sell| SELL[Place Sell Order]
    SIG -->|Hold| HOLD[Log Only]

JavaScript Implementation

const express = require("express");
const crypto = require("crypto");

const app = express();
app.use(express.json());

// Price history for indicator calculation
const priceHistory = new Map(); // commodity -> price array
const HISTORY_SIZE = 14; // For RSI calculation

// Configuration
const WEBHOOK_SECRET = process.env.OILPRICE_WEBHOOK_SECRET;
const TRADING_API_URL = process.env.TRADING_API_URL;
const TRADING_API_KEY = process.env.TRADING_API_KEY;

// RSI thresholds
const RSI_OVERSOLD = 30;
const RSI_OVERBOUGHT = 70;

// Calculate RSI
function calculateRSI(prices) {
  if (prices.length < HISTORY_SIZE) return null;

  const recent = prices.slice(-HISTORY_SIZE);
  let gains = 0;
  let losses = 0;

  for (let i = 1; i < recent.length; i++) {
    const change = recent[i] - recent[i - 1];
    if (change >= 0) {
      gains += change;
    } else {
      losses += Math.abs(change);
    }
  }

  const avgGain = gains / (HISTORY_SIZE - 1);
  const avgLoss = losses / (HISTORY_SIZE - 1);

  if (avgLoss === 0) return 100;

  const rs = avgGain / avgLoss;
  return 100 - 100 / (1 + rs);
}

// Calculate Simple Moving Average
function calculateSMA(prices, period) {
  if (prices.length < period) return null;
  const recent = prices.slice(-period);
  return recent.reduce((a, b) => a + b, 0) / period;
}

// Generate trading signal
function generateSignal(commodity, currentPrice, rsi, sma20) {
  const signal = {
    commodity,
    price: currentPrice,
    rsi,
    sma20,
    action: "HOLD",
    confidence: 0,
    reason: "",
  };

  // RSI-based signals
  if (rsi !== null) {
    if (rsi < RSI_OVERSOLD) {
      signal.action = "BUY";
      signal.confidence = Math.min((RSI_OVERSOLD - rsi) / RSI_OVERSOLD, 1);
      signal.reason = `RSI oversold at ${rsi.toFixed(2)}`;
    } else if (rsi > RSI_OVERBOUGHT) {
      signal.action = "SELL";
      signal.confidence = Math.min(
        (rsi - RSI_OVERBOUGHT) / (100 - RSI_OVERBOUGHT),
        1,
      );
      signal.reason = `RSI overbought at ${rsi.toFixed(2)}`;
    }
  }

  // SMA crossover confirmation
  if (sma20 !== null && signal.action !== "HOLD") {
    if (signal.action === "BUY" && currentPrice > sma20) {
      signal.confidence = Math.min(signal.confidence + 0.2, 1);
      signal.reason += ", price above SMA20";
    } else if (signal.action === "SELL" && currentPrice < sma20) {
      signal.confidence = Math.min(signal.confidence + 0.2, 1);
      signal.reason += ", price below SMA20";
    }
  }

  return signal;
}

// Execute trade via trading API
async function executeTrade(signal) {
  if (signal.action === "HOLD" || signal.confidence < 0.5) {
    console.log(
      `Signal: ${signal.action} ${signal.commodity} (confidence: ${signal.confidence.toFixed(2)}) - Not executing`,
    );
    return null;
  }

  const order = {
    symbol: signal.commodity,
    side: signal.action,
    type: "MARKET",
    quantity: calculatePositionSize(signal),
    metadata: {
      rsi: signal.rsi,
      confidence: signal.confidence,
      reason: signal.reason,
    },
  };

  console.log(
    `Executing ${signal.action} order for ${signal.commodity}:`,
    order,
  );

  const response = await fetch(`${TRADING_API_URL}/orders`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${TRADING_API_KEY}`,
    },
    body: JSON.stringify(order),
  });

  return response.json();
}

function calculatePositionSize(signal) {
  // Scale position size by confidence
  const baseSize = 100;
  return Math.floor(baseSize * signal.confidence);
}

// Webhook endpoint
app.post("/webhook/trading", async (req, res) => {
  const signature = req.headers["x-oilpriceapi-signature"];
  const timestamp = req.headers["x-oilpriceapi-signature-timestamp"];

  // Verify signature (implementation same as Recipe 1)
  // ...

  res.status(200).send("OK");

  try {
    const { type, data } = req.body;

    if (type !== "price.updated") return;

    const commodity = data.commodity;
    const price = data.value;

    // Update price history
    if (!priceHistory.has(commodity)) {
      priceHistory.set(commodity, []);
    }
    const history = priceHistory.get(commodity);
    history.push(price);

    // Keep history bounded
    if (history.length > 100) {
      history.shift();
    }

    // Calculate indicators
    const rsi = calculateRSI(history);
    const sma20 = calculateSMA(history, 20);

    // Generate and potentially execute signal
    const signal = generateSignal(commodity, price, rsi, sma20);

    if (signal.action !== "HOLD") {
      const result = await executeTrade(signal);
      console.log("Trade result:", result);
    }
  } catch (error) {
    console.error("Error processing trading signal:", error);
  }
});

app.listen(3000);

Python Implementation

from flask import Flask, request
from collections import defaultdict
import requests
import os

app = Flask(__name__)

# Price history for indicators
price_history = defaultdict(list)
HISTORY_SIZE = 14

# Configuration
TRADING_API_URL = os.environ['TRADING_API_URL']
TRADING_API_KEY = os.environ['TRADING_API_KEY']
RSI_OVERSOLD = 30
RSI_OVERBOUGHT = 70

def calculate_rsi(prices):
    """Calculate Relative Strength Index."""
    if len(prices) < HISTORY_SIZE:
        return None

    recent = prices[-HISTORY_SIZE:]
    gains = []
    losses = []

    for i in range(1, len(recent)):
        change = recent[i] - recent[i - 1]
        if change >= 0:
            gains.append(change)
            losses.append(0)
        else:
            gains.append(0)
            losses.append(abs(change))

    avg_gain = sum(gains) / len(gains)
    avg_loss = sum(losses) / len(losses)

    if avg_loss == 0:
        return 100

    rs = avg_gain / avg_loss
    return 100 - (100 / (1 + rs))

def calculate_sma(prices, period):
    """Calculate Simple Moving Average."""
    if len(prices) < period:
        return None
    return sum(prices[-period:]) / period

def generate_signal(commodity, current_price, rsi, sma20):
    """Generate trading signal based on indicators."""
    signal = {
        'commodity': commodity,
        'price': current_price,
        'rsi': rsi,
        'sma20': sma20,
        'action': 'HOLD',
        'confidence': 0,
        'reason': ''
    }

    if rsi is not None:
        if rsi < RSI_OVERSOLD:
            signal['action'] = 'BUY'
            signal['confidence'] = min((RSI_OVERSOLD - rsi) / RSI_OVERSOLD, 1)
            signal['reason'] = f'RSI oversold at {rsi:.2f}'
        elif rsi > RSI_OVERBOUGHT:
            signal['action'] = 'SELL'
            signal['confidence'] = min((rsi - RSI_OVERBOUGHT) / (100 - RSI_OVERBOUGHT), 1)
            signal['reason'] = f'RSI overbought at {rsi:.2f}'

    # SMA confirmation
    if sma20 is not None and signal['action'] != 'HOLD':
        if signal['action'] == 'BUY' and current_price > sma20:
            signal['confidence'] = min(signal['confidence'] + 0.2, 1)
            signal['reason'] += ', price above SMA20'
        elif signal['action'] == 'SELL' and current_price < sma20:
            signal['confidence'] = min(signal['confidence'] + 0.2, 1)
            signal['reason'] += ', price below SMA20'

    return signal

def execute_trade(signal):
    """Execute trade via trading API."""
    if signal['action'] == 'HOLD' or signal['confidence'] < 0.5:
        print(f"Signal: {signal['action']} {signal['commodity']} (confidence: {signal['confidence']:.2f}) - Not executing")
        return None

    order = {
        'symbol': signal['commodity'],
        'side': signal['action'],
        'type': 'MARKET',
        'quantity': int(100 * signal['confidence']),
        'metadata': {
            'rsi': signal['rsi'],
            'confidence': signal['confidence'],
            'reason': signal['reason']
        }
    }

    print(f"Executing {signal['action']} order for {signal['commodity']}: {order}")

    response = requests.post(
        f"{TRADING_API_URL}/orders",
        json=order,
        headers={'Authorization': f'Bearer {TRADING_API_KEY}'}
    )
    return response.json()

@app.route('/webhook/trading', methods=['POST'])
def handle_webhook():
    # Verify signature (same as Recipe 1)
    # ...

    event = request.json
    if event.get('type') != 'price.updated':
        return 'OK', 200

    data = event.get('data', {})
    commodity = data.get('commodity')
    price = data.get('value')

    # Update history
    price_history[commodity].append(price)
    if len(price_history[commodity]) > 100:
        price_history[commodity].pop(0)

    # Calculate indicators
    history = price_history[commodity]
    rsi = calculate_rsi(history)
    sma20 = calculate_sma(history, 20)

    # Generate signal
    signal = generate_signal(commodity, price, rsi, sma20)

    if signal['action'] != 'HOLD':
        try:
            result = execute_trade(signal)
            print(f"Trade result: {result}")
        except Exception as e:
            print(f"Trade execution error: {e}")

    return 'OK', 200

if __name__ == '__main__':
    app.run(port=3000)

Recipe 3: Drilling Activity Dashboard

Build a real-time dashboard for drilling activity monitoring.

Overview

flowchart LR
    OPA[OilPriceAPI] -->|drilling events| WH[Dashboard Backend]
    WH -->|Update database| DB[(PostgreSQL)]
    WH -->|Broadcast| WS[WebSocket Server]
    WS -->|Push updates| DASH[Dashboard UI]

JavaScript Implementation

const express = require("express");
const { Server } = require("socket.io");
const { Pool } = require("pg");
const http = require("http");

const app = express();
const server = http.createServer(app);
const io = new Server(server, { cors: { origin: "*" } });

app.use(express.json());

// Database connection
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});

// Initialize database tables
async function initDatabase() {
  await pool.query(`
    CREATE TABLE IF NOT EXISTS rig_counts (
      id SERIAL PRIMARY KEY,
      region VARCHAR(100),
      value INTEGER,
      oil_rigs INTEGER,
      gas_rigs INTEGER,
      change INTEGER,
      change_percent DECIMAL(5,2),
      source VARCHAR(100),
      recorded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
    );

    CREATE TABLE IF NOT EXISTS well_permits (
      id SERIAL PRIMARY KEY,
      api_number VARCHAR(50) UNIQUE,
      state_code VARCHAR(2),
      county VARCHAR(100),
      operator_name VARCHAR(200),
      well_name VARCHAR(200),
      permit_type VARCHAR(50),
      formation VARCHAR(100),
      latitude DECIMAL(10,6),
      longitude DECIMAL(10,6),
      permit_date DATE,
      recorded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
    );

    CREATE INDEX IF NOT EXISTS idx_rig_counts_recorded ON rig_counts(recorded_at);
    CREATE INDEX IF NOT EXISTS idx_permits_state ON well_permits(state_code);
  `);
}

// Store rig count data
async function storeRigCount(data) {
  const result = await pool.query(
    `
    INSERT INTO rig_counts (region, value, oil_rigs, gas_rigs, change, change_percent, source)
    VALUES ($1, $2, $3, $4, $5, $6, $7)
    RETURNING *
  `,
    [
      data.region,
      data.value,
      data.breakdown?.oil_rigs || 0,
      data.breakdown?.gas_rigs || 0,
      data.change,
      data.change_percent,
      data.source,
    ],
  );
  return result.rows[0];
}

// Store well permit data
async function storeWellPermit(data) {
  const result = await pool.query(
    `
    INSERT INTO well_permits
      (api_number, state_code, county, operator_name, well_name,
       permit_type, formation, latitude, longitude, permit_date)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    ON CONFLICT (api_number) DO UPDATE SET
      operator_name = EXCLUDED.operator_name,
      permit_type = EXCLUDED.permit_type
    RETURNING *
  `,
    [
      data.api_number,
      data.state_code,
      data.location?.county || data.county,
      data.operator?.name,
      data.well?.name,
      data.permit_type,
      data.target?.formation,
      data.location?.latitude,
      data.location?.longitude,
      data.permit_date,
    ],
  );
  return result.rows[0];
}

// Get dashboard statistics
async function getDashboardStats() {
  const [rigCounts, permitStats, recentPermits] = await Promise.all([
    pool.query(`
      SELECT DISTINCT ON (region) *
      FROM rig_counts
      ORDER BY region, recorded_at DESC
    `),
    pool.query(`
      SELECT state_code, COUNT(*) as count
      FROM well_permits
      WHERE permit_date >= NOW() - INTERVAL '30 days'
      GROUP BY state_code
      ORDER BY count DESC
      LIMIT 10
    `),
    pool.query(`
      SELECT * FROM well_permits
      ORDER BY recorded_at DESC
      LIMIT 20
    `),
  ]);

  return {
    rigCounts: rigCounts.rows,
    permitsByState: permitStats.rows,
    recentPermits: recentPermits.rows,
  };
}

// Webhook endpoint
app.post("/webhook/drilling", async (req, res) => {
  // Verify signature...
  res.status(200).send("OK");

  try {
    const { type, data } = req.body;
    let stored;

    switch (type) {
      case "drilling.rig_count.updated":
        stored = await storeRigCount(data);
        io.emit("rig_count_update", stored);
        console.log(`Rig count update: ${data.region} = ${data.value}`);
        break;

      case "drilling.well_permit.new":
        stored = await storeWellPermit(data);
        io.emit("new_permit", stored);
        console.log(`New permit: ${data.api_number} in ${data.state_code}`);
        break;

      case "drilling.frac_spread.updated":
        io.emit("frac_spread_update", data);
        console.log(`Frac spread update: ${data.region} = ${data.value}`);
        break;
    }
  } catch (error) {
    console.error("Error processing drilling webhook:", error);
  }
});

// API endpoint for dashboard initial load
app.get("/api/dashboard", async (req, res) => {
  const stats = await getDashboardStats();
  res.json(stats);
});

// WebSocket connection handling
io.on("connection", async (socket) => {
  console.log("Dashboard client connected");

  // Send initial data
  const stats = await getDashboardStats();
  socket.emit("initial_data", stats);

  socket.on("disconnect", () => {
    console.log("Dashboard client disconnected");
  });
});

initDatabase().then(() => {
  server.listen(3000, () =>
    console.log("Dashboard server running on port 3000"),
  );
});

Python Implementation

from flask import Flask, request, jsonify
from flask_socketio import SocketIO, emit
import psycopg2
from psycopg2.extras import RealDictCursor
import os

app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")

# Database connection
def get_db():
    return psycopg2.connect(
        os.environ['DATABASE_URL'],
        cursor_factory=RealDictCursor
    )

def init_database():
    """Initialize database tables."""
    conn = get_db()
    cur = conn.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS rig_counts (
            id SERIAL PRIMARY KEY,
            region VARCHAR(100),
            value INTEGER,
            oil_rigs INTEGER,
            gas_rigs INTEGER,
            change INTEGER,
            change_percent DECIMAL(5,2),
            source VARCHAR(100),
            recorded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
        );

        CREATE TABLE IF NOT EXISTS well_permits (
            id SERIAL PRIMARY KEY,
            api_number VARCHAR(50) UNIQUE,
            state_code VARCHAR(2),
            county VARCHAR(100),
            operator_name VARCHAR(200),
            well_name VARCHAR(200),
            permit_type VARCHAR(50),
            formation VARCHAR(100),
            latitude DECIMAL(10,6),
            longitude DECIMAL(10,6),
            permit_date DATE,
            recorded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
        );
    """)
    conn.commit()
    cur.close()
    conn.close()

def store_rig_count(data):
    """Store rig count in database."""
    conn = get_db()
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO rig_counts (region, value, oil_rigs, gas_rigs, change, change_percent, source)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        RETURNING *
    """, (
        data.get('region'),
        data.get('value'),
        data.get('breakdown', {}).get('oil_rigs', 0),
        data.get('breakdown', {}).get('gas_rigs', 0),
        data.get('change'),
        data.get('change_percent'),
        data.get('source')
    ))
    result = cur.fetchone()
    conn.commit()
    cur.close()
    conn.close()
    return result

def store_well_permit(data):
    """Store well permit in database."""
    conn = get_db()
    cur = conn.cursor()

    location = data.get('location', {})
    operator = data.get('operator', {})
    well = data.get('well', {})
    target = data.get('target', {})

    cur.execute("""
        INSERT INTO well_permits
            (api_number, state_code, county, operator_name, well_name,
             permit_type, formation, latitude, longitude, permit_date)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (api_number) DO UPDATE SET
            operator_name = EXCLUDED.operator_name,
            permit_type = EXCLUDED.permit_type
        RETURNING *
    """, (
        data.get('api_number'),
        data.get('state_code'),
        location.get('county', data.get('county')),
        operator.get('name'),
        well.get('name'),
        data.get('permit_type'),
        target.get('formation'),
        location.get('latitude'),
        location.get('longitude'),
        data.get('permit_date')
    ))
    result = cur.fetchone()
    conn.commit()
    cur.close()
    conn.close()
    return result

def get_dashboard_stats():
    """Get current dashboard statistics."""
    conn = get_db()
    cur = conn.cursor()

    # Get latest rig counts by region
    cur.execute("""
        SELECT DISTINCT ON (region) *
        FROM rig_counts
        ORDER BY region, recorded_at DESC
    """)
    rig_counts = cur.fetchall()

    # Get permits by state (last 30 days)
    cur.execute("""
        SELECT state_code, COUNT(*) as count
        FROM well_permits
        WHERE permit_date >= NOW() - INTERVAL '30 days'
        GROUP BY state_code
        ORDER BY count DESC
        LIMIT 10
    """)
    permits_by_state = cur.fetchall()

    # Get recent permits
    cur.execute("""
        SELECT * FROM well_permits
        ORDER BY recorded_at DESC
        LIMIT 20
    """)
    recent_permits = cur.fetchall()

    cur.close()
    conn.close()

    return {
        'rigCounts': rig_counts,
        'permitsByState': permits_by_state,
        'recentPermits': recent_permits
    }

@app.route('/webhook/drilling', methods=['POST'])
def handle_webhook():
    # Verify signature...

    event = request.json
    event_type = event.get('type')
    data = event.get('data', {})

    if event_type == 'drilling.rig_count.updated':
        stored = store_rig_count(data)
        socketio.emit('rig_count_update', stored)
        print(f"Rig count update: {data.get('region')} = {data.get('value')}")

    elif event_type == 'drilling.well_permit.new':
        stored = store_well_permit(data)
        socketio.emit('new_permit', stored)
        print(f"New permit: {data.get('api_number')} in {data.get('state_code')}")

    elif event_type == 'drilling.frac_spread.updated':
        socketio.emit('frac_spread_update', data)
        print(f"Frac spread update: {data.get('region')} = {data.get('value')}")

    return 'OK', 200

@app.route('/api/dashboard')
def dashboard_api():
    stats = get_dashboard_stats()
    return jsonify(stats)

@socketio.on('connect')
def handle_connect():
    print('Dashboard client connected')
    stats = get_dashboard_stats()
    emit('initial_data', stats)

if __name__ == '__main__':
    init_database()
    socketio.run(app, port=3000)

Recipe 4: Supply Chain Cost Tracker

Automatically update logistics and procurement costs when fuel prices change.

JavaScript Implementation

const express = require("express");
const app = express();
app.use(express.json());

// Simulated cost centers with fuel exposure
const costCenters = {
  LOGISTICS_FLEET: {
    name: "Logistics Fleet",
    fuelType: "diesel",
    monthlyGallons: 50000,
    commodityCode: "DIESEL_USD",
    stakeholders: ["logistics@company.com", "finance@company.com"],
  },
  AIR_FREIGHT: {
    name: "Air Freight",
    fuelType: "jet_fuel",
    monthlyGallons: 20000,
    commodityCode: "JET_FUEL_USD",
    stakeholders: ["supply-chain@company.com"],
  },
  MARINE_SHIPPING: {
    name: "Marine Shipping",
    fuelType: "bunker",
    monthlyGallons: 100000,
    commodityCode: "BUNKER_FUEL_USD",
    stakeholders: ["shipping@company.com"],
  },
};

// ERP integration mock
async function updateERPCostCenter(costCenter, newCost) {
  console.log(
    `ERP Update: ${costCenter.name} monthly cost = $${newCost.toFixed(2)}`,
  );

  // In production, call your ERP API
  // await fetch(`${ERP_API_URL}/cost-centers/${costCenter.id}`, {
  //   method: 'PATCH',
  //   body: JSON.stringify({ projected_monthly_cost: newCost })
  // });

  return { updated: true, newCost };
}

// Email notification
async function notifyStakeholders(costCenter, priceChange, costImpact) {
  const subject = `Fuel Cost Alert: ${costCenter.name}`;
  const body = `
    Fuel prices for ${costCenter.fuelType} have changed by ${priceChange.toFixed(2)}%.

    Impact on ${costCenter.name}:
    - Monthly gallons: ${costCenter.monthlyGallons.toLocaleString()}
    - Previous monthly cost: $${costImpact.previousCost.toLocaleString()}
    - New monthly cost: $${costImpact.newCost.toLocaleString()}
    - Cost change: $${costImpact.costChange.toLocaleString()} (${costImpact.changePercent.toFixed(2)}%)
  `;

  console.log(`Notifying: ${costCenter.stakeholders.join(", ")}`);
  console.log(body);

  // In production, send actual emails
  // await sendEmail(costCenter.stakeholders, subject, body);
}

// Store historical prices for cost change calculation
const priceHistory = new Map();

app.post("/webhook/supply-chain", async (req, res) => {
  res.status(200).send("OK");

  const { type, data } = req.body;

  if (type !== "price.updated") return;

  // Find affected cost centers
  const affectedCenters = Object.values(costCenters).filter(
    (cc) => cc.commodityCode === data.commodity,
  );

  if (affectedCenters.length === 0) return;

  // Get previous price for comparison
  const previousPrice = priceHistory.get(data.commodity) || data.previous_value;
  const currentPrice = data.value;
  const priceChangePercent =
    ((currentPrice - previousPrice) / previousPrice) * 100;

  // Update price history
  priceHistory.set(data.commodity, currentPrice);

  // Process each affected cost center
  for (const costCenter of affectedCenters) {
    const previousCost = previousPrice * costCenter.monthlyGallons;
    const newCost = currentPrice * costCenter.monthlyGallons;
    const costChange = newCost - previousCost;

    const costImpact = {
      previousCost,
      newCost,
      costChange,
      changePercent: (costChange / previousCost) * 100,
    };

    // Update ERP
    await updateERPCostCenter(costCenter, newCost);

    // Notify if significant change (>2%)
    if (Math.abs(costImpact.changePercent) >= 2) {
      await notifyStakeholders(costCenter, priceChangePercent, costImpact);
    }

    console.log(
      `${costCenter.name}: Cost ${costChange >= 0 ? "increased" : "decreased"} by $${Math.abs(costChange).toFixed(2)}`,
    );
  }
});

app.listen(3000);

Recipe 5: Audit Log System

Store all webhook events for compliance and analysis.

JavaScript Implementation

const express = require("express");
const { Pool } = require("pg");
const crypto = require("crypto");

const app = express();
app.use(express.json({ limit: "10mb" })); // Large payloads possible

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Initialize audit tables
async function initAuditTables() {
  await pool.query(`
    CREATE TABLE IF NOT EXISTS webhook_audit_log (
      id SERIAL PRIMARY KEY,
      event_id VARCHAR(50) UNIQUE NOT NULL,
      event_type VARCHAR(100) NOT NULL,
      payload JSONB NOT NULL,
      signature VARCHAR(64) NOT NULL,
      timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
      received_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
      processed_at TIMESTAMP WITH TIME ZONE,
      processing_status VARCHAR(20) DEFAULT 'pending',
      processing_error TEXT,
      payload_hash VARCHAR(64) NOT NULL
    );

    CREATE INDEX IF NOT EXISTS idx_audit_event_type ON webhook_audit_log(event_type);
    CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON webhook_audit_log(timestamp);
    CREATE INDEX IF NOT EXISTS idx_audit_received ON webhook_audit_log(received_at);

    -- Partition by month for performance (PostgreSQL 12+)
    -- CREATE TABLE webhook_audit_log_2025_01 PARTITION OF webhook_audit_log
    --   FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
  `);
}

// Calculate payload hash for integrity verification
function calculatePayloadHash(payload) {
  return crypto
    .createHash("sha256")
    .update(JSON.stringify(payload))
    .digest("hex");
}

// Store webhook event
async function storeAuditLog(
  eventId,
  eventType,
  payload,
  signature,
  timestamp,
) {
  const payloadHash = calculatePayloadHash(payload);

  try {
    await pool.query(
      `
      INSERT INTO webhook_audit_log
        (event_id, event_type, payload, signature, timestamp, payload_hash)
      VALUES ($1, $2, $3, $4, $5, $6)
      ON CONFLICT (event_id) DO NOTHING
    `,
      [eventId, eventType, payload, signature, timestamp, payloadHash],
    );

    return { stored: true, duplicate: false };
  } catch (error) {
    if (error.code === "23505") {
      // Unique violation
      return { stored: false, duplicate: true };
    }
    throw error;
  }
}

// Update processing status
async function updateProcessingStatus(eventId, status, error = null) {
  await pool.query(
    `
    UPDATE webhook_audit_log
    SET processing_status = $1,
        processed_at = NOW(),
        processing_error = $2
    WHERE event_id = $3
  `,
    [status, error, eventId],
  );
}

// Query audit log
async function queryAuditLog(filters) {
  let query = `
    SELECT event_id, event_type, timestamp, processing_status, received_at
    FROM webhook_audit_log
    WHERE 1=1
  `;
  const params = [];
  let paramIndex = 1;

  if (filters.eventType) {
    query += ` AND event_type = $${paramIndex++}`;
    params.push(filters.eventType);
  }

  if (filters.startDate) {
    query += ` AND timestamp >= $${paramIndex++}`;
    params.push(filters.startDate);
  }

  if (filters.endDate) {
    query += ` AND timestamp <= $${paramIndex++}`;
    params.push(filters.endDate);
  }

  if (filters.status) {
    query += ` AND processing_status = $${paramIndex++}`;
    params.push(filters.status);
  }

  query += ` ORDER BY timestamp DESC LIMIT 100`;

  const result = await pool.query(query, params);
  return result.rows;
}

// Webhook endpoint with full audit logging
app.post("/webhook/audit", async (req, res) => {
  const eventId = req.headers["x-oilpriceapi-event-id"];
  const eventType = req.headers["x-oilpriceapi-event"];
  const signature = req.headers["x-oilpriceapi-signature"];
  const timestamp = req.headers["x-oilpriceapi-signature-timestamp"];

  // Store immediately, before processing
  const { stored, duplicate } = await storeAuditLog(
    eventId,
    eventType,
    req.body,
    signature,
    new Date(parseInt(timestamp) * 1000),
  );

  if (duplicate) {
    console.log(`Duplicate event received: ${eventId}`);
    return res.status(200).send("OK");
  }

  // Acknowledge receipt
  res.status(200).send("OK");

  // Process asynchronously
  try {
    // Your business logic here...
    await processEvent(req.body);

    await updateProcessingStatus(eventId, "completed");
    console.log(`Event ${eventId} processed successfully`);
  } catch (error) {
    await updateProcessingStatus(eventId, "failed", error.message);
    console.error(`Event ${eventId} processing failed:`, error);
  }
});

// API to query audit logs
app.get("/api/audit", async (req, res) => {
  const events = await queryAuditLog({
    eventType: req.query.type,
    startDate: req.query.start,
    endDate: req.query.end,
    status: req.query.status,
  });
  res.json({ events, count: events.length });
});

// API to get specific event details
app.get("/api/audit/:eventId", async (req, res) => {
  const result = await pool.query(
    "SELECT * FROM webhook_audit_log WHERE event_id = $1",
    [req.params.eventId],
  );
  if (result.rows.length === 0) {
    return res.status(404).json({ error: "Event not found" });
  }
  res.json(result.rows[0]);
});

// API to verify payload integrity
app.get("/api/audit/:eventId/verify", async (req, res) => {
  const result = await pool.query(
    "SELECT payload, payload_hash FROM webhook_audit_log WHERE event_id = $1",
    [req.params.eventId],
  );

  if (result.rows.length === 0) {
    return res.status(404).json({ error: "Event not found" });
  }

  const { payload, payload_hash } = result.rows[0];
  const calculatedHash = calculatePayloadHash(payload);
  const valid = calculatedHash === payload_hash;

  res.json({
    eventId: req.params.eventId,
    integrityValid: valid,
    storedHash: payload_hash,
    calculatedHash,
  });
});

async function processEvent(event) {
  // Simulate processing
  await new Promise((r) => setTimeout(r, 100));
}

initAuditTables().then(() => {
  app.listen(3000, () => console.log("Audit server running on port 3000"));
});

Recipe 6: Multi-Tenant Router

Route webhook events to different customer endpoints based on configuration.

JavaScript Implementation

const express = require("express");
const crypto = require("crypto");
const app = express();
app.use(express.json());

// Tenant configuration (in production, load from database)
const tenants = {
  tenant_acme: {
    name: "Acme Corp",
    webhookUrl: "https://acme.example.com/oilprice-webhook",
    webhookSecret: "acme_secret_123",
    commodityFilters: ["BRENT_CRUDE_USD", "WTI_USD"],
    eventFilters: ["price.updated", "price.significant_change"],
    retryEnabled: true,
  },
  tenant_globex: {
    name: "Globex Inc",
    webhookUrl: "https://api.globex.example.com/hooks/oil",
    webhookSecret: "globex_secret_456",
    commodityFilters: ["NATURAL_GAS_USD"],
    eventFilters: ["price.updated"],
    retryEnabled: true,
  },
  tenant_initech: {
    name: "Initech",
    webhookUrl: "https://initech.example.com/api/webhooks",
    webhookSecret: "initech_secret_789",
    commodityFilters: [], // All commodities
    eventFilters: ["drilling.rig_count.updated", "drilling.well_permit.new"],
    stateFilters: ["TX", "OK"],
    retryEnabled: false,
  },
};

// Check if event matches tenant filters
function eventMatchesTenant(event, tenant) {
  // Check event type filter
  if (
    tenant.eventFilters.length > 0 &&
    !tenant.eventFilters.includes(event.type)
  ) {
    return false;
  }

  // Check commodity filter
  if (
    tenant.commodityFilters.length > 0 &&
    event.data?.commodity &&
    !tenant.commodityFilters.includes(event.data.commodity)
  ) {
    return false;
  }

  // Check state filter (for drilling events)
  if (
    tenant.stateFilters &&
    tenant.stateFilters.length > 0 &&
    event.data?.state_code &&
    !tenant.stateFilters.includes(event.data.state_code)
  ) {
    return false;
  }

  return true;
}

// Sign payload for tenant
function signPayloadForTenant(payload, secret) {
  const timestamp = Math.floor(Date.now() / 1000);
  const message = `${JSON.stringify(payload)}.${timestamp}`;
  const signature = crypto
    .createHmac("sha256", secret)
    .update(message)
    .digest("hex");
  return { signature, timestamp };
}

// Forward event to tenant
async function forwardToTenant(tenant, event) {
  const { signature, timestamp } = signPayloadForTenant(
    event,
    tenant.webhookSecret,
  );

  const response = await fetch(tenant.webhookUrl, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "X-OilPriceAPI-Event": event.type,
      "X-OilPriceAPI-Event-ID": event.id,
      "X-OilPriceAPI-Signature": signature,
      "X-OilPriceAPI-Signature-Timestamp": timestamp.toString(),
      "X-Forwarded-By": "MultiTenantRouter",
    },
    body: JSON.stringify(event),
    signal: AbortSignal.timeout(30000),
  });

  return {
    success: response.ok,
    statusCode: response.status,
    tenantName: tenant.name,
  };
}

// Retry failed delivery
async function retryDelivery(tenant, event, attempts = 3) {
  const delays = [1000, 5000, 15000];

  for (let i = 0; i < attempts; i++) {
    try {
      const result = await forwardToTenant(tenant, event);
      if (result.success) {
        return result;
      }
    } catch (error) {
      console.error(
        `Delivery attempt ${i + 1} failed for ${tenant.name}:`,
        error.message,
      );
    }

    if (i < attempts - 1) {
      await new Promise((r) => setTimeout(r, delays[i]));
    }
  }

  return {
    success: false,
    tenantName: tenant.name,
    error: "Max retries exceeded",
  };
}

// Main webhook endpoint
app.post("/webhook/router", async (req, res) => {
  // Verify signature from OilPriceAPI (same as other recipes)
  // ...

  // Acknowledge immediately
  res.status(200).send("OK");

  const event = req.body;

  // Route to matching tenants in parallel
  const deliveryPromises = Object.entries(tenants)
    .filter(([_, tenant]) => eventMatchesTenant(event, tenant))
    .map(async ([tenantId, tenant]) => {
      console.log(`Routing ${event.type} to ${tenant.name}`);

      if (tenant.retryEnabled) {
        return retryDelivery(tenant, event);
      } else {
        try {
          return await forwardToTenant(tenant, event);
        } catch (error) {
          return {
            success: false,
            tenantName: tenant.name,
            error: error.message,
          };
        }
      }
    });

  // Wait for all deliveries
  const results = await Promise.allSettled(deliveryPromises);

  // Log results
  results.forEach((result, index) => {
    if (result.status === "fulfilled") {
      const { success, tenantName, error } = result.value;
      if (success) {
        console.log(`Delivered to ${tenantName}`);
      } else {
        console.error(`Failed to deliver to ${tenantName}: ${error}`);
      }
    }
  });
});

// API to manage tenant configuration
app.get("/api/tenants", (req, res) => {
  const summary = Object.entries(tenants).map(([id, t]) => ({
    id,
    name: t.name,
    eventFilters: t.eventFilters,
    commodityFilters: t.commodityFilters,
  }));
  res.json({ tenants: summary });
});

app.post("/api/tenants/:id/test", async (req, res) => {
  const tenant = tenants[req.params.id];
  if (!tenant) {
    return res.status(404).json({ error: "Tenant not found" });
  }

  const testEvent = {
    id: `test_${Date.now()}`,
    type: "test.ping",
    data: { message: "Test webhook delivery" },
  };

  try {
    const result = await forwardToTenant(tenant, testEvent);
    res.json(result);
  } catch (error) {
    res.json({ success: false, error: error.message });
  }
});

app.listen(3000, () => console.log("Multi-tenant router running on port 3000"));

Deployment Considerations

Environment Variables

All recipes require these environment variables:

# OilPriceAPI
OILPRICE_WEBHOOK_SECRET=whsec_your_secret_here

# Recipe-specific
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/xxx
TRADING_API_URL=https://api.yourbroker.com
TRADING_API_KEY=your_trading_api_key
DATABASE_URL=postgres://user:pass@host:5432/dbname

Docker Deployment

FROM node:20-slim

WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production

COPY . .

EXPOSE 3000
CMD ["node", "server.js"]

Health Checks

Add health check endpoints to all recipes:

app.get("/health", (req, res) => {
  res.json({ status: "healthy", timestamp: new Date().toISOString() });
});

app.get("/ready", async (req, res) => {
  // Check dependencies
  try {
    await pool.query("SELECT 1"); // Database check
    res.json({ status: "ready" });
  } catch (error) {
    res.status(503).json({ status: "not_ready", error: error.message });
  }
});

Related Documentation

  • Webhook API Reference - Event types and configuration
  • Architecture - System design and security
  • Troubleshooting - Common issues
Last Updated: 2/3/26, 1:30 AM