Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cache-cli/Dockerfile.dev
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM golang:1.23

RUN apt-get update && apt-get install -y zstd && apt-get clean

RUN go install gotest.tools/gotestsum@v1.12.3
RUN mkdir /root/.ssh
COPY id_rsa /root/.ssh/semaphore_cache_key
Expand Down
2 changes: 1 addition & 1 deletion cache-cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ require (
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/klauspost/compress v1.18.3 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions cache-cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/klauspost/compress v1.15.13 h1:NFn1Wr8cfnenSJSA46lLq4wHCcBzKTSjnBIexDMMOV0=
github.com/klauspost/compress v1.15.13/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
Expand Down
78 changes: 66 additions & 12 deletions cache-cli/pkg/archive/native_archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"time"

"github.com/klauspost/compress/zstd"
pgzip "github.com/klauspost/pgzip"
"github.com/semaphoreci/toolbox/cache-cli/pkg/metrics"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -39,9 +40,12 @@ func (a *NativeArchiver) Compress(dst, src string) error {
return err
}

// The order is 'tar > gzip > file'
gzipWriter := a.newGzipWriter(dstFile)
tarWriter := tar.NewWriter(gzipWriter)
// The order is 'tar > compress > file'
compressingWriter, err := a.newCompressingWriter(dstFile)
if err != nil {
return err
}
tarWriter := tar.NewWriter(compressingWriter)

// We walk through every file in the specified path, adding them to the tar archive.
err = filepath.Walk(src, func(fileName string, fileInfo os.FileInfo, e error) error {
Expand Down Expand Up @@ -99,8 +103,8 @@ func (a *NativeArchiver) Compress(dst, src string) error {
return fmt.Errorf("error closing tar writer: %v", err)
}

if err := gzipWriter.Close(); err != nil {
return fmt.Errorf("error closing gzip writer: %v", err)
if err := compressingWriter.Close(); err != nil {
return fmt.Errorf("error closing compressing writer: %v", err)
}

if err := dstFile.Close(); err != nil {
Expand All @@ -124,9 +128,9 @@ func (a *NativeArchiver) Decompress(src string) (string, error) {

defer srcFile.Close()

uncompressedStream, err := a.newGzipReader(srcFile)
uncompressedStream, err := a.newDecompressingReader(srcFile)
if err != nil {
log.Errorf("error creating gzip reader: %v", err)
log.Errorf("error creating decompressing reader: %v", err)
a.publishCorruptionMetric()
return "", err
}
Expand Down Expand Up @@ -278,15 +282,65 @@ func (a *NativeArchiver) openFile(header *tar.Header, tarReader *tar.Reader) (*o
return outFile, nil
}

func (a *NativeArchiver) newGzipWriter(dstFile *os.File) io.WriteCloser {
if a.UseParallelism {
return pgzip.NewWriter(dstFile)
// newCompressingWriter wraps dstFile in a zstd encoder.
//
// When a.UseParallelism is false the encoder is pinned to a single
// goroutine; zstd uses limited parallelism by default, so it has to be
// turned off explicitly. Otherwise the library default is kept.
//
// On failure the returned writer is a literal nil interface (never a
// typed-nil *zstd.Encoder), so callers may safely nil-check it.
func (a *NativeArchiver) newCompressingWriter(dstFile *os.File) (io.WriteCloser, error) {
	var opts []zstd.EOption
	if !a.UseParallelism {
		opts = append(opts, zstd.WithEncoderConcurrency(1))
	}

	encoder, err := zstd.NewWriter(dstFile, opts...)
	if err != nil {
		return nil, err
	}

	return encoder, nil
}

// IsZstdCompressed reports whether file starts with Zstandard data.
//
// It reads four bytes from the file's current offset and compares them
// with the Zstandard frame magic number (0xFD2FB528, little-endian) and
// with the skippable-frame magic range (0x184D2A50-0x184D2A5F), which may
// legally precede a Zstandard frame. No decoder is started, so nothing
// beyond those four bytes is consumed.
//
// A file too short to hold a magic number is reported as (false, nil) so
// the caller can fall back to another format and let that reader produce
// the error. Any other read failure is returned as-is.
//
// The file offset is always rewound to the start before returning. If the
// rewind itself fails and no earlier error occurred, that error is
// returned, because the caller would otherwise decode from the wrong
// position.
func IsZstdCompressed(file *os.File) (isZstd bool, err error) {
	defer func() {
		if _, seekErr := file.Seek(0, io.SeekStart); seekErr != nil && err == nil {
			isZstd, err = false, seekErr
		}
	}()

	var magic [4]byte
	if _, readErr := io.ReadFull(file, magic[:]); readErr != nil {
		// io.ReadFull returns exactly these sentinels for short input.
		if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
			return false, nil
		}
		return false, readErr
	}

	// Regular Zstandard frame.
	if magic == [4]byte{0x28, 0xB5, 0x2F, 0xFD} {
		return true, nil
	}

	// Skippable frame: low nibble of the first byte is free-form.
	if magic[0]&0xF0 == 0x50 && magic[1] == 0x2A && magic[2] == 0x4D && magic[3] == 0x18 {
		return true, nil
	}

	return false, nil
}

func (a *NativeArchiver) newGzipReader(dstFile *os.File) (io.ReadCloser, error) {
func (a *NativeArchiver) newDecompressingReader(dstFile *os.File) (io.ReadCloser, error) {
is_zstd, err := IsZstdCompressed(dstFile)
if err != nil {
return nil, err
}

if is_zstd {
var reader *zstd.Decoder
var err error

if !a.UseParallelism {
// zstd uses limited parallelism by default so we need to explicitly turn it off
reader, err = zstd.NewReader(dstFile, zstd.WithDecoderConcurrency(1))
} else {
reader, err = zstd.NewReader(dstFile)
}

if err != nil {
return nil, err
}

return reader.IOReadCloser(), nil
}

// Not zstd, fall back to previous behaviour with gzip
if a.UseParallelism {
return pgzip.NewReader(dstFile)
}
Expand Down
37 changes: 28 additions & 9 deletions cache-cli/pkg/archive/shell_out_archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/exec"
"path/filepath"

"github.com/klauspost/compress/zstd"
"github.com/semaphoreci/toolbox/cache-cli/pkg/metrics"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -60,18 +61,36 @@ func (a *ShellOutArchiver) Decompress(src string) (string, error) {

// compressionCommand builds the tar invocation that packs src into the
// zstd-compressed archive dst.
//
// When src is an absolute path the P flag is added so tar keeps the
// leading slash in member names; relative sources are archived as given.
// The command is only constructed here, not started.
func (a *ShellOutArchiver) compressionCommand(dst, src string) *exec.Cmd {
	flags := "cf"
	if filepath.IsAbs(src) {
		flags = "cPf"
	}

	return exec.Command("tar", flags, dst, "--zstd", src)
}

// decompressionCmd builds the tar invocation that unpacks tempFile into
// the current directory.
//
// No compression flag is passed: forcing gzip with z would reject zstd
// archives, so the command relies on tar recognising the archive's
// compression format itself, which lets both older gzip archives and
// zstd archives be extracted. When dst is an absolute path the P flag is
// added so absolute member names are restored as stored.
// The command is only constructed here, not started.
func (a *ShellOutArchiver) decompressionCmd(dst, tempFile string) *exec.Cmd {
	flags := "xf"
	if filepath.IsAbs(dst) {
		flags = "xPf"
	}

	return exec.Command("tar", flags, tempFile, "-C", ".")
}

// openDecompressingReader returns a reader that yields the decompressed
// contents of file. IsZstdCompressed decides the format: archives it
// identifies as zstd are read through a zstd decoder, everything else is
// handed to gzip. Errors from the detection step or from constructing
// either reader are returned unchanged.
func openDecompressingReader(file *os.File) (io.ReadCloser, error) {
	zstdArchive, detectErr := IsZstdCompressed(file)
	switch {
	case detectErr != nil:
		return nil, detectErr
	case !zstdArchive:
		return gzip.NewReader(file)
	}

	decoder, err := zstd.NewReader(file)
	if err != nil {
		return nil, err
	}

	return decoder.IOReadCloser(), nil
}

func (a *ShellOutArchiver) findRestorationPath(src string) (string, error) {
Expand All @@ -85,24 +104,24 @@ func (a *ShellOutArchiver) findRestorationPath(src string) (string, error) {
// #nosec
defer file.Close()

gzipReader, err := gzip.NewReader(file)
reader, err := openDecompressingReader(file)
if err != nil {
log.Errorf("error creating gzip reader: %v", err)
return "", err
}

tr := tar.NewReader(gzipReader)
tr := tar.NewReader(reader)
header, err := tr.Next()
if err == io.EOF {
log.Warning("No files in archive.")
_ = gzipReader.Close()
_ = reader.Close()
return "", nil
}

if err != nil {
_ = gzipReader.Close()
_ = reader.Close()
return "", fmt.Errorf("error reading archive %s: %v", src, err)
}

return filepath.FromSlash(header.Name), gzipReader.Close()
return filepath.FromSlash(header.Name), reader.Close()
}