Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/de/bwaldvogel/mongo/MongoBackend.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Collection;
import java.util.List;

import de.bwaldvogel.mongo.backend.DatabaseCommand;
import de.bwaldvogel.mongo.backend.QueryResult;
import de.bwaldvogel.mongo.bson.Document;
import de.bwaldvogel.mongo.wire.message.MongoMessage;
Expand All @@ -14,7 +15,7 @@ public interface MongoBackend {

void handleClose(Channel channel);

Document handleCommand(Channel channel, String database, String command, Document query);
Document handleCommand(Channel channel, String database, DatabaseCommand command, Document query);

QueryResult handleQuery(MongoQuery query);

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/de/bwaldvogel/mongo/MongoDatabase.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package de.bwaldvogel.mongo;

import de.bwaldvogel.mongo.backend.CollectionOptions;
import de.bwaldvogel.mongo.backend.DatabaseCommand;
import de.bwaldvogel.mongo.backend.DatabaseResolver;
import de.bwaldvogel.mongo.backend.QueryResult;
import de.bwaldvogel.mongo.bson.Document;
Expand All @@ -14,7 +15,7 @@ public interface MongoDatabase {

void handleClose(Channel channel);

Document handleCommand(Channel channel, String command, Document query, DatabaseResolver databaseResolver, Oplog oplog);
Document handleCommand(Channel channel, DatabaseCommand command, Document query, DatabaseResolver databaseResolver, Oplog oplog);

QueryResult handleQuery(MongoQuery query);

Expand Down
193 changes: 103 additions & 90 deletions core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -152,57 +151,54 @@ private Document getLog(String argument) {
return response;
}

private Document handleAdminCommand(String command, Document query) {
if (command.equalsIgnoreCase("listdatabases")) {
List<Document> databases = listDatabaseNames().stream()
.sorted()
.map(databaseName -> {
MongoDatabase database = openOrCreateDatabase(databaseName);
Document dbObj = new Document("name", database.getDatabaseName());
dbObj.put("empty", Boolean.valueOf(database.isEmpty()));
return dbObj;
})
.collect(Collectors.toList());
Document response = new Document();
response.put("databases", databases);
Utils.markOkay(response);
return response;
} else if (command.equalsIgnoreCase("find")) {
String collectionName = (String) query.get(command);
if (collectionName.equals("$cmd.sys.inprog")) {
return Utils.firstBatchCursorResponse(collectionName, new Document("inprog", Collections.emptyList()));
} else {
throw new NoSuchCommandException(new Document(command, collectionName).toString());
private Document handleAdminCommand(DatabaseCommand command, Document query) {
return switch (command.getCommand()) {
case LIST_DATABASES -> handleListDatabases();
case FIND -> handleFind(command, query);
case REPL_SET_GET_STATUS -> throw new NoReplicationEnabledException();
case GET_LOG -> {
final Object argument = query.get(command.getQueryValue());
yield getLog(argument == null ? null : argument.toString());
}
case RENAME_COLLECTION -> handleRenameCollection(command.getQueryValue(), query);
case GET_LAST_ERROR -> {
log.debug("getLastError on admin database");
yield successResponse();
}
} else if (command.equalsIgnoreCase("replSetGetStatus")) {
throw new NoReplicationEnabledException();
} else if (command.equalsIgnoreCase("getLog")) {
final Object argument = query.get(command);
return getLog(argument == null ? null : argument.toString());
} else if (command.equalsIgnoreCase("renameCollection")) {
return handleRenameCollection(command, query);
} else if (command.equalsIgnoreCase("getLastError")) {
log.debug("getLastError on admin database");
return successResponse();
} else if (command.equalsIgnoreCase("connectionStatus")) {
Document response = new Document();
response.append("authInfo", new Document()
.append("authenticatedUsers", Collections.emptyList())
.append("authenticatedUserRoles", Collections.emptyList())
);
Utils.markOkay(response);
return response;
} else if (command.equalsIgnoreCase("hostInfo")) {
return handleHostInfo();
} else if (command.equalsIgnoreCase("getCmdLineOpts")) {
return handleGetCmdLineOpts();
} else if (command.equalsIgnoreCase("getFreeMonitoringStatus")) {
return handleGetFreeMonitoringStatus();
} else if (command.equalsIgnoreCase("endSessions")) {
log.debug("endSessions on admin database");
return successResponse();
case CONNECTION_STATUS -> handleConnectionStatus();
case HOST_INFO -> handleHostInfo();
case GET_CMD_LINE_OPTS -> handleGetCmdLineOpts();
case GET_FREE_MONITORING_STATUS -> handleGetFreeMonitoringStatus();
case END_SESSIONS -> {
log.debug("endSessions on admin database");
yield successResponse();
}
default -> throw new NoSuchCommandException(command.getQueryValue());
};
}

private Document handleListDatabases() {
List<Document> databases = listDatabaseNames().stream()
.sorted()
.map(databaseName -> {
MongoDatabase database = openOrCreateDatabase(databaseName);
Document dbObj = new Document("name", database.getDatabaseName());
dbObj.put("empty", Boolean.valueOf(database.isEmpty()));
return dbObj;
})
.toList();
Document response = new Document();
response.put("databases", databases);
Utils.markOkay(response);
return response;
}

private static Document handleFind(DatabaseCommand command, Document query) {
String collectionName = query.get(command.getQueryValue()).toString();
if (collectionName.equals("$cmd.sys.inprog")) {
return Utils.firstBatchCursorResponse(collectionName, new Document("inprog", Collections.emptyList()));
} else {
throw new NoSuchCommandException(command);
throw new NoSuchCommandException(new Document(command.getQueryValue(), collectionName).toString());
}
}

Expand All @@ -212,6 +208,16 @@ private static Document successResponse() {
return response;
}

private static Document handleConnectionStatus() {
Document response = new Document();
response.append("authInfo", new Document()
.append("authenticatedUsers", Collections.emptyList())
.append("authenticatedUserRoles", Collections.emptyList())
);
Utils.markOkay(response);
return response;
}

private Document handleHostInfo() {
Document response = new Document();
String osName = System.getProperty("os.name");
Expand Down Expand Up @@ -299,45 +305,24 @@ private MongoCollection<?> resolveCollection(String namespace) {
protected abstract MongoDatabase openOrCreateDatabase(String databaseName);

@Override
public Document handleCommand(Channel channel, String databaseName, String command, Document query) {
if (command.equalsIgnoreCase("whatsmyuri")) {
Document response = new Document();
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
response.put("you", remoteAddress.getAddress().getHostAddress() + ":" + remoteAddress.getPort());
Utils.markOkay(response);
return response;
} else if (command.equalsIgnoreCase("ismaster")) {
Document response = new Document("ismaster", Boolean.TRUE);
response.put("maxBsonObjectSize", Integer.valueOf(BsonConstants.MAX_BSON_OBJECT_SIZE));
response.put("maxWriteBatchSize", Integer.valueOf(MongoWireProtocolHandler.MAX_WRITE_BATCH_SIZE));
response.put("maxMessageSizeBytes", Integer.valueOf(MongoWireProtocolHandler.MAX_MESSAGE_SIZE_BYTES));
response.put("maxWireVersion", Integer.valueOf(version.getWireVersion()));
response.put("minWireVersion", Integer.valueOf(0));
response.put("localTime", Instant.now(clock));
Utils.markOkay(response);
return response;
} else if (command.equalsIgnoreCase("buildinfo")) {
Document response = new Document("version", version.toVersionString());
response.put("versionArray", version.getVersionArray());
response.put("maxBsonObjectSize", Integer.valueOf(BsonConstants.MAX_BSON_OBJECT_SIZE));
Utils.markOkay(response);
return response;
} else if (command.equalsIgnoreCase("dropDatabase")) {
return handleDropDatabase(databaseName);
} else if (command.equalsIgnoreCase("getMore")) {
return handleGetMore(databaseName, command, query);
} else if (command.equalsIgnoreCase("killCursors")) {
return handleKillCursors(query);
} else if (command.equalsIgnoreCase("ping")) {
return successResponse();
} else if (command.equalsIgnoreCase("serverStatus")) {
return getServerStatus();
} else if (databaseName.equals(ADMIN_DB_NAME)) {
return handleAdminCommand(command, query);
}

MongoDatabase mongoDatabase = resolveDatabase(databaseName);
return mongoDatabase.handleCommand(channel, command, query, this::resolveDatabase, oplog);
public Document handleCommand(Channel channel, String databaseName, DatabaseCommand command, Document query) {
return switch (command.getCommand()) {
case WHATS_MY_URI -> handleWhatsMyUri(channel);
case IS_MASTER -> handleIsMaster();
case BUILD_INFO -> handleBuildInfo();
case DROP_DATABASE -> handleDropDatabase(databaseName);
case GET_MORE -> handleGetMore(databaseName, command.getQueryValue(), query);
case KILL_CURSORS -> handleKillCursors(query);
case PING -> successResponse();
case SERVER_STATUS -> getServerStatus();
default -> {
if (databaseName.equals(ADMIN_DB_NAME)) {
yield handleAdminCommand(command, query);
}
MongoDatabase mongoDatabase = resolveDatabase(databaseName);
yield mongoDatabase.handleCommand(channel, command, query, this::resolveDatabase, oplog);
}
};
}

@Override
Expand All @@ -356,6 +341,34 @@ public void closeCursors(List<Long> cursorIds) {
cursorIds.forEach(cursorRegistry::remove);
}

private static Document handleWhatsMyUri(Channel channel) {
Document response = new Document();
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
response.put("you", remoteAddress.getAddress().getHostAddress() + ":" + remoteAddress.getPort());
Utils.markOkay(response);
return response;
}

private Document handleIsMaster() {
Document response = new Document("ismaster", Boolean.TRUE);
response.put("maxBsonObjectSize", Integer.valueOf(BsonConstants.MAX_BSON_OBJECT_SIZE));
response.put("maxWriteBatchSize", Integer.valueOf(MongoWireProtocolHandler.MAX_WRITE_BATCH_SIZE));
response.put("maxMessageSizeBytes", Integer.valueOf(MongoWireProtocolHandler.MAX_MESSAGE_SIZE_BYTES));
response.put("maxWireVersion", Integer.valueOf(version.getWireVersion()));
response.put("minWireVersion", Integer.valueOf(0));
response.put("localTime", Instant.now(clock));
Utils.markOkay(response);
return response;
}

private Document handleBuildInfo() {
Document response = new Document("version", version.toVersionString());
response.put("versionArray", version.getVersionArray());
response.put("maxBsonObjectSize", Integer.valueOf(BsonConstants.MAX_BSON_OBJECT_SIZE));
Utils.markOkay(response);
return response;
}

protected Document handleKillCursors(Document query) {
List<Long> cursorIds = (List<Long>) query.get("cursors");
List<Long> cursorsKilled = new ArrayList<>();
Expand Down Expand Up @@ -409,7 +422,7 @@ public Document handleMessage(MongoMessage message) {
Channel channel = message.getChannel();
String databaseName = message.getDatabaseName();
Document query = message.getDocument();
String command = query.keySet().iterator().next();
DatabaseCommand command = DatabaseCommand.of(query.keySet().iterator().next());
return handleCommand(channel, databaseName, command, query);
}

Expand Down
Loading
Loading