提交 e49bd8a7 编写于 作者: H hjxilinx

[td-32] fix bugs in returning data to client

上级 09962c00
...@@ -417,6 +417,10 @@ static void count_function(SQLFunctionCtx *pCtx) { ...@@ -417,6 +417,10 @@ static void count_function(SQLFunctionCtx *pCtx) {
numOfElem += 1; numOfElem += 1;
} }
} else { } else {
/*
* when counting on the primary time stamp column and no statistics data is provided,
* simple use the size value
*/
numOfElem = pCtx->size; numOfElem = pCtx->size;
} }
} }
......
...@@ -659,14 +659,18 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -659,14 +659,18 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscError("%p failed to malloc for query msg", pSql); tscError("%p failed to malloc for query msg", pSql);
return -1; return -1;
} }
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
char *pStart = pCmd->payload + tsRpcHeadSize;
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
if (pQueryInfo->colList.numOfCols <= 0) {
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
return -1;
}
char *pStart = pCmd->payload + tsRpcHeadSize;
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart; SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
...@@ -675,11 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -675,11 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
numOfTables = 1; numOfTables = 1;
pQueryMsg->head.vgId = htonl(pTableMeta->vgId);
pQueryMsg->uid = pTableMeta->uid;
pQueryMsg->numOfTagsCols = 0;
pQueryMsg->vgId = htonl(pTableMeta->vgId);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query on super table } else { // query on super table
if (pTableMetaInfo->vnodeIndex < 0) { if (pTableMetaInfo->vnodeIndex < 0) {
...@@ -697,11 +697,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -697,11 +697,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables); tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
pQueryMsg->vgId = htons(vnodeId); pQueryMsg->head.vgId = htons(vnodeId);
} }
pQueryMsg->numOfTables = htonl(numOfTables); pQueryMsg->numOfTables = htonl(numOfTables);
pQueryMsg->numOfTagsCols = htons(pTableMetaInfo->numOfTags);
if (pQueryInfo->order.order == TSQL_SO_ASC) { if (pQueryInfo->order.order == TSQL_SO_ASC) {
pQueryMsg->window.skey = htobe64(pQueryInfo->stime); pQueryMsg->window.skey = htobe64(pQueryInfo->stime);
...@@ -711,24 +710,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -711,24 +710,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->window.ekey = htobe64(pQueryInfo->stime); pQueryMsg->window.ekey = htobe64(pQueryInfo->stime);
} }
pQueryMsg->order = htons(pQueryInfo->order.order); pQueryMsg->order = htons(pQueryInfo->order.order);
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
pQueryMsg->interpoType = htons(pQueryInfo->interpoType); pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols);
if (pQueryInfo->colList.numOfCols <= 0) {
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
return -1;
}
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit; pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
if (pQueryInfo->intervalTime < 0) { if (pQueryInfo->intervalTime < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime); tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
...@@ -776,7 +766,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -776,7 +766,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->colList[i].colId = htons(pColSchema->colId); pQueryMsg->colList[i].colId = htons(pColSchema->colId);
pQueryMsg->colList[i].bytes = htons(pColSchema->bytes); pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
pQueryMsg->colList[i].type = htons(pColSchema->type); pQueryMsg->colList[i].type = htons(pColSchema->type);
pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters); pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
// append the filter information after the basic column information // append the filter information after the basic column information
...@@ -824,11 +814,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -824,11 +814,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return -1; return -1;
} }
pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId); pSqlFuncExpr->colInfo.colId = htons(pExpr->colInfo.colId);
pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx); pSqlFuncExpr->colInfo.colIdx = htons(pExpr->colInfo.colIdx);
pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag); pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);
pSqlFuncExpr->functionId = htons(pExpr->functionId); pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams); pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pMsg += sizeof(SSqlFuncExprMsg); pMsg += sizeof(SSqlFuncExprMsg);
...@@ -866,25 +856,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -866,25 +856,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->colNameLen = htonl(len); pQueryMsg->colNameLen = htonl(len);
// serialize the table info (sid, uid, tags) // serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vgId), pMsg); pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->head.vgId), pMsg);
// only include the required tag column schema. If a tag is not required, it won't be sent to vnode
if (pTableMetaInfo->numOfTags > 0) {
// always transfer tag schema to vnode if exists
SSchema *pTagSchema = tscGetTableTagSchema(pTableMeta);
for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
if (pTableMetaInfo->tagColumnIndex[j] == TSDB_TBNAME_COLUMN_INDEX) {
SSchema tbSchema = {
.bytes = TSDB_TABLE_NAME_LEN, .colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY};
memcpy(pMsg, &tbSchema, sizeof(SSchema));
} else {
memcpy(pMsg, &pTagSchema[pTableMetaInfo->tagColumnIndex[j]], sizeof(SSchema));
}
pMsg += sizeof(SSchema);
}
}
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
if (pGroupbyExpr->numOfGroupCols != 0) { if (pGroupbyExpr->numOfGroupCols != 0) {
...@@ -948,8 +920,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -948,8 +920,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pQueryMsg->contLen = htonl(msgLen); pQueryMsg->head.contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2358,7 +2329,6 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { ...@@ -2358,7 +2329,6 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
} }
pRes->row = 0; pRes->row = 0;
tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset); tscTrace("%p numOfRows:%d, offset:%d", pSql, pRes->numOfRows, pRes->offset);
return 0; return 0;
......
...@@ -284,16 +284,10 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { ...@@ -284,16 +284,10 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
//todo free qinfo //todo free qinfo
} else { } else {
contLen = 100; SRetrieveTableRsp* pRsp = NULL;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); int32_t code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
pRsp->numOfRows = htonl(1); //todo check code
pRsp->precision = htons(0);
pRsp->offset = htobe64(0);
pRsp->useconds = htobe64(0);
// todo set the data
*(int64_t*) pRsp->data = 1000;
rpcRsp = (SRpcMsg) { rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle, .handle = pMsg->rpcMsg.handle,
......
此差异已折叠。
...@@ -142,7 +142,7 @@ typedef struct SQuery { ...@@ -142,7 +142,7 @@ typedef struct SQuery {
SResultRec rec; SResultRec rec;
int32_t pos; int32_t pos;
int64_t pointsOffset; // the number of points offset to save read data int64_t pointsOffset; // the number of points offset to save read data
SData** sdata; SData** sdata;
int32_t capacity; int32_t capacity;
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
} SQuery; } SQuery;
...@@ -171,11 +171,10 @@ typedef struct SQueryRuntimeEnv { ...@@ -171,11 +171,10 @@ typedef struct SQueryRuntimeEnv {
typedef struct SQInfo { typedef struct SQInfo {
uint64_t signature; uint64_t signature;
void* pVnode; void* pVnode;
TSKEY startTime; TSKEY startTime;
int64_t elapsedTime; int64_t elapsedTime;
SResultRec rec; SResultRec rec;
int32_t pointsReturned;
int32_t pointsInterpo; int32_t pointsInterpo;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
int32_t killed; // denotes if current query is killed int32_t killed; // denotes if current query is killed
...@@ -184,7 +183,6 @@ typedef struct SQInfo { ...@@ -184,7 +183,6 @@ typedef struct SQInfo {
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx; int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */ int32_t offset; /* offset in group result set of subgroup */
// tSidSet* pSidSet;
T_REF_DECLARE() T_REF_DECLARE()
/* /*
...@@ -226,7 +224,12 @@ void qSuperTableQuery(void* pReadMsg); ...@@ -226,7 +224,12 @@ void qSuperTableQuery(void* pReadMsg);
*/ */
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize); int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize);
/**
//int32_t qBuildQueryResult(SQInfo* pQInfo, void* pBuf); *
* @param pQInfo
* @param pRsp
* @return
*/
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
...@@ -219,7 +219,7 @@ typedef struct SQLAggFuncElem { ...@@ -219,7 +219,7 @@ typedef struct SQLAggFuncElem {
void (*distSecondaryMergeFunc)(SQLFunctionCtx *pCtx); void (*distSecondaryMergeFunc)(SQLFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, int32_t blockStatus); int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
} SQLAggFuncElem; } SQLAggFuncElem;
typedef struct SPatternCompareInfo { typedef struct SPatternCompareInfo {
......
此差异已折叠。
...@@ -162,6 +162,8 @@ void taosDeleteStrHash(void *handle, char *string) { ...@@ -162,6 +162,8 @@ void taosDeleteStrHash(void *handle, char *string) {
if (pObj == NULL || pObj->maxSessions == 0) return; if (pObj == NULL || pObj->maxSessions == 0) return;
if (string == NULL || string[0] == 0) return; if (string == NULL || string[0] == 0) return;
return;
hash = (*(pObj->hashFp))(pObj, string); hash = (*(pObj->hashFp))(pObj, string);
pthread_mutex_lock(&pObj->mutex); pthread_mutex_lock(&pObj->mutex);
......
...@@ -109,7 +109,6 @@ typedef struct STsdbQueryHandle { ...@@ -109,7 +109,6 @@ typedef struct STsdbQueryHandle {
uint16_t flag; // denotes reversed scan of data or not uint16_t flag; // denotes reversed scan of data or not
int16_t order; int16_t order;
STimeWindow window; // the primary query time window that applies to all queries STimeWindow window; // the primary query time window that applies to all queries
TSKEY lastKey;
int32_t blockBufferSize; int32_t blockBufferSize;
SCompBlock *pBlock; SCompBlock *pBlock;
int32_t numOfBlocks; int32_t numOfBlocks;
...@@ -264,8 +263,20 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond ...@@ -264,8 +263,20 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
pQueryHandle->pColumns = pColumnInfo; pQueryHandle->pColumns = pColumnInfo;
pQueryHandle->loadDataAfterSeek = false; pQueryHandle->loadDataAfterSeek = false;
pQueryHandle->isFirstSlot = true; pQueryHandle->isFirstSlot = true;
pQueryHandle->lastKey = pQueryHandle->window.skey; // ascending query // only support table query
assert(taosArrayGetSize(idList) == 1);
pQueryHandle->pTableQueryInfo = calloc(1, sizeof(STableQueryRec));
STableQueryRec* pTableQRec = pQueryHandle->pTableQueryInfo;
pTableQRec->lastKey = pQueryHandle->window.skey;
STableIdInfo* idInfo = taosArrayGet(pQueryHandle->pTableIdList, 0);
STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid};
STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pQueryHandle->pTsdb), tableId);
pTableQRec->pTableObj = pTable;
// malloc buffer in order to load data from file // malloc buffer in order to load data from file
int32_t numOfCols = taosArrayGetSize(pColumnInfo); int32_t numOfCols = taosArrayGetSize(pColumnInfo);
...@@ -295,14 +306,21 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond ...@@ -295,14 +306,21 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
return (tsdb_query_handle_t)pQueryHandle; return (tsdb_query_handle_t)pQueryHandle;
} }
static int32_t next = 1;
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
if (next == 0) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STable *pTable = pHandle->pTableQueryInfo->pTableObj;
// no data in cache, abort
if (pTable->mem == NULL && pTable->imem == NULL) {
return false; return false;
} else {
next = 0;
return true;
} }
// all data in mem are checked already.
if (pHandle->pTableQueryInfo->lastKey > pTable->mem->keyLast) {
return false;
}
return true;
} }
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
...@@ -344,9 +362,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { ...@@ -344,9 +362,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0); STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0);
STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid}; STable *pTable = pHandle->pTableQueryInfo->pTableObj;
STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pHandle->pTsdb), tableId);
assert(pTable != NULL);
TSKEY skey = 0, ekey = 0; TSKEY skey = 0, ekey = 0;
int32_t rows = 0; int32_t rows = 0;
...@@ -357,8 +373,8 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { ...@@ -357,8 +373,8 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
} }
SDataBlockInfo blockInfo = { SDataBlockInfo blockInfo = {
.uid = tableId.uid, .uid = idInfo->uid,
.sid = tableId.tid, .sid = idInfo->sid,
.size = rows, .size = rows,
.window = {.skey = skey, .ekey = ekey} .window = {.skey = skey, .ekey = ekey}
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册