plugin/blobstore.go
3d86b0c7
 package plugin
 
 import (
7a855799
 	"fmt"
3d86b0c7
 	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
 
 	"github.com/docker/docker/distribution/xfer"
 	"github.com/docker/docker/image"
 	"github.com/docker/docker/layer"
 	"github.com/docker/docker/pkg/archive"
6f3f907c
 	"github.com/docker/docker/pkg/chrootarchive"
3d86b0c7
 	"github.com/docker/docker/pkg/progress"
7a855799
 	"github.com/opencontainers/go-digest"
3d86b0c7
 	"github.com/pkg/errors"
1009e6a4
 	"github.com/sirupsen/logrus"
3d86b0c7
 	"golang.org/x/net/context"
 )
 
 // blobstore is the storage abstraction used by downloadManager for
 // content-addressed blobs keyed by digest.
 type blobstore interface {
 	// New starts a new blob insertion; the returned writer must be committed
 	// to obtain the digest of the written content.
 	New() (WriteCommitCloser, error)
 	// Get returns a reader for the blob stored under dgst.
 	Get(dgst digest.Digest) (io.ReadCloser, error)
 	// Size returns the size in bytes of the blob stored under dgst.
 	Size(dgst digest.Digest) (int64, error)
 }
 
 // basicBlobStore is a filesystem-backed blobstore. Blobs are stored at
 // <path>/<algorithm>/<hex> and in-progress insertions are kept in <path>/tmp.
 type basicBlobStore struct {
 	// path is the root directory of the store.
 	path string
 }
 
 // newBasicBlobStore returns a blob store rooted at p, creating p and its
 // "tmp" subdirectory (used for in-progress insertions) if they do not exist.
 // It returns an error when the directories cannot be created.
 func newBasicBlobStore(p string) (*basicBlobStore, error) {
 	tmpdir := filepath.Join(p, "tmp")
 	if err := os.MkdirAll(tmpdir, 0700); err != nil {
 		// Report the directory that was actually being created.
 		return nil, errors.Wrapf(err, "failed to mkdir %v", tmpdir)
 	}
 	return &basicBlobStore{path: p}, nil
 }
 
 // New creates a temporary file under the store's "tmp" directory and wraps
 // it in an insertion that can later be committed into the store.
 func (b *basicBlobStore) New() (WriteCommitCloser, error) {
 	stagingDir := filepath.Join(b.path, "tmp")
 	tmpFile, err := ioutil.TempFile(stagingDir, ".insertion")
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create temp file")
 	}
 	ins := newInsertion(tmpFile)
 	return ins, nil
 }
 
 // Get opens the blob stored at <path>/<algorithm>/<hex> for reading. The
 // caller is responsible for closing the returned reader.
 func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) {
 	f, err := os.Open(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex()))
 	if err != nil {
 		// Return a literal nil: passing the nil *os.File through would yield a
 		// non-nil io.ReadCloser wrapping a nil pointer.
 		return nil, err
 	}
 	return f, nil
 }
 
 // Size stats the blob file stored at <path>/<algorithm>/<hex> and returns
 // its length in bytes, or the stat error.
 func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) {
 	blobPath := filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex())
 	info, statErr := os.Stat(blobPath)
 	if statErr != nil {
 		return 0, statErr
 	}
 	size := info.Size()
 	return size, nil
 }
 
 // gc removes every blob under the canonical-algorithm directory whose digest
 // is not present in whitelist. It is best-effort: an unreadable directory is
 // skipped, and the outcome of each removal is only logged at debug level.
 func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) {
 	algorithms := []string{string(digest.Canonical)}
 	for _, alg := range algorithms {
 		entries, err := ioutil.ReadDir(filepath.Join(b.path, alg))
 		if err != nil {
 			continue
 		}
 		for _, entry := range entries {
 			name := entry.Name()
 			if _, keep := whitelist[digest.Digest(alg+":"+name)]; keep {
 				continue
 			}
 			blobPath := filepath.Join(b.path, alg, name)
 			rmErr := os.RemoveAll(blobPath)
 			logrus.Debugf("cleaned up blob %v: %v", blobPath, rmErr)
 		}
 	}
 }
 
 // WriteCommitCloser defines an object that can be written to and then
 // committed to a blobstore.
 type WriteCommitCloser interface {
 	io.WriteCloser
 	// Commit finalizes the written content and returns its digest.
 	Commit() (digest.Digest, error)
 }
 
 // insertion is an in-progress blob write backed by a temporary file. It is
 // the WriteCommitCloser returned by basicBlobStore.New.
 type insertion struct {
 	// Writer fans every write out to both the temp file and the digester's hash.
 	io.Writer
 	// f is the temporary file receiving the blob content.
 	f *os.File
 	// digester accumulates the digest of everything written.
 	digester digest.Digester
 	// closed is set once Commit has closed f, making a later Close a no-op.
 	closed bool
 }
 
 func newInsertion(tempFile *os.File) *insertion {
7a855799
 	digester := digest.Canonical.Digester()
3d86b0c7
 	return &insertion{f: tempFile, digester: digester, Writer: io.MultiWriter(tempFile, digester.Hash())}
 }
 
 // Commit flushes and closes the temporary file, then moves it to
 // <root>/<algorithm>/<hex>, where <root> is two directory levels above the
 // temp file. It returns the digest of the written content. On any failure
 // the temp file is removed and an error is returned.
 func (i *insertion) Commit() (digest.Digest, error) {
 	p := i.f.Name()
 	// The temp file lives in <root>/tmp, so the store root is two levels up.
 	d := filepath.Join(p, "..", "..")
 	// Runs on every exit path; after a successful rename p no longer exists
 	// and the removal is a no-op.
 	defer os.RemoveAll(p)
 	if err := i.f.Sync(); err != nil {
 		// Do not publish a blob whose content may not have reached disk. The
 		// close error is secondary to the sync failure and is dropped.
 		i.f.Close()
 		i.closed = true
 		return "", errors.Wrapf(err, "failed to sync %v", p)
 	}
 	if err := i.f.Close(); err != nil {
 		return "", err
 	}
 	i.closed = true
 	dgst := i.digester.Digest()
 	if err := os.MkdirAll(filepath.Join(d, string(dgst.Algorithm())), 0700); err != nil {
 		return "", errors.Wrapf(err, "failed to mkdir %v", d)
 	}
 	if err := os.Rename(p, filepath.Join(d, string(dgst.Algorithm()), dgst.Hex())); err != nil {
 		return "", errors.Wrapf(err, "failed to rename %v", p)
 	}
 	return dgst, nil
 }
 
 // Close abandons an insertion that has not been committed: it closes the
 // temp file and removes it. After Commit has closed the file it does nothing.
 func (i *insertion) Close() error {
 	if !i.closed {
 		defer os.RemoveAll(i.f.Name())
 		return i.f.Close()
 	}
 	return nil
 }
 
 // downloadManager downloads layers into a blobstore while unpacking them
 // into tmpDir, and records the digests of everything it stored.
 type downloadManager struct {
 	// blobStore receives the downloaded layer blobs and the config blob.
 	blobStore blobstore
 	// tmpDir is the directory each downloaded layer is applied to.
 	tmpDir string
 	// blobs holds the digest of every layer blob committed by Download.
 	blobs []digest.Digest
 	// configDigest is the digest of the blob most recently stored by Put.
 	configDigest digest.Digest
 }
 
ce8e529e
 func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, os string, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
3d86b0c7
 	for _, l := range layers {
 		b, err := dm.blobStore.New()
 		if err != nil {
 			return initialRootFS, nil, err
 		}
 		defer b.Close()
 		rc, _, err := l.Download(ctx, progressOutput)
 		if err != nil {
 			return initialRootFS, nil, errors.Wrap(err, "failed to download")
 		}
 		defer rc.Close()
 		r := io.TeeReader(rc, b)
 		inflatedLayerData, err := archive.DecompressStream(r)
 		if err != nil {
 			return initialRootFS, nil, err
 		}
7a855799
 		digester := digest.Canonical.Digester()
6f3f907c
 		if _, err := chrootarchive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil {
3d86b0c7
 			return initialRootFS, nil, err
 		}
 		initialRootFS.Append(layer.DiffID(digester.Digest()))
 		d, err := b.Commit()
 		if err != nil {
 			return initialRootFS, nil, err
 		}
 		dm.blobs = append(dm.blobs, d)
 	}
 	return initialRootFS, nil, nil
 }
 
 // Put stores dt as a new blob in the blob store and returns its digest.
 // On success the digest is also remembered in dm.configDigest; on failure
 // dm.configDigest is left unchanged and an empty digest is returned.
 func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) {
 	b, err := dm.blobStore.New()
 	if err != nil {
 		return "", err
 	}
 	// Cleans up the temp file if we bail out before a successful Commit.
 	defer b.Close()
 	n, err := b.Write(dt)
 	if err != nil {
 		return "", err
 	}
 	if n != len(dt) {
 		return "", io.ErrShortWrite
 	}
 	d, err := b.Commit()
 	if err != nil {
 		// Keep any previously recorded config digest instead of clobbering it.
 		return "", err
 	}
 	dm.configDigest = d
 	return d, nil
 }
 
 func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) {
7a855799
 	return nil, fmt.Errorf("digest not found")
3d86b0c7
 }
ce8e529e
 func (dm *downloadManager) RootFSAndOSFromConfig(c []byte) (*image.RootFS, string, error) {
3d86b0c7
 	return configToRootFS(c)
 }