kiln_ai.utils
Utils
Misc utilities used in the kiln_ai library.
class
AsyncLockManager:
15class AsyncLockManager: 16 """ 17 A per-key asyncio lock manager that automatically cleans up locks when they're no longer needed. 18 19 Usage: 20 locks = AsyncLockManager() 21 22 async with locks.acquire("user:123"): 23 # critical section for "user:123" 24 ... 25 26 The manager removes a key when there are no holders and no waiters. 27 """ 28 29 def __init__(self) -> None: 30 # Protects the _locks dict and bookkeeping counters. 31 self._mu = asyncio.Lock() 32 self._locks: Dict[Hashable, _Entry] = {} 33 34 @asynccontextmanager 35 async def acquire(self, key: Hashable, *, timeout: float | None = None): 36 """ 37 Acquire the lock for `key` as an async context manager. 38 39 - `timeout`: optional seconds to wait; raises TimeoutError on expiry. 40 """ 41 # Phase 1: register as a waiter and get/create the entry (under manager mutex). 42 async with self._mu: 43 entry = self._locks.get(key) 44 if entry is None: 45 entry = self._locks[key] = _Entry() 46 entry.waiters += 1 47 48 # Phase 2: wait on the per-key lock (outside manager mutex). 49 try: 50 if timeout is None: 51 await entry.lock.acquire() 52 else: 53 # Manual timeout to keep compatibility across Python versions. 54 await asyncio.wait_for(entry.lock.acquire(), timeout=timeout) 55 56 # Phase 3: update counters: became a holder. 57 async with self._mu: 58 entry.waiters -= 1 59 entry.holders += 1 60 61 try: 62 yield # critical section 63 finally: 64 # Phase 4: release holder and maybe cleanup. 65 entry.lock.release() 66 async with self._mu: 67 entry.holders -= 1 68 # Remove the entry if fully idle. 69 if entry.waiters == 0 and entry.holders == 0: 70 # Double-check we still point to same object (paranoia/race safety). 71 if self._locks.get(key) is entry: 72 del self._locks[key] 73 74 except asyncio.TimeoutError: 75 # Timed out while waiting; undo waiter count and maybe cleanup. 76 async with self._mu: 77 entry.waiters -= 1 78 if entry.waiters == 0 and entry.holders == 0: 79 if self._locks.get(key) is entry: 80 del self._locks[key] 81 raise 82 except asyncio.CancelledError: 83 # Cancelled while waiting; same cleanup as timeout. 84 async with self._mu: 85 entry.waiters -= 1 86 if entry.waiters == 0 and entry.holders == 0: 87 if self._locks.get(key) is entry: 88 del self._locks[key] 89 raise 90 91 # Optional: expose a snapshot for metrics/debugging 92 async def snapshot(self) -> Dict[Hashable, dict]: 93 async with self._mu: 94 return { 95 k: {"waiters": e.waiters, "holders": e.holders} 96 for k, e in self._locks.items() 97 }
A per-key asyncio lock manager that automatically cleans up locks when they're no longer needed.
Usage: locks = AsyncLockManager()
async with locks.acquire("user:123"):
# critical section for "user:123"
...
The manager removes a key when there are no holders and no waiters.
@asynccontextmanager
async def
acquire(self, key: Hashable, *, timeout: float | None = None):
34 @asynccontextmanager 35 async def acquire(self, key: Hashable, *, timeout: float | None = None): 36 """ 37 Acquire the lock for `key` as an async context manager. 38 39 - `timeout`: optional seconds to wait; raises TimeoutError on expiry. 40 """ 41 # Phase 1: register as a waiter and get/create the entry (under manager mutex). 42 async with self._mu: 43 entry = self._locks.get(key) 44 if entry is None: 45 entry = self._locks[key] = _Entry() 46 entry.waiters += 1 47 48 # Phase 2: wait on the per-key lock (outside manager mutex). 49 try: 50 if timeout is None: 51 await entry.lock.acquire() 52 else: 53 # Manual timeout to keep compatibility across Python versions. 54 await asyncio.wait_for(entry.lock.acquire(), timeout=timeout) 55 56 # Phase 3: update counters: became a holder. 57 async with self._mu: 58 entry.waiters -= 1 59 entry.holders += 1 60 61 try: 62 yield # critical section 63 finally: 64 # Phase 4: release holder and maybe cleanup. 65 entry.lock.release() 66 async with self._mu: 67 entry.holders -= 1 68 # Remove the entry if fully idle. 69 if entry.waiters == 0 and entry.holders == 0: 70 # Double-check we still point to same object (paranoia/race safety). 71 if self._locks.get(key) is entry: 72 del self._locks[key] 73 74 except asyncio.TimeoutError: 75 # Timed out while waiting; undo waiter count and maybe cleanup. 76 async with self._mu: 77 entry.waiters -= 1 78 if entry.waiters == 0 and entry.holders == 0: 79 if self._locks.get(key) is entry: 80 del self._locks[key] 81 raise 82 except asyncio.CancelledError: 83 # Cancelled while waiting; same cleanup as timeout. 84 async with self._mu: 85 entry.waiters -= 1 86 if entry.waiters == 0 and entry.holders == 0: 87 if self._locks.get(key) is entry: 88 del self._locks[key] 89 raise
Acquire the lock for key
as an async context manager.
timeout
: optional seconds to wait; raises TimeoutError on expiry.