Source code for vein.sources.files

import os
from typing import Optional

from vein.backoff import BackoffPolicy
from vein.cache import Cache
from vein.flow.items import LiveFlowItem

# TODO add these back
# from vein.sources.files.ini import IniTracker
# from vein.sources.files.json import JsonTracker
from vein.sources.files.yaml import YamlFileReader


[docs] def from_yaml( file_path: os.PathLike, mode: str = "r", encoding: str = "utf-8", ): yaml_reader = YamlFileReader( file_path=file_path, mode=mode, encoding=encoding, ) return yaml_reader.get_data()
[docs] def from_json( file_path: os.PathLike, mode: str = "r", encoding: str = "utf-8", use_cache: bool = True, retry: bool = False, backoff_policy: Optional[BackoffPolicy] = None, cache: Optional[Cache] = None, ) -> LiveFlowItem: # TODO HIGH: Add back in json tracker return None return LiveFlowItem( JsonTracker( file_path=file_path, mode=mode, encoding=encoding, use_cache=use_cache, retry=retry, backoff_policy=backoff_policy, cache=cache, ) )
[docs] def from_ini( file_path: os.PathLike, mode: str = "r", encoding: str = "utf-8", use_cache: bool = True, retry: bool = False, backoff_policy: Optional[BackoffPolicy] = None, cache: Optional[Cache] = None, ) -> LiveFlowItem: return LiveFlowItem( IniTracker( file_path=file_path, mode=mode, encoding=encoding, use_cache=use_cache, retry=retry, backoff_policy=backoff_policy, cache=cache, ) )