Skip to content

Commit

Permalink
🚧 http_ttl: ar_eeg
Browse files Browse the repository at this point in the history
  • Loading branch information
WillForan committed May 10, 2024
1 parent baa22b0 commit b94f8f1
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 3 deletions.
12 changes: 12 additions & 0 deletions AR_EEG.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Rem run webserver to access task
Rem assume have psiclj.exe from native-image in this directory build
Rem will need to replace symlinks with files from resources/public/
Rem out/imgs/ out/audio/ out/style.css
Rem shouldn't need to set password, server is open if running on localhost
set HTTPDBPASS=""
set RUNTOKEN="psiclj_%RANDOM%"
set PYTHON=C:\Program Files\PsychoPy\python.exe

start %RUNTOKEN% cmd /c psiclj
start "" "%PYTHON%" http_ttl.py areeg
start "" http://127.0.0.1:3001/loeffeeg.html
74 changes: 71 additions & 3 deletions http_ttl.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from tornado.httpserver import HTTPServer
from tornado.web import RequestHandler, Application
from tornado.ioloop import IOLoop
from pynput.keyboard import Controller, Key
from pynput.keyboard import Controller, Key, Listener
import os
import os.path

Expand Down Expand Up @@ -154,6 +154,39 @@ async def watch(self):
self.trigger("5 sec trigger")


class KeyboardListener():
""" watch a real keyboard (cf. Cedrus, RTBox)
send ttl if approprate key pushed
left,right,up to ttl 2,3,4 matching cedrus (as of 20240510)
(1 reserved for photodiode)
"""
def __init__(self, hw, verbose=False):
self.hw = hw
self.verbose = verbose
self.keys_ttl = {'left': 2,'right': 3, 'up': 4}
self.keys = self.keys_ttl.keys()

def parse_key(self, key):
"send key if key is in "
try:
k = key.name # 'left', 'right', 'up'
except AttributeError:
k = key.char # single-char keys 'a'

if k in self.keys:
ttl = self.keys_ttl[k]
self.hw.send(ttl)

if self.verbose:
print(f"[{datetime.datetime.now()}] {k} pushed")

async def watch(self):
"watch keypresses in background. forward key on to see if button should be pushed"
listener = Listener(on_press=self.parse_key)
listener.start()
listener.join()


class Cedrus():
""" cedrus response box (RB-x40)
top 3 right buttons are 5, 6, 7 (0-2 left, 3,4 thumb 5-7 right)"""
Expand Down Expand Up @@ -320,14 +353,35 @@ def http_run(this_hardware):


async def loeffeeg(verbose=False):
"""settings for Luna Lab EEG in Loeffler building
uses Cedrus response box (+attached photodiode)
and TTL over LPT (biosemi)
"""
hw = LPT(address=0xD010, verbose=verbose)
kb = KB()
rb = Cedrus(hw, kb)
http_run(hw)
await asyncio.create_task(rb.watch())


async def ar_eeg(verbose=False):
"""
button pushes directly from they keyboard.
TTL over LPT (biosemi)
20240510WF - init for LAF
"""
port = 0xCFE8
hw = LPT(address=port, verbose=verbose)
rb = KeyboardListener(hw, verbose=verbose)
http_run(hw)
await asyncio.create_task(rb.watch())


async def seeg(verbose=False):
"""
sEEG at childrens hospital using USB 1208FS (binary TTL!) attached to grapevine
responses from RTBox with attached photodiode
"""
hw = DAQ(verbose=verbose)
kb = KB()
rb = RTBox(hw, kb, verbose)
Expand All @@ -345,6 +399,14 @@ async def test_DAQ(verbose=False):
print("sending high and zeroing")
hw.send(250) # 250 just has to be non-zero

async def test_keyboard(verbose=True):
"confirm keyboard responses are registered (for ar_eeg)"
hw = Hardware(verbose=verbose)
rb = KeyboardListener(hw, verbose=True)
http_run(hw)
print("push arrow keys. should see events here")
await asyncio.create_task(rb.watch())


async def test_LPT(verbose=False, address=0xD010):
"only test LPT. loop forever: send high and auto reset (loef eeg)"
Expand Down Expand Up @@ -392,7 +454,8 @@ async def fakeeeg(usekeyboard=False, verbose=False):
def parser(args):
import argparse
p = argparse.ArgumentParser(description="Intercept http queries and watch ButtonBox/PhotoDiode")
p.add_argument('place', choices=["loeff", "seeg", "test_http",
p.add_argument('place', choices=["loeff", "seeg", "areeg",
"test_http", "test_keyboard",
"test_rtbox", "test_DAQ",
"test_cedrus", "test_lpt"],
help='where (also how) to use button and ttl')
Expand All @@ -408,14 +471,19 @@ def parser(args):
asyncio.run(loeffeeg(verbose=args.verbose))
elif args.place == "seeg":
asyncio.run(seeg(verbose=args.verbose))
elif args.place == "areeg":
asyncio.run(ar_eeg(verbose=args.verbose))
elif args.place == "seeg":
asyncio.run(seeg(verbose=args.verbose))
elif args.place == "test_http":
asyncio.run(fakeeeg(args.usekeyboard, verbose=args.verbose))

elif args.place == "test_DAQ":
asyncio.run(test_DAQ(verbose=args.verbose))
elif args.place == "test_rtbox":
asyncio.run(rtbox_test(verbose=args.verbose))

elif args.place == "test_keyboard":
asyncio.run(test_keyboard(verbose=args.verbose))
elif args.place == "test_cedrus":
asyncio.run(cedrus_test(verbose=args.verbose))
elif args.place == "test_lpt":
Expand Down
5 changes: 5 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# 20240510 - psychopy cant run on newest python. using pyenv for
# pypy3.10-7.3.15
# git+https://github.com/psychopy/psychopy
tornado
pynput

0 comments on commit b94f8f1

Please sign in to comment.