# faigen/data/sequence.py

from fastai import *
from fastai.text import *
from Bio.Seq import Seq
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
import re
from dna2vec.multi_k_model import MultiKModel
import random
from gensim.models import Word2Vec
from pathlib import Path
import os
from tqdm import tqdm
from torch import tensor
import gzip
from mimetypes import guess_type
from functools import partial
from concurrent.futures import ProcessPoolExecutor

# fasta extensions based on https://en.wikipedia.org/wiki/FASTA_format
gen_seq_extensions = ['.fasta', '.fna', '.ffn', '.faa', '.frn', '.fa', '.gz']


def ifnone(a: Any, b: Any) -> Any:
    "`a` if `a` is not None, otherwise `b`."
    return b if a is None else a


def gen_seq_reader(fn: PathOrStr):
    "Read the sequences in `fn` and return them as a dict keyed by record id."
    ext = str(fn).split(".")[-1]
    # transparently handle gzip-compressed fasta files
    _open = partial(gzip.open, mode='rt') if ext.lower() == "gz" else open
    with _open(fn) as f:
        return SeqIO.to_dict(SeqIO.parse(f, 'fasta'))


def seq_record(fn: PathOrStr, record_id: str):
    "Return the sequence of the record with `record_id` in `fn`, or None if absent."
    content = gen_seq_reader(fn)
    for record in content:
        if content[record].id == record_id:
            return content[record].seq
    return None
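
# Illustrative usage sketch (the fasta path and record id below are hypothetical,
# not part of this repo):
#
#   records = gen_seq_reader("/data/genomes/example/GCF_000005845.2.fna.gz")
#   print(list(records.keys())[:3])                      # first few record ids
#   seq = seq_record("/data/genomes/example/GCF_000005845.2.fna.gz", "NC_000913.3")
#   print(None if seq is None else len(seq))             # sequence length if found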


##=====================================
## Processors
##=====================================

class GSFileProcessor(PreProcessor):
    """`PreProcessor` that opens the fasta files listed in the items,
    reads them and returns the sequences with the IDs provided by the items.
    """

    def __init__(self, ds: ItemList = None, filters=None):
        self.ds, self.filters = ds, filters

    def process_one(self, item) -> Seq:
        return seq_record(item["file"], item["id"])

    def process(self, ds: Collection) -> Collection[Seq]:
        df = pd.DataFrame(data=list(ds.items), columns=['file', 'description', 'id', 'name'])
        # group record ids by file so every multi-fasta is read only once
        multi_fastas = df.groupby("file").agg({"id": list})
        res = []
        for row in tqdm(multi_fastas.index.values):
            content = gen_seq_reader(str(row))
            for record in content:
                if content[record].id in multi_fastas.loc[row, 'id']:
                    res.append(content[record].seq)
        ds.items = apply_filters(res, self.filters)
        ds.state = "sequence"


class GSTokenizer():
    "Tokenize a genomic sequence into k-mers of length `ngram`, skipping `skip` bases between tokens."

    def __init__(self, ngram=8, skip=0, n_cpus=1):
        self.ngram, self.skip, self.n_cpus = ngram, skip, n_cpus

    def tokenizer(self, t):
        if self.ngram == 1:
            toks = list(t)
            # skip `skip` bases between single-base tokens
            if self.skip > 0:
                toks = toks[::self.skip + 1]
        else:
            toks = [t[i:i + self.ngram] for i in range(0, len(t), self.ngram + self.skip)
                    if i + self.ngram <= len(t)]
        return toks

    def _process_all_1(self, texts: Collection[str]) -> List[List[str]]:
        "Process a list of `texts` in one process."
        return [self.tokenizer(str(t)) for t in texts]

    def process_all(self, texts: Collection[str]) -> List[List[str]]:
        "Process a list of `texts` across `n_cpus` processes."
        if self.n_cpus <= 1: return self._process_all_1(texts)
        with ProcessPoolExecutor(self.n_cpus) as e:
            res = sum(e.map(self._process_all_1,
                            partition_by_cores(texts, self.n_cpus)), [])
        return res
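
# Illustrative sketch of the tokenizer output (parameters chosen for the example only):
#
#   tok = GSTokenizer(ngram=3, skip=1)
#   tok.tokenizer("ATGCGTACGT")                        # -> ['ATG', 'GTA']
#   GSTokenizer(ngram=1, skip=1).tokenizer("ATGCGT")   # -> ['A', 'G', 'G']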


class GSTokenizeProcessor(PreProcessor):
    "`PreProcessor` that tokenizes the sequences in `ds`."

    def __init__(self, ds: ItemList = None, tokenizer: Tokenizer = None, ngram: int = 8, skip: int = 0,
                 chunksize: int = 10000, mark_fields: bool = False):
        self.tokenizer, self.chunksize, self.mark_fields = ifnone(tokenizer, GSTokenizer(ngram=ngram, skip=skip)), chunksize, mark_fields

    def process_one(self, sequence):
        return self.tokenizer.tokenizer(str(sequence))

    def process(self, ds):
        tokens = []
        # if len(ds.items) < self.chunksize: ds.items = self.tokenizer._process_all_1(ds.items); return
        chunks = len(ds.items) // self.chunksize + 1
        for i in tqdm(range(chunks)):
            start = i * self.chunksize
            advance = min(len(ds.items) - start, self.chunksize)
            tokens += self.tokenizer.process_all(ds.items[start:start + advance])
        ds.items = tokens
        ds.state = "tokens"


class GSVocab(Vocab):
    "Vocabulary mapping k-mer tokens to ids and back."

    def __init__(self, itos):
        self.itos = itos
        self.stoi = collections.defaultdict(int, {v: k for k, v in enumerate(self.itos)})

    @classmethod
    def create(cls, tokens, max_vocab, min_freq):
        "Create a vocabulary from `tokens`, keeping at most `max_vocab` tokens seen at least `min_freq` times."
        freq = Counter(p for o in tokens for p in o)
        itos = [o for o, c in freq.most_common(max_vocab) if c >= min_freq]
        itos.insert(0, 'pad')
        return cls(itos)
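
# Illustrative sketch of vocabulary creation (token lists are made up for the example):
#
#   toks = [['ATGCATGC', 'GGGGCCCC'], ['ATGCATGC', 'TTTTAAAA'], ['ATGCATGC']]
#   vocab = GSVocab.create(toks, max_vocab=1000, min_freq=1)
#   vocab.itos                                   # -> ['pad', 'ATGCATGC', ...] most frequent first
#   vocab.numericalize(['ATGCATGC', 'NNNNNNNN']) # unseen tokens map to index 0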


class GSNumericalizeProcessor(PreProcessor):
    "`PreProcessor` that numericalizes the tokens in `ds`."

    def __init__(self, ds: ItemList = None, vocab: Vocab = None, max_vocab: int = 80000, min_freq: int = 3):
        vocab = ifnone(vocab, ds.vocab if ds is not None else None)
        self.vocab, self.max_vocab, self.min_freq = vocab, max_vocab, min_freq

    def process_one(self, item):
        return np.array(self.vocab.numericalize(item), dtype=np.int64)

    def process(self, ds):
        if self.vocab is None: self.vocab = GSVocab.create(ds.items, self.max_vocab, self.min_freq)
        ds.vocab = self.vocab
        super().process(ds)
        ds.state = "numericalized"


class Dna2VecProcessor(PreProcessor):
    "`PreProcessor` that converts k-mer tokens into dna2vec embedding vectors."

    def __init__(self, ds: ItemList = None, agg: Callable = partial(np.mean, axis=0), emb=None, n_cpu=7):
        self.agg, self.n_cpu = agg, n_cpu
        # `emb` may be a Word2Vec object or a path to a word2vec-format embedding file
        self.emb = None if emb is None else emb if isinstance(emb, Word2Vec) else Word2Vec.load_word2vec_format(emb)

    def process_one(self, tokens):
        if self.emb is None:
            raise ValueError("Provide a path to an embedding or a Word2Vec object via the `emb` argument")
        # keep only k-mers made of unambiguous bases
        tokens = list(filter(lambda x: set(x) <= set('ATGC'), tokens))
        vectors = np.asarray([[0.] * 100, [0.] * 100])  # fallback: zero vectors of embedding size
        while len(tokens) > 0:
            try:
                vectors = np.asarray(self.emb[tokens])
                break
            except KeyError as e:
                tokens.remove(e.args[0])  # remove k-mer absent from the embedding
        return vectors if self.agg is None else self.agg(vectors)

    def _process_all_1(self, tokens: Collection[str]) -> List[List[str]]:
        return [self.process_one(t) for t in tokens]

    def process(self, ds):
        self.emb = ds.emb if (hasattr(ds, "emb") and ds.emb is not None) else self.emb
        res = []
        with ProcessPoolExecutor(self.n_cpu) as e:
            res = sum(e.map(self._process_all_1,
                            partition_by_cores(ds.items, self.n_cpu)), [])
        ds.items = res
        ds.state = "vector"
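
# Illustrative sketch of the embedding step (the w2v path is hypothetical, and both
# 8-mers are assumed to be present in the embedding; with agg=np.mean over axis 0,
# each sequence collapses to a single 100-d vector):
#
#   proc = Dna2VecProcessor(emb="../pretrained/embeddings/dna2vec-k8to8-100d.w2v",
#                           agg=partial(np.mean, axis=0))
#   vec = proc.process_one(['ATGCATGC', 'CGTACGTA'])
#   vec.shape                                                       # -> (100,)
#   Dna2VecProcessor(emb=proc.emb, agg=None).process_one(['ATGCATGC'])  # per-k-mer vectors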


##=====================================
## DataBunch
##=====================================


class GSUDataBunch(DataBunch):
    "DataBunch suitable for unsupervised learning from fasta data."

    @classmethod
    def from_folder(cls, path: PathOrStr, train: str = 'train', valid: str = 'valid', test: Optional[str] = None,
                    classes: Collection[Any] = None, tokenizer: Tokenizer = None, vocab: Vocab = None,
                    chunksize: int = 10000,
                    max_vocab: int = 70000, min_freq: int = 2, mark_fields: bool = False, include_bos: bool = True,
                    filters: Collection[Callable] = None,
                    include_eos: bool = False, n_cpus: int = None, ngram: int = 8, skip: int = 0, **kwargs):
        "Create an unsupervised learning data bunch from fasta files in folders."
        path = Path(path).absolute()
        tok = ifnone(tokenizer, GSTokenizer(ngram=ngram, skip=skip, n_cpus=ifnone(n_cpus, 1)))
        processor = [GSFileProcessor(),
                     GSTokenizeProcessor(tokenizer=tok, chunksize=chunksize, mark_fields=mark_fields),
                     GSNumericalizeProcessor(vocab=vocab, max_vocab=max_vocab, min_freq=min_freq)]
        src = ItemLists(path, NumericalizedGSList.from_folder(path=path, filters=filters, processor=processor),
                        ItemList(items=[], ignore_empty=True))
        src = src.label_empty()
        if test is not None: src.add_test_folder(path / test)
        return src.databunch(**kwargs)
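
# Illustrative usage sketch (the folder path is hypothetical, and the sketch assumes the
# NumericalizedGSList pipeline below is complete):
#
#   bunch = GSUDataBunch.from_folder("/data/genomes/GenSeq_fastas",
#                                    ngram=8, skip=0, max_vocab=70000, min_freq=2)
#   bunch.train_ds[0]        # numericalized k-mer ids for the first sequence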


class Dna2VecDataBunch(DataBunch):
    "DataBunch of tokenized genomic sequences for use with a dna2vec embedding."

    @classmethod
    def from_folder(cls, path: PathOrStr, train: str = 'train', valid: str = 'valid', test: Optional[str] = None,
                    classes: Collection[Any] = None, tokenizer: Tokenizer = None,
                    chunksize: int = 1000, mark_fields: bool = False,
                    filters: Collection[Callable] = None, labeler: Callable = None, n_cpus: int = 1,
                    ngram: int = 8, skip: int = 0, agg: Callable = None, emb=None, **kwargs):
        "Create a labeled data bunch of dna2vec vectors from fasta files in `train`/`valid` folders."
        path = Path(path).absolute()
        tok = ifnone(tokenizer, GSTokenizer(ngram=ngram, skip=skip, n_cpus=n_cpus))
        processors = [GSFileProcessor(),
                      GSTokenizeProcessor(tokenizer=tok, chunksize=chunksize, mark_fields=mark_fields),
                      Dna2VecProcessor(emb=emb, agg=agg)]
        train_items = Dna2VecList.from_folder(path=path / train, filters=filters, processor=processors)
        valid_items = Dna2VecList.from_folder(path=path / valid, filters=filters, processor=processors)
        src = ItemLists(path, train_items, valid_items)
        # derive labels from the fasta descriptions; validation reuses the training classes
        tl, cl = train_items.label_from_description(labeler)
        vl, _ = valid_items.label_from_description(labeler, labels=cl)
        src = src.label_from_lists(train_labels=tl, valid_labels=vl, label_cls=CategoryList, classes=cl)
        if test is not None: src.add_test_folder(path / test)
        return src.databunch(**kwargs)
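
# Illustrative usage sketch (paths and embedding file are hypothetical; `labeler`
# extracts a class label, e.g. the species name, from each fasta description):
#
#   bunch = Dna2VecDataBunch.from_folder(
#       "/data/genomes/GenSeq_fastas",
#       emb="../pretrained/embeddings/dna2vec-k8to8-100d.w2v",
#       ngram=8, skip=0, n_cpus=4,
#       labeler=lambda d: " ".join(d.split()[1:3]),
#       agg=partial(np.mean, axis=0))
#   x, y = bunch.train_ds[0]      # (aggregated 100-d vector, category label)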


def regex_filter(items: Collection, rx: str = "", target: str = "description", search=True, keep: bool = True) -> Collection:
    "Keep (or drop, if `keep` is False) the items whose `target` field matches the regex `rx`."
    if rx == "": return items
    rx = re.compile(rx)
    if search:
        return list(filter(lambda x: rx.search(x[target]) if keep else not rx.search(x[target]), items))
    return list(filter(lambda x: rx.match(x[target]) if keep else not rx.match(x[target]), items))


def id_filter(items: Collection, ids: Collection) -> Collection:
    "Keep only the items whose record id is in `ids`."
    return [i for i in list(items) if i['id'] in ids]


def name_filter(items: Collection, names: Collection) -> Collection:
    "Keep only the items whose record name is in `names`."
    return [i for i in list(items) if i['name'] in names]


def count_filter(items: Collection, num_fastas: tuple = (1, None), keep: int = None, sample: str = "first") -> Collection:
    "Keep items from files whose record count lies within `num_fastas`; optionally keep only the first or last `keep` records per file."
    df = pd.DataFrame(data=list(items), columns=['file', 'description', 'id', 'name'])
    df_agg = df.groupby("file").agg({"id": list})
    selected_ids = []
    for file in iter(df_agg.index):
        ids = df_agg.loc[file, "id"]
        if len(ids) < num_fastas[0]: continue
        if num_fastas[1] is not None and len(ids) > num_fastas[1]: continue
        if keep is None:
            selected_ids += ids
        else:
            selected_ids += ids[:min([keep, len(ids)])] if sample == "first" else ids[-min([keep, len(ids)]):]
    return id_filter(items=items, ids=selected_ids)


def seq_len_filter(items: Collection, len: tuple = (1, None), keep: bool = True) -> Collection:
    "Filter sequences by length. `len` is a (min, max) tuple; `keep` controls whether matching items are kept or dropped."
    selected_ids = [i["id"] for i in items if (i["len"] >= len[0] and (len[1] is None or i["len"] < len[1])) == keep]
    return id_filter(items=items, ids=selected_ids)


def total_count_filter(items: Collection, parser: Callable, num_fastas: tuple = (1, None), balance: bool = True) -> Collection:
    """Count items per category extracted by `parser`.
    Subsample overrepresented categories to match the number of samples in the least represented category."""
    pass


def describe(items: Collection) -> dict:
    """Compute statistics for the items in the list."""
    pass


def apply_filters(dicts: Collection, filters: Union[Callable, Collection[Callable]] = None) -> Collection:
    "Apply a single filter or a sequence of filters to `dicts`."
    if filters is None: return dicts
    if callable(filters): return filters(items=dicts)
    for f in filters: dicts = f(items=dicts)
    return dicts
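
# Illustrative sketch of composing filters over the metadata dicts produced by
# Dna2VecList.get_metadata (the item values are made up for the example):
#
#   items = [{'file': 'a.fna', 'description': 'NC_1 Bacillus subtilis chromosome',
#             'id': 'NC_1', 'name': 'NC_1', 'len': 4200000},
#            {'file': 'a.fna', 'description': 'NC_2 Bacillus subtilis plasmid p1',
#             'id': 'NC_2', 'name': 'NC_2', 'len': 84000}]
#   apply_filters(items, [partial(regex_filter, rx="plasmid", keep=False),
#                         partial(seq_len_filter, len=(100000, None))])
#   # -> keeps only the first item: not a plasmid and at least 100 kbp long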


class NumericalizedGSList(ItemList):
    "`ItemList` of numericalized genomic sequences."
    _bunch, _processor = GSUDataBunch, [GSFileProcessor, GSTokenizeProcessor, GSNumericalizeProcessor]

    def __init__(self, items: Iterator, vocab: Vocab = None, pad_idx: int = 1, **kwargs):
        super().__init__(items, **kwargs)
        self.vocab, self.pad_idx = vocab, pad_idx
        self.copy_new += ['vocab', 'pad_idx']

    @classmethod
    def from_folder(cls, path: PathOrStr = '.', extensions: Collection[str] = None,
                    filters: Union[Callable, Collection[Callable]] = None, vocab: GSVocab = None, **kwargs) -> ItemList:
        "Get the list of fasta files in `path` that have a sequence suffix."
        extensions = ifnone(extensions, gen_seq_extensions)
        this = super().from_folder(path=path, extensions=extensions, **kwargs)
        return cls(items=fasta_content(this, filters), path=path, vocab=vocab, **kwargs)


def _get_files(parent, p, f, extensions):
    "Return the sequence files in folder `p`, including gzipped files, for the given extensions."
    p = Path(p)  # .relative_to(parent)
    if isinstance(extensions, str): extensions = [extensions]
    low_extensions = [e.lower() for e in extensions] if extensions is not None else None
    res = [p/o for o in f if not o.startswith('.')
           and (extensions is None or f'.{o.split(".")[-1].lower()}' in low_extensions
                or (o.split(".")[-1].lower() == "gz" and f'.{o.split(".")[-2].lower()}' in low_extensions))]
    return res


class Dna2VecList(ItemList):
    "`ItemList` of k-mer tokens vectorized by a dna2vec embedding."
    _bunch, _processor = Dna2VecDataBunch, [GSFileProcessor, GSTokenizeProcessor, Dna2VecProcessor]

    def __init__(self, items: Iterator, path, ngram: int = 8, n_cpus: int = 7,
                 emb: Union[Path, str, Word2Vec] = None,
                 agg: Callable = partial(np.mean, axis=0),  # mean of the dna2vec vectors over all k-mers in a genome
                 **kwargs):
        super().__init__(items, path, **kwargs)
        self.ngram, self.agg, self.n_cpus = ngram, agg, n_cpus
        self.emb = emb if isinstance(emb, Word2Vec) else Word2Vec.load_word2vec_format(emb) if emb is not None else None
        self.descriptions, self.ids, self.names, self.files, self.lengths = None, None, None, None, None
        self.state = "initial"

    def get_metadata(self, filters):
        "Read all fasta headers, build metadata dicts, apply `filters` and cache the metadata columns."
        dicts = []
        for file in tqdm(self.items):
            content = gen_seq_reader(file)
            dicts += [
                {"file": str(file),
                 'description': content[r].description,
                 'id': content[r].id,
                 'name': content[r].name,
                 'len': len(content[r].seq)}
                for r in content.keys()]
        self.items = apply_filters(dicts, filters)
        self.descriptions = [item['description'] for item in list(self.items)]
        self.ids = [item['id'] for item in list(self.items)]
        self.names = [item['name'] for item in list(self.items)]
        self.files = [item['file'] for item in list(self.items)]
        self.lengths = [item['len'] for item in list(self.items)]
        return self

    @property
    def c(self):
        return len(self.items[0])

    def get(self, i) -> Any:
        return self.items[i]

    def process_one(self, i):
        return self.items[i]

    def analyze_pred(self, pred):
        # return the index of the highest-scoring class
        _, index = pred.max(0)
        return index

    @classmethod
    def get_files(cls, path: PathOrStr, extensions: Collection[str] = None, recurse: bool = False,
                  include: Optional[Collection[str]] = None) -> FilePathList:
        "Return the list of files in `path` that have a suffix in `extensions`; optionally `recurse`."

        def _get_files(parent, p, f, extensions):
            "Return the sequence files in folder `p`, including gzipped files, for the given extensions."
            p = Path(p)  # .relative_to(parent)
            if isinstance(extensions, str): extensions = [extensions]
            low_extensions = [e.lower() for e in extensions] if extensions is not None else None
            res = [p / o for o in f if not o.startswith('.')
                   and (extensions is None or f'.{o.split(".")[-1].lower()}' in low_extensions
                        or (o.split(".")[-1].lower() == "gz" and f'.{o.split(".")[-2].lower()}' in low_extensions))]
            return res

        if recurse:
            res = []
            for i, (p, d, f) in enumerate(os.walk(path)):
                # skip hidden dirs; on the first level, keep only dirs listed in `include`
                if include is not None and i == 0:
                    d[:] = [o for o in d if o in include]
                else:
                    d[:] = [o for o in d if not o.startswith('.')]
                res += _get_files(path, p, f, extensions)
            return res
        else:
            f = [o.name for o in os.scandir(path) if o.is_file()]
            return _get_files(path, path, f, extensions)

    def label_from_description(self, labeler: Callable = None, labels: Collection = None):
        "Derive one label per item by applying `labeler` to its fasta description; return the labels and the class list."
        assert labeler is not None, "must provide a labeler"
        lbls = list(map(labeler, self.descriptions))
        cl = list(set(lbls)) if labels is None else labels

        def _oh(i, cat_cnt):
            "One-hot encode class index `i` over `cat_cnt` categories (kept for the alternative returns below)."
            res = np.zeros(cat_cnt, dtype=int); res[i] = 1
            return res
        # return [_oh(cl.index(x), len(cl)) for x in lbls], cl   # one-hot labels
        # return [cl.index(x) for x in lbls], cl                 # integer labels
        return lbls, cl
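
    # Illustrative sketch of a labeler (the description text is made up for the example):
    #
    #   description = "NZ_CP012345.1 Bacillus subtilis strain ABC chromosome, complete genome"
    #   labeler = lambda d: " ".join(d.split()[1:3])
    #   labeler(description)      # -> 'Bacillus subtilis'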

    @classmethod
    def from_folder(cls, path: PathOrStr = '.', extensions: Collection[str] = None,
                    filters: Collection[Callable] = None, ngram: int = 8, n_cpus=1, agg: Callable = None,
                    recurse: bool = False, **kwargs) -> ItemList:
        "Get the list of files in `path` that have a sequence suffix. `recurse` determines if we search subfolders."
        extensions = ifnone(extensions, gen_seq_extensions)
        this = cls(items=cls.get_files(path, extensions, recurse), path=path, **kwargs)
        return this.get_metadata(filters)

    @classmethod
    def store_by_label_class(cls, path):
        """Store fastas into multi-fasta files labeled by class."""
        pass

    def __repr__(self):
        return f"{self.__class__.__name__} {len(self.items)} items; {self.ngram}-mer tokenization\n" \
            f" {self.emb}, reducer: {self.agg}" \
            f"\n Head: \n  {self.descriptions[0]}\n  {self.items[0]}" \
            f"\n Tail: \n  {self.descriptions[-1]}\n  {self.items[-1]}"


if __name__ == '__main__':

    # test DataBunch
    # DB = "/home/serge/database/data/genomes/ncbi-genomes-2019-04-07/Bacillus"
    # # DB = "/data/genomes/GenSeq_fastas"
    # bunch = Dna2VecDataBunch.from_folder(DB,
    #                                      filters=None,  # [partial(count_filter, keep=3, sample="last")],
    #                                      emb="../pretrained/embeddings/dna2vec-20190611-1940-k8to8-100d-10c-4870Mbp-sliding-LmP.w2v",
    #                                      ngram=8, skip=0,
    #                                      labeler=lambda x: " ".join(x.split()[1:3]),
    #                                      n_cpus=7, agg=partial(np.mean, axis=0), one_hot=True)
    # print(bunch.train_ds.y)

    # test preprocessing for embedding training
    # Dna2VecList.preprocess_for_dna2vec_training(out_path="/data/genomes/dna2vec_train",
    #                                             path="/data/genomes/GenSeq_fastas",
    #                                             filters=[partial(regex_filter, rx="plasmid", keep=False),
    #                                                      partial(seq_len_filter, len=(100000, None))])

    # test labeling
    # data.label_from_description(lambda x: x.split()[1], from_item_lists=True)
    # print(data)

    # test get item
    # data = Dna2VecList.from_folder("/data/genomes/GenSeq_fastas/valid", filters=[partial(regex_filter, rx="plasmid")])
    # print(data.get(0))

    # test processing all items
    # data = Dna2VecList.from_folder("/data/genomes/GenSeq_fastas", filters=[partial(regex_filter, rx="plasmid", keep=False)],
    #                                emb='/data/genomes/embeddings/dna2vec-20190614-1532-k11to11-100d-10c-4870Mbp-sliding-X6c.w2v')
    # # print(data.get(0))
    # processors = [GSFileProcessor(), GSTokenizeProcessor(tokenizer=GSTokenizer(ngram=11, skip=0, n_cpus=7)), Dna2VecProcessor()]
    # for p in processors: p.process(data)
    # print(data)

    # test gzipped fastas
    data = Dna2VecList.from_folder("/data/genomes/gzipped", filters=[],
                                   emb='/data/genomes/embeddings/dna2vec-20190611-1940-k8to8-100d-10c-4870Mbp-sliding-LmP.w2v')
    print(data.get(0))