main.go 7.8 KB
Newer Older
H
agent  
heyanlong 已提交
1 2 3
package main

import (
H
heyanlong 已提交
4 5 6
	"agent/agent/pb/agent"
	"agent/agent/pb/common"
	"agent/agent/pb/register2"
H
agent  
heyanlong 已提交
7
	"agent/agent/service"
H
agent  
heyanlong 已提交
8 9 10
	"context"
	"encoding/json"
	"fmt"
H
agent  
heyanlong 已提交
11
	"github.com/google/uuid"
H
agent  
heyanlong 已提交
12 13 14 15 16 17 18 19 20 21
	"google.golang.org/grpc"
	"net"
	"os"
	"runtime"
	"strconv"
	"sync"
	"time"
)

// PHPSkyBind is the registration record kept in registerMap for one process.
// register stores it keyed by the process pid once both application and
// instance registration have succeeded; heartbeat reads it back.
type PHPSkyBind struct {
	// Version is the protocol version the process registered with. The code
	// in this file handles 5 and 6.
	Version int
	// AppId is the application (service) id returned by the backend.
	AppId int32
	// InstanceId is the instance id returned by the backend.
	InstanceId int32
	// Uuid is the UUID generated in register for this instance.
	Uuid string
}

// Register is the JSON payload decoded by register.
type Register struct {
	// AppCode is sent to the backend as the application code (version 5) or
	// the service name (version 6).
	AppCode string `json:"app_code"`
	// Pid is the process id of the registering process; it is the key used
	// in registerMap.
	Pid int `json:"pid"`
	// Version selects the registration protocol. The code in this file
	// handles 5 and 6.
	Version int `json:"version"`
}

var (
	// registerMapLock is held by register for the whole check-and-register
	// sequence, so only one registration runs at a time.
	registerMapLock = new(sync.Mutex)

	// registerMap holds a PHPSkyBind value per registered process, keyed by
	// pid (int).
	registerMap sync.Map

	// grpcConn is the connection every gRPC client in this file is built on.
	// NOTE(review): nothing in this file assigns it — confirm where it is set.
	grpcConn *grpc.ClientConn
)

H
agent  
heyanlong 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
// ip4s returns the textual form of every non-loopback IPv4 address reported
// by net.InterfaceAddrs. It returns nil when the addresses cannot be listed
// or when none qualify.
func ip4s() []string {
	var found []string

	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return found
	}

	for _, addr := range addrs {
		ipNet, isIPNet := addr.(*net.IPNet)
		if !isIPNet || ipNet.IP.IsLoopback() {
			continue
		}
		// To4 returns nil for addresses that are not IPv4.
		if ipNet.IP.To4() == nil {
			continue
		}
		found = append(found, ipNet.IP.String())
	}
	return found
}

H
agent  
heyanlong 已提交
53 54 55 56
func register(c net.Conn, j string) {
	defer func() {
		err := recover()
		if err != nil {
H
heyanlong 已提交
57
			fmt.Println("System error[register]:", err)
H
agent  
heyanlong 已提交
58 59 60 61 62 63
		}
	}()

	info := Register{}
	err := json.Unmarshal([]byte(j), &info)
	if err != nil {
H
heyanlong 已提交
64
		fmt.Println("register => ", err)
H
agent  
heyanlong 已提交
65
		c.Write([]byte(""))
H
agent  
heyanlong 已提交
66 67 68 69 70 71
		return
	}

	pid := info.Pid
	if value, ok := registerMap.Load(pid); ok {
		bind := value.(PHPSkyBind)
H
heyanlong 已提交
72
		fmt.Printf("register => pid %d appid %d insId %d\n", pid, bind.AppId, bind.InstanceId)
H
agent  
heyanlong 已提交
73
		c.Write([]byte(strconv.FormatInt(int64(bind.AppId), 10) + "," + strconv.FormatInt(int64(bind.InstanceId), 10)))
H
agent  
heyanlong 已提交
74 75
		return
	} else {
H
agent  
heyanlong 已提交
76
		c.Write([]byte(""))
H
agent  
heyanlong 已提交
77 78 79 80 81 82 83
	}

	registerMapLock.Lock()
	defer registerMapLock.Unlock()

	// if map not found pid.. start register
	if _, ok := registerMap.Load(pid); !ok {
H
heyanlong 已提交
84
		fmt.Println("register => Start register...")
H
agent  
heyanlong 已提交
85
		var regAppStatus = false
H
heyanlong 已提交
86
		var appId int32 = 0
H
heyanlong 已提交
87
		var appInsId int32 = 0
H
heyanlong 已提交
88
		var regErr error
H
heyanlong 已提交
89
		agentUUID := uuid.New().String()
H
agent  
heyanlong 已提交
90

H
heyanlong 已提交
91
		if info.Version == 5 {
H
heyanlong 已提交
92 93 94 95 96 97 98 99 100 101 102 103
			c := agent.NewApplicationRegisterServiceClient(grpcConn)
			ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
			defer cancel()

			var regResp *agent.ApplicationMapping

			// loop register
			for {
				regResp, regErr = c.ApplicationCodeRegister(ctx, &agent.Application{
					ApplicationCode: info.AppCode,
				})
				if regErr != nil {
H
heyanlong 已提交
104
					fmt.Println("register error", regErr)
H
heyanlong 已提交
105 106 107 108 109 110 111 112 113
					break
				}
				if regResp.GetApplication() != nil {
					regAppStatus = true
					appId = regResp.GetApplication().GetValue()
					break
				}
				time.Sleep(time.Second)
			}
H
heyanlong 已提交
114
		} else if info.Version == 6 {
H
heyanlong 已提交
115
			c := register2.NewRegisterClient(grpcConn)
H
heyanlong 已提交
116 117 118
			ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
			defer cancel()

H
heyanlong 已提交
119 120 121
			var regResp *register2.ServiceRegisterMapping
			var services []*register2.Service
			services = append(services, &register2.Service{
H
heyanlong 已提交
122 123 124 125
				ServiceName: info.AppCode,
			})
			// loop register
			for {
H
heyanlong 已提交
126
				regResp, regErr = c.DoServiceRegister(ctx, &register2.Services{
H
heyanlong 已提交
127 128 129
					Services: services,
				})
				if regErr != nil {
H
heyanlong 已提交
130
					fmt.Println("register error", regErr)
H
heyanlong 已提交
131 132
					break
				}
H
heyanlong 已提交
133

H
heyanlong 已提交
134
				if regResp.GetServices() != nil {
H
heyanlong 已提交
135 136 137 138 139 140 141 142 143 144
					for _, v := range regResp.GetServices() {
						if v.GetKey() == info.AppCode {
							regAppStatus = true
							appId = v.GetValue()
							break
						}
					}
				}

				if regAppStatus {
H
heyanlong 已提交
145 146 147
					break
				}
				time.Sleep(time.Second)
H
agent  
heyanlong 已提交
148 149 150
			}
		}

H
agent  
heyanlong 已提交
151
		if regAppStatus {
H
agent  
heyanlong 已提交
152
			// start reg instance
H
heyanlong 已提交
153
			if info.Version == 5 {
H
heyanlong 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
				instanceClient := agent.NewInstanceDiscoveryServiceClient(grpcConn)
				instanceCtx, instanceCancel := context.WithTimeout(context.Background(), time.Second*3)
				defer instanceCancel()

				var instanceErr error
				var instanceResp *agent.ApplicationInstanceMapping
				hostName, _ := os.Hostname()

				instanceReq := &agent.ApplicationInstance{
					ApplicationId: appId,
					AgentUUID:     agentUUID,
					RegisterTime:  time.Now().UnixNano() / 1000000,
					Osinfo: &agent.OSInfo{
						OsName:    runtime.GOOS,
						Hostname:  hostName,
						ProcessNo: int32(pid),
						Ipv4S:     ip4s(),
					},
				}
				for {
					instanceResp, instanceErr = instanceClient.RegisterInstance(instanceCtx, instanceReq)
					if instanceErr != nil {
H
heyanlong 已提交
176
						fmt.Println("register error", instanceErr)
H
heyanlong 已提交
177 178 179 180 181 182 183 184
						break
					}
					if instanceResp.GetApplicationInstanceId() != 0 {
						appInsId = instanceResp.GetApplicationInstanceId()
						break
					}
					time.Sleep(time.Second)
				}
H
heyanlong 已提交
185
			} else if info.Version == 6 {
H
heyanlong 已提交
186
				instanceClient := register2.NewRegisterClient(grpcConn)
H
heyanlong 已提交
187 188 189 190
				instanceCtx, instanceCancel := context.WithTimeout(context.Background(), time.Second*3)
				defer instanceCancel()

				var instanceErr error
H
heyanlong 已提交
191
				var instanceResp *register2.ServiceInstanceRegisterMapping
H
heyanlong 已提交
192 193
				hostName, _ := os.Hostname()

H
heyanlong 已提交
194
				var instances []*register2.ServiceInstance
H
heyanlong 已提交
195 196
				var properties []*common.KeyStringValuePair

H
heyanlong 已提交
197
				instances = append(instances, &register2.ServiceInstance{
H
heyanlong 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
					ServiceId:    appId,
					InstanceUUID: agentUUID,
					Time:         time.Now().UnixNano() / 1000000,
					Properties:   properties,
				})

				properties = append(properties, &common.KeyStringValuePair{
					Key:   "os_name",
					Value: runtime.GOOS,
				})

				properties = append(properties, &common.KeyStringValuePair{
					Key:   "host_name",
					Value: hostName,
				})

				properties = append(properties, &common.KeyStringValuePair{
					Key:   "process_no",
					Value: string(pid),
				})

				properties = append(properties, &common.KeyStringValuePair{
					Key:   "language",
					Value: "php",
				})

				for _, ip := range ip4s() {
					properties = append(properties, &common.KeyStringValuePair{
						Key:   "ipV4s",
						Value: ip,
					})
				}

H
heyanlong 已提交
231
				instanceReq := &register2.ServiceInstances{
H
heyanlong 已提交
232 233 234 235 236
					Instances: instances,
				}
				for {
					instanceResp, instanceErr = instanceClient.DoServiceInstanceRegister(instanceCtx, instanceReq)
					if instanceErr != nil {
H
heyanlong 已提交
237
						fmt.Println("register error", instanceErr)
H
heyanlong 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250 251
						break
					}
					if instanceResp.GetServiceInstances() != nil {
						for _, v := range instanceResp.GetServiceInstances() {
							if v.GetKey() == agentUUID {
								appInsId = v.GetValue()
								break
							}
						}
					}
					if appInsId != 0 {
						break
					}
					time.Sleep(time.Second)
H
agent  
heyanlong 已提交
252 253 254
				}
			}

H
heyanlong 已提交
255
			if appInsId != 0 {
H
agent  
heyanlong 已提交
256
				registerMap.Store(pid, PHPSkyBind{
H
heyanlong 已提交
257
					Version:    info.Version,
H
heyanlong 已提交
258
					AppId:      appId,
H
heyanlong 已提交
259
					InstanceId: appInsId,
H
agent  
heyanlong 已提交
260 261
					Uuid:       agentUUID,
				})
H
heyanlong 已提交
262
				fmt.Println("register => Start register end...")
H
agent  
heyanlong 已提交
263
			}
H
agent  
heyanlong 已提交
264
		} else {
H
heyanlong 已提交
265 266
			fmt.Println("register => ", err)
			fmt.Println("register => Start register error...")
H
agent  
heyanlong 已提交
267 268 269 270
		}
	}
}

H
agent  
heyanlong 已提交
271 272 273 274
func heartbeat() {
	defer func() {
		err := recover()
		if err != nil {
H
heyanlong 已提交
275
			fmt.Println("System error[heartbeat]:", err)
H
agent  
heyanlong 已提交
276 277 278 279 280 281
			go heartbeat()
		}
	}()

	for {
		registerMap.Range(func(key, value interface{}) bool {
H
heyanlong 已提交
282
			fmt.Println("heartbeat => ...")
H
agent  
heyanlong 已提交
283 284
			bind := value.(PHPSkyBind)

H
heyanlong 已提交
285 286
			if bind.Version == 5 {

H
heyanlong 已提交
287 288 289 290 291 292 293 294 295 296 297 298 299
				c := agent.NewInstanceDiscoveryServiceClient(grpcConn)
				ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
				defer cancel()

				_, err := c.Heartbeat(ctx, &agent.ApplicationInstanceHeartbeat{
					ApplicationInstanceId: bind.InstanceId,
					HeartbeatTime:         time.Now().UnixNano() / 1000000,
				})
				if err != nil {
					fmt.Println("heartbeat =>", err)
				} else {
					fmt.Printf("heartbeat => %d %d\n", bind.AppId, bind.InstanceId)
				}
H
heyanlong 已提交
300
			} else if bind.Version == 6 {
H
heyanlong 已提交
301
				c := register2.NewServiceInstancePingClient(grpcConn)
H
heyanlong 已提交
302 303 304
				ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
				defer cancel()

H
heyanlong 已提交
305
				_, err := c.DoPing(ctx, &register2.ServiceInstancePingPkg{
H
heyanlong 已提交
306 307 308 309 310 311 312 313 314 315
					ServiceInstanceId:   bind.InstanceId,
					Time:                time.Now().UnixNano() / 1000000,
					ServiceInstanceUUID: bind.Uuid,
				})
				if err != nil {
					fmt.Println("heartbeat =>", err)
				} else {
					fmt.Printf("heartbeat => %d %d\n", bind.AppId, bind.InstanceId)
				}
			}
H
agent  
heyanlong 已提交
316 317 318 319 320 321
			return true
		})
		time.Sleep(time.Second * 40)
	}
}

H
agent  
heyanlong 已提交
322
func main() {
H
heyanlong 已提交
323 324
	a := service.NewAgent()
	a.Run()
H
agent  
heyanlong 已提交
325
}