Description
The BagFile.generateIndexesForTopicList method is very inefficient: it iterates through all connections for the requested topics, and for each connection it reads (and, if compressed, decompresses) every chunk containing that connection, then reads every message within each chunk, then filters for messages with the requested connection id. If you supply a list of N topics, then assuming chunks contain messages for each requested connection id, the entire file is read and decompressed up to N times. For a large file (gigabytes), this is extremely inefficient. The current indexing method also involves a lot of non-consecutive seeking, which is very slow on a spinning disk.
Instead, the loop nesting should be inverted so that only the relevant chunks are read and processed, and the file is read only once, as sketched below.
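In pseudocode, the inversion looks like this (a sketch only; chunksContaining, chunksContainingAny, and decompress are hypothetical helpers, not library methods):

// Current: outer loop over topics, so the same chunk may be
// read and decompressed once per requested topic.
for (String topic : topics)
    for (Chunk chunk : chunksContaining(topic))
        for (Message msg : decompress(chunk))
            if (msg.topic.equals(topic)) index(msg);

// Inverted: outer loop over chunks, so each relevant chunk is
// read and decompressed exactly once, in file order.
for (Chunk chunk : chunksContainingAny(topics))
    for (Message msg : decompress(chunk))
        if (topics.contains(msg.topic)) index(msg);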
Relatedly, another issue with the current API is that there is no way to easily and efficiently iterate through all records for a given topic list in a bag file, particularly if you need the bag file timestamp (as opposed to the record header timestamp) -- see #22. Right now the only way to accomplish this is to call BagFile.generateIndexesForTopicList to produce a List<MessageIndex> (which may process the file multiple times, as described above), and then, for each MessageIndex (which has the required timestamp field), call BagFile.getMessageFromIndex to get a MessageType -- but this reads and/or decompresses the chunks and messages yet again.
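For concreteness, the current two-pass workaround looks roughly like this (a sketch; the exact getMessageFromIndex signature and the MessageIndex field names are assumptions):

// Pass 1: may read/decompress the file once per topic (see above)
List<MessageIndex> indexes = bag.generateIndexesForTopicList(topics, null);
// Pass 2: re-reads/re-decompresses chunks to recover each message
for (MessageIndex index : indexes) {
    MessageType messageType = bag.getMessageFromIndex(index);
    // index.timestamp is the bag file timestamp; messageType is the payload
}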
Here is a solution. These are all static methods, so they can be used right now by anyone needing this functionality, without modifying the bag-reader-java library -- but they should probably be turned into non-static methods of the BagFile class (removing the BagFile file parameter and replacing it with this).
In particular, this code provides a significantly more efficient version of BagFile.generateIndexesForTopicList, as well as a new method, processMessagesForTopicList, which iterates through all messages on a given topic list efficiently, loading and processing the data only once.
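The snippets below assume the bag-reader-java types (BagFile, ByteBufferChannel, ChunkConnection, ChunkInfo, Connection, Header, MessageIndex, MessageType, Record, and the exception classes) are imported from the library's packages, that myLogger is a logger field on the enclosing class, and that Timestamp is java.sql.Timestamp; the standard-library imports would then be:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.swing.ProgressMonitor;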
/** Process a single message record from a bag file. */
@FunctionalInterface
private interface MessageProcessorInternal {
    void process(Connection conn, Timestamp timestamp, long chunkPos,
            long position, ByteBuffer data) throws BagReaderException;
}
/**
 * Efficiently iterate through all messages in a bag file for a given topic
 * list, reading each relevant chunk exactly once, in file order.
 */
private static void processAllMessagesForTopic(BagFile file,
        List<String> topics, ProgressMonitor progressMonitor,
        MessageProcessorInternal messageProcessorInternal)
        throws BagReaderException, InterruptedException {
    // Find the connections corresponding to the requested topics
    Set<String> topicsSet = new HashSet<>(topics);
    Map<Integer, Connection> connIdToConnForRequestedTopics = new HashMap<>();
    for (Connection conn : file.getConnections()) {
        if (topicsSet.contains(conn.getTopic())) {
            connIdToConnForRequestedTopics.put(conn.getConnectionId(), conn);
        }
    }
    // Keep only the chunks that contain at least one requested connection,
    // then sort them into ascending file order so chunks are read
    // sequentially, minimizing seek time on spinning disks
    List<ChunkInfo> chunkInfos = file.getChunkInfos().stream()
            .filter(chunkInfo -> {
                for (ChunkConnection chunkConn : chunkInfo.getConnections()) {
                    if (connIdToConnForRequestedTopics
                            .containsKey(chunkConn.getConnectionId())) {
                        return true;
                    }
                }
                return false;
            })
            .sorted(Comparator.comparingLong(ChunkInfo::getChunkPos))
            .collect(Collectors.toList());
    try (FileChannel channel = file.getChannel()) {
        int chunkNum = 0;
        int numChunks = chunkInfos.size();
        if (progressMonitor != null) {
            progressMonitor.setMinimum(0);
            progressMonitor.setMaximum(numChunks);
            if (progressMonitor.isCanceled()) {
                throw new InterruptedException();
            }
        }
        // Iterate through the relevant chunks in file order
        for (ChunkInfo chunkInfo : chunkInfos) {
            long chunkPos = chunkInfo.getChunkPos();
            if (chunkNum % 500 == 0) {
                myLogger.info("\t\tChunk " + chunkNum + "/" + numChunks);
            }
            if (progressMonitor != null) {
                if (progressMonitor.isCanceled()) {
                    progressMonitor.setNote("canceling");
                    throw new InterruptedException("canceled indexing");
                }
                progressMonitor.setProgress(chunkNum);
            }
            chunkNum++;
            // Read the chunk, decompressing it if necessary
            Record chunk = BagFile.recordAt(channel, chunkPos);
            chunk.readData();
            try (ByteBufferChannel chunkChannel =
                    new ByteBufferChannel(chunk.getData())) {
                // Walk every record packed inside the chunk
                while (chunkChannel.position() < chunkChannel.size()) {
                    long position = chunkChannel.position();
                    Record msg = new Record(chunkChannel);
                    ByteBuffer data = msg.getData()
                            .order(ByteOrder.LITTLE_ENDIAN);
                    Header header = msg.getHeader();
                    if (header.getType() == Record.RecordType.MESSAGE_DATA) {
                        // Check whether the message is on a requested topic
                        int connId = header.getInt("conn");
                        Connection conn =
                                connIdToConnForRequestedTopics.get(connId);
                        if (conn != null) {
                            // It is; hand it to the callback
                            Timestamp timestamp = header.getTimestamp("time");
                            messageProcessorInternal.process(conn, timestamp,
                                    chunkPos, position, data);
                        }
                    }
                }
            }
        }
    } catch (IOException e) {
        throw new BagReaderException(e);
    }
}
/**
 * More efficient version of
 * {@link BagFile#generateIndexesForTopicList(List, ProgressMonitor)}.
 */
public static List<MessageIndex> generateIndexesForTopicList(BagFile file,
        List<String> topics, ProgressMonitor progressMonitor)
        throws BagReaderException, InterruptedException {
    List<MessageIndex> messageIndexList = new ArrayList<>();
    processAllMessagesForTopic(file, topics, progressMonitor,
            (conn, timestamp, chunkPos, position, data) ->
                    messageIndexList.add(new MessageIndex(chunkPos, position,
                            conn.getTopic(), timestamp)));
    return messageIndexList;
}
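Called in place of the existing instance method, this is a drop-in replacement (here bag is a hypothetical, already-read BagFile):

List<MessageIndex> indexes = generateIndexesForTopicList(bag, topics, null);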
/** Process a single deserialized message from a bag file. */
@FunctionalInterface
public interface MessageProcessor {
    void process(String topic, Timestamp timestamp, MessageType messageType)
            throws UninitializedFieldException, BagReaderException;
}
/** Process all messages from a given topic list, in a single pass. */
public static void processMessagesForTopicList(BagFile file,
        List<String> topics, ProgressMonitor progressMonitor,
        MessageProcessor messageProcessor)
        throws BagReaderException, InterruptedException {
    processAllMessagesForTopic(file, topics, progressMonitor,
            (conn, timestamp, chunkPos, position, data) -> {
                // Deserialize the message, then hand it to the callback
                MessageType messageType;
                try {
                    messageType = conn.getMessageCollection().getMessageType();
                } catch (UnknownMessageException e) {
                    throw new BagReaderException(e);
                }
                messageType.readMessage(data);
                try {
                    messageProcessor.process(conn.getTopic(), timestamp,
                            messageType);
                } catch (UninitializedFieldException e) {
                    throw new BagReaderException(e);
                }
            });
}
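As a usage sketch (the file path and topic names are hypothetical, and java.util.Arrays is assumed to be imported), a caller can now stream every message on the requested topics in a single sequential pass:

BagFile bag = BagReader.readFile("/path/to/file.bag");
processMessagesForTopicList(bag, Arrays.asList("/odom", "/imu/data"), null,
        (topic, timestamp, messageType) -> {
            // timestamp here is the bag file timestamp (see #22)
            System.out.println(topic + " @ " + timestamp);
        });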