提交 c9827141 编写于 作者: H hjxilinx

add the union support in sql parser. refactor some codes. fix bug in join...

add the union support in sql parser. refactor some codes. fix bug in join query caused by feature update #1032. [TBASE-1140]
上级 e15aa9b1
...@@ -121,7 +121,7 @@ STSBuf* tsBufCreate(bool autoDelete); ...@@ -121,7 +121,7 @@ STSBuf* tsBufCreate(bool autoDelete);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder); STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder);
void tsBufDestory(STSBuf* pTSBuf); void* tsBufDestory(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len); void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
......
...@@ -128,7 +128,7 @@ void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visib ...@@ -128,7 +128,7 @@ void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visib
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo); void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size); void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size);
void tscFieldInfoCopyAll(SFieldInfo* src, SFieldInfo* dst); void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src);
TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index); TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
...@@ -184,15 +184,17 @@ SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd *pCmd, int32_t subClauseIndex, int32 ...@@ -184,15 +184,17 @@ SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd *pCmd, int32_t subClauseIndex, int32
SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo *pQueryInfo, int32_t tableIndex); SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex); SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo);
SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, int32_t subClauseIndex, uint64_t uid, int32_t* index); SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, int32_t subClauseIndex, uint64_t uid, int32_t* index);
void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache); void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache);
SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, int32_t subClauseIndex, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta, SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta,
int16_t numOfTags, int16_t* tags); int16_t numOfTags, int16_t* tags);
SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SSqlCmd* pCmd, int32_t subClauseIndex); SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscFreeSubqueryInfo(SSqlCmd* pCmd); void tscFreeSubqueryInfo(SSqlCmd* pCmd);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, int32_t subClauseIndex, char* keyStr, uint64_t uid); void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, int32_t subClauseIndex, char* keyStr, uint64_t uid);
int tscGetMetricMeta(SSqlObj* pSql); int tscGetMetricMeta(SSqlObj* pSql);
......
...@@ -400,7 +400,7 @@ typedef struct { ...@@ -400,7 +400,7 @@ typedef struct {
} SIpStrList; } SIpStrList;
// tscSql API // tscSql API
int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgs(); void tscInitMsgs();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
......
...@@ -97,7 +97,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, ...@@ -97,7 +97,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
strtolower(pSql->sqlstr, sqlstr); strtolower(pSql->sqlstr, sqlstr);
tscTrace("%p Async SQL: %s, pObj:%p", pSql, pSql->sqlstr, pObj); tscTrace("%p Async SQL: %s, pObj:%p", pSql, pSql->sqlstr, pObj);
int32_t code = tsParseSql(pSql, pObj->acctId, pObj->db, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -523,7 +523,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -523,7 +523,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
} else { // normal async query continues } else { // normal async query continues
code = tsParseSql(pSql, pObj->acctId, pObj->db, false); code = tsParseSql(pSql, false);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
} }
......
...@@ -277,7 +277,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -277,7 +277,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
return 0; return 0;
} }
tscFreeSqlCmdData(&pNew->cmd); tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[j++] = pNew; pSql->pSubs[j++] = pNew;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
...@@ -298,7 +298,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -298,7 +298,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid); tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid);
tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pQueryInfo->fieldsInfo); tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo);
// add the ts function for interval query if it is missing // add the ts function for interval query if it is missing
if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS && pQueryInfo->nAggTimeInterval > 0) { if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS && pQueryInfo->nAggTimeInterval > 0) {
...@@ -918,9 +918,9 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { ...@@ -918,9 +918,9 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
return pTSBuf; return pTSBuf;
} }
void tsBufDestory(STSBuf* pTSBuf) { void* tsBufDestory(STSBuf* pTSBuf) {
if (pTSBuf == NULL) { if (pTSBuf == NULL) {
return; return NULL;
} }
tfree(pTSBuf->assistBuf); tfree(pTSBuf->assistBuf);
...@@ -939,6 +939,7 @@ void tsBufDestory(STSBuf* pTSBuf) { ...@@ -939,6 +939,7 @@ void tsBufDestory(STSBuf* pTSBuf) {
} }
free(pTSBuf); free(pTSBuf);
return NULL;
} }
static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) { static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) {
......
...@@ -944,10 +944,18 @@ int validateTableName(char *tblName, int len) { ...@@ -944,10 +944,18 @@ int validateTableName(char *tblName, int len) {
int doParseInsertSql(SSqlObj *pSql, char *str) { int doParseInsertSql(SSqlObj *pSql, char *str) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_INVALID_SQL;
int32_t totalNum = 0; int32_t totalNum = 0;
SQueryInfo* pQueryInfo = NULL;
SMeterMetaInfo* pMeterMetaInfo = NULL;
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd, 0); int32_t code = tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo);
if (pQueryInfo->numOfTables == 0) {
pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
} else {
pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
}
if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) { if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -1208,13 +1216,14 @@ _clean: ...@@ -1208,13 +1216,14 @@ _clean:
return code; return code;
} }
int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) { int tsParseInsertSql(SSqlObj *pSql) {
if (!pSql->pTscObj->writeAuth) { if (!pSql->pTscObj->writeAuth) {
return TSDB_CODE_NO_RIGHTS; return TSDB_CODE_NO_RIGHTS;
} }
int32_t index = 0; int32_t index = 0;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
char* sql = pSql->sqlstr;
SSQLToken sToken = tStrGetToken(sql, &index, false, 0, NULL); SSQLToken sToken = tStrGetToken(sql, &index, false, 0, NULL);
...@@ -1234,12 +1243,9 @@ int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) { ...@@ -1234,12 +1243,9 @@ int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db) {
return doParseInsertSql(pSql, sql + index); return doParseInsertSql(pSql, sql + index);
} }
int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) { int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
// must before clean the sqlcmd object
// tscRemoveMeterMetaInfo(&pSql->cmd, false);
if (NULL == pSql->asyncTblPos) { if (NULL == pSql->asyncTblPos) {
tscCleanSqlCmd(&pSql->cmd); tscCleanSqlCmd(&pSql->cmd);
} else { } else {
...@@ -1260,7 +1266,7 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) { ...@@ -1260,7 +1266,7 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) {
pSql->fp = tscAsyncInsertMultiVnodesProxy; pSql->fp = tscAsyncInsertMultiVnodesProxy;
} }
ret = tsParseInsertSql(pSql, pSql->sqlstr, acct, db); ret = tsParseInsertSql(pSql);
} else { } else {
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE); ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) return ret; if (TSDB_CODE_SUCCESS != ret) return ret;
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "tstrbuild.h" #include "tstrbuild.h"
int tsParseInsertSql(SSqlObj *pSql, char *sql, char *acct, char *db); int tsParseInsertSql(SSqlObj *pSql);
int taos_query_imp(STscObj* pObj, SSqlObj* pSql); int taos_query_imp(STscObj* pObj, SSqlObj* pSql);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -385,12 +385,11 @@ static int insertStmtAddBatch(STscStmt* stmt) { ...@@ -385,12 +385,11 @@ static int insertStmtAddBatch(STscStmt* stmt) {
} }
static int insertStmtPrepare(STscStmt* stmt) { static int insertStmtPrepare(STscStmt* stmt) {
STscObj* taos = stmt->taos;
SSqlObj *pSql = stmt->pSql; SSqlObj *pSql = stmt->pSql;
pSql->cmd.numOfParams = 0; pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0; pSql->cmd.batchSize = 0;
return tsParseInsertSql(pSql, pSql->sqlstr, taos->acctId, taos->db); return tsParseInsertSql(pSql);
} }
static int insertStmtReset(STscStmt* pStmt) { static int insertStmtReset(STscStmt* pStmt) {
......
...@@ -199,14 +199,17 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -199,14 +199,17 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
SSqlCmd* pCmd = &(pSql->cmd); SSqlCmd* pCmd = &(pSql->cmd);
SQueryInfo* pQueryInfo = NULL;
if (!pInfo->valid) { if (!pInfo->valid) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), pInfo->pzErrMsg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), pInfo->pzErrMsg);
} }
SMeterMetaInfo* pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd, 0); int32_t code = tscGetQueryInfoDetailSafely(pCmd, 0, &pQueryInfo);
SMeterMetaInfo* pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
pCmd->command = pInfo->type; pCmd->command = pInfo->type;
int32_t code = 0;
switch (pInfo->type) { switch (pInfo->type) {
case TSDB_SQL_DROP_TABLE: case TSDB_SQL_DROP_TABLE:
...@@ -226,7 +229,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -226,7 +229,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pInfo->type == TSDB_SQL_DROP_DB) { if (pInfo->type == TSDB_SQL_DROP_DB) {
assert(pInfo->pDCLInfo->nTokens == 1); assert(pInfo->pDCLInfo->nTokens == 1);
int32_t code = setObjFullName(pMeterMetaInfo->name, getAccountId(pSql), pzName, NULL, NULL); code = setObjFullName(pMeterMetaInfo->name, getAccountId(pSql), pzName, NULL, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
...@@ -500,7 +503,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -500,7 +503,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
assert(pCmd->numOfClause == 1); assert(pCmd->numOfClause == 1);
for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) { for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) {
tscAddEmptyMeterMetaInfo(pCmd, i); SQueryInfo* pqi = NULL;
if ((code = tscGetQueryInfoDetailSafely(pCmd, i, &pqi)) != TSDB_CODE_SUCCESS) {
return code;
}
} }
assert(pCmd->numOfClause == pInfo->subclauseInfo.numOfClause); assert(pCmd->numOfClause == pInfo->subclauseInfo.numOfClause);
...@@ -5384,9 +5390,9 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -5384,9 +5390,9 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg0);
} }
SQueryInfo* pQueryInfo = pCmd->pQueryInfo[index]; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index);
if (pQueryInfo->numOfTables <= i) { // more than one table if (pQueryInfo->numOfTables <= i) { // more than one table
tscAddEmptyMeterMetaInfo(pCmd, 0); tscAddEmptyMeterMetaInfo(pQueryInfo);
} }
SSQLToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz}; SSQLToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz};
......
...@@ -666,23 +666,26 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu ...@@ -666,23 +666,26 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
// refactor as one method // refactor as one method
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo != NULL);
tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0); tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0); tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);
tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid); tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid);
tscFieldInfoCopyAll(&pNewQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond); tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);
pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
pNew->cmd.numOfCols = 0; pNew->cmd.numOfCols = 0;
pNewQueryInfo->nAggTimeInterval = 0; pNewQueryInfo->nAggTimeInterval = 0;
memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal)); memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));
// backup the data and clear it in the sqlcmd object
pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
// set the ts,tags that involved in join, as the output column of intermediate result // set the ts,tags that involved in join, as the output column of intermediate result
pCmd->pDataBlocks = tscDestroyBlockArrayList(pNew->cmd.pDataBlocks); tscClearSubqueryInfo(&pNew->cmd);
tscFreeSubqueryInfo(&pNew->cmd);
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
...@@ -3458,7 +3461,7 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) { ...@@ -3458,7 +3461,7 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) {
pNew->cmd.allocSize = 0; pNew->cmd.allocSize = 0;
tscAddSubqueryInfo(&pNew->cmd); tscAddSubqueryInfo(&pNew->cmd);
pNew->cmd.numOfClause = 1; assert(pNew->cmd.numOfClause == 1);
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->defaultVal[0] = pQueryInfo->defaultVal[0]; // flag of create table if not exists pNewQueryInfo->defaultVal[0] = pQueryInfo->defaultVal[0]; // flag of create table if not exists
...@@ -3469,7 +3472,7 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) { ...@@ -3469,7 +3472,7 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) {
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(&pNew->cmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pNewQueryInfo);
strcpy(pMeterMetaInfo->name, meterId); strcpy(pMeterMetaInfo->name, meterId);
memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE); memcpy(pNew->cmd.payload, pSql->cmd.payload, TSDB_DEFAULT_PAYLOAD_SIZE);
...@@ -3639,10 +3642,10 @@ int tscGetMetricMeta(SSqlObj *pSql) { ...@@ -3639,10 +3642,10 @@ int tscGetMetricMeta(SSqlObj *pSql) {
pNew->cmd.command = TSDB_SQL_METRIC; pNew->cmd.command = TSDB_SQL_METRIC;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, i); SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name); SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name);
tscAddMeterMetaInfo(&pNew->cmd, 0, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex); tscAddMeterMetaInfo(pQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
} }
if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) { if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
......
...@@ -212,7 +212,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { ...@@ -212,7 +212,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
tscTrace("%p SQL: %s pObj:%p", pSql, pSql->sqlstr, pObj); tscTrace("%p SQL: %s pObj:%p", pSql, pSql->sqlstr, pObj);
pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false); pRes->code = (uint8_t)tsParseSql(pSql, false);
/* /*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query. * set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
...@@ -447,9 +447,9 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -447,9 +447,9 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SSqlRes *pRes1 = &pSql->pSubs[i]->res; SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd; SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfo(pCmd1, 0, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0);
SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
assert(pQueryInfo->numOfTables == 1); assert(pQueryInfo->numOfTables == 1);
/* /*
...@@ -972,7 +972,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -972,7 +972,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pSql->pTableHashList = NULL; pSql->pTableHashList = NULL;
} }
pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false); pRes->code = (uint8_t)tsParseSql(pSql, false);
int code = pRes->code; int code = pRes->code;
tscTrace("%p Valid SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); tscTrace("%p Valid SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
...@@ -993,7 +993,13 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t ...@@ -993,7 +993,13 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
int code = TSDB_CODE_INVALID_METER_ID; int code = TSDB_CODE_INVALID_METER_ID;
char *str = (char *)tblNameList; char *str = (char *)tblNameList;
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (pQueryInfo == NULL) {
tscAddSubqueryInfo(pCmd);
pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
}
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) { if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -885,7 +885,7 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList ...@@ -885,7 +885,7 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList
if (size <= 0) { if (size <= 0) {
*dst = *src; *dst = *src;
tscFieldInfoCopyAll(src, dst); tscFieldInfoCopyAll(dst, src);
} else { // only copy the required column } else { // only copy the required column
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
assert(indexList[i] >= 0 && indexList[i] <= src->numOfOutputCols); assert(indexList[i] >= 0 && indexList[i] <= src->numOfOutputCols);
...@@ -894,7 +894,7 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList ...@@ -894,7 +894,7 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList
} }
} }
void tscFieldInfoCopyAll(SFieldInfo* src, SFieldInfo* dst) { void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src) {
*dst = *src; *dst = *src;
dst->pFields = malloc(sizeof(TAOS_FIELD) * dst->numOfAlloc); dst->pFields = malloc(sizeof(TAOS_FIELD) * dst->numOfAlloc);
...@@ -1565,18 +1565,18 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { ...@@ -1565,18 +1565,18 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
/** /**
* *
* @param pCmd * @param pCmd
* @param unionSubClause denote the index of the union sub clause, usually are 0, if no union query exists. * @param clauseIndex denote the index of the union sub clause, usually are 0, if no union query exists.
* @param tableIndex denote the table index for join query, where more than one table exists * @param tableIndex denote the table index for join query, where more than one table exists
* @return * @return
*/ */
SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t unionClauseIndex, int32_t tableIndex) { SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t clauseIndex, int32_t tableIndex) {
if (pCmd == NULL || pCmd->numOfClause == 0) { if (pCmd == NULL || pCmd->numOfClause == 0) {
return NULL; return NULL;
} }
assert(unionClauseIndex >= 0 && unionClauseIndex < pCmd->numOfClause); assert(clauseIndex >= 0 && clauseIndex < pCmd->numOfClause);
SQueryInfo* pQueryInfo = pCmd->pQueryInfo[unionClauseIndex]; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
return tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex); return tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);
} }
...@@ -1601,6 +1601,23 @@ SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) { ...@@ -1601,6 +1601,23 @@ SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
return pCmd->pQueryInfo[subClauseIndex]; return pCmd->pQueryInfo[subClauseIndex];
} }
int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo) {
int32_t ret = TSDB_CODE_SUCCESS;
assert(subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE);
*pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
while ((*pQueryInfo) == NULL) {
if ((ret = tscAddSubqueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
return ret;
}
(*pQueryInfo) = tscGetQueryInfoDetail(pCmd, subClauseIndex);
}
return TSDB_CODE_SUCCESS;
}
SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, int32_t subClauseIndex, uint64_t uid, int32_t* index) { SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, int32_t subClauseIndex, uint64_t uid, int32_t* index) {
int32_t k = -1; int32_t k = -1;
...@@ -1636,7 +1653,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { ...@@ -1636,7 +1653,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void doFreeSubqueryInfo(SQueryInfo* pQueryInfo, int64_t address) { static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) {
tscTagCondRelease(&pQueryInfo->tagCond); tscTagCondRelease(&pQueryInfo->tagCond);
tscClearFieldInfo(&pQueryInfo->fieldsInfo); tscClearFieldInfo(&pQueryInfo->fieldsInfo);
...@@ -1646,13 +1663,14 @@ static void doFreeSubqueryInfo(SQueryInfo* pQueryInfo, int64_t address) { ...@@ -1646,13 +1663,14 @@ static void doFreeSubqueryInfo(SQueryInfo* pQueryInfo, int64_t address) {
tscColumnBaseInfoDestroy(&pQueryInfo->colList); tscColumnBaseInfoDestroy(&pQueryInfo->colList);
memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList)); memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList));
if (pQueryInfo->tsBuf != NULL) { pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf);
tsBufDestory(pQueryInfo->tsBuf); }
pQueryInfo->tsBuf = NULL;
}
tscRemoveAllMeterMetaInfo(pQueryInfo, (const char*) address, false); void tscClearSubqueryInfo(SSqlCmd* pCmd) {
tfree(pQueryInfo); for(int32_t i = 0; i < pCmd->numOfClause; ++i) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
doClearSubqueryInfo(pQueryInfo);
}
} }
void tscFreeSubqueryInfo(SSqlCmd* pCmd) { void tscFreeSubqueryInfo(SSqlCmd* pCmd) {
...@@ -1661,24 +1679,26 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) { ...@@ -1661,24 +1679,26 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) {
} }
for (int32_t i = 0; i < pCmd->numOfClause; ++i) { for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
int64_t offset = offsetof(SSqlObj, cmd); char *addr = (char *) pCmd - offsetof(SSqlObj, cmd);
int64_t addr = (char*) pCmd - offset;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
doFreeSubqueryInfo(tscGetQueryInfoDetail(pCmd, i), addr); doClearSubqueryInfo(pQueryInfo);
tscRemoveAllMeterMetaInfo(pQueryInfo, (const char *) addr, false);
tfree(pQueryInfo);
} }
pCmd->numOfClause = 0; pCmd->numOfClause = 0;
tfree(pCmd->pQueryInfo); tfree(pCmd->pQueryInfo);
} }
SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, int32_t subClauseIndex, const char* name, SMeterMeta* pMeterMeta, SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, SMeterMeta* pMeterMeta,
SMetricMeta* pMetricMeta, int16_t numOfTags, int16_t* tags) { SMetricMeta* pMetricMeta, int16_t numOfTags, int16_t* tags) {
assert(subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE); // while (pCmd->numOfClause <= subClauseIndex) {
while (pCmd->numOfClause <= subClauseIndex) { // tscAddSubqueryInfo(pCmd);
tscAddSubqueryInfo(pCmd); // }
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); // SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
void* pAlloc = realloc(pQueryInfo->pMeterInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); void* pAlloc = realloc(pQueryInfo->pMeterInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
if (pAlloc == NULL) { if (pAlloc == NULL) {
...@@ -1708,8 +1728,8 @@ SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, int32_t subClauseIndex, const ...@@ -1708,8 +1728,8 @@ SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, int32_t subClauseIndex, const
return pMeterMetaInfo; return pMeterMetaInfo;
} }
SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SSqlCmd* pCmd, int32_t subClauseIndex) { SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SQueryInfo* pQueryInfo) {
return tscAddMeterMetaInfo(pCmd, subClauseIndex, NULL, NULL, NULL, 0, NULL); return tscAddMeterMetaInfo(pQueryInfo, NULL, NULL, NULL, 0, NULL);
} }
void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) { void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) {
...@@ -1858,11 +1878,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1858,11 +1878,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
SMeterMeta* pMeterMeta = taosGetDataFromCache(tscCacheHandle, name); SMeterMeta* pMeterMeta = taosGetDataFromCache(tscCacheHandle, name);
SMetricMeta* pMetricMeta = taosGetDataFromCache(tscCacheHandle, key); SMetricMeta* pMetricMeta = taosGetDataFromCache(tscCacheHandle, key);
pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, 0, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags, pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags,
pMeterMetaInfo->tagColumnIndex); pMeterMetaInfo->tagColumnIndex);
} else { } else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object.
SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, 0, 0); SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, 0, 0);
pFinalInfo = tscAddMeterMetaInfo(&pNew->cmd, 0, name, pPrevInfo->pMeterMeta, pPrevInfo->pMetricMeta, pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevInfo->pMeterMeta, pPrevInfo->pMetricMeta,
pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex); pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex);
pPrevInfo->pMeterMeta = NULL; pPrevInfo->pMeterMeta = NULL;
......
...@@ -2274,10 +2274,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2274,10 +2274,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->pInterpoBuf); tfree(pRuntimeEnv->pInterpoBuf);
} }
if (pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
tsBufDestory(pRuntimeEnv->pTSBuf);
pRuntimeEnv->pTSBuf = NULL;
}
} }
// get maximum time interval in each file // get maximum time interval in each file
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册