提交 80c0a3cf 编写于 作者: B Benguang Zhao

enh: initialize and destroy pSubs and their states with doReInitSubState and...

enh: initialize and destroy pSubs and their states with doReInitSubState and tscFreeSubobj/freeJoinSubqueryObj
上级 8c9aaa9f
...@@ -75,7 +75,7 @@ static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, i ...@@ -75,7 +75,7 @@ static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, i
pthread_mutex_unlock(&subState->mutex); pthread_mutex_unlock(&subState->mutex);
} }
static bool allSubqueryDone(SSqlObj *pParentSql) { static bool allSubqueryDoneWithoutLock(SSqlObj *pParentSql) {
bool done = true; bool done = true;
SSubqueryState *subState = &pParentSql->subState; SSubqueryState *subState = &pParentSql->subState;
...@@ -107,7 +107,7 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { ...@@ -107,7 +107,7 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx); tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx);
subState->states[idx] = 1; subState->states[idx] = 1;
} }
bool done = allSubqueryDone(pParentSql); bool done = allSubqueryDoneWithoutLock(pParentSql);
if (!done) { if (!done) {
tscDebug("0x%"PRIx64" sub:%p,%d completed, total:%d", pParentSql->self, pSql, idx, pParentSql->subState.numOfSub); tscDebug("0x%"PRIx64" sub:%p,%d completed, total:%d", pParentSql->self, pSql, idx, pParentSql->subState.numOfSub);
} }
...@@ -115,8 +115,6 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { ...@@ -115,8 +115,6 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
return done; return done;
} }
static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
...@@ -695,6 +693,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -695,6 +693,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
} }
void freeJoinSubqueryObj(SSqlObj* pSql) { void freeJoinSubqueryObj(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) { if (pSub == NULL) {
...@@ -707,9 +706,10 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { ...@@ -707,9 +706,10 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
taos_free_result(pSub); taos_free_result(pSub);
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
tfree(pSql->pSubs);
tfree(pSql->subState.states); tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
pthread_mutex_unlock(&pSql->subState.mutex);
} }
static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
...@@ -1832,8 +1832,6 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1832,8 +1832,6 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
continue; continue;
} }
SSqlRes* pRes1 = &pSql1->res; SSqlRes* pRes1 = &pSql1->res;
if (pRes1->row >= pRes1->numOfRows && !pRes1->completed) { if (pRes1->row >= pRes1->numOfRows && !pRes1->completed) {
subquerySetState(pSql1, &pSql->subState, i, 0); subquerySetState(pSql1, &pSql->subState, i, 0);
...@@ -2031,39 +2029,32 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); ...@@ -2031,39 +2029,32 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd; SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
pSql->res.qId = 0x1; pSql->res.qId = 0x1;
assert(pSql->res.numOfRows == 0); assert(pSql->res.numOfRows == 0);
if (pSql->pSubs == NULL) {
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
if (pSql->pSubs == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL); SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
if (pNew == NULL) { if (pNew == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pSql->pSubs[tableIndex] = pNew; pSql->pSubs[tableIndex] = pNew;
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
// refactor as one method // refactor as one method
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd); SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
assert(pNewQueryInfo != NULL); assert(pNewQueryInfo != NULL);
pSupporter->colList = pNewQueryInfo->colList; pSupporter->colList = pNewQueryInfo->colList;
pNewQueryInfo->colList = NULL; pNewQueryInfo->colList = NULL;
pSupporter->exprList = pNewQueryInfo->exprList; pSupporter->exprList = pNewQueryInfo->exprList;
pNewQueryInfo->exprList = NULL; pNewQueryInfo->exprList = NULL;
pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo; pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo;
// this data needs to be transfer to support struct // this data needs to be transfer to support struct
memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo)); memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));
if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) { if (tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond) != 0) {
...@@ -2176,27 +2167,17 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2176,27 +2167,17 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pthread_mutex_lock(&pSql->subState.mutex); code = doReInitSubState(pSql, pQueryInfo->numOfTables);
pSql->subState.numOfSub = pQueryInfo->numOfTables; assert (code == TSDB_CODE_SUCCESS && "Out of memory");
if (pSql->subState.states == NULL) {
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
if (pSql->subState.states == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
}
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
pthread_mutex_unlock(&pSql->subState.mutex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i); tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i);
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
code = tscCreateJoinSubquery(pSql, i, pSupporter); code = tscCreateJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter); tscDestroyJoinSupporter(pSupporter);
...@@ -2359,8 +2340,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -2359,8 +2340,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscFreeFirstRoundSup(&param); tscFreeFirstRoundSup(&param);
taos_free_result(pSql); taos_free_result(pSql);
pParent->subState.numOfSub = 0; tscFreeSubobj(pParent);
tfree(pParent->pSubs);
pParent->res.code = code; pParent->res.code = code;
tscAsyncResultOnError(pParent); tscAsyncResultOnError(pParent);
return; return;
...@@ -2463,8 +2443,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -2463,8 +2443,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscFreeFirstRoundSup(&param); tscFreeFirstRoundSup(&param);
taos_free_result(pSql); taos_free_result(pSql);
pParent->subState.numOfSub = 0; tscFreeSubobj(pParent);
tfree(pParent->pSubs);
if (resRows == 0) { if (resRows == 0) {
pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
...@@ -2487,8 +2466,7 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { ...@@ -2487,8 +2466,7 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
tscFreeFirstRoundSup(&param); tscFreeFirstRoundSup(&param);
taos_free_result(pSql); taos_free_result(pSql);
parent->subState.numOfSub = 0; tscFreeSubobj(parent);
tfree(parent->pSubs);
parent->res.code = c; parent->res.code = c;
tscAsyncResultOnError(parent); tscAsyncResultOnError(parent);
return; return;
...@@ -2653,14 +2631,11 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2653,14 +2631,11 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type, pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type,
tscNumOfExprs(pNewQueryInfo), idx+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); tscNumOfExprs(pNewQueryInfo), idx+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
pSql->pSubs = calloc(1, POINTER_BYTES); int32_t code = doReInitSubState(pSql, 1);
if (pSql->pSubs == NULL) { if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pSql->subState.numOfSub = 1;
pSql->pSubs[0] = pNew; pSql->pSubs[0] = pNew;
tscHandleMasterSTableQuery(pNew); tscHandleMasterSTableQuery(pNew);
......
...@@ -1686,12 +1686,11 @@ void tscFreeSubobj(SSqlObj* pSql) { ...@@ -1686,12 +1686,11 @@ void tscFreeSubobj(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub); tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] != NULL) { if (!pSql->pSubs[i]) {
tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i);
} else {
/* just for python error test case */
tscDebug("0x%"PRIx64" free sub SqlObj:0x0, index:%d", pSql->self, i); tscDebug("0x%"PRIx64" free sub SqlObj:0x0, index:%d", pSql->self, i);
continue;
} }
tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i);
taos_free_result(pSql->pSubs[i]); taos_free_result(pSql->pSubs[i]);
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
...@@ -4203,21 +4202,20 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -4203,21 +4202,20 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
return; return;
} }
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param); taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
} }
int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
int32_t code = TSDB_CODE_SUCCESS;
pthread_mutex_lock(&pSql->subState.mutex); pthread_mutex_lock(&pSql->subState.mutex);
int32_t code = TSDB_CODE_SUCCESS; pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES);
pSql->subState.numOfSub = numOfSubqueries; pSql->subState.states = calloc(numOfSubqueries, sizeof(int8_t));
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
if (pSql->pSubs == NULL || pSql->subState.states == NULL) { if (pSql->pSubs == NULL || pSql->subState.states == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pSql->subState.numOfSub = numOfSubqueries;
pthread_mutex_unlock(&pSql->subState.mutex); pthread_mutex_unlock(&pSql->subState.mutex);
return code; return code;
} }
...@@ -4311,26 +4309,16 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4311,26 +4309,16 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
} }
return; return;
} else if (hasMoreClauseToTry(pSql)) {
if (pthread_mutex_init(&pSql->subState.mutex, NULL) != 0) {
goto _error;
}
} }
pSql->cmd.active = pQueryInfo; pSql->cmd.active = pQueryInfo;
doExecuteQuery(pSql, pQueryInfo); doExecuteQuery(pSql, pQueryInfo);
return; return;
_error: _error:
for(int32_t i = 0; i < numOfInit; ++i) {
SSqlObj* p = pSql->pSubs[i];
tscFreeSqlObj(p);
}
pSql->res.code = code; pSql->res.code = code;
pSql->subState.numOfSub = 0; // not initialized sub query object will not be freed tscFreeSubobj(pSql);
tfree(pSql->subState.states);
tfree(pSql->pSubs);
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
...@@ -4574,7 +4562,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -4574,7 +4562,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* *
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed. * For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/ */
pSql->subState.numOfSub = 0; tscFreeSubobj(pSql);
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
...@@ -4606,15 +4594,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -4606,15 +4594,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
pRes->final = finalBk; pRes->final = finalBk;
pRes->numOfTotal = num; pRes->numOfTotal = num;
pthread_mutex_lock(&pSql->subState.mutex); tscFreeSubobj(pSql);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
taos_free_result(pSql->pSubs[i]);
}
tfree(pSql->pSubs);
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
pthread_mutex_unlock(&pSql->subState.mutex);
pSql->fp = fp; pSql->fp = fp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册