提交 847a0d7d 编写于 作者: H Haojun Liao

[td-225] fix bug while client does not retrieve or paritally retrieve result from vnode.

上级 b549f121
......@@ -57,6 +57,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
}
pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
......
......@@ -3003,6 +3003,7 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
output += VARSTR_HEADER_SIZE;
}
// todo : handle the binary/nchar data
tVariantDump(&pCtx->tag, output, pCtx->tag.nType);
pCtx->aOutputBuf += pCtx->outputBytes;
}
......@@ -3857,10 +3858,14 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
* param[2]: next value of specified timestamp
* param[3]: denotes if the result is a precious result or interpolation results
*
* param[1]: denote the specified timestamp to generated the interp result
* param[2]: fill policy
*
* @param pCtx
*/
static void interp_function(SQLFunctionCtx *pCtx) {
// at this point, the value is existed, return directly
#if 0
if (pCtx->param[3].i64Key == 1) {
char *pData = GET_INPUT_CHAR(pCtx);
assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType);
......@@ -3955,6 +3960,88 @@ static void interp_function(SQLFunctionCtx *pCtx) {
tVariantDestroy(&pCtx->param[2]);
// data in the check operation are all null, not output
SET_VAL(pCtx, pCtx->size, 1);
#endif
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SInterpInfoDetail* pInfo = pResInfo->interResultBuf;
if (pCtx->size == 1) {
char *pData = GET_INPUT_CHAR(pCtx);
assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType);
} else {
/*
* use interpolation to generate the result.
* Note: the result of primary timestamp column uses the timestamp specified by user in the query sql
*/
assert(pCtx->size == 2);
if (pInfo->type == TSDB_FILL_NONE) { // set no output result
return;
}
if (pInfo->primaryCol == 1) {
*(TSKEY *) pCtx->aOutputBuf = pInfo->ts;
} else {
if (pInfo->type == TSDB_FILL_NULL) {
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
} else {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
}
SET_VAL(pCtx, pCtx->size, 1);
} else if (pInfo->type == TSDB_FILL_SET_VALUE) {
tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType);
} else if (pInfo->type == TSDB_FILL_PREV) {
char *data = GET_INPUT_CHAR_INDEX(pCtx, 0);
assignVal(pCtx->aOutputBuf, data, pCtx->outputBytes, pCtx->outputType);
SET_VAL(pCtx, pCtx->size, 1);
} else if (pInfo->type == TSDB_FILL_LINEAR) {
char *data1 = GET_INPUT_CHAR_INDEX(pCtx, 0);
char *data2 = GET_INPUT_CHAR_INDEX(pCtx, 1);
TSKEY key1 = pCtx->ptsList[0];
TSKEY key2 = pCtx->ptsList[1];
SPoint point1 = {.key = key1, .val = data1};
SPoint point2 = {.key = key2, .val = data2};
SPoint point = {.key = pInfo->ts, .val = pCtx->aOutputBuf};
int32_t srcType = pCtx->inputType;
if ((srcType >= TSDB_DATA_TYPE_TINYINT && srcType <= TSDB_DATA_TYPE_BIGINT) ||
srcType == TSDB_DATA_TYPE_TIMESTAMP || srcType == TSDB_DATA_TYPE_DOUBLE) {
point1.val = data1;
point2.val = data2;
if (isNull(data1, srcType) || isNull(data2, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else {
taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point);
}
} else if (srcType == TSDB_DATA_TYPE_FLOAT) {
point1.val = data1;
point2.val = data2;
if (isNull(data1, srcType) || isNull(data2, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else {
taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point);
}
} else {
if (srcType == TSDB_DATA_TYPE_BINARY || srcType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->inputType);
} else {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
}
}
}
}
}
SET_VAL(pCtx, pCtx->size, 1);
}
......@@ -4910,7 +4997,7 @@ SQLAggFuncElem aAggs[] = {{
"interp",
TSDB_FUNC_INTERP,
TSDB_FUNC_INTERP,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS ,
function_setup,
interp_function,
do_sum_f, // todo filter handle
......@@ -4918,7 +5005,7 @@ SQLAggFuncElem aAggs[] = {{
doFinalizer,
noop1,
copy_function,
no_data_info,
data_req_load_info,
},
{
// 28
......
......@@ -220,7 +220,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
if (pSql->freed || pObj->signature != pObj) {
tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
pObj, pObj->signature);
tscFreeSqlObj(pSql);
if (pObj->pSql != pSql) { // it is not a main SqlObj, should be freed
tscFreeSqlObj(pSql);
}
rpcFreeCont(rpcMsg->pCont);
return;
}
......@@ -1867,8 +1869,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
free(pTableMeta);
tscTrace("%p recv table meta: %"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
free(pTableMeta);
return TSDB_CODE_SUCCESS;
}
......
......@@ -530,7 +530,6 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
SSqlObj *pSql = (SSqlObj *)res;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
tscTrace("%p start to free result", pSql);
......@@ -562,10 +561,6 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
return;
}
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
/*
* case 1. Partial data have been retrieved from vnodes, but not all data has been retrieved yet.
* We need to recycle the connection by noticing the vnode return 0 results.
......@@ -576,27 +571,27 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
* for each subquery. Because the failure of execution tsProcessSql may trigger the callback function
* be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport
*/
if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_SHOW || pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH) &&
(pRes->code != TSDB_CODE_QUERY_CANCELLED && ((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) ||
(pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT &&
pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)))) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscTrace("%p code:%d, numOfRows:%d, command:%d", pSql, pRes->code, pRes->numOfRows, pCmd->command);
pSql->freed = 1;
tscProcessSql(pSql);
// if ((pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_SHOW || pCmd->command == TSDB_SQL_RETRIEVE ||
// pCmd->command == TSDB_SQL_FETCH) &&
// (pRes->code != TSDB_CODE_QUERY_CANCELLED && ((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) ||
// (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT &&
// pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)))) {
// pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
//
// tscTrace("%p code:%d, numOfRows:%d, command:%d", pSql, pRes->code, pRes->numOfRows, pCmd->command);
//
// pSql->freed = 1;
// tscProcessSql(pSql);
/*
* If release connection msg is sent to vnode, the corresponding SqlObj for async query can not be freed instantly,
* since its free operation is delegated to callback function, which is tscProcessMsgFromServer.
*/
STscObj* pObj = pSql->pTscObj;
if (pObj->pSql == pSql) {
pObj->pSql = NULL;
}
} else { // if no free resource msg is sent to vnode, we free this object immediately.
// STscObj* pObj = pSql->pTscObj;
// if (pObj->pSql == pSql) {
// pObj->pSql = NULL;
// }
// } else { // if no free resource msg is sent to vnode, we free this object immediately.
STscObj* pTscObj = pSql->pTscObj;
if (pTscObj->pSql != pSql) {
......@@ -611,7 +606,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscTrace("%p sql result is freed by app", pSql);
}
}
}
// }
}
void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); }
......
......@@ -421,8 +421,9 @@ void tscFreeSqlObj(SSqlObj* pSql) {
memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
tfree(pCmd->payload);
pCmd->allocSize = 0;
tfree(pSql->sqlstr);
free(pSql);
}
......
......@@ -196,6 +196,8 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
*/
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList);
/**
* move to next block if exists
*
......
......@@ -269,9 +269,6 @@ extern struct SQLAggFuncElem aAggs[];
/* compatible check array list */
extern int32_t funcCompatDefList[];
void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, int32_t type, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull);
bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *minval, char *maxval);
bool stableQueryFunctChanged(int32_t funcId);
......
......@@ -110,8 +110,9 @@ static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInf
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* pData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, void *param, int32_t colIndex);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
......@@ -954,16 +955,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
int32_t colId = pQuery->pSelectExpr[k].base.colInfo.colId;
SDataStatis *tpField = NULL;
bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo->numOfCols, pStatis, &tpField);
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, functionId, tpField, hasNull,
&sasArray[k], colId);
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, pStatis, &sasArray[k], k);
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
......@@ -1176,16 +1169,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
int32_t colId = pQuery->pSelectExpr[k].base.colInfo.colId;
SDataStatis *pColStatis = NULL;
bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo->numOfCols, pStatis, &pColStatis);
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, functionId, pColStatis, hasNull,
&sasArray[k], colId);
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, pStatis, &sasArray[k], k);
}
// set the input column data
......@@ -1354,11 +1339,16 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
}
void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t colId) {
pCtx->hasNull = hasNull;
SDataStatis *pStatis, void *param, int32_t colIndex) {
int32_t functionId = pQuery->pSelectExpr[colIndex].base.functionId;
int32_t colId = pQuery->pSelectExpr[colIndex].base.colInfo.colId;
SDataStatis *tpField = NULL;
pCtx->hasNull = hasNullValue(pQuery, colIndex, pBlockInfo->numOfCols, pStatis, &tpField);
pCtx->aInputElemBuf = inputData;
if (pStatis != NULL) {
if (tpField != NULL) {
pCtx->preAggVals.isSet = true;
pCtx->preAggVals.statis = *pStatis;
if (pCtx->preAggVals.statis.numOfNull == -1) {
......@@ -1404,6 +1394,19 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
pCtx->preAggVals.statis.min = pBlockInfo->window.skey;
pCtx->preAggVals.statis.max = pBlockInfo->window.ekey;
}
} else if (functionId == TSDB_FUNC_INTERP) {
SInterpInfoDetail *pInterpInfo = GET_RES_INFO(pCtx)->interResultBuf;
pInterpInfo->type = pQuery->fillType;
pInterpInfo->ts = pQuery->window.skey;
pInterpInfo->primaryCol = (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
if (pQuery->fillVal != NULL) {
if (isNull((const char*) &pQuery->fillVal[colIndex], pCtx->inputType)) {
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
} else { // todo refactor, tVariantCreateFromBinary should handle the NULL value
tVariantCreateFromBinary(&pCtx->param[1], (char*) &pQuery->fillVal[colIndex], pCtx->inputBytes, pCtx->inputType);
}
}
}
#if defined(_DEBUG_VIEW)
......@@ -1635,10 +1638,11 @@ static bool isFixedOutputQuery(SQuery *pQuery) {
return false;
}
// todo refactor with isLastRowQuery
static bool isPointInterpoQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionID = pQuery->pSelectExpr[i].base.functionId;
if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) {
if (functionID == TSDB_FUNC_INTERP/* || functionID == TSDB_FUNC_LAST_ROW*/) {
return true;
}
}
......@@ -1817,7 +1821,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
/*
* todo add more parameters to check soon..
*/
bool vnodeParametersSafetyCheck(SQuery *pQuery) {
bool colIdCheck(SQuery *pQuery) {
// load data column information is incorrect
for (int32_t i = 0; i < pQuery->numOfCols - 1; ++i) {
if (pQuery->colList[i].colId == pQuery->colList[i + 1].colId) {
......@@ -1825,6 +1829,7 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) {
return false;
}
}
return true;
}
......@@ -1851,7 +1856,7 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD
static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) {
// in case of point-interpolation query, use asc order scan
char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64
"-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
......@@ -1906,7 +1911,7 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
}
} else { // interval query
if (metricQuery) {
if (stableQuery) {
if (onlyFirstQuery(pQuery)) {
if (!QUERY_IS_ASC_QUERY(pQuery)) {
qTrace(msg, GET_QINFO_ADDR(pQuery), "only-first stable", pQuery->order.order, TSDB_ORDER_ASC,
......@@ -3841,7 +3846,8 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo;
if (pQuery->fillType == TSDB_FILL_NONE) {
// todo refactor
if (pQuery->fillType == TSDB_FILL_NONE || (pQuery->fillType != TSDB_FILL_NONE && isPointInterpoQuery(pQuery))) {
assert(pFillInfo == NULL);
return false;
}
......@@ -4195,7 +4201,6 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
return true;
}
static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -4229,6 +4234,8 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
if (isFirstLastRowQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo);
} else if (isPointInterpoQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableIdGroupInfo);
} else {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
}
......@@ -4346,7 +4353,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
// pointInterpSupporterSetData(pQInfo, &interpInfo);
// pointInterpSupporterDestroy(&interpInfo);
if (pQuery->fillType != TSDB_FILL_NONE) {
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery);
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput,
pQuery->slidingTime, pQuery->fillType, pColInfo);
......@@ -5809,7 +5816,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
goto _cleanup;
}
vnodeParametersSafetyCheck(pQuery);
colIdCheck(pQuery);
qTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
return pQInfo;
......
......@@ -766,7 +766,7 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, char type) {
case TSDB_DATA_TYPE_BINARY: {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*payload = TSDB_DATA_BINARY_NULL;
setVardataNull(payload,TSDB_DATA_TYPE_BINARY);
} else {
if (pVariant->nType != TSDB_DATA_TYPE_BINARY) {
toBinary(pVariant, &payload, &pVariant->nLen);
......@@ -786,7 +786,7 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, char type) {
}
case TSDB_DATA_TYPE_NCHAR: {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*(uint32_t *) payload = TSDB_DATA_NCHAR_NULL;
setVardataNull(payload,TSDB_DATA_TYPE_NCHAR);
} else {
if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) {
toNchar(pVariant, &payload, &pVariant->nLen);
......
......@@ -101,6 +101,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
pTable->schema = tdDecodeSchema(&ptr);
}
pTable->lastKey = TSKEY_INITIAL_VAL;
return pTable;
}
......@@ -349,7 +350,7 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
table->name = calloc(1, size + VARSTR_HEADER_SIZE + 1);
STR_WITH_SIZE_TO_VARSTR(table->name, pCfg->name, size);
table->lastKey = 0;
table->lastKey = TSKEY_INITIAL_VAL;
if (IS_CREATE_STABLE(pCfg)) { // TSDB_CHILD_TABLE
table->type = TSDB_CHILD_TABLE;
table->superUid = pCfg->superUid;
......
......@@ -117,6 +117,7 @@ typedef struct STsdbQueryHandle {
} STsdbQueryHandle;
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
pBlockLoadInfo->slot = -1;
......@@ -188,9 +189,6 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
}
}
for(int32_t i = 0; i < numOfCols; ++i) {
}
uTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
......@@ -213,9 +211,9 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond*
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList);
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
pQueryHandle->order = TSDB_ORDER_ASC;
// pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded
// changeQueryHandleForLastrowQuery(pQueryHandle);
changeQueryHandleForInterpQuery(pQueryHandle);
return pQueryHandle;
}
......@@ -581,18 +579,33 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
}
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
taosArrayDestroy(sa);
} else {
pQueryHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows;
cur->win = binfo.window;
cur->mixBlock = false;
cur->blockCompleted = true;
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
/*
* no data in cache, only load data from file
* during the query processing, data in cache will not be checked anymore.
*
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer
*/
// if (pQueryHandle->outputCapacity < binfo.rows) {
// SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
// doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
//
// doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
// taosArrayDestroy(sa);
// } else {
pQueryHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows;
cur->win = binfo.window;
cur->mixBlock = false;
cur->blockCompleted = true;
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
// }
}
}
......@@ -622,15 +635,14 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
} else { //desc order, query ended in current block
if (pQueryHandle->window.ekey > pBlock->keyFirst) {
if (pQueryHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
return false;
}
SDataCols* pDataCols = pCheckInfo->pDataCols;
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos =
binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else {
cur->pos = pBlock->numOfRows - 1;
}
......@@ -1011,7 +1023,7 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
firstPos = 0;
lastPos = num - 1;
if (order == 0) {
if (order == TSDB_ORDER_DESC) {
// find the first position which is smaller than the key
while (1) {
if (key >= keyList[lastPos]) return lastPos;
......@@ -1307,12 +1319,116 @@ static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
}
// handle data in cache situation
bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0);
if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->order = TSDB_ORDER_DESC;
if (!tsdbNextDataBlock(pHandle)) {
return false;
}
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo(pHandle);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, sa);
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
// data already retrieve, discard other data rows and return
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes);
}
pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey};
pQueryHandle->window = pQueryHandle->cur.win;
pQueryHandle->cur.rows = 1;
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
return true;
} else {
STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
pSecQueryHandle->order = TSDB_ORDER_ASC;
pSecQueryHandle->window = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
pSecQueryHandle->pTsdb = pQueryHandle->pTsdb;
pSecQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pSecQueryHandle->cur.fid = -1;
pSecQueryHandle->cur.win = TSWINDOW_INITIALIZER;
pSecQueryHandle->checkFiles = true;
pSecQueryHandle->activeIndex = 0;
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb);
// allocate buffer in order to load data blocks from file
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0};
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
colInfo.info = pCol->info;
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes);
taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
pSecQueryHandle->statis[i].colId = colInfo.info.colId;
}
size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo));
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
assert(pMeta != NULL);
for (int32_t j = 0; j < si; ++j) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
STableCheckInfo info = {
.lastKey = pSecQueryHandle->window.skey,
.tableId = pCheckInfo->tableId,
.pTableObj = pCheckInfo->pTableObj,
};
taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
}
tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
bool ret = tsdbNextDataBlock((void*) pSecQueryHandle);
assert(ret);
/*SDataBlockInfo* pBlockInfo =*/ tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pSecQueryHandle, sa);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy(pCol->pData, pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows-1), pCol->info.bytes);
SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i);
assert(pCol->info.colId == pCol1->info.colId);
memcpy(pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes);
}
SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0);
pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]};
pQueryHandle->window = pQueryHandle->cur.win;
pQueryHandle->cur.rows = 2;
tsdbCleanupQueryHandle(pSecQueryHandle);
}
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
return true;
}
if (pQueryHandle->checkFiles) {
if (getDataBlocksInFiles(pQueryHandle)) {
return true;
......@@ -1322,7 +1438,6 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) {
pQueryHandle->checkFiles = false;
}
// TODO: opt by using lastKeyOnFile
// TODO: opt by consider the scan order
return doHasDataInBuffer(pQueryHandle);
}
......@@ -1336,23 +1451,22 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
// todo consider the query time window, current last_row does not apply the query time window
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
TSKEY key = 0;
TSKEY key = TSKEY_INITIAL_VAL;
int32_t index = -1;
for(int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
if (pCheckInfo->pTableObj->lastKey > key) { //todo lastKey should not be 0 by default
if (pCheckInfo->pTableObj->lastKey > key) {
key = pCheckInfo->pTableObj->lastKey;
index = i;
}
}
// todo, there are no data in all the tables. opt performance
if (index == -1) {
return;
}
// erase all other elements in array list, todo refactor
// erase all other elements in array list
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
if (i == index) {
......@@ -1371,9 +1485,7 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
}
STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, index);
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
pQueryHandle->pTableCheckInfo = taosArrayInit(1, sizeof(STableCheckInfo));
taosArrayClear(pQueryHandle->pTableCheckInfo);
info.lastKey = key;
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
......@@ -1382,6 +1494,43 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
pQueryHandle->window = (STimeWindow) {key, key};
}
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
// filter the queried time stamp in the first place
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
pQueryHandle->order = TSDB_ORDER_DESC;
assert(pQueryHandle->window.skey == pQueryHandle->window.ekey);
// starts from the buffer in case of descending timestamp order check data blocks
// todo consider the query time window, current last_row does not apply the query time window
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
int32_t i = 0;
while(i < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
if (pQueryHandle->window.skey <= pCheckInfo->pTableObj->lastKey &&
pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) {
break;
}
i++;
}
// there are no data in all the tables
if (i == numOfTables) {
return;
}
STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, i);
taosArrayClear(pQueryHandle->pTableCheckInfo);
info.lastKey = pQueryHandle->window.skey;
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
// update the query time window according to the chosen last timestamp
pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
}
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
STsdbQueryHandle* pQueryHandle) {
int numOfRows = 0;
......@@ -1469,14 +1618,15 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
// copy data from cache into data block
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
SQueryFilePos* cur = &pHandle->cur;
STable* pTable = pCheckInfo->pTableObj;
int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
// there are data in file
if (pHandle->cur.fid >= 0) {
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
STable* pTable = pBlockInfo->pTableCheckInfo->pTableObj;
SDataBlockInfo blockInfo = {
.uid = pTable->tableId.uid,
.tid = pTable->tableId.tid,
......@@ -1487,15 +1637,12 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
return blockInfo;
} else {
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
SQueryFilePos* cur = &pHandle->cur;
STable* pTable = pCheckInfo->pTableObj;
if (pTable->mem != NULL) { // create mem table iterator if it is not created yet
// TODO move to next function
if (pTable->mem != NULL && pHandle->type != TSDB_QUERY_TYPE_EXTERNAL) { // create mem table iterator if it is not created yet
assert(pCheckInfo->iter != NULL);
STimeWindow* win = &cur->win;
pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey,
cur->rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey,
pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API
// update the last key value
......@@ -1505,7 +1652,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
}
if (!ASCENDING_TRAVERSE(pHandle->order)) {
SWAP(pHandle->cur.win.skey, pHandle->cur.win.ekey, TSKEY);
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
}
SDataBlockInfo blockInfo = {
......
......@@ -117,6 +117,7 @@ $tb = $tbPrefix . 0
return -1
endi
if $data01 != NULL then
print expect NULL, actual $data01
return -1
endi
if $data02 != NULL then
......@@ -213,6 +214,7 @@ $tb = $tbPrefix . 0
return -1
endi
if $data03 != 0.00000 then
print expect 0.00000, actual:$data03
return -1
endi
# if $data04 != NULL then
......
......@@ -62,16 +62,19 @@ sleep 2000
run general/parser/tbnameIn.sim
sleep 2000
run general/parser/projection_limit_offset.sim
sleep 2000
run general/parser/limit2.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/fill.sim
sleep 2000
run general/parser/fill_stb.sim
sleep 2000
run general/parser/where.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/select_with_tags.sim
sleep 2000
run general/parser/tags_dynamically_specifiy.sim
sleep 2000
......@@ -86,8 +89,6 @@ run general/parser/stream_on_sys.sim
sleep 2000
run general/parser/stream.sim
sleep 2000
run general/parser/where.sim
sleep 2000
#run general/parser/repeatAlter.sim
sleep 2000
......@@ -97,11 +98,8 @@ run general/parser/join.sim
sleep 2000
run general/parser/join_multivnode.sim
sleep 2000
run general/parser/select_with_tags.sim
sleep 2000
run general/parser/groupby.sim
sleep 2000
run general/parser/binary_escapeCharacter.sim
sleep 2000
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册