提交 771e83fd 编写于 作者: C Cary Xu

feat: rollup refactor

上级 a558b445
......@@ -1520,8 +1520,8 @@ typedef struct {
char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2
} SRSmaParam;
int tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam);
int tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam);
int32_t tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam);
int32_t tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam);
typedef struct SVCreateStbReq {
const char* name;
......
......@@ -214,6 +214,14 @@ STSRow *tdRowDup(STSRow *row);
static FORCE_INLINE SKvRowIdx *tdKvRowColIdxAt(STSRow *pRow, col_id_t idx) {
return (SKvRowIdx *)TD_ROW_COL_IDX(pRow) + idx;
}
static FORCE_INLINE int16_t tdKvRowColIdAt(STSRow *pRow, col_id_t idx) {
ASSERT(idx >= 0);
if (idx == 0) {
return PRIMARYKEY_TIMESTAMP_COL_ID;
}
return ((SKvRowIdx *)TD_ROW_COL_IDX(pRow) + idx - 1)->colId;
}
static FORCE_INLINE void *tdKVRowColVal(STSRow *pRow, SKvRowIdx *pIdx) { return POINTER_SHIFT(pRow, pIdx->offset); }
#define TD_ROW_OFFSET(p) ((p)->toffset); // During ParseInsert when without STSchema, how to get the offset for STpRow?
......
......@@ -3616,6 +3616,43 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosMemoryFreeClear(pReq->ast);
}
int32_t tEncodeSRSmaParam(SCoder *pCoder, const SRSmaParam *pRSmaParam) {
if (tEncodeFloat(pCoder, pRSmaParam->xFilesFactor) < 0) return -1;
if (tEncodeI32v(pCoder, pRSmaParam->delay) < 0) return -1;
if (tEncodeI32v(pCoder, pRSmaParam->qmsg1Len) < 0) return -1;
if (tEncodeI32v(pCoder, pRSmaParam->qmsg2Len) < 0) return -1;
if (pRSmaParam->qmsg1Len > 0) {
if (tEncodeBinary(pCoder, pRSmaParam->qmsg1, (uint64_t)pRSmaParam->qmsg1Len) < 0) // qmsg1Len contains len of '\0'
return -1;
}
if (pRSmaParam->qmsg2Len > 0) {
if (tEncodeBinary(pCoder, pRSmaParam->qmsg2, (uint64_t)pRSmaParam->qmsg2Len) < 0) // qmsg2Len contains len of '\0'
return -1;
}
return 0;
}
int32_t tDecodeSRSmaParam(SCoder *pCoder, SRSmaParam *pRSmaParam) {
if (tDecodeFloat(pCoder, &pRSmaParam->xFilesFactor) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->delay) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->qmsg1Len) < 0) return -1;
if (tDecodeI32v(pCoder, &pRSmaParam->qmsg2Len) < 0) return -1;
if (pRSmaParam->qmsg1Len > 0) {
uint64_t len;
if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg1, &len) < 0) return -1; // qmsg1Len contains len of '\0'
} else {
pRSmaParam->qmsg1 = NULL;
}
if (pRSmaParam->qmsg2Len > 0) {
uint64_t len;
if (tDecodeBinaryAlloc(pCoder, (void **)&pRSmaParam->qmsg2, &len) < 0) return -1; // qmsg2Len contains len of '\0'
} else {
pRSmaParam->qmsg2 = NULL;
}
return 0;
}
int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
......@@ -3624,9 +3661,9 @@ int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) {
if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
// if (pReq->rollup) {
// if (tEncodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1;
// }
if (pReq->rollup) {
if (tEncodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
......@@ -3640,9 +3677,9 @@ int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) {
if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
// if (pReq->rollup) {
// if (tDecodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1;
// }
if (pReq->rollup) {
if (tDecodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
}
tEndDecode(pCoder);
return 0;
......
......@@ -220,7 +220,7 @@ static uint8_t tdGetMergedBitmapByte(uint8_t byte) {
}
/**
* @brief Merge bitmap from 2 bits to 1 bits, and the memory buffer should be guaranteed by the invoker.
* @brief Merge bitmap from 2 bits to 1 bit, and the memory buffer should be guaranteed by the invoker.
*
* @param srcBitmap
* @param nBits
......
......@@ -22,13 +22,13 @@
extern "C" {
#endif
typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2);
// typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2);
struct STbDdlH {
void *ahandle;
void *result;
__tb_ddl_fn_t fp;
};
// struct STbDdlH {
// void *ahandle;
// void *result;
// __tb_ddl_fn_t fp;
// };
static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
ASSERT(*pStore == NULL);
......@@ -40,14 +40,6 @@ static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
return TSDB_CODE_SUCCESS;
}
int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
void tsdbUidStoreDestory(STbUidStore *pStore);
void *tsdbUidStoreFree(STbUidStore *pStore);
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq);
int32_t tsdbFetchTbUidList(void *pTsdb, void **result, void *suid, void *uid);
int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pUidStore);
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputType);
#ifdef __cplusplus
}
......
......@@ -108,6 +108,15 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
// sma
int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq);
int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
void tsdbUidStoreDestory(STbUidStore* pStore);
void* tsdbUidStoreFree(STbUidStore* pStore);
int32_t tsdbTriggerRSma(STsdb* pTsdb, SMeta* pMeta, void* pMsg, int32_t inputType);
typedef struct {
int8_t streamType; // sma or other
int8_t dstType;
......@@ -163,7 +172,7 @@ struct STbUidStore {
#define TD_VID(PVNODE) (PVNODE)->config.vgId
typedef struct STbDdlH STbDdlH;
// typedef struct STbDdlH STbDdlH;
// sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
......
......@@ -1519,8 +1519,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
} else if (isRow1DataRow) {
colIdOfRow1 = pSchema1->columns[j].colId;
} else {
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row1, j);
colIdOfRow1 = pColIdx->colId;
colIdOfRow1 = tdKvRowColIdAt(row1, j);
}
int32_t colIdOfRow2;
......@@ -1529,8 +1528,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
} else if (isRow2DataRow) {
colIdOfRow2 = pSchema2->columns[k].colId;
} else {
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row2, k);
colIdOfRow2 = pColIdx->colId;
colIdOfRow2 = tdKvRowColIdAt(row2, j);
}
if (colIdOfRow1 == colIdOfRow2) {
......
......@@ -173,6 +173,7 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[])
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg);
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg);
static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids);
// mgmt interface
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);
......@@ -1692,18 +1693,16 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
* @param pReq
* @return int32_t
*/
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
#if 0
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
if (!param) {
tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name,
pReq->stbCfg.suid);
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) {
if (!pReq->rollup) {
tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS;
}
SRSmaParam *param = &pReq->pRSmaParam;
if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid);
tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS;
}
......@@ -1716,9 +1715,9 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t));
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
if (pRSmaInfo) {
tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid);
tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS;
}
......@@ -1758,14 +1757,13 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
}
}
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
} else {
tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->stbCfg.suid);
tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->suid);
}
#endif
return TSDB_CODE_SUCCESS;
}
......@@ -1777,7 +1775,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
* @param uid
* @return int32_t
*/
int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
static int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
// prefer to store suid/uids in array
if ((suid == pStore->suid) || (pStore->suid == 0)) {
if (pStore->suid == 0) {
......@@ -1833,6 +1831,7 @@ void tsdbUidStoreDestory(STbUidStore *pStore) {
if (pStore) {
if (pStore->uidHash) {
if (pStore->tbUids) {
// When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
void *pIter = taosHashIterate(pStore->uidHash, NULL);
while (pIter) {
SArray *arr = *(SArray **)pIter;
......@@ -1847,8 +1846,10 @@ void tsdbUidStoreDestory(STbUidStore *pStore) {
}
void *tsdbUidStoreFree(STbUidStore *pStore) {
tsdbUidStoreDestory(pStore);
taosMemoryFree(pStore);
if (pStore) {
tsdbUidStoreDestory(pStore);
taosMemoryFree(pStore);
}
return NULL;
}
......@@ -1861,7 +1862,7 @@ void *tsdbUidStoreFree(STbUidStore *pStore) {
* @param uid
* @return int32_t
*/
int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) {
int32_t tsdbFetchTbUidList(STsdb *pTsdb, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
SSmaEnv *pEnv = REPO_RSMA_ENV((STsdb *)pTsdb);
// only applicable to rollup SMA ctables
......@@ -1877,7 +1878,7 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) {
}
// info cached when create rsma stable and return directly for non-rsma ctables
if (!taosHashGet(infoHash, suid, sizeof(tb_uid_t))) {
if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) {
return TSDB_CODE_SUCCESS;
}
......@@ -1887,7 +1888,7 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) {
}
}
if (tsdbUidStorePut(*ppStore, *(tb_uid_t *)suid, (tb_uid_t *)uid) != 0) {
if (tsdbUidStorePut(*ppStore, suid, &uid) != 0) {
*ppStore = tsdbUidStoreFree(*ppStore);
return TSDB_CODE_FAILED;
}
......@@ -1935,12 +1936,10 @@ static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid
int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) {
if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb));
tsdbUidStoreFree(pStore);
return TSDB_CODE_SUCCESS;
}
if (tsdbUpdateTbUidListImpl(pTsdb, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) {
tsdbUidStoreFree(pStore);
return TSDB_CODE_FAILED;
}
......@@ -1951,15 +1950,11 @@ int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) {
if (tsdbUpdateTbUidListImpl(pTsdb, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) {
taosHashCancelIterate(pStore->uidHash, pIter);
tsdbUidStoreFree(pStore);
return TSDB_CODE_FAILED;
}
pIter = taosHashIterate(pStore->uidHash, pIter);
}
tsdbUidStoreFree(pStore);
return TSDB_CODE_SUCCESS;
}
......@@ -1971,8 +1966,6 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
STSRow *row = NULL;
terrno = TSDB_CODE_SUCCESS;
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1;
while (true) {
......
......@@ -300,13 +300,13 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
goto _err;
}
// tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &vCreateTbReq);
if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
pRsp->code = terrno;
goto _err;
}
tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &req);
tCoderClear(&coder);
return 0;
......@@ -323,6 +323,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
SVCreateTbBatchRsp rsp = {0};
SVCreateTbRsp cRsp = {0};
char tbName[TSDB_TABLE_FNAME_LEN];
STbUidStore *pStore = NULL;
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
......@@ -361,6 +362,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
cRsp.code = terrno;
} else {
cRsp.code = TSDB_CODE_SUCCESS;
tsdbFetchTbUidList(pVnode->pTsdb, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
}
taosArrayPush(rsp.pArray, &cRsp);
......@@ -368,6 +370,9 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
tCoderClear(&coder);
tsdbUpdateTbUidList(pVnode->pTsdb, pStore);
tsdbUidStoreFree(pStore);
// prepare rsp
tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen);
pRsp->pCont = rpcMallocCont(pRsp->contLen);
......@@ -425,7 +430,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
SSubmitRsp rsp = {0};
pRsp->code = 0;
tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// handle the request
if (tsdbInsertData(pVnode->pTsdb, version, pSubmitReq, &rsp) < 0) {
pRsp->code = terrno;
......@@ -434,7 +439,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
// pRsp->msgType = TDMT_VND_SUBMIT_RSP;
// vnodeProcessSubmitReq(pVnode, ptr, pRsp);
// tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// encode the response (TODO)
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册