Skip to content
This repository was archived by the owner on Mar 12, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@

@FunctionalInterface
public interface ColumnFileListener {
/**
* Listener callback. For each section of the column file other than the metadata, this will be called.
* @param armorSection file section type
* @param metadata metadata information for this column file.
* @param inputStream data to be read.
* @param compressedLength if the data is compressed, this value will be > 0. If uncompressed, value will be 0.
* @param uncompressedLength the uncompressed length of the data. if compressedLength is 0, this is the number of bytes available to read.
* @return number of bytes read by this function from inputStream . For backward compatibility, function should return 0 bytes for unhandled @{ColumnFileSection}s, such as unknown types.
*/
int columnFileSection(
ColumnFileSection armorSection, ColumnMetadata metadata, DataInputStream inputStream, int compressedLength, int uncompressedLength);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,16 @@ private void readV2(DataInputStream dataInputStream, ColumnFileListener listener
int readBytes = 0;
if (entry.sectionType == ColumnFileSection.METADATA) {
readBytes = readMetadata(dataInputStream);
} else
{
} else {
readBytes = readSection(dataInputStream, listener, entry.sectionType);
}

totalBytesRead += readBytes;
}
}

// Reads the section by calling back to the listener, if the listener exists.
// listener must return the # of bytes it has read from the dataInputStream.
private int readSection(DataInputStream dataInputStream, ColumnFileListener listener, ColumnFileSection sectionType)
throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ public enum ColumnFileSection {
ENTITY_DICTIONARY(2),
VALUE_DICTIONARY(3),
ENTITY_INDEX(4),
ROWGROUP(5);
ROWGROUP(5),
EXTENDED_INDEX(6);

private final int sectionID;

Expand Down
7 changes: 7 additions & 0 deletions armor-base/src/main/java/com/rapid7/armor/io/IOTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,11 @@ public static byte[] toByteArray(int value) {
(byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value
};
}

public static byte[] toByteArray(long value) {
return new byte[] {
(byte) (value >> 56), (byte) (value >> 48), (byte) (value >> 40), (byte) (value >> 32),
(byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value
};
}
}
46 changes: 46 additions & 0 deletions armor-base/src/main/java/com/rapid7/armor/schema/DataType.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,50 @@ public void traverseByteBuffer(ByteBuffer valueByteBuffer, RoaringBitmap nilBitM
rowCount++;
}
}

public Number fromString(String key)
{
if ("null".equals(key)) {
return null;
} else {
switch (this) {
case LONG:
case DATETIME:
return Long.parseLong(key);
case FLOAT:
return Float.parseFloat(key);
case INTEGER:
case STRING:
return Integer.parseInt(key);
case BOOLEAN:
return Byte.parseByte(key);
case DOUBLE:
return Double.parseDouble(key);
}
}
throw new IllegalStateException("unknown object type");
}

public String toString(Object val) {
if (val == null) {
return "null";
} else {
switch (this) {
case LONG:
case DATETIME:
return Long.toString((Long) val);
case FLOAT:
return Float.toString((Float) val);
case INTEGER:
case STRING:
return Integer.toString((Integer) val);
case BOOLEAN:
return Byte.toString((Byte) val);
case DOUBLE:
return Double.toString((Double) val);
}
}
throw new IllegalStateException("unknown object type");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.rapid7.armor.write.component;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;

public class ComponentHeaderWriter implements Component
{
private final Component component;
private final byte[] header;

public ComponentHeaderWriter(byte[] header, Component c)
{
this.component = c;
this.header = header;
}

@Override public InputStream getInputStream()
throws IOException
{
return new SequenceInputStream(new ByteArrayInputStream(this.header), component.getInputStream());
}

@Override public long getCurrentSize()
throws IOException
{
return this.header.length + component.getCurrentSize();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.rapid7.armor.write.component;

import java.io.IOException;
import java.io.InputStream;

public interface ExtendedIndexWriter extends Component, IndexUpdater {
/**
* an integer extended type which will be used for serialization and dispatch in ColumnFileWriter.
* The ExtendedIndex section will include a section header that indicates which type it is.
* Thus, extendedType must be unique.
*
* @return the extendedType value.
*/
long extendedType();

void load(InputStream data) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.rapid7.armor.write.component;

import com.rapid7.armor.shard.ColumnShardId;

public interface ExtendedIndexWriterFactory {
ExtendedIndexWriter constructWriter(ColumnShardId columnShardId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.rapid7.armor.write.component;

import java.util.Collection;

public interface IndexUpdater
{

void add(Number value, Integer entityId, Integer rowCount);

void finish();

/**
* create an indexUpdater that wraps a collection of updaters.
* @return wrapping indexUpdater
*/
static IndexUpdater listIndexUpdater(final Collection<? extends IndexUpdater> updaters) {
IndexUpdater result = new IndexUpdater() {

@Override public void add(Number value, Integer entityId, Integer rowCount)
{
for (IndexUpdater item : updaters) {
item.add(value, entityId, rowCount);
}
}

@Override public void finish()
{
for (IndexUpdater item : updaters) {
item.finish();
}
}
};
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public void customTraverseThoughValues(List<EntityRecord> records, Consumer<List

private class MetadataUpdater {
private final ColumnMetadata metadata;
private final IndexUpdater indexUpdater;
Double prevMax;
Double prevMin;
Set<Object> cardinality = new HashSet<>();
Expand All @@ -187,8 +188,9 @@ private class MetadataUpdater {
private boolean success;


MetadataUpdater(ColumnMetadata metadata) {
MetadataUpdater(ColumnMetadata metadata, IndexUpdater indexUpdater) {
this.metadata = metadata;
this.indexUpdater = indexUpdater;
prevMax = metadata.getMaxValue();
prevMax = metadata.getMinValue();

Expand Down Expand Up @@ -232,6 +234,11 @@ public void updateRow(EntityRecord eir) throws IOException {
if (nilRb == null || !nilRb.contains(rowCount)) {
metadata.handleMinMax(value);
cardinality.add(value);
if(indexUpdater != null)
indexUpdater.add(value, eir.getEntityId(), rowCount);
} else {
if(indexUpdater != null)
indexUpdater.add(null, eir.getEntityId(), rowCount);
}
});
}
Expand All @@ -258,6 +265,9 @@ private ByteBuffer insureByteBufferIsBigEnough(int len, ByteBuffer buf) {

public void finishUpdate() {
metadata.setCardinality(cardinality.size());
if (indexUpdater != null) {
indexUpdater.finish();
}
success = true;
}
}
Expand All @@ -278,7 +288,7 @@ public void finishUpdate() {
// from within the for loop.
public void runThoughValues(ColumnMetadata metadata, List<EntityRecord> records) throws IOException {
long previousPosition = position();
MetadataUpdater metadataUpdater = new MetadataUpdater(metadata);
MetadataUpdater metadataUpdater = new MetadataUpdater(metadata, null);
try {
for (EntityRecord eir : records) {
if (eir.getDeleted() == 1)
Expand Down Expand Up @@ -452,13 +462,13 @@ private RgOffsetWriteResult fillNullValues(ByteBuffer bytesBuffer, int numNull)
* @throws IOException If an io error occurs.
*/
public List<EntityRecord> compact(List<EntityRecord> entitiesToKeep) throws IOException {
return compactAndUpdateMetadata(entitiesToKeep, null);
return compactAndUpdateMetadata(entitiesToKeep, null, null);
}

public List<EntityRecord> compactAndUpdateMetadata(List<EntityRecord> entitiesToKeep, ColumnMetadata metadata) throws IOException {
public List<EntityRecord> compactAndUpdateMetadata(List<EntityRecord> entitiesToKeep, ColumnMetadata metadata, IndexUpdater indexUpdater) throws IOException {
MetadataUpdater metadataUpdater = null;
if (metadata != null) {
metadataUpdater = new MetadataUpdater(metadata);
metadataUpdater = new MetadataUpdater(metadata, indexUpdater);
}
int totalRequiredBytes = entitiesToKeep.stream().mapToInt(EntityRecord::totalLength).sum();
ByteBuffer output = ByteBuffer.allocate(totalRequiredBytes * 2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package com.rapid7.armor.write.component;

import com.rapid7.armor.schema.DataType;
import com.rapid7.armor.shard.ColumnShardId;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
*
*/
public class ValueIndexWriter implements ExtendedIndexWriter {
private final ColumnShardId columnShardId;
private final DataType dataType;
private Map<Number, Set<Integer>> valToEntities = new HashMap<>();
private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public ValueIndexWriter(ColumnShardId columnShardId) {
this.columnShardId = columnShardId;
this.dataType = columnShardId.getColumnId().dataType();
}

@Override public long extendedType()
{
return 0x31460001L; // value must be globally unique
}

@Override public void load(InputStream json) throws IOException {
@SuppressWarnings("unchecked")
Map<String, List<Integer>> map = OBJECT_MAPPER.readValue(json, Map.class);
valToEntities = new HashMap<>();
int highestSurrogate = -1;
for (Map.Entry<String, List<Integer> > e : map.entrySet()) {
Number val = dataType.fromString(e.getKey());
valToEntities.put(val, new HashSet<>(e.getValue()));
}
}

public Map<Number, Set<Integer>> getValToEntities() { return valToEntities; }

public boolean isEmpty() {
return valToEntities.isEmpty();
}

private byte[] toBytes() throws JsonProcessingException {
HashMap<String, List<Integer>> serializable = new HashMap<>();
for (Map.Entry<Number, Set<Integer>> e : valToEntities.entrySet()) {
serializable.put(this.dataType.toString(e.getKey()), new ArrayList<>(e.getValue()));
}
return OBJECT_MAPPER.writeValueAsBytes(serializable);
}

@Override
public InputStream getInputStream() throws IOException {
return new ByteArrayInputStream(toBytes());
}

@Override
public long getCurrentSize() throws IOException {
return toBytes().length;
}



@Override public void add(Number value, Integer entityId, Integer rowCount)
{
Set<Integer> entities = this.valToEntities.get(value);
if (entities == null) {
entities = new HashSet<>();
this.valToEntities.put(value, entities);
}
entities.add(entityId);
}

@Override public void finish()
{
// don't need to do anything
}
}
Loading