提交 5859a7f9 编写于 作者: H heyanlong

agent

上级 2a65b024
......@@ -2,6 +2,7 @@ package main
import (
"agent/agent/pb5"
"agent/agent/service"
"context"
"encoding/json"
"fmt"
......@@ -151,14 +152,9 @@ func register(c net.Conn, j string) {
log.Println("register => ", err)
log.Println("register => Start register error...")
}
}
}
func sendTrace(json string) {
fmt.Println(json)
}
func handleConn(c net.Conn) {
defer func() {
err := recover()
......@@ -191,7 +187,7 @@ func handleConn(c net.Conn) {
go register(c, body[1:])
} else if body[:1] == "1" {
log.Println("Service send trace protocol")
go sendTrace(body[1:])
go service.SendTrace(grpcConn, body[1:])
}
json = json[endIndex+1:]
} else {
......@@ -201,9 +197,41 @@ func handleConn(c net.Conn) {
}
}
func heartbeat() {
defer func() {
err := recover()
if err != nil {
log.Println("System error[register]:", err)
go heartbeat()
}
}()
for {
registerMap.Range(func(key, value interface{}) bool {
log.Println("heartbeat => ...")
bind := value.(PHPSkyBind)
c := pb5.NewInstanceDiscoveryServiceClient(grpcConn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err := c.Heartbeat(ctx, &pb5.ApplicationInstanceHeartbeat{
ApplicationInstanceId: bind.InstanceId,
HeartbeatTime: time.Now().UnixNano(),
})
if err != nil {
log.Println("heartbeat =>", err)
}
return true
})
time.Sleep(time.Second * 40)
}
}
func main() {
// connection to sky server
log.Println("hello")
var err error
grpcConn, err = grpc.Dial("172.16.68.37:11800", grpc.WithInsecure())
......@@ -223,6 +251,8 @@ func main() {
}
defer l.Close()
go heartbeat()
for {
c, err := l.Accept()
if err != nil {
......
package service
import (
"agent/agent/pb5"
"context"
"encoding/json"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"log"
"strconv"
"strings"
"time"
)
type trace struct {
ApplicationInstance int32 `json:"application_instance"`
Pid int `json:"pid"`
ApplicationId int32 `json:"application_id"`
Version int `json:"version"`
Segment segment `json:"segment"`
GlobalTraceIds []string `json:"globalTraceIds"`
}
type segment struct {
TraceSegmentId string `json:"traceSegmentId"`
IsSizeLimited int `json:"isSizeLimited"`
Spans []span `json:"spans"`
}
type span struct {
Tags map[string]string `json:"tags"`
SpanId int32 `json:"spanId"`
ParentSpanId int32 `json:"parentSpanId"`
StartTime int64 `json:"startTime"`
OperationName string `json:"operationName"`
Peer string `json:"peer"`
SpanType int32 `json:"spanType"`
SpanLayer int32 `json:"spanLayer"`
ComponentId int32 `json:"componentId"`
ComponentName string `json:"component"`
Refs []ref `json:"refs"`
EndTime int64 `json:"endTime"`
IsError int `json:"isError"`
}
type ref struct {
Type int32 `json:"type"`
ParentTraceSegmentId string `json:"parentTraceSegmentId"`
ParentSpanId int32 `json:"parentSpanId"`
ParentApplicationInstanceId int32 `json:"parentApplicationInstanceId"`
NetworkAddress string `json:"networkAddress"`
EntryApplicationInstanceId int32 `json:"entryApplicationInstanceId"`
EntryServiceName string `json:"entryServiceName"`
ParentServiceName string `json:"parentServiceName"`
}
func SendTrace(conn *grpc.ClientConn, j string) {
info := trace{}
err := json.Unmarshal([]byte(j), &info)
if err != nil {
log.Println("trace => ", err)
return
}
log.Println("trace => Start trace...")
c := pb5.NewTraceSegmentServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
client, err := c.Collect(ctx)
if err != nil {
log.Println("trace => ", err)
return
}
var globalTrace []*pb5.UniqueId
for _, v := range info.GlobalTraceIds {
globalTrace = append(globalTrace, buildUniqueId(v))
}
var spans []*pb5.SpanObject
for _, v := range info.Segment.Spans {
span := &pb5.SpanObject{
SpanId: v.SpanId,
ParentSpanId: v.ParentSpanId,
StartTime: v.StartTime,
EndTime: v.EndTime,
OperationName: v.OperationName,
Peer: v.Peer,
Component: v.ComponentName,
IsError: v.IsError != 0,
}
if v.ComponentId != 0 {
span.ComponentId = v.ComponentId
}
if v.SpanType == 0 {
span.SpanType = pb5.SpanType_Entry
} else if v.SpanType == 1 {
span.SpanType = pb5.SpanType_Exit
} else if v.SpanType == 2 {
span.SpanType = pb5.SpanType_Local
}
if v.SpanLayer == 3 {
span.SpanLayer = pb5.SpanLayer_Http
} else if v.SpanLayer == 1 {
span.SpanLayer = pb5.SpanLayer_Database
}
buildTags(span, v.Tags)
buildRefs(span, v.Refs)
spans = append(spans, span)
}
segmentObject := &pb5.TraceSegmentObject{
TraceSegmentId: buildUniqueId(info.Segment.TraceSegmentId),
Spans: spans,
ApplicationId: info.ApplicationId,
ApplicationInstanceId: info.ApplicationInstance,
IsSizeLimited: info.Segment.IsSizeLimited != 0,
}
//m := jsonpb.Marshaler{
// EnumsAsInts: true,
//}
seg, err := proto.Marshal(segmentObject)
//fmt.Println(seg)
if err != nil {
log.Println("trace => ", err)
return
}
segment := &pb5.UpstreamSegment{
GlobalTraceIds: globalTrace,
Segment: seg,
}
err = client.Send(segment)
if err != nil {
log.Println("trace => ", err)
return
}
reply, err := client.CloseAndRecv()
if err != nil {
log.Println("trace =>", err)
}
log.Println("trace => send ok")
log.Printf("Route summary: %v", reply)
}
func buildRefs(span *pb5.SpanObject, refs []ref) {
// refs
var spanRefs []*pb5.TraceSegmentReference
for _, rev := range refs {
var refType pb5.RefType
if rev.Type == 0 {
refType = pb5.RefType_CrossProcess
}
spanRefs = append(spanRefs, &pb5.TraceSegmentReference{
RefType: refType,
ParentTraceSegmentId: buildUniqueId(rev.ParentTraceSegmentId),
ParentSpanId: rev.ParentSpanId,
ParentApplicationInstanceId: rev.ParentApplicationInstanceId,
NetworkAddress: rev.NetworkAddress,
EntryApplicationInstanceId: rev.EntryApplicationInstanceId,
EntryServiceName: rev.EntryServiceName,
ParentServiceName: rev.ParentServiceName,
})
}
if len(spanRefs) > 0 {
span.Refs = spanRefs
}
}
func buildUniqueId(str string) *pb5.UniqueId {
uniqueId := &pb5.UniqueId{}
var ids []int64
for _, idStr := range strings.Split(str, ".") {
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
log.Println("trace => ", err)
panic(err)
}
ids = append(ids, id)
}
uniqueId.IdParts = ids
return uniqueId
}
func buildTags(span *pb5.SpanObject, t map[string]string) {
// tags
var tags []*pb5.KeyWithStringValue
for k, v := range t {
kv := &pb5.KeyWithStringValue{
Key: k,
Value: v,
}
tags = append(tags, kv)
}
if len(tags) > 0 {
span.Tags = tags
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册