From 4f5c58877c33936f30d8236127ac72cefb6d8c19 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Jun 2020 16:02:57 +0800 Subject: [PATCH] [td-225] --- src/client/src/tscServer.c | 6 +++--- src/client/src/tscSql.c | 1 - src/client/src/tscSubquery.c | 19 +++++++++---------- src/query/src/qExecutor.c | 2 +- src/query/src/qUtil.c | 2 -- 5 files changed, 13 insertions(+), 17 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 58865d3eac..102cb427d5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -491,16 +491,16 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; pShellMsg->header.vgId = htonl(vgId); - pShellMsg->header.contLen = htonl(size); + pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc pShellMsg->length = pShellMsg->header.contLen; - pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted + pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of tables to be inserted // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo); - tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), + tscTrace("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, pSql->ipList.numOfIps); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 8668c31cf4..720e412225 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -133,7 +133,6 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con return NULL; } - // tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); return pSql; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 7a64e7f496..fbfc771397 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -180,6 +180,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in getTmpfilePath("join-", pSupporter->path); pSupporter->f = fopen(pSupporter->path, "w"); + // todo handle error if (pSupporter->f == NULL) { tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno)); } @@ -234,7 +235,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) { /* * launch secondary stage query to fetch the result that contains timestamp in set */ -static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { +static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { int32_t numOfSub = 0; SJoinSupporter* pSupporter = NULL; @@ -249,7 +250,7 @@ static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { assert(numOfSub > 0); // scan all subquery, if one sub query has only ts, ignore it - tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub); + tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub); //the subqueries that do not actually launch the secondary query to virtual node is set as completed. SSubqueryState* pState = pSupporter->pState; @@ -451,7 +452,7 @@ static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupport freeJoinSubqueryObj(pParentSql); } else { updateQueryTimeRange(pParentQueryInfo, &win); - tscLaunchSecondPhaseSubqueries(pParentSql); + tscLaunchRealSubqueries(pParentSql); } } @@ -851,7 +852,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // launch the query the retrieve actual results from vnode along with the filtered timestamp SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); updateQueryTimeRange(pPQueryInfo, &win); - tscLaunchSecondPhaseSubqueries(pParentSql); + tscLaunchRealSubqueries(pParentSql); } static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) { @@ -1159,7 +1160,6 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); -// todo merge with callback int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { SSqlCmd * pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -1302,7 +1302,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { pState->numOfTotal = pQueryInfo->numOfTables; pState->numOfRemain = pState->numOfTotal; - tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables); + tscTrace("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); @@ -1848,8 +1848,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; - SSqlCmd* pParentCmd = &pParentObj->cmd; - SSubqueryState* pState = pSupporter->pState; // record the total inserted rows @@ -1875,7 +1873,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) // release data block data tfree(pState); - pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); +// pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); // restore user defined fp pParentObj->fp = pParentObj->fetchFp; @@ -1945,7 +1943,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j); tscProcessSql(pSub); } - + + pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); return TSDB_CODE_SUCCESS; _error: diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5a865667d0..02084dc6f7 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4319,7 +4319,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, pQInfo->groupIndex); + SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); qTrace("QInfo:%p group by normal columns group:%d, total group:%zu", pQInfo, pQInfo->groupIndex, numOfGroups); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index aa5550efcb..42592e91c5 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -114,8 +114,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { SWindowResult *pResult = &pWindowResInfo->pResult[i]; if (pResult->status.closed) { // remove the window slot from hash table taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, pWindowResInfo->type); - printf("remove ============>%ld, remain size:%ld\n", pResult->window.skey, pWindowResInfo->hashList->size); - } else { break; } -- GitLab