Datablocks

calculate_md5_checksum(filename, chunksize=1024 * 1025)

Calculate an md5 hash of a file

Parameters:

    Name       Type   Description                                  Default
    filename   Path   absolute or relative path to the file        required
    chunksize  int    chunk size in bytes used to read the file    1024 * 1025

Returns:

    Type   Description
    str    MD5 hash as a hexadecimal string

Source code in backend/archiver/utils/datablocks.py
@log
def calculate_md5_checksum(filename: Path, chunksize: int = 1024 * 1025) -> str:
    """Calculate an md5 hash of a file

    Args:
        filename (Path): absolute or relative path to file
        chunksize (int, optional): default chunk size to calculate hash on. Defaults to 1024*1025.

    Returns:
        str: hash as str
    """
    import hashlib
    m = hashlib.md5()
    with open(filename, 'rb') as f:
        while chunk := f.read(chunksize):
            m.update(chunk)
    return m.hexdigest()
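
Example usage, a minimal sketch: the import path is assumed from the source location shown above, and the input file is hypothetical.

    from pathlib import Path

    from backend.archiver.utils.datablocks import calculate_md5_checksum  # assumed import path

    # Hypothetical input file; any readable file works
    checksum = calculate_md5_checksum(Path("some_file.dat"))
    print(checksum)  # 32-character hexadecimal digest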

copy_file_to_folder(src_file, dst_folder)

Copies a file to a destination folder. The destination folder does not need to exist; rsync is invoked with --mkpath and creates it if necessary.

Parameters:

    Name        Type   Description                                        Default
    src_file    Path   source file                                        required
    dst_folder  Path   destination folder; created if it does not exist   required

Raises:

    Type         Description
    SystemError  raised if the copy operation fails

Source code in backend/archiver/utils/datablocks.py
@log
def copy_file_to_folder(src_file: Path, dst_folder: Path):
    """Copies a file to a destination folder (does not need to exist)

    Args:
        src_file (Path): Source file
        dst_folder (Path): destination folder; created by rsync --mkpath if it does not exist

    Raises:
        SystemError: raised if the copy operation fails
    """
    if not src_file.exists() or not src_file.is_file():
        raise SystemError(
            f"Source file {src_file} is not a file or does not exist")
    if dst_folder.is_file():
        raise SystemError(f"Destination folder {dst_folder} is not a folder")

    getLogger().info(f"Start Copy operation. src:{src_file}, dst{dst_folder}")

    with subprocess.Popen(["rsync", "-rcvz", "--stats", "--mkpath",
                          str(src_file), str(dst_folder)],
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT,
                          universal_newlines=True) as popen:

        for line in popen.stdout:
            getLogger().info(line)

        popen.stdout.close()
        return_code = popen.wait()
        getLogger().info(f"Finished with return code : {return_code}")

        expected_dst_file = dst_folder / src_file.name

    if not expected_dst_file.exists():
        raise SystemError(f"Copying did not produce file {expected_dst_file}")

create_datablock_entries(dataset_id, folder, origDataBlocks, tar_infos)

Create datablock entries compliant with the schema provided by SciCat.

Parameters:

    Name            Type                 Description                                     Default
    dataset_id      str                  dataset identifier                              required
    folder          Path                 folder containing the tarballs in tar_infos     required
    origDataBlocks  List[OrigDataBlock]  original data blocks of the dataset             required
    tar_infos       List[TarInfo]        tarballs to create datablock entries for        required

Returns:

    Type             Description
    List[DataBlock]  datablock entries, one per tarball

Source code in backend/archiver/utils/datablocks.py
@log
def create_datablock_entries(
        dataset_id: str, folder: Path, origDataBlocks: List[OrigDataBlock],
        tar_infos: List[TarInfo]) -> List[DataBlock]:
    """Create datablock entries compliant with schema provided by scicat

    Args:
        dataset_id (str): Dataset identifier
        folder (Path): _description_
        origDataBlocks (List[OrigDataBlock]): _description_
        tarballs (List[Path]): _description_

    Returns:
        List[DataBlock]: _description_
    """

    version = 1.0

    datablocks: List[DataBlock] = []
    for tar in tar_infos:
        o = origDataBlocks[0]

        data_file_list: List[DataFile] = []

        tar_path = folder / tar.path

        tarball = tarfile.open(tar_path)

        for tar_info in tarball.getmembers():
            checksum = calculate_md5_checksum(StoragePaths.scratch_archival_raw_files_folder(
                dataset_id) / tar_info.path)

            data_file_list.append(DataFile(
                path=tar_info.path,
                size=tar_info.size,
                chk=checksum,
                uid=str(tar_info.uid),
                gid=str(tar_info.gid),
                perm=str(tar_info.mode),
                time=str(datetime.datetime.now(datetime.UTC).isoformat())
            ))

        datablocks.append(DataBlock(
            archiveId=str(StoragePaths.relative_datablocks_folder(
                dataset_id) / tar_path.name),
            size=tar.unpackedSize,
            packedSize=tar.packedSize,
            chkAlg="md5",
            version=str(version),
            dataFileList=data_file_list,
            rawDatasetId=o.rawdatasetId,
            derivedDatasetId=o.derivedDatasetId
        ))

    return datablocks
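
A hedged usage sketch: it assumes the tarballs were produced by create_tarballs (below), that the raw files are still present under StoragePaths.scratch_archival_raw_files_folder(dataset_id) (checksums are recomputed from there), and that the original data blocks were fetched from SciCat beforehand. Import paths, folder locations, and the fetch helper are assumptions.

    from pathlib import Path

    from backend.archiver.utils.datablocks import (  # assumed import path
        create_datablock_entries,
        create_tarballs,
    )

    dataset_id = "20.500.12345/some-dataset"        # hypothetical dataset id
    datablock_folder = Path("/scratch/datablocks")  # hypothetical folder
    datablock_folder.mkdir(parents=True, exist_ok=True)

    tar_infos = create_tarballs(dataset_id, Path("/scratch/raw"), datablock_folder)
    orig_datablocks = fetch_orig_datablocks(dataset_id)  # hypothetical helper standing in for a SciCat query
    datablocks = create_datablock_entries(dataset_id, datablock_folder, orig_datablocks, tar_infos)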

create_tarballs(dataset_id, src_folder, dst_folder, target_size=300 * 1024 ** 2)

Create datablocks, i.e. .tar.gz files, from files in a folder. The files will be named according to the dataset they belong to. The target size of the created files is 300 MB by default.

Parameters:

    Name         Type   Description                                       Default
    dataset_id   str    dataset identifier; used to name the tarballs     required
    src_folder   Path   folder containing the files to pack               required
    dst_folder   Path   folder the tarballs are written to                required
    target_size  int    target size of each tarball, in bytes             300 * 1024 ** 2

Returns:

    Type           Description
    List[TarInfo]  path, packed size and unpacked size of each created tarball

Source code in backend/archiver/utils/datablocks.py
@log
def create_tarballs(dataset_id: str, src_folder: Path, dst_folder: Path,
                    target_size: int = 300 * (1024**2)) -> List[TarInfo]:
    """ Create datablocks, i.e. .tar.gz files, from files in a folder. The files will be named according to
    the dataset they belong to. The target size of the created files is 300 MB by default

    Args:
        dataset_id (str): dataset identifier; used to name the tarballs
        src_folder (Path): folder containing the files to pack
        dst_folder (Path): folder the tarballs are written to
        target_size (int, optional): target size of each tarball, in bytes. Defaults to 300*(1024**2).

    Returns:
        List[TarInfo]: path, packed size and unpacked size of each created tarball
    """

    # TODO: corner case: target size < file size
    tarballs: List[TarInfo] = []
    tar_name = dataset_id.replace("/", "--")

    current_tar_info = TarInfo(
        unpackedSize=0,
        packedSize=0,
        path=Path(dst_folder / Path(f"{tar_name}_{len(tarballs)}.tar.gz")))

    current_tarfile: tarfile.TarFile = tarfile.open(current_tar_info.path, 'w')

    if not any(Path(src_folder).iterdir()):
        raise SystemError(f"Empty folder {src_folder} found.")

    for file in src_folder.iterdir():
        if file.stat().st_size > target_size:
            raise SystemError(
                f"Size of {file} is larger than target size {target_size}. Increase target_size.")
        if current_tar_info.path.stat().st_size + file.stat().st_size > target_size:
            current_tar_info.packedSize = current_tar_info.path.stat().st_size
            current_tarfile.close()
            tarballs.append(current_tar_info)

            current_tar_info = TarInfo(
                unpackedSize=0,
                packedSize=0,
                path=Path(dst_folder / Path(f"{tar_name}_{len(tarballs)}.tar.gz")))
            current_tarfile = tarfile.open(current_tar_info.path, 'w')

        current_tar_info.unpackedSize += file.stat().st_size
        current_tarfile.add(name=file, arcname=file.name, recursive=False)

    current_tar_info.packedSize = current_tar_info.path.stat().st_size
    current_tarfile.close()
    tarballs.append(current_tar_info)

    return tarballs
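
A minimal usage sketch with hypothetical folders; the import path is assumed, and the destination folder is created beforehand since the function opens the tar files directly inside it.

    from pathlib import Path

    from backend.archiver.utils.datablocks import create_tarballs  # assumed import path

    dst = Path("/scratch/datablocks")  # hypothetical destination folder
    dst.mkdir(parents=True, exist_ok=True)

    # Pack everything in /scratch/raw into ~300 MB tarballs named after the dataset id
    tar_infos = create_tarballs("20.500.12345/some-dataset", Path("/scratch/raw"), dst)
    for info in tar_infos:
        print(info.path, info.packedSize, info.unpackedSize)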

download_object_from_s3(client, bucket, folder, object_name, target_path)

Download an object from S3 storage.

Parameters:

    Name         Type       Description                                           Default
    client       S3Storage  S3 client used for the download                       required
    bucket       Bucket     bucket to look for the object in                      required
    folder       Path       S3 prefix of the object                               required
    object_name  str        object name, without prefix                           required
    target_path  Path       absolute or relative path of the file to be created   required

Source code in backend/archiver/utils/datablocks.py
@log
def download_object_from_s3(client: S3Storage, bucket: Bucket, folder: Path, object_name: str, target_path: Path):
    """Download an object from S3 storage.

    Args:
        client (S3Storage): S3 client used for the download
        bucket (Bucket): Bucket to look for file
        folder (Path): s3 prefix for object
        object_name (str): object name, no prefix
        target_path (Path): absolute or relative path for the file to be created
    """
    client.fget_object(bucket=bucket, folder=str(folder),
                       object_name=object_name, target_path=target_path)
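
A usage sketch: the S3Storage client and Bucket instance are assumed to be constructed elsewhere (their setup is part of the project's S3 wrapper and not shown on this page), and the prefix and object name are hypothetical.

    from pathlib import Path

    from backend.archiver.utils.datablocks import download_object_from_s3  # assumed import path

    # `client` (S3Storage) and `bucket` (Bucket) are assumed to exist already
    download_object_from_s3(
        client,
        bucket=bucket,
        folder=Path("datasets/1234/raw_files"),   # hypothetical S3 prefix
        object_name="measurement.dat",            # hypothetical object name
        target_path=Path("/tmp/measurement.dat"),
    )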

download_objects_from_s3(client, prefix, bucket, destination_folder)

Download objects from S3 storage to a folder.

Parameters:

    Name                Type       Description                                    Default
    client              S3Storage  S3 client used for the download                required
    prefix              Path       S3 prefix                                      required
    bucket              Bucket     S3 bucket                                      required
    destination_folder  Path       target folder; created if it does not exist    required

Returns:

    Type        Description
    List[Path]  list of paths of the created files

Source code in backend/archiver/utils/datablocks.py
@log
def download_objects_from_s3(client: S3Storage, prefix: Path, bucket: Bucket, destination_folder: Path) -> List[Path]:
    """Download objects form s3 storage to folder

    Args:
        prefix (Path): S3 prefix
        bucket (Bucket): s3 bucket
        destination_folder (Path): Target folder. Will be created if it does not exist.

    Returns:
        List[Path]: List of paths of created files
    """
    destination_folder.mkdir(parents=True, exist_ok=True)

    files: List[Path] = []

    for item in client.list_objects(bucket, str(prefix)):
        item_name = Path(item.Name).name
        local_filepath = destination_folder / item_name
        local_filepath.parent.mkdir(parents=True, exist_ok=True)
        client.fget_object(bucket=bucket, folder=str(prefix),
                           object_name=item.Name, target_path=local_filepath)
        files.append(local_filepath)

    if len(files) == 0:
        raise SystemError(f"No files found in bucket {bucket} at {prefix}")

    return files
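
A usage sketch under the same assumptions as above: `client` and `bucket` are created elsewhere, and the prefix is hypothetical.

    from pathlib import Path

    from backend.archiver.utils.datablocks import download_objects_from_s3  # assumed import path

    files = download_objects_from_s3(
        client,
        prefix=Path("datasets/1234/datablocks"),  # hypothetical S3 prefix
        bucket=bucket,
        destination_folder=Path("/scratch/download"),
    )
    print(f"downloaded {len(files)} files")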

list_datablocks(client, prefix, bucket)

List all objects in an S3 bucket under a given prefix.

Parameters:

    Name    Type       Description                      Default
    client  S3Storage  S3 client used for listing       required
    prefix  Path       prefix of the objects to list    required
    bucket  Bucket     S3 bucket                        required

Returns:

    Type                          Description
    List[S3Storage.ListedObject]  objects found under the prefix

Source code in backend/archiver/utils/datablocks.py
@log
def list_datablocks(client: S3Storage, prefix: Path, bucket: Bucket) -> List[S3Storage.ListedObject]:
    """List all objects in s3 bucket and path

    Args:
        client (S3Storage): S3 client used for listing
        prefix (Path): prefix for files to be listed
        bucket (Bucket): s3 bucket

    Returns:
        List[S3Storage.ListedObject]: objects found under the prefix
    """
    return client.list_objects(bucket, str(prefix))
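
A usage sketch, again assuming an existing `client` and `bucket` and a hypothetical prefix; each returned object exposes a Name attribute, as used in download_objects_from_s3 above.

    from pathlib import Path

    from backend.archiver.utils.datablocks import list_datablocks  # assumed import path

    for obj in list_datablocks(client, Path("datasets/1234/datablocks"), bucket):
        print(obj.Name)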

sufficient_free_space_on_lts()

Checks for free space on configured LTS storage with respect to configured free space percentage.

Returns:

    Type     Description
    boolean  True if the free-space condition is satisfied

Source code in backend/archiver/utils/datablocks.py
@log
def sufficient_free_space_on_lts():
    """ Checks for free space on configured LTS storage with respect to configured free space percentage.

    Returns:
        boolean: True if the free-space condition is satisfied
    """

    path = Variables().LTS_STORAGE_ROOT
    stat = shutil.disk_usage(path)
    free_percentage = 100.0 * stat.free / stat.total
    getLogger().info(
        f"LTS free space:{free_percentage:.2}%, expected: {Variables().LTS_FREE_SPACE_PERCENTAGE:.2}%")
    return free_percentage >= Variables().LTS_FREE_SPACE_PERCENTAGE
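
The check reduces to comparing the free fraction of the LTS mount against a configured percentage. A standalone sketch of the same computation, with a hypothetical path and threshold standing in for the Variables() configuration:

    import shutil

    lts_root = "/tmp"              # hypothetical stand-in for Variables().LTS_STORAGE_ROOT
    required_free_percentage = 20  # hypothetical stand-in for Variables().LTS_FREE_SPACE_PERCENTAGE

    stat = shutil.disk_usage(lts_root)
    free_percentage = 100.0 * stat.free / stat.total
    print(f"free: {free_percentage:.2f}%, required: {required_free_percentage}%")
    print(free_percentage >= required_free_percentage)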

wait_for_file_accessible(file, timeout_s=360) async

Wait until the given file is readable, polling every 30 seconds until timeout_s is exceeded.

Returns:

    Type     Description
    boolean  True once the file is readable

Raises:

    Type         Description
    SystemError  raised if the file is not accessible within timeout_s seconds

Source code in backend/archiver/utils/datablocks.py
@log
async def wait_for_file_accessible(file: Path, timeout_s=360):
    """
    Returns:
    """
    total_time_waited_s = 0
    while not os.access(path=file, mode=os.R_OK):
        seconds_to_wait = 30
        getLogger().info(
            f"File {file} currently not available. Try again in {seconds_to_wait} seconds.")
        await asyncio.sleep(seconds_to_wait)
        total_time_waited_s += seconds_to_wait
        if total_time_waited_s > timeout_s:
            raise SystemError(
                f"File f{file} was not accessible within {timeout_s} seconds")

    return True
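
A minimal usage sketch, assuming the module import path; the file below is hypothetical, and the coroutine polls every 30 seconds until it becomes readable or the timeout is exceeded.

    import asyncio
    from pathlib import Path

    from backend.archiver.utils.datablocks import wait_for_file_accessible  # assumed import path

    async def main():
        await wait_for_file_accessible(Path("/lts/dataset/datablock_0.tar.gz"), timeout_s=60)  # hypothetical file
        print("file is readable")

    asyncio.run(main())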

wait_for_free_space() async

Asynchronously wait until there is enough free space on the LTS storage, checking in fixed 30-second intervals.

TODO: add exponential backoff for waiting time

Returns:

    Type     Description
    boolean  True once there is enough free space

Source code in backend/archiver/utils/datablocks.py
@log
async def wait_for_free_space():
    """ Asynchronous wait until there is enough free space. Waits in linear intervals to check for free space

        TODO: add exponential backoff for waiting time

    Returns:
        boolean: Returns True once there is enough free space
    """
    while not sufficient_free_space_on_lts():
        seconds_to_wait = 30
        getLogger().info(
            f"Not enough free space. Waiting for {seconds_to_wait}s")
        await asyncio.sleep(seconds_to_wait)

    return True
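
A usage sketch, assuming the module import path and that LTS_STORAGE_ROOT and LTS_FREE_SPACE_PERCENTAGE are configured, since the coroutine reads both via Variables():

    import asyncio

    from backend.archiver.utils.datablocks import wait_for_free_space  # assumed import path

    async def main():
        # Cooperatively blocks until sufficient_free_space_on_lts() reports enough space
        await wait_for_free_space()
        print("enough free space on LTS, continuing")

    asyncio.run(main())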