提交 a9673ceb 编写于 作者: wmmhello's avatar wmmhello

Merge branch 'master' of github.com:taosdata/TDengine into test/TD-6167

...@@ -36,7 +36,7 @@ extern "C" { ...@@ -36,7 +36,7 @@ extern "C" {
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \ #define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo))) (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
......
...@@ -168,6 +168,9 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -168,6 +168,9 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
} else { } else {
pRes->code = numOfRows; pRes->code = numOfRows;
} }
if (pRes->code == TSDB_CODE_SUCCESS) {
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
}
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
return; return;
......
...@@ -35,6 +35,7 @@ typedef struct SCompareParam { ...@@ -35,6 +35,7 @@ typedef struct SCompareParam {
static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) { static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
int32_t ret = 0; int32_t ret = 0;
size_t size = taosArrayGetSize(columnIndexList); size_t size = taosArrayGetSize(columnIndexList);
if (size > 0) { if (size > 0) {
ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC); ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC);
...@@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc ...@@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc
(*hasPrev) = true; (*hasPrev) = true;
} }
// tsdb_func_tag function only produce one row of result. Therefore, we need to copy the
// output value to multiple rows
static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) { static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) {
if (numOfRows <= 1) { if (numOfRows <= 1) {
return ; return;
} }
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
...@@ -574,31 +577,20 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput ...@@ -574,31 +577,20 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput
continue; continue;
} }
int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result
char* src = pCtx[k].pOutput; char* src = pCtx[k].pOutput;
char* dst = pCtx[k].pOutput + pCtx[k].outputBytes;
for (int32_t i = 0; i < inc; ++i) { // Let's start from the second row, as the first row has result value already.
pCtx[k].pOutput += pCtx[k].outputBytes; for (int32_t i = 1; i < numOfRows; ++i) {
memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes); memcpy(dst, src, (size_t)pCtx[k].outputBytes);
dst += pCtx[k].outputBytes;
} }
} }
} }
static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) { static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) {
SMultiwayMergeInfo* pInfo = pOperator->info;
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
add[i] = pCtx[i].pInput;
pCtx[i].size = 1;
}
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
if (pInfo->hasPrev) {
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
for (int32_t j = 0; j < numOfExpr; ++j) { for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex;
} }
for (int32_t j = 0; j < numOfExpr; ++j) { for (int32_t j = 0; j < numOfExpr; ++j) {
...@@ -609,16 +601,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -609,16 +601,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId < 0) { if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
} else {
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]); aAggs[functionId].mergeFunc(&pCtx[j]);
} }
} else { }
for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor }
static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) {
for(int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId; int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue; continue;
...@@ -626,15 +617,30 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -626,15 +617,30 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId < 0) { if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else {
continue; aAggs[functionId].xFinalize(&pCtx[j]);
}
} }
}
aAggs[functionId].xFinalize(&pCtx[j]); static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
SMultiwayMergeInfo* pInfo = pOperator->info;
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
char** addrPtr = calloc(pBlock->info.numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
addrPtr[i] = pCtx[i].pInput;
pCtx[i].size = 1;
} }
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
if (pInfo->hasPrev) {
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
} else {
doFinalizeResultImpl(pInfo, pCtx, numOfExpr);
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput);
setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
...@@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
} }
for (int32_t j = 0; j < numOfExpr; ++j) { doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
} }
} else { } else {
for (int32_t j = 0; j < numOfExpr; ++j) { doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
} }
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
...@@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
{ {
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
pCtx[i].pInput = add[i]; pCtx[i].pInput = addrPtr[i];
} }
} }
tfree(add); tfree(addrPtr);
} }
static bool isAllSourcesCompleted(SGlobalMerger *pMerger) { static bool isAllSourcesCompleted(SGlobalMerger *pMerger) {
...@@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { ...@@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index]; SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index];
bool sameGroup = true; bool sameGroup = true;
if (pInfo->hasPrev) { if (pInfo->hasPrev) {
// todo refactor extract method
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList); int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
// if this row belongs to current result set group // if this row belongs to current result set group
...@@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
break; break;
} }
bool sameGroup = true;
if (pAggInfo->hasGroupColData) { if (pAggInfo->hasGroupColData) {
bool sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData); sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
if (!sameGroup) { if (!sameGroup && !pAggInfo->multiGroupResults) {
*newgroup = true; *newgroup = true;
pAggInfo->hasDataBlockForNewGroup = true; pAggInfo->hasDataBlockForNewGroup = true;
pAggInfo->pExistBlock = pBlock; pAggInfo->pExistBlock = pBlock;
...@@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
} }
if (handleData) { // data in current group is all handled if (handleData) { // data in current group is all handled
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { doFinalizeResultImpl(pAggInfo, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
int32_t functionId = pAggInfo->binfo.pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
}
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput); int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
pAggInfo->binfo.pRes->info.rows += numOfRows;
pAggInfo->binfo.pRes->info.rows += numOfRows;
setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows); setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows);
} }
...@@ -1019,71 +975,127 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -1019,71 +975,127 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
return (pRes->info.rows != 0)? pRes:NULL; return (pRes->info.rows != 0)? pRes:NULL;
} }
static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { static void doHandleDataInCurrentGroup(SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) {
SSLimitOperatorInfo *pInfo = pOperator->info; if (pInfo->currentOffset > 0) {
assert(pInfo->currentGroupOffset >= 0); pInfo->currentOffset -= 1;
} else {
// discard the data rows in current group
if (pInfo->limit.limit < 0 || (pInfo->limit.limit >= 0 && pInfo->rowsTotal < pInfo->limit.limit)) {
size_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock);
for (int32_t i = 0; i < num1; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData *pDstInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i);
SSDataBlock* pBlock = NULL; SColumnInfo *pColInfo = &pColInfoData->info;
if (pInfo->currentGroupOffset == 0) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); char *pSrc = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); char *pDst = (char *)pDstInfoData->pData + (pInfo->pRes->info.rows * pColInfo->bytes);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { memcpy(pDst, pSrc, pColInfo->bytes);
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
} }
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { pInfo->rowsTotal += 1;
while ((*newgroup) == false) { // ignore the remain blocks pInfo->pRes->info.rows += 1;
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
} }
} }
}
static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlock, int32_t numOfRows) {
if (pInfo->capacity < pResultBlock->info.rows + numOfRows) {
int32_t total = pResultBlock->info.rows + numOfRows;
size_t num = taosArrayGetSize(pResultBlock->pDataBlock);
for (int32_t i = 0; i < num; ++i) {
SColumnInfoData *pInfoData = taosArrayGet(pResultBlock->pDataBlock, i);
char *tmp = realloc(pInfoData->pData, total * pInfoData->info.bytes);
if (tmp != NULL) {
pInfoData->pData = tmp;
} else {
// todo handle the malloc failure
} }
return pBlock; pInfo->capacity = total;
pInfo->threshold = (int64_t) (total * 0.8);
} }
}
}
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); enum {
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); BLOCK_NEW_GROUP = 1,
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); BLOCK_NO_GROUP = 2,
BLOCK_SAME_GROUP = 3,
};
if (pBlock == NULL) { static int32_t doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); int32_t rowIndex = 0;
pOperator->status = OP_EXEC_DONE;
return NULL;
}
while(1) { while (rowIndex < pBlock->info.rows) {
if (*newgroup) { int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
pInfo->currentGroupOffset -= 1;
*newgroup = false; bool samegroup = true;
if (pInfo->hasPrev) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex);
SColumnInfo *pColInfo = &pColInfoData->info;
char *d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes);
if (ret != 0) { // it is a new group
samegroup = false;
break;
}
}
} }
while ((*newgroup) == false) { if (!samegroup || !pInfo->hasPrev) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pInfo->ignoreCurrentGroup = false;
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, rowIndex, &pInfo->hasPrev);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
pInfo->rowsTotal = 0;
if (pInfo->currentGroupOffset > 0) {
pInfo->ignoreCurrentGroup = true;
pInfo->currentGroupOffset -= 1; // now we are in the next group data
rowIndex += 1;
continue;
}
// A new group has arrived according to the result rows, and the group limitation has already reached.
// Let's jump out of current loop and return immediately.
if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return NULL; return BLOCK_NO_GROUP;
} }
pInfo->groupTotal += 1;
// data in current group not allowed, return if current result does not belong to the previous group.And there
// are results exists in current SSDataBlock
if (!pInfo->multigroupResult && !samegroup && pInfo->pRes->info.rows > 0) {
return BLOCK_NEW_GROUP;
} }
// now we have got the first data block of the next group. doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
if (pInfo->currentGroupOffset == 0) {
return pBlock; } else { // handle the offset in the same group
// All the data in current group needs to be discarded, due to the limit parameter in the SQL statement
if (pInfo->ignoreCurrentGroup) {
rowIndex += 1;
continue;
}
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
} }
rowIndex += 1;
} }
return NULL; return BLOCK_SAME_GROUP;
} }
SSDataBlock* doSLimit(void* param, bool* newgroup) { SSDataBlock* doSLimit(void* param, bool* newgroup) {
...@@ -1093,63 +1105,41 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { ...@@ -1093,63 +1105,41 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) {
} }
SSLimitOperatorInfo *pInfo = pOperator->info; SSLimitOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0;
SSDataBlock *pBlock = NULL; if (pInfo->pPrevBlock != NULL) {
while (1) { ensureOutputBuf(pInfo, pInfo->pRes, pInfo->pPrevBlock->info.rows);
pBlock = skipGroupBlock(pOperator, newgroup); int32_t ret = doSlimitImpl(pOperator, pInfo, pInfo->pPrevBlock);
if (pBlock == NULL) { assert(ret != BLOCK_NEW_GROUP);
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
if (*newgroup) { // a new group arrives
pInfo->groupTotal += 1;
pInfo->rowsTotal = 0;
pInfo->currentOffset = pInfo->limit.offset;
}
assert(pInfo->currentGroupOffset == 0);
if (pInfo->currentOffset >= pBlock->info.rows) { pInfo->pPrevBlock = NULL;
pInfo->currentOffset -= pBlock->info.rows;
} else {
if (pInfo->currentOffset == 0) {
break;
} }
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); assert(pInfo->currentGroupOffset >= 0);
pBlock->info.rows = remain;
// move the remain rows of this data block to the front. while(1) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
int16_t bytes = pColInfoData->info.bytes; if (pBlock == NULL) {
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
} }
pInfo->currentOffset = 0; ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows);
break; int32_t ret = doSlimitImpl(pOperator, pInfo, pBlock);
} if (ret == BLOCK_NEW_GROUP) {
pInfo->pPrevBlock = pBlock;
return pInfo->pRes;
} }
if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort if (pOperator->status == OP_EXEC_DONE) {
return NULL; return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
} }
if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) { // now the number of rows in current group is enough, let's return to the invoke function
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal); if (pInfo->pRes->info.rows > pInfo->threshold) {
pInfo->rowsTotal = pInfo->limit.limit; return pInfo->pRes;
if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
pOperator->status = OP_EXEC_DONE;
} }
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
} else {
pInfo->rowsTotal += pBlock->info.rows;
} }
return pBlock;
} }
...@@ -1757,6 +1757,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow ...@@ -1757,6 +1757,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
pSql->res.numOfRows = 0; pSql->res.numOfRows = 0;
code = doPackSendDataBlock(pSql, pInsertParam, pTableMeta, count, pTableDataBlock); code = doPackSendDataBlock(pSql, pInsertParam, pTableMeta, count, pTableDataBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
goto _error; goto _error;
} }
......
...@@ -422,7 +422,6 @@ int32_t readFromFile(char *name, uint32_t *len, void **buf) { ...@@ -422,7 +422,6 @@ int32_t readFromFile(char *name, uint32_t *len, void **buf) {
return TSDB_CODE_TSC_APP_ERROR; return TSDB_CODE_TSC_APP_ERROR;
} }
close(fd); close(fd);
tfree(*buf);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -926,7 +925,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -926,7 +925,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pQueryInfo = pCmd->active; pQueryInfo = pCmd->active;
pQueryInfo->pUdfInfo = pUdfInfo; pQueryInfo->pUdfInfo = pUdfInfo;
pQueryInfo->udfCopy = true; pQueryInfo->udfCopy = true;
} }
} }
...@@ -6917,7 +6915,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo ...@@ -6917,7 +6915,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
const char* msg1 = "interval not allowed in group by normal column"; const char* msg1 = "interval not allowed in group by normal column";
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
SSchema* tagSchema = NULL; SSchema* tagSchema = NULL;
...@@ -8439,6 +8436,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS ...@@ -8439,6 +8436,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
if (taosArrayGetSize(subInfo->pSubquery) >= 2) { if (taosArrayGetSize(subInfo->pSubquery) >= 2) {
return invalidOperationMsg(msgBuf, "not support union in subquery"); return invalidOperationMsg(msgBuf, "not support union in subquery");
} }
SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo)); SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo));
tscInitQueryInfo(pSub); tscInitQueryInfo(pSub);
...@@ -8461,6 +8459,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS ...@@ -8461,6 +8459,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
if (pTableMetaInfo1 == NULL) { if (pTableMetaInfo1 == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pTableMetaInfo1->pTableMeta = extractTempTableMetaFromSubquery(pSub); pTableMetaInfo1->pTableMeta = extractTempTableMetaFromSubquery(pSub);
pTableMetaInfo1->tableMetaCapacity = tscGetTableMetaSize(pTableMetaInfo1->pTableMeta); pTableMetaInfo1->tableMetaCapacity = tscGetTableMetaSize(pTableMetaInfo1->pTableMeta);
...@@ -8544,7 +8543,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8544,7 +8543,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
// check if there is 3 level select // check if there is 3 level select
SRelElementPair* subInfo = taosArrayGet(pSqlNode->from->list, i); SRelElementPair* subInfo = taosArrayGet(pSqlNode->from->list, i);
SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0); SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0);
if (p->from->type == SQL_NODE_FROM_SUBQUERY){ if (p->from->type == SQL_NODE_FROM_SUBQUERY) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
} }
...@@ -8636,6 +8635,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8636,6 +8635,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
} }
// disable group result mixed up if interval/session window query exists.
if (isTimeWindowQuery(pQueryInfo)) {
size_t num = taosArrayGetSize(pQueryInfo->pUpstream);
for(int32_t i = 0; i < num; ++i) {
SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, i);
pUp->multigroupResult = false;
}
}
// parse the having clause in the first place // parse the having clause in the first place
int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1); int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1);
if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) != if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) !=
......
...@@ -2265,7 +2265,7 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { ...@@ -2265,7 +2265,7 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
destroySup(pSup); destroySup(pSup);
taos_free_result(pSql); taos_free_result(pSql);
parent->res.code = code; parent->res.code = c;
tscAsyncResultOnError(parent); tscAsyncResultOnError(parent);
return; return;
} }
......
...@@ -3093,6 +3093,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { ...@@ -3093,6 +3093,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->slimit.offset = 0; pQueryInfo->slimit.offset = 0;
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->window = TSWINDOW_INITIALIZER; pQueryInfo->window = TSWINDOW_INITIALIZER;
pQueryInfo->multigroupResult = true;
} }
int32_t tscAddQueryInfo(SSqlCmd* pCmd) { int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
...@@ -3104,7 +3105,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) { ...@@ -3104,7 +3105,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
} }
tscInitQueryInfo(pQueryInfo); tscInitQueryInfo(pQueryInfo);
pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer
if (pCmd->pQueryInfo == NULL) { if (pCmd->pQueryInfo == NULL) {
...@@ -3186,6 +3186,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { ...@@ -3186,6 +3186,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
pQueryInfo->window = pSrc->window; pQueryInfo->window = pSrc->window;
pQueryInfo->sessionWindow = pSrc->sessionWindow; pQueryInfo->sessionWindow = pSrc->sessionWindow;
pQueryInfo->pTableMetaInfo = NULL; pQueryInfo->pTableMetaInfo = NULL;
pQueryInfo->multigroupResult = pSrc->multigroupResult;
pQueryInfo->bufLen = pSrc->bufLen; pQueryInfo->bufLen = pSrc->bufLen;
pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery; pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery;
...@@ -3585,7 +3586,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3585,7 +3586,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->bufLen = pQueryInfo->bufLen; pNewQueryInfo->bufLen = pQueryInfo->bufLen;
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
pNewQueryInfo->multigroupResult = pQueryInfo->multigroupResult;
pNewQueryInfo->distinct = pQueryInfo->distinct; pNewQueryInfo->distinct = pQueryInfo->distinct;
if (pNewQueryInfo->buf == NULL) { if (pNewQueryInfo->buf == NULL) {
...@@ -4716,6 +4717,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4716,6 +4717,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->distinct = pQueryInfo->distinct; pQueryAttr->distinct = pQueryInfo->distinct;
pQueryAttr->sw = pQueryInfo->sessionWindow; pQueryAttr->sw = pQueryInfo->sessionWindow;
pQueryAttr->stateWindow = pQueryInfo->stateWindow; pQueryAttr->stateWindow = pQueryInfo->stateWindow;
pQueryAttr->multigroupResult = pQueryInfo->multigroupResult;
pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput; pQueryAttr->numOfOutput = numOfOutput;
......
...@@ -219,6 +219,7 @@ typedef struct SQueryAttr { ...@@ -219,6 +219,7 @@ typedef struct SQueryAttr {
bool distinct; // distinct query or not bool distinct; // distinct query or not
bool stateWindow; // window State on sub/normal table bool stateWindow; // window State on sub/normal table
bool createFilterOperator; // if filter operator is needed bool createFilterOperator; // if filter operator is needed
bool multigroupResult; // multigroup result can exist in one SSDataBlock
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
...@@ -470,6 +471,13 @@ typedef struct SSLimitOperatorInfo { ...@@ -470,6 +471,13 @@ typedef struct SSLimitOperatorInfo {
char **prevRow; char **prevRow;
SArray *orderColumnList; SArray *orderColumnList;
bool hasPrev;
bool ignoreCurrentGroup;
bool multigroupResult;
SSDataBlock *pRes; // result buffer
SSDataBlock *pPrevBlock;
int64_t capacity;
int64_t threshold;
} SSLimitOperatorInfo; } SSLimitOperatorInfo;
typedef struct SFilterOperatorInfo { typedef struct SFilterOperatorInfo {
...@@ -481,8 +489,9 @@ typedef struct SFillOperatorInfo { ...@@ -481,8 +489,9 @@ typedef struct SFillOperatorInfo {
SFillInfo *pFillInfo; SFillInfo *pFillInfo;
SSDataBlock *pRes; SSDataBlock *pRes;
int64_t totalInputRows; int64_t totalInputRows;
void **p;
SSDataBlock *existNewGroupBlock; SSDataBlock *existNewGroupBlock;
bool multigroupResult;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
...@@ -544,9 +553,9 @@ typedef struct SMultiwayMergeInfo { ...@@ -544,9 +553,9 @@ typedef struct SMultiwayMergeInfo {
bool hasDataBlockForNewGroup; bool hasDataBlockForNewGroup;
SSDataBlock *pExistBlock; SSDataBlock *pExistBlock;
bool hasPrev;
bool groupMix;
SArray *udfInfo; SArray *udfInfo;
bool hasPrev;
bool multiGroupResults;
} SMultiwayMergeInfo; } SMultiwayMergeInfo;
// todo support the disk-based sort // todo support the disk-based sort
...@@ -568,7 +577,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -568,7 +577,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
...@@ -577,10 +586,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf ...@@ -577,10 +586,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger, bool groupMix); int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo); SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
......
...@@ -148,6 +148,7 @@ typedef struct SQueryInfo { ...@@ -148,6 +148,7 @@ typedef struct SQueryInfo {
bool orderProjectQuery; bool orderProjectQuery;
bool stateWindow; bool stateWindow;
bool globalMerge; bool globalMerge;
bool multigroupResult;
} SQueryInfo; } SQueryInfo;
/** /**
......
...@@ -39,15 +39,12 @@ ...@@ -39,15 +39,12 @@
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey)) #define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
#define MULTI_KEY_DELIM "-" #define MULTI_KEY_DELIM "-"
#define HASH_CAPACITY_LIMIT 10000000
#define TIME_WINDOW_COPY(_dst, _src) do {\ #define TIME_WINDOW_COPY(_dst, _src) do {\
(_dst).skey = (_src).skey;\ (_dst).skey = (_src).skey;\
(_dst).ekey = (_src).ekey;\ (_dst).ekey = (_src).ekey;\
...@@ -968,8 +965,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t ...@@ -968,8 +965,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
break; break;
} }
} }
return;
} }
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
...@@ -2266,30 +2261,30 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2266,30 +2261,30 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Fill: { case OP_Fill: {
SOperatorInfo* pInfo = pRuntimeEnv->proot; SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult);
break; break;
} }
case OP_MultiwayMergeSort: { case OP_MultiwayMergeSort: {
bool groupMix = true; pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger);
if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) { break;
groupMix = false;
} }
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock.
4096, merger, groupMix); // TODO hack it bool multigroupResult = pQueryAttr->multigroupResult;
break; if (pQueryAttr->multigroupResult) {
multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE);
} }
case OP_GlobalAggregate: {
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo); pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult);
break; break;
} }
case OP_SLimit: { case OP_SLimit: {
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, int32_t num = pRuntimeEnv->proot->numOfOutput;
pQueryAttr->numOfExpr3, merger); SExprInfo* pExpr = pRuntimeEnv->proot->pExpr;
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult);
break; break;
} }
...@@ -3648,7 +3643,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -3648,7 +3643,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
// re-estabilish output buffer pointer. // set the correct pointer after the memory buffer reallocated.
int32_t functionId = pBInfo->pCtx[i].functionId; int32_t functionId = pBInfo->pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
if(i>0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; if(i>0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
...@@ -4213,6 +4208,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti ...@@ -4213,6 +4208,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
// refactor : extract method // refactor : extract method
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
//add condition (pBlock->info.rows >= 1) just to runtime happy //add condition (pBlock->info.rows >= 1) just to runtime happy
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) { if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) {
STimeWindow* w = &pBlock->info.window; STimeWindow* w = &pBlock->info.window;
...@@ -4292,15 +4288,15 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -4292,15 +4288,15 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
} }
} }
int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity) { int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity, void** p) {
void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
p[i] = pColInfoData->pData; p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows);
} }
pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity); int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity - pOutput->info.rows);
tfree(p); pOutput->info.rows += numOfRows;
return pOutput->info.rows; return pOutput->info.rows;
} }
...@@ -5344,11 +5340,12 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5344,11 +5340,12 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
taosArrayDestroy(pInfo->orderColumnList); taosArrayDestroy(pInfo->orderColumnList);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
tfree(pInfo->prevRow); tfree(pInfo->prevRow);
} }
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo) { SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
pInfo->resultRowFactor = pInfo->resultRowFactor =
...@@ -5356,13 +5353,12 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5356,13 +5353,12 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx
pInfo->multiGroupResults = groupResultMixedUp;
pInfo->pMerge = param; pInfo->pMerge = param;
pInfo->bufCapacity = 4096; pInfo->bufCapacity = 4096;
pInfo->udfInfo = pUdfInfo; pInfo->udfInfo = pUdfInfo;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
...@@ -5417,17 +5413,15 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5417,17 +5413,15 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
} }
SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
int32_t numOfRows, void *merger, bool groupMix) { int32_t numOfRows, void *merger) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
pInfo->pMerge = merger; pInfo->pMerge = merger;
pInfo->groupMix = groupMix;
pInfo->bufCapacity = numOfRows; pInfo->bufCapacity = numOfRows;
pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
{ { // todo extract method to create prev compare buffer
int32_t len = 0; int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
len += pExpr[i].base.colBytes; len += pExpr[i].base.colBytes;
...@@ -5435,8 +5429,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx ...@@ -5435,8 +5429,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfCols;
int32_t offset = POINTER_BYTES * numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
...@@ -5452,7 +5446,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx ...@@ -5452,7 +5446,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExpr;
pOperator->exec = doMultiwayMergeSort; pOperator->exec = doMultiwayMergeSort;
pOperator->cleanup = destroyGlobalAggOperatorInfo; pOperator->cleanup = destroyGlobalAggOperatorInfo;
return pOperator; return pOperator;
...@@ -6362,19 +6357,13 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -6362,19 +6357,13 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
static SSDataBlock* doFill(void* param, bool* newgroup) { static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SFillOperatorInfo *pInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
if (taosFillHasMoreResults(pInfo->pFillInfo)) { if (taosFillHasMoreResults(pInfo->pFillInfo)) {
*newgroup = false; *newgroup = false;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p);
return pInfo->pRes; if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult)) {
return;
}
} }
// handle the cached new group data block // handle the cached new group data block
...@@ -6386,11 +6375,47 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6386,11 +6375,47 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
pInfo->existNewGroupBlock = NULL; pInfo->existNewGroupBlock = NULL;
*newgroup = true; *newgroup = true;
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
} }
}
static SSDataBlock* doFill(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
SFillOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
return pInfo->pRes;
}
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
// *newgroup = false;
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
// return pInfo->pRes;
// }
//
// // handle the cached new group data block
// if (pInfo->existNewGroupBlock) {
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
//
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
//
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
// pInfo->existNewGroupBlock = NULL;
// *newgroup = true;
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
// }
while(1) { while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
...@@ -6405,8 +6430,8 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6405,8 +6430,8 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
pInfo->existNewGroupBlock = pBlock; pInfo->existNewGroupBlock = pBlock;
*newgroup = false; *newgroup = false;
// fill the previous group data block // Fill the previous group data block, before handle the data block of new group.
// before handle a new data block, close the fill operation for previous group data block // Close the fill operation for previous group data block
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
} else { } else {
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -6418,28 +6443,61 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6418,28 +6443,61 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
} else { } else {
pInfo->totalInputRows += pBlock->info.rows; pInfo->totalInputRows += pBlock->info.rows;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey
: */pBlock->info.window.ekey;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
} }
} }
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
if (pInfo->pRes->info.rows > 0) { // current group has no more result to return
// current group has no more result to return
if (pInfo->pRes->info.rows > 0) {
// 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
return pInfo->pRes; return pInfo->pRes;
}
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) {
return pInfo->pRes;
}
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
// *newgroup = false;
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
// return pInfo->pRes;
// }
//
// // handle the cached new group data block
// if (pInfo->existNewGroupBlock) {
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
//
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
//
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
// pInfo->existNewGroupBlock = NULL;
// *newgroup = true;
//
// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) {
// return pInfo->pRes;
// }
//
//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
// }
} else if (pInfo->existNewGroupBlock) { // try next group } else if (pInfo->existNewGroupBlock) { // try next group
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
:*/ pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
pInfo->existNewGroupBlock = NULL; pInfo->existNewGroupBlock = NULL;
*newgroup = true; *newgroup = true;
...@@ -6447,7 +6505,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6447,7 +6505,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
} else { } else {
return NULL; return NULL;
} }
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
} }
} }
...@@ -6548,6 +6605,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6548,6 +6605,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
tfree(pInfo->p);
} }
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -6891,10 +6949,10 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -6891,10 +6949,10 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
return pOperator; return pOperator;
} }
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) {
int32_t numOfOutput) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
pInfo->multigroupResult = multigroupResult;
{ {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6909,6 +6967,8 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -6909,6 +6967,8 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit,
(int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); (int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo);
pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES);
} }
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -6928,7 +6988,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -6928,7 +6988,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
return pOperator; return pOperator;
} }
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger) { SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6936,9 +6996,11 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6936,9 +6996,11 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
pInfo->slimit = pQueryAttr->slimit; pInfo->slimit = pQueryAttr->slimit;
pInfo->limit = pQueryAttr->limit; pInfo->limit = pQueryAttr->limit;
pInfo->capacity = pRuntimeEnv->resultInfo.capacity;
pInfo->currentGroupOffset = pQueryAttr->slimit.offset; pInfo->threshold = (int64_t) (pInfo->capacity * 0.8);
pInfo->currentOffset = pQueryAttr->limit.offset; pInfo->currentOffset = pQueryAttr->limit.offset;
pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
pInfo->multigroupResult= multigroupResult;
// TODO refactor // TODO refactor
int32_t len = 0; int32_t len = 0;
...@@ -6946,10 +7008,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6946,10 +7008,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
len += pExpr[i].base.resBytes; len += pExpr[i].base.resBytes;
} }
int32_t numOfCols = pInfo->orderColumnList != NULL? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfCols;
int32_t offset = POINTER_BYTES * numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
...@@ -6957,6 +7019,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6957,6 +7019,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
offset += pExpr[index->colIndex].base.resBytes; offset += pExpr[index->colIndex].base.resBytes;
} }
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SLimitOperator"; pOperator->name = "SLimitOperator";
......
...@@ -430,7 +430,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) ...@@ -430,7 +430,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
pFillInfo->pData[i] = pColData->pData; pFillInfo->pData[i] = pColData->pData;
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
assert (pTag->col.colId == pCol->col.colId); assert (pTag->col.colId == pCol->col.colId);
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy?? memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
......
...@@ -15,6 +15,7 @@ import sys ...@@ -15,6 +15,7 @@ import sys
import subprocess import subprocess
import random import random
import math import math
import numpy as np
from util.log import * from util.log import *
from util.cases import * from util.cases import *
...@@ -57,16 +58,33 @@ class TDTestCase: ...@@ -57,16 +58,33 @@ class TDTestCase:
def td3690(self): def td3690(self):
tdLog.printNoPrefix("==========TD-3690==========") tdLog.printNoPrefix("==========TD-3690==========")
tdSql.prepare()
tdSql.execute("show variables")
res_off = tdSql.cursor.fetchall()
resList = np.array(res_off)
index = np.where(resList == "offlineThreshold")
index_value = np.dstack((index[0])).squeeze()
tdSql.query("show variables") tdSql.query("show variables")
tdSql.checkData(53, 1, 864000) tdSql.checkData(index_value, 1, 864000)
def td4082(self): def td4082(self):
tdLog.printNoPrefix("==========TD-4082==========") tdLog.printNoPrefix("==========TD-4082==========")
tdSql.prepare()
cfgfile = self.getCfgFile() cfgfile = self.getCfgFile()
max_compressMsgSize = 100000000 max_compressMsgSize = 100000000
tdSql.execute("show variables")
res_com = tdSql.cursor.fetchall()
rescomlist = np.array(res_com)
cpms_index = np.where(rescomlist == "compressMsgSize")
index_value = np.dstack((cpms_index[0])).squeeze()
tdSql.query("show variables") tdSql.query("show variables")
tdSql.checkData(26, 1, -1) tdSql.checkData(index_value, 1, -1)
tdSql.query("show dnodes") tdSql.query("show dnodes")
index = tdSql.getData(0, 0) index = tdSql.getData(0, 0)
...@@ -80,7 +98,7 @@ class TDTestCase: ...@@ -80,7 +98,7 @@ class TDTestCase:
tdDnodes.start(index) tdDnodes.start(index)
tdSql.query("show variables") tdSql.query("show variables")
tdSql.checkData(26, 1, 100000000) tdSql.checkData(index_value, 1, 100000000)
tdDnodes.stop(index) tdDnodes.stop(index)
cmd = f"sed -i '$s/{max_compressMsgSize}/{max_compressMsgSize+10}/g' {cfgfile} " cmd = f"sed -i '$s/{max_compressMsgSize}/{max_compressMsgSize+10}/g' {cfgfile} "
...@@ -91,7 +109,7 @@ class TDTestCase: ...@@ -91,7 +109,7 @@ class TDTestCase:
tdDnodes.start(index) tdDnodes.start(index)
tdSql.query("show variables") tdSql.query("show variables")
tdSql.checkData(26, 1, -1) tdSql.checkData(index_value, 1, -1)
tdDnodes.stop(index) tdDnodes.stop(index)
cmd = f"sed -i '$d' {cfgfile}" cmd = f"sed -i '$d' {cfgfile}"
...@@ -104,8 +122,12 @@ class TDTestCase: ...@@ -104,8 +122,12 @@ class TDTestCase:
def td4097(self): def td4097(self):
tdLog.printNoPrefix("==========TD-4097==========") tdLog.printNoPrefix("==========TD-4097==========")
tdSql.execute("drop database if exists db") tdSql.execute("drop database if exists db")
tdSql.execute("drop database if exists db1") tdSql.execute("drop database if exists db1")
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.execute("create database if not exists db keep 3650") tdSql.execute("create database if not exists db keep 3650")
tdSql.execute("create database if not exists db1 keep 3650") tdSql.execute("create database if not exists db1 keep 3650")
tdSql.execute("create database if not exists new keep 3650") tdSql.execute("create database if not exists new keep 3650")
...@@ -267,10 +289,22 @@ class TDTestCase: ...@@ -267,10 +289,22 @@ class TDTestCase:
# keep ~ [days,365000] # keep ~ [days,365000]
tdSql.execute("drop database if exists db") tdSql.execute("drop database if exists db")
tdSql.execute("create database if not exists db") tdSql.execute("create database if not exists db")
tdSql.execute("show variables")
res_kp = tdSql.cursor.fetchall()
resList = np.array(res_kp)
keep_index = np.where(resList == "keep")
index_value = np.dstack((keep_index[0])).squeeze()
tdSql.query("show variables") tdSql.query("show variables")
tdSql.checkData(38, 1, 3650) tdSql.checkData(index_value, 1, 3650)
tdSql.query("show databases") tdSql.query("show databases")
tdSql.checkData(0,7,"3650,3650,3650") selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
tdSql.checkData(0, 7, "3650,3650,3650")
else:
tdSql.checkData(0, 7, 3650)
days = tdSql.getData(0, 6) days = tdSql.getData(0, 6)
tdSql.error("alter database db keep 3650001") tdSql.error("alter database db keep 3650001")
...@@ -289,14 +323,22 @@ class TDTestCase: ...@@ -289,14 +323,22 @@ class TDTestCase:
tdSql.execute("alter database db keep 36500") tdSql.execute("alter database db keep 36500")
tdSql.query("show databases") tdSql.query("show databases")
tdSql.checkData(0, 7, "3650,3650,36500") if ("community" in selfPath):
tdSql.checkData(0, 7, "36500,36500,36500")
else:
tdSql.checkData(0, 7, 36500)
tdSql.execute("drop database if exists db") tdSql.execute("drop database if exists db")
tdSql.execute("create database if not exists db1") tdSql.execute("create database if not exists db1")
tdSql.query("show databases") tdSql.query("show databases")
if ("community" in selfPath):
tdSql.checkData(0, 7, "3650,3650,3650") tdSql.checkData(0, 7, "3650,3650,3650")
else:
tdSql.checkData(0, 7, 3650)
tdSql.query("show variables") tdSql.query("show variables")
tdSql.checkData(38, 1, 3650) tdSql.checkData(index_value, 1, 3650)
tdSql.execute("alter database db1 keep 365") tdSql.execute("alter database db1 keep 365")
tdSql.execute("drop database if exists db1") tdSql.execute("drop database if exists db1")
...@@ -697,10 +739,8 @@ class TDTestCase: ...@@ -697,10 +739,8 @@ class TDTestCase:
tdSql.checkRows(tbnum*3) tdSql.checkRows(tbnum*3)
tdSql.query(f"select distinct c1 c2, c2 c3 from t1 where c1 <{tbnum}") tdSql.query(f"select distinct c1 c2, c2 c3 from t1 where c1 <{tbnum}")
tdSql.checkRows(3) tdSql.checkRows(3)
tdSql.query("select distinct c1, c2 from stb1 order by ts") tdSql.error("select distinct c1, c2 from stb1 order by ts")
tdSql.checkRows(tbnum*3+1) tdSql.error("select distinct c1, c2 from t1 order by ts")
tdSql.query("select distinct c1, c2 from t1 order by ts")
tdSql.checkRows(4)
tdSql.error("select distinct c1, ts from stb1 group by c2") tdSql.error("select distinct c1, ts from stb1 group by c2")
tdSql.error("select distinct c1, ts from t1 group by c2") tdSql.error("select distinct c1, ts from t1 group by c2")
tdSql.error("select distinct c1, max(c2) from stb1 ") tdSql.error("select distinct c1, max(c2) from stb1 ")
...@@ -1085,9 +1125,9 @@ class TDTestCase: ...@@ -1085,9 +1125,9 @@ class TDTestCase:
def run(self): def run(self):
# master branch # master branch
# self.td3690() self.td3690()
# self.td4082() self.td4082()
# self.td4288() self.td4288()
# self.td4724() # self.td4724()
# self.td5798() # self.td5798()
# self.td5935() # self.td5935()
......
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import sys import sys
import numpy as np
from util.log import * from util.log import *
from util.cases import * from util.cases import *
from util.sql import * from util.sql import *
...@@ -24,8 +26,17 @@ class TDTestCase: ...@@ -24,8 +26,17 @@ class TDTestCase:
tdSql.init(conn.cursor(), logSql) tdSql.init(conn.cursor(), logSql)
def run(self): def run(self):
# tdSql.query("show variables")
# tdSql.checkData(54, 1, 864000)
tdSql.execute("show variables")
res = tdSql.cursor.fetchall()
resList = np.array(res)
index = np.where(resList == "offlineThreshold")
index_value = np.dstack((index[0])).squeeze()
tdSql.query("show variables") tdSql.query("show variables")
tdSql.checkData(54, 1, 864000) tdSql.checkData(index_value, 1, 864000)
pass
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -18,6 +18,7 @@ from util.cases import tdCases ...@@ -18,6 +18,7 @@ from util.cases import tdCases
from util.sql import tdSql from util.sql import tdSql
from util.dnodes import tdDnodes from util.dnodes import tdDnodes
class TDTestCase: class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)
...@@ -82,7 +83,8 @@ class TDTestCase: ...@@ -82,7 +83,8 @@ class TDTestCase:
## test case for https://jira.taosdata.com:18080/browse/TD-1930 ## test case for https://jira.taosdata.com:18080/browse/TD-1930
tdSql.execute("create table tb(ts timestamp, c1 int, c2 binary(10), c3 nchar(10), c4 float, c5 bool)") tdSql.execute("create table tb(ts timestamp, c1 int, c2 binary(10), c3 nchar(10), c4 float, c5 bool)")
for i in range(10): for i in range(10):
tdSql.execute("insert into tb values(%d, %d, 'binary%d', 'nchar%d', %f, %d)" % (self.ts + i, i, i, i, i + 0.1, i % 2)) tdSql.execute(
"insert into tb values(%d, %d, 'binary%d', 'nchar%d', %f, %d)" % (self.ts + i, i, i, i, i + 0.1, i % 2))
tdSql.error("select * from tb where c2 = binary2") tdSql.error("select * from tb where c2 = binary2")
tdSql.error("select * from tb where c3 = nchar2") tdSql.error("select * from tb where c3 = nchar2")
...@@ -119,9 +121,14 @@ class TDTestCase: ...@@ -119,9 +121,14 @@ class TDTestCase:
tdSql.query("select * from tb") tdSql.query("select * from tb")
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.query("select * from tb0") # For jira:https://jira.taosdata.com:18080/browse/TD-6314
tdSql.execute("use db")
tdSql.execute("create stable stb_001(ts timestamp,v int) tags(c0 int)")
tdSql.query("select _block_dist() from stb_001")
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.query("select * from tb0")
tdSql.checkRows(1)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册