subsequence.event_emitter
1import asyncio 2import typing 3 4 5CallbackType = typing.Callable[..., typing.Any] 6 7 8class EventEmitter: 9 10 """ 11 A simple event emitter supporting sync and async callbacks. 12 """ 13 14 def __init__ (self) -> None: 15 16 """ 17 Initialize an empty event registry. 18 """ 19 20 self._listeners: typing.Dict[str, typing.List[CallbackType]] = {} 21 22 23 def on (self, event_name: str, callback: CallbackType) -> None: 24 25 """ 26 Register a callback for an event name. 27 """ 28 29 if event_name not in self._listeners: 30 self._listeners[event_name] = [] 31 32 self._listeners[event_name].append(callback) 33 34 def off (self, event_name: str, callback: CallbackType) -> None: 35 36 """ 37 Unregister a previously registered callback. 38 39 Raises ``ValueError`` if the callback is not registered for the event. 40 """ 41 42 if event_name not in self._listeners or callback not in self._listeners[event_name]: 43 raise ValueError(f"Callback not registered for event {event_name!r}") 44 45 self._listeners[event_name].remove(callback) 46 47 48 def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 49 50 """ 51 Emit an event and call non-async listeners immediately. 52 """ 53 54 if event_name not in self._listeners: 55 return 56 57 for callback in self._listeners[event_name]: 58 59 if asyncio.iscoroutinefunction(callback): 60 raise ValueError("Async callback encountered in emit_sync") 61 62 callback(*args, **kwargs) 63 64 65 async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 66 67 """ 68 Emit an event and await async listeners. 69 """ 70 71 if event_name not in self._listeners: 72 return 73 74 tasks: typing.List[typing.Awaitable[typing.Any]] = [] 75 76 for callback in self._listeners[event_name]: 77 78 if asyncio.iscoroutinefunction(callback): 79 tasks.append(callback(*args, **kwargs)) 80 81 else: 82 callback(*args, **kwargs) 83 84 if tasks: 85 await asyncio.gather(*tasks)
CallbackType =
typing.Callable[..., typing.Any]
class
EventEmitter:
9class EventEmitter: 10 11 """ 12 A simple event emitter supporting sync and async callbacks. 13 """ 14 15 def __init__ (self) -> None: 16 17 """ 18 Initialize an empty event registry. 19 """ 20 21 self._listeners: typing.Dict[str, typing.List[CallbackType]] = {} 22 23 24 def on (self, event_name: str, callback: CallbackType) -> None: 25 26 """ 27 Register a callback for an event name. 28 """ 29 30 if event_name not in self._listeners: 31 self._listeners[event_name] = [] 32 33 self._listeners[event_name].append(callback) 34 35 def off (self, event_name: str, callback: CallbackType) -> None: 36 37 """ 38 Unregister a previously registered callback. 39 40 Raises ``ValueError`` if the callback is not registered for the event. 41 """ 42 43 if event_name not in self._listeners or callback not in self._listeners[event_name]: 44 raise ValueError(f"Callback not registered for event {event_name!r}") 45 46 self._listeners[event_name].remove(callback) 47 48 49 def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 50 51 """ 52 Emit an event and call non-async listeners immediately. 53 """ 54 55 if event_name not in self._listeners: 56 return 57 58 for callback in self._listeners[event_name]: 59 60 if asyncio.iscoroutinefunction(callback): 61 raise ValueError("Async callback encountered in emit_sync") 62 63 callback(*args, **kwargs) 64 65 66 async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 67 68 """ 69 Emit an event and await async listeners. 70 """ 71 72 if event_name not in self._listeners: 73 return 74 75 tasks: typing.List[typing.Awaitable[typing.Any]] = [] 76 77 for callback in self._listeners[event_name]: 78 79 if asyncio.iscoroutinefunction(callback): 80 tasks.append(callback(*args, **kwargs)) 81 82 else: 83 callback(*args, **kwargs) 84 85 if tasks: 86 await asyncio.gather(*tasks)
A simple event emitter supporting sync and async callbacks.
EventEmitter()
15 def __init__ (self) -> None: 16 17 """ 18 Initialize an empty event registry. 19 """ 20 21 self._listeners: typing.Dict[str, typing.List[CallbackType]] = {}
Initialize an empty event registry.
def
on(self, event_name: str, callback: Callable[..., Any]) -> None:
24 def on (self, event_name: str, callback: CallbackType) -> None: 25 26 """ 27 Register a callback for an event name. 28 """ 29 30 if event_name not in self._listeners: 31 self._listeners[event_name] = [] 32 33 self._listeners[event_name].append(callback)
Register a callback for an event name.
def
off(self, event_name: str, callback: Callable[..., Any]) -> None:
35 def off (self, event_name: str, callback: CallbackType) -> None: 36 37 """ 38 Unregister a previously registered callback. 39 40 Raises ``ValueError`` if the callback is not registered for the event. 41 """ 42 43 if event_name not in self._listeners or callback not in self._listeners[event_name]: 44 raise ValueError(f"Callback not registered for event {event_name!r}") 45 46 self._listeners[event_name].remove(callback)
Unregister a previously registered callback.
Raises ValueError if the callback is not registered for the event.
def
emit_sync(self, event_name: str, *args: Any, **kwargs: Any) -> None:
49 def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 50 51 """ 52 Emit an event and call non-async listeners immediately. 53 """ 54 55 if event_name not in self._listeners: 56 return 57 58 for callback in self._listeners[event_name]: 59 60 if asyncio.iscoroutinefunction(callback): 61 raise ValueError("Async callback encountered in emit_sync") 62 63 callback(*args, **kwargs)
Emit an event and call non-async listeners immediately.
async def
emit_async(self, event_name: str, *args: Any, **kwargs: Any) -> None:
66 async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None: 67 68 """ 69 Emit an event and await async listeners. 70 """ 71 72 if event_name not in self._listeners: 73 return 74 75 tasks: typing.List[typing.Awaitable[typing.Any]] = [] 76 77 for callback in self._listeners[event_name]: 78 79 if asyncio.iscoroutinefunction(callback): 80 tasks.append(callback(*args, **kwargs)) 81 82 else: 83 callback(*args, **kwargs) 84 85 if tasks: 86 await asyncio.gather(*tasks)
Emit an event and await async listeners.