利用Python把港交所買回來的期指交易紀錄整理成OHLC

#1 讀取CSV檔
#2 設定dataframe的每一行的data type.
#3 過濾掉沒有用的交易
#4 儲存成為tick CSV

下面是convert_hkex_data_to_csv.py


import os, sys, pandas as pd, numpy as np, multiprocessing as mp, datetime
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from pathlib import Path
import data_action_config as config

def formatting_csv_data(month_str, data_folder_path, tr_csv_file, tr_aht_csv_file):
start_time = datetime.datetime.now()
print(month_str, "function formatting_csv_data:", month_str, data_folder_path, tr_csv_file, tr_aht_csv_file)
is_target_file_exist = False
if tr_csv_file is not None:
target_csv_path = config.HKEX_INDEX_FUTURE_CSV_DATA_DIR + "tick\\" +month_str + "_HSI_tick.csv"
target_csv = Path(target_csv_path)
if target_csv.is_file():
is_target_file_exist = True
if not is_target_file_exist:
print(month_str, "formatted csv file not exist. Start formatting")
month_int = int(month_str)
columns = []
converters = []
format_version = -1
if month_int<=200412:
columns = ["ticker", "future_option", "expiry_month", "strike_price", "call_put", "date_datetime",
"time", "price", "quantity"]
converters = {"ticker":str, "future_option":str, "expiry_month":str, "strike_price":int, "call_put":str, "date":int,
"time":int, "price":int, "quantity":int}
format_version = 1
elif month_int<=201310:
columns = ["ticker", "future_option", "expiry_month", "strike_price", "call_put", "date_datetime",
"time", "price", "quantity", "trade_type"]
converters = {"ticker":str, "future_option":str, "expiry_month":str, "strike_price":int, "call_put":str, "date":int,
"time":int, "price":int, "quantity":int, "trade_type":int}
format_version = 2
else:
columns = ["ticker", "future_option", "expiry_month", "strike_price", "call_put", "date_datetime",
"time", "price", "quantity", "trade_type"]
converters = {"ticker":str, "future_option":str, "expiry_month":str, "strike_price":int, "call_put":str, "date":int,
"time":int, "price":int, "quantity":int, "trade_type":int}
format_version = 3

print(month_str, "format version:", format_version)
df1 = df2 = None
if tr_csv_file is not None:
df1 = pd.read_csv(data_folder_path + tr_csv_file, names=columns, index_col=False, parse_dates=['date_datetime'])
else:
print("tr_csv_file:",tr_csv_file,"not found")

if tr_aht_csv_file is not None:
print(month_str, "tr_aht_csv_file:", tr_aht_csv_file, "found")
df2 = pd.read_csv(data_folder_path + tr_aht_csv_file, names=columns, index_col=False, parse_dates=['date_datetime'])
else:
print(month_str, "tr_aht_csv_file:", tr_aht_csv_file, "not found")

if df2 is None:
df = df1
elif df1 is None:
df = df2
else:
print(month_str, "concat tr_csv_file and tr_aht_csv_file")
df = pd.concat([df1, df2])
print(month_str, "concat finish")

if format_version == 1:
print(month_str, "insert trade_type for format version 1 data")
df['trade_type'] = 2

#Select future data
print(month_str, "select future data")
df['future_option'] = df['future_option'].astype(str)
df = pd.DataFrame(df.ix[(df['future_option'] == "F")])

print(month_str, "create date time related column")
df['date'] = df.apply(lambda x: x['date_datetime'].strftime('%Y%m%d'), axis=1)
df['date'] = df['date'].astype(int)
df['time'] = df['time'].astype(int)
df['hour_int'] = np.floor(df['time'] / 10000).astype(int)
df['mintus_int'] = np.floor(df['time'] / 100).astype(int) - df['hour_int'] * 100
df['second_int'] = np.floor(df['time'] % 100).astype(int)
df['datetime'] = df.apply(lambda x: pd.datetime.strptime(
"{0} {1}:{2}:{3}".format(x['date_datetime'].strftime('%Y-%m-%d'), x['hour_int'], x['mintus_int'], x['second_int']),
"%Y-%m-%d %H:%M:%S"), axis=1)

print(month_str, "update data type")
#fill nan to zero before set data type to int32
df[['strike_price']] = df[['strike_price']].fillna(value=0)
df['ticker'] = df['ticker'].astype(str)
df['expiry_month'] = df['expiry_month'].astype(str)
df['strike_price'] = df['strike_price'].astype(int)
df['call_put'] = df['call_put'].astype(str)
df['price'] = df['price'].astype(int)
df['quantity'] = df['quantity'].astype(int)
df['trade_type'] = df['trade_type'].astype(int)

print(month_str, "sort dataframe by date, time, row_id with ascending order")
df['row_id'] = df.index
df.sort_values(by=["date","time","row_id"], ascending=[True,True,True], inplace=True)
df.reset_index()

print(month_str, "select trade type")
df = pd.DataFrame(df.ix[(df['trade_type'] != 3) &
(df['trade_type'] != 4) &
(df['trade_type'] != 5) &
(df['trade_type'] != 6) &
(df['trade_type'] != 7) &
(df['trade_type'] != 20) &
(df['trade_type'] != 36) &
(df['trade_type'] != 43)])

print(month_str, "categorise dataframe by ticker")
symbol_df = {}
ticker_df = pd.DataFrame({'count': df.groupby(["ticker"]).size()}).reset_index()
for index, row in ticker_df.iterrows():
print(month_str, row['ticker'], "found")
symbol_df[row['ticker']] = pd.DataFrame(df.ix[(df['ticker'] == row['ticker'])])

print(month_str, "create trade date dataframe")
trade_date_df = pd.DataFrame({'count' : df.groupby( ["date"] ).size()}).reset_index()
trade_date_df = trade_date_df.reset_index(drop=True)
trade_date_df['close_time'] = 0
trade_date_df['aht_close_time'] = 0
trade_date_df['expected_expiry_month'] = "0"
num_trade_date = len(trade_date_df)

print(month_str, "find close time and expiry month for each trade day")
for index, row in trade_date_df.iterrows():
single_day_df = pd.DataFrame(df.ix[(df['date'] == row['date']) & (df['time'] < 170000)])
if len(single_day_df)!=0:
single_day_df = single_day_df.iloc[[-1]]
single_day_df = single_day_df.reset_index(drop=True)
trade_date_df.set_value(index, 'close_time', single_day_df.get_value(0, 'time'))

single_day_df = pd.DataFrame(df.ix[(df['date'] == row['date']) & (df['time'] > 170000)])
if len(single_day_df) != 0:
single_day_df = single_day_df.iloc[[-1]]
single_day_df = single_day_df.reset_index(drop=True)
trade_date_df.set_value(index, 'aht_close_time', single_day_df.get_value(0, 'time'))

current_expiry_month = month_str[2:]
next_expiry_year = int(current_expiry_month[:2])
next_expiry_month = int(current_expiry_month[2:]) + 1
if(next_expiry_month>12):
next_expiry_month = 1
next_expiry_year += 1
next_expiry_year = str(next_expiry_year).zfill(2)
next_expiry_month = str(next_expiry_month).zfill(2)
next_expiry_month = next_expiry_year + next_expiry_month

if(index < num_trade_date-4):
target_expiry_month = current_expiry_month
else:
target_expiry_month = next_expiry_month
trade_date_df.set_value(index, 'expected_expiry_month', target_expiry_month)

for ticker, ticker_df in symbol_df.items():
print(month_str, "add expiry month for each ticker:", ticker)
ticker_df['expected_expiry_month'] = -1
for index, row in trade_date_df.iterrows():
date = row['date']
expected_expiry_month = row['expected_expiry_month']

ticker_df['expected_expiry_month'] = np.where(ticker_df['date'] == date,
expected_expiry_month, ticker_df['expected_expiry_month'])

ticker_df['expiry_month_int'] = ticker_df['expiry_month']
ticker_df['expected_expiry_month_int'] = ticker_df['expected_expiry_month']
ticker_df['expiry_month_int'] = ticker_df['expiry_month_int'].astype(int)
ticker_df['expected_expiry_month_int'] = ticker_df['expected_expiry_month_int'].astype(int)

ticker_df = pd.DataFrame(ticker_df.ix[(ticker_df['expiry_month_int'] == ticker_df['expected_expiry_month_int'])])
ticker_df = ticker_df.drop(
['future_option', 'strike_price', 'call_put', 'date_datetime', 'trade_type', 'hour_int', 'mintus_int',
'second_int', 'datetime', 'row_id', 'expected_expiry_month', 'expiry_month_int', 'expected_expiry_month_int'], 1)

ticker_df['expiry_month'] = ticker_df['expiry_month'].astype(str)
ticker_df['expiry_month'] = ticker_df['expiry_month'].str.zfill(4)
ticker_df = ticker_df[['ticker', 'date', 'time', 'quantity', 'price', 'expiry_month']]
ticker_df = ticker_df.reset_index(drop=True)
ticker_df.to_csv(config.HKEX_INDEX_FUTURE_CSV_DATA_DIR + "tick\\" +month_str + "_"+ticker+"_tick.csv",index=False)

trade_date_df.to_csv(config.HKEX_INDEX_FUTURE_CSV_DATA_DIR + "trade_date\\" +month_str + "_trade_date.csv",index=False)
time_used = datetime.datetime.now() - start_time
print(month_str, "complete. Time used:", time_used.total_seconds(), "seconds.")
return df
else:
print("Formatted csv file found and exit function now.")
return None
return None

if __name__ == "__main__":
print(config.HKEX_INDEX_FUTURE_RAW_DATA_DIR)
pool = mp.Pool(processes=8)
################################################################################################################
for f in os.listdir(config.HKEX_INDEX_FUTURE_RAW_DATA_DIR):
#2. Find csv file in side the folder in #1
#check folder is leading with tt which are raw data folder
if f[0:2] == "tt":
data_folder_path = config.HKEX_INDEX_FUTURE_RAW_DATA_DIR + f + "\\"
month_str = f.replace("tt_D01_", "")
tr_csv_file = None
tr_aht_csv_file = None
for raw_csv_file in os.listdir(data_folder_path):
if month_str in raw_csv_file:
if "_TR.csv" in raw_csv_file:
# set tr file name if found
tr_csv_file = raw_csv_file
if "_TR_AHT.csv" in raw_csv_file:
tr_aht_csv_file = raw_csv_file
pool.apply_async(formatting_csv_data, args=(month_str, data_folder_path, tr_csv_file, tr_aht_csv_file,))
print(month_str, tr_csv_file, tr_aht_csv_file)
pool.close()
pool.join()

#5 再次讀入tick CSV
#6 Resmaple tick data 成為 1 秒的OHLC
#7 儲存1秒OHLC CSV
下面是convert_hkex_data_to_csv.py


import os, sys, pandas as pd, numpy as np, multiprocessing as mp, datetime
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from pathlib import Path
import data_action_config as config

def is_trading_date(row):
trade_date = [int(i) for i in row['trade_date'].split(',')]
if row['day'] in trade_date:
return True
else:
return False

def is_trading_time(row):
morning_open, morning_close, afternoon_open, afternoon_close, night_open, night_close = get_trade_hour_by_period(get_trade_period_by_date(row['datetime']))
hour = row['datetime'].hour
if hour < 10:
hour = "0" + str(hour)
else:
hour = str(hour)

minute = row['datetime'].minute
if minute < 10:
minute = "0" + str(minute)
else:
minute = str(minute)

second = row['datetime'].second
if second < 10:
second = "0" + str(second)
else:
second = str(second)
int_time = int(hour + minute + second)
if (int_time >= morning_open and int_time <= morning_close):
result = False
elif (int_time >= afternoon_open and int_time <= afternoon_close):
result = False
elif night_open != None and night_close != None:
if (int_time >= night_open and int_time <= night_close):
result = False
else:
result = True
else:
result = True
return result

def get_trade_period_by_date(_date):
if (_date < pd.to_datetime('2012-03-07')):
period = 1
elif (_date < pd.to_datetime('2013-03-05')):
period = 2
elif (_date < pd.to_datetime('2016-07-25')):
if (_date < pd.to_datetime('2013-04-08')):
period = 3 #no night market
elif (_date < pd.to_datetime('2014-11-03')):
period = 4 #has night market close at 2300
else:
period = 5 #has night market close at 2345
else:
period = 6 #has U order
return period

def get_trade_hour_by_period( _period):
morning_open = None
morning_close = None
afternoon_open = None
afternoon_close = None
night_open = None
night_close = None
if (_period == 1):
morning_open = 94500 #094500
morning_close = 123000 #123000
afternoon_open = 143000 #143000
afternoon_close = 161500 #161500
elif(_period == 2):
morning_open = 91500 #091500
morning_close = 120000 #120000
afternoon_open = 133000 #133000
afternoon_close = 161500 #161500
elif(_period == 3):
morning_open = 91500 #091500
morning_close = 120000 #120000
afternoon_open = 130000 #130000
afternoon_close = 161500 #161500
elif(_period == 4):
morning_open = 91500 #091500
morning_close = 120000 #120000
afternoon_open = 130000 #130000
afternoon_close = 161500 #161500
night_open = 170000 #170000
night_close = 230000 #230000
elif(_period == 5):
morning_open = 91500 #091500
morning_close = 120000 #120000
afternoon_open = 130000 #130000
afternoon_close = 161500 #161500
night_open = 170000 #170000
night_close = 234500 #234500
elif(_period == 6):
morning_open = 91500 #091500
morning_close = 120000 #120000
afternoon_open = 130000 #130000
afternoon_close = 161500 #161500
night_open = 171500 #171500
night_close = 234500 #234500

return morning_open, morning_close, afternoon_open, afternoon_close, night_open, night_close

def prepare_ticker_df(ticker_df):
ticker_df['expiry_month'] = ticker_df['expiry_month'].astype(str)
ticker_df['expiry_month'] = ticker_df['expiry_month'].str.zfill(4)
ticker_df['ticker'] = ticker_df.index

ticker_df['time'] = ticker_df['time'].astype(int)
ticker_df['hour'] = np.floor(ticker_df['time'] / 10000).astype(int)
ticker_df['mintus'] = np.floor(ticker_df['time'] / 100).astype(int) - ticker_df['hour'] * 100
ticker_df['second'] = np.floor(ticker_df['time'] % 100).astype(int)

ticker_df['date_int'] = ticker_df['date']
ticker_df['date_int'] = ticker_df['date_int'].astype(int)
ticker_df['year'] = np.floor(ticker_df['date_int'] / 10000).astype(int)
ticker_df['month'] = np.floor(ticker_df['date_int'] / 100).astype(int) - ticker_df['year'] * 100
ticker_df['day'] = np.floor(ticker_df['date_int'] % 100).astype(int)

ticker_df['datetime'] = ticker_df.apply(lambda x: pd.datetime.strptime(
"{0}-{1}-{2} {3}:{4}:{5}".format(x['year'], x['month'], x['day'], x['hour'], x['mintus'], x['second']),
"%Y-%m-%d %H:%M:%S"), axis=1)
ticker_df = ticker_df[['ticker', 'datetime', 'date', 'time', 'quantity', 'price', 'expiry_month']]
ticker_df.index = ticker_df['datetime']

return ticker_df

def resample_tick_data_to_ohlc(ticker_df, trade_date_str, resample_size, resample_csv_file):
ohlc_df = ticker_df['price'].resample(resample_size).ohlc().ffill()
quantity_series = ticker_df['quantity'].resample(resample_size).sum().fillna(0)
ohlc_df['volume'] = quantity_series

ohlc_df['open'] = ohlc_df['open'].astype(int)
ohlc_df['high'] = ohlc_df['high'].astype(int)
ohlc_df['low'] = ohlc_df['low'].astype(int)
ohlc_df['close'] = ohlc_df['close'].astype(int)
ohlc_df['volume'] = ohlc_df['volume'].astype(int)

if resample_size in ["1S", "2S", "5S", "30S", "1T", "2T", "5T", "8T", "10T", "12T", "15T", "20T", "30T", "1H",
"2H"]:
ohlc_df['datetime'] = ohlc_df.index
ohlc_df['is_out_of_trade_hour'] = ohlc_df.apply(is_trading_time, axis=1) # returns DataFrame
selector = (ohlc_df['is_out_of_trade_hour'] == True)
ohlc_df.drop(ohlc_df.index[selector], inplace=True)

ohlc_df['trade_date'] = trade_date_str
ohlc_df['day'] = ohlc_df.index.day

ohlc_df['is_trade_date'] = ohlc_df.apply(is_trading_date, axis=1) # returns DataFrame
selector = (ohlc_df['is_trade_date'] == False)
ohlc_df.drop(ohlc_df.index[selector], inplace=True)
ohlc_df.drop(['is_out_of_trade_hour', 'trade_date', 'day', 'is_trade_date'], axis=1, inplace=True)

ohlc_df.drop('datetime', axis=1, inplace=True)

if resample_size in ["1D"]:
ohlc_df = ohlc_df[ohlc_df.volume != 0]

ohlc_df.to_csv(resample_csv_file)

if __name__ == "__main__":
resample_size_list = ["1S", "2S", "5S", "30S", "1T", "2T", "5T", "8T", "10T", "12T", "15T", "20T", "30T", "1H",
"2H", "1D"]
tick_files_dir = config.HKEX_INDEX_FUTURE_CSV_DATA_DIR + "tick"
tick_file_list = [f for f in os.listdir(tick_files_dir) if os.path.isfile(os.path.join(tick_files_dir, f))]
trade_date_files_dir = config.HKEX_INDEX_FUTURE_CSV_DATA_DIR + "trade_date"
for tick_filename in tick_file_list:
tick_filename_list = tick_filename.split('_')
month_str = tick_filename_list[0]
ticker = tick_filename_list[1]
trade_date_filename = month_str + "_trade_date.csv"
print("prepare ticker dataframe:", tick_filename)
ticker_df = pd.DataFrame.from_csv(tick_files_dir + "\\" + tick_filename)
if len(ticker_df)==0:
print("ticker df len is 0. It will be skipped")
continue
ticker_df = prepare_ticker_df(ticker_df)
trade_date_df = pd.DataFrame.from_csv(trade_date_files_dir + "\\" + trade_date_filename)
trade_date_list = []
trade_date_df['date'] = trade_date_df.index
for index, row in trade_date_df.iterrows():
trade_date_list.append(str(row['date'].day))
trade_date_str = ",".join(trade_date_list)
pool = mp.Pool(processes=8)
for resample_size in resample_size_list:
resample_csv_file = config.HKEX_INDEX_FUTURE_CSV_DATA_DIR + resample_size + "\\" + month_str + "_" + ticker + "_" + resample_size +".csv"
if not os.path.exists(config.HKEX_INDEX_FUTURE_CSV_DATA_DIR + resample_size):
os.makedirs(config.HKEX_INDEX_FUTURE_CSV_DATA_DIR + resample_size)
target_csv_path = config.HKEX_INDEX_FUTURE_CSV_DATA_DIR + resample_size +"\\" + month_str + "_HSI_tick.csv"
target_csv = Path(resample_csv_file)
if not target_csv.is_file():
print("resample ticker dataframe:", tick_filename, "to", resample_size)
pool.apply_async(resample_tick_data_to_ohlc, args=(ticker_df, trade_date_str, resample_size, resample_csv_file,))
else:
print("file exist skipped resample ticker dataframe:", tick_filename, "to", resample_size)
pool.close()
pool.join()

Leave a Reply

Your email address will not be published. Required fields are marked *