提交 3364e59b 编写于 作者: Y Your Name

3.1.3版本提交

上级 f63a55b4
...@@ -2,8 +2,6 @@ package database ...@@ -2,8 +2,6 @@ package database
import ( import (
"database/sql" "database/sql"
"io/ioutil"
"strings"
log "github.com/eolinker/goku-api-gateway/goku-log" log "github.com/eolinker/goku-api-gateway/goku-log"
...@@ -13,15 +11,9 @@ import ( ...@@ -13,15 +11,9 @@ import (
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
var (
defaultDB *sql.DB
)
//InitConnection 初始化数据库连接 //InitConnection 初始化数据库连接
func InitConnection(config Config) error { func InitConnection(config Config) (*sql.DB, error) {
db, e := getConnection(config) return getConnection(config)
defaultDB = db
return e
} }
func getConnection(config Config) (*sql.DB, error) { func getConnection(config Config) (*sql.DB, error) {
...@@ -34,7 +26,6 @@ func getConnection(config Config) (*sql.DB, error) { ...@@ -34,7 +26,6 @@ func getConnection(config Config) (*sql.DB, error) {
} }
db.SetMaxOpenConns(1000) db.SetMaxOpenConns(1000)
db.SetMaxIdleConns(100) db.SetMaxIdleConns(100)
defaultDB = db
return db, nil return db, nil
} }
log.Info(e) log.Info(e)
...@@ -42,25 +33,17 @@ func getConnection(config Config) (*sql.DB, error) { ...@@ -42,25 +33,17 @@ func getConnection(config Config) (*sql.DB, error) {
} }
//GetConnection 获取数据库连接 //CheckConnection 检查数据库连接
func GetConnection() *sql.DB { func CheckConnection(driver string, source string) error {
return defaultDB
}
//InitTable 初始化表
func InitTable() error {
content, err := ioutil.ReadFile("sql/goku_ce.sql") db, e := sql.Open(driver, source)
sqls := strings.Split(string(content), ";") defer db.Close()
Tx, _ := GetConnection().Begin() if e == nil {
for _, sql := range sqls { if err := db.Ping(); err != nil {
_, err = Tx.Exec(sql)
if err != nil {
Tx.Rollback()
log.Error("InitTable error:",err,"\t sql:",sql)
return err return err
} }
return nil
} }
Tx.Commit() return e
return nil
} }
package database package database
const (
//MysqlDriver mysql驱动器
MysqlDriver = "mysql"
//Sqlite3Driver Sqlite3Driver驱动
Sqlite3Driver = "sqlite3"
)
//Config 数据库配置结构体 //Config 数据库配置结构体
type Config interface { type Config interface {
GetDriver() string GetDriver() string
......
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt" "fmt"
log "github.com/eolinker/goku-api-gateway/goku-log"
"net" "net"
"net/http" "net/http"
"os" "os"
...@@ -15,6 +14,8 @@ import ( ...@@ -15,6 +14,8 @@ import (
"sync" "sync"
"syscall" "syscall"
"time" "time"
log "github.com/eolinker/goku-api-gateway/goku-log"
) )
const ( const (
...@@ -39,9 +40,10 @@ var ( ...@@ -39,9 +40,10 @@ var (
DefaultMaxHeaderBytes int DefaultMaxHeaderBytes int
DefaultHammerTime time.Duration DefaultHammerTime time.Duration
isChild bool isChild bool
socketOrder string socketOrder string
adminServer *endlessServer
nodeServer *endlessServer
hookableSignals []os.Signal hookableSignals []os.Signal
) )
...@@ -71,15 +73,24 @@ func init() { ...@@ -71,15 +73,24 @@ func init() {
type endlessServer struct { type endlessServer struct {
http.Server http.Server
EndlessListener net.Listener EndlessListener net.Listener
SignalHooks map[int]map[os.Signal][]func() SignalHooks map[int]map[os.Signal][]func()
tlsInnerListener *endlessListener tlsInnerListener *endlessListener
wg sync.WaitGroup wg sync.WaitGroup
sigChan chan os.Signal sigChan chan os.Signal
isChild bool isChild bool
state uint8 state uint8
lock *sync.RWMutex lock *sync.RWMutex
BeforeBegin func(add string) BeforeBegin func(add string)
runningServersForked bool
}
func SetAdminServer(server *endlessServer) {
adminServer = server
}
func SetNodeServer(server *endlessServer) {
nodeServer = server
} }
/* /*
...@@ -140,17 +151,25 @@ func NewServer(addr string, handler http.Handler) (srv *endlessServer) { ...@@ -140,17 +151,25 @@ func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
runningServersOrder = append(runningServersOrder, addr) runningServersOrder = append(runningServersOrder, addr)
runningServers[addr] = srv runningServers[addr] = srv
return return
} }
/* func RestartServer() {
nodeServer.sigChan <- syscall.SIGHUP
}
func StopServer() {
nodeServer.sigChan <- syscall.SIGTERM
}
/*F
ListenAndServe listens on the TCP network address addr and then calls Serve ListenAndServe listens on the TCP network address addr and then calls Serve
with handler to handle requests on incoming connections. Handler is typically with handler to handle requests on incoming connections. Handler is typically
nil, in which case the DefaultServeMux is used. nil, in which case the DefaultServeMux is used.
*/ */
func ListenAndServe(addr string, handler http.Handler) error { func ListenAndServe(addr string, handler http.Handler) error {
server := NewServer(addr, handler) server := NewServer(addr, handler)
nodeServer = server
return server.ListenAndServe() return server.ListenAndServe()
} }
...@@ -338,7 +357,7 @@ func (srv *endlessServer) handleSignals() { ...@@ -338,7 +357,7 @@ func (srv *endlessServer) handleSignals() {
log.Info(pid, "Received SIGINT.") log.Info(pid, "Received SIGINT.")
srv.shutdown() srv.shutdown()
case syscall.SIGTERM: case syscall.SIGTERM:
log.Info(pid, "Received SIGTERM.") log.Info(``, "Received SIGTERM.")
srv.shutdown() srv.shutdown()
case syscall.SIGTSTP: case syscall.SIGTSTP:
log.Info(pid, "Received SIGTSTP.") log.Info(pid, "Received SIGTSTP.")
...@@ -418,15 +437,15 @@ func (srv *endlessServer) hammerTime(d time.Duration) { ...@@ -418,15 +437,15 @@ func (srv *endlessServer) hammerTime(d time.Duration) {
func (srv *endlessServer) fork() (err error) { func (srv *endlessServer) fork() (err error) {
runningServerReg.Lock() runningServerReg.Lock()
defer runningServerReg.Unlock() defer runningServerReg.Unlock()
log.Info(1234)
// only one server instance should fork! // only one server instance should fork!
if runningServersForked { if srv.runningServersForked {
return errors.New("Another process already forked. Ignoring this one.") return errors.New("Another process already forked. Ignoring this one.")
} }
log.Info(5456)
runningServersForked = true srv.runningServersForked = true
var files = make([]*os.File, len(runningServers)) var files = make([]*os.File, len(runningServers))
var orderArgs = make([]string, len(runningServers)) var orderArgs = make([]string, len(runningServers))
...@@ -443,7 +462,7 @@ func (srv *endlessServer) fork() (err error) { ...@@ -443,7 +462,7 @@ func (srv *endlessServer) fork() (err error) {
} }
orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr
} }
log.Info(123)
env := append( env := append(
os.Environ(), os.Environ(),
"ENDLESS_CONTINUE=1", "ENDLESS_CONTINUE=1",
...@@ -458,7 +477,7 @@ func (srv *endlessServer) fork() (err error) { ...@@ -458,7 +477,7 @@ func (srv *endlessServer) fork() (err error) {
if len(os.Args) > 1 { if len(os.Args) > 1 {
args = os.Args[1:] args = os.Args[1:]
} }
log.Info(path, args)
cmd := exec.Command(path, args...) cmd := exec.Command(path, args...)
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
...@@ -476,6 +495,7 @@ func (srv *endlessServer) fork() (err error) { ...@@ -476,6 +495,7 @@ func (srv *endlessServer) fork() (err error) {
log.Fatalf("Restart: Failed to launch, error: %v", err) log.Fatalf("Restart: Failed to launch, error: %v", err)
} }
srv.runningServersForked = false
return return
} }
......
package ioutils
import (
"encoding/binary"
"io"
)
//WriteLField 写域内容
func WriteLField(writer io.Writer, s []byte) (int, error) {
l := uint8(len(s))
e := binary.Write(writer, binary.BigEndian, l)
if e != nil {
return 0, e
}
n, err := writer.Write(s)
if err != nil {
return n + 1, err
}
return n + 1, nil
}
//ReadLField 读取域内容
func ReadLField(reader io.Reader, buf []byte) ([]byte, int, error) {
l := uint8(0)
e := binary.Read(reader, binary.BigEndian, &l)
if e != nil {
return nil, 0, e
}
tmpbuf := buf
if int(l) > len(buf) {
tmpbuf = make([]byte, l, l)
} else {
tmpbuf = buf[:l]
}
_, err := reader.Read(tmpbuf)
if err != nil {
return nil, 0, err
}
return tmpbuf, int(l) + 1, nil
}
package listener
import "sync"
//InterceptFunc 拦截函数
type InterceptFunc func(event interface{}) error
//Intercept 拦截器
type Intercept struct {
callbacks []InterceptFunc
locker sync.RWMutex
}
//NewIntercept 创建拦截器
func NewIntercept() *Intercept {
return &Intercept{
callbacks: nil,
locker: sync.RWMutex{},
}
}
//Add add
func (i *Intercept) Add(f func(v interface{}) error) {
i.locker.Lock()
i.callbacks = append(i.callbacks, InterceptFunc(f))
i.locker.Unlock()
}
//Call call
func (i *Intercept) Call(v interface{}) error {
i.locker.RLock()
fs := i.callbacks
i.locker.RUnlock()
for _, f := range fs {
err := f(v)
if err != nil {
return err
}
}
return nil
}
...@@ -5,7 +5,7 @@ import "sync" ...@@ -5,7 +5,7 @@ import "sync"
//CallbackFunc 回调函数 //CallbackFunc 回调函数
type CallbackFunc func(event interface{}) type CallbackFunc func(event interface{})
//Listener //Listener 监听者
type Listener struct { type Listener struct {
callbacks []CallbackFunc callbacks []CallbackFunc
callbacksOnce []CallbackFunc callbacksOnce []CallbackFunc
......
package manager
import "sync"
//Manager manager
type Manager struct {
locker sync.RWMutex
objs map[string]interface{}
}
//NewManager new manager
func NewManager() *Manager {
return &Manager{
locker: sync.RWMutex{},
objs: make(map[string]interface{}),
}
}
//Get get
func (m *Manager) Get(key string) (interface{}, bool) {
m.locker.RLock()
v, has := m.objs[key]
m.locker.RUnlock()
return v, has
}
//Set set
func (m *Manager) Set(key string, value interface{}) {
m.locker.Lock()
m.objs[key] = value
m.locker.Unlock()
}
//Value value
type Value struct {
locker sync.RWMutex
isHas bool
value interface{}
}
//NewValue new value
func NewValue() *Value {
return &Value{
locker: sync.RWMutex{},
isHas: false,
value: nil,
}
}
//Set set
func (v *Value) Set(value interface{}) {
v.locker.Lock()
v.value = value
v.isHas = true
v.locker.Unlock()
}
//Get get
func (v *Value) Get() (interface{}, bool) {
v.locker.RLock()
value, has := v.value, v.isHas
v.locker.RUnlock()
return value, has
}
package main
import (
"fmt"
"github.com/eolinker/goku-api-gateway/common/pdao"
)
type DemoGetDao interface {
GetNameById(id int) string
GetIdByName(name string) int
}
type DemoSetDao interface {
Set(id int,name string)
}
type Demo struct {
names map[string]int
ids map[int]string
}
func NewDemo() *Demo {
return &Demo{
names: make(map[string]int),
ids: make(map[int]string),
}}
func (d *Demo) Set(id int, name string) {
d.names[name]=id
d.ids[id]=name
}
func (d *Demo) GetNameById(id int) string {
return d.ids[id]
}
func (d *Demo) GetIdByName(name string) int {
return d.names[name]
}
var (
getDao DemoGetDao
setDao DemoSetDao
)
func init() {
pdao.Need(&getDao)
pdao.Need(&setDao)
}
func main() {
demo:=NewDemo()
var seter DemoSetDao = demo
pdao.Set(&seter)
var getter DemoGetDao = demo
pdao.Set(&getter)
pdao.Check()
setDao.Set(1,"test")
fmt.Println(getDao.GetNameById(1))
fmt.Println(getDao.GetIdByName("test"))
}
\ No newline at end of file
package pdao
import "database/sql"
//FactoryFunc factoryFunc
type FactoryFunc func(db *sql.DB) (interface{}, error)
//Create create
func (f FactoryFunc) Create(db *sql.DB) (interface{}, error) {
return f(db)
}
//Factory factory
type Factory interface {
Create(db *sql.DB) (interface{}, error)
}
//DBBuilder dbBuilder
type DBBuilder interface {
Build(db *sql.DB) error
}
package pdao
import (
"fmt"
"reflect"
"sync"
)
//NeedsManager needsManager
type NeedsManager struct {
daoInterfaces map[string][]*reflect.Value
lock sync.Mutex
}
//NewNeedsManager 创建新的needsManager
func NewNeedsManager() *NeedsManager {
return &NeedsManager{
daoInterfaces: make(map[string][]*reflect.Value),
lock: sync.Mutex{},
}
}
func (m *NeedsManager) add(key string, v *reflect.Value) {
m.lock.Lock()
m.daoInterfaces[key] = append(m.daoInterfaces[key], v)
m.lock.Unlock()
}
func (m *NeedsManager) set(key string, v reflect.Value) {
m.lock.Lock()
for _, e := range m.daoInterfaces[key] {
e.Set(v)
}
delete(m.daoInterfaces, key)
m.lock.Unlock()
}
func (m *NeedsManager) check() []string {
m.lock.Lock()
r := make([]string, 0, len(m.daoInterfaces))
for pkg := range m.daoInterfaces {
r = append(r, pkg)
}
m.lock.Unlock()
return r
}
//Need 声明
func (m *NeedsManager) Need(p interface{}) {
v := reflect.ValueOf(p)
if v.Kind() != reflect.Ptr {
panic("must ptr")
}
e := v.Elem()
pkg := key(e.Type())
if pkg == "" {
panic("invalid interface")
}
if !e.CanSet() {
panic("invalid interface")
}
m.add(pkg, &e)
}
//Set 注入
func (m *NeedsManager) Set(i interface{}) {
v := reflect.ValueOf(i)
if v.Kind() != reflect.Ptr {
panic("must ptr")
}
e := v.Elem()
pkg := key(e.Type())
if pkg == "" {
panic("invalid interface")
}
m.set(pkg, e)
}
func key(t reflect.Type) string {
return fmt.Sprintf("%s.%s", t.PkgPath(), t.String())
}
//Check 检查是否实现相关dao类
func (m *NeedsManager) Check() error {
rs := m.check()
if len(rs) > 0 {
return fmt.Errorf("need:%v", rs)
}
return nil
}
package pdao
import (
"fmt"
"testing"
test1 "github.com/eolinker/goku-api-gateway/common/pdao/test/test/test/test"
)
type T1 struct {
}
func (t *T1) Test4() {
fmt.Println("implement me")
}
type T2 struct {
}
func (t *T2) Test3() {
fmt.Println("implement me")
}
type T3 struct {
}
func (t *T3) Test2() {
fmt.Println("implement me")
}
type T4 struct {
}
func (t *T4) Test1() {
fmt.Println("implement me")
}
func Test(t *testing.T) {
t.Run("all", func(t *testing.T) {
var t1 test1.Test
//var t2 test2.Test = nil
//var t3 test3.Test = nil
//var t4 test4.Test = nil
Need(&t1)
//Need(&t2)
//Need(&t3)
//Need(&t4)
var t1v test1.Test = new(T4)
Set(&t1v)
//Set(test2.Test(new(T3)))
//
//Set(test3.Test(new(T2)))
//Set(test4.Test(new(T1)))
t1.Test1()
//t2.Test2()
//t3.Test3()
//t4.Test4()
Check()
})
}
package pdao
import (
"database/sql"
"sync"
)
var (
needsManager = NewNeedsManager()
factoryManage = NewFactoryManager()
)
//Need need
func Need(is ...interface{}) {
for _, i := range is {
needsManager.Need(i)
}
}
//Check check
func Check() error {
return needsManager.Check()
}
//RegisterDao 注册dao类
func RegisterDao(driver string, factories ...Factory) {
for _, factory := range factories {
factoryManage.RegisterDao(driver, factory)
}
}
//RegisterDBBuilder 注册dbBuilder
func RegisterDBBuilder(driver string, builders ...DBBuilder) {
for _, builder := range builders {
factoryManage.RegisterDBBuilder(driver, builder)
}
}
//Build build
func Build(driver string, db *sql.DB) error {
return factoryManage.Build(driver, db, needsManager)
}
//FactoryManager 工厂管理者
type FactoryManager struct {
factories map[string][]Factory
builders map[string][]DBBuilder
locker sync.Mutex
}
//NewFactoryManager new工厂管理者
func NewFactoryManager() *FactoryManager {
return &FactoryManager{
factories: make(map[string][]Factory),
builders: make(map[string][]DBBuilder),
locker: sync.Mutex{},
}
}
//RegisterDBBuilder dbBuilder注册器
func (f *FactoryManager) RegisterDBBuilder(driver string, builder DBBuilder) {
f.locker.Lock()
f.builders[driver] = append(f.builders[driver], builder)
f.locker.Unlock()
}
//RegisterDao dao类注册器
func (f *FactoryManager) RegisterDao(driver string, factory Factory) {
f.locker.Lock()
f.factories[driver] = append(f.factories[driver], factory)
f.locker.Unlock()
}
func (f *FactoryManager) get(driver string) []Factory {
f.locker.Lock()
fs := f.factories[driver]
delete(f.factories, driver)
f.locker.Unlock()
return fs
}
func (f *FactoryManager) callBuild(driver string, db *sql.DB) error {
f.locker.Lock()
bs := f.builders[driver]
for _, b := range bs {
err := b.Build(db)
if err != nil {
f.locker.Unlock()
return err
}
}
f.locker.Unlock()
return nil
}
//Build build
func (f *FactoryManager) Build(driver string, db *sql.DB, m *NeedsManager) error {
err := f.callBuild(driver, db)
if err != nil {
return err
}
fs := f.get(driver)
for _, factory := range fs {
i, err := factory.Create(db)
if err != nil {
return err
}
m.Set(i)
}
return nil
}
package test
type Test interface {
Test4()
}
\ No newline at end of file
package test
type Test interface {
Test3()
}
\ No newline at end of file
package test
type Test interface {
Test2()
}
\ No newline at end of file
package test
type Test interface {
Test1()
}
\ No newline at end of file
...@@ -54,3 +54,18 @@ func Get(name string) (Redis, bool) { ...@@ -54,3 +54,18 @@ func Get(name string) (Redis, bool) {
return nil, false return nil, false
} }
func del(name string) {
locker.RLock()
defer locker.RUnlock()
delete(redisOfCluster, name)
return
}
//Delete delete
func Delete(name string) {
del(name)
return
}
...@@ -5,6 +5,9 @@ import "github.com/go-redis/redis" ...@@ -5,6 +5,9 @@ import "github.com/go-redis/redis"
const ( const (
//RedisModeCluster cluster模式 //RedisModeCluster cluster模式
RedisModeCluster = "cluster" RedisModeCluster = "cluster"
//RedisModeSentinel sentinel模式
RedisModeSentinel = "sentinel"
//RedisModeStand stand模式 //RedisModeStand stand模式
RedisModeStand = "stand" RedisModeStand = "stand"
) )
......
package redis_manager package redis_manager
import ( import (
"fmt"
"sort"
"strings"
"sync" "sync"
"github.com/go-redis/redis"
) )
var ( var (
...@@ -17,17 +22,77 @@ func SetDefault(r Redis) { ...@@ -17,17 +22,77 @@ func SetDefault(r Redis) {
//GetConnection 获取redis连接 //GetConnection 获取redis连接
func GetConnection() Redis { func GetConnection() Redis {
if def != nil { //if def != nil {
return def return def
} //}
//defLocker.Lock()
//defer defLocker.Unlock()
//
//def = Create(defaultConfig)
//return def
}
defLocker.Lock() //CheckConnection 获取redis连接
defer defLocker.Unlock() func CheckConnection(mode, addrs, password, masters string, dbIndex int) bool {
switch mode {
case RedisModeCluster:
{
a := strings.Split(addrs, ",")
option := &redis.ClusterOptions{
Addrs: a,
Password: password,
PoolSize: 2000,
ReadOnly: true,
RouteByLatency: true,
}
r := redis.NewClusterClient(option)
defer r.Close()
if _, err := r.Ping().Result(); err != nil {
return false
}
return true
}
case RedisModeSentinel:
{
a := strings.Split(addrs, ",")
option := redis.FailoverOptions{
SentinelAddrs: a,
MasterName: masters,
Password: password,
DB: dbIndex,
PoolSize: _PoolSize,
}
r := redis.NewFailoverClient(&option)
defer r.Close()
if _, err := r.Ping().Result(); err != nil {
return false
}
return true
}
case RedisModeStand:
{
if def != nil { a := strings.Split(addrs, ",")
return def
option := redis.RingOptions{
Addrs: make(map[string]string),
Password: password,
DB: dbIndex,
PoolSize: _PoolSize,
}
sort.Strings(a)
for i, ad := range a {
option.Addrs[fmt.Sprintf("shad:%d", i)] = ad
}
r := redis.NewRing(&option)
defer r.Close()
if _, err := r.Ping().Result(); err != nil {
return false
}
return true
}
} }
return false
def = Create(defaultConfig)
return def
} }
...@@ -5,9 +5,12 @@ import ( ...@@ -5,9 +5,12 @@ import (
"errors" "errors"
"sync" "sync"
) )
var ( var (
//ErrorContextDone context done error
ErrorContextDone = errors.New("context done") ErrorContextDone = errors.New("context done")
) )
//Telegraph telegraph //Telegraph telegraph
type Telegraph struct { type Telegraph struct {
value interface{} value interface{}
...@@ -50,7 +53,7 @@ func (t *Telegraph) get() (string, <-chan struct{}, interface{}) { ...@@ -50,7 +53,7 @@ func (t *Telegraph) get() (string, <-chan struct{}, interface{}) {
} }
//Get get //Get get
func (t *Telegraph) Get(version string) (interface{},error) { func (t *Telegraph) Get(version string) (interface{}, error) {
return t.GetWidthContext(context.Background(), version) return t.GetWidthContext(context.Background(), version)
} }
...@@ -63,21 +66,21 @@ func (t *Telegraph) Close() { ...@@ -63,21 +66,21 @@ func (t *Telegraph) Close() {
} }
//GetWidthContext 获取上下文 //GetWidthContext 获取上下文
func (t *Telegraph) GetWidthContext(ctx context.Context, version string) (interface{} ,error){ func (t *Telegraph) GetWidthContext(ctx context.Context, version string) (interface{}, error) {
v, c, value := t.get() v, c, value := t.get()
if v == "" { if v == "" {
// closed // closed
return nil,nil return nil, nil
} }
if version != v { if version != v {
return value,nil return value, nil
} }
select { select {
case <-c: case <-c:
return t.GetWidthContext(ctx, version) return t.GetWidthContext(ctx, version)
case <-ctx.Done(): case <-ctx.Done():
return nil,ErrorContextDone return nil, ErrorContextDone
} }
} }
package version package version
//Version 版本号 //Version 版本号
const Version = "3.1.1" const Version = "3.1.3"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册