This document describes the current stable version of Celery (4.3). For development docs, go here.

Source code for kombu.utils.functional

"""Functional Utilities."""
from __future__ import absolute_import, unicode_literals

import random
import sys
import threading

from collections import OrderedDict

    from import Iterable, Mapping
except ImportError:
    from collections import Iterable, Mapping

from itertools import count, repeat
from time import sleep, time

from vine.utils import wraps

from kombu.five import (
    UserDict, items, keys, python_2_unicode_compatible, string_t,

from .encoding import safe_repr as _safe_repr

__all__ = (
    'LRUCache', 'memoize', 'lazy', 'maybe_evaluate',
    'is_list', 'maybe_list', 'dictfilter',

KEYWORD_MARK = object()

class ChannelPromise(object):

    def __init__(self, contract):
        self.__contract__ = contract

    def __call__(self):
            return self.__value__
        except AttributeError:
            value = self.__value__ = self.__contract__()
            return value

    def __repr__(self):
            return repr(self.__value__)
        except AttributeError:
            return '<promise: 0x{0:x}>'.format(id(self.__contract__))

[docs]class LRUCache(UserDict): """LRU Cache implementation using a doubly linked list to track access. Arguments: limit (int): The maximum number of keys to keep in the cache. When a new key is inserted and the limit has been exceeded, the *Least Recently Used* key will be discarded from the cache. """ def __init__(self, limit=None): self.limit = limit self.mutex = threading.RLock() = OrderedDict() def __getitem__(self, key): with self.mutex: value = self[key] = return value
[docs] def update(self, *args, **kwargs): with self.mutex: data, limit =, self.limit data.update(*args, **kwargs) if limit and len(data) > limit: # pop additional items in case limit exceeded for _ in range(len(data) - limit): data.popitem(last=False)
[docs] def popitem(self, last=True): with self.mutex: return
def __setitem__(self, key, value): # remove least recently used key. with self.mutex: if self.limit and len( >= self.limit:[key] = value def __iter__(self): return iter( def _iterate_items(self): with self.mutex: for k in self: try: yield (k,[k]) except KeyError: # pragma: no cover pass iteritems = _iterate_items def _iterate_values(self): with self.mutex: for k in self: try: yield[k] except KeyError: # pragma: no cover pass itervalues = _iterate_values def _iterate_keys(self): # userdict.keys in py3k calls __getitem__ with self.mutex: return keys( iterkeys = _iterate_keys
[docs] def incr(self, key, delta=1): with self.mutex: # this acts as memcached does- store as a string, but return a # integer as long as it exists and we can cast it newval = int( + delta self[key] = str(newval) return newval
def __getstate__(self): d = dict(vars(self)) d.pop('mutex') return d def __setstate__(self, state): self.__dict__ = state self.mutex = threading.RLock() if sys.version_info[0] == 3: # pragma: no cover keys = _iterate_keys values = _iterate_values items = _iterate_items else: # noqa
[docs] def keys(self): return list(self._iterate_keys())
[docs] def values(self): return list(self._iterate_values())
[docs] def items(self): return list(self._iterate_items())
[docs]def memoize(maxsize=None, keyfun=None, Cache=LRUCache): """Decorator to cache function return value.""" def _memoize(fun): mutex = threading.Lock() cache = Cache(limit=maxsize) @wraps(fun) def _M(*args, **kwargs): if keyfun: key = keyfun(args, kwargs) else: key = args + (KEYWORD_MARK,) + tuple(sorted(kwargs.items())) try: with mutex: value = cache[key] except KeyError: value = fun(*args, **kwargs) _M.misses += 1 with mutex: cache[key] = value else: _M.hits += 1 return value def clear(): """Clear the cache and reset cache statistics.""" cache.clear() _M.hits = _M.misses = 0 _M.hits = _M.misses = 0 _M.clear = clear _M.original_func = fun return _M return _memoize
[docs]@python_2_unicode_compatible class lazy(object): """Holds lazy evaluation. Evaluated when called or if the :meth:`evaluate` method is called. The function is re-evaluated on every call. Overloaded operations that will evaluate the promise: :meth:`__str__`, :meth:`__repr__`, :meth:`__cmp__`. """ def __init__(self, fun, *args, **kwargs): self._fun = fun self._args = args self._kwargs = kwargs def __call__(self): return self.evaluate()
[docs] def evaluate(self): return self._fun(*self._args, **self._kwargs)
def __str__(self): return str(self()) def __repr__(self): return repr(self()) def __eq__(self, rhs): return self() == rhs def __ne__(self, rhs): return self() != rhs def __deepcopy__(self, memo): memo[id(self)] = self return self def __reduce__(self): return (self.__class__, (self._fun,), {'_args': self._args, '_kwargs': self._kwargs}) if sys.version_info[0] < 3: def __cmp__(self, rhs): if isinstance(rhs, self.__class__): return -cmp(rhs, self()) return cmp(self(), rhs)
[docs]def maybe_evaluate(value): """Evaluate value only if value is a :class:`lazy` instance.""" if isinstance(value, lazy): return value.evaluate() return value
[docs]def is_list(l, scalars=(Mapping, string_t), iters=(Iterable,)): """Return true if the object is iterable. Note: Returns false if object is a mapping or string. """ return isinstance(l, iters) and not isinstance(l, scalars or ())
[docs]def maybe_list(l, scalars=(Mapping, string_t)): """Return list of one element if ``l`` is a scalar.""" return l if l is None or is_list(l, scalars) else [l]
[docs]def dictfilter(d=None, **kw): """Remove all keys from dict ``d`` whose value is :const:`None`.""" d = kw if d is None else (dict(d, **kw) if kw else d) return {k: v for k, v in items(d) if v is not None}
def shufflecycle(it): it = list(it) # don't modify callers list shuffle = random.shuffle for _ in repeat(None): shuffle(it) yield it[0] def fxrange(start=1.0, stop=None, step=1.0, repeatlast=False): cur = start * 1.0 while 1: if not stop or cur <= stop: yield cur cur += step else: if not repeatlast: break yield cur - step def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0): sum_, cur = 0, start * 1.0 while 1: if sum_ >= max: break yield cur if stop: cur = min(cur + step, stop) else: cur += step sum_ += cur def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30, callback=None, timeout=None): """Retry the function over and over until max retries is exceeded. For each retry we sleep a for a while before we try again, this interval is increased for every retry until the max seconds is reached. Arguments: fun (Callable): The function to try catch (Tuple[BaseException]): Exceptions to catch, can be either tuple or a single exception class. Keyword Arguments: args (Tuple): Positional arguments passed on to the function. kwargs (Dict): Keyword arguments passed on to the function. errback (Callable): Callback for when an exception in ``catch`` is raised. The callback must take three arguments: ``exc``, ``interval_range`` and ``retries``, where ``exc`` is the exception instance, ``interval_range`` is an iterator which return the time in seconds to sleep next, and ``retries`` is the number of previous retries. max_retries (int): Maximum number of retries before we give up. If neither of this and timeout is set, we will retry forever. If one of this and timeout is reached, stop. interval_start (float): How long (in seconds) we start sleeping between retries. interval_step (float): By how much the interval is increased for each retry. interval_max (float): Maximum number of seconds to sleep between retries. timeout (int): Maximum seconds waiting before we give up. """ interval_range = fxrange(interval_start, interval_max + interval_start, interval_step, repeatlast=True) end = time() + timeout if timeout else None for retries in count(): try: return fun(*args, **kwargs) except catch as exc: if max_retries and retries >= max_retries: raise if end and time() > end: raise if callback: callback() tts = float(errback(exc, interval_range, retries) if errback else next(interval_range)) if tts: for _ in range(int(tts)): if callback: callback() sleep(1.0) # sleep remainder after int truncation above. sleep(abs(int(tts) - tts)) def reprkwargs(kwargs, sep=', ', fmt='{0}={1}'): return sep.join(fmt.format(k, _safe_repr(v)) for k, v in items(kwargs)) def reprcall(name, args=(), kwargs={}, sep=', '): return '{0}({1}{2}{3})'.format( name, sep.join(map(_safe_repr, args or ())), (args and kwargs) and sep or '', reprkwargs(kwargs, sep), ) # Compat names (before kombu 3.0) promise = lazy maybe_promise = maybe_evaluate