未验证 提交 8d0854f8 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1903 from taosdata/feature/query

Feature/query
......@@ -145,7 +145,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
return;
}
// local reducer has handle this situation during super table non-projection query.
// local merge has handle this situation during super table non-projection query.
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
pRes->numOfTotalInCurrentClause += pRes->numOfRows;
}
......@@ -222,9 +222,30 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
tscResetForNextRetrieve(pRes);
// handle the sub queries of join query
if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
} else {
} else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
return;
} else {
/*
* all available virtual node has been checked already, now we need to check
* for the next subclause queries
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
/*
* 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/
(*pSql->fetchFp)(param, pSql, 0);
}
return;
} else { // current query is not completed, continue retrieve from node
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
......
......@@ -283,7 +283,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
// used to keep the latest input row
pReducer->pTempBuffer = (tFilePage *)calloc(1, pReducer->rowSize + sizeof(tFilePage));
pReducer->discardData = (tFilePage *)calloc(1, pReducer->rowSize + sizeof(tFilePage));
pReducer->discard = false;
......@@ -311,6 +310,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
return;
}
size = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pTempBuffer->numOfElems = 0;
pReducer->pResInfo = calloc(size, sizeof(SResultInfo));
......
......@@ -1445,8 +1445,9 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
}
pRes->row = 0;
pRes->completed = (pRes->numOfRows == 0);
uint8_t code = pRes->code;
int32_t code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
} else {
......
......@@ -463,11 +463,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL;
}
// current data are exhausted, fetch more data
if (pRes->row >= pRes->numOfRows && pRes->completed != true &&
// current data set are exhausted, fetch more data from node
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_SELECT ||
......
......@@ -1177,7 +1177,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
}
}
pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_METRIC_JOIN_RETRIEVE;
pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_TABLE_JOIN_RETRIEVE;
return TSDB_CODE_SUCCESS;
}
......@@ -1923,7 +1923,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if(pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
if(pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
if (pRes->completed) {
tfree(pRes->tsrow);
}
......
......@@ -386,7 +386,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
cmd == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscRemoveFromSqlList(pSql);
}
......@@ -1647,9 +1647,11 @@ void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
int32_t index = pQueryInfo->numOfTables;
while (index >= 0) {
doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache);
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
free(pTableMetaInfo);
}
tfree(pQueryInfo->pTableMetaInfo);
......@@ -1701,16 +1703,8 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
tfree(pTableMetaInfo->vgroupList);
if (pTableMetaInfo->tagColList != NULL) {
size_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
for(int32_t i = 0; i < numOfTags; ++i) { // todo do NOT use the allocated object
SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
tfree(pCol);
}
taosArrayDestroy(pTableMetaInfo->tagColList);
pTableMetaInfo->tagColList = NULL;
}
tscColumnListDestroy(pTableMetaInfo->tagColList);
pTableMetaInfo->tagColList = NULL;
}
void tscResetForNextRetrieve(SSqlRes* pRes) {
......@@ -2001,22 +1995,28 @@ char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
/**
* If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists,
* in case of multi-vnode super table projection query and the result does not reach the limitation.
* while multi-vnode super table projection query and the result does not reach the limitation.
*/
bool hasMoreVnodesToTry(SSqlObj* pSql) {
// SSqlCmd* pCmd = &pSql->cmd;
// SSqlRes* pRes = &pSql->res;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
if (pCmd->command != TSDB_SQL_FETCH) {
return false;
}
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pRes->completed);
// for normal table, do not try any more if result are exhausted
if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->vgroupList == NULL)) {
return false;
// }
}
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
// return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < totalVnode - 1);
int32_t numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
}
void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
......@@ -2032,12 +2032,11 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t totalVnode = 0;
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
while (++pTableMetaInfo->vgroupIndex < totalVnode) {
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
while (++pTableMetaInfo->vgroupIndex < totalVgroups) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotalInCurrentClause);
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfTotalInCurrentClause);
/*
* update the limit and offset value for the query on the next vnode,
......@@ -2053,10 +2052,10 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
}
pQueryInfo->limit.offset = pRes->offset;
assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql,
pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);
tscTrace("%p new query to next vgroup, index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64,
pSql, pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);
/*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries.
......@@ -2070,43 +2069,46 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
tscResetForNextRetrieve(pRes);
// in case of async query, set the callback function
void* fp1 = pSql->fp;
// void* fp1 = pSql->fp;
pSql->fp = fp;
if (fp1 != NULL) {
assert(fp != NULL);
}
int32_t ret = tscProcessSql(pSql); // todo check for failure
// if (fp1 != NULL) {
// assert(fp != NULL);
// }
// in case of async query, return now
if (fp != NULL) {
return;
}
if (ret != TSDB_CODE_SUCCESS) {
pSql->res.code = ret;
int32_t ret = tscProcessSql(pSql);
if (ret == TSDB_CODE_SUCCESS) {
return;
} else {// todo check for failure
}
// retrieve data
assert(pCmd->command == TSDB_SQL_SELECT);
pCmd->command = TSDB_SQL_FETCH;
if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) {
pSql->res.code = ret;
return;
}
// if the result from current virtual node are empty, try next if exists. otherwise, return the results.
if (pRes->numOfRows > 0) {
break;
}
// in case of async query, return now
// if (fp != NULL) {
// return;
// }
//
// if (ret != TSDB_CODE_SUCCESS) {
// pSql->res.code = ret;
// return;
// }
//
// // retrieve data
// assert(pCmd->command == TSDB_SQL_SELECT);
// pCmd->command = TSDB_SQL_FETCH;
//
// if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) {
// pSql->res.code = ret;
// return;
// }
//
// // if the result from current virtual node are empty, try next if exists. otherwise, return the results.
// if (pRes->numOfRows > 0) {
// break;
// }
}
if (pRes->numOfRows == 0) {
tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal);
}
// if (pRes->numOfRows == 0) {
// tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal);
// }
}
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
......
......@@ -61,7 +61,7 @@ enum _sql_type {
TSDB_SQL_LOCAL, // SQL below for client local
TSDB_SQL_DESCRIBE_TABLE,
TSDB_SQL_RETRIEVE_LOCALMERGE,
TSDB_SQL_METRIC_JOIN_RETRIEVE,
TSDB_SQL_TABLE_JOIN_RETRIEVE,
/*
* build empty result instead of accessing dnode to fetch result
......
......@@ -2398,7 +2398,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
// todo extract methods
if (isIntervalQuery(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == 0) {
TSKEY skey1, ekey1;
STimeWindow w = {0};
STimeWindow w = TSWINDOW_INITIALIZER;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) {
......
......@@ -54,12 +54,12 @@ typedef struct SQueryFilePos {
typedef struct SDataBlockLoadInfo {
SFileGroup* fileGroup;
int32_t slot;
int32_t sid;
int32_t tid;
SArray* pLoadedCols;
} SDataBlockLoadInfo;
typedef struct SLoadCompBlockInfo {
int32_t sid; /* table sid */
int32_t tid; /* table tid */
int32_t fileId;
int32_t fileListIndex;
} SLoadCompBlockInfo;
......@@ -127,12 +127,12 @@ static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
pBlockLoadInfo->slot = -1;
pBlockLoadInfo->sid = -1;
pBlockLoadInfo->tid = -1;
pBlockLoadInfo->fileGroup = NULL;
}
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo->sid = -1;
pCompBlockLoadInfo->tid = -1;
pCompBlockLoadInfo->fileId = -1;
pCompBlockLoadInfo->fileListIndex = -1;
}
......@@ -423,7 +423,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
bool blockLoaded = false;
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
if (pCheckInfo->pDataCols == NULL) {
if (pCheckInfo->pDataCols == NULL) { // todo: why not the real data?
pCheckInfo->pDataCols = tdNewDataCols(pRepo->tsdbMeta->maxRowBytes, pRepo->tsdbMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
}
......@@ -434,7 +434,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;
blockLoaded = true;
}
......@@ -614,7 +614,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
int32_t bytes = pCol->info.bytes;
for (int32_t j = 0; j < numOfCols; ++j) {
for (int32_t j = 0; j < numOfCols; ++j) { //todo opt performance
SDataCol* src = &pQueryHandle->rhelper.pDataCols[0]->cols[j];
if (pCol->info.colId == src->colId) {
......@@ -1197,7 +1197,9 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
} else {
// data block has been loaded, todo extract method
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->sid == pCheckInfo->pTableObj->tableId.tid) {
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fileId == pHandle->cur.fid &&
pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) {
return pHandle->pColumns;
} else {
SCompBlock* pBlock = pBlockInfoEx->pBlock.compBlock;
......
......@@ -111,7 +111,8 @@ sql import into tb values(1520000034001, 34001)
sql import into tb values(1520000050001, 50001)
sql select * from tb;
print $rows
if $rows != 19 then
if $rows != 19 then
print expect 19, actual: $rows
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册