未验证 提交 09cbf619 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1890 from taosdata/feature/subscribe

Feature/subscribe
...@@ -31,13 +31,13 @@ extern "C" { ...@@ -31,13 +31,13 @@ extern "C" {
#include "tscSecondaryMerge.h" #include "tscSecondaryMerge.h"
#include "tsclient.h" #include "tsclient.h"
#define UTIL_TABLE_IS_SUPERTABLE(metaInfo) \ #define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \ #define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NOMRAL_TABLE(metaInfo)\ #define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPERTABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo))) (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0) #define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
...@@ -265,6 +265,10 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); ...@@ -265,6 +265,10 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()); void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
char* strdup_throw(const char* str);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -438,6 +438,9 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); ...@@ -438,6 +438,9 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
int32_t tscCompareTidTags(const void* p1, const void* p2);
void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -471,7 +471,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -471,7 +471,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex); code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
pRes->code = code; pRes->code = code;
......
...@@ -122,7 +122,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { ...@@ -122,7 +122,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
int32_t numOfRows = tscGetNumOfColumns(pMeta); int32_t numOfRows = tscGetNumOfColumns(pMeta);
int32_t totalNumOfRows = numOfRows + tscGetNumOfTags(pMeta); int32_t totalNumOfRows = numOfRows + tscGetNumOfTags(pMeta);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
numOfRows = numOfRows + tscGetNumOfTags(pMeta); numOfRows = numOfRows + tscGetNumOfTags(pMeta);
} }
...@@ -161,7 +161,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { ...@@ -161,7 +161,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
} }
} }
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return 0; return 0;
} }
......
...@@ -796,7 +796,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -796,7 +796,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return code; return code;
} }
if (!UTIL_TABLE_IS_SUPERTABLE(pSTableMeterMetaInfo)) { if (!UTIL_TABLE_IS_SUPER_TABLE(pSTableMeterMetaInfo)) {
return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z); return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z);
} }
...@@ -1097,7 +1097,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { ...@@ -1097,7 +1097,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean; // TODO: should _clean or _error_clean to async flow ???? goto _error_clean; // TODO: should _clean or _error_clean to async flow ????
} }
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL); code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL);
goto _error_clean; goto _error_clean;
} }
......
...@@ -1348,7 +1348,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum ...@@ -1348,7 +1348,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMeta);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
numOfTotalColumns = tinfo.numOfColumns + tinfo.numOfTags; numOfTotalColumns = tinfo.numOfColumns + tinfo.numOfTags;
} else { } else {
numOfTotalColumns = tinfo.numOfColumns; numOfTotalColumns = tinfo.numOfColumns;
...@@ -1408,7 +1408,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI ...@@ -1408,7 +1408,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) && UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) && UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1); return invalidSqlErrMsg(pQueryInfo->msg, msg1);
} }
...@@ -1862,7 +1862,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr ...@@ -1862,7 +1862,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
case TK_TBID: { case TK_TBID: {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg7); return invalidSqlErrMsg(pQueryInfo->msg, msg7);
} }
...@@ -2279,7 +2279,7 @@ bool validateIpAddress(const char* ip, size_t size) { ...@@ -2279,7 +2279,7 @@ bool validateIpAddress(const char* ip, size_t size) {
int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
...@@ -2318,7 +2318,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -2318,7 +2318,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
/* transfer the field-info back to original input format */ /* transfer the field-info back to original input format */
void tscRestoreSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { void tscRestoreSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return; return;
} }
...@@ -2542,7 +2542,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* ...@@ -2542,7 +2542,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
} }
if (groupTag) { if (groupTag) {
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg9); return invalidSqlErrMsg(pQueryInfo->msg, msg9);
} }
...@@ -3254,7 +3254,7 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum ...@@ -3254,7 +3254,7 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum
} }
// table to table/ super table to super table are allowed // table to table/ super table to super table are allowed
if (UTIL_TABLE_IS_SUPERTABLE(pLeftMeterMeta) != UTIL_TABLE_IS_SUPERTABLE(pRightMeterMeta)) { if (UTIL_TABLE_IS_SUPER_TABLE(pLeftMeterMeta) != UTIL_TABLE_IS_SUPER_TABLE(pRightMeterMeta)) {
invalidSqlErrMsg(pQueryInfo->msg, msg5); invalidSqlErrMsg(pQueryInfo->msg, msg5);
return false; return false;
} }
...@@ -3337,7 +3337,7 @@ static int32_t handleExprInQueryCond(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, S ...@@ -3337,7 +3337,7 @@ static int32_t handleExprInQueryCond(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, S
} else if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) || } else if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) ||
index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { // query on tags index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { // query on tags
// check for tag query condition // check for tag query condition
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1); return invalidSqlErrMsg(pQueryInfo->msg, msg1);
} }
...@@ -3697,7 +3697,7 @@ static int32_t validateJoinExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) { ...@@ -3697,7 +3697,7 @@ static int32_t validateJoinExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
} }
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // for stable join, tag columns if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // for stable join, tag columns
// must be present for join // must be present for join
if (pCondExpr->pJoinExpr == NULL) { if (pCondExpr->pJoinExpr == NULL) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1); return invalidSqlErrMsg(pQueryInfo->msg, msg1);
...@@ -3735,7 +3735,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) { ...@@ -3735,7 +3735,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) {
static void doAddJoinTagsColumnsIntoTagList(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) { static void doAddJoinTagsColumnsIntoTagList(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SColumnIndex index = {0}; SColumnIndex index = {0};
getColumnIndexByName(&pCondExpr->pJoinExpr->pLeft->colInfo, pQueryInfo, &index); getColumnIndexByName(&pCondExpr->pJoinExpr->pLeft->colInfo, pQueryInfo, &index);
...@@ -4102,7 +4102,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { ...@@ -4102,7 +4102,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
} }
/* for super table query, set default ascending order for group output */ /* for super table query, set default ascending order for group output */
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC; pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
} }
} }
...@@ -4128,7 +4128,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema ...@@ -4128,7 +4128,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema
* *
* for super table query, the order option must be less than 3. * for super table query, the order option must be less than 3.
*/ */
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
if (pSortorder->nExpr > 1) { if (pSortorder->nExpr > 1) {
return invalidSqlErrMsg(pQueryInfo->msg, msg0); return invalidSqlErrMsg(pQueryInfo->msg, msg0);
} }
...@@ -4149,7 +4149,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema ...@@ -4149,7 +4149,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema
SSQLToken columnName = {pVar->nLen, pVar->nType, pVar->pz}; SSQLToken columnName = {pVar->nLen, pVar->nType, pVar->pz};
SColumnIndex index = {0}; SColumnIndex index = {0};
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // super table query if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query
if (getColumnIndexByName(&columnName, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { if (getColumnIndexByName(&columnName, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1); return invalidSqlErrMsg(pQueryInfo->msg, msg1);
} }
...@@ -4302,10 +4302,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4302,10 +4302,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN || if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN ||
pAlterSQL->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { pAlterSQL->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3); return invalidSqlErrMsg(pQueryInfo->msg, msg3);
} }
} else if ((pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) && (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo))) { } else if ((pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) && (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo))) {
return invalidSqlErrMsg(pQueryInfo->msg, msg4); return invalidSqlErrMsg(pQueryInfo->msg, msg4);
} else if ((pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_COLUMN) && } else if ((pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_COLUMN) &&
UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) {
...@@ -4691,7 +4691,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* ...@@ -4691,7 +4691,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
} }
// todo refactor // todo refactor
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query
if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) { if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) {
...@@ -5627,7 +5627,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -5627,7 +5627,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return code; return code;
} }
bool isSTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) { if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
...@@ -5771,7 +5771,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -5771,7 +5771,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr); assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr);
bool isSTable = false; bool isSTable = false;
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
isSTable = true; isSTable = true;
code = tscGetSTableVgroupInfo(pSql, index); code = tscGetSTableVgroupInfo(pSql, index);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -39,7 +39,7 @@ int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); ...@@ -39,7 +39,7 @@ int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void tscProcessActivityTimer(void *handle, void *tmrId); void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0}; int tscKeepConn[TSDB_SQL_MAX] = {0};
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid); TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts); void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub); void tscSaveSubscriptionProgress(void* sub);
...@@ -500,7 +500,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -500,7 +500,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// todo valid the vgroupId at the client side // todo valid the vgroupId at the client side
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int32_t vgIndex = pTableMetaInfo->vgroupIndex; int32_t vgIndex = pTableMetaInfo->vgroupIndex;
SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList; SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList;
...@@ -566,12 +566,13 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { ...@@ -566,12 +566,13 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) { static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) { if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
SCMVgroupInfo* pVgroupInfo = NULL; SCMVgroupInfo* pVgroupInfo = NULL;
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int32_t index = pTableMetaInfo->vgroupIndex; int32_t index = pTableMetaInfo->vgroupIndex;
assert(index >= 0); assert(index >= 0);
...@@ -580,25 +581,25 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -580,25 +581,25 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} else { } else {
pVgroupInfo = &pTableMeta->vgroupInfo; pVgroupInfo = &pTableMeta->vgroupInfo;
} }
tscSetDnodeIpList(pSql, pVgroupInfo); tscSetDnodeIpList(pSql, pVgroupInfo);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->sid); pTableIdInfo->tid = htonl(pTableMeta->sid);
pTableIdInfo->uid = htobe64(pTableMeta->uid); pTableIdInfo->uid = htobe64(pTableMeta->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, dfltKey));
pQueryMsg->numOfTables = htonl(1); // set the number of tables pQueryMsg->numOfTables = htonl(1); // set the number of tables
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} else { } else {
int32_t index = pTableMetaInfo->vgroupIndex; int32_t index = pTableMetaInfo->vgroupIndex;
int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(index >= 0 && index < numOfVgroups); assert(index >= 0 && index < numOfVgroups);
tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups); tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info // set the vgroup info
...@@ -615,7 +616,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -615,7 +616,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pItem->tid); pTableIdInfo->tid = htonl(pItem->tid);
pTableIdInfo->uid = htobe64(pItem->uid); pTableIdInfo->uid = htobe64(pItem->uid);
// pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey));
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} }
} }
...@@ -2283,7 +2284,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { ...@@ -2283,7 +2284,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true); taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
if (pTableMetaInfo->pTableMeta) { if (pTableMetaInfo->pTableMeta) {
bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
// taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true); // taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
...@@ -2345,6 +2346,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2345,6 +2346,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
for (int i = 0; i < numOfTables; i++) { for (int i = 0; i < numOfTables; i++) {
int64_t uid = htobe64(*(int64_t*)p); int64_t uid = htobe64(*(int64_t*)p);
p += sizeof(int64_t); p += sizeof(int64_t);
p += sizeof(int32_t); // skip tid
TSKEY key = htobe64(*(TSKEY*)p); TSKEY key = htobe64(*(TSKEY*)p);
p += sizeof(TSKEY); p += sizeof(TSKEY);
tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key); tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
......
...@@ -79,7 +79,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { ...@@ -79,7 +79,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == 0 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, 0); code = tscGetSTableVgroupInfo(pSql, 0);
pSql->res.code = code; pSql->res.code = code;
......
...@@ -44,8 +44,7 @@ typedef struct SSub { ...@@ -44,8 +44,7 @@ typedef struct SSub {
int interval; int interval;
TAOS_SUBSCRIBE_CALLBACK fp; TAOS_SUBSCRIBE_CALLBACK fp;
void * param; void * param;
int numOfTables; SArray* progress;
SSubscriptionProgress * progress;
} SSub; } SSub;
...@@ -57,92 +56,113 @@ static int tscCompareSubscriptionProgress(const void* a, const void* b) { ...@@ -57,92 +56,113 @@ static int tscCompareSubscriptionProgress(const void* a, const void* b) {
return 0; return 0;
} }
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) { TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt) {
if (sub == NULL) if (sub == NULL) {
return 0; return dflt;
SSub* pSub = (SSub*)sub;
for (int s = 0, e = pSub->numOfTables; s < e;) {
int m = (s + e) / 2;
SSubscriptionProgress* p = pSub->progress + m;
if (p->uid > uid)
e = m;
else if (p->uid < uid)
s = m + 1;
else
return p->key;
} }
SSub* pSub = (SSub*)sub;
return 0; SSubscriptionProgress target = {.uid = uid, .key = 0};
SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target);
if (p == NULL) {
return dflt;
}
return p->key;
} }
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
if( sub == NULL) if( sub == NULL)
return; return;
SSub* pSub = (SSub*)sub; SSub* pSub = (SSub*)sub;
for (int s = 0, e = pSub->numOfTables; s < e;) {
int m = (s + e) / 2; SSubscriptionProgress target = {.uid = uid, .key = ts};
SSubscriptionProgress* p = pSub->progress + m; SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target);
if (p->uid > uid) if (p != NULL) {
e = m; p->key = ts;
else if (p->uid < uid)
s = m + 1;
else {
if (ts >= p->key) p->key = ts;
break;
}
} }
} }
static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param);
pSql->res.code = code;
sem_post(&pSql->rspSem);
}
static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) { static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) {
SSub* pSub = calloc(1, sizeof(SSub)); SSub* pSub = NULL;
if (pSub == NULL) {
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("failed to allocate memory for subscription");
return NULL;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); TRY( 8 ) {
if (pSql == NULL) { SSqlObj* pSql = calloc_throw(1, sizeof(SSqlObj));
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; CLEANUP_PUSH_FREE(true, pSql);
tscError("failed to allocate SSqlObj for subscription"); SSqlCmd *pCmd = &pSql->cmd;
goto _pSql_failed; SSqlRes *pRes = &pSql->res;
}
if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
THROW(TAOS_SYSTEM_ERROR(errno));
}
CLEANUP_PUSH_INT_PTR(true, tsem_destroy, &pSql->rspSem);
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->param = pSql;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = asyncCallback;
char* sqlstr = (char*)malloc(strlen(sql) + 1); int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (sqlstr == NULL) { if (code != TSDB_CODE_SUCCESS) {
tscError("failed to allocate sql string for subscription"); THROW(code);
goto failed; }
} CLEANUP_PUSH_FREE(true, pCmd->payload);
strcpy(sqlstr, sql);
strtolower(sqlstr, sqlstr);
pSql->sqlstr = sqlstr;
tsem_init(&pSql->rspSem, 0, 0); pRes->qhandle = 0;
pRes->numOfRows = 1;
SSqlRes *pRes = &pSql->res; pSql->sqlstr = strdup_throw(sql);
pRes->numOfRows = 1; CLEANUP_PUSH_FREE(true, pSql->sqlstr);
pRes->numOfTotal = 0; strtolower(pSql->sqlstr, pSql->sqlstr);
pSql->pSubscription = pSub; code = tsParseSql(pSql, false);
pSub->pSql = pSql; if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
pSub->signature = pSub; // wait for the callback function to post the semaphore
strncpy(pSub->topic, topic, sizeof(pSub->topic)); sem_wait(&pSql->rspSem);
pSub->topic[sizeof(pSub->topic) - 1] = 0; code = pSql->res.code;
return pSub; }
if (code != TSDB_CODE_SUCCESS) {
tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code));
THROW( code );
}
if (pSql->cmd.command != TSDB_SQL_SELECT) {
tscError("only 'select' statement is allowed in subscription: %s", pSub->topic);
THROW( -1 ); // TODO
}
pSub = calloc_throw(1, sizeof(SSub));
CLEANUP_PUSH_FREE(true, pSub);
pSql->pSubscription = pSub;
pSub->pSql = pSql;
pSub->signature = pSub;
strncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->topic[sizeof(pSub->topic) - 1] = 0;
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (pSub->progress == NULL) {
THROW(TSDB_CODE_CLI_OUT_OF_MEMORY);
}
CLEANUP_EXECUTE();
failed: } CATCH( code ) {
tfree(sqlstr); tscError("failed to create subscription object: %s", tstrerror(code));
CLEANUP_EXECUTE();
pSub = NULL;
_pSql_failed: } END_TRY
tfree(pSql);
tfree(pSub); return pSub;
return NULL;
} }
...@@ -159,60 +179,68 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) { ...@@ -159,60 +179,68 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) {
} }
int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { static SArray* getTableList( SSqlObj* pSql ) {
int code = (uint8_t)tsParseSql(pSub->pSql, false); const char* p = strstr( pSql->sqlstr, " from " );
char* sql = alloca(strlen(p) + 32);
sprintf(sql, "select tbid(tbname)%s", p);
int code = taos_query( pSql->pTscObj, sql );
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("failed to parse sql statement: %s", pSub->topic); tscError("failed to retrieve table id: %s", tstrerror(code));
return 0; return NULL;
} }
SSqlCmd* pCmd = &pSub->pSql->cmd; TAOS_RES* res = taos_use_result( pSql->pTscObj );
if (pCmd->command != TSDB_SQL_SELECT) { TAOS_ROW row;
tscError("only 'select' statement is allowed in subscription: %s", pSub->topic); SArray* result = taosArrayInit( 128, sizeof(STidTags) );
return 0; while ((row = taos_fetch_row(res))) {
STidTags tags;
memcpy(&tags, row[0], sizeof(tags));
taosArrayPush(result, &tags);
} }
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0); return result;
int numOfTables = 0; }
if (!UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
// numOfTables += pVnodeSidList->numOfSids;
// }
}
SSubscriptionProgress* progress = (SSubscriptionProgress*)calloc(numOfTables, sizeof(SSubscriptionProgress));
if (progress == NULL) { static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
tscError("failed to allocate memory for progress: %s", pSub->topic); SSqlObj* pSql = pSub->pSql;
return 0;
SSqlCmd* pCmd = &pSql->cmd;
pSub->lastSyncTime = taosGetTimestampMs();
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSubscriptionProgress target = {.uid = pTableMeta->uid, .key = 0};
SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target);
if (p == NULL) {
taosArrayClear(pSub->progress);
taosArrayPush(pSub->progress, &target);
}
return 1;
} }
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { SArray* tables = getTableList(pSql);
numOfTables = 1; size_t numOfTables = taosArrayGetSize(tables);
int64_t uid = pTableMetaInfo->pTableMeta->uid;
progress[0].uid = uid; SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress));
progress[0].key = tscGetSubscriptionProgress(pSub, uid); for( size_t i = 0; i < numOfTables; i++ ) {
} else { STidTags* tt = taosArrayGet( tables, i );
// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta; SSubscriptionProgress p = { .uid = tt->uid };
// numOfTables = 0; p.key = tscGetSubscriptionProgress(pSub, tt->uid, INT64_MIN);
// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { taosArrayPush(progress, &p);
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
// for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
// STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
// int64_t uid = pTableMetaInfo->uid;
// progress[numOfTables].uid = uid;
// progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
// }
// }
qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress);
} }
taosArraySort(progress, tscCompareSubscriptionProgress);
free(pSub->progress); taosArrayDestroy(pSub->progress);
pSub->numOfTables = numOfTables;
pSub->progress = progress; pSub->progress = progress;
pSub->lastSyncTime = taosGetTimestampMs(); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
taosArraySort( tables, tscCompareTidTags );
tscBuildVgroupTableInfo( pTableMetaInfo, tables );
}
taosArrayDestroy(tables);
return 1; return 1;
} }
...@@ -248,32 +276,22 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { ...@@ -248,32 +276,22 @@ static int tscLoadSubscriptionProgress(SSub* pSub) {
return 0; return 0;
} }
if (fgets(buf, sizeof(buf), fp) == NULL || atoi(buf) < 0) { SArray* progress = pSub->progress;
tscTrace("invalid subscription progress file: %s", pSub->topic); taosArrayClear(progress);
fclose(fp); while( 1 ) {
return 0;
}
int numOfTables = atoi(buf);
SSubscriptionProgress* progress = calloc(numOfTables, sizeof(SSubscriptionProgress));
for (int i = 0; i < numOfTables; i++) {
if (fgets(buf, sizeof(buf), fp) == NULL) { if (fgets(buf, sizeof(buf), fp) == NULL) {
fclose(fp); fclose(fp);
free(progress);
return 0; return 0;
} }
int64_t uid, key; SSubscriptionProgress p;
sscanf(buf, "%" SCNd64 ":%" SCNd64, &uid, &key); sscanf(buf, "%" SCNd64 ":%" SCNd64, &p.uid, &p.key);
progress[i].uid = uid; taosArrayPush(progress, &p);
progress[i].key = key;
} }
fclose(fp); fclose(fp);
qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); taosArraySort(progress, tscCompareSubscriptionProgress);
pSub->numOfTables = numOfTables; tscTrace("subscription progress loaded, %d tables: %s", taosArrayGetSize(progress), pSub->topic);
pSub->progress = progress;
tscTrace("subscription progress loaded, %d tables: %s", numOfTables, pSub->topic);
return 1; return 1;
} }
...@@ -294,11 +312,10 @@ void tscSaveSubscriptionProgress(void* sub) { ...@@ -294,11 +312,10 @@ void tscSaveSubscriptionProgress(void* sub) {
} }
fputs(pSub->pSql->sqlstr, fp); fputs(pSub->pSql->sqlstr, fp);
fprintf(fp, "\n%d\n", pSub->numOfTables); fprintf(fp, "\n");
for (int i = 0; i < pSub->numOfTables; i++) { for(size_t i = 0; i < taosArrayGetSize(pSub->progress); i++) {
int64_t uid = pSub->progress[i].uid; SSubscriptionProgress* p = taosArrayGet(pSub->progress, i);
TSKEY key = pSub->progress[i].key; fprintf(fp, "%" PRId64 ":%" PRId64 "\n", p->uid, p->key);
fprintf(fp, "%" PRId64 ":%" PRId64 "\n", uid, key);
} }
fclose(fp); fclose(fp);
...@@ -363,35 +380,34 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -363,35 +380,34 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
tscRemoveFromSqlList(pSql); tscRemoveFromSqlList(pSql);
if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) { if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) {
tscTrace("begin meter synchronization"); tscTrace("begin table synchronization");
char* sqlstr = pSql->sqlstr;
pSql->sqlstr = NULL;
taos_free_result_imp(pSql, 0);
pSql->sqlstr = sqlstr;
taosCacheEmpty(tscCacheHandle);
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
tscTrace("meter synchronization completed"); tscTrace("table synchronization completed");
} else {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
uint32_t type = pQueryInfo->type;
taos_free_result_imp(pSql, 1);
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->qhandle = 0;
pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type;
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0;
} }
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
uint32_t type = pQueryInfo->type;
tscFreeSqlResult(pSql);
pRes->numOfRows = 1;
pRes->qhandle = 0;
pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type;
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0;
pSql->fp = asyncCallback;
pSql->param = pSql;
tscDoQuery(pSql); tscDoQuery(pSql);
if (pRes->code != TSDB_CODE_NOT_ACTIVE_TABLE) { sem_wait(&pSql->rspSem);
break;
if (pRes->code != TSDB_CODE_SUCCESS) {
continue;
} }
// meter was removed, make sync time zero, so that next retry will // meter was removed, make sync time zero, so that next retry will
// do synchronization first // do synchronization first
pSub->lastSyncTime = 0; pSub->lastSyncTime = 0;
break;
} }
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
...@@ -421,7 +437,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { ...@@ -421,7 +437,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
} }
tscFreeSqlObj(pSub->pSql); tscFreeSqlObj(pSub->pSql);
free(pSub->progress); taosArrayDestroy(pSub->progress);
memset(pSub, 0, sizeof(*pSub)); memset(pSub, 0, sizeof(*pSub));
free(pSub); free(pSub);
} }
...@@ -330,7 +330,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { ...@@ -330,7 +330,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pNewQueryInfo->limit = pSupporter->limit; pNewQueryInfo->limit = pSupporter->limit;
// fetch the join tag column // fetch the join tag column
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, 0); SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, 0);
assert(pQueryInfo->tagCond.joinInfo.hasJoin); assert(pQueryInfo->tagCond.joinInfo.hasJoin);
...@@ -463,77 +463,51 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* ...@@ -463,77 +463,51 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
} }
} }
int32_t tagsOrderCompar(const void* p1, const void* p2) { int32_t tscCompareTidTags(const void* p1, const void* p2) {
STidTags* t1 = (STidTags*) p1; const STidTags* t1 = (const STidTags*) p1;
STidTags* t2 = (STidTags*) p2; const STidTags* t2 = (const STidTags*) p2;
if (t1->vgId != t2->vgId) { if (t1->vgId != t2->vgId) {
return (t1->vgId > t2->vgId)? 1:-1; return (t1->vgId > t2->vgId) ? 1 : -1;
} else { }
if (t1->tid != t2->tid) { if (t1->tid != t2->tid) {
return (t1->tid > t2->tid)? 1:-1; return (t1->tid > t2->tid) ? 1 : -1;
} else {
return 0;
}
} }
return 0;
} }
static void doBuildVgroupTableInfo(SArray* res, STableMetaInfo* pTableMetaInfo) { void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables) {
SArray* pGroup = taosArrayInit(4, sizeof(SVgroupTableInfo)); SArray* result = taosArrayInit( 4, sizeof(SVgroupTableInfo) );
SArray* vgTables = NULL;
SArray* vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo)); STidTags* prev = NULL;
int32_t size = taosArrayGetSize(res);
size_t numOfTables = taosArrayGetSize( tables );
STidTags* prev = taosArrayGet(res, 0); for( size_t i = 0; i < numOfTables; i++ ) {
int32_t prevVgId = prev->vgId; STidTags* tt = taosArrayGet( tables, i );
STableIdInfo item = {.uid = prev->uid, .tid = prev->tid, .key = INT64_MIN}; if( prev == NULL || tt->vgId != prev->vgId ) {
taosArrayPush(vgTableIdItem, &item);
for(int32_t k = 1; k < size; ++k) {
STidTags* t1 = taosArrayGet(res, k);
if (prevVgId != t1->vgId) {
SVgroupTableInfo info = {0};
SVgroupsInfo* pvg = pTableMetaInfo->vgroupList; SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;
for(int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (prevVgId == pvg->vgroups[m].vgId) { SVgroupTableInfo info = { 0 };
for( int32_t m = 0; m < pvg->numOfVgroups; ++m ) {
if( tt->vgId == pvg->vgroups[m].vgId ) {
info.vgInfo = pvg->vgroups[m]; info.vgInfo = pvg->vgroups[m];
break; break;
} }
} }
assert( info.vgInfo.numOfIps != 0 );
assert(info.vgInfo.numOfIps != 0);
info.itemList = vgTableIdItem; vgTables = taosArrayInit( 4, sizeof(STableIdInfo) );
taosArrayPush(pGroup, &info); info.itemList = vgTables;
taosArrayPush( result, &info );
vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo));
STableIdInfo item1 = {.uid = t1->uid, .tid = t1->tid, .key = INT64_MIN};
taosArrayPush(vgTableIdItem, &item1);
prevVgId = t1->vgId;
} else {
taosArrayPush(vgTableIdItem, &item);
}
}
if (taosArrayGetSize(vgTableIdItem) > 0) {
SVgroupTableInfo info = {0};
SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;
for(int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (prevVgId == pvg->vgroups[m].vgId) {
info.vgInfo = pvg->vgroups[m];
break;
}
} }
assert(info.vgInfo.numOfIps != 0); STableIdInfo item = { .uid = tt->uid, .tid = tt->tid, .key = INT64_MIN };
info.itemList = vgTableIdItem; taosArrayPush( vgTables, &item );
taosArrayPush(pGroup, &info); prev = tt;
} }
pTableMetaInfo->pVgroupTables = pGroup; pTableMetaInfo->pVgroupTables = result;
} }
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) { static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
...@@ -627,8 +601,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -627,8 +601,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
qsort(p1->pIdTagList, p1->num, p1->tagSize, tagsOrderCompar); qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags);
qsort(p2->pIdTagList, p2->num, p2->tagSize, tagsOrderCompar); qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
...@@ -668,11 +642,11 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -668,11 +642,11 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0); STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
doBuildVgroupTableInfo(s1, pTableMetaInfo1); tscBuildVgroupTableInfo(pTableMetaInfo1, s1);
SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
doBuildVgroupTableInfo(s2, pTableMetaInfo2); tscBuildVgroupTableInfo(pTableMetaInfo2, s2);
pSupporter->pState->numOfCompleted = 0; pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->code = 0; pSupporter->pState->code = 0;
...@@ -1096,7 +1070,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1096,7 +1070,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
tscInitQueryInfo(pNewQueryInfo); tscInitQueryInfo(pNewQueryInfo);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // return the tableId & tag if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag
SSchema s = {0}; SSchema s = {0};
SColumnIndex index = {0}; SColumnIndex index = {0};
......
...@@ -157,7 +157,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -157,7 +157,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
} }
// for select query super table, the super table vgroup list can not be null in any cases. // for select query super table, the super table vgroup list can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->vgroupList != NULL); assert(pTableMetaInfo->vgroupList != NULL);
} }
...@@ -172,7 +172,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -172,7 +172,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) != TSDB_QUERY_TYPE_STABLE_SUBQUERY) && if (((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) != TSDB_QUERY_TYPE_STABLE_SUBQUERY) &&
pQueryInfo->command == TSDB_SQL_SELECT) { pQueryInfo->command == TSDB_SQL_SELECT) {
return UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); return UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
} }
return false; return false;
...@@ -187,7 +187,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -187,7 +187,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
* 4. show queries, instead of a select query * 4. show queries, instead of a select query
*/ */
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
if (pTableMetaInfo == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || if (pTableMetaInfo == NULL || !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) ||
pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || numOfExprs == 0) { pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || numOfExprs == 0) {
return false; return false;
} }
...@@ -1350,7 +1350,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) { ...@@ -1350,7 +1350,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
return false; return false;
} }
if (colId == -1 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (colId == -1 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return true; return true;
} }
...@@ -1461,10 +1461,11 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) { ...@@ -1461,10 +1461,11 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
} }
/* /*
* the following three kinds of SqlObj should not be freed * the following four kinds of SqlObj should not be freed
* 1. SqlObj for stream computing * 1. SqlObj for stream computing
* 2. main SqlObj * 2. main SqlObj
* 3. heartbeat SqlObj * 3. heartbeat SqlObj
* 4. SqlObj for subscription
* *
* If res code is error and SqlObj does not belong to above types, it should be * If res code is error and SqlObj does not belong to above types, it should be
* automatically freed for async query, ignoring that connection should be kept. * automatically freed for async query, ignoring that connection should be kept.
...@@ -1477,7 +1478,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) { ...@@ -1477,7 +1478,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
} }
STscObj* pTscObj = pSql->pTscObj; STscObj* pTscObj = pSql->pTscObj;
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql) { if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql || pSql->pSubscription != NULL) {
return false; return false;
} }
...@@ -1874,7 +1875,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1874,7 +1875,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
assert(pNewQueryInfo->numOfTables == 1); assert(pNewQueryInfo->numOfTables == 1);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pFinalInfo->vgroupList != NULL); assert(pFinalInfo->vgroupList != NULL);
} }
...@@ -2009,7 +2010,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { ...@@ -2009,7 +2010,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); // SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); // STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) { // if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
return false; return false;
// } // }
...@@ -2169,3 +2170,26 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column ...@@ -2169,3 +2170,26 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column
} }
} }
void* malloc_throw(size_t size) {
void* p = malloc(size);
if (p == NULL) {
THROW(TSDB_CODE_CLI_OUT_OF_MEMORY);
}
return p;
}
void* calloc_throw(size_t nmemb, size_t size) {
void* p = calloc(nmemb, size);
if (p == NULL) {
THROW(TSDB_CODE_CLI_OUT_OF_MEMORY);
}
return p;
}
char* strdup_throw(const char* str) {
char* p = strdup(str);
if (p == NULL) {
THROW(TSDB_CODE_CLI_OUT_OF_MEMORY);
}
return p;
}
...@@ -182,6 +182,7 @@ typedef struct SQInfo { ...@@ -182,6 +182,7 @@ typedef struct SQInfo {
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
int32_t groupIndex; int32_t groupIndex;
int32_t offset; // offset in group result set of subgroup, todo refactor int32_t offset; // offset in group result set of subgroup, todo refactor
SArray* arrTableIdInfo;
T_REF_DECLARE() T_REF_DECLARE()
/* /*
......
...@@ -113,7 +113,6 @@ static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); ...@@ -113,7 +113,6 @@ static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols); static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery); static bool hasMainOutput(SQuery *pQuery);
static void createTableQueryInfo(SQInfo *pQInfo);
static void buildTagQueryResult(SQInfo *pQInfo); static void buildTagQueryResult(SQInfo *pQInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTableId, STableQueryInfo *pTableQueryInfo); static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTableId, STableQueryInfo *pTableQueryInfo);
...@@ -3463,7 +3462,11 @@ static bool hasMainOutput(SQuery *pQuery) { ...@@ -3463,7 +3462,11 @@ static bool hasMainOutput(SQuery *pQuery) {
return false; return false;
} }
STableQueryInfo *createTableQueryInfoImpl(SQueryRuntimeEnv *pRuntimeEnv, STableId tableId, STimeWindow win) { static STableQueryInfo *createTableQueryInfo(
SQueryRuntimeEnv *pRuntimeEnv,
STableId tableId,
STimeWindow win
) {
STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
pTableQueryInfo->win = win; pTableQueryInfo->win = win;
...@@ -3870,7 +3873,18 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -3870,7 +3873,18 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
data += bytes * numOfRows; data += bytes * numOfRows;
} }
int32_t numOfTables = (int32_t)taosArrayGetSize(pQInfo->arrTableIdInfo);
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
for(int32_t i = 0; i < numOfTables; i++) {
STableIdInfo* pSrc = taosArrayGet(pQInfo->arrTableIdInfo, i);
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(pSrc->uid);
pDst->tid = htonl(pSrc->tid);
pDst->key = htobe64(pSrc->key);
data += sizeof(STableIdInfo);
}
// all data returned, set query over // all data returned, set query over
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
if (pQInfo->runtimeEnv.stableQuery && isIntervalQuery(pQuery)) { if (pQInfo->runtimeEnv.stableQuery && isIntervalQuery(pQuery)) {
...@@ -4149,6 +4163,42 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4149,6 +4163,42 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
return true; return true;
} }
static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (onlyQueryTags(pQuery)) {
return;
}
if (isSTableQuery && (!isIntervalQuery(pQuery)) && (!isFixedOutputQuery(pQuery))) {
return;
}
STsdbQueryCond cond = {
.twindow = pQuery->window,
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
if (!isSTableQuery
&& (pQInfo->groupInfo.numOfTables == 1)
&& (cond.order == TSDB_ORDER_ASC)
&& (!isIntervalQuery(pQuery))
&& (!isGroupbyNormalCol(pQuery->pGroupbyExpr))
&& (!isFixedOutputQuery(pQuery))
) {
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
SGroupItem* pItem = taosArrayGet(pa, 0);
cond.twindow = pItem->info->win;
}
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
}
int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) { int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
...@@ -4157,26 +4207,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool ...@@ -4157,26 +4207,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
setScanLimitationByResultBuffer(pQuery); setScanLimitationByResultBuffer(pQuery);
changeExecuteScanOrder(pQuery, false); changeExecuteScanOrder(pQuery, false);
setupQueryHandle(tsdb, pQInfo, isSTableQuery);
STsdbQueryCond cond = {
.twindow = pQuery->window,
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
// normal query setup the queryhandle here
if (!onlyQueryTags(pQuery)) {
if (!isSTableQuery && isFirstLastRowQuery(pQuery)) { // in case of last_row query, invoke a different API.
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo);
} else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
}
// create the table query support structures
createTableQueryInfo(pQInfo);
}
pQInfo->tsdb = tsdb; pQInfo->tsdb = tsdb;
pQInfo->vgId = vgId; pQInfo->vgId = vgId;
...@@ -4595,6 +4626,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4595,6 +4626,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* to ensure that, we can reset the query range once query on a meter is completed. * to ensure that, we can reset the query range once query on a meter is completed.
*/ */
pQInfo->tableIndex++; pQInfo->tableIndex++;
STableIdInfo tidInfo;
tidInfo.uid = item->id.uid;
tidInfo.tid = item->id.tid;
tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
// if the buffer is full or group by each table, we need to jump out of the loop // if the buffer is full or group by each table, we need to jump out of the loop
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*|| if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*||
isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) { isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) {
...@@ -4664,35 +4702,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4664,35 +4702,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQuery->limit.offset); pQuery->limit.offset);
} }
static void createTableQueryInfo(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid
size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
int32_t index = 0;
for (int32_t i = 0; i < numOfGroups; ++i) { // load all meter meta info
SArray *group = *(SArray **)taosArrayGet(pQInfo->groupInfo.pGroupList, i);
size_t s = taosArrayGetSize(group);
for (int32_t j = 0; j < s; ++j) {
SGroupItem* item = (SGroupItem *)taosArrayGet(group, j);
// STableQueryInfo has been created for each table
if (item->info != NULL) {
return;
}
STableQueryInfo* pInfo = createTableQueryInfoImpl(&pQInfo->runtimeEnv, item->id, pQuery->window);
pInfo->groupIdx = i;
pInfo->tableIndex = index;
item->info = pInfo;
index += 1;
}
}
}
static void doSaveContext(SQInfo *pQInfo) { static void doSaveContext(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
...@@ -4914,6 +4923,12 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -4914,6 +4923,12 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo,
pQuery->current->lastKey, pQuery->window.ekey); pQuery->current->lastKey, pQuery->window.ekey);
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
STableIdInfo tidInfo;
tidInfo.uid = pQuery->current->id.uid;
tidInfo.tid = pQuery->current->id.tid;
tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
} }
if (!isTSCompQuery(pQuery)) { if (!isTSCompQuery(pQuery)) {
...@@ -5197,20 +5212,10 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx ...@@ -5197,20 +5212,10 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx
static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **pTableIdList) { static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **pTableIdList) {
assert(pQueryMsg->numOfTables > 0); assert(pQueryMsg->numOfTables > 0);
*pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableId)); *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo));
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableIdInfo->tid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key);
STableId id = {.uid = pTableIdInfo->uid, .tid = pTableIdInfo->tid};
taosArrayPush(*pTableIdList, &id);
pMsg += sizeof(STableIdInfo);
for (int32_t j = 1; j < pQueryMsg->numOfTables; ++j) { for (int32_t j = 0; j < pQueryMsg->numOfTables; ++j) {
pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo* pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableIdInfo->tid); pTableIdInfo->tid = htonl(pTableIdInfo->tid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
...@@ -5661,7 +5666,16 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { ...@@ -5661,7 +5666,16 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
} }
} }
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
static int compareTableIdInfo( const void* a, const void* b ) {
const STableIdInfo* x = (const STableIdInfo*)a;
const STableIdInfo* y = (const STableIdInfo*)b;
if (x->uid > y->uid) return 1;
if (x->uid < y->uid) return -1;
return 0;
}
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
STableGroupInfo *groupInfo, SColumnInfo* pTagCols) { STableGroupInfo *groupInfo, SColumnInfo* pTagCols) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) { if (pQInfo == NULL) {
...@@ -5754,6 +5768,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5754,6 +5768,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
pQInfo->groupInfo.numOfTables = groupInfo->numOfTables; pQInfo->groupInfo.numOfTables = groupInfo->numOfTables;
int tableIndex = 0;
STimeWindow window = pQueryMsg->window;
taosArraySort( pTableIdList, compareTableIdInfo );
for(int32_t i = 0; i < numOfGroups; ++i) { for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(groupInfo->pGroupList, i); SArray* pa = taosArrayGetP(groupInfo->pGroupList, i);
size_t s = taosArrayGetSize(pa); size_t s = taosArrayGetSize(pa);
...@@ -5761,13 +5778,26 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5761,13 +5778,26 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
SArray* p1 = taosArrayInit(s, sizeof(SGroupItem)); SArray* p1 = taosArrayInit(s, sizeof(SGroupItem));
for(int32_t j = 0; j < s; ++j) { for(int32_t j = 0; j < s; ++j) {
SGroupItem item = { .id = *(STableId*) taosArrayGet(pa, j), .info = NULL, }; STableId id = *(STableId*) taosArrayGet(pa, j);
SGroupItem item = { .id = id };
// NOTE: compare STableIdInfo with STableId
// not a problem at present because we only use their 1st int64_t field
STableIdInfo* pTableId = taosArraySearch( pTableIdList, compareTableIdInfo, &id );
if (pTableId != NULL ) {
window.skey = pTableId->key;
} else {
window.skey = pQueryMsg->window.skey;
}
item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window);
item.info->groupIdx = i;
item.info->tableIndex = tableIndex++;
taosArrayPush(p1, &item); taosArrayPush(p1, &item);
} }
taosArrayPush(pQInfo->groupInfo.pGroupList, &p1); taosArrayPush(pQInfo->groupInfo.pGroupList, &p1);
} }
pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo));
pQuery->pos = -1; pQuery->pos = -1;
pQuery->window = pQueryMsg->window; pQuery->window = pQueryMsg->window;
...@@ -5919,6 +5949,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5919,6 +5949,7 @@ static void freeQInfo(SQInfo *pQInfo) {
} }
taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList); taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList);
taosArrayDestroy(pQInfo->arrTableIdInfo);
if (pQuery->pGroupbyExpr != NULL) { if (pQuery->pGroupbyExpr != NULL) {
taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo); taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo);
...@@ -6046,32 +6077,48 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi ...@@ -6046,32 +6077,48 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) { if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) {
isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY); isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
STableId *id = taosArrayGet(pTableIdList, 0); STableIdInfo *id = taosArrayGet(pTableIdList, 0);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
goto _over; goto _over;
} }
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) { } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true; isSTableQuery = true;
STableId *id = taosArrayGet(pTableIdList, 0); // TODO: need a macro from TSDB to check if table is super table,
// also note there's possiblity that only one table in the super table
// group by normal column, do not pass the group by condition to tsdb to group table into different group if (taosArrayGetSize(pTableIdList) == 1) {
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; STableIdInfo *id = taosArrayGet(pTableIdList, 0);
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) { // if array size is 1 and assert super table
numOfGroupByCols = 0;
} // group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
// todo handle the error if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) {
/*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, numOfGroupByCols = 0;
numOfGroupByCols); }
if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
code = TSDB_CODE_SUCCESS; // todo handle the error
goto _over; /*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex,
numOfGroupByCols);
if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
code = TSDB_CODE_SUCCESS;
goto _over;
}
} else {
groupInfo.numOfTables = taosArrayGetSize(pTableIdList);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
SArray* sa = taosArrayInit(groupInfo.numOfTables, sizeof(STableId));
for(int32_t i = 0; i < groupInfo.numOfTables; ++i) {
STableIdInfo* tableId = taosArrayGet(pTableIdList, i);
taosArrayPush(sa, tableId);
}
taosArrayPush(pTableGroup, &sa);
groupInfo.pGroupList = pTableGroup;
} }
} else { } else {
assert(0); assert(0);
} }
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo); (*pQInfo) = createQInfoImpl(pQueryMsg, pTableIdList, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo);
if ((*pQInfo) == NULL) { if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY; code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _over; goto _over;
...@@ -6169,6 +6216,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6169,6 +6216,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t size = getResultSize(pQInfo, &pQuery->rec.rows); size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosArrayGetSize(pQInfo->arrTableIdInfo);
*contLen = size + sizeof(SRetrieveTableRsp); *contLen = size + sizeof(SRetrieveTableRsp);
// todo handle failed to allocate memory // todo handle failed to allocate memory
......
...@@ -1523,7 +1523,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC ...@@ -1523,7 +1523,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC
} }
if (pTable->type != TSDB_SUPER_TABLE) { if (pTable->type != TSDB_SUPER_TABLE) {
uError("%p query normal tag not allowed, uid:%, tid:%d, name:%s" PRIu64, uError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s",
tsdb, uid, pTable->tableId.tid, pTable->name); tsdb, uid, pTable->tableId.tid, pTable->name);
return TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client return TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
......
...@@ -73,6 +73,7 @@ void cleanupPush_void_ptr_bool ( bool failOnly, void* func, void* arg1, bool ar ...@@ -73,6 +73,7 @@ void cleanupPush_void_ptr_bool ( bool failOnly, void* func, void* arg1, bool ar
void cleanupPush_void_ptr ( bool failOnly, void* func, void* arg ); void cleanupPush_void_ptr ( bool failOnly, void* func, void* arg );
void cleanupPush_int_int ( bool failOnly, void* func, int arg ); void cleanupPush_int_int ( bool failOnly, void* func, int arg );
void cleanupPush_void ( bool failOnly, void* func ); void cleanupPush_void ( bool failOnly, void* func );
void cleanupPush_int_ptr ( bool failOnly, void* func, void* arg );
int32_t cleanupGetActionCount(); int32_t cleanupGetActionCount();
void cleanupExecuteTo( int32_t anchor, bool failed ); void cleanupExecuteTo( int32_t anchor, bool failed );
...@@ -83,8 +84,10 @@ void cleanupExecute( SExceptionNode* node, bool failed ); ...@@ -83,8 +84,10 @@ void cleanupExecute( SExceptionNode* node, bool failed );
#define CLEANUP_PUSH_VOID_PTR( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (void*)(arg) ) #define CLEANUP_PUSH_VOID_PTR( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (void*)(arg) )
#define CLEANUP_PUSH_INT_INT( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (int)(arg) ) #define CLEANUP_PUSH_INT_INT( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (int)(arg) )
#define CLEANUP_PUSH_VOID( failOnly, func ) cleanupPush_void( (failOnly), (void*)(func) ) #define CLEANUP_PUSH_VOID( failOnly, func ) cleanupPush_void( (failOnly), (void*)(func) )
#define CLEANUP_PUSH_INT_PTR( failOnly, func, arg ) cleanupPush_int_ptr( (failOnly), (void*)(func), (void*)(arg) )
#define CLEANUP_PUSH_FREE( failOnly, arg ) cleanupPush_void_ptr( (failOnly), free, (void*)(arg) ) #define CLEANUP_PUSH_FREE( failOnly, arg ) cleanupPush_void_ptr( (failOnly), free, (void*)(arg) )
#define CLEANUP_PUSH_CLOSE( failOnly, arg ) cleanupPush_int_int( (failOnly), close, (int)(arg) ) #define CLEANUP_PUSH_CLOSE( failOnly, arg ) cleanupPush_int_int( (failOnly), close, (int)(arg) )
#define CLEANUP_PUSH_FCLOSE( failOnly, arg ) cleanupPush_int_ptr( (failOnly), fclose, (void*)(arg) )
#define CLEANUP_GET_ANCHOR() cleanupGetActionCount() #define CLEANUP_GET_ANCHOR() cleanupGetActionCount()
#define CLEANUP_EXECUTE_TO( anchor, failed ) cleanupExecuteTo( (anchor), (failed) ) #define CLEANUP_EXECUTE_TO( anchor, failed ) cleanupExecuteTo( (anchor), (failed) )
...@@ -95,7 +98,7 @@ void cleanupExecute( SExceptionNode* node, bool failed ); ...@@ -95,7 +98,7 @@ void cleanupExecute( SExceptionNode* node, bool failed );
void exceptionPushNode( SExceptionNode* node ); void exceptionPushNode( SExceptionNode* node );
int32_t exceptionPopNode(); int32_t exceptionPopNode();
void exceptionThrow( int code ); void exceptionThrow( int32_t code );
#define TRY(maxCleanupActions) do { \ #define TRY(maxCleanupActions) do { \
SExceptionNode exceptionNode = { 0 }; \ SExceptionNode exceptionNode = { 0 }; \
...@@ -106,10 +109,10 @@ void exceptionThrow( int code ); ...@@ -106,10 +109,10 @@ void exceptionThrow( int code );
int caughtException = setjmp( exceptionNode.jb ); \ int caughtException = setjmp( exceptionNode.jb ); \
if( caughtException == 0 ) if( caughtException == 0 )
#define CATCH( code ) int code = exceptionPopNode(); \ #define CATCH( code ) int32_t code = exceptionPopNode(); \
if( caughtException == 1 ) if( caughtException == 1 )
#define FINALLY( code ) int code = exceptionPopNode(); #define FINALLY( code ) int32_t code = exceptionPopNode();
#define END_TRY } while( 0 ); #define END_TRY } while( 0 );
......
...@@ -106,6 +106,12 @@ void taosArrayCopy(SArray* pDst, const SArray* pSrc); ...@@ -106,6 +106,12 @@ void taosArrayCopy(SArray* pDst, const SArray* pSrc);
*/ */
SArray* taosArrayClone(const SArray* pSrc); SArray* taosArrayClone(const SArray* pSrc);
/**
* clear the array (remove all element)
* @param pArray
*/
void taosArrayClear(SArray* pArray);
/** /**
* destroy array list * destroy array list
* @param pArray * @param pArray
......
...@@ -14,7 +14,7 @@ int32_t exceptionPopNode() { ...@@ -14,7 +14,7 @@ int32_t exceptionPopNode() {
return node->code; return node->code;
} }
void exceptionThrow( int code ) { void exceptionThrow( int32_t code ) {
expList->code = code; expList->code = code;
longjmp( expList->jb, 1 ); longjmp( expList->jb, 1 );
} }
...@@ -38,21 +38,27 @@ static void cleanupWrapper_void_ptr( SCleanupAction* ca ) { ...@@ -38,21 +38,27 @@ static void cleanupWrapper_void_ptr( SCleanupAction* ca ) {
static void cleanupWrapper_int_int( SCleanupAction* ca ) { static void cleanupWrapper_int_int( SCleanupAction* ca ) {
int (*func)( int ) = ca->func; int (*func)( int ) = ca->func;
func( (int)(intptr_t)(ca->arg1.Int) ); func( ca->arg1.Int );
} }
static void cleanupWrapper_void_void( SCleanupAction* ca ) { static void cleanupWrapper_void( SCleanupAction* ca ) {
void (*func)() = ca->func; void (*func)() = ca->func;
func(); func();
} }
static void cleanupWrapper_int_ptr( SCleanupAction* ca ) {
int (*func)( void* ) = ca->func;
func( ca->arg1.Ptr );
}
typedef void (*wrapper)(SCleanupAction*); typedef void (*wrapper)(SCleanupAction*);
static wrapper wrappers[] = { static wrapper wrappers[] = {
cleanupWrapper_void_ptr_ptr, cleanupWrapper_void_ptr_ptr,
cleanupWrapper_void_ptr_bool, cleanupWrapper_void_ptr_bool,
cleanupWrapper_void_ptr, cleanupWrapper_void_ptr,
cleanupWrapper_int_int, cleanupWrapper_int_int,
cleanupWrapper_void_void, cleanupWrapper_void,
cleanupWrapper_int_ptr,
}; };
...@@ -107,6 +113,15 @@ void cleanupPush_void( bool failOnly, void* func ) { ...@@ -107,6 +113,15 @@ void cleanupPush_void( bool failOnly, void* func ) {
ca->func = func; ca->func = func;
} }
void cleanupPush_int_ptr( bool failOnly, void* func, void* arg ) {
assert( expList->numCleanupAction < expList->maxCleanupAction );
SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++;
ca->wrapper = 5;
ca->failOnly = failOnly;
ca->func = func;
ca->arg1.Ptr = arg;
}
int32_t cleanupGetActionCount() { int32_t cleanupGetActionCount() {
...@@ -118,8 +133,9 @@ static void doExecuteCleanup( SExceptionNode* node, int32_t anchor, bool failed ...@@ -118,8 +133,9 @@ static void doExecuteCleanup( SExceptionNode* node, int32_t anchor, bool failed
while( node->numCleanupAction > anchor ) { while( node->numCleanupAction > anchor ) {
--node->numCleanupAction; --node->numCleanupAction;
SCleanupAction *ca = node->cleanupActions + node->numCleanupAction; SCleanupAction *ca = node->cleanupActions + node->numCleanupAction;
if( failed || !(ca->failOnly) ) if( failed || !(ca->failOnly) ) {
wrappers[ca->wrapper]( ca ); wrappers[ca->wrapper]( ca );
}
} }
} }
......
...@@ -176,6 +176,11 @@ SArray* taosArrayClone(const SArray* pSrc) { ...@@ -176,6 +176,11 @@ SArray* taosArrayClone(const SArray* pSrc) {
return dst; return dst;
} }
void taosArrayClear(SArray* pArray) {
assert( pArray != NULL );
pArray->size = 0;
}
void taosArrayDestroy(SArray* pArray) { void taosArrayDestroy(SArray* pArray) {
if (pArray == NULL) { if (pArray == NULL) {
return; return;
......
...@@ -56,32 +56,46 @@ void run_test(TAOS* taos) { ...@@ -56,32 +56,46 @@ void run_test(TAOS* taos) {
taos_query(taos, "drop database if exists test;"); taos_query(taos, "drop database if exists test;");
usleep(100000); usleep(100000);
taos_query(taos, "create database test tables 5;"); //taos_query(taos, "create database test tables 5;");
taos_query(taos, "create database test;");
usleep(100000); usleep(100000);
taos_query(taos, "use test;"); taos_query(taos, "use test;");
usleep(100000); usleep(100000);
taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);"); taos_query(taos, "create table meters(ts timestamp, a int) tags(area int);");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');"); taos_query(taos, "create table t0 using meters tags(0);");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');"); taos_query(taos, "create table t1 using meters tags(1);");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');"); taos_query(taos, "create table t2 using meters tags(2);");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');"); taos_query(taos, "create table t3 using meters tags(3);");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');"); taos_query(taos, "create table t4 using meters tags(4);");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');"); taos_query(taos, "create table t5 using meters tags(5);");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');"); taos_query(taos, "create table t6 using meters tags(6);");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');"); taos_query(taos, "create table t7 using meters tags(7);");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');"); taos_query(taos, "create table t8 using meters tags(8);");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');"); taos_query(taos, "create table t9 using meters tags(9);");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');");
taos_query(taos, "insert into t3 using meters tags('tianjin', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t4 using meters tags('wuhan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t5 using meters tags('jinan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);");
taos_query(taos, "insert into t6 using meters tags('haikou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t7 using meters tags('nanjing', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t8 using meters tags('lanzhou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);");
taos_query(taos, "insert into t9 using meters tags('tokyo', 0) values('2020-01-01 00:01:02.000', 0, 'japan');"); taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);");
// super tables subscription // super tables subscription
usleep(1000000);
TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
TAOS_RES* res = taos_consume(tsub); TAOS_RES* res = taos_consume(tsub);
...@@ -90,23 +104,23 @@ void run_test(TAOS* taos) { ...@@ -90,23 +104,23 @@ void run_test(TAOS* taos) {
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 0); check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t0 values('2020-01-01 00:03:00.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0, 'china');"); taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 2); check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0, 'UK');"); taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0, 'UK');"); taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 2); check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0, 'china');"); taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 1); check_row_count(__LINE__, res, 1);
// keep progress information and restart subscription // keep progress information and restart subscription
taos_unsubscribe(tsub, 1); taos_unsubscribe(tsub, 1);
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 24); check_row_count(__LINE__, res, 24);
...@@ -133,7 +147,7 @@ void run_test(TAOS* taos) { ...@@ -133,7 +147,7 @@ void run_test(TAOS* taos) {
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 0); check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 1); check_row_count(__LINE__, res, 1);
...@@ -197,7 +211,7 @@ int main(int argc, char *argv[]) { ...@@ -197,7 +211,7 @@ int main(int argc, char *argv[]) {
// init TAOS // init TAOS
taos_init(); taos_init();
TAOS* taos = taos_connect(host, user, passwd, "test", 0); TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
exit(1); exit(1);
...@@ -209,6 +223,7 @@ int main(int argc, char *argv[]) { ...@@ -209,6 +223,7 @@ int main(int argc, char *argv[]) {
exit(0); exit(0);
} }
taos_query(taos, "use test;");
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (async) { if (async) {
// create an asynchronized subscription, the callback function will be called every 1s // create an asynchronized subscription, the callback function will be called every 1s
......
...@@ -27,7 +27,7 @@ sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol bool) ...@@ -27,7 +27,7 @@ sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol bool)
$i = 0 $i = 0
while $i < 5 while $i < 5
$tb = $tbPrefix . $i $tb = $tbPrefix . $i
sql create table $tb using $mt tags( 0 ) sql create table $tb using $mt tags( $i )
$x = 0 $x = 0
while $x < $rowNum while $x < $rowNum
$val = $x * 60000 $val = $x * 60000
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册