import os
from enum import Enum
from typing import Tuple, Literal
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import label_binarize

# Constant URL to connect to the Master node
MASTER_CONNECTION_URL = "spark://master-node:7077"

# Name of the class column in the DataFrames
NEW_CLASS_NAME = 'class'

# To prevent some errors with SVM
# EPSILON = 1.E-03
EPSILON = 1

# Available datasets to use
DatasetName = Literal['Breast_Invasive_Carcinoma', 'Kidney_Renal_Papillary_Cell_Carcinoma', 'Lung_Adenocarcinoma']

# Available model descriptions to use
ModelName = Literal['svm', 'rf', 'clustering']

# Available SurvivalSVM kernels to use
KernelName = Literal['linear', 'poly', 'rbf', 'cosine']

# Available SurvivalSVM optimizers to use
OptimizerName = Literal["avltree", "rbtree"]

# Clustering algorithm
ClusteringAlgorithm = Literal['kmeans', 'spectral']

# Clustering scoring method
ClusteringScoringMethod = Literal["concordance_index", "log_likelihood"]


class ClusteringAlgorithmEnum(Enum):
    """
    Clustering algorithm. This is an exact copy of the Multiomix values, used to parse the correct value during
    load balancer inference.
    """
    K_MEANS = 1
    SPECTRAL = 2


class ClusteringScoringMethodEnum(Enum):
    """
    Clustering scoring method. This is an exact copy of the Multiomix values, used to parse the correct value during
    load balancer inference.
    """
    C_INDEX = 1
    LOG_LIKELIHOOD = 2


class SVMKernelEnum(Enum):
    """
    SVM kernel. This is an exact copy of the Multiomix values, used to parse the correct value during
    load balancer inference.
    """
    LINEAR = 1
    POLYNOMIAL = 2
    RBF = 3


def get_clustering_algorithm_enum_value(clustering_algorithm: ClusteringAlgorithm) -> ClusteringAlgorithmEnum:
    """
    Gets the clustering algorithm enum value.
    :param clustering_algorithm: ClusteringAlgorithm value
    :return: ClusteringAlgorithmEnum value.
    """
    if clustering_algorithm == 'kmeans':
        return ClusteringAlgorithmEnum.K_MEANS
    if clustering_algorithm == 'spectral':
        return ClusteringAlgorithmEnum.SPECTRAL
    raise ValueError(f'Clustering algorithm {clustering_algorithm} not found')


def get_clustering_scoring_method_enum_value(clustering_scoring_method: ClusteringScoringMethod) -> \
        ClusteringScoringMethodEnum:
    """
    Gets the clustering scoring method enum value.
    :param clustering_scoring_method: ClusteringScoringMethod value
    :return: ClusteringScoringMethodEnum value.
    """
    if clustering_scoring_method == 'concordance_index':
        return ClusteringScoringMethodEnum.C_INDEX
    if clustering_scoring_method == 'log_likelihood':
        return ClusteringScoringMethodEnum.LOG_LIKELIHOOD
    raise ValueError(f'Clustering scoring method {clustering_scoring_method} not found')


def get_kernel_enum_value(kernel: KernelName) -> SVMKernelEnum:
    """
    Gets the kernel enum value.
    :param kernel: KernelName value
    :return: SVMKernelEnum value.
    """
    if kernel == 'linear':
        return SVMKernelEnum.LINEAR
    if kernel == 'poly':
        return SVMKernelEnum.POLYNOMIAL
    if kernel == 'rbf':
        return SVMKernelEnum.RBF
    raise ValueError(f'Kernel {kernel} not found')
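
# Quick sanity sketch of the three mapping helpers above (illustrative values only;
# the enum integers mirror Multiomix as noted in the enum docstrings):
#   get_clustering_algorithm_enum_value('kmeans')               -> ClusteringAlgorithmEnum.K_MEANS (1)
#   get_clustering_scoring_method_enum_value('log_likelihood')  -> ClusteringScoringMethodEnum.LOG_LIKELIHOOD (2)
#   get_kernel_enum_value('poly')                               -> SVMKernelEnum.POLYNOMIAL (2)
# Note that 'cosine' is a valid KernelName but has no SVMKernelEnum counterpart, so
# get_kernel_enum_value('cosine') raises ValueError.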


def specificity(y_true, y_pred):
    """
    Computes the specificity metric, since Sklearn does not implement it in the library.
    Solution taken from https://stackoverflow.com/questions/33275461/specificity-in-scikit-learn
    :param y_true: Y true
    :param y_pred: Y pred
    :return: Specificity
    """
    conf_res = confusion_matrix(y_true, y_pred).ravel()
    tn, fp = conf_res[0], conf_res[1]
    return tn / (tn + fp)
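
# Illustrative check with made-up labels: for y_true = [0, 0, 1, 1] and
# y_pred = [0, 1, 1, 1], confusion_matrix(...).ravel() yields tn=1, fp=1, fn=0, tp=2,
# so specificity = 1 / (1 + 1) = 0.5.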


def clean_dataset(df: pd.DataFrame) -> pd.DataFrame:
    """
    Removes NaN and Inf values
    :param df: DataFrame to clean
    :return: Cleaned DataFrame
    """
    assert isinstance(df, pd.DataFrame), "df needs to be a pd.DataFrame"
    df = df.dropna(axis='columns')
    indices_to_keep = ~df.isin([np.nan, np.inf, -np.inf]).any(axis='columns')
    return df[indices_to_keep].astype(np.float64)
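
# Sketch of the cleaning behavior on a hypothetical frame: a column containing NaN is
# dropped entirely by dropna(axis='columns'), and any row holding +/-Inf in the
# remaining columns is filtered out, so the result is a fully finite float64 DataFrame.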


def read_survival_data(add_epsilon: bool,
                       dataset_folder: DatasetName = 'Breast_Invasive_Carcinoma') -> Tuple[pd.DataFrame, np.ndarray]:
    """
    Reads and preprocesses the survival dataset
    :param add_epsilon: If True, adds an epsilon to 0s in Y data to prevent errors in SVM training
    :param dataset_folder: Dataset folder name
    :return: Tuple with the filtered DataFrame and the Y data
    """
    # Gets X
    x_file_path = os.path.join(os.path.dirname(__file__),
                               f'Datasets/{dataset_folder}/data_mrna_seq_v2_rsem_zscores_ref_normal_samples.txt')
    x = pd.read_csv(x_file_path, sep='\t', index_col=0)
    x = x[x.index.notnull()]  # Removes NaN indexes

    # Removes the '-01' suffix to make the intersection
    x.columns = x.columns.str.replace("-01$", "", regex=True)
    patients_x = x.columns.values

    # Gets Y
    y_file_path = os.path.join(os.path.dirname(__file__), f'Datasets/{dataset_folder}/data_clinical_patient.txt')
    y = pd.read_csv(y_file_path, sep='\t', skiprows=4, index_col=0)

    # Keeps only survival data
    y = y.loc[:, ['OS_STATUS', 'OS_MONTHS']]
    cond_living = y['OS_STATUS'] == '0:LIVING'
    y.loc[cond_living, 'OS_STATUS'] = False
    y.loc[~cond_living, 'OS_STATUS'] = True

    # Removes NaN samples
    indices_to_keep = ~y.isin([np.nan, np.inf, -np.inf]).any(axis='columns')
    y = y[indices_to_keep]

    # Gets patients in common
    patients_y = y.index.values
    patients_intersect = np.intersect1d(patients_x, patients_y)
    y = y.loc[patients_intersect, :]

    # Removes zeros
    if add_epsilon:
        zeros_cond = y['OS_MONTHS'] == 0
        y.loc[zeros_cond, 'OS_MONTHS'] = y.loc[zeros_cond, 'OS_MONTHS'] + EPSILON
        assert y[y['OS_MONTHS'] == 0].empty

    # Removes unneeded column and transposes to keep samples as rows and genes as columns
    x.drop('Entrez_Gene_Id', axis=1, inplace=True)
    x = x.transpose()
    x = x.loc[patients_intersect, :]

    # Removes NaN and Inf values
    x = clean_dataset(x)

    # TODO: REMOVE! ONLY USEFUL FOR DEBUG
    # x = x.iloc[:, :50]

    # Formats Y to a structured array (np.rec.fromarrays is the public alias of
    # np.core.records.fromarrays, whose np.core path was removed in NumPy 2.0)
    y = np.rec.fromarrays(y.to_numpy().transpose(), names='event, time', formats='bool, float')
    return x, y
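
# Usage sketch (assumes the Datasets/<dataset_folder>/ files described above sit next
# to this module):
#   x, y = read_survival_data(add_epsilon=True, dataset_folder='Breast_Invasive_Carcinoma')
#   # x: samples-by-genes expression DataFrame; y: structured array with
#   # y['event'] (bool, death occurred) and y['time'] (float, OS months), the layout
#   # expected by survival estimators such as scikit-survival's FastSurvivalSVM.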


def rename_class_column_name(df: pd.DataFrame, class_name_old: str):
    """
    Renames the DataFrame class column to generalize the algorithms
    :param df: DataFrame
    :param class_name_old: Current class column name
    """
    df.rename(columns={class_name_old: NEW_CLASS_NAME}, inplace=True)
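
# Example with a hypothetical column name: rename_class_column_name(df, 'vital_status')
# renames the 'vital_status' column to NEW_CLASS_NAME ('class') in place.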


def binarize_y(y: pd.Series) -> Tuple[np.ndarray, int]:
    """
    Generates a binary array indicating the class
    :param y: Array with the class
    :return: Binary categorical array and the number of classes
    """
    classes = y.unique()
    return label_binarize(y, classes=classes).ravel(), classes.shape[0]
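
# Illustrative call with made-up labels: for y = pd.Series(['a', 'b', 'a']) the classes
# are ['a', 'b'], label_binarize returns [[0], [1], [0]], and ravel() flattens it to
# [0, 1, 0], returned together with 2 as the number of classes.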


def get_columns_by_categorical(columns_index: np.ndarray, df: pd.DataFrame) -> pd.DataFrame:
    """
    Gets the columns from a categorical array
    :param columns_index: Numpy Array with a {0, 1} in the column index to indicate absence/presence of the column
    :param df: DataFrame to retrieve the columns data
    :return: DataFrame with only the specified columns
    """
    non_zero_idx = np.nonzero(columns_index)
    return df.iloc[:, non_zero_idx[0]]
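
# Illustrative mask with made-up column names: for a DataFrame with columns
# ['g1', 'g2', 'g3'], get_columns_by_categorical(np.array([1, 0, 1]), df)
# keeps columns 'g1' and 'g3'.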


def get_columns_from_df(columns_list: np.ndarray, df: pd.DataFrame) -> pd.DataFrame:
    """Returns a subset of columns from a DataFrame. The usefulness of this method is that it works
    for both categorical (binary) indexes and column name strings"""
    if np.issubdtype(columns_list.dtype, np.number):
        # Gets by integer indexes
        return get_columns_by_categorical(columns_list, df)
    # Gets by column string/label
    return df[columns_list]
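
# Both dispatch paths of get_columns_from_df, with made-up column names:
#   get_columns_from_df(np.array([1, 0, 1]), df)      # numeric dtype -> categorical mask path
#   get_columns_from_df(np.array(['g1', 'g3']), df)   # string dtype  -> label selection path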