快速入门
本指南将帮助您在几分钟内快速上手并开始使用我们的量化交易系统。
系统概述
我们的量化交易系统是一个使用rust开发的支持python策略的高性能、高拓展性、支持多交易所的一款量化交易系统。
环境准备
Linux 环境配置
-
连接 Linux 服务器
- 使用 SSH 客户端(如 PuTTY、MobaXterm 或终端)连接服务器
ssh username@server_ip
- 如果使用密钥认证,请确保已配置好 SSH 密钥
详细请参考连接Linux
-
安装必要软件
# 更新软件包列表
sudo apt update
# 安装基础工具
sudo apt install -y wget curl git unzip
Python 安装
详情见Python安装指南
安装前置库
pip3 install toml
快速开始
1. 下载SDK
下载最新版本release
2. 发送SDK到Linux服务器
可以在源设备上通过scp命令发送SDK到Linux服务器,也可以通过第三方工具进行传输
# scp open_quant_*.zip 用户名@IP地址:目标路径
scp open_quant_*.zip ubuntu@192.168.1.100:/home/ubuntu
3. 解压
unzip open_quant_*.zip
4. 设置执行权限
# 进入解压后的目录
cd open_quant_*
# 给可执行文件添加执行权限
chmod +x open_quant
5. 编写配置文件
创建/修改文件 config.toml
(解压后的目录中通常已存在,无需创建):
strategy_path = "strategy.py" # 策略文件路径
strategy_config_path = "strategy.toml" # 策略配置文件路径
[log]
level = "debug" # 日志级别, 支持 trace, debug, info, warn, error
[[exchanges]]
exchange = "BinanceSwap" # 交易所名称
is_colo = false # 是否为Colo交易所
is_testnet = false # 是否为测试环境
is_unified = false # 是否为统一账号
key = "" # 交易所API Key
secret = "" # 交易所API Secret
passphrase = "" # 交易所API Passphrase
rebate_rate = 0 # 返佣率
use_ws_api = true # 是否使用WebSocket API
host = "" # 自定义的交易所API Host
params = {} # 自定义的交易所API参数,如aws=true表示使用aws的URL
详细配置请参考配置文件
6. 编写策略文件
以下是一个订单簿深度不平衡策略示例,展示了如何使用Trader API的核心功能:
"""
strategy.py
订单簿深度不平衡策略
实现原理:基于买卖盘深度不平衡检测市场情绪,捕捉短期价格波动
"""
import json
import time
import math
import trader # type: ignore
import base_strategy
class Strategy(base_strategy.BaseStrategy):
def __init__(self, cex_configs, dex_configs, config, trader: trader.Trader):
# 访问配置值
self.cex_configs = cex_configs # 中心化交易所配置
self.has_account = len(cex_configs) > 0 # 是否有可用账户
self.base_index = len(self.cex_configs)
self.dex_configs = dex_configs # 去中心化交易所配置
self.trader = trader # 交易执行器
self.stop_flag = False # 停止标志
# 持仓历史缓存,用于记录开仓价格和跟踪订单状态
self.position_cache = {} # 持仓历史缓存
self.positions = {} # 当前持仓信息
self.balances = {} # 账户余额信息
self.orders = {} # 订单信息
self.last_signal_time = {} # 上次信号触发时间
# 初始化配置
self.update_config(config)
# 记录策略启动时间和初始资产
self.start_time = time.time()
self.initial_equity = 10000.0 # 默认初始资产,会在连接账户后更新
# 尝试加载缓存的交易数据
self.load_cache_data()
# 交易对信息字典
self.instruments = {} # 交易对基本信息
self.bbos = {} # 最优买卖价信息
self.fee_rates = {} # 手续费率
self.flight_orders = {} # 在途订单,尚未收到交易所确认
self.last_trigger_time = {} # 上次触发交易的时间
# 强停状态
self.force_stop = False # 强制停止标志,用于紧急情况
def update_config(self, config):
"""
更新策略配置参数
参数:
config: 包含策略参数的字典
"""
self.min_seconds_between_triggers = config['min_seconds_between_triggers'] # 触发信号最小间隔时间(秒)
self.symbols = config['symbols'] # 交易品种列表
self.trigger_imbalance_ratio = config['trigger_imbalance_ratio'] # 触发信号的深度不平衡比率阈值
self.trigger_min_size = config['trigger_min_size'] # 触发信号的最小深度尺寸
self.order_timeout = config.get('order_timeout', 30) # 订单超时时间(秒)
self.leverage = config['leverage'] # 杠杆倍数
def name(self):
"""返回策略名称"""
return "Depth Imbalance Strategy"
def start(self):
"""
策略启动函数
完成以下初始化工作:
1. 查询账户余额
2. 初始化持仓信息
3. 设置持仓模式(如适用)
4. 初始化交易对信息
5. 设置杠杆倍数
"""
self.trader.log("启动深度不平衡策略", level="INFO", color="blue")
# 查询账户余额
if self.has_account:
cmd = {
"account_id": 0,
"cmd": {
"Sync": "UsdtBalance"
}
}
balances = self.trader.publish(cmd).get('Ok', {})
self.trader.log(f"余额: {balances}", level="DEBUG", color="blue")
self.balances = {
"USDT": {
"available_balance": balances.get('available_balance', 0),
"balance": balances.get('balance', 0)
}
}
self.initial_equity = self.balances.get("USDT", {}).get("balance", 0.0)
# 初始化仓位信息
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"Position": None
}
}
}
res = self.trader.publish(cmd)
self.positions = {}
if 'Ok' in res and isinstance(res['Ok'], list):
for pos in res['Ok']:
if isinstance(pos, dict) and 'symbol' in pos:
self.positions[pos['symbol']] = pos
# 如果交易所是合约交易所,设置单向持仓
if self.cex_configs[0]['exchange'].endswith('Swap'):
try:
# 查询是否双向持仓
cmd = {
"account_id": 0,
"cmd": {
"Sync": "IsDualSidePosition"
}
}
res = self.trader.publish(cmd)
if 'Ok' in res:
# 如果当前是双向持仓,改为单向持仓
if res['Ok']:
# 设置单向持仓
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"SetDualSidePosition": False
}
}
}
res = self.trader.publish(cmd)
self.trader.log(f"设置单向持仓: {res}", level="INFO", color="green")
except Exception as e:
self.trader.log(f"设置持仓模式时发生错误: {str(e)}", level="ERROR", color="red")
# 初始化交易对信息
for symbol in self.symbols:
# 查询合约信息
cmd = {
"account_id": 0,
"cmd": {"Sync": {"Instrument": symbol}}
}
instrument = self.trader.publish(cmd).get('Ok')
time.sleep(0.1) # 避免请求过快
if instrument:
self.instruments[symbol] = instrument
self.trader.log(f"交易对 {symbol} 信息: {instrument}", level="INFO", color="green")
else:
raise Exception(f"未找到交易对 {symbol} 信息")
# 查询Bbo(最佳买卖价)
cmd['cmd'] = {"Sync": {"Bbo": symbol}}
bbo = self.trader.publish(cmd).get('Ok')
time.sleep(0.1)
if bbo:
self.bbos[symbol] = bbo
else:
raise Exception(f"未找到交易对 {symbol} 的BBO")
# 查询手续费率
cmd['cmd'] = {"Sync": {"FeeRate": symbol}}
fee = self.trader.publish(cmd).get('Ok')
time.sleep(0.1)
if fee:
self.fee_rates[symbol] = fee
else:
raise Exception(f"未找到交易对 {symbol} 的手续费率")
# 如果是合约,设置杠杆
if self.cex_configs[0]['exchange'].endswith('Swap'):
# 设置杠杆倍数
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"SetLeverage": [symbol, self.leverage]
}
}
}
leverage_res = self.trader.publish(cmd)
self.trader.log(f"设置杠杆 {self.leverage}x: {leverage_res}", level="INFO", color="green")
# 记录初始资产
self.initial_equity = self.balances.get("USDT", {}).get("balance", 10000.0)
self.trader.log(f"策略初始资产: {self.initial_equity} USDT", level="INFO", color="green")
def subscribes(self):
"""
返回策略需要订阅的行情和账户数据
订阅内容包括:
1. 深度行情数据
2. 最优买卖价(BBO)
3. 订单和持仓更新(如有账户)
4. 余额定时更新
5. 定时检查订单状态
返回:
包含订阅信息的列表
"""
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Depth": {
"symbols": self.symbols,
"levels": 5 # 订阅5档深度
},
},
{
"Bbo": self.symbols
}
]
}
},
]
if self.has_account:
# 订阅交易所websocket
subs.append({
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Order": self.symbols # 订阅订单更新
},
{
"Position": self.symbols # 订阅持仓更新
},
]
}
},)
# 定时更新持仓和余额信息
subs.append({
"account_id": 0,
"sub": {
"SubscribeRest": {
"update_interval": {
"secs": 1,
"nanos": 0
},
"rest_type": "Balance" # 每秒更新账户余额
}
}
},)
# 定时检查订单状态
subs.append({
"sub": {
"SubscribeTimer": {
"update_interval": {
"secs": 10,
"nanos": 0
},
"name": "cancel_all_orders" # 每10秒检查一次订单状态
}
}
},)
return subs
def on_bbo(self, exchange, context, bbo):
"""
处理最优买卖价(BBO)数据事件
更新交易对的最新买卖价信息,用于后续交易决策
参数:
exchange: 交易所名称
context: 上下文信息
bbo: 最优买卖价数据
"""
if self.force_stop:
return
symbol = bbo['symbol']
if symbol not in self.symbols:
return
# 更新BBO数据
self.bbos[symbol] = bbo
def on_depth(self, exchange, context, depth):
"""
处理深度数据事件 - 策略核心逻辑
根据订单簿深度计算买卖盘不平衡程度,在满足条件时生成交易信号
参数:
exchange: 交易所名称
context: 上下文信息
depth: 深度数据,包含买卖盘各档位价格和数量
"""
if self.force_stop:
return
symbol = depth['symbol']
if symbol not in self.symbols:
return
# 计算买卖深度总量(前5档)
bid_total = sum(level[1] for level in depth['bids'][:5])
ask_total = sum(level[1] for level in depth['asks'][:5])
# 如果深度为空,跳过
if bid_total == 0 or ask_total == 0:
return
# 计算不平衡比例:(买盘量-卖盘量)/(买盘量+卖盘量)
# 范围在[-1,1]之间,正值表示买盘压力更大,负值表示卖盘压力更大
imbalance_ratio = (bid_total - ask_total) / (bid_total + ask_total)
# 检查是否满足触发信号的时间间隔条件
now = time.time()
last_signal_time = self.last_signal_time.get(symbol, 0)
time_since_last_signal = now - last_signal_time
# 如果距离上次信号时间不足,跳过
if time_since_last_signal < self.min_seconds_between_triggers:
return
# 计算深度总量,用于判断市场流动性
total_depth = bid_total + ask_total
# 检查信号触发条件:
# 1. 不平衡比例绝对值超过阈值
# 2. 总深度超过最小要求(确保足够流动性)
if abs(imbalance_ratio) > self.trigger_imbalance_ratio and total_depth > self.trigger_min_size:
# 更新最后信号时间
self.last_signal_time[symbol] = now
# 根据不平衡方向确定交易方向
# 买盘压力大(正比例)时做多,卖盘压力大(负比例)时做空
side = "Buy" if imbalance_ratio > 0 else "Sell"
# 打印信号信息
color = "green" if side == "Buy" else "red"
self.trader.log(f"深度不平衡信号: {symbol} {side} 不平衡比例: {imbalance_ratio:.4f} 总深度: {total_depth:.4f}",
level="INFO", color=color)
# 执行交易
if self.has_account:
self.execute_trade(symbol, context, side)
def on_order_submitted(self, account_id, context, order_id_result, order):
"""
处理订单提交事件回调
当订单提交到交易所后,处理交易所返回的确认信息
参数:
account_id: 账户ID
context: 上下文信息
order_id_result: 订单ID结果
order: 订单信息
"""
symbol = order['symbol']
del self.flight_orders[symbol] # 从在途订单中移除
if 'Ok' in order_id_result:
self.trader.log(f"订单提交成功: {order_id_result}", level="INFO", color="green")
else:
self.trader.log(f"订单提交失败: {order_id_result}", level="ERROR", color="red")
def on_order(self, account_id, context, order):
"""
处理订单更新事件
当订单状态有变化时(如已成交、已撤销等),更新本地订单记录
参数:
account_id: 账户ID
context: 上下文信息
order: 更新后的订单信息
"""
self.trader.log(f"订单更新: {order}", level="INFO", color="blue")
if order['id'] in self.orders:
# 更新订单状态
self.orders[order['id']]['status'] = order['status']
# 如果订单已完成(成交或取消),记录成交信息
if order['status'] in ['Filled', 'Canceled']:
self.trader.log(f"订单状态更新: {order['symbol']} {order['side']} 状态: {order['status']} 成交均价: {order.get('avg_price', 0)}",
level="INFO", color="blue")
def on_position(self, account_id, positions):
"""
处理持仓更新事件
当持仓有变化时,更新本地持仓记录
参数:
account_id: 账户ID
positions: 更新后的持仓信息
"""
self.trader.log(f"持仓更新: {positions}", level="INFO", color="blue")
if isinstance(positions, list):
for position in positions:
if isinstance(position, dict) and 'symbol' in position:
symbol = position['symbol']
self.positions[symbol] = position
def on_balance(self, account_id, balances):
"""
处理余额更新事件
当账户余额变化时,更新本地余额记录
参数:
account_id: 账户ID
balances: 更新后的余额信息
"""
for balance in balances:
asset = balance['asset']
self.balances[asset] = balance
def on_timer_subscribe(self, timer_name):
"""
处理定时器事件
根据不同的定时器名称执行相应的定时任务
参数:
timer_name: 定时器名称
"""
if timer_name == "cancel_all_orders":
self.cancel_all_orders() # 定期检查并取消长时间未成交的订单
def on_stop(self):
"""
策略停止时的处理逻辑
执行以下清理工作:
1. 取消所有未完成订单
2. 平掉所有持仓
3. 保存缓存数据
4. 计算策略收益
"""
self.trader.log("策略正在停止...", level="INFO", color="blue")
self.stop_flag = True
# 取消所有订单
self.cancel_all_orders()
# 平仓
self.close_all_positions()
# 保存缓存数据
self.save_cache_data()
# 计算收益
self.calculate_profit()
def on_config_update(self, config):
"""
热更新策略配置
在策略运行期间更新参数,无需重启策略
参数:
config: 新的配置参数
"""
self.trader.log(f"热更新策略配置: {config}", level="INFO", color="blue")
self.update_config(config)
# -------------------------------------------------------实用工具函数-------------------------------------------------------
def execute_trade(self, symbol, context, side):
"""
执行交易操作
根据交易信号和市场状况,计算下单参数并提交订单
参数:
symbol: 交易品种
context: 上下文信息
side: 交易方向(Buy/Sell)
"""
# 防止重复下单:同一品种有在途订单时跳过
if symbol in self.flight_orders:
return
# 检查信号触发的时间间隔
last_trigger_time = self.last_trigger_time.get(symbol, 0)
if time.time() - last_trigger_time < self.min_seconds_between_triggers:
return
# 确保有交易所行情数据
if symbol not in self.instruments or symbol not in self.bbos:
self.trader.log(f"未找到交易对信息: {symbol}", level="ERROR", color="red")
return
# 获取当前市场价格
bbo = self.bbos[symbol]
if side == "Buy":
price = bbo['ask_price'] # 买入时使用卖一价(ask)
else:
price = bbo['bid_price'] # 卖出时使用买一价(bid)
if price <= 0:
self.trader.log(f"无效价格: {price}", level="ERROR", color="red")
return
# 计算可用的名义交易价值
notional_value = self.balances.get("USDT", {}).get("available_balance", 0)
self.trader.log(f"当前可用余额: {notional_value}", level="INFO", color="blue")
# 应用杠杆
notional_value = notional_value * self.leverage
# 风险控制:只使用30%可用资金
notional_value = notional_value * 0.3
self.trader.log(f"开仓名义价值: {notional_value}", level="INFO", color="blue")
instrument = self.instruments[symbol]
# 检查是否满足最小下单名义价值要求
if notional_value <= instrument.get('min_notional', 0):
return
# 根据开仓名义价值计算开仓数量
amount = notional_value / price
self.trader.log(f"开仓数量(单位币数量): {amount}", level="INFO", color="blue")
# 处理数量精度
amount_precision = instrument.get('amount_precision', 3) # 数量精度
amount_tick = float(instrument.get('amount_tick', 0.001)) # 数量最小变动单位
amount_multiplier = instrument.get('amount_multiplier', 1) # 数量乘数(合约需要)
# 数量处理:乘以合约乘数换算为交易所接口所需的张数
amount = amount * amount_multiplier
# 数量处理:用数量步长向下取整
if amount_tick > 0:
amount = math.floor(amount / amount_tick) * amount_tick
# 保留精度
amount = round(amount, amount_precision)
self.trader.log(f"开仓数量(合约张数): {amount}", level="INFO", color="blue")
# 检查是否满足最小下单数量要求
if amount < instrument.get('min_qty', 0.000001):
return
# 处理价格精度
price_precision = instrument.get('price_precision', 5) # 价格精度
price_tick = float(instrument.get('price_tick', 0.00001)) # 价格最小变动单位
price_multiplier = instrument.get('price_multiplier', 1) # 价格乘数(合约需要)
# 价格处理:乘以合约乘数换算为交易所接口所需的价格
price = price * price_multiplier
# 价格处理:用价格步长根据买卖方向向上或向下取整
if price_tick > 0:
if side == "Buy":
# 买入时向上取整(确保能成交)
price = math.ceil(price / price_tick) * price_tick
else:
# 卖出时向下取整(确保能成交)
price = math.floor(price / price_tick) * price_tick
# 保留价格精度
price = round(price, price_precision)
# 准备下单参数
order = {
"cid": trader.create_cid(self.cex_configs[0]['exchange']), # 客户端订单ID
"symbol": symbol,
"order_type": "Limit", # 限价单
"side": side,
"pos_side": None, # 单向持仓模式
"time_in_force": "GTC", # 一直有效直到取消
"price": price,
"amount": amount,
}
# 提交限价订单
cmd = {
"account_id": 0,
"context": context,
"cmd": {
"Async": {
"PlaceOrder": [
order,
{
"is_dual_side": False, # 单向持仓
"margin_mode": "Cross", # 全仓模式(okx需要)
}
]
}
}
}
result = self.trader.publish(cmd)
if 'Ok' in result:
order_id = result['Ok']
self.trader.log(f"提交订单成功: {symbol} {side} 数量: {amount} 价格: {price} 订单ID: {order_id}",
level="INFO", color="green")
# 保存订单记录
order = {
"status": "Open",
'symbol': symbol,
'side': side,
'quantity': amount,
'price': price,
'create_time': time.time()
}
# 记录为在途订单
self.flight_orders[symbol] = order
# 更新最后触发时间
self.last_trigger_time[symbol] = time.time()
# 记录订单信息
self.orders[order_id] = order
else:
self.trader.log(f"提交订单失败: {result}", level="ERROR", color="red")
def load_cache_data(self):
"""
加载缓存数据
从本地文件恢复策略运行状态,包括持仓历史和资金信息
"""
try:
with open('depth_imbalance_cache.json', 'r') as f:
cache = json.load(f)
self.position_cache = cache.get('position_cache', {})
self.initial_equity = cache.get('initial_equity', 10000.0)
self.start_time = cache.get('start_time', time.time())
self.trader.log("加载缓存数据成功", level="INFO", color="green")
except:
self.trader.log("加载缓存数据失败,使用默认值", level="WARN", color="yellow")
def save_cache_data(self):
"""
保存缓存数据
将当前策略状态保存到本地文件,用于后续恢复
"""
try:
cache = {
'position_cache': self.position_cache,
'initial_equity': self.initial_equity,
'start_time': self.start_time
}
with open('depth_imbalance_cache.json', 'w') as f:
json.dump(cache, f)
self.trader.log("保存缓存数据成功", level="INFO", color="green")
except:
self.trader.log("保存缓存数据失败", level="ERROR", color="red")
def get_orders(self, symbol: str) -> list:
"""
获取当前所有挂单
查询指定交易对的未完成订单
参数:
symbol: 交易对名称
返回:
list: 挂单列表
"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"GetOpenOrders": symbol
}
}
}
result = self.trader.publish(cmd)
if 'Ok' in result :
orders = result['Ok']
if isinstance(orders, list) and len(orders) > 0:
self.trader.log(f"获取挂单成功: {orders}", level="DEBUG")
return orders
else:
self.trader.log(f"获取挂单失败: {result}", level="ERROR")
return []
def cancel_all_orders(self):
"""
取消所有未完成订单
遍历所有交易品种,取消所有挂起的订单
"""
for symbol in self.symbols:
orders = self.get_orders(symbol)
for order in orders:
self.trader.log(f"取消订单: {order}", level="INFO", color="blue")
res = self.trader.publish({"account_id": 0, "cmd": {"Sync": {"CancelOrder": [{"Id": order['id']}, order['symbol']]}}})
self.trader.log(f"取消订单结果: {res}", level="INFO", color="blue")
def get_position(self) -> dict:
"""
获取当前持仓信息
查询所有交易品种的持仓状态
返回:
dict: 持仓信息字典
"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": {"Position": None}
}
}
res = self.trader.publish(cmd)
if 'Ok' in res:
return res['Ok']
else:
self.trader.log(f"获取持仓失败: {res}", level="ERROR", color="red")
return []
def close_all_positions(self):
"""
平掉所有持仓
遍历所有持仓,提交市价单平仓
"""
for position in self.get_position():
symbol = position['symbol']
amount = position['amount']
if amount > 0:
self.trader.log(f"平仓仓位: {position}", level="INFO", color="blue")
pos_side = position['side'] # 持仓方向
# 确定平仓方向:持多仓卖出,持空仓买入
side = 'Sell' if pos_side == 'Long' else 'Buy'
# 处理数量精度
instrument = self.instruments.get(symbol, {})
amount_precision = instrument.get('amount_precision', 3)
amount_multiplier = instrument.get('amount_multiplier', 1)
self.trader.log(f"平仓数量(单位币数量): {amount}", level="INFO", color="blue")
amount = amount * amount_multiplier # 转换为交易所张数
# 保留精度
amount = round(amount, amount_precision)
self.trader.log(f"平仓数量(合约张数): {amount}", level="INFO", color="blue")
# 准备市价平仓订单
order = {
"symbol": symbol,
"order_type": "Market", # 市价单,确保能立即成交
"side": side,
"pos_side": pos_side,
"time_in_force": "IOC", # 立即成交否则取消
"price": None, # 市价单无需指定价格
"amount": amount,
}
self.trader.log(f"平仓订单: {order}", level="INFO", color="blue")
# 提交市价平仓订单
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"PlaceOrder": [
order,
{
"is_dual_side": False,
"margin_mode": "Cross", # 全仓模式
"market_order_mode": "Normal" # 普通市价单
}
]
}
}
}
res = self.trader.publish(cmd)
self.trader.log(f"平仓结果: {res}", level="INFO", color="blue")
def calculate_profit(self):
"""
计算策略收益
计算策略运行期间的盈亏情况
"""
cmd = {
"account_id": 0,
"cmd": {
"Sync": "UsdtBalance"
}
}
balance = self.trader.publish(cmd).get('Ok', {}).get("balance", 0.0)
profit = balance - self.initial_equity
self.trader.log(f"收益: {profit}", level="INFO", color="blue")
7. 运行程序
# 其中第一个参数是配置文件,第二个参数是python策略文件路径
./open_quant config.toml strategy.py
由于存在交易所的网络限制,可以使用代理工具连接交易所,配置请参考代理配置。
策略开发指南
策略编写基础
-
策略类结构
- 继承
BaseStrategy
类,详情见策略基类说明 - 实现必要的方法:
__init__
,name
,start
,on_stop
,subscribes
- 根据需要实现回调方法:
on_bbo
,on_order
,on_position
等
- 继承
-
策略生命周期
__init__
: 初始化策略参数和状态start
: 策略启动时的操作on_stop
: 策略停止时的清理操作
-
数据订阅
- 在
subscribes
方法中定义需要订阅的数据,详情见数据订阅 - 支持订阅行情数据、订单状态、持仓信息等
- 在
-
交易操作
- 使用
trader.publish
发送交易指令 - 支持市价单、限价单、止损单等多种订单类型
- 使用
权限设置
交易所 API 权限
-
创建 API 密钥
- 登录交易所账户
- 进入 API 管理页面
- 创建新的 API 密钥,确保启用以下权限:
- 读取账户信息
- 查看订单
- 创建订单
- 取消订单
- 查看持仓
-
安全设置
- 设置 IP 白名单(如果交易所支持)
- 设置适当的权限范围
- 不要启用提现权限
- 定期轮换 API 密钥
系统权限设置
-
文件权限
# 设置配置文件权限
chmod 600 config.toml
# 设置策略文件权限
chmod 644 strategy.py -
运行权限
# 确保可执行文件有执行权限
chmod +x open_quant
核心功能
数据订阅
# 订阅标记价格
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"MarkPrice": {
"symbols": ["BTC_USDT"],
},
},
]
}
}
]
return subs
# 订阅订单簿数据
def subscribes(self):
subs = [
{
"account_id": 0,
"sub": {
"SubscribeWs": [
{
"Bbo": {
"symbols": ["BTC_USDT"],
},
},
]
}
}
]
return subs
交易操作
# 通过回调进行下单操作
def on_depth(self, exchange, context, depth):
# ...
# 其他逻辑
cmd = self.build_trade_cmd(symbol, context, side)
return {
'cmds': [cmd], # 按照该格式返回
}
def build_trade_cmd(self, symbol, context, side):
# 通过其他逻辑,计算出价格和数量
price = round(price, price_precision)
amount = round(amount, amount_precision)
# 准备下单参数
order = {
"cid": self.trader.create_cid(self.cex_configs[0]['exchange']), # 客户端订单ID
"symbol": symbol,
"order_type": "Limit", # 限价单
"side": side,
"pos_side": None, # 单向持仓模式
"time_in_force": "GTC", # 一直有效直到取消
"price": price,
"amount": amount,
}
# 提交限价订单
cmd = {
"account_id": 0,
"context": context,
"cmd": {
"Async": {
"PlaceOrder": [
order,
{
"is_dual_side": False, # 单向持仓
"margin_mode": "Cross", # 全仓模式(okx需要)
}
]
}
}
}
return cmd
持仓管理
# 查询持仓
def get_position():
# 查询给定交易对持仓
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"Position": "BTC_USDT"
}
}
}
res = self.trader.publish(cmd)
# 查询账户余额
def get_usdt_balance():
cmd = {
"account_id": 0,
"cmd": {
"Sync": "UsdtBalance"
}
}
balance = self.trader.publish(cmd).get('Ok', {}).get("balance", 0.0)
公共行情获取
def get_depth():
# 查询给定交易对深度
cmd = {
"account_id": 0,
"cmd": {
"Sync": {
"Depth": ["BTC_USDT", 10]
}
}
}
depth = self.trader.publish(cmd)
下一步
- 查看策略开发学习如何开发更复杂的策略
- 参考Python SDK了解如何将策略部署到生产环境