Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ jobs:
env:
PYTHON_VERSION: ${{ matrix.python_version }}

- name: Fix PsychXR numpy dependency DLL issues (Windows only)
if: matrix.os == 'windows-latest'
run: |
conda install --force-reinstall numpy

- name: Run eegnb install test
run: |
Expand Down
172 changes: 162 additions & 10 deletions eegnb/devices/eeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
"""

import sys
import time
import logging
from time import sleep
from time import sleep, time
from datetime import datetime
from multiprocessing import Process

import numpy as np
Expand All @@ -26,6 +26,8 @@
EEG_CHANNELS,
)

import socket, json, struct


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -54,6 +56,7 @@
]



class EEG:
device_name: str
stream_started: bool = False
Expand All @@ -67,8 +70,8 @@ def __init__(
other=None,
ip_addr=None,
ch_names=None,
config=None
):
config=None,
make_logfile=False):
"""The initialization function takes the name of the EEG device and determines whether or not
the device belongs to the Muse or Brainflow families and initializes the appropriate backend.

Expand All @@ -86,6 +89,7 @@ def __init__(
self.ip_addr = ip_addr
self.other = other
self.config = config
self.make_logfile = make_logfile # currently only used for kf
self.backend = self._get_backend(self.device_name)
self.initialize_backend()
self.n_channels = len(EEG_INDICES[self.device_name])
Expand All @@ -94,23 +98,30 @@ def __init__(
self.ch_names = ch_names

def initialize_backend(self):
# run this at initialization to get some
# stream metadata into the eeg class
if self.backend == "brainflow":
self._init_brainflow()
self.timestamp_channel = BoardShim.get_timestamp_channel(self.brainflow_id)
elif self.backend == "muselsl":
self._init_muselsl()
self._muse_get_recent() # run this at initialization to get some
# stream metadata into the eeg class
self._muse_get_recent()
elif self.backend == "kernelflow":
self._init_kf()

def _get_backend(self, device_name):
if device_name in brainflow_devices:
return "brainflow"
elif device_name in ["muse2016", "muse2", "museS"]:
return "muselsl"
elif device_name in ["kernelflow"]:
return "kernelflow"


#####################
# MUSE functions #
#####################

def _init_muselsl(self):
# Currently there's nothing we need to do here. However keeping the
# option open to add things with this init method.
Expand Down Expand Up @@ -141,9 +152,9 @@ def _start_muse(self, duration):
self.recording = Process(target=record, args=(duration, self.save_fn))
self.recording.start()

time.sleep(5)
sleep(5)
self.stream_started = True
self.push_sample([99], timestamp=time.time())
self.push_sample([99], timestamp=time())

def _stop_muse(self):
pass
Expand Down Expand Up @@ -188,9 +199,11 @@ def _muse_get_recent(self, n_samples: int = 256, restart_inlet: bool = False):
df = pd.DataFrame(samples, index=timestamps, columns=ch_names)
return df


##########################
# BrainFlow functions #
##########################

def _init_brainflow(self):
"""This function initializes the brainflow backend based on the input device name. It calls
a utility function to determine the appropriate USB port to use based on the current operating system.
Expand Down Expand Up @@ -410,10 +423,143 @@ def _brainflow_get_recent(self, n_samples=256):
# print (df)
return df



###########################
# Kernel Flow functions #
###########################


def _init_kf(self):

self._notes = None #muse_recent_inlet = None

# Grab the init time for tracking
dtstr = str(datetime.now()).replace(' ', '_').split('.')[0].replace(':', '-')

# Initiate connection to trigger-recording port
host = 'localhost' # could be another computer on network with a visible IP address?
port = 6767
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
self.kf_host = host
self.kf_port = port
self.kf_socket = sock

# Triggers history list
self.kf_triggers_history = []
self.kf_triggers_history.append('Initiated connection: %s' %dtstr)

# Optionally maintain a logfile
if self.make_logfile:
self.kf_logfile_fname = 'eegexpy_kf_logfile__%s.txt' % dtstr
self.kf_logfile_handle = open(self.kf_logfile_fname, 'a')
self.kf_logfile_handle.write('KF TRIGGERS LOGFILE %s \n\n' %dtstr)


def _start_kf(self): #:, duration):

kf_start_timestamp = int(time()*1e6)

self.kf_evnum = 0
self.kf_trialnum = 0

# Send first data packet
data_to_send = {"id": self.kf_evnum,
"timestamp": kf_start_timestamp,
"event": "start_experiment",
"value": "0"
}
self._kf_sendeventinfo(data_to_send)

# Update logfile text
self.kf_triggers_history.append({'kf_evnum': '%s' %self.kf_evnum,
'kf_start_timestamp': kf_start_timestamp,
'packet_sent': data_to_send})


def _kf_push_sample(self, timestamp, marker, marker_name):


self.kf_trialnum += 1

# 1/3: Send start trial trigger
self.kf_evnum+=1
kf_trigger_timestamp = int(time()*1e6)
data_to_send = {
"id": self.kf_evnum, #event_id,
"timestamp": kf_trigger_timestamp, # timestamp
"event": 'start_trial', #marker_name, #event_name,
"value": str(self.kf_trialnum), #str(marker_name),
}
self._kf_sendeventinfo(data_to_send)
self.kf_triggers_history.append({'kf_evnum': '%s' %self.kf_evnum,
'kf_trigger_timestamp': kf_trigger_timestamp,
'experiment_timestamp': timestamp,
'packet_sent': data_to_send})
# 2/3: Send trial_type trigger
self.kf_evnum+=1
kf_trigger_timestamp = int(time()*1e6)
data_to_send = {
"id": self.kf_evnum, #event_id,
"timestamp": kf_trigger_timestamp, # timestamp
"event": 'trial_type', #marker_name, #event_name,
"value": str(marker), #str(marker_name),
}
self._kf_sendeventinfo(data_to_send)
self.kf_triggers_history.append({'kf_evnum': '%s' %self.kf_evnum,
'kf_trigger_timestamp': kf_trigger_timestamp,
'experiment_timestamp': timestamp,
'packet_sent': data_to_send})
# 3/3: Send end trial trigger
self.kf_evnum+=1
kf_trigger_timestamp = int(time()*1e6)
data_to_send = {
"id": self.kf_evnum, #event_id,
"timestamp": kf_trigger_timestamp, # timestamp
"event": 'end_trial', #marker_name, #event_name,
"value": str(self.kf_trialnum), #str(marker_name),
}
self._kf_sendeventinfo(data_to_send)
self.kf_triggers_history.append({'kf_evnum': '%s' %self.kf_evnum,
'kf_trigger_timestamp': kf_trigger_timestamp,
'experiment_timestamp': timestamp,
'packet_sent': data_to_send})

def _stop_kf(self):

self.kf_evnum+=1
kf_stop_timestamp = int(time()*1e6)

# Send end experiment trigger
data_to_send = {
"id": self.kf_evnum,
"timestamp": kf_stop_timestamp,
"event": "end_experiment",
"value": "1"
}
self._kf_sendeventinfo(data_to_send)

self.kf_triggers_history.append({'kf_evnum': '%s' % self.kf_evnum,
'kf_stop_timestamp': kf_stop_timestamp,
'packet_sent': data_to_send})

if self.make_logfile:
self.kf_logfile_handle.write(self.kf_triggers_history)
self.kf_logfile_handle.close()


def _kf_sendeventinfo(self, event_info):

event_info_pack = json.dumps(event_info).encode("utf-8")
msg = struct.pack("!I", len(event_info_pack)) + event_info_pack
self.kf_socket.sendall(msg)


#################################
# Highlevel device functions #
#################################

def start(self, fn, duration=None):
"""Starts the EEG device based on the defined backend.

Expand All @@ -428,8 +574,10 @@ def start(self, fn, duration=None):
self.markers = []
elif self.backend == "muselsl":
self._start_muse(duration)
elif self.backend == "kernelflow":
self._start_kf()

def push_sample(self, marker, timestamp):
def push_sample(self, marker, timestamp, marker_name=None):
"""
Universal method for pushing a marker and its timestamp to store alongside the EEG data.

Expand All @@ -441,12 +589,16 @@ def push_sample(self, marker, timestamp):
self._brainflow_push_sample(marker=marker)
elif self.backend == "muselsl":
self._muse_push_sample(marker=marker, timestamp=timestamp)
elif self.backend == "kernelflow":
self._kf_push_sample(marker=marker,timestamp=timestamp, marker_name=marker_name)

def stop(self):
if self.backend == "brainflow":
self._stop_brainflow()
elif self.backend == "muselsl":
pass
elif self.backend == "kernelflow":
self._stop_kf()

def get_recent(self, n_samples: int = 256):
"""
Expand Down
9 changes: 6 additions & 3 deletions eegnb/devices/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"notion2": BoardShim.get_eeg_names(BoardIds.NOTION_2_BOARD.value),
"crown": BoardShim.get_eeg_names(BoardIds.CROWN_BOARD.value),
"freeeeg32": [f"eeg_{i}" for i in range(0, 32)],
"kernelflow": []
}

BRAINFLOW_CHANNELS = {
Expand Down Expand Up @@ -56,7 +57,8 @@
"notion2": BoardShim.get_eeg_channels(BoardIds.NOTION_2_BOARD.value),
"crown": BoardShim.get_eeg_channels(BoardIds.CROWN_BOARD.value),
"freeeeg32": BoardShim.get_eeg_channels(BoardIds.FREEEEG32_BOARD.value),
}
"kernelflow": [],
}

SAMPLE_FREQS = {
"muse2016": 256,
Expand All @@ -78,7 +80,8 @@
"notion2": BoardShim.get_sampling_rate(BoardIds.NOTION_2_BOARD.value),
"crown": BoardShim.get_sampling_rate(BoardIds.CROWN_BOARD.value),
"freeeeg32": BoardShim.get_sampling_rate(BoardIds.FREEEEG32_BOARD.value),
}
"kernelflow": [],
}


def create_stim_array(timestamps, markers):
Expand All @@ -88,7 +91,7 @@ def create_stim_array(timestamps, markers):
timestamps (array of floats): Timestamps from the EEG data.
markers (array of ints): Markers and their associated timestamps.
"""
marker_max = np.max(markers)
# marker_max = np.max(markers)
num_samples = len(timestamps)
stim_array = np.zeros((num_samples, 1))
for marker in markers:
Expand Down
11 changes: 5 additions & 6 deletions eegnb/experiments/Experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@

from abc import abstractmethod
from typing import Callable
from psychopy import prefs
from psychopy.visual.rift import Rift
#change the pref libraty to PTB and set the latency mode to high precision
prefs.hardware['audioLib'] = 'PTB'
prefs.hardware['audioLatencyMode'] = 3

from time import time
import random
Expand All @@ -29,7 +25,7 @@
class BaseExperiment:

def __init__(self, exp_name, duration, eeg, save_fn, n_trials: int, iti: float, soa: float, jitter: float,
use_vr=False, use_fullscr = True):
use_vr=False, use_fullscr = True, screen_num=0):
""" Initializer for the Base Experiment Class

Args:
Expand All @@ -38,6 +34,7 @@ def __init__(self, exp_name, duration, eeg, save_fn, n_trials: int, iti: float,
soa (float): Stimulus on arrival
jitter (float): Random delay between stimulus
use_vr (bool): Use VR for displaying stimulus
screen_num (int): Screen number (if multiple monitors present)
"""

self.exp_name = exp_name
Expand All @@ -51,6 +48,7 @@ def __init__(self, exp_name, duration, eeg, save_fn, n_trials: int, iti: float,
self.soa = soa
self.jitter = jitter
self.use_vr = use_vr
self.screen_num = screen_num
if use_vr:
# VR interface accessible by specific experiment classes for customizing and using controllers.
self.rift: Rift = visual.Rift(monoscopic=True, headLocked=True)
Expand Down Expand Up @@ -91,7 +89,8 @@ def setup(self, instructions=True):
# Setting up Graphics
self.window = (
self.rift if self.use_vr
else visual.Window(self.window_size, monitor="testMonitor", units="deg", fullscr=self.use_fullscr))
else visual.Window(self.window_size, monitor="testMonitor", units="deg",
screen = self.screen_num, fullscr=self.use_fullscr))

# Loading the stimulus from the specific experiment, throws an error if not overwritten in the specific experiment
self.stim = self.load_stimulus()
Expand Down
Loading