提交 c04a4f17 编写于 作者: D dapan1121

fix bug

上级 a64f1379
...@@ -317,7 +317,7 @@ typedef struct STscObj { ...@@ -317,7 +317,7 @@ typedef struct STscObj {
} STscObj; } STscObj;
typedef struct SSubqueryState { typedef struct SSubqueryState {
int32_t subLock; pthread_mutex_t mutex;
int8_t *states; int8_t *states;
int32_t numOfSub; // the number of total sub-queries int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query uint64_t numOfRetrievedRows; // total number of points in this query
......
...@@ -55,34 +55,17 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { ...@@ -55,34 +55,17 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
} }
} }
void tscSpinLock(int32_t *lock) {
int i = 0;
while (atomic_val_compare_exchange_32(lock, 0, 1) != 0) {
if (++i % 100 == 0) {
sched_yield();
}
}
}
void tscSpinUnlock(int32_t *lock) {
if (atomic_val_compare_exchange_32(lock, 1, 0) != 1) {
assert(false);
}
}
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
assert(idx < subState->numOfSub); assert(idx < subState->numOfSub);
assert(subState->states); assert(subState->states);
tscSpinLock(&subState->subLock); pthread_mutex_lock(&subState->mutex);
tscDebug("subquery:%p,%d state set to %d", pSql, idx, state); tscDebug("subquery:%p,%d state set to %d", pSql, idx, state);
subState->states[idx] = state; subState->states[idx] = state;
tscSpinUnlock(&subState->subLock); pthread_mutex_unlock(&subState->mutex);
} }
static bool allSubqueryDone(SSubqueryState *subState) { static bool allSubqueryDone(SSubqueryState *subState) {
...@@ -106,7 +89,7 @@ static bool allSubqueryDone(SSubqueryState *subState) { ...@@ -106,7 +89,7 @@ static bool allSubqueryDone(SSubqueryState *subState) {
static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) {
assert(idx < subState->numOfSub); assert(idx < subState->numOfSub);
tscSpinLock(&subState->subLock); pthread_mutex_lock(&subState->mutex);
tscDebug("subquery:%p,%d state set to 1", pSql, idx); tscDebug("subquery:%p,%d state set to 1", pSql, idx);
...@@ -114,7 +97,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { ...@@ -114,7 +97,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) {
bool done = allSubqueryDone(subState); bool done = allSubqueryDone(subState);
tscSpinUnlock(&subState->subLock); pthread_mutex_unlock(&subState->mutex);
return done; return done;
} }
...@@ -432,6 +415,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -432,6 +415,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// scan all subquery, if one sub query has only ts, ignore it // scan all subquery, if one sub query has only ts, ignore it
tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub);
memset(&pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
bool success = true; bool success = true;
...@@ -466,7 +450,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -466,7 +450,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
break; break;
} }
subquerySetState(pNew, &pSql->subState, i, 0);
tscClearSubqueryInfo(&pNew->cmd); tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
...@@ -585,8 +568,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { ...@@ -585,8 +568,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->subState.states); tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
} }
...@@ -1269,6 +1257,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1269,6 +1257,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
} }
} }
if (orderedPrjQuery) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
pSql->subState.states[i] = 0;
} else {
pSql->subState.states[i] = 1;
}
}
}
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) { if (pSub == NULL) {
...@@ -1296,13 +1297,9 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1296,13 +1297,9 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
pSub->cmd.command = TSDB_SQL_SELECT; pSub->cmd.command = TSDB_SQL_SELECT;
pSub->fp = tscJoinQueryCallback; pSub->fp = tscJoinQueryCallback;
subquerySetState(pSub, &pSql->subState, i, 0);
tscProcessSql(pSub); tscProcessSql(pSub);
tryNextVnode = true; tryNextVnode = true;
} else { } else {
subquerySetState(pSub, &pSql->subState, i, 1);
tscDebug("%p no result in current subquery anymore", pSub); tscDebug("%p no result in current subquery anymore", pSub);
} }
} }
...@@ -1495,8 +1492,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1495,8 +1492,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH; pSql->cmd.command = TSDB_SQL_FETCH;
subquerySetState(pSql, &pParentSql->subState, pSupporter->subqueryIndex, 0);
tscProcessSql(pSql); tscProcessSql(pSql);
} else { // first retrieve from vnode during the secondary stage sub-query } else { // first retrieve from vnode during the secondary stage sub-query
// set the command flag must be after the semaphore been correctly set. // set the command flag must be after the semaphore been correctly set.
...@@ -1670,6 +1665,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1670,6 +1665,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pthread_mutex_init(&pSql->subState.mutex, NULL);
} }
bool hasEmptySub = false; bool hasEmptySub = false;
...@@ -1707,8 +1704,9 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1707,8 +1704,9 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) {
memset(pSql->subState.states + i, 1, sizeof(*pSql->subState.states) * (pSql->subState.numOfSub - i)); // the already sent request will continue and do not go to the error process routine pRes->code = code;
break; (*pSub->fp)(pSub->param, pSub, 0);
return;
} }
} }
...@@ -1799,6 +1797,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1799,6 +1797,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tfree(pMemoryBuf); tfree(pMemoryBuf);
return ret; return ret;
} }
pthread_mutex_init(&pState->mutex, NULL);
} }
memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub); memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
...@@ -2454,6 +2454,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -2454,6 +2454,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pthread_mutex_init(&pSql->subState.mutex, NULL);
} }
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
......
...@@ -441,6 +441,10 @@ static void tscFreeSubobj(SSqlObj* pSql) { ...@@ -441,6 +441,10 @@ static void tscFreeSubobj(SSqlObj* pSql) {
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->subState.states); tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册