未验证 提交 25a54dba 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #12559 from taosdata/feature/TD-14481-3.0

feat: sma refactor and add cases
......@@ -2267,20 +2267,22 @@ static FORCE_INLINE void tdDestroyTSma(STSma* pSma) {
}
}
static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) {
if (pSW) {
if (pSW->tSma) {
for (uint32_t i = 0; i < pSW->number; ++i) {
tdDestroyTSma(pSW->tSma + i);
if (deepCopy) {
for (uint32_t i = 0; i < pSW->number; ++i) {
tdDestroyTSma(pSW->tSma + i);
}
}
taosMemoryFreeClear(pSW->tSma);
}
}
}
static FORCE_INLINE void* tdFreeTSmaWrapper(STSmaWrapper* pSW) {
tdDestroyTSmaWrapper(pSW);
taosMemoryFree(pSW);
static FORCE_INLINE void* tdFreeTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) {
tdDestroyTSmaWrapper(pSW, deepCopy);
taosMemoryFreeClear(pSW);
return NULL;
}
......
......@@ -530,6 +530,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
pStream = mndAcquireStream(pMnode, createReq.name);
if (pStream != NULL) {
mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name);
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
goto _OVER;
}
......@@ -565,7 +566,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("sma:%s, failed to create since %s", createReq.name, terrstr());
mError("sma:%s, failed to create since %s", createReq.name, terrstr(terrno));
}
mndReleaseStb(pMnode, pStb);
......
......@@ -24,7 +24,6 @@ extern "C" {
typedef struct SMetaIdx SMetaIdx;
typedef struct SMetaDB SMetaDB;
typedef struct SMSmaCursor SMSmaCursor;
// metaDebug ==================
// clang-format off
......@@ -114,22 +113,12 @@ typedef struct {
int64_t smaUid;
} SSmaIdxKey;
#if 1
SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseSmaCursor(SMSmaCursor* pSmaCur);
int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
#ifndef META_REFACT
// SMetaDB
int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
#endif
#endif
#ifdef __cplusplus
......
......@@ -218,6 +218,11 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDisk
void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
#ifdef __cplusplus
}
#endif
......
......@@ -89,11 +89,13 @@ STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaGetTbNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor* pCtbCur);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
SArray* metaGetSmaTbUids(SMeta* pMeta, bool isDup);
void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid);
STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
......@@ -127,7 +129,7 @@ int32_t smaOpen(SVnode* pVnode);
int32_t smaClose(SSma* pSma);
int32_t tdUpdateExpireWindow(SSma* pSma, SSubmitReq* pMsg, int64_t version);
int32_t tdProcessTSmaCreate(SSma* pSma, char* pMsg);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int32_t tdProcessRSmaCreate(SSma* pSma, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
......
......@@ -69,6 +69,11 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) {
pME->smaEntry.tsma = taosMemoryCalloc(1, sizeof(STSma));
if(!pME->smaEntry.tsma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
} else {
ASSERT(0);
......
......@@ -225,7 +225,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
return pCtbCur;
}
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
if (pCtbCur) {
if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta);
if (pCtbCur->pCur) {
......@@ -291,178 +291,268 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
return pTSchema;
}
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
#if 0
#ifdef META_TDB_SMA_TEST
STSmaWrapper *pSW = NULL;
int metaGetTbNum(SMeta *pMeta) {
// TODO
// ASSERT(0);
return 0;
}
pSW = taosMemoryCalloc(1, sizeof(*pSW));
if (pSW == NULL) {
typedef struct {
SMeta *pMeta;
TDBC *pCur;
tb_uid_t uid;
void *pKey;
void *pVal;
int kLen;
int vLen;
} SMSmaCursor;
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
SMSmaCursor *pSmaCur = NULL;
SSmaIdxKey smaIdxKey;
int ret;
int c;
pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
if (pSmaCur == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
if (pCur == NULL) {
taosMemoryFree(pSW);
pSmaCur->pMeta = pMeta;
pSmaCur->uid = uid;
metaRLock(pMeta);
ret = tdbDbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pSmaCur);
return NULL;
}
void *pBuf = NULL;
SSmaIdxKey *pSmaIdxKey = NULL;
// move to the suid
smaIdxKey.uid = uid;
smaIdxKey.smaUid = INT64_MIN;
tdbDbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
if (c > 0) {
tdbDbcMoveToNext(pSmaCur->pCur);
}
while (true) {
// TODO: lock during iterate?
if (tdbDbcNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
pSmaIdxKey = pCur->pKey;
ASSERT(pSmaIdxKey != NULL);
return pSmaCur;
}
void *pSmaVal = metaGetSmaInfoByIndex(pMeta, pSmaIdxKey->smaUid, false);
void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
if (pSmaCur) {
if (pSmaCur->pMeta) metaULock(pSmaCur->pMeta);
if (pSmaCur->pCur) {
tdbDbcClose(pSmaCur->pCur);
if (pSmaVal == NULL) {
tsdbWarn("no tsma exists for indexUid: %" PRIi64, pSmaIdxKey->smaUid);
continue;
}
tdbFree(pSmaCur->pKey);
tdbFree(pSmaCur->pVal);
}
++pSW->number;
STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma));
if (tptr == NULL) {
tdbFree(pSmaVal);
metaCloseSmaCursor(pCur);
tdDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW);
return NULL;
taosMemoryFree(pSmaCur);
}
}
tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
int ret;
SSmaIdxKey *pSmaIdxKey;
ret = tdbDbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen);
if (ret < 0) {
return 0;
}
pSmaIdxKey = pSmaCur->pKey;
if (pSmaIdxKey->uid > pSmaCur->uid) {
return 0;
}
return pSmaIdxKey->uid;
}
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
STSmaWrapper *pSW = NULL;
SArray *pSmaIds = NULL;
if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
return NULL;
}
pSW = taosMemoryCalloc(1, sizeof(*pSW));
if (!pSW) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pSW->number = taosArrayGetSize(pSmaIds);
pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));
if (!pSW->tSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
if (pCur == NULL) {
goto _err;
}
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
int64_t smaId;
int smaIdx = 0;
STSma *pTSma = NULL;
for (int i = 0; i < pSW->number; ++i) {
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
if (metaGetTableEntryByUid(&mr, smaId) < 0) {
metaWarn("vgId:%d no entry for tbId: %" PRIi64 ", smaId: %" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
continue;
}
pTSma = pSW->tSma + smaIdx;
memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
if (deepCopy) {
if (pTSma->exprLen > 0) {
if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy((void*)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
}
pSW->tSma = tptr;
pBuf = pSmaVal;
if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) {
tdbFree(pSmaVal);
metaCloseSmaCursor(pCur);
tdDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW);
return NULL;
if (pTSma->tagsFilterLen > 0) {
if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
tdbFree(pSmaVal);
continue;
memcpy((void*)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
} else {
pTSma->exprLen = 0;
pTSma->expr = NULL;
pTSma->tagsFilterLen = 0;
pTSma->tagsFilter = NULL;
}
break;
++smaIdx;
}
metaCloseSmaCursor(pCur);
if (smaIdx <= 0) goto _err;
pSW->number = smaIdx;
metaReaderClear(&mr);
taosArrayDestroy(pSmaIds);
metaCloseSmaCursor(pCur);
return pSW;
#endif
#endif
_err:
metaReaderClear(&mr);
taosArrayDestroy(pSmaIds);
metaCloseSmaCursor(pCur);
tdFreeTSmaWrapper(pSW, deepCopy);
return NULL;
}
int metaGetTbNum(SMeta *pMeta) {
// TODO
// ASSERT(0);
return 0;
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
STSma *pTSma = NULL;
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
metaWarn("vgId:%d failed to get table entry for smaId: %" PRIi64, TD_VID(pMeta->pVnode), indexUid);
metaReaderClear(&mr);
return NULL;
}
pTSma = (STSma *)taosMemoryMalloc(sizeof(STSma));
if (!pTSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaReaderClear(&mr);
return NULL;
}
memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
metaReaderClear(&mr);
return pTSma;
}
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
#if 0
// TODO
// ASSERT(0); // comment this line to pass CI
// return NULL:
#ifdef META_TDB_SMA_TEST
SArray *pUids = NULL;
SMetaDB *pDB = pMeta->pDB;
void *pKey;
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
SArray *pUids = NULL;
SSmaIdxKey *pSmaIdxKey = NULL;
// TODO: lock?
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0);
if (pCur == NULL) {
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
if (!pCur) {
return NULL;
}
// TODO: lock?
SSmaIdxKey *pSmaIdxKey = NULL;
tb_uid_t uid = 0;
while (true) {
// TODO: lock during iterate?
if (tdbDbcNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
ASSERT(pSmaIdxKey != NULL);
pSmaIdxKey = pCur->pKey;
if (pSmaIdxKey->uid == 0 || pSmaIdxKey->uid == uid) {
continue;
}
uid = pSmaIdxKey->uid;
while (1) {
tb_uid_t id = metaSmaCursorNext(pCur);
if (id == 0) {
break;
}
if (!pUids) {
pUids = taosArrayInit(16, sizeof(tb_uid_t));
if (!pUids) {
pUids = taosArrayInit(16, sizeof(tb_uid_t));
if (!pUids) {
metaCloseSmaCursor(pCur);
return NULL;
}
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
return NULL;
}
}
taosArrayPush(pUids, &uid);
pSmaIdxKey = (SSmaIdxKey *)pCur->pKey;
continue;
if (taosArrayPush(pUids, &pSmaIdxKey->smaUid) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
taosArrayDestroy(pUids);
return NULL;
}
break;
}
metaCloseSmaCursor(pCur);
return pUids;
#endif
#endif
return NULL;
}
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode) {
#if 0
// TODO
// ASSERT(0);
// return NULL;
#ifdef META_TDB_SMA_TEST
SMetaDB *pDB = pMeta->pDB;
void *pKey = NULL;
void *pVal = NULL;
int kLen = 0;
int vLen = 0;
int ret = -1;
// Set key
pKey = (void *)&indexUid;
kLen = sizeof(indexUid);
// Query
ret = tdbDbGet(pDB->pSmaDB, pKey, kLen, &pVal, &vLen);
if (ret != 0 || !pVal) {
SArray *metaGetSmaTbUids(SMeta *pMeta) {
SArray *pUids = NULL;
SSmaIdxKey *pSmaIdxKey = NULL;
tb_uid_t lastUid = 0;
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0);
if (!pCur) {
return NULL;
}
if (!isDecode) {
// return raw value
return pVal;
}
while (1) {
tb_uid_t uid = metaSmaCursorNext(pCur);
if (uid == 0) {
break;
}
// Decode
STSma *pCfg = (STSma *)taosMemoryCalloc(1, sizeof(STSma));
if (pCfg == NULL) {
taosMemoryFree(pVal);
return NULL;
}
if (lastUid == uid) {
continue;
}
void *pBuf = pVal;
if (tDecodeTSma(pBuf, pCfg) == NULL) {
tdDestroyTSma(pCfg);
taosMemoryFree(pCfg);
tdbFree(pVal);
return NULL;
lastUid = uid;
if (!pUids) {
pUids = taosArrayInit(16, sizeof(tb_uid_t));
if (!pUids) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
return NULL;
}
}
if (taosArrayPush(pUids, &uid) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
taosArrayDestroy(pUids);
return NULL;
}
}
tdbFree(pVal);
return pCfg;
#endif
#endif
return NULL;
metaCloseSmaCursor(pCur);
return pUids;
}
#endif
......
......@@ -16,7 +16,7 @@
#include "meta.h"
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME);
int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
// TODO: Validate the cfg
......@@ -81,55 +81,6 @@ int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
return TSDB_CODE_SUCCESS;
}
// static int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
// int32_t ret = 0;
// void *pBuf = NULL, *qBuf = NULL;
// void *key = {0}, *val = {0};
// // save sma info
// int32_t len = tEncodeTSma(NULL, pSmaCfg);
// pBuf = taosMemoryCalloc(1, len);
// if (pBuf == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// return -1;
// }
// key = (void *)&pSmaCfg->indexUid;
// qBuf = pBuf;
// tEncodeTSma(&qBuf, pSmaCfg);
// val = pBuf;
// int32_t kLen = sizeof(pSmaCfg->indexUid);
// int32_t vLen = POINTER_DISTANCE(qBuf, pBuf);
// ret = tdbDbInsert(pMeta->pTbDb, key, kLen, val, vLen, &pMeta->txn);
// if (ret < 0) {
// taosMemoryFreeClear(pBuf);
// return -1;
// }
// // add sma idx
// SSmaIdxKey smaIdxKey;
// smaIdxKey.uid = pSmaCfg->tableUid;
// smaIdxKey.smaUid = pSmaCfg->indexUid;
// key = &smaIdxKey;
// kLen = sizeof(smaIdxKey);
// val = NULL;
// vLen = 0;
// ret = tdbDbInsert(pMeta->pSmaIdx, key, kLen, val, vLen, &pMeta->txn);
// if (ret < 0) {
// taosMemoryFreeClear(pBuf);
// return -1;
// }
// // release
// taosMemoryFreeClear(pBuf);
// return 0;
// }
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey;
void *pKey = NULL;
......@@ -182,6 +133,10 @@ static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
}
static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid};
......@@ -194,9 +149,13 @@ static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
// save to table.db
if (metaSaveSmaToDB(pMeta, pME) < 0) goto _err;
// // update uid.idx
// update uid.idx
if (metaUpdateUidIdx(pMeta, pME) < 0) goto _err;
// update name.idx
if (metaUpdateNameIdx(pMeta, pME) < 0) goto _err;
// update sma.idx
if (metaUpdateSmaIdx(pMeta, pME) < 0) goto _err;
metaULock(pMeta);
......
......@@ -27,4 +27,28 @@ int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) {
return code;
}
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) {
smaWarn("vgId:%d create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
int32_t tdUpdateExpireWindow(SSma* pSma, SSubmitReq* pMsg, int64_t version) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdUpdateExpiredWindowImpl(pSma, pMsg, version)) < 0) {
smaWarn("vgId:%d update expired sma window failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
int32_t tdGetTSmaData(SSma* pSma, char* pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdGetTSmaDataImpl(pSma, pData, indexUid, querySKey, nMaxResult)) < 0) {
smaWarn("vgId:%d get tSma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
......@@ -122,12 +122,10 @@ static void poolFree(void *arg, void *ptr) {
}
int32_t tdInitSma(SSma *pSma) {
// tSma
int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(SMA_META(pSma), false));
int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(SMA_META(pSma)));
if (numOfTSma > 0) {
atomic_store_16(&SMA_TSMA_NUM(pSma), (int16_t)numOfTSma);
}
// TODO: rSma
return TSDB_CODE_SUCCESS;
}
......
......@@ -443,7 +443,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, 0);
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, 1);
if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return TSDB_CODE_FAILED;
......
......@@ -70,15 +70,15 @@ static bool tdSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
TXN *txn);
// expired window
static int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version);
static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
int64_t version);
static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid);
// read data
// TODO: This is the basic params, and should wrap the params to a queryHandle.
static int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
// implementation
......@@ -713,7 +713,7 @@ static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid) {
* @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
* @return int32_t
*/
static int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
SSmaEnv *pEnv = atomic_load_ptr(&SMA_TSMA_ENV(pSma));
SSmaStat *pStat = NULL;
......@@ -834,35 +834,15 @@ static int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKE
return TSDB_CODE_SUCCESS;
}
int32_t tdProcessTSmaCreate(SSma *pSma, char *pMsg) {
#if 0
SSmaCfg vCreateSmaReq = {0};
if (!tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
smaWarn("vgId:%d tsma create msg received but deserialize failed since %s", SMA_VID(pSma), terrstr(terrno));
return -1;
}
smaDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", SMA_VID(pSma),
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
SSmaCfg *pCfg = (SSmaCfg *)pMsg;
// record current timezone of server side
vCreateSmaReq.tSma.timezoneInt = tsTimezone;
if (metaCreateTSma(SMA_META(pSma), &vCreateSmaReq) < 0) {
// TODO: handle error
smaWarn("vgId:%d tsma %s:%" PRIi64 " create failed for table %" PRIi64 " since %s", SMA_VID(pSma),
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid, terrstr(terrno));
tdDestroyTSma(&vCreateSmaReq.tSma);
if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) {
return -1;
}
tdTSmaAdd(pSma, 1);
tdDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps?
#endif
return TSDB_CODE_SUCCESS;
return 0;
}
int32_t tdDropTSma(SSma *pSma, char *pMsg) {
......@@ -930,7 +910,7 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
}
// cache smaMeta
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid, true);
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
if (!pTSma) {
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
taosHashCleanup(pItem->expiredWindows);
......@@ -1031,25 +1011,25 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
SSubmitBlkIter blkIter = {0};
if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) < 0) {
pSW = tdFreeTSmaWrapper(pSW);
pSW = tdFreeTSmaWrapper(pSW, false);
break;
}
while (true) {
STSRow *row = tGetSubmitBlkNext(&blkIter);
if (!row) {
tdFreeTSmaWrapper(pSW);
pSW = tdFreeTSmaWrapper(pSW, false);
break;
}
if (!pSW || (pTSma->tableUid != pBlock->suid)) {
if (!pSW || (pTSma->tableUid != msgIter.suid)) {
if (pSW) {
pSW = tdFreeTSmaWrapper(pSW);
pSW = tdFreeTSmaWrapper(pSW, false);
}
if (!(pSW = metaGetSmaInfoByTable(SMA_META(pSma), pBlock->suid))) {
if (!(pSW = metaGetSmaInfoByTable(SMA_META(pSma), msgIter.suid, false))) {
break;
}
if ((pSW->number) <= 0 || !pSW->tSma) {
pSW = tdFreeTSmaWrapper(pSW);
pSW = tdFreeTSmaWrapper(pSW, false);
break;
}
......@@ -1068,6 +1048,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
if (lastWinSKey != winSKey) {
lastWinSKey = winSKey;
if (tdSetExpiredWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) {
pSW = tdFreeTSmaWrapper(pSW, false);
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED;
}
......@@ -1083,21 +1064,3 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
return TSDB_CODE_SUCCESS;
}
int32_t tdUpdateExpireWindow(SSma *pSma, SSubmitReq *pMsg, int64_t version) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdUpdateExpiredWindowImpl(pSma, pMsg, version)) < 0) {
smaWarn("vgId:%d update expired sma window failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
int32_t tdGetTSmaData(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdGetTSmaDataImpl(pSma, pData, indexUid, querySKey, nMaxResult)) < 0) {
smaWarn("vgId:%d get tSma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
......@@ -2760,7 +2760,7 @@ static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
taosArrayPush(list, &info);
}
metaCloseCtbCurosr(pCur);
metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
......
......@@ -728,15 +728,22 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq
goto _err;
}
if (metaCreateTSma(pVnode->pMeta, version, &req) < 0) {
// record current timezone of server side
req.timezoneInt = tsTimezone;
if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
pRsp->code = terrno;
goto _err;
}
tDecoderClear(&coder);
vDebug("vgId:%d success to create tsma %s:%" PRIi64 " for table %" PRIi64, TD_VID(pVnode), req.indexName,
req.indexUid, req.tableUid);
return 0;
_err:
tDecoderClear(&coder);
vError("vgId:%d failed to create tsma %s:%" PRIi64 " for table %" PRIi64 " since %s", TD_VID(pVnode), req.indexName,
req.indexUid, req.tableUid, terrstr(terrno));
return -1;
}
......@@ -359,7 +359,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_SMA_STAT, "Invalid sma state")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TSMA_ALREADY_EXIST, "Tsma already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TSMA_ALREADY_EXIST, "TSMA already exists")
// query
......
......@@ -104,7 +104,8 @@
./test.sh -f tsim/mnode/basic1.sim -m
# --- sma
# ./test.sh -f tsim/sma/tsmaCreateInsertData.sim
./test.sh -f tsim/sma/tsmaCreateInsertData.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
# --- valgrind
./test.sh -f tsim/valgrind/checkError.sim -v
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database with retentions
sql create database d0 retentions 15s:7d,1m:21d,15m:365d;
sql use d0
print =============== create super table and register rsma
sql create table if not exists stb (ts timestamp, c1 int) tags (city binary(20),district binary(20)) rollup(min) file_factor 0.1 delay 2;
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang");
sql show tables
if $rows != 1 then
return -1
endi
print =============== insert data and trigger rollup
sql insert into ct1 values(now, 10);
sql insert into ct1 values(now+1s, 1);
sql insert into ct1 values(now+2s, 100);
print =============== select * from retention level 2 from memory
sql select * from ct1;
print $data00 $data01
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
print =============== select * from retention level 1 from memory
sql select * from ct1 where ts > now-8d;
print $data00 $data01
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
print =============== select * from retention level 0 from memory
sql select * from ct1 where ts > now-3d;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
#===================================================================
#==================== reboot to trigger commit data to file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== select * from retention level 2 from file
sql select * from ct1;
print $data00 $data01
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
print =============== select * from retention level 1 from file
sql select * from ct1 where ts > now-8d;
print $data00 $data01
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
print =============== select * from retention level 0 from file
sql select * from ct1 where ts > now-3d;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册