提交 c0312e42 编写于 作者: X xiaojianming

ytfs write position

上级 dc152b0d
......@@ -7,6 +7,7 @@ type DB interface {
Get(key ydcommon.IndexTableKey) (ydcommon.IndexTableValue, error)
Put(key ydcommon.IndexTableKey, value ydcommon.IndexTableValue) error
BatchPut(kvPairs []ydcommon.IndexItem) (map[ydcommon.IndexTableKey]byte, error)
UpdateMeta(account uint64) error
Len() uint64
TotalSize() uint64
BlockSize() uint32
......
......@@ -94,6 +94,10 @@ func (db *IndexDB) Put(key ydcommon.IndexTableKey, value ydcommon.IndexTableValu
return err
}
func (db *IndexDB)UpdateMeta(accout uint64) error{
return db.indexFile.UpdateMeta(accout)
}
// BatchPut add a set of new key value pairs to db.
func (db *IndexDB) BatchPut(kvPairs []ydcommon.IndexItem) (map[ydcommon.IndexTableKey]byte, error) {
// sorr kvPair by hash entry to make sure write in sequence.
......
......@@ -215,9 +215,19 @@ func initializeHeader( config *opt.Options) (*ydcommon.Header, error) {
return &header, nil
}
func (rd *KvDB) UpdateMeta(account uint64) error {
fmt.Println("[rockspos] BatchPut PosIdx=",rd.PosIdx,"account=",account)
rd.PosIdx = ydcommon.IndexTableValue(uint32(rd.PosIdx) + uint32(account))
binary.LittleEndian.PutUint32(valbuf, uint32(rd.PosIdx))
err := rd.Rdb.Put(rd.wo,rd.PosKey[:],valbuf)
if err != nil {
fmt.Println("update write pos to db error:", err)
}
return err
}
func (rd *KvDB) BatchPut(kvPairs []ydcommon.IndexItem) (map[ydcommon.IndexTableKey]byte, error) {
// keyValue:=make(map[ydcommon.IndexTableKey]ydcommon.IndexTableValue,len(batch))
i := 0
valbuf := make([]byte, 4)
for _,value := range kvPairs{
HKey := value.Hash[:]
......@@ -229,17 +239,8 @@ func (rd *KvDB) BatchPut(kvPairs []ydcommon.IndexItem) (map[ydcommon.IndexTableK
fmt.Println("[rocksdb]put dnhash to rocksdb error:", err)
return nil, err
}
i++
}
fmt.Println("[rockspos] BatchPut PosIdx=",rd.PosIdx,"i=",i)
rd.PosIdx = ydcommon.IndexTableValue(uint32(rd.PosIdx) + uint32(i))
binary.LittleEndian.PutUint32(valbuf, uint32(rd.PosIdx))
err := rd.Rdb.Put(rd.wo,rd.PosKey[:],valbuf)
if err != nil {
fmt.Println("update write pos to db error:", err)
return nil, err
}
//fmt.Printf("[noconflict] write success batch_write_time: %d ms, batch_len %d", time.Now().Sub(begin).Milliseconds(), bufCnt)
return nil, nil
}
......
......@@ -345,7 +345,7 @@ func (indexFile *YTFSIndexFile) BatchPut(kvPairs []ydcommon.IndexItem) (map[ydco
locker, _ := indexFile.store.Lock()
defer locker.Unlock()
dataWritten := uint64(0)
//dataWritten := uint64(0)
conflicts := map[ydcommon.IndexTableKey]byte{}
for _, kvPair := range kvPairs {
err := indexFile.updateTable(kvPair.Hash, kvPair.OffsetIdx)
......@@ -356,15 +356,16 @@ func (indexFile *YTFSIndexFile) BatchPut(kvPairs []ydcommon.IndexItem) (map[ydco
return conflicts, err
}
}
dataWritten++
//dataWritten++
}
//if len(conflicts) != 0 {
// return conflicts, errors.ErrConflict
//}
return conflicts, indexFile.updateMeta(dataWritten)
//pos metadata has been updated before this func
return conflicts,nil
//return conflicts, indexFile.updateMeta(dataWritten)
}
func (indexFile *YTFSIndexFile) UpdateMeta(dataWritten uint64) error {
......
......@@ -343,6 +343,13 @@ func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommo
return nil, err
}
//update the write position to db
err = ytfs.db.UpdateMeta(uint64(bufCnt))
if err != nil {
fmt.Println("update position error:",err)
return nil,err
}
for i := uint32(0); i < uint32(bufCnt); i++ {
batchIndexes[i] = ydcommon.IndexItem{
Hash: batchIndexes[i].Hash,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册