提交 70a02863 编写于 作者: H Haojun Liao

[td-2859] refactor and fix memory leaks.

上级 fb8024fb
...@@ -44,26 +44,16 @@ typedef struct SLocalMerger { ...@@ -44,26 +44,16 @@ typedef struct SLocalMerger {
int32_t numOfCompleted; int32_t numOfCompleted;
int32_t numOfVnode; int32_t numOfVnode;
SLoserTreeInfo * pLoserTree; SLoserTreeInfo * pLoserTree;
char * prevRowOfInput;
tFilePage * pResultBuf; tFilePage * pResultBuf;
int32_t nResultBufSize; int32_t nResultBufSize;
tFilePage * pTempBuffer; tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx; struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result. int32_t rowSize; // size of each intermediate result.
bool hasPrevRow; // cannot be released
bool hasUnprocessedRow;
tOrderDescriptor * pDesc; tOrderDescriptor * pDesc;
SColumnModel * resColModel; SColumnModel * resColModel;
SColumnModel* finalModel; SColumnModel* finalModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure
char* pFinalRes; // result data after interpo
tFilePage* discardData;
bool discard;
int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable bool orderPrjOnSTable; // projection query on stable
char* tagBuf; // max tag buffer
int32_t tagBufLen;
} SLocalMerger; } SLocalMerger;
typedef struct SRetrieveSupport { typedef struct SRetrieveSupport {
...@@ -96,7 +86,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -96,7 +86,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
void tscDestroyLocalMerger(SSqlObj *pSql); void tscDestroyLocalMerger(SSqlObj *pSql);
int32_t tscDoLocalMerge(SSqlObj *pSql); //int32_t tscDoLocalMerge(SSqlObj *pSql);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -129,58 +129,39 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem ...@@ -129,58 +129,39 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem
} }
} }
static UNUSED_FUNC void setCtxInputOutputBuffer(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx, SLocalMerger *pMerger, //static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
tOrderDescriptor *pDesc) { // int32_t numOfCols = (int32_t)tscNumOfFields(pQueryInfo);
size_t size = tscSqlExprNumOfExprs(pQueryInfo); // int32_t offset = 0;
//
for (int32_t i = 0; i < size; ++i) { // SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); // for(int32_t i = 0; i < numOfCols; ++i) {
pCtx[i].pOutput = pMerger->pResultBuf->data + pExpr->base.offset * pMerger->resColModel->capacity; // SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
//
// input buffer hold only one point data // if (pIField->pExpr->pExpr == NULL) {
int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i); // SExprInfo* pExpr = pIField->pExpr;
pCtx[i].pInput = pMerger->pTempBuffer->data + offset; //
// pFillCol[i].col.bytes = pExpr->base.resBytes;
int32_t functionId = pCtx[i].functionId; // pFillCol[i].col.type = (int8_t)pExpr->base.resType;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { // pFillCol[i].col.colId = pExpr->base.colInfo.colId;
pCtx[i].ptsOutputBuf = pCtx[0].pOutput; // pFillCol[i].flag = pExpr->base.colInfo.flag;
} // pFillCol[i].col.offset = offset;
} // pFillCol[i].functionId = pExpr->base.functionId;
} // pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
// } else {
static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { // pFillCol[i].col.bytes = pIField->field.bytes;
int32_t numOfCols = (int32_t)tscNumOfFields(pQueryInfo); // pFillCol[i].col.type = (int8_t)pIField->field.type;
int32_t offset = 0; // pFillCol[i].col.colId = -100;
// pFillCol[i].flag = TSDB_COL_NORMAL;
SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); // pFillCol[i].col.offset = offset;
for(int32_t i = 0; i < numOfCols; ++i) { // pFillCol[i].functionId = -1;
SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); // pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
// }
if (pIField->pExpr->pExpr == NULL) { //
SExprInfo* pExpr = pIField->pExpr; // offset += pFillCol[i].col.bytes;
// }
pFillCol[i].col.bytes = pExpr->base.resBytes; //
pFillCol[i].col.type = (int8_t)pExpr->base.resType; // return pFillCol;
pFillCol[i].col.colId = pExpr->base.colInfo.colId; //}
pFillCol[i].flag = pExpr->base.colInfo.flag;
pFillCol[i].col.offset = offset;
pFillCol[i].functionId = pExpr->base.functionId;
pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
} else {
pFillCol[i].col.bytes = pIField->field.bytes;
pFillCol[i].col.type = (int8_t)pIField->field.type;
pFillCol[i].col.colId = -100;
pFillCol[i].flag = TSDB_COL_NORMAL;
pFillCol[i].col.offset = offset;
pFillCol[i].functionId = -1;
pFillCol[i].fillVal.i = pQueryInfo->fillVal[i];
}
offset += pFillCol[i].col.bytes;
}
return pFillCol;
}
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj *pSql) { SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj *pSql) {
...@@ -333,15 +314,8 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -333,15 +314,8 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
assert(false); // todo fixed row size is larger than the minimum page size; assert(false); // todo fixed row size is larger than the minimum page size;
} }
pMerger->hasPrevRow = false;
pMerger->hasUnprocessedRow = false;
pMerger->prevRowOfInput = (char *)calloc(1, pMerger->rowSize);
// used to keep the latest input row // used to keep the latest input row
pMerger->pTempBuffer = (tFilePage *)calloc(1, pMerger->rowSize + sizeof(tFilePage)); pMerger->pTempBuffer = (tFilePage *)calloc(1, pMerger->rowSize + sizeof(tFilePage));
pMerger->discardData = (tFilePage *)calloc(1, pMerger->rowSize + sizeof(tFilePage));
pMerger->discard = false;
pMerger->nResultBufSize = pMemBuffer[0]->pageSize * 16; pMerger->nResultBufSize = pMemBuffer[0]->pageSize * 16;
pMerger->pResultBuf = (tFilePage *)calloc(1, pMerger->nResultBufSize + sizeof(tFilePage)); pMerger->pResultBuf = (tFilePage *)calloc(1, pMerger->nResultBufSize + sizeof(tFilePage));
...@@ -355,15 +329,15 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -355,15 +329,15 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
} }
assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pMerger->rowSize); assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pMerger->rowSize);
pMerger->pFinalRes = calloc(1, pMerger->rowSize * pMerger->resColModel->capacity); // pMerger->pFinalRes = calloc(1, pMerger->rowSize * pMerger->resColModel->capacity);
if (pMerger->pTempBuffer == NULL || pMerger->discardData == NULL || pMerger->pResultBuf == NULL || if (pMerger->pTempBuffer == NULL || pMerger->pLoserTree == NULL /*|| pMerger->pResultBuf == NULL ||
pMerger->pFinalRes == NULL || pMerger->prevRowOfInput == NULL) { pMerger->pFinalRes == NULL || pMerger->prevRowOfInput == NULL*/) {
tfree(pMerger->pTempBuffer); tfree(pMerger->pTempBuffer);
tfree(pMerger->discardData); // tfree(pMerger->discardData);
tfree(pMerger->pResultBuf); // tfree(pMerger->pResultBuf);
tfree(pMerger->pFinalRes); // tfree(pMerger->pFinalRes);
tfree(pMerger->prevRowOfInput); // tfree(pMerger->prevRowOfInput);
tfree(pMerger->pLoserTree); tfree(pMerger->pLoserTree);
tfree(param); tfree(param);
tfree(pMerger); tfree(pMerger);
...@@ -372,7 +346,6 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -372,7 +346,6 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
} }
pMerger->pTempBuffer->num = 0; pMerger->pTempBuffer->num = 0;
tscCreateResPointerInfo(pRes, pQueryInfo); tscCreateResPointerInfo(pRes, pQueryInfo);
SSchema* pschema = calloc(pDesc->pColumnModel->numOfCols, sizeof(SSchema)); SSchema* pschema = calloc(pDesc->pColumnModel->numOfCols, sizeof(SSchema));
...@@ -393,8 +366,6 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -393,8 +366,6 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
} }
} }
pMerger->tagBuf = calloc(1, maxBufSize);
// we change the capacity of schema to denote that there is only one row in temp buffer // we change the capacity of schema to denote that there is only one row in temp buffer
pMerger->pDesc->pColumnModel->capacity = 1; pMerger->pDesc->pColumnModel->capacity = 1;
...@@ -404,24 +375,22 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -404,24 +375,22 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pQueryInfo->limit.offset = pQueryInfo->prjOffset; pQueryInfo->limit.offset = pQueryInfo->prjOffset;
} }
pMerger->offset = (int32_t)pQueryInfo->limit.offset;
pRes->pLocalMerger = pMerger; pRes->pLocalMerger = pMerger;
pRes->numOfGroups = 0; pRes->numOfGroups = 0;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); // STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); // STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
TSKEY stime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey : pQueryInfo->window.ekey; // TSKEY stime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision); // int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision);
if (pQueryInfo->fillType != TSDB_FILL_NONE) { // if (pQueryInfo->fillType != TSDB_FILL_NONE) {
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); // SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
pMerger->pFillInfo = // pMerger->pFillInfo =
taosCreateFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, 4096, // taosCreateFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, 4096,
(int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, // (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding,
pQueryInfo->interval.slidingUnit, tinfo.precision, pQueryInfo->fillType, pFillCol, pSql); // pQueryInfo->interval.slidingUnit, tinfo.precision, pQueryInfo->fillType, pFillCol, pSql);
} // }
} }
static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage,
...@@ -522,33 +491,30 @@ void tscDestroyLocalMerger(SSqlObj *pSql) { ...@@ -522,33 +491,30 @@ void tscDestroyLocalMerger(SSqlObj *pSql) {
return; return;
} }
SSqlCmd * pCmd = &pSql->cmd; // SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); // SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
// there is no more result, so we release all allocated resource // there is no more result, so we release all allocated resource
SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL); SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL);
if (pLocalMerge != NULL) { if (pLocalMerge != NULL) {
pLocalMerge->pFillInfo = taosDestroyFillInfo(pLocalMerge->pFillInfo); // pLocalMerge->pFillInfo = taosDestroyFillInfo(pLocalMerge->pFillInfo);
if (pLocalMerge->pCtx != NULL) {
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[i];
tVariantDestroy(&pCtx->tag); // if (pLocalMerge->pCtx != NULL) {
tfree(pCtx->resultInfo); // int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
// for (int32_t i = 0; i < numOfExprs; ++i) {
if (pCtx->tagInfo.pTagCtxList != NULL) { // SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[i];
tfree(pCtx->tagInfo.pTagCtxList); //
} // tVariantDestroy(&pCtx->tag);
} // tfree(pCtx->resultInfo);
//
tfree(pLocalMerge->pCtx); // if (pCtx->tagInfo.pTagCtxList != NULL) {
} // tfree(pCtx->tagInfo.pTagCtxList);
// }
tfree(pLocalMerge->prevRowOfInput); // }
//
// tfree(pLocalMerge->pCtx);
// }
tfree(pLocalMerge->pTempBuffer);
tfree(pLocalMerge->pResultBuf); tfree(pLocalMerge->pResultBuf);
if (pLocalMerge->pLoserTree) { if (pLocalMerge->pLoserTree) {
...@@ -556,9 +522,6 @@ void tscDestroyLocalMerger(SSqlObj *pSql) { ...@@ -556,9 +522,6 @@ void tscDestroyLocalMerger(SSqlObj *pSql) {
tfree(pLocalMerge->pLoserTree); tfree(pLocalMerge->pLoserTree);
} }
tfree(pLocalMerge->pFinalRes);
tfree(pLocalMerge->discardData);
tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel, pLocalMerge->finalModel, tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel, pLocalMerge->finalModel,
pLocalMerge->numOfVnode); pLocalMerge->numOfVnode);
for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) { for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) {
...@@ -905,227 +868,6 @@ void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOn ...@@ -905,227 +868,6 @@ void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOn
} }
} }
void savePrevRecordAndSetupFillInfo(SLocalMerger *pLocalMerge, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
// discard following dataset in the same group and reset the interpolation information
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (pFillInfo != NULL) {
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision);
taosResetFillInfo(pFillInfo, revisedSTime);
}
pLocalMerge->discard = true;
pLocalMerge->discardData->num = 0;
SColumnModel *pModel = pLocalMerge->pDesc->pColumnModel;
tColModelAppend(pModel, pLocalMerge->discardData, pLocalMerge->prevRowOfInput, 0, 1, 1);
}
static void genFinalResWithoutFill(SSqlRes* pRes, SLocalMerger *pLocalMerge, SQueryInfo* pQueryInfo) {
assert(pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE);
tFilePage * pBeforeFillData = pLocalMerge->pResultBuf;
pRes->data = pLocalMerge->pFinalRes;
pRes->numOfRows = (int32_t) pBeforeFillData->num;
if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = (int32_t) pBeforeFillData->num;
tColModelErase(pLocalMerge->finalModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
/* remove the hole in column model */
tColModelCompact(pLocalMerge->finalModel, pBeforeFillData, prevSize);
pRes->numOfRows -= (int32_t) pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0;
} else {
pQueryInfo->limit.offset -= pRes->numOfRows;
pRes->numOfRows = 0;
}
}
if (pRes->numOfRowsGroup >= pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) {
pRes->numOfRows = 0;
pBeforeFillData->num = 0;
pLocalMerge->discard = true;
return;
}
pRes->numOfRowsGroup += pRes->numOfRows;
// impose the limitation of output rows on the final result
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) {
int32_t prevSize = (int32_t)pBeforeFillData->num;
int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit);
assert(overflow < pRes->numOfRows);
pRes->numOfRowsGroup = pQueryInfo->limit.limit;
pRes->numOfRows -= overflow;
pBeforeFillData->num -= overflow;
tColModelCompact(pLocalMerge->finalModel, pBeforeFillData, prevSize);
// set remain data to be discarded, and reset the interpolation information
savePrevRecordAndSetupFillInfo(pLocalMerge, pQueryInfo, pLocalMerge->pFillInfo);
}
memcpy(pRes->data, pBeforeFillData->data, (size_t)(pRes->numOfRows * pLocalMerge->finalModel->rowSize));
pRes->numOfClauseTotal += pRes->numOfRows;
pBeforeFillData->num = 0;
}
/*
* Note: pRes->pLocalMerge may be null, due to the fact that "tscDestroyLocalMerger" is called
* by "interuptHandler" function in shell
*/
static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutput) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
tFilePage *pBeforeFillData = pLocalMerge->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SFillInfo *pFillInfo = pLocalMerge->pFillInfo;
// todo extract function
int64_t actualETime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey;
void** pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pResPages[i] = calloc(1, pField->bytes * pLocalMerge->resColModel->capacity);
}
while (1) {
int64_t newRows = taosFillResultDataBlock(pFillInfo, pResPages, pLocalMerge->resColModel->capacity);
if (pQueryInfo->limit.offset < newRows) {
newRows -= pQueryInfo->limit.offset;
if (pQueryInfo->limit.offset > 0) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
memmove(pResPages[i], ((char*)pResPages[i]) + pField->bytes * pQueryInfo->limit.offset,
(size_t)(newRows * pField->bytes));
}
}
pRes->data = pLocalMerge->pFinalRes;
pRes->numOfRows = (int32_t) newRows;
pQueryInfo->limit.offset = 0;
break;
} else {
pQueryInfo->limit.offset -= newRows;
pRes->numOfRows = 0;
if (!taosFillHasMoreResults(pFillInfo)) {
if (!doneOutput) { // reduce procedure has not completed yet, but current results for fill are exhausted
break;
}
// all output in current group are completed
int32_t totalRemainRows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, actualETime, pLocalMerge->resColModel->capacity);
if (totalRemainRows <= 0) {
break;
}
}
}
}
if (pRes->numOfRows > 0) {
int32_t currentTotal = (int32_t)(pRes->numOfRowsGroup + pRes->numOfRows);
if (pQueryInfo->limit.limit >= 0 && currentTotal > pQueryInfo->limit.limit) {
int32_t overflow = (int32_t)(currentTotal - pQueryInfo->limit.limit);
pRes->numOfRows -= overflow;
assert(pRes->numOfRows >= 0);
/* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupFillInfo(pLocalMerge, pQueryInfo, pFillInfo);
}
int32_t offset = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i], (size_t)(pField->bytes * pRes->numOfRows));
offset += pField->bytes;
}
pRes->numOfRowsGroup += pRes->numOfRows;
pRes->numOfClauseTotal += pRes->numOfRows;
}
pBeforeFillData->num = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
tfree(pResPages[i]);
}
tfree(pResPages);
}
static void savePreviousRow(SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
SColumnModel *pColumnModel = pLocalMerge->pDesc->pColumnModel;
assert(pColumnModel->capacity == 1 && tmpBuffer->num == 1);
// copy to previous temp buffer
for (int32_t i = 0; i < pColumnModel->numOfCols; ++i) {
SSchema *pSchema = getColumnModelSchema(pColumnModel, i);
int16_t offset = getColumnModelOffset(pColumnModel, i);
memcpy(pLocalMerge->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes);
}
tmpBuffer->num = 0;
pLocalMerge->hasPrevRow = true;
}
static void doExecuteFinalMerge( SLocalMerger *pLocalMerge, int32_t numOfExpr, bool needInit) {
// the tag columns need to be set before all functions execution
for (int32_t j = 0; j < numOfExpr; ++j) {
SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[j];
// tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer
int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) {
tVariantDestroy(&pCtx->tag);
char* input = pCtx->pInput;
if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) {
assert(varDataLen(input) <= pCtx->inputBytes);
tVariantCreateFromBinary(&pCtx->tag, varDataVal(input), varDataLen(input), pCtx->inputType);
} else {
tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType);
}
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
// SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, j); // TODO this data is from
// pCtx->param[0].i64 = pExpr->base.param[0].i64;
}
pCtx->currentStage = MERGE_STAGE;
if (needInit) {
aAggs[pCtx->functionId].init(pCtx);
}
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pLocalMerge->pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
aAggs[functionId].mergeFunc(&pLocalMerge->pCtx[j]);
}
}
//TODO it is not ordered, fix it //TODO it is not ordered, fix it
static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex, bool* hasPrev) { static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex, bool* hasPrev) {
int32_t size = (int32_t) taosArrayGetSize(pColumnList); int32_t size = (int32_t) taosArrayGetSize(pColumnList);
...@@ -1242,39 +984,27 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S ...@@ -1242,39 +984,27 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
tfree(add); tfree(add);
} }
static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { //static int64_t getNumOfResultLocal(SQLFunctionCtx *pCtx, int32_t numOfExprs) {
if (pLocalMerge->hasUnprocessedRow) { // int64_t maxOutput = 0;
pLocalMerge->hasUnprocessedRow = false; //
// for (int32_t j = 0; j < numOfExprs; ++j) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); // /*
size_t size = tscSqlExprNumOfExprs(pQueryInfo); // * ts, tag, tagprj function can not decide the output number of current query
// * the number of output result is decided by main output
doExecuteFinalMerge(pLocalMerge, size, true); // */
savePreviousRow(pLocalMerge, tmpBuffer); // int32_t functionId = pCtx[j].functionId;
} // if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG) {
} // continue;
// }
static int64_t getNumOfResultLocal(SQLFunctionCtx *pCtx, int32_t numOfExprs) { //
int64_t maxOutput = 0; // SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
// if (maxOutput < pResInfo->numOfRes) {
for (int32_t j = 0; j < numOfExprs; ++j) { // maxOutput = pResInfo->numOfRes;
/* // }
* ts, tag, tagprj function can not decide the output number of current query // }
* the number of output result is decided by main output //
*/ // return maxOutput;
int32_t functionId = pCtx[j].functionId; //}
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG) {
continue;
}
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (maxOutput < pResInfo->numOfRes) {
maxOutput = pResInfo->numOfRes;
}
}
return maxOutput;
}
/* /*
* in handling the top/bottom query, which produce more than one rows result, * in handling the top/bottom query, which produce more than one rows result,
...@@ -1282,38 +1012,38 @@ static int64_t getNumOfResultLocal(SQLFunctionCtx *pCtx, int32_t numOfExprs) { ...@@ -1282,38 +1012,38 @@ static int64_t getNumOfResultLocal(SQLFunctionCtx *pCtx, int32_t numOfExprs) {
* filled with the same result, which is the tags, specified in group by clause * filled with the same result, which is the tags, specified in group by clause
* *
*/ */
static void fillMultiRowsOfTagsVal(SLocalMerger *pLocalMerge, int32_t numOfRes, int32_t numOfExprs) { //static void fillMultiRowsOfTagsVal(SLocalMerger *pLocalMerge, int32_t numOfRes, int32_t numOfExprs) {
for (int32_t k = 0; k < numOfExprs; ++k) { // for (int32_t k = 0; k < numOfExprs; ++k) {
SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k]; // SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k];
if (pCtx->functionId != TSDB_FUNC_TAG) { // if (pCtx->functionId != TSDB_FUNC_TAG) {
continue; // continue;
} // }
//
int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result // int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result
memset(pLocalMerge->tagBuf, 0, (size_t)pLocalMerge->tagBufLen); // memset(pLocalMerge->tagBuf, 0, (size_t)pLocalMerge->tagBufLen);
memcpy(pLocalMerge->tagBuf, pCtx->pOutput, (size_t)pCtx->outputBytes); // memcpy(pLocalMerge->tagBuf, pCtx->pOutput, (size_t)pCtx->outputBytes);
//
for (int32_t i = 0; i < inc; ++i) { // for (int32_t i = 0; i < inc; ++i) {
pCtx->pOutput += pCtx->outputBytes; // pCtx->pOutput += pCtx->outputBytes;
memcpy(pCtx->pOutput, pLocalMerge->tagBuf, (size_t)pCtx->outputBytes); // memcpy(pCtx->pOutput, pLocalMerge->tagBuf, (size_t)pCtx->outputBytes);
} // }
} // }
} //}
int32_t finalizeRes(SLocalMerger *pLocalMerge, int32_t numOfExprs) {
for (int32_t k = 0; k < numOfExprs; ++k) {
SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k];
aAggs[pCtx->functionId].xFinalize(pCtx);
}
pLocalMerge->hasPrevRow = false;
int32_t numOfRes = (int32_t)getNumOfResultLocal(pLocalMerge->pCtx, numOfExprs);
pLocalMerge->pResultBuf->num += numOfRes;
fillMultiRowsOfTagsVal(pLocalMerge, numOfRes, numOfExprs); //int32_t finalizeRes(SLocalMerger *pLocalMerge, int32_t numOfExprs) {
return numOfRes; // for (int32_t k = 0; k < numOfExprs; ++k) {
} // SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k];
// aAggs[pCtx->functionId].xFinalize(pCtx);
// }
//
// pLocalMerge->hasPrevRow = false;
//
// int32_t numOfRes = (int32_t)getNumOfResultLocal(pLocalMerge->pCtx, numOfExprs);
// pLocalMerge->pResultBuf->num += numOfRes;
//
// fillMultiRowsOfTagsVal(pLocalMerge, numOfRes, numOfExprs);
// return numOfRes;
//}
/* /*
* points merge: * points merge:
...@@ -1322,29 +1052,29 @@ int32_t finalizeRes(SLocalMerger *pLocalMerge, int32_t numOfExprs) { ...@@ -1322,29 +1052,29 @@ int32_t finalizeRes(SLocalMerger *pLocalMerge, int32_t numOfExprs) {
* results generated by simple aggregation function, we merge them all into one points * results generated by simple aggregation function, we merge them all into one points
* *Exception*: column projection query, required no merge procedure * *Exception*: column projection query, required no merge procedure
*/ */
bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { //bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
int32_t ret = 0; // merge all result by default // int32_t ret = 0; // merge all result by default
//
int16_t functionId = pLocalMerge->pCtx[0].functionId; // int16_t functionId = pLocalMerge->pCtx[0].functionId;
//
// todo opt performance // // todo opt performance
if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query // if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query
ret = 1; // disable merge procedure // ret = 1; // disable merge procedure
} else { // } else {
tOrderDescriptor *pDesc = pLocalMerge->pDesc; // tOrderDescriptor *pDesc = pLocalMerge->pDesc;
if (pDesc->orderInfo.numOfCols > 0) { // if (pDesc->orderInfo.numOfCols > 0) {
if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc // if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc
// todo refactor comparator // // todo refactor comparator
ret = compare_a(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); // ret = compare_a(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data);
} else { // desc // } else { // desc
ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); // ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data);
} // }
} // }
} // }
//
/* if ret == 0, means the result belongs to the same group */ // /* if ret == 0, means the result belongs to the same group */
return (ret == 0); // return (ret == 0);
} //}
bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) { bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
int32_t ret = 0; int32_t ret = 0;
...@@ -1356,90 +1086,6 @@ bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, ...@@ -1356,90 +1086,6 @@ bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index,
return (ret == 0); return (ret == 0);
} }
static bool reachGroupResultLimit(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
return (pRes->numOfGroups >= pQueryInfo->slimit.limit && pQueryInfo->slimit.limit >= 0);
}
static bool saveGroupResultInfo(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
if (pRes->numOfRowsGroup > 0) {
pRes->numOfGroups += 1;
}
// the output group is limited by the slimit clause
if (reachGroupResultLimit(pQueryInfo, pRes)) {
return true;
}
// pRes->pGroupRec = realloc(pRes->pGroupRec, pRes->numOfGroups*sizeof(SResRec));
// pRes->pGroupRec[pRes->numOfGroups-1].numOfRows = pRes->numOfRows;
// pRes->pGroupRec[pRes->numOfGroups-1].numOfClauseTotal = pRes->numOfClauseTotal;
return false;
}
/**
*
* @param pSql
* @param pLocalMerge
* @param noMoreCurrentGroupRes
* @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups
*/
bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurrentGroupRes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
tFilePage * pResBuf = pLocalMerge->pResultBuf;
SColumnModel *pModel = pLocalMerge->resColModel;
pRes->code = TSDB_CODE_SUCCESS;
/*
* Ignore the output of the current group since this group is skipped by user
* We set the numOfRows to be 0 and discard the possible remain results.
*/
if (pQueryInfo->slimit.offset > 0) {
pRes->numOfRows = 0;
pQueryInfo->slimit.offset -= 1;
pLocalMerge->discard = !noMoreCurrentGroupRes;
if (pLocalMerge->discard) {
SColumnModel *pInternModel = pLocalMerge->pDesc->pColumnModel;
tColModelAppend(pInternModel, pLocalMerge->discardData, pLocalMerge->pTempBuffer->data, 0, 1, 1);
}
return false;
}
tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) {
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalMerge->finalModel->rowSize);
}
// no interval query, no fill operation
if (pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
genFinalResWithoutFill(pRes, pLocalMerge, pQueryInfo);
} else {
SFillInfo* pFillInfo = pLocalMerge->pFillInfo;
if (pFillInfo != NULL) {
TSKEY ekey = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey;
taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, ekey);
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
doFillResult(pSql, pLocalMerge, noMoreCurrentGroupRes);
}
return true;
}
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset output buffer to the beginning void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset output buffer to the beginning
size_t t = tscSqlExprNumOfExprs(pQueryInfo); size_t t = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < t; ++i) { for (int32_t i = 0; i < t; ++i) {
...@@ -1454,305 +1100,10 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset ...@@ -1454,305 +1100,10 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset
memset(pLocalMerge->pResultBuf, 0, pLocalMerge->nResultBufSize + sizeof(tFilePage)); memset(pLocalMerge->pResultBuf, 0, pLocalMerge->nResultBufSize + sizeof(tFilePage));
} }
static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalMerger *pLocalMerge) {
// In handling data in other groups, we need to reset the interpolation information for a new group data
pRes->numOfRows = 0;
pRes->numOfRowsGroup = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
pQueryInfo->limit.offset = pLocalMerge->offset;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
// for group result interpolation, do not return if not data is generated
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
TSKEY skey = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey:pQueryInfo->window.ekey;//MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t newTime = taosTimeTruncate(skey, &pQueryInfo->interval, tinfo.precision);
taosResetFillInfo(pLocalMerge->pFillInfo, newTime);
}
}
static bool isAllSourcesCompleted(SLocalMerger *pLocalMerge) { static bool isAllSourcesCompleted(SLocalMerger *pLocalMerge) {
return (pLocalMerge->numOfBuffer == pLocalMerge->numOfCompleted); return (pLocalMerge->numOfBuffer == pLocalMerge->numOfCompleted);
} }
static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SFillInfo *pFillInfo = pLocalMerge->pFillInfo;
if (pFillInfo != NULL && taosFillHasMoreResults(pFillInfo)) {
assert(pQueryInfo->fillType != TSDB_FILL_NONE);
tFilePage *pFinalDataBuf = pLocalMerge->pResultBuf;
int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1));
// the first column must be the timestamp column
int32_t rows = (int32_t) getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalMerge->resColModel->capacity);
if (rows > 0) { // do fill gap
doFillResult(pSql, pLocalMerge, false);
}
return true;
} else {
return false;
}
}
static bool doHandleLastRemainData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SFillInfo *pFillInfo = pLocalMerge->pFillInfo;
bool prevGroupCompleted = (!pLocalMerge->discard) && pLocalMerge->hasUnprocessedRow;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
if ((isAllSourcesCompleted(pLocalMerge) && !pLocalMerge->hasPrevRow) || pLocalMerge->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) {
// if fillType == TSDB_FILL_NONE, return directly
if (pQueryInfo->fillType != TSDB_FILL_NONE &&
((pRes->numOfRowsGroup < pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) || (pQueryInfo->limit.limit < 0))) {
int64_t etime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey : pQueryInfo->window.skey;
int32_t rows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalMerge->resColModel->capacity);
if (rows > 0) {
doFillResult(pSql, pLocalMerge, true);
}
}
/*
* 1. numOfRows == 0, means no interpolation results are generated.
* 2. if all local data sources are consumed, and no un-processed rows exist.
*
* No results will be generated and query completed.
*/
if (pRes->numOfRows > 0 || (isAllSourcesCompleted(pLocalMerge) && (!pLocalMerge->hasUnprocessedRow))) {
return true;
}
// start to process result for a new group and save the result info of previous group
if (saveGroupResultInfo(pSql)) {
return true;
}
resetEnvForNewResultset(pRes, pCmd, pLocalMerge);
}
return false;
}
static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k];
pCtx->pOutput += pCtx->outputBytes * numOfRes;
// set the correct output timestamp column position
if (pCtx->functionId == TSDB_FUNC_TOP || pCtx->functionId == TSDB_FUNC_BOTTOM) {
pCtx->ptsOutputBuf = ((char *)pCtx->ptsOutputBuf + TSDB_KEYSIZE * numOfRes);
}
}
doExecuteFinalMerge(pLocalMerge, size, true);
}
int32_t tscDoLocalMerge(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
tscResetForNextRetrieve(pRes);
assert(pSql->signature == pSql);
if (pRes->pLocalMerger == NULL) { // all data has been processed
if (pRes->code == TSDB_CODE_SUCCESS) {
return pRes->code;
}
tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code));
return pRes->code;
}
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
tFilePage *tmpBuffer = pLocalMerge->pTempBuffer;
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
if (doHandleLastRemainData(pSql)) {
return TSDB_CODE_SUCCESS;
}
if (doBuildFilledResultForGroup(pSql)) {
return TSDB_CODE_SUCCESS;
}
SLoserTreeInfo *pTree = pLocalMerge->pLoserTree;
// clear buffer
handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer);
SColumnModel *pModel = pLocalMerge->pDesc->pColumnModel;
while (1) {
if (isAllSourcesCompleted(pLocalMerge)) {
break;
}
#ifdef _DEBUG_VIEW
printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index);
#endif
assert((pTree->pNode[0].index < pLocalMerge->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0);
// chosen from loser tree
SLocalDataSource *pOneDataSrc = pLocalMerge->pLocalDataSrc[pTree->pNode[0].index];
tColModelAppend(pModel, tmpBuffer, pOneDataSrc->filePage.data, pOneDataSrc->rowIdx, 1,
pOneDataSrc->pMemBuffer->pColumnModel->capacity);
#if defined(_DEBUG_VIEW)
printf("chosen row:\t");
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->num, pModel->capacity, colInfo);
#endif
if (pLocalMerge->discard) {
assert(pLocalMerge->hasUnprocessedRow == false);
/* current record belongs to the same group of previous record, need to discard it */
if (isSameGroup(pCmd, pLocalMerge, pLocalMerge->discardData->data, tmpBuffer)) {
tmpBuffer->num = 0;
pOneDataSrc->rowIdx += 1;
adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree);
// all inputs are exhausted, abort current process
if (isAllSourcesCompleted(pLocalMerge)) {
break;
}
// data belongs to the same group needs to be discarded
continue;
} else {
pLocalMerge->discard = false;
pLocalMerge->discardData->num = 0;
if (saveGroupResultInfo(pSql)) {
return TSDB_CODE_SUCCESS;
}
resetEnvForNewResultset(pRes, pCmd, pLocalMerge);
}
}
if (pLocalMerge->hasPrevRow) {
if (needToMerge(pQueryInfo, pLocalMerge, tmpBuffer)) {
// belong to the group of the previous row, continue process it
doExecuteFinalMerge(pLocalMerge, numOfExprs, false);
// copy to buffer
savePreviousRow(pLocalMerge, tmpBuffer);
} else {
/*
* current row does not belong to the group of previous row.
* so the processing of previous group is completed.
*/
int32_t numOfRes = finalizeRes(pLocalMerge, numOfExprs);
bool sameGroup = isSameGroup(pCmd, pLocalMerge, pLocalMerge->prevRowOfInput, tmpBuffer);
tFilePage *pResBuf = pLocalMerge->pResultBuf;
/*
* if the previous group does NOT generate any result (pResBuf->num == 0),
* continue to process results instead of return results.
*/
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalMerge->resColModel->capacity)) {
// does not belong to the same group
bool notSkipped = genFinalResults(pSql, pLocalMerge, !sameGroup);
// this row needs to discard, since it belongs to the group of previous
if (pLocalMerge->discard && sameGroup) {
pLocalMerge->hasUnprocessedRow = false;
tmpBuffer->num = 0;
} else { // current row does not belongs to the previous group, so it is not be handled yet.
pLocalMerge->hasUnprocessedRow = true;
}
resetOutputBuf(pQueryInfo, pLocalMerge);
pOneDataSrc->rowIdx += 1;
// here we do not check the return value
adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree);
if (pRes->numOfRows == 0) {
handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer);
if (!sameGroup) {
/*
* previous group is done, prepare for the next group
* If previous group is not skipped, keep it in pRes->numOfGroups
*/
if (notSkipped && saveGroupResultInfo(pSql)) {
return TSDB_CODE_SUCCESS;
}
resetEnvForNewResultset(pRes, pCmd, pLocalMerge);
}
} else {
/*
* if next record belongs to a new group, we do not handle this record here.
* We start the process in a new round.
*/
if (sameGroup) {
handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer);
}
}
// current group has no result,
if (pRes->numOfRows == 0) {
continue;
} else {
return TSDB_CODE_SUCCESS;
}
} else { // result buffer is not full
doProcessResultInNextWindow(pSql, numOfRes);
savePreviousRow(pLocalMerge, tmpBuffer);
}
}
} else {
doExecuteFinalMerge(pLocalMerge, numOfExprs,true);
savePreviousRow(pLocalMerge, tmpBuffer); // copy the processed row to buffer
}
pOneDataSrc->rowIdx += 1;
adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree);
}
if (pLocalMerge->hasPrevRow) {
finalizeRes(pLocalMerge, numOfExprs);
}
if (pLocalMerge->pResultBuf->num) {
genFinalResults(pSql, pLocalMerge, true);
}
return TSDB_CODE_SUCCESS;
}
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) { void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) {
SSqlRes *pRes = &pObj->res; SSqlRes *pRes = &pObj->res;
if (pRes->pLocalMerger != NULL) { if (pRes->pLocalMerger != NULL) {
...@@ -1823,7 +1174,6 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_ ...@@ -1823,7 +1174,6 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \ #define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
(data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes) (data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes)
static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel *pModel, int32_t rowIndex, static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel *pModel, int32_t rowIndex,
int32_t maxRows) { int32_t maxRows) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
...@@ -2116,14 +1466,14 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -2116,14 +1466,14 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
} }
SSDataBlock* doSLimit(void* param, bool* newgroup) { SSDataBlock* doSLimit(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*)param; SOperatorInfo *pOperator = (SOperatorInfo *)param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
SSLimitOperatorInfo *pInfo = pOperator->info; SSLimitOperatorInfo *pInfo = pOperator->info;
SSDataBlock* pBlock = NULL; SSDataBlock *pBlock = NULL;
while (1) { while (1) {
pBlock = skipGroupBlock(pOperator, newgroup); pBlock = skipGroupBlock(pOperator, newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -2163,90 +1513,9 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { ...@@ -2163,90 +1513,9 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) {
} }
} }
/*
if (!pInfo->hasPrev) {
pInfo->groupTotal = 1;
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
} else {
bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow);
if (!sameGroup) { // reset info for new group data
pInfo->rowsTotal = 0;
pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
} else { // data in current group has reached the limit, ignore the remain data of this group
if (pInfo->limit.limit > 0 && (pInfo->rowsTotal >= pInfo->limit.limit)) {
continue;
}
}
}
*/
// if (pInfo->currentGroupOffset == 0) {
// if (pInfo->currentOffset == 0) { // TODO refactor
// break;
// } else if (pInfo->currentOffset >= pBlock->info.rows) {
// pInfo->currentOffset -= pBlock->info.rows;
// } else {
// int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
// pBlock->info.rows = remain;
//
// // move the remain rows of this data block to the front.
// for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
// SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
//
// int16_t bytes = pColInfoData->info.bytes;
// memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
// }
//
// pInfo->currentOffset = 0;
// break;
// }
// } else {
// if (pInfo->hasPrev) {
// // Check if current data block belongs to current result group or not
// bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow);
// if (sameGroup) {
// continue; // ignore the data block of the same group and try next
// } else {
// //update the group column data by using the current group.
// savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
//
// pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
// pInfo->rowsTotal = 0;
//
// if ((--pInfo->currentGroupOffset) == 0) {
// if (pInfo->currentOffset == 0) { // TODO refactor
// break;
// } else if (pInfo->currentOffset >= pBlock->info.rows) {
// pInfo->currentOffset -= pBlock->info.rows;
// } else {
// int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
// pBlock->info.rows = remain;
//
// // move the remain rows of this data block to the front.
// for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
// SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
//
// int16_t bytes = pColInfoData->info.bytes;
// memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
// }
//
// pInfo->currentOffset = 0;
// break;
// }
// }
// }
// } else {
// savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
// }
// }
// }
// if (!pInfo->hasPrev || !isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow)) {
// pInfo->groupTotal += 1;
if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort
return NULL; return NULL;
} }
// }
if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) { if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) {
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal); pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal);
...@@ -2256,11 +1525,10 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { ...@@ -2256,11 +1525,10 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); // setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
} else { } else {
pInfo->rowsTotal += pBlock->info.rows; pInfo->rowsTotal += pBlock->info.rows;
} }
return pBlock; return pBlock;
} }
...@@ -85,15 +85,15 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool ...@@ -85,15 +85,15 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd); static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd);
static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode); static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken); static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken);
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding); static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem); static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem);
static int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql); static int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql);
static int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode); static int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode, SSchema* pSchema); static int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema);
static int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); static int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
...@@ -110,14 +110,14 @@ static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); ...@@ -110,14 +110,14 @@ static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
static bool hasNormalColumnFilter(SQueryInfo* pQueryInfo); static bool hasNormalColumnFilter(SQueryInfo* pQueryInfo);
static int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t index, SQuerySqlNode* pQuerySqlNode, SSqlObj* pSql); static int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t index, SSqlNode* pSqlNode, SSqlObj* pSql);
static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDbInfo* pCreateDbSql); static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDbInfo* pCreateDbSql);
static int32_t getColumnIndexByName(SSqlCmd* pCmd, const SStrToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getColumnIndexByName(SSqlCmd* pCmd, const SStrToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t getTableIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getTableIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t getTableIndexImpl(SStrToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex); static int32_t getTableIndexImpl(SStrToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode); static int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate); static int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate);
static SColumnList createColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex); static SColumnList createColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex);
...@@ -125,7 +125,7 @@ static SColumnList createColumnList(int32_t num, int16_t tableIndex, int32_t col ...@@ -125,7 +125,7 @@ static SColumnList createColumnList(int32_t num, int16_t tableIndex, int32_t col
static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* pInfo); static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* pInfo);
static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index); static int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index);
static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, uint64_t *uid); static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, uint64_t *uid);
static bool validateDebugFlag(int32_t v); static bool validateDebugFlag(int32_t v);
static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
...@@ -617,7 +617,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -617,7 +617,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_SELECT: { case TSDB_SQL_SELECT: {
const char* msg1 = "columns in select clause not identical"; const char* msg1 = "columns in select clause not identical";
for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) { size_t size = taosArrayGetSize(pInfo->list);
for (int32_t i = pCmd->numOfClause; i < size; ++i) {
SQueryInfo* p = tscGetQueryInfoS(pCmd, i); SQueryInfo* p = tscGetQueryInfoS(pCmd, i);
if (p == NULL) { if (p == NULL) {
pRes->code = terrno; pRes->code = terrno;
...@@ -625,11 +626,11 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -625,11 +626,11 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
} }
assert(pCmd->numOfClause == pInfo->subclauseInfo.numOfClause); assert(pCmd->numOfClause == size);
for (int32_t i = pCmd->clauseIndex; i < pInfo->subclauseInfo.numOfClause; ++i) { for (int32_t i = pCmd->clauseIndex; i < size; ++i) {
SQuerySqlNode* pQuerySqlNode = pInfo->subclauseInfo.pClause[i]; SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i);
tscTrace("%p start to parse %dth subclause, total:%d", pSql, i, pInfo->subclauseInfo.numOfClause); tscTrace("%p start to parse %dth subclause, total:%"PRId64, pSql, i, size);
if ((code = validateSqlNode(pSql, pQuerySqlNode, i)) != TSDB_CODE_SUCCESS) { if ((code = validateSqlNode(pSql, pSqlNode, i)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -784,7 +785,7 @@ static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryIn ...@@ -784,7 +785,7 @@ static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryIn
return addPrimaryTsColumnForTimeWindowQuery(pQueryInfo); return addPrimaryTsColumnForTimeWindowQuery(pQueryInfo);
} }
int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode) { int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) {
const char* msg2 = "interval cannot be less than 10 ms"; const char* msg2 = "interval cannot be less than 10 ms";
const char* msg3 = "sliding cannot be used without interval"; const char* msg3 = "sliding cannot be used without interval";
...@@ -793,8 +794,8 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod ...@@ -793,8 +794,8 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (!TPARSER_HAS_TOKEN(pQuerySqlNode->interval.interval)) { if (!TPARSER_HAS_TOKEN(pSqlNode->interval.interval)) {
if (TPARSER_HAS_TOKEN(pQuerySqlNode->sliding)) { if (TPARSER_HAS_TOKEN(pSqlNode->sliding)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
...@@ -807,7 +808,7 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod ...@@ -807,7 +808,7 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod
} }
// interval is not null // interval is not null
SStrToken *t = &pQuerySqlNode->interval.interval; SStrToken *t = &pSqlNode->interval.interval;
if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) { if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -824,11 +825,11 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod ...@@ -824,11 +825,11 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod
} }
} }
if (parseIntervalOffset(pCmd, pQueryInfo, &pQuerySqlNode->interval.offset) != TSDB_CODE_SUCCESS) { if (parseIntervalOffset(pCmd, pQueryInfo, &pSqlNode->interval.offset) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
if (parseSlidingClause(pCmd, pQueryInfo, &pQuerySqlNode->sliding) != TSDB_CODE_SUCCESS) { if (parseSlidingClause(pCmd, pQueryInfo, &pSqlNode->sliding) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -836,19 +837,19 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod ...@@ -836,19 +837,19 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySqlNod
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo); return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
} }
int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode * pQuerySqlNode) { int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) {
const char* msg1 = "gap should be fixed time window"; const char* msg1 = "gap should be fixed time window";
const char* msg2 = "only one type time window allowed"; const char* msg2 = "only one type time window allowed";
const char* msg3 = "invalid column name"; const char* msg3 = "invalid column name";
const char* msg4 = "invalid time window"; const char* msg4 = "invalid time window";
// no session window // no session window
if (!TPARSER_HAS_TOKEN(pQuerySqlNode->sessionVal.gap)) { if (!TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SStrToken* col = &pQuerySqlNode->sessionVal.col; SStrToken* col = &pSqlNode->sessionVal.col;
SStrToken* gap = &pQuerySqlNode->sessionVal.gap; SStrToken* gap = &pSqlNode->sessionVal.gap;
char timeUnit = 0; char timeUnit = 0;
if (parseNatualDuration(gap->z, gap->n, &pQueryInfo->sessionWindow.gap, &timeUnit) != TSDB_CODE_SUCCESS) { if (parseNatualDuration(gap->z, gap->n, &pQueryInfo->sessionWindow.gap, &timeUnit) != TSDB_CODE_SUCCESS) {
...@@ -4714,9 +4715,9 @@ int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { ...@@ -4714,9 +4715,9 @@ int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode) { int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) {
SArray* pFillToken = pQuerySqlNode->fillType; SArray* pFillToken = pSqlNode->fillType;
if (pQuerySqlNode->fillType == NULL) { if (pSqlNode->fillType == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4854,7 +4855,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { ...@@ -4854,7 +4855,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
} }
} }
int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode, SSchema* pSchema) { int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) {
const char* msg0 = "only support order by primary timestamp"; const char* msg0 = "only support order by primary timestamp";
const char* msg1 = "invalid column name"; const char* msg1 = "invalid column name";
const char* msg2 = "order by primary timestamp or first tag in groupby clause allowed"; const char* msg2 = "order by primary timestamp or first tag in groupby clause allowed";
...@@ -4869,11 +4870,11 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode ...@@ -4869,11 +4870,11 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode
pQueryInfo->order.orderColId = 0; pQueryInfo->order.orderColId = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pQuerySqlNode->pSortOrder == NULL) { if (pSqlNode->pSortOrder == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray* pSortorder = pQuerySqlNode->pSortOrder; SArray* pSortorder = pSqlNode->pSortOrder;
/* /*
* for table query, there is only one or none order option is allowed, which is the * for table query, there is only one or none order option is allowed, which is the
...@@ -4941,7 +4942,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode ...@@ -4941,7 +4942,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode
if (orderByTags) { if (orderByTags) {
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
tVariantListItem* p1 = taosArrayGet(pQuerySqlNode->pSortOrder, 0); tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderType = p1->sortOrder; pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
} else if (isTopBottomQuery(pQueryInfo)) { } else if (isTopBottomQuery(pQueryInfo)) {
/* order of top/bottom query in interval is not valid */ /* order of top/bottom query in interval is not valid */
...@@ -4953,12 +4954,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode ...@@ -4953,12 +4954,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
tVariantListItem* p1 = taosArrayGet(pQuerySqlNode->pSortOrder, 0); tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = p1->sortOrder; pQueryInfo->order.order = p1->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
tVariantListItem* p1 = taosArrayGet(pQuerySqlNode->pSortOrder, 0); tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = p1->sortOrder; pQueryInfo->order.order = p1->sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
...@@ -4971,7 +4972,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode ...@@ -4971,7 +4972,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode
} }
if (s == 2) { if (s == 2) {
tVariantListItem *pItem = taosArrayGet(pQuerySqlNode->pSortOrder, 0); tVariantListItem *pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
if (orderByTags) { if (orderByTags) {
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; pQueryInfo->groupbyExpr.orderType = pItem->sortOrder;
...@@ -4980,7 +4981,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode ...@@ -4980,7 +4981,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} }
pItem = taosArrayGet(pQuerySqlNode->pSortOrder, 1); pItem = taosArrayGet(pSqlNode->pSortOrder, 1);
tVariant* pVar2 = &pItem->pVar; tVariant* pVar2 = &pItem->pVar;
SStrToken cname = {pVar2->nLen, pVar2->nType, pVar2->pz}; SStrToken cname = {pVar2->nLen, pVar2->nType, pVar2->pz};
if (getColumnIndexByName(pCmd, &cname, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { if (getColumnIndexByName(pCmd, &cname, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
...@@ -5015,13 +5016,13 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode ...@@ -5015,13 +5016,13 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
tVariantListItem* pItem = taosArrayGet(pQuerySqlNode->pSortOrder, 0); tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
tVariantListItem* pItem = taosArrayGet(pQuerySqlNode->pSortOrder, 0); tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.order = pItem->sortOrder;
} }
...@@ -5553,7 +5554,7 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) { ...@@ -5553,7 +5554,7 @@ bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo) {
return !(pQueryInfo->window.skey != pQueryInfo->window.ekey && pQueryInfo->interval.interval == 0); return !(pQueryInfo->window.skey != pQueryInfo->window.ekey && pQueryInfo->interval.interval == 0);
} }
int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySqlNode* pQuerySqlNode, SSqlObj* pSql) { int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIndex, SSqlNode* pSqlNode, SSqlObj* pSql) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
const char* msg0 = "soffset/offset can not be less than 0"; const char* msg0 = "soffset/offset can not be less than 0";
...@@ -5561,9 +5562,9 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseI ...@@ -5561,9 +5562,9 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseI
const char* msg2 = "slimit/soffset can not apply to projection query"; const char* msg2 = "slimit/soffset can not apply to projection query";
// handle the limit offset value, validate the limit // handle the limit offset value, validate the limit
pQueryInfo->limit = pQuerySqlNode->limit; pQueryInfo->limit = pSqlNode->limit;
pQueryInfo->clauseLimit = pQueryInfo->limit.limit; pQueryInfo->clauseLimit = pQueryInfo->limit.limit;
pQueryInfo->slimit = pQuerySqlNode->slimit; pQueryInfo->slimit = pSqlNode->slimit;
tscDebug("%p limit:%" PRId64 ", offset:%" PRId64 " slimit:%" PRId64 ", soffset:%" PRId64, pSql, pQueryInfo->limit.limit, tscDebug("%p limit:%" PRId64 ", offset:%" PRId64 " slimit:%" PRId64 ", soffset:%" PRId64, pSql, pQueryInfo->limit.limit,
pQueryInfo->limit.offset, pQueryInfo->slimit.limit, pQueryInfo->slimit.offset); pQueryInfo->limit.offset, pQueryInfo->slimit.limit, pQueryInfo->slimit.offset);
...@@ -6214,12 +6215,12 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { ...@@ -6214,12 +6215,12 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return checkUpdateTagPrjFunctions(pQueryInfo, pCmd); return checkUpdateTagPrjFunctions(pQueryInfo, pCmd);
} }
} }
int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySqlNode* pQuerySqlNode) { int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) {
const char* msg1 = "only one expression allowed"; const char* msg1 = "only one expression allowed";
const char* msg2 = "invalid expression in select clause"; const char* msg2 = "invalid expression in select clause";
const char* msg3 = "invalid function"; const char* msg3 = "invalid function";
SArray* pExprList = pQuerySqlNode->pSelNodeList; SArray* pExprList = pSqlNode->pSelNodeList;
size_t size = taosArrayGetSize(pExprList); size_t size = taosArrayGetSize(pExprList);
if (size != 1) { if (size != 1) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
...@@ -6680,18 +6681,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -6680,18 +6681,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
// if sql specifies db, use it, otherwise use default db // if sql specifies db, use it, otherwise use default db
SStrToken* pName = &(pCreateTable->name); SStrToken* pName = &(pCreateTable->name);
SQuerySqlNode* pQuerySqlNode = pCreateTable->pSelect; SSqlNode* pSqlNode = pCreateTable->pSelect;
if (tscValidateName(pName) != TSDB_CODE_SUCCESS) { if (tscValidateName(pName) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
SFromInfo* pFromInfo = pInfo->pCreateTableInfo->pSelect->from; SRelationInfo* pFromInfo = pInfo->pCreateTableInfo->pSelect->from;
if (pFromInfo == NULL || taosArrayGetSize(pFromInfo->tableList) == 0) { if (pFromInfo == NULL || taosArrayGetSize(pFromInfo->list) == 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
} }
STableNamePair* p1 = taosArrayGet(pFromInfo->tableList, 0); STableNamePair* p1 = taosArrayGet(pFromInfo->list, 0);
SStrToken srcToken = {.z = p1->name.z, .n = p1->name.n, .type = TK_STRING}; SStrToken srcToken = {.z = p1->name.z, .n = p1->name.n, .type = TK_STRING};
if (tscValidateName(&srcToken) != TSDB_CODE_SUCCESS) { if (tscValidateName(&srcToken) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
...@@ -6708,18 +6709,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -6708,18 +6709,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
} }
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
if (validateSelectNodeList(&pSql->cmd, 0, pQueryInfo, pQuerySqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) { if (validateSelectNodeList(&pSql->cmd, 0, pQueryInfo, pSqlNode->pSelNodeList, isSTable, false, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
if (pQuerySqlNode->pWhere != NULL) { // query condition in stream computing if (pSqlNode->pWhere != NULL) { // query condition in stream computing
if (validateWhereNode(pQueryInfo, &pQuerySqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
} }
// set interval value // set interval value
if (validateIntervalNode(pSql, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -6737,7 +6738,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -6737,7 +6738,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return code; return code;
} }
if (pQuerySqlNode->sqlstr.n > TSDB_MAX_SAVED_SQL_LEN) { if (pSqlNode->sqlstr.n > TSDB_MAX_SAVED_SQL_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
} }
...@@ -6755,12 +6756,12 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -6755,12 +6756,12 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
* check if fill operation is available, the fill operation is parsed and executed during query execution, * check if fill operation is available, the fill operation is parsed and executed during query execution,
* not here. * not here.
*/ */
if (pQuerySqlNode->fillType != NULL) { if (pSqlNode->fillType != NULL) {
if (pQueryInfo->interval.interval == 0) { if (pQueryInfo->interval.interval == 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
tVariantListItem* pItem = taosArrayGet(pQuerySqlNode->fillType, 0); tVariantListItem* pItem = taosArrayGet(pSqlNode->fillType, 0);
if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) { if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) {
if (!((strncmp(pItem->pVar.pz, "none", 4) == 0 && pItem->pVar.nLen == 4) || if (!((strncmp(pItem->pVar.pz, "none", 4) == 0 && pItem->pVar.nLen == 4) ||
(strncmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4))) { (strncmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4))) {
...@@ -6809,7 +6810,7 @@ int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { ...@@ -6809,7 +6810,7 @@ int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doLoadAllTableMeta(SSqlObj* pSql, int32_t index, SQuerySqlNode* pQuerySqlNode, int32_t numOfTables) { static int32_t doLoadAllTableMeta(SSqlObj* pSql, int32_t index, SSqlNode* pSqlNode, int32_t numOfTables) {
const char* msg1 = "invalid table name"; const char* msg1 = "invalid table name";
const char* msg2 = "invalid table alias name"; const char* msg2 = "invalid table alias name";
const char* msg3 = "alias name too long"; const char* msg3 = "alias name too long";
...@@ -6824,7 +6825,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, int32_t index, SQuerySqlNode* p ...@@ -6824,7 +6825,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, int32_t index, SQuerySqlNode* p
tscAddEmptyMetaInfo(pQueryInfo); tscAddEmptyMetaInfo(pQueryInfo);
} }
STableNamePair *item = taosArrayGet(pQuerySqlNode->from->tableList, i); STableNamePair *item = taosArrayGet(pSqlNode->from->list, i);
SStrToken *oriName = &item->name; SStrToken *oriName = &item->name;
if (oriName->type == TK_INTEGER || oriName->type == TK_FLOAT) { if (oriName->type == TK_INTEGER || oriName->type == TK_FLOAT) {
...@@ -6897,8 +6898,8 @@ static STableMeta* extractTempTableMetaFromNestQuery(SQueryInfo* pUpstream) { ...@@ -6897,8 +6898,8 @@ static STableMeta* extractTempTableMetaFromNestQuery(SQueryInfo* pUpstream) {
return meta; return meta;
} }
int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) { int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0)); assert(pSqlNode != NULL && (pSqlNode->from == NULL || taosArrayGetSize(pSqlNode->from->list) > 0));
const char* msg1 = "point interpolation query needs timestamp"; const char* msg1 = "point interpolation query needs timestamp";
const char* msg2 = "too many tables in from clause"; const char* msg2 = "too many tables in from clause";
...@@ -6923,15 +6924,18 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind ...@@ -6923,15 +6924,18 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind
* select client_version(); * select client_version();
* select server_state(); * select server_state();
*/ */
if (pQuerySqlNode->from == NULL) { if (pSqlNode->from == NULL) {
assert(pQuerySqlNode->fillType == NULL && pQuerySqlNode->pGroupby == NULL && pQuerySqlNode->pWhere == NULL && assert(pSqlNode->fillType == NULL && pSqlNode->pGroupby == NULL && pSqlNode->pWhere == NULL &&
pQuerySqlNode->pSortOrder == NULL); pSqlNode->pSortOrder == NULL);
return doLocalQueryProcess(pCmd, pQueryInfo, pQuerySqlNode); return doLocalQueryProcess(pCmd, pQueryInfo, pSqlNode);
} }
if (pQuerySqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) {
// parse the subquery in the first place // parse the subquery in the first place
code = validateSqlNode(pSql, pQuerySqlNode->from->pNode.pClause[0], 0); SArray* list = taosArrayGetP(pSqlNode->from->list, 0);
SSqlNode* p = taosArrayGetP(list, 0);
code = validateSqlNode(pSql, p, 0);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return code; return code;
} }
...@@ -6949,24 +6953,25 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind ...@@ -6949,24 +6953,25 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind
current->pTableMetaInfo = calloc(1, POINTER_BYTES); current->pTableMetaInfo = calloc(1, POINTER_BYTES);
current->pTableMetaInfo[0] = pTableMetaInfo1; current->pTableMetaInfo[0] = pTableMetaInfo1;
current->numOfTables = 1;
pCmd->pQueryInfo[0] = current; pCmd->pQueryInfo[0] = current;
pQueryInfo->pDownstream = current; pQueryInfo->pDownstream = current;
if (validateSelectNodeList(pCmd, index, current, pQuerySqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) { if (validateSelectNodeList(pCmd, index, current, pSqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
} else { } else {
pQueryInfo->command = TSDB_SQL_SELECT; pQueryInfo->command = TSDB_SQL_SELECT;
size_t fromSize = taosArrayGetSize(pQuerySqlNode->from->tableList); size_t fromSize = taosArrayGetSize(pSqlNode->from->list);
if (fromSize > TSDB_MAX_JOIN_TABLE_NUM) { if (fromSize > TSDB_MAX_JOIN_TABLE_NUM) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
// set all query tables, which are maybe more than one. // set all query tables, which are maybe more than one.
code = doLoadAllTableMeta(pSql, index, pQuerySqlNode, fromSize); code = doLoadAllTableMeta(pSql, index, pSqlNode, fromSize);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -6984,47 +6989,47 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind ...@@ -6984,47 +6989,47 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind
} }
// parse the group by clause in the first place // parse the group by clause in the first place
if (validateGroupbyNode(pQueryInfo, pQuerySqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) { if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
// set where info // set where info
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (pQuerySqlNode->pWhere != NULL) { if (pSqlNode->pWhere != NULL) {
if (validateWhereNode(pQueryInfo, &pQuerySqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) { if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
pQuerySqlNode->pWhere = NULL; pSqlNode->pWhere = NULL;
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) { if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
pQueryInfo->window.skey = pQueryInfo->window.skey / 1000; pQueryInfo->window.skey = pQueryInfo->window.skey / 1000;
pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000; pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000;
} }
} else { // set the time rang } else { // set the time rang
if (taosArrayGetSize(pQuerySqlNode->from->tableList) > 1) { if (taosArrayGetSize(pSqlNode->from->list) > 1) {
// If it is a join query, no where clause is not allowed. // If it is a join query, no where clause is not allowed.
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "condition missing for join query "); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "condition missing for join query ");
} }
} }
int32_t joinQuery = (pQuerySqlNode->from != NULL && taosArrayGetSize(pQuerySqlNode->from->tableList) > 1); int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1);
int32_t timeWindowQuery = int32_t timeWindowQuery =
(TPARSER_HAS_TOKEN(pQuerySqlNode->interval.interval) || TPARSER_HAS_TOKEN(pQuerySqlNode->sessionVal.gap)); (TPARSER_HAS_TOKEN(pSqlNode->interval.interval) || TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap));
if (validateSelectNodeList(pCmd, index, pQueryInfo, pQuerySqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) != if (validateSelectNodeList(pCmd, index, pQueryInfo, pSqlNode->pSelNodeList, isSTable, joinQuery, timeWindowQuery) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
// set order by info // set order by info
if (validateOrderbyNode(pCmd, pQueryInfo, pQuerySqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
// set interval value // set interval value
if (validateIntervalNode(pSql, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} else { } else {
if (isTimeWindowQuery(pQueryInfo) && if (isTimeWindowQuery(pQueryInfo) &&
...@@ -7033,7 +7038,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind ...@@ -7033,7 +7038,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind
} }
} }
if (validateSessionNode(pCmd, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -7055,7 +7060,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind ...@@ -7055,7 +7060,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind
} }
} }
if ((code = validateLimitNode(pCmd, pQueryInfo, index, pQuerySqlNode, pSql)) != TSDB_CODE_SUCCESS) { if ((code = validateLimitNode(pCmd, pQueryInfo, index, pSqlNode, pSql)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -7066,7 +7071,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind ...@@ -7066,7 +7071,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind
updateLastScanOrderIfNeeded(pQueryInfo); updateLastScanOrderIfNeeded(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
if ((code = validateFillNode(pCmd, pQueryInfo, pQuerySqlNode)) != TSDB_CODE_SUCCESS) { if ((code = validateFillNode(pCmd, pQueryInfo, pSqlNode)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
......
...@@ -1383,7 +1383,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1383,7 +1383,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = (char *)pSchema; pMsg = (char *)pSchema;
if (type == TSQL_CREATE_STREAM) { // check if it is a stream sql if (type == TSQL_CREATE_STREAM) { // check if it is a stream sql
SQuerySqlNode *pQuerySql = pInfo->pCreateTableInfo->pSelect; SSqlNode *pQuerySql = pInfo->pCreateTableInfo->pSelect;
strncpy(pMsg, pQuerySql->sqlstr.z, pQuerySql->sqlstr.n + 1); strncpy(pMsg, pQuerySql->sqlstr.z, pQuerySql->sqlstr.n + 1);
pCreateMsg->sqlLen = htons(pQuerySql->sqlstr.n + 1); pCreateMsg->sqlLen = htons(pQuerySql->sqlstr.n + 1);
......
...@@ -3590,6 +3590,8 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST ...@@ -3590,6 +3590,8 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
STsBufInfo bufInfo = {0}; STsBufInfo bufInfo = {0};
SQueryParam param = {.pOperator = pa}; SQueryParam param = {.pOperator = pa};
/*int32_t code = */initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, &param, NULL, 0, merger); /*int32_t code = */initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, &param, NULL, 0, merger);
taosArrayDestroy(pa);
return pQInfo; return pQInfo;
_cleanup: _cleanup:
......
...@@ -525,18 +525,14 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -525,18 +525,14 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock) { void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock) {
assert(pRes->numOfCols > 0); assert(pRes->numOfCols > 0);
// int32_t offset = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
pRes->urow[i] = pColData->pData/* + offset * pColData->info.bytes*/; pRes->urow[i] = pColData->pData;
pRes->length[i] = pInfo->field.bytes; pRes->length[i] = pInfo->field.bytes;
//offset += pInfo->field.bytes;
// generated the user-defined column result // generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) { if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) {
if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) { if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) {
...@@ -609,7 +605,7 @@ static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray* ...@@ -609,7 +605,7 @@ static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray*
} }
typedef struct SDummyInputInfo { typedef struct SDummyInputInfo {
SSDataBlock block; SSDataBlock *block;
SSqlRes *pRes; // refactor: remove it SSqlRes *pRes; // refactor: remove it
} SDummyInputInfo; } SDummyInputInfo;
...@@ -619,7 +615,7 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { ...@@ -619,7 +615,7 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
SDummyInputInfo *pInput = pOperator->info; SDummyInputInfo *pInput = pOperator->info;
char* pData = pInput->pRes->data; char* pData = pInput->pRes->data;
SSDataBlock* pBlock = &pInput->block; SSDataBlock* pBlock = pInput->block;
pBlock->info.rows = pInput->pRes->numOfRows; pBlock->info.rows = pInput->pRes->numOfRows;
if (pBlock->info.rows == 0) { if (pBlock->info.rows == 0) {
return NULL; return NULL;
...@@ -638,30 +634,47 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { ...@@ -638,30 +634,47 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
return pBlock; return pBlock;
} }
static void destroyDummyInputOperator(void* param, int32_t numOfOutput) {
SDummyInputInfo* pInfo = (SDummyInputInfo*) param;
// tricky
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pInfo->block->pDataBlock, i);
pColInfoData->pData = NULL;
}
pInfo->block = destroyOutputBuf(pInfo->block);
pInfo->pRes = NULL;
}
// todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later
SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t numOfCols) { SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t numOfCols) {
assert(numOfCols > 0); assert(numOfCols > 0);
SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo)); SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo));
pInfo->pRes = (SSqlRes*) pResult; pInfo->pRes = (SSqlRes*) pResult;
pInfo->block.info.numOfCols = numOfCols; pInfo->block = calloc(numOfCols, sizeof(SSDataBlock));
pInfo->block.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); pInfo->block->info.numOfCols = numOfCols;
pInfo->block->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colData = {0}; SColumnInfoData colData = {0};
colData.info.bytes = pSchema[i].bytes; colData.info.bytes = pSchema[i].bytes;
colData.info.type = pSchema[i].type; colData.info.type = pSchema[i].type;
colData.info.colId = pSchema[i].colId; colData.info.colId = pSchema[i].colId;
taosArrayPush(pInfo->block.pDataBlock, &colData); taosArrayPush(pInfo->block->pDataBlock, &colData);
} }
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DummyInputOperator"; pOptr->name = "DummyInputOperator";
pOptr->operatorType = OP_DummyInput; pOptr->operatorType = OP_DummyInput;
pOptr->numOfOutput = numOfCols;
pOptr->blockingOptr = false; pOptr->blockingOptr = false;
pOptr->info = pInfo; pOptr->info = pInfo;
pOptr->exec = doGetDataBlock; pOptr->exec = doGetDataBlock;
pOptr->cleanup = destroyDummyInputOperator;
return pOptr; return pOptr;
} }
...@@ -683,18 +696,12 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -683,18 +696,12 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pQueryInfo->pDownstream != NULL) { if (pQueryInfo->pDownstream != NULL) {
// handle the following query process // handle the following query process
SQueryInfo *px = pQueryInfo->pDownstream; SQueryInfo *px = pQueryInfo->pDownstream;
SColumnInfo* colInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList); SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList);
int32_t numOfOutput = tscSqlExprNumOfExprs(px); int32_t numOfOutput = tscSqlExprNumOfExprs(px);
SExprInfo *exprInfo = NULL;
SQLFunctionCtx *pCtx = calloc(numOfOutput, sizeof(SQLFunctionCtx));
int32_t numOfCols = taosArrayGetSize(px->colList); int32_t numOfCols = taosArrayGetSize(px->colList);
SQueriedTableInfo info = {.colList = colInfo, .numOfCols = numOfCols,}; SQueriedTableInfo info = {.colList = pColumnInfo, .numOfCols = numOfCols,};
/*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta); SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta);
tsCreateSQLFunctionCtx(px, pCtx, pSchema);
STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),}; STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),};
tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
...@@ -708,11 +715,15 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -708,11 +715,15 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfOutput); SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfOutput);
SExprInfo *exprInfo = NULL;
/*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN); px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN);
uint64_t qId = 0; uint64_t qId = 0;
qTableQuery(px->pQInfo, &qId); qTableQuery(px->pQInfo, &qId);
convertQueryResult(pRes, px); convertQueryResult(pRes, px);
tfree(pColumnInfo);
} }
} }
...@@ -751,8 +762,27 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { ...@@ -751,8 +762,27 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
for (int32_t i = 0; i < pCmd->numOfClause; ++i) { for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, i); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, i);
// recursive call it
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, 0);
freeQueryInfoImpl(pUp);
clearAllTableMetaInfo(pUp, removeMeta);
if (pUp->pQInfo != NULL) {
qDestroyQueryInfo(pUp->pQInfo);
pUp->pQInfo = NULL;
}
tfree(pUp);
}
freeQueryInfoImpl(pQueryInfo); freeQueryInfoImpl(pQueryInfo);
clearAllTableMetaInfo(pQueryInfo, removeMeta); clearAllTableMetaInfo(pQueryInfo, removeMeta);
if (pQueryInfo->pQInfo != NULL) {
qDestroyQueryInfo(pQueryInfo->pQInfo);
pQueryInfo->pQInfo = NULL;
}
tfree(pQueryInfo); tfree(pQueryInfo);
} }
...@@ -2312,6 +2342,10 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) { ...@@ -2312,6 +2342,10 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) {
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (pTableMetaInfo->pTableMeta && pTableMetaInfo->pTableMeta->tableType == TSDB_TEMP_TABLE) {
tfree(pTableMetaInfo->pTableMeta);
}
if (removeMeta) { if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name); tNameExtractFullName(&pTableMetaInfo->name, name);
......
...@@ -221,7 +221,7 @@ typedef struct { ...@@ -221,7 +221,7 @@ typedef struct {
typedef struct { typedef struct {
uint32_t numOfTables; uint32_t numOfTables;
SArray * pGroupList; SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo; } STableGroupInfo;
......
...@@ -503,6 +503,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); ...@@ -503,6 +503,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
SSDataBlock* doSLimit(void* param, bool* newgroup); SSDataBlock* doSLimit(void* param, bool* newgroup);
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
void* destroyOutputBuf(SSDataBlock* pBlock);
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
......
...@@ -41,7 +41,7 @@ enum SQL_NODE_TYPE { ...@@ -41,7 +41,7 @@ enum SQL_NODE_TYPE {
enum SQL_NODE_FROM_TYPE { enum SQL_NODE_FROM_TYPE {
SQL_NODE_FROM_SUBQUERY = 1, SQL_NODE_FROM_SUBQUERY = 1,
SQL_NODE_FROM_NAMELIST = 2, SQL_NODE_FROM_TABLELIST = 2,
}; };
extern char tTokenTypeSwitcher[13]; extern char tTokenTypeSwitcher[13];
...@@ -83,11 +83,11 @@ typedef struct SSessionWindowVal { ...@@ -83,11 +83,11 @@ typedef struct SSessionWindowVal {
SStrToken gap; SStrToken gap;
} SSessionWindowVal; } SSessionWindowVal;
struct SFromInfo; struct SRelationInfo;
typedef struct SQuerySqlNode { typedef struct SSqlNode {
struct SArray *pSelNodeList; // select clause struct SArray *pSelNodeList; // select clause
struct SFromInfo *from; // from clause SArray<SQuerySqlNode> struct SRelationInfo *from; // from clause SArray<SSqlNode>
struct tSqlExpr *pWhere; // where clause [optional] struct tSqlExpr *pWhere; // where clause [optional]
SArray *pGroupby; // groupby clause, only for tags[optional], SArray<tVariantListItem> SArray *pGroupby; // groupby clause, only for tags[optional], SArray<tVariantListItem>
SArray *pSortOrder; // orderby [optional], SArray<tVariantListItem> SArray *pSortOrder; // orderby [optional], SArray<tVariantListItem>
...@@ -98,25 +98,22 @@ typedef struct SQuerySqlNode { ...@@ -98,25 +98,22 @@ typedef struct SQuerySqlNode {
SLimitVal limit; // limit offset [optional] SLimitVal limit; // limit offset [optional]
SLimitVal slimit; // group limit offset [optional] SLimitVal slimit; // group limit offset [optional]
SStrToken sqlstr; // sql string in select clause SStrToken sqlstr; // sql string in select clause
} SQuerySqlNode; } SSqlNode;
typedef struct STableNamePair { typedef struct STableNamePair {
SStrToken name; SStrToken name;
SStrToken aliasName; SStrToken aliasName;
} STableNamePair; } STableNamePair;
typedef struct SSubclauseInfo { // "UNION" multiple select sub-clause //typedef struct SSubclauseInfo { // "UNION" multiple select sub-clause
SQuerySqlNode **pClause; // SSqlNode **pClause;
int32_t numOfClause; // int32_t numOfClause;
} SSubclauseInfo; //} SSubclauseInfo;
typedef struct SFromInfo { typedef struct SRelationInfo {
int32_t type; // nested query|table name list int32_t type; // nested query|table name list
union { SArray *list; // SArray<STableNamePair>|SArray<SSqlNode*>
SSubclauseInfo pNode; } SRelationInfo;
SArray *tableList; // SArray<STableNamePair>
};
} SFromInfo;
typedef struct SCreatedTableInfo { typedef struct SCreatedTableInfo {
SStrToken name; // table name token SStrToken name; // table name token
...@@ -139,7 +136,7 @@ typedef struct SCreateTableSql { ...@@ -139,7 +136,7 @@ typedef struct SCreateTableSql {
} colInfo; } colInfo;
SArray *childTableInfo; // SArray<SCreatedTableInfo> SArray *childTableInfo; // SArray<SCreatedTableInfo>
SQuerySqlNode *pSelect; SSqlNode *pSelect;
} SCreateTableSql; } SCreateTableSql;
typedef struct SAlterTableInfo { typedef struct SAlterTableInfo {
...@@ -216,7 +213,7 @@ typedef struct SMiscInfo { ...@@ -216,7 +213,7 @@ typedef struct SMiscInfo {
typedef struct SSqlInfo { typedef struct SSqlInfo {
int32_t type; int32_t type;
bool valid; bool valid;
SSubclauseInfo subclauseInfo; SArray *list; // todo refactor
char msg[256]; char msg[256];
union { union {
SCreateTableSql *pCreateTableInfo; SCreateTableSql *pCreateTableInfo;
...@@ -253,9 +250,9 @@ SArray *tVariantListAppend(SArray *pList, tVariant *pVar, uint8_t sortOrder); ...@@ -253,9 +250,9 @@ SArray *tVariantListAppend(SArray *pList, tVariant *pVar, uint8_t sortOrder);
SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int32_t index); SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int32_t index);
SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder); SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder);
SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias); SRelationInfo *setTableNameList(SRelationInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias);
SFromInfo *setSubquery(SFromInfo* pFromInfo, SSubclauseInfo* pSqlNode); SRelationInfo *setSubquery(SRelationInfo* pFromInfo, SArray* pSqlNode);
void *destroyFromInfo(SFromInfo* pFromInfo); void *destroyFromInfo(SRelationInfo* pFromInfo);
// sql expr leaf node // sql expr leaf node
tSqlExpr *tSqlExprCreateIdValue(SStrToken *pToken, int32_t optrType); tSqlExpr *tSqlExprCreateIdValue(SStrToken *pToken, int32_t optrType);
...@@ -270,23 +267,22 @@ void tSqlExprDestroy(tSqlExpr *pExpr); ...@@ -270,23 +267,22 @@ void tSqlExprDestroy(tSqlExpr *pExpr);
SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinct, SStrToken *pToken); SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinct, SStrToken *pToken);
void tSqlExprListDestroy(SArray *pList); void tSqlExprListDestroy(SArray *pList);
SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SFromInfo *pFrom, tSqlExpr *pWhere, SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit); SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit);
SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SQuerySqlNode *pSelect, int32_t type); SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SSqlNode *pSelect, int32_t type);
SAlterTableInfo *tSetAlterTableInfo(SStrToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableTable); SAlterTableInfo *tSetAlterTableInfo(SStrToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableTable);
SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagNames, SArray *pTagVals, SStrToken *pToken, SStrToken* igExists); SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagNames, SArray *pTagVals, SStrToken *pToken, SStrToken* igExists);
void destroyAllSelectClause(SSubclauseInfo *pSql); void destroyAllSqlNode(SArray *pSqlNode);
void destroyQuerySqlNode(SQuerySqlNode *pSql); void destroySqlNode(SSqlNode *pSql);
void freeCreateTableInfo(void* p); void freeCreateTableInfo(void* p);
SSqlInfo *setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SStrToken *pTableName, int32_t type); SSqlInfo *setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SStrToken *pTableName, int32_t type);
SSubclauseInfo *setSubclause(SSubclauseInfo *pClause, void *pSqlExprInfo); SArray *setSubclause(SArray *pList, void *pSqlNode);
SArray *appendSelectClause(SArray *pList, void *pSubclause);
SSubclauseInfo *appendSelectClause(SSubclauseInfo *pInfo, void *pSubclause);
void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists); void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists);
......
...@@ -450,16 +450,16 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). { ...@@ -450,16 +450,16 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
} }
//////////////////////// The SELECT statement ///////////////////////////////// //////////////////////// The SELECT statement /////////////////////////////////
%type select {SQuerySqlNode*} %type select {SSqlNode*}
%destructor select {destroyQuerySqlNode($$);} %destructor select {destroySqlNode($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). { select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G); A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G);
} }
select(A) ::= LP select(B) RP. {A = B;} select(A) ::= LP select(B) RP. {A = B;}
%type union {SSubclauseInfo*} %type union {SArray*}
%destructor union {destroyAllSelectClause($$);} %destructor union {destroyAllSqlNode($$);}
union(Y) ::= select(X). { Y = setSubclause(NULL, X); } union(Y) ::= select(X). { Y = setSubclause(NULL, X); }
union(Y) ::= union(Z) UNION ALL select(X). { Y = appendSelectClause(Z, X); } union(Y) ::= union(Z) UNION ALL select(X). { Y = appendSelectClause(Z, X); }
...@@ -505,11 +505,11 @@ distinct(X) ::= DISTINCT(Y). { X = Y; } ...@@ -505,11 +505,11 @@ distinct(X) ::= DISTINCT(Y). { X = Y; }
distinct(X) ::= . { X.n = 0;} distinct(X) ::= . { X.n = 0;}
// A complete FROM clause. // A complete FROM clause.
%type from {SFromInfo*} %type from {SRelationInfo*}
from(A) ::= FROM tablelist(X). {A = X;} from(A) ::= FROM tablelist(X). {A = X;}
from(A) ::= FROM LP union(Y) RP. {A = setSubquery(NULL, Y);} from(A) ::= FROM LP union(Y) RP. {A = setSubquery(NULL, Y);}
%type tablelist {SFromInfo*} %type tablelist {SRelationInfo*}
tablelist(A) ::= ids(X) cpxName(Y). { tablelist(A) ::= ids(X) cpxName(Y). {
X.n += Y.n; X.n += Y.n;
A = setTableNameList(NULL, &X, NULL); A = setTableNameList(NULL, &X, NULL);
......
...@@ -201,7 +201,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO ...@@ -201,7 +201,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
return res; return res;
} }
static void* destroyOutputBuf(SSDataBlock* pBlock) { void* destroyOutputBuf(SSDataBlock* pBlock) {
if (pBlock == NULL) { if (pBlock == NULL) {
return NULL; return NULL;
} }
......
...@@ -452,13 +452,13 @@ SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int ...@@ -452,13 +452,13 @@ SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int
return pList; return pList;
} }
SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias) { SRelationInfo *setTableNameList(SRelationInfo* pRelationInfo, SStrToken *pName, SStrToken* pAlias) {
if (pFromInfo == NULL) { if (pRelationInfo == NULL) {
pFromInfo = calloc(1, sizeof(SFromInfo)); pRelationInfo = calloc(1, sizeof(SRelationInfo));
pFromInfo->tableList = taosArrayInit(4, sizeof(STableNamePair)); pRelationInfo->list = taosArrayInit(4, sizeof(STableNamePair));
} }
pFromInfo->type = SQL_NODE_FROM_NAMELIST; pRelationInfo->type = SQL_NODE_FROM_TABLELIST;
STableNamePair p = {.name = *pName}; STableNamePair p = {.name = *pName};
if (pAlias != NULL) { if (pAlias != NULL) {
p.aliasName = *pAlias; p.aliasName = *pAlias;
...@@ -466,34 +466,39 @@ SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* p ...@@ -466,34 +466,39 @@ SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* p
TPARSER_SET_NONE_TOKEN(p.aliasName); TPARSER_SET_NONE_TOKEN(p.aliasName);
} }
taosArrayPush(pFromInfo->tableList, &p); taosArrayPush(pRelationInfo->list, &p);
return pRelationInfo;
return pFromInfo;
} }
SFromInfo *setSubquery(SFromInfo* pFromInfo, SSubclauseInfo* pSqlNode) { SRelationInfo* setSubquery(SRelationInfo* pRelationInfo, SArray* pList) {
if (pFromInfo == NULL) { if (pRelationInfo == NULL) {
pFromInfo = calloc(1, sizeof(SFromInfo)); pRelationInfo = calloc(1, sizeof(SRelationInfo));
pRelationInfo->list = taosArrayInit(4, POINTER_BYTES);
} }
pFromInfo->type = SQL_NODE_FROM_SUBQUERY; pRelationInfo->type = SQL_NODE_FROM_SUBQUERY;
pFromInfo->pNode = *pSqlNode; taosArrayPush(pRelationInfo->list, &pList);
return pFromInfo; return pRelationInfo;
} }
void* destroyFromInfo(SFromInfo* pFromInfo) { void* destroyFromInfo(SRelationInfo* pRelationInfo) {
if (pFromInfo == NULL) { if (pRelationInfo == NULL) {
return NULL; return NULL;
} }
if (pFromInfo->type == SQL_NODE_FROM_NAMELIST) { if (pRelationInfo->type == SQL_NODE_FROM_TABLELIST) {
taosArrayDestroy(pFromInfo->tableList); taosArrayDestroy(pRelationInfo->list);
} else { } else {
destroyAllSelectClause(&pFromInfo->pNode); size_t size = taosArrayGetSize(pRelationInfo->list);
for(int32_t i = 0; i < size; ++i) {
SArray* pa = taosArrayGetP(pRelationInfo->list, 0);
destroyAllSqlNode(pa);
}
taosArrayDestroy(pRelationInfo->list);
} }
tfree(pFromInfo); tfree(pRelationInfo);
return NULL; return NULL;
} }
...@@ -637,13 +642,13 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) { ...@@ -637,13 +642,13 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
/* /*
* extract the select info out of sql string * extract the select info out of sql string
*/ */
SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SFromInfo *pFrom, tSqlExpr *pWhere, SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SSessionWindowVal *pSession, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SSessionWindowVal *pSession, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit,
SLimitVal *psLimit) { SLimitVal *psLimit) {
assert(pSelNodeList != NULL); assert(pSelNodeList != NULL);
SQuerySqlNode *pSqlNode = calloc(1, sizeof(SQuerySqlNode)); SSqlNode *pSqlNode = calloc(1, sizeof(SSqlNode));
// all later sql string are belonged to the stream sql // all later sql string are belonged to the stream sql
pSqlNode->sqlstr = *pSelectToken; pSqlNode->sqlstr = *pSelectToken;
...@@ -706,46 +711,47 @@ void freeCreateTableInfo(void* p) { ...@@ -706,46 +711,47 @@ void freeCreateTableInfo(void* p) {
tfree(pInfo->tagdata.data); tfree(pInfo->tagdata.data);
} }
void destroyQuerySqlNode(SQuerySqlNode *pQuerySql) { void destroySqlNode(SSqlNode *pSqlNode) {
if (pQuerySql == NULL) { if (pSqlNode == NULL) {
return; return;
} }
tSqlExprListDestroy(pQuerySql->pSelNodeList); tSqlExprListDestroy(pSqlNode->pSelNodeList);
pQuerySql->pSelNodeList = NULL; pSqlNode->pSelNodeList = NULL;
tSqlExprDestroy(pQuerySql->pWhere); tSqlExprDestroy(pSqlNode->pWhere);
pQuerySql->pWhere = NULL; pSqlNode->pWhere = NULL;
taosArrayDestroyEx(pQuerySql->pSortOrder, freeVariant); taosArrayDestroyEx(pSqlNode->pSortOrder, freeVariant);
pQuerySql->pSortOrder = NULL; pSqlNode->pSortOrder = NULL;
taosArrayDestroyEx(pQuerySql->pGroupby, freeVariant); taosArrayDestroyEx(pSqlNode->pGroupby, freeVariant);
pQuerySql->pGroupby = NULL; pSqlNode->pGroupby = NULL;
pQuerySql->from = destroyFromInfo(pQuerySql->from); pSqlNode->from = destroyFromInfo(pSqlNode->from);
taosArrayDestroyEx(pQuerySql->fillType, freeVariant); taosArrayDestroyEx(pSqlNode->fillType, freeVariant);
pQuerySql->fillType = NULL; pSqlNode->fillType = NULL;
free(pQuerySql); free(pSqlNode);
} }
void destroyAllSelectClause(SSubclauseInfo *pClause) { void destroyAllSqlNode(SArray *pList) {
if (pClause == NULL || pClause->numOfClause == 0) { if (pList == NULL) {
return; return;
} }
for(int32_t i = 0; i < pClause->numOfClause; ++i) { size_t size = taosArrayGetSize(pList);
SQuerySqlNode *pQuerySql = pClause->pClause[i]; for(int32_t i = 0; i < size; ++i) {
destroyQuerySqlNode(pQuerySql); SSqlNode *pNode = taosArrayGetP(pList, 0);
destroySqlNode(pNode);
} }
tfree(pClause->pClause); taosArrayDestroy(pList);
} }
SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SQuerySqlNode *pSelect, int32_t type) { SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SSqlNode *pSelect, int32_t type) {
SCreateTableSql *pCreate = calloc(1, sizeof(SCreateTableSql)); SCreateTableSql *pCreate = calloc(1, sizeof(SCreateTableSql));
switch (type) { switch (type) {
...@@ -813,7 +819,7 @@ SAlterTableInfo *tSetAlterTableInfo(SStrToken *pTableName, SArray *pCols, SArray ...@@ -813,7 +819,7 @@ SAlterTableInfo *tSetAlterTableInfo(SStrToken *pTableName, SArray *pCols, SArray
} }
void* destroyCreateTableSql(SCreateTableSql* pCreate) { void* destroyCreateTableSql(SCreateTableSql* pCreate) {
destroyQuerySqlNode(pCreate->pSelect); destroySqlNode(pCreate->pSelect);
taosArrayDestroy(pCreate->colInfo.pColumns); taosArrayDestroy(pCreate->colInfo.pColumns);
taosArrayDestroy(pCreate->colInfo.pTagColumns); taosArrayDestroy(pCreate->colInfo.pTagColumns);
...@@ -828,7 +834,7 @@ void SqlInfoDestroy(SSqlInfo *pInfo) { ...@@ -828,7 +834,7 @@ void SqlInfoDestroy(SSqlInfo *pInfo) {
if (pInfo == NULL) return; if (pInfo == NULL) return;
if (pInfo->type == TSDB_SQL_SELECT) { if (pInfo->type == TSDB_SQL_SELECT) {
destroyAllSelectClause(&pInfo->subclauseInfo); destroyAllSqlNode(pInfo->list);
} else if (pInfo->type == TSDB_SQL_CREATE_TABLE) { } else if (pInfo->type == TSDB_SQL_CREATE_TABLE) {
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo); pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) { } else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
...@@ -849,31 +855,20 @@ void SqlInfoDestroy(SSqlInfo *pInfo) { ...@@ -849,31 +855,20 @@ void SqlInfoDestroy(SSqlInfo *pInfo) {
} }
} }
SSubclauseInfo* setSubclause(SSubclauseInfo* pSubclause, void *pSqlExprInfo) { SArray* setSubclause(SArray* pList, void *pSqlNode) {
if (pSubclause == NULL) { if (pList == NULL) {
pSubclause = calloc(1, sizeof(SSubclauseInfo)); pList = taosArrayInit(1, POINTER_BYTES);
}
int32_t newSize = pSubclause->numOfClause + 1;
char* tmp = realloc(pSubclause->pClause, newSize * POINTER_BYTES);
if (tmp == NULL) {
return pSubclause;
} }
pSubclause->pClause = (SQuerySqlNode**) tmp; taosArrayPush(pList, &pSqlNode);
return pList;
pSubclause->pClause[newSize - 1] = pSqlExprInfo;
pSubclause->numOfClause++;
return pSubclause;
} }
SSqlInfo* setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SStrToken *pTableName, int32_t type) { SSqlInfo* setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SStrToken *pTableName, int32_t type) {
pInfo->type = type; pInfo->type = type;
if (type == TSDB_SQL_SELECT) { if (type == TSDB_SQL_SELECT) {
pInfo->subclauseInfo = *(SSubclauseInfo*) pSqlExprInfo; pInfo->list = (SArray*) pSqlExprInfo;
free(pSqlExprInfo);
} else { } else {
pInfo->pCreateTableInfo = pSqlExprInfo; pInfo->pCreateTableInfo = pSqlExprInfo;
} }
...@@ -885,16 +880,9 @@ SSqlInfo* setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SStrToken *pTableName, ...@@ -885,16 +880,9 @@ SSqlInfo* setSqlInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SStrToken *pTableName,
return pInfo; return pInfo;
} }
SSubclauseInfo* appendSelectClause(SSubclauseInfo *pQueryInfo, void *pSubclause) { SArray* appendSelectClause(SArray *pList, void *pSubclause) {
char* tmp = realloc(pQueryInfo->pClause, (pQueryInfo->numOfClause + 1) * POINTER_BYTES); taosArrayPush(pList, &pSubclause);
if (tmp == NULL) { // out of memory return pList;
return pQueryInfo;
}
pQueryInfo->pClause = (SQuerySqlNode**) tmp;
pQueryInfo->pClause[pQueryInfo->numOfClause++] = pSubclause;
return pQueryInfo;
} }
void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists) { void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists) {
......
...@@ -105,20 +105,19 @@ typedef union { ...@@ -105,20 +105,19 @@ typedef union {
ParseTOKENTYPE yy0; ParseTOKENTYPE yy0;
SCreateTableSql* yy14; SCreateTableSql* yy14;
int yy20; int yy20;
SSqlNode* yy116;
tSqlExpr* yy118; tSqlExpr* yy118;
SArray* yy159; SArray* yy159;
SIntervalVal yy184; SIntervalVal yy184;
SCreatedTableInfo yy206; SCreatedTableInfo yy206;
SRelationInfo* yy236;
SSessionWindowVal yy249; SSessionWindowVal yy249;
SQuerySqlNode* yy272;
int64_t yy317; int64_t yy317;
SCreateDbInfo yy322; SCreateDbInfo yy322;
SCreateAcctInfo yy351; SCreateAcctInfo yy351;
SSubclauseInfo* yy391;
TAOS_FIELD yy407; TAOS_FIELD yy407;
SLimitVal yy440; SLimitVal yy440;
tVariant yy488; tVariant yy488;
SFromInfo* yy514;
} YYMINORTYPE; } YYMINORTYPE;
#ifndef YYSTACKDEPTH #ifndef YYSTACKDEPTH
#define YYSTACKDEPTH 100 #define YYSTACKDEPTH 100
...@@ -1426,7 +1425,7 @@ destroyCreateTableSql((yypminor->yy14)); ...@@ -1426,7 +1425,7 @@ destroyCreateTableSql((yypminor->yy14));
break; break;
case 234: /* select */ case 234: /* select */
{ {
destroyQuerySqlNode((yypminor->yy272)); destroySqlNode((yypminor->yy116));
} }
break; break;
case 237: /* selcollist */ case 237: /* selcollist */
...@@ -1446,7 +1445,7 @@ tSqlExprDestroy((yypminor->yy118)); ...@@ -1446,7 +1445,7 @@ tSqlExprDestroy((yypminor->yy118));
break; break;
case 249: /* union */ case 249: /* union */
{ {
destroyAllSelectClause((yypminor->yy391)); destroyAllSqlNode((yypminor->yy159));
} }
break; break;
case 257: /* sortitem */ case 257: /* sortitem */
...@@ -2540,7 +2539,7 @@ static void yy_reduce( ...@@ -2540,7 +2539,7 @@ static void yy_reduce(
break; break;
case 141: /* create_table_args ::= ifnotexists ids cpxName AS select */ case 141: /* create_table_args ::= ifnotexists ids cpxName AS select */
{ {
yylhsminor.yy14 = tSetCreateTableInfo(NULL, NULL, yymsp[0].minor.yy272, TSQL_CREATE_STREAM); yylhsminor.yy14 = tSetCreateTableInfo(NULL, NULL, yymsp[0].minor.yy116, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, yylhsminor.yy14, NULL, TSDB_SQL_CREATE_TABLE); setSqlInfo(pInfo, yylhsminor.yy14, NULL, TSDB_SQL_CREATE_TABLE);
yymsp[-3].minor.yy0.n += yymsp[-2].minor.yy0.n; yymsp[-3].minor.yy0.n += yymsp[-2].minor.yy0.n;
...@@ -2595,29 +2594,29 @@ static void yy_reduce( ...@@ -2595,29 +2594,29 @@ static void yy_reduce(
break; break;
case 156: /* select ::= SELECT selcollist from where_opt interval_opt session_option fill_opt sliding_opt groupby_opt orderby_opt having_opt slimit_opt limit_opt */ case 156: /* select ::= SELECT selcollist from where_opt interval_opt session_option fill_opt sliding_opt groupby_opt orderby_opt having_opt slimit_opt limit_opt */
{ {
yylhsminor.yy272 = tSetQuerySqlNode(&yymsp[-12].minor.yy0, yymsp[-11].minor.yy159, yymsp[-10].minor.yy514, yymsp[-9].minor.yy118, yymsp[-4].minor.yy159, yymsp[-3].minor.yy159, &yymsp[-8].minor.yy184, &yymsp[-7].minor.yy249, &yymsp[-5].minor.yy0, yymsp[-6].minor.yy159, &yymsp[0].minor.yy440, &yymsp[-1].minor.yy440); yylhsminor.yy116 = tSetQuerySqlNode(&yymsp[-12].minor.yy0, yymsp[-11].minor.yy159, yymsp[-10].minor.yy236, yymsp[-9].minor.yy118, yymsp[-4].minor.yy159, yymsp[-3].minor.yy159, &yymsp[-8].minor.yy184, &yymsp[-7].minor.yy249, &yymsp[-5].minor.yy0, yymsp[-6].minor.yy159, &yymsp[0].minor.yy440, &yymsp[-1].minor.yy440);
} }
yymsp[-12].minor.yy272 = yylhsminor.yy272; yymsp[-12].minor.yy116 = yylhsminor.yy116;
break; break;
case 157: /* select ::= LP select RP */ case 157: /* select ::= LP select RP */
{yymsp[-2].minor.yy272 = yymsp[-1].minor.yy272;} {yymsp[-2].minor.yy116 = yymsp[-1].minor.yy116;}
break; break;
case 158: /* union ::= select */ case 158: /* union ::= select */
{ yylhsminor.yy391 = setSubclause(NULL, yymsp[0].minor.yy272); } { yylhsminor.yy159 = setSubclause(NULL, yymsp[0].minor.yy116); }
yymsp[0].minor.yy391 = yylhsminor.yy391; yymsp[0].minor.yy159 = yylhsminor.yy159;
break; break;
case 159: /* union ::= union UNION ALL select */ case 159: /* union ::= union UNION ALL select */
{ yylhsminor.yy391 = appendSelectClause(yymsp[-3].minor.yy391, yymsp[0].minor.yy272); } { yylhsminor.yy159 = appendSelectClause(yymsp[-3].minor.yy159, yymsp[0].minor.yy116); }
yymsp[-3].minor.yy391 = yylhsminor.yy391; yymsp[-3].minor.yy159 = yylhsminor.yy159;
break; break;
case 160: /* cmd ::= union */ case 160: /* cmd ::= union */
{ setSqlInfo(pInfo, yymsp[0].minor.yy391, NULL, TSDB_SQL_SELECT); } { setSqlInfo(pInfo, yymsp[0].minor.yy159, NULL, TSDB_SQL_SELECT); }
break; break;
case 161: /* select ::= SELECT selcollist */ case 161: /* select ::= SELECT selcollist */
{ {
yylhsminor.yy272 = tSetQuerySqlNode(&yymsp[-1].minor.yy0, yymsp[0].minor.yy159, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); yylhsminor.yy116 = tSetQuerySqlNode(&yymsp[-1].minor.yy0, yymsp[0].minor.yy159, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
} }
yymsp[-1].minor.yy272 = yylhsminor.yy272; yymsp[-1].minor.yy116 = yylhsminor.yy116;
break; break;
case 162: /* sclp ::= selcollist COMMA */ case 162: /* sclp ::= selcollist COMMA */
{yylhsminor.yy159 = yymsp[-1].minor.yy159;} {yylhsminor.yy159 = yymsp[-1].minor.yy159;}
...@@ -2655,38 +2654,38 @@ static void yy_reduce( ...@@ -2655,38 +2654,38 @@ static void yy_reduce(
yymsp[0].minor.yy0 = yylhsminor.yy0; yymsp[0].minor.yy0 = yylhsminor.yy0;
break; break;
case 171: /* from ::= FROM tablelist */ case 171: /* from ::= FROM tablelist */
{yymsp[-1].minor.yy514 = yymsp[0].minor.yy514;} {yymsp[-1].minor.yy236 = yymsp[0].minor.yy236;}
break; break;
case 172: /* from ::= FROM LP union RP */ case 172: /* from ::= FROM LP union RP */
{yymsp[-3].minor.yy514 = setSubquery(NULL, yymsp[-1].minor.yy391);} {yymsp[-3].minor.yy236 = setSubquery(NULL, yymsp[-1].minor.yy159);}
break; break;
case 173: /* tablelist ::= ids cpxName */ case 173: /* tablelist ::= ids cpxName */
{ {
yymsp[-1].minor.yy0.n += yymsp[0].minor.yy0.n; yymsp[-1].minor.yy0.n += yymsp[0].minor.yy0.n;
yylhsminor.yy514 = setTableNameList(NULL, &yymsp[-1].minor.yy0, NULL); yylhsminor.yy236 = setTableNameList(NULL, &yymsp[-1].minor.yy0, NULL);
} }
yymsp[-1].minor.yy514 = yylhsminor.yy514; yymsp[-1].minor.yy236 = yylhsminor.yy236;
break; break;
case 174: /* tablelist ::= ids cpxName ids */ case 174: /* tablelist ::= ids cpxName ids */
{ {
yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n; yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n;
yylhsminor.yy514 = setTableNameList(NULL, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0); yylhsminor.yy236 = setTableNameList(NULL, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0);
} }
yymsp[-2].minor.yy514 = yylhsminor.yy514; yymsp[-2].minor.yy236 = yylhsminor.yy236;
break; break;
case 175: /* tablelist ::= tablelist COMMA ids cpxName */ case 175: /* tablelist ::= tablelist COMMA ids cpxName */
{ {
yymsp[-1].minor.yy0.n += yymsp[0].minor.yy0.n; yymsp[-1].minor.yy0.n += yymsp[0].minor.yy0.n;
yylhsminor.yy514 = setTableNameList(yymsp[-3].minor.yy514, &yymsp[-1].minor.yy0, NULL); yylhsminor.yy236 = setTableNameList(yymsp[-3].minor.yy236, &yymsp[-1].minor.yy0, NULL);
} }
yymsp[-3].minor.yy514 = yylhsminor.yy514; yymsp[-3].minor.yy236 = yylhsminor.yy236;
break; break;
case 176: /* tablelist ::= tablelist COMMA ids cpxName ids */ case 176: /* tablelist ::= tablelist COMMA ids cpxName ids */
{ {
yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n; yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n;
yylhsminor.yy514 = setTableNameList(yymsp[-4].minor.yy514, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0); yylhsminor.yy236 = setTableNameList(yymsp[-4].minor.yy236, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0);
} }
yymsp[-4].minor.yy514 = yylhsminor.yy514; yymsp[-4].minor.yy236 = yylhsminor.yy236;
break; break;
case 177: /* tmvar ::= VARIABLE */ case 177: /* tmvar ::= VARIABLE */
{yylhsminor.yy0 = yymsp[0].minor.yy0;} {yylhsminor.yy0 = yymsp[0].minor.yy0;}
......
...@@ -3414,14 +3414,16 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { ...@@ -3414,14 +3414,16 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
size_t numOfTables = taosArrayGetSize(p); size_t numOfTables = taosArrayGetSize(p);
for(int32_t j = 0; j < numOfTables; ++j) { for(int32_t j = 0; j < numOfTables; ++j) {
STable* pTable = taosArrayGetP(p, j); STable* pTable = taosArrayGetP(p, j);
assert(pTable != NULL); if (pTable != NULL) { // in case of handling retrieve data from tsdb
tsdbUnRefTable(pTable); tsdbUnRefTable(pTable);
} }
//assert(pTable != NULL);
}
taosArrayDestroy(p); taosArrayDestroy(p);
} }
taosHashCleanup(pGroupList->map);
taosArrayDestroy(pGroupList->pGroupList); taosArrayDestroy(pGroupList->pGroupList);
pGroupList->numOfTables = 0; pGroupList->numOfTables = 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册