DEX API

Dex-integration#

To successfully integrate your DEX into the OKX Routing Engine, please follow the steps below:

  1. Provide a DEX SDK compatible with the OKX DEX AMM Interface: We require a DEX SDK that is fully compatible with our AMM (Automated Market Maker) interface. This SDK will allow us to communicate with your DEX and integrate its functionalities, enabling liquidity provision and trade execution seamlessly on our platform. The SDK should be able to handle key functions like pricing, order matching, liquidity pool interaction, and trade execution.

  2. Allow us to fork your SDK: In order to ensure long-term maintenance and support for our users, it’s important that we can fork your SDK. This means we need the ability to create our own version of the SDK for integration purposes. By doing so, we can:

  • Guarantee the stability of the SDK for our users.
  • Maintain and update the SDK independently, fixing potential bugs and addressing issues related to your DEX’s integration within our ecosystem.
  • Customize the SDK to meet our specific requirements and adapt it for performance optimization.
  1. Follow the guide and example we provide: We will provide a detailed integration guide and example code to ensure smooth integration. This will help you understand the technical requirements and structure, making the process easier and faster. By adhering to this documentation, you ensure that your DEX SDK aligns with our platform’s needs and integration standards.

By meeting these guidelines, we will be able to provide your DEX with reliable support and ensure that your liquidity is accessible through the OKX DEX Routing Engine.

Note
Currently, the interface only works for Solana-based DEXs.

AMM Interface#

use async_trait::async_trait;
use solana_client::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::error::Error;
use tokio::sync::mpsc::Sender;

pub mod raydium_clmm;
// Other dex template
// pub mod raydium_amm;
// pub mod orca_amm;
// ...

#[async_trait]
pub trait Dex: Send + Sync {
    // Return DEX Name
    fn dex_name(&self) -> String;

    // Return DEX Program ID
    fn dex_program_id(&self) -> Pubkey;

    // Quote function
    fn quote(&self, amount_in: f64, metadata: &PoolMetadata) -> f64;

    // fetch all the pool address
    fn fetch_pool_addresses(&self, client: &RpcClient) -> Vec<String>;

   // monitor new pools and event changes
    async fn listen_new_pool_addresses(
        &self,
        client: &RpcClient,
        address_tx: Sender<String>,
    ) -> Result<(), Box<dyn Error>>;

    // To export quote parameters from the pool address
    fn fetch_pool_metadata(&self, client: &RpcClient, pool_address: &str) -> Option<PoolMetadata>;
}

// abstract design for pool metadata
#[derive(Clone)]
pub struct PoolMetadata {
    pub pool_address: String,
    pub base_mint: String,
    pub quote_mint: String,
    pub base_reserve: Option<f64>,
    pub quote_reserve: Option<f64>,
    pub trade_fee: Option<f64>,
    pub extra: HashMap<String, PoolMetadataValue>,
}

// Extended value types for pool 
#[derive(Clone)]
pub enum PoolMetadataValue {
    String(String),
    Number(f64),
    Bool(bool),
    Array(Vec<PoolMetadataValue>),
    Map(HashMap<String, PoolMetadataValue>),
}

// Generic simplify HashMap access
macro_rules! get_extra {
    ($metadata:expr, $key:expr, $variant:path) => {
        $metadata.extra.get($key).and_then(|v| match v {
            $variant(val) => Some(val.clone()),
            _ => None,
        })
    };
}


pub(crate) use get_extra;

An example of implementation based on Raydium CLMM#

use async_trait::async_trait;
use solana_client::rpc_client::RpcClient;
use solana_sdk::{
    pubkey::Pubkey,
    signature::Signature,
    commitment_config::CommitmentConfig,
};
use std::collections::HashMap;
use std::error::Error;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::Sender;
use base58::{FromBase58, ToBase58};
use log::{info, error};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};

use super::{Dex, PoolMetadata, get_extra};

// Raydium CLMM Program ID
const PROGRAM_ID: &str = "CLMM9tUush29+wnRVN2QqohW5Ns5mYAPbXTRmbn6kYH";

// Global Mapping Table
lazy_static::lazy_static! {
    static ref POOL_ADDRESS_MAP: Arc<Mutex<HashMap<String, PoolDerivedAccounts>>> = Arc::new(Mutex::new(HashMap::new()));
}

// Raydium CLMM Pool Derived Account
#[derive(Clone)]
struct PoolDerivedAccounts {
    pool_address: String,
    base_mint: String,
    quote_mint: String,
    base_vault: String,
    quote_vault: String,
    fee_state: String,
    tick_array_current: String,
    tick_array_prev: String,
    tick_array_next: String,
    observation_state: String,
}

pub struct RaydiumCLMM;

#[async_trait]
impl Dex for RaydiumCLMM {
    fn dex_name(&self) -> String {
        "RaydiumCLMM".to_string()
    }

    fn dex_program_id(&self) -> Pubkey {
        Pubkey::from_str(PROGRAM_ID).unwrap()
    }

    fn quote(&self, amount_in: f64, metadata: &PoolMetadata) -> f64 {
        if amount_in <= 0.0 {
            return 0.0;
        }

        let fee_rate = metadata.trade_fee.unwrap_or(0.0);
        let amount_in_with_fee = amount_in * (1.0 - fee_rate);
        let mut remaining_in = amount_in_with_fee;
        let mut total_out = 0.0;

        let sqrt_price_x64 = get_extra!(metadata, "sqrt_price_x64", PoolMetadataValue::Number).unwrap_or(0.0);
        let tick_current_index = get_extra!(metadata, "tick_current_index", PoolMetadataValue::Number).unwrap_or(0.0) as i32;
        let tick_array = get_extra!(metadata, "tick_array", PoolMetadataValue::Array).unwrap_or(vec![]);

        if tick_array.is_empty() {
            return 0.0;
        }

        let current_price = sqrt_price_x64 / 2_f64.powi(64);
        let mut current_tick = tick_current_index;
        let mut ticks: Vec<(i32, f64)> = tick_array.iter()
            .filter_map(|v| match v {
                PoolMetadataValue::Map(m) => {
                    let tick_index = get_extra!(m, "tick_index", PoolMetadataValue::Number)? as i32;
                    let liquidity = get_extra!(m, "liquidity", PoolMetadataValue::Number)?;
                    Some((tick_index, liquidity))
                },
                _ => None,
            })
            .collect();
        ticks.sort_by(|a, b| a.0.cmp(&b.0));

        for (tick_index, liquidity) in ticks {
            if tick_index <= current_tick {
                continue;
            }
            let sqrt_price_lower = (1.0001_f64.powf(current_tick as f64)).sqrt();
            let sqrt_price_upper = (1.0001_f64.powf(tick_index as f64)).sqrt();
            let delta_sqrt_price = sqrt_price_upper - sqrt_price_lower;

            let max_amount_out = liquidity * delta_sqrt_price;
            let cost = max_amount_out * (sqrt_price_upper + sqrt_price_lower) / 2.0;

            if remaining_in >= cost {
                total_out += max_amount_out;
                remaining_in -= cost;
                current_tick = tick_index;
            } else {
                let fraction = remaining_in / cost;
                total_out += max_amount_out * fraction;
                remaining_in = 0.0;
                break;
            }
            if remaining_in <= 0.0 {
                break;
            }
        }
        total_out
    }

    fn fetch_pool_addresses(&self, client: &RpcClient) -> Vec<String> {
        let program_id = self.dex_program_id();
        let accounts = match client.get_program_accounts(&program_id) {
            Ok(accs) => accs,
            Err(e) => {
                error!("Failed to fetch {} pool addresses: {}", self.dex_name(), e);
                return Vec::new();
            }
        };

        let mut pool_addresses = Vec::new();
        for (pubkey, account) in accounts {
            if account.owner != program_id {
                continue;
            }

            let data = account.data;
            if data.len() < 200 {
                continue;
            }

            let base_mint = data[0..32].to_base58();
            let quote_mint = data[32..64].to_base58();
            if base_mint.is_empty() || quote_mint.is_empty() {
                continue;
            }

            let discriminator = u64::from_le_bytes(data[0..8].try_into().unwrap());
            if discriminator == 0 {
                continue;
            }

            let pool_address = pubkey.to_string();
            pool_addresses.push(pool_address.clone());

            if let Some(accounts) = self.derive_accounts_from_pool_address(client, &pool_address) {
                POOL_ADDRESS_MAP.lock().unwrap().insert(pool_address, accounts);
            }
        }
        pool_addresses
    }

    async fn listen_new_pool_addresses(
        &self,
        client: &RpcClient,
        address_tx: Sender<String>,
    ) -> Result<(), Box<dyn Error>> {
        let program_id = self.dex_program_id();
        let ws_url = "wss://api.mainnet-beta.solana.com";
        let (mut ws_stream, _) = connect_async(ws_url).await?;
        let subscribe_msg = format!(
            r#"{"jsonrpc":"2.0","id":1,"method":"logsSubscribe","params":["mentions","{}"]}"#,
            program_id
        );
        ws_stream.send(Message::Text(subscribe_msg)).await?;

        while let Some(msg) = ws_stream.next().await {
            let msg = msg?;
            if let Message::Text(text) = msg {
                let log: serde_json::Value = serde_json::from_str(&text)?;
                if log.get("result").is_some() {
                    continue;
                }

                let params = log.get("params").and_then(|p| p.get("result")).ok_or("No params")?;
                let tx_sig = params.get("signature").and_then(|s| s.as_str()).ok_or("No signature")?;
                let logs = params.get("logs").and_then(|l| l.as_array()).ok_or("No logs")?;

                let tx = client.get_transaction(
                    &Signature::from_str(tx_sig)?,
                    CommitmentConfig::confirmed(),
                )?;
                if tx.meta.is_some() && tx.meta.unwrap().err.is_some() {
                    continue;
                }

                let log_str = logs.iter().filter_map(|l| l.as_str()).collect::<Vec<&str>>().join(" ");
                let account_keys = tx.transaction.message.account_keys;

                for (i, key) in account_keys.iter().enumerate() {
                    if tx.transaction.message.is_writable(i) {
                        let pool_address = self.find_pool_address_from_account(&key.to_string());
                        if pool_address.is_empty() && self.is_valid_pool_address(client, &key.to_string()) {
                            if let Some(accounts) = self.derive_accounts_from_pool_address(client, &key.to_string()) {
                                POOL_ADDRESS_MAP.lock().unwrap().insert(key.to_string(), accounts);
                                info!("Detected new {} pool address: {}", self.dex_name(), key);
                                address_tx.send(key.to_string()).await?;
                            }
                        } else if !pool_address.is_empty() {
                            info!("Detected writable account affecting pool: {}, Pool: {}", tx_sig, pool_address);
                            address_tx.send(pool_address).await?;
                        }
                    }
                }

                if log_str.contains("initialize") {
                    if let Some(pool_address) = self.extract_new_pool_address(client, tx_sig) {
                        if self.is_valid_pool_address(client, &pool_address) {
                            if let Some(accounts) = self.derive_accounts_from_pool_address(client, &pool_address) {
                                POOL_ADDRESS_MAP.lock().unwrap().insert(pool_address.clone(), accounts);
                                info!("Detected new {} pool address: {}", self.dex_name(), pool_address);
                                address_tx.send(pool_address).await?;
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }

    fn fetch_pool_metadata(&self, client: &RpcClient, pool_address: &str) -> Option<PoolMetadata> {
        let derived_accounts = self.derive_accounts_from_pool_address(client, pool_address)?;
        let out = client.get_account_data(&Pubkey::from_str(pool_address).ok()?).ok()?;
        if out.len() < 200 {
            error!("Pool data too short for {}", pool_address);
            return None;
        }

        let base_reserve = self.get_vault_balance(client, &derived_accounts.base_vault);
        let quote_reserve = self.get_vault_balance(client, &derived_accounts.quote_vault);

        let fee_data = client.get_account_data(&Pubkey::from_str(&derived_accounts.fee_state).ok()?).ok()?;
        let fee_numerator = u64::from_le_bytes(fee_data[0..8].try_into().unwrap());
        let fee_denominator = u64::from_le_bytes(fee_data[8..16].try_into().unwrap());
        let trade_fee = if fee_denominator > 0 { Some(fee_numerator as f64 / fee_denominator as f64) } else { None };

        let sqrt_price_x64 = u64::from_le_bytes(out[64..72].try_into().unwrap());
        let tick_current_index = i32::from_le_bytes(out[72..76].try_into().unwrap());

        let tick_array_data = client.get_account_data(&Pubkey::from_str(&derived_accounts.tick_array_current).ok()?).unwrap_or_default();
        let mut tick_array = Vec::new();
        if !tick_array_data.is_empty() {
            for i in (8..tick_array_data.len() - 12).step_by(12) {
                let tick_index = i32::from_le_bytes(tick_array_data[i..i+4].try_into().unwrap());
                let liquidity = f64::from_le_bytes(tick_array_data[i+4..i+12].try_into().unwrap()) / 1_000_000.0;
                let mut tick_map = HashMap::new();
                tick_map.insert("tick_index".to_string(), PoolMetadataValue::Number(tick_index as f64));
                tick_map.insert("liquidity".to_string(), PoolMetadataValue::Number(liquidity));
                tick_array.push(PoolMetadataValue::Map(tick_map));
            }
        }

        let mut extra = HashMap::new();
        extra.insert("sqrt_price_x64".to_string(), PoolMetadataValue::Number(sqrt_price_x64 as f64));
        extra.insert("tick_current_index".to_string(), PoolMetadataValue::Number(tick_current_index as f64));
        extra.insert("tick_array".to_string(), PoolMetadataValue::Array(tick_array));

        Some(PoolMetadata {
            pool_address: pool_address.to_string(),
            base_mint: derived_accounts.base_mint,
            quote_mint: derived_accounts.quote_mint,
            base_reserve: Some(base_reserve),
            quote_reserve: Some(quote_reserve),
            trade_fee,
            extra,
        })
    }
}

impl RaydiumCLMM {
    fn derive_accounts_from_pool_address(&self, client: &RpcClient, pool_address: &str) -> Option<PoolDerivedAccounts> {
        let out = client.get_account_data(&Pubkey::from_str(pool_address).ok()?).ok()?;
        if out.len() < 200 {
            error!("Pool data too short for {}", pool_address);
            return None;
        }

        let base_mint = out[0..32].to_base58();
        let quote_mint = out[32..64].to_base58();
        let base_vault = out[96..128].to_base58();
        let quote_vault = out[128..160].to_base58();
        let fee_state = out[160..192].to_base58();

        let tick_array_current_seed = format!("tick_array{}", pool_address);
        let tick_array_current = Pubkey::create_program_address(
            &[&tick_array_current_seed.as_bytes()],
            &Pubkey::from_str(PROGRAM_ID).unwrap(),
        ).ok()?.to_string();

        let tick_array_prev_seed = format!("tick_array_prev{}", pool_address);
        let tick_array_prev = Pubkey::create_program_address(
            &[&tick_array_prev_seed.as_bytes()],
            &Pubkey::from_str(PROGRAM_ID).unwrap(),
        ).ok()?.to_string();

        let tick_array_next_seed = format!("tick_array_next{}", pool_address);
        let tick_array_next = Pubkey::create_program_address(
            &[&tick_array_next_seed.as_bytes()],
            &Pubkey::from_str(PROGRAM_ID).unwrap(),
        ).ok()?.to_string();

        let observation_state_seed = format!("observation_state{}", pool_address);
        let observation_state = Pubkey::create_program_address(
            &[&observation_state_seed.as_bytes()],
            &Pubkey::from_str(PROGRAM_ID).unwrap(),
        ).ok()?.to_string();

        Some(PoolDerivedAccounts {
            pool_address: pool_address.to_string(),
            base_mint,
            quote_mint,
            base_vault,
            quote_vault,
            fee_state,
            tick_array_current,
            tick_array_prev,
            tick_array_next,
            observation_state,
        })
    }

    fn find_pool_address_from_account(&self, account_address: &str) -> String {
        let map = POOL_ADDRESS_MAP.lock().unwrap();
        for (pool_address, accounts) in map.iter() {
            if account_address == pool_address ||
               account_address == &accounts.base_vault ||
               account_address == &accounts.quote_vault ||
               account_address == &accounts.tick_array_current ||
               account_address == &accounts.tick_array_prev ||
               account_address == &accounts.tick_array_next {
                return pool_address.clone();
            }
        }
        String::new()
    }

    fn is_valid_pool_address(&self, client: &RpcClient, pool_address: &str) -> bool {
        let out = match client.get_account_data(&Pubkey::from_str(pool_address).unwrap()) {
            Ok(data) => data,
            Err(_) => return false,
        };

        if out.len() < 200 {
            return false;
        }

        if Pubkey::from_str(&out.owner.to_string()).unwrap() != Pubkey::from_str(PROGRAM_ID).unwrap() {
            return false;
        }

        let base_mint = out[0..32].to_base58();
        let quote_mint = out[32..64].to_base58();
        if base_mint.is_empty() || quote_mint.is_empty() {
            return false;
        }

        let discriminator = u64::from_le_bytes(out[0..8].try_into().unwrap());
        if discriminator == 0 {
            return false;
        }

        true
    }

    fn get_vault_balance(&self, client: &RpcClient, vault: &str) -> f64 {
        match client.get_token_account_balance(&Pubkey::from_str(vault).unwrap(), CommitmentConfig::confirmed()) {
            Ok(resp) => resp.ui_amount.unwrap_or(0.0),
            Err(_) => 0.0,
        }
    }

    fn extract_new_pool_address(&self, client: &RpcClient, tx_sig: &str) -> Option<String> {
        let tx = client.get_transaction(
            &Signature::from_str(tx_sig).ok()?,
            CommitmentConfig::confirmed(),
        ).ok()?;
        for key in tx.transaction.message.account_keys {
            if key.is_writable() && !key.is_signer() {
                return Some(key.to_string());
            }
        }
        None
    }
}