构建自己的投资策略:版本1(定稿时间 2025-12-13)

2025/12/13 投资 共 49608 字,约 142 分钟

引子

这篇文章是我的当前版本1的投资策略的复盘,从盲人摸象到两条可以长期执行的策略(中证1000+黄金),以及这过程中踩到的坑。

想看最终选定的投资策略,可以直接拉到文章末尾。

前言

25年10月以后,开通了chatgpt的会员,开始大量的使用ai。机缘巧合下,在使用ai的过程中开始了调整投资策略。

在用ai前,完全是盲人摸象般做投资,包括之前选基金,虽然用了一些筛选策略,但始终是无法大仓位信任自己构建的投资组合。

有了ai后,开始通过ai调整自己的投资决策。

品种1:中证一千,从滚贴水到合成多头

开始吃贴水

年初5月份的时候开始滚IM,吃贴水。滚到了9月,收益结构分化很大:

  • 累计指数上涨带来收益1360点

  • 贴水贡献272点

意味着该阶段的主要收益并非来自贴水本身,而是来自趋势。

这几个月的贴水有收敛趋势,每个月能吃到的贴水变少了。

使用看涨期权替代期货

基于“上涨仍参与、下跌有边界”的想法,9月中旬以后开始购买看涨期权。可以吃到指数上涨带来的利润,下跌时又有保底。

没过几天,10月份时遇到了问题:指数进入横盘阶段后,期权仓位会持续承受时间价值损耗。

为了降低时间价值损耗,在 AI 建议下尝试使用牛市价差(Bull Call Spread)。牛市价差更能抵抗时间价值的损耗。

随之又出现了新的难点:上涨与下跌时的滚动规则缺失。

牛市价差实操难点:决策树

在牛市价差结构下,主要有两类问题:

  • 指数上涨:应当整体上移价差,还是仅调整短腿(卖出端)?
  • 指数下跌:是否应将看涨期权向下滚动以提高 delta,从而增强反弹参与度?

这期间的试错得到了一个通用的经验:

  • 指数下跌时,将看涨期权向下滚动会增加反弹收益,但也会显著放大回撤与波动。这更像“提高参与度”的奖励,而非无成本的优化。

构建策略与回测

2025年11月初,开始打算使用历史数据对策略进行回测,避免主观决策带来的决策失误。回测区间是2022-08 ~ 2025-10。

对比结果:

  • 策略1:只持有近月期权合成期货,到期移仓

    • 总收益点数:2013
    • 最大回撤点数:2740
  • 策略11:合成多头 + 保护性 Put + 条件调整

    • 总收益点数:3859
    • 最大回撤点数:1708

最终选择策略 11 作为版本 1 的中证 1000 方案。

中证一千策略版本 1:合成多头 + 保护性 Put

策略的目标不是追求单独的极致的收益,而是实现更稳定的风险收益结构:在贴水中获取收益,同时通过保护性 Put 减少回撤。并且成为时间的朋友,不怕指数横盘。

1.持仓结构

1.使用看涨期权和看跌期权合成当月的期货多头

  • 行权价选择离指数价格低的最近一档

2.持有当月的保护性看跌期权

  • 选择delta最接近-0.15这档

2.移仓/调仓策略

1.当月合约到期前3天,整体移仓到次月
2.指数价格每波动5%,对保护性期权进行调仓,重新调整到delta=-0.15这档
3.如果指数下跌,导致看涨期权的delta < 0.20,调整看涨期权到delta=0.6这档

实操示例(以 2025-12-12 收盘为例)

基本信息

  • 中证一千现货价格 7370
  • IM2601报价 7286
  • 到期日20260116
  • 剩余交易日24

下面是T型报价

call deltacall价格行权价put价格put delta
0.88616670024.4-0.10
0.86519680035-0.13
0.81435690046.4-0.18
0.76350700066-0.24
0.68276710089.8-0.31
0.592157200125-0.4
0.491557300166-0.49

开仓

假设之前没开仓,现在进行开仓

  1. 合成多头选择现货下方的最近一档
  • 买入一手MO-2601-C-7300
  • 卖出一手MO-2601-P-7300
  1. 保护put 选择delta=-0.15
  • 买入 一手MO-2601-P-6800

调仓

若建仓指数价 = 7370:

  • 5% = 7370 × 0.05 = 368.5

  • 上阈值 ≈ 7370 + 368.5 = 7739

  • 下阈值 ≈ 7370 - 368.5 = 7001

因此:

  • 指数 > 7739 或 < 7001:保护性 Put 调整回 delta≈-0.15

  • 若下跌导致 Call delta < 0.20:将 Call 滚动至 delta≈0.6

品种2:黄金,从看涨期权到备兑策略

开始入场

国庆节后,黄金有了一波疯涨的行情,让我动了心思将黄金纳入策略体系。此前虽长期关注黄金及相关配置框架(例如永久投资组合中对黄金的配置),但一直没实际行动。

最开始的想法很简单,就是直接买看涨期权,等黄金涨。

但随着价格的上涨,策略遇到了问题,应该加仓,还是止盈,如何锁定收益

复杂化的失败

一开始对手上的一手黄金做了牛市价差。

随着黄金的冲高,遇到了跟上面中证一千期权一样的问题。当时的选择是把牛市价差的短腿(卖出端)买回。随着后面黄金大幅回调,触发了盈利的大幅回撤。

10月份在实验各种策略,最后搞成了相当复杂的铁鹰策略:

买入P856,卖出P880,买入P920,买入C940,卖出C976

意识到复杂结构往往并不必然带来更高的收益风险比,反而增加执行负担与错误概率。

开始寻找简单的策略

回测与选择:备兑策略

策略的目标是降低回撤,并收益率接近期货,并执行简单。

使用22年1月到25年10月的数据进行测算,对比“单纯持有期货”与若干期权增强策略。最终选择 S7_pct_covered_call_30‰(第七个验证策略)。

策略: S1_fut_only 只持有一手期货

  • 年化收益率: 28.85%
  • 年化波动率: 12.01%
  • 最大回撤: -10.31%

策略: S7_pct_covered_call_30‰ 备兑

  • 年化收益率: 25.26%
  • 年化波动率: 6.39%
  • 最大回撤: -5.68%

策略优点在于执行简单,最大回撤显著降低,并且不怕横盘。

黄金策略版本1

开仓

1.持有一手主力合约期货
2.卖出高于开仓价3%的看涨期权(备兑)

移仓/调仓

期权到期前3-5天移仓到下个主力合约

总结

再次感谢ai的发展,不然光上面提到的回测代码,至少要花费我一周完整的时间进行开发与测试。我业务时间本来就不多,需要的时间太长会导致中途放弃,最后回到了老路。

版本1的策略都是时间的朋友,完全不怕横盘。

版本1的最终策略清单

中证1000 MO/IM

开仓结构:

1.使用看涨期权和看跌期权合成当月的期货多头,行权价选择离指数价格低的最近一档
2.额外持有当月的保护性看跌期权,选择delta最接近-0.15这档

移仓/调仓策略:

1.当月合约到期前3天,整体移仓到次月
2.指数价格每波动5%,对保护性期权进行调仓,重新调整到delta=-0.15这档
3.如果指数下跌,导致看涨期权的delta < 0.20,调整看涨期权到delta=0.6这档

黄金

开仓结构:

1.持有一手主力合约期货
2.卖出高于开仓价3%的看涨期权(备兑)

移仓/调仓策略:

期权到期前3-5天移仓到下个主力合约

回测代码

中证一千

代码

# -*- coding: utf-8 -*-
"""
中证1000(IM/MO)策略回测:把收益 / 风险拆到每个细项

相对你原来的代码,这个版本新增:
1) 合成腿(+Call -Put) 拆分:
   - Call 多头 pnl
   - Put 空头 pnl
   - 合成腿总 pnl = Call pnl + Put空头 pnl
2) Strategy 11 拆分:
   - 合成腿总 pnl
   - 保护性 Put pnl
   - Strategy11 总 pnl = 合成腿总 pnl + 保护Put pnl
3) 输出“规则带来的增量效果”:
   - 看涨期权 delta 触发滚动: SynthRoll - SynthFixed
   - 保护Put的增量: Strategy11 - SynthRoll(等价于保护Put pnl)
4) 输出每个细项的绩效指标(收益/波动/夏普/最大回撤等)
5) 保存每日明细到 CSV,方便画图与进一步归因
6) 输出换档/再平衡日志 CSV,方便核对触发次数与换档细节

注意:
- 仍沿用你原来的 close-to-close 计价方式;
- 合约切换发生在当日收盘后,下一交易日开始用新合约计算 pnl;
- 不显式计入手续费、滑点、保证金占用与利息。
"""

import pandas as pd
import numpy as np
import datetime as dt
import math
from typing import Optional, Dict


# ============================================================
# 配置区:文件路径 & 策略参数
# ============================================================

FUTURES_CSV_PATH = "merged_cleaned_only_im_mo.csv"   # IM + MO 在同一个文件
OPTIONS_CSV_PATH = "merged_cleaned_only_im_mo.csv"
INDEX_CSV_PATH = "中证1000_PE-TTM_市值加权_10年_20251123_164337.csv"

TRADING_DAYS_PER_YEAR = 252

# --- 持有期货:前月到期前 DTE<=3(按日历天)滚动到次月 ---
FUT_ROLL_DTE = 3

# --- Strategy 11 参数 ---
S11_CALL_DELTA_TRIGGER = 0.20   # Call delta < 0.20 触发 roll-down
S11_CALL_TARGET_DELTA  = 0.60   # roll-down 后选 delta 最接近 0.60 的 call(只允许 strike <= 当前strike)
S11_PUT_DELTA_TARGET   = -0.15  # 保护 Put 目标 delta
S11_PUT_REBAL_MOVE     = 0.05   # 指数相对上次重置价 |S/S_ref-1| >= 5% 才重选 put

# 是否保存每日明细(强烈建议打开,方便画图归因)
SAVE_DAILY_DETAIL = True
DAILY_DETAIL_PATH = "backtest_detail_strategy11_breakdown.csv"

# 是否保存“交易/换档”日志(方便验证触发次数)
SAVE_TRADE_LOG = True
TRADE_LOG_PATH = "trade_log_strategy11.csv"

# 是否输出最基础的三条策略净值(兼容你原先的输出)
SAVE_EQUITY_CURVE = True
EQUITY_CURVE_PATH = "equity_curve_3strategies.csv"


# ============================================================
# 通用工具
# ============================================================

def read_csv_auto(path: str) -> pd.DataFrame:
    """自动尝试常见编码读 CSV(utf-8-sig / utf-8 / gbk)。"""
    for enc in ("utf-8-sig", "utf-8", "gbk"):
        try:
            return pd.read_csv(path, encoding=enc)
        except UnicodeDecodeError:
            continue
    return pd.read_csv(path)  # 最后兜底


def third_friday(year: int, month: int) -> dt.date:
    """合约到期日近似:合约月份第三个星期五(未处理法定假日顺延)。"""
    d = dt.date(year, month, 15)
    days_to_friday = (4 - d.weekday()) % 7  # Friday=4
    return d + dt.timedelta(days=days_to_friday)


def get_contract_expiry_from_code(code: str) -> dt.date:
    """
    从 IM 合约代码推断到期日,例如 IM2208 -> 2022年8月第三个星期五。
    """
    i = 0
    while i < len(code) and not code[i].isdigit():
        i += 1
    digits = code[i:]
    if len(digits) < 4:
        raise ValueError(f"Unexpected contract code: {code}")
    year = 2000 + int(digits[:2])
    month = int(digits[2:4])
    return third_friday(year, month)


def parse_eq_num(x):
    """指数CSV里可能是 '=7067.7000' 形式,去 '=' 转 float。"""
    if isinstance(x, str):
        s = x.strip()
        if s.startswith("="):
            s = s[1:]
        if s == "":
            return np.nan
        try:
            return float(s)
        except ValueError:
            return np.nan
    return x


def extract_month_from_im_code(code: str) -> Optional[str]:
    """IM2208 -> '2208'"""
    i = 0
    while i < len(code) and not code[i].isdigit():
        i += 1
    digits = ''.join(ch for ch in code[i:] if ch.isdigit())
    return digits[:4] if len(digits) >= 4 else None


def pick_synth_initial_strike_by_spot(
    calls: pd.DataFrame,
    puts: pd.DataFrame,
    spot: float,
    offset_steps: int = 0,
) -> Optional[float]:
    """
    选择“合成多头(+Call -Put)”开仓时的初始行权价 K0:
      1) strikes 必须同时存在 call + put
      2) base=<=spot 的最大 strike(若没有 <=spot,则取最小 strike)
      3) offset_steps:0=base;-1 更ITM一档;+1 更OTM一档(越界截断)
    """
    if calls is None or puts is None or calls.empty or puts.empty:
        return None
    if spot is None or (isinstance(spot, float) and math.isnan(spot)):
        return None

    strikes = sorted(set(calls["strike"]).intersection(set(puts["strike"])))
    if not strikes:
        return None
    strikes = np.array(strikes, dtype=float)
    strikes.sort()

    below = strikes[strikes <= float(spot)]
    base = float(below.max()) if below.size > 0 else float(strikes.min())

    base_idx = int(np.where(strikes == base)[0][0])
    new_idx = max(0, min(base_idx + int(offset_steps), len(strikes) - 1))
    return float(strikes[new_idx])


# ============================================================
# 数据加载
# ============================================================

def load_index_data(path: str) -> pd.Series:
    """读取指数日线,返回 Series(index=交易日期, values=index_close)。"""
    df = read_csv_auto(path)
    df["日期"] = pd.to_datetime(df["日期"])
    df["index_close"] = df["收盘点位"].apply(parse_eq_num)
    df = df[["日期", "index_close"]].rename(columns={"日期": "交易日期"}).sort_values("交易日期")
    return df.set_index("交易日期")["index_close"]


def load_futures_data(path: str, index_series: pd.Series) -> pd.DataFrame:
    """读取 IM 期货,计算 expiry / DTE,并合并指数收盘。"""
    df = read_csv_auto(path)
    df["交易日期"] = pd.to_datetime(df["交易日期"])
    df = df[df["合约代码"].astype(str).str.startswith("IM")].copy()
    df = df.drop_duplicates(subset=["交易日期", "合约代码"]).sort_values(["交易日期", "合约代码"])

    df["expiry"] = pd.to_datetime(df["合约代码"].apply(get_contract_expiry_from_code))
    df["DTE"] = (df["expiry"] - df["交易日期"]).dt.days  # 日历天

    df["今收盘"] = pd.to_numeric(df["今收盘"], errors="coerce")

    # 合并指数收盘
    df = df.merge(index_series.rename("index_close"), left_on="交易日期", right_index=True, how="left")
    return df


def load_options_data(path: str) -> pd.DataFrame:
    """
    读取 MO 期权:
      - 解析 opt_month='yyMM', opt_type='C'/'P', strike=float
      - delta 若存在则转为 float
    """
    df = read_csv_auto(path)
    df["交易日期"] = pd.to_datetime(df["交易日期"])
    df = df[df["合约代码"].astype(str).str.startswith("MO")].copy()
    df = df.drop_duplicates(subset=["交易日期", "合约代码"]).sort_values(["交易日期", "合约代码"])

    parts = df["合约代码"].astype(str).str.split("-", expand=True)
    # MO2208-C-6200
    df["opt_month"] = parts[0].str[2:6]
    df["opt_type"] = parts[1].astype(str).str.upper()
    df["strike"] = pd.to_numeric(parts[2], errors="coerce").astype(float)

    df["今收盘"] = pd.to_numeric(df["今收盘"], errors="coerce")
    if "delta" in df.columns:
        df["delta"] = pd.to_numeric(df["delta"], errors="coerce")

    def ym_to_expiry(ym: str):
        y2 = int(ym[:2]); m = int(ym[2:4])
        return third_friday(2000 + y2, m)

    df["expiry"] = pd.to_datetime(df["opt_month"].apply(ym_to_expiry))
    df["DTE"] = (df["expiry"] - df["交易日期"]).dt.days
    return df


def make_futures_pivot(fut_df: pd.DataFrame) -> pd.DataFrame:
    """期货宽表:index=交易日期, columns=合约代码, values=今收盘"""
    return (fut_df[["交易日期", "合约代码", "今收盘"]]
            .drop_duplicates()
            .pivot(index="交易日期", columns="合约代码", values="今收盘")
            .sort_index())


def make_options_pivot(opt_df: pd.DataFrame) -> pd.DataFrame:
    """期权宽表:index=交易日期, columns=合约代码, values=今收盘"""
    return (opt_df[["交易日期", "合约代码", "今收盘"]]
            .drop_duplicates()
            .pivot(index="交易日期", columns="合约代码", values="今收盘")
            .sort_index())


# ============================================================
# 构造“持有期货”仓位(近月+滚动)
# ============================================================

def build_near_df(fut_df: pd.DataFrame) -> pd.DataFrame:
    """每交易日找 near1/near2(按 DTE 升序)。"""
    df = fut_df[fut_df["DTE"] >= 0].copy()
    df = df.sort_values(["交易日期", "DTE"])
    df["cc"] = df.groupby("交易日期").cumcount()

    near1 = df[df["cc"] == 0][["交易日期", "合约代码", "DTE"]].rename(
        columns={"合约代码": "near1", "DTE": "DTE1"}
    )
    near2 = df[df["cc"] == 1][["交易日期", "合约代码", "DTE"]].rename(
        columns={"合约代码": "near2", "DTE": "DTE2"}
    )
    return near1.merge(near2, on="交易日期", how="left")


def build_front_month_positions(near_df: pd.DataFrame, roll_dte_threshold: int = 3) -> pd.DataFrame:
    """
    持有期货:一直持 near1,当 DTE1<=阈值 且 near2 存在时,切换 near2。
    """
    df = near_df.sort_values("交易日期").copy()

    def choose(row):
        if (row["DTE1"] <= roll_dte_threshold) and pd.notna(row["near2"]):
            return row["near2"]
        return row["near1"]

    df["contract"] = df.apply(choose, axis=1)
    return df[["交易日期", "contract"]].set_index("交易日期")


def compute_futures_pnl(fut_pivot: pd.DataFrame, pos: pd.DataFrame) -> pd.Series:
    """期货单边多头:用上一日合约计算 d-1->d 的价格变动。"""
    dates = pos.index.intersection(fut_pivot.index).sort_values()
    pos = pos.reindex(dates)
    fut_pivot = fut_pivot.reindex(dates)

    pnl = np.zeros(len(dates))
    contracts = pos["contract"].values

    for i in range(1, len(dates)):
        d = dates[i]
        d_prev = dates[i - 1]
        c_prev = contracts[i - 1]
        if not isinstance(c_prev, str) or c_prev not in fut_pivot.columns:
            continue
        F_t = fut_pivot.at[d, c_prev]
        F_prev = fut_pivot.at[d_prev, c_prev]
        if pd.isna(F_t) or pd.isna(F_prev):
            continue
        pnl[i] = F_t - F_prev

    return pd.Series(pnl, index=dates, name="pnl_futures")


# ============================================================
# 合成腿:(+Call -Put) + Call delta 触发 roll-down(拆分为各腿 pnl)
# ============================================================

def compute_synth_legs_with_call_roll_by_delta(
    options_df: pd.DataFrame,
    options_pivot: pd.DataFrame,
    fut_pos: pd.DataFrame,
    index_series: pd.Series,
    delta_trigger: float,
    target_delta: float,
    init_k_offset_steps: int = 0,
) -> pd.DataFrame:
    """
    合成腿:+Call(K) - Put(K0)
      - 初始K0:入场日 K0=<=spot 最近一档(可 offset_steps 偏移)
      - Put 固定为初始K0
      - Call:若当日收盘时 call_delta < delta_trigger,则只允许 strike<=当前strike 的范围内,
              选 delta 最接近 target_delta 的 call(roll-down)
      - 日度 pnl 用“上一日持有合约”计算:close-to-close(与原脚本一致)
      - 输出拆分:
          pnl_call:Call 多头
          pnl_short_put:Put 空头
          pnl_synth = pnl_call + pnl_short_put
    """
    dates = (fut_pos.index
             .intersection(options_pivot.index)
             .intersection(index_series.index)).sort_values()

    fut_pos = fut_pos.reindex(dates)
    S = index_series.reindex(dates)

    cols = set(options_pivot.columns)
    opt = options_df.copy()
    if "delta" in opt.columns:
        opt["delta"] = pd.to_numeric(opt["delta"], errors="coerce")

    opt_group = opt.groupby(["交易日期", "opt_month", "opt_type"])

    n = len(dates)
    pnl_call = np.zeros(n)
    pnl_short_put = np.zeros(n)
    pnl_synth = np.zeros(n)

    call_codes = [None] * n
    put_codes = [None] * n
    call_strikes = [np.nan] * n
    put_strikes = [np.nan] * n
    call_deltas = [np.nan] * n
    put_deltas = [np.nan] * n

    # 记录换档事件(发生在当日收盘后)
    flag_call_roll_delta = [False] * n  # 由于 delta<触发值滚动
    flag_call_roll_month = [False] * n  # 由于换月(期货合约切换)重建K0

    state: Dict[str, dict] = {}  # fut_contract -> {"call_code","call_strike","put_code","put_strike"}

    for i, d in enumerate(dates):
        fut_contract = fut_pos.at[d, "contract"]
        spot = S.loc[d]

        # --- 先决定当天收盘后要持有什么合约(给下一日用) ---
        if isinstance(fut_contract, str) and pd.notna(spot):
            ym = extract_month_from_im_code(fut_contract)
            if ym is not None:
                try:
                    calls = opt_group.get_group((d, ym, "C"))
                    puts  = opt_group.get_group((d, ym, "P"))
                except KeyError:
                    calls, puts = None, None

                if calls is not None and puts is not None and (not calls.empty) and (not puts.empty):
                    key = fut_contract
                    prev_fut = fut_pos.at[dates[i-1], "contract"] if i > 0 else None
                    is_new_contract = (key not in state) or (i > 0 and fut_contract != prev_fut)

                    if is_new_contract:
                        K0 = pick_synth_initial_strike_by_spot(
                            calls, puts, float(spot), offset_steps=init_k_offset_steps
                        )
                        if K0 is not None:
                            call_row = calls[calls["strike"] == K0].iloc[0]
                            put_row  = puts[puts["strike"] == K0].iloc[0]
                            state[key] = {
                                "call_code": call_row["合约代码"],
                                "call_strike": float(K0),
                                "put_code": put_row["合约代码"],
                                "put_strike": float(K0),
                            }
                            flag_call_roll_month[i] = True
                    else:
                        # 同一合约:检查 call delta
                        cur_call = state[key]["call_code"]
                        cur_strike = state[key]["call_strike"]
                        cur_row = calls[calls["合约代码"] == cur_call]
                        cur_delta = np.nan
                        if (not cur_row.empty) and ("delta" in calls.columns):
                            cur_delta = cur_row.iloc[0].get("delta", np.nan)

                        if pd.notna(cur_delta) and float(cur_delta) < float(delta_trigger):
                            candidates = calls[(calls["strike"] <= cur_strike) & calls["delta"].notna()].copy()
                            if not candidates.empty:
                                idx = (candidates["delta"] - target_delta).abs().idxmin()
                                new_row = candidates.loc[idx]
                                new_call = new_row["合约代码"]
                                new_strike = float(new_row["strike"])
                                if new_call != cur_call:
                                    state[key]["call_code"] = new_call
                                    state[key]["call_strike"] = new_strike
                                    flag_call_roll_delta[i] = True

                    if key in state:
                        call_codes[i] = state[key]["call_code"]
                        put_codes[i]  = state[key]["put_code"]
                        call_strikes[i] = state[key]["call_strike"]
                        put_strikes[i]  = state[key]["put_strike"]

                        # 记录当日所选合约的 delta(用于诊断)
                        c_row = calls[calls["合约代码"] == call_codes[i]]
                        if (not c_row.empty) and ("delta" in calls.columns):
                            call_deltas[i] = float(c_row.iloc[0].get("delta", np.nan))
                        p_row = puts[puts["合约代码"] == put_codes[i]]
                        if (not p_row.empty) and ("delta" in puts.columns):
                            put_deltas[i] = float(p_row.iloc[0].get("delta", np.nan))

        # --- 再用“上一日持有的合约”计算今天 pnl(close-to-close) ---
        if i == 0:
            continue
        d_prev = dates[i - 1]
        c_prev = call_codes[i - 1]
        p_prev = put_codes[i - 1]
        if isinstance(c_prev, str) and isinstance(p_prev, str) and (c_prev in cols) and (p_prev in cols):
            C_t = options_pivot.at[d, c_prev]
            C_prev = options_pivot.at[d_prev, c_prev]
            P_t = options_pivot.at[d, p_prev]
            P_prev = options_pivot.at[d_prev, p_prev]
            if not any(pd.isna(x) for x in [C_t, C_prev, P_t, P_prev]):
                pnl_call[i] = (C_t - C_prev)
                pnl_short_put[i] = -(P_t - P_prev)   # 空头 Put
                pnl_synth[i] = pnl_call[i] + pnl_short_put[i]

    res = pd.DataFrame({
        "pnl_call": pnl_call,
        "pnl_short_put": pnl_short_put,
        "pnl_synth": pnl_synth,

        "call_code": call_codes,
        "put_code": put_codes,
        "call_strike": call_strikes,
        "put_strike": put_strikes,
        "call_delta": call_deltas,
        "put_delta": put_deltas,

        "flag_call_roll_delta": flag_call_roll_delta,
        "flag_call_roll_month": flag_call_roll_month,
    }, index=dates)

    res["cum_pnl_call"] = res["pnl_call"].cumsum()
    res["cum_pnl_short_put"] = res["pnl_short_put"].cumsum()
    res["cum_pnl_synth"] = res["pnl_synth"].cumsum()
    return res


# ============================================================
# Strategy11 保护 Put:delta target + 5% 再平衡(输出明细)
# ============================================================

def build_protective_put_position_series_delta_target_detail(
    fut_pos: pd.DataFrame,
    index_series: pd.Series,
    opt_df: pd.DataFrame,
    delta_target: float,
    move_threshold: float,
) -> pd.DataFrame:
    """
    保护 Put:
      - 月份跟随期货当月(IMyyMM -> MOyyMM)
      - 选同月份 put 中 delta 最接近 delta_target 的那档
      - 初次建仓/换月:强制选
      - 之后 |S/S_ref-1| >= move_threshold 才重选
    输出:
      put_contract / put_strike / put_delta
      reset_flag / reset_reason / ref_price
    """
    if "delta" not in opt_df.columns:
        raise ValueError("期权数据缺少 delta 列,保护 Put 选档需要 delta。")

    dates = (fut_pos.index
             .intersection(index_series.index)
             .intersection(opt_df["交易日期"].unique()))
    dates = pd.Index(sorted(dates))

    fut_pos = fut_pos.reindex(dates)
    S = index_series.reindex(dates)

    opt_put = opt_df[(opt_df["opt_type"] == "P") & opt_df["delta"].notna()].copy()
    opt_put["delta"] = pd.to_numeric(opt_put["delta"], errors="coerce")
    grouped = opt_put.groupby("交易日期")

    put_contracts = []
    put_strikes = []
    put_deltas = []
    reset_flags = []
    reset_reasons = []
    ref_prices = []

    current_put = None
    current_month = None
    ref_price = None
    current_strike = np.nan
    current_delta = np.nan

    for d in dates:
        fut_contract = fut_pos.at[d, "contract"]
        spot = S.loc[d]

        if (not isinstance(fut_contract, str)) or pd.isna(spot):
            put_contracts.append(None)
            put_strikes.append(np.nan)
            put_deltas.append(np.nan)
            reset_flags.append(False)
            reset_reasons.append("no_data")
            ref_prices.append(np.nan)

            current_put = None
            current_month = None
            ref_price = None
            current_strike = np.nan
            current_delta = np.nan
            continue

        ym = extract_month_from_im_code(fut_contract)
        if ym is None or d not in grouped.groups:
            # 当日无报价:延续昨日仓位
            put_contracts.append(current_put)
            put_strikes.append(current_strike)
            put_deltas.append(current_delta)
            reset_flags.append(False)
            reset_reasons.append("no_quote")
            ref_prices.append(ref_price if ref_price is not None else np.nan)
            continue

        day_opts = grouped.get_group(d)
        candidates = day_opts[day_opts["opt_month"] == ym]
        if candidates.empty:
            put_contracts.append(current_put)
            put_strikes.append(current_strike)
            put_deltas.append(current_delta)
            reset_flags.append(False)
            reset_reasons.append("no_candidate")
            ref_prices.append(ref_price if ref_price is not None else np.nan)
            continue

        need_reset = False
        reason = "hold"

        if (current_put is None) or (current_month != ym) or (ref_price is None):
            need_reset = True
            reason = "init" if current_put is None else "month_roll"
        else:
            move = abs(float(spot) / float(ref_price) - 1.0)
            if move >= move_threshold:
                need_reset = True
                reason = "spot_move"

        if need_reset:
            idx = (candidates["delta"] - delta_target).abs().idxmin()
            row = candidates.loc[idx]

            current_put = row["合约代码"]
            current_month = ym
            ref_price = float(spot)

            current_strike = float(row["strike"])
            current_delta = float(row["delta"])

            reset_flags.append(True)
            reset_reasons.append(reason)
        else:
            reset_flags.append(False)
            reset_reasons.append(reason)

        put_contracts.append(current_put)
        put_strikes.append(current_strike)
        put_deltas.append(current_delta)
        ref_prices.append(ref_price if ref_price is not None else np.nan)

    return pd.DataFrame({
        "put_contract": put_contracts,
        "put_strike": put_strikes,
        "put_delta": put_deltas,
        "reset_flag": reset_flags,
        "reset_reason": reset_reasons,
        "ref_price": ref_prices,
    }, index=dates)


def compute_put_pnl_detail(opt_pivot: pd.DataFrame, put_pos_detail: pd.DataFrame, ratio: float = 1.0) -> pd.DataFrame:
    """保护 Put 多头 PnL:用上一日 put_contract 计算差分,并带上换档标记。"""
    dates = put_pos_detail.index.intersection(opt_pivot.index).sort_values()
    put_pos_detail = put_pos_detail.reindex(dates)
    opt_pivot = opt_pivot.reindex(dates)

    pnl = np.zeros(len(dates))
    contracts = put_pos_detail["put_contract"].values
    cols = set(opt_pivot.columns)

    for i in range(1, len(dates)):
        d = dates[i]
        d_prev = dates[i - 1]
        c_prev = contracts[i - 1]
        if not isinstance(c_prev, str) or c_prev not in cols:
            continue
        P_t = opt_pivot.at[d, c_prev]
        P_prev = opt_pivot.at[d_prev, c_prev]
        if pd.isna(P_t) or pd.isna(P_prev):
            continue
        pnl[i] = (P_t - P_prev) * float(ratio)

    res = put_pos_detail.copy()
    res["pnl_put"] = pnl
    res["cum_pnl_put"] = res["pnl_put"].cumsum()
    res["flag_put_roll"] = res["put_contract"].ne(res["put_contract"].shift(1))  # 当日收盘后换档
    return res


# ============================================================
# 绩效统计 + 输出工具
# ============================================================

def performance_stats(pnl: pd.Series, trading_days_per_year: int = 252) -> dict:
    pnl = pnl.fillna(0.0)
    mean_daily = float(pnl.mean())
    std_daily = float(pnl.std(ddof=0))
    sharpe = np.nan if std_daily == 0 else mean_daily / std_daily * math.sqrt(trading_days_per_year)

    cum = pnl.cumsum()
    running_max = cum.cummax()
    dd = cum - running_max
    max_dd = float(dd.min()) if len(dd) else 0.0

    return {
        "total_pnl_points": float(cum.iloc[-1]) if len(cum) else 0.0,
        "mean_daily_pnl": mean_daily,
        "std_daily_pnl": std_daily,
        "sharpe_annual": float(sharpe) if sharpe == sharpe else np.nan,
        "max_drawdown_points": max_dd,
    }


def make_stats_table(pnl_dict: Dict[str, pd.Series], trading_days_per_year: int = 252) -> pd.DataFrame:
    rows = []
    for name, s in pnl_dict.items():
        s = s.fillna(0.0)
        st = performance_stats(s, trading_days_per_year)
        rows.append({
            "name": name,
            **st,
            "win_rate": float((s > 0).mean()) if len(s) else np.nan,
            "best_day": float(s.max()) if len(s) else np.nan,
            "worst_day": float(s.min()) if len(s) else np.nan,
        })
    return pd.DataFrame(rows).set_index("name")


def variance_contribution(components: pd.DataFrame) -> pd.DataFrame:
    """
    方差贡献分解(对 Strategy11 的“合成腿 vs 保护Put”很直观):
        Var(total) = Σ Cov(component_i, total)
    若某个 component 与 total 负相关,Cov 可能为负 -> 表示“降低波动”的贡献。
    """
    df = components.fillna(0.0)
    total = df.sum(axis=1)
    var_total = float(total.var(ddof=0))

    out = []
    for col in df.columns:
        if len(df) == 0:
            cov = np.nan
        else:
            cov = float(np.cov(df[col].values, total.values, ddof=0)[0, 1])
        out.append({
            "component": col,
            "cov_with_total": cov,
            "var_total": var_total,
            "pct_of_total_var": (cov / var_total) if (var_total != 0 and cov == cov) else np.nan,
        })

    return pd.DataFrame(out).set_index("component")


# ============================================================
# 主程序
# ============================================================

def main():
    pd.set_option("display.max_columns", 200)
    pd.set_option("display.width", 200)

    print("Loading index...")
    index_series = load_index_data(INDEX_CSV_PATH)

    print("Loading futures...")
    fut_df = load_futures_data(FUTURES_CSV_PATH, index_series)

    print("Loading options...")
    opt_df = load_options_data(OPTIONS_CSV_PATH)

    print("Building pivots...")
    fut_pivot = make_futures_pivot(fut_df)
    opt_pivot = make_options_pivot(opt_df)

    # ---- A:持有期货(近月+T-3滚动) ----
    near_df = build_near_df(fut_df)
    pos = build_front_month_positions(near_df, roll_dte_threshold=FUT_ROLL_DTE)
    pnl_fut = compute_futures_pnl(fut_pivot, pos)

    # ---- B:合成期货(固定K0,不做 delta roll) ----
    synth_fixed = compute_synth_legs_with_call_roll_by_delta(
        options_df=opt_df,
        options_pivot=opt_pivot,
        fut_pos=pos,
        index_series=index_series,
        delta_trigger=-1.0,  # 永不触发
        target_delta=S11_CALL_TARGET_DELTA,
        init_k_offset_steps=0
    )

    # ---- C:合成期货(Call delta 触发 roll-down) ----
    synth_roll = compute_synth_legs_with_call_roll_by_delta(
        options_df=opt_df,
        options_pivot=opt_pivot,
        fut_pos=pos,
        index_series=index_series,
        delta_trigger=S11_CALL_DELTA_TRIGGER,
        target_delta=S11_CALL_TARGET_DELTA,
        init_k_offset_steps=0
    )

    # ---- D:保护 Put(delta=-0.15 + 5% 再平衡) ----
    put_pos_detail = build_protective_put_position_series_delta_target_detail(
        fut_pos=pos,
        index_series=index_series,
        opt_df=opt_df,
        delta_target=S11_PUT_DELTA_TARGET,
        move_threshold=S11_PUT_REBAL_MOVE
    )
    protect_put = compute_put_pnl_detail(opt_pivot, put_pos_detail, ratio=1.0)

    # ---- 对齐日期:统一在同一交易日集合上比较 ----
    common_dates = (pnl_fut.index
                    .intersection(synth_fixed.index)
                    .intersection(synth_roll.index)
                    .intersection(protect_put.index)
                    .sort_values())

    pnl_fut_c = pnl_fut.reindex(common_dates).fillna(0.0)

    pnl_synth_fixed_call_c = synth_fixed["pnl_call"].reindex(common_dates).fillna(0.0)
    pnl_synth_fixed_short_put_c = synth_fixed["pnl_short_put"].reindex(common_dates).fillna(0.0)
    pnl_synth_fixed_c = synth_fixed["pnl_synth"].reindex(common_dates).fillna(0.0)

    pnl_synth_roll_call_c = synth_roll["pnl_call"].reindex(common_dates).fillna(0.0)
    pnl_synth_roll_short_put_c = synth_roll["pnl_short_put"].reindex(common_dates).fillna(0.0)
    pnl_synth_roll_c = synth_roll["pnl_synth"].reindex(common_dates).fillna(0.0)

    pnl_protect_put_c = protect_put["pnl_put"].reindex(common_dates).fillna(0.0)

    pnl_strategy11_c = pnl_synth_roll_c + pnl_protect_put_c

    # =========================
    # 绩效输出:每个细项
    # =========================
    pnl_dict = {
        # 基准
        "A_Futures": pnl_fut_c,

        # 合成固定K0
        "B_SynthFixed_Total": pnl_synth_fixed_c,
        "B_SynthFixed_Call": pnl_synth_fixed_call_c,
        "B_SynthFixed_ShortPut": pnl_synth_fixed_short_put_c,

        # 合成+Call调整
        "C_SynthRoll_Total": pnl_synth_roll_c,
        "C_SynthRoll_Call": pnl_synth_roll_call_c,
        "C_SynthRoll_ShortPut": pnl_synth_roll_short_put_c,

        # 保护Put
        "D_ProtectPut": pnl_protect_put_c,

        # Strategy 11
        "E_Strategy11_Total": pnl_strategy11_c,

        # 规则增量
        "Effect_CallAdjust (Roll-Fixed)": (pnl_synth_roll_c - pnl_synth_fixed_c),
        "Effect_ProtectPut (S11-Roll)": (pnl_strategy11_c - pnl_synth_roll_c),  # 等价于 D_ProtectPut
    }

    stats_df = make_stats_table(pnl_dict, TRADING_DAYS_PER_YEAR)

    print("\n================= PERFORMANCE SUMMARY (Aligned Dates) =================")
    print(f"Total aligned trading days: {len(common_dates)}")
    print(stats_df.round(4).to_string())

    # =========================
    # 触发次数 / 行为统计
    # =========================
    call_delta_rolls = int(synth_roll.loc[common_dates, "flag_call_roll_delta"].sum())
    call_month_rolls = int(synth_roll.loc[common_dates, "flag_call_roll_month"].sum())

    put_resets_total = int(protect_put.loc[common_dates, "reset_flag"].sum())
    put_resets_by_reason = protect_put.loc[common_dates].loc[protect_put["reset_flag"], "reset_reason"].value_counts()

    print("\n================= TRADING / REBALANCE COUNTS =================")
    print(f"Call roll-down by delta (<{S11_CALL_DELTA_TRIGGER}) count: {call_delta_rolls}")
    print(f"Synth month-roll (re-init K0) count: {call_month_rolls}")
    print(f"Protect put resets (total): {put_resets_total}")
    print("Protect put resets by reason:")
    if len(put_resets_by_reason):
        print(put_resets_by_reason.to_string())
    else:
        print("  (none)")

    # =========================
    # 相关性与方差贡献(解释“保护Put降低风险”很直观)
    # =========================
    corr = pd.DataFrame({
        "synth_roll": pnl_synth_roll_c,
        "protect_put": pnl_protect_put_c,
        "strategy11_total": pnl_strategy11_c
    }).corr()

    var_contrib = variance_contribution(pd.DataFrame({
        "synth_roll": pnl_synth_roll_c,
        "protect_put": pnl_protect_put_c
    }))

    print("\n================= CORRELATION (Daily PnL) =================")
    print(corr.round(4).to_string())

    print("\n================= VARIANCE CONTRIBUTION (Strategy11) =================")
    print(var_contrib.round(6).to_string())

    # =========================
    # 输出每日明细 CSV(画图/归因用)
    # =========================
    if SAVE_DAILY_DETAIL:
        detail = pd.DataFrame(index=common_dates)
        detail["index_close"] = index_series.reindex(common_dates)

        detail["fut_contract"] = pos.reindex(common_dates)["contract"]

        # synth fixed
        detail["synth_fixed_call_code"] = synth_fixed.reindex(common_dates)["call_code"]
        detail["synth_fixed_put_code"] = synth_fixed.reindex(common_dates)["put_code"]
        detail["pnl_synth_fixed_call"] = pnl_synth_fixed_call_c
        detail["pnl_synth_fixed_short_put"] = pnl_synth_fixed_short_put_c
        detail["pnl_synth_fixed_total"] = pnl_synth_fixed_c

        # synth roll
        detail["synth_roll_call_code"] = synth_roll.reindex(common_dates)["call_code"]
        detail["synth_roll_put_code"] = synth_roll.reindex(common_dates)["put_code"]
        detail["synth_roll_call_delta"] = synth_roll.reindex(common_dates)["call_delta"]
        detail["flag_call_roll_delta"] = synth_roll.reindex(common_dates)["flag_call_roll_delta"]
        detail["flag_call_roll_month"] = synth_roll.reindex(common_dates)["flag_call_roll_month"]
        detail["pnl_synth_roll_call"] = pnl_synth_roll_call_c
        detail["pnl_synth_roll_short_put"] = pnl_synth_roll_short_put_c
        detail["pnl_synth_roll_total"] = pnl_synth_roll_c

        # protective put
        detail["protect_put_code"] = protect_put.reindex(common_dates)["put_contract"]
        detail["protect_put_delta"] = protect_put.reindex(common_dates)["put_delta"]
        detail["protect_put_strike"] = protect_put.reindex(common_dates)["put_strike"]
        detail["flag_put_reset"] = protect_put.reindex(common_dates)["reset_flag"]
        detail["put_reset_reason"] = protect_put.reindex(common_dates)["reset_reason"]
        detail["put_ref_price"] = protect_put.reindex(common_dates)["ref_price"]
        detail["pnl_protect_put"] = pnl_protect_put_c

        # totals
        detail["pnl_futures"] = pnl_fut_c
        detail["pnl_strategy11"] = pnl_strategy11_c
        detail["pnl_effect_call_adjust"] = pnl_synth_roll_c - pnl_synth_fixed_c

        # cum
        detail["cum_futures"] = detail["pnl_futures"].cumsum()
        detail["cum_synth_fixed"] = detail["pnl_synth_fixed_total"].cumsum()
        detail["cum_synth_roll"] = detail["pnl_synth_roll_total"].cumsum()
        detail["cum_protect_put"] = detail["pnl_protect_put"].cumsum()
        detail["cum_strategy11"] = detail["pnl_strategy11"].cumsum()

        detail.to_csv(DAILY_DETAIL_PATH, encoding="utf-8-sig")
        print(f"\nSaved daily detail to: {DAILY_DETAIL_PATH}")

    # =========================
    # 输出换档日志(可选)
    # =========================
    if SAVE_TRADE_LOG:
        # Call delta-roll 事件
        call_roll_log = synth_roll.loc[common_dates].copy()
        call_roll_log["prev_call"] = call_roll_log["call_code"].shift(1)
        call_roll_log["prev_call_strike"] = call_roll_log["call_strike"].shift(1)
        call_roll_log["prev_call_delta"] = call_roll_log["call_delta"].shift(1)

        call_roll_log = call_roll_log[call_roll_log["flag_call_roll_delta"]].copy()
        call_roll_log["event"] = "call_roll_by_delta"
        call_roll_log = call_roll_log[[
            "event",
            "prev_call", "call_code",
            "prev_call_strike", "call_strike",
            "prev_call_delta", "call_delta",
        ]]

        # Protect put reset 事件
        put_log = protect_put.loc[common_dates].copy()
        put_log["prev_put"] = put_log["put_contract"].shift(1)
        put_log["prev_put_strike"] = put_log["put_strike"].shift(1)
        put_log["prev_put_delta"] = put_log["put_delta"].shift(1)

        put_log = put_log[put_log["reset_flag"]].copy()
        put_log["event"] = "protect_put_reset"
        put_log = put_log[[
            "event",
            "reset_reason",
            "prev_put", "put_contract",
            "prev_put_strike", "put_strike",
            "prev_put_delta", "put_delta",
            "ref_price",
        ]]

        trade_log = pd.concat([call_roll_log, put_log], axis=0).sort_index()
        trade_log.to_csv(TRADE_LOG_PATH, encoding="utf-8-sig")
        print(f"Saved trade/rebalance log to: {TRADE_LOG_PATH}")

    # =========================
    # 兼容你原来的三策略净值输出(但现在更细)
    # =========================
    if SAVE_EQUITY_CURVE:
        out = pd.DataFrame({
            "pnl_futures": pnl_fut_c,
            "pnl_synth_fixed": pnl_synth_fixed_c,
            "pnl_strategy11": pnl_strategy11_c,
        }, index=common_dates)
        out["cum_futures"] = out["pnl_futures"].cumsum()
        out["cum_synth_fixed"] = out["pnl_synth_fixed"].cumsum()
        out["cum_strategy11"] = out["pnl_strategy11"].cumsum()
        out.to_csv(EQUITY_CURVE_PATH, encoding="utf-8-sig")
        print(f"Saved equity curve to: {EQUITY_CURVE_PATH}")


if __name__ == "__main__":
    main()

结果:


Loading index...
Loading futures...
Loading options...
Building pivots...

================= PERFORMANCE SUMMARY (Aligned Dates) =================
Total aligned trading days: 788
                                total_pnl_points  mean_daily_pnl  std_daily_pnl  sharpe_annual  max_drawdown_points  win_rate  best_day  worst_day
name                                                                                                                                              
A_Futures                                 2051.2          2.6030        92.8232         0.4452              -2735.4    0.5216     451.2     -614.4
B_SynthFixed_Total                        2013.4          2.5551        91.7748         0.4420              -2740.0    0.5216     450.2     -572.2
B_SynthFixed_Call                         1610.4          2.0437        53.3489         0.6081              -1130.4    0.4251     450.4     -428.8
B_SynthFixed_ShortPut                      403.0          0.5114        60.3544         0.1345              -1675.8    0.5876     429.2     -569.0
C_SynthRoll_Total                         3215.8          4.0810       106.2306         0.6098              -2584.8    0.5241     697.2     -650.8
C_SynthRoll_Call                          2812.8          3.5695        60.5655         0.9356               -927.6    0.4657     450.4     -428.8
C_SynthRoll_ShortPut                       403.0          0.5114        60.3544         0.1345              -1675.8    0.5876     429.2     -569.0
D_ProtectPut                               643.4          0.8165        30.0795         0.4309               -512.0    0.3325     620.2     -129.8
E_Strategy11_Total                        3859.2          4.8975        90.6871         0.8573              -1708.8    0.5063     567.4     -382.4
Effect_CallAdjust (Roll-Fixed)            1202.4          1.5259        25.5752         0.9471               -438.4    0.1231     267.4     -113.8
Effect_ProtectPut (S11-Roll)               643.4          0.8165        30.0795         0.4309               -512.0    0.3325     620.2     -129.8

================= TRADING / REBALANCE COUNTS =================
Call roll-down by delta (<0.2) count: 21
Synth month-roll (re-init K0) count: 40
Protect put resets (total): 84
Protect put resets by reason:
reset_reason
spot_move     44
month_roll    39
init           1

================= CORRELATION (Daily PnL) =================
                  synth_roll  protect_put  strategy11_total
synth_roll            1.0000      -0.6205            0.9656
protect_put          -0.6205       1.0000           -0.3952
strategy11_total      0.9656      -0.3952            1.0000

================= VARIANCE CONTRIBUTION (Strategy11) =================
             cov_with_total    var_total  pct_of_total_var
component                                                 
synth_roll      9302.152058  8224.144511          1.131078
protect_put    -1078.007547  8224.144511         -0.131078

Saved daily detail to: backtest_detail_strategy11_breakdown.csv
Saved trade/rebalance log to: trade_log_strategy11.csv
Saved equity curve to: equity_curve_3strategies.csv

Process finished with exit code 0

黄金

代码

# -*- coding: utf-8 -*-
"""
au_fut_vs_cc3pct_backtest_breakdown.py

沪金(AU, 上期所) 两策略对比回测(带“收益/风险拆解”):

S1: FuturesOnly
    - 持有主力连续期货(按“期权到期前 N 日”共同换月构造连续)

S2: CoveredCall_3pct(备兑)
    - 持有一手期货 + 每个换月周期卖出 3% OTM Call(行权价≈S0*1.03)
    - 行权价选择:优先选 >= target_strike 的最小行权价(往上最近一档)
                 若所有行权价都 < target_strike,则选最大行权价(避免空缺)
    - 期权腿日度盯市(结算价差分),未计手续费/滑点/保证金

本版本在原回测基础上,新增:
1) Covered Call 的 PnL 分解:
   - CC_futures_leg:期货腿 PnL
   - CC_short_call:卖出看涨期权腿 PnL(盯市)
   - CC_short_call_intrinsic:内在价值变化导致的 PnL(近似 delta/gamma)
   - CC_short_call_time:时间价值变化导致的 PnL(近似 theta/vega)
   且 CC_short_call = intrinsic + time,CoveredCall_total = futures + short_call

2) 输出“收益贡献”与“风险贡献”:
   - 收益贡献:各分项累计 PnL / 占比
   - 风险贡献:基于日收益可加性的方差分解
       Var(r_total) = Σ Cov(r_i, r_total)
     输出每个分项对方差/波动的贡献(可能为负,表示对冲/降低波动)

数据格式:Excel(上期所风格)
- sheet_name="黄金期货": columns 至少包含 ["合约","交易日期","结算价"]
- sheet_name="黄金期权": columns 至少包含 ["合约","交易日期","结算价"]
  期权合约编码假定形如:underlying(6位) + 'C'/'P' + strike
  例如:au2406C520 / au2406P480
"""

from __future__ import annotations

import numpy as np
import pandas as pd
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, Tuple, Optional, List


# =====================
# 全局配置(AU)
# =====================
FUT_MULT: float = 1000.0
OPT_MULT: float = 1000.0

TRADING_DAYS_PER_YEAR: int = 252
BACKTEST_START_DATE: str = "2022-01-04"

# 纯期货视角的兜底换月(日历里用不到时也保留)
ROLL_DAYS: int = 7

# 期货与期权共同换月:期权到期前 N 个交易日
OPT_ROLL_DAYS_BEFORE_EXPIRY: int = 5


# =====================
# 数据加载
# =====================
def load_shfe_au_excel(
    excel_path: str,
    start_date: Optional[str] = BACKTEST_START_DATE,
    end_date: Optional[str] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    读取上期所黄金期货/期权 Excel,返回 (fut_df, opt_df)。

    为提高速度/内存占用,直接 usecols 读取所需列。
    """
    p = Path(excel_path)
    if not p.exists():
        raise FileNotFoundError(f"找不到文件: {excel_path}")

    xls = pd.ExcelFile(p)

    fut_df = pd.read_excel(xls, sheet_name="黄金期货", usecols=["合约", "交易日期", "结算价"])
    opt_df = pd.read_excel(xls, sheet_name="黄金期权", usecols=["合约", "交易日期", "结算价"])

    fut_df["交易日期"] = pd.to_datetime(fut_df["交易日期"])
    opt_df["交易日期"] = pd.to_datetime(opt_df["交易日期"])

    # 期权拆解:underlying(前6位) + opt_type(第7位) + strike(后面)
    opt_df["underlying"] = opt_df["合约"].astype(str).str.slice(0, 6)
    opt_df["opt_type"] = opt_df["合约"].astype(str).str[6]
    opt_df["strike"] = opt_df["合约"].astype(str).str[7:].astype(int)

    if start_date is not None:
        st = pd.to_datetime(start_date)
        fut_df = fut_df[fut_df["交易日期"] >= st].copy()
        opt_df = opt_df[opt_df["交易日期"] >= st].copy()

    if end_date is not None:
        ed = pd.to_datetime(end_date)
        fut_df = fut_df[fut_df["交易日期"] <= ed].copy()
        opt_df = opt_df[opt_df["交易日期"] <= ed].copy()

    fut_df.sort_values(["合约", "交易日期"], inplace=True)
    opt_df.sort_values(["underlying", "交易日期", "opt_type", "strike"], inplace=True)
    return fut_df, opt_df


# =====================
# 连续期货构造(按期权到期前N日共同换月)
# =====================
def _is_even_month(contract: str) -> bool:
    """AU合约 auYYMM,取最后两位月份,偶数月返回 True。"""
    try:
        return int(str(contract)[-2:]) % 2 == 0
    except Exception:
        return False


def compute_futures_meta(
    fut_df: pd.DataFrame,
    roll_days: int = ROLL_DAYS,
    opt_df: Optional[pd.DataFrame] = None,
    opt_roll_days_before_expiry: Optional[int] = OPT_ROLL_DAYS_BEFORE_EXPIRY,
) -> pd.DataFrame:
    """
    给每个期货合约计算:
    - start: 样本内首个交易日
    - end:   样本内最后交易日
    - roll_trigger: 换月触发日(交易日)
      若提供 opt_df,则优先用“期权到期前 N 日”作为共同换月触发日。

    注意:这里用“样本内该 underlying 的期权最后一个交易日”近似当作到期日,
          更严谨版本可改为接入交易所合约日历/最后交易日字段。
    """
    meta = fut_df.groupby("合约")["交易日期"].agg(["min", "max"]).rename(
        columns={"min": "start", "max": "end"}
    )
    global_max_date = pd.Timestamp(fut_df["交易日期"].max())

    # 1) 用期权样本推每个 underlying 的期权“到期日”与“共同换月日”
    opt_roll_map: Dict[str, pd.Timestamp] = {}
    if opt_df is not None and opt_roll_days_before_expiry is not None:
        for underlying, g_opt in opt_df.groupby("underlying"):
            opt_dates = np.sort(g_opt["交易日期"].unique())
            if len(opt_dates) == 0:
                continue

            expiry = pd.Timestamp(opt_dates[-1])  # 简化:样本内最后交易日视作到期日

            fut_dates = np.sort(
                fut_df.loc[fut_df["合约"] == underlying, "交易日期"].unique()
            )
            if len(fut_dates) == 0:
                continue

            # 确保 expiry 在期货交易日序列中;否则用交集最后一天
            if expiry not in fut_dates:
                common = np.intersect1d(fut_dates, opt_dates)
                if len(common) == 0:
                    continue
                expiry = pd.Timestamp(common[-1])

            idx_expiry = int(np.where(fut_dates == expiry)[0][0])
            roll_idx = max(0, idx_expiry - int(opt_roll_days_before_expiry))
            roll_date = pd.Timestamp(fut_dates[roll_idx])
            opt_roll_map[str(underlying)] = roll_date

    # 2) 合并:默认(期货自身到期前 roll_days) vs 期权驱动 roll_trigger
    roll_dict: Dict[str, pd.Timestamp] = {}
    for contract, g in fut_df.groupby("合约"):
        dates = np.sort(g["交易日期"].unique())
        if len(dates) == 0:
            continue

        end_date = pd.Timestamp(dates[-1])

        # 默认:期货样本内最后 roll_days 个交易日前换月
        if len(dates) > roll_days:
            default_roll = pd.Timestamp(dates[-roll_days])
        else:
            default_roll = pd.Timestamp(dates[0])

        # 样本尾部截断合约:不在样本内强制换月
        is_truncated = False
        if end_date == global_max_date:
            is_truncated = True
        if not is_truncated:
            try:
                c_month = int(str(contract)[-2:])
                if c_month != end_date.month:
                    is_truncated = True
            except Exception:
                pass

        if is_truncated:
            roll_dict[str(contract)] = end_date + pd.Timedelta(days=1)
            continue

        if str(contract) in opt_roll_map:
            roll_date = opt_roll_map[str(contract)]
            # 兜底:确保在生命周期内
            if roll_date < pd.Timestamp(dates[0]):
                roll_date = pd.Timestamp(dates[0])
            if roll_date > end_date:
                roll_date = default_roll
        else:
            roll_date = default_roll

        roll_dict[str(contract)] = roll_date

    meta["roll_trigger"] = meta.index.map(lambda x: roll_dict.get(str(x), pd.NaT))
    meta.sort_values("end", inplace=True)
    return meta


def build_continuous_futures(fut_df: pd.DataFrame, fut_meta: pd.DataFrame) -> pd.DataFrame:
    """
    按 roll_trigger 构建主力连续:
    - 只用偶数月合约
    - 每个交易日选择“end 最早”的那条(近月)
    - 只保留 交易日期 < roll_trigger(进入换月窗口前不再使用该合约)
    """
    tmp = fut_df.merge(
        fut_meta[["end", "roll_trigger"]],
        left_on="合约",
        right_index=True,
        how="left",
    )

    tmp = tmp[tmp["合约"].apply(_is_even_month)].copy()
    tmp = tmp[tmp["交易日期"] < tmp["roll_trigger"]].copy()
    tmp.sort_values(["交易日期", "end"], inplace=True)

    main = tmp.groupby("交易日期", as_index=False).head(1)

    cont = (
        main.sort_values("交易日期")
        .set_index("交易日期")
        .rename(columns={"合约": "fut_contract", "结算价": "fut_settle"})
    )
    cont = cont[["fut_contract", "fut_settle"]].copy()

    cont["fut_pnl"] = cont["fut_settle"].diff().fillna(0.0) * FUT_MULT
    cont["contract_change"] = cont["fut_contract"] != cont["fut_contract"].shift(1)
    cont["cycle_id"] = cont["contract_change"].cumsum()

    return cont


# =====================
# 行权价选择:往上最近一档
# =====================
def choose_strike_upward(
    date: pd.Timestamp,
    underlying: str,
    opt_type: str,
    target_strike: float,
    opt_prices: pd.Series,
) -> Optional[int]:
    """
    在某个交易日,从挂牌行权价里选“>= target_strike 的最小一档”;
    若都小于 target_strike,则选最大行权价(避免空缺)。
    """
    try:
        series_on_date = opt_prices.loc[date]
    except KeyError:
        return None

    try:
        strikes_series = series_on_date.xs((underlying, opt_type))
    except KeyError:
        return None

    strikes = np.array(strikes_series.index.values, dtype=float)
    if strikes.size == 0:
        return None

    # 往上最近
    ge = strikes[strikes >= float(target_strike) - 1e-12]
    if ge.size > 0:
        return int(ge.min())
    # 没有更高的,只能用最大一档
    return int(strikes.max())


# =====================
# 策略:Covered Call 3%(PnL 分解版)
# =====================
def build_covered_call_components(
    cont: pd.DataFrame,
    opt_df: pd.DataFrame,
    pct: float = 0.03,
    qty_fut: float = 1.0,
    qty_short_call: float = -1.0,
) -> pd.DataFrame:
    """
    返回 Covered Call 的逐日分解(index 与 cont 对齐):

    - fut_pnl:                期货腿 PnL
    - call_pnl:               卖出看涨期权腿 PnL(盯市)
    - call_pnl_intrinsic:     内在价值变化导致的 PnL(近似 delta/gamma)
    - call_pnl_time:          时间价值变化导致的 PnL(近似 theta/vega)
    - total_pnl:              合计 PnL

    辅助列(用于排查与归因):
    - call_active:            当日是否持有 short call
    - call_strike/call_price/call_intrinsic/call_time_value
    """
    opt_prices = (
        opt_df.set_index(["交易日期", "underlying", "opt_type", "strike"])["结算价"]
        .sort_index()
    )

    out = pd.DataFrame(index=cont.index)
    out["fut_contract"] = cont["fut_contract"].astype(str)
    out["fut_settle"] = cont["fut_settle"].astype(float)

    out["fut_pnl"] = cont["fut_pnl"].astype(float) * float(qty_fut)

    # 信息列
    out["call_active"] = False
    out["call_strike"] = np.nan
    out["call_price"] = np.nan
    out["call_intrinsic"] = np.nan
    out["call_time_value"] = np.nan

    # PnL 分解列(先置 0)
    out["call_pnl"] = 0.0
    out["call_pnl_intrinsic"] = 0.0
    out["call_pnl_time"] = 0.0

    for _, g in cont.groupby("cycle_id"):
        underlying = str(g["fut_contract"].iloc[0])
        dates = g.index
        if len(dates) < 2:
            continue

        # 选合约的日期:优先用 cycle 首日;若首日没有期权数据,则向后找第一天有数据
        select_date: Optional[pd.Timestamp] = None
        K: Optional[int] = None

        for d in dates:
            S_try = float(g.loc[d, "fut_settle"])
            target = S_try * (1.0 + float(pct))
            k_try = choose_strike_upward(d, underlying, "C", target, opt_prices)
            if k_try is None:
                continue
            # 确保当天该 strike 有结算价
            if (d, underlying, "C", int(k_try)) not in opt_prices.index:
                continue
            select_date = d
            K = int(k_try)
            break

        if select_date is None or K is None:
            continue

        # 取该 strike 的整个序列,并截取本 cycle(从 select_date 开始)
        try:
            series_all = opt_prices.xs(
                (underlying, "C", K),
                level=("underlying", "opt_type", "strike"),
            )
        except KeyError:
            continue

        active_dates = dates[dates >= select_date]
        if len(active_dates) == 0:
            continue

        # 对齐到 cycle 的日期(从 select_date 起),缺失则前向填充
        series_seg = series_all.reindex(active_dates).ffill()
        if pd.isna(series_seg.iloc[0]) or series_seg.isna().all():
            continue

        # 内在价值(基于期货结算价)
        fut_seg = cont.loc[active_dates, "fut_settle"].astype(float)
        intrinsic = np.maximum(fut_seg - float(K), 0.0)

        # 为避免因数据缺失 ffill 造成的“期权价 < 内在价值”问题,做一个下限修正
        call_price = np.maximum(series_seg.astype(float).values, intrinsic.values)
        call_price = pd.Series(call_price, index=active_dates)
        time_value = call_price - intrinsic

        # 日度盯市分解
        dp = call_price.diff().fillna(0.0)
        di = intrinsic.diff().fillna(0.0)
        dt = time_value.diff().fillna(0.0)

        pnl_total = dp * OPT_MULT * float(qty_short_call)
        pnl_intr = di * OPT_MULT * float(qty_short_call)
        pnl_time = dt * OPT_MULT * float(qty_short_call)

        # 写回
        out.loc[active_dates, "call_active"] = True
        out.loc[active_dates, "call_strike"] = float(K)
        out.loc[active_dates, "call_price"] = call_price
        out.loc[active_dates, "call_intrinsic"] = intrinsic
        out.loc[active_dates, "call_time_value"] = time_value

        out.loc[pnl_total.index, "call_pnl"] += pnl_total
        out.loc[pnl_intr.index, "call_pnl_intrinsic"] += pnl_intr
        out.loc[pnl_time.index, "call_pnl_time"] += pnl_time

    out["total_pnl"] = out["fut_pnl"] + out["call_pnl"]
    return out


# =====================
# 绩效统计(收益 + 风险)
# =====================
@dataclass
class Performance:
    total_return: float
    annual_return: float
    annual_vol: float
    sharpe: float
    max_drawdown: float


def calc_nav(pnl: pd.Series, init_capital: float) -> pd.Series:
    return init_capital + pnl.cumsum()


def calc_performance(pnl: pd.Series, init_capital: float) -> Performance:
    """
    日度PnL -> 总收益/年化收益/年化波动/Sharpe/最大回撤
    口径尽量保持与你原框架一致(用 nav 的 pct_change 估算年化收益)。
    """
    nav = calc_nav(pnl, init_capital)
    daily_ret = nav.pct_change().dropna()
    if daily_ret.empty:
        return Performance(np.nan, np.nan, np.nan, np.nan, np.nan)

    mean_ret = float(daily_ret.mean())
    vol = float(daily_ret.std())

    annual_return = (1.0 + mean_ret) ** TRADING_DAYS_PER_YEAR - 1.0
    annual_vol = vol * np.sqrt(TRADING_DAYS_PER_YEAR)
    sharpe = annual_return / annual_vol if annual_vol > 0 else np.nan

    cum_max = nav.cummax()
    dd = nav / cum_max - 1.0
    max_dd = float(dd.min())

    total_return = float(nav.iloc[-1] / init_capital - 1.0)
    return Performance(float(total_return), float(annual_return), float(annual_vol), float(sharpe), float(max_dd))


def performance_table(pnl_df: pd.DataFrame, init_capital: float) -> pd.DataFrame:
    """批量计算绩效,返回 DataFrame 方便打印/导出。"""
    rows = []
    for col in pnl_df.columns:
        perf = calc_performance(pnl_df[col].astype(float), init_capital=init_capital)
        rows.append(
            {
                "name": col,
                "total_return": perf.total_return,
                "annual_return": perf.annual_return,
                "annual_vol": perf.annual_vol,
                "sharpe": perf.sharpe,
                "max_drawdown": perf.max_drawdown,
            }
        )
    return pd.DataFrame(rows).set_index("name")


def yearly_return_and_maxdd(pnl: pd.Series, init_capital: float) -> pd.DataFrame:
    """每年收益率 & 每年最大回撤(以跨年连续净值为基础,不重置资金)"""
    nav = calc_nav(pnl, init_capital)
    nav_prev = nav.shift(1).fillna(init_capital)

    years = sorted(nav.index.year.unique())
    rows = []
    for y in years:
        mask = nav.index.year == y
        nav_y = nav.loc[mask]
        if nav_y.empty:
            continue

        first_day = nav_y.index[0]
        last_day = nav_y.index[-1]

        start_nav = float(nav_prev.loc[first_day])  # 上一交易日净值(年初起点)
        end_nav = float(nav.loc[last_day])
        year_ret = end_nav / start_nav - 1.0

        # 年内最大回撤(考虑 start_nav 作为年初高水位起点)
        peaks = np.maximum.accumulate(np.concatenate([[start_nav], nav_y.values]))
        dd = nav_y.values / peaks[1:] - 1.0
        max_dd = float(np.min(dd))

        rows.append({"year": int(y), "year_return": float(year_ret), "max_drawdown": float(max_dd)})

    return pd.DataFrame(rows).set_index("year")


# =====================
# 收益贡献与风险贡献(拆解)
# =====================
def pnl_contribution(pnl_df: pd.DataFrame, total_col: str, component_cols: List[str]) -> pd.DataFrame:
    """
    收益贡献:用“累计 PnL(=日度PnL求和)”做分解。
    注意:累计PnL可加,但年化收益率不可加,所以这里用累计PnL做贡献更直观。
    """
    total_pnl = float(pnl_df[total_col].sum())
    rows = []
    for c in component_cols:
        cum = float(pnl_df[c].sum())
        rows.append(
            {
                "component": c,
                "cum_pnl": cum,
                "pct_of_total_pnl": (cum / total_pnl) if total_pnl != 0 else np.nan,
            }
        )
    return pd.DataFrame(rows).set_index("component")


def component_returns_by_total_nav(pnl_df: pd.DataFrame, total_col: str, init_capital: float) -> pd.DataFrame:
    """
    将日度PnL转换成“以组合净值为分母”的日收益,用于风险贡献:
      r_i(t) = pnl_i(t) / NAV_total(t-1)
    这样保证:r_total = Σ r_i(当 pnl_total = Σ pnl_i)。
    """
    nav_total = init_capital + pnl_df[total_col].cumsum()
    nav_prev = nav_total.shift(1).fillna(init_capital)
    rets = pnl_df.div(nav_prev, axis=0)
    rets = rets.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    return rets


def variance_risk_contribution(rets: pd.DataFrame, total_col: str) -> pd.DataFrame:
    """
    风险贡献(方差分解):
      Var(r_total) = Σ Cov(r_i, r_total)

    输出:
    - cov_with_total: Cov(r_i, r_total)
    - var_contrib_pct: 对总方差的贡献占比(可为负)
    - vol_contrib: 对总波动率的贡献(Σ vol_contrib = vol_total)
    - corr_with_total: Corr(r_i, r_total)
    """
    r_p = rets[total_col].astype(float)
    var_p = float(r_p.var(ddof=1))
    vol_p = float(np.sqrt(var_p)) if var_p > 0 else np.nan

    rows = []
    for col in rets.columns:
        if col == total_col:
            continue
        r_i = rets[col].astype(float)

        cov_i = float(r_i.cov(r_p))
        corr = float(r_i.corr(r_p)) if (r_i.std(ddof=1) > 0 and r_p.std(ddof=1) > 0) else np.nan

        var_contrib = cov_i
        var_pct = var_contrib / var_p if var_p > 0 else np.nan
        vol_contrib = var_contrib / vol_p if (vol_p is not None and vol_p > 0) else np.nan

        rows.append(
            {
                "component": col,
                "cov_with_total": cov_i,
                "corr_with_total": corr,
                "var_contrib": var_contrib,
                "var_contrib_pct": var_pct,
                "vol_contrib": vol_contrib,
                "vol_total": vol_p,
            }
        )
    return pd.DataFrame(rows).set_index("component")


# =====================
# 回测主函数(两策略 + 备兑分解)
# =====================
def backtest_au_two_strategies_with_breakdown(excel_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    fut_df, opt_df = load_shfe_au_excel(excel_path, start_date=BACKTEST_START_DATE)

    fut_meta = compute_futures_meta(
        fut_df,
        roll_days=ROLL_DAYS,
        opt_df=opt_df,
        opt_roll_days_before_expiry=OPT_ROLL_DAYS_BEFORE_EXPIRY,
    )
    cont = build_continuous_futures(fut_df, fut_meta)

    init_capital = float(cont["fut_settle"].iloc[0]) * FUT_MULT

    # S1:期货持有
    s1 = cont["fut_pnl"].copy().astype(float)
    s1.name = "FuturesOnly"

    # S2:Covered Call 3%(含拆解)
    cc = build_covered_call_components(cont, opt_df, pct=0.03, qty_fut=1.0, qty_short_call=-1.0)

    # 汇总到一个 pnl_df 里,便于统一统计
    pnl_df = pd.DataFrame(index=cont.index)
    pnl_df["FuturesOnly"] = s1
    pnl_df["CoveredCall_3pct"] = cc["total_pnl"].astype(float)

    # 备兑拆解:腿/子项
    pnl_df["CC_futures_leg"] = cc["fut_pnl"].astype(float)
    pnl_df["CC_short_call"] = cc["call_pnl"].astype(float)
    pnl_df["CC_short_call_intrinsic"] = cc["call_pnl_intrinsic"].astype(float)
    pnl_df["CC_short_call_time"] = cc["call_pnl_time"].astype(float)

    pnl_df.attrs["init_capital"] = init_capital

    # info_df:保留每日持仓信息,方便你后续做更细归因(例如按cycle统计)
    info_df = cc[
        [
            "fut_contract",
            "fut_settle",
            "call_active",
            "call_strike",
            "call_price",
            "call_intrinsic",
            "call_time_value",
        ]
    ].copy()

    return pnl_df, info_df, cont


# =====================
# CLI 入口
# =====================
if __name__ == "__main__":
    # 改成你本机 Excel 路径
    excel_path = r"D:\黄金期权\黄金期货期权行情_分sheet合并_全量日度.xlsx"

    pnl_df, info_df, cont = backtest_au_two_strategies_with_breakdown(excel_path)
    init_capital = float(pnl_df.attrs["init_capital"])

    print("策略列:", list(pnl_df.columns))
    print("INIT_CAPITAL =", init_capital)

    # -----------------
    # 1) 总体绩效(策略层)
    # -----------------
    strat_cols = ["FuturesOnly", "CoveredCall_3pct"]
    perf_strat = performance_table(pnl_df[strat_cols], init_capital=init_capital)

    print("\n=== 策略总体绩效 ===")
    with pd.option_context("display.float_format", "{:0.4%}".format):
        print(perf_strat[["total_return", "annual_return", "annual_vol", "sharpe", "max_drawdown"]])

    # -----------------
    # 2) 备兑分解:每个细项的“收益/风险”
    # -----------------
    cc_cols = ["CC_futures_leg", "CC_short_call", "CC_short_call_intrinsic", "CC_short_call_time"]
    perf_cc = performance_table(pnl_df[cc_cols], init_capital=init_capital)

    print("\n=== Covered Call 细项绩效(把收益/风险拆开看) ===")
    with pd.option_context("display.float_format", "{:0.4%}".format):
        print(perf_cc[["total_return", "annual_return", "annual_vol", "sharpe", "max_drawdown"]])

    # -----------------
    # 3) Covered Call 收益贡献(累计PnL)
    # -----------------
    cc_pnl_breakdown = pnl_contribution(
        pnl_df,
        total_col="CoveredCall_3pct",
        component_cols=["CC_futures_leg", "CC_short_call"],
    )

    call_sub_breakdown = pnl_contribution(
        pnl_df,
        total_col="CC_short_call",
        component_cols=["CC_short_call_intrinsic", "CC_short_call_time"],
    )

    print("\n=== Covered Call 收益贡献(累计PnL分解) ===")
    with pd.option_context("display.float_format", "{:0.4%}".format):
        print(cc_pnl_breakdown)

    print("\n=== 卖出Call 收益贡献(intrinsic vs time) ===")
    with pd.option_context("display.float_format", "{:0.4%}".format):
        print(call_sub_breakdown)

    # -----------------
    # 4) Covered Call 风险贡献(方差/波动分解)
    # -----------------
    # 4.1 futures vs call
    pnl_components = pnl_df[["CC_futures_leg", "CC_short_call", "CoveredCall_3pct"]].copy()
    rets = component_returns_by_total_nav(pnl_components, total_col="CoveredCall_3pct", init_capital=init_capital)
    risk_contrib = variance_risk_contribution(rets, total_col="CoveredCall_3pct")

    print("\n=== Covered Call 风险贡献(futures vs call) ===")
    with pd.option_context("display.float_format", "{:0.6f}".format):
        print(risk_contrib[["cov_with_total", "corr_with_total", "var_contrib_pct", "vol_contrib", "vol_total"]])

    # 4.2 futures vs intrinsic vs time(更细)
    pnl_components2 = pnl_df[
        ["CC_futures_leg", "CC_short_call_intrinsic", "CC_short_call_time", "CoveredCall_3pct"]
    ].copy()
    rets2 = component_returns_by_total_nav(pnl_components2, total_col="CoveredCall_3pct", init_capital=init_capital)
    risk_contrib2 = variance_risk_contribution(rets2, total_col="CoveredCall_3pct")

    print("\n=== Covered Call 风险贡献(futures vs intrinsic vs time) ===")
    with pd.option_context("display.float_format", "{:0.6f}".format):
        print(risk_contrib2[["cov_with_total", "corr_with_total", "var_contrib_pct", "vol_contrib", "vol_total"]])

    # -----------------
    # 5) 年度收益/回撤对比(策略层,跨年连续净值口径)
    # -----------------
    print("\n=== 年度收益率 & 最大回撤(跨年连续净值口径)===\n")
    yearly = []
    for col in strat_cols:
        df_y = yearly_return_and_maxdd(pnl_df[col], init_capital=init_capital)
        df_y.columns = [f"{col}_ret", f"{col}_maxdd"]
        yearly.append(df_y)

    yearly_df = pd.concat(yearly, axis=1)
    with pd.option_context("display.float_format", "{:0.4%}".format):
        print(yearly_df)

    # -----------------
    # 如需导出结果(可选)
    # -----------------
    # pnl_df.to_csv("au_backtest_pnl_breakdown.csv", encoding="utf-8-sig")
    # info_df.to_csv("au_cc_position_info.csv", encoding="utf-8-sig")

结果:

策略列 ['FuturesOnly', 'CoveredCall_3pct', 'CC_futures_leg', 'CC_short_call', 'CC_short_call_intrinsic', 'CC_short_call_time']
INIT_CAPITAL = 371900.0

=== 策略总体绩效 ===
                  total_return  annual_return  ...    sharpe  max_drawdown
name                                           ...                        
FuturesOnly          146.9212%       28.8453%  ... 240.2228%     -10.3087%
CoveredCall_3pct     126.9266%       25.2676%  ... 395.4176%      -5.6857%

[2 rows x 5 columns]

=== Covered Call 细项绩效把收益/风险拆开看 ===
                         total_return  annual_return  ...    sharpe  max_drawdown
name                                                  ...                        
CC_futures_leg              146.9212%       28.8453%  ... 240.2228%     -10.3087%
CC_short_call               -19.9946%       -4.3189%  ... -23.4895%     -44.1758%
CC_short_call_intrinsic     -56.3216%      -14.2812%  ... -36.4624%     -75.9882%
CC_short_call_time           36.3270%        8.9520%  ... 174.2273%      -4.0390%

[4 rows x 5 columns]

=== Covered Call 收益贡献累计PnL分解 ===
                      cum_pnl  pct_of_total_pnl
component                                      
CC_futures_leg 54640000.0000%         115.7529%
CC_short_call  -7436000.0000%         -15.7529%

=== 卖出Call 收益贡献intrinsic vs time ===
                                cum_pnl  pct_of_total_pnl
component                                                
CC_short_call_intrinsic -20946000.0000%         281.6837%
CC_short_call_time       13510000.0000%        -181.6837%

=== Covered Call 风险贡献futures vs call ===
                cov_with_total  corr_with_total  ...  vol_contrib  vol_total
component                                        ...                        
CC_futures_leg        0.000026         0.823272  ...     0.006361   0.004023
CC_short_call        -0.000009        -0.470401  ...    -0.002338   0.004023

[2 rows x 5 columns]

=== Covered Call 风险贡献futures vs intrinsic vs time ===
                         cov_with_total  ...  vol_total
component                                ...           
CC_futures_leg                 0.000026  ...   0.004023
CC_short_call_intrinsic       -0.000006  ...   0.004023
CC_short_call_time            -0.000003  ...   0.004023

[3 rows x 5 columns]

=== 年度收益率 & 最大回撤跨年连续净值口径===

      FuturesOnly_ret  ...  CoveredCall_3pct_maxdd
year                   ...                        
2022         10.0834%  ...                -5.6857%
2023         17.5916%  ...                -4.0484%
2024         28.0960%  ...                -3.4639%
2025         48.9103%  ...                -1.6770%

[4 rows x 4 columns]

Process finished with exit code 0

文档信息

Table of Contents