diff --git a/include/common/tcommon.h b/include/common/tcommon.h index fb59d8b9a037a4be39b93137fea989c1d6db43df..a071516fbfc2e647e78ae00c57842ac92253dea6 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -205,7 +205,7 @@ typedef struct SColumn { int16_t slotId; char name[TSDB_COL_NAME_LEN]; - int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string) + int16_t colType; // column type: normal column, tag, or window column int16_t type; int32_t bytes; uint8_t precision; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index cec6f1a6919ab66ad3928254d47a0943f60936b5..3a1eaf289e4ba245544b985e893f746845c37c88 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -57,7 +57,9 @@ typedef enum EColumnType { COLUMN_TYPE_COLUMN = 1, COLUMN_TYPE_TAG, COLUMN_TYPE_TBNAME, - COLUMN_TYPE_WINDOW_PC, + COLUMN_TYPE_WINDOW_START, + COLUMN_TYPE_WINDOW_END, + COLUMN_TYPE_WINDOW_DURATION, COLUMN_TYPE_GROUP_KEY } EColumnType; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 197d94dcf48a196aec7acd01eb0cd88557cadb1c..b89579a0177caa798565e5a90772e71137b2c1f0 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -284,17 +284,17 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, return TSDB_CODE_SUCCESS; } -typedef struct tagFilterAssist{ - SHashObj *colHash; +typedef struct tagFilterAssist { + SHashObj* colHash; int32_t index; - SArray *cInfoList; -}tagFilterAssist; + SArray* cInfoList; +} tagFilterAssist; static EDealRes getColumn(SNode** pNode, void* pContext) { SColumnNode* pSColumnNode = NULL; if (QUERY_NODE_COLUMN == nodeType((*pNode))) { pSColumnNode = *(SColumnNode**)pNode; - }else if(QUERY_NODE_FUNCTION == nodeType((*pNode))){ + } else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) { SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode); if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) { pSColumnNode = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); @@ -307,24 +307,26 @@ static EDealRes getColumn(SNode** pNode, void* pContext) { pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE; nodesDestroyNode(*pNode); *pNode = (SNode*)pSColumnNode; - }else{ + } else { return DEAL_RES_CONTINUE; } - }else{ + } else { return DEAL_RES_CONTINUE; } - tagFilterAssist *pData = (tagFilterAssist *)pContext; - void *data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId)); - if(!data){ + tagFilterAssist* pData = (tagFilterAssist*)pContext; + void* data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId)); + if (!data) { taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode))); pSColumnNode->slotId = pData->index++; - SColumnInfo cInfo = {.colId = pSColumnNode->colId, .type = pSColumnNode->node.resType.type, .bytes = pSColumnNode->node.resType.bytes}; + SColumnInfo cInfo = {.colId = pSColumnNode->colId, + .type = pSColumnNode->node.resType.type, + .bytes = pSColumnNode->node.resType.bytes}; #if TAG_FILTER_DEBUG qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type); #endif taosArrayPush(pData->cInfoList, &cInfo); - }else{ + } else { SColumnNode* col = *(SColumnNode**)data; pSColumnNode->slotId = col->slotId; } @@ -339,9 +341,9 @@ static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarPara return terrno; } - pColumnData->info.type = pType->type; - pColumnData->info.bytes = pType->bytes; - pColumnData->info.scale = pType->scale; + pColumnData->info.type = pType->type; + pColumnData->info.bytes = pType->bytes; + pColumnData->info.scale = pType->scale; pColumnData->info.precision = pType->precision; int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows); @@ -356,27 +358,27 @@ static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarPara return TSDB_CODE_SUCCESS; } -static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray* uidList, SNode* pTagCond){ - int32_t code = TSDB_CODE_SUCCESS; - SArray* pBlockList = NULL; +static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray* uidList, SNode* pTagCond) { + int32_t code = TSDB_CODE_SUCCESS; + SArray* pBlockList = NULL; SSDataBlock* pResBlock = NULL; - SHashObj * tags = NULL; + SHashObj* tags = NULL; SScalarParam output = {0}; tagFilterAssist ctx = {0}; ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); - if(ctx.colHash == NULL){ + if (ctx.colHash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto end; } ctx.index = 0; ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); - if(ctx.cInfoList == NULL){ + if (ctx.cInfoList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto end; } - nodesRewriteExprPostOrder(&pTagCond, getColumn, (void *)&ctx); + nodesRewriteExprPostOrder(&pTagCond, getColumn, (void*)&ctx); pResBlock = createDataBlock(); if (pResBlock == NULL) { @@ -390,7 +392,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray blockDataAppendColInfo(pResBlock, &colInfo); } -// int64_t stt = taosGetTimestampUs(); + // int64_t stt = taosGetTimestampUs(); tags = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); code = metaGetTableTags(metaHandle, suid, uidList, tags); if (code != TSDB_CODE_SUCCESS) { @@ -400,11 +402,11 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray } int32_t rows = taosArrayGetSize(uidList); - if(rows == 0){ + if (rows == 0) { goto end; } -// int64_t stt1 = taosGetTimestampUs(); -// qDebug("generate tag meta rows:%d, cost:%ld us", rows, stt1-stt); + // int64_t stt1 = taosGetTimestampUs(); + // qDebug("generate tag meta rows:%d, cost:%ld us", rows, stt1-stt); code = blockDataEnsureCapacity(pResBlock, rows); if (code != TSDB_CODE_SUCCESS) { @@ -412,46 +414,46 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray goto end; } -// int64_t st = taosGetTimestampUs(); + // int64_t st = taosGetTimestampUs(); for (int32_t i = 0; i < rows; i++) { int64_t* uid = taosArrayGet(uidList, i); - for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){ + for (int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++) { SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j); - if(pColInfo->info.colId == -1){ // tbname + if (pColInfo->info.colId == -1) { // tbname char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; metaGetTableNameByUid(metaHandle, *uid, str); colDataAppend(pColInfo, i, str, false); #if TAG_FILTER_DEBUG - qDebug("tagfilter uid:%ld, tbname:%s", *uid, str+2); + qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2); #endif - }else{ + } else { void* tag = taosHashGet(tags, uid, sizeof(int64_t)); ASSERT(tag); STagVal tagVal = {0}; tagVal.cid = pColInfo->info.colId; const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal); - if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)){ + if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) { colDataAppend(pColInfo, i, p, true); } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { colDataAppend(pColInfo, i, p, false); } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { - char *tmp = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1); + char* tmp = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1); varDataSetLen(tmp, tagVal.nData); memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData); colDataAppend(pColInfo, i, tmp, false); #if TAG_FILTER_DEBUG - qDebug("tagfilter varch:%s", tmp+2); + qDebug("tagfilter varch:%s", tmp + 2); #endif taosMemoryFree(tmp); } else { colDataAppend(pColInfo, i, (const char*)&tagVal.i64, false); #if TAG_FILTER_DEBUG - if(pColInfo->info.type == TSDB_DATA_TYPE_INT){ + if (pColInfo->info.type == TSDB_DATA_TYPE_INT) { qDebug("tagfilter int:%d", *(int*)(&tagVal.i64)); - }else if(pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE){ - qDebug("tagfilter double:%f", *(double *)(&tagVal.i64)); + } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) { + qDebug("tagfilter double:%f", *(double*)(&tagVal.i64)); } #endif } @@ -460,8 +462,8 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray } pResBlock->info.rows = rows; -// int64_t st1 = taosGetTimestampUs(); -// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st); + // int64_t st1 = taosGetTimestampUs(); + // qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st); pBlockList = taosArrayInit(2, POINTER_BYTES); taosArrayPush(pBlockList, &pResBlock); @@ -475,13 +477,13 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray } code = scalarCalculate(pTagCond, pBlockList, &output); - if(code != TSDB_CODE_SUCCESS){ + if (code != TSDB_CODE_SUCCESS) { qError("failed to calculate scalar, reason:%s", tstrerror(code)); terrno = code; goto end; } -// int64_t st2 = taosGetTimestampUs(); -// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1); + // int64_t st2 = taosGetTimestampUs(); + // qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1); end: taosHashCleanup(tags); @@ -493,43 +495,43 @@ end: } static void releaseColInfoData(void* pCol) { - if(pCol){ - SColumnInfoData* col = (SColumnInfoData*) pCol; + if (pCol) { + SColumnInfoData* col = (SColumnInfoData*)pCol; colDataDestroy(col); taosMemoryFree(col); } } -int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo){ - int32_t code = TSDB_CODE_SUCCESS; - SArray *pBlockList = NULL; - SSDataBlock *pResBlock = NULL; - SHashObj *tags = NULL; - SArray *uidList = NULL; - void *keyBuf = NULL; - SArray *groupData = NULL; +int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo) { + int32_t code = TSDB_CODE_SUCCESS; + SArray* pBlockList = NULL; + SSDataBlock* pResBlock = NULL; + SHashObj* tags = NULL; + SArray* uidList = NULL; + void* keyBuf = NULL; + SArray* groupData = NULL; int32_t rows = taosArrayGetSize(pTableListInfo->pTableList); - if(rows == 0){ + if (rows == 0) { return TDB_CODE_SUCCESS; } tagFilterAssist ctx = {0}; ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); - if(ctx.colHash == NULL){ + if (ctx.colHash == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; } ctx.index = 0; ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); - if(ctx.cInfoList == NULL){ + if (ctx.cInfoList == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - SNode* pNode = NULL; + SNode* pNode = NULL; FOREACH(pNode, group) { - nodesRewriteExprPostOrder(&pNode, getColumn, (void *)&ctx); + nodesRewriteExprPostOrder(&pNode, getColumn, (void*)&ctx); REPLACE_NODE(pNode); } @@ -551,61 +553,61 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis taosArrayPush(uidList, &pkeyInfo->uid); } -// int64_t stt = taosGetTimestampUs(); + // int64_t stt = taosGetTimestampUs(); tags = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); code = metaGetTableTags(metaHandle, pTableListInfo->suid, uidList, tags); if (code != TSDB_CODE_SUCCESS) { goto end; } -// int64_t stt1 = taosGetTimestampUs(); -// qDebug("generate tag meta rows:%d, cost:%ld us", rows, stt1-stt); + // int64_t stt1 = taosGetTimestampUs(); + // qDebug("generate tag meta rows:%d, cost:%ld us", rows, stt1-stt); code = blockDataEnsureCapacity(pResBlock, rows); if (code != TSDB_CODE_SUCCESS) { goto end; } -// int64_t st = taosGetTimestampUs(); + // int64_t st = taosGetTimestampUs(); for (int32_t i = 0; i < rows; i++) { int64_t* uid = taosArrayGet(uidList, i); - for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){ + for (int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++) { SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j); - if(pColInfo->info.colId == -1){ // tbname + if (pColInfo->info.colId == -1) { // tbname char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; metaGetTableNameByUid(metaHandle, *uid, str); colDataAppend(pColInfo, i, str, false); #if TAG_FILTER_DEBUG - qDebug("tagfilter uid:%ld, tbname:%s", *uid, str+2); + qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2); #endif - }else{ + } else { void* tag = taosHashGet(tags, uid, sizeof(int64_t)); ASSERT(tag); STagVal tagVal = {0}; tagVal.cid = pColInfo->info.colId; const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal); - if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)){ + if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) { colDataAppend(pColInfo, i, p, true); } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { colDataAppend(pColInfo, i, p, false); } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { - char *tmp = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1); + char* tmp = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1); varDataSetLen(tmp, tagVal.nData); memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData); colDataAppend(pColInfo, i, tmp, false); #if TAG_FILTER_DEBUG - qDebug("tagfilter varch:%s", tmp+2); + qDebug("tagfilter varch:%s", tmp + 2); #endif taosMemoryFree(tmp); } else { colDataAppend(pColInfo, i, (const char*)&tagVal.i64, false); #if TAG_FILTER_DEBUG - if(pColInfo->info.type == TSDB_DATA_TYPE_INT){ + if (pColInfo->info.type == TSDB_DATA_TYPE_INT) { qDebug("tagfilter int:%d", *(int*)(&tagVal.i64)); - }else if(pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE){ - qDebug("tagfilter double:%f", *(double *)(&tagVal.i64)); + } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) { + qDebug("tagfilter double:%f", *(double*)(&tagVal.i64)); } #endif } @@ -614,8 +616,8 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } pResBlock->info.rows = rows; -// int64_t st1 = taosGetTimestampUs(); -// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st); + // int64_t st1 = taosGetTimestampUs(); + // qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st); pBlockList = taosArrayInit(2, POINTER_BYTES); taosArrayPush(pBlockList, &pResBlock); @@ -629,7 +631,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis break; case QUERY_NODE_COLUMN: case QUERY_NODE_OPERATOR: - case QUERY_NODE_FUNCTION:{ + case QUERY_NODE_FUNCTION: { SExprNode* expNode = (SExprNode*)pNode; code = createResultData(&expNode->resType, rows, &output); if (code != TSDB_CODE_SUCCESS) { @@ -641,16 +643,16 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis code = TSDB_CODE_OPS_NOT_SUPPORT; goto end; } - if(nodeType(pNode) == QUERY_NODE_COLUMN){ - SColumnNode* pSColumnNode = (SColumnNode*)pNode; + if (nodeType(pNode) == QUERY_NODE_COLUMN) { + SColumnNode* pSColumnNode = (SColumnNode*)pNode; SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId); code = colDataAssign(output.columnData, pColInfo, rows, NULL); - }else if(nodeType(pNode) == QUERY_NODE_VALUE){ + } else if (nodeType(pNode) == QUERY_NODE_VALUE) { continue; - }else{ + } else { code = scalarCalculate(pNode, pBlockList, &output); } - if(code != TSDB_CODE_SUCCESS){ + if (code != TSDB_CODE_SUCCESS) { releaseColInfoData(output.columnData); goto end; } @@ -658,7 +660,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } int32_t keyLen = 0; - SNode* node; + SNode* node; FOREACH(node, group) { SExprNode* pExpr = (SExprNode*)node; keyLen += pExpr->resType.bytes; @@ -672,12 +674,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - for(int i = 0; i < rows; i++){ + for (int i = 0; i < rows; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); char* isNull = (char*)keyBuf; char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group); - for(int j = 0; j < taosArrayGetSize(groupData); j++){ + for (int j = 0; j < taosArrayGetSize(groupData); j++) { SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j); if (colDataIsNull_s(pValue, i)) { @@ -690,7 +692,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; goto end; } - if(tTagIsJsonNull(data)){ + if (tTagIsJsonNull(data)) { isNull[j] = 1; continue; } @@ -712,10 +714,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); } -// int64_t st2 = taosGetTimestampUs(); -// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1); + // int64_t st2 = taosGetTimestampUs(); + // qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1); - end: +end: taosMemoryFreeClear(keyBuf); taosHashCleanup(tags); taosHashCleanup(ctx.colHash); @@ -745,7 +747,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SIndexMetaArg metaArg = { .metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid}; -// int64_t stt = taosGetTimestampUs(); + // int64_t stt = taosGetTimestampUs(); SIdxFltStatus status = SFLT_NOT_INDEX; code = doFilterTag(pTagIndexCond, &metaArg, res, &status); if (code != 0 || status == SFLT_NOT_INDEX) { @@ -753,13 +755,13 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, code = TDB_CODE_SUCCESS; } -// int64_t stt1 = taosGetTimestampUs(); -// qDebug("generate table list, cost:%ld us", stt1-stt); - }else if(!pTagCond){ + // int64_t stt1 = taosGetTimestampUs(); + // qDebug("generate table list, cost:%ld us", stt1-stt); + } else if (!pTagCond) { vnodeGetCtbIdList(pVnode, pScanNode->suid, res); } } else { // Create one table group. - if(metaIsTableExist(metaHandle, tableUid)){ + if (metaIsTableExist(metaHandle, tableUid)) { taosArrayPush(res, &tableUid); } } @@ -767,7 +769,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, if (pTagCond) { terrno = TDB_CODE_SUCCESS; SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond); - if(terrno != TDB_CODE_SUCCESS){ + if (terrno != TDB_CODE_SUCCESS) { colDataDestroy(pColInfoData); taosMemoryFreeClear(pColInfoData); taosArrayDestroy(res); @@ -829,7 +831,7 @@ size_t getTableTagsBufLen(const SNodeList* pGroups) { int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId) { SMetaReader mr = {0}; metaReaderInit(&mr, pMeta, 0); - if(metaGetTableEntryByUid(&mr, uid) != 0){ // table not exist + if (metaGetTableEntryByUid(&mr, uid) != 0) { // table not exist metaReaderClear(&mr); return TSDB_CODE_PAR_TABLE_NOT_EXIST; } @@ -987,7 +989,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i return s; } -static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType) { +static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType, EColumnType colType) { SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn)); if (pCol == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1001,7 +1003,7 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDa pCol->scale = pType->scale; pCol->precision = pType->precision; pCol->dataBlockId = blockId; - + pCol->colType = colType; return pCol; } @@ -1045,7 +1047,8 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* SDataType* pType = &pColNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); - pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType); + pExp->base.pParam[0].pCol = + createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType); pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; } else if (type == QUERY_NODE_VALUE) { pExp->pExpr->nodeType = QUERY_NODE_VALUE; @@ -1097,7 +1100,8 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* SColumnNode* pcn = (SColumnNode*)p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; - pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType); + pExp->base.pParam[j].pCol = + createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType); } else if (p1->type == QUERY_NODE_VALUE) { SValueNode* pvn = (SValueNode*)p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 98c7c56d729c350b5cb573d7349e8f0c6dbf605e..63d8563a09a26f4667ca203ab8d7973f3326a6ed 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -148,20 +148,6 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); -// setup the output buffer for each operator -static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { - if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || - pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - return false; - } - - if (pStatis != NULL && pStatis->numOfNull == 0) { - return false; - } - - return true; -} - #if 0 static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t uid) { diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 59dd58070d1b96fa5003c0162957d6e8adedb061..f23552c5a7b82207ffc368dbae7c1894cb6a8edd 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -36,6 +36,7 @@ #define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId) static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey); +static bool fillIfWindowPseudoColumn(SFillInfo* pFillInfo, SFillColInfo* pCol, SColumnInfoData* pDstColInfoData, int32_t rowIndex); static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) { for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { @@ -43,9 +44,8 @@ static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowInd int32_t dstSlotId = GET_DEST_SLOT_ID(pCol); SColumnInfoData* pDstColInfo = taosArrayGet(pBlock->pDataBlock, dstSlotId); if (pCol->notFillCol) { - if (pDstColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - colDataAppend(pDstColInfo, rowIndex, (const char*)&pFillInfo->currentKey, false); - } else { + bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstColInfo, rowIndex); + if (!filled) { SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal; SGroupKeys* pKey = taosArrayGet(p, i); doSetVal(pDstColInfo, rowIndex, pKey); @@ -76,6 +76,35 @@ static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32 } } +//fill windows pseudo column, _wstart, _wend, _wduration and return true, otherwise return false +static bool fillIfWindowPseudoColumn(SFillInfo* pFillInfo, SFillColInfo* pCol, SColumnInfoData* pDstColInfoData, int32_t rowIndex) { + if (!pCol->notFillCol) { + return false; + } + if (pCol->pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) { + if (pCol->pExpr->base.numOfParams != 1) { + return false; + } + if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) { + colDataAppend(pDstColInfoData, rowIndex, (const char*)&pFillInfo->currentKey, false); + return true; + } else if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_END) { + //TODO: include endpoint + SInterval* pInterval = &pFillInfo->interval; + int32_t step = (pFillInfo->order == TSDB_ORDER_ASC) ? 1 : -1; + int64_t windowEnd = + taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision); + colDataAppend(pDstColInfoData, rowIndex, (const char*)&windowEnd, false); + return true; + } else if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_DURATION) { + //TODO: include endpoint + colDataAppend(pDstColInfoData, rowIndex, (const char*)&pFillInfo->interval.sliding, false); + return true; + } + } + return false; +} + static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts, bool outOfBound) { SPoint point1, point2, point; @@ -92,10 +121,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol)); - - if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - colDataAppend(pDstColInfoData, index, (const char*)&pFillInfo->currentKey, false); - } else { + bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstColInfoData, index); + if (!filled) { SGroupKeys* pKey = taosArrayGet(p, i); doSetVal(pDstColInfoData, index, pKey); } @@ -106,10 +133,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol)); - - if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - colDataAppend(pDstColInfoData, index, (const char*)&pFillInfo->currentKey, false); - } else { + bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstColInfoData, index); + if (!filled) { SGroupKeys* pKey = taosArrayGet(p, i); doSetVal(pDstColInfoData, index, pKey); } @@ -127,9 +152,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* int16_t type = pDstCol->info.type; if (pCol->notFillCol) { - if (type == TSDB_DATA_TYPE_TIMESTAMP) { - colDataAppend(pDstCol, index, (const char*)&pFillInfo->currentKey, false); - } else { + bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstCol, index); + if (!filled) { SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal; SGroupKeys* pKey = taosArrayGet(p, i); doSetVal(pDstCol, index, pKey); @@ -170,9 +194,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, slotId); if (pCol->notFillCol) { - if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false); - } else { + bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDst, index); + if (!filled) { SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal; SGroupKeys* pKey = taosArrayGet(p, i); doSetVal(pDst, index, pKey); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ba0e534cc6b65c76d470507bf1bf4979e5d40377..bacc5c28559ece721a777e2aa6162c8a94c7bf4e 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1947,8 +1947,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) { doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepTuple(pRowSup, tsList[j], gid); - } else if ((tsList[j] - pRowSup->prevTs >= 0) && tsList[j] - pRowSup->prevTs <= gap || - (pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap)) { + } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) || + ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) { // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); if (j == 0 && pRowSup->startRowIndex != 0) { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 71f084d41226fee17bd0b8c0d63f69ad07ca3a20..0667c5f5b9e2984d119dab5297abb16e374977fb 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -44,12 +44,15 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) { pCol->colType = COLUMN_TYPE_TBNAME; break; case FUNCTION_TYPE_WSTART: + pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + pCol->colType = COLUMN_TYPE_WINDOW_START; + break; case FUNCTION_TYPE_WEND: pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; - pCol->colType = COLUMN_TYPE_WINDOW_PC; + pCol->colType = COLUMN_TYPE_WINDOW_END; break; case FUNCTION_TYPE_WDURATION: - pCol->colType = COLUMN_TYPE_WINDOW_PC; + pCol->colType = COLUMN_TYPE_WINDOW_DURATION; break; case FUNCTION_TYPE_GROUP_KEY: pCol->colType = COLUMN_TYPE_GROUP_KEY; @@ -784,7 +787,10 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele static EDealRes needFillValueImpl(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { SColumnNode* pCol = (SColumnNode*)pNode; - if (COLUMN_TYPE_WINDOW_PC != pCol->colType && COLUMN_TYPE_GROUP_KEY != pCol->colType) { + if (COLUMN_TYPE_WINDOW_START != pCol->colType && + COLUMN_TYPE_WINDOW_END != pCol->colType && + COLUMN_TYPE_WINDOW_DURATION != pCol->colType && + COLUMN_TYPE_GROUP_KEY != pCol->colType) { *(bool*)pContext = true; return DEAL_RES_END; }