kiln_ai.utils

Utils

Misc utilities used in the kiln_ai library.

 1"""
 2# Utils
 3
 4Misc utilities used in the kiln_ai library.
 5"""
 6
 7from . import config, formatting
 8from .lock import AsyncLockManager, shared_async_lock_manager
 9
10__all__ = [
11    "AsyncLockManager",
12    "config",
13    "formatting",
14    "shared_async_lock_manager",
15]
class AsyncLockManager:
15class AsyncLockManager:
16    """
17    A per-key asyncio lock manager that automatically cleans up locks when they're no longer needed.
18
19    Usage:
20        locks = AsyncLockManager()
21
22        async with locks.acquire("user:123"):
23            # critical section for "user:123"
24            ...
25
26    The manager removes a key when there are no holders and no waiters.
27    """
28
29    def __init__(self) -> None:
30        # Protects the _locks dict and bookkeeping counters.
31        self._mu = asyncio.Lock()
32        self._locks: Dict[Hashable, _Entry] = {}
33
34    @asynccontextmanager
35    async def acquire(self, key: Hashable, *, timeout: float | None = None):
36        """
37        Acquire the lock for `key` as an async context manager.
38
39        - `timeout`: optional seconds to wait; raises TimeoutError on expiry.
40        """
41        # Phase 1: register as a waiter and get/create the entry (under manager mutex).
42        async with self._mu:
43            entry = self._locks.get(key)
44            if entry is None:
45                entry = self._locks[key] = _Entry()
46            entry.waiters += 1
47
48        # Phase 2: wait on the per-key lock (outside manager mutex).
49        try:
50            if timeout is None:
51                await entry.lock.acquire()
52            else:
53                # Manual timeout to keep compatibility across Python versions.
54                await asyncio.wait_for(entry.lock.acquire(), timeout=timeout)
55
56            # Phase 3: update counters: became a holder.
57            async with self._mu:
58                entry.waiters -= 1
59                entry.holders += 1
60
61            try:
62                yield  # critical section
63            finally:
64                # Phase 4: release holder and maybe cleanup.
65                entry.lock.release()
66                async with self._mu:
67                    entry.holders -= 1
68                    # Remove the entry if fully idle.
69                    if entry.waiters == 0 and entry.holders == 0:
70                        # Double-check we still point to same object (paranoia/race safety).
71                        if self._locks.get(key) is entry:
72                            del self._locks[key]
73
74        except asyncio.TimeoutError:
75            # Timed out while waiting; undo waiter count and maybe cleanup.
76            async with self._mu:
77                entry.waiters -= 1
78                if entry.waiters == 0 and entry.holders == 0:
79                    if self._locks.get(key) is entry:
80                        del self._locks[key]
81            raise
82        except asyncio.CancelledError:
83            # Cancelled while waiting; same cleanup as timeout.
84            async with self._mu:
85                entry.waiters -= 1
86                if entry.waiters == 0 and entry.holders == 0:
87                    if self._locks.get(key) is entry:
88                        del self._locks[key]
89            raise
90
91    # Optional: expose a snapshot for metrics/debugging
92    async def snapshot(self) -> Dict[Hashable, dict]:
93        async with self._mu:
94            return {
95                k: {"waiters": e.waiters, "holders": e.holders}
96                for k, e in self._locks.items()
97            }

A per-key asyncio lock manager that automatically cleans up locks when they're no longer needed.

Usage: locks = AsyncLockManager()

async with locks.acquire("user:123"):
    # critical section for "user:123"
    ...

The manager removes a key when there are no holders and no waiters.

@asynccontextmanager
async def acquire(self, key: Hashable, *, timeout: float | None = None):
34    @asynccontextmanager
35    async def acquire(self, key: Hashable, *, timeout: float | None = None):
36        """
37        Acquire the lock for `key` as an async context manager.
38
39        - `timeout`: optional seconds to wait; raises TimeoutError on expiry.
40        """
41        # Phase 1: register as a waiter and get/create the entry (under manager mutex).
42        async with self._mu:
43            entry = self._locks.get(key)
44            if entry is None:
45                entry = self._locks[key] = _Entry()
46            entry.waiters += 1
47
48        # Phase 2: wait on the per-key lock (outside manager mutex).
49        try:
50            if timeout is None:
51                await entry.lock.acquire()
52            else:
53                # Manual timeout to keep compatibility across Python versions.
54                await asyncio.wait_for(entry.lock.acquire(), timeout=timeout)
55
56            # Phase 3: update counters: became a holder.
57            async with self._mu:
58                entry.waiters -= 1
59                entry.holders += 1
60
61            try:
62                yield  # critical section
63            finally:
64                # Phase 4: release holder and maybe cleanup.
65                entry.lock.release()
66                async with self._mu:
67                    entry.holders -= 1
68                    # Remove the entry if fully idle.
69                    if entry.waiters == 0 and entry.holders == 0:
70                        # Double-check we still point to same object (paranoia/race safety).
71                        if self._locks.get(key) is entry:
72                            del self._locks[key]
73
74        except asyncio.TimeoutError:
75            # Timed out while waiting; undo waiter count and maybe cleanup.
76            async with self._mu:
77                entry.waiters -= 1
78                if entry.waiters == 0 and entry.holders == 0:
79                    if self._locks.get(key) is entry:
80                        del self._locks[key]
81            raise
82        except asyncio.CancelledError:
83            # Cancelled while waiting; same cleanup as timeout.
84            async with self._mu:
85                entry.waiters -= 1
86                if entry.waiters == 0 and entry.holders == 0:
87                    if self._locks.get(key) is entry:
88                        del self._locks[key]
89            raise

Acquire the lock for key as an async context manager.

  • timeout: optional seconds to wait; raises TimeoutError on expiry.
async def snapshot(self) -> Dict[Hashable, dict]:
92    async def snapshot(self) -> Dict[Hashable, dict]:
93        async with self._mu:
94            return {
95                k: {"waiters": e.waiters, "holders": e.holders}
96                for k, e in self._locks.items()
97            }
shared_async_lock_manager = <AsyncLockManager object>