未验证 提交 1bc3801e 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #10830 from taosdata/feature/TD-11463-3.0

Feature/td 11463 3.0
......@@ -2052,27 +2052,19 @@ static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
buf = taosDecodeFixedI64(buf, &pSma->sliding);
if (pSma->exprLen > 0) {
pSma->expr = (char*)calloc(pSma->exprLen, 1);
if (pSma->expr != NULL) {
buf = taosDecodeStringTo(buf, pSma->expr);
} else {
if ((buf = taosDecodeString(buf, &pSma->expr)) == NULL) {
tdDestroyTSma(pSma);
return NULL;
}
} else {
pSma->expr = NULL;
}
if (pSma->tagsFilterLen > 0) {
pSma->tagsFilter = (char*)calloc(pSma->tagsFilterLen, 1);
if (pSma->tagsFilter != NULL) {
buf = taosDecodeStringTo(buf, pSma->tagsFilter);
} else {
if ((buf = taosDecodeString(buf, &pSma->tagsFilter)) == NULL) {
tdDestroyTSma(pSma);
return NULL;
}
} else {
pSma->tagsFilter = NULL;
}
......
......@@ -26,8 +26,9 @@ typedef struct SDBFile SDBFile;
typedef DB_ENV* TDBEnv;
struct SDBFile {
DB* pDB;
char* path;
int32_t fid;
DB* pDB;
char* path;
};
int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF);
......
......@@ -884,7 +884,7 @@ const char *metaSmaCursorNext(SMSmaCursor *pCur) {
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
STSmaWrapper *pSW = NULL;
pSW = calloc(sizeof(*pSW), 1);
pSW = calloc(1, sizeof(*pSW));
if (pSW == NULL) {
return NULL;
}
......
......@@ -156,10 +156,6 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
return TSDB_CODE_FAILED;
}
if (*pEnv) {
return TSDB_CODE_SUCCESS;
}
if (*pEnv == NULL) {
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) {
return TSDB_CODE_FAILED;
......@@ -260,10 +256,15 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
if (pSmaStat) {
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
SSmaStatItem *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
void *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
while (item != NULL) {
tfree(item->pSma);
taosHashCleanup(item->expiredWindows);
SSmaStatItem *pItem = *(SSmaStatItem **)item;
if (pItem != NULL) {
tdDestroyTSma(pItem->pSma);
tfree(pItem->pSma);
taosHashCleanup(pItem->expiredWindows);
tfree(pItem);
}
item = taosHashIterate(pSmaStat->smaStatItems, item);
}
taosHashCleanup(pSmaStat->smaStatItems);
......@@ -292,9 +293,10 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
// init sma env
tsdbLockRepo(pTsdb);
if (pTsdb->pTSmaEnv == NULL) {
pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&pTsdb->pTSmaEnv) : atomic_load_ptr(&pTsdb->pRSmaEnv);
if (pEnv == NULL) {
char rname[TSDB_FILENAME_LEN] = {0};
char aname[TSDB_FILENAME_LEN * 2 + 32] = {0}; // TODO: make TMPNAME_LEN public as TSDB_FILENAME_LEN?
char aname[TSDB_FILENAME_LEN] = {0}; // use TSDB_FILENAME_LEN currently
SDiskID did = {0};
tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did);
......@@ -315,11 +317,8 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return TSDB_CODE_FAILED;
}
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
atomic_store_ptr(&pTsdb->pTSmaEnv, pEnv);
} else {
atomic_store_ptr(&pTsdb->pRSmaEnv, pEnv);
}
(smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&pTsdb->pTSmaEnv, pEnv)
: atomic_store_ptr(&pTsdb->pRSmaEnv, pEnv);
}
tsdbUnlockRepo(pTsdb);
......@@ -359,8 +358,10 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL);
tsdbRefSmaStat(pTsdb, pStat);
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (pItem == NULL) {
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
if (pItem == NULL) {
......@@ -421,9 +422,9 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t ind
tsdbRefSmaStat(pTsdb, pStat);
if (pStat && pStat->smaStatItems) {
pItem = *(SSmaStatItem **)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
}
if (pItem != NULL) {
if ((pItem != NULL) && ((pItem = *(SSmaStatItem **)pItem) != NULL)) {
// pItem resides in hash buffer all the time unless drop sma index
// TODO: multithread protect
if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
......@@ -494,7 +495,7 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
* @brief Insert TSma data blocks to DB File build by B+Tree
*
* @param pSmaH
* @param smaKey
* @param smaKey tableUid-colId-skeyOfWindow(8-2-8)
* @param keyLen
* @param pData
* @param dataLen
......@@ -502,12 +503,11 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
*/
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) {
SDBFile *pDBFile = &pSmaH->dFile;
// TODO: insert sma data blocks into B+Tree
tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d",
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
*(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen);
// TODO: insert sma data blocks into B+Tree(TDB)
if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) {
return TSDB_CODE_FAILED;
}
......@@ -564,34 +564,34 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit
return interval / 1e3;
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
return interval / 1e6;
} else {
} else { // ms
return interval;
}
break;
case TSDB_TIME_PRECISION_MICRO:
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
return interval;
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns
return interval / 1e3;
} else {
} else { // ms
return interval * 1e3;
}
break;
case TSDB_TIME_PRECISION_NANO:
if (TIME_UNIT_MICROSECOND == intervalUnit) {
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
return interval * 1e3;
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns
return interval;
} else {
} else { // ms
return interval * 1e6;
}
break;
default: // ms
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
return interval / 1e3;
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns
return interval / 1e6;
} else {
} else { // ms
return interval;
}
break;
......@@ -663,9 +663,13 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
STsdb *pTsdb = pSmaH->pTsdb;
ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL);
pSmaH->dFile.fid = fid;
char tSmaFile[TSDB_FILENAME_LEN] = {0};
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid);
pSmaH->dFile.path = strdup(tSmaFile);
return TSDB_CODE_SUCCESS;
}
......@@ -705,7 +709,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
if (!pTsdb->pTSmaEnv) {
if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) {
terrno = TSDB_CODE_INVALID_PTR;
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
return terrno;
......@@ -883,15 +887,15 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey,
int32_t nMaxResult) {
if (!pTsdb->pTSmaEnv) {
if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) {
terrno = TSDB_CODE_INVALID_PTR;
tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
return TSDB_CODE_FAILED;
}
tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
SSmaStatItem *pItem = *(SSmaStatItem **)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
if (pItem == NULL) {
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL)) {
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
// it's NULL.
tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
......
......@@ -144,6 +144,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return -1;
}
// record current timezone of server side
tstrncpy(vCreateSmaReq.tSma.timezone, tsTimezone, TD_TIMEZONE_LEN);
if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) {
// TODO: handle error
tdDestroyTSma(&vCreateSmaReq.tSma);
......
......@@ -49,7 +49,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma};
uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper);
void *buf = calloc(bufLen, 1);
void *buf = calloc(1, bufLen);
ASSERT_NE(buf, nullptr);
STSmaWrapper *pSW = (STSmaWrapper *)buf;
......@@ -84,6 +84,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
}
// resource release
tfree(pSW);
tdDestroyTSma(&tSma);
tdDestroyTSmaWrapper(&dstTSmaWrapper);
}
......@@ -113,7 +114,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
tSma.tableUid = tbUid;
tSma.exprLen = strlen(expr);
tSma.expr = (char *)calloc(tSma.exprLen + 1, 1);
tSma.expr = (char *)calloc(1, tSma.exprLen + 1);
ASSERT_NE(tSma.expr, nullptr);
tstrncpy(tSma.expr, expr, tSma.exprLen + 1);
......@@ -251,12 +252,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
tSma.tableUid = tbUid;
tSma.exprLen = strlen(expr);
tSma.expr = (char *)calloc(tSma.exprLen + 1, 1);
tSma.expr = (char *)calloc(1, tSma.exprLen + 1);
ASSERT_NE(tSma.expr, nullptr);
tstrncpy(tSma.expr, expr, tSma.exprLen + 1);
tSma.tagsFilterLen = strlen(tagsFilter);
tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1);
tSma.tagsFilter = (char *)calloc(1, tSma.tagsFilterLen + 1);
ASSERT_NE(tSma.tagsFilter, nullptr);
tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1);
......@@ -273,20 +274,20 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
// step 2: insert data
STSmaDataWrapper *pSmaData = NULL;
STsdb tsdb = {0};
STsdbCfg * pCfg = &tsdb.config;
tsdb.pMeta = pMeta;
tsdb.vgId = 2;
tsdb.config.daysPerFile = 10; // default days is 10
tsdb.config.keep1 = 30;
tsdb.config.keep2 = 90;
tsdb.config.keep = 365;
tsdb.config.precision = TSDB_TIME_PRECISION_MILLI;
tsdb.config.update = TD_ROW_OVERWRITE_UPDATE;
tsdb.config.compression = TWO_STAGE_COMP;
switch (tsdb.config.precision) {
STsdb * pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
STsdbCfg * pCfg = &pTsdb->config;
pTsdb->pMeta = pMeta;
pTsdb->vgId = 2;
pTsdb->config.daysPerFile = 10; // default days is 10
pTsdb->config.keep1 = 30;
pTsdb->config.keep2 = 90;
pTsdb->config.keep = 365;
pTsdb->config.precision = TSDB_TIME_PRECISION_MILLI;
pTsdb->config.update = TD_ROW_OVERWRITE_UPDATE;
pTsdb->config.compression = TWO_STAGE_COMP;
switch (pTsdb->config.precision) {
case TSDB_TIME_PRECISION_MILLI:
skey1 *= 1e3;
break;
......@@ -304,12 +305,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
SDiskCfg pDisks = {.level = 0, .primary = 1};
strncpy(pDisks.dir, "/var/lib/taos", TSDB_FILENAME_LEN);
int32_t numOfDisks = 1;
tsdb.pTfs = tfsOpen(&pDisks, numOfDisks);
ASSERT_NE(tsdb.pTfs, nullptr);
pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks);
ASSERT_NE(pTsdb->pTfs, nullptr);
char *msg = (char *)calloc(1, 100);
ASSERT_NE(msg, nullptr);
ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
// init
int32_t allocCnt = 0;
......@@ -367,13 +368,13 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
ASSERT_GE(bufSize, pSmaData->dataLen);
// execute
ASSERT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
// step 3: query
uint32_t checkDataCnt = 0;
for (int32_t t = 0; t < numOfTables; ++t) {
for (col_id_t c = 0; c < numOfCols; ++c) {
ASSERT_EQ(tsdbGetTSmaData(&tsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
ASSERT_EQ(tsdbGetTSmaData(pTsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1),
TSDB_CODE_SUCCESS);
++checkDataCnt;
......@@ -383,9 +384,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt);
// release data
tfree(msg);
taosTZfree(buf);
// release meta
tdDestroyTSma(&tSma);
tfsClose(pTsdb->pTfs);
tsdbClose(pTsdb);
metaClose(pMeta);
}
#endif
......
......@@ -204,7 +204,8 @@ void tfsDirname(const STfsFile *pFile, char *dest) {
void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname) {
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
snprintf(aname, TSDB_FILENAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
}
int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册