提交 eb06f999 编写于 作者: Y Your Name

控制台和节点间的通讯方式改为tcp通讯

上级 531e16e4
package cmd
import (
"encoding/json"
"errors"
"github.com/eolinker/goku-api-gateway/config"
)
var (
ErrorInvalidNodeInstance = errors.New("invalid instance value")
ErrorInvalidNodeConfig = errors.New("invalid instance config")
)
type Code string
const (
None Code = "none"
NodeRegister Code = "register"
NodeRegisterResult Code = "register-result"
NodeLevel Code = "level"
Config Code = "config"
Restart Code = "restart"
Stop Code = "stop"
Monitor Code = "monitor"
EventClientLeave Code = "leave"
Error Code = "error"
)
func EncodeConfig(c *config.GokuConfig) ([]byte, error) {
if c == nil {
return nil, ErrorInvalidNodeConfig
}
return json.Marshal(c)
}
func DecodeConfig(data []byte) (*config.GokuConfig, error) {
if len(data) == 0 {
return nil, ErrorInvalidNodeConfig
}
c := new(config.GokuConfig)
err := json.Unmarshal(data, c)
if err != nil {
return nil, err
}
return c, nil
}
package cmd
import (
"context"
"errors"
"net"
"sync"
)
var (
ErrorSendToClosedConnect = errors.New("send to closed connect")
)
type Connect struct {
conn net.Conn
//inputC chan _Frame
outputC chan []byte
doneC chan struct{}
ctx context.Context
cancelFunc context.CancelFunc
once sync.Once
}
func NewConnect(conn net.Conn) *Connect {
ctx, cancel := context.WithCancel(context.Background())
c := &Connect{
conn: conn,
//inputC: make(chan _Frame,10),
outputC: make(chan []byte, 10),
ctx: ctx,
cancelFunc: cancel,
}
go c.r()
return c
}
func (c *Connect) r() {
for {
frame, err := ReadFrame(c.conn)
if err != nil {
break
}
c.outputC <- frame
}
close(c.outputC)
}
func (c *Connect) Close() error {
c.once.Do(func() {
c.cancelFunc()
c.conn.Close()
})
return nil
}
func (c *Connect) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
func (c *Connect) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
func (c *Connect) Send(code Code, data []byte) error {
return SendFrame(c.conn, code, data)
}
func (c *Connect) ReadC() <-chan []byte {
return c.outputC
}
func (c *Connect) Done() <-chan struct{} {
return c.ctx.Done()
}
package cmd
type ErrorInfo struct {
Error string `json:"error"`
}
func DecodeError(data []byte) (string, error) {
return string(data), nil
}
func EncodeError(err string) ([]byte, error) {
return []byte(err), nil
}
package cmd
import (
"encoding/json"
"github.com/eolinker/goku-api-gateway/config"
)
type RegisterResult struct {
Code int
Error string
Config *config.GokuConfig
}
func DecodeRegisterResult(data []byte) (*RegisterResult, error) {
r := new(RegisterResult)
err := json.Unmarshal(data, r)
if err != nil {
return nil, err
}
return r, nil
}
func EncodeRegisterResultConfig(c *config.GokuConfig) ([]byte, error) {
r := RegisterResult{
Code: 0,
Error: "",
Config: c,
}
return json.Marshal(r)
}
func EncodeRegisterResultError(err string) ([]byte, error) {
r := RegisterResult{
Code: -1,
Error: err,
Config: nil,
}
return json.Marshal(r)
}
func DecodeRegister(data []byte) (string, error) {
if len(data) == 32 {
return string(data), nil
}
return "", ErrorInvalidNodeInstance
}
func EncodeRegister(nodeKey string) ([]byte, error) {
data := []byte(nodeKey)
if len(data) == 32 {
return data, nil
}
return nil, ErrorInvalidNodeInstance
}
package cmd
import (
"bytes"
"encoding/binary"
"errors"
"io"
"github.com/eolinker/goku-api-gateway/common/ioutils"
)
var (
ErrorEmptyFrame = errors.New("empty frame")
ErrorInvalidCode = errors.New("invalid code")
)
func ReadFrame(reader io.Reader) ([]byte, error) {
sizeBuf := make([]byte, 4, 4)
// 获取报文头部信息
_, err := io.ReadFull(reader, sizeBuf)
if err != nil {
return nil, err
}
// 获取报文数据大小
size := binary.BigEndian.Uint32(sizeBuf)
data := make([]byte, size, size)
_, e := io.ReadFull(reader, data)
if e != nil {
return nil, err
}
return data, nil
}
func GetCmd(frame []byte) (Code, []byte, error) {
frameLen := len(frame)
if frameLen < 5 {
// 长度小于5时,报文没有数据
return "", nil, ErrorEmptyFrame
}
buf := bytes.NewBuffer(frame)
codeData, n, err := ioutils.ReadLField(buf, nil)
if err != nil {
return "", nil, err
}
return Code(codeData), frame[n:], nil
}
func SendError(w io.Writer, err error) {
if err == nil {
return
}
data := []byte(err.Error())
SendFrame(w, Error, data)
}
func SendFrame(w io.Writer, code Code, data []byte) error {
codeData := []byte(code)
size := uint32(len(data) + len(codeData) + 1)
sizeAll := size + 4
buf := bytes.NewBuffer(make([]byte, sizeAll, sizeAll))
buf.Reset()
err := binary.Write(buf, binary.BigEndian, size)
if err != nil {
return err
}
_, err = ioutils.WriteLField(buf, codeData)
if err != nil {
return err
}
if len(data) > 0 {
_, err := buf.Write(data)
if err != nil {
return err
}
}
//b:= buf.Bytes()
//_,err =w.Write(b)
_, err = buf.WriteTo(w)
return err
}
package console
import (
"github.com/eolinker/goku-api-gateway/admin/cmd"
goku_log "github.com/eolinker/goku-api-gateway/goku-log"
)
type Callback func(code cmd.Code, data []byte,client *Client) error
func (c Callback) ServerCode(code cmd.Code, data []byte, client *Client) error{
return c(code,data,client)
}
type CodeHandler interface {
ServerCode(code cmd.Code, data []byte,client *Client)error
}
//Register cmd 回调注册器
type Register struct {
callbacks map[cmd.Code][]CodeHandler
}
//NewRegister create register
func NewRegister() *Register {
return &Register{
callbacks: make(map[cmd.Code][]CodeHandler),
}
}
//Register 注册回调
func (s *Register) Register(code cmd.Code,handler CodeHandler){
s.callbacks[code] = append(s.callbacks[code],handler)
}
//Register 注册回调
func (s *Register) RegisterFunc(code cmd.Code,callback func(code cmd.Code, data []byte,client *Client) error){
s.callbacks[code] = append(s.callbacks[code],Callback(callback))
}
//Callback 调用回调
func (s *Register)Callback(code cmd.Code,data []byte,client *Client)error {
m:=s.callbacks
callbacks,has:= m[code]
if !has{
goku_log.Info("not exists call for ",code)
return nil
}
for _,handler:=range callbacks{
if e:=handler.ServerCode(code,data,client);e!=nil{
return e
}
}
return nil
}
\ No newline at end of file
package console
import (
"fmt"
"sync"
"github.com/eolinker/goku-api-gateway/common/listener"
)
type ClientManager struct {
clients map[string]*Client
locker sync.RWMutex
intercept *listener.Intercept
}
var (
clientManager = &ClientManager{
clients: make(map[string]*Client),
locker: sync.RWMutex{},
intercept: listener.NewIntercept(),
}
)
func (m *ClientManager) Add(client *Client) (err error) {
if _, has := m.Get(client.instance); has {
return ErrorDuplicateInstance
}
e := m.intercept.Call(client)
if e != nil {
return e
}
m.locker.Lock()
_, has := m.clients[client.instance]
if has {
fmt.Println(client.instance)
err = ErrorDuplicateInstance
} else {
m.clients[client.instance] = client
err = nil
}
m.locker.Unlock()
return
}
func (m *ClientManager) Get(instance string) (*Client, bool) {
m.locker.RLock()
c, has := m.clients[instance]
m.locker.RUnlock()
return c, has
}
func (m *ClientManager) Remove(instance string) {
m.locker.Lock()
delete(m.clients, instance)
m.locker.Unlock()
}
func InterceptNodeRegister(f func(client *Client) error) {
clientManager.intercept.Add(func(v interface{}) error {
c := v.(*Client)
return f(c)
})
}
func IsLive(key string) bool {
_, b := clientManager.Get(key)
return b
}
package console
import (
"net"
"github.com/eolinker/goku-api-gateway/admin/cmd"
"github.com/eolinker/goku-api-gateway/config"
entity "github.com/eolinker/goku-api-gateway/server/entity/console-entity"
)
type Client struct {
*cmd.Connect
instance string
}
func NewClient(conn net.Conn, instance string) *Client {
return &Client{
Connect: cmd.NewConnect(conn),
instance: instance,
}
}
func (c *Client) Instance() string {
return c.instance
}
func (c *Client) SendConfig(conf *config.GokuConfig, nodeInfo *entity.Node) error {
nodeConfig := toNodeConfig(conf, nodeInfo)
data, err := cmd.EncodeConfig(nodeConfig)
if err != nil {
return err
}
return c.Send(cmd.Config, data)
}
func (c *Client) SendRunCMD(operate string) error {
if operate == "stop" {
return c.Send(cmd.Stop, []byte(""))
} else if operate == "restart" {
return c.Send(cmd.Restart, []byte(""))
}
return nil
}
package console
import (
"github.com/eolinker/goku-api-gateway/config"
entity "github.com/eolinker/goku-api-gateway/server/entity/console-entity"
)
func toNodeConfig(c *config.GokuConfig, nodeInfo *entity.Node) *config.GokuConfig {
conf := *c
conf.Cluster = nodeInfo.Cluster
conf.BindAddress = nodeInfo.ListenAddress
conf.AdminAddress = nodeInfo.AdminAddress
conf.Instance = nodeInfo.NodeKey
return &conf
}
package console
import (
"context"
"fmt"
"net"
"sync"
"github.com/eolinker/goku-api-gateway/admin/cmd"
"github.com/eolinker/goku-api-gateway/console/module/node"
"github.com/eolinker/goku-api-gateway/console/module/versionConfig"
)
var (
register *Register
ctx context.Context
cancelFunc context.CancelFunc
once sync.Once
)
func Stop() {
cancelFunc()
}
func Start(addr string) error {
once.Do(func() {
versionConfig.InitVersionConfig()
register = doRegister()
})
var lc net.ListenConfig
ctx, cancelFunc = context.WithCancel(context.Background())
listener, err := lc.Listen(ctx, "tcp", addr)
if err != nil {
return err
}
go doAccept(listener)
return nil
}
func doAccept(listener net.Listener) {
for {
conn, e := listener.Accept()
if e != nil {
listener.Close()
return
}
go startClient(conn)
}
}
func readClient(conn net.Conn) (string, error) {
frame, e := cmd.ReadFrame(conn)
if e != nil {
return "", e
}
code, data, e := cmd.GetCmd(frame)
if e != nil {
return "", e
}
if code != cmd.NodeRegister {
return "", ErrorNeedRegister
}
instance, err := cmd.DecodeRegister(data)
if err != nil {
return "", err
}
return instance, nil
}
func startClient(conn net.Conn) {
instance, err := readClient(conn)
if err != nil {
return
}
fmt.Println(instance)
if !node.Lock(instance) {
data, err := cmd.EncodeRegisterResultError(ErrorDuplicateInstance.Error())
if err == nil {
cmd.SendFrame(conn, cmd.NodeRegisterResult, data)
}
//conn.Close()
return
}
client := NewClient(conn, instance)
defer func() {
node.UnLock(instance)
NodeLeave(client)
_ = client.Close()
}()
e := NodeRegister(client)
if e != nil {
data, err := cmd.EncodeRegisterResultError(e.Error())
if err == nil {
cmd.SendFrame(conn, cmd.NodeRegisterResult, data)
}
return
}
for {
select {
case frame, ok := <-client.ReadC():
{
if !ok {
return
}
code, data, e := cmd.GetCmd(frame)
if e != nil {
return
}
err := register.Callback(code, data, client)
if err != nil {
return
}
}
case <-client.Done():
return
}
}
}
package console
import (
"errors"
"github.com/eolinker/goku-api-gateway/admin/cmd"
"github.com/eolinker/goku-api-gateway/config"
"github.com/eolinker/goku-api-gateway/console/module/node"
"github.com/eolinker/goku-api-gateway/console/module/versionConfig"
log "github.com/eolinker/goku-api-gateway/goku-log"
entity "github.com/eolinker/goku-api-gateway/server/entity/console-entity"
)
var (
ErrorDuplicateInstance = errors.New("duplicate instance")
ErrorNeedRegister = errors.New("need register")
)
func NodeRegister(client *Client) error {
err := clientManager.Add(client)
if err != nil {
return err
}
nodeInfo, err := node.GetNodeInfoByKey(client.instance)
if err != nil {
return ErrorDuplicateInstance
}
result, err := versionConfig.GetConfig(nodeInfo.Cluster)
if err != nil {
return err
}
nodeConf := toNodeConfig(result, nodeInfo)
data, _ := cmd.EncodeRegisterResultConfig(nodeConf)
return client.Send(cmd.NodeRegisterResult, data)
}
func NodeLeave(client *Client) {
clientManager.Remove(client.instance)
}
func getNodeMapByCluster() (map[string][]*entity.Node, error) {
nodes, e := node.GetAllNode()
if e != nil {
return nil, e
}
nodeMap := make(map[string][]*entity.Node)
for _, node := range nodes {
nodeMap[node.Cluster] = append(nodeMap[node.Cluster], node)
}
return nodeMap, nil
}
func OnConfigChange(conf map[string]*config.GokuConfig) {
nodeMap, err := getNodeMapByCluster()
if err != nil {
log.Warn(err)
}
for cluster, c := range conf {
for _, nodeInfo := range nodeMap[cluster] {
client, has := clientManager.Get(nodeInfo.NodeKey)
if has {
_ = client.SendConfig(c, nodeInfo)
}
}
}
}
func StopNode(nodeKey string) {
client, has := clientManager.Get(nodeKey)
if has {
_ = client.SendRunCMD("stop")
NodeLeave(client)
}
}
func RestartNode(nodeKey string) {
client, has := clientManager.Get(nodeKey)
if has {
NodeLeave(client)
node.UnLock(nodeKey)
_ = client.SendRunCMD("restart")
}
}
package console
import (
"github.com/eolinker/goku-api-gateway/admin/cmd"
"github.com/eolinker/goku-api-gateway/console/module/versionConfig"
log "github.com/eolinker/goku-api-gateway/goku-log"
)
var(
callbacksInit = NewRegister()
)
func doRegister()*Register{
r:=callbacksInit
callbacksInit = nil
versionConfig.AddCallback(OnConfigChange)
return r
}
func AddRegisterHandler(code cmd.Code,handler CodeHandler) {
if callbacksInit == nil{
log.Panic("not allow register now")
}
callbacksInit.Register(code,handler)
}
func AddRegisterFunc(code cmd.Code,handleFunc func(code cmd.Code, data []byte,client *Client) error) {
if callbacksInit == nil{
log.Panic("not allow register now")
}
callbacksInit.RegisterFunc(code,handleFunc)
}
\ No newline at end of file
package node
import (
"github.com/eolinker/goku-api-gateway/admin/cmd"
)
type Callback func(code cmd.Code, data []byte) error
func (c Callback) ServerCode(code cmd.Code, data []byte) error {
return c(code, data)
}
type CodeHandler interface {
ServerCode(code cmd.Code, data []byte) error
}
//Register cmd 回调注册器
type Register struct {
callbacks map[cmd.Code][]CodeHandler
}
//NewRegister create register
func NewRegister() *Register {
return &Register{
callbacks: make(map[cmd.Code][]CodeHandler),
}
}
//Register 注册回调
func (s *Register) Register(code cmd.Code, handler CodeHandler) {
s.callbacks[code] = append(s.callbacks[code], handler)
}
//Register 注册回调
func (s *Register) RegisterFunc(code cmd.Code, callback func(code cmd.Code, data []byte) error) {
s.callbacks[code] = append(s.callbacks[code], Callback(callback))
}
//Callback 调用回调
func (s *Register) Callback(code cmd.Code, data []byte) error {
m := s.callbacks
for _, handler := range m[code] {
if e := handler.ServerCode(code, data); e != nil {
return e
}
}
return nil
}
package node
import (
"github.com/eolinker/goku-api-gateway/admin/cmd"
)
func (c *TcpConsole)OnConfigChange(code cmd.Code,data []byte) error {
conf,err:= cmd.DecodeConfig(data)
if err!=nil{
return err
}
c.lastConfig.Set(conf)
c.listener.Call(conf)
return nil
}
\ No newline at end of file
package node
import (
"context"
"errors"
"net"
"sync"
"time"
"github.com/eolinker/goku-api-gateway/admin/cmd"
"github.com/eolinker/goku-api-gateway/common/listener"
"github.com/eolinker/goku-api-gateway/common/manager"
"github.com/eolinker/goku-api-gateway/config"
"github.com/eolinker/goku-api-gateway/node/console"
)
type TcpConsole struct {
conn *cmd.Connect
addr string
lock sync.Mutex
instance string
register *Register
listener *listener.Listener
lastConfig *manager.Value
ctx context.Context
cancel context.CancelFunc
listenOnce sync.Once
}
func (c *TcpConsole) SendMonitor(data []byte) error {
return c.conn.Send(cmd.Monitor, data)
}
func (c *TcpConsole) GetConfig() (*config.GokuConfig, error) {
conf, b := c.lastConfig.Get()
if b {
return conf.(*config.GokuConfig), nil
}
return nil, errors.New("not register to console")
}
func (c *TcpConsole) Close() {
c.cancel()
}
func (c *TcpConsole) AddListen(callback console.ConfigCallbackFunc) {
c.listener.Listen(func(event interface{}) {
conf := event.(*config.GokuConfig)
callback(conf)
})
}
func NewConsole(addr string, instance string) *TcpConsole {
c := &TcpConsole{
addr: addr,
instance: instance,
conn: nil,
register: NewRegister(),
listener: listener.New(),
lastConfig: manager.NewValue(),
}
c.register.RegisterFunc(cmd.Config, c.OnConfigChange)
c.register.RegisterFunc(cmd.Restart, Restart)
c.register.RegisterFunc(cmd.Stop, Stop)
return c
}
func connect(addr string) net.Conn {
sleeps := []time.Duration{time.Second * 0, time.Second * 1, time.Second * 5, time.Second * 10}
maxSleep := sleeps[len(sleeps)-1]
retry := 0
for {
if retry > 0 {
if retry > len(sleeps)-1 {
time.Sleep(maxSleep)
} else {
time.Sleep(sleeps[retry])
}
}
conn, err := net.Dial("tcp", addr)
if err != nil {
continue
}
return conn
}
}
func (c *TcpConsole) RegisterToConsole() (*config.GokuConfig, error) {
data, err := cmd.EncodeRegister(c.instance)
if err != nil {
return nil, err
}
for {
conn := connect(c.addr)
e := cmd.SendFrame(conn, cmd.NodeRegister, data)
if e != nil {
conn.Close()
continue
}
frame, err := cmd.ReadFrame(conn)
if err != nil {
conn.Close()
continue
}
code, data, err := cmd.GetCmd(frame)
if err != nil {
conn.Close()
return nil, err
}
if code != cmd.NodeRegisterResult {
conn.Close()
return nil, ErrorNeedReadRegisterResult
}
result, err := cmd.DecodeRegisterResult(data)
if err != nil {
conn.Close()
return nil, err
}
if result.Code != 0 {
conn.Close()
return nil, errors.New(result.Error)
}
c.conn = cmd.NewConnect(conn)
return result.Config, nil
}
}
func (c *TcpConsole) Listen() {
c.listenOnce.Do(
func() {
go func() {
for {
c.listenRead()
c.RegisterToConsole()
}
}()
})
}
func (c *TcpConsole) listenRead() {
defer c.conn.Close()
for {
select {
case <-c.conn.Done():
return
case frame, ok := <-c.conn.ReadC():
{
if !ok {
return
}
code, data, e := cmd.GetCmd(frame)
if e != nil {
return
}
err := c.register.Callback(code, data)
if err != nil {
return
}
}
}
}
}
package node
import "errors"
var (
ErrorReadRegisterResultTimeOut = errors.New("read register result timeout")
ErrorNeedReadRegisterResult = errors.New("need register-result but not")
ErrorConsoleRefuse = errors.New("console refuse")
)
package node
import (
goku_log "github.com/eolinker/goku-api-gateway/goku-log"
"github.com/eolinker/goku-api-gateway/admin/cmd"
)
func Restart(code cmd.Code, data []byte) error {
goku_log.Info("restart")
//endless.RestartServer()
return nil
}
func Stop(code cmd.Code, data []byte) error {
goku_log.Info("stop")
//endless.StopServer()
return nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册