"""
Rnalib is a python utilities library for handling genomics data with a focus on transcriptomics.
It implements a transcriptome model that can be instantiated from various popular
GFF 'flavours' as published in encode, ensembl, ucsc, chess, mirgenedb, mirbase, wormbase and flybase databases.
Several filtering options are available to filter transcripts by gene type, biotype, etc.
The transcriptome model is implemented as a hierarchical tree of frozen dataclasses
(derived from the 'Feature' class) and supports incremental and flexible annotation of
transcriptome features. It also supports the extraction of genomic sequences for
transcriptome features.
The transcriptome model supports genomic range queries via `query()` which are
implemented by a combination of interval and linear search queries. A transcriptome
object maintains one intervaltree per chromosome built from gene annotations.
Overlap/envelop queries will first be applied to the respective intervaltree and the
(typically small result sets) will then be filtered, e.g., for requested sub-feature types.
"""
__version__ = "0.0.4"
import heapq
import sys
from abc import abstractmethod, ABC
from collections import Counter, abc
from dataclasses import dataclass, make_dataclass # type: ignore # import dataclass to avoid PyCharm warnings
from itertools import chain
from os import PathLike
from typing import List, Callable, NamedTuple, Any, Tuple, Iterable
import HTSeq
import bioframe
import dill
import pyranges
from intervaltree import IntervalTree
from more_itertools import pairwise, triplewise, windowed, peekable
from sortedcontainers import SortedList, SortedSet
import biotite.sequence as seq
from .constants import *
from .interfaces import Archs4Dataset
from .testdata import get_resource, list_resources
from .tools import *
from .utils import *
import logging
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
# location of the test data directory. Use the 'RNALIB_TESTDATA' environment variable or monkey patching to set to your
# favourite location, e.g., rnalib.__RNALIB_TESTDATA__ = "your_path'
__RNALIB_TESTDATA__ = os.environ.get("RNALIB_TESTDATA")
# ------------------------------------------------------------------------
# Genomic Interval (gi) model
# ------------------------------------------------------------------------
[docs]
class GI(NamedTuple):
"""
Genomic intervals (GI) in rnalib are inclusive, continuous and 1-based. This model differs from other
genomics libraries but was chosen to make interpretation of GIs straightforward: start and end coordinates
represent the first/last included nucleotide as also seen in a genome browser (such as IGV).
GIs are implemented as (readonly) namedtuples and can safely be used as keys in a dict.
They are instantiated by the gi() factory method that either parses from a string representation or by passing
explicitly chrom/start/stop coordinates. Coordinate components can be 'unrestricted' by setting them to
None, e.g., gi('chr1', 1, None) refers to the whole chromosome 1, gi('chr1', 100000) refers to the section on
chr1 from (and including) position 100k on, gi(start=100, end=200) refers to positions 100-200 (inclusive) on
any chromosome.
Intervals can be stranded. The strand can be '+' or '-' or None if unstranded. Note that '.' will be converted
to None. Individual genomic positions (points) are represented by intervals with same start and stop
coordinate. Empty intervals can be represented by start>end coordinates (e.g., gi('chr1', 1,0).is_empty() ->
True) but note that this feature is still experimental and may not be supported in all methods.
GIs can be compared for equality, overlap, envelopment, etc. and can be sorted by chromosome and coordinates.
GIs are grouped by chromosomes/contigs of a given reference genome and their order is defined by a
`RefDict` (that extends regular python dicts). RefDicts are typically instantiated automatically
from indexing data structures (e.g., .tbi or .bai files) and keep track of the order of chromosomes/contigs
and their lengths. They are used throughout rnalib to assert the compatibility of different genomic datasets
and to sort GIs by chromosome and coordinate, e.g., via
`sorted(gis, key=lambda x: (refdict.index(x.chromosome), x))`. Note that the index of chromosome 'None'
is always 0, i.e., it is the first chromosome in the sorted list.
GIs can be iterated over to yield individual positions (points) as GIs, e.g.,
tuple(gi('chr2', 1, 3)) -> (gi('chr2', 1, 1), gi('chr2', 2, 2), gi('chr2', 3, 3)).
GIs should be instantiated by the `gi()` factory method that allows to configure each coordinate component
individually or can parse from a string representation, e.g., `gi('chr1:1-10 (+)')`. The factory method
conducts type and value checking and conversions (e.g., '.' to None for strand).
Attributes
----------
chromosome: str
Chromosome (default: None)
start: int
First included position, 1-based (default: 0).
end: int
Last included position, 1-based (default: MAX_INT)
strand: str
Strand, either '+' or '-' or None if unstranded. Note that '.' will be converted to None. Default: None
Examples
--------
>>> gi('chr3')
>>> gi('chr2', 1, 10)
>>> gi('1', 1, 10, strand='+')
>>> gi('chr4:1-10 (-)')
>>> assert gi('chrX:2-1').is_empty()
Notes
-----
Note that there is also a frozen (immutable) dataclass version of this class, `GI_dataclass` that
shares most functionality with GIs and is used as superclass for genomic features (genes, transcripts,
etc.) as instantiated by the `Transcriptome` class. The reason for this separation is that dataclass
instantiation is relatively slow and has performance impacts when instantiating large sets of GIs.
"""
chromosome: str = None
start: int = 0 # unbounded, ~-inf
end: int = rna.MAX_INT # unbounded, ~+inf
strand: str = None
def __len__(self):
if self.is_empty(): # empty intervals have zero length
return 0
if self.start == 0 or self.end == MAX_INT:
return (
MAX_INT # length of (partially) unbounded intervals is always max_int.
)
return self.end - self.start + 1
[docs]
@classmethod
def from_str(cls, loc_string):
"""Parse from <chr>:<start>-<end> (<strand>). Strand is optional"""
pattern = re.compile(r"(\w+):(\d+)-(\d+)(?:[\s]*\(([+-])\))?$") # noqa
match = pattern.findall(loc_string.strip().replace(",", "")) # convenience
if len(match) == 0:
return None
chromosome, start, end, strand = match[0]
strand = None if strand == "" else strand
return gi(chromosome, int(start), int(end), strand)
[docs]
@staticmethod
def sort(intervals, refdict, reverse: bool = False):
"""Returns a chromosome + coordinate sorted iterable over the passed intervals. Chromosome order is defined by
the passed reference dict."""
try:
return sorted(
intervals,
key=lambda x: (refdict.index(x.chromosome), x),
reverse=reverse,
)
except TypeError as e:
logging.error(
f"Error sorting intervals, are all interval chromosomes defined in the refdict?: {e}"
)
raise e
def __repr__(self):
if self.is_empty():
return f"{self.chromosome}:<empty>"
return f"{self.chromosome}:{self.start}-{self.end}{'' if self.strand is None else f' ({self.strand})'}"
[docs]
def get_stranded(self, strand):
"""Get a new object with same coordinates; the strand will be set according to the passed variable."""
return gi(self.chromosome, self.start, self.end, strand)
[docs]
def to_file_str(self):
"""returns a sluggified string representation "<chrom>_<start>_<end>_<strand>" """
return f"{self.chromosome}_{self.start}_{self.end}_{'u' if self.strand is None else self.strand}"
[docs]
def is_unbounded(self):
return [self.chromosome, self.start, self.end, self.strand] == [
None,
0,
MAX_INT,
None,
]
[docs]
def is_empty(self):
return self.start > self.end
[docs]
def is_stranded(self):
return self.strand is not None
[docs]
def cs_match(self, other, strand_specific=False):
"""True if this location is on the same chrom/strand as the passed one.
will not compare chromosomes if they are unrestricted in one of the intervals.
Empty intervals always return False hee
"""
if strand_specific and self.strand != other.strand:
return False
if (
self.chromosome
and other.chromosome
and (self.chromosome != other.chromosome)
):
return False
return True
def __cmp__(self, other, cmp_str, refdict=None):
if not self.cs_match(other, strand_specific=False):
if refdict is not None:
return getattr(refdict.index(self.chromosome), cmp_str)(
refdict.index(other.chromosome)
)
return None
if self.start != other.start:
return getattr(self.start, cmp_str)(other.start)
return getattr(self.end, cmp_str)(other.end)
def __lt__(self, other):
"""
Test whether this interval is smaller than the other.
Defined only on same chromosome but allows unrestricted coordinates.
If chroms do not match, None is returned.
"""
return self.__cmp__(other, "__lt__")
def __le__(self, other):
"""
Test whether this interval is smaller or equal than the other.
Defined only on same chromosome but allows unrestricted coordinates.
If chroms do not match, None is returned.
"""
return self.__cmp__(other, "__le__")
def __gt__(self, other):
"""
Test whether this interval is greater than the other.
Defined only on same chromosome but allows unrestricted coordinates.
If chroms do not match, None is returned.
"""
return self.__cmp__(other, "__gt__")
def __ge__(self, other):
"""
Test whether this interval is greater or equal than the other.
Defined only on same chromosome but allows unrestricted coordinates.
If chroms do not match, None is returned.
"""
return self.__cmp__(other, "__ge__")
[docs]
def left_match(self, other, strand_specific=False):
if not self.cs_match(other, strand_specific):
return False
return self.start == other.start
[docs]
def right_match(self, other, strand_specific=False):
if not self.cs_match(other, strand_specific):
return False
return self.end == other.end
[docs]
def left_pos(self):
return gi(self.chromosome, self.start, self.start, strand=self.strand)
[docs]
def right_pos(self):
return gi(self.chromosome, self.end, self.end, strand=self.strand)
[docs]
def envelops(self, other, strand_specific=False) -> bool:
"""Tests whether this interval envelops (i.e., contains) the passed one."""
if self.is_unbounded(): # envelops all
return True
if self.is_empty() or other.is_empty(): # zero overlap with empty intervals
return False
if not self.cs_match(other, strand_specific):
return False
return self.start <= other.start and self.end >= other.end
[docs]
def overlaps(self, other, strand_specific=False) -> bool:
"""Tests whether this interval overlaps the passed one.
Supports unrestricted start/end coordinates and optional strand check
"""
if self.is_unbounded() or other.is_unbounded(): # overlaps all
return True
if self.is_empty() or other.is_empty(): # zero overlap with empty intervals
return False
if not self.cs_match(other, strand_specific):
return False
return self.start <= other.end and other.start <= self.end
[docs]
def overlap(self, other, strand_specific=False) -> int:
"""Calculates the overlap with the passed genomic interval.
Supports unrestricted start/end coordinates and optional strand check
If the intervals are not on the same chromosome or strand, 0 is returned.
If the intervals are empty, 0 is returned.
If the intervals are adjacent, 0 is returned.
If the intervals do not overlap, 0 is returned.
"""
if self.is_unbounded() or other.is_unbounded(): # overlaps all
return MAX_INT
if self.is_empty() or other.is_empty(): # zero overlap with empty intervals
return 0
if not self.cs_match(other, strand_specific):
return 0
return max(0, min(self.end, other.end) - max(self.start, other.start) + 1.0)
[docs]
def split_coordinates(self) -> (str, int, int):
return self.chromosome, self.start, self.end
def __add__(self, other):
"""Returns a new interval that is the union of this and the passed interval.
Use merge() to merge a list of intervals.
If and int is passed, an interval that is extended by the passed number of nucleotides is returned.
"""
assert isinstance(other, (GI, int)), "Can only add GIs or ints"
if isinstance(other, int):
if self.end + other <= 0:
return gi(self.chromosome, 0, -1, self.strand) # return empty interval
return gi(
self.chromosome,
self.start,
self.end + other,
self.strand,
)
if not self.cs_match(other, strand_specific=True):
return self.copy()
return gi(
self.chromosome,
min(self.start, other.start),
max(self.end, other.end),
self.strand,
)
def __radd__(self, other):
"""Reverse add, see https://docs.python.org/3/reference/datamodel.html#emulating-numeric-types
Note that sum() is also possible on lists of GIs. sum starts with the int 0 and recursively adds the GIs.
"""
return self.__add__(other)
def __sub__(self, other, strand_specific=False):
"""Returns a new interval that is the difference of this and the passed interval.
If intervals do not overlap, a copy of the original interval is returned.
If the passed interval is fully contained in this interval, the passed interval is removed.
If the passed interval overlaps with this interval, the overlapping parts are removed.
Returns
-------
GenomicInterval or list of GenomicIntervals
"""
assert isinstance(other, (GI, int)), "Can only subtract GIs or ints"
if isinstance(other, int):
if self.end - other <= 0:
return gi(self.chromosome, 0, -1, self.strand) # return empty interval
return gi(
self.chromosome,
self.start,
self.end - other,
self.strand,
)
if self.overlaps(other, strand_specific=strand_specific):
s1, e1, s2, e2 = self.start, self.end, other.start, other.end
# aaa | aaa | aaa | aaa | aaa
# bbb | bbb | b | bbb | bbbbb
# a | a | a a | |
if s1 < s2:
if e1 <= e2:
return gi(self.chromosome, s1, s2 - 1, self.strand)
else:
return [
gi(self.chromosome, s1, s2 - 1, self.strand),
gi(self.chromosome, e2 + 1, e1, self.strand),
]
else: # s1 >= s2
if e1 > e2:
return gi(self.chromosome, e2 + 1, e1, self.strand)
else:
return gi(
self.chromosome, 0, -1, self.strand
) # return empty interval
return self.copy()
[docs]
@classmethod
def join(cls, *locs, refdict, join_adjacent=False):
"""joins lists of intervals. Overlapping intervals are merged.
Parameters
----------
locs: list of lists of GenomicIntervals
List of lists of GenomicIntervals to be joined.
refdict: RefDict
Reference dictionary to sort the intervals by chromosome and coordinate.
join_adjacent: bool
If True, adjacent intervals are joined as well.
Examples
--------
>>> intervals = [gi("1:3-9"), gi("1:2-6"), gi("1:8-10"), gi("1:15-18")]
>>> assert list(GI.join(intervals, refdict=rna.RefDict({'1':None}))) == [gi("1:2-10"), gi("1:15-18")]
>>> intervals = [[gi("1:3-9"), gi("1:2-6")], [gi("1:8-10"), gi("1:15-18")]]
>>> assert list(GI.join(*intervals, refdict=rna.RefDict({'1':None}))) == [gi("1:2-10"), gi("1:15-18")]
"""
lst = GI.sort(sum(locs, []), refdict)
last = None
for loc in lst:
if last is None:
last = loc
continue
if last.overlaps(loc):
last = last + loc
elif join_adjacent and last.is_adjacent(loc):
last = last + loc
else:
yield last
last = loc
if last is not None:
yield last
[docs]
@classmethod
def merge(cls, loc):
"""Merges a list of intervals.
If intervals are not on the same chromosome or if strand is not matching, None is returned
The resulting interval will inherit the chromosome and strand of the first passed one.
This is equal to sum() on a list of GIs sharing the same chromosome and strand.
Users may apply this method w/o checking for this condition though.
Examples
--------
>>> GI.merge([gi('chr1', 1, 10), gi('chr1', 11, 20)])
"""
if loc is None:
return None
assert not isinstance(
loc, GI
), "Argument is a GI. Usage example: GI.merge([gi1, gi2, ...])"
assert isinstance(
loc, Iterable
), "Argument is not iterable. Usage example: GI.merge([gi1, gi2, ...])"
first = next(iter(loc))
if len(loc) == 1:
return first
if not all([x.cs_match(first, strand_specific=True) for x in loc]):
return None # check strand and chrom
chroms, starts, ends, strands = zip(
*[(x.chromosome, x.start, x.end, x.strand) for x in loc]
)
chrom = None if None in chroms else chroms[0] # unrestricted
return gi(chrom, min(starts), max(ends), strands[0])
[docs]
def is_adjacent(self, other, strand_specific=False):
"""true if intervals are directly next to each other (not overlapping!)"""
if not self.cs_match(other, strand_specific=strand_specific):
return False
a, b = (
(self.end + 1, other.start)
if self.end < other.end
else (other.end + 1, self.start)
)
return a == b
[docs]
def get_downstream(self, width=100):
"""Returns an upstream genomic interval"""
if self.is_stranded():
s, e = (
(self.end + 1, self.end + width)
if self.strand == "+"
else (self.start - width, self.start - 1)
)
return gi(self.chromosome, s, e, self.strand)
else:
return None
[docs]
def get_upstream(self, width=100):
"""Returns an upstream genomic interval"""
if self.is_stranded():
s, e = (
(self.end + 1, self.end + width)
if self.strand == "-"
else (self.start - width, self.start - 1)
)
return gi(self.chromosome, s, e, self.strand)
else:
return None
[docs]
def get_extended(self, width=100):
"""Returns a genomic interval that is extended up- and downstream by width nt"""
return gi(self.chromosome, self.start - width, self.end + width, self.strand)
[docs]
def split_by_maxwidth(self, maxwidth):
"""Splits this into n intervals of maximum width"""
k, m = divmod(self.end - self.start + 1, maxwidth)
ret = [
gi(
self.chromosome,
self.start + i * maxwidth,
self.start + (i + 1) * maxwidth - 1,
strand=self.strand,
)
for i in range(k)
]
if m > 0:
ret += [
gi(
self.chromosome,
self.start + k * maxwidth,
self.end,
strand=self.strand,
)
]
return ret
[docs]
def copy(self):
"""Returns a copy of this gi"""
return gi(self.chromosome, self.start, self.end, self.strand)
[docs]
def distance(self, other, strand_specific=False):
"""
Distance to other interval.
- None if chromosomes do not match
- 0 if intervals overlap, 1 if they are adjacent
- negative if other < self
"""
if self.cs_match(other, strand_specific=strand_specific):
if self.overlaps(other):
return 0
return other.start - self.end if other > self else other.end - self.start
return None
[docs]
def to_bed(self, name=None):
"""Returns a corresponding bed4 string representation"""
name = self.to_file_str() if name is None else name
return f"{self.chromosome}\t{self.start - 1}\t{self.end}\t{name}"
[docs]
def to_bed12(self, name=None, score=0, rgb="0,0,0"):
"""Returns a corresponding bed12 string representation:
chrom, start, end, name, score, strand, thickStart, thickEnd, itemRgb, blockCount, blockSizes, blockStarts
"""
strand = "." if self.strand is None else self.strand
return f"{self.to_bed(name)}\t{score}\t{strand}\t{self.start - 1}\t{self.end}\t{rgb}\t0\t{len(self)}\t0"
[docs]
def to_htseq(self):
"""
Returns a corresponding HTSeq interval object.
Note that this will fail on open or empty intervals as those are not supported by HTSeq.
Examples
--------
>>> gi('chr1',1,10).to_htseq()
>>> gi('chr1',1,10, strand='-').to_htseq()
>>> gi('chr1').to_htseq()
"""
return HTSeq.GenomicInterval(
self.chromosome,
self.start - 1,
self.end,
strand="." if self.strand is None else self.strand,
)
def __iter__(self):
"""Iterates over all positions in this interval.
Cannote be applied to unbounded or empty intervals.
example:
>>> list(gi('chr1', 1, 3))
"""
assert (
not self.is_unbounded() and not self.is_empty()
), "Cannot iterate over unbounded/empty genomic intervals"
for pos in range(self.start, self.end + 1):
yield gi(self.chromosome, pos, pos, self.strand)
[docs]
def gi(chromosome: str = None, start: int = 0, end: int = MAX_INT, strand: str = None):
"""Factory function for genomic intervals (GI).
Examples
--------
>>> gi('chr1', 1, 10, strand='-') # explicit coordinates
>>> gi('chr1:1-10 (-)') # parsed from string
>>> gi('chr1') # whole chromosome
>>> gi() # unbounded interval
"""
if chromosome is not None and ":" in chromosome:
return GI.from_str(chromosome)
start = 0 if start is None else start
end = rna.MAX_INT if end is None else end
if strand == ".":
strand = None
elif strand is not None:
assert strand in [None, "+", "-"]
if start > end: # empty interval, set start/end to 0/-1
start, end = 0, -1
return GI(chromosome, start, end, strand)
[docs]
@dataclass(frozen=True, init=True, slots=True)
class GI_dataclass: # noqa
"""
Dataclass for genomic intervals (GIs) in rnalib.
Copies the functionality of the namedtuple GI, but is slower to instantiate due to the post_init assertions.
Needed for `Feature` hierarchies and other dataclasses that need to be frozen.
"""
chromosome: str = None
start: int = 0 # unbounded, ~-inf
end: int = MAX_INT # unbounded, ~+inf
strand: str = None
def __post_init__(self):
"""Some sanity checks and default values.
This is slow, adds ~500ns per object creation, so we disable it for performance critical code.
"""
object.__setattr__(self, "start", 0 if self.start is None else self.start)
object.__setattr__(self, "end", MAX_INT if self.end is None else self.end)
object.__setattr__(self, "strand", self.strand if self.strand != "." else None)
assert isinstance(self.start, numbers.Number)
assert isinstance(self.end, numbers.Number)
if self.start > self.end: # empty interval, set start/end to 0/-1
object.__setattr__(self, "start", 0)
object.__setattr__(self, "end", -1)
assert self.strand in [None, "+", "-"]
# copy the methods from the GI named tuple implementation
__len__ = GI.__len__
__repr__ = GI.__repr__
get_stranded = GI.get_stranded
to_file_str = GI.to_file_str
is_unbounded = GI.is_unbounded
is_empty = GI.is_empty
is_stranded = GI.is_stranded
cs_match = GI.cs_match
__cmp__ = GI.__cmp__
__lt__ = GI.__lt__
__le__ = GI.__le__
__gt__ = GI.__gt__
__ge__ = GI.__ge__
left_match = GI.left_match
right_match = GI.right_match
left_pos = GI.left_pos
right_pos = GI.right_pos
envelops = GI.envelops
overlaps = GI.overlaps
overlap = GI.overlap
split_coordinates = GI.split_coordinates
__add__ = GI.__add__
__radd__ = GI.__radd__
__sub__ = GI.__sub__
join = GI.join
merge = GI.merge
is_adjacent = GI.is_adjacent
get_downstream = GI.get_downstream
get_upstream = GI.get_upstream
get_extended = GI.get_extended
split_by_maxwidth = GI.split_by_maxwidth
copy = GI.copy
distance = GI.distance
to_pybedtools = GI.to_pybedtools
to_htseq = GI.to_htseq
__iter__ = GI.__iter__
def _transcript_to_bed(idx, tx, item): # noqa
"""Default conversion of transcripts to BED12 format.
CDS are used as thick_start/thick_end if available, otherwise the transcript start/end is used.
Note that the BED12 format is 0-based, half-open, i.e., the end coordinate is not included.
Parameters
----------
idx : int
Index of the transcript in the transcriptome
tx : Transcript
Transcript feature
item : Any
Item associated with the transcript (not used here)
Returns
-------
tuple
name, score, thick_start, thick_end, rgb, block_count, block_sizes, block_starts as required by BED12
"""
if hasattr(tx, "CDS") and len(tx.CDS) > 0:
thick_start, thick_end = (
(tx.CDS[-1].start - 1, tx.CDS[0].end)
if tx.strand == "-"
else (tx.CDS[0].start - 1, tx.CDS[-1].end)
)
else:
thick_start, thick_end = (tx.start - 1, tx.end)
if hasattr(tx, "exon"):
block_count = len(tx.exon)
block_sizes, block_starts = zip(
*[
(str(len(ex)), str(ex.start - tx.start))
for ex in (reversed(tx.exon) if tx.strand == "-" else tx.exon)
]
)
else:
block_count, block_sizes, block_starts = 0, ("0",), (str(len(tx)),)
color = (
"255,153,153" if tx.strand == "-" else "153,255,153"
) # this works in standalone IGV but not JS !
return (
tx.feature_id,
".",
thick_start,
thick_end,
color,
block_count,
",".join(block_sizes),
",".join(block_starts),
)
[docs]
class FixedKeyTypeDefaultdict(defaultdict):
"""
A defaultdict that allows only keys of a certain type.
Examples
--------
>>> d = rna.FixedKeyTypeDefaultdict(defaultdict, allowed_key_type=int)
>>> d[1] = 1 # this works, key is an int
>>> import pytest
>>> with pytest.raises(TypeError) as e_info:
>>> d['x'] = 2 # this raises a TypeError as key is a string
>>> d.allowed_key_type = str # you can change the allowed key type
>>> d['x'] = 2 # now it works
>>> assert dill.loads(dill.dumps(d)).allowed_key_type == str # pickling/dill works
"""
def __init__(self, /, *args, **kwargs):
self.allowed_key_type = kwargs.pop(
"allowed_key_type", None
) # remove it from kwargs
super().__init__(*args, **kwargs)
# check if all keys are of the allowed type
if self.allowed_key_type is not None and not all(
[isinstance(key, self.allowed_key_type) for key in self.keys()]
):
raise TypeError(
f"Only {self.allowed_key_type} objects can be added to this dict."
)
def __setitem__(self, key, value):
# check key type
if (self.allowed_key_type is not None) and (
not isinstance(key, self.allowed_key_type)
):
raise TypeError(
f"Only {self.allowed_key_type} objects can be added to this dict, you passed a {type(key)} object"
)
super().__setitem__(key, value)
def __getstate__(self):
return self.allowed_key_type # needed for pickling
def __setstate__(self, state):
self.allowed_key_type = state # needed for pickling
def __missing__(self, key):
# check key type
if (self.allowed_key_type is not None) and (
not isinstance(key, self.allowed_key_type)
):
raise TypeError(
f"Only {self.allowed_key_type} objects can be added to this dict, you passed a {type(key)} object"
)
return super().__missing__(key)
def __reduce__(self):
"""Pickle support. See https://docs.python.org/3/library/pickle.html#object.__reduce__
add allowed_key_type to the state
"""
c, arg, state, it1, it2 = super().__reduce__()
state = self.allowed_key_type
return c, arg, state, it1, it2
[docs]
class Transcriptome:
"""
Represents a transcriptome as modelled by a GTF/GFF file.
* Model contains genes, transcripts and arbitrary sub-features (e.g., exons, intron, 3'/5'-UTRs, CDS) as
defined in the GFF file. Frozen dataclasses (derived from the 'Feature' class) are created for all parsed
feature types automatically and users may configure which GTF/GFF attributes will be added to those (and are
thus accessible via dot notation, e.g., gene.gene_type).
* This `transcriptome` implementation exploits the hierarchical relationship between genes and their
sub-features to optimize storage and computational requirements, see the `Feature` documentation for
examples. To enable this, however, parent features *must* envelop (i.e., completely contain) child feature
locations and this requirement is asserted when building the `transcriptome`.
* A `transcriptome` maintains an `anno` dict mapping (frozen) features to dicts of arbitrary annotation
values. This supports incremental and flexible annotation of `transcriptome` features. Values can directly
be accessed via dot notation <feature>.<attribute> and can be stored/loaded to/from a (pickled) file.
* `Feature` sequences can be added via `load_sequences()` which will extract the sequence of the top-level
feature ('gene') from the configured reference genome. Sequences can then be accessed via get_sequence().
For sub-features (e.g., transcripts, exons, etc.) the respective sequence will be sliced from the gene
sequence. If mode='rna' is passed, the sequence is returned in 5'-3' orientation, i.e., they are
reverse-complemented for minus-strand transcripts. The returned sequence will, however, still use the DNA
alphabet (ACTG) to enable direct alignment/comparison with genomic sequences.
if mode='spliced', the spliced 5'-3' sequence will be returned.
if mode='translated', the spliced 5'-3' CDS sequence will be returned.
* Genomic range queries via `query()` are supported by a combination of interval and linear search queries.
A transcriptome object maintains one intervaltree per chromosome built from gene annotations.
Overlap/envelop queries will first be applied to the respective intervaltree and the (typically small
result sets) will then be filtered, e.g., for requested sub-feature types.
* When building a transcriptome model from a GFF/GTF file, contained transcripts can be filtered using a
:func:`TranscriptFilter <rnalib.TranscriptFilter>`.
* | The current implementation does not implement the full GFF3 format as specified in
| https://github.com/The-Sequence-Ontology/Specifications/blob/master/gff3.md
| but currently supports various popular gff 'flavours' as published in
| encode, ensembl, ucsc, chess, mirgendb, wormbase and flybase databases (see
| :func:`GFF_FLAVOURS <rnalib.constants.GFF_FLAVOURS>`). As such this implementation will likely be extended
| in the future.
* Limitations: Currently, subfeatures can have only one parent feature which must envelop the subfeature
location. Otherwise, the subfeature will not be added to the transcriptome.
@see the `README.ipynb <https://github.com/popitsch/rnalib/blob/main/notebooks/README.ipynb>`_ jupyter
notebook for various querying and iteration examples
Parameters
----------
annotation_gff : str
Path to a GFF/GTF file containing the transcriptome annotations.
annotation_flavour : str
The annotation flavour of the GFF/GTF file.
Currently built-in formats: encode, ensembl, ucsc, chess, mirgenedb, mirbase, wormbase and flybase.
If a custom format is used, pass a dict with the format specification, see GFF_FLAVOURS for examples.
genome_fa : str
Path to a FASTA file containing the reference genome. If provided, sequences will be loaded and can be
accessed via get_sequence().
gene_name_alias_file : str
Path to a gene name alias file. If provided, gene symbols will be normalized using the alias file.
annotation_fun_alias : Callable
Name of a function that will be used to alias gene names.
The function must be defined in the global namespace and must accept a gene name and return the alias.
If provided, gene symbols will be normalized using the alias function.
copied_fields : set
Set of additional GFF/GTF info field namess that will be copied to the respective feature annotations beyond
the ones specified in the 'copied_fields' section of the annotation flavour format.
calc_introns : bool
If true, intron features will be added to transcripts that do not have them.
load_sequence_data : bool
If true, sequences will be loaded from the genome_fa file.
disable_progressbar : bool
If true, progressbars will be disabled.
genome_offsets : dict
A dict mapping chromosome names to offsets. If provided, the offset will be added to all coordinates.
name : str
An (optional) human-readable name of the transcriptome object. default: 'Transcriptome'
feature_filter : TranscriptFilter
A TranscriptFilter object that will be used to filter transcripts.
Attributes
----------
genes : List[Gene]
List of genes in the transcriptome.
transcripts : List[Transcript]
List of transcripts in the transcriptome.
anno : Dict[Feature, Dict[str, Any]]
Dictionary mapping features (e.g., genes, transcripts) to their annotations.
Examples
--------
>>> fb_gff = ... # path to flybase gff file
>>> t = rna.Transcriptome(annotation_gff = fb_gff, annotation_flavour ='flybase')
>>> # download full flybase GTF to temporary directory, use the create_testdata function to download, sort and index
>>> with tempfile.TemporaryDirectory() as tmpdir: # noqa
>>> res = {"flybase_full": {
>>> "uri": "https://ftp.flybase.net/releases/current/dmel_r6.55/gtf/dmel-all-r6.55.gtf.gz",
>>> "filename": "dmel-all-r6.55.gtf.gz"}}
>>> rna.testdata.create_testdata(tmpdir, res) # download, sort + index; needs bgzip, bedtools, tabix
>>> t2 = rna.Transcriptome(annotation_gff = rna.get_resource("flybase_full", tmpdir, res), annotation_flavour ='flybase')
>>> # instantiate Ensembl with chrom aliasing using a sliced genome FASTA file
>>> config = {
>>> 'genome_fa': rna.get_resource('ACTB+SOX2_genome'), # a sliced genome FASTA file; part of rnalib testdata
>>> 'genome_offsets': {'chr3': 181711825, 'chr7': 5526309}, # genome offsets for the sliced genome
>>> 'annotation_gff': rna.get_resource('ensembl_gff'), # ensembl GFF file; part of rnalib testdata
>>> 'annotation_flavour': 'ensembl', # ensembl annotation flavour
>>> 'annotation_fun_alias': 'toggle_chr', # aliasing function for chromosome names; will all/remove 'chr' prefix
>>> 'load_sequence_data': True # load sequences from genome_fa
>>> }
>>> t3 = rna.Transcriptome(**config) # instantiate transcriptome with config dict
>>> import random
>>> tx = random.choice(t3.transcripts) # get a random transcript
>>> print(tx.gene.gene_name) # print the name of the gene this transcript belongs to
"""
def __init__(
self,
annotation_gff: str,
annotation_flavour: str | dict,
genome_fa: str = None,
gene_name_alias_file: str = None,
annotation_fun_alias: Callable = None,
copied_fields: set = None,
load_sequence_data: bool = False,
calc_introns: bool = True,
disable_progressbar: bool = False,
genome_offsets: dict = None,
name="Transcriptome",
feature_filter=None,
custom_feature_types: list = None,
):
self.annotation_gff = annotation_gff
assert os.path.exists(
self.annotation_gff
), f"Annotation file {self.annotation_gff} not found."
self.file_format = guess_file_format(self.annotation_gff)
if isinstance(annotation_flavour, dict): # custom format passed
self.annotation_flavour = 'custom'
self.fmt = annotation_flavour
else:
self.annotation_flavour = annotation_flavour.lower()
assert ((self.annotation_flavour, self.file_format,)
in GFF_FLAVOURS), (f"Unsupported annotation flavour {self.annotation_flavour}/{self.file_format}. "
f"Supported:\n") + ", ".join([f"{k}/{v}" for k, v in GFF_FLAVOURS])
self.fmt = GFF_FLAVOURS[self.annotation_flavour, self.file_format]
self.genome_fa = genome_fa
self.gene_name_alias_file = gene_name_alias_file
self.annotation_fun_alias = annotation_fun_alias
# get GFF aliasing function
if self.annotation_fun_alias is not None:
assert (
self.annotation_fun_alias in globals()
), f"fun_alias func {self.annotation_fun_alias} undefined in globals()"
self.annotation_fun_alias = globals()[self.annotation_fun_alias]
logging.info(
f"Using aliasing function for annotation_gff: {self.annotation_fun_alias}"
)
self.copied_fields = set(self.fmt['copied_fields'])
if copied_fields is not None:
self.copied_fields = self.copied_fields | set(copied_fields)
self.load_sequence_data = load_sequence_data
self.calc_introns = calc_introns
self.disable_progressbar = disable_progressbar
self.genome_offsets = {} if genome_offsets is None else genome_offsets
self.name = name
self.feature_filter = (
TranscriptFilter()
if feature_filter is None
else TranscriptFilter(feature_filter)
if isinstance(feature_filter, dict)
else feature_filter
)
self.log = Counter()
self.merged_refdict = None
self.gene = {} # gid: gene
self.transcript = {} # tid: gene
self.cached = False # if true then transcriptome was loaded from a pickled file
self.has_seq = False # if true, then gene objects are annotated with the respective genomic (dna) sequences
self.anno = FixedKeyTypeDefaultdict(
defaultdict
) # a dict of dicts that holds annotation data for each feature
self.chr2itree = (
{}
) # a dict mapping chromosome ids to annotation interval trees.
self.genes = [] # list of genes
self.transcripts = [] # list of transcripts
self.duplicate_gene_names = (
{}
) # dict mapping gene names to lists of genes with the same name
self._ft2anno_class: dict[
str, dict
] = None # mapping feature types to annotation fields parsed from GFF
self._ft2child_ftype = None # mapping feature types to child feature types
self._ft2subclass = None # mapping feature types to the respective subclass
self.is_sorted = (
False # if true, then the annotation dict keys are sorted by location
)
self.custom_feature_types = custom_feature_types # list of custom feature types
self._first_add_experimental_warning = (
False # for displaying the experimental warning only once
)
self.build() # build the transcriptome object
[docs]
def build(self):
# reset log
self.log = Counter()
# read gene aliases (optional)
aliases, current_symbols = (
(None, None)
if self.gene_name_alias_file is None
else read_alias_file(
self.gene_name_alias_file, disable_progressbar=self.disable_progressbar
)
)
# estimate valid chromosomes
rd = (
[]
if self.genome_fa is None
else [RefDict.load(open_file_obj(self.genome_fa))]
)
rd += [
RefDict.load(
open_file_obj(self.annotation_gff), fun_alias=self.annotation_fun_alias
)
]
self.merged_refdict = RefDict.merge_and_validate(
*rd, check_order=False, included_chrom=self.feature_filter.get_chromosomes()
)
assert len(self.merged_refdict) > 0, "No shared chromosomes!"
# iterate gff
genes = {}
transcripts = {}
customs = {}
line_number = 0
for chrom in tqdm(
self.merged_refdict,
f"Building transcriptome ({len(self.merged_refdict)} chromosomes)\n",
disable=self.disable_progressbar,
):
# PASS 1: build gene objects
filtered_gene_ids = set()
with GFF3Iterator(
self.annotation_gff, chrom, fun_alias=self.annotation_fun_alias
) as lit:
try:
info = None
for line_number, (loc, info) in enumerate(lit):
self.log["parsed_gff_lines"] += 1
feature_type = self.fmt["ftype_to_SO"].get(
info["feature_type"], None
)
if feature_type == "gene": # build gene object
# filter...
filtered, filter_message = self.feature_filter.filter(
loc, info, feature_type
)
if filtered:
self.log[
f"filtered_{info['feature_type']}_{filter_message}"
] += 1
filtered_gene_ids.add(info.get(self.fmt["gid"], "None"))
continue
gid = info.get(self.fmt["gid"], "None")
if gid is None:
warnings.warn(
f"Skipping {self.annotation_flavour} {self.file_format} line "
f"{line_number + 1} ({info['feature_type']}), info:\n\t{info} as no "
f"gene_id found."
)
continue
genes[gid] = _Feature(
self,
"gene",
gid,
loc,
parent=None,
children={"transcript": []},
)
for cf in self.copied_fields:
genes[gid].anno[cf] = info.get(cf, None)
genes[gid].anno["gene_name"] = norm_gn(
info.get(self.fmt["gene_name"], gid),
current_symbols,
aliases,
) # normalized gene symbol/name
genes[gid].anno["gff_feature_type"] = info["feature_type"]
except Exception as exc:
logging.error(
f"ERROR parsing {self.annotation_flavour} {lit.file_format} at line "
f"{line_number + 1}, info:\n\t{info}"
)
raise exc
# PASS 2: build transcript objects and add missing gene annotations
missing_genes = {}
with GFF3Iterator(
self.annotation_gff, chrom, fun_alias=self.annotation_fun_alias
) as lit:
try:
for line_number, (loc, info) in enumerate(lit):
feature_type = self.fmt["ftype_to_SO"].get(
info["feature_type"], None
)
if feature_type == "transcript": # build tx object
# filter...
filtered, filter_message = self.feature_filter.filter(
loc, info, feature_type
)
if filtered:
self.log[
f"filtered_{info['feature_type']}_{filter_message}"
] += 1
continue
# get transcript and gene id
tid = info.get(self.fmt["tid"], None)
if tid is None:
warnings.warn(
f"Skipping {self.annotation_flavour} {self.file_format} line "
f" {line_number + 1} ({info['feature_type']}), info:\n\t{info} as no"
f" {self.fmt['tid']} field found."
)
continue
gid = (
f"gene_{tid}"
if self.fmt["tx_gid"] is None
else info.get(self.fmt["tx_gid"], None)
)
if gid is None:
warnings.warn(
f"Skipping {self.annotation_flavour} {self.file_format} line "
f"{line_number + 1} {info['feature_type']}), info:\n\t{info} as no "
f" {self.fmt['tx_gid']} field found."
)
continue
if gid in filtered_gene_ids:
self.log[
f"filtered_{info['feature_type']}_parent_gene_filtered"
] += 1
continue
parent_gene = genes.get(gid, None)
if parent_gene and not parent_gene.loc.envelops(loc):
warnings.warn(
f"Gene {gid} does not envelop transcript {tid}, skipping tx"
)
self.log[
f"filtered_{info['feature_type']}_not_enveloped_by_gene_filtered"
] += 1
continue
# create transcript object
transcripts[tid] = _Feature(
self,
"transcript",
tid,
loc,
parent=genes.get(gid, None),
children={
k: []
for k in set(self.fmt["ftype_to_SO"].values()) - {"gene", "transcript", None}
},
)
for cf in self.copied_fields:
transcripts[tid].anno[cf] = info.get(cf, None)
transcripts[tid].anno["gff_feature_type"] = info[
"feature_type"
]
# add missing gene annotation (e.g., ucsc, flybase, chess)
if gid not in genes:
if gid in missing_genes:
newloc = GI.merge([missing_genes[gid].loc, loc])
if newloc is None:
# special case, e.g., in Chess annotation/tx CHS.40038.9 is annotated on the
# opposite strand. We skip this tx and keep the gene annotation.
# In chess 3.0.1 there are 3 such entries, all pseudo genes.
# This happens also in RefSeq annotations, e.g., human Gene Q9Y6F8
warnings.warn(
f"Gene {gid} has tx with incompatible coordinates! "
f"{missing_genes[gid].loc} vs {loc}, skipping tx {tid}"
)
self.log[
(
f"filtered_"
f"{info['feature_type']}_incompatible_coordinates_filtered"
)
] += 1
del transcripts[tid]
else:
missing_genes[gid].loc = newloc
missing_genes[gid].children[
"transcript"
].append(
transcripts[tid]
) # add child
else:
missing_genes[gid] = _Feature(
self,
"gene",
gid,
loc,
parent=None,
children={"transcript": [transcripts[tid]]},
)
for cf in self.copied_fields:
missing_genes[gid].anno[cf] = info.get(cf, None)
missing_genes[gid].anno["gene_id"] = gid
missing_genes[gid].anno["gene_name"] = norm_gn(
info.get(self.fmt["gene_name"], gid),
current_symbols,
aliases,
)
# normalized gene symbol/name
else: # add as child
genes[gid].children["transcript"].append(
transcripts[tid]
)
for gid, mg in missing_genes.items():
genes[gid] = missing_genes[gid]
except Exception as exc:
logging.error(
f"ERROR parsing {self.annotation_flavour} {lit.file_format} at line"
f" {line_number + 1}, info:\n\t{info}"
)
raise exc
# PASS 3: add features
allowed_feature_types = set(self.fmt["ftype_to_SO"].values()) - {
"gene",
"transcript",
None,
}
# {'CDS', 'exon', 'five_prime_UTR', 'intron', 'three_prime_UTR'}
with GFF3Iterator(
self.annotation_gff, chrom, fun_alias=self.annotation_fun_alias
) as lit:
try:
for line_number, (loc, info) in enumerate(lit):
feature_type = self.fmt["ftype_to_SO"].get(
info["feature_type"], None
)
if (
feature_type in allowed_feature_types
): # supported feature types
# filter...
filtered, filter_message = self.feature_filter.filter(
loc, info, feature_type
)
# feature types that are mapped to None are filtered (e.g., wormbase pseudogenic_transcript)
if filtered or feature_type is None:
self.log[
f"filtered_{info['feature_type']}_{filter_message}"
] += 1
continue
# get transcript and gene id
tid = info.get(self.fmt["feat_tid"], None)
if (tid is None) or (
tid not in transcripts
): # no parent tx found
self.log[
f"filtered_{info['feature_type']}_no_tx_found"
] += 1
continue
feature_id = f"{tid}_{feature_type}_{len(transcripts[tid].children[feature_type])}"
feature = _Feature(
self,
feature_type,
feature_id,
loc,
parent=transcripts[tid],
children={},
)
for cf in self.copied_fields:
feature.anno[cf] = info.get(cf, None)
feature.anno["gff_feature_type"] = info["feature_type"]
transcripts[tid].children[feature_type].append(feature)
except Exception as exc:
logging.error(
f"ERROR parsing {self.annotation_flavour} {lit.file_format} at line"
f" {line_number + 1}, info:\n\t{info}"
)
raise exc
# PASS 4: custom features
if self.custom_feature_types is not None:
with GFF3Iterator(
self.annotation_gff,
region=chrom,
fun_alias=self.annotation_fun_alias,
feature_types=self.custom_feature_types,
) as lit:
try:
for line_number, (loc, info) in enumerate(lit):
feature_type = info["feature_type"]
# filter...
filtered, filter_message = self.feature_filter.filter(
loc, info, feature_type
)
if filtered:
self.log[
f"filtered_{info['feature_type']}_{filter_message}"
] += 1
continue
# get feature id (uses same GFF attribute as for genes)
feature_id = info.get(self.fmt["gid"], None)
if feature_id is None:
self.log[
f"filtered_{info['feature_type']}_no_feature_id"
] += 1
continue
customs[feature_id] = _Feature(
self,
feature_type,
feature_id,
loc,
parent=None,
children={},
)
logging.debug(
f"Adding custom feature of type {feature_type}"
)
for cf in self.copied_fields:
customs[feature_id].anno[cf] = info.get(cf, None)
customs[feature_id].anno["gff_feature_type"] = info[
"feature_type"
]
except Exception as exc:
logging.error(
f"ERROR parsing {self.annotation_flavour} {lit.file_format} at line"
f" {line_number + 1}, info:\n\t{info}"
)
raise exc
# drop genes w/o transcripts (e.g., after filtering)
for k in [k for k, v in genes.items() if len(v.children["transcript"]) == 0]:
self.log["dropped_empty_genes"] += 1
genes.pop(k, None)
# add intron features if not parsed
if self.calc_introns:
for tid, tx in transcripts.items():
if ("exon" not in tx.children) or (len(tx.children["exon"]) <= 1):
continue
strand = tx.loc.strand
for rnk, (ex0, ex1) in enumerate(pairwise(tx.children["exon"])):
loc = gi(
tx.loc.chromosome, ex0.loc.end + 1, ex1.loc.start - 1, strand
)
if loc.is_empty():
continue # TODO: what happens to rnk?!
feature_type = "intron"
feature_id = (
f"{tid}_{feature_type}_{len(tx.children[feature_type])}"
)
intron = _Feature(
self, feature_type, feature_id, loc, parent=tx, children={}
)
# copy fields from previous exon
intron.anno = ex0.anno.copy()
# add to transcript only if this is non-empty
ex0.parent.children[feature_type].append(intron)
# step1: create custom dataclasses
self._ft2anno_class = {} # contains annotation fields parsed from GFF
self._ft2child_ftype = {} # feature 2 child feature types
fts = set()
for g in genes.values():
a, t, s = g.get_anno_rec()
self._ft2anno_class.update(a)
self._ft2child_ftype.update(t)
fts.update(s)
for c in customs.values():
a, t, s = c.get_anno_rec()
self._ft2anno_class.update(a)
self._ft2child_ftype.update(t)
fts.update(s)
self._ft2subclass = {
ft: Feature.create_sub_class(
ft, self._ft2anno_class.get(ft, {}), self._ft2child_ftype.get(ft, [])
)
for ft in fts
}
# step2: freeze and add to auxiliary data structures
self.genes = [g.freeze(self._ft2subclass) for g in genes.values()]
all_features = list()
for g in self.genes:
all_features.append(g)
for f in g.features():
all_features.append(f)
# add custom features
customs = [c.freeze(self._ft2subclass) for c in customs.values()]
all_features.extend(customs)
# sort by chromosome and start coordinate
all_features.sort(key=lambda x: (self.merged_refdict.index(x.chromosome), x))
self.anno = FixedKeyTypeDefaultdict(defaultdict, {f: {} for f in all_features})
# self.anno = FixedKeyTypeDefaultdict({f: {} for f in all_features})
# assert that parents intervals always envelop their children
for f in all_features:
if f.parent is not None:
assert f.parent.envelops(
f
), f"parent intervals must envelop their child intervals: {f.parent}.envelops({f})==False"
# build some auxiliary dicts
self.transcripts = [f for f in all_features if f.feature_type == "transcript"]
self.gene = {f.feature_id: f for f in all_features if f.feature_type == "gene"}
self.gene.update(
{f.gene_name: f for f in all_features if f.feature_type == "gene"}
)
self.transcript = {f.feature_id: f for f in self.transcripts}
# Create a dict with genes that share the same gene_name (buf different ids), such as PAR genes
self.duplicate_gene_names = Counter(g.gene_name for g in self.genes)
self.duplicate_gene_names = {
x: list() for x, count in self.duplicate_gene_names.items() if count > 1
}
for g in [
g for g in self.genes if g.gene_name in self.duplicate_gene_names.keys()
]:
self.duplicate_gene_names[g.gene_name].append(g)
# load sequences
if self.load_sequence_data:
self.load_sequences()
# build interval trees
for g in tqdm(
self.genes,
desc=f"Build interval trees",
total=len(self.genes),
disable=self.disable_progressbar,
):
if g.chromosome not in self.chr2itree:
self.chr2itree[g.chromosome] = IntervalTree()
# add 1 to end coordinate, see itree conventions @ https://github.com/chaimleib/intervaltree
self.chr2itree[g.chromosome].addi(g.start, g.end + 1, g)
self.is_sorted = True
[docs]
def load_sequences(self):
"""Loads feature sequences from a genome FASTA file.
Requires a 'genome_fa' config entry.
"""
# show or hide progressbar
assert self.genome_fa is not None, "No genome_fa file provided"
with pysam.Fastafile(self.genome_fa) as fasta:
for g in tqdm(
self.genes,
desc="Load sequences",
total=len(self.genes),
disable=self.disable_progressbar,
):
start = g.start - self.genome_offsets.get(g.chromosome, 1)
end = g.end - self.genome_offsets.get(g.chromosome, 1) + 1
prefix = ""
if (
start < 0
): # add 'N' prefix if coordinates start before (offset-corrected) FASTA
prefix = "N" * abs(start)
start = 0
self.anno[g]["dna_seq"] = prefix + fasta.fetch(
reference=g.chromosome, start=start, end=end
)
# add 'N' postfix if coordinates exceed available sequence in fasta
self.anno[g]["dna_seq"] += "N" * (len(g) - len(self.anno[g]["dna_seq"]))
self.has_seq = True
def __getitem__(self, key):
"""Returns the feature with the passed feature_id"""
if isinstance(key, str):
if key in self.gene:
return self.gene[key]
return self.transcript[key]
elif isinstance(key, GI) or isinstance(key, GI_dataclass):
return self.anno[key]
else:
raise TypeError(
f"Index must be a GI or a feature id string, not {type(key).__name__}"
)
[docs]
def get(self, key, default=None):
"""Returns the feature with the passed feature_id or the default value if not found"""
try:
return self[key]
except KeyError:
return default
[docs]
def add(
self,
location: GI,
feature_id: str,
feature_type: str,
parent=None,
children: tuple = (),
sort=True,
): # -> Feature:
"""
Adds a feature to the transcriptome
Parameters
----------
location : GI
Genomic interval of the feature
feature_id : str
Feature id
feature_type : str
Feature type
parent : Feature
Parent feature
children : tuple
Children features
sort : bool
If true, the annotation dict will be sorted by location
"""
assert location.chromosome in self.merged_refdict, (
f"Chromosome {location.chromosome} not in RefDict of this " f"transcriptome"
)
if not self._first_add_experimental_warning:
warnings.warn(
"Adding custom features is still an experimental feature in rnalib. Expect bugs!"
)
self._first_add_experimental_warning = True
f = Feature(
chromosome=location.chromosome,
start=location.start,
end=location.end,
strand=location.strand,
transcriptome=self,
feature_id=feature_id,
feature_type=feature_type,
parent=parent,
subfeature_types=children,
)
self.anno[f] = {}
self.is_sorted = False
if sort:
self.sort() # for now: by default: sort the annotation dict
return f
[docs]
def sort(self):
if not self.is_sorted:
self.anno = FixedKeyTypeDefaultdict(
defaultdict,
dict(
sorted(
self.anno.items(),
key=lambda x: (self.merged_refdict.index(x[0].chromosome), x),
)
),
)
self.is_sorted = True
[docs]
def find_attr_rec(self, f, attr):
"""recursively finds attribute from parent(s)"""
if f is None:
return None, None
if attr in self.anno[f]:
return f, self.anno[f][attr]
return self.find_attr_rec(f.parent, attr)
[docs]
def get_sequence(self, feature, mode="dna", show_exon_boundaries=False):
"""
Returns the sequence of the passed feature.
- If mode is 'rna' then the reverse complement of negative-strand features (using a DNA alphabet) will be
returned.
- if mode is 'spliced', the fully spliced sequence of a transcript will be returned. This will always use
'rna' mode and is valid only for containers of exons.
- if mode is 'translated' then the CDS sequence is reported. To access the amino-acid sequence,
`tx.get_protein_sequence(.)`can be used which will return a biotite ProteinSequence object.
- else, the 5'-3' DNA sequence (as shown in a genome browser) of this feature is returned
`show_exon_boundaries=True` can be used to insert '*' characters at splicing junctions of spliced/translated
sequences.
"""
if mode == "spliced":
assert (
"exon" in feature.subfeature_types
), "Can only splice features that have annotated exons"
sep = "*" if show_exon_boundaries else ""
fseq = self.get_sequence(feature, mode="dna")
if fseq is None:
return None
if feature.strand == "-":
seq = reverse_complement(
sep.join(
[
fseq[(ex.start - feature.start): (ex.start - feature.start) + len(ex)]
for ex in reversed(feature.exon)
]
)
)
else:
seq = sep.join(
[
fseq[(ex.start - feature.start): (ex.start - feature.start) + len(ex)]
for ex in feature.exon
]
)
elif mode == "translated":
assert (
"CDS" in feature.subfeature_types
), "Can only translate features that have annotated CDS"
sep = "*" if show_exon_boundaries else ""
fseq = self.get_sequence(feature, mode="dna")
if fseq is None:
return None
if feature.strand == "-":
seq = reverse_complement(
sep.join(
[
fseq[(cds.start - feature.start): (cds.start - feature.start) + len(cds)]
for cds in reversed(feature.CDS)
]
)
)
else:
seq = sep.join(
[
fseq[(cds.start - feature.start): (cds.start - feature.start) + len(cds)]
for cds in feature.CDS
]
)
else:
p, pseq = self.find_attr_rec(feature, "dna_seq")
if p is None:
return None
if p == feature:
seq = pseq
else:
idx = feature.start - p.start
seq = pseq[idx: idx + len(feature)] # slice from parent sequence
if (
(seq is not None) and (mode == "rna") and (feature.strand == "-")
): # revcomp if rna mode and - strand
seq = reverse_complement(seq)
return seq
[docs]
def slice_from_parent(self, f, attr, default_value=None):
"""
Gets an attr from the passed feature or its predecessors (by traversing the parent/child relationships).
If retrieved from an (enveloping) parent interval, the returned value will be sliced.
Use only to access attributes that contain one item per genomic position (e.g, arrays of per-position
values)
"""
p, pseq = self.find_attr_rec(f, attr)
if p is None:
return default_value
if p == f:
return pseq
else:
idx = f.start - p.start
return pseq[idx: idx + len(f)] # slice from parent sequence
[docs]
def gene_triples(self, max_dist=None):
"""
Convenience method that yields genes and their neighbouring (up-/downstream) genes.
If max_dist is set and the neighbours are further away (or on other chromosomes),
None is returned.
To iterate over all neighbouring genes within a given genomic window, consider query()
or implement a custom iterator.
"""
for (x, y, z) in triplewise(chain([None], self.genes, [None])):
if max_dist is not None:
dx = None if x is None else x.distance(y)
if (dx is None) or (abs(dx) > max_dist):
x = None
dz = None if z is None else z.distance(y)
if (dz is None) or (abs(dz) > max_dist):
z = None
yield x, y, z
[docs]
def query(self, query, feature_types=None, envelop=False, sort=True):
"""
Query features of the passed class at the passed query location.
Parameters
----------
query : GenomicInterval or string that is parsed by GI.from_str()
Query interval
feature_types : str or List[str]
Feature types to query. If None, all feature types will be queried.
envelop : bool
If true, only features fully contained in the query interval are returned.
sort : bool
If true, the returned features will be sorted by chromosome and start coordinate.
"""
if isinstance(query, str):
query = gi(query)
if query.chromosome not in self.chr2itree:
return []
if isinstance(feature_types, str):
feature_types = (feature_types,)
# add 1 to end coordinate, see itree conventions @ https://github.com/chaimleib/intervaltree
overlapping_genes = [
x.data
for x in self.chr2itree[query.chromosome].overlap(
query.start, query.end + 1
)
]
overlapping_features = (
overlapping_genes
if (feature_types is None) or ("gene" in feature_types)
else []
)
if envelop:
overlapping_features = [
g for g in overlapping_features if query.envelops(g)
]
for g in overlapping_genes:
if envelop:
overlapping_features += [
f for f in g.features(feature_types) if query.envelops(f)
]
else:
overlapping_features += [
f for f in g.features(feature_types) if query.overlaps(f)
]
if sort:
overlapping_features.sort(
key=lambda x: (self.merged_refdict.index(x.chromosome), x)
)
return overlapping_features
[docs]
def annotate(
self,
anno_its,
fun_anno,
labels=None,
region=None,
feature_types=None,
disable_progressbar=True,
):
"""Annotates all features of the configured type and in the configured genomic region using the passed fun_anno
function.
NOTE: consider removing previous annotations with the clear_annotations() functions before (re-)annotating
a transcriptome.
"""
with AnnotationIterator(
TranscriptomeIterator(self, region=region, feature_types=feature_types),
anno_its,
labels,
) as lit:
for item in (pbar := tqdm(lit, disable=disable_progressbar)):
pbar.set_description(f"buffer_size={[len(x) for x in lit.buffer]}")
fun_anno(item)
[docs]
def annotate_with_mygene(
self,
fields,
attribute="feature_id",
species=None,
anno_prefix="",
region=None,
batch_size=1000,
default_value=None,
disable_progressbar=True,
):
"""Annotates all gene features in the configured genomic region using the MyGene.info API.
This method requires an internet connection and will send batch requests to MyGene.info for the
configured fields, see https://docs.mygene.info/en/latest/doc/data.html#available-fields for available
annotation fields.
The submitted gene ids will be accessed via the configured "attribute" (default: feature_id) and must be in
Entrez (“1017”) or Ensembl (“ENSG00000123374.1”) format. For Ensembl ids, the method will remove the minor
qualifier (e.g., ENSG00000123374.1 -> ENSG00000123374).
Please see here for mygene.info terms of use: https://docs.mygene.info/en/latest/terms.html
Parameters
----------
fields : list or dict
Data fields that will be retrieved from mygene. If a {field: annotation_name} dict is passed, the
values will be used as annotation names. Otherwise, the field names will be used as annotation names
with dots replaced by underscores (e.g., if 'pathway.kegg' is requested, the annotation name will be
'pathway_kegg'). Final annotation names will be prefixed by the anno_prefix parameter.
Available fields and data types are in rna.mygeneinfo_fields. Note that for keyword and object fields,
_rnalib_ converts the results always to a list using mygene.alwayslist().
attribute : str
The attribute to be used for gene id retrieval. Default: 'feature_id'
species : str
Species to be used for annotation. Default: None
anno_prefix : str
Prefix for the annotation keys. Default: ''. Annotation fields will be named as
f'{anno_prefix}{annotation_name}'
region : GenomicInterval
Genomic region to be annotated. If None, the whole transcriptome will be annotated.
batch_size : int
Batch size for the API requests. Default: 1000
default_value : Any
Default value for missing annotations. Default: None
disable_progressbar : bool
If true (default), progressbars will be disabled.
Returns
-------
The number of annotated genes
"""
mg = mygene.MyGeneInfo() # create mygene object
# assert fields are valid and convert to dict if necessary
if not isinstance(fields, dict):
fields = to_list(fields)
fields = {f: f.replace(".", "_") for f in fields}
assert all(
f in rna.mygeneinfo_fields for f in fields.keys()
), f"Invalid field(s) {fields}. Valid fields: {rna.mygeneinfo_fields}"
ftypes = [rna.mygeneinfo_fields[f] for f in fields.keys()]
# iterate genes in batches
genes = [g for g, _ in self.iterator(region=region, feature_types="gene")]
for batch in tqdm(
rna.split_list(genes, n=batch_size, is_chunksize=True),
disable=disable_progressbar,
):
# extract list of ids
gids = [getattr(g, attribute) for g in batch]
if len(gids) == 0:
warnings.warn(
f"No gene ids found in the transcriptome for attribute {attribute}"
)
return 0
# remove minor qualifier for ensembl gene ids.
if gids[0].startswith("ENSG"):
gids = [x.split(".")[0] for x in gids]
# query mygene
mgi = mg.getgenes(
gids, species=species, fields=list(fields.keys()), verbose=False
)
# annotate
for g, m in zip(batch, mgi):
for (f, anno), ftype in zip(fields.items(), ftypes):
# get value from mygene dict. Field qualifiers are converted to paths
val = rna.get_config(
m, f.replace(".", "/"), default_value=default_value
)
if val is not None and ftype in ["object", "keyword"]:
val = mygene.alwayslist(val)
self.anno[g][f"{anno_prefix}{anno}"] = val
return len(genes)
[docs]
def save(self, out_file):
"""
Stores this transcriptome and all annotations as dill (pickle) object.
Note that this can be slow for large-scaled transcriptomes and will produce large ouput files.
Consider using save_annotations()/load_annotations() to save/load only the annotation dictionary.
"""
with open(out_file, "wb") as out:
dill.dump(self, out, recurse=True)
# byref=True cannot vbe used as dynamically created dataclasses are not supported yet
[docs]
@classmethod
def load(cls, in_file):
"""Load transcriptome from pickled file"""
import gc
gc.disable() # disable garbage collector
with open(in_file, "rb") as infile:
obj = dill.load(infile)
gc.enable()
obj.cached = True
return obj
[docs]
def clear_annotations(self, retain_keys=("dna_seq",)):
"""
Clears this transcriptome's annotations (except for retain_keys annotations (by default: 'dna_seq')).
"""
for a in self.anno:
if retain_keys is None:
self.anno[a] = {}
else:
for k in {k for k in self.anno[a].keys() if k not in retain_keys}:
del self.anno[a][k]
[docs]
def save_annotations(self, out_file, excluded_keys=("dna_seq",)):
"""
Stores this transcriptome annotations as dill (pickle) object.
Note that the data is stored not by object reference but by comparison
key, so it can be assigned to newly created transcriptome objects.
Parameters
----------
out_file : str
The output file name
excluded_keys : tuple or set
The keys to be excluded from the output file. Default: ('dna_seq',)
"""
keys = {a for adict in self.anno.values() for a in adict.keys()}
if excluded_keys is not None:
keys = keys ^ set(excluded_keys)
logging.info(f"Storing annotations keys: {keys} to {out_file}")
with open(out_file, "wb") as out:
dill.dump(
{
k.key(): {x: v[x] for x in v.keys() & keys}
for k, v in self.anno.items()
if len(v.keys() & keys) > 0
},
out,
recurse=True,
)
[docs]
def load_annotations(self, in_file, update=False):
"""
Loads annotation data from the passed dill (pickle) object.
If update is true, the current annotation dictionary will be updated.
Parameters
----------
in_file : str
The input file name
update : bool
If true, the current annotation dictionary will be updated.
So, existing annotations with different keys will be kept.
"""
with open(in_file, "rb") as infile:
anno = dill.load(infile)
logging.info(f"Loaded {len(anno)} annotations from {in_file}")
k2o = {f.key(): f for f in self.anno}
for k, v in anno.items():
assert k in k2o, f"Could not find target feature for key {k}"
if update:
self.anno[k2o[k]].update(v)
else:
self.anno[k2o[k]] = v
[docs]
def to_gff3(
self,
out_file,
bgzip=True,
feature_types=(
"gene",
"transcript",
"exon",
"intron",
"CDS",
"three_prime_UTR",
"five_prime_UTR",
),
):
"""
Writes a GFF3 file with all features of the configured types.
Parameters
----------
out_file : str
The output file name
bgzip : bool
If true, the output file will be bgzipped and tabixed.
feature_types : tuple
The feature types to be included in the output file.
For the used feature type names, see
@see https://github.com/The-Sequence-Ontology/Specifications/blob/master/gff3.md
Returns
-------
the name of the (bgzipped) output file
Examples
--------
>>> transcriptome = ...
>>> # create a file containing all intron annotations
>>> transcriptome.to_gff3('introns.gff3', feature_types=['intron'])
"""
def write_line(o, feature_type, data_dict, out_stream):
print(
"\t".join(
[
str(x)
for x in [
o.chromosome,
"rnalib",
feature_type,
o.start,
o.end,
to_str(o.score if hasattr(o, "score") else None, na="."),
"." if o.strand is None else o.strand,
to_str(o.phase if hasattr(o, "phase") else None, na="."),
to_str([f"{k}={v}" for k, v in data_dict.items()], sep=";"),
]
]
),
file=out_stream,
)
copied_fields = [x for x in self.copied_fields if x not in ["score", "phase"]]
with (open(out_file, "w") as out):
with self.iterator(feature_types=feature_types) as lit:
for f, dat in lit:
if f.feature_type == "gene":
info = {"ID": f.feature_id, "gene_name": f.gene_name}
else:
info = {"ID": f.feature_id}
if f.parent:
info["Parent"] = f.parent.feature_id
info.update(
{k: getattr(f, k) for k in copied_fields if hasattr(f, k)}
) # add copied fields
write_line(f, f.feature_type, info, out)
if bgzip:
bgzip_and_tabix(out_file)
return out_file + ".gz"
return out_file
[docs]
def to_bed(
self,
out,
region=None,
feature_types=("transcript",),
fun_anno=_transcript_to_bed,
bed_header=None,
disable_progressbar=True,
no_header=False,
):
"""Outputs transcripts of this transcriptome in BED format.
Pass your custom annotation function via fun_anno to output custom BED fields (e.g., color based on
transcript type).
Parameters
----------
out : file-like object
The output file-like object.
region : gi or str
The genomic region to be included in the output file. If None (default), all features will be included.
feature_types : tuple
The feature types to be included in the output file (e.g., feature_types=('transcript',)).
Custom feature types need to be added here if they should be serialized.
Default: ('transcript',) (all transcripts).
fun_anno : function
A function that takes a feature index, a location and a feature as input and returns a tuple of
strings that will be added to the BED output file.
bed_header : dict
A dict containing the BED header fields. If None (default), a default header will be used.
disable_progressbar : bool
If true, the progressbar will be disabled.
no_header : bool
If true, no header will be written to the output file.
Example
-------
>>> transcriptome = ...
>>> transcriptome.to_bed('transcripts.bed')
"""
if bed_header is None:
bed_header = {
"name": self.name,
"description": self.name,
"useScore": 0,
"itemRgb": "On",
}
return self.iterator(region=region, feature_types=feature_types).to_bed(
out, fun_anno, bed_header, disable_progressbar, no_header
)
def __len__(self):
return len(self.anno)
def __repr__(self):
return (
f"{self.name} with {len(self.genes)} genes and {len(self.transcripts)} tx"
+ (" (+seq)" if self.has_seq else "")
+ (" (cached)" if self.cached else "")
)
[docs]
def iterator(self, region=None, feature_types=None):
"""returns a :class:`.TranscriptomeIterator` for iterating over all features of the passed type(s).
If feature_types is None (default), all features will be returned.
"""
return TranscriptomeIterator(self, region=region, feature_types=feature_types)
def __iter__(self):
with self.iterator() as pos_it:
yield from pos_it
[docs]
def get_struct(self):
"""Return a dict mapping feature to child feature types"""
return self._ft2child_ftype
[docs]
@dataclass(frozen=True, repr=False)
class Feature(GI_dataclass):
"""
A (frozen) genomic feature, e.g., a gene, a transcript, an exon, an intron,
a CDS or a three_prime_UTR/five_prime_UTR. Features are themselves containers of sub-features
(e.g., a gene contains transcripts, a transcript contains exons, etc.). Sub-features are stored in
the respective child tuples (e.g., gene.transcript, transcript.exon, etc.).
Features are built by the Transcriptome class and are typically not instantiated directly. They
contain a reference to their parent feature (e.g., a transcript's parent is a gene) and a list of
child features (e.g., a gene's children are transcripts). The parent-child relationships are
defined by the transcriptome implementation and are typically defined by the GFF file used to build
the transcriptome. For example, a GFF file may contain a gene feature with a child transcript feature
and the transcript feature may contain a child exon feature. In this case, the transcriptome
implementation will define the parent-child relationships as follows:
gene.parent = None
gene.children = {'transcript': [transcript1, transcript2, ...]}
transcript.parent = gene
transcript.children = {'exon': [exon1, exon2, ...]}
exon.parent = transcript
exon.children = {}
Features will also contain a dynamically created dataclass (see create_sub_class()) that contains
annotations parsed from the GFF file. For example, a gene feature may contain a 'gene_name' annotation
that was parsed from the GFF file. Annotations are stored in the feature's anno dict and can be accessed
via <feature>.<annotation> (e.g., gene.gene_name). Annotations are typically strings but can be of any
type.
Equality of features is defined by comparing their genomic coordinates and strand,
as well as their feature_type (e.g., transcript, exon, five_prime_UTR) and feature_id (which should be unique
within a transcriptome), as returned by the key() method.
Features are typically associated with the `Transcriptome` object used to create them and (mutable)
additional annotations are stored in the respective transcriptome's `anno` dict can be directly accessed via
<feature>.<annotation>. This hybrid approach allows efficient and transparent access to readonly (immutable)
annotations parsed from the GFF file as well as to mutable annotations that are added later (e.g, using the
Transcriptome.annotate() function).
This also includes dynamically calculated (derived) annotations such as feature sequences that are sliced from
a feature's predecessor via the get_sequence() method. Rnalib stores sequences only at the gene level and
slices the subsequences of child features from those to minimize storage requirements.
For example, if the sequence of an exon is requested via `exon.sequence` then the Feature implementation will
search for a 'sequence' annotation in the exons super-features by recursively traversing 'parent' relationships.
The exon sequence will then be sliced form this sequence by comparing the respective genomic coordinates (which
works only if parent intervals always envelop their children as asserted by the transcriptome implementation).
This mechanism can also be used for any other sliceable annotation (e.g., a numpy array) that is stored in the
transcriptome's anno dict. To access such annotations, use the get(..., slice_from_parent=True) method (e.g.,
`exon.get('my_array', slice_from_parent=True)`). This will recursively traverse the parent/child
relationships until the my_array annotation is found and then slice the respective section from there (assuming
that array indices correspond to genomic coordinates).
Subfeatures of a feature can be accessed via the features() method which returns a generator over all such
features that is unsorted by default. To iterate over features in a sorted order, consider using the
TranscriptomeIterator class.
To inspect a features methods and attributes, use vars(feature), dir(feature) or help(feature).
"""
transcriptome: Transcriptome = None # parent transcriptome
feature_id: str = None # unique feature id
feature_type: str = None # a feature type (e.g., exon, intron, etc.)
parent: object = field(
default=None, hash=False, compare=False
) # an optional parent
subfeature_types: tuple = tuple() # sub-feature types
def __repr__(self) -> str:
return f"{self.feature_type}@{self.chromosome}:{self.start}-{self.end}"
[docs]
def key(self) -> tuple:
"""Returns a tuple containing feature_id, feature_type and genomic coordinates including strand"""
return (
self.feature_id,
self.feature_type,
self.chromosome,
self.start,
self.end,
self.strand,
)
def __eq__(self, other):
"""Compares two features by key."""
# if issubclass(other.__class__, Feature): # we cannot check for subclass as pickle/unpickle by ref will
# result in different parent classes
if (other is None) or (
not hasattr(other, 'key')
): # should we check whether key is callable?
return False
return self.key() == other.key()
def __getattr__(self, attr):
if attr == "location":
return self.get_location()
elif attr == "rnk":
return self.get_rnk()
elif self.transcriptome: # get value from transcriptome anno dict
if attr == "sequence":
return self.transcriptome.get_sequence(self)
elif attr == "spliced_sequence":
return self.transcriptome.get_sequence(self, mode="spliced")
elif attr == "rna_sequence":
return self.transcriptome.get_sequence(self, mode="rna")
elif attr == "translated_sequence":
return self.transcriptome.get_sequence(self, mode="translated")
if attr in self.transcriptome.anno[self]:
return self.transcriptome.anno[self][attr]
if self.parent and self.parent.feature_type == attr: # e.g., tx.gene # noqa
return self.parent
raise AttributeError(
f"{self.feature_type} has no attribute/magic function {attr}"
)
[docs]
def get(self, attr, default_value=None, slice_from_parent=False):
"""Safe getter supporting default value and slice-from-parent"""
if slice_from_parent and (self.transcriptome is not None):
return self.transcriptome.slice_from_parent(
self, attr, default_value=default_value
)
else:
return getattr(self, attr, default_value)
[docs]
@classmethod
def from_gi(
cls,
loc,
):
"""Init from gi"""
return cls(loc.chromosome, loc.start, loc.end, loc.strand)
[docs]
def get_location(self):
"""Returns a genomic interval representing the genomic location of this feature."""
return gi(self.chromosome, self.start, self.end, self.strand)
[docs]
def get_rnk(self):
"""Rank (1-based index) of feature in this feature's parent children list"""
if not self.parent:
return None
return self.parent.__dict__[self.feature_type].index(self) + 1
[docs]
def features(self, feature_types=None):
"""Yields all sub-features (not sorted).
To get a coordinate-sorted iterator, use sorted(feature.features()). Sorting by chromosome
is not required as subfeatures are enveloped by their parents by convention.
"""
for ft in self.subfeature_types:
for f in self.__dict__[ft]:
if (not feature_types) or (f.feature_type in feature_types):
yield f
for sf in f.features(): # recursion
if (not feature_types) or (sf.feature_type in feature_types):
yield sf
# dynamic feature class creation
[docs]
@classmethod
def create_sub_class(
cls, feature_type, annotations: dict = None, child_feature_types: list = None
):
"""Create a subclass of feature with additional fields (as defined in the annotations dict)
and child tuples
"""
assert feature_type is not None, (
f"feature_type must not be None: {cls}, {feature_type}, {annotations}, "
f"{child_feature_types}"
)
fields = [
("feature_id", str, field(default=None)),
("feature_type", str, field(default=feature_type)),
]
fields += [
(k, v, field(default=None))
for k, v in annotations.items()
if k not in ["feature_id", "feature_type"]
]
if child_feature_types is not None:
fields += [
(k, tuple, field(default=tuple(), hash=False, compare=False))
for k in child_feature_types
]
sub_class = make_dataclass(
feature_type, fields=fields, bases=(cls,), frozen=True, repr=False, eq=False
)
return sub_class
[docs]
def get_rel_coordinates(self, subfeature, query, strand_specific=False):
"""Returns a GI with relative coordinates of the passed query in this feature"""
return rna.get_rel_coord(self, subfeature, query, strand_specific)
[docs]
def get_protein_sequence(self, query=None):
"""Returns the biotite ProteinSequence object describing the amino-acid sequence of this feature (if it is a
transcript, has a CDS and has a sequence). Otherwise, None is returned.
"""
if (self.feature_type != "transcript") or ("CDS" not in self.subfeature_types):
return None
tseq = self.translated_sequence
if tseq is None:
return None
aa_seq = seq.NucleotideSequence(tseq).translate(complete=True)
if query is not None:
offsets = self.get_rel_coordinates("CDS", query)
if offsets is None:
return None
aa_seq = aa_seq[(offsets.start-1)//3: (offsets.end-1)//3+1]
return aa_seq
class _Feature:
"""
A mutable genomic (annotation) feature that is used only for building a transcriptome.
"""
def __init__(
self,
transcriptome,
feature_type,
feature_id,
loc=None,
parent=None,
children=None,
):
self.transcriptome = transcriptome
self.loc = loc
self.feature_type = feature_type
self.feature_id = feature_id
self.parent = parent
self.children = {} if children is None else children
self.anno = {}
assert loc is not None
def get_anno_rec(self):
"""compiles a dict containing all annotations of this feature and all its children per feature_type"""
a = {self.feature_type: {k: type(v) for k, v in self.anno.items()}}
t = {self.feature_type: set()}
s = {self.feature_type}
if self.children:
for cat in self.children:
t[self.feature_type].add(cat)
s.add(cat)
for c in self.children[cat]:
x, y, z = c.get_anno_rec()
a.update(x)
t.update(y)
s.update(z)
return a, t, s
def set_location(self, loc):
self.loc = loc
# def __repr__(self):
# return feature"{self.feature_type}@{super().__repr__()} ({ {k: len(v) for k, v in self.children.items()}
# if self.children else 'NA'})"
def freeze(self, ft2class):
"""Create a frozen instance (recursively)"""
f = ft2class[self.feature_type].from_gi(self.loc)
object.__setattr__(f, "transcriptome", self.transcriptome)
object.__setattr__(f, "feature_id", self.feature_id)
for k, v in self.anno.items():
object.__setattr__(f, k, v)
if self.children:
object.__setattr__(f, "subfeature_types", tuple(self.children))
for k in self.children:
children = [x.freeze(ft2class) for x in self.children[k]]
if self.loc.strand == "-": # reverse order if on neg strand
children = list(reversed(children))
for c in children:
object.__setattr__(c, "parent", f)
object.__setattr__(f, k, tuple(children))
return f
[docs]
class AbstractFeatureFilter(ABC):
"""For filtering genes/transcripts entries from a GFF when building a transcriptome."""
[docs]
@abstractmethod
def filter(self, loc, info): # -> bool, str:
"""Returns true if the passed feature should be filtered, false otherwise.
The filter can also return a filter message that will be added to the log.
"""
pass
[docs]
def get_chromosomes(self): # -> Set[str]:
"""Returns the set of chromosomes to be included or None if all (no filtering applied)."""
return None
[docs]
class TranscriptFilter(AbstractFeatureFilter):
"""A transcript filter that can be used to filter genes/transcripts by location and/or feature type
specific info fields. It can be configured by chaining include_* methods or by passing a dict to the
constructor. Features will first be filtered by location, then by included and then by excluded feature type
specific info fields.
For location filtering, the following rules apply:
- if included_chrom is not None, only transcripts on these chromosomes will be included
- if excluded_chrom is not None, transcripts on these chromosomes will be excluded
- if included_regions is not None, only transcripts overlapping these regions will be included
- if excluded_regions is not None, transcripts overlapping these regions will be excluded
For feature type specific filtering, the following rules apply:
- in the feature is a transcript and included_tids is not None, it will be filtered unless its feature_id is in
included_tids
- if included is not None, only transcripts with the specified feature type specific info fields
will be included. Info fields are parsed to sets of values by splitting on `','`.
if the feature type specific info field is not present, the feature will be
filtered unless None is added to the included list.
- if excluded is not None, transcripts with the specified feature type specific info fields
will be excluded. Info fields are parsed to sets of values by splitting on `','`.
Examples
--------
>>> # create a filter that includes only protein coding genes
>>> tf1 = TranscriptFilter().include_gene_types({'protein_coding'})
>>> # create a filter that includes only protein coding genes and transcripts on chr1
>>> tf2 = TranscriptFilter().include_gene_types({'protein_coding'}).include_chromosomes({'chr1'})
>>> # create a filter that includes only canonical genes and transcripts in a given region.
>>> # Features without tags will be included
>>> tf3 = TranscriptFilter().include_tags({'Ensembl_canonical', None}).include_regions({'chr1:1-10000'})
>>> # build only for given set of transcript ids
>>> tf4 = TranscriptFilter().include_transcript_ids({'ENST00000367770', 'ENST00000367771'})
>>> # create a filtered Transcriptome
>>> config = { ... }
>>> t = Transcriptome(config, feature_filter=tf4)
Notes
-----
TODO
* add greater, smaller than filters for feature type specific info fields
* add gene name filtering
* redesign?
"""
def __init__(self, config=None):
self.config = AutoDict() if config is None else config
self.included_ftypes = get_config(
self.config, ["location", "included", "feature_types"], default_value=None
)
self.excluded_ftypes = get_config(
self.config, ["location", "excluded", "feature_types"], default_value=None
)
self.included_chrom = get_config(
self.config, ["location", "included", "chromosomes"], default_value=None
)
self.excluded_chrom = get_config(
self.config, ["location", "excluded", "chromosomes"], default_value=None
)
self.included_regions = get_config(
self.config, ["location", "included", "regions"], default_value=None
)
self.excluded_regions = get_config(
self.config, ["location", "excluded", "regions"], default_value=None
)
self.included_tids = get_config(
self.config, ["location", "included", "transcript_ids"], default_value=None
)
if self.included_ftypes is not None:
self.included_ftypes = set(self.included_ftypes)
if self.excluded_ftypes is not None:
self.excluded_ftypes = set(self.excluded_ftypes)
if self.included_chrom is not None:
self.included_chrom = set(self.included_chrom)
if self.excluded_chrom is not None:
self.excluded_chrom = set(self.excluded_chrom)
if self.included_regions is not None:
self.included_regions = {GI.from_str(s) for s in self.included_regions}
if self.excluded_regions is not None:
self.excluded_regions = {GI.from_str(s) for s in self.excluded_regions}
if self.included_tids is not None:
self.included_tids = set(self.included_tids)
[docs]
def filter(self, loc, info, feature_type=None):
if feature_type is None:
feature_type = info.get("feature_type", None)
# location filtering
if (
self.included_chrom is not None
and loc.chromosome not in self.included_chrom
):
return True, "included_chromosome"
if self.excluded_chrom is not None and loc.chromosome in self.excluded_chrom:
return True, "excluded_chromosome"
if self.included_regions is not None and not any(
loc.overlaps(r) for r in self.included_regions
):
return True, "included_location"
if self.excluded_regions is not None and any(
loc.overlaps(r) for r in self.excluded_regions
):
return True, "excluded_region"
# feature type specific info field filtering
if (self.excluded_ftypes is not None) or (self.included_ftypes is not None):
if feature_type in self.excluded_ftypes:
return True, "excluded_ftypes"
if feature_type not in self.included_ftypes:
return True, "included_ftypes"
# filter for ids
if feature_type == "transcript" and (self.included_tids is not None):
if info.get("transcript_id", "") not in self.included_tids:
return True, "not_in_id_list"
included = get_config(
self.config, [info["feature_type"], "included"], default_value=None
)
if included is not None:
for info_field in included:
if info_field not in info:
if None not in included[info_field]:
return True, f"missing_{info_field}"
else:
found_values = set(info[info_field].split(","))
n_found = len(set(included[info_field]) & found_values)
if n_found == 0: # values not found
return True, f"missing_{info_field}_value"
excluded = get_config(
self.config, [info["feature_type"], "excluded"], default_value=None
)
if excluded is not None:
for info_field in excluded:
if info_field not in info:
return (
False,
f"found_{info_field}",
) # excluded does not support None
found_values = set(info[info_field].split(","))
n_found = len(set(excluded[info_field]) & found_values)
if n_found > 0: # values found
return True, f"found_{info_field}_value"
return False, "passed"
[docs]
def get_chromosomes(self):
"""Returns the set of chromosomes to be included"""
if self.included_chrom is None:
return None
return self.included_chrom.difference(
{} if self.excluded_chrom is None else self.excluded_chrom
)
def __repr__(self):
"""Returns a string representation of the filter"""
return json.dumps(self.config, indent=4)
[docs]
def include_feature_types(self, ftypes: set):
"""Convenience method to add ftypes to the included_ftypes set"""
ftypes = to_set(ftypes)
if self.included_ftypes is None:
self.included_ftypes = ftypes
else:
self.included_ftypes.update(ftypes)
self.config["location"]["included"]["feature_types"] = list(
self.included_ftypes
)
return self
[docs]
def include_chromosomes(self, chromosomes: set):
"""Convenience method to add chromosomes to the included_chrom set"""
chromosomes = to_set(chromosomes)
if self.included_chrom is None:
self.included_chrom = chromosomes
else:
self.included_chrom.update(chromosomes)
self.config["location"]["included"]["chromosomes"] = list(self.included_chrom)
return self
[docs]
def include_transcript_ids(self, tids: set):
"""Convenience method to add transcript ids to the included_tids set"""
tids = to_set(tids)
if self.included_tids is None:
self.included_tids = tids
else:
self.included_tids.update(tids)
self.config["location"]["included"]["transcript_ids"] = list(self.included_tids)
return self
[docs]
def include_regions(self, regions: set):
"""Convenience method to add regions to the included_regions set"""
if isinstance(regions, str):
regions = {gi(r) for r in regions.split(",")}
if self.included_regions is None:
self.included_regions = regions
else:
self.included_regions.update(regions)
self.config["location"]["included"]["regions"] = list(self.included_regions)
return self
[docs]
def include_gene_ids(self, ids: set):
"""Convenience method to add included gene ids"""
self.config["gene"]["included"]["gene_id"] = to_set(ids)
return self
[docs]
def include_gene_types(
self, gene_types: set, include_missing=True, feature_type="gene"
):
"""Convenience method to add included gene_types to gene+transcript inclusion rules. Use, e.g.,
{'protein_coding'} to load only protein coding genes. If include_missing is True then genes/transcripts
without gene_type will also be included."""
gene_types = to_set(gene_types)
self.config[feature_type]["included"]["gene_type"] = (
list(gene_types) + [None] if include_missing else list(gene_types)
)
return self
[docs]
def include_transcript_types(
self, transcript_types: set, include_missing=True, feature_type="transcript"
):
"""Convenience method to add included transcript_types. Use, e.g., {'miRNA'} to load only
miRNA transcripts. If include_missing is True (default) then transcripts without transcript_type will
also be included."""
transcript_types = to_set(transcript_types)
self.config[feature_type]["included"]["transcript_type"] = (
list(transcript_types) + [None]
if include_missing
else list(transcript_types)
)
return self
[docs]
class RefDict(abc.Mapping[str, int]):
"""
Named mapping for representing a set of references (contigs) and their lengths.
Supports aliasing by passing a function (e.g., fun_alias=toggle_chr which will add/remove 'chr' prefixes) to
easily integrate genomic files that use different (but compatible) reference names. If an aliasing function is
passed, original reference names are accessible via the orig property. An aliasing function must be reversible,
i.e., fun_alias(fun_alias(str))==str and support None.
Note that two reference dicts match if their (aliased) contig dicts match (name of RefDict is not
compared).
"""
def __init__(self, d, name=None, fun_alias=None):
self.d = d
self.name = name
self.fun_alias = fun_alias
if fun_alias is not None:
self.orig = d.copy()
self.d = {fun_alias(k): v for k, v in d.items()} # apply fun to keys
else:
self.orig = self
def __getitem__(self, key):
return self.d[key]
def __len__(self):
return len(self.d)
def __iter__(self):
return iter(self.d)
[docs]
def has_len(self, chrom=None):
"""Returns true if the passed chromosome (or all if chrom is None) has an assigned length, false otherwise."""
if chrom is None:
return not all(v is None for v in self.values())
return self.d[chrom] is not None
[docs]
def set_len(self, chrom: str = None, length: int = None):
"""
Sets the length of the passed chromosome.
If chrom is None, all chromosomes will be set to the passed length.
Use set_len() to unset the length of all chromosomes.
Parameters
----------
chrom : str
The chromosome name. If None, all chromosomes will be set to the passed length.
length : int
The chromosome length or None if not set
"""
if chrom is None:
for c in self.d:
self.d[c] = length
else:
self.d[chrom] = length
[docs]
def tile(self, tile_size=int(1e6)):
"""
Iterates in an ordered fashion over the reference dict, yielding non-overlapping genomic intervals of the
given tile_size (or smaller at chromosome ends).
"""
for chrom, chrlen in self.d.items():
chrom_gi = gi(chrom, 1, chrlen) # will use maxint if chrlen is None!
for tile in chrom_gi.split_by_maxwidth(tile_size):
yield tile
def __repr__(self):
return (
f"RefDict (size: {len(self.d.keys())}): {self.d.keys()}"
f"{f' (aliased from {self.orig.keys()})' if self.fun_alias else ''}, {self.d.values()} name:"
f" {self.name} "
)
[docs]
def chromosomes(self):
"""Returns a list of chromosome names"""
return list(self.d.keys())
[docs]
def chromosomes_orig(self):
"""Returns a list of chromosome names"""
return list(self.orig.keys())
[docs]
def alias(self, chrom):
if self.fun_alias:
return self.fun_alias(chrom)
return chrom
[docs]
def index(self, chrom):
"""Index of the passed chromosome, None if chromosome not in refdict or -1 if None was passed.
Useful, e.g., for sorting genomic coordinates
"""
if not chrom:
return -1
try:
return list(self.keys()).index(chrom)
except ValueError:
warnings.warn(f"{chrom} not in refdict")
return None
[docs]
@staticmethod
def merge_and_validate(*refdicts, check_order=False, included_chrom=None):
"""
Checks whether the passed reference sets are compatible and returns the
merged reference set containing the intersection of common references.
Parameters
----------
refdicts:
list of RefDicts
check_order:
if True, the order of the common references is asserted. default: False
included_chrom:
if passed, only the passed chromosomes are considered. default: None
Returns
-------
RefDict containing the intersection of common references
"""
refdicts = [r for r in refdicts if r is not None]
if len(refdicts) == 0:
return None
# intersect all contig lists while preserving order (set.intersection() or np.intersect1d() do not work!)
shared_ref = {
k: None
for k in intersect_lists(
*[list(r.keys()) for r in refdicts], check_order=check_order
)
if (included_chrom is None) or (k in included_chrom)
}
# check whether contig lengths match
for r in refdicts:
for contig, oldlen in shared_ref.items():
newlen = r.get(contig)
if newlen is None:
continue
if oldlen is None:
shared_ref[contig] = newlen
else:
assert oldlen == newlen, (
f"Incompatible lengths for contig ({oldlen}!={newlen}) when comparing "
f"RefDicts {refdicts}"
)
return RefDict(
shared_ref,
name=",".join(
[r.name if r.name else "<unnamed refdict>" for r in refdicts]
),
fun_alias=None,
)
[docs]
@staticmethod
def load(fh, fun_alias=None, calc_chromlen=False):
"""Extracts chromosome names, order and (where possible) length from pysam objects.
Parameters
----------
fh : pysam object or file path (str)
fun_alias : aliasing functions (see RefDict)
calc_chromlen : bool, if True, chromosome lengths will be calculated from the file (if required)
Returns
-------
dict: chromosome name to length
Raises
------
NotImplementedError
if input type is not supported yet
"""
was_opened = False
try:
if isinstance(fh, str):
fh = open_file_obj(fh)
was_opened = True
if isinstance(fh, pysam.Fastafile): # @UndefinedVariable
return RefDict(
{c: fh.get_reference_length(c) for c in fh.references},
name=f"References from FASTA file {fh.filename}",
fun_alias=fun_alias,
)
elif isinstance(fh, pysam.AlignmentFile): # @UndefinedVariable
return RefDict(
{c: fh.header.get_reference_length(c) for c in fh.references},
name=f"References from SAM/BAM file {fh.filename}",
fun_alias=fun_alias,
)
elif isinstance(fh, pysam.TabixFile): # @UndefinedVariable
if calc_chromlen: # no ref length info in tabix, we need to iterate :-(
refdict = {}
for c in fh.contigs:
line = ("", "", "0") # default
for _, line in enumerate(fh.fetch(c)):
pass # move to last line
refdict[c] = int(line.split("\t")[2])
else:
refdict = {
c: None for c in fh.contigs
} # no ref length info in tabix...
return RefDict(
refdict,
name=f"References from TABIX file {fh.filename}",
fun_alias=fun_alias,
)
elif isinstance(fh, pysam.VariantFile): # @UndefinedVariable
return RefDict(
{c: fh.header.contigs.get(c).length for c in fh.header.contigs},
name=f"References from VCF file {fh.filename}",
fun_alias=fun_alias,
)
elif isinstance(fh, pyBigWig.pyBigWig): # @UndefinedVariable
return RefDict(
{c: l for c, l in fh.chroms().items()},
name=f"References from bigWig/bigBed file {fh}",
fun_alias=fun_alias,
)
else:
raise NotImplementedError(f"Unknown input object type {type(fh)}")
finally:
if was_opened:
fh.close()
# class Item(namedtuple('Item',['location','data'])):
# """ A location, data tuple, returned by an LocationIterator """
#
# def __len__(self):
# """ Reports the length of the wrapped location, not the current tuple"""
# return self.location.__len__()
[docs]
class Item(NamedTuple):
"""A location, data tuple, returned by an LocationIterator"""
location: gi
data: Any
def __len__(self):
"""Reports the length of the wrapped location, not the current tuple"""
return self.location.__len__()
[docs]
class LocationIterator:
"""
Superclass for genomic iterators (mostly based on pysam) for efficient, indexed iteration over genomic
datasets. Most rnalib iterables inherit from this superclass.
A LocationIterator iterates over a genomic dataset and yields `Item`'s, that are tuples of genomic intervals and
associated data. The data can be any object (e.g., a string, a dict, a list, etc.). Location iterators can be
restricted to a genomic region.
LocationIterators keep track of their current genomic location (self.location) as well as statistics about
the iterated and yielded items (stats()). They can be consumed and converted to lists (self.to_list()),
pandas dataframes (self.to_dataframe()), interval trees (self.to_intervaltrees()) or bed files (self.to_bed()).
LocationIterators can be grouped or tiled, see the group() and tile() methods.
LocationIterators are iterable, implement the context manager protocol ('with' statement) and provide a
standard interface for region-type filtering (by passing chromosome sets or genomic regions of
interest). Where feasible and not supported by the underlying (pysam) implementation, they implement chunked
I/O where for efficient iteration (e.g., FastaIterator).
The maximum number of iterated items can be queried via the max_items() method which tries to guesstimate this
number (or an upper bound) from the underlying index data structures.
Examples
--------
>>> # Creates a LocationIterator via factory method and consume its items, splitting into locs and data lists.
>>> locs, data = zip(*it('myfile.bed'))
Attributes
----------
_stats : Counter
A counter that collects statistics (e.g., yielded items per chromosome) for QC and reporting.
location : GI
The current genomic location of this iterator
Parameters
----------
file : str or pysam file object
A file path or an open file object. In the latter case, the object will not be closed
region : GI or str
genomic region to iterate
file_format : str
optional, will be determined from filename if omitted
per_position : bool
optional, if True, the iterator will yield per-position items (e.g., FASTA files)
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
refdict : RefDict
optional, if set, the iterator will use the passed reference dict instead of reading it from the file
Notes
-----
TODOs:
* strand specific iteration
* url streaming
* add is_exhausted flag?
"""
location: GI = None
file: Any = None
refdict: RefDict = None
region: GI = None
chromosomes: list = None
fun_alias: Callable = None
per_position: bool = False
@abstractmethod
def __init__(
self,
file,
region: GI = None,
file_format: str = None,
per_position: bool = False,
fun_alias: Callable = None,
refdict: RefDict = None,
):
self._stats = Counter() # counter for collecting stats
self.location = None
self.per_position = per_position
if isinstance(file, (str, PathLike)):
self.file = open_file_obj(file, file_format=file_format) # open new object
self._was_opened = True
else:
self.file = file
self._was_opened = False
self.fun_alias = fun_alias
# use custom refdict or load from file if possible
if refdict is None:
assert file is not None, "No file or refdict passed"
self.refdict = RefDict.load(self.file, fun_alias=fun_alias)
else:
self.refdict = refdict
# region is a gi object. List of iterated chromosomes is set in self.chromosomes
self.set_region(region)
@property
def stats(self):
"""Returns the collected stats"""
return self._stats
[docs]
def set_region(self, region):
"""Update the iterated region of this iterator.
Note that the region's chromosome must be in this anno_its refdict (if any)
"""
if region is not None:
self.region = gi(region) if isinstance(region, str) else region
if (self.refdict is not None) and self.region.chromosome is not None:
assert self.region.chromosome in self.refdict, (
f"{self.region.chromosome} not found in references"
f" {self.refdict}"
)
# all chroms in correct order, as seen in the datafile (not aliased)
self.chromosomes = (
[self.refdict.alias(self.region.chromosome)]
if self.region.chromosome is not None
else (self.refdict.orig.keys())
)
else:
self.region = gi()
self.chromosomes = self.refdict.orig.keys()
[docs]
def to_list(self, style="item"):
"""Convenience method that consumes iterator and returns a list of items, locations or data.
Parameters
----------
style : str
'item' (default): all items with be returned. Same as list(iterator).
'location': only the locations of the items will be returned
'data': only the data of the items will be returned
Examples
--------
>>> it('myfile.bed').to_list()
"""
if style == "location":
return [x.location for x in self]
elif style == "data":
return [x.data for x in self]
return list(self)
[docs]
def to_bed(
self,
out,
fun_anno=lambda idx, loc, item: (
f"item{idx}",
".",
loc.start - 1,
loc.end,
"0,0,0",
1,
len(loc),
0,
),
bed_header=None,
disable_progressbar=True,
no_header=False,
n_col=12,
):
"""Consumes iterator and returns results in BED format to the passed output stream.
Parameters
----------
out : file-like object
The output file-like object. If None, then the results will be returned as a list.
fun_anno : function
A function that takes a feature index, a location and a feature as input and returns a tuple of
strings that will be added to the BED output file.
bed_header : dict
A dict containing the BED header fields. If None (default), a default header will be used.
disable_progressbar : bool
If true, the progressbar will be disabled.
no_header : bool
If true, no header will be written to the output file.
n_col : int
The number of columns to be written to the output file. Default: 12
Examples
--------
>>> out_file = ...
>>> with open_file_obj(out_file, 'wt') as wout:
>>> with LocationIterator(...) as lit: # use respective subclass here
>>> lit.to_bed(wout)
>>> bgzip_and_tabix(out_file)
"""
if not no_header and out is not None:
if bed_header is None:
bed_header = {"visibility": 1, "itemRgb": "On", "useScore": 1}
print(
f"track {' '.join([f'{x}={bed_header[x]}' for x in bed_header])}",
file=out,
)
ret = list()
for idx, (loc, item) in tqdm(
enumerate(self), desc=f"Writing bed file", disable=disable_progressbar
):
(
name,
score,
thick_start,
thick_end,
rgb,
block_count,
block_sizes,
block_starts,
) = fun_anno(idx, loc, item)
s = (
loc.chromosome,
loc.start - 1,
loc.end,
name,
score,
loc.strand,
thick_start,
thick_end,
rgb,
block_count,
block_sizes,
block_starts,
)[:n_col]
if out is None:
ret.append(s)
else:
print(to_str(s, sep="\t", na="."), file=out)
if out is None:
return ret
return None
[docs]
def to_dataframe(
self,
fun=lambda loc, item, fun_col, default_value: [
str(item)
], # default: convert item to string repr
fun_col=("Value",),
coord_inc=(0, 0),
coord_colnames=("Chromosome", "Start", "End", "Strand"),
excluded_columns=None,
included_columns=None,
dtypes=None,
default_value=None,
max_items=None,
disable_progressbar=True,
):
"""Consumes iterator (up to max_items items) and returns results in a dataframe.
Start/stop Coordinates will be corrected by the passed coord_inc tuple.
Parameters
----------
fun : function
a function that converts the yielded items of this iterator into a tuple of values that represent
fun_col column values in the created dataframe
fun_col : tuple
a tuple of column names for the created dataframe
coord_inc : tuple
a tuple of values to be added to start and end coordinates
coord_colnames : tuple
a tuple of column names for the created dataframe
excluded_columns : tuple
optional, a tuple of column names to be excluded from the created dataframe
included_columns : tuple
optional, a tuple of column names to be included in the created dataframe
dtypes : dict
optional, a dict of column names and their respective dtypes
default_value : any
optional, a default value to be used if a column value is not present in the iterated item
max_items : int
maximum number of included items (None: all)
disable_progressbar : bool
optional, if True, no progressbar will be shown
"""
assert fun is not None
assert (fun_col is not None) and (isinstance(fun_col, tuple))
# exclude/include columns
if excluded_columns is not None:
fun_col = tuple(col for col in fun_col if col not in excluded_columns)
if included_columns is not None:
fun_col += included_columns
if max_items is None: # fast list comprehension
df = pd.DataFrame(
[
[
loc.chromosome,
loc.start,
loc.end,
"." if loc.strand is None else loc.strand,
]
+ fun(loc, item, fun_col, default_value)
for idx, (loc, item) in enumerate(
tqdm(
self,
desc=f"Building dataframe",
disable=disable_progressbar,
)
)
],
columns=coord_colnames + fun_col,
)
else: # construct (small) list and then build df
lst = list()
for idx, (loc, item) in enumerate(
tqdm(self, desc=f"Building dataframe", disable=disable_progressbar)
):
if idx >= max_items:
break
lst.append(
[
loc.chromosome,
loc.start,
loc.end,
"." if loc.strand is None else loc.strand,
]
+ fun(loc, item, fun_col, None)
)
df = pd.DataFrame(lst, columns=coord_colnames + fun_col)
if dtypes is not None:
for k, v in dtypes.items():
df[k] = df[k].astype(v)
# Correct coordinates if required
for inc, col in zip(coord_inc, [coord_colnames[1], coord_colnames[2]]):
if inc != 0:
df[col] += inc
return df
[docs]
def describe(
self,
fun=lambda loc, item, fun_col, default_value: [
str(item)
], # default: convert item to string repr
fun_col=("Value",),
coord_inc=(0, 0),
coord_colnames=("Chromosome", "Start", "End", "Strand"),
excluded_columns=None,
included_columns=None,
dtypes=None,
default_value=None,
max_items=None,
disable_progressbar=True,
) -> Tuple[pd.DataFrame, dict]:
"""Converts this iterator to a pandas dataframe and calls describe(include='all')"""
df = self.to_dataframe(
fun=fun,
fun_col=fun_col,
coord_inc=coord_inc,
coord_colnames=coord_colnames,
excluded_columns=excluded_columns,
included_columns=included_columns,
dtypes=dtypes,
default_value=default_value,
max_items=max_items,
disable_progressbar=disable_progressbar,
)
# calculate overlap with bioframe and check for empty intervals (end<start)?
return (
df.describe(include="all"),
{
"contains_overlapping": len(df.index)
> len(bioframe.merge(df, cols=("Chromosome", "Start", "End")).index),
"contains_empty": sum((df["End"] - df["Start"]) < 0) > 0,
},
)
[docs]
def to_intervaltrees(self, disable_progressbar=False):
"""Consumes iterator and returns results in a dict of intervaltrees.
.. warning::
NOTE that this will silently drop empty intervals
Notes
-----
* TODO
improve performance with from_tuples method
"""
chr2itree = {} # a dict mapping chromosome ids to annotation interval trees.
for loc, item in tqdm(
self, desc=f"Building interval trees", disable=disable_progressbar
):
if loc.is_empty():
continue
if loc.chromosome not in chr2itree:
chr2itree[loc.chromosome] = IntervalTree()
# add 1 to end coordinate, see itree conventions @ https://github.com/chaimleib/intervaltree
chr2itree[loc.chromosome].addi(loc.start, loc.end + 1, item)
return chr2itree
[docs]
def merge(
self,
iterables: Iterable['LocationIterator'],
labels: Iterable[str] = None,
refdict: RefDict = None,
):
"""Merges this iterator with the passed iterables and returns a new iterator.
Parameters
----------
iterables : iterable
an iterable of LocationIterators to be merged with this iterator
labels : iterable
an iterable of labels for the passed iterables. If None, f"it{idx}" will be used.
refdict : RefDict
optional, if set, this refdict will be used for the merged iterator, otherwise a merged refdict will be used
"""
if not isinstance(iterables, (list, tuple)):
iterables = [self] + [iterables]
else:
iterables = [self] + list(iterables)
return MergedLocationIterator(iterables, labels=labels, refdict=refdict)
[docs]
def group(self, strategy="both"):
"""Wraps this iterator in a GroupedLocationIterator"""
return GroupedLocationIterator(self, strategy=strategy)
[docs]
def tile(self, regions_iterable: Iterable[GI] = None, tile_size=1e8):
"""Wraps this iterator in a TiledIterator.
Parameters
----------
regions_iterable : iterable
an iterable of genomic regions (GI) that defines the tiles. Will be calculated from the refdict if None.
tile_size : int
the size of the tiles
"""
return TiledIterator(
location_iterator=self,
regions_iterable=regions_iterable,
tile_size=tile_size,
)
[docs]
@abstractmethod
def max_items(self):
"""Maximum number of items yielded by this iterator or None if unknown.
Note that this is the upper bound of yielded items but less (or even no) items may be yielded
based on filter settings, etc. Useful, e.g., for progressbars or time estimates
"""
return None
@abstractmethod
def __iter__(self) -> Item:
yield None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs]
def close(self):
"""Closes the underlying file object if it was opened by this iterator"""
if self.file and self._was_opened:
# logging.debug(feature"Closing iterator {self}")
self.file.close()
[docs]
class MemoryIterator(LocationIterator):
"""
A location iterator that iterates over intervals retrieved from one of the following datastructures:
* {str:gi} dict: yields (gi,str); note that the passed strings must be unique
* {gi:any} dict: yields (gi,any)
* iterable of gis: yields (gi, index in input iterable). If a single GI object is passed, its contained
positions are iterated
Regions will be sorted according to a refdict that is created from the data automatically.
For associative datastructures (dicts), the key or value that is not the GI will be yielded as data item.
For others, the index in the sorted iterable will be yielded as data item.
Notes
-----
* reported stats:
iterated_items, chromosome: (int, str)
Number of iterated items
yielded_items, chromosome: (int, str)
Number of yielded items (remaining are filtered, e.g., due to region constraints)
"""
# d = { gi(1, 10,11): 'a', gi(1, 1,10): 'b', gi(2, 1,10): 'c' }
def __init__(self, d, region=None, fun_alias=None):
if isinstance(d, dict):
if len(d) > 0 and isinstance(next(iter(d.values())), str): # {gi:str}
d = {name: loc for loc, name in d.items()}
else:
# treat as iterable of GIs. If f is a GI, this will iterate over all contained positions
d = {name: loc for name, loc in enumerate(d)}
if fun_alias is not None: # chrom aliasing
d = {
name: gi(fun_alias(loc.chromosome), loc.start, loc.end, loc.strand)
for name, loc in d.items()
}
self._maxitems = len(d)
# get list of chromosomes
chromosomes = SortedSet([loc.chromosome for loc in d.values()])
self.data = {c: dict() for c in chromosomes} # split by chrom and sort
for name, loc in d.items():
self.data[loc.chromosome][name] = loc
# create refdict
self.refdict = RefDict(
{c: max(self.data[c].values()).end for c in self.data.keys()}
)
# call super constructor
super().__init__(
self.data,
region=region,
file_format=None,
per_position=False,
fun_alias=None, # already applied! don't set again
refdict=self.refdict,
)
[docs]
def to_bed(
self,
out,
fun_anno=lambda idx, loc, item: (
f"{item}",
".",
loc.start - 1,
loc.end,
"0,0,0",
1,
len(loc),
0,
),
bed_header=None,
disable_progressbar=True,
no_header=False,
n_col=4,
):
return super().to_bed(
out, fun_anno, bed_header, disable_progressbar, no_header, n_col
)
def __iter__(self) -> Item[gi, object]:
for chromosome in self.chromosomes:
for name, self.location in dict(
sorted(self.data[chromosome].items(), key=lambda item: item[1])
).items():
self._stats["iterated_items", chromosome] += 1
if self.region is None or self.region.overlaps(self.location):
self._stats["yielded_items", chromosome] += 1
yield Item(self.location, name)
[docs]
def max_items(self):
return self._maxitems
[docs]
class TranscriptomeIterator(LocationIterator):
"""
Iterates over (sorted) features in a transcriptome object. Yielded items contain the respective feature
(location) and its transcriptome annotation (data).
Note that no chromosome aliasing is possible with this iterator as returned features are immutable.
The transcriptome features will be filtered by feature_type and sorted if the transcriptome object
is not sorted (i.e., if features were added after it was built from a (sorted) GFF3 file).
A transcriptome iterator can be converted to a pandas dataframe via the to_dataframe() method
which consumes the remaining items. Column names of the dataframe are automatically estimated
from the transcriptome model. Data columns may be excluded via the 'excluded_columns' parameter;
by default the reserved 'dna_seq' field is excluded as this field may contain very long sequence strings.
The default conversion function (fun) extracts values from the iterated location objects (=feature).
To add custom columns, one needs to provide (i) a list of column names via the 'included_columns' parameter and
(ii) a custom extraction methof (fun) that return the respective values. Example:
Examples
--------
>>> # advanced usage: convert to pandas dataframe with a custom conversion function
>>> # here we add a 'feature length' column
>>> def my_fun(tx, item, fun_col, default_value):
>>> return [len(tx) if col == 'feature_len' else tx.get(col, default_value) for col in fun_col] # noqa
>>> t = Transcriptome(...)
>>> TranscriptomeIterator(t).to_dataframe(fun=my_fun, included_annotations=['feature_len']).head()
Parameters
----------
transcriptome : Transcriptome
The transcriptome object to iterate over
region : gi or str
optional, if set, only features overlapping with this region will be iterated
feature_types : list
optional, if set, only features of these types will be iterated
features : list
optional, if set, only these features will be iterated
Notes
-----
* reported stats:
iterated_items, chromosome: (int, str)
Number of iterated items
yielded_items, chromosome: (int, str)
Number of yielded items (remaining are filtered, e.g., due to region constraints)
"""
def __init__(self, transcriptome, region=None, feature_types=None, features=None):
super().__init__(None, region=region, refdict=transcriptome.merged_refdict)
self.t = transcriptome
self.feature_types = feature_types
self.features = (
None if features is None else to_set(features)
) # convert to set to speed-up contains() checks
self.is_sorted = self.t.is_sorted
[docs]
def max_items(self):
return len(self.t.anno)
[docs]
def to_dataframe(
self,
fun=lambda loc, item, fun_col, default_value: [
loc.get(col, default_value) for col in fun_col
],
fun_col=None,
coord_inc=(0, 0),
coord_colnames=("Chromosome", "Start", "End", "Strand"),
excluded_columns=("dna_seq",),
included_columns=None,
dtypes=None,
default_value=None,
max_items=None,
disable_progressbar=True,
):
"""Consumes iterator and returns results in a dataframe.
Start/stop Coordinates will be corrected by the passed coord_inc tuple.
fun_col is a tuple of column names for the created dataframe.
fun is a function that converts the yielded items of this iterator into a tuple of
values that represent fun_col column values in the created dataframe.
Example
-------
>>> t = Transcriptome(...)
>>> TranscriptomeIterator(t).to_dataframe().head()
"""
# mandatory fields; we use a dict to keep column order nice
fun_col = {"feature_id": None, "feature_type": None}
# add all annotation keys from the anno dict
fun_col.update(dict.fromkeys(get_unique_keys(self.t.anno), None))
# add annotation fields parsed from GFF
fun_col.update(
dict.fromkeys(get_unique_keys(self.t._ft2anno_class), None)
) # noqa
# preserve order
fun_col = tuple(fun_col.keys())
# call super method
return super().to_dataframe(
fun=fun,
fun_col=fun_col,
coord_inc=coord_inc,
coord_colnames=coord_colnames,
dtypes=dtypes,
excluded_columns=excluded_columns,
included_columns=included_columns,
default_value=default_value,
max_items=max_items,
disable_progressbar=disable_progressbar,
)
[docs]
def describe(
self,
fun=lambda loc, item, fun_col, default_value: [
loc.get(col, default_value) for col in fun_col
],
fun_col=("Value",),
coord_inc=(0, 0),
coord_colnames=("Chromosome", "Start", "End", "Strand"),
excluded_columns=("dna_seq",),
included_columns=None,
dtypes=None,
default_value=None,
max_items=None,
disable_progressbar=True,
) -> Tuple[pd.DataFrame, dict]:
# call super method
return super().describe(
fun=fun,
fun_col=fun_col,
coord_inc=coord_inc,
coord_colnames=coord_colnames,
dtypes=dtypes,
excluded_columns=excluded_columns,
included_columns=included_columns,
default_value=default_value,
max_items=max_items,
disable_progressbar=disable_progressbar,
)
def __iter__(self) -> Item:
items = self.features if self.features else self.t.anno.keys()
if (
not self.is_sorted
): # TODO: this will sort for evey tile if wrapped by an tilediterator
items = sorted(items, key=lambda x: (self.refdict.index(x.chromosome), x))
self._stats["iterated_items", "all"] = len(items)
if self.feature_types:
try:
items = filter(
lambda feat: feat.feature_type in self.feature_types, items
)
except AttributeError:
# this happens if the user put non-features in the anno dict
pass
if self.region is not None:
items = filter(lambda feat: feat.overlaps(self.region), items)
for f in items:
self._stats["yielded_items", f.chromosome] += 1
yield Item(f, self.t.anno[f])
[docs]
class FastaIterator(LocationIterator):
"""
Iterates over a FASTA file yielding sequence strings and keeps track of the covered genomic location.
The yielded sequence of length <width> will be padded with <fillvalue> characters if padding is True.
This generator will yield every step_size window, for a tiling window approach set step_size = width.
FASTA files will be automatically indexed if no existing index file is found and will be read in chunked mode.
Parameters
----------
fasta_file : str or pysam FastaFile
A file path or an open FastaFile object. In the latter case, the object will not be closed
region : gi or str
optional, if set, only features overlapping with this region will be iterated
width : int
sequence window size
step : int
increment for each yield
file_format : str
optional, will be determined from filename if omitted
chunk_size : int
optional, chunk size for reading fasta files. default: 1024
fill_value : str
optional, fill value for padding. default: 'N'
padding : bool
optional, if True, the sequence will be padded with fill_value characters. default: False
Padding may prefix/postfix the reported sequences at the region boundaries and accordingly, first and last
reported location GIs may extend beyond the region boundaries. Especially, if the region is close to the
chromosome start or end, the first and last reported locations may start/end outside the chromosome boundaries
and start coordinates may become negative.
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
Yields
------
sequence: str
The extracted sequence in including sequence context.
The core sequence w/o context can be accessed via seq[context_size:context_size+width].
Notes
-----
* reported stats:
iterated_items, chromosome: (int, str)
Number of iterated and yielded items
"""
def __init__(
self,
fasta_file,
region=None,
width=1,
step=1,
file_format=None,
chunk_size: int = 1024,
fill_value="N",
padding=False,
fun_alias=None,
):
super().__init__(
fasta_file, region, file_format, per_position=True, fun_alias=fun_alias
)
self.width = 1 if width is None else width
self.step = step
self.fill_value = fill_value
self.padding = padding
self.chunk_size = chunk_size
[docs]
def max_items(self):
"""Maximum number of items yielded by this iterator or None if unknown."""
return None
[docs]
def iterate_data(self, chromosome):
"""
Reads pysam data in chunks and yields individual data items
"""
start_chunk = max(0, self.region.start - 1) # 0-based coordinates in pysam!
while True:
end_chunk = min(self.region.end, start_chunk + self.chunk_size)
if end_chunk <= start_chunk:
break
chunk = self.file.fetch(
reference=chromosome, start=start_chunk, end=end_chunk
)
if (chunk is None) or (len(chunk) == 0): # we are done
break
start_chunk += len(chunk)
for d in chunk:
yield d
def __iter__(self) -> Item:
padding = self.fill_value * (self.width // 2) if self.padding else ""
for chromosome in self.chromosomes:
pos1 = max(1, self.region.start) # 0-based coordinates in pysam!
pos1 -= len(padding)
for dat in windowed(
chain(padding, self.iterate_data(chromosome), padding),
fillvalue=self.fill_value,
n=self.width,
step=self.step,
):
if isinstance(dat, tuple):
dat = "".join(dat)
chromosome = self.refdict.alias(chromosome) # chrom aliasing
end_loc = pos1 + len(dat) - 1
self.location = gi(chromosome, pos1, end_loc)
self._stats["iterated_items", chromosome] += 1
yield Item(self.location, dat)
pos1 += self.step
[docs]
class TabixIterator(LocationIterator):
"""Iterates over a bgzipped + tabix-indexed file and returns location/tuple pairs.
Genomic locations will be parsed from the columns with given pos_indices and interval coordinates will be
converted to 1-based inclusive coordinates by adding values from the configured coord_inc tuple to start and end
coordinates. Note that this class serves as super-class for various file format specific anno_its (e.g.,
BedIterator, VcfIterator, etc.) which use proper coord_inc/pos_index default values.
Parameters
----------
tabix_file : str or pysam.TabixFile
A file path or an open TabixFile object. In the latter case, the object will not be closed
region : gi or str
optional, if set, only features overlapping with this region will be iterated
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
per_position : bool
optional, if True, the iterator will yield per-position items (e.g., FASTA files)
coord_inc : tuple
optional, a tuple of values to be added to start and end coordinates
pos_indices : tuple
optional, a tuple of column indices for the interval start, end and chromosome
refdict : RefDict
optional, if set, the iterator will use the passed reference dict instead of reading it from the file
Notes
-----
* stats:
iterated_items, chromosome: (int, str)
Number of iterated/yielded items
* TODO
- add slop
- improve docs
"""
def __init__(
self,
tabix_file,
region=None,
fun_alias=None,
per_position=False,
coord_inc=(0, 0),
pos_indices=(0, 1, 2),
refdict=None,
):
super().__init__(
file=tabix_file,
region=region,
file_format="tsv",
per_position=per_position,
fun_alias=fun_alias, # e.g., toggle_chr
refdict=refdict,
)
self.coord_inc = coord_inc
self.pos_indices = pos_indices
[docs]
def max_items(self):
"""Maximum number of items yielded by this iterator or None if unknown."""
return None
def __iter__(self) -> Item:
for chromosome in self.chromosomes:
if chromosome not in self.file.contigs:
self.stats["empty_chromosomes"] += 1
continue
for row in self.file.fetch(
reference=chromosome,
start=(self.region.start - 1) if (self.region.start > 0) else None,
# 0-based coordinates in pysam!
end=self.region.end if (self.region.end < MAX_INT) else None,
parser=pysam.asTuple(),
): # @UndefinedVariable
chromosome = self.refdict.alias(row[self.pos_indices[0]])
start = int(row[self.pos_indices[1]]) + self.coord_inc[0]
end = int(row[self.pos_indices[2]]) + self.coord_inc[1]
self.location = gi(chromosome, start, end)
self._stats["iterated_items", chromosome] += 1
yield Item(self.location, tuple(row))
[docs]
class BedGraphIterator(TabixIterator):
"""
Iterates a bgzipped and indexed bedgraph file and yields float values
If a strand is passed, all yielded intervals will have this strand assigned.
Parameters
----------
bedgraph_file : str or pysam.TabixFile
A file path or an open TabixFile object. In the latter case, the object will not be closed
region : gi or str
optional, if set, only features overlapping with this region will be iterated
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
strand : str
optional, if set, all yielded intervals will have this strand assigned
refdict : RefDict
optional, if set, the iterator will use the passed reference dict instead of reading it from the file
Notes
-----
* reported stats:
yielded_items, chromosome: (int, str)
Number of yielded items (remaining are filtered, e.g., due to region constraints)
"""
def __init__(
self, bedgraph_file, region=None, fun_alias=None, strand=None, refdict=None
):
super().__init__(
tabix_file=bedgraph_file,
region=region,
per_position=False,
fun_alias=fun_alias,
coord_inc=(1, 0),
pos_indices=(0, 1, 2),
refdict=refdict,
)
self.strand = strand
def __iter__(self) -> Item:
for loc, t in super().__iter__():
self._stats["yielded_items", loc.chromosome] += 1
if self.strand:
loc = loc.get_stranded(self.strand)
yield Item(loc, float(t[3]))
[docs]
@dataclass
class BedRecord:
"""
Parsed and mutable version of pysam.BedProxy
See https://genome.ucsc.edu/FAQ/FAQformat.html#format1 for BED format details
"""
name: str
score: int
location: gi
thick_start: int
thick_end: int
item_rgb: str
block_count: int
block_sizes: List[int]
block_starts: List[int]
def __init__(self, tup, refdict=None):
super().__init__()
self.name = tup[3] if len(tup) >= 4 else None
self.score = int(tup[4]) if len(tup) >= 5 and tup[4] != "." else None
strand = tup[5] if len(tup) >= 6 else None
chromosome = tup[0] if refdict is None else refdict.alias(tup[0])
self.location = gi(
chromosome, int(tup[1]) + 1, int(tup[2]), strand
) # convert -based to 1-based start
self.thick_start = int(tup[6]) + 1 if len(tup) >= 7 else None
self.thick_end = int(tup[7]) if len(tup) >= 8 else None
self.item_rgb = tup[8] if len(tup) >= 9 else None
self.block_count = int(tup[9]) if len(tup) >= 10 else None
self.block_sizes = (
[int(x) for x in tup[10].split(",") if x != ""] if len(tup) >= 11 else None
)
self.block_starts = (
[int(x) for x in tup[11].split(",") if x != ""] if len(tup) >= 12 else None
)
def __repr__(self):
return f"{self.location.chromosome}:{self.location.start}-{self.location.end} ({self.name})"
def __len__(self):
"""Reports the length of the wrapped location, not the current tuple"""
return self.location.__len__()
[docs]
class BedIterator(TabixIterator):
"""
Iterates a BED file and yields 1-based coordinates and pysam BedProxy objects
@see https://pysam.readthedocs.io/en/latest/api.html#pysam.asBed
Parameters
----------
bed_file : str or pysam.TabixFile
A file path or an open TabixFile object. In the latter case, the object will not be closed
region : gi or str
optional, if set, only features overlapping with this region will be iterated
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
Notes
-----
* NOTE that empty intervals (i.e., start==stop coordinate) will not be iterated.
* reported stats:
yielded_items, chromosome: (int, str)
Number of yielded items (remaining are filtered, e.g., due to region constraints)
"""
def __init__(self, bed_file, region=None, fun_alias=None):
assert (
guess_file_format(bed_file) == "bed"
), f"expected BED file but guessed file format is {guess_file_format(bed_file)}"
super().__init__(
tabix_file=bed_file,
region=region,
per_position=False,
fun_alias=fun_alias,
coord_inc=(1, 0),
pos_indices=(0, 1, 2),
)
def __iter__(self) -> Item[gi, BedRecord]:
for chromosome in self.chromosomes:
if chromosome not in self.file.contigs:
self.stats["empty_chromosomes"] += 1
continue
for bed in self.file.fetch(
reference=chromosome,
start=(self.region.start - 1) if (self.region.start > 0) else None,
# 0-based coordinates in pysam!
end=self.region.end if (self.region.end < MAX_INT) else None,
parser=pysam.asTuple(),
): # @UndefinedVariable
rec = BedRecord(tuple(bed), refdict=self.refdict) # parse bed record
self.location = rec.location
self._stats["yielded_items", chromosome] += 1
yield Item(rec.location, rec)
[docs]
@dataclass
class BigBedRecord:
"""
Parsed BigBed record
See https://github.com/deeptools/pyBigWig/tree/master for format details
"""
name: str
score: float
location: gi
thick_start: int
level: float
signif: float
score2: int
def __init__(self, tup, refdict=None):
super().__init__()
self.name = tup[3] if len(tup) >= 4 else None
self.score = float(tup[4]) if len(tup) >= 5 and tup[4] != "." else None
strand = tup[5] if len(tup) >= 6 else None
chromosome = tup[0] if refdict is None else refdict.alias(tup[0])
self.location = gi(
chromosome, int(tup[1]) + 1, int(tup[2]), strand
) # convert -based to 1-based start
self.level = float(tup[6]) + 1 if len(tup) >= 7 else None
self.signif = float(tup[7]) if len(tup) >= 8 else None
self.score2 = int(tup[8]) if len(tup) >= 9 else None
def __repr__(self):
return f"{self.location.chromosome}:{self.location.start}-{self.location.end} ({self.name})"
def __len__(self):
"""Reports the length of the wrapped location, not the current tuple"""
return self.location.__len__()
[docs]
class BigBedIterator(LocationIterator):
"""Iterates over a BigBed file via pyBigWig.
Parameters
----------
file : str, PathLike or pyBigWig
The pyBigWig file to iterate over
region : gi or str
optional, if set, only features overlapping with this region will be iterated
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
"""
[docs]
def max_items(self):
"""Maximum number of items yielded by this iterator or None if unknown."""
return None
def __init__(self, file, region=None, fun_alias=None):
if isinstance(file, (str, PathLike)):
assert (
guess_file_format(file) == "bigbed"
), f"expected BigBed file but guessed file format is {guess_file_format(file)}"
super().__init__(
file=file, region=region, fun_alias=fun_alias, per_position=False
)
assert (
self.file.isBigBed() == 1
), f"Wrong file format. Is this actually a BigWig file?"
def __iter__(self) -> Item[gi, float]:
for chromosome in self.chromosomes:
# set start to 1 if it is zero (i.e. unbounded))
start = self.region.start if self.region.start > 0 else 1
# if end is not set, the locationIterator will use MAXINT as end but this does not work with BigWig files.
# So here, use max chromosome length if end is not set
end = (
self.region.end
if self.region.end < MAX_INT
else self.refdict[chromosome]
)
# use pyBigWig's entries fetcher
for s, e, bed_str in self.file.entries(chromosome, start - 1, end):
bed_tup = (self.refdict.alias(chromosome), s + 1, e) + tuple(
bed_str.split("\t")
)
rec = BigBedRecord(bed_tup, refdict=self.refdict) # parse bigbed record
self.location = rec.location
self._stats["iterated_items", self.location.chromosome] += 1
yield Item(self.location, rec)
[docs]
class BigWigIterator(LocationIterator):
"""Iterates over a pyBigWig object.
If per_position is True, the iterator will yield per-position values, otherwise it will yield interval values.
Parameters
----------
file : str, PathLike or pyBigWig
The pyBigWig file to iterate over
region : gi or str
optional, if set, only features overlapping with this region will be iterated
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
strand : str
optional, if set, all yielded intervals will have this strand assigned
per_position : bool
optional, if True, the iterator will yield per-position values, otherwise it will yield interval values
Notes
-----
Note that you can also access remote files, but this is slow and not recommended unless files are small.
Example:
>>> # iterate a remote bigwig file and report intervals. This file is ~100M
>>> import itertools
>>> uri = 'https://hgdownload.soe.ucsc.edu/goldenPath/hg19/encodeDCC/wgEncodeMapability/wgEncodeCrgMapabilityAlign100mer.bigWig' # noqa
>>> for loc, val in itertools.islice(rna.it(uri),0,10):
>>> display(f"{loc}: {val}")
"""
[docs]
def max_items(self):
"""Maximum number of items yielded by this iterator or None if unknown."""
return None
def __init__(
self, file, region=None, fun_alias=None, per_position=False, strand=None
):
if isinstance(file, (str, PathLike)):
assert (
guess_file_format(file) == "bigwig"
), f"expected BigWig file but guessed file format is {guess_file_format(file)}"
super().__init__(
file=file, region=region, fun_alias=fun_alias, per_position=per_position
)
self.strand = strand
assert (
self.file.isBigWig() == 1
), f"Wrong file format. Is this actually a BigBed file?"
def __iter__(self) -> Item[gi, float]:
for chromosome in self.chromosomes:
# set start to 1 if it is zero (i.e. unbounded))
start = self.region.start if self.region.start > 0 else 1
# if end is not set, the locationIterator will use MAXINT as end but this does not work with BigWig files.
# So here, use max chromosome length if end is not set
end = (
self.region.end
if self.region.end < MAX_INT
else self.refdict[chromosome]
)
if self.per_position:
# use pyBigWig's values fetcher
for off, value in enumerate(
self.file.values(chromosome, start - 1, end, numpy=True)
):
self.location = gi(
self.refdict.alias(chromosome),
start + off,
start + off,
strand=self.strand,
)
self._stats["iterated_items", chromosome] += 1
if math.isnan(value):
value = None
yield Item(self.location, value)
else:
# use pyBigWig's interval fetcher
ivals = self.file.intervals(
chromosome, start - 1, end
) # returns None if no intervals in the given
# range
if ivals is not None:
for s, e, value in ivals:
self.location = gi(
self.refdict.alias(chromosome), s + 1, e, strand=self.strand
)
self._stats["iterated_items", self.location.chromosome] += 1
if math.isnan(value):
value = None
yield Item(self.location, value)
[docs]
@dataclass
class VcfRecord:
"""
Parsed version of `pysam VCFProxy`, no type conversions for performance reasons.
Attributes
----------
location: gi
genomic interval representing this record
pos: int
1-based genomic (start) position. For deletions, this is the first deleted genomic position
is_indel: bool
True if this is an INDEL
ref/alt: str
reference/alternate allele string
qual: float
variant call quality
info: dict
dict of info fields/values
genotype (per-sample) dicts:
for each FORMAT field (including 'GT'), a {sample_id: value} dict will be created
zyg: dict
zygosity information per sample. Created by mapping genotypes to zygosity values using gt2zyg()
(0=nocall, 1=heterozygous call, 2=homozygous call).
n_calls: int
number of called alleles (among all considered samples)
Attributes can be accessed via dot notation, f no such attribute exists,, None will be returned.
Notes
-----
@see https://samtools.github.io/hts-specs/VCFv4.2.pdf
"""
location: gi
pos: int
id: str
filter: str
ref: str
alt: str
qual: float
info: dict
format: List[str]
def __init__(self, pysam_var, samples, sample_indices, formats, refdict):
def parse_info(info):
if info == ".":
return None
ret = {}
for s in info.split(";"):
s = s.strip().split("=")
if len(s) == 1:
ret[s[0]] = True
elif len(s) == 2:
ret[s[0]] = s[1]
return ret
self.id = pysam_var.id if pysam_var.id != "." else None
self.ref = pysam_var.ref
self.alt = (
pysam_var.alt.split(",")[0] if "," in pysam_var.alt else pysam_var.alt
) # use first alt only for now
if self.alt == "<*>": # treat as homozygous reference site
self.alt = self.ref
self.filter = None if pysam_var.filter in ['.', 'PASS'] else pysam_var.filter
self.qual = pysam_var.qual if pysam_var.qual != "." else None
self.info = parse_info(pysam_var.info)
if (len(self.ref) == 1) and (len(self.alt) == 1):
self.is_indel = False
self.is_sv = False
start, end = pysam_var.pos + 1, pysam_var.pos + 1 # 0-based in pysam
elif self.alt[0] == "<": # SV (structural variant)
self.is_indel = False
self.is_sv = True
self.svtype = self.info.get("SVTYPE", "UNK")
start, end = pysam_var.pos + 1, int(
self.info.get("END", -1)
) # TODO: support complex SVs, CHR2, etc.
else: # INDELs
self.is_indel = True
self.is_sv = False
if len(self.ref) > len(self.alt):
# deletion: the returned location contains all deleted bases
start, end = pysam_var.pos + 2, pysam_var.pos + len(self.ref)
else:
# insertion: the returned location selects the reference nucleotide before the insertion
start, end = pysam_var.pos + 1, pysam_var.pos + 1
self.pos = start
self.location = gi(
refdict.alias(pysam_var.contig), start, end, None
) # noinspection PyTypeChecker
if len(formats) > 0: # formats was read from header
self.format = pysam_var.format.split(":") # may break
for col, x in enumerate(self.format): # make faster
self.__dict__[x] = {
k: v
for k, v in zip(
[samples[i] for i in sample_indices],
[pysam_var[i].split(":")[col] for i in sample_indices],
)
}
# calc zygosity per call
if "GT" in self.__dict__:
zyg, calls = zip(*map(gt2zyg, self.__dict__["GT"].values()))
self.zyg = {
k: v for k, v in zip(self.__dict__["GT"].keys(), zyg)
} # sample: <0,1,2> (0=nocall, 1=het, 2=hom)
self.n_calls = sum(calls)
def __repr__(self):
return f"{self.location.chromosome}:{self.pos}{self.ref}>{self.alt}"
def __getattr__(self, attr):
"""Returns None if attribute does not exist"""
if attr in self.__dict__:
return self.__dict__[attr]
if attr in self.info:
return self.info[attr]
return None
[docs]
class VcfIterator(TabixIterator):
"""
Iterates a VCF file and yields 1-based coordinates and VcfRecord objects (wrapping pysam VcfProxy object)
@see https://pysam.readthedocs.io/en/latest/api.html#pysam.asVCF
Besides coordinate-related filtering, users cam also pass a list of sample ids to be considered (samples).
The iterator will by default report only records with at least 1 call in one of the configured samples.
To force reporting of all records, set filter_nocalls=False.
Parameters
----------
vcf_file : str or pysam.VariantFile
A file path or an open VariantFile object. In the latter case, the object will not be closed
region : gi or str
optional, if set, only features overlapping with this region will be iterated
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
samples : list
optional, if set, only records with calls in these samples will be reported
filter_nopass : bool
if True (default), the iterator will report only records with the PASS filter flag
filter_nocalls : bool
if True (default), the iterator will report only records with at least 1 call in one of the configured
samples.
Attributes
----------
header:
pysam VariantFile header
allsamples: list
list of all contained samples
shownsampleindices:
indices of all configured samples
Notes
-----
* reported stats:
iterated_items, chromosome: (int, str)
Number of iterated items
filtered_nopass_{filter}, chromosome: (int, str)
Number of filtered non-PASS records (filter is the filter value)
filtered_nocalls, chromosome: (int, str)
Number of filtered no-call records
yielded_items, chromosome: (int, str)
Number of yielded items (remaining are filtered, e.g., due to region constraints)
"""
def __init__(
self,
vcf_file,
region=None,
fun_alias=None,
samples=None,
filter_nopass: bool = True,
filter_nocalls: bool = True,
):
if isinstance(vcf_file, str):
assert (
guess_file_format(vcf_file) == "vcf"
), f"expected VCF file but guessed file format is {guess_file_format(vcf_file)}"
# pass refdict extracted from VCF header, otherwise it is read from tabix index which would contain only the
# chroms that are contained in the actual file
super().__init__(
tabix_file=vcf_file,
region=region,
per_position=True,
fun_alias=fun_alias,
coord_inc=(0, 0),
pos_indices=(0, 1, 1),
refdict=RefDict.load(vcf_file, fun_alias),
)
# get header
self.header = pysam.VariantFile(vcf_file).header # @UndefinedVariable
self.allsamples = list(
self.header.samples
) # list of all samples in this VCF file
self.shownsampleindices = (
[i for i, j in enumerate(self.header.samples) if j in samples]
if (samples is not None)
else range(len(self.allsamples))
) # list of all sample indices to be considered
self.filter_nopass = filter_nopass
self.filter_nocalls = filter_nocalls
self.formats = list(self.header.formats)
def __iter__(self) -> Item[gi, VcfRecord]:
for chromosome in self.chromosomes:
if chromosome not in self.file.contigs:
self.stats["empty_chromosomes", "all"] += 1
continue
for (
pysam_var
) in self.file.fetch( # NOTE that we use a pysam Tabixfile here for performance purposes
reference=chromosome,
start=(self.region.start - 1) if (self.region.start > 0) else None,
# 0-based coordinates in pysam!
end=self.region.end if (self.region.end < MAX_INT) else None,
parser=pysam.asVCF(),
): # @UndefinedVariable
rec = VcfRecord(
pysam_var,
self.allsamples,
self.shownsampleindices,
self.formats,
self.refdict,
)
self.location = rec.location
chromosome = self.location.chromosome
self._stats["iterated_items", chromosome] += 1
if self.filter_nopass and (rec.filter is not None):
self._stats[f"filtered_nopass_{rec.filter}", chromosome] += 1
continue
if (
("n_calls" in rec.__dict__)
and self.filter_nocalls
and (rec.n_calls == 0)
):
self._stats["filtered_nocalls", chromosome] += 1 # filter no-calls
continue
self._stats["yielded_items", chromosome] += 1
yield Item(self.location, rec)
[docs]
class GFF3Iterator(TabixIterator):
"""
Iterates a GTF/GFF3 file and yields 1-based coordinates and dicts containing key/value pairs parsed from
the respective info sections. The feature_type, source, score and phase fields from the GFF/GTF entries are
copied to this dict (NOTE: attribute fields with the same name will be overloaded).
@see https://pysam.readthedocs.io/en/latest/api.html#pysam.asGTF
This iterator is used to build a transcriptome object from a GFF3/GTF file.
Parameters
----------
gtf_file : str or pysam.TabixFile
A file path or an open TabixFile object. In the latter case, the object will not be closed
region : gi or str
optional, if set, only features overlapping with this region will be iterated
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
Notes
-----
* reported stats:
yielded_items, chromosome: (int, str)
Number of iterated/yielded items
"""
def __init__(self, gtf_file, region=None, fun_alias=None, feature_types=None):
super().__init__(
tabix_file=gtf_file,
region=region,
per_position=False,
fun_alias=fun_alias,
coord_inc=(0, 0),
pos_indices=(0, 1, 2),
)
self.feature_types = (
to_set(feature_types) if feature_types is not None else None
)
self.file_format = guess_file_format(gtf_file)
assert self.file_format in [
"gtf",
"gff",
], f"expected GFF3/GTF file but guessed file format is {self.file_format}"
def __iter__(self) -> Item[gi, dict]:
for chromosome in self.chromosomes:
if chromosome not in self.file.contigs:
self.stats["empty_chromosomes"] += 1
continue
for row in self.file.fetch(
reference=chromosome,
start=(self.region.start - 1) if (self.region.start > 0) else None,
# 0-based coordinates in pysam!
end=self.region.end if (self.region.end < MAX_INT) else None,
parser=pysam.asTuple(),
): # @UndefinedVariable
(
chromosome,
source,
feature_type,
start,
end,
score,
strand,
phase,
info,
) = row
if (self.feature_types is not None) and (
feature_type not in self.feature_types
):
continue
self.location = gi(
self.refdict.alias(chromosome),
int(start) + self.coord_inc[0],
int(end) + self.coord_inc[1],
strand,
)
info = parse_gff_attributes(info, self.file_format)
info["feature_type"] = None if feature_type == "." else feature_type
info["source"] = None if source == "." else source
info["score"] = None if score == "." else float(score)
info["phase"] = None if phase == "." else int(phase)
self._stats["yielded_items", chromosome] += 1
yield Item(self.location, info)
[docs]
class PandasIterator(LocationIterator):
"""Iterates over a pandas dataframe that contains three columns with chromosome/start/end coordinates.
Notable subclasses are PyrangesIterator and BioframeIterator.
If the passed dataframe is not coordinate sorted, is_sorted must be set to False.
Parameters
----------
df : dataframe
pandas datafram with at least 4 columns names as in coord_columns and feature parameter. This dataframe
will be sorted by chromosome and start values unless is_sorted is set to True
feature : str
Name of column to yield. If null, the whole row will be yielded
region : GI or str
optional, if set, only features overlapping with this region will be iterated
coord_columns : list
Names of coordinate columns, default: ['Chromosome', 'Start', 'End', 'Strand']
is_sorted : bool
optional, if set, the iterator will assume that the dataframe is already sorted by chromosome and start
coord_off : list
Coordinate offsets, default: (1, 0). These offsets will be added to the read start/end coordinates to
convert to the rnalib convention.
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
calc_chromlen : bool
optional, if set, the iterator will calculate the maximum end coordinate for each chromosome and use this
information to create a reference dictionary
refdict : RefDict
optional, if set, the iterator will use the passed reference dict instead of estimating from the data
Yields
------
location: gi
Location object describing the current coordinates
value: any
The extracted feature value
Notes
-----
* Note, that iteration over dataframes is generally discouraged as there are much more efficient methods for
data manipulation.
* reported stats:
iterated_items, chromosome: (int, str)
Number of iterated items
yielded_items, chromosome: (int, str)
Number of yielded items (remaining are filtered, e.g., due to region constraints)
Examples
--------
Here is another (efficient) way to use a pandas iterator:
>>> gencode_gff = ... # a gff file
>>> with BioframeIterator(gencode_gff, chromosome='chr2') as lit: # create a filtered DF from the passed gff
>>> lit.df = lit.df.query("strand=='-' ") # further filter the dataframe with pandas
>>> print(Counter(lit.df['feature'])) # count minus strand features
>>> # you can also use this code to then iterate the data which may be convenient/readable if the
>>> # dataframe is small:
>>> for tx, row in lit: # now iterate with rnalib iterator
>>> # do something with location and pandas data row
"""
def __init__(
self,
df,
feature: str = None,
region: GI = None,
coord_columns: tuple = ("Chromosome", "Start", "End", "Strand"),
is_sorted=False,
coord_off=(0, 0),
fun_alias: Callable = None,
calc_chromlen=False,
refdict=None,
):
self._stats = Counter()
self.location = None
self.feature = feature
self.coord_columns = coord_columns
self.coord_off = coord_off
# get dataframe and sort if not sorted.
self.df = (
df
if is_sorted
else df.sort_values(
[coord_columns[0], coord_columns[1], coord_columns[2]]
).reset_index(drop=True)
)
# add strand column if not included
if coord_columns[3] not in self.df.columns:
self.df[coord_columns[3]] = "."
# apply chrom aliasing if any
if fun_alias is not None:
self.df[coord_columns[0]] = self.df[coord_columns[0]].apply(fun_alias)
self.chromosomes = list(
dict.fromkeys(self.df[coord_columns[0]])
) # unique set with preserved order
self.refdict = refdict
if self.refdict is None:
if calc_chromlen:
# group by chrom, calc max end coordinate and create refdict
self.refdict = RefDict(
self.df.groupby(coord_columns[0])[coord_columns[2]].max().to_dict()
)
else:
# refdict w/o chrom lengths
self.refdict = RefDict({c: None for c in self.chromosomes})
super().__init__(
file=None,
region=region,
per_position=False,
fun_alias=None, # already applied! don't set again
refdict=self.refdict,
) # reference dict exists
# filter dataframe for region. TODO: check whether coordinate filtering is exact
if (self.region is not None) and (not self.region.is_unbounded()):
logging.debug(f"filtering dataframe for region {self.region}")
filter_query = (
[]
if self.region.chromosome is None
else [f"{coord_columns[0]}==@self.region.chromosome"]
)
# overlap check: self.region.start <= other.end and other.start <= self.region.end
filter_query += (
[]
if self.region.end is None
else [f"{coord_columns[1]}<=(@self.region.end-@coord_off[0])"]
)
filter_query += (
[]
if self.region.start is None
else [f"{coord_columns[2]}>=(@self.region.start-@coord_off[1])"]
)
self.df = self.df.query("&".join(filter_query))
def __iter__(self) -> Item[gi, pd.Series]:
for row in self.df.itertuples():
chromosome = self.refdict.alias(getattr(row, self.coord_columns[0]))
# NOTE in df, coordinates are 0-based. Start is included, End is excluded.
start = getattr(row, self.coord_columns[1]) + self.coord_off[0]
end = getattr(row, self.coord_columns[2]) + self.coord_off[1]
strand = getattr(row, self.coord_columns[3], ".")
self.location = gi(chromosome, start, end, strand=strand)
self._stats["iterated_items", chromosome] += 1
if self.region.overlaps(self.location):
self._stats["yielded_items", chromosome] += 1
yield Item(
self.location,
row if self.feature is None else getattr(row, self.feature, None),
)
[docs]
def to_dataframe(self, **kwargs):
return self.df
[docs]
def max_items(self):
return len(self.df.index)
[docs]
class BioframeIterator(PandasIterator):
"""
Iterates over a [bioframe](https://bioframe.readthedocs.io/) dataframe.
The genomic coordinates of yielded locations are corrected automatically.
Parameters
----------
df : dataframe
bioframe dataframe with at least 4 columns names as in coord_columns and feature parameter. This dataframe
will be sorted by chromosome and start values unless is_sorted is set to True
feature : str
Name of column to yield. If null, the whole row will be yielded
region : gi or str
optional, if set, only features overlapping with this region will be iterated
coord_columns : list
Names of coordinate columns, default: ['chrom', 'start', 'end', 'strand']
is_sorted : bool
optional, if set, the iterator will assume that the dataframe is already sorted by chromosome and start
coord_off : list
Coordinate offsets, default: (1, 0). These offsets will be added to the read start/end coordinates to
convert to the rnalib convention.
calc_chromlen : bool
optional, if set, the iterator will calculate chromosome lengths from the data (if required)
schema : str
optional, if set, it is passed to by bioframe.read_table to parse the dataframe
fun_alias: Callable
optional, if set, the iterator will use this function for aliasing chromosome names
refdict: RefDict
optional, if set, the iterator will use this reference dictionary for chromosome aliasing
"""
def __init__(
self,
df,
feature: str = None,
region: GI = None,
is_sorted=False,
fun_alias=None,
schema=None,
coord_columns: tuple = ("chrom", "start", "end", "strand"),
coord_off=(1, 0),
calc_chromlen=False,
refdict=None,
):
if isinstance(df, str):
# assume a filename and read via bioframe read_table method and make sure that dtypes match
self.file = df
schema = guess_file_format(self.file) if schema is None else schema
schema = "bed" if schema == "bedgraph" else schema # map begraph->bed
df = bioframe.read_table(
self.file,
schema=guess_file_format(self.file) if schema is None else schema,
)
# ensure that chr is a string
df = df.astype({coord_columns[0]: str})
# filter the 'chrom' column for header lines and replace NaN's
df = df[~df.chrom.str.startswith("#", na=False)].replace(np.nan, ".")
# ensure proper dtypes for start/end/strand
df = df.astype({coord_columns[1]: int, coord_columns[2]: int})
if coord_columns[3] in df.columns:
df[coord_columns[3]] = df[coord_columns[3]].astype(str)
super().__init__(
df if is_sorted else bioframe.sort_bedframe(df),
feature=feature,
region=region,
coord_columns=coord_columns,
coord_off=coord_off, # coord correction
is_sorted=True, # we used bioframe for sorting above.
fun_alias=fun_alias, # chrom aliasing will be applied to df
calc_chromlen=calc_chromlen,
refdict=refdict,
)
[docs]
class PyrangesIterator(PandasIterator):
"""
Iterates over a [pyranges](https://pyranges.readthedocs.io/) object.
The genomic coordinates of yielded locations are corrected automatically.
Parameters
----------
probj : pyranges.PyRanges
A pyranges object
feature : str
Name of column to yield. If null, the whole row will be yielded
region : gi or str
optional, if set, only features overlapping with this region will be iterated
coord_columns : list
Names of coordinate columns, default: ['Chromosome', 'Start', 'End', 'Strand']
is_sorted : bool
optional, if set, the iterator will assume that the dataframe is already sorted by chromosome and start
fun_alias: Callable
optional, if set, the iterator will use this function for aliasing chromosome names
coord_off : list
Coordinate offsets, default: (1, 0). These offsets will be added to the read start/end coordinates to
convert to the rnalib convention.
calc_chromlen : bool
optional, if set, the iterator will calculate chromosome lengths from the data (if required)
refdict: RefDict
optional, if set, the iterator will use this reference dictionary for chromosome aliasing
"""
def __init__(
self,
probj,
feature=None,
region=None,
is_sorted=False,
fun_alias=None,
coord_columns=("Chromosome", "Start", "End", "Strand"),
coord_off=(1, 0),
calc_chromlen=False,
refdict=None,
):
if isinstance(probj, str):
# assume a filename and read via pyranges read_xxx method and make sure that dtypes match
self.file = probj
self.file_format = guess_file_format(self.file)
if self.file_format == "bed":
probj = pyranges.read_bed(self.file)
elif self.file_format == "gff":
probj = pyranges.read_gff(self.file)
elif self.file_format == "gtf":
probj = pyranges.read_gtf(self.file)
elif self.file_format == "bam":
probj = pyranges.read_bam(self.file)
else:
raise ValueError(f"Unsupported file format {self.file_format}")
if not is_sorted:
probj = probj.sort(["Chromosome", "Start", "End"])
self.probj = probj
super().__init__(
probj.df,
feature=feature,
region=region,
coord_columns=coord_columns,
coord_off=coord_off,
is_sorted=True,
fun_alias=fun_alias,
calc_chromlen=calc_chromlen,
refdict=refdict,
)
# ---------------------------------------------------------
# SAM/BAM anno_its
# ---------------------------------------------------------
[docs]
class ReadIterator(LocationIterator):
"""Iterates over a BAM alignment.
Parameters
----------
bam_file : str
BAM file name
region : gi or str
optional, if set, only features overlapping with this region will be iterated
file_format : str
optional, if set, the iterator will assume this file format (e.g., 'bam' or 'sam')
min_mapping_quality : int
Minimum mapping quality. If set, reads with lower mapping quality will be filtered.
flag_filter : int
Bitwise flag filter. If set, reads with any of the specified flags will be filtered.
tag_filters : list
List of TagFilter objects. If set, reads with any of the specified tags will be filtered.
max_span : int
Maximum alignment span. If set, reads with a larger alignment span will be filtered.
report_mismatches : bool
If set, this iterator will additionally yield (read, mismatches) tuples where 'mismatches' is a list of
(read_position, genomic_position, ref_allele, alt_allele) tuples that describe differences wrt. the
aligned reference sequence. This options requires MD BAM tags to be present.
min_base_quality : int
Useful only in combination with report_mismatches; filters mismatches based on minimum per-base quality
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
include_unmapped : bool
If set, this iterator will also yield unmapped reads after all mapped ones.
Notes
-----
* Reported stats
iterated_items, chromosome: (int, str)
Number of iterated items
yielded_items, chromosome: (int, str)
Number of yielded items
n_fil_flag, chromosome:
Number of reads filtered due to a FLAG filter
n_fil_mq, chromosome:
Number of reads filtered due to low mapping quality
n_fil_tag chromosome:
Number of reads filtered due to a TAG filter
n_fil_max_span, chromosome:
Number of reads filtered due to exceeding a configured maximum alignment span
* If you need to non-default settings for opening the BAM file, you should open the pysam.AlignmentFile object
yourself and pass the file object to this iterator.
"""
def __init__(
self,
bam_file,
region=None,
file_format=None,
min_mapping_quality=0,
flag_filter=DEFAULT_FLAG_FILTER,
tag_filters=None,
max_span=None,
report_mismatches=False,
min_base_quality=0,
fun_alias=None,
include_unmapped=False,
):
super().__init__(
bam_file,
region=region,
file_format=file_format,
per_position=False,
fun_alias=fun_alias,
)
self.min_mapping_quality = min_mapping_quality
self.flag_filter = flag_filter
self.max_span = max_span
self.tag_filters = tag_filters
self.report_mismatches = report_mismatches
self.min_base_quality = min_base_quality
self.include_unmapped = include_unmapped
[docs]
def max_items(self):
"""Returns number of reads retrieved from the SAM/BAM index"""
return sum([x.total for x in self.file.get_index_statistics()])
def __iter__(self) -> Item[gi, pysam.AlignedSegment]:
md_check = False
for chromosome in self.chromosomes:
if chromosome not in self.file.references:
self.stats["empty_chromosomes"] += 1
continue
for r in self.file.fetch(
contig=chromosome,
start=self.region.start - 1 if (self.region.start > 0) else None,
end=self.region.end if (self.region.end < MAX_INT) else None,
until_eof=True,
):
self.location = gi(
self.refdict.alias(r.reference_name),
r.reference_start + 1,
r.reference_end,
"-" if r.is_reverse else "+",
)
self._stats["iterated_items", self.location.chromosome] += 1
if r.flag & self.flag_filter: # filter based on BAM flags
self._stats["n_fil_flag", self.location.chromosome] += 1
continue
if (
r.mapping_quality < self.min_mapping_quality
): # filter based on mapping quality
self._stats["n_fil_mq", self.location.chromosome] += 1
continue
if self.tag_filters is not None: # filter based on BAM tags
is_filtered = False
for tf in self.tag_filters:
is_filtered = is_filtered | tf.filter(r)
if is_filtered:
self._stats["n_fil_tag", self.location.chromosome] += 1
continue
# test max_span and drop reads that span larger genomic regions
if (self.max_span is not None) and (len(self.location) > self.max_span):
self._stats["n_fil_max_span", self.location.chromosome] += 1
continue
self._stats["yielded_items", self.location.chromosome] += 1
# report mismatches
if self.report_mismatches:
if not md_check:
assert r.has_tag(
"MD"
), "BAM does not contain MD tag: cannot report mismatches"
md_check = True
mm = [
(off, pos + 1, ref.upper(), r.query_sequence[off])
for (off, pos, ref) in r.get_aligned_pairs(
with_seq=True, matches_only=True
)
if ref.islower()
and r.query_qualities[off] >= self.min_base_quality
] # mask bases with low per-base quailty
yield Item(self.location, (r, mm)) # yield read/mismatch tuple
else:
yield Item(self.location, r)
if self.include_unmapped:
for r in self.file.fetch(until_eof=True):
if r.is_mapped:
continue
self.location = gi()
self._stats["unmapped_reads", "all"] += 1
if self.report_mismatches:
yield Item(self.location, (r, ())) # yield read/mismatch tuple
else:
yield Item(self.location, r)
[docs]
class ReadPair(NamedTuple):
"""namedtuple for paired reads"""
r1: pysam.AlignedSegment
r2: pysam.AlignedSegment
mm1: List[Tuple[int, int, str, str]]
mm2: List[Tuple[int, int, str, str]]
[docs]
class PairedReadIterator(ReadIterator):
"""
Iterates over a BAM alignment and yields paired reads as readpair objects.
This iterator is used to extract paired reads from a BAM file. It is assumed that the input BAM file is coordinate
sorted and that the reads are paired.
This iterator stores incomplete pairs in a dictionary and yields them when the second read is found. This approach
is more memory efficient than the naive approach of loading all reads into memory and then pairing them. It may,
however, still result in high memory consumption if the input BAM file contains many orphaned reads or pairs with
large insert sizes. It is thus recommended to use this iterator with a region parameter to limit the number of
reads that are loaded into memory.
Parameters
----------
bam_file : str
BAM file name
region : gi or str
optional, if set, only features overlapping with this region will be iterated
file_format : str
optional, if set, the iterator will assume this file format (e.g., 'bam' or 'sam')
min_mapping_quality : int
Minimum mapping quality. If set, reads with lower mapping quality will be filtered.
flag_filter : int
Bitwise flag filter. If set, reads with any of the specified flags will be filtered.
tag_filters : list
List of TagFilter objects. If set, reads with any of the specified tags will be filtered.
max_span : int
Maximum alignment span. If set, reads with a larger alignment span will be filtered.
report_mismatches : bool
If set, this iterator will additionally yield (read, mismatches) tuples where 'mismatches' is a list of
(read_position, genomic_position, ref_allele, alt_allele) tuples that describe differences wrt. the
aligned reference sequence. This options requires MD BAM tags to be present.
min_base_quality : int
Useful only in combination with report_mismatches; filters mismatches based on minimum per-base quality
fun_alias : Callable
optional, if set, the iterator will use this function for aliasing chromosome names
filter_pcr_duplicates : bool
If set, this iterator will filter PCR duplicates based on the read sequence and same start/end coordinates.
Returns
-------
Item[gi, readpair]
The (merged) location of the read pair and a readpair namedtuple that contains the two reads as
pysam.AlignedSegment objects. If report_mismatches is set, mm1 and mm2 contain the mismatch lists of the
two mates. Note that r1 always contains read1 and r2 always read2. Both can be None for orphaned reads
(e.g., if the mate was not mapped or maps outside the region).
Notes
-----
* Reported stats (additional to ReadIterator stats)
* ("found_pairs", "complete"): number of complete read pairs found
* ("found_pairs", "incomplete"): number of incomplete (i.e., read1 or read2 missing) read pairs found
* ("filtered_duplicates", chromosome): number of filtered PCR duplicates per chromosome
"""
def __init__(
self,
bam_file,
region,
file_format=None,
min_mapping_quality=0,
flag_filter=DEFAULT_FLAG_FILTER,
tag_filters=None,
max_span=None,
report_mismatches=False,
min_base_quality=0,
fun_alias=None,
filter_pcr_duplicates=False,
):
super().__init__(
bam_file=bam_file,
region=region,
file_format=file_format,
min_mapping_quality=min_mapping_quality,
flag_filter=flag_filter,
tag_filters=tag_filters,
max_span=max_span,
report_mismatches=report_mismatches,
min_base_quality=min_base_quality,
fun_alias=fun_alias,
include_unmapped=False,
) # no unmapped reads are reported
self.filter_pcr_duplicates = filter_pcr_duplicates
if region is None:
logging.warning(
"running PairedReadIterator without location. This might result in high memory "
"consumption and is not recommended"
)
def __iter__(self) -> Item[gi, ReadPair]:
pairs = defaultdict(list)
out_queue: List[(gi, ReadPair)] = list()
for loc, r in super().__iter__():
mm = None
if self.report_mismatches:
r, mm = r
read_name = r.query_name
pairs[read_name].append((loc, r, mm))
if len(pairs[read_name]) == 2:
(l1, r1, mm1), (l2, r2, mm2) = pairs[read_name]
del pairs[read_name]
# check chrom
if l1.chromosome == l2.chromosome:
loc = gi(
l1.chromosome,
min(l1.start, l2.start),
max(l1.end, l2.end),
'-' if r1.is_reverse else '+',
)
if r1.is_read2: # always report read1 first
r1, r2 = r2, r1
mm1, mm2 = mm2, mm1
out_queue.append((loc, ReadPair(r1, r2, mm1, mm2)))
# add orphaned reads
self._stats["found_pairs", "complete"] = len(out_queue)
self._stats["found_pairs", "incomplete"] = len(pairs)
for read_name in pairs:
(loc, r1, mm1) = pairs[read_name][0]
r2, mm2 = None, None
if r1.is_read2: # always report read1 first
r1, r2 = r2, r1
mm1, mm2 = mm2, mm1
out_queue.append((loc, ReadPair(r1, r2, mm1, mm2)))
last_left, dups = None, set()
for loc, p in out_queue:
if self.filter_pcr_duplicates:
if (last_left is None) or (
last_left < loc.left_pos()
): # find duplicates
last_left = loc.left_pos()
dups = set()
else:
key = ("" if (p.r1 is None) else p.r1.query_sequence) + \
("" if (p.r2 is None) else p.r2.query_sequence)
if (key != "") and (key in dups):
self._stats["filtered_duplicates", loc.chromosome] += 1
continue
dups.add(key)
yield Item(loc, p)
# if self.report_mismatches:
# yield Item(p.loc, (p.r1, p.r2, p.mm1, p.mm2))
# else:
# yield Item(p.loc, (p.r1, p.r2))
[docs]
class FastPileupIterator(LocationIterator):
"""
Fast pileup iterator that yields a complete pileup (w/o insertions) over a set of genomic positions. This is
more lightweight and considerably faster than `pysam's pileup()` but lacks some features (such as
`ignore_overlaps` or `ignore_orphans`).
By default, it basically reports what is seen in the default IGV view (using, e.g., the same read flag filter).
This iterator uses a ReadIterator to iterate the BAM file and then builds a pileup from the reads that overlap
the reported positions. The pileup is a dict with genomic positions as keys and a Counter object as values. The
Counter object contains the counts of each base at the respective position.
Parameters
----------
bam_file : str
BAM file name
chromosome : str
Chromosome name. If set, only this chromosome will be iterated and reported_positions will be derived from
the respective parameter. Note, that either chromosome and reported_positions or region must be set.
reported_positions : range or set
Range or set of genomic positions for which counts will be reported. The chromosome is derived from the
respective parameter. Note, that either chromosome and reported_positions or region must be set.
region : gi
Genomic region. If set, chromosome and reported_positions will be derived from this region.
Note, that either chromosome and reported_positions or region must be set.
file_format : str
optional, if set, the iterator will assume this file format (e.g., 'bam' or 'sam')
min_mapping_quality : int
Filters pileup reads based on minimum mapping quality
flag_filter : int
Filters pileup reads based on read flags (see utils.BamFlag for details)
tag_filters : list
Filters pileup reads based on BAM tags (see utils.TagFilter for details)
max_span : int
Restricts maximum pileup depth.
min_base_quality : int
Filters pileup based on minimum per-base quality
fun_alias : function
Optional function for aliasing chromosome names
Returns
-------
A <base>:<count> Counter object. Base is 'None' for deletions.
Notes
-----
* initial performance tests that used a synchronized iterator over a FASTA and 2 BAMs showed ~50kpos/sec
* reported stats:
iterated_items, chromosome: (int, str)
Number of iterated items (reads)
yielded_items, chromosome: (int, str)
Number of yielded items (positions)
Examples
--------
>>> pileup_data = rna.it("test.bam", style="pileup", region=gi('chr1:1-100')).to_list()
>>> rna.it("test.bam", style='pileup',
>>> region='1:22418244-22418244', # pileup single position
>>> min_mapping_quality=30, flag_filter=0, min_base_quality=40, # read filtering
>>> tag_filters=[rna.TagFilter('NM', [0], True, inverse=True)], # include only reads w/o mismatches
>>> ).to_list()
"""
def __init__(
self,
bam_file,
chromosome: str = None,
reported_positions: set = None,
region: GI = None,
file_format: str = None,
min_mapping_quality: int = 0,
flag_filter: int = DEFAULT_FLAG_FILTER,
tag_filters: list[TagFilter] = None,
max_span: int = None,
min_base_quality: int = 0,
fun_alias=None,
strand_specific: bool = False,
):
self.file = bam_file
self.location = None
self.per_position = False
self._stats = Counter()
if chromosome is None: # get from region parameter
# assert that region is set
assert (region is not None) and (
reported_positions is None
), "Either chromosome and reported_positions or region must be set"
if not isinstance(region, (GI, Feature)):
region = gi(region)
assert not region.is_unbounded(), "region must be bounded"
assert len(region) < 1e7, f"region {region} is too large for pileup"
reported_positions = {p.start for p in region} # a set of positions
chromosome = region.chromosome
else:
assert (
reported_positions is not None
), "Either chromosome and reported_positions or region must be set"
if isinstance(reported_positions, range):
reported_positions = set(reported_positions)
elif not isinstance(reported_positions, set):
warnings.warn(
"reported_positions should be a tuple(start, end) or a set() to avoid slow processing"
)
# set iterated region and chromosomes
self.reported_positions = reported_positions
self.region = gi(chromosome, min(reported_positions), max(reported_positions))
self.chromosomes = (chromosome,)
# pileup counter dict
self.count_dict = defaultdict(Counter)
# ReadIterator options
self.file_format = file_format
self.min_mapping_quality = min_mapping_quality
self.flag_filter = flag_filter
self.tag_filters = tag_filters
self.max_span = max_span
self.min_base_quality = min_base_quality
self.strand_specific = strand_specific
super().__init__(
bam_file,
region=self.region,
file_format=self.file_format,
per_position=True,
fun_alias=fun_alias,
)
[docs]
def max_items(self):
"""Maximum number of items yielded by this iterator or None if unknown."""
return len(self.reported_positions)
def __iter__(self) -> Item[gi, Counter]:
# there is only 1 chromosome, no need to iterate
with ReadIterator(
bam_file=self.file,
region=self.region,
min_mapping_quality=self.min_mapping_quality,
flag_filter=self.flag_filter,
tag_filters=self.tag_filters,
max_span=self.max_span,
min_base_quality=self.min_base_quality,
fun_alias=self.fun_alias,
) as rit:
for loc, r in rit:
strand = (
'-' if (not r.is_reverse if r.is_read2 else r.is_reverse) else '+'
)
self._stats["iterated_items", loc.chromosome] += 1
# find reportable positions, skips soft-clipped bases
gpos = r.reference_start + 1
rpos = 0
for op, l in r.cigartuples:
if op in [0, 7, 8]: # M, =, X
for _ in range(l):
if gpos in self.reported_positions:
if (
r.query_qualities[rpos] >= self.min_base_quality
): # check base qual
if not self.count_dict[gpos]:
self.count_dict[gpos] = Counter()
if self.strand_specific:
self.count_dict[gpos][
r.query_sequence[rpos], strand
] += 1
else:
self.count_dict[gpos][
r.query_sequence[rpos]
] += 1
rpos += 1
gpos += 1
elif op == 1: # I
rpos += l
elif op == 2: # D
for gpos in range(gpos, gpos + l):
if gpos in self.reported_positions:
if not self.count_dict[gpos]:
self.count_dict[gpos] = Counter()
if self.strand_specific:
self.count_dict[gpos][None, strand] += 1
else:
self.count_dict[gpos][None] += 1
gpos += 1
elif op == 4: # S
rpos += l
elif op == 3: # N
gpos += l
elif op in [5, 6]: # H, P
pass
else:
warnings.warn("unsupported CIGAR op %i" % op)
# yield all reported positions (including uncovered ones)
for gpos in self.reported_positions:
self.location = gi(self.region.chromosome, gpos, gpos)
self._stats["yielded_items", self.region.chromosome] += 1
yield Item(
self.location,
self.count_dict[gpos] if gpos in self.count_dict else Counter(),
)
# ---------------------------------------------------------
# grouped anno_its
# ---------------------------------------------------------
[docs]
class GroupedLocationIterator(LocationIterator):
"""
Wraps another location iterator and yields groups of items sharing (parts of) the same location
given a matching strategy (e.g., same start, same end, same coords, overlapping).
The iterator yields tuples of (merged) group location and a (locations, items) tuple containing lists of
locations/items yielded from the wrapped iterator.
This iterator is useful for grouping features that share ovelapping genomic locations (e.g., overlapping
features, features with shared start coordinates, etc.).
It is typically instantiated from an existing LocationIterator via its `group()` method.
Parameters
----------
lit : LocationIterator
The wrapped location iterator
strategy : str
The grouping strategy to use: left (start coordinate match), right (end coordinate match), both (complete
location match; default), overlap (coordinate overlap).
"""
def __init__(self, lit: LocationIterator, strategy="both"):
self.orgit = lit
self.it = peekable(lit)
assert strategy in [
"start",
"end",
"both",
"overlap",
], f"Unsupported grouping strategy {strategy}"
self.strategy = strategy
self.per_position = self.orgit.per_position
[docs]
def max_items(self):
"""Maximum number of items yielded by this iterator or None if unknown."""
return self.orgit.max_items()
def __iter__(self) -> Item[gi, (tuple, tuple)]:
for loc, value in self.it:
if isinstance(loc, Feature):
loc = loc.location
mloc = loc.copy()
values = [value]
locations = [loc]
if self.strategy == "start":
while self.it.peek(None) and self.it.peek()[0].left_match(mloc): # noqa
loc, v = next(self.it)
locations += [loc]
values += [v]
mloc = GI.merge((mloc, loc))
elif self.strategy == "end":
while self.it.peek(None) and self.it.peek()[0].right_match(
mloc
): # noqa
loc, v = next(self.it)
locations += [loc]
values += [v]
mloc = GI.merge((mloc, loc))
elif self.strategy == "both":
while self.it.peek(None) and self.it.peek()[0] == mloc: # noqa
loc, v = next(self.it)
locations += [loc]
values += [v]
mloc = GI.merge((mloc, loc))
elif self.strategy == "overlap":
while self.it.peek(None) and self.it.peek()[0].overlaps(mloc): # noqa
loc, v = next(self.it)
locations += [loc]
values += [v]
mloc = GI.merge((mloc, loc))
yield Item(mloc, (locations, values))
[docs]
def close(self):
try:
self.orgit.close()
except AttributeError:
pass
[docs]
class MergedLocationIterator(LocationIterator):
"""
Merges multiple location iterators into a single iterator that yields sorted locations and corresponding
result items (named tuples) that contain the data from the yielding iterator and an associated label.
This class uses a heap queue to merge-sort the iterators in a memory-efficient way.
It is typically instantiated from an existing LocationIterator via its `merge()` method.
Parameters
----------
iterables : list
A list of LocationIterators to merge
labels : list
A list of labels for the merged iterators. If None, default labels (f"it{idx}") will be used.
refdict : RefDict
A reference dictionary to use. If not set, the merged refdict of the passed iterators will be used.
"""
def __init__(self, iterables, labels=None, refdict=None):
# get iterables
self.iterables = iterables
if not isinstance(self.iterables, (tuple, list)):
self.iterables = [iterables]
for lit in iterables:
assert issubclass(
type(lit), LocationIterator
), f"Only implemented for LocationIterators, not for {type(lit)}"
# get labels
if labels is None:
self.labels = [f"it{i}" for i in range(len(self.iterables))]
else:
self.labels = to_list(labels)
assert len(self.labels) == len(self.iterables)
if refdict is None:
self.refdict = RefDict.merge_and_validate(
*[lit.refdict for lit in iterables]
)
if self.refdict is None:
warnings.warn(
"Could not determine RefDict from anno_its: using alphanumerical chrom order."
)
else:
if len(self.refdict) == 0:
warnings.warn("RefDict is empty! {self.refdict}")
else:
self.refdict = refdict
self.chromosomes = self.refdict.chromosomes()
assert len(self.chromosomes) > 0, "No common chromosomes found"
self.Result = namedtuple("Result", "data name") # result type
def _yield_per_chrom(self, i, label, chromosome):
if chromosome not in i.refdict:
return # not covered
i.set_region(gi(chromosome))
for loc, dat in i:
yield loc, dat, label
[docs]
def max_items(self):
return sum([lit.max_items() for lit in self.iterables])
def __iter__(self) -> Item[gi, tuple]:
for chromosome in self.chromosomes:
its = [
self._yield_per_chrom(i, label, chromosome)
for i, label in zip(self.iterables, self.labels)
]
for loc, dat, label in heapq.merge(*its):
yield Item(loc, self.Result(dat, label)) # noqa
@DeprecationWarning
class SyncPerPositionIterator(LocationIterator):
"""Synchronizes the passed location anno_its by genomic location and yields
individual genomic positions and overlapping intervals per passed iterator.
Expects (coordinate-sorted) location anno_its.
The chromosome order will be determined from a merged refdict or, if not possible,
by alphanumerical order.
Examples
--------
>>> it1, it2, it3 = ...
>>> for pos, (i1,i2,i3) in SyncPerPositionIterator([it1, it2, it3]): # noqa
>>> print(pos,i1,i2,i3)
>>> # where i1,...,i3 are lists of tx/data tuples from the passed LocationIterators
Notes
-----
* reported stats
yielded_items, chromosome: (int, str)
Number of yielded items (positions)
* TODOs
optimize
documentation and usage scenarios
"""
def __init__(self, iterables, refdict=None):
"""
Parameters
----------
iterables : List[LocationIterator]
a list of location anno_its
refdict : RefDict
a reference dict for the passed anno_its. If None, a merged refdict of all anno_its will be used
"""
self.iterables = iterables
for lit in iterables:
assert issubclass(
type(lit), LocationIterator
), "Only implemented for LocationIterators"
self.per_position = True
self._stats = Counter()
self.iterators = [peekable(lit) for lit in iterables] # make peekable anno_its
if refdict is None:
self.refdict = RefDict.merge_and_validate(
*[lit.refdict for lit in iterables]
)
if self.refdict is None:
warnings.warn(
"Could not determine RefDict from anno_its: using alphanumerical chrom order."
)
else:
if len(self.refdict) == 0:
warnings.warn("RefDict is empty! {self.refdict}")
else:
self.refdict = refdict
self.current = {i: list() for i, _ in enumerate(self.iterators)}
# get first position if any
first_positions = SortedList(
d[0]
for d in [lit.peek(default=None) for lit in self.iterators]
if d is not None
)
if len(first_positions) > 0:
# defined chrom sort order (fixed list) or create on the fly (sorted set supports addition in iteration)
self.chroms = (
SortedSet([first_positions[0].chromosome])
if self.refdict is None
else list(self.refdict.keys())
) # chrom must be in same order as in iterator!
# get min.max pos
self.pos, self.maxpos = first_positions[0].start, first_positions[0].end
else:
self.chroms = set() # no data
def max_items(self):
"""Maximum number of items yielded by this iterator or None if unknown."""
return None
def first_pos(self):
"""Returns the first position of the next item from the anno_its"""
first_positions = SortedList(
d[0]
for d in [lit.peek(default=None) for lit in self.iterators]
if d is not None
)
if len(first_positions) > 0:
return first_positions[0]
return None
def update(self, chromosome):
"""Updates the current list of overlapping intervals for the passed chromosome"""
if self.pos is None:
return
for i, lit in enumerate(self.iterators):
self.current[i] = [
(loc, d) for loc, d in self.current[i] if loc.end >= self.pos
]
while True:
nxt = lit.peek(default=None)
if nxt is None:
break # exhausted
loc, dat = nxt
if loc.chromosome != chromosome:
if self.refdict is None:
self.chroms.add(loc.chromosome) # added chr
break
self.maxpos = max(self.maxpos, loc.end)
if loc.start > self.pos:
break
self.current[i].append(next(lit)) # consume interval
def __iter__(self) -> Item[gi, tuple]:
"""Yields a tuple of genomic position and a list of overlapping intervals per iterator"""
for c in self.chroms:
self.update(c)
while self.pos <= self.maxpos:
self._stats["yielded_items", c] += 1
yield Item(
gi(c, self.pos, self.pos), [list(x) for x in self.current.values()]
)
self.pos += 1
self.update(c)
tmp = self.first_pos()
if tmp is None:
break # exhausted
self.pos, self.maxpos = tmp.start, tmp.end
if self.refdict is None:
self.chroms.add(tmp.chromosome)
def close(self):
"""Closes all wrapped anno_its"""
for lit in self.iterables:
try:
lit.close()
except AttributeError:
pass
[docs]
class AnnotationIterator(LocationIterator):
"""
Annotates locations in the first iterator with data from the ano_its location anno_its.
The returned data is a namedtuple with the following fields:
Item(location=gi, data=Result(anno=dat_from_it, label1=[Item(tx, dat_from_anno_it1)], ..., labeln=[Item(tx,
dat_from_anno_itn)])
This enables access to the following data:
* item.location: gi of the currently annotated location
* item.data.anno: data of the currently annotated location
* item.data.<label_n>: list of items from <iterator_n> that overlap the currently annotated position.
if no labels are provided, the following will be used: it0, it1, ..., itn.
Parameters
----------
lit : LocationIterator
The main location iterator. The created AnnotationIterator will yield each item from this iterator
alongside all overlapping items from the configured anno_its anno_its
anno_its : List[LocationIterator]
A list of LocationIterators for annotating the main iterator
labels : List[str]
A list of labels for storing data from the annotating anno_its
refdict : RefDict
A reference dict for the main iterator. If None, the refdict of the main iterator will be used
disable_progressbar : bool
If True, disables the tqdm progressbar
Yields
------
Item(location=gi, data=Result(anno=dat_from_it, label1=[Item(tx, dat_from_anno_it1)], ..., labeln=[Item(tx,
dat_from_anno_itn)])
"""
[docs]
def __init__(
self, lit, anno_its, labels=None, refdict=None, disable_progressbar=False
):
"""
Parameters
----------
lit: the main location iterator. The created AnnotationIterator will yield each item from this iterator
alongside all overlapping items from the configured anno_its anno_its
anno_its: a list of location anno_its for annotating the main iterator
labels: a list of labels for storing data from the annotating anno_its
refdict: a reference dict for the main iterator. If None, the refdict of the main iterator will be used
disable_progressbar: if True, disables the tqdm progressbar
"""
if not isinstance(anno_its, list):
anno_its = [anno_its]
if labels is None:
labels = [f"it{i}" for i in range(len(anno_its))]
elif not isinstance(labels, list):
labels = [labels]
for x in [lit] + anno_its:
assert issubclass(
type(x), LocationIterator
), f"Only implemented for LocationIterators but not for {type(x)}"
self._stats = Counter()
self.it = lit
self.refdict = lit.refdict if refdict is None else refdict
self.anno_its = anno_its
self.region = lit.region
self.chromosomes = lit.chromosomes
self.disable_progressbar = disable_progressbar
self.Result = namedtuple(
typename="Result", field_names=["anno"] + labels
) # result type
self.current = None
[docs]
def max_items(self):
"""Maximum number of items yielded by this iterator or None if unknown."""
return self.it.max_items()
[docs]
def stats(self):
"""Return stats of wrapped main iterator"""
return self.it.stats
[docs]
def update(self, ref, anno_its):
"""Updates the current buffer of overlapping intervals wrt. the passed reference location"""
def coord_overlaps(a, b):
return a.start <= b.end and b.start <= a.end
for i, lit in enumerate(anno_its):
self.buffer[i] = [
Item(loc, d) for loc, d in self.buffer[i] if loc.end >= ref.start
] # drop left
while True:
nxt = lit.peek(default=None)
if nxt is None:
break # exhausted
loc, dat = nxt
if loc.start > ref.end:
break # done
nxt = next(lit) # consume interval
if loc.end < ref.start:
continue # skip
self.buffer[i].append(nxt)
self.current[i] = [
Item(loc, d) for loc, d in self.buffer[i] if coord_overlaps(loc, ref)
]
return anno_its
def __iter__(self) -> Item[gi, tuple]:
"""Yields a tuple of the current genomic position and a named results tuple"""
for chromosome in tqdm(
self.chromosomes,
total=len(self.chromosomes),
disable=self.disable_progressbar,
):
self.buffer = [
list() for i, _ in enumerate(self.anno_its)
] # holds sorted intervals that overlap or are > than the currently annotated interval
self.current = [list() for i, _ in enumerate(self.anno_its)]
# set chrom of it
self.it.set_region(gi(chromosome, self.region.start, self.region.end))
anno_its = [lit for lit in self.anno_its if chromosome in lit.refdict]
if len(anno_its) == 0:
# warnings.warn(feature"Skipping chromosome {chromosome} as no annotation data found!")
for loc, dat in self.it:
yield Item(
loc, self.Result(dat, *self.current) # noqa
) # noqa # yield empty results
continue
for lit in anno_its: # set current chrom
lit.set_region(gi(chromosome, self.region.start, self.region.end))
anno_its = [peekable(lit) for lit in anno_its] # make peekable anno_its
for loc, dat in self.it:
anno_its = self.update(loc, anno_its)
yield Item(loc, self.Result(dat, *self.current)) # noqa
[docs]
def close(self):
"""Closes all wrapped anno_its"""
for lit in [self.it] + self.anno_its:
try:
lit.close()
except AttributeError:
pass
[docs]
class TiledIterator(LocationIterator):
"""Wraps a location iterator and reports tuples of items yielded for each genomic tile.
Tiles are either iterated from the passed regions_iterable or calculated from the reference dict via the
RefDict.tile(tile_size=...) method.
The iterator yields the tile location and a tuple of overlapping items from the location iterator.
During iteration, the locations and data of the last yielded item can be access via it.tile_locations and
it.tile_items.
Note that this can be slow if tile size is too small.
Parameters
----------
location_iterator: LocationIterator
The location iterator to be wrapped
regions_iterable: Iterable[GI]
An iterable of genomic intervals that define the tiles. Note, that if the location_iterator has a region set
then only tiles fully enveloped by this region will be iterated.
If not set, tiles will be calculated from the location iterators region (or its reference dict if none set)
based on the passed tile_size.
tile_size: int
The size of the tiles in base pairs. Used only if regions_iterable is not set.
"""
def __init__(
self, location_iterator, regions_iterable: Iterable[GI] = None, tile_size=1e8
):
assert issubclass(
type(location_iterator), LocationIterator
), f"Only implemented for LocationIterators but not for {type(location_iterator)}"
self.refdict = location_iterator.refdict
super().__init__(
file=None,
region=None,
file_format=None,
per_position=False,
fun_alias=None,
refdict=self.refdict,
)
self.location_iterator = location_iterator
self.tile_size = tile_size
if (
location_iterator.region is not None
and not location_iterator.region.is_unbounded()
):
# merge with passed regions_iterable
if regions_iterable is None:
regions_iterable = location_iterator.region.split_by_maxwidth(tile_size)
else:
# keep only tiles fully enveloped by iterator region
regions_iterable = [
r for r in regions_iterable if location_iterator.region.envelops(r)
]
if regions_iterable is None:
assert (
self.refdict is not None
), "Cannot calculate tiles without a reference dict"
assert self.refdict.has_len(), (
"Cannot calculate tiles from refdict without lengths. "
"Consider creating a refdict with calc_chromlen=True."
)
self.regions_iterable = list(
self.location_iterator.refdict.tile(tile_size=int(self.tile_size))
)
else:
self.regions_iterable = list(regions_iterable)
[docs]
def max_items(self):
"""Maximum number of items yielded by this iterator or None if unknown."""
return len(list(self.regions_iterable))
def __iter__(self) -> Item:
for reg in self.regions_iterable:
self.location = reg
self.location_iterator.set_region(reg)
dat = list(self.location_iterator)
self.tile_locations, self.tile_items = (
zip(*dat) if len(dat) > 0 else ((), ())
)
self._stats["iterated_items", reg.chromosome] += 1
yield Item(self.location, self.tile_items)
# ---------------------------------------------------------
# Additional, non location-bound anno_its
# ---------------------------------------------------------
[docs]
class FastqIterator:
"""
Iterates a fastq file and yields FastqRead objects (named tuples containing sequence name, sequence and quality
string (omitting line 3 in the FASTQ file)).
Parameters
----------
fastq_file : str
The FASTQ file name
Notes
-----
Note that this is not a location iterator.
reported stats:
* yielded_items: int
Number of yielded items
"""
def __init__(self, fastq_file):
self.file = fastq_file
self.per_position = False
self._stats = Counter()
@property
def stats(self):
return self._stats
def __len__(self):
"""Fast read counting"""
i = 0
with open_file_obj(self.file, file_format="fastq") as fin:
for i, _ in enumerate(fin):
pass
assert (
i + 1
) % 4 == 0, "Invalid read_count, not divisible by 4: {i+1}" # fastq complete?
return (i + 1) // 4
def __iter__(self) -> FastqRead:
"""
Iterates FASTQ entries
"""
with open_file_obj(self.file, file_format="fastq") as fin:
for d in grouper(fin, 4, ""):
self._stats["yielded_items", "all"] += 1
name, seq, qual = d[0].strip(), d[1].strip(), d[3].strip()
if type(name) is bytes:
name, seq, qual = (
name.decode('utf-8'),
seq.decode('utf-8'),
qual.decode('utf-8'),
)
yield FastqRead(name, seq, qual)
[docs]
def to_list(self):
"""Exhausts iterator and returns results in a list.
For debugging/testing only
"""
return [x for x in self]
[docs]
def read_aligns_to_loc(loc: gi, read: pysam.AlignedSegment):
"""Tests whether a read aligns to the passed location by checking the respective alignment block coordinates and
the read strand. Note that the chromosome is *not* checked.
"""
if (loc.strand is not None) and (loc.strand != "-" if read.is_reverse else "+"):
return False # wrong strand
# chr is not checked
for read_start, read_end in read.get_blocks():
if (loc.start <= read_end) and (read_start < loc.end):
return True
return False
[docs]
def it(obj, **kwargs):
"""Factory function for creating :meth:`LocationIterators <LocationIterator>` for the passed object.
In most cases, the type of the created iterator will be determined by the type of the passed object.
If there are multiple possibilities, the style parameter can be used to select the desired iterator type.
For example, if a pandas dataframe is passed then the ``style=...`` parameter determines whether a
:meth:`PandasIterator`, a :meth:`BioframeIterator` or a :meth:`PyrangesIterator` is returned.
The following object types are supported:
* If the object is a standard python type (**dict, list, tuple**), a :meth:`MemoryIterator` will be created.
* If the object is a **file or a string**, the file format will be guessed from the file extension.
* The following file formats are supported:
* fasta: :meth:`FastaIterator`
* sam, bam: :meth:`ReadIterator`
* or :meth:`FastPileupIterator` if you pass ``style=='pileup'``
* or :meth:`PairedReadIterator` if you pass ``style=='paired'``
* tsv: :meth:`TabixIterator`
* bed: :meth:`BedIterator`
* vcf, bcf: :meth:`VcfIterator`
* gff, gtf: :meth:`GFF3Iterator`
* fastq: :meth:`FastqIterator`
* If you pass ``style=='pybedtools'``, a :meth:`PybedtoolsIterator` will be created
* If you pass ``style=='tabix'``, a :meth:`TabixIterator` will be created
* If the object is a pandas dataframe, a :meth:`PandasIterator` will be created.
* If you pass ``style='bioframe'``, a :meth:`BioframeIterator` will be created
* If you pass ``style='pyranges'``, a :meth:`PyrangesIterator` will be created
* If the object is a **pybedtools.BedTool**, a :meth:`PybedtoolsIterator` will be created
* If the object is a **pyBigWig.pyBigWig**:
* if ``style == 'bigbed'``, a :meth:`BigBedIterator` will be created
* else a :meth:`BigWigIterator` will be created
* If the object is a :meth:`GI`, a :meth:`MemoryIterator` will be created that iterates the GI's individual
positions.
* If the object is a :meth:`LocationIterator`, an :meth:`AnnotationIterator` will be created
* If the object is a :meth:`Transcriptome`, a :meth:`TranscriptomeIterator` will be created
Parameters
----------
obj: any
The object for which an iterator should be created
style: str
The (optional) style of the iterator for disambiguation. This parameter will be removed from kwargs
before the iterator is created.
kwargs: dict
Additional parameters that will be passed to the iterator constructor
Returns
-------
LocationIterator
Examples
--------
>>> it('test.bed') # creates a BedIterator
>>> it('test.bed', style='bedgraph') # creates a BedGraphIterator
>>> it('test.bam', style='pileup', region='chr1:1000-2000', min_mapping_quality=30) # creates FastPileupIterator
>>> it(pd.DataFrame(...), style='bioframe') # creates a BioframeIterator
>>> it(rna.Transcriptome(...), feature_types='exon') # creates a TranscriptomeIterator
>>> vars(it("test.bam")) # inspect the created iterator
"""
style = kwargs.get("style", None)
if style is not None:
del kwargs["style"]
if isinstance(obj, GI): # iterate over positions
return MemoryIterator(obj, **kwargs)
if (
isinstance(obj, dict)
or isinstance(obj, list)
or isinstance(obj, tuple)
or isinstance(obj, set)
):
return MemoryIterator(obj, **kwargs)
elif isinstance(obj, str) or isinstance(obj, (str, PathLike)):
ff = guess_file_format(obj)
assert ff is not None, f"Could not guess file format for file {obj}."
if style == "pybedtools":
return PybedtoolsIterator(obj, **kwargs)
elif style == "tabix":
return TabixIterator(obj, **kwargs)
if ff == "fasta":
return FastaIterator(obj, **kwargs)
elif ff == "sam" or ff == "bam":
if style == "pileup":
return FastPileupIterator(obj, **kwargs)
elif style == "paired":
return PairedReadIterator(obj, **kwargs)
return ReadIterator(obj, **kwargs)
elif ff == "tsv":
return TabixIterator(obj, **kwargs)
elif ff == "bed":
if style == "bedgraph":
return BedGraphIterator(obj, **kwargs)
return BedIterator(obj, **kwargs)
elif ff == "bedgraph":
return BedGraphIterator(obj, **kwargs)
elif ff == "vcf" or ff == "bcf":
return VcfIterator(obj, **kwargs)
elif ff == "gff":
return GFF3Iterator(obj, **kwargs)
elif ff == "gtf":
return GFF3Iterator(obj, **kwargs)
elif ff == "fastq":
return FastqIterator(obj)
elif ff == "bigwig":
return BigWigIterator(obj, **kwargs)
elif ff == "bigbed":
return BigBedIterator(obj, **kwargs)
elif isinstance(obj, pysam.Fastafile): # @UndefinedVariable
return FastaIterator(obj, **kwargs)
elif isinstance(obj, pysam.AlignmentFile): # @UndefinedVariable
if style == "pileup":
return FastPileupIterator(obj, **kwargs)
elif style == "paired":
return PairedReadIterator(obj, **kwargs)
return ReadIterator(obj, **kwargs)
elif isinstance(obj, pysam.VariantFile): # @UndefinedVariable
return VcfIterator(obj, **kwargs)
elif isinstance(obj, pd.DataFrame):
if style == "bioframe":
return BioframeIterator(obj, **kwargs)
elif style == "pyranges":
return PyrangesIterator(obj, **kwargs)
return PandasIterator(obj, **kwargs)
elif isinstance(obj, pybedtools.BedTool):
return PybedtoolsIterator(obj, **kwargs)
elif isinstance(obj, pyBigWig.pyBigWig):
if style == "bigbed":
return BigBedIterator(obj, **kwargs)
return BigWigIterator(obj, **kwargs)
elif isinstance(obj, LocationIterator):
return AnnotationIterator(obj, **kwargs)
elif isinstance(obj, Transcriptome):
return TranscriptomeIterator(obj, **kwargs)
raise NotImplementedError(
f"Object type {type(obj)} not supported by factory method. Try to create the iterator "
f"manually."
)