提交 5468d172 编写于 作者: H hzcheng

Merge branch '2.0' into feature/2.0tsdb

......@@ -85,7 +85,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
STableDataBlocks** dataBlocks);
SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx);
STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
/**
*
......
......@@ -62,7 +62,7 @@ typedef struct STableMeta {
int8_t numOfVpeers;
int16_t sversion;
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int32_t vgid; // virtual group id, which current table belongs to
int32_t vgId; // virtual group id, which current table belongs to
int32_t sid; // the index of one table in a virtual node
uint64_t uid; // unique id of a table
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
......@@ -182,7 +182,7 @@ typedef struct STableDataBlocks {
char tableId[TSDB_TABLE_ID_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgid; // virtual group id
int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block
......
......@@ -318,15 +318,16 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
SVnodeSidList *pSidList = (SVnodeSidList *)((char *)pMetricMeta + pMetricMeta->list[i]);
for (int32_t j = 0; j < pSidList->numOfSids; ++j) {
STableSidExtInfo *pSidExt = tscGetMeterSidInfo(pSidList, j);
STableIdInfo *pSidExt = tscGetMeterSidInfo(pSidList, j);
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
SColIndexEx *pColIndex = &tscSqlExprGet(pQueryInfo, k)->colInfo;
int16_t offsetId = pColIndex->colIdx;
assert((pColIndex->flag & TSDB_COL_TAG) != 0);
assert(0);
char * val = pSidExt->tags + vOffset[offsetId];
char * val = NULL;//pSidExt->tags + vOffset[offsetId];
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, k);
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, k) * totalNumOfResults + pField->bytes * rowIdx, val,
......
......@@ -698,7 +698,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(dataBuf->pData);
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
dataBuf->vgid = pTableMeta->vgid;
dataBuf->vgId = pTableMeta->vgId;
dataBuf->numOfTables = 1;
/*
......@@ -1058,7 +1058,6 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
}
void *fp = pSql->fp;
ptrdiff_t pos = pSql->asyncTblPos - pSql->sqlstr;
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
......@@ -1068,7 +1067,6 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
* And during the getMeterMetaCallback function, the sql string will be parsed from the
* interrupted position.
*/
if (fp != NULL) {
if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos);
return code;
......@@ -1077,7 +1075,6 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
// todo add to return
tscError("async insert parse error, code:%d, %s", code, tstrerror(code));
pSql->asyncTblPos = NULL;
}
goto _error_clean; // TODO: should _clean or _error_clean to async flow ????
}
......@@ -1096,15 +1093,13 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
}
int32_t numOfCols = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (sToken.type == TK_VALUES) {
SParsedDataColInfo spd = {.numOfCols = numOfCols};
SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
tscSetAssignedColumnInfo(&spd, pSchema, numOfCols);
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _error_clean;
......@@ -1243,7 +1238,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
// submit to more than one vnode
if (pCmd->pDataBlocks->nSize > 0) {
// merge according to vgid
// merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
......
......@@ -165,7 +165,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->sid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid;
pTableMeta->vgid = pTableMetaMsg->vgid;
pTableMeta->vgId = pTableMetaMsg->vgId;
pTableMeta->numOfVpeers = pTableMetaMsg->numOfVpeers;
memcpy(pTableMeta->vpeerDesc, pTableMetaMsg->vpeerDesc, sizeof(SVnodeDesc) * pTableMeta->numOfVpeers);
......
......@@ -341,11 +341,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
* the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
*/
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
if (command == TSDB_SQL_INSERT) { // handle multi-vnode insertion situation
(*pSql->fp)(pSql, taosres, rpcMsg->code);
} else {
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
}
if (shouldFree) {
// If it is failed, all objects allocated during execution taos_connect_a should be released
......@@ -539,22 +535,27 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char * pMsg, *pStart;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
pStart = pSql->cmd.payload + tsRpcHeadSize;
pMsg = pStart;
pShellMsg = (SShellSubmitMsg *)pMsg;
pShellMsg->desc.numOfVnodes = htonl(1);
pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
pShellMsg->vnode = 0; //htons(pTableMeta->vpeerDesc[pTableMeta->index].vnode);
pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
pShellMsg->header.vgId = htonl(pTableMeta->vgId);
pShellMsg->header.contLen = htonl(pSql->cmd.payloadLen);
pShellMsg->numOfTables = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
// tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip),
// htons(pShellMsg->vnode));
pSql->cmd.payloadLen = sizeof(SShellSubmitMsg);
// pSql->cmd.payloadLen = sizeof(SShellSubmitMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -598,7 +599,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableSidExtInfo)) * pVnodeSidList->numOfSids;
int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids;
int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
......@@ -609,37 +610,34 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
return size;
}
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vnodeId, char *pMsg) {
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vgId, char *pMsg) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
tscTrace("%p vid:%d, query on %d meters", pSql, vnodeId, numOfTables);
tscTrace("%p vgId:%d, query on %d tables", pSql, vgId, numOfTables);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid);
#endif
STableSidExtInfo *pTableMetaInfo = (STableSidExtInfo *)pMsg;
pTableMetaInfo->sid = htonl(pTableMeta->sid);
pTableMetaInfo->uid = htobe64(pTableMeta->uid);
pTableMetaInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pMsg += sizeof(STableSidExtInfo);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableMeta->sid);
pTableIdInfo->uid = htobe64(pTableMeta->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pMsg += sizeof(STableIdInfo);
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
for (int32_t i = 0; i < numOfTables; ++i) {
STableSidExtInfo *pTableMetaInfo = (STableSidExtInfo *)pMsg;
STableSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
pTableMetaInfo->sid = htonl(pQueryMeterInfo->sid);
pTableMetaInfo->uid = htobe64(pQueryMeterInfo->uid);
pTableMetaInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid));
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
STableIdInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
pMsg += sizeof(STableSidExtInfo);
pTableIdInfo->sid = htonl(pQueryMeterInfo->sid);
pTableIdInfo->uid = htobe64(pQueryMeterInfo->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid));
memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen);
pMsg += pMetricMeta->tagLen;
pMsg += sizeof(STableIdInfo);
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%" PRId64, pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid);
......@@ -679,6 +677,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->uid = pTableMeta->uid;
pQueryMsg->numOfTagsCols = 0;
pQueryMsg->vgId = htonl(pTableMeta->vgId);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query on super table
if (pTableMetaInfo->vnodeIndex < 0) {
......@@ -696,7 +695,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
pQueryMsg->vnode = htons(vnodeId);
pQueryMsg->vgId = htons(vnodeId);
}
pQueryMsg->numOfTables = htonl(numOfTables);
......@@ -764,14 +763,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
SSchema * pColSchema = &pSchema[pCol->colIndex.columnIndex];
if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
pColSchema->name);
return -1; // 0 means build msg failed
}
// if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
// pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
// tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
// htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
// pColSchema->name);
//
// return -1; // 0 means build msg failed
// }
pQueryMsg->colList[i].colId = htons(pColSchema->colId);
pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
......@@ -865,7 +864,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->colNameLen = htonl(len);
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vnode), pMsg);
pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vgId), pMsg);
// only include the required tag column schema. If a tag is not required, it won't be sent to vnode
if (pTableMetaInfo->numOfTags > 0) {
......@@ -947,6 +946,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pQueryMsg->contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS;
......@@ -1849,12 +1850,12 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pMetaMsg->sid = htonl(pMetaMsg->sid);
pMetaMsg->sversion = htons(pMetaMsg->sversion);
pMetaMsg->vgid = htonl(pMetaMsg->vgid);
pMetaMsg->vgId = htonl(pMetaMsg->vgId);
pMetaMsg->uid = htobe64(pMetaMsg->uid);
pMetaMsg->contLen = htons(pMetaMsg->contLen);
if (pMetaMsg->sid < 0 || pMetaMsg->vgid < 0) {
tscError("invalid meter vgid:%d, sid%d", pMetaMsg->vgid, pMetaMsg->sid);
if (pMetaMsg->sid < 0 || pMetaMsg->vgId < 0) {
tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgId, pMetaMsg->sid);
return TSDB_CODE_INVALID_VALUE;
}
......@@ -1948,11 +1949,11 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
pMeta->sid = htonl(pMeta->sid);
pMeta->sversion = htons(pMeta->sversion);
pMeta->vgid = htonl(pMeta->vgid);
pMeta->vgId = htonl(pMeta->vgId);
pMeta->uid = htobe64(pMeta->uid);
if (pMeta->sid <= 0 || pMeta->vgid < 0) {
tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
if (pMeta->sid <= 0 || pMeta->vgId < 0) {
tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
pSql->res.code = TSDB_CODE_INVALID_VALUE;
pSql->res.numOfTotal = i;
return TSDB_CODE_OTHERS;
......@@ -2067,7 +2068,7 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
pMeta->tagLen = htons(pMeta->tagLen);
size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfTables * sizeof(STableSidExtInfo *);
size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfTables * sizeof(STableIdInfo *);
char *pBuf = calloc(1, size);
if (pBuf == NULL) {
......@@ -2093,16 +2094,16 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
tscTrace("%p metricmeta:vid:%d,numOfTables:%d", pSql, i, pLists->numOfSids);
pBuf += sizeof(SVnodeSidList) + sizeof(STableSidExtInfo *) * pSidLists->numOfSids;
pBuf += sizeof(SVnodeSidList) + sizeof(STableIdInfo *) * pSidLists->numOfSids;
rsp += sizeof(SVnodeSidList);
size_t elemSize = sizeof(STableSidExtInfo) + pNewMetricMeta->tagLen;
size_t elemSize = sizeof(STableIdInfo) + pNewMetricMeta->tagLen;
for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
memcpy(pBuf, rsp, elemSize);
((STableSidExtInfo *)pBuf)->uid = htobe64(((STableSidExtInfo *)pBuf)->uid);
((STableSidExtInfo *)pBuf)->sid = htonl(((STableSidExtInfo *)pBuf)->sid);
((STableIdInfo *)pBuf)->uid = htobe64(((STableIdInfo *)pBuf)->uid);
((STableIdInfo *)pBuf)->sid = htonl(((STableIdInfo *)pBuf)->sid);
rsp += elemSize;
pBuf += elemSize;
......
......@@ -130,7 +130,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
pSql->pTscObj = pObj;
pSql->signature = pSql;
tsem_init(&pSql->rspSem, 0, 0);
// tsem_init(&pSql->emptyRspSem, 0, 1);
pObj->pSql = pSql;
pSql->fp = fp;
pSql->param = param;
......@@ -146,6 +146,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return NULL;
}
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg);
return pObj;
}
......
......@@ -202,7 +202,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
STableSidExtInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
int64_t uid = pTableMetaInfo->uid;
progress[numOfTables].uid = uid;
progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
......
......@@ -34,7 +34,6 @@ void * pTscMgmtConn;
void * pSlaveConn;
void * tscCacheHandle;
int32_t globalCode = 0;
int initialized = 0;
int slaveIndex;
void * tscTmr;
void * tscQhandle;
......@@ -187,9 +186,7 @@ void taos_init_imp() {
if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
initialized = 1;
tscTrace("client is initialized successfully");
tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg);
}
void taos_init() { pthread_once(&tscinit, taos_init_imp); }
......
......@@ -191,7 +191,7 @@ SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx
return (SVnodeSidList*)(pMetricmeta->list[vnodeIdx] + (char*)pMetricmeta);
}
STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
if (pSidList == NULL) {
tscError("illegal sidlist");
return 0;
......@@ -206,7 +206,7 @@ STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
assert(pSidList->pSidExtInfoList[idx] >= 0);
return (STableSidExtInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList);
return (STableIdInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList);
}
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
......@@ -614,7 +614,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
*/
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize;
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100);
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0);
return TSDB_CODE_SUCCESS;
}
......@@ -705,8 +705,9 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i];
STableDataBlocks* dataBuf = NULL;
int32_t ret =
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE,
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
......
......@@ -15,13 +15,16 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tqueue.h"
#include "trpc.h"
#include "dnodeRead.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "queryExecutor.h"
typedef struct {
int32_t code;
......@@ -88,7 +91,7 @@ void dnodeRead(SRpcMsg *pMsg) {
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = 1; //htonl(pHead->vgId);
pHead->vgId = 1;//htonl(pHead->vgId);
pHead->contLen = pMsg->contLen; //htonl(pHead->contLen);
void *pVnode = dnodeGetVnode(pHead->vgId);
......@@ -223,20 +226,26 @@ static void dnodeProcessReadResult(SReadMsg *pRead) {
}
static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
void *pQInfo = (void*)100;
dTrace("query msg is disposed, qInfo:%p", pQInfo);
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
SQInfo* pQInfo = NULL;
int32_t code = qCreateQueryInfo(pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = 0;
pRsp->code = code;
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SQueryTableRsp),
.code = 0,
.code = code,
.msgType = 0
};
// do execute query
qTableQuery(pQInfo);
rpcSendResponse(&rpcRsp);
}
......@@ -246,20 +255,50 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
dTrace("retrieve msg is disposed, qInfo:%p", pQInfo);
assert(pQInfo != NULL);
int32_t contLen = 100;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *) rpcMallocCont(contLen);
int32_t rowSize = 0;
int32_t numOfRows = 0;
int32_t contLen = 0;
SRpcMsg rpcRsp = {0};
int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
if (code != TSDB_CODE_SUCCESS) {
contLen = sizeof(SRetrieveTableRsp);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0;
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
SRpcMsg rpcRsp = {
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = 0,
.code = code,
.msgType = 0
};
//todo free qinfo
} else {
contLen = 100;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0;
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
*(int64_t*) pRsp->data = 1000;
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
}
rpcSendResponse(&rpcRsp);
}
......@@ -276,6 +276,8 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
pRsp->affectedRows = htonl(1);
pRsp->numOfFailedBlocks = 0;
// todo write to tsdb
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
......
......@@ -198,10 +198,20 @@ typedef struct {
} SShellSubmitBlock;
typedef struct {
int32_t numOfVnodes;
} SMsgDesc;
typedef struct SMsgHead {
int32_t contLen;
int32_t vgId;
} SMsgHead;
typedef struct {
SMsgDesc desc;
SMsgHead header;
int16_t import;
int16_t vnode;
int32_t numOfSid; /* total number of sid */
char blks[]; /* numOfSid blocks, each blocks for one table */
int32_t numOfTables; // total number of sid
char blks[]; // number of data blocks, each table has at least one data block
} SShellSubmitMsg;
typedef struct {
......@@ -232,15 +242,6 @@ typedef struct {
uint32_t ip;
} SVnodeDesc;
typedef struct {
int32_t numOfVnodes;
} SMsgDesc;
typedef struct {
int32_t contLen;
int32_t vgId;
} SMsgHead;
typedef struct {
int32_t contLen;
int32_t vgId;
......@@ -434,15 +435,11 @@ typedef struct SColumnInfo {
SColumnFilterInfo *filters;
} SColumnInfo;
/*
* enable vnode to understand how to group several tables with different tag;
*/
typedef struct STableSidExtInfo {
typedef struct STableIdInfo {
int32_t sid;
int64_t uid;
TSKEY key; // key for subscription
char tags[];
} STableSidExtInfo;
TSKEY key; // last accessed ts, for subscription
} STableIdInfo;
typedef struct STimeWindow {
TSKEY skey;
......@@ -455,10 +452,10 @@ typedef struct STimeWindow {
* the outputCols will be 3 while the numOfCols is 1.
*/
typedef struct {
int16_t vnode;
int32_t numOfTables;
uint64_t pSidExtInfo; // table id & tag info ptr, in windows pointer may
int32_t contLen; // msg header
int16_t vgId;
int32_t numOfTables;
uint64_t uid;
STimeWindow window;
......@@ -504,7 +501,7 @@ typedef struct {
} SQueryTableMsg;
typedef struct {
char code;
int32_t code;
uint64_t qhandle;
} SQueryTableRsp;
......@@ -670,7 +667,7 @@ typedef struct {
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int16_t index; // used locally
int32_t numOfSids;
int32_t pSidExtInfoList[]; // offset value of STableSidExtInfo
int32_t pSidExtInfoList[]; // offset value of STableIdInfo
} SVnodeSidList;
typedef struct {
......@@ -692,7 +689,7 @@ typedef struct STableMetaMsg {
int8_t numOfVpeers;
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int32_t sid;
int32_t vgid;
int32_t vgId;
uint64_t uid;
SSchema schema[];
} STableMetaMsg;
......
......@@ -445,7 +445,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) {
pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
pMeta->vgid = htonl(pTable->vgId);
pMeta->vgId = htonl(pTable->vgId);
pMeta->sversion = htons(pTable->superTable->sversion);
pMeta->precision = pDb->cfg.precision;
pMeta->numOfTags = pTable->superTable->numOfTags;
......
......@@ -553,11 +553,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pDnode->privateIp = htonl(pStatus->privateIp);
pDnode->publicIp = htonl(pStatus->publicIp);
pDnode->lastReboot = htonl(pStatus->lastReboot);
pDnode->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
pDnode->numOfCores = htons(pStatus->numOfCores);
pDnode->diskAvailable = pStatus->diskAvailable;
pDnode->alternativeRole = pStatus->alternativeRole;
if (pDnode->numOfTotalVnodes == 0) {
pDnode->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
}
if (pStatus->dnodeId == 0) {
mTrace("dnode:%d, first access, privateIp:%s, name:%s, ", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
mgmtSetDnodeMaxVnodes(pDnode);
......
......@@ -524,7 +524,7 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SNormalTableObj *p
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) {
pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
pMeta->vgid = htonl(pTable->vgId);
pMeta->vgId = htonl(pTable->vgId);
pMeta->sversion = htons(pTable->sversion);
pMeta->precision = pDb->cfg.precision;
pMeta->numOfTags = 0;
......
......@@ -654,7 +654,7 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) {
pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
pMeta->vgid = htonl(pTable->vgId);
pMeta->vgId = htonl(pTable->vgId);
pMeta->sversion = htons(pTable->sversion);
pMeta->precision = pDb->cfg.precision;
pMeta->numOfTags = pTable->numOfTags;
......
......@@ -11,5 +11,5 @@ INCLUDE_DIRECTORIES(inc)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(query ${SRC})
TARGET_LINK_LIBRARIES(query tutil m rt)
TARGET_LINK_LIBRARIES(query tsdb tutil m rt)
ENDIF ()
\ No newline at end of file
......@@ -124,9 +124,8 @@ typedef struct tTagSchema {
typedef struct tSidSet {
int32_t numOfSids;
int32_t numOfSubSet;
STableSidExtInfo **pSids;
STableIdInfo **pTableIdList;
int32_t * starterPos; // position of each subgroup, generated according to
SColumnModel *pColumnModel;
SColumnOrderInfo orderIdx;
} tSidSet;
......
......@@ -25,6 +25,7 @@
#include "taosdef.h"
#include "tref.h"
#include "tsqlfunction.h"
#include "tarray.h"
typedef struct SData {
int32_t num;
......@@ -39,7 +40,7 @@ enum {
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct SSqlGroupbyExpr {
int16_t tableIndex;
......@@ -142,7 +143,7 @@ typedef struct SQuery {
int32_t pos;
int64_t pointsOffset; // the number of points offset to save read data
SData** sdata;
int32_t capacity;
SSingleColumnFilterInfo* pFilterInfo;
} SQuery;
......@@ -152,7 +153,6 @@ typedef struct SQueryCostSummary {
typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery;
void* pTabObj;
SData** pInterpoBuf;
SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage;
......@@ -174,15 +174,16 @@ typedef struct SQInfo {
TSKEY startTime;
int64_t elapsedTime;
SResultRec rec;
int pointsReturned;
int pointsInterpo;
int code; // error code to returned to client
int32_t pointsReturned;
int32_t pointsInterpo;
int32_t code; // error code to returned to client
int32_t killed; // denotes if current query is killed
sem_t dataReady;
SHashObj* pTableList; // table list
SArray* pTableIdList; // table list
SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */
tSidSet* pSidSet;
// tSidSet* pSidSet;
T_REF_DECLARE()
/*
......@@ -204,13 +205,13 @@ typedef struct SQInfo {
* @param pQInfo
* @return
*/
int32_t qCreateQueryInfo(void* pReadMsg, SQInfo** pQInfo);
int32_t qCreateQueryInfo(SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
/**
* query on single table
* @param pReadMsg
*/
void qTableQuery(void* pReadMsg);
void qTableQuery(SQInfo* pQInfo);
/**
* query on super table
......@@ -218,4 +219,13 @@ void qTableQuery(void* pReadMsg);
*/
void qSuperTableQuery(void* pReadMsg);
/**
* wait for the query completed, and retrieve final results to client
* @param pQInfo
*/
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize);
//int32_t qBuildQueryResult(SQInfo* pQInfo, void* pBuf);
#endif // TDENGINE_QUERYEXECUTOR_H
......@@ -14,19 +14,21 @@
*/
#include "os.h"
#include "taosmsg.h"
#include "hash.h"
#include "hashfunc.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tlosertree.h"
#include "tstatus.h"
#include "tscompression.h"
#include "tstatus.h"
#include "ttime.h"
#include "tlog.h"
#include "qast.h"
#include "qresultBuf.h"
#include "queryExecutor.h"
#include "queryUtil.h"
#include "tsdb.h"
#include "qresultBuf.h"
#define DEFAULT_INTERN_BUF_SIZE 16384L
......@@ -36,7 +38,6 @@
*/
#define PRIMARY_TSCOL_LOADED(query) ((query)->colList[0].data.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
......@@ -46,16 +47,15 @@
#define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, runtimeEnv))
#define GET_QINFO_ADDR(x) ((char *)(x)-offsetof(SQInfo, runtimeEnv))
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index)*(step))
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step))
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.bytes)
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.bytes)
#define GET_COLUMN_TYPE(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.type)
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.type)
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
......@@ -98,13 +98,10 @@ typedef enum {
QUERY_NO_DATA_TO_CHECK = 0x8u,
} vnodeQueryStatus;
static void setQueryStatus(SQuery *pQuery, int8_t status);
bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
int32_t setQueryCtxForTableQuery(void* pReadMsg, SQInfo** pQInfo) {
}
int32_t setQueryCtxForTableQuery(void *pReadMsg, SQInfo **pQInfo) {}
enum {
TS_JOIN_TS_EQUAL = 0,
......@@ -112,7 +109,8 @@ enum {
TS_JOIN_TAG_NOT_EQUALS = 2,
};
static int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end);
static int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start,
int32_t end);
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
......@@ -234,19 +232,19 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *
return true;
}
bool vnodeDoFilterData(SQuery* pQuery, int32_t elemPos) {
bool vnodeDoFilterData(SQuery *pQuery, int32_t elemPos) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
char* pElem = pFilterInfo->pData + pFilterInfo->info.info.bytes * elemPos;
char * pElem = pFilterInfo->pData + pFilterInfo->info.info.bytes * elemPos;
if(isNull(pElem, pFilterInfo->info.info.type)) {
if (isNull(pElem, pFilterInfo->info.info.type)) {
return false;
}
int32_t num = pFilterInfo->numOfFilters;
bool qualified = false;
for(int32_t j = 0; j < num; ++j) {
SColumnFilterElem* pFilterElem = &pFilterInfo->pFilters[j];
for (int32_t j = 0; j < num; ++j) {
SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j];
if (pFilterElem->fp(pFilterElem, pElem, pElem)) {
qualified = true;
break;
......@@ -261,7 +259,7 @@ bool vnodeDoFilterData(SQuery* pQuery, int32_t elemPos) {
return true;
}
bool vnodeFilterData(SQuery* pQuery, int32_t* numOfActualRead, int32_t index) {
bool vnodeFilterData(SQuery *pQuery, int32_t *numOfActualRead, int32_t index) {
(*numOfActualRead)++;
if (!vnodeDoFilterData(pQuery, index)) {
return false;
......@@ -401,9 +399,9 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) {
static bool queryPaused(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, int32_t forwardStep) {
// output buffer is full, pause current query
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
// assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pDataBlockInfo->size) ||
// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->pos - forwardStep + 1 >= 0));
//
// assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pDataBlockInfo->size) ||
// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->pos - forwardStep + 1 >= 0));
//
return true;
}
......@@ -429,7 +427,8 @@ static bool isTopBottomQuery(SQuery *pQuery) {
return false;
}
static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, int32_t columnIndex) {
static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
int32_t columnIndex) {
// no SField info exist, or column index larger than the output column, no result.
if (pStatis == NULL) {
return NULL;
......@@ -696,7 +695,8 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
}
static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn,
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) {
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn,
bool updateLastKey) {
assert(startPos >= 0 && startPos < pDataBlockInfo->size);
int32_t num = -1;
......@@ -743,7 +743,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
}
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin,
int32_t startPos, int32_t forwardStep, TSKEY* tsBuf) {
int32_t startPos, int32_t forwardStep, TSKEY *tsBuf) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
......@@ -869,10 +869,10 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo *pColMsg = &pQuery->colList[i].info;
assert(0);
// char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf);
// char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf);
sas->elemSize[i] = pColMsg->bytes;
// sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset
// sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset
}
sas->numOfCols = pQuery->numOfCols;
......@@ -930,7 +930,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataSt
primaryKeyCol = (TSKEY *)(pColInfo->pData);
}
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->size - 1;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pDataBlockInfo->size - 1;
int64_t prevNumOfRes = getNumOfResult(pRuntimeEnv);
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport));
......@@ -953,9 +953,10 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataSt
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
assert(0);
// if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pTabObj->sid, &win) != TSDB_CODE_SUCCESS) {
// return 0;
// }
// if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pTabObj->sid, &win) !=
// TSDB_CODE_SUCCESS) {
// return 0;
// }
TSKEY ekey = reviseWindowEkey(pQuery, &win);
int32_t forwardStep =
......@@ -975,7 +976,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataSt
}
// null data, failed to allocate more memory buffer
// int32_t sid = pRuntimeEnv->pTabObj->sid;
// int32_t sid = pRuntimeEnv->pTabObj->sid;
int32_t sid = 0;
assert(0);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) {
......@@ -1070,7 +1071,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, SData **data, int16_t *type, i
*type = pQuery->colList[colIndex].info.type;
*bytes = pQuery->colList[colIndex].info.bytes;
// groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].inf);
// groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].inf);
break;
}
......@@ -1131,13 +1132,14 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return true;
}
static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis* pStatis,
SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, SArray* pDataBlock) {
static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis,
SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo,
SArray *pDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
TSKEY * primaryKeyCol = (TSKEY *)taosArrayGet(pDataBlock, 0);
// SData **data = pRuntimeEnv->colDataBuffer;
// SData **data = pRuntimeEnv->colDataBuffer;
int64_t prevNumOfRes = 0;
bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
......@@ -1154,19 +1156,19 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
char *groupbyColumnData = NULL;
if (groupbyStateValue) {
assert(0);
// groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes);
// groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes);
}
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
SDataStatis* pColStatis = NULL;
SDataStatis *pColStatis = NULL;
bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &pColStatis);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->size, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, (char *)primaryKeyCol, pDataBlockInfo->size, functionId, pColStatis, hasNull,
&sasArray[k], pRuntimeEnv->scanFlag);
setExecParams(pQuery, &pCtx[k], dataBlock, (char *)primaryKeyCol, pDataBlockInfo->size, functionId, pColStatis,
hasNull, &sasArray[k], pRuntimeEnv->scanFlag);
}
// set the input column data
......@@ -1177,7 +1179,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
* NOTE: here the tbname/tags column cannot reach here, since it will never be a filter column,
* so we do NOT check if is a tag or not
*/
// pFilterInfo->pData = doGetDataBlocks(pQuery, data, pFilterInfo->info.colIdxInBuf);
// pFilterInfo->pData = doGetDataBlocks(pQuery, data, pFilterInfo->info.colIdxInBuf);
}
int32_t numOfRes = 0;
......@@ -1220,7 +1222,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
assert(0);
int32_t ret = 0;
// int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pTabObj->sid, &win);
// int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pTabObj->sid, &win);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
}
......@@ -1235,11 +1237,12 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
STimeWindow nextWin = win;
int32_t index = pWindowResInfo->curIndex;
assert(0);
int32_t sid = 0;//pRuntimeEnv->pTabObj->sid;
int32_t sid = 0; // pRuntimeEnv->pTabObj->sid;
while (1) {
getNextTimeWindow(pQuery, &nextWin);
if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
if (pWindowResInfo->startTime > nextWin.skey ||
(nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.skey > pQuery->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
break;
}
......@@ -1298,7 +1301,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
if ((pQuery->checkBufferInLoop == 1) && (++numOfRes) >= pQuery->pointsOffset) {
pQuery->lastKey = lastKey + step;
assert(0);
// *forwardStep = j + 1;
// *forwardStep = j + 1;
break;
}
}
......@@ -1479,12 +1482,9 @@ static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool i
}
}
static int32_t setupQueryRuntimeEnv(void *pMeterObj, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv,
SColumnModel *pTagsSchema, int16_t order, bool isSTableQuery) {
dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pQuery));
pRuntimeEnv->pTabObj = pMeterObj;
pRuntimeEnv->pQuery = pQuery;
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel *pTagsSchema, int16_t order, bool isSTableQuery) {
dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery* pQuery = pRuntimeEnv->pQuery;
pRuntimeEnv->resultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo));
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutputCols, sizeof(SQLFunctionCtx));
......@@ -1506,9 +1506,8 @@ static int32_t setupQueryRuntimeEnv(void *pMeterObj, SQuery *pQuery, SQueryRunti
pCtx->inputType = pSchema->type;
pCtx->inputBytes = pSchema->bytes;
} else {
assert(0);
// pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
// pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
}
pCtx->ptsOutputBuf = NULL;
......@@ -1559,12 +1558,9 @@ static int32_t setupQueryRuntimeEnv(void *pMeterObj, SQuery *pQuery, SQueryRunti
}
setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx);
// for loading block data in memory
// assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock);
return TSDB_CODE_SUCCESS;
_error_clean:
_error_clean:
tfree(pRuntimeEnv->resultInfo);
tfree(pRuntimeEnv->pCtx);
......@@ -1725,7 +1721,7 @@ bool needSupplementaryScan(SQuery *pQuery) {
/////////////////////////////////////////////////////////////////////////////////////////////
void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast,
int64_t *realSkey, int64_t *realEkey, STimeWindow* win) {
int64_t *realSkey, int64_t *realEkey, STimeWindow *win) {
assert(key >= keyFirst && key <= keyLast && pQuery->slidingTime <= pQuery->intervalTime);
win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision);
......@@ -1782,8 +1778,8 @@ static bool doGetQueryPos(TSKEY key, SQInfo *pQInfo, SPointInterpoSupporter *pPo
#endif
}
static bool doSetDataInfo(SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSupporter,
void *pMeterObj, TSKEY nextKey) {
static bool doSetDataInfo(SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSupporter, void *pMeterObj,
TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
......@@ -1793,8 +1789,8 @@ static bool doSetDataInfo(SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSu
* the query range is existed, so set them both the value of nextKey
*/
if (pQuery->window.skey != pQuery->window.ekey) {
assert(pQuery->window.skey >= pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery) && nextKey >= pQuery->window.ekey &&
nextKey <= pQuery->window.skey);
assert(pQuery->window.skey >= pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery) &&
nextKey >= pQuery->window.ekey && nextKey <= pQuery->window.skey);
pQuery->window.skey = nextKey;
pQuery->window.ekey = nextKey;
......@@ -1895,8 +1891,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0;
}
assert(0);
// pQuery->pointsOffset = pQuery->pointsToRead;
// pQuery->pointsOffset = pQuery->pointsToRead;
}
/*
......@@ -1960,8 +1955,8 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
if (isPointInterpoQuery(pQuery) && pQuery->intervalTime == 0) {
if (!QUERY_IS_ASC_QUERY(pQuery)) {
dTrace(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSQL_SO_ASC, pQuery->window.skey, pQuery->window.ekey,
pQuery->window.ekey, pQuery->window.skey);
dTrace(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSQL_SO_ASC, pQuery->window.skey,
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
}
......@@ -1972,8 +1967,8 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
if (pQuery->intervalTime == 0) {
if (onlyFirstQuery(pQuery)) {
if (!QUERY_IS_ASC_QUERY(pQuery)) {
dTrace(msg, GET_QINFO_ADDR(pQuery), "only-first", pQuery->order.order, TSQL_SO_ASC, pQuery->window.skey, pQuery->window.ekey,
pQuery->window.ekey, pQuery->window.skey);
dTrace(msg, GET_QINFO_ADDR(pQuery), "only-first", pQuery->order.order, TSQL_SO_ASC, pQuery->window.skey,
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
}
......@@ -1981,8 +1976,8 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
pQuery->order.order = TSQL_SO_ASC;
} else if (onlyLastQuery(pQuery)) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
dTrace(msg, GET_QINFO_ADDR(pQuery), "only-last", pQuery->order.order, TSQL_SO_DESC, pQuery->window.skey, pQuery->window.ekey,
pQuery->window.ekey, pQuery->window.skey);
dTrace(msg, GET_QINFO_ADDR(pQuery), "only-last", pQuery->order.order, TSQL_SO_DESC, pQuery->window.skey,
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
}
......@@ -1994,8 +1989,8 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
if (metricQuery) {
if (onlyFirstQuery(pQuery)) {
if (!QUERY_IS_ASC_QUERY(pQuery)) {
dTrace(msg, GET_QINFO_ADDR(pQuery), "only-first stable", pQuery->order.order, TSQL_SO_ASC, pQuery->window.skey,
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
dTrace(msg, GET_QINFO_ADDR(pQuery), "only-first stable", pQuery->order.order, TSQL_SO_ASC,
pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
}
......@@ -2003,8 +1998,8 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
pQuery->order.order = TSQL_SO_ASC;
} else if (onlyLastQuery(pQuery)) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
dTrace(msg, GET_QINFO_ADDR(pQuery), "only-last stable", pQuery->order.order, TSQL_SO_DESC, pQuery->window.skey,
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
dTrace(msg, GET_QINFO_ADDR(pQuery), "only-last stable", pQuery->order.order, TSQL_SO_DESC,
pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
}
......@@ -2081,7 +2076,7 @@ static void doSetInterpVal(SQLFunctionCtx *pCtx, TSKEY ts, int16_t type, int32_t
*/
void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSupport) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQuery * pQuery = pRuntimeEnv->pQuery;
// not point interpolation query, abort
if (!isPointInterpoQuery(pQuery)) {
......@@ -2154,7 +2149,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail));
SInterpInfoDetail *pInterpDetail = pInterpInfo->pInterpDetail;
// int32_t type = GET_COLUMN_TYPE(pQuery, i);
// int32_t type = GET_COLUMN_TYPE(pQuery, i);
int32_t type = 0;
assert(0);
......@@ -2188,7 +2183,7 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu
len += pQuery->colList[i].info.bytes;
}
// assert(PRIMARY_TSCOL_LOADED(pQuery));
// assert(PRIMARY_TSCOL_LOADED(pQuery));
void *prev = calloc(1, len);
void *next = calloc(1, len);
......@@ -2244,10 +2239,10 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
num = 128;
} else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table
size_t s = taosHashGetSize(pQInfo->pTableList);
size_t s = taosArrayGetSize(pQInfo->pTableIdList);
num = MAX(s, INITIAL_RESULT_ROWS_VALUE);
} else { // for super table query, one page for each subset
num = pQInfo->pSidSet->numOfSubSet;
// num = pQInfo->pSidSet->numOfSubSet;
}
assert(num > 0);
......@@ -2291,16 +2286,11 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
if (pQInfo->pTableList != NULL) {
taosHashCleanup(pQInfo->pTableList);
pQInfo->pTableList = NULL;
}
// tSidSetDestroy(&pQInfo->pSidSet);
// tSidSetDestroy(&pQInfo->pSidSet);
if (pQInfo->pTableDataInfo != NULL) {
size_t num = taosHashGetSize(pQInfo->pTableList);
for (int32_t j = 0; j < num; ++j) {
// size_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
}
}
......@@ -2311,19 +2301,19 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, pQuery->window.ekey,
pQuery->order.order);
dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
pQuery->window.ekey, pQuery->order.order);
sem_post(&pQInfo->dataReady);
// pQInfo->over = 1;
// pQInfo->over = 1;
return TSDB_CODE_SUCCESS;
}
pQuery->status = 0;
pQInfo->rec = (SResultRec) {0};
pQuery->rec = (SResultRec) {0};
pQInfo->rec = (SResultRec){0};
pQuery->rec = (SResultRec){0};
changeExecuteScanOrder(pQuery, true);
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
......@@ -2338,11 +2328,11 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pQuery->lastKey = pQuery->window.skey;
// create runtime environment
SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel;
// SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel;
// get one queried meter
assert(0);
// SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[0]->sid);
// SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[0]->sid);
pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vnodeIndex = -1;
......@@ -2354,12 +2344,12 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
}
assert(0);
// int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSQL_SO_ASC, true);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
// int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSQL_SO_ASC, true);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
// tSidSetSort(pQInfo->pSidSet);
// tSidSetSort(pQInfo->pSidSet);
int32_t size = getInitialPageNum(pQInfo);
int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize);
......@@ -2388,10 +2378,10 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
cond.colList = *pQuery->colList;
SArray *sa = taosArrayInit(1, POINTER_BYTES);
for(int32_t i = 0; i < pQInfo->pSidSet->numOfSids; ++i) {
// SMeterObj *p1 = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[i]->sid);
// taosArrayPush(sa, &p1);
}
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
// SMeterObj *p1 = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid);
// taosArrayPush(sa, &p1);
// }
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
......@@ -2405,8 +2395,8 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pQuery->interpoType = TSDB_INTERPO_NONE;
}
TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime,
pQuery->slidingTimeUnit, pQuery->precision);
TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit,
pQuery->precision);
taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0);
pRuntimeEnv->stableQuery = true;
......@@ -2419,7 +2409,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
*/
void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
if (pQInfo != NULL) {
assert(taosHashGetSize(pQInfo->pTableList) >= 1);
// assert(taosHashGetSize(pQInfo->pTableIdList) >= 1);
}
#if 0
......@@ -2430,7 +2420,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
} else {
int32_t num = 0;
for (int32_t i = 0; i < pQInfo->numOfMeters; ++i) {
SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[i]->sid);
SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) {
......@@ -2532,14 +2522,14 @@ static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow) {
pTimeWindow->ekey = pTimeWindow->skey + (pQuery->intervalTime - 1);
}
SArray* loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo, SDataStatis** pStatis) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo, SDataStatis **pStatis) {
SQuery * pQuery = pRuntimeEnv->pQuery;
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle;
uint32_t r = 0;
SArray * pDataBlock = NULL;
// STimeWindow *w = &pQueryHandle->window;
// STimeWindow *w = &pQueryHandle->window;
if (pQuery->numOfFilterCols > 0) {
r = BLK_DATA_ALL_NEEDED;
......@@ -2548,7 +2538,7 @@ SArray* loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBl
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t colId = pQuery->pSelectExpr[i].pBase.colInfo.colId;
// r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], w->skey, w->ekey, colId);
// r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], w->skey, w->ekey, colId);
}
if (pRuntimeEnv->pTSBuf > 0 || isIntervalQuery(pQuery)) {
......@@ -2593,15 +2583,72 @@ SArray* loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBl
return pDataBlock;
}
int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1;
int numOfPoints;
TSKEY *keyList;
if (num <= 0) return -1;
keyList = (TSKEY *)pValue;
firstPos = 0;
lastPos = num - 1;
if (order == 0) {
// find the first position which is smaller than the key
while (1) {
if (key >= keyList[lastPos]) return lastPos;
if (key == keyList[firstPos]) return firstPos;
if (key < keyList[firstPos]) return firstPos - 1;
numOfPoints = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
} else {
// find the first position which is bigger than the key
while (1) {
if (key <= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos;
if (key > keyList[lastPos]) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
else
return lastPos;
}
numOfPoints = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
}
return midPos;
}
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
#if 0
SQuery *pQuery = pRuntimeEnv->pQuery;
assert(0);
// __block_search_fn_t searchFn = vnodeSearchKeyFunc[pRuntimeEnv->pTabObj->searchAlgorithm];
int64_t cnt = 0;
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", GET_QINFO_ADDR(pQuery),
pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle;
......@@ -2620,17 +2667,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) {
// doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey,
// pQueryHandle->window.ekey, &skey1, &ekey1, &w);
doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, &ekey1, &w);
pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey;
} else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
TSKEY winStart = blockInfo.window.ekey - pQuery->intervalTime;
// doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQueryHandle->window.ekey,
// blockInfo.window.ekey, &skey1, &ekey1, &w);
doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w);
// pWindowResInfo->startTime = pQueryHandle->window.skey;
pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey;
}
}
......@@ -2639,20 +2684,17 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
// int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, searchFn, &numOfRes,
// &pRuntimeEnv->windowResInfo, pDataBlock);
int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes,
&pRuntimeEnv->windowResInfo, pDataBlock);
// dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d, checked:%d",
// GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId, pQueryHandle->cur.slot,
// pQuery->pos, blockInfo.size, forwardStep);
// save last access position
// cnt += forwardStep;
// if (queryPaused(pQuery, &blockInfo, forwardStep)) {
// if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
// break;
// }
cnt += forwardStep;
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
break;
}
}
......@@ -2674,8 +2716,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
return cnt;
#endif
return 0;
}
static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTableQInfo->lastKey = pQuery->lastKey; }
......@@ -2684,8 +2724,7 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
*/
static void doSetTagValueInParam(SColumnModel *pTagSchema, int32_t tagColIdx, void *pMeterSidInfo,
tVariant *param) {
static void doSetTagValueInParam(SColumnModel *pTagSchema, int32_t tagColIdx, void *pMeterSidInfo, tVariant *param) {
assert(tagColIdx >= 0);
#if 0
int16_t offset = getColumnModelOffset(pTagSchema, tagColIdx);
......@@ -2886,9 +2925,9 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
}
typedef struct SCompSupporter {
STableDataInfo ** pTableDataInfo;
STableDataInfo **pTableDataInfo;
int32_t * position;
SQInfo *pQInfo;
SQInfo * pQInfo;
} SCompSupporter;
int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) {
......@@ -2937,29 +2976,29 @@ int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) {
int64_t st = taosGetTimestampMs();
int32_t ret = TSDB_CODE_SUCCESS;
while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
assert(0);
// ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end);
if (ret < 0) { // not enough disk space to save the data into disk
return -1;
}
pQInfo->subgroupIdx += 1;
// this group generates at least one result, return results
if (ret > 0) {
break;
}
assert(pQInfo->numOfGroupResultPages == 0);
dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
}
dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery),
pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
// int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
// int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
//
// assert(0);
// // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end);
// if (ret < 0) { // not enough disk space to save the data into disk
// return -1;
// }
//
// pQInfo->subgroupIdx += 1;
//
// // this group generates at least one result, return results
// if (ret > 0) {
// break;
// }
//
// assert(pQInfo->numOfGroupResultPages == 0);
// dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
// }
//
// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery),
// pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
return TSDB_CODE_SUCCESS;
}
......@@ -2974,10 +3013,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
}
// set current query completed
if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) {
pQInfo->tableIndex = pQInfo->pSidSet->numOfSids;
return;
}
// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) {
// pQInfo->tableIndex = pQInfo->pSidSet->numOfTables;
// return;
// }
}
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
......@@ -3040,8 +3079,8 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
}
int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pQInfo->runtimeEnv.pQuery;
tFilePage ** buffer = (tFilePage **)pQuery->sdata;
int32_t * posList = calloc((end - start), sizeof(int32_t));
......@@ -3107,13 +3146,13 @@ int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDat
doMerge(pRuntimeEnv, ts, pWindowRes, true);
} else { // copy data to disk buffer
assert(0);
// if (buffer[0]->numOfElems == pQuery->pointsToRead) {
// if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) {
// return -1;
// }
// if (buffer[0]->numOfElems == pQuery->pointsToRead) {
// if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) {
// return -1;
// }
// resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
// }
// resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
// }
doMerge(pRuntimeEnv, ts, pWindowRes, false);
buffer[0]->numOfElems += 1;
......@@ -3169,8 +3208,8 @@ int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDat
}
int32_t flushFromResultBuf(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t capacity = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
......@@ -3284,7 +3323,7 @@ void disableFunctForSuppleScan(SQInfo *pQInfo, int32_t order) {
}
if (isIntervalQuery(pQuery)) {
size_t numOfTables = taosHashGetSize(pQInfo->pTableList);
size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
for (int32_t i = 0; i < numOfTables; ++i) {
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
......@@ -3322,7 +3361,6 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// int32_t rows = pRuntimeEnv->pTabObj->pointsPerFileBlock;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
......@@ -3341,8 +3379,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
assert(0);
// memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * rows);
memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->capacity);
}
initCtxOutputBuf(pRuntimeEnv);
......@@ -3397,7 +3434,7 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->limit.offset -= pQuery->rec.pointsRead;
pQuery->rec.pointsRead = 0;
// pQuery->pointsOffset = pQuery->rec.pointsToRead; // clear all data in result buffer
// pQuery->pointsOffset = pQuery->rec.pointsToRead; // clear all data in result buffer
resetCtxOutputBuf(pRuntimeEnv);
......@@ -3411,7 +3448,7 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
assert(0);
// memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes);
// memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes);
pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip;
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
......@@ -3597,7 +3634,8 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
doSingleMeterSupplementScan(pRuntimeEnv);
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during supplementary scan
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during
// supplementary scan
pQuery->lastKey = lkey;
pQuery->window.ekey = ekey;
......@@ -3659,7 +3697,10 @@ STableQueryInfo *createMeterQueryInfo(SQInfo *pQInfo, int32_t sid, TSKEY skey, T
STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
pTableQueryInfo->win = (STimeWindow) {.skey = skey, .ekey = ekey,};
pTableQueryInfo->win = (STimeWindow){
.skey = skey,
.ekey = ekey,
};
pTableQueryInfo->lastKey = skey;
pTableQueryInfo->sid = sid;
......@@ -3716,8 +3757,8 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p
* @param pRuntimeEnv
* @param pDataBlockInfoEx
*/
void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, int32_t meterIdx,
int32_t groupIdx, TSKEY nextKey) {
void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, int32_t meterIdx, int32_t groupIdx,
TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
int32_t GROUPRESULTID = 1;
......@@ -3774,7 +3815,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, int32_t meterIdx, STableQueryInfo *pTa
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
assert(pTableQueryInfo->lastKey > 0);
// vnodeSetTagValueInParam(pQInfo->pSidSet, pRuntimeEnv, pQInfo->pMeterSidExtInfo[meterIdx]);
// vnodeSetTagValueInParam(pQInfo->pSidSet, pRuntimeEnv, pQInfo->pMeterSidExtInfo[meterIdx]);
// both the master and supplement scan needs to set the correct ts comp start position
if (pRuntimeEnv->pTSBuf != NULL) {
......@@ -3882,7 +3923,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else {
totalSubset = pQInfo->pSidSet->numOfSubSet;
// totalSubset = pQInfo->pSidSet->numOfSubSet;
}
return totalSubset;
......@@ -3924,13 +3965,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
* current output space is not enough to keep all the result data of this group, only copy partial results
* to SQuery object's result buffer
*/
// if (numOfRowsToCopy > pQuery->pointsToRead - numOfResult) {
// numOfRowsToCopy = pQuery->pointsToRead - numOfResult;
// pQInfo->offset += numOfRowsToCopy;
// } else {
// pQInfo->offset = 0;
// pQInfo->subgroupIdx += 1;
// }
// if (numOfRowsToCopy > pQuery->pointsToRead - numOfResult) {
// numOfRowsToCopy = pQuery->pointsToRead - numOfResult;
// pQInfo->offset += numOfRowsToCopy;
// } else {
// pQInfo->offset = 0;
// pQInfo->subgroupIdx += 1;
// }
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
int32_t size = pRuntimeEnv->pCtx[j].outputBytes;
......@@ -3942,9 +3983,9 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
numOfResult += numOfRowsToCopy;
assert(0);
// if (numOfResult == pQuery->rec.pointsToRead) {
// break;
// }
// if (numOfResult == pQuery->rec.pointsToRead) {
// break;
// }
}
dTrace("QInfo:%p copy data to SQuery buf completed", GET_QINFO_ADDR(pQuery));
......@@ -3971,7 +4012,7 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
int32_t numOfResult = doCopyToSData(pQInfo, result, orderType);
pQuery->rec.pointsRead += numOfResult;
// assert(pQuery->rec.pointsRead <= pQuery->pointsToRead);
// assert(pQuery->rec.pointsRead <= pQuery->pointsToRead);
}
static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo) {
......@@ -3989,9 +4030,8 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInf
}
}
void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo,
SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, SArray* pDataBlock,
__block_search_fn_t searchFn) {
void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, SDataBlockInfo *pDataBlockInfo,
SDataStatis *pStatis, SArray *pDataBlock, __block_search_fn_t searchFn) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo * pTableQueryInfo = pTableDataInfo->pTableQInfo;
......@@ -3999,7 +4039,7 @@ void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo
int32_t numOfRes = 0;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) {
// numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &forwardStep, pFields, pDataBlockInfo, pWindowResInfo);
// numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &forwardStep, pFields, pDataBlockInfo, pWindowResInfo);
} else {
numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
}
......@@ -4067,9 +4107,11 @@ bool vnodeHasRemainResults(void *handle) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime,
pQuery->slidingTimeUnit, pQuery->precision);
// int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pRuntimeEnv->pInterpoBuf[0]->data,
// remain, pQuery->intervalTime, ekey, pQuery->pointsToRead);
// return numOfTotal > 0;
// int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY
// *)pRuntimeEnv->pInterpoBuf[0]->data,
// remain, pQuery->intervalTime, ekey,
// pQuery->pointsToRead);
// return numOfTotal > 0;
assert(0);
return false;
}
......@@ -4214,7 +4256,6 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
}
}
#endif
}
void vnodePrintQueryStatistics(SQInfo *pQInfo) {
......@@ -4262,49 +4303,44 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) {
#endif
}
int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *pMeterObj, void *param) {
int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t code = TSDB_CODE_SUCCESS;
//only the successful complete requries the sem_post/over = 1 operations.
// only the successful complete requries the sem_post/over = 1 operations.
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, pQuery->window.ekey,
pQuery->order.order);
dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
pQuery->window.ekey, pQuery->order.order);
sem_post(&pQInfo->dataReady);
// pQInfo->over = 1;
pQInfo->killed = 1;
return TSDB_CODE_SUCCESS;
}
setScanLimitationByResultBuffer(pQuery);
changeExecuteScanOrder(pQuery, false);
// pQInfo->over = 0;
pQInfo->rec = (SResultRec) {0};
// pQuery->pointsRead = 0;
pQInfo->rec = (SResultRec){0};
// dataInCache requires lastKey value
pQuery->lastKey = pQuery->window.skey;
STsdbQueryCond cond = {0};
cond.twindow = (STimeWindow){.skey = pQuery->window.skey, .ekey = pQuery->window.ekey};
cond.order = pQuery->order.order;
cond.twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey};
cond.order = pQuery->order.order;
cond.colList = *pQuery->colList;
SArray *sa = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(sa, &pMeterObj);
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
taosArrayPush(cols, &pQuery->colList[i]);
}
pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, sa, cols);
pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, pQInfo->pTableIdList, cols);
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTabObj = pMeterObj;
pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vnodeIndex = -1;
......@@ -4314,7 +4350,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *pMeterObj, void *param) {
}
// create runtime environment
code = setupQueryRuntimeEnv(pMeterObj, pQuery, &pQInfo->runtimeEnv, NULL, pQuery->order.order, false);
code = setupQueryRuntimeEnv(&pQInfo->runtimeEnv, NULL, pQuery->order.order, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4338,7 +4374,6 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *pMeterObj, void *param) {
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type);
}
/* query on single table */
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
SPointInterpoSupporter interpInfo = {0};
......@@ -4371,7 +4406,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *pMeterObj, void *param) {
int64_t rs = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit,
pQuery->precision);
taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0);
// allocMemForInterpo(pQInfo, pQuery, pMeterObj);
// allocMemForInterpo(pQInfo, pQuery, pMeterObj);
if (!isPointInterpoQuery(pQuery)) {
// assert(pQuery->pos >= 0 && pQuery->slot >= 0);
......@@ -4384,7 +4419,6 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *pMeterObj, void *param) {
return TSDB_CODE_SUCCESS;
}
static bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, tSidSet *pSidset) {
if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) {
return false;
......@@ -4393,7 +4427,7 @@ static bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, tSidSet *pSidset)
for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
SColIndexEx *pColIndex = &pGroupbyExpr->columnInfo[i];
if (pColIndex->flag == TSDB_COL_TAG) {
assert(pSidset->numOfSids == pSidset->numOfSubSet);
// assert(pSidset->numOfTables == pSidset->numOfSubSet);
return true;
}
}
......@@ -4410,8 +4444,6 @@ static bool doCheckWithPrevQueryRange(SQuery *pQuery, TSKEY nextKey) {
return true;
}
static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -4424,13 +4456,13 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
}
static void queryOnDataBlocks(SQInfo *pQInfo, STableDataInfo *pMeterDataInfo) {
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
// SMeterObj * pTempMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pMeterSidExtInfo[0]->sid);
// __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm];
// SMeterObj * pTempMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pMeterSidExtInfo[0]->sid);
// __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm];
// dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles);
// dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles);
tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
......@@ -4441,52 +4473,51 @@ static void queryOnDataBlocks(SQInfo *pQInfo, STableDataInfo *pMeterDataInfo) {
// prepare the STableDataInfo struct for each table
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
// SMeterObj * pMeterObj = getMeterObj(pSupporter->pMetersHashTable, blockInfo.sid);
// SMeterObj * pMeterObj = getMeterObj(pSupporter->pMetersHashTable, blockInfo.sid);
// pQInfo->pObj = pMeterObj;
// pRuntimeEnv->pMeterObj = pMeterObj;
// pQInfo->pObj = pMeterObj;
// pRuntimeEnv->pMeterObj = pMeterObj;
STableDataInfo *pTableDataInfo = NULL;
// for (int32_t i = 0; i < pSupporter->pSidSet->numOfSids; ++i) {
// if (pMeterDataInfo[i].pMeterObj == pMeterObj) {
// pTableDataInfo = &pMeterDataInfo[i];
// break;
// }
// }
// for (int32_t i = 0; i < pSupporter->pSidSet->numOfTables; ++i) {
// if (pMeterDataInfo[i].pMeterObj == pMeterObj) {
// pTableDataInfo = &pMeterDataInfo[i];
// break;
// }
// }
assert(pTableDataInfo != NULL);
STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo;
if (pTableDataInfo->pTableQInfo == NULL) {
// pTableDataInfo->pTableQInfo = createMeterQueryInfo(pQInfo, pMeterObj->sid, pQuery->skey, pQuery->ekey);
// pTableDataInfo->pTableQInfo = createMeterQueryInfo(pQInfo, pMeterObj->sid, pQuery->skey, pQuery->ekey);
}
restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo);
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
TSKEY nextKey = blockInfo.window.ekey;
if (pQuery->intervalTime == 0) {
setExecutionContext(pQInfo, pTableQueryInfo, pTableDataInfo->tableIndex, pTableDataInfo->groupIdx,
nextKey);
setExecutionContext(pQInfo, pTableQueryInfo, pTableDataInfo->tableIndex, pTableDataInfo->groupIdx, nextKey);
} else { // interval query
setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey);
int32_t ret = setAdditionalInfo(pQInfo, pTableDataInfo->tableIndex, pTableQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
// pQInfo->killed = 1;
// pQInfo->killed = 1;
return;
}
}
// stableApplyFunctionsOnBlock_(pSupporter, pTableDataInfo, &blockInfo, pStatis, pDataBlock, searchFn);
// stableApplyFunctionsOnBlock_(pSupporter, pTableDataInfo, &blockInfo, pStatis, pDataBlock, searchFn);
}
}
static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *dataInCache, int32_t index,
int32_t start) {
// SMeterSidExtInfo **pMeterSidExtInfo = pQInfo->pMeterSidExtInfo;
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
// STableIdInfo **pMeterSidExtInfo = pQInfo->pMeterSidExtInfo;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = &pRuntimeEnv->pQuery;
#if 0
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
......@@ -4540,7 +4571,7 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SQuery * pQuery = pRuntimeEnv->pQuery;
bool dataInDisk = true;
bool dataInCache = true;
......@@ -4548,15 +4579,14 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
return 0;
}
SPointInterpoSupporter pointInterpSupporter = {0};
pointInterpSupporterInit(pQuery, &pointInterpSupporter);
assert(0);
// if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL)) {
// pointInterpSupporterDestroy(&pointInterpSupporter);
// return 0;
// }
// if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL)) {
// pointInterpSupporterDestroy(&pointInterpSupporter);
// return 0;
// }
/*
* here we set the value for before and after the specified time into the
......@@ -4590,11 +4620,11 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
* @param pQInfo
*/
static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
#if 0
SQuery* pQuery = pRuntimeEnv->pQuery;
// tSidSet *pSids = pSupporter->pSidSet;
// tSidSet *pTableIdList = pSupporter->pSidSet;
int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode;
......@@ -4603,12 +4633,12 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
while (pSupporter->subgroupIdx < pSids->numOfSubSet) {
int32_t start = pSids->starterPos[pSupporter->subgroupIdx];
int32_t end = pSids->starterPos[pSupporter->subgroupIdx + 1] - 1;
while (pSupporter->subgroupIdx < pTableIdList->numOfSubSet) {
int32_t start = pTableIdList->starterPos[pSupporter->subgroupIdx];
int32_t end = pTableIdList->starterPos[pSupporter->subgroupIdx + 1] - 1;
if (isFirstLastRowQuery(pQuery)) {
dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pSids->numOfSubSet,
dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pSupporter->subgroupIdx);
TSKEY key = -1;
......@@ -4641,7 +4671,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
int64_t num = doCheckMetersInGroup(pQInfo, index, start);
assert(num >= 0);
} else {
dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pSids->numOfSubSet,
dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pSupporter->subgroupIdx);
for (int32_t k = start; k <= end; ++k) {
......@@ -4686,7 +4716,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
}
}
if (pSupporter->meterIdx >= pSids->numOfSids) {
if (pSupporter->meterIdx >= pTableIdList->numOfTables) {
return;
}
......@@ -4752,7 +4782,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
// the limitation of output result is reached, set the query completed
if (doRevisedResultsByLimit(pQInfo)) {
pSupporter->meterIdx = pSupporter->pSidSet->numOfSids;
pSupporter->meterIdx = pSupporter->pSidSet->numOfTables;
break;
}
......@@ -4839,7 +4869,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
dTrace(
"QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
"next skey:%" PRId64 ", offset:%" PRId64,
pQInfo, vid, pSids->numOfSids, pSupporter->meterIdx, pSids->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead,
pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead,
pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset);
#endif
}
......@@ -4848,15 +4878,15 @@ static void doOrderedScan(SQInfo *pQInfo) {
SQuery *pQuery = &pQInfo->runtimeEnv.pQuery;
#if 0
// if (pQInfo->runtimeEnv. == NULL) {
// pSupporter->pMeterDataInfo = calloc(pSupporter->pSidSet->numOfSids, sizeof(STableDataInfo));
// pSupporter->pMeterDataInfo = calloc(pSupporter->pSidSet->numOfTables, sizeof(STableDataInfo));
// }
SMeterSidExtInfo **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo;
STableIdInfo **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo;
tSidSet* pSidset = pSupporter->pSidSet;
int32_t groupId = 0;
for (int32_t i = 0; i < pSidset->numOfSids; ++i) { // load all meter meta info
for (int32_t i = 0; i < pSidset->numOfTables; ++i) { // load all meter meta info
SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[i]->sid);
if (pMeterObj == NULL) {
dError("QInfo:%p failed to find required sid:%d", pQInfo, pMeterSidExtInfo[i]->sid);
......@@ -4884,16 +4914,16 @@ static void doOrderedScan(SQInfo *pQInfo) {
static void setupMeterQueryInfoForSupplementQuery(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t num = taosHashGetSize(pQInfo->pTableList);
int32_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t i = 0; i < num; ++i) {
// STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
// STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
}
}
static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQuery * pQuery = pRuntimeEnv->pQuery;
if (!needSupplementaryScan(pQuery)) {
dTrace("QInfo:%p no need to do supplementary scan, query completed", pQInfo);
......@@ -4901,7 +4931,7 @@ static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) {
}
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
// disableFunctForSuppleScan(pSupporter, pQuery->order.order);
// disableFunctForSuppleScan(pSupporter, pQuery->order.order);
if (pRuntimeEnv->pTSBuf != NULL) {
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1u;
......@@ -4933,7 +4963,7 @@ static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) {
}
static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
if (pQInfo->subgroupIdx > 0) {
......@@ -4954,7 +4984,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
if (pQuery->rec.pointsRead == 0) {
// vnodePrintQueryStatistics(pSupporter);
// vnodePrintQueryStatistics(pSupporter);
}
dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->rec.pointsRead,
......@@ -5023,7 +5053,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*) from table_name group by status_column;
*/
static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) {
static void tableFixedOutputProcessor(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
......@@ -5050,7 +5080,7 @@ static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) {
pQInfo->rec.pointsRead = pQuery->rec.pointsRead;
}
static void vnodeSingleTableMultiOutputProcessor(SQInfo *pQInfo) {
static void tableMultiOutputProcessor(SQInfo *pQInfo) {
#if 0
SQuery * pQuery = &pQInfo->query;
SMeterObj *pMeterObj = pQInfo->pObj;
......@@ -5146,7 +5176,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
}
// load the data block for the next retrieve
// loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
// loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
break;
}
......@@ -5154,11 +5184,11 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
}
/* handle time interval query on single table */
static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
// STable *pMeterObj = pQInfo->pObj;
static void tableIntervalProcessor(SQInfo *pQInfo) {
// STable *pMeterObj = pQInfo->pObj;
SQueryRuntimeEnv * pRuntimeEnv = &(pQInfo->runtimeEnv);
SQuery* pQuery = pRuntimeEnv->pQuery;
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
SQuery * pQuery = pRuntimeEnv->pQuery;
int32_t numOfInterpo = 0;
......@@ -5187,8 +5217,8 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
}
numOfInterpo = 0;
pQuery->rec.pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf,
pQuery->rec.pointsRead, &numOfInterpo);
pQuery->rec.pointsRead = vnodeQueryResultInterpolate(
pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo);
dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead);
if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
......@@ -5212,36 +5242,24 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
pQInfo->pointsInterpo += numOfInterpo;
// dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
// pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
// dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d
// totalReturn:%d",
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
// pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
}
void qTableQuery(void* pReadMsg) {
// SQInfo *pQInfo = (SQInfo *)pReadMsg->ahandle;
void qTableQuery(SQInfo* pQInfo) {
assert(pQInfo != NULL);
#if 0
if (pQInfo == NULL) {
dTrace("%p freed abort query", pQInfo);
if (pQInfo->killed) {
dTrace("QInfo:%p it is already killed, abort", pQInfo);
return;
}
// if (pQInfo->killed) {
// dTrace("QInfo:%p it is already killed, abort", pQInfo);
// vnodeDecRefCount(pQInfo);
//
// return;
// }
// assert(pQInfo->refCount >= 1);
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = &pRuntimeEnv->pQuery;
// assert(pRuntimeEnv->pMeterObj == pMeterObj);
SQuery *pQuery = pRuntimeEnv->pQuery;
// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pMeterObj->vnode, pMeterObj->sid,
// pMeterObj->meterId, pMeterObj->numOfQueries, pQInfo);
// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo);
if (vnodeHasRemainResults(pQInfo)) {
/*
......@@ -5259,15 +5277,9 @@ void qTableQuery(void* pReadMsg) {
pQInfo->pointsInterpo += numOfInterpo;
pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
// dTrace(
// "QInfo:%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d "
// "totalReturn:%d",
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
// pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
// dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
// pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
return;
}
......@@ -5292,26 +5304,20 @@ void qTableQuery(void* pReadMsg) {
// pQInfo->pointsInterpo, pQInfo->pointsReturned);
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
return;
}
}
}
assert(0);
// pQInfo->over = 1;
// dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid,
// pMeterObj->meterId, pQInfo->pointsRead);
// vnodePrintQueryStatistics(pSupporter);
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
return;
}
/* number of points returned during this query */
// number of points returned during this query
pQuery->rec.pointsRead = 0;
int64_t st = taosGetTimestampUs();
......@@ -5319,14 +5325,15 @@ void qTableQuery(void* pReadMsg) {
// group by normal column, sliding window query, interval query are handled by interval query processor
if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
// assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead);
vnodeSingleTableIntervalProcessor(pQInfo);
tableIntervalProcessor(pQInfo);
} else {
if (isFixedOutputQuery(pQuery)) {
assert(pQuery->checkBufferInLoop == 0);
vnodeSingleTableFixedOutputProcessor(pQInfo);
tableFixedOutputProcessor(pQInfo);
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkBufferInLoop == 1);
vnodeSingleTableMultiOutputProcessor(pQInfo);
tableMultiOutputProcessor(pQInfo);
}
}
......@@ -5344,7 +5351,6 @@ void qTableQuery(void* pReadMsg) {
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
#endif
}
void qSuperTableQuery(void *pReadMsg) {
......@@ -5395,3 +5401,862 @@ void qSuperTableQuery(void *pReadMsg) {
// vnodeDecRefCount(pQInfo);
#endif
}
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryTableMsg, SSqlFuncExprMsg *pExprMsg) {
int32_t j = 0;
while (j < pQueryTableMsg->numOfCols) {
if (pExprMsg->colInfo.colId == pQueryTableMsg->colList[j].colId) {
break;
}
j += 1;
}
return j;
}
bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryTableMsg, SSqlFuncExprMsg *pExprMsg) {
int32_t j = getColumnIndexInSource(pQueryTableMsg, pExprMsg);
return j < pQueryTableMsg->numOfCols;
}
static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) {
if (pQueryTableMsg->intervalTime < 0) {
dError("qmsg:%p illegal value of aggTimeInterval %" PRId64 "", pQueryTableMsg, pQueryTableMsg->intervalTime);
return -1;
}
if (pQueryTableMsg->numOfTagsCols < 0 || pQueryTableMsg->numOfTagsCols > TSDB_MAX_TAGS + 1) {
dError("qmsg:%p illegal value of numOfTagsCols %d", pQueryTableMsg, pQueryTableMsg->numOfTagsCols);
return -1;
}
if (pQueryTableMsg->numOfCols <= 0 || pQueryTableMsg->numOfCols > TSDB_MAX_COLUMNS) {
dError("qmsg:%p illegal value of numOfCols %d", pQueryTableMsg, pQueryTableMsg->numOfCols);
return -1;
}
if (pQueryTableMsg->numOfTables <= 0) {
dError("qmsg:%p illegal value of numOfTables %d", pQueryTableMsg, pQueryTableMsg->numOfTables);
return -1;
}
if (pQueryTableMsg->numOfGroupCols < 0) {
dError("qmsg:%p illegal value of numOfGroupbyCols %d", pQueryTableMsg, pQueryTableMsg->numOfGroupCols);
return -1;
}
if (pQueryTableMsg->numOfOutputCols > TSDB_MAX_COLUMNS || pQueryTableMsg->numOfOutputCols <= 0) {
dError("qmsg:%p illegal value of output columns %d", pQueryTableMsg, pQueryTableMsg->numOfOutputCols);
return -1;
}
if (pQueryTableMsg->tagLength < 0) {
dError("qmsg:%p illegal value of tag length %d", pQueryTableMsg, pQueryTableMsg->tagLength);
return -1;
}
return 0;
}
static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTableIdList) {
pQueryTableMsg->vgId = htons(pQueryTableMsg->vgId);
pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables);
pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey);
pQueryTableMsg->window.ekey = htobe64(pQueryTableMsg->window.ekey);
pQueryTableMsg->order = htons(pQueryTableMsg->order);
pQueryTableMsg->orderColId = htons(pQueryTableMsg->orderColId);
pQueryTableMsg->queryType = htons(pQueryTableMsg->queryType);
pQueryTableMsg->intervalTime = htobe64(pQueryTableMsg->intervalTime);
pQueryTableMsg->slidingTime = htobe64(pQueryTableMsg->slidingTime);
pQueryTableMsg->numOfTagsCols = htons(pQueryTableMsg->numOfTagsCols);
pQueryTableMsg->numOfCols = htons(pQueryTableMsg->numOfCols);
pQueryTableMsg->numOfOutputCols = htons(pQueryTableMsg->numOfOutputCols);
pQueryTableMsg->numOfGroupCols = htons(pQueryTableMsg->numOfGroupCols);
pQueryTableMsg->tagLength = htons(pQueryTableMsg->tagLength);
pQueryTableMsg->limit = htobe64(pQueryTableMsg->limit);
pQueryTableMsg->offset = htobe64(pQueryTableMsg->offset);
pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset);
pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen);
pQueryTableMsg->tsNumOfBlocks = htonl(pQueryTableMsg->tsNumOfBlocks);
pQueryTableMsg->tsOrder = htonl(pQueryTableMsg->tsOrder);
// query msg safety check
if (validateQueryMeterMsg(pQueryTableMsg) != 0) {
return TSDB_CODE_INVALID_QUERY_MSG;
}
char *pMsg = (char *)(pQueryTableMsg->colList) + sizeof(SColumnInfo) * pQueryTableMsg->numOfCols;
for (int32_t col = 0; col < pQueryTableMsg->numOfCols; ++col) {
pQueryTableMsg->colList[col].colId = htons(pQueryTableMsg->colList[col].colId);
pQueryTableMsg->colList[col].type = htons(pQueryTableMsg->colList[col].type);
pQueryTableMsg->colList[col].bytes = htons(pQueryTableMsg->colList[col].bytes);
pQueryTableMsg->colList[col].numOfFilters = htons(pQueryTableMsg->colList[col].numOfFilters);
assert(pQueryTableMsg->colList[col].type >= TSDB_DATA_TYPE_BOOL &&
pQueryTableMsg->colList[col].type <= TSDB_DATA_TYPE_NCHAR);
int32_t numOfFilters = pQueryTableMsg->colList[col].numOfFilters;
if (numOfFilters > 0) {
pQueryTableMsg->colList[col].filters = calloc(numOfFilters, sizeof(SColumnFilterInfo));
}
for (int32_t f = 0; f < numOfFilters; ++f) {
SColumnFilterInfo *pFilterInfo = (SColumnFilterInfo *)pMsg;
SColumnFilterInfo *pDestFilterInfo = &pQueryTableMsg->colList[col].filters[f];
pDestFilterInfo->filterOnBinary = htons(pFilterInfo->filterOnBinary);
pMsg += sizeof(SColumnFilterInfo);
if (pDestFilterInfo->filterOnBinary) {
pDestFilterInfo->len = htobe64(pFilterInfo->len);
pDestFilterInfo->pz = (int64_t)calloc(1, pDestFilterInfo->len + 1);
memcpy((void *)pDestFilterInfo->pz, pMsg, pDestFilterInfo->len + 1);
pMsg += (pDestFilterInfo->len + 1);
} else {
pDestFilterInfo->lowerBndi = htobe64(pFilterInfo->lowerBndi);
pDestFilterInfo->upperBndi = htobe64(pFilterInfo->upperBndi);
}
pDestFilterInfo->lowerRelOptr = htons(pFilterInfo->lowerRelOptr);
pDestFilterInfo->upperRelOptr = htons(pFilterInfo->upperRelOptr);
}
}
bool hasArithmeticFunction = false;
/*
* 1. simple projection query on meters, we only record the pSqlFuncExprs[i].colIdx value
* 2. for complex queries, whole SqlExprs object is required.
*/
pQueryTableMsg->pSqlFuncExprs = (int64_t)malloc(POINTER_BYTES * pQueryTableMsg->numOfOutputCols);
SSqlFuncExprMsg *pExprMsg = (SSqlFuncExprMsg *)pMsg;
for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) {
((SSqlFuncExprMsg **)pQueryTableMsg->pSqlFuncExprs)[i] = pExprMsg;
pExprMsg->colInfo.colIdx = htons(pExprMsg->colInfo.colIdx);
pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId);
pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag);
pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pMsg += sizeof(SSqlFuncExprMsg);
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType);
pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes);
if (pExprMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) {
pExprMsg->arg[j].argValue.pz = pMsg;
pMsg += pExprMsg->arg[j].argBytes + 1; // one more for the string terminated char.
} else {
pExprMsg->arg[j].argValue.i64 = htobe64(pExprMsg->arg[j].argValue.i64);
}
}
if (pExprMsg->functionId == TSDB_FUNC_ARITHM) {
hasArithmeticFunction = true;
} else if (pExprMsg->functionId == TSDB_FUNC_TAG || pExprMsg->functionId == TSDB_FUNC_TAGPRJ ||
pExprMsg->functionId == TSDB_FUNC_TAG_DUMMY) {
if (pExprMsg->colInfo.flag != TSDB_COL_TAG) { // ignore the column index check for arithmetic expression.
return TSDB_CODE_INVALID_QUERY_MSG;
}
} else {
if (!vnodeValidateExprColumnInfo(pQueryTableMsg, pExprMsg)) {
return TSDB_CODE_INVALID_QUERY_MSG;
}
}
pExprMsg = (SSqlFuncExprMsg *)pMsg;
}
pQueryTableMsg->colNameLen = htonl(pQueryTableMsg->colNameLen);
if (hasArithmeticFunction) { // column name array
assert(pQueryTableMsg->colNameLen > 0);
pQueryTableMsg->colNameList = (int64_t)pMsg;
pMsg += pQueryTableMsg->colNameLen;
}
*pTableIdList = taosArrayInit(pQueryTableMsg->numOfTables, sizeof(STableIdInfo));
STableIdInfo* pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableIdInfo->sid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key);
taosArrayPush(*pTableIdList, pTableIdInfo);
pMsg += sizeof(STableIdInfo);
for (int32_t j = 1; j < pQueryTableMsg->numOfTables; ++j) {
pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableIdInfo->sid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key);
taosArrayPush(*pTableIdList, pTableIdInfo);
pMsg += sizeof(STableIdInfo);
}
if (pQueryTableMsg->numOfGroupCols > 0 || pQueryTableMsg->numOfTagsCols > 0) { // group by tag columns
pQueryTableMsg->pTagSchema = (uint64_t)pMsg;
SSchema *pTagSchema = (SSchema *)pQueryTableMsg->pTagSchema;
pMsg += sizeof(SSchema) * pQueryTableMsg->numOfTagsCols;
if (pQueryTableMsg->numOfGroupCols > 0) {
pQueryTableMsg->groupbyTagIds = (uint64_t) & (pTagSchema[pQueryTableMsg->numOfTagsCols]);
} else {
pQueryTableMsg->groupbyTagIds = 0;
}
pQueryTableMsg->orderByIdx = htons(pQueryTableMsg->orderByIdx);
pQueryTableMsg->orderType = htons(pQueryTableMsg->orderType);
pMsg += sizeof(SColIndexEx) * pQueryTableMsg->numOfGroupCols;
} else {
pQueryTableMsg->pTagSchema = 0;
pQueryTableMsg->groupbyTagIds = 0;
}
pQueryTableMsg->interpoType = htons(pQueryTableMsg->interpoType);
if (pQueryTableMsg->interpoType != TSDB_INTERPO_NONE) {
pQueryTableMsg->defaultVal = (uint64_t)(pMsg);
int64_t *v = (int64_t *)pMsg;
for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) {
v[i] = htobe64(v[i]);
}
}
dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64
", numOfGroupbyTagCols:%d, numOfTagCols:%d, timestamp order:%d, "
"tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64
", fillType:%d, comptslen:%d, limit:%" PRId64
", "
"offset:%" PRId64,
pQueryTableMsg, pQueryTableMsg->numOfTables, pQueryTableMsg->window.skey, pQueryTableMsg->window.ekey,
pQueryTableMsg->numOfGroupCols, pQueryTableMsg->numOfTagsCols, pQueryTableMsg->order,
pQueryTableMsg->orderType, pQueryTableMsg->orderByIdx, pQueryTableMsg->numOfOutputCols,
pQueryTableMsg->numOfCols, pQueryTableMsg->intervalTime, pQueryTableMsg->interpoType, pQueryTableMsg->tsLen,
pQueryTableMsg->limit, pQueryTableMsg->offset);
return 0;
}
static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMsg *pQueryMsg) {
SSqlBinaryExprInfo *pBinaryExprInfo = &pExpr->binExprInfo;
SColumnInfo * pColMsg = pQueryMsg->colList;
#if 0
tSQLBinaryExpr* pBinExpr = NULL;
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz,
pExpr->pBase.arg[0].argBytes);
if (pBinExpr == NULL) {
dError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
return TSDB_CODE_APP_ERROR;
}
pBinaryExprInfo->pBinExpr = pBinExpr;
int32_t num = 0;
int16_t ids[TSDB_MAX_COLUMNS] = {0};
tSQLBinaryExprTrv(pBinExpr, &num, ids);
qsort(ids, num, sizeof(int16_t), id_compar);
int32_t i = 0, j = 0;
while (i < num && j < num) {
if (ids[i] == ids[j]) {
j++;
} else {
ids[++i] = ids[j++];
}
}
assert(i <= num);
// there may be duplicated referenced columns.
num = i + 1;
pBinaryExprInfo->pReqColumns = malloc(sizeof(SColIndexEx) * num);
for (int32_t k = 0; k < num; ++k) {
SColIndexEx* pColIndex = &pBinaryExprInfo->pReqColumns[k];
pColIndex->colId = ids[k];
}
pBinaryExprInfo->numOfCols = num;
free(pSchema);
#endif
return TSDB_CODE_SUCCESS;
}
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) {
*pSqlFuncExpr = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SSqlFunctionExpr *pExprs = (SSqlFunctionExpr *)calloc(1, sizeof(SSqlFunctionExpr) * pQueryMsg->numOfOutputCols);
if (pExprs == NULL) {
tfree(pQueryMsg->pSqlFuncExprs);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
int16_t tagLen = 0;
SSchema *pTagSchema = (SSchema *)pQueryMsg->pTagSchema;
for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) {
pExprs[i].pBase = *((SSqlFuncExprMsg **)pQueryMsg->pSqlFuncExprs)[i];
pExprs[i].resBytes = 0;
int16_t type = 0;
int16_t bytes = 0;
SColIndexEx *pColumnIndexExInfo = &pExprs[i].pBase.colInfo;
// tag column schema is kept in pQueryMsg->pColumnModel
if (TSDB_COL_IS_TAG(pColumnIndexExInfo->flag)) {
if (pColumnIndexExInfo->colIdx >= pQueryMsg->numOfTagsCols) {
tfree(pExprs);
return TSDB_CODE_INVALID_QUERY_MSG;
}
type = pTagSchema[pColumnIndexExInfo->colIdx].type;
bytes = pTagSchema[pColumnIndexExInfo->colIdx].bytes;
} else { // parse the arithmetic expression
if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) {
code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg);
if (code != TSDB_CODE_SUCCESS) {
tfree(pExprs);
return code;
}
type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypeDesc[type].nSize;
} else { // parse the normal column
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase);
assert(j < pQueryMsg->numOfCols);
SColumnInfo* pCol = &pQueryMsg->colList[j];
type = pCol->type;
bytes = pCol->bytes;
}
}
int32_t param = pExprs[i].pBase.arg[0].argValue.i64;
if (getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, param, &pExprs[i].resType, &pExprs[i].resBytes,
&pExprs[i].interResBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
tfree(pExprs);
return TSDB_CODE_INVALID_QUERY_MSG;
}
if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].pBase.functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExprs[i].resBytes;
}
assert(isValidDataType(pExprs[i].resType, pExprs[i].resBytes));
}
// get the correct result size for top/bottom query, according to the number of tags columns in selection clause
// TODO refactor
for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) {
pExprs[i].pBase = *((SSqlFuncExprMsg **)pQueryMsg->pSqlFuncExprs)[i];
int16_t functId = pExprs[i].pBase.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase);
assert(j < pQueryMsg->numOfCols);
SColumnInfo *pCol = &pQueryMsg->colList[j];
int16_t type = pCol->type;
int16_t bytes = pCol->bytes;
int32_t ret =
getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, pExprs[i].pBase.arg[0].argValue.i64,
&pExprs[i].resType, &pExprs[i].resBytes, &pExprs[i].interResBytes, tagLen, isSuperTable);
assert(ret == TSDB_CODE_SUCCESS);
}
}
tfree(pQueryMsg->pSqlFuncExprs);
*pSqlFuncExpr = pExprs;
return TSDB_CODE_SUCCESS;
}
static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *code) {
if (pQueryMsg->numOfGroupCols == 0) {
return NULL;
}
// using group by tag columns
SSqlGroupbyExpr *pGroupbyExpr =
(SSqlGroupbyExpr *)malloc(sizeof(SSqlGroupbyExpr) + pQueryMsg->numOfGroupCols * sizeof(SColIndexEx));
if (pGroupbyExpr == NULL) {
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
SColIndexEx *pGroupbyColInfo = (SColIndexEx *)pQueryMsg->groupbyTagIds;
pGroupbyExpr->numOfGroupCols = pQueryMsg->numOfGroupCols;
pGroupbyExpr->orderType = pQueryMsg->orderType;
pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx;
memcpy(pGroupbyExpr->columnInfo, pGroupbyColInfo, sizeof(SColIndexEx) * pGroupbyExpr->numOfGroupCols);
return pGroupbyExpr;
}
static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].info.numOfFilters > 0) {
pQuery->numOfFilterCols++;
}
}
if (pQuery->numOfFilterCols == 0) {
return TSDB_CODE_SUCCESS;
}
pQuery->pFilterInfo = calloc(1, sizeof(SSingleColumnFilterInfo) * pQuery->numOfFilterCols);
for (int32_t i = 0, j = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].info.numOfFilters > 0) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[j];
memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoEx));
pFilterInfo->info.info.filters = NULL;
pFilterInfo->numOfFilters = pQuery->colList[i].info.numOfFilters;
pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem));
for (int32_t f = 0; f < pFilterInfo->numOfFilters; ++f) {
SColumnFilterElem *pSingleColFilter = &pFilterInfo->pFilters[f];
pSingleColFilter->filterInfo = pQuery->colList[i].info.filters[f];
int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr;
int32_t upper = pSingleColFilter->filterInfo.upperRelOptr;
if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) {
dError("QInfo:%p invalid filter info", pQInfo);
return TSDB_CODE_INVALID_QUERY_MSG;
}
int16_t type = pQuery->colList[i].info.type;
int16_t bytes = pQuery->colList[i].info.bytes;
__filter_func_t *rangeFilterArray = NULL; // vnodeGetRangeFilterFuncArray(type);
__filter_func_t *filterArray = NULL; // vnodeGetValueFilterFuncArray(type);
if (rangeFilterArray == NULL && filterArray == NULL) {
dError("QInfo:%p failed to get filter function, invalid data type:%d", pQInfo, type);
return TSDB_CODE_INVALID_QUERY_MSG;
}
if ((lower == TSDB_RELATION_LARGE_EQUAL || lower == TSDB_RELATION_LARGE) &&
(upper == TSDB_RELATION_LESS_EQUAL || upper == TSDB_RELATION_LESS)) {
if (lower == TSDB_RELATION_LARGE_EQUAL) {
if (upper == TSDB_RELATION_LESS_EQUAL) {
pSingleColFilter->fp = rangeFilterArray[4];
} else {
pSingleColFilter->fp = rangeFilterArray[2];
}
} else {
if (upper == TSDB_RELATION_LESS_EQUAL) {
pSingleColFilter->fp = rangeFilterArray[3];
} else {
pSingleColFilter->fp = rangeFilterArray[1];
}
}
} else { // set callback filter function
if (lower != TSDB_RELATION_INVALID) {
pSingleColFilter->fp = filterArray[lower];
if (upper != TSDB_RELATION_INVALID) {
dError("pQInfo:%p failed to get filter function, invalid filter condition", pQInfo, type);
return TSDB_CODE_INVALID_QUERY_MSG;
}
} else {
pSingleColFilter->fp = filterArray[upper];
}
}
assert(pSingleColFilter->fp != NULL);
pSingleColFilter->bytes = bytes;
}
j++;
}
}
return TSDB_CODE_SUCCESS;
}
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, SArray* pTableIdList) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
goto _clean_memory;
}
SQuery *pQuery = calloc(1, sizeof(SQuery));
pQInfo->runtimeEnv.pQuery = pQuery;
int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutputCols = pQueryMsg->numOfOutputCols;
pQuery->numOfCols = numOfCols;
pQuery->numOfOutputCols = numOfOutputCols;
pQuery->limit.limit = pQueryMsg->limit;
pQuery->limit.offset = pQueryMsg->offset;
pQuery->order.order = pQueryMsg->order;
pQuery->order.orderColId = pQueryMsg->orderColId;
pQuery->pSelectExpr = pExprs;
pQuery->pGroupbyExpr = pGroupbyExpr;
pQuery->intervalTime = pQueryMsg->intervalTime;
pQuery->slidingTime = pQueryMsg->slidingTime;
pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
pQuery->interpoType = pQueryMsg->interpoType;
pQuery->colList = calloc(1, sizeof(SSingleColumnFilterInfo) * numOfCols);
if (pQuery->colList == NULL) {
goto _clean_memory;
}
for (int16_t i = 0; i < numOfCols; ++i) {
pQuery->colList[i].info = pQueryMsg->colList[i];
// SColumnInfo *pColInfo = &pQuery->colList[i].data;
// pColInfo->filters = NULL;
// if (colList[i].numOfFilters > 0) {
// pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo));
//
// for (int32_t j = 0; j < colList[i].numOfFilters; ++j) {
// tscColumnFilterInfoCopy(&pColInfo->filters[j], &colList[i].filters[j]);
// }
// } else {
// pQuery->colList[i].data.filters = NULL;
// }
}
// calculate the result row size
for (int16_t col = 0; col < numOfOutputCols; ++col) {
assert(pExprs[col].resBytes > 0);
pQuery->rowSize += pExprs[col].resBytes;
}
int32_t ret = vnodeCreateFilterInfo(pQInfo, pQuery);
if (ret != TSDB_CODE_SUCCESS) {
goto _clean_memory;
}
// prepare the result buffer
pQuery->sdata = (SData **)calloc(pQuery->numOfOutputCols, sizeof(SData *));
if (pQuery->sdata == NULL) {
goto _clean_memory;
}
// set the output buffer capacity
pQuery->capacity = 4096;
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
assert(pExprs[col].interResBytes >= pExprs[col].resBytes);
// allocate additional memory for interResults that are usually larger then final results
size_t size = (pQuery->capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData);
pQuery->sdata[col] = (SData *)calloc(1, size);
if (pQuery->sdata[col] == NULL) {
goto _clean_memory;
}
}
if (pQuery->interpoType != TSDB_INTERPO_NONE) {
pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutputCols);
if (pQuery->defaultVal == NULL) {
goto _clean_memory;
}
// the first column is the timestamp
memcpy(pQuery->defaultVal, (char *)pQueryMsg->defaultVal, pQuery->numOfOutputCols * sizeof(int64_t));
}
// to make sure third party won't overwrite this structure
pQInfo->signature = (uint64_t)pQInfo;
pQInfo->pTableIdList = pTableIdList;
pQuery->pos = -1;
// dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo);
return pQInfo;
_clean_memory:
tfree(pQuery->defaultVal);
if (pQuery->sdata != NULL) {
for (int16_t col = 0; col < pQuery->numOfOutputCols; ++col) {
tfree(pQuery->sdata[col]);
}
}
tfree(pQuery->sdata);
tfree(pQuery->pFilterInfo);
tfree(pQuery->colList);
tfree(pExprs);
tfree(pGroupbyExpr);
tfree(pQInfo);
return NULL;
}
bool isQInfoValid(void *param) {
SQInfo *pQInfo = (SQInfo *)param;
if (pQInfo == NULL) {
return false;
}
/*
* pQInfo->signature may be changed by another thread, so we assign value of signature
* into local variable, then compare by using local variable
*/
uint64_t sig = pQInfo->signature;
return (sig == (uint64_t)pQInfo);
}
void vnodeFreeQInfo(SQInfo* pQInfo, bool decQueryRef) {
if (!isQInfoValid(pQInfo)) {
return;
}
pQInfo->killed = 1;
dTrace("QInfo:%p start to free SQInfo", pQInfo);
if (decQueryRef) {
vnodeDecMeterRefcnt(pQInfo);
}
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
for (int col = 0; col < pQuery->numOfOutputCols; ++col) {
tfree(pQuery->sdata[col]);
}
// for (int col = 0; col < pQuery->numOfCols; ++col) {
// vnodeFreeColumnInfo(&pQuery->colList[col].data);
// }
//
// if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// tfree(pQuery->tsData);
// }
sem_destroy(&(pQInfo->dataReady));
vnodeQueryFreeQInfoEx(pQInfo);
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
if (pColFilter->numOfFilters > 0) {
tfree(pColFilter->pFilters);
}
}
tfree(pQuery->pFilterInfo);
tfree(pQuery->colList);
tfree(pQuery->sdata);
if (pQuery->pSelectExpr != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
if (pBinExprInfo->numOfCols > 0) {
tfree(pBinExprInfo->pReqColumns);
tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL);
}
}
tfree(pQuery->pSelectExpr);
}
if (pQuery->defaultVal != NULL) {
tfree(pQuery->defaultVal);
}
tfree(pQuery->pGroupbyExpr);
tfree(pQuery);
// dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId);
//destroy signature, in order to avoid the query process pass the object safety check
memset(pQInfo, 0, sizeof(SQInfo));
tfree(pQInfo);
}
static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SArray* pTableIdList, SQInfo **pQInfo) {
int32_t code = TSDB_CODE_SUCCESS;
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
if (pQInfo == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error;
}
SQuery* pQuery = (*pQInfo)->runtimeEnv.pQuery;
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
// STableIdInfo **pTableIdList = (STableIdInfo **)pQueryMsg->pSidExtInfo;
// if (pTableIdList != NULL && pTableIdList[0]->key > 0) {
// pQuery->window.skey = pTableIdList[0]->key;
// } else {
pQuery->window.skey = pQueryMsg->window.skey;
pQuery->window.ekey = pQueryMsg->window.ekey;
pQuery->lastKey = pQuery->window.skey;
if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) {
// dError("QInfo:%p vid:%d sid:%d meterId:%s, init dataReady sem failed, reason:%s", pQInfo, pMeterObj->vnode,
// pMeterObj->sid, pMeterObj->meterId, strerror(errno));
code = TSDB_CODE_APP_ERROR;
goto _error;
}
vnodeParametersSafetyCheck(pQuery);
STSBuf *pTSBuf = NULL;
if (pQueryMsg->tsLen > 0) { // open new file to save the result
char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset;
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder);
tsBufResetPos(pTSBuf);
tsBufNextPos(pTSBuf);
}
if ((code = vnodeQueryTablePrepare(*pQInfo, pTSBuf)) != TSDB_CODE_SUCCESS) {
goto _error;
}
// if (pQInfo->over == 1) {
// vnodeAddRefCount(pQInfo); // for retrieve procedure
// return pQInfo;
// }
// dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
// pQInfo->refCount);
return code;
_error:
// table query ref will be decrease during error handling
vnodeFreeQInfo(*pQInfo, false);
return code;
}
int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
assert(pQueryTableMsg != NULL);
int32_t code = TSDB_CODE_SUCCESS;
SArray* pTableIdList = NULL;
if ((code = convertQueryTableMsg(pQueryTableMsg, &pTableIdList)) != TSDB_CODE_SUCCESS) {
return code;
}
if (pQueryTableMsg->numOfTables <= 0) {
dError("Invalid number of tables to query, numOfTables:%d", pQueryTableMsg->numOfTables);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
// todo check vnode status
if (pTableIdList == NULL || taosArrayGetSize(pTableIdList) == 0) {
dError("qmsg:%p, SQueryTableMsg wrong format", pQueryTableMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
SSqlFunctionExpr *pExprs = NULL;
if ((code = createSqlFunctionExprFromMsg(pQueryTableMsg, &pExprs)) != TSDB_CODE_SUCCESS) {
goto _query_over;
}
SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryTableMsg, &code);
if ((pGroupbyExpr == NULL && pQueryTableMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _query_over;
}
if (QUERY_IS_STABLE_QUERY(pQueryTableMsg->queryType)) {
// pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code);
} else {
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, pQInfo);
}
_query_over:
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pTableIdList);
}
// if failed to add ref for all meters in this query, abort current query
// if (code != TSDB_CODE_SUCCESS) {
// vnodeDecQueryRefCount(pQueryTableMsg, pMeterObjList, incNumber);
// }
//
// tfree(pQueryTableMsg->pSqlFuncExprs);
// tfree(pMeterObjList);
// ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle);
//
// tfree(pQueryTableMsg->pSidExtInfo);
// for(int32_t i = 0; i < pQueryTableMsg->numOfCols; ++i) {
// vnodeFreeColumnInfo(&pQueryTableMsg->colList[i]);
// }
//
// atomic_fetch_add_32(&vnodeSelectReqNum, 1);
return TSDB_CODE_SUCCESS;
}
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize) {
if (pQInfo == NULL || !isQInfoValid(pQInfo)) {
return TSDB_CODE_INVALID_QHANDLE;
}
if (pQInfo->killed) {
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
if (pQInfo->code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_QUERY_CANCELLED;
} else { // in case of not TSDB_CODE_SUCCESS, return the code to client
return abs(pQInfo->code);
}
}
sem_wait(&pQInfo->dataReady);
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
// *numOfRows = pQInfo->rec.pointsRead;
// *rowsize = pQuery->rowSize;
*numOfRows = 1;
// dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec,
// *rowsize, *numOfRows, pQInfo->code);
if (pQInfo->code < 0) { // less than 0 means there are error existed.
return -pQInfo->code;
}
}
......@@ -287,7 +287,7 @@ void rpcClose(void *param) {
(*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList[i].user[0]) {
if (pRpc->connList && pRpc->connList[i].user[0]) {
rpcCloseConn((void *)(pRpc->connList + i));
}
}
......@@ -495,9 +495,10 @@ static void rpcCloseConn(void *thandle) {
SRpcConn *pConn = (SRpcConn *)thandle;
SRpcInfo *pRpc = pConn->pRpc;
if (pConn->user[0] == 0) return;
rpcLockConn(pConn);
if (pConn->user[0]) {
pConn->user[0] = 0;
if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);
......@@ -522,7 +523,6 @@ static void rpcCloseConn(void *thandle) {
pConn->pContext = NULL;
tTrace("%s %p, rpc connection is closed", pRpc->label, pConn);
}
rpcUnlockConn(pConn);
}
......
......@@ -16,3 +16,51 @@
#include "os.h"
#include "tsdb.h"
tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
}
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
return false;
}
SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
}
int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) {
}
SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) {
}
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order) {
}
int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) {
}
tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) {
return NULL;
}
SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) {
}
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {
}
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {
}
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册