500e77ba |
package layer
import (
"errors"
"fmt"
"io"
"io/ioutil"
"sync"
|
05bd0435 |
"github.com/docker/distribution" |
500e77ba |
"github.com/docker/docker/daemon/graphdriver" |
f5916b10 |
"github.com/docker/docker/pkg/idtools" |
a98be034 |
"github.com/docker/docker/pkg/plugingetter" |
500e77ba |
"github.com/docker/docker/pkg/stringid" |
87abf34a |
"github.com/docker/docker/pkg/system" |
7a855799 |
"github.com/opencontainers/go-digest" |
1009e6a4 |
"github.com/sirupsen/logrus" |
500e77ba |
"github.com/vbatts/tar-split/tar/asm"
"github.com/vbatts/tar-split/tar/storage"
)
// maxLayerDepth is the maximum number of layers which can be chained
// together. Some graphdrivers allow at most 127 layers, and creating a
// rwlayer uses 2 additional layers, so 127 - 2 = 125 was chosen as the
// limit for a read-only chain.
const maxLayerDepth = 125
type layerStore struct { |
ce8e529e |
store MetadataStore |
afd305c4 |
driver graphdriver.Driver
useTarSplit bool |
500e77ba |
layerMap map[ChainID]*roLayer
layerL sync.Mutex
mounts map[string]*mountedLayer
mountL sync.Mutex |
afd305c4 |
os string |
500e77ba |
}
|
f5916b10 |
// StoreOptions are the options used to create a new Store instance
type StoreOptions struct { |
ce8e529e |
Root string |
f5916b10 |
MetadataStorePathTemplate string |
afd305c4 |
GraphDriver string |
f5916b10 |
GraphDriverOptions []string |
09cd96c5 |
IDMappings *idtools.IDMappings |
a98be034 |
PluginGetter plugingetter.PluginGetter |
677fa036 |
ExperimentalEnabled bool |
afd305c4 |
OS string |
f5916b10 |
}
// NewStoreFromOptions creates a new Store instance
func NewStoreFromOptions(options StoreOptions) (Store, error) { |
afd305c4 |
driver, err := graphdriver.New(options.GraphDriver, options.PluginGetter, graphdriver.Options{
Root: options.Root,
DriverOptions: options.GraphDriverOptions,
UIDMaps: options.IDMappings.UIDs(),
GIDMaps: options.IDMappings.GIDs(),
ExperimentalEnabled: options.ExperimentalEnabled,
})
if err != nil {
return nil, fmt.Errorf("error initializing graphdriver: %v", err) |
f5916b10 |
} |
afd305c4 |
logrus.Debugf("Initialized graph driver %s", driver) |
f5916b10 |
|
afd305c4 |
fms, err := NewFSMetadataStore(fmt.Sprintf(options.MetadataStorePathTemplate, driver)) |
f5916b10 |
if err != nil {
return nil, err
}
|
afd305c4 |
return NewStoreFromGraphDriver(fms, driver, options.OS) |
f5916b10 |
}
|
afd305c4 |
// NewStoreFromGraphDriver creates a new Store instance using the provided
// metadata store and graph driver. The metadata store will be used to restore |
500e77ba |
// the Store. |
afd305c4 |
func NewStoreFromGraphDriver(store MetadataStore, driver graphdriver.Driver, os string) (Store, error) {
if !system.IsOSSupported(os) {
return nil, fmt.Errorf("failed to initialize layer store as operating system '%s' is not supported", os)
}
caps := graphdriver.Capabilities{}
if capDriver, ok := driver.(graphdriver.CapabilityDriver); ok {
caps = capDriver.Capabilities() |
aa96c317 |
}
|
500e77ba |
ls := &layerStore{ |
aa96c317 |
store: store, |
afd305c4 |
driver: driver, |
aa96c317 |
layerMap: map[ChainID]*roLayer{},
mounts: map[string]*mountedLayer{}, |
afd305c4 |
useTarSplit: !caps.ReproducesExactDiffs,
os: os, |
500e77ba |
}
ids, mounts, err := store.List()
if err != nil {
return nil, err
}
for _, id := range ids {
l, err := ls.loadLayer(id)
if err != nil {
logrus.Debugf("Failed to load layer %s: %s", id, err) |
caef48f4 |
continue |
500e77ba |
}
if l.parent != nil {
l.parent.referenceCount++
}
}
for _, mount := range mounts {
if err := ls.loadMount(mount); err != nil {
logrus.Debugf("Failed to load mount %s: %s", mount, err)
}
}
return ls, nil
}
func (ls *layerStore) loadLayer(layer ChainID) (*roLayer, error) {
cl, ok := ls.layerMap[layer]
if ok {
return cl, nil
}
diff, err := ls.store.GetDiffID(layer)
if err != nil { |
caef48f4 |
return nil, fmt.Errorf("failed to get diff id for %s: %s", layer, err) |
500e77ba |
}
size, err := ls.store.GetSize(layer)
if err != nil { |
caef48f4 |
return nil, fmt.Errorf("failed to get size for %s: %s", layer, err) |
500e77ba |
}
cacheID, err := ls.store.GetCacheID(layer)
if err != nil { |
caef48f4 |
return nil, fmt.Errorf("failed to get cache id for %s: %s", layer, err) |
500e77ba |
}
parent, err := ls.store.GetParent(layer)
if err != nil { |
caef48f4 |
return nil, fmt.Errorf("failed to get parent for %s: %s", layer, err) |
500e77ba |
}
|
2c60430a |
descriptor, err := ls.store.GetDescriptor(layer)
if err != nil {
return nil, fmt.Errorf("failed to get descriptor for %s: %s", layer, err)
}
|
0cba7740 |
os, err := ls.store.getOS(layer) |
fc21bf28 |
if err != nil { |
0380fbff |
return nil, fmt.Errorf("failed to get operating system for %s: %s", layer, err) |
fc21bf28 |
}
|
afd305c4 |
if os != ls.os {
return nil, fmt.Errorf("failed to load layer with os %s into layerstore for %s", os, ls.os)
}
|
500e77ba |
cl = &roLayer{
chainID: layer,
diffID: diff,
size: size,
cacheID: cacheID,
layerStore: ls,
references: map[Layer]struct{}{}, |
2c60430a |
descriptor: descriptor, |
05bd0435 |
}
|
500e77ba |
if parent != "" {
p, err := ls.loadLayer(parent)
if err != nil {
return nil, err
}
cl.parent = p
}
ls.layerMap[cl.chainID] = cl
return cl, nil
}
func (ls *layerStore) loadMount(mount string) error {
if _, ok := ls.mounts[mount]; ok {
return nil
}
mountID, err := ls.store.GetMountID(mount)
if err != nil {
return err
}
initID, err := ls.store.GetInitID(mount)
if err != nil {
return err
}
parent, err := ls.store.GetMountParent(mount)
if err != nil {
return err
}
ml := &mountedLayer{
name: mount,
mountID: mountID,
initID: initID,
layerStore: ls, |
d04fa49a |
references: map[RWLayer]*referencedRWLayer{}, |
500e77ba |
}
if parent != "" {
p, err := ls.loadLayer(parent)
if err != nil {
return err
}
ml.parent = p
p.referenceCount++
}
ls.mounts[ml.name] = ml
return nil
}
func (ls *layerStore) applyTar(tx MetadataTransaction, ts io.Reader, parent string, layer *roLayer) error { |
7a855799 |
digester := digest.Canonical.Digester() |
500e77ba |
tr := io.TeeReader(ts, digester.Hash())
|
aa96c317 |
rdr := tr |
afd305c4 |
if ls.useTarSplit { |
aa96c317 |
tsw, err := tx.TarSplitWriter(true)
if err != nil {
return err
}
metaPacker := storage.NewJSONPacker(tsw)
defer tsw.Close() |
500e77ba |
|
aa96c317 |
// we're passing nil here for the file putter, because the ApplyDiff will
// handle the extraction of the archive
rdr, err = asm.NewInputTarStream(tr, metaPacker, nil)
if err != nil {
return err
} |
500e77ba |
}
|
afd305c4 |
applySize, err := ls.driver.ApplyDiff(layer.cacheID, parent, rdr) |
500e77ba |
if err != nil {
return err
}
// Discard trailing data but ensure metadata is picked up to reconstruct stream
io.Copy(ioutil.Discard, rdr) // ignore error as reader may be closed
layer.size = applySize
layer.diffID = DiffID(digester.Digest())
logrus.Debugf("Applied tar %s to %s, size: %d", layer.diffID, layer.cacheID, applySize)
return nil
}
|
afd305c4 |
func (ls *layerStore) Register(ts io.Reader, parent ChainID) (Layer, error) {
return ls.registerWithDescriptor(ts, parent, distribution.Descriptor{}) |
05bd0435 |
}
|
afd305c4 |
func (ls *layerStore) registerWithDescriptor(ts io.Reader, parent ChainID, descriptor distribution.Descriptor) (Layer, error) { |
500e77ba |
// err is used to hold the error which will always trigger
// cleanup of creates sources but may not be an error returned
// to the caller (already exists).
var err error
var pid string
var p *roLayer |
87abf34a |
|
500e77ba |
if string(parent) != "" {
p = ls.get(parent)
if p == nil {
return nil, ErrLayerDoesNotExist
}
pid = p.cacheID
// Release parent chain if error
defer func() {
if err != nil {
ls.layerL.Lock()
ls.releaseLayer(p)
ls.layerL.Unlock()
}
}()
if p.depth() >= maxLayerDepth {
err = ErrMaxDepthExceeded
return nil, err
}
}
// Create new roLayer
layer := &roLayer{
parent: p,
cacheID: stringid.GenerateRandomID(),
referenceCount: 1,
layerStore: ls,
references: map[Layer]struct{}{}, |
2c60430a |
descriptor: descriptor, |
500e77ba |
}
|
afd305c4 |
if err = ls.driver.Create(layer.cacheID, pid, nil); err != nil { |
500e77ba |
return nil, err
}
tx, err := ls.store.StartTransaction()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
logrus.Debugf("Cleaning up layer %s: %v", layer.cacheID, err) |
afd305c4 |
if err := ls.driver.Remove(layer.cacheID); err != nil { |
500e77ba |
logrus.Errorf("Error cleaning up cache layer %s: %v", layer.cacheID, err)
}
if err := tx.Cancel(); err != nil {
logrus.Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
}
}
}()
if err = ls.applyTar(tx, ts, pid, layer); err != nil {
return nil, err
}
if layer.parent == nil {
layer.chainID = ChainID(layer.diffID)
} else {
layer.chainID = createChainIDFromParent(layer.parent.chainID, layer.diffID)
}
if err = storeLayer(tx, layer); err != nil {
return nil, err
}
ls.layerL.Lock()
defer ls.layerL.Unlock()
|
cbf55b92 |
if existingLayer := ls.getWithoutLock(layer.chainID); existingLayer != nil { |
500e77ba |
// Set error for cleanup, but do not return the error
err = errors.New("layer already exists")
return existingLayer.getReference(), nil
}
if err = tx.Commit(layer.chainID); err != nil {
return nil, err
}
ls.layerMap[layer.chainID] = layer
return layer.getReference(), nil
}
|
cbf55b92 |
func (ls *layerStore) getWithoutLock(layer ChainID) *roLayer {
l, ok := ls.layerMap[layer] |
500e77ba |
if !ok {
return nil
}
|
cbf55b92 |
l.referenceCount++
return l
} |
500e77ba |
|
cbf55b92 |
func (ls *layerStore) get(l ChainID) *roLayer {
ls.layerL.Lock()
defer ls.layerL.Unlock()
return ls.getWithoutLock(l) |
500e77ba |
}
func (ls *layerStore) Get(l ChainID) (Layer, error) { |
0538981c |
ls.layerL.Lock()
defer ls.layerL.Unlock()
layer := ls.getWithoutLock(l) |
500e77ba |
if layer == nil {
return nil, ErrLayerDoesNotExist
}
return layer.getReference(), nil
}
|
148aef91 |
// Map returns a copy of the store's chain ID to layer mapping, taken while
// holding layerL. The returned map is independent of the store's own map.
func (ls *layerStore) Map() map[ChainID]Layer {
	ls.layerL.Lock()
	defer ls.layerL.Unlock()

	snapshot := make(map[ChainID]Layer, len(ls.layerMap))
	for chainID, l := range ls.layerMap {
		snapshot[chainID] = l
	}
	return snapshot
}
|
500e77ba |
func (ls *layerStore) deleteLayer(layer *roLayer, metadata *Metadata) error { |
afd305c4 |
err := ls.driver.Remove(layer.cacheID) |
500e77ba |
if err != nil {
return err
}
err = ls.store.Remove(layer.chainID)
if err != nil {
return err
}
metadata.DiffID = layer.diffID
metadata.ChainID = layer.chainID
metadata.Size, err = layer.Size()
if err != nil {
return err
}
metadata.DiffSize = layer.size
return nil
}
|
a4d76853 |
func (ls *layerStore) releaseLayer(l *roLayer) ([]Metadata, error) {
depth := 0
removed := []Metadata{}
for {
if l.referenceCount == 0 {
panic("layer not retained")
}
l.referenceCount--
if l.referenceCount != 0 {
return removed, nil
} |
500e77ba |
|
a4d76853 |
if len(removed) == 0 && depth > 0 {
panic("cannot remove layer with child")
}
if l.hasReferences() {
panic("cannot delete referenced layer")
}
var metadata Metadata
if err := ls.deleteLayer(l, &metadata); err != nil {
return nil, err
} |
500e77ba |
|
a4d76853 |
delete(ls.layerMap, l.chainID)
removed = append(removed, metadata) |
500e77ba |
|
a4d76853 |
if l.parent == nil {
return removed, nil |
500e77ba |
}
|
a4d76853 |
depth++
l = l.parent
} |
500e77ba |
}
// Release gives up the reference l and releases the underlying layer,
// returning the metadata of any layers that were removed as a result. An
// unknown layer yields an empty result; a reference the layer does not hold
// yields ErrLayerNotRetained.
func (ls *layerStore) Release(l Layer) ([]Metadata, error) {
	ls.layerL.Lock()
	defer ls.layerL.Unlock()

	target, known := ls.layerMap[l.ChainID()]
	if !known {
		return []Metadata{}, nil
	}
	if !target.hasReference(l) {
		return nil, ErrLayerNotRetained
	}

	target.deleteReference(l)

	return ls.releaseLayer(target)
}
|
afd305c4 |
func (ls *layerStore) CreateRWLayer(name string, parent ChainID, opts *CreateRWLayerOpts) (RWLayer, error) { |
f7f3d342 |
var (
storageOpt map[string]string
initFunc MountInit
mountLabel string
)
if opts != nil {
mountLabel = opts.MountLabel
storageOpt = opts.StorageOpt
initFunc = opts.InitFunc
}
|
500e77ba |
ls.mountL.Lock()
defer ls.mountL.Unlock()
m, ok := ls.mounts[name]
if ok { |
d04fa49a |
return nil, ErrMountNameConflict |
500e77ba |
}
|
d04fa49a |
var err error |
500e77ba |
var pid string
var p *roLayer
if string(parent) != "" { |
cbf55b92 |
p = ls.get(parent) |
500e77ba |
if p == nil {
return nil, ErrLayerDoesNotExist
}
pid = p.cacheID
// Release parent chain if error
defer func() {
if err != nil {
ls.layerL.Lock()
ls.releaseLayer(p)
ls.layerL.Unlock()
}
}()
}
m = &mountedLayer{
name: name,
parent: p, |
d04fa49a |
mountID: ls.mountID(name), |
500e77ba |
layerStore: ls, |
d04fa49a |
references: map[RWLayer]*referencedRWLayer{}, |
500e77ba |
}
if initFunc != nil { |
afd305c4 |
pid, err = ls.initMount(m.mountID, pid, mountLabel, initFunc, storageOpt) |
500e77ba |
if err != nil {
return nil, err
}
m.initID = pid
}
|
b937aa8e |
createOpts := &graphdriver.CreateOpts{
StorageOpt: storageOpt,
}
|
afd305c4 |
if err = ls.driver.CreateReadWrite(m.mountID, pid, createOpts); err != nil { |
500e77ba |
return nil, err
}
if err = ls.saveMount(m); err != nil {
return nil, err
}
|
d04fa49a |
return m.getReference(), nil |
500e77ba |
}
|
d04fa49a |
func (ls *layerStore) GetRWLayer(id string) (RWLayer, error) { |
500e77ba |
ls.mountL.Lock()
defer ls.mountL.Unlock() |
d04fa49a |
mount, ok := ls.mounts[id]
if !ok {
return nil, ErrMountDoesNotExist |
500e77ba |
}
|
d04fa49a |
return mount.getReference(), nil |
500e77ba |
}
|
9c4570a9 |
func (ls *layerStore) GetMountID(id string) (string, error) {
ls.mountL.Lock()
defer ls.mountL.Unlock()
mount, ok := ls.mounts[id]
if !ok {
return "", ErrMountDoesNotExist
} |
511a7058 |
logrus.Debugf("GetMountID id: %s -> mountID: %s", id, mount.mountID) |
9c4570a9 |
return mount.mountID, nil
}
|
d04fa49a |
func (ls *layerStore) ReleaseRWLayer(l RWLayer) ([]Metadata, error) { |
500e77ba |
ls.mountL.Lock()
defer ls.mountL.Unlock() |
d04fa49a |
m, ok := ls.mounts[l.Name()]
if !ok {
return []Metadata{}, nil |
500e77ba |
} |
d04fa49a |
if err := m.deleteReference(l); err != nil {
return nil, err |
500e77ba |
}
|
d04fa49a |
if m.hasReferences() {
return []Metadata{}, nil
} |
500e77ba |
|
afd305c4 |
if err := ls.driver.Remove(m.mountID); err != nil { |
500e77ba |
logrus.Errorf("Error removing mounted layer %s: %s", m.name, err) |
64530c8e |
m.retakeReference(l) |
500e77ba |
return nil, err
}
if m.initID != "" { |
afd305c4 |
if err := ls.driver.Remove(m.initID); err != nil { |
500e77ba |
logrus.Errorf("Error removing init layer %s: %s", m.name, err) |
64530c8e |
m.retakeReference(l) |
500e77ba |
return nil, err
}
}
if err := ls.store.RemoveMount(m.name); err != nil {
logrus.Errorf("Error removing mount metadata: %s: %s", m.name, err) |
64530c8e |
m.retakeReference(l) |
500e77ba |
return nil, err
}
|
d04fa49a |
delete(ls.mounts, m.Name())
|
500e77ba |
ls.layerL.Lock()
defer ls.layerL.Unlock()
if m.parent != nil {
return ls.releaseLayer(m.parent)
}
return []Metadata{}, nil
}
|
d04fa49a |
func (ls *layerStore) saveMount(mount *mountedLayer) error {
if err := ls.store.SetMountID(mount.name, mount.mountID); err != nil {
return err |
500e77ba |
} |
d04fa49a |
if mount.initID != "" {
if err := ls.store.SetInitID(mount.name, mount.initID); err != nil {
return err
} |
500e77ba |
} |
d04fa49a |
if mount.parent != nil {
if err := ls.store.SetMountParent(mount.name, mount.parent.chainID); err != nil {
return err
}
}
ls.mounts[mount.name] = mount
return nil
}
|
afd305c4 |
func (ls *layerStore) initMount(graphID, parent, mountLabel string, initFunc MountInit, storageOpt map[string]string) (string, error) { |
d04fa49a |
// Use "<graph-id>-init" to maintain compatibility with graph drivers
// which are expecting this layer with this special name. If all
// graph drivers can be updated to not rely on knowing about this layer
// then the initID should be randomly generated.
initID := fmt.Sprintf("%s-init", graphID)
|
b937aa8e |
createOpts := &graphdriver.CreateOpts{
MountLabel: mountLabel,
StorageOpt: storageOpt,
}
|
afd305c4 |
if err := ls.driver.CreateReadWrite(initID, parent, createOpts); err != nil { |
d04fa49a |
return "", err
} |
afd305c4 |
p, err := ls.driver.Get(initID, "") |
d04fa49a |
if err != nil {
return "", err
}
if err := initFunc(p); err != nil { |
afd305c4 |
ls.driver.Put(initID) |
d04fa49a |
return "", err
}
|
afd305c4 |
if err := ls.driver.Put(initID); err != nil { |
d04fa49a |
return "", err
}
return initID, nil |
500e77ba |
}
|
aa96c317 |
func (ls *layerStore) getTarStream(rl *roLayer) (io.ReadCloser, error) { |
afd305c4 |
if !ls.useTarSplit { |
aa96c317 |
var parentCacheID string
if rl.parent != nil {
parentCacheID = rl.parent.cacheID
}
|
afd305c4 |
return ls.driver.Diff(rl.cacheID, parentCacheID) |
aa96c317 |
}
r, err := ls.store.TarSplitReader(rl.chainID)
if err != nil {
return nil, err
}
pr, pw := io.Pipe()
go func() { |
afd305c4 |
err := ls.assembleTarTo(rl.cacheID, r, nil, pw) |
aa96c317 |
if err != nil {
pw.CloseWithError(err)
} else {
pw.Close()
}
}()
return pr, nil
}
|
afd305c4 |
func (ls *layerStore) assembleTarTo(graphID string, metadata io.ReadCloser, size *int64, w io.Writer) error {
diffDriver, ok := ls.driver.(graphdriver.DiffGetterDriver) |
500e77ba |
if !ok { |
afd305c4 |
diffDriver = &naiveDiffPathDriver{ls.driver} |
500e77ba |
}
|
a8f88ef4 |
defer metadata.Close()
|
500e77ba |
// get our relative path to the container |
58bec40d |
fileGetCloser, err := diffDriver.DiffGetter(graphID) |
500e77ba |
if err != nil { |
a8f88ef4 |
return err |
500e77ba |
} |
58bec40d |
defer fileGetCloser.Close() |
500e77ba |
|
a8f88ef4 |
metaUnpacker := storage.NewJSONUnpacker(metadata)
upackerCounter := &unpackSizeCounter{metaUnpacker, size} |
58bec40d |
logrus.Debugf("Assembling tar data for %s", graphID)
return asm.WriteOutputTarStream(fileGetCloser, upackerCounter, w) |
500e77ba |
}
|
f5916b10 |
func (ls *layerStore) Cleanup() error { |
afd305c4 |
return ls.driver.Cleanup() |
f5916b10 |
}
|
afd305c4 |
func (ls *layerStore) DriverStatus() [][2]string {
return ls.driver.Status() |
f5916b10 |
}
|
afd305c4 |
// DriverName returns the graph driver's String() value.
func (ls *layerStore) DriverName() string {
	return ls.driver.String()
}
|
500e77ba |
// naiveDiffPathDriver wraps a graphdriver.Driver and adds a DiffGetter built
// on the driver's Get and Put. assembleTarTo uses it for drivers that do not
// implement graphdriver.DiffGetterDriver.
type naiveDiffPathDriver struct {
	graphdriver.Driver
}
|
58bec40d |
// fileGetPutter is a storage.FileGetter that also remembers the driver and
// layer id it was created from, so that its Close method can call Put on the
// driver for that id.
type fileGetPutter struct {
	storage.FileGetter
	driver graphdriver.Driver
	id     string
}
// Close calls Put on the driver for the stored layer id and returns its
// error.
func (w *fileGetPutter) Close() error {
	driver, id := w.driver, w.id
	return driver.Put(id)
}
func (n *naiveDiffPathDriver) DiffGetter(id string) (graphdriver.FileGetCloser, error) { |
500e77ba |
p, err := n.Driver.Get(id, "")
if err != nil { |
58bec40d |
return nil, err |
500e77ba |
} |
7a7357da |
return &fileGetPutter{storage.NewPathFileGetter(p.Path()), n.Driver, id}, nil |
500e77ba |
} |