提交 45b88c1a 编写于 作者: C Cary Xu

feat: build submit blk by groupid in rsma result

上级 675fe706
...@@ -230,7 +230,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); ...@@ -230,7 +230,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockDebugShowData(const SArray* dataBlocks); void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid); tb_uid_t suid);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId); const char* stbFullName, int32_t vgId);
...@@ -299,4 +299,3 @@ static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* da ...@@ -299,4 +299,3 @@ static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* da
#endif #endif
#endif /*_TD_COMMON_EP_H_*/ #endif /*_TD_COMMON_EP_H_*/
...@@ -1508,14 +1508,11 @@ void blockDebugShowData(const SArray* dataBlocks) { ...@@ -1508,14 +1508,11 @@ void blockDebugShowData(const SArray* dataBlocks) {
* @param pReq * @param pReq
* @param pDataBlocks * @param pDataBlocks
* @param vgId * @param vgId
* @param uid set as parameter temporarily // TODO: remove this parameter, and the executor should set uid in
* SDataBlock->info.uid
* @param suid // TODO: check with Liao whether suid response is reasonable * @param suid // TODO: check with Liao whether suid response is reasonable
* *
* TODO: colId should be set * TODO: colId should be set
*/ */
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) {
tb_uid_t uid, tb_uid_t suid) {
int32_t sz = taosArrayGetSize(pDataBlocks); int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq); int32_t bufSize = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
...@@ -1551,7 +1548,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1551,7 +1548,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen); SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen);
pSubmitBlk->suid = suid; pSubmitBlk->suid = suid;
pSubmitBlk->uid = uid; pSubmitBlk->uid = pDataBlock->info.groupId;
pSubmitBlk->numOfRows = rows; pSubmitBlk->numOfRows = rows;
++numOfBlks; ++numOfBlks;
...@@ -1562,6 +1559,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1562,6 +1559,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
printf("|"); printf("|");
bool isStartKey = false; bool isStartKey = false;
int32_t offset = 0;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column for (int32_t k = 0; k < colNum; ++k) { // iterate by column
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
...@@ -1570,18 +1568,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1570,18 +1568,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
if (!isStartKey) { if (!isStartKey) {
isStartKey = true; isStartKey = true;
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
0, 0); offset, k);
} else { } else {
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 8, k); tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, offset, k);
break;
} }
break; break;
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, 8, k); tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, offset, k);
break; break;
} }
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, 8, k); tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, offset, k);
break; break;
} }
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
...@@ -1593,13 +1591,14 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1593,13 +1591,14 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
break; break;
default: default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
tdAppendColValToRow(&rb, 2, pColInfoData->info.type, TD_VTYPE_NORM, var, true, 8, k); tdAppendColValToRow(&rb, 2, pColInfoData->info.type, TD_VTYPE_NORM, var, true, offset, k);
} else { } else {
printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
TASSERT(0); TASSERT(0);
} }
break; break;
} }
offset += TYPE_BYTES[pColInfoData->info.type];
} }
dataLen += TD_ROW_LEN(rb.pBuf); dataLen += TD_ROW_LEN(rb.pBuf);
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static FORCE_INLINE int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo, static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo,
STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level); STSchema *pTSchema, tb_uid_t suid, int8_t level);
struct SRSmaInfo { struct SRSmaInfo {
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
...@@ -364,7 +364,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { ...@@ -364,7 +364,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
} }
static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo, static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, qTaskInfo_t *taskInfo,
STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level) { STSchema *pTSchema, tb_uid_t suid, int8_t level) {
SArray *pResult = NULL; SArray *pResult = NULL;
if (!taskInfo) { if (!taskInfo) {
...@@ -399,7 +399,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 ...@@ -399,7 +399,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
blockDebugShowData(pResult); blockDebugShowData(pResult);
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), uid, suid) != 0) { if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) != 0) {
taosArrayDestroy(pResult); taosArrayDestroy(pResult);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -418,15 +418,13 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 ...@@ -418,15 +418,13 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid, tb_uid_t uid) { static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) { if (!pEnv) {
// only applicable when rsma env exists // only applicable when rsma env exists
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
ASSERT(uid != 0); // TODO: remove later
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
...@@ -448,8 +446,8 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb ...@@ -448,8 +446,8 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1); tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, TSDB_RETENTION_L1);
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2); tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, TSDB_RETENTION_L2);
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
} }
...@@ -468,12 +466,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { ...@@ -468,12 +466,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
tdFetchSubmitReqSuids(pMsg, &uidStore); tdFetchSubmitReqSuids(pMsg, &uidStore);
if (uidStore.suid != 0) { if (uidStore.suid != 0) {
tdExecuteRSma(pSma, pMsg, inputType, uidStore.suid, uidStore.uid); tdExecuteRSma(pSma, pMsg, inputType, uidStore.suid);
void *pIter = taosHashIterate(uidStore.uidHash, NULL); void *pIter = taosHashIterate(uidStore.uidHash, NULL);
while (pIter) { while (pIter) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
tdExecuteRSma(pSma, pMsg, inputType, *pTbSuid, 0); tdExecuteRSma(pSma, pMsg, inputType, *pTbSuid);
pIter = taosHashIterate(uidStore.uidHash, pIter); pIter = taosHashIterate(uidStore.uidHash, pIter);
} }
......
...@@ -2040,7 +2040,7 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, ...@@ -2040,7 +2040,7 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
blockDebugShowData(pResult); blockDebugShowData(pResult);
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pTsdb->pVnode->pRSma1 : pTsdb->pVnode->pRSma2); STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pTsdb->pVnode->pRSma1 : pTsdb->pVnode->pRSma2);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode), uid, suid) != 0) { if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode), suid) != 0) {
taosArrayDestroy(pResult); taosArrayDestroy(pResult);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -2083,7 +2083,7 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType ...@@ -2083,7 +2083,7 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType
} }
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache // TODO: use the proper schema instead of 1, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 1); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 1);
if (!pTSchema) { if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
......
...@@ -117,7 +117,7 @@ ...@@ -117,7 +117,7 @@
#./test.sh -f tsim/mnode/basic1.sim -m #./test.sh -f tsim/mnode/basic1.sim -m
# --- sma # --- sma
./test.sh -f tsim/sma/tsmaCreateInsertData.sim #./test.sh -f tsim/sma/tsmaCreateInsertData.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
# --- valgrind # --- valgrind
......
...@@ -37,5 +37,12 @@ print =============== trigger stream to execute sma aggr task and insert sma dat ...@@ -37,5 +37,12 @@ print =============== trigger stream to execute sma aggr task and insert sma dat
sql insert into ct1 values(now+5s, 20, 20.0, 30.0) sql insert into ct1 values(now+5s, 20, 20.0, 30.0)
#=================================================================== #===================================================================
print =============== select * from ct1 from memory
sql select * from ct1;
print $data00 $data01
if $rows != 5 then
print rows $rows != 5
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册