|
| 1 | +// Copyright 2017 Johannes Köster. |
| 2 | +// Licensed under the MIT license (http://opensource.org/licenses/MIT) |
| 3 | +// This file may not be copied, modified, or distributed |
| 4 | +// except according to those terms. |
| 5 | + |
| 6 | + |
| 7 | +use std::collections::{vec_deque, VecDeque}; |
| 8 | +use std::error::Error; |
| 9 | +use std::str; |
| 10 | + |
| 11 | +use bam; |
| 12 | +use bam::Read; |
| 13 | + |
| 14 | + |
| 15 | +/// A buffer for BAM records. This allows access regions in a sorted BAM file while iterating |
| 16 | +/// over it in a single pass. |
| 17 | +/// The buffer is implemented as a ringbuffer, such that extension or movement to the right has |
| 18 | +/// linear complexity. The buffer makes use of indexed random access. Hence, when fetching a |
| 19 | +/// region at the very end of the BAM, everything before is omitted without cost. |
| 20 | +pub struct RecordBuffer { |
| 21 | + reader: bam::IndexedReader, |
| 22 | + inner: VecDeque<bam::Record>, |
| 23 | + overflow: Option<bam::Record> |
| 24 | +} |
| 25 | + |
| 26 | + |
| 27 | +unsafe impl Sync for RecordBuffer {} |
| 28 | +unsafe impl Send for RecordBuffer {} |
| 29 | + |
| 30 | + |
| 31 | +impl RecordBuffer { |
| 32 | + /// Create a new `RecordBuffer`. |
| 33 | + pub fn new(bam: bam::IndexedReader) -> Self { |
| 34 | + RecordBuffer { |
| 35 | + reader: bam, |
| 36 | + inner: VecDeque::new(), |
| 37 | + overflow: None |
| 38 | + } |
| 39 | + } |
| 40 | + |
| 41 | + /// Return end position of buffer. |
| 42 | + fn end(&self) -> Option<u32> { |
| 43 | + self.inner.back().map(|rec| rec.pos() as u32) |
| 44 | + } |
| 45 | + |
| 46 | + fn tid(&self) -> Option<i32> { |
| 47 | + self.inner.back().map(|rec| rec.tid()) |
| 48 | + } |
| 49 | + |
| 50 | + /// Fill buffer at the given interval. The start coordinate has to be left of |
| 51 | + /// the start coordinate of any previous `fill` operation. |
| 52 | + /// Coordinates are 0-based, and end is exclusive. |
| 53 | + /// Returns tuple with numbers of added and deleted records since the previous fetch. |
| 54 | + pub fn fetch(&mut self, chrom: &[u8], start: u32, end: u32) -> Result<(usize, usize), Box<Error>> { |
| 55 | + let mut added = 0; |
| 56 | + // move overflow from last fetch into ringbuffer |
| 57 | + if self.overflow.is_some() { |
| 58 | + added += 1; |
| 59 | + self.inner.push_back(self.overflow.take().unwrap()); |
| 60 | + } |
| 61 | + |
| 62 | + if let Some(tid) = self.reader.header.tid(chrom) { |
| 63 | + let mut deleted = 0; |
| 64 | + let window_start = start; |
| 65 | + if self.inner.is_empty() || self.end().unwrap() < window_start || self.tid().unwrap() != tid as i32 { |
| 66 | + let end = self.reader.header.target_len(tid).unwrap(); |
| 67 | + try!(self.reader.fetch(tid, window_start, end)); |
| 68 | + deleted = self.inner.len(); |
| 69 | + self.inner.clear(); |
| 70 | + } else { |
| 71 | + // remove records too far left |
| 72 | + let to_remove = self.inner.iter().take_while(|rec| rec.pos() < window_start as i32).count(); |
| 73 | + for _ in 0..to_remove { |
| 74 | + self.inner.pop_front(); |
| 75 | + } |
| 76 | + deleted = to_remove; |
| 77 | + } |
| 78 | + |
| 79 | + // extend to the right |
| 80 | + loop { |
| 81 | + let mut record = bam::Record::new(); |
| 82 | + if let Err(e) = self.reader.read(&mut record) { |
| 83 | + if e.is_eof() { |
| 84 | + break; |
| 85 | + } |
| 86 | + return Err(Box::new(e)); |
| 87 | + } |
| 88 | + |
| 89 | + if record.is_unmapped() { |
| 90 | + continue; |
| 91 | + } |
| 92 | + |
| 93 | + let pos = record.pos(); |
| 94 | + if pos >= end as i32 { |
| 95 | + self.overflow = Some(record); |
| 96 | + break; |
| 97 | + } else { |
| 98 | + self.inner.push_back(record); |
| 99 | + added += 1; |
| 100 | + } |
| 101 | + } |
| 102 | + |
| 103 | + Ok((added, deleted)) |
| 104 | + } else { |
| 105 | + Err(Box::new(RecordBufferError::UnknownSequence(str::from_utf8(chrom).unwrap().to_owned()))) |
| 106 | + } |
| 107 | + } |
| 108 | + |
| 109 | + /// Iterate over records that have been fetched with `fetch`. |
| 110 | + pub fn iter(&self) -> vec_deque::Iter<bam::Record> { |
| 111 | + self.inner.iter() |
| 112 | + } |
| 113 | + |
| 114 | + pub fn len(&self) -> usize { |
| 115 | + self.inner.len() |
| 116 | + } |
| 117 | +} |
| 118 | + |
| 119 | + |
| 120 | +quick_error! { |
| 121 | + #[derive(Debug)] |
| 122 | + pub enum RecordBufferError { |
| 123 | + UnknownSequence(chrom: String) { |
| 124 | + description("unknown sequence") |
| 125 | + display("sequence {} cannot be found in BAM", chrom) |
| 126 | + } |
| 127 | + } |
| 128 | +} |
| 129 | + |
| 130 | + |
| 131 | +#[cfg(test)] |
| 132 | +mod tests { |
| 133 | + use super::*; |
| 134 | + use itertools::Itertools; |
| 135 | + use bam; |
| 136 | + |
| 137 | + #[test] |
| 138 | + fn test_buffer() { |
| 139 | + let reader = bam::IndexedReader::from_path(&"test/test.bam").unwrap(); |
| 140 | + |
| 141 | + let mut buffer = RecordBuffer::new(reader); |
| 142 | + |
| 143 | + buffer.fetch(b"CHROMOSOME_I", 1, 5).unwrap(); |
| 144 | + { |
| 145 | + let records = buffer.iter().collect_vec(); |
| 146 | + assert_eq!(records.len(), 6); |
| 147 | + assert_eq!(records[0].pos(), 1); |
| 148 | + assert_eq!(records[1].pos(), 1); |
| 149 | + assert_eq!(records[2].pos(), 1); |
| 150 | + assert_eq!(records[3].pos(), 1); |
| 151 | + assert_eq!(records[4].pos(), 1); |
| 152 | + assert_eq!(records[5].pos(), 1); |
| 153 | + } |
| 154 | + } |
| 155 | +} |
0 commit comments