1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
//! Cut the input stream into chunks for batch processing.
use crate::as_mut_slice_no_borrow_check;
use crate::options::ARGS;
use std::fs::File;
use std::io;
use std::io::Read;
use std::iter::Peekable;
use std::path::Path;
use std::path::PathBuf;
use std::slice;
use std::slice::Iter;
/// Integer type used to count bytes in the input stream. It may be widened to
/// `u128` in a future version.
pub type ByteCounter = u64;
/// Size of `input_buffer` in bytes for regular (non-test) builds. It should be
/// a multiple of the memory page size, which is - depending on the hardware -
/// `n * 4096` bytes.
#[cfg(not(test))]
pub const INPUT_BUF_LEN: usize = 4096;
/// Size of `input_buffer` in bytes when compiled for tests: 32 bytes (`0x20`).
/// NOTE(review): presumably kept this small so that short test inputs span
/// several chunks - confirm.
#[cfg(test)]
pub const INPUT_BUF_LEN: usize = 0x20;
/// State of the `Slicer` iterator. The iterator fills `input_buffer` with bytes
/// coming either from `stdin` or from the files whose names are listed in
/// `ARGS.inputs`. When one file is exhausted, the iterator switches
/// automatically and transparently to the next file in `ARGS.inputs`. When no
/// data is left in any input, `next()` returns `None`.
pub struct Slicer<'a> {
/// Iterator over the filenames in `ARGS.inputs`, wrapped in an option.
/// * `Some(..)`: the input is read from files; the iterator's `next()`
///   delivers the name of the file to open next, `peek()` tells whether
///   another one follows.
/// * `None`: the data comes from `std::stdin`.
filename_iter: Option<Peekable<Iter<'a, PathBuf>>>,
/// The reader associated with the current input. When a file can not be
/// opened, an empty reader (`io::empty()`) is stored here instead.
reader: Box<dyn Read>,
/// An index identifying the source of the input.
/// The input comes from:
/// * 0: `stdin`,
/// * 1: the first file in `ARGS.inputs`,
/// * 2: the second file in `ARGS.inputs`,
/// * 3: ...
current_input_idx: usize,
/// Is `true` when no further input follows the current one. Once the
/// current reader is exhausted while this is `true`, `next()` returns
/// `None`.
current_input_is_last: bool,
/// Buffer to store all incoming bytes from the readers. The input is
/// streamed into this buffer first, before being analysed later in batches.
input_buffer: [u8; INPUT_BUF_LEN],
}
impl Slicer<'_> {
    /// Creates a `Slicer` positioned at the first input source.
    ///
    /// * If `ARGS.inputs` is empty, or consists of the single entry `-`, the
    ///   input is read from `stdin` (input index `0`, no filename iterator).
    /// * Otherwise the first file listed in `ARGS.inputs` is opened (input
    ///   index `1`). The remaining filenames stay in `filename_iter` and are
    ///   opened one by one by `next()` when the preceding input is exhausted.
    ///
    /// A file that can not be opened is reported on `stderr` and replaced by an
    /// empty reader, so it simply contributes no bytes.
    #[inline]
    pub fn new() -> Self {
        if (ARGS.inputs.is_empty())
            || ((ARGS.inputs.len() == 1) && ARGS.inputs[0] == Path::new("-"))
        {
            Self {
                filename_iter: None,
                reader: Box::new(io::stdin()),
                // Convention: `0` identifies `stdin`.
                current_input_idx: 0,
                // `stdin` is the only input, nothing follows it.
                current_input_is_last: true,
                input_buffer: [0u8; INPUT_BUF_LEN],
            }
        } else {
            let mut filename_iter = ARGS.inputs.iter().peekable();
            // Cannot fail: the `if` above guarantees at least one filename.
            let filename = filename_iter
                .next()
                .expect("`ARGS.inputs` is not empty in this branch");
            let reader: Box<dyn Read> = match File::open(filename) {
                Ok(file) => Box::new(file),
                Err(e) => {
                    eprintln!("Error: can not read file`{:?}`: {}", filename, e);
                    // Continue with an input that yields no bytes.
                    Box::new(io::empty())
                }
            };
            // `peek()` does not consume: it only tells whether more files follow.
            let current_input_is_last = filename_iter.peek().is_none();
            Self {
                filename_iter: Some(filename_iter),
                // Reader of the first file; `next()` replaces it when it
                // switches to the following file.
                reader,
                // Convention: files are numbered starting with `1`.
                current_input_idx: 1,
                // `false` when more than one file was given.
                current_input_is_last,
                input_buffer: [0u8; INPUT_BUF_LEN],
            }
        }
    }
}
/// Iterator over the input stream coming from `std::stdin` or from files whose
/// names are listed in `ARGS.inputs`.
impl<'a> Iterator for Slicer<'a> {
    /// The iterator's `next()` returns a tuple `(&[u8], Option<u8>, bool)` with
    /// 3 members:
    /// * First member `&[u8]`: \
    ///   a slice of input bytes comprising all valid bytes in `input_buffer`.
    ///   It is empty for the call that switches from an exhausted file to the
    ///   next one.
    /// * Second member `Option<u8>`:\
    ///   A label identifying the origin of the bytes in `&[u8]`:\
    ///   * `None`: the origin of the input is `stdin`,
    ///   * `Some(1)`: the bytes come from the first file in `ARGS.inputs`,
    ///   * `Some(2)`: the bytes come from the second file in `ARGS.inputs`,
    ///   * `Some(3)`: ...
    ///
    ///   The label is the input index cast to `u8`, so it wraps around when
    ///   more than 255 files are given.
    /// * Third member `bool`:\
    ///   * `true`: this chunk of input data is the very last one. All further
    ///     `next()` will return `None`.
    ///   * `false`: More input data may come with the next `next()`.
    ///
    ///   NOTE: with the current implementation the flag is `false` in every
    ///   returned `Some(..)`; the end of the input is signalled by `next()`
    ///   returning `None`.
    type Item = (&'a [u8], Option<u8>, bool);

    /// Returns the next slice of input.
    ///
    /// Reads once from the current reader into `input_buffer`. When the
    /// current reader is exhausted and more files are pending, the next file
    /// is opened and an empty slice labelled with the new input index is
    /// returned. When the last input is exhausted, `None` is returned.
    ///
    /// # Panics
    ///
    /// Panics when reading from the current input fails with an error other
    /// than `ErrorKind::Interrupted`.
    fn next(&mut self) -> Option<Self::Item> {
        // NOTE(review): the macro is defined elsewhere in the crate. Presumably
        // it yields a mutable slice over `self.input_buffer` whose lifetime is
        // not tied to `&mut self` - confirm. If so, a slice handed out by an
        // earlier call is overwritten by this call.
        let input_buffer_slice = as_mut_slice_no_borrow_check!(self.input_buffer);

        // Fill the input buffer. An interrupted read is not a failure and is
        // simply retried. The panic message is only built when really needed.
        let no_bytes_received = loop {
            match self.reader.read(input_buffer_slice) {
                Ok(n) => break n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => panic!(
                    "Error: Could not read input stream no. {}: {:?}",
                    self.current_input_idx, e
                ),
            }
        };
        let result = &input_buffer_slice[..no_bytes_received];

        let this_stream_ended = no_bytes_received == 0;
        // Evaluated before the state below is updated. Because the only case
        // in which this is `true` returns `None` early, it is always `false`
        // when it is finally handed out.
        let input_ended = self.current_input_is_last && this_stream_ended;

        // More files to open?
        if this_stream_ended {
            if self.current_input_is_last {
                // Early return: nothing is left in any input.
                return None;
            }
            // `!self.current_input_is_last` can only be true when reading from
            // files, hence `self.filename_iter` is `Some(..)`.
            let filename_iter = self
                .filename_iter
                .as_mut()
                .expect("a filename iterator exists when more inputs are pending");
            // We `peek()`ed before and know there is at least one more filename.
            let filename = filename_iter
                .next()
                .expect("a pending filename was seen with `peek()`");
            self.current_input_idx += 1;
            // The next run needs to know if there is more.
            self.current_input_is_last = filename_iter.peek().is_none();
            let next_reader: Box<dyn Read> = match File::open(filename) {
                Ok(file) => Box::new(file),
                Err(e) => {
                    eprintln!("Error: can not read file`{:?}`: {}", filename, e);
                    // Continue with an input that yields no bytes.
                    Box::new(io::empty())
                }
            };
            // Store the reader for the `next()` run.
            self.reader = next_reader;
        }

        // Change type for output.
        let current_file_id = match self.current_input_idx {
            // `0` identifies `stdin`.
            0 => None,
            // Files are numbered from 1. The cast truncates on purpose to fit
            // `Item`: indices above 255 wrap around.
            c => Some(c as u8),
        };
        Some((result, current_file_id, input_ended))
    }
}