提交 e054b3b4 编写于 作者: C Cary Xu

feat: sma refactor and add cases

上级 8215bb0f
...@@ -2255,20 +2255,22 @@ static FORCE_INLINE void tdDestroyTSma(STSma* pSma) { ...@@ -2255,20 +2255,22 @@ static FORCE_INLINE void tdDestroyTSma(STSma* pSma) {
} }
} }
static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) { static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) {
if (pSW) { if (pSW) {
if (pSW->tSma) { if (pSW->tSma) {
for (uint32_t i = 0; i < pSW->number; ++i) { if (deepCopy) {
tdDestroyTSma(pSW->tSma + i); for (uint32_t i = 0; i < pSW->number; ++i) {
tdDestroyTSma(pSW->tSma + i);
}
} }
taosMemoryFreeClear(pSW->tSma); taosMemoryFreeClear(pSW->tSma);
} }
} }
} }
static FORCE_INLINE void* tdFreeTSmaWrapper(STSmaWrapper* pSW) { static FORCE_INLINE void* tdFreeTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) {
tdDestroyTSmaWrapper(pSW); tdDestroyTSmaWrapper(pSW, deepCopy);
taosMemoryFree(pSW); taosMemoryFreeClear(pSW);
return NULL; return NULL;
} }
......
...@@ -530,6 +530,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { ...@@ -530,6 +530,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
pStream = mndAcquireStream(pMnode, createReq.name); pStream = mndAcquireStream(pMnode, createReq.name);
if (pStream != NULL) { if (pStream != NULL) {
mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name); mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name);
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
goto _OVER; goto _OVER;
} }
...@@ -565,7 +566,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { ...@@ -565,7 +566,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("sma:%s, failed to create since %s", createReq.name, terrstr()); mError("sma:%s, failed to create since %s", createReq.name, terrstr(terrno));
} }
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
......
...@@ -24,7 +24,6 @@ extern "C" { ...@@ -24,7 +24,6 @@ extern "C" {
typedef struct SMetaIdx SMetaIdx; typedef struct SMetaIdx SMetaIdx;
typedef struct SMetaDB SMetaDB; typedef struct SMetaDB SMetaDB;
typedef struct SMSmaCursor SMSmaCursor;
// metaDebug ================== // metaDebug ==================
// clang-format off // clang-format off
...@@ -114,22 +113,12 @@ typedef struct { ...@@ -114,22 +113,12 @@ typedef struct {
int64_t smaUid; int64_t smaUid;
} SSmaIdxKey; } SSmaIdxKey;
#if 1
SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseSmaCursor(SMSmaCursor* pSmaCur);
int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
#ifndef META_REFACT #ifndef META_REFACT
// SMetaDB // SMetaDB
int metaOpenDB(SMeta* pMeta); int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
#endif
#endif #endif
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -218,6 +218,11 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDisk ...@@ -218,6 +218,11 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDisk
void *tdFreeRSmaInfo(SRSmaInfo *pInfo); void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -89,11 +89,13 @@ STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); ...@@ -89,11 +89,13 @@ STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaGetTbNum(SMeta* pMeta); int metaGetTbNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid); SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor* pCtbCur); void metaCloseCtbCursor(SMCtbCursor* pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
SArray* metaGetSmaTbUids(SMeta* pMeta, bool isDup); STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode); STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid); SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
...@@ -126,7 +128,7 @@ int32_t smaOpen(SVnode* pVnode); ...@@ -126,7 +128,7 @@ int32_t smaOpen(SVnode* pVnode);
int32_t smaClose(SSma* pSma); int32_t smaClose(SSma* pSma);
int32_t tdUpdateExpireWindow(SSma* pSma, SSubmitReq* pMsg, int64_t version); int32_t tdUpdateExpireWindow(SSma* pSma, SSubmitReq* pMsg, int64_t version);
int32_t tdProcessTSmaCreate(SSma* pSma, char* pMsg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int32_t tdProcessRSmaCreate(SSma* pSma, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb); int32_t tdProcessRSmaCreate(SSma* pSma, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
......
...@@ -69,6 +69,11 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { ...@@ -69,6 +69,11 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) { } else if (pME->type == TSDB_TSMA_TABLE) {
pME->smaEntry.tsma = taosMemoryCalloc(1, sizeof(STSma));
if(!pME->smaEntry.tsma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
} else { } else {
ASSERT(0); ASSERT(0);
......
...@@ -225,7 +225,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { ...@@ -225,7 +225,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
return pCtbCur; return pCtbCur;
} }
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) { void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
if (pCtbCur) { if (pCtbCur) {
if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta); if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta);
if (pCtbCur->pCur) { if (pCtbCur->pCur) {
...@@ -291,178 +291,268 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { ...@@ -291,178 +291,268 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
return pTSchema; return pTSchema;
} }
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { int metaGetTbNum(SMeta *pMeta) {
#if 0 // TODO
#ifdef META_TDB_SMA_TEST // ASSERT(0);
STSmaWrapper *pSW = NULL; return 0;
}
pSW = taosMemoryCalloc(1, sizeof(*pSW)); typedef struct {
if (pSW == NULL) { SMeta *pMeta;
TDBC *pCur;
tb_uid_t uid;
void *pKey;
void *pVal;
int kLen;
int vLen;
} SMSmaCursor;
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
SMSmaCursor *pSmaCur = NULL;
SSmaIdxKey smaIdxKey;
int ret;
int c;
pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
if (pSmaCur == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid); pSmaCur->pMeta = pMeta;
if (pCur == NULL) { pSmaCur->uid = uid;
taosMemoryFree(pSW); metaRLock(pMeta);
ret = tdbDbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pSmaCur);
return NULL; return NULL;
} }
void *pBuf = NULL; // move to the suid
SSmaIdxKey *pSmaIdxKey = NULL; smaIdxKey.uid = uid;
smaIdxKey.smaUid = INT64_MIN;
tdbDbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
if (c > 0) {
tdbDbcMoveToNext(pSmaCur->pCur);
}
while (true) { return pSmaCur;
// TODO: lock during iterate? }
if (tdbDbcNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
pSmaIdxKey = pCur->pKey;
ASSERT(pSmaIdxKey != NULL);
void *pSmaVal = metaGetSmaInfoByIndex(pMeta, pSmaIdxKey->smaUid, false); void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
if (pSmaCur) {
if (pSmaCur->pMeta) metaULock(pSmaCur->pMeta);
if (pSmaCur->pCur) {
tdbDbcClose(pSmaCur->pCur);
if (pSmaVal == NULL) { tdbFree(pSmaCur->pKey);
tsdbWarn("no tsma exists for indexUid: %" PRIi64, pSmaIdxKey->smaUid); tdbFree(pSmaCur->pVal);
continue; }
}
++pSW->number; taosMemoryFree(pSmaCur);
STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma)); }
if (tptr == NULL) { }
tdbFree(pSmaVal);
metaCloseSmaCursor(pCur); tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
tdDestroyTSmaWrapper(pSW); int ret;
taosMemoryFreeClear(pSW); SSmaIdxKey *pSmaIdxKey;
return NULL;
ret = tdbDbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen);
if (ret < 0) {
return 0;
}
pSmaIdxKey = pSmaCur->pKey;
if (pSmaIdxKey->uid > pSmaCur->uid) {
return 0;
}
return pSmaIdxKey->uid;
}
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
STSmaWrapper *pSW = NULL;
SArray *pSmaIds = NULL;
if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
return NULL;
}
pSW = taosMemoryCalloc(1, sizeof(*pSW));
if (!pSW) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pSW->number = taosArrayGetSize(pSmaIds);
pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));
if (!pSW->tSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
if (pCur == NULL) {
goto _err;
}
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
int64_t smaId;
int smaIdx = 0;
STSma *pTSma = NULL;
for (int i = 0; i < pSW->number; ++i) {
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
if (metaGetTableEntryByUid(&mr, smaId) < 0) {
metaWarn("vgId:%d no entry for tbId: %" PRIi64 ", smaId: %" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
continue;
}
pTSma = pSW->tSma + smaIdx;
memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
if (deepCopy) {
if (pTSma->exprLen > 0) {
if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy((void*)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
} }
pSW->tSma = tptr; if (pTSma->tagsFilterLen > 0) {
pBuf = pSmaVal; if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY;
tdbFree(pSmaVal); goto _err;
metaCloseSmaCursor(pCur); }
tdDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW);
return NULL;
} }
tdbFree(pSmaVal); memcpy((void*)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
continue; } else {
pTSma->exprLen = 0;
pTSma->expr = NULL;
pTSma->tagsFilterLen = 0;
pTSma->tagsFilter = NULL;
} }
break;
++smaIdx;
} }
metaCloseSmaCursor(pCur); if (smaIdx <= 0) goto _err;
pSW->number = smaIdx;
metaReaderClear(&mr);
taosArrayDestroy(pSmaIds);
metaCloseSmaCursor(pCur);
return pSW; return pSW;
_err:
#endif metaReaderClear(&mr);
#endif taosArrayDestroy(pSmaIds);
metaCloseSmaCursor(pCur);
tdFreeTSmaWrapper(pSW, deepCopy);
return NULL; return NULL;
} }
int metaGetTbNum(SMeta *pMeta) { STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
// TODO STSma *pTSma = NULL;
// ASSERT(0); SMetaReader mr = {0};
return 0; metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
metaWarn("vgId:%d failed to get table entry for smaId: %" PRIi64, TD_VID(pMeta->pVnode), indexUid);
metaReaderClear(&mr);
return NULL;
}
pTSma = (STSma *)taosMemoryMalloc(sizeof(STSma));
if (!pTSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaReaderClear(&mr);
return NULL;
}
memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
metaReaderClear(&mr);
return pTSma;
} }
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) { SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
#if 0 SArray *pUids = NULL;
// TODO SSmaIdxKey *pSmaIdxKey = NULL;
// ASSERT(0); // comment this line to pass CI
// return NULL:
#ifdef META_TDB_SMA_TEST
SArray *pUids = NULL;
SMetaDB *pDB = pMeta->pDB;
void *pKey;
// TODO: lock? SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0); if (!pCur) {
if (pCur == NULL) {
return NULL; return NULL;
} }
// TODO: lock?
SSmaIdxKey *pSmaIdxKey = NULL; while (1) {
tb_uid_t uid = 0; tb_uid_t id = metaSmaCursorNext(pCur);
while (true) { if (id == 0) {
// TODO: lock during iterate? break;
if (tdbDbcNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) { }
ASSERT(pSmaIdxKey != NULL);
pSmaIdxKey = pCur->pKey;
if (pSmaIdxKey->uid == 0 || pSmaIdxKey->uid == uid) {
continue;
}
uid = pSmaIdxKey->uid;
if (!pUids) {
pUids = taosArrayInit(16, sizeof(tb_uid_t));
if (!pUids) { if (!pUids) {
pUids = taosArrayInit(16, sizeof(tb_uid_t)); terrno = TSDB_CODE_OUT_OF_MEMORY;
if (!pUids) { metaCloseSmaCursor(pCur);
metaCloseSmaCursor(pCur); return NULL;
return NULL;
}
} }
}
taosArrayPush(pUids, &uid); pSmaIdxKey = (SSmaIdxKey *)pCur->pKey;
continue; if (taosArrayPush(pUids, &pSmaIdxKey->smaUid) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
taosArrayDestroy(pUids);
return NULL;
} }
break;
} }
metaCloseSmaCursor(pCur); metaCloseSmaCursor(pCur);
return pUids; return pUids;
#endif
#endif
return NULL;
} }
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode) { SArray *metaGetSmaTbUids(SMeta *pMeta) {
#if 0 SArray *pUids = NULL;
// TODO SSmaIdxKey *pSmaIdxKey = NULL;
// ASSERT(0); tb_uid_t lastUid = 0;
// return NULL;
#ifdef META_TDB_SMA_TEST SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0);
SMetaDB *pDB = pMeta->pDB; if (!pCur) {
void *pKey = NULL;
void *pVal = NULL;
int kLen = 0;
int vLen = 0;
int ret = -1;
// Set key
pKey = (void *)&indexUid;
kLen = sizeof(indexUid);
// Query
ret = tdbDbGet(pDB->pSmaDB, pKey, kLen, &pVal, &vLen);
if (ret != 0 || !pVal) {
return NULL; return NULL;
} }
if (!isDecode) { while (1) {
// return raw value tb_uid_t uid = metaSmaCursorNext(pCur);
return pVal; if (uid == 0) {
} break;
}
// Decode if (lastUid == uid) {
STSma *pCfg = (STSma *)taosMemoryCalloc(1, sizeof(STSma)); continue;
if (pCfg == NULL) { }
taosMemoryFree(pVal);
return NULL;
}
void *pBuf = pVal; lastUid = uid;
if (tDecodeTSma(pBuf, pCfg) == NULL) {
tdDestroyTSma(pCfg); if (!pUids) {
taosMemoryFree(pCfg); pUids = taosArrayInit(16, sizeof(tb_uid_t));
tdbFree(pVal); if (!pUids) {
return NULL; terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
return NULL;
}
}
if (taosArrayPush(pUids, &uid) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
taosArrayDestroy(pUids);
return NULL;
}
} }
tdbFree(pVal); metaCloseSmaCursor(pCur);
return pCfg; return pUids;
#endif
#endif
return NULL;
} }
#endif #endif
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "meta.h" #include "meta.h"
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME); static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME);
int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) { int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
// TODO: Validate the cfg // TODO: Validate the cfg
...@@ -81,55 +81,6 @@ int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) { ...@@ -81,55 +81,6 @@ int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// static int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
// int32_t ret = 0;
// void *pBuf = NULL, *qBuf = NULL;
// void *key = {0}, *val = {0};
// // save sma info
// int32_t len = tEncodeTSma(NULL, pSmaCfg);
// pBuf = taosMemoryCalloc(1, len);
// if (pBuf == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// return -1;
// }
// key = (void *)&pSmaCfg->indexUid;
// qBuf = pBuf;
// tEncodeTSma(&qBuf, pSmaCfg);
// val = pBuf;
// int32_t kLen = sizeof(pSmaCfg->indexUid);
// int32_t vLen = POINTER_DISTANCE(qBuf, pBuf);
// ret = tdbDbInsert(pMeta->pTbDb, key, kLen, val, vLen, &pMeta->txn);
// if (ret < 0) {
// taosMemoryFreeClear(pBuf);
// return -1;
// }
// // add sma idx
// SSmaIdxKey smaIdxKey;
// smaIdxKey.uid = pSmaCfg->tableUid;
// smaIdxKey.smaUid = pSmaCfg->indexUid;
// key = &smaIdxKey;
// kLen = sizeof(smaIdxKey);
// val = NULL;
// vLen = 0;
// ret = tdbDbInsert(pMeta->pSmaIdx, key, kLen, val, vLen, &pMeta->txn);
// if (ret < 0) {
// taosMemoryFreeClear(pBuf);
// return -1;
// }
// // release
// taosMemoryFreeClear(pBuf);
// return 0;
// }
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) { static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey; STbDbKey tbDbKey;
void *pKey = NULL; void *pKey = NULL;
...@@ -182,6 +133,10 @@ static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -182,6 +133,10 @@ static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn); return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
} }
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
}
static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid}; SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid};
...@@ -194,9 +149,13 @@ static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -194,9 +149,13 @@ static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
// save to table.db // save to table.db
if (metaSaveSmaToDB(pMeta, pME) < 0) goto _err; if (metaSaveSmaToDB(pMeta, pME) < 0) goto _err;
// // update uid.idx // update uid.idx
if (metaUpdateUidIdx(pMeta, pME) < 0) goto _err; if (metaUpdateUidIdx(pMeta, pME) < 0) goto _err;
// update name.idx
if (metaUpdateNameIdx(pMeta, pME) < 0) goto _err;
// update sma.idx
if (metaUpdateSmaIdx(pMeta, pME) < 0) goto _err; if (metaUpdateSmaIdx(pMeta, pME) < 0) goto _err;
metaULock(pMeta); metaULock(pMeta);
......
...@@ -27,4 +27,28 @@ int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) { ...@@ -27,4 +27,28 @@ int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) {
return code; return code;
} }
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) {
smaWarn("vgId:%d create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
int32_t tdUpdateExpireWindow(SSma* pSma, SSubmitReq* pMsg, int64_t version) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdUpdateExpiredWindowImpl(pSma, pMsg, version)) < 0) {
smaWarn("vgId:%d update expired sma window failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
int32_t tdGetTSmaData(SSma* pSma, char* pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdGetTSmaDataImpl(pSma, pData, indexUid, querySKey, nMaxResult)) < 0) {
smaWarn("vgId:%d get tSma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
...@@ -122,12 +122,10 @@ static void poolFree(void *arg, void *ptr) { ...@@ -122,12 +122,10 @@ static void poolFree(void *arg, void *ptr) {
} }
int32_t tdInitSma(SSma *pSma) { int32_t tdInitSma(SSma *pSma) {
// tSma int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(SMA_META(pSma)));
int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(SMA_META(pSma), false));
if (numOfTSma > 0) { if (numOfTSma > 0) {
atomic_store_16(&SMA_TSMA_NUM(pSma), (int16_t)numOfTSma); atomic_store_16(&SMA_TSMA_NUM(pSma), (int16_t)numOfTSma);
} }
// TODO: rSma
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -443,7 +443,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb ...@@ -443,7 +443,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache // TODO: use the proper schema instead of 0, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, 0); STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, 1);
if (!pTSchema) { if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
......
...@@ -70,15 +70,15 @@ static bool tdSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey); ...@@ -70,15 +70,15 @@ static bool tdSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen, static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
TXN *txn); TXN *txn);
// expired window // expired window
static int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version);
static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
int64_t version); int64_t version);
static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey); static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid); static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid);
// read data // read data
// TODO: This is the basic params, and should wrap the params to a queryHandle.
static int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
// implementation // implementation
...@@ -713,7 +713,7 @@ static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid) { ...@@ -713,7 +713,7 @@ static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid) {
* @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
* @return int32_t * @return int32_t
*/ */
static int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) { int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
SSmaEnv *pEnv = atomic_load_ptr(&SMA_TSMA_ENV(pSma)); SSmaEnv *pEnv = atomic_load_ptr(&SMA_TSMA_ENV(pSma));
SSmaStat *pStat = NULL; SSmaStat *pStat = NULL;
...@@ -834,35 +834,15 @@ static int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKE ...@@ -834,35 +834,15 @@ static int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKE
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tdProcessTSmaCreate(SSma *pSma, char *pMsg) { int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
#if 0 SSmaCfg *pCfg = (SSmaCfg *)pMsg;
SSmaCfg vCreateSmaReq = {0};
if (!tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
smaWarn("vgId:%d tsma create msg received but deserialize failed since %s", SMA_VID(pSma), terrstr(terrno));
return -1;
}
smaDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", SMA_VID(pSma),
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid);
// record current timezone of server side if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) {
vCreateSmaReq.tSma.timezoneInt = tsTimezone;
if (metaCreateTSma(SMA_META(pSma), &vCreateSmaReq) < 0) {
// TODO: handle error
smaWarn("vgId:%d tsma %s:%" PRIi64 " create failed for table %" PRIi64 " since %s", SMA_VID(pSma),
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid, terrstr(terrno));
tdDestroyTSma(&vCreateSmaReq.tSma);
return -1; return -1;
} }
tdTSmaAdd(pSma, 1); tdTSmaAdd(pSma, 1);
return 0;
tdDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps?
#endif
return TSDB_CODE_SUCCESS;
} }
int32_t tdDropTSma(SSma *pSma, char *pMsg) { int32_t tdDropTSma(SSma *pSma, char *pMsg) {
...@@ -930,7 +910,7 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde ...@@ -930,7 +910,7 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
} }
// cache smaMeta // cache smaMeta
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid, true); STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
if (!pTSma) { if (!pTSma) {
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
taosHashCleanup(pItem->expiredWindows); taosHashCleanup(pItem->expiredWindows);
...@@ -1031,25 +1011,25 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version) ...@@ -1031,25 +1011,25 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) < 0) { if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) < 0) {
pSW = tdFreeTSmaWrapper(pSW); pSW = tdFreeTSmaWrapper(pSW, false);
break; break;
} }
while (true) { while (true) {
STSRow *row = tGetSubmitBlkNext(&blkIter); STSRow *row = tGetSubmitBlkNext(&blkIter);
if (!row) { if (!row) {
tdFreeTSmaWrapper(pSW); pSW = tdFreeTSmaWrapper(pSW, false);
break; break;
} }
if (!pSW || (pTSma->tableUid != pBlock->suid)) { if (!pSW || (pTSma->tableUid != msgIter.suid)) {
if (pSW) { if (pSW) {
pSW = tdFreeTSmaWrapper(pSW); pSW = tdFreeTSmaWrapper(pSW, false);
} }
if (!(pSW = metaGetSmaInfoByTable(SMA_META(pSma), pBlock->suid))) { if (!(pSW = metaGetSmaInfoByTable(SMA_META(pSma), msgIter.suid, false))) {
break; break;
} }
if ((pSW->number) <= 0 || !pSW->tSma) { if ((pSW->number) <= 0 || !pSW->tSma) {
pSW = tdFreeTSmaWrapper(pSW); pSW = tdFreeTSmaWrapper(pSW, false);
break; break;
} }
...@@ -1068,6 +1048,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version) ...@@ -1068,6 +1048,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
if (lastWinSKey != winSKey) { if (lastWinSKey != winSKey) {
lastWinSKey = winSKey; lastWinSKey = winSKey;
if (tdSetExpiredWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) { if (tdSetExpiredWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) {
pSW = tdFreeTSmaWrapper(pSW, false);
tdUnRefSmaStat(pSma, pStat); tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1083,21 +1064,3 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version) ...@@ -1083,21 +1064,3 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tdUpdateExpireWindow(SSma *pSma, SSubmitReq *pMsg, int64_t version) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdUpdateExpiredWindowImpl(pSma, pMsg, version)) < 0) {
smaWarn("vgId:%d update expired sma window failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
int32_t tdGetTSmaData(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdGetTSmaDataImpl(pSma, pData, indexUid, querySKey, nMaxResult)) < 0) {
smaWarn("vgId:%d get tSma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
return code;
}
...@@ -2760,7 +2760,7 @@ static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { ...@@ -2760,7 +2760,7 @@ static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
taosArrayPush(list, &info); taosArrayPush(list, &info);
} }
metaCloseCtbCurosr(pCur); metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -711,15 +711,22 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -711,15 +711,22 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq
goto _err; goto _err;
} }
if (metaCreateTSma(pVnode->pMeta, version, &req) < 0) { // record current timezone of server side
req.timezoneInt = tsTimezone;
if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
pRsp->code = terrno; pRsp->code = terrno;
goto _err; goto _err;
} }
tDecoderClear(&coder); tDecoderClear(&coder);
vDebug("vgId:%d success to create tsma %s:%" PRIi64 " for table %" PRIi64, TD_VID(pVnode), req.indexName,
req.indexUid, req.tableUid);
return 0; return 0;
_err: _err:
tDecoderClear(&coder); tDecoderClear(&coder);
vError("vgId:%d failed to create tsma %s:%" PRIi64 " for table %" PRIi64 " since %s", TD_VID(pVnode), req.indexName,
req.indexUid, req.tableUid, terrstr(terrno));
return -1; return -1;
} }
...@@ -358,7 +358,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created") ...@@ -358,7 +358,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_SMA_STAT, "Invalid sma state") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_SMA_STAT, "Invalid sma state")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TSMA_ALREADY_EXIST, "Tsma already exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TSMA_ALREADY_EXIST, "TSMA already exists")
// query // query
......
...@@ -104,7 +104,8 @@ ...@@ -104,7 +104,8 @@
./test.sh -f tsim/mnode/basic1.sim -m ./test.sh -f tsim/mnode/basic1.sim -m
# --- sma # --- sma
# ./test.sh -f tsim/sma/tsmaCreateInsertData.sim ./test.sh -f tsim/sma/tsmaCreateInsertData.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
# --- valgrind # --- valgrind
./test.sh -f tsim/valgrind/checkError.sim -v ./test.sh -f tsim/valgrind/checkError.sim -v
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database with retentions
sql create database d0 retentions 15s:7d,1m:21d,15m:365d;
sql use d0
print =============== create super table and register rsma
sql create table if not exists stb (ts timestamp, c1 int) tags (city binary(20),district binary(20)) rollup(min) file_factor 0.1 delay 2;
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang");
sql show tables
if $rows != 1 then
return -1
endi
print =============== insert data and trigger rollup
sql insert into ct1 values(now, 10);
sql insert into ct1 values(now+1s, 1);
sql insert into ct1 values(now+2s, 100);
print =============== select * from retention level 2 from memory
sql select * from ct1;
print $data00 $data01
if $rows > 1 then
print retention level 2 file rows $rows > 1
return -1
endi
print =============== select * from retention level 1 from memory
sql select * from ct1 where ts > now-8d;
print $data00 $data01
if $rows > 1 then
print retention level 1 file rows $rows > 1
return -1
endi
print =============== select * from retention level 0 from memory
sql select * from ct1 where ts > now-3d;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
#===================================================================
#==================== reboot to trigger commit data to file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== select * from retention level 2 from file
sql select * from ct1;
print $data00 $data01
if $rows > 1 then
print retention level 2 file rows $rows > 1
return -1
endi
print =============== select * from retention level 1 from file
sql select * from ct1 where ts > now-8d;
print $data00 $data01
if $rows > 1 then
print retention level 1 file rows $rows > 1
return -1
endi
print =============== select * from retention level 0 from file
sql select * from ct1 where ts > now-3d;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册