提交 9120c74a 编写于 作者: H heyanlong

New agent

上级 2ebeafc8
......@@ -11,6 +11,7 @@ import (
"log"
"net"
"os"
"sync"
"time"
)
......@@ -27,6 +28,7 @@ type Agent struct {
socket string
socketListener net.Listener
register chan *register
registerCache sync.Map
trace chan string
queue *list.List
}
......@@ -52,7 +54,6 @@ func NewAgent() *Agent {
}()
go agent.sub()
go agent.send()
return agent
}
......@@ -112,8 +113,13 @@ func (t *Agent) listenSocket() {
}
func (t *Agent) sub() {
heartbeatTicker := time.NewTicker(time.Duration(time.Second * 40))
defer heartbeatTicker.Stop()
for {
select {
case <-heartbeatTicker.C:
go t.heartbeat()
case register := <-t.register:
go t.reg(register)
case trace := <-t.trace:
......
package service
import (
"agent/agent/pb/agent"
"agent/agent/pb/register2"
"context"
"fmt"
"time"
)
func (t *Agent) heartbeat() {
var client5 agent.InstanceDiscoveryServiceClient
var client6 register2.ServiceInstancePingClient
t.registerCache.Range(func(key, value interface{}) bool {
fmt.Println("heartbeat => ...")
bind := value.(registerCache)
if bind.Version == 5 {
if client5 == nil {
client5 = agent.NewInstanceDiscoveryServiceClient(t.conn)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err := client5.Heartbeat(ctx, &agent.ApplicationInstanceHeartbeat{
ApplicationInstanceId: bind.InstanceId,
HeartbeatTime: time.Now().UnixNano() / 1000000,
})
if err != nil {
fmt.Println("heartbeat =>", err)
} else {
fmt.Printf("heartbeat => %d %d\n", bind.AppId, bind.InstanceId)
}
} else if bind.Version == 6 {
if client6 == nil {
client6 = register2.NewServiceInstancePingClient(t.conn)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err := client6.DoPing(ctx, &register2.ServiceInstancePingPkg{
ServiceInstanceId: bind.InstanceId,
Time: time.Now().UnixNano() / 1000000,
ServiceInstanceUUID: bind.Uuid,
})
if err != nil {
fmt.Println("heartbeat =>", err)
} else {
fmt.Printf("heartbeat => %d %d\n", bind.AppId, bind.InstanceId)
}
}
return true
})
}
package service
type registerCache struct {
Version int
AppId int32
InstanceId int32
Uuid string
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册