未验证 提交 dcabef9a 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #7077 from taosdata/feature/query

Feature/query
...@@ -211,27 +211,27 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { ...@@ -211,27 +211,27 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
pSql->fp = tscAsyncFetchRowsProxy; pSql->fp = tscAsyncFetchRowsProxy;
pSql->param = param; pSql->param = param;
if (pRes->qId == 0) {
tscError("qhandle is invalid");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscAsyncResultOnError(pSql);
return;
}
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
// handle outer query based on the already retrieved nest query results. // handle outer query based on the already retrieved nest query results.
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
SSchedMsg schedMsg = {0}; SSchedMsg schedMsg = {0};
schedMsg.fp = doRetrieveSubqueryData; schedMsg.fp = doRetrieveSubqueryData;
schedMsg.ahandle = (void *)pSql; schedMsg.ahandle = (void *)pSql;
schedMsg.thandle = (void *)1; schedMsg.thandle = (void *)1;
schedMsg.msg = 0; schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg); taosScheduleTask(tscQhandle, &schedMsg);
return; return;
} }
if (pRes->qId == 0) {
tscError("qhandle is invalid");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscAsyncResultOnError(pSql);
return;
}
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockForSubquery(pSql); tscFetchDatablockForSubquery(pSql);
} else if (pRes->completed) { } else if (pRes->completed) {
......
...@@ -899,8 +899,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -899,8 +899,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i); SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i);
tscTrace("%p start to parse %dth subclause, total:%"PRIzu, pSql, i, size); tscTrace("0x%"PRIx64" start to parse the %dth subclause, total:%"PRIzu, pSql->self, i, size);
// normalizeSqlNode(pSqlNode); // normalize the column name in each function // normalizeSqlNode(pSqlNode); // normalize the column name in each function
if ((code = validateSqlNode(pSql, pSqlNode, pQueryInfo)) != TSDB_CODE_SUCCESS) { if ((code = validateSqlNode(pSql, pSqlNode, pQueryInfo)) != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -392,14 +392,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -392,14 +392,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// single table query error need to be handled here. // single table query error need to be handled here.
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
(((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || // change the retry procedure rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | // 1. super table subquery
// 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY))) {
// do nothing in case of super table subquery // do nothing in case of super table subquery
} else { } else {
pSql->retry += 1; pSql->retry += 1;
......
...@@ -2706,7 +2706,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -2706,7 +2706,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
// release allocated resource // release allocated resource
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub); tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub);
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
...@@ -2717,10 +2716,13 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -2717,10 +2716,13 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
int32_t code = pParentSql->res.code; int32_t code = pParentSql->res.code;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) { if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) {
// remove the cached tableMeta and vgroup id list, and then parse the sql again // remove the cached tableMeta and vgroup id list, and then parse the sql again
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentSql->cmd, 0); SSqlCmd* pParentCmd = &pParentSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
tscResetSqlCmd(&pParentSql->cmd, true); pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pParentSql->res.code = TSDB_CODE_SUCCESS; pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++; pParentSql->retry++;
......
...@@ -3180,6 +3180,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { ...@@ -3180,6 +3180,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
} }
pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf); pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf);
pQueryInfo->fillType = 0;
tfree(pQueryInfo->fillVal); tfree(pQueryInfo->fillVal);
tfree(pQueryInfo->buf); tfree(pQueryInfo->buf);
...@@ -3818,13 +3819,64 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3818,13 +3819,64 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
} }
} }
// todo handle the failure
static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = tres;
SRetrieveSupport* ps = param;
if (pSql->res.code != TSDB_CODE_SUCCESS) {
SSqlObj* pParentSql = ps->pParentSql;
int32_t index = ps->subqueryIndex;
bool ret = subAndCheckDone(pSql, pParentSql, index);
tfree(ps);
pSql->param = NULL;
if (!ret) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index);
return;
}
// todo refactor
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
SSqlCmd* pParentCmd = &pParentSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry);
code = tsParseSql(pParentSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd);
executeQuery(pParentSql, pQueryInfo);
return;
}
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param); taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
} }
// do execute the query according to the query execution plan // do execute the query according to the query execution plan
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfInit = 0;
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
(*pSql->fp)(pSql->param, pSql, 0); (*pSql->fp)(pSql->param, pSql, 0);
return; return;
...@@ -3839,7 +3891,12 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3839,7 +3891,12 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
pthread_mutex_init(&pSql->subState.mutex, NULL); code = pthread_mutex_init(&pSql->subState.mutex, NULL);
if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i); SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);
...@@ -3847,46 +3904,71 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3847,46 +3904,71 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pSql->cmd.active = pSub; pSql->cmd.active = pSub;
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
// TODO handle memory failure
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) { if (pNew == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("pNew == NULL, out of memory"); goto _error;
return;
} }
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew; pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr); // todo refactor pNew->sqlstr = strdup(pSql->sqlstr);
pNew->fp = tscSubqueryCompleteCallback; pNew->fp = tscSubqueryCompleteCallback;
pNew->maxRetry = pSql->maxRetry;
tsem_init(&pNew->rspSem, 0, 0); tsem_init(&pNew->rspSem, 0, 0);
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
if (ps == NULL) {
tscFreeSqlObj(pNew);
goto _error;
}
ps->pParentSql = pSql; ps->pParentSql = pSql;
ps->subqueryIndex = i; ps->subqueryIndex = i;
pNew->param = ps; pNew->param = ps;
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
registerSqlObj(pNew);
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) { if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
goto _error;
} }
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
tscQueryInfoCopy(pNewQueryInfo, pSub); tscQueryInfoCopy(pNewQueryInfo, pSub);
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY);
numOfInit++;
}
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* psub = pSql->pSubs[i];
registerSqlObj(psub);
// create sub query to handle the sub query. // create sub query to handle the sub query.
executeQuery(pNew, pNewQueryInfo); SQueryInfo* pq = tscGetQueryInfo(&psub->cmd);
executeQuery(psub, pq);
} }
// merge sub query result and generate final results
return; return;
} }
pSql->cmd.active = pQueryInfo; pSql->cmd.active = pQueryInfo;
doExecuteQuery(pSql, pQueryInfo); doExecuteQuery(pSql, pQueryInfo);
return;
_error:
for(int32_t i = 0; i < numOfInit; ++i) {
SSqlObj* p = pSql->pSubs[i];
tscFreeSqlObj(p);
}
pSql->res.code = code;
pSql->subState.numOfSub = 0; // not initialized sub query object will not be freed
tfree(pSql->subState.states);
tfree(pSql->pSubs);
tscAsyncResultOnError(pSql);
} }
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) { int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
......
...@@ -365,6 +365,7 @@ do { \ ...@@ -365,6 +365,7 @@ do { \
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u #define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file #define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type #define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_TYPE_NEST_SUBQUERY 0x1000u // nested sub query
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
......
...@@ -38,7 +38,7 @@ static void *taosProcessAlarmSignal(void *tharg) { ...@@ -38,7 +38,7 @@ static void *taosProcessAlarmSignal(void *tharg) {
struct sigevent sevent = {{0}}; struct sigevent sevent = {{0}};
setThreadName("alarmSignal"); setThreadName("tmr");
#ifdef _ALPINE #ifdef _ALPINE
sevent.sigev_notify = SIGEV_THREAD; sevent.sigev_notify = SIGEV_THREAD;
......
...@@ -486,6 +486,10 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC ...@@ -486,6 +486,10 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, SMemRef* pRef) { TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, SMemRef* pRef) {
STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef); STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef);
if (pQueryHandle == NULL) {
return NULL;
}
if (emptyQueryTimewindow(pQueryHandle)) { if (emptyQueryTimewindow(pQueryHandle)) {
return (TsdbQueryHandleT*) pQueryHandle; return (TsdbQueryHandleT*) pQueryHandle;
} }
...@@ -596,6 +600,10 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable ...@@ -596,6 +600,10 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
} }
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
if (pQueryHandle == NULL) {
return NULL;
}
int32_t code = checkForCachedLastRow(pQueryHandle, groupList); int32_t code = checkForCachedLastRow(pQueryHandle, groupList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code; terrno = code;
...@@ -613,6 +621,10 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable ...@@ -613,6 +621,10 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
if (pQueryHandle == NULL) {
return NULL;
}
int32_t code = checkForCachedLast(pQueryHandle); int32_t code = checkForCachedLast(pQueryHandle);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code; terrno = code;
......
...@@ -444,6 +444,10 @@ if $rows != $val then ...@@ -444,6 +444,10 @@ if $rows != $val then
return -1 return -1
endi endi
print ================>TD-5600
sql select first(join_tb0.c8),first(join_tb0.c9) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts <= 100002 and join_tb1.ts>=100000 interval(1s) fill(linear);
#=============================================================== #===============================================================
sql select first(join_tb0.c8),first(join_tb0.c9) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts <= 100002 and join_tb0.c7 = true sql select first(join_tb0.c8),first(join_tb0.c9) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts <= 100002 and join_tb0.c7 = true
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册