Archive datasets flow

archive_datasets_flow(job_id, dataset_ids=None)

Prefect flow to archive a list of datasets. Corresponds to a "Job" in SciCat. Runs the archival of each dataset as a subflow and reports the overall job status to SciCat.

Parameters:

    job_id (UUID): Id of the SciCat archival job this flow corresponds to. Required.
    dataset_ids (List[str], optional): Ids of the datasets to archive. Defaults to None; if empty, the dataset list is fetched from the job in SciCat.

Raises:

    Exception: Re-raised if the archival of a single dataset fails.

Source code in backend/archiver/flows/archive_datasets_flow.py
@flow(
    name="archive_datasetlist", log_prints=True,
    flow_run_name=generate_flow_name_job_id,
    on_failure=[on_job_flow_failure],
    on_cancellation=[on_job_flow_cancellation])
def archive_datasets_flow(job_id: UUID, dataset_ids: List[str] | None = None):
    """Prefect flow to archive a list of datasets. Corresponds to a "Job" in Scicat. Runs the individual archivals of the single datasets as subflows and reports
    the overall job status to Scicat.

    Args:
        dataset_ids (List[str]): _description_
        job_id (UUID): _description_

    Raises:
        e: _description_
    """
    dataset_ids: List[str] = dataset_ids or []
    access_token = get_scicat_access_token()

    job_update = update_scicat_archival_job_status.submit(
        job_id=job_id, status=SciCatClient.JOBSTATUS.IN_PROGRESS, token=access_token)
    job_update.result()

    if len(dataset_ids) == 0:
        dataset_ids_future = get_job_datasetlist.submit(job_id=job_id, token=access_token)
        dataset_ids = dataset_ids_future.result()

    try:
        for id in dataset_ids:
            archive_single_dataset_flow(dataset_id=id, scicat_token=access_token)
    except Exception as e:
        raise e

    update_scicat_archival_job_status.submit(
        job_id=job_id, status=SciCatClient.JOBSTATUS.FINISHED_SUCCESSFULLY, token=access_token).result()
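
A minimal sketch of invoking the flow directly, e.g. for local testing; the job id and dataset id below are placeholders, and in production the flow is presumably triggered through a Prefect deployment once SciCat creates the job:

from uuid import UUID

archive_datasets_flow(
    job_id=UUID("00000000-0000-0000-0000-000000000000"),  # placeholder job id
    dataset_ids=["dataset-001"],                           # placeholder dataset id; omit to fetch the list from the job
)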

check_free_space_in_LTS()

Prefect task to wait for free space in the LTS. Checks periodically whether the condition for enough free space is fulfilled. Only one of these tasks runs at a time; the others are only scheduled once this task has finished, i.e. once there is enough space.

Source code in backend/archiver/flows/archive_datasets_flow.py
@task(tags=[ConcurrencyLimits().LTS_FREE_TAG])
def check_free_space_in_LTS():
    """ Prefect task to wait for free space in the LTS. Checks periodically if the condition for enough
    free space is fulfilled. Only one of these task runs at time; the others are only scheduled once this task
    has finished, i.e. there is enough space.
    """
    asyncio.run(wait_for_free_space())
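
The single-runner behaviour is presumably enforced via a Prefect tag-based concurrency limit on ConcurrencyLimits().LTS_FREE_TAG. A minimal sketch of creating such a limit with the Prefect client, assuming the tag resolves to the placeholder string "lts-free":

import asyncio
from prefect.client.orchestration import get_client

async def set_lts_free_limit(tag: str) -> None:
    # Allow only one concurrent task run for the given tag, so a single
    # free-space check executes at a time.
    async with get_client() as client:
        await client.create_concurrency_limit(tag=tag, concurrency_limit=1)

asyncio.run(set_lts_free_limit("lts-free"))  # placeholder tag; the real value comes from ConcurrencyLimits().LTS_FREE_TAG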

create_datablocks(dataset_id, origDataBlocks)

Prefect task to create datablocks.

Parameters:

    dataset_id (str): Dataset id.
    origDataBlocks (List[OrigDataBlock]): List of OrigDataBlocks (Pydantic model).

Returns:

    List[DataBlock]: List of DataBlocks (Pydantic model).

Source code in backend/archiver/flows/archive_datasets_flow.py
@task(task_run_name=generate_task_name_dataset)
def create_datablocks(dataset_id: str, origDataBlocks: List[OrigDataBlock]) -> List[DataBlock]:
    """Prefect task to create datablocks.

    Args:
        dataset_id (str): dataset id
        origDataBlocks (List[OrigDataBlock]): List of OrigDataBlocks (Pydantic Model)

    Returns:
        List[DataBlock]: List of DataBlocks (Pydantic Model)
    """

    s3_client = get_s3_client()
    return datablocks_operations.create_datablocks(s3_client, dataset_id, origDataBlocks)

create_datablocks_flow(dataset_id, scicat_token)

Prefect (sub-)flow to create datablocks (.tar.gz files) for the files of a dataset and register them in SciCat.

Parameters:

    dataset_id (str): Dataset id.
    scicat_token (SecretStr): SciCat access token used to register the datablocks.

Returns:

    List[DataBlock]: List of created and registered datablocks.

Source code in backend/archiver/flows/archive_datasets_flow.py
@flow(name="create_datablocks", flow_run_name=generate_subflow_run_name_job_id_dataset_id)
def create_datablocks_flow(dataset_id: str, scicat_token: SecretStr) -> List[DataBlock]:
    """Prefect (sub-)flow to create datablocks (.tar.gz files) for files of a dataset and register them in Scicat.

    Args:
        dataset_id (str): Dataset id

    Returns:
        List[DataBlock]: List of created and registered datablocks
    """

    dataset_update = update_scicat_archival_dataset_lifecycle.submit(
        dataset_id=dataset_id,
        status=SciCatClient.ARCHIVESTATUSMESSAGE.STARTED,
        token=scicat_token
    )

    orig_datablocks = get_origdatablocks.with_options(
        on_failure=[partial(on_get_origdatablocks_error, dataset_id)]
    ).submit(
        dataset_id=dataset_id,
        token=scicat_token,
        wait_for=[dataset_update]
    )  # type: ignore

    datablocks_future = create_datablocks.submit(
        dataset_id=dataset_id,
        origDataBlocks=orig_datablocks
    )  # type: ignore

    register_datablocks.submit(
        datablocks=datablocks_future,  # type: ignore
        dataset_id=dataset_id,
        token=scicat_token
    ).wait()

    return datablocks_future.result()
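
A minimal sketch of calling this sub-flow directly; the dataset id and token are placeholders, and SecretStr is assumed to be pydantic's SecretStr as used in the signature:

from pydantic import SecretStr

datablocks = create_datablocks_flow(
    dataset_id="dataset-001",                # placeholder dataset id
    scicat_token=SecretStr("scicat-token"),  # placeholder token, normally obtained via get_scicat_access_token()
)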

move_data_to_LTS(dataset_id, datablock)

Prefect task to move a datablock (.tar.gz file) to the LTS. Concurrency of this task is limited to 2 instances at the same time.

Source code in backend/archiver/flows/archive_datasets_flow.py
58
59
60
61
62
63
64
@task(task_run_name=generate_task_name_dataset, tags=[ConcurrencyLimits().MOVE_TO_LTS_TAG])
def move_data_to_LTS(dataset_id: str, datablock: DataBlock):
    """ Prefect task to move a datablock (.tar.gz file) to the LTS. Concurrency of this task is limited to 2 instances
    at the same time.
    """
    s3_client = get_s3_client()
    return datablocks_operations.move_data_to_LTS(s3_client, dataset_id, datablock)

move_datablock_to_lts_flow(dataset_id, datablock)

Prefect (sub-)flow to move a datablock to the LTS. Implements the copying of data and verification via checksum.

Parameters:

    dataset_id (str): Dataset id.
    datablock (DataBlock): Datablock to move to the LTS.

Source code in backend/archiver/flows/archive_datasets_flow.py
@flow(name="move_datablocks_to_lts", log_prints=True, flow_run_name=generate_subflow_run_name_job_id_dataset_id)
def move_datablock_to_lts_flow(dataset_id: str, datablock: DataBlock):
    """Prefect (sub-)flow to move a datablock to the LTS. Implements the copying of data and verification via checksum.

    Args:
        dataset_id (str): Dataset id
        datablock (DataBlock): Datablock to move to the LTS
    """

    wait = check_free_space_in_LTS.submit()

    move_data = move_data_to_LTS.submit(
        dataset_id=dataset_id,
        datablock=datablock,
        wait_for=[wait]
    )  # type: ignore

    verify_data_in_LTS.submit(
        dataset_id=dataset_id,
        datablock=datablock,
        wait_for=[move_data]
    ).result()  # type: ignore
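
A sketch of how this sub-flow might be driven for every datablock of a dataset; the loop and names are illustrative only, and the datablock list would typically be the result of create_datablocks_flow:

# Hypothetical driver loop: move each created datablock to the LTS in turn.
for datablock in datablocks:  # e.g. the result of create_datablocks_flow(...)
    move_datablock_to_lts_flow(dataset_id="dataset-001", datablock=datablock)  # placeholder dataset id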

on_get_origdatablocks_error(dataset_id, task, task_run, state)

Callback for get_origdatablocks tasks. Reports a user error.

Source code in backend/archiver/flows/archive_datasets_flow.py
def on_get_origdatablocks_error(dataset_id: str, task: Task, task_run: TaskRun, state: State):
    """Callback for get_origdatablocks tasks. Reports a user error.
    """
    scicat_token = get_scicat_access_token()
    report_dataset_user_error(dataset_id, token=scicat_token)

verify_data_in_LTS(dataset_id, datablock)

Prefect task to verify a datablock in the LTS against a checksum. Tasks of this type run with restricted concurrency since the LTS only allows limited concurrent access.

Source code in backend/archiver/flows/archive_datasets_flow.py
@task(task_run_name=generate_task_name_dataset, tags=[ConcurrencyLimits().VERIFY_LTS_TAG])
def verify_data_in_LTS(dataset_id: str, datablock: DataBlock) -> None:
    """ Prefect Task to verify a datablock in the LTS against a checksum. Task of this type run with no concurrency since the LTS
    does only allow limited concurrent access.
    """
    datablocks_operations.verify_data_in_LTS(
        dataset_id, datablock)