Skip to content

Commit 0ca7f5d

Browse files
committed
feat: Add Communication Module in Java
1 parent 9d30da8 commit 0ca7f5d

File tree

9 files changed

+541
-9
lines changed

9 files changed

+541
-9
lines changed

adf_core_python/core/gateway/gateway_agent.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from typing import Optional, TYPE_CHECKING, Callable
44

5-
from rcrs_core.connection import RCRSProto_pb2
5+
from rcrs_core.connection import RCRSProto_pb2, URN
66

77
from adf_core_python.core.agent.info.agent_info import AgentInfo
88
from adf_core_python.core.agent.info.scenario_info import ScenarioInfo
@@ -111,3 +111,8 @@ def message_received(self, msg: RCRSProto_pb2) -> None:
111111

112112
self._gateway_modules[c_msg.module_id].set_execute_response(c_msg.result)
113113
self._gateway_modules[c_msg.module_id].set_is_executed(True)
114+
115+
if msg.urn == URN.Command.AK_SPEAK:
116+
if self.send_msg is None:
117+
raise RuntimeError("send_msg is None")
118+
self.send_msg(msg)

adf_core_python/launcher.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import argparse
2+
import resource
23

34
from adf_core_python.core.config.config import Config
45
from adf_core_python.core.launcher.agent_launcher import AgentLauncher
@@ -11,6 +12,8 @@ def __init__(
1112
self,
1213
launcher_config_file: str,
1314
) -> None:
15+
resource.setrlimit(resource.RLIMIT_NOFILE, (8192, 9223372036854775807))
16+
1417
configure_logger()
1518

1619
self.logger = get_logger(__name__)

java/lib/src/main/java/adf_core_python/agent/Agent.java

Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
package adf_core_python.agent;
22

3-
import adf.core.agent.communication.MessageManager;
3+
import adf.core.agent.communication.standard.bundle.StandardMessageBundle;
44
import adf.core.agent.info.ScenarioInfo;
55
import adf.core.agent.info.WorldInfo;
6+
import adf.core.launcher.ConsoleOutput;
7+
import adf_core_python.agent.communication.MessageManager;
8+
import adf_core_python.agent.communication.standard.StandardCommunicationModule;
69
import adf_core_python.agent.config.ModuleConfig;
710
import adf_core_python.agent.develop.DevelopData;
811
import adf_core_python.agent.info.AgentInfo;
912
import adf_core_python.agent.module.ModuleManager;
1013
import adf_core_python.agent.precompute.PrecomputeData;
14+
import adf_core_python.component.communication.CommunicationModule;
1115
import adf_core_python.component.module.AbstractModule;
16+
import adf_core_python.gateway.Coordinator;
1217
import adf_core_python.gateway.mapper.AbstractMapper;
1318
import adf_core_python.gateway.mapper.MapperDict;
1419
import jakarta.annotation.Nonnull;
@@ -17,40 +22,50 @@
1722
import org.apache.logging.log4j.Logger;
1823
import rescuecore2.config.Config;
1924
import rescuecore2.messages.Command;
25+
import rescuecore2.messages.Message;
2026
import rescuecore2.standard.entities.StandardEntityURN;
2127
import rescuecore2.standard.entities.StandardWorldModel;
28+
import rescuecore2.standard.messages.AKSubscribe;
2229
import rescuecore2.worldmodel.ChangeSet;
2330
import rescuecore2.worldmodel.Entity;
2431
import rescuecore2.worldmodel.EntityID;
2532

2633
import java.lang.reflect.Constructor;
2734
import java.lang.reflect.InvocationTargetException;
35+
import java.util.Arrays;
2836
import java.util.Collection;
2937
import java.util.HashMap;
3038
import java.util.Objects;
3139

3240
public class Agent {
33-
private final AgentInfo agentInfo;
34-
private final WorldInfo worldInfo;
35-
private final ScenarioInfo scenarioInfo;
41+
public final AgentInfo agentInfo;
42+
public final WorldInfo worldInfo;
43+
public final ScenarioInfo scenarioInfo;
3644
private final ModuleManager moduleManager;
3745
private final DevelopData developData;
3846
private final PrecomputeData precomputeData;
3947
private final MessageManager messageManager;
48+
private CommunicationModule communicationModule;
4049
private final HashMap<String, AbstractMapper> modules = new HashMap<>();
4150
private final MapperDict mapperDict;
4251
private final Logger logger;
52+
private int ignoreTime;
53+
private final Coordinator coordinator;
4354

44-
public Agent(EntityID entityID, Collection<Entity> entities, ScenarioInfo scenarioInfo, DevelopData developData, ModuleConfig moduleConfig) {
55+
public Agent(EntityID entityID, Collection<Entity> entities, ScenarioInfo scenarioInfo, DevelopData developData, ModuleConfig moduleConfig, Coordinator coordinator) {
4556
StandardWorldModel worldModel = new StandardWorldModel();
4657
worldModel.addEntities(entities);
4758
worldModel.index();
4859

60+
this.ignoreTime = scenarioInfo.getRawConfig()
61+
.getIntValue(kernel.KernelConstants.IGNORE_AGENT_COMMANDS_KEY);
62+
4963
this.agentInfo = new AgentInfo(entityID, worldModel);
5064
this.worldInfo = new WorldInfo(worldModel);
5165
this.scenarioInfo = scenarioInfo;
5266
this.developData = developData;
5367
this.moduleManager = new ModuleManager(this.agentInfo, this.worldInfo, this.scenarioInfo, moduleConfig, this.developData);
68+
this.coordinator = coordinator;
5469

5570
String dataStorageName = "";
5671
StandardEntityURN agentURN = Objects.requireNonNull(this.worldInfo.getEntity(this.agentInfo.getID())).getStandardURN();
@@ -97,13 +112,48 @@ public Class<?> registerModule(@Nonnull String moduleID, @Nonnull String moduleN
97112
}
98113

99114
public void update(int time, ChangeSet changed, Collection<Command> heard) {
115+
worldInfo.setTime(time);
116+
worldInfo.merge(changed);
100117
agentInfo.recordThinkStartTime();
101118
agentInfo.setTime(time);
119+
120+
if (time == 1) {
121+
if (this.communicationModule != null) {
122+
ConsoleOutput.out(ConsoleOutput.State.ERROR,
123+
"[ERROR ] Loader is not found.");
124+
ConsoleOutput.out(ConsoleOutput.State.NOTICE,
125+
"CommunicationModule is modified - " + this);
126+
} else {
127+
this.communicationModule = new StandardCommunicationModule();
128+
}
129+
130+
this.messageManager.registerMessageBundle(new StandardMessageBundle());
131+
}
132+
133+
// agents can subscribe after ignore time
134+
if (time >= ignoreTime) {
135+
this.messageManager.subscribe(this.agentInfo, this.worldInfo,
136+
this.scenarioInfo);
137+
138+
if (!this.messageManager.getIsSubscribed()) {
139+
int[] channelsToSubscribe = this.messageManager.getChannels();
140+
if (channelsToSubscribe != null) {
141+
this.messageManager.setIsSubscribed(true);
142+
}
143+
}
144+
}
145+
102146
agentInfo.setHeard(heard);
103147
agentInfo.setChanged(changed);
104-
worldInfo.setTime(time);
105-
worldInfo.merge(changed);
106148
worldInfo.setChanged(changed);
149+
150+
this.messageManager.refresh();
151+
this.communicationModule.receive(this, this.messageManager);
152+
153+
this.messageManager.coordinateMessages(this.agentInfo, this.worldInfo,
154+
this.scenarioInfo);
155+
this.communicationModule.send(this, this.messageManager);
156+
107157
logger.debug("Agent Update (Time: {}, Changed: {}, Heard: {})", agentInfo.getTime(), agentInfo.getChanged(), agentInfo.getHeard());
108158
}
109159

@@ -113,4 +163,12 @@ public Config execModuleMethod(String moduleID, String methodName, Config argume
113163
logger.debug("Executed Method Result (MethodName: {}, Result: {}", methodName, result);
114164
return result;
115165
}
166+
167+
public EntityID getID() {
168+
return this.agentInfo.getID();
169+
}
170+
171+
public void send(Message[] messages) {
172+
Arrays.stream(messages).forEach(coordinator::sendMessage);
173+
}
116174
}

0 commit comments

Comments
 (0)