--- a
+++ b/singlecellmultiomics/molecule/consensus.py
@@ -0,0 +1,204 @@
+import sklearn.ensemble
+import numpy as np
+import pandas as pd
+import time
+
+def calculate_consensus(molecule, consensus_model, molecular_identifier, out, **model_kwargs):
+    """
+    Create consensus read for molecule
+
+    Args:
+        molecule (singlecellmultiomics.molecule.Molecule)
+
+        consensus_model : fitted classifier used for consensus base calling, for example the model returned by train_consensus_model
+
+        molecular_identifier (str) : identifier for this molecule, will be suffixed to the reference_id
+
+        out (pysam.AlignmentFile) : target bam file
+
+        **model_kwargs : arguments passed to the consensus model
+
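+    Example:
+        A minimal usage sketch; `molecule_source`, `model` and `bam_out` are
+        hypothetical names assumed to exist in the caller's scope:
+
+            for i, molecule in enumerate(molecule_source):
+                calculate_consensus(molecule, model, str(i), bam_out, consensus_k_rad=1)
+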
+    """
+    try:
+        consensus_reads = molecule.deduplicate_to_single_CIGAR_spaced(
+            out,
+            f'c_{molecule.get_a_reference_id()}_{molecular_identifier}',
+            consensus_model,
+            NUC_RADIUS=model_kwargs['consensus_k_rad']
+            )
+        for consensus_read in consensus_reads:
+            consensus_read.set_tag('RG', molecule[0].get_read_group())
+            consensus_read.set_tag('mi', molecular_identifier)
+            out.write(consensus_read)
+    except Exception:
+        molecule.set_rejection_reason('CONSENSUS_FAILED', set_qcfail=True)
+        molecule.write_pysam(out)
+
+def base_calling_matrix_to_df(
+        x,
+        ref_info=None,
+        NUC_RADIUS=1,
+        USE_RT=True):
+    """
+    Convert numpy base calling feature matrix to pandas dataframe with annotated columns
+
+    Args:
+        x(np.array) : feature matrix
+        ref_info(list) : reference position annotations (will be used as index)
+        NUC_RADIUS(int) : radius of positions around the target nucleotide included as k-mer features
+        USE_RT(bool) : use RT reaction features
+
+    Returns:
+        df (pd.DataFrame)
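+
+    Example:
+        A minimal shape sketch; with NUC_RADIUS=1 and USE_RT=True there are
+        (2*1+1) positions * 5 bases * 8 features = 120 columns, annotated with
+        a three-level (position, feature, base) MultiIndex:
+
+            x = np.zeros((10, 120))
+            df = base_calling_matrix_to_df(x, NUC_RADIUS=1, USE_RT=True)
+            assert df.columns.nlevels == 3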
+    """
+    df = pd.DataFrame(x)
+    # annotate the columns
+    BASE_COUNT = 5
+    RT_INDEX = 7 if USE_RT else None
+    STRAND_INDEX = 0
+    PHRED_INDEX = 1
+    RC_INDEX = 2
+    MATE_INDEX = 3
+    CYCLE_INDEX = 4
+    MQ_INDEX = 5
+    FS_INDEX = 6
+    COLUMN_OFFSET = 0
+    features_per_block = 8 - (not USE_RT)
+
+    block_header = ["?"] * features_per_block
+    block_header[STRAND_INDEX] = 'strand'
+    block_header[PHRED_INDEX] = 'phred'
+    block_header[RC_INDEX] = 'read_count'
+    block_header[MATE_INDEX] = 'mate'
+    block_header[CYCLE_INDEX] = 'cycle'
+    block_header[MQ_INDEX] = 'mq'
+    block_header[FS_INDEX] = 'fragment_size'
+    if USE_RT:
+        block_header[RT_INDEX] = 'rt_reactions'
+    k_header = []
+    for k in range(NUC_RADIUS * 2 + 1):
+        for base in 'ACGTN':
+            k_header += [(k, b, base) for b in block_header]
+
+    try:
+        df.columns = pd.MultiIndex.from_tuples(k_header)
+    except ValueError:  # the dataframe is a concatenation of multiple molecules
+        pass
+    if ref_info is not None:
+        df.index = pd.MultiIndex.from_tuples(ref_info)
+    return df
+
+def get_consensus_training_data(
+        molecule_iterator,
+        mask_variants=None,
+        n_train=100_000, # When None, training data is created until molecule source depletion
+        skip_already_covered_bases=True,
+        #yield_results=False, # Yield results instead of returning a matrix
+        **feature_matrix_args):
+
+    """
+    Create a tensor/matrix containing alignment and base calling information, which can be used for consensus calling.
+    This function also creates a vector containing the corresponding reference bases, which can be used for training a consensus model.
+
+    Args:
+        molecule_iterator : generator which generates molecules from which base calling feature matrices are extracted
+
+        mask_variants (pysam.VariantFile) : variant locations which should be excluded from the matrix
+
+        n_train (int) : number of rows in the matrix; when None, training data is collected until the molecule source is depleted
+
+        skip_already_covered_bases(bool) : when True every reference position contributes at most a single row to the output matrix, which prevents overfitting
+
+        **feature_matrix_args : Arguments to pass to the feature matrix function of the molecules.
+
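+    Returns:
+        X (np.array) : feature matrix, one row per training base-call
+        y (list) : reference bases corresponding to the rows of X
+
+    Example:
+        A minimal usage sketch; `molecule_iterator` is a hypothetical iterator
+        over tagged molecules:
+
+            X, y = get_consensus_training_data(molecule_iterator, n_train=10_000)
+            # X has one row per training base-call, y holds the matching reference bases
+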
+    """
+
+    #if not yield_results:
+    X = None
+    y = []
+    molecules_used = 0
+    training_set_size = 0
+    last_end = None
+    last_chrom = None
+    try:
+
+        for i, molecule in enumerate(molecule_iterator):
+            # Never train the same genomic location twice
+            if skip_already_covered_bases:
+                if last_chrom is not None and last_chrom != molecule.chromosome:
+                    last_end = None
+                if last_end is not None and molecule.spanStart < last_end:
+                    continue
+
+            train_result = molecule.get_base_calling_training_data(
+                mask_variants, **feature_matrix_args)
+
+            if train_result is None:
+                # Continue when the molecule does not have bases we can learn from
+                continue
+
+            x, _y = train_result
+            training_set_size += len(_y)
+
+            #if yield_results:
+            #    yield x, _y
+
+            #else:
+            if X is None:
+                X = np.empty((0, x.shape[1]))
+                print(
+                    f"Creating feature matrix with {x.shape[1]} dimensions and {n_train} training base-calls")
+            y += _y
+            X = np.append(X, x, axis=0)
+            last_chrom = molecule.chromosome
+
+            molecules_used += 1
+            if molecule.spanEnd is not None:
+                last_end = molecule.spanEnd
+            elif last_end is not None:
+                last_end += len(_y)
+
+            # Stop once enough training base-calls were collected (n_train=None disables this limit)
+            if n_train is not None and training_set_size >= n_train:
+                break
+
+
+    except KeyboardInterrupt:
+        print("Got keyboard interrupt, stopping loading of more data")
+
+    if molecules_used > 0:
+        print(
+                f'Finished, last genomic coordinate: {molecule.chromosome} {molecule.spanEnd}, training set size is {training_set_size}, used {molecules_used} molecules for training')
+    #if not yield_results:
+    return X, y
+
+
+
+def train_consensus_model(
+        molecule_iterator,
+        mask_variants=None,
+        classifier=None,
+        n_train=100_000,
+        skip_already_covered_bases=True,
+        **feature_matrix_args):
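+    """
+    Train a base-calling consensus classifier on the supplied molecules.
+
+    Args:
+        molecule_iterator : generator which generates molecules to train on
+
+        mask_variants (pysam.VariantFile) : variant locations which should be excluded from training
+
+        classifier : scikit-learn style classifier to fit; when None a RandomForestClassifier is used
+
+        n_train (int) : number of training base-calls to collect
+
+        skip_already_covered_bases(bool) : when True every reference position contributes at most a single training row
+
+        **feature_matrix_args : Arguments to pass to the feature matrix function of the molecules.
+
+    Returns:
+        classifier : the fitted classifier
+
+    Example:
+        A minimal usage sketch; `molecule_iterator` is a hypothetical iterator
+        over tagged molecules:
+
+            classifier = train_consensus_model(molecule_iterator, n_train=50_000)
+            # the fitted model can then be passed to calculate_consensus as consensus_model
+    """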
+    if classifier is None:  # default to random forest
+        classifier = sklearn.ensemble.RandomForestClassifier(
+            n_jobs=-1,
+            n_estimators=100,
+            oob_score=True,
+            max_depth=7,
+            min_samples_leaf=5
+        )
+    X, y = get_consensus_training_data(
+        molecule_iterator, mask_variants=mask_variants, n_train=n_train,
+        skip_already_covered_bases=skip_already_covered_bases, **feature_matrix_args)
+    y = np.array(y)
+    # remove unknown ref bases from the training set
+    X = np.array(X)[y != 'N']
+    y = y[y != 'N']
+    classifier.fit(X, y)
+    if isinstance(classifier, sklearn.ensemble.RandomForestClassifier):
+        print(f"Model out of bag accuracy: {classifier.oob_score_}")
+    classifier.n_jobs = 1  # fix the number of jobs to one, otherwise applying the model will be very slow
+    return classifier