TiKV Coprocessor(1)

TiKV Coprocessor and a code walkthrough (a roll call of all the pieces)

Concepts similar to the TiKV coprocessor:

  • MySQL 存储过程
  • HBase Coprocessor 的 Endpoint

The analysis below is based on this blog post and the corresponding code:

  • TiDB query flow
  • TiDB table storage format
  • TiKV cop
  • TiKV source code

TiKV's coprocessor actually covers two different concepts:

  1. For Split, before the Region split requests are truly proposed, the split key needs to be checked if it is legal. For example, for a Row in TiDB, there are many versions of it in TiKV, such as V1, V2, and V3, V3 being the latest version. Assuming that V2 is the selected split key, then the data of the Row might be split to two different Regions, which means the data in the Row cannot be handled atomically. Therefore, the Split Coprocessor will adjust the split key to V1. In this way, the data in this Row is still in the same Region during the splitting.
  2. For push-down, the Coprocessor is used to improve the performance of TiDB. For some operations like select count(*), there is no need for TiDB to get data from row to row first and then count. The quicker way is that TiDB pushes down these operations to the corresponding TiKV nodes, the TiKV nodes do the computing and then TiDB consolidates the final results.

(copied verbatim from the source)

The former is defined in src/raftstore; the latter lives in src/coprocessor and provides the coprocessor API.
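
To make the Split case a bit more concrete, here is a minimal, purely illustrative sketch (the function name and the fixed-width version suffix are my own assumptions, not the raftstore implementation): a candidate split key that points at one MVCC version of a row is pulled back to the row boundary, so every version of that row stays in one Region:

/// Purely illustrative: pretend every key is `row_key ++ 8-byte version suffix`.
/// The real raftstore split checker is more involved; this only shows the idea of
/// pulling a candidate split key back to a row boundary so that all MVCC versions
/// of one row stay in the same Region.
fn adjust_split_key(candidate: &[u8], version_suffix_len: usize) -> Vec<u8> {
    if candidate.len() > version_suffix_len {
        candidate[..candidate.len() - version_suffix_len].to_vec()
    } else {
        candidate.to_vec()
    }
}

fn main() {
    // "row1" with a fake 8-byte version (V2) appended.
    let candidate = [&b"row1"[..], &2u64.to_be_bytes()[..]].concat();
    // The adjusted split key is the row key itself, i.e. the boundary before V1..V3.
    assert_eq!(adjust_split_key(&candidate, 8), b"row1".to_vec());
}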


TiDB Select flow

(figure: tidb_select_schedule)

Here is the flow in outline:

  1. The client sends the request.
  2. TiDB finds the execution plan in its cache, or computes it.
  3. TiDB gets a start_ts from PD.
  4. TiDB works out the table's key ranges and finds the machines that serve them.
  5. TiDB sends coprocessor tasks to those TiKV machines in parallel.
  6. TiDB receives the data.
  7. TiDB does whatever remaining processing is needed.

The figure above and the blog post describe the flow in more detail:

  1. TiDB fetches information_schema from its cache; if it is not cached, it fetches it from TiKV.
  2. From information_schema, TiDB gets the metadata of the table the user is operating on.
  3. Based on the prepared execution plan, TiDB attaches the table metadata to its key ranges and turns them into TiKV key ranges.
  4. TiDB gets the Region information for each key range, from its cache or from PD.
  5. TiDB groups the key ranges by Region.

These five steps correspond to step 4 in my outline above; there is actually quite a lot going on here (see the sketch of the grouping step below).
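
Here is a rough sketch of what the grouping in step 5 amounts to. The types and the function are my own simplification for illustration, not TiDB's region cache code; real code also has to split a key range that crosses a Region boundary:

use std::collections::BTreeMap;

/// A Region serves the half-open key range [start, end); an empty `end` means "to infinity".
#[derive(Debug)]
struct Region {
    id: u64,
    start: Vec<u8>,
    end: Vec<u8>,
}

/// Assign each key range to the Region that contains its start key.
fn group_by_region(
    regions: &[Region],
    ranges: &[(Vec<u8>, Vec<u8>)],
) -> BTreeMap<u64, Vec<(Vec<u8>, Vec<u8>)>> {
    let mut grouped: BTreeMap<u64, Vec<(Vec<u8>, Vec<u8>)>> = BTreeMap::new();
    for (start, end) in ranges {
        if let Some(r) = regions
            .iter()
            .find(|r| r.start <= *start && (r.end.is_empty() || *start < r.end))
        {
            grouped.entry(r.id).or_default().push((start.clone(), end.clone()));
        }
    }
    grouped
}

fn main() {
    let regions = vec![
        Region { id: 1, start: b"".to_vec(), end: b"m".to_vec() },
        Region { id: 2, start: b"m".to_vec(), end: b"".to_vec() },
    ];
    let ranges = vec![
        (b"a".to_vec(), b"c".to_vec()),
        (b"x".to_vec(), b"z".to_vec()),
    ];
    // Range [a, c) lands in Region 1, [x, z) in Region 2.
    println!("{:?}", group_by_region(&regions, &ranges));
}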

Table data storage format

Record Data:
    t_${table_id}_r_${handle} => ${v1}${v2}${..}
Unique Index:
    t_${table_id}_i_${index_id}${v1}${v2} => ${handle}
Non-Unique Index:
    t_${table_id}_i_${index_id}${v1}${v2}${v...}${handle} => null
  • Here v1, v2, ... are the column values, represented as Datums.

    (figure: tikv_cop_datum)
    The coprocessor does its processing on top of these Datums.

  • table_id is the ID of the corresponding table.

  • handle: TiDB internally assigns every row a continuously increasing integer as its handle (currently, if the table has a continuously increasing integer primary key, that column is used as the handle). The handle guarantees the uniqueness of a row; note that for a unique index, the handle is the value side of the key.

To avoid collisions, these prefixes are encoded with a few necessary transformations:

var (
    tablePrefix     = []byte{'t'}
    recordPrefixSep = []byte("_r")
    indexPrefixSep  = []byte("_i")
)

The corresponding code can be found in TiDB's tablecodec package.
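
As a small illustration of the layout above, here is a sketch that builds a record key. Be aware that this is an assumption-laden toy: the real tablecodec encodes table_id and handle with a memcomparable integer encoding (so that byte order matches numeric order), while this sketch just uses plain big-endian bytes, and the function name is made up:

// Prefixes copied from the Go snippet above.
const TABLE_PREFIX: &[u8] = b"t";
const RECORD_PREFIX_SEP: &[u8] = b"_r";

/// Conceptually builds `t{table_id}_r{handle}`. Plain big-endian bytes stand in
/// for the real comparable integer encoding, just to keep the sketch short.
fn encode_record_key(table_id: i64, handle: i64) -> Vec<u8> {
    let mut key = Vec::with_capacity(1 + 8 + 2 + 8);
    key.extend_from_slice(TABLE_PREFIX);
    key.extend_from_slice(&table_id.to_be_bytes());
    key.extend_from_slice(RECORD_PREFIX_SEP);
    key.extend_from_slice(&handle.to_be_bytes());
    key
}

fn main() {
    let key = encode_record_key(42, 7);
    assert_eq!(&key[0..1], b"t");
    assert_eq!(&key[9..11], b"_r");
    println!("{:?}", key);
}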

Coprocessor and push-down

The 2017 blog post says:

The Coprocessor currently supports three kinds of requests:

  1. Select: the old query push-down, now only used by TiSpark and about to be removed.
  2. DAG: the new query push-down interface, used by TiDB.
  3. Analyze: statistics push-down, mainly used to collect statistics that assist optimizations such as joins; used by TiDB.

That post introduced the DAG interface; let's take a look:

(figure: cop_dag_executor)

This diagram is actually quite familiar to me; I ran into it a lot back when I was playing with explain.

Now let's look at coprocessor::dag::batch::executors:

pub struct BatchTableScanExecutor<S: Store>(
    super::util::scan_executor::ScanExecutor<
        S,
        TableScanExecutorImpl,
        super::util::ranges_iter::PointRangeEnable,
    >,
);

(what a mouthful...)

It implements the following interface:

impl<S: Store> BatchExecutor for BatchTableScanExecutor<S> {
    #[inline]
    fn schema(&self) -> &[FieldType] {
        self.0.schema()
    }

    #[inline]
    fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult {
        self.0.next_batch(scan_rows)
    }

    #[inline]
    fn collect_statistics(&mut self, destination: &mut BatchExecuteStatistics) {
        self.0.collect_statistics(destination);
    }
}

Ah, so it implements the BatchExecutor trait:

/// The interface for pull-based executors. It is similar to the Volcano Iterator model, but
/// pulls data in batch and stores data by column.
pub trait BatchExecutor: Send {
    /// Gets the schema of the output.
    ///
    /// Provides an `Arc` instead of a pure reference to make it possible to share this schema in
    /// multiple executors. Actually the schema is only possible to be shared in executors, but it
    /// requires the ability to store a reference to a field in the same structure. There are other
    /// solutions so far but looks like `Arc` is the simplest one.
    fn schema(&self) -> &[FieldType];

    /// Pulls next several rows of data (stored by column).
    ///
    /// This function might return zero rows, which doesn't mean that there is no more result.
    /// See `is_drained` in `BatchExecuteResult`.
    fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult;

    /// Collects statistics (including but not limited to metrics and execution summaries)
    /// accumulated during execution and prepares for next collection.
    ///
    /// The executor implementation must invoke this function for each children executor. However
    /// the invocation order of children executors is not stipulated.
    ///
    /// This function may be invoked several times during execution. For each invocation, it should
    /// not contain accumulated meta data in last invocation. Normally the invocation frequency of
    /// this function is less than `next_batch()`.
    fn collect_statistics(&mut self, destination: &mut BatchExecuteStatistics);

    fn with_summary_collector<C: ExecSummaryCollector + Send>(
        self,
        summary_collector: C,
    ) -> WithSummaryCollector<C, Self>
    where
        Self: Sized,
    {
        WithSummaryCollector {
            summary_collector,
            inner: self,
        }
    }
}

We can also find the plain Executor trait described in the blog post:

pub trait Executor {
    fn next(&mut self) -> Result<Option<Row>>;
    fn collect_output_counts(&mut self, counts: &mut Vec<i64>);
    fn collect_metrics_into(&mut self, metrics: &mut ExecutorMetrics);
    fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]);
    fn get_len_of_columns(&self) -> usize;

    /// Only executors with eval computation need to implement `take_eval_warnings`
    /// It returns warnings happened during eval computation.
    fn take_eval_warnings(&mut self) -> Option<EvalWarnings> {
        None
    }

    /// Only `TableScan` and `IndexScan` need to implement `start_scan`.
    fn start_scan(&mut self) {}

    /// Only `TableScan` and `IndexScan` need to implement `stop_scan`.
    ///
    /// It returns a `KeyRange` the executor has scaned.
    fn stop_scan(&mut self) -> Option<KeyRange> {
        None
    }

    fn with_summary_collector<C: ExecSummaryCollector>(
        self,
        summary_collector: C,
    ) -> WithSummaryCollector<C, Self>
    where
        Self: Sized,
    {
        WithSummaryCollector {
            summary_collector,
            inner: self,
        }
    }
}

The difference is roughly that the batch variant keeps the Volcano model but pulls data in batches; we will come back to this later.

For Executor, next() pulls from its data source one row at a time; BatchExecutor instead returns data in batches, which is much easier to process efficiently (a toy comparison follows below).
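
A toy comparison of the two pull models, using heavily simplified traits of my own (the real traits are the ones quoted above): a row-at-a-time Volcano executor pays one call per row, while a batch executor returns many values per call, stored by column:

/// Row-at-a-time (Volcano) style: one value per `next()` call.
trait RowExecutor {
    fn next(&mut self) -> Option<i64>;
}

/// Batch style: up to `scan_rows` values per call, stored by column
/// (a single i64 column stands in for a whole chunk here).
trait ToyBatchExecutor {
    fn next_batch(&mut self, scan_rows: usize) -> Vec<i64>;
}

struct Source {
    data: Vec<i64>,
    cursor: usize,
}

impl RowExecutor for Source {
    fn next(&mut self) -> Option<i64> {
        let v = self.data.get(self.cursor).copied();
        self.cursor += 1;
        v
    }
}

impl ToyBatchExecutor for Source {
    fn next_batch(&mut self, scan_rows: usize) -> Vec<i64> {
        let end = (self.cursor + scan_rows).min(self.data.len());
        let batch = self.data[self.cursor..end].to_vec();
        self.cursor = end;
        batch
    }
}

fn main() {
    // One virtual call (and one Option) per row.
    let mut rows = Source { data: vec![1, 2, 3, 4, 5], cursor: 0 };
    while let Some(v) = RowExecutor::next(&mut rows) {
        println!("row: {}", v);
    }

    // One virtual call per batch; per-value work stays in a tight loop.
    let mut batches = Source { data: vec![1, 2, 3, 4, 5], cursor: 0 };
    loop {
        let batch = batches.next_batch(2);
        if batch.is_empty() {
            break;
        }
        println!("batch: {:?}", batch);
    }
}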

The protocol used to call the coprocessor lives in the tipb repo:

// It represents a Executor.
message Executor {
    optional ExecType tp = 1 [(gogoproto.nullable) = false];
    optional TableScan tbl_scan = 2;
    optional IndexScan idx_scan = 3;
    optional Selection selection = 4;
    optional Aggregation aggregation = 5;
    optional TopN topN = 6;
    optional Limit limit = 7;
    optional Aggregation stream_agg = 8;
}

message TableScan {
    optional int64 table_id = 1 [(gogoproto.nullable) = false];
    repeated ColumnInfo columns = 2;
    optional bool desc = 3 [(gogoproto.nullable) = false];
}

message IndexScan {
    optional int64 table_id = 1 [(gogoproto.nullable) = false];
    optional int64 index_id = 2 [(gogoproto.nullable) = false];
    repeated ColumnInfo columns = 3;
    optional bool desc = 4 [(gogoproto.nullable) = false];
    optional bool unique = 5; // check whether it is a unique index.
}

Let's read this together with the encoding described earlier:

(figure: Executor)

Here are several operators (types) that I got quite familiar with while testing:

  • ExecType: the tag that says which kind of executor this message carries (I wasn't sure about it at first)
  • TableScan: scans the table to fetch the requested rows
  • IndexScan: scans an index; the unique flag tells unique from non-unique indexes
  • Selection: the selection (filter) operator
  • Aggregation: the aggregation operator
  • TopN: top-n, i.e. order by plus limit (plain Limit is a separate executor)
  • stream_agg: stream aggregation, which relies on ordered (index) input

Request handling

Most TiDB read queries are processed by Coprocessor instead of KV interface. By doing so, the CPU of TiKV nodes can be utilized for computing and the amount of data to transfer can be reduced (i.e. filtered at TiKV side).

Notice that Coprocessor handles more than simple SQL query executors (DAG request). It also handles analyzing requests and checksum requests.

The entry point of handling all coprocessor requests is Endpoint. Common steps are:

  1. Parse the request into a DAG request, Checksum request or Analyze request.
  2. Retrieve a snapshot from the underlying engine according to the given timestamp.
  3. Build corresponding request handlers from the snapshot and request detail. (built dynamically based on the request?)
  4. Run request handlers once (for unary requests) or multiple times (for streaming requests) on a future threadpool. (this is where execution actually happens)
  5. Return handling result as a response.

Please refer to Endpoint for more details.

Endpoint is where the corresponding handler gets built:

pub struct Endpoint<E: Engine> { /* fields omitted */ }

Endpoint has access to the underlying storage engine; it handles the request and returns the corresponding Response.

As for executors, there are two kinds:

  • Executors that only produce rows (i.e. fetch data from the KV layer)

Samples: TableScanExecutor, IndexScanExecutor

Obviously, this kind of executor must be the first executor in the pipeline.

  • Executors that only work over previous executor’s output row and produce a new row (or just eat it)

Samples: SelectionExecutor, AggregationExecutor, LimitExecutor, etc

Obviously, this kind of executor must not be the first executor in the pipeline.

DAGBuilder packages all of this into executors for us.

There are DAGRequestHandler and BatchDAGHandler; the latter is built from the former.

TiKV

The entry point is in src/server/service/kv.rs:

fn coprocessor(&mut self, ctx: RpcContext<'_>, req: Request, sink: UnarySink<Response>) {
    let timer = GRPC_MSG_HISTOGRAM_VEC.coprocessor.start_coarse_timer();
    let future = future_cop(&self.cop, req, Some(ctx.peer()))
        .and_then(|resp| sink.success(resp).map_err(Error::from))
        .map(|_| timer.observe_duration())
        .map_err(move |e| {
            debug!("kv rpc failed";
                "request" => "coprocessor",
                "err" => ?e
            );
            GRPC_MSG_FAIL_COUNTER.coprocessor.inc();
        });
    ctx.spawn(future);
}

fn coprocessor_stream(
    &mut self,
    ctx: RpcContext<'_>,
    req: Request,
    sink: ServerStreamingSink<Response>,
) {
    let timer = GRPC_MSG_HISTOGRAM_VEC
        .coprocessor_stream
        .start_coarse_timer();
    let stream = self
        .cop
        .parse_and_handle_stream_request(req, Some(ctx.peer()))
        .map(|resp| (resp, WriteFlags::default().buffer_hint(true)))
        .map_err(|e| {
            let code = RpcStatusCode::Unknown;
            let msg = Some(format!("{:?}", e));
            GrpcError::RpcFailure(RpcStatus::new(code, msg))
        });
    let future = sink
        .send_all(stream)
        .map(|_| timer.observe_duration())
        .map_err(Error::from)
        .map_err(move |e| {
            debug!("kv rpc failed";
                "request" => "coprocessor_stream",
                "err" => ?e
            );
            GRPC_MSG_FAIL_COUNTER.coprocessor_stream.inc();
        });
    ctx.spawn(future);
}

Note self.cop here; and here is the definition of future_cop:

fn future_cop<E: Engine>(
    cop: &Endpoint<E>,
    req: Request,
    peer: Option<String>,
) -> impl Future<Item = Response, Error = Error> {
    cop.parse_and_handle_unary_request(req, peer)
        .map_err(|_| unreachable!())
}

So cop is an Endpoint; it reads via the MVCC API, then parses the request and handles the result. At this point we are back at the description quoted earlier:

The entry point of handling all coprocessor requests is Endpoint. Common steps are:

  1. Parse the request into a DAG request, Checksum request or Analyze request.
  2. Retrieve a snapshot from the underlying engine according to the given timestamp.
  3. Build corresponding request handlers from the snapshot and request detail. (built dynamically based on the request?)
  4. Run request handlers once (for unary requests) or multiple times (for streaming requests) on a future threadpool. (this is where execution actually happens)
  5. Return handling result as a response.

Let's head over to endpoint.rs:

/// Handle a unary request and run on the read pool.
///
/// Returns `Err(err)` if the read pool is full. Returns `Ok(future)` in other cases.
/// The future inside may be an error however.
fn handle_unary_request(
    &self,
    req_ctx: ReqContext,
    handler_builder: RequestHandlerBuilder<E::Snap>,
) -> Result<impl Future<Item = coppb::Response, Error = Error>> {
    // I honestly don't know what this is
    let priority = readpool::Priority::from(req_ctx.context.get_priority());
    // box the tracker so that moving it is cheap.
    let tracker = Box::new(Tracker::new(req_ctx));
    self.read_pool
        .spawn_handle(priority, move || {
            Self::handle_unary_request_impl(tracker, handler_builder)
        })
        .map_err(|_| Error::Full)
}

/// Parses and handles a unary request. Returns a future that will never fail. If there are
/// errors during parsing or handling, they will be converted into a `Response` as the success
/// result of the future.
#[inline]
pub fn parse_and_handle_unary_request(
    &self,
    req: coppb::Request,
    peer: Option<String>,
) -> impl Future<Item = coppb::Response, Error = ()> {
    let result_of_future =
        self.parse_request(req, peer, false)
            .and_then(|(handler_builder, req_ctx)| {
                self.handle_unary_request(req_ctx, handler_builder)
            });

    future::result(result_of_future)
        .flatten()
        .or_else(|e| Ok(make_error_response(e)))
}

This calls handle_unary_request_impl and parse_request; let's focus on the latter.

/// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`.
/// Returns `Err` if fails.
fn parse_request(
    &self,
    mut req: coppb::Request,
    peer: Option<String>,
    is_streaming: bool,
) -> Result<(RequestHandlerBuilder<E::Snap>, ReqContext)> {
    fail_point!("coprocessor_parse_request", |_| Err(box_err!(
        "unsupported tp (failpoint)"
    )));

    let (context, data, ranges) = (
        req.take_context(),
        req.take_data(),
        req.take_ranges().to_vec(),
    );

    let mut is = CodedInputStream::from_bytes(&data);
    is.set_recursion_limit(self.recursion_limit);

    let req_ctx: ReqContext;
    let builder: RequestHandlerBuilder<E::Snap>;

    match req.get_tp() {
        REQ_TYPE_DAG => {
            let mut dag = DAGRequest::new();
            box_try!(dag.merge_from(&mut is));
            let mut table_scan = false;
            let mut is_desc_scan = false;
            if let Some(scan) = dag.get_executors().iter().next() {
                table_scan = scan.get_tp() == ExecType::TypeTableScan;
                if table_scan {
                    is_desc_scan = scan.get_tbl_scan().get_desc();
                } else {
                    is_desc_scan = scan.get_idx_scan().get_desc();
                }
            }
            req_ctx = ReqContext::new(
                make_tag(table_scan),
                context,
                ranges.as_slice(),
                self.max_handle_duration,
                peer,
                Some(is_desc_scan),
                Some(dag.get_start_ts()),
            );
            let batch_row_limit = self.get_batch_row_limit(is_streaming);
            let enable_batch_if_possible = self.enable_batch_if_possible;
            builder = Box::new(move |snap, req_ctx: &ReqContext| {
                // TODO: Remove explicit type once rust-lang#41078 is resolved
                let store = SnapshotStore::new(
                    snap,
                    dag.get_start_ts(),
                    req_ctx.context.get_isolation_level(),
                    !req_ctx.context.get_not_fill_cache(),
                );
                dag::DAGBuilder::build(
                    dag,
                    ranges,
                    store,
                    req_ctx.deadline,
                    batch_row_limit,
                    is_streaming,
                    enable_batch_if_possible,
                )
            });
        }
        REQ_TYPE_ANALYZE => {
            let mut analyze = AnalyzeReq::new();
            box_try!(analyze.merge_from(&mut is));
            let table_scan = analyze.get_tp() == AnalyzeType::TypeColumn;
            req_ctx = ReqContext::new(
                make_tag(table_scan),
                context,
                ranges.as_slice(),
                self.max_handle_duration,
                peer,
                None,
                Some(analyze.get_start_ts()),
            );
            builder = Box::new(move |snap, req_ctx: &_| {
                // TODO: Remove explicit type once rust-lang#41078 is resolved
                statistics::analyze::AnalyzeContext::new(analyze, ranges, snap, req_ctx)
                    .map(|h| h.into_boxed())
            });
        }
        REQ_TYPE_CHECKSUM => {
            let mut checksum = ChecksumRequest::new();
            box_try!(checksum.merge_from(&mut is));
            let table_scan = checksum.get_scan_on() == ChecksumScanOn::Table;
            req_ctx = ReqContext::new(
                make_tag(table_scan),
                context,
                ranges.as_slice(),
                self.max_handle_duration,
                peer,
                None,
                Some(checksum.get_start_ts()),
            );
            builder = Box::new(move |snap, req_ctx: &_| {
                // TODO: Remove explicit type once rust-lang#41078 is resolved
                checksum::ChecksumContext::new(checksum, ranges, snap, req_ctx)
                    .map(|h| h.into_boxed())
            });
        }
        tp => return Err(box_err!("unsupported tp {}", tp)),
    };

    Ok((builder, req_ctx))
}

As you can see, for a DAG request this creates a

let mut dag = DAGRequest::new();

and then fills it from the request's data field:

if let Some(scan) = dag.get_executors().iter().next() {
    table_scan = scan.get_tp() == ExecType::TypeTableScan;
    if table_scan {
        is_desc_scan = scan.get_tbl_scan().get_desc();
    } else {
        is_desc_scan = scan.get_idx_scan().get_desc();
    }
}

The dag.get_executors() function looks like this:

pub fn get_executors(&self) -> &[super::executor::Executor] {
    &self.executors
}

So, for REQ_TYPE_DAG: this inspects the first scan executor and then returns a builder that calls dag::DAGBuilder (I didn't fully understand the chunk of code in the middle).

About dag::DAGBuilder:

Currently all executors are executed in sequence and there is no task scheduler, so this builder is in fact a pipeline builder. The builder will finally build a Box<Executor> which may contain another executor as its source, embedded in the field, one after another. These nested executors together form an executor pipeline that a single iteration at the out-most executor (i.e. calling next()) will drive the whole pipeline.

build returns a Result<Box<dyn RequestHandler>>, which lines up with the call site we started from (well, obviously... it wouldn't compile otherwise). A toy sketch of the nesting idea follows.
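
To make the "nested executors form a pipeline" idea concrete, here is a toy sketch with a simplified trait of my own (not the real Executor trait): a Selection-like node owns its source as a boxed field, so a single next() call on the outermost executor drives the whole chain:

/// A drastically simplified stand-in for the real Executor trait.
trait ToyExecutor {
    fn next(&mut self) -> Option<i64>;
}

/// Leaf node: produces rows (stands in for TableScanExecutor).
struct Scan {
    rows: std::vec::IntoIter<i64>,
}

impl ToyExecutor for Scan {
    fn next(&mut self) -> Option<i64> {
        self.rows.next()
    }
}

/// Inner node: owns its source as a boxed field and filters what it pulls
/// (stands in for SelectionExecutor).
struct Selection {
    src: Box<dyn ToyExecutor>,
    pred: fn(i64) -> bool,
}

impl ToyExecutor for Selection {
    fn next(&mut self) -> Option<i64> {
        while let Some(v) = self.src.next() {
            if (self.pred)(v) {
                return Some(v);
            }
        }
        None
    }
}

fn main() {
    // Built bottom-up, the way the builder nests executors: Scan -> Selection.
    let scan = Scan { rows: vec![1, 2, 3, 4].into_iter() };
    let keep_even: fn(i64) -> bool = |v| v % 2 == 0;
    let mut outermost: Box<dyn ToyExecutor> = Box::new(Selection {
        src: Box::new(scan),
        pred: keep_even,
    });

    // A single pull on the outermost executor drives the whole pipeline.
    while let Some(v) = outermost.next() {
        println!("{}", v);
    }
}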

Now let's look at RequestHandler; searching in CLion shows:

(figure: RequestHandler)
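
Before diving into the batch handler, here is my reconstruction of what the RequestHandler trait roughly offers, pieced together from the calls we have already seen (into_boxed in parse_request, handle_request and collect_metrics_into in the impl below). Treat the exact signatures, especially the streaming one, as assumptions rather than quotes; the placeholder types exist only so the sketch compiles on its own:

// Placeholder types so this sketch compiles on its own; in TiKV they are the real
// coprocessor Response, error type and ExecutorMetrics.
struct Response;
struct ExecutorMetrics;
type Result<T> = std::result::Result<T, String>;

/// Rough reconstruction of the RequestHandler trait (signatures assumed).
trait RequestHandler: Send {
    /// Unary requests: produce the whole response at once.
    fn handle_request(&mut self) -> Result<Response>;

    /// Streaming requests: one chunk per call plus a "finished" flag (assumed signature).
    fn handle_streaming_request(&mut self) -> Result<(Option<Response>, bool)> {
        Err("streaming not supported by this handler".to_string())
    }

    /// Merge accumulated metrics into the caller's collector.
    fn collect_metrics_into(&mut self, _metrics: &mut ExecutorMetrics) {}

    /// Type-erase the concrete handler, as used by the builder closures in parse_request.
    fn into_boxed(self) -> Box<dyn RequestHandler>
    where
        Self: Sized + 'static,
    {
        Box::new(self)
    }
}

// A do-nothing handler, just to show the trait is usable as written.
struct Dummy;

impl RequestHandler for Dummy {
    fn handle_request(&mut self) -> Result<Response> {
        Ok(Response)
    }
}

fn main() {
    let mut handler = Dummy.into_boxed();
    let _ = handler.handle_request();
    let _ = handler.handle_streaming_request();
}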

Ah, we've finally reached BatchDAGHandler and DAGRequestHandler; let's look at the batch one:

/// Must be built from DAGRequestHandler.
pub struct BatchDAGHandler {
    /// The deadline of this handler. For each check point (e.g. each iteration) we need to check
    /// whether or not the deadline is exceeded and break the process if so.
    // TODO: Deprecate it using a better deadline mechanism.
    deadline: Deadline,

    out_most_executor: Box<dyn BatchExecutor>,

    /// The offset of the columns need to be outputted. For example, TiDB may only needs a subset
    /// of the columns in the result so that unrelated columns don't need to be encoded and
    /// returned back.
    output_offsets: Vec<u32>,

    config: Arc<EvalConfig>,

    /// Accumulated statistics.
    // TODO: Currently we return statistics only once, so these statistics are accumulated only
    // once. However in future when we introduce reenterable DAG processor, these statistics may
    // be accumulated and returned several times during the life time of the request. At that time
    // we may remove this field.
    statistics: BatchExecuteStatistics,

    /// Traditional metric interface.
    // TODO: Deprecate it in Coprocessor DAG v2.
    metrics: ExecutorMetrics,

    /// Whether or not execution summary need to be collected.
    collect_exec_summary: bool,
}

Dropping the to-be-deprecated fields:

/// Must be built from DAGRequestHandler.
pub struct BatchDAGHandler {
    out_most_executor: Box<dyn BatchExecutor>,

    /// The offset of the columns need to be outputted. For example, TiDB may only needs a subset
    /// of the columns in the result so that unrelated columns don't need to be encoded and
    /// returned back.
    output_offsets: Vec<u32>,

    config: Arc<EvalConfig>,

    /// Accumulated statistics.
    // TODO: Currently we return statistics only once, so these statistics are accumulated only
    // once. However in future when we introduce reenterable DAG processor, these statistics may
    // be accumulated and returned several times during the life time of the request. At that time
    // we may remove this field.
    statistics: BatchExecuteStatistics,

    /// Whether or not execution summary need to be collected.
    collect_exec_summary: bool,
}

This handler implements RequestHandler:

impl RequestHandler for BatchDAGHandler {
    fn handle_request(&mut self) -> Result<Response> {
        let mut chunks = vec![];
        let mut batch_size = BATCH_INITIAL_SIZE;
        let mut warnings = self.config.new_eval_warnings();

        loop {
            self.deadline.check_if_exceeded()?;
            let mut result = self.out_most_executor.next_batch(batch_size);

            let is_drained;

            // Check error first, because it means that we should directly respond error.
            match result.is_drained {
                Err(Error::Eval(err)) => {
                    let mut resp = Response::new();
                    let mut sel_resp = SelectResponse::new();
                    sel_resp.set_error(err);
                    let data = box_try!(sel_resp.write_to_bytes());
                    resp.set_data(data);
                    return Ok(resp);
                }
                Err(e) => return Err(e),
                Ok(f) => is_drained = f,
            }

            // We will only get warnings limited by max_warning_count. Note that in future we
            // further want to ignore warnings from unused rows. See TODOs in the `result.warnings`
            // field.
            warnings.merge(&mut result.warnings);

            // Notice that logical rows len == 0 doesn't mean that it is drained.
            if !result.logical_rows.is_empty() {
                assert_eq!(
                    result.physical_columns.columns_len(),
                    self.out_most_executor.schema().len()
                );
                let mut chunk = Chunk::new();
                {
                    let data = chunk.mut_rows_data();
                    data.reserve(
                        result
                            .physical_columns
                            .maximum_encoded_size(&result.logical_rows, &self.output_offsets)?,
                    );
                    // Although `schema()` can be deeply nested, it is ok since we process data in
                    // batch.
                    result.physical_columns.encode(
                        &result.logical_rows,
                        &self.output_offsets,
                        self.out_most_executor.schema(),
                        data,
                    )?;
                }
                chunks.push(chunk);
            }

            if is_drained {
                self.out_most_executor
                    .collect_statistics(&mut self.statistics);
                self.metrics.cf_stats.add(&self.statistics.cf_stats);

                let mut resp = Response::new();
                let mut sel_resp = SelectResponse::new();
                sel_resp.set_chunks(chunks.into());
                // TODO: output_counts should not be i64. Let's fix it in Coprocessor DAG V2.
                sel_resp.set_output_counts(
                    self.statistics
                        .scanned_rows_per_range
                        .iter()
                        .map(|v| *v as i64)
                        .collect(),
                );

                if self.collect_exec_summary {
                    let summaries = self
                        .statistics
                        .summary_per_executor
                        .iter()
                        .map(|summary| {
                            let mut ret = ExecutorExecutionSummary::new();
                            ret.set_num_iterations(summary.num_iterations as u64);
                            ret.set_num_produced_rows(summary.num_produced_rows as u64);
                            ret.set_time_processed_ns(summary.time_processed_ns as u64);
                            ret
                        })
                        .collect();
                    sel_resp.set_execution_summaries(RepeatedField::from_vec(summaries));
                }

                sel_resp.set_warnings(warnings.warnings.into());
                sel_resp.set_warning_count(warnings.warning_cnt as i64);

                let data = box_try!(sel_resp.write_to_bytes());
                resp.set_data(data);

                // Not really useful here, because we only collect it once. But when we change it
                // in future, hope we will not forget it.
                self.statistics.clear();

                return Ok(resp);
            }

            // Grow batch size
            if batch_size < BATCH_MAX_SIZE {
                batch_size *= BATCH_GROW_FACTOR;
                if batch_size > BATCH_MAX_SIZE {
                    batch_size = BATCH_MAX_SIZE
                }
            }
        }
    }

    fn collect_metrics_into(&mut self, target_metrics: &mut ExecutorMetrics) {
        // FIXME: This interface will be broken in streaming mode.
        target_metrics.merge(&mut self.metrics);
        // Notice: Exec count is collected during building the batch handler.
    }
}
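
One detail worth calling out in handle_request above is how the batch size grows between iterations: start small so short queries answer quickly, then grow toward a cap for throughput. A tiny self-contained sketch of just that loop; the constant values here are assumptions for illustration, the real ones are defined in TiKV's coprocessor code:

// Assumed values for illustration; the real constants live in TiKV and may differ.
const BATCH_INITIAL_SIZE: usize = 32;
const BATCH_GROW_FACTOR: usize = 2;
const BATCH_MAX_SIZE: usize = 1024;

fn main() {
    let mut batch_size = BATCH_INITIAL_SIZE;
    let mut sizes = vec![];
    // Pretend we run 8 iterations before the executor is drained.
    for _ in 0..8 {
        sizes.push(batch_size);
        if batch_size < BATCH_MAX_SIZE {
            batch_size *= BATCH_GROW_FACTOR;
            if batch_size > BATCH_MAX_SIZE {
                batch_size = BATCH_MAX_SIZE;
            }
        }
    }
    // Starts small to answer quickly, then grows toward the cap for throughput.
    println!("{:?}", sizes); // [32, 64, 128, 256, 512, 1024, 1024, 1024]
}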

Recap

Let's pull the whole flow together:

  1. The coprocessor / coprocessor_stream handlers in src/server/service/kv.rs respond to coprocessor requests; they build a future and hand it to grpc-rs via ctx.spawn.
  2. Specifically, the Endpoint object parses the request into a DAGRequest and then calls build, which returns a Result<Box<dyn RequestHandler>>.
  3. BatchDAGHandler implements RequestHandler.
  4. BatchDAGHandler wraps a Box<dyn BatchExecutor>, and all batch DAG operators implement that trait.