Singleton abstracting access to all Variables expected to be defined in Prefect
Source code in backend/archiver/config/variables.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
class Variables:
    """Singleton abstracting access to all Variables expected to be defined in Prefect.

    Every setting is exposed as a read-only property. A lookup first checks
    the process environment for a variable with the upper-cased setting name
    (a local override) and only then asks Prefect via ``Variable.get``.
    Settings that cannot be resolved yield an empty string, which the typed
    properties turn into their documented default where one exists.
    """

    # The one shared instance, created lazily by the first ``Variables()`` call.
    _instance = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __get(self, name: str) -> str:
        """Resolve the setting ``name``.

        Args:
            name: Name of the setting as passed to ``Variable.get``; its
                upper-cased form is the name of the environment override.

        Returns:
            The environment override if one is set, otherwise the value
            returned by ``Variable.get``, otherwise an empty string.
        """
        # try for local overrides as env variables
        override = os.environ.get(name.upper())
        if override is not None:
            return override

        value = None
        try:
            value = Variable.get(name)
        except Exception as err:
            # Best effort: a failing lookup must not crash the caller, but it
            # should be visible in the log. Only ``Exception`` is handled so
            # that KeyboardInterrupt / SystemExit still propagate (a ``return``
            # inside ``finally`` would have swallowed those as well).
            getLogger().warning(
                f"Lookup of {name} in config failed: {err!r}")

        if value is None:
            getLogger().warning(
                f"Value {name} not found in config, returning empty string")
            return ""
        return value

    @property
    def SCICAT_ENDPOINT(self) -> str:
        """Value of ``scicat_endpoint``; empty string when unset."""
        return self.__get("scicat_endpoint")

    @property
    def SCICAT_API_PREFIX(self) -> str:
        """Value of ``scicat_api_prefix``; ``"/"`` when unset or empty."""
        return self.__get("scicat_api_prefix") or "/"

    @property
    def MINIO_RETRIEVAL_BUCKET(self) -> str:
        """Value of ``minio_retrieval_bucket``; empty string when unset."""
        return self.__get("minio_retrieval_bucket")

    @property
    def MINIO_LANDINGZONE_BUCKET(self) -> str:
        """Value of ``minio_landingzone_bucket``; empty string when unset."""
        return self.__get("minio_landingzone_bucket")

    @property
    def MINIO_STAGING_BUCKET(self) -> str:
        """Value of ``minio_staging_bucket``; empty string when unset."""
        return self.__get("minio_staging_bucket")

    @property
    def MINIO_REGION(self) -> str:
        """Value of ``minio_region``; empty string when unset."""
        return self.__get("minio_region")

    @property
    def MINIO_ENDPOINT(self) -> str:
        """Value of ``minio_endpoint``; empty string when unset."""
        return self.__get("minio_endpoint")

    @property
    def MINIO_EXTERNAL_ENDPOINT(self) -> str:
        """Value of ``minio_external_endpoint``; empty string when unset."""
        return self.__get("minio_external_endpoint")

    @property
    def ARCHIVER_SCRATCH_FOLDER(self) -> Path:
        """Value of ``archiver_scratch_folder`` as a ``Path``.

        NOTE: an unset value becomes ``Path("")``, i.e. the current directory.
        """
        return Path(self.__get("archiver_scratch_folder"))

    @property
    def ARCHIVER_TARGET_SIZE_MB(self) -> int:
        """Value of ``archiver_target_size_mb`` as ``int``; 200 when unset or empty."""
        return int(self.__get("archiver_target_size_mb") or 200)

    @property
    def ARCHIVER_LTS_FILE_TIMEOUT_S(self) -> int:
        """Value of ``archiver_lts_file_timeout_s`` as ``int``; 10 when unset or empty."""
        return int(self.__get("archiver_lts_file_timeout_s") or 10)

    @property
    def LTS_STORAGE_ROOT(self) -> Path:
        """Value of ``lts_storage_root`` as a ``Path``.

        NOTE: an unset value becomes ``Path("")``, i.e. the current directory.
        """
        return Path(self.__get("lts_storage_root"))

    @property
    def LTS_FREE_SPACE_PERCENTAGE(self) -> float:
        """Value of ``lts_free_space_percentage`` as ``float``; 1.0 when unset or empty."""
        return float(self.__get("lts_free_space_percentage") or 1.0)
|