Runs a full workflow, i.e.
- creating and registering a dataset
- archiving (on a test volume)
- retrieving
It checks the following endpoints for the final result:
- Prefect for flow execution
- Minio for the created/retrieved data
- Scicat for dataset and job creation
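The listing below pauses with fixed `time.sleep(10)` calls before querying Prefect for the flow run. A more robust variant could poll the Scicat job status until it reaches a terminal state. The following is a minimal sketch, assuming the `get_scicat_job` helper defined in `backend/archiver/tests/test_e2e.py` is in scope; the timeout and poll interval are illustrative values, not taken from the repository:

```python
# Hedged sketch (not part of the test suite): poll the Scicat job until it
# reports the expected statusMessage instead of sleeping a fixed interval.
# Assumes the get_scicat_job helper from backend/archiver/tests/test_e2e.py.
import asyncio


async def wait_for_job_status(job_id: str, token: str,
                              expected: str = "finishedSuccessful",
                              timeout_s: float = 120.0,
                              poll_s: float = 5.0) -> dict:
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_s
    while loop.time() < deadline:
        job = await get_scicat_job(job_id=job_id, token=token)
        if job is not None and job.get("statusMessage") == expected:
            return job
        await asyncio.sleep(poll_s)
    raise TimeoutError(f"job {job_id} did not reach {expected!r} in {timeout_s}s")
```

With such a helper, the `statusMessage` assertions below would run against a settled job rather than racing the flow.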
Source code in backend/archiver/tests/test_e2e.py
@pytest.mark.skip(reason="Manually executed end to end test")
@pytest.mark.asyncio
async def test_end_to_end(scicat_token_setup, set_env, s3_client):
"""Runs a full workflow, i.e.
- creating and registering a dataset
- archving (on test volume)
- retrieving
It checks the following endpoints involved for the final result
- Prefect for flow execution
- Minio for the created/retrieved data
- Scicat for dataset and job creation
"""
# Create and register dataset -> dataset pid
dataset_pid = await create_dataset()
dataset = await get_scicat_dataset(dataset_pid=dataset_pid, token=scicat_token_setup)
assert dataset is not None
# Verify Scicat datasetlifecycle
dataset_lifecycle = dataset.get("datasetlifecycle")
assert dataset_lifecycle is not None
assert dataset_lifecycle.get("archiveStatusMessage") == "datasetCreated"
assert dataset_lifecycle.get("archivable")
assert not dataset_lifecycle.get("retrievable")
# Verify datablocks in MINIO
orig_datablocks = list(map(lambda idx: s3_client.stat_object(bucket=Bucket("landingzone"),
filename=f"openem-network/datasets/{dataset_pid}/raw_files/file_{idx}.bin"), range(9)))
assert len(orig_datablocks) == 9
# trigger archive job in scicat
scicat_archival_job_id = await scicat_create_archival_job(dataset=dataset_pid, token=scicat_token_setup)
# Verify Scicat Job status
scicat_archival_job_status = await get_scicat_job(job_id=scicat_archival_job_id, token=scicat_token_setup)
assert scicat_archival_job_status is not None
assert scicat_archival_job_status.get("type") == "archive"
assert scicat_archival_job_status.get(
"statusCode") == "jobCreated" or scicat_archival_job_status.get("statusMessage") == "inProgress"
time.sleep(10)
# Verify Prefect Flow
archival_flow_run_id = await find_flow_in_prefect(scicat_archival_job_id)
archival_state = await get_flow_result(flow_run_id=archival_flow_run_id)
assert archival_state is not None
# Verify Scicat Job status
scicat_archival_job_status = await get_scicat_job(job_id=scicat_archival_job_id, token=scicat_token_setup)
assert scicat_archival_job_status is not None
assert scicat_archival_job_status.get("type") == "archive"
assert scicat_archival_job_status.get(
"statusMessage") == "finishedSuccessful"
# Verify Scicat datasetlifecycle
dataset = await get_scicat_dataset(dataset_pid=dataset_pid, token=scicat_token_setup)
assert dataset is not None
dataset_lifecycle = dataset.get("datasetlifecycle")
assert dataset_lifecycle is not None
assert dataset_lifecycle.get(
"archiveStatusMessage") == "datasetOnArchiveDisk"
assert dataset_lifecycle.get("retrieveStatusMessage") == ""
assert not dataset_lifecycle.get("archivable")
assert dataset_lifecycle.get("retrievable")
# trigger retrieval job in scicat
scicat_retrieval_job_id = await scicat_create_retrieval_job(dataset=dataset_pid, token=scicat_token_setup)
# Verify Scicat Job status
scicat_retrieval_job_status = await get_scicat_job(job_id=scicat_retrieval_job_id, token=scicat_token_setup)
assert scicat_retrieval_job_status is not None
assert scicat_retrieval_job_status.get("type") == "retrieve"
assert scicat_retrieval_job_status.get(
"statusCode") == "jobCreated" or scicat_retrieval_job_status.get("statusMessage") == "inProgress"
time.sleep(10)
# Verify Prefect Flow
retrieve_flow_run_id = await find_flow_in_prefect(scicat_retrieval_job_id)
retrieval_state = await get_flow_result(retrieve_flow_run_id)
assert retrieval_state is not None
# Verify Scicat Job status
scicat_retrieval_job_status = await get_scicat_job(job_id=scicat_retrieval_job_id, token=scicat_token_setup)
assert scicat_retrieval_job_status is not None
assert scicat_retrieval_job_status.get("type") == "retrieve"
assert scicat_retrieval_job_status.get(
"statusMessage") == "finishedSuccessful"
assert scicat_retrieval_job_status.get("jobResultObject") is not None
jobResult = scicat_retrieval_job_status.get(
"jobResultObject").get("result")
assert len(jobResult) == 1
assert jobResult[0].get("datasetId") == dataset_pid
assert jobResult[0].get("url") is not None
datablock_url = jobResult[0].get("url")
datablock_name = jobResult[0].get("name")
# verify file can be downloaded from MINIO via url in jobresult
with tempfile.TemporaryDirectory() as temp_dir:
dest_file: Path = Path(temp_dir) / datablock_name
urllib.request.urlretrieve(datablock_url, dest_file)
assert dest_file.exists()
# Verify retrieved datablock in MINIO
retrieved_datablock = s3_client.stat_object(
bucket=Bucket("retrieval"), filename=f"openem-network/datasets/{dataset_pid}/datablocks/{dataset_pid}_0.tar.gz")
assert retrieved_datablock is not None
assert retrieved_datablock.Size > 80 * 1024 * 1024
# Verify Scicat datasetlifecycle
dataset = await get_scicat_dataset(dataset_pid=dataset_pid, token=scicat_token_setup)
assert dataset is not None
dataset_lifecycle: Dict[Any, Any] = dataset.get("datasetlifecycle")
assert dataset_lifecycle.get("retrieveStatusMessage") == "datasetRetrieved"
# This is in fact a Scicat issue: fields in the datasetlifecycle are set to the default if not updated
# https://github.com/SciCatProject/scicat-backend-next/blob/release-jobs/src/datasets/datasets.service.ts#L273
# assert dataset_lifecycle.get("archiveStatusMessage") == "datasetOnArchiveDisk"
assert not dataset_lifecycle.get("archivable")
assert dataset_lifecycle.get("retrievable")
|