DEX API

DEX 集成

为了成功将您的 DEX 集成到 OKX 路由引擎中,请按照以下步骤操作:

  1. 提供与 OKX DEX AMM 接口兼容的 DEX SDK: 我们需要一个与我们的 AMM(自动做市商)接口完全兼容的 DEX SDK。该 SDK 将允许我们与您的 DEX 进行通信并集成其功能,使流动性提供和交易执行能够在我们的平台上无缝进行。SDK 应能够处理定价、订单匹配、流动性池交互和交易执行等关键功能。

  2. 允许我们分叉您的 SDK: 为了确保长期的维护和用户支持,我们需要能够分叉您的 SDK。这意味着我们需要创建自己版本的 SDK 以供集成使用。通过这样做,我们可以:

  • 确保 SDK 的稳定性。
  • 独立维护和更新 SDK,修复潜在的错误并解决与您 DEX 集成相关的问题。
  • 根据我们的特定需求定制 SDK,进行性能优化。
  3. 遵循我们提供的指南和示例: 我们将提供详细的集成指南和示例代码,确保集成过程顺利进行。这将帮助您理解技术要求和结构,使过程更加简便和高效。通过遵循这些文档,您可以确保您的 DEX SDK 符合我们的平台需求和集成标准。

通过遵循这些指南,我们将能够为您的 DEX 提供可靠的支持,并确保您的流动性能够通过 OKX DEX 路由引擎进行访问。

注意
目前,该接口仅适用于基于 Solana 的 DEX。

AMM 接入示例

use async_trait::async_trait;
use solana_client::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::error::Error;
use tokio::sync::mpsc::Sender;

pub mod raydium_clmm;
// 其他 DEX 模块声明
// pub mod raydium_amm;
// pub mod orca_amm;
// ...

/// Contract a DEX adapter implements: identity, quoting, pool discovery and pool
/// metadata loading.
#[async_trait]
pub trait Dex: Send + Sync {
    /// Returns the DEX name.
    fn dex_name(&self) -> String;

    /// Returns the DEX's program ID.
    fn dex_program_id(&self) -> Pubkey;

    /// Quote function: takes an input amount and a pool's metadata and returns the quote
    /// as an `f64`.
    fn quote(&self, amount_in: f64, metadata: &PoolMetadata) -> f64;

    /// Scans the chain for all pool addresses.
    fn fetch_pool_addresses(&self, client: &RpcClient) -> Vec<String>;

    /// Listens for new pools and for pools whose data changed.
    ///
    /// Presumably each affected pool address is pushed into `address_tx` — confirm against
    /// the concrete implementation.
    async fn listen_new_pool_addresses(
        &self,
        client: &RpcClient,
        address_tx: Sender<String>,
    ) -> Result<(), Box<dyn Error>>;

    /// Derives the quoting parameters from a pool address; `None` when they cannot be loaded.
    fn fetch_pool_metadata(&self, client: &RpcClient, pool_address: &str) -> Option<PoolMetadata>;
}

/// Pool metadata (abstract design): the inputs handed to `Dex::quote`.
#[derive(Clone)]
pub struct PoolMetadata {
    pub pool_address: String,
    pub base_mint: String,
    pub quote_mint: String,
    pub base_reserve: Option<f64>,
    pub quote_reserve: Option<f64>,
    pub trade_fee: Option<f64>,
    /// DEX-specific values keyed by name; the `get_extra!` macro reads from this map.
    pub extra: HashMap<String, PoolMetadataValue>,
}

/// Value type for the extension entries stored in `PoolMetadata::extra`.
///
/// `Array` and `Map` nest further `PoolMetadataValue`s, so arbitrarily structured data
/// can be attached to a pool.
#[derive(Clone)]
pub enum PoolMetadataValue {
    String(String),
    Number(f64),
    Bool(bool),
    Array(Vec<PoolMetadataValue>),
    Map(HashMap<String, PoolMetadataValue>),
}

// Generic macro that simplifies `HashMap` access.
//
// `get_extra!(metadata, key, Variant)` looks `key` up in `metadata.extra` and yields
// `Some(<clone of the payload>)` when the stored value is the given enum variant, and
// `None` when the key is missing or holds a different variant.
// The first argument must expose an `.extra` map field; a bare `HashMap` does not qualify.
macro_rules! get_extra {
    ($metadata:expr, $key:expr, $variant:path) => {
        $metadata.extra.get($key).and_then(|v| match v {
            $variant(val) => Some(val.clone()),
            _ => None,
        })
    };
}

pub(crate) use get_extra;

基于 Raydium CLMM 的实现例子

use async_trait::async_trait;
use solana_client::rpc_client::RpcClient;
use solana_sdk::{
    pubkey::Pubkey,
    signature::Signature,
    commitment_config::CommitmentConfig,
};
use std::collections::HashMap;
use std::error::Error;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::Sender;
use base58::{FromBase58, ToBase58};
use log::{info, error};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};

use super::{Dex, PoolMetadata, get_extra};

// Raydium CLMM program ID, base58-encoded.
// The previous literal contained '+', which is not part of the base58 alphabet, so it could
// never be decoded into a `Pubkey` and every `unwrap()` on the parsed value would panic.
// NOTE(review): this is Raydium's published mainnet CLMM program address as far as I know —
// confirm against Raydium's official address list before deploying.
const PROGRAM_ID: &str = "CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK";

// Global lookup table: pool address -> accounts derived for that pool.
// Shared process-wide behind a mutex; entries are inserted when pools are discovered and
// read when mapping a touched account back to its pool.
lazy_static::lazy_static! {
    static ref POOL_ADDRESS_MAP: Arc<Mutex<HashMap<String, PoolDerivedAccounts>>> = Arc::new(Mutex::new(HashMap::new()));
}

/// Accounts associated with one Raydium CLMM pool, all stored as address strings.
///
/// Built by `derive_accounts_from_pool_address` and cached in `POOL_ADDRESS_MAP`.
#[derive(Clone)]
struct PoolDerivedAccounts {
    pool_address: String,
    base_mint: String,
    quote_mint: String,
    base_vault: String,
    quote_vault: String,
    fee_state: String,
    tick_array_current: String,
    tick_array_prev: String,
    tick_array_next: String,
    observation_state: String,
}

/// Field-less handle that implements `Dex` for Raydium CLMM; the pool cache lives in the
/// global `POOL_ADDRESS_MAP`, not in this type.
pub struct RaydiumCLMM;

#[async_trait]
impl Dex for RaydiumCLMM {
    /// Human-readable identifier of this DEX adapter.
    fn dex_name(&self) -> String {
        String::from("RaydiumCLMM")
    }

    /// Returns the on-chain program id, decoded from `PROGRAM_ID`.
    ///
    /// # Panics
    /// Panics if `PROGRAM_ID` is not a valid base58-encoded public key; that is a
    /// programming error in the constant, not a runtime condition.
    fn dex_program_id(&self) -> Pubkey {
        // `str::parse` resolves `FromStr` without the trait being in scope; the previous
        // `Pubkey::from_str` call needs `std::str::FromStr`, which is not among this
        // module's visible imports.
        PROGRAM_ID
            .parse()
            .expect("PROGRAM_ID must be a valid base58-encoded public key")
    }

    fn quote(&self, amount_in: f64, metadata: &PoolMetadata) -> f64 {
        if amount_in <= 0.0 {
            return 0.0;
        }

        let fee_rate = metadata.trade_fee.unwrap_or(0.0);
        let amount_in_with_fee = amount_in * (1.0 - fee_rate);
        let mut remaining_in = amount_in_with_fee;
        let mut total_out = 0.0;

        let sqrt_price_x64 = get_extra!(metadata, "sqrt_price_x64", PoolMetadataValue::Number).unwrap_or(0.0);
        let tick_current_index = get_extra!(metadata, "tick_current_index", PoolMetadataValue::Number).unwrap_or(0.0) as i32;
        let tick_array = get_extra!(metadata, "tick_array", PoolMetadataValue::Array).unwrap_or(vec![]);

        if tick_array.is_empty() {
            return 0.0;
        }
        let current_price = sqrt_price_x64 / 2_f64.powi(64);
        let mut current_tick = tick_current_index;
        let mut ticks: Vec<(i32, f64)> = tick_array.iter()
            .filter_map(|v| match v {
                PoolMetadataValue::Map(m) => {
                    let tick_index = get_extra!(m, "tick_index", PoolMetadataValue::Number)? as i32;
                    let liquidity = get_extra!(m, "liquidity", PoolMetadataValue::Number)?;
                    Some((tick_index, liquidity))
                },
                _ => None,
            })
            .collect();
        ticks.sort_by(|a, b| a.0.cmp(&b.0));

        for (tick_index, liquidity) in ticks {
            if tick_index <= current_tick {
                continue;
            }
            let sqrt_price_lower = (1.0001_f64.powf(current_tick as f64)).sqrt();
            let sqrt_price_upper = (1.0001_f64.powf(tick_index as f64)).sqrt();
            let delta_sqrt_price = sqrt_price_upper - sqrt_price_lower;

            let max_amount_out = liquidity * delta_sqrt_price;
            let cost = max_amount_out * (sqrt_price_upper + sqrt_price_lower) / 2.0;

            if remaining_in >= cost {
                total_out += max_amount_out;
                remaining_in -= cost;
                current_tick = tick_index;
            } else {
                let fraction = remaining_in / cost;
                total_out += max_amount_out * fraction;
                remaining_in = 0.0;
                break;
            }
            if remaining_in <= 0.0 {
                break;
            }
        }
        total_out
    }

    /// Lists the accounts owned by this DEX program that pass the pool sanity checks.
    ///
    /// Every accepted address is returned. Its derived accounts are also cached in
    /// `POOL_ADDRESS_MAP` when they can be derived. An RPC failure is logged and yields an
    /// empty list.
    ///
    /// NOTE(review): the discriminator is read from bytes `0..8`, which overlap the bytes
    /// `0..32` treated as the base mint — confirm the real account layout.
    fn fetch_pool_addresses(&self, client: &RpcClient) -> Vec<String> {
        let program_id = self.dex_program_id();

        let program_accounts = match client.get_program_accounts(&program_id) {
            Err(e) => {
                error!("Failed to fetch {} pool addresses: {}", self.dex_name(), e);
                return Vec::new();
            }
            Ok(found) => found,
        };

        let mut discovered = Vec::new();
        for (pubkey, account) in program_accounts {
            // Ownership and minimum-size filters.
            if account.owner != program_id || account.data.len() < 200 {
                continue;
            }
            let raw = &account.data;

            // Both mint fields must encode to a non-empty base58 string.
            let mints_present =
                !raw[0..32].to_base58().is_empty() && !raw[32..64].to_base58().is_empty();
            if !mints_present {
                continue;
            }

            // An all-zero leading word marks an uninitialised account.
            if u64::from_le_bytes(raw[0..8].try_into().unwrap()) == 0 {
                continue;
            }

            let address = pubkey.to_string();
            discovered.push(address.clone());

            if let Some(derived) = self.derive_accounts_from_pool_address(client, &address) {
                POOL_ADDRESS_MAP.lock().unwrap().insert(address, derived);
            }
        }
        discovered
    }

    async fn listen_new_pool_addresses(
        &self,
        client: &RpcClient,
        address_tx: Sender<String>,
    ) -> Result<(), Box<dyn Error>> {
        let program_id = self.dex_program_id();
        let ws_url = "wss://api.mainnet-beta.solana.com";
        let (mut ws_stream, _) = connect_async(ws_url).await?;
        let subscribe_msg = format!(
            r#"{"jsonrpc":"2.0","id":1,"method":"logsSubscribe","params":["mentions","{}"]}"#,
            program_id
        );
        ws_stream.send(Message::Text(subscribe_msg)).await?;

        while let Some(msg) = ws_stream.next().await {
            let msg = msg?;
            if let Message::Text(text) = msg {
                let log: serde_json::Value = serde_json::from_str(&text)?;
                if log.get("result").is_some() {
                    continue;
                }

                let params = log.get("params").and_then(|p| p.get("result")).ok_or("No params")?;
                let tx_sig = params.get("signature").and_then(|s| s.as_str()).ok_or("No signature")?;
                let logs = params.get("logs").and_then(|l| l.as_array()).ok_or("No logs")?;

                let tx = client.get_transaction(
                    &Signature::from_str(tx_sig)?,
                    CommitmentConfig::confirmed(),
                )?;
                if tx.meta.is_some() && tx.meta.unwrap().err.is_some() {
                    continue;
                }

                let log_str = logs.iter().filter_map(|l| l.as_str()).collect::<Vec<&str>>().join(" ");
                let account_keys = tx.transaction.message.account_keys;

                for (i, key) in account_keys.iter().enumerate() {
                    if tx.transaction.message.is_writable(i) {
                        let pool_address = self.find_pool_address_from_account(&key.to_string());
                        if pool_address.is_empty() && self.is_valid_pool_address(client, &key.to_string()) {
                            if let Some(accounts) = self.derive_accounts_from_pool_address(client, &key.to_string()) {
                                POOL_ADDRESS_MAP.lock().unwrap().insert(key.to_string(), accounts);
                                info!("Detected new {} pool address: {}", self.dex_name(), key);
                                address_tx.send(key.to_string()).await?;
                            }
                        } else if !pool_address.is_empty() {
                            info!("Detected writable account affecting pool: {}, Pool: {}", tx_sig, pool_address);
                            address_tx.send(pool_address).await?;
                        }
                    }
                }

                if log_str.contains("initialize") {
                    if let Some(pool_address) = self.extract_new_pool_address(client, tx_sig) {
                        if self.is_valid_pool_address(client, &pool_address) {
                            if let Some(accounts) = self.derive_accounts_from_pool_address(client, &pool_address) {
                                POOL_ADDRESS_MAP.lock().unwrap().insert(pool_address.clone(), accounts);
                                info!("Detected new {} pool address: {}", self.dex_name(), pool_address);
                                address_tx.send(pool_address).await?;
                            }
                        }
                    }
                }
            }
        }
        Ok(())
    }

    fn fetch_pool_metadata(&self, client: &RpcClient, pool_address: &str) -> Option<PoolMetadata> {
        let derived_accounts = self.derive_accounts_from_pool_address(client, pool_address)?;
        let out = client.get_account_data(&Pubkey::from_str(pool_address).ok()?).ok()?;
        if out.len() < 200 {
            error!("Pool data too short for {}", pool_address);
            return None;
        }

        let base_reserve = self.get_vault_balance(client, &derived_accounts.base_vault);
        let quote_reserve = self.get_vault_balance(client, &derived_accounts.quote_vault);

        let fee_data = client.get_account_data(&Pubkey::from_str(&derived_accounts.fee_state).ok()?).ok()?;
        let fee_numerator = u64::from_le_bytes(fee_data[0..8].try_into().unwrap());
        let fee_denominator = u64::from_le_bytes(fee_data[8..16].try_into().unwrap());
        let trade_fee = if fee_denominator > 0 { Some(fee_numerator as f64 / fee_denominator as f64) } else { None };

        let sqrt_price_x64 = u64::from_le_bytes(out[64..72].try_into().unwrap());
        let tick_current_index = i32::from_le_bytes(out[72..76].try_into().unwrap());

        let tick_array_data = client.get_account_data(&Pubkey::from_str(&derived_accounts.tick_array_current).ok()?).unwrap_or_default();
        let mut tick_array = Vec::new();
        if !tick_array_data.is_empty() {
            for i in (8..tick_array_data.len() - 12).step_by(12) {
                let tick_index = i32::from_le_bytes(tick_array_data[i..i+4].try_into().unwrap());
                let liquidity = f64::from_le_bytes(tick_array_data[i+4..i+12].try_into().unwrap()) / 1_000_000.0;
                let mut tick_map = HashMap::new();
                tick_map.insert("tick_index".to_string(), PoolMetadataValue::Number(tick_index as f64));
                tick_map.insert("liquidity".to_string(), PoolMetadataValue::Number(liquidity));
                tick_array.push(PoolMetadataValue::Map(tick_map));
            }
        }

        let mut extra = HashMap::new();
        extra.insert("sqrt_price_x64".to_string(), PoolMetadataValue::Number(sqrt_price_x64 as f64));
        extra.insert("tick_current_index".to_string(), PoolMetadataValue::Number(tick_current_index as f64));
        extra.insert("tick_array".to_string(), PoolMetadataValue::Array(tick_array));

        Some(PoolMetadata {
            pool_address: pool_address.to_string(),
            base_mint: derived_accounts.base_mint,
            quote_mint: derived_accounts.quote_mint,
            base_reserve: Some(base_reserve),
            quote_reserve: Some(quote_reserve),
            trade_fee,
            extra,
        })
    }
}
impl RaydiumCLMM {
    fn derive_accounts_from_pool_address(&self, client: &RpcClient, pool_address: &str) -> Option<PoolDerivedAccounts> {
        let out = client.get_account_data(&Pubkey::from_str(pool_address).ok()?).ok()?;
        if out.len() < 200 {
            error!("Pool data too short for {}", pool_address);
            return None;
        }

        let base_mint = out[0..32].to_base58();
        let quote_mint = out[32..64].to_base58();
        let base_vault = out[96..128].to_base58();
        let quote_vault = out[128..160].to_base58();
        let fee_state = out[160..192].to_base58();

        let tick_array_current_seed = format!("tick_array{}", pool_address);
        let tick_array_current = Pubkey::create_program_address(
            &[&tick_array_current_seed.as_bytes()],
            &Pubkey::from_str(PROGRAM_ID).unwrap(),
        ).ok()?.to_string();

        let tick_array_prev_seed = format!("tick_array_prev{}", pool_address);
        let tick_array_prev = Pubkey::create_program_address(
            &[&tick_array_prev_seed.as_bytes()],
            &Pubkey::from_str(PROGRAM_ID).unwrap(),
        ).ok()?.to_string();

        let tick_array_next_seed = format!("tick_array_next{}", pool_address);
        let tick_array_next = Pubkey::create_program_address(
            &[&tick_array_next_seed.as_bytes()],
            &Pubkey::from_str(PROGRAM_ID).unwrap(),
        ).ok()?.to_string();

        let observation_state_seed = format!("observation_state{}", pool_address);
        let observation_state = Pubkey::create_program_address(
            &[&observation_state_seed.as_bytes()],
            &Pubkey::from_str(PROGRAM_ID).unwrap(),
        ).ok()?.to_string();

        Some(PoolDerivedAccounts {
            pool_address: pool_address.to_string(),
            base_mint,
            quote_mint,
            base_vault,
            quote_vault,
            fee_state,
            tick_array_current,
            tick_array_prev,
            tick_array_next,
            observation_state,
        })
    }

    /// Maps an account address back to the cached pool it belongs to.
    ///
    /// An account matches a pool when it is the pool itself, one of its two vaults, or one
    /// of its three tick arrays. Returns the pool address, or an empty string when no
    /// cached pool matches.
    fn find_pool_address_from_account(&self, account_address: &str) -> String {
        let registry = POOL_ADDRESS_MAP.lock().unwrap();
        let owner = registry
            .iter()
            .find(|(pool_address, accounts)| {
                [
                    pool_address.as_str(),
                    accounts.base_vault.as_str(),
                    accounts.quote_vault.as_str(),
                    accounts.tick_array_current.as_str(),
                    accounts.tick_array_prev.as_str(),
                    accounts.tick_array_next.as_str(),
                ]
                .contains(&account_address)
            })
            .map(|(pool_address, _)| pool_address.clone());
        owner.unwrap_or_default()
    }

    fn is_valid_pool_address(&self, client: &RpcClient, pool_address: &str) -> bool {
        let out = match client.get_account_data(&Pubkey::from_str(pool_address).unwrap()) {
            Ok(data) => data,
            Err(_) => return false,
        };

        if out.len() < 200 {
            return false;
        }

        if Pubkey::from_str(&out.owner.to_string()).unwrap() != Pubkey::from_str(PROGRAM_ID).unwrap() {
            return false;
        }

        let base_mint = out[0..32].to_base58();
        let quote_mint = out[32..64].to_base58();
        if base_mint.is_empty() || quote_mint.is_empty() {
            return false;
        }

        let discriminator = u64::from_le_bytes(out[0..8].try_into().unwrap());
        if discriminator == 0 {
            return false;
        }

        true
    }

    fn get_vault_balance(&self, client: &RpcClient, vault: &str) -> f64 {
        match client.get_token_account_balance(&Pubkey::from_str(vault).unwrap(), CommitmentConfig::confirmed()) {
            Ok(resp) => resp.ui_amount.unwrap_or(0.0),
            Err(_) => 0.0,
        }
    }

    fn extract_new_pool_address(&self, client: &RpcClient, tx_sig: &str) -> Option<String> {
        let tx = client.get_transaction(
            &Signature::from_str(tx_sig).ok()?,
            CommitmentConfig::confirmed(),
        ).ok()?;
        for key in tx.transaction.message.account_keys {
            if key.is_writable() && !key.is_signer() {
                return Some(key.to_string());
            }
        }
        None
    }
}