|
a |
|
b/deidentification/convert_upload_async.py |
|
|
1 |
|
|
|
2 |
# coding: utf-8 |
|
|
3 |
# parts of code are borrowed from: |
|
|
4 |
# https://fredrikaverpil.github.io/2017/06/20/async-and-await-with-subprocesses/ |
|
|
5 |
|
|
|
6 |
import pandas as pd |
|
|
7 |
import os |
|
|
8 |
from os import makedirs |
|
|
9 |
from os.path import dirname, join as pj |
|
|
10 |
from shutil import copyfile |
|
|
11 |
from subprocess import PIPE, call |
|
|
12 |
import sys |
|
|
13 |
sys.path.append('../anonymize_slide/') |
|
|
14 |
from anonymize_slide import do_aperio_svs |
|
|
15 |
import openslide |
|
|
16 |
|
|
|
17 |
import time |
|
|
18 |
import platform |
|
|
19 |
import asyncio |
|
|
20 |
|
|
|
21 |
|
|
|
22 |
|
|
|
23 |
async def anonymize_put_on_gcloud(oldpath, newpath):
    """Copy one slide, strip its label, upload it to the bucket, delete the copy.

    Reads the module-level names ``inparentdir`` and ``outparentdir`` that the
    ``__main__`` section assigns from the configuration file.

    Args:
        oldpath: path of the source slide, relative to ``inparentdir``.
        newpath: path of the working copy, relative to ``outparentdir``.
            The joined path minus its first ``/``-separated component is used
            as the object name under ``gs://kidney-rejection/``.

    Raises:
        AssertionError: if a ``label`` associated image is still listed for
            the copy after ``do_aperio_svs`` ran. Nothing is uploaded then.

    NOTE(review): ``run_command_shell`` only prints ``Failed:`` on a non-zero
    exit status, so the local copy is removed even when the upload failed.
    """
    import shlex  # local import: only needed to quote paths for the shell

    print('='*30)
    outfile = pj(outparentdir, newpath)
    outdir = pj(outparentdir, dirname(newpath))
    # Remote object name: the local path without its first component.
    routfile = '/'.join(outfile.split('/')[1:])

    makedirs(outdir, exist_ok=True)
    copyfile(pj(inparentdir, oldpath), outfile)
    # remove the label
    do_aperio_svs(outfile)

    # make sure no label; always release the slide handle afterwards
    slide = openslide.OpenSlide(outfile)
    try:
        # list() iterates the names only; it does not fetch any image.
        has_label = 'label' in list(slide.associated_images)
    finally:
        slide.close()
    if has_label:
        # Raised explicitly (not via ``assert``) so that running Python with
        # -O cannot strip this de-identification check.
        raise AssertionError(
            'label image still present after anonymization: ' + outfile)

    # Quote the paths: the command line is interpreted by a shell, so spaces
    # or metacharacters in a file name must not split or alter the command.
    await run_command_shell(" ".join([
        "gsutil", "cp", "-r",
        shlex.quote(outfile),
        shlex.quote("gs://kidney-rejection/" + routfile),
    ]))
    await run_command_shell(["rm", "-r", shlex.quote(outfile)])
|
|
40 |
|
|
|
41 |
|
|
|
42 |
|
|
|
43 |
async def run_command(*args):
    """Launch a program directly (no shell) and return what it wrote to stdout.

    Adapted from the example at
    http://asyncio.readthedocs.io/en/latest/subprocess.html

    Args:
        *args: the program followed by its arguments, forwarded unchanged to
            ``asyncio.create_subprocess_exec``.

    Returns:
        The captured stdout, decoded and stripped of surrounding whitespace.
        A non-zero exit status is only reported with a ``Failed:`` line; the
        output is returned either way.
    """
    # stdout must be a pipe, otherwise communicate() has nothing to capture
    proc = await asyncio.create_subprocess_exec(
        *args, stdout=asyncio.subprocess.PIPE)

    pid_note = '(pid = ' + str(proc.pid) + ')'
    print('Started:', args, pid_note)

    # Block this coroutine until the child exits; stderr is not piped.
    captured, _ = await proc.communicate()

    outcome = 'Done:' if proc.returncode == 0 else 'Failed:'
    print(outcome, args, pid_note)

    return captured.decode().strip()
|
|
72 |
|
|
|
73 |
|
|
|
74 |
async def run_command_shell(command):
    """Run a command line through the shell and return what it wrote to stdout.

    Note:
        Useful for commands that only exist inside the shell, e.g. "copy"
        on Windows.

    Args:
        command: the command line as a string, or a list of strings that is
            joined with single spaces first (no quoting is applied).

    Returns:
        The captured stdout, decoded and stripped of surrounding whitespace.
        A non-zero exit status is only reported with a ``Failed:`` line; the
        output is returned either way.
    """
    shell_line = " ".join(command) if isinstance(command, list) else command

    # stdout is piped so that communicate() can capture it
    proc = await asyncio.create_subprocess_shell(
        shell_line, stdout=asyncio.subprocess.PIPE)

    pid_note = '(pid = ' + str(proc.pid) + ')'
    print('Started:', shell_line, pid_note)

    # Block this coroutine until the shell exits; stderr is not piped.
    captured, _ = await proc.communicate()

    outcome = 'Done:' if proc.returncode == 0 else 'Failed:'
    print(outcome, shell_line, pid_note)

    return captured.decode().strip()
|
|
105 |
|
|
|
106 |
|
|
|
107 |
def make_chunks(l, n):
    """Yield successive n-sized chunks from l.

    Args:
        l: a sliceable sequence with a length (list, tuple, str, ...).
        n: maximum chunk size; must be a positive integer.

    Yields:
        Slices ``l[i:i + n]`` in order; the last one may be shorter than n.

    Raises:
        ValueError: if n is smaller than 1 (raised on first iteration). A
            negative n used to yield nothing at all, which made callers
            silently drop every item.

    Note:
        Taken from https://stackoverflow.com/a/312464
    """
    if n < 1:
        raise ValueError('chunk size must be a positive integer, got %r' % (n,))
    # The former Python 2 (xrange) branch was unreachable: this module uses
    # ``async def`` and therefore cannot be imported by Python 2 at all.
    for start in range(0, len(l), n):
        yield l[start:start + n]
|
|
120 |
|
|
|
121 |
|
|
|
122 |
async def _gather_chunk(awaitables):
    """Await all given awaitables concurrently; results keep the input order."""
    return await asyncio.gather(*awaitables)


def run_asyncio_commands(tasks, max_concurrent_tasks=0):
    """Run tasks asynchronously using asyncio and return results

    If max_concurrent_tasks are set to 0, no limit is applied. Otherwise the
    tasks are split into chunks of that size and the chunks are run one
    after another, each chunk concurrently.

    Args:
        tasks: list of awaitables (e.g. coroutine objects) to run.
        max_concurrent_tasks: chunk size, or 0 for a single chunk.

    Returns:
        A flat list with the result of every task, in the order of ``tasks``.

    Note:
        By default, Windows uses SelectorEventLoop, which does not support
        subprocesses. Therefore ProactorEventLoop is used on Windows.
        https://docs.python.org/3/library/asyncio-eventloops.html#windows

        One event loop is created per call and closed before returning, so
        the function can be called repeatedly. It must not be called from
        code that is already running inside an event loop.
    """
    all_results = []

    if max_concurrent_tasks == 0:
        chunks = [tasks]
    else:
        chunks = make_chunks(l=tasks, n=max_concurrent_tasks)

    # Create the loop explicitly instead of relying on get_event_loop() to
    # conjure one up; that implicit creation is deprecated and an error on
    # recent Python versions.
    if platform.system() == 'Windows':
        loop = asyncio.ProactorEventLoop()
    else:
        loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        for tasks_in_chunk in chunks:
            # gather() is called from inside a coroutine so that it always
            # binds to the loop that is actually running.
            all_results += loop.run_until_complete(
                _gather_chunk(tasks_in_chunk))
    finally:
        # Release the loop's resources and do not leave a closed loop
        # registered as the current one.
        loop.close()
        asyncio.set_event_loop(None)
    return all_results
|
|
152 |
|
|
|
153 |
|
|
|
154 |
if __name__ == '__main__':
    # Script entry point: read upload-config.yaml, anonymize and upload every
    # slide listed in the mapping table, then delete the temporary directory.
    import shlex
    import yaml

    with open("upload-config.yaml") as fh:
        # safe_load: yaml.load() without an explicit Loader raises TypeError
        # on PyYAML >= 6 and can construct arbitrary objects on older ones.
        config = yaml.safe_load(fh)

    # inparentdir / outparentdir are read as globals by
    # anonymize_put_on_gcloud, so they must keep these names.
    inparentdir = config["inparentdir"]
    outparentdir = config["outparentdir"]
    fnmap = config["fnmap"]
    max_concurrent_tasks = config["max_concurrent_tasks"]

    # Mapping table with (at least) the columns "filepath" and "newpath".
    dfmap = pd.read_table(fnmap)
    tasks = [
        anonymize_put_on_gcloud(ds["filepath"], ds["newpath"])
        for _, ds in dfmap.iterrows()
    ]

    run_asyncio_commands(tasks, max_concurrent_tasks=max_concurrent_tasks)

    print("removing the temporary directory")
    # Quote the path: run_command_shell joins the list into one shell line.
    run_asyncio_commands(
        [run_command_shell(["rm", "-r", shlex.quote(outparentdir)])])
|
|
175 |
|
|
|
176 |
|