未验证 提交 880e5c4e 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2152 from taosdata/feature/query

[td-485]
...@@ -63,19 +63,21 @@ typedef struct SLocalReducer { ...@@ -63,19 +63,21 @@ typedef struct SLocalReducer {
// char * pBufForInterpo; // intermediate buffer for interpolation // char * pBufForInterpo; // intermediate buffer for interpolation
tFilePage * pTempBuffer; tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx; struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result. int32_t rowSize; // size of each intermediate result.
int32_t status; // denote it is in reduce process, in reduce process, it int32_t finalRowSize; // final result row size
bool hasPrevRow; // cannot be released int32_t status; // denote it is in reduce process, in reduce process, it
bool hasPrevRow; // cannot be released
bool hasUnprocessedRow; bool hasUnprocessedRow;
tOrderDescriptor * pDesc; tOrderDescriptor * pDesc;
SColumnModel * resColModel; SColumnModel * resColModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure SFillInfo* pFillInfo; // interpolation support structure
char * pFinalRes; // result data after interpo char * pFinalRes; // result data after interpo
tFilePage * discardData; tFilePage * discardData;
SResultInfo * pResInfo; SResultInfo * pResInfo;
bool discard; bool discard;
int32_t offset; // limit offset value int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable
} SLocalReducer; } SLocalReducer;
typedef struct SSubqueryState { typedef struct SSubqueryState {
......
...@@ -341,16 +341,6 @@ bool stableQueryFunctChanged(int32_t funcId) { ...@@ -341,16 +341,6 @@ bool stableQueryFunctChanged(int32_t funcId) {
*/ */
void resetResultInfo(SResultInfo *pResInfo) { pResInfo->initialized = false; } void resetResultInfo(SResultInfo *pResInfo) { pResInfo->initialized = false; }
void initResultInfo(SResultInfo *pResInfo) {
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
pResInfo->hasResult = false;
pResInfo->numOfRes = 0;
memset(pResInfo->interResultBuf, 0, (size_t)pResInfo->bufLen);
}
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable) { void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable) {
assert(pResInfo->interResultBuf == NULL); assert(pResInfo->interResultBuf == NULL);
...@@ -387,9 +377,7 @@ static bool function_setup(SQLFunctionCtx *pCtx) { ...@@ -387,9 +377,7 @@ static bool function_setup(SQLFunctionCtx *pCtx) {
*/ */
static void function_finalizer(SQLFunctionCtx *pCtx) { static void function_finalizer(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult != DATA_SET_FLAG) { if (pResInfo->hasResult != DATA_SET_FLAG) {
tscTrace("no result generated, result is set to NULL");
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) { if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->outputType); setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
} else { } else {
......
...@@ -48,7 +48,7 @@ static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) { ...@@ -48,7 +48,7 @@ static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
int32_t radix = 10; int32_t radix = 10;
int32_t radixList[3] = {16, 8, 2}; int32_t radixList[3] = {16, 8, 2}; // the integer number with different radix: hex, oct, bin
if (pToken->type == TK_HEX || pToken->type == TK_OCT || pToken->type == TK_BIN) { if (pToken->type == TK_HEX || pToken->type == TK_OCT || pToken->type == TK_BIN) {
radix = radixList[pToken->type - TK_HEX]; radix = radixList[pToken->type - TK_HEX];
} }
......
...@@ -494,7 +494,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -494,7 +494,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
//pSql->pTscObj->pSql = pSql;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pStmt->pSql = pSql; pStmt->pSql = pSql;
...@@ -515,7 +514,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { ...@@ -515,7 +514,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
//doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); //doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
pSql->param = (void*)pSql; pSql->param = (void*) pSql;
pSql->fp = waitForQueryRsp; pSql->fp = waitForQueryRsp;
pSql->insertType = TSDB_QUERY_TYPE_STMT_INSERT; pSql->insertType = TSDB_QUERY_TYPE_STMT_INSERT;
......
...@@ -217,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -217,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->numOfBuffer = numOfFlush; pReducer->numOfBuffer = numOfFlush;
pReducer->numOfVnode = numOfBuffer; pReducer->numOfVnode = numOfBuffer;
pReducer->pDesc = pDesc; pReducer->pDesc = pDesc;
tscTrace("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer); tscTrace("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer);
...@@ -278,6 +278,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -278,6 +278,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
param->groupOrderType = pQueryInfo->groupbyExpr.orderType; param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator); pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator);
if (pReducer->pLoserTree == NULL || pRes->code != 0) { if (pReducer->pLoserTree == NULL || pRes->code != 0) {
...@@ -309,10 +310,10 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -309,10 +310,10 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16; pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16;
pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage)); pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage));
int32_t finalRowLength = tscGetResRowLength(pQueryInfo->exprList); pReducer->finalRowSize = tscGetResRowLength(pQueryInfo->exprList);
pReducer->resColModel = finalmodel; pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize / finalRowLength; pReducer->resColModel->capacity = pReducer->nResultBufSize / pReducer->finalRowSize;
assert(finalRowLength <= pReducer->rowSize); assert(pReducer->finalRowSize <= pReducer->rowSize);
pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity); pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
// pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize); // pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize);
...@@ -389,7 +390,7 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor ...@@ -389,7 +390,7 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor
assert(pPage->num <= pDesc->pColumnModel->capacity); assert(pPage->num <= pDesc->pColumnModel->capacity);
// sort before flush to disk, the data must be consecutively put on tFilePage. // sort before flush to disk, the data must be consecutively put on tFilePage.
if (pDesc->orderIdx.numOfCols > 0) { if (pDesc->orderInfo.numOfCols > 0) {
tColDataQSort(pDesc, pPage->num, 0, pPage->num - 1, pPage->data, orderType); tColDataQSort(pDesc, pPage->num, 0, pPage->num - 1, pPage->data, orderType);
} }
...@@ -590,12 +591,10 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm ...@@ -590,12 +591,10 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpBuffer) { bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpBuffer) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId;
// disable merge procedure for column projection query // disable merge procedure for column projection query
int16_t functionId = pReducer->pCtx[0].functionId;
assert(functionId != TSDB_FUNC_ARITHM); assert(functionId != TSDB_FUNC_ARITHM);
if (pReducer->orderPrjOnSTable) {
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
return true; return true;
} }
...@@ -604,26 +603,33 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage ...@@ -604,26 +603,33 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
} }
tOrderDescriptor *pOrderDesc = pReducer->pDesc; tOrderDescriptor *pOrderDesc = pReducer->pDesc;
int32_t numOfCols = pOrderDesc->orderIdx.numOfCols; SColumnOrderInfo* orderInfo = &pOrderDesc->orderInfo;
// no group by columns, all data belongs to one group // no group by columns, all data belongs to one group
int32_t numOfCols = orderInfo->numOfCols;
if (numOfCols <= 0) { if (numOfCols <= 0) {
return true; return true;
} }
if (pOrderDesc->orderIdx.pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) { //<= 0 if (orderInfo->pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// super table interval query /*
* super table interval query
* if the order columns is the primary timestamp, all result data belongs to one group
*/
assert(pQueryInfo->intervalTime > 0); assert(pQueryInfo->intervalTime > 0);
pOrderDesc->orderIdx.numOfCols -= 1; if (numOfCols == 1) {
return true;
}
} else { // simple group by query } else { // simple group by query
assert(pQueryInfo->intervalTime == 0); assert(pQueryInfo->intervalTime == 0);
} }
// only one row exists // only one row exists
int32_t ret = compare_a(pOrderDesc, 1, 0, pPrev, 1, 0, tmpBuffer->data); int32_t index = orderInfo->pData[0];
pOrderDesc->orderIdx.numOfCols = numOfCols; int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset;
return (ret == 0); int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset);
return ret == 0;
} }
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc, int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
...@@ -873,24 +879,24 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe ...@@ -873,24 +879,24 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe
* Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called * Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called
* by "interuptHandler" function in shell * by "interuptHandler" function in shell
*/ */
static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) { static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd * pCmd = &pSql->cmd; SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res; SSqlRes * pRes = &pSql->res;
tFilePage * pFinalDataPage = pLocalReducer->pResultBuf; tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pRes->pLocalReducer != pLocalReducer) { // if (pRes->pLocalReducer != pLocalReducer) {
/* // /*
* Release the SSqlObj is called, and it is int destroying function invoked by other thread. // * Release the SSqlObj is called, and it is int destroying function invoked by other thread.
* However, the other thread will WAIT until current process fully completes. // * However, the other thread will WAIT until current process fully completes.
* Since the flag of release struct is set by doLocalReduce function // * Since the flag of release struct is set by doLocalReduce function
*/ // */
assert(pRes->pLocalReducer == NULL); // assert(pRes->pLocalReducer == NULL);
} // }
// no interval query, no fill operation
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
// no interval query, no fill operation
pRes->data = pLocalReducer->pFinalRes; pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pFinalDataPage->num; pRes->numOfRows = pFinalDataPage->num;
pRes->numOfClauseTotal += pRes->numOfRows; pRes->numOfClauseTotal += pRes->numOfRows;
...@@ -929,9 +935,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo ...@@ -929,9 +935,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
} }
int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList); memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize);
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize);
pFinalDataPage->num = 0; pFinalDataPage->num = 0;
return; return;
} }
...@@ -1037,16 +1041,13 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) ...@@ -1037,16 +1041,13 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) { static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) {
// the tag columns need to be set before all functions execution // the tag columns need to be set before all functions execution
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo); size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) { for (int32_t j = 0; j < size; ++j) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, j);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j]; SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j];
tVariantAssign(&pCtx->param[0], &pExpr->param[0]);
// tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer // tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer
int32_t functionId = pExpr->functionId; int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) {
tVariantDestroy(&pCtx->tag); tVariantDestroy(&pCtx->tag);
char* input = pCtx->aInputElemBuf; char* input = pCtx->aInputElemBuf;
...@@ -1057,17 +1058,20 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, ...@@ -1057,17 +1058,20 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
} else { } else {
tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType); tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType);
} }
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j);
pCtx->param[0].i64Key = pExpr->param[0].i64Key;
} }
pCtx->currentStage = SECONDARY_STAGE_MERGE; pCtx->currentStage = SECONDARY_STAGE_MERGE;
if (needInit) { if (needInit) {
aAggs[pExpr->functionId].init(pCtx); aAggs[pCtx->functionId].init(pCtx);
} }
} }
for (int32_t j = 0; j < size; ++j) { for (int32_t j = 0; j < size; ++j) {
int32_t functionId = tscSqlExprGet(pQueryInfo, j)->functionId; int32_t functionId = pLocalReducer->pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue; continue;
} }
...@@ -1101,8 +1105,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) ...@@ -1101,8 +1105,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
* ts, tag, tagprj function can not decide the output number of current query * ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output * the number of output result is decided by main output
*/ */
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j); int32_t functionId = pCtx[j].functionId;
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) { if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
continue; continue;
} }
...@@ -1136,15 +1139,13 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo ...@@ -1136,15 +1139,13 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
char *buf = malloc((size_t)maxBufSize); char *buf = malloc((size_t)maxBufSize);
for (int32_t k = 0; k < size; ++k) { for (int32_t k = 0; k < size; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k); SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
if (pExpr->functionId != TSDB_FUNC_TAG) { if (pCtx->functionId != TSDB_FUNC_TAG) {
continue; continue;
} }
int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result
memset(buf, 0, (size_t)maxBufSize); memset(buf, 0, (size_t)maxBufSize);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
memcpy(buf, pCtx->aOutputBuf, (size_t)pCtx->outputBytes); memcpy(buf, pCtx->aOutputBuf, (size_t)pCtx->outputBytes);
for (int32_t i = 0; i < inc; ++i) { for (int32_t i = 0; i < inc; ++i) {
...@@ -1160,8 +1161,8 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { ...@@ -1160,8 +1161,8 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo); size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) { for (int32_t k = 0; k < size; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k); SQLFunctionCtx* pCtx = &pLocalReducer->pCtx[k];
aAggs[pExpr->functionId].xFinalize(&pLocalReducer->pCtx[k]); aAggs[pCtx->functionId].xFinalize(pCtx);
} }
pLocalReducer->hasPrevRow = false; pLocalReducer->hasPrevRow = false;
...@@ -1182,13 +1183,13 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { ...@@ -1182,13 +1183,13 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
*/ */
bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) { bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
int32_t ret = 0; // merge all result by default int32_t ret = 0; // merge all result by default
int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId;
int16_t functionId = pLocalReducer->pCtx[0].functionId;
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { // column projection query if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { // column projection query
ret = 1; // disable merge procedure ret = 1; // disable merge procedure
} else { } else {
tOrderDescriptor *pDesc = pLocalReducer->pDesc; tOrderDescriptor *pDesc = pLocalReducer->pDesc;
if (pDesc->orderIdx.numOfCols > 0) { if (pDesc->orderInfo.numOfCols > 0) {
if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc
// todo refactor comparator // todo refactor comparator
ret = compare_a(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpBuffer->data); ret = compare_a(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpBuffer->data);
...@@ -1274,7 +1275,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no ...@@ -1274,7 +1275,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf); taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
} }
doInterpolateResult(pSql, pLocalReducer, noMoreCurrentGroupRes); doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
return true; return true;
} }
...@@ -1341,7 +1342,7 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) { ...@@ -1341,7 +1342,7 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
// the first column must be the timestamp column // the first column must be the timestamp column
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pLocalReducer->resColModel->capacity); int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, false); doFillResult(pSql, pLocalReducer, false);
} }
return true; return true;
...@@ -1374,7 +1375,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { ...@@ -1374,7 +1375,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
pQueryInfo->slidingTimeUnit, tinfo.precision); pQueryInfo->slidingTimeUnit, tinfo.precision);
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, 0, etime, pLocalReducer->resColModel->capacity); int32_t rows = taosGetNumOfResultWithFill(pFillInfo, 0, etime, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, true); doFillResult(pSql, pLocalReducer, true);
} }
} }
...@@ -1408,13 +1409,11 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) { ...@@ -1408,13 +1409,11 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo); size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) { for (int32_t k = 0; k < size; ++k) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, k);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k]; SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
pCtx->aOutputBuf += pCtx->outputBytes * numOfRes; pCtx->aOutputBuf += pCtx->outputBytes * numOfRes;
// set the correct output timestamp column position // set the correct output timestamp column position
if (pExpr->functionId == TSDB_FUNC_TOP || pExpr->functionId == TSDB_FUNC_BOTTOM) { if (pCtx->functionId == TSDB_FUNC_TOP || pCtx->functionId == TSDB_FUNC_BOTTOM) {
pCtx->ptsOutputBuf = ((char *)pCtx->ptsOutputBuf + TSDB_KEYSIZE * numOfRes); pCtx->ptsOutputBuf = ((char *)pCtx->ptsOutputBuf + TSDB_KEYSIZE * numOfRes);
} }
} }
......
...@@ -2104,7 +2104,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) { ...@@ -2104,7 +2104,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
} }
void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) { void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, columnIndex); SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, columnIndex);//tscFieldInfoGetSupp(pFieldInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL); assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType; int32_t type = pInfo->pSqlExpr->resType;
......
...@@ -188,9 +188,10 @@ typedef void *TsdbPosT; ...@@ -188,9 +188,10 @@ typedef void *TsdbPosT;
* @param tsdb tsdb handle * @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block * @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param groupInfo tableId list in the form of set, seperated into different groups according to group by condition * @param groupInfo tableId list in the form of set, seperated into different groups according to group by condition
* @param qinfo query info handle from query processor
* @return * @return
*/ */
TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo); TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo);
/** /**
* Get the last row of the given query time window for all the tables in STableGroupInfo object. * Get the last row of the given query time window for all the tables in STableGroupInfo object.
...@@ -202,11 +203,11 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable ...@@ -202,11 +203,11 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
* @param groupInfo tableId list. * @param groupInfo tableId list.
* @return * @return
*/ */
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo); TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo);
SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle); SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList); TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo);
/** /**
* move to next block if exists * move to next block if exists
......
...@@ -172,10 +172,11 @@ typedef struct SQueryRuntimeEnv { ...@@ -172,10 +172,11 @@ typedef struct SQueryRuntimeEnv {
STSBuf* pTSBuf; STSBuf* pTSBuf;
STSCursor cur; STSCursor cur;
SQueryCostInfo summary; SQueryCostInfo summary;
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
void* pQueryHandle; void* pQueryHandle;
void* pSecQueryHandle; // another thread for void* pSecQueryHandle; // another thread for
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool topBotQuery; // false;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
typedef struct SQInfo { typedef struct SQInfo {
......
...@@ -31,7 +31,8 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); ...@@ -31,7 +31,8 @@ void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order); void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot); SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot);
int32_t curTimeWindow(SWindowResInfo *pWindowResInfo);
#define curTimeWindow(_winres) ((_winres)->curIndex)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo); void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
......
...@@ -28,7 +28,7 @@ extern "C" { ...@@ -28,7 +28,7 @@ extern "C" {
#include "tdataformat.h" #include "tdataformat.h"
#include "talgo.h" #include "talgo.h"
#define DEFAULT_PAGE_SIZE (1024L*56) // 16k larger than the SHistoInfo #define DEFAULT_PAGE_SIZE (1024L*64) // 16k larger than the SHistoInfo
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX #define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64 #define INITIAL_ALLOCATION_BUFFER_SIZE 64
...@@ -96,7 +96,7 @@ typedef struct SColumnOrderInfo { ...@@ -96,7 +96,7 @@ typedef struct SColumnOrderInfo {
typedef struct tOrderDescriptor { typedef struct tOrderDescriptor {
SColumnModel * pColumnModel; SColumnModel * pColumnModel;
int32_t tsOrder; // timestamp order type if exists int32_t tsOrder; // timestamp order type if exists
SColumnOrderInfo orderIdx; SColumnOrderInfo orderInfo;
} tOrderDescriptor; } tOrderDescriptor;
typedef struct tExtMemBuffer { typedef struct tExtMemBuffer {
......
...@@ -85,7 +85,7 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); ...@@ -85,7 +85,7 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
* @param id * @param id
* @return * @return
*/ */
tFilePage* getResultBufferPageById(SDiskbasedResultBuf* pResultBuf, int32_t id); #define GET_RES_BUF_PAGE_BY_ID(buf, id) ((tFilePage*)((buf)->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE*(id)))
/** /**
* get the total buffer size in the format of disk file * get the total buffer size in the format of disk file
......
...@@ -647,9 +647,9 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). { ...@@ -647,9 +647,9 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
} }
////////////////////////////////////////kill statement/////////////////////////////////////// ////////////////////////////////////////kill statement///////////////////////////////////////
cmd ::= KILL CONNECTION IPTOKEN(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_CONNECTION, &X);} cmd ::= KILL CONNECTION INTEGER(Y). {setKillSQL(pInfo, TSDB_SQL_KILL_CONNECTION, &Y);}
cmd ::= KILL STREAM IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X.n += (Z.n + Y.n + K.n + F.n); setKillSQL(pInfo, TSDB_SQL_KILL_STREAM, &X);} cmd ::= KILL STREAM INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_STREAM, &X);}
cmd ::= KILL QUERY IPTOKEN(X) COLON(Z) INTEGER(Y) COLON(K) INTEGER(F). {X.n += (Z.n + Y.n + K.n + F.n); setKillSQL(pInfo, TSDB_SQL_KILL_QUERY, &X);} cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_QUERY, &X);}
%fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED %fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD
......
...@@ -272,9 +272,18 @@ bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *mi ...@@ -272,9 +272,18 @@ bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *mi
bool stableQueryFunctChanged(int32_t funcId); bool stableQueryFunctChanged(int32_t funcId);
void resetResultInfo(SResultInfo *pResInfo); void resetResultInfo(SResultInfo *pResInfo);
void initResultInfo(SResultInfo *pResInfo);
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable); void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable);
static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) {
pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false;
pResInfo->hasResult = false;
pResInfo->numOfRes = 0;
memset(pResInfo->interResultBuf, 0, (size_t)pResInfo->bufLen);
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
此差异已折叠。
...@@ -206,11 +206,6 @@ bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { ...@@ -206,11 +206,6 @@ bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
return (getWindowResult(pWindowResInfo, slot)->status.closed == true); return (getWindowResult(pWindowResInfo, slot)->status.closed == true);
} }
int32_t curTimeWindow(SWindowResInfo *pWindowResInfo) {
assert(pWindowResInfo->curIndex >= 0 && pWindowResInfo->curIndex < pWindowResInfo->size);
return pWindowResInfo->curIndex;
}
void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) { void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) {
getWindowResult(pWindowResInfo, slot)->status.closed = true; getWindowResult(pWindowResInfo, slot)->status.closed = true;
} }
......
...@@ -356,17 +356,15 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t ...@@ -356,17 +356,15 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t
static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) { static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
int32_t first = *(int32_t *)f1; int32_t first = *(int32_t *) f1;
int32_t second = *(int32_t *)f2; int32_t second = *(int32_t *) f2;
if (first == second) { if (first == second) {
return 0; return 0;
} }
return (first < second) ? -1 : 1; return (first < second) ? -1 : 1;
}; };
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
//double first = *(double *)f1; double first = GET_DOUBLE_VAL(f1);
double first = GET_DOUBLE_VAL(f1);
//double second = *(double *)f2;
double second = GET_DOUBLE_VAL(f2); double second = GET_DOUBLE_VAL(f2);
if (first == second) { if (first == second) {
return 0; return 0;
...@@ -374,9 +372,7 @@ static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, i ...@@ -374,9 +372,7 @@ static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, i
return (first < second) ? -1 : 1; return (first < second) ? -1 : 1;
}; };
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
//float first = *(float *)f1; float first = GET_FLOAT_VAL(f1);
//float second = *(float *)f2;
float first = GET_FLOAT_VAL(f1);
float second = GET_FLOAT_VAL(f2); float second = GET_FLOAT_VAL(f2);
if (first == second) { if (first == second) {
return 0; return 0;
...@@ -439,9 +435,9 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1, ...@@ -439,9 +435,9 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t s2, char *data2) { int32_t s2, char *data2) {
assert(numOfRows1 == numOfRows2); assert(numOfRows1 == numOfRows2);
int32_t cmpCnt = pDescriptor->orderIdx.numOfCols; int32_t cmpCnt = pDescriptor->orderInfo.numOfCols;
for (int32_t i = 0; i < cmpCnt; ++i) { for (int32_t i = 0; i < cmpCnt; ++i) {
int32_t colIdx = pDescriptor->orderIdx.pData[i]; int32_t colIdx = pDescriptor->orderInfo.pData[i];
char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx); char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx); char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
...@@ -471,9 +467,9 @@ int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1, ...@@ -471,9 +467,9 @@ int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t s2, char *data2) { int32_t s2, char *data2) {
assert(numOfRows1 == numOfRows2); assert(numOfRows1 == numOfRows2);
int32_t cmpCnt = pDescriptor->orderIdx.numOfCols; int32_t cmpCnt = pDescriptor->orderInfo.numOfCols;
for (int32_t i = 0; i < cmpCnt; ++i) { for (int32_t i = 0; i < cmpCnt; ++i) {
int32_t colIdx = pDescriptor->orderIdx.pData[i]; int32_t colIdx = pDescriptor->orderInfo.pData[i];
char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx); char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx); char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
...@@ -563,13 +559,13 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta ...@@ -563,13 +559,13 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
int32_t midIdx = ((end - start) >> 1) + start; int32_t midIdx = ((end - start) >> 1) + start;
#if defined(_DEBUG_VIEW) #if defined(_DEBUG_VIEW)
int32_t f = pDescriptor->orderIdx.pData[0]; int32_t f = pDescriptor->orderInfo.pData[0];
char *midx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, midIdx, f); char *midx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, midIdx, f);
char *startx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, start, f); char *startx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, start, f);
char *endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f); char *endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f);
int32_t colIdx = pDescriptor->orderIdx.pData[0]; int32_t colIdx = pDescriptor->orderInfo.pData[0];
tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "before", startx, midx, endx); tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "before", startx, midx, endx);
#endif #endif
...@@ -596,7 +592,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta ...@@ -596,7 +592,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
} }
static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t numOfRows, char *d, int32_t len) { static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t numOfRows, char *d, int32_t len) {
int32_t colIdx = pDescriptor->orderIdx.pData[0]; int32_t colIdx = pDescriptor->orderInfo.pData[0];
for (int32_t i = 0; i < len; ++i) { for (int32_t i = 0; i < len; ++i) {
char *startx = COLMODEL_GET_VAL(d, pDescriptor->pColumnModel, numOfRows, i, colIdx); char *startx = COLMODEL_GET_VAL(d, pDescriptor->pColumnModel, numOfRows, i, colIdx);
...@@ -1062,9 +1058,9 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder ...@@ -1062,9 +1058,9 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder
desc->pColumnModel = pModel; desc->pColumnModel = pModel;
desc->tsOrder = tsOrderType; desc->tsOrder = tsOrderType;
desc->orderIdx.numOfCols = numOfOrderCols; desc->orderInfo.numOfCols = numOfOrderCols;
for (int32_t i = 0; i < numOfOrderCols; ++i) { for (int32_t i = 0; i < numOfOrderCols; ++i) {
desc->orderIdx.pData[i] = orderColIdx[i]; desc->orderInfo.pData[i] = orderColIdx[i];
} }
return desc; return desc;
......
...@@ -50,12 +50,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si ...@@ -50,12 +50,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
tFilePage* getResultBufferPageById(SDiskbasedResultBuf* pResultBuf, int32_t id) {
assert(id < pResultBuf->numOfPages && id >= 0);
return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE * id);
}
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); } int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; } int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
...@@ -169,7 +163,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 ...@@ -169,7 +163,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
*pageId = (pResultBuf->allocateId++); *pageId = (pResultBuf->allocateId++);
registerPageId(pResultBuf, groupId, *pageId); registerPageId(pResultBuf, groupId, *pageId);
tFilePage* page = getResultBufferPageById(pResultBuf, *pageId); tFilePage* page = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pageId);
// clear memory for the new page // clear memory for the new page
memset(page, 0, DEFAULT_INTERN_BUF_PAGE_SIZE); memset(page, 0, DEFAULT_INTERN_BUF_PAGE_SIZE);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "tulog.h" #include "tulog.h"
#include "talgo.h" #include "talgo.h"
#include "tutil.h" #include "tutil.h"
#include "ttime.h"
#include "tcompare.h" #include "tcompare.h"
#include "exception.h" #include "exception.h"
...@@ -71,6 +72,8 @@ typedef struct STableCheckInfo { ...@@ -71,6 +72,8 @@ typedef struct STableCheckInfo {
int32_t compSize; int32_t compSize;
int32_t numOfBlocks; // number of qualified data blocks not the original blocks int32_t numOfBlocks; // number of qualified data blocks not the original blocks
SDataCols* pDataCols; SDataCols* pDataCols;
int32_t chosen; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not bool initBuf; // whether to initialize the in-memory skip list iterator or not
SSkipListIterator* iter; // mem buffer skip list iterator SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iiter; // imem buffer skip list iterator SSkipListIterator* iiter; // imem buffer skip list iterator
...@@ -79,8 +82,6 @@ typedef struct STableCheckInfo { ...@@ -79,8 +82,6 @@ typedef struct STableCheckInfo {
typedef struct STableBlockInfo { typedef struct STableBlockInfo {
SCompBlock* compBlock; SCompBlock* compBlock;
STableCheckInfo* pTableCheckInfo; STableCheckInfo* pTableCheckInfo;
// int32_t blockIndex;
// int32_t groupIdx; /* number of group is less than the total number of tables */
} STableBlockInfo; } STableBlockInfo;
typedef struct SBlockOrderSupporter { typedef struct SBlockOrderSupporter {
...@@ -120,7 +121,7 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); ...@@ -120,7 +121,7 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
SArray* sa); SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey, static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
STsdbQueryHandle* pQueryHandle); STsdbQueryHandle* pQueryHandle);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
...@@ -134,7 +135,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { ...@@ -134,7 +135,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileId = -1; pCompBlockLoadInfo->fileId = -1;
} }
TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList) { TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
// todo 1. filter not exist table // todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query // todo 2. add the reference count for each table that is involved in query
...@@ -147,6 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable ...@@ -147,6 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
pQueryHandle->cur.win = TSWINDOW_INITIALIZER; pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order); pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order);
pQueryHandle->activeIndex = 0; // current active table index pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->qinfo = qinfo;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
...@@ -201,8 +203,8 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable ...@@ -201,8 +203,8 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
return (TsdbQueryHandleT) pQueryHandle; return (TsdbQueryHandleT) pQueryHandle;
} }
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList) { TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
pQueryHandle->type = TSDB_QUERY_TYPE_LAST; pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
pQueryHandle->order = TSDB_ORDER_DESC; pQueryHandle->order = TSDB_ORDER_DESC;
...@@ -227,8 +229,8 @@ SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) { ...@@ -227,8 +229,8 @@ SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) {
return res; return res;
} }
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList) { TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
// pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded // pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded
...@@ -303,6 +305,83 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -303,6 +305,83 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
return true; return true;
} }
SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
SDataRow rmem = NULL, rimem = NULL;
if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node != NULL) {
rmem = SL_GET_NODE_DATA(node);
}
}
if (pCheckInfo->iiter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) {
rimem = SL_GET_NODE_DATA(node);
}
}
if (rmem != NULL && rimem != NULL) {
if (dataRowKey(rmem) < dataRowKey(rimem)) {
pCheckInfo->chosen = 0;
return rmem;
} else if (dataRowKey(rmem) == dataRowKey(rimem)) {
// data ts are duplicated, ignore the data in mem
tSkipListIterNext(pCheckInfo->iter);
pCheckInfo->chosen = 1;
return rimem;
} else {
pCheckInfo->chosen = 1;
return rimem;
}
}
if (rmem != NULL) {
pCheckInfo->chosen = 0;
return rmem;
}
if (rimem != NULL) {
pCheckInfo->chosen = 1;
return rimem;
}
return NULL;
}
bool moveToNextRow(STableCheckInfo* pCheckInfo) {
bool hasNext = false;
if (pCheckInfo->chosen == 0) {
if (pCheckInfo->iter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iter);
}
if (hasNext) {
return hasNext;
}
if (pCheckInfo->iiter != NULL) {
return tSkipListIterGet(pCheckInfo->iiter) != NULL;
}
} else {
if (pCheckInfo->chosen == 1) {
if (pCheckInfo->iiter != NULL) {
hasNext = tSkipListIterNext(pCheckInfo->iiter);
}
if (hasNext) {
return hasNext;
}
if (pCheckInfo->iter != NULL) {
return tSkipListIterGet(pCheckInfo->iter) != NULL;
}
}
}
return hasNext;
}
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo); size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1); assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
...@@ -312,31 +391,13 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { ...@@ -312,31 +391,13 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
assert(pTable != NULL); assert(pTable != NULL);
// no data in cache, abort
if (pTable->mem == NULL && pTable->imem == NULL) {
return false;
}
if (pCheckInfo->iter == NULL && pTable->mem) {
pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey,
TSDB_DATA_TYPE_TIMESTAMP, pHandle->order);
if (pCheckInfo->iter == NULL) {
return false;
}
if (!tSkipListIterNext(pCheckInfo->iter)) { // buffer is empty
return false;
}
}
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); initTableMemIterator(pHandle, pCheckInfo);
if (node == NULL) { SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (row == NULL) {
return false; return false;
} }
SDataRow row = SL_GET_NODE_DATA(node);
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer
tsdbTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle, tsdbTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo); pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);
...@@ -349,7 +410,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { ...@@ -349,7 +410,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
STimeWindow* win = &pHandle->cur.win; STimeWindow* win = &pHandle->cur.win;
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey, pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey,
pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API
// update the last key value // update the last key value
...@@ -382,7 +443,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio ...@@ -382,7 +443,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
return fid; return fid;
} }
static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
int32_t firstSlot = 0; int32_t firstSlot = 0;
int32_t lastSlot = numOfBlocks - 1; int32_t lastSlot = numOfBlocks - 1;
...@@ -448,7 +509,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -448,7 +509,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey); TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window // discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlockImpl(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC); int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start; int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) { if (s > pCompInfo->blocks[start].keyLast) {
...@@ -522,7 +583,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -522,7 +583,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
bool blockLoaded = false; bool blockLoaded = false;
SArray* sa = getDefaultLoadColumns(pQueryHandle, true); SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
int64_t st = taosGetTimestampUs();
if (pCheckInfo->pDataCols == NULL) { if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo); STsdbMeta* pMeta = tsdbGetMeta(pRepo);
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
...@@ -540,9 +603,15 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -540,9 +603,15 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
blockLoaded = true; blockLoaded = true;
} }
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
assert(pCols->numOfRows != 0);
taosArrayDestroy(sa); taosArrayDestroy(sa);
tfree(data); tfree(data);
int64_t et = taosGetTimestampUs() - st;
tsdbTrace("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et);
return blockLoaded; return blockLoaded;
} }
...@@ -600,7 +669,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -600,7 +669,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
// do not load file block into buffer // do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, binfo.window.skey - step, cur->rows = tsdbReadRowsFromCache(pCheckInfo, binfo.window.skey - step,
pQueryHandle->outputCapacity, &cur->win.skey, &cur->win.ekey, pQueryHandle); pQueryHandle->outputCapacity, &cur->win.skey, &cur->win.ekey, pQueryHandle);
pQueryHandle->realNumOfRows = cur->rows; pQueryHandle->realNumOfRows = cur->rows;
...@@ -649,7 +718,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -649,7 +718,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
return false; return false;
} }
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows); assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
...@@ -1155,7 +1224,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1155,7 +1224,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
*numOfAllocBlocks = numOfBlocks; *numOfAllocBlocks = numOfBlocks;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
SBlockOrderSupporter sup = {0}; SBlockOrderSupporter sup = {0};
sup.numOfTables = numOfTables; sup.numOfTables = numOfTables;
sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables); sup.numOfBlocksPerTable = calloc(1, sizeof(int32_t) * numOfTables);
...@@ -1192,15 +1261,26 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1192,15 +1261,26 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
pBlockInfo->compBlock = &pBlock[k]; pBlockInfo->compBlock = &pBlock[k];
pBlockInfo->pTableCheckInfo = pTableCheck; pBlockInfo->pTableCheckInfo = pTableCheck;
// pBlockInfo->groupIdx = pTableCheckInfo[j]->groupIdx; // set the group index
// pBlockInfo->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original table
cnt++; cnt++;
} }
numOfQualTables++; numOfQualTables++;
} }
tsdbTrace("%p create data blocks info struct completed, %d blocks in %d tables", pQueryHandle, cnt, numOfQualTables); assert(numOfBlocks == cnt);
// since there is only one table qualified, blocks are not sorted
if (numOfQualTables == 1) {
memcpy(pQueryHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
cleanBlockOrderSupporter(&sup, numOfQualTables);
tsdbTrace("%p create data blocks info struct completed for 1 table, %d blocks not sorted %p ", pQueryHandle, cnt,
pQueryHandle->qinfo);
return TSDB_CODE_SUCCESS;
}
tsdbTrace("%p create data blocks info struct completed, %d blocks in %d tables %p", pQueryHandle, cnt,
numOfQualTables, pQueryHandle->qinfo);
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
sup.numOfTables = numOfQualTables; sup.numOfTables = numOfQualTables;
...@@ -1257,8 +1337,8 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { ...@@ -1257,8 +1337,8 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
break; break;
} }
tsdbTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks, tsdbTrace("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks,
numOfTables, pQueryHandle->pFileGroup->fileId); numOfTables, pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
assert(numOfBlocks >= 0); assert(numOfBlocks >= 0);
if (numOfBlocks == 0) { if (numOfBlocks == 0) {
...@@ -1565,19 +1645,22 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) { ...@@ -1565,19 +1645,22 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL}; pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
} }
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey, static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
STsdbQueryHandle* pQueryHandle) { STsdbQueryHandle* pQueryHandle) {
int numOfRows = 0; int numOfRows = 0;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
*skey = TSKEY_INITIAL_VAL; *skey = TSKEY_INITIAL_VAL;
int64_t st = taosGetTimestampUs();
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj);
int32_t numOfTableCols = schemaNCols(pSchema);
do { do {
SSkipListNode* node = tSkipListIterGet(pIter); SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (node == NULL) { if (row == NULL) {
break; break;
} }
SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
...@@ -1590,49 +1673,69 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY ...@@ -1590,49 +1673,69 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
} }
if (*skey == INT64_MIN) { if (*skey == INT64_MIN) {
*skey = dataRowKey(row); *skey = key;
} }
*ekey = dataRowKey(row); *ekey = key;
int32_t offset = -1;
char* pData = NULL; char* pData = NULL;
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pTable);
int32_t numOfTableCols = schemaNCols(pSchema);
for (int32_t i = 0; i < numOfCols; ++i) { int32_t i = 0, j = 0;
while(i < numOfCols && j < numOfTableCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pSchema->columns[j].colId < pColInfo->info.colId) {
j++;
continue;
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else { } else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes; pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
} }
for(int32_t j = 0; j < numOfTableCols; ++j) { if (pSchema->columns[j].colId == pColInfo->info.colId) {
if (pColInfo->info.colId == pSchema->columns[j].colId) { void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
offset = pSchema->columns[j].offset; if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
break; memcpy(pData, value, varDataTLen(value));
} else {
memcpy(pData, value, pColInfo->info.bytes);
} }
j++;
i++;
} else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
} }
}
assert(offset != -1); // todo handle error
void *value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset); while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
}
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value)); setVardataNull(pData, pColInfo->info.type);
} else { } else {
memcpy(pData, value, pColInfo->info.bytes); setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
} }
i++;
} }
if (++numOfRows >= maxRowsToRead) { if (++numOfRows >= maxRowsToRead) {
tSkipListIterNext(pIter); moveToNextRow(pCheckInfo);
break; break;
} }
} while(tSkipListIterNext(pIter)); } while(moveToNextRow(pCheckInfo));
assert(numOfRows <= maxRowsToRead); assert(numOfRows <= maxRowsToRead);
...@@ -1646,6 +1749,10 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY ...@@ -1646,6 +1749,10 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
} }
} }
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbTrace("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d", pQueryHandle,
elapsedTime, numOfRows, numOfCols);
return numOfRows; return numOfRows;
} }
......
...@@ -153,6 +153,8 @@ bool taosMbsToUcs4(char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, ...@@ -153,6 +153,8 @@ bool taosMbsToUcs4(char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len,
int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes); int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes);
void taosRandStr(char* str, int32_t size);
int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs); int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
bool taosValidateEncodec(const char *encodec); bool taosValidateEncodec(const char *encodec);
......
...@@ -92,12 +92,14 @@ void* taosArrayGet(const SArray* pArray, size_t index) { ...@@ -92,12 +92,14 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
} }
void* taosArrayGetP(const SArray* pArray, size_t index) { void* taosArrayGetP(const SArray* pArray, size_t index) {
void* ret = taosArrayGet(pArray, index); assert(index < pArray->size);
if (ret == NULL) {
void* d = TARRAY_GET_ELEM(pArray, index);
if (d == NULL) {
return NULL; return NULL;
} }
return *(void**)ret; return *(void**)d;
} }
size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; } size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; }
......
...@@ -27,8 +27,6 @@ ...@@ -27,8 +27,6 @@
#include "tulog.h" #include "tulog.h"
#include "taoserror.h" #include "taoserror.h"
int32_t tmpFileSerialNum = 0;
int32_t strdequote(char *z) { int32_t strdequote(char *z) {
if (z == NULL) { if (z == NULL) {
return 0; return 0;
...@@ -433,12 +431,24 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) { ...@@ -433,12 +431,24 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
#else #else
char *tmpDir = "/tmp/"; char *tmpDir = "/tmp/";
#endif #endif
int64_t ts = taosGetTimestampUs();
strcpy(tmpPath, tmpDir); strcpy(tmpPath, tmpDir);
strcat(tmpPath, tdengineTmpFileNamePrefix); strcat(tmpPath, tdengineTmpFileNamePrefix);
strcat(tmpPath, fileNamePrefix); strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%d-%"PRIu64"-%u-%"PRIu64); strcat(tmpPath, "-%d-%s");
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1), ts);
char rand[8] = {0};
taosRandStr(rand, tListLen(rand) - 1);
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand);
}
void taosRandStr(char* str, int32_t size) {
const char* set = "abcdefghijklmnopqrstuvwxyz0123456789-_.";
int32_t len = 39;
for(int32_t i = 0; i < size; ++i) {
str[i] = set[rand()%len];
}
} }
int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes) { int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册