/**
 * Looks up the {@link TagItem} of a tag at one specific schema version.
 *
 * <p>The first time a space is requested (i.e. while {@code spaceTagItemVersions} has no
 * entry for it), all tags returned by {@code getTags(spaceName)} are indexed by tag id and
 * then by version. The index is stored only when that list is non-empty, so a space without
 * tags is queried again on the next call.
 *
 * @param spaceName name of the space whose tags are searched
 * @param tagId     id of the tag
 * @param version   schema version of the tag
 * @return the matching tag item, or {@code null} when the space has no index, the tag id is
 *         unknown, or the tag has no entry for {@code version}
 */
public TagItem getSpecialVersionTagItem(String spaceName, int tagId, long version) {
    boolean indexed = spaceTagItemVersions.containsKey(spaceName);
    if (!indexed) {
        List<TagItem> tags = getTags(spaceName);
        if (!tags.isEmpty()) {
            // tag id -> (schema version -> tag item)
            Map<Integer, Map<Long, TagItem>> index = Maps.newHashMap();
            for (TagItem item : tags) {
                Map<Long, TagItem> versions =
                        index.computeIfAbsent(item.getTag_id(), k -> Maps.newHashMap());
                versions.put(item.getVersion(), item);
            }
            spaceTagItemVersions.put(spaceName, index);
        }
    }

    Map<Integer, Map<Long, TagItem>> byTagId = spaceTagItemVersions.get(spaceName);
    if (byTagId == null) {
        return null;
    }
    Map<Long, TagItem> byVersion = byTagId.getOrDefault(tagId, Collections.emptyMap());
    return byVersion.get(version);
}
private RowReader getRowReader(String spaceName, ScanVertex scanTag, Map<Integer, RowReader> readers) { int tagId = scanTag.getTagId(); // 解析当前tag数据的schema版本 long schemaVersion = NebulaUtils.parseSchemaVersion(scanTag.getValue()); Map<Integer, Map<Long, RowReader>> tagVersionReaderMap = spaceTagVersionReaders.computeIfAbsent(spaceName, k -> new ConcurrentHashMap<>()); Map<Long, RowReader> versionReaderMap = tagVersionReaderMap.computeIfAbsent(tagId, k -> new ConcurrentHashMap<>()); RowReader reader = versionReaderMap.get(schemaVersion); if (reader != null) { return reader; } //构建对应schema版本的RowReader TagItem tagItem = metaClient.getSpecialVersionTagItem(spaceName, tagId, schemaVersion); if (tagItem != null) { log.debug("Add special version tagItem | spaceName:{} | tagId:{} | schemaVersion:{}", spaceName, tagId, schemaVersion); versionReaderMap.computeIfAbsent(schemaVersion, k -> new RowReader(tagItem.schema, tagItem.version)); return versionReaderMap.get(schemaVersion); } RowReader rowReader = readers.get(tagId); if (rowReader == null) { log.error("Not match vertex reader | spaceName:{} | tagId:{} | schemaVersion:{} | data={}", spaceName, tagId, schemaVersion, Hex.encodeHexString(scanTag.value)); } return rowReader; } //解析数据的schema版本,根据nebula源码翻译成的java代码 public static long parseSchemaVersion(byte[] row) { if (row == null || row.length == 0) { System.err.println("Row data is empty, so there is no schema version"); return 0; } // The first three bits indicate the number of bytes for the // schema version. If the number is zero, no schema version // presents int verBytes = row[0] >> 5; int ver = 0; if (verBytes > 0) { if (verBytes + 1 > row.length) { // Data is too short // System.err.println("Row data is too short"); return 0; } // Schema Version is stored in Little Endian for (int i = 0; i < verBytes; i++) { ver |= ((int) row[i + 1] << (8 * i)); } } return ver; }
//增加cursor参数 public Iterator<ScanVertexResponse> scanVertex( String space, int part, Map<String, List<String>> returnCols, boolean allCols, int limit, long startTime, long endTime, byte[] cursor) throws IOException { HostAndPort leader = getLeader(space, part); if (Objects.isNull(leader)) { throw new IllegalArgumentException("Part " + part + " not found in space " + space); }