virtualservice_controller.go 20.3 KB
Newer Older
Z
zryfish 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
Copyright 2020 KubeSphere Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

J
Jeff 已提交
17 18 19
package virtualservice

import (
H
hongming 已提交
20
	"context"
J
Jeff 已提交
21
	"fmt"
22 23
	apinetworkingv1alpha3 "istio.io/api/networking/v1alpha3"
	clientgonetworkingv1alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
Z
zackzhang 已提交
24 25 26
	istioclient "istio.io/client-go/pkg/clientset/versioned"
	istioinformers "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
	istiolisters "istio.io/client-go/pkg/listers/networking/v1alpha3"
M
magicsong 已提交
27
	v1 "k8s.io/api/core/v1"
J
Jeff 已提交
28 29
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
J
Jeff 已提交
30
	"k8s.io/apimachinery/pkg/labels"
J
Jeff 已提交
31 32
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
J
Jeff 已提交
33
	"k8s.io/apimachinery/pkg/util/sets"
J
Jeff 已提交
34 35 36
	"k8s.io/apimachinery/pkg/util/wait"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
Z
zackzhang 已提交
37 38
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
J
Jeff 已提交
39 40 41 42
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
Z
zackzhang 已提交
43
	log "k8s.io/klog"
J
Jeff 已提交
44
	servicemeshv1alpha2 "kubesphere.io/kubesphere/pkg/apis/servicemesh/v1alpha2"
J
fix bug  
Jeff 已提交
45
	servicemeshclient "kubesphere.io/kubesphere/pkg/client/clientset/versioned"
J
Jeff 已提交
46 47
	servicemeshinformers "kubesphere.io/kubesphere/pkg/client/informers/externalversions/servicemesh/v1alpha2"
	servicemeshlisters "kubesphere.io/kubesphere/pkg/client/listers/servicemesh/v1alpha2"
Z
zackzhangkai 已提交
48 49
	"kubesphere.io/kubesphere/pkg/controller/utils/servicemesh"
	"reflect"
J
Jeff 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66

	"time"
)

// maxRetries caps how many times a service key is put back on the queue
// before it is dropped. With the rate limiter currently in use
// (5ms*2^(maxRetries-1)) the delays between successive re-queues are:
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
const maxRetries = 15

type VirtualServiceController struct {
	client clientset.Interface

	virtualServiceClient istioclient.Interface
J
fix bug  
Jeff 已提交
67
	servicemeshClient    servicemeshclient.Interface
J
Jeff 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93

	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	serviceLister corelisters.ServiceLister
	serviceSynced cache.InformerSynced

	virtualServiceLister istiolisters.VirtualServiceLister
	virtualServiceSynced cache.InformerSynced

	destinationRuleLister istiolisters.DestinationRuleLister
	destinationRuleSynced cache.InformerSynced

	strategyLister servicemeshlisters.StrategyLister
	strategySynced cache.InformerSynced

	queue workqueue.RateLimitingInterface

	workerLoopPeriod time.Duration
}

func NewVirtualServiceController(serviceInformer coreinformers.ServiceInformer,
	virtualServiceInformer istioinformers.VirtualServiceInformer,
	destinationRuleInformer istioinformers.DestinationRuleInformer,
	strategyInformer servicemeshinformers.StrategyInformer,
	client clientset.Interface,
J
fix bug  
Jeff 已提交
94 95
	virtualServiceClient istioclient.Interface,
	servicemeshClient servicemeshclient.Interface) *VirtualServiceController {
J
Jeff 已提交
96 97

	broadcaster := record.NewBroadcaster()
J
Jeff 已提交
98 99 100
	broadcaster.StartLogging(func(format string, args ...interface{}) {
		log.Info(fmt.Sprintf(format, args))
	})
J
Jeff 已提交
101 102 103 104 105 106
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "virtualservice-controller"})

	v := &VirtualServiceController{
		client:               client,
		virtualServiceClient: virtualServiceClient,
J
fix bug  
Jeff 已提交
107
		servicemeshClient:    servicemeshClient,
J
Jeff 已提交
108 109 110 111 112 113 114 115
		queue:                workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "virtualservice"),
		workerLoopPeriod:     time.Second,
	}

	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    v.enqueueService,
		DeleteFunc: v.enqueueService,
		UpdateFunc: func(old, cur interface{}) {
J
Jeff 已提交
116
			// TODO(jeff): need a more robust mechanism, because user may change labels
J
Jeff 已提交
117 118 119 120 121 122 123 124 125 126 127
			v.enqueueService(cur)
		},
	})

	v.serviceLister = serviceInformer.Lister()
	v.serviceSynced = serviceInformer.Informer().HasSynced

	v.strategyLister = strategyInformer.Lister()
	v.strategySynced = strategyInformer.Informer().HasSynced

	strategyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
J
Jeff 已提交
128 129 130 131 132
		DeleteFunc: v.addStrategy,
		AddFunc:    v.addStrategy,
		UpdateFunc: func(old, cur interface{}) {
			v.addStrategy(cur)
		},
J
Jeff 已提交
133 134 135 136 137 138 139
	})

	v.destinationRuleLister = destinationRuleInformer.Lister()
	v.destinationRuleSynced = destinationRuleInformer.Informer().HasSynced

	destinationRuleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: v.addDestinationRule,
J
Jeff 已提交
140 141 142
		UpdateFunc: func(old, cur interface{}) {
			v.addDestinationRule(cur)
		},
J
Jeff 已提交
143 144 145 146 147 148 149 150 151 152 153 154 155
	})

	v.virtualServiceLister = virtualServiceInformer.Lister()
	v.virtualServiceSynced = virtualServiceInformer.Informer().HasSynced

	v.eventBroadcaster = broadcaster
	v.eventRecorder = recorder

	return v

}

func (v *VirtualServiceController) Start(stopCh <-chan struct{}) error {
M
magicsong 已提交
156
	return v.Run(5, stopCh)
J
Jeff 已提交
157 158
}

M
magicsong 已提交
159
func (v *VirtualServiceController) Run(workers int, stopCh <-chan struct{}) error {
J
Jeff 已提交
160 161 162
	defer utilruntime.HandleCrash()
	defer v.queue.ShutDown()

163
	log.V(0).Info("starting virtualservice controller")
J
Jeff 已提交
164 165
	defer log.Info("shutting down virtualservice controller")

M
magicsong 已提交
166
	if !cache.WaitForCacheSync(stopCh, v.serviceSynced, v.virtualServiceSynced, v.destinationRuleSynced, v.strategySynced) {
M
magicsong 已提交
167
		return fmt.Errorf("failed to wait for caches to sync")
J
Jeff 已提交
168 169 170 171 172 173 174
	}

	for i := 0; i < workers; i++ {
		go wait.Until(v.worker, v.workerLoopPeriod, stopCh)
	}

	<-stopCh
M
magicsong 已提交
175
	return nil
J
Jeff 已提交
176 177 178
}

func (v *VirtualServiceController) enqueueService(obj interface{}) {
M
magicsong 已提交
179
	key, err := cache.MetaNamespaceKeyFunc(obj)
J
Jeff 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}

	v.queue.Add(key)
}

// worker processes queue items one after another and returns once
// processNextWorkItem reports that there is nothing more to process.
func (v *VirtualServiceController) worker() {
	for {
		if !v.processNextWorkItem() {
			return
		}
	}
}

// processNextWorkItem takes one key off the queue, syncs the corresponding
// service and passes the outcome to handleErr. It returns false only when the
// queue signals that it is shutting down.
func (v *VirtualServiceController) processNextWorkItem() bool {
	item, shuttingDown := v.queue.Get()
	if shuttingDown {
		return false
	}
	// Always mark the item as done, whatever the sync outcome.
	defer v.queue.Done(item)

	v.handleErr(v.syncService(item.(string)), item)
	return true
}

J
Jeff 已提交
208 209 210 211 212 213 214 215 216 217
// created virtualservice's name are same as the service name, same
// as the destinationrule name
// labels:
//      servicemesh.kubernetes.io/enabled: ""
//      app.kubernetes.io/name: bookinfo
//      app: reviews
// are used to bind them together.
// syncService are the main part of reconcile function body, it takes
// service, destinationrule, strategy as input to create a virtualservice
// for service.
J
Jeff 已提交
218 219 220 221
func (v *VirtualServiceController) syncService(key string) error {
	startTime := time.Now()
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
J
Jeff 已提交
222
		log.Error(err, "not a valid controller key", "key", key)
J
Jeff 已提交
223 224 225
		return err
	}

J
Jeff 已提交
226 227 228 229
	// default component name to service name
	appName := name

	defer func() {
Z
zryfish 已提交
230
		log.V(4).Infof("Finished syncing service virtualservice %s/%s in %s.", namespace, name, time.Since(startTime))
J
Jeff 已提交
231 232
	}()

J
Jeff 已提交
233 234
	service, err := v.serviceLister.Services(namespace).Get(name)
	if err != nil {
J
Jeff 已提交
235 236
		if errors.IsNotFound(err) {
			// Delete the corresponding virtualservice, as the service has been deleted.
H
hongming 已提交
237
			err = v.virtualServiceClient.NetworkingV1alpha3().VirtualServices(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
J
Jeff 已提交
238 239 240 241
			if err != nil && !errors.IsNotFound(err) {
				log.Error(err, "delete orphan virtualservice failed", "namespace", namespace, "name", service.Name)
				return err
			}
242 243

			// delete the orphan strategy if there is any
H
hongming 已提交
244
			err = v.servicemeshClient.ServicemeshV1alpha2().Strategies(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
245 246 247 248 249
			if err != nil && !errors.IsNotFound(err) {
				log.Error(err, "delete orphan strategy failed", "namespace", namespace, "name", service.Name)
				return err
			}

J
Jeff 已提交
250
			return nil
J
Jeff 已提交
251
		}
J
Jeff 已提交
252 253
		log.Error(err, "get service failed", "namespace", namespace, "name", name)
		return err
J
Jeff 已提交
254 255
	}

Z
zackzhang 已提交
256 257 258
	if len(service.Labels) < len(servicemesh.ApplicationLabels) ||
		!servicemesh.IsApplicationComponent(service.Labels) ||
		!servicemesh.IsServicemeshEnabled(service.Annotations) ||
J
Jeff 已提交
259 260 261 262 263 264
		len(service.Spec.Ports) == 0 {
		// services don't have enough labels to create a virtualservice
		// or they don't have necessary labels
		// or they don't have any ports defined
		return nil
	}
J
Jeff 已提交
265
	// get real component name, i.e label app value
Z
zackzhang 已提交
266
	appName = servicemesh.GetComponentName(&service.ObjectMeta)
J
Jeff 已提交
267 268 269 270 271 272

	destinationRule, err := v.destinationRuleLister.DestinationRules(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			// there is no destinationrule for this service
			// maybe corresponding workloads are not created yet
J
Jeff 已提交
273 274
			log.Info("destination rules for service not found, retrying.", "namespace", namespace, "name", name)
			return fmt.Errorf("destination rule for service %s/%s not found", namespace, name)
J
Jeff 已提交
275 276 277 278 279 280 281 282
		}
		log.Error(err, "Couldn't get destinationrule for service.", "service", types.NamespacedName{Name: service.Name, Namespace: service.Namespace}.String())
		return err
	}

	subsets := destinationRule.Spec.Subsets
	if len(subsets) == 0 {
		// destination rule with no subsets, not possibly
J
Jeff 已提交
283
		return nil
J
Jeff 已提交
284 285 286
	}

	// fetch all strategies applied to service
Z
zackzhang 已提交
287
	strategies, err := v.strategyLister.Strategies(namespace).List(labels.SelectorFromSet(map[string]string{servicemesh.AppLabel: appName}))
J
Jeff 已提交
288 289 290 291 292 293 294 295 296 297 298 299 300 301
	if err != nil {
		log.Error(err, "list strategies for service failed", "namespace", namespace, "name", appName)
		return err
	} else if len(strategies) > 1 {
		// more than one strategies are not allowed, it will cause collision
		err = fmt.Errorf("more than one strategies applied to service %s/%s is forbbiden", namespace, appName)
		log.Error(err, "")
		return err
	}

	// get current virtual service
	currentVirtualService, err := v.virtualServiceLister.VirtualServices(namespace).Get(appName)
	if err != nil {
		if errors.IsNotFound(err) {
302
			currentVirtualService = &clientgonetworkingv1alpha3.VirtualService{
J
Jeff 已提交
303 304 305
				ObjectMeta: metav1.ObjectMeta{
					Name:      appName,
					Namespace: namespace,
Z
zackzhang 已提交
306
					Labels:    servicemesh.ExtractApplicationLabels(&service.ObjectMeta),
J
Jeff 已提交
307 308
				},
			}
J
Jeff 已提交
309 310 311
		} else {
			log.Error(err, "cannot get virtualservice ", "namespace", namespace, "name", appName)
			return err
J
Jeff 已提交
312
		}
J
Jeff 已提交
313 314 315
	}
	vs := currentVirtualService.DeepCopy()

J
fix bug  
Jeff 已提交
316 317 318 319 320
	// create a whole new virtualservice

	// TODO(jeff): use FQDN to replace service name
	vs.Spec.Hosts = []string{name}

Z
zackzhangkai 已提交
321 322 323
	vs.Spec.Http = []*apinetworkingv1alpha3.HTTPRoute{}
	vs.Spec.Tcp = []*apinetworkingv1alpha3.TCPRoute{}

J
fix bug  
Jeff 已提交
324 325
	// check if service has TCP protocol ports
	for _, port := range service.Spec.Ports {
326
		var route apinetworkingv1alpha3.HTTPRouteDestination
Z
zackzhangkai 已提交
327
		var match apinetworkingv1alpha3.HTTPMatchRequest
J
fix bug  
Jeff 已提交
328
		if port.Protocol == v1.ProtocolTCP {
329 330
			route = apinetworkingv1alpha3.HTTPRouteDestination{
				Destination: &apinetworkingv1alpha3.Destination{
J
fix bug  
Jeff 已提交
331 332
					Host:   name,
					Subset: subsets[0].Name,
333
					Port: &apinetworkingv1alpha3.PortSelector{
J
fix bug  
Jeff 已提交
334 335 336 337 338 339
						Number: uint32(port.Port),
					},
				},
				Weight: 100,
			}

Z
zackzhangkai 已提交
340 341
			match = apinetworkingv1alpha3.HTTPMatchRequest{Port: uint32(port.Port)}

J
fix bug  
Jeff 已提交
342 343
			// a http port, add to HTTPRoute

Z
zackzhangkai 已提交
344 345 346 347 348 349 350 351 352 353 354 355 356 357
			if servicemesh.SupportHttpProtocol(port.Name) {
				httpRoute := apinetworkingv1alpha3.HTTPRoute{
					Route: []*apinetworkingv1alpha3.HTTPRouteDestination{&route},
					Match: []*apinetworkingv1alpha3.HTTPMatchRequest{&match},
				}
				vs.Spec.Http = append(vs.Spec.Http, &httpRoute)
			} else {
				// everything else treated as TCPRoute
				tcpRoute := apinetworkingv1alpha3.TCPRoute{
					Route: []*apinetworkingv1alpha3.RouteDestination{
						{
							Destination: route.Destination,
							Weight:      route.Weight,
						},
358
					},
Z
zackzhangkai 已提交
359 360 361
					Match: []*apinetworkingv1alpha3.L4MatchAttributes{{Port: match.Port}},
				}
				vs.Spec.Tcp = append(vs.Spec.Tcp, &tcpRoute)
362
			}
J
fix bug  
Jeff 已提交
363 364 365
		}
	}

J
Jeff 已提交
366 367
	if len(strategies) > 0 {
		// apply strategy spec to virtualservice
J
Jeff 已提交
368

J
fix bug  
Jeff 已提交
369 370 371 372 373 374 375 376 377 378
		switch strategies[0].Spec.StrategyPolicy {
		case servicemeshv1alpha2.PolicyPause:
			break
		case servicemeshv1alpha2.PolicyWaitForWorkloadReady:
			set := v.getSubsets(strategies[0])

			setNames := sets.String{}
			for i := range subsets {
				setNames.Insert(subsets[i].Name)
			}
J
Jeff 已提交
379

J
fix bug  
Jeff 已提交
380 381 382 383 384
			nonExist := false
			for k := range set {
				if !setNames.Has(k) {
					nonExist = true
				}
J
Jeff 已提交
385
			}
J
fix bug  
Jeff 已提交
386 387 388 389 390 391 392 393 394 395
			// strategy has subset that are not ready
			if nonExist {
				break
			} else {
				vs.Spec = v.generateVirtualServiceSpec(strategies[0], service).Spec
			}
		case servicemeshv1alpha2.PolicyImmediately:
			vs.Spec = v.generateVirtualServiceSpec(strategies[0], service).Spec
		default:
			vs.Spec = v.generateVirtualServiceSpec(strategies[0], service).Spec
J
Jeff 已提交
396
		}
J
Jeff 已提交
397
	}
J
Jeff 已提交
398

J
Jeff 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
	createVirtualService := len(currentVirtualService.ResourceVersion) == 0

	if !createVirtualService &&
		reflect.DeepEqual(vs.Spec, currentVirtualService.Spec) &&
		reflect.DeepEqual(service.Labels, currentVirtualService.Labels) {
		log.V(4).Info("virtual service are equal, skipping update ")
		return nil
	}

	newVirtualService := currentVirtualService.DeepCopy()
	newVirtualService.Labels = service.Labels
	newVirtualService.Spec = vs.Spec
	if newVirtualService.Annotations == nil {
		newVirtualService.Annotations = make(map[string]string)
	}

	if len(newVirtualService.Spec.Http) == 0 && len(newVirtualService.Spec.Tcp) == 0 && len(newVirtualService.Spec.Tls) == 0 {
		err = fmt.Errorf("service %s/%s doesn't have a valid port spec", namespace, name)
		log.Error(err, "")
		return err
	}

	if createVirtualService {
H
hongming 已提交
422
		_, err = v.virtualServiceClient.NetworkingV1alpha3().VirtualServices(namespace).Create(context.Background(), newVirtualService, metav1.CreateOptions{})
J
Jeff 已提交
423
	} else {
H
hongming 已提交
424
		_, err = v.virtualServiceClient.NetworkingV1alpha3().VirtualServices(namespace).Update(context.Background(), newVirtualService, metav1.UpdateOptions{})
J
Jeff 已提交
425 426 427 428 429
	}

	if err != nil {
		if createVirtualService {
			v.eventRecorder.Event(newVirtualService, v1.EventTypeWarning, "FailedToCreateVirtualService", fmt.Sprintf("Failed to create virtualservice for service %v/%v: %v", namespace, name, err))
J
Jeff 已提交
430
		} else {
J
Jeff 已提交
431
			v.eventRecorder.Event(newVirtualService, v1.EventTypeWarning, "FailedToUpdateVirtualService", fmt.Sprintf("Failed to update virtualservice for service %v/%v: %v", namespace, name, err))
J
Jeff 已提交
432
		}
J
Jeff 已提交
433 434

		return err
J
Jeff 已提交
435 436 437 438 439 440 441 442
	}

	return nil
}

// When a destinationrule is added, figure out which service it will be used
// and enqueue it. obj must have *v1alpha3.DestinationRule type
func (v *VirtualServiceController) addDestinationRule(obj interface{}) {
443
	dr := obj.(*clientgonetworkingv1alpha3.DestinationRule)
J
Jeff 已提交
444 445 446
	service, err := v.serviceLister.Services(dr.Namespace).Get(dr.Name)
	if err != nil {
		if errors.IsNotFound(err) {
J
Jeff 已提交
447
			log.V(3).Info("service not created yet", "namespace", dr.Namespace, "service", dr.Name)
J
Jeff 已提交
448 449 450 451 452 453
			return
		}
		utilruntime.HandleError(fmt.Errorf("unable to get service with name %s/%s", dr.Namespace, dr.Name))
		return
	}

454
	key, err := cache.MetaNamespaceKeyFunc(service)
J
Jeff 已提交
455
	if err != nil {
456 457
		utilruntime.HandleError(fmt.Errorf("get service %s/%s key failed", service.Namespace, service.Name))
		return
J
Jeff 已提交
458 459
	}

460
	v.queue.Add(key)
J
Jeff 已提交
461 462
}

J
Jeff 已提交
463 464 465 466
// when a strategy created
func (v *VirtualServiceController) addStrategy(obj interface{}) {
	strategy := obj.(*servicemeshv1alpha2.Strategy)

Z
zackzhang 已提交
467
	lbs := servicemesh.ExtractApplicationLabels(&strategy.ObjectMeta)
J
Jeff 已提交
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491
	if len(lbs) == 0 {
		err := fmt.Errorf("invalid strategy %s/%s labels %s, not have required labels", strategy.Namespace, strategy.Name, strategy.Labels)
		log.Error(err, "")
		utilruntime.HandleError(err)
		return
	}

	allServices, err := v.serviceLister.Services(strategy.Namespace).List(labels.SelectorFromSet(lbs))
	if err != nil {
		log.Error(err, "list services failed")
		utilruntime.HandleError(err)
		return
	}

	// avoid insert a key multiple times
	set := sets.String{}

	for i := range allServices {
		service := allServices[i]
		if service.Spec.Selector == nil || len(service.Spec.Ports) == 0 {
			// services with nil selectors match nothing, not everything.
			continue
		}

M
magicsong 已提交
492
		key, err := cache.MetaNamespaceKeyFunc(service)
J
Jeff 已提交
493 494 495 496 497 498 499 500 501 502
		if err != nil {
			utilruntime.HandleError(err)
			return
		}
		set.Insert(key)
	}

	for key := range set {
		v.queue.Add(key)
	}
J
Jeff 已提交
503 504 505
}

func (v *VirtualServiceController) handleErr(err error, key interface{}) {
H
hongming 已提交
506
	if err == nil {
J
Jeff 已提交
507 508 509 510 511 512 513 514 515 516
		v.queue.Forget(key)
		return
	}

	if v.queue.NumRequeues(key) < maxRetries {
		log.V(2).Info("Error syncing virtualservice for service retrying.", "key", key, "error", err)
		v.queue.AddRateLimited(key)
		return
	}

J
Jeff 已提交
517
	log.V(4).Info("Dropping service out of the queue.", "key", key, "error", err)
J
Jeff 已提交
518 519 520
	v.queue.Forget(key)
	utilruntime.HandleError(err)
}
J
Jeff 已提交
521

J
fix bug  
Jeff 已提交
522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549
// getSubsets collects the subset names referenced by a strategy's template:
// the destination of every HTTP, TCP and TLS route, plus the HTTP mirror
// target where one is set.
func (v *VirtualServiceController) getSubsets(strategy *servicemeshv1alpha2.Strategy) sets.String {
	names := sets.NewString()

	for _, route := range strategy.Spec.Template.Spec.Http {
		for _, dest := range route.Route {
			names.Insert(dest.Destination.Subset)
		}

		if mirror := route.Mirror; mirror != nil {
			names.Insert(mirror.Subset)
		}
	}

	for _, route := range strategy.Spec.Template.Spec.Tcp {
		for _, dest := range route.Route {
			names.Insert(dest.Destination.Subset)
		}
	}

	for _, route := range strategy.Spec.Template.Spec.Tls {
		for _, dest := range route.Route {
			names.Insert(dest.Destination.Subset)
		}
	}

	return names
}

550
func (v *VirtualServiceController) generateVirtualServiceSpec(strategy *servicemeshv1alpha2.Strategy, service *v1.Service) *clientgonetworkingv1alpha3.VirtualService {
J
Jeff 已提交
551 552

	// Define VirtualService to be created
Z
zackzhangkai 已提交
553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586
	vs := &clientgonetworkingv1alpha3.VirtualService{}
	vs.Spec.Hosts = strategy.Spec.Template.Spec.Hosts

	// For multi-ports, apply the rules to each port matched http/tcp protocol
	for _, port := range service.Spec.Ports {
		s := strategy.DeepCopy()
		strategyTempSpec := s.Spec.Template.Spec
		// fill route.destination.port and match.port filed
		if len(strategyTempSpec.Http) > 0 && servicemesh.SupportHttpProtocol(port.Name) {
			for _, http := range strategyTempSpec.Http {
				if len(http.Match) == 0 {
					http.Match = []*apinetworkingv1alpha3.HTTPMatchRequest{{Port: uint32(port.Port)}}
				} else {
					for _, match := range http.Match {
						match.Port = uint32(port.Port)
					}
				}
				for _, route := range http.Route {
					route.Destination.Port = &apinetworkingv1alpha3.PortSelector{
						Number: uint32(port.Port),
					}
				}
			}
			vs.Spec.Http = append(vs.Spec.Http, strategyTempSpec.Http...)
		}
		if len(strategyTempSpec.Tcp) > 0 && !servicemesh.SupportHttpProtocol(port.Name) {
			for _, tcp := range strategyTempSpec.Tcp {
				tcp.Match = []*apinetworkingv1alpha3.L4MatchAttributes{{Port: uint32(port.Port)}}
				for _, r := range tcp.Route {
					r.Destination.Port = &apinetworkingv1alpha3.PortSelector{Number: uint32(port.Port)}
				}
			}
			vs.Spec.Tcp = append(vs.Spec.Tcp, strategyTempSpec.Tcp...)
		}
J
Jeff 已提交
587 588 589 590
	}

	// one version rules them all
	if len(strategy.Spec.GovernorVersion) > 0 {
591 592
		governorDestinationWeight := apinetworkingv1alpha3.HTTPRouteDestination{
			Destination: &apinetworkingv1alpha3.Destination{
J
Jeff 已提交
593 594 595 596 597 598
				Host:   service.Name,
				Subset: strategy.Spec.GovernorVersion,
			},
			Weight: 100,
		}

Z
zackzhangkai 已提交
599 600 601 602 603 604 605 606
		for _, port := range service.Spec.Ports {
			match := apinetworkingv1alpha3.HTTPMatchRequest{Port: uint32(port.Port)}
			if len(strategy.Spec.Template.Spec.Http) > 0 {
				governorRoute := apinetworkingv1alpha3.HTTPRoute{
					Route: []*apinetworkingv1alpha3.HTTPRouteDestination{&governorDestinationWeight},
					Match: []*apinetworkingv1alpha3.HTTPMatchRequest{&match},
				}
				vs.Spec.Http = []*apinetworkingv1alpha3.HTTPRoute{&governorRoute}
J
Jeff 已提交
607

Z
zackzhangkai 已提交
608 609 610 611 612 613 614 615 616 617
			}
			if len(strategy.Spec.Template.Spec.Tcp) > 0 {
				tcpRoute := apinetworkingv1alpha3.TCPRoute{
					Route: []*apinetworkingv1alpha3.RouteDestination{
						{
							Destination: &apinetworkingv1alpha3.Destination{
								Host:   governorDestinationWeight.Destination.Host,
								Subset: governorDestinationWeight.Destination.Subset,
							},
							Weight: governorDestinationWeight.Weight,
618 619
						},
					},
Z
zackzhangkai 已提交
620 621
					Match: []*apinetworkingv1alpha3.L4MatchAttributes{{Port: match.Port}},
				}
622

Z
zackzhangkai 已提交
623 624 625
				//governorRoute := v1alpha3.TCPRoute{tcpRoute}
				vs.Spec.Tcp = []*apinetworkingv1alpha3.TCPRoute{&tcpRoute}
			}
J
Jeff 已提交
626 627 628
		}
	}

Z
zackzhang 已提交
629
	servicemesh.FillDestinationPort(vs, service)
J
Jeff 已提交
630 631
	return vs
}