"""
The container module provides an implementation of the Container class. Container is a core class, representing an inter-related
set of properties, parameters, Sequence and Record entities. Specific Container types can be used to collect and store
data representing a process execution, such as model training, LLM chain execution, etc.
The Container class provides an interface for getting and setting its pre-defined Properties as well as free-form dict-like parameters.
Container class has an interface for managing sequence objects.
"""
import logging
import cachetools.func
from collections import defaultdict
from typing import Optional, Type, Union, Dict, Callable, Iterator, Tuple, Any
from aim._sdk.interfaces.container import (
Container as ABCContainer
)
from aim._sdk.sequence import Sequence
from aim._sdk.interfaces.sequence import SequenceMap, SequenceCollection
from aim._sdk import type_utils
from aim._sdk.utils import generate_hash, utc_timestamp
from aim._sdk.query_utils import ContainerQueryProxy, construct_query_expression
from aim._sdk.collections import ContainerSequenceCollection
from aim._sdk.constants import ContainerOpenMode, KeyNames
from aim._sdk.exceptions import MissingContainerError
from aimcore.cleanup import AutoClean
from aim._core.storage.hashing import hash_auto
from aim._sdk.query import RestrictedPythonQuery
from aim._sdk.context import Context
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from aim._core.storage.treeview import TreeView
from aim._sdk.repo import Repo
from aim._sdk.collections import ContainerCollection
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ContainerAutoClean(AutoClean['Container']):
    """
    Cleanup helper attached to a writable or read-only `Container`.

    On construction it takes its own references to the container's mode, hash, state dict,
    trees, storage, status reporter and lock, and uses only those references in `_close`.
    """
    # Cleanup priority consumed by the AutoClean base class.
    # NOTE(review): ordering semantics are defined in AutoClean, not visible here.
    PRIORITY = 90

    def __init__(self, instance: 'Container') -> None:
        """
        Capture the container attributes required to clean up later.

        Args:
            instance ('Container'): Container whose resources will be released by `_close`.
        """
        super().__init__(instance)
        self.mode = instance.mode
        self.hash = instance.hash
        self._state = instance._state
        self._tree = instance._tree
        self._props_tree = instance._props_tree
        self.storage = instance.storage
        self._status_reporter = instance._status_reporter
        self._lock = instance._lock

    def _set_end_time(self):
        """
        Store the value returned by `utc_timestamp()` under the `end_time` key of the properties tree.
        """
        self._props_tree['end_time'] = utc_timestamp()

    def _wait_for_empty_queue(self):
        """
        Block until the storage task queue reports it has finished, if the storage provides one.
        """
        queue = self.storage.task_queue()
        if queue:
            queue.wait_for_finish()

    def _close(self) -> None:
        """
        Release the container resources.

        Read-only containers are skipped entirely. Otherwise the state is flagged with
        `cleanup`, pending queued tasks are awaited, `end_time` is written (unless the
        container has been flagged as deleted), the status reporter is closed and the lock
        is released.

        The status reporter and the lock are released even if an earlier step raises, so a
        failure while flushing the queue or writing `end_time` cannot leave the container
        locked. The original exception still propagates to the caller.
        """
        if self.mode == ContainerOpenMode.READONLY:
            logger.debug(f'Run {self.hash} is read-only, skipping cleanup')
            return
        self._state['cleanup'] = True
        try:
            self._wait_for_empty_queue()
            if not self._state.get('deleted'):
                # a deleted container no longer has a properties subtree to write into
                self._set_end_time()
        finally:
            try:
                if self._status_reporter is not None:
                    self._status_reporter.close()
            finally:
                if self._lock:
                    self._lock.release()
class Property:
    """
    Descriptor for a pre-defined Container property.

    Reads and writes are forwarded to the owning instance's `_get_property` and
    `_set_property` methods, keyed by the attribute name the descriptor was assigned to.
    """
    PROP_NAME_BLACKLIST = (  # do not allow property names to be dict class public methods
        'clear', 'copy', 'fromkeys', 'get', 'items', 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values')

    def __init__(self, default=None):
        """
        Args:
            default (optional): Initial value, or a zero-argument callable producing it.
        """
        self._default = default
        self._name = None  # assigned later by __set_name__

    def __set_name__(self, owner, name):
        """
        Remember the attribute name; reject names listed in `PROP_NAME_BLACKLIST`.

        Raises:
            RuntimeError: If `name` is one of the blacklisted names.
        """
        if name not in Property.PROP_NAME_BLACKLIST:
            self._name = name
            return
        raise RuntimeError(f"Cannot define Aim Property with name '{name}'.")

    def __get__(self, instance: 'Container', owner):
        """
        Return the descriptor itself for class-level access, otherwise the stored property value.
        """
        return self if instance is None else instance._get_property(self._name)

    def __set__(self, instance: 'Container', value: Any):
        """
        Store `value` as this property on `instance`.
        """
        instance._set_property(self._name, value)

    def initialize(self, instance: 'Container'):
        """
        Write the default value to `instance`, calling the default first when it is callable.
        """
        initial = self._default() if callable(self._default) else self._default
        instance._set_property(self._name, initial)
@type_utils.query_alias('container', 'c')
@type_utils.auto_registry
class Container(ABCContainer):
    """
    Core entity grouping pre-defined properties, free-form dict-like attributes and sequences.

    Attributes are accessed with item syntax (`container[key]`), pre-defined properties through
    `Property` descriptors, and sequences through the `sequences` map.
    """
    version = Property(default='1.0.0')
    creation_time = Property(default=utc_timestamp)

    def __init__(self, hash_: Optional[str] = None, *,
                 repo: Optional[Union[str, 'Repo']] = None,
                 mode: Optional[Union[str, ContainerOpenMode]] = ContainerOpenMode.WRITE):
        """
        Initializes the Container instance.

        Args:
            hash_ (str, optional): Unique identifier for the container. When omitted, a new hash is
                generated (only allowed when the container is not read-only).
            repo (Union[str, 'Repo'], optional): Repository path or Repo object. Defaults to `Repo.default()`.
            mode (Union[str, ContainerOpenMode], optional): Mode for the container (e.g., "READONLY" or "WRITE").
                Defaults to "WRITE". A string is looked up by name in `ContainerOpenMode`.

        Raises:
            MissingContainerError: If `hash_` is omitted in read-only mode, or if `hash_` is not among
                `repo.container_hashes`.
        """
        if isinstance(mode, str):
            mode = ContainerOpenMode[mode]
        self.mode = mode

        if repo is None:
            from aim._sdk.repo import Repo
            repo = Repo.default()
        elif isinstance(repo, str):
            from aim._sdk.repo import Repo
            read_only = mode == ContainerOpenMode.READONLY
            repo = Repo.from_path(repo, read_only=read_only)
        self.repo = repo
        self.storage = repo.storage_engine

        if hash_ is None:
            if self._is_readonly:
                # nothing to open: a read-only container must refer to an existing hash
                raise MissingContainerError(hash_, mode)
            self.hash = generate_hash()
        elif hash_ in repo.container_hashes:
            self.hash = hash_
        else:
            raise MissingContainerError(hash_, mode)

        self._resources: Optional[ContainerAutoClean] = None
        self._hash = self._calc_hash()
        self._lock = None
        self._status_reporter = None
        self._state = {}

        if not self._is_readonly:
            # NOTE(review): the second argument of storage.lock() is passed as 0; its meaning
            # (presumably a timeout) is defined by the storage engine, not visible here.
            self._lock = self.storage.lock(self.hash, 0)
            self._status_reporter = self.storage.status_reporter(self.hash)

        if self._is_readonly:
            self._meta_tree: TreeView = repo._meta_tree
        else:
            self._meta_tree: TreeView = self.storage.tree(self.hash, 'meta', read_only=self._is_readonly)

        self.__storage_init__()

        if not self._is_readonly:
            if hash_ is None:  # newly created Container
                self._tree[KeyNames.INFO_PREFIX, KeyNames.OBJECT_CATEGORY] = self.object_category
                container_type = self.get_full_typename()
                self._tree[KeyNames.INFO_PREFIX, KeyNames.CONTAINER_TYPE] = container_type
                self._meta_tree[KeyNames.CONTAINER_TYPES_MAP, self.hash] = container_type
                for typename in container_type.split('->'):
                    self._meta_tree[KeyNames.CONTAINERS, typename] = 1

                # start with an empty attributes dict and the property defaults
                self[...] = {}
                Container._init_properties(self.__class__, self)
            # a container opened for writing has not ended yet
            self.end_time = None

        self._resources = ContainerAutoClean(self)

    @classmethod
    def from_storage(cls, storage, meta_tree: 'TreeView', *, hash_: str):
        """
        Restores a serialized container instance from given storage and meta tree.

        The instance is created without running `__init__`; it is read-only, holds no lock,
        no status reporter and no cleanup helper.

        Args:
            storage: Storage backend instance.
            meta_tree ('TreeView'): Tree view of the metadata.
            hash_ (str): Unique identifier for the container.
        """
        self = cls.__new__(cls)
        self.mode = ContainerOpenMode.READONLY
        self.storage = storage
        self.hash = hash_
        self._resources = None
        self._hash = self._calc_hash()
        self._lock = None
        self._status_reporter = None
        self._state = {}
        self._meta_tree = meta_tree
        self.__storage_init__()
        return self

    @classmethod
    def filter(cls, expr: str = '', repo: 'Repo' = None) -> 'ContainerCollection':
        """
        Filters the containers based on the given expression and repository.

        Args:
            expr (str, optional): Query expression for filtering.
            repo ('Repo', optional): Repository instance. Defaults to the active Repo.
        """
        if repo is None:
            from aim._sdk.repo import Repo
            repo = Repo.active_repo()
        return repo.containers(query_=expr, type_=cls)

    @classmethod
    def find(cls, hash_: str) -> Optional['Container']:
        """
        Finds and returns a read-only container instance from the active Repo based on the given hash.

        Args:
            hash_ (str): Unique identifier for the container.

        Returns:
            The container, or None if a `MissingContainerError` is raised while opening it.
        """
        from aim._sdk.repo import Repo
        repo = Repo.active_repo()
        try:
            return cls(hash_, repo=repo, mode='READONLY')
        except MissingContainerError:
            return None

    def __storage_init__(self):
        """
        Derive the per-container tree views from the meta tree and create the sequence map.
        """
        self._tree: TreeView = self._meta_tree.subtree('chunks').subtree(self.hash)
        self._meta_attrs_tree: TreeView = self._meta_tree.subtree('attrs')
        self._attrs_tree: TreeView = self._tree.subtree('attrs')
        self._meta_props_tree: TreeView = self._meta_tree.subtree('_props')
        self._props_tree: TreeView = self._tree.subtree('_props')

        # the sequence data tree is opened lazily, on the first call of the loader
        self._data_loader: Callable[[], 'TreeView'] = lambda: self._sequence_data_tree
        self.__sequence_data_tree: TreeView = None
        self._sequence_map = ContainerSequenceMap(self, Sequence)

    @property
    def _is_readonly(self) -> bool:
        """True when the container was opened in `ContainerOpenMode.READONLY` mode."""
        return self.mode == ContainerOpenMode.READONLY

    @property
    def _sequence_data_tree(self) -> 'TreeView':
        """The container's subtree of the 'seqs' storage tree, opened on first access and then reused."""
        if self.__sequence_data_tree is None:
            self.__sequence_data_tree = self.storage.tree(
                self.hash, 'seqs', read_only=self._is_readonly).subtree('chunks').subtree(self.hash)
        return self.__sequence_data_tree

    def __setitem__(self, key, value):
        """
        Sets a value in the container for a given key.

        The value is written to the container attributes tree and merged into the meta attributes tree.

        Args:
            key: Key to set the value for.
            value: Value to be set.
        """
        self._attrs_tree[key] = value
        self._meta_attrs_tree.merge(key, value)

    def set(self, key, value, strict: bool):
        """
        Sets a value in both the container and meta attributes trees, forwarding the strictness flag.

        Args:
            key: Key to set the value for.
            value: Value to be set.
            strict (bool): Strictness flag passed through to the underlying tree `set` calls.
        """
        self._attrs_tree.set(key, value, strict)
        self._meta_attrs_tree.set(key, value, strict)

    def __getitem__(self, key):
        """
        Retrieves a value from the container based on the given key (strict collection).

        Args:
            key: Key for which value is to be retrieved.
        """
        return self._attrs_tree.collect(key, strict=True)

    def get(self, key, default: Any = None, strict: bool = False):
        """
        Retrieves a value from the container based on the key or returns a default value.

        Args:
            key: Key for which value is to be retrieved.
            default (optional): Value returned when the lookup raises KeyError.
            strict (bool, optional): Strictness flag passed through to the tree `collect` call.
        """
        try:
            return self._attrs_tree.collect(key, strict=strict)
        except KeyError:
            return default

    def __delitem__(self, key):
        """
        Deletes a value from the container attributes tree based on the given key.

        Args:
            key: Key for which the value is to be deleted.
        """
        del self._attrs_tree[key]

    def _set_property(self, name: str, value: Any):
        """Write a property to the container properties tree and merge it into the meta properties tree."""
        self._props_tree[name] = value
        self._meta_props_tree.merge(name, value)

    def _get_property(self, name: str, default: Any = None) -> Any:
        """Read a property from the container properties tree, falling back to `default`."""
        return self._props_tree.get(name, default)

    def collect_properties(self) -> Dict:
        """
        Collects and returns all properties associated with the container as a dictionary object.

        Returns an empty dict when collecting the properties tree raises KeyError.
        """
        try:
            return self._props_tree.collect()
        except KeyError:
            return {}

    def get_logged_typename(self) -> str:
        """
        Returns the container type name stored in the container's info subtree.
        """
        return self._tree[KeyNames.INFO_PREFIX, KeyNames.CONTAINER_TYPE]

    def match(self, expr) -> bool:
        """
        Checks if the container matches the given expression.

        Args:
            expr: Query expression, evaluated through `RestrictedPythonQuery`.
        """
        query = RestrictedPythonQuery(expr)
        query_cache = {}
        return self._check(query, query_cache)

    def _check(self, query, query_cache, *, aliases=()) -> bool:
        """
        Evaluate `query` with a query proxy of this container bound to every alias name.

        Args:
            query: Query object exposing `check(**params)`.
            query_cache: Cache passed to the `ContainerQueryProxy`.
            aliases: Extra alias name (str) or names, added to `self.default_aliases`.
        """
        proxy = ContainerQueryProxy(self.hash, self._tree, query_cache)
        if isinstance(aliases, str):
            aliases = (aliases,)
        alias_names = self.default_aliases.union(aliases)
        query_params = {p: proxy for p in alias_names}
        return query.check(**query_params)

    def delete_sequence(self, name, context=None):
        """
        Deletes a sequence from the container based on the given name and context.

        Args:
            name: Name of the sequence to delete.
            context (optional): Contextual information for the sequence, either a dict or a
                `Context`. `{}` if not specified.

        Raises:
            RuntimeError: If the container is read-only.
        """
        if self._is_readonly:
            raise RuntimeError('Cannot delete sequence in read-only mode.')

        if context is None:
            context = {}
        if not isinstance(context, Context):
            # the sequence lookup reads `context.idx` and is wrapped in a cache that needs
            # hashable arguments, so a plain dict has to be wrapped first
            context = Context(context)
        sequence = self._sequence_map._sequence(name, context)
        sequence.delete()

    def delete(self):
        """
        Deletes the container and its associated data.

        Removes the container's entries from the 'meta', 'seqs' and 'BLOBS' storage trees and
        from the container types map, flags the container state as deleted and closes it.

        Raises:
            RuntimeError: If the container is read-only.
        """
        if self._is_readonly:
            raise RuntimeError('Cannot delete container in read-only mode.')

        # remove container meta tree
        meta_tree = self.storage.tree(self.hash, 'meta', read_only=False)
        del meta_tree.subtree('chunks')[self.hash]
        # remove container sequence tree
        seq_tree = self.storage.tree(self.hash, 'seqs', read_only=False)
        del seq_tree.subtree('chunks')[self.hash]

        # remove container blobs trees
        blobs_tree = self.storage.tree(self.hash, 'BLOBS', read_only=False)
        del blobs_tree.subtree(('meta', 'chunks'))[self.hash]
        del blobs_tree.subtree(('seqs', 'chunks'))[self.hash]

        # delete entry from container map
        del meta_tree.subtree('cont_types_map')[self.hash]

        # set a deleted flag
        self._state['deleted'] = True

        # close the container
        self.close()

    @property
    def sequences(self) -> 'ContainerSequenceMap':
        """
        Returns a map of sequences associated with the container.
        """
        return self._sequence_map

    # TODO [AT]: Implement end_time as a Property similar to other pre-defined props
    @property
    def end_time(self):
        """Value of the `end_time` property (None if not set)."""
        return self._get_property('end_time')

    @end_time.setter
    def end_time(self, value):
        self._set_property('end_time', value)

    def __repr__(self) -> str:
        return f'<{self.get_typename()} #{hash(self)} hash={self.hash} mode={self.mode}>'

    def __hash__(self) -> int:
        """Returns the hash value computed once by `_calc_hash` when the instance was set up."""
        return self._hash

    def _calc_hash(self):
        """Combine the container hash, the hash of the storage URL and the open mode into one hash value."""
        return hash_auto((self.hash, hash(self.storage.url), str(self.mode)))

    def close(self):
        """
        Closes the container and releases any associated resources.

        Does nothing when no cleanup helper is attached, which is the case for instances
        built by `from_storage`.
        """
        if self._resources is None:
            return
        self._resources._close()

    @staticmethod
    def _init_properties(cls: Type['Container'], inst: 'Container'):
        """
        Initialize every `Property` declared on `cls` and on its Container base classes.

        Base classes are processed before `cls` itself, so defaults are written base-first.

        Args:
            cls (Type['Container']): Class whose own `__dict__` is scanned for `Property` descriptors.
            inst ('Container'): Instance receiving the default values.
        """
        if cls != Container:
            for base_cls in cls.__bases__:
                if issubclass(base_cls, Container):
                    Container._init_properties(base_cls, inst)
        for attr in cls.__dict__.values():
            if isinstance(attr, Property):
                attr.initialize(inst)
class ContainerSequenceMap(SequenceMap[Sequence]):
    """
    Map-like access to the sequences of a single container.

    Supports querying (`map(...)`), iteration, lookup and deletion by name or by
    `(name, context)` tuple.
    """

    def __init__(self, container: Container, sequence_cls: Type[Sequence]):
        """
        Args:
            container (Container): Container owning the sequences.
            sequence_cls (Type[Sequence]): Sequence class used when no explicit type is requested.
        """
        self._container: Container = container
        self._sequence_cls: Type[Sequence] = sequence_cls
        self._sequence_tree: 'TreeView' = container._tree.subtree(KeyNames.SEQUENCES)
        self._data_loader: Callable[[], 'TreeView'] = container._data_loader

    def __call__(self,
                 query_: Optional[str] = None,
                 type_: Union[str, Type[Sequence]] = Sequence,
                 **kwargs) -> SequenceCollection:
        """
        Builds a sequence collection for this container, optionally narrowed by a query.

        Args:
            query_ (str, optional): Query expression for filtering.
            type_ (Union[str, Type[Sequence]]): Sequence type or type name. Defaults to Sequence.
            **kwargs: Additional keyword arguments forwarded to `construct_query_expression`.

        Returns:
            SequenceCollection: The filtered collection, or the unfiltered one when no
            query expression is produced.
        """
        context_for_query = {
            'storage': self._container.storage,
            'var_name': None,
            'meta_tree': self._container._meta_tree,
            'query_cache': defaultdict(dict),
            KeyNames.ALLOWED_VALUE_TYPES: type_utils.get_sequence_value_types(type_),
            KeyNames.SEQUENCE_TYPE: type_,
            KeyNames.CONTAINER_TYPE: Container,
        }
        expression = construct_query_expression('container', query_, **kwargs)
        collection = ContainerSequenceCollection(self._container.hash, context_for_query)
        if expression:
            return collection.filter(expression)
        return collection

    def __iter__(self) -> Iterator[Sequence]:
        """
        Walks the sequence tree: for every context index, yields one sequence per name under it.

        Yields:
            Sequence: The next sequence in the container.
        """
        tree = self._sequence_tree
        for context_idx in tree.keys():
            for seq_name in tree.subtree(context_idx).keys():
                yield self._sequence_cls(self._container, name=seq_name, context=context_idx)

    @staticmethod
    def _split_item(item):
        """Split a lookup item into `(name, context)`; a bare name or a None context maps to `{}`."""
        if isinstance(item, str):
            return item, {}
        assert isinstance(item, tuple)
        seq_name = item[0]
        seq_context = item[1]
        if seq_context is None:
            seq_context = {}
        return seq_name, seq_context

    def __getitem__(self, item: Union[str, Tuple[str, Dict]]) -> Sequence:
        """
        Looks up a sequence by name, or by a tuple of name and context.

        Args:
            item (Union[str, Tuple[str, Dict]]): Name of the sequence or a tuple of name and context.

        Returns:
            Sequence: The retrieved sequence.
        """
        seq_name, seq_context = self._split_item(item)
        return self._sequence(seq_name, Context(seq_context))

    def typed_sequence(self, sequence_type: Type[Sequence], name: str, context: Dict):
        """
        Looks up a sequence by name and context, instantiated as the given sequence type.

        Args:
            sequence_type (Type[Sequence]): The desired sequence type.
            name (str): Name of the sequence.
            context (Dict): Contextual information for the sequence.

        Returns:
            Sequence: The sequence instance.
        """
        wrapped_context = Context(context)
        return self._sequence(name, wrapped_context, sequence_type=sequence_type)

    @cachetools.func.ttl_cache()
    def _sequence(self, name: str, context: Context, *, sequence_type: Optional[Type[Sequence]] = None) -> Sequence:
        """
        Builds the sequence object for the given name and context.

        Results are memoized by the `ttl_cache` decorator, so all arguments must be hashable.

        Args:
            name (str): Name of the sequence.
            context (Context): Contextual information for the sequence.
            sequence_type (Type[Sequence], optional): Desired sequence type. Defaults to self._sequence_cls.

        Returns:
            Sequence: The sequence instance.

        Raises:
            ValueError: If the container is read-only and the sequence has no entry in the sequence tree.
        """
        ctx_idx = context.idx
        try:
            # probing the last key raises KeyError when nothing is stored for this sequence
            self._sequence_tree.subtree((ctx_idx, name)).last_key()
        except KeyError:
            found = False
        else:
            found = True

        if not found and self._container._is_readonly:
            raise ValueError('Cannot create sequence from a readonly container.')

        chosen_cls = sequence_type if sequence_type else self._sequence_cls
        return chosen_cls(self._container, name=name, context=context)

    def __delitem__(self, item: Union[str, Tuple[str, Dict]]):
        """
        Removes a sequence, addressed by name or by a tuple of name and context, from both
        the sequence tree and the sequence data tree.

        Args:
            item (Union[str, Tuple[str, Dict]]): Name of the sequence or a tuple of name and context.

        Raises:
            ValueError: If the container is read-only.
        """
        if self._container._is_readonly:
            raise ValueError('Cannot delete sequence from a readonly container.')

        seq_name, seq_context = self._split_item(item)
        context_idx = Context(seq_context).idx
        del self._sequence_tree[context_idx, seq_name]
        sequence_data = self._data_loader()
        del sequence_data[context_idx, seq_name]