未验证 提交 fe25b330 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2695 from taosdata/feature/query

Feature/query
......@@ -263,9 +263,11 @@ int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtIpListFromCfg(const char *first, const char *second);
void* malloc_throw(size_t size);
......
......@@ -422,7 +422,6 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
int32_t bytes = pInfo->pSqlExpr->resBytes;
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
int32_t realLen = varDataLen(pData);
assert(realLen <= bytes - VARSTR_HEADER_SIZE);
......
......@@ -169,7 +169,11 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
} else {
tscProcessSql(pSql);
}
}
/*
......@@ -474,33 +478,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
// 2. vnode may need the schema information along with submit block to update its local table schema.
if (pCmd->command == TSDB_SQL_INSERT) {
tscDebug("%p redo parse sql string to build submit block", pSql);
pCmd->parseFinished = false;
tscResetSqlCmdObj(pCmd);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
*/
pSql->fp = pSql->fetchFp; // restore the fp
tscHandleInsertRetry(pSql);
} else if (pCmd->command == TSDB_SQL_SELECT) { // in case of other query type, continue
if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) {
tscDebug("%p redo parse sql string and proceed", pSql);
//tscDebug("before %p fp:%p, fetchFp:%p", pSql, pSql->fp, pSql->fetchFp);
pCmd->parseFinished = false;
tscResetSqlCmdObj(pCmd);
//tscDebug("after %p fp:%p, fetchFp:%p", pSql, pSql->fp, pSql->fetchFp);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......@@ -509,8 +491,17 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
tscProcessSql(pSql);
} else { // in all other cases, simple retry
if (pCmd->command == TSDB_SQL_INSERT) {
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
*/
pSql->fp = pSql->fetchFp; // restore the fp
tscHandleInsertRetry(pSql);
} else if (pCmd->command == TSDB_SQL_SELECT) { // in case of other query type, continue
tscProcessSql(pSql);
}
}else { // in all other cases, simple retry
tscProcessSql(pSql);
}
......
......@@ -1481,7 +1481,7 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx) {
// todo opt for null block
static void first_function(SQLFunctionCtx *pCtx) {
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) {
return;
}
......@@ -1550,28 +1550,17 @@ static void first_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t in
* to decide if the value is earlier than current intermediate result
*/
static void first_dist_function(SQLFunctionCtx *pCtx) {
assert(pCtx->size > 0);
if (pCtx->size == 0) {
return;
}
/*
* do not to check data in the following cases:
* 1. data block that are not loaded
* 2. scan data files in desc order
*/
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) {
return;
}
int32_t notNullElems = 0;
// data block is discard, not loaded, do not need to check it
if (!pCtx->preAggVals.dataBlockLoaded) {
return;
}
// find the first not null value
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
......@@ -1655,7 +1644,7 @@ static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) {
* least one data in this block that is not null.(TODO opt for this case)
*/
static void last_function(SQLFunctionCtx *pCtx) {
if (pCtx->order != pCtx->param[0].i64Key) {
if (pCtx->order != pCtx->param[0].i64Key || pCtx->preAggVals.dataBlockLoaded == false) {
return;
}
......@@ -2303,8 +2292,9 @@ static void top_func_second_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) {
int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->outputType;
do_top_function_add(pOutput, pCtx->param[0].i64Key, &pInput->res[i]->v.i64Key, pInput->res[i]->timestamp,
pCtx->outputType, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
}
SET_VAL(pCtx, pInput->num, pOutput->num);
......
......@@ -485,7 +485,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
case TSDB_SQL_SELECT: {
assert(pCmd->numOfClause == 1);
const char* msg1 = "columns in select clause not identical";
for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) {
......@@ -496,16 +495,19 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
assert(pCmd->numOfClause == pInfo->subclauseInfo.numOfClause);
for (int32_t i = 0; i < pInfo->subclauseInfo.numOfClause; ++i) {
for (int32_t i = pCmd->clauseIndex; i < pInfo->subclauseInfo.numOfClause; ++i) {
SQuerySQL* pQuerySql = pInfo->subclauseInfo.pClause[i];
tscTrace("%p start to parse %dth subclause, total:%d", pSql, i, pInfo->subclauseInfo.numOfClause);
if ((code = doCheckForQuery(pSql, pQuerySql, i)) != TSDB_CODE_SUCCESS) {
return code;
}
tscPrintSelectClause(pSql, i);
pCmd->clauseIndex += 1;
}
// restore the clause index
pCmd->clauseIndex = 0;
// set the command/global limit parameters from the first subclause to the sqlcmd object
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pCmd, 0);
pCmd->command = pQueryInfo1->command;
......@@ -1385,6 +1387,11 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
return numOfTotalColumns;
}
static void tscInsertPrimaryTSSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &tsCol);
}
int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExprItem* pItem) {
const char* msg0 = "invalid column name";
const char* msg1 = "tag for normal table query is not allowed";
......@@ -1427,6 +1434,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
addProjectQueryCol(pQueryInfo, startPos, &index, pItem);
}
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
} else {
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -1499,8 +1508,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
switch (optr) {
case TK_COUNT: {
if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) {
/* more than one parameter for count() function */
if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -1551,11 +1560,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
} else { // count(*) is equalled to count(primary_timestamp_key)
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false);
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName));
getColumnName(pItem, pExpr->aliasName, sizeof(pExpr->aliasName) - 1);
......@@ -1570,9 +1580,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
// the time stamp may be always needed
if (index.tableIndex > 0 && index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &tsCol);
if (index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
}
return TSDB_CODE_SUCCESS;
......@@ -1682,10 +1691,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i]));
}
}
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &tsCol);
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
return TSDB_CODE_SUCCESS;
}
case TK_FIRST:
......@@ -5862,6 +5869,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
}
assert(pCmd->clauseIndex == index);
// too many result columns not support order by in query
if (pQuerySql->pSelection->nExpr > TSDB_MAX_COLUMNS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8);
......@@ -5975,12 +5984,11 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000;
}
} else { // set the time rang
pQueryInfo->window.skey = TSKEY_INITIAL_VAL;
pQueryInfo->window.ekey = INT64_MAX;
pQueryInfo->window = TSWINDOW_INITIALIZER;
}
// user does not specified the query time window, twa is not allowed in such case.
if ((pQueryInfo->window.skey == 0 || pQueryInfo->window.ekey == INT64_MAX ||
if ((pQueryInfo->window.skey == INT64_MIN || pQueryInfo->window.ekey == INT64_MAX ||
(pQueryInfo->window.ekey == INT64_MAX / 1000 && tinfo.precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
......
......@@ -475,7 +475,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pRetrieveMsg->free = htons(pQueryInfo->type);
// todo valid the vgroupId at the client side
......
......@@ -424,7 +424,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
// current data set are exhausted, fetch more data from node
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql)) &&
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
......
......@@ -848,13 +848,14 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// TODO put to async res?
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(numOfRows == taos_errno(pSql));
pParentSql->res.code = numOfRows;
tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows));
tscQueueAsyncRes(pParentSql);
return;
}
if (numOfRows >= 0) {
......@@ -941,31 +942,22 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
SSqlRes *pRes = &pSub->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vgroupIndex < pTableMetaInfo->vgroupList->numOfVgroups &&
// (!tscHasReachLimitation(pQueryInfo, pRes)) && !pRes->completed) {
// numOfFetch++;
// }
// } else {
if (!tscHasReachLimitation(pQueryInfo, pRes)) {
if (pRes->row >= pRes->numOfRows) {
hasData = false;
if (!pRes->completed) {
numOfFetch++;
}
}
} else { // has reach the limitation, no data anymore
if (pRes->row >= pRes->numOfRows) {
hasData = false;
break;
if (!tscHasReachLimitation(pQueryInfo, pRes)) {
if (pRes->row >= pRes->numOfRows) {
hasData = false;
if (!pRes->completed) {
numOfFetch++;
}
}
} else { // has reach the limitation, no data anymore
if (pRes->row >= pRes->numOfRows) {
hasData = false;
break;
}
}
// }
}
// has data remains in client side, and continue to return data to app
if (hasData) {
......@@ -1026,7 +1018,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
tscDebug("%p all subquery response, retrieve data", pSql);
tscDebug("%p all subquery response, retrieve data for subclause:%d", pSql, pCmd->clauseIndex);
// the column transfer support struct has been built
if (pRes->pColumnIndex != NULL) {
......@@ -1195,8 +1187,11 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
pNew->cmd.numOfCols = 0;
pNewQueryInfo->intervalTime = 0;
memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));
pSupporter->limit = pNewQueryInfo->limit;
pNewQueryInfo->limit.limit = -1;
pNewQueryInfo->limit.offset = 0;
// backup the data and clear it in the sqlcmd object
pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
......@@ -1307,7 +1302,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
break;
}
}
pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_TABLE_JOIN_RETRIEVE;
return TSDB_CODE_SUCCESS;
......@@ -1982,88 +1977,119 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pSupportInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL);
*bytes = pInfo->pSqlExpr->resBytes;
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows;
return pData;
}
static void doBuildResFromSubqueries(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
int32_t numOfRes = INT32_MAX;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) {
continue;
}
numOfRes = MIN(numOfRes, pSql->pSubs[i]->res.numOfRows);
}
int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList);
pRes->pRsp = realloc(pRes->pRsp, numOfRes * totalSize);
pRes->data = pRes->pRsp;
char* data = pRes->data;
int16_t bytes = 0;
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t i = 0; i < numOfExprs; ++i) {
SColumnIndex* pIndex = &pRes->pColumnIndex[i];
SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd;
char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes);
memcpy(data, pData, bytes * numOfRes);
data += bytes * numOfRes;
pRes1->row = numOfRes;
}
pRes->numOfRows = numOfRes;
pRes->numOfClauseTotal += numOfRes;
}
void tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlRes* pRes = &pSql->res;
if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
return;
}
while (1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
if (pRes->tsrow == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
if (pRes->tsrow == NULL) {
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
pRes->length = calloc(numOfExprs, sizeof(int32_t));
}
bool success = false;
int32_t numOfTableHasRes = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] != NULL) {
numOfTableHasRes++;
}
}
if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0], false) != NULL) && (doSetResultRowData(pSql->pSubs[1], false) != NULL);
} else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) {
pSub = pSql->pSubs[1];
}
success = (doSetResultRowData(pSub, false) != NULL);
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
pRes->buffer = calloc(numOfExprs, POINTER_BYTES);
pRes->length = calloc(numOfExprs, sizeof(int32_t));
tscRestoreSQLFuncForSTableQuery(pQueryInfo);
}
while (1) {
if (pRes->row < pRes->numOfRows) {
assert(0);
}
if (success) { // current row of final output has been built, return to app
for (int32_t i = 0; i < numOfExprs; ++i) {
SColumnIndex* pIndex = &pRes->pColumnIndex[i];
SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
pRes->tsrow[i] = pRes1->tsrow[pIndex->columnIndex];
pRes->length[i] = pRes1->length[pIndex->columnIndex];
}
pRes->numOfClauseTotal++;
break;
} else { // continue retrieve data from vnode
if (!tscHasRemainDataInSubqueryResultSet(pSql)) {
tscDebug("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
SSubqueryState *pState = NULL;
// free all sub sqlobj
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj *pChildObj = pSql->pSubs[i];
if (pChildObj == NULL) {
continue;
}
SJoinSupporter *pSupporter = (SJoinSupporter *)pChildObj->param;
pState = pSupporter->pState;
tscDestroyJoinSupporter(pChildObj->param);
taos_free_result(pChildObj);
}
free(pState);
pRes->completed = true; // set query completed
sem_post(&pSql->rspSem);
return;
}
tscFetchDatablockFromSubquery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) {
return;
}
doBuildResFromSubqueries(pSql);
sem_post(&pSql->rspSem);
return;
// continue retrieve data from vnode
// if (!tscHasRemainDataInSubqueryResultSet(pSql)) {
// tscDebug("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
// SSubqueryState* pState = NULL;
//
// // free all sub sqlobj
// for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
// SSqlObj* pChildObj = pSql->pSubs[i];
// if (pChildObj == NULL) {
// continue;
// }
//
// SJoinSupporter* pSupporter = (SJoinSupporter*)pChildObj->param;
// pState = pSupporter->pState;
//
// tscDestroyJoinSupporter(pChildObj->param);
// taos_free_result(pChildObj);
// }
//
// free(pState);
//
// pRes->completed = true; // set query completed
// sem_post(&pSql->rspSem);
// return;
// }
tscFetchDatablockFromSubquery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) {
return;
}
}
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
} else {
tscQueueAsyncRes(pSql);
}
......@@ -2117,14 +2143,6 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if(pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
if (pRes->completed) {
tfree(pRes->tsrow);
}
return pRes->tsrow;
}
if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker
tfree(pRes->tsrow);
return pRes->tsrow;
......@@ -2182,7 +2200,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
return pRes->tsrow;
}
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd;
......
......@@ -1994,6 +1994,10 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
}
bool hasMoreClauseToTry(SSqlObj* pSql) {
return pSql->cmd.clauseIndex < pSql->cmd.numOfClause - 1;
}
void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
......@@ -2050,7 +2054,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
}
}
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
......@@ -2070,17 +2074,13 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
tfree(pSql->pSubs);
pSql->numOfSubs = 0;
if (pSql->fp != NULL) {
pSql->fp = queryFp;
assert(queryFp != NULL);
}
pSql->fp = fp;
tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
if (pCmd->command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);
} else {
tscProcessSql(pSql);
tscDoQuery(pSql);
}
}
......
......@@ -121,7 +121,6 @@ typedef struct SQueryCostInfo {
uint32_t loadBlockStatis;
uint32_t discardBlocks;
uint64_t elapsedTime;
uint64_t ioTime;
uint64_t computTime;
} SQueryCostInfo;
......@@ -201,7 +200,7 @@ typedef struct SQInfo {
*/
int32_t tableIndex;
int32_t numOfGroupResultPages;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
} SQInfo;
......
......@@ -23,7 +23,7 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, con
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type);
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, int32_t numOfCols);
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo);
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num);
......@@ -32,14 +32,29 @@ int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot);
static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
return &pWindowResInfo->pResult[slot];
}
#define curTimeWindow(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize);
int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, size_t interBufSize);
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult) {
assert(pResult != NULL && pRuntimeEnv != NULL);
SQuery *pQuery = pRuntimeEnv->pQuery;
tFilePage *page = GET_RES_BUF_PAGE_BY_ID(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery);
char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult);
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
pQuery->pSelectExpr[columnIndex].bytes * realRowId;
}
__filter_func_t *getRangeFilterFuncArray(int32_t type);
__filter_func_t *getValueFilterFuncArray(int32_t type);
......
......@@ -22,26 +22,22 @@ extern "C" {
#include "os.h"
#include "qextbuffer.h"
#include "hash.h"
typedef struct SIDList {
uint32_t alloc;
int32_t size;
int32_t* pData;
} SIDList;
typedef struct SArray* SIDList;
typedef struct SDiskbasedResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
int32_t fd; // data file fd
int32_t allocateId; // allocated page id
int32_t incStep; // minimum allocated pages
char* pBuf; // mmap buffer pointer
char* path; // file path
int32_t numOfRowsPerPage;
int32_t numOfPages;
int64_t totalBufSize;
int32_t fd; // data file fd
int32_t allocateId; // allocated page id
int32_t incStep; // minimum allocated pages
char* pBuf; // mmap buffer pointer
char* path; // file path
uint32_t numOfAllocGroupIds; // number of allocated id list
void* idsTable; // id hash table
SIDList* list; // for each id, there is a page id list
SHashObj* idsTable; // id hash table
SIDList list; // for each id, there is a page id list
} SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (8192L*5)
......@@ -112,7 +108,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle);
* @param pList
* @return
*/
int32_t getLastPageId(SIDList *pList);
int32_t getLastPageId(SIDList pList);
#ifdef __cplusplus
}
......
......@@ -132,13 +132,10 @@ typedef struct SQLPreAggVal {
typedef struct SInterpInfoDetail {
TSKEY ts; // interp specified timestamp
int8_t hasResult;
int8_t type;
int8_t primaryCol;
} SInterpInfoDetail;
typedef struct SInterpInfo { SInterpInfoDetail *pInterpDetail; } SInterpInfo;
typedef struct SResultInfo {
int8_t hasResult; // result generated, not NULL value
bool initialized; // output buffer has been initialized
......@@ -146,7 +143,7 @@ typedef struct SResultInfo {
bool superTableQ; // is super table query
int32_t numOfRes; // num of output result in current buffer
int32_t bufLen; // buffer size
void * interResultBuf; // output result buffer
void* interResultBuf; // output result buffer
} SResultInfo;
struct SQLFunctionCtx;
......
此差异已折叠。
......@@ -53,9 +53,9 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
if (pWindowResInfo->pResult == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
SPosInfo posInfo = {-1, -1};
int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo, pRuntimeEnv->interBufSize);
int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -64,16 +64,15 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
return TSDB_CODE_SUCCESS;
}
void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
void destroyTimeWindowRes(SWindowResult *pWindowRes) {
if (pWindowRes == NULL) {
return;
}
free(pWindowRes->resultInfo[0].interResultBuf);
free(pWindowRes->resultInfo);
}
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) {
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) {
if (pWindowResInfo == NULL) {
return;
}
......@@ -84,8 +83,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) {
if (pWindowResInfo->pResult != NULL) {
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
destroyTimeWindowRes(pResult, numOfCols);
destroyTimeWindowRes(&pWindowResInfo->pResult[i]);
}
}
......@@ -225,11 +223,6 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
}
}
SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
return &pWindowResInfo->pResult[slot];
}
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
return (getWindowResult(pWindowResInfo, slot)->status.closed == true);
}
......
......@@ -1064,10 +1064,9 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
if (*e == TS_PATH_DELIMITER[0]) {
cond = e + 1;
} else if (*e == ',') {
size_t len = e - cond + VARSTR_HEADER_SIZE;
char* p = exception_malloc(len);
varDataSetLen(p, len - VARSTR_HEADER_SIZE);
memcpy(varDataVal(p), cond, len);
size_t len = e - cond;
char* p = exception_malloc(len + VARSTR_HEADER_SIZE);
STR_WITH_SIZE_TO_VARSTR(p, cond, len);
cond += len;
taosArrayPush(pVal->arr, &p);
}
......
......@@ -2,7 +2,6 @@
#include "hash.h"
#include "qextbuffer.h"
#include "taoserror.h"
#include "tsqlfunction.h"
#include "queryLog.h"
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) {
......@@ -20,35 +19,31 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
// init id hash table
pResBuf->idsTable = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->list = calloc(size, sizeof(SIDList));
pResBuf->numOfAllocGroupIds = size;
pResBuf->list = taosArrayInit(size, POINTER_BYTES);
char path[4096] = {0};
getTmpfilePath("tsdb_q_buf", path);
getTmpfilePath("tsdb_qbuf", path);
pResBuf->path = strdup(path);
pResBuf->fd = open(pResBuf->path, O_CREAT | O_RDWR, 0666);
memset(path, 0, tListLen(path));
if (!FD_VALID(pResBuf->fd)) {
qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno));
return TSDB_CODE_QRY_NO_DISKSPACE;
return TAOS_SYSTEM_ERROR(errno);
}
int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE);
if (ret != TSDB_CODE_SUCCESS) {
qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno));
return TSDB_CODE_QRY_NO_DISKSPACE;
return TAOS_SYSTEM_ERROR(errno);
}
pResBuf->pBuf = mmap(NULL, pResBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResBuf->fd, 0);
if (pResBuf->pBuf == MAP_FAILED) {
qError("QInfo:%p failed to map temp file: %s. %s", handle, pResBuf->path, strerror(errno));
return TSDB_CODE_QRY_OUT_OF_MEMORY; // todo change error code
return TAOS_SYSTEM_ERROR(errno);
}
qDebug("QInfo:%p create tmp file for output result, %s, %" PRId64 "bytes", handle, pResBuf->path,
qDebug("QInfo:%p create tmp file for output result:%s, %" PRId64 "bytes", handle, pResBuf->path,
pResBuf->totalBufSize);
return TSDB_CODE_SUCCESS;
......@@ -86,11 +81,11 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf
return TSDB_CODE_SUCCESS;
}
static bool noMoreAvailablePages(SDiskbasedResultBuf* pResultBuf) {
static FORCE_INLINE bool noMoreAvailablePages(SDiskbasedResultBuf* pResultBuf) {
return (pResultBuf->allocateId == pResultBuf->numOfPages - 1);
}
static int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL);
char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
......@@ -99,51 +94,20 @@ static int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
}
int32_t slot = GET_INT32_VAL(p);
assert(slot >= 0 && slot < pResultBuf->numOfAllocGroupIds);
assert(slot >= 0 && slot < taosHashGetSize(pResultBuf->idsTable));
return slot;
}
static int32_t addNewGroupId(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot
if (pResultBuf->numOfAllocGroupIds <= num) {
size_t n = pResultBuf->numOfAllocGroupIds << 1u;
SIDList* p = (SIDList*)realloc(pResultBuf->list, sizeof(SIDList) * n);
assert(p != NULL);
memset(&p[pResultBuf->numOfAllocGroupIds], 0, sizeof(SIDList) * pResultBuf->numOfAllocGroupIds);
pResultBuf->list = p;
pResultBuf->numOfAllocGroupIds = n;
}
taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t));
return num;
}
static int32_t doRegisterId(SIDList* pList, int32_t id) {
if (pList->size >= pList->alloc) {
int32_t s = 0;
if (pList->alloc == 0) {
s = 4;
assert(pList->pData == NULL);
} else {
s = pList->alloc << 1u;
}
int32_t* c = realloc(pList->pData, s * sizeof(int32_t));
assert(c);
memset(&c[pList->alloc], 0, sizeof(int32_t) * pList->alloc);
pList->pData = c;
pList->alloc = s;
}
SArray* pa = taosArrayInit(1, sizeof(int32_t));
taosArrayPush(pResultBuf->list, &pa);
pList->pData[pList->size++] = id;
return 0;
assert(taosArrayGetSize(pResultBuf->list) == taosHashGetSize(pResultBuf->idsTable));
return num;
}
static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
......@@ -152,8 +116,8 @@ static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int
slot = addNewGroupId(pResultBuf, groupId);
}
SIDList* pList = &pResultBuf->list[slot];
doRegisterId(pList, pageId);
SIDList pList = taosArrayGetP(pResultBuf->list, slot);
taosArrayPush(pList, &pageId);
}
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
......@@ -178,12 +142,11 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
SIDList list = {0};
int32_t slot = getGroupIndex(pResultBuf, groupId);
if (slot < 0) {
return list;
return taosArrayInit(1, sizeof(int32_t));
} else {
return pResultBuf->list[slot];
return taosArrayGetP(pResultBuf->list, slot);
}
}
......@@ -202,22 +165,20 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
tfree(pResultBuf->path);
for (int32_t i = 0; i < pResultBuf->numOfAllocGroupIds; ++i) {
SIDList* pList = &pResultBuf->list[i];
tfree(pList->pData);
size_t size = taosArrayGetSize(pResultBuf->list);
for (int32_t i = 0; i < size; ++i) {
SArray* pa = taosArrayGetP(pResultBuf->list, i);
taosArrayDestroy(pa);
}
tfree(pResultBuf->list);
taosArrayDestroy(pResultBuf->list);
taosHashCleanup(pResultBuf->idsTable);
tfree(pResultBuf);
}
int32_t getLastPageId(SIDList *pList) {
if (pList == NULL || pList->size <= 0) {
return -1;
}
return pList->pData[pList->size - 1];
int32_t getLastPageId(SIDList pList) {
size_t size = taosArrayGetSize(pList);
return *(int32_t*) taosArrayGet(pList, size - 1);
}
......@@ -374,7 +374,7 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
return taosWriteMsg(pFdObj->fd, data, len);
}
static void taosReportBrokenLink(SFdObj *pFdObj) {
......
......@@ -90,6 +90,12 @@ typedef struct SBlockOrderSupporter {
int32_t* numOfBlocksPerTable;
} SBlockOrderSupporter;
typedef struct SIOCostSummary {
int64_t blockLoadTime;
int64_t statisInfoLoadTime;
int64_t checkForNextTime;
} SIOCostSummary;
typedef struct STsdbQueryHandle {
STsdbRepo* pTsdb;
SQueryFilePos cur; // current position
......@@ -116,6 +122,8 @@ typedef struct STsdbQueryHandle {
SArray* defaultLoadColumn;// default load column
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
SIOCostSummary cost;
} STsdbQueryHandle;
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
......@@ -384,7 +392,7 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo) {
return NULL;
}
bool moveToNextRow(STableCheckInfo* pCheckInfo) {
static bool moveToNextRow(STableCheckInfo* pCheckInfo) {
bool hasNext = false;
if (pCheckInfo->chosen == 0) {
if (pCheckInfo->iter != NULL) {
......@@ -507,7 +515,7 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
return midSlot;
}
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks) {
SFileGroup* fileGroup = pQueryHandle->pFileGroup;
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
......@@ -524,52 +532,61 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
for (int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
pCheckInfo->numOfBlocks = 0;
SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
if (compIndex->len == 0 || compIndex->numOfBlocks == 0 ||
compIndex->uid != pCheckInfo->tableId.uid) { // no data block in this file, try next file
pCheckInfo->numOfBlocks = 0;
continue; // no data blocks in the file belongs to pCheckInfo->pTable
// no data block in this file, try next file
if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) {
continue; // no data blocks in the file belongs to pCheckInfo->pTable
}
if (pCheckInfo->compSize < compIndex->len) {
assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
assert(t != NULL);
pCheckInfo->pCompInfo = (SCompInfo*) t;
pCheckInfo->compSize = compIndex->len;
}
tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb);
tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
SCompInfo* pCompInfo = pCheckInfo->pCompInfo;
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else {
if (pCheckInfo->compSize < compIndex->len) {
assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
assert(t != NULL);
pCheckInfo->pCompInfo = (SCompInfo*) t;
pCheckInfo->compSize = compIndex->len;
}
tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb);
assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey);
}
tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
SCompInfo* pCompInfo = pCheckInfo->pCompInfo;
TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) {
continue;
}
s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// todo speedup the procedure of located end block
while (end < compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1;
}
// discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
pCheckInfo->numOfBlocks = (end - start);
if (start > 0) {
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock));
}
if (s > pCompInfo->blocks[start].keyLast) {
continue;
}
// todo speedup the procedure of located end block
while (end < compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1;
}
(*numOfBlocks) += pCheckInfo->numOfBlocks;
pCheckInfo->numOfBlocks = (end - start);
if (start > 0) {
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock));
}
(*numOfBlocks) += pCheckInfo->numOfBlocks;
}
return TSDB_CODE_SUCCESS;
......@@ -583,22 +600,13 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
.uid = (_checkInfo)->tableId.uid})
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
STsdbRepo *pRepo = pQueryHandle->pTsdb;
// TODO refactor
SCompData* data = calloc(1, sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
data->numOfCols = pBlock->numOfCols;
data->uid = pCheckInfo->pTableObj->tableId.uid;
bool blockLoaded = false;
int64_t st = taosGetTimestampUs();
bool blockLoaded = false;
int64_t st = taosGetTimestampUs();
if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
// TODO
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
}
......@@ -607,7 +615,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema);
tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);
if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo) == 0) {
int16_t* colIds = pQueryHandle->defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, QH_GET_NUM_OF_COLS(pQueryHandle));
if (ret == TSDB_CODE_SUCCESS) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
......@@ -621,10 +631,10 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
pBlock->numOfRows = pCols->numOfRows;
tfree(data);
int64_t et = taosGetTimestampUs() - st;
tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et);
int64_t elapsedTime = (taosGetTimestampUs() - st);
pQueryHandle->cost.blockLoadTime += elapsedTime;
tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, elapsedTime);
return blockLoaded;
}
......@@ -1246,7 +1256,7 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void*
pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
}
#endif
#endif
return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
}
......@@ -1367,7 +1377,6 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
return TSDB_CODE_SUCCESS;
}
// todo opt for only one table case
static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
pQueryHandle->numOfBlocks = 0;
SQueryFilePos* cur = &pQueryHandle->cur;
......@@ -1378,8 +1387,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks, type)) != TSDB_CODE_SUCCESS) {
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
break;
}
......@@ -1487,7 +1495,10 @@ static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
// handle data in cache situation
bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0);
......@@ -1539,7 +1550,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0};
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
......@@ -1547,7 +1558,6 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
colInfo.info = pCol->info;
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes);
taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
pSecQueryHandle->statis[i].colId = colInfo.info.colId;
}
size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
......@@ -1569,7 +1579,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
bool ret = tsdbNextDataBlock((void*) pSecQueryHandle);
assert(ret);
......@@ -1607,6 +1618,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
}
if (exists) {
elapsedTime = taosGetTimestampUs() - stime;
pQueryHandle->cost.checkForNextTime += elapsedTime;
return exists;
}
......@@ -1617,6 +1630,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
// TODO: opt by consider the scan order
bool ret = doHasDataInBuffer(pQueryHandle);
terrno = TSDB_CODE_SUCCESS;
elapsedTime = taosGetTimestampUs() - stime;
pQueryHandle->cost.checkForNextTime += elapsedTime;
return ret;
}
......@@ -1794,41 +1810,44 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
SQueryFilePos* cur = &pHandle->cur;
if (cur->mixBlock) {
SQueryFilePos* c = &pHandle->cur;
if (c->mixBlock) {
*pBlockStatis = NULL;
return TSDB_CODE_SUCCESS;
}
assert((cur->slot >= 0 && cur->slot < pHandle->numOfBlocks) ||
((cur->slot == pHandle->numOfBlocks) && (cur->slot == 0)));
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
// file block with subblocks has no statistics data
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));
// file block with sub-blocks has no statistics data
if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
*pBlockStatis = NULL;
return TSDB_CODE_SUCCESS;
}
int64_t stime = taosGetTimestampUs();
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
// todo opt perf
int16_t* colIds = pHandle->defaultLoadColumn->pData;
size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
memset(pHandle->statis, 0, numOfCols * sizeof(SDataStatis));
for(int32_t i = 0; i < numOfCols; ++i) {
SDataStatis* st = &pHandle->statis[i];
int32_t colId = st->colId;
memset(st, 0, sizeof(SDataStatis));
st->colId = colId;
pHandle->statis[i].colId = colIds[i];
}
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols);
*pBlockStatis = pHandle->statis;
// always load the first primary timestamp column data
SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
pPrimaryColStatis->numOfNull = 0;
pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;
//update the number of NULL data rows
for(int32_t i = 0; i < numOfCols; ++i) {
for(int32_t i = 1; i < numOfCols; ++i) {
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
}
......@@ -1840,7 +1859,11 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
pHandle->statis[i].max = pBlockInfo->compBlock->keyLast;
}
}
int64_t elapsed = taosGetTimestampUs() - stime;
pHandle->cost.statisInfoLoadTime += elapsed;
*pBlockStatis = pHandle->statis;
return TSDB_CODE_SUCCESS;
}
......@@ -1893,8 +1916,6 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
}
}
SArray* tsdbRetrieveDataRow(TsdbQueryHandleT* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) { return NULL; }
static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex);
while (tSkipListIterNext(iter)) {
......@@ -1923,6 +1944,7 @@ static void destroyHelper(void* param) {
free(param);
}
#define TAG_INVALID_COLUMN_INDEX -2
static int32_t getTagColumnIndex(STSchema* pTSchema, SSchema* pSchema) {
// filter on table name(TBNAME)
if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
......@@ -1954,9 +1976,8 @@ void filterPrepare(void* expr, void* param) {
tVariant* pCond = pExpr->_node.pRight->pVal;
SSchema* pSchema = pExpr->_node.pLeft->pSchema;
// todo : if current super table does not change schema yet, this function may fail to get correct schema, test case
int32_t index = getTagColumnIndex(pTSSchema, pSchema);
assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX));
assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX) || index == TAG_INVALID_COLUMN_INDEX);
pInfo->sch = *pSchema;
pInfo->colIndex = index;
......@@ -2362,6 +2383,11 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem);
tsdbDestroyHelper(&pQueryHandle->rhelper);
SIOCostSummary* pCost = &pQueryHandle->cost;
tsdbDebug(":io-cost summary: statis-info:%"PRId64"us, datablock:%" PRId64"us, check data:%"PRId64"us, %p",
pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qinfo);
tfree(pQueryHandle);
}
......
......@@ -225,7 +225,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj);
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
* @param handle Cache object handle
*/
static void* taosCacheTimedRefresh(void *pCacheObj);
static void* taosCacheTimedRefresh(void *handle);
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
if (refreshTimeInSeconds <= 0) {
......@@ -455,51 +455,11 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
__cache_unlock(pCacheObj);
} else {
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1);
__cache_wr_lock(pCacheObj);
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t ref = T_REF_DEC(pNode);
if (inTrashCan && (ref == 0)) {
// Remove it if the ref count is 0.
// The ref count does not need to load and check again after lock acquired, since ref count can not be increased when
// the node is in trashcan.
assert(pNode->pTNodeHeader->pData == pNode);
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
}
__cache_unlock(pCacheObj);
uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, pNode->key, pNode->data, ref,
inTrashCan);
}
// else {
// if (_remove) { // not in trash can, but need to remove it
// __cache_wr_lock(pCacheObj);
//
// /*
// * If not referenced by other users. Otherwise move this node to trashcan wait for all users
// * releasing this resources.
// *
// * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
// * that tries to do the same thing.
// */
// if (ref == 0) {
// if (T_REF_VAL_GET(pNode) == 0) {
// taosCacheReleaseNode(pCacheObj, pNode);
// } else {
// taosCacheMoveToTrash(pCacheObj, pNode);
// }
// } else if (ref > 0) {
// if (!pNode->inTrashCan) {
// assert(pNode->pTNodeHeader == NULL);
// taosCacheMoveToTrash(pCacheObj, pNode);
// }
// }
//
// __cache_unlock(pCacheObj);
// }
// }
}
void taosCacheEmpty(SCacheObj *pCacheObj) {
......
......@@ -26,7 +26,7 @@ sleep 2000
run general/parser/fill.sim
sleep 2000
run general/parser/fill_stb.sim
sleep 2000
sleep 2000
#run general/parser/fill_us.sim #
sleep 2000
run general/parser/first_last.sim
......@@ -91,13 +91,11 @@ run general/parser/select_with_tags.sim
sleep 2000
run general/parser/groupby.sim
sleep 2000
run general/parser/tags_filter.sim
sleep 2000
run general/parser/union.sim
sleep 2000
run general/parser/sliding.sim
sleep 2000
run general/parser/fill_us.sim
sleep 2000
run general/parser/tags_filter.sim
#sleep 2000
#run general/parser/repeatStream.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c debugFlag -v 135
system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135
system sh/exec.sh -n dnode1 -s start
#system sh/stop_dnodes.sh
#
#system sh/deploy.sh -n dnode1 -i 1
#system sh/cfg.sh -n dnode1 -c walLevel -v 0
#system sh/cfg.sh -n dnode1 -c debugFlag -v 135
#system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135
#system sh/exec.sh -n dnode1 -s start
sleep 1000
sql connect
......@@ -24,77 +24,77 @@ $mt = $mtPrefix . $i
$j = 1
$mt1 = $mtPrefix . $j
sql drop database if exits $db -x step1
step1:
sql create database if not exists $db maxtables 4
#
#sql drop database if exits $db -x step1
#step1:
#sql create database if not exists $db maxtables 4
sql use $db
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
$i = 0
$t = 1578203484000
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$ms = $x * 1000
$ms = $ms * 60
$c = $x / 100
$c = $c * 100
$c = $x - $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
$t1 = $t + $ms
sql insert into $tb values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
$x = $x + 1
endw
$i = $i + 1
endw
sql create table $mt1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
$j = 0
$t = 1578203484000
$rowNum = 1000
$tbNum = 5
$i = 0
while $i < $tbNum
$tb1 = $tbPrefix1 . $j
sql create table $tb1 using $mt1 tags( $i )
$x = 0
while $x < $rowNum
$ms = $x * 1000
$ms = $ms * 60
$c = $x / 100
$c = $c * 100
$c = $x - $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
$t1 = $t + $ms
sql insert into $tb1 values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
$x = $x + 1
endw
$i = $i + 1
$j = $j + 1
endw
print sleep 1sec.
sleep 1000
#sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
#
#$i = 0
#$t = 1578203484000
#
#while $i < $tbNum
# $tb = $tbPrefix . $i
# sql create table $tb using $mt tags( $i )
#
# $x = 0
# while $x < $rowNum
# $ms = $x * 1000
# $ms = $ms * 60
#
# $c = $x / 100
# $c = $c * 100
# $c = $x - $c
# $binary = 'binary . $c
# $binary = $binary . '
# $nchar = 'nchar . $c
# $nchar = $nchar . '
#
# $t1 = $t + $ms
# sql insert into $tb values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
# $x = $x + 1
# endw
#
# $i = $i + 1
#endw
#
#sql create table $mt1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
#
#$j = 0
#$t = 1578203484000
#$rowNum = 1000
#$tbNum = 5
#$i = 0
#
#while $i < $tbNum
# $tb1 = $tbPrefix1 . $j
# sql create table $tb1 using $mt1 tags( $i )
#
# $x = 0
# while $x < $rowNum
# $ms = $x * 1000
# $ms = $ms * 60
#
# $c = $x / 100
# $c = $c * 100
# $c = $x - $c
# $binary = 'binary . $c
# $binary = $binary . '
# $nchar = 'nchar . $c
# $nchar = $nchar . '
#
# $t1 = $t + $ms
# sql insert into $tb1 values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
# $x = $x + 1
# endw
#
# $i = $i + 1
# $j = $j + 1
#endw
#
#print sleep 1sec.
#sleep 1000
$i = 1
$tb = $tbPrefix . $i
......@@ -222,7 +222,7 @@ endi
print ===========================================tags union
# two super table tag union, limit is not active during retrieve tags query
sql select t1 from union_mt0 union all select t1 from union_mt0 limit 1
sql select t1 from union_mt0 union all select t1 from union_mt0
if $rows != 20 then
return -1
endi
......@@ -235,6 +235,10 @@ if $data90 != 9 then
return -1
endi
#sql select t1 from union_mt0 union all select t1 from union_mt0 limit 1
#if $row != 11 then
# return -1
#endi
#========================================== two super table join subclause
print ================two super table join subclause
sql select avg(union_mt0.c1) as c from union_mt0 interval(1h) limit 10 union all select union_mt1.ts, union_mt1.c1/1.0 as c from union_mt0, union_mt1 where union_mt1.ts=union_mt0.ts and union_mt1.t1=union_mt0.t1 limit 5;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册