Skip to content

Test e2e

test_end_to_end(scicat_token_setup, set_env, s3_client) async

Runs a full workflow, i.e. - creating and registering a dataset - archiving (on test volume) - retrieving

It checks the following endpoints for the final result - Prefect for flow execution - Minio for the created/retrieved data - Scicat for dataset and job creation

Source code in backend/archiver/tests/test_e2e.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
@pytest.mark.skip(reason="Manually executed end to end test")
@pytest.mark.asyncio
async def test_end_to_end(scicat_token_setup, set_env, s3_client):
    """Runs a full workflow, i.e.
    - creating and registering a dataset
    - archiving (on test volume)
    - retrieving

    It checks the following endpoints involved for the final result
    - Prefect for flow execution
    - Minio for the created/retrieved data
    - Scicat for dataset and job creation

    """
    # Local import: asyncio is only needed by this manually-run test and the
    # module-level import block is outside this excerpt.
    import asyncio

    # Create and register dataset -> dataset pid
    dataset_pid = await create_dataset()
    dataset = await get_scicat_dataset(dataset_pid=dataset_pid, token=scicat_token_setup)
    assert dataset is not None

    # Verify Scicat datasetlifecycle: freshly created dataset is archivable
    # but not yet retrievable.
    dataset_lifecycle = dataset.get("datasetlifecycle")
    assert dataset_lifecycle is not None
    assert dataset_lifecycle.get("archiveStatusMessage") == "datasetCreated"
    assert dataset_lifecycle.get("archivable")
    assert not dataset_lifecycle.get("retrievable")

    # Verify datablocks in MINIO: all 9 raw files must exist in the landing zone.
    orig_datablocks = list(map(lambda idx: s3_client.stat_object(bucket=Bucket("landingzone"),
                           filename=f"openem-network/datasets/{dataset_pid}/raw_files/file_{idx}.bin"), range(9)))
    assert len(orig_datablocks) == 9

    # trigger archive job in scicat
    scicat_archival_job_id = await scicat_create_archival_job(dataset=dataset_pid, token=scicat_token_setup)

    # Verify Scicat Job status: job was accepted and is either queued or running.
    scicat_archival_job_status = await get_scicat_job(job_id=scicat_archival_job_id, token=scicat_token_setup)
    assert scicat_archival_job_status is not None
    assert scicat_archival_job_status.get("type") == "archive"
    assert scicat_archival_job_status.get(
        "statusCode") == "jobCreated" or scicat_archival_job_status.get("statusMessage") == "inProgress"

    # Give the archival flow time to start; use asyncio.sleep (not time.sleep)
    # so the event loop is not blocked while waiting.
    await asyncio.sleep(10)
    # Verify Prefect Flow
    archival_flow_run_id = await find_flow_in_prefect(scicat_archival_job_id)
    archival_state = await get_flow_result(flow_run_id=archival_flow_run_id)
    assert archival_state is not None

    # Verify Scicat Job status: archival flow finished successfully.
    scicat_archival_job_status = await get_scicat_job(job_id=scicat_archival_job_id, token=scicat_token_setup)
    assert scicat_archival_job_status is not None
    assert scicat_archival_job_status.get("type") == "archive"
    assert scicat_archival_job_status.get(
        "statusMessage") == "finishedSuccessful"

    # Verify Scicat datasetlifecycle: after archival the dataset is on the
    # archive disk and retrievable, but no longer archivable.
    dataset = await get_scicat_dataset(dataset_pid=dataset_pid, token=scicat_token_setup)
    assert dataset is not None
    dataset_lifecycle = dataset.get("datasetlifecycle")
    assert dataset_lifecycle is not None
    assert dataset_lifecycle.get(
        "archiveStatusMessage") == "datasetOnArchiveDisk"
    assert dataset_lifecycle.get("retrieveStatusMessage") == ""
    assert not dataset_lifecycle.get("archivable")
    assert dataset_lifecycle.get("retrievable")

    # trigger retrieval job in scicat
    scicat_retrieval_job_id = await scicat_create_retrieval_job(dataset=dataset_pid, token=scicat_token_setup)

    # Verify Scicat Job status: retrieval job accepted (queued or running).
    scicat_retrieval_job_status = await get_scicat_job(job_id=scicat_retrieval_job_id, token=scicat_token_setup)
    assert scicat_retrieval_job_status is not None
    assert scicat_retrieval_job_status.get("type") == "retrieve"
    assert scicat_retrieval_job_status.get(
        "statusCode") == "jobCreated" or scicat_retrieval_job_status.get("statusMessage") == "inProgress"

    # Give the retrieval flow time to start without blocking the event loop.
    await asyncio.sleep(10)
    # Verify Prefect Flow
    retrieve_flow_run_id = await find_flow_in_prefect(scicat_retrieval_job_id)
    retrieval_state = await get_flow_result(retrieve_flow_run_id)
    assert retrieval_state is not None

    # Verify Scicat Job status: retrieval finished and reports exactly one
    # result entry pointing at the datablock download URL.
    scicat_retrieval_job_status = await get_scicat_job(job_id=scicat_retrieval_job_id, token=scicat_token_setup)
    assert scicat_retrieval_job_status is not None
    assert scicat_retrieval_job_status.get("type") == "retrieve"
    assert scicat_retrieval_job_status.get(
        "statusMessage") == "finishedSuccessful"
    assert scicat_retrieval_job_status.get("jobResultObject") is not None
    jobResult = scicat_retrieval_job_status.get(
        "jobResultObject").get("result")
    assert len(jobResult) == 1
    assert jobResult[0].get("datasetId") == dataset_pid
    assert jobResult[0].get("url") is not None
    datablock_url = jobResult[0].get("url")
    datablock_name = jobResult[0].get("name")

    # verify file can be downloaded from MINIO via url in jobresult
    with tempfile.TemporaryDirectory() as temp_dir:
        dest_file: Path = Path(temp_dir) / datablock_name
        urllib.request.urlretrieve(datablock_url, dest_file)
        assert dest_file.exists()

    # Verify retrieved datablock in MINIO: expect a single tarball of >80 MiB.
    retrieved_datablock = s3_client.stat_object(
        bucket=Bucket("retrieval"), filename=f"openem-network/datasets/{dataset_pid}/datablocks/{dataset_pid}_0.tar.gz")
    assert retrieved_datablock is not None
    assert retrieved_datablock.Size > 80 * 1024 * 1024

    # Verify Scicat datasetlifecycle after retrieval.
    dataset = await get_scicat_dataset(dataset_pid=dataset_pid, token=scicat_token_setup)
    assert dataset is not None
    dataset_lifecycle: Dict[Any, Any] = dataset.get("datasetlifecycle")
    # Guard against a missing lifecycle before dereferencing (consistent with
    # the checks earlier in this test).
    assert dataset_lifecycle is not None
    assert dataset_lifecycle.get("retrieveStatusMessage") == "datasetRetrieved"
    # This is in fact a Scicat issue: fields in the datasetlifecycle are set to the default if not updated
    # https://github.com/SciCatProject/scicat-backend-next/blob/release-jobs/src/datasets/datasets.service.ts#L273
    # assert dataset_lifecycle.get("archiveStatusMessage") == "datasetOnArchiveDisk"
    assert not dataset_lifecycle.get("archivable")
    assert dataset_lifecycle.get("retrievable")