package fluent

import (
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strconv"
	"sync"
	"time"

	"github.com/tinylib/msgp/msgp"
)

const (
	defaultHost                   = "127.0.0.1"
	defaultNetwork                = "tcp"
	defaultSocketPath             = ""
	defaultPort                   = 24224
	defaultTimeout                = 3 * time.Second
	defaultWriteTimeout           = time.Duration(0) // Write() will not time out
	defaultBufferLimit            = 8 * 1024 * 1024
	defaultRetryWait              = 500
	defaultMaxRetry               = 13
	defaultReconnectWaitIncreRate = 1.5
	// Default sub-second precision value to false since it is only compatible
	// with fluentd versions v0.14 and above.
	defaultSubSecondPrecision = false
)

type Config struct {
	FluentPort       int           `json:"fluent_port"`
	FluentHost       string        `json:"fluent_host"`
	FluentNetwork    string        `json:"fluent_network"`
	FluentSocketPath string        `json:"fluent_socket_path"`
	Timeout          time.Duration `json:"timeout"`
	WriteTimeout     time.Duration `json:"write_timeout"`
	BufferLimit      int           `json:"buffer_limit"`
	RetryWait        int           `json:"retry_wait"`
	MaxRetry         int           `json:"max_retry"`
	TagPrefix        string        `json:"tag_prefix"`
	AsyncConnect     bool          `json:"async_connect"`
	MarshalAsJSON    bool          `json:"marshal_as_json"`

	// Sub-second precision timestamps are only possible for those using fluentd
	// v0.14+ and serializing their messages with msgpack.
	SubSecondPrecision bool `json:"sub_second_precision"`
}

type Fluent struct {
	Config

	mubuff  sync.Mutex
	pending []byte

	muconn       sync.Mutex
	conn         net.Conn
	reconnecting bool
}

// New creates a new Logger.
func New(config Config) (f *Fluent, err error) {
	if config.FluentNetwork == "" {
		config.FluentNetwork = defaultNetwork
	}
	if config.FluentHost == "" {
		config.FluentHost = defaultHost
	}
	if config.FluentPort == 0 {
		config.FluentPort = defaultPort
	}
	if config.FluentSocketPath == "" {
		config.FluentSocketPath = defaultSocketPath
	}
	if config.Timeout == 0 {
		config.Timeout = defaultTimeout
	}
	if config.WriteTimeout == 0 {
		config.WriteTimeout = defaultWriteTimeout
	}
	if config.BufferLimit == 0 {
		config.BufferLimit = defaultBufferLimit
	}
	if config.RetryWait == 0 {
		config.RetryWait = defaultRetryWait
	}
	if config.MaxRetry == 0 {
		config.MaxRetry = defaultMaxRetry
	}
	if config.AsyncConnect {
		f = &Fluent{Config: config, reconnecting: true}
		go f.reconnect()
	} else {
		f = &Fluent{Config: config, reconnecting: false}
		err = f.connect()
	}
	return
}

// Post writes the output for a logging event.
//
// Examples:
//
//  // send map[string]
//  mapStringData := map[string]string{
//  	"foo":  "bar",
//  }
//  f.Post("tag_name", mapStringData)
//
//  // send message with specified time
//  mapStringData := map[string]string{
//  	"foo":  "bar",
//  }
//  tm := time.Now()
//  f.PostWithTime("tag_name", tm, mapStringData)
//
//  // send struct
//  structData := struct {
//  		Name string `msg:"name"`
//  } {
//  		"john smith",
//  }
//  f.Post("tag_name", structData)
//
func (f *Fluent) Post(tag string, message interface{}) error {
	timeNow := time.Now()
	return f.PostWithTime(tag, timeNow, message)
}

func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) error {
	if len(f.TagPrefix) > 0 {
		tag = f.TagPrefix + "." + tag
	}

	if m, ok := message.(msgp.Marshaler); ok {
		return f.EncodeAndPostData(tag, tm, m)
	}

	msg := reflect.ValueOf(message)
	msgtype := msg.Type()

	if msgtype.Kind() == reflect.Struct {
		// message should be tagged by "codec" or "msg"
		kv := make(map[string]interface{})
		fields := msgtype.NumField()
		for i := 0; i < fields; i++ {
			field := msgtype.Field(i)
			name := field.Name
			if n1 := field.Tag.Get("msg"); n1 != "" {
				name = n1
			} else if n2 := field.Tag.Get("codec"); n2 != "" {
				name = n2
			}
			kv[name] = msg.FieldByIndex(field.Index).Interface()
		}
		return f.EncodeAndPostData(tag, tm, kv)
	}

	if msgtype.Kind() != reflect.Map {
		return errors.New("fluent#PostWithTime: message must be a map")
	} else if msgtype.Key().Kind() != reflect.String {
		return errors.New("fluent#PostWithTime: map keys must be strings")
	}

	kv := make(map[string]interface{})
	for _, k := range msg.MapKeys() {
		kv[k.String()] = msg.MapIndex(k).Interface()
	}

	return f.EncodeAndPostData(tag, tm, kv)
}

func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
	var data []byte
	var err error
	if data, err = f.EncodeData(tag, tm, message); err != nil {
		return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
	}
	return f.postRawData(data)
}

// Deprecated: Use EncodeAndPostData instead
func (f *Fluent) PostRawData(data []byte) {
	f.postRawData(data)
}

func (f *Fluent) postRawData(data []byte) error {
	if err := f.appendBuffer(data); err != nil {
		return err
	}
	if err := f.send(); err != nil {
		f.close()
		return err
	}
	return nil
}

// For sending forward protocol adopted JSON
type MessageChunk struct {
	message Message
}

// Golang default marshaler does not support
// ["value", "value2", {"key":"value"}] style marshaling.
// So, it should write JSON marshaler by hand.
func (chunk *MessageChunk) MarshalJSON() ([]byte, error) {
	data, err := json.Marshal(chunk.message.Record)
	return []byte(fmt.Sprintf("[\"%s\",%d,%s,null]", chunk.message.Tag,
		chunk.message.Time, data)), err
}

func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) {
	timeUnix := tm.Unix()
	if f.Config.MarshalAsJSON {
		msg := Message{Tag: tag, Time: timeUnix, Record: message}
		chunk := &MessageChunk{message: msg}
		data, err = json.Marshal(chunk)
	} else if f.Config.SubSecondPrecision {
		msg := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message}
		data, err = msg.MarshalMsg(nil)
	} else {
		msg := &Message{Tag: tag, Time: timeUnix, Record: message}
		data, err = msg.MarshalMsg(nil)
	}
	return
}

// Close closes the connection.
func (f *Fluent) Close() (err error) {
	if len(f.pending) > 0 {
		err = f.send()
	}
	f.close()
	return
}

// appendBuffer appends data to buffer with lock.
func (f *Fluent) appendBuffer(data []byte) error {
	f.mubuff.Lock()
	defer f.mubuff.Unlock()
	if len(f.pending)+len(data) > f.Config.BufferLimit {
		return errors.New(fmt.Sprintf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit))
	}
	f.pending = append(f.pending, data...)
	return nil
}

// close closes the connection.
func (f *Fluent) close() {
	f.muconn.Lock()
	if f.conn != nil {
		f.conn.Close()
		f.conn = nil
	}
	f.muconn.Unlock()
}

// connect establishes a new connection using the specified transport.
func (f *Fluent) connect() (err error) {
	f.muconn.Lock()
	defer f.muconn.Unlock()

	switch f.Config.FluentNetwork {
	case "tcp":
		f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
	case "unix":
		f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
	default:
		err = net.UnknownNetworkError(f.Config.FluentNetwork)
	}

	if err == nil {
		f.reconnecting = false
	}
	return
}

func e(x, y float64) int {
	return int(math.Pow(x, y))
}

func (f *Fluent) reconnect() {
	for i := 0; ; i++ {
		err := f.connect()
		if err == nil {
			f.send()
			return
		}
		if i == f.Config.MaxRetry {
			// TODO: What we can do when connection failed MaxRetry times?
			panic("fluent#reconnect: failed to reconnect!")
		}
		waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
		time.Sleep(time.Duration(waitTime) * time.Millisecond)
	}
}

func (f *Fluent) send() error {
	f.muconn.Lock()
	defer f.muconn.Unlock()

	if f.conn == nil {
		if f.reconnecting == false {
			f.reconnecting = true
			go f.reconnect()
		}
		return errors.New("fluent#send: can't send logs, client is reconnecting")
	}

	f.mubuff.Lock()
	defer f.mubuff.Unlock()

	var err error
	if len(f.pending) > 0 {
		t := f.Config.WriteTimeout
		if time.Duration(0) < t {
			f.conn.SetWriteDeadline(time.Now().Add(t))
		} else {
			f.conn.SetWriteDeadline(time.Time{})
		}
		_, err = f.conn.Write(f.pending)
		if err != nil {
			f.conn.Close()
			f.conn = nil
		} else {
			f.pending = f.pending[:0]
		}
	}
	return err
}