WebSocket API
Reservoir Mastery Exclusive
WebSocket streaming is available exclusively to Reservoir Mastery subscribers ($129/month). Upgrade your plan to access real-time streaming.
Overview
The OilPriceAPI WebSocket service provides real-time price updates using ActionCable, eliminating the need for constant polling and reducing latency to milliseconds.
Benefits
- Instant Updates: Receive price changes as they happen
- Reduced API Calls: No need to poll the REST API
- Lower Latency: Sub-second delivery times
- Efficient: Single persistent connection
- Reliable: Automatic reconnection handling
Performance Benchmarks
| Metric | WebSocket | REST Polling (1min) | REST Polling (5min) |
|---|---|---|---|
| Avg latency to detect change | <100ms | 30,000ms | 150,000ms |
| API calls per day | 1 | 1,440 | 288 |
| Data freshness | Real-time | Up to 1 min stale | Up to 5 min stale |
| Bandwidth per commodity/day | ~50KB | ~500KB | ~100KB |
| Connection overhead | Single TLS handshake | Per-request handshake | Per-request handshake |
Why WebSocket?
For applications requiring real-time price monitoring, WebSocket cuts the average time to detect a price change by roughly 300x compared with 1-minute polling (under 100 ms versus about 30,000 ms) while using 99.9% fewer API calls (1 versus 1,440 per day). See the Migration Guide for step-by-step upgrade instructions.
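The polling figures in the table follow from simple arithmetic. The sketch below reproduces them, assuming a price change lands at a uniformly random moment inside a polling interval (so the average detection delay is half the interval) and treating the table's "<100ms" as 100 ms. The function names are illustrative and not part of the API.

```javascript
// Average delay before a poller notices a change, assuming the change
// lands at a uniformly random point inside the polling interval.
function avgPollingDelayMs(intervalSeconds) {
  return (intervalSeconds * 1000) / 2;
}

// Number of REST requests a poller issues in 24 hours.
function pollsPerDay(intervalSeconds) {
  return (24 * 60 * 60) / intervalSeconds;
}

const WEBSOCKET_DELAY_MS = 100; // upper bound quoted in the table

for (const interval of [60, 300]) {
  const delay = avgPollingDelayMs(interval);
  const calls = pollsPerDay(interval);
  const ratio = delay / WEBSOCKET_DELAY_MS;
  const callsSaved = (1 - 1 / calls) * 100;
  console.log(
    `${interval}s polling: ${delay} ms average delay (${ratio}x the WebSocket bound), ` +
      `${calls} calls/day, ${callsSaved.toFixed(2)}% of calls saved by one connection`,
  );
}
```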
Getting Started
Connection URL
wss://api.oilpriceapi.com/cable
Authentication
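Include your API key in the subscription identifier. createConsumer accepts only a URL, and browsers cannot attach custom headers to a WebSocket handshake, so the key travels with the subscribe command:
const cable = ActionCable.createConsumer("wss://api.oilpriceapi.com/cable");
const subscription = cable.subscriptions.create(
  { channel: "EnergyPricesChannel", api_key: "YOUR_API_KEY" },
  {
    received(data) {
      console.log("Received:", data);
    },
  },
);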
JavaScript Implementation
Using ActionCable (Recommended)
// Install ActionCable
// npm install @rails/actioncable
import { createConsumer } from "@rails/actioncable";
// Create connection
const cable = createConsumer("wss://api.oilpriceapi.com/cable");
// Subscribe to energy prices channel
const subscription = cable.subscriptions.create({ channel: "EnergyPricesChannel", api_key: "YOUR_API_KEY" }, {
connected() {
console.log("Connected to WebSocket");
},
disconnected() {
console.log("Disconnected from WebSocket");
},
received(data) {
console.log("Received:", data);
// Handle different message types
switch (data.type) {
case "price_update":
handlePriceUpdate(data.data);
break;
case "drilling_intelligence_update":
handleDrillingUpdate(data.data);
break;
case "alert":
handleAlert(data.data);
break;
}
},
});
// Handle price updates
function handlePriceUpdate(priceData) {
console.log(`${priceData.code}: $${priceData.price}`);
// Update your UI
}
// Unsubscribe when done
// subscription.unsubscribe();
Using Native WebSocket
class OilPriceWebSocket {
constructor(apiKey) {
this.apiKey = apiKey;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
}
connect() {
this.ws = new WebSocket("wss://api.oilpriceapi.com/cable");
this.ws.onopen = () => {
console.log("WebSocket connected");
this.reconnectAttempts = 0;
// Send subscription command
this.ws.send(
JSON.stringify({
command: "subscribe",
identifier: JSON.stringify({
channel: "EnergyPricesChannel",
api_key: this.apiKey,
}),
}),
);
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === "ping") return;
if (message.message) {
this.handleMessage(message.message);
}
};
this.ws.onerror = (error) => {
console.error("WebSocket error:", error);
};
this.ws.onclose = () => {
console.log("WebSocket disconnected");
this.reconnect();
};
}
handleMessage(data) {
console.log("Price update:", data);
// Process the price update
}
reconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(
`Reconnecting... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`,
);
setTimeout(() => this.connect(), 5000);
}
}
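disconnect() {
if (this.ws) {
// Detach the close handler first so an intentional close does not trigger reconnect()
this.ws.onclose = null;
this.ws.close();
}
}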
}
// Usage
const ws = new OilPriceWebSocket("YOUR_API_KEY");
ws.connect();
Python Implementation
import json
import threading
import time

import websocket  # pip install websocket-client


class OilPriceWebSocket:
    def __init__(self, api_key):
        self.api_key = api_key
        self.ws = None

    def on_message(self, ws, message):
        data = json.loads(message)
        # Skip ActionCable keepalive pings
        if data.get('type') == 'ping':
            return
        if 'message' in data:
            self.handle_price_update(data['message'])

    def on_error(self, ws, error):
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        # websocket-client 1.x passes the close code and reason to this callback
        print(f"WebSocket connection closed: {close_status_code} {close_msg}")

    def on_open(self, ws):
        print("WebSocket connected")
        # Subscribe to channel; the identifier is itself a JSON string
        subscribe_message = {
            'command': 'subscribe',
            'identifier': json.dumps({
                'channel': 'EnergyPricesChannel',
                'api_key': self.api_key
            })
        }
        ws.send(json.dumps(subscribe_message))

    def handle_price_update(self, data):
        if data.get('type') == 'price_update':
            price_data = data['data']
            print(f"{price_data['code']}: ${price_data['price']}")

    def connect(self):
        # Verbose frame logging; this prints the subscribe frame, including the API key
        websocket.enableTrace(True)
        self.ws = websocket.WebSocketApp(
            "wss://api.oilpriceapi.com/cable",
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open
        )
        # Run in a separate thread
        wst = threading.Thread(target=self.ws.run_forever)
        wst.daemon = True
        wst.start()


# Usage
client = OilPriceWebSocket('YOUR_API_KEY')
client.connect()

# The socket runs in a daemon thread, so keep the main thread alive
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Shutting down...")
Go Implementation
package main
import (
"encoding/json"
"log"
"os"
"os/signal"
"time"
"github.com/gorilla/websocket"
)
type OilPriceClient struct {
apiKey string
conn *websocket.Conn
done chan struct{}
reconnectAttempts int
maxReconnects int
}
type SubscribeMessage struct {
Command string `json:"command"`
Identifier string `json:"identifier"`
}
type ChannelIdentifier struct {
Channel string `json:"channel"`
APIKey string `json:"api_key"`
}
type PriceMessage struct {
Type string `json:"type,omitempty"`
Message json.RawMessage `json:"message,omitempty"`
}
type PriceUpdate struct {
Type string `json:"type"`
Data struct {
Code string `json:"code"`
Price float64 `json:"price"`
Change float64 `json:"change"`
} `json:"data"`
}
func NewOilPriceClient(apiKey string) *OilPriceClient {
return &OilPriceClient{
apiKey: apiKey,
done: make(chan struct{}),
maxReconnects: 5,
}
}
func (c *OilPriceClient) Connect() error {
var err error
c.conn, _, err = websocket.DefaultDialer.Dial("wss://api.oilpriceapi.com/cable", nil)
if err != nil {
return err
}
// Subscribe to channel
identifier, _ := json.Marshal(ChannelIdentifier{
Channel: "EnergyPricesChannel",
APIKey: c.apiKey,
})
subscribeMsg := SubscribeMessage{
Command: "subscribe",
Identifier: string(identifier),
}
if err := c.conn.WriteJSON(subscribeMsg); err != nil {
return err
}
log.Println("Connected and subscribed to EnergyPricesChannel")
c.reconnectAttempts = 0
return nil
}
func (c *OilPriceClient) Listen() {
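// c.done is not closed here: reconnect() starts a fresh Listen goroutine, and closing the channel a second time would panic.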
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
log.Printf("Read error: %v", err)
c.reconnect()
return
}
var msg PriceMessage
if err := json.Unmarshal(message, &msg); err != nil {
continue
}
// Skip ping messages
if msg.Type == "ping" {
continue
}
// Handle price updates
if msg.Message != nil {
var update PriceUpdate
if err := json.Unmarshal(msg.Message, &update); err == nil {
if update.Type == "price_update" {
// Change is the absolute price move from the "change" field, not a percentage
log.Printf("%s: $%.2f (%+.2f)",
update.Data.Code, update.Data.Price, update.Data.Change)
}
}
}
}
}
func (c *OilPriceClient) reconnect() {
if c.reconnectAttempts >= c.maxReconnects {
log.Println("Max reconnection attempts reached")
return
}
c.reconnectAttempts++
delay := time.Duration(c.reconnectAttempts*5) * time.Second
log.Printf("Reconnecting in %v (attempt %d/%d)", delay, c.reconnectAttempts, c.maxReconnects)
time.Sleep(delay)
if err := c.Connect(); err != nil {
log.Printf("Reconnection failed: %v", err)
c.reconnect()
} else {
go c.Listen()
}
}
func (c *OilPriceClient) Close() {
c.conn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
c.conn.Close()
}
func main() {
client := NewOilPriceClient("YOUR_API_KEY")
if err := client.Connect(); err != nil {
log.Fatal("Connection failed:", err)
}
defer client.Close()
go client.Listen()
// Wait for interrupt signal
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
<-interrupt
log.Println("Shutting down...")
}
Ruby Implementation
require 'faye/websocket'
require 'eventmachine'
require 'json'
class OilPriceWebSocket
WEBSOCKET_URL = 'wss://api.oilpriceapi.com/cable'
def initialize(api_key)
@api_key = api_key
@reconnect_attempts = 0
@max_reconnects = 5
end
def connect
EM.run do
@ws = Faye::WebSocket::Client.new(WEBSOCKET_URL)
@ws.on :open do |event|
puts 'WebSocket connected'
@reconnect_attempts = 0
subscribe
end
@ws.on :message do |event|
handle_message(event.data)
end
@ws.on :close do |event|
puts "Connection closed: #{event.code} - #{event.reason}"
reconnect
end
@ws.on :error do |event|
puts "WebSocket error: #{event.message}"
end
end
end
private
def subscribe
identifier = {
channel: 'EnergyPricesChannel',
api_key: @api_key
}.to_json
message = {
command: 'subscribe',
identifier: identifier
}.to_json
@ws.send(message)
puts 'Subscribed to EnergyPricesChannel'
end
def handle_message(raw_data)
data = JSON.parse(raw_data)
# Skip ping messages
return if data['type'] == 'ping'
# Handle price updates
if data['message']
message = data['message']
if message['type'] == 'price_update'
price_data = message['data']
puts "#{price_data['code']}: $#{price_data['price']} (#{price_data['change_percent']}%)"
end
end
rescue JSON::ParserError => e
puts "JSON parse error: #{e.message}"
end
def reconnect
return if @reconnect_attempts >= @max_reconnects
@reconnect_attempts += 1
delay = @reconnect_attempts * 5
puts "Reconnecting in #{delay} seconds (attempt #{@reconnect_attempts}/#{@max_reconnects})"
EM.add_timer(delay) do
# The reactor is already running, so EM.run inside connect executes its block
# immediately and builds a new client with all event handlers attached
connect
end
end
end
# Usage
client = OilPriceWebSocket.new('YOUR_API_KEY')
client.connect
Java Implementation
import okhttp3.*;
import com.google.gson.*;
import java.util.concurrent.TimeUnit;
public class OilPriceWebSocket extends WebSocketListener {
private static final String WEBSOCKET_URL = "wss://api.oilpriceapi.com/cable";
private final String apiKey;
private final OkHttpClient client;
private WebSocket webSocket;
private int reconnectAttempts = 0;
private static final int MAX_RECONNECTS = 5;
private final Gson gson = new Gson();
public OilPriceWebSocket(String apiKey) {
this.apiKey = apiKey;
this.client = new OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS)
.build();
}
public void connect() {
Request request = new Request.Builder()
.url(WEBSOCKET_URL)
.build();
webSocket = client.newWebSocket(request, this);
}
@Override
public void onOpen(WebSocket webSocket, Response response) {
System.out.println("WebSocket connected");
reconnectAttempts = 0;
subscribe();
}
private void subscribe() {
JsonObject identifier = new JsonObject();
identifier.addProperty("channel", "EnergyPricesChannel");
identifier.addProperty("api_key", apiKey);
JsonObject subscribeMessage = new JsonObject();
subscribeMessage.addProperty("command", "subscribe");
subscribeMessage.addProperty("identifier", identifier.toString());
webSocket.send(subscribeMessage.toString());
System.out.println("Subscribed to EnergyPricesChannel");
}
@Override
public void onMessage(WebSocket webSocket, String text) {
JsonObject message = gson.fromJson(text, JsonObject.class);
// Skip ping messages
if (message.has("type") && "ping".equals(message.get("type").getAsString())) {
return;
}
// Handle price updates
if (message.has("message")) {
JsonObject payload = message.getAsJsonObject("message");
if (payload.has("type") && "price_update".equals(payload.get("type").getAsString())) {
JsonObject data = payload.getAsJsonObject("data");
String code = data.get("code").getAsString();
double price = data.get("price").getAsDouble();
double change = data.get("change_percent").getAsDouble();
System.out.printf("%s: $%.2f (%.2f%%)%n", code, price, change);
}
}
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
System.out.println("Connection closing: " + code + " - " + reason);
webSocket.close(1000, null);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
System.out.println("Connection closed: " + code + " - " + reason);
reconnect();
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
System.out.println("WebSocket error: " + t.getMessage());
reconnect();
}
private void reconnect() {
if (reconnectAttempts >= MAX_RECONNECTS) {
System.out.println("Max reconnection attempts reached");
return;
}
reconnectAttempts++;
int delay = reconnectAttempts * 5000;
System.out.printf("Reconnecting in %d ms (attempt %d/%d)%n",
delay, reconnectAttempts, MAX_RECONNECTS);
try {
Thread.sleep(delay);
connect();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void disconnect() {
if (webSocket != null) {
webSocket.close(1000, "Client disconnect");
}
}
public static void main(String[] args) {
OilPriceWebSocket client = new OilPriceWebSocket("YOUR_API_KEY");
client.connect();
// Keep running
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down...");
client.disconnect();
}));
}
}
PHP Implementation
<?php
require 'vendor/autoload.php';
use WebSocket\Client;
use WebSocket\ConnectionException;
class OilPriceWebSocket
{
private const WEBSOCKET_URL = 'wss://api.oilpriceapi.com/cable';
private string $apiKey;
private ?Client $client = null;
private int $reconnectAttempts = 0;
private int $maxReconnects = 5;
private bool $running = true;
public function __construct(string $apiKey)
{
$this->apiKey = $apiKey;
}
public function connect(): void
{
$this->client = new Client(self::WEBSOCKET_URL, [
'timeout' => 60,
'headers' => [
'Origin' => 'https://oilpriceapi.com'
]
]);
echo "WebSocket connected\n";
$this->reconnectAttempts = 0;
$this->subscribe();
}
private function subscribe(): void
{
$identifier = json_encode([
'channel' => 'EnergyPricesChannel',
'api_key' => $this->apiKey
]);
$message = json_encode([
'command' => 'subscribe',
'identifier' => $identifier
]);
$this->client->send($message);
echo "Subscribed to EnergyPricesChannel\n";
}
public function listen(): void
{
while ($this->running) {
try {
$message = $this->client->receive();
$this->handleMessage($message);
} catch (ConnectionException $e) {
echo "Connection error: " . $e->getMessage() . "\n";
$this->reconnect();
}
}
}
private function handleMessage(string $rawMessage): void
{
$data = json_decode($rawMessage, true);
if ($data === null) {
return;
}
// Skip ping messages
if (isset($data['type']) && $data['type'] === 'ping') {
return;
}
// Handle price updates
if (isset($data['message'])) {
$message = $data['message'];
if (isset($message['type']) && $message['type'] === 'price_update') {
$priceData = $message['data'];
printf(
"%s: $%.2f (%.2f%%)\n",
$priceData['code'],
$priceData['price'],
$priceData['change_percent']
);
}
}
}
private function reconnect(): void
{
if ($this->reconnectAttempts >= $this->maxReconnects) {
echo "Max reconnection attempts reached\n";
$this->running = false;
return;
}
$this->reconnectAttempts++;
$delay = $this->reconnectAttempts * 5;
printf(
"Reconnecting in %d seconds (attempt %d/%d)\n",
$delay,
$this->reconnectAttempts,
$this->maxReconnects
);
sleep($delay);
try {
$this->connect();
} catch (\Exception $e) {
echo "Reconnection failed: " . $e->getMessage() . "\n";
$this->reconnect();
}
}
public function disconnect(): void
{
$this->running = false;
if ($this->client) {
$this->client->close();
}
}
}
// Usage
$client = new OilPriceWebSocket('YOUR_API_KEY');
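// Handle shutdown gracefully (requires the pcntl extension)
// Without async signals (or declare(ticks=1)), the handler below would never run
pcntl_async_signals(true);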
pcntl_signal(SIGINT, function() use ($client) {
echo "\nShutting down...\n";
$client->disconnect();
exit(0);
});
$client->connect();
$client->listen();
C# Implementation
using System;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
public class OilPriceWebSocket : IDisposable
{
private const string WebSocketUrl = "wss://api.oilpriceapi.com/cable";
private readonly string _apiKey;
private ClientWebSocket _webSocket;
private CancellationTokenSource _cancellationTokenSource;
private int _reconnectAttempts;
private const int MaxReconnects = 5;
public OilPriceWebSocket(string apiKey)
{
_apiKey = apiKey;
_webSocket = new ClientWebSocket();
_cancellationTokenSource = new CancellationTokenSource();
}
public async Task ConnectAsync()
{
_webSocket = new ClientWebSocket();
await _webSocket.ConnectAsync(new Uri(WebSocketUrl), _cancellationTokenSource.Token);
Console.WriteLine("WebSocket connected");
_reconnectAttempts = 0;
await SubscribeAsync();
}
private async Task SubscribeAsync()
{
var identifier = JsonSerializer.Serialize(new
{
channel = "EnergyPricesChannel",
api_key = _apiKey
});
var subscribeMessage = JsonSerializer.Serialize(new
{
command = "subscribe",
identifier = identifier
});
var bytes = Encoding.UTF8.GetBytes(subscribeMessage);
await _webSocket.SendAsync(
new ArraySegment<byte>(bytes),
WebSocketMessageType.Text,
true,
_cancellationTokenSource.Token
);
Console.WriteLine("Subscribed to EnergyPricesChannel");
}
public async Task ListenAsync()
{
var buffer = new byte[4096];
while (_webSocket.State == WebSocketState.Open)
{
try
{
var result = await _webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer),
_cancellationTokenSource.Token
);
if (result.MessageType == WebSocketMessageType.Close)
{
await ReconnectAsync();
return;
}
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
HandleMessage(message);
}
catch (WebSocketException ex)
{
Console.WriteLine($"WebSocket error: {ex.Message}");
await ReconnectAsync();
return;
}
}
}
private void HandleMessage(string rawMessage)
{
try
{
using var doc = JsonDocument.Parse(rawMessage);
var root = doc.RootElement;
// Skip ping messages
if (root.TryGetProperty("type", out var type) && type.GetString() == "ping")
{
return;
}
// Handle price updates
if (root.TryGetProperty("message", out var message))
{
if (message.TryGetProperty("type", out var msgType) &&
msgType.GetString() == "price_update")
{
var data = message.GetProperty("data");
var code = data.GetProperty("code").GetString();
var price = data.GetProperty("price").GetDouble();
var change = data.GetProperty("change_percent").GetDouble();
Console.WriteLine($"{code}: ${price:F2} ({change:F2}%)");
}
}
}
catch (JsonException ex)
{
Console.WriteLine($"JSON parse error: {ex.Message}");
}
}
private async Task ReconnectAsync()
{
if (_reconnectAttempts >= MaxReconnects)
{
Console.WriteLine("Max reconnection attempts reached");
return;
}
_reconnectAttempts++;
var delay = _reconnectAttempts * 5000;
Console.WriteLine($"Reconnecting in {delay}ms (attempt {_reconnectAttempts}/{MaxReconnects})");
await Task.Delay(delay);
try
{
_webSocket.Dispose();
await ConnectAsync();
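// Await the new listen loop so the caller's await (and the process) stays alive after a reconnect
await ListenAsync();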
}
catch (Exception ex)
{
Console.WriteLine($"Reconnection failed: {ex.Message}");
await ReconnectAsync();
}
}
public async Task DisconnectAsync()
{
if (_webSocket.State == WebSocketState.Open)
{
await _webSocket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"Client disconnect",
CancellationToken.None
);
}
}
public void Dispose()
{
_cancellationTokenSource.Cancel();
_webSocket?.Dispose();
_cancellationTokenSource?.Dispose();
}
}
// Usage
class Program
{
static async Task Main(string[] args)
{
using var client = new OilPriceWebSocket("YOUR_API_KEY");
Console.CancelKeyPress += async (sender, e) =>
{
e.Cancel = true;
Console.WriteLine("\nShutting down...");
await client.DisconnectAsync();
};
await client.ConnectAsync();
await client.ListenAsync();
}
}
Rust Implementation
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::time::Duration;
use tokio::time::sleep;
use tokio_tungstenite::{connect_async, tungstenite::Message};
const WEBSOCKET_URL: &str = "wss://api.oilpriceapi.com/cable";
const MAX_RECONNECTS: u32 = 5;
#[derive(Serialize)]
struct SubscribeMessage {
command: String,
identifier: String,
}
#[derive(Deserialize, Debug)]
struct PriceData {
code: String,
price: f64,
change_percent: f64,
}
#[derive(Deserialize, Debug)]
struct PriceUpdate {
#[serde(rename = "type")]
msg_type: Option<String>,
data: Option<PriceData>,
}
pub struct OilPriceClient {
api_key: String,
reconnect_attempts: u32,
}
impl OilPriceClient {
pub fn new(api_key: &str) -> Self {
Self {
api_key: api_key.to_string(),
reconnect_attempts: 0,
}
}
pub async fn connect(&mut self) -> Result<(), Box<dyn std::error::Error>> {
loop {
match self.run_connection().await {
Ok(_) => break Ok(()),
Err(e) => {
eprintln!("Connection error: {}", e);
if !self.reconnect().await {
break Err(e);
}
}
}
}
}
async fn run_connection(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let (ws_stream, _) = connect_async(WEBSOCKET_URL).await?;
println!("WebSocket connected");
self.reconnect_attempts = 0;
let (mut write, mut read) = ws_stream.split();
// Subscribe to channel
let identifier = json!({
"channel": "EnergyPricesChannel",
"api_key": self.api_key
});
let subscribe_msg = SubscribeMessage {
command: "subscribe".to_string(),
identifier: identifier.to_string(),
};
let msg = Message::Text(serde_json::to_string(&subscribe_msg)?);
write.send(msg).await?;
println!("Subscribed to EnergyPricesChannel");
// Listen for messages
while let Some(message) = read.next().await {
match message {
Ok(Message::Text(text)) => {
self.handle_message(&text);
}
Ok(Message::Close(_)) => {
println!("Connection closed by server");
break;
}
Err(e) => {
// Propagate the error so connect() can apply its reconnect policy
return Err(e.into());
}
_ => {}
}
}
Ok(())
}
fn handle_message(&self, raw_message: &str) {
let value: Result<Value, _> = serde_json::from_str(raw_message);
if let Ok(msg) = value {
// Skip ping messages
if msg.get("type").and_then(|t| t.as_str()) == Some("ping") {
return;
}
// Handle price updates
if let Some(message) = msg.get("message") {
if let Ok(update) = serde_json::from_value::<PriceUpdate>(message.clone()) {
if update.msg_type.as_deref() == Some("price_update") {
if let Some(data) = update.data {
println!(
"{}: ${:.2} ({:.2}%)",
data.code, data.price, data.change_percent
);
}
}
}
}
}
}
async fn reconnect(&mut self) -> bool {
if self.reconnect_attempts >= MAX_RECONNECTS {
eprintln!("Max reconnection attempts reached");
return false;
}
self.reconnect_attempts += 1;
let delay = self.reconnect_attempts as u64 * 5;
println!(
"Reconnecting in {} seconds (attempt {}/{})",
delay, self.reconnect_attempts, MAX_RECONNECTS
);
sleep(Duration::from_secs(delay)).await;
true
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = OilPriceClient::new("YOUR_API_KEY");
// Handle Ctrl+C
tokio::spawn(async {
tokio::signal::ctrl_c().await.unwrap();
println!("\nShutting down...");
std::process::exit(0);
});
client.connect().await?;
Ok(())
}
Cargo.toml dependencies:
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
futures-util = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Message Types
Price Update
Sent whenever a commodity price changes:
{
"type": "price_update",
"data": {
"code": "WTI_USD",
"name": "West Texas Intermediate",
"price": 75.43,
"formatted": "$75.43",
"currency": "USD",
"unit": "barrel",
"change": 0.22,
"change_percent": 0.29,
"previous_price": 75.21,
"created_at": "2025-07-18T14:30:00.000Z"
}
}
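The derived fields in this payload can be cross-checked on the client. The sketch below assumes change is price minus previous_price and change_percent is that difference relative to previous_price, which matches the sample values above but is not stated explicitly by the API. checkPriceUpdate is a hypothetical helper name.

```javascript
// Recompute the derived fields of a price_update payload and compare them
// with the values the server sent. The tolerance covers 2-decimal rounding.
function checkPriceUpdate(data, tolerance = 0.005) {
  const change = data.price - data.previous_price;
  const changePercent = (change / data.previous_price) * 100;
  return {
    change: Number(change.toFixed(2)),
    changePercent: Number(changePercent.toFixed(2)),
    consistent:
      Math.abs(change - data.change) <= tolerance &&
      Math.abs(changePercent - data.change_percent) <= tolerance,
  };
}

// Values copied from the sample message above.
const sample = {
  code: "WTI_USD",
  price: 75.43,
  previous_price: 75.21,
  change: 0.22,
  change_percent: 0.29,
};
console.log(checkPriceUpdate(sample));
```

A check like this is mainly useful in tests or logging; rejecting live messages on a rounding mismatch would likely be too strict.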
Drilling Intelligence Update
Sent when new drilling intelligence data is available:
{
"type": "drilling_intelligence_update",
"data": {
"code": "US_RIG_COUNT",
"name": "US Rig Count",
"value": 622,
"previous_value": 619,
"change": 3,
"change_percent": 0.48,
"region": "United States",
"source": "baker_hughes",
"created_at": "2025-07-18T18:00:00.000Z"
}
}
Alert Notification
Sent when a configured alert is triggered:
{
"type": "alert",
"data": {
"alert_type": "price_threshold",
"commodity": "BRENT_CRUDE_USD",
"condition": "above",
"threshold": 80.0,
"current_price": 80.15,
"message": "Brent Crude has exceeded $80.00",
"triggered_at": "2025-07-18T15:45:00.000Z"
}
}
Connection Status
System messages about connection state:
{
"type": "welcome",
"message": "Successfully connected to OilPriceAPI WebSocket"
}
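Clients that use a raw WebSocket see protocol frames and channel broadcasts on the same stream. The sketch below shows one way to tell them apart, assuming the standard ActionCable frame types (welcome, ping, confirm_subscription, reject_subscription, disconnect) and that broadcasts arrive wrapped in a top-level message object, as in the client examples above. classifyFrame is a hypothetical helper.

```javascript
// Classify a raw text frame received from the /cable endpoint.
function classifyFrame(raw) {
  let frame;
  try {
    frame = JSON.parse(raw);
  } catch {
    return { kind: "invalid" };
  }
  if (frame === null || typeof frame !== "object") {
    return { kind: "invalid" };
  }

  // Protocol-level frames carry a top-level "type".
  switch (frame.type) {
    case "welcome":
    case "ping":
    case "confirm_subscription":
    case "reject_subscription":
    case "disconnect":
      return { kind: frame.type };
  }

  // Channel broadcasts carry the payload under "message".
  if (frame.message && typeof frame.message === "object") {
    return {
      kind: "broadcast",
      type: frame.message.type,
      data: frame.message.data,
    };
  }
  return { kind: "unknown" };
}
```

Checking the protocol types first matters because a welcome frame may also carry a message field, as in the sample above.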
Subscribing to Specific Commodities
You can filter which commodities you receive updates for:
const subscription = cable.subscriptions.create(
{
channel: "EnergyPricesChannel",
commodities: ["WTI_USD", "BRENT_CRUDE_USD", "NATURAL_GAS_USD"],
},
{
received(data) {
// Only receive updates for specified commodities
},
},
);
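With a raw WebSocket client, the same filter would go into the identifier of the subscribe command, because ActionCable serializes the first argument of subscriptions.create into that field. The sketch below builds such a frame; combining api_key and commodities in one identifier is an assumption based on the examples in this document, and buildSubscribeCommand is a hypothetical helper.

```javascript
// Build the raw "subscribe" command for clients that do not use the
// ActionCable library. The identifier must itself be a JSON string.
function buildSubscribeCommand(apiKey, commodities = []) {
  const identifier = { channel: "EnergyPricesChannel", api_key: apiKey };
  if (commodities.length > 0) {
    identifier.commodities = commodities;
  }
  return JSON.stringify({
    command: "subscribe",
    identifier: JSON.stringify(identifier),
  });
}

// Usage with an open WebSocket `ws`:
// ws.send(buildSubscribeCommand("YOUR_API_KEY", ["WTI_USD", "BRENT_CRUDE_USD"]));
```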
Error Handling
Connection Errors
subscription.rejected = () => {
console.error("Subscription rejected. Check your API key and plan.");
};
Automatic Reconnection
ActionCable handles reconnection automatically. For custom implementations:
let reconnectInterval = 1000; // Start with 1 second
function connect() {
const ws = new WebSocket("wss://api.oilpriceapi.com/cable");
ws.onclose = () => {
setTimeout(() => {
reconnectInterval = Math.min(reconnectInterval * 2, 30000); // Max 30 seconds
connect();
}, reconnectInterval);
};
ws.onopen = () => {
reconnectInterval = 1000; // Reset on successful connection
};
}
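The doubling schedule can also be written as a pure function, which makes it easy to test and to extend with jitter so that many clients do not reconnect at the same instant. This is a sketch under the same 1-second base and 30-second cap; the jitter option and the backoffDelayMs name are additions for illustration.

```javascript
// Delay before reconnect attempt number `attempt` (0-based).
// `jitter` is the fraction (0..1) of the delay that may be randomly removed.
function backoffDelayMs(
  attempt,
  { baseMs = 1000, maxMs = 30000, jitter = 0, random = Math.random } = {},
) {
  const exponential = Math.min(baseMs * 2 ** attempt, maxMs);
  return Math.round(exponential * (1 - jitter * random()));
}

// Schedule without jitter for the first seven attempts.
const schedule = [];
for (let attempt = 0; attempt < 7; attempt++) {
  schedule.push(backoffDelayMs(attempt));
}
console.log(schedule);
```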
Best Practices
1. Handle Disconnections Gracefully
Always implement reconnection logic and inform users of connection status.
2. Process Messages Efficiently
// Use a queue for high-frequency updates
const priceQueue = [];
subscription.received = (data) => {
priceQueue.push(data);
};
// Process queue periodically
setInterval(() => {
if (priceQueue.length > 0) {
const updates = priceQueue.splice(0, priceQueue.length);
updateUI(updates);
}
}, 100); // Update UI every 100ms
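When several updates for the same commodity arrive within one flush interval, only the latest one usually matters for display. The sketch below coalesces a drained batch before it reaches the UI; it assumes the message shape shown under Message Types, and coalesceUpdates is a hypothetical helper.

```javascript
// Collapse a batch of queued messages so each commodity code appears once,
// keeping its most recent price_update. Other message types pass through.
function coalesceUpdates(batch) {
  const latestByCode = new Map();
  const others = [];
  for (const msg of batch) {
    if (msg.type === "price_update" && msg.data && msg.data.code) {
      latestByCode.set(msg.data.code, msg); // later entries overwrite earlier ones
    } else {
      others.push(msg);
    }
  }
  return [...others, ...latestByCode.values()];
}

// Usage inside the periodic flush:
// updateUI(coalesceUpdates(priceQueue.splice(0, priceQueue.length)));
```

Alerts and other non-price messages are kept in full, since dropping any of them could hide an event.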
3. Implement Heartbeat
Keep the connection alive during quiet periods:
setInterval(() => {
if (subscription) {
subscription.perform("ping");
}
}, 30000); // Ping every 30 seconds
4. Monitor Connection Health
let lastMessageTime = Date.now();
subscription.received = (data) => {
lastMessageTime = Date.now();
// Process message
};
// Check connection health
setInterval(() => {
if (Date.now() - lastMessageTime > 60000) {
console.warn("No messages for 60 seconds, connection may be stale");
// Reconnect if needed
}
}, 10000);
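The staleness check is easier to unit-test when the clock is injectable. The following sketch wraps the same 60-second rule in a small tracker; createStalenessTracker is a hypothetical helper, and the threshold is the one used above. Note that a quiet market can also produce silence, so staleness is a hint rather than proof of a dead connection.

```javascript
// Track the time of the last received message and report staleness.
// `now` is injectable so tests can control the clock.
function createStalenessTracker(thresholdMs = 60000, now = Date.now) {
  let lastSeen = now();
  return {
    // Call from the received() callback.
    touch() {
      lastSeen = now();
    },
    // True when no message has arrived for longer than the threshold.
    isStale() {
      return now() - lastSeen > thresholdMs;
    },
  };
}

// Usage:
// const tracker = createStalenessTracker();
// subscription.received = (data) => { tracker.touch(); /* process */ };
// setInterval(() => { if (tracker.isStale()) console.warn("stale"); }, 10000);
```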
Rate Limits
WebSocket connections are limited by plan:
- Reservoir Mastery: Up to 5 concurrent connections per API key
- Message Rate: No limit on received messages
- Send Rate: 10 messages per second per connection
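To stay under the send limit, a client can gate outgoing messages with a small sliding-window limiter. The sketch below enforces at most 10 sends in any one-second window on the client side; how the server measures its window is not documented here, so this is a conservative assumption, and createSendLimiter is a hypothetical helper.

```javascript
// Returns a function that runs sendFn only if fewer than maxPerSecond
// sends happened during the last 1000 ms. `now` is injectable for tests.
function createSendLimiter(maxPerSecond = 10, now = Date.now) {
  const sentAt = []; // timestamps of sends inside the current window
  return function trySend(sendFn) {
    const t = now();
    // Drop timestamps that have left the one-second window.
    while (sentAt.length > 0 && t - sentAt[0] >= 1000) {
      sentAt.shift();
    }
    if (sentAt.length >= maxPerSecond) {
      return false; // caller may queue or drop the message
    }
    sentAt.push(t);
    sendFn();
    return true;
  };
}

// Usage:
// const trySend = createSendLimiter();
// trySend(() => subscription.perform("ping"));
```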
Testing & Development
WebSocket Test Harness
We provide an open-source WebSocket Test Harness for testing your connection before integrating into your application. This tool lets you:
- Test WebSocket connectivity from Node.js, Python, or browser
- Verify your API key works with WebSocket
- Inspect real-time messages to understand the data format
- Debug connection issues before deploying to production
Quick Start (Node.js)
# Clone the harness
git clone https://github.com/karlwaldman/oilpriceapi-websocket-tester.git
cd oilpriceapi-websocket-tester
# Install and run
npm install
node cli/index.js YOUR_API_KEY
Browser Testing
Open browser/index.html directly in your browser; no server is required:
- Open browser/index.html in your browser
- Enter your API key
- Click Connect to see real-time price updates
Python Testing
pip install -r python/requirements.txt
python python/tester.py YOUR_API_KEY
Resources
- GitHub Repository: github.com/karlwaldman/oilpriceapi-websocket-tester
- Includes: Node.js CLI, Python script, and standalone browser tester
- No secrets stored: API key is always passed at runtime
Troubleshooting
Connection Rejected
- Verify API key is valid
- Confirm Reservoir Mastery subscription is active
- Check if maximum connections reached
No Messages Received
- Ensure subscription command was sent
- Verify commodities are actively trading
- Check browser console for errors
Frequent Disconnections
- Check network stability
- Verify firewall allows WebSocket connections
- Consider implementing exponential backoff
Support
For WebSocket-specific support:
- Technical docs: https://docs.oilpriceapi.com/websocket/
- Email: [email protected]
- Slack: Available for Reservoir Mastery subscribers