s2ibinary_handler.go 5.9 KB
Newer Older
R
runzexia 已提交
1 2 3 4 5 6 7 8 9 10 11 12
package devops

import (
	"code.cloudfoundry.org/bytefmt"
	"fmt"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/emicklei/go-restful"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"
R
runzexia 已提交
13
	"k8s.io/klog"
R
runzexia 已提交
14 15
	"kubesphere.io/kubesphere/pkg/apis/devops/v1alpha1"
	"kubesphere.io/kubesphere/pkg/informers"
16
	"kubesphere.io/kubesphere/pkg/simple/client"
R
runzexia 已提交
17 18 19 20 21 22 23
	"mime/multipart"
	"net/http"
	"reflect"
	"time"
)

const (
R
runzexia 已提交
24
	GetS2iBinaryURL = "kapis/devops.kubesphere.io/v1alpha2/namespaces/%s/s2ibinaries/%s/file/%s"
R
runzexia 已提交
25 26 27
)

func UploadS2iBinary(namespace, name, md5 string, fileHeader *multipart.FileHeader) (*v1alpha1.S2iBinary, error) {
28 29 30 31 32
	s3Client, err := client.ClientSets().S3()
	if err != nil {
		return nil, err
	}

R
runzexia 已提交
33 34
	binFile, err := fileHeader.Open()
	if err != nil {
R
runzexia 已提交
35
		klog.Errorf("%+v", err)
R
runzexia 已提交
36 37 38 39 40 41
		return nil, err
	}
	defer binFile.Close()

	origin, err := informers.KsSharedInformerFactory().Devops().V1alpha1().S2iBinaries().Lister().S2iBinaries(namespace).Get(name)
	if err != nil {
R
runzexia 已提交
42
		klog.Errorf("%+v", err)
R
runzexia 已提交
43 44 45 46 47
		return nil, err
	}
	//Check file is uploading
	if origin.Status.Phase == v1alpha1.StatusUploading {
		err := restful.NewError(http.StatusConflict, "file is uploading, please try later")
R
runzexia 已提交
48
		klog.Error(err)
R
runzexia 已提交
49 50 51 52 53 54 55 56 57 58 59 60
		return nil, err
	}
	copy := origin.DeepCopy()
	copy.Spec.MD5 = md5
	copy.Spec.Size = bytefmt.ByteSize(uint64(fileHeader.Size))
	copy.Spec.FileName = fileHeader.Filename
	copy.Spec.DownloadURL = fmt.Sprintf(GetS2iBinaryURL, namespace, name, copy.Spec.FileName)
	if origin.Status.Phase == v1alpha1.StatusReady && reflect.DeepEqual(origin, copy) {
		return origin, nil
	}

	//Set status Uploading to lock resource
R
runzexia 已提交
61
	uploading, err := SetS2iBinaryStatus(copy, v1alpha1.StatusUploading)
R
runzexia 已提交
62 63
	if err != nil {
		err := restful.NewError(http.StatusConflict, fmt.Sprintf("could not set status: %+v", err))
R
runzexia 已提交
64
		klog.Error(err)
R
runzexia 已提交
65 66
		return nil, err
	}
R
runzexia 已提交
67 68

	copy = uploading.DeepCopy()
R
runzexia 已提交
69 70 71 72
	copy.Spec.MD5 = md5
	copy.Spec.Size = bytefmt.ByteSize(uint64(fileHeader.Size))
	copy.Spec.FileName = fileHeader.Filename
	copy.Spec.DownloadURL = fmt.Sprintf(GetS2iBinaryURL, namespace, name, copy.Spec.FileName)
73 74

	s3session := s3Client.Session()
R
runzexia 已提交
75 76
	if s3session == nil {
		err := fmt.Errorf("could not connect to s2i s3")
R
runzexia 已提交
77 78 79 80 81 82
		klog.Error(err)
		_, serr := SetS2iBinaryStatusWithRetry(copy, origin.Status.Phase)
		if serr != nil {
			klog.Error(serr)
			return nil, err
		}
R
runzexia 已提交
83 84 85 86 87 88 89
		return nil, err
	}
	uploader := s3manager.NewUploader(s3session, func(uploader *s3manager.Uploader) {
		uploader.PartSize = 5 * bytefmt.MEGABYTE
		uploader.LeavePartsOnError = true
	})
	_, err = uploader.Upload(&s3manager.UploadInput{
90
		Bucket:             s3Client.Bucket(),
R
runzexia 已提交
91 92 93 94 95 96 97 98 99 100
		Key:                aws.String(fmt.Sprintf("%s-%s", namespace, name)),
		Body:               binFile,
		ContentMD5:         aws.String(md5),
		ContentDisposition: aws.String(fmt.Sprintf("attachment; filename=\"%s\"", copy.Spec.FileName)),
	})

	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case s3.ErrCodeNoSuchBucket:
R
runzexia 已提交
101 102
				klog.Error(err)
				_, serr := SetS2iBinaryStatusWithRetry(copy, origin.Status.Phase)
R
runzexia 已提交
103
				if serr != nil {
R
runzexia 已提交
104
					klog.Error(serr)
R
runzexia 已提交
105 106 107
				}
				return nil, err
			default:
R
runzexia 已提交
108 109
				klog.Error(err)
				_, serr := SetS2iBinaryStatusWithRetry(copy, v1alpha1.StatusUploadFailed)
R
runzexia 已提交
110
				if serr != nil {
R
runzexia 已提交
111
					klog.Error(serr)
R
runzexia 已提交
112 113 114 115
				}
				return nil, err
			}
		}
R
runzexia 已提交
116
		klog.Error(err)
R
runzexia 已提交
117 118 119 120 121 122 123
		return nil, err
	}

	if copy.Spec.UploadTimeStamp == nil {
		copy.Spec.UploadTimeStamp = new(metav1.Time)
	}
	*copy.Spec.UploadTimeStamp = metav1.Now()
124
	copy, err = client.ClientSets().K8s().KubeSphere().DevopsV1alpha1().S2iBinaries(namespace).Update(copy)
R
runzexia 已提交
125
	if err != nil {
R
runzexia 已提交
126
		klog.Error(err)
R
runzexia 已提交
127 128 129
		return nil, err
	}

R
runzexia 已提交
130
	copy, err = SetS2iBinaryStatusWithRetry(copy, v1alpha1.StatusReady)
R
runzexia 已提交
131
	if err != nil {
R
runzexia 已提交
132
		klog.Error(err)
R
runzexia 已提交
133 134
		return nil, err
	}
R
runzexia 已提交
135
	return copy, nil
R
runzexia 已提交
136 137 138
}

func DownloadS2iBinary(namespace, name, fileName string) (string, error) {
139 140 141 142 143
	s3Client, err := client.ClientSets().S3()
	if err != nil {
		return "", err
	}

R
runzexia 已提交
144 145
	origin, err := informers.KsSharedInformerFactory().Devops().V1alpha1().S2iBinaries().Lister().S2iBinaries(namespace).Get(name)
	if err != nil {
R
runzexia 已提交
146
		klog.Errorf("%+v", err)
R
runzexia 已提交
147 148 149 150
		return "", err
	}
	if origin.Spec.FileName != fileName {
		err := fmt.Errorf("could not fould file %s", fileName)
R
runzexia 已提交
151
		klog.Error(err)
R
runzexia 已提交
152 153 154 155
		return "", err
	}
	if origin.Status.Phase != v1alpha1.StatusReady {
		err := restful.NewError(http.StatusBadRequest, "file is not ready, please try later")
R
runzexia 已提交
156
		klog.Error(err)
R
runzexia 已提交
157 158
		return "", err
	}
159 160 161

	req, _ := s3Client.Client().GetObjectRequest(&s3.GetObjectInput{
		Bucket:                     s3Client.Bucket(),
R
runzexia 已提交
162 163 164 165 166
		Key:                        aws.String(fmt.Sprintf("%s-%s", namespace, name)),
		ResponseContentDisposition: aws.String(fmt.Sprintf("attachment; filename=\"%s\"", origin.Spec.FileName)),
	})
	url, err := req.Presign(5 * time.Minute)
	if err != nil {
R
runzexia 已提交
167
		klog.Error(err)
R
runzexia 已提交
168 169 170 171 172 173 174 175 176
		return "", err
	}
	return url, nil

}

func SetS2iBinaryStatus(s2ibin *v1alpha1.S2iBinary, status string) (*v1alpha1.S2iBinary, error) {
	copy := s2ibin.DeepCopy()
	copy.Status.Phase = status
177
	copy, err := client.ClientSets().K8s().KubeSphere().DevopsV1alpha1().S2iBinaries(s2ibin.Namespace).Update(copy)
R
runzexia 已提交
178
	if err != nil {
R
runzexia 已提交
179
		klog.Error(err)
R
runzexia 已提交
180 181 182 183 184 185 186 187 188 189 190 191
		return nil, err
	}
	return copy, nil
}

func SetS2iBinaryStatusWithRetry(s2ibin *v1alpha1.S2iBinary, status string) (*v1alpha1.S2iBinary, error) {

	var bin *v1alpha1.S2iBinary
	var err error
	err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
		bin, err = informers.KsSharedInformerFactory().Devops().V1alpha1().S2iBinaries().Lister().S2iBinaries(s2ibin.Namespace).Get(s2ibin.Name)
		if err != nil {
R
runzexia 已提交
192
			klog.Error(err)
R
runzexia 已提交
193 194 195
			return err
		}
		bin.Status.Phase = status
196
		bin, err = client.ClientSets().K8s().KubeSphere().DevopsV1alpha1().S2iBinaries(s2ibin.Namespace).Update(bin)
R
runzexia 已提交
197
		if err != nil {
R
runzexia 已提交
198
			klog.Error(err)
R
runzexia 已提交
199 200 201 202 203
			return err
		}
		return nil
	})
	if err != nil {
R
runzexia 已提交
204
		klog.Error(err)
R
runzexia 已提交
205 206 207 208 209
		return nil, err
	}

	return bin, nil
}