Bump tar-split from v0.9.10 to v0.9.11 for optimizations.
https://github.com/docker/docker/issues/18370#issuecomment-161399901
Signed-off-by: Vincent Batts <vbatts@redhat.com>
| ... | ... |
@@ -41,7 +41,7 @@ clone git github.com/boltdb/bolt v1.1.0 |
| 41 | 41 |
|
| 42 | 42 |
# get graph and distribution packages |
| 43 | 43 |
clone git github.com/docker/distribution c6c9194e9c6097f84b0ff468a741086ff7704aa3 |
| 44 |
-clone git github.com/vbatts/tar-split v0.9.10 |
|
| 44 |
+clone git github.com/vbatts/tar-split v0.9.11 |
|
| 45 | 45 |
|
| 46 | 46 |
clone git github.com/docker/notary 45de2828b5e0083bfb4e9a5a781eddb05e2ef9d0 |
| 47 | 47 |
clone git google.golang.org/grpc 174192fc93efcb188fc8f46ca447f0da606b6885 https://github.com/grpc/grpc-go.git |
| ... | ... |
@@ -3,8 +3,10 @@ package asm |
| 3 | 3 |
import ( |
| 4 | 4 |
"bytes" |
| 5 | 5 |
"fmt" |
| 6 |
+ "hash" |
|
| 6 | 7 |
"hash/crc64" |
| 7 | 8 |
"io" |
| 9 |
+ "sync" |
|
| 8 | 10 |
|
| 9 | 11 |
"github.com/vbatts/tar-split/tar/storage" |
| 10 | 12 |
) |
| ... | ... |
@@ -23,45 +25,106 @@ func NewOutputTarStream(fg storage.FileGetter, up storage.Unpacker) io.ReadClose |
| 23 | 23 |
} |
| 24 | 24 |
pr, pw := io.Pipe() |
| 25 | 25 |
go func() {
|
| 26 |
- for {
|
|
| 27 |
- entry, err := up.Next() |
|
| 26 |
+ err := WriteOutputTarStream(fg, up, pw) |
|
| 27 |
+ if err != nil {
|
|
| 28 |
+ pw.CloseWithError(err) |
|
| 29 |
+ } else {
|
|
| 30 |
+ pw.Close() |
|
| 31 |
+ } |
|
| 32 |
+ }() |
|
| 33 |
+ return pr |
|
| 34 |
+} |
|
| 35 |
+ |
|
| 36 |
+// WriteOutputTarStream writes assembled tar archive to a writer. |
|
| 37 |
+func WriteOutputTarStream(fg storage.FileGetter, up storage.Unpacker, w io.Writer) error {
|
|
| 38 |
+ // ... Since these are interfaces, this is possible, so let's not have a nil pointer |
|
| 39 |
+ if fg == nil || up == nil {
|
|
| 40 |
+ return nil |
|
| 41 |
+ } |
|
| 42 |
+ var copyBuffer []byte |
|
| 43 |
+ var crcHash hash.Hash |
|
| 44 |
+ var crcSum []byte |
|
| 45 |
+ var multiWriter io.Writer |
|
| 46 |
+ for {
|
|
| 47 |
+ entry, err := up.Next() |
|
| 48 |
+ if err != nil {
|
|
| 49 |
+ if err == io.EOF {
|
|
| 50 |
+ return nil |
|
| 51 |
+ } |
|
| 52 |
+ return err |
|
| 53 |
+ } |
|
| 54 |
+ switch entry.Type {
|
|
| 55 |
+ case storage.SegmentType: |
|
| 56 |
+ if _, err := w.Write(entry.Payload); err != nil {
|
|
| 57 |
+ return err |
|
| 58 |
+ } |
|
| 59 |
+ case storage.FileType: |
|
| 60 |
+ if entry.Size == 0 {
|
|
| 61 |
+ continue |
|
| 62 |
+ } |
|
| 63 |
+ fh, err := fg.Get(entry.GetName()) |
|
| 28 | 64 |
if err != nil {
|
| 29 |
- pw.CloseWithError(err) |
|
| 30 |
- return |
|
| 65 |
+ return err |
|
| 66 |
+ } |
|
| 67 |
+ if crcHash == nil {
|
|
| 68 |
+ crcHash = crc64.New(storage.CRCTable) |
|
| 69 |
+ crcSum = make([]byte, 8) |
|
| 70 |
+ multiWriter = io.MultiWriter(w, crcHash) |
|
| 71 |
+ copyBuffer = byteBufferPool.Get().([]byte) |
|
| 72 |
+ defer byteBufferPool.Put(copyBuffer) |
|
| 73 |
+ } else {
|
|
| 74 |
+ crcHash.Reset() |
|
| 75 |
+ } |
|
| 76 |
+ |
|
| 77 |
+ if _, err := copyWithBuffer(multiWriter, fh, copyBuffer); err != nil {
|
|
| 78 |
+ fh.Close() |
|
| 79 |
+ return err |
|
| 31 | 80 |
} |
| 32 |
- switch entry.Type {
|
|
| 33 |
- case storage.SegmentType: |
|
| 34 |
- if _, err := pw.Write(entry.Payload); err != nil {
|
|
| 35 |
- pw.CloseWithError(err) |
|
| 36 |
- return |
|
| 37 |
- } |
|
| 38 |
- case storage.FileType: |
|
| 39 |
- if entry.Size == 0 {
|
|
| 40 |
- continue |
|
| 41 |
- } |
|
| 42 |
- fh, err := fg.Get(entry.GetName()) |
|
| 43 |
- if err != nil {
|
|
| 44 |
- pw.CloseWithError(err) |
|
| 45 |
- return |
|
| 46 |
- } |
|
| 47 |
- c := crc64.New(storage.CRCTable) |
|
| 48 |
- tRdr := io.TeeReader(fh, c) |
|
| 49 |
- if _, err := io.Copy(pw, tRdr); err != nil {
|
|
| 50 |
- fh.Close() |
|
| 51 |
- pw.CloseWithError(err) |
|
| 52 |
- return |
|
| 53 |
- } |
|
| 54 |
- if !bytes.Equal(c.Sum(nil), entry.Payload) {
|
|
| 55 |
- // I would rather this be a comparable ErrInvalidChecksum or such, |
|
| 56 |
- // but since it's coming through the PipeReader, the context of |
|
| 57 |
- // _which_ file would be lost... |
|
| 58 |
- fh.Close() |
|
| 59 |
- pw.CloseWithError(fmt.Errorf("file integrity checksum failed for %q", entry.GetName()))
|
|
| 60 |
- return |
|
| 61 |
- } |
|
| 81 |
+ |
|
| 82 |
+ if !bytes.Equal(crcHash.Sum(crcSum[:0]), entry.Payload) {
|
|
| 83 |
+ // I would rather this be a comparable ErrInvalidChecksum or such, |
|
| 84 |
+ // but since it's coming through the PipeReader, the context of |
|
| 85 |
+ // _which_ file would be lost... |
|
| 62 | 86 |
fh.Close() |
| 87 |
+ return fmt.Errorf("file integrity checksum failed for %q", entry.GetName())
|
|
| 63 | 88 |
} |
| 89 |
+ fh.Close() |
|
| 64 | 90 |
} |
| 65 |
- }() |
|
| 66 |
- return pr |
|
| 91 |
+ } |
|
| 92 |
+} |
|
| 93 |
+ |
|
| 94 |
+var byteBufferPool = &sync.Pool{
|
|
| 95 |
+ New: func() interface{} {
|
|
| 96 |
+ return make([]byte, 32*1024) |
|
| 97 |
+ }, |
|
| 98 |
+} |
|
| 99 |
+ |
|
| 100 |
+// copyWithBuffer is taken from stdlib io.Copy implementation |
|
| 101 |
+// https://github.com/golang/go/blob/go1.5.1/src/io/io.go#L367 |
|
| 102 |
+func copyWithBuffer(dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
|
|
| 103 |
+ for {
|
|
| 104 |
+ nr, er := src.Read(buf) |
|
| 105 |
+ if nr > 0 {
|
|
| 106 |
+ nw, ew := dst.Write(buf[0:nr]) |
|
| 107 |
+ if nw > 0 {
|
|
| 108 |
+ written += int64(nw) |
|
| 109 |
+ } |
|
| 110 |
+ if ew != nil {
|
|
| 111 |
+ err = ew |
|
| 112 |
+ break |
|
| 113 |
+ } |
|
| 114 |
+ if nr != nw {
|
|
| 115 |
+ err = io.ErrShortWrite |
|
| 116 |
+ break |
|
| 117 |
+ } |
|
| 118 |
+ } |
|
| 119 |
+ if er == io.EOF {
|
|
| 120 |
+ break |
|
| 121 |
+ } |
|
| 122 |
+ if er != nil {
|
|
| 123 |
+ err = er |
|
| 124 |
+ break |
|
| 125 |
+ } |
|
| 126 |
+ } |
|
| 127 |
+ return written, err |
|
| 67 | 128 |
} |
| ... | ... |
@@ -1,7 +1,6 @@ |
| 1 | 1 |
package storage |
| 2 | 2 |
|
| 3 | 3 |
import ( |
| 4 |
- "bufio" |
|
| 5 | 4 |
"encoding/json" |
| 6 | 5 |
"errors" |
| 7 | 6 |
"io" |
| ... | ... |
@@ -33,31 +32,15 @@ type PackUnpacker interface {
|
| 33 | 33 |
*/ |
| 34 | 34 |
|
| 35 | 35 |
type jsonUnpacker struct {
|
| 36 |
- r io.Reader |
|
| 37 |
- b *bufio.Reader |
|
| 38 |
- isEOF bool |
|
| 39 |
- seen seenNames |
|
| 36 |
+ seen seenNames |
|
| 37 |
+ dec *json.Decoder |
|
| 40 | 38 |
} |
| 41 | 39 |
|
| 42 | 40 |
func (jup *jsonUnpacker) Next() (*Entry, error) {
|
| 43 | 41 |
var e Entry |
| 44 |
- if jup.isEOF {
|
|
| 45 |
- // since ReadBytes() will return read bytes AND an EOF, we handle it this |
|
| 46 |
- // round-a-bout way so we can Unmarshal the tail with relevant errors, but |
|
| 47 |
- // still get an io.EOF when the stream is ended. |
|
| 48 |
- return nil, io.EOF |
|
| 49 |
- } |
|
| 50 |
- line, err := jup.b.ReadBytes('\n')
|
|
| 51 |
- if err != nil && err != io.EOF {
|
|
| 42 |
+ err := jup.dec.Decode(&e) |
|
| 43 |
+ if err != nil {
|
|
| 52 | 44 |
return nil, err |
| 53 |
- } else if err == io.EOF {
|
|
| 54 |
- jup.isEOF = true |
|
| 55 |
- } |
|
| 56 |
- |
|
| 57 |
- err = json.Unmarshal(line, &e) |
|
| 58 |
- if err != nil && jup.isEOF {
|
|
| 59 |
- // if the remainder actually _wasn't_ a remaining json structure, then just EOF |
|
| 60 |
- return nil, io.EOF |
|
| 61 | 45 |
} |
| 62 | 46 |
|
| 63 | 47 |
// check for dup name |
| ... | ... |
@@ -78,8 +61,7 @@ func (jup *jsonUnpacker) Next() (*Entry, error) {
|
| 78 | 78 |
// Each Entry read is expected to be delimited by a new line. |
| 79 | 79 |
func NewJSONUnpacker(r io.Reader) Unpacker {
|
| 80 | 80 |
return &jsonUnpacker{
|
| 81 |
- r: r, |
|
| 82 |
- b: bufio.NewReader(r), |
|
| 81 |
+ dec: json.NewDecoder(r), |
|
| 83 | 82 |
seen: seenNames{},
|
| 84 | 83 |
} |
| 85 | 84 |
} |