--- a +++ b/bin/DeepMod_scripts/myGetFeatureBasedPos.py @@ -0,0 +1,782 @@ + +import os; +import sys; +import string; +import glob; +import time +import copy + +import h5py +import numpy as np +import multiprocessing + +from collections import defaultdict +from distutils.version import LooseVersion + +import tempfile +import subprocess + +import re; + +from . import myCom +from . import myDetect + +# +# map long reads +# then call another function to get feature for each base of interest +# +def mGetFeature1(moptions, sp_options, f5files): + # associate signals to events + f5data = myDetect.get_Event_Signals(moptions, sp_options, f5files) + + # save all sequences information + if moptions['outLevel']<=myCom.OUTPUT_DEBUG: start_time = time.time(); + temp_fa = tempfile.NamedTemporaryFile(suffix='.fa', mode='w') + f5keys = sorted(f5data.keys()); #f5keys.sort() + for f5k in f5keys: + temp_fa.write(''.join(['>', f5k, '\n', f5data[f5k][0], '\n'])) + temp_fa.flush(); + if moptions['outLevel']<=myCom.OUTPUT_DEBUG: + end_time = time.time(); + print ("Write consuming time %d" % (end_time-start_time)) + + # alignment using bwa-mem or minimap2 + temp_sam = tempfile.NamedTemporaryFile() + if moptions['alignStr']=='bwa': + cmd_opt = ['mem', '-x', 'ont2d', '-v', '1', '-t', '1', moptions['Ref'], temp_fa.name] + else: + cmd_opt = ['-ax', 'map-ont', moptions['Ref'], temp_fa.name] + returncode = subprocess.call([moptions['alignStr'],]+cmd_opt, stdout=temp_sam) + if not returncode==0: + print ('Fatal Error!!! returncode is non-zero(%d) for "%s"' % (returncode, curcmd)) + errkey = "Cannot running aligment" + for f5k in f5keys: + sp_options["Error"][errkey].append(f5data[f5k][3]) + return; + + temp_fa.close(); + temp_sam.seek(0); + # get sam information + align_info = temp_sam.readlines() + align_info = [str(align_info[i], 'utf-8').strip() for i in range(len(align_info))] + temp_sam.close(); + + sp_param = defaultdict(); + sp_param['f5data'] = f5data + + # for alignment + f5align = defaultdict() + f5keydict = defaultdict(); + sp_param['ref_info'] = defaultdict() + + if moptions['outLevel']<=myCom.OUTPUT_DEBUG:start_time = time.time(); + ilid = 0; + # for each record in sam, get alignment information + while ilid < len(align_info): + if len(align_info[ilid])==0 or align_info[ilid][0]=='@': + ilid += 1 + continue; + + sp_param['f5status'] = ""; + sp_param['line'] = align_info[ilid] + qname = handle_line(moptions, sp_param, f5align) + if sp_param['f5status'] == "": + f5keydict[qname] = True; + ilid += 1 + + # for unmapped reads + for f5k in f5keys: + if f5k not in f5keydict: + sp_options["Error"]["Not in alignment sam"].append(f5data[f5k][3]) + + if moptions['outLevel']<=myCom.OUTPUT_DEBUG: + end_time = time.time(); + print ("Get BAM consuming time %d" % (end_time-start_time)) + + sp_param['f5status']= "" + sp_param['line'] = "" + if moptions['outLevel']<=myCom.OUTPUT_DEBUG:start_time = time.time(); + # handle each alignment + handle_record(moptions, sp_options, sp_param, f5align, f5data) + if moptions['outLevel']<=myCom.OUTPUT_DEBUG: + end_time = time.time(); + print ("Analyze & annotate & save consuming time %d" % (end_time-start_time)) + +# +# get mapping information +# then call another function to get feature of each base in a long read +# +def handle_record(moptions, sp_options, sp_param, f5align, f5data): + alignkeys = list(f5align.keys()); + # alignment detail + numreg = re.compile('\d+') + mdireg = re.compile('[MIDNSHPX=]{1}') + + feat_file_ind_dict = [] + feat_list = None; feat_file_ind = 0 + start_c_time = time.time(); + + for readk in alignkeys: + if len(feat_file_ind_dict)>0 and feat_list.nbytes > moptions['size_per_batch']: + # save features when the size is larger than the defined size + cur_feat_file_base = sp_options['ctfolder'] + '/'+str(feat_file_ind) + np.savetxt(cur_feat_file_base+'.xy.gz', feat_list, fmt='%.3f') + with open(cur_feat_file_base+'.xy.ind', 'w') as ind_mw: + for f_ind in feat_file_ind_dict: + ind_mw.write('%d %s\n' % (f_ind[1], f_ind[0])) + print ("\t%s-%d Total consuming time %d" % (sp_options['ctfolder'][sp_options['ctfolder'].rfind('/'):], feat_file_ind, time.time()-start_c_time)); sys.stdout.flush() + feat_file_ind_dict = [] + feat_list = None; + feat_file_ind += 1 + + # get alignment detail + mapq, flag, rname, pos, cigar, readseq = f5align[readk] + + if not ( (rname in moptions['fulmodlist'] and len(moptions['fulmodlist'][rname])>0) or \ + ((not moptions['anymodlist']==None) and rname in moptions['anymodlist'] and len(moptions['anymodlist'][rname])>0) or \ + ((not moptions['nomodlist']==None) and rname in moptions['nomodlist'] and len(moptions['nomodlist'][rname])>0) ): + continue; + + # get reference sequece + if rname not in sp_param['ref_info']: + myDetect.getRefSeq(moptions, sp_param, rname) + refseq = sp_param['ref_info'][rname] + + # mapped starting position and strand + pos = pos - 1 + forward_reverse = '-' if flag&0x10 else '+' + + numinfo = numreg.findall(cigar); + mdiinfo = mdireg.findall(cigar) + numinfo = [int(numinfo[i]) for i in range(len(numinfo))] #map(int, numinfo) + + # remove clip from both tails + leftclip = 0; rightclip = 0; + while mdiinfo[0] in ['I', 'D', 'N', 'S', 'H', 'P', 'X']: + if mdiinfo[0] in ['I', 'S', 'X']: + leftclip += numinfo[0]; readseq = readseq[numinfo[0]:] + if mdiinfo[0] in ['H']: leftclip += numinfo[0] + if mdiinfo[0] in ['D', 'N', 'X']: + pos += numinfo[0] + numinfo = numinfo[1:]; mdiinfo = mdiinfo[1:] + while mdiinfo[-1] in ['I', 'D', 'N', 'S', 'H', 'P', 'X']: + if mdiinfo[-1] in ['I', 'S', 'X']: + rightclip += numinfo[-1]; readseq = readseq[:-numinfo[-1]] + if mdiinfo[-1] in ['H']: rightclip += numinfo[-1] + numinfo = numinfo[:-1]; mdiinfo = mdiinfo[:-1] + if forward_reverse=='+': + if rightclip>0: m_event = f5data[readk][1][leftclip:-rightclip] + else: m_event = f5data[readk][1][leftclip:] + else: + if leftclip>0: m_event = f5data[readk][1][rightclip:-leftclip] + else: m_event = f5data[readk][1][rightclip:] + + # is in region of interest if provided + isinreg = False; + if (moptions['region'][0] in ['', None, rname]) and \ + (moptions['region'][1] in ['', None] or pos>moptions['region'][1]) and \ + (moptions['region'][2] in ['', None] or pos+len(m_event)<moptions['region'][2]): + isinreg = True; + if not isinreg: + continue; + + # associate mapped reference positions with read positions + lastmatch = None; firstmatch = None; + first_match_pos = None; last_match_pos = None + last_al_match = None; first_al_match = None + lasmtind = 0; + base_map_info = []; #indel_groups = defaultdict() + nummismatch = 0; numinsert = 0; numdel = 0; + read_ind = 0; + for n1ind in range(len(numinfo)): + mdi = mdiinfo[n1ind]; + # for each mapped types + for n1i in range(numinfo[n1ind]): + if mdi=='M': + base_map_info.append((refseq[pos], readseq[read_ind], pos, read_ind)) + if refseq[pos]==readseq[read_ind]: + if firstmatch==None: firstmatch = read_ind + if lastmatch==None or lastmatch<read_ind: lastmatch = read_ind; lasmtind=n1ind + if first_al_match==None: first_al_match=len(base_map_info)-1 + if last_al_match==None or last_al_match<len(base_map_info): last_al_match=len(base_map_info)-1 + if first_match_pos==None: first_match_pos = pos + if last_match_pos==None or last_match_pos<pos: last_match_pos = pos + else: nummismatch += 1 + pos += 1; read_ind += 1; + elif mdi =='I': + base_map_info.append(('-', readseq[read_ind], pos, read_ind)) + read_ind += 1; + numinsert += 1 + elif mdi == 'D': + base_map_info.append((refseq[pos], '-', pos, read_ind)) + pos += 1; + numdel += 1 + elif mdi == 'N': + base_map_info.append((refseq[pos], '-', pos, read_ind)) + pos += 1; + if moptions['outLevel']<=myCom.OUTPUT_WARNING: + print ('CIGAR-Error N exist', f5data[readk][3]) + elif mdi == 'S': + read_ind += 1; + if moptions['outLevel']<=myCom.OUTPUT_WARNING: + print ('CIGAR-Error!!! S in the middle of the sequence', f5data[readk][3]) + elif mdi == 'H': + if moptions['outLevel']<=myCom.OUTPUT_WARNING: + print ('CIGAR-Error!!! H in the middle of the sequence', f5data[readk][3]) + elif mdi == 'P': + if moptions['outLevel']<=myCom.OUTPUT_WARNING: + print ('CIGAR-Error!!! P exist', f5data[readk][3]) + elif mdi == '=': + base_map_info.append((refseq[pos], readseq[read_ind], pos, read_ind)) + if first_match_pos==None: first_match_pos = pos + if last_match_pos==None or last_match_pos<pos: last_match_pos = pos + pos += 1; read_ind += 1; + if firstmatch==None: firstmatch = read_ind - 1 + if lastmatch==None or lastmatch<read_ind-1: lastmatch = read_ind - 1; lasmtind=n1ind + if last_al_match==None or last_al_match<len(base_map_info): last_al_match=len(base_map_info)-1 + if first_al_match==None: first_al_match=len(base_map_info)-1 + elif mdi == 'X': + base_map_info.append((refseq[pos], readseq[read_ind], pos, read_ind)) + pos += 1; read_ind += 1; + nummismatch += 1 + else: + if moptions['outLevel']<=myCom.OUTPUT_WARNING: + print ('CIGAR-Error!!!', 'Warning unknow CIGAR element ' + str(numinfo[n1ind]) + ' ' + mdi, f5data[readk][3]) + if firstmatch==None or lastmatch==None or firstmatch<0 or lastmatch<0: + if moptions['outLevel']<=myCom.OUTPUT_WARNING: + print ("Errorfast5 "+f5data[readk][3]) + print('match-Error!!! no first and/or last match',f5data[readk][3],('firstmatch=%d' % firstmatch) if not (firstmatch==None) else "N", ('lastmatch%d' % lastmatch) if not (lastmatch==None) else "N", str(flag), rname, str(pos)); + print('\tf=%d, chr=%s, p=%d, c=%s, s=%s' % (flag, rname, pos, cigar, readseq)) + continue; + + # remove unmatch in both tails + if not firstmatch==None: leftclip += firstmatch + if (not lastmatch==None) and len(m_event)-lastmatch>1: rightclip += len(m_event)-lastmatch-1 + # remove events whose bases are not mapped. + if forward_reverse=='+': + if len(m_event)-lastmatch>1: + m_event = m_event[firstmatch:(lastmatch+1-len(m_event))] + elif firstmatch>0: m_event = m_event[firstmatch:] + else: + if firstmatch>0: m_event = m_event[(len(m_event)-1-lastmatch):-firstmatch] + elif len(m_event)-lastmatch>1: m_event = m_event[(len(m_event)-1-lastmatch):] + # print detail if unexpected error occurs + if firstmatch>0 or len(base_map_info)-last_al_match>1: + if moptions['outLevel']<=myCom.OUTPUT_WARNING and ((firstmatch>0) or (len(base_map_info)-last_al_match>1 and refseq[last_match_pos+1] not in ['N'])): + print ("Errorfast5"+f5data[readk][3]) + print ('Warning!!! first not match', firstmatch, lastmatch, first_al_match, last_al_match, len(base_map_info), numinfo[lasmtind-2:(lasmtind+5)], mdiinfo[lasmtind-2:(lasmtind+5)], lasmtind, len(numinfo)) + print('\tref='+refseq[last_match_pos:last_match_pos+20]+"\n\tred="+readseq[lastmatch:lastmatch+20]) + if firstmatch>0: + print('\tref='+refseq[(first_match_pos-20 if first_match_pos-20>0 else 0):first_match_pos]+"\n\tred="+readseq[(firstmatch-20 if firstmatch-20>0 else 0):firstmatch]) + print('\tf=%d, chr=%s, p=%d, c=%s, s=%s' % (flag, rname, pos, cigar, readseq)) # flag, rname, pos, cigar, readseq + + if len(base_map_info)-last_al_match>1: + base_map_info = base_map_info[first_al_match:(last_al_match+1-len(base_map_info))] + elif first_al_match>0: + base_map_info = base_map_info[first_al_match:] + + # post-process mapping information + base_map_info = np.array(base_map_info, dtype=[('refbase', 'U1'), ('readbase', 'U1'), ('refbasei', np.uint64), ('readbasei', np.uint64)]) + if forward_reverse=='-': + base_map_info = np.flipud(base_map_info) + for bmii in range(len(base_map_info)): + base_map_info['refbase'][bmii] = get_complement(base_map_info['refbase'][bmii]) + base_map_info['readbase'][bmii] = get_complement(base_map_info['readbase'][bmii]) + leftclip, rightclip = rightclip, leftclip + if False: #True: # for test base_map_info ### for check consistency + ref_align_key = '/Analyses/NanomoCorrected_000/BaseCalled_template/Alignment/genome_alignment' + read_align_key = '/Analyses/NanomoCorrected_000/BaseCalled_template/Alignment/read_alignment' + with h5py.File(f5data[readk][3], 'r') as mf5: + read_align_list = [bt.decode(encoding="utf-8") for bt in mf5[read_align_key]] + ref_align_list = [bt.decode(encoding="utf-8") for bt in mf5[ref_align_key]] + for rali in range(len(read_align_list)): + if not read_align_list[rali]==base_map_info['readbase'][rali]: + print ("Error not equal1! %s %s %d %s" % (read_align_list[rali], base_map_info['readbase'][rali], rali, f5data[readk][3])) + if not ref_align_list[rali]==base_map_info['refbase'][rali]: + print ("Error not equal2! %s %s %d %s" % (ref_align_list[rali], base_map_info['refbase'][rali], rali, f5data[readk][3])) + # + # handle map like + # CCG or CGG + # C-G C-G + # + if 'motif' in moptions and moptions['motif'][0]=='CG': + for ali in range(len(base_map_info)): + if base_map_info['refbase'][ali]=='C' and base_map_info['readbase'][ali]=='C': + if ali+1<len(base_map_info) and base_map_info['readbase'][ali+1]=='-' and base_map_info['refbase'][ali+1]=='G': + addali = 2; + while ali + addali < len(base_map_info): + if base_map_info['readbase'][ali+addali]=='-' and base_map_info['refbase'][ali+addali]=='G': addali += 1; + else: break; + if ali + addali < len(base_map_info) and base_map_info['readbase'][ali+addali]=='G' and base_map_info['refbase'][ali+addali]=='G': + base_map_info['readbase'][ali+1], base_map_info['readbase'][ali+addali] = base_map_info['readbase'][ali+addali], base_map_info['readbase'][ali+1] + if base_map_info['refbase'][ali]=='G' and base_map_info['readbase'][ali]=='G': + if ali-1>-1 and base_map_info['readbase'][ali-1]=='-' and base_map_info['refbase'][ali-1]=='C': + addali = 2; + while ali - addali >-1: + if base_map_info['readbase'][ali-addali]=='-' and base_map_info['refbase'][ali-addali]=='C': addali += 1; + else: break; + if ali - addali>-1 and base_map_info['readbase'][ali-addali]=='C' and base_map_info['refbase'][ali-addali]=='C': + base_map_info['readbase'][ali-1], base_map_info['readbase'][ali-addali] = base_map_info['readbase'][ali-addali], base_map_info['readbase'][ali-1] + # too short reads + if len(m_event)<500: + sp_options["Error"]["Less(<500) events"].append(f5data[readk][3]) + continue; + + # get feautre + mfeatures,isdif = get_Feature(moptions, sp_options, sp_param, f5align, f5data, readk, leftclip, rightclip, base_map_info, forward_reverse, rname, first_match_pos, numinsert, numdel) + if isdif and moptions['outLevel']<=myCom.OUTPUT_WARNING: + print("Dif is true") + print([lastmatch, firstmatch, first_match_pos, last_match_pos, first_al_match, last_al_match, lasmtind, len(base_map_info), nummismatch, numinsert, numdel, len(base_map_info)-nummismatch-numinsert-numdel]) + + # merge to previously handled features of other fast5 files + if len(mfeatures)>0: + if len(feat_file_ind_dict)==0: + feat_file_ind_dict.append((f5data[readk][3], 0)); + feat_list = mfeatures + else: + feat_file_ind_dict.append((f5data[readk][3], len(feat_list))) + feat_list = np.concatenate((feat_list, mfeatures), axis=0) + + # store the last feature data. + if len(feat_file_ind_dict)>0: + cur_feat_file_base = sp_options['ctfolder'] + '/'+str(feat_file_ind) + np.savetxt(cur_feat_file_base+'.xy.gz', feat_list, fmt='%.3f') + with open(cur_feat_file_base+'.xy.ind', 'w') as ind_mw: + for f_ind in feat_file_ind_dict: + ind_mw.write('%d %s\n' % (f_ind[1], f_ind[0])) + print ("\t%s-%d Total consuming time %d" % (sp_options['ctfolder'][sp_options['ctfolder'].rfind('/'):], feat_file_ind, time.time()-start_c_time)); sys.stdout.flush() + feat_file_ind_dict = [] + feat_list = None; + feat_file_ind += 1 + +# +# get feature for each base of interest in long reads according to raw signals and mapping information +# +def get_Feature(moptions, sp_options, sp_param, f5align, f5data, readk, start_clip, end_clip, base_map_info, forward_reverse, rname, mapped_start_pos, num_insertions, num_deletions): + # event information + modevents = sp_param['f5data'][readk][1] + # class number, bin num and bin length + clnum = 2; binnum = 50; binlen = 0.2; + if forward_reverse=='+': + align_ref_pos = mapped_start_pos + else: + align_ref_pos = mapped_start_pos + len(base_map_info) - num_insertions - 1 + + # initialize feature matrix for all events. + if moptions['fnum']==57: + #mfeatures = np.zeros((len(modevents)-end_clip+100-(start_clip-100), (binnum+3+3+4))); + mfeatures = np.zeros((len(modevents)-end_clip+100-(start_clip-100), (binnum+3+3+4))); + else: mfeatures = np.zeros((len(modevents)-end_clip+100-(start_clip-100), (3+3+4))); + + # filter poor alignment + checkneighbornums = [3,6] + checkratios = {3:[6,5,4,2], 6:[11,10,9,3]} + checkratios = {3:[6,5,4,2], 6:[12,10,9,3]} + cgpos = [[], []] + affectneighbor = 1; # 2; + for aligni in range(len(base_map_info)): + # for methylated positions and not-used adjacent positions + if 'motif' in moptions and base_map_info['readbase'][aligni]==moptions['motif'][0][moptions['motif'][1]]: + m_a_st = aligni-moptions['motif'][1]; m_a_end = aligni+len(moptions['motif'][0])-moptions['motif'][1] + if m_a_st>-1 and m_a_end<=len(base_map_info) and ''.join(base_map_info['readbase'][m_a_st:m_a_end])==moptions['motif'][0] and (not ''.join(base_map_info['refbase'][m_a_st:m_a_end])==moptions['motif'][0]): + cgpos[1].extend([(forward_reverse, base_map_info['refbasei'][addi]) for addi in range(aligni-affectneighbor if aligni-affectneighbor>-1 else 0, aligni+affectneighbor+1 if aligni+affectneighbor+1<len(base_map_info) else len(base_map_info))]) + if (not base_map_info['refbase'][aligni]=='-') and \ + (forward_reverse, base_map_info['refbasei'][aligni]) in moptions['fulmodlist'][rname]: + if not base_map_info['readbase'][aligni]=='-': + nextnogap = aligni + 1; + while nextnogap<len(base_map_info): + if not base_map_info['refbase'][nextnogap]=='-': break; + nextnogap += 1 + iscg = False; + # find gaps + for checkneighbornum in checkneighbornums: + if not nextnogap<len(base_map_info): continue; + matchnum = 0; gapnum = 0; + # get gaps for two window sizes + for checki in range(aligni-checkneighbornum, aligni+checkneighbornum+1): + if checki>-1 and checki<len(base_map_info): + if base_map_info['refbase'][checki]==base_map_info['readbase'][checki]: matchnum += 1 + if base_map_info['refbase'][checki]=='-' or base_map_info['readbase'][checki]=='-': gapnum += 1 + if gapnum<=checkratios[checkneighbornum][3]: + for addi in range(aligni-affectneighbor if aligni-affectneighbor>-1 else 0, nextnogap+affectneighbor if nextnogap+affectneighbor<len(base_map_info) else len(base_map_info)): + if addi==aligni: # for methylated positions + cgpos[0].append((forward_reverse, base_map_info['refbasei'][addi])) + else: # for non-used positions + cgpos[1].append((forward_reverse, base_map_info['refbasei'][addi])) + iscg = True; break; + if iscg: continue; + # add more not-used positions if more gaps exist + if not base_map_info['readbase'][aligni]=='-': + nextnogap = aligni + for _ in range(affectneighbor): + nextnogap += 1; + while nextnogap<len(base_map_info['refbase']): + if not base_map_info['refbase'][nextnogap]=='-': break; + nextnogap += 1 + prenogap = aligni + for _ in range(affectneighbor): + prenogap -= 1; + while prenogap>-1: + if not base_map_info['refbase'][prenogap]=='-': break; + prenogap -= 1 + + read0 = aligni; read1 = aligni + for _ in range(affectneighbor): + read0 -= 1 + while read0>-1: + if base_map_info['readbase'][read0]=='-': read0 -= 1 + else: break; + read1 += 1 + while read1<len(base_map_info['readbase']): + if base_map_info['readbase'][read1]=='-': read1 += 1 + else: break; + + if read0<prenogap: + if read0>-1: prenogap = read0 + else: prenogap = 0 + if read1>nextnogap: + if read1<len(base_map_info['readbase']): nextnogap = read1 + else: nextnogap = len(base_map_info['readbase'])-1 + if prenogap<0: prenogap = 0 + if not nextnogap<len(base_map_info['readbase']): nextnogap=len(base_map_info['readbase'])-1 + if not prenogap<len(base_map_info['readbase']): prenogap=len(base_map_info['readbase'])-1 + for excldi in range(prenogap, nextnogap+1): + cgpos[1].append((forward_reverse, base_map_info['refbasei'][excldi])) + + print ('%s%s %d, %d >> %d %d, %d-%d=%d' % (forward_reverse, f5data[readk][3], len(cgpos[0]), len(cgpos[1]), len(modevents)-end_clip-start_clip, start_clip, len(modevents), end_clip, len(modevents)-end_clip)) + + aligni = 0; isdif = False; + for ie in range(start_clip-100, len(modevents)-end_clip+100): + cur_row_num = ie - (start_clip-100); cur_base = '' + # for aligned bases + if ie>=start_clip and ie<len(modevents)-end_clip: + if align_ref_pos<mapped_start_pos: + print ('ERRRR align_ref_pos(%d)<mapped_start_pos(%d)' % (align_ref_pos, mapped_start_pos)) + while base_map_info['readbase'][aligni]=='-': + if not align_ref_pos==base_map_info['refbasei'][aligni]: + print ('ERRRR align_ref_pos(%d) not equal to %d' % (align_ref_pos, base_map_info['refbasei'][aligni] )) + if not base_map_info['refbase'][aligni]=='-': + if forward_reverse=='+': align_ref_pos += 1 + else: align_ref_pos -= 1 + aligni += 1 + if not base_map_info['readbase'][aligni] == modevents['model_state'][ie][2]: + print ('Error Does not match', base_map_info['readbase'][aligni], modevents['model_state'][ie][2], aligni, ie) + isdif = True; + # the first column is the aligned reference position + mfeatures[cur_row_num][0] = align_ref_pos + cur_base = base_map_info['refbase'][aligni] + # the second/third column is for negative/positive labels of methylation + if moptions['posneg'] == 0: # for a data without any modification + if ( (not moptions['anymodlist']==None) and rname in moptions['nomodlist'] and (forward_reverse, base_map_info['refbasei'][aligni]) in moptions['nomodlist'][rname] ): + mfeatures[cur_row_num][1] = 1; mfeatures[cur_row_num][2] = 0 + elif (forward_reverse, base_map_info['refbasei'][aligni]) in moptions['fulmodlist'][rname]: + mfeatures[cur_row_num][1] = 1; mfeatures[cur_row_num][2] = 0 + elif ((not moptions['anymodlist']==None) and rname in moptions['anymodlist'] and (forward_reverse, base_map_info['refbasei'][aligni]) in moptions['anymodlist'][rname] ): + mfeatures[cur_row_num][1] = 1; mfeatures[cur_row_num][2] = 0 + else: # for a data with both modified and un-modified positions + if (forward_reverse, base_map_info['refbasei'][aligni]) in cgpos[0] and (not base_map_info['refbase'][aligni]=='-'): + mfeatures[cur_row_num][1] = 0; mfeatures[cur_row_num][2] = 1 + else: + if (forward_reverse, base_map_info['refbasei'][aligni]) not in cgpos[1]: + if moptions['anymodlist']==None: + if moptions['nomodlist']==None or ( rname in moptions['nomodlist'] and (forward_reverse, base_map_info['refbasei'][aligni]) in moptions['nomodlist'][rname] ): + mfeatures[cur_row_num][1] = 1; mfeatures[cur_row_num][2] = 0 + elif rname in moptions['anymodlist'] and (forward_reverse, base_map_info['refbasei'][aligni]) in moptions['anymodlist'][rname]: + pass + else: + if moptions['nomodlist']==None or ( rname in moptions['nomodlist'] and (forward_reverse, base_map_info['refbasei'][aligni]) in moptions['nomodlist'][rname] ): + mfeatures[cur_row_num][1] = 1; mfeatures[cur_row_num][2] = 0 + if not base_map_info['refbase'][aligni]=='-': + if forward_reverse=='+': align_ref_pos += 1 + else: align_ref_pos -= 1 + aligni += 1 + + # for bin features + if ie>=0 and ie<len(modevents) and moptions['fnum']==57: + for currs in sp_param['f5data'][readk][2][modevents['start'][ie]:int(modevents['start'][ie]+int(modevents['length'][ie]+0.5))]: + if currs>10 or currs<-10: print ('Error raw signal', currs, ie, modevents['start'][ie], modevents['length'][ie]) + curbin = int((currs+5)/binlen) + if curbin<0: curbin = 0 + elif not curbin<binnum: curbin = binnum-1 + mfeatures[cur_row_num][curbin+3] += 1 + if ie>=0 and ie<len(modevents): + # for reference base type feature + if cur_base in myCom.g_ACGT: + mfeatures[cur_row_num][moptions['fnum']-3+3-4+myCom.g_ACGT.index(cur_base)] = 1 + cur_index_add = moptions['fnum'] - 3 + 3 + # for signal mean std and length. + mfeatures[cur_row_num][cur_index_add + 0] = modevents["mean"][ie] + mfeatures[cur_row_num][cur_index_add + 1] = modevents["stdv"][ie] + mfeatures[cur_row_num][cur_index_add + 2] = modevents["length"][ie] + + # truncated too much not-used positions + nbkeys = defaultdict(); + for mfi in range(len(mfeatures)): + if mfeatures[mfi][1] + mfeatures[mfi][2] > 0.9: + for ini in range(mfi-25, mfi+26): + if ini<0 or ini>len(mfeatures)-1: + print("Warning wrong del mfeatures id %d for %s" % (ini, f5data[readk][3])) + else: + nbkeys[ini] = True; + keepInd = sorted(list(nbkeys.keys())); + if len(keepInd)>0: + if not len(keepInd)>len(mfeatures)*0.9: + mfeatures = mfeatures[np.array(keepInd)] + else: + mfeatures = [] + + return (mfeatures, isdif) + + +# +# get the complementary bases +# +def get_complement(na): + if na in myCom.acgt: return myCom.na_bp[na] + else: return na; + +# +# get required information for reach mapping records. +# +def handle_line(moptions, sp_param, f5align): + lsp = sp_param['line'].split('\t') + qname, flag, rname, pos, mapq, cigar, _, _, _, seq, _ = lsp[:11] + # checked query name + if qname=='*': sp_param['f5status'] = "qname is *" + # check mapping quality + elif int(mapq)==255: sp_param['f5status'] = "mapq is 255" + # check mapped positions + elif int(pos)==0: sp_param['f5status'] = "pos is 0" + # check mapped string + elif cigar=='*': sp_param['f5status'] = "cigar is *" + # check reference name + elif rname=='*': sp_param['f5status'] = "rname is *" + if not sp_param['f5status']=="": return qname + + if (qname not in f5align) or f5align[qname][0]<int(mapq): + f5align[qname] = (int(mapq), int(flag), rname, int(pos), cigar, seq) + + return qname + +# +# feature handler/workder for multiprocessing +# +def getFeature_handler(moptions, h5files_Q, failed_Q, version_Q): + while not h5files_Q.empty(): + try: + # get a list of files + f5files, ctfolderid = h5files_Q.get(block=False) + except: + break; + + sp_options = defaultdict(); + sp_options['ctfolder'] = moptions['outFolder']+str(ctfolderid) + if not os.path.isdir(sp_options['ctfolder']): + os.system('mkdir '+sp_options['ctfolder']) + # get features + mGetFeature1(moptions, sp_options, f5files) + # output errors + for errtype, errfiles in sp_options["Error"].items(): + failed_Q.put((errtype, errfiles)); + # double check albacore version + for vk in sp_options["get_albacore_version"]: + version_Q.put((vk, sp_options["get_albacore_version"][vk])) + +# +# read sequence information from a reference genome +# +def readFA(mfa, t_chr=None): + fadict = defaultdict(); + with open(mfa, 'r') as mr: + cur_chr = None; + line = mr.readline(); + while line: + # remove empty spaces + line = line.strip(); + if len(line)>0: + if line[0]=='>': # for each chromosome line + if (not cur_chr==None) and (t_chr in [None, cur_chr]): + fadict[cur_chr] = ''.join(fadict[cur_chr]) + cur_chr = line[1:].split()[0] + if t_chr in [None, cur_chr]: + fadict[cur_chr] = [] + else: # for sub-sequence line in a reference file + if t_chr in [None, cur_chr]: + fadict[cur_chr].append(line.upper()) + line = mr.readline(); + # for the last chromosome in the file + if (not cur_chr==None) and (t_chr in [None, cur_chr]): + fadict[cur_chr] = ''.join(fadict[cur_chr]) + return fadict + +# +# get reference positions for motif-based modifications +# +def readMotifMod(fadict, mpat='Cg', mposinpat=0, t_chr=None, t_start=None, t_end=None): + pos_dict = defaultdict(int) + + # get motif and complementary motif + pat3 = copy.deepcopy(mpat.upper()) + comp_pat3 = ''.join([get_complement(curna) for curna in pat3][::-1]) + comp_mposinpat = len(comp_pat3)-1-mposinpat + + fakeys = fadict.keys(); + cpgdict = defaultdict(int); + all_a = defaultdict() + for fak in fakeys: + cpgnum = [0, 0] + # motif-based reference positions + cpgdict[fak] = defaultdict() + # position of bases of interest + all_a[fak] = defaultdict() + for i in range(len(fadict[fak])): + if (t_start==None or i>=t_start) and (t_end==None or i<=t_end): + if fadict[fak][i]==mpat[mposinpat]: # for forward strand + all_a[fak][('+', i)] = True; + elif get_complement(fadict[fak][i])==mpat[mposinpat]: # for reverse strand + all_a[fak][('-', i)] = True; + + # check motif in forward strand + if i-mposinpat>=0 and i+len(comp_pat3)-1-mposinpat<len(fadict[fak]) and ''.join(fadict[fak][i-mposinpat:(i+len(comp_pat3)-1-mposinpat+1)])==pat3: + cpgdict[fak][('+', i)] = [1, fadict[fak][i]]; cpgnum[0] += 1 + elif i-comp_mposinpat>=0 and i+len(comp_pat3)-1-comp_mposinpat<len(fadict[fak]) and ''.join(fadict[fak][i-comp_mposinpat:(i+len(comp_pat3)-1-comp_mposinpat+1)])==comp_pat3: # check motif in reverse strand + cpgdict[fak][('-', i)] = [1, fadict[fak][i]]; cpgnum[1] += 1 + else: + pass + print('%s%d site: %d(+) %d(-) for %s' % (pat3, mposinpat, cpgnum[0], cpgnum[1], fak)) + return (cpgdict, all_a) + + +# +# a multiprocessing manager to get features from all long reads. +# +def getFeature_manager(moptions): + start_time = time.time(); + # multipprocessing manager + pmanager = multiprocessing.Manager(); + + # prepare output folder + if os.path.isdir(moptions['outFolder']): + os.system('rm -dr '+moptions['outFolder']) + if not os.path.isdir(moptions['outFolder']): + os.system('mkdir '+moptions['outFolder']) + + moptions['size_per_batch'] = moptions['size_per_batch'] * (10**7) + + # read reference information + fadict = readFA(moptions['Ref'],moptions['region'][0]) + if moptions['motifORPos']==1: # get motif-based positions for modifications + moptions['fulmodlist'], moptions['nomodlist'] = readMotifMod(fadict, moptions['motif'][0], moptions['motif'][1], moptions['region'][0], moptions['region'][1], moptions['region'][2]) + moptions['anymodlist'] = None + moptions['nomodlist'] = None; # add for simple process + elif moptions['motifORPos']==2: # modification position is specified by the files + fuldfiles = glob.glob(moptions["fulmod"]); + moptions['fulmodlist'] = defaultdict(lambda: defaultdict()); + if not moptions["anymod"]==None: # partially modified positions + anydfiles = glob.glob(moptions["anymod"]) + moptions['anymodlist'] = defaultdict(lambda: defaultdict()); + else: + moptions['anymodlist'] = None + if not moptions["nomod"]==None: # completely un-modified positions + nodfiles = glob.glob(moptions["nomod"]) + moptions['nomodlist'] = defaultdict(lambda: defaultdict()); + else: + moptions['nomodlist'] = None + mthreadin = [moptions['fulmodlist'], moptions['anymodlist'], moptions['nomodlist']] + mthfiles = [fuldfiles, anydfiles, nodfiles] + # read completely modified positions, partially modified positions, completely un-modified positions from files + for mthi in range(len(mthreadin)): + curmeth = mthreadin[mthi]; curfilelist = mthfiles[mthi] + if curmeth==None or curfilelist==None: continue; + for curmthf in curfilelist: + with open(curmthf, 'r') as mreader: + line = mreader.readline(); + while line: + if len(line)>0: + tchr, tstrand, tpos = line.split()[:3] + curmeth[tchr][(tstrand, int(tpos))] = [1-mthi, fadict[tchr][int(tpos)]]; + line = mreader.readline(); + for tchr in moptions['fulmodlist'] if moptions['anymodlist']==None else moptions['anymodlist']: + if len(moptions['fulmodlist'][tchr])>0 or ((not moptions['anymodlist']==None) and len(moptions['anymodlist'][tchr])>0): + print ('%s fulmod=%d anymod=%d nomod=%d' % (tchr, len(moptions['fulmodlist'][tchr]), len(moptions['anymodlist'][tchr]) if (not moptions['anymodlist']==None) else -1, len(moptions['nomodlist'][tchr]) if (not moptions['nomodlist']==None) else -1)) + + if True: #False: + # get all input fast5 files + f5files = glob.glob(os.path.join(moptions['wrkBase'],"*.fast5" )) + if moptions['recursive']==1: + f5files.extend(glob.glob(os.path.join(moptions['wrkBase'],"*/*.fast5" ))) + f5files.extend(glob.glob(os.path.join(moptions['wrkBase'],"*/*/*.fast5" ))) + f5files.extend(glob.glob(os.path.join(moptions['wrkBase'],"*/*/*/*.fast5" ))) + + + print('Total files=%d' % len(f5files)) + h5files_Q = pmanager.Queue(); + failed_Q = pmanager.Queue() + version_Q = pmanager.Queue() + + # split input fast5 files into different batch + h5_batch = []; h5batchind = 0; + for f5f in f5files: + h5_batch.append(f5f); + if len(h5_batch)==moptions['files_per_thread']: + h5files_Q.put((h5_batch, h5batchind)) + h5batchind += 1 + h5_batch = []; #break; ### feature500 + if len(h5_batch)>0: + h5files_Q.put((h5_batch, h5batchind)) + + # each thread handle a batch a time and repeat for all batches. + share_var = (moptions, h5files_Q, failed_Q, version_Q) + handlers = [] + for hid in range(moptions['threads']): + p = multiprocessing.Process(target=getFeature_handler, args=share_var); + p.start(); + handlers.append(p); + + # get failed files. + failed_files = defaultdict(list); + version_default = defaultdict(lambda: defaultdict(int)); + while any(p.is_alive() for p in handlers): + try: + errk, fns = failed_Q.get(block=False); + failed_files[errk].extend(fns) + curv, curv_num = version_Q.get(block=False); + version_default[curv] += curv_num + except: + time.sleep(1); + continue; + + # output failure information + if len(failed_files)>0: + print ('Error information for different fast5 files:') + for errtype, errfiles in failed_files.items(): + print ('\t%s %d' % (errtype, len(errfiles))) + print("abversion info {}".format(str(version_default))) + sys.stdout.flush() + end_time = time.time(); + print ("Total consuming time %d" % (end_time-start_time)) + + + +# for indepdent testing of code +if __name__=='__main__': +# if len(sys.argv)>4: + moptions = {} + moptions['basecall_1d'] = 'Basecall_1D_000' + moptions['basecall_1d'] = ['Basecall_1D_000'] + moptions['basecall_2strand'] = 'BaseCalled_template' + + moptions['outLevel'] = myCom.OUTPUT_WARNING + moptions['outLevel'] = myCom.OUTPUT_INFO + + moptions['modfile'] = '../../mod_output/train1/2/mod_train' + + moptions['fnum'] = 53; + moptions['hidden'] = 100; + moptions['windowsize'] = 21; + + moptions['threads'] = 8 + moptions['threads'] = 1 + moptions['files_per_thread'] = 500 + + mDetect_manager(moptions)