未验证 提交 d1d0ff47 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3069 from taosdata/feature/query

Feature/query
......@@ -210,7 +210,7 @@ void tscTagCondRelease(STagCond* pCond);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
void tscSetFreeHeatBeat(STscObj* pObj);
bool tscShouldFreeHeatBeat(SSqlObj* pHb);
bool tscShouldFreeHeartBeat(SSqlObj* pHb);
bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
......
......@@ -142,6 +142,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
pFillCol[i].col.bytes = pExpr->resBytes;
pFillCol[i].col.type = (int8_t)pExpr->resType;
pFillCol[i].col.colId = pExpr->colInfo.colId;
pFillCol[i].flag = pExpr->colInfo.flag;
pFillCol[i].col.offset = offset;
pFillCol[i].functionId = pExpr->functionId;
......@@ -379,20 +380,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
4096, (int32_t)numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit,
tinfo.precision, pQueryInfo->fillType, pFillCol);
}
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 && pReducer->pFillInfo != NULL) {
pReducer->pFillInfo->pTags[0] = (char *)pReducer->pFillInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols;
for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1);
pReducer->pFillInfo->pTags[i] = pSchema->bytes + pReducer->pFillInfo->pTags[i - 1];
}
} else {
if (pReducer->pFillInfo != NULL) {
assert(pReducer->pFillInfo->pTags == NULL);
}
}
}
static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage,
......@@ -835,7 +822,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
}
}
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
// discard following dataset in the same group and reset the interpolation information
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -878,64 +865,66 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe
}
}
/*
* Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called
* by "interuptHandler" function in shell
*/
static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, SQueryInfo* pQueryInfo) {
assert(pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE);
// no interval query, no fill operation
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pFinalDataPage->num;
tFilePage * pBeforeFillData = pLocalReducer->pResultBuf;
if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = (int32_t)pFinalDataPage->num;
tColModelErase(pLocalReducer->resColModel, pFinalDataPage, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pBeforeFillData->num;
/* remove the hole in column model */
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = (int32_t)pBeforeFillData->num;
tColModelErase(pLocalReducer->resColModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
pRes->numOfRows -= pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0;
} else {
pQueryInfo->limit.offset -= pRes->numOfRows;
pRes->numOfRows = 0;
}
/* remove the hole in column model */
tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
pRes->numOfRows -= pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0;
} else {
pQueryInfo->limit.offset -= pRes->numOfRows;
pRes->numOfRows = 0;
}
}
pRes->numOfRowsGroup += pRes->numOfRows;
pRes->numOfRowsGroup += pRes->numOfRows;
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) {
/* impose the limitation of output rows on the final result */
int32_t prevSize = (int32_t)pFinalDataPage->num;
int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit);
assert(overflow < pRes->numOfRows);
// impose the limitation of output rows on the final result
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) {
int32_t prevSize = (int32_t)pBeforeFillData->num;
int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit);
assert(overflow < pRes->numOfRows);
pRes->numOfRowsGroup = pQueryInfo->limit.limit;
pRes->numOfRows -= overflow;
pFinalDataPage->num -= overflow;
pRes->numOfRowsGroup = pQueryInfo->limit.limit;
pRes->numOfRows -= overflow;
pBeforeFillData->num -= overflow;
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
/* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
}
// set remain data to be discarded, and reset the interpolation information
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
}
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize);
memcpy(pRes->data, pBeforeFillData->data, pRes->numOfRows * pLocalReducer->finalRowSize);
pRes->numOfClauseTotal += pRes->numOfRows;
pFinalDataPage->num = 0;
return;
}
pRes->numOfClauseTotal += pRes->numOfRows;
pBeforeFillData->num = 0;
}
/*
* Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called
* by "interuptHandler" function in shell
*/
static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
tFilePage *pBeforeFillData = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
int64_t actualETime = MAX(pQueryInfo->window.skey, pQueryInfo->window.ekey);
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
......@@ -973,7 +962,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
break;
}
/* all output for current group are completed */
// all output in current group are completed
int32_t totalRemainRows = (int32_t)getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity);
if (totalRemainRows <= 0) {
break;
......@@ -983,15 +972,16 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
}
if (pRes->numOfRows > 0) {
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRows > pQueryInfo->limit.limit) {
int32_t overflow = (int32_t)(pRes->numOfRows - pQueryInfo->limit.limit);
pRes->numOfRows -= overflow;
pFinalDataPage->num -= overflow;
int32_t currentTotal = pRes->numOfRowsGroup + pRes->numOfRows;
assert(pRes->numOfRows >= 0 && pFinalDataPage->num > 0);
if (pQueryInfo->limit.limit >= 0 && currentTotal > pQueryInfo->limit.limit) {
int32_t overflow = (int32_t)(currentTotal - pQueryInfo->limit.limit);
pRes->numOfRows -= overflow;
assert(pRes->numOfRows >= 0);
/* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo);
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pFillInfo);
}
if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
......@@ -1008,7 +998,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
pRes->numOfClauseTotal += pRes->numOfRows;
}
pFinalDataPage->num = 0;
pBeforeFillData->num = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
taosTFree(pResPages[i]);
}
......@@ -1225,7 +1215,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
* @param noMoreCurrentGroupRes
* @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups
*/
bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) {
bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -1259,13 +1249,21 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
// tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num);
#endif
SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
if (pFillInfo != NULL) {
taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey);
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
// no interval query, no fill operation
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
genFinalResWithoutFill(pRes, pLocalReducer, pQueryInfo);
} else {
SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
if (pFillInfo != NULL) {
taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey);
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
}
return true;
}
......@@ -1350,7 +1348,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
if (rows > 0) {
doFillResult(pSql, pLocalReducer, true);
}
}
......@@ -1515,7 +1513,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
*/
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalReducer->resColModel->capacity)) {
// does not belong to the same group
bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup);
bool notSkipped = genFinalResults(pSql, pLocalReducer, !sameGroup);
// this row needs to discard, since it belongs to the group of previous
if (pLocalReducer->discard && sameGroup) {
......@@ -1584,7 +1582,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
}
if (pLocalReducer->pResultBuf->num) {
doGenerateFinalResults(pSql, pLocalReducer, true);
genFinalResults(pSql, pLocalReducer, true);
}
assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0);
......
......@@ -171,45 +171,25 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
void tscProcessActivityTimer(void *handle, void *tmrId) {
STscObj *pObj = (STscObj *)handle;
if (pObj == NULL) return;
if (pObj->signature != pObj) return;
if (pObj->pTimer != tmrId) return;
if (pObj->pHb == NULL) {
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (NULL == pSql) return;
pSql->fp = tscProcessHeartBeatRsp;
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
pQueryInfo->command = TSDB_SQL_HB;
pSql->cmd.command = TSDB_SQL_HB;
if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
taosTFree(pSql);
return;
}
pSql->cmd.command = TSDB_SQL_HB;
pSql->param = pObj;
pSql->pTscObj = pObj;
pSql->signature = pSql;
pObj->pHb = pSql;
tscAddSubqueryInfo(&pObj->pHb->cmd);
tscDebug("%p pHb is allocated, pObj:%p", pObj->pHb, pObj);
if (pObj == NULL || pObj->signature != pObj) {
return;
}
if (tscShouldFreeHeatBeat(pObj->pHb)) {
tscDebug("%p free HB object and release connection", pObj);
tscFreeSqlObj(pObj->pHb);
tscCloseTscObj(pObj);
SSqlObj* pHB = pObj->pHb;
if (pObj->pTimer != tmrId || pHB == NULL) {
return;
}
tscProcessSql(pObj->pHb);
if (tscShouldFreeHeartBeat(pHB)) {
tscDebug("%p free HB object and release connection", pHB);
tscFreeSqlObj(pHB);
tscCloseTscObj(pObj);
} else {
int32_t code = tscProcessSql(pHB);
if (code != TSDB_CODE_SUCCESS) {
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
}
}
}
int tscSendMsgToServer(SSqlObj *pSql) {
......@@ -265,6 +245,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return;
}
pSql->pRpcCtx = NULL; // clear the rpcCtx
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
......@@ -1953,6 +1935,35 @@ int tscProcessShowRsp(SSqlObj *pSql) {
return 0;
}
static void createHBObj(STscObj* pObj) {
if (pObj->pHb != NULL) {
return;
}
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (NULL == pSql) return;
pSql->fp = tscProcessHeartBeatRsp;
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo);
pQueryInfo->command = TSDB_SQL_HB;
pSql->cmd.command = pQueryInfo->command;
if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) {
taosTFree(pSql);
return;
}
pSql->param = pObj;
pSql->pTscObj = pObj;
pSql->signature = pSql;
pObj->pHb = pSql;
tscAddSubqueryInfo(&pObj->pHb->cmd);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
}
int tscProcessConnectRsp(SSqlObj *pSql) {
char temp[TSDB_TABLE_FNAME_LEN * 2];
STscObj *pObj = pSql->pTscObj;
......@@ -1974,6 +1985,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
pObj->writeAuth = pConnect->writeAuth;
pObj->superAuth = pConnect->superAuth;
pObj->connId = htonl(pConnect->connId);
createHBObj(pObj);
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return 0;
......
......@@ -181,6 +181,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
return NULL;
}
TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t userLen,
const char *pass, uint8_t passLen, const char *db, uint8_t dbLen, uint16_t port) {
char ipBuf[TSDB_EP_LEN] = {0};
......@@ -215,10 +216,15 @@ void taos_close(TAOS *taos) {
}
if (pObj->pHb != NULL) {
if (pObj->pHb->pRpcCtx != NULL) { // wait for rsp from dnode
rpcCancelRequest(pObj->pHb->pRpcCtx);
}
tscSetFreeHeatBeat(pObj);
} else {
tscCloseTscObj(pObj);
tscFreeSqlObj(pObj->pHb);
}
tscCloseTscObj(pObj);
}
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
......
......@@ -158,6 +158,7 @@ void taos_init() { pthread_once(&tscinit, taos_init_imp); }
void taos_cleanup() {
if (tscCacheHandle != NULL) {
taosCacheCleanup(tscCacheHandle);
tscCacheHandle = NULL;
}
if (tscQhandle != NULL) {
......
......@@ -1394,7 +1394,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) {
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
}
bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
bool tscShouldFreeHeartBeat(SSqlObj* pHb) {
assert(pHb == pHb->signature);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0);
......@@ -2141,43 +2141,6 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
}
}
//void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
// SFieldSupInfo* pInfo = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex);
// assert(pInfo->pSqlExpr != NULL);
//
// int32_t type = pInfo->pSqlExpr->resType;
// int32_t bytes = pInfo->pSqlExpr->resBytes;
//
// char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
//
// if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
// int32_t realLen = varDataLen(pData);
// assert(realLen <= bytes - VARSTR_HEADER_SIZE);
//
// if (isNull(pData, type)) {
// pRes->tsrow[columnIndex] = NULL;
// } else {
// pRes->tsrow[columnIndex] = ((tstr*)pData)->data;
// }
//
// if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
// *(pData + realLen + VARSTR_HEADER_SIZE) = 0;
// }
//
// pRes->length[columnIndex] = realLen;
// } else {
// assert(bytes == tDataTypeDesc[type].nSize);
//
// if (isNull(pData, type)) {
// pRes->tsrow[columnIndex] = NULL;
// } else {
// pRes->tsrow[columnIndex] = pData;
// }
//
// pRes->length[columnIndex] = bytes;
// }
//}
void* malloc_throw(size_t size) {
void* p = malloc(size);
if (p == NULL) {
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdataformat.h"
#include "tulog.h"
#include "talgo.h"
#include "tcoding.h"
#include "wchar.h"
......@@ -311,10 +312,14 @@ void dataColSetOffset(SDataCol *pCol, int nEle) {
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
if (pCols == NULL) return NULL;
if (pCols == NULL) {
uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCols), strerror(errno));
return NULL;
}
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
if (pCols->cols == NULL) {
uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCol) * maxCols, strerror(errno));
tdFreeDataCols(pCols);
return NULL;
}
......@@ -326,6 +331,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
pCols->buf = malloc(pCols->bufSize);
if (pCols->buf == NULL) {
uDebug("malloc failure, size:%"PRId64" failed, reason:%s", sizeof(SDataCol) * maxCols, strerror(errno));
tdFreeDataCols(pCols);
return NULL;
}
......
......@@ -211,8 +211,8 @@ static void *dnodeProcessReadQueue(void *param) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
} else {
if (code == TSDB_CODE_QRY_HAS_RSP) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, TSDB_CODE_SUCCESS);
} else {
dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code);
} else { // code == TSDB_CODE_NOT_READY, do not return msg to client
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
}
}
......
......@@ -310,7 +310,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
if (error_no == 0) {
printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, (et - st) / 1E6);
} else {
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(con), numOfRows, (et - st) / 1E6);
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
}
} else {
int num_rows_affacted = taos_affected_rows(pSql);
......
......@@ -210,6 +210,7 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
}
}
// todo refactor
if (tscResultsetFetchCompleted(result)) {
isContinue = false;
}
......
......@@ -30,6 +30,11 @@ typedef struct {
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
union {int64_t i; double d;} fillVal;
} SFillColInfo;
typedef struct {
SSchema col;
char* tagVal;
} SFillTagColInfo;
typedef struct SFillInfo {
TSKEY start; // start timestamp
......@@ -44,7 +49,8 @@ typedef struct SFillInfo {
int32_t numOfTags; // number of tags
int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row
char ** pTags; // tags value for current interpolation
// char ** pTags; // tags value for current interpolation
SFillTagColInfo* pTags; // tags value for filling gap
int64_t slidingTime; // sliding value to determine the number of result for a given time window
char * prevValues; // previous row of data, to generate the interpolation results
char * nextValues; // next row of data
......
......@@ -2312,13 +2312,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (true) {
if (!tsdbNextDataBlock(pQueryHandle)) {
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
break;
}
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
......@@ -2338,6 +2332,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
continue;
}
if (terrno != TSDB_CODE_SUCCESS) { // load data block failed, abort query
longjmp(pRuntimeEnv->env, terrno);
break;
}
// query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
......@@ -2352,6 +2351,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
// if the result buffer is not full, set the query complete
if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
......@@ -4041,14 +4044,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (true) {
if (!tsdbNextDataBlock(pQueryHandle)) {
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
break;
}
while (tsdbNextDataBlock(pQueryHandle)) {
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
......@@ -4068,6 +4064,10 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
break;
}
}
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
}
static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
......@@ -4092,14 +4092,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
STableQueryInfo *pTableQueryInfo = pQuery->current;
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (true) {
if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
break;
}
while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo);
if (QUERY_IS_ASC_QUERY(pQuery)) {
......@@ -4195,6 +4188,11 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
}
}
// check for error
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
return true;
}
......@@ -4411,14 +4409,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
while (true) {
if (!tsdbNextDataBlock(pQueryHandle)) {
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
break;
}
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (IS_QUERY_KILLED(pQInfo)) {
......@@ -4452,6 +4443,10 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pQuery->current->lastKey);
}
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
updateWindowResNumOfRes(pRuntimeEnv);
int64_t et = taosGetTimestampMs();
......@@ -6496,8 +6491,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
(*pRsp)->numOfRows = htonl((int32_t)pQuery->rec.rows);
int32_t code = pQInfo->code;
if (code == TSDB_CODE_SUCCESS) {
if (pQInfo->code == TSDB_CODE_SUCCESS) {
(*pRsp)->offset = htobe64(pQuery->limit.offset);
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
} else {
......@@ -6506,11 +6500,10 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
}
(*pRsp)->precision = htons(pQuery->precision);
if (pQuery->rec.rows > 0 && code == TSDB_CODE_SUCCESS) {
code = doDumpQueryResult(pQInfo, (*pRsp)->data);
if (pQuery->rec.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data);
} else {
setQueryStatus(pQuery, QUERY_OVER);
code = pQInfo->code;
}
pQInfo->rspContext = NULL;
......@@ -6524,7 +6517,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
qDebug("QInfo:%p has more results waits for client retrieve", pQInfo);
}
return code;
return pQInfo->code;
}
int32_t qQueryCompleted(qinfo_t qinfo) {
......
......@@ -42,19 +42,38 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
pFillInfo->slidingUnit = slidingUnit;
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
if (numOfTags > 0) {
pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo));
for(int32_t i = 0; i < numOfTags; ++i) {
pFillInfo->pTags[i].col.colId = -2;
}
}
int32_t rowsize = 0;
int32_t k = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t bytes = pFillInfo->pFillCol[i].col.bytes;
pFillInfo->pData[i] = calloc(1, bytes * capacity);
rowsize += bytes;
}
if (numOfTags > 0) {
pFillInfo->pTags = calloc(1, pFillInfo->numOfTags * POINTER_BYTES + rowsize);
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
pFillInfo->pData[i] = calloc(1, pColInfo->col.bytes * capacity);
if (pColInfo->flag == TSDB_COL_TAG) {
bool exists = false;
for(int32_t j = 0; j < k; ++j) {
if (pFillInfo->pTags[j].col.colId == pColInfo->col.colId) {
exists = true;
break;
}
}
if (!exists) {
pFillInfo->pTags[k].col.colId = pColInfo->col.colId;
pFillInfo->pTags[k].tagVal = calloc(1, pColInfo->col.bytes);
k += 1;
}
}
rowsize += pColInfo->col.bytes;
}
pFillInfo->rowSize = rowsize;
pFillInfo->capacityInRows = capacity;
......@@ -129,16 +148,21 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput)
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) {
assert(pFillInfo->numOfRows == pInput->num);
int32_t t = 0;
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* s = pInput->data + pCol->col.offset * pInput->num;
memcpy(pFillInfo->pData[i], s, pInput->num * pCol->col.bytes);
if (pCol->flag == TSDB_COL_TAG && t < pFillInfo->numOfTags) { // copy the tag value
memcpy(pFillInfo->pTags[t++], pFillInfo->pData[i], pCol->col.bytes);
char* data = pInput->data + pCol->col.offset * pInput->num;
memcpy(pFillInfo->pData[i], data, pInput->num * pCol->col.bytes);
if (pCol->flag == TSDB_COL_TAG) { // copy the tag value to tag value buffer
for (int32_t j = 0; j < pFillInfo->numOfTags; ++j) {
SFillTagColInfo* pTag = &pFillInfo->pTags[j];
if (pTag->col.colId == pCol->col.colId) {
memcpy(pTag->tagVal, data, pCol->col.bytes);
break;
}
}
}
}
}
......@@ -224,22 +248,31 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
return 0;
}
static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, int32_t start, int32_t num) {
for (int32_t j = 0, i = start; i < pColInfo->numOfCols; ++i, ++j) {
SFillColInfo* pCol = &pColInfo->pFillCol[i];
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
assignVal(val1, pTags[j], pCol->col.bytes, pCol->col.type);
static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t num) {
for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
SFillColInfo* pCol = &pFillInfo->pFillCol[j];
if (pCol->flag == TSDB_COL_NORMAL) {
continue;
}
char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, num);
for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
SFillTagColInfo* pTag = &pFillInfo->pTags[i];
if (pTag->col.colId == pCol->col.colId) {
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
break;
}
}
}
}
static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData,
int64_t ts, char** pTags, bool outOfBound) {
static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData, int64_t ts,
bool outOfBound) {
char* prevValues = pFillInfo->prevValues;
char* nextValues = pFillInfo->nextValues;
SPoint point1, point2, point;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, *num);
......@@ -279,7 +312,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
}
}
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
setTagsValue(pFillInfo, data, *num);
} else if (pFillInfo->fillType == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value
if (prevValues != NULL && !outOfBound) {
......@@ -304,7 +337,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
taosDoLinearInterpolation(type, &point1, &point2, &point);
}
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
setTagsValue(pFillInfo, data, *num);
} else {
for (int32_t i = 1; i < numOfValCols; ++i) {
......@@ -319,7 +352,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
}
}
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
setTagsValue(pFillInfo, data, *num);
}
} else { /* fill the default value */
......@@ -330,7 +363,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
}
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
setTagsValue(pFillInfo, data, *num);
}
pFillInfo->start += (pFillInfo->slidingTime * step);
......@@ -364,17 +397,14 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
char** nextValues = &pFillInfo->nextValues;
int32_t numOfTags = pFillInfo->numOfTags;
char** pTags = pFillInfo->pTags;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
if (numOfRows == 0) {
/*
* These data are generated according to fill strategy, since the current timestamp is out of time window of
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
*/
while (num < outputRows) {
doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true);
doFillResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, true);
}
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
......@@ -401,12 +431,11 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
doInterpoResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false);
doFillResultImpl(pFillInfo, data, &num, srcData, ts, false);
}
/* output buffer is full, abort */
if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) ||
(num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) {
if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) || (num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) {
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return outputRows;
}
......@@ -415,10 +444,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
initBeforeAfterDataBuf(pFillInfo, prevValues);
// assign rows to dst buffer
int32_t i = 0;
for (; i < pFillInfo->numOfCols - numOfTags; ++i) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (pCol->flag == TSDB_COL_TAG) {
continue;
}
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->rowIdx);
......@@ -440,10 +471,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
}
// set the tag value for final result
setTagsValue(pFillInfo, data, pTags, pFillInfo->numOfCols - numOfTags, num);
setTagsValue(pFillInfo, data, num);
pFillInfo->start += (pFillInfo->slidingTime * step);
pFillInfo->rowIdx += 1;
pFillInfo->numOfCurrent +=1;
num += 1;
}
......
......@@ -541,7 +541,7 @@ void rpcCancelRequest(void *handle) {
if (pContext->signature != pContext) return;
if (pContext->pConn) {
tDebug("%s, app trys to cancel request", pContext->pConn->info);
tDebug("%s, app tries to cancel request", pContext->pConn->info);
pContext->pConn->pReqMsg = NULL;
rpcCloseConn(pContext->pConn);
pContext->pConn = NULL;
......
......@@ -16,7 +16,6 @@
#include "os.h"
#include "tulog.h"
#include "talgo.h"
#include "tutil.h"
#include "tcompare.h"
#include "exception.h"
......@@ -599,6 +598,8 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks) {
// load all the comp offset value for all tables in this file
int32_t code = TSDB_CODE_SUCCESS;
*numOfBlocks = 0;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
......@@ -606,7 +607,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
pCheckInfo->numOfBlocks = 0;
tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb);
if (tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb) != TSDB_CODE_SUCCESS) {
code = terrno;
break;
}
SCompIdx* compIndex = &pQueryHandle->rhelper.curCompIdx;
......@@ -619,7 +623,11 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
assert(t != NULL);
if (t == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
break;
}
pCheckInfo->pCompInfo = (SCompInfo*) t;
pCheckInfo->compSize = compIndex->len;
......@@ -661,7 +669,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
(*numOfBlocks) += pCheckInfo->numOfBlocks;
}
return TSDB_CODE_SUCCESS;
return code;
}
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
......@@ -672,14 +680,19 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
.uid = (_checkInfo)->tableId.uid})
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
STsdbRepo *pRepo = pQueryHandle->pTsdb;
bool blockLoaded = false;
int64_t st = taosGetTimestampUs();
if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
if (pCheckInfo->pDataCols == NULL) {
tsdbError("%p failed to malloc buf, %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return terrno;
}
}
STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj);
......@@ -688,16 +701,17 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);
int16_t* colIds = pQueryHandle->defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle)));
if (ret == TSDB_CODE_SUCCESS) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
if (ret != TSDB_CODE_SUCCESS) {
return terrno;
}
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
blockLoaded = true;
}
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
......@@ -709,12 +723,14 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us, %p",
pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo);
return blockLoaded;
return TSDB_CODE_SUCCESS;
}
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
int32_t code = TSDB_CODE_SUCCESS;
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
......@@ -742,10 +758,14 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
cur->mixBlock = true;
cur->blockCompleted = false;
return;
return code;
}
// return error, add test cases
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
return code;
}
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
/*
......@@ -764,16 +784,20 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
pCheckInfo->lastKey = cur->lastKey;
}
return code;
}
static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
SQueryFilePos* cur = &pQueryHandle->cur;
int32_t code = TSDB_CODE_SUCCESS;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
// query ended in/started from current block
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) {
return false;
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
*exists = false;
return code;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
......@@ -789,12 +813,13 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
assert(pCheckInfo->lastKey <= pBlock->keyLast);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
} else { //desc order, query ended in current block
if (pQueryHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) {
return false;
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
*exists = false;
return code;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
......@@ -807,11 +832,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
}
return pQueryHandle->realNumOfRows > 0;
*exists = pQueryHandle->realNumOfRows > 0;
return code;
}
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
......@@ -1567,9 +1593,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
*exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
return TSDB_CODE_SUCCESS;
return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo, exists);
}
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
......@@ -1608,16 +1632,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
*exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);
return TSDB_CODE_SUCCESS;
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
}
} else {
tsdbDebug("%p continue in current data block, index:%d, %p", pQueryHandle, cur->slot, pQueryHandle->qinfo);
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
*exists = pQueryHandle->realNumOfRows > 0;
return TSDB_CODE_SUCCESS;
return code;
}
}
}
......@@ -1655,8 +1677,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return false;
}
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
tsdbRetrieveDataBlockInfo(pHandle, &blockInfo);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn);
if (terrno != TSDB_CODE_SUCCESS) {
return false;
}
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
// data already retrieve, discard other data rows and return
......@@ -1714,9 +1739,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
STableCheckInfo info = {
.lastKey = pSecQueryHandle->window.skey,
//.tableId = pCheckInfo->tableId,
.pTableObj = pCheckInfo->pTableObj,
};
info.tableId = pCheckInfo->tableId;
taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
......@@ -1726,8 +1751,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
bool ret = tsdbNextDataBlock((void*) pSecQueryHandle);
assert(ret);
if (!tsdbNextDataBlock((void*) pSecQueryHandle)) {
tsdbCleanupQueryHandle(pSecQueryHandle);
return false;
}
tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
......@@ -1770,7 +1797,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
bool exists = true;
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
if (code != TSDB_CODE_SUCCESS) {
return false;
return code;
}
if (exists) {
......@@ -2048,7 +2075,9 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
return pHandle->pColumns;
} else { // only load the file block
SCompBlock* pBlock = pBlockInfo->compBlock;
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot);
if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) {
return NULL;
}
// todo refactor
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
......
......@@ -190,7 +190,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
}
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) {
if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) {
return NULL;
}
......@@ -261,7 +261,7 @@ static void incRefFn(void* ptNode) {
}
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0 || pCacheObj->deleting == 1) {
return NULL;
}
......
......@@ -93,8 +93,11 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle,
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
}
} else {
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
pRsp->completed = true;
pRet->rsp = pRsp;
*freeHandle = true;
}
......@@ -200,18 +203,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
pReadMsg->rpcMsg.handle);
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle);
// todo test the error code case
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_HAS_RSP;
}
// set the real rsp error code
pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle);
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
code = TSDB_CODE_QRY_HAS_RSP;
} else {
freehandle = qQueryCompleted(*qhandle);
}
// NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
// if not build result, free it not by forced.
// If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
if (freehandle || (!buildRes)) {
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
}
......
......@@ -61,4 +61,70 @@ endi
sql select count(*) from lr_db0.lr_stb0 where ts>'2018-9-18 8:00:00' and ts<'2018-9-18 14:00:00' interval(1s) fill(NULL);
if $row != 21600 then
return -1
endi
\ No newline at end of file
endi
#regression test case 3
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 1
if $row != 2 then
return -1
endi
if $data01 != 7 then
return -1
endi
if $data02 != 7 then
return -1
endi
if $data03 != 59 then
print expect 59, actual: $data03
return -1
endi
if $data04 != 7 then
return -1
endi
if $data11 != 8 then
return -1
endi
if $data12 != 8 then
return -1
endi
if $data13 != NULL then
return -1
endi
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 9
if $rows != 18 then
return -1
endi
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 12
if $rows != 24 then
return -1
endi
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 25
if $rows != 48 then
return -1
endi
sql select t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1h) fill(NULL) group by t1 limit 25 offset 1
if $rows != 46 then
return -1
endi
sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1s) fill(NULL) group by tbname, t1 slimit 2 soffset 0 limit 250000 offset 1
if $rows != 172798 then
return -1
endi
sql select t1,t1,count(*),tbname,t1,t1,tbname from lr_stb0 where ts>'2018-09-24 00:00:00.000' and ts<'2018-09-25 00:00:00.000' interval(1s) fill(NULL) group by tbname, t1 slimit 1 soffset 1 limit 250000 offset 1
if $rows != 86399 then
return -1
endi
#sleep 2000
#run general/parser/alter.sim
#sleep 2000
#run general/parser/alter1.sim
#sleep 2000
#run general/parser/alter_stable.sim
#sleep 2000
#run general/parser/auto_create_tb.sim
#sleep 2000
#run general/parser/auto_create_tb_drop_tb.sim
#sleep 2000
#run general/parser/col_arithmetic_operation.sim
#sleep 2000
#run general/parser/columnValue.sim
#sleep 2000
#run general/parser/commit.sim
#sleep 2000
#run general/parser/create_db.sim
#sleep 2000
#run general/parser/create_mt.sim
#sleep 2000
#run general/parser/create_tb.sim
#sleep 2000
#run general/parser/dbtbnameValidate.sim
sleep 2000
run general/parser/alter.sim
sleep 2000
run general/parser/alter1.sim
sleep 2000
run general/parser/alter_stable.sim
sleep 2000
run general/parser/auto_create_tb.sim
sleep 2000
run general/parser/auto_create_tb_drop_tb.sim
sleep 2000
run general/parser/col_arithmetic_operation.sim
sleep 2000
run general/parser/columnValue.sim
sleep 2000
run general/parser/commit.sim
sleep 2000
run general/parser/create_db.sim
sleep 2000
run general/parser/create_mt.sim
sleep 2000
run general/parser/create_tb.sim
sleep 2000
run general/parser/dbtbnameValidate.sim
sleep 2000
run general/parser/fill.sim
sleep 2000
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册