Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
水淹萌龙
kubesphere
提交
1a513a9a
K
kubesphere
项目概览
水淹萌龙
/
kubesphere
与 Fork 源项目一致
Fork自
KubeSphere / kubesphere
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kubesphere
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
1a513a9a
编写于
8月 27, 2019
作者:
R
runzexia
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix comment
Signed-off-by:
N
runzexia
<
runzexia@yunify.com
>
上级
f00917b0
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
112 addition
and
137 deletion
+112
-137
pkg/apis/devops/v1alpha1/s2ibinary_types.go
pkg/apis/devops/v1alpha1/s2ibinary_types.go
+3
-3
pkg/apis/network/v1alpha1/common.go
pkg/apis/network/v1alpha1/common.go
+1
-1
pkg/apiserver/devops/s2ibinary.go
pkg/apiserver/devops/s2ibinary.go
+10
-10
pkg/controller/s2ibinary/s2ibinary_controller.go
pkg/controller/s2ibinary/s2ibinary_controller.go
+19
-41
pkg/controller/s2irun/s2irun_controller.go
pkg/controller/s2irun/s2irun_controller.go
+18
-41
pkg/models/devops/s2ibinary_handler.go
pkg/models/devops/s2ibinary_handler.go
+35
-29
pkg/simple/client/s2is3/s3.go
pkg/simple/client/s2is3/s3.go
+26
-12
未找到文件。
pkg/apis/devops/v1alpha1/s2ibinary_types.go
浏览文件 @
1a513a9a
...
...
@@ -27,9 +27,9 @@ const (
)
const
(
StatusUploading
=
"Uploading"
StatusReady
=
"Ready"
StatusU
nableToDownload
=
"UnableToDownloa
d"
StatusUploading
=
"Uploading"
StatusReady
=
"Ready"
StatusU
ploadFailed
=
"UploadFaile
d"
)
const
(
...
...
pkg/apis/network/v1alpha1/common.go
浏览文件 @
1a513a9a
package
v1alpha1
import
(
"kubesphere.io/kubesphere/pkg/apis/network/v1alpha1/numorstring"
corev1
"k8s.io/api/core/v1"
"kubesphere.io/kubesphere/pkg/apis/network/v1alpha1/numorstring"
)
// A Rule encapsulates a set of match criteria and an action. Both selector-based security Policy
...
...
pkg/apiserver/devops/s2ibinary.go
浏览文件 @
1a513a9a
...
...
@@ -4,7 +4,7 @@ import (
"code.cloudfoundry.org/bytefmt"
"fmt"
"github.com/emicklei/go-restful"
"
github.com/golang/g
log"
"
k8s.io/k
log"
"kubesphere.io/kubesphere/pkg/errors"
"kubesphere.io/kubesphere/pkg/models/devops"
"kubesphere.io/kubesphere/pkg/utils/hashutil"
...
...
@@ -17,38 +17,38 @@ func UploadS2iBinary(req *restful.Request, resp *restful.Response) {
err
:=
req
.
Request
.
ParseMultipartForm
(
bytefmt
.
MEGABYTE
*
20
)
if
err
!=
nil
{
g
log
.
Errorf
(
"%+v"
,
err
)
k
log
.
Errorf
(
"%+v"
,
err
)
errors
.
ParseSvcErr
(
restful
.
NewError
(
http
.
StatusBadRequest
,
err
.
Error
()),
resp
)
return
}
if
len
(
req
.
Request
.
MultipartForm
.
File
)
==
0
{
err
:=
restful
.
NewError
(
http
.
StatusBadRequest
,
"could not get file from form"
)
g
log
.
Errorf
(
"%+v"
,
err
)
k
log
.
Errorf
(
"%+v"
,
err
)
errors
.
ParseSvcErr
(
restful
.
NewError
(
http
.
StatusBadRequest
,
err
.
Error
()),
resp
)
return
}
if
len
(
req
.
Request
.
MultipartForm
.
File
[
"s2ibinary"
])
==
0
{
err
:=
restful
.
NewError
(
http
.
StatusBadRequest
,
"could not get file from form"
)
g
log
.
Errorf
(
"%+v"
,
err
)
k
log
.
Errorf
(
"%+v"
,
err
)
errors
.
ParseSvcErr
(
err
,
resp
)
return
}
if
len
(
req
.
Request
.
MultipartForm
.
File
[
"s2ibinary"
])
>
1
{
err
:=
restful
.
NewError
(
http
.
StatusBadRequest
,
"s2ibinary should only have one file"
)
g
log
.
Errorf
(
"%+v"
,
err
)
k
log
.
Errorf
(
"%+v"
,
err
)
errors
.
ParseSvcErr
(
err
,
resp
)
return
}
defer
req
.
Request
.
MultipartForm
.
RemoveAll
()
file
,
err
:=
req
.
Request
.
MultipartForm
.
File
[
"s2ibinary"
][
0
]
.
Open
()
if
err
!=
nil
{
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
errors
.
ParseSvcErr
(
err
,
resp
)
return
}
filemd5
,
err
:=
hashutil
.
GetMD5
(
file
)
if
err
!=
nil
{
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
errors
.
ParseSvcErr
(
err
,
resp
)
return
}
...
...
@@ -56,7 +56,7 @@ func UploadS2iBinary(req *restful.Request, resp *restful.Response) {
if
ok
&&
len
(
req
.
Request
.
MultipartForm
.
Value
[
"md5"
])
>
0
{
if
md5
[
0
]
!=
filemd5
{
err
:=
restful
.
NewError
(
http
.
StatusBadRequest
,
fmt
.
Sprintf
(
"md5 not match, origin: %+v, calculate: %+v"
,
md5
[
0
],
filemd5
))
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
errors
.
ParseSvcErr
(
err
,
resp
)
return
}
...
...
@@ -64,7 +64,7 @@ func UploadS2iBinary(req *restful.Request, resp *restful.Response) {
s2ibin
,
err
:=
devops
.
UploadS2iBinary
(
ns
,
name
,
filemd5
,
req
.
Request
.
MultipartForm
.
File
[
"s2ibinary"
][
0
])
if
err
!=
nil
{
g
log
.
Errorf
(
"%+v"
,
err
)
k
log
.
Errorf
(
"%+v"
,
err
)
errors
.
ParseSvcErr
(
err
,
resp
)
return
}
...
...
@@ -78,7 +78,7 @@ func DownloadS2iBinary(req *restful.Request, resp *restful.Response) {
fileName
:=
req
.
PathParameter
(
"file"
)
url
,
err
:=
devops
.
DownloadS2iBinary
(
ns
,
name
,
fileName
)
if
err
!=
nil
{
g
log
.
Errorf
(
"%+v"
,
err
)
k
log
.
Errorf
(
"%+v"
,
err
)
errors
.
ParseSvcErr
(
err
,
resp
)
return
}
...
...
pkg/controller/s2ibinary/s2ibinary_controller.go
浏览文件 @
1a513a9a
...
...
@@ -19,7 +19,6 @@ import (
"k8s.io/kubernetes/pkg/util/metrics"
"kubesphere.io/kubesphere/pkg/simple/client/s2is3"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
logf
"sigs.k8s.io/controller-runtime/pkg/runtime/log"
"time"
devopsv1alpha1
"kubesphere.io/kubesphere/pkg/apis/devops/v1alpha1"
...
...
@@ -28,8 +27,6 @@ import (
devopslisters
"kubesphere.io/kubesphere/pkg/client/listers/devops/v1alpha1"
)
var
log
=
logf
.
Log
.
WithName
(
"s2ibinary-controller"
)
type
S2iBinaryController
struct
{
client
clientset
.
Interface
devopsClient
devopsclient
.
Interface
...
...
@@ -51,7 +48,7 @@ func NewController(devopsclientset devopsclient.Interface,
broadcaster
:=
record
.
NewBroadcaster
()
broadcaster
.
StartLogging
(
func
(
format
string
,
args
...
interface
{})
{
log
.
Info
(
fmt
.
Sprintf
(
format
,
args
))
k
log
.
Info
(
fmt
.
Sprintf
(
format
,
args
))
})
broadcaster
.
StartRecordingToSink
(
&
v1core
.
EventSinkImpl
{
Interface
:
client
.
CoreV1
()
.
Events
(
""
)})
recorder
:=
broadcaster
.
NewRecorder
(
scheme
.
Scheme
,
v1
.
EventSource
{
Component
:
"s2ibinary-controller"
})
...
...
@@ -73,24 +70,24 @@ func NewController(devopsclientset devopsclient.Interface,
v
.
eventRecorder
=
recorder
s2ibinInformer
.
Informer
()
.
AddEventHandler
(
cache
.
ResourceEventHandlerFuncs
{
AddFunc
:
v
.
enqueue
Foo
,
AddFunc
:
v
.
enqueue
S2iBinary
,
UpdateFunc
:
func
(
oldObj
,
newObj
interface
{})
{
old
:=
oldObj
.
(
*
devopsv1alpha1
.
S2iBinary
)
new
:=
newObj
.
(
*
devopsv1alpha1
.
S2iBinary
)
if
old
.
ResourceVersion
==
new
.
ResourceVersion
{
return
}
v
.
enqueue
Foo
(
newObj
)
v
.
enqueue
S2iBinary
(
newObj
)
},
DeleteFunc
:
v
.
enqueue
Foo
,
DeleteFunc
:
v
.
enqueue
S2iBinary
,
})
return
v
}
// enqueue
Foo
takes a Foo resource and converts it into a namespace/name
// enqueue
S2iBinary
takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work workqueue. This method should *not* be
// passed resources of any type other than
Foo
.
func
(
c
*
S2iBinaryController
)
enqueue
Foo
(
obj
interface
{})
{
// passed resources of any type other than
S2iBinary
.
func
(
c
*
S2iBinaryController
)
enqueue
S2iBinary
(
obj
interface
{})
{
var
key
string
var
err
error
if
key
,
err
=
cache
.
MetaNamespaceKeyFunc
(
obj
);
err
!=
nil
{
...
...
@@ -107,46 +104,27 @@ func (c *S2iBinaryController) processNextWorkItem() bool {
return
false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err
:=
func
(
obj
interface
{})
error
{
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer
c
.
workqueue
.
Done
(
obj
)
var
key
string
var
ok
bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if
key
,
ok
=
obj
.
(
string
);
!
ok
{
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c
.
workqueue
.
Forget
(
obj
)
utilruntime
.
HandleError
(
fmt
.
Errorf
(
"expected string in workqueue but got %#v"
,
obj
))
return
nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if
err
:=
c
.
syncHandler
(
key
);
err
!=
nil
{
// Put the item back on the workqueue to handle any transient errors.
c
.
workqueue
.
AddRateLimited
(
key
)
return
fmt
.
Errorf
(
"error syncing '%s': %s, requeuing"
,
key
,
err
.
Error
())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c
.
workqueue
.
Forget
(
obj
)
klog
.
Infof
(
"Successfully synced '%s'"
,
key
)
return
nil
}(
obj
)
if
err
!=
nil
{
log
.
Error
(
err
,
"could not reconcile s2ibinary"
)
k
log
.
Error
(
err
,
"could not reconcile s2ibinary"
)
utilruntime
.
HandleError
(
err
)
return
true
}
...
...
@@ -168,8 +146,8 @@ func (c *S2iBinaryController) Run(workers int, stopCh <-chan struct{}) error {
defer
utilruntime
.
HandleCrash
()
defer
c
.
workqueue
.
ShutDown
()
log
.
Info
(
"starting s2ibinary controller"
)
defer
log
.
Info
(
"shutting down s2ibinary controller"
)
k
log
.
Info
(
"starting s2ibinary controller"
)
defer
k
log
.
Info
(
"shutting down s2ibinary controller"
)
if
!
cache
.
WaitForCacheSync
(
stopCh
,
c
.
s2iBinarySynced
)
{
return
fmt
.
Errorf
(
"failed to wait for caches to sync"
)
...
...
@@ -189,16 +167,16 @@ func (c *S2iBinaryController) Run(workers int, stopCh <-chan struct{}) error {
func
(
c
*
S2iBinaryController
)
syncHandler
(
key
string
)
error
{
namespace
,
name
,
err
:=
cache
.
SplitMetaNamespaceKey
(
key
)
if
err
!=
nil
{
log
.
Error
(
err
,
fmt
.
Sprintf
(
"could not split s2ibin meta %s "
,
key
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"could not split s2ibin meta %s "
,
key
))
return
nil
}
s2ibin
,
err
:=
c
.
s2iBinaryLister
.
S2iBinaries
(
namespace
)
.
Get
(
name
)
if
err
!=
nil
{
if
errors
.
IsNotFound
(
err
)
{
log
.
Info
(
fmt
.
Sprintf
(
"s2ibin '%s' in work queue no longer exists "
,
key
))
k
log
.
Info
(
fmt
.
Sprintf
(
"s2ibin '%s' in work queue no longer exists "
,
key
))
return
nil
}
log
.
Error
(
err
,
fmt
.
Sprintf
(
"could not get s2ibin %s "
,
key
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"could not get s2ibin %s "
,
key
))
return
err
}
if
s2ibin
.
ObjectMeta
.
DeletionTimestamp
.
IsZero
()
{
...
...
@@ -206,7 +184,7 @@ func (c *S2iBinaryController) syncHandler(key string) error {
s2ibin
.
ObjectMeta
.
Finalizers
=
append
(
s2ibin
.
ObjectMeta
.
Finalizers
,
devopsv1alpha1
.
S2iBinaryFinalizerName
)
_
,
err
:=
c
.
devopsClient
.
DevopsV1alpha1
()
.
S2iBinaries
(
namespace
)
.
Update
(
s2ibin
)
if
err
!=
nil
{
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to update s2ibin %s "
,
key
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to update s2ibin %s "
,
key
))
return
err
}
}
...
...
@@ -214,7 +192,7 @@ func (c *S2iBinaryController) syncHandler(key string) error {
}
else
{
if
sliceutil
.
HasString
(
s2ibin
.
ObjectMeta
.
Finalizers
,
devopsv1alpha1
.
S2iBinaryFinalizerName
)
{
if
err
:=
c
.
DeleteBinaryInS3
(
s2ibin
);
err
!=
nil
{
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to delete resource %s in s3"
,
key
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to delete resource %s in s3"
,
key
))
return
err
}
s2ibin
.
ObjectMeta
.
Finalizers
=
sliceutil
.
RemoveString
(
s2ibin
.
ObjectMeta
.
Finalizers
,
func
(
item
string
)
bool
{
...
...
@@ -222,7 +200,7 @@ func (c *S2iBinaryController) syncHandler(key string) error {
})
_
,
err
:=
c
.
devopsClient
.
DevopsV1alpha1
()
.
S2iBinaries
(
namespace
)
.
Update
(
s2ibin
)
if
err
!=
nil
{
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to update s2ibin %s "
,
key
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to update s2ibin %s "
,
key
))
return
err
}
}
...
...
@@ -244,11 +222,11 @@ func (c *S2iBinaryController) DeleteBinaryInS3(s2ibin *devopsv1alpha1.S2iBinary)
case
s3
.
ErrCodeNoSuchKey
:
return
nil
default
:
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to delete s2ibin %s/%s in s3"
,
s2ibin
.
Namespace
,
s2ibin
.
Name
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to delete s2ibin %s/%s in s3"
,
s2ibin
.
Namespace
,
s2ibin
.
Name
))
return
err
}
}
else
{
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to delete s2ibin %s/%s in s3"
,
s2ibin
.
Namespace
,
s2ibin
.
Name
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to delete s2ibin %s/%s in s3"
,
s2ibin
.
Namespace
,
s2ibin
.
Name
))
return
err
}
}
...
...
pkg/controller/s2irun/s2irun_controller.go
浏览文件 @
1a513a9a
...
...
@@ -15,7 +15,6 @@ import (
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/metrics"
"kubesphere.io/kubesphere/pkg/utils/sliceutil"
logf
"sigs.k8s.io/controller-runtime/pkg/runtime/log"
"time"
s2iv1alpha1
"github.com/kubesphere/s2ioperator/pkg/apis/devops/v1alpha1"
...
...
@@ -28,8 +27,6 @@ import (
devopslisters
"kubesphere.io/kubesphere/pkg/client/listers/devops/v1alpha1"
)
var
log
=
logf
.
Log
.
WithName
(
"s2irun-controller"
)
type
S2iRunController
struct
{
client
clientset
.
Interface
s2iClient
s2iclient
.
Interface
...
...
@@ -56,7 +53,7 @@ func NewController(devopsclientset devopsclient.Interface, s2iclientset s2iclien
broadcaster
:=
record
.
NewBroadcaster
()
broadcaster
.
StartLogging
(
func
(
format
string
,
args
...
interface
{})
{
log
.
Info
(
fmt
.
Sprintf
(
format
,
args
))
k
log
.
Info
(
fmt
.
Sprintf
(
format
,
args
))
})
broadcaster
.
StartRecordingToSink
(
&
v1core
.
EventSinkImpl
{
Interface
:
client
.
CoreV1
()
.
Events
(
""
)})
recorder
:=
broadcaster
.
NewRecorder
(
scheme
.
Scheme
,
v1
.
EventSource
{
Component
:
"s2irun-controller"
})
...
...
@@ -81,16 +78,16 @@ func NewController(devopsclientset devopsclient.Interface, s2iclientset s2iclien
v
.
eventRecorder
=
recorder
s2iRunInformer
.
Informer
()
.
AddEventHandler
(
cache
.
ResourceEventHandlerFuncs
{
AddFunc
:
v
.
enqueue
Foo
,
AddFunc
:
v
.
enqueue
S2iRun
,
UpdateFunc
:
func
(
oldObj
,
newObj
interface
{})
{
old
:=
oldObj
.
(
*
s2iv1alpha1
.
S2iRun
)
new
:=
newObj
.
(
*
s2iv1alpha1
.
S2iRun
)
if
old
.
ResourceVersion
==
new
.
ResourceVersion
{
return
}
v
.
enqueue
Foo
(
newObj
)
v
.
enqueue
S2iRun
(
newObj
)
},
DeleteFunc
:
v
.
enqueue
Foo
,
DeleteFunc
:
v
.
enqueue
S2iRun
,
})
return
v
}
...
...
@@ -98,7 +95,7 @@ func NewController(devopsclientset devopsclient.Interface, s2iclientset s2iclien
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work workqueue. This method should *not* be
// passed resources of any type other than Foo.
func
(
c
*
S2iRunController
)
enqueue
Foo
(
obj
interface
{})
{
func
(
c
*
S2iRunController
)
enqueue
S2iRun
(
obj
interface
{})
{
var
key
string
var
err
error
if
key
,
err
=
cache
.
MetaNamespaceKeyFunc
(
obj
);
err
!=
nil
{
...
...
@@ -115,46 +112,26 @@ func (c *S2iRunController) processNextWorkItem() bool {
return
false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err
:=
func
(
obj
interface
{})
error
{
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer
c
.
workqueue
.
Done
(
obj
)
var
key
string
var
ok
bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if
key
,
ok
=
obj
.
(
string
);
!
ok
{
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c
.
workqueue
.
Forget
(
obj
)
utilruntime
.
HandleError
(
fmt
.
Errorf
(
"expected string in workqueue but got %#v"
,
obj
))
return
nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if
err
:=
c
.
syncHandler
(
key
);
err
!=
nil
{
// Put the item back on the workqueue to handle any transient errors.
c
.
workqueue
.
AddRateLimited
(
key
)
return
fmt
.
Errorf
(
"error syncing '%s': %s, requeuing"
,
key
,
err
.
Error
())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c
.
workqueue
.
Forget
(
obj
)
klog
.
Infof
(
"Successfully synced '%s'"
,
key
)
return
nil
}(
obj
)
if
err
!=
nil
{
log
.
Error
(
err
,
"could not reconcile s2irun"
)
k
log
.
Error
(
err
,
"could not reconcile s2irun"
)
utilruntime
.
HandleError
(
err
)
return
true
}
...
...
@@ -176,8 +153,8 @@ func (c *S2iRunController) Run(workers int, stopCh <-chan struct{}) error {
defer
utilruntime
.
HandleCrash
()
defer
c
.
workqueue
.
ShutDown
()
log
.
Info
(
"starting s2irun controller"
)
defer
log
.
Info
(
"shutting down s2irun controller"
)
k
log
.
Info
(
"starting s2irun controller"
)
defer
k
log
.
Info
(
"shutting down s2irun controller"
)
if
!
cache
.
WaitForCacheSync
(
stopCh
,
c
.
s2iBinarySynced
)
{
return
fmt
.
Errorf
(
"failed to wait for caches to sync"
)
...
...
@@ -197,16 +174,16 @@ func (c *S2iRunController) Run(workers int, stopCh <-chan struct{}) error {
func
(
c
*
S2iRunController
)
syncHandler
(
key
string
)
error
{
namespace
,
name
,
err
:=
cache
.
SplitMetaNamespaceKey
(
key
)
if
err
!=
nil
{
log
.
Error
(
err
,
fmt
.
Sprintf
(
"could not split s2irun meta %s "
,
key
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"could not split s2irun meta %s "
,
key
))
return
nil
}
s2irun
,
err
:=
c
.
s2iRunLister
.
S2iRuns
(
namespace
)
.
Get
(
name
)
if
err
!=
nil
{
if
errors
.
IsNotFound
(
err
)
{
log
.
Info
(
fmt
.
Sprintf
(
"s2irun '%s' in work queue no longer exists "
,
key
))
k
log
.
Info
(
fmt
.
Sprintf
(
"s2irun '%s' in work queue no longer exists "
,
key
))
return
nil
}
log
.
Error
(
err
,
fmt
.
Sprintf
(
"could not get s2irun %s "
,
key
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"could not get s2irun %s "
,
key
))
return
err
}
if
s2irun
.
Labels
!=
nil
{
...
...
@@ -217,7 +194,7 @@ func (c *S2iRunController) syncHandler(key string) error {
s2irun
.
ObjectMeta
.
Finalizers
=
append
(
s2irun
.
ObjectMeta
.
Finalizers
,
devopsv1alpha1
.
S2iBinaryFinalizerName
)
_
,
err
:=
c
.
s2iClient
.
DevopsV1alpha1
()
.
S2iRuns
(
namespace
)
.
Update
(
s2irun
)
if
err
!=
nil
{
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to update s2irun %s"
,
key
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to update s2irun %s"
,
key
))
return
err
}
}
...
...
@@ -225,7 +202,7 @@ func (c *S2iRunController) syncHandler(key string) error {
}
else
{
if
sliceutil
.
HasString
(
s2irun
.
ObjectMeta
.
Finalizers
,
devopsv1alpha1
.
S2iBinaryFinalizerName
)
{
if
err
:=
c
.
DeleteS2iBinary
(
s2irun
);
err
!=
nil
{
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to delete s2ibin %s in"
,
key
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to delete s2ibin %s in"
,
key
))
return
err
}
s2irun
.
ObjectMeta
.
Finalizers
=
sliceutil
.
RemoveString
(
s2irun
.
ObjectMeta
.
Finalizers
,
func
(
item
string
)
bool
{
...
...
@@ -233,7 +210,7 @@ func (c *S2iRunController) syncHandler(key string) error {
})
_
,
err
:=
c
.
s2iClient
.
DevopsV1alpha1
()
.
S2iRuns
(
namespace
)
.
Update
(
s2irun
)
if
err
!=
nil
{
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to update s2irun %s "
,
key
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to update s2irun %s "
,
key
))
return
err
}
}
...
...
@@ -249,19 +226,19 @@ func (c *S2iRunController) DeleteS2iBinary(s2irun *s2iv1alpha1.S2iRun) error {
s2iBin
,
err
:=
c
.
s2iBinaryLister
.
S2iBinaries
(
s2irun
.
Namespace
)
.
Get
(
s2iBinName
)
if
err
!=
nil
{
if
errors
.
IsNotFound
(
err
)
{
log
.
Info
(
fmt
.
Sprintf
(
"s2ibin '%s/%s' has been delted "
,
s2irun
.
Namespace
,
s2iBinName
))
k
log
.
Info
(
fmt
.
Sprintf
(
"s2ibin '%s/%s' has been delted "
,
s2irun
.
Namespace
,
s2iBinName
))
return
nil
}
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to get s2ibin %s/%s "
,
s2irun
.
Namespace
,
s2iBinName
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to get s2ibin %s/%s "
,
s2irun
.
Namespace
,
s2iBinName
))
return
err
}
err
=
c
.
devopsClient
.
DevopsV1alpha1
()
.
S2iBinaries
(
s2iBin
.
Namespace
)
.
Delete
(
s2iBinName
,
nil
)
if
err
!=
nil
{
if
errors
.
IsNotFound
(
err
)
{
log
.
Info
(
fmt
.
Sprintf
(
"s2ibin '%s/%s' has been delted "
,
s2irun
.
Namespace
,
s2iBinName
))
k
log
.
Info
(
fmt
.
Sprintf
(
"s2ibin '%s/%s' has been delted "
,
s2irun
.
Namespace
,
s2iBinName
))
return
nil
}
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to delete s2ibin %s/%s "
,
s2irun
.
Namespace
,
s2iBinName
))
k
log
.
Error
(
err
,
fmt
.
Sprintf
(
"failed to delete s2ibin %s/%s "
,
s2irun
.
Namespace
,
s2iBinName
))
return
err
}
...
...
pkg/models/devops/s2ibinary_handler.go
浏览文件 @
1a513a9a
...
...
@@ -8,9 +8,9 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/emicklei/go-restful"
"github.com/golang/glog"
metav1
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/apis/devops/v1alpha1"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
...
...
@@ -28,20 +28,20 @@ const (
func
UploadS2iBinary
(
namespace
,
name
,
md5
string
,
fileHeader
*
multipart
.
FileHeader
)
(
*
v1alpha1
.
S2iBinary
,
error
)
{
binFile
,
err
:=
fileHeader
.
Open
()
if
err
!=
nil
{
g
log
.
Errorf
(
"%+v"
,
err
)
k
log
.
Errorf
(
"%+v"
,
err
)
return
nil
,
err
}
defer
binFile
.
Close
()
origin
,
err
:=
informers
.
KsSharedInformerFactory
()
.
Devops
()
.
V1alpha1
()
.
S2iBinaries
()
.
Lister
()
.
S2iBinaries
(
namespace
)
.
Get
(
name
)
if
err
!=
nil
{
g
log
.
Errorf
(
"%+v"
,
err
)
k
log
.
Errorf
(
"%+v"
,
err
)
return
nil
,
err
}
//Check file is uploading
if
origin
.
Status
.
Phase
==
v1alpha1
.
StatusUploading
{
err
:=
restful
.
NewError
(
http
.
StatusConflict
,
"file is uploading, please try later"
)
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
nil
,
err
}
copy
:=
origin
.
DeepCopy
()
...
...
@@ -54,13 +54,14 @@ func UploadS2iBinary(namespace, name, md5 string, fileHeader *multipart.FileHead
}
//Set status Uploading to lock resource
origin
,
err
=
SetS2iBinaryStatus
(
origin
,
v1alpha1
.
StatusUploading
)
uploading
,
err
:=
SetS2iBinaryStatus
(
copy
,
v1alpha1
.
StatusUploading
)
if
err
!=
nil
{
err
:=
restful
.
NewError
(
http
.
StatusConflict
,
fmt
.
Sprintf
(
"could not set status: %+v"
,
err
))
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
nil
,
err
}
copy
=
origin
.
DeepCopy
()
copy
=
uploading
.
DeepCopy
()
copy
.
Spec
.
MD5
=
md5
copy
.
Spec
.
Size
=
bytefmt
.
ByteSize
(
uint64
(
fileHeader
.
Size
))
copy
.
Spec
.
FileName
=
fileHeader
.
Filename
...
...
@@ -68,7 +69,12 @@ func UploadS2iBinary(namespace, name, md5 string, fileHeader *multipart.FileHead
s3session
:=
s2is3
.
Session
()
if
s3session
==
nil
{
err
:=
fmt
.
Errorf
(
"could not connect to s2i s3"
)
glog
.
Error
(
err
)
klog
.
Error
(
err
)
_
,
serr
:=
SetS2iBinaryStatusWithRetry
(
copy
,
origin
.
Status
.
Phase
)
if
serr
!=
nil
{
klog
.
Error
(
serr
)
return
nil
,
err
}
return
nil
,
err
}
uploader
:=
s3manager
.
NewUploader
(
s3session
,
func
(
uploader
*
s3manager
.
Uploader
)
{
...
...
@@ -87,22 +93,22 @@ func UploadS2iBinary(namespace, name, md5 string, fileHeader *multipart.FileHead
if
aerr
,
ok
:=
err
.
(
awserr
.
Error
);
ok
{
switch
aerr
.
Code
()
{
case
s3
.
ErrCodeNoSuchBucket
:
g
log
.
Error
(
err
)
_
,
serr
:=
SetS2iBinaryStatusWithRetry
(
origin
,
origin
.
Status
.
Phase
)
k
log
.
Error
(
err
)
_
,
serr
:=
SetS2iBinaryStatusWithRetry
(
copy
,
origin
.
Status
.
Phase
)
if
serr
!=
nil
{
g
log
.
Error
(
serr
)
k
log
.
Error
(
serr
)
}
return
nil
,
err
default
:
g
log
.
Error
(
err
)
_
,
serr
:=
SetS2iBinaryStatusWithRetry
(
origin
,
v1alpha1
.
StatusUnableToDownloa
d
)
k
log
.
Error
(
err
)
_
,
serr
:=
SetS2iBinaryStatusWithRetry
(
copy
,
v1alpha1
.
StatusUploadFaile
d
)
if
serr
!=
nil
{
g
log
.
Error
(
serr
)
k
log
.
Error
(
serr
)
}
return
nil
,
err
}
}
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
nil
,
err
}
...
...
@@ -110,40 +116,40 @@ func UploadS2iBinary(namespace, name, md5 string, fileHeader *multipart.FileHead
copy
.
Spec
.
UploadTimeStamp
=
new
(
metav1
.
Time
)
}
*
copy
.
Spec
.
UploadTimeStamp
=
metav1
.
Now
()
resp
,
err
:
=
k8s
.
KsClient
()
.
DevopsV1alpha1
()
.
S2iBinaries
(
namespace
)
.
Update
(
copy
)
copy
,
err
=
k8s
.
KsClient
()
.
DevopsV1alpha1
()
.
S2iBinaries
(
namespace
)
.
Update
(
copy
)
if
err
!=
nil
{
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
nil
,
err
}
resp
,
err
=
SetS2iBinaryStatusWithRetry
(
resp
,
v1alpha1
.
StatusReady
)
copy
,
err
=
SetS2iBinaryStatusWithRetry
(
copy
,
v1alpha1
.
StatusReady
)
if
err
!=
nil
{
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
nil
,
err
}
return
resp
,
nil
return
copy
,
nil
}
func
DownloadS2iBinary
(
namespace
,
name
,
fileName
string
)
(
string
,
error
)
{
origin
,
err
:=
informers
.
KsSharedInformerFactory
()
.
Devops
()
.
V1alpha1
()
.
S2iBinaries
()
.
Lister
()
.
S2iBinaries
(
namespace
)
.
Get
(
name
)
if
err
!=
nil
{
g
log
.
Errorf
(
"%+v"
,
err
)
k
log
.
Errorf
(
"%+v"
,
err
)
return
""
,
err
}
if
origin
.
Spec
.
FileName
!=
fileName
{
err
:=
fmt
.
Errorf
(
"could not fould file %s"
,
fileName
)
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
""
,
err
}
if
origin
.
Status
.
Phase
!=
v1alpha1
.
StatusReady
{
err
:=
restful
.
NewError
(
http
.
StatusBadRequest
,
"file is not ready, please try later"
)
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
""
,
err
}
s3Client
:=
s2is3
.
Client
()
if
s3Client
==
nil
{
err
:=
fmt
.
Errorf
(
"could not get s3 client"
)
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
""
,
err
}
req
,
_
:=
s3Client
.
GetObjectRequest
(
&
s3
.
GetObjectInput
{
...
...
@@ -153,7 +159,7 @@ func DownloadS2iBinary(namespace, name, fileName string) (string, error) {
})
url
,
err
:=
req
.
Presign
(
5
*
time
.
Minute
)
if
err
!=
nil
{
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
""
,
err
}
return
url
,
nil
...
...
@@ -165,7 +171,7 @@ func SetS2iBinaryStatus(s2ibin *v1alpha1.S2iBinary, status string) (*v1alpha1.S2
copy
.
Status
.
Phase
=
status
copy
,
err
:=
k8s
.
KsClient
()
.
DevopsV1alpha1
()
.
S2iBinaries
(
s2ibin
.
Namespace
)
.
Update
(
copy
)
if
err
!=
nil
{
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
nil
,
err
}
return
copy
,
nil
...
...
@@ -178,19 +184,19 @@ func SetS2iBinaryStatusWithRetry(s2ibin *v1alpha1.S2iBinary, status string) (*v1
err
=
retry
.
RetryOnConflict
(
retry
.
DefaultRetry
,
func
()
error
{
bin
,
err
=
informers
.
KsSharedInformerFactory
()
.
Devops
()
.
V1alpha1
()
.
S2iBinaries
()
.
Lister
()
.
S2iBinaries
(
s2ibin
.
Namespace
)
.
Get
(
s2ibin
.
Name
)
if
err
!=
nil
{
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
err
}
bin
.
Status
.
Phase
=
status
bin
,
err
=
k8s
.
KsClient
()
.
DevopsV1alpha1
()
.
S2iBinaries
(
s2ibin
.
Namespace
)
.
Update
(
bin
)
if
err
!=
nil
{
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
err
}
return
nil
})
if
err
!=
nil
{
g
log
.
Error
(
err
)
k
log
.
Error
(
err
)
return
nil
,
err
}
...
...
pkg/simple/client/s2is3/s3.go
浏览文件 @
1a513a9a
...
...
@@ -6,7 +6,8 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/golang/glog"
"k8s.io/klog"
"sync"
)
var
(
...
...
@@ -19,8 +20,12 @@ var (
s3SessionToken
string
s3Bucket
string
)
var
s2iS3
*
s3
.
S3
var
s2iS3Session
*
session
.
Session
var
(
s2iS3
*
s3
.
S3
s2iS3Session
*
session
.
Session
sessionInitMutex
sync
.
Mutex
clientInitMutex
sync
.
Mutex
)
func
init
()
{
flag
.
StringVar
(
&
s3Region
,
"s2i-s3-region"
,
"us-east-1"
,
"region of s2i s3"
)
...
...
@@ -37,6 +42,23 @@ func Client() *s3.S3 {
if
s2iS3
!=
nil
{
return
s2iS3
}
clientInitMutex
.
Lock
()
defer
clientInitMutex
.
Unlock
()
if
s2iS3Session
==
nil
{
if
sess
:=
Session
();
sess
!=
nil
{
klog
.
Error
(
"failed to connect to s2i s3"
)
return
nil
}
}
s2iS3
=
s3
.
New
(
s2iS3Session
)
return
s2iS3
}
func
Session
()
*
session
.
Session
{
if
s2iS3Session
!=
nil
{
return
s2iS3Session
}
sessionInitMutex
.
Lock
()
defer
sessionInitMutex
.
Unlock
()
creds
:=
credentials
.
NewStaticCredentials
(
s3AccessKeyID
,
s3SecretAccessKey
,
s3SessionToken
,
)
...
...
@@ -49,18 +71,10 @@ func Client() *s3.S3 {
}
sess
,
err
:=
session
.
NewSession
(
config
)
if
err
!=
nil
{
g
log
.
Errorf
(
"failed to connect to s2i s3: %+v"
,
err
)
k
log
.
Errorf
(
"failed to connect to s2i s3: %+v"
,
err
)
return
nil
}
s2iS3Session
=
sess
s2iS3
=
s3
.
New
(
sess
)
return
s2iS3
}
func
Session
()
*
session
.
Session
{
if
s2iS3Session
!=
nil
{
return
s2iS3Session
}
Client
()
return
s2iS3Session
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录