Webhook Cookbook
This cookbook provides production-ready recipes for common webhook integrations. Each recipe includes complete code in JavaScript and Python, tested patterns, and deployment considerations.
Recipe 1: Slack Price Alerts
Post price alerts to a Slack channel when oil exceeds a threshold.
Overview
flowchart LR
OPA[OilPriceAPI] -->|price.updated| WH[Your Webhook]
WH -->|Check threshold| LOGIC{Price >= $80?}
LOGIC -->|Yes| SLACK[Slack API]
LOGIC -->|No| DROP[Ignore]
SLACK --> CH[#oil-alerts]
JavaScript Implementation
const express = require("express");
const crypto = require("crypto");
const app = express();
app.use(express.json());
// Configuration
// Shared secret used as the HMAC-SHA256 key in verifySignature (read from the environment).
const WEBHOOK_SECRET = process.env.OILPRICE_WEBHOOK_SECRET;
// Slack incoming-webhook URL that sendSlackAlert POSTs to.
const SLACK_WEBHOOK_URL = process.env.SLACK_WEBHOOK_URL;
// An alert is sent when a price is greater than or equal to this value.
const PRICE_THRESHOLD = 80.0;
// Only price updates for these commodity codes can trigger an alert.
const ALERT_COMMODITIES = ["BRENT_CRUDE_USD", "WTI_USD"];
// Signature verification
/**
 * Verify the HMAC-SHA256 signature of an incoming webhook.
 *
 * The expected signature is the hex HMAC of `<JSON.stringify(payload)>.<timestamp>`
 * keyed with WEBHOOK_SECRET.
 *
 * @param {object} payload - Parsed request body.
 * @param {string|undefined} signature - Hex signature from the signature header.
 * @param {string|undefined} timestamp - Unix timestamp (seconds) from the timestamp header.
 * @returns {boolean} true only when the timestamp is at most 5 minutes old and
 *   the signature matches; false for missing or malformed input (never throws for those).
 */
function verifySignature(payload, signature, timestamp) {
  const MAX_AGE_MS = 300000;

  // Missing headers must be a clean rejection, not an exception.
  if (typeof signature !== "string" || timestamp == null) {
    return false;
  }

  // A non-numeric timestamp would otherwise become NaN and skip the age check.
  const sentAtSeconds = Number.parseInt(timestamp, 10);
  if (!Number.isFinite(sentAtSeconds)) {
    return false;
  }
  if (Date.now() - sentAtSeconds * 1000 > MAX_AGE_MS) {
    return false;
  }

  // NOTE(review): the payload is re-serialized here; confirm the sender signs
  // the same JSON.stringify form rather than the raw request bytes.
  const expected = crypto
    .createHmac("sha256", WEBHOOK_SECRET)
    .update(`${JSON.stringify(payload)}.${timestamp}`)
    .digest("hex");

  const expectedBytes = Buffer.from(expected);
  const receivedBytes = Buffer.from(signature);
  // timingSafeEqual throws when the lengths differ, so reject those up front.
  if (expectedBytes.length !== receivedBytes.length) {
    return false;
  }
  return crypto.timingSafeEqual(expectedBytes, receivedBytes);
}
// Format Slack message
/**
 * Build the Slack attachment payload for a price alert.
 *
 * @param {object} price - Price event data; reads `name`, `value`,
 *   `change_percent` and `timestamp`.
 * @returns {object} Slack message with one attachment holding a header block
 *   and a four-field section (price, change, threshold, time).
 */
function formatSlackMessage(price) {
  const isRising = price.change_percent >= 0;
  const trendEmoji = isRising
    ? ":chart_with_upwards_trend:"
    : ":chart_with_downwards_trend:";
  const barColor = isRising ? "#36a64f" : "#dc3545";

  // Every section field shares the same mrkdwn shape.
  const field = (text) => ({ type: "mrkdwn", text });

  const headerBlock = {
    type: "header",
    text: {
      type: "plain_text",
      text: `${trendEmoji} Price Alert: ${price.name}`,
      emoji: true,
    },
  };

  const detailsBlock = {
    type: "section",
    fields: [
      field(`*Current Price:*\n$${price.value.toFixed(2)}`),
      field(
        `*Change:*\n${price.change_percent >= 0 ? "+" : ""}${price.change_percent.toFixed(2)}%`,
      ),
      field(`*Threshold:*\n$${PRICE_THRESHOLD.toFixed(2)}`),
      // Rendered in the server's locale and time zone.
      field(`*Time:*\n${new Date(price.timestamp).toLocaleString()}`),
    ],
  };

  return {
    attachments: [{ color: barColor, blocks: [headerBlock, detailsBlock] }],
  };
}
// Send to Slack
/**
 * POST a formatted price alert to the configured Slack webhook.
 *
 * @param {object} price - Price event data passed to formatSlackMessage.
 * @returns {Promise<void>}
 * @throws {Error} When Slack answers with a non-2xx status.
 */
async function sendSlackAlert(price) {
  const requestBody = JSON.stringify(formatSlackMessage(price));
  const slackResponse = await fetch(SLACK_WEBHOOK_URL, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: requestBody,
  });
  if (slackResponse.ok) {
    return;
  }
  throw new Error(`Slack API error: ${slackResponse.status}`);
}
// Webhook endpoint
// Receives price webhooks: rejects unsigned requests with 401, acknowledges
// valid ones right away, then posts a Slack alert when the event qualifies.
app.post("/webhook/prices", async (req, res) => {
  const { headers, body } = req;
  const isAuthentic = verifySignature(
    body,
    headers["x-oilpriceapi-signature"],
    headers["x-oilpriceapi-signature-timestamp"],
  );
  if (!isAuthentic) {
    return res.status(401).send("Invalid signature");
  }

  // Respond before doing any slow work so the sender is not kept waiting.
  res.status(200).send("OK");

  try {
    const { type, data } = body;
    const shouldAlert =
      type === "price.updated" &&
      ALERT_COMMODITIES.includes(data.commodity) &&
      data.value >= PRICE_THRESHOLD;
    if (!shouldAlert) {
      return;
    }
    await sendSlackAlert(data);
    console.log(`Alert sent for ${data.commodity}: $${data.value}`);
  } catch (error) {
    // The response is already sent, so failures can only be logged.
    console.error("Error processing webhook:", error);
  }
});

app.listen(3000, () => {
  console.log("Webhook server running on port 3000");
});
Python Implementation
from flask import Flask, request, jsonify
import hmac
import hashlib
import time
import json
import requests
import os
app = Flask(__name__)
# Configuration
# Both environment variables are required: os.environ[...] raises KeyError
# at import time when either one is missing.
WEBHOOK_SECRET = os.environ['OILPRICE_WEBHOOK_SECRET']
SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
# An alert is sent when a price is greater than or equal to this value.
PRICE_THRESHOLD = 80.00
# Only price updates for these commodity codes can trigger an alert.
ALERT_COMMODITIES = ['BRENT_CRUDE_USD', 'WTI_USD']
def verify_signature(payload, signature, timestamp):
    """Verify OilPriceAPI webhook signature.

    The expected signature is the hex HMAC-SHA256 of
    ``<compact JSON of payload>.<timestamp>`` keyed with WEBHOOK_SECRET.

    Args:
        payload: Parsed JSON body of the request.
        signature: Hex signature header value, or None when absent.
        timestamp: Unix timestamp (seconds) header value, or None when absent.

    Returns:
        True only when the timestamp is at most 300 seconds old and the
        signature matches. Missing or malformed headers return False instead
        of raising.
    """
    # Absent headers arrive as None; reject them instead of raising TypeError.
    if not isinstance(signature, str) or not signature or not timestamp:
        return False

    try:
        sent_at = int(timestamp)
    except (TypeError, ValueError):
        return False

    if int(time.time()) - sent_at > 300:
        return False

    # NOTE(review): the payload is re-serialized here; confirm the sender
    # signs this compact form rather than the raw request bytes.
    payload_string = json.dumps(payload, separators=(',', ':'))
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        f"{payload_string}.{timestamp}".encode(),
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(expected.encode(), signature.encode())
def format_slack_message(price):
    """Build the Slack attachment payload for a price alert.

    Args:
        price: Mapping with 'name', 'value', 'change_percent' and 'timestamp'.

    Returns:
        A dict with one attachment containing a header block and a four-field
        section (current price, change, threshold, time).
    """
    rising = price['change_percent'] >= 0
    if rising:
        emoji, color, sign = ':chart_with_upwards_trend:', '#36a64f', '+'
    else:
        emoji, color, sign = ':chart_with_downwards_trend:', '#dc3545', ''

    def field(text):
        # Every section field shares the same mrkdwn shape.
        return {'type': 'mrkdwn', 'text': text}

    header_block = {
        'type': 'header',
        'text': {
            'type': 'plain_text',
            'text': f"{emoji} Price Alert: {price['name']}",
            'emoji': True
        }
    }
    details_block = {
        'type': 'section',
        'fields': [
            field(f"*Current Price:*\n${price['value']:.2f}"),
            field(f"*Change:*\n{sign}{price['change_percent']:.2f}%"),
            field(f"*Threshold:*\n${PRICE_THRESHOLD:.2f}"),
            field(f"*Time:*\n{price['timestamp']}"),
        ]
    }
    return {'attachments': [{'color': color, 'blocks': [header_block, details_block]}]}
def send_slack_alert(price):
    """Send alert to Slack.

    Args:
        price: Price event data passed to format_slack_message.

    Raises:
        requests.RequestException: On timeout, connection failure, or a
            non-2xx response from Slack.
    """
    message = format_slack_message(price)
    # requests has no default timeout; without one a stalled Slack connection
    # would block this worker indefinitely.
    response = requests.post(SLACK_WEBHOOK_URL, json=message, timeout=10)
    response.raise_for_status()
@app.route('/webhook/prices', methods=['POST'])
def handle_webhook():
    """Handle a price webhook: verify it, then alert Slack when it qualifies.

    Returns 401 for an invalid signature and 200 otherwise; Slack failures
    are logged and do not change the response.
    """
    headers = request.headers
    signature = headers.get('X-OilPriceAPI-Signature')
    timestamp = headers.get('X-OilPriceAPI-Signature-Timestamp')
    event = request.json

    if not verify_signature(event, signature, timestamp):
        return 'Invalid signature', 401

    data = event.get('data', {})
    should_alert = (
        event.get('type') == 'price.updated'
        and data.get('commodity') in ALERT_COMMODITIES
        and data.get('value', 0) >= PRICE_THRESHOLD
    )
    if should_alert:
        try:
            send_slack_alert(data)
            print(f"Alert sent for {data['commodity']}: ${data['value']}")
        except Exception as e:
            # Best effort: the webhook is still acknowledged.
            print(f"Error sending Slack alert: {e}")

    return 'OK', 200


if __name__ == '__main__':
    app.run(port=3000)
Recipe 2: Trading Signal Generator
Generate buy/sell signals based on price movements and send to your trading system.
Overview
flowchart LR
OPA[OilPriceAPI] -->|price.updated| WH[Signal Generator]
WH -->|Calculate indicators| CALC[RSI, Moving Avg]
CALC -->|Generate signal| SIG{Buy/Sell/Hold}
SIG -->|Buy| BUY[Place Buy Order]
SIG -->|Sell| SELL[Place Sell Order]
SIG -->|Hold| HOLD[Log Only]
JavaScript Implementation
const express = require("express");
const crypto = require("crypto");
const app = express();
app.use(express.json());
// Price history for indicator calculation
// In-memory only: history is lost when the process restarts.
const priceHistory = new Map(); // commodity -> price array
// Number of most recent prices used by calculateRSI; that many prices
// yield HISTORY_SIZE - 1 price changes.
const HISTORY_SIZE = 14; // For RSI calculation
// Configuration
const WEBHOOK_SECRET = process.env.OILPRICE_WEBHOOK_SECRET;
// Base URL and bearer token for the order endpoint used by executeTrade.
const TRADING_API_URL = process.env.TRADING_API_URL;
const TRADING_API_KEY = process.env.TRADING_API_KEY;
// RSI thresholds
// RSI below RSI_OVERSOLD produces a BUY signal; above RSI_OVERBOUGHT a SELL.
const RSI_OVERSOLD = 30;
const RSI_OVERBOUGHT = 70;
// Calculate RSI
/**
 * Compute a simple-average RSI over the last HISTORY_SIZE prices.
 *
 * @param {number[]} prices - Price series, oldest first.
 * @returns {number|null} RSI in [0, 100], or null when fewer than
 *   HISTORY_SIZE prices are available. Returns 100 whenever the window has
 *   no losses, which includes a completely flat series.
 */
function calculateRSI(prices) {
  if (prices.length < HISTORY_SIZE) return null;

  const recentPrices = prices.slice(-HISTORY_SIZE);
  const totals = { gains: 0, losses: 0 };
  recentPrices.slice(1).forEach((price, index) => {
    // index is offset by one, so recentPrices[index] is the previous price.
    const delta = price - recentPrices[index];
    if (delta >= 0) {
      totals.gains += delta;
    } else {
      totals.losses += Math.abs(delta);
    }
  });

  const periods = HISTORY_SIZE - 1;
  const avgGain = totals.gains / periods;
  const avgLoss = totals.losses / periods;
  if (avgLoss === 0) return 100;

  const relativeStrength = avgGain / avgLoss;
  return 100 - 100 / (1 + relativeStrength);
}
// Calculate Simple Moving Average
/**
 * Compute the simple moving average of the last `period` prices.
 *
 * @param {number[]} prices - Price series, oldest first.
 * @param {number} period - Number of trailing prices to average.
 * @returns {number|null} The average, or null when there are fewer than
 *   `period` prices.
 */
function calculateSMA(prices, period) {
  if (prices.length < period) {
    return null;
  }
  let total = 0;
  for (const price of prices.slice(-period)) {
    total += price;
  }
  return total / period;
}
// Generate trading signal
/**
 * Derive a BUY/SELL/HOLD signal from RSI, with an SMA20 confidence boost.
 *
 * @param {string} commodity - Commodity code the signal is for.
 * @param {number} currentPrice - Latest price.
 * @param {number|null} rsi - RSI value, or null when not yet available.
 * @param {number|null} sma20 - 20-period SMA, or null when not yet available.
 * @returns {{commodity: string, price: number, rsi: (number|null),
 *   sma20: (number|null), action: string, confidence: number, reason: string}}
 *   HOLD with confidence 0 unless RSI is below RSI_OVERSOLD (BUY) or above
 *   RSI_OVERBOUGHT (SELL); confidence is capped at 1.
 */
function generateSignal(commodity, currentPrice, rsi, sma20) {
  let action = "HOLD";
  let confidence = 0;
  let reason = "";

  // RSI decides the direction and the base confidence.
  if (rsi !== null && rsi < RSI_OVERSOLD) {
    action = "BUY";
    confidence = Math.min((RSI_OVERSOLD - rsi) / RSI_OVERSOLD, 1);
    reason = `RSI oversold at ${rsi.toFixed(2)}`;
  } else if (rsi !== null && rsi > RSI_OVERBOUGHT) {
    action = "SELL";
    confidence = Math.min((rsi - RSI_OVERBOUGHT) / (100 - RSI_OVERBOUGHT), 1);
    reason = `RSI overbought at ${rsi.toFixed(2)}`;
  }

  // The price's position relative to SMA20 adds 0.2 when it matches the
  // direction: above the average for BUY, below it for SELL.
  if (sma20 !== null) {
    if (action === "BUY" && currentPrice > sma20) {
      confidence = Math.min(confidence + 0.2, 1);
      reason += ", price above SMA20";
    } else if (action === "SELL" && currentPrice < sma20) {
      confidence = Math.min(confidence + 0.2, 1);
      reason += ", price below SMA20";
    }
  }

  return { commodity, price: currentPrice, rsi, sma20, action, confidence, reason };
}
// Execute trade via trading API
/**
 * Submit a market order for an actionable signal.
 *
 * @param {object} signal - Signal produced by generateSignal.
 * @returns {Promise<object|null>} Parsed JSON body of the trading API
 *   response, or null when the signal is HOLD or its confidence is below 0.5.
 */
async function executeTrade(signal) {
  const { action, commodity, confidence, rsi, reason } = signal;

  const shouldSkip = action === "HOLD" || confidence < 0.5;
  if (shouldSkip) {
    console.log(
      `Signal: ${action} ${commodity} (confidence: ${confidence.toFixed(2)}) - Not executing`,
    );
    return null;
  }

  const order = {
    symbol: commodity,
    side: action,
    type: "MARKET",
    quantity: calculatePositionSize(signal),
    metadata: { rsi, confidence, reason },
  };
  console.log(`Executing ${action} order for ${commodity}:`, order);

  const ordersUrl = `${TRADING_API_URL}/orders`;
  const requestHeaders = {
    "Content-Type": "application/json",
    Authorization: `Bearer ${TRADING_API_KEY}`,
  };
  const response = await fetch(ordersUrl, {
    method: "POST",
    headers: requestHeaders,
    body: JSON.stringify(order),
  });
  // The HTTP status is not inspected; the parsed body is returned as-is.
  return response.json();
}
/**
 * Scale a base position of 100 units by the signal's confidence.
 *
 * @param {{confidence: number}} signal - Signal whose confidence is used.
 * @returns {number} Order quantity, rounded down to a whole number.
 */
function calculatePositionSize(signal) {
  const BASE_POSITION_SIZE = 100;
  const { confidence } = signal;
  return Math.floor(BASE_POSITION_SIZE * confidence);
}
// Webhook endpoint
// Receives price updates, maintains a bounded per-commodity price history,
// derives RSI/SMA20 and executes a trade when the resulting signal is not HOLD.
app.post("/webhook/trading", async (req, res) => {
  const signature = req.headers["x-oilpriceapi-signature"];
  const timestamp = req.headers["x-oilpriceapi-signature-timestamp"];
  // Verify signature (implementation same as Recipe 1)
  // ...
  // NOTE(review): until the verification above is filled in, any caller can
  // inject prices that end up driving real orders.
  res.status(200).send("OK");

  try {
    const { type, data } = req.body;
    if (type !== "price.updated") return;

    const commodity = data.commodity;
    const price = data.value;

    // A missing or non-numeric price would turn every later RSI/SMA for this
    // commodity into NaN until it ages out of the history, so drop it here.
    if (typeof commodity !== "string" || !Number.isFinite(price)) {
      console.warn("Ignoring price.updated event with invalid commodity or value");
      return;
    }

    // Update price history
    if (!priceHistory.has(commodity)) {
      priceHistory.set(commodity, []);
    }
    const history = priceHistory.get(commodity);
    history.push(price);

    // Keep history bounded
    if (history.length > 100) {
      history.shift();
    }

    // Calculate indicators
    const rsi = calculateRSI(history);
    const sma20 = calculateSMA(history, 20);

    // Generate and potentially execute signal
    const signal = generateSignal(commodity, price, rsi, sma20);
    if (signal.action !== "HOLD") {
      const result = await executeTrade(signal);
      console.log("Trade result:", result);
    }
  } catch (error) {
    // The response is already sent, so failures can only be logged.
    console.error("Error processing trading signal:", error);
  }
});

app.listen(3000);
Python Implementation
from flask import Flask, request
from collections import defaultdict
import requests
import os
app = Flask(__name__)
# Price history for indicators
# In-memory only: maps commodity -> list of prices, oldest first.
price_history = defaultdict(list)
# Number of most recent prices used by calculate_rsi; that many prices
# yield HISTORY_SIZE - 1 price changes.
HISTORY_SIZE = 14
# Configuration
# Both environment variables are required: os.environ[...] raises KeyError
# at import time when either one is missing.
TRADING_API_URL = os.environ['TRADING_API_URL']
TRADING_API_KEY = os.environ['TRADING_API_KEY']
# RSI below RSI_OVERSOLD produces a BUY signal; above RSI_OVERBOUGHT a SELL.
RSI_OVERSOLD = 30
RSI_OVERBOUGHT = 70
def calculate_rsi(prices):
    """Calculate a simple-average Relative Strength Index.

    Args:
        prices: Price series, oldest first.

    Returns:
        RSI in [0, 100] over the last HISTORY_SIZE prices, or None when fewer
        prices are available. Returns 100 whenever the window has no losses,
        which includes a completely flat series.
    """
    if len(prices) < HISTORY_SIZE:
        return None

    recent = prices[-HISTORY_SIZE:]
    deltas = [later - earlier for earlier, later in zip(recent, recent[1:])]

    # Each change counts toward exactly one side; the other side records 0.
    gains = [delta if delta >= 0 else 0 for delta in deltas]
    losses = [0 if delta >= 0 else abs(delta) for delta in deltas]

    avg_gain = sum(gains) / len(gains)
    avg_loss = sum(losses) / len(losses)
    if avg_loss == 0:
        return 100

    relative_strength = avg_gain / avg_loss
    return 100 - (100 / (1 + relative_strength))
def calculate_sma(prices, period):
    """Calculate Simple Moving Average.

    Args:
        prices: Price series, oldest first.
        period: Number of trailing prices to average.

    Returns:
        The mean of the last `period` prices, or None when there are fewer
        than `period` prices.
    """
    if len(prices) >= period:
        window = prices[-period:]
        return sum(window) / period
    return None
def generate_signal(commodity, current_price, rsi, sma20):
    """Generate trading signal based on indicators.

    Args:
        commodity: Commodity code the signal is for.
        current_price: Latest price.
        rsi: RSI value, or None when not yet available.
        sma20: 20-period SMA, or None when not yet available.

    Returns:
        Dict with 'commodity', 'price', 'rsi', 'sma20', 'action',
        'confidence' and 'reason'. The action is HOLD with confidence 0
        unless RSI is below RSI_OVERSOLD (BUY) or above RSI_OVERBOUGHT
        (SELL); confidence is capped at 1.
    """
    action, confidence, reason = 'HOLD', 0, ''

    # RSI decides the direction and the base confidence.
    if rsi is not None and rsi < RSI_OVERSOLD:
        action = 'BUY'
        confidence = min((RSI_OVERSOLD - rsi) / RSI_OVERSOLD, 1)
        reason = f'RSI oversold at {rsi:.2f}'
    elif rsi is not None and rsi > RSI_OVERBOUGHT:
        action = 'SELL'
        confidence = min((rsi - RSI_OVERBOUGHT) / (100 - RSI_OVERBOUGHT), 1)
        reason = f'RSI overbought at {rsi:.2f}'

    # SMA confirmation: adds 0.2 when the price is above the average for a
    # BUY or below it for a SELL.
    if sma20 is not None:
        if action == 'BUY' and current_price > sma20:
            confidence = min(confidence + 0.2, 1)
            reason += ', price above SMA20'
        elif action == 'SELL' and current_price < sma20:
            confidence = min(confidence + 0.2, 1)
            reason += ', price below SMA20'

    return {
        'commodity': commodity,
        'price': current_price,
        'rsi': rsi,
        'sma20': sma20,
        'action': action,
        'confidence': confidence,
        'reason': reason
    }
def execute_trade(signal):
    """Execute trade via trading API.

    Args:
        signal: Signal dict produced by generate_signal.

    Returns:
        Parsed JSON body of the trading API response, or None when the signal
        is HOLD or its confidence is below 0.5.

    Raises:
        requests.RequestException: On timeout or connection failure.
    """
    if signal['action'] == 'HOLD' or signal['confidence'] < 0.5:
        print(f"Signal: {signal['action']} {signal['commodity']} (confidence: {signal['confidence']:.2f}) - Not executing")
        return None

    order = {
        'symbol': signal['commodity'],
        'side': signal['action'],
        'type': 'MARKET',
        # Position size scales a base of 100 units by confidence, rounded down.
        'quantity': int(100 * signal['confidence']),
        'metadata': {
            'rsi': signal['rsi'],
            'confidence': signal['confidence'],
            'reason': signal['reason']
        }
    }
    print(f"Executing {signal['action']} order for {signal['commodity']}: {order}")

    # requests has no default timeout; without one a stalled trading API
    # would block the webhook worker indefinitely.
    response = requests.post(
        f"{TRADING_API_URL}/orders",
        json=order,
        headers={'Authorization': f'Bearer {TRADING_API_KEY}'},
        timeout=10
    )
    # The HTTP status is not inspected; the parsed body is returned as-is.
    return response.json()
@app.route('/webhook/trading', methods=['POST'])
def handle_webhook():
    """Handle a price update: record it, compute indicators, maybe trade.

    Always answers 200 for well-formed JSON; events that are not
    'price.updated' or that carry no usable commodity/price are ignored.
    """
    # Verify signature (same as Recipe 1)
    # ...
    # NOTE(review): until the verification above is filled in, any caller can
    # inject prices that end up driving real orders.
    event = request.json
    if event.get('type') != 'price.updated':
        return 'OK', 200

    data = event.get('data', {})
    commodity = data.get('commodity')
    price = data.get('value')

    # A missing or non-numeric price stored in the history would make
    # calculate_rsi raise on every later event for this commodity.
    price_is_number = isinstance(price, (int, float)) and not isinstance(price, bool)
    if not commodity or not price_is_number:
        print("Ignoring price.updated event with invalid commodity or value")
        return 'OK', 200

    # Update history (bounded to the latest 100 prices)
    history = price_history[commodity]
    history.append(price)
    if len(history) > 100:
        history.pop(0)

    # Calculate indicators
    rsi = calculate_rsi(history)
    sma20 = calculate_sma(history, 20)

    # Generate signal
    signal = generate_signal(commodity, price, rsi, sma20)
    if signal['action'] != 'HOLD':
        try:
            result = execute_trade(signal)
            print(f"Trade result: {result}")
        except Exception as e:
            # Best effort: the webhook is still acknowledged.
            print(f"Trade execution error: {e}")

    return 'OK', 200


if __name__ == '__main__':
    app.run(port=3000)
Recipe 3: Drilling Activity Dashboard
Build a real-time dashboard for drilling activity monitoring.
Overview
flowchart LR
OPA[OilPriceAPI] -->|drilling events| WH[Dashboard Backend]
WH -->|Update database| DB[(PostgreSQL)]
WH -->|Broadcast| WS[WebSocket Server]
WS -->|Push updates| DASH[Dashboard UI]
JavaScript Implementation
const express = require("express");
const { Server } = require("socket.io");
const { Pool } = require("pg");
const http = require("http");
// Express app wrapped in a plain HTTP server so Socket.IO can share the port.
const app = express();
const server = http.createServer(app);
// NOTE(review): origin "*" accepts Socket.IO connections from any origin;
// confirm this is restricted to the dashboard's origin before deploying.
const io = new Server(server, { cors: { origin: "*" } });
app.use(express.json());
// Database connection
// Pool configured from the DATABASE_URL environment variable.
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
});
// Initialize database tables
/**
 * Create the rig_counts and well_permits tables and their indexes if they
 * do not exist yet.
 *
 * @returns {Promise<void>}
 */
async function initDatabase() {
  // Every statement uses IF NOT EXISTS, so re-running this is harmless.
  const schemaSql = `
    CREATE TABLE IF NOT EXISTS rig_counts (
      id SERIAL PRIMARY KEY,
      region VARCHAR(100),
      value INTEGER,
      oil_rigs INTEGER,
      gas_rigs INTEGER,
      change INTEGER,
      change_percent DECIMAL(5,2),
      source VARCHAR(100),
      recorded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
    );
    CREATE TABLE IF NOT EXISTS well_permits (
      id SERIAL PRIMARY KEY,
      api_number VARCHAR(50) UNIQUE,
      state_code VARCHAR(2),
      county VARCHAR(100),
      operator_name VARCHAR(200),
      well_name VARCHAR(200),
      permit_type VARCHAR(50),
      formation VARCHAR(100),
      latitude DECIMAL(10,6),
      longitude DECIMAL(10,6),
      permit_date DATE,
      recorded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
    );
    CREATE INDEX IF NOT EXISTS idx_rig_counts_recorded ON rig_counts(recorded_at);
    CREATE INDEX IF NOT EXISTS idx_permits_state ON well_permits(state_code);
  `;
  await pool.query(schemaSql);
}
// Store rig count data
/**
 * Insert one rig count record and return the stored row.
 *
 * @param {object} data - Event data; reads `region`, `value`,
 *   `breakdown.oil_rigs`, `breakdown.gas_rigs`, `change`, `change_percent`
 *   and `source`.
 * @returns {Promise<object>} The inserted row.
 */
async function storeRigCount(data) {
  const {
    region,
    value,
    breakdown,
    change,
    change_percent: changePercent,
    source,
  } = data;

  const insertSql = `
    INSERT INTO rig_counts (region, value, oil_rigs, gas_rigs, change, change_percent, source)
    VALUES ($1, $2, $3, $4, $5, $6, $7)
    RETURNING *
  `;
  // Missing breakdown figures are stored as 0.
  const values = [
    region,
    value,
    breakdown?.oil_rigs || 0,
    breakdown?.gas_rigs || 0,
    change,
    changePercent,
    source,
  ];

  const { rows } = await pool.query(insertSql, values);
  return rows[0];
}
// Store well permit data
/**
 * Insert a well permit, or update operator and permit type when the API
 * number already exists, and return the stored row.
 *
 * @param {object} data - Event data; reads `api_number`, `state_code`,
 *   `county`, `permit_type`, `permit_date` and the nested `location`,
 *   `operator`, `well` and `target` objects.
 * @returns {Promise<object>} The inserted or updated row.
 */
async function storeWellPermit(data) {
  const { location, operator, well, target } = data;

  const upsertSql = `
    INSERT INTO well_permits
    (api_number, state_code, county, operator_name, well_name,
    permit_type, formation, latitude, longitude, permit_date)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    ON CONFLICT (api_number) DO UPDATE SET
    operator_name = EXCLUDED.operator_name,
    permit_type = EXCLUDED.permit_type
    RETURNING *
  `;
  const values = [
    data.api_number,
    data.state_code,
    // Prefer the nested county, falling back to a top-level one.
    location?.county || data.county,
    operator?.name,
    well?.name,
    data.permit_type,
    target?.formation,
    location?.latitude,
    location?.longitude,
    data.permit_date,
  ];

  const { rows } = await pool.query(upsertSql, values);
  return rows[0];
}
// Get dashboard statistics
/**
 * Load the data for the dashboard's initial render.
 *
 * @returns {Promise<{rigCounts: object[], permitsByState: object[],
 *   recentPermits: object[]}>} The latest rig count per region, the ten
 *   states with the most permits dated in the last 30 days, and the twenty
 *   most recently recorded permits.
 */
async function getDashboardStats() {
  const latestRigCountsSql = `
    SELECT DISTINCT ON (region) *
    FROM rig_counts
    ORDER BY region, recorded_at DESC
  `;
  const permitsByStateSql = `
    SELECT state_code, COUNT(*) as count
    FROM well_permits
    WHERE permit_date >= NOW() - INTERVAL '30 days'
    GROUP BY state_code
    ORDER BY count DESC
    LIMIT 10
  `;
  const recentPermitsSql = `
    SELECT * FROM well_permits
    ORDER BY recorded_at DESC
    LIMIT 20
  `;

  // The three queries are independent, so they are issued together.
  const [latestRigCounts, permitsByState, recentPermits] = await Promise.all(
    [latestRigCountsSql, permitsByStateSql, recentPermitsSql].map((sql) =>
      pool.query(sql),
    ),
  );

  return {
    rigCounts: latestRigCounts.rows,
    permitsByState: permitsByState.rows,
    recentPermits: recentPermits.rows,
  };
}
// Webhook endpoint
// Receives drilling events, persists rig counts and permits, and broadcasts
// each update to every connected dashboard client.
app.post("/webhook/drilling", async (req, res) => {
  // Verify signature...
  res.status(200).send("OK");

  try {
    const { type: eventType, data } = req.body;

    if (eventType === "drilling.rig_count.updated") {
      const savedRow = await storeRigCount(data);
      io.emit("rig_count_update", savedRow);
      console.log(`Rig count update: ${data.region} = ${data.value}`);
    } else if (eventType === "drilling.well_permit.new") {
      const savedRow = await storeWellPermit(data);
      io.emit("new_permit", savedRow);
      console.log(`New permit: ${data.api_number} in ${data.state_code}`);
    } else if (eventType === "drilling.frac_spread.updated") {
      // Frac spread updates are broadcast only; nothing is stored.
      io.emit("frac_spread_update", data);
      console.log(`Frac spread update: ${data.region} = ${data.value}`);
    }
    // Any other event type is ignored.
  } catch (error) {
    // The response is already sent, so failures can only be logged.
    console.error("Error processing drilling webhook:", error);
  }
});
// API endpoint for dashboard initial load
// Returns the dashboard's initial data set as JSON, or a 500 JSON error
// when the statistics cannot be loaded.
app.get("/api/dashboard", async (req, res) => {
  try {
    const stats = await getDashboardStats();
    res.json(stats);
  } catch (error) {
    // Without this catch a failed query would reject the handler's promise
    // and leave the client waiting with no response.
    console.error("Error loading dashboard stats:", error);
    res.status(500).json({ error: "Failed to load dashboard data" });
  }
});
// WebSocket connection handling
// Sends the current dashboard data to each client as soon as it connects.
io.on("connection", async (socket) => {
  console.log("Dashboard client connected");

  // Registered before the await so a disconnect during the query is still logged.
  socket.on("disconnect", () => {
    console.log("Dashboard client disconnected");
  });

  // Send initial data
  try {
    const stats = await getDashboardStats();
    socket.emit("initial_data", stats);
  } catch (error) {
    // A failed query must not surface as an unhandled promise rejection.
    console.error("Error sending initial dashboard data:", error);
  }
});
// Start listening only after the schema is in place.
initDatabase().then(function startServer() {
  server.listen(3000, function onListening() {
    console.log("Dashboard server running on port 3000");
  });
});
Python Implementation
from flask import Flask, request, jsonify
from flask_socketio import SocketIO, emit
import psycopg2
from psycopg2.extras import RealDictCursor
import os
app = Flask(__name__)
# NOTE(review): "*" accepts Socket.IO connections from any origin; confirm
# this is restricted to the dashboard's origin before deploying.
socketio = SocketIO(app, cors_allowed_origins="*")
# Database connection
def get_db():
    """Open a new PostgreSQL connection whose cursors return dict rows.

    The connection string is read from the DATABASE_URL environment variable.
    The caller is responsible for closing the returned connection.
    """
    dsn = os.environ['DATABASE_URL']
    return psycopg2.connect(dsn, cursor_factory=RealDictCursor)
def init_database():
    """Initialize database tables.

    Creates rig_counts and well_permits, plus the indexes used by the
    dashboard queries, if they do not exist yet. Every statement uses
    IF NOT EXISTS, so re-running this is harmless.
    """
    conn = get_db()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS rig_counts (
                    id SERIAL PRIMARY KEY,
                    region VARCHAR(100),
                    value INTEGER,
                    oil_rigs INTEGER,
                    gas_rigs INTEGER,
                    change INTEGER,
                    change_percent DECIMAL(5,2),
                    source VARCHAR(100),
                    recorded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS well_permits (
                    id SERIAL PRIMARY KEY,
                    api_number VARCHAR(50) UNIQUE,
                    state_code VARCHAR(2),
                    county VARCHAR(100),
                    operator_name VARCHAR(200),
                    well_name VARCHAR(200),
                    permit_type VARCHAR(50),
                    formation VARCHAR(100),
                    latitude DECIMAL(10,6),
                    longitude DECIMAL(10,6),
                    permit_date DATE,
                    recorded_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                );
                CREATE INDEX IF NOT EXISTS idx_rig_counts_recorded ON rig_counts(recorded_at);
                CREATE INDEX IF NOT EXISTS idx_permits_state ON well_permits(state_code);
            """)
        conn.commit()
    finally:
        # Close the connection even when a statement fails.
        conn.close()
def store_rig_count(data):
    """Store rig count in database.

    Args:
        data: Event data; reads 'region', 'value', 'breakdown', 'change',
            'change_percent' and 'source'.

    Returns:
        The inserted row as returned by the cursor.
    """
    # 'breakdown' may be absent or null; both cases store 0 for each rig type.
    breakdown = data.get('breakdown') or {}

    conn = get_db()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO rig_counts (region, value, oil_rigs, gas_rigs, change, change_percent, source)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                RETURNING *
            """, (
                data.get('region'),
                data.get('value'),
                breakdown.get('oil_rigs', 0),
                breakdown.get('gas_rigs', 0),
                data.get('change'),
                data.get('change_percent'),
                data.get('source')
            ))
            result = cur.fetchone()
        conn.commit()
        return result
    finally:
        # Close the connection even when the insert fails.
        conn.close()
def store_well_permit(data):
    """Store well permit in database.

    Inserts the permit, or updates operator_name and permit_type when the
    api_number already exists.

    Args:
        data: Event data; reads 'api_number', 'state_code', 'county',
            'permit_type', 'permit_date' and the nested 'location',
            'operator', 'well' and 'target' objects.

    Returns:
        The inserted or updated row as returned by the cursor.
    """
    # Nested objects may be absent or null; treat both as empty.
    location = data.get('location') or {}
    operator = data.get('operator') or {}
    well = data.get('well') or {}
    target = data.get('target') or {}

    conn = get_db()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO well_permits
                (api_number, state_code, county, operator_name, well_name,
                permit_type, formation, latitude, longitude, permit_date)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (api_number) DO UPDATE SET
                operator_name = EXCLUDED.operator_name,
                permit_type = EXCLUDED.permit_type
                RETURNING *
            """, (
                data.get('api_number'),
                data.get('state_code'),
                # Prefer the nested county, falling back to a top-level one.
                location.get('county', data.get('county')),
                operator.get('name'),
                well.get('name'),
                data.get('permit_type'),
                target.get('formation'),
                location.get('latitude'),
                location.get('longitude'),
                data.get('permit_date')
            ))
            result = cur.fetchone()
        conn.commit()
        return result
    finally:
        # Close the connection even when the upsert fails.
        conn.close()
def get_dashboard_stats():
    """Get current dashboard statistics.

    Returns:
        Dict with 'rigCounts' (latest row per region), 'permitsByState' (ten
        states with the most permits dated in the last 30 days) and
        'recentPermits' (twenty most recently recorded permits).
    """
    conn = get_db()
    try:
        with conn.cursor() as cur:
            # Get latest rig counts by region
            cur.execute("""
                SELECT DISTINCT ON (region) *
                FROM rig_counts
                ORDER BY region, recorded_at DESC
            """)
            rig_counts = cur.fetchall()

            # Get permits by state (last 30 days)
            cur.execute("""
                SELECT state_code, COUNT(*) as count
                FROM well_permits
                WHERE permit_date >= NOW() - INTERVAL '30 days'
                GROUP BY state_code
                ORDER BY count DESC
                LIMIT 10
            """)
            permits_by_state = cur.fetchall()

            # Get recent permits
            cur.execute("""
                SELECT * FROM well_permits
                ORDER BY recorded_at DESC
                LIMIT 20
            """)
            recent_permits = cur.fetchall()
    finally:
        # Close the connection even when a query fails.
        conn.close()

    return {
        'rigCounts': rig_counts,
        'permitsByState': permits_by_state,
        'recentPermits': recent_permits
    }
@app.route('/webhook/drilling', methods=['POST'])
def handle_webhook():
    """Persist drilling events and broadcast them to dashboard clients.

    Rig counts and permits are stored before being broadcast; frac spread
    updates are broadcast only. Other event types are ignored.
    """
    # Verify signature...
    event = request.json
    kind = event.get('type')
    data = event.get('data', {})

    # NOTE(review): stored rows may contain datetime/Decimal values; confirm
    # the Socket.IO JSON encoder in use can serialize them.
    if kind == 'drilling.rig_count.updated':
        row = store_rig_count(data)
        socketio.emit('rig_count_update', row)
        print(f"Rig count update: {data.get('region')} = {data.get('value')}")
    elif kind == 'drilling.well_permit.new':
        row = store_well_permit(data)
        socketio.emit('new_permit', row)
        print(f"New permit: {data.get('api_number')} in {data.get('state_code')}")
    elif kind == 'drilling.frac_spread.updated':
        socketio.emit('frac_spread_update', data)
        print(f"Frac spread update: {data.get('region')} = {data.get('value')}")

    return 'OK', 200
@app.route('/api/dashboard')
def dashboard_api():
    """Return the dashboard's initial data set as JSON."""
    return jsonify(get_dashboard_stats())
@socketio.on('connect')
def handle_connect():
    """Send the current dashboard data to a client that just connected."""
    print('Dashboard client connected')
    emit('initial_data', get_dashboard_stats())


if __name__ == '__main__':
    # Make sure the schema exists before accepting traffic.
    init_database()
    socketio.run(app, port=3000)
Recipe 4: Supply Chain Cost Tracker
Automatically update logistics and procurement costs when fuel prices change.
JavaScript Implementation
const express = require("express");
const app = express();
app.use(express.json());
// Simulated cost centers with fuel exposure
// Each entry describes one cost center: `commodityCode` is matched against
// the webhook's commodity, `monthlyGallons` multiplies the price to get the
// monthly cost, and `stakeholders` are the addresses listed in notifications.
const costCenters = {
LOGISTICS_FLEET: {
name: "Logistics Fleet",
fuelType: "diesel",
monthlyGallons: 50000,
commodityCode: "DIESEL_USD",
stakeholders: ["logistics@company.com", "finance@company.com"],
},
AIR_FREIGHT: {
name: "Air Freight",
fuelType: "jet_fuel",
monthlyGallons: 20000,
commodityCode: "JET_FUEL_USD",
stakeholders: ["supply-chain@company.com"],
},
MARINE_SHIPPING: {
name: "Marine Shipping",
fuelType: "bunker",
monthlyGallons: 100000,
commodityCode: "BUNKER_FUEL_USD",
stakeholders: ["shipping@company.com"],
},
};
// ERP integration mock
/**
 * Record a cost center's new projected monthly cost (mock: logs only).
 *
 * @param {{name: string}} costCenter - Cost center being updated.
 * @param {number} newCost - New projected monthly cost in dollars.
 * @returns {Promise<{updated: boolean, newCost: number}>}
 */
async function updateERPCostCenter(costCenter, newCost) {
  const formattedCost = newCost.toFixed(2);
  console.log(`ERP Update: ${costCenter.name} monthly cost = $${formattedCost}`);

  // In production, replace the log above with a call to your ERP API, e.g.
  // a PATCH to `${ERP_API_URL}/cost-centers/${costCenter.id}` with the body
  // { projected_monthly_cost: newCost }.

  return { updated: true, newCost };
}
// Email notification
/**
 * Announce a fuel cost change to a cost center's stakeholders (mock: logs
 * the recipient list and the message body instead of sending email).
 *
 * @param {object} costCenter - Reads `name`, `fuelType`, `monthlyGallons`
 *   and `stakeholders`.
 * @param {number} priceChange - Price change in percent.
 * @param {{previousCost: number, newCost: number, costChange: number,
 *   changePercent: number}} costImpact - Monthly cost figures.
 * @returns {Promise<void>}
 */
async function notifyStakeholders(costCenter, priceChange, costImpact) {
  // Only needed once real email sending is wired in (see the note below).
  const subject = `Fuel Cost Alert: ${costCenter.name}`;

  // The message starts and ends with a newline, hence the empty first and
  // last entries.
  const body = [
    "",
    `Fuel prices for ${costCenter.fuelType} have changed by ${priceChange.toFixed(2)}%.`,
    `Impact on ${costCenter.name}:`,
    `- Monthly gallons: ${costCenter.monthlyGallons.toLocaleString()}`,
    `- Previous monthly cost: $${costImpact.previousCost.toLocaleString()}`,
    `- New monthly cost: $${costImpact.newCost.toLocaleString()}`,
    `- Cost change: $${costImpact.costChange.toLocaleString()} (${costImpact.changePercent.toFixed(2)}%)`,
    "",
  ].join("\n");

  const recipients = costCenter.stakeholders.join(", ");
  console.log(`Notifying: ${recipients}`);
  console.log(body);

  // In production, send actual emails:
  // await sendEmail(costCenter.stakeholders, subject, body);
}
// Store historical prices for cost change calculation
// Last seen price per commodity; in-memory only, so it resets on restart.
const priceHistory = new Map();

// Receives price updates, refreshes the projected monthly cost of every cost
// center exposed to that commodity, and notifies stakeholders when the cost
// moves by 2% or more.
app.post("/webhook/supply-chain", async (req, res) => {
  res.status(200).send("OK");

  // The response is already sent, so everything below must handle its own
  // errors; an uncaught rejection here would be unhandled.
  try {
    const { type, data } = req.body;
    if (type !== "price.updated") return;

    // Find affected cost centers
    const affectedCenters = Object.values(costCenters).filter(
      (cc) => cc.commodityCode === data.commodity,
    );
    if (affectedCenters.length === 0) return;

    const currentPrice = data.value;
    if (!Number.isFinite(currentPrice)) {
      console.warn(`Ignoring ${data.commodity} update with non-numeric value`);
      return;
    }

    // Get previous price for comparison: our own history first, then the
    // value supplied by the event.
    const previousPrice = priceHistory.get(data.commodity) ?? data.previous_value;
    // Without a positive baseline the percentage math yields NaN or Infinity.
    const hasBaseline = Number.isFinite(previousPrice) && previousPrice > 0;

    // Update price history
    priceHistory.set(data.commodity, currentPrice);

    // Process each affected cost center
    for (const costCenter of affectedCenters) {
      const newCost = currentPrice * costCenter.monthlyGallons;

      // Update ERP (does not depend on the previous price)
      await updateERPCostCenter(costCenter, newCost);

      if (!hasBaseline) {
        console.log(
          `${costCenter.name}: no previous price available, skipping cost comparison`,
        );
        continue;
      }

      const priceChangePercent =
        ((currentPrice - previousPrice) / previousPrice) * 100;
      const previousCost = previousPrice * costCenter.monthlyGallons;
      const costChange = newCost - previousCost;
      const costImpact = {
        previousCost,
        newCost,
        costChange,
        changePercent: (costChange / previousCost) * 100,
      };

      // Notify if significant change (>= 2%)
      if (Math.abs(costImpact.changePercent) >= 2) {
        await notifyStakeholders(costCenter, priceChangePercent, costImpact);
      }

      console.log(
        `${costCenter.name}: Cost ${costChange >= 0 ? "increased" : "decreased"} by $${Math.abs(costChange).toFixed(2)}`,
      );
    }
  } catch (error) {
    console.error("Error processing supply chain webhook:", error);
  }
});

app.listen(3000);
Recipe 5: Audit Log System
Store all webhook events for compliance and analysis.
JavaScript Implementation
const express = require("express");
const { Pool } = require("pg");
const crypto = require("crypto");
const app = express();
// Raises the JSON body size limit to 10 MB for this app.
app.use(express.json({ limit: "10mb" })); // Large payloads possible
// Pool configured from the DATABASE_URL environment variable.
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
// Initialize audit tables
/**
 * Create the webhook_audit_log table and its indexes if they do not exist.
 *
 * @returns {Promise<void>}
 */
async function initAuditTables() {
  // The trailing SQL comments sketch optional monthly partitioning and are
  // not executed.
  const auditSchemaSql = `
    CREATE TABLE IF NOT EXISTS webhook_audit_log (
      id SERIAL PRIMARY KEY,
      event_id VARCHAR(50) UNIQUE NOT NULL,
      event_type VARCHAR(100) NOT NULL,
      payload JSONB NOT NULL,
      signature VARCHAR(64) NOT NULL,
      timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
      received_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
      processed_at TIMESTAMP WITH TIME ZONE,
      processing_status VARCHAR(20) DEFAULT 'pending',
      processing_error TEXT,
      payload_hash VARCHAR(64) NOT NULL
    );
    CREATE INDEX IF NOT EXISTS idx_audit_event_type ON webhook_audit_log(event_type);
    CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON webhook_audit_log(timestamp);
    CREATE INDEX IF NOT EXISTS idx_audit_received ON webhook_audit_log(received_at);
    -- Partition by month for performance (PostgreSQL 12+)
    -- CREATE TABLE webhook_audit_log_2025_01 PARTITION OF webhook_audit_log
    -- FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
  `;
  await pool.query(auditSchemaSql);
}
// Calculate payload hash for integrity verification
/**
 * Compute a SHA-256 hash of a JSON payload for integrity verification.
 *
 * Object keys are sorted recursively before hashing, so the result does not
 * depend on key order. This matters because the payload is stored in a JSONB
 * column, which does not preserve the original key order; hashing the raw
 * JSON.stringify output would make a round-tripped payload look tampered.
 *
 * @param {*} payload - JSON-compatible value.
 * @returns {string} 64-character lowercase hex digest.
 */
function calculatePayloadHash(payload) {
  // Rebuild the value with object keys in sorted order (arrays keep theirs).
  const canonicalize = (value) => {
    if (Array.isArray(value)) {
      return value.map(canonicalize);
    }
    const isPlainContainer =
      value !== null &&
      typeof value === "object" &&
      typeof value.toJSON !== "function";
    if (!isPlainContainer) {
      return value;
    }
    return Object.fromEntries(
      Object.keys(value)
        .sort()
        .map((key) => [key, canonicalize(value[key])]),
    );
  };

  return crypto
    .createHash("sha256")
    .update(JSON.stringify(canonicalize(payload)))
    .digest("hex");
}
// Store webhook event
/**
 * Insert a webhook event into the audit log, ignoring repeated event IDs.
 *
 * @param {string} eventId - Unique event identifier.
 * @param {string} eventType - Event type.
 * @param {*} payload - Parsed JSON body of the webhook.
 * @param {string} signature - Signature header value.
 * @param {Date} timestamp - Time the event was signed.
 * @returns {Promise<{stored: boolean, duplicate: boolean}>} `stored` is true
 *   when a new row was inserted; `duplicate` is true when the event ID was
 *   already present and nothing was written.
 * @throws Any database error other than the handled event ID conflict.
 */
async function storeAuditLog(
  eventId,
  eventType,
  payload,
  signature,
  timestamp,
) {
  const payloadHash = calculatePayloadHash(payload);

  const result = await pool.query(
    `
    INSERT INTO webhook_audit_log
    (event_id, event_type, payload, signature, timestamp, payload_hash)
    VALUES ($1, $2, $3, $4, $5, $6)
    ON CONFLICT (event_id) DO NOTHING
    `,
    // Serialize explicitly: pg would turn a top-level JS array into a
    // PostgreSQL array literal, which is not valid JSON for the JSONB column.
    [eventId, eventType, JSON.stringify(payload), signature, timestamp, payloadHash],
  );

  // ON CONFLICT DO NOTHING never raises a unique violation; a duplicate shows
  // up as zero inserted rows instead.
  const inserted = result.rowCount > 0;
  return { stored: inserted, duplicate: !inserted };
}
// Update processing status
/**
 * Record the processing outcome of an audited event and stamp the time.
 *
 * @param {string} eventId - Event to update.
 * @param {string} status - New processing status.
 * @param {string|null} [error=null] - Error message for failed events.
 * @returns {Promise<void>}
 */
async function updateProcessingStatus(eventId, status, error = null) {
  const updateSql = `
    UPDATE webhook_audit_log
    SET processing_status = $1,
    processed_at = NOW(),
    processing_error = $2
    WHERE event_id = $3
  `;
  const values = [status, error, eventId];
  await pool.query(updateSql, values);
}
// Query audit log
/**
 * Return up to 100 audit log entries, newest first, matching the filters.
 *
 * @param {{eventType?: string, startDate?: string, endDate?: string,
 *   status?: string}} filters - Optional filters; falsy values are ignored.
 * @returns {Promise<object[]>} Rows with event_id, event_type, timestamp,
 *   processing_status and received_at.
 */
async function queryAuditLog(filters) {
  const conditions = [];
  const params = [];

  // Values are always bound as parameters; only fixed column names and
  // operators are interpolated into the SQL text.
  const addFilter = (column, operator, value) => {
    if (!value) return;
    params.push(value);
    conditions.push(` AND ${column} ${operator} $${params.length}`);
  };

  addFilter("event_type", "=", filters.eventType);
  addFilter("timestamp", ">=", filters.startDate);
  addFilter("timestamp", "<=", filters.endDate);
  addFilter("processing_status", "=", filters.status);

  const baseQuery = `
    SELECT event_id, event_type, timestamp, processing_status, received_at
    FROM webhook_audit_log
    WHERE 1=1
  `;
  const query = `${baseQuery}${conditions.join("")} ORDER BY timestamp DESC LIMIT 100`;

  const { rows } = await pool.query(query, params);
  return rows;
}
// Webhook endpoint with full audit logging
// Records every incoming webhook in the audit log before processing it.
// Responds 400 for missing or invalid headers, 500 when the event cannot be
// stored, and 200 otherwise (including duplicates, which are not reprocessed).
// NOTE(review): the signature is stored but not verified here; confirm that
// verification happens before this data is trusted.
app.post("/webhook/audit", async (req, res) => {
  const eventId = req.headers["x-oilpriceapi-event-id"];
  const eventType = req.headers["x-oilpriceapi-event"];
  const signature = req.headers["x-oilpriceapi-signature"];
  const timestamp = req.headers["x-oilpriceapi-signature-timestamp"];

  // The audit columns are NOT NULL and a bad timestamp becomes an Invalid
  // Date, so reject incomplete requests before touching the database.
  const signedAtSeconds = Number.parseInt(timestamp, 10);
  if (!eventId || !eventType || !signature || !Number.isFinite(signedAtSeconds)) {
    return res.status(400).send("Missing or invalid webhook headers");
  }

  // Store immediately, before processing
  let duplicate;
  try {
    ({ duplicate } = await storeAuditLog(
      eventId,
      eventType,
      req.body,
      signature,
      new Date(signedAtSeconds * 1000),
    ));
  } catch (error) {
    // Report the failure so the sender knows the event was not recorded.
    console.error(`Failed to store audit log for event ${eventId}:`, error);
    return res.status(500).send("Failed to record event");
  }

  if (duplicate) {
    console.log(`Duplicate event received: ${eventId}`);
    return res.status(200).send("OK");
  }

  // Acknowledge receipt
  res.status(200).send("OK");

  // Process asynchronously
  try {
    // Your business logic here...
    await processEvent(req.body);
    await updateProcessingStatus(eventId, "completed");
    console.log(`Event ${eventId} processed successfully`);
  } catch (error) {
    console.error(`Event ${eventId} processing failed:`, error);
    try {
      await updateProcessingStatus(eventId, "failed", error.message);
    } catch (statusError) {
      // The status update itself can fail (e.g. database down); log it
      // rather than letting it become an unhandled rejection.
      console.error(`Could not record failure for event ${eventId}:`, statusError);
    }
  }
});
// API to query audit logs
// Lists audit log entries filtered by the `type`, `start`, `end` and
// `status` query parameters; answers 500 when the query fails.
app.get("/api/audit", async (req, res) => {
  try {
    const events = await queryAuditLog({
      eventType: req.query.type,
      startDate: req.query.start,
      endDate: req.query.end,
      status: req.query.status,
    });
    res.json({ events, count: events.length });
  } catch (error) {
    // Without this catch a failed query would reject the handler's promise
    // and leave the client waiting with no response.
    console.error("Error querying audit log:", error);
    res.status(500).json({ error: "Failed to query audit log" });
  }
});
// API to get specific event details
// Returns the full audit record for one event: 404 when it does not exist,
// 500 when the lookup fails.
app.get("/api/audit/:eventId", async (req, res) => {
  try {
    const result = await pool.query(
      "SELECT * FROM webhook_audit_log WHERE event_id = $1",
      [req.params.eventId],
    );
    if (result.rows.length === 0) {
      return res.status(404).json({ error: "Event not found" });
    }
    res.json(result.rows[0]);
  } catch (error) {
    // Without this catch a failed query would reject the handler's promise
    // and leave the client waiting with no response.
    console.error(`Error loading audit event ${req.params.eventId}:`, error);
    res.status(500).json({ error: "Failed to load event" });
  }
});
// API to verify payload integrity
// Recomputes the hash of a stored payload and compares it with the hash
// recorded at ingestion time: 404 when the event does not exist, 500 when
// the lookup fails.
app.get("/api/audit/:eventId/verify", async (req, res) => {
  try {
    const result = await pool.query(
      "SELECT payload, payload_hash FROM webhook_audit_log WHERE event_id = $1",
      [req.params.eventId],
    );
    if (result.rows.length === 0) {
      return res.status(404).json({ error: "Event not found" });
    }

    const { payload, payload_hash } = result.rows[0];
    const calculatedHash = calculatePayloadHash(payload);
    const valid = calculatedHash === payload_hash;

    res.json({
      eventId: req.params.eventId,
      integrityValid: valid,
      storedHash: payload_hash,
      calculatedHash,
    });
  } catch (error) {
    // Without this catch a failed query would reject the handler's promise
    // and leave the client waiting with no response.
    console.error(`Error verifying audit event ${req.params.eventId}:`, error);
    res.status(500).json({ error: "Failed to verify event" });
  }
});
/**
 * Placeholder for business logic: waits 100 ms and resolves.
 *
 * @param {object} event - Webhook event (currently unused).
 * @returns {Promise<void>}
 */
async function processEvent(event) {
  // Simulate processing
  const SIMULATED_WORK_MS = 100;
  await new Promise((resolve) => {
    setTimeout(resolve, SIMULATED_WORK_MS);
  });
}
// Start listening only after the audit schema is in place.
initAuditTables().then(function startServer() {
  app.listen(3000, function onListening() {
    console.log("Audit server running on port 3000");
  });
});
Recipe 6: Multi-Tenant Router
Route webhook events to different customer endpoints based on configuration.
JavaScript Implementation
// express serves the HTTP endpoints; crypto provides the HMAC used to sign
// payloads forwarded to tenants.
const express = require("express");
const crypto = require("crypto");
// Single Express application for the router. express.json() parses JSON
// request bodies so handlers can read them from req.body.
const app = express();
app.use(express.json());
// Tenant configuration (in production, load from database).
// Keys are tenant IDs: they are returned by GET /api/tenants and used as the
// :id parameter of POST /api/tenants/:id/test.
// NOTE(review): the webhookSecret values below are sample placeholders; real
// secrets should not live in source.
const tenants = {
tenant_acme: {
// Display name used in log messages and delivery results.
name: "Acme Corp",
// URL that forwardToTenant POSTs matching events to.
webhookUrl: "https://acme.example.com/oilprice-webhook",
// HMAC-SHA256 key used by signPayloadForTenant for this tenant.
webhookSecret: "acme_secret_123",
// Only events whose data.commodity is listed are forwarded; events that
// carry no commodity are not rejected by this filter.
commodityFilters: ["BRENT_CRUDE_USD", "WTI_USD"],
// Only events whose type is listed are forwarded.
eventFilters: ["price.updated", "price.significant_change"],
// true: deliver through retryDelivery; false: a single forwardToTenant call.
retryEnabled: true,
},
tenant_globex: {
name: "Globex Inc",
webhookUrl: "https://api.globex.example.com/hooks/oil",
webhookSecret: "globex_secret_456",
commodityFilters: ["NATURAL_GAS_USD"],
eventFilters: ["price.updated"],
retryEnabled: true,
},
tenant_initech: {
name: "Initech",
webhookUrl: "https://initech.example.com/api/webhooks",
webhookSecret: "initech_secret_789",
commodityFilters: [], // All commodities
eventFilters: ["drilling.rig_count.updated", "drilling.well_permit.new"],
// Optional: when present and non-empty, events whose data.state_code is set
// must match one of these codes.
stateFilters: ["TX", "OK"],
retryEnabled: false,
},
};
/**
 * Check whether an event passes a tenant's filters.
 *
 * A filter that is missing, null or empty places no restriction. The
 * commodity and state filters only reject events that actually carry the
 * corresponding field (data.commodity / data.state_code); events without the
 * field pass that filter.
 *
 * @param {object} event - Webhook event with `type` and optional `data`.
 * @param {object} tenant - Tenant config with optional `eventFilters`,
 *   `commodityFilters` and `stateFilters` arrays.
 * @returns {boolean} true when the event should be routed to the tenant.
 */
function eventMatchesTenant(event, tenant) {
  // Treat every filter list the same way: absent means "no filter". The
  // original guarded only stateFilters, so a tenant without eventFilters or
  // commodityFilters threw a TypeError on `.length`.
  const eventFilters = tenant.eventFilters ?? [];
  const commodityFilters = tenant.commodityFilters ?? [];
  const stateFilters = tenant.stateFilters ?? [];

  // Check event type filter
  if (eventFilters.length > 0 && !eventFilters.includes(event.type)) {
    return false;
  }

  // Check commodity filter
  const commodity = event.data?.commodity;
  if (
    commodityFilters.length > 0 &&
    commodity &&
    !commodityFilters.includes(commodity)
  ) {
    return false;
  }

  // Check state filter (for drilling events)
  const stateCode = event.data?.state_code;
  if (
    stateFilters.length > 0 &&
    stateCode &&
    !stateFilters.includes(stateCode)
  ) {
    return false;
  }

  return true;
}
/**
 * Sign a payload with a tenant's secret.
 *
 * The signed message is the JSON-serialized payload, a dot, and the current
 * Unix time in whole seconds; the signature is its hex HMAC-SHA256.
 *
 * @param {object} payload - Event to sign.
 * @param {string} secret - Tenant HMAC key.
 * @returns {{signature: string, timestamp: number}} Hex signature and the
 *   timestamp that was included in the signed message.
 */
function signPayloadForTenant(payload, secret) {
  const timestamp = Math.floor(Date.now() / 1000);
  const hmac = crypto.createHmac("sha256", secret);
  hmac.update(`${JSON.stringify(payload)}.${timestamp}`);
  return { signature: hmac.digest("hex"), timestamp };
}
/**
 * POST an event to a tenant's webhook URL, signed with the tenant's secret.
 *
 * The request is aborted after 30 seconds. Network errors and timeouts
 * propagate to the caller; a non-2xx response is reported via `success: false`.
 *
 * @param {object} tenant - Tenant config (`name`, `webhookUrl`, `webhookSecret`).
 * @param {object} event - Event to forward (`id` and `type` are sent as headers).
 * @returns {Promise<{success: boolean, statusCode: number, tenantName: string}>}
 */
async function forwardToTenant(tenant, event) {
  const signed = signPayloadForTenant(event, tenant.webhookSecret);
  const headers = {
    "Content-Type": "application/json",
    "X-OilPriceAPI-Event": event.type,
    "X-OilPriceAPI-Event-ID": event.id,
    "X-OilPriceAPI-Signature": signed.signature,
    "X-OilPriceAPI-Signature-Timestamp": signed.timestamp.toString(),
    "X-Forwarded-By": "MultiTenantRouter",
  };
  const request = {
    method: "POST",
    headers,
    body: JSON.stringify(event),
    signal: AbortSignal.timeout(30000),
  };
  const { ok, status } = await fetch(tenant.webhookUrl, request);
  return { success: ok, statusCode: status, tenantName: tenant.name };
}
/**
 * Deliver an event to a tenant, retrying on failure with increasing delays.
 *
 * Both thrown errors and non-successful responses count as a failed attempt.
 * Waits 1 s, 5 s, then 15 s between attempts; when `attempts` is larger than
 * the delay table, the last delay (15 s) is reused.
 *
 * @param {object} tenant - Tenant config passed to forwardToTenant.
 * @param {object} event - Event to deliver.
 * @param {number} [attempts=3] - Maximum number of delivery attempts.
 * @returns {Promise<object>} The successful forwardToTenant result, or
 *   `{ success: false, tenantName, error: "Max retries exceeded" }`.
 */
async function retryDelivery(tenant, event, attempts = 3) {
  const delays = [1000, 5000, 15000];
  for (let i = 0; i < attempts; i++) {
    try {
      const result = await forwardToTenant(tenant, event);
      if (result.success) {
        return result;
      }
      // Non-2xx responses were previously retried without leaving any trace.
      console.error(
        `Delivery attempt ${i + 1} failed for ${tenant.name}:`,
        `HTTP ${result.statusCode}`,
      );
    } catch (error) {
      console.error(
        `Delivery attempt ${i + 1} failed for ${tenant.name}:`,
        error.message,
      );
    }
    if (i < attempts - 1) {
      // Clamp the index: delays[i] is undefined past the end of the table,
      // which made setTimeout fire almost immediately for attempts > 3.
      const delayMs = delays[Math.min(i, delays.length - 1)];
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return {
    success: false,
    tenantName: tenant.name,
    error: "Max retries exceeded",
  };
}
// Main webhook endpoint.
// Acknowledges the incoming event, then fans it out in parallel to every
// tenant whose filters match and logs the outcome of each delivery.
app.post("/webhook/router", async (req, res) => {
  // Verify signature from OilPriceAPI (same as other recipes)
  // ...
  // Acknowledge immediately
  res.status(200).send("OK");
  const event = req.body;

  // The response is already sent, so nothing downstream can report an error
  // to the caller; catch everything here so a failure is logged instead of
  // surfacing as an unhandled rejection.
  try {
    // Route to matching tenants in parallel
    const deliveryPromises = Object.values(tenants)
      .filter((tenant) => eventMatchesTenant(event, tenant))
      .map(async (tenant) => {
        console.log(`Routing ${event.type} to ${tenant.name}`);
        if (tenant.retryEnabled) {
          return retryDelivery(tenant, event);
        }
        try {
          return await forwardToTenant(tenant, event);
        } catch (error) {
          return {
            success: false,
            tenantName: tenant.name,
            error: error.message,
          };
        }
      });

    // Wait for all deliveries
    const results = await Promise.allSettled(deliveryPromises);

    // Log results
    for (const result of results) {
      if (result.status === "rejected") {
        // Previously dropped without a trace.
        console.error("Delivery task rejected:", result.reason);
        continue;
      }
      const { success, tenantName, error, statusCode } = result.value;
      if (success) {
        console.log(`Delivered to ${tenantName}`);
      } else {
        // A non-2xx response from a single-attempt delivery carries only a
        // status code, so fall back to it instead of logging "undefined".
        const reason = error ?? `HTTP ${statusCode}`;
        console.error(`Failed to deliver to ${tenantName}: ${reason}`);
      }
    }
  } catch (error) {
    console.error(`Routing failed for event ${event?.id}:`, error);
  }
});
// API to inspect tenant configuration.
// Returns each tenant's id, name and filters; webhook URLs and secrets are
// deliberately left out of the summary.
app.get("/api/tenants", (req, res) => {
  const summary = [];
  for (const [id, tenant] of Object.entries(tenants)) {
    const { name, eventFilters, commodityFilters } = tenant;
    summary.push({ id, name, eventFilters, commodityFilters });
  }
  res.json({ tenants: summary });
});
// Send a synthetic "test.ping" event to one tenant and report the outcome.
// Responds 404 for unknown tenant IDs; delivery errors are reported in the
// JSON body as { success: false, error }.
app.post("/api/tenants/:id/test", async (req, res) => {
  const tenantId = req.params.id;
  // Own-property check: a bare tenants[id] lookup on a plain object also
  // resolves inherited keys such as "constructor" or "__proto__", which would
  // be treated as a tenant.
  if (!Object.hasOwn(tenants, tenantId)) {
    return res.status(404).json({ error: "Tenant not found" });
  }
  const tenant = tenants[tenantId];
  const testEvent = {
    id: `test_${Date.now()}`,
    type: "test.ping",
    data: { message: "Test webhook delivery" },
  };
  try {
    const result = await forwardToTenant(tenant, testEvent);
    res.json(result);
  } catch (error) {
    res.json({ success: false, error: error.message });
  }
});
// Start the router's HTTP server.
const ROUTER_PORT = 3000;
app.listen(ROUTER_PORT, () => {
  console.log("Multi-tenant router running on port 3000");
});
Deployment Considerations
Environment Variables
All recipes require `OILPRICE_WEBHOOK_SECRET`; the recipe-specific variables are only needed by the recipes that use them:
# OilPriceAPI
OILPRICE_WEBHOOK_SECRET=whsec_your_secret_here
# Recipe-specific
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/xxx
TRADING_API_URL=https://api.yourbroker.com
TRADING_API_KEY=your_trading_api_key
DATABASE_URL=postgres://user:pass@host:5432/dbname
Docker Deployment
FROM node:20-slim
WORKDIR /app
# Copy only the manifests first so the dependency layer stays cached until
# package.json or the lockfile changes.
COPY package*.json ./
# --omit=dev replaces the deprecated --only=production flag on current npm.
RUN npm ci --omit=dev
COPY . .
EXPOSE 3000
CMD ["node", "server.js"]
Health Checks
Add health check endpoints to all recipes:
// Liveness probe: always answers with the current server time, without
// touching any dependency.
app.get("/health", (req, res) => {
  const timestamp = new Date().toISOString();
  res.json({ status: "healthy", timestamp });
});
// Readiness probe: reports ready only when a trivial database query succeeds,
// otherwise answers 503 with the failure message.
app.get("/ready", async (req, res) => {
  // Check dependencies
  try {
    await pool.query("SELECT 1"); // Database check
    res.json({ status: "ready" });
  } catch (failure) {
    const unavailable = res.status(503);
    unavailable.json({ status: "not_ready", error: failure.message });
  }
});
Related Documentation
- Webhook API Reference - Event types and configuration
- Architecture - System design and security
- Troubleshooting - Common issues