mirror of
https://gitlab.com/ric_harvey/MailHog.git
synced 2024-11-30 17:54:03 +00:00
245 lines
5.3 KiB
Go
245 lines
5.3 KiB
Go
package fluent
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"net"
|
|
"reflect"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
defaultHost = "127.0.0.1"
|
|
defaultPort = 24224
|
|
defaultTimeout = 3 * time.Second
|
|
defaultBufferLimit = 8 * 1024 * 1024
|
|
defaultRetryWait = 500
|
|
defaultMaxRetry = 13
|
|
defaultReconnectWaitIncreRate = 1.5
|
|
)
|
|
|
|
type Config struct {
|
|
FluentPort int
|
|
FluentHost string
|
|
Timeout time.Duration
|
|
BufferLimit int
|
|
RetryWait int
|
|
MaxRetry int
|
|
TagPrefix string
|
|
}
|
|
|
|
type Fluent struct {
|
|
Config
|
|
conn net.Conn
|
|
pending []byte
|
|
reconnecting bool
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// New creates a new Logger.
|
|
func New(config Config) (f *Fluent, err error) {
|
|
if config.FluentHost == "" {
|
|
config.FluentHost = defaultHost
|
|
}
|
|
if config.FluentPort == 0 {
|
|
config.FluentPort = defaultPort
|
|
}
|
|
if config.Timeout == 0 {
|
|
config.Timeout = defaultTimeout
|
|
}
|
|
if config.BufferLimit == 0 {
|
|
config.BufferLimit = defaultBufferLimit
|
|
}
|
|
if config.RetryWait == 0 {
|
|
config.RetryWait = defaultRetryWait
|
|
}
|
|
if config.MaxRetry == 0 {
|
|
config.MaxRetry = defaultMaxRetry
|
|
}
|
|
f = &Fluent{Config: config, reconnecting: false}
|
|
err = f.connect()
|
|
return
|
|
}
|
|
|
|
// Post writes the output for a logging event.
|
|
//
|
|
// Examples:
|
|
//
|
|
// // send string
|
|
// f.Post("tag_name", "data")
|
|
//
|
|
// // send map[string]
|
|
// mapStringData := map[string]string{
|
|
// "foo": "bar",
|
|
// }
|
|
// f.Post("tag_name", mapStringData)
|
|
//
|
|
// // send message with specified time
|
|
// mapStringData := map[string]string{
|
|
// "foo": "bar",
|
|
// }
|
|
// tm := time.Now()
|
|
// f.PostWithTime("tag_name", tm, mapStringData)
|
|
//
|
|
// // send struct
|
|
// structData := struct {
|
|
// Name string `msg:"name"`
|
|
// } {
|
|
// "john smith",
|
|
// }
|
|
// f.Post("tag_name", structData)
|
|
//
|
|
func (f *Fluent) Post(tag string, message interface{}) error {
|
|
timeNow := time.Now()
|
|
return f.PostWithTime(tag, timeNow, message)
|
|
}
|
|
|
|
func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) error {
|
|
if len(f.TagPrefix) > 0 {
|
|
tag = f.TagPrefix + "." + tag
|
|
}
|
|
|
|
msg := reflect.ValueOf(message)
|
|
msgtype := msg.Type()
|
|
|
|
if msgtype.Kind() == reflect.Struct {
|
|
// message should be tagged by "codec" or "msg"
|
|
kv := make(map[string]interface{})
|
|
fields := msgtype.NumField()
|
|
for i := 0; i < fields; i++ {
|
|
field := msgtype.Field(i)
|
|
name := field.Name
|
|
if n1 := field.Tag.Get("msg"); n1 != "" {
|
|
name = n1
|
|
} else if n2 := field.Tag.Get("codec"); n2 != "" {
|
|
name = n2
|
|
}
|
|
kv[name] = msg.FieldByIndex(field.Index).Interface()
|
|
}
|
|
return f.EncodeAndPostData(tag, tm, kv)
|
|
}
|
|
|
|
if msgtype.Kind() != reflect.Map {
|
|
return errors.New("messge must be a map")
|
|
} else if msgtype.Key().Kind() != reflect.String {
|
|
return errors.New("map keys must be strings")
|
|
}
|
|
|
|
kv := make(map[string]interface{})
|
|
for _, k := range msg.MapKeys() {
|
|
kv[k.String()] = msg.MapIndex(k).Interface()
|
|
}
|
|
|
|
return f.EncodeAndPostData(tag, tm, kv)
|
|
}
|
|
|
|
func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
|
|
if data, dumperr := f.EncodeData(tag, tm, message); dumperr != nil {
|
|
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%s' to msgpack:%s", message, dumperr)
|
|
// fmt.Println("fluent#Post: can't convert to msgpack:", message, dumperr)
|
|
} else {
|
|
f.PostRawData(data)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (f *Fluent) PostRawData(data []byte) {
|
|
f.mu.Lock()
|
|
f.pending = append(f.pending, data...)
|
|
f.mu.Unlock()
|
|
if err := f.send(); err != nil {
|
|
f.close()
|
|
if len(f.pending) > f.Config.BufferLimit {
|
|
f.flushBuffer()
|
|
}
|
|
} else {
|
|
f.flushBuffer()
|
|
}
|
|
}
|
|
|
|
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) {
|
|
timeUnix := tm.Unix()
|
|
msg := &Message{Tag: tag, Time: timeUnix, Record: message}
|
|
data, err = msg.MarshalMsg(nil)
|
|
return
|
|
}
|
|
|
|
// Close closes the connection.
|
|
func (f *Fluent) Close() (err error) {
|
|
if len(f.pending) > 0 {
|
|
_ = f.send()
|
|
}
|
|
err = f.close()
|
|
return
|
|
}
|
|
|
|
// close closes the connection.
|
|
func (f *Fluent) close() (err error) {
|
|
if f.conn != nil {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
} else {
|
|
return
|
|
}
|
|
if f.conn != nil {
|
|
f.conn.Close()
|
|
f.conn = nil
|
|
}
|
|
return
|
|
}
|
|
|
|
// connect establishes a new connection using the specified transport.
|
|
func (f *Fluent) connect() (err error) {
|
|
f.conn, err = net.DialTimeout("tcp", f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
|
|
return
|
|
}
|
|
|
|
func e(x, y float64) int {
|
|
return int(math.Pow(x, y))
|
|
}
|
|
|
|
func (f *Fluent) reconnect() {
|
|
go func() {
|
|
for i := 0; ; i++ {
|
|
err := f.connect()
|
|
if err == nil {
|
|
f.mu.Lock()
|
|
f.reconnecting = false
|
|
f.mu.Unlock()
|
|
break
|
|
} else {
|
|
if i == f.Config.MaxRetry {
|
|
panic("fluent#reconnect: failed to reconnect!")
|
|
}
|
|
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
|
|
time.Sleep(time.Duration(waitTime) * time.Millisecond)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (f *Fluent) flushBuffer() {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
f.pending = f.pending[0:0]
|
|
}
|
|
|
|
func (f *Fluent) send() (err error) {
|
|
if f.conn == nil {
|
|
if f.reconnecting == false {
|
|
f.mu.Lock()
|
|
f.reconnecting = true
|
|
f.mu.Unlock()
|
|
f.reconnect()
|
|
}
|
|
err = errors.New("fluent#send: can't send logs, client is reconnecting")
|
|
} else {
|
|
f.mu.Lock()
|
|
_, err = f.conn.Write(f.pending)
|
|
f.mu.Unlock()
|
|
}
|
|
return
|
|
}
|