提交 d97d5a1e 编写于 作者: Z zhangzhengyan

Merge branch 'release-all' into release

......@@ -144,11 +144,10 @@ func (c *Context) locate(idx uint32) (*storagePointer, error) {
}
func (c *Context) forward() error {
fmt.Println("[memtrace] in forward()")
sp := c.sp
sp.posIdx++
if int(sp.dev) >= len(c.storages) {
fmt.Println("[memtrace] err int(sp.dev) >= len(c.storages)")
fmt.Println("[memtrace] error int(sp.dev) >= len(c.storages)")
return errors.ErrDataOverflow
}
if sp.posIdx >= c.storages[sp.dev].Cap {
......@@ -180,7 +179,6 @@ func (c *Context) fastforward(n int, commit bool) error {
fmt.Println("[memtrace] in fastforward error:",err)
return err
}
fmt.Println("[memtrace] fastforward end")
return nil
}
......@@ -260,10 +258,8 @@ func (c *Context) BatchPut(cnt int, valueArray []byte) (uint32, error) {
var err error
var index uint32
if (c.sp.posIdx + uint32(cnt) <= c.storages[c.sp.dev].Cap) {
fmt.Println("[memtrace] putAt in one dev")
index, err = c.putAt(valueArray, c.sp)
} else {
fmt.Println("[memtrace] putAt in two dev")
currentSP := *c.sp;
step1 := c.storages[currentSP.dev].Cap - currentSP.posIdx
index, err = c.putAt(valueArray[:step1*c.config.DataBlockSize], &currentSP)
......@@ -285,11 +281,9 @@ func (c *Context) BatchPut(cnt int, valueArray []byte) (uint32, error) {
}
func (c *Context) putAt(value []byte, sp *storagePointer) (uint32, error) {
fmt.Println("[memtrace] putAt start")
if c.eof() {
return 0, errors.ErrDataOverflow
}
fmt.Println("[memtrace] putAt no eof,continue ")
if debugPrint {
fmt.Printf("put data %x @ %v\n", value[:32], sp)
}
......
......@@ -6,6 +6,7 @@ import (
// Common errors.
var (
ErrDBConfig = errors.New("YTFS: error DB config")
ErrTwoMetaFile = errors.New("YTFS: two metafile exist")
ErrDataConflict = errors.New("YTFS: hash key conflict happens")
ErrDirNameConflict = errors.New("YTFS: ytfs can not open dir because of name conflict")
......
......@@ -61,23 +61,6 @@ func CheckDbStatus(dir,file1,file2 string) bool {
// NewIndexDB creates a new index db based on input file if it's exist.
func NewIndexDB(dir string, config *opt.Options) (*IndexDB, error) {
fileName := path.Join(dir, "index.db")
//fileName2 := path.Join(dir, "metadata.db")
//if config.UseKvDb {
// fmt.Println("[rocksdb] use rocksdb was configured,use rocksdb!")
// fileName = fileName2
//}
//
//if PathExists(fileName2){
// fmt.Println("[rocksdb] metadata.db exist,use rocksdb!")
// fileName = fileName2
// config.UseKvDb = true
//}
//
//bl := CheckDbStatus(dir,"index.db","metadata.db")
//if bl {
// fmt.Println("[rocksdb][error]there two metadata file")
// return nil, ErrTwoMetaFile
//}
indexFile, err := storage.OpenYTFSIndexFile(fileName, config)
if err != nil {
......
......@@ -6,6 +6,7 @@ import (
"github.com/tecbot/gorocksdb"
ydcommon "github.com/yottachain/YTFS/common"
"github.com/yottachain/YTFS/opt"
"github.com/yottachain/YTDataNode/logger"
"os"
"path"
"sync"
......@@ -56,6 +57,7 @@ func openKVDB(DBPath string) (kvdb *KvDB, err error) {
func openYTFSK(dir string, config *opt.Options) (*YTFS, error) {
//TODO: file lock to avoid re-open.
//1. open system dir for YTFS
if fi, err := os.Stat(dir); err == nil {
// dir/file exists, check if it can be reloaded.
if !fi.IsDir() {
......@@ -104,12 +106,29 @@ func openYTFSK(dir string, config *opt.Options) (*YTFS, error) {
//get start Pos from rocksdb
HKey := ydcommon.BytesToHash([]byte(ytPosKey))
mDB.PosKey = ydcommon.IndexTableKey(HKey)
PosIdx, err := mDB.Get(mDB.PosKey)
PosRocksdb, err := mDB.Get(mDB.PosKey)
if err != nil {
fmt.Println("[rocksdb] get start write pos err=",err)
return nil, err
}
mDB.PosIdx = PosIdx
//if indexdb exist, get write start pos from index.db
fileIdxdb := path.Join(dir,"index.db")
if PathExists(fileIdxdb){
indexDB, err := NewIndexDB(dir, config)
if err != nil {
return nil,err
}
//if rocksdb start pos < index.db start pos, there must be some error
posIdxdb := indexDB.schema.DataEndPoint
if uint64(PosRocksdb) < posIdxdb{
log.Println("pos error:",ErrDBConfig)
return nil,ErrDBConfig
}
}
mDB.PosIdx = PosRocksdb
fmt.Println("[rocksdb] OpenYTFSK Current start posidx=",mDB.PosIdx)
//check blksize to rocksdb
......@@ -149,6 +168,14 @@ func openYTFSK(dir string, config *opt.Options) (*YTFS, error) {
mutex : new(sync.Mutex),
}
fileName := path.Join(dir, "dbsafe")
if ! PathExists(fileName) {
if _, err := os.Create(fileName);err != nil {
log.Println("create arbiration file error!")
return nil,err
}
}
fmt.Println("Open YTFS success @" + dir)
return ytfs, nil
}
......@@ -158,7 +185,7 @@ func (rd *KvDB) Get(key ydcommon.IndexTableKey) (ydcommon.IndexTableValue, error
var retval uint32
val, err := rd.Rdb.Get(rd.ro, key[:])
if err != nil {
fmt.Println("[rocksdb] rocksdb get pos error:", err)
fmt.Println("[rocksdb] get pos error:", err)
return 0, err
}
......@@ -168,19 +195,6 @@ func (rd *KvDB) Get(key ydcommon.IndexTableKey) (ydcommon.IndexTableValue, error
return ydcommon.IndexTableValue(retval), nil
}
func (rd *KvDB)GetKeyVal(Key string) []byte{
HKey := ydcommon.BytesToHash([]byte(Key))
fmt.Println("diskposkey=",string(HKey[:]))
val,err := rd.Rdb.Get(rd.ro,HKey[:])
fmt.Println("get diskIdx val.exist=",val.Exists())
fmt.Println("get diskIdx err in openKVDB,err=",err)
if err != nil || !val.Exists(){
fmt.Println("get value error, key=",Key)
return nil
}
return val.Data()
}
func initializeHeader( config *opt.Options) (*ydcommon.Header, error) {
m, n := config.IndexTableCols, config.IndexTableRows
t, d, h := config.TotalVolumn, config.DataBlockSize, uint32(unsafe.Sizeof(ydcommon.Header{}))
......@@ -218,17 +232,18 @@ func (rd *KvDB) BatchPut(kvPairs []ydcommon.IndexItem) (map[ydcommon.IndexTableK
err := rd.Rdb.Put(rd.wo, HKey, valbuf)
if err != nil {
fmt.Println("[rocksdb]put dnhash to rocksdb error", err)
fmt.Println("[rocksdb]put dnhash to rocksdb error:", err)
return nil, err
}
i++
}
fmt.Println("[rockspos] BatchPut PosIdx=",rd.PosIdx,"i=",i)
rd.PosIdx = ydcommon.IndexTableValue(uint32(rd.PosIdx) + uint32(i))
binary.LittleEndian.PutUint32(valbuf, uint32(rd.PosIdx))
err := rd.Rdb.Put(rd.wo,rd.PosKey[:],valbuf)
if err != nil {
fmt.Println("update write pos to metadatafile err:", err)
fmt.Println("update write pos to db error:", err)
return nil, err
}
//fmt.Printf("[noconflict] write success batch_write_time: %d ms, batch_len %d", time.Now().Sub(begin).Milliseconds(), bufCnt)
......
......@@ -507,7 +507,7 @@ func (indexFile *YTFSIndexFile) updateTable(key ydcommon.IndexTableKey, value yd
if debugPrint {
fmt.Printf("IndexDB put %x:%x\n", key, value)
}
fmt.Printf("[memtrace] IndexDB put %x:%x\n", key, value)
return nil
}
......
......@@ -49,24 +49,6 @@ func GetTableIterator(indexpath string, opts *opt.Options) (*TableIterator, erro
return &ti, nil
}
// GetTableIterator 返回Table遍历器
func GetTableIterator2(indexpath, metadatapath string, opts *opt.Options, glbti TableIterator) (*TableIterator, error) {
// var ti TableIterator
ytfsIndexFile, err := OpenYTFSIndexFile(indexpath, opts)
if err != nil {
fmt.Println("ytfsIndexFile open err")
return nil, err
}
err = RebuildIdxHeader(ytfsIndexFile, metadatapath)
if err != nil {
fmt.Println("metadatapath rebuild err")
return nil, err
}
glbti.ytfsIndexFile = ytfsIndexFile
glbti.options = opts
return &glbti, nil
}
// GetTable 获取一个Table,指针后移一位
func (ti *TableIterator) GetTable() (common.IndexTable, error) {
if ti.tableIndex > ti.options.IndexTableRows {
......@@ -157,7 +139,6 @@ func (ti *TableIterator) GetNoNilTableBytes() (bytesTable, error) {
for {
table, err := ti.GetTableBytes()
if err != nil {
fmt.Println("GetTableBytes error,", err)
return nil, err
}
if table == nil {
......
......@@ -81,7 +81,6 @@ func (disk *YottaDisk) ReadData(dataIndex ydcommon.IndexTableValue) ([]byte, err
// WriteData writes data to low level storage
func (disk *YottaDisk) WriteData(dataOffsetIndex ydcommon.IndexTableValue, data []byte) error {
fmt.Println("[memtrace] WriteData start ")
if uint32(dataOffsetIndex) >= disk.meta.DataCapacity {
fmt.Println("[memtrace] WriteData error dataOffsetIndex out datacapacity")
return errors.ErrDataOverflow
......@@ -101,18 +100,16 @@ func (disk *YottaDisk) WriteData(dataOffsetIndex ydcommon.IndexTableValue, data
//if err != nil {
// return err
//}
fmt.Println("[memtrace] real write datablock")
_, err = writer.Write(dataBlock)
fmt.Println("[memtrace] real write end")
if err != nil {
fmt.Println("[memtrace] real write err:",err)
fmt.Println("[memtrace] real write error:",err)
return err
}
disk.stat.writeOps++
if disk.stat.writeOps&(disk.config.SyncPeriod-1) == 0 {
fmt.Println("[memtrace] writer.Sync:")
return writer.Sync()
}
......
......@@ -4,6 +4,7 @@ import (
// "encoding/binary"
"encoding/json"
"fmt"
log "github.com/yottachain/YTDataNode/logger"
//"time"
//"github.com/tecbot/gorocksdb"
......@@ -91,28 +92,17 @@ func NewYTFS(dir string, config *opt.Options) (*YTFS, error) {
return ytfs, nil
}
//func openKVDB(DBPath string) (db *leveldb.DB,err error){
// db,err = leveldb.OpenFile(DBPath,nil)
// if err != nil{
// fmt.Printf("open DB:%s error",DBPath)
// return nil,err
// }
// return db,err
//}
func openYTFS(dir string, config *opt.Options) (*YTFS, error) {
// fileName := path.Join(dir, "maindb")
if config.UseKvDb {
fmt.Println("use rocksdb")
return openYTFSK(dir,config)
}
fmt.Println("use indexdb")
return openYTFSI(dir,config)
}
func openYTFSI(dir string, config *opt.Options) (*YTFS, error) {
//TODO: file lock to avoid re-open.
//1. open system dir for YTFS
fileName := path.Join(dir, "dbsafe")
if PathExists(fileName) {
log.Printf("db config error!")
return nil,ErrDBConfig
}
if fi, err := os.Stat(dir); err == nil {
// dir/file exists, check if it can be reloaded.
if !fi.IsDir() {
......@@ -211,13 +201,6 @@ func validateYTFSSchema(meta *ydcommon.Header, opt *opt.Options) (*ydcommon.Head
// The returned slice is its own copy, it is safe to modify the contents
// of the returned slice.
// It is safe to modify the contents of the argument after Get returns.
//func (ytfs *YTFS) Get(key ydcommon.IndexTableKey) ([]byte, error) {
// if ytfs.config.UseKvDb {
// fmt.Println("[rocksdb] use rocksdb for matadata")
// return ytfs.GetK(key)
// }
// return ytfs.GetI(key)
//}
func (ytfs *YTFS) Get(key ydcommon.IndexTableKey) ([]byte, error) {
pos, err := ytfs.db.Get(key)
......@@ -228,18 +211,6 @@ func (ytfs *YTFS) Get(key ydcommon.IndexTableKey) ([]byte, error) {
return ytfs.context.Get(pos)
}
//func (ytfs *YTFS) GetK(key ydcommon.IndexTableKey) ([]byte, error) {
// val, err := ytfs.mdb.Rdb.Get(ytfs.mdb.ro, key[:])
// pos := binary.LittleEndian.Uint32(val.Data())
// // fmt.Println("[rocksdb] Rocksdbval=",val,"Rocksdbval32=",pos)
// if err != nil {
// fmt.Println("[rocksdb] rocksdb get pos error:", err)
// return nil, err
// }
//
// return ytfs.context.Get(ydcommon.IndexTableValue(pos))
//}
// Put sets the value for the given key. It panic if there exists any previous value
// for that key; YottaDisk is not a multi-map.
// It is safe to modify the contents of the arguments after Put returns but not
......@@ -340,13 +311,6 @@ func (ytfs *YTFS) checkConflicts(conflicts map[ydcommon.IndexTableKey]byte, batc
// It is safe to modify the contents of the arguments after Put returns but not
// before.
//func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommon.IndexTableKey]byte, error) {
// if ytfs.config.UseKvDb {
// fmt.Println("[rocksdb] write use rocksdb for matadata")
// return ytfs.BatchPutK(batch)
// }
// return ytfs.BatchPutI(batch)
//}
func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommon.IndexTableKey]byte, error) {
ytfs.mutex.Lock()
......
package ytfs
import (
"fmt"
"github.com/yottachain/YTFS/opt"
)
func openYTFS(dir string, config *opt.Options) (*YTFS, error) {
if config.UseKvDb {
fmt.Println("use rocksdb")
return openYTFSK(dir,config)
}
fmt.Println("use indexdb")
return openYTFSI(dir,config)
}
\ No newline at end of file
package ytfs
import (
"fmt"
"github.com/yottachain/YTFS/opt"
)
func openYTFS(dir string, config *opt.Options) (*YTFS, error) {
fmt.Println("use indexdb")
return openYTFSI(dir,config)
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册