From c963598fce19f052cafdab27e1b35ad0ee6248b1 Mon Sep 17 00:00:00 2001 From: Bryan Ogawa Date: Thu, 5 Aug 2021 15:27:23 -0700 Subject: [PATCH 1/2] Add an example Value Index. for the moment, this is simply a JSON index that maps values to the Entity ID of entities containing that value, although it would be simple to extend that to include the row within that entity as well. This is a proof of concept for the ability to add additional indices which might improve query performance. --- .../armor/columnfile/ColumnFileListener.java | 9 ++ .../armor/columnfile/ColumnFileReader.java | 6 +- .../armor/columnfile/ColumnFileSection.java | 3 +- .../com/rapid7/armor/schema/DataType.java | 46 ++++++++++ .../armor/write/component/IndexUpdater.java | 8 ++ .../armor/write/component/RowGroupWriter.java | 20 +++-- .../write/component/ValueIndexWriter.java | 83 +++++++++++++++++++ .../armor/write/writers/ColumnFileWriter.java | 35 +++++++- .../writers/ColumnFileReaderWriterTest.java | 80 ++++++++++++++++++ 9 files changed, 280 insertions(+), 10 deletions(-) create mode 100644 armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java create mode 100644 armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java diff --git a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileListener.java b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileListener.java index a96b412..71f902e 100644 --- a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileListener.java +++ b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileListener.java @@ -6,6 +6,15 @@ @FunctionalInterface public interface ColumnFileListener { + /** + * Listener callback. For each section of the column file other than the metadata, this will be called. + * @param armorSection file section type + * @param metadata metadata information for this column file. + * @param inputStream data to be read. + * @param compressedLength if the data is compressed, this value will be > 0. If uncompressed, value will be 0. + * @param uncompressedLength the uncompressed length of the data. if compressedLength is 0, this is the number of bytes available to read. + * @return number of bytes read by this function from inputStream . For backward compatibility, function should return 0 bytes for unhandled @{ColumnFileSection}s, such as unknown types. + */ int columnFileSection( ColumnFileSection armorSection, ColumnMetadata metadata, DataInputStream inputStream, int compressedLength, int uncompressedLength); } diff --git a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileReader.java b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileReader.java index 64fadd5..cdb3a04 100644 --- a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileReader.java +++ b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileReader.java @@ -95,14 +95,16 @@ private void readV2(DataInputStream dataInputStream, ColumnFileListener listener int readBytes = 0; if (entry.sectionType == ColumnFileSection.METADATA) { readBytes = readMetadata(dataInputStream); - } else - { + } else { readBytes = readSection(dataInputStream, listener, entry.sectionType); } + totalBytesRead += readBytes; } } + // Reads the section by calling back to the listener, if the listener exists. + // listener must return the # of bytes it has read from the dataInputStream. private int readSection(DataInputStream dataInputStream, ColumnFileListener listener, ColumnFileSection sectionType) throws IOException { diff --git a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java index 6bb7362..ce43d5d 100644 --- a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java +++ b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java @@ -9,7 +9,8 @@ public enum ColumnFileSection { ENTITY_DICTIONARY(2), VALUE_DICTIONARY(3), ENTITY_INDEX(4), - ROWGROUP(5); + ROWGROUP(5), + VALUE_INDEX(6); private final int sectionID; diff --git a/armor-base/src/main/java/com/rapid7/armor/schema/DataType.java b/armor-base/src/main/java/com/rapid7/armor/schema/DataType.java index 818c84d..4728d6c 100644 --- a/armor-base/src/main/java/com/rapid7/armor/schema/DataType.java +++ b/armor-base/src/main/java/com/rapid7/armor/schema/DataType.java @@ -207,4 +207,50 @@ public void traverseByteBuffer(ByteBuffer valueByteBuffer, RoaringBitmap nilBitM rowCount++; } } + + public Number fromString(String key) + { + if ("null".equals(key)) { + return null; + } else { + switch (this) { + case LONG: + case DATETIME: + return Long.parseLong(key); + case FLOAT: + return Float.parseFloat(key); + case INTEGER: + case STRING: + return Integer.parseInt(key); + case BOOLEAN: + return Byte.parseByte(key); + case DOUBLE: + return Double.parseDouble(key); + } + } + throw new IllegalStateException("unknown object type"); + } + + public String toString(Object val) { + if (val == null) { + return "null"; + } else { + switch (this) { + case LONG: + case DATETIME: + return Long.toString((Long) val); + case FLOAT: + return Float.toString((Float) val); + case INTEGER: + case STRING: + return Integer.toString((Integer) val); + case BOOLEAN: + return Byte.toString((Byte) val); + case DOUBLE: + return Double.toString((Double) val); + } + } + throw new IllegalStateException("unknown object type"); + } + } diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java b/armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java new file mode 100644 index 0000000..dffd665 --- /dev/null +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java @@ -0,0 +1,8 @@ +package com.rapid7.armor.write.component; + +public interface IndexUpdater +{ + void add(Number value, Integer entityId, Integer rowCount); + + void finish(); +} diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/RowGroupWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/component/RowGroupWriter.java index 02100dc..ee1452b 100644 --- a/armor-write/src/main/java/com/rapid7/armor/write/component/RowGroupWriter.java +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/RowGroupWriter.java @@ -177,6 +177,7 @@ public void customTraverseThoughValues(List records, Consumer cardinality = new HashSet<>(); @@ -187,8 +188,9 @@ private class MetadataUpdater { private boolean success; - MetadataUpdater(ColumnMetadata metadata) { + MetadataUpdater(ColumnMetadata metadata, IndexUpdater indexUpdater) { this.metadata = metadata; + this.indexUpdater = indexUpdater; prevMax = metadata.getMaxValue(); prevMax = metadata.getMinValue(); @@ -232,6 +234,11 @@ public void updateRow(EntityRecord eir) throws IOException { if (nilRb == null || !nilRb.contains(rowCount)) { metadata.handleMinMax(value); cardinality.add(value); + if(indexUpdater != null) + indexUpdater.add(value, eir.getEntityId(), rowCount); + } else { + if(indexUpdater != null) + indexUpdater.add(null, eir.getEntityId(), rowCount); } }); } @@ -258,6 +265,9 @@ private ByteBuffer insureByteBufferIsBigEnough(int len, ByteBuffer buf) { public void finishUpdate() { metadata.setCardinality(cardinality.size()); + if (indexUpdater != null) { + indexUpdater.finish(); + } success = true; } } @@ -278,7 +288,7 @@ public void finishUpdate() { // from within the for loop. public void runThoughValues(ColumnMetadata metadata, List records) throws IOException { long previousPosition = position(); - MetadataUpdater metadataUpdater = new MetadataUpdater(metadata); + MetadataUpdater metadataUpdater = new MetadataUpdater(metadata, null); try { for (EntityRecord eir : records) { if (eir.getDeleted() == 1) @@ -452,13 +462,13 @@ private RgOffsetWriteResult fillNullValues(ByteBuffer bytesBuffer, int numNull) * @throws IOException If an io error occurs. */ public List compact(List entitiesToKeep) throws IOException { - return compactAndUpdateMetadata(entitiesToKeep, null); + return compactAndUpdateMetadata(entitiesToKeep, null, null); } - public List compactAndUpdateMetadata(List entitiesToKeep, ColumnMetadata metadata) throws IOException { + public List compactAndUpdateMetadata(List entitiesToKeep, ColumnMetadata metadata, IndexUpdater indexUpdater) throws IOException { MetadataUpdater metadataUpdater = null; if (metadata != null) { - metadataUpdater = new MetadataUpdater(metadata); + metadataUpdater = new MetadataUpdater(metadata, indexUpdater); } int totalRequiredBytes = entitiesToKeep.stream().mapToInt(EntityRecord::totalLength).sum(); ByteBuffer output = ByteBuffer.allocate(totalRequiredBytes * 2); diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java new file mode 100644 index 0000000..593c425 --- /dev/null +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java @@ -0,0 +1,83 @@ +package com.rapid7.armor.write.component; + +import com.rapid7.armor.schema.DataType; +import com.rapid7.armor.shard.ColumnShardId; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * + */ +public class ValueIndexWriter implements Component, IndexUpdater { + private final ColumnShardId columnShardId; + private final DataType dataType; + private Map> valToEntities = new HashMap<>(); + private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public ValueIndexWriter(ColumnShardId columnShardId) { + this.columnShardId = columnShardId; + this.dataType = columnShardId.getColumnId().dataType(); + } + + public ValueIndexWriter(ColumnShardId columnShardId, byte[] json) throws IOException { + this(columnShardId); + @SuppressWarnings("unchecked") + Map> map = OBJECT_MAPPER.readValue(json, Map.class); + valToEntities = new HashMap<>(); + int highestSurrogate = -1; + for (Map.Entry > e : map.entrySet()) { + Number val = dataType.fromString(e.getKey()); + valToEntities.put(val, new HashSet<>(e.getValue())); + } + } + + public Map> getValToEntities() { return valToEntities; } + + public boolean isEmpty() { + return valToEntities.isEmpty(); + } + + private byte[] toBytes() throws JsonProcessingException { + HashMap> serializable = new HashMap<>(); + for (Map.Entry> e : valToEntities.entrySet()) { + serializable.put(this.dataType.toString(e.getKey()), new ArrayList<>(e.getValue())); + } + return OBJECT_MAPPER.writeValueAsBytes(serializable); + } + + @Override + public InputStream getInputStream() throws IOException { + return new ByteArrayInputStream(toBytes()); + } + + @Override + public long getCurrentSize() throws IOException { + return toBytes().length; + } + + + + @Override public void add(Number value, Integer entityId, Integer rowCount) + { + Set entities = this.valToEntities.get(value); + if (entities == null) { + entities = new HashSet<>(); + this.valToEntities.put(value, entities); + } + entities.add(entityId); + } + + @Override public void finish() + { + // don't need to do anything + } +} diff --git a/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java index 6a40078..3fc0443 100644 --- a/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java +++ b/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java @@ -20,6 +20,7 @@ import com.rapid7.armor.write.component.EntityIndexWriter; import com.rapid7.armor.write.component.RowGroupWriter; import com.rapid7.armor.write.component.RowGroupWriter.RgOffsetWriteResult; +import com.rapid7.armor.write.component.ValueIndexWriter; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.luben.zstd.RecyclingBufferPool; import com.github.luben.zstd.Zstd; @@ -65,9 +66,12 @@ public class ColumnFileWriter implements AutoCloseable { private ColumnMetadata metadata; private DictionaryWriter valueDictionary; private DictionaryWriter entityDictionary; + private ValueIndexWriter valueIndexWriter; private final ColumnShardId columnShardId; private final String ROWGROUP_STORE_SUFFIX = "_rowgroup-"; private final String ENTITYINDEX_STORE_SUFFIX = "_entityindex-"; + private final String VALUEINDEX_STORE_SUFFIX = "_valueindex-"; + private boolean skipMetaData = false; private boolean alwaysCompact = true; @@ -88,6 +92,7 @@ public ColumnFileWriter(ColumnShardId columnShardId) throws IOException { entityDictionary = new DictionaryWriter(true); rowGroupWriter = new RowGroupWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ROWGROUP_STORE_SUFFIX, ".armor"), columnShardId, valueDictionary); entityIndexWriter = new EntityIndexWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ENTITYINDEX_STORE_SUFFIX, ".armor"), columnShardId); + valueIndexWriter = new ValueIndexWriter(columnShardId); } public ColumnFileWriter(DataInputStream dataInputStream, ColumnShardId columnShardId) { @@ -114,6 +119,7 @@ public ColumnFileWriter(DataInputStream dataInputStream, ColumnShardId columnSha entityDictionary = new DictionaryWriter(true); rowGroupWriter = new RowGroupWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ROWGROUP_STORE_SUFFIX, ".armor"), columnShardId, valueDictionary); entityIndexWriter = new EntityIndexWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ENTITYINDEX_STORE_SUFFIX, ".armor"), columnShardId); + valueIndexWriter = new ValueIndexWriter(columnShardId); } } catch (IOException ioe) { throw new RuntimeException(ioe); @@ -201,7 +207,23 @@ private int loadValueDictionary(DataInputStream inputStream, int compressed, int } return read; } - + + private int loadValueIndex(DataInputStream inputStream, int compressed, int uncompressed) throws IOException { + // Load str value dictionary + int read = 0; + if (compressed > 0) { + byte[] compressedDict = new byte[compressed]; + read = IOTools.readFully(inputStream, compressedDict, 0, compressed); + byte[] decompressed = Zstd.decompress(compressedDict, uncompressed); + valueIndexWriter = new ValueIndexWriter(columnShardId, decompressed); + } else if (uncompressed > 0) { + byte[] uncompressedDict = new byte[uncompressed]; + read = IOTools.readFully(inputStream, uncompressedDict, 0, uncompressed); + valueIndexWriter = new ValueIndexWriter(columnShardId, uncompressedDict); + } + return read; + } + private int loadEntityIndex(DataInputStream inputStream, int compressed, int uncompressed, List temps) throws IOException { Path entityIndexTemp = TempFileUtil.createTempFile(columnShardId.alternateString() + ENTITYINDEX_STORE_SUFFIX, ".armor"); temps.add(entityIndexTemp); @@ -267,6 +289,8 @@ private void load(DataInputStream inputStream) throws IOException { return loadEntityIndex(is, compressed, uncompressed, tempPaths); } else if (section == ColumnFileSection.ROWGROUP) { return loadRowGroup(inputStream, compressed, uncompressed, tempPaths); + } else if (section == ColumnFileSection.VALUE_INDEX) { + return loadValueIndex(inputStream, compressed, uncompressed); } else return 0; } catch (IOException ioe) { @@ -511,6 +535,7 @@ public StreamProduct buildInputStreamV2(Compression compress) throws IOException sections.add(computeEntityDictionarySection(compress, tempPaths)); sections.add(computeValueDictionarySection(compress, tempPaths)); sections.add(computeEntityIndexSection(compress, tempPaths)); + sections.add(computeValueIndexSection(compress, tempPaths)); sections.add(computeRowGroupSection(compress, tempPaths)); ArrayList sequenceInputStreams = new ArrayList<>(); @@ -571,6 +596,12 @@ private Section computeEntityIndexSection(Compression compress, List tempP ColumnFileSection.ENTITY_INDEX, entityIndexWriter, null, columnShardId.alternateString()); } + private Section computeValueIndexSection(Compression compress, List tempPaths) throws IOException { + return computeSectionCompressible(compress, tempPaths, "value-index-temp_", + ColumnFileSection.VALUE_INDEX, valueIndexWriter, x -> x == null, columnShardId.alternateString()); + } + + private Section computeSectionCompressible( Compression compress, List tempPaths, String tempPrefix, ColumnFileSection sectionType, T component, Predicate isEmptyPredicate, String shardTempName) throws IOException { @@ -790,7 +821,7 @@ public void compact(List entitiesToKeep, boolean updateMeta } // With a list of sorted records, lets start the process of compaction - List adjustedRecords = rowGroupWriter.compactAndUpdateMetadata(entityRecords, metadataToUpdate); + List adjustedRecords = rowGroupWriter.compactAndUpdateMetadata(entityRecords, metadataToUpdate, valueIndexWriter); entityIndexWriter.compactAndUpdateMetadata(adjustedRecords, metadataToUpdate); // Now hard-deleted deleted entites from entity index writer. diff --git a/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java b/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java index a1715bf..833f0bf 100644 --- a/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java +++ b/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java @@ -24,8 +24,10 @@ import java.util.*; import com.rapid7.armor.write.WriteRequest; +import com.rapid7.armor.write.component.ValueIndexWriter; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -86,6 +88,84 @@ public void testWriteThenReadV2() return; } + static private class FoundValueIndex { + boolean foundValueIndex = false; + }; + + @Test + public void testValueIndex() + throws IOException { + ModShardStrategy shardStrategy = new ModShardStrategy(10); + int entity1Shard = shardStrategy.shardNum(1); + ColumnId testColumn = new ColumnId("vuln", DataType.LONG); + + ColumnShardId columnShardId = new ColumnShardId(new ShardId(TENANT, TABLE, INTERVAL.getInterval(), INTERVAL.getIntervalStart(TIMESTAMP), entity1Shard), testColumn); + + ColumnFileWriter cfw = new ColumnFileWriter(columnShardId); + List writeRequests = new ArrayList<>(); + for(int i = 0 ; i < 10 ; ++i) + { + Number entityId = 1000 + i; + long version = 1; + String randomId = UUID.randomUUID().toString(); + Column ecv = new Column(testColumn); + for (long j = 0 ; j < 10; ++j) + { + ecv.addValue(j); + } + + WriteRequest wr = new WriteRequest(entityId, version, randomId, ecv); + writeRequests.add(wr); + } + cfw.write(writeRequests); + + StreamProduct result = cfw.buildInputStreamV2(Compression.NONE); + assertNotNull(result); + byte[] bytes = bytesFromStreamProduct(result); + assertEquals(result.getByteSize(), bytes.length); + assertTrue(bytes.length > 0); + + runColumnFileListener(bytes, printListener()); + FoundValueIndex fvi = new FoundValueIndex(); + runColumnFileListener(bytes, new ColumnFileListener() + { + @Override public int columnFileSection(ColumnFileSection armorSection, ColumnMetadata metadata, DataInputStream inputStream, int compressedLength, int uncompressedLength) + { + if (armorSection == ColumnFileSection.VALUE_INDEX) { + assertEquals(compressedLength, 0); + assertEquals(uncompressedLength, 561); + byte[] json = new byte[561]; + int bytesRead = 0; + try + { + bytesRead = inputStream.read(json); + } + catch (IOException e) + { + assertFalse(true, "error reading inputStream for 561 bytes"); + } + + System.out.println(json); + try + { + ValueIndexWriter x = new ValueIndexWriter(columnShardId, json); + Set vals = x.getValToEntities().get(0L); + assertEquals(10, vals.size()); + } + catch (IOException e) + { + assertFalse(true, "error while parsing valueIndexWriter"); + } + fvi.foundValueIndex = true; + return bytesRead; + } + return 0; + } + }); + assertTrue(fvi.foundValueIndex, "found value index"); + return; + } + private ColumnFileListener printListener() { return new ColumnFileListener() { @Override From a1416cf71884cf9aee857d18660abcd1d3f3213f Mon Sep 17 00:00:00 2001 From: Bryan Ogawa Date: Fri, 6 Aug 2021 15:41:21 -0700 Subject: [PATCH 2/2] Refactor code to make it more feasible to dynamically add indexes without changing so much code. This changes to a generic header for the table of contents, and the resulting section would contain a long which must uniquely identify the section type. There is still a missing way to dynamically register index types, but there is now an entry point in the form of a generic map that maps from the index type to the code. currently this is limited to the ColumnFileWriter, so changes would need to make the extension points visible when constructing ColumnFileWriter objects in callers. --- .../armor/columnfile/ColumnFileSection.java | 2 +- .../java/com/rapid7/armor/io/IOTools.java | 7 ++ .../component/ComponentHeaderWriter.java | 30 +++++++++ .../write/component/ExtendedIndexWriter.java | 17 +++++ .../component/ExtendedIndexWriterFactory.java | 7 ++ .../armor/write/component/IndexUpdater.java | 27 ++++++++ .../write/component/ValueIndexWriter.java | 10 ++- .../armor/write/writers/ColumnFileWriter.java | 66 ++++++++++++++----- .../writers/ColumnFileReaderWriterTest.java | 33 +++++----- 9 files changed, 160 insertions(+), 39 deletions(-) create mode 100644 armor-write/src/main/java/com/rapid7/armor/write/component/ComponentHeaderWriter.java create mode 100644 armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriter.java create mode 100644 armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriterFactory.java diff --git a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java index ce43d5d..c1b22a8 100644 --- a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java +++ b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java @@ -10,7 +10,7 @@ public enum ColumnFileSection { VALUE_DICTIONARY(3), ENTITY_INDEX(4), ROWGROUP(5), - VALUE_INDEX(6); + EXTENDED_INDEX(6); private final int sectionID; diff --git a/armor-base/src/main/java/com/rapid7/armor/io/IOTools.java b/armor-base/src/main/java/com/rapid7/armor/io/IOTools.java index 3092c04..8402823 100644 --- a/armor-base/src/main/java/com/rapid7/armor/io/IOTools.java +++ b/armor-base/src/main/java/com/rapid7/armor/io/IOTools.java @@ -82,4 +82,11 @@ public static byte[] toByteArray(int value) { (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value }; } + + public static byte[] toByteArray(long value) { + return new byte[] { + (byte) (value >> 56), (byte) (value >> 48), (byte) (value >> 40), (byte) (value >> 32), + (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value + }; + } } diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/ComponentHeaderWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/component/ComponentHeaderWriter.java new file mode 100644 index 0000000..b4c38d9 --- /dev/null +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/ComponentHeaderWriter.java @@ -0,0 +1,30 @@ +package com.rapid7.armor.write.component; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; + +public class ComponentHeaderWriter implements Component +{ + private final Component component; + private final byte[] header; + + public ComponentHeaderWriter(byte[] header, Component c) + { + this.component = c; + this.header = header; + } + + @Override public InputStream getInputStream() + throws IOException + { + return new SequenceInputStream(new ByteArrayInputStream(this.header), component.getInputStream()); + } + + @Override public long getCurrentSize() + throws IOException + { + return this.header.length + component.getCurrentSize(); + } +} diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriter.java new file mode 100644 index 0000000..66a36ac --- /dev/null +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriter.java @@ -0,0 +1,17 @@ +package com.rapid7.armor.write.component; + +import java.io.IOException; +import java.io.InputStream; + +public interface ExtendedIndexWriter extends Component, IndexUpdater { + /** + * an integer extended type which will be used for serialization and dispatch in ColumnFileWriter. + * The ExtendedIndex section will include a section header that indicates which type it is. + * Thus, extendedType must be unique. + * + * @return the extendedType value. + */ + long extendedType(); + + void load(InputStream data) throws IOException; +} diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriterFactory.java b/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriterFactory.java new file mode 100644 index 0000000..11f294a --- /dev/null +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriterFactory.java @@ -0,0 +1,7 @@ +package com.rapid7.armor.write.component; + +import com.rapid7.armor.shard.ColumnShardId; + +public interface ExtendedIndexWriterFactory { + ExtendedIndexWriter constructWriter(ColumnShardId columnShardId); +} diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java b/armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java index dffd665..06795fd 100644 --- a/armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java @@ -1,8 +1,35 @@ package com.rapid7.armor.write.component; +import java.util.Collection; + public interface IndexUpdater { + void add(Number value, Integer entityId, Integer rowCount); void finish(); + + /** + * create an indexUpdater that wraps a collection of updaters. + * @return wrapping indexUpdater + */ + static IndexUpdater listIndexUpdater(final Collection updaters) { + IndexUpdater result = new IndexUpdater() { + + @Override public void add(Number value, Integer entityId, Integer rowCount) + { + for (IndexUpdater item : updaters) { + item.add(value, entityId, rowCount); + } + } + + @Override public void finish() + { + for (IndexUpdater item : updaters) { + item.finish(); + } + } + }; + return result; + } } diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java index 593c425..5907480 100644 --- a/armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java @@ -17,7 +17,7 @@ /** * */ -public class ValueIndexWriter implements Component, IndexUpdater { +public class ValueIndexWriter implements ExtendedIndexWriter { private final ColumnShardId columnShardId; private final DataType dataType; private Map> valToEntities = new HashMap<>(); @@ -28,8 +28,12 @@ public ValueIndexWriter(ColumnShardId columnShardId) { this.dataType = columnShardId.getColumnId().dataType(); } - public ValueIndexWriter(ColumnShardId columnShardId, byte[] json) throws IOException { - this(columnShardId); + @Override public long extendedType() + { + return 0x31460001L; // value must be globally unique + } + + @Override public void load(InputStream json) throws IOException { @SuppressWarnings("unchecked") Map> map = OBJECT_MAPPER.readValue(json, Map.class); valToEntities = new HashMap<>(); diff --git a/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java index 3fc0443..741ebf3 100644 --- a/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java +++ b/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java @@ -15,9 +15,13 @@ import com.rapid7.armor.write.StreamProduct; import com.rapid7.armor.write.WriteRequest; import com.rapid7.armor.write.component.Component; +import com.rapid7.armor.write.component.ComponentHeaderWriter; import com.rapid7.armor.write.component.DictionaryWriter; import com.rapid7.armor.write.component.EntityIndexVariableWidthException; import com.rapid7.armor.write.component.EntityIndexWriter; +import com.rapid7.armor.write.component.ExtendedIndexWriter; +import com.rapid7.armor.write.component.ExtendedIndexWriterFactory; +import com.rapid7.armor.write.component.IndexUpdater; import com.rapid7.armor.write.component.RowGroupWriter; import com.rapid7.armor.write.component.RowGroupWriter.RgOffsetWriteResult; import com.rapid7.armor.write.component.ValueIndexWriter; @@ -66,11 +70,11 @@ public class ColumnFileWriter implements AutoCloseable { private ColumnMetadata metadata; private DictionaryWriter valueDictionary; private DictionaryWriter entityDictionary; - private ValueIndexWriter valueIndexWriter; + private ExtendedIndexWriterFactory[] extendedIndexFactories = { ValueIndexWriter::new }; + private Map extendedIndexes = new HashMap<>(); private final ColumnShardId columnShardId; private final String ROWGROUP_STORE_SUFFIX = "_rowgroup-"; private final String ENTITYINDEX_STORE_SUFFIX = "_entityindex-"; - private final String VALUEINDEX_STORE_SUFFIX = "_valueindex-"; private boolean skipMetaData = false; private boolean alwaysCompact = true; @@ -92,7 +96,11 @@ public ColumnFileWriter(ColumnShardId columnShardId) throws IOException { entityDictionary = new DictionaryWriter(true); rowGroupWriter = new RowGroupWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ROWGROUP_STORE_SUFFIX, ".armor"), columnShardId, valueDictionary); entityIndexWriter = new EntityIndexWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ENTITYINDEX_STORE_SUFFIX, ".armor"), columnShardId); - valueIndexWriter = new ValueIndexWriter(columnShardId); + + for (ExtendedIndexWriterFactory factory : extendedIndexFactories) { + ExtendedIndexWriter item = factory.constructWriter(columnShardId); + extendedIndexes.put(item.extendedType(), item); + } } public ColumnFileWriter(DataInputStream dataInputStream, ColumnShardId columnShardId) { @@ -119,7 +127,10 @@ public ColumnFileWriter(DataInputStream dataInputStream, ColumnShardId columnSha entityDictionary = new DictionaryWriter(true); rowGroupWriter = new RowGroupWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ROWGROUP_STORE_SUFFIX, ".armor"), columnShardId, valueDictionary); entityIndexWriter = new EntityIndexWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ENTITYINDEX_STORE_SUFFIX, ".armor"), columnShardId); - valueIndexWriter = new ValueIndexWriter(columnShardId); + for (ExtendedIndexWriterFactory factory : extendedIndexFactories) { + ExtendedIndexWriter item = factory.constructWriter(columnShardId); + extendedIndexes.put(item.extendedType(), item); + } } } catch (IOException ioe) { throw new RuntimeException(ioe); @@ -208,22 +219,35 @@ private int loadValueDictionary(DataInputStream inputStream, int compressed, int return read; } - private int loadValueIndex(DataInputStream inputStream, int compressed, int uncompressed) throws IOException { + private int loadExtendedIndex(DataInputStream inputStream, int compressed, int uncompressed) throws IOException { // Load str value dictionary int read = 0; + byte[] decompressed = null; if (compressed > 0) { byte[] compressedDict = new byte[compressed]; read = IOTools.readFully(inputStream, compressedDict, 0, compressed); - byte[] decompressed = Zstd.decompress(compressedDict, uncompressed); - valueIndexWriter = new ValueIndexWriter(columnShardId, decompressed); + decompressed = Zstd.decompress(compressedDict, uncompressed); } else if (uncompressed > 0) { - byte[] uncompressedDict = new byte[uncompressed]; - read = IOTools.readFully(inputStream, uncompressedDict, 0, uncompressed); - valueIndexWriter = new ValueIndexWriter(columnShardId, uncompressedDict); + decompressed = new byte[uncompressed]; + read = IOTools.readFully(inputStream, decompressed, 0, uncompressed); + } + if (uncompressed < 8) { + // no good + LOGGER.warn("Extended index section is too short - should be >= 8 bytes long, but was {}. Skipping processing of section.", uncompressed); + } else { + DataInputStream is = new DataInputStream(new ByteArrayInputStream(decompressed)); + long extendedType = is.readLong(); + + ExtendedIndexWriter index = extendedIndexes.get(extendedType); + if (index != null) + index.load(is); + else + LOGGER.warn("Unknown extended index type {}. Skipping processing of section.", extendedType); } return read; } + private int loadEntityIndex(DataInputStream inputStream, int compressed, int uncompressed, List temps) throws IOException { Path entityIndexTemp = TempFileUtil.createTempFile(columnShardId.alternateString() + ENTITYINDEX_STORE_SUFFIX, ".armor"); temps.add(entityIndexTemp); @@ -289,8 +313,8 @@ private void load(DataInputStream inputStream) throws IOException { return loadEntityIndex(is, compressed, uncompressed, tempPaths); } else if (section == ColumnFileSection.ROWGROUP) { return loadRowGroup(inputStream, compressed, uncompressed, tempPaths); - } else if (section == ColumnFileSection.VALUE_INDEX) { - return loadValueIndex(inputStream, compressed, uncompressed); + } else if (section == ColumnFileSection.EXTENDED_INDEX) { + return loadExtendedIndex(inputStream, compressed, uncompressed); } else return 0; } catch (IOException ioe) { @@ -535,7 +559,10 @@ public StreamProduct buildInputStreamV2(Compression compress) throws IOException sections.add(computeEntityDictionarySection(compress, tempPaths)); sections.add(computeValueDictionarySection(compress, tempPaths)); sections.add(computeEntityIndexSection(compress, tempPaths)); - sections.add(computeValueIndexSection(compress, tempPaths)); + for ( Map.Entry e : extendedIndexes.entrySet() ) { + sections.add(computeExtendedIndexSection(e, compress, tempPaths)); + + } sections.add(computeRowGroupSection(compress, tempPaths)); ArrayList sequenceInputStreams = new ArrayList<>(); @@ -596,12 +623,14 @@ private Section computeEntityIndexSection(Compression compress, List tempP ColumnFileSection.ENTITY_INDEX, entityIndexWriter, null, columnShardId.alternateString()); } - private Section computeValueIndexSection(Compression compress, List tempPaths) throws IOException { - return computeSectionCompressible(compress, tempPaths, "value-index-temp_", - ColumnFileSection.VALUE_INDEX, valueIndexWriter, x -> x == null, columnShardId.alternateString()); + private Section computeExtendedIndexSection(Map.Entry e, Compression compress, List tempPaths) + throws IOException + { + ComponentHeaderWriter ch = new ComponentHeaderWriter(IOTools.toByteArray(e.getKey()), e.getValue()); + return computeSectionCompressible(compress, tempPaths, "extended-index-temp_", + ColumnFileSection.EXTENDED_INDEX, ch, x -> x == null, columnShardId.alternateString()); } - private Section computeSectionCompressible( Compression compress, List tempPaths, String tempPrefix, ColumnFileSection sectionType, T component, Predicate isEmptyPredicate, String shardTempName) throws IOException { @@ -821,7 +850,8 @@ public void compact(List entitiesToKeep, boolean updateMeta } // With a list of sorted records, lets start the process of compaction - List adjustedRecords = rowGroupWriter.compactAndUpdateMetadata(entityRecords, metadataToUpdate, valueIndexWriter); + //List adjustedRecords = rowGroupWriter.compactAndUpdateMetadata(entityRecords, metadataToUpdate, valueIndexWriter); + List adjustedRecords = rowGroupWriter.compactAndUpdateMetadata(entityRecords, metadataToUpdate, IndexUpdater.listIndexUpdater( extendedIndexes.values() )); entityIndexWriter.compactAndUpdateMetadata(adjustedRecords, metadataToUpdate); // Now hard-deleted deleted entites from entity index writer. diff --git a/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java b/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java index 833f0bf..54e34ec 100644 --- a/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java +++ b/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java @@ -8,6 +8,7 @@ import com.rapid7.armor.entity.EntityRecordSummary; import com.rapid7.armor.interval.Interval; import com.rapid7.armor.io.Compression; +import com.rapid7.armor.io.IOTools; import com.rapid7.armor.meta.ColumnMetadata; import com.rapid7.armor.schema.ColumnId; import com.rapid7.armor.schema.DataType; @@ -17,9 +18,11 @@ import com.rapid7.armor.write.StreamProduct; import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.time.Instant; import java.util.*; @@ -131,33 +134,29 @@ public void testValueIndex() { @Override public int columnFileSection(ColumnFileSection armorSection, ColumnMetadata metadata, DataInputStream inputStream, int compressedLength, int uncompressedLength) { - if (armorSection == ColumnFileSection.VALUE_INDEX) { + if (armorSection == ColumnFileSection.EXTENDED_INDEX) { assertEquals(compressedLength, 0); - assertEquals(uncompressedLength, 561); - byte[] json = new byte[561]; - int bytesRead = 0; - try - { - bytesRead = inputStream.read(json); - } - catch (IOException e) - { - assertFalse(true, "error reading inputStream for 561 bytes"); - } + assertEquals(uncompressedLength, 569); - System.out.println(json); try { - ValueIndexWriter x = new ValueIndexWriter(columnShardId, json); + long uuid = inputStream.readLong(); + assertEquals(0x31460001, uuid); + byte[] raw = new byte[561]; + int bytesRead = 0; + bytesRead = inputStream.read(raw); + + ValueIndexWriter x = new ValueIndexWriter(columnShardId); + x.load(new ByteArrayInputStream(raw)); Set vals = x.getValToEntities().get(0L); assertEquals(10, vals.size()); + fvi.foundValueIndex = true; + return bytesRead; } catch (IOException e) { - assertFalse(true, "error while parsing valueIndexWriter"); + throw new RuntimeException("Unexpected IO exception within columnFileSection", e); } - fvi.foundValueIndex = true; - return bytesRead; } return 0; }