Skip to content
This repository was archived by the owner on Jul 22, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions openwhisk/db2_insert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#!/usr/bin/env python
# author mark_purcell@ie.ibm.com

import json
from db2 import DB2

MAX_INSERT_COUNT = 1000

# insert a double value
SQL_TEMPLATE1 = "CALL {database}('{{device_id}}', TIMESTAMP('{{observed}}'), {{value}})"

# insert a combination double/text value
SQL_TEMPLATE2 = "CALL {database}('{{device_id}}', TIMESTAMP('{{observed}}'), {{value}}, '{{text}}')"

# insert a text value
SQL_TEMPLATE3 = "CALL {database}('{{device_id}}', TIMESTAMP('{{observed}}'), NULL, '{{text}}')"


def gen_statement_insert(database, row):
'''generate sql insert statment

returns: (statment, true) on success, (error, false) otherwise
'''
sql_template1 = SQL_TEMPLATE1.format(database=database)
sql_template2 = SQL_TEMPLATE2.format(database=database)
sql_template3 = SQL_TEMPLATE3.format(database=database)

sql = ''

try:
if 'device_id' not in row:
return sql

if 'observed_timestamp' not in row:
return sql

did = row['device_id']
ots = row['observed_timestamp']

txt, val = None, None
if 'value' in row:
val = row['value']
if 'text' in row:
txt = row['text']

if val is not None:
if type(val) == str:
# string value - ignore any text
sql = sql_template3.format(device_id=did, observed=ots, text=val)
elif txt is not None:
sql = sql_template2.format(device_id=did, observed=ots, value=val, text=txt)
else:
sql = sql_template1.format(device_id=did, observed=ots, value=val)
elif txt is not None:
sql = sql_template3.format(device_id=did, observed=ots, text=txt)

except Exception as err:
print('error: {}'.format(err))
return str(err), False

return sql, True


def process(args):
'''insert messages into db2'''
errs = []
commands = 0
requested = 0

try:
db2 = DB2(args, ['database'])
count = 0
sql = ''

for msg in args['messages']:
stmt, err = gen_statement_insert(args['database'], msg)
if not err:
errs.append(err)
continue

sql += stmt + '; '

count += 1
if count == MAX_INSERT_COUNT:
rsp = db2.insert(sql)
d = rsp.json()
commands += d['commands_count']
requested += count
count = 0
sql = ''

if count != 0:
rsp = db2.insert(sql)
d = rsp.json()
commands += d['commands_count']
requested += count

retval = {'status': 'success',
'requested': requested}
except Exception as err:
retval = {'status': 'failed',
'requested': requested,
'msg': str(err)}
print('error: {}'.format(err))

if errs:
update = {'status': 'warning',
'rejected': len(errs),
'errors': errs}
retval.update(update)

dbg = args.get('debug', False)
if dbg:
print(retval)

return retval


def main(args):
if 'status' not in args:
return {'status': 'failure',
'messages': ['no message status found']}

if args['status'] != 'success':
return {'status': 'failure',
'messages': ['message with status failure found']}

if 'messages' not in args:
return {'status': 'failure',
'messages': ['no messages found']}

return process(args)
39 changes: 31 additions & 8 deletions openwhisk/install.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/bin/bash
#Author: Mark Purcell (markpurcell@ie.ibm.com)


if [[ ! -f my_setup.sh ]]; then
echo "Must run provision.sh first "
exit 1
Expand All @@ -16,35 +17,58 @@ if [[ ! -f $DIR/get_ts-$DBVERSION.zip ]]; then
echo "Must run make build first "
exit 1
fi
if [[ ! -f $DIR/insert-$DBVERSION.zip ]]; then
if [[ ! -f $DIR/get_devices-$DBVERSION.zip ]]; then
echo "Must run make build first "
exit 1
fi
if [[ ! -f $DIR/get_devices-$DBVERSION.zip ]]; then
if [[ ! -f $DIR/db2-insert-$DBVERSION.zip ]]; then
echo "Must run make build first "
exit 1
fi
if [[ ! -f $DIR/mhub-parse-$DBVERSION.zip ]]; then
echo "Must run make build first "
exit 1
fi

# create/update get timeseries action
ACTION=create
EXISTS=$(bx wsk action list | grep $WSK_TS)
if [ -n "$EXISTS" ]; then
ACTION=update
fi
bx wsk action $ACTION --kind python:3 $DB2_PARAMS --param database $DB2_SCHEMA.GET_TS --param debug true $WSK_TS $DIR/get_ts-$DBVERSION.zip

# create/update get devices action
ACTION=create
EXISTS=$(bx wsk action list | grep $WSK_ADD)
EXISTS=$(bx wsk action list | grep $WSK_DEV)
if [ -n "$EXISTS" ]; then
ACTION=update
fi
bx wsk action $ACTION --kind python:3 $DB2_PARAMS --param database $DB2_SCHEMA.ADD_TS --param debug true $WSK_ADD $DIR/insert-$DBVERSION.zip
bx wsk action $ACTION --kind python:3 $DB2_PARAMS --param database $DB2_SCHEMA.GET_DEVICES --param debug true $WSK_DEV $DIR/get_devices-$DBVERSION.zip

# create/update message hub parse action
ACTION=create
EXISTS=$(bx wsk action list | grep $WSK_DEV)
EXISTS=$(bx wsk action list | grep $WSK_MHUB)
if [ -n "$EXISTS" ]; then
ACTION=update
fi
bx wsk action $ACTION --kind python:3 $DB2_PARAMS --param database $DB2_SCHEMA.GET_DEVICES --param debug true $WSK_DEV $DIR/get_devices-$DBVERSION.zip
bx wsk action $ACTION --kind python:3 $WSK_MHUB $DIR/mhub-parse-$DBVERSION.zip

# create/update timeseries insert action
ACTION=create
EXISTS=$(bx wsk action list | grep $WSK_TSINSERT)
if [ -n "$EXISTS" ]; then
ACTION=update
fi
bx wsk action $ACTION --kind python:3 $DB2_PARAMS --param database $DB2_SCHEMA.ADD_TS --param debug true $WSK_TSINSERT $DIR/db2-insert-$DBVERSION.zip

# create/update insert sequence
ACTION=create
EXISTS=$(bx wsk action list | grep $WSK_INSERT)
if [ -n "$EXISTS" ]; then
ACTION=update
fi
bx wsk action create $WSK_INSERT --sequence $WSK_MHUB,$WSK_TSINSERT

bx wsk package refresh

Expand All @@ -60,5 +84,4 @@ fi

increment=$RANDOM
bx wsk trigger create $CRISTATA_WSK_TRIGGER$increment --feed $WSK_FEED --param topic $TOPIC_MHUB
bx wsk rule create $CRISTATA_WSK_RULE$increment $CRISTATA_WSK_TRIGGER$increment $WSK_ADD

bx wsk rule create $CRISTATA_WSK_RULE$increment $CRISTATA_WSK_TRIGGER$increment $WSK_INSERT
6 changes: 4 additions & 2 deletions openwhisk/makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ clean:
-@rm -f __main__.py

build:
@cp insert.py __main__.py
zip -r insert-$(VERSION).zip __main__.py db2.py
@cp mhub_parse.py __main__.py
zip -r mhub-parse-$(VERSION).zip __main__.py
@cp db2_insert.py __main__.py
zip -r db2-insert-$(VERSION).zip __main__.py db2.py
@cp ts.py __main__.py
zip -r get_ts-$(VERSION).zip __main__.py db2.py
@cp devices.py __main__.py
Expand Down
83 changes: 83 additions & 0 deletions openwhisk/mhub_parse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python
#author mark_purcell@ie.ibm.com

'''parse message fromm message hub'''
import json


def process(args):
''' process data coming from Message Hub

data fromm Message Hub looks like,
{
'messages':
[
{
'partition': 0,
'key': '{\"orgId\":\"abcdef\",\"deviceType\":\"10101\",\"deviceId\":\"800800803\",\"eventType\":\"status\",\"format\":\"json\",\"timestamp\":\"2018-08-13T08:54:18.966Z\"}',
'offset': 2,
'topic': 'watson-iot',
'value': '[{\"observed_timestamp\": \"2018-04-22 13:24:23\", \"value\": 52.2, \"device_id\": \"1\"}, {\"observed_timestamp\": \"2018-04-22 13:24:24\", \"value\": 57.2, \"device_id\": \"1\"}, {\"observed_timestamp\": \"2018-04-22 13:24:25\", \"value\": \"something\", \"device_id\": \"1\"}, {\"observed_timestamp\": \"2018-04-22 13:24:26\", \"text\": \"something else\", \"device_id\": \"1\"}, {\"observed_timestamp\": \"2018-04-22 13:24:27\", \"text\": \"more\", \"value\": 444.22, \"device_id\": \"1\"}]'
},
...
]
}
returns,

{
'status': 'success',
'messages':
[
{
'observed_timestamp': '2018-04-22 13:24:23',
'value': 52.2,
'device_id': '1'
},
{
'observed_timestamp': '2018-04-22 13:24:24',
'value': 57.2,
'device_id': '1'
},
{
'observed_timestamp': '2018-04-22 13:24:25',
'value': 'something',
'device_id': '1'
},
{
'observed_timestamp': '2018-04-22 13:24:26',
'text': 'something else',
'device_id': '1'
},
{
'observed_timestamp': '2018-04-22 13:24:27',
'text': 'more',
'value': 444.22,
'device_id': '1'
}
]
}
'''

try:
if 'messages' not in args:
return {'status': 'failure', 'messages': ['no messages found']}

values = []
for message in args['messages']:
if 'value' in message:
value = json.loads(message['value'])
# this produces an array of json objects
for v in value:
values.append(v)

if len(values) == 0:
return {'status': 'failure', 'messages': ['no values found']}

return {'status': 'success', 'messages': values}
except Execption as e:
print('error: {}'.format(e))
return {'status': 'failure', 'messages': ['exception thrown']}


def main(args):
return process(args)
4 changes: 3 additions & 1 deletion service_names.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ export TOPIC_MHUB=watson-iot
export CRISTATA_WSK_TRIGGER=$prefix-mhub-
export CRISTATA_WSK_RULE=$prefix-mhub-rule-
export WSK_TS=$prefix-TimeSeriesRetrieve
export WSK_ADD=$prefix-TimeSeriesInsert
export WSK_DEV=$prefix-DeviceListing
export WSK_MHUB=$prefix-MessageHubParse
export WSK_TSINSERT=$prefix-DB2Insert
export WSK_INSERT=$prefix-TimeSeriesInsert

export WSK_FEED='Bluemix_'$CRISTATA_MHUB'_'$CREDENTIAL_MHUB'/messageHubFeed'