subsequence.event_emitter
1import asyncio 2import inspect 3import logging 4import typing 5 6 7logger = logging.getLogger(__name__) 8 9CallbackType = typing.Callable[..., typing.Any] 10 11 12class EventEmitter: 13 14 """ 15 A simple event emitter supporting sync and async callbacks. 16 """ 17 18 def __init__ (self) -> None: 19 20 """ 21 Initialize an empty event registry. 22 """ 23 24 self._listeners: typing.Dict[str, typing.List[CallbackType]] = {} 25 26 27 def on (self, event_name: str, callback: CallbackType) -> None: 28 29 """ 30 Register a callback for an event name. 31 """ 32 33 if event_name not in self._listeners: 34 self._listeners[event_name] = [] 35 36 self._listeners[event_name].append(callback) 37 38 def off (self, event_name: str, callback: CallbackType) -> None: 39 40 """ 41 Unregister a previously registered callback. 42 43 Raises ``ValueError`` if the callback is not registered for the event. 44 """ 45 46 if event_name not in self._listeners or callback not in self._listeners[event_name]: 47 raise ValueError(f"Callback not registered for event {event_name!r}") 48 49 self._listeners[event_name].remove(callback) 50 51 52 def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 53 54 """ 55 Emit an event and call non-async listeners immediately. 56 """ 57 58 if event_name not in self._listeners: 59 return 60 61 for callback in self._listeners[event_name]: 62 63 if inspect.iscoroutinefunction(callback): 64 raise ValueError("Async callback encountered in emit_sync") 65 66 result = callback(*args, **kwargs) 67 68 # Catch async-callable objects too (async __call__ fails the 69 # iscoroutinefunction check but still returns an awaitable). 70 if inspect.isawaitable(result): 71 typing.cast(typing.Coroutine, result).close() 72 raise ValueError("Async callback encountered in emit_sync") 73 74 75 async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 76 77 """ 78 Emit an event, awaiting async listeners. 79 80 One raising listener never silences the others: sync exceptions are 81 logged and the remaining listeners still run, and async listeners are 82 gathered with their exceptions logged individually. 83 """ 84 85 if event_name not in self._listeners: 86 return 87 88 tasks: typing.List[typing.Awaitable[typing.Any]] = [] 89 90 for callback in self._listeners[event_name]: 91 92 # Calling first and checking the RESULT handles both coroutine 93 # functions and async-callable objects (async __call__), which 94 # iscoroutinefunction misses. 95 try: 96 result = callback(*args, **kwargs) 97 except Exception: 98 logger.exception("Listener for %r raised - continuing with remaining listeners", event_name) 99 continue 100 101 if inspect.isawaitable(result): 102 tasks.append(result) 103 104 if tasks: 105 results = await asyncio.gather(*tasks, return_exceptions=True) 106 107 for outcome in results: 108 if isinstance(outcome, BaseException): 109 logger.error("Async listener for %r raised", event_name, exc_info=outcome)
logger =
<Logger subsequence.event_emitter (WARNING)>
CallbackType =
typing.Callable[..., typing.Any]
class
EventEmitter:
13class EventEmitter: 14 15 """ 16 A simple event emitter supporting sync and async callbacks. 17 """ 18 19 def __init__ (self) -> None: 20 21 """ 22 Initialize an empty event registry. 23 """ 24 25 self._listeners: typing.Dict[str, typing.List[CallbackType]] = {} 26 27 28 def on (self, event_name: str, callback: CallbackType) -> None: 29 30 """ 31 Register a callback for an event name. 32 """ 33 34 if event_name not in self._listeners: 35 self._listeners[event_name] = [] 36 37 self._listeners[event_name].append(callback) 38 39 def off (self, event_name: str, callback: CallbackType) -> None: 40 41 """ 42 Unregister a previously registered callback. 43 44 Raises ``ValueError`` if the callback is not registered for the event. 45 """ 46 47 if event_name not in self._listeners or callback not in self._listeners[event_name]: 48 raise ValueError(f"Callback not registered for event {event_name!r}") 49 50 self._listeners[event_name].remove(callback) 51 52 53 def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 54 55 """ 56 Emit an event and call non-async listeners immediately. 57 """ 58 59 if event_name not in self._listeners: 60 return 61 62 for callback in self._listeners[event_name]: 63 64 if inspect.iscoroutinefunction(callback): 65 raise ValueError("Async callback encountered in emit_sync") 66 67 result = callback(*args, **kwargs) 68 69 # Catch async-callable objects too (async __call__ fails the 70 # iscoroutinefunction check but still returns an awaitable). 71 if inspect.isawaitable(result): 72 typing.cast(typing.Coroutine, result).close() 73 raise ValueError("Async callback encountered in emit_sync") 74 75 76 async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 77 78 """ 79 Emit an event, awaiting async listeners. 80 81 One raising listener never silences the others: sync exceptions are 82 logged and the remaining listeners still run, and async listeners are 83 gathered with their exceptions logged individually. 84 """ 85 86 if event_name not in self._listeners: 87 return 88 89 tasks: typing.List[typing.Awaitable[typing.Any]] = [] 90 91 for callback in self._listeners[event_name]: 92 93 # Calling first and checking the RESULT handles both coroutine 94 # functions and async-callable objects (async __call__), which 95 # iscoroutinefunction misses. 96 try: 97 result = callback(*args, **kwargs) 98 except Exception: 99 logger.exception("Listener for %r raised - continuing with remaining listeners", event_name) 100 continue 101 102 if inspect.isawaitable(result): 103 tasks.append(result) 104 105 if tasks: 106 results = await asyncio.gather(*tasks, return_exceptions=True) 107 108 for outcome in results: 109 if isinstance(outcome, BaseException): 110 logger.error("Async listener for %r raised", event_name, exc_info=outcome)
A simple event emitter supporting sync and async callbacks.
EventEmitter()
19 def __init__ (self) -> None: 20 21 """ 22 Initialize an empty event registry. 23 """ 24 25 self._listeners: typing.Dict[str, typing.List[CallbackType]] = {}
Initialize an empty event registry.
def
on(self, event_name: str, callback: Callable[..., Any]) -> None:
28 def on (self, event_name: str, callback: CallbackType) -> None: 29 30 """ 31 Register a callback for an event name. 32 """ 33 34 if event_name not in self._listeners: 35 self._listeners[event_name] = [] 36 37 self._listeners[event_name].append(callback)
Register a callback for an event name.
def
off(self, event_name: str, callback: Callable[..., Any]) -> None:
39 def off (self, event_name: str, callback: CallbackType) -> None: 40 41 """ 42 Unregister a previously registered callback. 43 44 Raises ``ValueError`` if the callback is not registered for the event. 45 """ 46 47 if event_name not in self._listeners or callback not in self._listeners[event_name]: 48 raise ValueError(f"Callback not registered for event {event_name!r}") 49 50 self._listeners[event_name].remove(callback)
Unregister a previously registered callback.
Raises ValueError if the callback is not registered for the event.
def
emit_sync(self, event_name: str, *args: Any, **kwargs: Any) -> None:
53 def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 54 55 """ 56 Emit an event and call non-async listeners immediately. 57 """ 58 59 if event_name not in self._listeners: 60 return 61 62 for callback in self._listeners[event_name]: 63 64 if inspect.iscoroutinefunction(callback): 65 raise ValueError("Async callback encountered in emit_sync") 66 67 result = callback(*args, **kwargs) 68 69 # Catch async-callable objects too (async __call__ fails the 70 # iscoroutinefunction check but still returns an awaitable). 71 if inspect.isawaitable(result): 72 typing.cast(typing.Coroutine, result).close() 73 raise ValueError("Async callback encountered in emit_sync")
Emit an event and call non-async listeners immediately.
async def
emit_async(self, event_name: str, *args: Any, **kwargs: Any) -> None:
76 async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 77 78 """ 79 Emit an event, awaiting async listeners. 80 81 One raising listener never silences the others: sync exceptions are 82 logged and the remaining listeners still run, and async listeners are 83 gathered with their exceptions logged individually. 84 """ 85 86 if event_name not in self._listeners: 87 return 88 89 tasks: typing.List[typing.Awaitable[typing.Any]] = [] 90 91 for callback in self._listeners[event_name]: 92 93 # Calling first and checking the RESULT handles both coroutine 94 # functions and async-callable objects (async __call__), which 95 # iscoroutinefunction misses. 96 try: 97 result = callback(*args, **kwargs) 98 except Exception: 99 logger.exception("Listener for %r raised - continuing with remaining listeners", event_name) 100 continue 101 102 if inspect.isawaitable(result): 103 tasks.append(result) 104 105 if tasks: 106 results = await asyncio.gather(*tasks, return_exceptions=True) 107 108 for outcome in results: 109 if isinstance(outcome, BaseException): 110 logger.error("Async listener for %r raised", event_name, exc_info=outcome)
Emit an event, awaiting async listeners.
One raising listener never silences the others: sync exceptions are logged and the remaining listeners still run, and async listeners are gathered with their exceptions logged individually.