-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_handler.py
86 lines (68 loc) · 2.85 KB
/
test_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python
"""Test handler to showcase operational `shm_open(2)` access.
Uses multiprocessing.Queue and multiprocessing.Semaphore to prove functionality.
"""
import functools
import operator
import os.path
import random
import logging
from contextlib import nullcontext, closing
from multiprocessing import get_context, Queue, Semaphore
from typing import Optional, Any, Dict, List
module_name: str = f"{os.path.splitext(__file__)[0]}" if __name__ == "__main__" else __name__
logger: logging.Logger = logging.getLogger(module_name)
# process globals:
_fork_queue: Optional[Queue] = None
_fork_semaphore: Optional[Semaphore] = None
def handler(event: Dict[str, str], context: Any) -> Dict[str, List[str]]:
global _fork_semaphore, _fork_queue
logger = logging.getLogger(f"{module_name}.handler")
event.setdefault("limit", 23)
limit = int(event["limit"])
args = range(limit)
context = get_context("fork")
_fork_semaphore = context.Semaphore(4)
_fork_queue = context.Queue()
with context.Pool() as p, closing(_fork_queue) as queue:
result = p.map(slow_call, args)
logger.info(f"Single map returned {args!r} -> {result!r}")
result = p.map(functools.partial(slow_call, lock=True, queue=True), args)
with _fork_semaphore:
result_q = []
for _ in range(limit):
result_q.append(queue.get())
logger.info(f"Queue returned {result_q!r}")
if frozenset(result_q) != frozenset(result):
raise ValueError("This isn't right! results should be identical!")
return {"by_return": list(result), "by_queue": list(result_q)}
def slow_call(value: int, queue: Optional[Queue] = None, lock: Optional[Semaphore] = None) -> int:
logger = logging.getLogger(f"{module_name}.slow_call")
if lock is True:
lock = _fork_semaphore
if queue is True:
queue = _fork_queue
logger.debug(f"Queue is {queue!r}, lock is {lock!r}")
with lock or nullcontext():
op = random.choice([operator.pow, operator.add])
# time.sleep(random.randrange(0, 1))
val = op(value, abs(value - 2) or 1)
if queue is not None:
queue.put(val)
return val
if __name__ == "__main__":
import argparse
def _setup_logging(debug: bool = False):
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
if debug:
handler.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter("%(name)s: %(message)s"))
logger.addHandler(handler)
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("-d", "--debug", action="store_true", default=False)
parser.add_argument("limit", default=23, nargs="?")
args = parser.parse_args()
_setup_logging(args.debug)
handler({"limit": args.limit}, None)