Il database si aspetta dei dati così strutturati:
data = {
"time_received": int,
"sensor_group": string,
"sensor_id": int,
"uptime": int,
"temperature": int,
"pressure": int,
"humidity": int,
"ix": int,
"iy": int,
"iz": int,
"mask": int,
}I campi sensor_group, sensor_id e time_received formano una chiave primaria composta, time_received è il timestamp (int) di ricezione del messaggio dal primo elemento della coda, il Producer Kafka che smista i messaggi sui diversi topics.
La conversione tra chiavi nei messaggi avviene nel componente che si occupa di scrivere a database, su Kafka sono presenti soltanto i dati grezzi così come arrivati dal sensore.
Il progetto è diviso in tre package Python, simpss, simpss_persistence e mocks.
Il package simpss contiene la parte di interfaccia tra EMQ e Kafka, nella classe MqttKafkaProducer, il cui utilizzo è dimostrato nel file stress_producer.py.
Il package simpss_persistence gestisce lo storage su Cassandra e definisce le due interfacce per il pub/sub. L'utilizzo della classe CassandraStorage è dimostrato nel file stress_cassandra.py.
Il package mocks contiene il sensore dummy, utilizzabile per i test di carico.
Per poter essere eseguito il software necessita di:
- Docker
- Python 3.7
Sulle macchine che ospitano il Producer ed il Consumer si consiglia di installare Python ed in seguito creare un virtualenv seguendo questi passi:
pip install virtualenv
cd /dest/of/virtual/envirnoment
virtualenv .venv # puts the virtual environment in the folder .venv in the local folder
source .venv/bin/activate
pip install -r requirements.txt # install the requirementsIl consumer ed il producer sono modificabili direttamente nel codice sorgente o configurabili tramite variabili d'ambiente.
Il producer legge le seguenti variabili d'ambiente per la configurazione del client MQTT
MQTT_QOS: QoS per il broker MQTT (default 2)MQTT_CLIENT_ID: client id per il client MQTT (default 'prod1')MQTT_ADDRESS: url del broker MQTT (default 'localhost')MQTT_TOPIC: nome del topic a cui sottoscrivere (defailt simpss)MQTT_MAX_INFLIGHT: massimo numero di messaggi in volo (default 100)
e le seguenti per la configurazione del Producer Kafka
KAFKA_BOOTSTRAP_SERVERS: urls dei server kafka, completi di porta, opzionalmente divisi da virgola (default "localhost:9092")KAFKA_CLIENT_ID: id client per Kafka (default "k-prod-1")KAFKA_TIMEOUS_MS: timeout in millisecondi per la connessione a Kafka (default 6000)KAFKA_MAX_INFLIGHT: massimo numero di messaggi in volo (default 100)KAFKA_LINGER_MS: millisecondi di attesa per creare una batch di messaggi (default 1). Se 0, l'invio avviene sequenzialmente un messaggio alla volta (degrada prestazioni).
Inoltre il Producer necessita un mapping nella forma di un dizionario Python al momento della inizializzazione:
sensor_groups = {
120: 'g1', # id_sensore 120 --> gruppo 'g1'
121: 'g1', # id_sensore 121 --> gruppo 'g1'
122: 'g2', # ...
123: 'g2',
}Questo passo va fatto manualmente modificando il codice che crea il Producer, si veda il file stress_producer.py per un esempio.
Il Consumer legge le seguenti variabili d'ambiente per Kafka
KAFKA_BOOTSTRAP_SERVERS: urls dei server di Kafka, completi di porta, opzionalmente separati da virgola (default "localhost:9092")KAFKA_CONSUMER_GROUP_ID: id del gruppo di consumers a cui il Consumer vuole aggiungersi (default "cg1")
e le seguenti per Cassandra
CASSANDRA_CLUSTER_ADDRESSES: url di un nodo del cluster Cassandra (default localhost)CASSANDRA_KEYSPACE: nome del keyspace da utilizzare (default "simpss")CASSANDRA_REPLICATION: replication factor della tabella dati (default 3)
Inoltre la classe CassandraStorage necessita di un mapping che identifichi le chiavi delle colonne dati --> colonne tabella, come segue:
mapping = {
'sensor_group': 'sensor_group',
'id': 'sensor_id', # 'id' nei dati viene scritto in 'sensor_id' su Cassandra
'time_received': 'time_received',
'uptime': 'uptime',
'T': 'temperature', # 'T' nei dati viene scritto in 'temperature' su Cassandra
'P': 'pressure',
'H': 'humidity',
'Ix': 'ix',
'Iy': 'iy',
'Iz': 'iz',
'M': 'mask',
}Il file stress_cassandra.py presenta un esempio di utilizzo.
Utilizzare questi comandi nella root del progetto:
docker-compose rm -f
docker-compose pull
docker-compose up --build -d
docker-compose down # alla fine, terminato il testPer effettuare lo stress test, utilizzare i seguenti comandi:
docker-compose rm -f
docker-compose pull
docker-compose up --build -d
python stress_sensor.py # in un terminale separato
python stress_producer.py # in un terminale separato
python stress_cassandra.py # in un terminale separato
docker-compose downFar partire un Docker container così:
docker run -it --link nome_container_target:cassandra --rm cassandra:3.11 cqlsh cassandrache nel nostro caso, visto che il nome del container Cassandra è cassandra1 e il network creato da compose è cassandra_simpss-net sarà:
docker run -it --network unimibsimpss_simpss-net --link cassandra1:cassandra --rm cassandra:3.11 cqlsh cassandraI dischi vanno formattati in ext4 per essere utilizzati. Vogliamo usate dev/sda per kafka/zookeeper e montarlo su /mnt/kafka-zookeeper, mentre useremo /dev/sdb montato su /mnt/cassandra per cassandra.
Prima di tutto, formattare i dischi:
# primo disco, generalmente su /dev/sda
sudo parted /dev/sda mklabel gpt
# creo la partizione
sudo parted -a opt /dev/sda mkpart primary ext4 0% 100%
# formatto la partizione in ext4
sudo mkfs.ext4 -L kafka-zookeeper /dev/sda1
# secondo disco, generalmente su /dev/sdb
sudo parted /dev/sdb mklabel gpt
# creo la partizione
sudo parted -a opt /dev/sdb mkpart primary ext4 0% 100%
# formatto la partizione in ext4
sudo mkfs.ext4 -L cassandra /dev/sdb1Poi modificare il file /etc/fstab aggiungendo le seguenti righe:
LABEL=kafka-zookeeper /mnt/kafka-zookeeper ext4 defaults 0 2
LABEL=cassandra /mnt/cassandra ext4 defaults 0 2e poi
sudo mount -ache forza il refresh dei dischi e li monta nelle posizioni specificate.
In questo modo, i dischi verranno montati in automatico a ogni reboot del sistema.
Infine, creare le cartelle che serviranno a kafka, zookeeper e cassandra per scrivere i loro files:
sudo mkdir /mnt/kafka-zookeeper/data-kafka
sudo mkdir /mnt/kafka-zookeeper/data-zookeeper
sudo mkdir /mnt/kafka-zookeeper/logs-zookeeper
sudo mkdir /mnt/cassandra/dataLe cartelle create al punto precedente per Kafka, Zookeeper e Cassandra sono ancora di proprietà di root.
Bisogna quindi creare un nuovo gruppo di utenti, chiamato simpss_group, in cui inserire l'utente user_simpss (quello di login), e assegnare al gruppo le cartelle:
# creazione gruppo utenti
sudo groupadd simpss_group
# aggiungi l'utente al gruppo
sudo usermod -a -G simpss_group user_simpssFatto ciò, fare logout e login da SSH. Poi, cambiare il group owner delle cartelle:
sudo chgrp -R simpss_group /mnt/kafka-zookeeper/data-kafka/
sudo chmod -R g+rwx /mnt/kafka-zookeeper/data-kafka/
sudo chgrp -R simpss_group /mnt/kafka-zookeeper/data-zookeeper/
sudo chmod -R g+rwx /mnt/kafka-zookeeper/data-zookeeper/
sudo chgrp -R simpss_group /mnt/kafka-zookeeper/logs-zookeeper/
sudo chmod -R g+rwx /mnt/kafka-zookeeper/logs-zookeeper/
sudo chgrp -R simpss_group /mnt/cassandra/data/
sudo chmod -R g+rwx /mnt/cassandra/data/Visto che è richiesta la 3.7 e di default c'è la 3.6, utilizzare i seguenti comandi:
sudo add-apt-repository "deb http://archive.ubuntu.com/ubuntu $(lsb_release -sc) universe"
sudo apt-get update
sudo apt-get install python3.7 python3.7-devPoi installare Pipenv con python3.7 -m pip install --user pipenv.
Da terminale, navigare nella cartella del progetto e digitare pipenv --python 3.7 per creare l'ambiente virtuale, poi ancora pipenv install -r requirements.txt e pipenv install -r dev-requirements.txt.
- collegarsi in remoto con
ssh user_simpss@88.149.215.117 -p 2200 - assicurarsi che i dischi per kafka/zookeeper e cassandra siano montati:
ls /mntsi devono trovare i mountpoint/mnt/kafka-zookeepere/mnt/cassandra. Se ciò non è avvenuto, riferirsi agli step precedenti docker-compose rm -fdocker-compose pulle aspettare che scarichi le immaginidocker-compose up --build -d- attivare la shell Pipenv con
pipenv shelle fare lo stress test con:python stress-sensor.pypython stress_producer.pypython stress_cassandra.py- entrare in un container Docker per fare le query a Cassandra con
docker run -it --network unimibsimpss_simpss-net --link cassandra1:cassandra --rm cassandra:3.11 cqlsh cassandra - nella shell che si presenta, digitare
SELECT * from simpss.sensor_data LIMIT 10;e verficare che siano salvati i dati
- una volta finito,
docker-compose down
Per lanciare il sistema bisogna svolgere 4 operazioni: definire le associazioni tra sensori e gruppi, lanciare docker, lanciare il collegamento tra MQTT e Kafka (link_mqtt_kafka.py), e lanciare il collegamento tra Kafka e Cassandra (link_kafka_cassandra.py).
I sensori devono far parte ognuno di un solo gruppo. L'associazione si trova nel file sensor_group.csv, dove la prima colonna contiene l'id del sensore (type: int) e la seconda il nome del gruppo (type: string).
Importante 1: non sono ammessi id sensore duplicati, né righe/colonne mancanti nel file di definizione dell'associazione, né nomi di gruppi contenenti virgole.
Importante 2: il file va modificato prima di lanciare gli script Python, poiché essi lo caricano solo all'inizio dell'esecuzione e mai più. Se si effettuano modifiche al file, fermare e rilanciare gli script come descritto nelle prossime sezioni.
- collegarsi in remoto con
ssh user_simpss@88.149.215.117 -p 2200 - assicurarsi che i dischi per kafka/zookeeper e cassandra siano montati:
ls /mntsi devono trovare i mountpoint/mnt/kafka-zookeepere/mnt/cassandra. Se ciò non è avvenuto, riferirsi agli step precedenti docker-compose rm -fdocker-compose pulle aspettare che scarichi le immaginidocker-compose up --build -d
- collegarsi tramite ssh con
ssh user_simpss@88.149.215.117 -p 2200ed entrare nella cartella del progetto concd unimib-simpss - digitare il comando
screene premere invio alla schermata successiva. Si è ora in un terminale virtuale che continuerà a vivere anche dopo la nostra disconnessione (è quello che vogliamo visto che gli script Python devono girare sempre) - attivare la shell Pipenv con
pipenv shell. Il terminale visualizzerà(unimib-simpss)prima del solito prompt - attivare il link tra Kafka e Cassandra con
python link_kafka_cassandra.py
Il terminale resta quindi bloccato.
Per sbloccarlo, uscire dalla sessione di screen con il comando Ctrl-a d, ci si troverà nel terminale normale ancora collegati in SSH, ma non bloccati.
Per riconnettersi al terminale virtuale che sta eseguendo lo script Python, digitare screen -ls e poi scegliere dalla lista una sessione con la dicitura Detached.
Prendere il suo id (qualcosa di simile a 357.pts-1.data) e digitare screen -r id_sessione. Ci si troverà connessi di nuovo con lo script in esecuzione e i log visualizzati a schermo.
Per fermare lo script, semplicemente usare Ctrl-c.
- collegarsi tramite ssh con
ssh user_simpss@88.149.215.117 -p 2200ed entrare nella cartella del progetto concd unimib-simpss - digitare il comando
screene premere invio alla schermata successiva. Si è ora in un terminale virtuale che continuerà a vivere anche dopo la nostra disconnessione (è quello che vogliamo visto che gli script Python devono girare sempre) - attivare la shell Pipenv con
pipenv shell. Il terminale visualizzerà(unimib-simpss)prima del solito prompt - attivare il link tra MQTT e Kafka con
python link_mqtt_kafka.py
Il terminale resta quindi bloccato.
Per sbloccarlo, uscire dalla sessione di screen con il comando Ctrl-a d, ci si troverà nel terminale normale ancora collegati in SSH, ma non bloccati.
Per riconnettersi al terminale virtuale che sta eseguendo lo script Python, digitare screen -ls e poi scegliere dalla lista una sessione con la dicitura Detached.
Prendere il suo id (qualcosa di simile a 357.pts-1.data) e digitare screen -r id_sessione. Ci si troverà connessi di nuovo con lo script in esecuzione e i log visualizzati a schermo.
Per fermare lo script, semplicemente usare Ctrl-c.
Un buon tutorial su come usare screen si trova a questo link.
Per spegnere il sistema, basta collegarsi agli screen remoti e usare Ctrl-c.
Per uscire dalla shell Pipenv usare Ctrl-d, così come dal terminale virtuale screen.
Per arrestare i Docker containers di MQTT, Kafka e Cassandra, entrare nella cartella del progetto cd unimib-simpss ed eseguire il comando docker-compose down.
In caso di problemi, si possono vedere i log con docker-compose logs.
Per cancellare il database Cassandra e Kafka, fermare il sistema con docker-compose down dalla presente cartella e poi cancellare tutto il contenuto delle cartelle /mnt/kafka-zookeeper/data-kafka, /mnt/kafka-zookeeper/data-zookeeper, /mnt/kafka-zookeeper/logs-zookeeper, /mnt/cassandra/data.
Attenzione: non cancellare le cartelle, solo il loro contenuto. Se non esistono, il sistema non si avvia!
Se la connessione viene rifiutata è colpa del firewall di MGH. In questo caso, prima di connettersi, nel terminale locale eseguire
ssh-keygen -R [88.149.215.117]:2200che rimuove la vecchia chiave (apparentemente cambia ad ogni connessione, non so perché)
Conviene controllare periodicamente lo spazio disponibile nelle partizioni dati di Kafka e Cassandra. Un facile comando è df -h.