eureka.go 4.7 KB
Newer Older
E
eoLinker API Management 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
package eureka

import (
	"context"
	"encoding/xml"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"sync/atomic"
	"time"
Y
Your Name 已提交
14 15 16

	log "github.com/eolinker/goku-api-gateway/goku-log"
	"github.com/eolinker/goku-api-gateway/goku-service/common"
E
eoLinker API Management 已提交
17 18
)

Y
Your Name 已提交
19
//Eureka eureka
E
eoLinker API Management 已提交
20
type Eureka struct {
Y
Your Name 已提交
21
	services []*common.Service
E
eoLinker API Management 已提交
22
	//AppNames  map[string]string
Y
Your Name 已提交
23
	eurekaURL       []string
Y
Your Name 已提交
24 25 26 27
	weightKey       string
	callback        func(services []*common.Service)
	ct              uint64
	cancelFunc      context.CancelFunc
E
eoLinker API Management 已提交
28 29 30
	instanceFactory *common.InstanceFactory
}

Y
Your Name 已提交
31
//SetConfig setConfig
E
eoLinker API Management 已提交
32
func (d *Eureka) SetConfig(config string) error {
Y
Your Name 已提交
33
	tags := strings.Split(config, ";")
E
eoLinker API Management 已提交
34
	weightKey := ""
Y
Your Name 已提交
35 36
	if len(tags) > 1 {
		weightKey = tags[1]
E
eoLinker API Management 已提交
37 38
	}

Y
Your Name 已提交
39
	urls := strings.Split(tags[0], ",")
E
eoLinker API Management 已提交
40

Y
Your Name 已提交
41
	d.setConfig(urls, weightKey)
E
eoLinker API Management 已提交
42 43 44

	return nil
}
Y
Your Name 已提交
45 46
func (d *Eureka) setConfig(eurekaURL []string, weightKey string) {
	d.eurekaURL = eurekaURL
E
eoLinker API Management 已提交
47 48
	d.weightKey = weightKey
}
Y
Your Name 已提交
49 50

//Driver driver
E
eoLinker API Management 已提交
51 52 53 54
func (d *Eureka) Driver() string {
	return DriverName
}

Y
Your Name 已提交
55
//SetCallback setCallBack
E
eoLinker API Management 已提交
56 57 58 59
func (d *Eureka) SetCallback(callback func(services []*common.Service)) {
	d.callback = callback
}

Y
Your Name 已提交
60
//GetServers getServers
E
eoLinker API Management 已提交
61
func (d *Eureka) GetServers() ([]*common.Service, error) {
Y
Your Name 已提交
62
	return d.services, nil
E
eoLinker API Management 已提交
63 64
}

Y
Your Name 已提交
65
//Close close
E
eoLinker API Management 已提交
66
func (d *Eureka) Close() error {
Y
Your Name 已提交
67
	if d.cancelFunc != nil {
E
eoLinker API Management 已提交
68 69 70 71 72 73
		d.cancelFunc()
		d.cancelFunc = nil
	}
	return nil
}

Y
Your Name 已提交
74
//Open open
E
eoLinker API Management 已提交
75
func (d *Eureka) Open() error {
Y
Your Name 已提交
76
	d.ScheduleAtFixedRate(time.Second * 5)
E
eoLinker API Management 已提交
77 78 79
	return nil
}

Y
Your Name 已提交
80
//NewEurekaDiscovery 创建Eureka
E
eoLinker API Management 已提交
81
func NewEurekaDiscovery(config string) *Eureka {
Y
Your Name 已提交
82
	e := &Eureka{
E
eoLinker API Management 已提交
83 84 85 86 87 88 89 90 91 92 93
		services:        nil,
		callback:        nil,
		ct:              0,
		cancelFunc:      nil,
		instanceFactory: common.NewInstanceFactory(),
	}
	e.SetConfig(config)
	return e
}

func (d *Eureka) execCallbacks(apps *Applications) {
Y
Your Name 已提交
94
	if d.callback == nil {
E
eoLinker API Management 已提交
95 96
		return
	}
Y
Your Name 已提交
97
	if apps == nil {
E
eoLinker API Management 已提交
98 99 100
		d.callback(nil)
		return
	}
Y
Your Name 已提交
101
	if len(apps.Applications) == 0 {
E
eoLinker API Management 已提交
102 103 104 105
		d.callback(nil)
		return
	}

Y
Your Name 已提交
106 107 108 109 110
	services := make([]*common.Service, 0, len(apps.Applications))
	for _, app := range apps.Applications {
		inses := make([]*common.Instance, 0, len(app.Instances))
		for _, ins := range app.Instances {
			if ins.Status != EurekaStatusUp {
E
eoLinker API Management 已提交
111 112
				continue
			}
Y
Your Name 已提交
113 114 115
			weight := 0
			if w, has := ins.Metadata.Map[d.weightKey]; has {
				weight, _ = strconv.Atoi(w)
E
eoLinker API Management 已提交
116 117

			}
Y
Your Name 已提交
118
			if weight == 0 {
E
eoLinker API Management 已提交
119 120
				weight = 1
			}
Y
Your Name 已提交
121 122
			port := 0
			if ins.Port.Enabled {
E
eoLinker API Management 已提交
123
				port = ins.Port.Port
Y
Your Name 已提交
124
			} else if ins.SecurePort.Enabled {
E
eoLinker API Management 已提交
125 126
				port = ins.SecurePort.Port
			}
Y
Your Name 已提交
127
			inses = append(inses, d.instanceFactory.General(ins.IPAddr, port, weight))
E
eoLinker API Management 已提交
128
		}
Y
Your Name 已提交
129 130
		server := common.NewService(app.Name, inses)
		services = append(services, server)
E
eoLinker API Management 已提交
131 132 133 134 135 136
	}

	d.callback(services)

}

Y
Your Name 已提交
137
//ScheduleAtFixedRate scheduleAtFixedRate
E
eoLinker API Management 已提交
138 139
func (d *Eureka) ScheduleAtFixedRate(second time.Duration) {
	d.run()
Y
Your Name 已提交
140
	if d.cancelFunc != nil {
E
eoLinker API Management 已提交
141 142 143 144
		d.cancelFunc()
		d.cancelFunc = nil
	}

Y
Your Name 已提交
145
	ctx, cancel := context.WithCancel(context.Background())
E
eoLinker API Management 已提交
146
	d.cancelFunc = cancel
Y
Your Name 已提交
147
	go d.runTask(ctx, second)
E
eoLinker API Management 已提交
148 149
}

Y
Your Name 已提交
150
func (d *Eureka) runTask(ctx context.Context, second time.Duration) {
E
eoLinker API Management 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
	timer := time.NewTicker(second)
	for {
		select {
		case <-timer.C:
			d.run()
		case <-ctx.Done():
			return

		}
	}
}

// run performs a single poll: it fetches the applications from Eureka and
// forwards them to execCallbacks.
//
// Any error from GetApplications is logged and the poll is abandoned, so a
// result that was only partially decoded is never published to the callback.
func (d *Eureka) run() {
	apps, err := d.GetApplications()
	if err != nil {
		log.Error(err)
		return
	}
	d.execCallbacks(apps)
}

Y
Your Name 已提交
173
//GetApplications 获取应用
E
eoLinker API Management 已提交
174
func (d *Eureka) GetApplications() (*Applications, error) {
Y
Your Name 已提交
175
	url, err := d.getEurekaServerURL()
Y
Your Name 已提交
176 177
	if err != nil {
		return nil, err
E
eoLinker API Management 已提交
178
	}
Y
Your Name 已提交
179
	url = fmt.Sprintf("%s/apps", url)
E
eoLinker API Management 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192

	res, err := http.Get(url)
	if err != nil {
		return nil, err
	}

	respBody, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}
	if res.StatusCode != http.StatusOK {
		return nil, err
	}
Y
Your Name 已提交
193
	var applications = new(Applications)
E
eoLinker API Management 已提交
194 195 196 197 198 199 200
	err = xml.Unmarshal(respBody, applications)

	//	log.Info(string(respBody))
	//	log.Info(err, applications)
	return applications, err
}

Y
Your Name 已提交
201
func (d *Eureka) getEurekaServerURL() (string, error) {
E
eoLinker API Management 已提交
202
	ct := atomic.AddUint64(&d.ct, 1)
Y
Your Name 已提交
203
	size := len(d.eurekaURL)
E
eoLinker API Management 已提交
204
	if size == 0 {
Y
Your Name 已提交
205
		e := NilPointError("eureka url is empty")
E
eoLinker API Management 已提交
206

Y
Your Name 已提交
207
		return "", e
E
eoLinker API Management 已提交
208 209
	}
	index := int(ct) % size
Y
Your Name 已提交
210
	url := d.eurekaURL[index]
E
eoLinker API Management 已提交
211 212 213
	//if strings.LastIndex(url,"/")>-1{
	url = strings.TrimSuffix(url, "/")
	//}
Y
Your Name 已提交
214
	return url, nil
E
eoLinker API Management 已提交
215 216
}

Y
Your Name 已提交
217
//Health health
E
eoLinker API Management 已提交
218 219 220
func (d *Eureka) Health() (bool, string) {
	ok, desc := true, "ok"
	i := 0
Y
Your Name 已提交
221
	for _, u := range d.eurekaURL {
E
eoLinker API Management 已提交
222 223 224 225 226 227 228

		url, err := url.Parse(u)
		if err != nil {
			i++
			ok, desc = false, err.Error()
			continue
		}
Y
Your Name 已提交
229 230
		healthURL := url.Scheme + "://" + url.Host + "/health"
		res, err := http.Get(healthURL)
E
eoLinker API Management 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
		if err != nil {
			i++
			ok, desc = false, err.Error()
			continue
		}
		if res == nil || res.StatusCode != http.StatusOK {
			i++
			ok, desc = true, res.Status
			continue
		}

	}

	return ok, desc

}