未验证 提交 424d9bfd 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1397 from taosdata/liaohj_2

Liaohj 2
...@@ -85,7 +85,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, ...@@ -85,7 +85,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
STableDataBlocks** dataBlocks); STableDataBlocks** dataBlocks);
SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx); SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx);
STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
/** /**
* *
......
...@@ -318,15 +318,16 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) { ...@@ -318,15 +318,16 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
SVnodeSidList *pSidList = (SVnodeSidList *)((char *)pMetricMeta + pMetricMeta->list[i]); SVnodeSidList *pSidList = (SVnodeSidList *)((char *)pMetricMeta + pMetricMeta->list[i]);
for (int32_t j = 0; j < pSidList->numOfSids; ++j) { for (int32_t j = 0; j < pSidList->numOfSids; ++j) {
STableSidExtInfo *pSidExt = tscGetMeterSidInfo(pSidList, j); STableIdInfo *pSidExt = tscGetMeterSidInfo(pSidList, j);
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) { for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
SColIndexEx *pColIndex = &tscSqlExprGet(pQueryInfo, k)->colInfo; SColIndexEx *pColIndex = &tscSqlExprGet(pQueryInfo, k)->colInfo;
int16_t offsetId = pColIndex->colIdx; int16_t offsetId = pColIndex->colIdx;
assert((pColIndex->flag & TSDB_COL_TAG) != 0); assert((pColIndex->flag & TSDB_COL_TAG) != 0);
assert(0);
char * val = pSidExt->tags + vOffset[offsetId];
char * val = NULL;//pSidExt->tags + vOffset[offsetId];
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, k); TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, k);
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, k) * totalNumOfResults + pField->bytes * rowIdx, val, memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, k) * totalNumOfResults + pField->bytes * rowIdx, val,
......
...@@ -598,7 +598,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { ...@@ -598,7 +598,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableSidExtInfo)) * pVnodeSidList->numOfSids; int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids;
int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg); int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE; int32_t size = meterInfoSize + outputColumnSize + srcColListSize + exprSize + MIN_QUERY_MSG_PKT_SIZE;
...@@ -609,37 +609,34 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { ...@@ -609,37 +609,34 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
return size; return size;
} }
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vnodeId, char *pMsg) { static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vgId, char *pMsg) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
tscTrace("%p vid:%d, query on %d meters", pSql, vnodeId, numOfTables); tscTrace("%p vgId:%d, query on %d tables", pSql, vgId, numOfTables);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid); tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid);
#endif #endif
STableSidExtInfo *pTableMetaInfo = (STableSidExtInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableMetaInfo->sid = htonl(pTableMeta->sid); pTableIdInfo->sid = htonl(pTableMeta->sid);
pTableMetaInfo->uid = htobe64(pTableMeta->uid); pTableIdInfo->uid = htobe64(pTableMeta->uid);
pTableMetaInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pMsg += sizeof(STableSidExtInfo); pMsg += sizeof(STableIdInfo);
} else { } else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableSidExtInfo *pTableMetaInfo = (STableSidExtInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
STableSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i); STableIdInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
pTableMetaInfo->sid = htonl(pQueryMeterInfo->sid); pTableIdInfo->sid = htonl(pQueryMeterInfo->sid);
pTableMetaInfo->uid = htobe64(pQueryMeterInfo->uid); pTableIdInfo->uid = htobe64(pQueryMeterInfo->uid);
pTableMetaInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid)); pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid));
pMsg += sizeof(STableSidExtInfo); pMsg += sizeof(STableIdInfo);
memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen);
pMsg += pMetricMeta->tagLen;
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%" PRId64, pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid); tscTrace("%p sid:%d, uid:%" PRId64, pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid);
...@@ -679,6 +676,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -679,6 +676,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->uid = pTableMeta->uid; pQueryMsg->uid = pTableMeta->uid;
pQueryMsg->numOfTagsCols = 0; pQueryMsg->numOfTagsCols = 0;
pQueryMsg->vgId = htonl(pTableMeta->vgid);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query on super table } else { // query on super table
if (pTableMetaInfo->vnodeIndex < 0) { if (pTableMetaInfo->vnodeIndex < 0) {
...@@ -696,7 +694,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -696,7 +694,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables); tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
pQueryMsg->vnode = htons(vnodeId); pQueryMsg->vgId = htons(vnodeId);
} }
pQueryMsg->numOfTables = htonl(numOfTables); pQueryMsg->numOfTables = htonl(numOfTables);
...@@ -764,14 +762,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -764,14 +762,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i); SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
SSchema * pColSchema = &pSchema[pCol->colIndex.columnIndex]; SSchema * pColSchema = &pSchema[pCol->colIndex.columnIndex];
if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL || // if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
pColSchema->type > TSDB_DATA_TYPE_NCHAR) { // pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql, // tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql,
htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex, // htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex,
pColSchema->name); // pColSchema->name);
//
return -1; // 0 means build msg failed // return -1; // 0 means build msg failed
} // }
pQueryMsg->colList[i].colId = htons(pColSchema->colId); pQueryMsg->colList[i].colId = htons(pColSchema->colId);
pQueryMsg->colList[i].bytes = htons(pColSchema->bytes); pQueryMsg->colList[i].bytes = htons(pColSchema->bytes);
...@@ -865,7 +863,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -865,7 +863,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->colNameLen = htonl(len); pQueryMsg->colNameLen = htonl(len);
// serialize the table info (sid, uid, tags) // serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vnode), pMsg); pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->vgId), pMsg);
// only include the required tag column schema. If a tag is not required, it won't be sent to vnode // only include the required tag column schema. If a tag is not required, it won't be sent to vnode
if (pTableMetaInfo->numOfTags > 0) { if (pTableMetaInfo->numOfTags > 0) {
...@@ -946,7 +944,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -946,7 +944,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pQueryMsg->contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2067,7 +2067,7 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) { ...@@ -2067,7 +2067,7 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
pMeta->numOfVnodes = htonl(pMeta->numOfVnodes); pMeta->numOfVnodes = htonl(pMeta->numOfVnodes);
pMeta->tagLen = htons(pMeta->tagLen); pMeta->tagLen = htons(pMeta->tagLen);
size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfTables * sizeof(STableSidExtInfo *); size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfTables * sizeof(STableIdInfo *);
char *pBuf = calloc(1, size); char *pBuf = calloc(1, size);
if (pBuf == NULL) { if (pBuf == NULL) {
...@@ -2093,16 +2093,16 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) { ...@@ -2093,16 +2093,16 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
tscTrace("%p metricmeta:vid:%d,numOfTables:%d", pSql, i, pLists->numOfSids); tscTrace("%p metricmeta:vid:%d,numOfTables:%d", pSql, i, pLists->numOfSids);
pBuf += sizeof(SVnodeSidList) + sizeof(STableSidExtInfo *) * pSidLists->numOfSids; pBuf += sizeof(SVnodeSidList) + sizeof(STableIdInfo *) * pSidLists->numOfSids;
rsp += sizeof(SVnodeSidList); rsp += sizeof(SVnodeSidList);
size_t elemSize = sizeof(STableSidExtInfo) + pNewMetricMeta->tagLen; size_t elemSize = sizeof(STableIdInfo) + pNewMetricMeta->tagLen;
for (int32_t j = 0; j < pSidLists->numOfSids; ++j) { for (int32_t j = 0; j < pSidLists->numOfSids; ++j) {
pLists->pSidExtInfoList[j] = pBuf - (char *)pLists; pLists->pSidExtInfoList[j] = pBuf - (char *)pLists;
memcpy(pBuf, rsp, elemSize); memcpy(pBuf, rsp, elemSize);
((STableSidExtInfo *)pBuf)->uid = htobe64(((STableSidExtInfo *)pBuf)->uid); ((STableIdInfo *)pBuf)->uid = htobe64(((STableIdInfo *)pBuf)->uid);
((STableSidExtInfo *)pBuf)->sid = htonl(((STableSidExtInfo *)pBuf)->sid); ((STableIdInfo *)pBuf)->sid = htonl(((STableIdInfo *)pBuf)->sid);
rsp += elemSize; rsp += elemSize;
pBuf += elemSize; pBuf += elemSize;
......
...@@ -202,7 +202,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { ...@@ -202,7 +202,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) { for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
STableSidExtInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j); STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
int64_t uid = pTableMetaInfo->uid; int64_t uid = pTableMetaInfo->uid;
progress[numOfTables].uid = uid; progress[numOfTables].uid = uid;
progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid); progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
......
...@@ -191,7 +191,7 @@ SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx ...@@ -191,7 +191,7 @@ SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx
return (SVnodeSidList*)(pMetricmeta->list[vnodeIdx] + (char*)pMetricmeta); return (SVnodeSidList*)(pMetricmeta->list[vnodeIdx] + (char*)pMetricmeta);
} }
STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) { STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
if (pSidList == NULL) { if (pSidList == NULL) {
tscError("illegal sidlist"); tscError("illegal sidlist");
return 0; return 0;
...@@ -206,7 +206,7 @@ STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) { ...@@ -206,7 +206,7 @@ STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
assert(pSidList->pSidExtInfoList[idx] >= 0); assert(pSidList->pSidExtInfoList[idx] >= 0);
return (STableSidExtInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList); return (STableIdInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList);
} }
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
......
...@@ -15,13 +15,16 @@ ...@@ -15,13 +15,16 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "dnodeRead.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "queryExecutor.h"
typedef struct { typedef struct {
int32_t code; int32_t code;
...@@ -88,7 +91,7 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -88,7 +91,7 @@ void dnodeRead(SRpcMsg *pMsg) {
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = 1; //htonl(pHead->vgId); pHead->vgId = 1;//htonl(pHead->vgId);
pHead->contLen = pMsg->contLen; //htonl(pHead->contLen); pHead->contLen = pMsg->contLen; //htonl(pHead->contLen);
void *pVnode = dnodeGetVnode(pHead->vgId); void *pVnode = dnodeGetVnode(pHead->vgId);
...@@ -223,20 +226,26 @@ static void dnodeProcessReadResult(SReadMsg *pRead) { ...@@ -223,20 +226,26 @@ static void dnodeProcessReadResult(SReadMsg *pRead) {
} }
static void dnodeProcessQueryMsg(SReadMsg *pMsg) { static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
void *pQInfo = (void*)100; SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
dTrace("query msg is disposed, qInfo:%p", pQInfo);
SQInfo* pQInfo = NULL;
int32_t code = qCreateQueryInfo(pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = 0; pRsp->code = code;
pRsp->qhandle = htobe64((uint64_t) (pQInfo)); pRsp->qhandle = htobe64((uint64_t) (pQInfo));
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle, .handle = pMsg->rpcMsg.handle,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(SQueryTableRsp), .contLen = sizeof(SQueryTableRsp),
.code = 0, .code = code,
.msgType = 0 .msgType = 0
}; };
// do execute query
qTableQuery(pQInfo);
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -245,21 +254,51 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { ...@@ -245,21 +254,51 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
void *pQInfo = htobe64(pRetrieve->qhandle); void *pQInfo = htobe64(pRetrieve->qhandle);
dTrace("retrieve msg is disposed, qInfo:%p", pQInfo); dTrace("retrieve msg is disposed, qInfo:%p", pQInfo);
assert(pQInfo != NULL); int32_t rowSize = 0;
int32_t contLen = 100; int32_t numOfRows = 0;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *) rpcMallocCont(contLen); int32_t contLen = 0;
pRsp->numOfRows = 0;
pRsp->precision = 0; SRpcMsg rpcRsp = {0};
pRsp->offset = 0;
pRsp->useconds = 0; int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rpcRsp = { contLen = sizeof(SRetrieveTableRsp);
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp, SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
.contLen = contLen, pRsp->numOfRows = 0;
.code = 0, pRsp->precision = 0;
.msgType = 0 pRsp->offset = 0;
}; pRsp->useconds = 0;
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
//todo free qinfo
} else {
contLen = 100;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0;
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
*(int64_t*) pRsp->data = 1000;
rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
}
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -434,15 +434,11 @@ typedef struct SColumnInfo { ...@@ -434,15 +434,11 @@ typedef struct SColumnInfo {
SColumnFilterInfo *filters; SColumnFilterInfo *filters;
} SColumnInfo; } SColumnInfo;
/* typedef struct STableIdInfo {
* enable vnode to understand how to group several tables with different tag;
*/
typedef struct STableSidExtInfo {
int32_t sid; int32_t sid;
int64_t uid; int64_t uid;
TSKEY key; // key for subscription TSKEY key; // last accessed ts, for subscription
char tags[]; } STableIdInfo;
} STableSidExtInfo;
typedef struct STimeWindow { typedef struct STimeWindow {
TSKEY skey; TSKEY skey;
...@@ -455,10 +451,10 @@ typedef struct STimeWindow { ...@@ -455,10 +451,10 @@ typedef struct STimeWindow {
* the outputCols will be 3 while the numOfCols is 1. * the outputCols will be 3 while the numOfCols is 1.
*/ */
typedef struct { typedef struct {
int16_t vnode; int32_t contLen; // msg header
int16_t vgId;
int32_t numOfTables; int32_t numOfTables;
uint64_t pSidExtInfo; // table id & tag info ptr, in windows pointer may
uint64_t uid; uint64_t uid;
STimeWindow window; STimeWindow window;
...@@ -504,7 +500,7 @@ typedef struct { ...@@ -504,7 +500,7 @@ typedef struct {
} SQueryTableMsg; } SQueryTableMsg;
typedef struct { typedef struct {
char code; int32_t code;
uint64_t qhandle; uint64_t qhandle;
} SQueryTableRsp; } SQueryTableRsp;
...@@ -670,7 +666,7 @@ typedef struct { ...@@ -670,7 +666,7 @@ typedef struct {
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT]; SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int16_t index; // used locally int16_t index; // used locally
int32_t numOfSids; int32_t numOfSids;
int32_t pSidExtInfoList[]; // offset value of STableSidExtInfo int32_t pSidExtInfoList[]; // offset value of STableIdInfo
} SVnodeSidList; } SVnodeSidList;
typedef struct { typedef struct {
......
...@@ -11,5 +11,5 @@ INCLUDE_DIRECTORIES(inc) ...@@ -11,5 +11,5 @@ INCLUDE_DIRECTORIES(inc)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(query ${SRC}) ADD_LIBRARY(query ${SRC})
TARGET_LINK_LIBRARIES(query tutil m rt) TARGET_LINK_LIBRARIES(query tsdb tutil m rt)
ENDIF () ENDIF ()
\ No newline at end of file
...@@ -124,9 +124,8 @@ typedef struct tTagSchema { ...@@ -124,9 +124,8 @@ typedef struct tTagSchema {
typedef struct tSidSet { typedef struct tSidSet {
int32_t numOfSids; int32_t numOfSids;
int32_t numOfSubSet; int32_t numOfSubSet;
STableSidExtInfo **pSids; STableIdInfo **pTableIdList;
int32_t * starterPos; // position of each subgroup, generated according to int32_t * starterPos; // position of each subgroup, generated according to
SColumnModel *pColumnModel; SColumnModel *pColumnModel;
SColumnOrderInfo orderIdx; SColumnOrderInfo orderIdx;
} tSidSet; } tSidSet;
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "taosdef.h" #include "taosdef.h"
#include "tref.h" #include "tref.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tarray.h"
typedef struct SData { typedef struct SData {
int32_t num; int32_t num;
...@@ -39,7 +40,7 @@ enum { ...@@ -39,7 +40,7 @@ enum {
struct SColumnFilterElem; struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
typedef struct SSqlGroupbyExpr { typedef struct SSqlGroupbyExpr {
int16_t tableIndex; int16_t tableIndex;
...@@ -142,7 +143,7 @@ typedef struct SQuery { ...@@ -142,7 +143,7 @@ typedef struct SQuery {
int32_t pos; int32_t pos;
int64_t pointsOffset; // the number of points offset to save read data int64_t pointsOffset; // the number of points offset to save read data
SData** sdata; SData** sdata;
int32_t capacity;
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
} SQuery; } SQuery;
...@@ -152,7 +153,6 @@ typedef struct SQueryCostSummary { ...@@ -152,7 +153,6 @@ typedef struct SQueryCostSummary {
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery; SQuery* pQuery;
void* pTabObj;
SData** pInterpoBuf; SData** pInterpoBuf;
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage; int16_t numOfRowsPerPage;
...@@ -174,16 +174,17 @@ typedef struct SQInfo { ...@@ -174,16 +174,17 @@ typedef struct SQInfo {
TSKEY startTime; TSKEY startTime;
int64_t elapsedTime; int64_t elapsedTime;
SResultRec rec; SResultRec rec;
int pointsReturned; int32_t pointsReturned;
int pointsInterpo; int32_t pointsInterpo;
int code; // error code to returned to client int32_t code; // error code to returned to client
int32_t killed; // denotes if current query is killed
sem_t dataReady; sem_t dataReady;
SHashObj* pTableList; // table list SArray* pTableIdList; // table list
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx; int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */ int32_t offset; /* offset in group result set of subgroup */
tSidSet* pSidSet; // tSidSet* pSidSet;
T_REF_DECLARE() T_REF_DECLARE()
/* /*
* the query is executed position on which meter of the whole list. * the query is executed position on which meter of the whole list.
...@@ -204,13 +205,13 @@ typedef struct SQInfo { ...@@ -204,13 +205,13 @@ typedef struct SQInfo {
* @param pQInfo * @param pQInfo
* @return * @return
*/ */
int32_t qCreateQueryInfo(void* pReadMsg, SQInfo** pQInfo); int32_t qCreateQueryInfo(SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
/** /**
* query on single table * query on single table
* @param pReadMsg * @param pReadMsg
*/ */
void qTableQuery(void* pReadMsg); void qTableQuery(SQInfo* pQInfo);
/** /**
* query on super table * query on super table
...@@ -218,4 +219,13 @@ void qTableQuery(void* pReadMsg); ...@@ -218,4 +219,13 @@ void qTableQuery(void* pReadMsg);
*/ */
void qSuperTableQuery(void* pReadMsg); void qSuperTableQuery(void* pReadMsg);
/**
* wait for the query completed, and retrieve final results to client
* @param pQInfo
*/
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize);
//int32_t qBuildQueryResult(SQInfo* pQInfo, void* pBuf);
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
此差异已折叠。
...@@ -16,3 +16,51 @@ ...@@ -16,3 +16,51 @@
#include "os.h" #include "os.h"
#include "tsdb.h" #include "tsdb.h"
tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
}
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
return false;
}
SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
}
int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) {
}
SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) {
}
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order) {
}
int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) {
}
tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) {
return NULL;
}
SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) {
}
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {
}
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {
}
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册