Commit 8b5c4d7

exporter: support creating blobs with zstd compression
Signed-off-by: Tonis Tiigi <[email protected]>
1 parent 9b010e7 commit 8b5c4d7

12 files changed: +315 -219 lines changed

README.md

Lines changed: 1 addition & 1 deletion
@@ -230,7 +230,7 @@ Keys supported by image output:
 * `unpack=true`: unpack image after creation (for use with containerd)
 * `dangling-name-prefix=[value]`: name image with `prefix@<digest>` , used for anonymous images
 * `name-canonical=true`: add additional canonical name `name@<digest>`
-* `compression=[uncompressed,gzip,estargz]`: choose compression type for layers newly created and cached, gzip is default value. estargz should be used with `oci-mediatypes=true`.
+* `compression=[uncompressed,gzip,estargz,zstd]`: choose compression type for layers newly created and cached, gzip is default value. estargz should be used with `oci-mediatypes=true`.
 * `force-compression=true`: forcefully apply `compression` option to all layers (including already existing layers).
 
 If credentials are required, `buildctl` will attempt to read Docker configuration file `$DOCKER_CONFIG/config.json`.
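As a usage sketch (not part of this commit), the same exporter keys can be set from BuildKit's Go client API; the registry name below is a placeholder, and the snippet assumes `SolveOpt`, `ExportEntry`, and `ExporterImage` from `github.com/moby/buildkit/client`. The equivalent `buildctl build` invocation would pass the same keys via `--output type=image,...`.

```go
package main

import (
	"fmt"

	"github.com/moby/buildkit/client"
)

func main() {
	// Sketch: request zstd-compressed layers from the image exporter.
	opt := client.SolveOpt{
		Exports: []client.ExportEntry{{
			Type: client.ExporterImage,
			Attrs: map[string]string{
				"name":              "registry.example.com/app:latest", // placeholder
				"push":              "true",
				"oci-mediatypes":    "true",
				"compression":       "zstd", // value added by this commit
				"force-compression": "true", // also recompress already existing layers
			},
		}},
	}
	fmt.Println(opt.Exports[0].Attrs["compression"])
}
```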

cache/blobs.go

Lines changed: 8 additions & 0 deletions
@@ -11,6 +11,7 @@ import (
     "github.com/containerd/containerd/diff"
     "github.com/containerd/containerd/leases"
     "github.com/containerd/containerd/mount"
+    "github.com/klauspost/compress/zstd"
     "github.com/moby/buildkit/session"
     "github.com/moby/buildkit/util/compression"
     "github.com/moby/buildkit/util/flightcontrol"
@@ -79,6 +80,9 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
     case compression.EStargz:
         compressorFunc, finalize = writeEStargz()
         mediaType = ocispecs.MediaTypeImageLayerGzip
+    case compression.Zstd:
+        compressorFunc = zstdWriter
+        mediaType = ocispecs.MediaTypeImageLayer + "+zstd"
     default:
         return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
     }
@@ -350,3 +354,7 @@ func ensureCompression(ctx context.Context, ref *immutableRef, compressionType c
     })
     return err
 }
+
+func zstdWriter(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
+    return zstd.NewWriter(dest)
+}
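The new `zstdWriter` works because `zstd.NewWriter` from `github.com/klauspost/compress/zstd` returns a `*zstd.Encoder`, which already implements `io.WriteCloser`; the `requiredMediaType` argument is simply ignored. A self-contained sketch of the same pattern outside BuildKit (the helper name `compressZstd` is illustrative, not from this commit):

```go
package main

import (
	"io"
	"os"

	"github.com/klauspost/compress/zstd"
)

// compressZstd mirrors the shape of the compressor func above:
// it wraps dest in a zstd encoder, which satisfies io.WriteCloser.
func compressZstd(dest io.Writer) (io.WriteCloser, error) {
	return zstd.NewWriter(dest) // default compression level
}

func main() {
	// Compress stdin to stdout; Close flushes the final zstd frame.
	zw, err := compressZstd(os.Stdout)
	if err != nil {
		panic(err)
	}
	if _, err := io.Copy(zw, os.Stdin); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}
}
```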

cache/blobs_linux.go

Lines changed: 3 additions & 1 deletion
@@ -1,3 +1,4 @@
+//go:build linux
 // +build linux
 
 package cache
@@ -34,7 +35,6 @@ var emptyDesc = ocispecs.Descriptor{}
 // be computed (e.g. because the mounts aren't overlayfs), it returns
 // an error.
 func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compressor) (_ ocispecs.Descriptor, ok bool, err error) {
-
     // Get upperdir location if mounts are overlayfs that can be processed by this differ.
     upperdir, err := getOverlayUpperdir(lower, upper)
     if err != nil {
@@ -50,6 +50,8 @@ func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper
         compressorFunc = func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
             return ctdcompression.CompressStream(dest, ctdcompression.Gzip)
         }
+    case ocispecs.MediaTypeImageLayer + "+zstd":
+        compressorFunc = zstdWriter
     default:
         return emptyDesc, false, errors.Errorf("unsupported diff media type: %v", mediaType)
     }
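The zstd layer media type is built by concatenation — `ocispecs.MediaTypeImageLayer + "+zstd"` evaluates to `application/vnd.oci.image.layer.v1.tar+zstd` — presumably because the image-spec Go bindings did not yet expose a dedicated constant for it. A small standalone stand-in for the dispatch this switch performs (illustrative only; `pickCompressor` is not BuildKit code):

```go
package main

import (
	"fmt"
	"io"

	"github.com/containerd/containerd/archive/compression"
	"github.com/klauspost/compress/zstd"
	ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
)

// pickCompressor maps a layer media type to a compressor constructor,
// mirroring the switch in tryComputeOverlayBlob.
func pickCompressor(mediaType string) (func(io.Writer) (io.WriteCloser, error), error) {
	switch mediaType {
	case ocispecs.MediaTypeImageLayerGzip:
		return func(dest io.Writer) (io.WriteCloser, error) {
			return compression.CompressStream(dest, compression.Gzip)
		}, nil
	case ocispecs.MediaTypeImageLayer + "+zstd": // application/vnd.oci.image.layer.v1.tar+zstd
		return func(dest io.Writer) (io.WriteCloser, error) {
			return zstd.NewWriter(dest)
		}, nil
	default:
		return nil, fmt.Errorf("unsupported diff media type: %v", mediaType)
	}
}

func main() {
	_, err := pickCompressor(ocispecs.MediaTypeImageLayer + "+zstd")
	fmt.Println("zstd supported:", err == nil)
}
```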

cache/converter.go

Lines changed: 110 additions & 89 deletions
@@ -6,12 +6,13 @@ import (
     "fmt"
     "io"
 
+    cdcompression "github.com/containerd/containerd/archive/compression"
     "github.com/containerd/containerd/content"
     "github.com/containerd/containerd/errdefs"
     "github.com/containerd/containerd/images"
     "github.com/containerd/containerd/images/converter"
-    "github.com/containerd/containerd/images/converter/uncompress"
     "github.com/containerd/containerd/labels"
+    "github.com/klauspost/compress/zstd"
     "github.com/moby/buildkit/util/compression"
     digest "github.com/opencontainers/go-digest"
     ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
@@ -23,11 +24,15 @@ import (
 func needsConversion(mediaType string, compressionType compression.Type) (bool, error) {
     switch compressionType {
     case compression.Uncompressed:
-        if !images.IsLayerType(mediaType) || uncompress.IsUncompressedType(mediaType) {
+        if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Uncompressed {
             return false, nil
         }
     case compression.Gzip:
-        if !images.IsLayerType(mediaType) || isGzipCompressedType(mediaType) {
+        if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Gzip {
+            return false, nil
+        }
+    case compression.Zstd:
+        if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Zstd {
             return false, nil
         }
     case compression.EStargz:
@@ -49,113 +54,129 @@ func getConverter(desc ocispecs.Descriptor, compressionType compression.Type) (c
         // No conversion. No need to return an error here.
         return nil, nil
     }
+
+    c := conversion{target: compressionType}
+
+    from := compression.FromMediaType(desc.MediaType)
+    switch from {
+    case compression.Uncompressed:
+    case compression.Gzip, compression.Zstd:
+        c.decompress = cdcompression.DecompressStream
+    default:
+        return nil, errors.Errorf("unsupported source compression type %q from mediatype %q", from, desc.MediaType)
+    }
+
     switch compressionType {
     case compression.Uncompressed:
-        return uncompress.LayerConvertFunc, nil
     case compression.Gzip:
-        convertFunc := func(w io.Writer) (io.WriteCloser, error) { return gzip.NewWriter(w), nil }
-        return gzipLayerConvertFunc(compressionType, convertFunc, nil), nil
+        c.compress = func(w io.Writer) (io.WriteCloser, error) {
+            return gzip.NewWriter(w), nil
+        }
+    case compression.Zstd:
+        c.compress = func(w io.Writer) (io.WriteCloser, error) {
+            return zstd.NewWriter(w)
+        }
     case compression.EStargz:
         compressorFunc, finalize := writeEStargz()
-        convertFunc := func(w io.Writer) (io.WriteCloser, error) { return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip) }
-        return gzipLayerConvertFunc(compressionType, convertFunc, finalize), nil
+        c.compress = func(w io.Writer) (io.WriteCloser, error) {
+            return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip)
+        }
+        c.finalize = finalize
     default:
-        return nil, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
+        return nil, errors.Errorf("unknown target compression type during conversion: %q", compressionType)
     }
+
+    return (&c).convert, nil
 }
 
-func gzipLayerConvertFunc(compressionType compression.Type, convertFunc func(w io.Writer) (io.WriteCloser, error), finalize func(context.Context, content.Store) (map[string]string, error)) converter.ConvertFunc {
-    return func(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
-        // prepare the source and destination
-        info, err := cs.Info(ctx, desc.Digest)
-        if err != nil {
-            return nil, err
-        }
-        labelz := info.Labels
-        if labelz == nil {
-            labelz = make(map[string]string)
-        }
-        ra, err := cs.ReaderAt(ctx, desc)
-        if err != nil {
-            return nil, err
-        }
-        defer ra.Close()
-        ref := fmt.Sprintf("convert-from-%s-to-%s", desc.Digest, compressionType.String())
-        w, err := cs.Writer(ctx, content.WithRef(ref))
-        if err != nil {
-            return nil, err
-        }
-        defer w.Close()
-        if err := w.Truncate(0); err != nil { // Old written data possibly remains
-            return nil, err
-        }
-        zw, err := convertFunc(w)
+type conversion struct {
+    target     compression.Type
+    decompress func(io.Reader) (cdcompression.DecompressReadCloser, error)
+    compress   func(w io.Writer) (io.WriteCloser, error)
+    finalize   func(context.Context, content.Store) (map[string]string, error)
+}
+
+func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
+    // prepare the source and destination
+    info, err := cs.Info(ctx, desc.Digest)
+    if err != nil {
+        return nil, err
+    }
+    labelz := info.Labels
+    if labelz == nil {
+        labelz = make(map[string]string)
+    }
+    ra, err := cs.ReaderAt(ctx, desc)
+    if err != nil {
+        return nil, err
+    }
+    defer ra.Close()
+    ref := fmt.Sprintf("convert-from-%s-to-%s", desc.Digest, c.target.String())
+    w, err := cs.Writer(ctx, content.WithRef(ref))
+    if err != nil {
+        return nil, err
+    }
+    defer w.Close()
+    if err := w.Truncate(0); err != nil { // Old written data possibly remains
+        return nil, err
+    }
+    var zw io.WriteCloser = w
+    var compress io.WriteCloser
+    if c.compress != nil {
+        zw, err = c.compress(zw)
         if err != nil {
             return nil, err
         }
         defer zw.Close()
+        compress = zw
+    }
 
-        // convert this layer
-        diffID := digest.Canonical.Digester()
-        if _, err := io.Copy(zw, io.TeeReader(io.NewSectionReader(ra, 0, ra.Size()), diffID.Hash())); err != nil {
-            return nil, err
-        }
-        if err := zw.Close(); err != nil { // Flush the writer
-            return nil, err
-        }
-        labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
-        if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
-            return nil, err
-        }
-        if err := w.Close(); err != nil {
-            return nil, err
-        }
-        info, err = cs.Info(ctx, w.Digest())
+    // convert this layer
+    diffID := digest.Canonical.Digester()
+    var rdr io.Reader = io.NewSectionReader(ra, 0, ra.Size())
+    if c.decompress != nil {
+        rc, err := c.decompress(rdr)
         if err != nil {
             return nil, err
         }
-
-        newDesc := desc
-        newDesc.MediaType = convertMediaTypeToGzip(desc.MediaType)
-        newDesc.Digest = info.Digest
-        newDesc.Size = info.Size
-        if finalize != nil {
-            a, err := finalize(ctx, cs)
-            if err != nil {
-                return nil, errors.Wrapf(err, "failed finalize compression")
-            }
-            for k, v := range a {
-                if newDesc.Annotations == nil {
-                    newDesc.Annotations = make(map[string]string)
-                }
-                newDesc.Annotations[k] = v
-            }
+        defer rc.Close()
+        rdr = rc
+    }
+    if _, err := io.Copy(zw, io.TeeReader(rdr, diffID.Hash())); err != nil {
+        return nil, err
+    }
+    if compress != nil {
+        if err := compress.Close(); err != nil { // Flush the writer
+            return nil, err
         }
-        return &newDesc, nil
     }
-}
-
-func isGzipCompressedType(mt string) bool {
-    switch mt {
-    case
-        images.MediaTypeDockerSchema2LayerGzip,
-        images.MediaTypeDockerSchema2LayerForeignGzip,
-        ocispecs.MediaTypeImageLayerGzip,
-        ocispecs.MediaTypeImageLayerNonDistributableGzip:
-        return true
-    default:
-        return false
+    labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
+    if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
+        return nil, err
+    }
+    if err := w.Close(); err != nil {
+        return nil, err
+    }
+    info, err = cs.Info(ctx, w.Digest())
+    if err != nil {
+        return nil, err
     }
-}
 
-func convertMediaTypeToGzip(mt string) string {
-    if uncompress.IsUncompressedType(mt) {
-        if images.IsDockerType(mt) {
-            mt += ".gzip"
-        } else {
-            mt += "+gzip"
+    newDesc := desc
+    newDesc.MediaType = c.target.DefaultMediaType()
+    newDesc.Digest = info.Digest
+    newDesc.Size = info.Size
+    if c.finalize != nil {
+        a, err := c.finalize(ctx, cs)
+        if err != nil {
+            return nil, errors.Wrapf(err, "failed finalize compression")
+        }
+        for k, v := range a {
+            if newDesc.Annotations == nil {
+                newDesc.Annotations = make(map[string]string)
+            }
+            newDesc.Annotations[k] = v
         }
-        return mt
     }
-    return mt
+    return &newDesc, nil
 }
cache/estargz.go

Lines changed: 5 additions & 4 deletions
@@ -6,9 +6,10 @@ import (
     "io"
     "sync"
 
-    "github.com/containerd/containerd/archive/compression"
+    cdcompression "github.com/containerd/containerd/archive/compression"
     "github.com/containerd/containerd/content"
     "github.com/containerd/stargz-snapshotter/estargz"
+    "github.com/moby/buildkit/util/compression"
     digest "github.com/opencontainers/go-digest"
     "github.com/pkg/errors"
 )
@@ -22,8 +23,8 @@ func writeEStargz() (compressorFunc compressor, finalize func(context.Context, c
     var bInfo blobInfo
     var mu sync.Mutex
     return func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
-        if !isGzipCompressedType(requiredMediaType) {
-            return nil, fmt.Errorf("unsupported media type for estargz compressor %q", requiredMediaType)
+        if compression.FromMediaType(requiredMediaType) != compression.Gzip {
+            return nil, errors.Errorf("unsupported media type for estargz compressor %q", requiredMediaType)
         }
         done := make(chan struct{})
         pr, pw := io.Pipe()
@@ -127,7 +128,7 @@ func calculateBlob() (io.WriteCloser, chan blobInfo) {
     c := new(counter)
     dgstr := digest.Canonical.Digester()
     diffID := digest.Canonical.Digester()
-    decompressR, err := compression.DecompressStream(io.TeeReader(pr, dgstr.Hash()))
+    decompressR, err := cdcompression.DecompressStream(io.TeeReader(pr, dgstr.Hash()))
     if err != nil {
         pr.CloseWithError(err)
         return