提交 2d5c2a5f 编写于 作者: H Haojun Liao

[td-225]fix bugs found by regression test.

上级 e9db267b
...@@ -166,7 +166,7 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); ...@@ -166,7 +166,7 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo); void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc); void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; } static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
...@@ -192,11 +192,14 @@ SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t function ...@@ -192,11 +192,14 @@ SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t function
size_t tscNumOfExprs(SQueryInfo* pQueryInfo); size_t tscNumOfExprs(SQueryInfo* pQueryInfo);
SExprInfo *tscExprGet(SQueryInfo* pQueryInfo, int32_t index); SExprInfo *tscExprGet(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy);
void tscExprAssign(SExprInfo* dst, const SExprInfo* src); void tscExprAssign(SExprInfo* dst, const SExprInfo* src);
void tscExprDestroy(SArray* pExprInfo); void tscExprDestroy(SArray* pExprInfo);
int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num); int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num);
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta);
SColumn* tscColumnClone(const SColumn* src); SColumn* tscColumnClone(const SColumn* src);
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid); bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema); SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
......
...@@ -226,7 +226,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { ...@@ -226,7 +226,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
SSchedMsg schedMsg = {0}; SSchedMsg schedMsg = {0};
schedMsg.fp = doRetrieveSubqueryData; schedMsg.fp = doRetrieveSubqueryData;
schedMsg.ahandle = (void *)pSql->self; schedMsg.ahandle = (void *)pSql;
schedMsg.thandle = (void *)1; schedMsg.thandle = (void *)1;
schedMsg.msg = 0; schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg); taosScheduleTask(tscQhandle, &schedMsg);
......
...@@ -974,7 +974,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -974,7 +974,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
if (pInfo->currentGroupOffset == 0) { if (pInfo->currentGroupOffset == 0) {
pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -982,7 +982,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -982,7 +982,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
while ((*newgroup) == false) { // ignore the remain blocks while ((*newgroup) == false) { // ignore the remain blocks
pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -994,7 +994,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -994,7 +994,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock; return pBlock;
} }
pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -1008,7 +1008,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -1008,7 +1008,7 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
} }
while ((*newgroup) == false) { while ((*newgroup) == false) {
pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
......
...@@ -803,7 +803,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -803,7 +803,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
} }
STableMetaInfo *pSTableMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX); STableMetaInfo *pSTableMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
code = tscSetTableFullName(&pTableMetaInfo->name, &sToken, pSql); code = tscSetTableFullName(&pSTableMetaInfo->name, &sToken, pSql);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -7149,6 +7149,9 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -7149,6 +7149,9 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
int32_t size = taosArrayGetSize(pInfo->list); int32_t size = taosArrayGetSize(pInfo->list);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i); SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i);
if (pSqlNode->from == NULL) {
goto _end;
}
// load the table meta in the from clause // load the table meta in the from clause
if (pSqlNode->from->type == SQL_NODE_FROM_TABLELIST) { if (pSqlNode->from->type == SQL_NODE_FROM_TABLELIST) {
...@@ -7330,6 +7333,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS ...@@ -7330,6 +7333,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0); SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0);
SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo)); SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo));
printf("-------------queryinfo:%p\n", pSub);
tscInitQueryInfo(pSub); tscInitQueryInfo(pSub);
int32_t code = validateSqlNode(pSql, p, pSub); int32_t code = validateSqlNode(pSql, p, pSub);
...@@ -7408,6 +7412,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7408,6 +7412,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) {
clearAllTableMetaInfo(pQueryInfo, false);
pQueryInfo->numOfTables = 0; pQueryInfo->numOfTables = 0;
// parse the subquery in the first place // parse the subquery in the first place
......
...@@ -1997,6 +1997,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -1997,6 +1997,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
rsp += sizeof(SMultiTableMeta); rsp += sizeof(SMultiTableMeta);
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param); SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param);
if(pParentSql == NULL) {
return pSql->res.code;
}
SSqlCmd *pParentCmd = &pParentSql->cmd; SSqlCmd *pParentCmd = &pParentSql->cmd;
SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
...@@ -2006,12 +2010,16 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2006,12 +2010,16 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg; STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg;
int32_t code = tableMetaMsgConvert(pMetaMsg); int32_t code = tableMetaMsgConvert(pMetaMsg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosHashCleanup(pSet);
taosReleaseRef(tscObjRef, pParentSql->self);
return code; return code;
} }
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg); STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) { if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) {
tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, pMetaMsg->tableFname); tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, pMetaMsg->tableFname);
taosHashCleanup(pSet);
taosReleaseRef(tscObjRef, pParentSql->self);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
...@@ -2062,6 +2070,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2062,6 +2070,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" load multi-tableMeta resp from complete numOfTables:%d", pSql->self, pMultiMeta->numOfTables); tscDebug("0x%"PRIx64" load multi-tableMeta resp from complete numOfTables:%d", pSql->self, pMultiMeta->numOfTables);
taosHashCleanup(pSet); taosHashCleanup(pSet);
taosReleaseRef(tscObjRef, pParentSql->self);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2090,51 +2099,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { ...@@ -2090,51 +2099,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
int32_t size = 0; int32_t size = 0;
pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self); pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self);
/* size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
pInfo->vgroupList = calloc(1, vgroupsz);
assert(pInfo->vgroupList != NULL);
pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
if (pInfo->vgroupList->numOfVgroups <= 0) {
tscDebug("0x%"PRIx64" empty vgroup info, no corresponding tables for stable", pSql->self);
} else {
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
// just init, no need to lock
SVgroupInfo *pVgroup = &pInfo->vgroupList->vgroups[j];
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
vmsg->vgId = htonl(vmsg->vgId);
vmsg->numOfEps = vmsg->numOfEps;
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
}
SNewVgroupInfo newVi = createNewVgroupInfo(vmsg);
pVgroup->numOfEps = newVi.numOfEps;
pVgroup->vgId = newVi.vgId;
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
pVgroup->epAddr[k].port = newVi.ep[k].port;
pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN);
}
// check if current buffer contains the vgroup info.
// If not, add it
SNewVgroupInfo existVgroupInfo = {.inUse = -1};
taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo));
if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) ||
(existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi));
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", newVi.vgId, (int32_t) taosHashGetSize(tscVgroupMap));
}
}
}
*/
pMsg += size; pMsg += size;
} }
taosReleaseRef(tscObjRef, parent->self); taosReleaseRef(tscObjRef, parent->self);
return pSql->res.code; return pSql->res.code;
} }
...@@ -2378,8 +2346,6 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2378,8 +2346,6 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
tscSetResRawPtr(pRes, pQueryInfo); tscSetResRawPtr(pRes, pQueryInfo);
} }
// handleDownstreamOperator(pRes, pQueryInfo);
if (pSql->pSubscription != NULL) { if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput; int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
......
...@@ -2894,7 +2894,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo ...@@ -2894,7 +2894,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
pQueryInfo->limit.limit = -1; pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0; pQueryInfo->limit.offset = 0;
assert(/*pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 &&*/ trsupport->subqueryIndex < pSql->subState.numOfSub); assert(trsupport->subqueryIndex < pSql->subState.numOfSub);
// launch subquery for each vnode, so the subquery index equals to the vgroupIndex. // launch subquery for each vnode, so the subquery index equals to the vgroupIndex.
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index);
...@@ -3577,17 +3577,9 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STable ...@@ -3577,17 +3577,9 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STable
} }
} }
// for (int32_t i = 0; i < numOfOutput; ++i) { // todo refactor: filter should not be applied here.
// SExprInfo* pExprInfo = &pExprs[i];
// if (pExprInfo->pExpr != NULL) {
// tExprTreeDestroy(pExprInfo->pExpr, NULL);
// pExprInfo->pExpr = NULL;
// }
// }
//
// tfree(pExprs);
createFilterInfo(pQueryAttr, 0); createFilterInfo(pQueryAttr, 0);
pQueryAttr->numOfFilterCols = 0;
SArray* pa = NULL; SArray* pa = NULL;
if (stage == MASTER_SCAN) { if (stage == MASTER_SCAN) {
......
...@@ -31,7 +31,6 @@ ...@@ -31,7 +31,6 @@
#include "ttokendef.h" #include "ttokendef.h"
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo); static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta);
static void tscStrToLower(char *str, int32_t n) { static void tscStrToLower(char *str, int32_t n) {
if (str == NULL || n <= 0) { return;} if (str == NULL || n <= 0) { return;}
...@@ -716,7 +715,6 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { ...@@ -716,7 +715,6 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
return pBlock; return pBlock;
} }
static int32_t v = 0;
SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) { SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param; SOperatorInfo *pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -735,11 +733,6 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) { ...@@ -735,11 +733,6 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup); pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
pStatus->index = 0; pStatus->index = 0;
if (i == 0 && pStatus->pBlock != NULL) {
v += pStatus->pBlock->info.rows;
printf("---------------%d\n", v);
}
if (pStatus->pBlock == NULL) { if (pStatus->pBlock == NULL) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -901,6 +894,13 @@ SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t ...@@ -901,6 +894,13 @@ SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t
return pOptr; return pOptr;
} }
static void destroyJoinOperator(void* param, int32_t numOfOutput) {
SJoinOperatorInfo* pInfo = (SJoinOperatorInfo*) param;
tfree(pInfo->status);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) { SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) {
SJoinOperatorInfo* pInfo = calloc(1, sizeof(SJoinOperatorInfo)); SJoinOperatorInfo* pInfo = calloc(1, sizeof(SJoinOperatorInfo));
...@@ -931,7 +931,7 @@ SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstre ...@@ -931,7 +931,7 @@ SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstre
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exec = doDataBlockJoin; pOperator->exec = doDataBlockJoin;
pOperator->cleanup = destroyDummyInputOperator; pOperator->cleanup = destroyJoinOperator;
for(int32_t i = 0; i < numOfUpstream; ++i) { for(int32_t i = 0; i < numOfUpstream; ++i) {
appendUpstream(pOperator, pUpstream[i]); appendUpstream(pOperator, pUpstream[i]);
...@@ -1075,8 +1075,9 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { ...@@ -1075,8 +1075,9 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
while(pQueryInfo != NULL) { while(pQueryInfo != NULL) {
SQueryInfo* p = pQueryInfo->sibling; SQueryInfo* p = pQueryInfo->sibling;
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { int32_t numOfUpstream = taosArrayGetSize(pQueryInfo->pUpstream);
SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, 0); for(int32_t i = 0; i < numOfUpstream; ++i) {
SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, i);
freeQueryInfoImpl(pUp); freeQueryInfoImpl(pUp);
clearAllTableMetaInfo(pUp, removeMeta); clearAllTableMetaInfo(pUp, removeMeta);
...@@ -1097,11 +1098,11 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { ...@@ -1097,11 +1098,11 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
} }
tfree(pQueryInfo); tfree(pQueryInfo);
pQueryInfo = p; pQueryInfo = p;
} }
pCmd->pQueryInfo = NULL; pCmd->pQueryInfo = NULL;
pCmd->active = NULL;
} }
void destroyTableNameList(SSqlCmd* pCmd) { void destroyTableNameList(SSqlCmd* pCmd) {
...@@ -1132,6 +1133,9 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) { ...@@ -1132,6 +1133,9 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, removeMeta); pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, removeMeta);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeQueryInfo(pCmd, removeMeta); tscFreeQueryInfo(pCmd, removeMeta);
taosHashCleanup(pCmd->pTableMetaMap);
pCmd->pTableMetaMap = NULL;
} }
void tscFreeSqlResult(SSqlObj* pSql) { void tscFreeSqlResult(SSqlObj* pSql) {
...@@ -1834,8 +1838,8 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { ...@@ -1834,8 +1838,8 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
memset(pFieldInfo, 0, sizeof(SFieldInfo)); memset(pFieldInfo, 0, sizeof(SFieldInfo));
} }
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc) { void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList) {
assert(pFieldInfo != NULL && pSrc != NULL); assert(pFieldInfo != NULL && pSrc != NULL && pExprList != NULL);
pFieldInfo->numOfOutput = pSrc->numOfOutput; pFieldInfo->numOfOutput = pSrc->numOfOutput;
if (pSrc->final != NULL) { if (pSrc->final != NULL) {
...@@ -1845,13 +1849,25 @@ void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc) { ...@@ -1845,13 +1849,25 @@ void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc) {
if (pSrc->internalField != NULL) { if (pSrc->internalField != NULL) {
size_t num = taosArrayGetSize(pSrc->internalField); size_t num = taosArrayGetSize(pSrc->internalField);
size_t numOfExpr = taosArrayGetSize(pExprList);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SInternalField* pfield = taosArrayGet(pSrc->internalField, i); SInternalField* pfield = taosArrayGet(pSrc->internalField, i);
SInternalField p = {.visible = pfield->visible, .field = pfield->field}; SInternalField p = {.visible = pfield->visible, .field = pfield->field};
p.pExpr = calloc(1, sizeof(SExprInfo));
tscExprAssign(p.pExpr, pfield->pExpr); int32_t resColId = pfield->pExpr->base.resColId;
for(int32_t j = 0; j < numOfExpr; ++j) {
SExprInfo* pExpr = taosArrayGetP(pExprList, j);
if (pExpr->base.resColId == resColId) {
p.pExpr = pExpr;
break;
}
}
// p.pExpr = calloc(1, sizeof(SExprInfo));
// tscExprAssign(p.pExpr, pfield->pExpr);
taosArrayPush(pFieldInfo->internalField, &p);
} }
} }
} }
...@@ -2012,7 +2028,7 @@ int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) ...@@ -2012,7 +2028,7 @@ int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy)
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = taosArrayGetP(src, i); SExprInfo* pExpr = taosArrayGetP(src, i);
if (uid != 0 && pExpr->base.uid == uid) { if (pExpr->base.uid == uid) {
if (deepcopy) { if (deepcopy) {
SExprInfo* p1 = calloc(1, sizeof(SExprInfo)); SExprInfo* p1 = calloc(1, sizeof(SExprInfo));
tscExprAssign(p1, pExpr); tscExprAssign(p1, pExpr);
...@@ -2029,6 +2045,26 @@ int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) ...@@ -2029,6 +2045,26 @@ int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy)
return 0; return 0;
} }
int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy) {
assert(src != NULL && dst != NULL);
size_t size = taosArrayGetSize(src);
for (int32_t i = 0; i < size; ++i) {
SExprInfo* pExpr = taosArrayGetP(src, i);
if (deepcopy) {
SExprInfo* p1 = calloc(1, sizeof(SExprInfo));
tscExprAssign(p1, pExpr);
taosArrayPush(dst, &p1);
} else {
taosArrayPush(dst, &pExpr);
}
}
return 0;
}
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) { bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) {
// ignore the tbname columnIndex to be inserted into source list // ignore the tbname columnIndex to be inserted into source list
if (columnIndex < 0) { if (columnIndex < 0) {
...@@ -2688,7 +2724,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { ...@@ -2688,7 +2724,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
pQueryInfo->fillType = pSrc->fillType; pQueryInfo->fillType = pSrc->fillType;
pQueryInfo->fillVal = NULL; pQueryInfo->fillVal = NULL;
pQueryInfo->clauseLimit = pSrc->clauseLimit; pQueryInfo->clauseLimit = pSrc->clauseLimit;
pQueryInfo->numOfTables = pSrc->numOfTables; pQueryInfo->numOfTables = 0;
pQueryInfo->window = pSrc->window; pQueryInfo->window = pSrc->window;
pQueryInfo->sessionWindow = pSrc->sessionWindow; pQueryInfo->sessionWindow = pSrc->sessionWindow;
pQueryInfo->pTableMetaInfo = NULL; pQueryInfo->pTableMetaInfo = NULL;
...@@ -2728,14 +2764,14 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { ...@@ -2728,14 +2764,14 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
memcpy(pQueryInfo->fillVal, pSrc->fillVal, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t)); memcpy(pQueryInfo->fillVal, pSrc->fillVal, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t));
} }
tscColumnListCopyAll(pQueryInfo->colList, pSrc->colList); if (tscExprCopyAll(pQueryInfo->exprList, pSrc->exprList, true) != 0) {
tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pSrc->fieldsInfo);
if (tscExprCopy(pQueryInfo->exprList, pSrc->exprList, 0, true) != 0) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
tscColumnListCopyAll(pQueryInfo->colList, pSrc->colList);
tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pSrc->fieldsInfo, pQueryInfo->exprList);
for(int32_t i = 0; i < pSrc->numOfTables; ++i) { for(int32_t i = 0; i < pSrc->numOfTables; ++i) {
STableMetaInfo* p1 = tscGetMetaInfo((SQueryInfo*) pSrc, i); STableMetaInfo* p1 = tscGetMetaInfo((SQueryInfo*) pSrc, i);
...@@ -2822,28 +2858,29 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) { ...@@ -2822,28 +2858,29 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) {
if (removeMeta) { if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name); tNameExtractFullName(&pTableMetaInfo->name, name);
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
tscClearTableMetaInfo(pTableMetaInfo); tscClearTableMetaInfo(pTableMetaInfo);
free(pTableMetaInfo); free(pTableMetaInfo);
} }
tfree(pQueryInfo->pTableMetaInfo); tfree(pQueryInfo->pTableMetaInfo);
} }
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, SName* name, STableMeta* pTableMeta, STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, SName* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables) { SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables) {
void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); void* tmp = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
if (pAlloc == NULL) { if (tmp == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pQueryInfo->pTableMetaInfo = pAlloc; pQueryInfo->pTableMetaInfo = tmp;
STableMetaInfo* pTableMetaInfo = calloc(1, sizeof(STableMetaInfo)); STableMetaInfo* pTableMetaInfo = calloc(1, sizeof(STableMetaInfo));
if (pTableMetaInfo == NULL) { if (pTableMetaInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -3018,10 +3055,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3018,10 +3055,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
SSqlCmd* pnCmd = &pNew->cmd; SSqlCmd* pnCmd = &pNew->cmd;
memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
pnCmd->command = cmd; pnCmd->command = cmd;
pnCmd->payload = NULL; pnCmd->payload = NULL;
pnCmd->allocSize = 0; pnCmd->allocSize = 0;
pnCmd->pTableMetaMap = NULL;
pnCmd->pQueryInfo = NULL; pnCmd->pQueryInfo = NULL;
pnCmd->clauseIndex = 0; pnCmd->clauseIndex = 0;
...@@ -3206,7 +3244,7 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3206,7 +3244,7 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
} }
void doRetrieveSubqueryData(SSchedMsg *pMsg) { void doRetrieveSubqueryData(SSchedMsg *pMsg) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pMsg->ahandle); SSqlObj* pSql = (SSqlObj*) pMsg->ahandle;
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pMsg->ahandle); tscDebug("%p SqlObj is freed, not add into queue async res", pMsg->ahandle);
return; return;
...@@ -3246,7 +3284,7 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3246,7 +3284,7 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
SSchedMsg schedMsg = {0}; SSchedMsg schedMsg = {0};
schedMsg.fp = doRetrieveSubqueryData; schedMsg.fp = doRetrieveSubqueryData;
schedMsg.ahandle = (void *)pParentSql->self; schedMsg.ahandle = (void *)pParentSql;
schedMsg.thandle = (void *)1; schedMsg.thandle = (void *)1;
schedMsg.msg = 0; schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg); taosScheduleTask(tscQhandle, &schedMsg);
...@@ -3311,7 +3349,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3311,7 +3349,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
tscQueryInfoCopy(pNewQueryInfo, pSub); tscQueryInfoCopy(pNewQueryInfo, pSub);
// create sub query to handle the sub query. // create sub query to handle the sub query.
executeQuery(pNew, pSub); executeQuery(pNew, pNewQueryInfo);
} }
// merge sub query result and generate final results // merge sub query result and generate final results
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册