Skip to content

Latest commit

 

History

History
303 lines (236 loc) · 74.1 KB

README.md

File metadata and controls

303 lines (236 loc) · 74.1 KB

文件构成 (!!!Engish version see the "README_en.md" file in this project!!!)

English comments can be seen in the files

main.py

功用:

主程序文件,负责读取数据、运行BSM和Heston模型、计算误差率、并进行可视化展示。

依赖关系:

依赖BSM_Model和Heston_Model类来计算期权价格。 依赖pandas和matplotlib库进行数据处理和可视化。

重要部分:

数据读取和预处理。 运行BSM模型和Heston模型,计算误差率。 可视化误差率随时间的变化和两个模型的平均误差率比较。

bestpara.py

功用:

利用模拟退火算法寻找Heston模型的最优参数,使市场价格和模型价格的均方误差最小化。

依赖关系:

依赖Heston_Model类来计算期权价格。 依赖pandas库进行数据处理。

重要部分:

random_range函数:在一定范围内随机调整参数。 NG类:实现模拟退火算法的核心逻辑。 NGHeston类:继承自NG类,专门用于Heston模型参数优化。 SV_SA类:封装了模拟退火算法的具体实现。 getBestPara函数:读取数据并调用模拟退火算法寻找最优参数。

Heston_Model.py

功用:

定义Heston模型类,用于计算期权价格。

依赖关系:

依赖numpy和scipy.integrate库进行数学计算。

重要部分:

Heston_Model类:包含初始化方法和计算期权价格的核心方法。 characteristic_function方法:计算特征函数。 integral_function方法:计算积分部分。 P_Value方法:计算p值。 Call_Value方法:计算看涨期权价格。

BSM_Model.py

功用:

定义BSM模型类,用于计算期权价格。

依赖关系:

依赖scipy.stats和numpy库进行数学计算。

重要部分:

BSM_Model类:包含初始化方法和计算期权价格的核心方法。 d1和d2方法:计算中间变量d1和d2。 price方法:计算看涨或看跌期权价格。 互相的依赖关系 main.py依赖BSM_Model.py和Heston_Model.py来计算期权价格。 bestpara.py依赖Heston_Model.py来计算期权价格,并通过模拟退火算法优化Heston模型的参数。

运行方法

BSM模型仅需要期权本身参数,可直接调用BSM_Model.py

Heston模型除了需要期权本身参数外,还需要拟合参数,该拟合参数由bestpara.py得到。先运行bestpara.py得到较优数据(即当前历史最优解)后,再运行main.py进行测试和绘图。

image-20240823161646743

关于数据:16年至20年上证50ETF期权数据。在这里使用了2020-6-12至2020-9-12的数据用于训练拟合参数;2020-9-12至2020-12-12的数据为测试数据。

关于训练拟合参数:论文在1w多次循环后才得到结果,我这边跑了160个循环,循环越多理论上效果更好。

image-20240823161050555

关于可视化绘图:

  • BSM模型与Heston模型的平均误差率随时间变化图(折线图)
  • BSM模型与Heston模型的总平均误差率比较(柱状图)

复杂文件bestpara.py的构成

  1. 定义辅助函数
    • random_range(x, a, b):对输入的 x 增加一个随机变动量,并确保其在 [a, b] 范围内。
  2. 定义模拟退火算法类 NG
    • __init__(self, func, x0):初始化算法的参数,包括初始解 x0、目标函数 func、温度参数等。
    • T_change(self):温度下降函数。
    • save_xy(self)save_best_xy(self):保存当前解和最优解。
    • __p_delta(self, alpha)__find_solver(self, func, f0):计算接受新解的概率和用二分法寻找方程解。
    • find_alpha(self):寻找调节概率因子 alpha
    • get_x_new(self):生成新的解。
    • judge(self):判断是否接受新的解。
    • get_history_best_xy(self):获取历史最优解。
    • plot_best(self):绘制最优值变化过程。
    • count_times_delta_smaller(self):统计新旧函数值之差的绝对值连续小于某值的次数。
    • condition_end(self):判断是否满足终止条件。
    • run_T(self):在某一特定温度下进行循环。
    • accept_best_xf(self):在每个温度下的循环结束时,有一定概率将当前接受的新解替换为历史最优解。
    • run(self):运行模拟退火算法。
  3. 定义继承自 NGNGHeston
    • __init__(self, func, x0):初始化参数,设置特定的温度参数。
    • get_x_new(self):生成新的解,并对 Heston 模型的参数进行范围限制。
  4. 定义 SV_SA
    • __init__(self, data, v0, kappa, theta, sigma, rho):初始化数据和初始参数。
    • error_mean_percent(self, init_params):计算 Heston 模型期权定价的百分比误差均值。
    • error_mean(self, init_params):计算 Heston 模型期权定价的均方误差。
    • test_error_mean(self, multiple_parmas):计算多组初始参数的均方误差。
    • test_option_price(self, multiple_parmas):计算多组期权数据和初始参数的期权价格。
    • sa(self):对均方误差函数用模拟退火算法计算最优值。
  5. 定义主函数 getBestPara()
    • 读取期权数据,选择指定日期和类型的数据。
    • 建立 SV_SA 类并开始训练模型,寻找最优解。
    • 返回最优解。

在该文件中涉及到一些复杂函数。

run 函数

代码

def run(self):
    while self.T > self.T_min:
        self.run_T()  # 循环在该温度下的求解
        self.xf_best_T[self.T] = [
            self.get_history_best_xy()
        ]  # 记录在每一个温度下的最优解
        self.T_change()  # 温度继续下降
        self.accept_best_xf()  # 当每个温度下的循环结束时,有一定概率将当前接受的新解替换为历史最优解
        if self.condition_end() == True:  # 如果满足终止条件,终止该温度循环
            break

解释

run 函数是模拟退火算法的核心循环。它通过不断降低温度来寻找最优解。具体步骤如下:

  1. 温度循环:当当前温度 T 大于最小温度 T_min 时,进入循环。
  2. 在当前温度下求解:调用 run_T 函数,在当前温度下进行求解。
  3. 记录最优解:将当前温度下的最优解记录在 xf_best_T 字典中。
  4. 温度下降:调用 T_change 函数,降低温度。
  5. 接受最优解:调用 accept_best_xf 函数,有一定概率将当前接受的新解替换为历史最优解。
  6. 终止条件:如果满足终止条件(调用 condition_end 函数),则跳出循环。

run_T 函数

代码

def run_T(self):
    for time_ in range(self.times_max):
        self.time_ = time_
        self.x_new = self.get_x_new()  # 获得新解
        self.f_new = self.func(self.x_new)  # 获得新的函数值
        self.save_xy()  # 将新解和函数值记录下来
        self.delta = self.f_new - self.f  # 计算函数值的变化值
        self.judge()  # 判断是否接受新解
        self.times_cycle += 1  # 统计循环次数
        self.delta_best = np.abs(
            self.f - self.f_last
        )  # 上次函数值与这次函数值的差值绝对值
        self.count_times_delta_smaller()  # 统计新旧函数值之差的绝对值连续小于此值的次数
        if self.condition_end() == True:  # 如果满足终止条件,终止该温度循环
            print(
                "满足终止条件:接受新解后的函数值变化连续小于{}达到次数".format(
                    self.delta_min
                )
            )
            break
        print(
            "当前历史最优解{}:{}".format(self.f_best, self.x_best)
        )  # 展示当前最优值
        print("当前接受的新解{}:{}".format(self.f, self.x))  # 展示当前接受的新解
        print("当前新解{}:{}".format(self.f_new, self.x_new))  # 展示当前新产生的解
        print("当前温度为{}".format(self.T))  # 展示当前温度

解释

run_T 函数在特定温度下进行循环求解。具体步骤如下:

  1. 循环次数:在每个温度下,循环 times_max 次。
  2. 获得新解:调用 get_x_new 函数,生成新的解 x_new
  3. 计算新函数值:使用目标函数 func 计算新的函数值 f_new
  4. 记录新解和函数值:调用 save_xy 函数,将新解和函数值记录下来。
  5. 计算函数值变化:计算新旧函数值的变化 delta
  6. 判断是否接受新解:调用 judge 函数,判断是否接受新解。
  7. 统计循环次数:增加循环次数 times_cycle
  8. 计算函数值差值:计算上次函数值与这次函数值的差值绝对值 delta_best
  9. 统计差值次数:调用 count_times_delta_smaller 函数,统计新旧函数值之差的绝对值连续小于某值的次数。
  10. 终止条件:如果满足终止条件(调用 condition_end 函数),则跳出循环。
  11. 打印信息:打印当前最优解、接受的新解、新产生的解和当前温度。

judge 函数

代码

def judge(self):
    if self.delta < 0:  # 如果函数值变动幅度小于0,则接受新解
        self.x = self.x_new
        self.f_last = self.f  # 在最优解函数值更新之前将其记录下来
        self.f = self.f_new
        self.save_best_xy()  # 记录每次循环接受的新解
        self.get_history_best_xy()  # 更新历史最优解
        self.times_stay = 0  # 由于未接受新解,将连续未接受新解的次数归零
        print(
            "由于函数值变小新接受解{}:{}".format(self.f, self.x)
        )  # 展示当前接受的新解
    else:
        p = np.exp(-self.delta / (self.T * self.alpha))  # 接受新解的概率
        p_ = np.random.random()  # 判断标准概率
        if p > p_:  # 如果概率足够大,接受新解
            self.x = self.x_new
            self.f_last = self.f  # 在接受的新解更新之前将其记录下来
            self.f = self.f_new
            self.save_best_xy()  # 记录每次循环接受的新解
            self.get_history_best_xy()  # 更新历史最优解
            print(
                "由于概率{}大于{},新接受解{}:{}".format(p, p_, self.f, self.x)
            )  # 展示当前接受的新解
            self.times_p += 1  # 统计因为概率而接受新解的次数
            self.times_stay = 0  # 由于未接受新解,将连续未接受新解的次数归零
        else:
            if self.time_ == 0:
                self.f_last = self.f  # 在接受的新解更新之前将其记录下来
            self.times_stay += 1  # 连续接受新解次数加1
            print("连续未接受新解{}次".format(self.times_stay))

解释

judge 函数用于判断是否接受新的解 x_new。具体步骤如下:

  1. 函数值变小:如果新函数值 f_new 小于当前函数值 f,则直接接受新解。
    • 更新当前解 x 和函数值 f
    • 记录最优解。
    • 更新历史最优解。
    • 将连续未接受新解的次数 times_stay 归零。
    • 打印接受的新解信息。
  2. 函数值变大:如果新函数值 f_new 大于当前函数值 f,则根据概率判断是否接受新解。
    • 计算接受新解的概率 p
    • 生成一个随机概率 p_
    • 如果 p 大于 p_,则接受新解。
      • 更新当前解 x 和函数值 f
      • 记录最优解。
      • 更新历史最优解。
      • 打印接受的新解信息。
      • 统计因为概率而接受新解的次数 times_p
      • 将连续未接受新解的次数 times_stay 归零。
    • 如果 p 小于 p_,则不接受新解。
      • 如果是第一次循环,将当前函数值 f 记录为 f_last
      • 增加连续未接受新解的次数 times_stay
      • 打印未接受新解的次数。

get_history_best_xy 函数

代码

def get_history_best_xy(self):
    x_array = list(
        np.array(list(self.xf_best_all.values()),dtype=object)[:, 0]
    )  # 从历史所有的最优x和f中获得所有的x
    f_array = list(
        np.array(list(self.xf_best_all.values()),dtype=object)[:, 1]
    )  # 从历史所有的最优x和f中获得所有的f
    self.f_best = min(f_array)  # 从每阶段最优的f中获得最优的f
    self.x_best = x_array[f_array.index(self.f_best)]  # 利用最优f反推最优x
    return self.x_best, self.f_best

解释

get_history_best_xy 函数用于获取历史最优解。具体步骤如下:

  1. 获取所有历史最优解:从 xf_best_all 字典中提取所有的 xf
  2. 找到最优函数值:从所有的 f 中找到最小的 f_best
  3. 找到对应的最优解:根据最优函数值 f_best 找到对应的 x_best
  4. 返回最优解:返回最优解 x_best 和最优函数值 f_best

getdata.py

** 功用:**

可用于用户自己改造,以便于用户从数据库(Tushare)等获得自己所需的数据以进行研究或应用。

重要通知

运行 main.py 可能需要一些时间,因为受计算机硬件条件的影响,模拟退火算法非常耗时。此外,模拟退火算法通常需要较多的迭代次数,但由于计算机硬件条件的限制,我迭代的次数较少,这可能会对结果产生一些影响。