Skip to content

exporter: support creating blobs with zstd compression #2344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ Keys supported by image output:
* `unpack=true`: unpack image after creation (for use with containerd)
* `dangling-name-prefix=[value]`: name image with `prefix@<digest>` , used for anonymous images
* `name-canonical=true`: add additional canonical name `name@<digest>`
* `compression=[uncompressed,gzip,estargz]`: choose compression type for layers newly created and cached, gzip is default value. estargz should be used with `oci-mediatypes=true`.
* `compression=[uncompressed,gzip,estargz,zstd]`: choose compression type for layers newly created and cached, gzip is default value. estargz should be used with `oci-mediatypes=true`.
* `force-compression=true`: forcefully apply `compression` option to all layers (including already existing layers).

If credentials are required, `buildctl` will attempt to read Docker configuration file `$DOCKER_CONFIG/config.json`.
Expand Down
8 changes: 8 additions & 0 deletions cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
"github.com/klauspost/compress/zstd"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/flightcontrol"
Expand Down Expand Up @@ -79,6 +80,9 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
case compression.EStargz:
compressorFunc, finalize = writeEStargz()
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.Zstd:
compressorFunc = zstdWriter
mediaType = ocispecs.MediaTypeImageLayer + "+zstd"
default:
return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
}
Expand Down Expand Up @@ -350,3 +354,7 @@ func ensureCompression(ctx context.Context, ref *immutableRef, compressionType c
})
return err
}

func zstdWriter(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
return zstd.NewWriter(dest)
}
4 changes: 3 additions & 1 deletion cache/blobs_linux.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build linux
// +build linux

package cache
Expand Down Expand Up @@ -34,7 +35,6 @@ var emptyDesc = ocispecs.Descriptor{}
// be computed (e.g. because the mounts aren't overlayfs), it returns
// an error.
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compressor) (_ ocispecs.Descriptor, ok bool, err error) {

// Get upperdir location if mounts are overlayfs that can be processed by this differ.
upperdir, err := getOverlayUpperdir(lower, upper)
if err != nil {
Expand All @@ -50,6 +50,8 @@ func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper
compressorFunc = func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
return ctdcompression.CompressStream(dest, ctdcompression.Gzip)
}
case ocispecs.MediaTypeImageLayer + "+zstd":
compressorFunc = zstdWriter
default:
return emptyDesc, false, errors.Errorf("unsupported diff media type: %v", mediaType)
}
Expand Down
199 changes: 110 additions & 89 deletions cache/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"fmt"
"io"

cdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/images/converter/uncompress"
"github.com/containerd/containerd/labels"
"github.com/klauspost/compress/zstd"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -23,11 +24,15 @@ import (
func needsConversion(mediaType string, compressionType compression.Type) (bool, error) {
switch compressionType {
case compression.Uncompressed:
if !images.IsLayerType(mediaType) || uncompress.IsUncompressedType(mediaType) {
if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Uncompressed {
return false, nil
}
case compression.Gzip:
if !images.IsLayerType(mediaType) || isGzipCompressedType(mediaType) {
if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Gzip {
return false, nil
}
case compression.Zstd:
if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Zstd {
return false, nil
}
case compression.EStargz:
Expand All @@ -49,113 +54,129 @@ func getConverter(desc ocispecs.Descriptor, compressionType compression.Type) (c
// No conversion. No need to return an error here.
return nil, nil
}

c := conversion{target: compressionType}

from := compression.FromMediaType(desc.MediaType)
switch from {
case compression.Uncompressed:
case compression.Gzip, compression.Zstd:
c.decompress = cdcompression.DecompressStream
default:
return nil, errors.Errorf("unsupported source compression type %q from mediatype %q", from, desc.MediaType)
}

switch compressionType {
case compression.Uncompressed:
return uncompress.LayerConvertFunc, nil
case compression.Gzip:
convertFunc := func(w io.Writer) (io.WriteCloser, error) { return gzip.NewWriter(w), nil }
return gzipLayerConvertFunc(compressionType, convertFunc, nil), nil
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(w), nil
}
case compression.Zstd:
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return zstd.NewWriter(w)
}
case compression.EStargz:
compressorFunc, finalize := writeEStargz()
convertFunc := func(w io.Writer) (io.WriteCloser, error) { return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip) }
return gzipLayerConvertFunc(compressionType, convertFunc, finalize), nil
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip)
}
c.finalize = finalize
default:
return nil, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
return nil, errors.Errorf("unknown target compression type during conversion: %q", compressionType)
}

return (&c).convert, nil
}

func gzipLayerConvertFunc(compressionType compression.Type, convertFunc func(w io.Writer) (io.WriteCloser, error), finalize func(context.Context, content.Store) (map[string]string, error)) converter.ConvertFunc {
return func(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
// prepare the source and destination
info, err := cs.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
labelz := info.Labels
if labelz == nil {
labelz = make(map[string]string)
}
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
defer ra.Close()
ref := fmt.Sprintf("convert-from-%s-to-%s", desc.Digest, compressionType.String())
w, err := cs.Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
defer w.Close()
if err := w.Truncate(0); err != nil { // Old written data possibly remains
return nil, err
}
zw, err := convertFunc(w)
type conversion struct {
target compression.Type
decompress func(io.Reader) (cdcompression.DecompressReadCloser, error)
compress func(w io.Writer) (io.WriteCloser, error)
finalize func(context.Context, content.Store) (map[string]string, error)
}

func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
// prepare the source and destination
info, err := cs.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
labelz := info.Labels
if labelz == nil {
labelz = make(map[string]string)
}
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
defer ra.Close()
ref := fmt.Sprintf("convert-from-%s-to-%s", desc.Digest, c.target.String())
w, err := cs.Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
defer w.Close()
if err := w.Truncate(0); err != nil { // Old written data possibly remains
return nil, err
}
var zw io.WriteCloser = w
var compress io.WriteCloser
if c.compress != nil {
zw, err = c.compress(zw)
if err != nil {
return nil, err
}
defer zw.Close()
compress = zw
}

// convert this layer
diffID := digest.Canonical.Digester()
if _, err := io.Copy(zw, io.TeeReader(io.NewSectionReader(ra, 0, ra.Size()), diffID.Hash())); err != nil {
return nil, err
}
if err := zw.Close(); err != nil { // Flush the writer
return nil, err
}
labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
info, err = cs.Info(ctx, w.Digest())
// convert this layer
diffID := digest.Canonical.Digester()
var rdr io.Reader = io.NewSectionReader(ra, 0, ra.Size())
if c.decompress != nil {
rc, err := c.decompress(rdr)
if err != nil {
return nil, err
}

newDesc := desc
newDesc.MediaType = convertMediaTypeToGzip(desc.MediaType)
newDesc.Digest = info.Digest
newDesc.Size = info.Size
if finalize != nil {
a, err := finalize(ctx, cs)
if err != nil {
return nil, errors.Wrapf(err, "failed finalize compression")
}
for k, v := range a {
if newDesc.Annotations == nil {
newDesc.Annotations = make(map[string]string)
}
newDesc.Annotations[k] = v
}
defer rc.Close()
rdr = rc
}
if _, err := io.Copy(zw, io.TeeReader(rdr, diffID.Hash())); err != nil {
return nil, err
}
if compress != nil {
if err := compress.Close(); err != nil { // Flush the writer
return nil, err
}
return &newDesc, nil
}
}

func isGzipCompressedType(mt string) bool {
switch mt {
case
images.MediaTypeDockerSchema2LayerGzip,
images.MediaTypeDockerSchema2LayerForeignGzip,
ocispecs.MediaTypeImageLayerGzip,
ocispecs.MediaTypeImageLayerNonDistributableGzip:
return true
default:
return false
labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
info, err = cs.Info(ctx, w.Digest())
if err != nil {
return nil, err
}
}

func convertMediaTypeToGzip(mt string) string {
if uncompress.IsUncompressedType(mt) {
if images.IsDockerType(mt) {
mt += ".gzip"
} else {
mt += "+gzip"
newDesc := desc
newDesc.MediaType = c.target.DefaultMediaType()
newDesc.Digest = info.Digest
newDesc.Size = info.Size
if c.finalize != nil {
a, err := c.finalize(ctx, cs)
if err != nil {
return nil, errors.Wrapf(err, "failed finalize compression")
}
for k, v := range a {
if newDesc.Annotations == nil {
newDesc.Annotations = make(map[string]string)
}
newDesc.Annotations[k] = v
}
return mt
}
return mt
return &newDesc, nil
}
9 changes: 5 additions & 4 deletions cache/estargz.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"io"
"sync"

"github.com/containerd/containerd/archive/compression"
cdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/stargz-snapshotter/estargz"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
Expand All @@ -22,8 +23,8 @@ func writeEStargz() (compressorFunc compressor, finalize func(context.Context, c
var bInfo blobInfo
var mu sync.Mutex
return func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
if !isGzipCompressedType(requiredMediaType) {
return nil, fmt.Errorf("unsupported media type for estargz compressor %q", requiredMediaType)
if compression.FromMediaType(requiredMediaType) != compression.Gzip {
return nil, errors.Errorf("unsupported media type for estargz compressor %q", requiredMediaType)
}
done := make(chan struct{})
pr, pw := io.Pipe()
Expand Down Expand Up @@ -127,7 +128,7 @@ func calculateBlob() (io.WriteCloser, chan blobInfo) {
c := new(counter)
dgstr := digest.Canonical.Digester()
diffID := digest.Canonical.Digester()
decompressR, err := compression.DecompressStream(io.TeeReader(pr, dgstr.Hash()))
decompressR, err := cdcompression.DecompressStream(io.TeeReader(pr, dgstr.Hash()))
if err != nil {
pr.CloseWithError(err)
return
Expand Down
Loading