package datastore

import (
	"errors"
	"path"
	"strings"
	"sync"

	store "github.com/moby/moby/v2/daemon/libnetwork/internal/kvstore"
	"github.com/moby/moby/v2/daemon/libnetwork/internal/kvstore/boltdb"
	"github.com/moby/moby/v2/daemon/libnetwork/types"
)

// ErrKeyModified is raised for an atomic update when the update is working on a stale state
var (
	ErrKeyModified = store.ErrKeyModified
	ErrKeyNotFound = store.ErrKeyNotFound
)

type Store struct {
	mu    sync.Mutex
	store store.Store
	cache *cache
}

// KVObject is Key/Value interface used by objects to be part of the Store.
type KVObject interface {
	// Key method lets an object provide the Key to be used in KV Store
	Key() []string
	// KeyPrefix method lets an object return immediate parent key that can be used for tree walk
	KeyPrefix() []string
	// Value method lets an object marshal its content to be stored in the KV store
	Value() []byte
	// SetValue is used by the datastore to set the object's value when loaded from the data store.
	SetValue([]byte) error
	// Index method returns the latest DB Index as seen by the object
	Index() uint64
	// SetIndex method allows the datastore to store the latest DB Index into the object
	SetIndex(uint64)
	// Exists returns true if the object exists in the datastore, false if it hasn't been stored yet.
	// When SetIndex() is called, the object has been stored.
	Exists() bool
	// Skip provides a way for a KV Object to avoid persisting it in the KV Store
	Skip() bool
	// New returns a new object which is created based on the
	// source object
	New() KVObject
	// CopyTo deep copies the contents of the implementing object
	// to the passed destination object
	CopyTo(KVObject) error
}

const (
	// NetworkKeyPrefix is the prefix for network key in the kv store
	NetworkKeyPrefix = "network"
	// EndpointKeyPrefix is the prefix for endpoint key in the kv store
	EndpointKeyPrefix = "endpoint"
)

var (
	defaultRootChain = []string{"docker", "network", "v1.0"}
	rootChain        = defaultRootChain
)

const DefaultBucket = "libnetwork"

// Key provides convenient method to create a Key
func Key(key ...string) string {
	var b strings.Builder
	for _, parts := range [][]string{rootChain, key} {
		for _, part := range parts {
			b.WriteString(part)
			b.WriteString("/")
		}
	}
	return b.String()
}

// New creates a new Store instance.
func New(dir, bucket string) (*Store, error) {
	if dir == "" {
		return nil, errors.New("empty dir")
	}
	if bucket == "" {
		return nil, errors.New("empty bucket")
	}

	s, err := boltdb.New(path.Join(dir, "local-kv.db"), bucket)
	if err != nil {
		return nil, err
	}

	return &Store{store: s, cache: newCache(s)}, nil
}

// Close closes the data store.
func (ds *Store) Close() {
	ds.store.Close()
}

// PutObjectAtomic provides an atomic add and update operation for a Record.
func (ds *Store) PutObjectAtomic(kvObject KVObject) error {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	if kvObject == nil {
		return types.InvalidParameterErrorf("invalid KV Object: nil")
	}

	kvObjValue := kvObject.Value()

	if kvObjValue == nil {
		return types.InvalidParameterErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
	}

	if !kvObject.Skip() {
		var previous *store.KVPair
		if kvObject.Exists() {
			previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
		}

		pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous)
		if err != nil {
			if errors.Is(err, store.ErrKeyExists) {
				return ErrKeyModified
			}
			return err
		}

		kvObject.SetIndex(pair.LastIndex)
	}

	// If persistent store is skipped, sequencing needs to
	// happen in cache.
	return ds.cache.add(kvObject, kvObject.Skip())
}

// GetObject gets data from the store and unmarshals to the specified object.
func (ds *Store) GetObject(o KVObject) error {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	return ds.cache.get(o)
}

func (ds *Store) ensureParent(parent string) error {
	exists, err := ds.store.Exists(parent)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}
	return ds.store.Put(parent, []byte{})
}

// List returns of a list of KVObjects belonging to the parent key. The caller
// must pass a KVObject of the same type as the objects that need to be listed.
func (ds *Store) List(kvObject KVObject) ([]KVObject, error) {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	return ds.cache.list(kvObject)
}

func (ds *Store) iterateKVPairsFromStore(key string, ctor KVObject, callback func(string, KVObject)) error {
	// Make sure the parent key exists
	if err := ds.ensureParent(key); err != nil {
		return err
	}

	kvList, err := ds.store.List(key)
	if err != nil {
		return err
	}

	for _, kvPair := range kvList {
		if len(kvPair.Value) == 0 {
			continue
		}

		dstO := ctor.New()
		if err := dstO.SetValue(kvPair.Value); err != nil {
			return err
		}

		// Make sure the object has a correct view of the DB index in
		// case we need to modify it and update the DB.
		dstO.SetIndex(kvPair.LastIndex)
		callback(kvPair.Key, dstO)
	}

	return nil
}

// Map returns a Map of KVObjects.
func (ds *Store) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	results := map[string]KVObject{}
	err := ds.iterateKVPairsFromStore(key, kvObject, func(key string, val KVObject) {
		// Trim the leading & trailing "/" to make it consistent across all stores
		results[strings.Trim(key, "/")] = val
	})
	if err != nil {
		return nil, err
	}
	return results, nil
}

// DeleteObject deletes a kvObject from the on-disk DB and the in-memory cache.
// Unlike DeleteObjectAtomic, it doesn't check the optimistic lock of the
// passed kvObject.
func (ds *Store) DeleteObject(kvObject KVObject) error {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	if kvObject == nil {
		return types.InvalidParameterErrorf("invalid KV Object: nil")
	}

	if !kvObject.Skip() {
		if err := ds.store.Delete(Key(kvObject.Key()...)); err != nil {
			return err
		}
	}

	// cleanup the cache only if AtomicDelete went through successfully
	// If persistent store is skipped, sequencing needs to
	// happen in cache.
	return ds.cache.del(kvObject, false)
}

// DeleteObjectAtomic performs atomic delete on a record.
func (ds *Store) DeleteObjectAtomic(kvObject KVObject) error {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	if kvObject == nil {
		return types.InvalidParameterErrorf("invalid KV Object: nil")
	}

	previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}

	if !kvObject.Skip() {
		if err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
			if errors.Is(err, store.ErrKeyExists) {
				return ErrKeyModified
			}
			return err
		}
	}

	// cleanup the cache only if AtomicDelete went through successfully
	// If persistent store is skipped, sequencing needs to
	// happen in cache.
	return ds.cache.del(kvObject, kvObject.Skip())
}