提交 5d150204 编写于 作者: H Hongze Cheng

more refact

上级 80cfde72
...@@ -1562,8 +1562,8 @@ typedef struct SVCreateTbReq { ...@@ -1562,8 +1562,8 @@ typedef struct SVCreateTbReq {
int8_t type; int8_t type;
union { union {
struct { struct {
tb_uid_t suid; tb_uid_t suid;
const void* pTag; const uint8_t* pTag;
} ctb; } ctb;
struct { struct {
SSchemaWrapper schema; SSchemaWrapper schema;
...@@ -2593,12 +2593,12 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { ...@@ -2593,12 +2593,12 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
#define TD_AUTO_CREATE_TABLE 0x1 #define TD_AUTO_CREATE_TABLE 0x1
typedef struct { typedef struct {
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
int32_t sver; int32_t sver;
uint64_t nData; uint64_t nData;
const void* pData; const uint8_t* pData;
SVCreateTbReq cTbReq; SVCreateTbReq cTbReq;
} SVSubmitBlk; } SVSubmitBlk;
typedef struct { typedef struct {
......
...@@ -138,7 +138,7 @@ static int32_t tEncodeU64v(SCoder* pEncoder, uint64_t val); ...@@ -138,7 +138,7 @@ static int32_t tEncodeU64v(SCoder* pEncoder, uint64_t val);
static int32_t tEncodeI64v(SCoder* pEncoder, int64_t val); static int32_t tEncodeI64v(SCoder* pEncoder, int64_t val);
static int32_t tEncodeFloat(SCoder* pEncoder, float val); static int32_t tEncodeFloat(SCoder* pEncoder, float val);
static int32_t tEncodeDouble(SCoder* pEncoder, double val); static int32_t tEncodeDouble(SCoder* pEncoder, double val);
static int32_t tEncodeBinary(SCoder* pEncoder, const void* val, uint64_t len); static int32_t tEncodeBinary(SCoder* pEncoder, const uint8_t* val, uint64_t len);
static int32_t tEncodeCStrWithLen(SCoder* pEncoder, const char* val, uint64_t len); static int32_t tEncodeCStrWithLen(SCoder* pEncoder, const char* val, uint64_t len);
static int32_t tEncodeCStr(SCoder* pEncoder, const char* val); static int32_t tEncodeCStr(SCoder* pEncoder, const char* val);
...@@ -162,7 +162,7 @@ static int32_t tDecodeU64v(SCoder* pDecoder, uint64_t* val); ...@@ -162,7 +162,7 @@ static int32_t tDecodeU64v(SCoder* pDecoder, uint64_t* val);
static int32_t tDecodeI64v(SCoder* pDecoder, int64_t* val); static int32_t tDecodeI64v(SCoder* pDecoder, int64_t* val);
static int32_t tDecodeFloat(SCoder* pDecoder, float* val); static int32_t tDecodeFloat(SCoder* pDecoder, float* val);
static int32_t tDecodeDouble(SCoder* pDecoder, double* val); static int32_t tDecodeDouble(SCoder* pDecoder, double* val);
static int32_t tDecodeBinary(SCoder* pDecoder, const void** val, uint64_t* len); static int32_t tDecodeBinary(SCoder* pDecoder, const uint8_t** val, uint64_t* len);
static int32_t tDecodeCStrAndLen(SCoder* pDecoder, const char** val, uint64_t* len); static int32_t tDecodeCStrAndLen(SCoder* pDecoder, const char** val, uint64_t* len);
static int32_t tDecodeCStr(SCoder* pDecoder, const char** val); static int32_t tDecodeCStr(SCoder* pDecoder, const char** val);
static int32_t tDecodeCStrTo(SCoder* pDecoder, char* val); static int32_t tDecodeCStrTo(SCoder* pDecoder, char* val);
...@@ -292,7 +292,7 @@ static FORCE_INLINE int32_t tEncodeDouble(SCoder* pEncoder, double val) { ...@@ -292,7 +292,7 @@ static FORCE_INLINE int32_t tEncodeDouble(SCoder* pEncoder, double val) {
return tEncodeU64(pEncoder, v.ui); return tEncodeU64(pEncoder, v.ui);
} }
static FORCE_INLINE int32_t tEncodeBinary(SCoder* pEncoder, const void* val, uint64_t len) { static FORCE_INLINE int32_t tEncodeBinary(SCoder* pEncoder, const uint8_t* val, uint64_t len) {
if (tEncodeU64v(pEncoder, len) < 0) return -1; if (tEncodeU64v(pEncoder, len) < 0) return -1;
if (pEncoder->data) { if (pEncoder->data) {
if (TD_CODER_CHECK_CAPACITY_FAILED(pEncoder, len)) return -1; if (TD_CODER_CHECK_CAPACITY_FAILED(pEncoder, len)) return -1;
...@@ -413,7 +413,7 @@ static FORCE_INLINE int32_t tDecodeDouble(SCoder* pDecoder, double* val) { ...@@ -413,7 +413,7 @@ static FORCE_INLINE int32_t tDecodeDouble(SCoder* pDecoder, double* val) {
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeBinary(SCoder* pDecoder, const void** val, uint64_t* len) { static FORCE_INLINE int32_t tDecodeBinary(SCoder* pDecoder, const uint8_t** val, uint64_t* len) {
if (tDecodeU64v(pDecoder, len) < 0) return -1; if (tDecodeU64v(pDecoder, len) < 0) return -1;
if (TD_CODER_CHECK_CAPACITY_FAILED(pDecoder, *len)) return -1; if (TD_CODER_CHECK_CAPACITY_FAILED(pDecoder, *len)) return -1;
...@@ -426,7 +426,7 @@ static FORCE_INLINE int32_t tDecodeBinary(SCoder* pDecoder, const void** val, ui ...@@ -426,7 +426,7 @@ static FORCE_INLINE int32_t tDecodeBinary(SCoder* pDecoder, const void** val, ui
} }
static FORCE_INLINE int32_t tDecodeCStrAndLen(SCoder* pDecoder, const char** val, uint64_t* len) { static FORCE_INLINE int32_t tDecodeCStrAndLen(SCoder* pDecoder, const char** val, uint64_t* len) {
if (tDecodeBinary(pDecoder, (const void**)val, len) < 0) return -1; if (tDecodeBinary(pDecoder, (const uint8_t**)val, len) < 0) return -1;
(*len) -= 1; (*len) -= 1;
return 0; return 0;
} }
......
...@@ -90,7 +90,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur); ...@@ -90,7 +90,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur);
// tsdb // tsdb
// typedef struct STsdb STsdb; // typedef struct STsdb STsdb;
typedef void *tsdbReaderT; typedef void *tsdbReaderT;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2
...@@ -108,12 +108,12 @@ int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, con ...@@ -108,12 +108,12 @@ int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, con
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool* allHave); int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond);
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo); int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
// tq // tq
...@@ -126,8 +126,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList ...@@ -126,8 +126,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t* pUid, int32_t *pNumOfRows, int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
int16_t *pNumOfCols); int32_t *pNumOfRows, int16_t *pNumOfCols);
// need to reposition // need to reposition
...@@ -189,10 +189,10 @@ struct SMetaEntry { ...@@ -189,10 +189,10 @@ struct SMetaEntry {
SSchemaWrapper schemaTag; SSchemaWrapper schemaTag;
} stbEntry; } stbEntry;
struct { struct {
int64_t ctime; int64_t ctime;
int32_t ttlDays; int32_t ttlDays;
tb_uid_t suid; tb_uid_t suid;
const void *pTags; const uint8_t *pTags;
} ctbEntry; } ctbEntry;
struct { struct {
int64_t ctime; int64_t ctime;
......
此差异已折叠。
...@@ -63,14 +63,18 @@ struct SMemSkipListCurosr { ...@@ -63,14 +63,18 @@ struct SMemSkipListCurosr {
SMemSkipListNode *pNodeC; SMemSkipListNode *pNodeC;
}; };
#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET))
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) #define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)) #define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) #define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_HEAD_NODE(sl) ((sl)->pHead) #define SL_HEAD_NODE(sl) ((sl)->pHead)
#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel)) #define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel))
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
// SMemTable // SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
...@@ -111,23 +115,18 @@ int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) { ...@@ -111,23 +115,18 @@ int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) {
} }
int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) { int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) {
SMemData *pMemData; SMemData *pMemData;
STsdb *pTsdb = pMemTb->pTsdb; STsdb *pTsdb = pMemTb->pTsdb;
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
SVBufPool *pPool = pVnode->inUse; SVBufPool *pPool = pVnode->inUse;
int32_t hash; tb_uid_t suid = pSubmitBlk->suid;
int32_t tlen; tb_uid_t uid = pSubmitBlk->uid;
uint8_t buf[16]; int32_t iBucket;
int32_t rlen;
const uint8_t *p; // search SMemData by hash
SMemSkipListNode *pSlNode; iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
const STSRow *pTSRow; for (pMemData = pMemTb->pBuckets[iBucket]; pMemData; pMemData = pMemData->pHashNext) {
SMemSkipListCurosr slc = {0}; if (pMemData->suid == suid && pMemData->uid == uid) break;
// search hash
hash = (pSubmitBlk->suid + pSubmitBlk->uid) % pMemTb->nBucket;
for (pMemData = pMemTb->pBuckets[hash]; pMemData; pMemData = pMemData->pHashNext) {
if (pMemData->suid == pSubmitBlk->suid && pMemData->uid == pSubmitBlk->uid) break;
} }
// create pMemData if need // create pMemData if need
...@@ -143,8 +142,8 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p ...@@ -143,8 +142,8 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
} }
pMemData->pHashNext = NULL; pMemData->pHashNext = NULL;
pMemData->suid = pSubmitBlk->suid; pMemData->suid = suid;
pMemData->uid = pSubmitBlk->uid; pMemData->uid = uid;
pMemData->minKey = TSKEY_MAX; pMemData->minKey = TSKEY_MAX;
pMemData->maxKey = TSKEY_MIN; pMemData->maxKey = TSKEY_MIN;
pMemData->minVer = -1; pMemData->minVer = -1;
...@@ -159,55 +158,61 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p ...@@ -159,55 +158,61 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
pHead->level = maxLevel; pHead->level = maxLevel;
pTail->level = maxLevel; pTail->level = maxLevel;
for (int iLevel = 0; iLevel < maxLevel; iLevel++) { for (int iLevel = 0; iLevel < maxLevel; iLevel++) {
SL_NODE_FORWARD(pHead, iLevel) = pTail; SL_HEAD_NODE_FORWARD(pHead, iLevel) = pTail;
SL_NODE_FORWARD(pTail, iLevel) = pHead; SL_TAIL_NODE_BACKWARD(pTail, iLevel) = pHead;
} }
// add to MemTable // add to hash
hash = (pMemData->suid + pMemData->uid) % pMemTb->nBucket; if (pMemTb->nHash >= pMemTb->nBucket) {
pMemData->pHashNext = pMemTb->pBuckets[hash]; // rehash (todo)
pMemTb->pBuckets[hash] = pMemData; }
iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
pMemData->pHashNext = pMemTb->pBuckets[iBucket];
pMemTb->pBuckets[iBucket] = pMemData;
pMemTb->nHash++; pMemTb->nHash++;
// sort organize (todo)
} }
// loop to insert data to skiplist // do insert data to SMemData
#if 0 SMemSkipListCurosr slc = {0};
tsdbMemSkipListCursorOpen(&slc, &pMemData->sl); const uint8_t *p = pSubmitBlk->pData;
p = pSubmitBlk->pData;
for (;;) { // tsdbMemSkipListCursorOpen(&slc, &pMemData->sl);
if (p - (uint8_t *)pSubmitBlk->pData >= pSubmitBlk->nData) break; for (; p - pSubmitBlk->pData < pSubmitBlk->nData;) {
// if (p - (uint8_t *)pSubmitBlk->pData >= pSubmitBlk->nData) break;
const uint8_t *pt = p; // const uint8_t *pt = p;
p = tGetBinary(p, &pTSRow, &rlen); // p = tGetBinary(p, &pTSRow, &rlen);
// check the row (todo) // // check the row (todo)
// move the cursor to position to write (todo) // // move the cursor to position to write (todo)
int32_t c; // int32_t c;
tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c); // tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
ASSERT(c); // ASSERT(c);
// encode row // // encode row
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); // int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt); // int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
pSlNode = vnodeBufPoolMalloc(pPool, tsize); // pSlNode = vnodeBufPoolMalloc(pPool, tsize);
pSlNode->level = level; // pSlNode->level = level;
uint8_t *pData = SL_NODE_DATA(pSlNode); // uint8_t *pData = SL_NODE_DATA(pSlNode);
*(int64_t *)pData = version; // *(int64_t *)pData = version;
pData += sizeof(version); // pData += sizeof(version);
memcpy(pData, pt, p - pt); // memcpy(pData, pt, p - pt);
// insert row // // insert row
tsdbMemSkipListCursorPut(&slc, pSlNode); // tsdbMemSkipListCursorPut(&slc, pSlNode);
// update status // // update status
if (pTSRow->ts < pMemData->minKey) pMemData->minKey = pTSRow->ts; // if (pTSRow->ts < pMemData->minKey) pMemData->minKey = pTSRow->ts;
if (pTSRow->ts > pMemData->maxKey) pMemData->maxKey = pTSRow->ts; // if (pTSRow->ts > pMemData->maxKey) pMemData->maxKey = pTSRow->ts;
} }
tsdbMemSkipListCursorClose(&slc); // tsdbMemSkipListCursorClose(&slc);
#endif
// update status
if (pMemData->minVer == -1) pMemData->minVer = version; if (pMemData->minVer == -1) pMemData->minVer = version;
if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version; if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version;
...@@ -217,8 +222,4 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p ...@@ -217,8 +222,4 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version; if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;
return 0; return 0;
} }
\ No newline at end of file
// SMemData
// SMemSkipList
\ No newline at end of file
...@@ -17,6 +17,6 @@ target_include_directories( ...@@ -17,6 +17,6 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
#if(${BUILD_TEST}) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)
#endif(${BUILD_TEST}) endif(${BUILD_TEST})
...@@ -411,7 +411,7 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { ...@@ -411,7 +411,7 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
} }
uint64_t len; uint64_t len;
char* data = NULL; char* data = NULL;
if (tDecodeBinary(&decoder, (const void**)(&data), &len) < 0) { if (tDecodeBinary(&decoder, (const uint8_t**)(&data), &len) < 0) {
return NULL; return NULL;
} }
assert(len = pMsg->dataLen); assert(len = pMsg->dataLen);
...@@ -670,7 +670,7 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { ...@@ -670,7 +670,7 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) {
} }
uint64_t len; uint64_t len;
char* data = NULL; char* data = NULL;
if (tDecodeBinary(&decoder, (const void**)(&data), &len) < 0) { if (tDecodeBinary(&decoder, (const uint8_t**)(&data), &len) < 0) {
return NULL; return NULL;
} }
assert(len = pMsg->dataLen); assert(len = pMsg->dataLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册