from sklearn.model_selection import GridSearchCV, train_test_split
# from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import f1_score, accuracy_score
from sklearn.metrics import confusion_matrix
from joblib import dump, load
import numpy as np
import pandas as pd
from time import strftime, localtime
from .data import DataSet
from .plot import plot_confusion_matrix
def lower_bound(cv_results: dict):
    """
    Calculate the lower bound within 1 standard deviation
    of the best `mean_test_score`.

    .. deprecated:: kept for reference; emits a DeprecationWarning when called.

    Author: Wenhao Zhang <wenhaoz@ucla.edu>

    Args:
        cv_results: dict of numpy(masked) ndarrays
            See attribute cv_results_ of `GridSearchCV`
    Returns: float
        Lower bound within 1 standard deviation of the
        best `mean_test_score`.
    """
    # The original `@DeprecationWarning` decorator rebound the name to an
    # exception *instance*, making the function uncallable. Warn at call
    # time instead so existing callers keep working.
    import warnings
    warnings.warn("lower_bound is deprecated", DeprecationWarning, stacklevel=2)
    best_score_idx = np.argmax(cv_results['mean_test_score'])
    return (cv_results['mean_test_score'][best_score_idx]
            - cv_results['std_test_score'][best_score_idx])
def best_low_complexity(cv_results: dict):
    """
    Balance model complexity with cross-validated score.

    .. deprecated:: kept for reference; emits a DeprecationWarning when called.

    Author: Wenhao Zhang <wenhaoz@ucla.edu>

    Args:
        cv_results: dict of numpy(masked) ndarrays
            See attribute cv_results_ of `GridSearchCV`.
    Returns: int
        Index of a model that has the fewest PCA components
        while has its test score within 1 standard deviation of the best
        `mean_test_score`.
    """
    # The original `@DeprecationWarning` decorator rebound the name to an
    # exception *instance*, making the function uncallable. Warn at call
    # time instead so existing callers keep working.
    import warnings
    warnings.warn("best_low_complexity is deprecated", DeprecationWarning, stacklevel=2)
    # Threshold computed inline (best mean score minus one std) rather than
    # via the deprecated (and previously uncallable) `lower_bound` helper.
    scores = cv_results['mean_test_score']
    best_score_idx = np.argmax(scores)
    threshold = scores[best_score_idx] - cv_results['std_test_score'][best_score_idx]
    candidate_idx = np.flatnonzero(scores >= threshold)
    # Among all candidates within the threshold, pick the one with the
    # fewest PCA components.
    best_idx = candidate_idx[
        cv_results['param_reduce_dim__n_components'][candidate_idx].argmin()
    ]
    return best_idx
def grid_search_and_result(
        Xtrain: pd.DataFrame,
        ytrain: pd.Series,
        Xtest: pd.DataFrame,
        ytest: pd.Series,
        pipe: Pipeline,
        grid: dict,
        log_dir: str,
        score=None,
        verbose: int = 2,
        k: int = 5,
        fit_params: dict = None):
    """
    Run cross-validated grid search, then log train/test scores and plot
    confusion matrices (ROC plotting is currently disabled).

    Args:
        Xtrain: training-set features
        ytrain: training-set labels
        Xtest: test-set features
        ytest: test-set labels
        pipe: model pipeline to tune
        grid: hyper-parameter search space
        log_dir: output directory for results; NOTE it must already exist
        score: scoring spec; defaults to macro-F1 plus accuracy, refit on f1
        verbose: log level, 0 means silent
        k: number of cross-validation folds
        fit_params: extra keyword arguments forwarded to ``fit``
    Returns: the fitted GridSearchCV model
    """
    scoring = score
    if scoring is None:
        # Default: track both macro-F1 and accuracy; the final model is
        # refit on the F1 metric below.
        scoring = {
            'f1': 'f1_macro',
            'accuracy': 'accuracy'
        }
    gsCV = GridSearchCV(
        estimator=pipe,
        cv=k, n_jobs=-1,
        param_grid=grid,
        scoring=scoring,
        refit='f1',
        verbose=verbose
    )
    gsCV.fit(Xtrain, ytrain, **(fit_params or {}))
    # Persist both the whole search object and the refit best estimator.
    dump(gsCV, log_dir + '/gsCV')
    dump(gsCV.best_estimator_, log_dir + '/best_model')
    file_prefix = log_dir + '/' + strftime("%Y_%m_%d_%H_%M_%S", localtime())
    # 'x' mode refuses to overwrite an existing log for this timestamp;
    # the with-block guarantees the handle is closed even on error.
    with open(file_prefix + '.log.txt', 'x') as file:
        if verbose > 2:
            file.write(str(gsCV.cv_results_))
        if verbose:
            file.write(str(gsCV.get_params()))
        file.write('\nBest score on training set by grid search cross validation: {}\n'
                   .format(gsCV.score(Xtrain, ytrain)))
        # Reload from disk to confirm the dumped model round-trips.
        best_model = load(log_dir + '/best_model')
        test_prediction = best_model.predict(Xtest)
        file.write('Accuracy on test set: {}\n'.format(accuracy_score(ytest, test_prediction)))
        file.write('F1-score on test set: {}\n'.format(f1_score(ytest, test_prediction, average='macro')))
        if verbose:
            # Class labels are project-specific Chinese strings
            # ("especially difficult" / "somewhat difficult" / "not difficult").
            cm = confusion_matrix(ytrain, best_model.predict(Xtrain))
            plot_confusion_matrix(cm, ['特别困难', '一般困难', '不困难'], file_prefix + '_train_cm.png')
            file.write('\ntrain_cm:\n')
            file.write(str(cm))
            cm = confusion_matrix(ytest, test_prediction)
            plot_confusion_matrix(cm, ['特别困难', '一般困难', '不困难'], file_prefix + '_test_cm.png')
            file.write('\ntest_cm:\n')
            file.write(str(cm))
            # plot_roc(best_model, Xtest, ytest, file_prefix + '_roc.png')
    return gsCV
def do_decision_tree(dataset: DataSet, log_dir: str = '../log', grid: dict = None):
    """
    Train a decision tree.

    Args:
        grid: hyper-parameter search grid; the default space is used when omitted
        dataset: input data set, split 0.7/0.3 into train and test sets
        log_dir: directory for the result files
    Returns: the fitted GridSearchCV model
    """
    from sklearn.tree import DecisionTreeClassifier
    if grid is None:
        grid = {
            'dt__criterion': ['gini', 'entropy'],
            # NOTE(review): 'auto' for max_features is deprecated/removed in
            # newer scikit-learn releases — confirm the pinned version.
            'dt__max_features': ['auto', 'sqrt', 'log2'],
            'dt__class_weight': [None, 'balanced'],
            'dt__ccp_alpha': [0.0, 0.1],
            'dt__min_impurity_decrease': [0., 0.01],
            'dt__min_samples_leaf': [1, 5],
            'dt__min_samples_split': [2, 8],
        }
    pipe = Pipeline([
        ('scaler', MinMaxScaler()),
        ('dt', DecisionTreeClassifier())
    ])
    Xtrain, Xtest, ytrain, ytest = train_test_split(dataset.features, dataset.label, train_size=0.7)
    return grid_search_and_result(Xtrain, ytrain, Xtest, ytest, pipe, grid, log_dir)
def do_random_forest(dataset: DataSet, log_dir: str = '../log', grid: dict = None):
    """
    Train a random forest.

    Args:
        grid: hyper-parameter search grid; the default space is used when omitted
        dataset: input data set, split 0.7/0.3 into train and test sets
        log_dir: directory for the result files
    Returns: the fitted GridSearchCV model
    """
    from sklearn.ensemble import RandomForestClassifier
    if grid is None:
        # Fine grid refined from an earlier coarse search round
        # (coarse grid removed; see version-control history).
        grid = {
            'rf__criterion': ['gini', 'entropy'],
            'rf__n_estimators': [80, 100, 150, 200, 500],
            # min_samples_split as an int must be >= 2 in scikit-learn;
            # the previous value 1 made half the grid fail to fit.
            'rf__min_samples_split': [2, 3],  # arbitrary values, no particular basis
            'rf__min_samples_leaf': [1, 4],  # arbitrary values, no particular basis
            'rf__min_impurity_decrease': [0., 0.01, 0.1],
            'rf__warm_start': [True, False],
            'rf__oob_score': [True, False],
            'rf__ccp_alpha': [0., 0.1, 0.001]
        }
    pipe = Pipeline([
        ('scaler', MinMaxScaler()),
        ('rf', RandomForestClassifier(max_depth=None, n_jobs=-1))
    ])
    Xtrain, Xtest, ytrain, ytest = train_test_split(dataset.features, dataset.label, train_size=0.7)
    return grid_search_and_result(Xtrain, ytrain, Xtest, ytest, pipe, grid, log_dir)
def do_svm(dataset: DataSet, log_dir: str = '../log', grid: dict = None):
    """
    Train a support vector machine.

    Args:
        grid: hyper-parameter search grid; the default space is used when omitted
        dataset: input data set, split 0.7/0.3 into train and test sets
        log_dir: directory for the result files
    Returns: the fitted GridSearchCV model
    """
    from sklearn.svm import SVC
    if grid is None:
        # Fine grid refined from an earlier coarse search round
        # (coarse grid removed; see version-control history).
        common = {
            'SVM__kernel': ['linear', 'rbf', 'poly'],
            'SVM__C': [0.7, 0.8, 0.9, 0.95, 1, 1.05, 1.1, 1.2, 1.5, 2],
            'SVM__degree': [2, 3, 4],
            'SVM__gamma': [0.001, 'scale'],
            'SVM__tol': [1e-2, 1e-3, 1e-4, 1e-5]
        }
        # SVC raises ValueError when break_ties=True is combined with
        # decision_function_shape='ovo', so the grid is split into the
        # two valid sub-grids (GridSearchCV accepts a list of dicts).
        grid = [
            {**common,
             'SVM__decision_function_shape': ['ovr'],
             'SVM__break_ties': [True, False]},
            {**common,
             'SVM__decision_function_shape': ['ovo'],
             'SVM__break_ties': [False]},
        ]
    pipe = Pipeline([
        ('scaler', MinMaxScaler()),
        ('SVM', SVC(cache_size=500))
    ])
    Xtrain, Xtest, ytrain, ytest = train_test_split(dataset.features, dataset.label, train_size=0.7)
    return grid_search_and_result(Xtrain, ytrain, Xtest, ytest, pipe, grid, log_dir)
def do_logistic(dataset: DataSet, log_dir: str = '../log', grid: dict = None):
    """
    Train a logistic regression.

    Args:
        grid: hyper-parameter search grid; the default space is used when omitted
        dataset: input data set, split 0.7/0.3 into train and test sets
        log_dir: directory for the result files
    Returns: the fitted GridSearchCV model
    """
    from sklearn.linear_model import LogisticRegression
    if grid is None:
        # NOTE(review): this grid contains solver/penalty/dual combinations
        # that LogisticRegression rejects (e.g. lbfgs with l1, dual=True
        # outside liblinear+l2); GridSearchCV records those fits as failed.
        grid = {
            'Logistic__penalty': ['l1', 'l2', 'elasticnet', 'none'],
            'Logistic__C': [0.0001, 0.001, 0.01, 0.1, 1, 2, 5, 10, 100, 1000],  # arbitrary values, no particular basis
            'Logistic__solver': ['lbfgs', 'liblinear', 'newton-cg', 'sag', 'saga'],
            'Logistic__fit_intercept': [True, False],
            'Logistic__dual': [True, False],
            # l1_ratio must be a float in [0, 1] (only used with
            # penalty='elasticnet'); the previous booleans were invalid.
            'Logistic__l1_ratio': [0.0, 0.5, 1.0],
            'Logistic__warm_start': [True, False],
            'Logistic__intercept_scaling': [0.01, 0.1, 0.5, 1, 2, 5, 10]
        }
    pipe = Pipeline([
        ('scaler', MinMaxScaler()),
        ('Logistic', LogisticRegression(n_jobs=-1, max_iter=500))
    ])
    Xtrain, Xtest, ytrain, ytest = train_test_split(dataset.features, dataset.label, train_size=0.7)
    return grid_search_and_result(Xtrain, ytrain, Xtest, ytest, pipe, grid, log_dir)
def do_naive_bayes(dataset: DataSet, log_dir: str = '../log', grid: dict = None):
    """
    Train a Gaussian naive Bayes classifier.

    Args:
        grid: hyper-parameter search grid; the default space is used when omitted
        dataset: input data set, split 0.7/0.3 into train and test sets
        log_dir: directory for the result files
    Returns: the fitted GridSearchCV model
    """
    from sklearn.naive_bayes import GaussianNB
    if grid is None:
        grid = {
            'NB__var_smoothing': [1e-10, 1e-9, 1e-8, 1e-6, 1e-4, 1e-2, 1],
        }
    pipe = Pipeline([
        ('scaler', MinMaxScaler()),
        ('NB', GaussianNB())
    ])
    Xtrain, Xtest, ytrain, ytest = train_test_split(dataset.features, dataset.label, train_size=0.7)
    return grid_search_and_result(Xtrain, ytrain, Xtest, ytest, pipe, grid, log_dir)
def do_xgb(dataset: DataSet, log_dir: str = '../log', grid: dict = None):
    """
    Train an XGBoost classifier and append its feature importances
    to ``log_dir``/feature.txt.

    Args:
        grid: hyper-parameter search grid; the default space is used when omitted
        dataset: input data set, split 0.7/0.3 into train and test sets
        log_dir: directory for the result files
    Returns: the fitted GridSearchCV model
    """
    from xgboost import XGBClassifier
    if grid is None:
        grid = {
            'xgb__n_estimators': [80, 100, 150, 200, 400, 500, 600, 800],
            'xgb__max_depth': [6, 8, 10, 15, 20],
            'xgb__colsample_bytree': [0.8, 1],
            'xgb__learning_rate': [0.01, 0.1, 0.3],
        }
    # NOTE(review): a dead `train_param` dict with
    # 'xgb__early_stopping_rounds' was defined here but never forwarded as
    # fit_params; early stopping would also need an eval_set, so it has
    # been removed rather than wired in. The constructor kwarg
    # verbose=True was dropped too — it is not an XGBClassifier/booster
    # parameter and only triggered an "unused parameter" warning.
    pipe = Pipeline([
        ('scaler', MinMaxScaler()),
        ('xgb', XGBClassifier(
            objective='multi:softmax',
            n_jobs=-1,
            booster='gbtree',
            verbosity=2
        )
        )
    ])
    Xtrain, Xtest, ytrain, ytest = train_test_split(dataset.features, dataset.label, train_size=0.7)
    gscv = grid_search_and_result(Xtrain, ytrain, Xtest, ytest, pipe, grid, log_dir)
    best_model = gscv.best_estimator_
    # Append-mode log of the tuned model's feature importances; the
    # with-block guarantees the handle is closed.
    with open(log_dir + '/feature.txt', 'a') as file:
        file.write('\nfeature importance\n')
        file.write(str(best_model['xgb'].feature_importances_))
    return gscv