# Diff of /aggmap/utils/multiproc.py [000000] .. [9e8054]
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Wed Nov 21 12:52:49 2018
5
6
@author: shenwanxiang
7
8
Multi process Run
9
"""
10
11
import time
12
import pandas as pd
13
from tqdm import tqdm
14
15
from concurrent.futures import ProcessPoolExecutor, wait, as_completed
16
from multiprocessing import Pool,cpu_count,current_process 
17
import subprocess
18
19
20
from aggmap.utils.logtools import print_info, print_error, pbar,print_warn
21
22
23
def RunCmd(cmd):
    '''Execute a shell command and log its captured output.

    Parameters
    ----------
    cmd : str
        The command line to run (executed through the shell).

    Returns
    -------
    status : int
        Process return code, 0 for success.
    stdout : str
        Captured standard output.
    stderr : str
        Captured standard error.
    '''
    print_info('run command : %s' % cmd)

    # Forward every non-empty line of a captured stream to the logger,
    # either at error or at info level.
    def _relay(stream_text, error=True):
        log = print_error if error else print_info
        for line in stream_text.split('\n'):
            if line:
                log(line)

    # NOTE(review): shell=True hands `cmd` to the shell verbatim — callers
    # must never pass untrusted input here.
    completed = subprocess.run(cmd,
                               shell=True,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=True)
    status = completed.returncode
    stdout = completed.stdout
    stderr = completed.stderr

    if status == 0:
        # Success: only stdout is echoed, at info level.
        if stdout:
            _relay(stdout, error=False)
    else:
        # Failure: echo both streams at error level.
        if stdout:
            _relay(stdout, error=True)
        if stderr:
            _relay(stderr, error=True)

    return status, stdout, stderr
def ImapUnorder(processor, iterator, max_workers=10, fail_in_file = './filed.lst'):
    '''Yield results of ``processor`` applied to each element, unordered.

    Parameters
    ----------
    processor : function
        Callable applied to each element; must be picklable.
    iterator : list or iterator
        Each element should be a tuple or dict, so that data can be used as ordered.
    max_workers : int
        Number of worker processes.
    fail_in_file : str
        Path of a file recording inputs that failed twice, one per line.

    Yields
    ------
    The return value of ``processor`` for each input, in completion order.
    '''
    with ProcessPoolExecutor(max_workers=max_workers) as executor:

        with open(fail_in_file, 'w+') as f:
            futures = {executor.submit(processor, IdPlusSmile):IdPlusSmile for IdPlusSmile in iterator}
            success, _ = wait(futures)
            with pbar(total = len(futures)) as pb:
                for i in success:
                    IdPlusSmile = futures[i]
                    print_info('deal '+ str(IdPlusSmile))
                    try:
                        data_dict = i.result()
                        yield data_dict
                    except Exception as exc:
                        print_warn('because of the process is dead, input: %s is failed when deal with %s: %s, so we will deal it automatically' % (IdPlusSmile, processor, exc))

                        # Retry once in the parent process; on a second failure
                        # record the input instead of aborting the whole run.
                        # `except Exception` (not bare `except`) so that
                        # GeneratorExit/KeyboardInterrupt are not swallowed
                        # and mis-recorded as processing failures.
                        try:
                            yield processor(IdPlusSmile)
                        except Exception:
                            f.write(str(IdPlusSmile)+'\n')
                            print_error(' input: %s is failed when deal with %s: %s' % (IdPlusSmile, processor, exc))
                    pb.update(1)
def MultiProcessUnorderedBarRun(func, deal_list, n_cpus=None):
    '''Apply ``func`` to every element of ``deal_list`` in parallel, with a bar.

    Parameters
    ----------
    func : function
        Callable applied to each element; must be picklable.
    deal_list : list
        Elements to process.
    n_cpus : int, optional
        Number of worker processes; defaults to all available CPUs.

    Returns
    -------
    list
        Results in completion order (NOT input order).
    '''
    # `is None` rather than `== None`: identity test is the correct idiom
    # and avoids surprises with objects overriding __eq__.
    if n_cpus is None:
        N_CPUS = cpu_count()
    else:
        N_CPUS = int(n_cpus)
    print_info('the number of process is %s' % N_CPUS)

    p = Pool(N_CPUS)
    res_list = []
    try:
        with pbar(total = len(deal_list), ascii=True) as pb:
            for res in p.imap_unordered(func, deal_list):
                pb.update(1)
                res_list.append(res)
    finally:
        # Always reclaim the worker processes, even if `func` raised.
        p.close()
        p.join()
    return res_list
def MultiProcessRun(func, deal_list, n_cpus=None):
    '''Apply ``func`` to every element of ``deal_list`` in parallel, ordered.

    input:
        func: function to do with each element in the deal_list; must be picklable
        deal_list: list to be done
        n_cpus: use the number of cpus; defaults to all available CPUs
    output:
        list of the return result for each func, in input order
    '''
    # mata thinking: https://my.oschina.net/leejun2005/blog/203148
    # `is None` rather than `== None`: the idiomatic identity test.
    if n_cpus is None:
        N_CPUS = cpu_count()
    else:
        N_CPUS = int(n_cpus)

    print_info('the number of process is %s' % N_CPUS)

    pool = Pool(N_CPUS)
    try:
        # pool.map preserves input order in its result list.
        results = pool.map(func, deal_list)
    finally:
        # Always reclaim the worker processes, even if `func` raised.
        pool.close()
        pool.join()
    return results
########### ordered map reduce  ##############
152
def _decorate_func(func, i, j):
153
    return [i, func(j)]
154
155
def _executor(func, series, n_cpus = 4):
156
    with ProcessPoolExecutor(max_workers=n_cpus) as executor:
157
        futures = [executor.submit(_decorate_func, func, i, j) for i,j in series.iteritems()]
158
    return futures
159
160
161
def MultiExecutorRun(func, deal_list, n_cpus = 4, tqdm_args = {'unit':'one'}):
    '''Run ``func`` over ``deal_list`` in a process pool, returning ordered results.

    input:
        func: function to do with each element in the deal_list; must be picklable
        deal_list: list (or any iterable) to be done
        n_cpus: use the number of cpus
        tqdm_args: args for tqdm
    output:
        list of the return value for each func, in input order
    '''
    # Materialize once so generators/iterators are supported; after this,
    # `deal_list` may be exhausted, so all sizing below must use `lst`.
    lst = list(deal_list)
    series = pd.Series(lst)

    futures = _executor(func, series, n_cpus = n_cpus)
    args = {
        # len(lst), not len(deal_list): an iterator input has no len() and
        # has already been consumed by list() above.
        'total': len(lst),
        'unit': 'one',
        'ascii': True,
        'unit_scale': True,
        'leave': True
    }
    args.update(tqdm_args)

    print_info(args)

    results = []
    indexs = []
    # Futures complete in arbitrary order; each result carries its index.
    for f in tqdm(as_completed(futures), **args):
        idx, result = f.result()
        indexs.append(idx)
        results.append(result)

    res = pd.Series(results, index=indexs)
    # sort the unordered results back into input order
    ordered_lst = res.sort_index().tolist()
    return ordered_lst