0%

postgreSQL

库表设计

数据库:cryptoDB
行情表:kline_1m 分区一:日 分区二:[标的名哈希,10]
# ======================================
_date date
_symbol varchar(30)
_timestamp timestamp
_open float8
_high float8
_low float8
_close float8
_vol float8
# ======================================

数据库:factorDB
日频因子表:fator_1d 分区一:年 分区二:[标的名哈希,10]
因子列表:[市值 marketcap]
# ======================================
_year int
_factor varchar(30)
_timestamp timestamp
_symbol varchar(30)
_val float8
# ======================================

数据库:factorDB
5分钟频因子表:fator_5m 分区一:日 分区二:[标的名哈希,10]
因子列表:[全局多空比 global_lsr,大户多空比 top_lsr,大户多空持仓比 top_lsr_position,持仓量 open_interest 资费 fundingrate]
# ======================================
_date date
_factor varchar(30)
_timestamp timestamp
_symbol varchar(30)
_val float8
# ======================================


数据库:optionDB
表:option_data 分区一:日 分区二:[标的名哈希,25]
# ======================================
_date date
_symbol varchar(30)
_timestamp timestamp
_type varchar(10)
_strike_price float8
_expiration timestamp
_open_interest float8
_last_price float8
_bid_price float8
_bid_amount float8
_bid_iv float8
_ask_price float8
_ask_amount float8
_ask_iv float8
_mark_price float8
_mark_iv float8
_underlying_index varchar(30)
_underlying_price float8
_delta float8
_gamma float8
_vega float8
_theta float8
_rho float8
# ======================================
exchange,symbol,timestamp,local_timestamp,type,strike_price,expiration,open_interest,last_price,bid_price,bid_amount,bid_iv,ask_price,ask_amount,ask_iv,mark_price,mark_iv,underlying_index,underlying_price,delta,gamma,vega,theta,rho
deribit,BTC-27SEP24-160000-P,1704067200027000,1704067200029840,put,160000,1727424000000000,0,2.45,,,,,,,2.4467,75.18,BTC-27SEP24,46593.7,-0.94333,0,45.67483,-6.35109,-1169.77956

接口设计

查询kline

_conn_pool = connector.init_conn_pool()
_list = connector.execute_sql(_conn_pool, """select * from kline_1m where _symbol='BTC/USDT:USDT'""")
df = pd.DataFrame(_list, columns=['_date', '_symbol', '_timestamp', '_open', '_high', '_low', '_close', '_vol'])

查询5分钟因子

_conn_pool = connector.init_conn_pool("factordb")
_list = connector.execute_sql(_conn_pool, """select * from factor_5m where _symbol='BTC/USDT:USDT' order by _timestamp""")
df = pd.DataFrame(_list, columns=['_date', '_factor', '_timestamp', '_symbol', '_val',])

PS命令行

连接阿里云数据库:
ssh Administrator@47.74.5.152

拷贝文件至本地:
scp -r Administrator@47.74.5.152:C:/Users/Administrator\Desktop\crypto_database\U PERP D:\BackUps\kLines
scp -r Administrator@47.74.5.152:"/C:/Users/Administrator/Desktop/crypto_database/U PERP/" "D:/BackUps/kLines/"
登录:
psql -h <主机名> -p <端口号> -U <用户名> -d <数据库名>
psql -h localhost -p 5432 -U postgres -d cryptoDB

一般性
\bind [PARAM]... set query parameters
\copyright 显示PostgreSQL的使用和发行许可条款
\crosstabview [COLUMNS] execute query and display result in crosstab
\errverbose 以最冗长的形式显示最近的错误消息
\g [(OPTIONS)] [FILE] execute query (and send result to file or |pipe);
\g with no arguments is equivalent to a semicolon
\gdesc 描述查询结果,而不执行它
\gexec 执行策略,然后执行其结果中的每个值
\gset [PREFIX] execute query and store result in psql variables
\gx [(OPTIONS)] [FILE] 就像\g,但强制扩展输出模式
\q 退出 psql
\watch [[i=]SEC] [c=N] execute query every SEC seconds, up to N times

帮助
\? [commands] 显示反斜线命令的帮助
\? options 显示 psql 命令行选项的帮助
\? variables 显示特殊变量的帮助
\h [NAME] SQL命令语法上的说明,用*显示全部命令的语法说明

查询缓存区
\e [FILE] [LINE] 使用外部编辑器编辑查询缓存区(或文件)
\ef [FUNCNAME [LINE]] 使用外部编辑器编辑函数定义
\ev [VIEWNAME [LINE]] 用外部编辑器编辑视图定义
\p 显示查询缓存区的内容
\r 重置(清除)查询缓存区
\w 文件 将查询缓存区的内容写入文件

输入/输出
\copy ... 执行 SQL COPY,将数据流发送到客户端主机
\echo [-n] [STRING] 将字符串写到标准输出(-n表示没有换行符)
\i 文件 从文件中执行命令
\ir FILE 与 \i类似, 但是相对于当前脚本的位置
\o [文件] 将全部查询结果写入文件或 |管道
\qecho [-n] [STRING] 将字符串写入\o输出流(-n表示无换行)
\warn [-n] [STRING] 将字符串写入标准错误(-n 表示无换行)

条件
\if EXPR 开始条件块
\elif EXPR 当前条件块内的备选方案
\else 当前条件块内的最终备选方案
\endif 条件块的结尾

资讯性
(选项: S = 显示系统对象, + = 其余的详细信息)
\d[S+] 列出表,视图和序列
\d[S+] 名称 描述表,视图,序列,或索引
\da[S] [模式] 列出聚合函数
\dA[+] [模式] 列出访问方法
\dAc[+] [AMPTRN [TYPEPTRN]] 列出运算符
\dAf[+] [AMPTRN [TYPEPTRN]] 列出运算符集合
\dAo[+] [AMPTRN [OPFPTRN]] 列出运算符集合
\dAp[+] [AMPTRN [OPFPTRN]] 列出运算符集合所支持的功能
\db[+] [模式] 列出表空间
\dc[S+] [模式] 列表转换
\dconfig[+] [PATTERN] list configuration parameters
\dC[+] [模式] 列出类型强制转换
\dd[S] [模式] 显示没有在别处显示的对象描述
\dD[S+] [模式] 列出共同值域
\ddp [模式] 列出默认权限
\dE[S+] [模式] 列出引用表
\des[+] [模式] 列出外部服务器
\det[+] [模式] 列出引用表
\deu[+] [模式] 列出用户映射
\dew[+] [模式] 列出外部数据封装器
\df[anptw][S+] [FUNCPTRN [TYPEPTRN ...]]
列出 [only agg/normal/procedure/trigger/window] 函数
\dF[+] [模式] 列出文本搜索配置
\dFd[+] [模式] 列出文本搜索字典
\dFp[+] [模式] 列出文本搜索解析器
\dFt[+] [模式] 列出文本搜索模版
\dg[S+] [模式] 列出角色
\di[S+] [模式] 列出索引
\dl[+] list large objects, same as \lo_list
\dL[S+] [模式] 列出所有过程语言
\dm[S+] [模式] 列出所有物化视图
\dn[S+] [模式] 列出所有模式
\do[S+] [OPPTRN [TYPEPTRN [TYPEPTRN]]]
列出运算符
\dO[S+] [模式] 列出所有校对规则
\dp[S] [PATTERN] list table, view, and sequence access privileges
\dP[itn+] [PATTERN] 列出[仅表/索引]分区关系[n=nested]
\drds [ROLEPTRN [DBPTRN]] list per-database role settings
\drg[S] [PATTERN] list role grants
\dRp[+] [模式] 列出复制发布
\dRs[+] [模式] 列出复制订阅
\ds[S+] [模式] 列出序列
\dt[S+] [模式] 列出表
\dT[S+] [模式] 列出数据类型
\du[S+] [模式] 列出角色
\dv[S+] [模式] 列出视图
\dx[+] [模式] 列出扩展
\dX [PATTERN] 列出扩展统计信息
\dy[+] [PATTERN] 列出所有事件触发器
\l[+] [模式] 列出所有数据库
\sf[+] FUNCNAME 显示一个函数的定义
\sv[+] VIEWNAME 显示一个视图的定义
\z[S] [PATTERN] same as \dp

大对象
\lo_export LOBOID FILE write large object to file
\lo_import FILE [COMMENT]
read large object from file
\lo_list[+] list large objects
\lo_unlink LOBOID delete a large object

格式化
\a 在非对齐模式和对齐模式之间切换
\C [字符串] 设置表的标题,或如果没有的标题就取消
\f [字符串] 显示或设定非对齐模式查询输出的字段分隔符
\H 切换HTML输出模式 (目前是 关闭)
\pset [NAME [VALUE]] 设置表输出选项
(border|columns|csv_fieldsep|expanded|fieldsep|
fieldsep_zero|footer|format|linestyle|null|
numericlocale|pager|pager_min_lines|recordsep|
recordsep_zero|tableattr|title|tuples_only|
unicode_border_linestyle|unicode_column_linestyle|
unicode_header_linestyle
\t [开|关] 只显示记录 (目前是关闭)
\T [字符串] 设置HTML <表格>标签属性, 或者如果没有的话取消设置
\x [on|off|auto] 切换扩展输出模式(目前是 关闭)

连接
\c[onnect] {[DBNAME|- USER|- HOST|- PORT|-] | conninfo}
连接到新数据库(当前是"postgres"
\conninfo 显示当前连接的相关信息
\encoding [编码名称] 显示或设定客户端编码
\password [USERNAME] 安全地为用户更改口令

操作系统
\cd [目录] 更改目前的工作目录
\getenv PSQLVAR ENVVAR fetch environment variable
\setenv NAME [VALUE] 设置或清空环境变量
\timing [开|关] 切换命令计时开关 (目前是关闭)
\! [命令] 在 shell中执行命令或启动一个交互式shell

变量
\prompt [文本] 名称 提示用户设定内部变量
\set [名称 [值数]] 设定内部变量,若无参数则列出全部变量
\unset 名称 清空(删除)内部变量

批量写入

import psycopg2

# 连接到 PostgreSQL 数据库
conn = psycopg2.connect(database="your_db", user="your_user", password="your_password", host="localhost", port="5432")
cur = conn.cursor()

# 定义要插入的数据
data = [('value1', 'value2'), ('value3', 'value4')]

try:
# 开始事务
cur.execute("BEGIN TRANSACTION;")

# 构建 INSERT INTO 语句并执行
query = "INSERT INTO target_table (column1, column2) VALUES %s;"
execute_values(query, data, page_size=1000)

# 提交事务
cur.execute("COMMIT;")
except Exception as e:
print("Error occurred during batch insertion: ", str(e))
# 如果发生错误则回滚事务
cur.execute("ROLLBACK;")
finally:
# 关闭数据库连接
cur.close()
conn.close()

(建库表备份)

  • cryptodb kline_1m
CREATE TABLE kline_1m (
_date DATE NOT NULL,
_symbol VARCHAR(30) NOT NULL,
_timestamp timestamp NOT NULL,
_open float8,
_high float8,
_low float8,
_close float8,
_vol float8
) PARTITION BY RANGE (_date);


DO $$
DECLARE
start_date DATE := '2019-01-01';
end_date DATE := '2026-01-01';
cur_date DATE := start_date; -- 将 current_date 改为 cur_date
BEGIN
WHILE cur_date < end_date LOOP
-- 创建每日的分区
EXECUTE format('CREATE TABLE kline_1m_%s PARTITION OF kline_1m FOR VALUES FROM (%L) TO (%L) PARTITION BY HASH (_symbol);',
to_char(cur_date, 'YYYYMMDD'),
cur_date,
cur_date + 1);

-- 为每日的分区创建哈希子分区
FOR i IN 0..9 LOOP
EXECUTE format('CREATE TABLE kline_1m_%s_symbol_%s PARTITION OF kline_1m_%s FOR VALUES WITH (MODULUS 10, REMAINDER %s);',
to_char(cur_date, 'YYYYMMDD'), i,
to_char(cur_date, 'YYYYMMDD'), i);
END LOOP;

cur_date := cur_date + 1; -- 同样将 current_date 改为 cur_date
END LOOP;
END $$;


SELECT relname, pg_get_expr(relpartbound, oid)
FROM pg_class
WHERE relpartbound is not null;
  • factordb factor_5m
CREATE TABLE factor_5m (
_date DATE NOT NULL,
_factor VARCHAR(30) NOT NULL,
_timestamp timestamp NOT NULL,
_symbol VARCHAR(30) NOT NULL,
_val float8,
) PARTITION BY RANGE (_date);


DO $$
DECLARE
start_date DATE := '2023-01-01';
end_date DATE := '2026-01-01';
cur_date DATE := start_date;
BEGIN
WHILE cur_date < end_date LOOP
EXECUTE format('CREATE TABLE factor_5m_%s PARTITION OF factor_5m FOR VALUES FROM (%L) TO (%L) PARTITION BY HASH (_symbol);',
to_char(cur_date, 'YYYYMMDD'),
cur_date,
cur_date + 1);

-- 为每日的分区创建哈希子分区
FOR i IN 0..9 LOOP
EXECUTE format('CREATE TABLE factor_5m_%s_symbol_%s PARTITION OF factor_5m_%s FOR VALUES WITH (MODULUS 10, REMAINDER %s);',
to_char(cur_date, 'YYYYMMDD'), i,
to_char(cur_date, 'YYYYMMDD'), i);
END LOOP;

cur_date := cur_date + 1; -- 同样将 current_date 改为 cur_date
END LOOP;
END $$;
  • factordb factor_1d
CREATE TABLE factor_1d (
_year int2 NOT NULL,
_factor VARCHAR(30) NOT NULL,
_timestamp timestamp NOT NULL,
_symbol VARCHAR(30) NOT NULL,
_val float8,
) PARTITION BY RANGE (_year);

DO $$
DECLARE
start_year INT := 2019;
end_year INT := 2026;
cur_year INT := start_year;
BEGIN
WHILE cur_year <= end_year LOOP
EXECUTE format('CREATE TABLE factor_1d_%s PARTITION OF factor_1d FOR VALUES FROM (%L) TO (%L) PARTITION BY HASH (_symbol);',
cur_year, -- 直接使用年份
cur_year,
cur_year + 1);

-- 为每年的分区创建哈希子分区
FOR i IN 0..9 LOOP
EXECUTE format('CREATE TABLE factor_1d_%s_symbol_%s PARTITION OF factor_1d_%s FOR VALUES WITH (MODULUS 10, REMAINDER %s);',
cur_year, i, -- 直接使用年份
cur_year, i);
END LOOP;

cur_year := cur_year + 1; -- 将年份加1
END LOOP;
END $$;
  • optiondb option_data
CREATE TABLE option_data (
_date date NOT NULL,
_symbol VARCHAR(30) NOT NULL,
_timestamp timestamp,
_type VARCHAR(10),
_strike_price float8,
_expiration timestamp,
_open_interest float8,
_last_price float8,
_bid_price float8,
_bid_amount float8,
_bid_iv float8,
_ask_price float8,
_ask_amount float8,
_ask_iv float8,
_mark_price float8,
_mark_iv float8,
_underlying_index VARCHAR(30),
_underlying_price float8,
_delta float8,
_gamma float8,
_vega float8,
_theta float8,
_rho float8,
) PARTITION BY RANGE (_date);


DO $$
DECLARE
start_date DATE := '2023-01-01';
end_date DATE := '2026-01-01';
cur_date DATE := start_date;
BEGIN
WHILE cur_date < end_date LOOP
EXECUTE format('CREATE TABLE option_data_%s PARTITION OF option_data FOR VALUES FROM (%L) TO (%L) PARTITION BY HASH (_symbol);',
to_char(cur_date, 'YYYYMMDD'),
cur_date,
cur_date + 1);

-- 为每日的分区创建哈希子分区
FOR i IN 0..24 LOOP
EXECUTE format('CREATE TABLE option_data_%s_symbol_%s PARTITION OF option_data_%s FOR VALUES WITH (MODULUS 25, REMAINDER %s);',
to_char(cur_date, 'YYYYMMDD'), i,
to_char(cur_date, 'YYYYMMDD'), i);
END LOOP;

cur_date := cur_date + 1; -- 同样将 current_date 改为 cur_date
END LOOP;
END $$;
  • factordb factor_1m
CREATE TABLE factor_1m (
_date DATE NOT NULL,
_factor VARCHAR(30) NOT NULL,
_timestamp timestamp NOT NULL,
_symbol VARCHAR(30) NOT NULL,
_val float8,
) PARTITION BY RANGE (_date);

DO $$
DECLARE
start_date DATE := '2023-01-01';
end_date DATE := '2026-01-01';
cur_date DATE := start_date;
BEGIN
WHILE cur_date < end_date LOOP
EXECUTE format('CREATE TABLE factor_1m_%s PARTITION OF factor_1m FOR VALUES FROM (%L) TO (%L) PARTITION BY HASH (_symbol);',
to_char(cur_date, 'YYYYMMDD'),
cur_date,
cur_date + 1);

FOR i IN 0..9 LOOP
EXECUTE format('CREATE TABLE factor_1m_%s_symbol_%s PARTITION OF factor_1m_%s FOR VALUES WITH (MODULUS 10, REMAINDER %s);',
to_char(cur_date, 'YYYYMMDD'), i,
to_char(cur_date, 'YYYYMMDD'), i);
END LOOP;

cur_date := cur_date + 1;
END LOOP;
END $$;
import numpy as np
import pandas as pd
from gplearn import fitness
# 数据集
from gplearn.genetic import SymbolicRegressor

train_data = pd.read_csv('../data/IC_train.csv', index_col=0, parse_dates=[0])
test_data = pd.read_csv('../data/IC_test.csv', index_col=0, parse_dates=[0])
feature_names = list(train_data.columns)
train_data.loc[:, 'y'] = np.log(train_data['Close'].shift(-4) / train_data['Close'])
train_data.dropna(inplace=True)

from examples.backtest import BackTester

class SymbolicTestor(BackTester): # 回测的设定
def init(self):
self.params = {'factor': pd.Series}

@BackTester.process_strategy
def run_(self, *args, **kwargs) -> dict[str: int]:
factor = np.array(self.params['factor'])
long_cond = factor > 0
short_cond = factor < 0
self.backtest_env['signal'] = np.where(long_cond, 1, np.where(short_cond, -1, np.nan))
self.construct_position_(keep_raw=True, max_holding_period=1200, take_profit=None, stop_loss=None)

# 回测环境(适应度函数)
comm = [0 / 10000, 0 / 10000] # 买卖费率
bt = SymbolicTestor(train_data, transact_base='PreClose', commissions=(comm[0], comm[1])) # 加载数据,根据Close成交,comm是买-卖

def score_func_basic(y, y_pred, sample_weight): # 因子评价指标
try:
_ = bt.run_(factor=y_pred)
factor_ret = _['annualized_mean']/_['max_drawdown'] if _['max_drawdown'] != 0 else 0 # 可以把max_drawdown换成annualized_std
except:
factor_ret = 0
return factor_ret

def my_gplearn(function_set, my_fitness, pop_num=100, gen_num=3, tour_num=10, random_state = 42, feature_names=None):
# pop_num, gen_num, tour_num的几个可选值:500, 5, 50; 1000, 3, 20; 1000, 15, 100
metric = fitness.make_fitness(function=my_fitness, # function(y, y_pred, sample_weight) that returns a floating point number.
greater_is_better=True, # 上述y是输入的目标y向量,y_pred是genetic program中的预测值,sample_weight是样本权重向量
wrap=False) # 不保存,运行的更快 # gplearn.fitness.make_fitness(function, greater_is_better, wrap=True)
return SymbolicRegressor(population_size=pop_num, # 每一代公式群体中的公式数量 500,100
generations=gen_num, # 公式进化的世代数量 10,3
metric=metric, # 适应度指标,这里是前述定义的通过 大于0做多,小于0做空的 累积净值/最大回撤 的评判函数
tournament_size=tour_num, # 在每一代公式中选中tournament的规模,对适应度最高的公式进行变异或繁殖 50
function_set=function_set,
const_range=(-1.0, 1.0), # 公式中包含的常数范围
parsimony_coefficient='auto',
# 对较大树的惩罚,默认0.001,auto则用c = Cov(l,f)/Var( l), where Cov(l,f) is the covariance between program size l and program fitness f in the population, and Var(l) is the variance of program sizes.
stopping_criteria=100.0, # 是对metric的限制(此处为收益/回撤)
init_depth=(2, 3), # 公式树的初始化深度,树深度最小2层,最大6层
init_method='half and half', # 树的形状,grow生分枝整的不对称,full长出浓密
p_crossover=0.8, # 交叉变异概率 0.8
p_subtree_mutation=0.05, # 子树变异概率
p_hoist_mutation=0.05, # hoist变异概率 0.15
p_point_mutation=0.05, # 点变异概率
p_point_replace=0.05, # 点变异中每个节点进行变异进化的概率
max_samples=1.0, # The fraction of samples to draw from X to evaluate each program on.
feature_names=feature_names, warm_start=False, low_memory=False,
n_jobs=1,
verbose=1,
random_state=random_state)

# 函数集
function_set=['add', 'sub', 'mul', 'div', 'sqrt', 'log', # 用于构建和进化公式使用的函数集
'abs', 'neg', 'inv', 'sin', 'cos', 'tan', 'max', 'min',
# 'if', 'gtpn', 'andpn', 'orpn', 'ltpn', 'gtp', 'andp', 'orp', 'ltp', 'gtn', 'andn', 'orn', 'ltn', 'delayy', 'delta', 'signedpower', 'decayl', 'stdd', 'rankk'
] # 最后一行是自己的函数,目前不用自己函数效果更好

my_cmodel_gp = my_gplearn(function_set, score_func_basic, random_state=0,
feature_names=feature_names) # 可以通过换random_state来生成不同因子
my_cmodel_gp.fit(train_data.loc[:, :'rank_num'].values, train_data.loc[:, 'y'].values)
print(my_cmodel_gp)

# 策略结果
factor = my_cmodel_gp.predict(test_data.values)
bt_test = SymbolicTestor(test_data, transact_base='PreClose', commissions=(comm[0], comm[1])) # 加载数据,根据Close成交,comm是买-卖
bt_test.run_(factor=factor)
md = bt_test.summary()
print(md.out_stats)
print(bt.fees_factor)
md.plot_(comm=comm, show_bool=True)

函数

  • 获取所有分区表
SELECT 
pg_class.relname AS partition_table_name
FROM
pg_inherits
JOIN
pg_class ON pg_inherits.inhrelid = pg_class.oid
WHERE
pg_inherits.inhparent = 'option_data'::regclass;
  • 新建表空间
create tablespace pg_2 owner postgres location 'G:\DevTools\PostgreSQL\16\data';
  • 批量更换表空间
DO $$
DECLARE
start_date DATE := '2021-01-01';
end_date DATE := '2023-01-01';
cur_date DATE := start_date;
BEGIN
WHILE cur_date < end_date LOOP

FOR i IN 0..24 LOOP
EXECUTE format('ALTER TABLE option_data_%s_symbol_%s SET TABLESPACE database_2;',
to_char(cur_date, 'YYYYMMDD'), i);
END LOOP;

cur_date := cur_date + 1;
END LOOP;
END $$;
  • 添加索引
navicat -> 字段 -> 索引 -> 添加索引 ->  名:idx_timestamp  字段:_timestamp  索引方法:B-Tree