Oil Price API Documentation - Quick Start in 5 Minutes | REST API

WebSocket API

Reservoir Mastery Exclusive

WebSocket streaming is available exclusively to Reservoir Mastery subscribers ($129/month). Upgrade your plan to access real-time streaming.

Overview

The OilPriceAPI WebSocket service provides real-time price updates using ActionCable, eliminating the need for constant polling and reducing latency to milliseconds.

Benefits

  • Instant Updates: Receive price changes as they happen
  • Reduced API Calls: No need to poll the REST API
  • Lower Latency: Sub-second delivery times
  • Efficient: Single persistent connection
  • Reliable: Automatic reconnection handling

Performance Benchmarks

Metric                        | WebSocket            | REST Polling (1 min)  | REST Polling (5 min)
Avg latency to detect change  | <100 ms              | 30,000 ms             | 150,000 ms
API calls per day             | 1                    | 1,440                 | 288
Data freshness                | Real-time            | Up to 1 min stale     | Up to 5 min stale
Bandwidth per commodity/day   | ~50 KB               | ~500 KB               | ~100 KB
Connection overhead           | Single TLS handshake | Per-request handshake | Per-request handshake

Why WebSocket?

For applications requiring real-time price monitoring, WebSocket cuts average detection latency by roughly 300x compared to 1-minute polling (under 100 ms versus 30,000 ms) while using 99.9% fewer API calls (1 connection versus 1,440 requests per day). See the Migration Guide for step-by-step upgrade instructions.
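
The polling figures in the table above follow from simple arithmetic. The sketch below reproduces them; `pollingStats` is a hypothetical helper name, and it assumes a price change is equally likely at any moment within a polling interval, so the average detection delay is half the interval.

```javascript
// Hypothetical helper: derive the REST polling columns of the benchmark table.
// Assumption: a change can occur at any moment in the polling interval with
// equal probability, so the average time to detect it is half the interval.
function pollingStats(intervalSeconds) {
  const SECONDS_PER_DAY = 24 * 60 * 60;
  return {
    callsPerDay: SECONDS_PER_DAY / intervalSeconds,
    avgDetectionMs: (intervalSeconds * 1000) / 2,
  };
}

console.log(pollingStats(60));  // { callsPerDay: 1440, avgDetectionMs: 30000 }
console.log(pollingStats(300)); // { callsPerDay: 288, avgDetectionMs: 150000 }
```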

Getting Started

Connection URL

wss://api.oilpriceapi.com/cable

Authentication

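Include your API key in the subscription identifier. createConsumer accepts only a URL, and browsers cannot attach custom headers to a WebSocket handshake, so the key travels with the subscribe command, as in the native client examples later in this guide:

const cable = ActionCable.createConsumer("wss://api.oilpriceapi.com/cable");

// The first argument becomes the subscription identifier sent to the server
const subscription = cable.subscriptions.create(
  { channel: "EnergyPricesChannel", api_key: "YOUR_API_KEY" },
  { received(data) { console.log(data); } },
);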

JavaScript Implementation

Using ActionCable (Recommended)

// Install ActionCable
// npm install @rails/actioncable

import { createConsumer } from "@rails/actioncable";

// Create connection
const cable = createConsumer("wss://api.oilpriceapi.com/cable");

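// Subscribe to the energy prices channel. The API key goes in the
// subscription identifier, matching what the native examples send.
const identifier = { channel: "EnergyPricesChannel", api_key: "YOUR_API_KEY" };
const subscription = cable.subscriptions.create(identifier, {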
  connected() {
    console.log("Connected to WebSocket");
  },

  disconnected() {
    console.log("Disconnected from WebSocket");
  },

  received(data) {
    console.log("Received:", data);

    // Handle different message types
    switch (data.type) {
      case "price_update":
        handlePriceUpdate(data.data);
        break;
      case "drilling_intelligence_update":
        handleDrillingUpdate(data.data);
        break;
      case "alert":
        handleAlert(data.data);
        break;
    }
  },
});

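// Handle price updates
function handlePriceUpdate(priceData) {
  console.log(`${priceData.code}: $${priceData.price}`);
  // Update your UI
}

// Handle drilling intelligence updates (see Message Types for the payload)
function handleDrillingUpdate(drillingData) {
  console.log(`${drillingData.code}: ${drillingData.value}`);
}

// Handle triggered alerts
function handleAlert(alertData) {
  console.log(`Alert: ${alertData.message}`);
}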

// Unsubscribe when done
// subscription.unsubscribe();

Using Native WebSocket

class OilPriceWebSocket {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.ws = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
  }

  connect() {
    this.ws = new WebSocket("wss://api.oilpriceapi.com/cable");

    this.ws.onopen = () => {
      console.log("WebSocket connected");
      this.reconnectAttempts = 0;

      // Send subscription command
      this.ws.send(
        JSON.stringify({
          command: "subscribe",
          identifier: JSON.stringify({
            channel: "EnergyPricesChannel",
            api_key: this.apiKey,
          }),
        }),
      );
    };

    this.ws.onmessage = (event) => {
      const message = JSON.parse(event.data);

      if (message.type === "ping") return;

      if (message.message) {
        this.handleMessage(message.message);
      }
    };

    this.ws.onerror = (error) => {
      console.error("WebSocket error:", error);
    };

    this.ws.onclose = () => {
      console.log("WebSocket disconnected");
      this.reconnect();
    };
  }

  handleMessage(data) {
    console.log("Price update:", data);
    // Process the price update
  }

  reconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      console.log(
        `Reconnecting... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`,
      );
      setTimeout(() => this.connect(), 5000);
    }
  }

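  disconnect() {
    if (this.ws) {
      // Detach the close handler first so an intentional close
      // does not trigger reconnect()
      this.ws.onclose = null;
      this.ws.close();
    }
  }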
}

// Usage
const ws = new OilPriceWebSocket("YOUR_API_KEY");
ws.connect();

Python Implementation

import websocket
import json
import threading

class OilPriceWebSocket:
    def __init__(self, api_key):
        self.api_key = api_key
        self.ws = None

    def on_message(self, ws, message):
        data = json.loads(message)

        if data.get('type') == 'ping':
            return

        if 'message' in data:
            self.handle_price_update(data['message'])

    def on_error(self, ws, error):
        print(f"WebSocket error: {error}")

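    def on_close(self, ws, close_status_code=None, close_msg=None):
        # websocket-client 1.x passes the close status code and message
        print(f"WebSocket connection closed: {close_status_code} {close_msg}")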

    def on_open(self, ws):
        print("WebSocket connected")

        # Subscribe to channel
        subscribe_message = {
            'command': 'subscribe',
            'identifier': json.dumps({
                'channel': 'EnergyPricesChannel',
                'api_key': self.api_key
            })
        }
        ws.send(json.dumps(subscribe_message))

    def handle_price_update(self, data):
        if data.get('type') == 'price_update':
            price_data = data['data']
            print(f"{price_data['code']}: ${price_data['price']}")

    def connect(self):
        websocket.enableTrace(True)
        self.ws = websocket.WebSocketApp(
            "wss://api.oilpriceapi.com/cable",
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open
        )

        # Run in a separate thread
        wst = threading.Thread(target=self.ws.run_forever)
        wst.daemon = True
        wst.start()

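# Usage
import time

client = OilPriceWebSocket('YOUR_API_KEY')
client.connect()

# The listener runs in a daemon thread, so keep the main thread alive
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Shutting down...")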

Go Implementation

package main

import (
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/gorilla/websocket"
)

type OilPriceClient struct {
	apiKey           string
	conn             *websocket.Conn
	done             chan struct{}
	reconnectAttempts int
	maxReconnects    int
}

type SubscribeMessage struct {
	Command    string `json:"command"`
	Identifier string `json:"identifier"`
}

type ChannelIdentifier struct {
	Channel string `json:"channel"`
	APIKey  string `json:"api_key"`
}

type PriceMessage struct {
	Type    string          `json:"type,omitempty"`
	Message json.RawMessage `json:"message,omitempty"`
}

type PriceUpdate struct {
	Type string `json:"type"`
	Data struct {
		Code   string  `json:"code"`
		Price  float64 `json:"price"`
		Change float64 `json:"change"`
	} `json:"data"`
}

func NewOilPriceClient(apiKey string) *OilPriceClient {
	return &OilPriceClient{
		apiKey:        apiKey,
		done:          make(chan struct{}),
		maxReconnects: 5,
	}
}

func (c *OilPriceClient) Connect() error {
	var err error
	c.conn, _, err = websocket.DefaultDialer.Dial("wss://api.oilpriceapi.com/cable", nil)
	if err != nil {
		return err
	}

	// Subscribe to channel
	identifier, _ := json.Marshal(ChannelIdentifier{
		Channel: "EnergyPricesChannel",
		APIKey:  c.apiKey,
	})

	subscribeMsg := SubscribeMessage{
		Command:    "subscribe",
		Identifier: string(identifier),
	}

	if err := c.conn.WriteJSON(subscribeMsg); err != nil {
		return err
	}

	log.Println("Connected and subscribed to EnergyPricesChannel")
	c.reconnectAttempts = 0
	return nil
}

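func (c *OilPriceClient) Listen() {
	// c.done is deliberately not closed here: Listen is restarted after each
	// successful reconnect, and closing the same channel twice would panic.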

	for {
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			log.Printf("Read error: %v", err)
			c.reconnect()
			return
		}

		var msg PriceMessage
		if err := json.Unmarshal(message, &msg); err != nil {
			continue
		}

		// Skip ping messages
		if msg.Type == "ping" {
			continue
		}

		// Handle price updates
		if msg.Message != nil {
			var update PriceUpdate
			if err := json.Unmarshal(msg.Message, &update); err == nil {
				if update.Type == "price_update" {
					// "change" is the absolute price move, not a percentage
					log.Printf("%s: $%.2f (%+.2f)",
						update.Data.Code, update.Data.Price, update.Data.Change)
				}
			}
		}
	}
}

func (c *OilPriceClient) reconnect() {
	if c.reconnectAttempts >= c.maxReconnects {
		log.Println("Max reconnection attempts reached")
		return
	}

	c.reconnectAttempts++
	delay := time.Duration(c.reconnectAttempts*5) * time.Second
	log.Printf("Reconnecting in %v (attempt %d/%d)", delay, c.reconnectAttempts, c.maxReconnects)

	time.Sleep(delay)
	if err := c.Connect(); err != nil {
		log.Printf("Reconnection failed: %v", err)
		c.reconnect()
	} else {
		go c.Listen()
	}
}

func (c *OilPriceClient) Close() {
	c.conn.WriteMessage(websocket.CloseMessage,
		websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
	c.conn.Close()
}

func main() {
	client := NewOilPriceClient("YOUR_API_KEY")

	if err := client.Connect(); err != nil {
		log.Fatal("Connection failed:", err)
	}
	defer client.Close()

	go client.Listen()

	// Wait for interrupt signal
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt
	log.Println("Shutting down...")
}

Ruby Implementation

require 'faye/websocket'
require 'eventmachine'
require 'json'

class OilPriceWebSocket
  WEBSOCKET_URL = 'wss://api.oilpriceapi.com/cable'

  def initialize(api_key)
    @api_key = api_key
    @reconnect_attempts = 0
    @max_reconnects = 5
  end

  def connect
    EM.run do
      @ws = Faye::WebSocket::Client.new(WEBSOCKET_URL)

      @ws.on :open do |event|
        puts 'WebSocket connected'
        @reconnect_attempts = 0
        subscribe
      end

      @ws.on :message do |event|
        handle_message(event.data)
      end

      @ws.on :close do |event|
        puts "Connection closed: #{event.code} - #{event.reason}"
        reconnect
      end

      @ws.on :error do |event|
        puts "WebSocket error: #{event.message}"
      end
    end
  end

  private

  def subscribe
    identifier = {
      channel: 'EnergyPricesChannel',
      api_key: @api_key
    }.to_json

    message = {
      command: 'subscribe',
      identifier: identifier
    }.to_json

    @ws.send(message)
    puts 'Subscribed to EnergyPricesChannel'
  end

  def handle_message(raw_data)
    data = JSON.parse(raw_data)

    # Skip ping messages
    return if data['type'] == 'ping'

    # Handle price updates
    if data['message']
      message = data['message']
      if message['type'] == 'price_update'
        price_data = message['data']
        puts "#{price_data['code']}: $#{price_data['price']} (#{price_data['change_percent']}%)"
      end
    end
  rescue JSON::ParserError => e
    puts "JSON parse error: #{e.message}"
  end

  def reconnect
    return if @reconnect_attempts >= @max_reconnects

    @reconnect_attempts += 1
    delay = @reconnect_attempts * 5
    puts "Reconnecting in #{delay} seconds (attempt #{@reconnect_attempts}/#{@max_reconnects})"

    EM.add_timer(delay) do
      # connect builds a new client and attaches every handler again;
      # EM.run simply calls its block when the reactor is already running
      connect
    end
  end
end

# Usage
client = OilPriceWebSocket.new('YOUR_API_KEY')
client.connect

Java Implementation

import okhttp3.*;
import com.google.gson.*;
import java.util.concurrent.TimeUnit;

public class OilPriceWebSocket extends WebSocketListener {
    private static final String WEBSOCKET_URL = "wss://api.oilpriceapi.com/cable";
    private final String apiKey;
    private final OkHttpClient client;
    private WebSocket webSocket;
    private int reconnectAttempts = 0;
    private static final int MAX_RECONNECTS = 5;
    private final Gson gson = new Gson();

    public OilPriceWebSocket(String apiKey) {
        this.apiKey = apiKey;
        this.client = new OkHttpClient.Builder()
            .readTimeout(0, TimeUnit.MILLISECONDS)
            .build();
    }

    public void connect() {
        Request request = new Request.Builder()
            .url(WEBSOCKET_URL)
            .build();
        webSocket = client.newWebSocket(request, this);
    }

    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        System.out.println("WebSocket connected");
        reconnectAttempts = 0;
        subscribe();
    }

    private void subscribe() {
        JsonObject identifier = new JsonObject();
        identifier.addProperty("channel", "EnergyPricesChannel");
        identifier.addProperty("api_key", apiKey);

        JsonObject subscribeMessage = new JsonObject();
        subscribeMessage.addProperty("command", "subscribe");
        subscribeMessage.addProperty("identifier", identifier.toString());

        webSocket.send(subscribeMessage.toString());
        System.out.println("Subscribed to EnergyPricesChannel");
    }

    @Override
    public void onMessage(WebSocket webSocket, String text) {
        JsonObject message = gson.fromJson(text, JsonObject.class);

        // Skip ping messages
        if (message.has("type") && "ping".equals(message.get("type").getAsString())) {
            return;
        }

        // Handle price updates
        if (message.has("message")) {
            JsonObject payload = message.getAsJsonObject("message");
            if (payload.has("type") && "price_update".equals(payload.get("type").getAsString())) {
                JsonObject data = payload.getAsJsonObject("data");
                String code = data.get("code").getAsString();
                double price = data.get("price").getAsDouble();
                double change = data.get("change_percent").getAsDouble();
                System.out.printf("%s: $%.2f (%.2f%%)%n", code, price, change);
            }
        }
    }

    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        System.out.println("Connection closing: " + code + " - " + reason);
        webSocket.close(1000, null);
    }

    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        System.out.println("Connection closed: " + code + " - " + reason);
        reconnect();
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        System.out.println("WebSocket error: " + t.getMessage());
        reconnect();
    }

    private void reconnect() {
        if (reconnectAttempts >= MAX_RECONNECTS) {
            System.out.println("Max reconnection attempts reached");
            return;
        }

        reconnectAttempts++;
        int delay = reconnectAttempts * 5000;
        System.out.printf("Reconnecting in %d ms (attempt %d/%d)%n",
            delay, reconnectAttempts, MAX_RECONNECTS);

        try {
            Thread.sleep(delay);
            connect();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void disconnect() {
        if (webSocket != null) {
            webSocket.close(1000, "Client disconnect");
        }
    }

    public static void main(String[] args) {
        OilPriceWebSocket client = new OilPriceWebSocket("YOUR_API_KEY");
        client.connect();

        // Keep running
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down...");
            client.disconnect();
        }));
    }
}

PHP Implementation

<?php

require 'vendor/autoload.php';

use WebSocket\Client;
use WebSocket\ConnectionException;

class OilPriceWebSocket
{
    private const WEBSOCKET_URL = 'wss://api.oilpriceapi.com/cable';
    private string $apiKey;
    private ?Client $client = null;
    private int $reconnectAttempts = 0;
    private int $maxReconnects = 5;
    private bool $running = true;

    public function __construct(string $apiKey)
    {
        $this->apiKey = $apiKey;
    }

    public function connect(): void
    {
        $this->client = new Client(self::WEBSOCKET_URL, [
            'timeout' => 60,
            'headers' => [
                'Origin' => 'https://oilpriceapi.com'
            ]
        ]);

        echo "WebSocket connected\n";
        $this->reconnectAttempts = 0;
        $this->subscribe();
    }

    private function subscribe(): void
    {
        $identifier = json_encode([
            'channel' => 'EnergyPricesChannel',
            'api_key' => $this->apiKey
        ]);

        $message = json_encode([
            'command' => 'subscribe',
            'identifier' => $identifier
        ]);

        $this->client->send($message);
        echo "Subscribed to EnergyPricesChannel\n";
    }

    public function listen(): void
    {
        while ($this->running) {
            try {
                $message = $this->client->receive();
                $this->handleMessage($message);
            } catch (ConnectionException $e) {
                echo "Connection error: " . $e->getMessage() . "\n";
                $this->reconnect();
            }
        }
    }

    private function handleMessage(string $rawMessage): void
    {
        $data = json_decode($rawMessage, true);

        if ($data === null) {
            return;
        }

        // Skip ping messages
        if (isset($data['type']) && $data['type'] === 'ping') {
            return;
        }

        // Handle price updates
        if (isset($data['message'])) {
            $message = $data['message'];
            if (isset($message['type']) && $message['type'] === 'price_update') {
                $priceData = $message['data'];
                printf(
                    "%s: $%.2f (%.2f%%)\n",
                    $priceData['code'],
                    $priceData['price'],
                    $priceData['change_percent']
                );
            }
        }
    }

    private function reconnect(): void
    {
        if ($this->reconnectAttempts >= $this->maxReconnects) {
            echo "Max reconnection attempts reached\n";
            $this->running = false;
            return;
        }

        $this->reconnectAttempts++;
        $delay = $this->reconnectAttempts * 5;
        printf(
            "Reconnecting in %d seconds (attempt %d/%d)\n",
            $delay,
            $this->reconnectAttempts,
            $this->maxReconnects
        );

        sleep($delay);

        try {
            $this->connect();
        } catch (\Exception $e) {
            echo "Reconnection failed: " . $e->getMessage() . "\n";
            $this->reconnect();
        }
    }

    public function disconnect(): void
    {
        $this->running = false;
        if ($this->client) {
            $this->client->close();
        }
    }
}

// Usage
$client = new OilPriceWebSocket('YOUR_API_KEY');

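// Handle shutdown gracefully
// Without async signals (or declare(ticks=1)) the handler below never runs
pcntl_async_signals(true);
pcntl_signal(SIGINT, function() use ($client) {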
    echo "\nShutting down...\n";
    $client->disconnect();
    exit(0);
});

$client->connect();
$client->listen();

C# Implementation

using System;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public class OilPriceWebSocket : IDisposable
{
    private const string WebSocketUrl = "wss://api.oilpriceapi.com/cable";
    private readonly string _apiKey;
    private ClientWebSocket _webSocket;
    private CancellationTokenSource _cancellationTokenSource;
    private int _reconnectAttempts;
    private const int MaxReconnects = 5;

    public OilPriceWebSocket(string apiKey)
    {
        _apiKey = apiKey;
        _webSocket = new ClientWebSocket();
        _cancellationTokenSource = new CancellationTokenSource();
    }

    public async Task ConnectAsync()
    {
        _webSocket = new ClientWebSocket();
        await _webSocket.ConnectAsync(new Uri(WebSocketUrl), _cancellationTokenSource.Token);

        Console.WriteLine("WebSocket connected");
        _reconnectAttempts = 0;

        await SubscribeAsync();
    }

    private async Task SubscribeAsync()
    {
        var identifier = JsonSerializer.Serialize(new
        {
            channel = "EnergyPricesChannel",
            api_key = _apiKey
        });

        var subscribeMessage = JsonSerializer.Serialize(new
        {
            command = "subscribe",
            identifier = identifier
        });

        var bytes = Encoding.UTF8.GetBytes(subscribeMessage);
        await _webSocket.SendAsync(
            new ArraySegment<byte>(bytes),
            WebSocketMessageType.Text,
            true,
            _cancellationTokenSource.Token
        );

        Console.WriteLine("Subscribed to EnergyPricesChannel");
    }

    public async Task ListenAsync()
    {
        var buffer = new byte[4096];

        while (_webSocket.State == WebSocketState.Open)
        {
            try
            {
                var result = await _webSocket.ReceiveAsync(
                    new ArraySegment<byte>(buffer),
                    _cancellationTokenSource.Token
                );

                if (result.MessageType == WebSocketMessageType.Close)
                {
                    await ReconnectAsync();
                    return;
                }

                var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
                HandleMessage(message);
            }
            catch (WebSocketException ex)
            {
                Console.WriteLine($"WebSocket error: {ex.Message}");
                await ReconnectAsync();
                return;
            }
        }
    }

    private void HandleMessage(string rawMessage)
    {
        try
        {
            using var doc = JsonDocument.Parse(rawMessage);
            var root = doc.RootElement;

            // Skip ping messages
            if (root.TryGetProperty("type", out var type) && type.GetString() == "ping")
            {
                return;
            }

            // Handle price updates
            if (root.TryGetProperty("message", out var message))
            {
                if (message.TryGetProperty("type", out var msgType) &&
                    msgType.GetString() == "price_update")
                {
                    var data = message.GetProperty("data");
                    var code = data.GetProperty("code").GetString();
                    var price = data.GetProperty("price").GetDouble();
                    var change = data.GetProperty("change_percent").GetDouble();

                    Console.WriteLine($"{code}: ${price:F2} ({change:F2}%)");
                }
            }
        }
        catch (JsonException ex)
        {
            Console.WriteLine($"JSON parse error: {ex.Message}");
        }
    }

    private async Task ReconnectAsync()
    {
        if (_reconnectAttempts >= MaxReconnects)
        {
            Console.WriteLine("Max reconnection attempts reached");
            return;
        }

        _reconnectAttempts++;
        var delay = _reconnectAttempts * 5000;
        Console.WriteLine($"Reconnecting in {delay}ms (attempt {_reconnectAttempts}/{MaxReconnects})");

        await Task.Delay(delay);

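        try
        {
            _webSocket.Dispose();
            await ConnectAsync();
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Reconnection failed: {ex.Message}");
            await ReconnectAsync();
            return;
        }

        // Await the new receive loop so that callers awaiting ListenAsync()
        // stay blocked while the reconnected socket is in use
        await ListenAsync();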
    }

    public async Task DisconnectAsync()
    {
        if (_webSocket.State == WebSocketState.Open)
        {
            await _webSocket.CloseAsync(
                WebSocketCloseStatus.NormalClosure,
                "Client disconnect",
                CancellationToken.None
            );
        }
    }

    public void Dispose()
    {
        _cancellationTokenSource.Cancel();
        _webSocket?.Dispose();
        _cancellationTokenSource?.Dispose();
    }
}

// Usage
class Program
{
    static async Task Main(string[] args)
    {
        using var client = new OilPriceWebSocket("YOUR_API_KEY");

        Console.CancelKeyPress += async (sender, e) =>
        {
            e.Cancel = true;
            Console.WriteLine("\nShutting down...");
            await client.DisconnectAsync();
        };

        await client.ConnectAsync();
        await client.ListenAsync();
    }
}

Rust Implementation

use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::time::Duration;
use tokio::time::sleep;
use tokio_tungstenite::{connect_async, tungstenite::Message};

const WEBSOCKET_URL: &str = "wss://api.oilpriceapi.com/cable";
const MAX_RECONNECTS: u32 = 5;

#[derive(Serialize)]
struct SubscribeMessage {
    command: String,
    identifier: String,
}

#[derive(Deserialize, Debug)]
struct PriceData {
    code: String,
    price: f64,
    change_percent: f64,
}

#[derive(Deserialize, Debug)]
struct PriceUpdate {
    #[serde(rename = "type")]
    msg_type: Option<String>,
    data: Option<PriceData>,
}

pub struct OilPriceClient {
    api_key: String,
    reconnect_attempts: u32,
}

impl OilPriceClient {
    pub fn new(api_key: &str) -> Self {
        Self {
            api_key: api_key.to_string(),
            reconnect_attempts: 0,
        }
    }

    pub async fn connect(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        loop {
            match self.run_connection().await {
                Ok(_) => break Ok(()),
                Err(e) => {
                    eprintln!("Connection error: {}", e);
                    if !self.reconnect().await {
                        break Err(e);
                    }
                }
            }
        }
    }

    async fn run_connection(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        let (ws_stream, _) = connect_async(WEBSOCKET_URL).await?;
        println!("WebSocket connected");
        self.reconnect_attempts = 0;

        let (mut write, mut read) = ws_stream.split();

        // Subscribe to channel
        let identifier = json!({
            "channel": "EnergyPricesChannel",
            "api_key": self.api_key
        });

        let subscribe_msg = SubscribeMessage {
            command: "subscribe".to_string(),
            identifier: identifier.to_string(),
        };

        let msg = Message::Text(serde_json::to_string(&subscribe_msg)?);
        write.send(msg).await?;
        println!("Subscribed to EnergyPricesChannel");

        // Listen for messages
        while let Some(message) = read.next().await {
            match message {
                Ok(Message::Text(text)) => {
                    self.handle_message(&text);
                }
                Ok(Message::Close(_)) => {
                    println!("Connection closed by server");
                    break;
                }
                Err(e) => {
                    // Propagate the error so connect() logs it and retries;
                    // breaking here would return Ok(()) and skip reconnection
                    return Err(e.into());
                }
                _ => {}
            }
        }

        Ok(())
    }

    fn handle_message(&self, raw_message: &str) {
        let value: Result<Value, _> = serde_json::from_str(raw_message);

        if let Ok(msg) = value {
            // Skip ping messages
            if msg.get("type").and_then(|t| t.as_str()) == Some("ping") {
                return;
            }

            // Handle price updates
            if let Some(message) = msg.get("message") {
                if let Ok(update) = serde_json::from_value::<PriceUpdate>(message.clone()) {
                    if update.msg_type.as_deref() == Some("price_update") {
                        if let Some(data) = update.data {
                            println!(
                                "{}: ${:.2} ({:.2}%)",
                                data.code, data.price, data.change_percent
                            );
                        }
                    }
                }
            }
        }
    }

    async fn reconnect(&mut self) -> bool {
        if self.reconnect_attempts >= MAX_RECONNECTS {
            eprintln!("Max reconnection attempts reached");
            return false;
        }

        self.reconnect_attempts += 1;
        let delay = self.reconnect_attempts as u64 * 5;
        println!(
            "Reconnecting in {} seconds (attempt {}/{})",
            delay, self.reconnect_attempts, MAX_RECONNECTS
        );

        sleep(Duration::from_secs(delay)).await;
        true
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = OilPriceClient::new("YOUR_API_KEY");

    // Handle Ctrl+C
    tokio::spawn(async {
        tokio::signal::ctrl_c().await.unwrap();
        println!("\nShutting down...");
        std::process::exit(0);
    });

    client.connect().await?;
    Ok(())
}

Cargo.toml dependencies:

[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
futures-util = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"

Message Types

Price Update

Sent whenever a commodity price changes:

{
  "type": "price_update",
  "data": {
    "code": "WTI_USD",
    "name": "West Texas Intermediate",
    "price": 75.43,
    "formatted": "$75.43",
    "currency": "USD",
    "unit": "barrel",
    "change": 0.22,
    "change_percent": 0.29,
    "previous_price": 75.21,
    "created_at": "2025-07-18T14:30:00.000Z"
  }
}
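
The derived fields in the sample above are related by simple arithmetic, which can be useful for sanity-checking incoming messages. The sketch below recomputes them; `deriveChange` is a hypothetical helper, and rounding to two decimals is an assumption inferred from the sample values rather than documented behavior.

```javascript
// Sketch: recompute the derived fields of a price_update payload.
// Field names come from the sample message; two-decimal rounding is an
// assumption based on the sample values.
function deriveChange(price, previousPrice) {
  const round2 = (n) => Math.round(n * 100) / 100;
  const change = price - previousPrice;
  const changePercent = (change / previousPrice) * 100;
  return { change: round2(change), change_percent: round2(changePercent) };
}

console.log(deriveChange(75.43, 75.21)); // { change: 0.22, change_percent: 0.29 }
```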

Drilling Intelligence Update

Sent when new drilling intelligence data is available:

{
  "type": "drilling_intelligence_update",
  "data": {
    "code": "US_RIG_COUNT",
    "name": "US Rig Count",
    "value": 622,
    "previous_value": 619,
    "change": 3,
    "change_percent": 0.48,
    "region": "United States",
    "source": "baker_hughes",
    "created_at": "2025-07-18T18:00:00.000Z"
  }
}

Alert Notification

Sent when a configured alert is triggered:

{
  "type": "alert",
  "data": {
    "alert_type": "price_threshold",
    "commodity": "BRENT_CRUDE_USD",
    "condition": "above",
    "threshold": 80.0,
    "current_price": 80.15,
    "message": "Brent Crude has exceeded $80.00",
    "triggered_at": "2025-07-18T15:45:00.000Z"
  }
}

Connection Status

System messages about connection state:

{
  "type": "welcome",
  "message": "Successfully connected to OilPriceAPI WebSocket"
}
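
With a native WebSocket client, every frame has to be sorted before it is handled. In the standard ActionCable protocol, system frames carry a top-level `type` (`welcome`, `ping`, `confirm_subscription`, `reject_subscription`, `disconnect`), while channel broadcasts carry the payload under `message`. A minimal sketch of that triage, where `classifyFrame` and the returned `kind` labels are hypothetical names:

```javascript
// Frame types defined by the ActionCable wire protocol
const PROTOCOL_TYPES = new Set([
  "welcome",
  "ping",
  "confirm_subscription",
  "reject_subscription",
  "disconnect",
]);

// Sketch: sort a raw frame into protocol, data, invalid, or unknown.
function classifyFrame(raw) {
  let frame;
  try {
    frame = JSON.parse(raw);
  } catch (err) {
    return { kind: "invalid" };
  }
  if (frame === null || typeof frame !== "object") {
    return { kind: "invalid" };
  }
  // Check protocol types first: system frames may also carry a "message" field
  if (PROTOCOL_TYPES.has(frame.type)) {
    return { kind: "protocol", type: frame.type };
  }
  if (frame.message !== null && typeof frame.message === "object") {
    return { kind: "data", type: frame.message.type, data: frame.message.data };
  }
  return { kind: "unknown" };
}
```

A `reject_subscription` frame is the raw-protocol counterpart of the `rejected` callback in the ActionCable client, so it is worth logging rather than silently skipping.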

Subscribing to Specific Commodities

You can filter which commodities you receive updates for:

const subscription = cable.subscriptions.create(
  {
    channel: "EnergyPricesChannel",
    commodities: ["WTI_USD", "BRENT_CRUDE_USD", "NATURAL_GAS_USD"],
  },
  {
    received(data) {
      // Only receive updates for specified commodities
    },
  },
);
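
The ActionCable client serializes the first argument of `subscriptions.create` into the subscription identifier, so a native WebSocket client can presumably request the same filter by adding `commodities` to its identifier. A minimal sketch under that assumption; `buildSubscribeFrame` is a hypothetical helper name:

```javascript
// Sketch: build the raw "subscribe" frame for a native WebSocket client.
// ActionCable expects `identifier` to be a JSON string nested inside the
// outer JSON frame, so the identifier object is stringified twice.
// Assumption: the server reads `commodities` from the identifier.
function buildSubscribeFrame(apiKey, commodities) {
  const identifier = { channel: "EnergyPricesChannel", api_key: apiKey };
  if (Array.isArray(commodities) && commodities.length > 0) {
    identifier.commodities = commodities;
  }
  return JSON.stringify({
    command: "subscribe",
    identifier: JSON.stringify(identifier),
  });
}

// Usage with an open socket:
// ws.send(buildSubscribeFrame("YOUR_API_KEY", ["WTI_USD", "BRENT_CRUDE_USD"]));
```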

Error Handling

Connection Errors

subscription.rejected = () => {
  console.error("Subscription rejected. Check your API key and plan.");
};

Automatic Reconnection

ActionCable handles reconnection automatically. For custom implementations:

let reconnectInterval = 1000; // Start with 1 second

function connect() {
  const ws = new WebSocket("wss://api.oilpriceapi.com/cable");

  ws.onclose = () => {
    setTimeout(() => {
      reconnectInterval = Math.min(reconnectInterval * 2, 30000); // Max 30 seconds
      connect();
    }, reconnectInterval);
  };

  ws.onopen = () => {
    reconnectInterval = 1000; // Reset on successful connection
  };
}
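
The delay schedule used above can be isolated into a pure function, which makes it easy to test and to add jitter so that many clients do not reconnect at the same instant. This is a sketch, not part of the API; `backoffDelay` and its option names are hypothetical, and the jitter option is an addition that the example above does not include.

```javascript
// Sketch: exponential backoff with optional jitter.
// `attempt` is 0-based; `random` is injectable so results can be tested.
function backoffDelay(attempt, options = {}) {
  const { baseMs = 1000, maxMs = 30000, jitter = 0, random = Math.random } = options;
  const exponential = Math.min(baseMs * 2 ** attempt, maxMs);
  // Shift the delay by up to +/- jitter (for example 0.2 means +/- 20%)
  const spread = exponential * jitter * (random() * 2 - 1);
  return Math.round(exponential + spread);
}

const schedule = [0, 1, 2, 3, 4, 5].map((attempt) => backoffDelay(attempt));
console.log(schedule); // [ 1000, 2000, 4000, 8000, 16000, 30000 ]
```

With `jitter` left at 0 the schedule matches the doubling-with-cap behavior of the example above.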

Best Practices

1. Handle Disconnections Gracefully

Always implement reconnection logic and inform users of connection status.

2. Process Messages Efficiently

// Use a queue for high-frequency updates
const priceQueue = [];

subscription.received = (data) => {
  priceQueue.push(data);
};

// Process queue periodically
setInterval(() => {
  if (priceQueue.length > 0) {
    const updates = priceQueue.splice(0, priceQueue.length);
    updateUI(updates);
  }
}, 100); // Update UI every 100ms
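
When several updates for the same commodity arrive within one 100 ms window, only the newest one usually matters for display. The sketch below collapses a drained batch accordingly; `coalesceByCode` is a hypothetical helper, it assumes messages shaped like the price_update sample, and it drops every other message type, so alerts would need separate handling.

```javascript
// Sketch: keep only the newest price_update per commodity code.
// Assumes the batch preserves arrival order.
function coalesceByCode(updates) {
  const latest = new Map();
  for (const update of updates) {
    if (update && update.type === "price_update" && update.data) {
      latest.set(update.data.code, update); // later entries overwrite earlier ones
    }
  }
  return [...latest.values()];
}

// Usage inside the interval callback: updateUI(coalesceByCode(updates));
```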

3. Implement Heartbeat

Keep the connection alive during quiet periods:

setInterval(() => {
  if (subscription) {
    subscription.perform("ping");
  }
}, 30000); // Ping every 30 seconds

4. Monitor Connection Health

let lastMessageTime = Date.now();

subscription.received = (data) => {
  lastMessageTime = Date.now();
  // Process message
};

// Check connection health
setInterval(() => {
  if (Date.now() - lastMessageTime > 60000) {
    console.warn("No messages for 60 seconds, connection may be stale");
    // Reconnect if needed
  }
}, 10000);
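
The same check can be wrapped in a small class with an injectable clock, which keeps the staleness rule testable without waiting a real minute. This is a sketch; `StaleConnectionWatchdog` and its method names are hypothetical.

```javascript
// Sketch: track the time of the last message and report staleness.
class StaleConnectionWatchdog {
  constructor({ timeoutMs = 60000, now = Date.now } = {}) {
    this.timeoutMs = timeoutMs;
    this.now = now; // injectable clock, defaults to Date.now
    this.lastSeen = now();
  }

  // Call from the received() callback for every message
  touch() {
    this.lastSeen = this.now();
  }

  // True once more than timeoutMs has passed since the last message
  isStale() {
    return this.now() - this.lastSeen > this.timeoutMs;
  }
}

// Usage:
// const watchdog = new StaleConnectionWatchdog();
// subscription.received = (data) => { watchdog.touch(); /* process */ };
// setInterval(() => { if (watchdog.isStale()) { /* reconnect */ } }, 10000);
```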

Rate Limits

WebSocket connections are limited by plan:

  • Reservoir Mastery: Up to 5 concurrent connections per API key
  • Message Rate: No limit on received messages
  • Send Rate: 10 messages per second per connection
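
A client that sends many commands can stay under the send limit with a small sliding-window limiter. The sketch below is one way to do it; `SendRateLimiter` is a hypothetical class, and how the server actually measures the 10 messages per second (fixed or sliding window) is an assumption.

```javascript
// Sketch: allow at most `limit` sends in any rolling `windowMs` window.
class SendRateLimiter {
  constructor({ limit = 10, windowMs = 1000, now = Date.now } = {}) {
    this.limit = limit;
    this.windowMs = windowMs;
    this.now = now; // injectable clock for testing
    this.sentAt = []; // timestamps of sends still inside the window
  }

  // Returns true and records the send if it fits in the window
  tryAcquire() {
    const t = this.now();
    while (this.sentAt.length > 0 && t - this.sentAt[0] >= this.windowMs) {
      this.sentAt.shift(); // drop timestamps that have left the window
    }
    if (this.sentAt.length >= this.limit) {
      return false;
    }
    this.sentAt.push(t);
    return true;
  }
}

// Usage: if (limiter.tryAcquire()) { ws.send(frame); } else { /* retry later */ }
```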

Testing & Development

WebSocket Test Harness

We provide an open-source WebSocket Test Harness for testing your connection before integrating into your application. This tool lets you:

  • Test WebSocket connectivity from Node.js, Python, or browser
  • Verify your API key works with WebSocket
  • Inspect real-time messages to understand the data format
  • Debug connection issues before deploying to production

Quick Start (Node.js)

# Clone the harness
git clone https://github.com/karlwaldman/oilpriceapi-websocket-tester.git
cd oilpriceapi-websocket-tester

# Install and run
npm install
node cli/index.js YOUR_API_KEY

Browser Testing

The browser tester runs straight from the file system, with no server required:

  1. Open browser/index.html in your browser
  2. Enter your API key
  3. Click Connect to see real-time price updates

Python Testing

pip install -r python/requirements.txt
python python/tester.py YOUR_API_KEY

Resources

  • GitHub Repository: github.com/karlwaldman/oilpriceapi-websocket-tester
  • Includes: Node.js CLI, Python script, and standalone browser tester
  • No secrets stored: API key is always passed at runtime

Troubleshooting

Connection Rejected

  • Verify API key is valid
  • Confirm Reservoir Mastery subscription is active
  • Check if maximum connections reached

No Messages Received

  • Ensure subscription command was sent
  • Verify commodities are actively trading
  • Check browser console for errors

Frequent Disconnections

  • Check network stability
  • Verify firewall allows WebSocket connections
  • Consider implementing exponential backoff

Support

For WebSocket-specific support:

  • Technical docs: https://docs.oilpriceapi.com/websocket/
  • Email: [email protected]
  • Slack: Available for Reservoir Mastery subscribers

Upgrade to Reservoir Mastery to access WebSocket streaming.

Last Updated: 2/3/26, 1:27 AM