提交 c269b6d4 编写于 作者: H hjxilinx

[TD-32] fix error in retrieve data

上级 f629aee1
......@@ -106,7 +106,7 @@ bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryOnMetric(SSqlCmd* pCmd);
bool tscQueryMetricTags(SQueryInfo* pQueryInfo);
bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
......@@ -176,7 +176,7 @@ void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex);
SCond* tsGetSTableQueryCondPos(STagCond* pCond, uint64_t tableIndex);
void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
void tscTagCondCopy(STagCond* dest, const STagCond* src);
......@@ -207,7 +207,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* keyStr, uint64_t uid);
int tscGetMetricMeta(SSqlObj* pSql, int32_t clauseIndex);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
......
......@@ -70,8 +70,9 @@ typedef struct STableMeta {
typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
SSuperTableMeta *pMetricMeta; // metricmeta
// SSuperTableMeta *pMetricMeta; // metricmeta
SArray* vgroupIdList;
/*
* 1. keep the vnode index during the multi-vnode super table projection query
* 2. keep the vnode index for multi-vnode insertion
......
......@@ -459,7 +459,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
code = tscGetMetricMeta(pSql, 0);
code = tscGetSTableVgroupInfo(pSql, 0);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -489,7 +489,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
code = tscGetMetricMeta(pSql, pCmd->clauseIndex);
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......
......@@ -286,6 +286,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
// todo add order support
static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
#if 0
// the result structure has been completed in sql parse, so we
// only need to reorganize the results in the column format
SSqlCmd * pCmd = &pSql->cmd;
......@@ -337,6 +338,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
}
}
#endif
return 0;
}
......@@ -345,7 +347,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
#if 0
SSuperTableMeta *pMetricMeta = tscGetMetaInfo(pQueryInfo, 0)->pMetricMeta;
int32_t totalNumOfResults = 1; // count function only produce one result
int32_t rowLen = tscGetResRowLength(pQueryInfo);
......@@ -369,7 +371,8 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
}
rowIdx++;
}
#endif
return 0;
}
......
......@@ -1295,11 +1295,11 @@ int tsParseInsertSql(SSqlObj *pSql) {
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
int32_t ret = TSDB_CODE_SUCCESS;
if (NULL == pSql->asyncTblPos) {
tscCleanSqlCmd(&pSql->cmd);
} else {
// if (NULL == pSql->asyncTblPos) {
// tscCleanSqlCmd(&pSql->cmd);
// } else {
tscTrace("continue parse sql: %s", pSql->asyncTblPos);
}
// }
if (tscIsInsertOrImportData(pSql->sqlstr)) {
/*
......
......@@ -209,9 +209,14 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
assert(pQueryInfo->numOfTables == 0);
// assert(pQueryInfo->numOfTables == 0);
STableMetaInfo* pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = NULL;
if (pQueryInfo->numOfTables == 0) {
pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
} else {
pTableMetaInfo = &pQueryInfo->pTableMetaInfo[0];
}
pCmd->command = pInfo->type;
......@@ -639,7 +644,7 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
* check invalid SQL:
* select tbname, tags_fields from super_table_name interval(1s)
*/
if (tscQueryMetricTags(pQueryInfo) && pQueryInfo->intervalTime > 0) {
if (tscQueryTags(pQueryInfo) && pQueryInfo->intervalTime > 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
......@@ -746,7 +751,7 @@ int32_t setMeterID(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlO
tscClearMeterMetaInfo(pTableMetaInfo, false);
}
} else {
assert(pTableMetaInfo->pTableMeta == NULL && pTableMetaInfo->pMetricMeta == NULL);
assert(pTableMetaInfo->pTableMeta == NULL);
}
tfree(oldName);
......@@ -1252,7 +1257,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
if (tscQueryMetricTags(pQueryInfo)) { // local handle the metric tag query
if (tscQueryTags(pQueryInfo)) { // local handle the metric tag query
pCmd->count = numOfCols; // the number of meter schema, tricky.
pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS;
}
......@@ -1293,7 +1298,7 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
int16_t functionId = (int16_t)((colIdx >= numOfCols) ? TSDB_FUNC_TAGPRJ : TSDB_FUNC_PRJ);
if (functionId == TSDB_FUNC_TAGPRJ) {
addRequiredTagColumn(pQueryInfo, colIdx - numOfCols, tableIndex);
// addRequiredTagColumn(pQueryInfo, colIdx - numOfCols, tableIndex);
pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY;
} else {
pQueryInfo->type = TSDB_QUERY_TYPE_PROJECTION_QUERY;
......@@ -1396,8 +1401,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
SColumnList ids = {0};
ids.ids[0] = *pIndex;
// tag columns do not add to source list
ids.num = (j >= tscGetNumOfColumns(pTableMeta)) ? 0 : 1;
ids.num = 1;
insertResultField(pQueryInfo, startPos + j, &ids, pSchema[j].bytes, pSchema[j].type, pSchema[j].name, pExpr);
}
......@@ -4666,14 +4670,13 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
* And then launching multiple async-queries against all qualified virtual nodes, during the first-stage
* query operation.
*/
int32_t code = tscGetMetricMeta(pSql, clauseIndex);
int32_t code = tscGetSTableVgroupInfo(pSql, clauseIndex);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// No tables included. No results generated. Query results are empty.
SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
if (pTableMetaInfo->pTableMeta == NULL || pMetricMeta == NULL || pMetricMeta->numOfTables == 0) {
if (pTableMetaInfo->pTableMeta == NULL) {
tscTrace("%p no table in metricmeta, no output result", pSql);
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
}
......@@ -5687,6 +5690,13 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
}
assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
int32_t code = tscGetSTableVgroupInfo(pSql, index);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
// parse the group by clause in the first place
if (parseGroupbyClause(pQueryInfo, pQuerySql->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
......
......@@ -32,7 +32,7 @@ int32_t tscGetNumOfTags(const STableMeta* pTableMeta) {
}
if (pTableMeta->tableType == TSDB_SUPER_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE) {
assert(tinfo.numOfTags > 0);
assert(tinfo.numOfTags >= 0);
return tinfo.numOfTags;
}
......
......@@ -605,7 +605,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pTableMetaInfo->pMetricMeta->numOfVnodes);
// (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pTableMetaInfo->pMetricMeta->numOfVnodes);
if (*pMemBuffer == NULL) {
tscError("%p failed to allocate memory", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......@@ -636,10 +636,10 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
for (int32_t i = 0; i < pTableMetaInfo->pMetricMeta->numOfVnodes; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
}
// for (int32_t i = 0; i < pTableMetaInfo->pMetricMeta->numOfVnodes; ++i) {
// (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
// (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
// }
if (createOrderDescriptor(pOrderDesc, pCmd, pModel) != TSDB_CODE_SUCCESS) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......
......@@ -35,7 +35,6 @@ SRpcIpSet tscDnodeIpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf);
void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0};
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid);
......@@ -188,7 +187,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.pCont = pMsg,
.contLen = pSql->cmd.payloadLen,
.handle = pSql,
.code = 0
.code = 0
};
rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg);
} else {
......@@ -315,7 +314,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
pRes->numOfRows += pMsg->affectedRows;
tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
*(int32_t *)pRes->pRsp, pRes->rspLen);
pMsg->affectedRows, pRes->rspLen);
} else {
tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
}
......@@ -361,7 +360,7 @@ int doProcessSql(SSqlObj *pSql) {
pCmd->command == TSDB_SQL_CONNECT ||
pCmd->command == TSDB_SQL_HB ||
pCmd->command == TSDB_SQL_META ||
pCmd->command == TSDB_SQL_METRIC) {
pCmd->command == TSDB_SQL_STABLEVGROUP) {
tscBuildMsg[pCmd->command](pSql, NULL);
}
......@@ -509,22 +508,6 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
//SSubmitMsg *pShellMsg;
//char * pMsg;
//STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
//STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
//pMsg = buf + tsRpcHeadSize;
//TODO set iplist
//pShellMsg = (SSubmitMsg *)pMsg;
//pShellMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode);
//tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pSql->index].ip),
// htons(pShellMsg->vnode));
}
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSubmitMsg *pShellMsg;
char * pMsg, *pStart;
......@@ -554,24 +537,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
//TODO
// SSqlCmd * pCmd = &pSql->cmd;
// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
//
// char * pStart = buf + tsRpcHeadSize;
// SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
//
// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // pColumnModel == NULL, query on meter
// STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
// pQueryMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode);
// } else { // query on metric
// SSuperTableMeta * pMetricMeta = pTableMetaInfo->pMetricMeta;
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
// pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode);
// }
}
/*
* for meter query, simply return the size <= 1k
* for metric query, estimate size according to meter tags
......@@ -589,7 +554,10 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize;
}
int32_t size = 4096;
#if 0
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
......@@ -600,45 +568,23 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
if (pQueryInfo->tsBuf != NULL) {
size += pQueryInfo->tsBuf->fileSize;
}
#endif
return size;
}
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vgId, char *pMsg) {
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t vgId, char *pMsg) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
tscTrace("%p vgId:%d, query on %d tables", pSql, vgId, numOfTables);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid);
#endif
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableMeta->sid);
pTableIdInfo->uid = htobe64(pTableMeta->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pMsg += sizeof(STableIdInfo);
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
for (int32_t i = 0; i < numOfTables; ++i) {
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
STableIdInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
pTableIdInfo->sid = htonl(pQueryMeterInfo->sid);
pTableIdInfo->uid = htobe64(pQueryMeterInfo->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid));
pMsg += sizeof(STableIdInfo);
#ifdef _DEBUG_VIEW
tscTrace("%p sid:%d, uid:%" PRId64, pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid);
#endif
}
}
tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, vgId, pTableMetaInfo->name, pTableMeta->uid);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableMeta->sid);
pTableIdInfo->uid = htobe64(pTableMeta->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pMsg += sizeof(STableIdInfo);
return pMsg;
}
......@@ -655,7 +601,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
// SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
if (pQueryInfo->colList.numOfCols <= 0) {
tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta));
......@@ -678,7 +624,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex);
return -1;
}
uint32_t vnodeId = 1;
#if 0
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex);
uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;
......@@ -687,9 +636,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscError("%p vid:%d,error numOfTables in query message:%d", pSql, vnodeId, numOfTables);
return -1; // error
}
#endif
tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables);
pQueryMsg->head.vgId = htons(vnodeId);
pQueryMsg->head.vgId = htonl(vnodeId);
numOfTables = 1;
}
pQueryMsg->numOfTables = htonl(numOfTables);
......@@ -723,24 +674,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // query on meter
pQueryMsg->tagLength = 0;
} else { // query on super table
pQueryMsg->tagLength = htons(pMetricMeta->tagLen);
pQueryMsg->tagLength = htons(0);
}
pQueryMsg->queryType = htons(pQueryInfo->type);
pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs);
if (pQueryInfo->fieldsInfo.numOfOutputCols < 0) {
tscError("%p illegal value of number of output columns in query msg: %d", pSql,
pQueryInfo->fieldsInfo.numOfOutputCols);
int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutputCols;
if (numOfOutput < 0) {
tscError("%p illegal value of number of output columns in query msg: %d", pSql, numOfOutput);
return -1;
}
// set column list ids
char * pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
char *pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo);
SSchema *pSchema = tscGetTableSchema(pTableMeta);
for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
......@@ -848,7 +798,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->colNameLen = htonl(len);
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->head.vgId), pMsg);
pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg);
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
if (pGroupbyExpr->numOfGroupCols != 0) {
......@@ -1443,11 +1393,15 @@ int tscProcessTagRetrieveRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfRes = 0;
#if 0
if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) {
numOfRes = pTableMetaInfo->pMetricMeta->numOfTables;
} else {
numOfRes = 1; // for count function, there is only one output.
}
#endif
return tscLocalResultCommonBuilder(pSql, numOfRes);
}
......@@ -1533,8 +1487,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(STagData);
}
msgLen = pMsg - (char*)pInfoMsg;
pCmd->payloadLen = msgLen;
pCmd->payloadLen = pMsg - (char*)pInfoMsg;;
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
tfree(tmpData);
......@@ -1608,7 +1561,9 @@ static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE);
}
int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
#if 0
SSuperTableMetaMsg *pMetaMsg;
char * pMsg, *pStart;
int msgLen = 0;
......@@ -1671,13 +1626,13 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// convert to unicode before sending to mnode for metric query
int32_t condLen = 0;
if (pTagCond->numOfTagCond > 0) {
SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid);
SCond *pCond = tsGetSTableQueryCondPos(pTagCond, uid);
if (pCond != NULL && pCond->cond != NULL) {
condLen = strlen(pCond->cond) + 1;
bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
if (!ret) {
tscError("%p mbs to ucs4 failed:%s", pSql, tsGetMetricQueryCondPos(pTagCond, uid));
tscError("%p mbs to ucs4 failed:%s", pSql, tsGetSTableQueryCondPos(pTagCond, uid));
return 0;
}
}
......@@ -1749,7 +1704,18 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
assert(msgLen + minMsgSize() <= size);
#endif
SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pCmd->payload;
strncpy(pStableVgroupMsg->tableId, pTableMetaInfo->name, tListLen(pStableVgroupMsg->tableId));
pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
pCmd->payloadLen = sizeof(SCMSTableVgroupMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -1989,22 +1955,11 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
int tscProcessMetricMetaRsp(SSqlObj *pSql) {
SSuperTableMeta *pMeta;
uint8_t ieType;
int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
#if 0
void ** metricMetaList = NULL;
int32_t * sizes = NULL;
char *rsp = pSql->res.pRsp;
ieType = *rsp;
if (ieType != TSDB_IE_TYPE_META) {
tscError("invalid ie type:%d", ieType);
return TSDB_CODE_INVALID_IE;
}
rsp++;
int32_t num = htons(*(int16_t *)rsp);
rsp += sizeof(int16_t);
......@@ -2088,7 +2043,6 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
// release the used metricmeta
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
pTableMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i],
sizes[i], tsMetricMetaKeepTimer);
tfree(metricMetaList[i]);
......@@ -2108,7 +2062,23 @@ _error_clean:
free(sizes);
free(metricMetaList);
#endif
SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pSql->res.pRsp;
pStableVgroup->numOfDnodes = htonl(pStableVgroup->numOfDnodes);
SSqlObj* pparent = pSql->param;
assert(pparent != NULL);
SSqlCmd* pCmd = &pparent->cmd;
STableMetaInfo* pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
pInfo->vgroupIdList = taosArrayInit(pStableVgroup->numOfDnodes, sizeof(int32_t));
// todo opt performance
for(int32_t i = 0; i < pStableVgroup->numOfDnodes; ++i) {
taosArrayPush(pInfo->vgroupIdList, &pStableVgroup->dnodeIps[i]);
}
return pSql->res.code;
}
......@@ -2234,7 +2204,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
if (pTableMetaInfo->pTableMeta) {
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
// taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
}
return 0;
......@@ -2255,7 +2225,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo);
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
// taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
if (isSuperTable) { // if it is a super table, reset whole query cache
tscTrace("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
......@@ -2459,30 +2429,34 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
return code;
}
int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
int code = TSDB_CODE_NETWORK_UNAVAIL;
SSqlCmd *pCmd = &pSql->cmd;
/*
* the query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache.
*/
bool required = false;
//the query condition is serialized into pCmd->payload, we need to rebuild key for stable meta info in cache.
bool required = false;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
if (pQueryInfo->pTableMetaInfo[0]->vgroupIdList != NULL) {
return TSDB_CODE_SUCCESS;
}
#if 0
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pTableMetaInfo->pTableMeta->uid);
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
// taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false);
SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosCacheAcquireByName(tscCacheHandle, tagstr);
if (ppMeta == NULL) {
required = true;
break;
} else {
pTableMetaInfo->pMetricMeta = ppMeta;
// pTableMetaInfo->pMetricMeta = ppMeta;
}
}
......@@ -2490,12 +2464,13 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
if (!required) {
return TSDB_CODE_SUCCESS;
}
#endif
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_METRIC;
pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
SQueryInfo *pNewQueryInfo = NULL;
if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
......@@ -2532,7 +2507,7 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
// tscFreeSubqueryInfo(pCmd);
// }
tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew);
tscTrace("%p allocate new pSqlObj:%p to get stable vgroupInfo", pSql, pNew);
pNew->fp = tscTableMetaCallBack;
pNew->param = pSql;
code = tscProcessSql(pNew);
......@@ -2569,7 +2544,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
tscBuildMsg[TSDB_SQL_METRIC] = tscBuildMetricMetaMsg;
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
......@@ -2587,7 +2562,7 @@ void tscInitMsgsFp() {
tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_METRIC] = tscProcessMetricMetaRsp;
tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
......@@ -2613,7 +2588,4 @@ void tscInitMsgsFp() {
tscKeepConn[TSDB_SQL_SELECT] = 1;
tscKeepConn[TSDB_SQL_FETCH] = 1;
tscKeepConn[TSDB_SQL_HB] = 1;
tscUpdateVnodeMsg[TSDB_SQL_SELECT] = tscUpdateVnodeInQueryMsg;
tscUpdateVnodeMsg[TSDB_SQL_INSERT] = tscUpdateVnodeInSubmitMsg;
}
......@@ -546,11 +546,11 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
* if the global limitation is not reached, and current result has not exhausted, or next more vnodes are
* available, goes on
*/
if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows &&
(!tscHasReachLimitation(pQueryInfo1, pRes1))) {
allSubqueryExhausted = false;
break;
}
// if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows &&
// (!tscHasReachLimitation(pQueryInfo1, pRes1))) {
// allSubqueryExhausted = false;
// break;
// }
}
hasData = !allSubqueryExhausted;
......
......@@ -80,7 +80,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == 0 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
code = tscGetMetricMeta(pSql, 0);
code = tscGetSTableVgroupInfo(pSql, 0);
pSql->res.code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......
......@@ -178,11 +178,11 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
int numOfTables = 0;
if (!UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
numOfTables += pVnodeSidList->numOfSids;
}
// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
// numOfTables += pVnodeSidList->numOfSids;
// }
}
SSubscriptionProgress* progress = (SSubscriptionProgress*)calloc(numOfTables, sizeof(SSubscriptionProgress));
......@@ -197,17 +197,17 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
progress[0].uid = uid;
progress[0].key = tscGetSubscriptionProgress(pSub, uid);
} else {
SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
numOfTables = 0;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
int64_t uid = pTableMetaInfo->uid;
progress[numOfTables].uid = uid;
progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
}
}
// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
// numOfTables = 0;
// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
// for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
// STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
// int64_t uid = pTableMetaInfo->uid;
// progress[numOfTables].uid = uid;
// progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
// }
// }
qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress);
}
......
......@@ -476,7 +476,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode
int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t totalVnode = 0;
if ((++pTableMetaInfo->vnodeIndex) < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal);
......@@ -541,16 +542,16 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted
if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) {
pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->numOfTotal = 1;
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
// if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) {
// pSupporter->pState->numOfCompleted = 0;
// pSupporter->pState->numOfTotal = 1;
//
// pSql->cmd.command = TSDB_SQL_SELECT;
// pSql->fp = tscJoinQueryCallback;
// tscProcessSql(pSql);
//
// return;
// }
}
int32_t numOfTotal = pSupporter->pState->numOfTotal;
......@@ -608,10 +609,10 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes &&
(!tscHasReachLimitation(pQueryInfo, pRes))) {
numOfFetch++;
}
// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes &&
// (!tscHasReachLimitation(pQueryInfo, pRes))) {
// numOfFetch++;
// }
} else {
if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pQueryInfo, pRes))) {
numOfFetch++;
......@@ -788,7 +789,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* data instead of returning to its invoker
*/
if (pTableMetaInfo->vnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes);
// assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes);
pSupporter->pState->numOfCompleted = 0; // reset the record value
pSql->fp = joinRetrieveCallback; // continue retrieve data
......@@ -1009,7 +1010,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t numOfSubQueries = 0;
// int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes;
assert(numOfSubQueries > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
......@@ -1260,7 +1263,8 @@ static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfR
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
// SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
SVnodeSidList *vnodeInfo = 0;
SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
if (numOfRows > 0) {
......@@ -1409,10 +1413,10 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SVnodeSidList *vnodeInfo = NULL;
SVnodeDesc * pSvd = NULL;
if (pTableMetaInfo->pMetricMeta != NULL) {
vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
}
// if (pTableMetaInfo->pMetricMeta != NULL) {
// vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
// pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
// }
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
......@@ -1453,7 +1457,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL);
// assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL);
tscProcessSql(pNew);
return;
}
......
......@@ -53,7 +53,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) {
const int32_t maxKeySize = TSDB_MAX_TAGS_LEN; // allowed max key size
SCond* cond = tsGetMetricQueryCondPos(pTagCond, uid);
SCond* cond = tsGetSTableQueryCondPos(pTagCond, uid);
char join[512] = {0};
if (pTagCond->joinInfo.hasJoin) {
......@@ -92,7 +92,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) {
free(tmp);
}
SCond* tsGetMetricQueryCondPos(STagCond* pTagCond, uint64_t uid) {
SCond* tsGetSTableQueryCondPos(STagCond* pTagCond, uint64_t uid) {
for (int32_t i = 0; i < TSDB_MAX_JOIN_TABLE_NUM; ++i) {
if (uid == pTagCond->cond[i].uid) {
return &pTagCond->cond[i];
......@@ -122,7 +122,7 @@ bool tscQueryOnMetric(SSqlCmd* pCmd) {
(pCmd->msgType == TSDB_MSG_TYPE_QUERY);
}
bool tscQueryMetricTags(SQueryInfo* pQueryInfo) {
bool tscQueryTags(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
if (tscSqlExprGet(pQueryInfo, i)->functionId != TSDB_FUNC_TAGPRJ) {
return false;
......@@ -172,6 +172,7 @@ void tscGetDBInfoFromMeterId(char* tableId, char* db) {
}
SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx) {
#if 0
if (pMetricmeta == NULL) {
tscError("illegal metricmeta");
return 0;
......@@ -189,6 +190,8 @@ SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx
}
return (SVnodeSidList*)(pMetricmeta->list[vnodeIdx] + (char*)pMetricmeta);
#endif
}
STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
......@@ -221,12 +224,12 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
// for select query super table, the metricmeta can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->pMetricMeta != NULL);
// assert(pTableMetaInfo->pMetricMeta != NULL);
}
if (pTableMetaInfo->pMetricMeta == NULL) {
return false;
}
// if (pTableMetaInfo->pMetricMeta == NULL) {
// return false;
// }
if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
return false;
......@@ -259,12 +262,12 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
}
// only query on tag, not a projection query
if (tscQueryMetricTags(pQueryInfo)) {
if (tscQueryTags(pQueryInfo)) {
return false;
}
// for project query, only the following two function is allowed
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
......@@ -1925,7 +1928,7 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
}
pTableMetaInfo->pTableMeta = pTableMeta;
pTableMetaInfo->pMetricMeta = pMetricMeta;
// pTableMetaInfo->pMetricMeta = pMetricMeta;
pTableMetaInfo->numOfTags = numOfTags;
if (tags != NULL) {
......@@ -1975,7 +1978,7 @@ void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
}
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pMetricMeta), removeFromCache);
// taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pMetricMeta), removeFromCache);
}
void tscResetForNextRetrieve(SSqlRes* pRes) {
......@@ -2114,15 +2117,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
// SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags,
pTableMetaInfo->tagColumnIndex);
// pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags,
// pTableMetaInfo->tagColumnIndex);
}
assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
assert(pFinalInfo->pMetricMeta != NULL);
// assert(pFinalInfo->pMetricMeta != NULL);
}
tscTrace(
......@@ -2222,13 +2225,13 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
// if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
return false;
}
// }
int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vnodeIndex < totalVnode - 1);
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
// return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vnodeIndex < totalVnode - 1);
}
void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
......@@ -2244,7 +2247,8 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t totalVnode = 0;
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
while (++pTableMetaInfo->vnodeIndex < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
......
......@@ -261,7 +261,7 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
SQInfo* pQInfo = NULL;
if (pMsg->contLen != 0) {
void* tsdb = dnodeGetVnodeTsdb(pVnode);
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, NULL, &pQInfo);
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
......
......@@ -28,83 +28,83 @@ extern "C" {
#include "trpc.h"
// message type
#define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_SUBMIT 3
#define TSDB_MSG_TYPE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_QUERY 5
#define TSDB_MSG_TYPE_QUERY_RSP 6
#define TSDB_MSG_TYPE_RETRIEVE 7
#define TSDB_MSG_TYPE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_SUBMIT 3
#define TSDB_MSG_TYPE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_QUERY 5
#define TSDB_MSG_TYPE_QUERY_RSP 6
#define TSDB_MSG_TYPE_RETRIEVE 7
#define TSDB_MSG_TYPE_RETRIEVE_RSP 8
// message from mnode to dnode
#define TSDB_MSG_TYPE_MD_CREATE_TABLE 9
#define TSDB_MSG_TYPE_MD_CREATE_TABLE 9
#define TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP 10
#define TSDB_MSG_TYPE_MD_DROP_TABLE 11
#define TSDB_MSG_TYPE_MD_DROP_TABLE_RSP 12
#define TSDB_MSG_TYPE_MD_ALTER_TABLE 13
#define TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP 14
#define TSDB_MSG_TYPE_MD_CREATE_VNODE 15
#define TSDB_MSG_TYPE_MD_DROP_TABLE 11
#define TSDB_MSG_TYPE_MD_DROP_TABLE_RSP 12
#define TSDB_MSG_TYPE_MD_ALTER_TABLE 13
#define TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP 14
#define TSDB_MSG_TYPE_MD_CREATE_VNODE 15
#define TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP 16
#define TSDB_MSG_TYPE_MD_DROP_VNODE 17
#define TSDB_MSG_TYPE_MD_DROP_VNODE_RSP 18
#define TSDB_MSG_TYPE_MD_ALTER_VNODE 19
#define TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP 20
#define TSDB_MSG_TYPE_MD_DROP_STABLE 21
#define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 22
#define TSDB_MSG_TYPE_MD_ALTER_STREAM 23
#define TSDB_MSG_TYPE_MD_DROP_VNODE 17
#define TSDB_MSG_TYPE_MD_DROP_VNODE_RSP 18
#define TSDB_MSG_TYPE_MD_ALTER_VNODE 19
#define TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP 20
#define TSDB_MSG_TYPE_MD_DROP_STABLE 21
#define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 22
#define TSDB_MSG_TYPE_MD_ALTER_STREAM 23
#define TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP 24
#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25
#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25
#define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 26
// message from client to mnode
#define TSDB_MSG_TYPE_CM_CONNECT 31
#define TSDB_MSG_TYPE_CM_CONNECT_RSP 32
#define TSDB_MSG_TYPE_CM_CREATE_ACCT 33
#define TSDB_MSG_TYPE_CM_CREATE_ACCT_RSP 34
#define TSDB_MSG_TYPE_CM_ALTER_ACCT 35
#define TSDB_MSG_TYPE_CM_ALTER_ACCT_RSP 36
#define TSDB_MSG_TYPE_CM_DROP_ACCT 37
#define TSDB_MSG_TYPE_CM_DROP_ACCT_RSP 38
#define TSDB_MSG_TYPE_CM_CREATE_USER 39
#define TSDB_MSG_TYPE_CM_CREATE_USER_RSP 40
#define TSDB_MSG_TYPE_CM_ALTER_USER 41
#define TSDB_MSG_TYPE_CM_ALTER_USER_RSP 42
#define TSDB_MSG_TYPE_CM_DROP_USER 43
#define TSDB_MSG_TYPE_CM_DROP_USER_RSP 44
#define TSDB_MSG_TYPE_CM_CREATE_DNODE 45
#define TSDB_MSG_TYPE_CM_CONNECT 31
#define TSDB_MSG_TYPE_CM_CONNECT_RSP 32
#define TSDB_MSG_TYPE_CM_CREATE_ACCT 33
#define TSDB_MSG_TYPE_CM_CREATE_ACCT_RSP 34
#define TSDB_MSG_TYPE_CM_ALTER_ACCT 35
#define TSDB_MSG_TYPE_CM_ALTER_ACCT_RSP 36
#define TSDB_MSG_TYPE_CM_DROP_ACCT 37
#define TSDB_MSG_TYPE_CM_DROP_ACCT_RSP 38
#define TSDB_MSG_TYPE_CM_CREATE_USER 39
#define TSDB_MSG_TYPE_CM_CREATE_USER_RSP 40
#define TSDB_MSG_TYPE_CM_ALTER_USER 41
#define TSDB_MSG_TYPE_CM_ALTER_USER_RSP 42
#define TSDB_MSG_TYPE_CM_DROP_USER 43
#define TSDB_MSG_TYPE_CM_DROP_USER_RSP 44
#define TSDB_MSG_TYPE_CM_CREATE_DNODE 45
#define TSDB_MSG_TYPE_CM_CREATE_DNODE_RSP 46
#define TSDB_MSG_TYPE_CM_DROP_DNODE 47
#define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48
#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE
#define TSDB_MSG_TYPE_CM_DROP_DNODE 47
#define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48
#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE
#define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP
#define TSDB_MSG_TYPE_CM_CREATE_DB 49
#define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50
#define TSDB_MSG_TYPE_CM_DROP_DB 51
#define TSDB_MSG_TYPE_CM_DROP_DB_RSP 52
#define TSDB_MSG_TYPE_CM_USE_DB 53
#define TSDB_MSG_TYPE_CM_USE_DB_RSP 54
#define TSDB_MSG_TYPE_CM_ALTER_DB 55
#define TSDB_MSG_TYPE_CM_ALTER_DB_RSP 56
#define TSDB_MSG_TYPE_CM_CREATE_TABLE 57
#define TSDB_MSG_TYPE_CM_CREATE_DB 49
#define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50
#define TSDB_MSG_TYPE_CM_DROP_DB 51
#define TSDB_MSG_TYPE_CM_DROP_DB_RSP 52
#define TSDB_MSG_TYPE_CM_USE_DB 53
#define TSDB_MSG_TYPE_CM_USE_DB_RSP 54
#define TSDB_MSG_TYPE_CM_ALTER_DB 55
#define TSDB_MSG_TYPE_CM_ALTER_DB_RSP 56
#define TSDB_MSG_TYPE_CM_CREATE_TABLE 57
#define TSDB_MSG_TYPE_CM_CREATE_TABLE_RSP 58
#define TSDB_MSG_TYPE_CM_DROP_TABLE 59
#define TSDB_MSG_TYPE_CM_DROP_TABLE_RSP 60
#define TSDB_MSG_TYPE_CM_ALTER_TABLE 61
#define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62
#define TSDB_MSG_TYPE_CM_TABLE_META 63
#define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64
#define TSDB_MSG_TYPE_CM_STABLE_VGROUP 65
#define TSDB_MSG_TYPE_CM_DROP_TABLE 59
#define TSDB_MSG_TYPE_CM_DROP_TABLE_RSP 60
#define TSDB_MSG_TYPE_CM_ALTER_TABLE 61
#define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62
#define TSDB_MSG_TYPE_CM_TABLE_META 63
#define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64
#define TSDB_MSG_TYPE_CM_STABLE_VGROUP 65
#define TSDB_MSG_TYPE_CM_STABLE_VGROUP_RSP 66
#define TSDB_MSG_TYPE_CM_TABLES_META 67
#define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68
#define TSDB_MSG_TYPE_CM_ALTER_STREAM 69
#define TSDB_MSG_TYPE_CM_TABLES_META 67
#define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68
#define TSDB_MSG_TYPE_CM_ALTER_STREAM 69
#define TSDB_MSG_TYPE_CM_ALTER_STREAM_RSP 70
#define TSDB_MSG_TYPE_CM_SHOW 71
#define TSDB_MSG_TYPE_CM_SHOW_RSP 72
#define TSDB_MSG_TYPE_CM_KILL_QUERY 73
#define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74
#define TSDB_MSG_TYPE_CM_KILL_STREAM 75
#define TSDB_MSG_TYPE_CM_SHOW 71
#define TSDB_MSG_TYPE_CM_SHOW_RSP 72
#define TSDB_MSG_TYPE_CM_KILL_QUERY 73
#define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74
#define TSDB_MSG_TYPE_CM_KILL_STREAM 75
#define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76
#define TSDB_MSG_TYPE_CM_KILL_CONN 77
#define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78
......@@ -622,14 +622,14 @@ typedef struct {
char tableIds[];
} SCMMultiTableInfoMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
} SCMSuperTableInfoMsg;
typedef struct SCMSTableVgroupMsg {
char tableId[TSDB_TABLE_ID_LEN];
} SCMSTableVgroupMsg;
typedef struct {
int32_t numOfDnodes;
uint32_t dnodeIps[];
} SCMSuperTableInfoRsp;
} SCMSTableVgroupRspMsg;
typedef struct {
int16_t elemLen;
......
......@@ -216,7 +216,7 @@ void* mgmtGetSuperTable(char *tableId) {
}
static void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) {
SCMSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SCMSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum());
SCMSTableVgroupRspMsg *rsp = rpcMallocCont(sizeof(SCMSTableVgroupRspMsg) + sizeof(uint32_t) * mgmtGetDnodesNum());
rsp->numOfDnodes = htonl(1);
rsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp));
return rsp;
......@@ -628,14 +628,14 @@ void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable) {
}
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
SCMSuperTableInfoMsg *pInfo = pMsg->pCont;
SCMSTableVgroupMsg *pInfo = pMsg->pCont;
STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId);
if (pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return;
}
SCMSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable);
SCMSTableVgroupRspMsg *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable);
if (pRsp != NULL) {
int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t);
SRpcMsg rpcRsp = {0};
......
......@@ -54,7 +54,7 @@ enum _sql_type {
TSDB_SQL_CONNECT,
TSDB_SQL_USE_DB, // 30
TSDB_SQL_META,
TSDB_SQL_METRIC,
TSDB_SQL_STABLEVGROUP,
TSDB_SQL_MULTI_META,
TSDB_SQL_HB,
......
......@@ -203,7 +203,7 @@ typedef struct SQInfo {
* @param pQInfo
* @return
*/
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* param, SQInfo** pQInfo);
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
/**
* query on single table
......@@ -211,12 +211,6 @@ int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* par
*/
void qTableQuery(SQInfo* pQInfo);
/**
* query on super table
* @param pReadMsg
*/
void qSuperTableQuery(void* pReadMsg);
/**
* wait for the query completed, and retrieve final results to client
* @param pQInfo
......
......@@ -5128,137 +5128,99 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
// pQInfo->size - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
}
void qTableQuery(SQInfo *pQInfo) {
if (pQInfo == NULL || pQInfo->signature != pQInfo) {
dTrace("%p freed abort query", pQInfo);
return;
}
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p it is already killed, abort", pQInfo);
return;
}
static void singleTableQueryImpl(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
dTrace("QInfo:%p query task is launched", pQInfo);
if (vnodeHasRemainResults(pQInfo)) {
/*
* There are remain results that are not returned due to result interpolation
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
*/
int32_t numOfInterpo = 0;
int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo);
pQuery->rec.size = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata,
(tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo);
(tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo);
doRevisedResultsByLimit(pQInfo);
pQInfo->pointsInterpo += numOfInterpo;
pQuery->rec.size += pQuery->rec.size;
// dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
// pQInfo, pQuery->size, numOfInterpo, pQInfo->size, pQInfo->pointsInterpo,
// pQInfo->pointsReturned);
dTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total);
sem_post(&pQInfo->dataReady);
return;
}
// here we have scan all qualified data in both data file and cache
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
// continue to get push data from the group result
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) ||
(pQuery->intervalTime > 0 && pQuery->rec.total < pQuery->limit.limit)) {
((isIntervalQuery(pQuery) && pQuery->rec.total < pQuery->limit.limit))) {
// todo limit the output for interval query?
pQuery->rec.size = 0;
pQInfo->subgroupIdx = 0; // always start from 0
if (pRuntimeEnv->windowResInfo.size > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
pQuery->rec.size += pQuery->rec.size;
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
if (pQuery->rec.size > 0) {
// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d
// totalReturn:%d",
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size,
// pQInfo->size, pQInfo->pointsInterpo, pQInfo->pointsReturned);
dTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total);
sem_post(&pQInfo->dataReady);
return;
}
}
}
// dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode,
// pMeterObj->sid,
// pMeterObj->meterId, pQInfo->size);
dTrace("QInfo:%p query over, %d rows are returned", pQInfo, pQuery->rec.total);
// vnodePrintQueryStatistics(pSupporter);
sem_post(&pQInfo->dataReady);
return;
}
// number of points returned during this query
pQuery->rec.size = 0;
int64_t st = taosGetTimestampUs();
// group by normal column, sliding window query, interval query are handled by interval query processor
if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
// assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead);
tableIntervalProcessor(pQInfo);
} else {
if (isFixedOutputQuery(pQuery)) {
assert(pQuery->checkBufferInLoop == 0);
tableFixedOutputProcessor(pQInfo);
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkBufferInLoop == 1);
tableMultiOutputProcessor(pQInfo);
}
}
// record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
/* check if query is killed or not */
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed", pQInfo);
} else {
dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size);
}
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
}
void qSuperTableQuery(void *pReadMsg) {
// SQInfo *pQInfo = (SQInfo *)pMsg->ahandle;
//
// if (pQInfo == NULL) {
// return;
// }
// if (pQInfo->killed) {
// vnodeDecRefCount(pQInfo);
// dTrace("QInfo:%p it is already killed, abort", pQInfo);
// return;
// }
// assert(pQInfo->refCount >= 1);
#if 0
SQuery *pQuery = &pQInfo->runtimeEnv.pQuery;
void multiTableQueryImpl(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
pQuery->rec.size = 0;
int64_t st = taosGetTimestampUs();
if (pQuery->intervalTime > 0 ||
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr))) {
assert(pQuery->checkBufferInLoop == 0);
vnodeMultiMeterQueryProcessor(pQInfo);
} else {
assert((pQuery->checkBufferInLoop == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
......@@ -5267,23 +5229,40 @@ void qSuperTableQuery(void *pReadMsg) {
vnodeSTableSeqProcessor(pQInfo);
}
/* record the total elapsed time */
// record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
pQuery->status = isQueryKilled(pQInfo) ? 1 : 0;
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size,
// pQInfo->query.interpoType);
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType);
if (pQuery->rec.size == 0) {
// pQInfo->over = 1;
// dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters,
// pQInfo->size);
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
dTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, numOfTables, pQuery->rec.total);
// vnodePrintQueryStatistics(pSupporter);
}
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
#endif
}
void qTableQuery(SQInfo *pQInfo) {
if (pQInfo == NULL || pQInfo->signature != pQInfo) {
dTrace("%p freed abort query", pQInfo);
return;
}
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p it is already killed, abort", pQInfo);
return;
}
dTrace("QInfo:%p query task is launched", pQInfo);
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
if (numOfTables == 1) {
singleTableQueryImpl(pQInfo);
} else {
multiTableQueryImpl(pQInfo);
}
// vnodeDecRefCount(pQInfo);
}
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryTableMsg, SSqlFuncExprMsg *pExprMsg) {
......@@ -6064,7 +6043,7 @@ _error:
return code;
}
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param, SQInfo **pQInfo) {
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
assert(pQueryTableMsg != NULL);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -6098,12 +6077,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param
goto _query_over;
}
if (QUERY_IS_STABLE_QUERY(pQueryTableMsg->queryType)) {
// pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code);
} else {
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
// (*pQInfo)->param = param;
}
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
_query_over:
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -373,8 +373,27 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) {
return 0;
}
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
assert(pTable->type == TSDB_CHILD_TABLE);
// TODO
assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
assert(pSTable != NULL);
int32_t level = 0;
int32_t headSize = 0;
// first tag column
STColumn* s = pSTable->tagSchema->columns[0]; //???
tSkipListRandNodeInfo(pSTable->pIndex, &level, &headSize);
SSkipListNode* pNode = calloc(1, headSize + s->bytes + POINTER_BYTES);
pNode->level = level;
SSkipList* list = pSTable->pIndex;
memcpy(SL_GET_NODE_KEY(list, pNode), dataRowTuple(pTable->tagVal), s->columns[0].bytes);
memcpy(SL_GET_NODE_DATA(pNode), &pTable, POINTER_BYTES);
tSkipListPut(list, pNode);
return 0;
}
......
......@@ -275,8 +275,9 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
STableIdInfo* idInfo = taosArrayGet(pQueryHandle->pTableIdList, 0);
STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid};
STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pQueryHandle->pTsdb), tableId);
STable *pTable = tsdbGetTableByUid(tsdbGetMeta(pQueryHandle->pTsdb), idInfo->uid);
assert(pTable != NULL);
pTableQRec->pTableObj = pTable;
// malloc buffer in order to load data from file
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册