consul_catalog.go 4.0 KB
Newer Older
E
eoLinker API Management 已提交
1 2 3 4
package consul

import (
	"context"
Y
Your Name 已提交
5 6
	"time"

黄孟柱 已提交
7 8
	log "github.com/eolinker/goku-api-gateway/goku-log"
	"github.com/eolinker/goku-api-gateway/goku-service/common"
E
eoLinker API Management 已提交
9 10 11
	"github.com/hashicorp/consul/api"
)

Y
Your Name 已提交
12 13
//Discovery discovery
type Discovery struct {
E
eoLinker API Management 已提交
14 15 16 17 18 19 20 21 22 23 24 25
	//Config *api.Config

	orgConfig string

	callback func([]*common.Service)
	client   *api.Client
	services []*common.Service

	instanceFactory *common.InstanceFactory
	cancel          context.CancelFunc
}

Y
Your Name 已提交
26 27
//SetConfig setConfig
func (d *Discovery) SetConfig(config string) error {
E
eoLinker API Management 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
	if d.orgConfig == config {
		return nil
	}
	d.orgConfig = config
	c := api.DefaultConfig()
	c.Address = config
	//d.Config = c
	client, err := api.NewClient(c)
	if err != nil {
		return err
	}
	d.client = client

	return nil

}

Y
Your Name 已提交
45 46
//Driver driver
func (d *Discovery) Driver() string {
E
eoLinker API Management 已提交
47 48 49
	return DriverName
}

Y
Your Name 已提交
50 51
//SetCallback setCallback
func (d *Discovery) SetCallback(callback func(services []*common.Service)) {
E
eoLinker API Management 已提交
52 53 54
	d.callback = callback
}

Y
Your Name 已提交
55 56
//GetServers getServers
func (d *Discovery) GetServers() ([]*common.Service, error) {
E
eoLinker API Management 已提交
57 58 59
	return d.services, nil
}

Y
Your Name 已提交
60 61
//Close close
func (d *Discovery) Close() error {
E
eoLinker API Management 已提交
62 63 64 65 66 67
	if d.cancel != nil {
		d.cancel()
	}
	return nil
}

Y
Your Name 已提交
68 69
//Open open
func (d *Discovery) Open() error {
E
eoLinker API Management 已提交
70 71 72 73 74

	d.ScheduleAtFixedRate(time.Second * 5)
	return nil
}

Y
Your Name 已提交
75 76
//NewConsulDiscovery address: [hostName:port]
func NewConsulDiscovery(address string) *Discovery {
E
eoLinker API Management 已提交
77 78 79 80 81 82 83 84 85
	config := api.DefaultConfig()
	config.Address = address

	client, err := api.NewClient(config)
	if err != nil {
		log.Error(err)
		return nil
	}

Y
Your Name 已提交
86
	cd := &Discovery{
E
eoLinker API Management 已提交
87 88 89 90 91 92 93 94 95 96
		callback:        nil,
		client:          client,
		services:        nil,
		orgConfig:       address,
		instanceFactory: common.NewInstanceFactory(),
	}

	return cd
}

Y
Your Name 已提交
97 98
//GetServicesInTime getServicesInTime
func (d *Discovery) GetServicesInTime() (map[string][]string, map[string][]*api.ServiceEntry, error) {
E
eoLinker API Management 已提交
99 100 101 102 103 104 105 106 107 108

	q := &api.QueryOptions{}
	services, _, err := d.client.Catalog().Services(q)
	if err != nil {
		return nil, nil, err
	}

	catalogServices := make(map[string][]*api.ServiceEntry)

	for serviceName := range services {
Y
Your Name 已提交
109
		cs, _, err := d.client.Health().Service(serviceName, "", true, q)
E
eoLinker API Management 已提交
110 111 112 113 114 115 116 117 118 119 120 121
		if err != nil {
			log.Info(err.Error())
			continue
		}
		catalogServices[serviceName] = cs

	}

	return services, catalogServices, nil

}

Y
Your Name 已提交
122 123
//ScheduleAtFixedRate scheduleAtFixedRate
func (d *Discovery) ScheduleAtFixedRate(second time.Duration) {
E
eoLinker API Management 已提交
124 125 126 127 128 129 130
	if d.cancel != nil {
		d.cancel()
		d.cancel = nil
	}
	d.run()
	ctx, cancel := context.WithCancel(context.Background())
	d.cancel = cancel
Y
Your Name 已提交
131
	go d.runTask(ctx, second)
E
eoLinker API Management 已提交
132 133
}

Y
Your Name 已提交
134
func (d *Discovery) runTask(ctx context.Context, second time.Duration) {
E
eoLinker API Management 已提交
135 136 137 138 139 140 141 142 143 144 145 146 147
	timer := time.NewTicker(second)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			{
				return
			}
		case <-timer.C:
			go d.run()
		}
	}
}
Y
Your Name 已提交
148
func (d *Discovery) run() {
E
eoLinker API Management 已提交
149 150 151 152 153 154 155 156
	services, catalogServices, err := d.GetServicesInTime()
	if err == nil || services != nil || catalogServices != nil {
		d.execCallbacks(services, catalogServices)
	} else {
		log.Info(err.Error())
	}
}

Y
Your Name 已提交
157
func (d *Discovery) execCallbacks(services map[string][]string, catalogServices map[string][]*api.ServiceEntry) {
E
eoLinker API Management 已提交
158 159 160 161 162 163 164 165 166 167 168 169
	if services == nil {
		log.Info("consul services is empty")
		return
	}
	if d.callback == nil {
		return
	}

	serviceList := make([]*common.Service, 0, len(catalogServices))
	for appName, catalogInstances := range catalogServices {

		size := len(catalogInstances)
Y
Your Name 已提交
170
		if size == 0 {
E
eoLinker API Management 已提交
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
			continue
		}
		hosts := make([]*common.Instance, size)
		for i, instance := range catalogInstances {
			//h.hostChangedCallback(appName, newHostInstanceByEureka(appName, &instance))
			hosts[i] = d.instanceFactory.General(instance.Node.Address, instance.Service.Port, 1)

		}

		s := common.NewService(appName, hosts)
		serviceList = append(serviceList, s)
	}
	d.services = serviceList
	d.callback(serviceList)

}

Y
Your Name 已提交
188 189
//Health health
func (d *Discovery) Health() (bool, string) {
E
eoLinker API Management 已提交
190 191 192 193 194 195 196 197 198 199
	leader, err := d.client.Status().Leader()
	if err != nil || leader == "" {
		return false, err.Error()
	}

	ok, desc := true, "ok"

	return ok, desc

}