Transaction In TiKV Read(1)

The real logic

Our Percolator Training lab requires the server side to implement four interfaces:

service! {
    service timestamp {
        rpc get_timestamp(TimestampRequest) returns (TimestampResponse);
    }
}

pub use timestamp::{add_service as add_tso_service, Client as TSOClient, Service};

service! {
    service transaction {
        rpc get(GetRequest) returns (GetResponse);
        rpc prewrite(PrewriteRequest) returns (PrewriteResponse);
        rpc commit(CommitRequest) returns (CommitResponse);
    }
}
  • get, prewrite, commit, get_timestamp
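
Before diving into the real implementation, it may help to see how a client strings these four RPCs together into one Percolator transaction. The following is a hedged, heavily simplified sketch (TxnRpc, Mutation, commit_txn and the blocking signatures are all invented for illustration; the real lab client is asynchronous and also has to handle retries and cleanup):

// Hypothetical, heavily simplified client-side flow of one Percolator write
// transaction; none of these names come from the real lab code.
struct Mutation {
    key: Vec<u8>,
    value: Vec<u8>,
}

// The four server interfaces, as seen from the client (blocking for simplicity).
trait TxnRpc {
    fn get_timestamp(&self) -> u64;                                          // rpc get_timestamp
    fn get(&self, key: &[u8], start_ts: u64) -> Option<Vec<u8>>;             // rpc get
    fn prewrite(&self, m: &Mutation, primary: &[u8], start_ts: u64) -> bool; // rpc prewrite
    fn commit(&self, key: &[u8], start_ts: u64, commit_ts: u64) -> bool;     // rpc commit
}

// Two-phase commit driven purely through those four RPCs.
fn commit_txn<C: TxnRpc>(client: &C, writes: &[Mutation]) -> bool {
    let start_ts = client.get_timestamp();
    let primary = &writes[0].key; // assume at least one write
    // Phase 1: prewrite every key, locking each one against the primary.
    for m in writes {
        if !client.prewrite(m, primary, start_ts) {
            return false; // conflict: the caller should clean up and retry
        }
    }
    // Phase 2: fetch a commit timestamp and commit the primary first.
    // Once the primary commits, the whole transaction is logically committed;
    // the secondaries can even be committed asynchronously.
    let commit_ts = client.get_timestamp();
    if !client.commit(primary, start_ts, commit_ts) {
        return false;
    }
    for m in &writes[1..] {
        client.commit(&m.key, start_ts, commit_ts);
    }
    true
}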

Meanwhile, the following operations are either missing here or awkward to hand off to the server side:

  1. Cleanup: the txn's primary failed and needs cleaning up. For a given key, when its Prewrite can no longer be committed, Cleanup handles the failure.
  2. GC: this doesn't seem to exist in the lab at all, and I'm not sure what it would reclaim besides stale data.
  3. ResolveLock: in my naive implementation, when operations like Get cannot make progress they trigger ResolveLock, concretely Resolve(start_ts, end_ts), the same thing I do; it clears those entries out of the table, which presumably takes quite a while (see the sketch right after this list).
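
As a rough illustration of what ResolveLock has to do, here is a hedged sketch against a toy in-memory lock/write table (Store, resolve, and the table layout are all made up for illustration, not the lab's or TiKV's real code): find every lock left by the transaction identified by start_ts, then either commit or roll back each key depending on whether the primary was committed.

use std::collections::BTreeMap;

// Toy tables: a lock CF keyed by user key, and a write CF keyed by (key, commit_ts).
struct Lock {
    primary: Vec<u8>, // kept only to mirror what a real lock records
    start_ts: u64,
}

struct Store {
    lock: BTreeMap<Vec<u8>, Lock>,
    write: BTreeMap<(Vec<u8>, u64), u64>, // (key, commit_ts) -> start_ts
}

impl Store {
    // Resolve all locks left by the transaction `start_ts`.
    // `commit_ts == Some(ts)` means the primary was committed at `ts`;
    // `None` means the transaction should be rolled back.
    fn resolve(&mut self, start_ts: u64, commit_ts: Option<u64>) {
        let keys: Vec<Vec<u8>> = self
            .lock
            .iter()
            .filter(|(_, l)| l.start_ts == start_ts)
            .map(|(k, _)| k.clone())
            .collect();
        for k in keys {
            self.lock.remove(&k);
            if let Some(ts) = commit_ts {
                // Commit: write a commit record pointing back at start_ts.
                self.write.insert((k, ts), start_ts);
            }
            // Rollback: in this toy model, dropping the lock is enough.
        }
    }
}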

The real flow is much more complete than our lab. Following shirly's blog, here is the corresponding content:

  • The client side

// Transaction defines the interface for operations inside a Transaction.
// This is not thread safe.
type Transaction interface {
    MemBuffer
    AssertionProto
    // Commit commits the transaction operations to KV store.
    Commit(context.Context) error
    // Rollback undoes the transaction operations to KV store.
    Rollback() error
    // String implements fmt.Stringer interface.
    String() string
    // LockKeys tries to lock the entries with the keys in KV store.
    LockKeys(ctx context.Context, forUpdateTS uint64, keys ...Key) error
    // SetOption sets an option with a value, when val is nil, uses the default
    // value of this option.
    SetOption(opt Option, val interface{})
    // DelOption deletes an option.
    DelOption(opt Option)
    // IsReadOnly checks if the transaction has only performed read operations.
    IsReadOnly() bool
    // StartTS returns the transaction start timestamp.
    StartTS() uint64
    // Valid returns if the transaction is valid.
    // A transaction become invalid after commit or rollback.
    Valid() bool
    // GetMemBuffer return the MemBuffer binding to this transaction.
    GetMemBuffer() MemBuffer
    // SetVars sets variables to the transaction.
    SetVars(vars *Variables)
    // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage.
    BatchGet(keys []Key) (map[string][]byte, error)
    IsPessimistic() bool
}

This defines a KV interface, and my guess is that it plays a role similar to the Service layer above.

Besides Scanner, which I don't yet understand, the type implementing this interface is tikvTxn.

Here we focus on tikv/txn:

// tikvTxn implements kv.Transaction.
type tikvTxn struct {
    snapshot  *tikvSnapshot
    us        kv.UnionStore
    store     *tikvStore // for connection to region.
    startTS   uint64
    startTime time.Time // Monotonic timestamp for recording txn time consuming.
    commitTS  uint64
    valid     bool
    lockKeys  [][]byte
    lockedMap map[string]struct{}
    mu        sync.Mutex // For thread-safe LockKeys function.
    dirty     bool
    setCnt    int64
    vars      *kv.Variables
    committer *twoPhaseCommitter
    // For data consistency check.
    // assertions[:confirmed] is the assertion of current transaction.
    // assertions[confirmed:len(assertions)] is the assertions of current statement.
    // StmtCommit/StmtRollback may change the confirmed position.
    assertions []assertionPair
    confirmed  int
}

Note the UnionStore:

// UnionStore is a store that wraps a snapshot for read and a BufferStore for buffered write.
// Also, it provides some transaction related utilities.
type UnionStore interface {
    MemBuffer
    // Returns related condition pair
    LookupConditionPair(k Key) *conditionPair
    // DeleteConditionPair deletes a condition pair.
    DeleteConditionPair(k Key)
    // WalkBuffer iterates all buffered kv pairs.
    WalkBuffer(f func(k Key, v []byte) error) error
    // SetOption sets an option with a value, when val is nil, uses the default
    // value of this option.
    SetOption(opt Option, val interface{})
    // DelOption deletes an option.
    DelOption(opt Option)
    // GetOption gets an option.
    GetOption(opt Option) interface{}
    // GetMemBuffer return the MemBuffer binding to this UnionStore.
    GetMemBuffer() MemBuffer
}

Commit is a long function, but since pasting is cheap, here it is anyway:

func (txn *tikvTxn) Commit(ctx context.Context) error {
    if !txn.valid {
        return kv.ErrInvalidTxn
    }
    defer txn.close()

    failpoint.Inject("mockCommitError", func(val failpoint.Value) {
        if val.(bool) && kv.IsMockCommitErrorEnable() {
            kv.MockCommitErrorDisable()
            failpoint.Return(errors.New("mock commit error"))
        }
    })

    tikvTxnCmdCountWithSet.Add(float64(txn.setCnt))
    tikvTxnCmdCountWithCommit.Inc()
    start := time.Now()
    defer func() { tikvTxnCmdHistogramWithCommit.Observe(time.Since(start).Seconds()) }()

    // connID is used for log.
    var connID uint64
    val := ctx.Value(sessionctx.ConnID)
    if val != nil {
        connID = val.(uint64)
    }

    var err error
    // If the txn use pessimistic lock, committer is initialized.
    committer := txn.committer
    if committer == nil {
        committer, err = newTwoPhaseCommitter(txn, connID)
        if err != nil {
            return errors.Trace(err)
        }
    }
    if err := committer.initKeysAndMutations(); err != nil {
        return errors.Trace(err)
    }
    if len(committer.keys) == 0 {
        return nil
    }

    defer func() {
        ctxValue := ctx.Value(execdetails.CommitDetailCtxKey)
        if ctxValue != nil {
            commitDetail := ctxValue.(**execdetails.CommitDetails)
            if *commitDetail != nil {
                (*commitDetail).TxnRetry += 1
            } else {
                *commitDetail = committer.detail
            }
        }
    }()

    // latches disabled
    // pessimistic transaction should also bypass latch.
    if txn.store.txnLatches == nil || txn.IsPessimistic() {
        err = committer.executeAndWriteFinishBinlog(ctx)
        logutil.Logger(ctx).Debug("[kv] txnLatches disabled, 2pc directly", zap.Error(err))
        return errors.Trace(err)
    }

    // latches enabled
    // for transactions which need to acquire latches
    start = time.Now()
    lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys)
    committer.detail.LocalLatchTime = time.Since(start)
    if committer.detail.LocalLatchTime > 0 {
        metrics.TiKVLocalLatchWaitTimeHistogram.Observe(committer.detail.LocalLatchTime.Seconds())
    }
    defer txn.store.txnLatches.UnLock(lock)
    if lock.IsStale() {
        return kv.ErrWriteConflictInTiDB.FastGenByArgs(txn.startTS)
    }
    err = committer.executeAndWriteFinishBinlog(ctx)
    if err == nil {
        lock.SetCommitTS(committer.commitTS)
    }
    logutil.Logger(ctx).Debug("[kv] txnLatches enabled while txn retryable", zap.Error(err))
    return errors.Trace(err)
}

func (txn *tikvTxn) Set(k kv.Key, v []byte) error {
    txn.setCnt++
    txn.dirty = true
    return txn.us.Set(k, v)
}

You can see that twoPhaseCommitter is what actually carries out the transaction.
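
One detail in Commit worth a second look is txn.store.txnLatches: before running 2PC, TiDB can take local, in-memory latches on the keys so that conflicting transactions on the same TiDB instance are detected early. Below is a hedged sketch of just the staleness-check part of that idea (the types and names are invented; the real latch implementation also queues and serializes waiters):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// A toy local-latch table: each slot remembers the largest commit_ts that
// has passed through it.
struct Latches {
    slots: Vec<u64>, // max commit_ts seen per slot
}

impl Latches {
    fn new(size: usize) -> Self {
        Latches { slots: vec![0; size] }
    }

    fn slot_of(&self, key: &[u8]) -> usize {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        (h.finish() as usize) % self.slots.len()
    }

    // Mirrors the spirit of lock.IsStale() in the Go code: if any touched slot
    // has already seen a commit_ts newer than our start_ts, this txn is bound
    // to hit a write conflict and can fail fast.
    fn is_stale(&self, start_ts: u64, keys: &[Vec<u8>]) -> bool {
        keys.iter().any(|k| self.slots[self.slot_of(k)] > start_ts)
    }

    // Mirrors the spirit of lock.SetCommitTS(): after a successful commit,
    // publish the commit_ts on every touched slot.
    fn set_commit_ts(&mut self, keys: &[Vec<u8>], commit_ts: u64) {
        for k in keys {
            let s = self.slot_of(k);
            if self.slots[s] < commit_ts {
                self.slots[s] = commit_ts;
            }
        }
    }
}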

The entry point

First, TiKV defines a pile of KV-related interfaces in the kvproto repository:

// Serve as a distributed kv database.
service Tikv {
    // KV commands with mvcc/txn supported.
    rpc KvGet(kvrpcpb.GetRequest) returns (kvrpcpb.GetResponse) {}
    rpc KvScan(kvrpcpb.ScanRequest) returns (kvrpcpb.ScanResponse) {}
    rpc KvPrewrite(kvrpcpb.PrewriteRequest) returns (kvrpcpb.PrewriteResponse) {}
    rpc KvPessimisticLock(kvrpcpb.PessimisticLockRequest) returns (kvrpcpb.PessimisticLockResponse) {}
    rpc KVPessimisticRollback(kvrpcpb.PessimisticRollbackRequest) returns (kvrpcpb.PessimisticRollbackResponse) {}
    rpc KvCommit(kvrpcpb.CommitRequest) returns (kvrpcpb.CommitResponse) {}
    rpc KvImport(kvrpcpb.ImportRequest) returns (kvrpcpb.ImportResponse) {}
    rpc KvCleanup(kvrpcpb.CleanupRequest) returns (kvrpcpb.CleanupResponse) {}
    rpc KvBatchGet(kvrpcpb.BatchGetRequest) returns (kvrpcpb.BatchGetResponse) {}
    rpc KvBatchRollback(kvrpcpb.BatchRollbackRequest) returns (kvrpcpb.BatchRollbackResponse) {}
    rpc KvScanLock(kvrpcpb.ScanLockRequest) returns (kvrpcpb.ScanLockResponse) {}
    rpc KvResolveLock(kvrpcpb.ResolveLockRequest) returns (kvrpcpb.ResolveLockResponse) {}
    rpc KvGC(kvrpcpb.GCRequest) returns (kvrpcpb.GCResponse) {}
    rpc KvDeleteRange(kvrpcpb.DeleteRangeRequest) returns (kvrpcpb.DeleteRangeResponse) {}
    // ... (the rest omitted)
}

These are the txn-level commands. The Request/Response message types defined alongside them can be found in kvrpcpb.

In TiKV's src/server/service, we can see the code for BatchGet and friends:

fn kv_batch_get(
    &mut self,
    ctx: RpcContext<'_>,
    req: BatchGetRequest,
    sink: UnarySink<BatchGetResponse>,
) {
    let timer = GRPC_MSG_HISTOGRAM_VEC.kv_batch_get.start_coarse_timer();

    let future = future_batch_get(&self.storage, req)
        .and_then(|res| sink.success(res).map_err(Error::from))
        .map(|_| timer.observe_duration())
        .map_err(move |e| {
            debug!("kv rpc failed";
                "request" => "kv_batch_get",
                "err" => ?e
            );
            GRPC_MSG_FAIL_COUNTER.kv_batch_get.inc();
        });

    ctx.spawn(future);
}

fn kv_get(&mut self, ctx: RpcContext<'_>, req: GetRequest, sink: UnarySink<GetResponse>) {
    let timer = GRPC_MSG_HISTOGRAM_VEC.kv_get.start_coarse_timer();

    let future = future_get(&self.storage, req)
        .and_then(|res| sink.success(res).map_err(Error::from))
        .map(|_| timer.observe_duration())
        .map_err(move |e| {
            debug!("kv rpc failed";
                "request" => "kv_get",
                "err" => ?e
            );
            GRPC_MSG_FAIL_COUNTER.kv_get.inc();
        });

    ctx.spawn(future);
}

fn kv_scan(&mut self, ctx: RpcContext<'_>, req: ScanRequest, sink: UnarySink<ScanResponse>) {
    let timer = GRPC_MSG_HISTOGRAM_VEC.kv_scan.start_coarse_timer();

    let future = future_scan(&self.storage, req)
        .and_then(|res| sink.success(res).map_err(Error::from))
        .map(|_| timer.observe_duration())
        .map_err(move |e| {
            debug!("kv rpc failed";
                "request" => "kv_scan",
                "err" => ?e
            );
            GRPC_MSG_FAIL_COUNTER.kv_scan.inc();
        });

    ctx.spawn(future);
}

This future_* helper looks important, so let's take a look (hopefully it isn't macro-generated):

fn future_get<E: Engine>(
    storage: &Storage<E>,
    mut req: GetRequest,
) -> impl Future<Item = GetResponse, Error = Error> {
    storage
        .async_get(
            req.take_context(),
            Key::from_raw(req.get_key()),
            req.get_version(),
        )
        .then(|v| {
            let mut resp = GetResponse::default();
            if let Some(err) = extract_region_error(&v) {
                resp.set_region_error(err);
            } else {
                match v {
                    Ok(Some(val)) => resp.set_value(val),
                    Ok(None) => (),
                    Err(e) => resp.set_error(extract_key_error(&e)),
                }
            }
            Ok(resp)
        })
}

As I covered in an earlier slide, storage: &Storage<E> is an interface implemented across many layers; when a GetRequest is actually served, it is the handler backed by RaftKv that gets invoked, and that layer takes care of reading the data properly.

We can also look at the module-level doc comment of storage/mod.rs:

Storage implements transactional KV APIs and raw KV APIs on a given Engine. An Engine provides low level KV functionality. Engine has multiple implementations. When a TiKV server is running, a RaftKv will be the underlying Engine of Storage. The other two types of engines are for test purpose.

Storage is reference counted and cloning Storage will just increase the reference counter. Storage resources (i.e. threads, engine) will be released when all references are dropped.

Notice that read and write methods may not be performed over full data in most cases, i.e. when underlying engine is RaftKv, which limits data access in the range of a single region according to specified ctx parameter. However, async_unsafe_destroy_range is the only exception. It's always performed on the whole TiKV.

Operations of Storage can be divided into two types: MVCC operations and raw operations. MVCC operations uses MVCC keys, which usually consist of several physical keys in different CFs. In default CF and write CF, the key will be memcomparable-encoded and append the timestamp to it, so that multiple versions can be saved at the same time. Raw operations use raw keys, which are saved directly to the engine without memcomparable-encoding and appending timestamp.
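
To make the "append the timestamp to it" part concrete, here is an illustrative sketch of a version-key layout (my own simplified encoding, not TiKV's exact memcomparable codec): the timestamp is appended in an inverted form, so that for the same user key newer versions sort first, and a forward seek lands on the newest version at or below a given start_ts.

// Illustrative only: a simplified version-key encoding
// (TiKV additionally memcomparable-encodes the user key itself).
fn encode_version_key(user_key: &[u8], ts: u64) -> Vec<u8> {
    let mut k = user_key.to_vec();
    // Append the bitwise-NOT of the timestamp in big-endian form, so that for
    // the same user key, larger timestamps (newer versions) sort *before* older ones.
    k.extend_from_slice(&(!ts).to_be_bytes());
    k
}

fn main() {
    let newer = encode_version_key(b"k1", 110);
    let older = encode_version_key(b"k1", 90);
    // A forward seek starting from encode_version_key(b"k1", start_ts) therefore
    // finds the newest version whose timestamp <= start_ts.
    assert!(newer < older);
}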

So async_get is where the MVCC interface gets called:

/// Get value of the given key from a snapshot.
///
/// Only writes that are committed before `start_ts` are visible.
pub fn async_get(
    &self,
    ctx: Context,
    key: Key,
    start_ts: u64,
) -> impl Future<Item = Option<Value>, Error = Error> {
    const CMD: &str = "get";
    let priority = readpool::Priority::from(ctx.get_priority());

    let res = self.read_pool.spawn_handle(priority, move || {
        tls_collect_command_count(CMD, priority);
        let command_duration = tikv_util::time::Instant::now_coarse();

        with_tls_engine(|engine| {
            Self::async_snapshot(engine, &ctx)
                .and_then(move |snapshot: E::Snap| {
                    tls_processing_read_observe_duration(CMD, || {
                        let mut statistics = Statistics::default();
                        let snap_store = SnapshotStore::new(
                            snapshot,
                            start_ts,
                            ctx.get_isolation_level(),
                            !ctx.get_not_fill_cache(),
                        );
                        let result = snap_store
                            .get(&key, &mut statistics)
                            // map storage::txn::Error -> storage::Error
                            .map_err(Error::from)
                            .map(|r| {
                                tls_collect_key_reads(CMD, 1);
                                r
                            });

                        tls_collect_scan_count(CMD, &statistics);
                        tls_collect_read_flow(ctx.get_region_id(), &statistics);

                        result
                    })
                })
                .then(move |r| {
                    tls_collect_command_duration(CMD, command_duration.elapsed());
                    r
                })
        })
    });

    future::result(res)
        .map_err(|_| Error::SchedTooBusy)
        .flatten()
}

For async_get, the work inside runs synchronously on self.read_pool (I hear this pool will eventually be replaced by something else). The flow goes like this:

  1. async_snapshot obtains a Snapshot from the Raft layer, representing the current state. Snapshot also implements an interface similar to the RocksDB one.
  2. Then, inside the synchronous closure:
    1. read the corresponding data from the snapshot using the given start_ts
    2. collect statistics

So, inside snap_store.get(&key, &mut statistics), we need to study how the read is actually carried out.

Before that, notice the pile of TLS_STORAGE_METRICS-related statistics helpers. Strictly speaking the MVCC read itself doesn't include this bookkeeping, but staring at a pile of code you don't understand is never pleasant, so let's look at the data involved:

thread_local! {
    static TLS_STORAGE_METRICS: RefCell<StorageLocalMetrics> = RefCell::new(
        StorageLocalMetrics {
            local_sched_histogram_vec: SCHED_HISTOGRAM_VEC.local(),
            local_sched_processing_read_histogram_vec: SCHED_PROCESSING_READ_HISTOGRAM_VEC.local(),
            local_kv_command_keyread_histogram_vec: KV_COMMAND_KEYREAD_HISTOGRAM_VEC.local(),
            local_kv_command_counter_vec: KV_COMMAND_COUNTER_VEC.local(),
            local_sched_commands_pri_counter_vec: SCHED_COMMANDS_PRI_COUNTER_VEC.local(),
            local_kv_command_scan_details: KV_COMMAND_SCAN_DETAILS.local(),
            local_read_flow_stats: HashMap::default(),
        }
    );
}

These helpers accumulate counters locally and then report the data to Prometheus.
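
The general pattern here (a hedged sketch of the idea, not TiKV's actual metrics code) is that each worker thread bumps counters in a thread-local RefCell, and the accumulated values are flushed into the shared registry later, so the hot read path never contends on a global lock:

use std::cell::RefCell;
use std::sync::atomic::{AtomicU64, Ordering};

// A stand-in for a global Prometheus counter.
static GLOBAL_KEY_READS: AtomicU64 = AtomicU64::new(0);

thread_local! {
    // Per-thread scratch counter, cheap to bump on the hot path.
    static LOCAL_KEY_READS: RefCell<u64> = RefCell::new(0);
}

fn collect_key_reads_local(n: u64) {
    LOCAL_KEY_READS.with(|c| *c.borrow_mut() += n);
}

// Called periodically (or when a task finishes) to publish local counts.
fn flush_local_metrics() {
    LOCAL_KEY_READS.with(|c| {
        let mut v = c.borrow_mut();
        GLOBAL_KEY_READS.fetch_add(*v, Ordering::Relaxed);
        *v = 0;
    });
}

fn main() {
    collect_key_reads_local(1);
    collect_key_reads_local(1);
    flush_local_metrics();
    assert_eq!(GLOBAL_KEY_READS.load(Ordering::Relaxed), 2);
}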


This snapshot's get fetches data through MvccReader:

#[inline]
fn get(&self, key: &Key, statistics: &mut Statistics) -> Result<Option<Value>> {
    // build the reader
    // TODO: wait, where did your timestamp go?
    let mut reader = MvccReader::new(
        // the Raft snapshot
        self.snapshot.clone(),
        None,
        self.fill_cache,
        // hmm, why None here?
        None,
        None,
        self.isolation_level,
    );
    // read the key at start_ts, then the reader gets dropped
    let v = reader.get(key, self.start_ts)?;
    statistics.add(reader.get_statistics());
    Ok(v)
}

So we should look directly at MvccReader's get:

/// get over a snapshot read.
/// There are two isolation levels: SI / RC
/// - SI follows the original paper
/// - RC does not block reads on locks
///
/// get returns the corresponding Value, which deep down is all just u8 anyway
pub fn get(&mut self, key: &Key, mut ts: u64) -> Result<Option<Value>> {
    // Check for locks that signal concurrent writes.
    // handle ts
    match self.isolation_level {
        // if the lock check does not allow it, the get cannot proceed
        // presumably this will:
        // 1. wait
        // 2. trigger ResolveLock if it has been stuck for too long?
        // 3. whoa, it just returns a LOCK error as soon as it sees one; isn't that a bit hasty?
        IsolationLevel::SI => ts = self.check_lock(key, ts)?,
        IsolationLevel::RC => {}
    }
    // read the write record
    if let Some(mut write) = self.get_write(key, ts)? {
        // found a write record
        // ??? why would the write itself carry a value?
        // --> answer: short_value handles reads of data shorter than 64 bytes
        if write.short_value.is_some() {
            // if we only want keys... the value is simply discarded
            // TODO: I don't know why, nor what key_only really means
            if self.key_only {
                return Ok(Some(vec![]));
            }
            return Ok(write.short_value.take());
        }
        // load the actual data using the write's start_ts
        // presumably much like load_lock
        // what comes back is simply a `Vec<u8>`
        match self.load_data(key, write.start_ts)? {
            None => {
                // not found
                return Err(default_not_found_error(key.to_raw()?, write, "get"));
            }
            // return the value we read
            Some(v) => return Ok(Some(v)),
        }
    }
    Ok(None)
}
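
To answer the questions in my own comments above: as far as I can tell, the check_lock called on the SI branch does not wait at all. If the key carries a lock whose ts is at or below our read ts, the read immediately fails with a "key is locked" error carrying the lock information, and it is the client's job to wait, ResolveLock, and retry. A hedged sketch of that behavior (not TiKV's real check_lock, which also special-cases pessimistic locks and reads of a lock's own primary):

// A hedged sketch of the SI lock check, not TiKV's real check_lock.
struct LockInfo {
    ts: u64,          // start_ts of the txn holding the lock
    primary: Vec<u8>, // its primary key
}

// Returns the ts to read at, or an error describing the blocking lock.
fn check_lock_sketch(lock: Option<&LockInfo>, ts: u64) -> Result<u64, String> {
    match lock {
        // A txn that started before our read ts might still commit with
        // commit_ts < ts, so reading now could miss its write: bail out and
        // let the client wait / ResolveLock / retry.
        Some(l) if l.ts <= ts => Err(format!(
            "key is locked by txn {} (primary {:?})",
            l.ts, l.primary
        )),
        // No lock, or a lock from a strictly newer txn: safe to read at ts.
        _ => Ok(ts),
    }
}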

Here we need to look at how the three column families mentioned in the Percolator paper, Data / Write / Lock, are actually organized.

In src/cf.rs we have:

// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
pub type CfName = &'static str;
pub const CF_DEFAULT: CfName = "default";
pub const CF_LOCK: CfName = "lock";
pub const CF_WRITE: CfName = "write";
pub const CF_RAFT: CfName = "raft";
// Cfs that should be very large generally.
pub const LARGE_CFS: &[CfName] = &[CF_DEFAULT, CF_WRITE];
pub const ALL_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE, CF_RAFT];
pub const DATA_CFS: &[CfName] = &[CF_DEFAULT, CF_LOCK, CF_WRITE];

Data lives in the default CF, i.e. CF_DEFAULT, which is why you sometimes don't see it used explicitly. It is stored as a Value, and the storage format is very plain: it's literally a Vec<u8> inside, remarkably plain. Its key is (Key, u64) and its value is Option<Value>.

Write represents a commit and lives in the write CF. Its key is (Key, u64) and its value is (u64, Write).

/// The value stored in the write CF, together with its ts.
/// Honestly I keep feeling Write should have been called Commit XD
#[derive(PartialEq, Clone)]
pub struct Write {
    // the type of the commit record
    pub write_type: WriteType,
    // start_ts, pointing back at the corresponding data entry
    pub start_ts: u64,
    // the short_value optimization for data shorter than 64 bytes
    pub short_value: Option<Value>,
}

Let's take a quick look at WriteType:

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WriteType {
    // Put
    Put,
    // Delete: if it's a Delete we don't even need to look up the data
    Delete,
    // Lock for update
    Lock,
    // not sure: presumably it means the write was rolled back and is of no further use
    Rollback,
}

Now let's look at get_write:

pub fn get_write(&mut self, key: &Key, mut ts: u64) -> Result<Option<Write>> {
    loop {
        match self.seek_write(key, ts)? {
            Some((commit_ts, write)) => match write.write_type {
                // looks like we'll have to go fetch the data from the Data CF
                WriteType::Put => {
                    return Ok(Some(write));
                }
                // if it was deleted, there is obviously nothing left...
                WriteType::Delete => {
                    return Ok(None);
                }
                // Lock means lock-for-update,
                // so we can only read an earlier version.
                // Rollback comes from retries and handles some corner cases.
                WriteType::Lock | WriteType::Rollback => ts = commit_ts - 1,
            },
            None => return Ok(None),
        }
    }
}

Whenever a Rollback (or Lock) record is hit, the loop steps back to commit_ts - 1 and tries again, until it either finds a value or runs out of versions. For example, if a key has a Rollback at commit_ts 100 and a Put at commit_ts 90, a read at ts 100 skips the rollback and returns the Put committed at 90.

The hardest part to wrap your head around is Lock; somehow anything called Lock in a program is hard to understand. Its stored key is just Key (plain and simple), and its value is Option<Lock>.

#[derive(PartialEq, Debug)]
pub struct Lock {
    // the type of operation this lock protects
    pub lock_type: LockType,
    // the primary lock's key
    pub primary: Vec<u8>,
    // start_ts of the txn
    pub ts: u64,
    // time-to-live: the lock may be cleaned up once it expires
    pub ttl: u64,
    // the short_value optimization
    pub short_value: Option<Value>,
    // If for_update_ts != 0, this lock belongs to a pessimistic transaction
    pub for_update_ts: u64,
    pub txn_size: u64,
}

LockType is much like the WriteType we just saw:

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LockType {
    /// Put
    Put,
    /// Delete
    Delete,
    /// Lock for update
    Lock,
    /// Pessimistic lock
    Pessimistic,
}
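
To tie the three column families together, here is a toy trace (my own illustration with simplified key/value types, not the real storage format) of a single key k written by a transaction with start_ts = 100, committed at commit_ts = 105, and then read at ts = 110:

use std::collections::BTreeMap;

// A toy model of the three CFs for one key going through
// prewrite(start_ts = 100) -> commit(commit_ts = 105) -> read(ts = 110).
fn main() {
    let key = b"k".to_vec();
    let mut default: BTreeMap<(Vec<u8>, u64), Vec<u8>> = BTreeMap::new(); // (key, start_ts) -> value
    let mut lock: BTreeMap<Vec<u8>, u64> = BTreeMap::new();               // key -> start_ts
    let mut write: BTreeMap<(Vec<u8>, u64), u64> = BTreeMap::new();       // (key, commit_ts) -> start_ts

    // Prewrite at start_ts = 100: the data goes to the default CF, and a lock is left behind.
    default.insert((key.clone(), 100), b"value".to_vec());
    lock.insert(key.clone(), 100);

    // Commit at commit_ts = 105: the lock is replaced by a write record.
    lock.remove(&key);
    write.insert((key.clone(), 105), 100);

    // Read at ts = 110: no lock blocks us; find the newest write with commit_ts <= 110,
    // then follow its start_ts into the default CF.
    assert!(lock.get(&key).is_none());
    let (&(_, commit_ts), &start_ts) = write
        .range((key.clone(), 0)..=(key.clone(), 110))
        .next_back()
        .unwrap();
    assert_eq!((commit_ts, start_ts), (105, 100));
    assert_eq!(default[&(key.clone(), start_ts)], b"value".to_vec());
}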

Once you understand these, the whole read flow makes sense.


  1. Read Committed guarantees:
    1. No dirty reads
    2. No dirty writes
    3. Non-repeatable reads can still occur