package metadata

import (
	"context"
	"encoding/binary"
	"strings"
	"sync"
	"time"

	"github.com/boltdb/bolt"
	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/filters"
	"github.com/containerd/containerd/labels"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/metadata/boltutil"
	"github.com/containerd/containerd/namespaces"
	digest "github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
)

type contentStore struct {
	content.Store
	db *DB
	l  sync.RWMutex
}

// newContentStore returns a namespaced content store using an existing
// content store interface.
func newContentStore(db *DB, cs content.Store) *contentStore {
	return &contentStore{
		Store: cs,
		db:    db,
	}
}

func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return content.Info{}, err
	}

	var info content.Info
	if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
		bkt := getBlobBucket(tx, ns, dgst)
		if bkt == nil {
			return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
		}

		info.Digest = dgst
		return readInfo(&info, bkt)
	}); err != nil {
		return content.Info{}, err
	}

	return info, nil
}

func (cs *contentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return content.Info{}, err
	}

	cs.l.RLock()
	defer cs.l.RUnlock()

	updated := content.Info{
		Digest: info.Digest,
	}
	if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
		bkt := getBlobBucket(tx, ns, info.Digest)
		if bkt == nil {
			return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", info.Digest)
		}

		if err := readInfo(&updated, bkt); err != nil {
			return errors.Wrapf(err, "info %q", info.Digest)
		}

		if len(fieldpaths) > 0 {
			for _, path := range fieldpaths {
				if strings.HasPrefix(path, "labels.") {
					if updated.Labels == nil {
						updated.Labels = map[string]string{}
					}

					key := strings.TrimPrefix(path, "labels.")
					updated.Labels[key] = info.Labels[key]
					continue
				}

				switch path {
				case "labels":
					updated.Labels = info.Labels
				default:
					return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest)
				}
			}
		} else {
			// Set mutable fields
			updated.Labels = info.Labels
		}
		if err := validateInfo(&updated); err != nil {
			return err
		}

		updated.UpdatedAt = time.Now().UTC()
		return writeInfo(&updated, bkt)
	}); err != nil {
		return content.Info{}, err
	}
	return updated, nil
}

func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return err
	}

	filter, err := filters.ParseAll(fs...)
	if err != nil {
		return err
	}

	// TODO: Batch results to keep from reading all info into memory
	var infos []content.Info
	if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
		bkt := getBlobsBucket(tx, ns)
		if bkt == nil {
			return nil
		}

		return bkt.ForEach(func(k, v []byte) error {
			dgst, err := digest.Parse(string(k))
			if err != nil {
				// Not a digest, skip
				return nil
			}
			bbkt := bkt.Bucket(k)
			if bbkt == nil {
				return nil
			}
			info := content.Info{
				Digest: dgst,
			}
			if err := readInfo(&info, bkt.Bucket(k)); err != nil {
				return err
			}
			if filter.Match(adaptContentInfo(info)) {
				infos = append(infos, info)
			}
			return nil
		})
	}); err != nil {
		return err
	}

	for _, info := range infos {
		if err := fn(info); err != nil {
			return err
		}
	}

	return nil
}

func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return err
	}

	cs.l.RLock()
	defer cs.l.RUnlock()

	return update(ctx, cs.db, func(tx *bolt.Tx) error {
		bkt := getBlobBucket(tx, ns, dgst)
		if bkt == nil {
			return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
		}

		if err := getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String())); err != nil {
			return err
		}

		// Mark content store as dirty for triggering garbage collection
		cs.db.dirtyL.Lock()
		cs.db.dirtyCS = true
		cs.db.dirtyL.Unlock()

		return nil
	})
}

func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}

	filter, err := filters.ParseAll(fs...)
	if err != nil {
		return nil, err
	}

	brefs := map[string]string{}
	if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
		bkt := getIngestBucket(tx, ns)
		if bkt == nil {
			return nil
		}

		return bkt.ForEach(func(k, v []byte) error {
			// TODO(dmcgowan): match name and potentially labels here
			brefs[string(k)] = string(v)
			return nil
		})
	}); err != nil {
		return nil, err
	}

	statuses := make([]content.Status, 0, len(brefs))
	for k, bref := range brefs {
		status, err := cs.Store.Status(ctx, bref)
		if err != nil {
			if errdefs.IsNotFound(err) {
				continue
			}
			return nil, err
		}
		status.Ref = k

		if filter.Match(adaptContentStatus(status)) {
			statuses = append(statuses, status)
		}
	}

	return statuses, nil

}

func getRef(tx *bolt.Tx, ns, ref string) string {
	bkt := getIngestBucket(tx, ns)
	if bkt == nil {
		return ""
	}
	v := bkt.Get([]byte(ref))
	if len(v) == 0 {
		return ""
	}
	return string(v)
}

func (cs *contentStore) Status(ctx context.Context, ref string) (content.Status, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return content.Status{}, err
	}

	var bref string
	if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
		bref = getRef(tx, ns, ref)
		if bref == "" {
			return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
		}

		return nil
	}); err != nil {
		return content.Status{}, err
	}

	st, err := cs.Store.Status(ctx, bref)
	if err != nil {
		return content.Status{}, err
	}
	st.Ref = ref
	return st, nil
}

func (cs *contentStore) Abort(ctx context.Context, ref string) error {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return err
	}

	cs.l.RLock()
	defer cs.l.RUnlock()

	return update(ctx, cs.db, func(tx *bolt.Tx) error {
		bkt := getIngestBucket(tx, ns)
		if bkt == nil {
			return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
		}
		bref := string(bkt.Get([]byte(ref)))
		if bref == "" {
			return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
		}
		if err := bkt.Delete([]byte(ref)); err != nil {
			return err
		}

		return cs.Store.Abort(ctx, bref)
	})

}

func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}

	cs.l.RLock()
	defer cs.l.RUnlock()

	var w content.Writer
	if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
		if expected != "" {
			cbkt := getBlobBucket(tx, ns, expected)
			if cbkt != nil {
				return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
			}
		}

		bkt, err := createIngestBucket(tx, ns)
		if err != nil {
			return err
		}

		var (
			bref  string
			brefb = bkt.Get([]byte(ref))
		)

		if brefb == nil {
			sid, err := bkt.NextSequence()
			if err != nil {
				return err
			}

			bref = createKey(sid, ns, ref)
			if err := bkt.Put([]byte(ref), []byte(bref)); err != nil {
				return err
			}
		} else {
			bref = string(brefb)
		}

		// Do not use the passed in expected value here since it was
		// already checked against the user metadata. If the content
		// store has the content, it must still be written before
		// linked into the given namespace. It is possible in the future
		// to allow content which exists in content store but not
		// namespace to be linked here and returned an exist error, but
		// this would require more configuration to make secure.
		w, err = cs.Store.Writer(ctx, bref, size, "")
		return err
	}); err != nil {
		return nil, err
	}

	// TODO: keep the expected in the writer to use on commit
	// when no expected is provided there.
	return &namespacedWriter{
		Writer:    w,
		ref:       ref,
		namespace: ns,
		db:        cs.db,
		l:         &cs.l,
	}, nil
}

type namespacedWriter struct {
	content.Writer
	ref       string
	namespace string
	db        transactor
	l         *sync.RWMutex
}

func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
	nw.l.RLock()
	defer nw.l.RUnlock()

	return update(ctx, nw.db, func(tx *bolt.Tx) error {
		bkt := getIngestBucket(tx, nw.namespace)
		if bkt != nil {
			if err := bkt.Delete([]byte(nw.ref)); err != nil {
				return err
			}
		}
		dgst, err := nw.commit(ctx, tx, size, expected, opts...)
		if err != nil {
			return err
		}
		return addContentLease(ctx, tx, dgst)
	})
}

func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) (digest.Digest, error) {
	var base content.Info
	for _, opt := range opts {
		if err := opt(&base); err != nil {
			return "", err
		}
	}
	if err := validateInfo(&base); err != nil {
		return "", err
	}

	status, err := nw.Writer.Status()
	if err != nil {
		return "", err
	}
	if size != 0 && size != status.Offset {
		return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
	}
	size = status.Offset

	actual := nw.Writer.Digest()

	if err := nw.Writer.Commit(ctx, size, expected); err != nil {
		if !errdefs.IsAlreadyExists(err) {
			return "", err
		}
		if getBlobBucket(tx, nw.namespace, actual) != nil {
			return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
		}
	}

	bkt, err := createBlobBucket(tx, nw.namespace, actual)
	if err != nil {
		return "", err
	}

	commitTime := time.Now().UTC()

	sizeEncoded, err := encodeInt(size)
	if err != nil {
		return "", err
	}

	if err := boltutil.WriteTimestamps(bkt, commitTime, commitTime); err != nil {
		return "", err
	}
	if err := boltutil.WriteLabels(bkt, base.Labels); err != nil {
		return "", err
	}
	return actual, bkt.Put(bucketKeySize, sizeEncoded)
}

func (nw *namespacedWriter) Status() (content.Status, error) {
	st, err := nw.Writer.Status()
	if err == nil {
		st.Ref = nw.ref
	}
	return st, err
}

func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
	if err := cs.checkAccess(ctx, dgst); err != nil {
		return nil, err
	}
	return cs.Store.ReaderAt(ctx, dgst)
}

func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return err
	}

	return view(ctx, cs.db, func(tx *bolt.Tx) error {
		bkt := getBlobBucket(tx, ns, dgst)
		if bkt == nil {
			return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
		}
		return nil
	})
}

func validateInfo(info *content.Info) error {
	for k, v := range info.Labels {
		if err := labels.Validate(k, v); err == nil {
			return errors.Wrapf(err, "info.Labels")
		}
	}

	return nil
}

func readInfo(info *content.Info, bkt *bolt.Bucket) error {
	if err := boltutil.ReadTimestamps(bkt, &info.CreatedAt, &info.UpdatedAt); err != nil {
		return err
	}

	labels, err := boltutil.ReadLabels(bkt)
	if err != nil {
		return err
	}
	info.Labels = labels

	if v := bkt.Get(bucketKeySize); len(v) > 0 {
		info.Size, _ = binary.Varint(v)
	}

	return nil
}

func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
	if err := boltutil.WriteTimestamps(bkt, info.CreatedAt, info.UpdatedAt); err != nil {
		return err
	}

	if err := boltutil.WriteLabels(bkt, info.Labels); err != nil {
		return errors.Wrapf(err, "writing labels for info %v", info.Digest)
	}

	// Write size
	sizeEncoded, err := encodeInt(info.Size)
	if err != nil {
		return err
	}

	return bkt.Put(bucketKeySize, sizeEncoded)
}

func (cs *contentStore) garbageCollect(ctx context.Context) error {
	lt1 := time.Now()
	cs.l.Lock()
	defer func() {
		cs.l.Unlock()
		log.G(ctx).WithField("t", time.Now().Sub(lt1)).Debugf("content garbage collected")
	}()

	seen := map[string]struct{}{}
	if err := cs.db.View(func(tx *bolt.Tx) error {
		v1bkt := tx.Bucket(bucketKeyVersion)
		if v1bkt == nil {
			return nil
		}

		// iterate through each namespace
		v1c := v1bkt.Cursor()

		for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
			if v != nil {
				continue
			}

			cbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent)
			if cbkt == nil {
				continue
			}
			bbkt := cbkt.Bucket(bucketKeyObjectBlob)
			if err := bbkt.ForEach(func(ck, cv []byte) error {
				if cv == nil {
					seen[string(ck)] = struct{}{}
				}
				return nil
			}); err != nil {
				return err
			}
		}

		return nil
	}); err != nil {
		return err
	}

	return cs.Store.Walk(ctx, func(info content.Info) error {
		if _, ok := seen[info.Digest.String()]; !ok {
			if err := cs.Store.Delete(ctx, info.Digest); err != nil {
				return err
			}
			log.G(ctx).WithField("digest", info.Digest).Debug("removed content")
		}
		return nil
	})
}