a b/data_saver.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
5
# Functions to extract knowledge from medical text. Everything related to 
6
# reading, parsing and extraction needed for the knowledge base. Also,
7
# some wrappers for SemRep, MetaMap and Reverb.
8
9
import json
10
import os
11
import py2neo
12
import unicodecsv as csv2
13
import pymongo
14
from config import settings
15
from utilities import time_log
16
from data_extractor import chunk_document_collection
17
from multiprocessing import cpu_count, Pool
18
19
20
# Watch py2neo's 'neo4j' and 'httpstream' loggers at ERROR level, both
# directed at the same log file.
# NOTE(review): confirm that the installed py2neo's watch() accepts a file
# path for `out` (some versions expect a writable stream instead).
_NEO4J_LOG_FILE = './out/neo4j.log'
suppress_log_to_file = py2neo.watch('neo4j', level='ERROR',
                                    out=_NEO4J_LOG_FILE)
suppress_log_to_file2 = py2neo.watch('httpstream', level='ERROR',
                                     out=_NEO4J_LOG_FILE)
24
25
def save_json2(json_):
    """
    Write the enriched medical json to disk, replacing any previous file.
    Input:
        - json_: dic,
        json-style dictionary produced by the extractors of the
        previous phase
    Output:
        - None; the document is dumped (indent=3) to
        settings['out']['json']['out_path']
    """
    json_settings = settings['out']['json']
    target_path = json_settings['out_path']
    with open(target_path, 'w+') as handle:
        json.dump(json_, handle, indent=3)
38
39
def save_json(json_):
    """
    Helper function to save enriched medical json to file.

    When the pipeline input is streamed or processed in parallel
    (settings['pipeline']['in']['stream'] / ['parallel']), each call only
    holds a slice of the corpus. In that case the documents already stored
    in the output file are appended after the new ones instead of being
    overwritten.
    Input:
        - json_: dic,
        json-style dictionary generated from the extractors in the
        previous phase. When merging, json_[doc_field] is rebound to a new
        list (new documents followed by the stored ones).
    Output:
        - None; writes settings['out']['json']['out_path'] (indent=3).
    """
    json_settings = settings['out']['json']
    outfile = json_settings['out_path']
    pipeline_in = settings['pipeline']['in']
    if pipeline_in['stream'] or pipeline_in['parallel']:
        if os.path.isfile(outfile):
            doc_field = json_settings['json_doc_field']
            time_log('Appending documents to existing %s' % outfile)
            with open(outfile, 'r') as f:
                # A stored file lacking the doc field contributes no
                # documents instead of aborting the save with KeyError.
                previous_docs = json.load(f).get(doc_field, [])
            json_[doc_field] = json_[doc_field] + previous_docs
    # NOTE(review): the read-merge-write above is not atomic; if parallel
    # workers call this concurrently, documents can be lost. Confirm that
    # callers serialize access to the output file.
    with open(outfile, 'w') as f:
        json.dump(json_, f, indent=3)
67
        
68
69
70
71
def save_csv(json_):
    """
    Helper function to save enriched medical json to file.

    Despite its name this writes JSON, not CSV. It writes to
    settings['out']['json']['out_path'] exactly as save_json2 does, and
    simply delegates to it.
    NOTE(review): CSV output for the neo4j import tool is produced by
    create_neo4j_csv; confirm whether this helper is still needed.
    Input:
        - json_: dic,
        json-style dictionary generated from the extractors in the
        previous phase
    Output:
        - None
    """
    save_json2(json_)
84
85
86
87
def save_neo4j(json_):
    """
    Helper function to save enriched medical json to file.

    Despite its name this does not touch neo4j. It writes JSON to
    settings['out']['json']['out_path'] exactly as save_json2 does, and
    simply delegates to it.
    NOTE(review): graph loading is done by the populate_* functions;
    confirm whether this helper is still needed.
    Input:
        - json_: dic,
        json-style dictionary generated from the extractors in the
        previous phase
    Output:
        - None
    """
    save_json2(json_)
100
101
def aggregate_mentions(entity_pmc_edges):
    """
    Function to aggregate recurring entity:MENTIONED_IN:pmc relations.

    Edges sharing the same (':START_ID', ':END_ID') pair are merged into a
    single edge. Its 'score:float[]', 'sent_id:string[]' and
    'resource:string[]' values are the ';'-joined values of all merged
    edges, in input order.
    Input:
        - entity_pmc_edges: list,
        list of dicts as generated by create_neo4j_ functions
    Output:
        - un_list: list,
        list of dicts with aggregated values for identical edges, in
        first-seen order. The input dicts are left unmodified.
    """
    from collections import OrderedDict

    joined_fields = ('score:float[]', 'sent_id:string[]', 'resource:string[]')
    uniques = OrderedDict()
    c = 0
    for edge in entity_pmc_edges:
        cur_key = str(edge[':START_ID']) + '_' + str(edge[':END_ID'])
        merged = uniques.get(cur_key)
        if merged is None:
            # Store a copy so that the concatenations below never mutate
            # the caller's edge dicts.
            uniques[cur_key] = dict(edge)
            continue
        for field in joined_fields:
            merged[field] = merged[field] + ';' + edge[field]
        c += 1
    time_log('Aggregated %d mentions from %d in total' % (c, len(entity_pmc_edges)))
    return list(uniques.values())
130
131
132
def aggregate_relations(relations_edges):
    """
    Function to aggregate recurring entity:SEMREP_RELATION:entity relations.

    Edges sharing the same (':START_ID', ':TYPE', ':END_ID') triple are
    merged. Every other field is ';'-joined across the merged edges.

    An edge is skipped when all of its sentence ids are already recorded on
    the merged edge. Ids are compared whole, so '123__1' is not mistaken
    for part of '123__12'.
    Input:
        - relations_edges: list,
        list of dicts as generated by create_neo4j_ functions
    Output:
        - un_list: list,
        list of dicts with aggregated values for identical edges, in
        first-seen order. The input dicts are left unmodified.
    """
    from collections import OrderedDict

    id_fields = (':START_ID', ':TYPE', ':END_ID')
    uniques = OrderedDict()
    c = 0
    for edge in relations_edges:
        cur_key = (str(edge[':START_ID']) + '_' + str(edge[':TYPE']) +
                   '_' + str(edge[':END_ID']))
        merged = uniques.get(cur_key)
        if merged is None:
            # Copy so that later concatenations don't mutate caller data.
            uniques[cur_key] = dict(edge)
            continue
        if 'sent_id:string[]' in edge:
            # Membership on the split id list. The former substring test on
            # the joined string dropped genuinely new sentence ids.
            seen_ids = set(merged['sent_id:string[]'].split(';'))
            if set(edge['sent_id:string[]'].split(';')) <= seen_ids:
                continue
        for field in edge:
            if field not in id_fields:
                merged[field] = merged[field] + ';' + edge[field]
        c += 1
    time_log('Aggregated %d relations from %d in total' % (c, len(relations_edges)))
    return list(uniques.values())
164
165
166
def create_neo4j_results(json_, key='harvester'):
    """
    Route the input to the create_neo4j_ builder that matches its type.
    Input:
        - json_: dic,
        dictionary-json style generated from the parsers/extractors in the
        previous stages
        - key: str,
        'harvester' selects create_neo4j_harvester, 'edges' selects
        create_neo4j_edges; anything else is logged and rejected
    Output:
        - results: dic,
        json-style dictionary with keys 'nodes' and 'edges', each a list of
        {'type': ..., 'values': [...]} groups to be created/updated in neo4j
    Raises:
        - NotImplementedError for an unsupported key
    """
    if key not in ('harvester', 'edges'):
        time_log('Type %s of data not yet supported!' % key)
        raise NotImplementedError
    if key == 'edges':
        return create_neo4j_edges(json_)
    return create_neo4j_harvester(json_)
191
192
def create_neo4j_edges(json_):
    """
    Function that takes the edges file as provided and generates the nodes
    and relationships entities needed for creating/updating the neo4j database.
    Currently supporting:
        - Nodes: ['Articles(PMC)', 'Entities(MetaMapConcepts)', any node
          type configured as settings['load']['edges']['sub_type'/'obj_type']]
        - Edges: ['Relations between Entities']
    Input:
        - json_: dic,
        json-style dictionary generated from the parser in the
        previous phase. json_[itemfield] is a list of edges, each with
        's' (subject node dict), 'p' (predicate) and 'o' (object node dict);
        node dicts must carry an 'id:ID' key.
    Output:
        - results: dic,
        json-style dictionary with keys 'nodes' and 'edges' containing
        a list of the transformed nodes and edges to be created/updated in
        neo4j. Each element in the list has a 'type' field denoting the type
        of the node/edge and the 'values' field containing the nodes/edges.
        Every node type appears at most once in results['nodes'].
    """
    edges_settings = settings['load']['edges']
    # docfield containing list of elements containing the relations
    edgefield = edges_settings['itemfield']
    # node types (labels) of the subject and of the object of every edge
    sub_type = edges_settings['sub_type']
    obj_type = edges_settings['obj_type']
    resource = settings['neo4j']['resource']

    new_edges = []
    results = {'nodes': [], 'edges': [{'type': 'NEW', 'values': new_edges}]}
    # One de-duplicated node list per node type. Keying by type means that,
    # when subject and object share a custom type, a node used on both sides
    # is listed once instead of in two separate lists of the same type.
    nodes_by_type = {}
    for edge in json_[edgefield]:
        for node_type, node in ((sub_type, edge['s']), (obj_type, edge['o'])):
            bucket = nodes_by_type.setdefault(node_type, [])
            # NOTE: dict equality scan, O(n) per node; nodes are dicts and
            # therefore not hashable.
            if node not in bucket:
                bucket.append(node)
        new_edges.append({':START_ID': edge['s']['id:ID'],
                          ':TYPE': edge['p'],
                          'resource:string[]': resource,
                          ':END_ID': edge['o']['id:ID']})

    # Emit Entity, then Article, then the configured custom types, skipping
    # repeats and empty groups.
    emitted = []
    for node_type in ('Entity', 'Article', sub_type, obj_type):
        if node_type in emitted:
            continue
        emitted.append(node_type)
        if nodes_by_type.get(node_type):
            results['nodes'].append({'type': node_type,
                                     'values': nodes_by_type[node_type]})
    return results
257
258
def _join_sem_types(sem_types):
    """Return an entity's semantic types as a single ';'-separated string."""
    if isinstance(sem_types, (list, tuple)):
        # Previously only lists longer than one were joined, so a
        # one-element list reached neo4j/CSV as a raw list object.
        return ';'.join(sem_types)
    return sem_types.replace(',', ';')


def create_neo4j_harvester(json_):
    """
    Function that takes the enriched json_ file and generates the nodes
    and relationships entities needed for creating/updating the neo4j database.
    Currently supporting:
        - Nodes: ['Articles(PMC)', 'Entities(UMLS-Concepts)']
        - Edges: ['Relations between Entities', 'Entity:MENTIONED_IN:Article']
    Input:
        - json_: dic,
        json-style dictionary generated from the extractors in the
        previous phase. Each document holds 'sents'; each sentence holds
        'sent_id', 'entities' and 'relations'.
    Output:
        - results: dic,
        json-style dictionary with keys 'nodes' and 'edges' containing
        a list of the transformed nodes and edges to be created/updated in
        neo4j. Each element in the list has a 'type' field denoting the type
        of the node/edge and the 'values' field containing the nodes/edges.
        Mention and relation edges are aggregated via aggregate_mentions /
        aggregate_relations.
    """
    out_json = settings['out']['json']
    # docfield containing list of elements
    out_outfield = out_json['itemfield']
    # idfield where id of document is stored
    out_idfield = out_json['json_id_field']
    # labelfield where the label is located
    out_labelfield = out_json['json_label_field']
    # Sentence Prefix ('None' in the config means no prefix)
    sent_prefix = settings['load']['text']['sent_prefix']
    if sent_prefix == 'None' or not sent_prefix:
        sent_prefix = ''
    resource = settings['neo4j']['resource']

    entities_nodes = []
    articles_nodes = []
    entity_pmc_edges = []
    relations_edges = []
    seen_cuis = set()
    for doc in json_[out_outfield]:
        pmid = doc[out_idfield]
        for sent in doc['sents']:
            cur_sent_id = (str(pmid) + '_' + str(sent_prefix) + '_' +
                           str(sent['sent_id']))
            for ent in sent['entities']:
                # Entities without a concept id can't be linked to a node.
                if not ent['cuid']:
                    continue
                if ent['cuid'] not in seen_cuis:
                    seen_cuis.add(ent['cuid'])
                    entities_nodes.append({
                        'id:ID': ent['cuid'],
                        'label': ent['label'],
                        'sem_types:string[]': _join_sem_types(ent['sem_types'])})
                entity_pmc_edges.append({':START_ID': ent['cuid'],
                                         'score:float[]': ent['score'],
                                         'sent_id:string[]': cur_sent_id,
                                         ':TYPE': 'MENTIONED_IN',
                                         'resource:string[]': resource,
                                         ':END_ID': pmid})
            for rel in sent['relations']:
                if rel['subject__cui'] and rel['object__cui']:
                    relations_edges.append({
                        ':START_ID': rel['subject__cui'],
                        'subject_score:float[]': rel['subject__score'],
                        'subject_sem_type:string[]': rel['subject__sem_type'],
                        # Parentheses are not allowed in a relationship type.
                        ':TYPE': rel['predicate'].replace('(', '__').replace(')', '__'),
                        'pred_type:string[]': rel['predicate__type'],
                        'object_score:float[]': rel['object__score'],
                        'object_sem_type:string[]': rel['object__sem_type'],
                        'sent_id:string[]': cur_sent_id,
                        'negation:string[]': rel['negation'],
                        'resource:string[]': resource,
                        ':END_ID': rel['object__cui']})
        articles_nodes.append({'id:ID': doc[out_idfield],
                               'title': doc[out_labelfield],
                               'journal': doc['journal']})
    entity_pmc_edges = aggregate_mentions(entity_pmc_edges)
    relations_edges = aggregate_relations(relations_edges)
    results = {'nodes': [{'type': 'Entity', 'values': entities_nodes},
                         {'type': 'Article', 'values': articles_nodes}],
               'edges': [{'type': 'relation', 'values': relations_edges},
                         {'type': 'mention', 'values': entity_pmc_edges}]}
    return results
341
342
343
def create_neo4j_csv(results):
    """
    Create csv's for use by the neo4j import tool. Relies on create_neo4j_ functions
    output and transforms it to suitable format for automatic importing.
    Input:
        - results: dic,
        json-style dictionary. Check create_neo4j_ function output for
        details
    Output:
        - None just saves the documents in the allocated path as defined
        in settings.yaml (settings['out']['csv']['out_path']). Empty groups
        produce no file.
    """
    outpath = settings['out']['csv']['out_path']
    entities_nodes = None
    articles_nodes = None
    relations_edges = None
    entity_pmc_edges = None
    other_nodes = []
    other_edges = []
    for nodes in results['nodes']:
        if nodes['type'] == 'Entity':
            entities_nodes = nodes['values']
        elif nodes['type'] == 'Article':
            articles_nodes = nodes['values']
        else:
            other_nodes.extend(nodes['values'])
    for edges in results['edges']:
        if edges['type'] == 'relation':
            relations_edges = edges['values']
        elif edges['type'] == 'mention':
            entity_pmc_edges = edges['values']
        elif edges['type'] == 'NEW':
            other_edges.extend(edges['values'])

    rows_by_file = {
        'entities.csv': entities_nodes,
        'articles.csv': articles_nodes,
        'other_nodes.csv': other_nodes,
        'entities_pmc.csv': entity_pmc_edges,
        'relations.csv': relations_edges,
        'other_edges.csv': other_edges
    }

    # Expected column order per file. Mention edges carry ':TYPE'
    # ('MENTIONED_IN'); without it DictWriter raised ValueError.
    fieldnames_by_file = {
        'entities.csv': ['id:ID', 'label', 'sem_types:string[]'],
        'articles.csv': ['id:ID', 'title', 'journal', 'sent_id:string[]'],
        'other_nodes.csv': ['id:ID'],
        'entities_pmc.csv': [':START_ID', 'score:float[]', 'sent_id:string[]',
                             ':TYPE', 'resource:string[]', ':END_ID'],
        'relations.csv': [':START_ID', 'subject_score:float[]',
                          'subject_sem_type:string[]', ':TYPE',
                          'pred_type:string[]', 'object_score:float[]',
                          'object_sem_type:string[]', 'sent_id:string[]',
                          'negation:string[]', 'resource:string[]', ':END_ID'],
        'other_edges.csv': [':START_ID', ':TYPE', 'resource:string[]', ':END_ID']
    }

    for filename, rows in rows_by_file.items():
        if not rows:
            continue
        fieldnames = list(fieldnames_by_file[filename])
        # Append any extra columns the rows carry (e.g. attributes of
        # generic nodes); DictWriter raises ValueError on unknown keys.
        extra_columns = sorted(set(k for row in rows for k in row) - set(fieldnames))
        fieldnames.extend(extra_columns)
        out = os.path.join(outpath, filename)
        with open(out, 'wb') as output_file:
            time_log("Created file %s" % filename)
            dict_writer = csv2.DictWriter(output_file, fieldnames=fieldnames,
                                          encoding='utf-8')
            dict_writer.writeheader()
            dict_writer.writerows(rows)
    time_log('Created all documents needed')
406
407
408
409
def fix_on_create_nodes(node):
    """
    Helper function to create the correct cypher string for
    querying and merging a new node to the graph. This is used
    when no node is matched and a new one has to be created.
    Input:
        - node: dic,
        dictionary of a node generated from some create_neo4j_
        function. Keys are 'name' or 'name:type' where type 'string[]'
        and 'float[]' values are ';'-separated lists; keys containing an
        'ID' part are skipped.
    Output:
        - s: string,
        'ON CREATE SET  a.x = ..., a.y = ...' for every non-empty attribute,
        or ' ' when there is nothing to set (a bare 'ON CREATE SET' would be
        invalid cypher).
    """
    assignments = []
    for key, value in node.items():
        # Skip empty / blank values.
        if not value or value == " ":
            continue
        parts = key.split(':')
        # The id is already used by the MERGE pattern itself.
        if 'ID' in parts:
            continue
        field = parts[0]
        if 'string[]' in key:
            # Double quotes would terminate the cypher string literal.
            items = ['"%s"' % item.replace('"', "'") for item in value.split(';')]
            literal = '[' + ','.join(items) + ']'
        elif 'float[]' in key:
            # Non-integer / empty scores become 0, as in create_edge_query.
            numbers = []
            for item in value.split(';'):
                try:
                    numbers.append(int(item))
                except ValueError:
                    numbers.append(0)
            literal = '[' + ', '.join(str(n) for n in numbers) + ']'
        else:
            literal = '"%s"' % value.replace('"', "'")
        assignments.append(' a.%s = %s' % (field, literal))
    # No attributes
    if not assignments:
        return ' '
    return 'ON CREATE SET ' + ','.join(assignments)
447
448
449
def create_merge_query(node, type_):
    """
    Build the full cypher statement that merges one node on its id and sets
    its remaining attributes when the node is newly created.
    Input:
        - node: dic,
        attributes of the node; must contain 'id:ID'
        - type_: str,
        label of the node to be merged
    Output:
        - quer: str,
        cypher statement ready to be passed to graph.run
    """
    node_id = node["id:ID"]
    on_create_clause = fix_on_create_nodes(node)
    template = '\n    MERGE (a:%s {id:"%s"})\n    %s'
    return template % (type_, node_id, on_create_clause)
466
467
468
def populate_nodes(graph, nodes, type_):
    """
    Merge every node into the graph under label type_, one cypher MERGE per
    node, logging progress every 1000 nodes.
    Input:
        - graph: py2neo.Graph,
        object representing the graph in neo4j. Using py2neo.
        - nodes: list,
        list of dics containing the attributes of each node
        - type_: str,
        type of the node to be merged
    Output: None, populates the db and logs how many nodes were created.
    """
    processed = 0
    created = 0
    time_log('~~~~~~  Will create nodes of type: %s  ~~~~~~' % type_)
    for processed, node in enumerate(nodes, 1):
        cursor = graph.run(create_merge_query(node, type_))
        created += cursor.stats()['nodes_created']
        if processed % 1000 == 0:
            time_log("Process: %d -- %0.2f %%" % (processed, 100*processed/float(len(nodes))))
    time_log('#%s : %d' % (type_, processed))
    time_log('Finally added %d new nodes!' % created)
493
494
495
def _edge_property_literal(key, value):
    """Return (property name, cypher literal) for one typed edge attribute."""
    field = key.split(':')[0]
    if 'string[]' in key:
        # Double quotes would terminate the cypher string literal.
        items = ['"%s"' % item.replace('"', "'") for item in value.split(';')]
        return field, '[' + ','.join(items) + ']'
    if 'float[]' in key:
        # Dealing with empty or non-scored elements
        numbers = []
        for item in value.split(';'):
            try:
                numbers.append(int(item))
            except ValueError:
                numbers.append(0)
        return field, '[' + ', '.join(str(n) for n in numbers) + ']'
    # Scalars must be quoted, otherwise cypher parses them as identifiers.
    return field, '"%s"' % value.replace('"', "'")


def create_edge_query(edge, sub_ent=None, obj_ent=None):
    """
    Takes as input an edge, in the form of a dictionary, and returns the
    corresponding cypher query that:
    1) First Matches the start-end nodes and the type of the edge
    2) Merges the edge the following way:
        - If the edge doesn't exist it creates it setting all attributes
          of the edge according to its values
        - If the edge exists, it appends the values to the attributes that
          are already in the graph edge and creates the attributes that are
          not found in the graph edge but are provided in the edge dictionary
    Input:
        - edge, dict
        dictionary containing the edge properties (':START_ID', ':END_ID',
        ':TYPE' plus 'name[:type]' attributes with ';'-separated values)
        - sub_ent, str
        string denoting what type is the subject node; defaults to
        settings['load']['edges']['sub_type']
        - obj_ent, str
        string denoting what type is the object node; defaults to
        settings['load']['edges']['obj_type']
    Output:
        - s, string
        query string to perform. Without attributes no SET clauses are
        emitted (an empty 'ON MATCH SET' would be invalid cypher).
    """
    # Resolved at call time rather than at import time.
    if sub_ent is None:
        sub_ent = settings['load']['edges']['sub_type']
    if obj_ent is None:
        obj_ent = settings['load']['edges']['obj_type']

    on_match = []
    on_create = []
    for key, value in edge.items():
        parts = key.split(':')
        if 'START_ID' in parts or 'END_ID' in parts or 'TYPE' in parts:
            continue
        field, literal = _edge_property_literal(key, value)
        # 'IS NULL' instead of the deprecated EXISTS(property) check.
        on_match.append(' r.%s = CASE WHEN r.%s IS NULL THEN %s ELSE r.%s + %s END'
                        % (field, field, literal, field, literal))
        on_create.append(' r.%s = %s' % (field, literal))

    s = 'MATCH (a:%s {id:"%s"}), (b:%s {id:"%s"})\nMERGE (a)-[r:%s]->(b)' % (
        sub_ent, edge[':START_ID'], obj_ent, edge[':END_ID'], edge[':TYPE'])
    if on_match:
        s += '\nON MATCH SET' + ','.join(on_match)
        s += '\nON CREATE SET' + ','.join(on_create)
    return s
573
574
575
576
577
def populate_relation_edges(graph, relations_edges):
    """
    Create/merge the relation edges between existing Entity nodes.

    For each edge, the graph is first queried for a relationship of the same
    type between the same entities whose sent_id list already holds the
    edge's first sentence id. Only when none is found (and both endpoint ids
    are non-empty) is the merge query from create_edge_query run.
    NOTE(review): only the first of possibly several ';'-joined sentence
    ids is checked; confirm this approximation is intended.
    Input:
        - graph: py2neo.Graph,
        object representing the graph in neo4j. Using py2neo.
        - relations_edges: list,
        list of dics containing the attributes of each relation
    Output: None, populates the db and logs how many relations were created.
    """
    processed = 0
    created = 0
    for edge in relations_edges:
        processed += 1
        quer = """
        Match (a:Entity {id:"%s"}), (b:Entity {id:"%s"})
        MATCH (a)-[r:%s]->(b)
        WHERE "%s" in r.sent_id
        Return r;
        """ % (edge[':START_ID'], edge[':END_ID'], edge[':TYPE'], edge['sent_id:string[]'].split(';')[0])
        existing = graph.run(quer).data()
        if not existing and edge[':START_ID'] and edge[':END_ID']:
            outcome = graph.run(create_edge_query(edge, 'Entity', 'Entity'))
            created += outcome.stats()['relationships_created']
        if processed % 1000 == 0:
            time_log('Process: %d -- %0.2f %%' % (processed, 100*processed/float(len(relations_edges))))
    time_log('#Relations :%d' % processed)
    time_log('Finally added %d new relations!' % created)
650
651
def populate_mentioned_edges(graph, entity_pmc_edges):
    """
    Function to create/merge the Entity-[:MENTIONED_IN]->Article edges.

    For each edge, the graph is first queried for a relationship of the same
    type between the entity and the article whose sent_id list already holds
    the edge's first sentence id. This mirrors populate_relation_edges. Only
    when none is found (and both endpoint ids are non-empty) is the merge
    query from create_edge_query run.
    Input:
        - graph: py2neo.Graph,
        object representing the graph in neo4j. Using py2neo.
        - entity_pmc_edges: list,
        list of dics containing the attributes of each mention
    Output: None, populates the db and logs how many mentions were created.
    """
    c = 0
    total_rel = 0
    for edge in entity_pmc_edges:
        c += 1
        # After aggregate_mentions, 'sent_id:string[]' may hold several
        # ';'-joined ids, while create_edge_query stores them as separate
        # list items. Looking up the whole joined string never matched, so
        # every re-run appended duplicate sentence ids.
        first_sent_id = edge['sent_id:string[]'].split(';')[0]
        quer = """
        Match (a:Entity {id:"%s"}), (b:Article {id:"%s"})
        MATCH (a)-[r:%s]->(b)
        WHERE "%s" in r.sent_id
        Return r;
        """ % (edge[':START_ID'], edge[':END_ID'], edge[':TYPE'], first_sent_id)
        existing = graph.run(quer).data()
        if not existing and edge[':START_ID'] and edge[':END_ID']:
            outcome = graph.run(create_edge_query(edge, 'Entity', 'Article'))
            total_rel += outcome.stats()['relationships_created']
        if c % 1000 == 0:
            time_log("Process: %d -- %0.2f %%" % (c, 100*c/float(len(entity_pmc_edges))))
    time_log('#Mentions: %d' % c)
    time_log('Finally added %d new mentions!' % total_rel)
697
698
699
def populate_new_edges(graph, new_edges):
    """
    Function to create an edge of an arbitrary (settings-defined) type.
    For every edge, a lookup query checks whether a relationship of type
    edge[':TYPE'] already links the subject and object nodes and has
    settings['neo4j']['resource'] in its r.resource list. Only when no such
    relationship exists (and both endpoint ids are non-empty) is the query
    built by create_edge_query executed.
    Input:
        - graph: py2neo.Graph,
        object representing the graph in neo4j. Using py2neo.
        - new_edges: list,
        list of dics containing the attributes of each relation. Here only
        the ':START_ID', ':END_ID' and ':TYPE' keys are read directly; the
        whole dic is passed on to create_edge_query.
    Output: None, populates the db.
    """
    # Node labels of the subject and of the object, taken from settings.
    sub_type = settings['load']['edges']['sub_type']
    obj_type = settings['load']['edges']['obj_type']
    resource = settings['neo4j']['resource']
    c = 0
    total_rel = 0
    for c, edge in enumerate(new_edges, 1):
        start_id = edge[':START_ID']
        end_id = edge[':END_ID']
        # An edge missing one of its endpoints is never created, so there is
        # no point spending a database round-trip on the existence lookup.
        if start_id and end_id:
            # NOTE(review): values are interpolated straight into Cypher, so
            # an id containing '"' or '\' breaks the query; consider passing
            # them as query parameters.
            quer = """
            Match (a:%s {id:"%s"}), (b:%s {id:"%s"})
            MATCH (a)-[r:%s]->(b)
            WHERE ("%s" in r.resource)
            Return r;
            """ % (sub_type, start_id, obj_type, end_id, edge[':TYPE'], resource)
            if not graph.run(quer).data():
                f = graph.run(create_edge_query(edge, sub_type, obj_type))
                total_rel += f.stats()['relationships_created']
        # c starts at 1, so this fires at 1000, 2000, ...
        if c % 1000 == 0:
            time_log("Process: %d -- %0.2f %%" % (c, 100*c/float(len(new_edges))))
    time_log('#Edges: %d' % c)
    time_log('Finally added %d new edges!' % total_rel)
745
746
747
def update_neo4j_parallel(results):
    """
    Function to create/update a neo4j database in parallel, according to the
    nodes and edges generated by the create_neo4j_ functions. Every node and
    edge group is split into one mini-batch per worker process, and each
    batch is processed by update_neo4j_parallel_worker in a Pool. Change
    settings.yaml values in the neo4j group of variables to match your needs.
    The number of workers is settings['num_cores'], falling back to
    cpu_count() when that value is missing, not an integer, or below 1.
    Input:
        - results: dic,
        json-style dictionary with 'nodes' and 'edges' lists, whose items
        are dics holding a 'type' and a list of 'values'. Check
        create_neo4j_ functions output for details.
    Output: 1, after creating/merging the nodes/edges in the wanted
    database (1 is also returned when there is nothing to do).
    """
    # Skip the whole update unless some non-empty group is typed 'Entity'.
    # NOTE(review): update_neo4j handles edge groups typed 'relation',
    # 'mention' and 'NEW', so only Entity *nodes* can satisfy this check --
    # confirm that is intended.
    found = any(item['values'] and item['type'] == 'Entity'
                for key in ('nodes', 'edges') for item in results[key])
    if not found:
        time_log('NO NODES/EDGES FOUND! MOVING ON!')
        return 1
    try:
        n_threads = int(settings['num_cores'])
    except (KeyError, TypeError, ValueError):
        n_threads = cpu_count()
    if n_threads < 1:
        # Pool() refuses to start with fewer than one process.
        n_threads = cpu_count()
    # One copy of the results layout per worker; groups keep the same order
    # (and therefore the same index) as in the input.
    par_res = [{'nodes': [], 'edges': []} for _ in range(n_threads)]
    for key in ('nodes', 'edges'):
        for group in results[key]:
            batches = chunk_document_collection(group['values'], n_threads)
            for batch_num, batch_res in enumerate(par_res):
                batch_res[key].append({'type': group['type'],
                                       'values': batches[batch_num]})
            if key == 'edges':
                # Report the split for every edge group, not just the last one.
                len_col = " | ".join([str(len(b)) for b in batches])
                time_log('Will break the collection into batches of: %s  %s edges!'
                         % (len_col, group['type']))
    pool = Pool(n_threads, maxtasksperchild=1)
    try:
        res = pool.map(update_neo4j_parallel_worker, par_res)
    finally:
        # Always reap the worker processes, even when a worker raised.
        pool.close()
        pool.join()
    if sum(res) == n_threads:
        time_log('Completed parallel update of Neo4j!')
    else:
        time_log('Something wrong with the parallel execution?')
        time_log('Returned %d instead of %d' % (sum(res), n_threads))
    return 1
807
808
def update_neo4j_parallel_worker(results):
    """
    Worker interface used by update_neo4j_parallel: runs a regular
    update_neo4j on one mini-batch of nodes/edges.
    Input:
        - results: dic,
        json-style dictionary with 'nodes' and 'edges' lists, in the same
        layout update_neo4j expects. Check create_neo4j_ functions output
        for details.
    Output:
        - 1, signalling that the batch was processed; update_neo4j_parallel
        sums these values to check that every worker completed.
    """
    update_neo4j(results)
    return 1
827
828
829
def update_neo4j(results):
    """
    Function to create/update a neo4j database according to the nodes and
    edges generated by the create_neo4j_ functions. Connection parameters
    (host, port, user, password) are read from the neo4j group of variables
    in settings.yaml; change them to match your needs.
    Input:
        - results: dic,
        json-style dictionary with 'nodes' and 'edges' lists, whose items
        are dics holding a 'type' and a list of 'values'. Check
        create_neo4j_ functions output for details.
    Output: None, creates/merges the nodes/edges to the wanted database.
    Raises:
        - SystemExit (code 2) if the py2neo.Graph object cannot be created.
        - NotImplementedError for an edge group whose type is not
        'relation', 'mention' or 'NEW'.
    """
    host = settings['neo4j']['host']
    port = settings['neo4j']['port']
    user = settings['neo4j']['user']
    password = settings['neo4j']['password']
    try:
        graph = py2neo.Graph(host=host, port=port, user=user, password=password)
    except Exception as e:
        # Say why we bail out instead of exiting silently; SystemExit(2) is
        # what the builtin exit(2) raised, without depending on `site`.
        time_log("Couldn't connect to db! Check settings! (%r)" % (e,))
        raise SystemExit(2)
    for nodes in results['nodes']:
        populate_nodes(graph, nodes['values'], nodes['type'])
    for edges in results['edges']:
        edge_type = edges['type']
        if edge_type == 'relation':
            time_log('~~~~~~  Will create Relations Between Entities ~~~~~~')
            populate_relation_edges(graph, edges['values'])
        elif edge_type == 'mention':
            time_log('~~~~~~  Will create Mentioned In  ~~~~~~')
            populate_mentioned_edges(graph, edges['values'])
        elif edge_type == 'NEW':
            time_log('~~~~~~  Will create new-type of edges~~~~~~')
            populate_new_edges(graph, edges['values'])
        else:
            time_log('Specific edge type %r not handled! You have to update the code!'
                     % (edge_type,))
            raise NotImplementedError('Unhandled edge type: %r' % (edge_type,))
866
867
868
def update_mongo_sentences(json_):
    """
    Helper function to save the sentences found in the enriched articles in
    mongodb. Connects to the collection given by settings['mongo_sentences']
    and, for every article, either inserts a new {'id', 'sentences'}
    document or appends the sentences whose sent_id is not stored yet.
    Input:
        - json_: dic,
        json-style dictionary generated from the semrep extractor in the
        previous phase. Must make sure that there is a field named as
        indicated in settings['out']['json']['itemfield'], where the
        documents/articles are stored, and that each document/article has
        an 'id' and a 'sents' list whose items hold 'sent_id' and
        'sent_text', as expected in the output of the semrep extractor.
    Output:
        None, just populates the database
    """
    uri = settings['mongo_sentences']['uri']
    db_name = settings['mongo_sentences']['db']
    collection_name = settings['mongo_sentences']['collection']
    client = pymongo.MongoClient(uri)
    try:
        collection = client[db_name][collection_name]
        new = 0
        upd = 0
        docs = json_[settings['out']['json']['itemfield']]
        for i, doc in enumerate(docs):
            sents = [{'sent_id': sent['sent_id'], 'text': sent['sent_text']}
                     for sent in doc['sents']]
            # Materialize the matches instead of using Cursor.count(), which
            # is deprecated (and removed in PyMongo 4).
            existing = list(collection.find({'id': doc['id']}))
            if not existing:
                collection.insert_one({'id': doc['id'], 'sentences': sents})
                new += 1
            else:
                for mongo_doc in existing:
                    cur_sent = mongo_doc['sentences']
                    known_ids = set(s['sent_id'] for s in cur_sent)
                    new_sent = [s for s in sents if s['sent_id'] not in known_ids]
                    if new_sent:
                        cur_sent.extend(new_sent)
                        # Replace by _id so exactly this stored document is
                        # updated, even if several share the same 'id'.
                        collection.replace_one({'_id': mongo_doc['_id']}, mongo_doc)
                        upd += 1
            if i % 100 == 0 and i > 99:
                time_log("Process: %d -- %0.2f %%" % (i, 100*i/float(len(docs))))
        time_log('Finally updated %d -- inserted %d documents!' % (upd, new))
    finally:
        client.close()
912
913
914
915
def save_mongo(json_):
    """
    Helper function to save edges/documents to mongo.
    Items carrying the settings['out']['json']['json_id_field'] field are
    upserted (replaced if an item with the same id exists, inserted
    otherwise); items with a 'p' field are inserted as-is.
    Input:
        - json_: dic,
        json-style dictionary generated from the transformation modules in the
        previous phase. Must make sure that there is a field named as indicated
        in settings['out']['json']['itemfield'], where the edges/docs
        are stored.
    Output:
        1, after populating the database
    Raises:
        - NotImplementedError for an item with neither the id field nor 'p'.
    """
    uri = settings['out']['mongo']['uri']
    db_name = settings['out']['mongo']['db']
    collection_name = settings['out']['mongo']['collection']
    client = pymongo.MongoClient(uri)
    try:
        collection = client[db_name][collection_name]
        # Output Idfield
        idfield = settings['out']['json']['json_id_field']
        docs = json_[settings['out']['json']['itemfield']]
        for i, doc in enumerate(docs):
            if idfield in doc:
                # NOTE(review): the filter always uses the 'id' key with a
                # str() value, whatever idfield is; confirm this matches the
                # stored documents when json_id_field != 'id'.
                collection.replace_one({'id': str(doc[idfield])}, doc, upsert=True)
            elif 'p' in doc:
                collection.insert_one(doc)
            else:
                time_log('Unknown type to persist to mongo')
                raise NotImplementedError
            if i % 100 == 0 and i > 99:
                time_log("Process: %d -- %0.2f %%" % (i, 100*i/float(len(docs))))
    finally:
        client.close()
    return 1