Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
水淹萌龙
kubesphere
提交
635dcc9b
K
kubesphere
项目概览
水淹萌龙
/
kubesphere
与 Fork 源项目一致
Fork自
KubeSphere / kubesphere
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kubesphere
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
635dcc9b
编写于
9月 16, 2019
作者:
K
KubeSphere CI Bot
提交者:
GitHub
9月 16, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #739 from wnxn/attach-controller
add expand volume controller
上级
7c049a24
55483e65
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
870 addition
and
0 deletion
+870
-0
cmd/controller-manager/app/controllers.go
cmd/controller-manager/app/controllers.go
+11
-0
pkg/controller/storage/expansion/expansion_controller.go
pkg/controller/storage/expansion/expansion_controller.go
+505
-0
pkg/controller/storage/expansion/expansion_controller_test.go
...controller/storage/expansion/expansion_controller_test.go
+288
-0
pkg/controller/storage/expansion/expansion_util.go
pkg/controller/storage/expansion/expansion_util.go
+66
-0
未找到文件。
cmd/controller-manager/app/controllers.go
浏览文件 @
635dcc9b
...
...
@@ -26,6 +26,7 @@ import (
"kubesphere.io/kubesphere/pkg/controller/job"
"kubesphere.io/kubesphere/pkg/controller/s2ibinary"
"kubesphere.io/kubesphere/pkg/controller/s2irun"
"kubesphere.io/kubesphere/pkg/controller/storage/expansion"
//"kubesphere.io/kubesphere/pkg/controller/job"
"kubesphere.io/kubesphere/pkg/controller/virtualservice"
...
...
@@ -119,6 +120,15 @@ func AddControllers(mgr manager.Manager, cfg *rest.Config, stopCh <-chan struct{
kubesphereInformer
.
Devops
()
.
V1alpha1
()
.
S2iBinaries
(),
s2iInformer
.
Devops
()
.
V1alpha1
()
.
S2iRuns
())
volumeExpansionController
:=
expansion
.
NewVolumeExpansionController
(
kubeClient
,
informerFactory
.
Core
()
.
V1
()
.
PersistentVolumeClaims
(),
informerFactory
.
Storage
()
.
V1
()
.
StorageClasses
(),
informerFactory
.
Core
()
.
V1
()
.
Pods
(),
informerFactory
.
Apps
()
.
V1
()
.
Deployments
(),
informerFactory
.
Apps
()
.
V1
()
.
ReplicaSets
(),
informerFactory
.
Apps
()
.
V1
()
.
StatefulSets
())
kubesphereInformer
.
Start
(
stopCh
)
istioInformer
.
Start
(
stopCh
)
informerFactory
.
Start
(
stopCh
)
...
...
@@ -132,6 +142,7 @@ func AddControllers(mgr manager.Manager, cfg *rest.Config, stopCh <-chan struct{
"job-controller"
:
jobController
,
"s2ibinary-controller"
:
s2iBinaryController
,
"s2irun-controller"
:
s2iRunController
,
"volumeexpansion-controller"
:
volumeExpansionController
,
}
for
name
,
ctrl
:=
range
controllers
{
...
...
pkg/controller/storage/expansion/expansion_controller.go
0 → 100644
浏览文件 @
635dcc9b
/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package
expansion
import
(
"errors"
"fmt"
appsv1
"k8s.io/api/apps/v1"
autoscalingv1
"k8s.io/api/autoscaling/v1"
corev1
"k8s.io/api/core/v1"
storagev1
"k8s.io/api/storage/v1"
apierrors
"k8s.io/apimachinery/pkg/api/errors"
metav1
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsv1informers
"k8s.io/client-go/informers/apps/v1"
corev1informers
"k8s.io/client-go/informers/core/v1"
storagev1informer
"k8s.io/client-go/informers/storage/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1
"k8s.io/client-go/kubernetes/typed/core/v1"
listerappsv1
"k8s.io/client-go/listers/apps/v1"
listercorev1
"k8s.io/client-go/listers/core/v1"
listerstoragev1
"k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"time"
)
const
controllerAgentName
=
"expansion-controller"
var
supportedProvisioner
=
[]
string
{
"disk.csi.qingcloud.com"
,
"csi-qingcloud"
}
var
retryTime
=
wait
.
Backoff
{
Duration
:
1
*
time
.
Second
,
Factor
:
2
,
Steps
:
12
,
}
type
VolumeExpansionController
struct
{
kubeclientset
kubernetes
.
Interface
pvcLister
listercorev1
.
PersistentVolumeClaimLister
pvcSynced
cache
.
InformerSynced
classLister
listerstoragev1
.
StorageClassLister
classSynced
cache
.
InformerSynced
podLister
listercorev1
.
PodLister
podSynced
cache
.
InformerSynced
deployLister
listerappsv1
.
DeploymentLister
deploySynced
cache
.
InformerSynced
rsLister
listerappsv1
.
ReplicaSetLister
rsSynced
cache
.
InformerSynced
stsLister
listerappsv1
.
StatefulSetLister
stsSynced
cache
.
InformerSynced
workqueue
workqueue
.
RateLimitingInterface
recorder
record
.
EventRecorder
}
// NewController returns a new volume expansion controller
func
NewVolumeExpansionController
(
kubeclientset
kubernetes
.
Interface
,
pvcInformer
corev1informers
.
PersistentVolumeClaimInformer
,
classInformer
storagev1informer
.
StorageClassInformer
,
podInformer
corev1informers
.
PodInformer
,
deployInformer
appsv1informers
.
DeploymentInformer
,
rsInformer
appsv1informers
.
ReplicaSetInformer
,
stsInformer
appsv1informers
.
StatefulSetInformer
)
*
VolumeExpansionController
{
klog
.
V
(
4
)
.
Info
(
"Creating event broadcaster"
)
eventBroadcaster
:=
record
.
NewBroadcaster
()
eventBroadcaster
.
StartLogging
(
klog
.
Infof
)
eventBroadcaster
.
StartRecordingToSink
(
&
typedcorev1
.
EventSinkImpl
{
Interface
:
kubeclientset
.
CoreV1
()
.
Events
(
""
)})
recorder
:=
eventBroadcaster
.
NewRecorder
(
scheme
.
Scheme
,
corev1
.
EventSource
{
Component
:
controllerAgentName
})
controller
:=
&
VolumeExpansionController
{
kubeclientset
:
kubeclientset
,
pvcLister
:
pvcInformer
.
Lister
(),
pvcSynced
:
pvcInformer
.
Informer
()
.
HasSynced
,
classLister
:
classInformer
.
Lister
(),
classSynced
:
classInformer
.
Informer
()
.
HasSynced
,
podLister
:
podInformer
.
Lister
(),
podSynced
:
podInformer
.
Informer
()
.
HasSynced
,
deployLister
:
deployInformer
.
Lister
(),
deploySynced
:
deployInformer
.
Informer
()
.
HasSynced
,
rsLister
:
rsInformer
.
Lister
(),
rsSynced
:
rsInformer
.
Informer
()
.
HasSynced
,
stsLister
:
stsInformer
.
Lister
(),
stsSynced
:
stsInformer
.
Informer
()
.
HasSynced
,
workqueue
:
workqueue
.
NewNamedRateLimitingQueue
(
workqueue
.
DefaultControllerRateLimiter
(),
"expansion"
),
recorder
:
recorder
,
}
klog
.
V
(
2
)
.
Info
(
"Setting up event handlers"
)
pvcInformer
.
Informer
()
.
AddEventHandler
(
cache
.
ResourceEventHandlerFuncs
{
AddFunc
:
controller
.
enqueuePVC
,
UpdateFunc
:
func
(
old
,
new
interface
{})
{
oldPVC
,
ok
:=
old
.
(
*
corev1
.
PersistentVolumeClaim
)
if
!
ok
{
return
}
oldSize
:=
oldPVC
.
Spec
.
Resources
.
Requests
[
corev1
.
ResourceStorage
]
newPVC
,
ok
:=
new
.
(
*
corev1
.
PersistentVolumeClaim
)
if
!
ok
{
return
}
newSize
:=
newPVC
.
Spec
.
Resources
.
Requests
[
corev1
.
ResourceStorage
]
if
newSize
.
Cmp
(
oldSize
)
>
0
&&
newSize
.
Cmp
(
newPVC
.
Status
.
Capacity
[
corev1
.
ResourceStorage
])
>
0
{
controller
.
handleObject
(
new
)
}
},
DeleteFunc
:
controller
.
enqueuePVC
,
})
return
controller
}
func
(
c
*
VolumeExpansionController
)
Start
(
stopCh
<-
chan
struct
{})
error
{
return
c
.
Run
(
5
,
stopCh
)
}
func
(
c
*
VolumeExpansionController
)
Run
(
threadiness
int
,
stopCh
<-
chan
struct
{})
error
{
defer
utilruntime
.
HandleCrash
()
defer
c
.
workqueue
.
ShutDown
()
klog
.
V
(
2
)
.
Info
(
"Starting expand volume controller"
)
klog
.
V
(
2
)
.
Info
(
"Waiting for informer caches to sync"
)
if
ok
:=
cache
.
WaitForCacheSync
(
stopCh
,
c
.
pvcSynced
,
c
.
classSynced
,
c
.
podSynced
,
c
.
deploySynced
,
c
.
rsSynced
,
c
.
stsSynced
);
!
ok
{
return
fmt
.
Errorf
(
"failed to wait for caches to sync"
)
}
klog
.
V
(
2
)
.
Info
(
"Starting workers"
)
for
i
:=
0
;
i
<
threadiness
;
i
++
{
go
wait
.
Until
(
c
.
runWorker
,
time
.
Second
,
stopCh
)
}
klog
.
V
(
2
)
.
Info
(
"Started workers"
)
<-
stopCh
klog
.
V
(
2
)
.
Info
(
"Shutting down workers"
)
return
nil
}
func
(
c
*
VolumeExpansionController
)
runWorker
()
{
for
c
.
processNextWorkItem
()
{
}
}
func
(
c
*
VolumeExpansionController
)
processNextWorkItem
()
bool
{
obj
,
shutdown
:=
c
.
workqueue
.
Get
()
if
shutdown
{
return
false
}
err
:=
func
(
obj
interface
{})
error
{
defer
c
.
workqueue
.
Done
(
obj
)
var
key
string
var
ok
bool
if
key
,
ok
=
obj
.
(
string
);
!
ok
{
c
.
workqueue
.
Forget
(
obj
)
utilruntime
.
HandleError
(
fmt
.
Errorf
(
"expected string in workqueue but got %#v"
,
obj
))
return
nil
}
if
err
:=
c
.
syncHandler
(
key
);
err
!=
nil
{
c
.
workqueue
.
AddRateLimited
(
key
)
return
fmt
.
Errorf
(
"error syncing '%s': %s, requeuing"
,
key
,
err
.
Error
())
}
c
.
workqueue
.
Forget
(
obj
)
klog
.
V
(
2
)
.
Infof
(
"Successfully synced '%s'"
,
key
)
return
nil
}(
obj
)
if
err
!=
nil
{
utilruntime
.
HandleError
(
err
)
return
true
}
return
true
}
// syncHandler will re-attach PVC on workloads.
// Step 1. Find the workload (deployment or statefulset) mounting PVC.
// If more than one workloads mounts the same PVC, the controller will not
// do anything.
// Step 2. Verify workload types.
// Step 3. Scale down workload.
// Step 4. Retry to check PVC status.
// Step 5. Scale up workload.
func
(
c
*
VolumeExpansionController
)
syncHandler
(
key
string
)
error
{
klog
.
V
(
5
)
.
Infof
(
"syncHandler: handle %s"
,
key
)
namespace
,
name
,
err
:=
cache
.
SplitMetaNamespaceKey
(
key
)
if
err
!=
nil
{
utilruntime
.
HandleError
(
fmt
.
Errorf
(
"invalid resource key: %s"
,
key
))
return
nil
}
// Get PVC source
pvc
,
err
:=
c
.
pvcLister
.
PersistentVolumeClaims
(
namespace
)
.
Get
(
name
)
if
err
!=
nil
{
// The PVC resource may no longer exist, in which case we stop processing.
if
apierrors
.
IsNotFound
(
err
)
{
klog
.
V
(
4
)
.
Infof
(
"PVC '%s' in work queue no longer exists"
,
key
)
utilruntime
.
HandleError
(
fmt
.
Errorf
(
"PVC '%s' in work queue no longer exists"
,
key
))
return
nil
}
return
err
}
// find workload
workload
,
err
:=
c
.
findWorkload
(
name
,
namespace
)
if
err
!=
nil
{
return
err
}
if
workload
==
nil
{
klog
.
V
(
4
)
.
Infof
(
"Cannot find any Pods mounting PVC %s"
,
key
)
return
nil
}
klog
.
V
(
5
)
.
Infof
(
"Find workload %T pvc name %s"
,
workload
,
pvc
.
GetName
())
// handle supported workload
switch
workload
.
(
type
)
{
case
*
appsv1
.
StatefulSet
:
sts
:=
workload
.
(
*
appsv1
.
StatefulSet
)
klog
.
V
(
5
)
.
Infof
(
"Find StatefulSet %s"
,
sts
.
GetName
())
case
*
appsv1
.
Deployment
:
deploy
:=
workload
.
(
*
appsv1
.
Deployment
)
klog
.
V
(
5
)
.
Infof
(
"Find Deployment %s"
,
deploy
.
GetName
())
default
:
klog
.
Errorf
(
"Unsupported workload type %T"
,
workload
)
return
nil
}
// Scale workload to 0
if
err
=
c
.
scaleDown
(
workload
,
pvc
.
GetNamespace
());
err
!=
nil
{
klog
.
V
(
2
)
.
Infof
(
"scale down PVC %s mounted workloads failed %s"
,
key
,
err
.
Error
())
return
err
}
klog
.
V
(
2
)
.
Infof
(
"Scale down PVC %s mounted workloads succeed"
,
key
)
// Wait to scale up
err
=
retry
.
RetryOnConflict
(
retryTime
,
func
()
error
{
klog
.
V
(
4
)
.
Info
(
"waiting for PVC filesystem expansion"
)
if
!
c
.
isWaitingScaleUp
(
name
,
namespace
)
{
return
apierrors
.
NewConflict
(
schema
.
GroupResource
{
Resource
:
"PersistentVolumeClaim"
},
key
,
errors
.
New
(
"waiting for scaling down and expanding disk"
))
}
return
nil
})
klog
.
V
(
5
)
.
Info
(
"after waiting"
)
if
err
!=
nil
{
klog
.
Errorf
(
"Waiting timeout, error: %s"
,
err
.
Error
())
}
// Scale up
if
err
=
c
.
scaleUp
(
workload
,
namespace
);
err
!=
nil
{
klog
.
V
(
2
)
.
Infof
(
"Scale up PVC %s mounted workloads failed %s"
,
key
,
err
.
Error
())
return
err
}
klog
.
V
(
2
)
.
Infof
(
"Scale up PVC %s mounted workloads succeed"
,
key
)
return
nil
}
// handleObject will take any resource implementing metav1.Object and attempt
// to find the PVC resource that 'owns' it. It does this by looking at the
// StorageClass of PVC whether supporting provisioner and allowing volume
// expansion.
// In KS 2.1, the controller only supports disk.csi.qingcloud.com and
// csi-qingcloud as storageclass provisioner.
func
(
c
*
VolumeExpansionController
)
handleObject
(
obj
interface
{})
{
var
object
metav1
.
Object
var
ok
bool
if
object
,
ok
=
obj
.
(
metav1
.
Object
);
!
ok
{
tombstone
,
ok
:=
obj
.
(
cache
.
DeletedFinalStateUnknown
)
if
!
ok
{
utilruntime
.
HandleError
(
fmt
.
Errorf
(
"error decoding object, invalid type"
))
return
}
object
,
ok
=
tombstone
.
Obj
.
(
metav1
.
Object
)
if
!
ok
{
utilruntime
.
HandleError
(
fmt
.
Errorf
(
"error decoding object tombstone, invalid type"
))
return
}
klog
.
V
(
4
)
.
Infof
(
"Recovered deleted object '%s' from tombstone"
,
object
.
GetName
())
}
klog
.
V
(
4
)
.
Infof
(
"Processing object: %s"
,
object
.
GetName
())
pvc
:=
obj
.
(
*
corev1
.
PersistentVolumeClaim
)
// Check storage class
// In KS 2.1, we only support disk.csi.qingcloud.com as storageclass provisioner.
class
:=
c
.
getStorageClass
(
pvc
)
klog
.
V
(
4
)
.
Infof
(
"Get PVC %s SC was %s"
,
pvc
.
String
(),
class
.
String
())
if
class
==
nil
{
return
}
if
*
class
.
AllowVolumeExpansion
==
false
{
return
}
for
_
,
p
:=
range
supportedProvisioner
{
if
class
.
Provisioner
==
p
{
klog
.
V
(
5
)
.
Infof
(
"enqueue PVC %s"
,
claimToClaimKey
(
pvc
))
c
.
enqueuePVC
(
obj
)
return
}
}
}
func
(
c
*
VolumeExpansionController
)
getStorageClass
(
pvc
*
corev1
.
PersistentVolumeClaim
)
*
storagev1
.
StorageClass
{
if
pvc
==
nil
{
return
nil
}
claimClass
:=
getPersistentVolumeClaimClass
(
pvc
)
if
claimClass
==
""
{
klog
.
V
(
4
)
.
Infof
(
"volume expansion is disabled for PVC without StorageClasses: %s"
,
claimToClaimKey
(
pvc
))
return
nil
}
class
,
err
:=
c
.
classLister
.
Get
(
claimClass
)
if
err
!=
nil
{
klog
.
V
(
4
)
.
Infof
(
"failed to expand PVC: %s with error: %v"
,
claimToClaimKey
(
pvc
),
err
)
return
nil
}
return
class
}
// enqueuePVC takes a PVC resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than PVC.
func
(
c
*
VolumeExpansionController
)
enqueuePVC
(
obj
interface
{})
{
pvc
,
ok
:=
obj
.
(
*
corev1
.
PersistentVolumeClaim
)
if
!
ok
{
return
}
size
:=
pvc
.
Spec
.
Resources
.
Requests
[
corev1
.
ResourceStorage
]
statusSize
:=
pvc
.
Status
.
Capacity
[
corev1
.
ResourceStorage
]
if
pvc
.
Status
.
Phase
==
corev1
.
ClaimBound
&&
size
.
Cmp
(
statusSize
)
>
0
{
key
,
err
:=
cache
.
DeletionHandlingMetaNamespaceKeyFunc
(
pvc
)
if
err
!=
nil
{
utilruntime
.
HandleError
(
fmt
.
Errorf
(
"couldn't get key for object %#v: %v"
,
pvc
,
err
))
return
}
c
.
workqueue
.
Add
(
key
)
}
}
// findWorkload returns the pointer of Pod, StatefulSet or Deployment mounting the PVC.
func
(
c
*
VolumeExpansionController
)
findWorkload
(
pvc
,
namespace
string
)
(
workloadPtr
interface
{},
err
error
)
{
podList
,
err
:=
c
.
podLister
.
Pods
(
namespace
)
.
List
(
labels
.
Everything
())
klog
.
V
(
4
)
.
Infof
(
"podlist len %d"
,
len
(
podList
))
if
err
!=
nil
{
return
nil
,
err
}
// Which pod mounting PVC
podPtrListMountPVC
:=
getPodMountPVC
(
podList
,
pvc
)
klog
.
V
(
4
)
.
Infof
(
"Get %d pods mounting PVC"
,
len
(
podPtrListMountPVC
))
// In KS 2.1, automatic re-attach PVC only support PVC mounted on a single Pod.
if
len
(
podPtrListMountPVC
)
!=
1
{
return
nil
,
nil
}
// If pod managed by Deployment, StatefulSet, it returns Deployment or StatefulSet.
// If not, it returns Pod.
klog
.
V
(
4
)
.
Info
(
"Find pod parent"
)
ownerRef
,
err
:=
c
.
findPodParent
(
podPtrListMountPVC
[
0
])
if
err
!=
nil
{
return
nil
,
err
}
if
ownerRef
==
nil
{
// a single pod
return
podPtrListMountPVC
[
0
],
nil
}
else
{
klog
.
V
(
4
)
.
Infof
(
"OwnerRef kind %s"
,
ownerRef
.
Kind
)
switch
ownerRef
.
Kind
{
case
"StatefulSet"
:
return
c
.
stsLister
.
StatefulSets
(
namespace
)
.
Get
(
ownerRef
.
Name
)
case
"Deployment"
:
return
c
.
deployLister
.
Deployments
(
namespace
)
.
Get
(
ownerRef
.
Name
)
default
:
return
nil
,
nil
}
}
}
// If the Pod don't controlled by any controller, return nil.
func
(
c
*
VolumeExpansionController
)
findPodParent
(
pod
*
corev1
.
Pod
)
(
*
metav1
.
OwnerReference
,
error
)
{
if
pod
==
nil
{
return
nil
,
nil
}
if
ownerRef
:=
metav1
.
GetControllerOf
(
pod
);
ownerRef
!=
nil
{
switch
ownerRef
.
Kind
{
case
"ReplicaSet"
:
// get deploy
rs
,
err
:=
c
.
rsLister
.
ReplicaSets
(
pod
.
GetNamespace
())
.
Get
(
ownerRef
.
Name
)
if
err
!=
nil
{
return
nil
,
err
}
if
rsOwnerRef
:=
metav1
.
GetControllerOf
(
rs
);
rsOwnerRef
!=
nil
{
return
rsOwnerRef
,
nil
}
else
{
return
ownerRef
,
nil
}
case
"StatefulSet"
:
return
ownerRef
,
nil
default
:
return
&
metav1
.
OwnerReference
{},
nil
}
}
return
nil
,
nil
}
func
(
c
*
VolumeExpansionController
)
scaleDown
(
workload
interface
{},
namespace
string
)
error
{
switch
workload
.
(
type
)
{
case
*
appsv1
.
Deployment
:
deploy
:=
workload
.
(
*
appsv1
.
Deployment
)
scale
:=
&
autoscalingv1
.
Scale
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
deploy
.
GetName
(),
Namespace
:
deploy
.
GetNamespace
(),
},
Spec
:
autoscalingv1
.
ScaleSpec
{
Replicas
:
0
,
},
}
_
,
err
:=
c
.
kubeclientset
.
AppsV1
()
.
Deployments
(
namespace
)
.
UpdateScale
(
deploy
.
GetName
(),
scale
)
return
err
case
*
appsv1
.
StatefulSet
:
sts
:=
workload
.
(
*
appsv1
.
StatefulSet
)
scale
:=
&
autoscalingv1
.
Scale
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
sts
.
GetName
(),
Namespace
:
sts
.
GetNamespace
(),
},
Spec
:
autoscalingv1
.
ScaleSpec
{
Replicas
:
0
,
},
}
_
,
err
:=
c
.
kubeclientset
.
AppsV1
()
.
StatefulSets
(
namespace
)
.
UpdateScale
(
sts
.
GetName
(),
scale
)
return
err
default
:
return
fmt
.
Errorf
(
"unsupported type %T"
,
workload
)
}
}
func
(
c
*
VolumeExpansionController
)
scaleUp
(
workload
interface
{},
namespace
string
)
error
{
switch
workload
.
(
type
)
{
case
*
appsv1
.
Deployment
:
deploy
:=
workload
.
(
*
appsv1
.
Deployment
)
scale
:=
&
autoscalingv1
.
Scale
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
deploy
.
GetName
(),
Namespace
:
deploy
.
GetNamespace
(),
},
Spec
:
autoscalingv1
.
ScaleSpec
{
Replicas
:
*
deploy
.
Spec
.
Replicas
,
},
}
_
,
err
:=
c
.
kubeclientset
.
AppsV1
()
.
Deployments
(
namespace
)
.
UpdateScale
(
deploy
.
GetName
(),
scale
)
return
err
case
*
appsv1
.
StatefulSet
:
sts
:=
workload
.
(
*
appsv1
.
StatefulSet
)
scale
:=
&
autoscalingv1
.
Scale
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
sts
.
GetName
(),
Namespace
:
sts
.
GetNamespace
(),
},
Spec
:
autoscalingv1
.
ScaleSpec
{
Replicas
:
*
sts
.
Spec
.
Replicas
,
},
}
_
,
err
:=
c
.
kubeclientset
.
AppsV1
()
.
StatefulSets
(
namespace
)
.
UpdateScale
(
sts
.
GetName
(),
scale
)
return
err
default
:
return
fmt
.
Errorf
(
"unsupported type %T"
,
workload
)
}
}
// isWaitingScaleUp tries to check whether PVC is waiting for restart Pod.
func
(
c
*
VolumeExpansionController
)
isWaitingScaleUp
(
name
,
namespace
string
)
bool
{
pvc
,
err
:=
c
.
pvcLister
.
PersistentVolumeClaims
(
namespace
)
.
Get
(
name
)
if
err
!=
nil
{
klog
.
Errorf
(
"Get PVC error"
)
}
if
pvc
==
nil
{
return
false
}
for
_
,
condition
:=
range
pvc
.
Status
.
Conditions
{
if
condition
.
Type
==
corev1
.
PersistentVolumeClaimFileSystemResizePending
{
return
true
}
}
return
false
}
pkg/controller/storage/expansion/expansion_controller_test.go
0 → 100644
浏览文件 @
635dcc9b
/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package
expansion
import
(
"encoding/json"
"flag"
"fmt"
appsv1
"k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
storagev1
"k8s.io/api/storage/v1"
metav1
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clientgotesting
"k8s.io/client-go/testing"
"os"
"testing"
"time"
)
func
TestMain
(
m
*
testing
.
M
)
{
flag
.
Set
(
"alsologtostderr"
,
"true"
)
flag
.
Set
(
"log_dir"
,
"/tmp"
)
flag
.
Set
(
"v"
,
"3"
)
flag
.
Parse
()
ret
:=
m
.
Run
()
os
.
Exit
(
ret
)
}
func
TestSyncHandler
(
t
*
testing
.
T
)
{
retryTime
=
wait
.
Backoff
{
Duration
:
1
*
time
.
Second
,
Factor
:
2
,
Steps
:
2
,
}
tests
:=
[]
struct
{
name
string
pvc
*
v1
.
PersistentVolumeClaim
pod
*
v1
.
Pod
deploy
*
appsv1
.
Deployment
rs
*
appsv1
.
ReplicaSet
sts
*
appsv1
.
StatefulSet
sc
*
storagev1
.
StorageClass
pvcKey
string
hasError
bool
}{
{
name
:
"mount pvc on deploy"
,
pvc
:
getFakePersistentVolumeClaim
(
"fake-pvc"
,
"vol-12345"
,
"fake-sc"
,
types
.
UID
(
123
)),
sc
:
getFakeStorageClass
(
"fake-sc"
,
"fake.sc.com"
),
deploy
:
getFakeDeployment
(
"fake-deploy"
,
"234"
,
1
,
getFakePersistentVolumeClaim
(
"fake-pvc"
,
"vol-12345"
,
"fake-sc"
,
types
.
UID
(
123
))),
pvcKey
:
"default/fake-pvc"
,
hasError
:
false
,
},
{
name
:
"unmounted pvc"
,
pvc
:
getFakePersistentVolumeClaim
(
"fake-pvc"
,
"vol-12345"
,
"fake-sc"
,
types
.
UID
(
123
)),
sc
:
getFakeStorageClass
(
"fake-sc"
,
"fake.sc.com"
),
pvcKey
:
"default/fake-pvc"
,
hasError
:
true
,
},
}
for
_
,
tc
:=
range
tests
{
test
:=
tc
fakeKubeClient
:=
&
fake
.
Clientset
{}
fakeWatch
:=
watch
.
NewFake
()
fakeKubeClient
.
AddWatchReactor
(
"*"
,
clientgotesting
.
DefaultWatchReactor
(
fakeWatch
,
nil
))
informerFactory
:=
informers
.
NewSharedInformerFactory
(
fakeKubeClient
,
0
)
pvcInformer
:=
informerFactory
.
Core
()
.
V1
()
.
PersistentVolumeClaims
()
storageClassInformer
:=
informerFactory
.
Storage
()
.
V1
()
.
StorageClasses
()
podInformer
:=
informerFactory
.
Core
()
.
V1
()
.
Pods
()
deployInformer
:=
informerFactory
.
Apps
()
.
V1
()
.
Deployments
()
rsInformer
:=
informerFactory
.
Apps
()
.
V1
()
.
ReplicaSets
()
stsInformer
:=
informerFactory
.
Apps
()
.
V1
()
.
StatefulSets
()
pvc
:=
tc
.
pvc
if
tc
.
pvc
!=
nil
{
informerFactory
.
Core
()
.
V1
()
.
PersistentVolumeClaims
()
.
Informer
()
.
GetIndexer
()
.
Add
(
pvc
)
}
if
tc
.
sc
!=
nil
{
informerFactory
.
Storage
()
.
V1
()
.
StorageClasses
()
.
Informer
()
.
GetIndexer
()
.
Add
(
tc
.
sc
)
}
if
tc
.
deploy
!=
nil
{
informerFactory
.
Apps
()
.
V1
()
.
Deployments
()
.
Informer
()
.
GetIndexer
()
.
Add
(
tc
.
deploy
)
tc
.
rs
=
generateReplicaSetFromDeployment
(
tc
.
deploy
)
}
if
tc
.
rs
!=
nil
{
informerFactory
.
Apps
()
.
V1
()
.
ReplicaSets
()
.
Informer
()
.
GetIndexer
()
.
Add
(
tc
.
rs
)
tc
.
pod
=
generatePodFromReplicaSet
(
tc
.
rs
)
}
if
tc
.
sts
!=
nil
{
informerFactory
.
Apps
()
.
V1
()
.
StatefulSets
()
.
Informer
()
.
GetIndexer
()
.
Add
(
tc
.
sts
)
}
if
tc
.
pod
!=
nil
{
informerFactory
.
Core
()
.
V1
()
.
Pods
()
.
Informer
()
.
GetIndexer
()
.
Add
(
tc
.
pod
)
}
expc
:=
NewVolumeExpansionController
(
fakeKubeClient
,
pvcInformer
,
storageClassInformer
,
podInformer
,
deployInformer
,
rsInformer
,
stsInformer
)
fakeKubeClient
.
AddReactor
(
"patch"
,
"persistentvolumeclaims"
,
func
(
action
clientgotesting
.
Action
)
(
bool
,
runtime
.
Object
,
error
)
{
if
action
.
GetSubresource
()
==
"status"
{
patchActionaction
,
_
:=
action
.
(
clientgotesting
.
PatchAction
)
pvc
,
err
:=
applyPVCPatch
(
pvc
,
patchActionaction
.
GetPatch
())
if
err
!=
nil
{
return
false
,
nil
,
err
}
return
true
,
pvc
,
nil
}
return
true
,
pvc
,
nil
})
err
:=
expc
.
syncHandler
(
test
.
pvcKey
)
if
err
!=
nil
&&
!
test
.
hasError
{
t
.
Fatalf
(
"for: %s; unexpected error while running handler : %v"
,
test
.
name
,
err
)
}
}
}
func
applyPVCPatch
(
originalPVC
*
v1
.
PersistentVolumeClaim
,
patch
[]
byte
)
(
*
v1
.
PersistentVolumeClaim
,
error
)
{
pvcData
,
err
:=
json
.
Marshal
(
originalPVC
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to marshal pvc with %v"
,
err
)
}
updated
,
err
:=
strategicpatch
.
StrategicMergePatch
(
pvcData
,
patch
,
v1
.
PersistentVolumeClaim
{})
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to apply patch on pvc %v"
,
err
)
}
updatedPVC
:=
&
v1
.
PersistentVolumeClaim
{}
if
err
:=
json
.
Unmarshal
(
updated
,
updatedPVC
);
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to unmarshal updated pvc : %v"
,
err
)
}
return
updatedPVC
,
nil
}
func
getFakePersistentVolumeClaim
(
pvcName
,
volumeName
,
scName
string
,
uid
types
.
UID
)
*
v1
.
PersistentVolumeClaim
{
pvc
:=
&
v1
.
PersistentVolumeClaim
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
pvcName
,
Namespace
:
"default"
,
UID
:
uid
},
Spec
:
v1
.
PersistentVolumeClaimSpec
{},
}
if
volumeName
!=
""
{
pvc
.
Spec
.
VolumeName
=
volumeName
}
if
scName
!=
""
{
pvc
.
Spec
.
StorageClassName
=
&
scName
}
return
pvc
}
func
getFakeStorageClass
(
scName
,
pluginName
string
)
*
storagev1
.
StorageClass
{
return
&
storagev1
.
StorageClass
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
scName
},
Provisioner
:
pluginName
,
}
}
func
getFakePod
(
podName
string
,
ownerRef
*
metav1
.
OwnerReference
,
uid
types
.
UID
)
*
v1
.
Pod
{
pod
:=
&
v1
.
Pod
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
podName
,
Namespace
:
"default"
,
UID
:
uid
,
OwnerReferences
:
[]
metav1
.
OwnerReference
{
*
ownerRef
},
},
Spec
:
v1
.
PodSpec
{},
}
return
pod
}
func
getFakeReplicaSet
(
rsName
string
,
ownerRef
*
metav1
.
OwnerReference
,
uid
types
.
UID
,
replicas
int
)
*
appsv1
.
ReplicaSet
{
rs
:=
&
appsv1
.
ReplicaSet
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
rsName
,
Namespace
:
"default"
,
UID
:
uid
,
OwnerReferences
:
[]
metav1
.
OwnerReference
{
*
ownerRef
},
},
Spec
:
appsv1
.
ReplicaSetSpec
{},
}
return
rs
}
func
getFakeDeployment
(
deployName
string
,
uid
types
.
UID
,
replicas
int32
,
mountPVC
*
v1
.
PersistentVolumeClaim
)
*
appsv1
.
Deployment
{
return
&
appsv1
.
Deployment
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
deployName
,
Namespace
:
"default"
,
},
Spec
:
appsv1
.
DeploymentSpec
{
Replicas
:
&
replicas
,
Template
:
v1
.
PodTemplateSpec
{
Spec
:
v1
.
PodSpec
{
Volumes
:
[]
v1
.
Volume
{
{
Name
:
"test"
,
VolumeSource
:
v1
.
VolumeSource
{
PersistentVolumeClaim
:
&
v1
.
PersistentVolumeClaimVolumeSource
{
ClaimName
:
mountPVC
.
GetName
(),
},
},
},
},
},
},
},
}
}
func
getFakeStatefulSet
(
stsName
string
,
uid
types
.
UID
,
replicas
int32
,
mountPVC
*
v1
.
PersistentVolumeClaim
)
*
appsv1
.
StatefulSet
{
return
&
appsv1
.
StatefulSet
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
stsName
,
Namespace
:
"default"
,
},
Spec
:
appsv1
.
StatefulSetSpec
{
Replicas
:
&
replicas
,
Template
:
v1
.
PodTemplateSpec
{
Spec
:
v1
.
PodSpec
{
Volumes
:
[]
v1
.
Volume
{
{
Name
:
"test"
,
VolumeSource
:
v1
.
VolumeSource
{
PersistentVolumeClaim
:
&
v1
.
PersistentVolumeClaimVolumeSource
{
ClaimName
:
mountPVC
.
GetName
(),
},
},
},
},
},
},
},
}
}
// generatePodFromRS creates a pod, with the input ReplicaSet's selector and its template
func
generatePodFromReplicaSet
(
rs
*
appsv1
.
ReplicaSet
)
*
v1
.
Pod
{
trueVar
:=
true
return
&
v1
.
Pod
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
rs
.
Name
+
"-pod"
,
Namespace
:
rs
.
Namespace
,
OwnerReferences
:
[]
metav1
.
OwnerReference
{
{
UID
:
rs
.
UID
,
APIVersion
:
"v1beta1"
,
Kind
:
"ReplicaSet"
,
Name
:
rs
.
Name
,
Controller
:
&
trueVar
},
},
},
Spec
:
rs
.
Spec
.
Template
.
Spec
,
}
}
func
generateReplicaSetFromDeployment
(
deploy
*
appsv1
.
Deployment
)
*
appsv1
.
ReplicaSet
{
trueVar
:=
true
return
&
appsv1
.
ReplicaSet
{
ObjectMeta
:
metav1
.
ObjectMeta
{
Name
:
deploy
.
Name
+
"-rs"
,
Namespace
:
deploy
.
Namespace
,
OwnerReferences
:
[]
metav1
.
OwnerReference
{
{
UID
:
deploy
.
UID
,
APIVersion
:
"v1beta1"
,
Kind
:
"Deployment"
,
Name
:
deploy
.
Name
,
Controller
:
&
trueVar
},
},
},
Spec
:
appsv1
.
ReplicaSetSpec
{
Replicas
:
deploy
.
Spec
.
Replicas
,
Template
:
deploy
.
Spec
.
Template
,
},
}
}
pkg/controller/storage/expansion/expansion_util.go
0 → 100644
浏览文件 @
635dcc9b
/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package
expansion
import
(
"fmt"
corev1
"k8s.io/api/core/v1"
"k8s.io/klog"
)
func
getPodMountPVC
(
pods
[]
*
corev1
.
Pod
,
pvc
string
)
[]
*
corev1
.
Pod
{
var
res
[]
*
corev1
.
Pod
for
_
,
pod
:=
range
pods
{
klog
.
V
(
4
)
.
Infof
(
"check pod %s is mount pvc %s"
,
pod
.
Name
,
pvc
)
curPod
:=
pod
if
isMounted
(
pod
,
pvc
)
{
res
=
append
(
res
,
curPod
)
}
}
return
res
}
func
isMounted
(
pod
*
corev1
.
Pod
,
pvc
string
)
bool
{
for
_
,
vol
:=
range
pod
.
Spec
.
Volumes
{
if
vol
.
PersistentVolumeClaim
!=
nil
&&
vol
.
PersistentVolumeClaim
.
ClaimName
==
pvc
{
return
true
}
}
return
false
}
// claimToClaimKey return namespace/name string for pvc
func
claimToClaimKey
(
claim
*
corev1
.
PersistentVolumeClaim
)
string
{
return
fmt
.
Sprintf
(
"%s/%s"
,
claim
.
Namespace
,
claim
.
Name
)
}
// GetPersistentVolumeClaimClass returns StorageClassName. If no storage class was
// requested, it returns "".
func
getPersistentVolumeClaimClass
(
claim
*
corev1
.
PersistentVolumeClaim
)
string
{
// Use beta annotation first
if
class
,
found
:=
claim
.
Annotations
[
corev1
.
BetaStorageClassAnnotation
];
found
{
return
class
}
if
claim
.
Spec
.
StorageClassName
!=
nil
{
return
*
claim
.
Spec
.
StorageClassName
}
return
""
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录