提交 e9969951 编写于 作者: H hjxilinx

[td-32] add retrieve support

上级 8395d5eb
......@@ -609,21 +609,21 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
return size;
}
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vnodeId, char *pMsg) {
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vgId, char *pMsg) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
tscTrace("%p vid:%d, query on %d meters", pSql, vnodeId, numOfTables);
tscTrace("%p vgId:%d, query on %d tables", pSql, vgId, numOfTables);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid);
#endif
STableIdInfo *pTableMetaInfo = (STableIdInfo *)pMsg;
pTableMetaInfo->sid = htonl(pTableMeta->sid);
pTableMetaInfo->uid = htobe64(pTableMeta->uid);
pTableMetaInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableMeta->sid);
pTableIdInfo->uid = htobe64(pTableMeta->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pMsg += sizeof(STableIdInfo);
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
......@@ -676,6 +676,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->uid = pTableMeta->uid;
pQueryMsg->numOfTagsCols = 0;
pQueryMsg->vgId = htonl(pTableMeta->vgid);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query on super table
if (pTableMetaInfo->vnodeIndex < 0) {
......@@ -693,7 +694,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
pQueryMsg->vnode = htons(vnodeId);
pQueryMsg->vgId = htons(vnodeId);
}
pQueryMsg->numOfTables = htonl(numOfTables);
......@@ -761,14 +762,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
SSchema * pColSchema = &pSchema[pCol->colIndex.columnIndex];
if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
pColSchema->name);
return -1; // 0 means build msg failed
}
// if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
// pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
// tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
// htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
// pColSchema->name);
//
// return -1; // 0 means build msg failed
// }
pQueryMsg->colList[i].colId = htons(pColSchema->colId);
pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
......@@ -862,7 +863,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->colNameLen = htonl(len);
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vnode), pMsg);
pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vgId), pMsg);
// only include the required tag column schema. If a tag is not required, it won't be sent to vnode
if (pTableMetaInfo->numOfTags > 0) {
......@@ -944,6 +945,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pQueryMsg->contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS;
......
......@@ -229,21 +229,23 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
SQInfo* pQInfo = NULL;
int32_t ret = qCreateQueryInfo(pQueryTableMsg, &pQInfo);
dTrace("query msg is disposed, qInfo:%p", pQueryTableMsg);
int32_t code = qCreateQueryInfo(pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = 0;
pRsp->code = code;
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SQueryTableRsp),
.code = 0,
.code = code,
.msgType = 0
};
// do execute query
qTableQuery(pQInfo);
rpcSendResponse(&rpcRsp);
}
......@@ -253,20 +255,50 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
dTrace("retrieve msg is disposed, qInfo:%p", pQInfo);
assert(pQInfo != NULL);
int32_t contLen = 100;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *) rpcMallocCont(contLen);
int32_t rowSize = 0;
int32_t numOfRows = 0;
int32_t contLen = 0;
SRpcMsg rpcRsp = {0};
int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
if (code != TSDB_CODE_SUCCESS) {
contLen = sizeof(SRetrieveTableRsp);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0;
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
SRpcMsg rpcRsp = {
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = 0,
.code = code,
.msgType = 0
};
//todo free qinfo
} else {
contLen = 100;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0;
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
*(int64_t*) pRsp->data = 1000;
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
}
rpcSendResponse(&rpcRsp);
}
......@@ -451,10 +451,10 @@ typedef struct STimeWindow {
* the outputCols will be 3 while the numOfCols is 1.
*/
typedef struct {
int16_t vnode;
int32_t numOfTables;
uint64_t pSidExtInfo; // table id & tag info ptr, in windows pointer may
int32_t contLen; // msg header
int16_t vgId;
int32_t numOfTables;
uint64_t uid;
STimeWindow window;
......
......@@ -11,5 +11,5 @@ INCLUDE_DIRECTORIES(inc)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(query ${SRC})
TARGET_LINK_LIBRARIES(query tutil m rt)
TARGET_LINK_LIBRARIES(query tsdb tutil m rt)
ENDIF ()
\ No newline at end of file
......@@ -124,9 +124,8 @@ typedef struct tTagSchema {
typedef struct tSidSet {
int32_t numOfSids;
int32_t numOfSubSet;
STableIdInfo **pSids;
STableIdInfo **pTableIdList;
int32_t * starterPos; // position of each subgroup, generated according to
SColumnModel *pColumnModel;
SColumnOrderInfo orderIdx;
} tSidSet;
......
......@@ -25,6 +25,7 @@
#include "taosdef.h"
#include "tref.h"
#include "tsqlfunction.h"
#include "tarray.h"
typedef struct SData {
int32_t num;
......@@ -39,7 +40,7 @@ enum {
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct SSqlGroupbyExpr {
int16_t tableIndex;
......@@ -142,7 +143,7 @@ typedef struct SQuery {
int32_t pos;
int64_t pointsOffset; // the number of points offset to save read data
SData** sdata;
int32_t capacity;
SSingleColumnFilterInfo* pFilterInfo;
} SQuery;
......@@ -152,7 +153,6 @@ typedef struct SQueryCostSummary {
typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery;
// void* pTabObj;
SData** pInterpoBuf;
SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage;
......@@ -174,15 +174,16 @@ typedef struct SQInfo {
TSKEY startTime;
int64_t elapsedTime;
SResultRec rec;
int pointsReturned;
int pointsInterpo;
int code; // error code to returned to client
int32_t pointsReturned;
int32_t pointsInterpo;
int32_t code; // error code to returned to client
int32_t killed; // denotes if current query is killed
sem_t dataReady;
SHashObj* pTableList; // table list
SArray* pTableIdList; // table list
SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */
tSidSet* pSidSet;
// tSidSet* pSidSet;
T_REF_DECLARE()
/*
......@@ -210,7 +211,7 @@ int32_t qCreateQueryInfo(SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
* query on single table
* @param pReadMsg
*/
void qTableQuery(void* pReadMsg);
void qTableQuery(SQInfo* pQInfo);
/**
* query on super table
......@@ -218,4 +219,13 @@ void qTableQuery(void* pReadMsg);
*/
void qSuperTableQuery(void* pReadMsg);
/**
* wait for the query completed, and retrieve final results to client
* @param pQInfo
*/
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize);
//int32_t qBuildQueryResult(SQInfo* pQInfo, void* pBuf);
#endif // TDENGINE_QUERYEXECUTOR_H
......@@ -12,18 +12,19 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <qast.h>
#include "os.h"
#include "taosmsg.h"
#include "hash.h"
#include "hashfunc.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tlosertree.h"
#include "tscompression.h"
#include "tstatus.h"
#include "ttime.h"
#include "qast.h"
#include "qresultBuf.h"
#include "queryExecutor.h"
#include "queryUtil.h"
......@@ -52,9 +53,9 @@
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.bytes)
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.bytes)
#define GET_COLUMN_TYPE(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.type)
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.type)
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
......@@ -1505,9 +1506,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel
pCtx->inputType = pSchema->type;
pCtx->inputBytes = pSchema->bytes;
} else {
assert(0);
// pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
// pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
}
pCtx->ptsOutputBuf = NULL;
......@@ -1558,9 +1558,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel
}
setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx);
// for loading block data in memory
// assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock);
return TSDB_CODE_SUCCESS;
_error_clean:
......@@ -1894,7 +1891,6 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0;
}
assert(0);
// pQuery->pointsOffset = pQuery->pointsToRead;
}
......@@ -2243,10 +2239,10 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
num = 128;
} else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table
size_t s = taosHashGetSize(pQInfo->pTableList);
size_t s = taosArrayGetSize(pQInfo->pTableIdList);
num = MAX(s, INITIAL_RESULT_ROWS_VALUE);
} else { // for super table query, one page for each subset
num = pQInfo->pSidSet->numOfSubSet;
// num = pQInfo->pSidSet->numOfSubSet;
}
assert(num > 0);
......@@ -2290,16 +2286,11 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
if (pQInfo->pTableList != NULL) {
taosHashCleanup(pQInfo->pTableList);
pQInfo->pTableList = NULL;
}
// tSidSetDestroy(&pQInfo->pSidSet);
if (pQInfo->pTableDataInfo != NULL) {
size_t num = taosHashGetSize(pQInfo->pTableList);
for (int32_t j = 0; j < num; ++j) {
// size_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
}
}
......@@ -2337,11 +2328,11 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pQuery->lastKey = pQuery->window.skey;
// create runtime environment
SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel;
// SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel;
// get one queried meter
assert(0);
// SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[0]->sid);
// SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[0]->sid);
pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vnodeIndex = -1;
......@@ -2388,7 +2379,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
SArray *sa = taosArrayInit(1, POINTER_BYTES);
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
// SMeterObj *p1 = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[i]->sid);
// SMeterObj *p1 = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid);
// taosArrayPush(sa, &p1);
// }
......@@ -2418,7 +2409,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
*/
void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
if (pQInfo != NULL) {
assert(taosHashGetSize(pQInfo->pTableList) >= 1);
// assert(taosHashGetSize(pQInfo->pTableIdList) >= 1);
}
#if 0
......@@ -2429,7 +2420,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
} else {
int32_t num = 0;
for (int32_t i = 0; i < pQInfo->numOfMeters; ++i) {
SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[i]->sid);
SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) {
......@@ -2592,15 +2583,72 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
return pDataBlock;
}
int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1;
int numOfPoints;
TSKEY *keyList;
if (num <= 0) return -1;
keyList = (TSKEY *)pValue;
firstPos = 0;
lastPos = num - 1;
if (order == 0) {
// find the first position which is smaller than the key
while (1) {
if (key >= keyList[lastPos]) return lastPos;
if (key == keyList[firstPos]) return firstPos;
if (key < keyList[firstPos]) return firstPos - 1;
numOfPoints = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
} else {
// find the first position which is bigger than the key
while (1) {
if (key <= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos;
if (key > keyList[lastPos]) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
else
return lastPos;
}
numOfPoints = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
}
return midPos;
}
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
#if 0
SQuery *pQuery = pRuntimeEnv->pQuery;
assert(0);
// __block_search_fn_t searchFn = vnodeSearchKeyFunc[pRuntimeEnv->pTabObj->searchAlgorithm];
int64_t cnt = 0;
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", GET_QINFO_ADDR(pQuery),
pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle;
......@@ -2619,17 +2667,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) {
// doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey,
// pQueryHandle->window.ekey, &skey1, &ekey1, &w);
doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, &ekey1, &w);
pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey;
} else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
TSKEY winStart = blockInfo.window.ekey - pQuery->intervalTime;
// doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQueryHandle->window.ekey,
// blockInfo.window.ekey, &skey1, &ekey1, &w);
doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w);
// pWindowResInfo->startTime = pQueryHandle->window.skey;
pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey;
}
}
......@@ -2638,20 +2684,17 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
// int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, searchFn, &numOfRes,
// &pRuntimeEnv->windowResInfo, pDataBlock);
int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes,
&pRuntimeEnv->windowResInfo, pDataBlock);
// dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d, checked:%d",
// GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId, pQueryHandle->cur.slot,
// pQuery->pos, blockInfo.size, forwardStep);
// save last access position
// cnt += forwardStep;
// if (queryPaused(pQuery, &blockInfo, forwardStep)) {
// if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
// break;
// }
cnt += forwardStep;
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
break;
}
}
......@@ -2673,8 +2716,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
return cnt;
#endif
return 0;
}
static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTableQInfo->lastKey = pQuery->lastKey; }
......@@ -2935,29 +2976,29 @@ int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) {
int64_t st = taosGetTimestampMs();
int32_t ret = TSDB_CODE_SUCCESS;
while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
assert(0);
// ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end);
if (ret < 0) { // not enough disk space to save the data into disk
return -1;
}
pQInfo->subgroupIdx += 1;
// this group generates at least one result, return results
if (ret > 0) {
break;
}
assert(pQInfo->numOfGroupResultPages == 0);
dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
}
dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery),
pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
// int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
// int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
//
// assert(0);
// // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end);
// if (ret < 0) { // not enough disk space to save the data into disk
// return -1;
// }
//
// pQInfo->subgroupIdx += 1;
//
// // this group generates at least one result, return results
// if (ret > 0) {
// break;
// }
//
// assert(pQInfo->numOfGroupResultPages == 0);
// dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
// }
//
// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery),
// pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
return TSDB_CODE_SUCCESS;
}
......@@ -2972,10 +3013,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
}
// set current query completed
if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) {
// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) {
// pQInfo->tableIndex = pQInfo->pSidSet->numOfTables;
return;
}
// return;
// }
}
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
......@@ -3282,7 +3323,7 @@ void disableFunctForSuppleScan(SQInfo *pQInfo, int32_t order) {
}
if (isIntervalQuery(pQuery)) {
size_t numOfTables = taosHashGetSize(pQInfo->pTableList);
size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
for (int32_t i = 0; i < numOfTables; ++i) {
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
......@@ -3320,7 +3361,6 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// int32_t rows = pRuntimeEnv->pTabObj->pointsPerFileBlock;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
......@@ -3339,8 +3379,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
assert(0);
// memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * rows);
memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->capacity);
}
initCtxOutputBuf(pRuntimeEnv);
......@@ -3884,7 +3923,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else {
totalSubset = pQInfo->pSidSet->numOfSubSet;
// totalSubset = pQInfo->pSidSet->numOfSubSet;
}
return totalSubset;
......@@ -4275,7 +4314,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) {
pQuery->window.ekey, pQuery->order.order);
sem_post(&pQInfo->dataReady);
// pQInfo->over = 1;
pQInfo->killed = 1;
return TSDB_CODE_SUCCESS;
}
......@@ -4298,10 +4337,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) {
taosArrayPush(cols, &pQuery->colList[i]);
}
SArray* sa = taosArrayInit(1, sizeof(int16_t));
taosArrayPush(sa, &pQInfo->pSidSet->pSids[0]->sid);
pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, sa, cols);
pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, pQInfo->pTableIdList, cols);
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
pRuntimeEnv->pQuery = pQuery;
......@@ -4588,7 +4624,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
#if 0
SQuery* pQuery = pRuntimeEnv->pQuery;
// tSidSet *pSids = pSupporter->pSidSet;
// tSidSet *pTableIdList = pSupporter->pSidSet;
int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode;
......@@ -4597,12 +4633,12 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
while (pSupporter->subgroupIdx < pSids->numOfSubSet) {
int32_t start = pSids->starterPos[pSupporter->subgroupIdx];
int32_t end = pSids->starterPos[pSupporter->subgroupIdx + 1] - 1;
while (pSupporter->subgroupIdx < pTableIdList->numOfSubSet) {
int32_t start = pTableIdList->starterPos[pSupporter->subgroupIdx];
int32_t end = pTableIdList->starterPos[pSupporter->subgroupIdx + 1] - 1;
if (isFirstLastRowQuery(pQuery)) {
dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pSids->numOfSubSet,
dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pSupporter->subgroupIdx);
TSKEY key = -1;
......@@ -4635,7 +4671,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
int64_t num = doCheckMetersInGroup(pQInfo, index, start);
assert(num >= 0);
} else {
dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pSids->numOfSubSet,
dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pSupporter->subgroupIdx);
for (int32_t k = start; k <= end; ++k) {
......@@ -4680,7 +4716,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
}
}
if (pSupporter->meterIdx >= pSids->numOfTables) {
if (pSupporter->meterIdx >= pTableIdList->numOfTables) {
return;
}
......@@ -4833,7 +4869,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
dTrace(
"QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
"next skey:%" PRId64 ", offset:%" PRId64,
pQInfo, vid, pSids->numOfTables, pSupporter->meterIdx, pSids->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead,
pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead,
pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset);
#endif
}
......@@ -4878,7 +4914,7 @@ static void doOrderedScan(SQInfo *pQInfo) {
static void setupMeterQueryInfoForSupplementQuery(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t num = taosHashGetSize(pQInfo->pTableList);
int32_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t i = 0; i < num; ++i) {
// STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
......@@ -5017,7 +5053,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*) from table_name group by status_column;
*/
static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) {
static void tableFixedOutputProcessor(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
......@@ -5044,7 +5080,7 @@ static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) {
pQInfo->rec.pointsRead = pQuery->rec.pointsRead;
}
static void vnodeSingleTableMultiOutputProcessor(SQInfo *pQInfo) {
static void tableMultiOutputProcessor(SQInfo *pQInfo) {
#if 0
SQuery * pQuery = &pQInfo->query;
SMeterObj *pMeterObj = pQInfo->pObj;
......@@ -5148,7 +5184,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
}
/* handle time interval query on single table */
static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
static void tableIntervalProcessor(SQInfo *pQInfo) {
// STable *pMeterObj = pQInfo->pObj;
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
......@@ -5212,31 +5248,18 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
// pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
}
void qTableQuery(void *pReadMsg) {
// SQInfo *pQInfo = (SQInfo *)pReadMsg->ahandle;
void qTableQuery(SQInfo* pQInfo) {
assert(pQInfo != NULL);
#if 0
if (pQInfo == NULL) {
dTrace("%p freed abort query", pQInfo);
if (pQInfo->killed) {
dTrace("QInfo:%p it is already killed, abort", pQInfo);
return;
}
// if (pQInfo->killed) {
// dTrace("QInfo:%p it is already killed, abort", pQInfo);
// vnodeDecRefCount(pQInfo);
//
// return;
// }
// assert(pQInfo->refCount >= 1);
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = &pRuntimeEnv->pQuery;
// assert(pRuntimeEnv->pMeterObj == pMeterObj);
SQuery *pQuery = pRuntimeEnv->pQuery;
// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pMeterObj->vnode, pMeterObj->sid,
// pMeterObj->meterId, pMeterObj->numOfQueries, pQInfo);
// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo);
if (vnodeHasRemainResults(pQInfo)) {
/*
......@@ -5254,15 +5277,9 @@ void qTableQuery(void *pReadMsg) {
pQInfo->pointsInterpo += numOfInterpo;
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
// dTrace(
// "QInfo:%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d "
// "totalReturn:%d",
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
// pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
// dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
// pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
return;
}
......@@ -5287,26 +5304,20 @@ void qTableQuery(void *pReadMsg) {
// pQInfo->pointsInterpo, pQInfo->pointsReturned);
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
return;
}
}
}
assert(0);
// pQInfo->over = 1;
// dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid,
// pMeterObj->meterId, pQInfo->pointsRead);
// vnodePrintQueryStatistics(pSupporter);
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
return;
}
/* number of points returned during this query */
// number of points returned during this query
pQuery->rec.pointsRead = 0;
int64_t st = taosGetTimestampUs();
......@@ -5314,14 +5325,15 @@ void qTableQuery(void *pReadMsg) {
// group by normal column, sliding window query, interval query are handled by interval query processor
if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
// assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead);
vnodeSingleTableIntervalProcessor(pQInfo);
tableIntervalProcessor(pQInfo);
} else {
if (isFixedOutputQuery(pQuery)) {
assert(pQuery->checkBufferInLoop == 0);
vnodeSingleTableFixedOutputProcessor(pQInfo);
tableFixedOutputProcessor(pQInfo);
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkBufferInLoop == 1);
vnodeSingleTableMultiOutputProcessor(pQInfo);
tableMultiOutputProcessor(pQInfo);
}
}
......@@ -5339,7 +5351,6 @@ void qTableQuery(void *pReadMsg) {
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
#endif
}
void qSuperTableQuery(void *pReadMsg) {
......@@ -5449,17 +5460,12 @@ static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) {
return 0;
}
int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg) {
pQueryTableMsg->vnode = htons(pQueryTableMsg->vnode);
static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTableIdList) {
pQueryTableMsg->vgId = htons(pQueryTableMsg->vgId);
pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables);
#ifdef TSKEY32
pQueryTableMsg->skey = htonl(pQueryTableMsg->skey);
pQueryTableMsg->ekey = htonl(pQueryTableMsg->ekey);
#else
pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey);
pQueryTableMsg->window.ekey = htobe64(pQueryTableMsg->window.ekey);
#endif
pQueryTableMsg->order = htons(pQueryTableMsg->order);
pQueryTableMsg->orderColId = htons(pQueryTableMsg->orderColId);
......@@ -5477,6 +5483,7 @@ int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg) {
pQueryTableMsg->limit = htobe64(pQueryTableMsg->limit);
pQueryTableMsg->offset = htobe64(pQueryTableMsg->offset);
pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset);
pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen);
pQueryTableMsg->tsNumOfBlocks = htonl(pQueryTableMsg->tsNumOfBlocks);
......@@ -5487,7 +5494,6 @@ int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg) {
return TSDB_CODE_INVALID_QUERY_MSG;
}
// STableIdInfo **pSids = NULL;
char *pMsg = (char *)(pQueryTableMsg->colList) + sizeof(SColumnInfo) * pQueryTableMsg->numOfCols;
for (int32_t col = 0; col < pQueryTableMsg->numOfCols; ++col) {
......@@ -5585,23 +5591,26 @@ int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg) {
pMsg += pQueryTableMsg->colNameLen;
}
STableIdInfo **pSids = (STableIdInfo **)calloc(pQueryTableMsg->numOfTables, sizeof(STableIdInfo *));
pQueryTableMsg->pSidExtInfo = (uint64_t)pSids;
*pTableIdList = taosArrayInit(pQueryTableMsg->numOfTables, sizeof(STableIdInfo));
pSids[0] = (STableIdInfo *)pMsg;
pSids[0]->sid = htonl(pSids[0]->sid);
pSids[0]->uid = htobe64(pSids[0]->uid);
pSids[0]->key = htobe64(pSids[0]->key);
STableIdInfo* pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableIdInfo->sid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key);
taosArrayPush(*pTableIdList, pTableIdInfo);
pMsg += sizeof(STableIdInfo);
for (int32_t j = 1; j < pQueryTableMsg->numOfTables; ++j) {
pSids[j] = (STableIdInfo *)((char *)pSids[j - 1] + sizeof(STableIdInfo) + pQueryTableMsg->tagLength);
pSids[j]->sid = htonl(pSids[j]->sid);
pSids[j]->uid = htobe64(pSids[j]->uid);
pSids[j]->key = htobe64(pSids[j]->key);
}
pTableIdInfo = (STableIdInfo *)pMsg;
pMsg = (char *)pSids[pQueryTableMsg->numOfTables - 1];
pMsg += sizeof(STableIdInfo) + pQueryTableMsg->tagLength;
pTableIdInfo->sid = htonl(pTableIdInfo->sid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key);
taosArrayPush(*pTableIdList, pTableIdInfo);
pMsg += sizeof(STableIdInfo);
}
if (pQueryTableMsg->numOfGroupCols > 0 || pQueryTableMsg->numOfTagsCols > 0) { // group by tag columns
pQueryTableMsg->pTagSchema = (uint64_t)pMsg;
......@@ -5698,8 +5707,9 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs
return TSDB_CODE_SUCCESS;
}
int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) {
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) {
*pSqlFuncExpr = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SSqlFunctionExpr *pExprs = (SSqlFunctionExpr *)calloc(1, sizeof(SSqlFunctionExpr) * pQueryMsg->numOfOutputCols);
if (pExprs == NULL) {
......@@ -5732,24 +5742,24 @@ int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr
bytes = pTagSchema[pColumnIndexExInfo->colIdx].bytes;
} else { // parse the arithmetic expression
// if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) {
// *code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg);
//
// if (*code != TSDB_CODE_SUCCESS) {
// tfree(pExprs);
// return NULL;
// }
//
// type = TSDB_DATA_TYPE_DOUBLE;
// bytes = tDataTypeDesc[type].nSize;
// } else { // parse the normal column
// int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase);
// assert(j < pQueryMsg->numOfCols);
//
// SColumnInfo* pCol = &pQueryMsg->colList[j];
// type = pCol->type;
// bytes = pCol->bytes;
// }
if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) {
code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg);
if (code != TSDB_CODE_SUCCESS) {
tfree(pExprs);
return code;
}
type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypeDesc[type].nSize;
} else { // parse the normal column
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase);
assert(j < pQueryMsg->numOfCols);
SColumnInfo* pCol = &pQueryMsg->colList[j];
type = pCol->type;
bytes = pCol->bytes;
}
}
int32_t param = pExprs[i].pBase.arg[0].argValue.i64;
......@@ -5792,7 +5802,7 @@ int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr
return TSDB_CODE_SUCCESS;
}
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *code) {
static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *code) {
if (pQueryMsg->numOfGroupCols == 0) {
return NULL;
}
......@@ -5815,7 +5825,7 @@ SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *co
return pGroupbyExpr;
}
int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].info.numOfFilters > 0) {
pQuery->numOfFilterCols++;
......@@ -5899,7 +5909,7 @@ int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
return TSDB_CODE_SUCCESS;
}
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs) {
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, SArray* pTableIdList) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
goto _clean_memory;
......@@ -5935,6 +5945,21 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
goto _clean_memory;
}
for (int16_t i = 0; i < numOfCols; ++i) {
pQuery->colList[i].info = pQueryMsg->colList[i];
// SColumnInfo *pColInfo = &pQuery->colList[i].data;
// pColInfo->filters = NULL;
// if (colList[i].numOfFilters > 0) {
// pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo));
//
// for (int32_t j = 0; j < colList[i].numOfFilters; ++j) {
// tscColumnFilterInfoCopy(&pColInfo->filters[j], &colList[i].filters[j]);
// }
// } else {
// pQuery->colList[i].data.filters = NULL;
// }
}
// calculate the result row size
for (int16_t col = 0; col < numOfOutputCols; ++col) {
assert(pExprs[col].resBytes > 0);
......@@ -5952,13 +5977,13 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
goto _clean_memory;
}
// set the output buffer capacity
pQuery->capacity = 4096;
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
assert(pExprs[col].interResBytes >= pExprs[col].resBytes);
// allocate additional memory for interResults that are usually larger then final results
// size_t size = (pQInfo->query.pointsToRead + 1) * pExprs[col].resBytes + pExprs[col].interResBytes +
// sizeof(SData);
size_t size = 1000;
size_t size = (pQuery->capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData);
pQuery->sdata[col] = (SData *)calloc(1, size);
if (pQuery->sdata[col] == NULL) {
goto _clean_memory;
......@@ -5977,10 +6002,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
// to make sure third party won't overwrite this structure
pQInfo->signature = (uint64_t)pQInfo;
pQuery->pos = -1;
pQInfo->pTableIdList = pTableIdList;
// dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
// pQInfo);
pQuery->pos = -1;
// dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo);
return pQInfo;
......@@ -6005,28 +6030,107 @@ _clean_memory:
return NULL;
}
int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SQInfo **pQInfo) {
SQuery *pQuery;
bool isQInfoValid(void *param) {
SQInfo *pQInfo = (SQInfo *)param;
if (pQInfo == NULL) {
return false;
}
/*
* pQInfo->signature may be changed by another thread, so we assign value of signature
* into local variable, then compare by using local variable
*/
uint64_t sig = pQInfo->signature;
return (sig == (uint64_t)pQInfo);
}
void vnodeFreeQInfo(SQInfo* pQInfo, bool decQueryRef) {
if (!isQInfoValid(pQInfo)) {
return;
}
pQInfo->killed = 1;
dTrace("QInfo:%p start to free SQInfo", pQInfo);
if (decQueryRef) {
vnodeDecMeterRefcnt(pQInfo);
}
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
for (int col = 0; col < pQuery->numOfOutputCols; ++col) {
tfree(pQuery->sdata[col]);
}
// for (int col = 0; col < pQuery->numOfCols; ++col) {
// vnodeFreeColumnInfo(&pQuery->colList[col].data);
// }
//
// if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// tfree(pQuery->tsData);
// }
sem_destroy(&(pQInfo->dataReady));
vnodeQueryFreeQInfoEx(pQInfo);
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
if (pColFilter->numOfFilters > 0) {
tfree(pColFilter->pFilters);
}
}
tfree(pQuery->pFilterInfo);
tfree(pQuery->colList);
tfree(pQuery->sdata);
if (pQuery->pSelectExpr != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
if (pBinExprInfo->numOfCols > 0) {
tfree(pBinExprInfo->pReqColumns);
tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL);
}
}
tfree(pQuery->pSelectExpr);
}
if (pQuery->defaultVal != NULL) {
tfree(pQuery->defaultVal);
}
tfree(pQuery->pGroupbyExpr);
tfree(pQuery);
// dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId);
//destroy signature, in order to avoid the query process pass the object safety check
memset(pQInfo, 0, sizeof(SQInfo));
tfree(pQInfo);
}
static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SArray* pTableIdList, SQInfo **pQInfo) {
int32_t code = TSDB_CODE_SUCCESS;
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs);
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
if (pQInfo == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error;
}
pQuery = (*pQInfo)->runtimeEnv.pQuery;
SQuery* pQuery = (*pQInfo)->runtimeEnv.pQuery;
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
STableIdInfo **pSids = (STableIdInfo **)pQueryMsg->pSidExtInfo;
if (pSids != NULL && pSids[0]->key > 0) {
pQuery->window.skey = pSids[0]->key;
} else {
// STableIdInfo **pTableIdList = (STableIdInfo **)pQueryMsg->pSidExtInfo;
// if (pTableIdList != NULL && pTableIdList[0]->key > 0) {
// pQuery->window.skey = pTableIdList[0]->key;
// } else {
pQuery->window.skey = pQueryMsg->window.skey;
}
pQuery->window.ekey = pQueryMsg->window.ekey;
pQuery->lastKey = pQuery->window.skey;
if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) {
......@@ -6038,10 +6142,6 @@ int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SS
vnodeParametersSafetyCheck(pQuery);
(*pQInfo)->pTableList = taosHashInit(pQueryMsg->numOfTables, taosIntHash_32, false);
// taosHashPut(pSupporter->pMetersHashTable, (const char *)&pMetersObj[0]->sid, sizeof(pMeterObj[0].sid),
// (char *)&pMetersObj[0], POINTER_BYTES);
STSBuf *pTSBuf = NULL;
if (pQueryMsg->tsLen > 0) { // open new file to save the result
char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset;
......@@ -6062,13 +6162,11 @@ int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SS
// dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
// pQInfo->refCount);
// taosScheduleTask(queryQhandle, &schedMsg);
return code;
_error:
// table query ref will be decrease during error handling
// vnodeFreeQInfo(pQInfo, false);
vnodeFreeQInfo(*pQInfo, false);
return code;
}
......@@ -6076,7 +6174,8 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
assert(pQueryTableMsg != NULL);
int32_t code = TSDB_CODE_SUCCESS;
if ((code = convertQueryTableMsg(pQueryTableMsg)) != TSDB_CODE_SUCCESS) {
SArray* pTableIdList = NULL;
if ((code = convertQueryTableMsg(pQueryTableMsg, &pTableIdList)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -6087,7 +6186,7 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
}
// todo check vnode status
if (pQueryTableMsg->pSidExtInfo == 0) {
if (pTableIdList == NULL || taosArrayGetSize(pTableIdList) == 0) {
dError("qmsg:%p, SQueryTableMsg wrong format", pQueryTableMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
......@@ -6106,10 +6205,14 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
if (QUERY_IS_STABLE_QUERY(pQueryTableMsg->queryType)) {
// pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code);
} else {
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pQInfo);
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, pQInfo);
}
_query_over:
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pTableIdList);
}
// if failed to add ref for all meters in this query, abort current query
// if (code != TSDB_CODE_SUCCESS) {
// vnodeDecQueryRefCount(pQueryTableMsg, pMeterObjList, incNumber);
......@@ -6127,3 +6230,33 @@ _query_over:
// atomic_fetch_add_32(&vnodeSelectReqNum, 1);
return TSDB_CODE_SUCCESS;
}
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize) {
if (pQInfo == NULL || !isQInfoValid(pQInfo)) {
return TSDB_CODE_INVALID_QHANDLE;
}
if (pQInfo->killed) {
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
if (pQInfo->code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_QUERY_CANCELLED;
} else { // in case of not TSDB_CODE_SUCCESS, return the code to client
return abs(pQInfo->code);
}
}
sem_wait(&pQInfo->dataReady);
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
// *numOfRows = pQInfo->rec.pointsRead;
// *rowsize = pQuery->rowSize;
*numOfRows = 1;
// dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec,
// *rowsize, *numOfRows, pQInfo->code);
if (pQInfo->code < 0) { // less than 0 means there are error existed.
return -pQInfo->code;
}
}
......@@ -16,3 +16,51 @@
#include "os.h"
#include "tsdb.h"
tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
}
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
return false;
}
SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
}
int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) {
}
SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) {
}
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order) {
}
int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) {
}
tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) {
return NULL;
}
SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) {
}
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {
}
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {
}
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册