提交 fb45d0d9 编写于 作者: C Cary Xu

roll up sma generate

上级 4a8db285
......@@ -57,7 +57,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
* @param type
* @return
*/
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool converted);
/**
* Set multiple input data blocks for the stream scan.
......@@ -67,7 +67,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
* @param type
* @return
*/
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool converted);
/**
* Update the table id list, add or remove.
......
......@@ -453,7 +453,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
}
}
if (pStb->ast2Len > 0) {
int32_t qmsgLen2 = 0;
if (mndConvertRSmaTask(pStb->pAst2, 0, 0, &pRSmaParam->qmsg2, &pRSmaParam->qmsg2Len) != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pRSmaParam->pFuncIds);
taosMemoryFreeClear(pRSmaParam->qmsg1);
......
......@@ -108,6 +108,7 @@ void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList)
int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
int32_t tqReadHandleSetMsgEx(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows);
......
......@@ -51,7 +51,7 @@ static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64()
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg);
int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg, STbDdlHandle *pHandle);
int metaDropTable(SMeta* pMeta, tb_uid_t uid);
int metaCommit(SMeta* pMeta);
int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
......@@ -74,7 +74,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
// SMetaDB
int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlHandle *pHandle);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
......
......@@ -24,8 +24,37 @@
extern "C" {
#endif
typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2);
struct STbDdlHandle {
void *ahandle;
void *result;
__tb_ddl_fn_t fp;
};
typedef struct {
tb_uid_t suid;
SArray *tbUids;
SHashObj *uidHash;
} STbUidStore;
static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
ASSERT(*pStore == NULL);
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
if (*pStore == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t uid);
void *tsdbUidStoreFree(STbUidStore *pStore);
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq);
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, SSubmitReq *pMsg);
int32_t tsdbFetchTbUidList(void *pTsdb, void **result, void *suid, void *uid);
int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pUidStore);
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType);
#ifdef __cplusplus
}
......
......@@ -102,6 +102,8 @@ struct SVnode {
#define TD_VID(PVNODE) (PVNODE)->config.vgId
typedef struct STbDdlHandle STbDdlHandle;
// sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
......
......@@ -248,7 +248,7 @@ void metaCloseDB(SMeta *pMeta) {
}
}
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlHandle *pHandle) {
tb_uid_t uid;
SMetaDB *pMetaDb;
void *pKey;
......@@ -347,6 +347,12 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
if (ret < 0) {
return -1;
}
// child table handle for rsma
if (pHandle && pHandle->fp) {
if (((*pHandle->fp)(pHandle->ahandle, &pHandle->result, &ctbIdxKey.suid, &uid)) < 0) {
return -1;
};
}
} else if (pTbCfg->type == META_NORMAL_TABLE) {
pKey = &uid;
kLen = sizeof(uid);
......
......@@ -15,7 +15,7 @@
#include "vnodeInt.h"
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) {
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg, STbDdlHandle *pHandle) {
// Validate the tbOptions
// if (metaValidateTbCfg(pMeta, pTbCfg) < 0) {
// // TODO: handle error
......@@ -24,7 +24,7 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) {
// TODO: add atomicity
if (metaSaveTableToDB(pMeta, pTbCfg) < 0) {
if (metaSaveTableToDB(pMeta, pTbCfg, pHandle) < 0) {
// TODO: handle error
return -1;
}
......
......@@ -287,7 +287,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pExec->task[workerId];
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
......@@ -450,7 +450,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock = NULL;
......
......@@ -56,6 +56,22 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
return 0;
}
int32_t tqReadHandleSetMsgEx(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg;
// iterate
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
if (pReadHandle->pBlock == NULL) break;
}
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
return 0;
}
bool tqNextDataBlock(STqReadHandle* pHandle) {
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
......
......@@ -101,23 +101,24 @@ typedef struct {
STSma *pSma; // cache schema
} SSmaStatItem;
#define RSMA_MAX_LEVEL 2
#define RSMA_TASK_INFO_HASH_SLOT 8
struct SRSmaInfo {
void *taskInfo[RSMA_MAX_LEVEL]; // qTaskInfo_t
};
struct SSmaStat {
union {
SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem for tsma
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
};
SRSmaInfo rsmaInfo;
T_REF_DECLARE()
};
#define SMA_STAT_ITEMS(s) ((s)->smaStatItems)
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
#define RSMA_MAX_LEVEL 2
#define RSMA_TASK_INFO_HASH_SLOT 8
struct SRSmaInfo {
void *taskInfo[RSMA_MAX_LEVEL]; // qTaskInfo_t
};
static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) {
// Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
......@@ -1120,9 +1121,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit);
int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
// key: skey + groupId
char smaKey[SMA_KEY_LEN] = {0};
char dataBuf[512] = {0};
char smaKey[SMA_KEY_LEN] = {0}; // key: skey + groupId
char dataBuf[512] = {0}; // val: aggr data // TODO: handle 512 buffer?
void *pDataBuf = NULL;
int32_t sz = taosArrayGetSize(pDataBlocks);
for (int32_t i = 0; i < sz; ++i) {
......@@ -1716,6 +1716,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
SRSmaInfo *tRSmaInfo = &pStat->rsmaInfo;
TASSERT(pEnv != NULL && pStat != NULL);
......@@ -1749,6 +1750,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
taosMemoryFree(pReadHandle);
return TSDB_CODE_FAILED;
}
tRSmaInfo->taskInfo[0] = pRSmaInfo->taskInfo[0];
}
if (param->qmsg2) {
......@@ -1758,6 +1760,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
taosMemoryFree(pReadHandle);
return TSDB_CODE_FAILED;
}
tRSmaInfo->taskInfo[1] = pRSmaInfo->taskInfo[1];
}
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
......@@ -1768,6 +1771,151 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
return TSDB_CODE_SUCCESS;
}
int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t uid) {
if ((suid == pStore->suid) || (pStore->suid == 0)) {
if (pStore->tbUids == NULL) {
if ((pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t))) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
pStore->suid = suid;
}
if (taosArrayPush(pStore->tbUids, &uid) == NULL) {
return TSDB_CODE_FAILED;
}
} else {
if (pStore->uidHash == NULL) {
pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (pStore->uidHash == NULL) {
return TSDB_CODE_FAILED;
}
}
SArray *hashVal = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
if (hashVal && ((hashVal = *(SArray **)hashVal) != NULL)) {
taosArrayPush(hashVal, &uid);
} else {
SArray *pItem = taosArrayInit(1, sizeof(tb_uid_t));
if (pItem == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
if (taosArrayPush(pItem, &uid) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pItem, sizeof(pItem)) != 0) {
return TSDB_CODE_FAILED;
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) {
SSmaEnv *pEnv = REPO_RSMA_ENV((STsdb *)pTsdb);
if (!pEnv) {
return TSDB_CODE_SUCCESS;
}
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SHashObj *infoHash = NULL;
if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) {
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
return TSDB_CODE_FAILED;
}
// info cached when create rsma stable
if (!taosHashGet(infoHash, suid, sizeof(suid))) {
return TSDB_CODE_SUCCESS;
}
if (*ppStore == NULL) {
if (tsdbUidStoreInit((STbUidStore **)ppStore) != 0) {
return TSDB_CODE_FAILED;
}
}
if (tsdbUidStorePut(*ppStore, *(tb_uid_t *)suid, *(tb_uid_t *)uid) != 0) {
*ppStore = tsdbUidStoreFree(*ppStore);
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
void *tsdbUidStoreFree(STbUidStore *pStore) {
if (pStore) {
taosArrayDestroy(pStore->tbUids);
if (pStore->uidHash) {
void *pIter = taosHashIterate(pStore->uidHash, NULL);
while (pIter != NULL) {
SArray *arr = *(SArray **)pIter;
taosArrayDestroy(arr);
pIter = taosHashIterate(pStore->uidHash, pIter);
}
}
taosMemoryFree(pStore);
}
return NULL;
}
int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pUidStore) {
if (!pUidStore || (taosArrayGetSize(pUidStore->tbUids) == 0)) {
tsdbDebug("vgId:%d empty uidStore and no need to update", REPO_ID(pTsdb));
return TSDB_CODE_SUCCESS;
}
SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = &pStat->rsmaInfo;
if (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[0], pUidStore->tbUids, true) != 0) {
tsdbUidStoreFree(pUidStore);
tsdbError("vgId:%d update tbUidList failed since %s", REPO_ID(pTsdb), terrstr(terrno));
return TSDB_CODE_FAILED;
}
tsdbUidStoreFree(pUidStore);
return TSDB_CODE_SUCCESS;
}
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType) {
SArray *pResult = NULL;
pResult = taosArrayInit(0, sizeof(SSDataBlock));
if (pResult == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb);
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
TASSERT(pEnv != NULL && pStat != NULL);
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// pRSmaInfo = taosHashGet(pStat->rsmaInfoHash, &pReq->stbCfg.suid, sizeof(tb_uid_t));
// if (pRSmaInfo != NULL) {
// pRSmaInfo = tsdbFreeRSmaInfo(pRSmaInfo);
// }
SRSmaInfo *tRSmaInfo = &pStat->rsmaInfo;
qSetStreamInput(tRSmaInfo->taskInfo[0], pMsg, inputType, true);
while (1) {
SSDataBlock *output;
uint64_t ts;
if (qExecTask(tRSmaInfo->taskInfo[0], &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) {
break;
}
taosArrayPush(pResult, output);
}
}
blockDebugShowData(pResult);
return TSDB_CODE_SUCCESS;
}
#if 0
/**
* @brief Get the start TS key of the last data block of one interval/sliding.
......
......@@ -79,6 +79,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
case TDMT_VND_SUBMIT:
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
vnodeProcessSubmitReq(pVnode, ptr, pRsp);
tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK);
break;
case TDMT_VND_MQ_VG_CHANGE:
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
......@@ -112,7 +113,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
}
} break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO
}
......@@ -210,7 +210,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
SVCreateTbReq vCreateTbReq = {0};
tDeserializeSVCreateTbReq(pReq, &vCreateTbReq);
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq), NULL) < 0) {
// TODO
return -1;
}
......@@ -235,6 +235,13 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray);
STbDdlHandle ddlHandle = {
.ahandle = pVnode->pTsdb,
.result = NULL,
.fp = tsdbFetchTbUidList,
};
for (int i = 0; i < reqNum; i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
......@@ -250,7 +257,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
taosArrayPush(vCreateTbBatchRsp.rspList, &rsp);
}
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
if (metaCreateTable(pVnode->pMeta, pCreateTbReq, &ddlHandle) < 0) {
// TODO: handle error
vError("vgId:%d, failed to create table: %s", TD_VID(pVnode), pCreateTbReq->name);
}
......@@ -286,6 +293,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
pRsp->contLen = contLen;
}
tsdbUpdateTbUidList(pVnode->pTsdb, ddlHandle.result);
return 0;
}
......
......@@ -19,7 +19,7 @@
#include "tdatablock.h"
#include "vnode.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id, bool converted) {
ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
......@@ -32,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
return TSDB_CODE_QRY_APP_ERROR;
}
pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id, converted);
} else {
pOperator->status = OP_NOT_OPENED;
......@@ -46,8 +46,13 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id);
if (converted) {
if (tqReadHandleSetMsgEx(pInfo->readerHandle, input, 0) < 0) {
qError("converted: submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
} else if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
qError("raw msg: submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
} else {
......@@ -67,11 +72,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
}
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) {
return qSetMultiStreamInput(tinfo, input, 1, type);
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool converted) {
return qSetMultiStreamInput(tinfo, input, 1, type, converted);
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool converted) {
if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR;
}
......@@ -82,7 +87,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo), converted);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else {
......
......@@ -110,7 +110,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
return -1;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
qSetStreamInput(exec, input, inputType);
qSetStreamInput(exec, input, inputType, true);
while (1) {
SSDataBlock* output;
uint64_t ts;
......@@ -128,7 +128,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
/*for (int32_t i = 0; i < sz; i++) {*/
/*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/
/*qSetStreamInput(exec, pBlock, inputType);*/
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, true);
while (1) {
SSDataBlock* output;
uint64_t ts;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册