diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4979bcec4f3027d109a70436668b10fde5e066cf..c32ec5625b16beea988e3db026c584a29663f27c 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -440,6 +440,8 @@ typedef struct SSqlStream { } SSqlStream; SSqlObj* tscAllocSqlObj(); +SSqlObj* tscAcquireRefOfSubobj(SSqlObj *pSql, int32_t idx); +void tscReleaseRefOfSubobj(SSqlObj *pSql); void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); diff --git a/src/client/src/tscDelete.c b/src/client/src/tscDelete.c index 2fd019c3e330171eaf5a93d9f8d3901643446cd6..978d824e915aaf9f5bcd717978091104939234e0 100644 --- a/src/client/src/tscDelete.c +++ b/src/client/src/tscDelete.c @@ -210,7 +210,10 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub); pRes->code = TSDB_CODE_SUCCESS; - int32_t i; + int32_t i = 0; + + { pthread_mutex_lock(&pSql->subState.mutex); + for (i = 0; i < pSql->subState.numOfSub; ++i) { // vgroup SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i]; @@ -239,6 +242,8 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; } + pthread_mutex_unlock(&pSql->subState.mutex); } + if (pRes->code != TSDB_CODE_SUCCESS) { doCleanupSubqueries(pSql); return pRes->code; diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 7de9ef762a29c47bf56eb63630be2e45cf269b39..3162b052806fc34208bdecb5afbe1bea1b9c469a 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -281,7 +281,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { // } else { // pQdesc->stableQuery = 0; // } - pthread_mutex_lock(&pSql->subState.mutex); + { pthread_mutex_lock(&pSql->subState.mutex); + if (pSql->pSubs != NULL && pSql->subState.states != NULL) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { // because subState maybe free on anytime by any thread, check validate from here @@ -298,7 +299,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { } } pQdesc->numOfSub = pSql->subState.numOfSub; - pthread_mutex_unlock(&pSql->subState.mutex); + + pthread_mutex_unlock(&pSql->subState.mutex); } } pQdesc->numOfSub = htonl(pQdesc->numOfSub); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 06c6efb4766b5382327363f5bdd257ea70272b3d..212787bce55b5f768a511d290d6dc6203a9202d9 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -353,8 +353,8 @@ void checkBrokenQueries(STscObj *pTscObj) { pSql->lastAlive = taosGetTimestampMs(); } } else { - // lock subs - pthread_mutex_lock(&pSql->subState.mutex); + { pthread_mutex_lock(&pSql->subState.mutex); + if (pSql->pSubs) { // have sub sql for (int i = 0; i < pSql->subState.numOfSub; i++) { @@ -375,8 +375,8 @@ void checkBrokenQueries(STscObj *pTscObj) { } } } - // unlock - pthread_mutex_unlock(&pSql->subState.mutex); + + pthread_mutex_unlock(&pSql->subState.mutex); } } // kill query diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 901d1788c95782ba076f56bfd01a3dfa08f8ad9a..ca242d8fe96a157d1d0f77021a550fdada95c923 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -747,7 +747,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) { pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; tscLockByThread(&pSql->squeryLock); - + + { pthread_mutex_lock(&pSql->subState.mutex); + for (int i = 0; i < pSql->subState.numOfSub; ++i) { // NOTE: pSub may have been released already here SSqlObj *pSub = pSql->pSubs[i]; @@ -767,6 +769,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) { // taosRelekaseRef(tscObjRef, pSubObj->self); } + pthread_mutex_unlock(&pSql->subState.mutex); } + if (pSql->subState.numOfSub <= 0) { tscAsyncResultOnError(pSql); } diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index c7d4d3c00a88f988168abf350edcbb3f92d28e33..d48a92cb962317dc1e6554a12a6aa474c6a1f193 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -83,7 +83,6 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { } } - static void asyncCallback(void *param, TAOS_RES *tres, int code) { assert(param != NULL); SSub *pSub = ((SSub *)param); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 72cf23770ac3943a65783e66f5552a0a44e042ba..f07e4c4ea520cb8bdd9dce430df3608e303df628 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -66,13 +66,11 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { } } -static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { +static void subquerySetStateWithoutLock(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { assert(idx < subState->numOfSub && subState->states != NULL); tscDebug("subquery:0x%"PRIx64",%d state set to %d", pSql->self, idx, state); - pthread_mutex_lock(&subState->mutex); subState->states[idx] = state; - pthread_mutex_unlock(&subState->mutex); } static bool allSubqueryDoneWithoutLock(SSqlObj *pParentSql) { @@ -123,7 +121,6 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { SLimitVal* pLimit = &pQueryInfo->limit; int32_t order = pQueryInfo->order.order; - int32_t joinNum = pSql->subState.numOfSub; SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}}; SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0}; int32_t slot = 0; @@ -138,25 +135,39 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { STSElem prev; SArray* tsCond = NULL; int32_t mergeDone = 0; + int32_t joinNum = 0; + + int errflag = 0; + { pthread_mutex_lock(&pSql->subState.mutex); + + joinNum = pSql->subState.numOfSub; for (int32_t i = 0; i < joinNum; ++i) { + SSqlObj *pSub = pSql->pSubs[i]; + if (!pSub) { + tscError("0x%"PRIx64" the %d'th of the (%d) sub queries is null.", pSql->self, i, joinNum); + errflag = 1; + break; + } STSBuf* output = tsBufCreate(true, pQueryInfo->order.order); - SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd); + SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSub->cmd); pSubQueryInfo->tsBuf = output; - SJoinSupporter* pSupporter = pSql->pSubs[i]->param; + SJoinSupporter* pSupporter = pSub->param; if (pSupporter->pTSBuf == NULL) { tscDebug("0x%"PRIx64" at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql->self); - return 0; + errflag = 1; + break; } tsBufResetPos(pSupporter->pTSBuf); if (!tsBufNextPos(pSupporter->pTSBuf)) { tscDebug("0x%"PRIx64" input1 is empty, 0 for secondary query after ts blocks intersecting", pSql->self); - return 0; + errflag = 1; + break; } tscDebug("0x%"PRIx64" sub:0x%"PRIx64" table idx:%d, input group number:%d", pSql->self, @@ -166,6 +177,11 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { ctxlist[i].res = output; } + pthread_mutex_unlock(&pSql->subState.mutex); } + if (errflag) { + return 0; + } + TSKEY st = taosGetTimestampUs(); for (int16_t tidx = 0; tidx < joinNum; tidx++) { @@ -521,7 +537,10 @@ static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupT static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { int32_t numOfSub = 0; SJoinSupporter* pSupporter = NULL; - + bool success = true; + + { pthread_mutex_lock(&pSql->subState.mutex); + //If the columns are not involved in the final select clause, the corresponding query will not be issued. for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { pSupporter = pSql->pSubs[i]->param; @@ -529,20 +548,19 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ++numOfSub; } } - + assert(numOfSub > 0); - + // scan all subquery, if one sub query has only ts, ignore it tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub); - bool success = true; - for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj *pPrevSub = pSql->pSubs[i]; + if (!pPrevSub) continue; pSql->pSubs[i] = NULL; - + pSupporter = pPrevSub->param; - + if (taosArrayGetSize(pSupporter->exprList) == 0) { tscDebug("0x%"PRIx64" subIndex: %d, no need to launch query, ignore it", pSql->self, i); @@ -570,7 +588,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; - + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd); pQueryInfo->tsBuf = pTsBuf; // transfer the ownership of timestamp comp-z data to the new created object @@ -662,14 +680,16 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { pQueryInfo->stableQuery = true; } - subquerySetState(pNew, &pSql->subState, i, 0); - + subquerySetStateWithoutLock(pNew, &pSql->subState, i, 0); + size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); tscDebug("0x%"PRIx64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); } - + + pthread_mutex_unlock(&pSql->subState.mutex); } + //prepare the subqueries object failed, abort if (!success) { pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -679,14 +699,16 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { return pSql->res.code; } - + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - if (pSql->pSubs[i] == NULL) { + SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref + if (pSub == NULL) { continue; } - SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd); - executeQuery(pSql->pSubs[i], pQueryInfo); + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); + executeQuery(pSub, pQueryInfo); + tscReleaseRefOfSubobj(pSub); // REL ref } return TSDB_CODE_SUCCESS; @@ -1027,18 +1049,24 @@ static int32_t tidTagsMergeSort(SArray *arr, int32_t start, int32_t end, const i } static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) { - int16_t joinNum = pParentSql->subState.numOfSub; STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); - SJoinSupporter* p0 = pParentSql->pSubs[0]->param; SMergeCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}}; SMergeCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0}; + SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); + + int16_t joinNum = 0; + SJoinSupporter* p0 = NULL; + int32_t size = 0; + int32_t code = TSDB_CODE_SUCCESS; + { pthread_mutex_lock(&pParentSql->subState.mutex); + + joinNum = pParentSql->subState.numOfSub; + p0 = pParentSql->pSubs[0]->param; // int16_t for padding - int32_t size = p0->tagSize - sizeof(int16_t); + size = p0->tagSize - sizeof(int16_t); - SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); - tscDebug("0x%"PRIx64" all subquery retrieve complete, do tags match", pParentSql->self); for (int32_t i = 0; i < joinNum; i++) { @@ -1060,10 +1088,16 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar for (int32_t j = 0; j <= i; j++) { taosArrayDestroy(&ctxlist[j].res); } - return TSDB_CODE_QRY_DUP_JOIN_KEY; + code = TSDB_CODE_QRY_DUP_JOIN_KEY; + break; } } + pthread_mutex_unlock(&pParentSql->subState.mutex); } + if (code != TSDB_CODE_SUCCESS) { + return code; + } + int32_t slot = 0; size_t tableNum = 0; int16_t* tableMIdx = 0; @@ -1381,21 +1415,25 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow (*pParentSql->fp)(pParentSql->param, pParentSql, 0); } else { for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { + SSqlObj *psub = tscAcquireRefOfSubobj(pParentSql, m); // ACQ ref + if (!psub) continue; + // proceed to for ts_comp query - SSqlCmd* pSubCmd = &pParentSql->pSubs[m]->cmd; + SSqlCmd* pSubCmd = &psub->cmd; SArray** s = taosArrayGet(resList, m); SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pSubCmd); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s); - SSqlObj* psub = pParentSql->pSubs[m]; ((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables); memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); tscDebug("0x%"PRIx64" reset all sub states to 0", pParentSql->self); issueTsCompQuery(psub, psub->param, pParentSql); + + tscReleaseRefOfSubobj(psub); // REL ref } } @@ -1691,12 +1729,14 @@ _return: } void tscFetchDatablockForSubquery(SSqlObj* pSql) { - assert(pSql->subState.numOfSub >= 1); - int32_t numOfFetch = 0; bool hasData = true; bool reachLimit = false; + { pthread_mutex_lock(&pSql->subState.mutex); + + assert(pSql->subState.numOfSub >= 1); + // if the subquery is NULL, it does not involved in the final result generation for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -1727,6 +1767,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } } + pthread_mutex_unlock(&pSql->subState.mutex); } + // has data remains in client side, and continue to return data to app if (hasData) { tscBuildResFromSubqueries(pSql); @@ -1752,6 +1794,9 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { bool tryNextVnode = false; bool orderedPrjQuery = false; + + { pthread_mutex_lock(&pSql->subState.mutex); + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -1765,21 +1810,21 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } } - if (orderedPrjQuery) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { - subquerySetState(pSub, &pSql->subState, i, 0); + subquerySetStateWithoutLock(pSub, &pSql->subState, i, 0); } } } - + + pthread_mutex_unlock(&pSql->subState.mutex); } for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref if (pSub == NULL) { - continue; + continue; } SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); @@ -1809,6 +1854,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { tscDebug("0x%"PRIx64" no result in current subquery anymore", pSub->self); } } + + tscReleaseRefOfSubobj(pSub); // REL ref } if (tryNextVnode) { @@ -1832,6 +1879,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { tscDebug("0x%"PRIx64" retrieve data from %d subqueries", pSql->self, numOfFetch); SJoinSupporter* pSupporter = NULL; + { pthread_mutex_lock(&pSql->subState.mutex); for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; if (pSql1 == NULL) { @@ -1840,12 +1888,13 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { SSqlRes* pRes1 = &pSql1->res; if (pRes1->row >= pRes1->numOfRows && !pRes1->completed) { - subquerySetState(pSql1, &pSql->subState, i, 0); + subquerySetStateWithoutLock(pSql1, &pSql->subState, i, 0); } } + pthread_mutex_unlock(&pSql->subState.mutex); } for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSql1 = pSql->pSubs[i]; + SSqlObj* pSql1 = tscAcquireRefOfSubobj(pSql, i); // ACQ ref if (pSql1 == NULL) { continue; } @@ -1874,6 +1923,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { tscBuildAndSendRequest(pSql1, NULL); } + tscReleaseRefOfSubobj(pSql1); // REF ref } } @@ -2178,18 +2228,24 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { goto _error; } + int errflag = 0; + + { pthread_mutex_lock(&pSql->subState.mutex); + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); if (pSupporter == NULL) { // failed to create support struct, abort current query tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i); code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; + errflag = 1; + break; } code = tscCreateJoinSubquery(pSql, i, pSupporter); if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query tscDestroyJoinSupporter(pSupporter); - goto _error; + errflag = 1; + break; } SSqlObj* pSub = pSql->pSubs[i]; @@ -2200,23 +2256,28 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { } } + pthread_mutex_unlock(&pSql->subState.mutex); } + if (errflag) goto _error; + if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { // at least one subquery is empty, do nothing and return freeJoinSubqueryObj(pSql); (*pSql->fp)(pSql->param, pSql, 0); } else { int fail = 0; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref + if (!pSub) continue; if (fail) { (*pSub->fp)(pSub->param, pSub, 0); continue; } - + if ((code = tscBuildAndSendRequest(pSub, NULL)) != TSDB_CODE_SUCCESS) { pRes->code = code; (*pSub->fp)(pSub->param, pSub, 0); fail = 1; } + tscReleaseRefOfSubobj(pSub); // REL ref } if(fail) { @@ -2672,11 +2733,13 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) { tfree(p); return; } - SSqlObj* pSub = pSql->pSubs[i]; + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref + if (!pSub) continue; SRetrieveSupport* pSupport = pSub->param; tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); tscBuildAndSendRequest(pSub, NULL); + tscReleaseRefOfSubobj(pSub); // REL ref } tfree(p); @@ -3255,7 +3318,7 @@ SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSq pSql->pSubs[trsupport->subqueryIndex] = pNew; } - + return pNew; } @@ -3327,13 +3390,16 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { } } -static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { +static bool needRetryInsert(SSqlObj* pParentObj) { if (pParentObj->retry > pParentObj->maxRetry) { tscError("0x%"PRIx64" max retry reached, abort the retry effort", pParentObj->self); return false; } + bool ret = true; - for (int32_t i = 0; i < numOfSub; ++i) { + { pthread_mutex_lock(&pParentObj->subState.mutex); + + for (int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) { int32_t code = pParentObj->pSubs[i]->res.code; if (code == TSDB_CODE_SUCCESS) { continue; @@ -3343,11 +3409,14 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_APP_NOT_READY) { pParentObj->res.code = code; - return false; + ret = false; + break; } } - return true; + pthread_mutex_unlock(&pParentObj->subState.mutex); } + + return ret; } static void doFreeInsertSupporter(SSqlObj* pSqlObj) { @@ -3392,7 +3461,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) // restore user defined fp pParentObj->fp = pParentObj->fetchFp; - int32_t numOfSub = pParentObj->subState.numOfSub; doFreeInsertSupporter(pParentObj); if (pParentObj->res.code == TSDB_CODE_SUCCESS) { @@ -3403,14 +3471,18 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; (*pParentObj->fp)(pParentObj->param, pParentObj, v); } else { - if (!needRetryInsert(pParentObj, numOfSub)) { + if (!needRetryInsert(pParentObj)) { tscAsyncResultOnError(pParentObj); return; } int32_t numOfFailed = 0; - for(int32_t i = 0; i < numOfSub; ++i) { + + { pthread_mutex_lock(&pParentObj->subState.mutex); + + for(int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) { SSqlObj* pSql = pParentObj->pSubs[i]; + if (!pSql) continue; if (pSql->res.code != TSDB_CODE_SUCCESS) { numOfFailed += 1; @@ -3420,14 +3492,16 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0); tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); - subquerySetState(pSql, &pParentObj->subState, i, 0); + subquerySetStateWithoutLock(pSql, &pParentObj->subState, i, 0); tscDebug("0x%"PRIx64", failed sub:%d, %p", pParentObj->self, i, pSql); } } + pthread_mutex_unlock(&pParentObj->subState.mutex); } + tscWarn("0x%"PRIx64" Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj->self, - pParentObj->res.numOfRows, numOfFailed, numOfSub); + pParentObj->res.numOfRows, numOfFailed, pParentObj->subState.numOfSub); tscDebug("0x%"PRIx64" cleanup %d tableMeta in hashTable before reparse sql", pParentObj->self, pParentObj->cmd.insertParam.numOfTables); for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) { @@ -3495,7 +3569,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { } for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref + if (!pSub) { + tscError("0x%"PRIx64" the %d'th one of (num:%d) sub queries is null.", pSql->self, i, pSql->subState.numOfSub); + pRes->code = TSDB_CODE_FAILED; + return pRes->code; + } SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter)); pSup->idx = i; pSup->pSql = pSql; @@ -3505,6 +3584,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { if (pSub->res.code != TSDB_CODE_SUCCESS) { tscHandleInsertRetry(pSql, pSub); } + tscReleaseRefOfSubobj(pSub); // REL ref } return TSDB_CODE_SUCCESS; @@ -3517,14 +3597,19 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { goto _error; } + int32_t numOfSub = 0; + int errflag = 0; + + { pthread_mutex_lock(&pSql->subState.mutex); + assert(pSql->subState.numOfSub > 0); tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub); - int32_t numOfSub = 0; while(numOfSub < pSql->subState.numOfSub) { SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); if (pSupporter == NULL) { - goto _error; + errflag = 1; + break; } pSupporter->pSql = pSql; @@ -3533,7 +3618,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT); if (pNew == NULL) { tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno)); - goto _error; + errflag = 1; + break; } /* @@ -3541,6 +3627,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { * the callback function (multiVnodeInsertFinalize) correctly. */ pNew->fetchFp = pNew->fp; + pSql->pSubs[numOfSub] = pNew; STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub); @@ -3551,24 +3638,31 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { } else { tscDebug("0x%"PRIx64" prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql->self, numOfSub, pSql->subState.numOfSub, tstrerror(pRes->code)); - goto _error; + errflag = 1; + break; } } - + if (numOfSub < pSql->subState.numOfSub) { tscError("0x%"PRIx64" failed to prepare subObj structure and launch sub-insertion", pSql->self); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; + errflag = 1; } + pthread_mutex_unlock(&pSql->subState.mutex); } + if (errflag) { goto _error; } + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); // use the local variable - for (int32_t j = 0; j < numOfSub; ++j) { - SSqlObj *pSub = pSql->pSubs[j]; + for (int32_t j = 0; j < pSql->subState.numOfSub; ++j) { + SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, j); // ACQ ref + if (!pSub) continue; tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j); tscBuildAndSendRequest(pSub, NULL); + tscReleaseRefOfSubobj(pSub); // REL ref } + return TSDB_CODE_SUCCESS; _error: @@ -3595,6 +3689,9 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); int32_t numOfRes = INT32_MAX; + + { pthread_mutex_lock(&pSql->subState.mutex); + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -3605,6 +3702,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { numOfRes = (int32_t)(MIN(numOfRes, remain)); } + pthread_mutex_unlock(&pSql->subState.mutex); } + if (numOfRes == 0) { // no result any more, free all subquery objects pSql->res.completed = true; freeJoinSubqueryObj(pSql); @@ -3632,6 +3731,9 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { tscRestoreFuncForSTableQuery(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo); + + { pthread_mutex_lock(&pSql->subState.mutex); + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -3667,6 +3769,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { assert(pSub->res.row <= pSub->res.numOfRows); } + pthread_mutex_unlock(&pSql->subState.mutex); } + pRes->numOfRows = numOfRes; pRes->numOfClauseTotal += numOfRes; @@ -3779,6 +3883,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); + { pthread_mutex_lock(&pSql->subState.mutex); + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { bool allSubqueryExhausted = true; @@ -3825,6 +3931,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { } } + pthread_mutex_unlock(&pSql->subState.mutex); } + return hasData; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index ae4a35f65ca5e8f0f2c30ab25bee37e8edb6ead9..97222d86c91edfd96419bbf02e89293bc777d012 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1749,6 +1749,29 @@ SSqlObj* tscAllocSqlObj() { return pNew; } +SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx) { + assert (pSql != NULL); + SSqlObj *pSub = NULL; + + pthread_mutex_lock(&pSql->subState.mutex); + if (idx < 0 || + idx >= pSql->subState.numOfSub || + !pSql->pSubs[idx]) { + goto _out; + } + pSub = taosAcquireRef(tscObjRef, pSql->pSubs[idx]->self); + assert (pSql->pSubs[idx] == pSub && "Refcounted subquery obj mismatch"); + +_out: + pthread_mutex_unlock(&pSql->subState.mutex); + return pSub; +} + +void tscReleaseRefOfSubobj(SSqlObj* pSub) { + assert (pSub != NULL && pSub->self != 0 && "Subquery obj not refcounted"); + taosReleaseRef(tscObjRef, pSub->self); +} + void tscFreeSqlObj(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { return; @@ -4209,14 +4232,16 @@ int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { tscFreeSubobj(pSql); int32_t code = TSDB_CODE_SUCCESS; - pthread_mutex_lock(&pSql->subState.mutex); + { pthread_mutex_lock(&pSql->subState.mutex); + pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES); pSql->subState.states = calloc(numOfSubqueries, sizeof(int8_t)); if (pSql->pSubs == NULL || pSql->subState.states == NULL) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; } pSql->subState.numOfSub = numOfSubqueries; - pthread_mutex_unlock(&pSql->subState.mutex); + + pthread_mutex_unlock(&pSql->subState.mutex); } return code; } @@ -4247,6 +4272,9 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { goto _error; } + int errflag = 0; + { pthread_mutex_lock(&pSql->subState.mutex); + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i); @@ -4256,7 +4284,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { SSqlObj* pNew = tscAllocSqlObj(); if (pNew == NULL) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; + errflag = 1; + break; } pNew->pTscObj = pSql->pTscObj; @@ -4272,19 +4301,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id if (ps == NULL) { tscFreeSqlObj(pNew); - goto _error; + errflag = 1; + break; } ps->pParentSql = pSql; ps->subqueryIndex = i; - pNew->param = ps; + + registerSqlObj(pNew); + pSql->pSubs[i] = pNew; SSqlCmd* pCmd = &pNew->cmd; pCmd->command = TSDB_SQL_SELECT; if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) { - goto _error; + errflag = 1; + break; } SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd); @@ -4294,9 +4327,12 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { numOfInit++; } + pthread_mutex_unlock(&pSql->subState.mutex); } + if (errflag) { goto _error; } + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* psub = pSql->pSubs[i]; - registerSqlObj(psub); + SSqlObj* psub = tscAcquireRefOfSubobj(pSql, i); // ACQ REF + if (!psub) continue; // create sub query to handle the sub query. SQueryInfo* pq = tscGetQueryInfo(&psub->cmd); @@ -4306,6 +4342,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; } executeQuery(psub, pq); + + tscReleaseRefOfSubobj(psub); // REL REF } return; diff --git a/src/util/src/tref.c b/src/util/src/tref.c index bff8b12aaefc1734318e891efab7a9b02e6557f4..c4a44d71f1fc8921577ba115be210451880c0db0 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -195,7 +195,6 @@ int taosRemoveRef(int rsetId, int64_t rid) return taosDecRefCount(rsetId, rid, 1); } -// if rid is 0, return the first p in hash list, otherwise, return the next after current rid void *taosAcquireRef(int rsetId, int64_t rid) { int hash;