package server

import (
	"fmt"
	"sort"
	"time"

	"github.com/docker/distribution"
	"github.com/docker/distribution/context"
	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/registry/middleware/registry"
	"github.com/docker/distribution/registry/storage"

	kerrors "k8s.io/kubernetes/pkg/api/errors"

	imageapi "github.com/openshift/origin/pkg/image/api"
)

// ByGeneration implements sort.Interface for a slice of tag events. Events
// are ordered by descending Generation, i.e. the latest event sorts first.
type ByGeneration []*imageapi.TagEvent

// Len returns the number of tag events in the collection.
func (b ByGeneration) Len() int {
	return len(b)
}

// Less reports whether the event at index i has a higher generation than
// the one at index j, which places newer events ahead of older ones.
func (b ByGeneration) Less(i, j int) bool {
	return b[j].Generation < b[i].Generation
}

// Swap exchanges the events at indexes i and j.
func (b ByGeneration) Swap(i, j int) {
	b[j], b[i] = b[i], b[j]
}

func init() {
	middleware.RegisterOptions(storage.BlobDescriptorServiceFactory(&blobDescriptorServiceFactory{}))
}

// blobDescriptorServiceFactory produces descriptor services that are able to
// work with blobs directly, without relying on repository links. That lets
// the registry ignore how blobs are distributed between repositories.
type blobDescriptorServiceFactory struct{}

// BlobAccessController wraps svc in a blobDescriptorService and returns the
// wrapper in place of the original service.
func (bf *blobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService {
	wrapper := &blobDescriptorService{BlobDescriptorService: svc}
	return wrapper
}

// blobDescriptorService wraps an upstream distribution.BlobDescriptorService.
// The embedded service supplies every method that this type does not define
// itself.
type blobDescriptorService struct {
	distribution.BlobDescriptorService
}

// Stat resolves the descriptor of the blob identified by dgst. The blob is accepted when it is linked in
// the repository, or when it is stored in the registry and referenced by the corresponding image stream.
// This method is invoked from inside of upstream's linkedBlobStore. It expects a proper repository object
// to be set on the given context by the upper openshift middleware wrappers; without one an error is
// logged and returned. distribution.ErrBlobUnknown is returned for a stored blob that the image stream
// does not reference.
func (bs *blobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
	repo, ok := RepositoryFrom(ctx)
	if repo == nil || !ok {
		err := fmt.Errorf("failed to retrieve repository from context")
		context.GetLogger(ctx).Error(err)
		return distribution.Descriptor{}, err
	}

	// A layer link in the repository settles the question right away; the association is cached for
	// later lookups.
	linked, linkErr := bs.BlobDescriptorService.Stat(ctx, dgst)
	if linkErr == nil {
		ref := imageapi.DockerImageReference{Namespace: repo.namespace, Name: repo.name}
		repo.cachedLayers.RememberDigest(dgst, repo.blobrepositorycachettl, ref.Exact())
		return linked, nil
	}

	context.GetLogger(ctx).Debugf("could not stat layer link %q in repository %q: %v", dgst.String(), repo.Named().Name(), linkErr)

	// No link: the blob has to be stored locally and be referenced by the image stream.
	local, statErr := dockerRegistry.BlobStatter().Stat(ctx, dgst)
	switch {
	case statErr != nil:
		return local, statErr
	case imageStreamHasBlob(repo, dgst):
		return local, nil
	default:
		return distribution.Descriptor{}, distribution.ErrBlobUnknown
	}
}

// Clear forgets the cached association between dgst and the repository found in ctx and then delegates
// to the wrapped descriptor service's Clear. An error is logged and returned when ctx carries no
// repository.
func (bs *blobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
	repo, ok := RepositoryFrom(ctx)
	if repo == nil || !ok {
		err := fmt.Errorf("failed to retrieve repository from context")
		context.GetLogger(ctx).Error(err)
		return err
	}

	ref := imageapi.DockerImageReference{Namespace: repo.namespace, Name: repo.name}
	repo.cachedLayers.ForgetDigest(dgst, ref.Exact())

	return bs.BlobDescriptorService.Clear(ctx, dgst)
}

// imageStreamHasBlob reports whether the blob identified by dgst is referenced in the image stream that
// corresponds to repository r. The local cache is consulted first. On a miss, the image stream is fetched
// and its tag events are walked from the highest generation to the lowest, checking each distinct image
// once, until an image referring to the blob is found. Failure to fetch the image stream is logged and
// treated as "not found".
func imageStreamHasBlob(r *repository, dgst digest.Digest) bool {
	logger := context.GetLogger(r.ctx)
	blob := dgst.String()
	cacheName := imageapi.DockerImageReference{Namespace: r.namespace, Name: r.name}.Exact()

	if r.cachedLayers.RepositoryHasBlob(cacheName, dgst) {
		logger.Debugf("found cached blob %q in repository %s", blob, r.Named().Name())
		return true
	}

	logger.Debugf("verifying presence of blob %q in image stream %s/%s", blob, r.namespace, r.name)
	started := time.Now()

	// report logs the outcome together with the time spent on the lookup and passes the outcome through.
	report := func(found bool) bool {
		took := time.Since(started)
		if !found {
			logger.Debugf("detected absence of blob %q in image stream %s/%s after %s", blob, r.namespace, r.name, took.String())
			return false
		}
		logger.Debugf("verified presence of blob %q in image stream %s/%s after %s", blob, r.namespace, r.name, took.String())
		return true
	}

	// verify directly with etcd
	stream, err := r.getImageStream()
	if err != nil {
		logger.Errorf("failed to get image stream: %v", err)
		return report(false)
	}

	// Flatten the per-tag histories into one list, remembering which tag each event came from.
	var events []*imageapi.TagEvent
	tagOf := map[*imageapi.TagEvent]string{}
	for tag, history := range stream.Status.Tags {
		items := history.Items
		for idx := range items {
			ev := &items[idx]
			tagOf[ev] = tag
			events = append(events, ev)
		}
	}
	// search from youngest to oldest
	sort.Sort(ByGeneration(events))

	// Images that were already inspected without success; several tag events may share one image.
	checked := make(map[string]struct{})

	for _, ev := range events {
		if _, done := checked[ev.Image]; done {
			continue
		}
		if !imageHasBlob(r, cacheName, ev.Image, blob, !r.pullthrough) {
			checked[ev.Image] = struct{}{}
			continue
		}
		logger.Debugf("blob found under istag %s/%s:%s in image %s", r.namespace, r.name, tagOf[ev], ev.Image)
		return report(true)
	}

	logger.Warnf("blob %q exists locally but is not referenced in repository %s/%s", blob, r.namespace, r.name)

	return report(false)
}

// imageHasBlob returns true if the image identified by imageName refers to the given blob, either as one
// of its layers or, when the image carries a docker image config, as that config's ID.
//
// The image is fetched through r. If requireManaged is true and the image is not annotated as managed by
// OpenShift (it refers to a remote registry), the image is not processed and false is returned. When the
// image has no layers recorded and no manifest media type set, its metadata is filled in before the
// lookup. On a match, the layers of the image are remembered in the local cache under cacheName.
//
// Any failure to fetch the image or to fill its metadata is logged and reported as false.
func imageHasBlob(
	r *repository,
	cacheName,
	imageName,
	blobDigest string,
	requireManaged bool,
) bool {
	context.GetLogger(r.ctx).Debugf("getting image %s", imageName)
	image, err := r.getImage(digest.Digest(imageName))
	if err != nil {
		if kerrors.IsNotFound(err) {
			// A missing image is an expected condition here, hence only a debug message.
			context.GetLogger(r.ctx).Debugf("image %q not found", imageName)
		} else {
			context.GetLogger(r.ctx).Errorf("failed to get image: %v", err)
		}
		return false
	}

	// in case of pullthrough disabled, client won't be able to download a blob belonging to not managed image
	// (image stored in external registry), thus don't consider them as candidates
	if managed := image.Annotations[imageapi.ManagedByOpenShiftAnnotation]; requireManaged && managed != "true" {
		context.GetLogger(r.ctx).Debugf("skipping not managed image")
		return false
	}

	if len(image.DockerImageLayers) == 0 {
		if len(image.DockerImageManifestMediaType) > 0 {
			// If the media type is set, we can safely assume that the best effort to fill the image layers
			// has already been done. There are none.
			return false
		}
		err = imageapi.ImageWithMetadata(image)
		if err != nil {
			context.GetLogger(r.ctx).Errorf("failed to get metadata for image %s: %v", imageName, err)
			return false
		}
	}

	for _, layer := range image.DockerImageLayers {
		if layer.Name == blobDigest {
			// remember all the layers of matching image
			r.rememberLayersOfImage(image, cacheName)
			return true
		}
	}

	// only manifest V2 schema2 has docker image config filled where dockerImage.Metadata.id is its digest
	if len(image.DockerImageConfig) > 0 && image.DockerImageMetadata.ID == blobDigest {
		// remember manifest config reference of schema 2 as well
		r.rememberLayersOfImage(image, cacheName)
		return true
	}

	return false
}