提交 8338b479 编写于 作者: X xiaojianming

solve merge conflict

......@@ -75,6 +75,15 @@ func (db *IndexDB) Reset() {
}
func validateDBSchema(meta *ydcommon.Header, opt *opt.Options) error {
if opt.UseKvDb {
fmt.Println("[rocksdb] using rocksdb")
if meta.DataBlockSize != opt.DataBlockSize {
fmt.Println("[rocksdb] config datablock size miss match")
return ErrConfigIndexMismatch
}
return nil
}
if meta.YtfsCapability != opt.TotalVolumn || meta.DataBlockSize != opt.DataBlockSize {
return ErrConfigIndexMismatch
}
......
......@@ -41,6 +41,8 @@ type Options struct {
IndexTableRows uint32 `json:"N"`
DataBlockSize uint32 `json:"D"`
TotalVolumn uint64 `json:"C"`
//updata to leveldb
UseKvDb bool
}
// Equal compares 2 Options to tell if it is equal
......
package ytfs
import (
"encoding/binary"
"encoding/json"
"fmt"
"github.com/linux-go/go1.13.5.linux-amd64/go/src/time"
// "github.com/syndtr/goleveldb/leveldb"
"github.com/tecbot/gorocksdb"
// "github.com/linux-go/go1.13.5.linux-amd64/go/src/time"
"github.com/mr-tron/base58/base58"
"github.com/yottachain/YTDataNode/util"
......@@ -17,17 +19,27 @@ import (
// log "github.com/yottachain/YTDataNode/logger"
)
var mdbFileName = "/maindb"
type ytfsStatus struct {
ctxSP *storagePointer
//TODO: index status
}
type KvDB struct{
Rdb *gorocksdb.DB
ro *gorocksdb.ReadOptions
wo *gorocksdb.WriteOptions
}
// YTFS is a data block save/load lib based on key-value styled db APIs.
type YTFS struct {
// config of this YTFS
config *opt.Options
// key-value db which saves hash <-> position
db *IndexDB
// main rocksdb
mdb *KvDB //todo xiaojm
// running context
context *Context
// lock of YTFS
......@@ -85,6 +97,39 @@ func NewYTFS(dir string, config *opt.Options) (*YTFS, error) {
return ytfs, nil
}
//func openKVDB(DBPath string) (db *leveldb.DB,err error){
// db,err = leveldb.OpenFile(DBPath,nil)
// if err != nil{
// fmt.Printf("open DB:%s error",DBPath)
// return nil,err
// }
// return db,err
//}
func openKVDB(DBPath string) (kvdb *KvDB,err error){
// 使用 gorocksdb 连接 RocksDB
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetBlockCache(gorocksdb.NewLRUCache(3 << 30))
opts := gorocksdb.NewDefaultOptions()
opts.SetBlockBasedTableFactory(bbto)
opts.SetCreateIfMissing(true)
// 设置输入目标数据库文件(可自行配置,./db 为当前测试文件的目录下的 db 文件夹)
db, err := gorocksdb.OpenDb(opts, DBPath)
if err != nil {
fmt.Println("[kvdb] open rocksdb error")
return nil,err
}
// 创建输入输出流
ro := gorocksdb.NewDefaultReadOptions()
wo := gorocksdb.NewDefaultWriteOptions()
return &KvDB {
Rdb: db,
ro: ro,
wo: wo,
},err
}
func openYTFS(dir string, config *opt.Options) (*YTFS, error) {
//TODO: file lock to avoid re-open.
//1. open system dir for YTFS
......@@ -112,6 +157,14 @@ func openYTFS(dir string, config *opt.Options) (*YTFS, error) {
return nil, err
}
//open main kv-db
mainDBPath := path.Join(dir,mdbFileName)
mDB,err := openKVDB(mainDBPath)
if err != nil {
fmt.Println("[KVDB]open main kv-DB for save hash error:",err)
return nil,err
}
// open index db
indexDB, err := NewIndexDB(dir, config)
if err != nil {
......@@ -125,11 +178,13 @@ func openYTFS(dir string, config *opt.Options) (*YTFS, error) {
}
ytfs := &YTFS{
config: config,
mdb: mDB,
db: indexDB,
context: context,
mutex: new(sync.Mutex),
}
ytfs.config.UseKvDb = true
fmt.Println("Open YTFS success @" + dir)
return ytfs, nil
}
......@@ -167,14 +222,34 @@ func validateYTFSSchema(meta *ydcommon.Header, opt *opt.Options) (*ydcommon.Head
// of the returned slice.
// It is safe to modify the contents of the argument after Get returns.
func (ytfs *YTFS) Get(key ydcommon.IndexTableKey) ([]byte, error) {
if ytfs.config.UseKvDb{
fmt.Println("[rocksdb] use rocksdb for matadata")
return ytfs.GetK(key)
}
return ytfs.GetI(key)
}
func (ytfs *YTFS) GetI(key ydcommon.IndexTableKey) ([]byte, error) {
pos, err := ytfs.db.Get(key)
if err != nil {
fmt.Println("[indexdb] indexdb get pos error:",err)
return nil, err
}
return ytfs.context.Get(pos)
}
func (ytfs *YTFS) GetK(key ydcommon.IndexTableKey) ([]byte, error) {
val, err := ytfs.mdb.Rdb.Get(ytfs.mdb.ro,key[:])
pos := binary.LittleEndian.Uint32(val.Data())
// fmt.Println("[rocksdb] Rocksdbval=",val,"Rocksdbval32=",pos)
if err != nil {
fmt.Println("[rocksdb] rocksdb get pos error:",err)
return nil, err
}
return ytfs.context.Get(ydcommon.IndexTableValue(pos))
}
// Put sets the value for the given key. It panic if there exists any previous value
// for that key; YottaDisk is not a multi-map.
// It is safe to modify the contents of the arguments after Put returns but not
......@@ -200,7 +275,7 @@ func (ytfs *YTFS) Put(key ydcommon.IndexTableKey, buf []byte) error {
*/
func (ytfs *YTFS) restoreYTFS() {
//TODO: save index
fmt.Println("[memtrace] in restoreYTFS()")
fmt.Println("[rocksdb] in restoreYTFS()")
id := len(ytfs.savedStatus) - 1
ydcommon.YottaAssert(id >= 0)
ytfs.context.restore(ytfs.savedStatus[id].ctxSP)
......@@ -268,13 +343,91 @@ func (ytfs *YTFS)checkConflicts(conflicts map[ydcommon.IndexTableKey]byte, batch
}
}
func (ytfs *YTFS) BatchWriteKV(batch map[ydcommon.IndexTableKey][]byte) error {
var err error
Wbatch := new(gorocksdb.WriteBatch)
for key,val := range batch {
Wbatch.Put(key[:],val)
}
err = ytfs.mdb.Rdb.Write(ytfs.mdb.wo, Wbatch)
return err
}
func (ytfs *YTFS)resetKV(batchIndexes []ydcommon.IndexItem,resetCnt uint32){
for j:= uint32(0); j < resetCnt; j++ {
hashKey := batchIndexes[j].Hash[:]
ytfs.mdb.Rdb.Delete(ytfs.mdb.wo,hashKey[:])
}
}
//var mutexindex uint64 = 0
// BatchPut sets the value array for the given key array.
// It panics if there exists any previous value for that key as YottaDisk is not a multi-map.
// It is safe to modify the contents of the arguments after Put returns but not
// before.
func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommon.IndexTableKey]byte, error) {
begin:=time.Now()
// begin:=time.Now()
if ytfs.config.UseKvDb {
fmt.Println("[rocksdb] write use rocksdb for matadata")
return ytfs.BatchPutK(batch)
}
return ytfs.BatchPutI(batch)
}
func (ytfs *YTFS) BatchPutI(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommon.IndexTableKey]byte, error) {
ytfs.mutex.Lock()
defer ytfs.mutex.Unlock()
if len(batch) > 1000 {
return nil, fmt.Errorf("Batch Size is too big")
}
fmt.Println("[indexdb] BatchPutI len(batch)=",len(batch))
// NO get check, but retore all status if error
ytfs.saveCurrentYTFS()
batchIndexes := make([]ydcommon.IndexItem, len(batch))
batchBuffer := []byte{}
bufCnt := len(batch)
i := 0
for k, v := range batch {
batchBuffer = append(batchBuffer, v...)
batchIndexes[i] = ydcommon.IndexItem{
Hash: k,
OffsetIdx: ydcommon.IndexTableValue(0)}
i++
}
startPos, err := ytfs.context.BatchPut(bufCnt, batchBuffer)
if err != nil {
fmt.Println("[indexdb] ytfs.context.BatchPut error")
ytfs.restoreYTFS()
return nil, err
}
for i := uint32(0); i < uint32(bufCnt); i++ {
batchIndexes[i] = ydcommon.IndexItem{
Hash: batchIndexes[i].Hash,
OffsetIdx: ydcommon.IndexTableValue(startPos + i)}
}
conflicts, err := ytfs.db.BatchPut(batchIndexes)
if err != nil {
fmt.Println("[indexdb] update indexdb error:",err)
ytfs.restoreIndex(conflicts, batchIndexes, uint32(bufCnt))
ytfs.restoreYTFS()
return conflicts, err
}
return nil, nil
}
func (ytfs *YTFS) BatchPutK(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommon.IndexTableKey]byte, error) {
ytfs.mutex.Lock()
defer ytfs.mutex.Unlock()
......@@ -282,7 +435,6 @@ func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommo
return nil, fmt.Errorf("Batch Size is too big")
}
// NO get check, but retore all status if error
ytfs.saveCurrentYTFS()
......@@ -301,12 +453,12 @@ func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommo
startPos, err := ytfs.context.BatchPut(bufCnt, batchBuffer)
if err != nil {
fmt.Println("[memtrace] ytfs.context.BatchPut error")
fmt.Println("[rocksdb] ytfs.context.BatchPut error")
ytfs.restoreYTFS()
fmt.Printf("[noconflict] write error batch_write_time: %d ms, batch_len %d", time.Now().Sub(begin).Milliseconds(),bufCnt)
return nil, err
}
for i := uint32(0); i < uint32(bufCnt); i++ {
batchIndexes[i] = ydcommon.IndexItem{
Hash: batchIndexes[i].Hash,
......@@ -321,6 +473,7 @@ func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommo
ytfs.restoreYTFS()
fmt.Printf("[noconflict] write error batch_write_time: %d ms, batch_len %d", time.Now().Sub(begin).Milliseconds(),bufCnt)
return conflicts, err
}
fmt.Printf("[noconflict] write success batch_write_time: %d ms, batch_len %d", time.Now().Sub(begin).Milliseconds(),bufCnt)
return nil, nil
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册