提交 c580bfa9 编写于 作者: Y Your Name

#console 整理代码

上级 dc5082a8
/.idea/
/vendor/
/out/
/vendor
/.vscode/
\ No newline at end of file
cluster:
-
name: "Default"
- name: "Default"
title: "默认机房"
note: "默认机房"
db:
......
......@@ -15,17 +15,16 @@ var (
UserPassword string
UserName string
ConfFilepath = "./config/goku.conf"
)
func main() {
flag.StringVar(&ConfFilepath, "c", "./config/goku.conf", "Please provide a valid configuration file path")
flag.StringVar(&UserName, "u", "", "Please provide user name")
flag.StringVar(&UserPassword, "p", "", "Please provide user password")
isDebug := flag.Bool("debug",false,"")
isDebug := flag.Bool("debug", false, "")
flag.Parse()
if *isDebug{
if *isDebug {
log.StartDebug()
}
// 初始化配置
......@@ -42,11 +41,10 @@ func main() {
_ = general.General()
// 检测是否安装
if s, err := account.CheckSuperAdminCount(); err!= nil {
if s, err := account.CheckSuperAdminCount(); err != nil {
log.Panic(err)
return
}else if s == 0 {
} else if s == 0 {
if UserName == "" {
log.Fatal("[ERROR] Fail to create administrator. Please try again or contact technical support of eoLinker GOKU API Gateway.")
//fmt.Println("[ERROR] Fail to create administrator. Please try again or contact technical support of eoLinker GOKU API Gateway.")
......@@ -68,7 +66,6 @@ func main() {
}
}
console.Router()
console.Server()
}
......@@ -40,16 +40,15 @@ func initConfig(resultInfo map[string]interface{}) *entity.ClusterInfo {
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
flag.StringVar(&adminHost, "admin", "127.0.0.1:7005", "Please provide a valid host!")
//flag.IntVar(&adminPort, "P", 7005, "Please provide a valid port")
flag.IntVar(&listenPort, "port", 6689, "Please provide a valid listen port!")
isDebug := flag.Bool("debug",false,"")
isDebug := flag.Bool("debug", false, "")
flag.Parse()
if *isDebug{
if *isDebug {
log.StartDebug()
}
//
......@@ -64,9 +63,9 @@ func main() {
node_common.SetClusterName(config.Name)
err:=database.InitConnection(&config.DB)
if err!=nil{
log.Fatal("Fail to Init db:",err)
err := database.InitConnection(&config.DB)
if err != nil {
log.Fatal("Fail to Init db:", err)
return
}
goku_node.InitLog()
......@@ -76,7 +75,7 @@ func main() {
log.Debug("redis_manager.SetDefault")
// 其他需要初始化的模块
_=general.General()
_ = general.General()
log.Debug("general.General()")
goku_node.InitServer()
......@@ -89,5 +88,4 @@ func main() {
}
log.Fatalf("Server on :%d stoped \n", listenPort)
}
package auto
import (
"errors"
"fmt"
"net"
......@@ -90,10 +89,10 @@ func setValues(ctx Values, c interface{}) error {
if !has {
continue
}
if tag == "-"{
if tag == "-" {
continue
}
name,opts := parseTag(tag)
name, opts := parseTag(tag)
if !isValidTag(name) {
continue
}
......@@ -104,8 +103,8 @@ func setValues(ctx Values, c interface{}) error {
return err
}
} else {
if opts.Contains("require"){
return fmt.Errorf("require value of [%s] but has no",name)
if opts.Contains("require") {
return fmt.Errorf("require value of [%s] but has no", name)
}
defaultVal := field.Tag.Get("default")
......
......@@ -59,4 +59,4 @@ func isValidTag(s string) bool {
}
}
return true
}
\ No newline at end of file
}
......@@ -137,7 +137,7 @@ func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
srv.Server.Handler = handler
srv.BeforeBegin = func(addr string) {
log.Info("start service:",syscall.Getpid(), addr)
log.Info("start service:", syscall.Getpid(), addr)
}
runningServersOrder = append(runningServersOrder, addr)
......
......@@ -26,7 +26,7 @@ func Create(config RedisConfig) Redis {
return &redisProxy{
Cmdable: redis.NewClusterClient(option),
config: config,
config: config,
}
}
case RedisModeSentinel:
......@@ -41,7 +41,7 @@ func Create(config RedisConfig) Redis {
}
return &redisProxy{
Cmdable: redis.NewSentinelRing(&option),
config: config,
config: config,
}
}
case RedisModeStand:
......@@ -63,7 +63,7 @@ func Create(config RedisConfig) Redis {
return &redisProxy{
Cmdable: redis.NewRing(&option),
config: config,
config: config,
}
}
}
......
......@@ -5,7 +5,7 @@ import (
)
var (
redisOfCluster = make(map[string]Redis)
redisOfCluster = make(map[string]Redis)
redisConfCluster = make(map[string]RedisConfig)
locker sync.RWMutex
......@@ -28,20 +28,20 @@ func get(name string) (Redis, bool) {
return r, h
}
func Get(name string) (Redis, bool) {
r,has:=get(name)
r, has := get(name)
if has{
return r,r!=nil
if has {
return r, r != nil
}
locker.Lock()
defer locker.Unlock()
r, has = redisOfCluster[name]
if has{
return r,has
if has {
return r, has
}
c,h:= redisConfCluster[name]
if h{
c, h := redisConfCluster[name]
if h {
r = Create(c)
redisOfCluster[name] = r
return r, h
......@@ -49,5 +49,5 @@ func Get(name string) (Redis, bool) {
redisOfCluster[name] = nil
return nil,false
return nil, false
}
......@@ -9,18 +9,16 @@ type redisProxy struct {
config RedisConfig
}
func (p *redisProxy) Nodes() []string {
ch:= make(chan string,1)
ch := make(chan string, 1)
switch p.config.GetMode() {
case RedisModeCluster:
{
conn := p.Cmdable.(*redis.ClusterClient)
go func(ch chan string ) {
go func(ch chan string) {
conn.ForEachMaster(func(client *redis.Client) error {
ch<-client.Options().Addr
ch <- client.Options().Addr
return nil
})
close(ch)
......@@ -31,9 +29,9 @@ func (p *redisProxy) Nodes() []string {
{
conn := p.Cmdable.(*redis.SentinelRing)
go func(ch chan string ) {
go func(ch chan string) {
conn.ForEachAddr(func(addr string) error {
ch<-addr
ch <- addr
return nil
})
close(ch)
......@@ -43,9 +41,9 @@ func (p *redisProxy) Nodes() []string {
{
conn := p.Cmdable.(*redis.Ring)
go func(ch chan string ) {
go func(ch chan string) {
conn.ForEachShard(func(client *redis.Client) error {
ch<-client.Options().Addr
ch <- client.Options().Addr
return nil
})
close(ch)
......@@ -53,10 +51,9 @@ func (p *redisProxy) Nodes() []string {
}
}
nodes:= make([]string,0,10)
for addr:=range ch{
nodes = append(nodes,addr)
nodes := make([]string, 0, 10)
for addr := range ch {
nodes = append(nodes, addr)
}
return nodes
}
......
......@@ -8,12 +8,11 @@ const (
RedisModeStand = "stand"
)
type Redis interface {
redis.Cmdable
GetConfig() RedisConfig
//Foreach(fn func(client *localRedis.Client) error) error
Nodes()[]string
Nodes() []string
}
type RedisConfig interface {
......
......@@ -30,4 +30,3 @@ func GetConnection() Redis {
def = Create(defaultConfig)
return def
}
......@@ -17,13 +17,12 @@ import (
"github.com/eolinker/goku/common/redis/internal"
)
// RingOptions are used to configure a ring client and should be
// passed to NewRing.
type SentinelRingOptions struct {
// Map of name => host:port addresses of ring shards.
Addrs []string
Masters[]string
Addrs []string
Masters []string
// Frequency of PING commands sent to check shards availability.
// Shard is considered down after 3 subsequent failed checks.
HeartbeatFrequency time.Duration
......@@ -98,10 +97,10 @@ func (opt *SentinelRingOptions) init() {
func (opt *SentinelRingOptions) clientOptions(masterName string) *FailoverOptions {
return &FailoverOptions{
MasterName:masterName,
SentinelAddrs:opt.Addrs,
DB: opt.DB,
Password: opt.Password,
MasterName: masterName,
SentinelAddrs: opt.Addrs,
DB: opt.DB,
Password: opt.Password,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
......@@ -322,8 +321,8 @@ func NewSentinelRing(opt *SentinelRingOptions) *SentinelRing {
clopt := opt.clientOptions(masterName)
ring.shards.Add(masterName, NewFailoverClient(clopt))
}
ring.sentinelClients = make([]*SentinelClient,0,len(opt.Addrs))
for _, addr:=range opt.Addrs{
ring.sentinelClients = make([]*SentinelClient, 0, len(opt.Addrs))
for _, addr := range opt.Addrs {
sentinel := NewSentinelClient(&Options{
Addr: addr,
......@@ -335,11 +334,11 @@ func NewSentinelRing(opt *SentinelRingOptions) *SentinelRing {
WriteTimeout: opt.WriteTimeout,
PoolSize: 1,
PoolTimeout: opt.PoolTimeout,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
})
ring.sentinelClients = append(ring.sentinelClients,sentinel)
ring.sentinelClients = append(ring.sentinelClients, sentinel)
}
go ring.shards.Heartbeat(opt.HeartbeatFrequency)
......@@ -429,24 +428,21 @@ func (c *SentinelRing) PSubscribe(channels ...string) *PubSub {
//// It returns the first error if any.
func (c *SentinelRing) ForEachAddr(fn func(addr string) error) error {
for _, masterName := range c.opt.Masters {
for _, sentinel := range c.sentinelClients {
masterAddrs, err := sentinel.GetMasterAddrByName(masterName).Result()
for _, sentinel := range c.sentinelClients {
masterAddrs, err := sentinel.GetMasterAddrByName(masterName).Result()
if err != nil {
internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",masterName, err)
continue
}
fn(fmt.Sprintf("%s:%s",masterAddrs[0],masterAddrs[1]))
break
if err != nil {
internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s", masterName, err)
continue
}
fn(fmt.Sprintf("%s:%s", masterAddrs[0], masterAddrs[1]))
break
}
}
return nil
}
......
......@@ -5,7 +5,8 @@ import (
"strconv"
"strings"
)
func GetIpPort(r *http.Request) (string,int,error){
func GetIpPort(r *http.Request) (string, int, error) {
ip := r.RemoteAddr
ip = ip[:strings.Index(ip, ":")]
if realIP := strings.TrimSpace(r.Header.Get("X-Real-Ip")); realIP != "" {
......@@ -15,9 +16,7 @@ func GetIpPort(r *http.Request) (string,int,error){
p := r.FormValue("port")
port, err := strconv.Atoi(p)
if err != nil {
return ip,port,err
return ip, port, err
}
return ip,port,nil
return ip, port, nil
}
......@@ -7,21 +7,19 @@ import (
"strconv"
)
func heartbead(w http.ResponseWriter, r *http.Request) {
func heartbead(w http.ResponseWriter, r *http.Request) {
ip, port, err := GetIpPort(r)
if err != nil {
controller.WriteError(w, "700000", "node", err.Error(), err)
return
return
}
node.Refresh(ip,strconv.Itoa(port))
node.Refresh(ip, strconv.Itoa(port))
controller.WriteResultInfo(w, "node", "node", nil)
}
func stopNode(w http.ResponseWriter, r *http.Request) {
func stopNode(w http.ResponseWriter, r *http.Request) {
ip, port, err := GetIpPort(r)
if err != nil {
......@@ -29,6 +27,6 @@ func stopNode(w http.ResponseWriter, r *http.Request) {
controller.WriteError(w, "700000", "node", err.Error(), err)
return
}
node.NodeStop(ip,strconv.Itoa(port))
node.NodeStop(ip, strconv.Itoa(port))
controller.WriteResultInfo(w, "node", "node", nil)
}
\ No newline at end of file
}
......@@ -22,7 +22,7 @@ func Register(w http.ResponseWriter, r *http.Request) {
controller.WriteError(w, "700001", "cluster", err.Error()+ip, err)
return
}
node.Refresh(ip,strconv.Itoa(port))
node.Refresh(ip, strconv.Itoa(port))
controller.WriteResultInfo(w, "cluster", "cluster", cluster)
}
......
......@@ -7,7 +7,7 @@ func router() http.Handler {
serverHandler := http.NewServeMux()
serverHandler.HandleFunc("/register", Register)
serverHandler.HandleFunc("/node/heartbeat", heartbead)
serverHandler.HandleFunc("/node/stop",stopNode)
serverHandler.HandleFunc("/node/stop", stopNode)
serverHandler.HandleFunc("/alert/msg/add", AddAlertMsg)
return serverHandler
}
......@@ -9,8 +9,6 @@ import (
"strconv"
)
// 用户登录
func Login(httpResponse http.ResponseWriter, httpRequest *http.Request) {
......
package node
import (
"encoding/json"
"errors"
log "github.com/eolinker/goku/goku-log"
"net/http"
"strconv"
"github.com/eolinker/goku/console/controller"
"github.com/eolinker/goku/console/module/node"
cluster2 "github.com/eolinker/goku/server/cluster"
......@@ -42,7 +39,6 @@ func AddNode(httpResponse http.ResponseWriter, httpRequest *http.Request) {
groupID := httpRequest.PostFormValue("groupID")
gatewayPath := httpRequest.PostFormValue("gatewayPath")
gID, err := strconv.Atoi(groupID)
if err != nil && groupID != "" {
controller.WriteError(httpResponse, "230015", "", "[ERROR]Illegal groupID!", err)
......@@ -72,7 +68,6 @@ func AddNode(httpResponse http.ResponseWriter, httpRequest *http.Request) {
}
}
exits := node.CheckIsExistRemoteAddr(0, nodeIP, nodePort)
if exits {
......@@ -84,8 +79,6 @@ func AddNode(httpResponse http.ResponseWriter, httpRequest *http.Request) {
return
}
flag, result, err := node.AddNode(clusterId, nodeName, nodeIP, nodePort, gatewayPath, gID)
if !flag {
......@@ -168,7 +161,7 @@ func EditNode(httpResponse http.ResponseWriter, httpRequest *http.Request) {
}
flag, result, _ := node.EditNode(nodeName, nodeIP, nodePort, gatewayPath, id, gID)
flag, result, _ := node.EditNode(nodeName, nodeIP, nodePort, gatewayPath, id, gID)
if !flag {
controller.WriteError(httpResponse, "330000", "node", result, nil)
......@@ -292,7 +285,6 @@ func GetNodeInfo(httpResponse http.ResponseWriter, httpRequest *http.Request) {
return
}
// 节点IP查重
func CheckIsExistRemoteAddr(httpResponse http.ResponseWriter, httpRequest *http.Request) {
......@@ -331,7 +323,6 @@ func CheckIsExistRemoteAddr(httpResponse http.ResponseWriter, httpRequest *http.
return
}
// 批量修改节点分组
func BatchEditNodeGroup(httpResponse http.ResponseWriter, httpRequest *http.Request) {
......
......@@ -188,8 +188,8 @@ func GetNodeGroupList(httpResponse http.ResponseWriter, httpRequest *http.Reques
}
cluserName := httpRequest.FormValue("cluster")
clusterId,has := cluster2.GetId(cluserName)
if !has{
clusterId, has := cluster2.GetId(cluserName)
if !has {
controller.WriteError(httpResponse,
"280001",
"nodeGroup",
......
......@@ -5,11 +5,11 @@ import (
log "github.com/eolinker/goku/goku-log"
)
func InitLog() {
func InitLog() {
c, _ := module.Get(module.ConsoleLog)
period,_:=log.ParsePeriod(c.Period)
log.SetOutPut(c.Enable ,c.Dir,c.File,period,c.Expire)
l,_:= log.ParseLevel(c.Level)
period, _ := log.ParsePeriod(c.Period)
log.SetOutPut(c.Enable, c.Dir, c.File, period, c.Expire)
l, _ := log.ParseLevel(c.Level)
log.SetLevel(l)
}
\ No newline at end of file
}
......@@ -19,7 +19,7 @@ func Register(loginCall, loginPassword string) bool {
return console_mysql.Register(loginCall, loginPassword)
}
func CheckSuperAdminCount()(int,error){
b,err:=console_mysql.CheckSuperAdminCount()
return b,err
}
\ No newline at end of file
func CheckSuperAdminCount() (int, error) {
b, err := console_mysql.CheckSuperAdminCount()
return b, err
}
......@@ -3,6 +3,7 @@ package account
import (
"github.com/eolinker/goku/server/dao/console-mysql"
)
// 获取具有编辑权限的用户列表
func GetUserListWithPermission(operationType, operation string) (bool, []map[string]interface{}, error) {
return console_mysql.GetUserListWithPermission(operationType, operation)
......
package api
import (
dao "github.com/eolinker/goku/server/dao"
"github.com/eolinker/goku/server/dao"
console_mysql "github.com/eolinker/goku/server/dao/console-mysql"
entity "github.com/eolinker/goku/server/entity/console-entity"
)
......
package api
import (
dao "github.com/eolinker/goku/server/dao"
"github.com/eolinker/goku/server/dao"
console_mysql "github.com/eolinker/goku/server/dao/console-mysql"
)
......
......@@ -4,7 +4,7 @@ import (
"strconv"
"strings"
dao "github.com/eolinker/goku/server/dao"
"github.com/eolinker/goku/server/dao"
console_mysql "github.com/eolinker/goku/server/dao/console-mysql"
)
......
package api
import (
dao "github.com/eolinker/goku/server/dao"
"github.com/eolinker/goku/server/dao"
console_mysql "github.com/eolinker/goku/server/dao/console-mysql"
)
......
......@@ -14,69 +14,72 @@ import (
func Add(info *Param) (string, error) {
const TableName = "goku_balance"
serviceInfo,err:= service.Get(info.ServiceName)
if err!=nil{
return fmt.Sprintf("serviceName:%s",err.Error()),err
serviceInfo, err := service.Get(info.ServiceName)
if err != nil {
return fmt.Sprintf("serviceName:%s", err.Error()), err
}
switch serviceInfo.Type {
case driver2.Static:{
if info.Static == ""&& info.StaticCluster ==""{
return "param:static 和 staticCluster 不能同时为空",errors.New( "param:static 和 staticCluster 不能同时为空")
case driver2.Static:
{
if info.Static == "" && info.StaticCluster == "" {
return "param:static 和 staticCluster 不能同时为空", errors.New("param:static 和 staticCluster 不能同时为空")
}
now := time.Now().Format("2006-01-02 15:04:05")
result, err := dao_balance.AddStatic(info.Name, info.ServiceName, info.Static, info.StaticCluster, info.Desc, now)
if err == nil {
dao.UpdateTable(TableName)
}
return result, err
}
now := time.Now().Format("2006-01-02 15:04:05")
result, err :=dao_balance.AddStatic(info.Name,info.ServiceName,info.Static,info.StaticCluster,info.Desc,now)
if err == nil {
dao.UpdateTable(TableName)
case driver2.Discovery:
{
if info.AppName == "" {
return "param:appName 不能为空", errors.New("param:appName 不能为空")
}
now := time.Now().Format("2006-01-02 15:04:05")
result, err := dao_balance.AddDiscovery(info.Name, info.ServiceName, info.AppName, info.Desc, now)
if err == nil {
dao.UpdateTable(TableName)
}
return result, err
}
return result, err
}
case driver2.Discovery:{
if info.AppName == ""{
return "param:appName 不能为空",errors.New( "param:appName 不能为空")
}
now := time.Now().Format("2006-01-02 15:04:05")
result, err :=dao_balance.AddDiscovery(info.Name,info.ServiceName,info.AppName,info.Desc,now)
if err == nil {
dao.UpdateTable(TableName)
}
return result, err
}
}
return "无效serviceName", errors.New("invalid serviceName")
}
func Save(info *Param) (string, error) {
const TableName = "goku_balance"
serviceInfo,err:= service.Get(info.ServiceName)
if err!=nil{
return fmt.Sprintf("serviceName:%s",err.Error()),err
serviceInfo, err := service.Get(info.ServiceName)
if err != nil {
return fmt.Sprintf("serviceName:%s", err.Error()), err
}
switch serviceInfo.Type {
case driver2.Static:{
if info.Static == ""&& info.StaticCluster ==""{
return "param:static 和 staticCluster 不能同时为空",errors.New( "param:static 和 staticCluster 不能同时为空")
case driver2.Static:
{
if info.Static == "" && info.StaticCluster == "" {
return "param:static 和 staticCluster 不能同时为空", errors.New("param:static 和 staticCluster 不能同时为空")
}
now := time.Now().Format("2006-01-02 15:04:05")
result, err := dao_balance.SaveStatic(info.Name, info.ServiceName, info.Static, info.StaticCluster, info.Desc, now)
if err == nil {
dao.UpdateTable(TableName)
}
return result, err
}
now := time.Now().Format("2006-01-02 15:04:05")
result, err :=dao_balance.SaveStatic(info.Name,info.ServiceName,info.Static,info.StaticCluster,info.Desc,now)
if err == nil {
dao.UpdateTable(TableName)
case driver2.Discovery:
{
if info.AppName == "" {
return "param:appName 不能为空", errors.New("param:appName 不能为空")
}
now := time.Now().Format("2006-01-02 15:04:05")
result, err := dao_balance.SaveDiscover(info.Name, info.ServiceName, info.AppName, info.Desc, now)
if err == nil {
dao.UpdateTable(TableName)
}
return result, err
}
return result, err
}
case driver2.Discovery:{
if info.AppName == ""{
return "param:appName 不能为空",errors.New( "param:appName 不能为空")
}
now := time.Now().Format("2006-01-02 15:04:05")
result, err :=dao_balance.SaveDiscover(info.Name,info.ServiceName,info.AppName,info.Desc,now)
if err == nil {
dao.UpdateTable(TableName)
}
return result, err
}
}
......@@ -88,18 +91,18 @@ func Get(name string) (*Info, error) {
return nil, e
}
return ReadInfo(b),nil
return ReadInfo(b), nil
}
func Search(keyworkd string)([]*Info, error){
func Search(keyworkd string) ([]*Info, error) {
var entities []*entity.Balance
if keyworkd == ""{
es, e:= dao_balance.GetAll()
if keyworkd == "" {
es, e := dao_balance.GetAll()
if e != nil {
return nil, e
}
entities = es
}else{
es, e:= dao_balance.Search(keyworkd)
} else {
es, e := dao_balance.Search(keyworkd)
if e != nil {
return nil, e
}
......
......@@ -6,41 +6,40 @@ import (
)
type Param struct {
Name string `opt:"balanceName,require"`
ServiceName string `opt:"serviceName,require"`
AppName string `opt:"appName"`
Static string `opt:"static"`
Name string `opt:"balanceName,require"`
ServiceName string `opt:"serviceName,require"`
AppName string `opt:"appName"`
Static string `opt:"static"`
StaticCluster string `opt:"staticCluster"`
Desc string `opt:"balanceDesc"`
Desc string `opt:"balanceDesc"`
}
type Info struct {
Name string `json:"balanceName"`
ServiceName string `json:"serviceName"`
ServiceType string `json:"serviceType"`
ServiceDriver string `json:"serviceDriver"`
AppName string `json:"appName"`
Static string `json:"static"`
StaticCluster map[string]string `json:"staticCluster"`
Desc string `json:"balanceDesc"`
CreateTime string `json:"createTime"`
UpdateTime string `json:"updateTime"`
Name string `json:"balanceName"`
ServiceName string `json:"serviceName"`
ServiceType string `json:"serviceType"`
ServiceDriver string `json:"serviceDriver"`
AppName string `json:"appName"`
Static string `json:"static"`
StaticCluster map[string]string `json:"staticCluster"`
Desc string `json:"balanceDesc"`
CreateTime string `json:"createTime"`
UpdateTime string `json:"updateTime"`
}
func ReadInfo(balance *entity.Balance) *Info {
info:=&Info{
info := &Info{
Name: balance.Name,
ServiceName: balance.ServiceName,
ServiceType: balance.ServiceType,
ServiceDriver: balance.ServiceDriver,
AppName: balance.AppName,
AppName: balance.AppName,
Static: balance.Static,
StaticCluster: nil,
Desc: balance.Desc,
CreateTime: balance.CreateTime,
UpdateTime: balance.UpdateTime,
}
json.Unmarshal([]byte(balance.StaticCluster),&info.StaticCluster)
json.Unmarshal([]byte(balance.StaticCluster), &info.StaticCluster)
return info
}
\ No newline at end of file
}
......@@ -10,31 +10,29 @@ import (
func init() {
general.RegeditLater(Update)
}
func Update()error {
func Update() error {
l,e:= dao_balance_update.GetAllOldVerSion()
if e!=nil{
l, e := dao_balance_update.GetAllOldVerSion()
if e != nil {
return e
}
defStaticServiceName :=dao_balance_update.GetDefaultServiceStatic()
for _,e:=range l{
update(e,defStaticServiceName)
defStaticServiceName := dao_balance_update.GetDefaultServiceStatic()
for _, e := range l {
update(e, defStaticServiceName)
}
return nil
}
func update(e *entity.BalanceInfoEntity,serviceName string) {
func update(e *entity.BalanceInfoEntity, serviceName string) {
if e==nil{
if e == nil {
return
}
param:=&Param{
param := &Param{
Name: e.Name,
ServiceName: serviceName,
AppName: "",
......@@ -43,27 +41,26 @@ func update(e *entity.BalanceInfoEntity,serviceName string) {
Desc: e.Desc,
}
info,err:=e.Decode()
info, err := e.Decode()
if err!=nil{
if err != nil {
return
}
if info.Default!= nil{
if info.Default != nil {
param.Static = info.Default.ServersConfigOrg
}
if info.Cluster !=nil{
cluster:=make(map[string]string)
for clusterName,server:=range info.Cluster{
if info.Cluster != nil {
cluster := make(map[string]string)
for clusterName, server := range info.Cluster {
cluster[clusterName] = server.ServersConfigOrg
}
data ,err:= json.Marshal(cluster)
data, err := json.Marshal(cluster)
if err==nil{
if err == nil {
param.StaticCluster = string(data)
}
......@@ -71,5 +68,4 @@ func update(e *entity.BalanceInfoEntity,serviceName string) {
Save(param)
}
\ No newline at end of file
}
......@@ -7,25 +7,25 @@ import (
dao "github.com/eolinker/goku/server/dao/config-log"
)
func Get(name string) (*LogConfig,error) {
if _,has:=logNames[name];!has{
return nil,fmt.Errorf("not has that log config of %s",name)
func Get(name string) (*LogConfig, error) {
if _, has := logNames[name]; !has {
return nil, fmt.Errorf("not has that log config of %s", name)
}
c:=&LogConfig{}
c := &LogConfig{}
c.Levels = Levels
c.Periods = Periods
c.Expires = Expires
config, e := dao.Get(name)
if e!= nil || config == nil{
if e != nil || config == nil {
auto.SetDefaults(c)
c.Name = name
c.File = name
c.Level = log.ErrorLevel.String()
c.Period = log.PeriodHour.String()
c.Expire = ExpireDefault
}else{
} else {
c.Read(config)
}
......@@ -34,18 +34,18 @@ func Get(name string) (*LogConfig,error) {
func GetAccess() (*AccessConfig, error) {
config, e := dao.Get(AccessLog)
c:=new(AccessConfig)
c := new(AccessConfig)
c.Periods = Periods
c.Expires = Expires
if e!= nil || config == nil{
if e != nil || config == nil {
auto.SetDefaults(c)
c.Name = AccessLog
c.Period = log.PeriodHour.String()
c.Expire = ExpireDefault
c.InitFields()
}else{
} else {
c.Read(config)
}
return c, nil
}
\ No newline at end of file
}
package config_log
func InitLog() {
}
\ No newline at end of file
func InitLog() {
}
......@@ -10,9 +10,9 @@ import (
)
const (
ConsoleLog = "console"
NodeLog = "node"
AccessLog = "access"
ConsoleLog = "console"
NodeLog = "node"
AccessLog = "access"
ExpireDefault = 3
)
......@@ -24,24 +24,24 @@ var (
}
Expires = []ValueTitle{
{
Value:3,
Title:"3天",
Value: 3,
Title: "3天",
},
{
Value:7,
Title:"7天",
Value: 7,
Title: "7天",
},
{
Value:30,
Title:"30天",
Value: 30,
Title: "30天",
},
{
Value:90,
Title:"90天",
Value: 90,
Title: "90天",
},
{
Value:180,
Title:"180天",
Value: 180,
Title: "180天",
},
}
Periods = []NameTitle{
......@@ -98,7 +98,7 @@ type PutParam struct {
File string `opt:"file,require"`
Level string `opt:"level,require"`
Period string `opt:"period,require"`
Expire int `opt:"expire,require"`
Expire int `opt:"expire,require"`
}
func (p *PutParam) Format() (*Param, error) {
......@@ -128,7 +128,7 @@ type AccessParam struct {
File string `opt:"file,require"`
Period string `opt:"period,require"`
Fields string `opt:"fields,require"`
Expire int `opt:"expire,require"`
Expire int `opt:"expire,require"`
}
func (p *AccessParam) Format() (*Param, error) {
......@@ -169,7 +169,7 @@ func (c *LogConfig) Read(ent *entity.LogConfig) {
c.Level = ent.Level
c.Period = ent.Period
c.Expire = ent.Expire
if c.Expire < ExpireDefault{
if c.Expire < ExpireDefault {
c.Expire = ExpireDefault
}
}
......
......@@ -8,41 +8,40 @@ import (
entity "github.com/eolinker/goku/server/entity/config-log"
)
func Set(name string,param *Param)error {
if _,has:=logNames[name];!has{
return fmt.Errorf("not has that log config of %s",name)
func Set(name string, param *Param) error {
if _, has := logNames[name]; !has {
return fmt.Errorf("not has that log config of %s", name)
}
c:=new(entity.LogConfig)
c := new(entity.LogConfig)
c.Name = name
c.Level = param.Level
c.Period = param.Period
c.File = param.File
c.Dir = param.Dir
if param.Enable {
c.Enable = 1
}else{
} else {
c.Enable = 0
}
c.Fields = param.Fields
c.Expire = param.Expire
err :=dao.Set(c)
if err!=nil{
err := dao.Set(c)
if err != nil {
return err
}
_=dao2.UpdateTable("goku_config_log")
_ = dao2.UpdateTable("goku_config_log")
if name == ConsoleLog{
if name == ConsoleLog {
go reset(c)
}
return nil
}
func reset(c *entity.LogConfig) {
func reset(c *entity.LogConfig) {
period,_:=log.ParsePeriod(c.Period)
log.SetOutPut(c.Enable == 1,c.Dir,c.File,period,c.Expire)
l,_:= log.ParseLevel(c.Level)
period, _ := log.ParsePeriod(c.Period)
log.SetOutPut(c.Enable == 1, c.Dir, c.File, period, c.Expire)
l, _ := log.ParseLevel(c.Level)
log.SetLevel(l)
}
\ No newline at end of file
}
......@@ -5,30 +5,31 @@ import (
"time"
)
func genHour(beginTime, endTime string, period int)( int,int ){
func genHour(beginTime, endTime string, period int) (int, int) {
startHour := 0
endHour,_ := strconv.Atoi(time.Now().Add(time.Hour).Format("2006010215"))
endHour, _ := strconv.Atoi(time.Now().Add(time.Hour).Format("2006010215"))
switch period {
case 3:{
case 3:
{
bt,e:=time.Parse("2006-01-02", beginTime)
if e==nil{
startHour,_= strconv.Atoi(bt.Format("2006010215"))
bt, e := time.Parse("2006-01-02", beginTime)
if e == nil {
startHour, _ = strconv.Atoi(bt.Format("2006010215"))
}
et, e := time.Parse("2006-01-02", endTime)
if e == nil {
et.Add(time.Hour*24 - time.Minute)
endHour, _ = strconv.Atoi(et.Format("2006010215"))
}
}
et,e:=time.Parse("2006-01-02", endTime)
if e == nil{
et.Add(time.Hour*24-time.Minute)
endHour,_= strconv.Atoi(et.Format("2006010215"))
}
}
case 2:
startHour ,_= strconv.Atoi(time.Now().Add(- time.Hour*24*7).Format("2006010215"))
startHour, _ = strconv.Atoi(time.Now().Add(- time.Hour * 24 * 7).Format("2006010215"))
case 1:
startHour ,_= strconv.Atoi(time.Now().Add(- time.Hour*24*3).Format("2006010215"))
startHour, _ = strconv.Atoi(time.Now().Add(- time.Hour * 24 * 3).Format("2006010215"))
default:
startHour ,_= strconv.Atoi(time.Now().Format("2006010200"))
startHour, _ = strconv.Atoi(time.Now().Format("2006010200"))
}
return startHour,endHour
return startHour, endHour
}
......@@ -6,14 +6,14 @@ import (
)
// 新增节点信息
func AddNode(clusterId int, nodeName, nodeIP, nodePort, gatewayPath string, groupID int) (bool, map[string]interface{}, error) {
return console_mysql.AddNode(clusterId, nodeName, nodeIP, nodePort, gatewayPath, groupID)
func AddNode(clusterId int, nodeName, nodeIP, nodePort, gatewayPath string, groupID int) (bool, map[string]interface{}, error) {
return console_mysql.AddNode(clusterId, nodeName, nodeIP, nodePort, gatewayPath, groupID)
}
// 修改节点
func EditNode(nodeName, nodeIP, nodePort, gatewayPath string, nodeID, groupID int) (bool, string, error) {
return console_mysql.EditNode(nodeName, nodeIP, nodePort, gatewayPath, nodeID, groupID)
return console_mysql.EditNode(nodeName, nodeIP, nodePort, gatewayPath, nodeID, groupID)
}
// 删除节点
......@@ -23,32 +23,30 @@ func DeleteNode(nodeID int) (bool, string, error) {
// 获取节点信息
func GetNodeInfo(nodeID int) (bool, *entity.Node, error) {
b, node, e := console_mysql.GetNodeInfo(nodeID)
b, node, e := console_mysql.GetNodeInfo(nodeID)
ResetNodeStatus(node)
return b,node,e
return b, node, e
}
// 获取节点信息
func GetNodeInfoByIpPort(ip string, port int) (bool, *entity.Node, error) {
b, node, e := console_mysql.GetNodeByIpPort(ip, port)
ResetNodeStatus(node)
return b,node,e
return b, node, e
}
// GetNodeList 获取节点列表
func GetNodeList(clusterID, groupID int, keyword string) (bool, []*entity.Node, error) {
b, nodes, e := console_mysql.GetNodeList(clusterID, groupID, keyword)
ResetNodeStatus(nodes...)
return b,nodes,e
return b, nodes, e
}
// 节点IP查重
func CheckIsExistRemoteAddr(nodeID int, nodeIP, nodePort string) bool {
return console_mysql.CheckIsExistRemoteAddr(nodeID, nodeIP, nodePort)
}
// 批量删除节点
func BatchDeleteNode(nodeIDList string) (bool, string, error) {
flag, nodeIDList, err := console_mysql.GetAvaliableNodeListFromNodeList(nodeIDList, 0)
......@@ -65,9 +63,7 @@ func BatchEditNodeGroup(nodeIDList string, groupID int) (bool, string, error) {
return console_mysql.BatchEditNodeGroup(nodeIDList, groupID)
}
// 获取节点IP列表
func GetNodeIPList() (bool, []map[string]interface{}, error) {
return console_mysql.GetNodeIPList()
}
......@@ -7,71 +7,73 @@ import (
"time"
)
const EXPIRE = time.Second * 10
var(
const EXPIRE = time.Second * 10
var (
manager = _StatusManager{
locker:sync.RWMutex{},
lastHeartBeat:make(map[string]time.Time),
locker: sync.RWMutex{},
lastHeartBeat: make(map[string]time.Time),
}
)
type _StatusManager struct {
locker sync.RWMutex
locker sync.RWMutex
lastHeartBeat map[string]time.Time
}
func (m *_StatusManager)refresh(id string) {
t:=time.Now()
func (m *_StatusManager) refresh(id string) {
t := time.Now()
m.locker.Lock()
m.lastHeartBeat[id]=t
m.lastHeartBeat[id] = t
m.locker.Unlock()
}
func (m *_StatusManager)stop(id string) {
func (m *_StatusManager) stop(id string) {
m.locker.Lock()
delete(m.lastHeartBeat,id)
delete(m.lastHeartBeat, id)
m.locker.Unlock()
}
func (m *_StatusManager)get(id string) (time.Time, bool) {
func (m *_StatusManager) get(id string) (time.Time, bool) {
m.locker.RLock()
t,b:=m.lastHeartBeat[id]
t, b := m.lastHeartBeat[id]
m.locker.RUnlock()
return t,b
return t, b
}
func Refresh(ip string,port string) {
id:=fmt.Sprintf("%s:%d",ip,port)
func Refresh(ip string, port string) {
id := fmt.Sprintf("%s:%d", ip, port)
manager.refresh(id)
}
func NodeStop(ip,port string) {
id:=fmt.Sprintf("%s:%d",ip,port)
func NodeStop(ip, port string) {
id := fmt.Sprintf("%s:%d", ip, port)
manager.stop(id)
}
func IsLive(ip string,port string) bool {
id:=fmt.Sprintf("%s:%d",ip,port)
t,has:=manager.get(id)
if !has{
func IsLive(ip string, port string) bool {
id := fmt.Sprintf("%s:%d", ip, port)
t, has := manager.get(id)
if !has {
return false
}
if time.Now().Sub(t) > EXPIRE{
if time.Now().Sub(t) > EXPIRE {
return false
}
return true
}
func ResetNodeStatus(nodes... *entity.Node) {
for _, node:=range nodes{
func ResetNodeStatus(nodes ...*entity.Node) {
for _, node := range nodes {
if IsLive(node.NodeIP,node.NodePort){
if IsLive(node.NodeIP, node.NodePort) {
node.NodeStatus = 1
}else{
} else {
node.NodeStatus = 0
}
}
}
\ No newline at end of file
}
......@@ -23,17 +23,17 @@ func CheckConfig(pluginName string, config []byte) (bool, error) {
switch v := err.(type) {
case *json.SyntaxError:
{
end := int64(bytes.IndexFunc(config[v.Offset:], isEnd))
if end == -1{
end = int64(len(config)-1)
}else{
end := int64(bytes.IndexFunc(config[v.Offset:], isEnd))
if end == -1 {
end = int64(len(config) - 1)
} else {
end = end + v.Offset
}
start := 0
if v.Offset > 0 {
start = bytes.LastIndexFunc(config[:v.Offset], isEnd)
}
if start == -1{
if start == -1 {
start = 0
}
......@@ -41,7 +41,7 @@ func CheckConfig(pluginName string, config []byte) (bool, error) {
}
case *json.UnmarshalTypeError:
{
return false, fmt.Errorf("数据类型不正确:\"%s\":%s", v.Field,v.Value)
return false, fmt.Errorf("数据类型不正确:\"%s\":%s", v.Field, v.Value)
}
}
......
package plugin_config
func init() {
allConfigOfPlugin=map[string]interface{}{
"goku-apikey_auth":new(APIKeyConf),
"goku-basic_auth":new(basicAuthConf),
"goku-circuit_breaker":new(CircuitBreakerConf),
"goku-cors":new(gokuCorsConfig),
"goku-data_format_transformer":new(dataFormatTranformerConf),
"goku-default_response":new(defaultResponseConf),
"goku-extra_params":new(extraParamsConf),
"goku-http_log":new(Log),
"goku-ip_restriction":new(IPList),
"goku-jwt_auth":new(JwtConf),
"goku-oauth2_auth":new(Oauth2Conf),
"goku-params_check":new(paramsCheckConf),
"goku-params_transformer":new(paramsTransformerconf),
"goku-proxy_caching":new(ProxyCachingConf),
"goku-rate_limiting":new(_RateLimitingConf),
"goku-replay_attack_defender":new(ReplayAttackDefenderConf),
"goku-request_size_limiting":new(requestSizeLimit),
"goku-response_headers":new(responseHeader),
"goku-service_downgrade":new(serviceDowngradeConf),
allConfigOfPlugin = map[string]interface{}{
"goku-apikey_auth": new(APIKeyConf),
"goku-basic_auth": new(basicAuthConf),
"goku-circuit_breaker": new(CircuitBreakerConf),
"goku-cors": new(gokuCorsConfig),
"goku-data_format_transformer": new(dataFormatTranformerConf),
"goku-default_response": new(defaultResponseConf),
"goku-extra_params": new(extraParamsConf),
"goku-http_log": new(Log),
"goku-ip_restriction": new(IPList),
"goku-jwt_auth": new(JwtConf),
"goku-oauth2_auth": new(Oauth2Conf),
"goku-params_check": new(paramsCheckConf),
"goku-params_transformer": new(paramsTransformerconf),
"goku-proxy_caching": new(ProxyCachingConf),
"goku-rate_limiting": new(_RateLimitingConf),
"goku-replay_attack_defender": new(ReplayAttackDefenderConf),
"goku-request_size_limiting": new(requestSizeLimit),
"goku-response_headers": new(responseHeader),
"goku-service_downgrade": new(serviceDowngradeConf),
}
}
......@@ -170,7 +168,6 @@ type ProxyCachingConf struct {
RedisDatabase int `json:"redisDatabase"`
}
type _RateLimitingConf struct {
Second int64 `json:"second,omitempty"`
Minute int64 `json:"minute,omitempty"`
......@@ -195,4 +192,4 @@ type serviceDowngradeConf struct {
StatusCode int `json:"statusCode"`
Headers map[string]string `json:"headers"`
Body string `json:"body"`
}
\ No newline at end of file
}
......@@ -8,7 +8,8 @@ import (
entity "github.com/eolinker/goku/server/entity/console-entity"
)
const _TableName = "goku_service_config"
const _TableName = "goku_service_config"
func Add(param *AddParam) error {
err := dao_service.Add(param.Name, param.Driver, param.Desc, param.Config, param.ClusterConfig, false, param.HealthCheck, param.HealthCheckPath, param.HealthCheckCode, param.HealthCheckPeriod, param.HealthCheckTimeOut)
......@@ -30,7 +31,7 @@ func Save(param *AddParam) error {
return fmt.Errorf("not allowed change dirver from %s to %s for service", v.Driver, param.Driver)
}
err:= dao_service.Save(param.Name, param.Desc, param.Config, param.ClusterConfig, param.HealthCheck, param.HealthCheckPath, param.HealthCheckCode, param.HealthCheckPeriod, param.HealthCheckTimeOut)
err := dao_service.Save(param.Name, param.Desc, param.Config, param.ClusterConfig, param.HealthCheck, param.HealthCheckPath, param.HealthCheckCode, param.HealthCheckPeriod, param.HealthCheckTimeOut)
if err == nil {
dao.UpdateTable(_TableName)
}
......@@ -77,7 +78,7 @@ func tran(v *entity.Service) *Service {
IsDefault: v.IsDefault,
HealthCheck: v.HealthCheck,
UpdateTime: v.UpdateTime,
CreateTime:v.CreateTime,
CreateTime: v.CreateTime,
}
d, has := driver2.Get(v.Driver)
......
......@@ -16,30 +16,30 @@ type Service struct {
IsDefault bool `json:"isDefault"`
HealthCheck bool `json:"healthCheck"`
UpdateTime string `json:"updateTime"`
CreateTime string `json:"createTime"`
CreateTime string `json:"createTime"`
}
type Info struct {
*Service
Config string `json:"config"`
ClusterConfig string `json:"-"`
ClusterConfigObj map[string]string `json:"clusterConfig"`
HealthCheckPath string `json:"healthCheckPath"`
HealthCheckPeriod int `json:"healthCheckPeriod"`
HealthCheckCode string `json:"healthCheckCode"`
HealthCheckTimeOut int `json:"healthCheckTimeOut"`
Config string `json:"config"`
ClusterConfig string `json:"-"`
ClusterConfigObj map[string]string `json:"clusterConfig"`
HealthCheckPath string `json:"healthCheckPath"`
HealthCheckPeriod int `json:"healthCheckPeriod"`
HealthCheckCode string `json:"healthCheckCode"`
HealthCheckTimeOut int `json:"healthCheckTimeOut"`
}
func (i *Info) Decode() {
json.Unmarshal([]byte(i.ClusterConfig),&i.ClusterConfigObj)
func (i *Info) Decode() {
json.Unmarshal([]byte(i.ClusterConfig), &i.ClusterConfigObj)
}
type AddParam struct {
Name string `opt:"name,require"`
Driver string `opt:"driver" default:"static"`
Desc string `opt:"desc"`
Config string `opt:"config"`
ClusterConfig string `opt:"clusterConfig"`
Name string `opt:"name,require"`
Driver string `opt:"driver" default:"static"`
Desc string `opt:"desc"`
Config string `opt:"config"`
ClusterConfig string `opt:"clusterConfig"`
//ClusterConfigObj map[string]string `json:"clusterConfig"`
HealthCheck bool `opt:"healthCheck" default:"false"`
HealthCheckPath string `opt:"healthCheckPath"`
......
......@@ -45,9 +45,6 @@ func Router() {
http.HandleFunc("/monitor/gateway/getSummaryInfo", monitor.GetGatewayMonitorSummaryByPeriod)
// 项目
http.HandleFunc("/project/add", project.AddProject)
http.HandleFunc("/project/edit", project.EditProject)
......@@ -157,11 +154,9 @@ func Router() {
http.HandleFunc("/node/getList", node.GetNodeList)
http.HandleFunc("/node/checkIsExistRemoteAddr", node.CheckIsExistRemoteAddr)
http.HandleFunc("/node/batchEditGroup", node.BatchEditNodeGroup)
http.HandleFunc("/node/batchDelete", node.BatchDeleteNode)
// 节点分组
http.HandleFunc("/node/group/add", node.AddNodeGroup)
http.HandleFunc("/node/group/edit", node.EditNodeGroup)
......
......@@ -3,7 +3,7 @@ module github.com/eolinker/goku
go 1.12
require (
github.com/360EntSecGroup-Skylar/excelize v1.4.1
github.com/360EntSecGroup-Skylar/excelize v1.4.1 // indirect
github.com/eolinker/goku-plugin v0.1.3
github.com/go-redis/redis v6.15.2+incompatible
github.com/go-sql-driver/mysql v1.4.1
......@@ -11,9 +11,8 @@ require (
github.com/json-iterator/go v1.1.7
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/pkg/errors v0.8.1
github.com/sirupsen/logrus v1.4.0
github.com/yuchenfw/gocrypt v0.0.0-20190627061521-ee7b5965ec93
github.com/yuchenfw/gocrypt v0.0.0-20190627061521-ee7b5965ec93 // indirect
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
google.golang.org/appengine v1.6.0 // indirect
gopkg.in/yaml.v2 v2.2.2
......
......@@ -6,13 +6,12 @@ import (
"os"
)
func StartDebug() {
func StartDebug() {
logger.AddHook(new(debugHook))
}
type debugHook struct {
}
func (h *debugHook) Levels() []logrus.Level {
......@@ -29,7 +28,7 @@ func (h *debugHook) Levels() []logrus.Level {
func (h *debugHook) Fire(entry *logrus.Entry) error {
s, e := logger.Formatter.Format(entry)
if e!=nil{
if e != nil {
fmt.Println(entry)
return nil
}
......
......@@ -8,11 +8,13 @@ import (
"strings"
"time"
)
const (
defaultTimestampFormat = time.RFC3339
)
type LineFormatter struct {
TimestampFormat string
TimestampFormat string
CallerPrettyfier func(*runtime.Frame) (function string, file string)
}
......@@ -30,7 +32,6 @@ func (f *LineFormatter) Format(entry *logrus.Entry) ([]byte, error) {
b = &bytes.Buffer{}
}
timestampFormat := f.TimestampFormat
if timestampFormat == "" {
timestampFormat = defaultTimestampFormat
......@@ -39,9 +40,8 @@ func (f *LineFormatter) Format(entry *logrus.Entry) ([]byte, error) {
levelText := strings.ToUpper(entry.Level.String())
levelText = levelText[0:4]
b.WriteString(fmt.Sprint("[", entry.Time.Format(timestampFormat),"] "))
b.WriteString(fmt.Sprint("[", levelText,"] "))
b.WriteString(fmt.Sprint("[", entry.Time.Format(timestampFormat), "] "))
b.WriteString(fmt.Sprint("[", levelText, "] "))
if entry.HasCaller() {
......@@ -61,23 +61,22 @@ func (f *LineFormatter) Format(entry *logrus.Entry) ([]byte, error) {
b.WriteString(strings.TrimSuffix(entry.Message, "\n"))
for k, v := range data {
for k ,v:= range data {
appendKeyValue(b,k,v)
appendKeyValue(b, k, v)
}
b.WriteByte('\n')
return b.Bytes(), nil
}
func needsQuoting(text string) bool {
func needsQuoting(text string) bool {
if len(text) == 0 {
if len(text) == 0 {
return true
}
if text[0] =='"' {
if text[0] == '"' {
return false
}
for _, ch := range text {
......@@ -91,7 +90,7 @@ func needsQuoting(text string) bool {
return false
}
func appendKeyValue(b *bytes.Buffer, key string, value interface{}) {
func appendKeyValue(b *bytes.Buffer, key string, value interface{}) {
if b.Len() > 0 {
b.WriteByte(' ')
}
......@@ -100,7 +99,7 @@ func appendKeyValue(b *bytes.Buffer, key string, value interface{}) {
appendValue(b, value)
}
func appendValue(b *bytes.Buffer, value interface{}) {
func appendValue(b *bytes.Buffer, value interface{}) {
stringVal, ok := value.(string)
if !ok {
stringVal = fmt.Sprint(value)
......
......@@ -7,18 +7,18 @@ import (
type LogPeriod interface {
String() string
FormatLayout()string
FormatLayout() string
}
type LogPeriodType int
func ParsePeriod(v string)(LogPeriod,error) {
func ParsePeriod(v string) (LogPeriod, error) {
switch strings.ToLower(v) {
case "month":
return PeriodMonth,nil
return PeriodMonth, nil
case "day":
return PeriodDay,nil
return PeriodDay, nil
case "hour":
return PeriodHour,nil
return PeriodHour, nil
}
return nil, fmt.Errorf("not a valid period: %q", v)
......@@ -42,12 +42,11 @@ const (
PeriodHour
)
func (period LogPeriodType) FormatLayout() string {
switch period {
case PeriodHour:
{
return "2006-01-02-15"
return "2006-01-02-15"
}
case PeriodDay:
{
......
......@@ -11,44 +11,46 @@ import (
"sync"
"time"
)
const MaxBufferd = 1024*500
var(
const MaxBufferd = 1024 * 500
var (
bufferPool = &sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
)
type FileWriterByPeriod struct {
wC chan *bytes.Buffer
dir string
file string
period LogPeriod
enable bool
cancelFunc context.CancelFunc
locker sync.Mutex
wg sync.WaitGroup
expire time.Duration
wC chan *bytes.Buffer
dir string
file string
period LogPeriod
enable bool
cancelFunc context.CancelFunc
locker sync.Mutex
wg sync.WaitGroup
expire time.Duration
}
func NewFileWriteBytePeriod() *FileWriterByPeriod {
w:=&FileWriterByPeriod{
locker:sync.Mutex{},
wg:sync.WaitGroup{},
enable:false,
w := &FileWriterByPeriod{
locker: sync.Mutex{},
wg: sync.WaitGroup{},
enable: false,
}
return w
}
func (w *FileWriterByPeriod)getExpire()time.Duration{
func (w *FileWriterByPeriod) getExpire() time.Duration {
w.locker.Lock()
expire:= w.expire
expire := w.expire
w.locker.Unlock()
return expire
}
func (w *FileWriterByPeriod)Set(dir,file string,period LogPeriod,expire time.Duration){
fileName:=strings.TrimSuffix(file,".log")
func (w *FileWriterByPeriod) Set(dir, file string, period LogPeriod, expire time.Duration) {
fileName := strings.TrimSuffix(file, ".log")
w.locker.Lock()
w.file = fileName
......@@ -57,76 +59,75 @@ func (w *FileWriterByPeriod)Set(dir,file string,period LogPeriod,expire time.Dur
w.expire = expire
w.locker.Unlock()
}
func (w *FileWriterByPeriod) Open() {
func (w *FileWriterByPeriod) Open() {
w.locker.Lock()
defer w.locker.Unlock()
defer w.locker.Unlock()
if w.enable{
if w.enable {
return
}
ctx, cancel := context.WithCancel(context.Background())
w.cancelFunc = cancel
w.wC = make(chan *bytes.Buffer,100)
w.wC = make(chan *bytes.Buffer, 100)
w.wg.Add(1)
w.enable = true
go w.do(ctx)
}
func (w *FileWriterByPeriod) Close() {
func (w *FileWriterByPeriod) Close() {
isClose := false
w.locker.Lock()
if !w.enable{
if !w.enable {
w.locker.Unlock()
return
}
if w.cancelFunc != nil{
if w.cancelFunc != nil {
isClose = true
w.cancelFunc()
w.cancelFunc = nil
}
w.enable = false
w.locker.Unlock()
if isClose{
if isClose {
w.wg.Wait()
}
}
func (w *FileWriterByPeriod) Write(p []byte) (n int, err error) {
l:=len(p)
l := len(p)
if !w.enable {
return l,nil
return l, nil
}
buffer := bufferPool.Get().(*bytes.Buffer)
buffer.Reset()
buffer.Write(p)
w.wC<-buffer
return l,nil
w.wC <- buffer
return l, nil
}
func (w *FileWriterByPeriod) do(ctx context.Context) {
func (w *FileWriterByPeriod) do(ctx context.Context) {
w.initFile()
f,lastTag,e:=w.openFile()
if e!=nil{
fmt.Printf("open log file:%s\n",e.Error())
f, lastTag, e := w.openFile()
if e != nil {
fmt.Printf("open log file:%s\n", e.Error())
return
}
buf:=bufio.NewWriter(f)
t:=time.NewTicker(time.Second*5)
buf := bufio.NewWriter(f)
t := time.NewTicker(time.Second * 5)
defer t.Stop()
tflusth:=time.NewTimer(time.Second)
tflusth := time.NewTimer(time.Second)
for{
for {
select {
case <-ctx.Done():
{
for len(w.wC)>0{
p:=<-w.wC
for len(w.wC) > 0 {
p := <-w.wC
buf.Write(p.Bytes())
bufferPool.Put(p)
}
......@@ -139,20 +140,20 @@ func (w *FileWriterByPeriod) do(ctx context.Context) {
case <-t.C:
{
if buf.Buffered() >0{
if buf.Buffered() > 0 {
buf.Flush()
tflusth.Reset(time.Second)
}
if lastTag != w.timeTag(time.Now()){
if lastTag != w.timeTag(time.Now()) {
f.Close()
w.history(lastTag)
fnew,tag,err:=w.openFile()
if err!=nil{
fnew, tag, err := w.openFile()
if err != nil {
return
}
lastTag = tag
f=fnew
f = fnew
buf.Reset(f)
go w.dropHistory()
......@@ -161,60 +162,61 @@ func (w *FileWriterByPeriod) do(ctx context.Context) {
}
case <-tflusth.C:
{
if buf.Buffered()> 0{
if buf.Buffered() > 0 {
buf.Flush()
}
tflusth.Reset(time.Second)
}
case p:=<-w.wC:{
buf.Write(p.Bytes())
bufferPool.Put(p)
if buf.Buffered()>MaxBufferd{
buf.Flush()
case p := <-w.wC:
{
buf.Write(p.Bytes())
bufferPool.Put(p)
if buf.Buffered() > MaxBufferd {
buf.Flush()
}
tflusth.Reset(time.Second)
}
tflusth.Reset(time.Second)
}
}
}
}
func (w *FileWriterByPeriod) timeTag(t time.Time) string {
w.locker.Lock()
tag:= t.Format(w.period.FormatLayout())
tag := t.Format(w.period.FormatLayout())
w.locker.Unlock()
return tag
}
func (w *FileWriterByPeriod) history(tag string) {
path := filepath.Join(w.dir,fmt.Sprintf("%s.log",w.file))
histroy:= filepath.Join(w.dir,fmt.Sprintf("%s-%s.log",w.file,tag))
_=os.Rename(path,histroy)
path := filepath.Join(w.dir, fmt.Sprintf("%s.log", w.file))
histroy := filepath.Join(w.dir, fmt.Sprintf("%s-%s.log", w.file, tag))
_ = os.Rename(path, histroy)
}
func (w *FileWriterByPeriod) dropHistory(){
expire:=w.getExpire()
func (w *FileWriterByPeriod) dropHistory() {
expire := w.getExpire()
expireTime := time.Now().Add(- expire)
pathPatten := filepath.Join(w.dir,fmt.Sprintf("%s-*",w.file))
pathPatten := filepath.Join(w.dir, fmt.Sprintf("%s-*", w.file))
files, err := filepath.Glob(pathPatten)
if err==nil{
for _,f:=range files{
if info, e := os.Stat(f);e==nil{
if err == nil {
for _, f := range files {
if info, e := os.Stat(f); e == nil {
if expireTime.After(info.ModTime()){
_=os.Remove(f)
if expireTime.After(info.ModTime()) {
_ = os.Remove(f)
}
}
}
}
}
func (w *FileWriterByPeriod) initFile() {
_=os.MkdirAll(w.dir,os.ModePerm)
path := filepath.Join(w.dir,fmt.Sprintf("%s.log",w.file))
nowTag:= w.timeTag(time.Now())
if info, e := os.Stat(path);e==nil{
timeTag:=w.timeTag(info.ModTime())
if timeTag !=nowTag{
func (w *FileWriterByPeriod) initFile() {
_ = os.MkdirAll(w.dir, os.ModePerm)
path := filepath.Join(w.dir, fmt.Sprintf("%s.log", w.file))
nowTag := w.timeTag(time.Now())
if info, e := os.Stat(path); e == nil {
timeTag := w.timeTag(info.ModTime())
if timeTag != nowTag {
w.history(timeTag)
}
}
......@@ -223,14 +225,14 @@ func (w *FileWriterByPeriod) initFile() {
}
func (w *FileWriterByPeriod)openFile()(*os.File,string,error) {
path := filepath.Join(w.dir,fmt.Sprintf("%s.log",w.file))
nowTag:= w.timeTag(time.Now())
f,err:=os.OpenFile(path,os.O_WRONLY|os.O_CREATE|os.O_APPEND,0666)
func (w *FileWriterByPeriod) openFile() (*os.File, string, error) {
path := filepath.Join(w.dir, fmt.Sprintf("%s.log", w.file))
nowTag := w.timeTag(time.Now())
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err!= nil{
return nil,"",err
if err != nil {
return nil, "", err
}
return f,nowTag,err
return f, nowTag, err
}
\ No newline at end of file
}
......@@ -8,7 +8,6 @@ import (
)
type MinPeriod struct {
}
func (p *MinPeriod) String() string {
......@@ -20,17 +19,17 @@ func (p *MinPeriod) FormatLayout() string {
}
func TestFileWriterByPeriod(t *testing.T) {
w:=NewFileWriteBytePeriod("/Users/huangmengzhu/test/log","app.log",new(MinPeriod))
w := NewFileWriteBytePeriod("/Users/huangmengzhu/test/log", "app.log", new(MinPeriod))
defer w.Close()
ctx,_:=context.WithTimeout(context.Background(),time.Minute*3)
ctx, _ := context.WithTimeout(context.Background(), time.Minute*3)
tick:=time.NewTicker(time.Millisecond)
tick := time.NewTicker(time.Millisecond)
defer tick.Stop()
index:= 0
index := 0
for {
select {
case <-ctx.Done():
case <-ctx.Done():
{
w.Close()
return
......@@ -39,8 +38,8 @@ func TestFileWriterByPeriod(t *testing.T) {
case <-tick.C:
{
index++
fmt.Fprintf(w,"line:%d\n",index)
fmt.Fprintf(w, "line:%d\n", index)
}
}
}
}
\ No newline at end of file
}
......@@ -3,6 +3,6 @@ package access_log
import "time"
const (
DefaultTimeStampFormatter ="[2006-01-02 15:04:05]"
TimeIso8601Formatter = "["+time.RFC3339+"]"
DefaultTimeStampFormatter = "[2006-01-02 15:04:05]"
TimeIso8601Formatter = "[" + time.RFC3339 + "]"
)
......@@ -10,10 +10,11 @@ import (
)
type AccessLogFormatter struct {
fields []access_field.AccessFieldKey
locker sync.RWMutex
fields []access_field.AccessFieldKey
locker sync.RWMutex
TimestampFormat string
}
func (f *AccessLogFormatter) SetFields(fields []access_field.AccessFieldKey) {
f.locker.Lock()
f.fields = fields
......@@ -32,27 +33,27 @@ func (f *AccessLogFormatter) Format(entry *logrus.Entry) ([]byte, error) {
timestampFormat = DefaultTimeStampFormatter
}
data:= entry.Data
data := entry.Data
data[access_field.TimeLocal] = entry.Time.Format(timestampFormat)
data[access_field.TimeIso8601] = entry.Time.Format(TimeIso8601Formatter)
data[access_field.TimeIso8601] = entry.Time.Format(TimeIso8601Formatter)
msec := entry.Time.UnixNano()/int64(time.Millisecond)
data[access_field.Msec] = fmt.Sprintf("%d.%d",msec/1000,msec%1000)
msec := entry.Time.UnixNano() / int64(time.Millisecond)
data[access_field.Msec] = fmt.Sprintf("%d.%d", msec/1000, msec%1000)
requestTIme:= data[access_field.RequestTime].(time.Duration)
data[access_field.RequestTime] = fmt.Sprintf("%dms",requestTIme/time.Millisecond)
requestTIme := data[access_field.RequestTime].(time.Duration)
data[access_field.RequestTime] = fmt.Sprintf("%dms", requestTIme/time.Millisecond)
for _,key:=range f.fields{
for _, key := range f.fields {
b.WriteByte('\t')
if v,has := data[key.Key()];has{
f.appendValue(b,v)
}else{
f.appendValue(b,"-")
if v, has := data[key.Key()]; has {
f.appendValue(b, v)
} else {
f.appendValue(b, "-")
}
}
b.WriteByte('\n')
p:=b.Bytes()
return p[1:],nil
p := b.Bytes()
return p[1:], nil
}
//
......@@ -75,11 +76,11 @@ func (f *AccessLogFormatter) appendValue(b *bytes.Buffer, value interface{}) {
}
//if !f.needsQuoting(stringVal) {
b.WriteString(stringVal)
b.WriteString(stringVal)
//} else {
// b.WriteString(fmt.Sprintf("%q", stringVal))
//}
}
func NewAccessLogFormatter(fields []access_field.AccessFieldKey)*AccessLogFormatter {
return &AccessLogFormatter{fields:fields}
}
\ No newline at end of file
func NewAccessLogFormatter(fields []access_field.AccessFieldKey) *AccessLogFormatter {
return &AccessLogFormatter{fields: fields}
}
......@@ -6,10 +6,11 @@ import (
"github.com/sirupsen/logrus"
"time"
)
var (
logger *logrus.Logger
formatter *AccessLogFormatter
writer *log.FileWriterByPeriod
logger *logrus.Logger
formatter *AccessLogFormatter
writer *log.FileWriterByPeriod
)
//func InitLogger(enable bool,fields []string,dir,file string,period log.LogPeriod) {
......@@ -19,41 +20,42 @@ var (
//
//}
type Fields = logrus.Fields
func Log(fields Fields) {
if logger ==nil{
func Log(fields Fields) {
if logger == nil {
return
}
logger.WithFields(fields).Info()
}
func SetFields(fields []access_field.AccessFieldKey){
if formatter == nil{
formatter = NewAccessLogFormatter(fields)
}else{
func SetFields(fields []access_field.AccessFieldKey) {
if formatter == nil {
formatter = NewAccessLogFormatter(fields)
} else {
formatter.SetFields(fields)
}
}
func SetOutput(enable bool,dir, file string, period log.LogPeriod,expire int) {
func SetOutput(enable bool, dir, file string, period log.LogPeriod, expire int) {
if enable{
if writer == nil{
if enable {
if writer == nil {
writer = log.NewFileWriteBytePeriod()
}
writer.Set(dir,file,period,time.Duration(expire)*time.Hour*24)
writer.Set(dir, file, period, time.Duration(expire)*time.Hour*24)
writer.Open()
if logger == nil{
if logger == nil {
logger = logrus.New()
logger.SetFormatter(formatter)
logger.SetOutput(writer)
logger.SetLevel(logrus.InfoLevel)
}
}else{
if writer!=nil{
} else {
if writer != nil {
writer.Close()
}
}
}
\ No newline at end of file
}
......@@ -8,41 +8,40 @@ import (
func Test(t *testing.T) {
dir:="/Users/huangmengzhu/test/log"
file:="access.log"
period:=log.PeriodDay
dir := "/Users/huangmengzhu/test/log"
file := "access.log"
period := log.PeriodDay
SetFields(access_field.All())
log.SetOutPut(true,dir,file,period)
demoCtx:=Fields{
"$remote_addr":"192.168.0.1",
"$http_x_forwarded_for":"192.168.0.99",
log.SetOutPut(true, dir, file, period)
demoCtx := Fields{
"$remote_addr": "192.168.0.1",
"$http_x_forwarded_for": "192.168.0.99",
//"$remote_user":"",
"$request":"\"GET /kingsword\"",
"$status":200,
"$body_bytes_sent":300,
"$bytes_sent":500,
"$request": "\"GET /kingsword\"",
"$status": 200,
"$body_bytes_sent": 300,
"$bytes_sent": 500,
//"$msec":"日志写入时间。单位为秒,精度是毫秒。",
//"$http_referer":"",
"$http_user_agent":"\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36\"",
"$request_length":100,
"$request_time":100,
"$http_user_agent": "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36\"",
"$request_length": 100,
"$request_time": 100,
//"$time_iso8601":"ISO8601标准格式下的本地时间。",
//"$time_local":"通用日志格式下的本地时间。",
"$requestId":"xxffsdffadf",
"$finally_server":"127.0.0.1:8080",
"$balance":"Static_Load",
"$strategy":"FKdCm2",
"$api":"\"1657 kingsword\"",
"$retry":"10.1.0.1:80,10.1.0.2:800",
"$proxy":"\"POST /proxy HTTPS\"",
"$proxy_status":200,
"$requestId": "xxffsdffadf",
"$finally_server": "127.0.0.1:8080",
"$balance": "Static_Load",
"$strategy": "FKdCm2",
"$api": "\"1657 kingsword\"",
"$retry": "10.1.0.1:80,10.1.0.2:800",
"$proxy": "\"POST /proxy HTTPS\"",
"$proxy_status": 200,
}
Log(demoCtx)
writer.Close()
t.Log("xx")
}
\ No newline at end of file
}
......@@ -9,4 +9,4 @@ type CMD struct {
type ClusterConfig struct {
CMD
Cluster *entity.ClusterInfo `json:"cluster"`
}
\ No newline at end of file
}
......@@ -11,6 +11,7 @@ import (
node_common "github.com/eolinker/goku/goku-node/node-common"
)
// 新增报警信息
func AddAlertMessage(apiID int, apiName, requestURL, targetServer, targetURL, requestMethod, proxyMethod, headerList, queryParamList, formParamList, responseHeaderList string, alertPeriodType, alertCount, responseStatus int, isAlert string, strategyID string, strategyName, requestID string) (bool, string, error) {
client := &http.Client{
......
......@@ -40,7 +40,7 @@ func (ctx *Context) SetRetryTargetServers(retryTargetServers string) {
ctx.retryTargetServers = retryTargetServers
}
func (ctx *Context) Finish() (n int,statusCode int){
func (ctx *Context) Finish() (n int, statusCode int) {
header := ctx.PriorityHeader.header
......@@ -91,10 +91,10 @@ func (ctx *Context) Finish() (n int,statusCode int){
ctx.w.WriteHeader(statusCode)
if !bodyAllowed {
return 0,statusCode
return 0, statusCode
}
n, _ = ctx.w.Write(ctx.Body)
return n,statusCode
return n, statusCode
}
func (ctx *Context) RequestId() string {
return ctx.requestId
......
package common
import (
goku_plugin "github.com/eolinker/goku-plugin"
"net/http"
"net/url"
)
type Header struct {
......@@ -20,7 +18,7 @@ func (h *Header) Headers() http.Header {
}
return n
}
func (h *Header) String()string {
func (h *Header) String() string {
return url.Values(h.header).Encode()
//buf:=bytes.NewBuffer(nil)
......
package goku_node
import (
_ "github.com/eolinker/goku/goku-node/manager/service-manager"
_ "github.com/eolinker/goku/goku-service/driver/consul"
_ "github.com/eolinker/goku/goku-service/driver/eureka"
......
......@@ -5,24 +5,24 @@ import (
)
type Entry struct {
Pattern string
Pattern string
HandlerFunc func(w http.ResponseWriter, r *http.Request)
}
func init() {
}
func Handler() []Entry {
return []Entry{
{
Pattern:"/goku-update", HandlerFunc:gokuUpdate,
Pattern: "/goku-update", HandlerFunc: gokuUpdate,
},
{
Pattern:"/goku-check_update",HandlerFunc: gokuCheckUpdate},
Pattern: "/goku-check_update", HandlerFunc: gokuCheckUpdate},
{
Pattern:"/goku-check_plugin" , HandlerFunc:gokuCheckPlugin},
Pattern: "/goku-check_plugin", HandlerFunc: gokuCheckPlugin},
{
Pattern:"/goku-monitor",HandlerFunc:gokuMonitor},
Pattern: "/goku-monitor", HandlerFunc: gokuMonitor},
}
}
......@@ -8,7 +8,7 @@ import (
"net/http"
)
func gokuCheckPlugin( w http.ResponseWriter,req *http.Request) {
func gokuCheckPlugin(w http.ResponseWriter, req *http.Request) {
req.ParseForm()
pluginName := req.PostFormValue("pluginName")
......
......@@ -2,12 +2,11 @@ package handler
import (
"encoding/json"
. "github.com/eolinker/goku/common/version"
"github.com/eolinker/goku/goku-node/manager/updater"
"net/http"
)
func gokuUpdate(w http.ResponseWriter, r *http.Request) {
func gokuUpdate(w http.ResponseWriter, r *http.Request) {
updater.Update()
}
func gokuCheckUpdate(w http.ResponseWriter, r *http.Request) {
......@@ -21,4 +20,4 @@ func gokuCheckUpdate(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
_, _ = w.Write(resultStr)
return
}
\ No newline at end of file
}
......@@ -47,7 +47,7 @@ func genBalance(e *entity.Balance) *balance.Balance {
b.AppConfig = e.Static
if err := json.Unmarshal([]byte(e.StaticCluster), &m); err == nil {
if v, has := m[node_common.ClusterName()]; has {
if len(strings.Trim(v, " "))>0{
if len(strings.Trim(v, " ")) > 0 {
b.AppConfig = v
}
}
......
package config_manager
type AccessField struct {
Name string `json:"name"`
Select bool `json:"select"`
}
\ No newline at end of file
Name string `json:"name"`
Select bool `json:"select"`
}
......@@ -11,14 +11,14 @@ import (
)
const (
AccessLog = "access"
NodeLog = "node"
AccessLog = "access"
NodeLog = "node"
)
func init() {
updater.Add(reloadLogConfig,1, "goku_config_log")
updater.Add(reloadLogConfig, 1, "goku_config_log")
}
func defaultAccessLogConfig()*entity.LogConfig {
func defaultAccessLogConfig() *entity.LogConfig {
return &entity.LogConfig{
Name: AccessLog,
Enable: 1,
......@@ -29,7 +29,7 @@ func defaultAccessLogConfig()*entity.LogConfig {
Fields: "",
}
}
func defaultNodeAppLogConfig()*entity.LogConfig {
func defaultNodeAppLogConfig() *entity.LogConfig {
return &entity.LogConfig{
Name: AccessLog,
Enable: 1,
......@@ -37,74 +37,72 @@ func defaultNodeAppLogConfig()*entity.LogConfig {
File: "node.log",
Level: "error",
Period: "hour",
}
}
func InitLog() {
func InitLog() {
reloadLogConfig()
}
func reloadLogConfig() {
func reloadLogConfig() {
reloadAppLog()
reloadAccessLog()
}
func reloadAppLog() {
c,e:= dao.Get(NodeLog)
if e!=nil{
log.Warn("manager/config load goku_config_log fro node error:",e)
func reloadAppLog() {
c, e := dao.Get(NodeLog)
if e != nil {
log.Warn("manager/config load goku_config_log fro node error:", e)
c = defaultNodeAppLogConfig()
}
period,err:=log.ParsePeriod(c.Period)
if err!=nil{
period, err := log.ParsePeriod(c.Period)
if err != nil {
period = log.PeriodDay
log.Warn("manager/config unmarshal access log period failed for nod , use the default config:%s",e)
log.Warn("manager/config unmarshal access log period failed for nod , use the default config:%s", e)
}
level,err:= log.ParseLevel(c.Level)
if err!=nil{
level, err := log.ParseLevel(c.Level)
if err != nil {
level = log.WarnLevel
log.Warn("manager/config unmarshal access log level failed for nod , use the default config:%s",e)
log.Warn("manager/config unmarshal access log level failed for nod , use the default config:%s", e)
}
enable:= c.Enable == 1
log.SetOutPut(enable,c.Dir,c.File,period,c.Expire)
enable := c.Enable == 1
log.SetOutPut(enable, c.Dir, c.File, period, c.Expire)
log.SetLevel(level)
}
func reloadAccessLog() {
func reloadAccessLog() {
c, e := dao.Get(AccessLog)
if e!=nil{
log.Warn("manager/config load goku_config_log for access log error:",e)
c= defaultAccessLogConfig()
if e != nil {
log.Warn("manager/config load goku_config_log for access log error:", e)
c = defaultAccessLogConfig()
}
period,err:=log.ParsePeriod(c.Period)
if err!=nil{
period, err := log.ParsePeriod(c.Period)
if err != nil {
period = log.PeriodDay
log.Warn("manager/config unmarshal period failed for , use the default config:%s",e)
log.Warn("manager/config unmarshal period failed for , use the default config:%s", e)
}
enable:= c.Enable == 1
enable := c.Enable == 1
fieldsConfig:= make([]AccessField ,0,access_field.Size())
err =json.Unmarshal([]byte(c.Fields),&fieldsConfig)
fieldsConfig := make([]AccessField, 0, access_field.Size())
err = json.Unmarshal([]byte(c.Fields), &fieldsConfig)
if err!= nil || len(fieldsConfig) == 0{
log.Warn("manager/config unmarshal access log fields error:",err)
if err != nil || len(fieldsConfig) == 0 {
log.Warn("manager/config unmarshal access log fields error:", err)
access_log.SetFields(access_field.Default())
}else{
fields:=make([]access_field.AccessFieldKey,0,access_field.Size())
for _,f:=range fieldsConfig{
if f.Select{
if access_field.Has(f.Name){
fields = append(fields,access_field.Parse(f.Name))
} else {
fields := make([]access_field.AccessFieldKey, 0, access_field.Size())
for _, f := range fieldsConfig {
if f.Select {
if access_field.Has(f.Name) {
fields = append(fields, access_field.Parse(f.Name))
}
}
}
access_log.SetFields(fields)
}
access_log.SetOutput( enable,c.Dir,c.File,period,c.Expire)
access_log.SetOutput(enable, c.Dir, c.File, period, c.Expire)
}
\ No newline at end of file
}
......@@ -68,7 +68,7 @@ func LoadPlugin(pis map[string]*entity.PluginInfo) (allFactory map[string]*entit
for key, value := range pis {
handle, err, _ := globalPluginManager.loadPlugin(key)
if err != nil {
goku_plugin.Warn("LoadPlugin:",err.Error())
goku_plugin.Warn("LoadPlugin:", err.Error())
continue
}
factory := &entity.PluginFactoryHandler{
......@@ -144,14 +144,14 @@ func (m *_GlodPluginManager) loadPlugin(name string) (goku_plugin.PluginFactory,
vp, ok := v.(func() goku_plugin.PluginFactory)
if !ok {
e := fmt.Errorf("The builder func can not implemented interface named goku_plugin.PluginFactory:%s ",name)
e := fmt.Errorf("The builder func can not implemented interface named goku_plugin.PluginFactory:%s ", name)
m.errors[name] = e
m.errorCodes[name] = LoadInterFaceError
return nil, e, LoadInterFaceError
}
factory := vp()
if factory == nil || reflect.ValueOf(factory).IsNil() {
e := fmt.Errorf("The builder result is nil:%s ",name)
e := fmt.Errorf("The builder result is nil:%s ", name)
m.errors[name] = e
m.errorCodes[name] = LoadInterFaceError
return nil, e, LoadInterFaceError
......
......@@ -51,7 +51,7 @@ func toDiscoverConfig(e *entity.Service) *discovery.Config {
clusterName := node_common.ClusterName()
if v, has := clusterConfigObj[clusterName]; has {
if len(strings.Trim(v, " "))>0{
if len(strings.Trim(v, " ")) > 0 {
c.Config = v
}
}
......
......@@ -36,7 +36,7 @@ func CheckApiFromStrategy(strategyId, requestPath string, requestMethod string)
apiextend.Target = apiInfo.BalanceName
}
apiextend.Target = utils.TrimSuffixAll(apiextend.Target,"/")
apiextend.Target = utils.TrimSuffixAll(apiextend.Target, "/")
//apiextend.TargetServer = balance_manager.ParseTargetServer(apiextend.Target)
return apiextend, splitURL, param, true
......
......@@ -19,11 +19,11 @@ func ClusterName() string {
}
func SetAdmin(host string) {
h:= strings.TrimPrefix(host,"http://")
h = strings.TrimSuffix(h,"/")
h := strings.TrimPrefix(host, "http://")
h = strings.TrimSuffix(h, "/")
adminUrl = fmt.Sprintf("http://%s", h)
}
func GetAdminUrl(path string) string {
p:=strings.TrimPrefix(path,"/")
p := strings.TrimPrefix(path, "/")
return fmt.Sprintf("%s/%s", adminUrl, p)
}
......@@ -8,6 +8,7 @@ import (
"reflect"
"time"
)
var (
authNames = map[string]string{
"Oauth2": "goku-oauth2_auth",
......@@ -23,40 +24,37 @@ var (
}
)
func getPluginNameByType(authType string) (string, bool) {
name, has := authNames[authType]
return name, has
}
// 执行插件的Access函数
func AccessFunc(ctx *common.Context, handleFunc []*entity.PluginHandlerExce) (bool, int) {
requestId := ctx.RequestId()
authType := ctx.Request().GetHeader("Authorization-Type")
authName, _ := getPluginNameByType(authType)
defer func(ctx *common.Context) {
log.Debug(requestId," access plugin default: begin")
log.Debug(requestId, " access plugin default: begin")
for _, handler := range plugin_manager.GetDefaultPlugins() {
if handler.PluginObj.Access==nil || reflect.ValueOf(handler.PluginObj.Access).IsNil() {
if handler.PluginObj.Access == nil || reflect.ValueOf(handler.PluginObj.Access).IsNil() {
continue
}
ctx.SetPlugin(handler.Name)
log.Info(requestId," access plugin:",handler.Name)
now:=time.Now()
_, err:=handler.PluginObj.Access.Access(ctx)
log.Debug(requestId," access plugin:",handler.Name," Duration",time.Since(now))
if err!=nil{
log.Warn(requestId," access plugin:",handler.Name," error:",err.Error())
log.Info(requestId, " access plugin:", handler.Name)
now := time.Now()
_, err := handler.PluginObj.Access.Access(ctx)
log.Debug(requestId, " access plugin:", handler.Name, " Duration", time.Since(now))
if err != nil {
log.Warn(requestId, " access plugin:", handler.Name, " error:", err.Error())
}
}
log.Debug(requestId," access plugin default: end")
log.Debug(requestId, " access plugin default: end")
}(ctx)
isAuthSucess := false
isNeedAuth := false
log.Debug(requestId," access plugin auth check: begin")
log.Debug(requestId, " access plugin auth check: begin")
for _, handler := range handleFunc {
if _, has := authPluginNames[handler.Name]; has {
isNeedAuth = true
......@@ -67,33 +65,33 @@ func AccessFunc(ctx *common.Context, handleFunc []*entity.PluginHandlerExce) (bo
continue
}
ctx.SetPlugin(handler.Name)
log.Debug(requestId," access plugin:",handler.Name," begin")
now:=time.Now()
log.Debug(requestId, " access plugin:", handler.Name, " begin")
now := time.Now()
flag, err := handler.PluginObj.Access.Access(ctx)
log.Debug(requestId," access plugin:",handler.Name," Duration",time.Since(now))
log.Debug(requestId, " access plugin:", handler.Name, " Duration", time.Since(now))
if flag == false {
// 校验失败
if err != nil {
log.Warn(requestId," access auth:[",handler.Name,"] error:",err.Error())
log.Warn(requestId, " access auth:[", handler.Name, "] error:", err.Error())
}
log.Info(requestId," auth [",authName,"] refuse")
log.Info(requestId, " auth [", authName, "] refuse")
return false, 0
}
log.Debug(requestId," auth [",authName,"] pass")
log.Debug(requestId, " auth [", authName, "] pass")
isAuthSucess = true
}
}
log.Debug(requestId," access plugin auth check: end")
log.Debug(requestId, " access plugin auth check: end")
// 需要校验但是没有执行校验
if isNeedAuth && !isAuthSucess {
log.Warn(requestId," Illegal authorization type:",authType)
log.Warn(requestId, " Illegal authorization type:", authType)
ctx.SetStatus(403, "403")
ctx.SetBody([]byte("[ERROR]Illegal authorization type!"))
return false, 0
}
lastIndex := 0
log.Debug(requestId," access plugin : begin")
log.Debug(requestId, " access plugin : begin")
// 执行校验以外的插件
for index, handler := range handleFunc {
lastIndex = index
......@@ -106,19 +104,19 @@ func AccessFunc(ctx *common.Context, handleFunc []*entity.PluginHandlerExce) (bo
}
ctx.SetPlugin(handler.Name)
log.Debug(requestId," access plugin:",handler.Name)
now:=time.Now()
log.Debug(requestId, " access plugin:", handler.Name)
now := time.Now()
flag, err := handler.PluginObj.Access.Access(ctx)
log.Debug(requestId," access plugin:",handler.Name," Duration:",time.Since(now))
log.Debug(requestId, " access plugin:", handler.Name, " Duration:", time.Since(now))
if err != nil {
log.Warn(requestId," access plugin:",handler.Name," error:",err.Error())
log.Warn(requestId, " access plugin:", handler.Name, " error:", err.Error())
}
if flag == false && handler.IsStop {
log.Info(requestId," access plugin:",handler.Name," stop")
log.Info(requestId, " access plugin:", handler.Name, " stop")
return false, index
}
log.Debug(requestId," access plugin:",handler.Name," continue")
log.Debug(requestId, " access plugin:", handler.Name, " continue")
}
log.Debug(requestId," access plugin : end")
log.Debug(requestId, " access plugin : end")
return true, lastIndex
}
\ No newline at end of file
}
......@@ -12,24 +12,24 @@ import (
func BeforeMatch(ctx *common.Context) bool {
requestId := ctx.RequestId()
defer func(ctx *common.Context) {
log.Debug(requestId," before plugin default: begin")
log.Debug(requestId, " before plugin default: begin")
for _, handler := range plugin_manager.GetDefaultPlugins() {
if handler.PluginObj.BeforeMatch == nil || reflect.ValueOf(handler.PluginObj.BeforeMatch).IsNil() {
continue
}
ctx.SetPlugin(handler.Name)
log.Debug(requestId," before plugin :",handler.Name," start")
now:=time.Now()
_,err:=handler.PluginObj.BeforeMatch.BeforeMatch(ctx)
log.Debug(requestId," before plugin :",handler.Name," Duration:",time.Since(now))
log.Debug(requestId," before plugin :",handler.Name," end")
log.Debug(requestId, " before plugin :", handler.Name, " start")
now := time.Now()
_, err := handler.PluginObj.BeforeMatch.BeforeMatch(ctx)
log.Debug(requestId, " before plugin :", handler.Name, " Duration:", time.Since(now))
log.Debug(requestId, " before plugin :", handler.Name, " end")
if err != nil {
log.Warn(requestId," before plugin:",handler.Name," error:",err.Error())
log.Warn(requestId, " before plugin:", handler.Name, " error:", err.Error())
}
}
log.Debug(requestId," before plugin default: end")
log.Debug(requestId, " before plugin default: end")
}(ctx)
log.Debug(requestId," before plugin : begin")
log.Debug(requestId, " before plugin : begin")
for _, handler := range plugin_manager.GetBeforPlugins() {
if handler.PluginObj.BeforeMatch == nil || reflect.ValueOf(handler.PluginObj.BeforeMatch).IsNil() {
......@@ -37,14 +37,14 @@ func BeforeMatch(ctx *common.Context) bool {
}
ctx.SetPlugin(handler.Name)
log.Debug(requestId," before plugin :",handler.Name," start")
now:=time.Now()
log.Debug(requestId, " before plugin :", handler.Name, " start")
now := time.Now()
flag, err := handler.PluginObj.BeforeMatch.BeforeMatch(ctx)
log.Debug(requestId," before plugin :",handler.Name," Duration:",time.Since(now))
log.Debug(requestId," before plugin :",handler.Name," end")
log.Debug(requestId, " before plugin :", handler.Name, " Duration:", time.Since(now))
log.Debug(requestId, " before plugin :", handler.Name, " end")
if err != nil {
log.Warn(requestId," before plugin:",handler.Name," error:",err.Error())
log.Warn(requestId, " before plugin:", handler.Name, " error:", err.Error())
}
if flag == false {
if handler.IsStop == true {
......@@ -52,6 +52,6 @@ func BeforeMatch(ctx *common.Context) bool {
}
}
}
log.Debug(requestId," before plugin : end")
log.Debug(requestId, " before plugin : end")
return true
}
\ No newline at end of file
}
......@@ -14,26 +14,26 @@ import (
func ProxyFunc(ctx *common.Context, handleFunc []*entity.PluginHandlerExce) (bool, int) {
requestId := ctx.RequestId()
defer func(ctx *common.Context) {
log.Debug(requestId," Proxy plugin default: begin")
log.Debug(requestId, " Proxy plugin default: begin")
for _, handler := range plugin_manager.GetDefaultPlugins() {
if handler.PluginObj.Proxy == nil || reflect.ValueOf(handler.PluginObj.Proxy).IsNil() {
continue
}
ctx.SetPlugin(handler.Name)
log.Debug(requestId," Proxy plugin :",handler.Name," start")
now:=time.Now()
_, err:= handler.PluginObj.Proxy.Proxy(ctx)
log.Debug(requestId," Proxy plugin :",handler.Name," Duration:",time.Since(now))
log.Debug(requestId," Proxy plugin :",handler.Name," end")
log.Debug(requestId, " Proxy plugin :", handler.Name, " start")
now := time.Now()
_, err := handler.PluginObj.Proxy.Proxy(ctx)
log.Debug(requestId, " Proxy plugin :", handler.Name, " Duration:", time.Since(now))
log.Debug(requestId, " Proxy plugin :", handler.Name, " end")
if err != nil {
log.Warn(requestId," Proxy plugin:",handler.Name," error:",err.Error())
log.Warn(requestId, " Proxy plugin:", handler.Name, " error:", err.Error())
}
}
log.Debug(requestId," Proxy plugin default: begin")
log.Debug(requestId, " Proxy plugin default: begin")
}(ctx)
lastIndex := 0
log.Debug(requestId," Proxy plugin : begin")
log.Debug(requestId, " Proxy plugin : begin")
for index, handler := range handleFunc {
lastIndex = index
if handler.PluginObj.Proxy == nil || reflect.ValueOf(handler.PluginObj.Proxy).IsNil() {
......@@ -41,21 +41,20 @@ func ProxyFunc(ctx *common.Context, handleFunc []*entity.PluginHandlerExce) (boo
}
ctx.SetPlugin(handler.Name)
log.Debug(requestId," Proxy plugin :",handler.Name," start")
now:=time.Now()
log.Debug(requestId, " Proxy plugin :", handler.Name, " start")
now := time.Now()
flag, err := handler.PluginObj.Proxy.Proxy(ctx)
log.Debug(requestId," Proxy plugin :",handler.Name," Duration:",time.Since(now))
log.Debug(requestId," Proxy plugin :",handler.Name," end")
log.Debug(requestId, " Proxy plugin :", handler.Name, " Duration:", time.Since(now))
log.Debug(requestId, " Proxy plugin :", handler.Name, " end")
if err != nil {
log.Warn(requestId," Proxy plugin :",handler.Name," error: ",err.Error())
log.Warn(requestId, " Proxy plugin :", handler.Name, " error: ", err.Error())
}
if flag == false && handler.IsStop == true {
return false, lastIndex
}
}
log.Debug(requestId," Proxy plugin : end")
log.Debug(requestId, " Proxy plugin : end")
return true, lastIndex
}
......@@ -14,23 +14,23 @@ import (
)
// 创建转发请求
func CreateRequest(ctx *common.Context, apiInfo *entity.ApiExtend, timeout, retry int) ( error, *http.Response) {
func CreateRequest(ctx *common.Context, apiInfo *entity.ApiExtend, timeout, retry int) (error, *http.Response) {
app,has:=balance_manager.Get(apiInfo.Target)
if !has{
err:= fmt.Errorf("get balance error:%s",apiInfo.Target)
return err,nil
app, has := balance_manager.Get(apiInfo.Target)
if !has {
err := fmt.Errorf("get balance error:%s", apiInfo.Target)
return err, nil
}
rawbody, _ := ctx.ProxyRequest.RawBody()
response,finalTargetServer,retryTargetServers,err:=app.Send(apiInfo.Protocol,ctx.ProxyRequest.Method,ctx.ProxyRequest.TargetURL(),ctx.ProxyRequest.Querys(),ctx.ProxyRequest.Headers() ,rawbody,time.Duration(timeout)*time.Millisecond,retry)
response, finalTargetServer, retryTargetServers, err := app.Send(apiInfo.Protocol, ctx.ProxyRequest.Method, ctx.ProxyRequest.TargetURL(), ctx.ProxyRequest.Querys(), ctx.ProxyRequest.Headers(), rawbody, time.Duration(timeout)*time.Millisecond, retry)
ctx.SetRetryTargetServers(strings.Join(retryTargetServers, ","))
ctx.SetFinalTargetServer(finalTargetServer)
if err!= nil{
return err,nil
if err != nil {
return err, nil
}
return nil,response
return nil, response
}
......@@ -7,7 +7,6 @@ import (
access_log "github.com/eolinker/goku/goku-node/access-log"
"github.com/eolinker/goku/goku-node/handler"
"github.com/eolinker/goku/goku-node/plugin-flow"
. "github.com/eolinker/goku/server/access-field"
"net/http"
"strconv"
"strings"
......@@ -30,7 +29,7 @@ type Router struct {
mu map[string]http.HandlerFunc
}
func (mux*Router) ServeHTTP(w http.ResponseWriter, r*http.Request) {
func (mux *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
......@@ -38,72 +37,69 @@ func (mux*Router) ServeHTTP(w http.ResponseWriter, r*http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}
path:=r.URL.Path
h, has:= mux.mu[path]
if has{
path := r.URL.Path
h, has := mux.mu[path]
if has {
h.ServeHTTP(w, r)
return
}
ServeHTTP(w,r)
ServeHTTP(w, r)
}
func NewRouter() http.Handler {
r:=&Router{
mu :make(map[string]http.HandlerFunc),
r := &Router{
mu: make(map[string]http.HandlerFunc),
}
hs := handler.Handler()
hs:= handler.Handler()
for _,h:=range hs{
for _, h := range hs {
r.mu[h.Pattern] = h.HandlerFunc
}
return r
return r
}
var systemRequestPath = []string{"/oauth2/token", "/oauth2/authorize", "/oauth2/verify"}
func ServeHTTP(w http.ResponseWriter, req *http.Request) {
func ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer func() {
if err := recover(); err != nil {
log.Warn(err)
}
}()
timeStart := time.Now()
timeStart:=time.Now()
logFields:=make(log.Fields)
logFields := make(log.Fields)
// 记录访问次数
requestID := GetRandomString(16)
ctx := common.NewContext(req, requestID, w)
proxyStatusCode := 0
log.Debug(requestID," url: ",req.URL.String())
log.Debug(requestID," header: ",ctx.RequestOrg.Header.String())
rawBody ,err:=ctx.RequestOrg.RawBody()
if err==nil{
log.Debug(requestID," body: ",string(rawBody))
}
log.Debug(requestID, " url: ", req.URL.String())
log.Debug(requestID, " header: ", ctx.RequestOrg.Header.String())
rawBody, err := ctx.RequestOrg.RawBody()
if err == nil {
log.Debug(requestID, " body: ", string(rawBody))
}
defer func() {
n,status:=ctx.Finish()
n, status := ctx.Finish()
if ctx.ProxyResponseHandler != nil {
proxyStatusCode = ctx.ProxyResponseHandler.StatusCode()
}
logFields[RequestId]= requestID
logFields[RequestId] = requestID
logFields[StatusCode] = status
logFields[HttpUserAgent] = fmt.Sprint("\"",req.UserAgent(),"\"")
logFields[HttpUserAgent] = fmt.Sprint("\"", req.UserAgent(), "\"")
logFields[HttpReferer] = req.Referer()
logFields[RequestTime] = time.Since(timeStart)
logFields[Request] = fmt.Sprint("\"",req.Method," ",req.URL.Path," ",req.Proto,"\"")
logFields[Request] = fmt.Sprint("\"", req.Method, " ", req.URL.Path, " ", req.Proto, "\"")
logFields[BodyBytesSent] = n
logFields[Host] = req.Host
access_log.Log(logFields)
......@@ -120,12 +116,12 @@ func ServeHTTP(w http.ResponseWriter, req *http.Request) {
}()
remoteAddr := Intercept(req.RemoteAddr, ":")
logFields[RemoteAddr]= remoteAddr
logFields[RemoteAddr] = remoteAddr
if realIp := ctx.GetHeader("X-Real-Ip") ;realIp == ""{
if realIp := ctx.GetHeader("X-Real-Ip"); realIp == "" {
ctx.ProxyRequest.SetHeader("X-Real-Ip", remoteAddr)
logFields[HttpXForwardedFor] = remoteAddr
}else{
} else {
logFields[HttpXForwardedFor] = realIp
}
......@@ -133,21 +129,20 @@ func ServeHTTP(w http.ResponseWriter, req *http.Request) {
var isBefor bool
start := time.Now()
isBefor = plugin_flow.BeforeMatch(ctx)
log.Info(requestID," BeforeMatch plugin duration:",time.Since(start))
log.Info(requestID, " BeforeMatch plugin duration:", time.Since(start))
if !isBefor {
log.Info(requestID," stop by BeforeMatch plugin")
log.Info(requestID, " stop by BeforeMatch plugin")
return
}
var timeout, retryCount int
strategyID, ok := retrieveStrategyID(ctx)
if !ok {
return
}
logFields[Strategy] = fmt.Sprintf("\"%s %s\"", strategyID,ctx.StrategyName())
logFields[Strategy] = fmt.Sprintf("\"%s %s\"", strategyID, ctx.StrategyName())
requestPath := req.URL.Path
requestMenthod := ctx.Request().Method()
......@@ -160,7 +155,6 @@ func ServeHTTP(w http.ResponseWriter, req *http.Request) {
//ctx.IsMatch = true
timeout = apiInfo.Timeout
ctx.ProxyRequest.SetTargetServer(fmt.Sprintf("%s://%s", apiInfo.Protocol, apiInfo.Target))
targetUrl := apiInfo.TargetURL + requestPath
......@@ -185,7 +179,7 @@ func ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
start = time.Now()
isAccess, _ := plugin_flow.AccessFunc(ctx, handleFunc)
log.Info(requestID," Access plugin duration:",time.Since(start))
log.Info(requestID, " Access plugin duration:", time.Since(start))
if !isAccess {
// todo
......@@ -193,20 +187,20 @@ func ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
if apiInfo == nil {
log.Info(requestID," URL dose not exist!")
log.Info(requestID, " URL dose not exist!")
ctx.SetStatus(404, "404")
ctx.SetBody([]byte("[ERROR]URL dose not exist!"))
return
}
logFields[Api] = fmt.Sprintf("\"%d %s\"",apiInfo.ApiID,apiInfo.ApiName)
logFields[Proxy] = fmt.Sprintf("\"%s %s %s\"",ctx.ProxyRequest.Method,ctx.ProxyRequest.TargetURL(),apiInfo.Protocol)
logFields[Api] = fmt.Sprintf("\"%d %s\"", apiInfo.ApiID, apiInfo.ApiName)
logFields[Proxy] = fmt.Sprintf("\"%s %s %s\"", ctx.ProxyRequest.Method, ctx.ProxyRequest.TargetURL(), apiInfo.Protocol)
logFields[Balance] = apiInfo.Target
start = time.Now()
err, response := CreateRequest(ctx, apiInfo, timeout, retryCount)
log.Info(requestID," Proxy request duration:",time.Since(start))
log.Info(requestID, " Proxy request duration:", time.Since(start))
if err != nil {
log.Warn(err.Error())
log.Warn(err.Error())
}
logFields[FinallyServer] = ctx.FinalTargetServer()
logFields[Retry] = ctx.RetryTargetServers()
......@@ -243,7 +237,7 @@ func ServeHTTP(w http.ResponseWriter, req *http.Request) {
start = time.Now()
isProxy, _ := plugin_flow.ProxyFunc(ctx, handleFunc)
log.Info(requestID," Proxy plugin Duration:",time.Since(start))
log.Info(requestID, " Proxy plugin Duration:", time.Since(start))
if !isProxy {
return
}
......
......@@ -14,8 +14,6 @@ import (
"strings"
)
func InitPluginUtils() {
goku_plugin.SetRedisManager(redis_plugin_proxy.Create())
goku_plugin.InitLog(log.GetLogger())
......@@ -26,10 +24,10 @@ func InitDiscovery() {
all := discovery.AllDrivers()
log.Infof("install service discovery driver:[%s]\n", strings.Join(all, ","))
}
func InitLog() {
func InitLog() {
config_manager.InitLog()
}
func InitServer(){
func InitServer() {
log.Debug("init InitServer start")
InitPluginUtils()
......
......@@ -51,4 +51,3 @@ func PanicTrace(kb int) []byte {
stack = bytes.TrimRight(stack, "\n")
return stack
}
......@@ -225,23 +225,23 @@ func UpdateProxyFailureCount(apiInfo *entity.ApiExtend,
// 记录告警日志
func AlertLog(requestURL, targetServer, targetURL, requestMethod, proxyMethod, headerList, queryParamList, formParamList, responseHeaderList string, responseStatus int, strategyID string, strategyName, requestID string) {
log.WithFields(log.Fields{
"request_id":requestID,
"strategy_name":strategyName,
"strategy_id":strategyID,
"request_method":requestMethod,
"request_url":requestURL,
"target_method":proxyMethod,
"target_server":targetServer,
"target_url":targetURL,
"request_query":queryParamList,
"request_header":headerList,
"request_form_param":formParamList,
"response_statusCode":strconv.Itoa(responseStatus),
"response_header":responseHeaderList,
log.WithFields(log.Fields{
"request_id": requestID,
"strategy_name": strategyName,
"strategy_id": strategyID,
"request_method": requestMethod,
"request_url": requestURL,
"target_method": proxyMethod,
"target_server": targetServer,
"target_url": targetURL,
"request_query": queryParamList,
"request_header": headerList,
"request_form_param": formParamList,
"response_statusCode": strconv.Itoa(responseStatus),
"response_header": responseHeaderList,
}).Warning("alert")
//_= logutils.Log("log/alertLog", "alert", log.PeriodDay, logInfo)
//_= logutils.Log("log/alertLog", "alert", log.PeriodDay, logInfo)
return
}
......@@ -7,5 +7,5 @@ import (
)
type IHttpApplication interface {
Send(Proto string, method string,path string,querys url.Values,header http.Header,body []byte,timeout time.Duration,retry int)(*http.Response,string,[]string,error)
Send(Proto string, method string, path string, querys url.Values, header http.Header, body []byte, timeout time.Duration, retry int) (*http.Response, string, []string, error)
}
......@@ -11,36 +11,36 @@ import (
type Org struct {
server string
}
// 忽略重试
func (app *Org) Send(proto string,method string, path string,querys url.Values, header http.Header, body []byte,timeout time.Duration, retry int) (*http.Response,string,[]string,error) {
// 忽略重试
func (app *Org) Send(proto string, method string, path string, querys url.Values, header http.Header, body []byte, timeout time.Duration, retry int) (*http.Response, string, []string, error) {
var response *http.Response = nil
var err error = nil
FinalTargetServer := ""
RetryTargetServers :=make([]string,0,retry+1)
RetryTargetServers := make([]string, 0, retry+1)
path = utils.TrimPrefixAll(path,"/")
path = utils.TrimPrefixAll(path, "/")
for doTrice := retry +1;doTrice>0;doTrice--{
for doTrice := retry + 1; doTrice > 0; doTrice-- {
u:= fmt.Sprintf("%s://%s/%s",proto,app.server,path)
u := fmt.Sprintf("%s://%s/%s", proto, app.server, path)
FinalTargetServer = app.server
RetryTargetServers = append(RetryTargetServers,FinalTargetServer)
response,err =request(method,u,querys,header,body,timeout)
if err != nil{
continue
}else{
return response,FinalTargetServer,RetryTargetServers,err
RetryTargetServers = append(RetryTargetServers, FinalTargetServer)
response, err = request(method, u, querys, header, body, timeout)
if err != nil {
continue
} else {
return response, FinalTargetServer, RetryTargetServers, err
}
}
return response,FinalTargetServer,RetryTargetServers,err
return response, FinalTargetServer, RetryTargetServers, err
}
func NewOrg(server string)IHttpApplication {
func NewOrg(server string) IHttpApplication {
return &Org{
server:server,
server: server,
}
}
\ No newline at end of file
}
......@@ -7,36 +7,30 @@ import (
"time"
)
func request(method string, backendDomain string,query url.Values, header http.Header, body []byte,timeout time.Duration) (*http.Response,error) {
func request(method string, backendDomain string, query url.Values, header http.Header, body []byte, timeout time.Duration) (*http.Response, error) {
if backendDomain == "" {
return nil,fmt.Errorf("invaild url")
return nil, fmt.Errorf("invaild url")
}
u, err := url.ParseRequestURI(backendDomain)
if err != nil {
return nil, err
}
req, err := NewRequest(method, u)
if err != nil {
return nil,err
return nil, err
}
req.headers = header
req.queryParams = query
req.headers = header
req.queryParams = query
req.SetRawBody(body)
if timeout != 0 {
req.SetTimeout(timeout)
}
return req.Send()
}
\ No newline at end of file
return req.Send()
}
......@@ -11,50 +11,48 @@ import (
)
type Application struct {
service *common.Service
service *common.Service
healthCheckHandler health.CheckHandler
}
func NewApplication(service *common.Service,healthCheckHandler health.CheckHandler) *Application {
func NewApplication(service *common.Service, healthCheckHandler health.CheckHandler) *Application {
return &Application{
service: service,
healthCheckHandler: healthCheckHandler,
}
}
func (app *Application) Send(proto string,method string, path string,querys url.Values, header http.Header, body []byte,timeout time.Duration, retry int)( *http.Response,string,[]string,error) {
func (app *Application) Send(proto string, method string, path string, querys url.Values, header http.Header, body []byte, timeout time.Duration, retry int) (*http.Response, string, []string, error) {
var response *http.Response = nil
var err error = nil
FinalTargetServer := ""
RetryTargetServers :=make([]string,0,retry+1)
RetryTargetServers := make([]string, 0, retry+1)
lastIndex := -1
path = utils.TrimPrefixAll(path,"/")
for doTrice := retry +1;doTrice>0;doTrice--{
instance,index,has:=app.service.Next(lastIndex)
path = utils.TrimPrefixAll(path, "/")
for doTrice := retry + 1; doTrice > 0; doTrice-- {
instance, index, has := app.service.Next(lastIndex)
lastIndex = index
if !has{
return nil,FinalTargetServer,RetryTargetServers,fmt.Errorf("not found instance for app:%s",app.service.Name)
if !has {
return nil, FinalTargetServer, RetryTargetServers, fmt.Errorf("not found instance for app:%s", app.service.Name)
}
FinalTargetServer = fmt.Sprintf("%s:%d",instance.IP,instance.Port)
RetryTargetServers = append(RetryTargetServers,FinalTargetServer)
u:= fmt.Sprintf("%s://%s:%d/%s",proto,instance.IP,instance.Port,path)
response,err =request(method,u,querys,header,body,timeout)
FinalTargetServer = fmt.Sprintf("%s:%d", instance.IP, instance.Port)
RetryTargetServers = append(RetryTargetServers, FinalTargetServer)
u := fmt.Sprintf("%s://%s:%d/%s", proto, instance.IP, instance.Port, path)
response, err = request(method, u, querys, header, body, timeout)
if err != nil{
if app.healthCheckHandler.IsNeedCheck(){
if err != nil {
if app.healthCheckHandler.IsNeedCheck() {
app.healthCheckHandler.Check(instance)
}
}else{
return response,FinalTargetServer,RetryTargetServers,err
} else {
return response, FinalTargetServer, RetryTargetServers, err
}
}
return response,FinalTargetServer,RetryTargetServers,err
return response, FinalTargetServer, RetryTargetServers, err
}
......@@ -5,5 +5,3 @@ type Balance struct {
Discovery string
AppConfig string
}
......@@ -5,11 +5,11 @@ import (
"github.com/eolinker/goku/goku-service/discovery"
)
func ResetBalances(balances []*Balance) {
func ResetBalances(balances []*Balance) {
bmap:=make(map[string]*Balance)
bmap := make(map[string]*Balance)
for _,b:=range balances{
for _, b := range balances {
bmap[b.Name] = b
}
......@@ -18,20 +18,20 @@ func ResetBalances(balances []*Balance) {
}
func GetByName(name string)(application.IHttpApplication,bool) {
b,has:=manager.get(name)
if !has{
return application.NewOrg(name),true
func GetByName(name string) (application.IHttpApplication, bool) {
b, has := manager.get(name)
if !has {
return application.NewOrg(name), true
}
sources,has:=discovery.GetDiscoverer(b.Discovery)
if has{
sources, has := discovery.GetDiscoverer(b.Discovery)
if has {
service, handler, yes:= sources.GetApp(b.AppConfig)
if yes{
return application.NewApplication(service,handler),true
service, handler, yes := sources.GetApp(b.AppConfig)
if yes {
return application.NewApplication(service, handler), true
}
}
return nil,false
}
\ No newline at end of file
return nil, false
}
......@@ -5,30 +5,26 @@ import (
)
var manager = &Manager{
locker:sync.RWMutex{},
balances:make(map[string]*Balance),
locker: sync.RWMutex{},
balances: make(map[string]*Balance),
}
type Manager struct {
locker sync.RWMutex
locker sync.RWMutex
balances map[string]*Balance
}
func (m *Manager)set(balances map[string]*Balance) {
func (m *Manager) set(balances map[string]*Balance) {
m.locker.Lock()
m.balances = balances
m.locker.Unlock()
}
func (m *Manager)get(name string)( *Balance,bool) {
func (m *Manager) get(name string) (*Balance, bool) {
m.locker.RLock()
b,has:=m.balances[name]
b, has := m.balances[name]
m.locker.RUnlock()
return b,has
return b, has
}
......@@ -6,7 +6,7 @@ import (
)
type InstanceFactory struct {
locker sync.RWMutex
locker sync.RWMutex
instances map[string]*Instance
}
......@@ -17,28 +17,28 @@ func NewInstanceFactory() *InstanceFactory {
}
}
func (m *InstanceFactory) General(ip string,port int,weight int)*Instance {
if weight<1{
func (m *InstanceFactory) General(ip string, port int, weight int) *Instance {
if weight < 1 {
weight = 1
}
key:=fmt.Sprintf("%s:%d-%d",ip,port,weight)
key := fmt.Sprintf("%s:%d-%d", ip, port, weight)
m.locker.RLock()
i,h:=m.instances[key]
i, h := m.instances[key]
m.locker.RUnlock()
if h{
if h {
i.Weight = weight
return i
}
m.locker.Lock()
i,h =m.instances[key]
if h{
i, h = m.instances[key]
if h {
m.locker.Unlock()
i.Weight = weight
return i
}
i =&Instance{
InstanceId: fmt.Sprintf("%s:%d",ip,port),
i = &Instance{
InstanceId: fmt.Sprintf("%s:%d", ip, port),
IP: ip,
Port: port,
Weight: weight,
......
......@@ -4,11 +4,11 @@ import "sync"
type Instance struct {
InstanceId string
IP string
Port int
Weight int
Status InstanceStatus
locker sync.RWMutex
IP string
Port int
Weight int
Status InstanceStatus
locker sync.RWMutex
}
type PInstances []*Instance
......@@ -17,32 +17,31 @@ func (p PInstances) Len() int {
}
func (p PInstances) Less(i, j int) bool {
return p[i].Weight<p[j].Weight
return p[i].Weight < p[j].Weight
}
func (p PInstances) Swap(i, j int) {
p[i],p[j]=p[j],p[i]
p[i], p[j] = p[j], p[i]
}
func (i*Instance)CheckStatus(status InstanceStatus) bool {
func (i *Instance) CheckStatus(status InstanceStatus) bool {
i.locker.RLock()
b:= i.Status == status
b := i.Status == status
i.locker.RUnlock()
return b
}
//ChangeStatus set status to desc where status is org
func (i *Instance)ChangeStatus(org,dest InstanceStatus)bool {
if org == dest{
func (i *Instance) ChangeStatus(org, dest InstanceStatus) bool {
if org == dest {
return i.CheckStatus(org)
}
i.locker.RLock()
b:= i.Status == org
b := i.Status == org
i.locker.RUnlock()
if !b{
if !b {
return false
}
......@@ -54,4 +53,4 @@ func (i *Instance)ChangeStatus(org,dest InstanceStatus)bool {
i.locker.Unlock()
return b
}
\ No newline at end of file
}
......@@ -6,26 +6,23 @@ import (
"sync"
)
type Service struct {
Name string
instances []*Instance
//lastIndex int
locker sync.RWMutex
locker sync.RWMutex
}
func NewService(name string,Instances []*Instance) *Service{
func NewService(name string, Instances []*Instance) *Service {
return &Service{
Name: name,
instances: Instances,
//lastIndex: 0,
locker: sync.RWMutex{},
locker: sync.RWMutex{},
}
}
func (s*Service)SetInstances(instances []*Instance) {
func (s *Service) SetInstances(instances []*Instance) {
sort.Sort(sort.Reverse(PInstances(instances)))
s.locker.Lock()
......@@ -39,57 +36,57 @@ func (s*Service)SetInstances(instances []*Instance) {
//}
}
func (s*Service)Weighting()(*Instance,int,bool) {
func (s *Service) Weighting() (*Instance, int, bool) {
s.locker.RLock()
instances:=s.instances
instances := s.instances
s.locker.RUnlock()
if len(instances)==0{
return nil,0,false
if len(instances) == 0 {
return nil, 0, false
}
weightSum:= 0
for _,ins:=range instances{
if ins.CheckStatus(InstanceRun){
weightSum := 0
for _, ins := range instances {
if ins.CheckStatus(InstanceRun) {
weightSum += ins.Weight
}
}
if weightSum ==0{
return nil,0,false
if weightSum == 0 {
return nil, 0, false
}
weightValue:= rand.Intn(weightSum)+1
for i,ins:=range instances{
if ins.CheckStatus(InstanceRun){
weightValue := rand.Intn(weightSum) + 1
for i, ins := range instances {
if ins.CheckStatus(InstanceRun) {
weightValue = weightValue - ins.Weight
if weightValue <=0{
return ins,i,true
if weightValue <= 0 {
return ins, i, true
}
}
}
return nil,0,false
return nil, 0, false
}
func (s*Service)Next(lastIndex int)(*Instance,int ,bool) {
if lastIndex == -1{
func (s *Service) Next(lastIndex int) (*Instance, int, bool) {
if lastIndex == -1 {
return s.Weighting()
}
s.locker.RLock()
instances := s.instances
s.locker.RUnlock()
size:= len(instances)
if size == 0{
return nil,0,false
size := len(instances)
if size == 0 {
return nil, 0, false
}
for i:=0;i<size;i++{
index:=(lastIndex +i)%size
instance:=instances[index]
if instance != nil{
if instance.CheckStatus(InstanceRun){
return instance,index,true
for i := 0; i < size; i++ {
index := (lastIndex + i) % size
instance := instances[index]
if instance != nil {
if instance.CheckStatus(InstanceRun) {
return instance, index, true
}
}
}
return nil,0,false
}
\ No newline at end of file
return nil, 0, false
}
......@@ -4,10 +4,8 @@ import "strings"
type InstanceStatus int
const (
InstanceRun= iota
InstanceRun = iota
InstanceDown
InstanceChecking
)
......@@ -24,8 +22,8 @@ func (status InstanceStatus) String() string {
return "unkown"
}
func ParseStatus(status string)InstanceStatus {
s:=strings.ToLower(status)
func ParseStatus(status string) InstanceStatus {
s := strings.ToLower(status)
switch s {
case "down":
......@@ -35,4 +33,4 @@ func ParseStatus(status string)InstanceStatus {
default:
return InstanceRun
}
}
\ No newline at end of file
}
......@@ -2,15 +2,14 @@ package discovery
type HealthCheckConfig struct {
IsHealthCheck bool
Url string
Second int
TimeOutMill int
StatusCode string
Url string
Second int
TimeOutMill int
StatusCode string
}
type Config struct {
Name string
Name string
Driver string
Config string
HealthCheckConfig
}
\ No newline at end of file
}
......@@ -4,14 +4,11 @@ import (
"github.com/eolinker/goku/goku-service/common"
)
type Discovery interface {
SetConfig(config string)error
Driver()string
SetConfig(config string) error
Driver() string
SetCallback(callback func(services []*common.Service))
GetServers()([]*common.Service,error)
Close()error
Open()error
GetServers() ([]*common.Service, error)
Close() error
Open() error
}
......@@ -7,13 +7,14 @@ import (
var (
//isLock =false
drivers =make(map[string]Driver)
driverNames =make([]string,0)
drivers = make(map[string]Driver)
driverNames = make([]string, 0)
)
func AllDrivers()[]string {
func AllDrivers() []string {
return driverNames
}
// main里应该调用这个方法,以锁住driver, @为了线程安全并且避免锁操作@
//func LockDriver(){
// if isLock{
......@@ -22,7 +23,7 @@ func AllDrivers()[]string {
// isLock=true
//}
func RegisteredDiscovery(name string,driver Driver) {
func RegisteredDiscovery(name string, driver Driver) {
//if isLock{
// panic("can not Register now")
......@@ -30,11 +31,11 @@ func RegisteredDiscovery(name string,driver Driver) {
name = strings.ToLower(name)
_,has:=drivers[name]
if has{
log.Panic("driver duplicate:"+name)
_, has := drivers[name]
if has {
log.Panic("driver duplicate:" + name)
}
drivers[name]=driver
drivers[name] = driver
driverNames =append(driverNames,name)
driverNames = append(driverNames, name)
}
......@@ -3,18 +3,18 @@ package discovery
import "sync"
type Driver interface {
Open(name string,config string)(ISource,error)
Open(name string, config string) (ISource, error)
}
type CreateHandler func(config string)Discovery
type CreateHandler func(config string) Discovery
type DriverBase struct {
createFunc CreateHandler
locker sync.RWMutex
sources map[string]*SourceDiscovery
locker sync.RWMutex
sources map[string]*SourceDiscovery
}
func NewDriver( createFunc CreateHandler) *DriverBase{
func NewDriver(createFunc CreateHandler) *DriverBase {
return &DriverBase{
createFunc: createFunc,
locker: sync.RWMutex{},
......@@ -25,26 +25,24 @@ func NewDriver( createFunc CreateHandler) *DriverBase{
func (d *DriverBase) Open(name string, config string) (ISource, error) {
d.locker.RLock()
s,h:= d.sources[name]
s, h := d.sources[name]
d.locker.RUnlock()
if h{
return s,s.SetDriverConfig(config)
if h {
return s, s.SetDriverConfig(config)
}
d.locker.Lock()
s,h = d.sources[name]
if h{
s, h = d.sources[name]
if h {
d.locker.Unlock()
return s,s.SetDriverConfig(config)
return s, s.SetDriverConfig(config)
}
ds:=d.createFunc(config)
ds := d.createFunc(config)
s,_ = NewSource(name,ds)
d.sources[name]=s
s, _ = NewSource(name, ds)
d.sources[name] = s
d.locker.Unlock()
return s,nil
return s, nil
}
......@@ -5,9 +5,9 @@ import (
"sync"
)
var manager=&Manager{
locker:sync.RWMutex{},
sources:make( map[string]ISource),
var manager = &Manager{
locker: sync.RWMutex{},
sources: make(map[string]ISource),
}
type Manager struct {
......@@ -16,49 +16,49 @@ type Manager struct {
sources map[string]ISource
}
func ResetAllServiceConfig(confs []*Config) {
func ResetAllServiceConfig(confs []*Config) {
sources:=make( map[string]ISource)
sources := make(map[string]ISource)
manager.locker.RLock()
oldSources:= manager.sources
oldSources := manager.sources
manager.locker.RUnlock()
for _,conf:=range confs{
for _, conf := range confs {
name := conf.Name
s,has:= oldSources[name]
if has && !s.CheckDriver(conf.Driver){
s, has := oldSources[name]
if has && !s.CheckDriver(conf.Driver) {
// 如果驱动不一样,关闭旧的
has = false
s.Close()
s=nil
delete(oldSources,name)
has = false
s.Close()
s = nil
delete(oldSources, name)
}
if !has{
driverName:=conf.Driver
driver,has:=drivers[driverName]
if !has{
log.Error("invalid driver:",driverName)
if !has {
driverName := conf.Driver
driver, has := drivers[driverName]
if !has {
log.Error("invalid driver:", driverName)
continue
}
ns, err:=driver.Open(name,conf.Config)
if err!=nil{
ns, err := driver.Open(name, conf.Config)
if err != nil {
continue
}
s=ns
s = ns
}
sources[name]=s
sources[name] = s
s.SetHealthConfig(&conf.HealthCheckConfig)
err:=s.SetDriverConfig(conf.Config)
err := s.SetDriverConfig(conf.Config)
if err!=nil{
if err != nil {
continue
}
}
for name,s:=range oldSources{
if _,has:=sources[name];!has{
for name, s := range oldSources {
if _, has := sources[name]; !has {
s.Close()
}
}
......@@ -68,9 +68,9 @@ func ResetAllServiceConfig(confs []*Config) {
manager.locker.Unlock()
}
func GetDiscoverer(discoveryName string)(ISource,bool) {
func GetDiscoverer(discoveryName string) (ISource, bool) {
manager.locker.RLock()
s,has:=manager.sources[discoveryName]
s, has := manager.sources[discoveryName]
manager.locker.RUnlock()
return s,has
}
\ No newline at end of file
return s, has
}
......@@ -10,48 +10,45 @@ import (
)
type ISource interface {
GetApp(app string)(*common.Service ,health.CheckHandler,bool)
GetApp(app string) (*common.Service, health.CheckHandler, bool)
SetHealthConfig(conf *HealthCheckConfig)
SetDriverConfig(config string)error
SetDriverConfig(config string) error
Close()
CheckDriver(driverName string)bool
CheckDriver(driverName string) bool
}
type SourceDiscovery struct {
name string
discovery Discovery
name string
discovery Discovery
healthCheckHandler health.CheckHandler
locker sync.RWMutex
locker sync.RWMutex
services map[string]*common.Service
}
func (s *SourceDiscovery) SetDriverConfig(config string)error {
func (s *SourceDiscovery) SetDriverConfig(config string) error {
return s.discovery.SetConfig(config)
}
func (s *SourceDiscovery) Close() {
instances := s.healthCheckHandler.Close()
for _,instance:=range instances{
instance.ChangeStatus(common.InstanceChecking,common.InstanceRun)
for _, instance := range instances {
instance.ChangeStatus(common.InstanceChecking, common.InstanceRun)
}
}
func (s *SourceDiscovery) CheckDriver(driverName string) bool {
if s.discovery == nil{
if s.discovery == nil {
return false
}
if s.discovery.Driver() == driverName{
if s.discovery.Driver() == driverName {
return true
}
return false
}
func (s *SourceDiscovery) SetHealthConfig(conf *HealthCheckConfig) {
if conf==nil || !conf.IsHealthCheck {
if conf == nil || !conf.IsHealthCheck {
s.Close()
return
}
......@@ -63,21 +60,21 @@ func (s *SourceDiscovery) SetHealthConfig(conf *HealthCheckConfig) {
time.Duration(conf.TimeOutMill)*time.Millisecond)
}
func (s *SourceDiscovery) GetApp(name string) (*common.Service,health.CheckHandler ,bool){
func (s *SourceDiscovery) GetApp(name string) (*common.Service, health.CheckHandler, bool) {
s.locker.RLock()
service,has:=s.services[name]
service, has := s.services[name]
s.locker.RUnlock()
if has{
return service,s.healthCheckHandler,true
if has {
return service, s.healthCheckHandler, true
}
return nil, nil,false
return nil, nil, false
}
func (s*SourceDiscovery)SetServices(services []*common.Service) {
func (s *SourceDiscovery) SetServices(services []*common.Service) {
serviceMap := make(map[string]*common.Service)
for _,se:=range services{
for _, se := range services {
serviceMap[se.Name] = se
}
......@@ -88,18 +85,19 @@ func (s*SourceDiscovery)SetServices(services []*common.Service) {
}
var ErrorEmptyDiscovery = errors.New("discovery is nil")
func NewSource(name string,d Discovery) (*SourceDiscovery ,error){
if d==nil || reflect.ValueOf(d).IsNil(){
return nil,ErrorEmptyDiscovery
func NewSource(name string, d Discovery) (*SourceDiscovery, error) {
if d == nil || reflect.ValueOf(d).IsNil() {
return nil, ErrorEmptyDiscovery
}
s:=&SourceDiscovery{
name:name,
discovery:d,
healthCheckHandler:new(health.CheckBox),
services:make(map[string]*common.Service),
locker:sync.RWMutex{},
s := &SourceDiscovery{
name: name,
discovery: d,
healthCheckHandler: new(health.CheckBox),
services: make(map[string]*common.Service),
locker: sync.RWMutex{},
}
//services, e := d.GetServers()
......@@ -111,5 +109,5 @@ func NewSource(name string,d Discovery) (*SourceDiscovery ,error){
d.SetCallback(s.SetServices)
return s,d.Open()
}
\ No newline at end of file
return s, d.Open()
}
......@@ -5,13 +5,11 @@ import (
)
type Driver struct {
}
func (d *Driver) Open(name string,config string)(discovery.ISource,error) {
func (d *Driver) Open(name string, config string) (discovery.ISource, error) {
panic("implement me")
}
type ConsulKeyValueDiscovery struct {
}
......@@ -3,7 +3,9 @@ package consul_kv
import (
"github.com/eolinker/goku/goku-service/discovery"
)
const DriverName = "consulKv"
func init() {
discovery.RegisteredDiscovery(DriverName,new(Driver))
discovery.RegisteredDiscovery(DriverName, new(Driver))
}
......@@ -3,7 +3,7 @@ package consul
import (
"context"
log "github.com/eolinker/goku/goku-log"
"github.com/eolinker/goku/goku-service/common"
"github.com/eolinker/goku/goku-service/common"
"github.com/hashicorp/consul/api"
"time"
)
......@@ -35,8 +35,6 @@ func (d *ConsulDiscovery) SetConfig(config string) error {
}
d.client = client
return nil
}
......@@ -99,7 +97,7 @@ func (d *ConsulDiscovery) GetServicesInTime() (map[string][]string, map[string][
catalogServices := make(map[string][]*api.ServiceEntry)
for serviceName := range services {
cs, _, err := d.client.Health().Service(serviceName, "",true, q)
cs, _, err := d.client.Health().Service(serviceName, "", true, q)
if err != nil {
log.Info(err.Error())
continue
......@@ -159,7 +157,7 @@ func (d *ConsulDiscovery) execCallbacks(services map[string][]string, catalogSer
for appName, catalogInstances := range catalogServices {
size := len(catalogInstances)
if size == 0{
if size == 0 {
continue
}
hosts := make([]*common.Instance, size)
......
......@@ -3,13 +3,15 @@ package consul
import (
"github.com/eolinker/goku/goku-service/discovery"
)
const DriverName = "consul"
func init() {
discovery.RegisteredDiscovery(DriverName,discovery.NewDriver(Create))
discovery.RegisteredDiscovery(DriverName, discovery.NewDriver(Create))
}
func Create(config string)discovery.Discovery {
func Create(config string) discovery.Discovery {
return NewConsulDiscovery(config)
}
\ No newline at end of file
}
......@@ -16,32 +16,30 @@ import (
)
type Eureka struct {
services []*common.Service
services []*common.Service
//AppNames map[string]string
eurekaUrl []string
weightKey string
callback func(services []*common.Service)
ct uint64
cancelFunc context.CancelFunc
eurekaUrl []string
weightKey string
callback func(services []*common.Service)
ct uint64
cancelFunc context.CancelFunc
instanceFactory *common.InstanceFactory
}
func (d *Eureka) SetConfig(config string) error {
tags:=strings.Split(config,";")
tags := strings.Split(config, ";")
weightKey := ""
if len(tags)>1{
weightKey=tags[1]
if len(tags) > 1 {
weightKey = tags[1]
}
urls:=strings.Split(tags[0],",")
urls := strings.Split(tags[0], ",")
d.setConfig(urls,weightKey)
d.setConfig(urls, weightKey)
return nil
}
func (d *Eureka)setConfig(eurekaUrl []string,weightKey string) {
func (d *Eureka) setConfig(eurekaUrl []string, weightKey string) {
d.eurekaUrl = eurekaUrl
d.weightKey = weightKey
}
......@@ -54,11 +52,11 @@ func (d *Eureka) SetCallback(callback func(services []*common.Service)) {
}
func (d *Eureka) GetServers() ([]*common.Service, error) {
return d.services,nil
return d.services, nil
}
func (d *Eureka) Close() error {
if d.cancelFunc !=nil{
if d.cancelFunc != nil {
d.cancelFunc()
d.cancelFunc = nil
}
......@@ -66,12 +64,12 @@ func (d *Eureka) Close() error {
}
func (d *Eureka) Open() error {
d.ScheduleAtFixedRate(time.Second*5)
d.ScheduleAtFixedRate(time.Second * 5)
return nil
}
func NewEurekaDiscovery(config string) *Eureka {
e:= &Eureka{
e := &Eureka{
services: nil,
callback: nil,
ct: 0,
......@@ -82,45 +80,44 @@ func NewEurekaDiscovery(config string) *Eureka {
return e
}
func (d *Eureka) execCallbacks(apps *Applications) {
if d.callback == nil{
if d.callback == nil {
return
}
if apps ==nil{
if apps == nil {
d.callback(nil)
return
}
if len(apps.Applications) == 0{
if len(apps.Applications) == 0 {
d.callback(nil)
return
}
services:=make([]*common.Service,0,len(apps.Applications))
for _,app:=range apps.Applications{
inses:= make([]*common.Instance,0,len(app.Instances))
for _,ins:=range app.Instances{
if ins.Status != EurekaStatusUp{
services := make([]*common.Service, 0, len(apps.Applications))
for _, app := range apps.Applications {
inses := make([]*common.Instance, 0, len(app.Instances))
for _, ins := range app.Instances {
if ins.Status != EurekaStatusUp {
continue
}
weight:= 0
if w,has:=ins.Metadata.Map[d.weightKey];has{
weight,_ = strconv.Atoi(w)
weight := 0
if w, has := ins.Metadata.Map[d.weightKey]; has {
weight, _ = strconv.Atoi(w)
}
if weight == 0{
if weight == 0 {
weight = 1
}
port:= 0
if ins.Port.Enabled{
port := 0
if ins.Port.Enabled {
port = ins.Port.Port
} else if ins.SecurePort.Enabled{
} else if ins.SecurePort.Enabled {
port = ins.SecurePort.Port
}
inses = append(inses, d.instanceFactory.General(ins.IpAddr, port,weight))
inses = append(inses, d.instanceFactory.General(ins.IpAddr, port, weight))
}
server:=common.NewService(app.Name,inses)
services = append(services,server)
server := common.NewService(app.Name, inses)
services = append(services, server)
}
d.callback(services)
......@@ -129,17 +126,17 @@ func (d *Eureka) execCallbacks(apps *Applications) {
func (d *Eureka) ScheduleAtFixedRate(second time.Duration) {
d.run()
if d.cancelFunc !=nil{
if d.cancelFunc != nil {
d.cancelFunc()
d.cancelFunc = nil
}
ctx,cancel:=context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
d.cancelFunc = cancel
go d.runTask(second,ctx)
go d.runTask(second, ctx)
}
func (d *Eureka) runTask(second time.Duration,ctx context.Context) {
func (d *Eureka) runTask(second time.Duration, ctx context.Context) {
timer := time.NewTicker(second)
for {
select {
......@@ -164,11 +161,11 @@ func (d *Eureka) run() {
func (d *Eureka) GetApplications() (*Applications, error) {
//url := c.eurekaUrl + "/apps"
url, err:= d.getEurekaServerUrl()
if err!= nil{
return nil,err
url, err := d.getEurekaServerUrl()
if err != nil {
return nil, err
}
url = fmt.Sprintf("%s/apps",url)
url = fmt.Sprintf("%s/apps", url)
res, err := http.Get(url)
if err != nil {
......@@ -190,20 +187,20 @@ func (d *Eureka) GetApplications() (*Applications, error) {
return applications, err
}
func (d *Eureka) getEurekaServerUrl()( string ,error){
func (d *Eureka) getEurekaServerUrl() (string, error) {
ct := atomic.AddUint64(&d.ct, 1)
size := len(d.eurekaUrl)
if size == 0 {
e:= NilPointError("eureka url is empty")
e := NilPointError("eureka url is empty")
return "",e
return "", e
}
index := int(ct) % size
url := d.eurekaUrl[index]
//if strings.LastIndex(url,"/")>-1{
url = strings.TrimSuffix(url, "/")
//}
return url,nil
return url, nil
}
func (d *Eureka) Health() (bool, string) {
......
......@@ -3,13 +3,15 @@ package eureka
import (
"github.com/eolinker/goku/goku-service/discovery"
)
const DriverName = "eureka"
const EurekaStatusUp ="UP"
const EurekaStatusUp = "UP"
func init() {
discovery.RegisteredDiscovery(DriverName,discovery.NewDriver(Create))
discovery.RegisteredDiscovery(DriverName, discovery.NewDriver(Create))
}
func Create(config string) discovery.Discovery {
return NewEurekaDiscovery(config)
}
\ No newline at end of file
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册