提交 8551ec9e 编写于 作者: O openeuler-ci-bot 提交者: Gitee

!10 exporter: refactor exporer make it more general and widely use

Merge pull request !10 from DCCooper/save
......@@ -260,6 +260,8 @@ func runBuild(ctx context.Context, cli Cli) (string, error) {
content string
dest string
isIsulad bool
imageID string
msg *pb.BuildResponse
)
if dest, isIsulad, err = checkAndProcessOutput(); err != nil {
......@@ -299,15 +301,38 @@ func runBuild(ctx context.Context, cli Cli) (string, error) {
return msg.ImageID, nil
}
imageID, err := exporter.ArchiveRecv(ctx, dest, isIsulad, budStream.Recv)
if err != nil {
return "", err
}
ch := make(chan []byte, constant.BufferSize)
eg, _ := errgroup.WithContext(ctx)
eg.Go(func() error {
defer close(ch)
for {
msg, err = budStream.Recv()
if msg != nil {
imageID = msg.ImageID
}
if err == io.EOF {
break
}
if err != nil {
imageID = ""
return err
}
ch <- msg.Data
}
return nil
})
eg.Go(func() error {
if err = exporter.ArchiveRecv(ctx, dest, isIsulad, ch); err != nil {
return err
}
return nil
})
return imageID, nil
return imageID, eg.Wait()
}
// encryptes those sensitive args before transmissing via GRPC
// encrypts those sensitive args before transporting via GRPC
func encryptBuildArgs() error {
var hasSensiArg bool
for _, v := range buildOpts.buildArgs {
......
......@@ -56,6 +56,9 @@ const (
// BuildContainerImageType is the default build type
BuildContainerImageType = "ctr-img"
// BufferSize is default buffer size for file transportation
BufferSize = 32 * 1024
)
var (
......
......@@ -51,6 +51,7 @@ func (b *Backend) Build(req *pb.BuildRequest, stream pb.Control_BuildServer) (er
imageID string
pipeFile string
eg *errgroup.Group
fileChan chan []byte
errc = make(chan error, 1)
)
......@@ -76,9 +77,26 @@ func (b *Backend) Build(req *pb.BuildRequest, stream pb.Control_BuildServer) (er
return err
})
eg.Go(func() error {
return exporter.PipeArchiveStream(req.BuildID, pipeWrapper, stream.Send)
if pipeWrapper == nil {
return nil
}
fileChan, err = exporter.PipeArchiveStream(req.BuildID, pipeWrapper)
if err != nil {
return err
}
for c := range fileChan {
if err = stream.Send(&pb.BuildResponse{
Data: c,
}); err != nil {
return err
}
}
return pipeWrapper.Err
})
go func() {
errc <- eg.Wait()
}()
......
......@@ -37,7 +37,6 @@ import (
"github.com/sirupsen/logrus"
constant "isula.org/isula-build"
pb "isula.org/isula-build/api/services"
"isula.org/isula-build/image"
"isula.org/isula-build/store"
"isula.org/isula-build/util"
......@@ -46,7 +45,6 @@ import (
const (
// Uncompressed represents uncompressed
Uncompressed = archive.Uncompressed
bufferSize = 32 * 1024
)
// ExportOptions is a struct for exporter
......@@ -179,6 +177,7 @@ func newPolicyContext(sc *types.SystemContext) (*signature.PolicyContext, error)
type PipeWrapper struct {
PipeFile string
Done bool
Err error
}
// Close set the done flag for this pip
......@@ -199,50 +198,54 @@ func NewPipeWrapper(runDir, expt string) (*PipeWrapper, error) {
}
// PipeArchiveStream pipes the GRPC stream with pipeFile
func PipeArchiveStream(buildID string, pipeWrapper *PipeWrapper, send func(*pb.BuildResponse) error) (err error) {
func PipeArchiveStream(buildID string, pipeWrapper *PipeWrapper) (fc chan []byte, err error) {
var (
file *os.File
length int
)
if pipeWrapper == nil || pipeWrapper.PipeFile == "" {
return nil
return nil, errors.New("no pipe wrapper found")
}
var file *os.File
if file, err = os.OpenFile(pipeWrapper.PipeFile, os.O_RDONLY, os.ModeNamedPipe); err != nil {
return err
return nil, err
}
defer func() {
if cerr := file.Close(); cerr != nil {
logrus.WithField(util.LogKeyBuildID, buildID).Warnf("Closing archive stream pipe %q failed: %v", pipeWrapper.PipeFile, cerr)
}
}()
reader := bufio.NewReader(file)
buf := make([]byte, bufferSize, bufferSize)
var length int
for {
if length, err = reader.Read(buf); err != nil && err != io.EOF {
return err
}
if err = send(&pb.BuildResponse{
Data: buf[0:length],
}); err != nil {
return err
}
if length == 0 && pipeWrapper.Done {
break
buf := make([]byte, constant.BufferSize, constant.BufferSize)
fc = make(chan []byte, constant.BufferSize)
go func() {
defer func() {
if cerr := file.Close(); cerr != nil {
logrus.WithField(util.LogKeyBuildID, buildID).Warnf("Closing archive stream pipe %q failed: %v", pipeWrapper.PipeFile, cerr)
}
close(fc)
}()
for {
if length, err = reader.Read(buf); err != nil && err != io.EOF {
pipeWrapper.Err = err
}
bytes := make([]byte, length, length)
copy(bytes, buf[0:length])
fc <- bytes
if length == 0 && pipeWrapper.Done {
break
}
}
}
}()
logrus.WithField(util.LogKeyBuildID, buildID).Debugf("Piping archive stream done")
return nil
return fc, nil
}
// ArchiveRecv receive data stream and write to file
func ArchiveRecv(ctx context.Context, dest string, isIsulad bool, recv func() (*pb.BuildResponse, error)) (string, error) {
func ArchiveRecv(ctx context.Context, dest string, isIsulad bool, fc chan []byte) error {
var (
err error
msg *pb.BuildResponse
)
fo, err := os.Create(dest)
if err != nil {
return "", err
return err
}
defer func() {
......@@ -257,38 +260,27 @@ func ArchiveRecv(ctx context.Context, dest string, isIsulad bool, recv func() (*
}()
if err = fo.Chmod(constant.DefaultRootFileMode); err != nil {
return "", err
return err
}
w := bufio.NewWriter(fo)
imageID := ""
for {
msg, err = recv()
if msg != nil {
imageID = msg.ImageID
}
if err == io.EOF {
break
}
if err != nil {
return "", err
}
if _, werr := w.Write(msg.Data); werr != nil {
return "", werr
for bytes := range fc {
if _, werr := w.Write(bytes); werr != nil {
return err
}
}
if err = w.Flush(); err != nil {
return "", errors.Errorf("flush buffer failed: %v", err)
return errors.Errorf("flush buffer failed: %v", err)
}
if isIsulad {
// dest here will not be influenced by external input, no security risk
cmd := exec.CommandContext(ctx, "isula", "load", "-i", dest) // nolint:gosec
if bytes, lerr := cmd.CombinedOutput(); lerr != nil {
return "", errors.Errorf("load image to isulad failed, stderr: %v, err: %v", string(bytes), lerr)
return errors.Errorf("load image to isulad failed, stderr: %v, err: %v", string(bytes), lerr)
}
}
return imageID, nil
return nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册