subsequence.event_emitter

 1import asyncio
 2import typing
 3
 4
 5CallbackType = typing.Callable[..., typing.Any]
 6
 7
 8class EventEmitter:
 9
10	"""
11	A simple event emitter supporting sync and async callbacks.
12	"""
13
14	def __init__ (self) -> None:
15
16		"""
17		Initialize an empty event registry.
18		"""
19
20		self._listeners: typing.Dict[str, typing.List[CallbackType]] = {}
21
22
23	def on (self, event_name: str, callback: CallbackType) -> None:
24
25		"""
26		Register a callback for an event name.
27		"""
28
29		if event_name not in self._listeners:
30			self._listeners[event_name] = []
31
32		self._listeners[event_name].append(callback)
33
34	def off (self, event_name: str, callback: CallbackType) -> None:
35
36		"""
37		Unregister a previously registered callback.
38
39		Raises ``ValueError`` if the callback is not registered for the event.
40		"""
41
42		if event_name not in self._listeners or callback not in self._listeners[event_name]:
43			raise ValueError(f"Callback not registered for event {event_name!r}")
44
45		self._listeners[event_name].remove(callback)
46
47
48	def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
49
50		"""
51		Emit an event and call non-async listeners immediately.
52		"""
53
54		if event_name not in self._listeners:
55			return
56
57		for callback in self._listeners[event_name]:
58
59			if asyncio.iscoroutinefunction(callback):
60				raise ValueError("Async callback encountered in emit_sync")
61
62			callback(*args, **kwargs)
63
64
65	async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
66
67		"""
68		Emit an event and await async listeners.
69		"""
70
71		if event_name not in self._listeners:
72			return
73
74		tasks: typing.List[typing.Awaitable[typing.Any]] = []
75
76		for callback in self._listeners[event_name]:
77
78			if asyncio.iscoroutinefunction(callback):
79				tasks.append(callback(*args, **kwargs))
80
81			else:
82				callback(*args, **kwargs)
83
84		if tasks:
85			await asyncio.gather(*tasks)
CallbackType = typing.Callable[..., typing.Any]
class EventEmitter:
 9class EventEmitter:
10
11	"""
12	A simple event emitter supporting sync and async callbacks.
13	"""
14
15	def __init__ (self) -> None:
16
17		"""
18		Initialize an empty event registry.
19		"""
20
21		self._listeners: typing.Dict[str, typing.List[CallbackType]] = {}
22
23
24	def on (self, event_name: str, callback: CallbackType) -> None:
25
26		"""
27		Register a callback for an event name.
28		"""
29
30		if event_name not in self._listeners:
31			self._listeners[event_name] = []
32
33		self._listeners[event_name].append(callback)
34
35	def off (self, event_name: str, callback: CallbackType) -> None:
36
37		"""
38		Unregister a previously registered callback.
39
40		Raises ``ValueError`` if the callback is not registered for the event.
41		"""
42
43		if event_name not in self._listeners or callback not in self._listeners[event_name]:
44			raise ValueError(f"Callback not registered for event {event_name!r}")
45
46		self._listeners[event_name].remove(callback)
47
48
49	def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
50
51		"""
52		Emit an event and call non-async listeners immediately.
53		"""
54
55		if event_name not in self._listeners:
56			return
57
58		for callback in self._listeners[event_name]:
59
60			if asyncio.iscoroutinefunction(callback):
61				raise ValueError("Async callback encountered in emit_sync")
62
63			callback(*args, **kwargs)
64
65
66	async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
67
68		"""
69		Emit an event and await async listeners.
70		"""
71
72		if event_name not in self._listeners:
73			return
74
75		tasks: typing.List[typing.Awaitable[typing.Any]] = []
76
77		for callback in self._listeners[event_name]:
78
79			if asyncio.iscoroutinefunction(callback):
80				tasks.append(callback(*args, **kwargs))
81
82			else:
83				callback(*args, **kwargs)
84
85		if tasks:
86			await asyncio.gather(*tasks)

A simple event emitter supporting sync and async callbacks.

EventEmitter()
15	def __init__ (self) -> None:
16
17		"""
18		Initialize an empty event registry.
19		"""
20
21		self._listeners: typing.Dict[str, typing.List[CallbackType]] = {}

Initialize an empty event registry.

def on(self, event_name: str, callback: Callable[..., Any]) -> None:
24	def on (self, event_name: str, callback: CallbackType) -> None:
25
26		"""
27		Register a callback for an event name.
28		"""
29
30		if event_name not in self._listeners:
31			self._listeners[event_name] = []
32
33		self._listeners[event_name].append(callback)

Register a callback for an event name.

def off(self, event_name: str, callback: Callable[..., Any]) -> None:
35	def off (self, event_name: str, callback: CallbackType) -> None:
36
37		"""
38		Unregister a previously registered callback.
39
40		Raises ``ValueError`` if the callback is not registered for the event.
41		"""
42
43		if event_name not in self._listeners or callback not in self._listeners[event_name]:
44			raise ValueError(f"Callback not registered for event {event_name!r}")
45
46		self._listeners[event_name].remove(callback)

Unregister a previously registered callback.

Raises ValueError if the callback is not registered for the event.

def emit_sync(self, event_name: str, *args: Any, **kwargs: Any) -> None:
49	def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
50
51		"""
52		Emit an event and call non-async listeners immediately.
53		"""
54
55		if event_name not in self._listeners:
56			return
57
58		for callback in self._listeners[event_name]:
59
60			if asyncio.iscoroutinefunction(callback):
61				raise ValueError("Async callback encountered in emit_sync")
62
63			callback(*args, **kwargs)

Emit an event and call non-async listeners immediately.

async def emit_async(self, event_name: str, *args: Any, **kwargs: Any) -> None:
66	async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
67
68		"""
69		Emit an event and await async listeners.
70		"""
71
72		if event_name not in self._listeners:
73			return
74
75		tasks: typing.List[typing.Awaitable[typing.Any]] = []
76
77		for callback in self._listeners[event_name]:
78
79			if asyncio.iscoroutinefunction(callback):
80				tasks.append(callback(*args, **kwargs))
81
82			else:
83				callback(*args, **kwargs)
84
85		if tasks:
86			await asyncio.gather(*tasks)

Emit an event and await async listeners.