提交 929810ce 编写于 作者: D DoMyJob

Return the conflicting index items iff a batch write failed due to index conflicts.

Signed-off-by: NDoMyJob <DoMyJob@Yotta>
上级 70ebd46c
......@@ -225,29 +225,29 @@ func (c *Context) BatchPut(cnt int, valueArray []byte) (uint32, error) {
// TODO: Can we leave this check to disk??
if err := c.fastforward(cnt, false); err != nil {
return 0, err
return 0, err
}
var err error
var index uint32
if (c.sp.posIdx + uint32(cnt) <= c.storages[c.sp.dev].Cap) {
index, err = c.putAt(valueArray, c.sp)
index, err = c.putAt(valueArray, c.sp)
} else {
currentSP := *c.sp;
step1 := c.storages[currentSP.dev].Cap - currentSP.posIdx
index, err = c.putAt(valueArray[:step1*c.config.DataBlockSize], &currentSP)
step2 := uint32(cnt) - step1
currentSP.dev++
currentSP.posIdx = 0
currentSP.index += step1
if (currentSP.posIdx + uint32(step2) > c.storages[currentSP.dev].Cap) {
return 0, errors.New("Batch across 3 storage devices, not supported")
}
_, err = c.putAt(valueArray[step1*c.config.DataBlockSize:], &currentSP)
currentSP := *c.sp;
step1 := c.storages[currentSP.dev].Cap - currentSP.posIdx
index, err = c.putAt(valueArray[:step1*c.config.DataBlockSize], &currentSP)
step2 := uint32(cnt) - step1
currentSP.dev++
currentSP.posIdx = 0
currentSP.index += step1
if (currentSP.posIdx + uint32(step2) > c.storages[currentSP.dev].Cap) {
return 0, errors.New("Batch across 3 storage devices, not supported")
}
_, err = c.putAt(valueArray[step1*c.config.DataBlockSize:], &currentSP)
}
if err != nil {
return 0, err
return 0, err
}
c.fastforward(cnt, true)
return index, nil
......
......@@ -48,7 +48,7 @@ func (db *IndexDB) Put(key ydcommon.IndexTableKey, value ydcommon.IndexTableValu
}
// BatchPut add a set of new key value pairs to db.
func (db *IndexDB) BatchPut(kvPairs []ydcommon.IndexItem) error {
func (db *IndexDB) BatchPut(kvPairs []ydcommon.IndexItem) (map[ydcommon.IndexTableKey]byte, error) {
// sorr kvPair by hash entry to make sure write in sequence.
sort.Slice(kvPairs, func(i, j int) bool {
return db.indexFile.GetTableEntryIndex(kvPairs[i].Hash) < db.indexFile.GetTableEntryIndex(kvPairs[j].Hash)
......
......@@ -202,26 +202,26 @@ func stressBatchWrite(ytfs *ytfs.YTFS) error {
batch := map[ydcommon.IndexTableKey][]byte{}
i := (uint64)(0)
for ; i < dataCaps; i++ {
printProgress(i, dataCaps-1)
testHash := common.HexToHash(fmt.Sprintf("%032X", i))
data := make([]byte, ytfs.Meta().DataBlockSize, ytfs.Meta().DataBlockSize)
copy(data, testHash[:])
batch[ydcommon.IndexTableKey(testHash)] = data
if (i + 1) % 17 == 0 {
err := ytfs.BatchPut(batch)
if err != nil {
panic(err)
}
batch = map[ydcommon.IndexTableKey][]byte{}
printProgress(i, dataCaps-1)
testHash := common.HexToHash(fmt.Sprintf("%032X", i))
data := make([]byte, ytfs.Meta().DataBlockSize, ytfs.Meta().DataBlockSize)
copy(data, testHash[:])
batch[ydcommon.IndexTableKey(testHash)] = data
if (i + 1) % 17 == 0 {
_, err := ytfs.BatchPut(batch)
if err != nil {
panic(err)
}
batch = map[ydcommon.IndexTableKey][]byte{}
}
}
if len(batch) > 0 {
err := ytfs.BatchPut(batch)
if err != nil {
panic(fmt.Errorf("%v at last input", err))
}
_, err := ytfs.BatchPut(batch)
if err != nil {
panic(fmt.Errorf("%v at last input", err))
}
}
fmt.Println(ytfs)
......
......@@ -254,17 +254,30 @@ func (indexFile *YTFSIndexFile) Put(key ydcommon.IndexTableKey, value ydcommon.I
}
// BatchPut saves a key value pair.
func (indexFile *YTFSIndexFile) BatchPut(kvPairs []ydcommon.IndexItem) error {
func (indexFile *YTFSIndexFile) BatchPut(kvPairs []ydcommon.IndexItem) (map[ydcommon.IndexTableKey]byte, error) {
locker, _ := indexFile.store.Lock()
defer locker.Unlock()
dataWritten := uint64(0)
conflicts := map[ydcommon.IndexTableKey]byte{}
for _, kvPair := range kvPairs {
indexFile.updateTable(kvPair.Hash, kvPair.OffsetIdx);
err := indexFile.updateTable(kvPair.Hash, kvPair.OffsetIdx);
if err != nil {
if err == errors.ErrConflict {
conflicts[kvPair.Hash]=1
} else {
return nil, err
}
}
dataWritten++;
}
return indexFile.updateMeta(dataWritten)
if len(conflicts) != 0 {
return conflicts, errors.ErrConflict
}
return nil, indexFile.updateMeta(dataWritten)
}
func (indexFile *YTFSIndexFile) updateMeta(dataWritten uint64) error {
......
......@@ -212,12 +212,12 @@ func (ytfs *YTFS) saveCurrentYTFS() {
// It panics if there exists any previous value for that key as YottaDisk is not a multi-map.
// It is safe to modify the contents of the arguments after Put returns but not
// before.
func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) error {
func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommon.IndexTableKey]byte, error) {
ytfs.mutex.Lock()
defer ytfs.mutex.Unlock()
if (len(batch) > 32) {
return fmt.Errorf("Batch Size is too big")
return nil, fmt.Errorf("Batch Size is too big")
}
// NO get check, but retore all status if error
......@@ -238,7 +238,7 @@ func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) error {
startPos, err := ytfs.context.BatchPut(bufCnt, batchBuffer);
if err != nil {
ytfs.restoreYTFS();
return err
return nil, err
}
for i:=uint32(0); i<uint32(bufCnt); i++ {
......@@ -247,12 +247,12 @@ func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) error {
OffsetIdx: ydcommon.IndexTableValue(startPos + i)}
}
err = ytfs.db.BatchPut(batchIndexes)
conflicts, err := ytfs.db.BatchPut(batchIndexes)
if err != nil {
ytfs.restoreYTFS();
return err
return conflicts, err
}
return nil
return nil, nil
}
// Meta reports current meta information.
......
......@@ -176,7 +176,7 @@ func TestYTFSFullBatchWriteRead(t *testing.T) {
copy(buf, testHash[:])
batch[testHash] = buf
if i > 0 && i % 7 == 0 {
err := ytfs.BatchPut(batch)
_, err := ytfs.BatchPut(batch)
if err != nil {
panic(fmt.Sprintf("Error: %v in %d insert", err, i))
}
......@@ -185,7 +185,7 @@ func TestYTFSFullBatchWriteRead(t *testing.T) {
}
if len(batch) > 0 {
err := ytfs.BatchPut(batch)
_, err := ytfs.BatchPut(batch)
if err != nil {
panic(fmt.Sprintf("Error: %v in last insert", err))
}
......@@ -206,6 +206,58 @@ func TestYTFSFullBatchWriteRead(t *testing.T) {
}
}
// TestYTFSBatchConflictReport verifies BatchPut's conflict reporting:
// after a batch is written once, re-submitting it with one old key removed
// and one brand-new key added must fail with errors.ErrConflict and report
// exactly the previously-written keys — never the new one.
func TestYTFSBatchConflictReport(t *testing.T) {
	rootDir, err := ioutil.TempDir("/tmp", "ytfsTest")
	if err != nil {
		// Original code ignored this error and would have opened "".
		t.Fatal(err)
	}
	config := opt.DefaultOptions()
	ytfs, err := Open(rootDir, config)
	if err != nil {
		t.Fatal(err)
	}
	defer ytfs.Close()
	dataCaps := ytfs.Cap()
	fmt.Printf("Starting insert %d data blocks\n", dataCaps)
	batch := map[types.IndexTableKey][]byte{}
	for i := (uint64)(0); i <= 7; i++ {
		testHash := (types.IndexTableKey)(common.HexToHash(fmt.Sprintf("%032X", i)))
		buf := make([]byte, config.DataBlockSize)
		copy(buf, testHash[:])
		batch[testHash] = buf
		if i > 0 && i%7 == 0 {
			// The first insert of the accumulated batch must succeed.
			_, err := ytfs.BatchPut(batch)
			if err != nil {
				panic(fmt.Sprintf("Error: %v in %d insert", err, i))
			}
			// Drop one already-written key ...
			testRemoveHash := (types.IndexTableKey)(common.HexToHash(fmt.Sprintf("%032X", 3)))
			delete(batch, testRemoveHash)
			// ... and add one key that has never been written.
			testNewHash := (types.IndexTableKey)(common.HexToHash(fmt.Sprintf("%032X", 8)))
			newBuf := make([]byte, config.DataBlockSize)
			copy(newBuf, testNewHash[:])
			batch[testNewHash] = newBuf
			// Re-inserting must report every old key as a conflict.
			conflicts, err := ytfs.BatchPut(batch)
			if err != errors.ErrConflict {
				panic(fmt.Sprintf("Error: Expected err errors.ErrConflict but get %v.", err))
			}
			if len(conflicts) != len(batch)-1 {
				panic(fmt.Sprintf("Error: conflicts has %d items but batch has %d.", len(conflicts), len(batch)))
			}
			for k := range conflicts {
				if _, ok := batch[k]; !ok {
					panic(fmt.Sprintf("Error: conflicts mismatch with batch on %v.", k))
				}
			}
			if _, ok := conflicts[testNewHash]; ok {
				panic(fmt.Sprintf("Error: conflicts should not have %v.", testNewHash))
			}
		}
	}
}
func TestYTFSConcurrentAccessWriteSameKey(t *testing.T) {
rootDir, err := ioutil.TempDir("/tmp", "ytfsTest")
config := opt.DefaultOptions()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册