diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index aecbdd82075e747d613171569c62ef83e0219f1d..004c5bc21c5efe3ae6cae600c8270a5c0cdac1ac 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -324,7 +324,6 @@ typedef struct SSqlObj { SSqlRes res; uint16_t numOfSubs; struct SSqlObj **pSubs; - tsem_t subReadySem; struct SSqlObj * prev, *next; } SSqlObj; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index b05aef76eb387596700d19090406073dcdf6132b..4643d255dc79e3192da5279326d2b3b0bd0a212d 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -433,7 +433,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { tscError("%p get tableMeta failed, code:%s", pSql, tstrerror(code)); goto _error; } else { - tscDebug("%p get tableMeta successfully", pSql); + const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta"; + tscDebug("%p get %s successfully", pSql, msg); } if (pSql->pStream == NULL) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e33bfe44d19a9b90f20a39bf00a063ae435b0324..81be0625ea44cf53b7b35be0a6634bb6a9167bbc 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -454,6 +454,8 @@ void tscKillSTableQuery(SSqlObj *pSql) { return; } + pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + for (int i = 0; i < pSql->numOfSubs; ++i) { // NOTE: pSub may have been released already here SSqlObj *pSub = pSql->pSubs[i]; @@ -466,7 +468,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { rpcCancelRequest(pSub->pRpcCtx); } - tscQueueAsyncRes(pSub); + tscQueueAsyncRes(pSub); // async res? not other functions? } tscDebug("%p super table query cancelled", pSql); @@ -1436,11 +1438,6 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { return code; } - // all subquery have completed already - if (pRes->pLocalReducer == NULL) { - sem_wait(&pSql->subReadySem); - } - pRes->code = tscDoLocalMerge(pSql); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 1a774e878494e8055bf81b1b22a8510dd79003b2..8d3336828018b3a3000cc45e83e5a078ec578d7f 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -295,8 +295,6 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { } tsem_init(&pSql->rspSem, 0, 0); - tsem_init(&pSql->subReadySem, 0, 0); - doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); tsem_wait(&pSql->rspSem); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index c50b663e02e51b50157d474facc0952238100301..4e2edde31ea3d6c023e4f58f297009e10906e668 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1146,7 +1146,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ///////////////////////////////////////////////////////////////////////////////////////// static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); -static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); +static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { SSqlCmd * pCmd = &pSql->cmd; @@ -1411,7 +1411,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->pParentSql = pSql; trs->pFinalColModel = pModel; - SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL); + SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); if (pNew == NULL) { tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); taosTFree(trs->localBuffer); @@ -1451,18 +1451,12 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex); tscProcessSql(pSub); } - - // set the command flag must be after the semaphore been correctly set. - pSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; - if (pRes->code == TSDB_CODE_SUCCESS) { - (*pSql->fp)(pSql->param, pSql, 0); - } return TSDB_CODE_SUCCESS; } static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { - tscDebug("%p start to free subquery result", pSql); + tscDebug("%p start to free subquery obj", pSql); int32_t index = trsupport->subqueryIndex; SSqlObj *pParentSql = trsupport->pParentSql; @@ -1503,10 +1497,10 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in // clear local saved number of results trsupport->localBuffer->num = 0; - tscDebug("%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql, + tscError("%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql, tstrerror(code), subqueryIndex, trsupport->numOfRetry); - SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSql, trsupport, pSql); + SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql); if (pNew == NULL) { tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d", trsupport->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, trsupport->subqueryIndex); @@ -1584,9 +1578,17 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO taosTFree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); - + // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes - tsem_post(&pParentSql->subReadySem); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); + + if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { + (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); + } else { // regular super table query + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + tscQueueAsyncRes(pParentSql); + } + } } static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) { @@ -1659,8 +1661,13 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p taosTFree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); - // all subqueries are completed, retrieve from local can be proceeded. - tsem_post(&pParentSql->subReadySem); + // set the command flag must be after the semaphore been correctly set. + pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; + if (pParentSql->res.code == TSDB_CODE_SUCCESS) { + (*pParentSql->fp)(pParentSql->param, pParentSql, 0); + } else { + tscQueueAsyncRes(pParentSql); + } } static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { @@ -1669,21 +1676,22 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR int32_t idx = trsupport->subqueryIndex; SSqlObj * pParentSql = trsupport->pParentSql; + assert(tres != NULL); SSqlObj *pSql = (SSqlObj *)tres; - if (pSql == NULL) { // sql object has been released in error process, return immediately - tscDebug("%p subquery has been released, idx:%d, abort", pParentSql, idx); - return; - } - +// if (pSql == NULL) { // sql object has been released in error process, return immediately +// tscDebug("%p subquery has been released, idx:%d, abort", pParentSql, idx); +// return; +// } + SSubqueryState* pState = trsupport->pState; assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; + SCMVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; if (pParentSql->res.code != TSDB_CODE_SUCCESS) { trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; - tscDebug("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s", + tscDebug("%p query cancelled/failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s", pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code)); tscHandleSubqueryError(param, tres, numOfRows); @@ -1694,7 +1702,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR assert(numOfRows == taos_errno(pSql)); if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { - tscDebug("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry); + tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry); if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) { return; @@ -1745,11 +1753,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR (int32_t)pRes->numOfRows, pQueryInfo->groupbyExpr.orderType); if (ret != 0) { // set no disk space error info, and abort retry tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE); - } else if (pRes->completed) { tscAllDataRetrievedFromDnode(trsupport, pSql); - return; - } else { // continue fetch data from dnode taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); } @@ -1759,15 +1764,15 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR } } -static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { +static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { const int32_t table_index = 0; SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj); if (pNew != NULL) { // the sub query of two-stage super table query SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; - - assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1); + assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->numOfSubs); // launch subquery for each vnode, so the subquery index equals to the vgroupIndex. STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); @@ -1784,7 +1789,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SSqlObj* pParentSql = trsupport->pParentSql; SSqlObj* pSql = (SSqlObj *) tres; - + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); @@ -1812,7 +1817,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { assert(code == taos_errno(pSql)); if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { - tscWarn("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); + tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) { return; } @@ -2099,7 +2104,6 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) { SSqlRes *pRes = &pSql->res; assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows); - if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker taosTFree(pRes->tsrow); return pRes->tsrow; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 788283a1adcc74ed6c7101464b500b2580eb3090..9f23ac2bdc7fc36e3c4a96a412c1807e2fc4379c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -373,7 +373,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { return; } - + tscDebug("%p start to free sql object", pSql); tscPartiallyFreeSqlObj(pSql); @@ -388,7 +388,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { taosTFree(pSql->sqlstr); tsem_destroy(&pSql->rspSem); - tsem_destroy(&pSql->subReadySem); free(pSql); } @@ -1759,6 +1758,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* p SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql) { SSqlCmd* pCmd = &pSql->cmd; + SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); if (pNew == NULL) { tscError("%p new subquery failed, tableIndex:%d", pSql, tableIndex); diff --git a/src/util/inc/tlockfree.h b/src/util/inc/tlockfree.h index a81f597832350c0d529670970d7eb80405321629..e960b601caf6322938f4b0da3953e0b3ba454ce3 100644 --- a/src/util/inc/tlockfree.h +++ b/src/util/inc/tlockfree.h @@ -36,6 +36,14 @@ typedef void (*_ref_fn_t)(const void* pObj); _ref_fn_t end; \ } _ref_func = {.begin = (s), .end = (e)}; +// set the initial reference count value +#define T_REF_INIT_VAL(x, _v) \ + do { \ + assert(_v >= 0); \ + atomic_store_32(&((x)->_ref.val), (_v)); \ + } while (0) + +// increase the reference count by 1 #define T_REF_INC(x) (atomic_add_fetch_32(&((x)->_ref.val), 1)) #define T_REF_INC_WITH_CB(x, p) \ diff --git a/tests/pytest/crash_gen.sh b/tests/pytest/crash_gen.sh index f6be6aae4967e285e069a2b6b71117ad71c05ad5..df1a9f595b986215deb69a5f9f99a3a5847f41dd 100755 --- a/tests/pytest/crash_gen.sh +++ b/tests/pytest/crash_gen.sh @@ -49,4 +49,4 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR # Now we are all let, and let's see if we can find a crash. Note we pass all params -python3 ./crash_gen.py $@ +python3.8 ./crash_gen.py $@ diff --git a/tests/script/general/parser/join.sim b/tests/script/general/parser/join.sim index 882f561ae1ef37d289ee742f1a1848433697a1fa..c4e490190ddd8844164057025630d39dc7155993 100644 --- a/tests/script/general/parser/join.sim +++ b/tests/script/general/parser/join.sim @@ -330,7 +330,10 @@ sql_error select join_tb1.* from $tb1 , $tb2 where join_tb1.ts != join_tb0.ts an sql_error select join_tb1.* from $tb1 , $tb1 where join_tb1.ts = join_tb1.ts and join_tb1.ts >= 100000; sql_error select join_tb1.* from $tb1 , $tb1 where join_tb1.ts = join_tb1.ts order by ts; sql_error select join_tb1.* from $tb1 , $tb1 where join_tb1.ts = join_tb1.ts order by join_tb1.c7; - +sql_error select * from $tb1, $tb2; +sql_error select last_row(*) from $tb1, $tb2 +sql_error select last_row(*) from $tb1, $tb2 where join_tb1.ts < now +sql_error select last_row(*) from $tb1, $tb2 where join_tb1.ts = join_tb2.ts print ==================================super table join ============================== # select duplicate columns