提交 53a7e3d6 编写于 作者: H Haojun Liao

[td-225] add test case for join query.

上级 18637381
......@@ -324,7 +324,6 @@ typedef struct SSqlObj {
SSqlRes res;
uint16_t numOfSubs;
struct SSqlObj **pSubs;
tsem_t subReadySem;
struct SSqlObj * prev, *next;
} SSqlObj;
......
......@@ -433,7 +433,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscError("%p get tableMeta failed, code:%s", pSql, tstrerror(code));
goto _error;
} else {
tscDebug("%p get tableMeta successfully", pSql);
const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
tscDebug("%p get %s successfully", pSql, msg);
}
if (pSql->pStream == NULL) {
......
......@@ -454,6 +454,8 @@ void tscKillSTableQuery(SSqlObj *pSql) {
return;
}
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
for (int i = 0; i < pSql->numOfSubs; ++i) {
// NOTE: pSub may have been released already here
SSqlObj *pSub = pSql->pSubs[i];
......@@ -466,7 +468,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
rpcCancelRequest(pSub->pRpcCtx);
}
tscQueueAsyncRes(pSub);
tscQueueAsyncRes(pSub); // async res? not other functions?
}
tscDebug("%p super table query cancelled", pSql);
......@@ -1436,11 +1438,6 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
return code;
}
// all subquery have completed already
if (pRes->pLocalReducer == NULL) {
sem_wait(&pSql->subReadySem);
}
pRes->code = tscDoLocalMerge(pSql);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......
......@@ -295,8 +295,6 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
}
tsem_init(&pSql->rspSem, 0, 0);
tsem_init(&pSql->subReadySem, 0, 0);
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
tsem_wait(&pSql->rspSem);
......
......@@ -1146,7 +1146,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
/////////////////////////////////////////////////////////////////////////////////////////
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd;
......@@ -1411,7 +1411,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->pParentSql = pSql;
trs->pFinalColModel = pModel;
SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
taosTFree(trs->localBuffer);
......@@ -1451,18 +1451,12 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
tscProcessSql(pSub);
}
// set the command flag must be after the semaphore been correctly set.
pSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
}
return TSDB_CODE_SUCCESS;
}
static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
tscDebug("%p start to free subquery result", pSql);
tscDebug("%p start to free subquery obj", pSql);
int32_t index = trsupport->subqueryIndex;
SSqlObj *pParentSql = trsupport->pParentSql;
......@@ -1503,10 +1497,10 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in
// clear local saved number of results
trsupport->localBuffer->num = 0;
tscDebug("%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql,
tscError("%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql,
tstrerror(code), subqueryIndex, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSql, trsupport, pSql);
SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
trsupport->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, trsupport->subqueryIndex);
......@@ -1584,9 +1578,17 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
taosTFree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
tsem_post(&pParentSql->subReadySem);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
} else { // regular super table query
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pParentSql);
}
}
}
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
......@@ -1659,8 +1661,13 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
taosTFree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
// all subqueries are completed, retrieve from local can be proceeded.
tsem_post(&pParentSql->subReadySem);
// set the command flag must be after the semaphore been correctly set.
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
tscQueueAsyncRes(pParentSql);
}
}
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
......@@ -1669,21 +1676,22 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
int32_t idx = trsupport->subqueryIndex;
SSqlObj * pParentSql = trsupport->pParentSql;
assert(tres != NULL);
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // sql object has been released in error process, return immediately
tscDebug("%p subquery has been released, idx:%d, abort", pParentSql, idx);
return;
}
// if (pSql == NULL) { // sql object has been released in error process, return immediately
// tscDebug("%p subquery has been released, idx:%d, abort", pParentSql, idx);
// return;
// }
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
SCMVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tscDebug("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
tscDebug("%p query cancelled/failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code));
tscHandleSubqueryError(param, tres, numOfRows);
......@@ -1694,7 +1702,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
assert(numOfRows == taos_errno(pSql));
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
tscDebug("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);
tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);
if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
return;
......@@ -1745,11 +1753,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
(int32_t)pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
if (ret != 0) { // set no disk space error info, and abort retry
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
} else if (pRes->completed) {
tscAllDataRetrievedFromDnode(trsupport, pSql);
return;
} else { // continue fetch data from dnode
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
......@@ -1759,15 +1764,15 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
}
}
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
const int32_t table_index = 0;
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1);
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->numOfSubs);
// launch subquery for each vnode, so the subquery index equals to the vgroupIndex.
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index);
......@@ -1784,7 +1789,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSqlObj* pParentSql = trsupport->pParentSql;
SSqlObj* pSql = (SSqlObj *) tres;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
......@@ -1812,7 +1817,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
assert(code == taos_errno(pSql));
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
tscWarn("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
return;
}
......@@ -2099,7 +2104,6 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SSqlRes *pRes = &pSql->res;
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker
taosTFree(pRes->tsrow);
return pRes->tsrow;
......
......@@ -373,7 +373,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
}
tscDebug("%p start to free sql object", pSql);
tscPartiallyFreeSqlObj(pSql);
......@@ -388,7 +388,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
taosTFree(pSql->sqlstr);
tsem_destroy(&pSql->rspSem);
tsem_destroy(&pSql->subReadySem);
free(pSql);
}
......@@ -1759,6 +1758,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* p
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
tscError("%p new subquery failed, tableIndex:%d", pSql, tableIndex);
......
......@@ -36,6 +36,14 @@ typedef void (*_ref_fn_t)(const void* pObj);
_ref_fn_t end; \
} _ref_func = {.begin = (s), .end = (e)};
// set the initial reference count value
#define T_REF_INIT_VAL(x, _v) \
do { \
assert(_v >= 0); \
atomic_store_32(&((x)->_ref.val), (_v)); \
} while (0)
// increase the reference count by 1
#define T_REF_INC(x) (atomic_add_fetch_32(&((x)->_ref.val), 1))
#define T_REF_INC_WITH_CB(x, p) \
......
......@@ -49,4 +49,4 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR
# Now we are all let, and let's see if we can find a crash. Note we pass all params
python3 ./crash_gen.py $@
python3.8 ./crash_gen.py $@
......@@ -330,7 +330,10 @@ sql_error select join_tb1.* from $tb1 , $tb2 where join_tb1.ts != join_tb0.ts an
sql_error select join_tb1.* from $tb1 , $tb1 where join_tb1.ts = join_tb1.ts and join_tb1.ts >= 100000;
sql_error select join_tb1.* from $tb1 , $tb1 where join_tb1.ts = join_tb1.ts order by ts;
sql_error select join_tb1.* from $tb1 , $tb1 where join_tb1.ts = join_tb1.ts order by join_tb1.c7;
sql_error select * from $tb1, $tb2;
sql_error select last_row(*) from $tb1, $tb2
sql_error select last_row(*) from $tb1, $tb2 where join_tb1.ts < now
sql_error select last_row(*) from $tb1, $tb2 where join_tb1.ts = join_tb2.ts
print ==================================super table join ==============================
# select duplicate columns
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册