Switch to unified view

a b/third_party/nucleus/util/cigar.py
1
# Copyright 2018 Google LLC.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
# 1. Redistributions of source code must retain the above copyright notice,
8
#    this list of conditions and the following disclaimer.
9
#
10
# 2. Redistributions in binary form must reproduce the above copyright
11
#    notice, this list of conditions and the following disclaimer in the
12
#    documentation and/or other materials provided with the distribution.
13
#
14
# 3. Neither the name of the copyright holder nor the names of its
15
#    contributors may be used to endorse or promote products derived from this
16
#    software without specific prior written permission.
17
#
18
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
"""Utility functions for working with alignment CIGAR operations.
30
31
The CIGAR format is defined within the SAM spec, available at
32
https://samtools.github.io/hts-specs/SAMv1.pdf
33
34
This module provides utility functions for interacting with the parsed
35
representations of CIGAR strings.
36
"""
37
38
from __future__ import absolute_import
39
from __future__ import division
40
from __future__ import print_function
41
42
import re
43
import six
44
from third_party.nucleus.protos import cigar_pb2
45
46
# A frozenset of all CigarUnit.Operation enum values that advance the alignment
47
# with respect to the reference genome and read, respectively.
48
REF_ADVANCING_OPS = frozenset([
49
    cigar_pb2.CigarUnit.ALIGNMENT_MATCH,
50
    cigar_pb2.CigarUnit.SEQUENCE_MATCH,
51
    cigar_pb2.CigarUnit.DELETE,
52
    cigar_pb2.CigarUnit.SKIP,
53
    cigar_pb2.CigarUnit.SEQUENCE_MISMATCH
54
])
55
READ_ADVANCING_OPS = frozenset([
56
    cigar_pb2.CigarUnit.ALIGNMENT_MATCH,
57
    cigar_pb2.CigarUnit.SEQUENCE_MATCH,
58
    cigar_pb2.CigarUnit.INSERT,
59
    cigar_pb2.CigarUnit.CLIP_SOFT,
60
    cigar_pb2.CigarUnit.SEQUENCE_MISMATCH
61
])
62
63
# A map from CigarUnit.Operation (e.g., CigarUnit.ALIGNMENT_MATCH) enum values
64
# to their corresponding single character cigar codes (e.g., 'M').
65
CIGAR_OPS_TO_CHAR = {
66
    cigar_pb2.CigarUnit.ALIGNMENT_MATCH: 'M',
67
    cigar_pb2.CigarUnit.INSERT: 'I',
68
    cigar_pb2.CigarUnit.DELETE: 'D',
69
    cigar_pb2.CigarUnit.SKIP: 'N',
70
    cigar_pb2.CigarUnit.CLIP_SOFT: 'S',
71
    cigar_pb2.CigarUnit.CLIP_HARD: 'H',
72
    cigar_pb2.CigarUnit.PAD: 'P',
73
    cigar_pb2.CigarUnit.SEQUENCE_MATCH: '=',
74
    cigar_pb2.CigarUnit.SEQUENCE_MISMATCH: 'X',
75
}
76
77
# A map from single character cigar codes (e.g., 'M') to their corresponding
78
# CigarUnit.Operation (e.g., CigarUnit.ALIGNMENT_MATCH) enum values.
79
CHAR_TO_CIGAR_OPS = {v: k for k, v in CIGAR_OPS_TO_CHAR.items()}
80
81
# All of the CigarUnit.Operation values in a frozen set.
82
ALL_CIGAR_OPS = frozenset(CIGAR_OPS_TO_CHAR.keys())
83
84
# Regular expression that matches only valid full cigar strings.
85
VALID_CIGAR_RE = re.compile(
86
    r'^(\d+[' + ''.join(CHAR_TO_CIGAR_OPS.keys()) + '])+$')
87
88
# Regular expression that matches a single len/op cigar element. The match is
89
# grouped, so CIGAR_STR_SPLITTER_RE.finditer(cigar_str) returns grouped units
90
# of the cigar string in order.
91
CIGAR_STR_SPLITTER_RE = re.compile(
92
    r'(\d+[' + ''.join(CHAR_TO_CIGAR_OPS.keys()) + '])')
93
94
95
def format_cigar_units(cigar_units):
96
  """Returns the string version of an iterable of CigarUnit protos.
97
98
  Args:
99
    cigar_units: iterable[CigarUnit] protos.
100
101
  Returns:
102
    A string representation of the CigarUnit protos that conforms to the
103
    CIGAR string specification.
104
  """
105
  return ''.join(
106
      str(unit.operation_length) + CIGAR_OPS_TO_CHAR[unit.operation]
107
      for unit in cigar_units)
108
109
110
def parse_cigar_string(cigar_str):
111
  """Parse a cigar string into a list of cigar units.
112
113
  For example, if cigar_str is 150M2S, this function will return:
114
115
  [
116
    CigarUnit(operation=ALIGNMENT_MATCH, operation_length=150),
117
    CigarUnit(operation=SOFT_CLIP, operation_length=2)
118
  ]
119
120
  Args:
121
    cigar_str: str containing a valid cigar.
122
123
  Returns:
124
    list[cigar_pb2.CigarUnit].
125
126
  Raises:
127
    ValueError: If cigar_str isn't a well-formed CIGAR.
128
  """
129
  if not cigar_str:
130
    raise ValueError('cigar_str cannot be empty')
131
  if not VALID_CIGAR_RE.match(cigar_str):
132
    raise ValueError('Malformed CIGAR string {}'.format(cigar_str))
133
  parts = CIGAR_STR_SPLITTER_RE.finditer(cigar_str)
134
  return [to_cigar_unit(part.group(1)) for part in parts]
135
136
137
def alignment_length(cigar_units):
138
  """Computes the span in basepairs of the cigar units.
139
140
  Args:
141
    cigar_units: iterable[CigarUnit] whose alignment length we want to compute.
142
143
  Returns:
144
    The number of basepairs spanned by the cigar_units.
145
  """
146
  return sum(unit.operation_length
147
             for unit in cigar_units
148
             if unit.operation in REF_ADVANCING_OPS)
149
150
151
def to_cigar_unit(source):
152
  """Creates a cigar_pb2 CigarUnit from source.
153
154
  This function attempts to convert source into a CigarUnit protobuf. If
155
  source is a string, it must be a single CIGAR string specification like
156
  '12M'. If source is a tuple or a list, must have exactly two elements
157
  (operation_length, opstr). operation_length can be a string or int, and must
158
  be >= 1. opstr should be a single character CIGAR specification (e.g., 'M').
159
  If source is already a CigarUnit, it is just passed through unmodified.
160
161
  Args:
162
    source: many types allowed. The object we want to convert to a CigarUnit
163
      proto.
164
165
  Returns:
166
    CigarUnit proto with operation_length and operation set to values from
167
      source.
168
169
  Raises:
170
    ValueError: if source cannot be converted or is malformed.
171
  """
172
  try:
173
    if isinstance(source, cigar_pb2.CigarUnit):
174
      return source
175
    elif isinstance(source, six.string_types):
176
      l, op = source[:-1], source[-1]
177
    elif isinstance(source, (tuple, list)):
178
      l, op = source
179
    else:
180
      raise ValueError('Unexpected source', source)
181
182
    if isinstance(op, six.string_types):
183
      op = CHAR_TO_CIGAR_OPS[op]
184
    l = int(l)
185
    if l < 1:
186
      raise ValueError('Length must be >= 1', l)
187
    return cigar_pb2.CigarUnit(operation=op, operation_length=int(l))
188
  except (KeyError, IndexError):
189
    raise ValueError('Failed to convert {} into a CigarUnit'.format(source))
190
191
192
def to_cigar_units(source):
193
  """Converts object to a list of CigarUnit.
194
195
  This function attempts to convert source into a list of CigarUnit protos.
196
  If source is a string, we assume it is a CIGAR string and call
197
  parse_cigar_string on it, returning the result. It not, we assume it's an
198
  iterable containing element to be converted with to_cigar_unit(). The
199
  resulting list of converted elements is returned.
200
201
  Args:
202
    source: str or iterable to convert to CigarUnit protos.
203
204
  Returns:
205
    list[CigarUnit].
206
  """
207
  if isinstance(source, six.string_types):
208
    return parse_cigar_string(source)
209
  else:
210
    return [to_cigar_unit(singleton) for singleton in source]