Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .mailmap
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ Joe Marsh Rossney <17361029+jmarshrossney@users.noreply.github.com>
Joe Marsh Rossney <17361029+jmarshrossney@users.noreply.github.com> <17361029+marshrossney@users.noreply.github.com>
Joseph Abram <joseph.abram@metoffice.gov.uk> J-J-Abram <joseph.abram@metoffice.gov.uk>
Joseph Abram <joseph.abram@metoffice.gov.uk> J-J-Abram <98320699+J-J-Abram@users.noreply.github.com>
David Rundle <david.rundle@metoffice.gov.uk> david-rundle <37152257+david-rundle@users.noreply.github.com>
Christopher Bennett <christopher.bennett@metoffice.gov.uk> christopher.bennett <christopher.bennett@metoffice.gov.uk>
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ below:
- Dimitrios Theodorakis (Met Office, UK)
- Joseph Abram (Met Office, UK)
- James Frost (Met Office, UK)
- David Rundle (Met Office, UK)
- Christopher Bennett (Met Office, UK)
<!-- end-shortlog -->

Expand Down
1 change: 0 additions & 1 deletion metomi/rose/app_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ def __init__(self, *args, **kwargs):

def run_impl(self, opts, args, uuid, work_files):
"""The actual logic for a run."""

# Preparation.
conf_tree = self.config_load(opts)
self._prep(conf_tree, opts)
Expand Down
21 changes: 20 additions & 1 deletion metomi/rose/apps/rose_arch.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ def _run_target_setup(
target.sources[checksum] = RoseArchSource(
checksum, name, path
)

if not target.sources:
if is_compulsory_target:
target.status = target.ST_BAD
Expand All @@ -314,6 +315,23 @@ def _run_target_setup(
)
)
target.status = target.ST_BAD

target.compress_threads = self._get_conf(config, t_node,
"compress-threads",
default="1")
if (
not target.compress_threads.isdigit()
or int(target.compress_threads) < 0
):
raise ConfigValueError(
[t_key, "compress-threads"],
target.compress_threads,
ValueError(
"compress-threads must be a 0 (automatic) or"
" a positive integer"
)
)

rename_format = self._get_conf(config, t_node, "rename-format")
if rename_format:
rename_parser_str = self._get_conf(config, t_node, "rename-parser")
Expand Down Expand Up @@ -398,7 +416,8 @@ def _run_target_update(cls, dao, app_runner, compress_manager, target):
# Compress sources
if target.compress_scheme:
handler = compress_manager.get_handler(target.compress_scheme)
handler.compress_sources(target, work_dir)
compress_args = {"threads": target.compress_threads}
handler.compress_sources(target, work_dir, **compress_args)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried a very simple check

mode=rose_arch

[arch]
# rose-app.conf
command-format=cp %(sources)s %(target)s
target-prefix=/home/users/tim.pillinger/cylc-src/rose-apps/arch/archive/
source-prefix=/home/users/tim.pillinger/cylc-src/rose-apps/arch/source/

[arch:world.out]
source='world.out'

[arch:gunzipme.gz]
source='gunzipme.out'

[arch:targunzipme.tar.gz]
source='targunzipme.out'
export CYLC_WORKFLOW_ID='hippo'
export CYLC_TASK_ID='task-run'
export CYLC_TASK_NAME='task-run'
export CYLC_TASK_CYCLE_POINT='task-run'
export CYLC_TASK_LOG_ROOT="${HERE}/log"

echo Running app from "${HERE}/app"
rose task-run --config="${HERE}/app"

and got an error:

[FAIL] RoseArchGzip.compress_sources() got an unexpected keyword argument 'threads'

I think that you need to add the threads argument to rose_arch_gzip.py and possibly other items in that folder. You may want to consider emitting a warning if threads != 1 and program_is_single_thread:

times[1] = time() # transformed time
# Run archive command
sources = []
Expand Down
5 changes: 4 additions & 1 deletion metomi/rose/apps/rose_arch_compressions/rose_arch_gzip.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ class RoseArchGzip:
def __init__(self, app_runner, *args, **kwargs):
self.app_runner = app_runner

def compress_sources(self, target, work_dir):
def compress_sources(self, target, work_dir, threads="1"):
"""Gzip each source in target.

Use work_dir to dump results.

"""
if threads != "1":
raise NotImplementedError("Gzip does not support multi-threading")

for source in target.sources.values():
if source.path.endswith("." + target.compress_scheme):
continue # assume already done
Expand Down
35 changes: 32 additions & 3 deletions metomi/rose/apps/rose_arch_compressions/rose_arch_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,28 @@ class RoseArchTarGzip:

"""Compress archive sources in tar."""

SCHEMES = ["pax", "pax.gz", "tar", "tar.gz", "tgz"]
SCHEME_FORMATS = {"pax": tarfile.PAX_FORMAT, "pax.gz": tarfile.PAX_FORMAT}
SCHEMES = ["pax", "pax.gz", "pax.zst", "pax.xz",
"tar", "tar.gz", "tgz", "tar.zst", "tar.xz", "txz"]
SCHEME_FORMATS = {"pax": tarfile.PAX_FORMAT,
"pax.gz": tarfile.PAX_FORMAT,
"pax.zst": tarfile.PAX_FORMAT,
"pax.xz": tarfile.PAX_FORMAT}
GZIP_EXTS = ["pax.gz", "tar.gz", "tgz"]
ZSTD_EXTS = ["pax.zst", "tar.zst"]
XZ_EXTS = ["pax.xz", "tar.xz", "txz"]

def __init__(self, app_runner, *args, **kwargs):
self.app_runner = app_runner

def compress_sources(self, target, work_dir):
def compress_sources(self, target, work_dir, threads="1"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO threads should be cast to int as soon as we parse the config.

"""Create a tar archive of all files in target.

Use work_dir to dump results.

"""
if threads != "1":
raise NotImplementedError("xz does not support multi-threading")

sources = list(target.sources.values())
if len(sources) == 1 and sources[0].path.endswith(
"." + target.compress_scheme
Expand Down Expand Up @@ -70,3 +79,23 @@ def compress_sources(self, target, work_dir):
command = "gzip -c '%s' >'%s'" % (tar_name, gz_name)
self.app_runner.popen.run_simple(command, shell=True)
self.app_runner.fs_util.delete(tar_name)

elif target.compress_scheme in self.ZSTD_EXTS:
fdsec, zst_name = mkstemp(
suffix="." + target.compress_scheme, dir=work_dir
)
os.close(fdsec)
target.work_source_path = zst_name
command = f"zstd --rm -T{threads} -c '{tar_name}' >'{zst_name}'"
self.app_runner.popen.run_simple(command, shell=True)
self.app_runner.fs_util.delete(tar_name)

elif target.compress_scheme in self.XZ_EXTS:
fdsec, xz_name = mkstemp(
suffix="." + target.compress_scheme, dir=work_dir
)
os.close(fdsec)
target.work_source_path = xz_name
command = "xz -c '%s' >'%s'" % (tar_name, xz_name)
self.app_runner.popen.run_simple(command, shell=True)
self.app_runner.fs_util.delete(tar_name)
53 changes: 53 additions & 0 deletions metomi/rose/apps/rose_arch_compressions/rose_arch_xz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright (C) British Crown (Met Office) & Contributors.
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Compress archive sources using xz."""


import os


class RoseArchXz:

"""Compress archive sources in xz."""

SCHEMES = ["xz"]

def __init__(self, app_runner, *args, **kwargs):
self.app_runner = app_runner

def compress_sources(self, target, work_dir, threads="1"):
"""xz each source in target.

Use work_dir to dump results.

"""

if threads != "1":
raise NotImplementedError("xz does not support multi-threading")

for source in target.sources.values():
if source.path.endswith("." + target.compress_scheme):
continue # assume already done
name_xz = source.name + "." + target.compress_scheme
work_path_xz = os.path.join(work_dir, name_xz)
self.app_runner.fs_util.makedirs(
self.app_runner.fs_util.dirname(work_path_xz)
)

command = "xz -c '%s' >'%s'" % (source.path, work_path_xz)
self.app_runner.popen.run_simple(command, shell=True)
source.path = work_path_xz
50 changes: 50 additions & 0 deletions metomi/rose/apps/rose_arch_compressions/rose_arch_zstd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright (C) British Crown (Met Office) & Contributors.
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------
"""Compress archive sources using zstd."""


import os


class RoseArchZstd:

"""Compress archive sources in zstd."""

SCHEMES = ["zst", "zstd"]

def __init__(self, app_runner, *args, **kwargs):
self.app_runner = app_runner

def compress_sources(self, target, work_dir, threads="1"):
"""zstd each source in target.

Use work_dir to dump results.

"""
for source in target.sources.values():
if source.path.endswith("." + target.compress_scheme):
continue # assume already done
name_zst = source.name + "." + target.compress_scheme
work_path_zst = os.path.join(work_dir, name_zst)
self.app_runner.fs_util.makedirs(
self.app_runner.fs_util.dirname(work_path_zst)
)
command = \
f"zstd --rm -T{threads} -c {source.path} > {work_path_zst}"

self.app_runner.popen.run_simple(command, shell=True)
source.path = work_path_zst
11 changes: 9 additions & 2 deletions metomi/rose/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,16 @@ def __repr__(self):

def env_export(key, value, event_handler=None):
"""Export an environment variable."""
if key not in _EXPORTED_ENVS or os.environ.get(key) != value:
if (
key not in _EXPORTED_ENVS
or os.environb.get(key.encode("UTF-8"))
!= value.encode("UTF-8")
):
# N.B. Should be safe, because the list of environment variables is
# normally quite small.
_EXPORTED_ENVS[key] = value
os.environb[key.encode('UTF-8')] = value.encode('UTF-8')

if callable(event_handler):
event_handler(EnvExportEvent(key, value))

Expand All @@ -134,7 +139,7 @@ def env_var_escape(text, match_mode=None):
return ret


def env_var_process(text, unbound=None, match_mode=None, environ=os.environ):
def env_var_process(text, unbound=None, match_mode=None, environ=None):
"""Substitute environment variables into a string.

For each $NAME and ${NAME} in "text", substitute with the value
Expand All @@ -145,6 +150,8 @@ def env_var_process(text, unbound=None, match_mode=None, environ=os.environ):
value of "unbound".

"""
if environ is None:
environ = os.environ
ret = ""
try:
tail = text.decode()
Expand Down
Loading
Loading