提交 8ff86402 编写于 作者: H hjxilinx

[TD-32] fix bugs in super table query.

上级 e45701a6
......@@ -636,10 +636,11 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
// for (int32_t i = 0; i < pTableMetaInfo->pMetricMeta->numOfVnodes; ++i) {
// (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
// (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
// }
size_t numOfSubs = taosArrayGetSize(pTableMetaInfo->vgroupIdList);
for (int32_t i = 0; i < numOfSubs; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
}
if (createOrderDescriptor(pOrderDesc, pCmd, pModel) != TSDB_CODE_SUCCESS) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......
......@@ -509,31 +509,31 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSubmitMsg *pShellMsg;
char * pMsg, *pStart;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
pStart = pSql->cmd.payload + tsRpcHeadSize;
pMsg = pStart;
char* pMsg = pSql->cmd.payload + tsRpcHeadSize;
// NOTE: shell message size should not include SMsgDesc
int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
pMsgDesc->numOfVnodes = htonl(1); //set the number of vnodes
pMsgDesc->numOfVnodes = htonl(1); //todo set the right number of vnodes
pMsg += sizeof(SMsgDesc);
pShellMsg = (SSubmitMsg *)pMsg;
SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
pShellMsg->header.vgId = htonl(pTableMeta->vgId);
pShellMsg->header.contLen = htonl(pSql->cmd.payloadLen);
pShellMsg->header.contLen = htonl(size);
pShellMsg->length = pShellMsg->header.contLen;
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
// pSql->cmd.payloadLen is set during copying data into paylaod
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htons(pMsgDesc->numOfVnodes));
// tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip),
// htons(pShellMsg->vnode));
return TSDB_CODE_SUCCESS;
}
......
......@@ -1006,14 +1006,12 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
pRes->qhandle = 1; // hack the qhandle check
const uint32_t nBufferSize = (1 << 16); // 64KB
const uint32_t nBufferSize = (1u << 16); // 64KB
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfSubQueries = 1;
// int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t numOfSubQueries = taosArrayGetSize(pTableMetaInfo->vgroupIdList);
assert(numOfSubQueries > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
......@@ -1119,7 +1117,7 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
tfree(trsupport);
}
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
// set no disk space error info
......@@ -1141,7 +1139,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
pthread_mutex_unlock(&trsupport->queryMutex);
tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code);
tscRetrieveFromDnodeCallBack(trsupport, tres, trsupport->pState->code);
}
static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
......@@ -1236,7 +1234,86 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
}
}
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
SSubqueryState* pState = trsupport->pState;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
// data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
// tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
// pSvd->vnode, numOfRowsFromSubquery, idx);
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
#ifdef _DEBUG_VIEW
printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
trsupport->localBuffer->numOfElems, colInfo);
#endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB);
tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_CLI_NO_DISKSPACE);
return;
}
// each result for a vnode is ordered as an independant list,
// then used as an input of loser tree for disk-based merge routine
int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
pQueryInfo->groupbyExpr.orderType);
if (ret != 0) { // set no disk space error info, and abort retry
return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_CLI_NO_DISKSPACE);
}
// keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion
// increases the finished value up to pState->numOfTotal value, which means all subqueries are completed.
// In this case, the comparsion between finished value and released pState->numOfTotal is not safe.
int32_t numOfTotal = pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
return tscFreeSubSqlObj(trsupport, pSql);
}
// all sub-queries are returned, start to local merge process
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
pState->numOfTotal, pState->numOfRetrievedRows);
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
&pPObj->cmd, &pPObj->res);
tscTrace("%p build loser tree completed", pPObj);
pPObj->res.precision = pSql->res.precision;
pPObj->res.numOfRows = 0;
pPObj->res.row = 0;
// only free once
tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
// set the command flag must be after the semaphore been correctly set.
pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
if (pPObj->res.code == TSDB_CODE_SUCCESS) {
(*pPObj->fp)(pPObj->param, pPObj, 0);
} else {
tscQueueAsyncRes(pPObj);
}
}
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj;
......@@ -1265,15 +1342,15 @@ static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfR
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
SVnodeSidList *vnodeInfo = 0;
SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
// SVnodeSidList *vnodeInfo = 0;
// SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
// tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
// pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
......@@ -1299,85 +1376,16 @@ static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfR
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
if (ret < 0) {
// set no disk space error info, and abort retry
if (ret < 0) { // set no disk space error info, and abort retry
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
} else {
} else if (pRes->completed) {
tscAllDataRetrievedFromDnode(trsupport, pSql);
} else { // continue fetch data from dnode
pthread_mutex_unlock(&trsupport->queryMutex);
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
}
} else { // all data has been retrieved to client
/* data in from current vnode is stored in cache and disk */
uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
pSvd->vnode, numOfRowsFromVnode, idx);
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
#ifdef _DEBUG_VIEW
printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
trsupport->localBuffer->numOfElems, colInfo);
#endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
return;
}
// each result for a vnode is ordered as an independant list,
// then used as an input of loser tree for disk-based merge routine
int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
pQueryInfo->groupbyExpr.orderType);
if (ret != 0) {
/* set no disk space error info, and abort retry */
return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
}
// keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion
// increases the finished value up to pState->numOfTotal value, which means all subqueries are completed.
// In this case, the comparsion between finished value and released pState->numOfTotal is not safe.
int32_t numOfTotal = pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
return tscFreeSubSqlObj(trsupport, pSql);
}
// all sub-queries are returned, start to local merge process
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
pState->numOfTotal, pState->numOfRetrievedRows);
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
&pPObj->cmd, &pPObj->res);
tscTrace("%p build loser tree completed", pPObj);
pPObj->res.precision = pSql->res.precision;
pPObj->res.numOfRows = 0;
pPObj->res.row = 0;
// only free once
tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
// set the command flag must be after the semaphore been correctly set.
pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
if (pPObj->res.code == TSDB_CODE_SUCCESS) {
(*pPObj->fp)(pPObj->param, pPObj, 0);
} else {
tscQueueAsyncRes(pPObj);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
} else { // all data has been retrieved to client
tscAllDataRetrievedFromDnode(trsupport, pSql);
}
}
......@@ -1437,7 +1445,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
/*
* if a query on a vnode is failed, all retrieve operations from vnode that occurs later
* than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack
* than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
* function to abort current and remain retrieve process.
*
* NOTE: threadsafe is required.
......@@ -1475,7 +1483,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
trsupport->subqueryIndex, pState->code);
}
tscRetrieveFromVnodeCallBack(param, tres, pState->code);
tscRetrieveFromDnodeCallBack(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
......@@ -1486,7 +1494,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
trsupport->subqueryIndex);
}
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
}
......
......@@ -608,7 +608,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* the payloadLen should be actual message body size
* the old value of payloadLen is the allocated payload size
*/
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize - sizeof(SMsgDesc);
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize;
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0);
return TSDB_CODE_SUCCESS;
......
......@@ -2204,7 +2204,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
size_t s = taosArrayGetSize(pQInfo->pTableIdList);
num = MAX(s, INITIAL_RESULT_ROWS_VALUE);
} else { // for super table query, one page for each subset
// num = pQInfo->pSidSet->numOfSubSet;
num = 1;//pQInfo->pSidSet->numOfSubSet;
}
assert(num > 0);
......@@ -3873,7 +3873,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else {
// totalSubset = pQInfo->pSidSet->numOfSubSet;
totalSubset = 1;//pQInfo->pSidSet->numOfSubSet;
}
return totalSubset;
......@@ -3909,19 +3909,18 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
int32_t numOfRowsToCopy = result[i].numOfRows - pQInfo->offset;
int32_t oldOffset = pQInfo->offset;
assert(0);
/*
* current output space is not enough to keep all the result data of this group, only copy partial results
* to SQuery object's result buffer
*/
// if (numOfRowsToCopy > pQuery->pointsToRead - numOfResult) {
// numOfRowsToCopy = pQuery->pointsToRead - numOfResult;
// pQInfo->offset += numOfRowsToCopy;
// } else {
// pQInfo->offset = 0;
// pQInfo->subgroupIdx += 1;
// }
if (numOfRowsToCopy > pQuery->rec.capacity - numOfResult) {
numOfRowsToCopy = pQuery->rec.capacity - numOfResult;
pQInfo->offset += numOfRowsToCopy;
} else {
pQInfo->offset = 0;
pQInfo->subgroupIdx += 1;
}
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
int32_t size = pRuntimeEnv->pCtx[j].outputBytes;
......@@ -3932,13 +3931,12 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
}
numOfResult += numOfRowsToCopy;
assert(0);
// if (numOfResult == pQuery->rec.pointsToRead) {
// break;
// }
if (numOfResult == pQuery->rec.capacity) {
break;
}
}
dTrace("QInfo:%p copy data to SQuery buf completed", GET_QINFO_ADDR(pQuery));
dTrace("QInfo:%p copy data to SQuery buf completed", pQInfo);
#ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pQuery, numOfResult);
......@@ -4135,10 +4133,11 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
}
void vnodePrintQueryStatistics(SQInfo *pQInfo) {
#if 0
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
#if 0
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
if (pRuntimeEnv->pResultBuf == NULL) {
pSummary->tmpBufferInDisk = 0;
......@@ -4179,41 +4178,30 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) {
#endif
}
int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) {
int32_t doInitializeQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t code = TSDB_CODE_SUCCESS;
// only the successful complete requries the sem_post/over = 1 operations.
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
pQuery->window.ekey, pQuery->order.order);
sem_post(&pQInfo->dataReady);
setQueryStatus(pQuery, QUERY_COMPLETED);
return TSDB_CODE_SUCCESS;
}
setScanLimitationByResultBuffer(pQuery);
changeExecuteScanOrder(pQuery, false);
// dataInCache requires lastKey value
pQuery->lastKey = pQuery->window.skey;
STsdbQueryCond cond = {0};
cond.twindow = (STimeWindow){.skey = pQuery->window.skey, .ekey = pQuery->window.ekey};
cond.order = pQuery->order.order;
cond.colList = *pQuery->colList;
STsdbQueryCond cond = {
.twindow = pQuery->window,
.order = pQuery->order.order,
.colList = *pQuery->colList,
};
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
taosArrayPush(cols, &pQuery->colList[i]);
}
pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTSBuf = param;
......@@ -4224,15 +4212,34 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) {
}
// create runtime environment
code = setupQueryRuntimeEnv(&pQInfo->runtimeEnv, NULL, pQuery->order.order, false);
code = setupQueryRuntimeEnv(pRuntimeEnv, NULL, pQuery->order.order, isSTableQuery);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false);
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, isSTableQuery);
if (isSTableQuery) {
int32_t rows = getInitialPageNum(pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pQuery->intervalTime == 0) {
int16_t type = TSDB_DATA_TYPE_NULL;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags;
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
} else {
type = TSDB_DATA_TYPE_INT; // group id
}
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
}
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
int32_t rows = getInitialPageNum(pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -4349,6 +4356,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
for(int32_t i = 0; i < numOfTables; ++i) {
if (pQInfo->pTableDataInfo[i].pTableQInfo->tid == blockInfo.sid) {
pTableDataInfo = &pQInfo->pTableDataInfo[i];
break;
}
}
......@@ -4361,7 +4369,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
TSKEY nextKey = blockInfo.window.ekey;
if (pQuery->intervalTime == 0) {
if (!isIntervalQuery(pQuery)) {
setExecutionContext(pQInfo, pTableQueryInfo, pTableDataInfo->tableIndex, pTableDataInfo->groupIdx, nextKey);
} else { // interval query
setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey);
......@@ -4487,9 +4495,10 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
*/
static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
#if 0
SQuery* pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_COMPLETED);
#if 0
// tSidSet *pTableIdList = pSupporter->pSidSet;
int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode;
......@@ -4876,9 +4885,10 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
doRestoreContext(pQInfo);
} else {
dTrace("QInfo:%p no need to do reversed scan, query completed", pQInfo);
return;
}
setQueryStatus(pQuery, QUERY_COMPLETED);
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query killed or error occurred, code:%d, abort", pQInfo, pQInfo->code);
return;
......@@ -5245,14 +5255,16 @@ static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) {
static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** pTableIdList) {
assert(pQueryMsg->numOfTables > 0);
*pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo));
*pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableId));
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableIdInfo->sid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key);
taosArrayPush(*pTableIdList, pTableIdInfo);
STableId id = {.uid = pTableIdInfo->uid, .tid = pTableIdInfo->sid};
taosArrayPush(*pTableIdList, &id);
pMsg += sizeof(STableIdInfo);
for (int32_t j = 1; j < pQueryMsg->numOfTables; ++j) {
......@@ -5820,7 +5832,18 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->pos = -1;
dTrace("QInfo %p is allocated", pQInfo);
pQuery->window.skey = pQueryMsg->window.skey;
pQuery->window.ekey = pQueryMsg->window.ekey;
pQuery->lastKey = pQuery->window.skey;
if (sem_init(&pQInfo->dataReady, 0, 0) != 0) {
dError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno));
goto _clean_memory;
}
vnodeParametersSafetyCheck(pQuery);
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
return pQInfo;
_clean_memory:
......@@ -5859,57 +5882,42 @@ static bool isValidQInfo(void *param) {
}
static void freeQInfo(SQInfo *pQInfo);
static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SArray *pTableIdList, void* tsdb, SQInfo **pQInfo) {
static int32_t initializeQInfo(SQueryTableMsg *pQueryMsg, void* tsdb, SQInfo *pQInfo, bool isSTable) {
int32_t code = TSDB_CODE_SUCCESS;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error;
}
SQuery *pQuery = (*pQInfo)->runtimeEnv.pQuery;
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
pQuery->window.skey = pQueryMsg->window.skey;
pQuery->window.ekey = pQueryMsg->window.ekey;
pQuery->lastKey = pQuery->window.skey;
if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) {
dError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno));
code = TSDB_CODE_APP_ERROR;
goto _error;
}
vnodeParametersSafetyCheck(pQuery);
STSBuf *pTSBuf = NULL;
if (pQueryMsg->tsLen > 0) { // open new file to save the result
char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset;
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder);
tsBufResetPos(pTSBuf);
tsBufNextPos(pTSBuf);
}
// only the successful complete requries the sem_post/over = 1 operations.
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
pQuery->window.ekey, pQuery->order.order);
sem_post(&pQInfo->dataReady);
setQueryStatus(pQuery, QUERY_COMPLETED);
return TSDB_CODE_SUCCESS;
}
// filter the qualified
if ((code = initQInfo(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) {
if ((code = doInitializeQInfo(pQInfo, pTSBuf, tsdb, isSTable)) != TSDB_CODE_SUCCESS) {
goto _error;
}
// if (pQInfo->over == 1) {
// vnodeAddRefCount(pQInfo); // for retrieve procedure
// return pQInfo;
// }
// dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
// pQInfo->refCount);
return code;
_error:
// table query ref will be decrease during error handling
freeQInfo(*pQInfo);
freeQInfo(pQInfo);
return code;
}
......@@ -6068,7 +6076,10 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
// super table query
SArray* res = NULL;
bool isSTableQuery = false;
if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
isSTableQuery = true;
STableId* id = taosArrayGet(pTableIdList, 0);
id->uid = -1;
......@@ -6081,8 +6092,13 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
res = pTableIdList;
}
code = createQInfo(pQueryMsg, pGroupbyExpr, pExprs, res, tsdb, pQInfo);
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, res);
if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
}
code = initializeQInfo(pQueryMsg, tsdb, *pQInfo, isSTableQuery);
_query_over:
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pTableIdList);
......
......@@ -73,7 +73,7 @@ typedef struct SQueryFilesInfo {
char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo;
typedef struct STableQueryInfo {
typedef struct STableCheckInfo {
STableId tableId;
TSKEY lastKey;
STable * pTableObj;
......@@ -81,7 +81,8 @@ typedef struct STableQueryInfo {
int32_t numOfBlocks;
int32_t start;
SCompBlock *pBlock;
} STableQueryInfo;
SSkipListIterator* iter;
} STableCheckInfo;
typedef struct {
SCompBlock *compBlock;
......@@ -90,14 +91,19 @@ typedef struct {
typedef struct STableDataBlockInfoEx {
SCompBlockFields pBlock;
STableQueryInfo* pMeterDataInfo;
STableCheckInfo* pMeterDataInfo;
int32_t blockIndex;
int32_t groupIdx; /* number of group is less than the total number of meters */
} STableDataBlockInfoEx;
enum {
SINGLE_TABLE_MODEL = 1,
MULTI_TABLE_MODEL = 2,
};
typedef struct STsdbQueryHandle {
struct STsdbRepo* pTsdb;
int8_t model; // access model, single table model or multi-table model
SQueryHandlePos cur; // current position
SQueryHandlePos start; // the start position, used for secondary/third iteration
int32_t unzipBufSize;
......@@ -120,14 +126,13 @@ typedef struct STsdbQueryHandle {
bool locateStart;
int32_t realNumOfRows;
bool loadDataAfterSeek; // load data after seek.
SArray* pTableQueryInfo;
SArray* pTableCheckInfo;
int32_t activeIndex;
int32_t tableIndex;
bool isFirstSlot;
void * qinfo; // query info handle, for debug purpose
SSkipListIterator* memIter;
STableDataBlockInfoEx *pDataBlockInfoEx;
} STsdbQueryHandle;
......@@ -271,19 +276,21 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
size_t size = taosArrayGetSize(idList);
assert(size >= 1);
pQueryHandle->pTableQueryInfo = taosArrayInit(size, sizeof(STableQueryInfo));
pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo));
for(int32_t i = 0; i < size; ++i) {
STableId id = *(STableId*) taosArrayGet(idList, i);
STableQueryInfo info = {
STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey,
.tableId = id,
.pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), //todo this may be failed
};
taosArrayPush(pQueryHandle->pTableQueryInfo, &info);
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
}
pQueryHandle->model = (size > 1)? MULTI_TABLE_MODEL:SINGLE_TABLE_MODEL;
pQueryHandle->activeIndex = 0;
// malloc buffer in order to load data from file
......@@ -314,11 +321,13 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
return (tsdb_query_handle_t)pQueryHandle;
}
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STableQueryInfo* pTableQInfo = taosArrayGet(pHandle->pTableQueryInfo, pHandle->activeIndex);
static bool hasMoreDataInCacheForSingleModel(STsdbQueryHandle* pHandle) {
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
STable *pTable = pTableQInfo->pTableObj;
STableCheckInfo* pTableCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
STable *pTable = pTableCheckInfo->pTableObj;
assert(pTable != NULL);
// no data in cache, abort
if (pTable->mem == NULL && pTable->imem == NULL) {
......@@ -326,13 +335,49 @@ bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
}
// all data in mem are checked already.
if (pTableQInfo->lastKey > pTable->mem->keyLast) {
if (pTableCheckInfo->lastKey > pTable->mem->keyLast) {
return false;
}
return true;
}
static bool hasMoreDataInCacheForMultiModel(STsdbQueryHandle* pHandle) {
size_t numOfTables = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(numOfTables > 0);
while(pHandle->activeIndex < numOfTables) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
STable *pTable = pTableCheckInfo->pTableObj;
if (pTable->mem == NULL && pTable->imem == NULL) {
pHandle->activeIndex += 1; // try next table if exits
continue;
}
// all data in mem are checked already.
if (pTableCheckInfo->lastKey > pTable->mem->keyLast) {
pHandle->activeIndex += 1; // try next table if exits
continue;
}
return true;
}
// all tables has checked already
return false;
}
// handle data in cache situation
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
if (pHandle->model == SINGLE_TABLE_MODEL) {
return hasMoreDataInCacheForSingleModel(pHandle);
} else {
return hasMoreDataInCacheForMultiModel(pHandle);
}
}
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
TSKEY* skey, TSKEY* ekey, STsdbQueryHandle* pHandle) {
int numOfRows = 0;
......@@ -370,7 +415,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STableQueryInfo* pTableQInfo = taosArrayGet(pHandle->pTableQueryInfo, pHandle->activeIndex);
STableCheckInfo* pTableQInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
STable *pTable = pTableQInfo->pTableObj;
TSKEY skey = 0, ekey = 0;
......@@ -379,11 +424,11 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
if (pTable->mem != NULL) {
// create mem table iterator if it is not created yet
if (pHandle->memIter == NULL) {
pHandle->memIter = tSkipListCreateIter(pTable->mem->pData);
if (pTableQInfo->iter == NULL) {
pTableQInfo->iter = tSkipListCreateIter(pTable->mem->pData);
}
rows = tsdbReadRowsFromCache(pHandle->memIter, INT64_MAX, 2, &skey, &ekey, pHandle);
rows = tsdbReadRowsFromCache(pTableQInfo->iter, INT64_MAX, 2, &skey, &ekey, pHandle);
}
SDataBlockInfo blockInfo = {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册