[944daf]: / data_loader.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Functions to extract knowledge from medical text. Everything related to
# reading and parsing.
import json
import py2neo
import pymongo
import langid
import pandas as pd
from config import settings
from utilities import time_log
from multiprocessing import cpu_count
import ijson.backends.yajl2_cffi as ijson2


def load_mongo(key):
    """
    Parse collection from mongo
    Input:
        - key: str,
        the type of input to read
    Output:
        - json_ : dic,
        json-style dictionary with a field containing
        documents
    """
    # input mongo variables from settings.yaml
    uri = settings['load']['mongo']['uri']
    db_name = settings['load']['mongo']['db']
    collection_name = settings['load']['mongo']['collection']
    client = pymongo.MongoClient(uri)
    db = client[db_name]
    collection = db[collection_name]
    # itemfield containing list of elements
    out_outfield = settings['out']['json']['itemfield']
    json_ = {out_outfield: []}
    cur = collection.find({})
    for item in cur:
        del item['_id']
        json_[out_outfield].append(item)
    return json_
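
# Example usage (a hedged sketch; assumes settings.yaml provides the
# 'load.mongo.*' and 'out.json.itemfield' keys read above, and that the
# pipeline calls this for a key such as 'text'):
#
#     docs = load_mongo('text')
#     items = docs[settings['out']['json']['itemfield']]
#     time_log('Loaded %d documents from mongo' % len(items))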


def load_mongo_batches(key, N_collection, ind_=0):
    """
    Parse collection from mongo to be processed in streaming/parallel fashion.
    Fetches step = (num_cores x batch_per_core) documents starting from ind_
    and delivers them to the rest of the pipeline.
    Input:
        - key: str,
        the type of input to read
        - N_collection: int,
        total collection length
        - ind_: int,
        the starting point of the batch (or stream) to be read
    Output:
        - json_ : dic,
        json-style dictionary with a field containing
        items
    """
    # input mongo variables from settings.yaml
    uri = settings['load']['mongo']['uri']
    db_name = settings['load']['mongo']['db']
    collection_name = settings['load']['mongo']['collection']
    client = pymongo.MongoClient(uri)
    db = client[db_name]
    collection = db[collection_name]
    # itemfield containing list of elements
    out_outfield = settings['out']['json']['itemfield']
    json_ = {out_outfield: []}
    stream_flag = str(settings['pipeline']['in']['stream']) == 'True'
    # batch size in a streaming environment is just one
    if stream_flag:
        step = 1
    # else N_THREADS * batch_per_core
    else:
        try:
            N_THREADS = int(settings['num_cores'])
        except:
            N_THREADS = cpu_count()
        try:
            batch_per_core = int(settings['batch_per_core'])
        except:
            batch_per_core = 100
        step = N_THREADS * batch_per_core
    time_log("Will start from %d/%d and read %d items" % (ind_, N_collection, step))
    if step > N_collection:
        step = N_collection
    cur = collection.find({}, skip=ind_, limit=step)
    c = 0
    for item in cur:
        del item['_id']
        c += 1
        json_[out_outfield].append(item)
    return json_, ind_ + step
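
# Illustrative sketch of draining the collection in batches (the actual
# orchestration lives elsewhere in the pipeline; 'text' is an assumed key):
#
#     N = get_collection_count('mongo', 'text')
#     ind = 0
#     while ind < N:
#         batch, ind = load_mongo_batches('text', N, ind_=ind)
#         # ... hand batch over to the rest of the pipeline ...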


def load_file(key):
    """
    Parse file containing items.
    Input:
        - key: str,
        the type of input to read
    Output:
        - json_ : dic,
        json-style dictionary with items
    """
    # input file path from settings.yaml
    if key == 'med_rec':
        json_ = parse_medical_rec()
    else:
        inp_path = settings['load']['path']['file_path']
        with open(inp_path, 'r') as f:
            json_ = json.load(f, encoding='utf-8')
    return json_


def load_file_batches(key, N_collection, ind_=0):
    """
    Parse collection from file to be processed in streaming/parallel fashion.
    Fetches step = (num_cores x batch_per_core) documents starting from ind_
    and delivers them to the rest of the pipeline.
    Input:
        - key: str,
        the type of input to read
        - N_collection: int,
        total collection length
        - ind_: int,
        the starting point of the batch (or stream) to be read
    Output:
        - json_ : dic,
        json-style dictionary with a field containing
        items
    """
    # Filepath to item collection
    inp_path = settings['load']['path']['file_path']
    # Document iterator field in the collection
    infield = settings['load'][key]['itemfield']
    # itemfield containing list of elements
    out_outfield = settings['out']['json']['itemfield']
    # The generated json_
    json_ = {out_outfield: []}
    # Check if streaming
    stream_flag = str(settings['pipeline']['in']['stream']) == 'True'
    # batch size in a streaming environment is just one
    if stream_flag:
        step = 1
    # else N_THREADS * batch_per_core
    else:
        try:
            N_THREADS = int(settings['num_cores'])
        except:
            N_THREADS = cpu_count()
        try:
            batch_per_core = int(settings['batch_per_core'])
        except:
            batch_per_core = 100
        step = N_THREADS * batch_per_core
    if step > N_collection:
        step = N_collection
    # Collection counter
    col_counter = 0
    # print infield
    time_log("Will start from %d/%d and read %d items" % (ind_, N_collection, step))
    with open(inp_path, 'r') as f:
        docs = ijson2.items(f, '%s.item' % infield)
        for c, item in enumerate(docs):
            if c < ind_:
                continue
            json_[out_outfield].append(item)
            # print json_
            col_counter += 1
            if col_counter >= step:
                break
    if col_counter == 0:
        # print col_counter
        return None, None
    else:
        # print json_
        return json_, ind_ + step
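
# Illustrative sketch only: load_file_batches signals exhaustion by returning
# (None, None), so a caller might loop like this ('text' is an assumed key):
#
#     N = get_collection_count('file', 'text')
#     batch, ind = load_file_batches('text', N)
#     while batch is not None:
#         # ... process batch ...
#         batch, ind = load_file_batches('text', N, ind_=ind)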


def parse_medical_rec():
    """
    Parse file containing medical records.
    Output:
        - json_ : dic,
        json-style dictionary with documents containing
        a list of dicts, containing the medical record and the corresponding
        attributes
    """
    # path to file to read from
    inp_path = settings['load']['path']['file_path']
    # csv separator from settings.yaml
    sep = settings['load']['med_rec']['sep']
    # textfield to read text from
    textfield = settings['load']['med_rec']['textfield']
    # idfield where id of document is stored
    idfield = settings['load']['med_rec']['idfield']
    with open(inp_path, 'r') as f:
        diag = pd.DataFrame.from_csv(f, sep='\t')
    # Get texts
    texts = diag[textfield].values
    # outerfield for the documents in json
    itemfield = settings['out']['json']['itemfield']
    # textfield to read text from
    out_textfield = settings['out']['json']['json_text_field']
    # labelfield where title of the document is stored
    out_labelfield = settings['out']['json']['json_label_field']
    diag[out_labelfield] = ['Medical Record' + str(i) for i in diag.index.values.tolist()]
    if not('journal' in diag.columns.tolist()):
        diag['journal'] = ['None' for i in diag.index.values.tolist()]
    # Replace textfield with out_textfield
    diag[out_textfield] = diag[textfield]
    del diag[textfield]
    # Replace id with default out_idfield
    diag['id'] = diag[idfield]
    del diag[idfield]
    json_ = {itemfield: diag.to_dict(orient='records')}
    return json_
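
# The medical-records file is expected to be a delimited table whose columns
# include the configured textfield and idfield. A hypothetical settings
# fragment (key names are the ones read above, values are illustrative):
#
#     settings['load']['med_rec'] = {'sep': '\t',
#                                    'textfield': 'report_text',
#                                    'idfield': 'record_id'}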


def parse_text(json_):
    """
    Helper function to parse the loaded documents. Specifically,
    we ignore documents with no assigned text field. We also provide
    an empty string for label if non-existent. Other than that, we
    normalize the id, text and label fields as indicated in the settings.
    Input:
        - json_: dic,
        json-style dictionary with a field containing
        items
    Output:
        - json_ : dic,
        json-style dictionary with a field containing normalized and
        cleaned items
    """
    ## Values to read from ##
    # itemfield containing list of elements containing text
    outfield = settings['load']['text']['itemfield']
    # textfield to read text from
    textfield = settings['load']['text']['textfield']
    # idfield where id of document is stored
    idfield = settings['load']['text']['idfield']
    # labelfield where title of the document is stored
    labelfield = settings['load']['text']['labelfield']
    ## Values to replace them with ##
    # itemfield containing list of elements
    out_outfield = settings['out']['json']['itemfield']
    # textfield to read text from
    out_textfield = settings['out']['json']['json_text_field']
    # idfield where id of document is stored
    out_idfield = settings['out']['json']['json_id_field']
    # labelfield where title of the document is stored
    out_labelfield = settings['out']['json']['json_label_field']
    # Keep only documents that have a text field and are written in English
    json_[outfield] = [art for art in json_[outfield] if textfield in art.keys()]
    json_[outfield] = [art for art in json_[outfield] if langid.classify(art[textfield])[0] == 'en']
    for article in json_[outfield]:
        article[out_textfield] = article.pop(textfield)
        article[out_idfield] = article.pop(idfield)
        if labelfield != 'None':
            article[out_labelfield] = article.pop(labelfield)
        else:
            article[out_labelfield] = ' '
        if not('journal' in article.keys()):
            article['journal'] = 'None'
    json_[out_outfield] = json_.pop(outfield)
    # N = len(json_[out_outfield])
    # json_[out_outfield] = json_[out_outfield][(2*N/5):(3*N/5)]
    json_[out_outfield] = json_[out_outfield][:]
    return json_
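
# Sketch of the normalization above, assuming (hypothetically) that the
# configured in-fields are 'abstractText'/'pmid'/'title', the out-fields are
# 'text'/'id'/'label', and the itemfields are 'articles'/'documents':
#
#     {'articles': [{'abstractText': '...', 'pmid': '17', 'title': 'T'}]}
#
# would come out as
#
#     {'documents': [{'text': '...', 'id': '17', 'label': 'T', 'journal': 'None'}]}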


def parse_remove_edges(key=None):
    """
    Dummy function to conform with the pipeline when
    we just want to delete edges instead of inserting
    them.
    Output:
        - an empty dic to be passed around, as to
        conform to the pipeline schema
    """
    # Read neo4j essentials before
    host = settings['neo4j']['host']
    port = settings['neo4j']['port']
    user = settings['neo4j']['user']
    password = settings['neo4j']['password']
    try:
        graph = py2neo.Graph(host=host, port=port, user=user, password=password)
    except Exception, e:
        # time_log(e)
        # time_log("Couldn't connect to db! Check settings!")
        exit(2)
    # Delete edges that belong exclusively to this resource
    quer1 = """ MATCH ()-[r]->() WHERE r.resource = "%s" DELETE r;""" % (settings['neo4j']['resource'])
    f = graph.run(quer1)
    rem = f.stats()['relationships_deleted']
    # For edges shared with other resources, drop this resource from their list
    quer2 = """ MATCH ()-[r]->() WHERE "%s" in r.resource SET
    r.resource = FILTER(x IN r.resource WHERE x <> "%s");""" % (settings['neo4j']['resource'], settings['neo4j']['resource'])
    f = graph.run(quer2)
    alt = f.stats()['properties_set']
    time_log('Removed %d edges that were found only in %s' % (rem, settings['neo4j']['resource']))
    time_log("Altered %s edges' resource attribute associated with %s" % (alt, settings['neo4j']['resource']))
    # Stop the pipeline once the edges have been removed
    exit(1)
    return {}


def get_collection_count(source, type):
    """
    Helper function to get total collection length.
    Input:
        - source: str, value denoting where we will read from (e.g. 'mongo')
        - type: str, value denoting what we will read (e.g. text, edges)
    Output:
        - N_collection: int,
        number of items in the collection
    """
    if source == 'file':
        inp_path = settings['load']['path']['file_path']
        # Document iterator field in the collection
        infield = settings['load'][type]['itemfield']
        with open(inp_path, 'r') as f:
            docs = ijson2.items(f, '%s.item' % infield)
            N_collection = 0
            for item in docs:
                N_collection += 1
    elif source == 'mongo':
        # input mongo variables from settings.yaml
        uri = settings['load']['mongo']['uri']
        db_name = settings['load']['mongo']['db']
        collection_name = settings['load']['mongo']['collection']
        client = pymongo.MongoClient(uri)
        db = client[db_name]
        collection = db[collection_name]
        N_collection = collection.count()
    else:
        time_log("Can't calculate total collection count for source type %s" % settings['in']['source'])
        raise NotImplementedError
    return N_collection
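
# Sketch of how a caller might pick the matching batch loader for a source
# (illustrative only; the real dispatch lives elsewhere in the pipeline):
#
#     N = get_collection_count(source, 'text')
#     loader = load_mongo_batches if source == 'mongo' else load_file_batches
#     batch, next_ind = loader('text', N, ind_=0)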