穿透价格的表象:基于TqSdk的多空力量博弈策略源码解析
在量化交易的浩瀚星海中,交易者们常常面临一个经典的路线抉择:是做宏观的"趋势跟随",还是做微观的"形态捕捉"?
今天我们将深度拆解一份基于 Python 和 TqSdk 框架编写的"多空力量博弈交易策略"。这套策略交出了一份将两者巧妙融合的答卷------它通过均线看大势、通过通道抓突破,最后再用放大镜解剖单根K线的"多空力量",构建了一套严密的立体交易系统。
以下,我们将结合核心源码,为您层层剥开这套策略的逻辑内核。
一、 顺势而为:大级别的趋势与动能过滤
一笔高胜算的交易,首先必须处于阻力最小的方向上。该策略在入场前,设置了两道宏观与中观的"防线",以过滤掉无序的震荡行情。
1. 均线定大势
策略调用了 TqSdk 的 MA 算法,计算出 20 周期简单移动平均线。这根均线就是系统眼中的"牛熊分水岭"。价格在均线之上只做多,在均线之下只做空。
2. 极值确动能
光有趋势不够,还要有爆发的动能。策略计算了过去 20 根 K 线的最高价(HHV)和最低价(LLV),只有当最新价格突破这个局部极值时,才认为动能真正显现。
核心源码:
Python
# 获取当前 20周期均线
ma_df = MA(self.klines, TREND_MA_N)
ma_now = safe_float(ma_df["ma"].iloc[-2], None)
# 获取过去 N 根 K 线的最高价与最低价(唐奇安通道)
highest_n = rolling_max(highs, idx, BREAKOUT_N, include_current=False)
lowest_n = rolling_min(lows, idx, BREAKOUT_N, include_current=False)
二、 见微知著:K线内部的多空力量博弈
如果说均线和通道是"望远镜",那么这一步就是策略的"显微镜"。在突破发生的当天,策略会将当根 K 线的总振幅(最高价 - 最低价)视为 100% 的战场,并精细量化出四股力量的占比。
-
下影线力量 (
lower_ratio): 代表下方支撑力,多方的防守底线。 -
阳线实体力量 (
bull_body_ratio): 代表绝对的上涨空间,多方的主动进攻。 -
上影线力量 (
upper_ratio): 代表上方抛压,空方的防守底线。 -
阴线实体力量 (
bear_body_ratio): 代表绝对的下跌空间,空方的主动进攻。
核心源码:
Python
full_range = high_now - low_now # 定义单根K线总振幅
# 计算各部分绝对点数
upper_shadow = high_now - max(open_now, close_now)
lower_shadow = min(open_now, close_now) - low_now
bull_body = max(close_now - open_now, 0.0)
bear_body = max(open_now - close_now, 0.0)
# 转化为占总战场的百分比
upper_ratio = upper_shadow / full_range
lower_ratio = lower_shadow / full_range
bull_body_ratio = bull_body / full_range
bear_body_ratio = bear_body / full_range
三、 信号共振:严苛的三重入场条件
当宏观环境与微观形态发生共振时,策略才会真正扣动扳机。
以做多为例,系统要求市场必须同时满足:站在均线之上 + 突破局部新高 + K线呈现强大的下方支撑与多头实体推进。只有买卖力量在微观层面呈现一边倒的态势,入场信号才被确认。
核心源码:
Python
# 多头开仓信号共振
long_signal = (
lower_ratio > 0.5 and # 微观:下影线支撑强劲
bull_body_ratio > 0.5 and # 微观:阳线实体拉升坚决
close_now > ma_now and # 宏观:多头趋势
high_now >= highest_n # 中观:突破局部前高
)
# 空头开仓信号共振
short_signal = (
upper_ratio > 0.5 and # 微观:上影线抛压沉重
bear_body_ratio > 0.5 and # 微观:阴线实体杀跌坚决
close_now < ma_now and # 宏观:空头趋势
low_now <= lowest_n # 中观:跌破局部前低
)
四、 进退有度:双重风控与动态止盈
"截断亏损,让利润奔跑"是这套策略退出机制的核心理念。系统设计了一静一动两套风险管理网络:
1. 刚性底线(固定止损)
进场瞬间,系统会根据开仓价设置反向 3% 的死止损。这道防线不随行情波动而改变,一旦触及,说明入场逻辑完全破裂,系统将无条件清仓。
2. 利润追踪(回撤止盈)
为了不被微小的震荡洗盘,同时保住胜利果实,系统会记录持仓期间到达的"最佳浮盈价"(best_price)。一旦价格从最高点回吐了最大利润的 30%,策略判断趋势可能发生反转,立刻落袋为安。
核心源码:
Python
# 以多头持仓的风控为例
self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT) # 3% 刚性止损线
self.best_price = max(self.best_price, close_now) # 动态刷新最高价
if self.best_price > self.entry_price:
# 计算当前最大浮盈
floating_profit = self.best_price - self.entry_price
# 设置追踪止盈线:最高价回落利润的 30%
trail_line = self.best_price - floating_profit * TRAIL_PROFITBACK_PCT
if close_now <= trail_line:
print("多头盈利回撤止盈")
self.target_pos.set_target_volume(0) # 清仓离场
五、 结语
"多空力量博弈交易策略"向我们展示了一个极具层次感的量化思考框架。它不迷信单一的指标,而是将大环境(均线)、爆发点(突破)、买卖情绪(K线切片)与底线思维(风控)熔于一炉。
对于量化开发者而言,这份源码不仅提供了一个可扩展的交易骨架,更传递了一种敬畏市场、步步为营的交易哲学。
六、 源码
from datetime import date
from math import isnan
from tqsdk import TqApi, TqAuth, TqSim, TqBacktest, BacktestFinished
from tqsdk.lib import TargetPosTask
from tqsdk.ta import MA
# ============================================================
# 多空力量博弈交易策略(TqSdk 版)
#
# 官方接口依据:
# - TqBacktest 用于回测。来源:TqSdk 官方回测文档
# - 主连回测建议通过 quote.underlying_symbol 切换到实际主力合约交易。来源:官方回测文档
# - MA 来自 tqsdk.ta。来源:tqsdk.ta 技术指标文档
# - TargetPosTask 用于目标持仓管理。来源:官方索引/用法文档
#
# 策略逻辑:
# 1) 趋势判断:
# close > MA(N) -> 趋势偏多
# close < MA(N) -> 趋势偏空
#
# 2) K线多空力量:
# 上影线力量 = (high - max(open, close)) / (high - low)
# 下影线力量 = (min(open, close) - low) / (high - low)
# 阳线实体力量 = max(close - open, 0) / (high - low)
# 阴线实体力量 = max(open - close, 0) / (high - low)
#
# 3) 入场:
# 做多:
# 下影线力量 > 0.5
# 阳线实体力量 > 0.5
# close > MA
# high >= 过去N根K线最高价
#
# 做空:
# 上影线力量 > 0.5
# 阴线实体力量 > 0.5
# close < MA
# low <= 过去N根K线最低价
#
# 4) 风控:
# - 亏损固定百分比止损
# - 盈利后从最佳浮盈价回撤一定比例止盈
# ============================================================
# ----------------------------
AUTH_USER = "" #快期账号
AUTH_PASSWORD = "" #快期密码
SYMBOL = "KQ.m@SHFE.rb"
START_DT = date(2021, 1, 1)
END_DT = date(2025, 1, 1)
INIT_BALANCE = 1_000_000
WEB_GUI = True
KLINE_SEC = 24 * 60 * 60
DATA_LENGTH = 320
TREND_MA_N = 20
BREAKOUT_N = 20
FIXED_STOP_PCT = 0.03
TRAIL_PROFITBACK_PCT = 0.30
MAX_LOTS = 5
PRICE_MODE = "ACTIVE"
MIN_READY_BARS = 60
def safe_float(v, default=0.0):
try:
x = float(v)
if isnan(x):
return default
return x
except Exception:
return default
def fmt(v, n=4):
try:
return f"{float(v):.{n}f}"
except Exception:
return str(v)
def net_pos(position):
return int(getattr(position, "pos_long", 0)) - int(getattr(position, "pos_short", 0))
def rolling_max(values, idx, n, include_current=False):
end = idx + 1 if include_current else idx
start = end - n
if start < 0:
return None
vals = values[start:end]
if any(v is None for v in vals):
return None
return max(vals)
def rolling_min(values, idx, n, include_current=False):
end = idx + 1 if include_current else idx
start = end - n
if start < 0:
return None
vals = values[start:end]
if any(v is None for v in vals):
return None
return min(vals)
class LongShortPowerGameStrategy:
def __init__(self, api, signal_symbol):
self.api = api
self.signal_symbol = signal_symbol
self.account = api.get_account()
self.signal_quote = api.get_quote(signal_symbol)
self.klines = api.get_kline_serial(signal_symbol, KLINE_SEC, data_length=DATA_LENGTH)
self.trade_symbol = None
self.trade_quote = None
self.position = None
self.target_pos = None
self.last_bar_dt = None
self.pending_trade_symbol = None
self.last_switch_notice = None
self.direction = 0
self.entry_price = None
self.best_price = None
self.stop_price = None
def resolve_trade_symbol(self):
if self.signal_symbol.startswith("KQ.m@"):
underlying = getattr(self.signal_quote, "underlying_symbol", None)
if underlying:
return underlying
return None
return self.signal_symbol
def ensure_trade_objects(self):
resolved = self.resolve_trade_symbol()
if not resolved:
return False
if self.trade_symbol is None:
self.trade_symbol = resolved
self.trade_quote = self.api.get_quote(self.trade_symbol)
self.position = self.api.get_position(self.trade_symbol)
self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)
print("初始化交易合约:", self.trade_symbol)
return True
if resolved != self.trade_symbol:
current_net = net_pos(self.position)
if current_net == 0:
if self.target_pos is not None:
self.target_pos.cancel()
while not self.target_pos.is_finished():
self.api.wait_update()
old_symbol = self.trade_symbol
self.trade_symbol = resolved
self.trade_quote = self.api.get_quote(self.trade_symbol)
self.position = self.api.get_position(self.trade_symbol)
self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)
self.pending_trade_symbol = None
self.last_switch_notice = None
print(f"主连映射切换,新的交易合约: {old_symbol} -> {self.trade_symbol}")
else:
self.pending_trade_symbol = resolved
notice_key = (self.trade_symbol, resolved)
if self.last_switch_notice != notice_key:
print(f"检测到主连切换,但当前有持仓,继续用原合约管理直至平仓。当前={self.trade_symbol},待切换={resolved}")
self.last_switch_notice = notice_key
return True
def sync_position_state(self):
current_net = net_pos(self.position)
if current_net > 0:
if self.direction <= 0:
self.direction = 1
self.entry_price = safe_float(getattr(self.trade_quote, "last_price", 0.0), 0.0)
self.best_price = self.entry_price
self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT)
print(f"建仓完成 | LONG | entry={fmt(self.entry_price)} | stop={fmt(self.stop_price)}")
elif current_net < 0:
if self.direction >= 0:
self.direction = -1
self.entry_price = safe_float(getattr(self.trade_quote, "last_price", 0.0), 0.0)
self.best_price = self.entry_price
self.stop_price = self.entry_price * (1 + FIXED_STOP_PCT)
print(f"建仓完成 | SHORT | entry={fmt(self.entry_price)} | stop={fmt(self.stop_price)}")
else:
if self.direction != 0:
print("持仓归零,状态重置")
self.direction = 0
self.entry_price = None
self.best_price = None
self.stop_price = None
if current_net == 0 and self.pending_trade_symbol and self.pending_trade_symbol != self.trade_symbol:
if self.target_pos is not None:
self.target_pos.cancel()
while not self.target_pos.is_finished():
self.api.wait_update()
old_symbol = self.trade_symbol
self.trade_symbol = self.pending_trade_symbol
self.trade_quote = self.api.get_quote(self.trade_symbol)
self.position = self.api.get_position(self.trade_symbol)
self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)
print(f"原合约已平仓,执行切换: {old_symbol} -> {self.trade_symbol}")
self.pending_trade_symbol = None
self.last_switch_notice = None
def update_risk(self, close_now):
if self.direction == 0 or self.entry_price is None:
return False
if self.direction > 0:
self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT)
self.best_price = max(self.best_price, close_now)
if self.best_price > self.entry_price:
floating_profit = self.best_price - self.entry_price
trail_line = self.best_price - floating_profit * TRAIL_PROFITBACK_PCT
if close_now <= trail_line:
print(f"多头盈利回撤止盈 | close={fmt(close_now)} <= trail_line={fmt(trail_line)}")
self.target_pos.set_target_volume(0)
return True
if close_now <= self.stop_price:
print(f"多头固定止损 | close={fmt(close_now)} <= stop={fmt(self.stop_price)}")
self.target_pos.set_target_volume(0)
return True
elif self.direction < 0:
self.stop_price = self.entry_price * (1 + FIXED_STOP_PCT)
self.best_price = min(self.best_price, close_now)
if self.best_price < self.entry_price:
floating_profit = self.entry_price - self.best_price
trail_line = self.best_price + floating_profit * TRAIL_PROFITBACK_PCT
if close_now >= trail_line:
print(f"空头盈利回撤止盈 | close={fmt(close_now)} >= trail_line={fmt(trail_line)}")
self.target_pos.set_target_volume(0)
return True
if close_now >= self.stop_price:
print(f"空头固定止损 | close={fmt(close_now)} >= stop={fmt(self.stop_price)}")
self.target_pos.set_target_volume(0)
return True
return False
def on_bar(self):
idx = len(self.klines) - 2
if idx < MIN_READY_BARS:
return
opens = [safe_float(self.klines["open"].iloc[i], None) for i in range(len(self.klines))]
highs = [safe_float(self.klines["high"].iloc[i], None) for i in range(len(self.klines))]
lows = [safe_float(self.klines["low"].iloc[i], None) for i in range(len(self.klines))]
closes = [safe_float(self.klines["close"].iloc[i], None) for i in range(len(self.klines))]
open_now = opens[idx]
high_now = highs[idx]
low_now = lows[idx]
close_now = closes[idx]
if None in (open_now, high_now, low_now, close_now):
return
ma_df = MA(self.klines, TREND_MA_N)
ma_now = safe_float(ma_df["ma"].iloc[-2], None)
if ma_now is None:
return
highest_n = rolling_max(highs, idx, BREAKOUT_N, include_current=False)
lowest_n = rolling_min(lows, idx, BREAKOUT_N, include_current=False)
if None in (highest_n, lowest_n):
return
full_range = high_now - low_now
if full_range <= 0:
return
upper_shadow = high_now - max(open_now, close_now)
lower_shadow = min(open_now, close_now) - low_now
bull_body = max(close_now - open_now, 0.0)
bear_body = max(open_now - close_now, 0.0)
upper_ratio = upper_shadow / full_range
lower_ratio = lower_shadow / full_range
bull_body_ratio = bull_body / full_range
bear_body_ratio = bear_body / full_range
long_signal = (
lower_ratio > 0.5 and
bull_body_ratio > 0.5 and
close_now > ma_now and
high_now >= highest_n
)
short_signal = (
upper_ratio > 0.5 and
bear_body_ratio > 0.5 and
close_now < ma_now and
low_now <= lowest_n
)
if self.direction != 0:
if self.update_risk(close_now):
return
if self.direction == 0:
if long_signal:
print(
f"开多 | close={fmt(close_now)} > MA={fmt(ma_now)} | "
f"high={fmt(high_now)} >= HHV={fmt(highest_n)} | "
f"lower_ratio={fmt(lower_ratio)} | bull_body_ratio={fmt(bull_body_ratio)}"
)
self.target_pos.set_target_volume(MAX_LOTS)
return
if short_signal:
print(
f"开空 | close={fmt(close_now)} < MA={fmt(ma_now)} | "
f"low={fmt(low_now)} <= LLV={fmt(lowest_n)} | "
f"upper_ratio={fmt(upper_ratio)} | bear_body_ratio={fmt(bear_body_ratio)}"
)
self.target_pos.set_target_volume(-MAX_LOTS)
return
elif self.direction > 0 and short_signal:
print("多头反手开空")
self.target_pos.set_target_volume(-MAX_LOTS)
return
elif self.direction < 0 and long_signal:
print("空头反手开多")
self.target_pos.set_target_volume(MAX_LOTS)
return
print(
f"状态 | dir={self.direction} | close={fmt(close_now)} | MA={fmt(ma_now)} | "
f"HHV={fmt(highest_n)} | LLV={fmt(lowest_n)} | "
f"upper_ratio={fmt(upper_ratio)} | lower_ratio={fmt(lower_ratio)} | "
f"bull_body_ratio={fmt(bull_body_ratio)} | bear_body_ratio={fmt(bear_body_ratio)} | "
f"best_price={fmt(self.best_price)} | stop={fmt(self.stop_price)}"
)
def run(self):
print("多空力量博弈交易策略启动...")
while True:
self.api.wait_update()
if not self.ensure_trade_objects():
continue
self.sync_position_state()
if not self.api.is_changing(self.klines.iloc[-1], "datetime"):
continue
current_last_dt = self.klines.iloc[-1]["datetime"]
if self.last_bar_dt is None:
self.last_bar_dt = current_last_dt
continue
if current_last_dt != self.last_bar_dt:
self.last_bar_dt = current_last_dt
print("-" * 120)
print(f"新K线到来 | 数据合约={self.signal_symbol} | 交易合约={self.trade_symbol}")
self.on_bar()
def main():
sim = TqSim(INIT_BALANCE)
api = None
try:
api = TqApi(
account=sim,
backtest=TqBacktest(start_dt=START_DT, end_dt=END_DT),
auth=TqAuth(AUTH_USER, AUTH_PASSWORD),
web_gui=WEB_GUI,
)
strategy = LongShortPowerGameStrategy(api, SYMBOL)
strategy.run()
except BacktestFinished:
print("=" * 120)
print("回测结束")
print("trade_log:")
print(sim.trade_log)
print("tqsdk_stat:")
print(sim.tqsdk_stat)
finally:
if api is not None:
try:
api.close()
except Exception:
pass
if __name__ == "__main__":
main()