提交 6a88f48c 编写于 作者: D dp524856

update

......@@ -136,6 +136,34 @@ func (c *Context) forward() error {
return nil
}
// fastforward advances the storage pointer by n slots. When commit is
// false the pointer is restored afterwards, making this a dry-run
// capacity check. Hitting EOF on the very last step is tolerated.
func (c *Context) fastforward(n int, commit bool) error {
	checkpoint := *c.sp
	calls := 0
	var ffErr error
	for calls < n && ffErr == nil {
		ffErr = c.forward()
		calls++
	}
	if !commit {
		*c.sp = checkpoint
	}
	// An error raised by the final (n-th) forward means we just reached
	// EOF, which is acceptable; anything earlier is a real failure.
	if ffErr != nil && calls < n {
		return ffErr
	}
	return nil
}
// save returns a snapshot copy of the current storage pointer so it can
// be handed back to restore later.
func (c *Context) save() *storagePointer {
	snapshot := *c.sp
	return &snapshot
}
// restore resets the context's storage pointer to a previously saved snapshot.
func (c *Context) restore(saved *storagePointer) {
	*c.sp = *saved
}
func (c *Context) eof() bool {
sp := c.sp
return sp.dev >= uint8(len(c.storages)) || (sp.dev == uint8(len(c.storages)-1) && sp.posIdx == c.storages[sp.dev].Cap)
......@@ -190,6 +218,41 @@ func (c *Context) PutAt(value []byte, globalID uint32) (uint32, error) {
return index, nil
}
// BatchPut puts the value array at the offset that the current sp points
// to of the current device, splitting the write across at most two
// storage devices. It returns the global start index of the batch.
func (c *Context) BatchPut(cnt int, valueArray []byte) (uint32, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Dry-run forward to verify the whole batch fits before writing.
	// TODO: Can we leave this check to disk??
	if err := c.fastforward(cnt, false); err != nil {
		return 0, err
	}

	var (
		err   error
		index uint32
	)
	if c.sp.posIdx+uint32(cnt) <= c.storages[c.sp.dev].Cap {
		// Fast path: the whole batch fits on the current device.
		index, err = c.putAt(valueArray, c.sp)
	} else {
		// Split path: write the head to the current device and the tail
		// to the next one. The dry-run above guarantees dev+1 exists.
		currentSP := *c.sp
		step1 := c.storages[currentSP.dev].Cap - currentSP.posIdx
		index, err = c.putAt(valueArray[:step1*c.config.DataBlockSize], &currentSP)
		if err != nil {
			// Do not continue with the tail write; the original code
			// silently overwrote this error with the second putAt result.
			return 0, err
		}
		step2 := uint32(cnt) - step1
		currentSP.dev++
		currentSP.posIdx = 0
		currentSP.index += step1
		if currentSP.posIdx+step2 > c.storages[currentSP.dev].Cap {
			return 0, errors.New("Batch across 3 storage devices, not supported")
		}
		_, err = c.putAt(valueArray[step1*c.config.DataBlockSize:], &currentSP)
	}
	if err != nil {
		return 0, err
	}
	// Commit the pointer advance; surface the (normally impossible,
	// pre-validated) failure instead of ignoring it.
	if err := c.fastforward(cnt, true); err != nil {
		return 0, err
	}
	return index, nil
}
func (c *Context) putAt(value []byte, sp *storagePointer) (uint32, error) {
if c.eof() {
return 0, errors.ErrDataOverflow
......
......@@ -2,6 +2,7 @@ package ytfs
import (
"path"
"sort"
ydcommon "github.com/yottachain/YTFS/common"
"github.com/yottachain/YTFS/opt"
......@@ -46,6 +47,24 @@ func (db *IndexDB) Put(key ydcommon.IndexTableKey, value ydcommon.IndexTableValu
return db.indexFile.Put(key, value)
}
// BatchPut adds a set of new key value pairs to db. It returns the keys
// that already existed (conflicts), if any, alongside the error.
func (db *IndexDB) BatchPut(kvPairs []ydcommon.IndexItem) (map[ydcommon.IndexTableKey]byte, error) {
	// Sort kvPairs by hash-table entry index so the index file is
	// written in sequence.
	sort.Slice(kvPairs, func(i, j int) bool {
		return db.indexFile.GetTableEntryIndex(kvPairs[i].Hash) < db.indexFile.GetTableEntryIndex(kvPairs[j].Hash)
	})
	return db.indexFile.BatchPut(kvPairs)
}
// Close finishes all actions and close db connection.
func (db *IndexDB) Close() {
db.indexFile.Close()
......
......@@ -82,6 +82,8 @@ func main() {
err = simpleTest(ytfs)
case "stress":
err = stressTestReadAfterWrite(ytfs)
case "bstress":
err = stressTestReadAfterBatchWrite(ytfs)
case "hybrid":
err = hybridTestReadAfterWrite(ytfs)
case "read":
......@@ -172,9 +174,11 @@ func stressWrite(ytfs *ytfs.YTFS) error {
for i := (uint64)(0); i < dataCaps; i++ {
printProgress(i, dataCaps-1)
testHash := common.HexToHash(fmt.Sprintf("%032X", i))
data := make([]byte, ytfs.Meta().DataBlockSize, ytfs.Meta().DataBlockSize)
copy(data, testHash[:])
dataPair := KeyValuePair{
hash: testHash,
buf: testHash[:],
buf: data,
}
err := ytfs.Put((ydcommon.IndexTableKey)(dataPair.hash), dataPair.buf[:])
if err != nil {
......@@ -186,6 +190,44 @@ func stressWrite(ytfs *ytfs.YTFS) error {
return nil
}
// stressBatchWrite fills the whole YTFS capacity through BatchPut,
// flushing a batch every 17 inserts plus one final partial batch.
func stressBatchWrite(ytfs *ytfs.YTFS) error {
	dataCaps := ytfs.Cap()
	fmt.Printf("Starting insert %d data blocks\n", dataCaps)
	batch := map[ydcommon.IndexTableKey][]byte{}
	for i := uint64(0); i < dataCaps; i++ {
		printProgress(i, dataCaps-1)
		testHash := common.HexToHash(fmt.Sprintf("%032X", i))
		// Each block starts with its own hash so reads can be validated.
		data := make([]byte, ytfs.Meta().DataBlockSize)
		copy(data, testHash[:])
		batch[ydcommon.IndexTableKey(testHash)] = data
		// Flush every 17 blocks to exercise multi-item batches.
		if (i+1)%17 == 0 {
			if _, err := ytfs.BatchPut(batch); err != nil {
				panic(err)
			}
			batch = map[ydcommon.IndexTableKey][]byte{}
		}
	}
	// Flush the trailing partial batch, if any.
	if len(batch) > 0 {
		if _, err := ytfs.BatchPut(batch); err != nil {
			panic(fmt.Errorf("%v at last input", err))
		}
	}
	fmt.Println(ytfs)
	return nil
}
func stressRead(ytfs *ytfs.YTFS) error {
type KeyValuePair struct {
hash common.Hash
......@@ -213,6 +255,27 @@ func stressRead(ytfs *ytfs.YTFS) error {
return nil
}
// stressTestReadAfterBatchWrite batch-writes the full capacity, then
// reads everything back concurrently with one goroutine per CPU.
func stressTestReadAfterBatchWrite(ytfs *ytfs.YTFS) error {
	if err := stressBatchWrite(ytfs); err != nil {
		panic(err)
	}
	var wg sync.WaitGroup
	for worker := 0; worker < runtime.NumCPU(); worker++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := stressRead(ytfs); err != nil {
				panic(err)
			}
		}(worker)
	}
	wg.Wait()
	return nil
}
func stressTestReadAfterWrite(ytfs *ytfs.YTFS) error {
err := stressWrite(ytfs)
if err != nil {
......
......@@ -82,7 +82,7 @@ func (indexFile *YTFSIndexFile) Format() error {
return indexFile.clearTableFromStorage()
}
func (indexFile *YTFSIndexFile) getTableEntryIndex(key ydcommon.IndexTableKey) uint32 {
// GetTableEntryIndex maps a key to its hash-table bucket: the trailing 4
// bytes of the hash masked by RangeCapacity-1 (RangeCapacity is a power of two).
func (indexFile *YTFSIndexFile) GetTableEntryIndex(key ydcommon.IndexTableKey) uint32 {
	tail := big.NewInt(0).SetBytes(key[common.HashLength-4:])
	return uint32(tail.Uint64()) & (indexFile.meta.RangeCapacity - 1)
}
......@@ -91,7 +91,7 @@ func (indexFile *YTFSIndexFile) getTableEntryIndex(key ydcommon.IndexTableKey) u
func (indexFile *YTFSIndexFile) Get(key ydcommon.IndexTableKey) (ydcommon.IndexTableValue, error) {
locker, _ := indexFile.store.Lock()
defer locker.Unlock()
idx := indexFile.getTableEntryIndex(key)
idx := indexFile.GetTableEntryIndex(key)
table, err := indexFile.loadTableFromStorage(idx)
if err != nil {
return 0, err
......@@ -180,12 +180,16 @@ func (indexFile *YTFSIndexFile) Put(key ydcommon.IndexTableKey, value ydcommon.I
locker, _ := indexFile.store.Lock()
defer locker.Unlock()
idx := indexFile.getTableEntryIndex(key)
idx := indexFile.GetTableEntryIndex(key)
table, err := indexFile.loadTableFromStorage(idx)
if err != nil {
return err
}
if _, ok := table[key]; ok {
return errors.ErrConflict
}
rowCount := uint32(len(table))
if rowCount >= indexFile.meta.RangeCoverage {
// move to overflow region
......@@ -249,6 +253,115 @@ func (indexFile *YTFSIndexFile) Put(key ydcommon.IndexTableKey, value ydcommon.I
return err
}
// BatchPut saves a set of key value pairs. Keys that already exist are
// collected and returned with errors.ErrConflict; when every insert
// succeeds, the on-disk meta is updated with the number of writes.
func (indexFile *YTFSIndexFile) BatchPut(kvPairs []ydcommon.IndexItem) (map[ydcommon.IndexTableKey]byte, error) {
	locker, _ := indexFile.store.Lock()
	defer locker.Unlock()

	written := uint64(0)
	conflicts := map[ydcommon.IndexTableKey]byte{}
	for _, pair := range kvPairs {
		switch err := indexFile.updateTable(pair.Hash, pair.OffsetIdx); err {
		case nil:
			// inserted
		case errors.ErrConflict:
			conflicts[pair.Hash] = 1
		default:
			return nil, err
		}
		written++
	}
	if len(conflicts) != 0 {
		return conflicts, errors.ErrConflict
	}
	return nil, indexFile.updateMeta(written)
}
// updateMeta advances DataEndPoint by dataWritten and persists its
// (32-bit truncated) value at the field's offset within the file header,
// syncing only every SyncPeriod puts to amortize fsync cost.
func (indexFile *YTFSIndexFile) updateMeta(dataWritten uint64) error {
	indexFile.meta.DataEndPoint += dataWritten

	header := indexFile.meta
	buf := make([]byte, 4)
	binary.LittleEndian.PutUint32(buf, uint32(indexFile.meta.DataEndPoint))
	writer, _ := indexFile.store.Writer()
	writer.Seek(int64(unsafe.Offsetof(header.DataEndPoint)), io.SeekStart)
	if _, err := writer.Write(buf); err != nil {
		return err
	}
	// SyncPeriod is assumed to be a power of two, so the mask tests divisibility.
	if indexFile.stat.putCount&(indexFile.config.SyncPeriod-1) == 0 {
		return writer.Sync()
	}
	return nil
}
// updateTable inserts one key/value row into the on-disk hash table.
// It returns errors.ErrConflict when the key already exists, and
// errors.ErrRangeFull when both the target bucket and the overflow
// region are full.
func (indexFile *YTFSIndexFile) updateTable(key ydcommon.IndexTableKey, value ydcommon.IndexTableValue) error {
	idx := indexFile.GetTableEntryIndex(key)
	table, err := indexFile.loadTableFromStorage(idx)
	if err != nil {
		return err
	}
	// Duplicate keys are rejected; the caller decides how to handle conflicts.
	if _, ok := table[key]; ok {
		return errors.ErrConflict
	}
	rowCount := uint32(len(table))
	if rowCount >= indexFile.meta.RangeCoverage {
		// Bucket is full: fall back to the overflow region, which lives
		// one slot past the last regular bucket (index == RangeCapacity).
		idx = indexFile.meta.RangeCapacity
		table, err = indexFile.loadTableFromStorage(idx)
		if err != nil {
			return err
		}
		// NOTE(review): this rowCount deliberately shadows the outer one;
		// it checks the overflow table's occupancy only.
		rowCount := uint32(len(table))
		if rowCount >= indexFile.meta.RangeCoverage {
			return errors.ErrRangeFull
		}
	}
	// write cnt
	// On-disk layout of one table: [4-byte row count][row]...[row], where
	// a row is a full key followed by a 4-byte little-endian value.
	writer, _ := indexFile.store.Writer()
	itemSize := uint32(unsafe.Sizeof(ydcommon.IndexTableKey{}) + unsafe.Sizeof(ydcommon.IndexTableValue(0)))
	tableAllocationSize := indexFile.meta.RangeCoverage*itemSize + 4
	tableBeginPos := int64(indexFile.meta.HashOffset) + int64(idx)*int64(tableAllocationSize)
	valueBuf := make([]byte, 4)
	writer.Seek(tableBeginPos, io.SeekStart)
	tableSize := uint32(len(table)) + 1
	binary.LittleEndian.PutUint32(valueBuf, uint32(tableSize))
	_, err = writer.Write(valueBuf)
	if err != nil {
		return err
	}
	// write new item
	// Append the new row directly after the existing len(table) rows.
	tableItemPos := tableBeginPos + 4 + int64(len(table))*int64(itemSize)
	writer.Seek(tableItemPos, io.SeekStart)
	_, err = writer.Write(key[:])
	if err != nil {
		return err
	}
	binary.LittleEndian.PutUint32(valueBuf, uint32(value))
	_, err = writer.Write(valueBuf)
	if err != nil {
		return err
	}
	// Keep the in-memory size cache consistent with the on-disk row count.
	indexFile.index.sizes[idx] = tableSize
	if debugPrint {
		fmt.Printf("IndexDB put %x:%x\n", key, value)
	}
	return nil
}
// OpenYTFSIndexFile opens or creates a YTFSIndexFile for the given storage.
// The DB will be created if not exist, unless Error happens.
//
......
......@@ -89,8 +89,8 @@ func (disk *YottaDisk) WriteData(dataOffsetIndex ydcommon.IndexTableValue, data
defer locker.Unlock()
writer, err := disk.store.Writer()
ydcommon.YottaAssert(len(data) <= (int)(disk.meta.DataBlockSize))
dataBlock := make([]byte, disk.meta.DataBlockSize, disk.meta.DataBlockSize)
ydcommon.YottaAssert(len(data) % (int)(disk.meta.DataBlockSize) == 0)
dataBlock := make([]byte, len(data), len(data))
copy(dataBlock, data)
writer.Seek(int64(disk.meta.DataOffset)+int64(disk.meta.DataBlockSize)*int64(dataOffsetIndex), io.SeekStart)
//
......
......@@ -11,6 +11,11 @@ import (
"github.com/yottachain/YTFS/opt"
)
// ytfsStatus is a point-in-time snapshot of YTFS state, used to roll
// back a failed batch operation.
type ytfsStatus struct {
	// ctxSP is the saved storage pointer of the underlying Context.
	ctxSP *storagePointer
	//TODO: index status
}
// YTFS is a data block save/load lib based on key-value styled db APIs.
type YTFS struct {
// config of this YTFS
......@@ -21,6 +26,9 @@ type YTFS struct {
context *Context
// lock of YTFS
mutex *sync.Mutex
// saved status
savedStatus []ytfsStatus
}
// Open opens or creates a YTFS for the given storage.
......@@ -182,6 +190,71 @@ func (ytfs *YTFS) Put(key ydcommon.IndexTableKey, buf []byte) error {
return ytfs.db.Put(key, ydcommon.IndexTableValue(pos))
}
/*
 * Batch mode func list
 */

// restoreYTFS rolls the context back to the most recently saved status
// and pops that snapshot off the stack. It asserts a snapshot exists.
func (ytfs *YTFS) restoreYTFS() {
	//TODO: also restore the index status (the original comment said
	// "save index" here, which was swapped with saveCurrentYTFS's).
	id := len(ytfs.savedStatus) - 1
	ydcommon.YottaAssert(id >= 0)
	ytfs.context.restore(ytfs.savedStatus[id].ctxSP)
	ytfs.savedStatus = ytfs.savedStatus[:id]
}
// saveCurrentYTFS pushes a snapshot of the current context status so a
// failing batch operation can roll back via restoreYTFS.
func (ytfs *YTFS) saveCurrentYTFS() {
	//TODO: also save the index status (the original comment said
	// "restore index" here, which was swapped with restoreYTFS's).
	ytfs.savedStatus = append(ytfs.savedStatus, ytfsStatus{
		ctxSP: ytfs.context.save(),
	})
}
// BatchPut sets the value array for the given key array.
// It panics if there exists any previous value for that key as YottaDisk is not a multi-map.
// It is safe to modify the contents of the arguments after Put returns but not
// before.
//
// On any failure the previously saved context status is restored; on an
// index conflict the conflicting keys are returned with the error.
func (ytfs *YTFS) BatchPut(batch map[ydcommon.IndexTableKey][]byte) (map[ydcommon.IndexTableKey]byte, error) {
	ytfs.mutex.Lock()
	defer ytfs.mutex.Unlock()

	// Keep batches small so the rollback window stays bounded.
	if len(batch) > 32 {
		return nil, fmt.Errorf("Batch Size is too big")
	}
	// NO get check, but restore all status if error.
	ytfs.saveCurrentYTFS()

	bufCnt := len(batch)
	batchIndexes := make([]ydcommon.IndexItem, 0, bufCnt)
	var batchBuffer []byte
	for k, v := range batch {
		batchBuffer = append(batchBuffer, v...)
		// OffsetIdx stays zero for now; it is filled in once the data
		// start position is known.
		batchIndexes = append(batchIndexes, ydcommon.IndexItem{Hash: k})
	}

	startPos, err := ytfs.context.BatchPut(bufCnt, batchBuffer)
	if err != nil {
		ytfs.restoreYTFS()
		return nil, err
	}
	// Data is on disk: assign each item its consecutive offset index.
	for i := uint32(0); i < uint32(bufCnt); i++ {
		batchIndexes[i].OffsetIdx = ydcommon.IndexTableValue(startPos + i)
	}

	conflicts, err := ytfs.db.BatchPut(batchIndexes)
	if err != nil {
		ytfs.restoreYTFS()
		return conflicts, err
	}
	return nil, nil
}
// Meta reports current meta information.
func (ytfs *YTFS) Meta() *ydcommon.Header {
return ytfs.db.schema
......
......@@ -157,6 +157,107 @@ func TestYTFSFullWriteRead(t *testing.T) {
}
}
// TestYTFSFullBatchWriteRead fills the whole YTFS capacity via BatchPut
// (flushing every 7 inserts), then reads every block back and validates
// that its leading bytes equal its key hash.
func TestYTFSFullBatchWriteRead(t *testing.T) {
	rootDir, err := ioutil.TempDir("/tmp", "ytfsTest")
	if err != nil {
		// The original ignored this error and could Open("") below.
		t.Fatal(err)
	}
	config := opt.DefaultOptions()
	ytfs, err := Open(rootDir, config)
	if err != nil {
		t.Fatal(err)
	}
	defer ytfs.Close()

	dataCaps := ytfs.Cap()
	fmt.Printf("Starting insert %d data blocks\n", dataCaps)
	batch := map[types.IndexTableKey][]byte{}
	for i := (uint64)(0); i < dataCaps; i++ {
		testHash := (types.IndexTableKey)(common.HexToHash(fmt.Sprintf("%032X", i)))
		buf := make([]byte, config.DataBlockSize)
		copy(buf, testHash[:])
		batch[testHash] = buf
		if i > 0 && i%7 == 0 {
			if _, err := ytfs.BatchPut(batch); err != nil {
				t.Fatalf("Error: %v in %d insert", err, i)
			}
			batch = map[types.IndexTableKey][]byte{}
		}
	}
	// Flush the trailing partial batch.
	if len(batch) > 0 {
		if _, err := ytfs.BatchPut(batch); err != nil {
			t.Fatalf("Error: %v in last insert", err)
		}
	}

	fmt.Printf("Starting validata %d data blocks\n", dataCaps)
	for i := (uint64)(0); i < dataCaps; i++ {
		testHash := (types.IndexTableKey)(common.HexToHash(fmt.Sprintf("%032X", i)))
		buf, err := ytfs.Get(testHash)
		if err != nil {
			t.Fatalf("Error: %v in %d check", err, i)
		}
		if !bytes.Equal(buf[:len(testHash)], testHash[:]) {
			t.Fatalf("Fatal: %d test fail, want:\n%x\n, get:\n%x\n", i, testHash, buf[:len(testHash)])
		}
	}
}
// TestYTFSBatchConflictReport verifies that re-submitting a batch of
// mostly already-written keys reports exactly those keys as conflicts,
// and that a genuinely new key is not reported.
func TestYTFSBatchConflictReport(t *testing.T) {
	rootDir, err := ioutil.TempDir("/tmp", "ytfsTest")
	if err != nil {
		// The original ignored this error and could Open("") below.
		t.Fatal(err)
	}
	config := opt.DefaultOptions()
	ytfs, err := Open(rootDir, config)
	if err != nil {
		t.Fatal(err)
	}
	defer ytfs.Close()

	dataCaps := ytfs.Cap()
	fmt.Printf("Starting insert %d data blocks\n", dataCaps)
	batch := map[types.IndexTableKey][]byte{}
	for i := (uint64)(0); i <= 7; i++ {
		testHash := (types.IndexTableKey)(common.HexToHash(fmt.Sprintf("%032X", i)))
		buf := make([]byte, config.DataBlockSize)
		copy(buf, testHash[:])
		batch[testHash] = buf
		if i > 0 && i%7 == 0 {
			if _, err := ytfs.BatchPut(batch); err != nil {
				t.Fatalf("Error: %v in %d insert", err, i)
			}
			// Remove one already-written item ...
			testRemoveHash := (types.IndexTableKey)(common.HexToHash(fmt.Sprintf("%032X", 3)))
			delete(batch, testRemoveHash)
			// ... and add one brand-new item.
			testNewHash := (types.IndexTableKey)(common.HexToHash(fmt.Sprintf("%032X", 8)))
			newBuf := make([]byte, config.DataBlockSize)
			copy(newBuf, testNewHash[:])
			batch[testNewHash] = newBuf

			conflicts, err := ytfs.BatchPut(batch)
			if err != errors.ErrConflict {
				t.Fatalf("Error: Expected err errors.ErrConflict but get %v.", err)
			}
			// Everything except the new key must be flagged as a conflict.
			if len(conflicts) != len(batch)-1 {
				t.Fatalf("Error: conflicts has %d items but batch has %d.", len(conflicts), len(batch))
			}
			for k := range conflicts {
				if _, ok := batch[k]; !ok {
					t.Fatalf("Error: conflicts mismatch with batch on %v.", k)
				}
			}
			if _, ok := conflicts[testNewHash]; ok {
				t.Fatalf("Error: conflicts should not have %v.", testNewHash)
			}
		}
	}
}
func TestYTFSConcurrentAccessWriteSameKey(t *testing.T) {
rootDir, err := ioutil.TempDir("/tmp", "ytfsTest")
config := opt.DefaultOptions()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册