未验证 提交 e11147dd 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #11964 from taosdata/feature/TD-14481-3.0

feat: rollup sma refactor
...@@ -1521,8 +1521,8 @@ typedef struct { ...@@ -1521,8 +1521,8 @@ typedef struct {
char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2 char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2
} SRSmaParam; } SRSmaParam;
int tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam); int32_t tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam);
int tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam); int32_t tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam);
typedef struct SVCreateStbReq { typedef struct SVCreateStbReq {
const char* name; const char* name;
......
...@@ -214,6 +214,14 @@ STSRow *tdRowDup(STSRow *row); ...@@ -214,6 +214,14 @@ STSRow *tdRowDup(STSRow *row);
static FORCE_INLINE SKvRowIdx *tdKvRowColIdxAt(STSRow *pRow, col_id_t idx) { static FORCE_INLINE SKvRowIdx *tdKvRowColIdxAt(STSRow *pRow, col_id_t idx) {
return (SKvRowIdx *)TD_ROW_COL_IDX(pRow) + idx; return (SKvRowIdx *)TD_ROW_COL_IDX(pRow) + idx;
} }
static FORCE_INLINE int16_t tdKvRowColIdAt(STSRow *pRow, col_id_t idx) {
ASSERT(idx >= 0);
if (idx == 0) {
return PRIMARYKEY_TIMESTAMP_COL_ID;
}
return ((SKvRowIdx *)TD_ROW_COL_IDX(pRow) + idx - 1)->colId;
}
static FORCE_INLINE void *tdKVRowColVal(STSRow *pRow, SKvRowIdx *pIdx) { return POINTER_SHIFT(pRow, pIdx->offset); } static FORCE_INLINE void *tdKVRowColVal(STSRow *pRow, SKvRowIdx *pIdx) { return POINTER_SHIFT(pRow, pIdx->offset); }
#define TD_ROW_OFFSET(p) ((p)->toffset); // During ParseInsert when without STSchema, how to get the offset for STpRow? #define TD_ROW_OFFSET(p) ((p)->toffset); // During ParseInsert when without STSchema, how to get the offset for STpRow?
......
...@@ -3621,6 +3621,43 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { ...@@ -3621,6 +3621,43 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosMemoryFreeClear(pReq->ast); taosMemoryFreeClear(pReq->ast);
} }
int32_t tEncodeSRSmaParam(SCoder *pCoder, const SRSmaParam *pRSmaParam) {
if (tEncodeFloat(pCoder, pRSmaParam->xFilesFactor) < 0) return -1;
if (tEncodeI32v(pCoder, pRSmaParam->delay) < 0) return -1;
if (tEncodeI32v(pCoder, pRSmaParam->qmsg1Len) < 0) return -1;
if (tEncodeI32v(pCoder, pRSmaParam->qmsg2Len) < 0) return -1;
if (pRSmaParam->qmsg1Len > 0) {
if (tEncodeBinary(pCoder, pRSmaParam->qmsg1, (uint64_t)pRSmaParam->qmsg1Len) < 0) // qmsg1Len contains len of '\0'
return -1;
}
if (pRSmaParam->qmsg2Len > 0) {
if (tEncodeBinary(pCoder, pRSmaParam->qmsg2, (uint64_t)pRSmaParam->qmsg2Len) < 0) // qmsg2Len contains len of '\0'
return -1;
}
return 0;
}
int32_t tDecodeSRSmaParam(SCoder *pCoder, SRSmaParam *pRSmaParam) {
if (tDecodeFloat(pCoder, &pRSmaParam->xFilesFactor) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->delay) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->qmsg1Len) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->qmsg2Len) < 0) return -1;
if (pRSmaParam->qmsg1Len > 0) {
uint64_t len;
if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg1, &len) < 0) return -1; // qmsg1Len contains len of '\0'
} else {
pRSmaParam->qmsg1 = NULL;
}
if (pRSmaParam->qmsg2Len > 0) {
uint64_t len;
if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg2, &len) < 0) return -1; // qmsg2Len contains len of '\0'
} else {
pRSmaParam->qmsg2 = NULL;
}
return 0;
}
int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) { int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
...@@ -3629,9 +3666,9 @@ int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) { ...@@ -3629,9 +3666,9 @@ int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) {
if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1; if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
// if (pReq->rollup) { if (pReq->rollup) {
// if (tEncodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1; if (tEncodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
// } }
tEndEncode(pCoder); tEndEncode(pCoder);
return 0; return 0;
...@@ -3645,9 +3682,9 @@ int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) { ...@@ -3645,9 +3682,9 @@ int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) {
if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1; if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
// if (pReq->rollup) { if (pReq->rollup) {
// if (tDecodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1; if (tDecodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
// } }
tEndDecode(pCoder); tEndDecode(pCoder);
return 0; return 0;
......
...@@ -220,7 +220,7 @@ static uint8_t tdGetMergedBitmapByte(uint8_t byte) { ...@@ -220,7 +220,7 @@ static uint8_t tdGetMergedBitmapByte(uint8_t byte) {
} }
/** /**
* @brief Merge bitmap from 2 bits to 1 bits, and the memory buffer should be guaranteed by the invoker. * @brief Merge bitmap from 2 bits to 1 bit, and the memory buffer should be guaranteed by the invoker.
* *
* @param srcBitmap * @param srcBitmap
* @param nBits * @param nBits
......
...@@ -22,13 +22,13 @@ ...@@ -22,13 +22,13 @@
extern "C" { extern "C" {
#endif #endif
typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2); // typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2);
struct STbDdlH { // struct STbDdlH {
void *ahandle; // void *ahandle;
void *result; // void *result;
__tb_ddl_fn_t fp; // __tb_ddl_fn_t fp;
}; // };
static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) { static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
ASSERT(*pStore == NULL); ASSERT(*pStore == NULL);
...@@ -40,14 +40,6 @@ static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) { ...@@ -40,14 +40,6 @@ static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
void tsdbUidStoreDestory(STbUidStore *pStore);
void *tsdbUidStoreFree(STbUidStore *pStore);
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq);
int32_t tsdbFetchTbUidList(void *pTsdb, void **result, void *suid, void *uid);
int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pUidStore);
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputType);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -107,6 +107,15 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); ...@@ -107,6 +107,15 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
// sma
int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq);
int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
void tsdbUidStoreDestory(STbUidStore* pStore);
void* tsdbUidStoreFree(STbUidStore* pStore);
int32_t tsdbTriggerRSma(STsdb* pTsdb, SMeta* pMeta, void* pMsg, int32_t inputType);
typedef struct { typedef struct {
int8_t streamType; // sma or other int8_t streamType; // sma or other
int8_t dstType; int8_t dstType;
...@@ -162,7 +171,7 @@ struct STbUidStore { ...@@ -162,7 +171,7 @@ struct STbUidStore {
#define TD_VID(PVNODE) (PVNODE)->config.vgId #define TD_VID(PVNODE) (PVNODE)->config.vgId
typedef struct STbDdlH STbDdlH; // typedef struct STbDdlH STbDdlH;
// sma // sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
......
...@@ -1519,8 +1519,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit ...@@ -1519,8 +1519,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
} else if (isRow1DataRow) { } else if (isRow1DataRow) {
colIdOfRow1 = pSchema1->columns[j].colId; colIdOfRow1 = pSchema1->columns[j].colId;
} else { } else {
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row1, j); colIdOfRow1 = tdKvRowColIdAt(row1, j);
colIdOfRow1 = pColIdx->colId;
} }
int32_t colIdOfRow2; int32_t colIdOfRow2;
...@@ -1529,8 +1528,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit ...@@ -1529,8 +1528,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
} else if (isRow2DataRow) { } else if (isRow2DataRow) {
colIdOfRow2 = pSchema2->columns[k].colId; colIdOfRow2 = pSchema2->columns[k].colId;
} else { } else {
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row2, k); colIdOfRow2 = tdKvRowColIdAt(row2, j);
colIdOfRow2 = pColIdx->colId;
} }
if (colIdOfRow1 == colIdOfRow2) { if (colIdOfRow1 == colIdOfRow2) {
......
...@@ -173,6 +173,7 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) ...@@ -173,6 +173,7 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[])
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg); static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg);
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg); static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg);
static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids); static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids);
// mgmt interface // mgmt interface
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);
...@@ -1692,18 +1693,16 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { ...@@ -1692,18 +1693,16 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
* @param pReq * @param pReq
* @return int32_t * @return int32_t
*/ */
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) {
#if 0 if (!pReq->rollup) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam; tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
if (!param) {
tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name,
pReq->stbCfg.suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SRSmaParam *param = &pReq->pRSmaParam;
if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) { if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid); tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1716,9 +1715,9 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { ...@@ -1716,9 +1715,9 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t)); pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
if (pRSmaInfo) { if (pRSmaInfo) {
tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid); tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1758,14 +1757,13 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { ...@@ -1758,14 +1757,13 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
} }
} }
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} else { } else {
tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->stbCfg.suid); tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->suid);
} }
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1777,7 +1775,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { ...@@ -1777,7 +1775,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
* @param uid * @param uid
* @return int32_t * @return int32_t
*/ */
int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) { static int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
// prefer to store suid/uids in array // prefer to store suid/uids in array
if ((suid == pStore->suid) || (pStore->suid == 0)) { if ((suid == pStore->suid) || (pStore->suid == 0)) {
if (pStore->suid == 0) { if (pStore->suid == 0) {
...@@ -1833,6 +1831,7 @@ void tsdbUidStoreDestory(STbUidStore *pStore) { ...@@ -1833,6 +1831,7 @@ void tsdbUidStoreDestory(STbUidStore *pStore) {
if (pStore) { if (pStore) {
if (pStore->uidHash) { if (pStore->uidHash) {
if (pStore->tbUids) { if (pStore->tbUids) {
// When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
void *pIter = taosHashIterate(pStore->uidHash, NULL); void *pIter = taosHashIterate(pStore->uidHash, NULL);
while (pIter) { while (pIter) {
SArray *arr = *(SArray **)pIter; SArray *arr = *(SArray **)pIter;
...@@ -1847,8 +1846,10 @@ void tsdbUidStoreDestory(STbUidStore *pStore) { ...@@ -1847,8 +1846,10 @@ void tsdbUidStoreDestory(STbUidStore *pStore) {
} }
void *tsdbUidStoreFree(STbUidStore *pStore) { void *tsdbUidStoreFree(STbUidStore *pStore) {
tsdbUidStoreDestory(pStore); if (pStore) {
taosMemoryFree(pStore); tsdbUidStoreDestory(pStore);
taosMemoryFree(pStore);
}
return NULL; return NULL;
} }
...@@ -1861,7 +1862,7 @@ void *tsdbUidStoreFree(STbUidStore *pStore) { ...@@ -1861,7 +1862,7 @@ void *tsdbUidStoreFree(STbUidStore *pStore) {
* @param uid * @param uid
* @return int32_t * @return int32_t
*/ */
int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) { int32_t tsdbFetchTbUidList(STsdb *pTsdb, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
SSmaEnv *pEnv = REPO_RSMA_ENV((STsdb *)pTsdb); SSmaEnv *pEnv = REPO_RSMA_ENV((STsdb *)pTsdb);
// only applicable to rollup SMA ctables // only applicable to rollup SMA ctables
...@@ -1877,7 +1878,7 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) { ...@@ -1877,7 +1878,7 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) {
} }
// info cached when create rsma stable and return directly for non-rsma ctables // info cached when create rsma stable and return directly for non-rsma ctables
if (!taosHashGet(infoHash, suid, sizeof(tb_uid_t))) { if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1887,7 +1888,7 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) { ...@@ -1887,7 +1888,7 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) {
} }
} }
if (tsdbUidStorePut(*ppStore, *(tb_uid_t *)suid, (tb_uid_t *)uid) != 0) { if (tsdbUidStorePut(*ppStore, suid, &uid) != 0) {
*ppStore = tsdbUidStoreFree(*ppStore); *ppStore = tsdbUidStoreFree(*ppStore);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1935,12 +1936,10 @@ static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid ...@@ -1935,12 +1936,10 @@ static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid
int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) { int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) {
if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) { if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb)); tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb));
tsdbUidStoreFree(pStore);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (tsdbUpdateTbUidListImpl(pTsdb, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) { if (tsdbUpdateTbUidListImpl(pTsdb, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) {
tsdbUidStoreFree(pStore);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1951,15 +1950,11 @@ int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) { ...@@ -1951,15 +1950,11 @@ int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) {
if (tsdbUpdateTbUidListImpl(pTsdb, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) { if (tsdbUpdateTbUidListImpl(pTsdb, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) {
taosHashCancelIterate(pStore->uidHash, pIter); taosHashCancelIterate(pStore->uidHash, pIter);
tsdbUidStoreFree(pStore);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pIter = taosHashIterate(pStore->uidHash, pIter); pIter = taosHashIterate(pStore->uidHash, pIter);
} }
tsdbUidStoreFree(pStore);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1971,8 +1966,6 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { ...@@ -1971,8 +1966,6 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
STSRow *row = NULL; STSRow *row = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1; if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1;
while (true) { while (true) {
......
...@@ -300,13 +300,13 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -300,13 +300,13 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
goto _err; goto _err;
} }
// tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &vCreateTbReq);
if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) { if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
pRsp->code = terrno; pRsp->code = terrno;
goto _err; goto _err;
} }
tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &req);
tCoderClear(&coder); tCoderClear(&coder);
return 0; return 0;
...@@ -323,6 +323,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -323,6 +323,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
SVCreateTbBatchRsp rsp = {0}; SVCreateTbBatchRsp rsp = {0};
SVCreateTbRsp cRsp = {0}; SVCreateTbRsp cRsp = {0};
char tbName[TSDB_TABLE_FNAME_LEN]; char tbName[TSDB_TABLE_FNAME_LEN];
STbUidStore *pStore = NULL;
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
...@@ -361,6 +362,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -361,6 +362,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
cRsp.code = terrno; cRsp.code = terrno;
} else { } else {
cRsp.code = TSDB_CODE_SUCCESS; cRsp.code = TSDB_CODE_SUCCESS;
tsdbFetchTbUidList(pVnode->pTsdb, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
} }
taosArrayPush(rsp.pArray, &cRsp); taosArrayPush(rsp.pArray, &cRsp);
...@@ -368,6 +370,9 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -368,6 +370,9 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
tCoderClear(&coder); tCoderClear(&coder);
tsdbUpdateTbUidList(pVnode->pTsdb, pStore);
tsdbUidStoreFree(pStore);
// prepare rsp // prepare rsp
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret); tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
...@@ -426,7 +431,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -426,7 +431,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
SSubmitRsp rsp = {0}; SSubmitRsp rsp = {0};
pRsp->code = 0; pRsp->code = 0;
tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// handle the request // handle the request
if (tsdbInsertData(pVnode->pTsdb, version, pSubmitReq, &rsp) < 0) { if (tsdbInsertData(pVnode->pTsdb, version, pSubmitReq, &rsp) < 0) {
pRsp->code = terrno; pRsp->code = terrno;
...@@ -435,7 +440,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -435,7 +440,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
// pRsp->msgType = TDMT_VND_SUBMIT_RSP; // pRsp->msgType = TDMT_VND_SUBMIT_RSP;
// vnodeProcessSubmitReq(pVnode, ptr, pRsp); // vnodeProcessSubmitReq(pVnode, ptr, pRsp);
// tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK); // tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// encode the response (TODO) // encode the response (TODO)
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册