#!/usr/bin/env python
# -*- coding: utf-8 -*-


# Functions to extract knowledge from medical text. Everything related to
# reading and parsing.

import json
import py2neo
import pymongo
import langid
import pandas as pd
from config import settings
from utilities import time_log
from multiprocessing import cpu_count
import ijson.backends.yajl2_cffi as ijson2


def load_mongo(key):
    """
    Parse a collection from mongo.
    Input:
        - key: str,
        the type of input to read
    Output:
        - json_ : dic,
        json-style dictionary with a field containing
        documents
    """

    # input mongo variables from settings.yaml
    uri = settings['load']['mongo']['uri']
    db_name = settings['load']['mongo']['db']
    collection_name = settings['load']['mongo']['collection']
    client = pymongo.MongoClient(uri)
    db = client[db_name]
    collection = db[collection_name]
    # itemfield containing the list of elements
    out_outfield = settings['out']['json']['itemfield']
    json_ = {out_outfield: []}
    cur = collection.find({})
    for item in cur:
        # drop the mongo-internal '_id' field
        del item['_id']
        json_[out_outfield].append(item)
    return json_
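
# Example usage (a minimal sketch; assumes settings.yaml points to a reachable
# MongoDB instance and that the pipeline reads documents of type 'text'):
#   json_ = load_mongo('text')
#   docs = json_[settings['out']['json']['itemfield']]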


def load_mongo_batches(key, N_collection, ind_=0):
    """
    Parse a collection from mongo to be processed in streaming/parallel
    fashion. Fetches step = (batch_per_core X num_cores) documents, starting
    from ind_, and delivers them to the rest of the pipeline.
    Input:
        - key: str,
        the type of input to read
        - N_collection: int,
        total collection length
        - ind_: int,
        the starting point of the batch (or stream) to be read
    Output:
        - json_ : dic,
        json-style dictionary with a field containing
        items
        - ind_ + step: int,
        the starting point of the next batch
    """
    # input mongo variables from settings.yaml
    uri = settings['load']['mongo']['uri']
    db_name = settings['load']['mongo']['db']
    collection_name = settings['load']['mongo']['collection']
    client = pymongo.MongoClient(uri)
    db = client[db_name]
    collection = db[collection_name]
    # itemfield containing the list of elements
    out_outfield = settings['out']['json']['itemfield']
    json_ = {out_outfield: []}
    stream_flag = str(settings['pipeline']['in']['stream']) == 'True'
    # batch size in a streaming environment is just one
    if stream_flag:
        step = 1
    # else N_THREADS * batch_per_core
    else:
        try:
            N_THREADS = int(settings['num_cores'])
        except (KeyError, ValueError):
            N_THREADS = cpu_count()
        try:
            batch_per_core = int(settings['batch_per_core'])
        except (KeyError, ValueError):
            batch_per_core = 100
        step = N_THREADS * batch_per_core
    time_log("Will start from %d/%d and read %d items" % (ind_, N_collection, step))
    # clamp the batch to the collection size and fetch it
    if step > N_collection:
        step = N_collection
    cur = collection.find({}, skip=ind_, limit=step)
    for item in cur:
        del item['_id']
        json_[out_outfield].append(item)
    return json_, ind_ + step
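
# Example driver loop (a sketch; process_batch is a hypothetical stand-in for
# the downstream pipeline step):
#   N_collection = get_collection_count('mongo', 'text')
#   ind_ = 0
#   while ind_ < N_collection:
#       batch, ind_ = load_mongo_batches('text', N_collection, ind_)
#       process_batch(batch)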


def load_file(key):
    """
    Parse a file containing items.
    Input:
        - key: str,
        the type of input to read
    Output:
        - json_ : dic,
        json-style dictionary with items
    """

    # input file path from settings.yaml
    if key == 'med_rec':
        json_ = parse_medical_rec()
    else:
        inp_path = settings['load']['path']['file_path']
        with open(inp_path, 'r') as f:
            json_ = json.load(f, encoding='utf-8')
    return json_


def load_file_batches(key, N_collection, ind_=0):
    """
    Parse a collection from a file to be processed in streaming/parallel
    fashion. Fetches step = (batch_per_core X num_cores) documents, starting
    from ind_, and delivers them to the rest of the pipeline.
    Input:
        - key: str,
        the type of input to read
        - N_collection: int,
        total collection length
        - ind_: int,
        the starting point of the batch (or stream) to be read
    Output:
        - json_ : dic,
        json-style dictionary with a field containing
        items
        - ind_ + step: int,
        the starting point of the next batch
    """
    # Filepath to the item collection
    inp_path = settings['load']['path']['file_path']
    # Document iterator field in the collection
    infield = settings['load'][key]['itemfield']
    # itemfield containing the list of elements
    out_outfield = settings['out']['json']['itemfield']
    # The generated json_
    json_ = {out_outfield: []}
    # Check if streaming
    stream_flag = str(settings['pipeline']['in']['stream']) == 'True'
    # batch size in a streaming environment is just one
    if stream_flag:
        step = 1
    # else N_THREADS * batch_per_core
    else:
        try:
            N_THREADS = int(settings['num_cores'])
        except (KeyError, ValueError):
            N_THREADS = cpu_count()
        try:
            batch_per_core = int(settings['batch_per_core'])
        except (KeyError, ValueError):
            batch_per_core = 100
        step = N_THREADS * batch_per_core
    if step > N_collection:
        step = N_collection
    # Collection counter
    col_counter = 0
    time_log("Will start from %d/%d and read %d items" % (ind_, N_collection, step))
    with open(inp_path, 'r') as f:
        # stream the items instead of loading the whole file into memory
        docs = ijson2.items(f, '%s.item' % infield)
        for c, item in enumerate(docs):
            # skip forward to the start of the current batch
            if c < ind_:
                continue
            json_[out_outfield].append(item)
            col_counter += 1
            if col_counter >= step:
                break
    # no items read: the collection is exhausted
    if col_counter == 0:
        return None, None
    else:
        return json_, ind_ + step
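
# Expected file layout (a sketch; the actual iterator field name comes from
# settings.yaml, e.g. infield = 'documents'):
#   {"documents": [{"text": "...", "id": "...", "title": "..."}, ...]}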


def parse_medical_rec():
    """
    Parse a file containing medical records.
    Output:
        - json_ : dic,
        json-style dictionary with a field containing a list of dicts,
        each holding a medical record and its corresponding attributes
    """

    # path to the file to read from
    inp_path = settings['load']['path']['file_path']
    # csv separator from settings.yaml
    sep = settings['load']['med_rec']['sep']
    # textfield to read text from
    textfield = settings['load']['med_rec']['textfield']
    # idfield where the id of the document is stored
    idfield = settings['load']['med_rec']['idfield']
    with open(inp_path, 'r') as f:
        # use the configured separator instead of a hardcoded tab
        diag = pd.read_csv(f, sep=sep, index_col=0)
    # outerfield for the documents in json
    itemfield = settings['out']['json']['itemfield']
    # textfield to read text from
    out_textfield = settings['out']['json']['json_text_field']
    # labelfield where the title of the document is stored
    out_labelfield = settings['out']['json']['json_label_field']
    diag[out_labelfield] = ['Medical Record ' + str(i) for i in diag.index.values.tolist()]
    if 'journal' not in diag.columns.tolist():
        diag['journal'] = ['None' for i in diag.index.values.tolist()]
    # Replace textfield with out_textfield
    diag[out_textfield] = diag[textfield]
    del diag[textfield]
    # Replace id with the default out_idfield
    diag['id'] = diag[idfield]
    del diag[idfield]
    json_ = {itemfield: diag.to_dict(orient='records')}
    return json_
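
# Resulting shape (a sketch; actual keys depend on settings.yaml):
#   {itemfield: [{'id': ..., out_textfield: '...',
#                 out_labelfield: 'Medical Record 0', 'journal': 'None'}, ...]}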


def parse_text(json_):
    """
    Helper function to parse the loaded documents. Specifically,
    we ignore documents with no assigned text field as well as
    documents not classified as English, and provide an empty
    string for the label if non-existent. Other than that, it
    normalizes the id, text and label fields as indicated in the
    settings.
    Input:
        - json_: dic,
        json-style dictionary with a field containing
        items
    Output:
        - json_ : dic,
        json-style dictionary with a field containing normalized and
        cleaned items
    """

    ## Values to read from ##

    # itemfield containing the list of elements containing text
    outfield = settings['load']['text']['itemfield']
    # textfield to read text from
    textfield = settings['load']['text']['textfield']
    # idfield where the id of the document is stored
    idfield = settings['load']['text']['idfield']
    # labelfield where the title of the document is stored
    labelfield = settings['load']['text']['labelfield']

    ## Values to replace them with ##

    # itemfield containing the list of elements
    out_outfield = settings['out']['json']['itemfield']
    # textfield to read text from
    out_textfield = settings['out']['json']['json_text_field']
    # idfield where the id of the document is stored
    out_idfield = settings['out']['json']['json_id_field']
    # labelfield where the title of the document is stored
    out_labelfield = settings['out']['json']['json_label_field']
    # drop items without a text field
    json_[outfield] = [art for art in json_[outfield] if textfield in art.keys()]
    # keep only documents classified as English
    json_[outfield] = [art for art in json_[outfield] if langid.classify(art[textfield])[0] == 'en']
    for article in json_[outfield]:
        article[out_textfield] = article.pop(textfield)
        article[out_idfield] = article.pop(idfield)
        if labelfield != 'None':
            article[out_labelfield] = article.pop(labelfield)
        else:
            article[out_labelfield] = ' '
        if 'journal' not in article.keys():
            article['journal'] = 'None'
    json_[out_outfield] = json_.pop(outfield)
    return json_
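
# Example (a sketch; field names are hypothetical): with
# textfield='abstractText', idfield='pmid' and labelfield='title', an item
#   {'abstractText': '...', 'pmid': '123', 'title': 'Some title'}
# is normalized to
#   {out_textfield: '...', out_idfield: '123',
#    out_labelfield: 'Some title', 'journal': 'None'}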


def parse_remove_edges(key=None):
    """
    Dummy function to conform with the pipeline when
    we just want to delete edges instead of inserting
    them.
    Output:
        - an empty dic to be passed around, as to
        conform to the pipeline schema
    """

    # Read neo4j essentials before connecting
    host = settings['neo4j']['host']
    port = settings['neo4j']['port']
    user = settings['neo4j']['user']
    password = settings['neo4j']['password']
    try:
        graph = py2neo.Graph(host=host, port=port, user=user, password=password)
    except Exception as e:
        time_log(e)
        time_log("Couldn't connect to db! Check settings!")
        exit(2)
    # delete edges found only in this resource
    quer1 = """ MATCH ()-[r]->() WHERE r.resource = "%s" DELETE r;""" % (settings['neo4j']['resource'])
    f = graph.run(quer1)
    rem = f.stats()['relationships_deleted']
    # strip this resource from edges shared with other resources
    quer2 = """ MATCH ()-[r]->() WHERE "%s" in r.resource SET
    r.resource = FILTER(x IN r.resource WHERE x <> "%s");""" % (settings['neo4j']['resource'], settings['neo4j']['resource'])
    f = graph.run(quer2)
    alt = f.stats()['properties_set']
    time_log('Removed %d edges that were found only in %s' % (rem, settings['neo4j']['resource']))
    time_log("Altered %s edges' resource attribute associated with %s" % (alt, settings['neo4j']['resource']))
    # edge removal is a standalone action: stop the pipeline here
    exit(1)
    return {}
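
# Example effect (a sketch; resource names are hypothetical): with
# resource = "pubmed", an edge whose r.resource equals "pubmed" is deleted
# by the first query, while an edge with r.resource = ["pubmed", "drugbank"]
# keeps only ["drugbank"] after the second query.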


def get_collection_count(source, type):
    """
    Helper function to get the total collection length.
    Input:
        - source: str, value denoting where we will read from (e.g. 'mongo')
        - type: str, value denoting what we will read (e.g. text, edges)
    Output:
        - N_collection: int,
        number of items in the collection
    """
    if source == 'file':
        inp_path = settings['load']['path']['file_path']
        # Document iterator field in the collection
        infield = settings['load'][type]['itemfield']
        with open(inp_path, 'r') as f:
            # stream over the file, counting items without loading it whole
            docs = ijson2.items(f, '%s.item' % infield)
            N_collection = 0
            for item in docs:
                N_collection += 1
    elif source == 'mongo':
        # input mongo variables from settings.yaml
        uri = settings['load']['mongo']['uri']
        db_name = settings['load']['mongo']['db']
        collection_name = settings['load']['mongo']['collection']
        client = pymongo.MongoClient(uri)
        db = client[db_name]
        collection = db[collection_name]
        N_collection = collection.count()
    else:
        time_log("Can't calculate total collection count for source type %s" % source)
        raise NotImplementedError
    return N_collection