diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index af91ac34f08658da67644d7f72209231b99eaeaf..2ca6ba669185d5bc8123d21e64805e65a8718945 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -210,7 +210,7 @@ void tscTagCondRelease(STagCond* pCond); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); void tscSetFreeHeatBeat(STscObj* pObj); -bool tscShouldFreeHeatBeat(SSqlObj* pHb); +bool tscShouldFreeHeartBeat(SSqlObj* pHb); bool tscShouldBeFreed(SSqlObj* pSql); STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 0733593284d4ec609c037fd79bebb228aea9804c..186c2871a13a3b6e1a196b6c57d9355fc305dc16 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -142,6 +142,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { pFillCol[i].col.bytes = pExpr->resBytes; pFillCol[i].col.type = (int8_t)pExpr->resType; + pFillCol[i].col.colId = pExpr->colInfo.colId; pFillCol[i].flag = pExpr->colInfo.flag; pFillCol[i].col.offset = offset; pFillCol[i].functionId = pExpr->functionId; @@ -379,20 +380,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd 4096, (int32_t)numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision, pQueryInfo->fillType, pFillCol); } - - int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; - - if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 && pReducer->pFillInfo != NULL) { - pReducer->pFillInfo->pTags[0] = (char *)pReducer->pFillInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols; - for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { - SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1); - pReducer->pFillInfo->pTags[i] = pSchema->bytes + pReducer->pFillInfo->pTags[i - 1]; - } - } else { - if (pReducer->pFillInfo != NULL) { - assert(pReducer->pFillInfo->pTags == NULL); - } - } } static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, @@ -835,7 +822,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource * } } -void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) { +void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) { // discard following dataset in the same group and reset the interpolation information STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -878,64 +865,66 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe } } -/* - * Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called - * by "interuptHandler" function in shell - */ -static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) { - SSqlCmd * pCmd = &pSql->cmd; - SSqlRes * pRes = &pSql->res; - - tFilePage * pFinalDataPage = pLocalReducer->pResultBuf; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); +static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, SQueryInfo* pQueryInfo) { + assert(pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE); - // no interval query, no fill operation - if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { - pRes->data = pLocalReducer->pFinalRes; - pRes->numOfRows = pFinalDataPage->num; + tFilePage * pBeforeFillData = pLocalReducer->pResultBuf; - if (pQueryInfo->limit.offset > 0) { - if (pQueryInfo->limit.offset < pRes->numOfRows) { - int32_t prevSize = (int32_t)pFinalDataPage->num; - tColModelErase(pLocalReducer->resColModel, pFinalDataPage, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1); + pRes->data = pLocalReducer->pFinalRes; + pRes->numOfRows = pBeforeFillData->num; - /* remove the hole in column model */ - tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); + if (pQueryInfo->limit.offset > 0) { + if (pQueryInfo->limit.offset < pRes->numOfRows) { + int32_t prevSize = (int32_t)pBeforeFillData->num; + tColModelErase(pLocalReducer->resColModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1); - pRes->numOfRows -= pQueryInfo->limit.offset; - pQueryInfo->limit.offset = 0; - } else { - pQueryInfo->limit.offset -= pRes->numOfRows; - pRes->numOfRows = 0; - } + /* remove the hole in column model */ + tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize); + + pRes->numOfRows -= pQueryInfo->limit.offset; + pQueryInfo->limit.offset = 0; + } else { + pQueryInfo->limit.offset -= pRes->numOfRows; + pRes->numOfRows = 0; } + } - pRes->numOfRowsGroup += pRes->numOfRows; + pRes->numOfRowsGroup += pRes->numOfRows; - if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) { - /* impose the limitation of output rows on the final result */ - int32_t prevSize = (int32_t)pFinalDataPage->num; - int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit); - assert(overflow < pRes->numOfRows); + // impose the limitation of output rows on the final result + if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) { + int32_t prevSize = (int32_t)pBeforeFillData->num; + int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit); + assert(overflow < pRes->numOfRows); - pRes->numOfRowsGroup = pQueryInfo->limit.limit; - pRes->numOfRows -= overflow; - pFinalDataPage->num -= overflow; + pRes->numOfRowsGroup = pQueryInfo->limit.limit; + pRes->numOfRows -= overflow; + pBeforeFillData->num -= overflow; - tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); + tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize); - /* set remain data to be discarded, and reset the interpolation information */ - savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); - } + // set remain data to be discarded, and reset the interpolation information + savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); + } - memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize); + memcpy(pRes->data, pBeforeFillData->data, pRes->numOfRows * pLocalReducer->finalRowSize); - pRes->numOfClauseTotal += pRes->numOfRows; - pFinalDataPage->num = 0; - return; - } + pRes->numOfClauseTotal += pRes->numOfRows; + pBeforeFillData->num = 0; +} + +/* + * Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called + * by "interuptHandler" function in shell + */ +static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + tFilePage *pBeforeFillData = pLocalReducer->pResultBuf; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SFillInfo *pFillInfo = pLocalReducer->pFillInfo; - SFillInfo *pFillInfo = pLocalReducer->pFillInfo; int64_t actualETime = MAX(pQueryInfo->window.skey, pQueryInfo->window.ekey); tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput); @@ -973,7 +962,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO break; } - /* all output for current group are completed */ + // all output in current group are completed int32_t totalRemainRows = (int32_t)getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity); if (totalRemainRows <= 0) { break; @@ -983,15 +972,16 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO } if (pRes->numOfRows > 0) { - if (pQueryInfo->limit.limit >= 0 && pRes->numOfRows > pQueryInfo->limit.limit) { - int32_t overflow = (int32_t)(pRes->numOfRows - pQueryInfo->limit.limit); - pRes->numOfRows -= overflow; - pFinalDataPage->num -= overflow; + int32_t currentTotal = pRes->numOfRowsGroup + pRes->numOfRows; - assert(pRes->numOfRows >= 0 && pFinalDataPage->num > 0); + if (pQueryInfo->limit.limit >= 0 && currentTotal > pQueryInfo->limit.limit) { + int32_t overflow = (int32_t)(currentTotal - pQueryInfo->limit.limit); + + pRes->numOfRows -= overflow; + assert(pRes->numOfRows >= 0); /* set remain data to be discarded, and reset the interpolation information */ - savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo); + savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pFillInfo); } if (pQueryInfo->order.order == TSDB_ORDER_ASC) { @@ -1008,7 +998,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO pRes->numOfClauseTotal += pRes->numOfRows; } - pFinalDataPage->num = 0; + pBeforeFillData->num = 0; for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { taosTFree(pResPages[i]); } @@ -1225,7 +1215,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) { * @param noMoreCurrentGroupRes * @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups */ -bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) { +bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -1259,13 +1249,21 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no // tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num); #endif - SFillInfo* pFillInfo = pLocalReducer->pFillInfo; - if (pFillInfo != NULL) { - taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey); - taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf); - } - doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes); + + // no interval query, no fill operation + if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { + genFinalResWithoutFill(pRes, pLocalReducer, pQueryInfo); + } else { + SFillInfo* pFillInfo = pLocalReducer->pFillInfo; + if (pFillInfo != NULL) { + taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey); + taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf); + } + + doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes); + } + return true; } @@ -1350,7 +1348,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity); - if (rows > 0) { // do interpo + if (rows > 0) { doFillResult(pSql, pLocalReducer, true); } } @@ -1515,7 +1513,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { */ if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalReducer->resColModel->capacity)) { // does not belong to the same group - bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup); + bool notSkipped = genFinalResults(pSql, pLocalReducer, !sameGroup); // this row needs to discard, since it belongs to the group of previous if (pLocalReducer->discard && sameGroup) { @@ -1584,7 +1582,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { } if (pLocalReducer->pResultBuf->num) { - doGenerateFinalResults(pSql, pLocalReducer, true); + genFinalResults(pSql, pLocalReducer, true); } assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8d86f046ef5dcb0ae55f75683430a93b2814c683..6b75b680b10858ecb6789aba6a0f92aa61c3a38a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -171,45 +171,25 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessActivityTimer(void *handle, void *tmrId) { STscObj *pObj = (STscObj *)handle; - - if (pObj == NULL) return; - if (pObj->signature != pObj) return; - if (pObj->pTimer != tmrId) return; - - if (pObj->pHb == NULL) { - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); - if (NULL == pSql) return; - - pSql->fp = tscProcessHeartBeatRsp; - - SQueryInfo *pQueryInfo = NULL; - tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo); - pQueryInfo->command = TSDB_SQL_HB; - - pSql->cmd.command = TSDB_SQL_HB; - if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) { - taosTFree(pSql); - return; - } - - pSql->cmd.command = TSDB_SQL_HB; - pSql->param = pObj; - pSql->pTscObj = pObj; - pSql->signature = pSql; - pObj->pHb = pSql; - tscAddSubqueryInfo(&pObj->pHb->cmd); - - tscDebug("%p pHb is allocated, pObj:%p", pObj->pHb, pObj); + if (pObj == NULL || pObj->signature != pObj) { + return; } - if (tscShouldFreeHeatBeat(pObj->pHb)) { - tscDebug("%p free HB object and release connection", pObj); - tscFreeSqlObj(pObj->pHb); - tscCloseTscObj(pObj); + SSqlObj* pHB = pObj->pHb; + if (pObj->pTimer != tmrId || pHB == NULL) { return; } - tscProcessSql(pObj->pHb); + if (tscShouldFreeHeartBeat(pHB)) { + tscDebug("%p free HB object and release connection", pHB); + tscFreeSqlObj(pHB); + tscCloseTscObj(pObj); + } else { + int32_t code = tscProcessSql(pHB); + if (code != TSDB_CODE_SUCCESS) { + tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); + } + } } int tscSendMsgToServer(SSqlObj *pSql) { @@ -265,6 +245,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { return; } + pSql->pRpcCtx = NULL; // clear the rpcCtx + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", @@ -1953,6 +1935,35 @@ int tscProcessShowRsp(SSqlObj *pSql) { return 0; } +static void createHBObj(STscObj* pObj) { + if (pObj->pHb != NULL) { + return; + } + + SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + if (NULL == pSql) return; + + pSql->fp = tscProcessHeartBeatRsp; + + SQueryInfo *pQueryInfo = NULL; + tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo); + pQueryInfo->command = TSDB_SQL_HB; + + pSql->cmd.command = pQueryInfo->command; + if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) { + taosTFree(pSql); + return; + } + + pSql->param = pObj; + pSql->pTscObj = pObj; + pSql->signature = pSql; + pObj->pHb = pSql; + tscAddSubqueryInfo(&pObj->pHb->cmd); + + tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); +} + int tscProcessConnectRsp(SSqlObj *pSql) { char temp[TSDB_TABLE_FNAME_LEN * 2]; STscObj *pObj = pSql->pTscObj; @@ -1974,6 +1985,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pObj->writeAuth = pConnect->writeAuth; pObj->superAuth = pConnect->superAuth; pObj->connId = htonl(pConnect->connId); + + createHBObj(pObj); + taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 2c48c76b1ccdca0f78fa0ba627773eff9423d353..29c8aa0a561d30e2514a760a37298f088e7cbf80 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -181,6 +181,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha return NULL; } + TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t userLen, const char *pass, uint8_t passLen, const char *db, uint8_t dbLen, uint16_t port) { char ipBuf[TSDB_EP_LEN] = {0}; @@ -215,10 +216,15 @@ void taos_close(TAOS *taos) { } if (pObj->pHb != NULL) { + if (pObj->pHb->pRpcCtx != NULL) { // wait for rsp from dnode + rpcCancelRequest(pObj->pHb->pRpcCtx); + } + tscSetFreeHeatBeat(pObj); - } else { - tscCloseTscObj(pObj); + tscFreeSqlObj(pObj->pHb); } + + tscCloseTscObj(pObj); } void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index c742db42ab530f23d49db86d8221d9c4b4b4da6f..211e6737543b8b6dd14c20793d5f5a40a2239e94 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -158,6 +158,7 @@ void taos_init() { pthread_once(&tscinit, taos_init_imp); } void taos_cleanup() { if (tscCacheHandle != NULL) { taosCacheCleanup(tscCacheHandle); + tscCacheHandle = NULL; } if (tscQhandle != NULL) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 60723676df295ce38cc4f6e60fab47ccaab5f1c0..64a871ff746c157300fa2ee4bccc291354d12702 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1394,7 +1394,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) { pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; } -bool tscShouldFreeHeatBeat(SSqlObj* pHb) { +bool tscShouldFreeHeartBeat(SSqlObj* pHb) { assert(pHb == pHb->signature); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0); @@ -2141,43 +2141,6 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { } } -//void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) { -// SFieldSupInfo* pInfo = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex); -// assert(pInfo->pSqlExpr != NULL); -// -// int32_t type = pInfo->pSqlExpr->resType; -// int32_t bytes = pInfo->pSqlExpr->resBytes; -// -// char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row; -// -// if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { -// int32_t realLen = varDataLen(pData); -// assert(realLen <= bytes - VARSTR_HEADER_SIZE); -// -// if (isNull(pData, type)) { -// pRes->tsrow[columnIndex] = NULL; -// } else { -// pRes->tsrow[columnIndex] = ((tstr*)pData)->data; -// } -// -// if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor -// *(pData + realLen + VARSTR_HEADER_SIZE) = 0; -// } -// -// pRes->length[columnIndex] = realLen; -// } else { -// assert(bytes == tDataTypeDesc[type].nSize); -// -// if (isNull(pData, type)) { -// pRes->tsrow[columnIndex] = NULL; -// } else { -// pRes->tsrow[columnIndex] = pData; -// } -// -// pRes->length[columnIndex] = bytes; -// } -//} - void* malloc_throw(size_t size) { void* p = malloc(size); if (p == NULL) { diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 0be021617446e75414c40677e5f53360db1c8d15..e7f40442a0b18c07193a10db61452c820e603473 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include "tdataformat.h" +#include "tulog.h" #include "talgo.h" #include "tcoding.h" #include "wchar.h" @@ -311,10 +312,14 @@ void dataColSetOffset(SDataCol *pCol, int nEle) { SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols)); - if (pCols == NULL) return NULL; + if (pCols == NULL) { + uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCols), strerror(errno)); + return NULL; + } pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol)); if (pCols->cols == NULL) { + uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCol) * maxCols, strerror(errno)); tdFreeDataCols(pCols); return NULL; } @@ -326,6 +331,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { pCols->buf = malloc(pCols->bufSize); if (pCols->buf == NULL) { + uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCol) * maxCols, strerror(errno)); tdFreeDataCols(pCols); return NULL; } diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index c99cf87b210b9089f32e6e3a71418f8631ec8041..d66ebf9772bcf702490b5c780de3223cd1b6a833 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -211,8 +211,8 @@ static void *dnodeProcessReadQueue(void *param) { dnodeSendRpcReadRsp(pVnode, pReadMsg, code); } else { if (code == TSDB_CODE_QRY_HAS_RSP) { - dnodeSendRpcReadRsp(pVnode, pReadMsg, TSDB_CODE_SUCCESS); - } else { + dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code); + } else { // code == TSDB_CODE_NOT_READY, do not return msg to client dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); } } diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index d765eb3ad7f4177af9402cb278f24e51a7deae9e..abc4f7a02cc82eb02a6e7f3d8765038e654d4000 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -310,7 +310,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { if (error_no == 0) { printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, (et - st) / 1E6); } else { - printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(con), numOfRows, (et - st) / 1E6); + printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6); } } else { int num_rows_affacted = taos_affected_rows(pSql); diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index a01cb07a98cdff914dfb474b05c376ebca91a09d..e0e0d1aa8b1a97905ab845a8560ad735fc19b99a 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -210,6 +210,7 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num } } + // todo refactor if (tscResultsetFetchCompleted(result)) { isContinue = false; } diff --git a/src/query/inc/qFill.h b/src/query/inc/qFill.h index db6a69c2c5a98b4b6716d4b3ae3890adacb6c2a3..6b8dcb0bf9e11636cfb79298902ba74de26c9018 100644 --- a/src/query/inc/qFill.h +++ b/src/query/inc/qFill.h @@ -30,6 +30,11 @@ typedef struct { int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN union {int64_t i; double d;} fillVal; } SFillColInfo; + +typedef struct { + SSchema col; + char* tagVal; +} SFillTagColInfo; typedef struct SFillInfo { TSKEY start; // start timestamp @@ -44,7 +49,8 @@ typedef struct SFillInfo { int32_t numOfTags; // number of tags int32_t numOfCols; // number of columns, including the tags columns int32_t rowSize; // size of each row - char ** pTags; // tags value for current interpolation +// char ** pTags; // tags value for current interpolation + SFillTagColInfo* pTags; // tags value for filling gap int64_t slidingTime; // sliding value to determine the number of result for a given time window char * prevValues; // previous row of data, to generate the interpolation results char * nextValues; // next row of data diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index c208d613304017afbe58817ab2ade74c3775e47b..c8efee03cd2dcff517e3bab9b970cd4fc5e0081b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2312,13 +2312,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; - while (true) { - if (!tsdbNextDataBlock(pQueryHandle)) { - if (terrno != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, terrno); - } - break; - } + while (tsdbNextDataBlock(pQueryHandle)) { summary->totalBlocks += 1; if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { @@ -2338,6 +2332,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { continue; } + if (terrno != TSDB_CODE_SUCCESS) { // load data block failed, abort query + longjmp(pRuntimeEnv->env, terrno); + break; + } + // query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1; int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); @@ -2352,6 +2351,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } } + if (terrno != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, terrno); + } + // if the result buffer is not full, set the query complete if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { setQueryStatus(pQuery, QUERY_COMPLETED); @@ -4041,14 +4044,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; - while (true) { - if (!tsdbNextDataBlock(pQueryHandle)) { - if (terrno != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, terrno); - } - break; - } - + while (tsdbNextDataBlock(pQueryHandle)) { if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); @@ -4068,6 +4064,10 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { break; } } + + if (terrno != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, terrno); + } } static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { @@ -4092,14 +4092,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { STableQueryInfo *pTableQueryInfo = pQuery->current; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; - while (true) { - if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { - if (terrno != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, terrno); - } - break; - } - + while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo); if (QUERY_IS_ASC_QUERY(pQuery)) { @@ -4195,6 +4188,11 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { } } + // check for error + if (terrno != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, terrno); + } + return true; } @@ -4411,14 +4409,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - while (true) { - if (!tsdbNextDataBlock(pQueryHandle)) { - if (terrno != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, terrno); - } - break; - } - + while (tsdbNextDataBlock(pQueryHandle)) { summary->totalBlocks += 1; if (IS_QUERY_KILLED(pQInfo)) { @@ -4452,6 +4443,10 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pQuery->current->lastKey); } + if (terrno != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, terrno); + } + updateWindowResNumOfRes(pRuntimeEnv); int64_t et = taosGetTimestampMs(); @@ -6496,8 +6491,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co (*pRsp)->numOfRows = htonl((int32_t)pQuery->rec.rows); - int32_t code = pQInfo->code; - if (code == TSDB_CODE_SUCCESS) { + if (pQInfo->code == TSDB_CODE_SUCCESS) { (*pRsp)->offset = htobe64(pQuery->limit.offset); (*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime); } else { @@ -6506,11 +6500,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->precision = htons(pQuery->precision); - if (pQuery->rec.rows > 0 && code == TSDB_CODE_SUCCESS) { - code = doDumpQueryResult(pQInfo, (*pRsp)->data); + if (pQuery->rec.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { + doDumpQueryResult(pQInfo, (*pRsp)->data); } else { setQueryStatus(pQuery, QUERY_OVER); - code = pQInfo->code; } pQInfo->rspContext = NULL; @@ -6524,7 +6517,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co qDebug("QInfo:%p has more results waits for client retrieve", pQInfo); } - return code; + return pQInfo->code; } int32_t qQueryCompleted(qinfo_t qinfo) { diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index d29186ba49d01e1d4494075dc69f669114342b4d..d9fe67e1b77eed8a7725c3ddabdf01476bdb8cd6 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -42,19 +42,38 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ pFillInfo->slidingUnit = slidingUnit; pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); - + if (numOfTags > 0) { + pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo)); + for(int32_t i = 0; i < numOfTags; ++i) { + pFillInfo->pTags[i].col.colId = -2; + } + } + int32_t rowsize = 0; + int32_t k = 0; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t bytes = pFillInfo->pFillCol[i].col.bytes; - pFillInfo->pData[i] = calloc(1, bytes * capacity); - - rowsize += bytes; - } - - if (numOfTags > 0) { - pFillInfo->pTags = calloc(1, pFillInfo->numOfTags * POINTER_BYTES + rowsize); + SFillColInfo* pColInfo = &pFillInfo->pFillCol[i]; + pFillInfo->pData[i] = calloc(1, pColInfo->col.bytes * capacity); + + if (pColInfo->flag == TSDB_COL_TAG) { + bool exists = false; + for(int32_t j = 0; j < k; ++j) { + if (pFillInfo->pTags[j].col.colId == pColInfo->col.colId) { + exists = true; + break; + } + } + + if (!exists) { + pFillInfo->pTags[k].col.colId = pColInfo->col.colId; + pFillInfo->pTags[k].tagVal = calloc(1, pColInfo->col.bytes); + + k += 1; + } + } + rowsize += pColInfo->col.bytes; } - + pFillInfo->rowSize = rowsize; pFillInfo->capacityInRows = capacity; @@ -129,16 +148,21 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) { assert(pFillInfo->numOfRows == pInput->num); - int32_t t = 0; - + for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - - char* s = pInput->data + pCol->col.offset * pInput->num; - memcpy(pFillInfo->pData[i], s, pInput->num * pCol->col.bytes); - - if (pCol->flag == TSDB_COL_TAG && t < pFillInfo->numOfTags) { // copy the tag value - memcpy(pFillInfo->pTags[t++], pFillInfo->pData[i], pCol->col.bytes); + + char* data = pInput->data + pCol->col.offset * pInput->num; + memcpy(pFillInfo->pData[i], data, pInput->num * pCol->col.bytes); + + if (pCol->flag == TSDB_COL_TAG) { // copy the tag value to tag value buffer + for (int32_t j = 0; j < pFillInfo->numOfTags; ++j) { + SFillTagColInfo* pTag = &pFillInfo->pTags[j]; + if (pTag->col.colId == pCol->col.colId) { + memcpy(pTag->tagVal, data, pCol->col.bytes); + break; + } + } } } } @@ -224,22 +248,31 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi return 0; } -static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, int32_t start, int32_t num) { - for (int32_t j = 0, i = start; i < pColInfo->numOfCols; ++i, ++j) { - SFillColInfo* pCol = &pColInfo->pFillCol[i]; - - char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num); - assignVal(val1, pTags[j], pCol->col.bytes, pCol->col.type); +static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t num) { + for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) { + SFillColInfo* pCol = &pFillInfo->pFillCol[j]; + if (pCol->flag == TSDB_COL_NORMAL) { + continue; + } + + char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, num); + + for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) { + SFillTagColInfo* pTag = &pFillInfo->pTags[i]; + if (pTag->col.colId == pCol->col.colId) { + assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type); + break; + } + } } } -static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, - int64_t ts, char** pTags, bool outOfBound) { +static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, int64_t ts, + bool outOfBound) { char* prevValues = pFillInfo->prevValues; char* nextValues = pFillInfo->nextValues; SPoint point1, point2, point; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, *num); @@ -279,7 +312,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* } } - setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); + setTagsValue(pFillInfo, data, *num); } else if (pFillInfo->fillType == TSDB_FILL_LINEAR) { // TODO : linear interpolation supports NULL value if (prevValues != NULL && !outOfBound) { @@ -304,7 +337,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* taosDoLinearInterpolation(type, &point1, &point2, &point); } - setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); + setTagsValue(pFillInfo, data, *num); } else { for (int32_t i = 1; i < numOfValCols; ++i) { @@ -319,7 +352,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* } } - setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); + setTagsValue(pFillInfo, data, *num); } } else { /* fill the default value */ @@ -330,7 +363,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); } - setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); + setTagsValue(pFillInfo, data, *num); } pFillInfo->start += (pFillInfo->slidingTime * step); @@ -364,17 +397,14 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu char** nextValues = &pFillInfo->nextValues; int32_t numOfTags = pFillInfo->numOfTags; - char** pTags = pFillInfo->pTags; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); - if (numOfRows == 0) { /* * These data are generated according to fill strategy, since the current timestamp is out of time window of * real result set. Note that we need to keep the direct previous result rows, to generated the filled data. */ while (num < outputRows) { - doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true); + doFillResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, true); } pFillInfo->numOfTotal += pFillInfo->numOfCurrent; @@ -401,12 +431,11 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) || (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) { - doInterpoResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false); + doFillResultImpl(pFillInfo, data, &num, srcData, ts, false); } /* output buffer is full, abort */ - if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) || - (num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) { + if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) || (num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) { pFillInfo->numOfTotal += pFillInfo->numOfCurrent; return outputRows; } @@ -415,10 +444,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu initBeforeAfterDataBuf(pFillInfo, prevValues); // assign rows to dst buffer - int32_t i = 0; - for (; i < pFillInfo->numOfCols - numOfTags; ++i) { + for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - + if (pCol->flag == TSDB_COL_TAG) { + continue; + } + char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num); char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->rowIdx); @@ -440,10 +471,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu } // set the tag value for final result - setTagsValue(pFillInfo, data, pTags, pFillInfo->numOfCols - numOfTags, num); + setTagsValue(pFillInfo, data, num); pFillInfo->start += (pFillInfo->slidingTime * step); pFillInfo->rowIdx += 1; + + pFillInfo->numOfCurrent +=1; num += 1; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 13f44f20b751036629a15024c0aec997060db85c..d0c57a34d0551639c3f689d39f34e38ff88836b1 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -541,7 +541,7 @@ void rpcCancelRequest(void *handle) { if (pContext->signature != pContext) return; if (pContext->pConn) { - tDebug("%s, app trys to cancel request", pContext->pConn->info); + tDebug("%s, app tries to cancel request", pContext->pConn->info); pContext->pConn->pReqMsg = NULL; rpcCloseConn(pContext->pConn); pContext->pConn = NULL; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index fe32a0e1aad50e54518faca960eda65ea5b07e6e..c89ae0698a7a721f21fcc4315de2d32ce6e06874 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -16,7 +16,6 @@ #include "os.h" #include "tulog.h" #include "talgo.h" -#include "tutil.h" #include "tcompare.h" #include "exception.h" @@ -599,6 +598,8 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks) { // load all the comp offset value for all tables in this file + int32_t code = TSDB_CODE_SUCCESS; + *numOfBlocks = 0; size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); @@ -606,7 +607,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); pCheckInfo->numOfBlocks = 0; - tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb); + if (tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb) != TSDB_CODE_SUCCESS) { + code = terrno; + break; + } SCompIdx* compIndex = &pQueryHandle->rhelper.curCompIdx; @@ -619,7 +623,11 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo assert(compIndex->len > 0); char* t = realloc(pCheckInfo->pCompInfo, compIndex->len); - assert(t != NULL); + if (t == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + code = TSDB_CODE_TDB_OUT_OF_MEMORY; + break; + } pCheckInfo->pCompInfo = (SCompInfo*) t; pCheckInfo->compSize = compIndex->len; @@ -661,7 +669,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo (*numOfBlocks) += pCheckInfo->numOfBlocks; } - return TSDB_CODE_SUCCESS; + return code; } #define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \ @@ -672,14 +680,19 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo .uid = (_checkInfo)->tableId.uid}) -static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { +static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { STsdbRepo *pRepo = pQueryHandle->pTsdb; - bool blockLoaded = false; int64_t st = taosGetTimestampUs(); if (pCheckInfo->pDataCols == NULL) { STsdbMeta* pMeta = tsdbGetMeta(pRepo); + pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); + if (pCheckInfo->pDataCols == NULL) { + tsdbError("%p failed to malloc buf, %p", pQueryHandle, pQueryHandle->qinfo); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return terrno; + } } STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); @@ -688,16 +701,17 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema); int16_t* colIds = pQueryHandle->defaultLoadColumn->pData; + int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle))); - if (ret == TSDB_CODE_SUCCESS) { - SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; + if (ret != TSDB_CODE_SUCCESS) { + return terrno; + } - pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; - pBlockLoadInfo->slot = pQueryHandle->cur.slot; - pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid; + SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; - blockLoaded = true; - } + pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; + pBlockLoadInfo->slot = pQueryHandle->cur.slot; + pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows); @@ -709,12 +723,14 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us, %p", pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo); - return blockLoaded; + + return TSDB_CODE_SUCCESS; } -static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ +static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); + int32_t code = TSDB_CODE_SUCCESS; /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); @@ -742,10 +758,14 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* cur->mixBlock = true; cur->blockCompleted = false; - return; + return code; + } + + // return error, add test cases + if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) { + return code; } - doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { /* @@ -764,16 +784,20 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1); pCheckInfo->lastKey = cur->lastKey; } + + return code; } -static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) { +static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) { SQueryFilePos* cur = &pQueryHandle->cur; + int32_t code = TSDB_CODE_SUCCESS; if (ASCENDING_TRAVERSE(pQueryHandle->order)) { // query ended in/started from current block if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) { - if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) { - return false; + if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) { + *exists = false; + return code; } SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; @@ -789,12 +813,13 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock assert(pCheckInfo->lastKey <= pBlock->keyLast); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { // the whole block is loaded in to buffer - handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); + code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); } } else { //desc order, query ended in current block if (pQueryHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) { - if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) { - return false; + if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) { + *exists = false; + return code; } SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; @@ -807,11 +832,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock assert(pCheckInfo->lastKey >= pBlock->keyFirst); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { - handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); + code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); } } - return pQueryHandle->realNumOfRows > 0; + *exists = pQueryHandle->realNumOfRows > 0; + return code; } static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { @@ -1567,9 +1593,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex cur->fid = pQueryHandle->pFileGroup->fileId; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; - *exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo); - - return TSDB_CODE_SUCCESS; + return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo, exists); } static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) { @@ -1608,16 +1632,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists cur->blockCompleted = false; STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; - *exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo); - - return TSDB_CODE_SUCCESS; + return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists); } } else { tsdbDebug("%p continue in current data block, index:%d, %p", pQueryHandle, cur->slot, pQueryHandle->qinfo); - handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo); + int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo); *exists = pQueryHandle->realNumOfRows > 0; - return TSDB_CODE_SUCCESS; + return code; } } } @@ -1655,8 +1677,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { return false; } - /*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo); + tsdbRetrieveDataBlockInfo(pHandle, &blockInfo); /*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn); + if (terrno != TSDB_CODE_SUCCESS) { + return false; + } if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) { // data already retrieve, discard other data rows and return @@ -1714,9 +1739,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); STableCheckInfo info = { .lastKey = pSecQueryHandle->window.skey, - //.tableId = pCheckInfo->tableId, .pTableObj = pCheckInfo->pTableObj, }; + info.tableId = pCheckInfo->tableId; taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info); @@ -1726,8 +1751,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn); - bool ret = tsdbNextDataBlock((void*) pSecQueryHandle); - assert(ret); + if (!tsdbNextDataBlock((void*) pSecQueryHandle)) { + tsdbCleanupQueryHandle(pSecQueryHandle); + return false; + } tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); @@ -1770,7 +1797,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { bool exists = true; int32_t code = getDataBlocksInFiles(pQueryHandle, &exists); if (code != TSDB_CODE_SUCCESS) { - return false; + return code; } if (exists) { @@ -2048,7 +2075,9 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { return pHandle->pColumns; } else { // only load the file block SCompBlock* pBlock = pBlockInfo->compBlock; - doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot); + if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) { + return NULL; + } // todo refactor int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1); diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index fee7ed3c8b923599a75be13acc44af68329636b2..bd903c8c231b11e587662d0618a2f354ab06b382 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -190,7 +190,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext } void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) { - if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) { + if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) { return NULL; } @@ -261,7 +261,7 @@ static void incRefFn(void* ptNode) { } void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { - if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { + if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0 || pCacheObj->deleting == 1) { return NULL; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 8ca76ef22de81e4e98fde59da1f889b8e306935c..c41b24579469d93f6864727a919327f782f87dbb 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -93,8 +93,11 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle); } } else { - pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); - memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + SRetrieveTableRsp* pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + memset(pRsp, 0, sizeof(SRetrieveTableRsp)); + pRsp->completed = true; + + pRet->rsp = pRsp; *freeHandle = true; } @@ -200,18 +203,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, pReadMsg->rpcMsg.handle); - code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle); - // todo test the error code case - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_QRY_HAS_RSP; - } + // set the real rsp error code + pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle); + + // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client + code = TSDB_CODE_QRY_HAS_RSP; } else { freehandle = qQueryCompleted(*qhandle); } // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle. - // if not build result, free it not by forced. + // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle if (freehandle || (!buildRes)) { qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); } diff --git a/tests/script/general/parser/lastrow_query.sim b/tests/script/general/parser/lastrow_query.sim index 9f52e45b809e17cd3a5c8ccc960d7d19545eefd6..98eb5a8d6d924087997aea909cda69fc80735fb3 100644 --- a/tests/script/general/parser/lastrow_query.sim +++ b/tests/script/general/parser/lastrow_query.sim @@ -61,4 +61,70 @@ endi sql select count(*) from lr_db0.lr_stb0 where ts>'2018-9-18 8:00:00' and ts<'2018-9-18 14:00:00' interval(1s) fill(NULL); if $row != 21600 then return -1 -endi \ No newline at end of file +endi + +#regression test case 3 +sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 1 +if $row != 2 then + return -1 +endi + +if $data01 != 7 then + return -1 +endi + +if $data02 != 7 then + return -1 +endi + +if $data03 != 59 then + print expect 59, actual: $data03 + return -1 +endi + +if $data04 != 7 then + return -1 +endi + +if $data11 != 8 then + return -1 +endi + +if $data12 != 8 then + return -1 +endi + +if $data13 != NULL then + return -1 +endi + +sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 9 +if $rows != 18 then + return -1 +endi + +sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 12 +if $rows != 24 then + return -1 +endi + +sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 25 +if $rows != 48 then + return -1 +endi + +sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 25 offset 1 +if $rows != 46 then + return -1 +endi + +sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1s) fill(NULL) group by tbname, t1 slimit 2 soffset 0 limit 250000 offset 1 +if $rows != 172798 then + return -1 +endi + +sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1s) fill(NULL) group by tbname, t1 slimit 1 soffset 1 limit 250000 offset 1 +if $rows != 86399 then + return -1 +endi + diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim index ccd1aa9940462568d8bac59463d594ff674e82ea..aafba2d328eea518f8292c138fed60084c79bc42 100644 --- a/tests/script/general/parser/testSuite.sim +++ b/tests/script/general/parser/testSuite.sim @@ -1,27 +1,27 @@ -#sleep 2000 -#run general/parser/alter.sim -#sleep 2000 -#run general/parser/alter1.sim -#sleep 2000 -#run general/parser/alter_stable.sim -#sleep 2000 -#run general/parser/auto_create_tb.sim -#sleep 2000 -#run general/parser/auto_create_tb_drop_tb.sim -#sleep 2000 -#run general/parser/col_arithmetic_operation.sim -#sleep 2000 -#run general/parser/columnValue.sim -#sleep 2000 -#run general/parser/commit.sim -#sleep 2000 -#run general/parser/create_db.sim -#sleep 2000 -#run general/parser/create_mt.sim -#sleep 2000 -#run general/parser/create_tb.sim -#sleep 2000 -#run general/parser/dbtbnameValidate.sim +sleep 2000 +run general/parser/alter.sim +sleep 2000 +run general/parser/alter1.sim +sleep 2000 +run general/parser/alter_stable.sim +sleep 2000 +run general/parser/auto_create_tb.sim +sleep 2000 +run general/parser/auto_create_tb_drop_tb.sim +sleep 2000 +run general/parser/col_arithmetic_operation.sim +sleep 2000 +run general/parser/columnValue.sim +sleep 2000 +run general/parser/commit.sim +sleep 2000 +run general/parser/create_db.sim +sleep 2000 +run general/parser/create_mt.sim +sleep 2000 +run general/parser/create_tb.sim +sleep 2000 +run general/parser/dbtbnameValidate.sim sleep 2000 run general/parser/fill.sim sleep 2000