diff --git a/openwhisk/db2_insert.py b/openwhisk/db2_insert.py new file mode 100644 index 0000000..6e57100 --- /dev/null +++ b/openwhisk/db2_insert.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python +# author mark_purcell@ie.ibm.com + +import json +from db2 import DB2 + +MAX_INSERT_COUNT = 1000 + +# insert a double value +SQL_TEMPLATE1 = "CALL {database}('{{device_id}}', TIMESTAMP('{{observed}}'), {{value}})" + +# insert a combination double/text value +SQL_TEMPLATE2 = "CALL {database}('{{device_id}}', TIMESTAMP('{{observed}}'), {{value}}, '{{text}}')" + +# insert a text value +SQL_TEMPLATE3 = "CALL {database}('{{device_id}}', TIMESTAMP('{{observed}}'), NULL, '{{text}}')" + + +def gen_statement_insert(database, row): + '''generate sql insert statment + + returns: (statment, true) on success, (error, false) otherwise + ''' + sql_template1 = SQL_TEMPLATE1.format(database=database) + sql_template2 = SQL_TEMPLATE2.format(database=database) + sql_template3 = SQL_TEMPLATE3.format(database=database) + + sql = '' + + try: + if 'device_id' not in row: + return sql + + if 'observed_timestamp' not in row: + return sql + + did = row['device_id'] + ots = row['observed_timestamp'] + + txt, val = None, None + if 'value' in row: + val = row['value'] + if 'text' in row: + txt = row['text'] + + if val is not None: + if type(val) == str: + # string value - ignore any text + sql = sql_template3.format(device_id=did, observed=ots, text=val) + elif txt is not None: + sql = sql_template2.format(device_id=did, observed=ots, value=val, text=txt) + else: + sql = sql_template1.format(device_id=did, observed=ots, value=val) + elif txt is not None: + sql = sql_template3.format(device_id=did, observed=ots, text=txt) + + except Exception as err: + print('error: {}'.format(err)) + return str(err), False + + return sql, True + + +def process(args): + '''insert messages into db2''' + errs = [] + commands = 0 + requested = 0 + + try: + db2 = DB2(args, ['database']) + count = 0 + sql = '' + + for msg in args['messages']: + stmt, err = gen_statement_insert(args['database'], msg) + if not err: + errs.append(err) + continue + + sql += stmt + '; ' + + count += 1 + if count == MAX_INSERT_COUNT: + rsp = db2.insert(sql) + d = rsp.json() + commands += d['commands_count'] + requested += count + count = 0 + sql = '' + + if count != 0: + rsp = db2.insert(sql) + d = rsp.json() + commands += d['commands_count'] + requested += count + + retval = {'status': 'success', + 'requested': requested} + except Exception as err: + retval = {'status': 'failed', + 'requested': requested, + 'msg': str(err)} + print('error: {}'.format(err)) + + if errs: + update = {'status': 'warning', + 'rejected': len(errs), + 'errors': errs} + retval.update(update) + + dbg = args.get('debug', False) + if dbg: + print(retval) + + return retval + + +def main(args): + if 'status' not in args: + return {'status': 'failure', + 'messages': ['no message status found']} + + if args['status'] != 'success': + return {'status': 'failure', + 'messages': ['message with status failure found']} + + if 'messages' not in args: + return {'status': 'failure', + 'messages': ['no messages found']} + + return process(args) diff --git a/openwhisk/install.sh b/openwhisk/install.sh index f9ec79e..362942a 100755 --- a/openwhisk/install.sh +++ b/openwhisk/install.sh @@ -1,6 +1,7 @@ #!/bin/bash #Author: Mark Purcell (markpurcell@ie.ibm.com) + if [[ ! -f my_setup.sh ]]; then echo "Must run provision.sh first " exit 1 @@ -16,15 +17,20 @@ if [[ ! -f $DIR/get_ts-$DBVERSION.zip ]]; then echo "Must run make build first " exit 1 fi -if [[ ! -f $DIR/insert-$DBVERSION.zip ]]; then +if [[ ! -f $DIR/get_devices-$DBVERSION.zip ]]; then echo "Must run make build first " exit 1 fi -if [[ ! -f $DIR/get_devices-$DBVERSION.zip ]]; then +if [[ ! -f $DIR/db2-insert-$DBVERSION.zip ]]; then + echo "Must run make build first " + exit 1 +fi +if [[ ! -f $DIR/mhub-parse-$DBVERSION.zip ]]; then echo "Must run make build first " exit 1 fi +# create/update get timeseries action ACTION=create EXISTS=$(bx wsk action list | grep $WSK_TS) if [ -n "$EXISTS" ]; then @@ -32,19 +38,37 @@ if [ -n "$EXISTS" ]; then fi bx wsk action $ACTION --kind python:3 $DB2_PARAMS --param database $DB2_SCHEMA.GET_TS --param debug true $WSK_TS $DIR/get_ts-$DBVERSION.zip +# create/update get devices action ACTION=create -EXISTS=$(bx wsk action list | grep $WSK_ADD) +EXISTS=$(bx wsk action list | grep $WSK_DEV) if [ -n "$EXISTS" ]; then ACTION=update fi -bx wsk action $ACTION --kind python:3 $DB2_PARAMS --param database $DB2_SCHEMA.ADD_TS --param debug true $WSK_ADD $DIR/insert-$DBVERSION.zip +bx wsk action $ACTION --kind python:3 $DB2_PARAMS --param database $DB2_SCHEMA.GET_DEVICES --param debug true $WSK_DEV $DIR/get_devices-$DBVERSION.zip +# create/update message hub parse action ACTION=create -EXISTS=$(bx wsk action list | grep $WSK_DEV) +EXISTS=$(bx wsk action list | grep $WSK_MHUB) if [ -n "$EXISTS" ]; then ACTION=update fi -bx wsk action $ACTION --kind python:3 $DB2_PARAMS --param database $DB2_SCHEMA.GET_DEVICES --param debug true $WSK_DEV $DIR/get_devices-$DBVERSION.zip +bx wsk action $ACTION --kind python:3 $WSK_MHUB $DIR/mhub-parse-$DBVERSION.zip + +# create/update timeseries insert action +ACTION=create +EXISTS=$(bx wsk action list | grep $WSK_TSINSERT) +if [ -n "$EXISTS" ]; then + ACTION=update +fi +bx wsk action $ACTION --kind python:3 $DB2_PARAMS --param database $DB2_SCHEMA.ADD_TS --param debug true $WSK_TSINSERT $DIR/db2-insert-$DBVERSION.zip + +# create/update insert sequence +ACTION=create +EXISTS=$(bx wsk action list | grep $WSK_INSERT) +if [ -n "$EXISTS" ]; then + ACTION=update +fi +bx wsk action create $WSK_INSERT --sequence $WSK_MHUB,$WSK_TSINSERT bx wsk package refresh @@ -60,5 +84,4 @@ fi increment=$RANDOM bx wsk trigger create $CRISTATA_WSK_TRIGGER$increment --feed $WSK_FEED --param topic $TOPIC_MHUB -bx wsk rule create $CRISTATA_WSK_RULE$increment $CRISTATA_WSK_TRIGGER$increment $WSK_ADD - +bx wsk rule create $CRISTATA_WSK_RULE$increment $CRISTATA_WSK_TRIGGER$increment $WSK_INSERT diff --git a/openwhisk/makefile b/openwhisk/makefile index 9aac8b9..2721bd8 100644 --- a/openwhisk/makefile +++ b/openwhisk/makefile @@ -9,8 +9,10 @@ clean: -@rm -f __main__.py build: - @cp insert.py __main__.py - zip -r insert-$(VERSION).zip __main__.py db2.py + @cp mhub_parse.py __main__.py + zip -r mhub-parse-$(VERSION).zip __main__.py + @cp db2_insert.py __main__.py + zip -r db2-insert-$(VERSION).zip __main__.py db2.py @cp ts.py __main__.py zip -r get_ts-$(VERSION).zip __main__.py db2.py @cp devices.py __main__.py diff --git a/openwhisk/mhub_parse.py b/openwhisk/mhub_parse.py new file mode 100644 index 0000000..1e712f8 --- /dev/null +++ b/openwhisk/mhub_parse.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +#author mark_purcell@ie.ibm.com + +'''parse message fromm message hub''' +import json + + +def process(args): + ''' process data coming from Message Hub + + data fromm Message Hub looks like, + { + 'messages': + [ + { + 'partition': 0, + 'key': '{\"orgId\":\"abcdef\",\"deviceType\":\"10101\",\"deviceId\":\"800800803\",\"eventType\":\"status\",\"format\":\"json\",\"timestamp\":\"2018-08-13T08:54:18.966Z\"}', + 'offset': 2, + 'topic': 'watson-iot', + 'value': '[{\"observed_timestamp\": \"2018-04-22 13:24:23\", \"value\": 52.2, \"device_id\": \"1\"}, {\"observed_timestamp\": \"2018-04-22 13:24:24\", \"value\": 57.2, \"device_id\": \"1\"}, {\"observed_timestamp\": \"2018-04-22 13:24:25\", \"value\": \"something\", \"device_id\": \"1\"}, {\"observed_timestamp\": \"2018-04-22 13:24:26\", \"text\": \"something else\", \"device_id\": \"1\"}, {\"observed_timestamp\": \"2018-04-22 13:24:27\", \"text\": \"more\", \"value\": 444.22, \"device_id\": \"1\"}]' + }, + ... + ] + } + returns, + + { + 'status': 'success', + 'messages': + [ + { + 'observed_timestamp': '2018-04-22 13:24:23', + 'value': 52.2, + 'device_id': '1' + }, + { + 'observed_timestamp': '2018-04-22 13:24:24', + 'value': 57.2, + 'device_id': '1' + }, + { + 'observed_timestamp': '2018-04-22 13:24:25', + 'value': 'something', + 'device_id': '1' + }, + { + 'observed_timestamp': '2018-04-22 13:24:26', + 'text': 'something else', + 'device_id': '1' + }, + { + 'observed_timestamp': '2018-04-22 13:24:27', + 'text': 'more', + 'value': 444.22, + 'device_id': '1' + } + ] + } + ''' + + try: + if 'messages' not in args: + return {'status': 'failure', 'messages': ['no messages found']} + + values = [] + for message in args['messages']: + if 'value' in message: + value = json.loads(message['value']) + # this produces an array of json objects + for v in value: + values.append(v) + + if len(values) == 0: + return {'status': 'failure', 'messages': ['no values found']} + + return {'status': 'success', 'messages': values} + except Execption as e: + print('error: {}'.format(e)) + return {'status': 'failure', 'messages': ['exception thrown']} + + +def main(args): + return process(args) diff --git a/service_names.sh b/service_names.sh index 732f7ba..1e18a3b 100644 --- a/service_names.sh +++ b/service_names.sh @@ -16,8 +16,10 @@ export TOPIC_MHUB=watson-iot export CRISTATA_WSK_TRIGGER=$prefix-mhub- export CRISTATA_WSK_RULE=$prefix-mhub-rule- export WSK_TS=$prefix-TimeSeriesRetrieve -export WSK_ADD=$prefix-TimeSeriesInsert export WSK_DEV=$prefix-DeviceListing +export WSK_MHUB=$prefix-MessageHubParse +export WSK_TSINSERT=$prefix-DB2Insert +export WSK_INSERT=$prefix-TimeSeriesInsert export WSK_FEED='Bluemix_'$CRISTATA_MHUB'_'$CREDENTIAL_MHUB'/messageHubFeed'