未验证 提交 2f1929fa 编写于 作者: A Anders Björklund 提交者: GitHub

Merge pull request #3899 from afbjorklund/reference

Use Reference, allow caching images with both Tag and Digest
......@@ -183,7 +183,8 @@
version = "v0.2.0"
[[projects]]
digest = "1:186f7de0e878b5ff1fca82271ce36a7abf9747be09d03b3f08a921584c2f26fc"
branch = "master"
digest = "1:dfed0914a28dd3a8561fbfdd5c7a1deb2b90dee8edea6f58c9285680fc37b5c2"
name = "github.com/google/go-containerregistry"
packages = [
"pkg/authn",
......@@ -192,12 +193,13 @@
"pkg/v1/partial",
"pkg/v1/remote",
"pkg/v1/remote/transport",
"pkg/v1/stream",
"pkg/v1/tarball",
"pkg/v1/types",
"pkg/v1/v1util",
]
pruneopts = "NUT"
revision = "3165313d6d3f973ec0b0ed3ec5a63b520e065d40"
revision = "019cdfc6adf96a4905a1b93a7aeaea1e50c0b6cf"
[[projects]]
digest = "1:63ede27834b468648817fb80cfb95d40abfc61341f89cb7a0d6779b6aa955425"
......
......@@ -302,12 +302,12 @@ func CacheImage(image, dst string) error {
return errors.Wrapf(err, "making cache image directory: %s", dst)
}
tag, err := name.NewTag(image, name.WeakValidation)
ref, err := name.ParseReference(image, name.WeakValidation)
if err != nil {
return errors.Wrap(err, "creating docker image name")
}
img, err := remote.Image(tag, remote.WithAuthFromKeychain(authn.DefaultKeychain))
img, err := remote.Image(ref, remote.WithAuthFromKeychain(authn.DefaultKeychain))
if err != nil {
return errors.Wrap(err, "fetching remote image")
}
......@@ -317,7 +317,7 @@ func CacheImage(image, dst string) error {
if err != nil {
return err
}
err = tarball.Write(tag, img, nil, f)
err = tarball.Write(ref, img, f)
if err != nil {
return err
}
......
......@@ -72,7 +72,7 @@ func (h *helper) Authorization() (string, error) {
var out bytes.Buffer
cmd.Stdout = &out
err := h.r.Run(cmd)
cmdErr := h.r.Run(cmd)
// If we see this specific message, it means the domain wasn't found
// and we should fall back on anonymous auth.
......@@ -81,16 +81,22 @@ func (h *helper) Authorization() (string, error) {
return Anonymous.Authorization()
}
if err != nil {
return "", err
}
// Any other output should be parsed as JSON and the Username / Secret
// fields used for Basic authentication.
ho := helperOutput{}
if err := json.Unmarshal([]byte(output), &ho); err != nil {
if cmdErr != nil {
// If we failed to parse output, it won't contain Secret, so returning it
// in an error should be fine.
return "", fmt.Errorf("invoking %s: %v; output: %s", helperName, cmdErr, output)
}
return "", err
}
if cmdErr != nil {
return "", fmt.Errorf("invoking %s: %v", helperName, cmdErr)
}
b := Basic{Username: ho.Username, Password: ho.Secret}
return b.Authorization()
}
......@@ -73,14 +73,14 @@ func NewDigest(name string, strict Strictness) (Digest, error) {
base := parts[0]
digest := parts[1]
// We don't require a digest, but if we get one check it's valid,
// even when not being strict.
// If we are being strict, we want to validate the digest regardless in case
// it's empty.
if digest != "" || strict == StrictValidation {
if err := checkDigest(digest); err != nil {
return Digest{}, err
}
// Always check that the digest is valid.
if err := checkDigest(digest); err != nil {
return Digest{}, err
}
tag, err := NewTag(base, strict)
if err == nil {
base = tag.Repository.Name()
}
repo, err := NewRepository(base, strict)
......
......@@ -15,12 +15,14 @@
package name
import (
"net"
"net/url"
"regexp"
"strings"
)
const (
// DefaultRegistry is Docker Hub, assumed when a hostname is omitted.
DefaultRegistry = "index.docker.io"
defaultRegistryAlias = "docker.io"
)
......@@ -63,11 +65,29 @@ func (r Registry) Scope(string) string {
return "registry:catalog:*"
}
// isRFC1918 reports whether the registry host is an IPv4 literal inside one
// of the RFC 1918 private address ranges (10/8, 172.16/12, 192.168/16).
func (r Registry) isRFC1918() bool {
	host := strings.SplitN(r.Name(), ":", 2)[0]
	addr := net.ParseIP(host)
	if addr == nil {
		// Not an IP literal (e.g. a hostname) — cannot be RFC 1918.
		return false
	}
	privateRanges := []string{"10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"}
	for _, cidr := range privateRanges {
		if _, block, _ := net.ParseCIDR(cidr); block.Contains(addr) {
			return true
		}
	}
	return false
}
// Scheme returns https scheme for all the endpoints except localhost or when explicitly defined.
func (r Registry) Scheme() string {
if r.insecure {
return "http"
}
if r.isRFC1918() {
return "http"
}
if strings.HasPrefix(r.Name(), "localhost:") {
return "http"
}
......
......@@ -21,27 +21,28 @@ import (
)
// ConfigFile is the configuration file that holds the metadata describing
// how to launch a container. The names of the fields are chosen to reflect
// the JSON payload of the ConfigFile as defined here: https://git.io/vrAEY
// how to launch a container. See:
// https://github.com/opencontainers/image-spec/blob/master/config.md
type ConfigFile struct {
Architecture string `json:"architecture"`
Container string `json:"container"`
Created Time `json:"created"`
DockerVersion string `json:"docker_version"`
History []History `json:"history"`
Author string `json:"author,omitempty"`
Container string `json:"container,omitempty"`
Created Time `json:"created,omitempty"`
DockerVersion string `json:"docker_version,omitempty"`
History []History `json:"history,omitempty"`
OS string `json:"os"`
RootFS RootFS `json:"rootfs"`
Config Config `json:"config"`
ContainerConfig Config `json:"container_config"`
OSVersion string `json:"osversion"`
ContainerConfig Config `json:"container_config,omitempty"`
OSVersion string `json:"osversion,omitempty"`
}
// History is one entry of a list recording how this container image was built.
type History struct {
Author string `json:"author"`
Created Time `json:"created"`
CreatedBy string `json:"created_by"`
Comment string `json:"comment"`
Author string `json:"author,omitempty"`
Created Time `json:"created,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
Comment string `json:"comment,omitempty"`
EmptyLayer bool `json:"empty_layer,omitempty"`
}
......
......@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Package v1 defines structured types for OCI v1 images
//go:generate deepcopy-gen -O zz_deepcopy_generated --go-header-file $BOILER_PLATE_FILE -i .
// +k8s:deepcopy-gen=package
//go:generate deepcopy-gen -O zz_deepcopy_generated --go-header-file $BOILER_PLATE_FILE -i .
// Package v1 defines structured types for OCI v1 images
package v1
......@@ -49,7 +49,7 @@ func NewHash(s string) (Hash, error) {
}
// MarshalJSON implements json.Marshaler
func (h *Hash) MarshalJSON() ([]byte, error) {
func (h Hash) MarshalJSON() ([]byte, error) {
return json.Marshal(h.String())
}
......
......@@ -24,9 +24,6 @@ type Image interface {
// The order of the list is oldest/base layer first, and most-recent/top layer last.
Layers() ([]Layer, error)
// BlobSet returns an unordered collection of all the blobs in the image.
BlobSet() (map[Hash]struct{}, error)
// MediaType of this image's manifest.
MediaType() (types.MediaType, error)
......
......@@ -18,6 +18,7 @@ import (
"github.com/google/go-containerregistry/pkg/v1/types"
)
// ImageIndex defines the interface for interacting with an OCI image index.
type ImageIndex interface {
// MediaType of this image's manifest.
MediaType() (types.MediaType, error)
......@@ -28,6 +29,12 @@ type ImageIndex interface {
// IndexManifest returns this image index's manifest object.
IndexManifest() (*IndexManifest, error)
// RawIndexManifest returns the serialized bytes of IndexManifest().
RawIndexManifest() ([]byte, error)
// RawManifest returns the serialized bytes of IndexManifest().
RawManifest() ([]byte, error)
// Image returns a v1.Image that this ImageIndex references.
Image(Hash) (Image, error)
// ImageIndex returns a v1.ImageIndex that this ImageIndex references.
ImageIndex(Hash) (ImageIndex, error)
}
......@@ -23,7 +23,7 @@ import (
// Manifest represents the OCI image manifest in a structured way.
type Manifest struct {
SchemaVersion int64 `json:"schemaVersion"`
SchemaVersion int64 `json:"schemaVersion,omitempty"`
MediaType types.MediaType `json:"mediaType"`
Config Descriptor `json:"config"`
Layers []Descriptor `json:"layers"`
......
......@@ -17,7 +17,7 @@ package partial
import (
"io"
"github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/v1util"
)
......@@ -91,11 +91,6 @@ type compressedImageExtender struct {
// Assert that our extender type completes the v1.Image interface
var _ v1.Image = (*compressedImageExtender)(nil)
// BlobSet implements v1.Image
func (i *compressedImageExtender) BlobSet() (map[v1.Hash]struct{}, error) {
return BlobSet(i)
}
// Digest implements v1.Image
func (i *compressedImageExtender) Digest() (v1.Hash, error) {
return Digest(i)
......
......@@ -19,7 +19,7 @@ import (
"io"
"sync"
"github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/types"
"github.com/google/go-containerregistry/pkg/v1/v1util"
)
......@@ -37,8 +37,12 @@ type UncompressedLayer interface {
// uncompressedLayerExtender implements v1.Image using the uncompressed base properties.
type uncompressedLayerExtender struct {
UncompressedLayer
// TODO(mattmoor): Memoize size/hash so that the methods aren't twice as
// Memoize size/hash so that the methods aren't twice as
// expensive as doing this manually.
hash v1.Hash
size int64
hashSizeError error
once sync.Once
}
// Compressed implements v1.Layer
......@@ -52,29 +56,31 @@ func (ule *uncompressedLayerExtender) Compressed() (io.ReadCloser, error) {
// Digest implements v1.Layer
func (ule *uncompressedLayerExtender) Digest() (v1.Hash, error) {
	// Delegate to the memoized size/hash computation so repeated calls
	// don't re-read and re-hash the compressed stream.
	ule.calcSizeHash()
	return ule.hash, ule.hashSizeError
}
// Size implements v1.Layer
func (ule *uncompressedLayerExtender) Size() (int64, error) {
	// Delegate to the memoized size/hash computation (shared with Digest).
	ule.calcSizeHash()
	return ule.size, ule.hashSizeError
}
// calcSizeHash computes the compressed digest and size exactly once,
// caching the results (and any error encountered) on the extender.
func (ule *uncompressedLayerExtender) calcSizeHash() {
	ule.once.Do(func() {
		rc, err := ule.Compressed()
		if err != nil {
			ule.hashSizeError = err
			return
		}
		defer rc.Close()
		ule.hash, ule.size, ule.hashSizeError = v1.SHA256(rc)
	})
}
// UncompressedToLayer fills in the missing methods from an UncompressedLayer so that it implements v1.Layer
func UncompressedToLayer(ul UncompressedLayer) (v1.Layer, error) {
	// Use a keyed literal: the extender also carries memoization fields
	// (hash/size/once) that must start at their zero values.
	return &uncompressedLayerExtender{UncompressedLayer: ul}, nil
}
// UncompressedImageCore represents the bare minimum interface a natively
......@@ -106,11 +112,6 @@ type uncompressedImageExtender struct {
// Assert that our extender type completes the v1.Image interface
var _ v1.Image = (*uncompressedImageExtender)(nil)
// BlobSet implements v1.Image
func (i *uncompressedImageExtender) BlobSet() (map[v1.Hash]struct{}, error) {
return BlobSet(i)
}
// Digest implements v1.Image
func (i *uncompressedImageExtender) Digest() (v1.Hash, error) {
return Digest(i)
......@@ -214,13 +215,6 @@ func (i *uncompressedImageExtender) LayerByDiffID(diffID v1.Hash) (v1.Layer, err
// LayerByDigest implements v1.Image
func (i *uncompressedImageExtender) LayerByDigest(h v1.Hash) (v1.Layer, error) {
// Support returning the ConfigFile when asked for its hash.
if cfgName, err := i.ConfigName(); err != nil {
return nil, err
} else if cfgName == h {
return ConfigLayer(i)
}
diffID, err := BlobToDiffID(i, h)
if err != nil {
return nil, err
......
......@@ -19,8 +19,9 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/v1util"
)
......@@ -49,8 +50,6 @@ func ConfigName(i WithRawConfigFile) (v1.Hash, error) {
return h, err
}
// configLayer implements v1.Layer from the raw config bytes.
// This is so that clients (e.g. remote) can access the config as a blob.
type configLayer struct {
hash v1.Hash
content []byte
......@@ -68,12 +67,12 @@ func (cl *configLayer) DiffID() (v1.Hash, error) {
// Uncompressed implements v1.Layer
func (cl *configLayer) Uncompressed() (io.ReadCloser, error) {
	// The config is already held in memory; wrap it in a no-op closer.
	return ioutil.NopCloser(bytes.NewBuffer(cl.content)), nil
}
// Compressed implements v1.Layer
func (cl *configLayer) Compressed() (io.ReadCloser, error) {
	// Config blobs are served uncompressed; same bytes as Uncompressed.
	return ioutil.NopCloser(bytes.NewBuffer(cl.content)), nil
}
// Size implements v1.Layer
......@@ -83,6 +82,8 @@ func (cl *configLayer) Size() (int64, error) {
var _ v1.Layer = (*configLayer)(nil)
// ConfigLayer implements v1.Layer from the raw config bytes.
// This is so that clients (e.g. remote) can access the config as a blob.
func ConfigLayer(i WithRawConfigFile) (v1.Layer, error) {
h, err := ConfigName(i)
if err != nil {
......@@ -190,20 +191,6 @@ func FSLayers(i WithManifest) ([]v1.Hash, error) {
return fsl, nil
}
// BlobSet is a helper for implementing v1.Image
func BlobSet(i WithManifest) (map[v1.Hash]struct{}, error) {
m, err := i.Manifest()
if err != nil {
return nil, err
}
bs := make(map[v1.Hash]struct{})
for _, l := range m.Layers {
bs[l.Digest] = struct{}{}
}
bs[m.Config.Digest] = struct{}{}
return bs, nil
}
// BlobSize is a helper for implementing v1.Image
func BlobSize(i WithManifest, h v1.Hash) (int64, error) {
m, err := i.Manifest()
......
......@@ -25,15 +25,8 @@ import (
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
)
// DeleteOptions are used to expose optional information to guide or
// control the image deletion.
type DeleteOptions struct {
// TODO(mattmoor): Fail on not found?
// TODO(mattmoor): Delete tag and manifest?
}
// Delete removes the specified image reference from the remote registry.
func Delete(ref name.Reference, auth authn.Authenticator, t http.RoundTripper, do DeleteOptions) error {
func Delete(ref name.Reference, auth authn.Authenticator, t http.RoundTripper) error {
scopes := []string{ref.Scope(transport.DeleteScope)}
tr, err := transport.New(ref.Context().Registry, auth, t, scopes)
if err != nil {
......
......@@ -21,27 +21,35 @@ import (
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
"github.com/google/go-containerregistry/pkg/v1/types"
"github.com/google/go-containerregistry/pkg/v1/v1util"
)
var defaultPlatform = v1.Platform{
Architecture: "amd64",
OS: "linux",
}
// remoteImage accesses an image from a remote registry
type remoteImage struct {
ref name.Reference
client *http.Client
fetcher
manifestLock sync.Mutex // Protects manifest
manifest []byte
configLock sync.Mutex // Protects config
config []byte
mediaType types.MediaType
platform v1.Platform
}
// ImageOption is a functional option for Image.
type ImageOption func(*imageOpener) error
var _ partial.CompressedImageCore = (*remoteImage)(nil)
......@@ -51,6 +59,7 @@ type imageOpener struct {
transport http.RoundTripper
ref name.Reference
client *http.Client
platform v1.Platform
}
func (i *imageOpener) Open() (v1.Image, error) {
......@@ -59,8 +68,11 @@ func (i *imageOpener) Open() (v1.Image, error) {
return nil, err
}
ri := &remoteImage{
ref: i.ref,
client: &http.Client{Transport: tr},
fetcher: fetcher{
Ref: i.ref,
Client: &http.Client{Transport: tr},
},
platform: i.platform,
}
imgCore, err := partial.CompressedToImage(ri)
if err != nil {
......@@ -81,6 +93,7 @@ func Image(ref name.Reference, options ...ImageOption) (v1.Image, error) {
auth: authn.Anonymous,
transport: http.DefaultTransport,
ref: ref,
platform: defaultPlatform,
}
for _, option := range options {
......@@ -91,69 +104,115 @@ func Image(ref name.Reference, options ...ImageOption) (v1.Image, error) {
return img.Open()
}
func (r *remoteImage) url(resource, identifier string) url.URL {
return url.URL{
Scheme: r.ref.Context().Registry.Scheme(),
Host: r.ref.Context().RegistryStr(),
Path: fmt.Sprintf("/v2/%s/%s/%s", r.ref.Context().RepositoryStr(), resource, identifier),
}
}
func (r *remoteImage) MediaType() (types.MediaType, error) {
// TODO(jonjohnsonjr): Determine this based on response.
return types.DockerManifestSchema2, nil
// fetcher implements methods for reading from a remote image.
type fetcher struct {
	// Ref is the image reference (tag or digest) being fetched.
	Ref name.Reference
	// Client is the HTTP client (carrying the auth transport) used for all requests.
	Client *http.Client
}
// TODO(jonjohnsonjr): Handle manifest lists.
func (r *remoteImage) RawManifest() ([]byte, error) {
r.manifestLock.Lock()
defer r.manifestLock.Unlock()
if r.manifest != nil {
return r.manifest, nil
// url builds the registry API URL for the given resource and identifier in
// the context of this reference, e.g. /v2/<repo>/manifests/<tag>.
func (f *fetcher) url(resource, identifier string) url.URL {
	repo := f.Ref.Context()
	return url.URL{
		Scheme: repo.Registry.Scheme(),
		Host:   repo.RegistryStr(),
		Path:   fmt.Sprintf("/v2/%s/%s/%s", repo.RepositoryStr(), resource, identifier),
	}
}
u := r.url("manifests", r.ref.Identifier())
func (f *fetcher) fetchManifest(acceptable []types.MediaType) ([]byte, *v1.Descriptor, error) {
u := f.url("manifests", f.Ref.Identifier())
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
return nil, nil, err
}
accept := []string{}
for _, mt := range acceptable {
accept = append(accept, string(mt))
}
// TODO(jonjohnsonjr): Accept OCI manifest, manifest list, and image index.
req.Header.Set("Accept", string(types.DockerManifestSchema2))
resp, err := r.client.Do(req)
req.Header.Set("Accept", strings.Join(accept, ","))
resp, err := f.Client.Do(req)
if err != nil {
return nil, err
return nil, nil, err
}
defer resp.Body.Close()
if err := CheckError(resp, http.StatusOK); err != nil {
return nil, err
if err := transport.CheckError(resp, http.StatusOK); err != nil {
return nil, nil, err
}
manifest, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
return nil, nil, err
}
digest, _, err := v1.SHA256(bytes.NewReader(manifest))
digest, size, err := v1.SHA256(bytes.NewReader(manifest))
if err != nil {
return nil, err
return nil, nil, err
}
// Validate the digest matches what we asked for, if pulling by digest.
if dgst, ok := r.ref.(name.Digest); ok {
if dgst, ok := f.Ref.(name.Digest); ok {
if digest.String() != dgst.DigestStr() {
return nil, fmt.Errorf("manifest digest: %q does not match requested digest: %q for %q", digest, dgst.DigestStr(), r.ref)
return nil, nil, fmt.Errorf("manifest digest: %q does not match requested digest: %q for %q", digest, dgst.DigestStr(), f.Ref)
}
} else if checksum := resp.Header.Get("Docker-Content-Digest"); checksum != "" && checksum != digest.String() {
err := fmt.Errorf("manifest digest: %q does not match Docker-Content-Digest: %q for %q", digest, checksum, r.ref)
if r.ref.Context().RegistryStr() == name.DefaultRegistry {
// TODO(docker/distribution#2395): Remove this check.
} else {
// When pulling by tag, we can only validate that the digest matches what the registry told us it should be.
} else {
// Do nothing for tags; I give up.
//
// We'd like to validate that the "Docker-Content-Digest" header matches what is returned by the registry,
// but so many registries implement this incorrectly that it's not worth checking.
//
// For reference:
// https://github.com/docker/distribution/issues/2395
// https://github.com/GoogleContainerTools/kaniko/issues/298
}
// Return all this info since we have to calculate it anyway.
desc := v1.Descriptor{
Digest: digest,
Size: size,
MediaType: types.MediaType(resp.Header.Get("Content-Type")),
}
return manifest, &desc, nil
}
// MediaType returns the media type the registry reported for this image's
// manifest, defaulting to Docker schema 2 if nothing has been fetched yet.
func (r *remoteImage) MediaType() (types.MediaType, error) {
	if r.mediaType != "" {
		return r.mediaType, nil
	}
	return types.DockerManifestSchema2, nil
}
// RawManifest fetches and memoizes the raw manifest bytes for this image.
// If the registry returns a manifest list / image index, it is resolved to
// the platform-matching image manifest before being cached.
func (r *remoteImage) RawManifest() ([]byte, error) {
	r.manifestLock.Lock()
	defer r.manifestLock.Unlock()
	if r.manifest != nil {
		return r.manifest, nil
	}
	acceptable := []types.MediaType{
		types.DockerManifestSchema2,
		types.OCIManifestSchema1,
		// We'll resolve these to an image based on the platform.
		types.DockerManifestList,
		types.OCIImageIndex,
	}
	manifest, desc, err := r.fetchManifest(acceptable)
	if err != nil {
		return nil, err
	}
	// We want an image but the registry has an index, resolve it to an image.
	// Loop because an index may (in theory) point at another index.
	for desc.MediaType == types.DockerManifestList || desc.MediaType == types.OCIImageIndex {
		manifest, desc, err = r.matchImage(manifest)
		if err != nil {
			return nil, err
		}
	}
	r.mediaType = desc.MediaType
	r.manifest = manifest
	return r.manifest, nil
}
......@@ -201,12 +260,12 @@ func (rl *remoteLayer) Digest() (v1.Hash, error) {
// Compressed implements partial.CompressedLayer
func (rl *remoteLayer) Compressed() (io.ReadCloser, error) {
u := rl.ri.url("blobs", rl.digest.String())
resp, err := rl.ri.client.Get(u.String())
resp, err := rl.ri.Client.Get(u.String())
if err != nil {
return nil, err
}
if err := CheckError(resp, http.StatusOK); err != nil {
if err := transport.CheckError(resp, http.StatusOK); err != nil {
resp.Body.Close()
return nil, err
}
......@@ -243,3 +302,36 @@ func (r *remoteImage) LayerByDigest(h v1.Hash) (partial.CompressedLayer, error)
digest: h,
}, nil
}
// matchImage resolves an index/manifest-list payload to the child image
// manifest whose platform matches r.platform.
//
// This naively matches the first manifest with matching Architecture and OS.
//
// We should probably use this instead:
// github.com/containerd/containerd/platforms
//
// But first we'd need to migrate to:
// github.com/opencontainers/image-spec/specs-go/v1
func (r *remoteImage) matchImage(rawIndex []byte) ([]byte, *v1.Descriptor, error) {
	index, err := v1.ParseIndexManifest(bytes.NewReader(rawIndex))
	if err != nil {
		return nil, nil, err
	}
	for _, childDesc := range index.Manifests {
		// If platform is missing from child descriptor, assume it's amd64/linux.
		p := defaultPlatform
		if childDesc.Platform != nil {
			p = *childDesc.Platform
		}
		if r.platform.Architecture == p.Architecture && r.platform.OS == p.OS {
			childRef, err := name.ParseReference(fmt.Sprintf("%s@%s", r.Ref.Context(), childDesc.Digest), name.StrictValidation)
			if err != nil {
				return nil, nil, err
			}
			// Re-point this image's fetcher at the child digest so subsequent
			// manifest/blob requests resolve against the selected child.
			r.fetcher = fetcher{
				Client: r.Client,
				Ref: childRef,
			}
			return r.fetchManifest([]types.MediaType{childDesc.MediaType})
		}
	}
	return nil, nil, fmt.Errorf("no matching image for %s/%s, index: %s", r.platform.Architecture, r.platform.OS, string(rawIndex))
}
// Copyright 2018 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"bytes"
"fmt"
"net/http"
"sync"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
"github.com/google/go-containerregistry/pkg/v1/types"
)
// remoteIndex accesses an index from a remote registry
type remoteIndex struct {
	// fetcher provides the Ref/Client pair used for registry requests.
	fetcher
	manifestLock sync.Mutex // Protects manifest
	// manifest memoizes the raw index manifest bytes.
	manifest []byte
	// mediaType records the Content-Type the registry reported for the manifest.
	mediaType types.MediaType
}
// Index provides access to a remote index reference, applying functional options
// to the underlying imageOpener before resolving the reference into a v1.ImageIndex.
func Index(ref name.Reference, options ...ImageOption) (v1.ImageIndex, error) {
	opener := &imageOpener{
		auth:      authn.Anonymous,
		transport: http.DefaultTransport,
		ref:       ref,
	}
	for _, opt := range options {
		if err := opt(opener); err != nil {
			return nil, err
		}
	}
	scopes := []string{opener.ref.Scope(transport.PullScope)}
	rt, err := transport.New(opener.ref.Context().Registry, opener.auth, opener.transport, scopes)
	if err != nil {
		return nil, err
	}
	idx := &remoteIndex{
		fetcher: fetcher{
			Ref:    opener.ref,
			Client: &http.Client{Transport: rt},
		},
	}
	return idx, nil
}
// MediaType returns the registry-reported media type of the index manifest,
// defaulting to a Docker manifest list if nothing has been fetched yet.
func (r *remoteIndex) MediaType() (types.MediaType, error) {
	if r.mediaType != "" {
		return r.mediaType, nil
	}
	return types.DockerManifestList, nil
}
// Digest returns the SHA256 of this index's manifest, computed from the
// raw manifest bytes by the partial helpers.
func (r *remoteIndex) Digest() (v1.Hash, error) {
	return partial.Digest(r)
}
// RawManifest fetches and memoizes the raw index manifest bytes.
func (r *remoteIndex) RawManifest() ([]byte, error) {
	r.manifestLock.Lock()
	defer r.manifestLock.Unlock()

	// Serve the cached copy if we've already fetched it.
	if r.manifest != nil {
		return r.manifest, nil
	}

	raw, d, err := r.fetchManifest([]types.MediaType{
		types.DockerManifestList,
		types.OCIImageIndex,
	})
	if err != nil {
		return nil, err
	}

	r.mediaType = d.MediaType
	r.manifest = raw
	return r.manifest, nil
}
// IndexManifest parses the raw manifest bytes into a structured v1.IndexManifest.
func (r *remoteIndex) IndexManifest() (*v1.IndexManifest, error) {
	raw, err := r.RawManifest()
	if err != nil {
		return nil, err
	}
	return v1.ParseIndexManifest(bytes.NewReader(raw))
}
// Image resolves the child image with the given digest within this index.
func (r *remoteIndex) Image(h v1.Hash) (v1.Image, error) {
	childRef, err := name.ParseReference(fmt.Sprintf("%s@%s", r.Ref.Context(), h), name.StrictValidation)
	if err != nil {
		return nil, err
	}
	img, err := partial.CompressedToImage(&remoteImage{
		fetcher: fetcher{
			Ref:    childRef,
			Client: r.Client,
		},
	})
	if err != nil {
		return img, err
	}
	// Wrap the v1.Layers returned by this v1.Image in a hint for downstream
	// remote.Write calls to facilitate cross-repo "mounting".
	return &mountableImage{
		Image:     img,
		Reference: r.Ref,
	}, nil
}
// ImageIndex resolves the child index with the given digest within this index.
func (r *remoteIndex) ImageIndex(h v1.Hash) (v1.ImageIndex, error) {
	childRef, err := name.ParseReference(fmt.Sprintf("%s@%s", r.Ref.Context(), h), name.StrictValidation)
	if err != nil {
		return nil, err
	}
	child := &remoteIndex{
		fetcher: fetcher{
			Ref:    childRef,
			Client: r.Client,
		},
	}
	return child, nil
}
......@@ -25,12 +25,12 @@ import (
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
)
type Tags struct {
type tags struct {
Name string `json:"name"`
Tags []string `json:"tags"`
}
// TODO(jonjohnsonjr): return []name.Tag?
// List calls /tags/list for the given repository.
func List(repo name.Repository, auth authn.Authenticator, t http.RoundTripper) ([]string, error) {
scopes := []string{repo.Scope(transport.PullScope)}
tr, err := transport.New(repo.Registry, auth, t, scopes)
......@@ -51,14 +51,14 @@ func List(repo name.Repository, auth authn.Authenticator, t http.RoundTripper) (
}
defer resp.Body.Close()
if err := CheckError(resp, http.StatusOK); err != nil {
if err := transport.CheckError(resp, http.StatusOK); err != nil {
return nil, err
}
tags := Tags{}
if err := json.NewDecoder(resp.Body).Decode(&tags); err != nil {
parsed := tags{}
if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil {
return nil, err
}
return tags.Tags, nil
return parsed.Tags, nil
}
......@@ -16,7 +16,7 @@ package remote
import (
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
)
// MountableLayer wraps a v1.Layer in a shim that enables the layer to be
......
......@@ -19,6 +19,7 @@ import (
"net/http"
"github.com/google/go-containerregistry/pkg/authn"
v1 "github.com/google/go-containerregistry/pkg/v1"
)
// WithTransport is a functional option for overriding the default transport
......@@ -54,3 +55,10 @@ func WithAuthFromKeychain(keys authn.Keychain) ImageOption {
return nil
}
}
// WithPlatform is a functional option for overriding the default platform
// (linux/amd64) used to pick an image when the reference resolves to a
// manifest list / image index.
func WithPlatform(p v1.Platform) ImageOption {
	return func(i *imageOpener) error {
		i.platform = p
		return nil
	}
}
......@@ -39,7 +39,8 @@ func (bt *basicTransport) RoundTrip(in *http.Request) (*http.Response, error) {
// abstraction, so to avoid forwarding Authorization headers to places
// we are redirected, only set it when the authorization header matches
// the host with which we are interacting.
if in.Host == bt.target {
// In case of redirect http.Client can use an empty Host, check URL too.
if in.Host == bt.target || in.URL.Host == bt.target {
in.Header.Set("Authorization", hdr)
}
in.Header.Set("User-Agent", transportName)
......
......@@ -15,9 +15,8 @@
package transport
import (
"fmt"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
......@@ -40,28 +39,48 @@ type bearerTransport struct {
// See https://docs.docker.com/registry/spec/auth/token/
service string
scopes []string
// Scheme we should use, determined by ping response.
scheme string
}
var _ http.RoundTripper = (*bearerTransport)(nil)
// RoundTrip implements http.RoundTripper
func (bt *bearerTransport) RoundTrip(in *http.Request) (*http.Response, error) {
hdr, err := bt.bearer.Authorization()
sendRequest := func() (*http.Response, error) {
hdr, err := bt.bearer.Authorization()
if err != nil {
return nil, err
}
// http.Client handles redirects at a layer above the http.RoundTripper
// abstraction, so to avoid forwarding Authorization headers to places
// we are redirected, only set it when the authorization header matches
// the registry with which we are interacting.
// In case of redirect http.Client can use an empty Host, check URL too.
if in.Host == bt.registry.RegistryStr() || in.URL.Host == bt.registry.RegistryStr() {
in.Header.Set("Authorization", hdr)
}
in.Header.Set("User-Agent", transportName)
in.URL.Scheme = bt.scheme
return bt.inner.RoundTrip(in)
}
res, err := sendRequest()
if err != nil {
return nil, err
}
// http.Client handles redirects at a layer above the http.RoundTripper
// abstraction, so to avoid forwarding Authorization headers to places
// we are redirected, only set it when the authorization header matches
// the registry with which we are interacting.
if in.Host == bt.registry.RegistryStr() {
in.Header.Set("Authorization", hdr)
// Perform a token refresh() and retry the request in case the token has expired
if res.StatusCode == http.StatusUnauthorized {
if err = bt.refresh(); err != nil {
return nil, err
}
return sendRequest()
}
in.Header.Set("User-Agent", transportName)
// TODO(mattmoor): On 401s perform a single refresh() and retry.
return bt.inner.RoundTrip(in)
return res, err
}
func (bt *bearerTransport) refresh() error {
......@@ -87,6 +106,10 @@ func (bt *bearerTransport) refresh() error {
}
defer resp.Body.Close()
if err := CheckError(resp, http.StatusOK); err != nil {
return err
}
content, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
......
......@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
package transport
import (
"encoding/json"
......@@ -35,7 +35,7 @@ var _ error = (*Error)(nil)
func (e *Error) Error() string {
switch len(e.Errors) {
case 0:
return "<empty remote.Error response>"
return "<empty transport.Error response>"
case 1:
return e.Errors[0].String()
default:
......@@ -55,9 +55,13 @@ type Diagnostic struct {
Detail interface{} `json:"detail,omitempty"`
}
// String stringifies the Diagnostic in the form: $Code: $Message[; $Detail]
func (d Diagnostic) String() string {
	msg := fmt.Sprintf("%s: %s", d.Code, d.Message)
	// Only append the detail when the registry actually provided one.
	if d.Detail != nil {
		msg = fmt.Sprintf("%s; %v", msg, d.Detail)
	}
	return msg
}
// ErrorCode is an enumeration of supported error codes.
......@@ -83,6 +87,7 @@ const (
UnsupportedErrorCode ErrorCode = "UNSUPPORTED"
)
// CheckError returns a structured error if the response status is not in codes.
func CheckError(resp *http.Response, codes ...int) error {
for _, code := range codes {
if resp.StatusCode == code {
......
......@@ -36,6 +36,9 @@ type pingResp struct {
// Following the challenge there are often key/value pairs
// e.g. Bearer service="gcr.io",realm="https://auth.gcr.io/v36/tokenz"
parameters map[string]string
// The registry's scheme to use. Communicates whether we fell back to http.
scheme string
}
func (c challenge) Canonical() challenge {
......@@ -63,31 +66,50 @@ func parseChallenge(suffix string) map[string]string {
func ping(reg name.Registry, t http.RoundTripper) (*pingResp, error) {
client := http.Client{Transport: t}
url := fmt.Sprintf("%s://%s/v2/", reg.Scheme(), reg.Name())
resp, err := client.Get(url)
if err != nil {
return nil, err
// This first attempts to use "https" for every request, falling back to http
// if the registry matches our localhost heuristic or if it is intentionally
// set to insecure via name.NewInsecureRegistry.
schemes := []string{"https"}
if reg.Scheme() == "http" {
schemes = append(schemes, "http")
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
// If we get a 200, then no authentication is needed.
return &pingResp{challenge: anonymous}, nil
case http.StatusUnauthorized:
wac := resp.Header.Get(http.CanonicalHeaderKey("WWW-Authenticate"))
if parts := strings.SplitN(wac, " ", 2); len(parts) == 2 {
// If there are two parts, then parse the challenge parameters.
var connErr error
for _, scheme := range schemes {
url := fmt.Sprintf("%s://%s/v2/", scheme, reg.Name())
resp, err := client.Get(url)
if err != nil {
connErr = err
// Potentially retry with http.
continue
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
// If we get a 200, then no authentication is needed.
return &pingResp{
challenge: anonymous,
scheme: scheme,
}, nil
case http.StatusUnauthorized:
wac := resp.Header.Get(http.CanonicalHeaderKey("WWW-Authenticate"))
if parts := strings.SplitN(wac, " ", 2); len(parts) == 2 {
// If there are two parts, then parse the challenge parameters.
return &pingResp{
challenge: challenge(parts[0]).Canonical(),
parameters: parseChallenge(parts[1]),
scheme: scheme,
}, nil
}
// Otherwise, just return the challenge without parameters.
return &pingResp{
challenge: challenge(parts[0]).Canonical(),
parameters: parseChallenge(parts[1]),
challenge: challenge(wac).Canonical(),
scheme: scheme,
}, nil
default:
return nil, fmt.Errorf("unrecognized HTTP status: %v", resp.Status)
}
// Otherwise, just return the challenge without parameters.
return &pingResp{
challenge: challenge(wac).Canonical(),
}, nil
default:
return nil, fmt.Errorf("unrecognized HTTP status: %v", resp.Status)
}
return nil, connErr
}
......@@ -73,6 +73,7 @@ func New(reg name.Registry, auth authn.Authenticator, t http.RoundTripper, scope
registry: reg,
service: service,
scopes: scopes,
scheme: pr.scheme,
}
if err := bt.refresh(); err != nil {
return nil, err
......
......@@ -18,26 +18,29 @@ import (
"bytes"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
"github.com/google/go-containerregistry/pkg/v1/stream"
"github.com/google/go-containerregistry/pkg/v1/types"
"golang.org/x/sync/errgroup"
)
// WriteOptions are used to expose optional information to guide or
// control the image write.
type WriteOptions struct {
// TODO(mattmoor): Expose "threads" to limit parallelism?
type manifest interface {
RawManifest() ([]byte, error)
MediaType() (types.MediaType, error)
Digest() (v1.Hash, error)
}
// Write pushes the provided img to the specified image reference.
func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.RoundTripper,
wo WriteOptions) error {
func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.RoundTripper) error {
ls, err := img.Layers()
if err != nil {
return err
......@@ -49,50 +52,74 @@ func Write(ref name.Reference, img v1.Image, auth authn.Authenticator, t http.Ro
return err
}
w := writer{
ref: ref,
client: &http.Client{Transport: tr},
img: img,
options: wo,
}
ref: ref,
client: &http.Client{Transport: tr},
}
// Upload individual layers in goroutines and collect any errors.
// If we can dedupe by the layer digest, try to do so. If the layer is
// a stream.Layer, we can't dedupe and might re-upload.
var g errgroup.Group
uploaded := map[v1.Hash]bool{}
for _, l := range ls {
l := l
if _, ok := l.(*stream.Layer); !ok {
h, err := l.Digest()
if err != nil {
return err
}
// If we can determine the layer's digest ahead of
// time, use it to dedupe uploads.
if uploaded[h] {
continue // Already uploading.
}
uploaded[h] = true
}
bs, err := img.BlobSet()
if err != nil {
return err
g.Go(func() error {
return w.uploadOne(l)
})
}
// Spin up go routines to publish each of the members of BlobSet(),
// and use an error channel to collect their results.
errCh := make(chan error)
defer close(errCh)
for h := range bs {
go func(h v1.Hash) {
errCh <- w.uploadOne(h)
}(h)
}
if l, err := partial.ConfigLayer(img); err == stream.ErrNotComputed {
// We can't read the ConfigLayer, because of streaming layers, since the
// config hasn't been calculated yet.
if err := g.Wait(); err != nil {
return err
}
// Now wait for all of the blob uploads to complete.
var errors []error
for _ = range bs {
if err := <-errCh; err != nil {
errors = append(errors, err)
// Now that all the layers are uploaded, upload the config file blob.
l, err := partial.ConfigLayer(img)
if err != nil {
return err
}
if err := w.uploadOne(l); err != nil {
return err
}
} else if err != nil {
// This is an actual error, not a streaming error, just return it.
return err
} else {
// We *can* read the ConfigLayer, so upload it concurrently with the layers.
g.Go(func() error {
return w.uploadOne(l)
})
// Wait for the layers + config.
if err := g.Wait(); err != nil {
return err
}
}
if len(errors) > 0 {
// Return the first error we encountered.
return errors[0]
}
// With all of the constituent elements uploaded, upload the manifest
// to commit the image.
return w.commitImage()
return w.commitImage(img)
}
// writer writes the elements of an image to a remote image reference.
type writer struct {
ref name.Reference
client *http.Client
img v1.Image
options WriteOptions
ref name.Reference
client *http.Client
}
// url returns a url.Url for the specified path in the context of this remote image reference.
......@@ -120,11 +147,11 @@ func (w *writer) nextLocation(resp *http.Response) (string, error) {
return resp.Request.URL.ResolveReference(u).String(), nil
}
// checkExisting checks if a blob exists already in the repository by making a
// checkExistingBlob checks if a blob exists already in the repository by making a
// HEAD request to the blob store API. GCR performs an existence check on the
// initiation if "mount" is specified, even if no "from" sources are specified.
// However, this is not broadly applicable to all registries, e.g. ECR.
func (w *writer) checkExisting(h v1.Hash) (bool, error) {
func (w *writer) checkExistingBlob(h v1.Hash) (bool, error) {
u := w.url(fmt.Sprintf("/v2/%s/blobs/%s", w.ref.Context().RepositoryStr(), h.String()))
resp, err := w.client.Head(u.String())
......@@ -133,7 +160,31 @@ func (w *writer) checkExisting(h v1.Hash) (bool, error) {
}
defer resp.Body.Close()
if err := CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
return false, err
}
return resp.StatusCode == http.StatusOK, nil
}
// checkExistingManifest checks if a manifest exists already in the repository
// by making a HEAD request to the manifest API.
func (w *writer) checkExistingManifest(h v1.Hash, mt types.MediaType) (bool, error) {
u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.ref.Context().RepositoryStr(), h.String()))
req, err := http.NewRequest(http.MethodHead, u.String(), nil)
if err != nil {
return false, err
}
req.Header.Set("Accept", string(mt))
resp, err := w.client.Do(req)
if err != nil {
return false, err
}
defer resp.Body.Close()
if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
return false, err
}
......@@ -146,20 +197,13 @@ func (w *writer) checkExisting(h v1.Hash) (bool, error) {
// On success, the layer was either mounted (nothing more to do) or a blob
// upload was initiated and the body of that blob should be sent to the returned
// location.
func (w *writer) initiateUpload(h v1.Hash) (location string, mounted bool, err error) {
func (w *writer) initiateUpload(from, mount string) (location string, mounted bool, err error) {
u := w.url(fmt.Sprintf("/v2/%s/blobs/uploads/", w.ref.Context().RepositoryStr()))
uv := url.Values{
"mount": []string{h.String()},
}
l, err := w.img.LayerByDigest(h)
if err != nil {
return "", false, err
}
if ml, ok := l.(*MountableLayer); ok {
if w.ref.Context().RegistryStr() == ml.Reference.Context().RegistryStr() {
uv["from"] = []string{ml.Reference.Context().RepositoryStr()}
}
uv := url.Values{}
if mount != "" && from != "" {
// Quay will fail if we specify a "mount" without a "from".
uv["mount"] = []string{mount}
uv["from"] = []string{from}
}
u.RawQuery = uv.Encode()
......@@ -170,7 +214,7 @@ func (w *writer) initiateUpload(h v1.Hash) (location string, mounted bool, err e
}
defer resp.Body.Close()
if err := CheckError(resp, http.StatusCreated, http.StatusAccepted); err != nil {
if err := transport.CheckError(resp, http.StatusCreated, http.StatusAccepted); err != nil {
return "", false, err
}
......@@ -191,15 +235,7 @@ func (w *writer) initiateUpload(h v1.Hash) (location string, mounted bool, err e
// streamBlob streams the contents of the blob to the specified location.
// On failure, this will return an error. On success, this will return the location
// header indicating how to commit the streamed blob.
func (w *writer) streamBlob(h v1.Hash, streamLocation string) (commitLocation string, err error) {
l, err := w.img.LayerByDigest(h)
if err != nil {
return "", err
}
blob, err := l.Compressed()
if err != nil {
return "", err
}
func (w *writer) streamBlob(blob io.ReadCloser, streamLocation string) (commitLocation string, err error) {
defer blob.Close()
req, err := http.NewRequest(http.MethodPatch, streamLocation, blob)
......@@ -213,7 +249,7 @@ func (w *writer) streamBlob(h v1.Hash, streamLocation string) (commitLocation st
}
defer resp.Body.Close()
if err := CheckError(resp, http.StatusNoContent, http.StatusAccepted, http.StatusCreated); err != nil {
if err := transport.CheckError(resp, http.StatusNoContent, http.StatusAccepted, http.StatusCreated); err != nil {
return "", err
}
......@@ -222,14 +258,15 @@ func (w *writer) streamBlob(h v1.Hash, streamLocation string) (commitLocation st
return w.nextLocation(resp)
}
// commitBlob commits this blob by sending a PUT to the location returned from streaming the blob.
func (w *writer) commitBlob(h v1.Hash, location string) (err error) {
// commitBlob commits this blob by sending a PUT to the location returned from
// streaming the blob.
func (w *writer) commitBlob(location, digest string) error {
u, err := url.Parse(location)
if err != nil {
return err
}
v := u.Query()
v.Set("digest", h.String())
v.Set("digest", digest)
u.RawQuery = v.Encode()
req, err := http.NewRequest(http.MethodPut, u.String(), nil)
......@@ -243,47 +280,82 @@ func (w *writer) commitBlob(h v1.Hash, location string) (err error) {
}
defer resp.Body.Close()
return CheckError(resp, http.StatusCreated)
return transport.CheckError(resp, http.StatusCreated)
}
// uploadOne performs a complete upload of a single layer.
func (w *writer) uploadOne(h v1.Hash) error {
existing, err := w.checkExisting(h)
if err != nil {
return err
func (w *writer) uploadOne(l v1.Layer) error {
var from, mount, digest string
if _, ok := l.(*stream.Layer); !ok {
// Layer isn't streamable, we should take advantage of that to
// skip uploading if possible.
// By sending ?digest= in the request, we'll also check that
// our computed digest matches the one computed by the
// registry.
h, err := l.Digest()
if err != nil {
return err
}
digest = h.String()
existing, err := w.checkExistingBlob(h)
if err != nil {
return err
}
if existing {
log.Printf("existing blob: %v", h)
return nil
}
mount = h.String()
}
if existing {
log.Printf("existing blob: %v", h)
return nil
if ml, ok := l.(*MountableLayer); ok {
if w.ref.Context().RegistryStr() == ml.Reference.Context().RegistryStr() {
from = ml.Reference.Context().RepositoryStr()
}
}
location, mounted, err := w.initiateUpload(h)
location, mounted, err := w.initiateUpload(from, mount)
if err != nil {
return err
} else if mounted {
log.Printf("mounted blob: %v", h)
h, err := l.Digest()
if err != nil {
return err
}
log.Printf("mounted blob: %s", h.String())
return nil
}
location, err = w.streamBlob(h, location)
blob, err := l.Compressed()
if err != nil {
return err
}
location, err = w.streamBlob(blob, location)
if err != nil {
return err
}
if err := w.commitBlob(h, location); err != nil {
h, err := l.Digest()
if err != nil {
return err
}
log.Printf("pushed blob %v", h)
digest = h.String()
if err := w.commitBlob(location, digest); err != nil {
return err
}
log.Printf("pushed blob: %s", digest)
return nil
}
// commitImage does a PUT of the image's manifest.
func (w *writer) commitImage() error {
raw, err := w.img.RawManifest()
func (w *writer) commitImage(man manifest) error {
raw, err := man.RawManifest()
if err != nil {
return err
}
mt, err := w.img.MediaType()
mt, err := man.MediaType()
if err != nil {
return err
}
......@@ -303,11 +375,11 @@ func (w *writer) commitImage() error {
}
defer resp.Body.Close()
if err := CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil {
if err := transport.CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil {
return err
}
digest, err := w.img.Digest()
digest, err := man.Digest()
if err != nil {
return err
}
......@@ -334,11 +406,68 @@ func scopesForUploadingImage(ref name.Reference, layers []v1.Layer) []string {
// Push scope should be the first element because a few registries just look at the first scope to determine access.
scopes = append(scopes, ref.Scope(transport.PushScope))
for scope, _ := range scopeSet {
for scope := range scopeSet {
scopes = append(scopes, scope)
}
return scopes
}
// WriteIndex pushes the provided ImageIndex to the specified image reference.
// WriteIndex will attempt to push all of the referenced manifests before
// attempting to push the ImageIndex, to retain referential integrity.
func WriteIndex(ref name.Reference, ii v1.ImageIndex, auth authn.Authenticator, t http.RoundTripper) error {
	index, err := ii.IndexManifest()
	if err != nil {
		return err
	}

	// One push scope for the target repository is sufficient; child
	// manifests are pushed to the same repository (by digest).
	scopes := []string{ref.Scope(transport.PushScope)}
	tr, err := transport.New(ref.Context().Registry, auth, t, scopes)
	if err != nil {
		return err
	}
	w := writer{
		ref:    ref,
		client: &http.Client{Transport: tr},
	}

	for _, desc := range index.Manifests {
		// Shadow the outer ref: each child is addressed by digest
		// within the same repository as the index being pushed.
		ref, err := name.ParseReference(fmt.Sprintf("%s@%s", ref.Context(), desc.Digest), name.StrictValidation)
		if err != nil {
			return err
		}
		exists, err := w.checkExistingManifest(desc.Digest, desc.MediaType)
		if err != nil {
			return err
		}
		if exists {
			// Already present in the registry; skip the push.
			log.Printf("existing manifest: %v", desc.Digest)
			continue
		}

		switch desc.MediaType {
		case types.OCIImageIndex, types.DockerManifestList:
			// Nested index: recurse so its children land first.
			ii, err := ii.ImageIndex(desc.Digest)
			if err != nil {
				return err
			}
			if err := WriteIndex(ref, ii, auth, t); err != nil {
				return err
			}
		case types.OCIManifestSchema1, types.DockerManifestSchema2:
			img, err := ii.Image(desc.Digest)
			if err != nil {
				return err
			}
			if err := Write(ref, img, auth, t); err != nil {
				return err
			}
		}
		// NOTE(review): other media types fall through silently here —
		// presumably intentional, but worth confirming upstream.
	}

	// With all of the constituent elements uploaded, upload the manifest
	// to commit the image.
	return w.commitImage(ii)
}
// Copyright 2018 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stream
import (
"compress/gzip"
"crypto/sha256"
"encoding/hex"
"errors"
"hash"
"io"
"sync"
v1 "github.com/google/go-containerregistry/pkg/v1"
)
var (
	// ErrNotComputed is returned when the requested value is not yet
	// computed because the stream has not been consumed yet.
	ErrNotComputed = errors.New("value not computed until stream is consumed")

	// ErrConsumed is returned by Compressed when the underlying stream has
	// already been consumed and closed.
	ErrConsumed = errors.New("stream was already consumed")
)

// Layer is a streaming implementation of v1.Layer.
type Layer struct {
	// blob is the raw uncompressed stream; it can only be read once.
	blob io.ReadCloser
	// consumed is set by compressedReader.Close once blob has been
	// fully read and the hashes/size below are valid.
	consumed bool

	// mu guards the fields below, which are populated lazily by
	// compressedReader.Close after the stream has been consumed.
	mu             sync.Mutex
	digest, diffID *v1.Hash
	size           int64
}

// Compile-time check that Layer satisfies v1.Layer.
var _ v1.Layer = (*Layer)(nil)

// NewLayer creates a Layer from an io.ReadCloser.
func NewLayer(rc io.ReadCloser) *Layer { return &Layer{blob: rc} }
// Digest implements v1.Layer. It returns ErrNotComputed until the
// underlying stream has been fully consumed and closed.
func (l *Layer) Digest() (v1.Hash, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.digest != nil {
		return *l.digest, nil
	}
	return v1.Hash{}, ErrNotComputed
}
// DiffID implements v1.Layer. It returns ErrNotComputed until the
// underlying stream has been fully consumed and closed.
func (l *Layer) DiffID() (v1.Hash, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.diffID != nil {
		return *l.diffID, nil
	}
	return v1.Hash{}, ErrNotComputed
}
// Size implements v1.Layer. It returns ErrNotComputed until the
// underlying stream has been fully consumed and closed. A zero size
// is treated as "not yet computed" (a gzip stream is never empty).
func (l *Layer) Size() (int64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.size != 0 {
		return l.size, nil
	}
	return 0, ErrNotComputed
}
// Uncompressed implements v1.Layer. Streaming layers only expose their
// compressed form, so this is intentionally unimplemented.
func (l *Layer) Uncompressed() (io.ReadCloser, error) {
	return nil, errors.New("NYI: stream.Layer.Uncompressed is not implemented")
}
// Compressed implements v1.Layer. It returns ErrConsumed once the
// underlying stream has been consumed; otherwise it returns a reader
// that gzips the stream on the fly while recording its digest, diffID
// and size (see compressedReader.Close).
func (l *Layer) Compressed() (io.ReadCloser, error) {
	// consumed is written by compressedReader.Close under l.mu, and
	// Close runs on the compression goroutine started in
	// newCompressedReader — so read it under the lock to avoid a
	// data race.
	l.mu.Lock()
	consumed := l.consumed
	l.mu.Unlock()
	if consumed {
		return nil, ErrConsumed
	}
	return newCompressedReader(l)
}
// compressedReader compresses and hashes a stream.Layer's blob as it is
// read, and finalizes the layer's digest/diffID/size when closed.
type compressedReader struct {
	closer io.Closer // original blob's Closer.

	h, zh hash.Hash // digests of the uncompressed (h) and compressed (zh) stream.
	pr    io.Reader // read side of the pipe carrying the compressed bytes.
	count *countWriter // tallies the compressed size.

	l *Layer // stream.Layer to update upon Close.
}
// newCompressedReader starts compressing l.blob in a background
// goroutine and returns a reader over the compressed bytes. The raw
// stream is hashed (for diffID) as it is copied into the gzip writer,
// whose output is simultaneously piped out, hashed (for digest), and
// counted (for size).
func newCompressedReader(l *Layer) (*compressedReader, error) {
	h := sha256.New()  // hashes the raw, uncompressed stream.
	zh := sha256.New() // hashes the gzip-compressed stream.
	count := &countWriter{}

	// gzip.Writer writes to the output stream via pipe, a hasher to
	// capture compressed digest, and a countWriter to capture compressed
	// size.
	pr, pw := io.Pipe()
	zw, err := gzip.NewWriterLevel(io.MultiWriter(pw, zh, count), gzip.BestSpeed)
	if err != nil {
		return nil, err
	}

	cr := &compressedReader{
		// Closing cr closes the gzip writer first (flushing its
		// footer), then the original blob.
		closer: newMultiCloser(zw, l.blob),
		pr:     pr,
		h:      h,
		zh:     zh,
		count:  count,
		l:      l,
	}
	go func() {
		// Tee the raw blob into the uncompressed hasher and the gzip
		// writer; compressed output flows to pw/zh/count above.
		if _, err := io.Copy(io.MultiWriter(h, zw), l.blob); err != nil {
			pw.CloseWithError(err)
			return
		}
		// Now close the compressed reader, to flush the gzip stream
		// and calculate digest/diffID/size. This will cause pr to
		// return EOF which will cause readers of the Compressed stream
		// to finish reading.
		pw.CloseWithError(cr.Close())
	}()

	return cr, nil
}
func (cr *compressedReader) Read(b []byte) (int, error) { return cr.pr.Read(b) }
// Close flushes and closes the gzip writer and the original blob, then
// records the computed diffID, digest and compressed size on the parent
// Layer and marks it consumed. It runs under the Layer's mutex.
func (cr *compressedReader) Close() error {
	cr.l.mu.Lock()
	defer cr.l.mu.Unlock()

	// Close the inner ReadCloser.
	if err := cr.closer.Close(); err != nil {
		return err
	}

	// cr.h hashed the raw stream, so its sum is the diffID.
	diffID, err := v1.NewHash("sha256:" + hex.EncodeToString(cr.h.Sum(nil)))
	if err != nil {
		return err
	}
	cr.l.diffID = &diffID

	// cr.zh hashed the gzip output, so its sum is the digest.
	digest, err := v1.NewHash("sha256:" + hex.EncodeToString(cr.zh.Sum(nil)))
	if err != nil {
		return err
	}
	cr.l.digest = &digest

	cr.l.size = cr.count.n
	cr.l.consumed = true
	return nil
}
// countWriter counts the cumulative number of bytes written to it.
type countWriter struct{ n int64 }

// Write implements io.Writer. It never fails; it simply records how
// many bytes pass through.
func (c *countWriter) Write(p []byte) (int, error) {
	size := len(p)
	c.n += int64(size)
	return size, nil
}
// multiCloser is a Closer that collects multiple Closers and Closes them in order.
type multiCloser []io.Closer

// Compile-time check that multiCloser satisfies io.Closer.
var _ io.Closer = (multiCloser)(nil)

// newMultiCloser bundles the given Closers into a single multiCloser.
func newMultiCloser(c ...io.Closer) multiCloser { return multiCloser(c) }

// Close closes each collected Closer in order. It stops at (and
// returns) the first error, leaving any later Closers unclosed.
func (m multiCloser) Close() error {
	for _, c := range m {
		if err := c.Close(); err != nil {
			return err
		}
	}
	return nil
}
......@@ -26,7 +26,7 @@ import (
"sync"
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/types"
"github.com/google/go-containerregistry/pkg/v1/v1util"
......@@ -54,6 +54,7 @@ type compressedImage struct {
var _ partial.UncompressedImageCore = (*uncompressedImage)(nil)
var _ partial.CompressedImageCore = (*compressedImage)(nil)
// Opener is a thunk for opening a tar file.
type Opener func() (io.ReadCloser, error)
func pathOpener(path string) Opener {
......@@ -62,6 +63,7 @@ func pathOpener(path string) Opener {
}
}
// ImageFromPath returns a v1.Image from a tarball located on path.
func ImageFromPath(path string, tag *name.Tag) (v1.Image, error) {
return Image(pathOpener(path), tag)
}
......
......@@ -20,7 +20,7 @@ import (
"io/ioutil"
"os"
"github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/v1util"
)
......
......@@ -23,94 +23,134 @@ import (
"os"
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
)
// WriteOptions are used to expose optional information to guide or
// control the image write.
type WriteOptions struct {
// TODO(mattmoor): Whether to store things compressed?
}
// WriteToFile writes in the compressed format to a tarball, on disk.
// This is just syntactic sugar wrapping tarball.Write with a new file.
func WriteToFile(p string, tag name.Tag, img v1.Image, wo *WriteOptions) error {
func WriteToFile(p string, ref name.Reference, img v1.Image) error {
w, err := os.Create(p)
if err != nil {
return err
}
defer w.Close()
return Write(ref, img, w)
}
// MultiWriteToFile writes in the compressed format to a tarball, on disk.
// This is just syntactic sugar wrapping tarball.MultiWrite with a new file.
func MultiWriteToFile(p string, tagToImage map[name.Tag]v1.Image) error {
	// Re-key by the Reference interface so the generic writer can
	// accept both tags and digests.
	refToImage := make(map[name.Reference]v1.Image, len(tagToImage))
	for tag, img := range tagToImage {
		refToImage[tag] = img
	}
	return MultiRefWriteToFile(p, refToImage)
}
// MultiRefWriteToFile writes in the compressed format to a tarball, on disk.
// This is just syntactic sugar wrapping tarball.MultiRefWrite with a new file.
//
// NOTE(review): a stray pre-refactor line (`return Write(tag, img, wo, w)`,
// referencing identifiers removed by the refactor) was left in this span
// by the diff and has been removed.
func MultiRefWriteToFile(p string, refToImage map[name.Reference]v1.Image) error {
	// Create (or truncate) the destination file.
	w, err := os.Create(p)
	if err != nil {
		return err
	}
	defer w.Close()

	return MultiRefWrite(refToImage, w)
}
// Write is a wrapper to write a single image and tag to a tarball.
// (The stale doc comment from the removed multi-argument signature
// has been dropped.)
func Write(ref name.Reference, img v1.Image, w io.Writer) error {
	return MultiRefWrite(map[name.Reference]v1.Image{ref: img}, w)
}
// MultiWrite writes the contents of each image to the provided reader, in the compressed format.
// The contents are written in the following format:
// One manifest.json file at the top level containing information about several images.
// One file for each layer, named after the layer's SHA.
// One file for the config blob, named after its SHA.
func Write(tag name.Tag, img v1.Image, wo *WriteOptions, w io.Writer) error {
func MultiWrite(tagToImage map[name.Tag]v1.Image, w io.Writer) error {
var refToImage map[name.Reference]v1.Image = make(map[name.Reference]v1.Image, len(tagToImage))
for i, d := range tagToImage {
refToImage[i] = d
}
return MultiRefWrite(refToImage, w)
}
// MultiRefWrite writes the contents of each image to the provided reader, in the compressed format.
// The contents are written in the following format:
// One manifest.json file at the top level containing information about several images.
// One file for each layer, named after the layer's SHA.
// One file for the config blob, named after its SHA.
func MultiRefWrite(refToImage map[name.Reference]v1.Image, w io.Writer) error {
tf := tar.NewWriter(w)
defer tf.Close()
// Write the config.
cfgName, err := img.ConfigName()
if err != nil {
return err
}
cfgBlob, err := img.RawConfigFile()
if err != nil {
return err
}
if err := writeTarEntry(tf, cfgName.String(), bytes.NewReader(cfgBlob), int64(len(cfgBlob))); err != nil {
return err
}
imageToTags := dedupRefToImage(refToImage)
var td tarDescriptor
// Write the layers.
layers, err := img.Layers()
if err != nil {
return err
}
layerFiles := make([]string, len(layers))
for i, l := range layers {
d, err := l.Digest()
for img, tags := range imageToTags {
// Write the config.
cfgName, err := img.ConfigName()
if err != nil {
return err
}
// Munge the file name to appease ancient technology.
//
// tar assumes anything with a colon is a remote tape drive:
// https://www.gnu.org/software/tar/manual/html_section/tar_45.html
// Drop the algorithm prefix, e.g. "sha256:"
hex := d.Hex
// gunzip expects certain file extensions:
// https://www.gnu.org/software/gzip/manual/html_node/Overview.html
layerFiles[i] = fmt.Sprintf("%s.tar.gz", hex)
r, err := l.Compressed()
cfgBlob, err := img.RawConfigFile()
if err != nil {
return err
}
blobSize, err := l.Size()
if err != nil {
if err := writeTarEntry(tf, cfgName.String(), bytes.NewReader(cfgBlob), int64(len(cfgBlob))); err != nil {
return err
}
if err := writeTarEntry(tf, layerFiles[i], r, blobSize); err != nil {
// Write the layers.
layers, err := img.Layers()
if err != nil {
return err
}
}
layerFiles := make([]string, len(layers))
for i, l := range layers {
d, err := l.Digest()
if err != nil {
return err
}
// Munge the file name to appease ancient technology.
//
// tar assumes anything with a colon is a remote tape drive:
// https://www.gnu.org/software/tar/manual/html_section/tar_45.html
// Drop the algorithm prefix, e.g. "sha256:"
hex := d.Hex
// Generate the tar descriptor and write it.
td := tarDescriptor{
singleImageTarDescriptor{
// gunzip expects certain file extensions:
// https://www.gnu.org/software/gzip/manual/html_node/Overview.html
layerFiles[i] = fmt.Sprintf("%s.tar.gz", hex)
r, err := l.Compressed()
if err != nil {
return err
}
blobSize, err := l.Size()
if err != nil {
return err
}
if err := writeTarEntry(tf, layerFiles[i], r, blobSize); err != nil {
return err
}
}
// Generate the tar descriptor and write it.
sitd := singleImageTarDescriptor{
Config: cfgName.String(),
RepoTags: []string{tag.String()},
RepoTags: tags,
Layers: layerFiles,
},
}
td = append(td, sitd)
}
tdBytes, err := json.Marshal(td)
if err != nil {
return err
......@@ -118,6 +158,26 @@ func Write(tag name.Tag, img v1.Image, wo *WriteOptions, w io.Writer) error {
return writeTarEntry(tf, "manifest.json", bytes.NewReader(tdBytes), int64(len(tdBytes)))
}
// dedupRefToImage inverts the reference→image mapping into
// image→tag-strings. Images referenced only by digest are kept with a
// nil tag slice so they still appear exactly once in the result.
func dedupRefToImage(refToImage map[name.Reference]v1.Image) map[v1.Image][]string {
	imageToTags := make(map[v1.Image][]string)
	for ref, img := range refToImage {
		tag, isTag := ref.(name.Tag)
		if !isTag {
			// Digest-only reference: record the image with no tags,
			// unless some other reference already recorded it.
			if _, seen := imageToTags[img]; !seen {
				imageToTags[img] = nil
			}
			continue
		}
		if existing := imageToTags[img]; existing != nil {
			imageToTags[img] = append(existing, tag.String())
		} else {
			// Either the image is new, or it was only seen by digest
			// so far (nil slice) — start a fresh tag list.
			imageToTags[img] = []string{tag.String()}
		}
	}
	return imageToTags
}
// write a file to the provided writer with a corresponding tar header
func writeTarEntry(tf *tar.Writer, path string, r io.Reader, size int64) error {
hdr := &tar.Header{
......
// Copyright 2018 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v1util
import (
"io"
)
// nop is a do-nothing Close implementation shared by the Nop*Closer
// helpers below.
func nop() error {
	return nil
}

// NopWriteCloser wraps the io.Writer as an io.WriteCloser with a Close() method that does nothing.
func NopWriteCloser(w io.Writer) io.WriteCloser {
	return &writeAndCloser{
		Writer:    w,
		CloseFunc: nop,
	}
}

// NopReadCloser wraps the io.Reader as an io.ReadCloser with a Close() method that does nothing.
// This is technically redundant with ioutil.NopCloser, but provided for symmetry and clarity.
func NopReadCloser(r io.Reader) io.ReadCloser {
	return &readAndCloser{
		Reader:    r,
		CloseFunc: nop,
	}
}
......@@ -20,7 +20,7 @@ import (
"hash"
"io"
"github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
)
type verifyReader struct {
......
......@@ -70,56 +70,14 @@ func GunzipReadCloser(r io.ReadCloser) (io.ReadCloser, error) {
}, nil
}
// GzipWriteCloser returns an io.WriteCloser to which uncompressed data may be
// written, and the compressed data is then written to the provided
// io.WriteCloser.
func GzipWriteCloser(w io.WriteCloser) io.WriteCloser {
gw := gzip.NewWriter(w)
return &writeAndCloser{
Writer: gw,
CloseFunc: func() error {
if err := gw.Close(); err != nil {
return err
}
return w.Close()
},
}
}
// gunzipWriteCloser implements io.WriteCloser
// It is used to implement GunzipWriteClose.
type gunzipWriteCloser struct {
*bytes.Buffer
writer io.WriteCloser
}
// Close implements io.WriteCloser
func (gwc *gunzipWriteCloser) Close() error {
// TODO(mattmoor): How to avoid buffering this whole thing into memory?
gr, err := gzip.NewReader(gwc.Buffer)
if err != nil {
return err
}
if _, err := io.Copy(gwc.writer, gr); err != nil {
return err
}
return gwc.writer.Close()
}
// GunzipWriteCloser returns an io.WriteCloser to which compressed data may be
// written, and the uncompressed data is then written to the provided
// io.WriteCloser.
func GunzipWriteCloser(w io.WriteCloser) (io.WriteCloser, error) {
return &gunzipWriteCloser{
Buffer: bytes.NewBuffer(nil),
writer: w,
}, nil
}
// IsGzipped detects whether the input stream is compressed.
func IsGzipped(r io.Reader) (bool, error) {
magicHeader := make([]byte, 2)
if _, err := r.Read(magicHeader); err != nil {
n, err := r.Read(magicHeader)
if n == 0 && err == io.EOF {
return false, nil
}
if err != nil {
return false, err
}
return bytes.Equal(magicHeader, gzipMagicHeader), nil
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册