# BioAid/paralleltools.py
# This code was developed and authored by Jerzy Twarowski in Malkova Lab at the University of Iowa
# Contact: jerzymateusz-twarowski@uiowa.edu, tvarovski1@gmail.com

import os
import argparse
import logging
import subprocess
import multiprocessing
from math import ceil
import pandas as pd

def splitDF(df: pd.DataFrame, slices_num: int, savecsv_path_root=None) -> list:
    """
    Splits a pandas DataFrame into n=slices_num slices and saves each slice to a CSV file if a path is provided.
    Returns a list of the DataFrame slices.

    Args:
        df (pandas.DataFrame): The DataFrame to be split.
        slices_num (int): The number of slices to create.
        savecsv_path_root (str): Optional. The root path for the CSV files. If not provided, the slices are not saved.

    Returns:
        list: A list of the DataFrame slices.
    """
    slice_size = ceil(len(df) / slices_num)
    slices_out = []
    for i in range(slices_num):
        df_slice = df[i*slice_size:(i+1)*slice_size].copy()

        if savecsv_path_root:
            # Use the full root path (not its first character) so the file names
            # match the f"{root}_{i}.csv" pattern that joinSlices expects.
            df_slice_name = f"{savecsv_path_root}_{i}.csv"
            df_slice.to_csv(df_slice_name, index=False)
            logging.info(f'Slice {i} saved to {df_slice_name}')

        slices_out.append(df_slice)

    logging.info(f'Input df split into {slices_num} slices.')
    return slices_out
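
# A minimal usage sketch (hypothetical names): splitting a 10-row frame into 3
# slices yields slices of 4, 4, and 2 rows, since slice_size = ceil(10/3) = 4.
#
#     df = pd.DataFrame({'x': range(10)})
#     slices = splitDF(df, 3)                 # in-memory only, no CSVs written
#     slices = splitDF(df, 3, 'results_df')   # also writes results_df_0.csv ... results_df_2.csv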
|
|
def joinSlices(slices_num: int, slice_path_root: str, savecsv_path=None) -> pd.DataFrame:
    """
    Combines the results from each slice into one pandas DataFrame and saves it to a CSV file if a path is provided.
    Returns the combined DataFrame.

    Args:
        slices_num (int): The number of slices to combine.
        slice_path_root (str): The root path to the CSV files for each slice.
        savecsv_path (str): Optional. The path to save the combined DataFrame as a CSV file.

    Returns:
        pandas.DataFrame: The combined DataFrame.
    """
    df = pd.DataFrame()
    for i in range(slices_num):
        df_slice = pd.read_csv(f"{slice_path_root}_{i}.csv")
        df = pd.concat([df, df_slice], ignore_index=True)

    if savecsv_path:
        # Save to the full path, not its first character.
        df.to_csv(savecsv_path, index=False)
        logging.info(f'Slices joined! Results saved to {savecsv_path}')

    return df
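
# Usage sketch (hypothetical paths): reassemble the three slices written above.
#
#     combined = joinSlices(3, 'results_df')                     # in-memory only
#     combined = joinSlices(3, 'results_df', 'results_all.csv')  # also saved to disk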
|
|
def cleanUpSlices(slices_num: int, slice_path_root: str) -> None:
    """
    Deletes the CSV files for each slice of a DataFrame that was split using the `splitDF` function.

    Args:
        slices_num (int): The number of slices that were created.
        slice_path_root (str): The root path to the CSV files for each slice.

    Returns:
        None
    """
    logging.info('Cleaning up slice files...')
    for i in range(slices_num):
        os.remove(f"{slice_path_root}_{i}.csv")
    logging.info('Clean-up finished! Deleted slice files.')
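
# Usage sketch (hypothetical root): removes results_df_0.csv ... results_df_2.csv.
#
#     cleanUpSlices(3, 'results_df')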
|
|
def runScriptSubprocess(args: tuple[str, str]) -> None:
    """
    Runs a Python script in a subprocess using the provided CSV file as input.

    Args:
        args (tuple): A tuple containing the path to the CSV file and the path to the Python script.

    Returns:
        None
    """
    csv_slice, script_path = args
    subprocess.call(['python', script_path, csv_slice])
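
# A compatible target script receives the slice CSV path as its only argument,
# and should write its results back to that same path, since joinSlices later
# re-reads the f"{root}_{i}.csv" files. A minimal sketch (hypothetical
# 'myscript.py'):
#
#     import sys
#     import pandas as pd
#
#     csv_slice = sys.argv[1]
#     df = pd.read_csv(csv_slice)
#     df['processed'] = True  # placeholder for the real per-row work
#     df.to_csv(csv_slice, index=False)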
|
|
def runMainPool(script_path: str, results_df_path: str, num_processes: int = 8, joint_name: str = 'all') -> None:
    """
    Runs a Python script in parallel on a pandas DataFrame using the provided CSV file as input.

    Args:
        script_path (str): The path to the Python script to be run.
        results_df_path (str): The path to the CSV file containing the input DataFrame.
        num_processes (int): Optional. The number of processes to use for parallelization. Default is 8.
        joint_name (str): Optional. The suffix used for the combined output CSV file name. Default is 'all'.

    Returns:
        None
    """
    df = pd.read_csv(results_df_path)
    results_df_name = os.path.basename(results_df_path)[:-4]  # strip the '.csv' extension

    # split the df into n=num_processes slices and save each slice to a csv file
    logging.info(f'Splitting {results_df_name} into {num_processes} slices...')
    splitDF(df, num_processes, results_df_name)

    # run the script in parallel on each slice
    logging.info(f'Running {script_path} in parallel on {num_processes} slices...')
    pool = multiprocessing.Pool(processes=num_processes)
    pool.map(runScriptSubprocess, [(f"{results_df_name}_{i}.csv", script_path) for i in range(num_processes)])
    pool.close()
    pool.join()
    logging.info(f'Finished {num_processes} processes...')

    # join the results from each slice into one DataFrame and save it to a CSV file
    joint_path = f'{results_df_name}_{joint_name}.csv'
    logging.info(f'Joining results from {num_processes} slices into {joint_path}...')
    joinSlices(num_processes, results_df_name, joint_path)

    # clean up the slice files
    logging.info(f'Cleaning up slice files for {results_df_name}...')
    cleanUpSlices(num_processes, results_df_name)

    logging.info(f'Finished processing {results_df_name}.')
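
# End-to-end usage sketch (hypothetical file names): process results_df.csv
# with myscript.py on 4 cores; the combined output lands in results_df_all.csv.
#
#     runMainPool('myscript.py', 'results_df.csv', num_processes=4)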
|
|
def parseArguments() -> argparse.Namespace:
    """
    Parses command line arguments.

    Returns:
        Parsed arguments
    """
    program_description = (
        'This program runs a Python script in parallel on a pandas DataFrame using '
        'the provided CSV file as input. The CSV file must contain a header row. '
        'Caution: The CSV file will be split into n=num_processes slices and each slice '
        'will be saved to a CSV file. The slice files will be deleted after the script '
        'is finished. Make sure that your input file can be divided into sub-files '
        'without losing data.')

    parser = argparse.ArgumentParser(description=program_description)

    args_dict = {
        '-v': {'name': '--version', 'action': 'version', 'version': '%(prog)s 1.0'},
        '-t': {'name': '--target', 'type': str, 'default': 'myscript.py', 'help': 'Path to the Python script to be run'},
        '-r': {'name': '--results_df_path', 'type': str, 'default': 'results_df.csv', 'help': 'Path to the results dataframe to be processed'},
        '-p': {'name': '--num_processes', 'type': int, 'default': 8, 'help': 'Number of processes (cores) to use'}
    }

    for arg, properties in args_dict.items():
        parser.add_argument(arg, properties['name'], **{key: value for key, value in properties.items() if key != 'name'})

    return parser.parse_args()
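
# Example invocation (hypothetical paths), mirroring the defaults above:
#
#     python paralleltools.py -t myscript.py -r results_df.csv -p 4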
|
|
def main() -> None:
    """
    This function is the entry point of the program. It sets up logging, and calls the function to run the main pool of processes.

    Returns:
        None
    """
    # Configure logging so the INFO-level messages emitted throughout this module are visible.
    logging.basicConfig(level=logging.INFO)

    args = parseArguments()

    logging.info(f'Running {args.target} on {args.results_df_path} in parallel on {args.num_processes} processes...')
    runMainPool(args.target, args.results_df_path, num_processes=args.num_processes)
    logging.info(f'Finished running {args.target} on {args.results_df_path} in parallel on {args.num_processes} processes...')
    # runMainPool writes next to the current working directory, using the input file's base name.
    logging.info(f'Output saved to {os.path.basename(args.results_df_path)[:-4]}_all.csv')

if __name__ == "__main__":
    main()