提交 40ddf1f1 编写于 作者: B Benguang Zhao

enh: synchronize iteration over sub queries with subState.mutex

上级 4cbcf232
...@@ -440,6 +440,8 @@ typedef struct SSqlStream { ...@@ -440,6 +440,8 @@ typedef struct SSqlStream {
} SSqlStream; } SSqlStream;
SSqlObj* tscAllocSqlObj(); SSqlObj* tscAllocSqlObj();
SSqlObj* tscAcquireRefOfSubobj(SSqlObj *pSql, int32_t idx);
void tscReleaseRefOfSubobj(SSqlObj *pSql);
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
......
...@@ -210,7 +210,10 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -210,7 +210,10 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub); tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
int32_t i; int32_t i = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
for (i = 0; i < pSql->subState.numOfSub; ++i) { for (i = 0; i < pSql->subState.numOfSub; ++i) {
// vgroup // vgroup
SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i]; SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i];
...@@ -239,6 +242,8 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -239,6 +242,8 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
doCleanupSubqueries(pSql); doCleanupSubqueries(pSql);
return pRes->code; return pRes->code;
......
...@@ -281,7 +281,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { ...@@ -281,7 +281,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
// } else { // } else {
// pQdesc->stableQuery = 0; // pQdesc->stableQuery = 0;
// } // }
pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs != NULL && pSql->subState.states != NULL) { if (pSql->pSubs != NULL && pSql->subState.states != NULL) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
// because subState maybe free on anytime by any thread, check validate from here // because subState maybe free on anytime by any thread, check validate from here
...@@ -298,7 +299,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { ...@@ -298,7 +299,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
} }
} }
pQdesc->numOfSub = pSql->subState.numOfSub; pQdesc->numOfSub = pSql->subState.numOfSub;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_unlock(&pSql->subState.mutex); }
} }
pQdesc->numOfSub = htonl(pQdesc->numOfSub); pQdesc->numOfSub = htonl(pQdesc->numOfSub);
......
...@@ -353,8 +353,8 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -353,8 +353,8 @@ void checkBrokenQueries(STscObj *pTscObj) {
pSql->lastAlive = taosGetTimestampMs(); pSql->lastAlive = taosGetTimestampMs();
} }
} else { } else {
// lock subs { pthread_mutex_lock(&pSql->subState.mutex);
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs) { if (pSql->pSubs) {
// have sub sql // have sub sql
for (int i = 0; i < pSql->subState.numOfSub; i++) { for (int i = 0; i < pSql->subState.numOfSub; i++) {
...@@ -375,8 +375,8 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -375,8 +375,8 @@ void checkBrokenQueries(STscObj *pTscObj) {
} }
} }
} }
// unlock
pthread_mutex_unlock(&pSql->subState.mutex); pthread_mutex_unlock(&pSql->subState.mutex); }
} }
// kill query // kill query
......
...@@ -747,7 +747,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -747,7 +747,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscLockByThread(&pSql->squeryLock); tscLockByThread(&pSql->squeryLock);
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int i = 0; i < pSql->subState.numOfSub; ++i) { for (int i = 0; i < pSql->subState.numOfSub; ++i) {
// NOTE: pSub may have been released already here // NOTE: pSub may have been released already here
SSqlObj *pSub = pSql->pSubs[i]; SSqlObj *pSub = pSql->pSubs[i];
...@@ -767,6 +769,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -767,6 +769,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
// taosRelekaseRef(tscObjRef, pSubObj->self); // taosRelekaseRef(tscObjRef, pSubObj->self);
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
if (pSql->subState.numOfSub <= 0) { if (pSql->subState.numOfSub <= 0) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
......
...@@ -83,7 +83,6 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { ...@@ -83,7 +83,6 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
} }
} }
static void asyncCallback(void *param, TAOS_RES *tres, int code) { static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL); assert(param != NULL);
SSub *pSub = ((SSub *)param); SSub *pSub = ((SSub *)param);
......
此差异已折叠。
...@@ -1749,6 +1749,29 @@ SSqlObj* tscAllocSqlObj() { ...@@ -1749,6 +1749,29 @@ SSqlObj* tscAllocSqlObj() {
return pNew; return pNew;
} }
SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx) {
assert (pSql != NULL);
SSqlObj *pSub = NULL;
pthread_mutex_lock(&pSql->subState.mutex);
if (idx < 0 ||
idx >= pSql->subState.numOfSub ||
!pSql->pSubs[idx]) {
goto _out;
}
pSub = taosAcquireRef(tscObjRef, pSql->pSubs[idx]->self);
assert (pSql->pSubs[idx] == pSub && "Refcounted subquery obj mismatch");
_out:
pthread_mutex_unlock(&pSql->subState.mutex);
return pSub;
}
void tscReleaseRefOfSubobj(SSqlObj* pSub) {
assert (pSub != NULL && pSub->self != 0 && "Subquery obj not refcounted");
taosReleaseRef(tscObjRef, pSub->self);
}
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
return; return;
...@@ -4209,14 +4232,16 @@ int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { ...@@ -4209,14 +4232,16 @@ int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES); pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES);
pSql->subState.states = calloc(numOfSubqueries, sizeof(int8_t)); pSql->subState.states = calloc(numOfSubqueries, sizeof(int8_t));
if (pSql->pSubs == NULL || pSql->subState.states == NULL) { if (pSql->pSubs == NULL || pSql->subState.states == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pSql->subState.numOfSub = numOfSubqueries; pSql->subState.numOfSub = numOfSubqueries;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_unlock(&pSql->subState.mutex); }
return code; return code;
} }
...@@ -4247,6 +4272,9 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4247,6 +4272,9 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
goto _error; goto _error;
} }
int errflag = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i); SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);
...@@ -4256,7 +4284,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4256,7 +4284,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
SSqlObj* pNew = tscAllocSqlObj(); SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; errflag = 1;
break;
} }
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
...@@ -4272,19 +4301,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4272,19 +4301,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
if (ps == NULL) { if (ps == NULL) {
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
goto _error; errflag = 1;
break;
} }
ps->pParentSql = pSql; ps->pParentSql = pSql;
ps->subqueryIndex = i; ps->subqueryIndex = i;
pNew->param = ps; pNew->param = ps;
registerSqlObj(pNew);
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) { if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
goto _error; errflag = 1;
break;
} }
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
...@@ -4294,9 +4327,12 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4294,9 +4327,12 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
numOfInit++; numOfInit++;
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) { goto _error; }
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* psub = pSql->pSubs[i]; SSqlObj* psub = tscAcquireRefOfSubobj(pSql, i); // ACQ REF
registerSqlObj(psub); if (!psub) continue;
// create sub query to handle the sub query. // create sub query to handle the sub query.
SQueryInfo* pq = tscGetQueryInfo(&psub->cmd); SQueryInfo* pq = tscGetQueryInfo(&psub->cmd);
...@@ -4306,6 +4342,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4306,6 +4342,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
} }
executeQuery(psub, pq); executeQuery(psub, pq);
tscReleaseRefOfSubobj(psub); // REL REF
} }
return; return;
......
...@@ -195,7 +195,6 @@ int taosRemoveRef(int rsetId, int64_t rid) ...@@ -195,7 +195,6 @@ int taosRemoveRef(int rsetId, int64_t rid)
return taosDecRefCount(rsetId, rid, 1); return taosDecRefCount(rsetId, rid, 1);
} }
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosAcquireRef(int rsetId, int64_t rid) void *taosAcquireRef(int rsetId, int64_t rid)
{ {
int hash; int hash;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册