subsequence.event_emitter

  1import asyncio
  2import inspect
  3import logging
  4import typing
  5
  6
  7logger = logging.getLogger(__name__)
  8
  9CallbackType = typing.Callable[..., typing.Any]
 10
 11
 12class EventEmitter:
 13
 14	"""
 15	A simple event emitter supporting sync and async callbacks.
 16	"""
 17
 18	def __init__ (self) -> None:
 19
 20		"""
 21		Initialize an empty event registry.
 22		"""
 23
 24		self._listeners: typing.Dict[str, typing.List[CallbackType]] = {}
 25
 26
 27	def on (self, event_name: str, callback: CallbackType) -> None:
 28
 29		"""
 30		Register a callback for an event name.
 31		"""
 32
 33		if event_name not in self._listeners:
 34			self._listeners[event_name] = []
 35
 36		self._listeners[event_name].append(callback)
 37
 38	def off (self, event_name: str, callback: CallbackType) -> None:
 39
 40		"""
 41		Unregister a previously registered callback.
 42
 43		Raises ``ValueError`` if the callback is not registered for the event.
 44		"""
 45
 46		if event_name not in self._listeners or callback not in self._listeners[event_name]:
 47			raise ValueError(f"Callback not registered for event {event_name!r}")
 48
 49		self._listeners[event_name].remove(callback)
 50
 51
 52	def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
 53
 54		"""
 55		Emit an event and call non-async listeners immediately.
 56		"""
 57
 58		if event_name not in self._listeners:
 59			return
 60
 61		for callback in self._listeners[event_name]:
 62
 63			if inspect.iscoroutinefunction(callback):
 64				raise ValueError("Async callback encountered in emit_sync")
 65
 66			result = callback(*args, **kwargs)
 67
 68			# Catch async-callable objects too (async __call__ fails the
 69			# iscoroutinefunction check but still returns an awaitable).
 70			if inspect.isawaitable(result):
 71				typing.cast(typing.Coroutine, result).close()
 72				raise ValueError("Async callback encountered in emit_sync")
 73
 74
 75	async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
 76
 77		"""
 78		Emit an event, awaiting async listeners.
 79
 80		One raising listener never silences the others: sync exceptions are
 81		logged and the remaining listeners still run, and async listeners are
 82		gathered with their exceptions logged individually.
 83		"""
 84
 85		if event_name not in self._listeners:
 86			return
 87
 88		tasks: typing.List[typing.Awaitable[typing.Any]] = []
 89
 90		for callback in self._listeners[event_name]:
 91
 92			# Calling first and checking the RESULT handles both coroutine
 93			# functions and async-callable objects (async __call__), which
 94			# iscoroutinefunction misses.
 95			try:
 96				result = callback(*args, **kwargs)
 97			except Exception:
 98				logger.exception("Listener for %r raised - continuing with remaining listeners", event_name)
 99				continue
100
101			if inspect.isawaitable(result):
102				tasks.append(result)
103
104		if tasks:
105			results = await asyncio.gather(*tasks, return_exceptions=True)
106
107			for outcome in results:
108				if isinstance(outcome, BaseException):
109					logger.error("Async listener for %r raised", event_name, exc_info=outcome)
logger = <Logger subsequence.event_emitter (WARNING)>
CallbackType = typing.Callable[..., typing.Any]
class EventEmitter:
 13class EventEmitter:
 14
 15	"""
 16	A simple event emitter supporting sync and async callbacks.
 17	"""
 18
 19	def __init__ (self) -> None:
 20
 21		"""
 22		Initialize an empty event registry.
 23		"""
 24
 25		self._listeners: typing.Dict[str, typing.List[CallbackType]] = {}
 26
 27
 28	def on (self, event_name: str, callback: CallbackType) -> None:
 29
 30		"""
 31		Register a callback for an event name.
 32		"""
 33
 34		if event_name not in self._listeners:
 35			self._listeners[event_name] = []
 36
 37		self._listeners[event_name].append(callback)
 38
 39	def off (self, event_name: str, callback: CallbackType) -> None:
 40
 41		"""
 42		Unregister a previously registered callback.
 43
 44		Raises ``ValueError`` if the callback is not registered for the event.
 45		"""
 46
 47		if event_name not in self._listeners or callback not in self._listeners[event_name]:
 48			raise ValueError(f"Callback not registered for event {event_name!r}")
 49
 50		self._listeners[event_name].remove(callback)
 51
 52
 53	def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
 54
 55		"""
 56		Emit an event and call non-async listeners immediately.
 57		"""
 58
 59		if event_name not in self._listeners:
 60			return
 61
 62		for callback in self._listeners[event_name]:
 63
 64			if inspect.iscoroutinefunction(callback):
 65				raise ValueError("Async callback encountered in emit_sync")
 66
 67			result = callback(*args, **kwargs)
 68
 69			# Catch async-callable objects too (async __call__ fails the
 70			# iscoroutinefunction check but still returns an awaitable).
 71			if inspect.isawaitable(result):
 72				typing.cast(typing.Coroutine, result).close()
 73				raise ValueError("Async callback encountered in emit_sync")
 74
 75
 76	async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
 77
 78		"""
 79		Emit an event, awaiting async listeners.
 80
 81		One raising listener never silences the others: sync exceptions are
 82		logged and the remaining listeners still run, and async listeners are
 83		gathered with their exceptions logged individually.
 84		"""
 85
 86		if event_name not in self._listeners:
 87			return
 88
 89		tasks: typing.List[typing.Awaitable[typing.Any]] = []
 90
 91		for callback in self._listeners[event_name]:
 92
 93			# Calling first and checking the RESULT handles both coroutine
 94			# functions and async-callable objects (async __call__), which
 95			# iscoroutinefunction misses.
 96			try:
 97				result = callback(*args, **kwargs)
 98			except Exception:
 99				logger.exception("Listener for %r raised - continuing with remaining listeners", event_name)
100				continue
101
102			if inspect.isawaitable(result):
103				tasks.append(result)
104
105		if tasks:
106			results = await asyncio.gather(*tasks, return_exceptions=True)
107
108			for outcome in results:
109				if isinstance(outcome, BaseException):
110					logger.error("Async listener for %r raised", event_name, exc_info=outcome)

A simple event emitter supporting sync and async callbacks.

EventEmitter()
19	def __init__ (self) -> None:
20
21		"""
22		Initialize an empty event registry.
23		"""
24
25		self._listeners: typing.Dict[str, typing.List[CallbackType]] = {}

Initialize an empty event registry.

def on(self, event_name: str, callback: Callable[..., Any]) -> None:
28	def on (self, event_name: str, callback: CallbackType) -> None:
29
30		"""
31		Register a callback for an event name.
32		"""
33
34		if event_name not in self._listeners:
35			self._listeners[event_name] = []
36
37		self._listeners[event_name].append(callback)

Register a callback for an event name.

def off(self, event_name: str, callback: Callable[..., Any]) -> None:
39	def off (self, event_name: str, callback: CallbackType) -> None:
40
41		"""
42		Unregister a previously registered callback.
43
44		Raises ``ValueError`` if the callback is not registered for the event.
45		"""
46
47		if event_name not in self._listeners or callback not in self._listeners[event_name]:
48			raise ValueError(f"Callback not registered for event {event_name!r}")
49
50		self._listeners[event_name].remove(callback)

Unregister a previously registered callback.

Raises ValueError if the callback is not registered for the event.

def emit_sync(self, event_name: str, *args: Any, **kwargs: Any) -> None:
53	def emit_sync (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
54
55		"""
56		Emit an event and call non-async listeners immediately.
57		"""
58
59		if event_name not in self._listeners:
60			return
61
62		for callback in self._listeners[event_name]:
63
64			if inspect.iscoroutinefunction(callback):
65				raise ValueError("Async callback encountered in emit_sync")
66
67			result = callback(*args, **kwargs)
68
69			# Catch async-callable objects too (async __call__ fails the
70			# iscoroutinefunction check but still returns an awaitable).
71			if inspect.isawaitable(result):
72				typing.cast(typing.Coroutine, result).close()
73				raise ValueError("Async callback encountered in emit_sync")

Emit an event and call non-async listeners immediately.

async def emit_async(self, event_name: str, *args: Any, **kwargs: Any) -> None:
 76	async def emit_async (self, event_name: str, *args: typing.Any, **kwargs: typing.Any) -> None:
 77
 78		"""
 79		Emit an event, awaiting async listeners.
 80
 81		One raising listener never silences the others: sync exceptions are
 82		logged and the remaining listeners still run, and async listeners are
 83		gathered with their exceptions logged individually.
 84		"""
 85
 86		if event_name not in self._listeners:
 87			return
 88
 89		tasks: typing.List[typing.Awaitable[typing.Any]] = []
 90
 91		for callback in self._listeners[event_name]:
 92
 93			# Calling first and checking the RESULT handles both coroutine
 94			# functions and async-callable objects (async __call__), which
 95			# iscoroutinefunction misses.
 96			try:
 97				result = callback(*args, **kwargs)
 98			except Exception:
 99				logger.exception("Listener for %r raised - continuing with remaining listeners", event_name)
100				continue
101
102			if inspect.isawaitable(result):
103				tasks.append(result)
104
105		if tasks:
106			results = await asyncio.gather(*tasks, return_exceptions=True)
107
108			for outcome in results:
109				if isinstance(outcome, BaseException):
110					logger.error("Async listener for %r raised", event_name, exc_info=outcome)

Emit an event, awaiting async listeners.

One raising listener never silences the others: sync exceptions are logged and the remaining listeners still run, and async listeners are gathered with their exceptions logged individually.