
Archive datasets flow

archive_datasets_flow(job_id, dataset_ids=None)

Prefect flow to archive a list of datasets. Corresponds to a "Job" in SciCat. Runs the archival of each dataset as a subflow and reports the overall job status back to SciCat.

Parameters:

Name         Type              Description                                           Default
job_id       UUID              ID of the SciCat archival job to run and report on.   required
dataset_ids  List[str] | None  Optional list of dataset IDs to archive; the
                               authoritative list is fetched from the SciCat job.    None

Raises:

Type       Description
Exception  Re-raised from a failing status update or archival subflow.

Source code in backend/archiver/flows/archive_datasets_flow.py, lines 445-487
@flow(
    name="archive_datasetlist",
    log_prints=True,
    flow_run_name=generate_flow_name_job_id,
    on_failure=[on_job_flow_failure],
    on_cancellation=[on_job_flow_cancellation],
)
def archive_datasets_flow(job_id: UUID, dataset_ids: List[str] | None = None):
    """Prefect flow to archive a list of datasets. Corresponds to a "Job" in Scicat. Runs the individual archivals of the single datasets as subflows and reports
    the overall job status to Scicat.

    Args:
        dataset_ids (List[str]): _description_
        job_id (UUID): _description_

    Raises:
        e: _description_
    """
    # The passed-in list is only a default; it is replaced below by the dataset
    # list fetched from the SciCat job.
    dataset_ids: List[str] = dataset_ids or []
    access_token = get_scicat_access_token.submit()

    job_update = update_scicat_archival_job_status.submit(
        job_id=job_id,
        status_code=SciCatClient.JOBSTATUSCODE.IN_PROGRESS,
        status_message=SciCatClient.JOBSTATUSMESSAGE.JOB_IN_PROGRESS,
        token=access_token,
    )
    job_update.result()

    dataset_ids_future = get_job_datasetlist.submit(job_id=job_id, token=access_token)
    dataset_ids = dataset_ids_future.result()

    for dataset_id in dataset_ids:
        archive_single_dataset_flow(dataset_id=dataset_id)

    access_token = get_scicat_access_token.submit()

    update_scicat_archival_job_status.submit(
        job_id=job_id,
        status_code=SciCatClient.JOBSTATUSCODE.FINISHED_SUCCESSFULLY,
        status_message=SciCatClient.JOBSTATUSMESSAGE.JOB_FINISHED,
        token=access_token,
    ).result()
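
For illustration, the flow can be invoked directly with just a job id, since the dataset list is fetched from the SciCat job itself. The sketch below is a minimal example; the import path and the job UUID are assumptions, and in production the flow would typically be triggered through a Prefect deployment instead.

from uuid import UUID

from backend.archiver.flows.archive_datasets_flow import archive_datasets_flow

if __name__ == "__main__":
    # Hypothetical job id; the flow fetches the dataset list for this job from SciCat.
    archive_datasets_flow(job_id=UUID("00000000-0000-0000-0000-000000000001"))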

check_free_space_in_LTS()

Prefect task to wait for free space in the LTS. Periodically checks whether the free-space condition is fulfilled. Only one of these tasks runs at a time; the others are scheduled only once this task has finished, i.e. once there is enough space.

Source code in backend/archiver/flows/archive_datasets_flow.py, lines 212-218
@task(tags=[ConcurrencyLimits().LTS_FREE_TAG])
def check_free_space_in_LTS():
    """Prefect task to wait for free space in the LTS. Checks periodically if the condition for enough
    free space is fulfilled. Only one of these task runs at time; the others are only scheduled once this task
    has finished, i.e. there is enough space.
    """
    asyncio.run(wait_for_free_space())
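
The wrapped wait_for_free_space() helper is defined elsewhere in the archiver package; a hypothetical sketch of such a polling coroutine is shown below. The mount point, threshold, and poll interval are assumptions, not the actual configuration.

import asyncio
import shutil

LTS_PATH = "/lts"                    # assumed LTS mount point
REQUIRED_FREE_BYTES = 500 * 1024**3  # assumed free-space threshold
POLL_INTERVAL_S = 60

async def wait_for_free_space() -> None:
    # Poll the filesystem until the free-space condition is fulfilled.
    while shutil.disk_usage(LTS_PATH).free < REQUIRED_FREE_BYTES:
        await asyncio.sleep(POLL_INTERVAL_S)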

copy_datablock_from_LTS(dataset_id, datablock)

Prefect task to copy a datablock (.tar.gz file) back from the LTS. Concurrency of this task is limited to 2 instances at the same time.

Source code in backend/archiver/flows/archive_datasets_flow.py, lines 234-244
@task(
    task_run_name=generate_task_name_dataset,
    tags=[ConcurrencyLimits().LTS_READ_TAG],
    retries=5,
    retry_delay_seconds=[60, 120, 240, 480, 960],
)
def copy_datablock_from_LTS(dataset_id: str, datablock: DataBlock):
    """Prefect task to move a datablock (.tar.gz file) to the LTS. Concurrency of this task is limited to 2 instances
    at the same time.
    """
    datablocks_operations.copy_file_from_LTS(dataset_id, datablock)
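
The 2-instance limit is not part of the decorator itself: in Prefect, tag-based concurrency limits are configured on the server, while retries=5 with retry_delay_seconds=[60, 120, 240, 480, 960] implements an exponential backoff. A sketch of creating the limit behind the read tag, assuming Prefect 2.x and that ConcurrencyLimits().LTS_READ_TAG resolves to "lts-read":

import asyncio

from prefect.client.orchestration import get_client

async def set_lts_read_limit() -> None:
    async with get_client() as client:
        # Allow at most two concurrent task runs carrying this tag.
        await client.create_concurrency_limit(tag="lts-read", concurrency_limit=2)

if __name__ == "__main__":
    asyncio.run(set_lts_read_limit())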

create_datablocks_flow(dataset_id)

Prefect (sub-)flow to create datablocks (.tar.gz files) for files of a dataset and register them in SciCat.

Parameters:

Name        Type  Description  Default
dataset_id  str   Dataset id   required

Returns:

Type             Description
List[DataBlock]  List of created and registered datablocks

Source code in backend/archiver/flows/archive_datasets_flow.py, lines 313-357
@flow(name="create_datablocks", flow_run_name=generate_subflow_run_name_job_id_dataset_id)
def create_datablocks_flow(dataset_id: str) -> List[DataBlock]:
    """Prefect (sub-)flow to create datablocks (.tar.gz files) for files of a dataset and register them in Scicat.

    Args:
        dataset_id (str): Dataset id

    Returns:
        List[DataBlock]: List of created and registered datablocks
    """

    scicat_token = get_scicat_access_token.submit()

    dataset_update = update_scicat_archival_dataset_lifecycle.submit(
        dataset_id=dataset_id,
        status=SciCatClient.ARCHIVESTATUSMESSAGE.STARTED,
        token=scicat_token,
    )

    orig_datablocks = get_origdatablocks.with_options(
        on_failure=[partial(on_get_origdatablocks_error, dataset_id)]
    ).submit(dataset_id=dataset_id, token=scicat_token, wait_for=[dataset_update])  # type: ignore

    files = download_origdatablocks.submit(dataset_id=dataset_id, origDataBlocks=orig_datablocks)

    tarfiles_future = create_tarfiles.submit(dataset_id, wait_for=[files])
    datablocks_future = create_datablock_entries.submit(dataset_id, orig_datablocks, tarfiles_future)

    # Prefect issue: https://github.com/PrefectHQ/prefect/issues/12028
    # Exceptions are not propagated correctly
    files.result()
    tarfiles_future.result()
    datablocks_future.result()

    scicat_token = get_scicat_access_token.submit(wait_for=[datablocks_future])

    register_future = register_datablocks.submit(
        datablocks=datablocks_future,  # type: ignore
        dataset_id=dataset_id,
        token=scicat_token,
    )

    register_future.result()

    return datablocks_future
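
The flow above relies on Prefect's submit/wait_for/result pattern: .submit() returns a future immediately, wait_for adds an ordering dependency without passing data, and .result() blocks and re-raises any task exception (the explicit .result() calls work around the exception-propagation issue linked in the code). A generic, simplified sketch of the pattern:

from prefect import flow, task

@task
def produce() -> int:
    return 42

@task
def consume(value: int) -> None:
    print(value)

@flow
def pattern_demo() -> None:
    fut = produce.submit()                   # returns a PrefectFuture immediately
    done = consume.submit(fut)               # data dependency: fut resolves to 42
    side = produce.submit(wait_for=[done])   # ordering-only dependency
    side.result()                            # blocks and re-raises task exceptions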

move_data_to_LTS(dataset_id, datablock)

Prefect task to move a datablock (.tar.gz file) to the LTS. Concurrency of this task is limited to 2 instances at the same time.

Source code in backend/archiver/flows/archive_datasets_flow.py, lines 226-231
@task(task_run_name=generate_task_name_dataset, tags=[ConcurrencyLimits().LTS_WRITE_TAG])
def move_data_to_LTS(dataset_id: str, datablock: DataBlock):
    """Prefect task to move a datablock (.tar.gz file) to the LTS. Concurrency of this task is limited to 2 instances
    at the same time.
    """
    datablocks_operations.move_data_to_LTS(dataset_id, datablock)

move_datablocks_to_lts_flow(dataset_id, datablocks)

Prefect (sub-)flow to move datablocks to the LTS. Implements the copying of the data and its verification via checksum.

Parameters:

Name        Type             Description                                   Default
dataset_id  str              ID of the dataset the datablocks belong to.   required
datablocks  List[DataBlock]  Datablocks to move to the LTS and verify.     required
Source code in backend/archiver/flows/archive_datasets_flow.py, lines 263-310
@flow(
    name="move_datablocks_to_lts",
    log_prints=True,
    flow_run_name=generate_subflow_run_name_job_id_dataset_id,
)
def move_datablocks_to_lts_flow(dataset_id: str, datablocks: List[DataBlock]):
    """Prefect (sub-)flow to move a datablock to the LTS. Implements the copying of data and verification via checksum.

    Args:
        dataset_id (str): _description_
        datablock (DataBlock): _description_
    """
    tasks = []
    all_tasks = []

    for datablock in datablocks:
        checksum = calculate_checksum.submit(dataset_id=dataset_id, datablock=datablock)
        free_space = check_free_space_in_LTS.submit(wait_for=[checksum])

        move = move_data_to_LTS.submit(dataset_id=dataset_id, datablock=datablock, wait_for=[free_space])  # type: ignore

        getLogger().info(f"Wait {Variables().ARCHIVER_LTS_WAIT_BEFORE_VERIFY_S}s before verifying datablock")
        sleep = sleep_for.submit(Variables().ARCHIVER_LTS_WAIT_BEFORE_VERIFY_S, wait_for=[move])

        copy = copy_datablock_from_LTS.submit(dataset_id=dataset_id, datablock=datablock, wait_for=[sleep])

        checksum_verification = verify_checksum.submit(
            dataset_id=dataset_id, datablock=datablock, checksum=checksum, wait_for=[copy]
        )

        w = verify_datablock_in_verification.submit(
            dataset_id=dataset_id, datablock=datablock, wait_for=[checksum_verification]
        )  # type: ignore
        tasks.append(w)

        all_tasks.append(free_space)
        all_tasks.append(checksum)
        all_tasks.append(move)
        all_tasks.append(sleep)
        all_tasks.append(copy)
        all_tasks.append(checksum_verification)
        all_tasks.append(w)

    wait_for_futures(tasks)

    # this is necessary to propagate the errors of the tasks
    for t in all_tasks:
        t.result()
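
The checksum round trip above (calculate before the move, copy back after the configured wait, verify) is what detects corruption introduced by the transfer to the LTS. Below is a hypothetical sketch of such a verification; the actual digest algorithm and chunking used by datablocks_operations may differ.

import hashlib
from pathlib import Path

def file_checksum(path: Path, chunk_size: int = 1024 * 1024) -> str:
    # Stream the file in chunks so large .tar.gz archives fit in memory.
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def verify_round_trip(original: Path, copied_back: Path) -> None:
    # Raises if the copy back from the LTS does not match the original archive.
    if file_checksum(original) != file_checksum(copied_back):
        raise ValueError(f"Checksum mismatch for {original.name}")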

on_get_origdatablocks_error(dataset_id, task, task_run, state)

Failure callback for the get_origdatablocks task. Reports a user error for the dataset to SciCat.

Source code in backend/archiver/flows/archive_datasets_flow.py, lines 49-52
def on_get_origdatablocks_error(dataset_id: str, task: Task, task_run: TaskRun, state: State):
    """Callback for get_origdatablocks tasks. Reports a user error."""
    scicat_token = get_scicat_access_token()
    report_dataset_user_error(dataset_id, token=scicat_token)
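Prefect failure hooks are called with (task, task_run, state), so the extra dataset_id argument is bound in create_datablocks_flow via functools.partial before the hook is registered. A brief sketch of that adaptation (the dataset id is a hypothetical example):

from functools import partial

# Prepend the dataset id; Prefect later calls hook(task, task_run, state).
hook = partial(on_get_origdatablocks_error, "dataset-123")
# Registered as: get_origdatablocks.with_options(on_failure=[hook])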

sleep_for(time_in_seconds)

Sleeps for a given amount of time. Required to wait for the LTS to update its internal state. Needs to be blocking, as it should prevent the following task from running.

Source code in backend/archiver/flows/archive_datasets_flow.py, lines 204-209
@task(task_run_name=generate_sleep_for_task_name)
def sleep_for(time_in_seconds: int):
    """Sleeps for a given amount of time. Required to wait for the LTS to update its internal state.
    Needs to be blocking as it should prevent the following task to run.
    """
    time.sleep(time_in_seconds)

verify_datablock_in_verification(dataset_id, datablock)

Prefect task to verify a datablock in the LTS against a checksum. Tasks of this type run without concurrency, since the LTS only allows limited concurrent access.

Source code in backend/archiver/flows/archive_datasets_flow.py, lines 254-259
@task(task_run_name=generate_task_name_dataset)
def verify_datablock_in_verification(dataset_id: str, datablock: DataBlock) -> None:
    """Prefect Task to verify a datablock in the LTS against a checksum. Task of this type run with no concurrency since the LTS
    does only allow limited concurrent access.
    """
    datablocks_operations.verify_datablock_in_verification(dataset_id=dataset_id, datablock=datablock)