package container

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"runtime"
	"strings"
	"syscall"
	"time"

	"github.com/docker/distribution/reference"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/backend"
	containertypes "github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/events"
	containerpkg "github.com/docker/docker/container"
	"github.com/docker/docker/daemon/cluster/convert"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/libnetwork"
	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/opencontainers/go-digest"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
	"golang.org/x/time/rate"
)

// containerAdapter conducts remote operations for a container. All calls
// are mostly naked calls to the client API, seeded with information from
// containerConfig.
type containerAdapter struct {
	backend      executorpkg.Backend
	container    *containerConfig
	dependencies exec.DependencyGetter
}

func newContainerAdapter(b executorpkg.Backend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*containerAdapter, error) {
	ctnr, err := newContainerConfig(task, node)
	if err != nil {
		return nil, err
	}

	return &containerAdapter{
		container:    ctnr,
		backend:      b,
		dependencies: dependencies,
	}, nil
}

func (c *containerAdapter) pullImage(ctx context.Context) error {
	spec := c.container.spec()

	// Skip pulling if the image is referenced by image ID.
	if _, err := digest.Parse(spec.Image); err == nil {
		return nil
	}

	// Skip pulling if the image is referenced by digest and already
	// exists locally.
	named, err := reference.ParseNormalizedNamed(spec.Image)
	if err == nil {
		if _, ok := named.(reference.Canonical); ok {
			_, err := c.backend.LookupImage(spec.Image)
			if err == nil {
				return nil
			}
		}
	}

	// if the image needs to be pulled, the auth config will be retrieved and updated
	var encodedAuthConfig string
	if spec.PullOptions != nil {
		encodedAuthConfig = spec.PullOptions.RegistryAuth
	}

	authConfig := &types.AuthConfig{}
	if encodedAuthConfig != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}

	pr, pw := io.Pipe()
	metaHeaders := map[string][]string{}
	go func() {
		// TODO @jhowardmsft LCOW Support: This will need revisiting as
		// the stack is built up to include LCOW support for swarm.
		platform := runtime.GOOS
		err := c.backend.PullImage(ctx, c.container.image(), "", platform, metaHeaders, authConfig, pw)
		pw.CloseWithError(err)
	}()

	dec := json.NewDecoder(pr)
	dec.UseNumber()
	m := map[string]interface{}{}
	spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)

	lastStatus := ""
	for {
		if err := dec.Decode(&m); err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		l := log.G(ctx)
		// limit pull progress logs unless the status changes
		if spamLimiter.Allow() || lastStatus != m["status"] {
			// if we have progress details, we have everything we need
			if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
				// first, log the image and status
				l = l.WithFields(logrus.Fields{
					"image":  c.container.image(),
					"status": m["status"],
				})
				// then, if we have progress, log the progress
				if progress["current"] != nil && progress["total"] != nil {
					l = l.WithFields(logrus.Fields{
						"current": progress["current"],
						"total":   progress["total"],
					})
				}
			}
			l.Debug("pull in progress")
		}
		// sometimes, we get no useful information at all, and add no fields
		if status, ok := m["status"].(string); ok {
			lastStatus = status
		}
	}

	// if the final stream object contained an error, return it
	if errMsg, ok := m["error"]; ok {
		return fmt.Errorf("%v", errMsg)
	}
	return nil
}

func (c *containerAdapter) createNetworks(ctx context.Context) error {
	for name := range c.container.networksAttachments {
		ncr, err := c.container.networkCreateRequest(name)
		if err != nil {
			return err
		}

		if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
			if _, ok := err.(libnetwork.NetworkNameError); ok {
				continue
			}

			return err
		}
	}

	return nil
}

func (c *containerAdapter) removeNetworks(ctx context.Context) error {
	for name, v := range c.container.networksAttachments {
		if err := c.backend.DeleteManagedNetwork(v.Network.ID); err != nil {
			switch err.(type) {
			case *libnetwork.ActiveEndpointsError:
				continue
			case libnetwork.ErrNoSuchNetwork:
				continue
			default:
				log.G(ctx).Errorf("network %s remove failed: %v", name, err)
				return err
			}
		}
	}

	return nil
}

func (c *containerAdapter) networkAttach(ctx context.Context) error {
	config := c.container.createNetworkingConfig(c.backend)

	var (
		networkName string
		networkID   string
	)

	if config != nil {
		for n, epConfig := range config.EndpointsConfig {
			networkName = n
			networkID = epConfig.NetworkID
			break
		}
	}

	return c.backend.UpdateAttachment(networkName, networkID, c.container.networkAttachmentContainerID(), config)
}

func (c *containerAdapter) waitForDetach(ctx context.Context) error {
	config := c.container.createNetworkingConfig(c.backend)

	var (
		networkName string
		networkID   string
	)

	if config != nil {
		for n, epConfig := range config.EndpointsConfig {
			networkName = n
			networkID = epConfig.NetworkID
			break
		}
	}

	return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.networkAttachmentContainerID())
}

func (c *containerAdapter) create(ctx context.Context) error {
	var cr containertypes.ContainerCreateCreatedBody
	var err error
	if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
		Name:       c.container.name(),
		Config:     c.container.config(),
		HostConfig: c.container.hostConfig(),
		// Use the first network in container create
		NetworkingConfig: c.container.createNetworkingConfig(c.backend),
	}); err != nil {
		return err
	}

	// Docker daemon currently doesn't support multiple networks in container create
	// Connect to all other networks
	nc := c.container.connectNetworkingConfig(c.backend)

	if nc != nil {
		for n, ep := range nc.EndpointsConfig {
			if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
				return err
			}
		}
	}

	container := c.container.task.Spec.GetContainer()
	if container == nil {
		return errors.New("unable to get container from task spec")
	}

	if err := c.backend.SetContainerDependencyStore(cr.ID, c.dependencies); err != nil {
		return err
	}

	// configure secrets
	secretRefs := convert.SecretReferencesFromGRPC(container.Secrets)
	if err := c.backend.SetContainerSecretReferences(cr.ID, secretRefs); err != nil {
		return err
	}

	configRefs := convert.ConfigReferencesFromGRPC(container.Configs)
	if err := c.backend.SetContainerConfigReferences(cr.ID, configRefs); err != nil {
		return err
	}

	return c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig())
}

// checkMounts ensures that the provided mounts won't have any host-specific
// problems at start up. For example, we disallow bind mounts without an
// existing path, which slightly different from the container API.
func (c *containerAdapter) checkMounts() error {
	spec := c.container.spec()
	for _, mount := range spec.Mounts {
		switch mount.Type {
		case api.MountTypeBind:
			if _, err := os.Stat(mount.Source); os.IsNotExist(err) {
				return fmt.Errorf("invalid bind mount source, source path not found: %s", mount.Source)
			}
		}
	}

	return nil
}

func (c *containerAdapter) start(ctx context.Context) error {
	if err := c.checkMounts(); err != nil {
		return err
	}

	return c.backend.ContainerStart(c.container.name(), nil, "", "")
}

func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
	cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
	if ctx.Err() != nil {
		return types.ContainerJSON{}, ctx.Err()
	}
	if err != nil {
		return types.ContainerJSON{}, err
	}
	return *cs, nil
}

// events issues a call to the events API and returns a channel with all
// events. The stream of events can be shutdown by cancelling the context.
func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
	log.G(ctx).Debugf("waiting on events")
	buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
	eventsq := make(chan events.Message, len(buffer))

	for _, event := range buffer {
		eventsq <- event
	}

	go func() {
		defer c.backend.UnsubscribeFromEvents(l)

		for {
			select {
			case ev := <-l:
				jev, ok := ev.(events.Message)
				if !ok {
					log.G(ctx).Warnf("unexpected event message: %q", ev)
					continue
				}
				select {
				case eventsq <- jev:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	return eventsq
}

func (c *containerAdapter) wait(ctx context.Context) (<-chan containerpkg.StateStatus, error) {
	return c.backend.ContainerWait(ctx, c.container.nameOrID(), containerpkg.WaitConditionNotRunning)
}

func (c *containerAdapter) shutdown(ctx context.Context) error {
	// Default stop grace period to nil (daemon will use the stopTimeout of the container)
	var stopgrace *int
	spec := c.container.spec()
	if spec.StopGracePeriod != nil {
		stopgraceValue := int(spec.StopGracePeriod.Seconds)
		stopgrace = &stopgraceValue
	}
	return c.backend.ContainerStop(c.container.name(), stopgrace)
}

func (c *containerAdapter) terminate(ctx context.Context) error {
	return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
}

func (c *containerAdapter) remove(ctx context.Context) error {
	return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
		RemoveVolume: true,
		ForceRemove:  true,
	})
}

func (c *containerAdapter) createVolumes(ctx context.Context) error {
	// Create plugin volumes that are embedded inside a Mount
	for _, mount := range c.container.task.Spec.GetContainer().Mounts {
		if mount.Type != api.MountTypeVolume {
			continue
		}

		if mount.VolumeOptions == nil {
			continue
		}

		if mount.VolumeOptions.DriverConfig == nil {
			continue
		}

		req := c.container.volumeCreateRequest(&mount)

		// Check if this volume exists on the engine
		if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
			// TODO(amitshukla): Today, volume create through the engine api does not return an error
			// when the named volume with the same parameters already exists.
			// It returns an error if the driver name is different - that is a valid error
			return err
		}

	}

	return nil
}

func (c *containerAdapter) activateServiceBinding() error {
	return c.backend.ActivateContainerServiceBinding(c.container.name())
}

func (c *containerAdapter) deactivateServiceBinding() error {
	return c.backend.DeactivateContainerServiceBinding(c.container.name())
}

func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) {
	apiOptions := &types.ContainerLogsOptions{
		Follow: options.Follow,

		// Always say yes to Timestamps and Details. we make the decision
		// of whether to return these to the user or not way higher up the
		// stack.
		Timestamps: true,
		Details:    true,
	}

	if options.Since != nil {
		since, err := gogotypes.TimestampFromProto(options.Since)
		if err != nil {
			return nil, err
		}
		// print since as this formatted string because the docker container
		// logs interface expects it like this.
		// see github.com/docker/docker/api/types/time.ParseTimestamps
		apiOptions.Since = fmt.Sprintf("%d.%09d", since.Unix(), int64(since.Nanosecond()))
	}

	if options.Tail < 0 {
		// See protobuf documentation for details of how this works.
		apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
	} else if options.Tail > 0 {
		return nil, errors.New("tail relative to start of logs not supported via docker API")
	}

	if len(options.Streams) == 0 {
		// empty == all
		apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
	} else {
		for _, stream := range options.Streams {
			switch stream {
			case api.LogStreamStdout:
				apiOptions.ShowStdout = true
			case api.LogStreamStderr:
				apiOptions.ShowStderr = true
			}
		}
	}
	msgs, _, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions)
	if err != nil {
		return nil, err
	}
	return msgs, nil
}

// todo: typed/wrapped errors
func isContainerCreateNameConflict(err error) bool {
	return strings.Contains(err.Error(), "Conflict. The name")
}

func isUnknownContainer(err error) bool {
	return strings.Contains(err.Error(), "No such container:")
}

func isStoppedContainer(err error) bool {
	return strings.Contains(err.Error(), "is already stopped")
}