提交 c15bba01 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

...@@ -78,11 +78,14 @@ int32_t create_stream() { ...@@ -78,11 +78,14 @@ int32_t create_stream() {
taos_free_result(pRes); taos_free_result(pRes);
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1"; /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/
pRes = tmq_create_stream(pConn, "stream1", "out1", sql); /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(
pConn,
"create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
......
...@@ -86,9 +86,9 @@ typedef struct taosField { ...@@ -86,9 +86,9 @@ typedef struct taosField {
} TAOS_FIELD; } TAOS_FIELD;
#ifdef WINDOWS #ifdef WINDOWS
#define DLL_EXPORT __declspec(dllexport) #define DLL_EXPORT __declspec(dllexport)
#else #else
#define DLL_EXPORT #define DLL_EXPORT
#endif #endif
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
...@@ -271,10 +271,8 @@ DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); ...@@ -271,10 +271,8 @@ DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0 #if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
#endif
DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql); DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql);
#endif
/* ------------------------------ TMQ END -------------------------------- */ /* ------------------------------ TMQ END -------------------------------- */
#if 1 // Shuduo: temporary enable for app build #if 1 // Shuduo: temporary enable for app build
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
......
...@@ -595,6 +595,34 @@ static FORCE_INLINE int32_t tdSRowSetInfo(SRowBuilder *pBuilder, int32_t nCols, ...@@ -595,6 +595,34 @@ static FORCE_INLINE int32_t tdSRowSetInfo(SRowBuilder *pBuilder, int32_t nCols,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
* @brief
*
* @param pBuilder
* @param nCols
* @param nBoundCols use -1 if not available
* @param flen
* @return FORCE_INLINE
*/
static FORCE_INLINE int32_t tdSRowSetTpInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t flen) {
pBuilder->flen = flen;
pBuilder->nCols = nCols;
if (pBuilder->flen <= 0 || pBuilder->nCols <= 0) {
TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
#ifdef TD_SUPPORT_BITMAP
// the primary TS key is stored separatedly
pBuilder->nBitmaps = (int16_t)TD_BITMAP_BYTES(pBuilder->nCols - 1);
#else
pBuilder->nBitmaps = 0;
pBuilder->nBoundBitmaps = 0;
#endif
return TSDB_CODE_SUCCESS;
}
/** /**
* @brief To judge row type: STpRow/SKvRow * @brief To judge row type: STpRow/SKvRow
* *
...@@ -1385,7 +1413,6 @@ static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char* tag) { ...@@ -1385,7 +1413,6 @@ static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char* tag) {
} }
printf("\n"); printf("\n");
} }
#ifdef TROW_ORIGIN_HZ #ifdef TROW_ORIGIN_HZ
typedef struct { typedef struct {
uint32_t nRows; uint32_t nRows;
......
...@@ -35,19 +35,6 @@ enum { ...@@ -35,19 +35,6 @@ enum {
STREAM_CREATED_BY__SMA, STREAM_CREATED_BY__SMA,
}; };
#if 0
// pipe -> fetch/pipe queue
// merge -> merge queue
// write -> write queue
enum {
TASK_DISPATCH_MSG__SND_PIPE = 1,
TASK_DISPATCH_MSG__SND_MERGE,
TASK_DISPATCH_MSG__VND_PIPE,
TASK_DISPATCH_MSG__VND_MERGE,
TASK_DISPATCH_MSG__VND_WRITE,
};
#endif
typedef struct { typedef struct {
int32_t nodeId; // 0 for snode int32_t nodeId; // 0 for snode
SEpSet epSet; SEpSet epSet;
...@@ -100,10 +87,6 @@ typedef struct { ...@@ -100,10 +87,6 @@ typedef struct {
int8_t reserved; int8_t reserved;
} STaskSinkFetch; } STaskSinkFetch;
typedef struct {
int8_t reserved;
} STaskSinkShow;
enum { enum {
TASK_SOURCE__SCAN = 1, TASK_SOURCE__SCAN = 1,
TASK_SOURCE__PIPE, TASK_SOURCE__PIPE,
...@@ -128,7 +111,6 @@ enum { ...@@ -128,7 +111,6 @@ enum {
TASK_SINK__TABLE, TASK_SINK__TABLE,
TASK_SINK__SMA, TASK_SINK__SMA,
TASK_SINK__FETCH, TASK_SINK__FETCH,
TASK_SINK__SHOW,
}; };
typedef struct { typedef struct {
...@@ -155,7 +137,6 @@ typedef struct { ...@@ -155,7 +137,6 @@ typedef struct {
STaskSinkTb tbSink; STaskSinkTb tbSink;
STaskSinkSma smaSink; STaskSinkSma smaSink;
STaskSinkFetch fetchSink; STaskSinkFetch fetchSink;
STaskSinkShow showSink;
}; };
// dispatch // dispatch
......
...@@ -667,7 +667,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -667,7 +667,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if (code != 0) goto FAIL; if (code != 0) goto FAIL;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
tscDebug("not ready, retry"); tscDebug("consumer not ready, retry");
taosMsleep(500); taosMsleep(500);
} }
...@@ -693,6 +693,7 @@ void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { ...@@ -693,6 +693,7 @@ void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
conf->commitCb = cb; conf->commitCb = cb;
} }
#if 0
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) { TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
STscObj* pTscObj = (STscObj*)taos; STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
...@@ -777,6 +778,7 @@ _return: ...@@ -777,6 +778,7 @@ _return:
return pRequest; return pRequest;
} }
#endif
#if 0 #if 0
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
......
...@@ -418,6 +418,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -418,6 +418,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1; if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
...@@ -464,6 +467,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -464,6 +467,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
......
...@@ -308,6 +308,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -308,6 +308,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.smaId = 0; streamObj.smaId = 0;
/*streamObj.physicalPlan = "";*/ /*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented"; streamObj.logicalPlan = "not implemented";
streamObj.trigger = pCreate->triggerType;
streamObj.waterMark = pCreate->watermark;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
...@@ -431,7 +433,7 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -431,7 +433,7 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pStream); pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
SColumnInfoData *pColInfo; SColumnInfoData *pColInfo;
...@@ -471,8 +473,13 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -471,8 +473,13 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false); colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false);
numOfRows++;
sdbRelease(pSdb, pStream);
} }
return 0;
pShow->numOfRows += numOfRows;
return numOfRows;
} }
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
......
...@@ -159,7 +159,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -159,7 +159,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
// decode // decode
pBuf = pVal; pBuf = pVal;
pSW = taosMemoryMalloc(sizeof(pSW)); pSW = taosMemoryMalloc(sizeof(SSchemaWrapper));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER); tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER);
tDecodeSSchemaWrapper(&coder, pSW); tDecodeSSchemaWrapper(&coder, pSW);
......
...@@ -864,7 +864,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { ...@@ -864,7 +864,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
} }
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryMalloc(sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
......
...@@ -482,7 +482,8 @@ static SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowI ...@@ -482,7 +482,8 @@ static SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowI
// add a new result set for a new group // add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition)); taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
sizeof(SResultRowPosition));
} }
// 2. set the new time window to be the new active time window // 2. set the new time window to be the new active time window
...@@ -1034,8 +1035,8 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo ...@@ -1034,8 +1035,8 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
} }
} }
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
int32_t paramIndex, int32_t numOfRows) { int32_t numOfRows) {
SColumnInfoData* pColInfo = NULL; SColumnInfoData* pColInfo = NULL;
if (pInput->pData[paramIndex] == NULL) { if (pInput->pData[paramIndex] == NULL) {
pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData)); pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
...@@ -1066,9 +1067,9 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc ...@@ -1066,9 +1067,9 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
colDataAppendDouble(pColInfo, i, &v); colDataAppendDouble(pColInfo, i, &v);
} }
} else if (type == TSDB_DATA_TYPE_VARCHAR) { } else if (type == TSDB_DATA_TYPE_VARCHAR) {
char *tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE); char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen); STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
for(int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
colDataAppend(pColInfo, i, tmp, false); colDataAppend(pColInfo, i, tmp, false);
} }
} }
...@@ -1491,8 +1492,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1491,8 +1492,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId; pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset}; pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*) pos->key = pResult->win.skey; *(int64_t*)pos->key = pResult->win.skey;
taosArrayPush(pUpdated, &pos); taosArrayPush(pUpdated, &pos);
} }
...@@ -1569,8 +1570,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1569,8 +1570,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId; pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset}; pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*) pos->key = pResult->win.skey; *(int64_t*)pos->key = pResult->win.skey;
taosArrayPush(pUpdated, &pos); taosArrayPush(pUpdated, &pos);
} }
...@@ -2866,24 +2867,24 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased ...@@ -2866,24 +2867,24 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased
size_t num = taosArrayGetSize(pUpdateList); size_t num = taosArrayGetSize(pUpdateList);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SResKeyPos * pPos = taosArrayGetP(pUpdateList, i); SResKeyPos* pPos = taosArrayGetP(pUpdateList, i);
SFilePage* bufPage = getBufPage(pBuf, pPos->pos.pageId); SFilePage* bufPage = getBufPage(pBuf, pPos->pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset); SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset);
// //
for (int32_t j = 0; j < numOfOutput; ++j) { for (int32_t j = 0; j < numOfOutput; ++j) {
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset);
// //
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo;
// if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { // if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
// continue; // continue;
// } // }
// //
// if (pCtx[j].fpSet.process) { // TODO set the dummy function. // if (pCtx[j].fpSet.process) { // TODO set the dummy function.
//// pCtx[j].fpSet.finalize(&pCtx[j]); //// pCtx[j].fpSet.finalize(&pCtx[j]);
// pResInfo->initialized = true; // pResInfo->initialized = true;
// } // }
// //
if (pRow->numOfRows < pResInfo->numOfRes) { if (pRow->numOfRows < pResInfo->numOfRes) {
pRow->numOfRows = pResInfo->numOfRes; pRow->numOfRows = pResInfo->numOfRes;
} }
...@@ -3005,9 +3006,8 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u ...@@ -3005,9 +3006,8 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u
SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx; SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx;
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
SResultRow* pResultRow = SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId,
doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId), sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
assert(pResultRow != NULL); assert(pResultRow != NULL);
/* /*
...@@ -3105,7 +3105,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased ...@@ -3105,7 +3105,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
} }
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
SResKeyPos *pPos = taosArrayGetP(pGroupResInfo->pRows, i); SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset); SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
...@@ -3751,7 +3751,7 @@ static void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, ...@@ -3751,7 +3751,7 @@ static void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo,
ASSERT(numOfSrcCols >= pBlock->info.numOfCols); ASSERT(numOfSrcCols >= pBlock->info.numOfCols);
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
while(i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
SColumnInfoData* p = taosArrayGet(pCols, i); SColumnInfoData* p = taosArrayGet(pCols, i);
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, j); SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, j);
if (!pmInfo->output) { if (!pmInfo->output) {
...@@ -3777,10 +3777,10 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -3777,10 +3777,10 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
blockDataEnsureCapacity(pRes, numOfRows); blockDataEnsureCapacity(pRes, numOfRows);
if (pColList == NULL) { // data from other sources if (pColList == NULL) { // data from other sources
int32_t dataLen = *(int32_t*) pData; int32_t dataLen = *(int32_t*)pData;
pData += sizeof(int32_t); pData += sizeof(int32_t);
pRes->info.groupId = *(uint64_t*) pData; pRes->info.groupId = *(uint64_t*)pData;
pData += sizeof(uint64_t); pData += sizeof(uint64_t);
int32_t* colLen = (int32_t*)pData; int32_t* colLen = (int32_t*)pData;
...@@ -3810,8 +3810,8 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -3810,8 +3810,8 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
memcpy(pColInfoData->pData, pStart, colLen[i]); memcpy(pColInfoData->pData, pStart, colLen[i]);
} }
//TODO setting this flag to true temporarily so aggregate function on stable will // TODO setting this flag to true temporarily so aggregate function on stable will
//examine NULL value for non-primary key column // examine NULL value for non-primary key column
pColInfoData->hasNull = true; pColInfoData->hasNull = true;
pStart += colLen[i]; pStart += colLen[i];
} }
...@@ -3847,11 +3847,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -3847,11 +3847,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
blockDataEnsureCapacity(&block, numOfRows); blockDataEnsureCapacity(&block, numOfRows);
int32_t dataLen = *(int32_t*) pStart; int32_t dataLen = *(int32_t*)pStart;
uint64_t groupId = *(uint64_t*) (pStart + sizeof(int32_t)); uint64_t groupId = *(uint64_t*)(pStart + sizeof(int32_t));
pStart += sizeof(int32_t) + sizeof(uint64_t); pStart += sizeof(int32_t) + sizeof(uint64_t);
int32_t* colLen = (int32_t*) (pStart); int32_t* colLen = (int32_t*)(pStart);
pStart += sizeof(int32_t) * numOfCols; pStart += sizeof(int32_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
...@@ -4226,8 +4226,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock ...@@ -4226,8 +4226,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
pOperator->numOfOutput = pBlock->info.numOfCols; pOperator->numOfOutput = pBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
NULL, NULL, NULL); destroyExchangeOperatorInfo, NULL, NULL, NULL);
#if 1 #if 1
{ // todo refactor { // todo refactor
...@@ -4716,8 +4716,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR ...@@ -4716,8 +4716,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, pOperator->fpSet =
NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
...@@ -4933,8 +4933,8 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi ...@@ -4933,8 +4933,8 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
initResultRow(resultRow); initResultRow(resultRow);
prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env); prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env);
// pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size; // pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size;
// pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = // pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] =
// (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; // (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
} }
...@@ -5015,7 +5015,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) ...@@ -5015,7 +5015,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
// todo optimize performance // todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case. // they may not belong to the same group the limit/offset value is not valid in this case.
if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 || pProjectInfo->slimit.limit != -1) { if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 ||
pProjectInfo->slimit.limit != -1) {
return PROJECT_RETRIEVE_DONE; return PROJECT_RETRIEVE_DONE;
} else { // not full enough, continue to accumulate the output data in the buffer. } else { // not full enough, continue to accumulate the output data in the buffer.
return PROJECT_RETRIEVE_CONTINUE; return PROJECT_RETRIEVE_CONTINUE;
...@@ -5291,7 +5292,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -5291,7 +5292,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); // initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
...@@ -5342,7 +5343,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup ...@@ -5342,7 +5343,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
// initGroupedResultInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); // initGroupedResultInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
...@@ -5684,8 +5685,8 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -5684,8 +5685,8 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
// pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); // pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
// pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); // pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ || if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ ||
pAggSup->pResultRowHashTable == NULL) { pAggSup->pResultRowHashTable == NULL) {
...@@ -5703,8 +5704,8 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -5703,8 +5704,8 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
static void cleanupAggSup(SAggSupporter* pAggSup) { static void cleanupAggSup(SAggSupporter* pAggSup) {
taosMemoryFreeClear(pAggSup->keyBuf); taosMemoryFreeClear(pAggSup->keyBuf);
taosHashCleanup(pAggSup->pResultRowHashTable); taosHashCleanup(pAggSup->pResultRowHashTable);
// taosHashCleanup(pAggSup->pResultRowListSet); // taosHashCleanup(pAggSup->pResultRowListSet);
// taosArrayDestroy(pAggSup->pResultRowArrayList); // taosArrayDestroy(pAggSup->pResultRowArrayList);
destroyDiskbasedBuf(pAggSup->pResultBuf); destroyDiskbasedBuf(pAggSup->pResultBuf);
} }
...@@ -5715,7 +5716,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf ...@@ -5715,7 +5716,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey); doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey);
for(int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pBasicInfo->pCtx[i].pBuf = pAggSup->pResultBuf; pBasicInfo->pCtx[i].pBuf = pAggSup->pResultBuf;
} }
...@@ -5732,6 +5733,10 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { ...@@ -5732,6 +5733,10 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
} }
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) { static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
if (pTableGroupInfo->numOfTables == 0) {
return NULL;
}
STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pTableQueryInfo == NULL) { if (pTableQueryInfo == NULL) {
return NULL; return NULL;
...@@ -5757,7 +5762,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf ...@@ -5757,7 +5762,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -5771,7 +5777,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -5771,7 +5777,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
int32_t code = int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -5926,8 +5932,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p ...@@ -5926,8 +5932,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = num; pOperator->numOfOutput = num;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
NULL, NULL, NULL); destroyProjectOperatorInfo, NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
...@@ -5954,7 +5960,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -5954,7 +5960,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->order = TSDB_ORDER_ASC; pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
// pInfo->execModel = OPTR_EXEC_MODEL_STREAM; // pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = pTaskInfo->execModel;
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
...@@ -5984,8 +5990,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -5984,8 +5990,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
aggEncodeResultRow, aggDecodeResultRow, NULL); destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -6004,7 +6010,8 @@ _error: ...@@ -6004,7 +6010,8 @@ _error:
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -6042,8 +6049,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -6042,8 +6049,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL,
aggEncodeResultRow, aggDecodeResultRow, NULL); destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -6052,13 +6059,12 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -6052,13 +6059,12 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
return pOperator; return pOperator;
_error: _error:
destroyIntervalOperatorInfo(pInfo, numOfCols); destroyIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
...@@ -6122,8 +6128,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf ...@@ -6122,8 +6128,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL,
aggEncodeResultRow, aggDecodeResultRow, NULL); destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
...@@ -6168,8 +6174,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -6168,8 +6174,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
aggEncodeResultRow, aggDecodeResultRow, NULL); destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -6257,8 +6263,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp ...@@ -6257,8 +6263,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, pOperator->fpSet =
NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
...@@ -6269,7 +6275,6 @@ _error: ...@@ -6269,7 +6275,6 @@ _error:
return NULL; return NULL;
} }
static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) { static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
int32_t j = 0; int32_t j = 0;
...@@ -6503,8 +6508,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6503,8 +6508,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
SInterval interval = extractIntervalInfo(pTableScanNode); SInterval interval = extractIntervalInfo(pTableScanNode);
return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired,
pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions,
&interval, pTableScanNode->ratio, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
...@@ -6512,14 +6518,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6512,14 +6518,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
queryId, taskId);
SArray* tableIdList = extractTableIdList(pTableGroupInfo); SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0; int32_t numOfCols = 0;
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo); SOperatorInfo* pOperator =
createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo);
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
...@@ -6529,10 +6537,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6529,10 +6537,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfOutputCols); SArray* colList =
extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfOutputCols);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo( SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
colList, pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId); pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
return pOperator; return pOperator;
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -6649,7 +6658,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6649,7 +6658,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOptr; return pOptr;
} }
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) { static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
pCond->loadExternalRows = false; pCond->loadExternalRows = false;
...@@ -6664,7 +6672,7 @@ static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableS ...@@ -6664,7 +6672,7 @@ static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableS
pCond->twindow = pTableScanNode->scanRange; pCond->twindow = pTableScanNode->scanRange;
#if 1 #if 1
//todo work around a problem, remove it later // todo work around a problem, remove it later
if ((pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey > pCond->twindow.ekey) || if ((pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey > pCond->twindow.ekey) ||
(pCond->order == TSDB_ORDER_DESC && pCond->twindow.skey < pCond->twindow.ekey)) { (pCond->order == TSDB_ORDER_DESC && pCond->twindow.skey < pCond->twindow.ekey)) {
TSWAP(pCond->twindow.skey, pCond->twindow.ekey); TSWAP(pCond->twindow.skey, pCond->twindow.ekey);
...@@ -6717,7 +6725,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -6717,7 +6725,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
taosArrayPush(pList, &c); taosArrayPush(pList, &c);
} else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) { } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
SValueNode* pValNode = (SValueNode*) pNode->pExpr; SValueNode* pValNode = (SValueNode*)pNode->pExpr;
SColumn c = {0}; SColumn c = {0};
c.slotId = pNode->slotId; c.slotId = pNode->slotId;
c.colId = pNode->slotId; c.colId = pNode->slotId;
...@@ -6925,7 +6933,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -6925,7 +6933,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete; goto _complete;
} }
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo); (*pTaskInfo)->pRoot =
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
code = terrno; code = terrno;
goto _complete; goto _complete;
...@@ -7263,8 +7272,8 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf ...@@ -7263,8 +7272,8 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, pOperator->fpSet =
NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
return pOperator; return pOperator;
......
#!/bin/bash #!/bin/bash
set -e set -e
set -x
python3 ./test.py -f 0-others/taosShell.py python3 ./test.py -f 0-others/taosShell.py
......
...@@ -93,6 +93,7 @@ function runSimCaseOneByOnefq { ...@@ -93,6 +93,7 @@ function runSimCaseOneByOnefq {
if [[ $line =~ ^./test.sh* ]] || [[ $line =~ ^run* ]]; then if [[ $line =~ ^./test.sh* ]] || [[ $line =~ ^run* ]]; then
#case=`echo $line | grep sim$ |awk '{print $NF}'` #case=`echo $line | grep sim$ |awk '{print $NF}'`
case=`echo $line | grep -o ".*\.sim" |awk '{print $NF}'` case=`echo $line | grep -o ".*\.sim" |awk '{print $NF}'`
echo "$line running ..."
start_time=`date +%s` start_time=`date +%s`
date +%F\ %T | tee -a out.log date +%F\ %T | tee -a out.log
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册