Switch to unified view

a b/deidentification/convert_upload_async.py
1
2
# coding: utf-8
3
# parts of code are borrowed from:
4
# https://fredrikaverpil.github.io/2017/06/20/async-and-await-with-subprocesses/
5
6
import pandas as pd
7
import os
8
from os import makedirs
9
from os.path import dirname, join as pj
10
from shutil import copyfile
11
from subprocess import PIPE, call
12
import sys
13
sys.path.append('../anonymize_slide/')
14
from anonymize_slide import do_aperio_svs
15
import openslide
16
17
import time
18
import platform
19
import asyncio
20
21
22
23
async def anonymize_put_on_gcloud(oldpath, newpath):
24
    print('='*30)
25
    outdir = pj(outparentdir, dirname(newpath))
26
    outfile = pj(outparentdir, newpath)
27
    routfile = '/'.join(outfile.split('/')[1:])
28
29
    if not os.path.exists(outdir):
30
        makedirs(outdir)
31
    copyfile(pj(inparentdir, oldpath), outfile)
32
    # remove the label
33
    do_aperio_svs(outfile)
34
    # make sure no label
35
    slide = openslide.OpenSlide(outfile)
36
    assert 'label' not in list(slide.associated_images)
37
38
    await run_command_shell(" ".join(["gsutil","cp", "-r", outfile, "gs://kidney-rejection/" + routfile]))
39
    await run_command_shell((["rm", "-r", outfile]))
40
41
42
43
async def run_command(*args):
44
    """Run command in subprocess
45
    
46
    Example from:
47
        http://asyncio.readthedocs.io/en/latest/subprocess.html
48
    """
49
    # Create subprocess
50
    process = await asyncio.create_subprocess_exec(
51
        *args,
52
        # stdout must a pipe to be accessible as process.stdout
53
        stdout=asyncio.subprocess.PIPE)
54
55
    # Status
56
    print('Started:', args, '(pid = ' + str(process.pid) + ')')
57
58
    # Wait for the subprocess to finish
59
    stdout, stderr = await process.communicate()
60
61
    # Progress
62
    if process.returncode == 0:
63
        print('Done:', args, '(pid = ' + str(process.pid) + ')')
64
    else:
65
        print('Failed:', args, '(pid = ' + str(process.pid) + ')')
66
67
    # Result
68
    result = stdout.decode().strip()
69
70
    # Return stdout
71
    return result
72
73
74
async def run_command_shell(command):
75
    """Run command in subprocess (shell)
76
    
77
    Note:
78
        This can be used if you wish to execute e.g. "copy"
79
        on Windows, which can only be executed in the shell.
80
    """
81
    if isinstance(command, list):
82
        command = " ".join(command)
83
    # Create subprocess
84
    process = await asyncio.create_subprocess_shell(
85
        command,
86
        stdout=asyncio.subprocess.PIPE)
87
88
    # Status
89
    print('Started:', command, '(pid = ' + str(process.pid) + ')')
90
91
    # Wait for the subprocess to finish
92
    stdout, stderr = await process.communicate()
93
94
    # Progress
95
    if process.returncode == 0:
96
        print('Done:', command, '(pid = ' + str(process.pid) + ')')
97
    else:
98
        print('Failed:', command, '(pid = ' + str(process.pid) + ')')
99
100
    # Result
101
    result = stdout.decode().strip()
102
103
    # Return stdout
104
    return result
105
106
107
def make_chunks(l, n):
108
    """Yield successive n-sized chunks from l.
109
110
    Note:
111
        Taken from https://stackoverflow.com/a/312464
112
    """
113
    if sys.version_info.major == 2:
114
        for i in xrange(0, len(l), n):
115
            yield l[i:i + n]
116
    else:
117
        # Assume Python 3
118
        for i in range(0, len(l), n):
119
            yield l[i:i + n]
120
121
122
def run_asyncio_commands(tasks, max_concurrent_tasks=0):
123
    """Run tasks asynchronously using asyncio and return results
124
125
    If max_concurrent_tasks are set to 0, no limit is applied.
126
127
    Note:
128
        By default, Windows uses SelectorEventLoop, which does not support
129
        subprocesses. Therefore ProactorEventLoop is used on Windows.
130
        https://docs.python.org/3/library/asyncio-eventloops.html#windows
131
    """
132
133
    all_results = []
134
135
    if max_concurrent_tasks == 0:
136
        chunks = [tasks]
137
    else:
138
        chunks = make_chunks(l=tasks, n=max_concurrent_tasks)
139
140
    for tasks_in_chunk in chunks:
141
        if platform.system() == 'Windows':
142
            loop = asyncio.ProactorEventLoop()
143
            asyncio.set_event_loop(loop)
144
        else:
145
            loop = asyncio.get_event_loop()
146
147
        commands = asyncio.gather(*tasks_in_chunk)  # Unpack list using *
148
        results = loop.run_until_complete(commands)
149
        all_results += results
150
#         loop.close()
151
    return all_results
152
153
154
if __name__ == '__main__':
155
    import yaml
156
    with open("upload-config.yaml") as fh:
157
        config = yaml.load(fh)
158
        config
159
    
160
    inparentdir = config["inparentdir"]
161
    outparentdir = config["outparentdir"] 
162
    fnmap = config["fnmap"]
163
    max_concurrent_tasks = config["max_concurrent_tasks"]
164
165
    dfmap = pd.read_table(fnmap)
166
    tasks = []
167
    for nn,(kk, ds) in enumerate(dfmap.iterrows()):
168
        tasks.append(anonymize_put_on_gcloud(ds["filepath"], ds["newpath"]))
169
170
171
    run_asyncio_commands(tasks, max_concurrent_tasks=max_concurrent_tasks)
172
173
    print("removing the temporary directory")
174
    run_asyncio_commands([ run_command_shell((["rm", "-r", outparentdir])) ])
175
176