From d4e02a2ac60541c18512025208bb93326bfca50e Mon Sep 17 00:00:00 2001 From: Herman Engelbrecht Date: Wed, 9 Jun 2021 13:29:56 +0200 Subject: [PATCH 1/3] Added real UDP socket. Simulator can now switch between real network and simulated network. Also added standalone version of single node and matcher node. Verified that standalone versions work corrected. Had to hard code the IP and ports for the standalone versions --- StandaloneMatcherNode.py | 69 +++++++++++++++++++++++++++++++ StandaloneNode.py | 89 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 StandaloneMatcherNode.py create mode 100644 StandaloneNode.py diff --git a/StandaloneMatcherNode.py b/StandaloneMatcherNode.py new file mode 100644 index 0000000..ee261b1 --- /dev/null +++ b/StandaloneMatcherNode.py @@ -0,0 +1,69 @@ +import sys +import getopt + + +from Node import MatcherNode +from Node import VASTNode, SPSNode +from VAST import VASTInterface +from Message import Message +from Connector import * +from Area import Area + + +messageProcessingTick = 0.001 +simulationTick = 0.01 + +async def processReceiveMessage(node): + while True: + await asyncio.sleep(messageProcessingTick) + node.processSingleMessage() + +def main(argv): + + + IP = '' + port = '' + try: + opts, args = getopt.getopt(argv, "hi:p:c:s:") + except getopt.GetoptError: + print + 'singlenode.py -i -p ' + sys.exit(2) + for opt, arg in opts: + if opt == '-h': + print + 'singlenode.py -i -p ' + sys.exit() + elif opt in ("-ip"): + IP = arg + elif opt in ("-port"): + port = int(arg) + + print('(IP:Port) = (%s:%d)' % (IP, port)) + + format = "%(asctime)s: %(message)s" + logging.basicConfig(format=format, level=logging.INFO, + datefmt="%H:%M:%S") + + VAST = VASTInterface() + matcherNodeID = 12000 + VAST.join(matcherNodeID, '127.0.0.1',12000) + VAST.join(12001, '127.0.0.1',12001) + VAST.join(12002, '127.0.0.1',12002) + + matcher = MatcherNode(nodeID=matcherNodeID, networkInterface=RealNetworkInterface(senderIP=IP, senderPort=12000), VASTInterface=VAST) + matcher.registerID() + + loop = asyncio.get_event_loop() + try: + asyncio.ensure_future(processReceiveMessage(matcher)) + loop.run_forever() + except KeyboardInterrupt: + pass + finally: + logging.info("Closing Loop") + + loop.close() + +if __name__ == '__main__': + main(sys.argv[1:]) \ No newline at end of file diff --git a/StandaloneNode.py b/StandaloneNode.py new file mode 100644 index 0000000..b91b4b2 --- /dev/null +++ b/StandaloneNode.py @@ -0,0 +1,89 @@ +import sys +import getopt + + +from Node import VASTNode +from VAST import VASTInterface +from Message import Message +from Connector import * +import random + +messageProcessingTick = 0.001 + + +async def processReceiveMessage(node): + while True: + await asyncio.sleep(messageProcessingTick) + node.processSingleMessage() + + +async def publishMessage(node, channel, destinationID, messageNumber): + while True: + publishTick = random.randint(0,2) + await asyncio.sleep(publishTick) + payload = 'publication message %d' % messageNumber + message = Message(senderNodeID=node.getNodeID(), type='pub', channel=channel, payload=payload) + node.send(destinationNodeID=destinationID, message=message) + messageNumber += 1 + +def main(argv): + + IP = '' + port = '' + subchannel = '' + pubchannel = '' + try: + opts, args = getopt.getopt(argv, "hi:p:c:s:") + except getopt.GetoptError: + print + 'singlenode.py -i -p ' + sys.exit(2) + for opt, arg in opts: + if opt == '-h': + print + 'singlenode.py -i -p ' + sys.exit() + elif opt in ("-i"): + IP = arg + elif opt in ("-p"): + port = int(arg) + elif opt in ("-c"): + pubchannel = arg + elif opt in ("-s"): + subchannel = arg + + + print('(IP:Port) = (%s:%d)' % (IP, port)) + + format = "%(asctime)s: %(message)s" + logging.basicConfig(format=format, level=logging.INFO, + datefmt="%H:%M:%S") + + VAST = VASTInterface() + matcherNodeID = 12000 + VAST.join(matcherNodeID, '127.0.0.1',12000) + VAST.join(12001, '127.0.0.1',12001) + VAST.join(12002, '127.0.0.1',12002) + + node = VASTNode(nodeID=port, networkInterface=RealNetworkInterface(senderIP=IP, senderPort=port), VASTInterface=VAST) + node.registerID() + + # Node '1' subscribes to channel 'test2' + message = Message(senderNodeID=node.getNodeID(), type='sub', channel=subchannel, payload=None) + node.send(destinationNodeID=matcherNodeID, message=message) + + + loop = asyncio.get_event_loop() + try: + asyncio.ensure_future(processReceiveMessage(node)) + asyncio.ensure_future(publishMessage(node, pubchannel, matcherNodeID, 0)) + loop.run_forever() + except KeyboardInterrupt: + pass + finally: + logging.info("Closing Loop") + + loop.close() + +if __name__ == '__main__': + main(sys.argv[1:]) \ No newline at end of file From 30d29d11e52bb6180e528b1fc442b3ce397a4c09 Mon Sep 17 00:00:00 2001 From: Herman Engelbrecht Date: Wed, 9 Jun 2021 13:55:15 +0200 Subject: [PATCH 2/3] Remember to commit all the files --- Connector.py | 69 +++++++++++++++++++++++++++++++++++++++++++--------- Simulator.py | 38 ++++++++++++++++++++--------- 2 files changed, 85 insertions(+), 22 deletions(-) diff --git a/Connector.py b/Connector.py index a9c6a84..67ff49f 100644 --- a/Connector.py +++ b/Connector.py @@ -2,6 +2,10 @@ from copy import deepcopy import queue import socket +import pickle +import codecs + +import asyncio import logging @@ -16,6 +20,10 @@ def connect(self, destinationIP, destinationPort): pass + @abstractmethod + def disconnect(self): + pass + @abstractmethod def send(self, message): @@ -52,45 +60,80 @@ def getMessage(self): pass +class ServerProtocol(asyncio.DatagramProtocol): + + def __init__(self, queue): + self.queue = queue + + + def connection_made(self, transport): + self.transport = transport + + + def datagram_received(self, data, addr): + #logging.info("ServerProtocol::datagram_received => Received datagram '%s' from %s" % (data, addr)) + message = pickle.loads(codecs.decode(data, "base64")) + self.queue.put(message) + + class RealNetworkInterface(NetworkInterface): + + def __init__(self, senderIP, senderPort): self.senderIP = senderIP self.senderPort = senderPort - self.serverSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.receiveQueue = queue.Queue() + self.loop = asyncio.get_event_loop() + def connect(self, destinationIP, destinationPort): + + self.destinationIP = destinationIP + self.destinationPort = destinationPort + #logging.info("RealNetwork::connect => Connecting to (%s:%d)" % (self.destinationIP, self.destinationPort)) self.clientSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + def disconnect(self): + self.listen.close() + + def send(self, message): - self.clientSock.sendto( (UDP_IP_ADDRESS, UDP_PORT_NO)) - pass + #logging.info("RealNetwork::send => Sending message '%s' to (%s:%d)" % (message.getPayload(), self.destinationIP, self.destinationPort)) + data = codecs.encode(pickle.dumps(message), "base64") + self.clientSock.sendto(data, (self.destinationIP, self.destinationPort)) def receive(self, message): - pass + self.receiveQueue.put(message) def bind(self): - serverSock.bind(self.senderIP, self.senderPort) - pass + logging.info("RealNetworkInterface::bind => Registering (IP:Port) = (%s:%d )" % (self.senderIP, self.senderPort)) + self.listen = self.loop.create_datagram_endpoint( + lambda: ServerProtocol(self.receiveQueue), local_addr=(self.senderIP, self.senderPort)) + self.loop.run_until_complete(self.listen) + #self.transport, self.protocol + # try: + # self.loop.run_forever() + # except KeyboardInterrupt: + # pass def getIP(self): - pass + return self.senderIP def getPort(self): - pass + return self.senderPort def getNumberOfMessages(self): - pass + return self.receiveQueue.qsize() def getMessage(self): - pass + return self.receiveQueue.get() class FakeNetworkInterface(NetworkInterface): @@ -103,10 +146,14 @@ def __init__(self, senderIP, senderPort): def connect(self, destinationIP, destinationPort): - logging.debug("Connecting (%s:%s) to (%s:%s)" % (self.senderIP, self.senderPort, destinationIP, destinationPort)) + #logging.info("FakeNetwork::connect => Connecting (%s:%s) to (%s:%s)" % (self.senderIP, self.senderPort, destinationIP, destinationPort)) self.destinationInterface = interfaces[destinationIP+":"+destinationPort] + def disconnect(self): + interfaces[self.senderIP + ":" + self.senderPort] = None + + def bind(self): logging.info("FakeNetworkInterface::bind => Registering (IP:Port) = (%s:%s )" % (self.senderIP, self.senderPort)) interfaces[self.senderIP+":"+self.senderPort] = self diff --git a/Simulator.py b/Simulator.py index 529d497..0c119b1 100644 --- a/Simulator.py +++ b/Simulator.py @@ -21,11 +21,6 @@ async def work(node): async def simulate(matcher, nodes, loop): matcherNodeID = matcher.getNodeID() - # nodes 'joioniconnect to matcher - matcher.registerID() - for node in nodes: - node.registerID() - # Channel based pub/sub logging.info("") logging.info("Simulator::main => TESTING CHANNEL-BASED PUB/SUB") @@ -36,12 +31,12 @@ async def simulate(matcher, nodes, loop): matcher.send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - # Node '1' subscribes to channel 'test' + # Node '1' subscribes to channel 'test2' message = Message(senderNodeID='1', type='sub', channel='test', payload=None) nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - # Node '1' subscribes to channel 'test' + # Node '1' subscribes to channel 'test2' message = Message(senderNodeID='2', type='sub', channel='test2', payload=None) nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -281,10 +276,30 @@ def main(): VAST = VASTInterface() matcherNodeID = '0' - matcher = MatcherNode(nodeID=matcherNodeID, - networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1000'),VASTInterface=VAST) - nodes = [VASTNode(nodeID='1', networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1001'), VASTInterface=VAST), - VASTNode(nodeID='2', networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1002'), VASTInterface=VAST)] + + RealNetwork = True + + if (RealNetwork): + matcher = MatcherNode(nodeID=matcherNodeID, networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12000), VASTInterface=VAST) + nodes = [VASTNode(nodeID='1', networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12001), VASTInterface=VAST), + VASTNode(nodeID='2', networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12002), VASTInterface=VAST)] + + + else: + matcher = MatcherNode(nodeID=matcherNodeID, + networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1000'), + VASTInterface=VAST) + nodes = [VASTNode(nodeID='1', networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1001'), + VASTInterface=VAST), + VASTNode(nodeID='2', networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1002'), + VASTInterface=VAST)] + + + # nodes connect to network infrastructure + matcher.registerID() + for node in nodes: + node.registerID() + loop = asyncio.get_event_loop() try: @@ -297,6 +312,7 @@ def main(): pass finally: logging.info("Closing Loop") + loop.close() From 47c99b3bfee76f5049992de821f9d6b162eabe29 Mon Sep 17 00:00:00 2001 From: Herman Engelbrecht Date: Mon, 14 Jun 2021 07:53:21 +0200 Subject: [PATCH 3/3] Added gateway. Nodes now get a nodeID by sending a JoinMessage to the gateway. The assumption is that the matcher is the first node to join. All nodeIDs are then sent to all nodes (note that this is not a scalable solution to the node discovery problem) --- Connector.py | 23 ++++--- Message.py | 49 ++++++++++++++ Node.py | 139 +++++++++++++++++++++++++++++++++------ Simulator.py | 81 +++++++++++++---------- StandaloneMatcherNode.py | 21 +++--- StandaloneNode.py | 43 +++++++----- VAST.py | 9 ++- 7 files changed, 272 insertions(+), 93 deletions(-) diff --git a/Connector.py b/Connector.py index 67ff49f..d6b58f5 100644 --- a/Connector.py +++ b/Connector.py @@ -90,6 +90,8 @@ def connect(self, destinationIP, destinationPort): self.destinationIP = destinationIP self.destinationPort = destinationPort + #print(self.destinationIP) + #print(self.destinationPort) #logging.info("RealNetwork::connect => Connecting to (%s:%d)" % (self.destinationIP, self.destinationPort)) self.clientSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -99,12 +101,14 @@ def disconnect(self): def send(self, message): - #logging.info("RealNetwork::send => Sending message '%s' to (%s:%d)" % (message.getPayload(), self.destinationIP, self.destinationPort)) + #logging.info("RealNetwork::send => Sending message '%s' to (%s:%d)" % (message.getType(), self.destinationIP, self.destinationPort)) data = codecs.encode(pickle.dumps(message), "base64") - self.clientSock.sendto(data, (self.destinationIP, self.destinationPort)) + result = self.clientSock.sendto(data, (self.destinationIP, self.destinationPort)) + #logging.info("RealNetwork::send => Results <%s>" % result) def receive(self, message): + #logging.info("RealNetwork::receive => Received a message" ) self.receiveQueue.put(message) @@ -114,11 +118,6 @@ def bind(self): lambda: ServerProtocol(self.receiveQueue), local_addr=(self.senderIP, self.senderPort)) self.loop.run_until_complete(self.listen) - #self.transport, self.protocol - # try: - # self.loop.run_forever() - # except KeyboardInterrupt: - # pass def getIP(self): return self.senderIP @@ -146,8 +145,8 @@ def __init__(self, senderIP, senderPort): def connect(self, destinationIP, destinationPort): - #logging.info("FakeNetwork::connect => Connecting (%s:%s) to (%s:%s)" % (self.senderIP, self.senderPort, destinationIP, destinationPort)) - self.destinationInterface = interfaces[destinationIP+":"+destinationPort] + logging.info("FakeNetwork::connect => Connecting (%s:%s) to (%s:%s)" % (self.senderIP, self.senderPort, destinationIP, destinationPort)) + self.destinationInterface = interfaces[destinationIP+":"+str(destinationPort)] def disconnect(self): @@ -156,16 +155,16 @@ def disconnect(self): def bind(self): logging.info("FakeNetworkInterface::bind => Registering (IP:Port) = (%s:%s )" % (self.senderIP, self.senderPort)) - interfaces[self.senderIP+":"+self.senderPort] = self + interfaces[self.senderIP+":"+str(self.senderPort)] = self def send(self, message): - logging.debug("Sending message '%s' to destinationInterface (%s:%s)" % (message, self.destinationInterface.getIP(), self.destinationInterface.getPort())) + #logging.info("Sending message '%s' to destinationInterface (%s:%s)" % (message, self.destinationInterface.getIP(), self.destinationInterface.getPort())) self.destinationInterface.receive(deepcopy(message)) def receive(self, message): - logging.debug("Receive Queue of NIC (%s:%s) has <%d> messages" % (self.senderIP, self.senderPort, self.receiveQueue.qsize())) + #logging.debug("Receive Queue of NIC (%s:%s) has <%d> messages" % (self.senderIP, self.senderPort, self.receiveQueue.qsize())) self.receiveQueue.put(message) logging.debug("Receive Queue of NIC (%s:%s) has <%d> messages" % (self.senderIP, self.senderPort, self.receiveQueue.qsize())) diff --git a/Message.py b/Message.py index 5713d3b..a7a9956 100644 --- a/Message.py +++ b/Message.py @@ -1,3 +1,52 @@ +class JoinMessage(): + def __init__(self, IP, port): + self.nodeID = None + self.type = None # join, query, matcher + self.IP = IP + self.port = port + self.registeredNodes = {} + + + def setNodeID(self, nodeID): + self.nodeID = nodeID + + + def getNodeID(self): + return self.nodeID + + + def setIP(self, IP): + self.IP = IP + + + def setPort(self, port): + self.port = port + + + def getIP(self): + return self.IP + + + def getPort(self): + return self.port + + + def getType(self): + return self.type + + + def setType(self, type): + self.type = type + + + def setRegisteredNodes(self, nodes): + self.registeredNodes = nodes + + + def getRegisteredNodes(self): + return self.registeredNodes + + class Message(object): diff --git a/Node.py b/Node.py index dcd721b..78378e5 100644 --- a/Node.py +++ b/Node.py @@ -1,7 +1,69 @@ from VAST import VASTInterface from collections import defaultdict -from Message import Message +from Message import Message, JoinMessage import logging +from Connector import NetworkInterface + +class Gateway(): + def __init__(self, interface): + self.interface = interface + self.ID = 0 + self.registeredNodes = {} + self.matchers = [] + + + def initialiseNetworkInterface(self): + self.interface.bind() + + def processSingleMessage(self): + + #logging.info("Gateway::processSingleMessage => Number of received messages <%d>" % self.interface.getNumberOfMessages()) + if (self.interface.getNumberOfMessages()): + message = self.interface.getMessage() + type = message.getType() + #logging.info("Gateway::processSingleMessage => Received message of type <%s>" % type) + if (type == 'join'): + IP = message.getIP() + port = message.getPort() + nodeID = self.generateID(IP, port) + self.registerNode(IP, port, nodeID) + message.setRegisteredNodes(self.registeredNodes) + for ID in self.registeredNodes: + (nodeIP, nodePort) = self.registeredNodes[ID] + message.setIP(nodeIP) + message.setPort(nodePort) + message.setNodeID(ID) + self.interface.connect(nodeIP, nodePort) + self.interface.send(message) + elif (type == 'matcher'): + nodeID = message.getNodeID() + self.registerMatcher(nodeID) + elif (type == 'query'): + nodeID = message.getNodeID() + (IP, port) = self.queryNodeID(nodeID) + message.setIP(IP) + message.setPort(port) + self.interface.connect(IP, port) + self.interface.send(message) + + + def generateID(self, IP, port): + nodeID = self.ID + self.ID += 1 + return nodeID + + + def registerNode(self, IP, port, nodeID): + self.registeredNodes[nodeID] = (IP, port) + + + def registerMatcher(self, nodeID): + self.matchers.append(nodeID) + + + def queryNodeID(self, nodeID): + (IP, port) = self.registeredNodes[nodeID] + return (IP, port) class SPSNode(object): @@ -49,17 +111,23 @@ def unsubscribeFromArea(self, area): class VASTNode(object): - def __init__(self, nodeID, networkInterface, VASTInterface): - self.nodeID = nodeID + def __init__(self, networkInterface, VASTInterface): + self.nodeID = None self.networkInterface = networkInterface self.VAST = VASTInterface - def registerID(self): + def initialiseNetworkInterface(self): + self.networkInterface.bind() + + + def registerID(self, gatewayIP, gatewayPort): IP = self.networkInterface.getIP() port = self.networkInterface.getPort() - self.VAST.join(self.nodeID, IP, port) - self.networkInterface.bind() + message = JoinMessage(IP, port) + message.setType('join') + self.networkInterface.connect(gatewayIP, gatewayPort) + self.networkInterface.send(message) def connect(self, IP, port): @@ -69,19 +137,35 @@ def connect(self, IP, port): def send(self, destinationNodeID, message): #logging.info("VASTNode::send => Node [%s] is sending message with content '%s' to node [%s]" % (self.nodeID, message.getPayload(), destinationNodeID)) (IP, port) = self.VAST.getIPPort(destinationNodeID) + #message.setSenderID(self.getNodeID()) self.networkInterface.connect(IP, port) self.networkInterface.send(message=message) def processSingleMessage(self): - #while (self.networkInterface.getNumberOfMessages()): + #logging.info("Gateway::processSingleMessage => Number of received messages <%d>" % self.networkInterface.getNumberOfMessages()) + if (self.networkInterface.getNumberOfMessages()): message = self.networkInterface.getMessage() - payload = message.getPayload() - senderID = message.getSenderID() type = message.getType() - logging.info("VASTNode::processSingleMessage => Node [%s] has received message <%s> from node [%s] with content '%s'" % (self.nodeID, type, senderID, payload)) - # print("Node [%s] has <%d> messages left in queue" % (self.nodeID, self.networkInterface.getNumberOfMessages()) ) + #logging.info("VASTNode::processSingleMessage => Received message of type <%s>" % type) + if (type == 'join'): + self.nodeID = message.getNodeID() + IP = self.networkInterface.getIP() + port = self.networkInterface.getPort() + self.VAST.join(IP, port, self.nodeID) + registeredNodes = message.getRegisteredNodes() + for nodeID in registeredNodes: + (nodeIP, nodePort) = registeredNodes[nodeID] + self.VAST.join(nodeIP, nodePort, nodeID) + logging.info("VASTNode::handleMessage => MatcherNode::handleMessage => Received nodeID <%s> from gateway" % self.nodeID) + else: + payload = message.getPayload() + senderID = message.getSenderID() + channel = message.getChannel() + logging.info("VASTNode::handleMessage => Node [%s] has received message type <%s> for channel <%s> from node [%s] with content '%s'" % (self.nodeID, type, channel, senderID, payload)) + # logging.info("VASTNode::processSingleMessage => Node [%s] has received message <%s> from node [%s] with content '%s'" % (self.nodeID, type, senderID, payload)) + # print("Node [%s] has <%d> messages left in queue" % (self.nodeID, self.networkInterface.getNumberOfMessages()) ) def processMessages(self): @@ -90,8 +174,9 @@ def processMessages(self): payload = message.getPayload() senderID = message.getSenderID() type = message.getType() - logging.info("VASTNode::processMessages => Node [%s] has received message <%s> from node [%s] with content '%s'" % (self.nodeID, type, senderID, payload)) - # print("Node [%s] has <%d> messages left in queue" % (self.nodeID, self.networkInterface.getNumberOfMessages()) ) + channel = message.getChannel() + logging.info("VASTNode::handleMessage => Node [%s] has received message type <%s> for channel <%s> from node [%s] with content '%s'" % (self.nodeID, type, channel, senderID, payload)) +# # print("Node [%s] has <%d> messages left in queue" % (self.nodeID, self.networkInterface.getNumberOfMessages()) ) def receive(self, message, **kwargs): @@ -105,8 +190,8 @@ def getNodeID(self): class MatcherNode(VASTNode): - def __init__(self, nodeID, networkInterface, VASTInterface): - self.nodeID = nodeID + def __init__(self, networkInterface, VASTInterface): + self.nodeID = None self.networkInterface = networkInterface self.VAST = VASTInterface self.channelSubscriptions = defaultdict() @@ -115,17 +200,20 @@ def __init__(self, nodeID, networkInterface, VASTInterface): def handleMessage(self, message): type = message.getType() - senderID = message.getSenderID() - logging.info("Matcher::handleMessage => Matcher handling message <%s> from node <%s>" % (type, senderID)) + # + #logging.info("Matcher::handleMessage => Matcher handling message <%s> from node <%s>" % (type, senderID)) if type == 'sub': + senderID = message.getSenderID() channel = message.getChannel() logging.info("Matcher::handleMessage => Node <%s> subscribing to channel <%s>" % (senderID, channel)) self.addChannelSubscription(senderID, channel) elif type == 'unsub': + senderID = message.getSenderID() channel = message.getChannel() logging.info("Matcher::handleMessage => Node <%s> unsubscribing from channel <%s>" % (senderID, channel)) self.removeChannelSubscription(senderID, channel) elif type == 'pub': + senderID = message.getSenderID() channel = message.getChannel() payload = message.getPayload() logging.info("Matcher::handleMessage => Node <%s> publishing message '%s' to channel <%s>" % (senderID, payload, channel)) @@ -134,21 +222,34 @@ def handleMessage(self, message): payload = message.getPayload() senderID = message.getSenderID() type = message.getType() - logging.info("Matcher::handleMessage => Node [%s] has received message <%s> from node [%s] with content '%s'" % (self.nodeID, type, senderID, payload)) + channel = message.getChannel() + logging.info("Matcher::handleMessage => Node [%s] has received message type <%s> for channel <%s> from node [%s] with content '%s'" % (self.nodeID, type, channel, senderID, payload)) elif type == 'spatialsub': + senderID = message.getSenderID() area = message.getArea() logging.info("Matcher::handleMessage => Node <%s> subscribing to area <%d,%d,%d>" % (senderID, area.position[0], area.position[1],area.radius)) self.addSpatialSubscription(senderID, area) elif type == 'spatialunsub': + senderID = message.getSenderID() area = message.getArea() logging.info("Matcher::handleMessage => Node <%s> unsubscribing from area <%d,%d,%d>" % (senderID, area.position[0], area.position[1],area.radius)) self.removeSpatialSubscription(senderID, area) elif type == 'spatialpub': + senderID = message.getSenderID() area = message.getArea() payload = message.getPayload() logging.info("Matcher::handleMessage => MatcherNode::handleMessage => Node <%s> publishing message '%s' to area <%d,%d,%d>" % (senderID, payload, area.position[0], area.position[1],area.radius)) self.publishSpatial(senderID, area, payload) - + if (type == 'join'): + self.nodeID = message.getNodeID() + IP = self.networkInterface.getIP() + port = self.networkInterface.getPort() + self.VAST.join(IP, port, self.nodeID) + registeredNodes = message.getRegisteredNodes() + for nodeID in registeredNodes: + (nodeIP, nodePort) = registeredNodes[nodeID] + self.VAST.join(nodeIP, nodePort, nodeID) + logging.info("Matcher::handleMessage => MatcherNode::handleMessage => Received nodeID <%s> from gateway" % self.nodeID) def addChannelSubscription(self, nodeID, channel): if (channel in self.channelSubscriptions): diff --git a/Simulator.py b/Simulator.py index 0c119b1..e7cccbe 100644 --- a/Simulator.py +++ b/Simulator.py @@ -3,7 +3,7 @@ import asyncio from Node import MatcherNode -from Node import VASTNode, SPSNode +from Node import VASTNode, SPSNode, Gateway from VAST import VASTInterface from Message import Message from Connector import * @@ -19,6 +19,7 @@ async def work(node): async def simulate(matcher, nodes, loop): + await asyncio.sleep(2) matcherNodeID = matcher.getNodeID() # Channel based pub/sub @@ -27,54 +28,54 @@ async def simulate(matcher, nodes, loop): logging.info("") # Node '0' subscribes to channel 'test' - message = Message(senderNodeID='0', type='sub', channel='test', payload=None) + message = Message(senderNodeID=matcher.getNodeID(), type='sub', channel='test', payload=None) matcher.send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) # Node '1' subscribes to channel 'test2' - message = Message(senderNodeID='1', type='sub', channel='test', payload=None) + message = Message(senderNodeID=nodes[0].getNodeID(), type='sub', channel='test', payload=None) nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) # Node '1' subscribes to channel 'test2' - message = Message(senderNodeID='2', type='sub', channel='test2', payload=None) + message = Message(senderNodeID=nodes[1].getNodeID(), type='sub', channel='test2', payload=None) nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - message = Message(senderNodeID='2', type='pub', channel='test2', payload='publication message 1') + message = Message(senderNodeID=nodes[1].getNodeID(), type='pub', channel='test2', payload='publication message 1') nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - message = Message(senderNodeID='1', type='pub', channel='test2', payload='publication message 2') + message = Message(senderNodeID=nodes[0].getNodeID(), type='pub', channel='test2', payload='publication message 2') nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - message = Message(senderNodeID='2', type='pub', channel='test', payload='publication message 3') + message = Message(senderNodeID=nodes[1].getNodeID(), type='pub', channel='test', payload='publication message 3') nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) # Node '0' unsubscribes from channel 'test' - message = Message(senderNodeID='0', type='unsub', channel='test') + message = Message(senderNodeID=matcher.getNodeID(), type='unsub', channel='test') matcher.send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - message = Message(senderNodeID='2', type='pub', channel='test', payload='publication message 4') + message = Message(senderNodeID=nodes[1].getNodeID(), type='pub', channel='test', payload='publication message 4') nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) # Node '0' unsubscribes from channel 'test' - message = Message(senderNodeID='1', type='unsub', channel='test') + message = Message(senderNodeID=nodes[0].getNodeID(), type='unsub', channel='test') nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) - message = Message(senderNodeID='2', type='pub', channel='test', payload='publication message 5') + message = Message(senderNodeID=nodes[1].getNodeID(), type='pub', channel='test', payload='publication message 5') nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) # Node '2' subscribes to channel 'test' - message = Message(senderNodeID='2', type='unsub', channel='test2') + message = Message(senderNodeID=nodes[1].getNodeID(), type='unsub', channel='test2') nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -87,7 +88,7 @@ async def simulate(matcher, nodes, loop): # Node '0' subscribes to area (x,y,r)=(0,0,10) pos1 = (0, 0) radius1 = 10 - message = Message(senderNodeID='0', type='spatialsub') + message = Message(senderNodeID=matcher.getNodeID(), type='spatialsub') message.setArea(area=Area(pos1, radius1)) matcher.send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -95,7 +96,7 @@ async def simulate(matcher, nodes, loop): # Node '1' subscribes to area (x,y,r)=(0,5,5) pos2 = (0, 5) radius2 = 5 - message = Message(senderNodeID='1', type='spatialsub') + message = Message(senderNodeID=nodes[0].getNodeID(), type='spatialsub') message.setArea(area=Area(pos2, radius2)) nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -103,7 +104,7 @@ async def simulate(matcher, nodes, loop): # Node '2' subscribes to area (x,y,r)=(0,5,5) pos2 = (0, 5) radius2 = 5 - message = Message(senderNodeID='2', type='spatialsub') + message = Message(senderNodeID=nodes[1].getNodeID(), type='spatialsub') message.setArea(area=Area(pos2, radius2)) nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -111,7 +112,7 @@ async def simulate(matcher, nodes, loop): # Node '1' publishes to area (x,y,r)=(0,5,1) pos2 = (0, 5) radius3 = 1 - message = Message(senderNodeID='1', type='spatialpub') + message = Message(senderNodeID=nodes[0].getNodeID(), type='spatialpub') message.setArea(area=Area(pos2, radius3)) message.setPayload(payload='spatial publication message 1') nodes[0].send(destinationNodeID=matcherNodeID, message=message) @@ -120,7 +121,7 @@ async def simulate(matcher, nodes, loop): # Node '2' unsubscribes from area (x,y,r)=(0,5,5) pos2 = (0, 5) radius2 = 5 - message = Message(senderNodeID='2', type='spatialunsub') + message = Message(senderNodeID=nodes[1].getNodeID(), type='spatialunsub') message.setArea(area=Area(pos2, radius2)) nodes[1].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -128,7 +129,7 @@ async def simulate(matcher, nodes, loop): # Node '0' publishes to area (x,y,r)=(0,5,1) pos2 = (0, 5) radius3 = 1 - message = Message(senderNodeID='0', type='spatialpub') + message = Message(senderNodeID=matcher.getNodeID(), type='spatialpub') message.setArea(area=Area(pos2, radius3)) message.setPayload(payload='spatial publication message 2') matcher.send(destinationNodeID=matcherNodeID, message=message) @@ -137,7 +138,7 @@ async def simulate(matcher, nodes, loop): # Node '0' publishes to area (x,y,r)=(0,-5,1) pos3 = (0, -5) radius3 = 1 - message = Message(senderNodeID='0', type='spatialpub') + message = Message(senderNodeID=matcher.getNodeID(), type='spatialpub') message.setArea(area=Area(pos3, radius3)) message.setPayload(payload='spatial publication message 3') matcher.send(destinationNodeID=matcherNodeID, message=message) @@ -146,7 +147,7 @@ async def simulate(matcher, nodes, loop): # Node '0' publishes to area (x,y,r)=(0,20,1) pos4 = (0, 20) radius3 = 1 - message = Message(senderNodeID='0', type='spatialpub') + message = Message(senderNodeID=matcher.getNodeID(), type='spatialpub') message.setArea(area=Area(pos4, radius3)) message.setPayload(payload='spatial publication message 4') matcher.send(destinationNodeID=matcherNodeID, message=message) @@ -156,7 +157,7 @@ async def simulate(matcher, nodes, loop): # Node '0' unsubscribes from area (x,y,r)=(0,0,10) pos1 = (0, 0) radius1 = 10 - message = Message(senderNodeID='0', type='spatialunsub') + message = Message(senderNodeID=matcher.getNodeID(), type='spatialunsub') message.setArea(area=Area(pos1, radius1)) matcher.send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -164,7 +165,7 @@ async def simulate(matcher, nodes, loop): # Node '1' unsubscribes from area (x,y,r)=(0,5,5) pos2 = (0, 5) radius2 = 5 - message = Message(senderNodeID='1', type='spatialunsub') + message = Message(senderNodeID=nodes[0].getNodeID(), type='spatialunsub') message.setArea(area=Area(pos2, radius2)) nodes[0].send(destinationNodeID=matcherNodeID, message=message) await asyncio.sleep(simulationTick) @@ -274,36 +275,46 @@ def main(): logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") - VAST = VASTInterface() - matcherNodeID = '0' + gatewayIP = '127.0.0.1' + gatewayPort = 10000 + VAST = VASTInterface() RealNetwork = True if (RealNetwork): - matcher = MatcherNode(nodeID=matcherNodeID, networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12000), VASTInterface=VAST) - nodes = [VASTNode(nodeID='1', networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12001), VASTInterface=VAST), - VASTNode(nodeID='2', networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12002), VASTInterface=VAST)] + gateway = Gateway(RealNetworkInterface(senderIP=gatewayIP, senderPort=gatewayPort)) + matcher = MatcherNode(networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12000), VASTInterface=VAST) + nodes = [VASTNode(networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12001), VASTInterface=VAST), + VASTNode(networkInterface=RealNetworkInterface(senderIP='127.0.0.1', senderPort=12002), VASTInterface=VAST)] else: - matcher = MatcherNode(nodeID=matcherNodeID, - networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1000'), + gateway = Gateway(RealNetworkInterface(senderIP=gatewayIP, senderPort=gatewayPort)) + matcher = MatcherNode(networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort=12000), VASTInterface=VAST) - nodes = [VASTNode(nodeID='1', networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1001'), + nodes = [VASTNode(networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort=12001), VASTInterface=VAST), - VASTNode(nodeID='2', networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort='1002'), + VASTNode(networkInterface=FakeNetworkInterface(senderIP='127.0.0.1', senderPort=12002), VASTInterface=VAST)] - - # nodes connect to network infrastructure - matcher.registerID() + # Initialise network interface + gateway.initialiseNetworkInterface() + matcher.initialiseNetworkInterface() for node in nodes: - node.registerID() + node.initialiseNetworkInterface() + # Matcher obtain nodeID from gateway + matcher.registerID(gatewayIP, gatewayPort) + matcherNodeID = matcher.getNodeID() + + # Nodes obtain nodeIDs from gateway + for node in nodes: + node.registerID(gatewayIP, gatewayPort) loop = asyncio.get_event_loop() try: asyncio.ensure_future(work(matcher)) + asyncio.ensure_future(work(gateway)) for node in nodes: asyncio.ensure_future(work(node)) asyncio.ensure_future(simulate(matcher, nodes, loop)) diff --git a/StandaloneMatcherNode.py b/StandaloneMatcherNode.py index ee261b1..6b30365 100644 --- a/StandaloneMatcherNode.py +++ b/StandaloneMatcherNode.py @@ -3,11 +3,8 @@ from Node import MatcherNode -from Node import VASTNode, SPSNode from VAST import VASTInterface -from Message import Message from Connector import * -from Area import Area messageProcessingTick = 0.001 @@ -24,7 +21,7 @@ def main(argv): IP = '' port = '' try: - opts, args = getopt.getopt(argv, "hi:p:c:s:") + opts, args = getopt.getopt(argv, "hi:p:") except getopt.GetoptError: print 'singlenode.py -i -p ' @@ -45,14 +42,18 @@ def main(argv): logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") + gatewayIP = '127.0.0.1' + gatewayPort = 49152 + VAST = VASTInterface() - matcherNodeID = 12000 - VAST.join(matcherNodeID, '127.0.0.1',12000) - VAST.join(12001, '127.0.0.1',12001) - VAST.join(12002, '127.0.0.1',12002) + #matcherNodeID = 0 + #VAST.join('127.0.0.1', 49153, matcherNodeID) + #VAST.join('127.0.0.1', 49154, 1) + #VAST.join('127.0.0.1', 49155, 2) - matcher = MatcherNode(nodeID=matcherNodeID, networkInterface=RealNetworkInterface(senderIP=IP, senderPort=12000), VASTInterface=VAST) - matcher.registerID() + matcher = MatcherNode(networkInterface=RealNetworkInterface(senderIP=IP, senderPort=49153), VASTInterface=VAST) + matcher.initialiseNetworkInterface() + matcher.registerID(gatewayIP, gatewayPort) loop = asyncio.get_event_loop() try: diff --git a/StandaloneNode.py b/StandaloneNode.py index b91b4b2..61beff3 100644 --- a/StandaloneNode.py +++ b/StandaloneNode.py @@ -17,14 +17,24 @@ async def processReceiveMessage(node): node.processSingleMessage() -async def publishMessage(node, channel, destinationID, messageNumber): +async def publishMessage(node, channel, subchannel, destinationID, messageNumber): + subscribed = False while True: - publishTick = random.randint(0,2) + publishTick = random.randint(0, 2) await asyncio.sleep(publishTick) - payload = 'publication message %d' % messageNumber - message = Message(senderNodeID=node.getNodeID(), type='pub', channel=channel, payload=payload) - node.send(destinationNodeID=destinationID, message=message) - messageNumber += 1 + if (node.getNodeID() != None): + logging:info("Have received nodeID <%s>" % node.getNodeID()) + if (subscribed == False): + # Node '1' subscribes to channel 'test2' + message = Message(senderNodeID=node.getNodeID(), type='sub', channel=subchannel, payload=None) + node.send(destinationNodeID=0, message=message) + subscribed = True + else: + + payload = 'publication message %d' % messageNumber + message = Message(senderNodeID=node.getNodeID(), type='pub', channel=channel, payload=payload) + node.send(destinationNodeID=destinationID, message=message) + messageNumber += 1 def main(argv): @@ -59,24 +69,25 @@ def main(argv): logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") + gatewayIP = '127.0.0.1' + gatewayPort = 49152 + VAST = VASTInterface() - matcherNodeID = 12000 - VAST.join(matcherNodeID, '127.0.0.1',12000) - VAST.join(12001, '127.0.0.1',12001) - VAST.join(12002, '127.0.0.1',12002) + matcherNodeID = 0 + #VAST.join('127.0.0.1', 49153, matcherNodeID) + #VAST.join('127.0.0.1', 49154, 1) + #VAST.join('127.0.0.1', 49155, 2) - node = VASTNode(nodeID=port, networkInterface=RealNetworkInterface(senderIP=IP, senderPort=port), VASTInterface=VAST) - node.registerID() - # Node '1' subscribes to channel 'test2' - message = Message(senderNodeID=node.getNodeID(), type='sub', channel=subchannel, payload=None) - node.send(destinationNodeID=matcherNodeID, message=message) + node = VASTNode(networkInterface=RealNetworkInterface(senderIP=IP, senderPort=port), VASTInterface=VAST) + node.initialiseNetworkInterface() + node.registerID(gatewayIP, gatewayPort) loop = asyncio.get_event_loop() try: asyncio.ensure_future(processReceiveMessage(node)) - asyncio.ensure_future(publishMessage(node, pubchannel, matcherNodeID, 0)) + asyncio.ensure_future(publishMessage(node, pubchannel, subchannel, matcherNodeID, 0)) loop.run_forever() except KeyboardInterrupt: pass diff --git a/VAST.py b/VAST.py index 8e9488e..6d041ee 100644 --- a/VAST.py +++ b/VAST.py @@ -1,12 +1,19 @@ +from Connector import NetworkInterface +from Message import JoinMessage + class VASTInterface(object): def __init__(self): self.registeredNodes = {} + def join(self, IP, port, ID): + self.registerNode(IP, port, ID) - def join(self, nodeID, IP, port): + def registerNode(self, IP, port, nodeID): self.registeredNodes[nodeID] = (IP, port) + #print(self.registeredNodes) def getIPPort(self, nodeID): + #print(self.registeredNodes) result = self.registeredNodes[nodeID] return result \ No newline at end of file