提交 695572cc 编写于 作者: D dapan1121

fix bug

上级 e29a53ef
...@@ -68,25 +68,28 @@ static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, i ...@@ -68,25 +68,28 @@ static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, i
pthread_mutex_unlock(&subState->mutex); pthread_mutex_unlock(&subState->mutex);
} }
static bool allSubqueryDone(SSubqueryState *subState) { static bool allSubqueryDone(SSqlObj *pParentSql) {
bool done = true; bool done = true;
SSubqueryState *subState = &pParentSql->subState;
//lock in caller //lock in caller
for (int i = 0; i < subState->numOfSub; i++) { for (int i = 0; i < subState->numOfSub; i++) {
if (0 == subState->states[i]) { if (0 == subState->states[i]) {
tscDebug("subquery:%d is NOT finished, total:%d", i, subState->numOfSub); tscDebug("subquery:%p,%d is NOT finished, total:%d", pParentSql->pSubs[i], i, subState->numOfSub);
done = false; done = false;
break; break;
} else { } else {
tscDebug("subquery:%d is finished, total:%d", i, subState->numOfSub); tscDebug("subquery:%p,%d is finished, total:%d", pParentSql->pSubs[i], i, subState->numOfSub);
} }
} }
return done; return done;
} }
static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
SSubqueryState *subState = &pParentSql->subState;
assert(idx < subState->numOfSub); assert(idx < subState->numOfSub);
pthread_mutex_lock(&subState->mutex); pthread_mutex_lock(&subState->mutex);
...@@ -95,7 +98,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { ...@@ -95,7 +98,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) {
subState->states[idx] = 1; subState->states[idx] = 1;
bool done = allSubqueryDone(subState); bool done = allSubqueryDone(pParentSql);
pthread_mutex_unlock(&subState->mutex); pthread_mutex_unlock(&subState->mutex);
...@@ -580,7 +583,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { ...@@ -580,7 +583,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
} }
static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
if (subAndCheckDone(pSqlSub, &pSqlObj->subState, pSupporter->subqueryIndex)) { if (subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex)) {
tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code)); tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
freeJoinSubqueryObj(pSqlObj); freeJoinSubqueryObj(pSqlObj);
return; return;
...@@ -897,7 +900,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -897,7 +900,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// no data exists in next vnode, mark the <tid, tags> query completed // no data exists in next vnode, mark the <tid, tags> query completed
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets. // only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub); tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
return; return;
} }
...@@ -945,8 +948,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -945,8 +948,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables); ((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables);
pParentSql->subState.numOfSub = 2; pParentSql->subState.numOfSub = 2;
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
tscDebug("%p reset all sub states to 0", pParentSql);
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
SSqlObj* sub = pParentSql->pSubs[m]; SSqlObj* sub = pParentSql->pSubs[m];
issueTSCompQuery(sub, sub->param, pParentSql); issueTSCompQuery(sub, sub->param, pParentSql);
...@@ -1063,7 +1068,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1063,7 +1068,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return; return;
} }
if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return; return;
} }
...@@ -1142,7 +1147,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1142,7 +1147,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
} }
} }
if (!subAndCheckDone(pSql, pState, pSupporter->subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub); tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub);
return; return;
} }
...@@ -1263,7 +1268,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1263,7 +1268,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
pSql->subState.states[i] = 0; subquerySetState(pSub, &pSql->subState, i, 0);
} }
} }
} }
...@@ -1476,7 +1481,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1476,7 +1481,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// In case of consequence query from other vnode, do not wait for other query response here. // In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return; return;
} }
} }
...@@ -1830,6 +1835,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1830,6 +1835,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
} }
memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub); memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
tscDebug("%p reset all sub states to 0", pSql);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
...@@ -2009,7 +2015,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -2009,7 +2015,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
} }
} }
if (!subAndCheckDone(pSql, pState, subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) {
tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub); tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub);
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
...@@ -2079,7 +2085,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -2079,7 +2085,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
return; return;
} }
if (!subAndCheckDone(pSql, &pParentSql->subState, idx)) { if (!subAndCheckDone(pSql, pParentSql, idx)) {
tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex); tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex);
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
...@@ -2350,7 +2356,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2350,7 +2356,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
} }
} }
if (!subAndCheckDone(tres, &pParentObj->subState, pSupporter->index)) { if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) {
tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub); tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub);
return; return;
} }
...@@ -2487,6 +2493,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -2487,6 +2493,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
} }
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("%p reset all sub states to 0", pSql);
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
if (pSql->pSubs == NULL) { if (pSql->pSubs == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册