提交 930f4f16 编写于 作者: S slguan

Merge branch '2.0' into refact/slguan

......@@ -417,6 +417,10 @@ static void count_function(SQLFunctionCtx *pCtx) {
numOfElem += 1;
} else {
* when counting on the primary time stamp column and no statistics data is provided,
* simple use the size value
numOfElem = pCtx->size;
......@@ -273,8 +273,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
pSql->retry = 0;
pRes->rspLen = 0;
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
} else {
......@@ -283,7 +283,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
assert(rpcMsg->msgType == pCmd->msgType + 1);
pRes->code = (int32_t)rpcMsg->code;
pRes->code = rpcMsg->code;
pRes->rspType = rpcMsg->msgType;
pRes->rspLen = rpcMsg->contLen;
......@@ -662,12 +662,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
char *pStart = pCmd->payload + tsRpcHeadSize;
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
if (pQueryInfo->colList.numOfCols <= 0) {
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
return -1;
char *pStart = pCmd->payload + tsRpcHeadSize;
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
int32_t msgLen = 0;
......@@ -675,11 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
numOfTables = 1;
pQueryMsg->uid = pTableMeta->uid;
pQueryMsg->numOfTagsCols = 0;
pQueryMsg->vgId = htonl(pTableMeta->vgId);
pQueryMsg->head.vgId = htonl(pTableMeta->vgId);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query on super table
if (pTableMetaInfo->vnodeIndex < 0) {
......@@ -697,11 +697,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
pQueryMsg->vgId = htons(vnodeId);
pQueryMsg->head.vgId = htons(vnodeId);
pQueryMsg->numOfTables = htonl(numOfTables);
pQueryMsg->numOfTagsCols = htons(pTableMetaInfo->numOfTags);
if (pQueryInfo->order.order == TSQL_SO_ASC) {
pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
......@@ -713,22 +712,13 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->order = htons(pQueryInfo->order.order);
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);
if (pQueryInfo->colList.numOfCols <= 0) {
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
return -1;
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
if (pQueryInfo->intervalTime < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
......@@ -866,25 +856,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->colNameLen = htonl(len);
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vgId), pMsg);
// only include the required tag column schema. If a tag is not required, it won't be sent to vnode
if (pTableMetaInfo->numOfTags > 0) {
// always transfer tag schema to vnode if exists
SSchema *pTagSchema = tscGetTableTagSchema(pTableMeta);
for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
if (pTableMetaInfo->tagColumnIndex[j] == TSDB_TBNAME_COLUMN_INDEX) {
SSchema tbSchema = {
memcpy(pMsg, &tbSchema, sizeof(SSchema));
} else {
memcpy(pMsg, &pTagSchema[pTableMetaInfo->tagColumnIndex[j]], sizeof(SSchema));
pMsg += sizeof(SSchema);
pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->head.vgId), pMsg);
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
if (pGroupbyExpr->numOfGroupCols != 0) {
......@@ -948,8 +920,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pQueryMsg->contLen = htonl(msgLen);
pQueryMsg->head.contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= size);
......@@ -2358,7 +2329,6 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes->row = 0;
tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset);
return 0;
......@@ -254,7 +254,7 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = htobe64(pRetrieve->qhandle);
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
......@@ -263,37 +263,18 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
int32_t contLen = 0;
SRpcMsg rpcRsp = {0};
SRetrieveTableRsp *pRsp = NULL;
int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
if (code != TSDB_CODE_SUCCESS) {
contLen = sizeof(SRetrieveTableRsp);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0;
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
//todo free qinfo
pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
} else {
contLen = 100;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = htonl(1);
pRsp->precision = htons(0);
pRsp->offset = htobe64(0);
pRsp->useconds = htobe64(0);
// todo set the data
*(int64_t*) pRsp->data = 1000;
// todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
......@@ -302,7 +283,6 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
.code = code,
.msgType = 0
......@@ -20,8 +20,8 @@
extern "C" {
#include <stdint.h>
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#include "taoserror.h"
......@@ -188,7 +188,7 @@ extern char *taosMsg[];
#pragma pack(push, 1)
//typedef struct {
// typedef struct {
// int32_t vnode;
// int32_t sid;
// int32_t sversion;
......@@ -206,7 +206,7 @@ typedef struct SMsgHead {
int32_t vgId;
} SMsgHead;
//typedef struct {
// typedef struct {
// SMsgDesc desc;
// SMsgHead header;
// int16_t import;
......@@ -229,8 +229,8 @@ typedef struct SSubmitBlk {
typedef struct SSubmitMsg {
SMsgHead header;
int32_t length;
int32_t compressed:2;
int32_t numOfBlocks:30;
int32_t compressed : 2;
int32_t numOfBlocks : 30;
SSubmitBlk blocks[];
} SSubmitMsg;
......@@ -258,7 +258,7 @@ typedef struct SSchema {
} SSchema;
typedef struct {
int32_t vnode; //the index of vnode
int32_t vnode; // the index of vnode
uint32_t ip;
} SVnodeDesc;
......@@ -472,47 +472,30 @@ typedef struct STimeWindow {
* the outputCols will be 3 while the numOfCols is 1.
typedef struct {
int32_t contLen; // msg header
int16_t vgId;
int32_t numOfTables;
uint64_t uid;
SMsgHead head;
STimeWindow window;
int32_t numOfTables;
int16_t order;
int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
int64_t intervalTime; // time interval for aggregation, in million second
int64_t intervalOffset; // start offset for interval query
int64_t slidingTime; // value for sliding window
// tag schema, used to parse tag information in pSidExtInfo
uint64_t pTagSchema;
int16_t numOfTagsCols; // required number of tags
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
int16_t tagLength; // tag length in current query
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
uint64_t groupbyTagIds;
int64_t limit;
int64_t offset;
int16_t queryType; // denote another query process
int16_t numOfOutputCols; // final output columns numbers
int16_t interpoType; // interpolate type
uint64_t defaultVal; // default value array list
int32_t colNameLen;
int64_t colNameList;
int64_t pSqlFuncExprs;
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers
......@@ -175,7 +175,6 @@ typedef struct SQInfo {
TSKEY startTime;
int64_t elapsedTime;
SResultRec rec;
int32_t pointsReturned;
int32_t pointsInterpo;
int32_t code; // error code to returned to client
int32_t killed; // denotes if current query is killed
......@@ -184,7 +183,6 @@ typedef struct SQInfo {
SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */
// tSidSet* pSidSet;
......@@ -226,7 +224,12 @@ void qSuperTableQuery(void* pReadMsg);
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize);
//int32_t qBuildQueryResult(SQInfo* pQInfo, void* pBuf);
* @param pQInfo
* @param pRsp
* @return
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
......@@ -219,7 +219,7 @@ typedef struct SQLAggFuncElem {
void (*distSecondaryMergeFunc)(SQLFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, int32_t blockStatus);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
} SQLAggFuncElem;
typedef struct SPatternCompareInfo {
......@@ -162,6 +162,8 @@ void taosDeleteStrHash(void *handle, char *string) {
if (pObj == NULL || pObj->maxSessions == 0) return;
if (string == NULL || string[0] == 0) return;
hash = (*(pObj->hashFp))(pObj, string);
......@@ -109,7 +109,6 @@ typedef struct STsdbQueryHandle {
uint16_t flag; // denotes reversed scan of data or not
int16_t order;
STimeWindow window; // the primary query time window that applies to all queries
TSKEY lastKey;
int32_t blockBufferSize;
SCompBlock *pBlock;
int32_t numOfBlocks;
......@@ -265,7 +264,19 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
pQueryHandle->loadDataAfterSeek = false;
pQueryHandle->isFirstSlot = true;
pQueryHandle->lastKey = pQueryHandle->window.skey; // ascending query
// only support table query
assert(taosArrayGetSize(idList) == 1);
pQueryHandle->pTableQueryInfo = calloc(1, sizeof(STableQueryRec));
STableQueryRec* pTableQRec = pQueryHandle->pTableQueryInfo;
pTableQRec->lastKey = pQueryHandle->window.skey;
STableIdInfo* idInfo = taosArrayGet(pQueryHandle->pTableIdList, 0);
STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid};
STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pQueryHandle->pTsdb), tableId);
pTableQRec->pTableObj = pTable;
// malloc buffer in order to load data from file
int32_t numOfCols = taosArrayGetSize(pColumnInfo);
......@@ -295,14 +306,21 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
return (tsdb_query_handle_t)pQueryHandle;
static int32_t next = 1;
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
if (next == 0) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STable *pTable = pHandle->pTableQueryInfo->pTableObj;
// no data in cache, abort
if (pTable->mem == NULL && pTable->imem == NULL) {
return false;
// all data in mem are checked already.
if (pHandle->pTableQueryInfo->lastKey > pTable->mem->keyLast) {
return false;
} else {
next = 0;
return true;
return true;
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
......@@ -344,9 +362,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0);
STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid};
STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pHandle->pTsdb), tableId);
assert(pTable != NULL);
STable *pTable = pHandle->pTableQueryInfo->pTableObj;
TSKEY skey = 0, ekey = 0;
int32_t rows = 0;
......@@ -357,12 +373,15 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
SDataBlockInfo blockInfo = {
.uid = tableId.uid,
.sid = tableId.tid,
.uid = idInfo->uid,
.sid = idInfo->sid,
.size = rows,
.window = {.skey = skey, .ekey = ekey}
// update the last key value
pHandle->pTableQueryInfo->lastKey = ekey + 1;
return blockInfo;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册