Diff of /BioAid/paralleltools.py [000000] .. [deb8e5]

--- a
+++ b/BioAid/paralleltools.py
@@ -0,0 +1,176 @@
+# This code was developed and authored by Jerzy Twarowski in Malkova Lab at the University of Iowa 
+# Contact: jerzymateusz-twarowski@uiowa.edu, tvarovski1@gmail.com
+
+import os
+import argparse
+import logging
+import subprocess
+import multiprocessing
+from math import ceil
+import pandas as pd
+
+def splitDF(df: pd.DataFrame, slices_num: int, savecsv_path_root=None) -> list:
+    """
+    Splits a pandas DataFrame into n=slices_num slices and saves each slice to a CSV file if a path is provided.
+    Returns a list of the DataFrame slices.
+
+    Args:
+        df (pandas.DataFrame): The DataFrame to be split.
+        slices_num (int): The number of slices to create.
+        savecsv_path_root (str): Optional. The root path to save the CSV files. If not provided, the slices are not saved.
+
+    Returns:
+        list: A list of the DataFrame slices.
+    """
+    slice_size = ceil(len(df) / slices_num)
+    slices_out = []
+    for slice in range(slices_num):
+        df_slice = df[slice*slice_size:(slice+1)*slice_size].copy()
+        
+        if savecsv_path_root:
+            df_slice_name = f"{savecsv_path_root[0]}_{slice}.csv"
+            df_slice.to_csv(df_slice_name, index=False)
+            logging.info(f'Slice {slice} saved to {df_slice_name}')
+        
+        slices_out.append(df_slice)
+    
+    logging.info(f'Input df split into {slices_num} slices.')
+    return slices_out
+
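+# Example usage of splitDF (a minimal sketch; the DataFrame contents and the 'toy'
+# file prefix are illustrative placeholders, not part of this module's workflow):
+#
+#     toy_df = pd.DataFrame({'chrom': ['I', 'II', 'III', 'IV'], 'pos': [10, 20, 30, 40]})
+#     slices = splitDF(toy_df, 2, savecsv_path_root='toy')
+#     # writes toy_0.csv and toy_1.csv (two rows each) and returns the two slices
+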
+def joinSlices(slices_num: int, slice_path_root: str, savecsv_path=None) -> pd.DataFrame:
+    """
+    Combines the results from each slice into one pandas DataFrame and saves it to a CSV file if a path is provided.
+    Returns the combined DataFrame.
+
+    Args:
+        slices_num (int): The number of slices to combine.
+        slice_path_root (str): The root path to the CSV files for each slice.
+        savecsv_path (str): Optional. The path to save the combined DataFrame as a CSV file.
+
+    Returns:
+        pandas.DataFrame: The combined DataFrame.
+    """
+    df = pd.DataFrame()
+    for slice in range(slices_num):
+        df_slice = pd.read_csv(f"{slice_path_root}_{slice}.csv")
+        df = pd.concat([df, df_slice], ignore_index=True)
+    
+    if savecsv_path:
+        df.to_csv(savecsv_path, index=False)
+        logging.info(f'Slices joined! Results saved to {savecsv_path}')
+    
+    return df
+
+def cleanUpSlices(slices_num: int, slice_path_root: str) -> None:
+    """
+    Deletes the CSV files for each slice of a DataFrame that was split using the `splitDF` function.
+
+    Args:
+        slices_num (int): The number of slices that were created.
+        slice_path_root (str): The root path to the CSV files for each slice.
+
+    Returns:
+        None
+    """
+    logging.info('Deleting the slice files...')
+    for slice in range(slices_num):
+        os.remove(f"{slice_path_root}_{slice}.csv")
+    logging.info('Clean up finished! Deleted slice files.')
+
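+# Example usage of joinSlices and cleanUpSlices (a minimal sketch continuing the
+# splitDF example above; file names are illustrative placeholders):
+#
+#     combined = joinSlices(2, 'toy', savecsv_path='toy_all.csv')  # reads toy_0.csv and toy_1.csv
+#     cleanUpSlices(2, 'toy')                                      # deletes toy_0.csv and toy_1.csv
+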
+def runScriptSubprocess(args: tuple[str, str]) -> None:
+    """
+    Runs a Python script in a subprocess using the provided CSV file as input.
+
+    Args:
+        args (tuple): A tuple containing the path to the CSV file and the path to the Python script.
+
+    Returns:
+        None
+    """
+    csv_slice, script_path = args
+    subprocess.call(['python', script_path, csv_slice])
+
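+# A minimal sketch of a target script compatible with runScriptSubprocess/runMainPool.
+# The slice CSV path arrives as sys.argv[1]; runMainPool re-reads the same slice files
+# via joinSlices after the pool finishes, so a target script typically writes its
+# results back to that same path (the per-row work below is a placeholder):
+#
+#     import sys
+#     import pandas as pd
+#
+#     df = pd.read_csv(sys.argv[1])        # the slice assigned to this process
+#     # ... do the actual per-row work on df here ...
+#     df.to_csv(sys.argv[1], index=False)  # overwrite the slice with the results
+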
+def runMainPool(script_path: str, results_df_path: str, num_processes: int = 8, joint_name: str = 'all') -> None:
+    """
+    Splits the input CSV file into n=num_processes slices, runs the target Python script on each
+    slice in a separate process, joins the per-slice results into one CSV file, and deletes the
+    slice files.
+
+    Args:
+        script_path (str): The path to the Python script to be run.
+        results_df_path (str): The path to the CSV file containing the input DataFrame.
+        num_processes (int): Optional. The number of processes to use for parallelization. Default is 8.
+        joint_name (str): Optional. The suffix appended to the input file name for the combined results CSV. Default is 'all'.
+
+    Returns:
+        None
+    """
+    with open(results_df_path, 'r') as f:
+        df = pd.read_csv(f)
+        results_df_name = os.path.basename(results_df_path)[:-4]
+
+        # split the df into n=num_processes slices and save each slice to a csv file
+        logging.info(f'Splitting {results_df_name} into {num_processes} slices...')
+        splitDF(df, num_processes, results_df_name)
+
+        # run the script in parallel on each slice
+        logging.info(f'Running {script_path} in parallel on {num_processes} slices...')
+        pool = multiprocessing.Pool(processes=num_processes)
+        pool.map(runScriptSubprocess, [(f"{results_df_name}_{slice}.csv", script_path) for slice in range(num_processes)])
+        pool.close()
+        pool.join()
+        logging.info(f'Finished {num_processes} processes...')
+
+        # join the results from each slice into one DataFrame and save it to a CSV file
+        joint_path = f'{results_df_name}_{joint_name}.csv'
+        logging.info(f'Joining results from {num_processes} slices into {joint_path}...')
+        joinSlices(num_processes, results_df_name, joint_path)
+
+        # clean up the slice files
+        logging.info(f'Cleaning up slice files for {results_df_name}...')
+        cleanUpSlices(num_processes, results_df_name)
+
+    logging.info(f'Finished processing {results_df_name}.')
+
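+# Example of calling runMainPool directly (a minimal sketch; 'myscript.py' and
+# 'results_df.csv' are illustrative placeholders):
+#
+#     runMainPool('myscript.py', 'results_df.csv', num_processes=4)
+#     # splits results_df.csv into results_df_0.csv ... results_df_3.csv, runs
+#     # myscript.py on each slice, and writes the combined output to results_df_all.csv
+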
+def parseArguments() -> argparse.Namespace:
+    """
+    Parses command line arguments.
+
+    Returns:
+        argparse.Namespace: The parsed command-line arguments.
+    """
+    program_description = ('This program runs a Python script in parallel on a pandas DataFrame using '
+                           'the provided CSV file as input. The CSV file must contain a header row. '
+                           'Caution: The CSV file will be split into n=num_processes slices and each slice '
+                           'will be saved to a CSV file. The slice files will be deleted after the script '
+                           'is finished. Make sure that your input file can be divided into sub-files '
+                           'without losing data.')
+    
+    parser = argparse.ArgumentParser(description=program_description)
+
+    args_dict = {
+        '-v': {'name': '--version', 'action': 'version', 'version': '%(prog)s 1.0'},
+        '-t': {'name': '--target', 'type': str, 'default': 'myscript.py', 'help': 'Path to the Python script to be run'},
+        '-r': {'name': '--results_df_path', 'type': str, 'default': 'results_df.csv', 'help': 'Path to the results dataframe to be processed'},
+        '-p': {'name': '--num_processes', 'type': int, 'default': 8, 'help': 'Number of processes (cores) to use'}
+    }
+
+    for arg, properties in args_dict.items():
+        parser.add_argument(arg, properties['name'], **{key: value for key, value in properties.items() if key != 'name'})
+
+    return parser.parse_args()
+
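+# Example command line (values are illustrative placeholders):
+#
+#     python paralleltools.py -t myscript.py -r results_df.csv -p 4
+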
+def main() -> None:
+    """
+    The entry point of the program. It parses the command-line arguments, configures logging, and runs the main pool of processes.
+
+    Returns:
+        None
+    """
+    args = parseArguments()
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
+
+    logging.info(f'Running {args.target} on {args.results_df_path} in parallel on {args.num_processes} processes...')
+    runMainPool(args.target, args.results_df_path, num_processes=args.num_processes)
+    logging.info(f'Finished running {args.target} on {args.results_df_path} in parallel on {args.num_processes} processes...')
+    logging.info(f'Output saved to {os.path.basename(args.results_df_path)[:-4]}_all.csv')
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file