Source code for temporalcache.interval

# *****************************************************************************
#
# Copyright (c) 2021, the temporal-cache authors.
#
# This file is part of the temporal-cache library, distributed under the terms of
# the Apache License 2.0.  The full license can be found in the LICENSE file.
#
import datetime
from functools import lru_cache, wraps

from frozendict import frozendict

from . import utils
from .persistent_lru_cache import persistent_lru_cache
from .utils import calc


[docs]def interval( seconds=0, minutes=0, hours=0, days=0, weeks=0, months=0, years=0, maxsize=128, persistent="", custom=None, **kwargs ): """Expires all entries in the cache every interval""" if not any((seconds, minutes, hours, days, weeks, months, years)): seconds = 1 def _wrapper(foo): last = datetime.datetime.now() if custom: foo = custom(**kwargs)(foo) elif persistent: foo = persistent_lru_cache(persistent, maxsize=maxsize)(foo) else: foo = lru_cache(maxsize)(foo) @wraps(foo) def _wrapped_foo(*args, **kwargs): nonlocal last now = datetime.datetime.now() if (now - last).total_seconds() > calc( seconds, minutes, hours, days, weeks, months, years ) or utils.TEMPORAL_CACHE_GLOBAL_DISABLE: foo.cache_clear() last = now args = tuple( [ frozendict(arg) if isinstance(arg, dict) else tuple(arg) if isinstance(arg, list) else arg for arg in args ] ) kwargs = { k: frozendict(v) if isinstance(v, dict) else tuple(v) if isinstance(v, list) else v for k, v in kwargs.items() } return foo(*args, **kwargs) return _wrapped_foo return _wrapper
[docs]def minutely(maxsize=128, persistent="", custom=None, **kwargs): def _wrapper(foo): return interval( seconds=60, maxsize=maxsize, persistent=persistent, custom=custom, **kwargs )(foo) return _wrapper
[docs]def hourly(maxsize=128, persistent="", custom=None, **kwargs): def _wrapper(foo): return interval( minutes=60, maxsize=maxsize, persistent=persistent, custom=custom, **kwargs )(foo) return _wrapper
[docs]def daily(maxsize=128, persistent="", custom=None, **kwargs): def _wrapper(foo): return interval( hours=24, maxsize=maxsize, persistent=persistent, custom=custom, **kwargs )(foo) return _wrapper
[docs]def monthly(maxsize=128, persistent="", custom=None, **kwargs): def _wrapper(foo): return interval( months=1, maxsize=maxsize, persistent=persistent, custom=custom, **kwargs )(foo) return _wrapper