import os
from datetime import date
import pandas as pd


def set_dtypes(df):
    """set a DatetimeIndex and convert all columns of the DataFrame to their proper dtype

    assumes the csv was read raw without modifications: `pd.read_csv(csv_filename)`"""
    df['open_time'] = pd.to_datetime(df['open_time'], unit='ms')
    df = df.set_index('open_time', drop=True)
    df = df.astype(dtype={
        'open': 'float64',
        'high': 'float64',
        'low': 'float64',
        'close': 'float64',
        'volume': 'float64',
        'close_time': 'datetime64[ms]',
        'quote_asset_volume': 'float64',
        'number_of_trades': 'int64',
        'taker_buy_base_asset_volume': 'float64',
        'taker_buy_quote_asset_volume': 'float64',
        'ignore': 'float64'
    })
    return df
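
# Example usage (a sketch; 'data/BTCUSDT-1m.csv' is a hypothetical path to a raw kline csv):
#     df = set_dtypes(pd.read_csv('data/BTCUSDT-1m.csv'))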

def set_dtypes_compressed(df):
    """Create a `DatetimeIndex` and convert all critical columns in pd.df to a dtype with low
    memory profile. Assumes csv is read raw without modifications; `pd.read_csv(csv_filename)`."""
    df['open_time'] = pd.to_datetime(df['open_time'], unit='ms')
    df = df.set_index('open_time', drop=True)
    df = df.astype(dtype={
        'open': 'float32',
        'high': 'float32',
        'low': 'float32',
        'close': 'float32',
        'volume': 'float32',
        'number_of_trades': 'uint16',
        'quote_asset_volume': 'float32',
        'taker_buy_base_asset_volume': 'float32',
        'taker_buy_quote_asset_volume': 'float32'
    })
    return df
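
# Rough way to compare the memory footprint of the two dtype schemes (a sketch,
# assuming `df_raw` is a freshly read kline DataFrame):
#     print(set_dtypes(df_raw.copy()).memory_usage(deep=True).sum())
#     print(set_dtypes_compressed(df_raw.copy()).memory_usage(deep=True).sum())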

def assert_integrity(df):
    """make sure no rows are entirely empty and no duplicate timestamps exist"""
    assert not df.isna().all(axis=1).any()
    assert not df['open_time'].duplicated().any()
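
# For example, a frame with a repeated timestamp trips the second assertion (sketch):
#     assert_integrity(pd.DataFrame({'open_time': [0, 0]}))  # raises AssertionError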

def quick_clean(df):
    """clean a raw dataframe"""
    # drop rows with duplicate timestamps
    dupes = df['open_time'].duplicated().sum()
    if dupes > 0:
        df = df[~df['open_time'].duplicated()]

    # sort by timestamp, oldest first
    df = df.sort_values(by=['open_time'], ascending=True)

    # just a double-check
    assert_integrity(df)

    return df
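
# Typical call (sketch; the path is hypothetical):
#     df = quick_clean(pd.read_csv('data/BTCUSDT-1m.csv'))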

def write_raw_to_parquet(df, full_path):
    """take a raw df and write a parquet file to disk"""
    # some candlesticks do not span a full minute; these points are not reliable and thus
    # filtered out (a full 1m candle opens exactly 59,999 ms before it closes, since
    # `close_time` is inclusive)
    df = df[df['open_time'] - df['close_time'] == -59999]

    # the `close_time` column has become redundant now, as has the `ignore` column
    df = df.drop(['close_time', 'ignore'], axis=1)

    df = set_dtypes_compressed(df)

    # give all pairs the same nice cut-off
    df = df[df.index < str(date.today())]

    df.to_parquet(full_path)
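
# Note: `DataFrame.to_parquet` needs a parquet engine (pyarrow or fastparquet) installed.
# Example usage (sketch; paths are hypothetical):
#     write_raw_to_parquet(pd.read_csv('data/BTCUSDT-1m.csv'), 'compressed/BTCUSDT-1m.parquet')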

def groom_data(dirname='data'):
    """go through the data folder and perform a quick clean on all csv files"""
    for filename in os.listdir(dirname):
        if filename.endswith('.csv'):
            full_path = f'{dirname}/{filename}'
            # overwrite in place; index=False keeps the csv readable as raw data afterwards
            quick_clean(pd.read_csv(full_path)).to_csv(full_path, index=False)

def compress_data(dirname='data'):
    """go through data folder and rewrite csv files to parquets"""
    os.makedirs('compressed', exist_ok=True)

    for filename in os.listdir(dirname):
        if filename.endswith('.csv'):
            full_path = f'{dirname}/{filename}'

            df = pd.read_csv(full_path)

            new_filename = filename.replace('.csv', '.parquet')
            new_full_path = f'compressed/{new_filename}'
            write_raw_to_parquet(df, new_full_path)
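

# Minimal sketch of how these helpers could be chained; the './data' layout is an
# assumption, not something this module enforces:
if __name__ == '__main__':
    groom_data('data')     # deduplicate and sort every csv in ./data
    compress_data('data')  # rewrite the cleaned csvs as parquets in ./compressed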