Source code for vein.sources.memory.environment

import os

from vein.flow.state import AbstractLiveStateTracker
from vein.exceptions import MissingEnvironmentVariableError

[docs] class EnvironmentLiveTracker(AbstractLiveStateTracker): def __init__(self, key: str, **kwargs): super().__init__(**kwargs) self._key = key
[docs] def get_state_value(self): try: return os.environ[self._key] except KeyError as exc: raise MissingEnvironmentVariableError(self._key) from exc