著名的三重滤网交易系统:基于TqSdk的多空力量博弈策略源码解析

穿透价格的表象:基于TqSdk的多空力量博弈策略源码解析

在量化交易的浩瀚星海中,交易者们常常面临一个经典的路线抉择:是做宏观的"趋势跟随",还是做微观的"形态捕捉"?

今天我们将深度拆解一份基于 Python 和 TqSdk 框架编写的"多空力量博弈交易策略"。这套策略交出了一份将两者巧妙融合的答卷------它通过均线看大势、通过通道抓突破,最后再用放大镜解剖单根K线的"多空力量",构建了一套严密的立体交易系统。

以下,我们将结合核心源码,为您层层剥开这套策略的逻辑内核。


一、 顺势而为:大级别的趋势与动能过滤

一笔高胜算的交易,首先必须处于阻力最小的方向上。该策略在入场前,设置了两道宏观与中观的"防线",以过滤掉无序的震荡行情。

1. 均线定大势

策略调用了 TqSdkMA 算法,计算出 20 周期简单移动平均线。这根均线就是系统眼中的"牛熊分水岭"。价格在均线之上只做多,在均线之下只做空。

2. 极值确动能

光有趋势不够,还要有爆发的动能。策略计算了过去 20 根 K 线的最高价(HHV)和最低价(LLV),只有当最新价格突破这个局部极值时,才认为动能真正显现。

核心源码:

Python

# 获取当前 20周期均线
ma_df = MA(self.klines, TREND_MA_N)
ma_now = safe_float(ma_df["ma"].iloc[-2], None)

# 获取过去 N 根 K 线的最高价与最低价(唐奇安通道)
highest_n = rolling_max(highs, idx, BREAKOUT_N, include_current=False)
lowest_n = rolling_min(lows, idx, BREAKOUT_N, include_current=False)


二、 见微知著:K线内部的多空力量博弈

如果说均线和通道是"望远镜",那么这一步就是策略的"显微镜"。在突破发生的当天,策略会将当根 K 线的总振幅(最高价 - 最低价)视为 100% 的战场,并精细量化出四股力量的占比。

  • 下影线力量 (lower_ratio): 代表下方支撑力,多方的防守底线。

  • 阳线实体力量 (bull_body_ratio): 代表绝对的上涨空间,多方的主动进攻。

  • 上影线力量 (upper_ratio): 代表上方抛压,空方的防守底线。

  • 阴线实体力量 (bear_body_ratio): 代表绝对的下跌空间,空方的主动进攻。

核心源码:

Python

full_range = high_now - low_now # 定义单根K线总振幅

# 计算各部分绝对点数
upper_shadow = high_now - max(open_now, close_now)
lower_shadow = min(open_now, close_now) - low_now
bull_body = max(close_now - open_now, 0.0)
bear_body = max(open_now - close_now, 0.0)

# 转化为占总战场的百分比
upper_ratio = upper_shadow / full_range
lower_ratio = lower_shadow / full_range
bull_body_ratio = bull_body / full_range
bear_body_ratio = bear_body / full_range


三、 信号共振:严苛的三重入场条件

当宏观环境与微观形态发生共振时,策略才会真正扣动扳机。

以做多为例,系统要求市场必须同时满足:站在均线之上 + 突破局部新高 + K线呈现强大的下方支撑与多头实体推进。只有买卖力量在微观层面呈现一边倒的态势,入场信号才被确认。

核心源码:

Python

# 多头开仓信号共振
long_signal = (
    lower_ratio > 0.5 and        # 微观:下影线支撑强劲
    bull_body_ratio > 0.5 and    # 微观:阳线实体拉升坚决
    close_now > ma_now and       # 宏观:多头趋势
    high_now >= highest_n        # 中观:突破局部前高
)

# 空头开仓信号共振
short_signal = (
    upper_ratio > 0.5 and        # 微观:上影线抛压沉重
    bear_body_ratio > 0.5 and    # 微观:阴线实体杀跌坚决
    close_now < ma_now and       # 宏观:空头趋势
    low_now <= lowest_n          # 中观:跌破局部前低
)


四、 进退有度:双重风控与动态止盈

"截断亏损,让利润奔跑"是这套策略退出机制的核心理念。系统设计了一静一动两套风险管理网络:

1. 刚性底线(固定止损)

进场瞬间,系统会根据开仓价设置反向 3% 的死止损。这道防线不随行情波动而改变,一旦触及,说明入场逻辑完全破裂,系统将无条件清仓。

2. 利润追踪(回撤止盈)

为了不被微小的震荡洗盘,同时保住胜利果实,系统会记录持仓期间到达的"最佳浮盈价"(best_price)。一旦价格从最高点回吐了最大利润的 30%,策略判断趋势可能发生反转,立刻落袋为安。

核心源码:

Python

# 以多头持仓的风控为例
self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT) # 3% 刚性止损线
self.best_price = max(self.best_price, close_now)         # 动态刷新最高价

if self.best_price > self.entry_price:
# 计算当前最大浮盈
floating_profit = self.best_price - self.entry_price
# 设置追踪止盈线:最高价回落利润的 30%
trail_line = self.best_price - floating_profit * TRAIL_PROFITBACK_PCT

if close_now <= trail_line:
    print("多头盈利回撤止盈")
    self.target_pos.set_target_volume(0) # 清仓离场

五、 结语

"多空力量博弈交易策略"向我们展示了一个极具层次感的量化思考框架。它不迷信单一的指标,而是将大环境(均线)爆发点(突破)、买卖情绪(K线切片)底线思维(风控)熔于一炉。

对于量化开发者而言,这份源码不仅提供了一个可扩展的交易骨架,更传递了一种敬畏市场、步步为营的交易哲学。

六、 源码


from datetime import date
from math import isnan

from tqsdk import TqApi, TqAuth, TqSim, TqBacktest, BacktestFinished
from tqsdk.lib import TargetPosTask
from tqsdk.ta import MA


# ============================================================
# 多空力量博弈交易策略(TqSdk 版)
#
# 官方接口依据:
# - TqBacktest 用于回测。来源:TqSdk 官方回测文档
# - 主连回测建议通过 quote.underlying_symbol 切换到实际主力合约交易。来源:官方回测文档
# - MA 来自 tqsdk.ta。来源:tqsdk.ta 技术指标文档
# - TargetPosTask 用于目标持仓管理。来源:官方索引/用法文档
#
# 策略逻辑:
# 1) 趋势判断:
#    close > MA(N) -> 趋势偏多
#    close < MA(N) -> 趋势偏空
#
# 2) K线多空力量:
#    上影线力量 = (high - max(open, close)) / (high - low)
#    下影线力量 = (min(open, close) - low) / (high - low)
#    阳线实体力量 = max(close - open, 0) / (high - low)
#    阴线实体力量 = max(open - close, 0) / (high - low)
#
# 3) 入场:
#    做多:
#      下影线力量 > 0.5
#      阳线实体力量 > 0.5
#      close > MA
#      high >= 过去N根K线最高价
#
#    做空:
#      上影线力量 > 0.5
#      阴线实体力量 > 0.5
#      close < MA
#      low <= 过去N根K线最低价
#
# 4) 风控:
#    - 亏损固定百分比止损
#    - 盈利后从最佳浮盈价回撤一定比例止盈
# ============================================================
# ----------------------------
AUTH_USER = ""   #快期账号
AUTH_PASSWORD = "" #快期密码

SYMBOL = "KQ.m@SHFE.rb"
START_DT = date(2021, 1, 1)
END_DT = date(2025, 1, 1)
INIT_BALANCE = 1_000_000
WEB_GUI = True

KLINE_SEC = 24 * 60 * 60
DATA_LENGTH = 320

TREND_MA_N = 20
BREAKOUT_N = 20

FIXED_STOP_PCT = 0.03
TRAIL_PROFITBACK_PCT = 0.30

MAX_LOTS = 5
PRICE_MODE = "ACTIVE"
MIN_READY_BARS = 60


def safe_float(v, default=0.0):
    try:
        x = float(v)
        if isnan(x):
            return default
        return x
    except Exception:
        return default


def fmt(v, n=4):
    try:
        return f"{float(v):.{n}f}"
    except Exception:
        return str(v)


def net_pos(position):
    return int(getattr(position, "pos_long", 0)) - int(getattr(position, "pos_short", 0))


def rolling_max(values, idx, n, include_current=False):
    end = idx + 1 if include_current else idx
    start = end - n
    if start < 0:
        return None
    vals = values[start:end]
    if any(v is None for v in vals):
        return None
    return max(vals)


def rolling_min(values, idx, n, include_current=False):
    end = idx + 1 if include_current else idx
    start = end - n
    if start < 0:
        return None
    vals = values[start:end]
    if any(v is None for v in vals):
        return None
    return min(vals)


class LongShortPowerGameStrategy:
    def __init__(self, api, signal_symbol):
        self.api = api
        self.signal_symbol = signal_symbol

        self.account = api.get_account()
        self.signal_quote = api.get_quote(signal_symbol)
        self.klines = api.get_kline_serial(signal_symbol, KLINE_SEC, data_length=DATA_LENGTH)

        self.trade_symbol = None
        self.trade_quote = None
        self.position = None
        self.target_pos = None

        self.last_bar_dt = None
        self.pending_trade_symbol = None
        self.last_switch_notice = None

        self.direction = 0
        self.entry_price = None
        self.best_price = None
        self.stop_price = None

    def resolve_trade_symbol(self):
        if self.signal_symbol.startswith("KQ.m@"):
            underlying = getattr(self.signal_quote, "underlying_symbol", None)
            if underlying:
                return underlying
            return None
        return self.signal_symbol

    def ensure_trade_objects(self):
        resolved = self.resolve_trade_symbol()
        if not resolved:
            return False

        if self.trade_symbol is None:
            self.trade_symbol = resolved
            self.trade_quote = self.api.get_quote(self.trade_symbol)
            self.position = self.api.get_position(self.trade_symbol)
            self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)
            print("初始化交易合约:", self.trade_symbol)
            return True

        if resolved != self.trade_symbol:
            current_net = net_pos(self.position)
            if current_net == 0:
                if self.target_pos is not None:
                    self.target_pos.cancel()
                    while not self.target_pos.is_finished():
                        self.api.wait_update()

                old_symbol = self.trade_symbol
                self.trade_symbol = resolved
                self.trade_quote = self.api.get_quote(self.trade_symbol)
                self.position = self.api.get_position(self.trade_symbol)
                self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)

                self.pending_trade_symbol = None
                self.last_switch_notice = None
                print(f"主连映射切换,新的交易合约: {old_symbol} -> {self.trade_symbol}")
            else:
                self.pending_trade_symbol = resolved
                notice_key = (self.trade_symbol, resolved)
                if self.last_switch_notice != notice_key:
                    print(f"检测到主连切换,但当前有持仓,继续用原合约管理直至平仓。当前={self.trade_symbol},待切换={resolved}")
                    self.last_switch_notice = notice_key
        return True

    def sync_position_state(self):
        current_net = net_pos(self.position)

        if current_net > 0:
            if self.direction <= 0:
                self.direction = 1
                self.entry_price = safe_float(getattr(self.trade_quote, "last_price", 0.0), 0.0)
                self.best_price = self.entry_price
                self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT)
                print(f"建仓完成 | LONG | entry={fmt(self.entry_price)} | stop={fmt(self.stop_price)}")
        elif current_net < 0:
            if self.direction >= 0:
                self.direction = -1
                self.entry_price = safe_float(getattr(self.trade_quote, "last_price", 0.0), 0.0)
                self.best_price = self.entry_price
                self.stop_price = self.entry_price * (1 + FIXED_STOP_PCT)
                print(f"建仓完成 | SHORT | entry={fmt(self.entry_price)} | stop={fmt(self.stop_price)}")
        else:
            if self.direction != 0:
                print("持仓归零,状态重置")
            self.direction = 0
            self.entry_price = None
            self.best_price = None
            self.stop_price = None

        if current_net == 0 and self.pending_trade_symbol and self.pending_trade_symbol != self.trade_symbol:
            if self.target_pos is not None:
                self.target_pos.cancel()
                while not self.target_pos.is_finished():
                    self.api.wait_update()

            old_symbol = self.trade_symbol
            self.trade_symbol = self.pending_trade_symbol
            self.trade_quote = self.api.get_quote(self.trade_symbol)
            self.position = self.api.get_position(self.trade_symbol)
            self.target_pos = TargetPosTask(self.api, self.trade_symbol, price=PRICE_MODE)

            print(f"原合约已平仓,执行切换: {old_symbol} -> {self.trade_symbol}")
            self.pending_trade_symbol = None
            self.last_switch_notice = None

    def update_risk(self, close_now):
        if self.direction == 0 or self.entry_price is None:
            return False

        if self.direction > 0:
            self.stop_price = self.entry_price * (1 - FIXED_STOP_PCT)
            self.best_price = max(self.best_price, close_now)

            if self.best_price > self.entry_price:
                floating_profit = self.best_price - self.entry_price
                trail_line = self.best_price - floating_profit * TRAIL_PROFITBACK_PCT
                if close_now <= trail_line:
                    print(f"多头盈利回撤止盈 | close={fmt(close_now)} <= trail_line={fmt(trail_line)}")
                    self.target_pos.set_target_volume(0)
                    return True

            if close_now <= self.stop_price:
                print(f"多头固定止损 | close={fmt(close_now)} <= stop={fmt(self.stop_price)}")
                self.target_pos.set_target_volume(0)
                return True

        elif self.direction < 0:
            self.stop_price = self.entry_price * (1 + FIXED_STOP_PCT)
            self.best_price = min(self.best_price, close_now)

            if self.best_price < self.entry_price:
                floating_profit = self.entry_price - self.best_price
                trail_line = self.best_price + floating_profit * TRAIL_PROFITBACK_PCT
                if close_now >= trail_line:
                    print(f"空头盈利回撤止盈 | close={fmt(close_now)} >= trail_line={fmt(trail_line)}")
                    self.target_pos.set_target_volume(0)
                    return True

            if close_now >= self.stop_price:
                print(f"空头固定止损 | close={fmt(close_now)} >= stop={fmt(self.stop_price)}")
                self.target_pos.set_target_volume(0)
                return True

        return False

    def on_bar(self):
        idx = len(self.klines) - 2
        if idx < MIN_READY_BARS:
            return

        opens = [safe_float(self.klines["open"].iloc[i], None) for i in range(len(self.klines))]
        highs = [safe_float(self.klines["high"].iloc[i], None) for i in range(len(self.klines))]
        lows = [safe_float(self.klines["low"].iloc[i], None) for i in range(len(self.klines))]
        closes = [safe_float(self.klines["close"].iloc[i], None) for i in range(len(self.klines))]

        open_now = opens[idx]
        high_now = highs[idx]
        low_now = lows[idx]
        close_now = closes[idx]

        if None in (open_now, high_now, low_now, close_now):
            return

        ma_df = MA(self.klines, TREND_MA_N)
        ma_now = safe_float(ma_df["ma"].iloc[-2], None)
        if ma_now is None:
            return

        highest_n = rolling_max(highs, idx, BREAKOUT_N, include_current=False)
        lowest_n = rolling_min(lows, idx, BREAKOUT_N, include_current=False)
        if None in (highest_n, lowest_n):
            return

        full_range = high_now - low_now
        if full_range <= 0:
            return

        upper_shadow = high_now - max(open_now, close_now)
        lower_shadow = min(open_now, close_now) - low_now
        bull_body = max(close_now - open_now, 0.0)
        bear_body = max(open_now - close_now, 0.0)

        upper_ratio = upper_shadow / full_range
        lower_ratio = lower_shadow / full_range
        bull_body_ratio = bull_body / full_range
        bear_body_ratio = bear_body / full_range

        long_signal = (
            lower_ratio > 0.5 and
            bull_body_ratio > 0.5 and
            close_now > ma_now and
            high_now >= highest_n
        )

        short_signal = (
            upper_ratio > 0.5 and
            bear_body_ratio > 0.5 and
            close_now < ma_now and
            low_now <= lowest_n
        )

        if self.direction != 0:
            if self.update_risk(close_now):
                return

        if self.direction == 0:
            if long_signal:
                print(
                    f"开多 | close={fmt(close_now)} > MA={fmt(ma_now)} | "
                    f"high={fmt(high_now)} >= HHV={fmt(highest_n)} | "
                    f"lower_ratio={fmt(lower_ratio)} | bull_body_ratio={fmt(bull_body_ratio)}"
                )
                self.target_pos.set_target_volume(MAX_LOTS)
                return

            if short_signal:
                print(
                    f"开空 | close={fmt(close_now)} < MA={fmt(ma_now)} | "
                    f"low={fmt(low_now)} <= LLV={fmt(lowest_n)} | "
                    f"upper_ratio={fmt(upper_ratio)} | bear_body_ratio={fmt(bear_body_ratio)}"
                )
                self.target_pos.set_target_volume(-MAX_LOTS)
                return

        elif self.direction > 0 and short_signal:
            print("多头反手开空")
            self.target_pos.set_target_volume(-MAX_LOTS)
            return

        elif self.direction < 0 and long_signal:
            print("空头反手开多")
            self.target_pos.set_target_volume(MAX_LOTS)
            return

        print(
            f"状态 | dir={self.direction} | close={fmt(close_now)} | MA={fmt(ma_now)} | "
            f"HHV={fmt(highest_n)} | LLV={fmt(lowest_n)} | "
            f"upper_ratio={fmt(upper_ratio)} | lower_ratio={fmt(lower_ratio)} | "
            f"bull_body_ratio={fmt(bull_body_ratio)} | bear_body_ratio={fmt(bear_body_ratio)} | "
            f"best_price={fmt(self.best_price)} | stop={fmt(self.stop_price)}"
        )

    def run(self):
        print("多空力量博弈交易策略启动...")
        while True:
            self.api.wait_update()

            if not self.ensure_trade_objects():
                continue

            self.sync_position_state()

            if not self.api.is_changing(self.klines.iloc[-1], "datetime"):
                continue

            current_last_dt = self.klines.iloc[-1]["datetime"]
            if self.last_bar_dt is None:
                self.last_bar_dt = current_last_dt
                continue

            if current_last_dt != self.last_bar_dt:
                self.last_bar_dt = current_last_dt
                print("-" * 120)
                print(f"新K线到来 | 数据合约={self.signal_symbol} | 交易合约={self.trade_symbol}")
                self.on_bar()


def main():
    sim = TqSim(INIT_BALANCE)
    api = None
    try:
        api = TqApi(
            account=sim,
            backtest=TqBacktest(start_dt=START_DT, end_dt=END_DT),
            auth=TqAuth(AUTH_USER, AUTH_PASSWORD),
            web_gui=WEB_GUI,
        )

        strategy = LongShortPowerGameStrategy(api, SYMBOL)
        strategy.run()

    except BacktestFinished:
        print("=" * 120)
        print("回测结束")
        print("trade_log:")
        print(sim.trade_log)
        print("tqsdk_stat:")
        print(sim.tqsdk_stat)

    finally:
        if api is not None:
            try:
                api.close()
            except Exception:
                pass


if __name__ == "__main__":
    main()

2 个赞