package server
import (
"bytes"
"crypto/sha256"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strconv"
"testing"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/configuration"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest/schema1"
//"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/handlers"
_ "github.com/docker/distribution/registry/storage/driver/inmemory"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"github.com/openshift/origin/pkg/client/testclient"
registrytest "github.com/openshift/origin/pkg/dockerregistry/testutil"
imagetest "github.com/openshift/origin/pkg/image/admission/testutil"
imageapi "github.com/openshift/origin/pkg/image/api"
)
func TestPullthroughServeBlob(t *testing.T) {
ctx := context.Background()
installFakeAccessController(t)
testImage, err := registrytest.NewImageForManifest("user/app", registrytest.SampleImageManifestSchema1, false)
if err != nil {
t.Fatal(err)
}
client := &testclient.Fake{}
client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *testImage))
// TODO: get rid of those nasty global vars
backupRegistryClient := DefaultRegistryClient
DefaultRegistryClient = makeFakeRegistryClient(client, fake.NewSimpleClientset())
defer func() {
// set it back once this test finishes to make other unit tests working again
DefaultRegistryClient = backupRegistryClient
}()
// pullthrough middleware will attempt to pull from this registry instance
remoteRegistryApp := handlers.NewApp(ctx, &configuration.Configuration{
Loglevel: "debug",
Auth: map[string]configuration.Parameters{
fakeAuthorizerName: {"realm": fakeAuthorizerName},
},
Storage: configuration.Storage{
"inmemory": configuration.Parameters{},
"cache": configuration.Parameters{
"blobdescriptor": "inmemory",
},
"delete": configuration.Parameters{
"enabled": true,
},
},
Middleware: map[string][]configuration.Middleware{
"registry": {{Name: "openshift"}},
"repository": {{Name: "openshift", Options: configuration.Parameters{"pullthrough": false}}},
"storage": {{Name: "openshift"}},
},
})
remoteRegistryServer := httptest.NewServer(remoteRegistryApp)
defer remoteRegistryServer.Close()
serverURL, err := url.Parse(remoteRegistryServer.URL)
if err != nil {
t.Fatalf("error parsing server url: %v", err)
}
os.Setenv("DOCKER_REGISTRY_URL", serverURL.Host)
testImage.DockerImageReference = fmt.Sprintf("%s/%s@%s", serverURL.Host, "user/app", testImage.Name)
testImageStream := registrytest.TestNewImageStreamObject("user", "app", "latest", testImage.Name, testImage.DockerImageReference)
if testImageStream.Annotations == nil {
testImageStream.Annotations = make(map[string]string)
}
testImageStream.Annotations[imageapi.InsecureRepositoryAnnotation] = "true"
client.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, *testImageStream))
blob1Desc, blob1Content, err := registrytest.UploadTestBlob(serverURL, nil, "user/app")
if err != nil {
t.Fatal(err)
}
blob2Desc, blob2Content, err := registrytest.UploadTestBlob(serverURL, nil, "user/app")
if err != nil {
t.Fatal(err)
}
blob1Storage := map[digest.Digest][]byte{blob1Desc.Digest: blob1Content}
blob2Storage := map[digest.Digest][]byte{blob2Desc.Digest: blob2Content}
for _, tc := range []struct {
name string
method string
blobDigest digest.Digest
localBlobs map[digest.Digest][]byte
expectedStatError error
expectedContentLength int64
expectedBytesServed int64
expectedBytesServedLocally int64
expectedLocalCalls map[string]int
}{
{
name: "stat local blob",
method: "HEAD",
blobDigest: blob1Desc.Digest,
localBlobs: blob1Storage,
expectedContentLength: int64(len(blob1Content)),
expectedLocalCalls: map[string]int{
"Stat": 1,
"ServeBlob": 1,
},
},
{
name: "serve local blob",
method: "GET",
blobDigest: blob1Desc.Digest,
localBlobs: blob1Storage,
expectedContentLength: int64(len(blob1Content)),
expectedBytesServed: int64(len(blob1Content)),
expectedBytesServedLocally: int64(len(blob1Content)),
expectedLocalCalls: map[string]int{
"Stat": 1,
"ServeBlob": 1,
},
},
{
name: "stat remote blob",
method: "HEAD",
blobDigest: blob1Desc.Digest,
localBlobs: blob2Storage,
expectedContentLength: int64(len(blob1Content)),
expectedLocalCalls: map[string]int{"Stat": 1},
},
{
name: "serve remote blob",
method: "GET",
blobDigest: blob1Desc.Digest,
expectedContentLength: int64(len(blob1Content)),
expectedBytesServed: int64(len(blob1Content)),
expectedLocalCalls: map[string]int{"Stat": 1},
},
{
name: "unknown blob digest",
method: "GET",
blobDigest: unknownBlobDigest,
expectedStatError: distribution.ErrBlobUnknown,
expectedLocalCalls: map[string]int{"Stat": 1},
},
} {
localBlobStore := newTestBlobStore(tc.localBlobs)
cachedLayers, err := newDigestToRepositoryCache(10)
if err != nil {
t.Fatal(err)
}
ptbs := &pullthroughBlobStore{
BlobStore: localBlobStore,
repo: &repository{
ctx: ctx,
namespace: "user",
name: "app",
pullthrough: true,
cachedLayers: cachedLayers,
registryOSClient: client,
},
digestToStore: make(map[string]distribution.BlobStore),
}
req, err := http.NewRequest(tc.method, fmt.Sprintf("http://example.org/v2/user/app/blobs/%s", tc.blobDigest), nil)
if err != nil {
t.Fatalf("[%s] failed to create http request: %v", tc.name, err)
}
w := httptest.NewRecorder()
dgst := digest.Digest(tc.blobDigest)
_, err = ptbs.Stat(ctx, dgst)
if err != tc.expectedStatError {
t.Errorf("[%s] Stat returned unexpected error: %#+v != %#+v", tc.name, err, tc.expectedStatError)
}
if err != nil || tc.expectedStatError != nil {
continue
}
err = ptbs.ServeBlob(ctx, w, req, dgst)
if err != nil {
t.Errorf("[%s] unexpected ServeBlob error: %v", tc.name, err)
continue
}
clstr := w.Header().Get("Content-Length")
if cl, err := strconv.ParseInt(clstr, 10, 64); err != nil {
t.Errorf(`[%s] unexpected Content-Length: %q != "%d"`, tc.name, clstr, tc.expectedContentLength)
} else {
if cl != tc.expectedContentLength {
t.Errorf("[%s] Content-Length does not match expected size: %d != %d", tc.name, cl, tc.expectedContentLength)
}
}
if w.Header().Get("Content-Type") != "application/octet-stream" {
t.Errorf("[%s] Content-Type does not match expected: %q != %q", tc.name, w.Header().Get("Content-Type"), "application/octet-stream")
}
body := w.Body.Bytes()
if int64(len(body)) != tc.expectedBytesServed {
t.Errorf("[%s] unexpected size of body: %d != %d", tc.name, len(body), tc.expectedBytesServed)
}
for name, expCount := range tc.expectedLocalCalls {
count := localBlobStore.calls[name]
if count != expCount {
t.Errorf("[%s] expected %d calls to method %s of local blob store, not %d", tc.name, expCount, name, count)
}
}
for name, count := range localBlobStore.calls {
if _, exists := tc.expectedLocalCalls[name]; !exists {
t.Errorf("[%s] expected no calls to method %s of local blob store, got %d", tc.name, name, count)
}
}
if localBlobStore.bytesServed != tc.expectedBytesServedLocally {
t.Errorf("[%s] unexpected number of bytes served locally: %d != %d", tc.name, localBlobStore.bytesServed, tc.expectedBytesServed)
}
}
}
const (
unknownBlobDigest = "sha256:bef57ec7f53a6d40beb640a780a639c83bc29ac8a9816f1fc6c5c6dcd93c4721"
)
func makeDigestFromBytes(data []byte) digest.Digest {
return digest.Digest(fmt.Sprintf("sha256:%x", sha256.Sum256(data)))
}
type testBlobStore struct {
// blob digest mapped to content
blobs map[digest.Digest][]byte
// method name mapped to number of invocations
calls map[string]int
bytesServed int64
}
var _ distribution.BlobStore = &testBlobStore{}
func newTestBlobStore(blobs map[digest.Digest][]byte) *testBlobStore {
b := make(map[digest.Digest][]byte)
for d, content := range blobs {
b[d] = content
}
return &testBlobStore{
blobs: b,
calls: make(map[string]int),
}
}
func (t *testBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
t.calls["Stat"]++
content, exists := t.blobs[dgst]
if !exists {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
return distribution.Descriptor{
MediaType: schema1.MediaTypeManifestLayer,
Size: int64(len(content)),
Digest: makeDigestFromBytes(content),
}, nil
}
func (t *testBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
t.calls["Get"]++
content, exists := t.blobs[dgst]
if !exists {
return nil, distribution.ErrBlobUnknown
}
return content, nil
}
func (t *testBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
t.calls["Open"]++
content, exists := t.blobs[dgst]
if !exists {
return nil, distribution.ErrBlobUnknown
}
return &testBlobFileReader{
bs: t,
content: content,
}, nil
}
func (t *testBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
t.calls["Put"]++
return distribution.Descriptor{}, fmt.Errorf("method not implemented")
}
func (t *testBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
t.calls["Create"]++
return nil, fmt.Errorf("method not implemented")
}
func (t *testBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
t.calls["Resume"]++
return nil, fmt.Errorf("method not implemented")
}
func (t *testBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error {
t.calls["ServeBlob"]++
content, exists := t.blobs[dgst]
if !exists {
return distribution.ErrBlobUnknown
}
reader := bytes.NewReader(content)
setResponseHeaders(w, int64(len(content)), "application/octet-stream", dgst)
http.ServeContent(w, req, dgst.String(), time.Time{}, reader)
n, err := reader.Seek(0, 1)
if err != nil {
return err
}
t.bytesServed = n
return nil
}
func (t *testBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
t.calls["Delete"]++
return fmt.Errorf("method not implemented")
}
type testBlobFileReader struct {
bs *testBlobStore
content []byte
offset int64
}
var _ distribution.ReadSeekCloser = &testBlobFileReader{}
func (fr *testBlobFileReader) Read(p []byte) (n int, err error) {
fr.bs.calls["ReadSeakCloser.Read"]++
n = copy(p, fr.content[fr.offset:])
fr.offset += int64(n)
fr.bs.bytesServed += int64(n)
return n, nil
}
func (fr *testBlobFileReader) Seek(offset int64, whence int) (int64, error) {
fr.bs.calls["ReadSeakCloser.Seek"]++
newOffset := fr.offset
switch whence {
case os.SEEK_CUR:
newOffset += int64(offset)
case os.SEEK_END:
newOffset = int64(len(fr.content)) + offset
case os.SEEK_SET:
newOffset = int64(offset)
}
var err error
if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position")
} else {
// No problems, set the offset.
fr.offset = newOffset
}
return fr.offset, err
}
func (fr *testBlobFileReader) Close() error {
fr.bs.calls["ReadSeakCloser.Close"]++
return nil
}