提交 34924539 编写于 作者: H heyanlong

fix bugs

上级 7da396fe
...@@ -20,7 +20,7 @@ func main() { ...@@ -20,7 +20,7 @@ func main() {
app := cli.NewApp() app := cli.NewApp()
app.Name = "sky_php_agent" app.Name = "sky_php_agent"
app.Usage = "the skywalking trace sending agent" app.Usage = "the skywalking trace sending agent"
app.Version = "3.2.6" app.Version = "3.2.7"
app.Flags = []cli.Flag{ app.Flags = []cli.Flag{
&cli.StringSliceFlag{Name: "grpc", Usage: "SkyWalking collector grpc address", Value: cli.NewStringSlice("127.0.0.1:11800")}, &cli.StringSliceFlag{Name: "grpc", Usage: "SkyWalking collector grpc address", Value: cli.NewStringSlice("127.0.0.1:11800")},
&cli.StringFlag{Name: "socket", Usage: "Pipeline for communicating with PHP", Value: "/var/run/sky-agent.sock"}, &cli.StringFlag{Name: "socket", Usage: "Pipeline for communicating with PHP", Value: "/var/run/sky-agent.sock"},
......
...@@ -5,8 +5,6 @@ import ( ...@@ -5,8 +5,6 @@ import (
"agent/agent/pb/agent" "agent/agent/pb/agent"
"agent/agent/pb/agent2" "agent/agent/pb/agent2"
"agent/agent/pb/register2" "agent/agent/pb/register2"
"container/list"
"fmt"
cli "github.com/urfave/cli/v2" cli "github.com/urfave/cli/v2"
"google.golang.org/grpc" "google.golang.org/grpc"
"math/rand" "math/rand"
...@@ -38,10 +36,11 @@ type Agent struct { ...@@ -38,10 +36,11 @@ type Agent struct {
socket string socket string
socketListener net.Listener socketListener net.Listener
register chan *register register chan *register
registerCache sync.Map registerCache map[int]registerCache
registerCacheLock sync.Mutex registerCacheLock sync.RWMutex
trace chan string trace chan string
queue *list.List queue []string
queueLock sync.Mutex
} }
func NewAgent(cli *cli.Context) *Agent { func NewAgent(cli *cli.Context) *Agent {
...@@ -50,7 +49,7 @@ func NewAgent(cli *cli.Context) *Agent { ...@@ -50,7 +49,7 @@ func NewAgent(cli *cli.Context) *Agent {
socket: cli.String("socket"), socket: cli.String("socket"),
register: make(chan *register), register: make(chan *register),
trace: make(chan string), trace: make(chan string),
queue: list.New(), registerCache: make(map[int]registerCache),
} }
go agent.sub() go agent.sub()
...@@ -60,22 +59,25 @@ func NewAgent(cli *cli.Context) *Agent { ...@@ -60,22 +59,25 @@ func NewAgent(cli *cli.Context) *Agent {
func (t *Agent) Run() { func (t *Agent) Run() {
log.Info("hello skywalking") log.Info("hello skywalking")
t.connGRPC()
t.listenSocket()
log.Info("🍺 skywalking php agent started successfully, enjoy yourself")
defer func() { defer func() {
var err error var err error
if t.socketListener != nil {
err = t.socketListener.Close() err = t.socketListener.Close()
if err != nil { if err != nil {
log.Errorln(err) log.Errorln(err)
} }
}
if t.grpcConn != nil {
err = t.grpcConn.Close() err = t.grpcConn.Close()
if err != nil { if err != nil {
log.Errorln(err) log.Errorln(err)
} }
}
}() }()
t.connGRPC()
t.listenSocket()
} }
func (t *Agent) connGRPC() { func (t *Agent) connGRPC() {
...@@ -99,6 +101,7 @@ func (t *Agent) connGRPC() { ...@@ -99,6 +101,7 @@ func (t *Agent) connGRPC() {
t.grpcClient.segmentClientV6 = agent2.NewTraceSegmentReportServiceClient(t.grpcConn) t.grpcClient.segmentClientV6 = agent2.NewTraceSegmentReportServiceClient(t.grpcConn)
t.grpcClient.pingClient5 = agent.NewInstanceDiscoveryServiceClient(t.grpcConn) t.grpcClient.pingClient5 = agent.NewInstanceDiscoveryServiceClient(t.grpcConn)
t.grpcClient.pintClient6 = register2.NewServiceInstancePingClient(t.grpcConn) t.grpcClient.pintClient6 = register2.NewServiceInstancePingClient(t.grpcConn)
log.Info("🍺 skywalking php agent started successfully, enjoy yourself")
} }
func (t *Agent) listenSocket() { func (t *Agent) listenSocket() {
...@@ -144,17 +147,21 @@ func (t *Agent) sub() { ...@@ -144,17 +147,21 @@ func (t *Agent) sub() {
for { for {
select { select {
case <-traceSendTicker.C: case <-traceSendTicker.C:
len := t.queue.Len() len := len(t.queue)
if len > 0 { if len > 0 {
var segments []*upstreamSegment var segments []*upstreamSegment
for i := 0; i < len; i++ {
// front top 100 t.queueLock.Lock()
e := t.queue.Front() list := t.queue[:]
st := format(fmt.Sprintf("%v", e.Value)) t.queue = []string{}
t.queueLock.Unlock()
for _, trace := range list {
info, st := format(trace)
if st != nil { if st != nil {
t.recoverRegister(info)
segments = append(segments, st) segments = append(segments, st)
} }
t.queue.Remove(e)
} }
go t.send(segments) go t.send(segments)
} }
...@@ -163,8 +170,9 @@ func (t *Agent) sub() { ...@@ -163,8 +170,9 @@ func (t *Agent) sub() {
case register := <-t.register: case register := <-t.register:
go t.doRegister(register) go t.doRegister(register)
case trace := <-t.trace: case trace := <-t.trace:
t.queue.PushBack(trace) t.queueLock.Lock()
go t.recoverRegister(trace) t.queue = append(t.queue, trace)
t.queueLock.Unlock()
} }
} }
} }
...@@ -9,10 +9,10 @@ import ( ...@@ -9,10 +9,10 @@ import (
func (t *Agent) heartbeat() { func (t *Agent) heartbeat() {
t.registerCache.Range(func(key, value interface{}) bool { t.registerCacheLock.Lock()
defer t.registerCacheLock.Unlock()
for _, bind := range t.registerCache {
log.Infoln("heartbeat") log.Infoln("heartbeat")
bind := value.(registerCache)
if bind.Version == 5 { if bind.Version == 5 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel() defer cancel()
...@@ -43,6 +43,5 @@ func (t *Agent) heartbeat() { ...@@ -43,6 +43,5 @@ func (t *Agent) heartbeat() {
log.Infof("heartbeat appId %d appInsId %d", bind.AppId, bind.InstanceId) log.Infof("heartbeat appId %d appInsId %d", bind.AppId, bind.InstanceId)
} }
} }
return true }
})
} }
...@@ -42,18 +42,21 @@ func ip4s() []string { ...@@ -42,18 +42,21 @@ func ip4s() []string {
return ips return ips
} }
func (t *Agent) recoverRegister(r string) { func (t *Agent) recoverRegister(info trace) {
var info trace
err := json.Unmarshal([]byte(r), &info) t.registerCacheLock.RLock()
if err == nil { _, ok := t.registerCache[info.Pid]
if _, ok := t.registerCache.Load(info.Pid); !ok { t.registerCacheLock.RUnlock()
t.registerCache.Store(info.Pid, registerCache{
if !ok {
t.registerCacheLock.Lock()
t.registerCache[info.Pid] = registerCache{
Version: info.Version, Version: info.Version,
AppId: info.ApplicationId, AppId: info.ApplicationId,
InstanceId: info.ApplicationInstance, InstanceId: info.ApplicationInstance,
Uuid: info.Uuid, Uuid: info.Uuid,
})
} }
t.registerCacheLock.Unlock()
} }
} }
...@@ -68,8 +71,10 @@ func (t *Agent) doRegister(r *register) { ...@@ -68,8 +71,10 @@ func (t *Agent) doRegister(r *register) {
} }
pid := info.Pid pid := info.Pid
if value, ok := t.registerCache.Load(pid); ok { t.registerCacheLock.RLock()
bind := value.(registerCache) bind, ok := t.registerCache[pid]
t.registerCacheLock.RUnlock()
if ok {
log.Infof("register => pid %d appid %d insId %d", pid, bind.AppId, bind.InstanceId) log.Infof("register => pid %d appid %d insId %d", pid, bind.AppId, bind.InstanceId)
r.c.Write([]byte(strconv.FormatInt(int64(bind.AppId), 10) + "," + strconv.FormatInt(int64(bind.InstanceId), 10) + "," + bind.Uuid)) r.c.Write([]byte(strconv.FormatInt(int64(bind.AppId), 10) + "," + strconv.FormatInt(int64(bind.InstanceId), 10) + "," + bind.Uuid))
return return
...@@ -79,9 +84,8 @@ func (t *Agent) doRegister(r *register) { ...@@ -79,9 +84,8 @@ func (t *Agent) doRegister(r *register) {
t.registerCacheLock.Lock() t.registerCacheLock.Lock()
defer t.registerCacheLock.Unlock() defer t.registerCacheLock.Unlock()
// if map not found pid.. start register // if map not found pid.. start register
if _, ok := t.registerCache.Load(pid); !ok { if !ok {
log.Infof("start register pid %d used SkyWalking v%d", pid, info.Version) log.Infof("start register pid %d used SkyWalking v%d", pid, info.Version)
var regAppStatus = false var regAppStatus = false
var appId int32 = 0 var appId int32 = 0
...@@ -254,12 +258,12 @@ func (t *Agent) doRegister(r *register) { ...@@ -254,12 +258,12 @@ func (t *Agent) doRegister(r *register) {
} }
if appInsId != 0 { if appInsId != 0 {
t.registerCache.Store(pid, registerCache{ t.registerCache[pid] = registerCache{
Version: info.Version, Version: info.Version,
AppId: appId, AppId: appId,
InstanceId: appInsId, InstanceId: appInsId,
Uuid: agentUUID, Uuid: agentUUID,
}) }
log.Infof("register pid %d appid %d insId %d", pid, appId, appInsId) log.Infof("register pid %d appid %d insId %d", pid, appId, appInsId)
} }
} else { } else {
......
...@@ -123,13 +123,13 @@ func (t *Agent) send(segments []*upstreamSegment) { ...@@ -123,13 +123,13 @@ func (t *Agent) send(segments []*upstreamSegment) {
log.Info("sending success...") log.Info("sending success...")
} }
func format(j string) *upstreamSegment { func format(j string) (trace, *upstreamSegment) {
info := trace{} info := trace{}
err := json.Unmarshal([]byte(j), &info) err := json.Unmarshal([]byte(j), &info)
if err != nil { if err != nil {
log.Error("trace json decode:", err) log.Error("trace json decode:", err)
return nil return info, nil
} }
if info.Version == 5 { if info.Version == 5 {
var globalTrace []*agent.UniqueId var globalTrace []*agent.UniqueId
...@@ -190,14 +190,14 @@ func format(j string) *upstreamSegment { ...@@ -190,14 +190,14 @@ func format(j string) *upstreamSegment {
//log.Info(seg) //log.Info(seg)
if err != nil { if err != nil {
log.Error("trace json encode:", err) log.Error("trace json encode:", err)
return nil return info, nil
} }
segment := &agent.UpstreamSegment{ segment := &agent.UpstreamSegment{
GlobalTraceIds: globalTrace, GlobalTraceIds: globalTrace,
Segment: seg, Segment: seg,
} }
return &upstreamSegment{ return info, &upstreamSegment{
Version: info.Version, Version: info.Version,
segment: segment, segment: segment,
} }
...@@ -261,7 +261,7 @@ func format(j string) *upstreamSegment { ...@@ -261,7 +261,7 @@ func format(j string) *upstreamSegment {
//log.Info(seg) //log.Info(seg)
if err != nil { if err != nil {
log.Error("trace proto encode:", err) log.Error("trace proto encode:", err)
return nil return info, nil
} }
segment := &agent.UpstreamSegment{ segment := &agent.UpstreamSegment{
...@@ -269,12 +269,12 @@ func format(j string) *upstreamSegment { ...@@ -269,12 +269,12 @@ func format(j string) *upstreamSegment {
Segment: seg, Segment: seg,
} }
return &upstreamSegment{ return info, &upstreamSegment{
Version: info.Version, Version: info.Version,
segment: segment, segment: segment,
} }
} }
return nil return info, nil
} }
func buildRefs(span *agent.SpanObject, refs []ref) { func buildRefs(span *agent.SpanObject, refs []ref) {
......
...@@ -25,7 +25,7 @@ extern zend_module_entry skywalking_module_entry; ...@@ -25,7 +25,7 @@ extern zend_module_entry skywalking_module_entry;
#define phpext_skywalking_ptr &skywalking_module_entry #define phpext_skywalking_ptr &skywalking_module_entry
#define SKY_DEBUG 0 #define SKY_DEBUG 0
#define PHP_SKYWALKING_VERSION "3.2.6" /* Replace with version number for your extension */ #define PHP_SKYWALKING_VERSION "3.2.7" /* Replace with version number for your extension */
#ifdef PHP_WIN32 #ifdef PHP_WIN32
# define PHP_SKYWALKING_API __declspec(dllexport) # define PHP_SKYWALKING_API __declspec(dllexport)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册