提交 4f5c5887 编写于 作者: H Haojun Liao

[td-225]

上级 6246a5fd
...@@ -491,16 +491,16 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -491,16 +491,16 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
pShellMsg->header.vgId = htonl(vgId); pShellMsg->header.vgId = htonl(vgId);
pShellMsg->header.contLen = htonl(size); pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc
pShellMsg->length = pShellMsg->header.contLen; pShellMsg->length = pShellMsg->header.contLen;
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of tables to be inserted
// pSql->cmd.payloadLen is set during copying data into payload // pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo); tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);
tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), tscTrace("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
pSql->ipList.numOfIps); pSql->ipList.numOfIps);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -133,7 +133,6 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -133,7 +133,6 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return NULL; return NULL;
} }
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pSql; return pSql;
} }
......
...@@ -180,6 +180,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in ...@@ -180,6 +180,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
getTmpfilePath("join-", pSupporter->path); getTmpfilePath("join-", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w"); pSupporter->f = fopen(pSupporter->path, "w");
// todo handle error
if (pSupporter->f == NULL) { if (pSupporter->f == NULL) {
tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno)); tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno));
} }
...@@ -234,7 +235,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) { ...@@ -234,7 +235,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
/* /*
* launch secondary stage query to fetch the result that contains timestamp in set * launch secondary stage query to fetch the result that contains timestamp in set
*/ */
static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0; int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL; SJoinSupporter* pSupporter = NULL;
...@@ -249,7 +250,7 @@ static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { ...@@ -249,7 +250,7 @@ static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
assert(numOfSub > 0); assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it // scan all subquery, if one sub query has only ts, ignore it
tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub); tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed. //the subqueries that do not actually launch the secondary query to virtual node is set as completed.
SSubqueryState* pState = pSupporter->pState; SSubqueryState* pState = pSupporter->pState;
...@@ -451,7 +452,7 @@ static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupport ...@@ -451,7 +452,7 @@ static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupport
freeJoinSubqueryObj(pParentSql); freeJoinSubqueryObj(pParentSql);
} else { } else {
updateQueryTimeRange(pParentQueryInfo, &win); updateQueryTimeRange(pParentQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql); tscLaunchRealSubqueries(pParentSql);
} }
} }
...@@ -851,7 +852,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -851,7 +852,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// launch the query the retrieve actual results from vnode along with the filtered timestamp // launch the query the retrieve actual results from vnode along with the filtered timestamp
SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
updateQueryTimeRange(pPQueryInfo, &win); updateQueryTimeRange(pPQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql); tscLaunchRealSubqueries(pParentSql);
} }
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) { static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
...@@ -1159,7 +1160,6 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); ...@@ -1159,7 +1160,6 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
// todo merge with callback
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd; SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
...@@ -1302,7 +1302,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1302,7 +1302,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
pState->numOfTotal = pQueryInfo->numOfTables; pState->numOfTotal = pQueryInfo->numOfTables;
pState->numOfRemain = pState->numOfTotal; pState->numOfRemain = pState->numOfTotal;
tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables); tscTrace("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
...@@ -1848,8 +1848,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1848,8 +1848,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param; SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql; SSqlObj* pParentObj = pSupporter->pSql;
SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState; SSubqueryState* pState = pSupporter->pState;
// record the total inserted rows // record the total inserted rows
...@@ -1875,7 +1873,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -1875,7 +1873,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
// release data block data // release data block data
tfree(pState); tfree(pState);
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); // pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// restore user defined fp // restore user defined fp
pParentObj->fp = pParentObj->fetchFp; pParentObj->fp = pParentObj->fetchFp;
...@@ -1945,7 +1943,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -1945,7 +1943,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j); tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
tscProcessSql(pSub); tscProcessSql(pSub);
} }
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
......
...@@ -4319,7 +4319,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4319,7 +4319,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} }
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query
while (pQInfo->groupIndex < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, pQInfo->groupIndex); SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
qTrace("QInfo:%p group by normal columns group:%d, total group:%zu", pQInfo, pQInfo->groupIndex, numOfGroups); qTrace("QInfo:%p group by normal columns group:%d, total group:%zu", pQInfo, pQInfo->groupIndex, numOfGroups);
......
...@@ -114,8 +114,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -114,8 +114,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
SWindowResult *pResult = &pWindowResInfo->pResult[i]; SWindowResult *pResult = &pWindowResInfo->pResult[i];
if (pResult->status.closed) { // remove the window slot from hash table if (pResult->status.closed) { // remove the window slot from hash table
taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, pWindowResInfo->type); taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, pWindowResInfo->type);
printf("remove ============>%ld, remain size:%ld\n", pResult->window.skey, pWindowResInfo->hashList->size);
} else { } else {
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册