Source code for pattern_kit.creational.object_pool

import asyncio
from contextlib import asynccontextmanager, contextmanager
from queue import Empty, Full, Queue
from typing import AsyncIterator, Callable, Generic, Iterator, TypeVar

# Type of the objects produced by a pool's factory and handed out by acquire().
T = TypeVar("T")


class ObjectPool(Generic[T]):
    """
    A simple thread-safe object pool for synchronous use.

    This pool reuses objects to avoid repeated construction. Idle objects
    are kept in a ``queue.Queue``, so the pool can be shared between
    threads. It does not strictly enforce a maximum number of active
    objects; `max_size` only limits how many idle objects can be stored
    for reuse.

    Args:
        factory: Zero-argument callable used to build a new object whenever
            no idle object is available.
        max_size: Maximum number of idle objects kept for reuse. It is
            passed straight to ``queue.Queue(maxsize=...)``, so a value of
            zero or less means the idle store is unbounded.
    """

    def __init__(self, factory: Callable[[], T], max_size: int = 10):
        self._factory = factory
        self._pool = Queue(maxsize=max_size)

    def acquire(self) -> T:
        """
        Acquire an object from the pool.

        Returns:
            An idle object if one is available, otherwise a new object
            created by the factory. This call never blocks on the pool.
        """
        try:
            return self._pool.get_nowait()
        except Empty:
            return self._factory()

    def release(self, obj: T) -> None:
        """
        Return an object to the pool for reuse.

        If the pool is already holding `max_size` idle objects, the object
        is silently discarded.

        Args:
            obj: The object to hand back to the pool.
        """
        try:
            self._pool.put_nowait(obj)
        except Full:
            # Deliberate best-effort: the idle store is at capacity, so the
            # surplus object is simply dropped. Only `Full` is swallowed;
            # anything else (e.g. KeyboardInterrupt) propagates.
            pass

    @contextmanager
    def borrow(self) -> Iterator[T]:
        """
        Context manager version of `acquire()` + `release()`.

        Yields:
            An object from the pool. It is released back to the pool when
            the ``with`` block exits, even if the block raises.
        """
        obj = self.acquire()
        try:
            yield obj
        finally:
            self.release(obj)

    def clear(self) -> None:
        """Remove all idle objects from the pool."""
        # Drain until the queue reports Empty instead of checking `empty()`
        # first: another thread may take the last item between the check and
        # the get, which would otherwise leak an `Empty` exception.
        while True:
            try:
                self._pool.get_nowait()
            except Empty:
                return

    def __len__(self) -> int:
        """Return the number of idle objects in the pool."""
        return self._pool.qsize()
class AsyncObjectPool(Generic[T]):
    """
    An asyncio-compatible object pool.

    This pool reuses objects to avoid repeated construction. Idle objects
    are kept in an ``asyncio.Queue``. It does not strictly enforce a maximum
    number of active objects; `max_size` only limits how many idle objects
    can be stored for reuse.

    Args:
        factory: Zero-argument callable used to build a new object whenever
            no idle object is available. It is called synchronously and its
            result is returned as-is (it is not awaited).
        max_size: Maximum number of idle objects kept for reuse. It is
            passed straight to ``asyncio.Queue(maxsize=...)``, so a value of
            zero or less means the idle store is unbounded.
    """

    def __init__(self, factory: Callable[[], T], max_size: int = 10):
        self._factory = factory
        self._pool = asyncio.Queue(maxsize=max_size)

    async def acquire(self) -> T:
        """
        Acquire an object from the async pool.

        Returns:
            An idle object if one is available, otherwise a new object
            created by the factory.
        """
        # Non-blocking get: this method must never wait for a release, it
        # falls back to the factory instead.
        try:
            return self._pool.get_nowait()
        except asyncio.QueueEmpty:
            return self._factory()

    async def release(self, obj: T) -> None:
        """
        Return an object to the pool.

        If the pool is already holding `max_size` idle objects, the object
        is silently discarded.

        Args:
            obj: The object to hand back to the pool.
        """
        # Non-blocking put: a full pool means the object is dropped rather
        # than waiting for a free slot.
        try:
            self._pool.put_nowait(obj)
        except asyncio.QueueFull:
            pass

    @asynccontextmanager
    async def borrow(self) -> AsyncIterator[T]:
        """
        Async context manager version of `acquire()` + `release()`.

        Yields:
            An object from the pool. It is released back to the pool when
            the ``async with`` block exits, even if the block raises.
        """
        obj = await self.acquire()
        try:
            yield obj
        finally:
            await self.release(obj)

    async def clear(self) -> None:
        """Remove all idle objects from the pool."""
        while True:
            try:
                self._pool.get_nowait()
            except asyncio.QueueEmpty:
                return

    def __len__(self) -> int:
        """Return the number of idle objects in the pool."""
        return self._pool.qsize()