Skip to content

Commit 2161e22

Browse files
committed
Add support for zstd layer upload
It is now possible to `crane append -f bla.tar.zstd` to upload Zstd layer, both from file and from pipe. Before this commit, crane would erroneously upload such layer with gzip mime type. While this PR does not _fully_ solve #1501, it is very close to that. Signed-off-by: Marat Radchenko <[email protected]>
1 parent dbcd01c commit 2161e22

File tree

13 files changed

+231
-90
lines changed

13 files changed

+231
-90
lines changed

cmd/crane/cmd/flatten.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,11 @@ func flattenImage(old v1.Image, repo name.Repository, use string, o crane.Option
228228
}
229229

230230
// TODO: Make compression configurable?
231-
layer := stream.NewLayer(mutate.Extract(old), stream.WithCompressionLevel(gzip.BestCompression))
231+
layer, err := stream.NewLayer(mutate.Extract(old), stream.WithCompressionLevel(gzip.BestCompression))
232+
if err != nil {
233+
return nil, fmt.Errorf("new layer: %w", err)
234+
}
235+
232236
if err := remote.WriteLayer(repo, layer, o.Remote...); err != nil {
233237
return nil, fmt.Errorf("uploading layer: %w", err)
234238
}

internal/zstd/zstd.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ func UnzipReadCloser(r io.ReadCloser) (io.ReadCloser, error) {
102102
}, nil
103103
}
104104

105+
func NewReader(r io.Reader) (io.Reader, error) {
106+
return zstd.NewReader(r)
107+
}
108+
109+
func NewWriterLevel(w io.Writer, level int) (*zstd.Encoder, error) {
110+
return zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(level)))
111+
}
112+
105113
// Is detects whether the input stream is compressed.
106114
func Is(r io.Reader) (bool, error) {
107115
magicHeader := make([]byte, 4)

pkg/compression/compression.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@
1515
// Package compression abstracts over gzip and zstd.
1616
package compression
1717

18+
import (
19+
"fmt"
20+
21+
"github.com/google/go-containerregistry/pkg/v1/types"
22+
)
23+
1824
// Compression is an enumeration of the supported compression algorithms
1925
type Compression string
2026

@@ -24,3 +30,29 @@ const (
2430
GZip Compression = "gzip"
2531
ZStd Compression = "zstd"
2632
)
33+
34+
func (compression Compression) ToMediaType(oci bool) (types.MediaType, error) {
35+
if oci {
36+
switch compression {
37+
case ZStd:
38+
return types.OCILayerZStd, nil
39+
case GZip:
40+
return types.OCILayer, nil
41+
case None:
42+
return types.OCIUncompressedLayer, nil
43+
default:
44+
return types.OCILayer, fmt.Errorf("unsupported compression: %s", compression)
45+
}
46+
} else {
47+
switch compression {
48+
case ZStd:
49+
return types.DockerLayerZstd, nil
50+
case GZip:
51+
return types.DockerLayer, nil
52+
case None:
53+
return types.DockerUncompressedLayer, nil
54+
default:
55+
return types.DockerLayer, fmt.Errorf("unsupported compression: %s", compression)
56+
}
57+
}
58+
}

pkg/crane/append.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,11 @@ func Append(base v1.Image, paths ...string) (v1.Image, error) {
5555
return nil, fmt.Errorf("getting base image media type: %w", err)
5656
}
5757

58-
layerType := types.DockerLayer
59-
60-
if baseMediaType == types.OCIManifestSchema1 {
61-
layerType = types.OCILayer
62-
}
58+
oci := baseMediaType == types.OCIManifestSchema1
6359

6460
layers := make([]v1.Layer, 0, len(paths))
6561
for _, path := range paths {
66-
layer, err := getLayer(path, layerType)
62+
layer, err := getLayer(path, oci)
6763
if err != nil {
6864
return nil, fmt.Errorf("reading layer %q: %w", path, err)
6965
}
@@ -81,16 +77,16 @@ func Append(base v1.Image, paths ...string) (v1.Image, error) {
8177
return mutate.AppendLayers(base, layers...)
8278
}
8379

84-
func getLayer(path string, layerType types.MediaType) (v1.Layer, error) {
80+
func getLayer(path string, oci bool) (v1.Layer, error) {
8581
f, err := streamFile(path)
8682
if err != nil {
8783
return nil, err
8884
}
8985
if f != nil {
90-
return stream.NewLayer(f, stream.WithMediaType(layerType)), nil
86+
return stream.NewLayer(f, stream.WithOCIMediaType(oci))
9187
}
9288

93-
return tarball.LayerFromFile(path, tarball.WithMediaType(layerType))
89+
return tarball.LayerFromFile(path, tarball.WithOCIMediaType(oci))
9490
}
9591

9692
// If we're dealing with a named pipe, trying to open it multiple times will

pkg/v1/layout/write_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,8 +473,12 @@ func TestStreamingWriteLayer(t *testing.T) {
473473
return tw.Close()
474474
}())
475475
}()
476+
layer, err := stream.NewLayer(pr)
477+
if err != nil {
478+
t.Fatalf("stream.NewLayer: %v", err)
479+
}
476480
img, err := mutate.Append(empty.Image, mutate.Addendum{
477-
Layer: stream.NewLayer(pr),
481+
Layer: layer,
478482
})
479483
if err != nil {
480484
t.Fatalf("creating random streaming image failed: %v", err)

pkg/v1/mutate/mutate_test.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -451,12 +451,19 @@ func TestMutateMediaType(t *testing.T) {
451451
}
452452

453453
func TestAppendStreamableLayer(t *testing.T) {
454-
img, err := mutate.AppendLayers(
455-
sourceImage(t),
456-
stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("a", 100)))),
457-
stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("b", 100)))),
458-
stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("c", 100)))),
459-
)
454+
l1, err := stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("a", 100))))
455+
if err != nil {
456+
t.Fatalf("stream.NewLayer: %v", err)
457+
}
458+
l2, err := stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("b", 100))))
459+
if err != nil {
460+
t.Fatalf("stream.NewLayer: %v", err)
461+
}
462+
l3, err := stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("c", 100))))
463+
if err != nil {
464+
t.Fatalf("stream.NewLayer: %v", err)
465+
}
466+
img, err := mutate.AppendLayers(sourceImage(t), l1, l2, l3)
460467
if err != nil {
461468
t.Fatalf("AppendLayers: %v", err)
462469
}

pkg/v1/remote/multi_write_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,12 @@ func streamable(t *testing.T) v1.Layer {
5050
t.Fatalf("Uncompressed(): %v", err)
5151
}
5252

53-
return stream.NewLayer(rc)
53+
l, err := stream.NewLayer(rc)
54+
if err != nil {
55+
t.Fatalf("stream.NewLayer: %v", err)
56+
}
57+
58+
return l
5459
}
5560

5661
type rawManifest struct {

pkg/v1/remote/write_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,10 @@ func TestDedupeLayers(t *testing.T) {
540540
// Append three identical stream.Layers, whose uploads will *not* be
541541
// deduped since Write can't tell they're identical ahead of time.
542542
for i := 0; i < 3; i++ {
543-
sl := stream.NewLayer(newBlob())
543+
sl, err := stream.NewLayer(newBlob())
544+
if err != nil {
545+
t.Fatalf("stream.NewLayer(#%d): %v", i, err)
546+
}
544547
img, err = mutate.AppendLayers(img, sl)
545548
if err != nil {
546549
t.Fatalf("mutate.AppendLayer(#%d): %v", i, err)
@@ -697,7 +700,10 @@ func TestStreamLayer(t *testing.T) {
697700
defer closer.Close()
698701

699702
streamLocation := w.url(expectedPath)
700-
sl := stream.NewLayer(newBlob())
703+
sl, err := stream.NewLayer(newBlob())
704+
if err != nil {
705+
t.Fatalf("stream.NewLayer: %v", err)
706+
}
701707

702708
commitLocation, err := w.streamBlob(context.Background(), sl, streamLocation.String())
703709
if err != nil {
@@ -856,7 +862,10 @@ func TestUploadOneStreamedLayer(t *testing.T) {
856862
newBlob := func() io.ReadCloser { return io.NopCloser(bytes.NewReader(bytes.Repeat([]byte{'a'}, int(n)))) }
857863
wantDigest := "sha256:3d7c465be28d9e1ed810c42aeb0e747b44441424f566722ba635dc93c947f30e"
858864
wantDiffID := "sha256:27dd1f61b867b6a0f6e9d8a41c43231de52107e53ae424de8f847b821db4b711"
859-
l := stream.NewLayer(newBlob())
865+
l, err := stream.NewLayer(newBlob())
866+
if err != nil {
867+
t.Fatalf("stream.NewLayer: %v", err)
868+
}
860869
if err := w.uploadOne(ctx, l); err != nil {
861870
t.Fatalf("uploadOne: %v", err)
862871
}

pkg/v1/stream/layer.go

Lines changed: 73 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ import (
2626
"os"
2727
"sync"
2828

29+
internalcomp "github.com/google/go-containerregistry/internal/compression"
30+
"github.com/google/go-containerregistry/internal/zstd"
31+
"github.com/google/go-containerregistry/pkg/compression"
2932
v1 "github.com/google/go-containerregistry/pkg/v1"
3033
"github.com/google/go-containerregistry/pkg/v1/types"
3134
)
@@ -42,14 +45,19 @@ var (
4245

4346
// Layer is a streaming implementation of v1.Layer.
4447
type Layer struct {
45-
blob io.ReadCloser
46-
consumed bool
47-
compression int
48+
closer io.Closer
49+
uncompressedReader io.Reader
50+
51+
consumed bool
52+
53+
compression compression.Compression
54+
compressionLevel int
4855

4956
mu sync.Mutex
5057
digest, diffID *v1.Hash
5158
size int64
52-
mediaType types.MediaType
59+
60+
oci bool
5361
}
5462

5563
var _ v1.Layer = (*Layer)(nil)
@@ -60,32 +68,54 @@ type LayerOption func(*Layer)
6068
// WithCompressionLevel sets the gzip compression. See `gzip.NewWriterLevel` for possible values.
6169
func WithCompressionLevel(level int) LayerOption {
6270
return func(l *Layer) {
63-
l.compression = level
71+
l.compressionLevel = level
6472
}
6573
}
6674

67-
// WithMediaType is a functional option for overriding the layer's media type.
68-
func WithMediaType(mt types.MediaType) LayerOption {
75+
// WithOCIMediaType is a functional option for overriding the layer's media type.
76+
func WithOCIMediaType(oci bool) LayerOption {
6977
return func(l *Layer) {
70-
l.mediaType = mt
78+
l.oci = oci
7179
}
7280
}
7381

7482
// NewLayer creates a Layer from an io.ReadCloser.
75-
func NewLayer(rc io.ReadCloser, opts ...LayerOption) *Layer {
83+
func NewLayer(rc io.ReadCloser, opts ...LayerOption) (*Layer, error) {
84+
comp, peekReader, err := internalcomp.PeekCompression(rc)
85+
if err != nil {
86+
return nil, err
87+
}
88+
7689
layer := &Layer{
77-
blob: rc,
78-
compression: gzip.BestSpeed,
79-
// We use DockerLayer for now as uncompressed layers
80-
// are unimplemented
81-
mediaType: types.DockerLayer,
90+
closer: rc,
91+
compression: comp,
92+
compressionLevel: gzip.BestSpeed,
93+
}
94+
95+
switch comp {
96+
case compression.ZStd:
97+
layer.compression = comp
98+
layer.uncompressedReader, err = zstd.NewReader(peekReader)
99+
if err != nil {
100+
return nil, err
101+
}
102+
case compression.GZip:
103+
layer.compression = comp
104+
layer.uncompressedReader, err = gzip.NewReader(peekReader)
105+
if err != nil {
106+
return nil, err
107+
}
108+
default:
109+
// No support for uncompressed layers for now
110+
layer.compression = compression.GZip
111+
layer.uncompressedReader = peekReader
82112
}
83113

84114
for _, opt := range opts {
85115
opt(layer)
86116
}
87117

88-
return layer
118+
return layer, nil
89119
}
90120

91121
// Digest implements v1.Layer.
@@ -120,7 +150,7 @@ func (l *Layer) Size() (int64, error) {
120150

121151
// MediaType implements v1.Layer
122152
func (l *Layer) MediaType() (types.MediaType, error) {
123-
return l.mediaType, nil
153+
return l.compression.ToMediaType(l.oci)
124154
}
125155

126156
// Uncompressed implements v1.Layer.
@@ -183,9 +213,27 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
183213
// Buffer the output of the gzip writer so we don't have to wait on pr to keep writing.
184214
// 64K ought to be small enough for anybody.
185215
bw := bufio.NewWriterSize(mw, 2<<16)
186-
zw, err := gzip.NewWriterLevel(bw, l.compression)
187-
if err != nil {
188-
return nil, err
216+
217+
var compressedWriter io.Writer
218+
var compressedCloser io.Closer
219+
220+
switch l.compression {
221+
case compression.ZStd:
222+
w, err := zstd.NewWriterLevel(bw, l.compressionLevel)
223+
if err != nil {
224+
return nil, err
225+
}
226+
compressedWriter = w
227+
compressedCloser = w
228+
case compression.GZip:
229+
w, err := gzip.NewWriterLevel(bw, l.compressionLevel)
230+
if err != nil {
231+
return nil, err
232+
}
233+
compressedWriter = w
234+
compressedCloser = w
235+
case compression.None:
236+
compressedWriter = bw
189237
}
190238

191239
doneDigesting := make(chan struct{})
@@ -211,7 +259,7 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
211259
//
212260
// NOTE: net/http will call close on success, so if we've already
213261
// closed the inner rc, it's not an error.
214-
if err := l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
262+
if err := l.closer.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
215263
return err
216264
}
217265

@@ -223,13 +271,16 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
223271
go func() {
224272
// Copy blob into the gzip writer, which also hashes and counts the
225273
// size of the compressed output, and hasher of the raw contents.
226-
_, copyErr := io.Copy(io.MultiWriter(h, zw), l.blob)
274+
_, copyErr := io.Copy(io.MultiWriter(h, compressedWriter), l.uncompressedReader)
227275

228276
// Close the gzip writer once copying is done. If this is done in the
229277
// Close method of compressedReader instead, then it can cause a panic
230278
// when the compressedReader is closed before the blob is fully
231279
// consumed and io.Copy in this goroutine is still blocking.
232-
closeErr := zw.Close()
280+
var closeErr error
281+
if compressedCloser != nil {
282+
closeErr = compressedCloser.Close()
283+
}
233284

234285
// Check errors from writing and closing streams.
235286
if copyErr != nil {

0 commit comments

Comments
 (0)