提交 d5195233 编写于 作者: D Davies Liu

remote tmp file in hdfs

上级 b9464ced
......@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"os/user"
"path/filepath"
......@@ -100,27 +101,34 @@ func (h *hdfsclient) Get(key string, off, limit int64) (io.ReadCloser, error) {
return f, nil
}
const abcException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException"
func (h *hdfsclient) Put(key string, in io.Reader) error {
path := h.path(key)
if strings.HasSuffix(path, dirSuffix) {
return h.c.MkdirAll(path, os.FileMode(0755))
}
tmp := filepath.Join(filepath.Dir(path), "."+filepath.Base(path)+".tmp")
tmp := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s.tmp.%d", filepath.Base(path), rand.Int()))
f, err := h.c.CreateFile(tmp, 3, 128<<20, 0755)
defer h.c.Remove(tmp)
if err != nil {
if pe, ok := err.(*os.PathError); ok && pe.Err == os.ErrNotExist {
h.c.MkdirAll(filepath.Dir(path), 0755)
f, err = h.c.CreateFile(tmp, 3, 128<<20, 0755)
}
if pe, ok := err.(*os.PathError); ok && pe.Err == os.ErrExist {
h.c.Remove(tmp)
f, err = h.c.CreateFile(tmp, 3, 128<<20, 0755)
if pe, ok := err.(*os.PathError); ok {
if remoteErr, ok := pe.Err.(hdfs.Error); ok && remoteErr.Exception() == abcException {
pe.Err = os.ErrExist
}
if pe.Err == os.ErrExist {
h.c.Remove(tmp)
f, err = h.c.CreateFile(tmp, 3, 128<<20, 0755)
}
}
if err != nil {
return err
}
}
defer h.c.Remove(tmp)
_, err = io.Copy(f, in)
if err != nil {
f.Close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册