context.go 6.3 KB
Newer Older
D
DoMyJob 已提交
1 2 3 4
package ytfs

import (
	"fmt"
5
	"math"
D
DoMyJob 已提交
6 7 8 9 10 11 12 13
	"sync"

	ydcommon "github.com/yottachain/YTFS/common"
	"github.com/yottachain/YTFS/errors"
	"github.com/yottachain/YTFS/opt"
	"github.com/yottachain/YTFS/storage"
)

D
DoMyJob 已提交
14
var (
D
DoMyJob 已提交
15
	debugPrint = opt.DebugPrint
D
DoMyJob 已提交
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
)

// storageContext describes one opened storage device managed by a Context.
type storageContext struct {
	// storage name, copied from the storage options when the disk is opened
	Name string
	// full capability of data block, as reported by the opened disk
	Cap uint32
	// used data block slot number; initialised to 0 and not updated by the
	// code visible in this file section
	Len uint32
	// Storage: the opened disk that backs this entry
	Disk *storage.YottaDisk
}

// storagePointer addresses one data block: the device it lives on, the slot
// inside that device, and the matching global index.
// Fields are filled positionally elsewhere in this file, so their order
// must not change.
type storagePointer struct {
	dev    uint8  // device id, from 0~255; used as an index into Context.storages
	posIdx uint32 // slot offset inside the device
	index  uint32 // global id of data. if one device can hold 1 data, then 0 == [0, 0], 1 == [1, 0], 2 == [2, 0]
}

// Context - the running YTFS context
type Context struct {
	config   *opt.Options
38
	sp       *storagePointer
D
DoMyJob 已提交
39 40 41 42 43 44
	storages []*storageContext
	// cm     		*cache.Manager
	lock sync.RWMutex
}

// NewContext creates a new YTFS context
45 46
func NewContext(dir string, config *opt.Options, dataCount uint64) (*Context, error) {
	storages, err := initStorages(config)
D
DoMyJob 已提交
47 48 49 50
	if err != nil {
		return nil, err
	}

51 52 53 54 55
	if dataCount > math.MaxUint32 {
		return nil, errors.ErrContextOverflow
	}

	context := &Context{
D
DoMyJob 已提交
56
		config:   config,
57
		sp:       nil,
D
DoMyJob 已提交
58 59
		storages: storages,
		lock:     sync.RWMutex{},
60 61 62 63 64
	}

	context.SetStoragePointer(uint32(dataCount))
	fmt.Println("Create YTFS content success, current sp = ", context.sp)
	return context, nil
D
DoMyJob 已提交
65 66
}

67
func initStorages(config *opt.Options) ([]*storageContext, error) {
D
DoMyJob 已提交
68
	contexts := []*storageContext{}
69
	for _, storageOpt := range config.Storages {
D
DoMyJob 已提交
70 71 72
		disk, err := storage.OpenYottaDisk(&storageOpt)
		if err != nil {
			// TODO: handle error if necessary, like keep using successed storages.
73
			return nil, err
D
DoMyJob 已提交
74 75 76 77
		}
		contexts = append(contexts, &storageContext{
			Name: storageOpt.StorageName,
			Cap:  disk.Capability(),
78
			Len:  0,
D
DoMyJob 已提交
79 80 81 82
			Disk: disk,
		})
	}

83 84 85 86 87 88 89 90 91 92
	return contexts, nil
}

// SetStoragePointer set the storage pointer position of current storage context
func (c *Context) SetStoragePointer(globalID uint32) error {
	sp, err := c.locate(globalID)
	if sp != nil {
		c.sp = sp
	}
	return err
D
DoMyJob 已提交
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
}

// Locate find the correct offset in correct device
func (c *Context) locate(idx uint32) (*storagePointer, error) {
	// TODO: binary search
	var dev, posIdx uint32 = 0, 0
	var devBegin, devEnd uint32 = 0, 0
	for _, s := range c.storages {
		devEnd += s.Cap
		if devBegin <= idx && idx < devEnd {
			posIdx = idx - devBegin
			return &storagePointer{
				uint8(dev),
				posIdx,
				idx,
			}, nil
		}
		devBegin += s.Cap
		dev++
	}

114 115 116 117 118
	return &storagePointer{
		uint8(len(c.storages)),
		0,
		0,
	}, errors.ErrContextIDMapping
D
DoMyJob 已提交
119 120 121
}

func (c *Context) forward() error {
122
	sp := c.sp
D
DoMyJob 已提交
123 124
	sp.posIdx++
	if sp.posIdx == c.storages[sp.dev].Cap {
125 126
		if int(sp.dev+1) == len(c.storages) {
			return errors.ErrDataOverflow
D
DoMyJob 已提交
127
		}
D
DoMyJob 已提交
128 129 130
		if debugPrint {
			fmt.Println("Move to next dev", sp.dev+1)
		}
D
DoMyJob 已提交
131 132 133 134 135 136 137 138
		sp.dev++
		sp.posIdx = 0
	}

	sp.index++
	return nil
}

139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
// fastforward steps the storage pointer forward up to n times, stopping at
// the first failing step. With commit == false the pointer is put back to
// where it started, which turns the call into a dry run.
//
// A failure on the n-th (final) step is not reported; a failure on any
// earlier step is returned.
func (c *Context) fastforward(n int, commit bool) error {
	snapshot := *c.sp

	steps := 0
	var stepErr error
	for steps < n && stepErr == nil {
		stepErr = c.forward()
		steps++
	}

	if !commit {
		*c.sp = snapshot
	}

	// last i reach the eof is ok.
	if stepErr != nil && steps < n {
		return stepErr
	}
	return nil
}

158 159 160
func (c *Context) save() *storagePointer {
	saveSP := *c.sp;
	return &saveSP
161 162
}

163 164
func (c *Context) restore(sp *storagePointer) {
	*c.sp = *sp
165 166
}

D
DoMyJob 已提交
167
func (c *Context) eof() bool {
168 169
	sp := c.sp
	return sp.dev >= uint8(len(c.storages)) || (sp.dev == uint8(len(c.storages)-1) && sp.posIdx == c.storages[sp.dev].Cap)
D
DoMyJob 已提交
170 171
}

172 173
func (c *Context) setEOF() {
	sp := c.sp
D
DoMyJob 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
	sp.dev = uint8(len(c.storages) - 1)
	sp.posIdx = c.storages[sp.dev].Cap
}

// Get reads the block stored under global index globalIdx.
// The index is resolved to a device and an in-device offset, and the read is
// delegated to that device's disk. The read lock is held for the whole call.
// A failed lookup returns a nil value together with the lookup error.
func (c *Context) Get(globalIdx ydcommon.IndexTableValue) (value []byte, err error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	target, locateErr := c.locate(uint32(globalIdx))
	if locateErr != nil {
		return nil, locateErr
	}

	if debugPrint {
		fmt.Printf("get data globalId %d @%v\n", globalIdx, target)
	}

	disk := c.storages[target.dev].Disk
	return disk.ReadData(ydcommon.IndexTableValue(target.posIdx))
}

194
// Put puts the vale to offset that current sp points to of the corrent device
D
DoMyJob 已提交
195 196 197
func (c *Context) Put(value []byte) (uint32, error) {
	c.lock.Lock()
	defer c.lock.Unlock()
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
	index, err := c.putAt(value, c.sp)
	if err != nil {
		return index, err
	}
	c.forward()
	return index, nil
}

// PutAt puts the value to the specific offset, given as global index
// globalID, on the correct device. The context's own storage pointer is not
// moved. A failed lookup returns index 0 and the lookup error; otherwise the
// result of the write is returned as is.
func (c *Context) PutAt(value []byte, globalID uint32) (uint32, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	target, locateErr := c.locate(globalID)
	if locateErr != nil {
		return 0, locateErr
	}
	return c.putAt(value, target)
}

221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
// BatchPut puts the value array to the offset that the current sp points to
// on the correct device, then advances sp by cnt blocks.
//
// valueArray is split at block granularity using c.config.DataBlockSize. A
// batch may spill from the current device into the next one, but no further.
// On success the global index of the first written block is returned; on any
// failure the index is 0 and sp is left unchanged.
func (c *Context) BatchPut(cnt int, valueArray []byte) (uint32, error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Nothing fits once the pointer is at the end. Bail out before any code
	// below indexes c.storages with an out-of-range device id.
	if c.eof() {
		return 0, errors.ErrDataOverflow
	}

	// TODO: Can we leave this check to disk??
	// Dry run: make sure cnt steps are possible before writing anything.
	if err := c.fastforward(cnt, false); err != nil {
		return 0, err
	}

	start := *c.sp
	blocks := uint32(cnt)
	devCap := c.storages[start.dev].Cap

	var index uint32
	// Sum in uint64 so that posIdx + cnt cannot wrap around.
	if uint64(start.posIdx)+uint64(blocks) <= uint64(devCap) {
		var err error
		if index, err = c.putAt(valueArray, c.sp); err != nil {
			return 0, err
		}
	} else {
		// Validate the whole split before the first write, so a rejected
		// batch leaves no partial data behind.
		if int(start.dev)+1 >= len(c.storages) {
			return 0, errors.ErrDataOverflow
		}
		step1 := devCap - start.posIdx
		step2 := blocks - step1
		next := storagePointer{
			dev:    start.dev + 1,
			posIdx: 0,
			index:  start.index + step1,
		}
		if step2 > c.storages[next.dev].Cap {
			return 0, errors.New("Batch across 3 storage devices, not supported")
		}

		split := step1 * c.config.DataBlockSize
		var err error
		// A failure of the first half must not be masked by the second write.
		if index, err = c.putAt(valueArray[:split], &start); err != nil {
			return 0, err
		}
		if _, err = c.putAt(valueArray[split:], &next); err != nil {
			return 0, err
		}
	}

	// The dry run above already proved that these steps succeed.
	_ = c.fastforward(cnt, true)
	return index, nil
}

256
func (c *Context) putAt(value []byte, sp *storagePointer) (uint32, error) {
D
DoMyJob 已提交
257
	if c.eof() {
258
		return 0, errors.ErrDataOverflow
D
DoMyJob 已提交
259 260 261
	}

	if debugPrint {
D
DoMyJob 已提交
262
		fmt.Printf("put data %x @ %v\n", value[:32], sp)
D
DoMyJob 已提交
263 264
	}

265 266
	dataPos := sp.posIdx
	err := c.storages[sp.dev].Disk.WriteData(ydcommon.IndexTableValue(dataPos), value)
D
DoMyJob 已提交
267
	if err != nil {
268
		return sp.index, err
D
DoMyJob 已提交
269
	}
270
	return sp.index, nil
D
DoMyJob 已提交
271 272 273 274 275 276
}

// Close finishes all actions and close all storages
func (c *Context) Close() {
	for _, storage := range c.storages {
		storage.Disk.Close()
277
		c.setEOF()
D
DoMyJob 已提交
278 279 280 281 282
	}
}

// Reset reset current context.
func (c *Context) Reset() {
283
	c.sp = &storagePointer{0, 0, 0}
D
DoMyJob 已提交
284 285 286
	for _, storage := range c.storages {
		storage.Disk.Format()
	}
D
DoMyJob 已提交
287
}