NebulaGraph源码解读系列一

源码分支: release-3.6

作者简介:

阿旺(奇富科技)
本人是一名 java 老鸟,工作内容并非 C++ 方向,平时也很少接触 C++ 语言,不懂的代码都是使用 GPT 作代码辅助解释,故难免存在准确性问题,但大体思路应该没太大问题,如有错误问题还望评论区指出回复,相互学习共同成长。以下内容源自我在使用 NebulaGraph 过程中遇到的一些问题和思考。我通过查阅源码、动手实践,解决了一些实际生产中的疑难问题。也希望借此与大家分享经验,探讨更多技术细节。同时,也想鼓励每一位技术人——不要给自己轻易设限,勇于突破边界,探索新的领域。

内容大纲
  1. nebula目录结构说明
  2. nebula启动会运行多少个rocksdb实例
  3. nebula是如何查询一个点的数据
  4. nebula中value是如何编码的
  5. nebula中TTL是如何实现的
  6. nebula中如何管理session
  7. nebula错误日志中未知错误码如何定位原因
一、nebula源码src目录结构

src
|-clients : 内部服务客户端,主要用于graphd、metad、storaged内部服务RPC调用
|-codec :nebula数据的key-value编码和解码
|-daemons :启动入口,graphd、metad、storaged启动
|-graph :graphd相关业务实现,graphd主要负责处理客户端请求,属于nebula的计算层
|-interface : thrift接口定义,nebula采用thrift做异构语言rpc调用,nebula客户端与nebula交互接口皆定义于此
|-kvstore :键值存储引擎,storaged和metad都会用到,比如:rocksdb和hbase,nebula默认采用rocksdb作为内嵌存储
|-meta :元数据相关,比如:session会话、用户信息、分片信息等等,属于nebula元数据管理层
|-parser : 语法解析
|-storage : nebula存储相关,提供一些简单的kv存储和查询的接口服务给graphd调用

温馨提示: nebula通过CMake进行编译打包,如果想了解打包过程和依赖信息见每个目录的CMakeLists.txt文件

二、单个nebula-storaged启动时会运行多少个rocksdb实例

结论: 数据存储磁盘目录数 *space总数
数据存储磁盘目录: etc/nebula-storaged.conf中data_path
比如:有2个space,3个数据目录,则rocksdb实例数=2*3=6

相关知识点:
space和part分区数越多nebula-storaged启动越慢

大体过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1.StorageDaemon::main() 程序启动入口
2.StorageServer::start() 启动存储引擎
3.StorageServer::getStoreInstance() 初始化存储引擎
4.NebulaStore::init() nebula存储引擎初始化
5.NebulaStore::loadPartFromDataPath() 通过获取data_path存储目录路径,为每个目录下space启动一个rocksdb实例,并且建立了space和kvEngine的映射关系,以及partId和part的映射关系,后面查询或者更新实则是定位数据partId,然后通过partId获取到part进行操作,part中保留了kvEngine的引用

最终构建spaces_,里面包含了SpaceId和spacePart分片的映射关系,后面所有的读写都需要此映射关系操作对应的rocksdb,这里的映射关系类似于java中SpringBean
std::unordered_map<GraphSpaceID, std::shared_ptr<SpacePartInfo>> spaces_;
struct SpacePartInfo {
//Part中保留了engin的引用,当前版本engine其实就是rocksdb
std::unordered_map<PartitionID, std::shared_ptr<Part>> parts_;
std::vector<std::unique_ptr<KVEngine>> engines_;
}

关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// 创建storageServer并启动
StorageDaemon::main(){
auto storageServer = std::make_unique<nebula::storage::StorageServer>(
localhost, metaAddrsRet.value(), paths, FLAGS_wal_path, FLAGS_listener_path);
storageServer->start()
}

// 初始化kv存储引擎
StorageServer::start(){
LOG(INFO) << "Init kvstore";
kvstore_ = getStoreInstance();
}

// 实例化nebula存储引擎并初始化
StorageServer::getStoreInstance(){
auto nbStore = std::make_unique<kvstore::NebulaStore>(
std::move(options), ioThreadPool_, localHost_, workers_);
nbStore->init()
}

// 从data_path中载入part分区数据
NebulaStore::init(){
loadPartFromDataPath()
}

//遍历data_path目录为每个space创建一个rocksdb引擎
NebulaStore::loadPartFromDataPath(){
std::vector<folly::Future<std::pair<GraphSpaceID, std::unique_ptr<KVEngine>>>> futures;
std::vector<std::string> enginesPath;
for (auto& path : options_.dataPaths_) {
//这里其实就是data_path,比如:/usr/local/nebula/data
auto rootPath = folly::stringPrintf("%s/nebula", path.c_str());
//获取每个data_pah子目录,其实就是space
// 比如 /usr/local/nebula/data/1、/usr/local/nebula/data/2
auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str());
//遍历space目录,为每个space目录创建一个rocksdb
for (auto& dir : dirs) {
//目录名称就是spaceId,比如/usr/local/nebula/data/1中的1就是spaceId
GraphSpaceID spaceId = folly::to<GraphSpaceID>(dir);
// 目录为0的跳过
if (spaceId == 0) {
// skip the system space, only handle data space here.
continue;
}
enginesPath.emplace_back(rootPath + "/" + dir);
//异步创建rocksdb存储引擎
futures.emplace_back(newEngineAsync(spaceId, path, options_.walPath_));
}
// space和kvEngine列表的映射关系
std::unordered_map<GraphSpaceID, std::vector<std::unique_ptr<KVEngine>>> spaceEngines;
// 等待并获取前面的异步创建引擎列表
auto tries = folly::collectAll(futures).get();
for (auto& t : tries) {
//p就是个二元组,第一个元素为space,第二个元素为kvEngine
auto&& p = t.value();
auto spaceIt = spaceEngines.find(p.first);
if (spaceIt == spaceEngines.end()) {
spaceIt = spaceEngines.emplace(p.first, std::vector<std::unique_ptr<KVEngine>>()).first;
}
spaceIt->second.emplace_back(std::move(p.second));
}
// 遍历所有space存储引擎
for (auto& spaceEngine : spaceEngines) {
GraphSpaceID spaceId = spaceEngine.first;
for (auto& engine : spaceEngine.second) {
// part分片与part副本节点集合,后面raft需要用到
std::map<PartitionID, Peers> partRaftPeers;
partRaftPeers.emplace(partId, raftPeers);
//建立起partId和part映射关系,后面操作某partId的数据时就是通过part进行的,part中其实保留了kvEngine的引用
for (auto& it : partRaftPeers) {
auto part = newPart(spaceId, partId, enginePtr, isLearner, addrs);
auto ret = iter->second->parts_.emplace(partId, part);
}
}
}
}
}
// 创建rocksdb引擎
NebulaStore::newEngineAsync() {
engine = std::make_unique<RocksEngine>(
spaceId, vIdLen, dataPath, walPath, options_.mergeOp_, cfFactory);
}

三、nebula是如何查询一个点的数据

graphd负责接受client客户端请求,并解析 nGQL,生成查询计划,再经过优化器,最终执行executor,为了突出重点省去从接受请求开始到调用executor,而直接定位executor,这边以GetVerticesExecutor.cpp为例来探究如何查找vertex点的数据,主要探究vertexId如何定位到数据

相关知识点

  1. part数和storaged查询并发数成正比
  2. vid和partId映射计算公式: partId = hash(vid) % numParts + 1

大体过程

1
2
3
4
5
6
7
8
9
10
//nebula-graphd
1.GetVerticesExecutor::getVertices() 获取多个点属性列表
2.StorageClient::getProps() 计算vertexId对应的part,然后请求对应part分片数据

//nebula-storaged
3.GraphStorageServiceHandler::future_getProps() storagd收到graphd来的请求进行处理
4.GetPropProcessor::doProcess() 构建tag属性查询计划并调用
5.GetTagPropNode::doExecute() 根据vid构建rocksdb的key,然后调用NebulaStore进行获取
6.NebulaStore::get() 通过partId定位出part,然后通过part获取对应key的信息
7.RocksEngine::get() 从rocksdb中获取数据

关键源码
nebula-graphd部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 通过上下文获取到storageClient然后调用获取属性的方法
GetVerticesExecutor::getVertices() {
StorageClient *storageClient = qctx()->getStorageClient();
storageClient->getProps(param,
std::move(vertices),
gv->props(),
nullptr,
gv->exprs(),
gv->dedup(),
gv->orderBy(),
gv->getValidLimit(),
gv->filter())
}

// 通过vertexId获取到part所在节点信息
// 请求对应节点的vertexId信息
StorageRpcRespFuture<cpp2::GetPropResponse> StorageClient::getProps(){
// 建立vertexId和part节点映射关系
auto status = clusterIdsToHosts(param.space, input.rows, std::move(cbStatus).value());
// 为不同的节点构建对应的request
cpp2::GetPropRequest> requests;
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.space_id_ref() = param.space;
req.parts_ref() = std::move(c.second);
}
//调用rpc请求storaged获取点的属性信息
return collectResponse(
param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetPropRequest& r) {
return client->future_getProps(r);
});
}

//通过(vid % numParts + 1)计算出对应的part
// 这种方式也预示着space设定的分区数量一旦创建不可更改,否则数据就会映射不上
StorageClientBase::clusterIdsToHosts(){
//获取space空间有多少个part
auto status = metaClient_->partsNum(spaceId);
std::unordered_map<PartitionID, HostAddr> leaders;
//建立起partId和leader节点的映射关系
for (int32_t partId = 1; partId <= numParts; ++partId) {
auto leader = getLeader(spaceId, partId);
leaders[partId] = std::move(leader).value();
}
//
for (auto& id : ids) {
// 计算vid对应的partId
PartitionID part = vid % numParts + 1;
const auto& leader = leaders[part];
clusters[leader][part].emplace_back(std::move(id));
}
return clusters;
}

nebula-storaged部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
folly::Future<cpp2::GetPropResponse> GraphStorageServiceHandler::future_getProps(
const cpp2::GetPropRequest& req) {
auto* processor = GetPropProcessor::instance(env_, &kGetPropCounters, readerPool_.get());
RETURN_FUTURE(processor);
}

GetPropProcessor::doProcess(){
if (!FLAGS_query_concurrently) {
//为了方便理解,这里直接选择单线程
runInSingleThread(req);
} else {
runInMultipleThread(req);
}
}

GetPropProcessor::runInSingleThread(){
// 构建tag查询计划,这个计划相当于构建一个DAG
// 里面有完整的依赖信息
auto plan = buildTagPlan(&contexts_.front(), &resultDataSet_);
// 遍历待查询的part分区
for (const auto& partEntry : req.get_parts()) {
auto partId = partEntry.first;
for (const auto& row : partEntry.second) {
auto vId = row.values[0].getStr();
// 执行查询计划
plan.go(partId, vId);
}
}
}

// 构建执行计划DAG
GetPropProcessor::buildTagPlan(RuntimeContext* context,
nebula::DataSet* result) {
StoragePlan<VertexID> plan;
std::vector<TagNode*> tags;
for (const auto& tc : tagContext_.propContexts_) {
// TagNode获取vid指定tag的属性列表,TagNode will return a DataSet of specified props of tagId
// 这里面就有从rocksdb中读取tag的value属性
// 查询语句指定多少个tag,这里就会有多少个TagNode
// 比如:fetch prop on tag_1,tag_2,tag_3 idxxx,那这里就有3个TagNode
auto tag = std::make_unique<TagNode>(context, &tagContext_, tc.first, &tc.second);
tags.emplace_back(tag.get());
plan.addNode(std::move(tag));
}
// GetTagPropNode用于收集vid对应多个tag的属性
auto output = std::make_unique<GetTagPropNode>(
context, tags, result, filter_ == nullptr ? nullptr : filter_->clone(), limit_, &tagContext_);
for (auto* tag : tags) {
output->addDependency(tag);
}
plan.addNode(std::move(output));
return plan;
}

TagNode::doExecute(){
// 根据vid和tagId构建rocksdb中key
key_ = NebulaKeyUtils::tagKey(context_->vIdLen(), partId, vId, tagId_);
// 从响应分片的rocksdb中获取value,此value为经过RowWriter编码的value
ret = context_->env()->kvstore_->get(context_->spaceId(), partId, key_, &value_);
// RowReaderWrapper设置schema和对应的value,后面用于GetTagPropNode收集prop做准备
reader_.reset(*schemas_, value_);
}


// 收集vid对应所有tag的属性值列表
GetTagPropNode::doExecute(){
// 这个集合用于保存查询到的vid对应不同tag的属性集合
List row;
// vertexId is the first column
if (context_->isIntId()) {
row.emplace_back(*reinterpret_cast<const int64_t*>(vId.data()));
} else {
row.emplace_back(vId);
}
// 遍历本次要查询的所有tag
for (auto* tagNode : tagNodes_) {
ret = tagNode->collectTagPropsIfValid(
// 收集tagNode中属性到row中
[&row, vIdLen, isIntId, tagNode, this](){
auto status = QueryUtils::collectVertexProps(
key, vIdLen, isIntId, reader, props, row, expCtx_.get(), tagNode->getTagName());
});
}
}

QueryUtils::collectVertexProps(){
// 遍历tag中的属性列表,挨个的取出来放入到list中,这个list就是GetTagPropNode::doExecute中的row
for (const auto& prop : *props) {
auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop);
list.emplace_back(value.value());
}
}

// 读取属性值
QueryUtils::readVertexProp(){
switch (prop.propInKeyType_) {
// prop in value
case PropContext::PropInKeyType::NONE: {
// ddl中的属性,这里的readValue就是RowReader根据schame获取字段的offset,然后获取对应的field值
return readValue(reader, prop.name_, prop.field_);
}
case PropContext::PropInKeyType::VID: {
// 属性类型为vid
auto vId = NebulaKeyUtils::getVertexId(vIdLen, key);
if (isIntId) {
return *reinterpret_cast<const int64_t*>(vId.data());
} else {
return vId.subpiece(0, vId.find_first_of('\0')).toString();
}
}
case PropContext::PropInKeyType::TAG: {
// 属性类型为tagId
auto tag = NebulaKeyUtils::getTagId(vIdLen, key);
return tag;
}
}
}

四、nebula中value是如何编码的

从2.x开始起nebula使用新的编码方式version 2,由5个部分组成:

1
2
3
<header> <schema version> <NULL flags> <all properties> <string content>
| | | |
1 byte 0 - 7 bytes 0+ bytes N bytes

header: 占用1个字节,用来标识schema version字节长度,目前取值07
schema version: 0
7个字节,标识schema版本
NULL flags: NULL字段标识,跟mysql设计类似,为NULL的字段就在对应的bit位设置为1
all properties: 固定长度的属性值,根据schema进行排列,由于固定长度所以想获取某个字段的值可以直接计算offset+length就能拿到, 除 STRING 类型属性外,所有属性都就地存储。STRING 属性将字符串内容的偏移量存储在前 4 个字节中,将字符串的长度存储在后 4 个字节中。字符串内容将附加到编码字符串的末尾,相当于多了一层指针
string content: 存储非固定长度的string字符串,获取string属性内容得多次计算,先计算string字符串指针位置,然后在根据指针报错的offset+length得到完整的string内容

各类型占用内存空间大小:

1
2
3
4
5
6
7
8
9
10
11
12
13
BOOL            (1 byte)
INT8 (1 byte)
INT16 (2 bytes)
INT32 (4 bytes)
INT64 (8 bytes)
FLOAT (4 bytes)
DOUBLE (8 bytes)
STRING (8 bytes) *
FIXED_STRING (Length defined in the schema)
TIMESTAMP (8 bytes)
DATE (4 bytes)
DATETIME (15 bytes)
GEOGRAPHY (8 bytes) *

关键代码:
RowWriterV2.cpp和RowWriterV2.h

五、nebula中TTL是如何实现的

nebula中TTL是通过rocksdb中KVFilter实现的,KVFilter是rocksdb对外保留的扩展接口,查询和compact时用来做数据的过滤,数据每次搜索时都会经过此Filter,nebula实现此接口用来过滤TTL过期的数据,以及rocksdb做compact时筛选出过期的数据让其删除。

相关知识点:
TTL到期的数据可能并未删除,仅仅只是查询不出来,如果此时加大TTL或者删除TTL,原来查询不到的数据又会查询出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// KVFilter在启动的rocksdb的时候就需要设置进去,所以这里直接看rocksdb启动的代码即可
StorageServer::getStoreInstance(){
kvstore::KVOptions options;
// 这个StorageCompactionFilter就是nebula给予VKFilter实现的
options.compaction_filter_factory = std::make_unique<StorageCompactionFilterFactoryBuilder>(schemaMan_.get(), indexMan_.get());
status = rocksdb::DB::Open(options, path, &db);
}

class StorageCompactionFilter final : public kvstore::KVFilter {

//这里是filter的入口
bool filter(int level,
GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const override {
if (level < FLAGS_min_level_for_custom_filter) {
// for upper level such as L0/L1, we don't go through the custom
// validation to achieve better performance
return false;
}
if (NebulaKeyUtils::isTag(vIdLen_, key)) {
return !tagValid(spaceId, key, val);
} else if (NebulaKeyUtils::isEdge(vIdLen_, key)) {
return !edgeValid(spaceId, key, val);
} else if (IndexKeyUtils::isIndexKey(key)) {
return !indexValid(spaceId, key, val);
} else if (!FLAGS_use_vertex_key && NebulaKeyUtils::isVertex(key)) {
return true;
} else if (NebulaKeyUtils::isLock(vIdLen_, key)) {
return !lockValid(spaceId, key);
} else {
// skip uuid/system/operation
VLOG(3) << "Skip the system key inside, key " << key;
}
return false;
}

//判断tag是否有效,这里其实就是比对ttl字段的值对比当前时间是否超过ttl
private:
bool tagValid(GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const {
......
if (ttlExpired(schema.get(), reader.get())) {
VLOG(3) << "Ttl expired";
return false;
}
return true;
}

//判断边是否过期
bool edgeValid(GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const {
....
if (ttlExpired(schema.get(), reader.get())) {
VLOG(3) << "Ttl expired";
return false;
}
return true;
}


// TODO(panda) Optimize the method in the future
bool ttlExpired(const meta::NebulaSchemaProvider* schema,
nebula::RowReaderWrapper* reader) const {
if (schema == nullptr) {
return true;
}
auto ttl = CommonUtils::ttlProps(schema);
// Only support the specified ttl_col mode
// Not specifying or non-positive ttl_duration behaves like ttl_duration =
// infinity
if (!ttl.first) {
return false;
}
//这里比对数据的ttl字段的值和当前时间,如果过期就返回true
return CommonUtils::checkDataExpiredForTTL(schema, reader, ttl.second.second, ttl.second.first);
}

bool ttlExpired(const meta::NebulaSchemaProvider* schema, const Value& v) const {
if (schema == nullptr) {
return true;
}
auto ttl = CommonUtils::ttlProps(schema);
if (!ttl.first) {
return false;
}
return CommonUtils::checkDataExpiredForTTL(schema, v, ttl.second.second, ttl.second.first);
}
};

//判断ttl是否过期
bool CommonUtils::checkDataExpiredForTTL(const meta::NebulaSchemaProvider* schema,
const Value& v,
const std::string& ttlCol,
int64_t ttlDuration) {
const auto& ftype = schema->getFieldType(ttlCol);
if (ftype != nebula::cpp2::PropertyType::TIMESTAMP &&
ftype != nebula::cpp2::PropertyType::INT64) {
return false;
}

int64_t now;
// The unit of ttl expiration unit is controlled by user, we just use a gflag here.
if (!FLAGS_ttl_use_ms) {
now = std::time(nullptr);
} else {
auto t = std::chrono::system_clock::now();
now = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch()).count();
}

// if the value is not INT type (sush as NULL), it will never expire.
// TODO (sky) : DateTime
if (v.isInt() && (now > (v.getInt() + ttlDuration))) {
VLOG(2) << "ttl expired";
return true;
}
return false;
}

//从ddl的schema里面读取ttl_duration和ttl_col字段
std::pair<bool, std::pair<int64_t, std::string>> CommonUtils::ttlProps(
const meta::NebulaSchemaProvider* schema) {
DCHECK(schema != nullptr);
const auto* ns = dynamic_cast<const meta::NebulaSchemaProvider*>(schema);
const auto sp = ns->getProp();
int64_t duration = 0;
if (sp.get_ttl_duration()) {
duration = *sp.get_ttl_duration();
}
std::string col;
if (sp.get_ttl_col()) {
col = *sp.get_ttl_col();
}
return std::make_pair(!(duration <= 0 || col.empty()), std::make_pair(duration, col));
}

//获取ttl_col字段的值
StatusOr<Value> CommonUtils::ttlValue(const meta::NebulaSchemaProvider* schema,
RowReaderWrapper* reader) {
DCHECK(schema != nullptr);
const auto* ns = dynamic_cast<const meta::NebulaSchemaProvider*>(schema);
auto ttlProp = ttlProps(ns);
if (!ttlProp.first) {
return Status::Error();
}
return reader->getValueByName(std::move(ttlProp).second.second);
}


六、nebula中是如何管理session的

session是客户端和服务端nebula-graphd交互的凭证,session保存在nebula-metad中,session的创建和销毁都需要跟nebula-metad交互,后面将分析session的创建和销毁。
相关知识点

  1. client客户端通过sessionId来标识session
  2. nebula-graphd重启不会影响已创建的session
  3. nebula-metad将session保存到rocksdb中

总体步骤

  1. nebula-client创建连接并调用用户验证创建session
  2. nebula-graphd接收到client的请求验证,并调用nebula-metad进行session创建
  3. nebula-metad生成sessionId并通过raft协议进行保存

session的创建
nebula-client-3.6 客户端代码:创建nebula连接并认证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// nebula客户端创建连接都需要先进行用户名和密码验证,nebula服务端验证成功后将返回sessionId,客户端使用sessionId作为凭证操作nebula
private NebulaSession createSessionObject(SessionState state){
SyncConnection connection = new SyncConnection();
//建立连接
connection.open(getAddress(), sessionPoolConfig.getTimeout());
//用户名密码认证
AuthResult authResult = connection.authenticate(sessionPoolConfig.getUsername(),
sessionPoolConfig.getPassword());
//将认证结果sessionId放到NebulaSession中
NebulaSession nebulaSession = new NebulaSession(connection, authResult.getSessionId(),
authResult.getTimezoneOffset(), state);
return nebulaSession;
}

// NebulaSession执行语句
public ResultSet execute(String stmt) throws IOErrorException {
//传入sessionId进行RPC请求
return new ResultSet(connection.execute(sessionID, stmt), timezoneOffset);
}

// 调用thrift接口,传入sessionId和语句进行nebula服务端请求
public ExecutionResponse executeWithParameter(long sessionId, byte[] stmt, Map parameterMap) throws TException
{
ContextStack ctx = getContextStack("GraphService.executeWithParameter", null);
this.setContextStack(ctx);
send_executeWithParameter(sessionId, stmt, parameterMap);
return recv_executeWithParameter();
}

nebula-graphd接收client的授权请求,并返回sessionId

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
GraphService::future_authenticate(const std::string& username,const std::string& password){
// 请求nebula-metad校验用户名密码
auto authResult = auth(username, password);
if (!authResult.ok()) {
//用户密码错误
ctx->resp().errorCode = ErrorCode::E_BAD_USERNAME_PASSWORD;
return future;
}
if (sessionManager_->isOutOfConnections()) {
//session创建过多
ctx->resp().errorCode = ErrorCode::E_TOO_MANY_CONNECTIONS;
return future;
}
//请求nebula-metad创建会话
sessionManager_->createSession(username, clientIp, getThreadManager())
}

GraphSessionManager::createSession(){
std::string key = userName + clientIp;
//nebula-graphd.conf中配置的max_sessions_per_ip_per_user参数值
auto maxSessions = FLAGS_max_sessions_per_ip_per_user;
//获取相同用户IP创建session数量
auto uiscFindPtr = sessionCnt(key);
if (uiscFindPtr->get() > maxSessions - 1) {
return Status::Error("Create Session failed: Too many sessions created");
}
//通过metaClient创建会话
return metaClient_->createSession(userName, myAddr_, clientIp)
}

MetaClient::createSession(){
//像nebula-metad发起RPC请求创建session
client->future_createSession(request)
}

nebula-metad接收到nebula-graphd请求进行session创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//接收到session创建请求并进行处理
MetaServiceHandler::future_createSession(
const cpp2::CreateSessionReq& req) {
auto* processor = CreateSessionProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

CreateSessionProcessor::process(){
cpp2::Session session;
// 创建sessionId,以当前时间微妙作为值
session.session_id_ref() = time::WallClock::fastNowInMicroSec();
session.create_time_ref() = session.get_session_id();
session.update_time_ref() = session.get_create_time();
session.user_name_ref() = user;
session.graph_addr_ref() = req.get_graph_addr();
session.client_ip_ref() = req.get_client_ip();
std::vector<kvstore::KV> data;
//构建kv引擎键值对,key为sessionId,value为会话信息
data.emplace_back(MetaKeyUtils::sessionKey(session.get_session_id()),
MetaKeyUtils::sessionVal(session));
resp_.session_ref() = session;
//调用kv引擎的put方法
ret = doSyncPut(std::move(data));
}

static const PartitionID kDefaultPartId = 0;
static const GraphSpaceID kDefaultSpaceId = 0;

//调用NebulaStore的put方法
BaseProcessor<RESP>::doSyncPut(){
kvstore_->asyncMultiPut(kDefaultSpaceId,kDefaultPartId)
}

NebulaStore::asyncMultiPut(){
//获取session会话分片信息,spaceId和 partId都为0
auto ret = part(spaceId, partId);
auto part = nebula::value(ret);
//分片数据写入
part->asyncMultiPut(std::move(keyValues), std::move(cb));
}

Part::asyncMultiPut(){
//简直编码
std::string log = encodeMultiValues(OP_MULTI_PUT, keyValues);
//通过raft协议进行数据写入
appendLogAsync(source, LogType::NORMAL, std::move(log))
}

session的销毁
session的销毁通过两种途径:

  1. client客户端主动signout
  2. 空闲时间超过session_idle_time自动被清除。

方式一、client客户端主动quit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
---------nebula-client------------------
//NebulaSession中release方法退出session
public void release() {
connection.signout(sessionID);
connection.close();
}
//通过thrift协议调用GraphService.signout方法,这里向nebula-graphd发起退出请求
public void signout(long sessionId)
{
ContextStack ctx = getContextStack("GraphService.signout", null);
this.setContextStack(ctx);
send_signout(sessionId);
}

---------nebula-graphd------------------
//接收到nebula-client的请求进行会话清除
GraphService::signout(int64_t sessionId) {
VLOG(2) << "Sign out session " << sessionId;
sessionManager_->removeSession(sessionId);
}
//调用metad服务进行session的移除
GraphSessionManager::removeMultiSessions(){
metaClient_->removeSessions(ids)
}

方式二、nebula-graphd定时任务清除过期session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
GraphSessionManager::threadFunc() {
//回收过期会话
reclaimExpiredSessions();
//启动下次回收任务
scavenger_->addDelayTask(
FLAGS_session_reclaim_interval_secs * 1000, &GraphSessionManager::threadFunc, this);
}

GraphSessionManager::reclaimExpiredSessions() {
//过期session列表
std::vector<SessionID> expiredSessions;
// activeSessions_为当前有效session列表,在创建session的时候进行填充
for (const auto& iter : activeSessions_) {
int32_t idleSecs = iter.second->idleSeconds();
if (idleSecs < FLAGS_session_idle_timeout_secs) {
continue;
}
//空闲时间大于session_idle_timeout_secs就会加入到过期列表中
expiredSessions.emplace_back(iter.first);
}
}
//请求nebula-metad进行会话清除,根据之前创建session可知其实就是通过raft协议从rocksdb中删除sessionId为key的值
metaClient_->removeSessions(std::move(expiredSessions))

nebula错误日志中未知错误码如何定位原因

nebula错误输出有时只输出错误码,未输出错误原因,此时可通过源码src/interface/common.thrift文件进行搜索即可,我这里贴一些常见错误码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
E_DISCONNECTED                    = -1,     // Lost connection
E_FAIL_TO_CONNECT = -2, // Unable to establish connection
E_RPC_FAILURE = -3, // RPC failure
E_LEADER_CHANGED = -4, // Raft leader has been changed

// 1xxx for graphd
E_BAD_USERNAME_PASSWORD = -1001, // Authentication failed
E_SESSION_INVALID = -1002, // Invalid session
E_SESSION_TIMEOUT = -1003, // Session timeout
E_SYNTAX_ERROR = -1004, // Syntax error
E_EXECUTION_ERROR = -1005, // Execution error
E_STATEMENT_EMPTY = -1006, // Statement is empty
E_SEMANTIC_ERROR = -1009, // Semantic error
E_TOO_MANY_CONNECTIONS = -1010, // Maximum number of connections exceeded
E_PARTIAL_SUCCEEDED = -1011, // Access to storage failed (only some requests succeeded)

// 2xxx for metad
E_NO_HOSTS = -2001, // Host does not exist
E_EXISTED = -2002, // Host already exists
E_INVALID_HOST = -2003, // Invalid host
E_UNSUPPORTED = -2004, // The current command, statement, or function is not supported
E_NOT_DROP = -2005, // Not allowed to drop
E_BALANCER_RUNNING = -2006, // The balancer is running
E_CONFIG_IMMUTABLE = -2007, // Configuration items cannot be changed
E_CONFLICT = -2008, // Parameters conflict with meta data
E_SESSION_NOT_FOUND = -2069, // Session does not exist

// 3xxx for storaged
E_CONSENSUS_ERROR = -3001, // Consensus cannot be reached during an election
E_KEY_HAS_EXISTS = -3002, // Key already exists
E_DATA_TYPE_MISMATCH = -3003, // Data type mismatch
E_INVALID_FIELD_VALUE = -3004, // Invalid field value
E_INVALID_OPERATION = -3005, // Invalid operation
E_NOT_NULLABLE = -3006, // Current value is not allowed to be empty
E_FIELD_UNSET = -3007, // Field value must be set if the field value is NOT NULL or has no default value
E_OUT_OF_RANGE = -3008, // The value is out of the range of the current type
E_DATA_CONFLICT_ERROR = -3010, // Data conflict, for index write without toss.

通过深入分析 NebulaGraph 的源码,我们可以看到它在高效存储、查询、数据编码和 TTL 管理方面的设计细节。希望本文能为开发者们提供一些实用的参考,帮助大家更好地理解 NebulaGraph 的工作原理,也希望通过共同学习,推动技术进步和创新。