diff --git a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileListener.java b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileListener.java index a96b412..71f902e 100644 --- a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileListener.java +++ b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileListener.java @@ -6,6 +6,15 @@ @FunctionalInterface public interface ColumnFileListener { + /** + * Listener callback, invoked once for each section of the column file other than the metadata. + * @param armorSection file section type. + * @param metadata metadata information for this column file. + * @param inputStream data to be read. + * @param compressedLength if the data is compressed, this value will be {@code > 0}. If uncompressed, the value will be 0. + * @param uncompressedLength the uncompressed length of the data. If {@code compressedLength} is 0, this is the number of bytes available to read. + * @return number of bytes read by this function from {@code inputStream}. For backward compatibility, the function should return 0 for unhandled {@link ColumnFileSection}s, such as unknown types. 
+ */ int columnFileSection( ColumnFileSection armorSection, ColumnMetadata metadata, DataInputStream inputStream, int compressedLength, int uncompressedLength); } diff --git a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileReader.java b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileReader.java index 64fadd5..cdb3a04 100644 --- a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileReader.java +++ b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileReader.java @@ -95,14 +95,16 @@ private void readV2(DataInputStream dataInputStream, ColumnFileListener listener int readBytes = 0; if (entry.sectionType == ColumnFileSection.METADATA) { readBytes = readMetadata(dataInputStream); - } else - { + } else { readBytes = readSection(dataInputStream, listener, entry.sectionType); } + totalBytesRead += readBytes; } } + // Reads the section by calling back to the listener, if the listener exists. + // listener must return the # of bytes it has read from the dataInputStream. 
private int readSection(DataInputStream dataInputStream, ColumnFileListener listener, ColumnFileSection sectionType) throws IOException { diff --git a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java index 6bb7362..c1b22a8 100644 --- a/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java +++ b/armor-base/src/main/java/com/rapid7/armor/columnfile/ColumnFileSection.java @@ -9,7 +9,8 @@ public enum ColumnFileSection { ENTITY_DICTIONARY(2), VALUE_DICTIONARY(3), ENTITY_INDEX(4), - ROWGROUP(5); + ROWGROUP(5), + EXTENDED_INDEX(6); private final int sectionID; diff --git a/armor-base/src/main/java/com/rapid7/armor/io/IOTools.java b/armor-base/src/main/java/com/rapid7/armor/io/IOTools.java index 3092c04..8402823 100644 --- a/armor-base/src/main/java/com/rapid7/armor/io/IOTools.java +++ b/armor-base/src/main/java/com/rapid7/armor/io/IOTools.java @@ -82,4 +82,11 @@ public static byte[] toByteArray(int value) { (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value }; } + + public static byte[] toByteArray(long value) { + return new byte[] { + (byte) (value >> 56), (byte) (value >> 48), (byte) (value >> 40), (byte) (value >> 32), + (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value + }; + } } diff --git a/armor-base/src/main/java/com/rapid7/armor/schema/DataType.java b/armor-base/src/main/java/com/rapid7/armor/schema/DataType.java index 818c84d..4728d6c 100644 --- a/armor-base/src/main/java/com/rapid7/armor/schema/DataType.java +++ b/armor-base/src/main/java/com/rapid7/armor/schema/DataType.java @@ -207,4 +207,50 @@ public void traverseByteBuffer(ByteBuffer valueByteBuffer, RoaringBitmap nilBitM rowCount++; } } + + public Number fromString(String key) + { + if ("null".equals(key)) { + return null; + } else { + switch (this) { + case LONG: + case DATETIME: + return Long.parseLong(key); + case FLOAT: + 
return Float.parseFloat(key); + case INTEGER: + case STRING: + return Integer.parseInt(key); + case BOOLEAN: + return Byte.parseByte(key); + case DOUBLE: + return Double.parseDouble(key); + } + } + throw new IllegalStateException("unknown object type"); + } + + public String toString(Object val) { + if (val == null) { + return "null"; + } else { + switch (this) { + case LONG: + case DATETIME: + return Long.toString((Long) val); + case FLOAT: + return Float.toString((Float) val); + case INTEGER: + case STRING: + return Integer.toString((Integer) val); + case BOOLEAN: + return Byte.toString((Byte) val); + case DOUBLE: + return Double.toString((Double) val); + } + } + throw new IllegalStateException("unknown object type"); + } + } diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/ComponentHeaderWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/component/ComponentHeaderWriter.java new file mode 100644 index 0000000..b4c38d9 --- /dev/null +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/ComponentHeaderWriter.java @@ -0,0 +1,30 @@ +package com.rapid7.armor.write.component; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; + +public class ComponentHeaderWriter implements Component +{ + private final Component component; + private final byte[] header; + + public ComponentHeaderWriter(byte[] header, Component c) + { + this.component = c; + this.header = header; + } + + @Override public InputStream getInputStream() + throws IOException + { + return new SequenceInputStream(new ByteArrayInputStream(this.header), component.getInputStream()); + } + + @Override public long getCurrentSize() + throws IOException + { + return this.header.length + component.getCurrentSize(); + } +} diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriter.java 
b/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriter.java new file mode 100644 index 0000000..66a36ac --- /dev/null +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriter.java @@ -0,0 +1,17 @@ +package com.rapid7.armor.write.component; + +import java.io.IOException; +import java.io.InputStream; + +public interface ExtendedIndexWriter extends Component, IndexUpdater { + /** + * an integer extended type which will be used for serialization and dispatch in ColumnFileWriter. + * The ExtendedIndex section will include a section header that indicates which type it is. + * Thus, extendedType must be unique. + * + * @return the extendedType value. + */ + long extendedType(); + + void load(InputStream data) throws IOException; +} diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriterFactory.java b/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriterFactory.java new file mode 100644 index 0000000..11f294a --- /dev/null +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/ExtendedIndexWriterFactory.java @@ -0,0 +1,7 @@ +package com.rapid7.armor.write.component; + +import com.rapid7.armor.shard.ColumnShardId; + +public interface ExtendedIndexWriterFactory { + ExtendedIndexWriter constructWriter(ColumnShardId columnShardId); +} diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java b/armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java new file mode 100644 index 0000000..06795fd --- /dev/null +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/IndexUpdater.java @@ -0,0 +1,35 @@ +package com.rapid7.armor.write.component; + +import java.util.Collection; + +public interface IndexUpdater +{ + + void add(Number value, Integer entityId, Integer rowCount); + + void finish(); + + /** + * create an indexUpdater that wraps a collection of updaters. 
+ * @return wrapping indexUpdater + */ + static IndexUpdater listIndexUpdater(final Collection updaters) { + IndexUpdater result = new IndexUpdater() { + + @Override public void add(Number value, Integer entityId, Integer rowCount) + { + for (IndexUpdater item : updaters) { + item.add(value, entityId, rowCount); + } + } + + @Override public void finish() + { + for (IndexUpdater item : updaters) { + item.finish(); + } + } + }; + return result; + } +} diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/RowGroupWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/component/RowGroupWriter.java index 02100dc..ee1452b 100644 --- a/armor-write/src/main/java/com/rapid7/armor/write/component/RowGroupWriter.java +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/RowGroupWriter.java @@ -177,6 +177,7 @@ public void customTraverseThoughValues(List records, Consumer cardinality = new HashSet<>(); @@ -187,8 +188,9 @@ private class MetadataUpdater { private boolean success; - MetadataUpdater(ColumnMetadata metadata) { + MetadataUpdater(ColumnMetadata metadata, IndexUpdater indexUpdater) { this.metadata = metadata; + this.indexUpdater = indexUpdater; prevMax = metadata.getMaxValue(); prevMax = metadata.getMinValue(); @@ -232,6 +234,11 @@ public void updateRow(EntityRecord eir) throws IOException { if (nilRb == null || !nilRb.contains(rowCount)) { metadata.handleMinMax(value); cardinality.add(value); + if(indexUpdater != null) + indexUpdater.add(value, eir.getEntityId(), rowCount); + } else { + if(indexUpdater != null) + indexUpdater.add(null, eir.getEntityId(), rowCount); } }); } @@ -258,6 +265,9 @@ private ByteBuffer insureByteBufferIsBigEnough(int len, ByteBuffer buf) { public void finishUpdate() { metadata.setCardinality(cardinality.size()); + if (indexUpdater != null) { + indexUpdater.finish(); + } success = true; } } @@ -278,7 +288,7 @@ public void finishUpdate() { // from within the for loop. 
public void runThoughValues(ColumnMetadata metadata, List records) throws IOException { long previousPosition = position(); - MetadataUpdater metadataUpdater = new MetadataUpdater(metadata); + MetadataUpdater metadataUpdater = new MetadataUpdater(metadata, null); try { for (EntityRecord eir : records) { if (eir.getDeleted() == 1) @@ -452,13 +462,13 @@ private RgOffsetWriteResult fillNullValues(ByteBuffer bytesBuffer, int numNull) * @throws IOException If an io error occurs. */ public List compact(List entitiesToKeep) throws IOException { - return compactAndUpdateMetadata(entitiesToKeep, null); + return compactAndUpdateMetadata(entitiesToKeep, null, null); } - public List compactAndUpdateMetadata(List entitiesToKeep, ColumnMetadata metadata) throws IOException { + public List compactAndUpdateMetadata(List entitiesToKeep, ColumnMetadata metadata, IndexUpdater indexUpdater) throws IOException { MetadataUpdater metadataUpdater = null; if (metadata != null) { - metadataUpdater = new MetadataUpdater(metadata); + metadataUpdater = new MetadataUpdater(metadata, indexUpdater); } int totalRequiredBytes = entitiesToKeep.stream().mapToInt(EntityRecord::totalLength).sum(); ByteBuffer output = ByteBuffer.allocate(totalRequiredBytes * 2); diff --git a/armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java new file mode 100644 index 0000000..5907480 --- /dev/null +++ b/armor-write/src/main/java/com/rapid7/armor/write/component/ValueIndexWriter.java @@ -0,0 +1,87 @@ +package com.rapid7.armor.write.component; + +import com.rapid7.armor.schema.DataType; +import com.rapid7.armor.shard.ColumnShardId; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * + */ +public class ValueIndexWriter implements ExtendedIndexWriter { + private final ColumnShardId columnShardId; + private final DataType dataType; + private Map> valToEntities = new HashMap<>(); + private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public ValueIndexWriter(ColumnShardId columnShardId) { + this.columnShardId = columnShardId; + this.dataType = columnShardId.getColumnId().dataType(); + } + + @Override public long extendedType() + { + return 0x31460001L; // value must be globally unique + } + + @Override public void load(InputStream json) throws IOException { + @SuppressWarnings("unchecked") + Map> map = OBJECT_MAPPER.readValue(json, Map.class); + valToEntities = new HashMap<>(); + int highestSurrogate = -1; + for (Map.Entry > e : map.entrySet()) { + Number val = dataType.fromString(e.getKey()); + valToEntities.put(val, new HashSet<>(e.getValue())); + } + } + + public Map> getValToEntities() { return valToEntities; } + + public boolean isEmpty() { + return valToEntities.isEmpty(); + } + + private byte[] toBytes() throws JsonProcessingException { + HashMap> serializable = new HashMap<>(); + for (Map.Entry> e : valToEntities.entrySet()) { + serializable.put(this.dataType.toString(e.getKey()), new ArrayList<>(e.getValue())); + } + return OBJECT_MAPPER.writeValueAsBytes(serializable); + } + + @Override + public InputStream getInputStream() throws IOException { + return new ByteArrayInputStream(toBytes()); + } + + @Override + public long getCurrentSize() throws IOException { + return toBytes().length; + } + + + + @Override public void add(Number value, Integer entityId, Integer rowCount) + { + Set entities = this.valToEntities.get(value); + if (entities == null) { + entities = new HashSet<>(); + this.valToEntities.put(value, entities); + } + entities.add(entityId); + } + + @Override public void finish() + { + // don't need to do 
anything + } +} diff --git a/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java b/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java index 6a40078..741ebf3 100644 --- a/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java +++ b/armor-write/src/main/java/com/rapid7/armor/write/writers/ColumnFileWriter.java @@ -15,11 +15,16 @@ import com.rapid7.armor.write.StreamProduct; import com.rapid7.armor.write.WriteRequest; import com.rapid7.armor.write.component.Component; +import com.rapid7.armor.write.component.ComponentHeaderWriter; import com.rapid7.armor.write.component.DictionaryWriter; import com.rapid7.armor.write.component.EntityIndexVariableWidthException; import com.rapid7.armor.write.component.EntityIndexWriter; +import com.rapid7.armor.write.component.ExtendedIndexWriter; +import com.rapid7.armor.write.component.ExtendedIndexWriterFactory; +import com.rapid7.armor.write.component.IndexUpdater; import com.rapid7.armor.write.component.RowGroupWriter; import com.rapid7.armor.write.component.RowGroupWriter.RgOffsetWriteResult; +import com.rapid7.armor.write.component.ValueIndexWriter; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.luben.zstd.RecyclingBufferPool; import com.github.luben.zstd.Zstd; @@ -65,9 +70,12 @@ public class ColumnFileWriter implements AutoCloseable { private ColumnMetadata metadata; private DictionaryWriter valueDictionary; private DictionaryWriter entityDictionary; + private ExtendedIndexWriterFactory[] extendedIndexFactories = { ValueIndexWriter::new }; + private Map extendedIndexes = new HashMap<>(); private final ColumnShardId columnShardId; private final String ROWGROUP_STORE_SUFFIX = "_rowgroup-"; private final String ENTITYINDEX_STORE_SUFFIX = "_entityindex-"; + private boolean skipMetaData = false; private boolean alwaysCompact = true; @@ -88,6 +96,11 @@ public ColumnFileWriter(ColumnShardId columnShardId) throws IOException { 
entityDictionary = new DictionaryWriter(true); rowGroupWriter = new RowGroupWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ROWGROUP_STORE_SUFFIX, ".armor"), columnShardId, valueDictionary); entityIndexWriter = new EntityIndexWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ENTITYINDEX_STORE_SUFFIX, ".armor"), columnShardId); + + for (ExtendedIndexWriterFactory factory : extendedIndexFactories) { + ExtendedIndexWriter item = factory.constructWriter(columnShardId); + extendedIndexes.put(item.extendedType(), item); + } } public ColumnFileWriter(DataInputStream dataInputStream, ColumnShardId columnShardId) { @@ -114,6 +127,10 @@ public ColumnFileWriter(DataInputStream dataInputStream, ColumnShardId columnSha entityDictionary = new DictionaryWriter(true); rowGroupWriter = new RowGroupWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ROWGROUP_STORE_SUFFIX, ".armor"), columnShardId, valueDictionary); entityIndexWriter = new EntityIndexWriter(TempFileUtil.createTempFile(columnShardId.alternateString() + ENTITYINDEX_STORE_SUFFIX, ".armor"), columnShardId); + for (ExtendedIndexWriterFactory factory : extendedIndexFactories) { + ExtendedIndexWriter item = factory.constructWriter(columnShardId); + extendedIndexes.put(item.extendedType(), item); + } } } catch (IOException ioe) { throw new RuntimeException(ioe); @@ -201,7 +218,36 @@ private int loadValueDictionary(DataInputStream inputStream, int compressed, int } return read; } - + + private int loadExtendedIndex(DataInputStream inputStream, int compressed, int uncompressed) throws IOException { + // Load the extended index section: an 8-byte type id followed by the index payload + int read = 0; + byte[] decompressed = null; + if (compressed > 0) { + byte[] compressedDict = new byte[compressed]; + read = IOTools.readFully(inputStream, compressedDict, 0, compressed); + decompressed = Zstd.decompress(compressedDict, uncompressed); + } else if (uncompressed > 0) { + decompressed = new byte[uncompressed]; + read = 
IOTools.readFully(inputStream, decompressed, 0, uncompressed); + } + if (uncompressed < 8) { + // no good + LOGGER.warn("Extended index section is too short - should be >= 8 bytes long, but was {}. Skipping processing of section.", uncompressed); + } else { + DataInputStream is = new DataInputStream(new ByteArrayInputStream(decompressed)); + long extendedType = is.readLong(); + + ExtendedIndexWriter index = extendedIndexes.get(extendedType); + if (index != null) + index.load(is); + else + LOGGER.warn("Unknown extended index type {}. Skipping processing of section.", extendedType); + } + return read; + } + + private int loadEntityIndex(DataInputStream inputStream, int compressed, int uncompressed, List temps) throws IOException { Path entityIndexTemp = TempFileUtil.createTempFile(columnShardId.alternateString() + ENTITYINDEX_STORE_SUFFIX, ".armor"); temps.add(entityIndexTemp); @@ -267,6 +313,8 @@ private void load(DataInputStream inputStream) throws IOException { return loadEntityIndex(is, compressed, uncompressed, tempPaths); } else if (section == ColumnFileSection.ROWGROUP) { return loadRowGroup(inputStream, compressed, uncompressed, tempPaths); + } else if (section == ColumnFileSection.EXTENDED_INDEX) { + return loadExtendedIndex(inputStream, compressed, uncompressed); } else return 0; } catch (IOException ioe) { @@ -511,6 +559,10 @@ public StreamProduct buildInputStreamV2(Compression compress) throws IOException sections.add(computeEntityDictionarySection(compress, tempPaths)); sections.add(computeValueDictionarySection(compress, tempPaths)); sections.add(computeEntityIndexSection(compress, tempPaths)); + for ( Map.Entry e : extendedIndexes.entrySet() ) { + sections.add(computeExtendedIndexSection(e, compress, tempPaths)); + + } sections.add(computeRowGroupSection(compress, tempPaths)); ArrayList sequenceInputStreams = new ArrayList<>(); @@ -571,6 +623,14 @@ private Section computeEntityIndexSection(Compression compress, List tempP 
ColumnFileSection.ENTITY_INDEX, entityIndexWriter, null, columnShardId.alternateString()); } + private Section computeExtendedIndexSection(Map.Entry e, Compression compress, List tempPaths) + throws IOException + { + ComponentHeaderWriter ch = new ComponentHeaderWriter(IOTools.toByteArray(e.getKey()), e.getValue()); + return computeSectionCompressible(compress, tempPaths, "extended-index-temp_", + ColumnFileSection.EXTENDED_INDEX, ch, x -> x == null, columnShardId.alternateString()); + } + private Section computeSectionCompressible( Compression compress, List tempPaths, String tempPrefix, ColumnFileSection sectionType, T component, Predicate isEmptyPredicate, String shardTempName) throws IOException { @@ -790,7 +850,8 @@ public void compact(List entitiesToKeep, boolean updateMeta } // With a list of sorted records, lets start the process of compaction - List adjustedRecords = rowGroupWriter.compactAndUpdateMetadata(entityRecords, metadataToUpdate); + //List adjustedRecords = rowGroupWriter.compactAndUpdateMetadata(entityRecords, metadataToUpdate, valueIndexWriter); + List adjustedRecords = rowGroupWriter.compactAndUpdateMetadata(entityRecords, metadataToUpdate, IndexUpdater.listIndexUpdater( extendedIndexes.values() )); entityIndexWriter.compactAndUpdateMetadata(adjustedRecords, metadataToUpdate); // Now hard-deleted deleted entites from entity index writer. 
diff --git a/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java b/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java index a1715bf..54e34ec 100644 --- a/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java +++ b/armor-write/src/test/java/com/rapid7/armor/write/writers/ColumnFileReaderWriterTest.java @@ -8,6 +8,7 @@ import com.rapid7.armor.entity.EntityRecordSummary; import com.rapid7.armor.interval.Interval; import com.rapid7.armor.io.Compression; +import com.rapid7.armor.io.IOTools; import com.rapid7.armor.meta.ColumnMetadata; import com.rapid7.armor.schema.ColumnId; import com.rapid7.armor.schema.DataType; @@ -17,15 +18,19 @@ import com.rapid7.armor.write.StreamProduct; import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.time.Instant; import java.util.*; import com.rapid7.armor.write.WriteRequest; +import com.rapid7.armor.write.component.ValueIndexWriter; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -86,6 +91,80 @@ public void testWriteThenReadV2() return; } + static private class FoundValueIndex { + boolean foundValueIndex = false; + }; + + @Test + public void testValueIndex() + throws IOException { + ModShardStrategy shardStrategy = new ModShardStrategy(10); + int entity1Shard = shardStrategy.shardNum(1); + ColumnId testColumn = new ColumnId("vuln", DataType.LONG); + + ColumnShardId columnShardId = new ColumnShardId(new ShardId(TENANT, TABLE, INTERVAL.getInterval(), INTERVAL.getIntervalStart(TIMESTAMP), entity1Shard), 
testColumn); + + ColumnFileWriter cfw = new ColumnFileWriter(columnShardId); + List writeRequests = new ArrayList<>(); + for(int i = 0 ; i < 10 ; ++i) + { + Number entityId = 1000 + i; + long version = 1; + String randomId = UUID.randomUUID().toString(); + Column ecv = new Column(testColumn); + for (long j = 0 ; j < 10; ++j) + { + ecv.addValue(j); + } + + WriteRequest wr = new WriteRequest(entityId, version, randomId, ecv); + writeRequests.add(wr); + } + cfw.write(writeRequests); + + StreamProduct result = cfw.buildInputStreamV2(Compression.NONE); + assertNotNull(result); + byte[] bytes = bytesFromStreamProduct(result); + assertEquals(result.getByteSize(), bytes.length); + assertTrue(bytes.length > 0); + + runColumnFileListener(bytes, printListener()); + FoundValueIndex fvi = new FoundValueIndex(); + runColumnFileListener(bytes, new ColumnFileListener() + { + @Override public int columnFileSection(ColumnFileSection armorSection, ColumnMetadata metadata, DataInputStream inputStream, int compressedLength, int uncompressedLength) + { + if (armorSection == ColumnFileSection.EXTENDED_INDEX) { + assertEquals(compressedLength, 0); + assertEquals(uncompressedLength, 569); + + try + { + long uuid = inputStream.readLong(); + assertEquals(0x31460001, uuid); + byte[] raw = new byte[561]; + int bytesRead = 0; + bytesRead = inputStream.read(raw); + + ValueIndexWriter x = new ValueIndexWriter(columnShardId); + x.load(new ByteArrayInputStream(raw)); + Set vals = x.getValToEntities().get(0L); + assertEquals(10, vals.size()); + fvi.foundValueIndex = true; + return bytesRead; + } + catch (IOException e) + { + throw new RuntimeException("Unexpected IO exception within columnFileSection", e); + } + } + return 0; + } + }); + assertTrue(fvi.foundValueIndex, "found value index"); + return; + } + private ColumnFileListener printListener() { return new ColumnFileListener() { @Override