diff --git a/object/hdfs.go b/object/hdfs.go index 5d7f948b8de45508035595e9b0afc9faf69448a7..63a90b6999eb784b0e99723cc8fe22877950aa01 100644 --- a/object/hdfs.go +++ b/object/hdfs.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "os" "os/user" "path/filepath" @@ -100,27 +101,34 @@ func (h *hdfsclient) Get(key string, off, limit int64) (io.ReadCloser, error) { return f, nil } +const abcException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException" + func (h *hdfsclient) Put(key string, in io.Reader) error { path := h.path(key) if strings.HasSuffix(path, dirSuffix) { return h.c.MkdirAll(path, os.FileMode(0755)) } - tmp := filepath.Join(filepath.Dir(path), "."+filepath.Base(path)+".tmp") + tmp := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s.tmp.%d", filepath.Base(path), rand.Int())) f, err := h.c.CreateFile(tmp, 3, 128<<20, 0755) + defer h.c.Remove(tmp) if err != nil { if pe, ok := err.(*os.PathError); ok && pe.Err == os.ErrNotExist { h.c.MkdirAll(filepath.Dir(path), 0755) f, err = h.c.CreateFile(tmp, 3, 128<<20, 0755) } - if pe, ok := err.(*os.PathError); ok && pe.Err == os.ErrExist { - h.c.Remove(tmp) - f, err = h.c.CreateFile(tmp, 3, 128<<20, 0755) + if pe, ok := err.(*os.PathError); ok { + if remoteErr, ok := pe.Err.(hdfs.Error); ok && remoteErr.Exception() == abcException { + pe.Err = os.ErrExist + } + if pe.Err == os.ErrExist { + h.c.Remove(tmp) + f, err = h.c.CreateFile(tmp, 3, 128<<20, 0755) + } } if err != nil { return err } } - defer h.c.Remove(tmp) _, err = io.Copy(f, in) if err != nil { f.Close()