未验证 提交 8bbfb3b8 编写于 作者: S Sandy Xu 提交者: GitHub

meta/tikv: add CopyFileRange (#631)

上级 1f486257
......@@ -171,17 +171,17 @@ func (m *kvMeta) fmtKey(args ...interface{}) []byte {
session ssssssss
All keys:
setting format
C... counter
AiiiiiiiiI inode attribute
AiiiiiiiiD... dentry
AiiiiiiiiCnnnn file chunks
AiiiiiiiiS symlink target
AiiiiiiiiX... extented attribute
Diiiiiiiinnnn delete inodes
Kccccccccnnnn slice refs
SHssssssss session heartbeat
SIssssssss session info
setting format
C... counter
AiiiiiiiiI inode attribute
AiiiiiiiiD... dentry
AiiiiiiiiCnnnn file chunks
AiiiiiiiiS symlink target
AiiiiiiiiX... extented attribute
Diiiiiiiinnnn delete inodes
Kccccccccnnnn slice refs
SHssssssss session heartbeat
SIssssssss session info
SSssssssssiiiiiiii sustained inode
*/
......@@ -1720,7 +1720,105 @@ func (m *kvMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
}
func (m *kvMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied *uint64) syscall.Errno {
return syscall.ENOTSUP
var newSpace int64
defer func() { m.of.InvalidateChunk(fout, 0xFFFFFFFF) }()
err := m.txn(func(tx kvTxn) error {
rs := tx.gets(m.inodeKey(fin), m.inodeKey(fout))
if rs[0] == nil || rs[1] == nil {
return syscall.ENOENT
}
var sattr Attr
m.parseAttr(rs[0], &sattr)
if sattr.Typ != TypeFile {
return syscall.EINVAL
}
if offIn >= sattr.Length {
*copied = 0
return nil
}
if offIn+size > sattr.Length {
size = sattr.Length - offIn
}
var attr Attr
m.parseAttr(rs[1], &attr)
if attr.Typ != TypeFile {
return syscall.EINVAL
}
newleng := offOut + size
if newleng > attr.Length {
newSpace = align4K(newleng) - align4K(attr.Length)
attr.Length = newleng
}
if m.checkQuota(newSpace, 0) {
return syscall.ENOSPC
}
now := time.Now()
attr.Mtime = now.Unix()
attr.Mtimensec = uint32(now.Nanosecond())
attr.Ctime = now.Unix()
attr.Ctimensec = uint32(now.Nanosecond())
vals := tx.scanRange(m.chunkKey(fin, uint32(offIn/ChunkSize)), m.chunkKey(fin, uint32(offIn+size/ChunkSize)+1))
chunks := make(map[uint32][]*slice)
for indx := uint32(offIn / ChunkSize); indx <= uint32((offIn+size)/ChunkSize); indx++ {
if v, ok := vals[string(m.chunkKey(fin, indx))]; ok {
chunks[indx] = readSliceBuf(v)
}
}
coff := offIn / ChunkSize * ChunkSize
for coff < offIn+size {
if coff%ChunkSize != 0 {
panic("coff")
}
// Add a zero chunk for hole
ss := append([]*slice{{len: ChunkSize}}, chunks[uint32(coff/ChunkSize)]...)
cs := buildSlice(ss)
for _, s := range cs {
pos := coff
coff += uint64(s.Len)
if pos < offIn+size && pos+uint64(s.Len) > offIn {
if pos < offIn {
dec := offIn - pos
s.Off += uint32(dec)
pos += dec
s.Len -= uint32(dec)
}
if pos+uint64(s.Len) > offIn+size {
dec := pos + uint64(s.Len) - (offIn + size)
s.Len -= uint32(dec)
}
doff := pos - offIn + offOut
indx := uint32(doff / ChunkSize)
dpos := uint32(doff % ChunkSize)
if dpos+s.Len > ChunkSize {
tx.append(m.chunkKey(fout, indx), marshalSlice(dpos, s.Chunkid, s.Size, s.Off, ChunkSize-dpos))
if s.Chunkid > 0 {
tx.incrBy(m.sliceKey(s.Chunkid, s.Size), 1)
}
skip := ChunkSize - dpos
tx.append(m.chunkKey(fout, indx+1), marshalSlice(0, s.Chunkid, s.Size, s.Off+skip, s.Len-skip))
if s.Chunkid > 0 {
tx.incrBy(m.sliceKey(s.Chunkid, s.Size), 1)
}
} else {
tx.append(m.chunkKey(fout, indx), marshalSlice(dpos, s.Chunkid, s.Size, s.Off, s.Len))
if s.Chunkid > 0 {
tx.incrBy(m.sliceKey(s.Chunkid, s.Size), 1)
}
}
}
}
}
tx.set(m.inodeKey(fout), m.marshal(&attr))
*copied = size
return nil
})
if err == nil {
m.updateStats(newSpace, 0)
}
return errno(err)
}
/* TODO
......
......@@ -35,7 +35,7 @@ func TestTiKVClient(t *testing.T) {
// testLocks(t, m)
testConcurrentWrite(t, m)
// testCompaction(t, m)
// testCopyFileRange(t, m)
testCopyFileRange(t, m)
m.(*kvMeta).conf.CaseInsensi = true
testCaseIncensi(t, m)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册