提交 269fe312 编写于 作者: H Hongze Cheng

make compile

上级 69edf751
...@@ -181,6 +181,21 @@ typedef struct SSubmitMsg { ...@@ -181,6 +181,21 @@ typedef struct SSubmitMsg {
char blocks[]; char blocks[];
} SSubmitMsg; } SSubmitMsg;
typedef struct {
int32_t totalLen;
int32_t len;
SMemRow row;
} SSubmitBlkIter;
typedef struct {
int32_t totalLen;
int32_t len;
void* pMsg;
} SSubmitMsgIter;
int tsdbInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter);
int tsdbGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
typedef struct { typedef struct {
int32_t index; // index of failed block in submit blocks int32_t index; // index of failed block in submit blocks
int32_t vnode; // vnode index of failed block int32_t vnode; // vnode index of failed block
......
...@@ -26,10 +26,12 @@ extern "C" { ...@@ -26,10 +26,12 @@ extern "C" {
typedef struct STsdb STsdb; typedef struct STsdb STsdb;
typedef struct STsdbCfg { typedef struct STsdbCfg {
int8_t precision;
uint64_t lruCacheSize; uint64_t lruCacheSize;
uint32_t keep0; uint32_t keep;
uint32_t keep1; uint32_t keep1;
uint32_t keep2; uint32_t keep2;
int32_t daysPerFile;
} STsdbCfg; } STsdbCfg;
// STsdb // STsdb
......
...@@ -27,6 +27,42 @@ ...@@ -27,6 +27,42 @@
#undef TD_MSG_SEG_CODE_ #undef TD_MSG_SEG_CODE_
#include "tmsgdef.h" #include "tmsgdef.h"
int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
pIter->totalLen = pMsg->length;
pIter->len = 0;
pIter->pMsg = pMsg;
if (pMsg->length <= sizeof(SSubmitMsg)) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
return 0;
}
int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) {
pIter->len += sizeof(SSubmitMsg);
} else {
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
}
if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL;
return -1;
}
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
return 0;
}
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int tlen = 0; int tlen = 0;
......
...@@ -554,7 +554,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) { ...@@ -554,7 +554,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) {
pCfg->ttl = 4; pCfg->ttl = 4;
pCfg->keep = pCreate->daysToKeep0; pCfg->keep = pCreate->daysToKeep0;
pCfg->isWeak = true; pCfg->isWeak = true;
pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0; pCfg->tsdbCfg.keep = pCreate->daysToKeep0;
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2; pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0; pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize; pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
......
...@@ -17,12 +17,15 @@ ...@@ -17,12 +17,15 @@
#define _TD_TSDB_DEF_H_ #define _TD_TSDB_DEF_H_
#include "mallocator.h" #include "mallocator.h"
#include "tmsg.h" #include "tglobal.h"
#include "tlist.h"
#include "thash.h" #include "thash.h"
#include "tlist.h"
#include "tmsg.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "ttime.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbLog.h"
#include "tsdbMemTable.h" #include "tsdbMemTable.h"
#include "tsdbOptions.h" #include "tsdbOptions.h"
...@@ -31,13 +34,16 @@ extern "C" { ...@@ -31,13 +34,16 @@ extern "C" {
#endif #endif
struct STsdb { struct STsdb {
int32_t vgId;
char * path; char * path;
STsdbCfg options; STsdbCfg config;
STsdbMemTable * mem; STsdbMemTable * mem;
STsdbMemTable * imem; STsdbMemTable * imem;
SMemAllocatorFactory *pmaf; SMemAllocatorFactory *pmaf;
}; };
#define REPO_ID(r) (r)->vgId
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_LOG_H_
#define _TD_TSDB_LOG_H_
#include "tlog.h"
extern int32_t tsdbDebugFlag;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#endif /* _TD_TSDB_LOG_H_ */
\ No newline at end of file
...@@ -24,9 +24,9 @@ extern "C" { ...@@ -24,9 +24,9 @@ extern "C" {
typedef struct STsdbMemTable STsdbMemTable; typedef struct STsdbMemTable STsdbMemTable;
STsdbMemTable *tsdbNewMemTable(SMemAllocatorFactory *pMAF); STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb);
void tsdbFreeMemTable(SMemAllocatorFactory *pMAF, STsdbMemTable *pMemTable); void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg); int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -72,7 +72,7 @@ static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorF ...@@ -72,7 +72,7 @@ static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorF
} }
pTsdb->path = strdup(path); pTsdb->path = strdup(path);
tsdbOptionsCopy(&(pTsdb->options), pTsdbCfg); tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg);
pTsdb->pmaf = pMAF; pTsdb->pmaf = pMAF;
return pTsdb; return pTsdb;
......
...@@ -15,22 +15,9 @@ ...@@ -15,22 +15,9 @@
#include "tsdbDef.h" #include "tsdbDef.h"
#if 1 struct STbData {
typedef struct STbData { tb_uid_t uid;
TD_SLIST_NODE(STbData); };
SSubmitMsg *pMsg;
} STbData;
#else
typedef struct STbData {
TD_SLIST_NODE(STbData);
uint64_t uid; // TODO: change here as tb_uid_t
TSKEY keyMin;
TSKEY keyMax;
uint64_t nRows;
SSkipList *pData; // Here need a container, may not use the SL
T_REF_DECLARE()
} STbData;
#endif
struct STsdbMemTable { struct STsdbMemTable {
T_REF_DECLARE() T_REF_DECLARE()
...@@ -40,45 +27,169 @@ struct STsdbMemTable { ...@@ -40,45 +27,169 @@ struct STsdbMemTable {
uint64_t nRow; uint64_t nRow;
SMemAllocator *pMA; SMemAllocator *pMA;
// Container // Container
#if 1
SSkipList *pData; // SSkiplist<STbData>
SHashObj * pHashIdx;
#else
TD_SLIST(STbData) list; TD_SLIST(STbData) list;
#endif
}; };
STsdbMemTable *tsdbNewMemTable(SMemAllocatorFactory *pMAF) { static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg);
STsdbMemTable *pMemTable;
SMemAllocator *pMA;
pMA = (*pMAF->create)(pMAF);
ASSERT(pMA != NULL);
pMemTable = (STsdbMemTable *)TD_MA_MALLOC(pMA, sizeof(*pMemTable)); STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) {
STsdbMemTable *pMemTable = (STsdbMemTable *)calloc(1, sizeof(*pMemTable));
if (pMemTable == NULL) { if (pMemTable == NULL) {
(*pMAF->destroy)(pMAF, pMA); terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
T_REF_INIT_VAL(pMemTable, 1); T_REF_INIT_VAL(pMemTable, 1);
taosInitRWLatch(&(pMemTable->latch)); taosInitRWLatch(&(pMemTable->latch));
pMemTable->keyMin = TSKEY_MAX;
pMemTable->keyMax = TSKEY_MIN; pMemTable->keyMax = TSKEY_MIN;
pMemTable->keyMin = TSKEY_MAX;
pMemTable->nRow = 0; pMemTable->nRow = 0;
pMemTable->pMA = pMA; pMemTable->pMA = pTsdb->pmaf->create(pTsdb->pmaf);
TD_SLIST_INIT(&(pMemTable->list)); if (pMemTable->pMA == NULL) {
free(pMemTable);
return NULL;
}
// Initialize the container
pMemTable->pData =
tSkipListCreate(5, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), NULL /*TODO*/, SL_DISCARD_DUP_KEY, NULL /* TODO */);
if (pMemTable->pData == NULL) {
pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA);
free(pMemTable);
return NULL;
}
pMemTable->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pMemTable->pHashIdx == NULL) {
pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA);
tSkipListDestroy(pMemTable->pData);
free(pMemTable);
return NULL;
}
// TODO
return pMemTable; return pMemTable;
} }
void tsdbFreeMemTable(SMemAllocatorFactory *pMAF, STsdbMemTable *pMemTable) { void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) {
SMemAllocator *pMA = pMemTable->pMA; if (pMemTable) {
taosHashCleanup(pMemTable->pHashIdx);
tSkipListDestroy(pMemTable->pData);
if (pMemTable->pMA) {
pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA);
}
free(pMemTable);
}
}
if (TD_MA_FREE_FUNC(pMA) != NULL) { int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
// TODO SSubmitBlk * pBlock = NULL;
ASSERT(0); SSubmitMsgIter msgIter = {0};
int32_t affectedrows = 0, numOfRows = 0;
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return -1;
}
tsdbInitSubmitMsgIter(pMsg, &msgIter);
while (true) {
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
#if 0
if (tsdbInsertDataToTable(pTsdb, pBlock, &affectedrows) < 0) {
return -1;
}
#endif
numOfRows += pBlock->numOfRows;
}
if (pRsp != NULL) {
pRsp->affectedRows = htonl(affectedrows);
pRsp->numOfRows = htonl(numOfRows);
} }
(*pMAF->destroy)(pMAF, pMA); return 0;
} }
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) {
ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
SSubmitBlkIter blkIter = {0};
SMemRow row = NULL;
TSKEY now = taosGetTimestamp(pTsdb->config.precision);
TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep;
TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.daysPerFile;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
#if 0
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
STable *pTable = pMeta->tables[pBlock->tid];
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pTsdb, pBlock, pTable) < 0) {
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
continue;
} else {
return -1;
}
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pTsdb, pTable, row, minKey, maxKey, now) < 0) {
return -1;
}
}
#endif
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
#if 0
int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) { int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) {
SMemAllocator *pMA = pMemTable->pMA; SMemAllocator *pMA = pMemTable->pMA;
STbData * pTbData = (STbData *)TD_MA_MALLOC(pMA, sizeof(*pTbData)); STbData * pTbData = (STbData *)TD_MA_MALLOC(pMA, sizeof(*pTbData));
...@@ -91,29 +202,11 @@ int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) { ...@@ -91,29 +202,11 @@ int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) {
return 0; return 0;
} }
/* ------------------------ STATIC METHODS ------------------------ */
#if 0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdataformat.h" #include "tdataformat.h"
#include "tfunctional.h" #include "tfunctional.h"
#include "tsdbRowMergeBuf.h"
#include "tsdbint.h" #include "tsdbint.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tsdbRowMergeBuf.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512 #define TSDB_MAX_INSERT_BATCH 512
...@@ -149,37 +242,6 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, ...@@ -149,37 +242,6 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable,
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now); TSKEY now);
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
STsdbRepo * pRepo = repo;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
int32_t affectedrows = 0, numOfRows = 0;
if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
}
return -1;
}
tsdbInitSubmitMsgIter(pMsg, &msgIter);
while (true) {
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) {
return -1;
}
numOfRows += pBlock->numOfRows;
}
if (pRsp != NULL) {
pRsp->affectedRows = htonl(affectedrows);
pRsp->numOfRows = htonl(numOfRows);
}
if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0;
}
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
...@@ -564,59 +626,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -564,59 +626,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
} }
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable));
if (pMemTable == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
pMemTable->keyFirst = INT64_MAX;
pMemTable->keyLast = 0;
pMemTable->numOfRows = 0;
pMemTable->maxTables = pMeta->maxTables;
pMemTable->tData = (STableData **)calloc(pMemTable->maxTables, sizeof(STableData *));
if (pMemTable->tData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
pMemTable->actList = tdListNew(0);
if (pMemTable->actList == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
pMemTable->bufBlockList = tdListNew(sizeof(STsdbBufBlock*));
if (pMemTable->bufBlockList == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
T_REF_INC(pMemTable);
return pMemTable;
_err:
tsdbFreeMemTable(pMemTable);
return NULL;
}
static void tsdbFreeMemTable(SMemTable* pMemTable) {
if (pMemTable) {
ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0));
ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0));
tdListFree(pMemTable->extraBuffList);
tdListFree(pMemTable->bufBlockList);
tdListFree(pMemTable->actList);
tfree(pMemTable->tData);
free(pMemTable);
}
}
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData)); STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData));
...@@ -737,74 +747,6 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMem ...@@ -737,74 +747,6 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMem
return 0; return 0;
} }
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
ASSERT(pMsg != NULL);
STsdbMeta * pMeta = pRepo->tsdbMeta;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
SSubmitBlkIter blkIter = {0};
SMemRow row = NULL;
TSKEY now = taosGetTimestamp(pRepo->config.precision);
TSKEY minKey = now - tsTickPerDay[pRepo->config.precision] * pRepo->config.keep;
TSKEY maxKey = now + tsTickPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
STable *pTable = pMeta->tables[pBlock->tid];
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
continue;
} else {
return -1;
}
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) {
return -1;
}
}
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
//row1 has higher priority //row1 has higher priority
static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo, static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo,
......
...@@ -17,9 +17,9 @@ ...@@ -17,9 +17,9 @@
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg) { int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg) {
// Check if mem is there. If not, create one. // Check if mem is there. If not, create one.
pTsdb->mem = tsdbNewMemTable(pTsdb->pmaf); pTsdb->mem = tsdbNewMemTable(pTsdb);
if (pTsdb->mem == NULL) { if (pTsdb->mem == NULL) {
return -1; return -1;
} }
return tsdbInsertDataToMemTable(pTsdb->mem, pMsg); return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL);
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册