提交 c1057ebf 编写于 作者: S Shengliang Guan

Merge branch 'feature/query' of https://github.com/taosdata/TDengine into feature/query

......@@ -491,16 +491,16 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
pShellMsg->header.vgId = htonl(vgId);
pShellMsg->header.contLen = htonl(size);
pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc
pShellMsg->length = pShellMsg->header.contLen;
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of tables to be inserted
// pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);
tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes),
tscTrace("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
pSql->ipList.numOfIps);
return TSDB_CODE_SUCCESS;
}
......
......@@ -133,7 +133,6 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return NULL;
}
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pSql;
}
......
......@@ -180,6 +180,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
getTmpfilePath("join-", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w");
// todo handle error
if (pSupporter->f == NULL) {
tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno));
}
......@@ -234,7 +235,7 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
/*
* launch secondary stage query to fetch the result that contains timestamp in set
*/
static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL;
......@@ -249,7 +250,7 @@ static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it
tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub);
tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
SSubqueryState* pState = pSupporter->pState;
......@@ -451,7 +452,7 @@ static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupport
freeJoinSubqueryObj(pParentSql);
} else {
updateQueryTimeRange(pParentQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql);
tscLaunchRealSubqueries(pParentSql);
}
}
......@@ -851,7 +852,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// launch the query the retrieve actual results from vnode along with the filtered timestamp
SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
updateQueryTimeRange(pPQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql);
tscLaunchRealSubqueries(pParentSql);
}
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
......@@ -1159,7 +1160,6 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
// todo merge with callback
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......@@ -1302,7 +1302,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
pState->numOfTotal = pQueryInfo->numOfTables;
pState->numOfRemain = pState->numOfTotal;
tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables);
tscTrace("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
......@@ -1848,8 +1848,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql;
SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState;
// record the total inserted rows
......@@ -1875,7 +1873,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
// release data block data
tfree(pState);
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// restore user defined fp
pParentObj->fp = pParentObj->fetchFp;
......@@ -1945,7 +1943,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
tscProcessSql(pSub);
}
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
return TSDB_CODE_SUCCESS;
_error:
......
......@@ -4319,7 +4319,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query
while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, pQInfo->groupIndex);
SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
qTrace("QInfo:%p group by normal columns group:%d, total group:%zu", pQInfo, pQInfo->groupIndex, numOfGroups);
......
......@@ -114,8 +114,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
if (pResult->status.closed) { // remove the window slot from hash table
taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, pWindowResInfo->type);
printf("remove ============>%ld, remain size:%ld\n", pResult->window.skey, pWindowResInfo->hashList->size);
} else {
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册