#!/usr/bin/env python
# -*- coding: utf-8 -*-


# Functions that combine modular subfunctions into complete pipeline
# tasks, such as reading from a file, extracting concepts and saving
# the results back to disk.

from config import settings
from utilities import time_log
from data_loader import load_file, load_file_batches, load_mongo, load_mongo_batches, \
                        parse_remove_edges, parse_text, get_collection_count
from data_extractor import extract_semrep, extract_semrep_parallel, extract_metamap, \
                           get_concepts_from_edges, get_concepts_from_edges_parallel
from data_saver import save_csv, save_neo4j, save_json, save_json2, create_neo4j_results, \
                        create_neo4j_csv, update_neo4j, update_mongo_sentences, save_mongo, update_neo4j_parallel


class Parser(object):
    """
    Parser class for reading input. Depending on which pipeline
    task it is called upon, it parses the appropriate file.
    Filepaths and details according to settings.yaml.
    """

    def __init__(self, source, key, name=None):
        """
        Initialization of the class.
        Attributes:
            - source: str, value denoting where we will read from (e.g. 'mongo')
            - key: str, value denoting what we will read (e.g. 'text', 'edges')
            - name: str, optional name used for pretty-printing purposes only
        """

        self.source = source
        self.key = key
        parallel_flag = str(settings['pipeline']['in']['parallel']) == 'True'
        stream_flag = str(settings['pipeline']['in']['stream']) == 'True'
        if self.source == 'mongo':
            if parallel_flag or stream_flag:
                self.load = load_mongo_batches
            else:
                self.load = load_mongo
        elif self.source == 'file':
            if parallel_flag or stream_flag:
                self.load = load_file_batches
            else:
                self.load = load_file
        elif self.source == 'delete':
            self.load = parse_remove_edges
        else:
            time_log('Source to read was %s. Please change settings!' % self.source)
            raise NotImplementedError
        if self.key == 'text':
            self.parse = parse_text
        elif self.key == 'med_red':
            self.parse = None
        elif self.key == 'edges':
            self.parse = None
        else:
            time_log('Type to read was %s. Please change settings!' % self.key)
            raise NotImplementedError
        if name:
            self.name = name
        else:
            self.name = 'Type: %s From: %s' % (self.key, self.source)
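
    # Illustrative sketch of the settings.yaml keys consulted above.
    # Only the key paths are grounded in this file; the values shown
    # are assumptions:
    #
    #   pipeline:
    #     in:
    #       source: mongo      # one of: mongo | file | delete
    #       type: text         # one of: text | med_red | edges
    #       parallel: 'True'   # batch + multiprocessing mode
    #       stream: 'False'    # batch streaming mode
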
    def read(self, N=None, ind_=0):
        """
        Run the corresponding loading and parsing functions.
        Input:
            - N: int, the total number of items to iterate through
            - ind_: int, the starting point to read from
        Output:
        1) In case of batch or streaming processing:
            - json_: dict, the corresponding read batch
            - ind_: int, the index where the next iteration of reading
            should start from

        2) In case of loading the whole collection:
            - json_: dict, the corresponding collection
        """
        parallel_flag = str(settings['pipeline']['in']['parallel']) == 'True'
        stream_flag = str(settings['pipeline']['in']['stream']) == 'True'
        if parallel_flag or stream_flag:
            json_, ind_ = self.load(self.key, N, ind_)
            if json_:
                if self.parse:
                    json_ = self.parse(json_)
                time_log('Completed Parsing. Read: %d documents!' % len(json_[settings['out']['json']['itemfield']]))
            return json_, ind_
        else:
            json_ = self.load(self.key)
            if self.parse:
                json_ = self.parse(json_)
            time_log('Completed Parsing. Read: %d documents!' % len(json_[settings['out']['json']['itemfield']]))
            return json_
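
# Hypothetical usage sketch for Parser (assumed values; in practice the
# taskCoordinator class below drives these objects):
#   parser = Parser(source='mongo', key='text')
#   docs = parser.read()                          # whole-collection mode
#   batch, next_ind = parser.read(N=100, ind_=0)  # batch/stream mode
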
class Extractor(object):
    """
    Class for extracting concepts/entities and relations from medical text.
    Expects to work with json files generated from the corresponding Parser
    objects. Currently 'semrep', 'metamap' and 'get_concepts_from_edges'
    are implemented; 'reverb' is a stub that raises NotImplementedError.
    Filepaths and details according to settings.yaml.
    """

    def __init__(self, key, parser_key, name=None):
        """
        Initialization of the class.
        Input:
            - key: str,
            string denoting what extraction task is to take place
            - parser_key: str,
            string denoting what type of input to expect
            - name: str,
            optional string for the task to be printed
        """

        self.key = key
        self.parser_key = parser_key
        if self.key == 'semrep':
            if str(settings['pipeline']['in']['parallel']) == 'True':
                self.func = extract_semrep_parallel
                time_log('Will use multiprocessing for the semrep extraction!')
            else:
                self.func = extract_semrep
        elif self.key == 'metamap':
            self.func = extract_metamap
        elif self.key == 'reverb':
            raise NotImplementedError
        elif self.key == 'get_concepts_from_edges':
            if str(settings['pipeline']['in']['parallel']) == 'True':
                self.func = get_concepts_from_edges_parallel
            else:
                self.func = get_concepts_from_edges
        else:
            time_log('Extraction task was %s. Please change settings!' % self.key)
            raise NotImplementedError
        if name:
            self.name = name
        else:
            self.name = self.key

    def run(self, json_):
        """
        Run the corresponding extracting function and return the
        resulting dictionary.
        """

        if isinstance(json_, dict):
            json_ = self.func(json_, self.parser_key)
            time_log('Completed extracting using %s!' % self.name)
        else:
            time_log('Unsupported type of json to work on!')
            time_log('Task : %s  --- Type of json: %s' % (self.name, type(json_)))
            time_log(json_)
            json_ = {}
        return json_
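
# Hypothetical usage sketch for Extractor (assumed keys, driven in
# practice by taskCoordinator below):
#   extractor = Extractor(key='semrep', parser_key='text')
#   enriched = extractor.run(docs)  # docs: dict returned by Parser.read
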
class Dumper(object):
    """
    Class for saving the extracted results. Expects to work with json files
    generated from the previous extraction phases. Currently implemented
    dumping methods for keys:
        - json: for the enriched medical documents
        - csv: for nodes/relations before importing into neo4j
        - neo4j: for nodes/relations, updating the neo4j db directly
        - mongo_sentences: for updating the sentences mongo collection
        - mongo: for saving the enriched documents back to mongo
    Filepaths and details according to settings.yaml.
    Params:
        - key: str,
        one of 'json', 'csv', 'neo4j', 'mongo_sentences', 'mongo'
        - inp_key: str,
        the Parser key for this pipeline ('text' or 'edges')
        - name: str,
        name of the Dumper, for printing purposes only
    """

    def __init__(self, key, inp_key='text', name=None):
        self.key = key
        if self.key == 'json':
            self.transform = None
            self.func = save_json
        elif self.key == 'csv':
            self.transform = create_neo4j_results
            self.func = create_neo4j_csv
        elif self.key == 'neo4j':
            self.transform = create_neo4j_results
            parallel_flag = str(settings['pipeline']['in']['parallel']) == 'True'
            if parallel_flag:
                self.func = update_neo4j_parallel
            else:
                self.func = update_neo4j
        elif self.key == 'mongo_sentences':
            self.transform = None
            self.func = update_mongo_sentences
        elif self.key == 'mongo':
            self.transform = None
            self.func = save_mongo
        else:
            time_log('Dumping method was %s. Please change settings!' % self.key)
            raise NotImplementedError
        if inp_key == 'text':
            self.type_ = 'harvester'
        elif inp_key == 'edges':
            self.type_ = 'edges'
        if name:
            self.name = name
        else:
            self.name = self.key

    def save(self, json_):
        if isinstance(json_, dict):
            if self.transform:
                results = self.transform(json_, self.type_)
            else:
                results = json_
            json_ = self.func(results)
            if self.key == 'mongo_sentences':
                out_p = '/'.join([settings[self.key]['uri'], settings[self.key]['db'], settings[self.key]['collection']])
                time_log('Completed saving data. Results saved in:\n %s' % out_p)
            else:
                time_log('Completed saving data. Results saved in:\n %s' % settings['out'][self.key]['out_path'])
        else:
            time_log('Unsupported type of json to work on!')
            time_log('Task : %s  --- Type of json: %s' % (self.name, type(json_)))
            time_log(json_)
            json_ = {}
        return json_
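
# Hypothetical usage sketch for Dumper (assumed keys):
#   dumper = Dumper(key='neo4j', inp_key='text')
#   dumper.save(enriched)  # enriched: dict produced by Extractor.run
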
class taskCoordinator(object):
    """
    Orchestrator class for the different pipeline phases.
    """

    def __init__(self):
        self.pipeline = {}
        self.phases = ['in', 'trans', 'out']
        for phase, dic_ in sorted(settings['pipeline'].items()):
            self.pipeline[phase] = {}
            for key, value in dic_.items():
                if value:
                    self.pipeline[phase][key] = value

    def run(self):
        parallel_flag = 'parallel' in self.pipeline['in']
        stream_flag = 'stream' in self.pipeline['in']
        if parallel_flag or stream_flag:
            parser = Parser(self.pipeline['in']['source'], self.pipeline['in']['type'])
            ind_ = 0
            N = get_collection_count(parser.source, parser.key)
            while ind_ < N:
                old_ind = ind_
                json_all, ind_ = parser.read(N=N, ind_=ind_)
                if json_all:
                    json_ = json_all
                    for phase in self.phases:
                        dic = self.pipeline[phase]
                        if phase == 'trans':
                            for key, value in dic.items():
                                if value:
                                    extractor = Extractor(key, parser.key)
                                    json_ = extractor.run(json_)
                        if phase == 'out':
                            for key, value in sorted(dic.items()):
                                if value:
                                    dumper = Dumper(key, parser.key)
                                    dumper.save(json_)
                if ind_:
                    time_log('Processed %d documents in parallel. We are at index %d!' % (ind_ - old_ind, ind_))
                    proc = int(ind_ / float(N) * 100)
                    if proc % 10 == 0 and proc > 0:
                        time_log('~' * 50)
                        time_log('We are at %d/%d documents processed -- %0.2f %%' % (ind_, N, proc))
                        time_log('~' * 50)
        else:
            parser = Parser(self.pipeline['in']['source'], self.pipeline['in']['type'])
            json_ = parser.read()
            for phase in self.phases:
                dic = self.pipeline[phase]
                if phase == 'trans':
                    for key, value in dic.items():
                        if value:
                            extractor = Extractor(key, parser.key)
                            json_ = extractor.run(json_)
                if phase == 'out':
                    for key, value in sorted(dic.items()):
                        if value:
                            dumper = Dumper(key, parser.key)
                            dumper.save(json_)

    def print_pipeline(self):
        print('#' * 30 + ' Pipeline Schedule ' + '#' * 30)
        for phase in self.phases:
            dic = self.pipeline[phase]
            if phase == 'in':
                if dic['source'] == 'delete':
                    print("Will delete all %s resource associated edges!" % settings['neo4j']['resource'])
                    break
                if dic['source'] == 'file':
                    source = settings['load']['path']['file_path']
                elif dic['source'] == 'mongo':
                    source = settings['load']['mongo']['file_path']
                print('Will read from: %s' % source)
            if phase == 'trans':
                print('Will use the following transformation utilities:')
                for key in dic:
                    print('- %s' % key)
            if phase == 'out':
                print('Will save the outcome as follows:')
                for key in dic:
                    if key == 'mongo_sentences':
                        out_p = '/'.join([settings[key]['uri'], settings[key]['db'], settings[key]['collection']])
                        print('%s  : %s' % (key, out_p))
                    else:
                        print('%s  : %s' % (key, settings['out'][key]['out_path']))
        print('#' * 30 + ' Pipeline Schedule ' + '#' * 30)
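

# Hypothetical driver sketch: the real entry point is assumed to live
# elsewhere in the repo (e.g. a script reading settings.yaml), so this
# guard only illustrates the intended call order.
if __name__ == '__main__':
    coordinator = taskCoordinator()
    coordinator.print_pipeline()  # show what will be read, transformed and saved
    coordinator.run()             # execute the in -> trans -> out phases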