提交 226157bd 编写于 作者: B Benguang Zhao

enh: synchronize iteration over sub queries with subState.mutex

上级 9216e543
......@@ -434,6 +434,8 @@ typedef struct SSqlStream {
} SSqlStream;
SSqlObj* tscAllocSqlObj();
SSqlObj* tscAcquireRefOfSubobj(SSqlObj *pSql, int32_t idx);
void tscReleaseRefOfSubobj(SSqlObj *pSql);
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
......
......@@ -280,7 +280,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
// } else {
// pQdesc->stableQuery = 0;
// }
pthread_mutex_lock(&pSql->subState.mutex);
{ pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs != NULL && pSql->subState.states != NULL) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
// because subState maybe free on anytime by any thread, check validate from here
......@@ -297,7 +298,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
}
}
pQdesc->numOfSub = pSql->subState.numOfSub;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_unlock(&pSql->subState.mutex); }
}
pQdesc->numOfSub = htonl(pQdesc->numOfSub);
......
......@@ -353,8 +353,8 @@ void checkBrokenQueries(STscObj *pTscObj) {
pSql->lastAlive = taosGetTimestampMs();
}
} else {
// lock subs
pthread_mutex_lock(&pSql->subState.mutex);
{ pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs) {
// have sub sql
for (int i = 0; i < pSql->subState.numOfSub; i++) {
......@@ -375,8 +375,8 @@ void checkBrokenQueries(STscObj *pTscObj) {
}
}
}
// unlock
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_unlock(&pSql->subState.mutex); }
}
// kill query
......
......@@ -723,7 +723,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscLockByThread(&pSql->squeryLock);
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int i = 0; i < pSql->subState.numOfSub; ++i) {
// NOTE: pSub may have been released already here
SSqlObj *pSub = pSql->pSubs[i];
......@@ -743,6 +745,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
// taosRelekaseRef(tscObjRef, pSubObj->self);
}
pthread_mutex_unlock(&pSql->subState.mutex); }
if (pSql->subState.numOfSub <= 0) {
tscAsyncResultOnError(pSql);
}
......
......@@ -83,7 +83,6 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
}
}
static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSub *pSub = ((SSub *)param);
......
......@@ -66,13 +66,11 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
}
}
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
static void subquerySetStateWithoutLock(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
assert(idx < subState->numOfSub && subState->states != NULL);
tscDebug("subquery:0x%"PRIx64",%d state set to %d", pSql->self, idx, state);
pthread_mutex_lock(&subState->mutex);
subState->states[idx] = state;
pthread_mutex_unlock(&subState->mutex);
}
static bool allSubqueryDoneWithoutLock(SSqlObj *pParentSql) {
......@@ -123,7 +121,6 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
SLimitVal* pLimit = &pQueryInfo->limit;
int32_t order = pQueryInfo->order.order;
int32_t joinNum = pSql->subState.numOfSub;
SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
int32_t slot = 0;
......@@ -138,25 +135,39 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
STSElem prev;
SArray* tsCond = NULL;
int32_t mergeDone = 0;
int32_t joinNum = 0;
int errflag = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
joinNum = pSql->subState.numOfSub;
for (int32_t i = 0; i < joinNum; ++i) {
SSqlObj *pSub = pSql->pSubs[i];
if (!pSub) {
tscError("0x%"PRIx64" the %d'th of the (%d) sub queries is null.", pSql->self, i, joinNum);
errflag = 1;
break;
}
STSBuf* output = tsBufCreate(true, pQueryInfo->order.order);
SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd);
SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSub->cmd);
pSubQueryInfo->tsBuf = output;
SJoinSupporter* pSupporter = pSql->pSubs[i]->param;
SJoinSupporter* pSupporter = pSub->param;
if (pSupporter->pTSBuf == NULL) {
tscDebug("0x%"PRIx64" at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql->self);
return 0;
errflag = 1;
break;
}
tsBufResetPos(pSupporter->pTSBuf);
if (!tsBufNextPos(pSupporter->pTSBuf)) {
tscDebug("0x%"PRIx64" input1 is empty, 0 for secondary query after ts blocks intersecting", pSql->self);
return 0;
errflag = 1;
break;
}
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" table idx:%d, input group number:%d", pSql->self,
......@@ -166,6 +177,11 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
ctxlist[i].res = output;
}
pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) {
return 0;
}
TSKEY st = taosGetTimestampUs();
for (int16_t tidx = 0; tidx < joinNum; tidx++) {
......@@ -521,7 +537,10 @@ static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupT
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL;
bool success = true;
{ pthread_mutex_lock(&pSql->subState.mutex);
//If the columns are not involved in the final select clause, the corresponding query will not be issued.
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
pSupporter = pSql->pSubs[i]->param;
......@@ -529,20 +548,19 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
++numOfSub;
}
}
assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it
tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub);
bool success = true;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj *pPrevSub = pSql->pSubs[i];
if (!pPrevSub) continue;
pSql->pSubs[i] = NULL;
pSupporter = pPrevSub->param;
if (taosArrayGetSize(pSupporter->exprList) == 0) {
tscDebug("0x%"PRIx64" subIndex: %d, no need to launch query, ignore it", pSql->self, i);
......@@ -570,7 +588,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[i] = pNew;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd);
pQueryInfo->tsBuf = pTsBuf; // transfer the ownership of timestamp comp-z data to the new created object
......@@ -662,14 +680,16 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pQueryInfo->stableQuery = true;
}
subquerySetState(pNew, &pSql->subState, i, 0);
subquerySetStateWithoutLock(pNew, &pSql->subState, i, 0);
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
}
pthread_mutex_unlock(&pSql->subState.mutex); }
//prepare the subqueries object failed, abort
if (!success) {
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -679,14 +699,16 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
return pSql->res.code;
}
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] == NULL) {
SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref
if (pSub == NULL) {
continue;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd);
executeQuery(pSql->pSubs[i], pQueryInfo);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
executeQuery(pSub, pQueryInfo);
tscReleaseRefOfSubobj(pSub); // REL ref
}
return TSDB_CODE_SUCCESS;
......@@ -1027,18 +1049,24 @@ static int32_t tidTagsMergeSort(SArray *arr, int32_t start, int32_t end, const i
}
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
int16_t joinNum = pParentSql->subState.numOfSub;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
SJoinSupporter* p0 = pParentSql->pSubs[0]->param;
SMergeCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
SMergeCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
int16_t joinNum = 0;
SJoinSupporter* p0 = NULL;
int32_t size = 0;
int32_t code = TSDB_CODE_SUCCESS;
{ pthread_mutex_lock(&pParentSql->subState.mutex);
joinNum = pParentSql->subState.numOfSub;
p0 = pParentSql->pSubs[0]->param;
// int16_t for padding
int32_t size = p0->tagSize - sizeof(int16_t);
size = p0->tagSize - sizeof(int16_t);
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
tscDebug("0x%"PRIx64" all subquery retrieve <tid, tags> complete, do tags match", pParentSql->self);
for (int32_t i = 0; i < joinNum; i++) {
......@@ -1060,10 +1088,16 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
for (int32_t j = 0; j <= i; j++) {
taosArrayDestroy(&ctxlist[j].res);
}
return TSDB_CODE_QRY_DUP_JOIN_KEY;
code = TSDB_CODE_QRY_DUP_JOIN_KEY;
break;
}
}
pthread_mutex_unlock(&pParentSql->subState.mutex); }
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t slot = 0;
size_t tableNum = 0;
int16_t* tableMIdx = 0;
......@@ -1381,21 +1415,25 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
SSqlObj *psub = tscAcquireRefOfSubobj(pParentSql, m); // ACQ ref
if (!psub) continue;
// proceed to for ts_comp query
SSqlCmd* pSubCmd = &pParentSql->pSubs[m]->cmd;
SSqlCmd* pSubCmd = &psub->cmd;
SArray** s = taosArrayGet(resList, m);
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pSubCmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s);
SSqlObj* psub = pParentSql->pSubs[m];
((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables);
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
tscDebug("0x%"PRIx64" reset all sub states to 0", pParentSql->self);
issueTsCompQuery(psub, psub->param, pParentSql);
tscReleaseRefOfSubobj(psub); // REL ref
}
}
......@@ -1691,12 +1729,14 @@ _return:
}
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
assert(pSql->subState.numOfSub >= 1);
int32_t numOfFetch = 0;
bool hasData = true;
bool reachLimit = false;
{ pthread_mutex_lock(&pSql->subState.mutex);
assert(pSql->subState.numOfSub >= 1);
// if the subquery is NULL, it does not involved in the final result generation
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
......@@ -1727,6 +1767,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
}
}
pthread_mutex_unlock(&pSql->subState.mutex); }
// has data remains in client side, and continue to return data to app
if (hasData) {
tscBuildResFromSubqueries(pSql);
......@@ -1752,6 +1794,9 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
bool tryNextVnode = false;
bool orderedPrjQuery = false;
{ pthread_mutex_lock(&pSql->subState.mutex);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
......@@ -1765,21 +1810,21 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
}
}
if (orderedPrjQuery) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
subquerySetState(pSub, &pSql->subState, i, 0);
subquerySetStateWithoutLock(pSub, &pSql->subState, i, 0);
}
}
}
pthread_mutex_unlock(&pSql->subState.mutex); }
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref
if (pSub == NULL) {
continue;
continue;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
......@@ -1809,6 +1854,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" no result in current subquery anymore", pSub->self);
}
}
tscReleaseRefOfSubobj(pSub); // REL ref
}
if (tryNextVnode) {
......@@ -1832,6 +1879,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" retrieve data from %d subqueries", pSql->self, numOfFetch);
SJoinSupporter* pSupporter = NULL;
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i];
if (pSql1 == NULL) {
......@@ -1840,12 +1888,13 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
SSqlRes* pRes1 = &pSql1->res;
if (pRes1->row >= pRes1->numOfRows && !pRes1->completed) {
subquerySetState(pSql1, &pSql->subState, i, 0);
subquerySetStateWithoutLock(pSql1, &pSql->subState, i, 0);
}
}
pthread_mutex_unlock(&pSql->subState.mutex); }
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i];
SSqlObj* pSql1 = tscAcquireRefOfSubobj(pSql, i); // ACQ ref
if (pSql1 == NULL) {
continue;
}
......@@ -1874,6 +1923,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
tscBuildAndSendRequest(pSql1, NULL);
}
tscReleaseRefOfSubobj(pSql1); // REF ref
}
}
......@@ -2178,18 +2228,24 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
code = doReInitSubState(pSql, pQueryInfo->numOfTables);
assert (code == TSDB_CODE_SUCCESS && "Out of memory");
int errflag = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i);
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
errflag = 1;
break;
}
code = tscCreateJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter);
goto _error;
errflag = 1;
break;
}
SSqlObj* pSub = pSql->pSubs[i];
......@@ -2200,23 +2256,28 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
}
}
pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) goto _error;
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { // at least one subquery is empty, do nothing and return
freeJoinSubqueryObj(pSql);
(*pSql->fp)(pSql->param, pSql, 0);
} else {
int fail = 0;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref
if (!pSub) continue;
if (fail) {
(*pSub->fp)(pSub->param, pSub, 0);
continue;
}
if ((code = tscBuildAndSendRequest(pSub, NULL)) != TSDB_CODE_SUCCESS) {
pRes->code = code;
(*pSub->fp)(pSub->param, pSub, 0);
fail = 1;
}
tscReleaseRefOfSubobj(pSub); // REL ref
}
if(fail) {
......@@ -2672,11 +2733,13 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
tfree(p);
return;
}
SSqlObj* pSub = pSql->pSubs[i];
SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref
if (!pSub) continue;
SRetrieveSupport* pSupport = pSub->param;
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
tscBuildAndSendRequest(pSub, NULL);
tscReleaseRefOfSubobj(pSub); // REL ref
}
tfree(p);
......@@ -3255,7 +3318,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
pSql->pSubs[trsupport->subqueryIndex] = pNew;
}
return pNew;
}
......@@ -3327,13 +3390,16 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
}
}
static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
static bool needRetryInsert(SSqlObj* pParentObj) {
if (pParentObj->retry > pParentObj->maxRetry) {
tscError("0x%"PRIx64" max retry reached, abort the retry effort", pParentObj->self);
return false;
}
bool ret = true;
for (int32_t i = 0; i < numOfSub; ++i) {
{ pthread_mutex_lock(&pParentObj->subState.mutex);
for (int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) {
int32_t code = pParentObj->pSubs[i]->res.code;
if (code == TSDB_CODE_SUCCESS) {
continue;
......@@ -3343,11 +3409,14 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL &&
code != TSDB_CODE_APP_NOT_READY) {
pParentObj->res.code = code;
return false;
ret = false;
break;
}
}
return true;
pthread_mutex_unlock(&pParentObj->subState.mutex); }
return ret;
}
static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
......@@ -3392,7 +3461,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
// restore user defined fp
pParentObj->fp = pParentObj->fetchFp;
int32_t numOfSub = pParentObj->subState.numOfSub;
doFreeInsertSupporter(pParentObj);
if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
......@@ -3403,14 +3471,18 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows;
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
} else {
if (!needRetryInsert(pParentObj, numOfSub)) {
if (!needRetryInsert(pParentObj)) {
tscAsyncResultOnError(pParentObj);
return;
}
int32_t numOfFailed = 0;
for(int32_t i = 0; i < numOfSub; ++i) {
{ pthread_mutex_lock(&pParentObj->subState.mutex);
for(int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) {
SSqlObj* pSql = pParentObj->pSubs[i];
if (!pSql) continue;
if (pSql->res.code != TSDB_CODE_SUCCESS) {
numOfFailed += 1;
......@@ -3420,14 +3492,16 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0);
tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
subquerySetState(pSql, &pParentObj->subState, i, 0);
subquerySetStateWithoutLock(pSql, &pParentObj->subState, i, 0);
tscDebug("0x%"PRIx64", failed sub:%d, %p", pParentObj->self, i, pSql);
}
}
pthread_mutex_unlock(&pParentObj->subState.mutex); }
tscWarn("0x%"PRIx64" Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj->self,
pParentObj->res.numOfRows, numOfFailed, numOfSub);
pParentObj->res.numOfRows, numOfFailed, pParentObj->subState.numOfSub);
tscDebug("0x%"PRIx64" cleanup %d tableMeta in hashTable before reparse sql", pParentObj->self, pParentObj->cmd.insertParam.numOfTables);
for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) {
......@@ -3495,7 +3569,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
}
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref
if (!pSub) {
tscError("0x%"PRIx64" the %d'th one of (num:%d) sub queries is null.", pSql->self, i, pSql->subState.numOfSub);
pRes->code = TSDB_CODE_FAILED;
return pRes->code;
}
SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter));
pSup->index = i;
pSup->pSql = pSql;
......@@ -3505,6 +3584,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
if (pSub->res.code != TSDB_CODE_SUCCESS) {
tscHandleInsertRetry(pSql, pSub);
}
tscReleaseRefOfSubobj(pSub); // REL ref
}
return TSDB_CODE_SUCCESS;
......@@ -3517,14 +3597,19 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
goto _error;
}
int32_t numOfSub = 0;
int errflag = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
assert(pSql->subState.numOfSub > 0);
tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub);
int32_t numOfSub = 0;
while(numOfSub < pSql->subState.numOfSub) {
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
if (pSupporter == NULL) {
goto _error;
errflag = 1;
break;
}
pSupporter->pSql = pSql;
......@@ -3533,7 +3618,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
if (pNew == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno));
goto _error;
errflag = 1;
break;
}
/*
......@@ -3541,6 +3627,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
* the callback function (multiVnodeInsertFinalize) correctly.
*/
pNew->fetchFp = pNew->fp;
pSql->pSubs[numOfSub] = pNew;
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub);
......@@ -3551,24 +3638,31 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
} else {
tscDebug("0x%"PRIx64" prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql->self, numOfSub,
pSql->subState.numOfSub, tstrerror(pRes->code));
goto _error;
errflag = 1;
break;
}
}
if (numOfSub < pSql->subState.numOfSub) {
tscError("0x%"PRIx64" failed to prepare subObj structure and launch sub-insertion", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
errflag = 1;
}
pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) { goto _error; }
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
// use the local variable
for (int32_t j = 0; j < numOfSub; ++j) {
SSqlObj *pSub = pSql->pSubs[j];
for (int32_t j = 0; j < pSql->subState.numOfSub; ++j) {
SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, j); // ACQ ref
if (!pSub) continue;
tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j);
tscBuildAndSendRequest(pSub, NULL);
tscReleaseRefOfSubobj(pSub); // REL ref
}
return TSDB_CODE_SUCCESS;
_error:
......@@ -3595,6 +3689,9 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t numOfRes = INT32_MAX;
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
......@@ -3605,6 +3702,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
numOfRes = (int32_t)(MIN(numOfRes, remain));
}
pthread_mutex_unlock(&pSql->subState.mutex); }
if (numOfRes == 0) { // no result any more, free all subquery objects
pSql->res.completed = true;
freeJoinSubqueryObj(pSql);
......@@ -3632,6 +3731,9 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
tscRestoreFuncForSTableQuery(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo);
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
......@@ -3667,6 +3769,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
assert(pSub->res.row <= pSub->res.numOfRows);
}
pthread_mutex_unlock(&pSql->subState.mutex); }
pRes->numOfRows = numOfRes;
pRes->numOfClauseTotal += numOfRes;
......@@ -3779,6 +3883,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
{ pthread_mutex_lock(&pSql->subState.mutex);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
bool allSubqueryExhausted = true;
......@@ -3825,6 +3931,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
}
}
pthread_mutex_unlock(&pSql->subState.mutex); }
return hasData;
}
......
......@@ -1727,6 +1727,29 @@ SSqlObj* tscAllocSqlObj() {
return pNew;
}
SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx) {
assert (pSql != NULL);
SSqlObj *pSub = NULL;
pthread_mutex_lock(&pSql->subState.mutex);
if (idx < 0 ||
idx >= pSql->subState.numOfSub ||
!pSql->pSubs[idx]) {
goto _out;
}
pSub = taosAcquireRef(tscObjRef, pSql->pSubs[idx]->self);
assert (pSql->pSubs[idx] == pSub && "Refcounted subquery obj mismatch");
_out:
pthread_mutex_unlock(&pSql->subState.mutex);
return pSub;
}
void tscReleaseRefOfSubobj(SSqlObj* pSub) {
assert (pSub != NULL && pSub->self != 0 && "Subquery obj not refcounted");
taosReleaseRef(tscObjRef, pSub->self);
}
void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
......@@ -4172,14 +4195,16 @@ int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
tscFreeSubobj(pSql);
int32_t code = TSDB_CODE_SUCCESS;
pthread_mutex_lock(&pSql->subState.mutex);
{ pthread_mutex_lock(&pSql->subState.mutex);
pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES);
pSql->subState.states = calloc(numOfSubqueries, sizeof(int8_t));
if (pSql->pSubs == NULL || pSql->subState.states == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pSql->subState.numOfSub = numOfSubqueries;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_unlock(&pSql->subState.mutex); }
return code;
}
......@@ -4204,6 +4229,9 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
goto _error;
}
int errflag = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);
......@@ -4213,7 +4241,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
errflag = 1;
break;
}
pNew->pTscObj = pSql->pTscObj;
......@@ -4229,19 +4258,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
if (ps == NULL) {
tscFreeSqlObj(pNew);
goto _error;
errflag = 1;
break;
}
ps->pParentSql = pSql;
ps->subqueryIndex = i;
pNew->param = ps;
registerSqlObj(pNew);
pSql->pSubs[i] = pNew;
SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = TSDB_SQL_SELECT;
if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
goto _error;
errflag = 1;
break;
}
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
......@@ -4251,9 +4284,12 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
numOfInit++;
}
pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) { goto _error; }
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* psub = pSql->pSubs[i];
registerSqlObj(psub);
SSqlObj* psub = tscAcquireRefOfSubobj(pSql, i); // ACQ REF
if (!psub) continue;
// create sub query to handle the sub query.
SQueryInfo* pq = tscGetQueryInfo(&psub->cmd);
......@@ -4263,6 +4299,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
}
executeQuery(psub, pq);
tscReleaseRefOfSubobj(psub); // REL REF
}
return;
......
......@@ -195,7 +195,6 @@ int taosRemoveRef(int rsetId, int64_t rid)
return taosDecRefCount(rsetId, rid, 1);
}
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosAcquireRef(int rsetId, int64_t rid)
{
int hash;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册