Utilities
Core utility functions and helpers.
utils
Attributes
EXPIRES_GRACE_SECONDS = 15
module-attribute
T = TypeVar('T')
module-attribute
Classes
TwoLevelCache
A two-level caching system with soft and hard time-to-live (TTL) expiration.
This cache implements a two-tier caching mechanism to allow for background refresh of cached values. It uses a soft TTL for quick access and a hard TTL as a fallback, which helps in reducing latency and maintaining data freshness.
Attributes:
| Name | Type | Description |
|---|---|---|
soft_cache |
TTLCache
|
A cache with a shorter TTL for quick access. |
hard_cache |
TTLCache
|
A cache with a longer TTL as a fallback. |
locks |
defaultdict
|
Thread-safe locks for each cache key. |
futures |
dict
|
Stores ongoing asynchronous population tasks. |
pool |
ThreadPoolExecutor
|
Thread pool for executing cache population tasks. |
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
soft_ttl
|
int
|
Time-to-live in seconds for the soft cache. |
required |
hard_ttl
|
int
|
Time-to-live in seconds for the hard cache. |
required |
max_workers
|
int
|
Maximum number of workers in the thread pool. |
10
|
max_items
|
int
|
Maximum number of items in the cache. |
1000000
|
Example
cache = TwoLevelCache(soft_ttl=60, hard_ttl=300) def populate_func(): ... return "cached_value" value = cache.get("key", populate_func)
Source code in diracx-core/src/diracx/core/utils.py
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 | |
Attributes
soft_cache = TTLCache(max_items, soft_ttl)
instance-attribute
hard_cache = TTLCache(max_items, hard_ttl)
instance-attribute
locks = defaultdict(threading.Lock)
instance-attribute
futures = {}
instance-attribute
pool = ThreadPoolExecutor(max_workers=max_workers)
instance-attribute
Functions
get(key, populate_func, blocking=True)
Retrieve a value from the cache, populating it if necessary.
This method first checks the soft cache for the key. If not found, it checks the hard cache while initiating a background refresh. If the key is not in either cache, it waits for the populate_func to complete and stores the result in both caches.
Locks are used to ensure there is never more than one concurrent population task for a given key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
The cache key to retrieve or populate. |
required |
populate_func
|
Callable[[], Any]
|
A function to call to populate the cache if the key is not found. |
required |
blocking
|
bool
|
If True, wait for the cache to be populated if the key is not found. If False, raise NotReadyError if the key is not ready. |
True
|
Returns:
| Name | Type | Description |
|---|---|---|
Any |
T
|
The cached value associated with the key. |
Note
This method is thread-safe and handles concurrent requests for the same key.
Source code in diracx-core/src/diracx/core/utils.py
clear()
Clear all caches and reset the thread pool.
Source code in diracx-core/src/diracx/core/utils.py
Functions
recursive_merge(base, override)
Recursively merge dictionaries; values in override take precedence.
- If both
baseandoverrideare dicts, merge keys recursively. - Otherwise, return
overrideif it is notNone; fallback tobase.
Source code in diracx-core/src/diracx/core/utils.py
dotenv_files_from_environment(prefix)
Get the sorted list of .env files to use for configuration.
Source code in diracx-core/src/diracx/core/utils.py
serialize_credentials(token_response)
Serialize DiracX client credentials to a string.
This method is separated from write_credentials to allow for DIRAC to be able to serialize credentials for inclusion in the proxy file.
Source code in diracx-core/src/diracx/core/utils.py
read_credentials(location)
Read credentials from a file.
Source code in diracx-core/src/diracx/core/utils.py
write_credentials(token_response, *, location=None)
Write credentials received in dirax_preferences.credentials_path.
Source code in diracx-core/src/diracx/core/utils.py
batched_async(iterable, n, *, strict=False)
async
Yield successive n-sized chunks from an async iterable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
iterable
|
async iterable
|
The input async iterable to be batched. |
required |
n
|
int
|
The size of each batch. |
required |
strict
|
bool
|
If True, raises ValueError for incomplete batches. |
False
|
Yields:
| Name | Type | Description |
|---|---|---|
tuple |
AsyncIterable[tuple[T, ...]]
|
A tuple containing the next n elements from the iterable. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If strict is True and the last batch is not of size n. |
Example
async for batch in batched(aiter("ABCDEFG"), 3): ... print(batch) ('A', 'B', 'C') ('D', 'E', 'F') ('G',) async for batch in batched(aiter("ABCDEFG"), 3, strict=True): ... print(batch) ValueError: batched(): incomplete batch