From 2a0d2c6e14693882a85a3c46bfc125405e4bf58f Mon Sep 17 00:00:00 2001 From: PALLAVI GOLIWALE Date: Mon, 19 Oct 2015 18:50:40 +0530 Subject: [PATCH] added reset all subscriptions functionality --- common/src/main/scala/tmt/common/Messages.scala | 1 + frontend/app/actors/ConnectionStore.scala | 9 +++++---- frontend/app/controllers/StreamController.scala | 5 +++++ frontend/app/services/ClusterClientService.scala | 4 ++++ frontend/conf/routes | 1 + 5 files changed, 16 insertions(+), 4 deletions(-) diff --git a/common/src/main/scala/tmt/common/Messages.scala b/common/src/main/scala/tmt/common/Messages.scala index 7d97c5b..9c2e8b2 100644 --- a/common/src/main/scala/tmt/common/Messages.scala +++ b/common/src/main/scala/tmt/common/Messages.scala @@ -8,4 +8,5 @@ object Messages { case class UpdateDelay(serverName: String, value: FiniteDuration) case class Subscribe(connection: Connection) case class Unsubscribe(connection: Connection) + case class ResetConnections() } diff --git a/frontend/app/actors/ConnectionStore.scala b/frontend/app/actors/ConnectionStore.scala index dc7b096..53c0fd9 100644 --- a/frontend/app/actors/ConnectionStore.scala +++ b/frontend/app/actors/ConnectionStore.scala @@ -5,7 +5,7 @@ import akka.cluster.Cluster import akka.cluster.ClusterEvent._ import akka.cluster.pubsub.DistributedPubSub import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe} -import akka.persistence.{SnapshotOffer, PersistentActor} +import akka.persistence.{SnapshotSelectionCriteria, SnapshotOffer, PersistentActor} import tmt.common.Messages import tmt.shared.Topics import tmt.shared.models.ConnectionSet @@ -34,9 +34,10 @@ class ConnectionStore extends PersistentActor { } def receiveCommand = { - case event: Messages.Subscribe => addConnection(event) - case event: Messages.Unsubscribe => removeConnection(event) - case x: MemberUp => + case event: Messages.Subscribe => addConnection(event) + case event: Messages.Unsubscribe => removeConnection(event) + case event: Messages.ResetConnections => connectionSet = ConnectionSet.empty; deleteSnapshots(SnapshotSelectionCriteria()) + case x: MemberUp => println("******: ", x) scheduler.scheduleOnce(2.second, mediator, Publish(Topics.Connections, connectionSet)) case ConnectionStore.GetConnections => sender() ! connectionSet diff --git a/frontend/app/controllers/StreamController.scala b/frontend/app/controllers/StreamController.scala index 6cbf9b6..2781f09 100644 --- a/frontend/app/controllers/StreamController.scala +++ b/frontend/app/controllers/StreamController.scala @@ -54,4 +54,9 @@ class StreamController @Inject()( clusterClientService.unsubscribe(Connection(serverName, topic)) Accepted("ok") } + + def resetConnections() = Action { + clusterClientService.resetConnections() + Accepted("ok") + } } diff --git a/frontend/app/services/ClusterClientService.scala b/frontend/app/services/ClusterClientService.scala index fcafb26..e6144ff 100644 --- a/frontend/app/services/ClusterClientService.scala +++ b/frontend/app/services/ClusterClientService.scala @@ -40,4 +40,8 @@ class ClusterClientService @Inject()(roleIndexService: RoleIndexService)(implici } def allConnections = (connectionStore ? ConnectionStore.GetConnections).mapTo[ConnectionSet] + + def resetConnections() = { + mediator ! Publish(Topics.Subscription, Messages.ResetConnections()) + } } diff --git a/frontend/conf/routes b/frontend/conf/routes index 001ba8e..8f06356 100644 --- a/frontend/conf/routes +++ b/frontend/conf/routes @@ -6,6 +6,7 @@ GET /connections controllers.StreamController.conn POST /:server/throttle/:delay controllers.StreamController.throttle(server, delay: Long) POST /:server/subscribe/:topic controllers.StreamController.subscribe(server, topic) POST /:server/unsubscribe/:topic controllers.StreamController.unsubscribe(server, topic) +POST /reset-connections controllers.StreamController.resetConnections # Map static resources from the /public folder to the /assets URL path GET /assets/*file controllers.Assets.at(path="/public", file)