context.go 6.2 KB
Newer Older
D
DoMyJob 已提交
1 2 3 4
package ytfs

import (
	"fmt"
5
	"math"
D
DoMyJob 已提交
6 7 8 9 10 11 12 13
	"sync"

	ydcommon "github.com/yottachain/YTFS/common"
	"github.com/yottachain/YTFS/errors"
	"github.com/yottachain/YTFS/opt"
	"github.com/yottachain/YTFS/storage"
)

D
DoMyJob 已提交
14
var (
D
DoMyJob 已提交
15
	debugPrint = opt.DebugPrint
D
DoMyJob 已提交
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
)

// storageContext describes one opened storage device managed by a Context.
type storageContext struct {
	// Name is the storage name, copied from the storage options.
	Name string
	// Cap is the full capacity of the device in data block slots, as
	// reported by the disk's Capability().
	Cap uint32
	// Len is the number of used data block slots. It is initialised to 0
	// when the storage is opened.
	Len uint32
	// Disk is the underlying storage the data blocks are read from and
	// written to.
	Disk *storage.YottaDisk
}

// storagePointer addresses a single data block slot, both by its position
// inside one device and by its global id across all devices.
//
// The field order is significant: the type is constructed with positional
// composite literals.
type storagePointer struct {
	// dev is the device id, from 0~255.
	dev uint8
	// posIdx is the slot offset inside the device.
	posIdx uint32
	// index is the global id of the data. If one device can hold 1 data,
	// then 0 == [0, 0], 1 == [1, 0], 2 == [2, 0].
	index uint32
}

// Context - the running YTFS context
type Context struct {
	config   *opt.Options
38
	sp       *storagePointer
D
DoMyJob 已提交
39 40 41 42 43 44
	storages []*storageContext
	// cm     		*cache.Manager
	lock sync.RWMutex
}

// NewContext creates a new YTFS context
45 46
func NewContext(dir string, config *opt.Options, dataCount uint64) (*Context, error) {
	storages, err := initStorages(config)
D
DoMyJob 已提交
47 48 49 50
	if err != nil {
		return nil, err
	}

51 52 53 54 55
	if dataCount > math.MaxUint32 {
		return nil, errors.ErrContextOverflow
	}

	context := &Context{
D
DoMyJob 已提交
56
		config:   config,
57
		sp:       nil,
D
DoMyJob 已提交
58 59
		storages: storages,
		lock:     sync.RWMutex{},
60 61 62 63 64
	}

	context.SetStoragePointer(uint32(dataCount))
	fmt.Println("Create YTFS content success, current sp = ", context.sp)
	return context, nil
D
DoMyJob 已提交
65 66
}

67
func initStorages(config *opt.Options) ([]*storageContext, error) {
D
DoMyJob 已提交
68
	contexts := []*storageContext{}
69
	for _, storageOpt := range config.Storages {
D
DoMyJob 已提交
70 71 72
		disk, err := storage.OpenYottaDisk(&storageOpt)
		if err != nil {
			// TODO: handle error if necessary, like keep using successed storages.
73
			return nil, err
D
DoMyJob 已提交
74 75 76 77
		}
		contexts = append(contexts, &storageContext{
			Name: storageOpt.StorageName,
			Cap:  disk.Capability(),
78
			Len:  0,
D
DoMyJob 已提交
79 80 81 82
			Disk: disk,
		})
	}

83 84 85 86 87 88 89 90 91 92
	return contexts, nil
}

// SetStoragePointer set the storage pointer position of current storage context
func (c *Context) SetStoragePointer(globalID uint32) error {
	sp, err := c.locate(globalID)
	if sp != nil {
		c.sp = sp
	}
	return err
D
DoMyJob 已提交
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
}

// Locate find the correct offset in correct device
func (c *Context) locate(idx uint32) (*storagePointer, error) {
	// TODO: binary search
	var dev, posIdx uint32 = 0, 0
	var devBegin, devEnd uint32 = 0, 0
	for _, s := range c.storages {
		devEnd += s.Cap
		if devBegin <= idx && idx < devEnd {
			posIdx = idx - devBegin
			return &storagePointer{
				uint8(dev),
				posIdx,
				idx,
			}, nil
		}
		devBegin += s.Cap
		dev++
	}

114 115 116 117 118
	return &storagePointer{
		uint8(len(c.storages)),
		0,
		0,
	}, errors.ErrContextIDMapping
D
DoMyJob 已提交
119 120 121
}

func (c *Context) forward() error {
122
	sp := c.sp
D
DoMyJob 已提交
123 124
	sp.posIdx++
	if sp.posIdx == c.storages[sp.dev].Cap {
125 126
		if int(sp.dev+1) == len(c.storages) {
			return errors.ErrDataOverflow
D
DoMyJob 已提交
127
		}
D
DoMyJob 已提交
128 129 130
		if debugPrint {
			fmt.Println("Move to next dev", sp.dev+1)
		}
D
DoMyJob 已提交
131 132 133 134 135 136 137 138
		sp.dev++
		sp.posIdx = 0
	}

	sp.index++
	return nil
}

139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
// fastforward advances the storage pointer by up to n slots, stopping at the
// first step that fails.
//
// When commit is false the pointer is put back to where it started, so the
// call only checks whether n slots are available. An error raised by the
// n-th step itself is not reported; any earlier error is returned.
func (c *Context) fastforward(n int, commit bool) error {
	snapshot := *c.sp

	var stepErr error
	steps := 0
	for steps < n && stepErr == nil {
		stepErr = c.forward()
		steps++
	}

	if !commit {
		*c.sp = snapshot
	}

	// last i reach the eof is ok.
	if stepErr != nil && steps < n {
		return stepErr
	}
	return nil
}

158 159 160
func (c *Context) save() *storagePointer {
	saveSP := *c.sp;
	return &saveSP
161 162
}

163 164
func (c *Context) restore(sp *storagePointer) {
	*c.sp = *sp
165 166
}

D
DoMyJob 已提交
167
func (c *Context) eof() bool {
168 169
	sp := c.sp
	return sp.dev >= uint8(len(c.storages)) || (sp.dev == uint8(len(c.storages)-1) && sp.posIdx == c.storages[sp.dev].Cap)
D
DoMyJob 已提交
170 171
}

172 173
func (c *Context) setEOF() {
	sp := c.sp
D
DoMyJob 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
	sp.dev = uint8(len(c.storages) - 1)
	sp.posIdx = c.storages[sp.dev].Cap
}

// Get reads the data block identified by the global index globalIdx.
//
// The index is mapped to a device and an offset inside it, and the block is
// read from that device's disk. The context is read-locked for the duration
// of the call. A mapping failure is returned as-is with a nil value.
func (c *Context) Get(globalIdx ydcommon.IndexTableValue) (value []byte, err error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	target, locErr := c.locate(uint32(globalIdx))
	if locErr != nil {
		return nil, locErr
	}

	if debugPrint {
		fmt.Printf("get data globalId %d @%v\n", globalIdx, target)
	}

	disk := c.storages[target.dev].Disk
	return disk.ReadData(ydcommon.IndexTableValue(target.posIdx))
}

194
// Put puts the vale to offset that current sp points to of the corrent device
D
DoMyJob 已提交
195 196 197
func (c *Context) Put(value []byte) (uint32, error) {
	c.lock.Lock()
	defer c.lock.Unlock()
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
	index, err := c.putAt(value, c.sp)
	if err != nil {
		return index, err
	}
	c.forward()
	return index, nil
}

// PutAt writes value to the slot identified by the global id globalID,
// without moving the current storage pointer.
//
// It returns the global index written to. A mapping failure yields index 0
// and the mapping error. The context is write-locked for the duration of
// the call.
func (c *Context) PutAt(value []byte, globalID uint32) (uint32, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	target, err := c.locate(globalID)
	if err != nil {
		return 0, err
	}
	return c.putAt(value, target)
}

221 222 223 224 225 226 227
// BatchPut puts the value array to offset that current sp points to of the corrent device
func (c *Context) BatchPut(cnt int, valueArray []byte) (uint32, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// TODO: Can we leave this check to disk??
	if err := c.fastforward(cnt, false); err != nil {
228
		return 0, err
229 230 231 232 233
	}

	var err error
	var index uint32
	if (c.sp.posIdx + uint32(cnt) <= c.storages[c.sp.dev].Cap) {
234
		index, err = c.putAt(valueArray, c.sp)
235
	} else {
236 237 238 239 240 241 242 243 244 245 246
		currentSP := *c.sp;
		step1 := c.storages[currentSP.dev].Cap - currentSP.posIdx
		index, err = c.putAt(valueArray[:step1*c.config.DataBlockSize], &currentSP)
		step2 := uint32(cnt) - step1
		currentSP.dev++
		currentSP.posIdx = 0
		currentSP.index += step1
		if (currentSP.posIdx + uint32(step2) > c.storages[currentSP.dev].Cap) {
				return 0, errors.New("Batch across 3 storage devices, not supported")
		}
		_, err = c.putAt(valueArray[step1*c.config.DataBlockSize:], &currentSP)
247 248 249
	}

	if err != nil {
250
		return 0, err
251 252 253 254 255
	}
	c.fastforward(cnt, true)
	return index, nil
}

256
func (c *Context) putAt(value []byte, sp *storagePointer) (uint32, error) {
D
DoMyJob 已提交
257
	if c.eof() {
258
		return 0, errors.ErrDataOverflow
D
DoMyJob 已提交
259 260 261
	}

	if debugPrint {
D
DoMyJob 已提交
262
		fmt.Printf("put data %x @ %v\n", value[:32], sp)
D
DoMyJob 已提交
263 264
	}

265 266
	dataPos := sp.posIdx
	err := c.storages[sp.dev].Disk.WriteData(ydcommon.IndexTableValue(dataPos), value)
D
DoMyJob 已提交
267
	if err != nil {
268
		return sp.index, err
D
DoMyJob 已提交
269
	}
270
	return sp.index, nil
D
DoMyJob 已提交
271 272 273 274 275 276
}

// Close finishes all actions and close all storages
func (c *Context) Close() {
	for _, storage := range c.storages {
		storage.Disk.Close()
277
		c.setEOF()
D
DoMyJob 已提交
278 279 280 281 282
	}
}

// Reset reset current context.
func (c *Context) Reset() {
283
	c.sp = &storagePointer{0, 0, 0}
D
DoMyJob 已提交
284 285 286
	for _, storage := range c.storages {
		storage.Disk.Format()
	}
D
DoMyJob 已提交
287
}