提交 dfcb2f8e 编写于 作者: H Haojun Liao

[td-225] refactor

上级 9326bee4
......@@ -93,8 +93,8 @@ typedef struct SVgroupTableInfo {
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0);
static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd) {
assert(pCmd != NULL);
if (pCmd->pQueryInfo == NULL) {
return NULL;
}
......@@ -175,13 +175,13 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
int32_t tscGetResRowLength(SArray* pExprList);
SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
......@@ -211,6 +211,7 @@ bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo);
......@@ -232,13 +233,13 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
int32_t tscAddQueryInfo(SSqlCmd *pCmd);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd);
SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd, int32_t subClauseIndex);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
......@@ -308,6 +309,8 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupList);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length);
bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx);
bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql);
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
......
......@@ -283,9 +283,9 @@ typedef struct {
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
char reserve4[3]; // fix bus error on arm32
char reserve4[3]; // fix bus error on arm32
int8_t submitSchema; // submit block is built with table schema
char reserve5[3]; // fix bus error on arm32
char reserve5[3]; // fix bus error on arm32
STagData tagData; // NOTE: pTagData->data is used as a variant length array
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
......@@ -452,7 +452,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSqlRes* pOutput);
void destroyTableNameList(SSqlCmd* pCmd);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......
......@@ -69,7 +69,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo);
}
......@@ -376,7 +376,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug("0x%"PRIx64" get %s successfully", pSql->self, msg);
if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
......@@ -406,7 +406,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pCmd->parseFinished) {
tscDebug("0x%"PRIx64" update local table meta, continue to process sql and send corresponding query", pSql->self);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
......@@ -449,7 +449,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
......@@ -466,7 +466,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscHandleMultivnodeInsert(pSql);
}
} else {
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo1);
}
......
......@@ -53,7 +53,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
// one column for each row
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -154,7 +154,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pSql->cmd.numOfCols = numOfCols;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE};
......@@ -199,7 +199,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
}
static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
......@@ -389,7 +389,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
SColumnIndex index = {0};
pSql->cmd.numOfCols = 2;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f;
......@@ -427,7 +427,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const char *tableName, const char *ddl) {
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t numOfRows = 1;
if (strlen(ddl) == 0) {
......@@ -444,7 +444,7 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c
return 0;
}
static int32_t tscSCreateBuildResult(SSqlObj *pSql, BuildType type, const char *str, const char *result) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t rowLen = tscSCreateBuildResultFields(pSql, type, result);
tscFieldInfoUpdateOffset(pQueryInfo);
......@@ -531,7 +531,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
}
buf[0] = 0;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0)->pTableMeta;
if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_STREAM_TABLE) {
free(buf);
......@@ -552,7 +552,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
return TSDB_CODE_SUCCESS;
}
static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -606,7 +606,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
}
static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -633,7 +633,7 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, char *ddl) {
char *result = ddl;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -674,7 +674,7 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pTableMetaInfo->pTableMeta != NULL);
......@@ -700,7 +700,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
}
static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -727,7 +727,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
return TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
static int32_t tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resBytes = TSDB_USER_LEN + TSDB_DATA_TYPE_BINARY;
......@@ -754,7 +754,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
extractDBName(pSql->pTscObj->db, db);
pthread_mutex_unlock(&pSql->pTscObj->mutex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -781,7 +781,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
static int32_t tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -804,7 +804,7 @@ static int32_t tscProcessServerVer(SSqlObj *pSql) {
}
static int32_t tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -856,7 +856,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
return pSql->res.code;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
int32_t val = 1;
......@@ -870,7 +870,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
pCmd->numOfCols = 1;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
tscFieldInfoClear(&pQueryInfo->fieldsInfo);
......
......@@ -139,7 +139,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
#ifdef _DEBUG_VIEW
printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.num);
SSrcColumnInfo colInfo[256] = {0};
SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd);
tscGetSrcColumnInfo(colInfo, pQueryInfo);
......
......@@ -748,7 +748,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
const int32_t STABLE_INDEX = 1;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
char *sql = *sqlstr;
......@@ -1071,7 +1071,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
int32_t totalNum = 0;
int32_t code = TSDB_CODE_SUCCESS;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
assert(pQueryInfo != NULL);
STableMetaInfo *pTableMetaInfo = (pQueryInfo->numOfTables == 0)? tscAddEmptyMetaInfo(pQueryInfo):tscGetMetaInfo(pQueryInfo, 0);
......@@ -1219,7 +1219,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
} else { // bindedColumns != NULL
// insert into tablename(col1, col2,..., coln) values(v1, v2,... vn);
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, 0)->pTableMeta;
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean;
......@@ -1363,7 +1363,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
SSqlCmd *pCmd = &pSql->cmd;
pSql->res.numOfRows = 0;
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, 0)->pTableMeta;
SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData);
code = tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
......@@ -1425,7 +1425,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
// accumulate the total submit records
pParentSql->res.numOfRows += pSql->res.numOfRows;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
......
......@@ -696,7 +696,7 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
SSqlCmd* pCmd = &stmt->pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pCmd->pTableBlockHashList == NULL) {
......@@ -763,7 +763,7 @@ static int insertStmtReset(STscStmt* pStmt) {
}
pCmd->batchSize = 0;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
pTableMetaInfo->vgroupIndex = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -778,7 +778,7 @@ static int insertStmtExecute(STscStmt* stmt) {
return TSDB_CODE_SUCCESS;
}
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pCmd->pTableBlockHashList == NULL) {
......@@ -1057,7 +1057,7 @@ int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
if (pStmt->isInsert) {
SSqlCmd* pCmd = &pStmt->pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pCmd->pTableBlockHashList == NULL) {
pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
......
......@@ -619,32 +619,32 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_SELECT: {
const char* msg1 = "columns in select clause not identical";
int32_t code = loadAllTableMeta(pSql, pInfo);
code = loadAllTableMeta(pSql, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SQueryInfo* pCurrent = pCmd->pQueryInfo;
for(int32_t i = 0; i < pCmd->clauseIndex; ++i) {
pCurrent = pCurrent->sibling;
}
pQueryInfo = tscGetQueryInfo(pCmd);
size_t size = taosArrayGetSize(pInfo->list);
for (int32_t i = pCmd->clauseIndex; i < size; ++i) {
for (int32_t i = 0; i < size; ++i) {
SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i);
tscTrace("%p start to parse %dth subclause, total:%d", pSql, i, (int32_t) size);
if ((code = validateSqlNode(pSql, pSqlNode, pCurrent)) != TSDB_CODE_SUCCESS) {
tscTrace("%p start to parse %dth subclause, total:%"PRId64, pSql, i, size);
if ((code = validateSqlNode(pSql, pSqlNode, pQueryInfo)) != TSDB_CODE_SUCCESS) {
return code;
}
tscPrintSelNodeList(pSql, i);
pCmd->clauseIndex += 1;
if (i+1 < size && pCurrent->sibling == NULL) {
if ((i + 1) < size && pQueryInfo->sibling == NULL) {
if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
return code;
}
pCurrent = pCmd->active;
pQueryInfo = pCmd->active;
}
}
......@@ -1202,7 +1202,7 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
const char* msg5 = "invalid binary/nchar tag length";
const char* msg6 = "invalid data type in tags";
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfTags = tscGetNumOfTags(pTableMeta);
......@@ -1275,7 +1275,7 @@ bool validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
const char* msg6 = "invalid column length";
// assert(pCmd->numOfClause == 1);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfTags = tscGetNumOfTags(pTableMeta);
......@@ -2649,7 +2649,7 @@ int32_t getColumnIndexByName(SSqlCmd* pCmd, const SStrToken* pToken, SQueryInfo*
int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
// assert(pCmd->numOfClause == 1);
pCmd->command = TSDB_SQL_SHOW;
......@@ -5059,7 +5059,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
SAlterTableInfo* pAlterSQL = pInfo->pAlterInfo;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, DEFAULT_TABLE_INDEX);
......@@ -5747,10 +5747,10 @@ int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDbInfo* pCreateDbSql) {
}
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex) {
SQueryInfo* pParentQueryInfo = tscGetQueryInfo(&pParentObj->cmd, subClauseIndex);
SQueryInfo* pParentQueryInfo = tscGetQueryInfo(&pParentObj->cmd);
if (pParentQueryInfo->groupbyExpr.numOfGroupCols > 0) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, subClauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SExprInfo* pExpr = NULL;
size_t size = taosArrayGetSize(pQueryInfo->exprList);
......@@ -6369,7 +6369,7 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate) {
// for debug purpose
void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, subClauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t size = (int32_t)tscNumOfExprs(pQueryInfo);
if (size == 0) {
......@@ -6411,7 +6411,7 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p
const char* msg1 = "invalid table name";
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, subClauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
......@@ -6470,7 +6470,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
// two table: the first one is for current table, and the secondary is for the super table.
if (pQueryInfo->numOfTables < 2) {
......@@ -6673,7 +6673,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg7 = "time interval is required";
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
assert(pQueryInfo->numOfTables == 1);
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
......@@ -7301,6 +7301,57 @@ static STableMeta* extractTempTableMetaFromSubquery(SQueryInfo* pUpstream) {
return meta;
}
static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pSql, SQueryInfo* pQueryInfo, char* msgBuf) {
SRelElementPair* subInfo = taosArrayGet(pSqlNode->from->list, index);
// union all is not support currently
SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0);
SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo));
tscInitQueryInfo(pSub);
int32_t code = validateSqlNode(pSql, p, pSub);
assert(code != TSDB_CODE_TSC_ACTION_IN_PROGRESS);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pSub->pDownstream = pQueryInfo;
// create dummy table meta info
STableMetaInfo* pTableMetaInfo1 = calloc(1, sizeof(STableMetaInfo));
pTableMetaInfo1->pTableMeta = extractTempTableMetaFromSubquery(pSub);
if (subInfo->aliasName.n > 0) {
if (subInfo->aliasName.n >= TSDB_TABLE_FNAME_LEN) {
return invalidSqlErrMsg(msgBuf, "subquery alias name too long");
}
strncpy(pTableMetaInfo1->aliasName, subInfo->aliasName.z, subInfo->aliasName.n);
}
taosArrayPush(pQueryInfo->pUpstream, &pSub);
// NOTE: order mix up in subquery not support yet.
pQueryInfo->order = pSub->order;
char* tmp = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
if (tmp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pQueryInfo->pTableMetaInfo[pQueryInfo->numOfTables] = pTableMetaInfo1;
pQueryInfo->numOfTables += 1;
// all columns are added into the table column list
STableMeta* pMeta = pTableMetaInfo1->pTableMeta;
for(int32_t i = 0; i < pMeta->tableInfo.numOfColumns; ++i) {
tscColumnListInsert(pQueryInfo->colList, i, pMeta->id.uid, &pMeta->schema[i]);
}
return TSDB_CODE_SUCCESS;
}
int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInfo) {
assert(pSqlNode != NULL && (pSqlNode->from == NULL || taosArrayGetSize(pSqlNode->from->list) > 0));
......@@ -7319,10 +7370,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
/*
* handle the sql expression without from subclause
* select current_database();
* select server_status();
* select server_version();
* select client_version();
* select server_state();
* select current_database();
*/
if (pSqlNode->from == NULL) {
assert(pSqlNode->fillType == NULL && pSqlNode->pGroupby == NULL && pSqlNode->pWhere == NULL &&
......@@ -7331,61 +7382,30 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) {
// parse the subquery in the first place
int32_t numOfSub = taosArrayGetSize(pSqlNode->from->list);
SRelElementPair* sub = taosArrayGet(pSqlNode->from->list, 0);
SSqlNode* p = taosArrayGetP(sub->pSubquery, 0);
pQueryInfo->numOfTables = 0;
code = validateSqlNode(pSql, p, pQueryInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code != TSDB_CODE_SUCCESS) {
return code;
}
SQueryInfo* current = calloc(1, sizeof(SQueryInfo));
tscInitQueryInfo(current);
taosArrayPush(current->pUpstream, &pQueryInfo);
STableMeta* pTableMeta = extractTempTableMetaFromSubquery(pQueryInfo);
STableMetaInfo* pTableMetaInfo1 = calloc(1, sizeof(STableMetaInfo));
pTableMetaInfo1->pTableMeta = pTableMeta;
if (sub->aliasName.n > 0) {
if (sub->aliasName.n > TSDB_TABLE_FNAME_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "subquery alias name too long");
// parse the subquery in the first place
int32_t numOfSub = (int32_t) taosArrayGetSize(pSqlNode->from->list);
for(int32_t i = 0; i < numOfSub; ++i) {
code = doValidateSubquery(pSqlNode, i, pSql, pQueryInfo, tscGetErrorMsgPayload(pCmd));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
strncpy(pTableMetaInfo1->aliasName, sub->aliasName.z, sub->aliasName.n);
}
current->pTableMetaInfo = calloc(numOfSub, POINTER_BYTES);
current->pTableMetaInfo[0] = pTableMetaInfo1;
current->numOfTables = 1;
current->order = pQueryInfo->order;
pCmd->pQueryInfo = current;
pQueryInfo->pDownstream = current;
if (validateSelectNodeList(pCmd, current, pSqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) {
if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
// all columns are added into the table column list
for(int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
tscColumnListInsert(current->colList, i, pTableMetaInfo1->pTableMeta->id.uid,
&pTableMetaInfo1->pTableMeta->schema[i]);
}
if (pSqlNode->pWhere != NULL) {
if (validateWhereNode(current, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) {
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
pSqlNode->pWhere = NULL;
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
if (pTableMeta->tableInfo.precision == TSDB_TIME_PRECISION_MILLI) {
current->window.skey = current->window.skey / 1000;
current->window.ekey = current->window.ekey / 1000;
pQueryInfo->window.skey = pQueryInfo->window.skey / 1000;
pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000;
}
}
} else {
......
......@@ -116,7 +116,7 @@ static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgrou
static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) {
return;
}
......@@ -335,7 +335,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
......@@ -506,7 +506,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
uint32_t type = 0;
if (pQueryInfo == NULL) {
pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
pQueryInfo = tscGetQueryInfo(pCmd);
}
STableMetaInfo *pTableMetaInfo = NULL;
......@@ -581,7 +581,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
char* pMsg = pSql->cmd.payload;
......@@ -620,7 +620,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
......@@ -630,7 +630,6 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
int32_t sqlLen = (int32_t) strlen(pSql->sqlstr) + 1;
int32_t tableSerialize = 0;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo->pVgroupTables != NULL) {
......@@ -1051,7 +1050,7 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCreateDbMsg *pCreateDbMsg = (SCreateDbMsg *)pCmd->payload;
// assert(pCmd->numOfClause == 1);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pCreateDbMsg->db);
assert(code == TSDB_CODE_SUCCESS);
......@@ -1171,7 +1170,7 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SDropDbMsg *pDropDbMsg = (SDropDbMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pDropDbMsg->db);
assert(code == TSDB_CODE_SUCCESS && pTableMetaInfo->name.type == TSDB_DB_NAME_T);
......@@ -1192,7 +1191,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
tNameExtractFullName(&pTableMetaInfo->name, pDropTableMsg->name);
pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
......@@ -1249,7 +1248,7 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SUseDbMsg *pUseDbMsg = (SUseDbMsg *)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
tNameExtractFullName(&pTableMetaInfo->name, pUseDbMsg->db);
pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
......@@ -1266,7 +1265,7 @@ int32_t tscBuildSyncDbReplicaMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
}
SSyncDbMsg *pSyncMsg = (SSyncDbMsg *)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
tNameExtractFullName(&pTableMetaInfo->name, pSyncMsg->db);
pCmd->msgType = TSDB_MSG_TYPE_CM_SYNC_DB;
......@@ -1286,7 +1285,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SShowMsg *pShowMsg = (SShowMsg *)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
if (tNameIsEmpty(&pTableMetaInfo->name)) {
pthread_mutex_lock(&pObj->mutex);
......@@ -1360,7 +1359,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSchema *pSchema;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// Reallocate the payload size
......@@ -1449,7 +1448,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
return minMsgSize() + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) + TSDB_EXTRA_PAYLOAD_SIZE;
}
......@@ -1458,7 +1457,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int msgLen = 0;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -1507,7 +1506,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SNewVgroupInfo vgroupInfo = {.vgId = -1};
......@@ -1527,7 +1526,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SAlterDbMsg *pAlterDbMsg = (SAlterDbMsg* )pCmd->payload;
pAlterDbMsg->dbType = -1;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
tNameExtractFullName(&pTableMetaInfo->name, pAlterDbMsg->db);
return TSDB_CODE_SUCCESS;
......@@ -1543,7 +1542,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
pRetrieveMsg->qId = htobe64(pSql->res.qId);
pRetrieveMsg->free = htons(pQueryInfo->type);
......@@ -1567,7 +1566,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
pRes->row = 0;
pRes->rspType = 1;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
......@@ -1591,7 +1590,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
SSqlCmd * pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -1645,7 +1644,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
convertQueryResult(pRes, pQueryInfo);
handleDownstreamOperator(pRes, pQueryInfo);
// handleDownstreamOperator(pRes, pQueryInfo);
code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {
......@@ -1693,7 +1692,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableInfoMsg *pInfoMsg = (STableInfoMsg *)pCmd->payload;
......@@ -1755,14 +1754,14 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
char* pMsg = pCmd->payload;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
SSTableVgroupMsg *pStableVgroupMsg = (SSTableVgroupMsg *)pMsg;
pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
pMsg += sizeof(SSTableVgroupMsg);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, i);
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pMsg);
assert(code == TSDB_CODE_SUCCESS);
......@@ -1918,7 +1917,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return code;
}
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
assert(pTableMetaInfo->pTableMeta == NULL);
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
......@@ -2085,7 +2084,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
SSqlCmd* pCmd = &parent->cmd;
for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) {
STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, i);
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *) pMsg;
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
......@@ -2148,7 +2147,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -2274,7 +2273,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
int tscProcessUseDbRsp(SSqlObj *pSql) {
STscObj * pObj = pSql->pTscObj;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
pthread_mutex_lock(&pObj->mutex);
int ret = tNameExtractFullName(&pTableMetaInfo->name, pObj->db);
......@@ -2292,7 +2291,7 @@ int tscProcessDropDbRsp(SSqlObj *pSql) {
}
int tscProcessDropTableRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
//The cached tableMeta is expired in this case, so clean it in hash table
char name[TSDB_TABLE_FNAME_LEN] = {0};
......@@ -2306,7 +2305,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
}
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
......@@ -2380,7 +2379,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
tscSetResRawPtr(pRes, pQueryInfo);
}
handleDownstreamOperator(pRes, pQueryInfo);
// handleDownstreamOperator(pRes, pQueryInfo);
if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
......@@ -2583,7 +2582,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
char name[TSDB_TABLE_FNAME_LEN] = {0};
......
......@@ -373,7 +373,7 @@ int taos_num_fields(TAOS_RES *res) {
if (pSql == NULL || pSql->signature != pSql) return 0;
int32_t num = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo == NULL) {
return num;
}
......@@ -412,7 +412,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlRes *pRes = &pSql->res;
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo == NULL) {
return NULL;
}
......@@ -563,7 +563,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
return true;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return true;
......@@ -676,7 +676,7 @@ char *taos_get_client_info() { return version; }
static void tscKillSTableQuery(SSqlObj *pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return;
......@@ -727,7 +727,7 @@ void taos_stop_query(TAOS_RES *res) {
// set the error code for master pSqlObj firstly
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->rpcRid <= 0);
......@@ -757,7 +757,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
return true;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo == NULL) {
return true;
}
......
......@@ -89,7 +89,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -138,7 +138,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream->numOfRes = 0; // reset the numOfRes.
SSqlObj *pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
tscDebug("0x%"PRIx64" timer launch query", pSql->self);
if (pStream->isProject) {
......@@ -197,7 +197,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self,
pStream, numOfRows, retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0);
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
......@@ -224,7 +224,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
#if 0
SSqlObj * pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) {
return;
......@@ -273,7 +273,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
......@@ -444,7 +444,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (!pStream->isProject && pQueryInfo->interval.interval == 0) {
sprintf(pSql->cmd.payload, "the interval value is 0");
......@@ -494,7 +494,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pStream->isProject) {
// no data in table, flush all data till now to destination meter, 10sec delay
......@@ -556,7 +556,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......
......@@ -266,7 +266,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
pSub->lastSyncTime = taosGetTimestampMs();
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSubscriptionProgress target = {.uid = pTableMeta->id.uid, .key = 0};
......@@ -284,7 +284,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
size_t numOfTables = taosArrayGetSize(tables);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress));
for( size_t i = 0; i < numOfTables; i++ ) {
STidTags* tt = taosArrayGet( tables, i );
......@@ -304,7 +304,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
taosArrayDestroy(tables);
TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
return 1;
}
......@@ -503,8 +503,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlObj *pSql = pSub->pSql;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription
size_t size = taosArrayGetSize(pSub->progress);
......
......@@ -98,7 +98,7 @@ static bool allSubqueryDone(SSqlObj *pParentSql) {
return done;
}
static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
SSubqueryState *subState = &pParentSql->subState;
assert(idx < subState->numOfSub);
......@@ -124,7 +124,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
win->skey = INT64_MAX;
win->ekey = INT64_MIN;
......@@ -149,7 +149,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
for (int32_t i = 0; i < joinNum; ++i) {
STSBuf* output = tsBufCreate(true, pQueryInfo->order.order);
SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd, 0);
SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd);
pSubQueryInfo->tsBuf = output;
......@@ -394,12 +394,12 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
pSupporter->pObj = pSql;
pSupporter->subqueryIndex = index;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
memcpy(&pSupporter->interval, &pQueryInfo->interval, sizeof(pSupporter->interval));
pSupporter->limit = pQueryInfo->limit;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, index);
pSupporter->uid = pTableMetaInfo->pTableMeta->id.uid;
assert (pSupporter->uid != 0);
......@@ -549,7 +549,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
continue;
}
SQueryInfo *pSubQueryInfo = tscGetQueryInfo(&pPrevSub->cmd, 0);
SQueryInfo *pSubQueryInfo = tscGetQueryInfo(&pPrevSub->cmd);
STSBuf *pTsBuf = pSubQueryInfo->tsBuf;
pSubQueryInfo->tsBuf = NULL;
......@@ -567,7 +567,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[i] = pNew;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd);
pQueryInfo->tsBuf = pTsBuf; // transfer the ownership of timestamp comp-z data to the new created object
// set the second stage sub query for join process
......@@ -665,7 +665,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
continue;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd);
executeQuery(pSql->pSubs[i], pQueryInfo);
}
......@@ -802,7 +802,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
assert(pQueryInfo->numOfTables == 1);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -1091,7 +1091,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
// todo, the type may not include TSDB_QUERY_TYPE_TAG_FILTER_QUERY
assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));
......@@ -1213,7 +1213,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SSqlCmd* pSubCmd = &pParentSql->pSubs[m]->cmd;
SArray** s = taosArrayGet(resList, m);
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pSubCmd, 0);
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pSubCmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s);
......@@ -1247,7 +1247,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
......@@ -1379,7 +1379,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
}
// launch the query the retrieve actual results from vnode along with the filtered timestamp
SQueryInfo* pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
SQueryInfo* pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
updateQueryTimeRange(pPQueryInfo, &win);
//update the vgroup that involved in real data query
......@@ -1395,7 +1395,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" abort query due to other subquery failure. code:%d, global code:%d", pSql->self, numOfRows, pParentSql->res.code);
......@@ -1504,7 +1504,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
SSqlRes *pRes = &pSub->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
if (!tscHasReachLimitation(pQueryInfo, pRes)) {
if (pRes->row >= pRes->numOfRows) {
// no data left in current result buffer
......@@ -1556,7 +1556,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
continue;
}
SQueryInfo* p = tscGetQueryInfo(&pSub->cmd, 0);
SQueryInfo* p = tscGetQueryInfo(&pSub->cmd);
orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0);
if (orderedPrjQuery) {
break;
......@@ -1580,7 +1580,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
continue;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows &&
pSub->res.completed) {
......@@ -1655,7 +1655,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
pSupporter = (SJoinSupporter*)pSql1->param;
// wait for all subqueries completed
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd1, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd1);
assert(pRes1->numOfRows >= 0 && pQueryInfo->numOfTables == 1);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -1686,7 +1686,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
int32_t numOfExprs = (int32_t)tscNumOfExprs(pQueryInfo);
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs);
......@@ -1710,7 +1710,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables);
SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd;
SQueryInfo* pSubQueryInfo = tscGetQueryInfo(pSubCmd, 0);
SQueryInfo* pSubQueryInfo = tscGetQueryInfo(pSubCmd);
size_t numOfSubExpr = taosArrayGetSize(pSubQueryInfo->exprList);
for (int32_t k = 0; k < numOfSubExpr; ++k) {
......@@ -1734,7 +1734,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pParentSql = pSupporter->pObj;
// There is only one subquery and table for each subquery.
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
......@@ -1818,7 +1818,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
pSql->res.qId = 0x1;
assert(pSql->res.numOfRows == 0);
......@@ -1841,7 +1841,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
// refactor as one method
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd, 0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
assert(pNewQueryInfo != NULL);
// update the table index
......@@ -1952,7 +1952,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
}
} else {
assert(0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd, 0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
}
......@@ -1963,7 +1963,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -1998,7 +1998,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
}
SSqlObj* pSub = pSql->pSubs[i];
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) {
pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
break;
......@@ -2147,7 +2147,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SFirstRoundQuerySup* pSup = param;
SSqlObj* pParent = pSup->pParent;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t code = taos_errno(pSql);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2230,7 +2230,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
// set the parameters for the second round query process
SSqlCmd *pPCmd = &pParent->cmd;
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pPCmd, 0);
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pPCmd);
int32_t resRows = pSup->numOfRows;
if (pSup->numOfRows > 0) {
......@@ -2280,8 +2280,8 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
}
int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo* pTableMetaInfo1 = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo* pTableMetaInfo1 = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
SFirstRoundQuerySup *pSup = calloc(1, sizeof(SFirstRoundQuerySup));
......@@ -2296,7 +2296,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql);
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
assert(pQueryInfo->numOfTables == 1);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
......@@ -2495,7 +2495,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
// todo handle multi-vnode situation
if (pQueryInfo->tsBuf) {
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd, 0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
assert(pNewQueryInfo->tsBuf != NULL);
}
......@@ -2582,7 +2582,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
SSqlObj *pParentSql = trsupport->pParentSql;
int32_t subqueryIndex = trsupport->subqueryIndex;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
......@@ -2685,7 +2685,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscFreeRetrieveSup(pSql);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
......@@ -2702,7 +2702,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
SSubqueryState* pState = &pParentSql->subState;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
......@@ -2751,7 +2751,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tscDebug("0x%"PRIx64" retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree",
pParentSql->self, pState->numOfSub, pState->numOfRetrievedRows);
SQueryInfo *pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd, 0);
SQueryInfo *pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
tscClearInterpInfo(pPQueryInfo);
code = tscCreateLocalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, pPQueryInfo, &pParentSql->res.pLocalMerger, pParentSql->self);
......@@ -2798,7 +2798,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SSubqueryState* pState = &pParentSql->subState;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
......@@ -2836,7 +2836,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
}
SSqlRes * pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows);
......@@ -2888,7 +2888,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd);
pNew->cmd.active = pQueryInfo;
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
......@@ -2923,10 +2923,10 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSqlObj* pParentSql = trsupport->pParentSql;
SSqlObj* pSql = (SSqlObj *) tres;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
assert(pQueryInfo->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
// stable query killed or other subquery failed, all query stopped
......@@ -2953,7 +2953,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry);
int32_t sent = 0;
tscReissueSubquery(trsupport, pSql, code, &sent);
if (sent) {
return;
......@@ -3062,7 +3061,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
// clean up tableMeta in cache
tscFreeQueryInfo(&pSql->cmd, false);
SQueryInfo* pQueryInfo = tscGetQueryInfoS(&pSql->cmd, 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0);
tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
subquerySetState(pSql, &pParentObj->subState, i, 0);
......@@ -3234,7 +3233,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
}
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, columnIndex);
assert(pInfo->pExpr->pExpr == NULL);
......@@ -3250,7 +3249,7 @@ static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t column
static void doBuildResFromSubqueries(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t numOfRes = INT32_MAX;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
......@@ -3347,7 +3346,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
}
if (pRes->tsrow == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
pRes->numOfCols = (int16_t) tscNumOfExprs(pQueryInfo);
pRes->tsrow = calloc(pRes->numOfCols, POINTER_BYTES);
......@@ -3401,7 +3400,7 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
return pRes->tsrow;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
size_t size = tscNumOfFields(pQueryInfo);
......@@ -3434,7 +3433,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
bool allSubqueryExhausted = true;
......@@ -3446,7 +3445,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd1, pCmd1->clauseIndex);
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd1);
assert(pQueryInfo1->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0);
......@@ -3470,7 +3469,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
}
SSqlRes * pRes1 = &pSql->pSubs[i]->res;
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(&pSql->pSubs[i]->cmd, 0);
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(&pSql->pSubs[i]->cmd);
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) &&
tscIsProjectionQuery(pQueryInfo1)) ||
......
......@@ -743,38 +743,39 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
pRes->completed = (pRes->numOfRows == 0);
}
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSqlRes* pOutput) {
if (pQueryInfo->pDownstream != NULL) {
// handle the following query process
SQueryInfo *px = pQueryInfo->pDownstream;
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList);
int32_t numOfOutput = (int32_t) tscNumOfExprs(px);
if (px->pQInfo == NULL) {
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList);
int32_t numOfOutput = (int32_t) tscNumOfExprs(px);
int32_t numOfCols = (int32_t) taosArrayGetSize(px->colList);
SQueriedTableInfo info = {.colList = pColumnInfo, .numOfCols = numOfCols,};
SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta);
int32_t numOfCols = (int32_t) taosArrayGetSize(px->colList);
SQueriedTableInfo info = {.colList = pColumnInfo, .numOfCols = numOfCols,};
SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta);
STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),};
tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),};
tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN};
STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN};
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(group, &tableKeyInfo);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(group, &tableKeyInfo);
taosArrayPush(tableGroupInfo.pGroupList, &group);
taosArrayPush(tableGroupInfo.pGroupList, &group);
SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfCols);
SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfCols);
SExprInfo *exprInfo = NULL;
/*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN);
SExprInfo *exprInfo = NULL;
/*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL, MASTER_SCAN);
tfree(pColumnInfo);
}
uint64_t qId = 0;
qTableQuery(px->pQInfo, &qId);
convertQueryResult(pRes, px);
tfree(pColumnInfo);
convertQueryResult(pOutput, px);
}
}
......@@ -812,7 +813,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
while(pQueryInfo != NULL) {
SQueryInfo* p = pQueryInfo->sibling;
......@@ -1076,7 +1077,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
pCmd->numOfTablesInSubmit = pDataBlock->numOfTables;
// assert(pCmd->numOfClause == 1);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
// todo refactor
// set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
......@@ -1576,6 +1577,29 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
memset(pFieldInfo, 0, sizeof(SFieldInfo));
}
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc) {
assert(pFieldInfo != NULL && pSrc != NULL);
pFieldInfo->numOfOutput = pSrc->numOfOutput;
if (pSrc->final != NULL) {
pFieldInfo->final = calloc(pSrc->numOfOutput, sizeof(TAOS_FIELD));
memcpy(pFieldInfo->final, pSrc->final, sizeof(TAOS_FIELD) * pSrc->numOfOutput);
}
if (pSrc->internalField != NULL) {
size_t num = taosArrayGetSize(pSrc->internalField);
for (int32_t i = 0; i < num; ++i) {
SInternalField* pfield = taosArrayGet(pSrc->internalField, i);
SInternalField p = {.visible = pfield->visible, .field = pfield->field};
p.pExpr = calloc(1, sizeof(SExprInfo));
tscExprAssign(p.pExpr, pfield->pExpr);
}
}
}
SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
......@@ -1731,7 +1755,7 @@ int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy)
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = taosArrayGetP(src, i);
if (pExpr->base.uid == uid) {
if (uid != 0 && pExpr->base.uid == uid) {
if (deepcopy) {
SExprInfo* p1 = calloc(1, sizeof(SExprInfo));
tscExprAssign(p1, pExpr);
......@@ -1740,7 +1764,8 @@ int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy)
} else {
taosArrayPush(dst, &pExpr);
}
} else {
taosArrayPush(dst, &pExpr);
}
}
......@@ -1885,6 +1910,18 @@ void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid) {
}
}
void tscColumnListCopyAll(SArray* dst, const SArray* src) {
assert(src != NULL && dst != NULL);
size_t num = taosArrayGetSize(src);
for (int32_t i = 0; i < num; ++i) {
SColumn* pCol = taosArrayGetP(src, i);
SColumn* p = tscColumnClone(pCol);
taosArrayPush(dst, &p);
}
}
void tscColumnListDestroy(SArray* pColumnList) {
if (pColumnList == NULL) {
return;
......@@ -2241,15 +2278,9 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
* @param tableIndex denote the table index for join query, where more than one table exists
* @return
*/
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t clauseIndex, int32_t tableIndex) {
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t tableIndex) {
assert(pCmd != NULL);
// if (pCmd == NULL || pCmd->numOfClause == 0) {
// return NULL;
// }
// assert(clauseIndex >= 0 && clauseIndex < pCmd->numOfClause);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
return tscGetMetaInfo(pQueryInfo, tableIndex);
}
......@@ -2267,7 +2298,7 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) {
}
SQueryInfo* tscGetQueryInfoS(SSqlCmd* pCmd, int32_t subClauseIndex) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, subClauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
int32_t ret = TSDB_CODE_SUCCESS;
while ((pQueryInfo) == NULL) {
......@@ -2276,7 +2307,7 @@ SQueryInfo* tscGetQueryInfoS(SSqlCmd* pCmd, int32_t subClauseIndex) {
return NULL;
}
pQueryInfo = tscGetQueryInfo(pCmd, subClauseIndex);
pQueryInfo = tscGetQueryInfo(pCmd);
}
return pQueryInfo;
......@@ -2316,20 +2347,11 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->slimit.limit = -1;
pQueryInfo->slimit.offset = 0;
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->window = TSWINDOW_INITIALIZER;
}
int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
assert(pCmd != NULL);
// todo refactor: remove this structure
// size_t s = pCmd->numOfClause + 1;
// char* tmp = realloc(pCmd->pQueryInfo, s * POINTER_BYTES);
// if (tmp == NULL) {
// return TSDB_CODE_TSC_OUT_OF_MEMORY;
// }
// pCmd->pQueryInfo = (SQueryInfo**)tmp;
SQueryInfo* pQueryInfo = calloc(1, sizeof(SQueryInfo));
if (pQueryInfo == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -2337,7 +2359,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
tscInitQueryInfo(pQueryInfo);
pQueryInfo->window = TSWINDOW_INITIALIZER;
pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer
if (pCmd->pQueryInfo == NULL) {
......@@ -2386,7 +2407,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
}
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
while (pQueryInfo != NULL) {
SQueryInfo* p = pQueryInfo->sibling;
freeQueryInfoImpl(pQueryInfo);
......@@ -2394,6 +2415,86 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) {
}
}
int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
assert(pQueryInfo != NULL && pSrc != NULL);
int32_t code = TSDB_CODE_SUCCESS;
memcpy(&pQueryInfo->interval, &pSrc->interval, sizeof(pQueryInfo->interval));
pQueryInfo->command = pSrc->command;
pQueryInfo->type = pSrc->type;
pQueryInfo->window = pSrc->window;
pQueryInfo->limit = pSrc->limit;
pQueryInfo->slimit = pSrc->slimit;
pQueryInfo->order = pSrc->order;
pQueryInfo->vgroupLimit = pSrc->vgroupLimit;
pQueryInfo->tsBuf = NULL;
pQueryInfo->fillType = pSrc->fillType;
pQueryInfo->fillVal = NULL;
pQueryInfo->clauseLimit = pSrc->clauseLimit;
pQueryInfo->numOfTables = pSrc->numOfTables;
pQueryInfo->window = pSrc->window;
pQueryInfo->sessionWindow = pSrc->sessionWindow;
pQueryInfo->pTableMetaInfo = NULL;
pQueryInfo->bufLen = pSrc->bufLen;
pQueryInfo->buf = malloc(pSrc->bufLen);
if (pQueryInfo->buf == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
if (pSrc->bufLen > 0) {
memcpy(pQueryInfo->buf, pSrc->buf, pSrc->bufLen);
}
pQueryInfo->groupbyExpr = pSrc->groupbyExpr;
if (pSrc->groupbyExpr.columnInfo != NULL) {
pQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pSrc->groupbyExpr.columnInfo);
if (pQueryInfo->groupbyExpr.columnInfo == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
}
if (tscTagCondCopy(&pQueryInfo->tagCond, &pSrc->tagCond) != 0) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
if (pSrc->fillType != TSDB_FILL_NONE) {
pQueryInfo->fillVal = malloc(pSrc->fieldsInfo.numOfOutput * sizeof(int64_t));
if (pQueryInfo->fillVal == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
memcpy(pQueryInfo->fillVal, pSrc->fillVal, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t));
}
tscColumnListCopyAll(pQueryInfo->colList, pSrc->colList);
tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pSrc->fieldsInfo);
if (tscExprCopy(pQueryInfo->exprList, pSrc->exprList, 0, true) != 0) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
for(int32_t i = 0; i < pSrc->numOfTables; ++i) {
STableMetaInfo* p1 = tscGetMetaInfo((SQueryInfo*) pSrc, i);
STableMeta* pMeta = tscTableMetaDup(p1->pTableMeta);
if (pMeta == NULL) {
// todo handle the error
}
tscAddTableMetaInfo(pQueryInfo, &p1->name, pMeta, p1->vgroupList, p1->tagColList, p1->pVgroupTables);
}
_error:
return code;
}
void tscFreeVgroupTableInfo(SArray* pVgroupTables) {
if (pVgroupTables == NULL) {
return;
......@@ -2594,7 +2695,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
SQueryInfo* pQueryInfo = tscGetQueryInfoS(pCmd, 0);
assert(pSql->cmd.clauseIndex == 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
registerSqlObj(pNew);
......@@ -2682,7 +2783,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
goto _error;
}
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pnCmd, 0);
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pnCmd);
pNewQueryInfo->command = pQueryInfo->command;
pnCmd->active = pNewQueryInfo;
......@@ -2747,7 +2848,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
// set the correct query type
if (pPrevSql != NULL) {
SQueryInfo* pPrevQueryInfo = tscGetQueryInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex);
SQueryInfo* pPrevQueryInfo = tscGetQueryInfo(&pPrevSql->cmd);
pNewQueryInfo->type = pPrevQueryInfo->type;
} else {
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY);// it must be the subquery
......@@ -2774,7 +2875,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, &pTableMetaInfo->name, pTableMeta, pTableMetaInfo->vgroupList,
pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables);
} else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, 0);
if (pPrevInfo->pTableMeta && pPrevInfo->pTableMeta->tableType < 0) {
terrno = TSDB_CODE_TSC_APP_ERROR;
goto _error;
......@@ -2848,6 +2949,41 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
}
}
static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
// handle the pDownStream process
SRetrieveSupport* ps = param;
SSqlObj* pParentSql = ps->pParentSql;
SSqlObj* pSql = tres;
if (!subAndCheckDone(pSql, pParentSql, ps->subqueryIndex)) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d freed, not all subquery finished", pParentSql->self, pSql->self, ps->subqueryIndex);
return;
}
// merge all subquery result
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
/*TAOS_ROW* pRow = */taos_fetch_row(pSql);
if (pSql->res.numOfRows > 0) {
handleDownstreamOperator(pRes, pQueryInfo, &pParentSql->res);
}
code = pParentSql->res.code;
pParentSql->res.qId = -1;
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
} else {
tscAsyncResultOnError(pParentSql);
}
}
static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
printf("123\n");
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
}
// do execute the query according to the query execution plan
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
......@@ -2860,14 +2996,48 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
}
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
SQueryInfo* pq = taosArrayGetP(pQueryInfo->pUpstream, 0);
pSql->subState.numOfSub = taosArrayGetSize(pQueryInfo->pUpstream);
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
pSql->cmd.active = pq;
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, 0);
pSql->cmd.active = pSub;
pSql->cmd.command = TSDB_SQL_SELECT;
executeQuery(pSql, pq);
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
// return NULL;
}
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr); // todo refactor
pNew->fp = tscSubqueryCompleteCallback;
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport));// todo use object id
ps->pParentSql = pSql;
ps->subqueryIndex = 0;
pNew->param = ps;
pSql->pSubs[0] = pNew;
registerSqlObj(pNew);
SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = TSDB_SQL_SELECT;
pCmd->parseFinished = 1;
if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
}
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
tscQueryInfoCopy(pNewQueryInfo, pSub);
// create sub query to handle the sub query.
executeQuery(pNew, pSub);
// merge nest query result and generate final results
// merge sub query result and generate final results
return;
}
......@@ -2899,7 +3069,7 @@ void tscDoQuery(SSqlObj* pSql) {
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
tscImportDataFromFile(pSql);
} else {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
uint16_t type = pQueryInfo->type;
if (QUERY_IS_JOIN_QUERY(type)) {
......@@ -2990,7 +3160,7 @@ bool tscIsQueryWithLimit(SSqlObj* pSql) {
}
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pqi = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pqi = tscGetQueryInfo(pCmd);
while(pqi != NULL) {
if (pqi->limit.limit > 0) {
return true;
......@@ -3074,7 +3244,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
}
assert(pRes->completed);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// for normal table, no need to try any more if results are all retrieved from one vnode
......@@ -3100,7 +3270,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
/*
* no result returned from the current virtual node anymore, try the next vnode if exists
......@@ -3157,7 +3327,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
SSqlRes* pRes = &pSql->res;
pCmd->clauseIndex++;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pSql->cmd.command = pQueryInfo->command;
......@@ -3465,8 +3635,8 @@ int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaI
int32_t numOfOutput = (int32_t) taosArrayGetSize(pQueryInfo->exprList);
for (int32_t j = 0; j < numOfOutput; ++j) {
SExprInfo* px = taosArrayGetP(pQueryInfo->exprList, j);
if (px->base.resColId == pse->colInfo.colId) {
SExprInfo* p = taosArrayGetP(pQueryInfo->exprList, j);
if (p->base.resColId == pse->colInfo.colId) {
pse->colInfo.colIndex = j;
break;
}
......
......@@ -2843,12 +2843,26 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pInfo->numOfTables = htonl(pInfo->numOfTables);
pInfo->numOfVgroups = htonl(pInfo->numOfVgroups);
int32_t contLen = pMsg->rpcMsg.contLen - sizeof(SMultiTableInfoMsg);
int32_t num = 0;
int32_t code = TSDB_CODE_SUCCESS;
char* str = strndup(pInfo->tableNames, contLen);
char** nameList = strsplit(str, ",", &num);
SMultiTableMeta *pMultiMeta = NULL;
if (num != pInfo->numOfTables + pInfo->numOfVgroups) {
mError("msg:%p, app:%p, failed to get multi-tableMeta, msg inconsistent", pMsg, pMsg->rpcMsg.ahandle);
code = TSDB_CODE_MND_INVALID_TABLE_NAME;
goto _error;
}
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen);
pMultiMeta = rpcMallocCont(totalMallocLen);
if (pMultiMeta == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _error;
}
pMultiMeta->contLen = sizeof(SMultiTableMeta);
......@@ -2856,16 +2870,15 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
SArray* pList = taosArrayInit(4, POINTER_BYTES);
for (int32_t t = 0; t < pInfo->numOfTables; ++t) {
char *fullName = (char *)(pInfo->tableNames + t * TSDB_TABLE_FNAME_LEN);
int32_t t = 0;
for (; t < pInfo->numOfTables; ++t) {
char *fullName = nameList[t];
pMsg->pTable = mnodeGetTable(fullName);
if (pMsg->pTable == NULL) {
pMsg->pTable = mnodeGetTable(fullName);
if (pMsg->pTable == NULL) {
rpcFreeCont(pMultiMeta);
mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle, fullName);
return TSDB_CODE_MND_INVALID_TABLE_NAME;
}
mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle, fullName);
code = TSDB_CODE_MND_INVALID_TABLE_NAME;
goto _error;
}
if (pMsg->pDb == NULL) {
......@@ -2873,9 +2886,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
}
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
rpcFreeCont(pMultiMeta);
mnodeDecTableRef(pMsg->pTable);
return TSDB_CODE_APP_NOT_READY;
code = TSDB_CODE_APP_NOT_READY;
goto _error;
}
int remain = totalMallocLen - pMultiMeta->contLen;
......@@ -2884,20 +2897,18 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen);
if (pMultiMeta == NULL) {
mnodeDecTableRef(pMsg->pTable);
return TSDB_CODE_MND_OUT_OF_MEMORY;
code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _error;
}
}
STableMetaMsg *pMeta = (STableMetaMsg *)((char*) pMultiMeta + pMultiMeta->contLen);
int32_t code = 0;
if (pMsg->pTable->type != TSDB_SUPER_TABLE) {
code = mnodeDoGetChildTableMeta(pMsg, pMeta);
} else {
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
code = mnodeDoGetSuperTableMeta(pMsg, pMeta);
// keep the full name for each super table for retrieve vgroup list
taosArrayPush(pList, &fullName);
taosArrayPush(pList, &fullName);// keep the full name for each super table for retrieve vgroup list
} else {
code = mnodeDoGetChildTableMeta(pMsg, pMeta);
}
if (code == TSDB_CODE_SUCCESS) {
......@@ -2911,22 +2922,21 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
// add the additional super table names that needs the vgroup info
for(int32_t i = 0; i < pInfo->numOfVgroups; ++i) {
char *fullName = (char *)(pInfo->tableNames + (i + pInfo->numOfTables) * TSDB_TABLE_FNAME_LEN);
taosArrayPush(pList, fullName);
for(;t < pInfo->numOfVgroups; ++t) {
taosArrayPush(pList, &nameList[t]);
}
// add the pVgroupList into the pList
int32_t numOfStable = (int32_t) taosArrayGetSize(pList);
pMultiMeta->numOfVgroup = htonl(numOfStable);
int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pList);
pMultiMeta->numOfVgroup = htonl(numOfVgroupList);
for(int32_t i = 0; i < numOfStable; ++i) {
for(int32_t i = 0; i < numOfVgroupList; ++i) {
char* name = taosArrayGetP(pList, i);
SSTableObj *pTable = mnodeGetSuperTable(name);
if (pTable == NULL) {
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, name);
mnodeDecTableRef(pTable);
continue;
code = TSDB_CODE_MND_INVALID_TABLE_NAME;
goto _error;
}
msg = serializeVgroupInfo(pTable, name, msg, pMsg, pMsg->rpcMsg.ahandle);
......@@ -2939,6 +2949,14 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.len = pMultiMeta->contLen;
return TSDB_CODE_SUCCESS;
_error:
tfree(str);
tfree(nameList);
rpcFreeCont(pMultiMeta);
taosArrayDestroy(pList);
return code;
}
static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册