[45ad7e]: / singlecellmultiomics / statistic / conversions.py

Download this file

129 lines (109 with data), 4.8 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import seaborn as sns
import matplotlib.pyplot as plt
from .statistic import StatisticHistogram
import singlecellmultiomics.pyutils as pyutils
import collections
import pandas as pd
import matplotlib
matplotlib.rcParams['figure.dpi'] = 160
matplotlib.use('Agg')
class ConversionMatrix(StatisticHistogram):
def __init__(self, args, process_reads=200_000):
StatisticHistogram.__init__(self, args)
self.conversion_obs = collections.defaultdict(collections.Counter)
self.base_obs = collections.defaultdict(collections.Counter)
self.stranded_base_conversions = collections.defaultdict(
collections.Counter)
self.processed_reads = 0
self.process_reads = process_reads
def processRead(self, R1,R2):
for read in [R1,R2]:
if read is None:
continue
if self.processed_reads >= self.process_reads or read is None or read.is_unmapped or read.mapping_quality < 30:
return
self.processed_reads += 1
try:
for index, reference_pos, reference_base in read.get_aligned_pairs(
with_seq=True, matches_only=True):
query_base = read.seq[index]
reference_base = reference_base.upper()
if reference_base != 'N' and query_base != 'N':
k = (
query_base, 'R1' if read.is_read1 else (
'R2' if read.is_read2 else 'R?'), ('forward', 'reverse')[
read.is_reverse])
self.base_obs[reference_base][k] += 1
if reference_base != query_base:
self.conversion_obs[reference_base][k] += 1
if read.has_tag('RS'):
k = (
query_base,
'R1' if read.is_read1 else (
'R2' if read.is_read2 else 'R?'),
read.get_tag('RS'))
self.stranded_base_conversions[reference_base][k] += 1
except ValueError: # Fails when the MD tag is not present
continue
def __repr__(self):
return f'Observed base conversions'
def get_df(self):
return pd.DataFrame(
self.base_obs).fillna(0).sort_index(
axis=0).sort_index(
axis=1)
def plot(self, target_path, title=None):
df = pd.DataFrame(
self.conversion_obs).fillna(0).sort_index(
axis=0).sort_index(
axis=1)
cm = sns.clustermap(df, row_cluster=True, col_cluster=True)
ax = cm.ax_heatmap
ax.set_yticklabels(ax.get_yticklabels(), rotation=0)
cm.ax_heatmap.set_title('Raw conversions')
cm.ax_heatmap.set_xlabel('reference base')
cm.ax_heatmap.set_ylabel('sequenced base')
plt.subplots_adjust(left=0, right=0.8)
if title is not None:
cm.ax_heatmap.set_title(title)
#
plt.savefig(target_path.replace('.png', f'.conversions.png'))
plt.close()
df = self.get_df()
cm = sns.clustermap(df, row_cluster=False, col_cluster=False)
ax = cm.ax_heatmap
ax.set_yticklabels(ax.get_yticklabels(), rotation=0)
cm.ax_heatmap.set_title('Raw conversions')
cm.ax_heatmap.set_xlabel('reference base')
cm.ax_heatmap.set_ylabel('sequenced base')
#plt.tight_layout(pad=0.4, w_pad=0.8, h_pad=1.0)
plt.subplots_adjust(left=0, right=0.8)
if title is not None:
cm.ax_heatmap.set_title(title)
plt.savefig(target_path.replace('.png', f'.base_obs.png'))
plt.close()
if len(self.stranded_base_conversions) == 0:
return
df = pd.DataFrame(
self.stranded_base_conversions).fillna(0).sort_index(
axis=0).sort_index(
axis=1)
cm = sns.clustermap(df, row_cluster=False, col_cluster=False)
ax = cm.ax_heatmap
ax.set_yticklabels(ax.get_yticklabels(), rotation=0)
cm.ax_heatmap.set_title('Raw conversions')
cm.ax_heatmap.set_xlabel('reference base')
cm.ax_heatmap.set_ylabel('sequenced base')
#plt.tight_layout(pad=0.4, w_pad=0.8, h_pad=1.0)
plt.subplots_adjust(left=0, right=0.8)
if title is not None:
cm.ax_heatmap.set_title(title)
plt.savefig(
target_path.replace(
'.png',
f'.RS_TAG_strand_conversions.png'))
plt.close()
def to_csv(self, path):
self.get_df().to_csv(path)