穿透K线的迷雾:多空力量博弈交易策略源码深度解析

在量化交易的浩瀚星海中,交易者们常常分化为两个阵营:一派是"趋势跟踪者",他们崇尚均线与通道,着眼于宏观的动能;另一派是"形态捕捉者",他们死磕K线组合与量价关系,试图在微观中洞察主力的买卖情绪。

今天我们要拆解的这套基于 Python 和 TqSdk 框架编写的 “多空力量博弈交易策略”,交出了一份将两者巧妙融合的答卷。它不仅拥有大级别的宏观视野,更具备拿着放大镜审视单根K线内部结构的细腻。它不盲目预测市场,而是通过"三重滤网"来确认一笔交易的绝对胜算:看大势、抓突破、审细节

以下,我们将穿透代码表象,为您层层剥开这套策略的逻辑内核。


一、 顺势而为:大级别的趋势与动能过滤

一笔高胜算的交易,首先必须处于阻力最小的方向上。该策略在入场前,设置了两道宏观与中观的"防线",以过滤掉无序的震荡行情,绝不逆势接刀。

1. 均线定大势 (趋势过滤)

策略调用了 TqSdkMA 算法,计算出经典的 20 周期简单移动平均线(MA20)。这根均线就是系统眼中的"牛熊分水岭"。价格运行在均线之上,系统自动屏蔽一切做空信号;反之则只做空。

2. 极值确动能 (通道突破)

仅仅顺势是不够的,趋势中的盘整期同样会消耗大量资金与耐心。策略计算了过去 20 根 K 线的最高价(HHV)和最低价(LLV)。只有当最新价格突破这个局部极值时,才认为沉寂的动能迎来了真正的爆发。

核心源码:

Python

# 1. 计算 20 周期均线作为多空分水岭
ma_df = MA(self.klines, TREND_MA_N)
ma_now = safe_float(ma_df["ma"].iloc[-2], None)

# 2. 获取过去 N 根 K 线的最高价与最低价(唐奇安通道概念)
highest_n = rolling_max(highs, idx, BREAKOUT_N, include_current=False)
lowest_n = rolling_min(lows, idx, BREAKOUT_N, include_current=False)


二、 见微知著:K线内部的多空力量解剖

如果说均线和通道是"望远镜",那么这一步就是策略的"显微镜"。大多数突破策略死于"假突破",而本策略通过解剖突破当天那根 K 线的内部结构,来辨别突破的真伪。

在突破发生的当天,策略会将当根 K 线的总振幅(最高价 - 最低价)视为 100% 的战场,并精细量化出四股力量的占比:

  • 上影线力量 (upper_ratio): 空方在上方构筑的火力压制(拒绝上涨)。

  • 下影线力量 (lower_ratio): 多方在下方形成的坚固堡垒(拒绝下跌)。

  • 阳线实体力量 (bull_body_ratio): 多方发起冲锋并成功占领的阵地(实际上涨空间)。

  • 阴线实体力量 (bear_body_ratio): 空方发起冲锋并成功占领的阵地(实际下跌空间)。

核心源码:

Python

# 定义单根K线的总振幅战场
full_range = high_now - low_now 

# 计算各部分的绝对力量点数
upper_shadow = high_now - max(open_now, close_now)
lower_shadow = min(open_now, close_now) - low_now
bull_body = max(close_now - open_now, 0.0)
bear_body = max(open_now - close_now, 0.0)

# 转化为占总战场的百分比,精确量化买卖情绪
upper_ratio = upper_shadow / full_range
lower_ratio = lower_shadow / full_range
bull_body_ratio = bull_body / full_range
bear_body_ratio = bear_body / full_range


三、 信号共振:严苛的三重入场条件

当宏观环境、中观动能与微观形态发生完美共振时,策略才会真正扣动扳机。

以做多为例,完美的入场形态必须满足:大势朝上 + 价格创出新高 + K线展现出极强的下影线托底力量与饱满的阳线拉升力量。只有当微观层面的买卖力量呈现出压倒性的优势时,系统才会确认入场。

核心源码:

Python

# 多头开仓的“三重共振”信号
long_signal = (
    lower_ratio > 0.5 and        # 微观:下影线支撑占据绝对主导
    bull_body_ratio > 0.5 and    # 微观:阳线实体拉升极其坚决
    close_now > ma_now and       # 宏观:处于均线多头趋势中
    high_now >= highest_n        # 中观:向上突破局部前高
)

# 空头开仓信号同理
short_signal = (
    upper_ratio > 0.5 and        
    bear_body_ratio > 0.5 and    
    close_now < ma_now and       
    low_now <= lowest_n          
)


四、 进退有度:双重风控与动态止盈网络

会买只是徒弟,会卖才是师傅。在退出机制上,策略展现出了量化系统应有的冷酷与纪律。系统设计了一静一动两套风险管理网络:

1. 刚性的底线(固定百分比止损)

进场的一瞬间,系统会自动在建仓价的反方向设置 3% 的硬性止损线(FIXED_STOP_PCT)。这道防线不随行情波动而改变。一旦触及,说明入场的逻辑已经彻底证伪,系统将无条件断臂求生,绝不扛单。

2. 克制的贪婪(利润回撤止盈)

为了让利润奔跑,策略没有设置固定的止盈目标,而是采用了"动态追踪止盈"。进场后,系统会像雷达一样死死盯住价格到达过的最高浮盈点。如果价格继续顺势,系统就一直持有;但如果行情出现反转,从最高利润点回吐了 30% 的利润(TRAIL_PROFITBACK_PCT)时,系统会果断获利了结。

核心源码:

Python

# 以多头持仓的风控逻辑为例
self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT) # 画出 3% 刚性止损红线
self.best_price = max(self.best_price, close_now)         # 动态刷新曾经到达的最高点

# 动态止盈逻辑:
if self.best_price > self.entry_price:
    # 计算当前的最大浮盈
    floating_profit = self.best_price - self.entry_price
    # 设置追踪止盈线:最高价回落利润的 30%
    trail_line = self.best_price - floating_profit * TRAIL_PROFITBACK_PCT
    
    # 跌破追踪线,立刻清仓落袋为安
    if close_now <= trail_line:
        print(f"多头盈利回撤止盈 | close={fmt(close_now)} <= trail_line={fmt(trail_line)}")
        self.target_pos.set_target_volume(0) 
        return

# 跌破初始止损,认错离场
if close_now <= self.stop_price:
    self.target_pos.set_target_volume(0)


五、 实战工程的细腻之处:无缝换月处理

在代码架构层面,这份策略包含了处理国内期货特有机制(主力连续合约换月)的成熟逻辑。

当我们在 TqSdk 中订阅诸如 KQ.m@SHFE.rb(螺纹钢主力连续合约)时,由于底层物理合约(如 rb2405 切换到 rb2410)会定期发生跳转,策略通过 pending_trade_symbol 机制实现了极其优雅的处理:当发现主力换月时,如果手中持有旧合约的仓位,策略不会盲目平仓导致摩擦成本,而是继续管理旧合约直至触发正常的止损/止盈离场,随后才在新合约上寻找下一次开仓机会。


结语

"多空力量博弈交易策略"向我们展示了一个极具层次感的量化思考框架。它不迷信单一的指标,而是将大环境(均线)爆发点(突破通道)、买卖情绪(K线切片)底线思维(固定+追踪风控)熔于一炉。

对于量化交易者而言,这份源码不仅提供了一个坚实的趋势突破交易骨架,更传递了一种在宏观中寻找方向、在微观中确认胜算的精妙交易哲学。


from datetime import date
from math import isnan

from tqsdk import TqApi, TqAuth, TqSim, TqBacktest, BacktestFinished
from tqsdk.lib import TargetPosTask
from tqsdk.ta import MA


# ============================================================
# 多空力量博弈交易策略(TqSdk 版)
#
# 核心逻辑:
# 1) 计算 N 日均线,用价格相对均线判断大趋势
#    - close > MA -> 趋势偏多
#    - close < MA -> 趋势偏空
#
# 2) 计算K线多空力量
#    - 上影线力量(空头压力): (high - max(open, close)) / (high - low)
#    - 下影线力量(多头推升): (min(open, close) - low) / (high - low)
#    - 阳线实体力量(多头胜利): max(close - open, 0) / (high - low)
#    - 阴线实体力量(空头胜利): max(open - close, 0) / (high - low)
#
# 3) 入场
#    做多:
#      - 下影线力量 > 0.5
#      - 阳线实体力量 > 0.5
#      - close > MA
#      - 当前 high >= 过去N根K线最高价
#
#    做空:
#      - 上影线力量 > 0.5
#      - 阴线实体力量 > 0.5
#      - close < MA
#      - 当前 low <= 过去N根K线最低价
#
# 4) 平仓/风控
#    - 亏损状态:固定百分比止损
#    - 盈利状态:从浮盈高点回撤一定比例止盈
#
# 说明:
# - 使用 TqBacktest / TargetPosTask / MA 官方接口
# - 主连信号,实际交易合约使用 underlying_symbol
# ============================================================
# ----------------------------
AUTH_USER = ""   #快期账号
AUTH_PASSWORD = "" #快期密码
#
SYMBOL = "KQ.m@SHFE.rb"
START_DT = date(2021, 1, 1)
END_DT = date(2025, 1, 1)
INIT_BALANCE = 1_000_000
WEB_GUI = True

KLINE_SEC = 24 * 60 * 60
DATA_LENGTH = 320

TREND_MA_N = 20
BREAKOUT_N = 20

FIXED_STOP_PCT = 0.03       # 亏损3%止损
TRAIL_PROFITBACK_PCT = 0.30 # 盈利后回撤30%止盈

MAX_LOTS = 5
PRICE_MODE = "ACTIVE"
MIN_READY_BARS = 60


def safe_float(v, default=0.0):
    try:
        x = float(v)
        if isnan(x):
            return default
        return x
    except Exception:
        return default


def fmt(v, n=4):
    try:
        return f"{float(v):.{n}f}"
    except Exception:
        return str(v)


def net_pos(position):
    return int(getattr(position, "pos_long", 0)) - int(getattr(position, "pos_short", 0))


def rolling_max(values, idx, n, include_current=False):
    end = idx + 1 if include_current else idx
    start = end - n
    if start < 0:
        return None
    vals = values[start:end]
    if any(v is None for v in vals):
        return None
    return max(vals)


def rolling_min(values, idx, n, include_current=False):
    end = idx + 1 if include_current else idx
    start = end - n
    if start < 0:
        return None
    vals = values[start:end]
    if any(v is None for v in vals):
        return None
    return min(vals)


class LongShortPowerGameStrategy:
    def __init__(self, api, signal_symbol):
        self.api = api
        self.signal_symbol = signal_symbol

        self.account = api.get_account()
        self.signal_quote = api.get_quote(signal_symbol)
        self.klines = api.get_kline_serial(signal_symbol, KLINE_SEC, data_length=DATA_LENGTH)

        self.trade_symbol = None
        self.trade_quote = None
        self.position = None
        self.target_pos = None

        self.last_bar_dt = None
        self.pending_trade_symbol = None
        self.last_switch_notice = None

        self.direction = 0
        self.entry_price = None
        self.best_price = None   # 多头记录最高价,空头记录最低价
        self.stop_price = None

    def resolve_trade_symbol(self):
        if self.signal_symbol.startswith("KQ.m@"):
            underlying = getattr(self.signal_quote, "underlying_symbol", None)
            if underlying:
                return underlying
            return None
        return self.signal_symbol

    def ensure_trade_objects(self):
        resolved = self.resolve_trade_symbol()
        if not resolved:
            return False

        if self.trade_symbol is None:
            self.trade_symbol = resolved
            self.trade_quote = self.api.get_quote(self.trade_symbol)
            self.position = self.api.get_position(self.trade_symbol)
            self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)
            print("初始化交易合约:", self.trade_symbol)
            return True

        if resolved != self.trade_symbol:
            current_net = net_pos(self.position)
            if current_net == 0:
                if self.target_pos is not None:
                    self.target_pos.cancel()
                    while not self.target_pos.is_finished():
                        self.api.wait_update()

                old_symbol = self.trade_symbol
                self.trade_symbol = resolved
                self.trade_quote = self.api.get_quote(self.trade_symbol)
                self.position = self.api.get_position(self.trade_symbol)
                self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)

                self.pending_trade_symbol = None
                self.last_switch_notice = None
                print(f"主连映射切换,新的交易合约: {old_symbol} -> {self.trade_symbol}")
            else:
                self.pending_trade_symbol = resolved
                notice_key = (self.trade_symbol, resolved)
                if self.last_switch_notice != notice_key:
                    print(f"检测到主连切换,但当前有持仓,继续用原合约管理直至平仓。当前={self.trade_symbol},待切换={resolved}")
                    self.last_switch_notice = notice_key
        return True

    def sync_position_state(self):
        current_net = net_pos(self.position)

        if current_net > 0:
            if self.direction <= 0:
                self.direction = 1
                self.entry_price = safe_float(getattr(self.trade_quote, "last_price", 0.0), 0.0)
                self.best_price = self.entry_price
                self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT)
                print(f"建仓完成 | LONG | entry={fmt(self.entry_price)} | stop={fmt(self.stop_price)}")
        elif current_net < 0:
            if self.direction >= 0:
                self.direction = -1
                self.entry_price = safe_float(getattr(self.trade_quote, "last_price", 0.0), 0.0)
                self.best_price = self.entry_price
                self.stop_price = self.entry_price * (1 + FIXED_STOP_PCT)
                print(f"建仓完成 | SHORT | entry={fmt(self.entry_price)} | stop={fmt(self.stop_price)}")
        else:
            if self.direction != 0:
                print("持仓归零,状态重置")
            self.direction = 0
            self.entry_price = None
            self.best_price = None
            self.stop_price = None

        if current_net == 0 and self.pending_trade_symbol and self.pending_trade_symbol != self.trade_symbol:
            if self.target_pos is not None:
                self.target_pos.cancel()
                while not self.target_pos.is_finished():
                    self.api.wait_update()

            old_symbol = self.trade_symbol
            self.trade_symbol = self.pending_trade_symbol
            self.trade_quote = self.api.get_quote(self.trade_symbol)
            self.position = self.api.get_position(self.trade_symbol)
            self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)

            print(f"原合约已平仓,执行切换: {old_symbol} -> {self.trade_symbol}")
            self.pending_trade_symbol = None
            self.last_switch_notice = None

    def update_risk(self, close_now):
        if self.direction == 0 or self.entry_price is None:
            return

        # 固定亏损止损始终有效
        if self.direction > 0:
            self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT)
            self.best_price = max(self.best_price, close_now)

            # 盈利后回撤止盈
            if self.best_price > self.entry_price:
                floating_profit = self.best_price - self.entry_price
                trail_line = self.best_price - floating_profit * TRAIL_PROFITBACK_PCT
                if close_now <= trail_line:
                    print(f"多头盈利回撤止盈 | close={fmt(close_now)} <= trail_line={fmt(trail_line)}")
                    self.target_pos.set_target_volume(0)
                    return

            if close_now <= self.stop_price:
                print(f"多头固定止损 | close={fmt(close_now)} <= stop={fmt(self.stop_price)}")
                self.target_pos.set_target_volume(0)
                return

        elif self.direction < 0:
            self.stop_price = self.entry_price * (1 + FIXED_STOP_PCT)
            self.best_price = min(self.best_price, close_now)

            if self.best_price < self.entry_price:
                floating_profit = self.entry_price - self.best_price
                trail_line = self.best_price + floating_profit * TRAIL_PROFITBACK_PCT
                if close_now >= trail_line:
                    print(f"空头盈利回撤止盈 | close={fmt(close_now)} >= trail_line={fmt(trail_line)}")
                    self.target_pos.set_target_volume(0)
                    return

            if close_now >= self.stop_price:
                print(f"空头固定止损 | close={fmt(close_now)} >= stop={fmt(self.stop_price)}")
                self.target_pos.set_target_volume(0)
                return

    def on_bar(self):
        idx = len(self.klines) - 2
        if idx < MIN_READY_BARS:
            return

        opens = [safe_float(self.klines["open"].iloc[i], None) for i in range(len(self.klines))]
        highs = [safe_float(self.klines["high"].iloc[i], None) for i in range(len(self.klines))]
        lows = [safe_float(self.klines["low"].iloc[i], None) for i in range(len(self.klines))]
        closes = [safe_float(self.klines["close"].iloc[i], None) for i in range(len(self.klines))]

        open_now = opens[idx]
        high_now = highs[idx]
        low_now = lows[idx]
        close_now = closes[idx]

        if None in (open_now, high_now, low_now, close_now):
            return

        ma_df = MA(self.klines, TREND_MA_N)
        ma_now = safe_float(ma_df["ma"].iloc[-2], None)
        if ma_now is None:
            return

        highest_n = rolling_max(highs, idx, BREAKOUT_N, include_current=False)
        lowest_n = rolling_min(lows, idx, BREAKOUT_N, include_current=False)

        if None in (highest_n, lowest_n):
            return

        # K线振幅
        full_range = high_now - low_now
        if full_range <= 0:
            return

        upper_shadow = high_now - max(open_now, close_now)
        lower_shadow = min(open_now, close_now) - low_now
        bull_body = max(close_now - open_now, 0.0)
        bear_body = max(open_now - close_now, 0.0)

        upper_ratio = upper_shadow / full_range
        lower_ratio = lower_shadow / full_range
        bull_body_ratio = bull_body / full_range
        bear_body_ratio = bear_body / full_range

        long_signal = (
            lower_ratio > 0.5 and
            bull_body_ratio > 0.5 and
            close_now > ma_now and
            high_now >= highest_n
        )

        short_signal = (
            upper_ratio > 0.5 and
            bear_body_ratio > 0.5 and
            close_now < ma_now and
            low_now <= lowest_n
        )

        # 先处理已有持仓风控
        if self.direction != 0:
            self.update_risk(close_now)
            return

        if long_signal:
            print(
                f"开多 | close={fmt(close_now)} > MA={fmt(ma_now)} | "
                f"high={fmt(high_now)} >= HHV={fmt(highest_n)} | "
                f"lower_ratio={fmt(lower_ratio)} | bull_body_ratio={fmt(bull_body_ratio)}"
            )
            self.target_pos.set_target_volume(MAX_LOTS)
            return

        if short_signal:
            print(
                f"开空 | close={fmt(close_now)} < MA={fmt(ma_now)} | "
                f"low={fmt(low_now)} <= LLV={fmt(lowest_n)} | "
                f"upper_ratio={fmt(upper_ratio)} | bear_body_ratio={fmt(bear_body_ratio)}"
            )
            self.target_pos.set_target_volume(-MAX_LOTS)
            return

        print(
            f"状态 | dir={self.direction} | close={fmt(close_now)} | MA={fmt(ma_now)} | "
            f"HHV={fmt(highest_n)} | LLV={fmt(lowest_n)} | "
            f"upper_ratio={fmt(upper_ratio)} | lower_ratio={fmt(lower_ratio)} | "
            f"bull_body_ratio={fmt(bull_body_ratio)} | bear_body_ratio={fmt(bear_body_ratio)}"
        )

    def run(self):
        print("多空力量博弈交易策略启动...")
        while True:
            self.api.wait_update()

            if not self.ensure_trade_objects():
                continue

            self.sync_position_state()

            if not self.api.is_changing(self.klines.iloc[-1], "datetime"):
                continue

            current_last_dt = self.klines.iloc[-1]["datetime"]
            if self.last_bar_dt is None:
                self.last_bar_dt = current_last_dt
                continue

            if current_last_dt != self.last_bar_dt:
                self.last_bar_dt = current_last_dt
                print("-" * 120)
                print(f"新K线到来 | 数据合约={self.signal_symbol} | 交易合约={self.trade_symbol}")
                self.on_bar()


def main():
    sim = TqSim(INIT_BALANCE)
    api = None
    try:
        api = TqApi(
            account=sim,
            backtest=TqBacktest(start_dt=START_DT, end_dt=END_DT),
            auth=TqAuth(AUTH_USER, AUTH_PASSWORD),
            web_gui=WEB_GUI,
        )

        strategy = LongShortPowerGameStrategy(api, SYMBOL)
        strategy.run()

    except BacktestFinished:
        print("=" * 120)
        print("回测结束")
        print("trade_log:")
        print(sim.trade_log)
        print("tqsdk_stat:")
        print(sim.tqsdk_stat)

    finally:
        if api is not None:
            try:
                api.close()
            except Exception:
                pass


if __name__ == "__main__":
    main()