Diff of /aggmap/utils/multiproc.py [000000] .. [9e8054]

--- a
+++ b/aggmap/utils/multiproc.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Nov 21 12:52:49 2018
+
+@author: shenwanxiang
+
+Multi-process run helpers.
+"""
+
+import subprocess
+
+import pandas as pd
+from tqdm import tqdm
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from multiprocessing import Pool, cpu_count
+
+
+from aggmap.utils.logtools import print_info, print_error, pbar,print_warn
+
+
+def RunCmd(cmd):
+    '''
+    input:
+        cmd: str, shell command to execute
+    output:
+        status: int, the return code (0 for success)
+        stdout: str
+        stderr: str
+    '''
+    print_info('run command: %s' % cmd)
+
+    def swap_log(swap, error=True):
+        # log each non-empty line of the captured output
+        for line in swap.split('\n'):
+            if line == '':
+                continue
+            if error:
+                print_error(line)
+            else:
+                print_info(line)
+
+    output = subprocess.run(cmd,
+                            shell=True,
+                            stdout=subprocess.PIPE,
+                            stderr=subprocess.PIPE,
+                            universal_newlines=True)
+    status = output.returncode
+    stdout = output.stdout
+    stderr = output.stderr
+
+    if status != 0:
+        if output.stdout:
+            swap_log(output.stdout, error=True)
+        if output.stderr:
+            swap_log(output.stderr, error=True)
+    else:
+        if output.stdout:
+            swap_log(output.stdout, error=False)
+
+    return status, stdout, stderr
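+
+# Usage sketch (illustrative only, not part of the original module; assumes a
+# POSIX shell is available for the command string):
+#
+#     status, stdout, stderr = RunCmd('echo hello')
+#     if status != 0:
+#         print_error(stderr)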
+
+
+def ImapUnorder(processor, iterator, max_workers=10, fail_in_file='./failed.lst'):
+    '''
+    processor: function applied to each element
+    iterator: list or iterator; each element should be a tuple or dict so that
+              results can be matched back to their inputs
+    max_workers: number of worker processes
+    fail_in_file: path of the file that collects inputs that failed twice
+    '''
+    with ProcessPoolExecutor(max_workers=max_workers) as executor:
+        with open(fail_in_file, 'w+') as f:
+            futures = {executor.submit(processor, IdPlusSmile): IdPlusSmile for IdPlusSmile in iterator}
+            with pbar(total=len(futures)) as pb:
+                # as_completed streams results as they finish, so the progress
+                # bar advances while workers are still running
+                for future in as_completed(futures):
+                    IdPlusSmile = futures[future]
+                    print_info('deal ' + str(IdPlusSmile))
+                    try:
+                        yield future.result()
+                    except Exception as exc:
+                        print_warn('worker process died, input %s failed in %s: %s, retrying it in the main process' % (IdPlusSmile, processor, exc))
+                        try:
+                            yield processor(IdPlusSmile)
+                        except Exception as exc2:
+                            f.write(str(IdPlusSmile) + '\n')
+                            print_error('input %s failed in %s: %s' % (IdPlusSmile, processor, exc2))
+                    pb.update(1)
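+
+# Usage sketch (illustrative only; `parse_record` and `handle` are hypothetical
+# top-level functions -- workers can only run functions that pickle cleanly):
+#
+#     for result in ImapUnorder(parse_record, records, max_workers=4):
+#         handle(result)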
+
+
+def MultiProcessUnorderedBarRun(func, deal_list, n_cpus=None):
+    if n_cpus is None:
+        N_CPUS = cpu_count()
+    else:
+        N_CPUS = int(n_cpus)
+    print_info('the number of processes is %s' % N_CPUS)
+
+    p = Pool(N_CPUS)
+    res_list = []
+    with pbar(total=len(deal_list), ascii=True) as pb:
+        # imap_unordered yields each result as soon as its worker finishes
+        for res in p.imap_unordered(func, deal_list):
+            pb.update(1)
+            res_list.append(res)
+    p.close()
+    p.join()
+    return res_list
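+
+# Usage sketch (illustrative only; `square` is a hypothetical top-level
+# function defined at module scope so Pool can pickle it):
+#
+#     def square(x):
+#         return x * x
+#
+#     results = MultiProcessUnorderedBarRun(square, range(100), n_cpus=4)
+#     # results arrive in completion order, not input order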
+
+
+def MultiProcessRun(func, deal_list, n_cpus=None):
+    '''
+    input:
+        func: function applied to each element of deal_list
+        deal_list: list of items to process
+        n_cpus: number of CPUs to use (defaults to all available)
+    output:
+        list of the return values of func, in input order
+    '''
+    # background reading on map/reduce with multiprocessing:
+    # https://my.oschina.net/leejun2005/blog/203148
+    if n_cpus is None:
+        N_CPUS = cpu_count()
+    else:
+        N_CPUS = int(n_cpus)
+
+    print_info('the number of processes is %s' % N_CPUS)
+
+    pool = Pool(N_CPUS)
+    # Pool.map blocks until everything is done and preserves input order
+    results = pool.map(func, deal_list)
+    pool.close()
+    pool.join()
+    return results
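+
+# Usage sketch (illustrative only; reuses the hypothetical `square` from the
+# example above):
+#
+#     results = MultiProcessRun(square, [1, 2, 3])   # -> [1, 4, 9]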
+
+
+########### ordered map-reduce ##############
+def _decorate_func(func, i, j):
+    # tag each result with its original index so the caller can restore order
+    return [i, func(j)]
+
+def _executor(func, series, n_cpus=4):
+    with ProcessPoolExecutor(max_workers=n_cpus) as executor:
+        futures = [executor.submit(_decorate_func, func, i, j) for i, j in series.items()]
+    # note: leaving the with-block waits for all submitted futures to finish
+    return futures
+
+
+def MultiExecutorRun(func, deal_list, n_cpus=4, tqdm_args=None):
+    '''
+    input:
+        func: function applied to each element of deal_list
+        deal_list: list of items to process
+        n_cpus: number of CPUs to use
+        tqdm_args: dict of extra keyword arguments for tqdm
+    output:
+        list of the return values of func, restored to input order
+    '''
+    lst = list(deal_list)
+    series = pd.Series(lst)
+
+    futures = _executor(func, series, n_cpus=n_cpus)
+    # avoid a mutable default argument: build the tqdm options per call
+    args = {
+        'total': len(lst),
+        'unit': 'one',
+        'ascii': True,
+        'unit_scale': True,
+        'leave': True
+    }
+    if tqdm_args:
+        args.update(tqdm_args)
+
+    print_info(args)
+
+    results = []
+    indices = []
+    for f in tqdm(as_completed(futures), **args):
+        idx, result = f.result()
+        indices.append(idx)
+        results.append(result)
+
+    # sort the unordered results back into input order
+    res = pd.Series(results, index=indices)
+    ordered_lst = res.sort_index().tolist()
+    return ordered_lst
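+
+# Usage sketch (illustrative only; `square` is the same hypothetical top-level
+# function as above):
+#
+#     ordered = MultiExecutorRun(square, [1, 2, 3], n_cpus=2,
+#                                tqdm_args={'unit': 'item'})
+#     # ordered == [1, 4, 9]  -- results are re-sorted to input order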