Source code for aim.sdk.run

import logging

import os
import datetime
import json
import pytz
import sys

from collections import defaultdict

from aim.sdk.base_run import BaseRun
from aim.sdk.sequence import Sequence
from aim.sdk.tracker import RunTracker
from aim.sdk.reporter import RunStatusReporter
from aim.sdk.sequence_collection import SingleRunSequenceCollection
from aim.sdk.utils import (
    backup_run,
)
from aim.ext.utils import (
    get_environment_variables,
    get_installed_packages,
    get_git_info,
)
from aim.sdk.types import AimObject
from aim.sdk.configs import AIM_RUN_INDEXING_TIMEOUT

from aim.storage.treeview import TreeView
from aim.storage.context import Context
from aim.storage import treeutils

from aim.ext.resource import ResourceTracker, DEFAULT_SYSTEM_TRACKING_INT
from aim.ext.cleanup import AutoClean

from typing import Any, Dict, Iterator, Optional, Tuple, Union
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from pandas import DataFrame

    from aim.sdk.sequences.metric import Metric
    from aim.sdk.sequences.image_sequence import Images
    from aim.sdk.sequences.audio_sequence import Audios
    from aim.sdk.sequences.distribution_sequence import Distributions
    from aim.sdk.sequences.figure_sequence import Figures
    from aim.sdk.sequences.text_sequence import Texts
    from aim.sdk.sequence_collection import SequenceCollection
    from aim.ext.resource.log import Logs
    from aim.sdk.repo import Repo

logger = logging.getLogger(__name__)


class RunAutoClean(AutoClean['Run']):
    PRIORITY = 90

    def __init__(self, instance: 'Run') -> None:
        """
        Prepare the `Run` for automatic cleanup.

        Args:
            instance: The `Run` instance to be cleaned up.
        """
        super().__init__(instance)

        self.read_only = instance.read_only
        self.hash = instance.hash
        self.meta_run_tree = instance.meta_run_tree
        self.repo = instance.repo
        self._system_resource_tracker = instance._system_resource_tracker
        # this reference is needed for system resource tracker finalization
        self._tracker = instance._tracker
        self._checkins = instance._checkins

    def finalize_run(self):
        """
        Finalize the run by indexing all the data.
        """
        self.meta_run_tree['end_time'] = datetime.datetime.now(pytz.utc).timestamp()
        try:
            timeout = os.getenv(AIM_RUN_INDEXING_TIMEOUT, 2 * 60)
            index = self.repo._get_index_tree('meta', timeout=timeout).view(())
            logger.debug(f'Indexing Run {self.hash}...')
            self.meta_run_tree.finalize(index=index)
        except TimeoutError:
            logger.warning(f'Cannot index Run {self.hash}. Index is locked.')

    def finalize_system_tracker(self):
        """
        Stop the system resource tracker before closing the run.
        """
        if self._system_resource_tracker is not None:
            logger.debug('Stopping resource tracker')
            self._system_resource_tracker.stop()

    def finalize_rpc_queue(self):
        if self.repo.is_remote_repo:
            self.repo._client.get_queue(self.hash).stop()
            self.repo._client.remove_queue(self.hash)

    def _close(self) -> None:
        """
        Close the `Run` instance resources and trigger indexing.
        """
        if self.read_only:
            logger.debug(f'Run {self.hash} is read-only, skipping cleanup')
            return
        self.finalize_system_tracker()
        self.finalize_run()
        self.finalize_rpc_queue()
        if self._checkins is not None:
            self._checkins.close()


# TODO: [AT] generate automatically based on ModelMappedRun
class StructuredRunMixin:
    @property
    def name(self):
        """Run name, set by user.

            :getter: Returns run's name.
            :setter: Sets run's name.
            :type: string
        """
        return self.props.name

    @name.setter
    def name(self, value):
        self.props.name = value

    @property
    def description(self):
        """Run description, set by user.

            :getter: Returns run's description.
            :setter: Sets run's description.
            :type: string
        """
        return self.props.description

    @description.setter
    def description(self, value):
        self.props.description = value

    @property
    def archived(self):
        """Check is run archived or not.

            :getter: Returns run's archived state.
            :setter: Archive/un-archive run.
            :type: bool
        """
        return self.props.archived

    @archived.setter
    def archived(self, value):
        self.props.archived = value

    @property
    def creation_time(self):
        """Run object creation time [UTC] as timestamp.

            :getter: Returns run creation time.
        """
        return self.props.creation_time

    @property
    def created_at(self):
        if self.repo.is_remote_repo:
            return datetime.datetime.fromtimestamp(self.creation_time, tz=pytz.utc).replace(tzinfo=None)
        else:
            return self.props.created_at

    @property
    def finalized_at(self):
        if self.end_time:
            return datetime.datetime.fromtimestamp(self.end_time, tz=pytz.utc).replace(tzinfo=None)
        else:
            return None

    @property
    def end_time(self):
        """Run finalization time [UTC] as timestamp.

            :getter: Returns run finalization time.
        """
        try:
            return self.meta_run_tree['end_time']
        except KeyError:
            # run saved with old version. fallback to sqlite data
            return self.props.end_time

    @property
    def duration(self):
        """Run duration in seconds (end_time - creation_time)

            :getter: Returns run duration.
        """
        if self.end_time:
            return self.end_time - self.creation_time
        else:
            from aim.web.api.utils import datetime_now
            return datetime_now().timestamp() - self.creation_time

    @property
    def active(self):
        """Check if run is active or not.

            :getter: Returns run's active state.
            :type: bool
        """

        if self.end_time:
            return False
        else:
            return True

    @property
    def experiment(self):
        """Run experiment.

            :getter: Returns run's experiment name.
            :setter: Sets run's experiment.
            :type: string
        """
        return self.props.experiment

    @experiment.setter
    def experiment(self, value):
        self.props.experiment = value

    @property
    def tags(self):
        """List of run tags.

            :getter: Returns run's tag list.
        """
        return self.props.tags

    def add_tag(self, value):
        """Add tag to run

        Args:
            value (str): Tag to add.
        """
        return self.props.add_tag(value)

    def remove_tag(self, tag_name):
        """Remove run tag.

        Args:
            tag_name (str): :obj:`name` of tag to be removed.
        """
        return self.props.remove_tag(tag_name)


[docs]class Run(BaseRun, StructuredRunMixin): """Run object used for tracking metrics. Provides method :obj:`track` to track value and object series for multiple names and contexts. Provides dictionary-like interface for Run object meta-parameters. Provides API for iterating through tracked sequences. Args: run_hash (:obj:`str`, optional): Run's hash. If skipped, generated automatically. repo (:obj:`Union[Repo,str], optional): Aim repository path or Repo object to which Run object is bound. If skipped, default Repo is used. read_only (:obj:`bool`, optional): Run creation mode. Default is False, meaning Run object can be used to track metrics. experiment (:obj:`str`, optional): Sets Run's `experiment` property. 'default' if not specified. Can be used later to query runs/sequences. system_tracking_interval (:obj:`int`, optional): Sets the tracking interval in seconds for system usage metrics (CPU, Memory, etc.). Set to `None` to disable system metrics tracking. log_system_params (:obj:`bool`, optional): Enable/Disable logging of system params such as installed packages, git info, environment variables, etc. """ _metric_version_warning_shown = False def __init__(self, run_hash: Optional[str] = None, *, repo: Optional[Union[str, 'Repo']] = None, read_only: bool = False, experiment: Optional[str] = None, system_tracking_interval: Optional[Union[int, float]] = DEFAULT_SYSTEM_TRACKING_INT, log_system_params: Optional[bool] = False, capture_terminal_logs: Optional[bool] = True): self._resources: Optional[RunAutoClean] = None super().__init__(run_hash, repo=repo, read_only=read_only) self.meta_attrs_tree: TreeView = self.meta_tree.subtree('attrs') self.meta_run_attrs_tree: TreeView = self.meta_run_tree.subtree('attrs') if not read_only: logger.debug(f'Opening Run {self.hash} in write mode') if self.check_metrics_version(): if self.repo.is_remote_repo: logger.warning(f'Cannot track Run with remote repo {self.repo.path}. Please upgrade repo first ' f'with the following command:') logger.warning(f'aim storage --repo {self.repo.path} upgrade 3.11+ \'*\'') raise RuntimeError else: logger.warning(f'Detected sub-optimal format metrics for Run {self.hash}. Upgrading...') backup_path = backup_run(self) try: self.update_metrics() logger.warning(f'Successfully converted Run {self.hash}') logger.warning(f'Run backup can be found at {backup_path}. ' f'In case of any issues the following command can be used to restore data: ' f'`aim storage --repo {self.repo.root_path} restore {self.hash}`') except Exception as e: logger.error(f'Failed to convert metrics. {e}') logger.warning(f'Run backup can be found at {backup_path}. ' f'To restore data please run the following command: ' f'`aim storage --repo {self.repo.root_path} restore {self.hash}`') raise self._props = None self._checkins = None if not read_only: if not self.repo.is_remote_repo: self._checkins = RunStatusReporter(self) if log_system_params: system_params = { 'packages': get_installed_packages(), 'env_variables': get_environment_variables(), 'git_info': get_git_info(), 'executable': sys.executable, 'arguments': sys.argv } self.__setitem__("__system_params", system_params) try: self.meta_run_attrs_tree.first_key() except (KeyError, StopIteration): # no run params are set. use empty dict self[...] = {} self.meta_run_tree['end_time'] = None self.props if experiment: self.experiment = experiment self._tracker = RunTracker(self) self._system_resource_tracker: ResourceTracker = None self._prepare_resource_tracker(system_tracking_interval, capture_terminal_logs) self._resources = RunAutoClean(self) def __hash__(self) -> int: return super().__hash__() def idx_to_ctx(self, idx: int) -> Context: return self._tracker.idx_to_ctx(idx)
[docs] def __setitem__(self, key: str, val: Any): """Set Run top-level meta-parameter. Args: key (:obj:`str`): Top-level meta-parameter name. Use ellipsis to reset run's all meta-parameters. val: Meta-parameter value. Examples: >>> run = Run('3df703c') >>> run[...] = params >>> run['hparams'] = {'batch_size': 42} """ self.meta_run_attrs_tree[key] = val self.meta_attrs_tree[key] = val
[docs] def __getitem__(self, key): """Get run meta-parameter by key. Args: key: path to Run meta-parameter. Returns: Collected sub-tree of Run meta-parameters. Examples: >>> run = Run('3df703c') >>> run['hparams'] # -> {'batch_size': 42} >>> run['hparams', 'batch_size'] # -> 42 """ return self._collect(key)
def set(self, key, val: Any, strict: bool = True): self.meta_run_attrs_tree.set(key, val, strict) self.meta_attrs_tree.set(key, val, strict) def get(self, key, default: Any = None, strict: bool = True, resolve_objects=False): try: return self._collect(key, strict=strict, resolve_objects=resolve_objects) except KeyError: return default def _collect(self, key, strict: bool = True, resolve_objects: bool = False): return self.meta_run_attrs_tree.collect(key, strict=strict, resolve_objects=resolve_objects) def _prepare_resource_tracker( self, tracking_interval: Union[int, float] = None, capture_terminal_logs: bool = True ): if self.read_only: return if ResourceTracker.check_interval(tracking_interval) or capture_terminal_logs: current_logs = self.get_terminal_logs() log_offset = current_logs.last_step() + 1 if current_logs else 0 self._system_resource_tracker = ResourceTracker(self._tracker, tracking_interval, capture_terminal_logs, log_offset) self._system_resource_tracker.start()
[docs] def __delitem__(self, key: str): """Remove key from run meta-params. Args: key: meta-parameter path """ del self.meta_attrs_tree[key] del self.meta_run_attrs_tree[key]
[docs] def track( self, value, name: str = None, step: int = None, epoch: int = None, *, context: AimObject = None, ): """Main method for tracking numeric value series and object series. Args: value: The tracked value. name (str): Tracked sequence name. step (:obj:`int`, optional): Sequence tracking iteration. Auto-incremented if not specified. epoch (:obj:`int`, optional): The training epoch. context (:obj:`dict`, optional): Sequence tracking context. Appends the tracked value to sequence specified by `name` and `context`. Appended values should be of the same type, in other words, sequence is a homogeneous collection. """ self._tracker(value, name, step, epoch, context=context)
@property def props(self): if self._props is None: self._props = self.repo.request_props(self.hash, self.read_only) return self._props
[docs] def iter_metrics_info(self) -> Iterator[Tuple[str, Context, 'Run']]: """Iterator for all run metrics info. Yields: tuples of (name, context, run) where run is the Run object itself and name, context defines Metric type sequence (with values of `float` and `int`). """ yield from self.iter_sequence_info_by_type(('float', 'int'))
[docs] def iter_sequence_info_by_type(self, dtypes: Union[str, Tuple[str, ...]]) -> Iterator[Tuple[str, Context, 'Run']]: """Iterator for run sequence infos for the given object data types Args: dtypes: The objects data types list. Yields: tuples of (name, context, run) where run is the Run object itself and name, context defines sequence for one of `dtypes` types. """ if isinstance(dtypes, str): dtypes = (dtypes,) for ctx_idx, run_ctx_dict in self.meta_run_tree.subtree('traces').items(): assert isinstance(ctx_idx, int) ctx = self.idx_to_ctx(ctx_idx) # run_ctx_view = run_meta_traces.view(ctx_idx) for seq_name in run_ctx_dict.keys(): assert isinstance(seq_name, str) # skip sequences not matching dtypes. # sequences with no dtype are considered to be float sequences. # '*' stands for all data types if '*' in dtypes or run_ctx_dict[seq_name].get('dtype', 'float') in dtypes: yield seq_name, ctx, self
[docs] def metrics(self) -> 'SequenceCollection': """Get iterable object for all run tracked metrics. Returns: :obj:`MetricCollection`: Iterable for run metrics. Examples: >>> run = Run('3df703c') >>> for metric in run.metrics(): >>> metric.values.sparse_numpy() """ return SingleRunSequenceCollection(self)
def __eq__(self, other: 'Run') -> bool: return self.hash == other.hash and self.repo == other.repo
[docs] def get_metric( self, name: str, context: Context ) -> Optional['Metric']: """Retrieve metric sequence by it's name and context. Args: name (str): Tracked metric name. context (:obj:`Context`): Tracking context. Returns: :obj:`Metric` object if exists, `None` otherwise. """ if self.read_only and not Run._metric_version_warning_shown: if self.check_metrics_version(): logger.warning(f'Detected sub-optimal format metrics for Run {self.hash}. Consider upgrading repo ' f'to improve queries performance:') logger.warning(f'aim storage --repo {self.repo.path} upgrade 3.11+ \'*\'') Run._metric_version_warning_shown = True return self._get_sequence('metric', name, context)
[docs] def get_image_sequence( self, name: str, context: Context ) -> Optional['Images']: """Retrieve images sequence by it's name and context. Args: name (str): Tracked image sequence name. context (:obj:`Context`): Tracking context. Returns: :obj:`Images` object if exists, `None` otherwise. """ return self._get_sequence('images', name, context)
[docs] def get_figure_sequence( self, name: str, context: Context ) -> Optional['Figures']: """Retrieve figure sequence by its name and context. Args: name (str): Tracked figure sequence name. context (:obj:`Context`): Tracking context. Returns: :obj:`Figures` object if exists, `None` otherwise. """ return self._get_sequence('figures', name, context)
[docs] def get_audio_sequence( self, name: str, context: Context ) -> Optional['Audios']: """Retrieve audios sequence by its name and context. Args: name (str): Tracked audios sequence name. context (:obj:`Context`): Tracking context. Returns: :obj:`Audios` object if exists, `None` otherwise. """ return self._get_sequence('audios', name, context)
[docs] def get_distribution_sequence( self, name: str, context: Context ) -> Optional['Distributions']: """Retrieve distributions sequence by it's name and context. Args: name (str): Tracked distribution sequence name. context (:obj:`Context`): Tracking context. Returns: :obj:`Distributions` object if exists, `None` otherwise. """ return self._get_sequence('distributions', name, context)
[docs] def get_terminal_logs(self) -> Optional['Logs']: """Retrieve duplicated terminal logs for a run Returns: :obj:`Logs` object if exists, `None` otherwise. """ return self._get_sequence('logs', 'logs', Context({}))
[docs] def get_text_sequence( self, name: str, context: Context ) -> Optional['Texts']: """Retrieve texts sequence by it's name and context. Args: name (str): Tracked text sequence name. context (:obj:`Context`): Tracking context. Returns: :obj:`Texts` object if exists, `None` otherwise. """ return self._get_sequence('texts', name, context)
def _get_sequence_dtype( self, sequence_name: str, context: Context ) -> str: try: return self.meta_run_tree.subtree(('traces', hash(context), sequence_name, 'dtype')).collect() except KeyError: # fallback to `float`, cause in older versions there was no `dtype` return 'float' def _get_sequence( self, seq_type: str, sequence_name: str, context: Context ) -> Optional[Sequence]: seq_cls = Sequence.registry.get(seq_type, None) if seq_cls is None: raise ValueError(f'\'{seq_type}\' is not a valid Sequence') assert issubclass(seq_cls, Sequence) tracked_dtype = self._get_sequence_dtype(sequence_name, context) if tracked_dtype not in seq_cls.allowed_dtypes(): return None sequence = seq_cls(sequence_name, context, self) return sequence if bool(sequence) else None
[docs] def collect_sequence_info( self, sequence_types: Union[str, Tuple[str, ...]], skip_last_value=False ) -> Dict[str, list]: """Retrieve Run's all sequences general overview. Args: sequence_types: Type names of sequences for which to collect name/context pairs. skip_last_value (:obj:`bool`, optional): Boolean flag to include tracked sequence last value in sequence info. False by default. Returns: :obj:`list`: list of sequence's `context`, `name` and optionally last tracked value triplets. """ traces = self.meta_run_tree.subtree('traces') traces_overview = {} # build reverse map of sequence supported dtypes dtype_to_sequence_type_map = defaultdict(list) if isinstance(sequence_types, str): sequence_types = (sequence_types,) for seq_type in sequence_types: traces_overview[seq_type] = [] seq_cls = Sequence.registry.get(seq_type, None) if seq_cls is None: raise ValueError(f'\'{seq_type}\' is not a valid Sequence') assert issubclass(seq_cls, Sequence) dtypes = seq_cls.allowed_dtypes() for dtype in dtypes: dtype_to_sequence_type_map[dtype].append(seq_type) for idx in traces.keys(): ctx_dict = self.idx_to_ctx(idx).to_dict() for name, value in traces[idx].items(): dtype = value.get('dtype', 'float') # old sequences without dtype set are considered float sequences if dtype in dtype_to_sequence_type_map: trace_data = { 'context': ctx_dict, 'name': name, } if not skip_last_value: trace_data['last_value'] = value for seq_type in dtype_to_sequence_type_map[dtype]: traces_overview[seq_type].append(trace_data) return traces_overview
def _cleanup_trees(self): del self.meta_run_attrs_tree del self.meta_attrs_tree del self.meta_run_tree del self.meta_tree del self._series_run_trees self.meta_run_attrs_tree = None self.meta_run_tree = None self.meta_attrs_tree = None self.meta_tree = None self._series_run_trees = None def close(self): if self._resources is None: return self._resources.close() self._tracker.sequence_infos.clear() # de-reference trees and other resources del self._resources del self._props self._resources = None self._props = None self._cleanup_trees() def finalize(self): if self._resources is None: return self._resources.finalize_run()
[docs] def dataframe( self, include_props: bool = True, include_params: bool = True, ) -> 'DataFrame': """Get run properties and params as pandas DataFrame Args: include_props: (:obj:`int`, optional): If true, include run structured props include_params: (:obj:`int`, optional): If true, include run parameters """ data = { 'hash': self.hash, } if include_props: # TODO [GA]: Auto collect props based on StructuredRunMixin: # - Exclude created_at, updated_at, finalized_at auto-populated fields # - Collect list of representations in case of ModelMappedCollection's data['name'] = self.props.name data['description'] = self.props.description data['archived'] = self.props.archived data['creation_time'] = self.props.creation_time data['end_time'] = self.end_time data['active'] = self.active data['experiment'] = self.props.experiment data['tags'] = json.dumps(self.props.tags) if include_params: # TODO [GA]: # - Move run params collection to utility function # - Remove code duplication from Metric.dataframe for path, val in treeutils.unfold_tree(self[...], unfold_array=False, depth=3): s = '' for key in path: if isinstance(key, str): s += f'.{key}' if len(s) else f'{key}' else: s += f'[{key}]' if isinstance(val, (tuple, list, dict)): val = json.dumps(val) if s not in data.keys(): data[s] = val import pandas as pd df = pd.DataFrame(data, index=[0]) return df
[docs] def report_progress( self, *, expect_next_in: int = 0, block: bool = False, ) -> None: """ Report progress for the run. Report the expected time for the next progress report. If no progress reports are received by the expiry date (plus the grace period), the run is considered as failed. Args: expect_next_in: (:obj:`int`, optional): The number of seconds to wait before the next progress report. block: (:obj:`bool`, optional): If true, block the thread until the report is written to filesystem. """ if self._checkins is None: raise ValueError('Progress reports are not enabled for this run') self._checkins._check_in(expect_next_in=expect_next_in, block=block)
[docs] def report_successful_finish( self, *, block: bool = True, ) -> None: """ Report successful finish of the run. If the run is not marked as successfully finished, it can potentially be considered as failed. Args: block: (:obj:`bool`, optional): If true, block the thread until the report is written to filesystem. """ if self._checkins is None: raise ValueError('Progress reports are not enabled for this run') self._checkins._report_successful_finish(block=block)