提交 404674b5 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/os

...@@ -47,7 +47,6 @@ void tscLockByThread(int64_t *lockedBy); ...@@ -47,7 +47,6 @@ void tscLockByThread(int64_t *lockedBy);
void tscUnlockByThread(int64_t *lockedBy); void tscUnlockByThread(int64_t *lockedBy);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -317,7 +317,8 @@ typedef struct STscObj { ...@@ -317,7 +317,8 @@ typedef struct STscObj {
} STscObj; } STscObj;
typedef struct SSubqueryState { typedef struct SSubqueryState {
int32_t numOfRemain; // the number of remain unfinished subquery pthread_mutex_t mutex;
int8_t *states;
int32_t numOfSub; // the number of total sub-queries int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState; } SSubqueryState;
......
...@@ -1427,6 +1427,10 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1427,6 +1427,10 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed
if (pRes->code == TSDB_CODE_SUCCESS) {
return pRes->code;
}
tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code)); tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code));
return pRes->code; return pRes->code;
} }
......
...@@ -264,6 +264,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -264,6 +264,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_DROP_DB: { case TSDB_SQL_DROP_DB: {
const char* msg2 = "invalid name"; const char* msg2 = "invalid name";
const char* msg3 = "param name too long"; const char* msg3 = "param name too long";
const char* msg4 = "table is not super table";
SStrToken* pzName = &pInfo->pDCLInfo->a[0]; SStrToken* pzName = &pInfo->pDCLInfo->a[0];
if ((pInfo->type != TSDB_SQL_DROP_DNODE) && (tscValidateName(pzName) != TSDB_CODE_SUCCESS)) { if ((pInfo->type != TSDB_SQL_DROP_DNODE) && (tscValidateName(pzName) != TSDB_CODE_SUCCESS)) {
...@@ -285,6 +286,18 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -285,6 +286,18 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if(code != TSDB_CODE_SUCCESS) { if(code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
if (pInfo->pDCLInfo->tableType == TSDB_SUPER_TABLE) {
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
}
} else if (pInfo->type == TSDB_SQL_DROP_DNODE) { } else if (pInfo->type == TSDB_SQL_DROP_DNODE) {
pzName->n = strdequote(pzName->z); pzName->n = strdequote(pzName->z);
strncpy(pTableMetaInfo->name, pzName->z, pzName->n); strncpy(pTableMetaInfo->name, pzName->z, pzName->n);
...@@ -4794,6 +4807,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4794,6 +4807,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char* msg17 = "invalid column name"; const char* msg17 = "invalid column name";
const char* msg18 = "primary timestamp column cannot be dropped"; const char* msg18 = "primary timestamp column cannot be dropped";
const char* msg19 = "invalid new tag name"; const char* msg19 = "invalid new tag name";
const char* msg20 = "table is not super table";
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -4819,6 +4833,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4819,6 +4833,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pAlterSQL->tableType == TSDB_SUPER_TABLE && !(UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo))) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg20);
}
if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN || if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN ||
pAlterSQL->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { pAlterSQL->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
......
...@@ -2193,7 +2193,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { ...@@ -2193,7 +2193,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
tscDebug("%p remove table meta after drop table:%s, numOfRemain:%d", pSql, pTableMetaInfo->name, tscDebug("%p remove table meta after drop table:%s, numOfRemain:%d", pSql, pTableMetaInfo->name,
(int32_t) taosHashGetSize(tscTableMetaInfo)); (int32_t) taosHashGetSize(tscTableMetaInfo));
assert(pTableMetaInfo->pTableMeta == NULL); pTableMetaInfo->pTableMeta = NULL;
return 0; return 0;
} }
......
...@@ -55,6 +55,58 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { ...@@ -55,6 +55,58 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
} }
} }
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
assert(idx < subState->numOfSub);
assert(subState->states);
pthread_mutex_lock(&subState->mutex);
tscDebug("subquery:%p,%d state set to %d", pSql, idx, state);
subState->states[idx] = state;
pthread_mutex_unlock(&subState->mutex);
}
static bool allSubqueryDone(SSqlObj *pParentSql) {
bool done = true;
SSubqueryState *subState = &pParentSql->subState;
//lock in caller
for (int i = 0; i < subState->numOfSub; i++) {
if (0 == subState->states[i]) {
tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
done = false;
break;
} else {
tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
}
}
return done;
}
static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
SSubqueryState *subState = &pParentSql->subState;
assert(idx < subState->numOfSub);
pthread_mutex_lock(&subState->mutex);
tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx);
subState->states[idx] = 1;
bool done = allSubqueryDone(pParentSql);
pthread_mutex_unlock(&subState->mutex);
return done;
}
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) { static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
...@@ -367,10 +419,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -367,10 +419,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// scan all subquery, if one sub query has only ts, ignore it // scan all subquery, if one sub query has only ts, ignore it
tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
SSubqueryState* pState = &pSql->subState;
pState->numOfRemain = numOfSub;
bool success = true; bool success = true;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
...@@ -404,6 +452,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -404,6 +452,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
break; break;
} }
tscClearSubqueryInfo(&pNew->cmd); tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
...@@ -480,6 +529,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -480,6 +529,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
} }
} }
subquerySetState(pPrevSub, &pSql->subState, i, 0);
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList), pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
...@@ -517,20 +568,25 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { ...@@ -517,20 +568,25 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
SJoinSupporter* p = pSub->param; SJoinSupporter* p = pSub->param;
tscDestroyJoinSupporter(p); tscDestroyJoinSupporter(p);
if (pSub->res.code == TSDB_CODE_SUCCESS) {
taos_free_result(pSub); taos_free_result(pSub);
pSql->pSubs[i] = NULL;
} }
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
} }
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
} }
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
assert(pSqlObj->subState.numOfRemain > 0); if (subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex)) {
if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) {
tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code)); tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
freeJoinSubqueryObj(pSqlObj); freeJoinSubqueryObj(pSqlObj);
return;
} }
//tscDestroyJoinSupporter(pSupporter); //tscDestroyJoinSupporter(pSupporter);
...@@ -777,6 +833,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -777,6 +833,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)); assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql);
return;
}
// check for the error code firstly // check for the error code firstly
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
// todo retry if other subqueries are not failed // todo retry if other subqueries are not failed
...@@ -785,7 +850,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -785,7 +850,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pParentSql->res.code = numOfRows; pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -802,7 +867,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -802,7 +867,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p failed to malloc memory", pSql); tscError("%p failed to malloc memory", pSql);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -844,7 +909,8 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -844,7 +909,8 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// no data exists in next vnode, mark the <tid, tags> query completed // no data exists in next vnode, mark the <tid, tags> query completed
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets. // only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
return; return;
} }
...@@ -891,7 +957,9 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -891,7 +957,9 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables); ((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables);
pParentSql->subState.numOfSub = 2; pParentSql->subState.numOfSub = 2;
pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub;
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
tscDebug("%p reset all sub states to 0", pParentSql);
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
SSqlObj* sub = pParentSql->pSubs[m]; SSqlObj* sub = pParentSql->pSubs[m];
...@@ -915,6 +983,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -915,6 +983,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)); assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql);
return;
}
// check for the error code firstly // check for the error code firstly
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
// todo retry if other subqueries are not failed yet // todo retry if other subqueries are not failed yet
...@@ -922,7 +999,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -922,7 +999,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pParentSql->res.code = numOfRows; pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -937,7 +1014,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -937,7 +1014,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
...@@ -955,7 +1032,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -955,7 +1032,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
...@@ -1009,7 +1086,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1009,7 +1086,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return; return;
} }
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return; return;
} }
...@@ -1049,6 +1126,17 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1049,6 +1126,17 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql);
return;
}
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(numOfRows == taos_errno(pSql)); assert(numOfRows == taos_errno(pSql));
...@@ -1088,9 +1176,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1088,9 +1176,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
} }
} }
assert(pState->numOfRemain > 0); if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub);
tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub);
return; return;
} }
...@@ -1205,16 +1292,17 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1205,16 +1292,17 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
} }
} }
// get the number of subquery that need to retrieve the next vnode.
if (orderedPrjQuery) { if (orderedPrjQuery) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
pSql->subState.numOfRemain++; subquerySetState(pSub, &pSql->subState, i, 0);
} }
} }
} }
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) { if (pSub == NULL) {
...@@ -1270,7 +1358,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1270,7 +1358,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
// retrieve data from current vnode. // retrieve data from current vnode.
tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch);
SJoinSupporter* pSupporter = NULL; SJoinSupporter* pSupporter = NULL;
pSql->subState.numOfRemain = numOfFetch;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i];
if (pSql1 == NULL) {
continue;
}
SSqlRes* pRes1 = &pSql1->res;
if (pRes1->row >= pRes1->numOfRows) {
subquerySetState(pSql1, &pSql->subState, i, 0);
}
}
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i]; SSqlObj* pSql1 = pSql->pSubs[i];
...@@ -1372,7 +1472,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1372,7 +1472,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// retrieve actual query results from vnode during the second stage join subquery // retrieve actual query results from vnode during the second stage join subquery
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code); tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -1384,7 +1485,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1384,7 +1485,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code)); tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code));
pParentSql->res.code = code; pParentSql->res.code = code;
quitAllSubquery(pParentSql, pSupporter);
quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -1408,7 +1510,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1408,7 +1510,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// In case of consequence query from other vnode, do not wait for other query response here. // In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return; return;
} }
} }
...@@ -1422,6 +1524,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1422,6 +1524,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH; pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql); tscProcessSql(pSql);
} else { // first retrieve from vnode during the secondary stage sub-query } else { // first retrieve from vnode during the secondary stage sub-query
// set the command flag must be after the semaphore been correctly set. // set the command flag must be after the semaphore been correctly set.
...@@ -1457,8 +1560,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1457,8 +1560,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pSql->pSubs[pSql->subState.numOfRemain++] = pNew; pSql->pSubs[tableIndex] = pNew;
assert(pSql->subState.numOfRemain <= pSql->subState.numOfSub);
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
...@@ -1590,6 +1692,19 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1590,6 +1692,19 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pSql->subState.numOfSub = pQueryInfo->numOfTables; pSql->subState.numOfSub = pQueryInfo->numOfTables;
if (pSql->subState.states == NULL) {
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
if (pSql->subState.states == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
pthread_mutex_init(&pSql->subState.mutex, NULL);
}
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("%p reset all sub states to 0", pSql);
bool hasEmptySub = false; bool hasEmptySub = false;
tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
...@@ -1622,14 +1737,25 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1622,14 +1737,25 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pSql->fp)(pSql->param, pSql, 0); (*pSql->fp)(pSql->param, pSql, 0);
} else { } else {
int fail = 0;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (fail) {
(*pSub->fp)(pSub->param, pSub, 0);
continue;
}
if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) {
pSql->subState.numOfRemain = i - 1; // the already sent request will continue and do not go to the error process routine pRes->code = code;
break; (*pSub->fp)(pSub->param, pSub, 0);
fail = 1;
} }
} }
if(fail) {
return;
}
pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE; pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE;
} }
...@@ -1728,7 +1854,21 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1728,7 +1854,21 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return ret; return ret;
} }
pState->numOfRemain = pState->numOfSub; if (pState->states == NULL) {
pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
if (pState->states == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscAsyncResultOnError(pSql);
tfree(pMemoryBuf);
return ret;
}
pthread_mutex_init(&pState->mutex, NULL);
}
memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
tscDebug("%p reset all sub states to 0", pSql);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0; int32_t i = 0;
...@@ -1877,7 +2017,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -1877,7 +2017,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
assert(pSql != NULL); assert(pSql != NULL);
SSubqueryState* pState = &pParentSql->subState; SSubqueryState* pState = &pParentSql->subState;
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
// retrieved in subquery failed. OR query cancelled in retrieve phase. // retrieved in subquery failed. OR query cancelled in retrieve phase.
if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
...@@ -1908,10 +2047,8 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -1908,10 +2047,8 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
} }
} }
int32_t remain = -1; if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) {
if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub);
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
pState->numOfSub - remain);
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
return; return;
...@@ -1980,10 +2117,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1980,10 +2117,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
return; return;
} }
int32_t remain = -1; if (!subAndCheckDone(pSql, pParentSql, idx)) {
if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex);
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
pState->numOfSub - remain);
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
return; return;
...@@ -2033,7 +2168,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -2033,7 +2168,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SSqlObj * pParentSql = trsupport->pParentSql; SSqlObj * pParentSql = trsupport->pParentSql;
SSubqueryState* pState = &pParentSql->subState; SSubqueryState* pState = &pParentSql->subState;
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
...@@ -2254,7 +2388,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2254,7 +2388,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
} }
} }
if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) {
tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub);
return; return;
} }
...@@ -2288,6 +2423,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2288,6 +2423,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
subquerySetState(pSql, &pParentObj->subState, i, 0);
tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql); tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql);
} }
} }
...@@ -2302,7 +2439,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2302,7 +2439,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
} }
pParentObj->cmd.parseFinished = false; pParentObj->cmd.parseFinished = false;
pParentObj->subState.numOfRemain = numOfFailed;
tscResetSqlCmdObj(&pParentObj->cmd); tscResetSqlCmdObj(&pParentObj->cmd);
...@@ -2378,7 +2514,19 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -2378,7 +2514,19 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
// the number of already initialized subqueries // the number of already initialized subqueries
int32_t numOfSub = 0; int32_t numOfSub = 0;
pSql->subState.numOfRemain = pSql->subState.numOfSub; if (pSql->subState.states == NULL) {
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
if (pSql->subState.states == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
pthread_mutex_init(&pSql->subState.mutex, NULL);
}
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("%p reset all sub states to 0", pSql);
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
if (pSql->pSubs == NULL) { if (pSql->pSubs == NULL) {
goto _error; goto _error;
......
...@@ -441,6 +441,12 @@ static void tscFreeSubobj(SSqlObj* pSql) { ...@@ -441,6 +441,12 @@ static void tscFreeSubobj(SSqlObj* pSql) {
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
} }
......
...@@ -82,151 +82,154 @@ ...@@ -82,151 +82,154 @@
#define TK_STABLES 64 #define TK_STABLES 64
#define TK_VGROUPS 65 #define TK_VGROUPS 65
#define TK_DROP 66 #define TK_DROP 66
#define TK_DNODE 67 #define TK_STABLE 67
#define TK_USER 68 #define TK_DNODE 68
#define TK_ACCOUNT 69 #define TK_USER 69
#define TK_USE 70 #define TK_ACCOUNT 70
#define TK_DESCRIBE 71 #define TK_USE 71
#define TK_ALTER 72 #define TK_DESCRIBE 72
#define TK_PASS 73 #define TK_ALTER 73
#define TK_PRIVILEGE 74 #define TK_PASS 74
#define TK_LOCAL 75 #define TK_PRIVILEGE 75
#define TK_IF 76 #define TK_LOCAL 76
#define TK_EXISTS 77 #define TK_IF 77
#define TK_PPS 78 #define TK_EXISTS 78
#define TK_TSERIES 79 #define TK_PPS 79
#define TK_DBS 80 #define TK_TSERIES 80
#define TK_STORAGE 81 #define TK_DBS 81
#define TK_QTIME 82 #define TK_STORAGE 82
#define TK_CONNS 83 #define TK_QTIME 83
#define TK_STATE 84 #define TK_CONNS 84
#define TK_KEEP 85 #define TK_STATE 85
#define TK_CACHE 86 #define TK_KEEP 86
#define TK_REPLICA 87 #define TK_CACHE 87
#define TK_QUORUM 88 #define TK_REPLICA 88
#define TK_DAYS 89 #define TK_QUORUM 89
#define TK_MINROWS 90 #define TK_DAYS 90
#define TK_MAXROWS 91 #define TK_MINROWS 91
#define TK_BLOCKS 92 #define TK_MAXROWS 92
#define TK_CTIME 93 #define TK_BLOCKS 93
#define TK_WAL 94 #define TK_CTIME 94
#define TK_FSYNC 95 #define TK_WAL 95
#define TK_COMP 96 #define TK_FSYNC 96
#define TK_PRECISION 97 #define TK_COMP 97
#define TK_UPDATE 98 #define TK_PRECISION 98
#define TK_CACHELAST 99 #define TK_UPDATE 99
#define TK_LP 100 #define TK_CACHELAST 100
#define TK_RP 101 #define TK_LP 101
#define TK_UNSIGNED 102 #define TK_RP 102
#define TK_TAGS 103 #define TK_UNSIGNED 103
#define TK_USING 104 #define TK_TAGS 104
#define TK_AS 105 #define TK_USING 105
#define TK_COMMA 106 #define TK_AS 106
#define TK_NULL 107 #define TK_COMMA 107
#define TK_SELECT 108 #define TK_NULL 108
#define TK_UNION 109 #define TK_SELECT 109
#define TK_ALL 110 #define TK_UNION 110
#define TK_FROM 111 #define TK_ALL 111
#define TK_VARIABLE 112 #define TK_FROM 112
#define TK_INTERVAL 113 #define TK_VARIABLE 113
#define TK_FILL 114 #define TK_INTERVAL 114
#define TK_SLIDING 115 #define TK_FILL 115
#define TK_ORDER 116 #define TK_SLIDING 116
#define TK_BY 117 #define TK_ORDER 117
#define TK_ASC 118 #define TK_BY 118
#define TK_DESC 119 #define TK_ASC 119
#define TK_GROUP 120 #define TK_DESC 120
#define TK_HAVING 121 #define TK_GROUP 121
#define TK_LIMIT 122 #define TK_HAVING 122
#define TK_OFFSET 123 #define TK_LIMIT 123
#define TK_SLIMIT 124 #define TK_OFFSET 124
#define TK_SOFFSET 125 #define TK_SLIMIT 125
#define TK_WHERE 126 #define TK_SOFFSET 126
#define TK_NOW 127 #define TK_WHERE 127
#define TK_RESET 128 #define TK_NOW 128
#define TK_QUERY 129 #define TK_RESET 129
#define TK_ADD 130 #define TK_QUERY 130
#define TK_COLUMN 131 #define TK_ADD 131
#define TK_TAG 132 #define TK_COLUMN 132
#define TK_CHANGE 133 #define TK_TAG 133
#define TK_SET 134 #define TK_CHANGE 134
#define TK_KILL 135 #define TK_SET 135
#define TK_CONNECTION 136 #define TK_KILL 136
#define TK_STREAM 137 #define TK_CONNECTION 137
#define TK_COLON 138 #define TK_STREAM 138
#define TK_ABORT 139 #define TK_COLON 139
#define TK_AFTER 140 #define TK_ABORT 140
#define TK_ATTACH 141 #define TK_AFTER 141
#define TK_BEFORE 142 #define TK_ATTACH 142
#define TK_BEGIN 143 #define TK_BEFORE 143
#define TK_CASCADE 144 #define TK_BEGIN 144
#define TK_CLUSTER 145 #define TK_CASCADE 145
#define TK_CONFLICT 146 #define TK_CLUSTER 146
#define TK_COPY 147 #define TK_CONFLICT 147
#define TK_DEFERRED 148 #define TK_COPY 148
#define TK_DELIMITERS 149 #define TK_DEFERRED 149
#define TK_DETACH 150 #define TK_DELIMITERS 150
#define TK_EACH 151 #define TK_DETACH 151
#define TK_END 152 #define TK_EACH 152
#define TK_EXPLAIN 153 #define TK_END 153
#define TK_FAIL 154 #define TK_EXPLAIN 154
#define TK_FOR 155 #define TK_FAIL 155
#define TK_IGNORE 156 #define TK_FOR 156
#define TK_IMMEDIATE 157 #define TK_IGNORE 157
#define TK_INITIALLY 158 #define TK_IMMEDIATE 158
#define TK_INSTEAD 159 #define TK_INITIALLY 159
#define TK_MATCH 160 #define TK_INSTEAD 160
#define TK_KEY 161 #define TK_MATCH 161
#define TK_OF 162 #define TK_KEY 162
#define TK_RAISE 163 #define TK_OF 163
#define TK_REPLACE 164 #define TK_RAISE 164
#define TK_RESTRICT 165 #define TK_REPLACE 165
#define TK_ROW 166 #define TK_RESTRICT 166
#define TK_STATEMENT 167 #define TK_ROW 167
#define TK_TRIGGER 168 #define TK_STATEMENT 168
#define TK_VIEW 169 #define TK_TRIGGER 169
#define TK_COUNT 170 #define TK_VIEW 170
#define TK_SUM 171 #define TK_COUNT 171
#define TK_AVG 172 #define TK_SUM 172
#define TK_MIN 173 #define TK_AVG 173
#define TK_MAX 174 #define TK_MIN 174
#define TK_FIRST 175 #define TK_MAX 175
#define TK_LAST 176 #define TK_FIRST 176
#define TK_TOP 177 #define TK_LAST 177
#define TK_BOTTOM 178 #define TK_TOP 178
#define TK_STDDEV 179 #define TK_BOTTOM 179
#define TK_PERCENTILE 180 #define TK_STDDEV 180
#define TK_APERCENTILE 181 #define TK_PERCENTILE 181
#define TK_LEASTSQUARES 182 #define TK_APERCENTILE 182
#define TK_HISTOGRAM 183 #define TK_LEASTSQUARES 183
#define TK_DIFF 184 #define TK_HISTOGRAM 184
#define TK_SPREAD 185 #define TK_DIFF 185
#define TK_TWA 186 #define TK_SPREAD 186
#define TK_INTERP 187 #define TK_TWA 187
#define TK_LAST_ROW 188 #define TK_INTERP 188
#define TK_RATE 189 #define TK_LAST_ROW 189
#define TK_IRATE 190 #define TK_RATE 190
#define TK_SUM_RATE 191 #define TK_IRATE 191
#define TK_SUM_IRATE 192 #define TK_SUM_RATE 192
#define TK_AVG_RATE 193 #define TK_SUM_IRATE 193
#define TK_AVG_IRATE 194 #define TK_AVG_RATE 194
#define TK_TBID 195 #define TK_AVG_IRATE 195
#define TK_SEMI 196 #define TK_TBID 196
#define TK_NONE 197 #define TK_SEMI 197
#define TK_PREV 198 #define TK_NONE 198
#define TK_LINEAR 199 #define TK_PREV 199
#define TK_IMPORT 200 #define TK_LINEAR 200
#define TK_METRIC 201 #define TK_IMPORT 201
#define TK_TBNAME 202 #define TK_METRIC 202
#define TK_JOIN 203 #define TK_TBNAME 203
#define TK_METRICS 204 #define TK_JOIN 204
#define TK_STABLE 205 #define TK_METRICS 205
#define TK_INSERT 206 #define TK_INSERT 206
#define TK_INTO 207 #define TK_INTO 207
#define TK_VALUES 208 #define TK_VALUES 208
#define TK_SPACE 300 #define TK_SPACE 300
#define TK_COMMENT 301 #define TK_COMMENT 301
#define TK_ILLEGAL 302 #define TK_ILLEGAL 302
......
...@@ -98,6 +98,7 @@ typedef struct SCreateTableSQL { ...@@ -98,6 +98,7 @@ typedef struct SCreateTableSQL {
typedef struct SAlterTableSQL { typedef struct SAlterTableSQL {
SStrToken name; SStrToken name;
int16_t tableType;
int16_t type; int16_t type;
STagData tagData; STagData tagData;
SArray *pAddColumns; // SArray<TAOS_FIELD> SArray *pAddColumns; // SArray<TAOS_FIELD>
...@@ -156,6 +157,7 @@ typedef struct tDCLSQL { ...@@ -156,6 +157,7 @@ typedef struct tDCLSQL {
int32_t nAlloc; /* Number of entries allocated below */ int32_t nAlloc; /* Number of entries allocated below */
SStrToken *a; /* one entry for element */ SStrToken *a; /* one entry for element */
bool existsCheck; bool existsCheck;
int16_t tableType;
union { union {
SCreateDBInfo dbOpt; SCreateDBInfo dbOpt;
...@@ -250,7 +252,7 @@ SCreateTableSQL *tSetCreateSqlElems(SArray *pCols, SArray *pTags, SQuerySQL *pSe ...@@ -250,7 +252,7 @@ SCreateTableSQL *tSetCreateSqlElems(SArray *pCols, SArray *pTags, SQuerySQL *pSe
void tSqlExprNodeDestroy(tSQLExpr *pExpr); void tSqlExprNodeDestroy(tSQLExpr *pExpr);
SAlterTableSQL * tAlterTableSqlElems(SStrToken *pTableName, SArray *pCols, SArray *pVals, int32_t type); SAlterTableSQL * tAlterTableSqlElems(SStrToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableTable);
SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagVals, SStrToken *pToken, SStrToken* igExists); SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagVals, SStrToken *pToken, SStrToken* igExists);
void destroyAllSelectClause(SSubclauseInfo *pSql); void destroyAllSelectClause(SSubclauseInfo *pSql);
...@@ -267,7 +269,7 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken ...@@ -267,7 +269,7 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken
void SqlInfoDestroy(SSqlInfo *pInfo); void SqlInfoDestroy(SSqlInfo *pInfo);
void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParams, ...); void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParams, ...);
void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrToken* existsCheck); void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrToken* existsCheck,int16_t tableType);
void setShowOptions(SSqlInfo *pInfo, int32_t type, SStrToken* prefix, SStrToken* pPatterns); void setShowOptions(SSqlInfo *pInfo, int32_t type, SStrToken* prefix, SStrToken* pPatterns);
tDCLSQL *tTokenListAppend(tDCLSQL *pTokenList, SStrToken *pToken); tDCLSQL *tTokenListAppend(tDCLSQL *pTokenList, SStrToken *pToken);
......
...@@ -131,10 +131,16 @@ cmd ::= SHOW dbPrefix(X) VGROUPS ids(Y). { ...@@ -131,10 +131,16 @@ cmd ::= SHOW dbPrefix(X) VGROUPS ids(Y). {
//drop configure for tables //drop configure for tables
cmd ::= DROP TABLE ifexists(Y) ids(X) cpxName(Z). { cmd ::= DROP TABLE ifexists(Y) ids(X) cpxName(Z). {
X.n += Z.n; X.n += Z.n;
setDropDbTableInfo(pInfo, TSDB_SQL_DROP_TABLE, &X, &Y); setDropDbTableInfo(pInfo, TSDB_SQL_DROP_TABLE, &X, &Y, -1);
} }
cmd ::= DROP DATABASE ifexists(Y) ids(X). { setDropDbTableInfo(pInfo, TSDB_SQL_DROP_DB, &X, &Y); } //drop stable
cmd ::= DROP STABLE ifexists(Y) ids(X) cpxName(Z). {
X.n += Z.n;
setDropDbTableInfo(pInfo, TSDB_SQL_DROP_TABLE, &X, &Y, TSDB_SUPER_TABLE);
}
cmd ::= DROP DATABASE ifexists(Y) ids(X). { setDropDbTableInfo(pInfo, TSDB_SQL_DROP_DB, &X, &Y, -1); }
cmd ::= DROP DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_DNODE, 1, &X); } cmd ::= DROP DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_DNODE, 1, &X); }
cmd ::= DROP USER ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_USER, 1, &X); } cmd ::= DROP USER ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_USER, 1, &X); }
cmd ::= DROP ACCOUNT ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_ACCT, 1, &X); } cmd ::= DROP ACCOUNT ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_ACCT, 1, &X); }
...@@ -305,6 +311,8 @@ signed(A) ::= MINUS INTEGER(X). { A = -strtol(X.z, NULL, 10);} ...@@ -305,6 +311,8 @@ signed(A) ::= MINUS INTEGER(X). { A = -strtol(X.z, NULL, 10);}
////////////////////////////////// The CREATE TABLE statement /////////////////////////////// ////////////////////////////////// The CREATE TABLE statement ///////////////////////////////
cmd ::= CREATE TABLE create_table_args. {} cmd ::= CREATE TABLE create_table_args. {}
cmd ::= CREATE TABLE create_stable_args. {}
cmd ::= CREATE STABLE create_stable_args. {}
cmd ::= CREATE TABLE create_table_list(Z). { pInfo->type = TSDB_SQL_CREATE_TABLE; pInfo->pCreateTableInfo = Z;} cmd ::= CREATE TABLE create_table_list(Z). { pInfo->type = TSDB_SQL_CREATE_TABLE; pInfo->pCreateTableInfo = Z;}
%type create_table_list{SCreateTableSQL*} %type create_table_list{SCreateTableSQL*}
...@@ -333,7 +341,8 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. { ...@@ -333,7 +341,8 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
} }
// create super table // create super table
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. { %type create_stable_args{SCreateTableSQL*}
create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. {
A = tSetCreateSqlElems(X, Y, NULL, TSQL_CREATE_STABLE); A = tSetCreateSqlElems(X, Y, NULL, TSQL_CREATE_STABLE);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE); setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
...@@ -683,7 +692,7 @@ cmd ::= RESET QUERY CACHE. { setDCLSQLElems(pInfo, TSDB_SQL_RESET_CACHE, 0);} ...@@ -683,7 +692,7 @@ cmd ::= RESET QUERY CACHE. { setDCLSQLElems(pInfo, TSDB_SQL_RESET_CACHE, 0);}
///////////////////////////////////ALTER TABLE statement////////////////////////////////// ///////////////////////////////////ALTER TABLE statement//////////////////////////////////
cmd ::= ALTER TABLE ids(X) cpxName(F) ADD COLUMN columnlist(A). { cmd ::= ALTER TABLE ids(X) cpxName(F) ADD COLUMN columnlist(A). {
X.n += F.n; X.n += F.n;
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, A, NULL, TSDB_ALTER_TABLE_ADD_COLUMN); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, A, NULL, TSDB_ALTER_TABLE_ADD_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
...@@ -693,14 +702,14 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) DROP COLUMN ids(A). { ...@@ -693,14 +702,14 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
toTSDBType(A.type); toTSDBType(A.type);
SArray* K = tVariantListAppendToken(NULL, &A, -1); SArray* K = tVariantListAppendToken(NULL, &A, -1);
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, K, TSDB_ALTER_TABLE_DROP_COLUMN); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, K, TSDB_ALTER_TABLE_DROP_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
//////////////////////////////////ALTER TAGS statement///////////////////////////////////// //////////////////////////////////ALTER TAGS statement/////////////////////////////////////
cmd ::= ALTER TABLE ids(X) cpxName(Y) ADD TAG columnlist(A). { cmd ::= ALTER TABLE ids(X) cpxName(Y) ADD TAG columnlist(A). {
X.n += Y.n; X.n += Y.n;
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, A, NULL, TSDB_ALTER_TABLE_ADD_TAG_COLUMN); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, A, NULL, TSDB_ALTER_TABLE_ADD_TAG_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). { cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
...@@ -709,7 +718,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). { ...@@ -709,7 +718,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
toTSDBType(Y.type); toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1); SArray* A = tVariantListAppendToken(NULL, &Y, -1);
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
...@@ -722,7 +731,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). { ...@@ -722,7 +731,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
toTSDBType(Z.type); toTSDBType(Z.type);
A = tVariantListAppendToken(A, &Z, -1); A = tVariantListAppendToken(A, &Z, -1);
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
...@@ -733,7 +742,54 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). { ...@@ -733,7 +742,54 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
SArray* A = tVariantListAppendToken(NULL, &Y, -1); SArray* A = tVariantListAppendToken(NULL, &Y, -1);
A = tVariantListAppend(A, &Z, -1); A = tVariantListAppend(A, &Z, -1);
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
///////////////////////////////////ALTER STABLE statement//////////////////////////////////
cmd ::= ALTER STABLE ids(X) cpxName(F) ADD COLUMN columnlist(A). {
X.n += F.n;
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, A, NULL, TSDB_ALTER_TABLE_ADD_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
cmd ::= ALTER STABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
X.n += F.n;
toTSDBType(A.type);
SArray* K = tVariantListAppendToken(NULL, &A, -1);
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, K, TSDB_ALTER_TABLE_DROP_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
//////////////////////////////////ALTER TAGS statement/////////////////////////////////////
cmd ::= ALTER STABLE ids(X) cpxName(Y) ADD TAG columnlist(A). {
X.n += Y.n;
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, A, NULL, TSDB_ALTER_TABLE_ADD_TAG_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
cmd ::= ALTER STABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
X.n += Z.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1);
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
cmd ::= ALTER STABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
X.n += F.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1);
toTSDBType(Z.type);
A = tVariantListAppendToken(A, &Z, -1);
SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
......
...@@ -585,11 +585,12 @@ SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagVal ...@@ -585,11 +585,12 @@ SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagVal
return info; return info;
} }
SAlterTableSQL *tAlterTableSqlElems(SStrToken *pTableName, SArray *pCols, SArray *pVals, int32_t type) { SAlterTableSQL *tAlterTableSqlElems(SStrToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableType) {
SAlterTableSQL *pAlterTable = calloc(1, sizeof(SAlterTableSQL)); SAlterTableSQL *pAlterTable = calloc(1, sizeof(SAlterTableSQL));
pAlterTable->name = *pTableName; pAlterTable->name = *pTableName;
pAlterTable->type = type; pAlterTable->type = type;
pAlterTable->tableType = tableType;
if (type == TSDB_ALTER_TABLE_ADD_COLUMN || type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { if (type == TSDB_ALTER_TABLE_ADD_COLUMN || type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) {
pAlterTable->pAddColumns = pCols; pAlterTable->pAddColumns = pCols;
...@@ -733,9 +734,10 @@ void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParam, ...) { ...@@ -733,9 +734,10 @@ void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParam, ...) {
va_end(va); va_end(va);
} }
void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrToken* existsCheck) { void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrToken* existsCheck, int16_t tableType) {
pInfo->type = type; pInfo->type = type;
pInfo->pDCLInfo = tTokenListAppend(pInfo->pDCLInfo, pToken); pInfo->pDCLInfo = tTokenListAppend(pInfo->pDCLInfo, pToken);
pInfo->pDCLInfo->tableType = tableType;
pInfo->pDCLInfo->existsCheck = (existsCheck->n == 1); pInfo->pDCLInfo->existsCheck = (existsCheck->n == 1);
} }
......
此差异已折叠。
...@@ -15,6 +15,7 @@ print =============== step1 - login ...@@ -15,6 +15,7 @@ print =============== step1 - login
system_content curl 127.0.0.1:7111/rest/ system_content curl 127.0.0.1:7111/rest/
print 1-> $system_content print 1-> $system_content
if $system_content != @{"status":"error","code":4357,"desc":"no auth info input"}@ then if $system_content != @{"status":"error","code":4357,"desc":"no auth info input"}@ then
print $system_content
return -1 return -1
endi endi
......
...@@ -415,6 +415,7 @@ sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_m ...@@ -415,6 +415,7 @@ sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_m
$val = 100 $val = 100
if $rows != $val then if $rows != $val then
print $rows
return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册