源码分支: release-3.6
作者简介: 阿旺(奇富科技) 本人是一名 java 老鸟,工作内容并非 C++ 方向,平时也很少接触 C++ 语言,不懂的代码都是使用 GPT 作代码辅助解释,故难免存在准确性问题,但大体思路应该没太大问题,如有错误问题还望评论区指出回复,相互学习共同成长。以下内容源自我在使用 NebulaGraph 过程中遇到的一些问题和思考。我通过查阅源码、动手实践,解决了一些实际生产中的疑难问题。也希望借此与大家分享经验,探讨更多技术细节。同时,也想鼓励每一位技术人——不要给自己轻易设限,勇于突破边界,探索新的领域。
内容大纲
nebula目录结构说明
nebula启动会运行多少个rocksdb实例
nebula是如何查询一个点的数据
nebula中value是如何编码的
nebula中TTL是如何实现的
nebula中如何管理session
nebula错误日志中未知错误码如何定位原因
一、nebula源码src目录结构 src |-clients : 内部服务客户端,主要用于graphd、metad、storaged内部服务RPC调用 |-codec :nebula数据的key-value编码和解码 |-daemons :启动入口,graphd、metad、storaged启动 |-graph :graphd相关业务实现,graphd主要负责处理客户端请求,属于nebula的计算层 |-interface : thrift接口定义,nebula采用thrift做异构语言rpc调用,nebula客户端与nebula交互接口皆定义于此 |-kvstore :键值存储引擎,storaged和metad都会用到,比如:rocksdb和hbase,nebula默认采用rocksdb作为内嵌存储 |-meta :元数据相关,比如:session会话、用户信息、分片信息等等,属于nebula元数据管理层 |-parser : 语法解析 |-storage : nebula存储相关,提供一些简单的kv存储和查询的接口服务给graphd调用
温馨提示: nebula通过CMake进行编译打包,如果想了解打包过程和依赖信息见每个目录的CMakeLists.txt文件
二、单个nebula-storaged启动时会运行多少个rocksdb实例 结论: 数据存储磁盘目录数 *space总数 数据存储磁盘目录: etc/nebula-storaged.conf中data_path 比如:有2个space,3个数据目录,则rocksdb实例数=2*3=6
相关知识点: space和part分区数越多nebula-storaged启动越慢
大体过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 1.StorageDaemon::main() 程序启动入口 2.StorageServer::start() 启动存储引擎 3.StorageServer::getStoreInstance() 初始化存储引擎 4.NebulaStore::init() nebula存储引擎初始化 5.NebulaStore::loadPartFromDataPath() 通过获取data_path存储目录路径,为每个目录下space启动一个rocksdb实例,并且建立了space和kvEngine的映射关系,以及partId和part的映射关系,后面查询或者更新实则是定位数据partId,然后通过partId获取到part进行操作,part中保留了kvEngine的引用 最终构建spaces_,里面包含了SpaceId和spacePart分片的映射关系,后面所有的读写都需要此映射关系操作对应的rocksdb,这里的映射关系类似于java中SpringBean std::unordered_map<GraphSpaceID, std::shared_ptr<SpacePartInfo>> spaces_; struct SpacePartInfo { //Part中保留了engin的引用,当前版本engine其实就是rocksdb std::unordered_map<PartitionID, std::shared_ptr<Part>> parts_; std::vector<std::unique_ptr<KVEngine>> engines_; }
关键代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 StorageDaemon::main (){ auto storageServer = std::make_unique <nebula::storage::StorageServer>( localhost, metaAddrsRet.value (), paths, FLAGS_wal_path, FLAGS_listener_path); storageServer->start () } StorageServer::start (){ LOG (INFO) << "Init kvstore" ; kvstore_ = getStoreInstance (); } StorageServer::getStoreInstance (){ auto nbStore = std::make_unique <kvstore::NebulaStore>( std::move (options), ioThreadPool_, localHost_, workers_); nbStore->init () } NebulaStore::init (){ loadPartFromDataPath () } NebulaStore::loadPartFromDataPath (){ std::vector<folly::Future<std::pair<GraphSpaceID, std::unique_ptr<KVEngine>>>> futures; std::vector<std::string> enginesPath; for (auto & path : options_.dataPaths_) { auto rootPath = folly::stringPrintf ("%s/nebula" , path.c_str ()); auto dirs = fs::FileUtils::listAllDirsInDir (rootPath.c_str ()); for (auto & dir : dirs) { GraphSpaceID spaceId = folly::to <GraphSpaceID>(dir); if (spaceId == 0 ) { continue ; } enginesPath.emplace_back (rootPath + "/" + dir); futures.emplace_back (newEngineAsync (spaceId, path, options_.walPath_)); } std::unordered_map<GraphSpaceID, std::vector<std::unique_ptr<KVEngine>>> spaceEngines; auto tries = folly::collectAll (futures).get (); for (auto & t : tries) { auto && p = t.value (); auto spaceIt = spaceEngines.find (p.first); if (spaceIt == spaceEngines.end ()) { spaceIt = spaceEngines.emplace (p.first, std::vector<std::unique_ptr<KVEngine>>()).first; } spaceIt->second.emplace_back (std::move (p.second)); } for (auto & spaceEngine : spaceEngines) { GraphSpaceID spaceId = spaceEngine.first; for (auto & engine : spaceEngine.second) { std::map<PartitionID, Peers> partRaftPeers; partRaftPeers.emplace (partId, raftPeers); for (auto & it : partRaftPeers) { auto part = newPart (spaceId, partId, enginePtr, isLearner, addrs); auto ret = iter->second->parts_.emplace (partId, part); } } } } } NebulaStore::newEngineAsync () { engine = std::make_unique <RocksEngine>( spaceId, vIdLen, dataPath, walPath, options_.mergeOp_, cfFactory); }
三、nebula是如何查询一个点的数据 graphd负责接受client客户端请求,并解析 nGQL,生成查询计划,再经过优化器,最终执行executor,为了突出重点省去从接受请求开始到调用executor,而直接定位executor,这边以GetVerticesExecutor.cpp为例来探究如何查找vertex点的数据,主要探究vertexId如何定位到数据
相关知识点
part数和storaged查询并发数成正比
vid和partId映射计算公式: partId = hash(vid) % numParts + 1
大体过程
1 2 3 4 5 6 7 8 9 10 //nebula-graphd 1.GetVerticesExecutor::getVertices() 获取多个点属性列表 2.StorageClient::getProps() 计算vertexId对应的part,然后请求对应part分片数据 //nebula-storaged 3.GraphStorageServiceHandler::future_getProps() storagd收到graphd来的请求进行处理 4.GetPropProcessor::doProcess() 构建tag属性查询计划并调用 5.GetTagPropNode::doExecute() 根据vid构建rocksdb的key,然后调用NebulaStore进行获取 6.NebulaStore::get() 通过partId定位出part,然后通过part获取对应key的信息 7.RocksEngine::get() 从rocksdb中获取数据
关键源码 nebula-graphd部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 GetVerticesExecutor::getVertices () { StorageClient *storageClient = qctx ()->getStorageClient (); storageClient->getProps (param, std::move (vertices), gv->props (), nullptr , gv->exprs (), gv->dedup (), gv->orderBy (), gv->getValidLimit (), gv->filter ()) } StorageRpcRespFuture<cpp2::GetPropResponse> StorageClient::getProps () { auto status = clusterIdsToHosts (param.space, input.rows, std::move (cbStatus).value ()); cpp2::GetPropRequest> requests; for (auto & c : clusters) { auto & host = c.first; auto & req = requests[host]; req.space_id_ref () = param.space; req.parts_ref () = std::move (c.second); } return collectResponse ( param.evb, std::move (requests), [](ThriftClientType* client, const cpp2::GetPropRequest& r) { return client->future_getProps (r); }); } StorageClientBase::clusterIdsToHosts (){ auto status = metaClient_->partsNum (spaceId); std::unordered_map<PartitionID, HostAddr> leaders; for (int32_t partId = 1 ; partId <= numParts; ++partId) { auto leader = getLeader (spaceId, partId); leaders[partId] = std::move (leader).value (); } for (auto & id : ids) { PartitionID part = vid % numParts + 1 ; const auto & leader = leaders[part]; clusters[leader][part].emplace_back (std::move (id)); } return clusters; }
nebula-storaged部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 folly::Future<cpp2::GetPropResponse> GraphStorageServiceHandler::future_getProps ( const cpp2::GetPropRequest& req) { auto * processor = GetPropProcessor::instance (env_, &kGetPropCounters, readerPool_.get ()); RETURN_FUTURE (processor); } GetPropProcessor::doProcess (){ if (!FLAGS_query_concurrently) { runInSingleThread (req); } else { runInMultipleThread (req); } } GetPropProcessor::runInSingleThread (){ auto plan = buildTagPlan (&contexts_.front (), &resultDataSet_); for (const auto & partEntry : req.get_parts ()) { auto partId = partEntry.first; for (const auto & row : partEntry.second) { auto vId = row.values[0 ].getStr (); plan.go (partId, vId); } } } GetPropProcessor::buildTagPlan (RuntimeContext* context, nebula::DataSet* result) { StoragePlan<VertexID> plan; std::vector<TagNode*> tags; for (const auto & tc : tagContext_.propContexts_) { auto tag = std::make_unique <TagNode>(context, &tagContext_, tc.first, &tc.second); tags.emplace_back (tag.get ()); plan.addNode (std::move (tag)); } auto output = std::make_unique <GetTagPropNode>( context, tags, result, filter_ == nullptr ? nullptr : filter_->clone (), limit_, &tagContext_); for (auto * tag : tags) { output->addDependency (tag); } plan.addNode (std::move (output)); return plan; } TagNode::doExecute (){ key_ = NebulaKeyUtils::tagKey (context_->vIdLen (), partId, vId, tagId_); ret = context_->env ()->kvstore_->get (context_->spaceId (), partId, key_, &value_); reader_.reset (*schemas_, value_); } GetTagPropNode::doExecute (){ List row; if (context_->isIntId ()) { row.emplace_back (*reinterpret_cast <const int64_t *>(vId.data ())); } else { row.emplace_back (vId); } for (auto * tagNode : tagNodes_) { ret = tagNode->collectTagPropsIfValid ( [&row, vIdLen, isIntId, tagNode, this ](){ auto status = QueryUtils::collectVertexProps ( key, vIdLen, isIntId, reader, props, row, expCtx_.get (), tagNode->getTagName ()); }); } } QueryUtils::collectVertexProps (){ for (const auto & prop : *props) { auto value = QueryUtils::readVertexProp (key, vIdLen, isIntId, reader, prop); list.emplace_back (value.value ()); } } QueryUtils::readVertexProp (){ switch (prop.propInKeyType_) { case PropContext::PropInKeyType::NONE: { return readValue (reader, prop.name_, prop.field_); } case PropContext::PropInKeyType::VID: { auto vId = NebulaKeyUtils::getVertexId (vIdLen, key); if (isIntId) { return *reinterpret_cast <const int64_t *>(vId.data ()); } else { return vId.subpiece (0 , vId.find_first_of ('\0' )).toString (); } } case PropContext::PropInKeyType::TAG: { auto tag = NebulaKeyUtils::getTagId (vIdLen, key); return tag; } } }
四、nebula中value是如何编码的 从2.x开始起nebula使用新的编码方式version 2,由5个部分组成:
1 2 3 <header> <schema version> <NULL flags> <all properties> <string content> | | | | 1 byte 0 - 7 bytes 0+ bytes N bytes
header: 占用1个字节,用来标识schema version字节长度,目前取值07schema version: 07个字节,标识schema版本NULL flags: NULL字段标识,跟mysql设计类似,为NULL的字段就在对应的bit位设置为1all properties: 固定长度的属性值,根据schema进行排列,由于固定长度所以想获取某个字段的值可以直接计算offset+length就能拿到, 除 STRING 类型属性外,所有属性都就地存储。STRING 属性将字符串内容的偏移量存储在前 4 个字节中,将字符串的长度存储在后 4 个字节中。字符串内容将附加到编码字符串的末尾,相当于多了一层指针string content: 存储非固定长度的string字符串,获取string属性内容得多次计算,先计算string字符串指针位置,然后在根据指针报错的offset+length得到完整的string内容
各类型占用内存空间大小:
1 2 3 4 5 6 7 8 9 10 11 12 13 BOOL (1 byte) INT8 (1 byte) INT16 (2 bytes) INT32 (4 bytes) INT64 (8 bytes) FLOAT (4 bytes) DOUBLE (8 bytes) STRING (8 bytes) * FIXED_STRING (Length defined in the schema) TIMESTAMP (8 bytes) DATE (4 bytes) DATETIME (15 bytes) GEOGRAPHY (8 bytes) *
关键代码: RowWriterV2.cpp和RowWriterV2.h
五、nebula中TTL是如何实现的 nebula中TTL是通过rocksdb中KVFilter实现的,KVFilter是rocksdb对外保留的扩展接口,查询和compact时用来做数据的过滤,数据每次搜索时都会经过此Filter,nebula实现此接口用来过滤TTL过期的数据,以及rocksdb做compact时筛选出过期的数据让其删除。
相关知识点: TTL到期的数据可能并未删除,仅仅只是查询不出来,如果此时加大TTL或者删除TTL,原来查询不到的数据又会查询出来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 StorageServer::getStoreInstance (){ kvstore::KVOptions options; options.compaction_filter_factory = std::make_unique <StorageCompactionFilterFactoryBuilder>(schemaMan_.get (), indexMan_.get ()); status = rocksdb::DB::Open (options, path, &db); } class StorageCompactionFilter final : public kvstore::KVFilter { bool filter (int level, GraphSpaceID spaceId, const folly::StringPiece& key, const folly::StringPiece& val) const override { if (level < FLAGS_min_level_for_custom_filter) { return false ; } if (NebulaKeyUtils::isTag (vIdLen_, key)) { return !tagValid (spaceId, key, val); } else if (NebulaKeyUtils::isEdge (vIdLen_, key)) { return !edgeValid (spaceId, key, val); } else if (IndexKeyUtils::isIndexKey (key)) { return !indexValid (spaceId, key, val); } else if (!FLAGS_use_vertex_key && NebulaKeyUtils::isVertex (key)) { return true ; } else if (NebulaKeyUtils::isLock (vIdLen_, key)) { return !lockValid (spaceId, key); } else { VLOG (3 ) << "Skip the system key inside, key " << key; } return false ; } private : bool tagValid (GraphSpaceID spaceId, const folly::StringPiece& key, const folly::StringPiece& val) const { ...... if (ttlExpired (schema.get (), reader.get ())) { VLOG (3 ) << "Ttl expired" ; return false ; } return true ; } bool edgeValid (GraphSpaceID spaceId, const folly::StringPiece& key, const folly::StringPiece& val) const { .... if (ttlExpired (schema.get (), reader.get ())) { VLOG (3 ) << "Ttl expired" ; return false ; } return true ; } bool ttlExpired (const meta::NebulaSchemaProvider* schema, nebula::RowReaderWrapper* reader) const { if (schema == nullptr ) { return true ; } auto ttl = CommonUtils::ttlProps (schema); if (!ttl.first) { return false ; } return CommonUtils::checkDataExpiredForTTL (schema, reader, ttl.second.second, ttl.second.first); } bool ttlExpired (const meta::NebulaSchemaProvider* schema, const Value& v) const { if (schema == nullptr ) { return true ; } auto ttl = CommonUtils::ttlProps (schema); if (!ttl.first) { return false ; } return CommonUtils::checkDataExpiredForTTL (schema, v, ttl.second.second, ttl.second.first); } }; bool CommonUtils::checkDataExpiredForTTL (const meta::NebulaSchemaProvider* schema, const Value& v, const std::string& ttlCol, int64_t ttlDuration) { const auto & ftype = schema->getFieldType (ttlCol); if (ftype != nebula::cpp2::PropertyType::TIMESTAMP && ftype != nebula::cpp2::PropertyType::INT64) { return false ; } int64_t now; if (!FLAGS_ttl_use_ms) { now = std::time (nullptr ); } else { auto t = std::chrono::system_clock::now (); now = std::chrono::duration_cast <std::chrono::milliseconds>(t.time_since_epoch ()).count (); } if (v.isInt () && (now > (v.getInt () + ttlDuration))) { VLOG (2 ) << "ttl expired" ; return true ; } return false ; } std::pair<bool , std::pair<int64_t , std::string>> CommonUtils::ttlProps ( const meta::NebulaSchemaProvider* schema) { DCHECK (schema != nullptr ); const auto * ns = dynamic_cast <const meta::NebulaSchemaProvider*>(schema); const auto sp = ns->getProp (); int64_t duration = 0 ; if (sp.get_ttl_duration ()) { duration = *sp.get_ttl_duration (); } std::string col; if (sp.get_ttl_col ()) { col = *sp.get_ttl_col (); } return std::make_pair (!(duration <= 0 || col.empty ()), std::make_pair (duration, col)); } StatusOr<Value> CommonUtils::ttlValue (const meta::NebulaSchemaProvider* schema, RowReaderWrapper* reader) { DCHECK (schema != nullptr ); const auto * ns = dynamic_cast <const meta::NebulaSchemaProvider*>(schema); auto ttlProp = ttlProps (ns); if (!ttlProp.first) { return Status::Error (); } return reader->getValueByName (std::move (ttlProp).second.second); }
六、nebula中是如何管理session的 session是客户端和服务端nebula-graphd交互的凭证,session保存在nebula-metad中,session的创建和销毁都需要跟nebula-metad交互,后面将分析session的创建和销毁。相关知识点
client客户端通过sessionId来标识session
nebula-graphd重启不会影响已创建的session
nebula-metad将session保存到rocksdb中
总体步骤
nebula-client创建连接并调用用户验证创建session
nebula-graphd接收到client的请求验证,并调用nebula-metad进行session创建
nebula-metad生成sessionId并通过raft协议进行保存
session的创建 nebula-client-3.6 客户端代码:创建nebula连接并认证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private NebulaSession createSessionObject (SessionState state) { SyncConnection connection = new SyncConnection (); connection.open(getAddress(), sessionPoolConfig.getTimeout()); AuthResult authResult = connection.authenticate(sessionPoolConfig.getUsername(), sessionPoolConfig.getPassword()); NebulaSession nebulaSession = new NebulaSession (connection, authResult.getSessionId(), authResult.getTimezoneOffset(), state); return nebulaSession; } public ResultSet execute (String stmt) throws IOErrorException { return new ResultSet (connection.execute(sessionID, stmt), timezoneOffset); } public ExecutionResponse executeWithParameter (long sessionId, byte [] stmt, Map parameterMap) throws TException{ ContextStack ctx = getContextStack("GraphService.executeWithParameter" , null ); this .setContextStack(ctx); send_executeWithParameter(sessionId, stmt, parameterMap); return recv_executeWithParameter(); }
nebula-graphd接收client的授权请求,并返回sessionId
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 GraphService::future_authenticate (const std::string& username,const std::string& password){ auto authResult = auth (username, password); if (!authResult.ok ()) { ctx->resp ().errorCode = ErrorCode::E_BAD_USERNAME_PASSWORD; return future; } if (sessionManager_->isOutOfConnections ()) { ctx->resp ().errorCode = ErrorCode::E_TOO_MANY_CONNECTIONS; return future; } sessionManager_->createSession (username, clientIp, getThreadManager ()) } GraphSessionManager::createSession (){ std::string key = userName + clientIp; auto maxSessions = FLAGS_max_sessions_per_ip_per_user; auto uiscFindPtr = sessionCnt (key); if (uiscFindPtr->get () > maxSessions - 1 ) { return Status::Error ("Create Session failed: Too many sessions created" ); } return metaClient_->createSession (userName, myAddr_, clientIp) } MetaClient::createSession (){ client->future_createSession (request) }
nebula-metad接收到nebula-graphd请求进行session创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 MetaServiceHandler::future_createSession ( const cpp2::CreateSessionReq& req) { auto * processor = CreateSessionProcessor::instance (kvstore_); RETURN_FUTURE (processor); } CreateSessionProcessor::process (){ cpp2::Session session; session.session_id_ref () = time::WallClock::fastNowInMicroSec (); session.create_time_ref () = session.get_session_id (); session.update_time_ref () = session.get_create_time (); session.user_name_ref () = user; session.graph_addr_ref () = req.get_graph_addr (); session.client_ip_ref () = req.get_client_ip (); std::vector<kvstore::KV> data; data.emplace_back (MetaKeyUtils::sessionKey (session.get_session_id ()), MetaKeyUtils::sessionVal (session)); resp_.session_ref () = session; ret = doSyncPut (std::move (data)); } static const PartitionID kDefaultPartId = 0 ;static const GraphSpaceID kDefaultSpaceId = 0 ;BaseProcessor<RESP>::doSyncPut (){ kvstore_->asyncMultiPut (kDefaultSpaceId,kDefaultPartId) } NebulaStore::asyncMultiPut (){ auto ret = part (spaceId, partId); auto part = nebula::value (ret); part->asyncMultiPut (std::move (keyValues), std::move (cb)); } Part::asyncMultiPut (){ std::string log = encodeMultiValues (OP_MULTI_PUT, keyValues); appendLogAsync (source, LogType::NORMAL, std::move (log)) }
session的销毁 session的销毁通过两种途径:
client客户端主动signout
空闲时间超过session_idle_time自动被清除。
方式一、client客户端主动quit
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 ---------nebula-client------------------ public void release () { connection.signout(sessionID); connection.close(); } public void signout (long sessionId) { ContextStack ctx = getContextStack("GraphService.signout" , null ); this .setContextStack(ctx); send_signout(sessionId); } ---------nebula-graphd------------------ GraphService::signout(int64_t sessionId) { VLOG(2 ) << "Sign out session " << sessionId; sessionManager_->removeSession(sessionId); } GraphSessionManager::removeMultiSessions(){ metaClient_->removeSessions(ids) }
方式二、nebula-graphd定时任务清除过期session
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 GraphSessionManager::threadFunc () { reclaimExpiredSessions (); scavenger_->addDelayTask ( FLAGS_session_reclaim_interval_secs * 1000 , &GraphSessionManager::threadFunc, this ); } GraphSessionManager::reclaimExpiredSessions () { std::vector<SessionID> expiredSessions; for (const auto & iter : activeSessions_) { int32_t idleSecs = iter.second->idleSeconds (); if (idleSecs < FLAGS_session_idle_timeout_secs) { continue ; } expiredSessions.emplace_back (iter.first); } } metaClient_->removeSessions (std::move (expiredSessions))
nebula错误日志中未知错误码如何定位原因 nebula错误输出有时只输出错误码,未输出错误原因,此时可通过源码src/interface/common.thrift文件进行搜索即可,我这里贴一些常见错误码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 E_DISCONNECTED = -1, // Lost connection E_FAIL_TO_CONNECT = -2, // Unable to establish connection E_RPC_FAILURE = -3, // RPC failure E_LEADER_CHANGED = -4, // Raft leader has been changed // 1xxx for graphd E_BAD_USERNAME_PASSWORD = -1001, // Authentication failed E_SESSION_INVALID = -1002, // Invalid session E_SESSION_TIMEOUT = -1003, // Session timeout E_SYNTAX_ERROR = -1004, // Syntax error E_EXECUTION_ERROR = -1005, // Execution error E_STATEMENT_EMPTY = -1006, // Statement is empty E_SEMANTIC_ERROR = -1009, // Semantic error E_TOO_MANY_CONNECTIONS = -1010, // Maximum number of connections exceeded E_PARTIAL_SUCCEEDED = -1011, // Access to storage failed (only some requests succeeded) // 2xxx for metad E_NO_HOSTS = -2001, // Host does not exist E_EXISTED = -2002, // Host already exists E_INVALID_HOST = -2003, // Invalid host E_UNSUPPORTED = -2004, // The current command, statement, or function is not supported E_NOT_DROP = -2005, // Not allowed to drop E_BALANCER_RUNNING = -2006, // The balancer is running E_CONFIG_IMMUTABLE = -2007, // Configuration items cannot be changed E_CONFLICT = -2008, // Parameters conflict with meta data E_SESSION_NOT_FOUND = -2069, // Session does not exist // 3xxx for storaged E_CONSENSUS_ERROR = -3001, // Consensus cannot be reached during an election E_KEY_HAS_EXISTS = -3002, // Key already exists E_DATA_TYPE_MISMATCH = -3003, // Data type mismatch E_INVALID_FIELD_VALUE = -3004, // Invalid field value E_INVALID_OPERATION = -3005, // Invalid operation E_NOT_NULLABLE = -3006, // Current value is not allowed to be empty E_FIELD_UNSET = -3007, // Field value must be set if the field value is NOT NULL or has no default value E_OUT_OF_RANGE = -3008, // The value is out of the range of the current type E_DATA_CONFLICT_ERROR = -3010, // Data conflict, for index write without toss.
通过深入分析 NebulaGraph 的源码,我们可以看到它在高效存储、查询、数据编码和 TTL 管理方面的设计细节。希望本文能为开发者们提供一些实用的参考,帮助大家更好地理解 NebulaGraph 的工作原理,也希望通过共同学习,推动技术进步和创新。