BioAid / paralleltools.py

# This code was developed and authored by Jerzy Twarowski in Malkova Lab at the University of Iowa
# Contact: jerzymateusz-twarowski@uiowa.edu, tvarovski1@gmail.com

import os
import argparse
import logging
import subprocess
import multiprocessing
from math import ceil

import pandas as pd


def splitDF(df: pd.DataFrame, slices_num: int, savecsv_path_root=None) -> list:
    """
    Splits a pandas DataFrame into n=slices_num slices and saves each slice to a CSV file if a path is provided.
    Returns a list of the DataFrame slices.

    Args:
        df (pandas.DataFrame): The DataFrame to be split.
        slices_num (int): The number of slices to create.
        savecsv_path_root (str): Optional. The root path for the slice CSV files. If not provided, the slices are not saved.

    Returns:
        list: A list of the DataFrame slices.
    """
    slice_size = ceil(len(df) / slices_num)
    slices_out = []
    for slice_idx in range(slices_num):
        # slice_idx avoids shadowing the built-in `slice`
        df_slice = df[slice_idx*slice_size:(slice_idx+1)*slice_size].copy()
        if savecsv_path_root:
            df_slice_name = f"{savecsv_path_root}_{slice_idx}.csv"
            df_slice.to_csv(df_slice_name, index=False)
            logging.info(f'Slice {slice_idx} saved to {df_slice_name}')
        slices_out.append(df_slice)
    logging.info(f'Input df split into {slices_num} slices.')
    return slices_out
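
# Example (a minimal sketch of the intended usage; the root name `demo` is illustrative):
#
#   df = pd.DataFrame({'x': range(10)})
#   slices = splitDF(df, 3)          # in-memory only; ceil(10/3)=4, so lengths are [4, 4, 2]
#   slices = splitDF(df, 3, 'demo')  # also writes demo_0.csv, demo_1.csv, demo_2.csv
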
def joinSlices(slices_num: int, slice_path_root: str, savecsv_path=None) -> pd.DataFrame:
    """
    Combines the results from each slice into one pandas DataFrame and saves it to a CSV file if a path is provided.
    Returns the combined DataFrame.

    Args:
        slices_num (int): The number of slices to combine.
        slice_path_root (str): The root path to the CSV files for each slice.
        savecsv_path (str): Optional. The path to save the combined DataFrame as a CSV file.

    Returns:
        pandas.DataFrame: The combined DataFrame.
    """
    df = pd.DataFrame()
    for slice_idx in range(slices_num):
        df_slice = pd.read_csv(f"{slice_path_root}_{slice_idx}.csv")
        df = pd.concat([df, df_slice], ignore_index=True)
    if savecsv_path:
        df.to_csv(savecsv_path, index=False)
        logging.info(f'Slices joined! Results saved to {savecsv_path}')
    return df
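
# Example (a minimal sketch, assuming demo_0.csv ... demo_2.csv were written by splitDF above):
#
#   df_all = joinSlices(3, 'demo')                   # combined DataFrame, not saved
#   df_all = joinSlices(3, 'demo', 'demo_all.csv')   # also saved to demo_all.csv
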
def cleanUpSlices(slices_num: int, slice_path_root: str) -> None:
    """
    Deletes the CSV files for each slice of a DataFrame that was split using the `splitDF` function.

    Args:
        slices_num (int): The number of slices that were created.
        slice_path_root (str): The root path to the CSV files for each slice.

    Returns:
        None
    """
    logging.info('Cleaning up slice files...')
    for slice_idx in range(slices_num):
        os.remove(f"{slice_path_root}_{slice_idx}.csv")
    logging.info('Clean up finished! Deleted slice files.')
def runScriptSubprocess(args: tuple[str, str]) -> None:
    """
    Runs a Python script in a subprocess using the provided CSV file as input.

    Args:
        args (tuple): A tuple containing the path to the CSV file and the path to the Python script.

    Returns:
        None
    """
    csv_slice, script_path = args
    subprocess.call(['python', script_path, csv_slice])
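
# The worker script receives its slice path as the first command-line argument, and
# because joinSlices later re-reads the same slice files, it must write its results
# back to that path. A minimal sketch of such a target script (the file name
# myscript.py and the `processed` column are illustrative, not part of this module):
#
#   import sys
#   import pandas as pd
#
#   slice_path = sys.argv[1]            # e.g. results_df_3.csv
#   df = pd.read_csv(slice_path)
#   df['processed'] = df.iloc[:, 0]     # placeholder for the real per-row work
#   df.to_csv(slice_path, index=False)  # overwrite the slice so joinSlices picks it up
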
def runMainPool(script_path: str, results_df_path: str, num_processes: int = 8, joint_name: str = 'all') -> None:
    """
    Runs a Python script in parallel on a pandas DataFrame using the provided CSV file as input.

    Args:
        script_path (str): The path to the Python script to be run.
        results_df_path (str): The path to the CSV file containing the input DataFrame.
        num_processes (int): Optional. The number of processes to use for parallelization. Default is 8.
        joint_name (str): Optional. The suffix of the output CSV file containing the combined results. Default is 'all'.

    Returns:
        None
    """
    df = pd.read_csv(results_df_path)
    results_df_name = os.path.basename(results_df_path)[:-4]

    # split the df into n=num_processes slices and save each slice to a csv file
    logging.info(f'Splitting {results_df_name} into {num_processes} slices...')
    splitDF(df, num_processes, results_df_name)

    # run the script in parallel on each slice
    logging.info(f'Running {script_path} in parallel on {num_processes} slices...')
    pool = multiprocessing.Pool(processes=num_processes)
    pool.map(runScriptSubprocess, [(f"{results_df_name}_{slice_idx}.csv", script_path) for slice_idx in range(num_processes)])
    pool.close()
    pool.join()
    logging.info(f'Finished {num_processes} processes.')

    # join the results from each slice into one DataFrame and save it to a CSV file
    joint_path = f'{results_df_name}_{joint_name}.csv'
    logging.info(f'Joining results from {num_processes} slices into {joint_path}...')
    joinSlices(num_processes, results_df_name, joint_path)

    # clean up the slice files
    logging.info(f'Cleaning up slice files for {results_df_name}...')
    cleanUpSlices(num_processes, results_df_name)
    logging.info(f'Finished processing {results_df_name}.')
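
# Example of calling the pipeline programmatically (a minimal sketch; myscript.py and
# results_df.csv are the module's CLI defaults, reused here for illustration):
#
#   runMainPool('myscript.py', 'results_df.csv', num_processes=4)
#   # -> writes results_df_all.csv in the current working directory
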
def parseArguments() -> argparse.Namespace:
    """
    Parses command line arguments.

    Returns:
        Parsed arguments
    """
    program_description = (
        'This program runs a Python script in parallel on a pandas DataFrame using '
        'the provided CSV file as input. The CSV file must contain a header row. '
        'Caution: The CSV file will be split into n=num_processes slices and each slice '
        'will be saved to a CSV file. The slice files will be deleted after the script '
        'is finished. Make sure that your input file can be divided into sub-files '
        'without losing data.')
    parser = argparse.ArgumentParser(description=program_description)

    args_dict = {
        '-v': {'name': '--version', 'action': 'version', 'version': '%(prog)s 1.0'},
        '-t': {'name': '--target', 'type': str, 'default': 'myscript.py', 'help': 'Path to the Python script to be run'},
        '-r': {'name': '--results_df_path', 'type': str, 'default': 'results_df.csv', 'help': 'Path to the results dataframe to be processed'},
        '-p': {'name': '--num_processes', 'type': int, 'default': 8, 'help': 'Number of processes (cores) to use'}
    }
    for arg, properties in args_dict.items():
        parser.add_argument(arg, properties['name'], **{key: value for key, value in properties.items() if key != 'name'})
    return parser.parse_args()
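
# Example invocation (assuming a target script that rewrites its slice in place,
# as sketched after runScriptSubprocess above):
#
#   python paralleltools.py -t myscript.py -r results_df.csv -p 4
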
def main() -> None:
    """
    This function is the entry point of the program. It sets up logging and calls the function that runs the main pool of processes.

    Returns:
        None
    """
    logging.basicConfig(level=logging.INFO)
    args = parseArguments()
    logging.info(f'Running {args.target} on {args.results_df_path} in parallel on {args.num_processes} processes...')
    runMainPool(args.target, args.results_df_path, num_processes=args.num_processes)
    logging.info(f'Finished running {args.target} on {args.results_df_path} on {args.num_processes} processes.')
    logging.info(f'Output saved to {os.path.basename(args.results_df_path)[:-4]}_all.csv')


if __name__ == "__main__":
    main()