Create redis command router module #6
base: master
Conversation
…redis module and a services/command-router-redis module. Also add appropriate configurations to integration tests to run them against a Redis backed command router with a Redis server
Addresses eclipse-hono#3532
This comment is a reminder to myself about how to run the integration tests against the redis command router. Currently I'm trying to run only the tests that fail, but they only fail when I run the whole integration test suite; if I limit the run to just the failing tests, they don't fail. Here's the command I'm using to run only the failing tests: So what I'm going to try now is to run the tests once using the Finally, you shut down the running containers with:
I have verified that when I run the sequence described above, in the while loop that repeatedly runs the tests, the tests will eventually fail with a timeout error (on the test run side). The command router service logs contain some warnings related to the Kafka client. These warnings also seem to appear in the infinispan command router when I run the tests against that, but there they do not result in the same timeout error. However, when the tests error out against the redis command router, I did find the following error in the command router logs, which does not appear in the logs of the infinispan based command router: This might be a red herring though... I have not confirmed that a similar error appears every time the tests error out.
Correction: the error mentioned above does appear every now and then in the infinispan based command router as well, but to be clear, those test runs do not error out. So the error found in the command router logs does not seem to be related to the test failures. For reference, here is a similar error log line from the infinispan command router that occurred while tests were running successfully:
Sometimes the tests run successfully and sometimes they fail, and there is quite a lot of variance in when they fail: sometimes they fail on the first run, sometimes there are 5-6 successful runs before a run fails. If a test run is repeated immediately after a failed run (without restarting the containers), the tests sometimes fail consistently, which indicates that the command router service may get into a corrupted state. However, this does not always happen; sometimes it is possible to get successful runs after a failed run. Attached is a log of a failed test run. Also, when the containers are stopped I see the following error in the command router, but it appears whether or not a test run has failed previously (so I always get this when shutting down the redis command router container in the integration tests):
It seems that the failure happens eventually even if you only run that single test (although I've seen tests from the MQTT test class fail as well). This is sufficient to get the failure to appear eventually: This will eventually result in a failed test run, and often (but not always) subsequent test runs will then fail consistently. Sometimes, however, subsequent test runs pass successfully.
sophokles73 left a comment
This looks pretty solid to me :-)
At first glance, I cannot see why the tests should fail. I haven't run the tests yet but will first take another look at the integration test implementation. We sometimes have odd bugs regarding async behavior in them ...
tests/pom.xml
Outdated
    </profile>
  </profiles>
</project>
</project> No newline at end of file
missing EOL
| "io.quarkus.vertx.core.runtime": | ||
| level: DEBUG | ||
| vertx: | ||
| max-event-loop-execute-time: ${max.event-loop.execute-time} No newline at end of file |
missing EOL
    }
    futures.add(api.set(params));
});
return Future.all(Collections.unmodifiableList(futures));
IMHO there is no value in creating this list of futures because you are not doing anything with it anyway, right?
})
.compose(ignored -> api.exec())
// null reply means transaction aborted
.map(Objects::nonNull)
Are you sure that this handles the response properly? Does api.exec() return a failed future if the transaction has aborted? Or does it return a succeeded future that contains an empty array? In the latter case, this code will need to be adapted in order to create a failed future ...
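For illustration, a minimal sketch of the adaptation the reviewer suggests, assuming the Vert.x Redis client reports an aborted WATCH/MULTI/EXEC transaction as a succeeded future carrying a null reply (as the inline comment in the diff indicates) rather than as a failed future; the helper name execOrFail is hypothetical:

```java
import io.vertx.core.Future;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.Response;

final class ExecSketch {

    // Turn a null EXEC reply (aborted transaction) into a failed future
    // instead of mapping it to a boolean as in the current diff.
    static Future<Response> execOrFail(final RedisAPI api) {
        return api.exec().compose(reply -> reply == null
                ? Future.failedFuture(new IllegalStateException("Redis transaction was aborted"))
                : Future.succeededFuture(reply));
    }
}
```

Whether such an adaptation is needed depends on what api.exec() actually returns on abort, which is exactly the question raised above.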
Objects.requireNonNull(key);

return api.get(key)
    .compose(value -> Future.succeededFuture(String.valueOf(value)));
how about
.map(value -> value.toString(UTF8));

// key does not exist
return Future.succeededFuture(false);
}
if (String.valueOf(response).equals(value)) {
FMPOV this should better be
- if (String.valueOf(response).equals(value)) {
+ if (value.equals(response.toString(UTF8))) {
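To make the two suggestions above concrete, here is a small, self-contained sketch of the decoding pattern, assuming UTF8 in the snippets refers to StandardCharsets.UTF_8 and that a null Response means the key does not exist; the helper name is hypothetical:

```java
import java.nio.charset.StandardCharsets;

import io.vertx.redis.client.Response;

final class ResponseSketch {

    // Decode a Redis reply explicitly as UTF-8; return null when the key is absent.
    static String asString(final Response response) {
        return response == null ? null : response.toString(StandardCharsets.UTF_8);
    }
}
```

Decoding with an explicit charset avoids relying on String.valueOf(response), which would also turn a missing key into the literal string "null".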
/**
 * TODO.
 */
public class RedisConfigInterceptor extends RelocateConfigSourceInterceptor {
FMPOV we should simply use the default Quarkus mechanism for configuring the connection to the Redis cache ...
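For comparison, a minimal sketch of what relying on the default Quarkus mechanism could look like, assuming the quarkus-redis-client extension and Quarkus 3 (jakarta.* CDI); the bean name is hypothetical and the connection is configured purely via quarkus.redis.* properties:

```java
import io.vertx.redis.client.RedisAPI;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
public class RedisCacheClientSketch {

    // Configured entirely by Quarkus from the quarkus.redis.* namespace,
    // e.g. quarkus.redis.hosts=redis://redis:6379
    @Inject
    RedisAPI api;
}
```

The synthetic RedisAPI client proxy visible in the stack trace further down in this conversation is what such an injection produces.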
#
# SPDX-License-Identifier: EPL-2.0

Args = -H:+RunReachabilityHandlersConcurrently
Is this strictly necessary?
<removeVolumes>true</removeVolumes>
<removeNamePattern>**/hono-*-test:*</removeNamePattern>
- <stopNamePattern>hono-*-test-*,all-in-one-*,cp-*,postgres-*</stopNamePattern>
+ <stopNamePattern>hono-*-test-*,all-in-one-*,cp-*,postgres-*,redis-*</stopNamePattern>
shouldn't the Redis container be covered by the hono-*-test-* pattern?
@StFS I believe I have found the culprit. When the integration test sends the one-way commands at a high rate (as it does), the Command Router sometimes fails to query the Redis cache for the command handling adapter instances, which it needs to do for each command to be forwarded to a protocol adapter instance. The Jaeger trace includes the following stack trace:

io.vertx.core.http.ConnectionPoolTooBusyException: Connection pool reached max wait queue size of 40
at io.vertx.core.net.impl.pool.SimpleConnectionPool$Acquire$6.run(SimpleConnectionPool.java:630)
at io.vertx.core.net.impl.pool.Task.runNextTasks(Task.java:43)
at io.vertx.core.net.impl.pool.CombinerExecutor.submit(CombinerExecutor.java:91)
at io.vertx.core.net.impl.pool.SimpleConnectionPool.execute(SimpleConnectionPool.java:244)
at io.vertx.core.net.impl.pool.SimpleConnectionPool.acquire(SimpleConnectionPool.java:639)
at io.vertx.core.net.impl.pool.SimpleConnectionPool.acquire(SimpleConnectionPool.java:643)
at io.vertx.redis.client.impl.RedisConnectionManager$RedisEndpoint.requestConnection(RedisConnectionManager.java:430)
at io.vertx.core.net.impl.pool.Endpoint.getConnection(Endpoint.java:41)
at io.vertx.core.net.impl.pool.ConnectionManager.getConnection(ConnectionManager.java:51)
at io.vertx.core.net.impl.pool.ConnectionManager.getConnection(ConnectionManager.java:40)
at io.vertx.redis.client.impl.RedisConnectionManager.getConnection(RedisConnectionManager.java:394)
at io.vertx.redis.client.impl.RedisClient.connect(RedisClient.java:36)
at io.vertx.redis.client.impl.BaseRedisClient.send(BaseRedisClient.java:42)
at io.quarkus.redis.runtime.client.ObservableRedis.send(ObservableRedis.java:83)
at io.vertx.redis.client.impl.RedisAPIImpl.send(RedisAPIImpl.java:56)
at io.vertx.redis.client.RedisAPI.mget(RedisAPI.java:3791)
at io.vertx.redis.client.RedisAPI_DVNloIyh16wAC4m7ssePIaeqbbc_Synthetic_ClientProxy.mget(Unknown Source)
at org.eclipse.hono.deviceconnection.redis.client.RedisCache.getAll(RedisCache.java:183)
at org.eclipse.hono.deviceconnection.common.CacheBasedDeviceConnectionInfo.getInstancesQueryingAllGatewaysFirst(CacheBasedDeviceConnectionInfo.java:320)
at org.eclipse.hono.deviceconnection.common.CacheBasedDeviceConnectionInfo.getCommandHandlingAdapterInstances(CacheBasedDeviceConnectionInfo.java:296)
at org.eclipse.hono.commandrouter.impl.CommandTargetMapperImpl.lambda$getTargetGatewayAndAdapterInstance$2(CommandTargetMapperImpl.java:96)
at io.vertx.core.impl.future.Composition.onSuccess(Composition.java:38)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:66)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:259)
at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:66)
at io.vertx.core.impl.future.SucceededFuture.addListener(SucceededFuture.java:88)
at io.vertx.core.impl.future.Composition.onSuccess(Composition.java:43)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:66)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:259)
at io.vertx.core.impl.future.Mapping.onSuccess(Mapping.java:40)
at io.vertx.core.impl.future.FutureImpl$ListenerArray.onSuccess(FutureImpl.java:310)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:66)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:259)
at io.vertx.core.impl.future.Mapping.onSuccess(Mapping.java:40)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:66)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:259)
at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:66)
at io.vertx.core.impl.future.SucceededFuture.addListener(SucceededFuture.java:88)
at io.vertx.core.impl.future.Composition.onSuccess(Composition.java:43)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:66)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:259)
at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:66)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:259)
at io.vertx.core.impl.future.Mapping.onSuccess(Mapping.java:40)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:66)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:259)
at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:66)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:259)
at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
at org.eclipse.hono.client.amqp.RequestResponseClient.handleResponse(RequestResponseClient.java:368)
at org.eclipse.hono.client.amqp.connection.impl.HonoConnectionImpl.lambda$createReceiver$13(HonoConnectionImpl.java:688)
at io.vertx.proton.impl.ProtonReceiverImpl.onDelivery(ProtonReceiverImpl.java:236)
at io.vertx.proton.impl.ProtonTransport.handleSocketBuffer(ProtonTransport.java:168)
at io.vertx.core.net.impl.NetSocketImpl.lambda$new$1(NetSocketImpl.java:104)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:134)
at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:412)
at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:328)
at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:321)
at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:388)
at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:159)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:801)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)

I have increased the connection pool:

commandRouter:
  amqp:
    insecurePortEnabled: true
    insecurePortBindAddress: "0.0.0.0"
  cache:
    redis:
      hosts: "redis://redis:6379"
      max-pool-size: 8
      max-pool-waiting: 40

but this did not (yet) fix the issue. I can also hardly believe that just running a few queries already brings Redis to its limits. Maybe you want to investigate a little further ...
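For reference, a sketch of how the two pool settings above map onto the underlying Vert.x Redis client options, assuming they are eventually applied to a RedisOptions instance (this is illustrative wiring, not the actual Hono configuration code):

```java
import io.vertx.core.Vertx;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.RedisOptions;

final class PoolSketch {

    static RedisAPI createClient(final Vertx vertx) {
        final RedisOptions options = new RedisOptions()
                .setConnectionString("redis://redis:6379")
                // counterparts of max-pool-size / max-pool-waiting above
                .setMaxPoolSize(8)
                .setMaxPoolWaiting(40);
        return RedisAPI.api(Redis.createClient(vertx, options));
    }
}
```

The ConnectionPoolTooBusyException in the trace is thrown once more requests are queued for a pooled connection than maxPoolWaiting allows, so raising these two numbers alone may only move the limit rather than remove the contention.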
Use dedicated connections for Redis operations that require WATCH/MULTI/EXEC transactions, since the pooled RedisAPI does not support stateful transactional commands. Also remove the api.close() call from stop() as the RedisAPI lifecycle is managed by the Quarkus CDI container.
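A minimal sketch of the dedicated-connection pattern described above, assuming a plain io.vertx.redis.client.Redis client; the compareAndSwap method and the key/value names are hypothetical and error handling is reduced to returning the connection:

```java
import java.util.List;

import io.vertx.core.Future;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;

final class WatchSketch {

    // Run WATCH/MULTI/EXEC on one dedicated connection, because these commands
    // are stateful and must not be interleaved across pooled connections.
    static Future<Boolean> compareAndSwap(final Redis redis, final String key,
            final String expected, final String newValue) {

        return redis.connect().compose(connection -> {
            final RedisAPI api = RedisAPI.api(connection);
            return api.watch(List.of(key))
                .compose(ok -> api.get(key))
                .compose(current -> {
                    if (current == null || !expected.equals(current.toString())) {
                        // value changed or missing: give up without starting a transaction
                        return api.unwatch().map(false);
                    }
                    return api.multi()
                        .compose(ok -> api.set(List.of(key, newValue)))
                        .compose(ok -> api.exec())
                        // a null EXEC reply means the transaction was aborted
                        .map(reply -> reply != null);
                })
                // always hand the dedicated connection back
                .onComplete(ignored -> connection.close());
        });
    }
}
```

Using one RedisConnection for the whole WATCH/MULTI/EXEC sequence matters because the pooled RedisAPI may dispatch each command on a different connection, silently breaking the transaction semantics.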
This is a DRAFT PR to add a Redis-backed command router.
It consists of the following changes:
There are some recently discovered problems though, namely that the CommandAndControlAmqpIT and CommandAndControlMqttIT tests fail when run against the redis backed command router service. There also seems to be a problem with the redis configuration: the config interceptor that is supposed to "redirect" hono.commandRouter.cache.redis.* config options to the quarkus.redis.* namespace does not seem to be working, so it is necessary to explicitly set the quarkus.redis.hosts property.
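For what it's worth, a sketch of what such a relocating interceptor usually looks like with SmallRye Config, assuming RelocateConfigSourceInterceptor accepts a Function<String, String> mapping that is applied to the looked-up name and that the class is registered as a ConfigSourceInterceptor service; this is an illustration of the mechanism, not the class in this PR:

```java
import java.util.function.Function;

import io.smallrye.config.RelocateConfigSourceInterceptor;

/**
 * Illustrative sketch: when a quarkus.redis.* property is looked up, try the
 * corresponding hono.commandRouter.cache.redis.* name instead.
 */
public class RedisConfigInterceptorSketch extends RelocateConfigSourceInterceptor {

    private static final Function<String, String> RELOCATE = name ->
            name.startsWith("quarkus.redis.")
                    ? name.replace("quarkus.redis.", "hono.commandRouter.cache.redis.")
                    : name;

    public RedisConfigInterceptorSketch() {
        super(RELOCATE);
    }
}
```

If an interceptor like this is not picked up (for example because it is not listed in META-INF/services/io.smallrye.config.ConfigSourceInterceptor), the observed behaviour would match the comment above: only an explicitly set quarkus.redis.hosts property takes effect.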