# BioAid/paralleltools.py
# This code was developed and authored by Jerzy Twarowski in Malkova Lab at the University of Iowa
# Contact: jerzymateusz-twarowski@uiowa.edu, tvarovski1@gmail.com

import os
import sys
import argparse
import logging
import subprocess
import multiprocessing
from math import ceil

import pandas as pd

def splitDF(df: pd.DataFrame, slices_num: int, savecsv_path_root=None) -> list:
    """
    Splits a pandas DataFrame into n=slices_num slices and saves each slice to a CSV file if a path is provided.
    Returns a list of the DataFrame slices.

    Args:
        df (pandas.DataFrame): The DataFrame to be split.
        slices_num (int): The number of slices to create.
        savecsv_path_root (str): Optional. The root path for the slice CSV files. If not provided, the slices are not saved.

    Returns:
        list: A list of the DataFrame slices.
    """
    slice_size = ceil(len(df) / slices_num)
    slices_out = []
    for i in range(slices_num):
        df_slice = df[i*slice_size:(i+1)*slice_size].copy()

        if savecsv_path_root:
            # each slice is saved as "<savecsv_path_root>_<i>.csv"
            df_slice_name = f"{savecsv_path_root}_{i}.csv"
            df_slice.to_csv(df_slice_name, index=False)
            logging.info(f'Slice {i} saved to {df_slice_name}')

        slices_out.append(df_slice)

    logging.info(f'Input df split into {slices_num} slices.')
    return slices_out
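
# A minimal usage sketch for splitDF (the DataFrame below is illustrative only):
# split ten rows into three slices in memory, without writing CSV files.
#
#     df = pd.DataFrame({'value': range(10)})
#     slices = splitDF(df, 3)
#     assert len(slices) == 3 and sum(len(s) for s in slices) == len(df)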

def joinSlices(slices_num: int, slice_path_root: str, savecsv_path=None) -> pd.DataFrame:
    """
    Combines the results from each slice into one pandas DataFrame and saves it to a CSV file if a path is provided.
    Returns the combined DataFrame.

    Args:
        slices_num (int): The number of slices to combine.
        slice_path_root (str): The root path to the CSV files for each slice.
        savecsv_path (str): Optional. The path to save the combined DataFrame as a CSV file.

    Returns:
        pandas.DataFrame: The combined DataFrame.
    """
    # read every slice file and concatenate them in one pass
    slices = [pd.read_csv(f"{slice_path_root}_{i}.csv") for i in range(slices_num)]
    df = pd.concat(slices, ignore_index=True)

    if savecsv_path:
        df.to_csv(savecsv_path, index=False)
        logging.info(f'Slices joined! Results saved to {savecsv_path}')

    return df

def cleanUpSlices(slices_num: int, slice_path_root: str) -> None:
    """
    Deletes the CSV files for each slice of a DataFrame that was split using the `splitDF` function.

    Args:
        slices_num (int): The number of slices that were created.
        slice_path_root (str): The root path to the CSV files for each slice.

    Returns:
        None
    """
    logging.info('Deleting slice files...')
    for i in range(slices_num):
        os.remove(f"{slice_path_root}_{i}.csv")
    logging.info('Clean-up finished! Deleted slice files.')
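
# A round-trip sketch tying the three helpers together (the root name 'demo' is
# hypothetical): splitDF writes demo_0.csv..demo_2.csv, joinSlices reads them
# back into one frame, and cleanUpSlices removes the intermediate files.
#
#     df = pd.DataFrame({'value': range(10)})
#     splitDF(df, 3, savecsv_path_root='demo')
#     joined = joinSlices(3, 'demo', savecsv_path='demo_all.csv')
#     cleanUpSlices(3, 'demo')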

def runScriptSubprocess(args: tuple[str, str]) -> None:
    """
    Runs a Python script in a subprocess using the provided CSV file as input.

    Args:
        args (tuple): A tuple containing the path to the CSV file and the path to the Python script.

    Returns:
        None
    """
    csv_slice, script_path = args
    # launch the worker with the same interpreter as the parent process,
    # rather than whatever 'python' happens to resolve to on PATH
    subprocess.call([sys.executable, script_path, csv_slice])
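
# A minimal sketch of a worker script that runScriptSubprocess can launch.
# joinSlices re-reads the same slice files that splitDF wrote, so the worker
# is expected to write its results back to the slice path it receives as
# sys.argv[1] (an assumption inferred from the file naming; 'myscript.py' and
# the 'processed' column are hypothetical):
#
#     # myscript.py
#     import sys
#     import pandas as pd
#
#     csv_slice = sys.argv[1]                # slice path passed by the parent process
#     df = pd.read_csv(csv_slice)
#     df['processed'] = True                 # stand-in for real per-row work
#     df.to_csv(csv_slice, index=False)      # overwrite so joinSlices picks up the results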

def runMainPool(script_path: str, results_df_path: str, num_processes: int = 8, joint_name: str = 'all') -> None:
    """
    Runs a Python script in parallel on a pandas DataFrame using the provided CSV file as input.

    Args:
        script_path (str): The path to the Python script to be run.
        results_df_path (str): The path to the CSV file containing the input DataFrame.
        num_processes (int): Optional. The number of processes to use for parallelization. Default is 8.
        joint_name (str): Optional. The suffix of the output CSV file containing the combined results. Default is 'all'.

    Returns:
        None
    """
    df = pd.read_csv(results_df_path)
    # slice files are named after the input file (without its extension) and written to the current directory
    results_df_name = os.path.splitext(os.path.basename(results_df_path))[0]

    # split the df into n=num_processes slices and save each slice to a csv file
    logging.info(f'Splitting {results_df_name} into {num_processes} slices...')
    splitDF(df, num_processes, results_df_name)

    # run the script in parallel on each slice
    logging.info(f'Running {script_path} in parallel on {num_processes} slices...')
    pool = multiprocessing.Pool(processes=num_processes)
    pool.map(runScriptSubprocess, [(f"{results_df_name}_{i}.csv", script_path) for i in range(num_processes)])
    pool.close()
    pool.join()
    logging.info(f'Finished {num_processes} processes...')

    # join the results from each slice into one DataFrame and save it to a CSV file
    joint_path = f'{results_df_name}_{joint_name}.csv'
    logging.info(f'Joining results from {num_processes} slices into {joint_path}...')
    joinSlices(num_processes, results_df_name, joint_path)

    # clean up the slice files
    logging.info(f'Cleaning up slice files for {results_df_name}...')
    cleanUpSlices(num_processes, results_df_name)

    logging.info(f'Finished processing {results_df_name}.')
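
# A programmatic usage sketch (hypothetical paths): process big_table.csv with
# worker.py on 4 cores, producing big_table_all.csv in the current directory.
#
#     runMainPool('worker.py', 'big_table.csv', num_processes=4)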

def parseArguments() -> argparse.Namespace:
    """
    Parses command line arguments.

    Returns:
        argparse.Namespace: The parsed arguments.
    """
    program_description = ('This program runs a Python script in parallel on a pandas DataFrame using '
                           'the provided CSV file as input. The CSV file must contain a header row. '
                           'Caution: the CSV file will be split into n=num_processes slices and each slice '
                           'will be saved to a CSV file. The slice files are deleted after the script '
                           'finishes. Make sure that your input file can be divided into sub-files '
                           'without losing data.')

    parser = argparse.ArgumentParser(description=program_description)
    parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.0')
    parser.add_argument('-t', '--target', type=str, default='myscript.py', help='Path to the Python script to be run')
    parser.add_argument('-r', '--results_df_path', type=str, default='results_df.csv', help='Path to the results dataframe to be processed')
    parser.add_argument('-p', '--num_processes', type=int, default=8, help='Number of processes (cores) to use')

    return parser.parse_args()
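
# An equivalent command-line invocation (hypothetical paths):
#
#     python paralleltools.py -t worker.py -r big_table.csv -p 4
#
# This splits big_table.csv into 4 slices, runs worker.py on each slice in
# parallel, and writes the combined output to big_table_all.csv.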

def main() -> None:
    """
    This function is the entry point of the program. It sets up logging and calls the function to run the main pool of processes.

    Returns:
        None
    """
    logging.basicConfig(level=logging.INFO)
    args = parseArguments()

    logging.info(f'Running {args.target} on {args.results_df_path} in parallel on {args.num_processes} processes...')
    runMainPool(args.target, args.results_df_path, num_processes=args.num_processes)
    logging.info(f'Finished running {args.target} on {args.results_df_path} in parallel on {args.num_processes} processes.')
    logging.info(f'Output saved to {os.path.splitext(os.path.basename(args.results_df_path))[0]}_all.csv')

if __name__ == "__main__":
    main()