Wrappers

Wrappers add functionality to any key-value store in py-key-value. Each wrapper implements the AsyncKeyValue protocol, so it can be used anywhere a store can be used.

Available Wrappers

Wrapper                   Description
CompressionWrapper        Compress values before storing and decompress on retrieval
FernetEncryptionWrapper   Encrypt values before storing and decrypt on retrieval
FallbackWrapper           Fallback to a secondary store when the primary store fails
LimitSizeWrapper          Limit the size of entries stored in the cache
LoggingWrapper            Log the operations performed on the store
PassthroughCacheWrapper   Wrap two stores to provide a read-through cache
PrefixCollectionsWrapper  Prefix all collections with a given prefix
PrefixKeysWrapper         Prefix all keys with a given prefix
ReadOnlyWrapper           Prevent all write operations on the underlying store
RetryWrapper              Retry failed operations with exponential backoff
SingleCollectionWrapper   Wrap a store to only use a single collection
TTLClampWrapper           Clamp the TTL to a given range
StatisticsWrapper         Track operation statistics for the store
TimeoutWrapper            Add timeout protection to store operations

What Are Wrappers?

Wrappers follow the decorator pattern - they wrap around a key-value store and intercept operations to add additional behavior. Multiple wrappers can be stacked to combine their effects.

Wrapper Pattern Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.logging import LoggingWrapper
from key_value.aio.wrappers.statistics import StatisticsWrapper

# Stack wrappers to combine functionality
store = StatisticsWrapper(
    LoggingWrapper(
        MemoryStore()
    )
)

Execution Order

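Every call enters at the outermost wrapper and travels inward to the store; results travel back outward in the reverse order:

  • Writes (put, delete): Outer wrapper → Inner wrapper → Store
  • Reads (get, ttl): the request goes Outer wrapper → Inner wrapper → Store, and the returned value is processed Store → Inner wrapper → Outer wrapper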
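
The ordering can be checked with a small trace. This is a minimal sketch using hypothetical stand-in classes (DictStore and TracingWrapper are not part of py-key-value); it only illustrates how nested wrappers hand a call inward and a result outward.

```python
import asyncio


class DictStore:
    """Hypothetical in-memory stand-in for a real store."""

    def __init__(self, trace):
        self.data = {}
        self.trace = trace

    async def put(self, key, value):
        self.trace.append("store.put")
        self.data[key] = value

    async def get(self, key):
        self.trace.append("store.get")
        return self.data.get(key)


class TracingWrapper:
    """Hypothetical wrapper that records when a call enters and leaves it."""

    def __init__(self, inner, name, trace):
        self.inner = inner
        self.name = name
        self.trace = trace

    async def put(self, key, value):
        self.trace.append(f"{self.name}.put:enter")
        await self.inner.put(key, value)
        self.trace.append(f"{self.name}.put:exit")

    async def get(self, key):
        self.trace.append(f"{self.name}.get:enter")
        result = await self.inner.get(key)
        self.trace.append(f"{self.name}.get:exit")
        return result


async def demo():
    trace = []
    inner = TracingWrapper(DictStore(trace), "inner", trace)
    store = TracingWrapper(inner, "outer", trace)
    await store.put("k", {"v": 1})
    await store.get("k")
    return trace


if __name__ == "__main__":
    for step in asyncio.run(demo()):
        print(step)
```

Code placed before the inner call runs outer-first; code placed after it runs inner-first, which is why read results appear to flow from the store outward.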

Wrapper Details

CompressionWrapper

Compresses values before storing and decompresses on retrieval using gzip compression.

CompressionWrapper

Bases: BaseWrapper

Wrapper that compresses values before storing and decompresses on retrieval.

This wrapper compresses the JSON-serialized value using the specified compression algorithm and stores it as a base64-encoded string within a special key in the dictionary. This allows compression while maintaining the dict[str, Any] interface.

The compressed format looks like: { "compressed_data": "base64-encoded-compressed-data", "compression_algorithm": "gzip", "compression_version": 1 }
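
To make the stored format concrete, here is a minimal standard-library sketch that produces and reverses a dictionary of that shape. The function names and the exact size check are assumptions for illustration, not the wrapper's actual implementation.

```python
import base64
import gzip
import json

MIN_SIZE_TO_COMPRESS = 1024  # mirrors the documented default, in bytes


def should_compress(value: dict) -> bool:
    # Assumption: size is measured on the UTF-8 encoded JSON serialization.
    return len(json.dumps(value).encode("utf-8")) > MIN_SIZE_TO_COMPRESS


def compress_value(value: dict) -> dict:
    raw = json.dumps(value).encode("utf-8")
    packed = gzip.compress(raw, compresslevel=1)  # level 1 favors speed
    return {
        "compressed_data": base64.b64encode(packed).decode("ascii"),
        "compression_algorithm": "gzip",
        "compression_version": 1,
    }


def decompress_value(stored: dict) -> dict:
    # Values that were never compressed are returned untouched.
    if "compressed_data" not in stored:
        return stored
    packed = base64.b64decode(stored["compressed_data"])
    return json.loads(gzip.decompress(packed).decode("utf-8"))
```

Base64 inflates the compressed bytes by roughly a third, which is one reason small values are better left uncompressed.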

key_value instance-attribute

key_value = key_value

min_size_to_compress instance-attribute

min_size_to_compress = min_size_to_compress

__init__

__init__(key_value, min_size_to_compress=1024)

Initialize the compression wrapper.

Parameters:

Name Type Description Default
key_value AsyncKeyValue

The store to wrap.

required
min_size_to_compress int

Only compress values larger than this many bytes. Defaults to 1024 bytes (1KB).

1024

_compress_value

_compress_value(value)

Compress a value into the compressed format.

_decompress_value

_decompress_value(value)

Decompress a value from the compressed format.

_should_compress

_should_compress(value)

Determine if a value should be compressed based on its size.

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • Storing large JSON objects
  • Reducing network transfer for distributed stores
  • Optimizing disk usage

Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.compression import CompressionWrapper

store = CompressionWrapper(
    key_value=MemoryStore(),
    min_size_to_compress=1024  # Only compress values > 1KB
)

# Large values are automatically compressed
await store.put(
    key="large-doc",
    value={"content": "..." * 1000},
    collection="documents"
)

Performance Considerations

  • Compression adds CPU overhead but reduces storage/transfer size
  • The min_size_to_compress parameter helps avoid compressing small values where overhead exceeds benefit
  • Uses gzip with compression level 1 for speed

FernetEncryptionWrapper

Encrypts values before storing and decrypts on retrieval using Fernet symmetric encryption.

FernetEncryptionWrapper

Bases: BaseEncryptionWrapper

Wrapper that encrypts values before storing and decrypts on retrieval using Fernet (symmetric encryption).

__init__

__init__(
    key_value: AsyncKeyValue,
    *,
    fernet: Fernet | MultiFernet,
    raise_on_decryption_error: bool = True,
) -> None
__init__(
    key_value: AsyncKeyValue,
    *,
    source_material: str,
    salt: str,
    raise_on_decryption_error: bool = True,
) -> None
__init__(
    key_value,
    *,
    fernet=None,
    source_material=None,
    salt=None,
    raise_on_decryption_error=True,
)

Use Cases

  • Storing sensitive data (passwords, tokens, PII)
  • Compliance with data protection regulations
  • Encrypting data at rest

Example

from cryptography.fernet import Fernet
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.encryption.fernet import FernetEncryptionWrapper

# Generate or load a key
key = Fernet.generate_key()
fernet = Fernet(key)

store = FernetEncryptionWrapper(
    key_value=MemoryStore(),
    fernet=fernet,
    raise_on_decryption_error=True
)

# Values are automatically encrypted
await store.put(
    key="secret",
    value={"password": "super-secret"},
    collection="credentials"
)

Security Considerations

  • Store encryption keys securely (e.g., environment variables, key management services)
  • Use MultiFernet for key rotation
  • Set raise_on_decryption_error=True to detect tampering

FallbackWrapper

Provides failover to a secondary store if the primary store fails.

FallbackWrapper

Bases: BaseWrapper

Wrapper that falls back to a secondary store when the primary store fails.

This wrapper attempts operations on the primary store first. If the operation fails with one of the specified exceptions, it automatically falls back to the secondary store. This provides high availability and graceful degradation when the primary store is unavailable.

Note: By default, this wrapper only provides read fallback; writes go only to the primary store. To let writes fall back as well, set write_to_fallback=True, but be aware of potential consistency issues.

fallback_key_value instance-attribute

fallback_key_value = fallback_key_value

fallback_on instance-attribute

fallback_on = fallback_on

primary_key_value instance-attribute

primary_key_value = primary_key_value

write_to_fallback instance-attribute

write_to_fallback = write_to_fallback

__init__

__init__(
    primary_key_value,
    fallback_key_value,
    fallback_on=(Exception,),
    write_to_fallback=False,
)

Initialize the fallback wrapper.

Parameters:

Name Type Description Default
primary_key_value AsyncKeyValue

The primary store to use.

required
fallback_key_value AsyncKeyValue

The fallback store to use when primary fails.

required
fallback_on tuple[type[Exception], ...]

Tuple of exception types that trigger fallback. Defaults to (Exception,).

(Exception,)
write_to_fallback bool

If True, write operations also fall back to secondary store. If False (default), write operations only go to primary.

False
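
The role of fallback_on can be sketched with plain Python. The classes and the get_with_fallback helper below are hypothetical stand-ins, not library code; they only show how the exception tuple decides whether the secondary store is consulted.

```python
import asyncio


class FailingStore:
    """Hypothetical primary store that is always unreachable."""

    async def get(self, key):
        raise ConnectionError("primary unavailable")


class DictStore:
    """Hypothetical in-memory secondary store."""

    def __init__(self, data):
        self.data = dict(data)

    async def get(self, key):
        return self.data.get(key)


async def get_with_fallback(primary, fallback, key, fallback_on=(Exception,)):
    try:
        return await primary.get(key)
    except fallback_on:
        # Only exception types listed in fallback_on reach this branch;
        # anything else propagates to the caller.
        return await fallback.get(key)
```

Narrowing fallback_on to connection-level errors keeps programming errors (for example a TypeError) from being silently masked by the secondary store.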

delete async

delete(key, *, collection=None)

delete_many async

delete_many(keys, *, collection=None)

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • High availability setups
  • Gradual migration between stores
  • Local cache with remote fallback

Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.stores.redis import RedisStore
from key_value.aio.wrappers.fallback import FallbackWrapper

store = FallbackWrapper(
    primary_key_value=RedisStore(url="redis://localhost:6379/0"),
    fallback_key_value=MemoryStore()
)

# If Redis is unavailable, operations fall back to MemoryStore
user = await store.get(key="user:123", collection="users")

LimitSizeWrapper

Enforces size limits on stored values, raising an error if values exceed the specified size.

LimitSizeWrapper

Bases: BaseWrapper

Wrapper that limits the size of entries stored in the cache. When using a key_value store as a cache, you may want to prevent caching of very small or very large entries. This wrapper allows you to silently (or loudly) ignore entries that do not fall within the specified size limits.

This wrapper checks the serialized size of values before storing them. This incurs a performance penalty as it requires JSON serialization of the value separate from serialization that occurs when the value is stored.

This wrapper does not prevent returning objects (get, ttl, get_many, ttl_many) that exceed the size limit, just storing them (put, put_many).

key_value instance-attribute

key_value = key_value

max_size instance-attribute

max_size = max_size

min_size instance-attribute

min_size = min_size

raise_on_too_large instance-attribute

raise_on_too_large = raise_on_too_large

raise_on_too_small instance-attribute

raise_on_too_small = raise_on_too_small

__init__

__init__(
    key_value,
    *,
    min_size=None,
    max_size=None,
    raise_on_too_small=False,
    raise_on_too_large=True,
)

Initialize the limit size wrapper.

Parameters:

Name Type Description Default
key_value AsyncKeyValue

The store to wrap.

required
min_size int | None

The minimum size (in bytes) allowed for each entry. If None, no minimum size is enforced.

None
max_size int | None

The maximum size (in bytes) allowed for each entry. If None, no maximum size is enforced.

None
raise_on_too_small bool

If True, raises EntryTooSmallError when an entry is less than min_size. If False (default), silently ignores entries that are too small.

False
raise_on_too_large bool

If True (default), raises EntryTooLargeError when an entry exceeds max_size. If False, silently ignores entries that are too large.

True

_within_size_limit

_within_size_limit(value, *, collection=None, key=None)

Check whether a value is within the configured size limits.

Parameters:

Name Type Description Default
value dict[str, Any]

The value to check.

required
collection str | None

The collection name (for error messages).

None
key str | None

The key name (for error messages).

None

Returns:

Type Description
bool

True if the value is within the size limit, False otherwise.

Raises:

Type Description
EntryTooSmallError

If raise_on_too_small is True and the value is less than min_size.

EntryTooLargeError

If raise_on_too_large is True and the value exceeds max_size.
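
A minimal sketch of such a check, assuming the size is the byte length of the JSON serialization. The exception classes here are local stand-ins that borrow the documented names, and the function is illustrative rather than the wrapper's actual code.

```python
import json


class EntryTooSmallError(Exception):
    """Local stand-in for the library's exception of the same name."""


class EntryTooLargeError(Exception):
    """Local stand-in for the library's exception of the same name."""


def within_size_limit(
    value,
    *,
    min_size=None,
    max_size=None,
    raise_on_too_small=False,
    raise_on_too_large=True,
):
    # Assumption: entries are measured as UTF-8 encoded JSON.
    size = len(json.dumps(value).encode("utf-8"))
    if min_size is not None and size < min_size:
        if raise_on_too_small:
            raise EntryTooSmallError(f"{size} bytes is below the minimum of {min_size}")
        return False
    if max_size is not None and size > max_size:
        if raise_on_too_large:
            raise EntryTooLargeError(f"{size} bytes exceeds the maximum of {max_size}")
        return False
    return True
```

A put would then be skipped whenever the check returns False, which is how entries can be ignored silently instead of raising.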

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

Use Cases

  • Preventing storage of excessively large values
  • Enforcing data constraints
  • Protecting against abuse

Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.limit_size import LimitSizeWrapper

store = LimitSizeWrapper(
    key_value=MemoryStore(),
    max_size=10240  # 10KB limit
)

# Raises EntryTooLargeError if the serialized value exceeds 10KB
await store.put(
    key="doc",
    value={"content": "..."},
    collection="documents"
)

LoggingWrapper

Logs all key-value operations for debugging and auditing.

LoggingWrapper

Bases: BaseWrapper

Wrapper that logs all operations for debugging and auditing.

This wrapper logs all key-value operations including their parameters and results. It's useful for:

  • Debugging application behavior
  • Auditing data access
  • Understanding cache hit/miss patterns
  • Monitoring performance issues

key_value instance-attribute

key_value = key_value

log_level instance-attribute

log_level = log_level

log_values instance-attribute

log_values = log_values

logger instance-attribute

logger = logger or getLogger('key_value.logging')

structured_logs instance-attribute

structured_logs = structured_logs

__init__

__init__(
    key_value,
    logger=None,
    log_level=INFO,
    log_values=False,
    structured_logs=False,
)

Initialize the logging wrapper.

Parameters:

Name Type Description Default
key_value AsyncKeyValue

The store to wrap.

required
logger Logger | None

Logger instance to use. If None, creates a logger named 'key_value.logging'.

None
log_level int

Logging level to use. Defaults to logging.INFO.

INFO
log_values bool

If True, logs the actual values being stored/retrieved. If False (default), only logs metadata (keys, collections, operation types). Keep this set to False to avoid logging sensitive data.

False
structured_logs bool

If True, logs the values as structured data. If False (default), logs the values as a string.

False

_format_collection

_format_collection(collection)

Format collection name for logging.

_format_message

_format_message(state, action, keys, collection, values=None, extra=None)

_log

_log(state, action, keys, collection, values=None, extra=None)

delete async

delete(key, *, collection=None)

delete_many async

delete_many(keys, *, collection=None)

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • Debugging store operations
  • Auditing data access
  • Performance monitoring

Example

import logging
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.logging import LoggingWrapper

logging.basicConfig(level=logging.INFO)

store = LoggingWrapper(
    key_value=MemoryStore(),
    log_level=logging.INFO
)

# All operations are logged
await store.put(key="test", value={"data": "value"})
# INFO: PUT key='test' collection=None ttl=None

PassthroughCacheWrapper

Provides read-through caching with a fast local cache and a slower remote store.

PassthroughCacheWrapper

Bases: BaseWrapper

Two-tier wrapper: reads from cache store, falls back to primary and populates cache.

TTLs from the primary are respected when writing into the cache using a clamped TTL policy.

cache_key_value instance-attribute

cache_key_value = TTLClampWrapper(
    key_value=cache_key_value,
    min_ttl=0,
    max_ttl=maximum_ttl or DEFAULT_MAX_TTL,
    missing_ttl=missing_ttl or DEFAULT_MISSING_TTL,
)

primary_key_value instance-attribute

primary_key_value = primary_key_value

unwrapped_cache_key_value instance-attribute

unwrapped_cache_key_value = cache_key_value

__init__

__init__(
    primary_key_value, cache_key_value, maximum_ttl=None, missing_ttl=None
)

Initialize the passthrough cache wrapper.

Parameters:

Name Type Description Default
primary_key_value AsyncKeyValue

The primary store to wrap.

required
cache_key_value AsyncKeyValue

The cache store to wrap.

required
maximum_ttl SupportsFloat | None

The maximum TTL for puts into the cache store. Defaults to 30 minutes.

None
missing_ttl SupportsFloat | None

The TTL to use for entries that do not have a TTL. Defaults to 30 minutes.

None
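
The read-through flow and the TTL clamping can be sketched as follows. TTLStore, clamp_ttl and read_through_get are hypothetical names, and the toy store's ttl() returning a (value, ttl) pair is an assumption made for this illustration.

```python
import asyncio

DEFAULT_MAX_TTL = 30.0 * 60  # seconds; the documented default of 30 minutes
DEFAULT_MISSING_TTL = 30.0 * 60  # seconds; the documented default of 30 minutes


class TTLStore:
    """Hypothetical store that records a TTL next to each value (no expiry logic)."""

    def __init__(self):
        self.data = {}

    async def get(self, key):
        entry = self.data.get(key)
        return entry[0] if entry else None

    async def ttl(self, key):
        # Assumption: returns a (value, ttl) pair, (None, None) when missing.
        return self.data.get(key, (None, None))

    async def put(self, key, value, ttl=None):
        self.data[key] = (value, ttl)


def clamp_ttl(ttl, max_ttl=DEFAULT_MAX_TTL, missing_ttl=DEFAULT_MISSING_TTL):
    if ttl is None:
        return missing_ttl
    return max(0.0, min(float(ttl), max_ttl))


async def read_through_get(primary, cache, key):
    cached = await cache.get(key)
    if cached is not None:
        return cached  # cache hit: the primary is not contacted
    value, ttl = await primary.ttl(key)
    if value is None:
        return None
    # Populate the cache, never keeping the entry longer than max_ttl.
    await cache.put(key, value, ttl=clamp_ttl(ttl))
    return value
```

Clamping means a cached copy can expire well before the primary entry does, which bounds how stale the cache can become.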

delete async

delete(key, *, collection=None)

delete_many async

delete_many(keys, *, collection=None)

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • Reducing latency for frequently accessed data
  • Reducing load on remote stores
  • Hybrid local/remote architectures

Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.stores.redis import RedisStore
from key_value.aio.wrappers.passthrough_cache import PassthroughCacheWrapper

store = PassthroughCacheWrapper(
    primary_key_value=RedisStore(url="redis://localhost:6379/0"),  # Remote store
    cache_key_value=MemoryStore()  # Fast local cache
)

# First read: from Redis, cached in memory
user = await store.get(key="user:123", collection="users")

# Second read: from memory cache (faster)
user = await store.get(key="user:123", collection="users")

PrefixCollectionsWrapper

Adds a prefix to all collection names.

PrefixCollectionsWrapper

Bases: BaseWrapper

A wrapper that prefixes collection names before delegating to the underlying store.

default_collection instance-attribute

default_collection = default_collection or DEFAULT_COLLECTION_NAME

key_value instance-attribute

key_value = key_value

prefix instance-attribute

prefix = prefix

__init__

__init__(key_value, prefix, default_collection=None)

Initialize the prefix collections wrapper.

Parameters:

Name Type Description Default
key_value AsyncKeyValue

The store to wrap.

required
prefix str

The prefix to add to the collections.

required
default_collection str | None

The default collection to use if no collection is provided. It is automatically prefixed with the prefix.

None

_prefix_collection

_prefix_collection(collection)

_unprefix_collection

_unprefix_collection(collection)

delete async

delete(key, *, collection=None)

delete_many async

delete_many(keys, *, collection=None)

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • Multi-tenant applications
  • Environment separation (dev/staging/prod)
  • Namespace isolation

Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.prefix_collections import PrefixCollectionsWrapper

store = PrefixCollectionsWrapper(
    key_value=MemoryStore(),
    prefix="prod"
)

# Collection becomes "prod:users"
await store.put(
    key="alice",
    value={"name": "Alice"},
    collection="users"
)

PrefixKeysWrapper

Adds a prefix to all keys.

PrefixKeysWrapper

Bases: BaseWrapper

A wrapper that prefixes key names before delegating to the underlying store.

key_value instance-attribute

key_value = key_value

prefix instance-attribute

prefix = prefix

__init__

__init__(key_value, prefix)

Initialize the prefix keys wrapper.

Parameters:

Name Type Description Default
key_value AsyncKeyValue

The store to wrap.

required
prefix str

The prefix to add to the keys.

required

_prefix_key

_prefix_key(key)

_unprefix_key

_unprefix_key(key)

delete async

delete(key, *, collection=None)

delete_many async

delete_many(keys, *, collection=None)

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • Namespace isolation within collections
  • Multi-tenant applications
  • Avoiding key collisions

Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.prefix_keys import PrefixKeysWrapper

store = PrefixKeysWrapper(
    key_value=MemoryStore(),
    prefix="app1"
)

# Key becomes "app1:user:123"
await store.put(
    key="user:123",
    value={"name": "Alice"},
    collection="users"
)

ReadOnlyWrapper

Prevents all write operations, making the store read-only.

ReadOnlyWrapper

Bases: BaseWrapper

Wrapper that prevents all write operations on the underlying store.

This wrapper allows all read operations (get, get_many, ttl, ttl_many) to pass through normally, but blocks all write operations (put, put_many, delete, delete_many). This is useful for:

  • Protecting production data during testing
  • Enforcing read-only access to read replicas
  • Preventing accidental modifications in certain environments

key_value instance-attribute

key_value = key_value

raise_on_write instance-attribute

raise_on_write = raise_on_write

__init__

__init__(key_value, raise_on_write=True)

Initialize the read-only wrapper.

Parameters:

Name Type Description Default
key_value AsyncKeyValue

The store to wrap.

required
raise_on_write bool

If True (default), raises ReadOnlyError on write attempts. If False, silently ignores writes (put/put_many return None, delete/delete_many return False/0).

True
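
The two modes of raise_on_write can be sketched with stand-in classes. ReadOnlyView, DictStore and the local ReadOnlyError are hypothetical and exist only for this illustration.

```python
import asyncio


class ReadOnlyError(Exception):
    """Local stand-in for the library's ReadOnlyError."""


class DictStore:
    """Hypothetical in-memory store."""

    def __init__(self, data=None):
        self.data = dict(data or {})

    async def get(self, key):
        return self.data.get(key)

    async def put(self, key, value):
        self.data[key] = value

    async def delete(self, key):
        return self.data.pop(key, None) is not None


class ReadOnlyView:
    """Hypothetical wrapper: reads pass through, writes are blocked."""

    def __init__(self, inner, raise_on_write=True):
        self.inner = inner
        self.raise_on_write = raise_on_write

    async def get(self, key):
        return await self.inner.get(key)

    async def put(self, key, value):
        if self.raise_on_write:
            raise ReadOnlyError(f"put blocked for key {key!r}")
        return None  # silently ignored

    async def delete(self, key):
        if self.raise_on_write:
            raise ReadOnlyError(f"delete blocked for key {key!r}")
        return False  # nothing was deleted
```

Silent mode suits code paths that write opportunistically (such as cache warmers); the raising mode is the safer choice when a write would indicate a bug.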

delete async

delete(key, *, collection=None)

delete_many async

delete_many(keys, *, collection=None)

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • Shared read-only caches
  • Preventing accidental writes
  • Read replicas

Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.read_only import ReadOnlyWrapper

store = ReadOnlyWrapper(
    key_value=MemoryStore()
)

# Raises ReadOnlyError
await store.put(key="test", value={"data": "value"})

RetryWrapper

Automatically retries failed operations with exponential backoff.

RetryWrapper

Bases: BaseWrapper

Wrapper that retries failed operations with exponential backoff.

This wrapper automatically retries operations that fail with specified exceptions, using exponential backoff between attempts. This is useful for handling transient failures like network issues or temporary service unavailability.

exponential_base instance-attribute

exponential_base = exponential_base

initial_delay instance-attribute

initial_delay = initial_delay

key_value instance-attribute

key_value = key_value

max_delay instance-attribute

max_delay = max_delay

max_retries instance-attribute

max_retries = max_retries

retry_on instance-attribute

retry_on = retry_on

__init__

__init__(
    key_value,
    max_retries=3,
    initial_delay=0.1,
    max_delay=10.0,
    exponential_base=2.0,
    retry_on=(ConnectionError, TimeoutError),
)

Initialize the retry wrapper.

Parameters:

Name Type Description Default
key_value AsyncKeyValue

The store to wrap.

required
max_retries int

Maximum number of retry attempts. Defaults to 3.

3
initial_delay float

Initial delay in seconds before first retry. Defaults to 0.1.

0.1
max_delay float

Maximum delay in seconds between retries. Defaults to 10.0.

10.0
exponential_base float

Base for exponential backoff calculation. Defaults to 2.0.

2.0
retry_on tuple[type[Exception], ...]

Tuple of exception types to retry on. Defaults to (ConnectionError, TimeoutError).

(ConnectionError, TimeoutError)

_calculate_delay

_calculate_delay(attempt)

Calculate the delay for a given attempt using exponential backoff.
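
One common way to compute such a delay is initial_delay * exponential_base ** attempt, capped at max_delay. The sketch below assumes that formula and a zero-based attempt counter; both are assumptions, and call_with_retry is a hypothetical helper rather than the wrapper's code.

```python
import asyncio


def calculate_delay(attempt, initial_delay=0.1, max_delay=10.0, exponential_base=2.0):
    # Assumption: attempt 0 is the wait before the first retry.
    return min(initial_delay * (exponential_base ** attempt), max_delay)


async def call_with_retry(
    operation,
    *,
    max_retries=3,
    initial_delay=0.1,
    max_delay=10.0,
    exponential_base=2.0,
    retry_on=(ConnectionError, TimeoutError),
):
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            delay = calculate_delay(attempt, initial_delay, max_delay, exponential_base)
            await asyncio.sleep(delay)
```

With the documented defaults the waits grow as 0.1 s, 0.2 s, 0.4 s and so on until they reach the 10 s cap.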

delete async

delete(key, *, collection=None)

delete_many async

delete_many(keys, *, collection=None)

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • Handling transient network failures
  • Improving reliability with remote stores
  • Rate limit handling

Example

from key_value.aio.stores.redis import RedisStore
from key_value.aio.wrappers.retry import RetryWrapper

store = RetryWrapper(
    key_value=RedisStore(url="redis://localhost:6379/0"),
    max_retries=3,
    initial_delay=0.1,
    max_delay=5.0,
    exponential_base=2
)

# Automatically retries on failure
user = await store.get(key="user:123", collection="users")

SingleCollectionWrapper

Stores all collections within a single backing collection by prefixing each key with its collection name.

SingleCollectionWrapper

Bases: BaseWrapper

A wrapper that stores all collections within a single backing collection via key prefixing.

default_collection instance-attribute

default_collection = default_collection or DEFAULT_COLLECTION_NAME

key_value instance-attribute

key_value = key_value

separator instance-attribute

separator = separator or DEFAULT_PREFIX_SEPARATOR

single_collection instance-attribute

single_collection = single_collection

__init__

__init__(key_value, single_collection, default_collection=None, separator=None)

Initialize the single collection wrapper.

Parameters:

Name Type Description Default
key_value AsyncKeyValue

The store to wrap.

required
single_collection str

The single collection to use to store all collections.

required
default_collection str | None

The default collection to use if no collection is provided.

None
separator str | None

The separator to use for the key prefix.

None

_prefix_key

_prefix_key(key, collection=None)

_unprefix_key

_unprefix_key(key)
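
The key prefixing that folds several collections into one can be sketched as below. The separator and default collection values are hypothetical placeholders (the real defaults come from DEFAULT_PREFIX_SEPARATOR and DEFAULT_COLLECTION_NAME, whose values are not shown here), and the functions are illustrative only.

```python
SEPARATOR = "__"  # hypothetical separator chosen for this sketch
DEFAULT_COLLECTION = "default"  # hypothetical default collection name


def prefix_key(key, collection=None, separator=SEPARATOR):
    # The logical collection becomes the first segment of the stored key.
    return f"{collection or DEFAULT_COLLECTION}{separator}{key}"


def unprefix_key(prefixed, separator=SEPARATOR):
    # Split on the first separator only, so keys may contain it themselves.
    _, _, key = prefixed.partition(separator)
    return key
```

Splitting on the first separator works as long as collection names never contain the separator, which is worth keeping in mind when choosing one.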

delete async

delete(key, *, collection=None)

delete_many async

delete_many(keys, *, collection=None)

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • Simplifying stores that don't need collections
  • Migrating from non-collection-based stores
  • Enforcing single-collection usage

Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.single_collection import SingleCollectionWrapper

store = SingleCollectionWrapper(
    key_value=MemoryStore(),
    single_collection="default"
)

# The entry is stored in the "default" backing collection; the requested
# "users" collection is folded into the stored key as a prefix
await store.put(key="test", value={"data": "value"}, collection="users")

TTLClampWrapper

Clamps TTL values to a specified range, ensuring TTLs are within acceptable bounds.

TTLClampWrapper

Bases: BaseWrapper

Wrapper that clamps the TTL of puts into the store to the range [min_ttl, max_ttl].

This wrapper only modifies write operations (put, put_many). All other operations (get, get_many, ttl, ttl_many, delete, delete_many) pass through unchanged to the underlying store.

key_value instance-attribute

key_value = key_value

max_ttl instance-attribute

max_ttl = float(max_ttl)

min_ttl instance-attribute

min_ttl = float(min_ttl)

missing_ttl instance-attribute

missing_ttl = float(missing_ttl) if missing_ttl is not None else None

__init__

__init__(key_value, min_ttl, max_ttl, missing_ttl=None)

Initialize the TTL clamp wrapper.

Parameters:

Name Type Description Default
key_value AsyncKeyValue

The store to wrap.

required
min_ttl SupportsFloat

The minimum TTL for puts into the store.

required
max_ttl SupportsFloat

The maximum TTL for puts into the store.

required
missing_ttl SupportsFloat | None

The TTL to use for entries that do not have a TTL. Defaults to None.

None

_ttl_clamp

_ttl_clamp(ttl: SupportsFloat) -> float
_ttl_clamp(ttl: SupportsFloat | None) -> float | None
_ttl_clamp(ttl)
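
The overloads above suggest that a missing TTL maps to missing_ttl and any other TTL is forced into the configured range. A minimal sketch under that assumption (ttl_clamp here is a standalone illustration, not the method itself):

```python
def ttl_clamp(ttl, *, min_ttl, max_ttl, missing_ttl=None):
    if ttl is None:
        # No TTL supplied: use missing_ttl, which may itself be None.
        return None if missing_ttl is None else float(missing_ttl)
    return max(float(min_ttl), min(float(ttl), float(max_ttl)))
```

For example, with min_ttl=60 and max_ttl=86400, a requested TTL of 30 is raised to 60 and a requested TTL of 100000 is lowered to 86400.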

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

Use Cases

  • Enforcing minimum/maximum TTLs
  • Preventing excessively long or short TTLs
  • Backend-specific TTL limitations

Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.ttl_clamp import TTLClampWrapper

store = TTLClampWrapper(
    key_value=MemoryStore(),
    min_ttl=60,      # Minimum 1 minute
    max_ttl=86400    # Maximum 1 day
)

# TTL is clamped to range [60, 86400]
await store.put(
    key="test",
    value={"data": "value"},
    ttl=30  # Clamped to 60
)

StatisticsWrapper

Tracks operation statistics including counts, hits, and misses.

StatisticsWrapper

Bases: BaseWrapper

Statistics wrapper around a KV Store that tracks operation statistics.

Note: enumeration and destroy operations are not tracked by this wrapper.

_statistics instance-attribute

_statistics = KVStoreStatistics()

key_value instance-attribute

key_value = key_value

statistics property

statistics

__init__

__init__(key_value)

delete async

delete(key, *, collection=None)

delete_many async

delete_many(keys, *, collection=None)

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • Performance monitoring
  • Cache hit rate analysis
  • Usage analytics

Example

from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.statistics import StatisticsWrapper

store = StatisticsWrapper(
    key_value=MemoryStore()
)

# Perform operations
await store.put(key="user:1", value={"name": "Alice"})
await store.get(key="user:1")  # Hit
await store.get(key="user:2")  # Miss

# Check statistics
stats = store.statistics
print(f"Get operations: {stats.get.count}")
print(f"Get hits: {stats.get.hits}")
print(f"Get misses: {stats.get.misses}")
print(f"Hit rate: {stats.get.hit_rate()}")

# Reset statistics
store.reset_statistics()

TimeoutWrapper

Adds timeout constraints to all operations, raising an error if operations exceed the specified timeout.

TimeoutWrapper

Bases: BaseWrapper

Wrapper that adds timeout limits to all operations.

This wrapper ensures that no operation takes longer than the specified timeout. If an operation exceeds the timeout, it raises asyncio.TimeoutError. This is useful for preventing operations from hanging indefinitely and for enforcing SLAs.

key_value instance-attribute

key_value = key_value

timeout instance-attribute

timeout = timeout

__init__

__init__(key_value, timeout=5.0)

Initialize the timeout wrapper.

Parameters:

Name Type Description Default
key_value AsyncKeyValue

The store to wrap.

required
timeout float

Timeout in seconds for all operations. Defaults to 5.0 seconds.

5.0
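
A timeout of this kind can be sketched with asyncio.wait_for from the standard library. Whether the wrapper uses wait_for internally is an assumption; with_timeout, fast_get and slow_get are hypothetical names for this illustration.

```python
import asyncio


async def with_timeout(operation, timeout=5.0):
    # wait_for cancels the awaited operation once the timeout elapses.
    return await asyncio.wait_for(operation, timeout=timeout)


async def fast_get():
    return {"v": 1}


async def slow_get():
    await asyncio.sleep(0.2)  # simulates a store that responds too slowly
    return {"v": 1}


async def demo():
    fast = await with_timeout(fast_get(), timeout=0.05)
    try:
        await with_timeout(slow_get(), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast, timed_out
```

Because the slow operation is cancelled rather than left running, a timed-out write may or may not have reached the backend; callers that retry should account for that.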

delete async

delete(key, *, collection=None)

delete_many async

delete_many(keys, *, collection=None)

get async

get(key, *, collection=None)

get_many async

get_many(keys, *, collection=None)

put async

put(key, value, *, collection=None, ttl=None)

put_many async

put_many(keys, values, *, collection=None, ttl=None)

ttl async

ttl(key, *, collection=None)

ttl_many async

ttl_many(keys, *, collection=None)

Use Cases

  • Preventing operations from hanging indefinitely
  • Enforcing SLA requirements
  • Circuit breaker patterns

Example

from key_value.aio.stores.redis import RedisStore
from key_value.aio.wrappers.timeout import TimeoutWrapper

store = TimeoutWrapper(
    key_value=RedisStore(url="redis://localhost:6379/0"),
    timeout=1.0  # 1 second timeout
)

# Raises asyncio.TimeoutError if operation takes > 1 second
user = await store.get(key="user:123", collection="users")

Wrapper Stacking Guide

Wrappers can be stacked in any order, but some orderings are more effective than others. Here are some recommended patterns:

Performance Monitoring

StatisticsWrapper(
    LoggingWrapper(
        TimeoutWrapper(
            store
        )
    )
)

Production Ready

StatisticsWrapper(
    RetryWrapper(
        TimeoutWrapper(
            CompressionWrapper(
                FernetEncryptionWrapper(
                    store
                )
            )
        )
    )
)

Development

LoggingWrapper(
    StatisticsWrapper(
        store
    )
)

Multi-Tenant

PrefixCollectionsWrapper(
    PrefixKeysWrapper(
        store
    )
)

Creating Custom Wrappers

To create a custom wrapper, extend BaseWrapper and override the methods you want to modify:

from key_value.aio.protocols.key_value import AsyncKeyValue
from key_value.aio.wrappers.base import BaseWrapper
from typing_extensions import override

class CustomWrapper(BaseWrapper):
    def __init__(self, key_value: AsyncKeyValue):
        self.key_value = key_value
        super().__init__()

    @override
    async def get(self, key: str, *, collection: str | None = None):
        # Add custom logic before
        print(f"Getting key: {key}")

        # Call wrapped store
        result = await self.key_value.get(key=key, collection=collection)

        # Add custom logic after
        print(f"Got result: {result}")

        return result

See the API Reference for complete wrapper documentation.