MVCC Read in the Coprocessor

How does the coprocessor read TiKV?


Questions Before Reading

  1. How does the coprocessor read TiKV with MVCC?
  2. We covered TiKV's data organization before: it is row-based storage. How does the coprocessor's processing convert these data types?
  3. Last time we deliberately glossed over ColumnRef; how should we handle it this time?
  4. What are the coprocessor's new TiKVStorage and the Storage trait?

How MVCC Read Works

storage/mvcc/reader/mod.rs:

mod reader;
mod scanner;
pub use self::reader::MvccReader;
pub use self::scanner::{Scanner, ScannerBuilder};

MvccReader is a type for point gets; let's analyze what it depends on:

// The real MvccReader. Have I finally arrived here?
pub struct MvccReader<S: Snapshot> {
    snapshot: S,
    statistics: Statistics,
    // cursors are used for speeding up scans.
    data_cursor: Option<Cursor<S::Iter>>,
    lock_cursor: Option<Cursor<S::Iter>>,
    write_cursor: Option<Cursor<S::Iter>>,
    scan_mode: Option<ScanMode>,
    key_only: bool,
    /// Passed through to RocksDB's fill_cache option
    fill_cache: bool,
    lower_bound: Option<Vec<u8>>,
    upper_bound: Option<Vec<u8>>,
    isolation_level: IsolationLevel,
}

Setting scan_mode and friends aside, we can see:

pub struct MvccReader<S: Snapshot> {
    snapshot: S,
    statistics: Statistics,
    // cursors are used for speeding up scans.
    data_cursor: Option<Cursor<S::Iter>>,
    lock_cursor: Option<Cursor<S::Iter>>,
    write_cursor: Option<Cursor<S::Iter>>,
    ...
}

For read behavior, Cursor is very important, but let's look at Snapshot first:

/// The KV snapshot trait, providing operations on a snapshot
pub trait Snapshot: Send + Clone {
    type Iter: Iterator;

    fn get(&self, key: &Key) -> Result<Option<Value>>;
    fn get_cf(&self, cf: CfName, key: &Key) -> Result<Option<Value>>;
    fn iter(&self, iter_opt: IterOption, mode: ScanMode) -> Result<Cursor<Self::Iter>>;
    fn iter_cf(
        &self,
        cf: CfName,
        iter_opt: IterOption,
        mode: ScanMode,
    ) -> Result<Cursor<Self::Iter>>;

    fn get_properties(&self) -> Result<TablePropertiesCollection> {
        self.get_properties_cf(CF_DEFAULT)
    }

    fn get_properties_cf(&self, _: CfName) -> Result<TablePropertiesCollection> {
        Err(box_err!("no user properties"))
    }

    // The minimum key this snapshot can retrieve.
    #[inline]
    fn lower_bound(&self) -> Option<&[u8]> {
        None
    }

    // The maximum key fetched from the snapshot must be less than the upper bound.
    #[inline]
    fn upper_bound(&self) -> Option<&[u8]> {
        None
    }
}

Many layers wrap Snapshot, and Cursor comes from it too. We build reads on top of Snapshot.
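The essential property of a snapshot is that it is an immutable view: writes that happen after the snapshot is taken are invisible through it. A minimal sketch of that idea, with illustrative names (`ToySnapshot`, `MapSnapshot`, `demo` are not TiKV's real types):

```rust
use std::collections::BTreeMap;

// Toy stand-in for TiKV's Snapshot trait: an immutable view of the store
// taken at a point in time. Names here are illustrative, not TiKV's API.
pub trait ToySnapshot {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
}

// A frozen copy of the data acts as the snapshot.
pub struct MapSnapshot {
    data: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl ToySnapshot for MapSnapshot {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.data.get(key).cloned()
    }
}

// Demonstrates that writes after the snapshot is taken are invisible to it.
pub fn demo() -> Option<Vec<u8>> {
    let mut live: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
    live.insert(b"k1".to_vec(), b"v1".to_vec());
    let snap = MapSnapshot { data: live.clone() }; // freeze the current state
    live.insert(b"k1".to_vec(), b"v2".to_vec()); // later write, not visible
    snap.get(b"k1")
}

fn main() {
    assert_eq!(demo(), Some(b"v1".to_vec()));
}
```

The real implementation does not copy the data, of course; RocksDB snapshots pin a consistent view cheaply, and Cursor iterates over that view.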

Storage trait

/// The abstract storage interface. The table scan and index scan executor relies on a `Storage`
/// implementation to provide source data.
pub trait Storage: Send {
    type Statistics;

    // TODO: Use const generics.
    // TODO: Use reference is better.
    fn begin_scan(
        &mut self,
        is_backward_scan: bool,
        is_key_only: bool,
        range: IntervalRange,
    ) -> Result<()>;

    fn scan_next(&mut self) -> Result<Option<OwnedKvPair>>;

    // TODO: Use const generics.
    // TODO: Use reference is better.
    fn get(&mut self, is_key_only: bool, range: PointRange) -> Result<Option<OwnedKvPair>>;

    fn collect_statistics(&mut self, dest: &mut Self::Statistics);
}

As we can see, this constrains the Scanner to some extent: it is expected to return an Option<OwnedKvPair>


pub type OwnedKvPair = (Vec<u8>, Vec<u8>);

Here we take ownership of the pair: two owned Vec<u8>s are returned.

The actual implementer of this trait is TiKVStorage, which lives inside the TiKV project:

/// A `Storage` implementation over TiKV's storage.
pub struct TiKVStorage<S: Store> {
    store: S,
    scanner: Option<S::Scanner>,
    cf_stats_backlog: Statistics,
}

impl<S: Store> TiKVStorage<S> {
    pub fn new(store: S) -> Self {
        Self {
            store,
            scanner: None,
            cf_stats_backlog: Statistics::default(),
        }
    }
}

// Concrete implementation; the code is omitted here
impl<S: Store> Storage for TiKVStorage<S> {}

As for Store, it is the trait for storage reads. We can see that Storage and Snapshot are related; here is the actual construction:

builder = Box::new(move |snap, req_ctx: &ReqContext| {
    // TODO: Remove explicit type once rust-lang#41078 is resolved
    let store = SnapshotStore::new(
        snap,
        dag.get_start_ts(),
        req_ctx.context.get_isolation_level(),
        !req_ctx.context.get_not_fill_cache(),
    );
    dag::build_handler(
        dag,
        ranges,
        store,
        req_ctx.deadline,
        batch_row_limit,
        is_streaming,
        enable_batch_if_possible,
    )
});

The SnapshotStore is built from the snap and the request's start_ts, and reads go through the Snapshot.

With that, Questions 1 and 4 have answers:

  1. How does the coprocessor read TiKV with MVCC?

    The request carries a ts; a snapshot is built at that ts, and that is what makes the MVCC read possible.

  2. What are the coprocessor's new TiKVStorage and the Storage trait?

    1. Storage is the abstraction over the concrete object being read.
    2. TiKVStorage lives in TiKV's src and is the concrete object being read. They were split apart to save compile time.
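The answer to Question 1 can be made concrete with a toy MVCC store: each key holds versions indexed by commit ts, and a read at start_ts sees the newest version at or before that ts. This is an illustrative sketch, not TiKV's real write/lock column-family layout:

```rust
use std::collections::BTreeMap;

// Toy MVCC store: each user key maps to versions sorted by commit ts.
// (Illustrative only; TiKV stores versions in the write CF with encoded keys.)
pub struct MvccToy {
    versions: BTreeMap<Vec<u8>, BTreeMap<u64, Vec<u8>>>,
}

impl MvccToy {
    pub fn new() -> Self {
        MvccToy { versions: BTreeMap::new() }
    }

    pub fn put(&mut self, key: &[u8], ts: u64, value: &[u8]) {
        self.versions
            .entry(key.to_vec())
            .or_insert_with(BTreeMap::new)
            .insert(ts, value.to_vec());
    }

    // A read at `start_ts` sees the newest version committed at or before it,
    // which is exactly what building a snapshot at the request's ts achieves.
    pub fn get(&self, key: &[u8], start_ts: u64) -> Option<&Vec<u8>> {
        self.versions
            .get(key)?
            .range(..=start_ts)
            .next_back()
            .map(|(_, v)| v)
    }
}

fn main() {
    let mut store = MvccToy::new();
    store.put(b"a", 10, b"v10");
    store.put(b"a", 20, b"v20");
    assert_eq!(store.get(b"a", 15), Some(&b"v10".to_vec())); // v20 is in the future
    assert_eq!(store.get(b"a", 25), Some(&b"v20".to_vec()));
    assert_eq!(store.get(b"a", 5), None); // nothing committed yet
}
```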

Processing the Read Data

  1. We covered TiKV's data organization before: it is row-based storage. How does the coprocessor's processing convert these data types?
  2. Last time we deliberately glossed over ColumnRef; how should we handle it this time?

Let's look at the code:

/// Fills a column vector and returns whether or not all ranges are drained.
///
/// The columns are ensured to be regular even if there are errors during the process.
fn fill_column_vec(
    &mut self,
    scan_rows: usize,
    columns: &mut LazyBatchColumnVec,
) -> Result<bool> {
    assert!(scan_rows > 0);
    for _ in 0..scan_rows {
        let some_row = self.scanner.next()?;
        // Here both key and value are `Vec<u8>`
        if let Some((key, value)) = some_row {
            // Retrieved one row from point range or non-point range.
            if let Err(e) = self.imp.process_kv_pair(&key, &value, columns) {
                // When there are errors in `process_kv_pair`, columns' length may not be
                // identical. For example, the filling process may be partially done so that
                // first several columns have N rows while the rest have N-1 rows. Since we do
                // not immediately fail when there are errors, these irregular columns may
                // further cause future executors to panic. So let's truncate these columns to
                // make them all have N-1 rows in that case.
                columns.truncate_into_equal_length();
                return Err(e);
            }
        } else {
            // Drained
            return Ok(true);
        }
    }
    // Not drained
    Ok(false)
}
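The truncation step in the error path is worth a closer look. A simplified stand-in (TiKV's `truncate_into_equal_length` operates on LazyBatchColumn; here plain `Vec<Vec<u8>>` columns are used for illustration):

```rust
// Sketch of the truncation described above: if an error leaves some columns
// with N rows and others with N-1, cut every column back to the shortest
// length so the batch stays regular. Simplified stand-in over Vec columns.
pub fn truncate_into_equal_length(columns: &mut [Vec<Vec<u8>>]) {
    let min_len = columns.iter().map(|c| c.len()).min().unwrap_or(0);
    for col in columns.iter_mut() {
        col.truncate(min_len);
    }
}

fn main() {
    // Column 0 received its value for the failing row; column 1 did not.
    let mut cols = vec![
        vec![b"r1".to_vec(), b"r2".to_vec()],
        vec![b"r1".to_vec()],
    ];
    truncate_into_equal_length(&mut cols);
    // Both columns now have 1 row: the partially-filled row was discarded.
    assert!(cols.iter().all(|c| c.len() == 1));
}
```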

imp is actually a ScanExecutorImpl:

/// Common interfaces for table scan and index scan implementations.
pub trait ScanExecutorImpl: Send {
    /// Gets the schema.
    fn schema(&self) -> &[FieldType];

    /// Gets a mutable reference of the executor context.
    fn mut_context(&mut self) -> &mut EvalContext;

    fn build_column_vec(&self, scan_rows: usize) -> LazyBatchColumnVec;

    /// Accepts a key value pair and fills the column vector.
    ///
    /// The column vector does not need to be regular when there are errors during this process.
    /// However if there is no error, the column vector must be regular.
    fn process_kv_pair(
        &mut self,
        key: &[u8],
        value: &[u8],
        columns: &mut LazyBatchColumnVec,
    ) -> Result<()>;
}

The key here is the LazyBatchColumnVec being processed:

/// Stores multiple `LazyBatchColumn`s. Each column has an equal length.
#[derive(Clone, Debug)]
pub struct LazyBatchColumnVec {
    /// Multiple lazy batch columns. Each column is either decoded, or not decoded.
    ///
    /// For decoded columns, they may be in different types. If the column is in
    /// type `LazyBatchColumn::Raw`, it means that it is not decoded.
    columns: Vec<LazyBatchColumn>,
}

The concrete column information is as follows:

#[derive(Clone, Debug)]
pub enum LazyBatchColumn {
    Raw(BufferVec),
    Decoded(VectorValue),
}

The Lazy part should be easy to understand. Note that Decoded holds a VectorValue, representing a batch-processed column. BufferVec is:

/// A vector like container storing multiple buffers. Each buffer is a `[u8]` slice in
/// arbitrary length.
#[derive(Default, Clone)]
pub struct BufferVec {
    data: Vec<u8>,
    offsets: Vec<usize>,
}
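The data/offsets pair is a classic flat layout: all buffers are concatenated into one `data` vector, and `offsets[i]` marks where buffer i starts. A minimal sketch of the idea (simplified; TiKV's real BufferVec has a richer API):

```rust
// Minimal sketch of BufferVec's layout: buffers concatenated in `data`,
// with `offsets[i]` marking where buffer i begins. Buffer i ends where
// buffer i+1 begins (or at data.len() for the last one), so empty buffers
// cost only one offset entry and no data bytes.
#[derive(Default)]
pub struct ToyBufferVec {
    data: Vec<u8>,
    offsets: Vec<usize>,
}

impl ToyBufferVec {
    pub fn push(&mut self, buf: &[u8]) {
        self.offsets.push(self.data.len());
        self.data.extend_from_slice(buf);
    }

    pub fn get(&self, i: usize) -> &[u8] {
        let start = self.offsets[i];
        let end = self.offsets.get(i + 1).copied().unwrap_or(self.data.len());
        &self.data[start..end]
    }

    pub fn len(&self) -> usize {
        self.offsets.len()
    }
}

fn main() {
    let mut v = ToyBufferVec::default();
    v.push(b"hello");
    v.push(b""); // empty buffers are representable
    v.push(b"world");
    assert_eq!(v.len(), 3);
    assert_eq!(v.get(0), b"hello");
    assert_eq!(v.get(1), b"");
    assert_eq!(v.get(2), b"world");
}
```

Compared with a `Vec<Vec<u8>>`, this layout is one contiguous allocation, which is friendlier to the cache and to batch decoding.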

So the actual situation looks like this:

Record Data:
t_${table_id}_r_${handle}=>${v1}${v2}${..}

The codec package takes the data and offsets, packs the information into a BufferVec as columnar storage, and runs a decode pass over it; the data actually obtained may then be in vector form.
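The record-key shape above can be sketched in code. This is a simplification: real TiKV/TiDB keys use a memcomparable encoding with additional prefixing, so treat the exact byte layout here as an assumption for illustration only.

```rust
// Sketch of assembling a record key of the shape t_{table_id}_r_{handle}.
// Simplified: real TiKV/TiDB keys use a memcomparable encoding; only the
// overall shape from the text above is kept here.
pub fn record_key(table_id: i64, handle: i64) -> Vec<u8> {
    let mut key = Vec::with_capacity(1 + 8 + 2 + 8);
    key.push(b't');
    // Big-endian keeps byte order == numeric order for non-negative ids,
    // which is what makes range scans over handles possible.
    key.extend_from_slice(&table_id.to_be_bytes());
    key.extend_from_slice(b"_r");
    key.extend_from_slice(&handle.to_be_bytes());
    key
}

fn main() {
    let k1 = record_key(1, 1);
    let k2 = record_key(1, 2);
    // Rows of the same table sort by handle, so a table scan is a range scan.
    assert!(k1 < k2);
    assert!(k1.starts_with(b"t"));
}
```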

As for ColumnRef:

#[inline]
fn handle_node_column_ref(
    tree_node: Expr,
    rpn_nodes: &mut Vec<RpnExpressionNode>,
    max_columns: usize,
) -> Result<()> {
    let offset = number::decode_i64(&mut tree_node.get_val())
        .map_err(|_| other_err!("Unable to decode column reference offset from the request"))?
        as usize;
    if offset >= max_columns {
        return Err(other_err!(
            "Invalid column offset (schema has {} columns, access index {})",
            max_columns,
            offset
        ));
    }
    rpn_nodes.push(RpnExpressionNode::ColumnRef { offset });
    Ok(())
}

At interpretation time, the ColumnRef is taken out and evaluated:

RpnExpressionNode::ColumnRef { offset } => {
    let field_type = &schema[*offset];
    let decoded_physical_column = input_physical_columns[*offset].decoded();
    assert_eq!(input_logical_rows.len(), output_rows);
    stack.push(RpnStackNode::Vector {
        value: RpnStackNodeVectorValue::Ref {
            physical_value: &decoded_physical_column,
            logical_rows: input_logical_rows,
        },
        field_type,
    });
}

As described in http://blog.itpub.net/6906/viewspace-2648329/:

A ColumnRef is a reference to a Column.