500e77ba |
package layer
import (
"compress/gzip"
"errors"
"fmt"
"io"
"os"
|
7a855799 |
"github.com/opencontainers/go-digest" |
1009e6a4 |
"github.com/sirupsen/logrus" |
500e77ba |
"github.com/vbatts/tar-split/tar/asm"
"github.com/vbatts/tar-split/tar/storage"
)
|
d04fa49a |
// CreateRWLayerByGraphID creates a RWLayer in the layer store using
// the provided name with the given graphID. To get the RWLayer
// after migration the layer may be retrieved by the given name. |
afd305c4 |
func (ls *layerStore) CreateRWLayerByGraphID(name, graphID string, parent ChainID) (err error) { |
500e77ba |
ls.mountL.Lock()
defer ls.mountL.Unlock()
m, ok := ls.mounts[name]
if ok {
if m.parent.chainID != parent { |
d04fa49a |
return errors.New("name conflict, mismatched parent") |
500e77ba |
}
if m.mountID != graphID { |
d04fa49a |
return errors.New("mount already exists") |
500e77ba |
}
|
d04fa49a |
return nil |
500e77ba |
}
|
afd305c4 |
if !ls.driver.Exists(graphID) { |
55080fc0 |
return fmt.Errorf("graph ID does not exist: %q", graphID) |
500e77ba |
}
var p *roLayer
if string(parent) != "" { |
cbf55b92 |
p = ls.get(parent) |
500e77ba |
if p == nil { |
d04fa49a |
return ErrLayerDoesNotExist |
500e77ba |
}
// Release parent chain if error
defer func() {
if err != nil {
ls.layerL.Lock()
ls.releaseLayer(p)
ls.layerL.Unlock()
}
}()
}
// TODO: Ensure graphID has correct parent
m = &mountedLayer{
name: name,
parent: p,
mountID: graphID,
layerStore: ls, |
d04fa49a |
references: map[RWLayer]*referencedRWLayer{}, |
500e77ba |
}
// Check for existing init layer
initID := fmt.Sprintf("%s-init", graphID) |
afd305c4 |
if ls.driver.Exists(initID) { |
500e77ba |
m.initID = initID
}
|
b4a63139 |
return ls.saveMount(m) |
500e77ba |
}
|
a8f88ef4 |
func (ls *layerStore) ChecksumForGraphID(id, parent, oldTarDataPath, newTarDataPath string) (diffID DiffID, size int64, err error) {
defer func() { |
500e77ba |
if err != nil { |
a8f88ef4 |
logrus.Debugf("could not get checksum for %q with tar-split: %q", id, err)
diffID, size, err = ls.checksumForGraphIDNoTarsplit(id, parent, newTarDataPath) |
500e77ba |
} |
a8f88ef4 |
}() |
500e77ba |
|
a8f88ef4 |
if oldTarDataPath == "" {
err = errors.New("no tar-split file")
return
} |
500e77ba |
|
a8f88ef4 |
tarDataFile, err := os.Open(oldTarDataPath)
if err != nil {
return
}
defer tarDataFile.Close()
uncompressed, err := gzip.NewReader(tarDataFile)
if err != nil {
return
} |
500e77ba |
|
7a855799 |
dgst := digest.Canonical.Digester() |
afd305c4 |
err = ls.assembleTarTo(id, uncompressed, &size, dgst.Hash()) |
a8f88ef4 |
if err != nil {
return
} |
500e77ba |
|
a8f88ef4 |
diffID = DiffID(dgst.Digest())
err = os.RemoveAll(newTarDataPath)
if err != nil {
return
}
err = os.Link(oldTarDataPath, newTarDataPath) |
500e77ba |
|
a8f88ef4 |
return
} |
500e77ba |
|
a8f88ef4 |
func (ls *layerStore) checksumForGraphIDNoTarsplit(id, parent, newTarDataPath string) (diffID DiffID, size int64, err error) { |
afd305c4 |
rawarchive, err := ls.driver.Diff(id, parent) |
a8f88ef4 |
if err != nil {
return |
500e77ba |
} |
a8f88ef4 |
defer rawarchive.Close() |
500e77ba |
|
a8f88ef4 |
f, err := os.Create(newTarDataPath) |
500e77ba |
if err != nil { |
a8f88ef4 |
return |
500e77ba |
} |
a8f88ef4 |
defer f.Close()
mfz := gzip.NewWriter(f) |
1c05c65f |
defer mfz.Close() |
a8f88ef4 |
metaPacker := storage.NewJSONPacker(mfz) |
500e77ba |
|
a8f88ef4 |
packerCounter := &packSizeCounter{metaPacker, &size} |
500e77ba |
|
a8f88ef4 |
archive, err := asm.NewInputTarStream(rawarchive, packerCounter, nil)
if err != nil {
return
}
dgst, err := digest.FromReader(archive)
if err != nil {
return
}
diffID = DiffID(dgst)
return |
500e77ba |
}
|
a8f88ef4 |
func (ls *layerStore) RegisterByGraphID(graphID string, parent ChainID, diffID DiffID, tarDataFile string, size int64) (Layer, error) { |
500e77ba |
// err is used to hold the error which will always trigger
// cleanup of creates sources but may not be an error returned
// to the caller (already exists).
var err error
var p *roLayer
if string(parent) != "" {
p = ls.get(parent)
if p == nil {
return nil, ErrLayerDoesNotExist
}
// Release parent chain if error
defer func() {
if err != nil {
ls.layerL.Lock()
ls.releaseLayer(p)
ls.layerL.Unlock()
}
}()
}
// Create new roLayer
layer := &roLayer{
parent: p,
cacheID: graphID,
referenceCount: 1,
layerStore: ls,
references: map[Layer]struct{}{}, |
a8f88ef4 |
diffID: diffID,
size: size,
chainID: createChainIDFromParent(parent, diffID),
}
ls.layerL.Lock()
defer ls.layerL.Unlock()
if existingLayer := ls.getWithoutLock(layer.chainID); existingLayer != nil {
// Set error for cleanup, but do not return
err = errors.New("layer already exists")
return existingLayer.getReference(), nil |
500e77ba |
}
tx, err := ls.store.StartTransaction()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
logrus.Debugf("Cleaning up transaction after failed migration for %s: %v", graphID, err)
if err := tx.Cancel(); err != nil {
logrus.Errorf("Error canceling metadata transaction %q: %s", tx.String(), err)
}
}
}()
|
a8f88ef4 |
tsw, err := tx.TarSplitWriter(false)
if err != nil { |
500e77ba |
return nil, err
} |
a8f88ef4 |
defer tsw.Close()
tdf, err := os.Open(tarDataFile)
if err != nil {
return nil, err
}
defer tdf.Close()
_, err = io.Copy(tsw, tdf)
if err != nil { |
500e77ba |
return nil, err
}
|
a8f88ef4 |
if err = storeLayer(tx, layer); err != nil {
return nil, err |
500e77ba |
}
if err = tx.Commit(layer.chainID); err != nil {
return nil, err
}
ls.layerMap[layer.chainID] = layer
return layer.getReference(), nil
}
type unpackSizeCounter struct {
unpacker storage.Unpacker
size *int64
}
func (u *unpackSizeCounter) Next() (*storage.Entry, error) {
e, err := u.unpacker.Next()
if err == nil && u.size != nil {
*u.size += e.Size
}
return e, err
}
type packSizeCounter struct {
packer storage.Packer
size *int64
}
func (p *packSizeCounter) AddEntry(e storage.Entry) (int, error) {
n, err := p.packer.AddEntry(e)
if err == nil && p.size != nil {
*p.size += e.Size
}
return n, err
} |