diff --git a/AR_EEG.bat b/AR_EEG.bat new file mode 100644 index 0000000..c8ca6cb --- /dev/null +++ b/AR_EEG.bat @@ -0,0 +1,12 @@ +Rem run webserver to access task +Rem assume have psiclj.exe from native-image in this directory build +Rem will need to replace symlinks with files from resources/public/ +Rem out/imgs/ out/audio/ out/style.css +Rem shouldn't need to set password, server is open if running on localhost +set HTTPDBPASS="" +set RUNTOKEN="psiclj_%RANDOM%" +set PYTHON=C:\Program Files\PsychoPy\python.exe + +start %RUNTOKEN% cmd /c psiclj +start "" "%PYTHON%" http_ttl.py areeg +start "" http://127.0.0.1:3001/loeffeeg.html diff --git a/http_ttl.py b/http_ttl.py index 1238421..502d836 100755 --- a/http_ttl.py +++ b/http_ttl.py @@ -30,7 +30,7 @@ from tornado.httpserver import HTTPServer from tornado.web import RequestHandler, Application from tornado.ioloop import IOLoop -from pynput.keyboard import Controller, Key +from pynput.keyboard import Controller, Key, Listener import os import os.path @@ -154,6 +154,39 @@ async def watch(self): self.trigger("5 sec trigger") +class KeyboardListener(): + """ watch a real keyboard (cf. Cedrus, RTBox) + send ttl if approprate key pushed + left,right,up to ttl 2,3,4 matching cedrus (as of 20240510) + (1 reserved for photodiode) + """ + def __init__(self, hw, verbose=False): + self.hw = hw + self.verbose = verbose + self.keys_ttl = {'left': 2,'right': 3, 'up': 4} + self.keys = self.keys_ttl.keys() + + def parse_key(self, key): + "send key if key is in " + try: + k = key.name # 'left', 'right', 'up' + except AttributeError: + k = key.char # single-char keys 'a' + + if k in self.keys: + ttl = self.keys_ttl[k] + self.hw.send(ttl) + + if self.verbose: + print(f"[{datetime.datetime.now()}] {k} pushed") + + async def watch(self): + "watch keypresses in background. forward key on to see if button should be pushed" + listener = Listener(on_press=self.parse_key) + listener.start() + listener.join() + + class Cedrus(): """ cedrus response box (RB-x40) top 3 right buttons are 5, 6, 7 (0-2 left, 3,4 thumb 5-7 right)""" @@ -320,6 +353,10 @@ def http_run(this_hardware): async def loeffeeg(verbose=False): + """settings for Luna Lab EEG in Loeffler building + uses Cedrus response box (+attached photodiode) + and TTL over LPT (biosemi) + """ hw = LPT(address=0xD010, verbose=verbose) kb = KB() rb = Cedrus(hw, kb) @@ -327,7 +364,24 @@ async def loeffeeg(verbose=False): await asyncio.create_task(rb.watch()) +async def ar_eeg(verbose=False): + """ + button pushes directly from they keyboard. + TTL over LPT (biosemi) + 20240510WF - init for LAF + """ + port = 0xCFE8 + hw = LPT(address=port, verbose=verbose) + rb = KeyboardListener(hw, verbose=verbose) + http_run(hw) + await asyncio.create_task(rb.watch()) + + async def seeg(verbose=False): + """ + sEEG at childrens hospital using USB 1208FS (binary TTL!) attached to grapevine + responses from RTBox with attached photodiode + """ hw = DAQ(verbose=verbose) kb = KB() rb = RTBox(hw, kb, verbose) @@ -345,6 +399,14 @@ async def test_DAQ(verbose=False): print("sending high and zeroing") hw.send(250) # 250 just has to be non-zero +async def test_keyboard(verbose=True): + "confirm keyboard responses are registered (for ar_eeg)" + hw = Hardware(verbose=verbose) + rb = KeyboardListener(hw, verbose=True) + http_run(hw) + print("push arrow keys. should see events here") + await asyncio.create_task(rb.watch()) + async def test_LPT(verbose=False, address=0xD010): "only test LPT. loop forever: send high and auto reset (loef eeg)" @@ -392,7 +454,8 @@ async def fakeeeg(usekeyboard=False, verbose=False): def parser(args): import argparse p = argparse.ArgumentParser(description="Intercept http queries and watch ButtonBox/PhotoDiode") - p.add_argument('place', choices=["loeff", "seeg", "test_http", + p.add_argument('place', choices=["loeff", "seeg", "areeg", + "test_http", "test_keyboard", "test_rtbox", "test_DAQ", "test_cedrus", "test_lpt"], help='where (also how) to use button and ttl') @@ -408,6 +471,10 @@ def parser(args): asyncio.run(loeffeeg(verbose=args.verbose)) elif args.place == "seeg": asyncio.run(seeg(verbose=args.verbose)) + elif args.place == "areeg": + asyncio.run(ar_eeg(verbose=args.verbose)) + elif args.place == "seeg": + asyncio.run(seeg(verbose=args.verbose)) elif args.place == "test_http": asyncio.run(fakeeeg(args.usekeyboard, verbose=args.verbose)) @@ -415,7 +482,8 @@ def parser(args): asyncio.run(test_DAQ(verbose=args.verbose)) elif args.place == "test_rtbox": asyncio.run(rtbox_test(verbose=args.verbose)) - + elif args.place == "test_keyboard": + asyncio.run(test_keyboard(verbose=args.verbose)) elif args.place == "test_cedrus": asyncio.run(cedrus_test(verbose=args.verbose)) elif args.place == "test_lpt": diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..014886a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +# 20240510 - psychopy cant run on newest python. using pyenv for +# pypy3.10-7.3.15 +# git+https://github.com/psychopy/psychopy +tornado +pynput