Source code for pattern_kit.behavioral.handler_pipeline

from abc import ABC, abstractmethod
from typing import Any, Union
import inspect

[docs] class StopPipeline(Exception): """ Raised by handlers to exit the pipeline early. Optionally carry a `result` to return from `run` or `run_async`. """ def __init__(self, result: Any = None): super().__init__() self.result = result
[docs] class Handler(ABC): """ Base handler class for use in a HandlerPipeline. Override `can_handle()` and `handle()` as needed. """
[docs] def can_handle(self, data: Any, **kwargs) -> bool: """Override if you want filtering behavior. Default: always handle.""" return True
[docs] @abstractmethod def handle(self, data: Any, **kwargs) -> Any: """Perform processing on data and return result.""" pass
[docs] class AsyncHandler(ABC): """ Base handler class for use in a HandlerPipeline. Override `can_handle()` and `handle()` as needed. """
[docs] def can_handle(self, data: Any, **kwargs) -> bool: """Override if you want filtering behavior. Default: always handle.""" return True
[docs] @abstractmethod async def handle(self, data: Any, **kwargs) -> Any: """Perform processing on data and return result.""" pass
[docs] class HandlerPipeline: """ A configurable pipeline of handlers. Args: pass_result (bool): If True, result of each handler is passed to the next handler. """ def __init__(self, pass_result: bool = False): self._handlers: list[Union[Handler, AsyncHandler]] = [] self._pass_result = pass_result
[docs] def add_handler(self, handler: Union[Handler, AsyncHandler]) -> None: """ Add an handler. """ self._handlers.append(handler)
[docs] def remove_handler(self, handler: Union[Handler, AsyncHandler]) -> None: """ Remove an handler. """ self._handlers.remove(handler)
def __iadd__(self, handler: Union[Handler, AsyncHandler]): """Add handler using `+=` operator.""" self.add_handler(handler) return self def __isub__(self, handler: Union[Handler, AsyncHandler]): """Remove handler using `-=` operator.""" self.remove_handler(handler) return self
[docs] def run(self, data: Any, **kwargs) -> Any: """ Run the pipeline. """ result = current = data try: for handler in self._handlers: if handler.can_handle(current, **kwargs): result = handler.handle(current, **kwargs) if self._pass_result: current = result except StopPipeline as stop: return stop.result return result
[docs] async def run_async(self, data: Any, **kwargs) -> Any: """ Run the pipeline asynchronously. Supports both sync and async handlers transparently. """ result = current = data try: for handler in self._handlers: if handler.can_handle(current, **kwargs): method = getattr(handler, "handle", None) if inspect.iscoroutinefunction(method): result = await method(current, **kwargs) else: result = method(current, **kwargs) if self._pass_result: current = result except StopPipeline as stop: return stop.result return result