Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions learning_observer/learning_observer/blob_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import learning_observer.kvs
import learning_observer.stream_analytics.helpers as sa_helpers

def state_blob():
    '''Placeholder callable whose name supplies the reducer
    portion of KVS keys for blob storage. Never actually called
    as a reducer.
    '''

def _make_key(user_id, source, activity):
    '''Build the internal KVS key identifying the blob stored for a
    given student (`user_id`), event `source`, and `activity`.
    '''
    key_dimensions = {
        sa_helpers.EventField('source'): source,
        sa_helpers.EventField('activity'): activity,
        sa_helpers.KeyField.STUDENT: user_id
    }
    return sa_helpers.make_key(
        state_blob,
        key_dimensions,
        sa_helpers.KeyStateType.INTERNAL
    )

async def fetch_blob(user_id, source, activity):
    '''Look up and return the blob stored for this
    user/source/activity combination.

    NOTE(review): behavior when the key is absent depends on the
    KVS implementation -- per review discussion, a missing value
    serializing to JSON null is acceptable for now.
    '''
    kvs = learning_observer.kvs.KVS()
    return await kvs[_make_key(user_id, source, activity)]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the key is missing from the KVS? We should probably return an explicit empty value (an empty object or None) rather than letting the lookup fail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine for now. null and not None.

$ json.dumps(None)
'null'

In the long term, we should support having a default value. In the long term, I would envision this being part of a module rather than the core. But we've got a long way before we're ready for that.

(Also, note that's different from json.dumps("null") which is '"null"')


async def save_blob(user_id, source, activity, blob):
    '''Persist `blob` in the KVS under the key derived from this
    user/source/activity combination.
    '''
    kvs = learning_observer.kvs.KVS()
    await kvs.set(_make_key(user_id, source, activity), blob)
33 changes: 32 additions & 1 deletion learning_observer/learning_observer/incoming_student_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import learning_observer.auth.events
import learning_observer.adapters.adapter
import learning_observer.blacklist
import learning_observer.blob_storage

import learning_observer.constants as constants

Expand Down Expand Up @@ -427,7 +428,7 @@ async def decode_lock_fields(events):
'''
async for event in events:
if event['event'] == 'lock_fields':
if event['fields'].get('source', '') != lock_fields.get('source', ''):
if 'source' not in event['fields'] or event['fields'].get('source', '') != lock_fields.get('source', ''):
lock_fields.update(event['fields'])
else:
event.update(lock_fields)
Expand All @@ -447,6 +448,35 @@ async def filter_blacklist_events(events):
await ws.send_json(bl_status)
await ws.close()

async def process_blob_storage_events(events):
    '''HACK Handle `save_blob` / `fetch_blob` events for server-side
    blob storage, primarily used by LO Assess. Blob events are
    consumed here (not yielded); every other event passes through
    unchanged. Ideally this would live in an independent module
    rather than being integrated directly into Learning Observer.
    '''
    async for event in events:
        event_type = event['event']
        if event_type not in ('save_blob', 'fetch_blob'):
            # Not a blob event -- hand it on down the pipeline.
            yield event
            continue

        # Extract metadata common to both blob operations.
        user_id = event['auth']['user_id']
        source = event['source']
        activity = event['activity']

        if event_type == 'save_blob':
            await learning_observer.blob_storage.save_blob(
                user_id, source, activity,
                event['blob']
            )
        else:
            # fetch_blob: send the stored blob back over the websocket.
            blob = await learning_observer.blob_storage.fetch_blob(user_id, source, activity)
            await ws.send_json({
                'status': 'fetch_blob',
                'data': blob
            })

async def check_for_reducer_update(events):
'''Check to see if the reducers updated
'''
Expand All @@ -470,6 +500,7 @@ async def process_ws_message_through_pipeline():
events = decode_lock_fields(events)
events = handle_auth_events(events)
events = filter_blacklist_events(events)
events = process_blob_storage_events(events)
events = check_for_reducer_update(events)
events = pass_through_reducers(events)
# empty loop to start the generator pipeline
Expand Down
4 changes: 2 additions & 2 deletions modules/lo_event/lo_event/browserStorage.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const thunkStorage = {
* `storage.sync.get`/`chrome.sync.get` API.
*/
function getWithCallback (getItem) {
function get (items, callback) {
function get (items, callback = () => {}) {
if (typeof items === 'string') {
items = [items];
}
Expand All @@ -77,7 +77,7 @@ function getWithCallback (getItem) {
* `storage.sync.set`/`chrome.sync.set` API.
*/
function setWithCallback (setItem) {
function set (items, callback) {
function set (items, callback = () => {}) {
for (const item in items) {
setItem(item, items[item]);
}
Expand Down
7 changes: 6 additions & 1 deletion modules/lo_event/lo_event/lo_assess/reducers.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ const DEBUG = false;
const dclog = (...args) => {if(DEBUG) {console.log.apply(console, Array.from(args));} };

export const LOAD_DATA_EVENT = 'LOAD_DATA_EVENT';
export const LOAD_STATE = 'LOAD_STATE';
export const NAVIGATE = 'NAVIGATE';
export const SHOW_SECTION='SHOW_SECTION';
export const STEPTHROUGH_NEXT = 'STEPTHROUGH_NEXT';
export const STEPTHROUGH_PREV = 'STEPTHROUGH_PREV';
export const STORE_VARIABLE = 'STORE_VARIABLE';
export const STORE_SETTING = 'STORE_SETTING';
export const UPDATE_INPUT = 'UPDATE_INPUT';
export const UPDATE_LLM_RESPONSE = 'UPDATE_LLM_RESPONSE';
export const VIDEO_TIME_EVENT = 'VIDEO_TIME_EVENT';
Expand Down Expand Up @@ -46,11 +48,14 @@ export const updateResponseReducer = (state = initialState, action) => {

registerReducer(
[LOAD_DATA_EVENT,
LOAD_STATE,
NAVIGATE,
SHOW_SECTION,
STEPTHROUGH_NEXT, STEPTHROUGH_PREV,
STORE_SETTING,
STORE_VARIABLE,
UPDATE_INPUT,
UPDATE_LLM_RESPONSE, VIDEO_TIME_EVENT],
UPDATE_LLM_RESPONSE,
VIDEO_TIME_EVENT],
updateResponseReducer
);
145 changes: 128 additions & 17 deletions modules/lo_event/lo_event/reduxLogger.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,92 @@
*/
import * as redux from 'redux';
import { thunk } from 'redux-thunk';
import { createStateSyncMiddleware, initMessageListener } from 'redux-state-sync';
import debounce from 'lodash/debounce';

import * as util from './util.js';

const EMIT_EVENT = 'EMIT_EVENT';
const EMIT_LOCKFIELDS = 'EMIT_LOCKFIELDS';
const EMIT_SET_STATE = 'SET_STATE';

let IS_LOADED = false;

// TODO: Import debugLog and use those functions.
const DEBUG = false;

function debug_log(...args) {
if(DEBUG) {
function debug_log (...args) {
if (DEBUG) {
console.log(...args);
}
}

/**
* Update the redux logger's state with `data`.
* This is fired when consuming a custom `fetch_blob`
* event.
*/
/**
 * Merge `data` (server-side state delivered by a `fetch_blob`
 * event) into the redux logger's state and mark the store as
 * loaded via `settings.reduxStoreStatus`.
 *
 * NOTE(review): assumes `data`, when present, is a plain object of
 * state slices -- confirm against the `fetch_blob` producer.
 *
 * Fix: the two branches previously duplicated the entire setState
 * object construction; collapsed into one call.
 */
export function handleLoadState (data) {
  IS_LOADED = true;
  const state = store.getState();
  if (!data) {
    debug_log('No data provided while handling state from server, continuing.');
  }
  setState({
    ...state,
    // Spread the server-provided data when we have it; otherwise
    // spreading {} leaves the state untouched.
    ...(data || {}),
    settings: {
      ...state.settings,
      reduxStoreStatus: IS_LOADED
    }
  });
}

/**
 * Persist `state` to window.localStorage, keyed by
 * `state.settings.reduxID` (falling back to 'redux').
 * No-op until the store has been loaded (IS_LOADED).
 *
 * Fix: the catch previously swallowed errors silently; log via
 * debug_log for consistency with saveStateToServer.
 */
async function saveStateToLocalStorage (state) {
  if (!IS_LOADED) {
    debug_log('Not saving store locally because IS_LOADED is set to false.');
    return;
  }

  try {
    const KEY = state?.settings?.reduxID || 'redux';
    const serializedState = JSON.stringify(state);
    localStorage.setItem(KEY, serializedState);
  } catch (e) {
    // Best-effort: storage may be full or unavailable. Surface the
    // error when debugging instead of failing the store subscriber.
    debug_log('Error saving state to localStorage', { e });
  }
}

/**
* Dispatch a `save_blob` event on the redux
* logger.
*/
/**
 * Request server-side persistence of `state` by dispatching a
 * custom `save_blob` event into the event pipeline.
 * No-op until the store has been loaded (IS_LOADED).
 */
async function saveStateToServer (state) {
  if (!IS_LOADED) {
    debug_log('Not saving store on the server because IS_LOADED is set to false.');
    return;
  }

  try {
    util.dispatchCustomEvent('save_blob', { detail: state });
  } catch (e) {
    // Ignore
    debug_log('Error in dispatch', { e });
  }
}

// Action creator function This is a little bit messy, since we
// duplicate type from the payload. It's not clear if this is a good
// idea. We used to have `type` be set to the current contents of
Expand Down Expand Up @@ -65,11 +137,11 @@ const emitSetState = (state) => {
};
};

function store_last_event_reducer(state = {}, action) {
function store_last_event_reducer (state = {}, action) {
return { ...state, event: action.payload };
};

function lock_fields_reducer(state = {}, action) {
function lock_fields_reducer (state = {}, action) {
const payload = JSON.parse(action.payload);
return {
...state,
Expand Down Expand Up @@ -116,7 +188,7 @@ export const updateComponentStateReducer = ({}) => (state = initialState, action
return new_state;
}

function set_state_reducer(state = {}, action) {
function set_state_reducer (state = {}, action) {
return action.payload;
}

Expand All @@ -126,16 +198,16 @@ const BASE_REDUCERS = {
[EMIT_SET_STATE]: [set_state_reducer]
}

const APPLICATION_REDUCERS = {
}
const APPLICATION_REDUCERS = {};

export const registerReducer = (keys, reducer) => {
const reducerKeys = Array.isArray(keys) ? keys : [keys];

reducerKeys.forEach(key => {
if (!APPLICATION_REDUCERS[key])
debug_log('registering key: ' + key);
if (!APPLICATION_REDUCERS[key]) {
APPLICATION_REDUCERS[key] = [];

}
APPLICATION_REDUCERS[key].push(reducer);
});
return reducer;
Expand All @@ -145,11 +217,20 @@ export const registerReducer = (keys, reducer) => {
const reducer = (state = {}, action) => {
let payload;

debug_log("Reducing ", action," on ", state);
debug_log('Reducing ', action, ' on ', state);
state = BASE_REDUCERS[action.redux_type] ? composeReducers(...BASE_REDUCERS[action.redux_type])(state, action) : state;

if (action.redux_type === EMIT_EVENT) {
payload = JSON.parse(action.payload);
if (action.type === 'save_setting') {
return {
...state,
settings: {
...state.settings,
payload
}
};
}
debug_log(Object.keys(payload));

if (APPLICATION_REDUCERS[payload.event]) {
Expand All @@ -160,21 +241,24 @@ const reducer = (state = {}, action) => {
return state;
};


const eventQueue = [];
const composeEnhancers = (typeof window !== 'undefined' && window.__REDUX_DEVTOOLS_EXTENSION_COMPOSE__) || redux.compose;


// This should just be redux.applyMiddleware(thunk))
// There is a bug in our version of redux-thunk where, in node, this must be thunk.default.
//
// This shows up as an error in the test case. If the error goes away, we should switch this
// back to thunk.
// const presistedState = loadState();

export let store = redux.createStore(
reducer,
{event: null}, // Base state
composeEnhancers(redux.applyMiddleware(thunk.default || thunk))
{ event: null }, // Base state
composeEnhancers(redux.applyMiddleware((thunk.default || thunk), createStateSyncMiddleware()))
);

initMessageListener(store);

let promise = null;
let previousEvent = null;
let lockFields = null;
Expand All @@ -200,13 +284,35 @@ function composeReducers(...reducers) {
}

/**
 * Replace the store's state wholesale via an EMIT_SET_STATE
 * action. Passing an empty object resets the state to just the
 * current settings slice, preserving `reduxStoreStatus`.
 */
export function setState (state) {
  debug_log('Set state called');
  const isEmptyState = Object.keys(state).length === 0;
  if (isEmptyState) {
    const currentState = store.getState();
    state = {
      settings: {
        ...currentState.settings,
        reduxStoreStatus: IS_LOADED
      }
    };
  }
  store.dispatch(emitSetState(state));
}

// Debounced persistence: coalesce rapid store updates so we save at
// most once per second, locally and to the server. debounce forwards
// its arguments, so the wrapper lambdas are unnecessary.
const debouncedSaveStateToLocalStorage = debounce(saveStateToLocalStorage, 1000);

const debouncedSaveStateToServer = debounce(saveStateToServer, 1000);

function initializeStore () {
store.subscribe(() => {
const state = store.getState();
// we use debounce to save the state once every second
// for better performances in case multiple changes occur in a short time
debouncedSaveStateToLocalStorage(state);
debouncedSaveStateToServer(state);

if (state.lock_fields) {
lockFields = state.lock_fields.fields;
}
Expand All @@ -233,7 +339,7 @@ function initializeStore () {
});
}

export function reduxLogger (subscribers, initialState = {}) {
export function reduxLogger (subscribers, initialState = null) {
if (subscribers != null) {
eventSubscribers = subscribers;
}
Expand All @@ -254,7 +360,9 @@ export function reduxLogger (subscribers, initialState = {}) {

logEvent.getLockFields = function () { return lockFields; };

setState(initialState);
// do we want to initialize the store here? We set it to the stored state in create store
// if (initialState) {
// }

return logEvent;
}
Expand Down Expand Up @@ -282,3 +390,6 @@ export const awaitEvent = () => {
promise.resolve = resolvePromise;
return promise;
};

// Start listening for fetch
util.consumeCustomEvent('fetch_blob', handleLoadState);
Loading
Loading