Wrappers¶
Wrappers are a powerful feature of py-key-value that allow you to add
functionality to any key-value store. Wrappers implement the AsyncKeyValue
protocol, so they can be used anywhere a store can be used.
Available Wrappers¶
| Wrapper | Description |
|---|---|
| CompressionWrapper | Compress values before storing and decompress on retrieval |
| FernetEncryptionWrapper | Encrypt values before storing and decrypt on retrieval |
| FallbackWrapper | Fallback to a secondary store when the primary store fails |
| LimitSizeWrapper | Limit the size of entries stored in the cache |
| LoggingWrapper | Log the operations performed on the store |
| PassthroughCacheWrapper | Wrap two stores to provide a read-through cache |
| PrefixCollectionsWrapper | Prefix all collections with a given prefix |
| PrefixKeysWrapper | Prefix all keys with a given prefix |
| ReadOnlyWrapper | Prevent all write operations on the underlying store |
| RetryWrapper | Retry failed operations with exponential backoff |
| SingleCollectionWrapper | Wrap a store to only use a single collection |
| TTLClampWrapper | Clamp the TTL to a given range |
| StatisticsWrapper | Track operation statistics for the store |
| TimeoutWrapper | Add timeout protection to store operations |
What Are Wrappers?¶
Wrappers follow the decorator pattern - they wrap around a key-value store and intercept operations to add additional behavior. Multiple wrappers can be stacked to combine their effects.
Wrapper Pattern Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.logging import LoggingWrapper
from key_value.aio.wrappers.statistics import StatisticsWrapper
# Stack wrappers to combine functionality
store = StatisticsWrapper(
    LoggingWrapper(
        MemoryStore()
    )
)
Execution Order¶
Wrappers execute in the order they are stacked:
- Writes (put, delete): the request is processed Outer wrapper → Inner wrapper → Store
- Reads (get, ttl): the result is processed Store → Inner wrapper → Outer wrapper (see the sketch below)
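The following sketch traces a get call through a two-wrapper stack. TraceWrapper is a hypothetical wrapper written only for this illustration, following the BaseWrapper pattern shown in "Creating Custom Wrappers" below:
import asyncio
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.base import BaseWrapper

class TraceWrapper(BaseWrapper):
    """Hypothetical wrapper that prints when a request enters and a result leaves."""
    def __init__(self, key_value, name: str):
        self.key_value = key_value
        self.name = name
        super().__init__()

    async def get(self, key: str, *, collection: str | None = None):
        print(f"{self.name}: request in")
        result = await self.key_value.get(key=key, collection=collection)
        print(f"{self.name}: result out")
        return result

async def main() -> None:
    store = TraceWrapper(TraceWrapper(MemoryStore(), name="inner"), name="outer")
    await store.get(key="demo")
    # Prints: outer: request in, inner: request in, inner: result out, outer: result out

asyncio.run(main())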
Wrapper Details¶
CompressionWrapper¶
Compresses values before storing and decompresses on retrieval using gzip compression.
CompressionWrapper
¶
Bases: BaseWrapper
Wrapper that compresses values before storing and decompresses on retrieval.
This wrapper compresses the JSON-serialized value using the specified compression algorithm and stores it as a base64-encoded string within a special key in the dictionary. This allows compression while maintaining the dict[str, Any] interface.
The compressed format looks like:
{
    "compressed_data": "base64-encoded-compressed-data",
    "compression_algorithm": "gzip",
    "compression_version": 1
}
__init__
¶
Initialize the compression wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key_value | AsyncKeyValue | The store to wrap. | required |
| min_size_to_compress | int | Only compress values larger than this many bytes. Defaults to 1024 bytes (1KB). | 1024 |
_should_compress
¶
Determine if a value should be compressed based on its size.
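A minimal sketch of what this check amounts to, assuming size is measured from the JSON-serialized value (the library's exact measurement may differ):
import json
from typing import Any

def should_compress(value: dict[str, Any], min_size_to_compress: int = 1024) -> bool:
    # Compress only when the serialized payload meets the size threshold
    serialized = json.dumps(value).encode("utf-8")
    return len(serialized) >= min_size_to_compress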
Use Cases¶
- Storing large JSON objects
- Reducing network transfer for distributed stores
- Optimizing disk usage
Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.compression import CompressionWrapper
store = CompressionWrapper(
    key_value=MemoryStore(),
    min_size_to_compress=1024  # Only compress values > 1KB
)
# Large values are automatically compressed
await store.put(
    key="large-doc",
    value={"content": "..." * 1000},
    collection="documents"
)
Performance Considerations¶
- Compression adds CPU overhead but reduces storage/transfer size
- The min_size_to_compress parameter helps avoid compressing small values where overhead exceeds benefit
- Uses gzip with compression level 1 for speed
FernetEncryptionWrapper¶
Encrypts values before storing and decrypts on retrieval using Fernet symmetric encryption.
FernetEncryptionWrapper
¶
Bases: BaseEncryptionWrapper
Wrapper that encrypts values before storing and decrypts on retrieval using Fernet (symmetric encryption).
__init__
¶
__init__(
    key_value: AsyncKeyValue,
    *,
    fernet: Fernet | MultiFernet,
    raise_on_decryption_error: bool = True,
) -> None
__init__(
    key_value: AsyncKeyValue,
    *,
    source_material: str,
    salt: str,
    raise_on_decryption_error: bool = True,
) -> None
__init__(
    key_value,
    *,
    fernet=None,
    source_material=None,
    salt=None,
    raise_on_decryption_error=True,
)
Use Cases¶
- Storing sensitive data (passwords, tokens, PII)
- Compliance with data protection regulations
- Encrypting data at rest
Example¶
from cryptography.fernet import Fernet
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.encryption.fernet import FernetEncryptionWrapper
# Generate or load a key
key = Fernet.generate_key()
fernet = Fernet(key)
store = FernetEncryptionWrapper(
    key_value=MemoryStore(),
    fernet=fernet,
    raise_on_decryption_error=True
)
# Values are automatically encrypted
await store.put(
    key="secret",
    value={"password": "super-secret"},
    collection="credentials"
)
Security Considerations¶
- Store encryption keys securely (e.g., environment variables, key management services)
- Use MultiFernet for key rotation (see the sketch below)
- Set raise_on_decryption_error=True to detect tampering
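Because the wrapper accepts Fernet | MultiFernet, key rotation can be handled with cryptography's MultiFernet, which encrypts with the first key and tries each key on decryption. A minimal sketch:
from cryptography.fernet import Fernet, MultiFernet
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.encryption.fernet import FernetEncryptionWrapper

old_key = Fernet(Fernet.generate_key())  # previously deployed key
new_key = Fernet(Fernet.generate_key())  # newly issued key

# Values written under old_key remain readable after rotating to new_key
store = FernetEncryptionWrapper(
    key_value=MemoryStore(),
    fernet=MultiFernet([new_key, old_key]),
)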
FallbackWrapper¶
Provides failover to a secondary store if the primary store fails.
FallbackWrapper
¶
Bases: BaseWrapper
Wrapper that falls back to a secondary store when the primary store fails.
This wrapper attempts operations on the primary store first. If the operation fails with one of the specified exceptions, it automatically falls back to the secondary store. This provides high availability and graceful degradation when the primary store is unavailable.
Note: This wrapper only provides read fallback by default. Writes always go to the primary store. For write fallback, consider using write_to_fallback=True, but be aware of potential consistency issues.
__init__
¶
__init__(
    primary_key_value,
    fallback_key_value,
    fallback_on=(Exception,),
    write_to_fallback=False,
)
Initialize the fallback wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| primary_key_value | AsyncKeyValue | The primary store to use. | required |
| fallback_key_value | AsyncKeyValue | The fallback store to use when primary fails. | required |
| fallback_on | tuple[type[Exception], ...] | Tuple of exception types that trigger fallback. Defaults to (Exception,). | (Exception,) |
| write_to_fallback | bool | If True, write operations also fall back to secondary store. If False (default), write operations only go to primary. | False |
Use Cases¶
- High availability setups
- Gradual migration between stores
- Local cache with remote fallback
Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.stores.redis import RedisStore
from key_value.aio.wrappers.fallback import FallbackWrapper
store = FallbackWrapper(
    primary_key_value=RedisStore(url="redis://localhost:6379/0"),
    fallback_key_value=MemoryStore()
)
# If Redis is unavailable, operations fall back to MemoryStore
user = await store.get(key="user:123", collection="users")
LimitSizeWrapper¶
Enforces size limits on stored values, rejecting (or silently skipping) entries outside the configured bounds.
LimitSizeWrapper
¶
Bases: BaseWrapper
Wrapper that limits the size of entries stored in the cache. When using a key_value store as a cache, you may want to prevent caching of very small or very large entries. This wrapper allows you to silently (or loudly) ignore entries that do not fall within the specified size limits.
This wrapper checks the serialized size of values before storing them. This incurs a performance penalty as it requires JSON serialization of the value separate from serialization that occurs when the value is stored.
This wrapper does not prevent returning objects (get, ttl, get_many, ttl_many) that exceed the size limit, just storing them (put, put_many).
__init__
¶
__init__(
    key_value,
    *,
    min_size=None,
    max_size=None,
    raise_on_too_small=False,
    raise_on_too_large=True,
)
Initialize the limit size wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key_value | AsyncKeyValue | The store to wrap. | required |
| min_size | int \| None | The minimum size (in bytes) allowed for each entry. If None, no minimum size is enforced. | None |
| max_size | int \| None | The maximum size (in bytes) allowed for each entry. If None, no maximum size is enforced. | None |
| raise_on_too_small | bool | If True, raises EntryTooSmallError when an entry is less than min_size. If False (default), silently ignores entries that are too small. | False |
| raise_on_too_large | bool | If True (default), raises EntryTooLargeError when an entry exceeds max_size. If False, silently ignores entries that are too large. | True |
_within_size_limit
¶
Check whether a value falls within the configured size limits.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | dict[str, Any] | The value to check. | required |
| collection | str \| None | The collection name (for error messages). | None |
| key | str \| None | The key name (for error messages). | None |
Returns:
| Type | Description |
|---|---|
| bool | True if the value is within the size limit, False otherwise. |
Raises:
| Type | Description |
|---|---|
| EntryTooSmallError | If raise_on_too_small is True and the value is less than min_size. |
| EntryTooLargeError | If raise_on_too_large is True and the value exceeds max_size. |
Use Cases¶
- Preventing storage of excessively large values
- Enforcing data constraints
- Protecting against abuse
Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.limit_size import LimitSizeWrapper
store = LimitSizeWrapper(
    key_value=MemoryStore(),
    max_size=10240  # 10KB limit
)
# Raises EntryTooLargeError if the value exceeds 10KB
await store.put(
    key="doc",
    value={"content": "..."},
    collection="documents"
)
LoggingWrapper¶
Logs all key-value operations for debugging and auditing.
LoggingWrapper
¶
Bases: BaseWrapper
Wrapper that logs all operations for debugging and auditing.
This wrapper logs all key-value operations including their parameters and results. It's useful for:
- Debugging application behavior
- Auditing data access
- Understanding cache hit/miss patterns
- Monitoring performance issues
__init__
¶
Initialize the logging wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key_value | AsyncKeyValue | The store to wrap. | required |
| logger | Logger \| None | Logger instance to use. If None, creates a logger named 'key_value.logging'. | None |
| log_level | int | Logging level to use. Defaults to logging.INFO. | INFO |
| log_values | bool | If True, logs the actual values being stored/retrieved. If False (default), only logs metadata (keys, collections, operation types). Set to False to avoid logging sensitive data. | False |
| structured_logs | bool | If True, logs the values as structured data. If False (default), logs the values as a string. | False |
Use Cases¶
- Debugging store operations
- Auditing data access
- Performance monitoring
Example¶
import logging
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.logging import LoggingWrapper
logging.basicConfig(level=logging.INFO)
store = LoggingWrapper(
    key_value=MemoryStore(),
    log_level=logging.INFO
)
# All operations are logged
await store.put(key="test", value={"data": "value"})
# INFO: PUT key='test' collection=None ttl=None
PassthroughCacheWrapper¶
Provides read-through caching with a fast local cache and a slower remote store.
PassthroughCacheWrapper
¶
Bases: BaseWrapper
Two-tier wrapper: reads from cache store, falls back to primary and populates cache.
TTLs from the primary are respected when writing into the cache using a clamped TTL policy.
cache_key_value
instance-attribute
¶
cache_key_value = TTLClampWrapper(
    key_value=cache_key_value,
    min_ttl=0,
    max_ttl=maximum_ttl or DEFAULT_MAX_TTL,
    missing_ttl=missing_ttl or DEFAULT_MISSING_TTL,
)
__init__
¶
Initialize the passthrough cache wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| primary_key_value | AsyncKeyValue | The primary store to wrap. | required |
| cache_key_value | AsyncKeyValue | The cache store to wrap. | required |
| maximum_ttl | SupportsFloat \| None | The maximum TTL for puts into the cache store. Defaults to 30 minutes. | None |
| missing_ttl | SupportsFloat \| None | The TTL to use for entries that do not have a TTL. Defaults to 30 minutes. | None |
Use Cases¶
- Reducing latency for frequently accessed data
- Reducing load on remote stores
- Hybrid local/remote architectures
Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.stores.redis import RedisStore
from key_value.aio.wrappers.passthrough_cache import PassthroughCacheWrapper
store = PassthroughCacheWrapper(
    primary_key_value=RedisStore(url="redis://localhost:6379/0"),  # Remote store
    cache_key_value=MemoryStore()  # Fast local cache
)
# First read: from Redis, cached in memory
user = await store.get(key="user:123", collection="users")
# Second read: from memory cache (faster)
user = await store.get(key="user:123", collection="users")
PrefixCollectionsWrapper¶
Adds a prefix to all collection names.
PrefixCollectionsWrapper
¶
Bases: BaseWrapper
A wrapper that prefixes collection names before delegating to the underlying store.
default_collection
instance-attribute
¶
__init__
¶
Initialize the prefix collections wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key_value | AsyncKeyValue | The store to wrap. | required |
| prefix | str | The prefix to add to the collections. | required |
| default_collection | str \| None | The default collection to use if no collection is provided. Will be automatically prefixed with the given prefix. | None |
Use Cases¶
- Multi-tenant applications
- Environment separation (dev/staging/prod)
- Namespace isolation
Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.prefix_collections import PrefixCollectionsWrapper
store = PrefixCollectionsWrapper(
    key_value=MemoryStore(),
    prefix="prod"
)
# Collection becomes "prod:users"
await store.put(
    key="alice",
    value={"name": "Alice"},
    collection="users"
)
PrefixKeysWrapper¶
Adds a prefix to all keys.
PrefixKeysWrapper
¶
Bases: BaseWrapper
A wrapper that prefixes key names before delegating to the underlying store.
__init__
¶
Initialize the prefix keys wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key_value | AsyncKeyValue | The store to wrap. | required |
| prefix | str | The prefix to add to the keys. | required |
Use Cases¶
- Namespace isolation within collections
- Multi-tenant applications
- Avoiding key collisions
Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.prefix_keys import PrefixKeysWrapper
store = PrefixKeysWrapper(
    key_value=MemoryStore(),
    prefix="app1"
)
# Key becomes "app1:user:123"
await store.put(
    key="user:123",
    value={"name": "Alice"},
    collection="users"
)
ReadOnlyWrapper¶
Prevents all write operations, making the store read-only.
ReadOnlyWrapper
¶
Bases: BaseWrapper
Wrapper that prevents all write operations on the underlying store.
This wrapper allows all read operations (get, get_many, ttl, ttl_many) to pass through normally, but blocks all write operations (put, put_many, delete, delete_many). This is useful for:
- Protecting production data during testing
- Enforcing read-only access to read replicas
- Preventing accidental modifications in certain environments
__init__
¶
Initialize the read-only wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key_value | AsyncKeyValue | The store to wrap. | required |
| raise_on_write | bool | If True (default), raises ReadOnlyError on write attempts. If False, silently ignores writes (put/put_many return None, delete/delete_many return False/0). | True |
Use Cases¶
- Shared read-only caches
- Preventing accidental writes
- Read replicas
Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.read_only import ReadOnlyWrapper
store = ReadOnlyWrapper(
    key_value=MemoryStore()
)
# Raises ReadOnlyError
await store.put(key="test", value={"data": "value"})
RetryWrapper¶
Automatically retries failed operations with exponential backoff.
RetryWrapper
¶
Bases: BaseWrapper
Wrapper that retries failed operations with exponential backoff.
This wrapper automatically retries operations that fail with specified exceptions, using exponential backoff between attempts. This is useful for handling transient failures like network issues or temporary service unavailability.
__init__
¶
__init__(
    key_value,
    max_retries=3,
    initial_delay=0.1,
    max_delay=10.0,
    exponential_base=2.0,
    retry_on=(ConnectionError, TimeoutError),
)
Initialize the retry wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key_value | AsyncKeyValue | The store to wrap. | required |
| max_retries | int | Maximum number of retry attempts. Defaults to 3. | 3 |
| initial_delay | float | Initial delay in seconds before first retry. Defaults to 0.1. | 0.1 |
| max_delay | float | Maximum delay in seconds between retries. Defaults to 10.0. | 10.0 |
| exponential_base | float | Base for exponential backoff calculation. Defaults to 2.0. | 2.0 |
| retry_on | tuple[type[Exception], ...] | Tuple of exception types to retry on. Defaults to (ConnectionError, TimeoutError). | (ConnectionError, TimeoutError) |
_calculate_delay
¶
Calculate the delay for a given attempt using exponential backoff.
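A minimal sketch of the computation these parameters imply; the library's exact formula may differ:
def calculate_delay(attempt: int, *, initial_delay: float = 0.1,
                    exponential_base: float = 2.0, max_delay: float = 10.0) -> float:
    # Exponential backoff: initial_delay * base**attempt, capped at max_delay
    return min(initial_delay * (exponential_base ** attempt), max_delay)

# With the defaults, successive retries wait roughly 0.1s, 0.2s, 0.4s, ...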
Use Cases¶
- Handling transient network failures
- Improving reliability with remote stores
- Rate limit handling
Example¶
from key_value.aio.stores.redis import RedisStore
from key_value.aio.wrappers.retry import RetryWrapper
store = RetryWrapper(
    key_value=RedisStore(url="redis://localhost:6379/0"),
    max_retries=3,
    initial_delay=0.1,
    max_delay=5.0,
    exponential_base=2
)
# Automatically retries on failure
user = await store.get(key="user:123", collection="users")
SingleCollectionWrapper¶
Stores all collections in a single backing collection, encoding the requested collection into the key prefix.
SingleCollectionWrapper
¶
Bases: BaseWrapper
A wrapper that stores all collections within a single backing collection via key prefixing.
default_collection
instance-attribute
¶
__init__
¶
Initialize the single collection wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key_value | AsyncKeyValue | The store to wrap. | required |
| single_collection | str | The single collection to use to store all collections. | required |
| default_collection | str \| None | The default collection to use if no collection is provided. | None |
| separator | str \| None | The separator to use for the key prefix. | None |
Use Cases¶
- Simplifying stores that don't need collections
- Migrating from non-collection-based stores
- Enforcing single-collection usage
Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.single_collection import SingleCollectionWrapper
store = SingleCollectionWrapper(
    key_value=MemoryStore(),
    single_collection="default"
)
# All entries are stored in the "default" backing collection; the requested
# collection becomes part of the key prefix
await store.put(key="test", value={"data": "value"}, collection="users")
TTLClampWrapper¶
Clamps TTL values to a specified range, ensuring TTLs are within acceptable bounds.
TTLClampWrapper
¶
Bases: BaseWrapper
Wrapper that enforces minimum and maximum TTLs for puts into the store.
This wrapper only modifies write operations (put, put_many). All read operations (get, get_many, ttl, ttl_many, delete, delete_many) pass through unchanged to the underlying store.
missing_ttl
instance-attribute
¶
__init__
¶
Initialize the TTL clamp wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key_value | AsyncKeyValue | The store to wrap. | required |
| min_ttl | SupportsFloat | The minimum TTL for puts into the store. | required |
| max_ttl | SupportsFloat | The maximum TTL for puts into the store. | required |
| missing_ttl | SupportsFloat \| None | The TTL to use for entries that do not have a TTL. Defaults to None. | None |
_ttl_clamp
¶
Clamp a TTL to the configured range.
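A minimal sketch of the clamping this describes, assuming the semantics of min_ttl, max_ttl, and missing_ttl documented above:
def ttl_clamp(ttl: float | None, *, min_ttl: float, max_ttl: float,
              missing_ttl: float | None = None) -> float | None:
    # Entries without a TTL get missing_ttl; others are clamped into range
    if ttl is None:
        return missing_ttl
    return min(max(ttl, min_ttl), max_ttl)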
Use Cases¶
- Enforcing minimum/maximum TTLs
- Preventing excessively long or short TTLs
- Backend-specific TTL limitations
Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.ttl_clamp import TTLClampWrapper
store = TTLClampWrapper(
    key_value=MemoryStore(),
    min_ttl=60,     # Minimum 1 minute
    max_ttl=86400   # Maximum 1 day
)
# TTL is clamped to range [60, 86400]
await store.put(
    key="test",
    value={"data": "value"},
    ttl=30  # Clamped to 60
)
StatisticsWrapper¶
Tracks operation statistics including counts, hits, and misses.
StatisticsWrapper
¶
Bases: BaseWrapper
Statistics wrapper around a KV Store that tracks operation statistics.
Note: enumeration and destroy operations are not tracked by this wrapper.
Use Cases¶
- Performance monitoring
- Cache hit rate analysis
- Usage analytics
Example¶
from key_value.aio.stores.memory import MemoryStore
from key_value.aio.wrappers.statistics import StatisticsWrapper
store = StatisticsWrapper(
    key_value=MemoryStore()
)
# Perform operations
await store.put(key="user:1", value={"name": "Alice"})
await store.get(key="user:1") # Hit
await store.get(key="user:2") # Miss
# Check statistics
stats = store.get_statistics()
print(f"Get operations: {stats.get.count}")
print(f"Get hits: {stats.get.hits}")
print(f"Get misses: {stats.get.misses}")
print(f"Hit rate: {stats.get.hit_rate()}")
# Reset statistics
store.reset_statistics()
TimeoutWrapper¶
Adds timeout constraints to all operations, raising an error if operations exceed the specified timeout.
TimeoutWrapper
¶
Bases: BaseWrapper
Wrapper that adds timeout limits to all operations.
This wrapper ensures that no operation takes longer than the specified timeout. If an operation exceeds the timeout, it raises asyncio.TimeoutError. This is useful for preventing operations from hanging indefinitely and for enforcing SLAs.
__init__
¶
Initialize the timeout wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key_value
|
AsyncKeyValue
|
The store to wrap. |
required |
timeout
|
float
|
Timeout in seconds for all operations. Defaults to 5.0 seconds. |
5.0
|
Use Cases¶
- Preventing operations from hanging indefinitely
- Enforcing SLA requirements
- Circuit breaker patterns
Example¶
from key_value.aio.stores.redis import RedisStore
from key_value.aio.wrappers.timeout import TimeoutWrapper
store = TimeoutWrapper(
    key_value=RedisStore(url="redis://localhost:6379/0"),
    timeout=1.0  # 1 second timeout
)
# Raises asyncio.TimeoutError if operation takes > 1 second
user = await store.get(key="user:123", collection="users")
Wrapper Stacking Guide¶
Wrappers can be stacked in any order, but some orderings are more effective than others. Here are some recommended patterns:
Performance Monitoring¶
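A sketch of one such stack (store stands for any configured store): statistics on the outside count every operation, while logging records what reaches the store:
StatisticsWrapper(      # track counts and hit/miss rates
    LoggingWrapper(     # log each operation for inspection
        store
    )
)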
Production Ready¶
StatisticsWrapper(                    # outermost: count every operation
    RetryWrapper(                     # retry transient failures
        TimeoutWrapper(               # bound the duration of each attempt
            CompressionWrapper(       # shrink large values
                FernetEncryptionWrapper(  # encrypt before data reaches the store
                    store
                )
            )
        )
    )
)
Development¶
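A sketch of a development stack, assuming an in-process store and verbose logging are acceptable locally:
LoggingWrapper(
    key_value=MemoryStore(),   # no external services needed locally
    log_level=logging.DEBUG,   # verbose output while debugging
)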
Multi-Tenant¶
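A sketch using PrefixCollectionsWrapper to namespace tenants; tenant_id is a hypothetical placeholder:
PrefixCollectionsWrapper(
    key_value=store,               # the shared backing store
    prefix=f"tenant-{tenant_id}",  # one prefix per tenant
)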
Creating Custom Wrappers¶
To create a custom wrapper, extend BaseWrapper and override the methods you
want to modify:
from key_value.aio.protocols.key_value import AsyncKeyValue  # protocol import; path may vary by version
from key_value.aio.wrappers.base import BaseWrapper
from typing_extensions import override

class CustomWrapper(BaseWrapper):
    def __init__(self, key_value: AsyncKeyValue):
        self.key_value = key_value
        super().__init__()

    @override
    async def get(self, key: str, *, collection: str | None = None):
        # Add custom logic before
        print(f"Getting key: {key}")
        # Call wrapped store
        result = await self.key_value.get(key=key, collection=collection)
        # Add custom logic after
        print(f"Got result: {result}")
        return result
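Usage then mirrors any other wrapper:
store = CustomWrapper(key_value=MemoryStore())
await store.get(key="user:123")  # prints the before/after messages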
See the API Reference for complete wrapper documentation.