未验证 提交 aa610e6b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16895 from taosdata/FIX/TD-19011-2.6

fix: synchronize initialization and destruction of subqueries and their states
...@@ -50,7 +50,7 @@ void tscUnlockByThread(int64_t *lockedBy); ...@@ -50,7 +50,7 @@ void tscUnlockByThread(int64_t *lockedBy);
int tsInsertInitialCheck(SSqlObj *pSql); int tsInsertInitialCheck(SSqlObj *pSql);
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs); void doCleanupSubqueries(SSqlObj *pSql);
void tscFreeRetrieveSup(void **param); void tscFreeRetrieveSup(void **param);
......
...@@ -350,7 +350,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC ...@@ -350,7 +350,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId); int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries); int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries);
void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex); void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
......
...@@ -361,6 +361,7 @@ typedef struct SSubqueryState { ...@@ -361,6 +361,7 @@ typedef struct SSubqueryState {
int8_t *states; int8_t *states;
int32_t numOfSub; // the number of total sub-queries int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query uint64_t numOfRetrievedRows; // total number of points in this query
uint32_t version;
} SSubqueryState; } SSubqueryState;
typedef struct SSqlObj { typedef struct SSqlObj {
...@@ -388,7 +389,6 @@ typedef struct SSqlObj { ...@@ -388,7 +389,6 @@ typedef struct SSqlObj {
SSqlRes res; SSqlRes res;
SSubqueryState subState; SSubqueryState subState;
pthread_mutex_t mtxSubs; // avoid double access pSubs after failure
struct SSqlObj **pSubs; struct SSqlObj **pSubs;
struct SSqlObj *rootObj; struct SSqlObj *rootObj;
...@@ -440,6 +440,12 @@ typedef struct SSqlStream { ...@@ -440,6 +440,12 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next; struct SSqlStream *prev, *next;
} SSqlStream; } SSqlStream;
SSqlObj* tscAllocSqlObj();
uint32_t tscGetVersionOfSubStateWithoutLock(SSqlObj *pSql);
SSqlObj* tscAcquireRefOfSubobj(SSqlObj *pSql, int32_t idx, uint32_t stateVersion);
void tscReleaseRefOfSubobj(SSqlObj *pSql);
void tscResetAllSubStates(SSqlObj* pSql);
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj); int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj);
......
...@@ -360,8 +360,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para ...@@ -360,8 +360,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pSql->fetchFp = fp; pSql->fetchFp = fp;
pSql->rootObj = pSql; pSql->rootObj = pSql;
pthread_mutex_init(&pSql->mtxSubs, NULL);
registerSqlObj(pSql); registerSqlObj(pSql);
pSql->sqlstr = calloc(1, sqlLen + 1); pSql->sqlstr = calloc(1, sqlLen + 1);
...@@ -424,7 +422,7 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v ...@@ -424,7 +422,7 @@ TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, v
nPrintTsc("%s", sqlstr); nPrintTsc("%s", sqlstr);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY); tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
......
...@@ -117,7 +117,7 @@ void writeMsgVgId(char * payload, int32_t vgId) { ...@@ -117,7 +117,7 @@ void writeMsgVgId(char * payload, int32_t vgId) {
SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrieveSupport *trsupport) { SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrieveSupport *trsupport) {
// Init // Init
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64":CDEL new subdelete failed.", pSql->self); tscError("0x%"PRIx64":CDEL new subdelete failed.", pSql->self);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -153,7 +153,6 @@ SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrie ...@@ -153,7 +153,6 @@ SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrie
// update vgroup id // update vgroup id
writeMsgVgId(pNewCmd->payload ,pVgroupMsg->vgId); writeMsgVgId(pNewCmd->payload ,pVgroupMsg->vgId);
tsem_init(&pNew->rspSem, 0, 0);
registerSqlObj(pNew); registerSqlObj(pNew);
tscDebug("0x%"PRIx64":CDEL new sub insertion: %p", pSql->self, pNew); tscDebug("0x%"PRIx64":CDEL new sub insertion: %p", pSql->self, pNew);
...@@ -196,24 +195,26 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -196,24 +195,26 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
return TSDB_CODE_VND_INVALID_VGROUP_ID; return TSDB_CODE_VND_INVALID_VGROUP_ID;
} }
SSubqueryState *pState = &pSql->subState;
int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
if(numOfSub == 0) { if(numOfSub == 0) {
tscInfo(":CDEL SQL:%p tablename=%s numOfVgroups is zero, maybe empty table.", pSql, pTableMetaInfo->name.tname); tscInfo(":CDEL SQL:%p tablename=%s numOfVgroups is zero, maybe empty table.", pSql, pTableMetaInfo->name.tname);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
ret = doInitSubState(pSql, numOfSub); ret = doReInitSubState(pSql, numOfSub);
if (ret != 0) { if (ret != TSDB_CODE_SUCCESS) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
return ret; return ret;
} }
tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub); tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
int32_t i; int32_t i = 0;
for (i = 0; i < pState->numOfSub; ++i) {
{ pthread_mutex_lock(&pSql->subState.mutex);
for (i = 0; i < pSql->subState.numOfSub; ++i) {
// vgroup // vgroup
SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i]; SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i];
...@@ -236,22 +237,19 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -236,22 +237,19 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
} }
if (i < pState->numOfSub) { if (i < pSql->subState.numOfSub) {
tscError("0x%"PRIx64":CDEL failed to prepare subdelete structure and launch subqueries", pSql->self); tscError("0x%"PRIx64":CDEL failed to prepare subdelete structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
} }
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { pthread_mutex_unlock(&pSql->subState.mutex); }
doCleanupSubqueries(pSql, i);
if (pRes->code != TSDB_CODE_SUCCESS) {
doCleanupSubqueries(pSql);
return pRes->code; return pRes->code;
} }
// send sub sql // send sub sql
doConcurrentlySendSubQueries(pSql); doConcurrentlySendSubQueries(pSql);
//return TSDB_CODE_TSC_QUERY_CANCELLED;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -614,7 +614,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch ...@@ -614,7 +614,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta; STableMeta * pMeta = pTableMetaInfo->pTableMeta;
SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pInterSql = tscAllocSqlObj();
if (pInterSql == NULL) { if (pInterSql == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -766,7 +766,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) { ...@@ -766,7 +766,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSqlObj *pInterSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pInterSql = tscAllocSqlObj();
if (pInterSql == NULL) { if (pInterSql == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
......
...@@ -558,7 +558,7 @@ static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STab ...@@ -558,7 +558,7 @@ static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STab
int32_t code = 0; int32_t code = 0;
STableMeta* tableMeta = NULL; STableMeta* tableMeta = NULL;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno)); tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -2763,7 +2763,7 @@ static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) { ...@@ -2763,7 +2763,7 @@ static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) {
//make a dummy SSqlObj //make a dummy SSqlObj
static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) { static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) {
SSqlObj *pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
return NULL; return NULL;
} }
...@@ -2771,7 +2771,6 @@ static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t cod ...@@ -2771,7 +2771,6 @@ static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t cod
pNew->pTscObj = taos; pNew->pTscObj = taos;
pNew->fp = NULL; pNew->fp = NULL;
tsem_init(&pNew->rspSem, 0, 0);
registerSqlObj(pNew); registerSqlObj(pNew);
pNew->res.numOfRows = affected_rows; pNew->res.numOfRows = affected_rows;
......
...@@ -1587,7 +1587,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -1587,7 +1587,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
} }
pStmt->taos = pObj; pStmt->taos = pObj;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
free(pStmt); free(pStmt);
...@@ -1604,7 +1604,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -1604,7 +1604,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
return NULL; return NULL;
} }
tsem_init(&pSql->rspSem, 0, 0);
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
......
...@@ -281,7 +281,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { ...@@ -281,7 +281,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
// } else { // } else {
// pQdesc->stableQuery = 0; // pQdesc->stableQuery = 0;
// } // }
pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs != NULL && pSql->subState.states != NULL) { if (pSql->pSubs != NULL && pSql->subState.states != NULL) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
// because subState maybe free on anytime by any thread, check validate from here // because subState maybe free on anytime by any thread, check validate from here
...@@ -298,7 +299,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { ...@@ -298,7 +299,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
} }
} }
pQdesc->numOfSub = pSql->subState.numOfSub; pQdesc->numOfSub = pSql->subState.numOfSub;
pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_unlock(&pSql->subState.mutex); }
} }
pQdesc->numOfSub = htonl(pQdesc->numOfSub); pQdesc->numOfSub = htonl(pQdesc->numOfSub);
......
...@@ -353,8 +353,8 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -353,8 +353,8 @@ void checkBrokenQueries(STscObj *pTscObj) {
pSql->lastAlive = taosGetTimestampMs(); pSql->lastAlive = taosGetTimestampMs();
} }
} else { } else {
// lock subs { pthread_mutex_lock(&pSql->subState.mutex);
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs) { if (pSql->pSubs) {
// have sub sql // have sub sql
for (int i = 0; i < pSql->subState.numOfSub; i++) { for (int i = 0; i < pSql->subState.numOfSub; i++) {
...@@ -375,8 +375,8 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -375,8 +375,8 @@ void checkBrokenQueries(STscObj *pTscObj) {
} }
} }
} }
// unlock
pthread_mutex_unlock(&pSql->subState.mutex); pthread_mutex_unlock(&pSql->subState.mutex); }
} }
// kill query // kill query
...@@ -2810,7 +2810,7 @@ static void createHbObj(STscObj* pObj) { ...@@ -2810,7 +2810,7 @@ static void createHbObj(STscObj* pObj) {
return; return;
} }
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = tscAllocSqlObj();
if (NULL == pSql) return; if (NULL == pSql) return;
pSql->fp = tscProcessHeartBeatRsp; pSql->fp = tscProcessHeartBeatRsp;
...@@ -3083,7 +3083,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -3083,7 +3083,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code); void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) { static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) { if (NULL == pNew) {
tscError("0x%"PRIx64" malloc failed for new sqlobj to get table meta", pSql->self); tscError("0x%"PRIx64" malloc failed for new sqlobj to get table meta", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -3149,7 +3149,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn ...@@ -3149,7 +3149,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
} }
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone) { int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) { if (NULL == pNew) {
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self); tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -3286,7 +3286,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create ...@@ -3286,7 +3286,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
} }
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) { int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = tscAllocSqlObj();
if (NULL == pNew) { if (NULL == pNew) {
tscError("%p malloc failed for new sqlobj to get user-defined functions", pSql); tscError("%p malloc failed for new sqlobj to get user-defined functions", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -3400,9 +3400,7 @@ int tscRenewTableMeta(SSqlObj *pSql) { ...@@ -3400,9 +3400,7 @@ int tscRenewTableMeta(SSqlObj *pSql) {
pSql->rootObj->retryReason = pSql->retryReason; pSql->rootObj->retryReason = pSql->retryReason;
SSqlObj *rootSql = pSql->rootObj; SSqlObj *rootSql = pSql->rootObj;
pthread_mutex_lock(&rootSql->mtxSubs);
tscFreeSubobj(rootSql); tscFreeSubobj(rootSql);
pthread_mutex_unlock(&rootSql->mtxSubs);
tscResetSqlCmd(&rootSql->cmd, true, rootSql->self); tscResetSqlCmd(&rootSql->cmd, true, rootSql->self);
code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
...@@ -3429,7 +3427,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) { ...@@ -3429,7 +3427,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
if (allVgroupInfoRetrieved(pQueryInfo)) { if (allVgroupInfoRetrieved(pQueryInfo)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); SSqlObj *pNew = tscAllocSqlObj();
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew; pNew->signature = pNew;
......
...@@ -149,7 +149,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -149,7 +149,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
pthread_mutex_init(&pObj->mutex, NULL); pthread_mutex_init(&pObj->mutex, NULL);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = tscAllocSqlObj();
if (NULL == pSql) { if (NULL == pSql) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscReleaseRpc(pRpcObj); tscReleaseRpc(pRpcObj);
...@@ -164,8 +164,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -164,8 +164,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
pSql->param = param; pSql->param = param;
pSql->cmd.command = TSDB_SQL_CONNECT; pSql->cmd.command = TSDB_SQL_CONNECT;
tsem_init(&pSql->rspSem, 0, 0);
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscReleaseRpc(pRpcObj); tscReleaseRpc(pRpcObj);
...@@ -305,10 +303,6 @@ void taos_close(TAOS *taos) { ...@@ -305,10 +303,6 @@ void taos_close(TAOS *taos) {
tscDebug("0x%"PRIx64" HB is freed", pHb->self); tscDebug("0x%"PRIx64" HB is freed", pHb->self);
taosReleaseRef(tscObjRef, pHb->self); taosReleaseRef(tscObjRef, pHb->self);
#ifdef __APPLE__
// to satisfy later tsem_destroy in taos_free_result
tsem_init(&pHb->rspSem, 0, 0);
#endif // __APPLE__
taos_free_result(pHb); taos_free_result(pHb);
} }
} }
...@@ -363,14 +357,13 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, int64_t* ...@@ -363,14 +357,13 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, int64_t*
nPrintTsc("%s", sqlstr); nPrintTsc("%s", sqlstr);
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL; return NULL;
} }
tsem_init(&pSql->rspSem, 0, 0);
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
if (res != NULL) { if (res != NULL) {
...@@ -755,6 +748,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -755,6 +748,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
tscLockByThread(&pSql->squeryLock); tscLockByThread(&pSql->squeryLock);
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int i = 0; i < pSql->subState.numOfSub; ++i) { for (int i = 0; i < pSql->subState.numOfSub; ++i) {
// NOTE: pSub may have been released already here // NOTE: pSub may have been released already here
SSqlObj *pSub = pSql->pSubs[i]; SSqlObj *pSub = pSql->pSubs[i];
...@@ -774,6 +769,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -774,6 +769,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
// taosRelekaseRef(tscObjRef, pSubObj->self); // taosRelekaseRef(tscObjRef, pSubObj->self);
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
if (pSql->subState.numOfSub <= 0) { if (pSql->subState.numOfSub <= 0) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
...@@ -1047,7 +1044,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -1047,7 +1044,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return TSDB_CODE_TSC_DISCONNECTED; return TSDB_CODE_TSC_DISCONNECTED;
} }
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
pSql->pTscObj = taos; pSql->pTscObj = taos;
pSql->signature = pSql; pSql->signature = pSql;
...@@ -1165,7 +1162,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -1165,7 +1162,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
tscAllocPayload(&pSql->cmd, 1024); tscAllocPayload(&pSql->cmd, 1024);
pSql->pTscObj = taos; pSql->pTscObj = taos;
......
...@@ -1002,7 +1002,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t ...@@ -1002,7 +1002,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t
return NULL; return NULL;
} }
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
return NULL; return NULL;
} }
...@@ -1054,7 +1054,6 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t ...@@ -1054,7 +1054,6 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t
pSql->fetchFp = tscCreateStream; pSql->fetchFp = tscCreateStream;
pSql->cmd.resColumnId = TSDB_RES_COL_ID; pSql->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pSql->rspSem, 0, 0);
registerSqlObj(pSql); registerSqlObj(pSql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr); tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
......
...@@ -83,7 +83,6 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { ...@@ -83,7 +83,6 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
} }
} }
static void asyncCallback(void *param, TAOS_RES *tres, int code) { static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL); assert(param != NULL);
SSub *pSub = ((SSub *)param); SSub *pSub = ((SSub *)param);
...@@ -117,7 +116,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -117,7 +116,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail; goto fail;
} }
pSql = calloc(1, sizeof(SSqlObj)); pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
line = __LINE__; line = __LINE__;
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -132,11 +131,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -132,11 +131,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
line = __LINE__;
code = TAOS_SYSTEM_ERROR(errno);
goto fail;
}
pSql->param = pSub; pSql->param = pSub;
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
...@@ -432,7 +426,7 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char ...@@ -432,7 +426,7 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char
} }
SSqlObj* recreateSqlObj(SSub* pSub) { SSqlObj* recreateSqlObj(SSub* pSub) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = tscAllocSqlObj();
if (pSql == NULL) { if (pSql == NULL) {
return NULL; return NULL;
} }
...@@ -442,10 +436,6 @@ SSqlObj* recreateSqlObj(SSub* pSub) { ...@@ -442,10 +436,6 @@ SSqlObj* recreateSqlObj(SSub* pSub) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
tscFreeSqlObj(pSql);
return NULL;
}
pSql->param = pSub; pSql->param = pSub;
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
......
...@@ -66,16 +66,14 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { ...@@ -66,16 +66,14 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
} }
} }
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { static void subquerySetStateWithoutLock(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
assert(idx < subState->numOfSub && subState->states != NULL); assert(idx < subState->numOfSub && subState->states != NULL);
tscDebug("subquery:0x%"PRIx64",%d state set to %d", pSql->self, idx, state); tscDebug("subquery:0x%"PRIx64",%d state set to %d", pSql->self, idx, state);
pthread_mutex_lock(&subState->mutex);
subState->states[idx] = state; subState->states[idx] = state;
pthread_mutex_unlock(&subState->mutex);
} }
static bool allSubqueryDone(SSqlObj *pParentSql) { static bool allSubqueryDoneWithoutLock(SSqlObj *pParentSql) {
bool done = true; bool done = true;
SSubqueryState *subState = &pParentSql->subState; SSubqueryState *subState = &pParentSql->subState;
...@@ -107,7 +105,7 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { ...@@ -107,7 +105,7 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx); tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx);
subState->states[idx] = 1; subState->states[idx] = 1;
} }
bool done = allSubqueryDone(pParentSql); bool done = allSubqueryDoneWithoutLock(pParentSql);
if (!done) { if (!done) {
tscDebug("0x%"PRIx64" sub:%p,%d completed, total:%d", pParentSql->self, pSql, idx, pParentSql->subState.numOfSub); tscDebug("0x%"PRIx64" sub:%p,%d completed, total:%d", pParentSql->self, pSql, idx, pParentSql->subState.numOfSub);
} }
...@@ -115,8 +113,6 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { ...@@ -115,8 +113,6 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
return done; return done;
} }
static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
...@@ -125,7 +121,6 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { ...@@ -125,7 +121,6 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
SLimitVal* pLimit = &pQueryInfo->limit; SLimitVal* pLimit = &pQueryInfo->limit;
int32_t order = pQueryInfo->order.order; int32_t order = pQueryInfo->order.order;
int32_t joinNum = pSql->subState.numOfSub;
SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}}; SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0}; SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
int32_t slot = 0; int32_t slot = 0;
...@@ -140,25 +135,39 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { ...@@ -140,25 +135,39 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
STSElem prev; STSElem prev;
SArray* tsCond = NULL; SArray* tsCond = NULL;
int32_t mergeDone = 0; int32_t mergeDone = 0;
int32_t joinNum = 0;
int errflag = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
joinNum = pSql->subState.numOfSub;
for (int32_t i = 0; i < joinNum; ++i) { for (int32_t i = 0; i < joinNum; ++i) {
SSqlObj *pSub = pSql->pSubs[i];
if (!pSub) {
tscError("0x%"PRIx64" the %d'th of the (%d) sub queries is null.", pSql->self, i, joinNum);
errflag = 1;
break;
}
STSBuf* output = tsBufCreate(true, pQueryInfo->order.order); STSBuf* output = tsBufCreate(true, pQueryInfo->order.order);
SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd); SQueryInfo* pSubQueryInfo = tscGetQueryInfo(&pSub->cmd);
pSubQueryInfo->tsBuf = output; pSubQueryInfo->tsBuf = output;
SJoinSupporter* pSupporter = pSql->pSubs[i]->param; SJoinSupporter* pSupporter = pSub->param;
if (pSupporter->pTSBuf == NULL) { if (pSupporter->pTSBuf == NULL) {
tscDebug("0x%"PRIx64" at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql->self); tscDebug("0x%"PRIx64" at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql->self);
return 0; errflag = 1;
break;
} }
tsBufResetPos(pSupporter->pTSBuf); tsBufResetPos(pSupporter->pTSBuf);
if (!tsBufNextPos(pSupporter->pTSBuf)) { if (!tsBufNextPos(pSupporter->pTSBuf)) {
tscDebug("0x%"PRIx64" input1 is empty, 0 for secondary query after ts blocks intersecting", pSql->self); tscDebug("0x%"PRIx64" input1 is empty, 0 for secondary query after ts blocks intersecting", pSql->self);
return 0; errflag = 1;
break;
} }
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" table idx:%d, input group number:%d", pSql->self, tscDebug("0x%"PRIx64" sub:0x%"PRIx64" table idx:%d, input group number:%d", pSql->self,
...@@ -168,6 +177,11 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { ...@@ -168,6 +177,11 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
ctxlist[i].res = output; ctxlist[i].res = output;
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) {
return 0;
}
TSKEY st = taosGetTimestampUs(); TSKEY st = taosGetTimestampUs();
for (int16_t tidx = 0; tidx < joinNum; tidx++) { for (int16_t tidx = 0; tidx < joinNum; tidx++) {
...@@ -523,6 +537,10 @@ static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupT ...@@ -523,6 +537,10 @@ static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupT
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0; int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL; SJoinSupporter* pSupporter = NULL;
bool success = true;
uint32_t stateVersion = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
//If the columns are not involved in the final select clause, the corresponding query will not be issued. //If the columns are not involved in the final select clause, the corresponding query will not be issued.
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
...@@ -537,10 +555,9 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -537,10 +555,9 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// scan all subquery, if one sub query has only ts, ignore it // scan all subquery, if one sub query has only ts, ignore it
tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub); tscDebug("0x%"PRIx64" start to launch secondary subqueries, %d out of %d needs to query", pSql->self, numOfSub, pSql->subState.numOfSub);
bool success = true;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj *pPrevSub = pSql->pSubs[i]; SSqlObj *pPrevSub = pSql->pSubs[i];
if (!pPrevSub) continue;
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
pSupporter = pPrevSub->param; pSupporter = pPrevSub->param;
...@@ -664,7 +681,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -664,7 +681,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pQueryInfo->stableQuery = true; pQueryInfo->stableQuery = true;
} }
subquerySetState(pNew, &pSql->subState, i, 0); subquerySetStateWithoutLock(pNew, &pSql->subState, i, 0);
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", tscDebug("0x%"PRIx64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
...@@ -672,34 +689,46 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -672,34 +689,46 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); }
//prepare the subqueries object failed, abort //prepare the subqueries object failed, abort
if (!success) { if (!success) {
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("0x%"PRIx64" failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql->self, tscError("0x%"PRIx64" failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql->self,
pSql->subState.numOfSub, pSql->res.code); pSql->subState.numOfSub, pSql->res.code);
freeJoinSubqueryObj(pSql); goto _error;
return pSql->res.code;
} }
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] == NULL) { SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (pSub == NULL) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue; continue;
} }
pSql->res.code = TSDB_CODE_FAILED;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->pSubs[i]->cmd); tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
executeQuery(pSql->pSubs[i], pQueryInfo); goto _error;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
executeQuery(pSub, pQueryInfo);
tscReleaseRefOfSubobj(pSub); // REL ref
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error:
freeJoinSubqueryObj(pSql);
return pSql->res.code;
} }
void freeJoinSubqueryObj(SSqlObj* pSql) { void freeJoinSubqueryObj(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->subState.numOfSub == 0) { if (pSql->subState.numOfSub == 0) {
return; goto _out;
} }
pSql->subState.version ++;
pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
...@@ -713,13 +742,12 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { ...@@ -713,13 +742,12 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
taos_free_result(pSub); taos_free_result(pSub);
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
tfree(pSql->pSubs);
tfree(pSql->subState.states); tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
_out:
pthread_mutex_unlock(&pSql->subState.mutex); pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_destroy(&pSql->subState.mutex);
} }
static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
...@@ -1031,17 +1059,23 @@ static int32_t tidTagsMergeSort(SArray *arr, int32_t start, int32_t end, const i ...@@ -1031,17 +1059,23 @@ static int32_t tidTagsMergeSort(SArray *arr, int32_t start, int32_t end, const i
} }
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) { static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
int16_t joinNum = pParentSql->subState.numOfSub;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
SJoinSupporter* p0 = pParentSql->pSubs[0]->param;
SMergeCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}}; SMergeCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {{0}};
SMergeCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0}; SMergeCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
// int16_t for padding int16_t joinNum = 0;
int32_t size = p0->tagSize - sizeof(int16_t); SJoinSupporter* p0 = NULL;
int32_t size = 0;
int32_t code = TSDB_CODE_SUCCESS;
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); { pthread_mutex_lock(&pParentSql->subState.mutex);
joinNum = pParentSql->subState.numOfSub;
p0 = pParentSql->pSubs[0]->param;
// int16_t for padding
size = p0->tagSize - sizeof(int16_t);
tscDebug("0x%"PRIx64" all subquery retrieve <tid, tags> complete, do tags match", pParentSql->self); tscDebug("0x%"PRIx64" all subquery retrieve <tid, tags> complete, do tags match", pParentSql->self);
...@@ -1064,8 +1098,14 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar ...@@ -1064,8 +1098,14 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
for (int32_t j = 0; j <= i; j++) { for (int32_t j = 0; j <= i; j++) {
taosArrayDestroy(&ctxlist[j].res); taosArrayDestroy(&ctxlist[j].res);
} }
return TSDB_CODE_QRY_DUP_JOIN_KEY; code = TSDB_CODE_QRY_DUP_JOIN_KEY;
break;
}
} }
pthread_mutex_unlock(&pParentSql->subState.mutex); }
if (code != TSDB_CODE_SUCCESS) {
return code;
} }
int32_t slot = 0; int32_t slot = 0;
...@@ -1384,22 +1424,34 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1384,22 +1424,34 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
(*pParentSql->fp)(pParentSql->param, pParentSql, 0); (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else { } else {
uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pParentSql);
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
SSqlObj *psub = tscAcquireRefOfSubobj(pParentSql, m, stateVersion); // ACQ ref
if (!psub) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pParentSql)) {
continue;
}
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
break;
}
// proceed to for ts_comp query // proceed to for ts_comp query
SSqlCmd* pSubCmd = &pParentSql->pSubs[m]->cmd; SSqlCmd* pSubCmd = &psub->cmd;
SArray** s = taosArrayGet(resList, m); SArray** s = taosArrayGet(resList, m);
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pSubCmd); SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pSubCmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s);
SSqlObj* psub = pParentSql->pSubs[m];
((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables); ((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables);
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); tscResetAllSubStates(pParentSql);
tscDebug("0x%"PRIx64" reset all sub states to 0", pParentSql->self); tscDebug("0x%"PRIx64" reset all sub states to 0", pParentSql->self);
issueTsCompQuery(psub, psub->param, pParentSql); issueTsCompQuery(psub, psub->param, pParentSql);
tscReleaseRefOfSubobj(psub); // REL ref
} }
} }
...@@ -1695,12 +1747,14 @@ _return: ...@@ -1695,12 +1747,14 @@ _return:
} }
void tscFetchDatablockForSubquery(SSqlObj* pSql) { void tscFetchDatablockForSubquery(SSqlObj* pSql) {
assert(pSql->subState.numOfSub >= 1);
int32_t numOfFetch = 0; int32_t numOfFetch = 0;
bool hasData = true; bool hasData = true;
bool reachLimit = false; bool reachLimit = false;
{ pthread_mutex_lock(&pSql->subState.mutex);
assert(pSql->subState.numOfSub >= 1);
// if the subquery is NULL, it does not involved in the final result generation // if the subquery is NULL, it does not involved in the final result generation
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
...@@ -1731,6 +1785,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1731,6 +1785,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
} }
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
// has data remains in client side, and continue to return data to app // has data remains in client side, and continue to return data to app
if (hasData) { if (hasData) {
tscBuildResFromSubqueries(pSql); tscBuildResFromSubqueries(pSql);
...@@ -1754,8 +1810,11 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1754,8 +1810,11 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
if (numOfFetch <= 0) { if (numOfFetch <= 0) {
bool tryNextVnode = false; bool tryNextVnode = false;
bool orderedPrjQuery = false; bool orderedPrjQuery = false;
uint32_t stateVersion = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) { if (pSub == NULL) {
...@@ -1769,22 +1828,29 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1769,22 +1828,29 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
} }
} }
if (orderedPrjQuery) { if (orderedPrjQuery) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
subquerySetState(pSub, &pSql->subState, i, 0); subquerySetStateWithoutLock(pSub, &pSql->subState, i, 0);
} }
} }
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); }
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (pSub == NULL) { if (pSub == NULL) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue; continue;
} }
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
pSql->res.code = TSDB_CODE_FAILED;
break;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
...@@ -1813,6 +1879,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1813,6 +1879,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" no result in current subquery anymore", pSub->self); tscDebug("0x%"PRIx64" no result in current subquery anymore", pSub->self);
} }
} }
tscReleaseRefOfSubobj(pSub); // REL ref
} }
if (tryNextVnode) { if (tryNextVnode) {
...@@ -1836,25 +1904,35 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1836,25 +1904,35 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" retrieve data from %d subqueries", pSql->self, numOfFetch); tscDebug("0x%"PRIx64" retrieve data from %d subqueries", pSql->self, numOfFetch);
SJoinSupporter* pSupporter = NULL; SJoinSupporter* pSupporter = NULL;
uint32_t stateVersion = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i]; SSqlObj* pSql1 = pSql->pSubs[i];
if (pSql1 == NULL) { if (pSql1 == NULL) {
continue; continue;
} }
SSqlRes* pRes1 = &pSql1->res; SSqlRes* pRes1 = &pSql1->res;
if (pRes1->row >= pRes1->numOfRows && !pRes1->completed) { if (pRes1->row >= pRes1->numOfRows && !pRes1->completed) {
subquerySetState(pSql1, &pSql->subState, i, 0); subquerySetStateWithoutLock(pSql1, &pSql->subState, i, 0);
} }
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); }
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i]; SSqlObj* pSql1 = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (pSql1 == NULL) { if (pSql1 == NULL) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue; continue;
} }
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
break;
}
SSqlRes* pRes1 = &pSql1->res; SSqlRes* pRes1 = &pSql1->res;
SSqlCmd* pCmd1 = &pSql1->cmd; SSqlCmd* pCmd1 = &pSql1->cmd;
...@@ -1880,6 +1958,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1880,6 +1958,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
tscBuildAndSendRequest(pSql1, NULL); tscBuildAndSendRequest(pSql1, NULL);
} }
tscReleaseRefOfSubobj(pSql1); // REF ref
} }
} }
...@@ -2045,13 +2124,6 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -2045,13 +2124,6 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
pSql->res.qId = 0x1; pSql->res.qId = 0x1;
assert(pSql->res.numOfRows == 0); assert(pSql->res.numOfRows == 0);
if (pSql->pSubs == NULL) {
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
if (pSql->pSubs == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL); SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
if (pNew == NULL) { if (pNew == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -2186,33 +2258,30 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2186,33 +2258,30 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pSql->subState.states == NULL) { code = doReInitSubState(pSql, pQueryInfo->numOfTables);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); if (code != TSDB_CODE_SUCCESS) {
if (pSql->subState.states == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pthread_mutex_init(&pSql->subState.mutex, NULL); uint32_t stateVersion = 0;
} int errflag = 0;
pSql->subState.numOfSub = pQueryInfo->numOfTables; { pthread_mutex_lock(&pSql->subState.mutex);
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i); tscError("0x%"PRIx64" tableIndex:%d, failed to allocate join support object, abort further query", pSql->self, i);
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; errflag = 1;
break;
} }
code = tscCreateJoinSubquery(pSql, i, pSupporter); code = tscCreateJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter); tscDestroyJoinSupporter(pSupporter);
goto _error; errflag = 1;
break;
} }
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
...@@ -2223,13 +2292,27 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2223,13 +2292,27 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
} }
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) {
goto _error;
}
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { // at least one subquery is empty, do nothing and return if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { // at least one subquery is empty, do nothing and return
freeJoinSubqueryObj(pSql); freeJoinSubqueryObj(pSql);
(*pSql->fp)(pSql->param, pSql, 0); (*pSql->fp)(pSql->param, pSql, 0);
} else { } else {
int fail = 0; int fail = 0;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (!pSub) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue;
}
code = TSDB_CODE_FAILED;
goto _error;
}
if (fail) { if (fail) {
(*pSub->fp)(pSub->param, pSub, 0); (*pSub->fp)(pSub->param, pSub, 0);
continue; continue;
...@@ -2240,6 +2323,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2240,6 +2323,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
(*pSub->fp)(pSub->param, pSub, 0); (*pSub->fp)(pSub->param, pSub, 0);
fail = 1; fail = 1;
} }
tscReleaseRefOfSubobj(pSub); // REL ref
} }
if(fail) { if(fail) {
...@@ -2251,22 +2335,17 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2251,22 +2335,17 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
return; return;
_error: _error:
pRes->code = code; pRes->code = code;
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { void doCleanupSubqueries(SSqlObj *pSql) {
pthread_mutex_lock(&pSql->subState.mutex); pthread_mutex_lock(&pSql->subState.mutex);
if (numOfSubs > pSql->subState.numOfSub || numOfSubs <= 0 || pSql->subState.numOfSub <= 0) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
pthread_mutex_unlock(&pSql->subState.mutex);
return;
}
for(int32_t i = 0; i < numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
assert(pSub != NULL); if (!pSub) continue;
pSql->pSubs[i] = NULL;
tscFreeRetrieveSup(&pSub->param); tscFreeRetrieveSup(&pSub->param);
...@@ -2377,10 +2456,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -2377,10 +2456,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscFreeFirstRoundSup(&param); tscFreeFirstRoundSup(&param);
taos_free_result(pSql); taos_free_result(pSql);
pthread_mutex_lock(&pParent->subState.mutex); tscFreeSubobj(pParent);
pParent->subState.numOfSub = 0;
tfree(pParent->pSubs);
pthread_mutex_unlock(&pParent->subState.mutex);
pParent->res.code = code; pParent->res.code = code;
tscAsyncResultOnError(pParent); tscAsyncResultOnError(pParent);
return; return;
...@@ -2483,10 +2559,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -2483,10 +2559,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscFreeFirstRoundSup(&param); tscFreeFirstRoundSup(&param);
taos_free_result(pSql); taos_free_result(pSql);
pthread_mutex_lock(&pParent->subState.mutex); tscFreeSubobj(pParent);
pParent->subState.numOfSub = 0;
tfree(pParent->pSubs);
pthread_mutex_unlock(&pParent->subState.mutex);
if (resRows == 0) { if (resRows == 0) {
pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
...@@ -2509,10 +2582,7 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { ...@@ -2509,10 +2582,7 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
tscFreeFirstRoundSup(&param); tscFreeFirstRoundSup(&param);
taos_free_result(pSql); taos_free_result(pSql);
pthread_mutex_lock(&parent->subState.mutex); tscFreeSubobj(parent);
parent->subState.numOfSub = 0;
tfree(parent->pSubs);
pthread_mutex_unlock(&parent->subState.mutex);
parent->res.code = c; parent->res.code = c;
tscAsyncResultOnError(parent); tscAsyncResultOnError(parent);
return; return;
...@@ -2677,14 +2747,11 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2677,14 +2747,11 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type, pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type,
tscNumOfExprs(pNewQueryInfo), idx+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); tscNumOfExprs(pNewQueryInfo), idx+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
pSql->pSubs = calloc(1, POINTER_BYTES); int32_t code = doReInitSubState(pSql, 1);
if (pSql->pSubs == NULL) { if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pSql->subState.numOfSub = 1;
pSql->pSubs[0] = pNew; pSql->pSubs[0] = pNew;
tscHandleMasterSTableQuery(pNew); tscHandleMasterSTableQuery(pNew);
...@@ -2706,17 +2773,19 @@ typedef struct SPair { ...@@ -2706,17 +2773,19 @@ typedef struct SPair {
static void doSendQueryReqs(SSchedMsg* pSchedMsg) { static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
SSqlObj* pSql = pSchedMsg->ahandle; SSqlObj* pSql = pSchedMsg->ahandle;
SPair* p = pSchedMsg->msg; SPair* p = pSchedMsg->msg;
uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
for (int32_t i = p->first; i < p->second; ++i) { for (int32_t i = p->first; i < p->second; ++i) {
if (i >= pSql->subState.numOfSub) { SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
tfree(p); if (!pSub) {
return; tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
break;
} }
SSqlObj* pSub = pSql->pSubs[i];
SRetrieveSupport* pSupport = pSub->param; SRetrieveSupport* pSupport = pSub->param;
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
tscBuildAndSendRequest(pSub, NULL); tscBuildAndSendRequest(pSub, NULL);
tscReleaseRefOfSubobj(pSub); // REL ref
} }
tfree(p); tfree(p);
...@@ -2779,12 +2848,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2779,12 +2848,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState;
int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups
: (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); : (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
int32_t ret = doReInitSubState(pSql, numOfSub);
int32_t ret = doInitSubState(pSql, numOfSub);
if (ret != 0) { if (ret != 0) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
return ret; return ret;
...@@ -2799,11 +2865,11 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2799,11 +2865,11 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return ret; return ret;
} }
tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub); tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0; int32_t i = 0;
for (; i < pState->numOfSub; ++i) { for (; i < pSql->subState.numOfSub; ++i) {
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) { if (trs == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
...@@ -2844,23 +2910,18 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2844,23 +2910,18 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->subqueryIndex); trs->subqueryIndex);
} }
if (i < pState->numOfSub) { if (i < pSql->subState.numOfSub) {
tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self); tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
} }
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { if (pRes->code != TSDB_CODE_SUCCESS) {
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub); tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pSql->subState.numOfSub);
doCleanupSubqueries(pSql, i); doCleanupSubqueries(pSql);
return pRes->code; return pRes->code;
} }
doConcurrentlySendSubQueries(pSql); doConcurrentlySendSubQueries(pSql);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3375,13 +3436,16 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -3375,13 +3436,16 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
} }
} }
static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { static bool needRetryInsert(SSqlObj* pParentObj) {
if (pParentObj->retry > pParentObj->maxRetry) { if (pParentObj->retry > pParentObj->maxRetry) {
tscError("0x%"PRIx64" max retry reached, abort the retry effort", pParentObj->self); tscError("0x%"PRIx64" max retry reached, abort the retry effort", pParentObj->self);
return false; return false;
} }
bool ret = true;
{ pthread_mutex_lock(&pParentObj->subState.mutex);
for (int32_t i = 0; i < numOfSub; ++i) { for (int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) {
int32_t code = pParentObj->pSubs[i]->res.code; int32_t code = pParentObj->pSubs[i]->res.code;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
continue; continue;
...@@ -3391,22 +3455,23 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { ...@@ -3391,22 +3455,23 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL &&
code != TSDB_CODE_APP_NOT_READY) { code != TSDB_CODE_APP_NOT_READY) {
pParentObj->res.code = code; pParentObj->res.code = code;
return false; ret = false;
break;
} }
} }
return true; pthread_mutex_unlock(&pParentObj->subState.mutex); }
return ret;
} }
static void doFreeInsertSupporter(SSqlObj* pSqlObj) { static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
if (pSqlObj == NULL || pSqlObj->subState.numOfSub <= 0) { pthread_mutex_lock(&pSqlObj->subState.mutex);
return;
}
for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) {
SSqlObj* pSql = pSqlObj->pSubs[i]; SSqlObj* pSql = pSqlObj->pSubs[i];
tfree(pSql->param); tfree(pSql->param);
} }
pthread_mutex_unlock(&pSqlObj->subState.mutex);
} }
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
...@@ -3442,7 +3507,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3442,7 +3507,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
// restore user defined fp // restore user defined fp
pParentObj->fp = pParentObj->fetchFp; pParentObj->fp = pParentObj->fetchFp;
int32_t numOfSub = pParentObj->subState.numOfSub;
doFreeInsertSupporter(pParentObj); doFreeInsertSupporter(pParentObj);
if (pParentObj->res.code == TSDB_CODE_SUCCESS) { if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
...@@ -3453,14 +3517,18 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3453,14 +3517,18 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows;
(*pParentObj->fp)(pParentObj->param, pParentObj, v); (*pParentObj->fp)(pParentObj->param, pParentObj, v);
} else { } else {
if (!needRetryInsert(pParentObj, numOfSub)) { if (!needRetryInsert(pParentObj)) {
tscAsyncResultOnError(pParentObj); tscAsyncResultOnError(pParentObj);
return; return;
} }
int32_t numOfFailed = 0; int32_t numOfFailed = 0;
for(int32_t i = 0; i < numOfSub; ++i) {
{ pthread_mutex_lock(&pParentObj->subState.mutex);
for(int32_t i = 0; i < pParentObj->subState.numOfSub; ++i) {
SSqlObj* pSql = pParentObj->pSubs[i]; SSqlObj* pSql = pParentObj->pSubs[i];
if (!pSql) continue;
if (pSql->res.code != TSDB_CODE_SUCCESS) { if (pSql->res.code != TSDB_CODE_SUCCESS) {
numOfFailed += 1; numOfFailed += 1;
...@@ -3470,14 +3538,16 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3470,14 +3538,16 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0);
tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
subquerySetState(pSql, &pParentObj->subState, i, 0); subquerySetStateWithoutLock(pSql, &pParentObj->subState, i, 0);
tscDebug("0x%"PRIx64", failed sub:%d, %p", pParentObj->self, i, pSql); tscDebug("0x%"PRIx64", failed sub:%d, %p", pParentObj->self, i, pSql);
} }
} }
pthread_mutex_unlock(&pParentObj->subState.mutex); }
tscWarn("0x%"PRIx64" Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj->self, tscWarn("0x%"PRIx64" Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj->self,
pParentObj->res.numOfRows, numOfFailed, numOfSub); pParentObj->res.numOfRows, numOfFailed, pParentObj->subState.numOfSub);
tscDebug("0x%"PRIx64" cleanup %d tableMeta in hashTable before reparse sql", pParentObj->self, pParentObj->cmd.insertParam.numOfTables); tscDebug("0x%"PRIx64" cleanup %d tableMeta in hashTable before reparse sql", pParentObj->self, pParentObj->cmd.insertParam.numOfTables);
for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) { for(int32_t i = 0; i < pParentObj->cmd.insertParam.numOfTables; ++i) {
...@@ -3544,8 +3614,15 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3544,8 +3614,15 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
return pRes->code; return pRes->code;
} }
uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (!pSub) {
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
pRes->code = TSDB_CODE_FAILED;
return pRes->code;
}
SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter)); SInsertSupporter* pSup = calloc(1, sizeof(SInsertSupporter));
pSup->idx = i; pSup->idx = i;
pSup->pSql = pSql; pSup->pSql = pSql;
...@@ -3555,43 +3632,33 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3555,43 +3632,33 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
if (pSub->res.code != TSDB_CODE_SUCCESS) { if (pSub->res.code != TSDB_CODE_SUCCESS) {
tscHandleInsertRetry(pSql, pSub); tscHandleInsertRetry(pSql, pSub);
} }
tscReleaseRefOfSubobj(pSub); // REL ref
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->insertParam.pDataBlocks);
assert(pSql->subState.numOfSub > 0);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
// the number of already initialized subqueries int32_t code = doReInitSubState(pSql, (int32_t)taosArrayGetSize(pCmd->insertParam.pDataBlocks));
int32_t numOfSub = 0; if (code != TSDB_CODE_SUCCESS) {
if (pSql->subState.states == NULL) {
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
if (pSql->subState.states == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pthread_mutex_init(&pSql->subState.mutex, NULL); uint32_t stateVersion = 0;
} int32_t numOfSub = 0;
int errflag = 0;
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); { pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs == NULL) {
goto _error;
}
assert(pSql->subState.numOfSub > 0);
tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub); tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub);
while(numOfSub < pSql->subState.numOfSub) { while(numOfSub < pSql->subState.numOfSub) {
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
if (pSupporter == NULL) { if (pSupporter == NULL) {
goto _error; errflag = 1;
break;
} }
pSupporter->pSql = pSql; pSupporter->pSql = pSql;
...@@ -3600,7 +3667,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3600,7 +3667,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT); SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT);
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno)); tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, numOfSub, strerror(errno));
goto _error; errflag = 1;
break;
} }
/* /*
...@@ -3608,6 +3676,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3608,6 +3676,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
* the callback function (multiVnodeInsertFinalize) correctly. * the callback function (multiVnodeInsertFinalize) correctly.
*/ */
pNew->fetchFp = pNew->fp; pNew->fetchFp = pNew->fp;
pSql->pSubs[numOfSub] = pNew; pSql->pSubs[numOfSub] = pNew;
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub); STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub);
...@@ -3618,28 +3687,44 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3618,28 +3687,44 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
} else { } else {
tscDebug("0x%"PRIx64" prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql->self, numOfSub, tscDebug("0x%"PRIx64" prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql->self, numOfSub,
pSql->subState.numOfSub, tstrerror(pRes->code)); pSql->subState.numOfSub, tstrerror(pRes->code));
goto _error; errflag = 1;
break;
} }
} }
if (numOfSub < pSql->subState.numOfSub) { if (numOfSub < pSql->subState.numOfSub) {
tscError("0x%"PRIx64" failed to prepare subObj structure and launch sub-insertion", pSql->self); tscError("0x%"PRIx64" failed to prepare subObj structure and launch sub-insertion", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
errflag = 1;
}
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) {
goto _error; goto _error;
} }
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
// use the local variable // use the local variable
for (int32_t j = 0; j < numOfSub; ++j) { for (int32_t j = 0; j < pSql->subState.numOfSub; ++j) {
SSqlObj *pSub = pSql->pSubs[j]; SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, j, stateVersion); // ACQ ref
if (!pSub) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue;
}
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
return TSDB_CODE_FAILED;
}
tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j); tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j);
tscBuildAndSendRequest(pSub, NULL); tscBuildAndSendRequest(pSub, NULL);
tscReleaseRefOfSubobj(pSub); // REL ref
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -3663,6 +3748,9 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3663,6 +3748,9 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t numOfRes = INT32_MAX; int32_t numOfRes = INT32_MAX;
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) { if (pSub == NULL) {
...@@ -3673,6 +3761,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3673,6 +3761,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
numOfRes = (int32_t)(MIN(numOfRes, remain)); numOfRes = (int32_t)(MIN(numOfRes, remain));
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
if (numOfRes == 0) { // no result any more, free all subquery objects if (numOfRes == 0) { // no result any more, free all subquery objects
pSql->res.completed = true; pSql->res.completed = true;
freeJoinSubqueryObj(pSql); freeJoinSubqueryObj(pSql);
...@@ -3700,6 +3790,9 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3700,6 +3790,9 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
tscRestoreFuncForSTableQuery(pQueryInfo); tscRestoreFuncForSTableQuery(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
{ pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) { if (pSub == NULL) {
...@@ -3735,6 +3828,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3735,6 +3828,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
assert(pSub->res.row <= pSub->res.numOfRows); assert(pSub->res.row <= pSub->res.numOfRows);
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
pRes->numOfRows = numOfRes; pRes->numOfRows = numOfRes;
pRes->numOfClauseTotal += numOfRes; pRes->numOfClauseTotal += numOfRes;
...@@ -3847,6 +3942,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -3847,6 +3942,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
{ pthread_mutex_lock(&pSql->subState.mutex);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
bool allSubqueryExhausted = true; bool allSubqueryExhausted = true;
...@@ -3893,6 +3990,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -3893,6 +3990,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
} }
} }
pthread_mutex_unlock(&pSql->subState.mutex); }
return hasData; return hasData;
} }
......
...@@ -1681,33 +1681,30 @@ void tscFreeSqlResult(SSqlObj* pSql) { ...@@ -1681,33 +1681,30 @@ void tscFreeSqlResult(SSqlObj* pSql) {
} }
void tscFreeSubobj(SSqlObj* pSql) { void tscFreeSubobj(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->subState.numOfSub == 0) { if (pSql->subState.numOfSub == 0) {
return; goto _out;
} }
pSql->subState.version ++;
pthread_mutex_lock(&pSql->subState.mutex);
tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub); tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] != NULL) { if (!pSql->pSubs[i]) {
tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i);
} else {
/* just for python error test case */
tscDebug("0x%"PRIx64" free sub SqlObj:0x0, index:%d", pSql->self, i); tscDebug("0x%"PRIx64" free sub SqlObj:0x0, index:%d", pSql->self, i);
continue;
} }
tscDebug("0x%"PRIx64" free sub SqlObj:0x%"PRIx64", index:%d", pSql->self, pSql->pSubs[i]->self, i);
taos_free_result(pSql->pSubs[i]); taos_free_result(pSql->pSubs[i]);
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
tfree(pSql->pSubs);
tfree(pSql->subState.states); tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
tfree(pSql->pSubs); _out:
pthread_mutex_unlock(&pSql->subState.mutex); pthread_mutex_unlock(&pSql->subState.mutex);
pthread_mutex_destroy(&pSql->subState.mutex);
} }
/** /**
...@@ -1744,6 +1741,52 @@ void tscFreeMetaSqlObj(int64_t *rid){ ...@@ -1744,6 +1741,52 @@ void tscFreeMetaSqlObj(int64_t *rid){
} }
} }
SSqlObj* tscAllocSqlObj() {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (!pNew) {
return NULL;
}
int rc = tsem_init(&pNew->rspSem, 0, 0);
assert(rc == 0 && "tsem_init failure");
rc = pthread_mutex_init(&pNew->subState.mutex, NULL);
assert(rc == 0 && "pthread_mutex_init failure");
return pNew;
}
SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx, uint32_t stateVersion) {
assert (pSql != NULL);
SSqlObj *pSub = NULL;
{ pthread_mutex_lock(&pSql->subState.mutex);
if (stateVersion != tscGetVersionOfSubStateWithoutLock(pSql) ||
idx < 0 ||
idx >= pSql->subState.numOfSub ||
!pSql->pSubs[idx]) {
goto _out;
}
pSub = taosAcquireRef(tscObjRef, pSql->pSubs[idx]->self);
assert (pSql->pSubs[idx] == pSub && "Refcounted subquery obj mismatch");
_out:
pthread_mutex_unlock(&pSql->subState.mutex); }
return pSub;
}
void tscReleaseRefOfSubobj(SSqlObj* pSub) {
assert (pSub != NULL && pSub->self != 0 && "Subquery obj not refcounted");
taosReleaseRef(tscObjRef, pSub->self);
}
uint32_t tscGetVersionOfSubStateWithoutLock(SSqlObj *pSql) {
return pSql->subState.version;
}
void tscResetAllSubStates(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
memset(pSql->subState.states, 0, sizeof(pSql->subState.states[0]) * pSql->subState.numOfSub);
pthread_mutex_unlock(&pSql->subState.mutex);
}
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
return; return;
...@@ -1767,14 +1810,11 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -1767,14 +1810,11 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
if (pSql && (pSql == pSql->rootObj)) {
pthread_mutex_destroy(&pSql->mtxSubs);
}
pSql->signature = NULL; pSql->signature = NULL;
pSql->fp = NULL; pSql->fp = NULL;
tfree(pSql->sqlstr); tfree(pSql->sqlstr);
tfree(pSql->pBuf); tfree(pSql->pBuf);
pSql->self = 0; pSql->self = 0;
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
...@@ -1785,6 +1825,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -1785,6 +1825,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" addr:%p free completed", sid, pSql); tscDebug("0x%"PRIx64" addr:%p free completed", sid, pSql);
pthread_mutex_destroy(&pSql->subState.mutex);
tsem_destroy(&pSql->rspSem); tsem_destroy(&pSql->rspSem);
memset(pSql, 0, sizeof(*pSql)); memset(pSql, 0, sizeof(*pSql));
free(pSql); free(pSql);
...@@ -3785,7 +3826,7 @@ void registerSqlObj(SSqlObj* pSql) { ...@@ -3785,7 +3826,7 @@ void registerSqlObj(SSqlObj* pSql) {
} }
SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, int32_t cmd) { SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, int32_t cmd) {
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, 0); tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, 0);
return NULL; return NULL;
...@@ -3797,7 +3838,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in ...@@ -3797,7 +3838,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = cmd; pCmd->command = cmd;
tsem_init(&pNew->rspSem, 0 ,0);
if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) { if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
...@@ -3862,7 +3902,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pNewQueryInfo, int64_t ui ...@@ -3862,7 +3902,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pNewQueryInfo, int64_t ui
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql) { SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, tableIndex); tscError("0x%"PRIx64" new subquery failed, tableIndex:%d", pSql->self, tableIndex);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -3876,7 +3916,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3876,7 +3916,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNew->signature = pNew; pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
pNew->rootObj = pSql->rootObj; pNew->rootObj = pSql->rootObj;
tsem_init(&pNew->rspSem, 0, 0);
SSqlCmd* pnCmd = &pNew->cmd; SSqlCmd* pnCmd = &pNew->cmd;
memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
...@@ -4201,26 +4240,25 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -4201,26 +4240,25 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
return; return;
} }
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param); taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
} }
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
//bug fix. Above doInitSubState level, the loop invocation with the same SSqlObj will be fail.
//assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL);
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
int32_t code = TSDB_CODE_SUCCESS;
pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES); { pthread_mutex_lock(&pSql->subState.mutex);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
int32_t code = pthread_mutex_init(&pSql->subState.mutex, NULL); pSql->pSubs = calloc(numOfSubqueries, POINTER_BYTES);
if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != 0) { pSql->subState.states = calloc(numOfSubqueries, sizeof(int8_t));
return TSDB_CODE_TSC_OUT_OF_MEMORY; if (pSql->pSubs == NULL || pSql->subState.states == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pSql->subState.numOfSub = numOfSubqueries; pSql->subState.numOfSub = numOfSubqueries;
pSql->subState.version ++;
return TSDB_CODE_SUCCESS; pthread_mutex_unlock(&pSql->subState.mutex); }
return code;
} }
// do execute the query according to the query execution plan // do execute the query according to the query execution plan
...@@ -4245,21 +4283,26 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4245,21 +4283,26 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
// upstream may be freed before retry // upstream may be freed before retry
if (pQueryInfo->pUpstream && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly if (pQueryInfo->pUpstream && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
code = doInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream)); code = doReInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
uint32_t stateVersion = 0;
int errflag = 0;
{ pthread_mutex_lock(&pSql->subState.mutex);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i); SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);
pSql->cmd.active = pSub; pSql->cmd.active = pSub;
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = tscAllocSqlObj();
if (pNew == NULL) { if (pNew == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; errflag = 1;
break;
} }
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
...@@ -4272,24 +4315,26 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4272,24 +4315,26 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pNew->cmd.resColumnId = TSDB_RES_COL_ID; pNew->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pNew->rspSem, 0, 0);
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
if (ps == NULL) { if (ps == NULL) {
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
goto _error; errflag = 1;
break;
} }
ps->pParentSql = pSql; ps->pParentSql = pSql;
ps->subqueryIndex = i; ps->subqueryIndex = i;
pNew->param = ps; pNew->param = ps;
registerSqlObj(pNew);
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) { if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
goto _error; errflag = 1;
break;
} }
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
...@@ -4299,9 +4344,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4299,9 +4344,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
numOfInit++; numOfInit++;
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) {
goto _error;
}
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* psub = pSql->pSubs[i]; SSqlObj* psub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
registerSqlObj(psub); if (!psub) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue;
}
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
code = TSDB_CODE_FAILED;
goto _error;
}
// create sub query to handle the sub query. // create sub query to handle the sub query.
SQueryInfo* pq = tscGetQueryInfo(&psub->cmd); SQueryInfo* pq = tscGetQueryInfo(&psub->cmd);
...@@ -4311,30 +4370,21 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4311,30 +4370,21 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
} }
executeQuery(psub, pq); executeQuery(psub, pq);
tscReleaseRefOfSubobj(psub); // REL ref
} }
return; return;
} else if (hasMoreClauseToTry(pSql)) {
if (pthread_mutex_init(&pSql->subState.mutex, NULL) != 0) {
goto _error;
}
} }
pSql->cmd.active = pQueryInfo; pSql->cmd.active = pQueryInfo;
doExecuteQuery(pSql, pQueryInfo); doExecuteQuery(pSql, pQueryInfo);
return; return;
_error: _error:
for(int32_t i = 0; i < numOfInit; ++i) {
SSqlObj* p = pSql->pSubs[i];
tscFreeSqlObj(p);
}
pSql->res.code = code; pSql->res.code = code;
pSql->subState.numOfSub = 0; // not initialized sub query object will not be freed tscFreeSubobj(pSql);
tfree(pSql->subState.states);
tfree(pSql->pSubs);
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
...@@ -4578,7 +4628,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -4578,7 +4628,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* *
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed. * For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/ */
pSql->subState.numOfSub = 0; tscFreeSubobj(pSql);
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
......
...@@ -195,7 +195,6 @@ int taosRemoveRef(int rsetId, int64_t rid) ...@@ -195,7 +195,6 @@ int taosRemoveRef(int rsetId, int64_t rid)
return taosDecRefCount(rsetId, rid, 1); return taosDecRefCount(rsetId, rid, 1);
} }
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosAcquireRef(int rsetId, int64_t rid) void *taosAcquireRef(int rsetId, int64_t rid)
{ {
int hash; int hash;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册