在量化交易的浩瀚星海中,交易者们常常分化为两个阵营:一派是"趋势跟踪者",他们崇尚均线与通道,着眼于宏观的动能;另一派是"形态捕捉者",他们死磕K线组合与量价关系,试图在微观中洞察主力的买卖情绪。
今天我们要拆解的这套基于 Python 和 TqSdk 框架编写的 “多空力量博弈交易策略”,交出了一份将两者巧妙融合的答卷。它不仅拥有大级别的宏观视野,更具备拿着放大镜审视单根K线内部结构的细腻。它不盲目预测市场,而是通过"三重滤网"来确认一笔交易的绝对胜算:看大势、抓突破、审细节。
以下,我们将穿透代码表象,为您层层剥开这套策略的逻辑内核。
一、 顺势而为:大级别的趋势与动能过滤
一笔高胜算的交易,首先必须处于阻力最小的方向上。该策略在入场前,设置了两道宏观与中观的"防线",以过滤掉无序的震荡行情,绝不逆势接刀。
1. 均线定大势 (趋势过滤)
策略调用了 TqSdk 的 MA 算法,计算出经典的 20 周期简单移动平均线(MA20)。这根均线就是系统眼中的"牛熊分水岭"。价格运行在均线之上,系统自动屏蔽一切做空信号;反之则只做空。
2. 极值确动能 (通道突破)
仅仅顺势是不够的,趋势中的盘整期同样会消耗大量资金与耐心。策略计算了过去 20 根 K 线的最高价(HHV)和最低价(LLV)。只有当最新价格突破这个局部极值时,才认为沉寂的动能迎来了真正的爆发。
核心源码:
Python
# 1. 计算 20 周期均线作为多空分水岭
ma_df = MA(self.klines, TREND_MA_N)
ma_now = safe_float(ma_df["ma"].iloc[-2], None)
# 2. 获取过去 N 根 K 线的最高价与最低价(唐奇安通道概念)
highest_n = rolling_max(highs, idx, BREAKOUT_N, include_current=False)
lowest_n = rolling_min(lows, idx, BREAKOUT_N, include_current=False)
二、 见微知著:K线内部的多空力量解剖
如果说均线和通道是"望远镜",那么这一步就是策略的"显微镜"。大多数突破策略死于"假突破",而本策略通过解剖突破当天那根 K 线的内部结构,来辨别突破的真伪。
在突破发生的当天,策略会将当根 K 线的总振幅(最高价 - 最低价)视为 100% 的战场,并精细量化出四股力量的占比:
-
上影线力量 (
upper_ratio): 空方在上方构筑的火力压制(拒绝上涨)。 -
下影线力量 (
lower_ratio): 多方在下方形成的坚固堡垒(拒绝下跌)。 -
阳线实体力量 (
bull_body_ratio): 多方发起冲锋并成功占领的阵地(实际上涨空间)。 -
阴线实体力量 (
bear_body_ratio): 空方发起冲锋并成功占领的阵地(实际下跌空间)。
核心源码:
Python
# 定义单根K线的总振幅战场
full_range = high_now - low_now
# 计算各部分的绝对力量点数
upper_shadow = high_now - max(open_now, close_now)
lower_shadow = min(open_now, close_now) - low_now
bull_body = max(close_now - open_now, 0.0)
bear_body = max(open_now - close_now, 0.0)
# 转化为占总战场的百分比,精确量化买卖情绪
upper_ratio = upper_shadow / full_range
lower_ratio = lower_shadow / full_range
bull_body_ratio = bull_body / full_range
bear_body_ratio = bear_body / full_range
三、 信号共振:严苛的三重入场条件
当宏观环境、中观动能与微观形态发生完美共振时,策略才会真正扣动扳机。
以做多为例,完美的入场形态必须满足:大势朝上 + 价格创出新高 + K线展现出极强的下影线托底力量与饱满的阳线拉升力量。只有当微观层面的买卖力量呈现出压倒性的优势时,系统才会确认入场。
核心源码:
Python
# 多头开仓的“三重共振”信号
long_signal = (
lower_ratio > 0.5 and # 微观:下影线支撑占据绝对主导
bull_body_ratio > 0.5 and # 微观:阳线实体拉升极其坚决
close_now > ma_now and # 宏观:处于均线多头趋势中
high_now >= highest_n # 中观:向上突破局部前高
)
# 空头开仓信号同理
short_signal = (
upper_ratio > 0.5 and
bear_body_ratio > 0.5 and
close_now < ma_now and
low_now <= lowest_n
)
四、 进退有度:双重风控与动态止盈网络
会买只是徒弟,会卖才是师傅。在退出机制上,策略展现出了量化系统应有的冷酷与纪律。系统设计了一静一动两套风险管理网络:
1. 刚性的底线(固定百分比止损)
进场的一瞬间,系统会自动在建仓价的反方向设置 3% 的硬性止损线(FIXED_STOP_PCT)。这道防线不随行情波动而改变。一旦触及,说明入场的逻辑已经彻底证伪,系统将无条件断臂求生,绝不扛单。
2. 克制的贪婪(利润回撤止盈)
为了让利润奔跑,策略没有设置固定的止盈目标,而是采用了"动态追踪止盈"。进场后,系统会像雷达一样死死盯住价格到达过的最高浮盈点。如果价格继续顺势,系统就一直持有;但如果行情出现反转,从最高利润点回吐了 30% 的利润(TRAIL_PROFITBACK_PCT)时,系统会果断获利了结。
核心源码:
Python
# 以多头持仓的风控逻辑为例
self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT) # 画出 3% 刚性止损红线
self.best_price = max(self.best_price, close_now) # 动态刷新曾经到达的最高点
# 动态止盈逻辑:
if self.best_price > self.entry_price:
# 计算当前的最大浮盈
floating_profit = self.best_price - self.entry_price
# 设置追踪止盈线:最高价回落利润的 30%
trail_line = self.best_price - floating_profit * TRAIL_PROFITBACK_PCT
# 跌破追踪线,立刻清仓落袋为安
if close_now <= trail_line:
print(f"多头盈利回撤止盈 | close={fmt(close_now)} <= trail_line={fmt(trail_line)}")
self.target_pos.set_target_volume(0)
return
# 跌破初始止损,认错离场
if close_now <= self.stop_price:
self.target_pos.set_target_volume(0)
五、 实战工程的细腻之处:无缝换月处理
在代码架构层面,这份策略包含了处理国内期货特有机制(主力连续合约换月)的成熟逻辑。
当我们在 TqSdk 中订阅诸如 KQ.m@SHFE.rb(螺纹钢主力连续合约)时,由于底层物理合约(如 rb2405 切换到 rb2410)会定期发生跳转,策略通过 pending_trade_symbol 机制实现了极其优雅的处理:当发现主力换月时,如果手中持有旧合约的仓位,策略不会盲目平仓导致摩擦成本,而是继续管理旧合约直至触发正常的止损/止盈离场,随后才在新合约上寻找下一次开仓机会。
结语
"多空力量博弈交易策略"向我们展示了一个极具层次感的量化思考框架。它不迷信单一的指标,而是将大环境(均线)、爆发点(突破通道)、买卖情绪(K线切片)与底线思维(固定+追踪风控)熔于一炉。
对于量化交易者而言,这份源码不仅提供了一个坚实的趋势突破交易骨架,更传递了一种在宏观中寻找方向、在微观中确认胜算的精妙交易哲学。
from datetime import date
from math import isnan
from tqsdk import TqApi, TqAuth, TqSim, TqBacktest, BacktestFinished
from tqsdk.lib import TargetPosTask
from tqsdk.ta import MA
# ============================================================
# 多空力量博弈交易策略(TqSdk 版)
#
# 核心逻辑:
# 1) 计算 N 日均线,用价格相对均线判断大趋势
# - close > MA -> 趋势偏多
# - close < MA -> 趋势偏空
#
# 2) 计算K线多空力量
# - 上影线力量(空头压力): (high - max(open, close)) / (high - low)
# - 下影线力量(多头推升): (min(open, close) - low) / (high - low)
# - 阳线实体力量(多头胜利): max(close - open, 0) / (high - low)
# - 阴线实体力量(空头胜利): max(open - close, 0) / (high - low)
#
# 3) 入场
# 做多:
# - 下影线力量 > 0.5
# - 阳线实体力量 > 0.5
# - close > MA
# - 当前 high >= 过去N根K线最高价
#
# 做空:
# - 上影线力量 > 0.5
# - 阴线实体力量 > 0.5
# - close < MA
# - 当前 low <= 过去N根K线最低价
#
# 4) 平仓/风控
# - 亏损状态:固定百分比止损
# - 盈利状态:从浮盈高点回撤一定比例止盈
#
# 说明:
# - 使用 TqBacktest / TargetPosTask / MA 官方接口
# - 主连信号,实际交易合约使用 underlying_symbol
# ============================================================
# ----------------------------
AUTH_USER = "" #快期账号
AUTH_PASSWORD = "" #快期密码
#
SYMBOL = "KQ.m@SHFE.rb"
START_DT = date(2021, 1, 1)
END_DT = date(2025, 1, 1)
INIT_BALANCE = 1_000_000
WEB_GUI = True
KLINE_SEC = 24 * 60 * 60
DATA_LENGTH = 320
TREND_MA_N = 20
BREAKOUT_N = 20
FIXED_STOP_PCT = 0.03 # 亏损3%止损
TRAIL_PROFITBACK_PCT = 0.30 # 盈利后回撤30%止盈
MAX_LOTS = 5
PRICE_MODE = "ACTIVE"
MIN_READY_BARS = 60
def safe_float(v, default=0.0):
try:
x = float(v)
if isnan(x):
return default
return x
except Exception:
return default
def fmt(v, n=4):
try:
return f"{float(v):.{n}f}"
except Exception:
return str(v)
def net_pos(position):
return int(getattr(position, "pos_long", 0)) - int(getattr(position, "pos_short", 0))
def rolling_max(values, idx, n, include_current=False):
end = idx + 1 if include_current else idx
start = end - n
if start < 0:
return None
vals = values[start:end]
if any(v is None for v in vals):
return None
return max(vals)
def rolling_min(values, idx, n, include_current=False):
end = idx + 1 if include_current else idx
start = end - n
if start < 0:
return None
vals = values[start:end]
if any(v is None for v in vals):
return None
return min(vals)
class LongShortPowerGameStrategy:
def __init__(self, api, signal_symbol):
self.api = api
self.signal_symbol = signal_symbol
self.account = api.get_account()
self.signal_quote = api.get_quote(signal_symbol)
self.klines = api.get_kline_serial(signal_symbol, KLINE_SEC, data_length=DATA_LENGTH)
self.trade_symbol = None
self.trade_quote = None
self.position = None
self.target_pos = None
self.last_bar_dt = None
self.pending_trade_symbol = None
self.last_switch_notice = None
self.direction = 0
self.entry_price = None
self.best_price = None # 多头记录最高价,空头记录最低价
self.stop_price = None
def resolve_trade_symbol(self):
if self.signal_symbol.startswith("KQ.m@"):
underlying = getattr(self.signal_quote, "underlying_symbol", None)
if underlying:
return underlying
return None
return self.signal_symbol
def ensure_trade_objects(self):
resolved = self.resolve_trade_symbol()
if not resolved:
return False
if self.trade_symbol is None:
self.trade_symbol = resolved
self.trade_quote = self.api.get_quote(self.trade_symbol)
self.position = self.api.get_position(self.trade_symbol)
self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)
print("初始化交易合约:", self.trade_symbol)
return True
if resolved != self.trade_symbol:
current_net = net_pos(self.position)
if current_net == 0:
if self.target_pos is not None:
self.target_pos.cancel()
while not self.target_pos.is_finished():
self.api.wait_update()
old_symbol = self.trade_symbol
self.trade_symbol = resolved
self.trade_quote = self.api.get_quote(self.trade_symbol)
self.position = self.api.get_position(self.trade_symbol)
self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)
self.pending_trade_symbol = None
self.last_switch_notice = None
print(f"主连映射切换,新的交易合约: {old_symbol} -> {self.trade_symbol}")
else:
self.pending_trade_symbol = resolved
notice_key = (self.trade_symbol, resolved)
if self.last_switch_notice != notice_key:
print(f"检测到主连切换,但当前有持仓,继续用原合约管理直至平仓。当前={self.trade_symbol},待切换={resolved}")
self.last_switch_notice = notice_key
return True
def sync_position_state(self):
current_net = net_pos(self.position)
if current_net > 0:
if self.direction <= 0:
self.direction = 1
self.entry_price = safe_float(getattr(self.trade_quote, "last_price", 0.0), 0.0)
self.best_price = self.entry_price
self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT)
print(f"建仓完成 | LONG | entry={fmt(self.entry_price)} | stop={fmt(self.stop_price)}")
elif current_net < 0:
if self.direction >= 0:
self.direction = -1
self.entry_price = safe_float(getattr(self.trade_quote, "last_price", 0.0), 0.0)
self.best_price = self.entry_price
self.stop_price = self.entry_price * (1 + FIXED_STOP_PCT)
print(f"建仓完成 | SHORT | entry={fmt(self.entry_price)} | stop={fmt(self.stop_price)}")
else:
if self.direction != 0:
print("持仓归零,状态重置")
self.direction = 0
self.entry_price = None
self.best_price = None
self.stop_price = None
if current_net == 0 and self.pending_trade_symbol and self.pending_trade_symbol != self.trade_symbol:
if self.target_pos is not None:
self.target_pos.cancel()
while not self.target_pos.is_finished():
self.api.wait_update()
old_symbol = self.trade_symbol
self.trade_symbol = self.pending_trade_symbol
self.trade_quote = self.api.get_quote(self.trade_symbol)
self.position = self.api.get_position(self.trade_symbol)
self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)
print(f"原合约已平仓,执行切换: {old_symbol} -> {self.trade_symbol}")
self.pending_trade_symbol = None
self.last_switch_notice = None
def update_risk(self, close_now):
if self.direction == 0 or self.entry_price is None:
return
# 固定亏损止损始终有效
if self.direction > 0:
self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT)
self.best_price = max(self.best_price, close_now)
# 盈利后回撤止盈
if self.best_price > self.entry_price:
floating_profit = self.best_price - self.entry_price
trail_line = self.best_price - floating_profit * TRAIL_PROFITBACK_PCT
if close_now <= trail_line:
print(f"多头盈利回撤止盈 | close={fmt(close_now)} <= trail_line={fmt(trail_line)}")
self.target_pos.set_target_volume(0)
return
if close_now <= self.stop_price:
print(f"多头固定止损 | close={fmt(close_now)} <= stop={fmt(self.stop_price)}")
self.target_pos.set_target_volume(0)
return
elif self.direction < 0:
self.stop_price = self.entry_price * (1 + FIXED_STOP_PCT)
self.best_price = min(self.best_price, close_now)
if self.best_price < self.entry_price:
floating_profit = self.entry_price - self.best_price
trail_line = self.best_price + floating_profit * TRAIL_PROFITBACK_PCT
if close_now >= trail_line:
print(f"空头盈利回撤止盈 | close={fmt(close_now)} >= trail_line={fmt(trail_line)}")
self.target_pos.set_target_volume(0)
return
if close_now >= self.stop_price:
print(f"空头固定止损 | close={fmt(close_now)} >= stop={fmt(self.stop_price)}")
self.target_pos.set_target_volume(0)
return
def on_bar(self):
idx = len(self.klines) - 2
if idx < MIN_READY_BARS:
return
opens = [safe_float(self.klines["open"].iloc[i], None) for i in range(len(self.klines))]
highs = [safe_float(self.klines["high"].iloc[i], None) for i in range(len(self.klines))]
lows = [safe_float(self.klines["low"].iloc[i], None) for i in range(len(self.klines))]
closes = [safe_float(self.klines["close"].iloc[i], None) for i in range(len(self.klines))]
open_now = opens[idx]
high_now = highs[idx]
low_now = lows[idx]
close_now = closes[idx]
if None in (open_now, high_now, low_now, close_now):
return
ma_df = MA(self.klines, TREND_MA_N)
ma_now = safe_float(ma_df["ma"].iloc[-2], None)
if ma_now is None:
return
highest_n = rolling_max(highs, idx, BREAKOUT_N, include_current=False)
lowest_n = rolling_min(lows, idx, BREAKOUT_N, include_current=False)
if None in (highest_n, lowest_n):
return
# K线振幅
full_range = high_now - low_now
if full_range <= 0:
return
upper_shadow = high_now - max(open_now, close_now)
lower_shadow = min(open_now, close_now) - low_now
bull_body = max(close_now - open_now, 0.0)
bear_body = max(open_now - close_now, 0.0)
upper_ratio = upper_shadow / full_range
lower_ratio = lower_shadow / full_range
bull_body_ratio = bull_body / full_range
bear_body_ratio = bear_body / full_range
long_signal = (
lower_ratio > 0.5 and
bull_body_ratio > 0.5 and
close_now > ma_now and
high_now >= highest_n
)
short_signal = (
upper_ratio > 0.5 and
bear_body_ratio > 0.5 and
close_now < ma_now and
low_now <= lowest_n
)
# 先处理已有持仓风控
if self.direction != 0:
self.update_risk(close_now)
return
if long_signal:
print(
f"开多 | close={fmt(close_now)} > MA={fmt(ma_now)} | "
f"high={fmt(high_now)} >= HHV={fmt(highest_n)} | "
f"lower_ratio={fmt(lower_ratio)} | bull_body_ratio={fmt(bull_body_ratio)}"
)
self.target_pos.set_target_volume(MAX_LOTS)
return
if short_signal:
print(
f"开空 | close={fmt(close_now)} < MA={fmt(ma_now)} | "
f"low={fmt(low_now)} <= LLV={fmt(lowest_n)} | "
f"upper_ratio={fmt(upper_ratio)} | bear_body_ratio={fmt(bear_body_ratio)}"
)
self.target_pos.set_target_volume(-MAX_LOTS)
return
print(
f"状态 | dir={self.direction} | close={fmt(close_now)} | MA={fmt(ma_now)} | "
f"HHV={fmt(highest_n)} | LLV={fmt(lowest_n)} | "
f"upper_ratio={fmt(upper_ratio)} | lower_ratio={fmt(lower_ratio)} | "
f"bull_body_ratio={fmt(bull_body_ratio)} | bear_body_ratio={fmt(bear_body_ratio)}"
)
def run(self):
print("多空力量博弈交易策略启动...")
while True:
self.api.wait_update()
if not self.ensure_trade_objects():
continue
self.sync_position_state()
if not self.api.is_changing(self.klines.iloc[-1], "datetime"):
continue
current_last_dt = self.klines.iloc[-1]["datetime"]
if self.last_bar_dt is None:
self.last_bar_dt = current_last_dt
continue
if current_last_dt != self.last_bar_dt:
self.last_bar_dt = current_last_dt
print("-" * 120)
print(f"新K线到来 | 数据合约={self.signal_symbol} | 交易合约={self.trade_symbol}")
self.on_bar()
def main():
sim = TqSim(INIT_BALANCE)
api = None
try:
api = TqApi(
account=sim,
backtest=TqBacktest(start_dt=START_DT, end_dt=END_DT),
auth=TqAuth(AUTH_USER, AUTH_PASSWORD),
web_gui=WEB_GUI,
)
strategy = LongShortPowerGameStrategy(api, SYMBOL)
strategy.run()
except BacktestFinished:
print("=" * 120)
print("回测结束")
print("trade_log:")
print(sim.trade_log)
print("tqsdk_stat:")
print(sim.tqsdk_stat)
finally:
if api is not None:
try:
api.close()
except Exception:
pass
if __name__ == "__main__":
main()