--- /dev/null
+++ b/exseek/scripts/machine_learning.py
@@ -0,0 +1,467 @@
+#!/usr/bin/env python
+import argparse, sys, os
+import logging
+logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s')
+logger = logging.getLogger('machine_learning')
+
+command_handlers = {}
+def command_handler(f):
+    command_handlers[f.__name__] = f
+    return f
+
+def read_data_matrix(matrix, sample_classes, features=None, transpose=False, positive_class=None, negative_class=None):
+    import pandas as pd
+    import numpy as np
+    # read data matrix
+    logger.info('read data matrix: ' + matrix)
+    X = pd.read_table(matrix, index_col=0, sep='\t')
+    # transpose
+    if transpose:
+        logger.info('transpose feature matrix')
+        X = X.T
+    if features is not None:
+        logger.info('read subset of feature names from: ' + features)
+        features = pd.read_table(features, header=None).iloc[:, 0].values
+        logger.info('select {} features'.format(len(features)))
+        X = X.reindex(columns=features)
+        is_na_features = X.isna().any(axis=0)
+        na_features = X.columns.values[is_na_features]
+        if na_features.shape[0] > 0:
+            logger.warning('missing features found in matrix file: {}'.format(na_features[:10].tolist()))
+    logger.info('number of features: {}'.format(X.shape[1]))
+    # read sample classes
+    logger.info('read sample classes: ' + sample_classes)
+    sample_classes = pd.read_table(sample_classes, index_col=0, sep='\t')
+    sample_classes = sample_classes.iloc[:, 0]
+    sample_classes = sample_classes.loc[X.index.values]
+    logger.info('number of sample classes: {}'.format(sample_classes.shape[0]))
+
+    # infer positive and negative classes if they are not given
+    if (positive_class is None) or (negative_class is None):
+        unique_classes = np.unique(sample_classes.values)
+        if len(unique_classes) != 2:
+            raise ValueError('expect 2 classes but {} classes found'.format(len(unique_classes)))
+        positive_class, negative_class = unique_classes
+    positive_class = np.atleast_1d(positive_class)
+    negative_class = np.atleast_1d(negative_class)
+    # select positive samples and negative samples
+    logger.info('positive class: {}'.format(positive_class))
+    logger.info('negative class: {}'.format(negative_class))
+    X_pos = X.loc[sample_classes[sample_classes.isin(positive_class)].index.values]
+    X_neg = X.loc[sample_classes[sample_classes.isin(negative_class)].index.values]
+    logger.info('number of positive samples: {}, negative samples: {}, class ratio: {}'.format(
+        X_pos.shape[0], X_neg.shape[0], float(X_pos.shape[0])/X_neg.shape[0]))
+    X = pd.concat([X_pos, X_neg], axis=0)
+    # set negative class to 0 and positive class to 1
+    y = np.zeros(X.shape[0], dtype=np.int32)
+    y[:X_pos.shape[0]] = 1
+    del X_pos
+    del X_neg
+    n_samples, n_features = X.shape
+    sample_ids = X.index.values
+    feature_names = X.columns.values
+    X = X.values
+
+    return X, y, sample_ids, feature_names
+
+def search_params_in_args(args, prefix):
+    params = {}
+    for key, val in args.items():
+        if key.startswith(prefix) and (val is not None):
+            params[key[len(prefix):]] = val
+    return params
+
+@command_handler
+def cross_validation(args):
+    from estimators2 import search_dict, CollectMetrics, CollectPredictions, CollectTrainIndex, FeatureSelectionMatrix,\
+        CombinedEstimator, get_features_from_pipeline, parse_params
+    from estimators2 import cross_validation as _cross_validation
+    import pandas as pd
+    import numpy as np
+    import h5py
+    import pickle
+    import yaml
+
+    # read input data matrix
+    X, y, sample_ids, feature_names = read_data_matrix(args.matrix, args.sample_classes,
+        **search_dict(vars(args), ('features', 'transpose', 'positive_class', 'negative_class')))
+    # remove features with missing values and keep feature_names in sync with the columns of X
+    is_valid_feature = np.all(~np.isnan(X), axis=0)
+    X = X[:, is_valid_feature]
+    feature_names = feature_names[is_valid_feature]
+    if X.shape[0] < 5:
+        raise ValueError('too few samples for machine learning')
+    if not os.path.isdir(args.output_dir):
+        logger.info('create output directory: ' + args.output_dir)
+        os.makedirs(args.output_dir)
+    # save class labels, feature names and sample ids
+    logger.info('save class labels to: ' + os.path.join(args.output_dir, 'classes.txt'))
+    pd.Series(y).to_csv(os.path.join(args.output_dir, 'classes.txt'), header=False, index=False)
+    logger.info('save feature names to: ' + os.path.join(args.output_dir, 'feature_names.txt'))
+    pd.Series(feature_names).to_csv(os.path.join(args.output_dir, 'feature_names.txt'), header=False, index=False)
+    logger.info('save sample ids to: ' + os.path.join(args.output_dir, 'samples.txt'))
+    pd.Series(sample_ids).to_csv(os.path.join(args.output_dir, 'samples.txt'), header=False, index=False)
+    # read configuration file
+    config = {}
+    if args.config is not None:
+        logger.info('read configuration file: ' + args.config)
+        with open(args.config, 'r') as f:
+            config = yaml.safe_load(f)
+    # override config with command-line parameters
+    argdict = vars(args)
+    config.update(search_dict(argdict, (
+        'zero_fraction_filter', 'zero_fraction_filter_params',
+        'rpm_filter', 'rpm_filter_params',
+        'rpkm_filter', 'rpkm_filter_params',
+        'fold_change_filter', 'fold_change_filter_params',
+        'diffexp_filter', 'diffexp_filter_params',
+        'log_transform', 'log_transform_params',
+        'scaler', 'scaler_params',
+        'selector', 'selector_params', 'n_features_to_select',
+        'classifier', 'classifier_params',
+        'grid_search', 'grid_search_params'
+    )))
+    for key in ('rpkm_filter_params', 'rpm_filter_params', 'fold_change_filter_params',
+        'zero_fraction_filter_params', 'log_transform_params', 'diffexp_filter_params',
+        'scaler_params', 'classifier_params', 'selector_params', 'grid_search_params'):
+        config[key] = parse_params(argdict[key])
+    # set temp_dir for diffexp_filter
+    if args.diffexp_filter:
+        config['diffexp_filter_params']['temp_dir'] = os.path.join(args.output_dir, 'diffexp')
+        logger.info('set temp_dir of diffexp_filter: {}'.format(config['diffexp_filter_params']['temp_dir']))
+    logger.info('build combined estimator')
+    estimator = CombinedEstimator(**config)
+
+    logger.info('start cross-validation')
+    collect_metrics = CollectMetrics()
+    collect_predictions = CollectPredictions()
+    collect_train_index = CollectTrainIndex()
+    cv_callbacks = [collect_metrics, collect_predictions, collect_train_index]
+    if args.selector is not None:
+        feature_selection_matrix = FeatureSelectionMatrix()
+        cv_callbacks.append(feature_selection_matrix)
+    cv_params = parse_params(args.cv_params)
+    if (args.sample_weight is None) or (args.sample_weight == 'none'):
+        # no sample weight
+        sample_weight = None
+    elif args.sample_weight == 'auto':
+        sample_weight = 'auto'
+    else:
+        sample_weight = pd.read_table(args.sample_weight, header=None, index_col=0).iloc[:, 0]
+    _cross_validation(estimator, X, y, sample_weight=sample_weight, params=cv_params, callbacks=cv_callbacks)
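+    # Log a brief summary of per-fold test metrics. This assumes CollectMetrics.get_metrics()
+    # returns a dict of DataFrames keyed by 'train' and 'test', consistent with its
+    # use below; the exact metric columns depend on the estimators2 module.
+    cv_metrics = collect_metrics.get_metrics()
+    logger.info('mean test metrics across folds:\n{}'.format(cv_metrics['test'].mean(axis=0)))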
+    logger.info('fit estimator on full dataset')
+    estimator.fit(X, y)
+    logger.info('save final model to: ' + os.path.join(args.output_dir, 'final_model.pkl'))
+    with open(os.path.join(args.output_dir, 'final_model.pkl'), 'wb') as f:
+        pickle.dump(estimator, f)
+    logger.info('classifier params: {}'.format(estimator.classifier_.get_params()))
+    if args.selector is not None:
+        feature_index = estimator.features_
+        logger.info('number of selected features: {}'.format(feature_index.shape[0]))
+        logger.info('save features to: ' + os.path.join(args.output_dir, 'features.txt'))
+        pd.Series(feature_names[feature_index]).to_csv(os.path.join(args.output_dir, 'features.txt'), index=False, header=False)
+        logger.info('save feature importances to: ' + os.path.join(args.output_dir, 'feature_importances.txt'))
+        pd.Series(estimator.feature_importances_, index=feature_names[feature_index])\
+            .to_csv(os.path.join(args.output_dir, 'feature_importances.txt'), sep='\t', header=False, index=True)
+
+    metrics = collect_metrics.get_metrics()
+    for name in ('train', 'test'):
+        logger.info('save metrics to: ' + os.path.join(args.output_dir, 'metrics.{}.txt'.format(name)))
+        # missing metric values are written as NA
+        metrics[name].to_csv(os.path.join(args.output_dir, 'metrics.{}.txt'.format(name)), header=True, index=True, na_rep='NA', sep='\t')
+
+    logger.info('save cross-validation details to: ' + os.path.join(args.output_dir, 'cross_validation.h5'))
+    with h5py.File(os.path.join(args.output_dir, 'cross_validation.h5'), 'w') as f:
+        f.create_dataset('labels', data=y)
+        f.create_dataset('predicted_labels', data=collect_predictions.get_pred_labels())
+        f.create_dataset('predictions', data=collect_predictions.get_pred_probs())
+        f.create_dataset('train_index', data=collect_train_index.get_train_index())
+        if args.selector is not None:
+            f.create_dataset('feature_selection', data=feature_selection_matrix.get_matrix())
+
+@command_handler
+def run_pipeline(args):
+    from estimators2 import search_dict, CollectMetrics, CollectPredictions, CollectTrainIndex, FeatureSelectionMatrix,\
+        CombinedEstimator, get_features_from_pipeline, parse_params
+    from estimators2 import cross_validation as _cross_validation
+    from sklearn.utils.class_weight import compute_sample_weight
+    import pandas as pd
+    import numpy as np
+    import h5py
+    import pickle
+    import yaml
+
+    logger.info('read configuration file: ' + args.config)
+    with open(args.config, 'r') as f:
+        config = yaml.safe_load(f)
+    # override config variables with command line arguments
+    if args.matrix is not None:
+        config['matrix'] = args.matrix
+    if args.sample_classes is not None:
+        config['sample_classes'] = args.sample_classes
+    if args.positive_class is not None:
+        config['positive_class'] = args.positive_class.split(',')
+    if args.negative_class is not None:
+        config['negative_class'] = args.negative_class.split(',')
+    if args.features is not None:
+        config['features'] = args.features
+
+    # read input data matrix
+    X, y, sample_ids, feature_names = read_data_matrix(
+        config['matrix'], config['sample_classes'],
+        **search_dict(config, ('features', 'transpose', 'positive_class', 'negative_class')))
+
+    # fill missing feature values with 0
+    X = np.nan_to_num(X)
+    if X.shape[0] < 5:
+        raise ValueError('too few samples for machine learning')
+    if not os.path.isdir(args.output_dir):
+        logger.info('create output directory: ' + args.output_dir)
+        os.makedirs(args.output_dir)
+    # save class labels, feature names and sample ids
+    logger.info('save class labels to: ' + os.path.join(args.output_dir, 'classes.txt'))
+    pd.Series(y).to_csv(os.path.join(args.output_dir, 'classes.txt'), header=False, index=False)
+    logger.info('save feature names to: ' + os.path.join(args.output_dir, 'feature_names.txt'))
+    pd.Series(feature_names).to_csv(os.path.join(args.output_dir, 'feature_names.txt'), header=False, index=False)
+    logger.info('save sample ids to: ' + os.path.join(args.output_dir, 'samples.txt'))
+    pd.Series(sample_ids).to_csv(os.path.join(args.output_dir, 'samples.txt'), header=False, index=False)
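+
+    # The configuration file loaded above is expected to provide 'preprocess_steps'
+    # as a list of single-key mappings, each describing one step with 'name', 'type'
+    # and optional 'params' (parsed below). Step and parameter values in this example
+    # are hypothetical; valid names depend on the estimators2 module:
+    #
+    # preprocess_steps:
+    #   - diffexp_step:
+    #       name: DiffExpFilter
+    #       type: selector
+    #       params: {max_features: 10}
+    # cv_params: {splitter: KFold, n_splits: 5}
+    # sample_weight: balanced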
+    # parse preprocess steps; set temp_dir for steps that write temporary files
+    preprocess_steps = {}
+    for step_dict in config['preprocess_steps']:
+        step_tuple = tuple(step_dict.items())[0]
+        preprocess_steps[step_tuple[0]] = step_tuple[1]
+        preprocess_steps[step_tuple[0]]['params'] = step_tuple[1].get('params', {})
+        if step_tuple[1]['name'] in ('DiffExpFilter', 'SIS'):
+            temp_dir = os.path.join(args.output_dir, 'tmp')
+            preprocess_steps[step_tuple[0]]['params']['temp_dir'] = temp_dir
+            logger.info('set temp_dir of {} to {}'.format(step_tuple[1]['name'], temp_dir))
+
+    logger.info('build combined estimator')
+    estimator = CombinedEstimator(config)
+
+    # add callbacks for cross-validation
+    collect_metrics = CollectMetrics()
+    collect_predictions = CollectPredictions()
+    collect_train_index = CollectTrainIndex()
+    cv_callbacks = [collect_metrics, collect_predictions, collect_train_index]
+    # collect the feature selection matrix if a selector step is found
+    has_selector = False
+    for step in preprocess_steps.values():
+        if step['type'] == 'selector':
+            has_selector = True
+    if has_selector:
+        logger.info('add cross-validation callback: FeatureSelectionMatrix')
+        feature_selection_matrix = FeatureSelectionMatrix()
+        cv_callbacks.append(feature_selection_matrix)
+    # get sample weight
+    if config.get('sample_weight') is not None:
+        if config['sample_weight'] == 'balanced':
+            logger.info('compute sample weight from class ratio')
+            sample_weight = 'balanced'
+        else:
+            logger.info('read sample weight from file: ' + config['sample_weight'])
+            sample_weight = pd.read_table(
+                config['sample_weight'], header=None, index_col=0).iloc[:, 0]
+    else:
+        sample_weight = None
+    logger.info('start cross-validation')
+    _cross_validation(estimator, X, y, sample_weight=sample_weight,
+        params=config['cv_params'], callbacks=cv_callbacks)
+    logger.info('fit estimator on full dataset')
+    if config.get('sample_weight') == 'balanced':
+        sample_weight = compute_sample_weight(class_weight='balanced', y=y)
+    estimator.fit(X, y, sample_weight=sample_weight)
+    logger.info('save final model to: ' + os.path.join(args.output_dir, 'final_model.pkl'))
+    with open(os.path.join(args.output_dir, 'final_model.pkl'), 'wb') as f:
+        pickle.dump(estimator, f)
+    logger.info('classifier params: {}'.format(estimator.classifier_.get_params()))
+    if has_selector:
+        feature_index = estimator.features_
+        logger.info('number of selected features: {}'.format(feature_index.shape[0]))
+        logger.info('save features to: ' + os.path.join(args.output_dir, 'features.txt'))
+        pd.Series(feature_names[feature_index]).to_csv(os.path.join(args.output_dir, 'features.txt'), index=False, header=False)
+        logger.info('save feature importances to: ' + os.path.join(args.output_dir, 'feature_importances.txt'))
+        pd.Series(estimator.feature_importances_, index=feature_names[feature_index])\
+            .to_csv(os.path.join(args.output_dir, 'feature_importances.txt'), sep='\t', header=False, index=True)
+
+    metrics = collect_metrics.get_metrics()
+    for name in ('train', 'test'):
+        logger.info('save metrics to: ' + os.path.join(args.output_dir, 'metrics.{}.txt'.format(name)))
+        # missing metric values are written as NA
+        metrics[name].to_csv(os.path.join(args.output_dir, 'metrics.{}.txt'.format(name)), header=True, index=True, na_rep='NA', sep='\t')
+
+    logger.info('save cross-validation details to: ' + os.path.join(args.output_dir, 'cross_validation.h5'))
+    with h5py.File(os.path.join(args.output_dir, 'cross_validation.h5'), 'w') as f:
+        f.create_dataset('labels', data=y)
+        f.create_dataset('predicted_labels', data=collect_predictions.get_pred_labels())
+        f.create_dataset('predictions', data=collect_predictions.get_pred_probs())
+        f.create_dataset('train_index', data=collect_train_index.get_train_index())
+        if has_selector:
+            f.create_dataset('feature_selection', data=feature_selection_matrix.get_matrix())
+
+@command_handler
+def predict(args):
+    import pandas as pd
+    import numpy as np
+    import pickle
+
+    logger.info('read data matrix: ' + args.matrix)
+    X = pd.read_table(args.matrix, index_col=0, sep='\t')
+    # transpose
+    if args.transpose:
+        logger.info('transpose feature matrix')
+        X = X.T
+    model_file = os.path.join(args.model_dir, 'final_model.pkl')
+    logger.info('load model: ' + model_file)
+    with open(model_file, 'rb') as f:
+        model = pickle.load(f)
+    logger.info('run model')
+    # assume the model follows the scikit-learn API: column 1 of predict_proba
+    # holds the probability of the positive class
+    predicted_scores = model.predict_proba(X.values)
+    logger.info('save predictions to file: ' + args.output_file)
+    output_df = pd.DataFrame({'scores': predicted_scores[:, 1]})
+    output_df.index = X.index.values
+    output_df.index.name = 'sample_id'
+    output_df.to_csv(args.output_file, sep='\t', na_rep='NA')
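+
+# Minimal sketch of a handler for the 'generate_cv_splits' subcommand defined below,
+# assuming the splitter name refers to a class in sklearn.model_selection and that
+# parse_params (from estimators2) turns the --splitter-params string into a keyword
+# dict, as it is used elsewhere in this script.
+@command_handler
+def generate_cv_splits(args):
+    import numpy as np
+    import h5py
+    from sklearn import model_selection
+    from estimators2 import parse_params
+
+    # number of samples = number of lines minus one header line
+    with open(args.samples, 'r') as f:
+        n_samples = sum(1 for line in f) - 1
+    logger.info('number of samples: {}'.format(n_samples))
+    splitter_params = parse_params(args.splitter_params) or {}
+    splitter = getattr(model_selection, args.splitter)(**splitter_params)
+    # one boolean train mask per split, matching the layout of the 'train_index'
+    # dataset written by cross_validation above
+    train_index = np.zeros((splitter.get_n_splits(), n_samples), dtype=bool)
+    for i, (train, _) in enumerate(splitter.split(np.arange(n_samples))):
+        train_index[i, train] = True
+    logger.info('save cv splits to: ' + args.output_file)
+    with h5py.File(args.output_file, 'w') as f:
+        f.create_dataset('train_index', data=train_index)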
+
+if __name__ == '__main__':
+    main_parser = argparse.ArgumentParser(description='Machine learning module')
+    subparsers = main_parser.add_subparsers(dest='command')
+
+    parser = subparsers.add_parser('cross_validation')
+    g_input = parser.add_argument_group('input')
+    g_input.add_argument('--matrix', '-i', type=str, metavar='FILE', required=True,
+        help='input feature matrix (rows are samples, columns are features)')
+    g_input.add_argument('--sample-classes', type=str, metavar='FILE', required=True,
+        help='input file containing sample classes with 2 columns: sample_id, sample_class')
+    g_input.add_argument('--positive-class', type=str, metavar='STRING',
+        help='comma-separated list of sample classes to use as positive class')
+    g_input.add_argument('--negative-class', type=str, metavar='STRING',
+        help='comma-separated list of sample classes to use as negative class')
+    g_input.add_argument('--transpose', action='store_true', default=False,
+        help='transpose the feature matrix')
+    g_input.add_argument('--features', type=str, metavar='FILE',
+        help='input file containing subset of feature names')
+    g_input.add_argument('--config', '-c', type=str, metavar='FILE',
+        help='configuration file of parameters in YAML format')
+
+    g_filter = parser.add_argument_group('filter')
+    g_filter.add_argument('--zero-fraction-filter', action='store_true')
+    g_filter.add_argument('--zero-fraction-filter-params', type=str, metavar='STRING')
+    g_filter.add_argument('--rpkm-filter', action='store_true')
+    g_filter.add_argument('--rpkm-filter-params', type=str, metavar='STRING')
+    g_filter.add_argument('--rpm-filter', action='store_true')
+    g_filter.add_argument('--rpm-filter-params', type=str, metavar='STRING')
+    g_filter.add_argument('--fold-change-filter', action='store_true')
+    g_filter.add_argument('--fold-change-filter-params', type=str, metavar='STRING')
+    g_filter.add_argument('--diffexp-filter', action='store_true')
+    g_filter.add_argument('--diffexp-filter-params', type=str, metavar='STRING')
+
+    g_scaler = parser.add_argument_group('scaler')
+    g_scaler.add_argument('--log-transform', action='store_true')
+    g_scaler.add_argument('--log-transform-params', type=str, metavar='STRING')
+    g_scaler.add_argument('--scaler', type=str, metavar='NAME')
+    g_scaler.add_argument('--scaler-params', type=str, metavar='STRING')
+
+    g_select = parser.add_argument_group('feature_selection')
+    g_select.add_argument('--selector', type=str, metavar='NAME')
+    g_select.add_argument('--selector-params', type=str, metavar='STRING')
+    g_select.add_argument('--n-features-to-select', type=int, metavar='INTEGER')
+
+    g_classifier = parser.add_argument_group('classifier')
+    g_classifier.add_argument('--classifier', type=str, metavar='NAME', default='random_forest')
+    g_classifier.add_argument('--classifier-params', type=str, metavar='STRING')
+
+    g_cv = parser.add_argument_group('cross_validation')
+    g_cv.add_argument('--cv-params', type=str, metavar='STRING', nargs='?')
+    g_cv.add_argument('--grid-search', action='store_true')
+    g_cv.add_argument('--grid-search-params', type=str, metavar='STRING')
+
+    g_misc = parser.add_argument_group('misc')
+    g_misc.add_argument('--sample-weight', type=str, default='auto',
+        help='''sample weight to balance classes.
+        Computed from data if set to "auto".
+        Can be a tab-separated file with two columns (no header): sample_id, weight.
+        No sample weight if set to "none".''')
+
+    g_output = parser.add_argument_group('output')
+    g_output.add_argument('--output-dir', '-o', type=str, metavar='DIR',
+        required=True, help='output directory')
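+
+    # Example invocation of the cross_validation subcommand (file names and class
+    # labels here are hypothetical):
+    #   machine_learning.py cross_validation -i matrix.txt --sample-classes classes.txt \
+    #       --positive-class tumor --negative-class normal \
+    #       --classifier random_forest --output-dir output/cross_validation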
+
+    parser = subparsers.add_parser('run_pipeline')
+    parser.add_argument('--matrix', '-i', type=str, metavar='FILE', required=True,
+        help='input feature matrix (rows are samples, columns are features)')
+    parser.add_argument('--sample-classes', type=str, metavar='FILE', required=True,
+        help='input file containing sample classes with 2 columns: sample_id, sample_class')
+    parser.add_argument('--positive-class', type=str, metavar='STRING',
+        help='comma-separated list of sample classes to use as positive class')
+    parser.add_argument('--negative-class', type=str, metavar='STRING',
+        help='comma-separated list of sample classes to use as negative class')
+    parser.add_argument('--features', type=str, metavar='FILE',
+        help='input file containing subset of feature names')
+    parser.add_argument('--config', '-c', type=str, metavar='FILE', required=True,
+        help='configuration file of parameters in YAML format')
+    parser.add_argument('--output-dir', '-o', type=str, metavar='DIR',
+        required=True, help='output directory')
+    parser.add_argument('--log-level', type=str, default='INFO',
+        help='logging level')
+
+    parser = subparsers.add_parser('predict')
+    parser.add_argument('--matrix', '-i', type=str, metavar='FILE', required=True,
+        help='input feature matrix (rows are samples, columns are features)')
+    parser.add_argument('--model-dir', '-m', type=str, metavar='DIR', required=True,
+        help='output directory of cross_validation/run_pipeline that contains final_model.pkl')
+    parser.add_argument('--transpose', action='store_true', default=False,
+        help='transpose the input matrix')
+    parser.add_argument('--output-file', '-o', type=str, metavar='FILE', required=True,
+        help='output file')
+
+    parser = subparsers.add_parser('generate_cv_splits')
+    parser.add_argument('--samples', '-i', type=str, required=True,
+        help='input file used to determine the number of samples (number of lines minus one header line)')
+    parser.add_argument('--splitter', type=str, default='KFold')
+    parser.add_argument('--splitter-params', type=str,
+        help='parameters of the splitter')
+    parser.add_argument('--output-file', '-o', type=str, required=True,
+        help='output file')
+
+    args = main_parser.parse_args()
+    if args.command is None:
+        print('Error: missing command', file=sys.stderr)
+        main_parser.print_help()
+        sys.exit(1)
+
+    import pandas as pd
+    import numpy as np
+
+    handler = command_handlers.get(args.command)
+    if handler is None:
+        print('Error: no handler registered for command: ' + args.command, file=sys.stderr)
+        sys.exit(1)
+    handler(args)