/**
 * Returns the tag definition of one specific schema version.
 *
 * <p>The first lookup for a space loads all of its tags through {@code getTags(spaceName)}
 * and indexes them by tag id and then by version. The index is stored in
 * {@code spaceTagItemVersions} only when at least one tag was returned.
 *
 * @param spaceName name of the graph space
 * @param tagId     id of the tag
 * @param version   schema version of the tag
 * @return the matching {@code TagItem}, or {@code null} when the space has no cached index
 *         or the index holds no entry for the given tag id and version
 */
public TagItem getSpecialVersionTagItem(String spaceName, int tagId, long version) {
    if (!spaceTagItemVersions.containsKey(spaceName)) {
        List<TagItem> tags = getTags(spaceName);
        if (!tags.isEmpty()) {
            // tag id -> (schema version -> tag definition)
            Map<Integer, Map<Long, TagItem>> index = Maps.newHashMap();
            for (TagItem tag : tags) {
                Map<Long, TagItem> byVersion =
                        index.computeIfAbsent(tag.getTag_id(), id -> Maps.newHashMap());
                byVersion.put(tag.getVersion(), tag);
            }
            spaceTagItemVersions.put(spaceName, index);
        }
    }

    Map<Integer, Map<Long, TagItem>> versionsByTag = spaceTagItemVersions.get(spaceName);
    if (versionsByTag == null) {
        return null;
    }
    Map<Long, TagItem> versions = versionsByTag.getOrDefault(tagId, Collections.emptyMap());
    return versions.get(version);
}
private RowReader getRowReader(String spaceName, ScanVertex scanTag, Map<Integer, RowReader> readers) { int tagId = scanTag.getTagId(); // 解析当前tag数据的schema版本 long schemaVersion = NebulaUtils.parseSchemaVersion(scanTag.getValue()); Map<Integer, Map<Long, RowReader>> tagVersionReaderMap = spaceTagVersionReaders.computeIfAbsent(spaceName, k -> new ConcurrentHashMap<>()); Map<Long, RowReader> versionReaderMap = tagVersionReaderMap.computeIfAbsent(tagId, k -> new ConcurrentHashMap<>()); RowReader reader = versionReaderMap.get(schemaVersion); if (reader != null) { return reader; } //构建对应schema版本的RowReader TagItem tagItem = metaClient.getSpecialVersionTagItem(spaceName, tagId, schemaVersion); if (tagItem != null) { log.debug("Add special version tagItem | spaceName:{} | tagId:{} | schemaVersion:{}", spaceName, tagId, schemaVersion); versionReaderMap.computeIfAbsent(schemaVersion, k -> new RowReader(tagItem.schema, tagItem.version)); return versionReaderMap.get(schemaVersion); } RowReader rowReader = readers.get(tagId); if (rowReader == null) { log.error("Not match vertex reader | spaceName:{} | tagId:{} | schemaVersion:{} | data={}", spaceName, tagId, schemaVersion, Hex.encodeHexString(scanTag.value)); } return rowReader; } //解析数据的schema版本,根据nebula源码翻译成的java代码 public static long parseSchemaVersion(byte[] row) { if (row == null || row.length == 0) { System.err.println("Row data is empty, so there is no schema version"); return 0; } // The first three bits indicate the number of bytes for the // schema version. If the number is zero, no schema version // presents int verBytes = row[0] >> 5; int ver = 0; if (verBytes > 0) { if (verBytes + 1 > row.length) { // Data is too short // System.err.println("Row data is too short"); return 0; } // Schema Version is stored in Little Endian for (int i = 0; i < verBytes; i++) { ver |= ((int) row[i + 1] << (8 * i)); } } return ver; }
//增加cursor参数 public Iterator<ScanVertexResponse> scanVertex( String space, int part, Map<String, List<String>> returnCols, boolean allCols, int limit, long startTime, long endTime, byte[] cursor) throws IOException { HostAndPort leader = getLeader(space, part); if (Objects.isNull(leader)) { throw new IllegalArgumentException("Part " + part + " not found in space " + space); }
// TODO(panda) Optimize the method in the future boolttlExpired(const meta::NebulaSchemaProvider* schema, nebula::RowReaderWrapper* reader)const{ if (schema == nullptr) { returntrue; } auto ttl = CommonUtils::ttlProps(schema); // Only support the specified ttl_col mode // Not specifying or non-positive ttl_duration behaves like ttl_duration = // infinity if (!ttl.first) { returnfalse; } //这里比对数据的ttl字段的值和当前时间,如果过期就返回true return CommonUtils::checkDataExpiredForTTL(schema, reader, ttl.second.second, ttl.second.first); }
boolttlExpired(const meta::NebulaSchemaProvider* schema, const Value& v)const{ if (schema == nullptr) { returntrue; } auto ttl = CommonUtils::ttlProps(schema); if (!ttl.first) { returnfalse; } return CommonUtils::checkDataExpiredForTTL(schema, v, ttl.second.second, ttl.second.first); } };
int64_t now; // The unit of ttl expiration unit is controlled by user, we just use a gflag here. if (!FLAGS_ttl_use_ms) { now = std::time(nullptr); } else { auto t = std::chrono::system_clock::now(); now = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch()).count(); }
// if the value is not INT type (sush as NULL), it will never expire. // TODO (sky) : DateTime if (v.isInt() && (now > (v.getInt() + ttlDuration))) { VLOG(2) << "ttl expired"; returntrue; } returnfalse; }
// Abridged sketch of the write path: the return type, the parameters and the
// declarations of spaceId, partId, keyValues and cb are omitted in this excerpt.
NebulaStore::asyncMultiPut(){
  // Get the partition that holds the session data; spaceId and partId are both 0
  auto ret = part(spaceId, partId);
  auto part = nebula::value(ret);
  // Write the data to the partition
  part->asyncMultiPut(std::move(keyValues), std::move(cb));
}
// Connection, RPC and leader-change errors (codes -1 through -4)
E_DISCONNECTED = -1,     // Lost connection
E_FAIL_TO_CONNECT = -2,  // Unable to establish connection
E_RPC_FAILURE = -3,      // RPC failure
E_LEADER_CHANGED = -4,   // Raft leader has been changed
// 1xxx for graphd E_BAD_USERNAME_PASSWORD = -1001, // Authentication failed E_SESSION_INVALID = -1002, // Invalid session E_SESSION_TIMEOUT = -1003, // Session timeout E_SYNTAX_ERROR = -1004, // Syntax error E_EXECUTION_ERROR = -1005, // Execution error E_STATEMENT_EMPTY = -1006, // Statement is empty E_SEMANTIC_ERROR = -1009, // Semantic error E_TOO_MANY_CONNECTIONS = -1010, // Maximum number of connections exceeded E_PARTIAL_SUCCEEDED = -1011, // Access to storage failed (only some requests succeeded)
// 2xxx for metad E_NO_HOSTS = -2001, // Host does not exist E_EXISTED = -2002, // Host already exists E_INVALID_HOST = -2003, // Invalid host E_UNSUPPORTED = -2004, // The current command, statement, or function is not supported E_NOT_DROP = -2005, // Not allowed to drop E_BALANCER_RUNNING = -2006, // The balancer is running E_CONFIG_IMMUTABLE = -2007, // Configuration items cannot be changed E_CONFLICT = -2008, // Parameters conflict with meta data E_SESSION_NOT_FOUND = -2069, // Session does not exist
// 3xxx for storaged E_CONSENSUS_ERROR = -3001, // Consensus cannot be reached during an election E_KEY_HAS_EXISTS = -3002, // Key already exists E_DATA_TYPE_MISMATCH = -3003, // Data type mismatch E_INVALID_FIELD_VALUE = -3004, // Invalid field value E_INVALID_OPERATION = -3005, // Invalid operation E_NOT_NULLABLE = -3006, // Current value is not allowed to be empty E_FIELD_UNSET = -3007, // Field value must be set if the field value is NOT NULL or has no default value E_OUT_OF_RANGE = -3008, // The value is out of the range of the current type E_DATA_CONFLICT_ERROR = -3010, // Data conflict, for index write without toss.
Welcome to Hexo! This is your very first post. Check the documentation for more info. If you run into any problems when using Hexo, you can find the answer in the troubleshooting guide, or you can ask me on GitHub.