Coprocessor Reading Notes, Part 2

Coprocessor and Code Analysis: rpn_expr/types

In the previous part we located the coprocessor entry points and saw, at a high level, how a coprocessor request is routed into the DAG API handler.

In the newer TiKV code there is an interesting Runner:

pub struct BatchExecutorsRunner<SS> {
    /// The deadline of this handler. For each check point (e.g. each iteration) we need to check
    /// whether or not the deadline is exceeded and break the process if so.
    // TODO: Deprecate it using a better deadline mechanism.
    deadline: Deadline,

    out_most_executor: Box<dyn BatchExecutor<StorageStats = SS>>,

    /// The offset of the columns need to be outputted. For example, TiDB may only needs a subset
    /// of the columns in the result so that unrelated columns don't need to be encoded and
    /// returned back.
    output_offsets: Vec<u32>,

    config: Arc<EvalConfig>,

    /// Whether or not execution summary need to be collected.
    collect_exec_summary: bool,

    exec_stats: ExecuteStats,
}

Of its fields:

  1. EvalConfig is a fairly ordinary config struct:

    #[derive(Clone, Debug)]
    pub struct EvalConfig {
        /// timezone to use when parse/calculate time.
        pub tz: Tz,
        pub flag: Flag,
        // TODO: max warning count is not really a EvalConfig. Instead it is a ExecutionConfig, because
        // warning is a executor stuff instead of a evaluation stuff.
        pub max_warning_cnt: usize,
        pub sql_mode: SqlMode,
    }

    It carries the SqlMode, the time zone, and similar evaluation settings.

  2. out_most_executor is the outermost executor of the DAG. Holding it, the runner can call next_batch to pull data through the whole pipeline, as sketched below.
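As a hedged sketch (NOT TiKV's actual handler loop; `Result` is assumed to be the crate's alias and 1024 is an arbitrary batch-size hint), driving the outermost executor looks roughly like this:

fn drive<SS>(exec: &mut dyn BatchExecutor<StorageStats = SS>) -> Result<()> {
    loop {
        // Pull the next batch of rows (stored by column).
        let result = exec.next_batch(1024);
        // ... encode `result.physical_columns` at `result.logical_rows` here ...
        if result.is_drained? {
            return Ok(());
        }
    }
}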

Its construction shows how everything is wired together:

pub fn from_request<S: Storage<Statistics = SS> + 'static>(
    mut req: DAGRequest,
    ranges: Vec<KeyRange>,
    storage: S,
    deadline: Deadline,
) -> Result<Self> {
    let executors_len = req.get_executors().len();
    let collect_exec_summary = req.get_collect_execution_summaries();
    let config = Arc::new(EvalConfig::from_request(&req)?);

    let out_most_executor = if collect_exec_summary {
        build_executors::<_, ExecSummaryCollectorEnabled>(
            req.take_executors().into(),
            storage,
            ranges,
            config.clone(),
        )?
    } else {
        build_executors::<_, ExecSummaryCollectorDisabled>(
            req.take_executors().into(),
            storage,
            ranges,
            config.clone(),
        )?
    };

    // Check output offsets
    let output_offsets = req.take_output_offsets();
    let schema_len = out_most_executor.schema().len();
    for offset in &output_offsets {
        if (*offset as usize) >= schema_len {
            return Err(other_err!(
                "Invalid output offset (schema has {} columns, access index {})",
                schema_len,
                offset
            ));
        }
    }

    let exec_stats = ExecuteStats::new(if collect_exec_summary {
        executors_len
    } else {
        0 // Avoid allocation for executor summaries when it is not needed
    });

    Ok(Self {
        deadline,
        out_most_executor,
        output_offsets,
        config,
        collect_exec_summary,
        exec_stats,
    })
}

In the end, everything relies on the BatchExecutor trait:

/// The interface for pull-based executors. It is similar to the Volcano Iterator model, but
/// pulls data in batch and stores data by column.
pub trait BatchExecutor: Send {
    type StorageStats;

    /// Gets the schema of the output.
    ///
    /// Provides an `Arc` instead of a pure reference to make it possible to share this schema in
    /// multiple executors. Actually the schema is only possible to be shared in executors, but it
    /// requires the ability to store a reference to a field in the same structure. There are other
    /// solutions so far but looks like `Arc` is the simplest one.
    fn schema(&self) -> &[FieldType];

    /// Pulls next several rows of data (stored by column).
    ///
    /// This function might return zero rows, which doesn't mean that there is no more result.
    /// See `is_drained` in `BatchExecuteResult`.
    fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult;

    /// Collects execution statistics (including but not limited to metrics and execution summaries)
    /// accumulated during execution and prepares for next collection.
    ///
    /// The executor implementation must invoke this function for each children executor. However
    /// the invocation order of children executors is not stipulated.
    ///
    /// This function may be invoked several times during execution. For each invocation, it should
    /// not contain accumulated meta data in last invocation. Normally the invocation frequency of
    /// this function is less than `next_batch()`.
    fn collect_exec_stats(&mut self, dest: &mut ExecuteStats);

    /// Collects underlying storage statistics accumulated during execution and prepares for
    /// next collection.
    ///
    /// Similar to `collect_exec_stats()`, the implementation must invoke this function for each
    /// children executor and this function may be invoked several times during execution.
    fn collect_storage_stats(&mut self, dest: &mut Self::StorageStats);

    fn with_summary_collector<C: ExecSummaryCollector + Send>(
        self,
        summary_collector: C,
    ) -> WithSummaryCollector<C, Self>
    where
        Self: Sized,
    {
        WithSummaryCollector {
            summary_collector,
            inner: self,
        }
    }
}
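To make the pull model concrete, here is a minimal self-contained sketch (deliberately NOT TiKV's real types: Batch, SimpleBatchExecutor, and Limit are invented for illustration) of how one executor wraps another and forwards next_batch:

// Simplified stand-ins for BatchExecuteResult / BatchExecutor.
struct Batch {
    rows: Vec<i64>,
    is_drained: bool,
}

trait SimpleBatchExecutor {
    fn next_batch(&mut self, scan_rows: usize) -> Batch;
}

// A toy LIMIT executor: it pulls from its child and truncates the batch once
// `remaining` rows have been emitted, mirroring how real executors nest.
struct Limit<C> {
    child: C,
    remaining: usize,
}

impl<C: SimpleBatchExecutor> SimpleBatchExecutor for Limit<C> {
    fn next_batch(&mut self, scan_rows: usize) -> Batch {
        let mut batch = self.child.next_batch(scan_rows);
        if batch.rows.len() >= self.remaining {
            batch.rows.truncate(self.remaining);
            batch.is_drained = true;
        }
        self.remaining -= batch.rows.len();
        batch
    }
}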

The value it returns is a BatchExecuteResult:

pub struct BatchExecuteResult {
    pub physical_columns: LazyBatchColumnVec,
    pub logical_rows: Vec<usize>,
    pub warnings: EvalWarnings,
    pub is_drained: Result<bool>,
}

I stripped the comments here; the original actually documents quite a lot.

  1. is_drained indicates whether the executor has run out of data (for example, a selection executor may return zero rows for one batch while still having more rows to produce later).

  2. physical_columns, which is interpreted through logical_rows (see the sketch after this list):

    /// Stores multiple `LazyBatchColumn`s. Each column has an equal length.
    #[derive(Clone, Debug)]
    pub struct LazyBatchColumnVec {
        /// Multiple lazy batch columns. Each column is either decoded, or not decoded.
        ///
        /// For decoded columns, they may be in different types. If the column is in
        /// type `LazyBatchColumn::Raw`, it means that it is not decoded.
        columns: Vec<LazyBatchColumn>,
    }

    Inside, each column is:

    /// A container stores an array of datums, which can be either raw (not decoded), or decoded into
    /// the `VectorValue` type.
    ///
    /// TODO:
    /// Since currently the data format in response can be the same as in storage, we use this structure
    /// to avoid unnecessary repeated serialization / deserialization. In future, Coprocessor will
    /// respond all data in Arrow format which is different to the format in storage. At that time,
    /// this structure is no longer useful and should be removed.
    #[derive(Clone, Debug)]
    pub enum LazyBatchColumn {
        Raw(BufferVec),
        Decoded(VectorValue),
    }

    BufferVec is essentially a contiguous Vec<u8> plus recorded offsets marking each buffer's boundaries.
    VectorValue is the truly decoded type:

    /// A vector value container, a.k.a. column, for all concrete eval types.
    ///
    /// The inner concrete value is immutable. However it is allowed to push and remove values from
    /// this vector container.
    #[derive(Debug, PartialEq, Clone)]
    pub enum VectorValue {
        Int(Vec<Option<Int>>),
        Real(Vec<Option<Real>>),
        Decimal(Vec<Option<Decimal>>),
        // TODO: We need to improve its performance, i.e. store strings in adjacent memory places
        Bytes(Vec<Option<Bytes>>),
        DateTime(Vec<Option<DateTime>>),
        Duration(Vec<Option<Duration>>),
        Json(Vec<Option<Json>>),
    }
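On physical_columns vs. logical_rows: logical_rows acts as a selection vector into the physical columns, so an executor such as a selection can "drop" rows without moving column data. A hedged sketch of consuming a result (the `exec` variable, batch size, and column index are illustrative):

let result = exec.next_batch(32);
if let VectorValue::Int(col) = result.physical_columns[0].decoded() {
    for &physical_idx in &result.logical_rows {
        // Only the physical rows listed in `logical_rows` are logically alive;
        // the physical column may hold more rows than that.
        let _value = col[physical_idx];
    }
}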

Data Types

We just met VectorValue above; these concrete element types live under codec/data_type/mod.rs.

In tidb_query_datatype these correspond to EvalType, the types actually used during evaluation.

Where the Scan Begins

pub struct BatchTableScanExecutor<S: Storage>(ScanExecutor<S, TableScanExecutorImpl>);

TableScan carries:

  • table_id: the ID of the table to scan
  • columns: information about the columns we care about
  • desc: whether to scan in descending order

If we are not scanning an index, recall the record key layout from before:

Record Data:
t_${table_id}_r_${handle}=>${v1}${v2}${..}

In any case, this is how we get the data we want.
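As a hedged illustration of that layout (simplified: the real codec::table encoder writes mem-comparable integers, flipping the sign bit so negative values sort correctly; this sketch just uses big-endian):

fn record_key(table_id: i64, handle: i64) -> Vec<u8> {
    // t_${table_id}_r_${handle}, with both integers fixed-width so that
    // byte-wise key ordering matches numeric ordering (for non-negative values here).
    let mut key = Vec::with_capacity(19);
    key.extend_from_slice(b"t");
    key.extend_from_slice(&table_id.to_be_bytes());
    key.extend_from_slice(b"_r");
    key.extend_from_slice(&handle.to_be_bytes());
    key
}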

The scan executor it relies on is as follows:

/// Common interfaces for table scan and index scan implementations.
pub trait ScanExecutorImpl: Send {
    /// Gets the schema.
    fn schema(&self) -> &[FieldType];

    /// Gets a mutable reference of the executor context.
    fn mut_context(&mut self) -> &mut EvalContext;

    fn build_column_vec(&self, scan_rows: usize) -> LazyBatchColumnVec;

    /// Accepts a key value pair and fills the column vector.
    ///
    /// The column vector does not need to be regular when there are errors during this process.
    /// However if there is no error, the column vector must be regular.
    fn process_kv_pair(
        &mut self,
        key: &[u8],
        value: &[u8],
        columns: &mut LazyBatchColumnVec,
    ) -> Result<()>;
}

/// A shared executor implementation for both table scan and index scan. Implementation differences
/// between table scan and index scan are further given via `ScanExecutorImpl`.
pub struct ScanExecutor<S: Storage, I: ScanExecutorImpl> {
    /// The internal scanning implementation.
    imp: I,

    /// The scanner that scans over ranges.
    scanner: RangesScanner<S>,

    /// A flag indicating whether this executor is ended. When table is drained or there was an
    /// error scanning the table, this flag will be set to `true` and `next_batch` should be never
    /// called again.
    is_ended: bool,
}

pub struct ScanExecutorOptions<S, I> {
    pub imp: I,
    pub storage: S,
    pub key_ranges: Vec<KeyRange>,
    pub is_backward: bool,
    pub is_key_only: bool,
    pub accept_point_range: bool,
}

Here, Storage abstracts the storage we read from, while imp is the concrete scan implementation.

For TableScan:

struct TableScanExecutorImpl {
    /// Note: Although called `EvalContext`, it is some kind of execution context instead.
    // TODO: Rename EvalContext to ExecContext.
    context: EvalContext,

    /// The schema of the output. All of the output come from specific columns in the underlying
    /// storage.
    schema: Vec<FieldType>,

    /// The default value of corresponding columns in the schema. When column data is missing,
    /// the default value will be used to fill the output.
    columns_default_value: Vec<Vec<u8>>,

    /// The output position in the schema giving the column id.
    column_id_index: HashMap<i64, usize>,

    /// The index in output row to put the handle.
    handle_index: Option<usize>,

    /// A vector of flags indicating whether corresponding column is filled in `next_batch`.
    /// It is a struct level field in order to prevent repeated memory allocations since its length
    /// is fixed for each `next_batch` call.
    is_column_filled: Vec<bool>,
}
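As a hedged example of how these fields line up (all ids and positions below are made up): suppose the request selects the columns with column ids 5 and 7, plus the integer handle as the third output column:

use std::collections::HashMap;

// Illustrative values only.
let mut column_id_index: HashMap<i64, usize> = HashMap::new();
column_id_index.insert(5, 0); // column id 5 goes to output position 0
column_id_index.insert(7, 1); // column id 7 goes to output position 1
let handle_index: Option<usize> = Some(2); // the PK handle fills position 2
let is_column_filled = vec![false; 3]; // reset after every processed row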

Now let's look at this chunk of code:

impl ScanExecutorImpl for TableScanExecutorImpl {
    #[inline]
    fn schema(&self) -> &[FieldType] {
        &self.schema
    }

    #[inline]
    fn mut_context(&mut self) -> &mut EvalContext {
        &mut self.context
    }

    /// Constructs empty columns, with PK in decoded format and the rest in raw format.
    fn build_column_vec(&self, scan_rows: usize) -> LazyBatchColumnVec {
        let columns_len = self.schema.len();
        let mut columns = Vec::with_capacity(columns_len);
        if let Some(handle_index) = self.handle_index {
            // PK is specified in schema. PK column should be decoded and the rest is in raw format
            // like this:
            // non-pk non-pk non-pk pk non-pk non-pk non-pk
            //                      ^handle_index = 3
            //                                            ^columns_len = 7
            // Columns before `handle_index` (if any) should be raw.
            for _ in 0..handle_index {
                columns.push(LazyBatchColumn::raw_with_capacity(scan_rows));
            }
            // For PK handle, we construct a decoded `VectorValue` because it is directly
            // stored as i64, without a datum flag, at the end of key.
            columns.push(LazyBatchColumn::decoded_with_capacity_and_tp(
                scan_rows,
                EvalType::Int,
            ));
            // Columns after `handle_index` (if any) should also be raw.
            for _ in handle_index + 1..columns_len {
                columns.push(LazyBatchColumn::raw_with_capacity(scan_rows));
            }
        } else {
            // PK is unspecified in schema. All column should be in raw format.
            for _ in 0..columns_len {
                columns.push(LazyBatchColumn::raw_with_capacity(scan_rows));
            }
        }
        assert_eq!(columns.len(), columns_len);
        LazyBatchColumnVec::from(columns)
    }

    // process_kv_pair is quoted and discussed further below.
}
  • schema and mut_context are self-explanatory
  • build_column_vec constructs the empty output columns: the PK handle column (if any) is created pre-decoded as Int, everything else raw (a small sketch follows)
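A minimal sketch of the layout build_column_vec produces, assuming a 4-column schema with handle_index == Some(2) (illustrative numbers only):

// Mirrors the branch above for handle_index = 2, columns_len = 4:
let scan_rows = 32;
let mut columns = Vec::with_capacity(4);
columns.push(LazyBatchColumn::raw_with_capacity(scan_rows)); // offset 0: raw
columns.push(LazyBatchColumn::raw_with_capacity(scan_rows)); // offset 1: raw
columns.push(LazyBatchColumn::decoded_with_capacity_and_tp(scan_rows, EvalType::Int)); // offset 2: PK, decoded
columns.push(LazyBatchColumn::raw_with_capacity(scan_rows)); // offset 3: raw
let columns = LazyBatchColumnVec::from(columns);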

Then there is the concrete BatchExecutor implementation on top of it:

impl<S: Storage, I: ScanExecutorImpl> BatchExecutor for ScanExecutor<S, I> {
    type StorageStats = S::Statistics;

    #[inline]
    fn schema(&self) -> &[FieldType] {
        self.imp.schema()
    }

    #[inline]
    fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult {
        assert!(!self.is_ended);
        assert!(scan_rows > 0);

        let mut logical_columns = self.imp.build_column_vec(scan_rows);
        let is_drained = self.fill_column_vec(scan_rows, &mut logical_columns);

        logical_columns.assert_columns_equal_length();
        let logical_rows = (0..logical_columns.rows_len()).collect();

        // TODO
        // If `is_drained.is_err()`, it means that there is an error after *successfully* retrieving
        // these rows. After that, if we only consumes some of the rows (TopN / Limit), we should
        // ignore this error.

        match &is_drained {
            // Note: `self.is_ended` is only used for assertion purpose.
            Err(_) | Ok(true) => self.is_ended = true,
            Ok(false) => {}
        };

        BatchExecuteResult {
            physical_columns: logical_columns,
            logical_rows,
            is_drained,
            warnings: self.imp.mut_context().take_warnings(),
        }
    }

    #[inline]
    fn collect_exec_stats(&mut self, dest: &mut ExecuteStats) {
        self.scanner
            .collect_scanned_rows_per_range(&mut dest.scanned_rows_per_range);
    }

    #[inline]
    fn collect_storage_stats(&mut self, dest: &mut Self::StorageStats) {
        self.scanner.collect_storage_stats(dest);
    }
}

In next_batch, build_column_vec is followed by fill_column_vec:

/// Fills a column vector and returns whether or not all ranges are drained.
///
/// The columns are ensured to be regular even if there are errors during the process.
fn fill_column_vec(
    &mut self,
    scan_rows: usize,
    columns: &mut LazyBatchColumnVec,
) -> Result<bool> {
    assert!(scan_rows > 0);
    for _ in 0..scan_rows {
        let some_row = self.scanner.next()?;
        if let Some((key, value)) = some_row {
            // Retrieved one row from point range or non-point range.
            if let Err(e) = self.imp.process_kv_pair(&key, &value, columns) {
                // When there are errors in `process_kv_pair`, columns' length may not be
                // identical. For example, the filling process may be partially done so that
                // first several columns have N rows while the rest have N-1 rows. Since we do
                // not immediately fail when there are errors, these irregular columns may
                // further cause future executors to panic. So let's truncate these columns to
                // make they all have N-1 rows in that case.
                columns.truncate_into_equal_length();
                return Err(e);
            }
        } else {
            // Drained
            return Ok(true);
        }
    }
    // Not drained
    Ok(false)
}

process_kv_pair is where each KV pair actually gets processed:

fn process_kv_pair(
    &mut self,
    key: &[u8],
    value: &[u8],
    columns: &mut LazyBatchColumnVec,
) -> Result<()> {
    use crate::codec::{datum, table};
    use tikv_util::codec::number;

    let columns_len = self.schema.len();
    let mut decoded_columns = 0;

    if let Some(handle_index) = self.handle_index {
        let handle_id = table::decode_handle(key)?;
        // TODO: We should avoid calling `push_int` repeatedly. Instead we should specialize
        // a `&mut Vec` first. However it is hard to program due to lifetime restriction.
        columns[handle_index]
            .mut_decoded()
            .push_int(Some(handle_id));
        decoded_columns += 1;
        self.is_column_filled[handle_index] = true;
    }

    if value.is_empty() || (value.len() == 1 && value[0] == datum::NIL_FLAG) {
        // Do nothing
    } else {
        // The layout of value is: [col_id_1, value_1, col_id_2, value_2, ...]
        // where each element is datum encoded.
        // The column id datum must be in var i64 type.
        let mut remaining = value;
        while !remaining.is_empty() && decoded_columns < columns_len {
            if remaining[0] != datum::VAR_INT_FLAG {
                return Err(other_err!(
                    "Unable to decode row: column id must be VAR_INT"
                ));
            }
            remaining = &remaining[1..];
            let column_id = box_try!(number::decode_var_i64(&mut remaining));
            let (val, new_remaining) = datum::split_datum(remaining, false)?;
            // Note: The produced columns may be not in the same length if there is error due
            // to corrupted data. It will be handled in `ScanExecutor`.
            let some_index = self.column_id_index.get(&column_id);
            if let Some(index) = some_index {
                let index = *index;
                if !self.is_column_filled[index] {
                    columns[index].mut_raw().push(val);
                    decoded_columns += 1;
                    self.is_column_filled[index] = true;
                } else {
                    // This indicates that there are duplicated elements in the row, which is
                    // unexpected. We won't abort the request or overwrite the previous element,
                    // but will output a log anyway.
                    warn!(
                        "Ignored duplicated row datum in table scan";
                        "key" => hex::encode_upper(&key),
                        "value" => hex::encode_upper(&value),
                        "dup_column_id" => column_id,
                    );
                }
            }
            remaining = new_remaining;
        }
    }

    // Some fields may be missing in the row, we push corresponding default value to make all
    // columns in same length.
    for i in 0..columns_len {
        if !self.is_column_filled[i] {
            // Missing fields must not be a primary key, so it must be
            // `LazyBatchColumn::raw`.
            let default_value = if !self.columns_default_value[i].is_empty() {
                // default value is provided, use the default value
                self.columns_default_value[i].as_slice()
            } else if !self.schema[i]
                .as_accessor()
                .flag()
                .contains(tidb_query_datatype::FieldTypeFlag::NOT_NULL)
            {
                // NULL is allowed, use NULL
                datum::DATUM_DATA_NULL
            } else {
                return Err(other_err!(
                    "Data is corrupted, missing data for NOT NULL column (offset = {})",
                    i
                ));
            };
            columns[i].mut_raw().push(default_value);
        } else {
            // Reset to not-filled, prepare for next function call.
            self.is_column_filled[i] = false;
        }
    }

    Ok(())
}
  1. The scanner scans over the table ranges (obviously)
  2. For each row, only some of the columns are needed
  3. ColumnInfo describes which columns those are
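To make the decode loop concrete, here is a hedged walkthrough of the row value layout it consumes (all column ids and payloads are made up):

// value = [ VAR_INT_FLAG, varint(5), <datum for col 5>,
//           VAR_INT_FLAG, varint(7), <datum for col 7> ]
//
// Each iteration of the `while` loop above peels one (column id, datum) pair:
//   1. checks the VAR_INT_FLAG byte and decodes the varint column id (5, then 7);
//   2. splits off the datum bytes with `datum::split_datum`;
//   3. looks the id up in `column_id_index` and pushes the raw bytes into the
//      matching output column, marking it in `is_column_filled`.
// Columns never mentioned in the value get their default value or NULL at the end.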

rpn_expr/types

This module matters a great deal; see its mod.rs:

mod expr;
mod expr_builder;
mod expr_eval;
pub mod function;
#[cfg(test)]
pub mod test_util;
pub use self::expr::{RpnExpression, RpnExpressionNode};
pub use self::expr_builder::RpnExpressionBuilder;
pub use self::expr_eval::RpnStackNode;
pub use self::function::{RpnFnCallExtra, RpnFnMeta};

Setting aside test_util (compiled only under cfg(test)), the most important piece is expr:

/// A type for each node in the RPN expression list.
#[derive(Debug, Clone)]
pub enum RpnExpressionNode {
    /// Represents a function call.
    FnCall {
        // metadata of the function to call
        func_meta: RpnFnMeta,
        // number of arguments of the call
        args_len: usize,
        // field type of the result
        field_type: FieldType,
        implicit_args: Vec<ScalarValue>,
    },

    /// Represents a scalar constant value.
    Constant {
        value: ScalarValue,
        field_type: FieldType,
    },

    /// Represents a reference to a column in the columns specified in evaluation.
    ColumnRef { offset: usize },
}

We covered ScalarValue before: an enum that statically represents the data type of a single RpnExpression value:

#[derive(Clone, Debug, PartialEq)]
pub enum ScalarValue {
    Int(Option<super::Int>),
    Real(Option<super::Real>),
    Decimal(Option<super::Decimal>),
    Bytes(Option<super::Bytes>),
    DateTime(Option<super::DateTime>),
    Duration(Option<super::Duration>),
    Json(Option<super::Json>),
}

And here is the Reverse Polish expression itself:

/// An expression in Reverse Polish notation, which is simply a list of RPN expression nodes.
///
/// You may want to build it using `RpnExpressionBuilder`.
#[derive(Debug, Clone)]
pub struct RpnExpression(Vec<RpnExpressionNode>);

It wraps a Vec of RpnExpressionNodes laid out in postfix order, which makes evaluation a single left-to-right pass (see the sketch below).
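A hedged example of the flattening (field values abbreviated; "plus" stands in for whatever ScalarFuncSig the plan carries): the tree col0 + 1 becomes the node list

// Tree:    Add
//         /   \
//     col#0    1
//
// RPN:  [ ColumnRef { offset: 0 },
//         Constant  { value: ScalarValue::Int(Some(1)), .. },
//         FnCall    { func_meta: <plus>, args_len: 2, .. } ]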

/// Helper to build an `RpnExpression`.
#[derive(Debug, Clone)]
pub struct RpnExpressionBuilder(Vec<RpnExpressionNode>);

The expression / execution plan definitions come from tipb:

message FieldType {
    optional int32 tp = 1 [(gogoproto.nullable) = false];
    optional uint32 flag = 2 [(gogoproto.nullable) = false];
    optional int32 flen = 3 [(gogoproto.nullable) = false];
    optional int32 decimal = 4 [(gogoproto.nullable) = false];
    optional int32 collate = 5 [(gogoproto.nullable) = false];
    optional string charset = 6 [(gogoproto.nullable) = false];
}

And for Expr:

// Evaluators should implement evaluation functions for every expression type.
message Expr {
    optional ExprType tp = 1 [(gogoproto.nullable) = false];
    optional bytes val = 2;
    repeated Expr children = 3;
    optional ScalarFuncSig sig = 4 [(gogoproto.nullable) = false];
    optional FieldType field_type = 5;
}

ScalarFuncSig is what selects the concrete function implementation given its children. So by the time the request arrives, the corresponding builder is already in place:

/// Builds the RPN expression node list from an expression definition tree.
pub fn build_from_expr_tree(
    tree_node: Expr,
    time_zone: &Tz,
    max_columns: usize,
) -> Result<RpnExpression> {
    let mut expr_nodes = Vec::new();
    append_rpn_nodes_recursively(
        tree_node,
        &mut expr_nodes,
        time_zone,
        super::super::map_pb_sig_to_rpn_func,
        max_columns,
    )?;
    Ok(RpnExpression::from(expr_nodes))
}

map_pb_sig_to_rpn_func is the match table from a protocol ScalarFuncSig to the concrete rpn_function implementation. The important code is append_rpn_nodes_recursively, which (according to both the comments and the logic) flattens the tree_node tree into an RPN RpnExpression. Ok(RpnExpression::from(expr_nodes)) carries no special logic; it merely wraps the Vec:

fn append_rpn_nodes_recursively<F>(
    tree_node: Expr,
    rpn_nodes: &mut Vec<RpnExpressionNode>,
    time_zone: &Tz,
    fn_mapper: F,
    max_columns: usize,
    // TODO: Passing `max_columns` is only a workaround solution that works when we only check
    // column offset. To totally check whether or not the expression is valid, we need to pass in
    // the full schema instead.
) -> Result<()>
where
    F: Fn(tipb::ScalarFuncSig, &[Expr]) -> Result<RpnFnMeta> + Copy,
{
    match tree_node.get_tp() {
        ExprType::ScalarFunc => {
            handle_node_fn_call(tree_node, rpn_nodes, time_zone, fn_mapper, max_columns)
        }
        ExprType::ColumnRef => handle_node_column_ref(tree_node, rpn_nodes, max_columns),
        _ => handle_node_constant(tree_node, rpn_nodes, time_zone),
    }
}

This part is actually quite simple:

  1. For a constant expression, push a ScalarValue according to the node type.

    1. tree_node carries an expr type; we derive the EvalType from it and compare it with the current ScalarValue type (ExprType is the MySQL-side type, EvalType is the type the coprocessor actually computes with).

  2. For a ScalarFunc:

    1. Call map_pb_sig_to_rpn_func to find the corresponding RpnFnMeta (this is the important part); the &[Expr] slice contains only the node's own children, so there is nothing to worry about.
    2. Call the validate function to check that the arguments are sensible.
    3. Recursively append all child nodes first, so children land before their parent (see the sketch after this list).

    One remaining question is the number of function arguments. The FnCall structure is:

    FnCall {
        func_meta: RpnFnMeta,
        args_len: usize,
        field_type: FieldType,
        implicit_args: Vec<ScalarValue>,
    }

    args_len and implicit_args keep this metadata around for the later invocation.
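A hedged illustration of the recursion order (function names invented): for Foo(A, Bar(B), C), children are appended before their parent, yielding

// [ A, B, Bar, C, Foo ]
//
// i.e. a post-order traversal: by the time `Foo` is pushed, all of its
// argument values already sit below it, which is exactly what the
// stack-based evaluator needs.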

As for validator_fn, the generator will:

let validator_fn = ValidatorFnGenerator::new()
    .validate_return_type(&self.ret_type) // validate the return type
    .validate_min_args(self.min_args) // validate the minimum number of args
    .validate_by_fn(&self.extra_validator) // invoke the extra_validator
    .generate(&impl_generics, where_clause); // generate the actual function

So before the function is ever invoked, the build step has generated a fairly involved function. Here I printed the code generated for json_object:

pub const fn json_object_fn_meta() -> crate::rpn_expr::RpnFnMeta {
    #[inline]
    fn run(
        ctx: &mut crate::expr::EvalContext,
        output_rows: usize,
        args: &[crate::rpn_expr::RpnStackNode<'_>],
        extra: &mut crate::rpn_expr::RpnFnCallExtra<'_>,
    ) -> crate::Result<crate::codec::data_type::VectorValue> {
        crate::rpn_expr::function::RAW_VARG_PARAM_BUF.with(|vargs_buf| {
            let mut vargs_buf = vargs_buf.borrow_mut();
            let args_len = args.len();
            let mut result = Vec::with_capacity(output_rows);
            for row_index in 0..output_rows {
                vargs_buf.clear();
                for arg_index in 0..args_len {
                    let scalar_arg = args[arg_index].get_logical_scalar_ref(row_index);
                    let scalar_arg = unsafe {
                        std::mem::transmute::<ScalarValueRef<'_>, ScalarValueRef<'static>>(
                            scalar_arg,
                        )
                    };
                    vargs_buf.push(scalar_arg);
                }
                result.push(json_object(vargs_buf.as_slice())?);
            }
            Ok(Evaluable::into_vector_value(result))
        })
    }

    fn validate(expr: &tipb::Expr) -> crate::Result<()> {
        use crate::codec::data_type::Evaluable;
        use crate::rpn_expr::function;
        println!("..."); // (a throwaway debug print I temporarily added while experimenting)
        function::validate_expr_return_type(expr, Json::EVAL_TYPE)?;
        json_validator(expr)?;
        Ok(())
    }

    crate::rpn_expr::RpnFnMeta {
        name: "json_object",
        validator_ptr: validate,
        fn_ptr: run,
    }
}

The procedural macros that generate this code live in tidb_query_codegen (its rpn_function module); I won't walk through them here.
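For reference, the handwritten source the macro expands from looks roughly like this (a hedged sketch from memory; the exact attribute arguments may differ):

#[rpn_fn(varg, extra_validator = json_validator)]
#[inline]
pub fn json_object(raw_args: &[ScalarValueRef]) -> Result<Option<Json>> {
    // (body elided; it assembles the variadic key/value arguments into a Json object)
    unimplemented!()
}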

And here is an actual call site:

pub fn new(
    config: std::sync::Arc<EvalConfig>,
    src: Src,
    order_exprs_def: Vec<Expr>,
    order_is_desc: Vec<bool>,
    n: usize,
) -> Result<Self> {
    assert_eq!(order_exprs_def.len(), order_is_desc.len());
    let mut order_exprs = Vec::with_capacity(order_exprs_def.len());
    for def in order_exprs_def {
        order_exprs.push(RpnExpressionBuilder::build_from_expr_tree(
            def,
            &config.tz,
            src.schema().len(),
        )?);
    }
    Ok(Self {
        // Avoid large N causing OOM
        heap: BinaryHeap::with_capacity(n.min(1024)),
        // Simply large enough to avoid repeated allocations
        eval_columns_buffer_unsafe: Box::new(Vec::with_capacity(512)),
        order_exprs: order_exprs.into_boxed_slice(),
        order_is_desc: order_is_desc.into_boxed_slice(),
        n,
        context: EvalContext::new(config),
        src,
        is_ended: false,
    })
}

This is the new of the top_n executor: each ORDER BY expression tree is compiled into an RpnExpression up front.

When an expression is actually evaluated:

/// Evaluates the expression into a vector.
///
/// If referred columns are not decoded, they will be decoded according to the given schema.
///
/// # Panics
///
/// Panics if the expression is not valid.
///
/// Panics when referenced column does not have equal length as specified in `rows`.
pub fn eval<'a>(
    &'a self,
    ctx: &mut EvalContext,
    schema: &'a [FieldType],
    input_physical_columns: &'a mut LazyBatchColumnVec,
    input_logical_rows: &'a [usize],
    output_rows: usize,
) -> Result<RpnStackNode<'a>> {
    // We iterate two times. The first time we decode all referred columns. The second time
    // we evaluate. This is to make Rust's borrow checker happy because there will be
    // mutable reference during the first iteration and we can't keep these references.
    self.ensure_columns_decoded(
        &ctx.cfg.tz,
        schema,
        input_physical_columns,
        input_logical_rows,
    )?;
    self.eval_decoded(
        ctx,
        schema,
        input_physical_columns,
        input_logical_rows,
        output_rows,
    )
}
  1. Ensure the referenced columns are decoded
  2. Call eval_decoded

/// Evaluates the expression into a vector. The input columns must be already decoded.
///
/// It differs from `eval` in that `eval_decoded` needn't receive a mutable reference
/// to `LazyBatchColumnVec`. However, since `eval_decoded` doesn't decode columns,
/// it will panic if referred columns are not decoded.
///
/// # Panics
///
/// Panics if the expression is not valid.
///
/// Panics if referred columns are not decoded.
///
/// Panics when referenced column does not have equal length as specified in `rows`.
pub fn eval_decoded<'a>(
    &'a self,
    ctx: &mut EvalContext,
    schema: &'a [FieldType],
    input_physical_columns: &'a LazyBatchColumnVec,
    input_logical_rows: &'a [usize],
    output_rows: usize,
) -> Result<RpnStackNode<'a>> {
    assert!(output_rows > 0);
    let mut stack = Vec::with_capacity(self.len());

    // Logical rows for generated columns
    // TODO: Eliminate allocation
    let identity_logical_rows: Vec<_> = (0..output_rows).collect();
    let identity_logical_rows = Arc::from(identity_logical_rows);

    for node in self.as_ref() {
        match node {
            RpnExpressionNode::Constant { value, field_type } => {
                stack.push(RpnStackNode::Scalar {
                    value: &value,
                    field_type,
                });
            }
            RpnExpressionNode::ColumnRef { offset } => {
                let field_type = &schema[*offset];
                let decoded_physical_column = input_physical_columns[*offset].decoded();
                assert_eq!(input_logical_rows.len(), output_rows);
                stack.push(RpnStackNode::Vector {
                    value: RpnStackNodeVectorValue::Ref {
                        physical_value: &decoded_physical_column,
                        logical_rows: input_logical_rows,
                    },
                    field_type,
                });
            }
            RpnExpressionNode::FnCall {
                func_meta,
                args_len,
                field_type: ret_field_type,
                implicit_args,
            } => {
                // Suppose that we have function call `Foo(A, B, C)`, the RPN nodes looks like
                // `[A, B, C, Foo]`.
                // Now we receives a function call `Foo`, so there are `[A, B, C]` in the stack
                // as the last several elements. We will directly use the last N (N = number of
                // arguments) elements in the stack as function arguments.
                assert!(stack.len() >= *args_len);
                let stack_slice_begin = stack.len() - *args_len;
                let stack_slice = &stack[stack_slice_begin..];
                let mut call_extra = RpnFnCallExtra {
                    ret_field_type,
                    implicit_args,
                };
                let ret = (func_meta.fn_ptr)(ctx, output_rows, stack_slice, &mut call_extra)?;
                stack.truncate(stack_slice_begin);
                stack.push(RpnStackNode::Vector {
                    value: RpnStackNodeVectorValue::Generated {
                        physical_value: ret,
                        logical_rows: Arc::clone(&identity_logical_rows),
                    },
                    field_type: ret_field_type,
                });
            }
        }
    }

    assert_eq!(stack.len(), 1);
    Ok(stack.into_iter().next().unwrap())
}
  1. Prepare the runtime stack
  2. Walk the node list: constants and column refs are pushed; a function call pops its args_len arguments and pushes the generated column

The nodes living in that stack are:

/// A type for each node in the RPN evaluation stack. It can be one of a scalar value node or a
/// vector value node. The vector value node can be either an owned vector value or a reference.
#[derive(Debug)]
pub enum RpnStackNode<'a> {
    /// Represents a scalar value. Comes from a constant node in expression list.
    Scalar {
        value: &'a ScalarValue,
        field_type: &'a FieldType,
    },

    /// Represents a vector value. Comes from a column reference or evaluated result.
    Vector {
        value: RpnStackNodeVectorValue<'a>,
        field_type: &'a FieldType,
    },
}

OK, so that is RpnStackNode: either a constant ScalarValue or a vector of values. When evaluation finishes, exactly one node is left on the stack, and that is the result (see the closing sketch below).
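To close, here is a self-contained toy model (invented types; NOT TiKV's real code) of the stack discipline eval_decoded uses:

enum Node {
    Const(i64),
    Col(usize),
    Add, // stands in for any 2-argument FnCall
}

/// Evaluates an RPN node list over columnar input, mirroring `eval_decoded`:
/// constants and column refs push a column onto the stack; a function pops its
/// arguments and pushes the generated column.
fn eval_rpn(nodes: &[Node], columns: &[Vec<i64>], rows: usize) -> Vec<i64> {
    let mut stack: Vec<Vec<i64>> = Vec::new();
    for node in nodes {
        match node {
            Node::Const(v) => stack.push(vec![*v; rows]),
            Node::Col(offset) => stack.push(columns[*offset].clone()),
            Node::Add => {
                let rhs = stack.pop().unwrap();
                let lhs = stack.pop().unwrap();
                stack.push(lhs.iter().zip(&rhs).map(|(a, b)| a + b).collect());
            }
        }
    }
    assert_eq!(stack.len(), 1); // exactly one node remains: the result
    stack.pop().unwrap()
}

// `col0 + 1` flattens to [Col(0), Const(1), Add], so
// eval_rpn(&[Node::Col(0), Node::Const(1), Node::Add], &[vec![10, 20]], 2)
// yields vec![11, 21].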