writer.go 4.3 KB
Newer Older
E
eoLinker API Management 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
package goku_log

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)
Y
Your Name 已提交
14 15 16 17

const MaxBufferd = 1024 * 500

var (
E
eoLinker API Management 已提交
18 19 20 21 22 23
	bufferPool = &sync.Pool{
		New: func() interface{} {
			return new(bytes.Buffer)
		},
	}
)
Y
Your Name 已提交
24

E
eoLinker API Management 已提交
25
type FileWriterByPeriod struct {
Y
Your Name 已提交
26 27 28 29 30 31 32 33 34
	wC         chan *bytes.Buffer
	dir        string
	file       string
	period     LogPeriod
	enable     bool
	cancelFunc context.CancelFunc
	locker     sync.Mutex
	wg         sync.WaitGroup
	expire     time.Duration
E
eoLinker API Management 已提交
35 36 37
}

func NewFileWriteBytePeriod() *FileWriterByPeriod {
Y
Your Name 已提交
38 39 40 41
	w := &FileWriterByPeriod{
		locker: sync.Mutex{},
		wg:     sync.WaitGroup{},
		enable: false,
E
eoLinker API Management 已提交
42 43 44 45
	}

	return w
}
Y
Your Name 已提交
46
func (w *FileWriterByPeriod) getExpire() time.Duration {
E
eoLinker API Management 已提交
47
	w.locker.Lock()
Y
Your Name 已提交
48
	expire := w.expire
E
eoLinker API Management 已提交
49 50 51
	w.locker.Unlock()
	return expire
}
Y
Your Name 已提交
52 53
func (w *FileWriterByPeriod) Set(dir, file string, period LogPeriod, expire time.Duration) {
	fileName := strings.TrimSuffix(file, ".log")
E
eoLinker API Management 已提交
54 55 56 57 58 59 60 61

	w.locker.Lock()
	w.file = fileName
	w.dir = dir
	w.period = period
	w.expire = expire
	w.locker.Unlock()
}
Y
Your Name 已提交
62
func (w *FileWriterByPeriod) Open() {
E
eoLinker API Management 已提交
63
	w.locker.Lock()
Y
Your Name 已提交
64
	defer w.locker.Unlock()
E
eoLinker API Management 已提交
65

Y
Your Name 已提交
66
	if w.enable {
E
eoLinker API Management 已提交
67 68 69 70 71
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	w.cancelFunc = cancel
Y
Your Name 已提交
72
	w.wC = make(chan *bytes.Buffer, 100)
E
eoLinker API Management 已提交
73 74 75 76
	w.wg.Add(1)
	w.enable = true
	go w.do(ctx)
}
Y
Your Name 已提交
77
func (w *FileWriterByPeriod) Close() {
E
eoLinker API Management 已提交
78 79 80

	isClose := false
	w.locker.Lock()
Y
Your Name 已提交
81
	if !w.enable {
E
eoLinker API Management 已提交
82 83 84 85
		w.locker.Unlock()
		return
	}

Y
Your Name 已提交
86
	if w.cancelFunc != nil {
E
eoLinker API Management 已提交
87 88 89 90 91 92
		isClose = true
		w.cancelFunc()
		w.cancelFunc = nil
	}
	w.enable = false
	w.locker.Unlock()
Y
Your Name 已提交
93
	if isClose {
E
eoLinker API Management 已提交
94 95 96 97 98 99
		w.wg.Wait()
	}

}
func (w *FileWriterByPeriod) Write(p []byte) (n int, err error) {

Y
Your Name 已提交
100
	l := len(p)
E
eoLinker API Management 已提交
101
	if !w.enable {
Y
Your Name 已提交
102
		return l, nil
E
eoLinker API Management 已提交
103 104 105 106 107
	}
	buffer := bufferPool.Get().(*bytes.Buffer)
	buffer.Reset()
	buffer.Write(p)

Y
Your Name 已提交
108 109
	w.wC <- buffer
	return l, nil
E
eoLinker API Management 已提交
110 111
}

Y
Your Name 已提交
112
func (w *FileWriterByPeriod) do(ctx context.Context) {
E
eoLinker API Management 已提交
113
	w.initFile()
Y
Your Name 已提交
114 115 116
	f, lastTag, e := w.openFile()
	if e != nil {
		fmt.Printf("open log file:%s\n", e.Error())
E
eoLinker API Management 已提交
117 118 119
		return
	}

Y
Your Name 已提交
120 121
	buf := bufio.NewWriter(f)
	t := time.NewTicker(time.Second * 5)
E
eoLinker API Management 已提交
122
	defer t.Stop()
Y
Your Name 已提交
123
	tflusth := time.NewTimer(time.Second)
E
eoLinker API Management 已提交
124

Y
Your Name 已提交
125
	for {
E
eoLinker API Management 已提交
126 127 128
		select {
		case <-ctx.Done():
			{
Y
Your Name 已提交
129 130
				for len(w.wC) > 0 {
					p := <-w.wC
E
eoLinker API Management 已提交
131 132 133 134 135 136 137 138 139 140 141 142
					buf.Write(p.Bytes())
					bufferPool.Put(p)
				}
				buf.Flush()
				f.Close()
				t.Stop()
				w.wg.Done()
				return
			}

		case <-t.C:
			{
Y
Your Name 已提交
143
				if buf.Buffered() > 0 {
E
eoLinker API Management 已提交
144 145 146
					buf.Flush()
					tflusth.Reset(time.Second)
				}
Y
Your Name 已提交
147
				if lastTag != w.timeTag(time.Now()) {
E
eoLinker API Management 已提交
148 149 150

					f.Close()
					w.history(lastTag)
Y
Your Name 已提交
151 152
					fnew, tag, err := w.openFile()
					if err != nil {
E
eoLinker API Management 已提交
153 154 155
						return
					}
					lastTag = tag
Y
Your Name 已提交
156
					f = fnew
E
eoLinker API Management 已提交
157 158 159 160 161 162 163 164
					buf.Reset(f)

					go w.dropHistory()
				}

			}
		case <-tflusth.C:
			{
Y
Your Name 已提交
165
				if buf.Buffered() > 0 {
E
eoLinker API Management 已提交
166 167 168 169
					buf.Flush()
				}
				tflusth.Reset(time.Second)
			}
Y
Your Name 已提交
170 171 172 173 174 175 176 177
		case p := <-w.wC:
			{
				buf.Write(p.Bytes())
				bufferPool.Put(p)
				if buf.Buffered() > MaxBufferd {
					buf.Flush()
				}
				tflusth.Reset(time.Second)
E
eoLinker API Management 已提交
178 179 180 181 182 183 184
			}
		}
	}
}
func (w *FileWriterByPeriod) timeTag(t time.Time) string {

	w.locker.Lock()
Y
Your Name 已提交
185
	tag := t.Format(w.period.FormatLayout())
E
eoLinker API Management 已提交
186 187 188 189 190
	w.locker.Unlock()
	return tag
}
func (w *FileWriterByPeriod) history(tag string) {

Y
Your Name 已提交
191 192 193
	path := filepath.Join(w.dir, fmt.Sprintf("%s.log", w.file))
	histroy := filepath.Join(w.dir, fmt.Sprintf("%s-%s.log", w.file, tag))
	_ = os.Rename(path, histroy)
E
eoLinker API Management 已提交
194 195

}
Y
Your Name 已提交
196 197
func (w *FileWriterByPeriod) dropHistory() {
	expire := w.getExpire()
E
eoLinker API Management 已提交
198
	expireTime := time.Now().Add(- expire)
Y
Your Name 已提交
199
	pathPatten := filepath.Join(w.dir, fmt.Sprintf("%s-*", w.file))
E
eoLinker API Management 已提交
200
	files, err := filepath.Glob(pathPatten)
Y
Your Name 已提交
201 202 203
	if err == nil {
		for _, f := range files {
			if info, e := os.Stat(f); e == nil {
E
eoLinker API Management 已提交
204

Y
Your Name 已提交
205 206
				if expireTime.After(info.ModTime()) {
					_ = os.Remove(f)
E
eoLinker API Management 已提交
207 208 209 210 211
				}
			}
		}
	}
}
Y
Your Name 已提交
212 213 214 215 216 217 218 219
func (w *FileWriterByPeriod) initFile() {
	_ = os.MkdirAll(w.dir, os.ModePerm)
	path := filepath.Join(w.dir, fmt.Sprintf("%s.log", w.file))
	nowTag := w.timeTag(time.Now())
	if info, e := os.Stat(path); e == nil {

		timeTag := w.timeTag(info.ModTime())
		if timeTag != nowTag {
E
eoLinker API Management 已提交
220 221 222 223 224 225 226 227
			w.history(timeTag)
		}
	}

	w.dropHistory()

}

Y
Your Name 已提交
228 229 230 231
func (w *FileWriterByPeriod) openFile() (*os.File, string, error) {
	path := filepath.Join(w.dir, fmt.Sprintf("%s.log", w.file))
	nowTag := w.timeTag(time.Now())
	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
E
eoLinker API Management 已提交
232

Y
Your Name 已提交
233 234
	if err != nil {
		return nil, "", err
E
eoLinker API Management 已提交
235
	}
Y
Your Name 已提交
236
	return f, nowTag, err
E
eoLinker API Management 已提交
237

Y
Your Name 已提交
238
}