提交 01f56ee5 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/os

...@@ -210,7 +210,7 @@ void tscTagCondRelease(STagCond* pCond); ...@@ -210,7 +210,7 @@ void tscTagCondRelease(STagCond* pCond);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
void tscSetFreeHeatBeat(STscObj* pObj); void tscSetFreeHeatBeat(STscObj* pObj);
bool tscShouldFreeHeatBeat(SSqlObj* pHb); bool tscShouldFreeHeartBeat(SSqlObj* pHb);
bool tscShouldBeFreed(SSqlObj* pSql); bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex); STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
......
...@@ -142,6 +142,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { ...@@ -142,6 +142,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
pFillCol[i].col.bytes = pExpr->resBytes; pFillCol[i].col.bytes = pExpr->resBytes;
pFillCol[i].col.type = (int8_t)pExpr->resType; pFillCol[i].col.type = (int8_t)pExpr->resType;
pFillCol[i].col.colId = pExpr->colInfo.colId;
pFillCol[i].flag = pExpr->colInfo.flag; pFillCol[i].flag = pExpr->colInfo.flag;
pFillCol[i].col.offset = offset; pFillCol[i].col.offset = offset;
pFillCol[i].functionId = pExpr->functionId; pFillCol[i].functionId = pExpr->functionId;
...@@ -379,20 +380,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -379,20 +380,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
4096, (int32_t)numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, 4096, (int32_t)numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit,
tinfo.precision, pQueryInfo->fillType, pFillCol); tinfo.precision, pQueryInfo->fillType, pFillCol);
} }
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 && pReducer->pFillInfo != NULL) {
pReducer->pFillInfo->pTags[0] = (char *)pReducer->pFillInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols;
for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1);
pReducer->pFillInfo->pTags[i] = pSchema->bytes + pReducer->pFillInfo->pTags[i - 1];
}
} else {
if (pReducer->pFillInfo != NULL) {
assert(pReducer->pFillInfo->pTags == NULL);
}
}
} }
static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage,
...@@ -835,7 +822,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource * ...@@ -835,7 +822,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
} }
} }
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) { void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
// discard following dataset in the same group and reset the interpolation information // discard following dataset in the same group and reset the interpolation information
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
...@@ -878,64 +865,66 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe ...@@ -878,64 +865,66 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe
} }
} }
/* static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, SQueryInfo* pQueryInfo) {
* Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called assert(pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE);
* by "interuptHandler" function in shell
*/
static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// no interval query, no fill operation tFilePage * pBeforeFillData = pLocalReducer->pResultBuf;
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pFinalDataPage->num;
if (pQueryInfo->limit.offset > 0) { pRes->data = pLocalReducer->pFinalRes;
if (pQueryInfo->limit.offset < pRes->numOfRows) { pRes->numOfRows = pBeforeFillData->num;
int32_t prevSize = (int32_t)pFinalDataPage->num;
tColModelErase(pLocalReducer->resColModel, pFinalDataPage, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
/* remove the hole in column model */ if (pQueryInfo->limit.offset > 0) {
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = (int32_t)pBeforeFillData->num;
tColModelErase(pLocalReducer->resColModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
pRes->numOfRows -= pQueryInfo->limit.offset; /* remove the hole in column model */
pQueryInfo->limit.offset = 0; tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
} else {
pQueryInfo->limit.offset -= pRes->numOfRows; pRes->numOfRows -= pQueryInfo->limit.offset;
pRes->numOfRows = 0; pQueryInfo->limit.offset = 0;
} } else {
pQueryInfo->limit.offset -= pRes->numOfRows;
pRes->numOfRows = 0;
} }
}
pRes->numOfRowsGroup += pRes->numOfRows; pRes->numOfRowsGroup += pRes->numOfRows;
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) { // impose the limitation of output rows on the final result
/* impose the limitation of output rows on the final result */ if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) {
int32_t prevSize = (int32_t)pFinalDataPage->num; int32_t prevSize = (int32_t)pBeforeFillData->num;
int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit); int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit);
assert(overflow < pRes->numOfRows); assert(overflow < pRes->numOfRows);
pRes->numOfRowsGroup = pQueryInfo->limit.limit; pRes->numOfRowsGroup = pQueryInfo->limit.limit;
pRes->numOfRows -= overflow; pRes->numOfRows -= overflow;
pFinalDataPage->num -= overflow; pBeforeFillData->num -= overflow;
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
/* set remain data to be discarded, and reset the interpolation information */ // set remain data to be discarded, and reset the interpolation information
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
} }
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize); memcpy(pRes->data, pBeforeFillData->data, pRes->numOfRows * pLocalReducer->finalRowSize);
pRes->numOfClauseTotal += pRes->numOfRows; pRes->numOfClauseTotal += pRes->numOfRows;
pFinalDataPage->num = 0; pBeforeFillData->num = 0;
return; }
}
/*
* Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called
* by "interuptHandler" function in shell
*/
static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
tFilePage *pBeforeFillData = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
int64_t actualETime = MAX(pQueryInfo->window.skey, pQueryInfo->window.ekey); int64_t actualETime = MAX(pQueryInfo->window.skey, pQueryInfo->window.ekey);
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput); tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
...@@ -973,7 +962,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -973,7 +962,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
break; break;
} }
/* all output for current group are completed */ // all output in current group are completed
int32_t totalRemainRows = (int32_t)getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity); int32_t totalRemainRows = (int32_t)getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity);
if (totalRemainRows <= 0) { if (totalRemainRows <= 0) {
break; break;
...@@ -983,15 +972,16 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -983,15 +972,16 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
} }
if (pRes->numOfRows > 0) { if (pRes->numOfRows > 0) {
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRows > pQueryInfo->limit.limit) { int32_t currentTotal = pRes->numOfRowsGroup + pRes->numOfRows;
int32_t overflow = (int32_t)(pRes->numOfRows - pQueryInfo->limit.limit);
pRes->numOfRows -= overflow;
pFinalDataPage->num -= overflow;
assert(pRes->numOfRows >= 0 && pFinalDataPage->num > 0); if (pQueryInfo->limit.limit >= 0 && currentTotal > pQueryInfo->limit.limit) {
int32_t overflow = (int32_t)(currentTotal - pQueryInfo->limit.limit);
pRes->numOfRows -= overflow;
assert(pRes->numOfRows >= 0);
/* set remain data to be discarded, and reset the interpolation information */ /* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo); savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pFillInfo);
} }
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
...@@ -1008,7 +998,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO ...@@ -1008,7 +998,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
pRes->numOfClauseTotal += pRes->numOfRows; pRes->numOfClauseTotal += pRes->numOfRows;
} }
pFinalDataPage->num = 0; pBeforeFillData->num = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
taosTFree(pResPages[i]); taosTFree(pResPages[i]);
} }
...@@ -1225,7 +1215,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) { ...@@ -1225,7 +1215,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
* @param noMoreCurrentGroupRes * @param noMoreCurrentGroupRes
* @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups * @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups
*/ */
bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) { bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
...@@ -1259,13 +1249,21 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no ...@@ -1259,13 +1249,21 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
// tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num); // tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num);
#endif #endif
SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
if (pFillInfo != NULL) {
taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey);
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
// no interval query, no fill operation
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
genFinalResWithoutFill(pRes, pLocalReducer, pQueryInfo);
} else {
SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
if (pFillInfo != NULL) {
taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey);
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
}
return true; return true;
} }
...@@ -1350,7 +1348,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { ...@@ -1350,7 +1348,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity); int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) {
doFillResult(pSql, pLocalReducer, true); doFillResult(pSql, pLocalReducer, true);
} }
} }
...@@ -1515,7 +1513,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1515,7 +1513,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
*/ */
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalReducer->resColModel->capacity)) { if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalReducer->resColModel->capacity)) {
// does not belong to the same group // does not belong to the same group
bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup); bool notSkipped = genFinalResults(pSql, pLocalReducer, !sameGroup);
// this row needs to discard, since it belongs to the group of previous // this row needs to discard, since it belongs to the group of previous
if (pLocalReducer->discard && sameGroup) { if (pLocalReducer->discard && sameGroup) {
...@@ -1584,7 +1582,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1584,7 +1582,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
} }
if (pLocalReducer->pResultBuf->num) { if (pLocalReducer->pResultBuf->num) {
doGenerateFinalResults(pSql, pLocalReducer, true); genFinalResults(pSql, pLocalReducer, true);
} }
assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0); assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0);
......
...@@ -171,45 +171,25 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -171,45 +171,25 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
void tscProcessActivityTimer(void *handle, void *tmrId) { void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj *pObj = (STscObj *)handle; STscObj *pObj = (STscObj *)handle;
if (pObj == NULL || pObj->signature != pObj) {
if (pObj == NULL) return; return;
if (pObj->signature != pObj) return;
if (pObj->pTimer != tmrId) return;
if (pObj->pHb == NULL) {
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (NULL == pSql) return;
pSql->fp = tscProcessHeartBeatRsp;
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
pQueryInfo->command = TSDB_SQL_HB;
pSql->cmd.command = TSDB_SQL_HB;
if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
taosTFree(pSql);
return;
}
pSql->cmd.command = TSDB_SQL_HB;
pSql->param = pObj;
pSql->pTscObj = pObj;
pSql->signature = pSql;
pObj->pHb = pSql;
tscAddSubqueryInfo(&pObj->pHb->cmd);
tscDebug("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
} }
if (tscShouldFreeHeatBeat(pObj->pHb)) { SSqlObj* pHB = pObj->pHb;
tscDebug("%p free HB object and release connection", pObj); if (pObj->pTimer != tmrId || pHB == NULL) {
tscFreeSqlObj(pObj->pHb);
tscCloseTscObj(pObj);
return; return;
} }
tscProcessSql(pObj->pHb); if (tscShouldFreeHeartBeat(pHB)) {
tscDebug("%p free HB object and release connection", pHB);
tscFreeSqlObj(pHB);
tscCloseTscObj(pObj);
} else {
int32_t code = tscProcessSql(pHB);
if (code != TSDB_CODE_SUCCESS) {
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
}
}
} }
int tscSendMsgToServer(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql) {
...@@ -265,6 +245,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -265,6 +245,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return; return;
} }
pSql->pRpcCtx = NULL; // clear the rpcCtx
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
...@@ -1953,6 +1935,35 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -1953,6 +1935,35 @@ int tscProcessShowRsp(SSqlObj *pSql) {
return 0; return 0;
} }
static void createHBObj(STscObj* pObj) {
if (pObj->pHb != NULL) {
return;
}
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (NULL == pSql) return;
pSql->fp = tscProcessHeartBeatRsp;
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
pQueryInfo->command = TSDB_SQL_HB;
pSql->cmd.command = pQueryInfo->command;
if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
taosTFree(pSql);
return;
}
pSql->param = pObj;
pSql->pTscObj = pObj;
pSql->signature = pSql;
pObj->pHb = pSql;
tscAddSubqueryInfo(&pObj->pHb->cmd);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
}
int tscProcessConnectRsp(SSqlObj *pSql) { int tscProcessConnectRsp(SSqlObj *pSql) {
char temp[TSDB_TABLE_FNAME_LEN * 2]; char temp[TSDB_TABLE_FNAME_LEN * 2];
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
...@@ -1974,6 +1985,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -1974,6 +1985,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
pObj->writeAuth = pConnect->writeAuth; pObj->writeAuth = pConnect->writeAuth;
pObj->superAuth = pConnect->superAuth; pObj->superAuth = pConnect->superAuth;
pObj->connId = htonl(pConnect->connId); pObj->connId = htonl(pConnect->connId);
createHBObj(pObj);
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return 0; return 0;
......
...@@ -181,6 +181,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha ...@@ -181,6 +181,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
return NULL; return NULL;
} }
TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t userLen, TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t userLen,
const char *pass, uint8_t passLen, const char *db, uint8_t dbLen, uint16_t port) { const char *pass, uint8_t passLen, const char *db, uint8_t dbLen, uint16_t port) {
char ipBuf[TSDB_EP_LEN] = {0}; char ipBuf[TSDB_EP_LEN] = {0};
...@@ -215,10 +216,15 @@ void taos_close(TAOS *taos) { ...@@ -215,10 +216,15 @@ void taos_close(TAOS *taos) {
} }
if (pObj->pHb != NULL) { if (pObj->pHb != NULL) {
if (pObj->pHb->pRpcCtx != NULL) { // wait for rsp from dnode
rpcCancelRequest(pObj->pHb->pRpcCtx);
}
tscSetFreeHeatBeat(pObj); tscSetFreeHeatBeat(pObj);
} else { tscFreeSqlObj(pObj->pHb);
tscCloseTscObj(pObj);
} }
tscCloseTscObj(pObj);
} }
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
......
...@@ -158,6 +158,7 @@ void taos_init() { pthread_once(&tscinit, taos_init_imp); } ...@@ -158,6 +158,7 @@ void taos_init() { pthread_once(&tscinit, taos_init_imp); }
void taos_cleanup() { void taos_cleanup() {
if (tscCacheHandle != NULL) { if (tscCacheHandle != NULL) {
taosCacheCleanup(tscCacheHandle); taosCacheCleanup(tscCacheHandle);
tscCacheHandle = NULL;
} }
if (tscQhandle != NULL) { if (tscQhandle != NULL) {
......
...@@ -1394,7 +1394,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) { ...@@ -1394,7 +1394,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) {
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
} }
bool tscShouldFreeHeatBeat(SSqlObj* pHb) { bool tscShouldFreeHeartBeat(SSqlObj* pHb) {
assert(pHb == pHb->signature); assert(pHb == pHb->signature);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0);
...@@ -2141,43 +2141,6 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -2141,43 +2141,6 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
} }
} }
//void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
// SFieldSupInfo* pInfo = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex);
// assert(pInfo->pSqlExpr != NULL);
//
// int32_t type = pInfo->pSqlExpr->resType;
// int32_t bytes = pInfo->pSqlExpr->resBytes;
//
// char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
//
// if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
// int32_t realLen = varDataLen(pData);
// assert(realLen <= bytes - VARSTR_HEADER_SIZE);
//
// if (isNull(pData, type)) {
// pRes->tsrow[columnIndex] = NULL;
// } else {
// pRes->tsrow[columnIndex] = ((tstr*)pData)->data;
// }
//
// if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
// *(pData + realLen + VARSTR_HEADER_SIZE) = 0;
// }
//
// pRes->length[columnIndex] = realLen;
// } else {
// assert(bytes == tDataTypeDesc[type].nSize);
//
// if (isNull(pData, type)) {
// pRes->tsrow[columnIndex] = NULL;
// } else {
// pRes->tsrow[columnIndex] = pData;
// }
//
// pRes->length[columnIndex] = bytes;
// }
//}
void* malloc_throw(size_t size) { void* malloc_throw(size_t size) {
void* p = malloc(size); void* p = malloc(size);
if (p == NULL) { if (p == NULL) {
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tdataformat.h" #include "tdataformat.h"
#include "tulog.h"
#include "talgo.h" #include "talgo.h"
#include "tcoding.h" #include "tcoding.h"
#include "wchar.h" #include "wchar.h"
...@@ -311,10 +312,14 @@ void dataColSetOffset(SDataCol *pCol, int nEle) { ...@@ -311,10 +312,14 @@ void dataColSetOffset(SDataCol *pCol, int nEle) {
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols)); SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
if (pCols == NULL) return NULL; if (pCols == NULL) {
uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCols), strerror(errno));
return NULL;
}
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol)); pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
if (pCols->cols == NULL) { if (pCols->cols == NULL) {
uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCol) * maxCols, strerror(errno));
tdFreeDataCols(pCols); tdFreeDataCols(pCols);
return NULL; return NULL;
} }
...@@ -326,6 +331,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { ...@@ -326,6 +331,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
pCols->buf = malloc(pCols->bufSize); pCols->buf = malloc(pCols->bufSize);
if (pCols->buf == NULL) { if (pCols->buf == NULL) {
uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCol) * maxCols, strerror(errno));
tdFreeDataCols(pCols); tdFreeDataCols(pCols);
return NULL; return NULL;
} }
......
...@@ -211,8 +211,8 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -211,8 +211,8 @@ static void *dnodeProcessReadQueue(void *param) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code); dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
} else { } else {
if (code == TSDB_CODE_QRY_HAS_RSP) { if (code == TSDB_CODE_QRY_HAS_RSP) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, TSDB_CODE_SUCCESS); dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code);
} else { } else { // code == TSDB_CODE_NOT_READY, do not return msg to client
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
} }
} }
......
...@@ -310,7 +310,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { ...@@ -310,7 +310,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
if (error_no == 0) { if (error_no == 0) {
printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, (et - st) / 1E6); printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, (et - st) / 1E6);
} else { } else {
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(con), numOfRows, (et - st) / 1E6); printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
} }
} else { } else {
int num_rows_affacted = taos_affected_rows(pSql); int num_rows_affacted = taos_affected_rows(pSql);
......
...@@ -210,6 +210,7 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num ...@@ -210,6 +210,7 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
} }
} }
// todo refactor
if (tscResultsetFetchCompleted(result)) { if (tscResultsetFetchCompleted(result)) {
isContinue = false; isContinue = false;
} }
......
...@@ -30,6 +30,11 @@ typedef struct { ...@@ -30,6 +30,11 @@ typedef struct {
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
union {int64_t i; double d;} fillVal; union {int64_t i; double d;} fillVal;
} SFillColInfo; } SFillColInfo;
typedef struct {
SSchema col;
char* tagVal;
} SFillTagColInfo;
typedef struct SFillInfo { typedef struct SFillInfo {
TSKEY start; // start timestamp TSKEY start; // start timestamp
...@@ -44,7 +49,8 @@ typedef struct SFillInfo { ...@@ -44,7 +49,8 @@ typedef struct SFillInfo {
int32_t numOfTags; // number of tags int32_t numOfTags; // number of tags
int32_t numOfCols; // number of columns, including the tags columns int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row int32_t rowSize; // size of each row
char ** pTags; // tags value for current interpolation // char ** pTags; // tags value for current interpolation
SFillTagColInfo* pTags; // tags value for filling gap
int64_t slidingTime; // sliding value to determine the number of result for a given time window int64_t slidingTime; // sliding value to determine the number of result for a given time window
char * prevValues; // previous row of data, to generate the interpolation results char * prevValues; // previous row of data, to generate the interpolation results
char * nextValues; // next row of data char * nextValues; // next row of data
......
...@@ -2312,13 +2312,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2312,13 +2312,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (true) { while (tsdbNextDataBlock(pQueryHandle)) {
if (!tsdbNextDataBlock(pQueryHandle)) {
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
break;
}
summary->totalBlocks += 1; summary->totalBlocks += 1;
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
...@@ -2338,6 +2332,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2338,6 +2332,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
continue; continue;
} }
if (terrno != TSDB_CODE_SUCCESS) { // load data block failed, abort query
longjmp(pRuntimeEnv->env, terrno);
break;
}
// query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition // query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
...@@ -2352,6 +2351,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2352,6 +2351,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
} }
} }
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
// if the result buffer is not full, set the query complete // if the result buffer is not full, set the query complete
if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
...@@ -4041,14 +4044,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4041,14 +4044,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (true) { while (tsdbNextDataBlock(pQueryHandle)) {
if (!tsdbNextDataBlock(pQueryHandle)) {
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
break;
}
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
...@@ -4068,6 +4064,10 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4068,6 +4064,10 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
break; break;
} }
} }
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
} }
static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
...@@ -4092,14 +4092,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { ...@@ -4092,14 +4092,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
STableQueryInfo *pTableQueryInfo = pQuery->current; STableQueryInfo *pTableQueryInfo = pQuery->current;
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (true) { while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
break;
}
tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo); tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo);
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
...@@ -4195,6 +4188,11 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { ...@@ -4195,6 +4188,11 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
} }
} }
// check for error
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
return true; return true;
} }
...@@ -4411,14 +4409,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { ...@@ -4411,14 +4409,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
while (true) { while (tsdbNextDataBlock(pQueryHandle)) {
if (!tsdbNextDataBlock(pQueryHandle)) {
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
break;
}
summary->totalBlocks += 1; summary->totalBlocks += 1;
if (IS_QUERY_KILLED(pQInfo)) { if (IS_QUERY_KILLED(pQInfo)) {
...@@ -4452,6 +4443,10 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { ...@@ -4452,6 +4443,10 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pQuery->current->lastKey); pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pQuery->current->lastKey);
} }
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
updateWindowResNumOfRes(pRuntimeEnv); updateWindowResNumOfRes(pRuntimeEnv);
int64_t et = taosGetTimestampMs(); int64_t et = taosGetTimestampMs();
...@@ -6496,8 +6491,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6496,8 +6491,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
(*pRsp)->numOfRows = htonl((int32_t)pQuery->rec.rows); (*pRsp)->numOfRows = htonl((int32_t)pQuery->rec.rows);
int32_t code = pQInfo->code; if (pQInfo->code == TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_SUCCESS) {
(*pRsp)->offset = htobe64(pQuery->limit.offset); (*pRsp)->offset = htobe64(pQuery->limit.offset);
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime); (*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
} else { } else {
...@@ -6506,11 +6500,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6506,11 +6500,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
} }
(*pRsp)->precision = htons(pQuery->precision); (*pRsp)->precision = htons(pQuery->precision);
if (pQuery->rec.rows > 0 && code == TSDB_CODE_SUCCESS) { if (pQuery->rec.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
code = doDumpQueryResult(pQInfo, (*pRsp)->data); doDumpQueryResult(pQInfo, (*pRsp)->data);
} else { } else {
setQueryStatus(pQuery, QUERY_OVER); setQueryStatus(pQuery, QUERY_OVER);
code = pQInfo->code;
} }
pQInfo->rspContext = NULL; pQInfo->rspContext = NULL;
...@@ -6524,7 +6517,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6524,7 +6517,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
qDebug("QInfo:%p has more results waits for client retrieve", pQInfo); qDebug("QInfo:%p has more results waits for client retrieve", pQInfo);
} }
return code; return pQInfo->code;
} }
int32_t qQueryCompleted(qinfo_t qinfo) { int32_t qQueryCompleted(qinfo_t qinfo) {
......
...@@ -42,19 +42,38 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ ...@@ -42,19 +42,38 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
pFillInfo->slidingUnit = slidingUnit; pFillInfo->slidingUnit = slidingUnit;
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
if (numOfTags > 0) {
pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo));
for(int32_t i = 0; i < numOfTags; ++i) {
pFillInfo->pTags[i].col.colId = -2;
}
}
int32_t rowsize = 0; int32_t rowsize = 0;
int32_t k = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
int32_t bytes = pFillInfo->pFillCol[i].col.bytes; SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
pFillInfo->pData[i] = calloc(1, bytes * capacity); pFillInfo->pData[i] = calloc(1, pColInfo->col.bytes * capacity);
rowsize += bytes; if (pColInfo->flag == TSDB_COL_TAG) {
} bool exists = false;
for(int32_t j = 0; j < k; ++j) {
if (numOfTags > 0) { if (pFillInfo->pTags[j].col.colId == pColInfo->col.colId) {
pFillInfo->pTags = calloc(1, pFillInfo->numOfTags * POINTER_BYTES + rowsize); exists = true;
break;
}
}
if (!exists) {
pFillInfo->pTags[k].col.colId = pColInfo->col.colId;
pFillInfo->pTags[k].tagVal = calloc(1, pColInfo->col.bytes);
k += 1;
}
}
rowsize += pColInfo->col.bytes;
} }
pFillInfo->rowSize = rowsize; pFillInfo->rowSize = rowsize;
pFillInfo->capacityInRows = capacity; pFillInfo->capacityInRows = capacity;
...@@ -129,16 +148,21 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) ...@@ -129,16 +148,21 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput)
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) { void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) {
assert(pFillInfo->numOfRows == pInput->num); assert(pFillInfo->numOfRows == pInput->num);
int32_t t = 0;
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* s = pInput->data + pCol->col.offset * pInput->num; char* data = pInput->data + pCol->col.offset * pInput->num;
memcpy(pFillInfo->pData[i], s, pInput->num * pCol->col.bytes); memcpy(pFillInfo->pData[i], data, pInput->num * pCol->col.bytes);
if (pCol->flag == TSDB_COL_TAG && t < pFillInfo->numOfTags) { // copy the tag value if (pCol->flag == TSDB_COL_TAG) { // copy the tag value to tag value buffer
memcpy(pFillInfo->pTags[t++], pFillInfo->pData[i], pCol->col.bytes); for (int32_t j = 0; j < pFillInfo->numOfTags; ++j) {
SFillTagColInfo* pTag = &pFillInfo->pTags[j];
if (pTag->col.colId == pCol->col.colId) {
memcpy(pTag->tagVal, data, pCol->col.bytes);
break;
}
}
} }
} }
} }
...@@ -224,22 +248,31 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi ...@@ -224,22 +248,31 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
return 0; return 0;
} }
static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, int32_t start, int32_t num) { static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t num) {
for (int32_t j = 0, i = start; i < pColInfo->numOfCols; ++i, ++j) { for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
SFillColInfo* pCol = &pColInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[j];
if (pCol->flag == TSDB_COL_NORMAL) {
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num); continue;
assignVal(val1, pTags[j], pCol->col.bytes, pCol->col.type); }
char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, num);
for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
SFillTagColInfo* pTag = &pFillInfo->pTags[i];
if (pTag->col.colId == pCol->col.colId) {
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
break;
}
}
} }
} }
static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, int64_t ts,
int64_t ts, char** pTags, bool outOfBound) { bool outOfBound) {
char* prevValues = pFillInfo->prevValues; char* prevValues = pFillInfo->prevValues;
char* nextValues = pFillInfo->nextValues; char* nextValues = pFillInfo->nextValues;
SPoint point1, point2, point; SPoint point1, point2, point;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, *num); char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, *num);
...@@ -279,7 +312,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* ...@@ -279,7 +312,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
} }
} }
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); setTagsValue(pFillInfo, data, *num);
} else if (pFillInfo->fillType == TSDB_FILL_LINEAR) { } else if (pFillInfo->fillType == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value // TODO : linear interpolation supports NULL value
if (prevValues != NULL && !outOfBound) { if (prevValues != NULL && !outOfBound) {
...@@ -304,7 +337,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* ...@@ -304,7 +337,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
taosDoLinearInterpolation(type, &point1, &point2, &point); taosDoLinearInterpolation(type, &point1, &point2, &point);
} }
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); setTagsValue(pFillInfo, data, *num);
} else { } else {
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
...@@ -319,7 +352,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* ...@@ -319,7 +352,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
} }
} }
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); setTagsValue(pFillInfo, data, *num);
} }
} else { /* fill the default value */ } else { /* fill the default value */
...@@ -330,7 +363,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* ...@@ -330,7 +363,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
} }
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num); setTagsValue(pFillInfo, data, *num);
} }
pFillInfo->start += (pFillInfo->slidingTime * step); pFillInfo->start += (pFillInfo->slidingTime * step);
...@@ -364,17 +397,14 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu ...@@ -364,17 +397,14 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
char** nextValues = &pFillInfo->nextValues; char** nextValues = &pFillInfo->nextValues;
int32_t numOfTags = pFillInfo->numOfTags; int32_t numOfTags = pFillInfo->numOfTags;
char** pTags = pFillInfo->pTags;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
if (numOfRows == 0) { if (numOfRows == 0) {
/* /*
* These data are generated according to fill strategy, since the current timestamp is out of time window of * These data are generated according to fill strategy, since the current timestamp is out of time window of
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data. * real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
*/ */
while (num < outputRows) { while (num < outputRows) {
doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true); doFillResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, true);
} }
pFillInfo->numOfTotal += pFillInfo->numOfCurrent; pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
...@@ -401,12 +431,11 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu ...@@ -401,12 +431,11 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) || while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) { (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
doInterpoResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false); doFillResultImpl(pFillInfo, data, &num, srcData, ts, false);
} }
/* output buffer is full, abort */ /* output buffer is full, abort */
if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) || if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) || (num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) {
(num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) {
pFillInfo->numOfTotal += pFillInfo->numOfCurrent; pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return outputRows; return outputRows;
} }
...@@ -415,10 +444,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu ...@@ -415,10 +444,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
initBeforeAfterDataBuf(pFillInfo, prevValues); initBeforeAfterDataBuf(pFillInfo, prevValues);
// assign rows to dst buffer // assign rows to dst buffer
int32_t i = 0; for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
for (; i < pFillInfo->numOfCols - numOfTags; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (pCol->flag == TSDB_COL_TAG) {
continue;
}
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num); char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->rowIdx); char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->rowIdx);
...@@ -440,10 +471,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu ...@@ -440,10 +471,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
} }
// set the tag value for final result // set the tag value for final result
setTagsValue(pFillInfo, data, pTags, pFillInfo->numOfCols - numOfTags, num); setTagsValue(pFillInfo, data, num);
pFillInfo->start += (pFillInfo->slidingTime * step); pFillInfo->start += (pFillInfo->slidingTime * step);
pFillInfo->rowIdx += 1; pFillInfo->rowIdx += 1;
pFillInfo->numOfCurrent +=1;
num += 1; num += 1;
} }
......
...@@ -541,7 +541,7 @@ void rpcCancelRequest(void *handle) { ...@@ -541,7 +541,7 @@ void rpcCancelRequest(void *handle) {
if (pContext->signature != pContext) return; if (pContext->signature != pContext) return;
if (pContext->pConn) { if (pContext->pConn) {
tDebug("%s, app trys to cancel request", pContext->pConn->info); tDebug("%s, app tries to cancel request", pContext->pConn->info);
pContext->pConn->pReqMsg = NULL; pContext->pConn->pReqMsg = NULL;
rpcCloseConn(pContext->pConn); rpcCloseConn(pContext->pConn);
pContext->pConn = NULL; pContext->pConn = NULL;
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#include "os.h" #include "os.h"
#include "tulog.h" #include "tulog.h"
#include "talgo.h" #include "talgo.h"
#include "tutil.h"
#include "tcompare.h" #include "tcompare.h"
#include "exception.h" #include "exception.h"
...@@ -599,6 +598,8 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK ...@@ -599,6 +598,8 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks) { static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks) {
// load all the comp offset value for all tables in this file // load all the comp offset value for all tables in this file
int32_t code = TSDB_CODE_SUCCESS;
*numOfBlocks = 0; *numOfBlocks = 0;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
...@@ -606,7 +607,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -606,7 +607,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
pCheckInfo->numOfBlocks = 0; pCheckInfo->numOfBlocks = 0;
tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb); if (tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb) != TSDB_CODE_SUCCESS) {
code = terrno;
break;
}
SCompIdx* compIndex = &pQueryHandle->rhelper.curCompIdx; SCompIdx* compIndex = &pQueryHandle->rhelper.curCompIdx;
...@@ -619,7 +623,11 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -619,7 +623,11 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
assert(compIndex->len > 0); assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len); char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
assert(t != NULL); if (t == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
break;
}
pCheckInfo->pCompInfo = (SCompInfo*) t; pCheckInfo->pCompInfo = (SCompInfo*) t;
pCheckInfo->compSize = compIndex->len; pCheckInfo->compSize = compIndex->len;
...@@ -661,7 +669,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -661,7 +669,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
(*numOfBlocks) += pCheckInfo->numOfBlocks; (*numOfBlocks) += pCheckInfo->numOfBlocks;
} }
return TSDB_CODE_SUCCESS; return code;
} }
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \ #define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
...@@ -672,14 +680,19 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -672,14 +680,19 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
.uid = (_checkInfo)->tableId.uid}) .uid = (_checkInfo)->tableId.uid})
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
STsdbRepo *pRepo = pQueryHandle->pTsdb; STsdbRepo *pRepo = pQueryHandle->pTsdb;
bool blockLoaded = false;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
if (pCheckInfo->pDataCols == NULL) { if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo); STsdbMeta* pMeta = tsdbGetMeta(pRepo);
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
if (pCheckInfo->pDataCols == NULL) {
tsdbError("%p failed to malloc buf, %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return terrno;
}
} }
STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj);
...@@ -688,16 +701,17 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -688,16 +701,17 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema); tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);
int16_t* colIds = pQueryHandle->defaultLoadColumn->pData; int16_t* colIds = pQueryHandle->defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle))); int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle)));
if (ret == TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; return terrno;
}
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;
blockLoaded = true; pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
} pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows); assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
...@@ -709,12 +723,14 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -709,12 +723,14 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us, %p", tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us, %p",
pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo); pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo);
return blockLoaded;
return TSDB_CODE_SUCCESS;
} }
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
int32_t code = TSDB_CODE_SUCCESS;
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order); SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
...@@ -742,10 +758,14 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -742,10 +758,14 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
cur->mixBlock = true; cur->mixBlock = true;
cur->blockCompleted = false; cur->blockCompleted = false;
return; return code;
}
// return error, add test cases
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
return code;
} }
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { } else {
/* /*
...@@ -764,16 +784,20 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* ...@@ -764,16 +784,20 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1); cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
pCheckInfo->lastKey = cur->lastKey; pCheckInfo->lastKey = cur->lastKey;
} }
return code;
} }
static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) { static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
int32_t code = TSDB_CODE_SUCCESS;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
// query ended in/started from current block // query ended in/started from current block
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) { if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) { if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
return false; *exists = false;
return code;
} }
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
...@@ -789,12 +813,13 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -789,12 +813,13 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
assert(pCheckInfo->lastKey <= pBlock->keyLast); assert(pCheckInfo->lastKey <= pBlock->keyLast);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer } else { // the whole block is loaded in to buffer
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
} }
} else { //desc order, query ended in current block } else { //desc order, query ended in current block
if (pQueryHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) { if (pQueryHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) { if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
return false; *exists = false;
return code;
} }
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
...@@ -807,11 +832,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -807,11 +832,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
assert(pCheckInfo->lastKey >= pBlock->keyFirst); assert(pCheckInfo->lastKey >= pBlock->keyFirst);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { } else {
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
} }
} }
return pQueryHandle->realNumOfRows > 0; *exists = pQueryHandle->realNumOfRows > 0;
return code;
} }
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
...@@ -1567,9 +1593,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex ...@@ -1567,9 +1593,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
cur->fid = pQueryHandle->pFileGroup->fileId; cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
*exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo); return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo, exists);
return TSDB_CODE_SUCCESS;
} }
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) { static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
...@@ -1608,16 +1632,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists ...@@ -1608,16 +1632,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
cur->blockCompleted = false; cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
*exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo); return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
return TSDB_CODE_SUCCESS;
} }
} else { } else {
tsdbDebug("%p continue in current data block, index:%d, %p", pQueryHandle, cur->slot, pQueryHandle->qinfo); tsdbDebug("%p continue in current data block, index:%d, %p", pQueryHandle, cur->slot, pQueryHandle->qinfo);
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo); int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
*exists = pQueryHandle->realNumOfRows > 0; *exists = pQueryHandle->realNumOfRows > 0;
return TSDB_CODE_SUCCESS; return code;
} }
} }
} }
...@@ -1655,8 +1677,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1655,8 +1677,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return false; return false;
} }
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo); tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn); /*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn);
if (terrno != TSDB_CODE_SUCCESS) {
return false;
}
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) { if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
// data already retrieve, discard other data rows and return // data already retrieve, discard other data rows and return
...@@ -1714,9 +1739,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1714,9 +1739,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
STableCheckInfo info = { STableCheckInfo info = {
.lastKey = pSecQueryHandle->window.skey, .lastKey = pSecQueryHandle->window.skey,
//.tableId = pCheckInfo->tableId,
.pTableObj = pCheckInfo->pTableObj, .pTableObj = pCheckInfo->pTableObj,
}; };
info.tableId = pCheckInfo->tableId; info.tableId = pCheckInfo->tableId;
taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info); taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
...@@ -1726,8 +1751,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1726,8 +1751,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn); pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
bool ret = tsdbNextDataBlock((void*) pSecQueryHandle); if (!tsdbNextDataBlock((void*) pSecQueryHandle)) {
assert(ret); tsdbCleanupQueryHandle(pSecQueryHandle);
return false;
}
tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
...@@ -1770,7 +1797,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1770,7 +1797,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
bool exists = true; bool exists = true;
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists); int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return code;
} }
if (exists) { if (exists) {
...@@ -2048,7 +2075,9 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { ...@@ -2048,7 +2075,9 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
return pHandle->pColumns; return pHandle->pColumns;
} else { // only load the file block } else { // only load the file block
SCompBlock* pBlock = pBlockInfo->compBlock; SCompBlock* pBlock = pBlockInfo->compBlock;
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot); if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) {
return NULL;
}
// todo refactor // todo refactor
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1); int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
......
...@@ -190,7 +190,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext ...@@ -190,7 +190,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
} }
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) { void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) { if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) {
return NULL; return NULL;
} }
...@@ -261,7 +261,7 @@ static void incRefFn(void* ptNode) { ...@@ -261,7 +261,7 @@ static void incRefFn(void* ptNode) {
} }
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0 || pCacheObj->deleting == 1) {
return NULL; return NULL;
} }
......
...@@ -93,8 +93,11 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, ...@@ -93,8 +93,11 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle,
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle); vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
} }
} else { } else {
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); SRetrieveTableRsp* pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp));
pRsp->completed = true;
pRet->rsp = pRsp;
*freeHandle = true; *freeHandle = true;
} }
...@@ -200,18 +203,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -200,18 +203,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
pReadMsg->rpcMsg.handle); pReadMsg->rpcMsg.handle);
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle);
// todo test the error code case // set the real rsp error code
if (code == TSDB_CODE_SUCCESS) { pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle);
code = TSDB_CODE_QRY_HAS_RSP;
} // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
code = TSDB_CODE_QRY_HAS_RSP;
} else { } else {
freehandle = qQueryCompleted(*qhandle); freehandle = qQueryCompleted(*qhandle);
} }
// NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle. // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
// if not build result, free it not by forced. // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
if (freehandle || (!buildRes)) { if (freehandle || (!buildRes)) {
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
} }
......
...@@ -61,4 +61,70 @@ endi ...@@ -61,4 +61,70 @@ endi
sql select count(*) from lr_db0.lr_stb0 where ts>'2018-9-18 8:00:00' and ts<'2018-9-18 14:00:00' interval(1s) fill(NULL); sql select count(*) from lr_db0.lr_stb0 where ts>'2018-9-18 8:00:00' and ts<'2018-9-18 14:00:00' interval(1s) fill(NULL);
if $row != 21600 then if $row != 21600 then
return -1 return -1
endi endi
\ No newline at end of file
#regression test case 3
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 1
if $row != 2 then
return -1
endi
if $data01 != 7 then
return -1
endi
if $data02 != 7 then
return -1
endi
if $data03 != 59 then
print expect 59, actual: $data03
return -1
endi
if $data04 != 7 then
return -1
endi
if $data11 != 8 then
return -1
endi
if $data12 != 8 then
return -1
endi
if $data13 != NULL then
return -1
endi
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 9
if $rows != 18 then
return -1
endi
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 12
if $rows != 24 then
return -1
endi
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 25
if $rows != 48 then
return -1
endi
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 25 offset 1
if $rows != 46 then
return -1
endi
sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1s) fill(NULL) group by tbname, t1 slimit 2 soffset 0 limit 250000 offset 1
if $rows != 172798 then
return -1
endi
sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1s) fill(NULL) group by tbname, t1 slimit 1 soffset 1 limit 250000 offset 1
if $rows != 86399 then
return -1
endi
#sleep 2000 sleep 2000
#run general/parser/alter.sim run general/parser/alter.sim
#sleep 2000 sleep 2000
#run general/parser/alter1.sim run general/parser/alter1.sim
#sleep 2000 sleep 2000
#run general/parser/alter_stable.sim run general/parser/alter_stable.sim
#sleep 2000 sleep 2000
#run general/parser/auto_create_tb.sim run general/parser/auto_create_tb.sim
#sleep 2000 sleep 2000
#run general/parser/auto_create_tb_drop_tb.sim run general/parser/auto_create_tb_drop_tb.sim
#sleep 2000 sleep 2000
#run general/parser/col_arithmetic_operation.sim run general/parser/col_arithmetic_operation.sim
#sleep 2000 sleep 2000
#run general/parser/columnValue.sim run general/parser/columnValue.sim
#sleep 2000 sleep 2000
#run general/parser/commit.sim run general/parser/commit.sim
#sleep 2000 sleep 2000
#run general/parser/create_db.sim run general/parser/create_db.sim
#sleep 2000 sleep 2000
#run general/parser/create_mt.sim run general/parser/create_mt.sim
#sleep 2000 sleep 2000
#run general/parser/create_tb.sim run general/parser/create_tb.sim
#sleep 2000 sleep 2000
#run general/parser/dbtbnameValidate.sim run general/parser/dbtbnameValidate.sim
sleep 2000 sleep 2000
run general/parser/fill.sim run general/parser/fill.sim
sleep 2000 sleep 2000
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册