diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 2a37f3fc7cc4ecbbde65aa9102fa73f928f7eaa0..1654f76f1c5bd5feef17e3f25c08a87f286608d9 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -31,13 +31,13 @@ extern "C" { #include "tscSecondaryMerge.h" #include "tsclient.h" -#define UTIL_TABLE_IS_SUPERTABLE(metaInfo) \ +#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE)) #define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE)) -#define UTIL_TABLE_IS_NOMRAL_TABLE(metaInfo)\ - (!(UTIL_TABLE_IS_SUPERTABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo))) +#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\ + (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo))) #define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0) @@ -265,6 +265,10 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()); +void* malloc_throw(size_t size); +void* calloc_throw(size_t nmemb, size_t size); +char* strdup_throw(const char* str); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 6ea1ee6440999d843f2326ffc2567b9ca79957f9..a7eec31388d3a4fa389b164bb721316c9453b1ee 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -438,6 +438,9 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); +int32_t tscCompareTidTags(const void* p1, const void* p2); +void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables); + #ifdef __cplusplus } #endif diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 813c4561524d03456101f2ac0687ba3b0b095820..133bd116ba5e8f5468711b3d43a56d6ef5331d3d 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -471,7 +471,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; - if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex); pRes->code = code; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 5ae226a577a9af982c7c0d926eb432d5e7e73ac4..2da786d1d8f3a5ff8e4947c92afd340d2b6d55df 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -122,7 +122,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { int32_t numOfRows = tscGetNumOfColumns(pMeta); int32_t totalNumOfRows = numOfRows + tscGetNumOfTags(pMeta); - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { numOfRows = numOfRows + tscGetNumOfTags(pMeta); } @@ -161,7 +161,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { } } - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { return 0; } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 65555dba136480f92f90fdef6ba1fc9f61af088c..7e67ff82e9bac2a19d393bcb89949ac1983a384c 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -796,7 +796,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { return code; } - if (!UTIL_TABLE_IS_SUPERTABLE(pSTableMeterMetaInfo)) { + if (!UTIL_TABLE_IS_SUPER_TABLE(pSTableMeterMetaInfo)) { return tscInvalidSQLErrMsg(pCmd->payload, "create table only from super table is allowed", sToken.z); } @@ -1097,7 +1097,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; // TODO: should _clean or _error_clean to async flow ???? } - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL); goto _error_clean; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 86a74f01153ebf76751f0f5b68b3e1495f85e95e..ebb081bb3ae168c71a1d214af303567e0fdded3a 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1348,7 +1348,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum STableComInfo tinfo = tscGetTableInfo(pTableMeta); - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { numOfTotalColumns = tinfo.numOfColumns + tinfo.numOfTags; } else { numOfTotalColumns = tinfo.numOfColumns; @@ -1408,7 +1408,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) && UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { + if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) && UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } @@ -1862,7 +1862,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr case TK_TBID: { pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg7); } @@ -2279,7 +2279,7 @@ bool validateIpAddress(const char* ip, size_t size) { int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { return TSDB_CODE_INVALID_SQL; } @@ -2318,7 +2318,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { /* transfer the field-info back to original input format */ void tscRestoreSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { return; } @@ -2542,7 +2542,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* } if (groupTag) { - if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg9); } @@ -3254,7 +3254,7 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum } // table to table/ super table to super table are allowed - if (UTIL_TABLE_IS_SUPERTABLE(pLeftMeterMeta) != UTIL_TABLE_IS_SUPERTABLE(pRightMeterMeta)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pLeftMeterMeta) != UTIL_TABLE_IS_SUPER_TABLE(pRightMeterMeta)) { invalidSqlErrMsg(pQueryInfo->msg, msg5); return false; } @@ -3337,7 +3337,7 @@ static int32_t handleExprInQueryCond(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, S } else if (index.columnIndex >= tscGetNumOfColumns(pTableMeta) || index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { // query on tags // check for tag query condition - if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } @@ -3697,7 +3697,7 @@ static int32_t validateJoinExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) { } STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // for stable join, tag columns + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // for stable join, tag columns // must be present for join if (pCondExpr->pJoinExpr == NULL) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); @@ -3735,7 +3735,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) { static void doAddJoinTagsColumnsIntoTagList(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { SColumnIndex index = {0}; getColumnIndexByName(&pCondExpr->pJoinExpr->pLeft->colInfo, pQueryInfo, &index); @@ -4102,7 +4102,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { } /* for super table query, set default ascending order for group output */ - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC; } } @@ -4128,7 +4128,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema * * for super table query, the order option must be less than 3. */ - if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { if (pSortorder->nExpr > 1) { return invalidSqlErrMsg(pQueryInfo->msg, msg0); } @@ -4149,7 +4149,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema SSQLToken columnName = {pVar->nLen, pVar->nType, pVar->pz}; SColumnIndex index = {0}; - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // super table query + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query if (getColumnIndexByName(&columnName, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } @@ -4302,10 +4302,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { - if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg3); } - } else if ((pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) && (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo))) { + } else if ((pAlterSQL->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) && (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo))) { return invalidSqlErrMsg(pQueryInfo->msg, msg4); } else if ((pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN || pAlterSQL->type == TSDB_ALTER_TABLE_DROP_COLUMN) && UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { @@ -4691,7 +4691,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* } // todo refactor - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) { @@ -5627,7 +5627,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return code; } - bool isSTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); + bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } @@ -5771,7 +5771,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr); bool isSTable = false; - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { isSTable = true; code = tscGetSTableVgroupInfo(pSql, index); if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b958bbe15c3fcb5372e6b324d5780ed37ef758c1..43370c70372d9f86bb3c318772afc67beb6f12d3 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -39,7 +39,7 @@ int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); void tscProcessActivityTimer(void *handle, void *tmrId); int tscKeepConn[TSDB_SQL_MAX] = {0}; -TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid); +TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt); void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts); void tscSaveSubscriptionProgress(void* sub); @@ -500,7 +500,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // todo valid the vgroupId at the client side STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { int32_t vgIndex = pTableMetaInfo->vgroupIndex; SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList; @@ -566,12 +566,13 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); + TSKEY dfltKey = htobe64(pQueryMsg->window.skey); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; - if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) { + if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) { SCMVgroupInfo* pVgroupInfo = NULL; - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { int32_t index = pTableMetaInfo->vgroupIndex; assert(index >= 0); @@ -580,25 +581,25 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } else { pVgroupInfo = &pTableMeta->vgroupInfo; } - + tscSetDnodeIpList(pSql, pVgroupInfo); pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); - + STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableMeta->sid); pTableIdInfo->uid = htobe64(pTableMeta->uid); - pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); - + pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, dfltKey)); + pQueryMsg->numOfTables = htonl(1); // set the number of tables - + pMsg += sizeof(STableIdInfo); } else { int32_t index = pTableMetaInfo->vgroupIndex; int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); assert(index >= 0 && index < numOfVgroups); - + tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups); - + SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); // set the vgroup info @@ -615,7 +616,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pItem->tid); pTableIdInfo->uid = htobe64(pItem->uid); -// pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); + pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid, dfltKey)); pMsg += sizeof(STableIdInfo); } } @@ -2283,7 +2284,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true); if (pTableMetaInfo->pTableMeta) { - bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); + bool isSuperTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); // taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true); @@ -2345,6 +2346,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { for (int i = 0; i < numOfTables; i++) { int64_t uid = htobe64(*(int64_t*)p); p += sizeof(int64_t); + p += sizeof(int32_t); // skip tid TSKEY key = htobe64(*(TSKEY*)p); p += sizeof(TSKEY); tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index d7c22b2248426e6437f260e1c7bab30de2d580ba..a00f856b2a9917a261368ba4f3e29fd09e111894 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -79,7 +79,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; - if (code == 0 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscGetSTableVgroupInfo(pSql, 0); pSql->res.code = code; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 856b67839115680eb535721546d9dd0abc83cc8c..8e90660a2462705d14a313a344e90c0ffe0c14cd 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -44,8 +44,7 @@ typedef struct SSub { int interval; TAOS_SUBSCRIBE_CALLBACK fp; void * param; - int numOfTables; - SSubscriptionProgress * progress; + SArray* progress; } SSub; @@ -57,92 +56,113 @@ static int tscCompareSubscriptionProgress(const void* a, const void* b) { return 0; } -TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) { - if (sub == NULL) - return 0; - - SSub* pSub = (SSub*)sub; - for (int s = 0, e = pSub->numOfTables; s < e;) { - int m = (s + e) / 2; - SSubscriptionProgress* p = pSub->progress + m; - if (p->uid > uid) - e = m; - else if (p->uid < uid) - s = m + 1; - else - return p->key; +TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt) { + if (sub == NULL) { + return dflt; } + SSub* pSub = (SSub*)sub; - return 0; + SSubscriptionProgress target = {.uid = uid, .key = 0}; + SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target); + if (p == NULL) { + return dflt; + } + return p->key; } void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { if( sub == NULL) return; - SSub* pSub = (SSub*)sub; - for (int s = 0, e = pSub->numOfTables; s < e;) { - int m = (s + e) / 2; - SSubscriptionProgress* p = pSub->progress + m; - if (p->uid > uid) - e = m; - else if (p->uid < uid) - s = m + 1; - else { - if (ts >= p->key) p->key = ts; - break; - } + + SSubscriptionProgress target = {.uid = uid, .key = ts}; + SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target); + if (p != NULL) { + p->key = ts; } } +static void asyncCallback(void *param, TAOS_RES *tres, int code) { + assert(param != NULL); + SSqlObj *pSql = ((SSqlObj *)param); + + pSql->res.code = code; + sem_post(&pSql->rspSem); +} + + static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) { - SSub* pSub = calloc(1, sizeof(SSub)); - if (pSub == NULL) { - terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; - tscError("failed to allocate memory for subscription"); - return NULL; - } + SSub* pSub = NULL; - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); - if (pSql == NULL) { - terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; - tscError("failed to allocate SSqlObj for subscription"); - goto _pSql_failed; - } + TRY( 8 ) { + SSqlObj* pSql = calloc_throw(1, sizeof(SSqlObj)); + CLEANUP_PUSH_FREE(true, pSql); + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + if (tsem_init(&pSql->rspSem, 0, 0) == -1) { + THROW(TAOS_SYSTEM_ERROR(errno)); + } + CLEANUP_PUSH_INT_PTR(true, tsem_destroy, &pSql->rspSem); - pSql->signature = pSql; - pSql->pTscObj = pObj; + pSql->signature = pSql; + pSql->param = pSql; + pSql->pTscObj = pObj; + pSql->maxRetry = TSDB_MAX_REPLICA_NUM; + pSql->fp = asyncCallback; - char* sqlstr = (char*)malloc(strlen(sql) + 1); - if (sqlstr == NULL) { - tscError("failed to allocate sql string for subscription"); - goto failed; - } - strcpy(sqlstr, sql); - strtolower(sqlstr, sqlstr); - pSql->sqlstr = sqlstr; + int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); + if (code != TSDB_CODE_SUCCESS) { + THROW(code); + } + CLEANUP_PUSH_FREE(true, pCmd->payload); - tsem_init(&pSql->rspSem, 0, 0); + pRes->qhandle = 0; + pRes->numOfRows = 1; - SSqlRes *pRes = &pSql->res; - pRes->numOfRows = 1; - pRes->numOfTotal = 0; - - pSql->pSubscription = pSub; - pSub->pSql = pSql; - pSub->signature = pSub; - strncpy(pSub->topic, topic, sizeof(pSub->topic)); - pSub->topic[sizeof(pSub->topic) - 1] = 0; - return pSub; + pSql->sqlstr = strdup_throw(sql); + CLEANUP_PUSH_FREE(true, pSql->sqlstr); + strtolower(pSql->sqlstr, pSql->sqlstr); + + code = tsParseSql(pSql, false); + if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + // wait for the callback function to post the semaphore + sem_wait(&pSql->rspSem); + code = pSql->res.code; + } + if (code != TSDB_CODE_SUCCESS) { + tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code)); + THROW( code ); + } + + if (pSql->cmd.command != TSDB_SQL_SELECT) { + tscError("only 'select' statement is allowed in subscription: %s", pSub->topic); + THROW( -1 ); // TODO + } + + pSub = calloc_throw(1, sizeof(SSub)); + CLEANUP_PUSH_FREE(true, pSub); + pSql->pSubscription = pSub; + pSub->pSql = pSql; + pSub->signature = pSub; + strncpy(pSub->topic, topic, sizeof(pSub->topic)); + pSub->topic[sizeof(pSub->topic) - 1] = 0; + pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress)); + if (pSub->progress == NULL) { + THROW(TSDB_CODE_CLI_OUT_OF_MEMORY); + } + + CLEANUP_EXECUTE(); -failed: - tfree(sqlstr); + } CATCH( code ) { + tscError("failed to create subscription object: %s", tstrerror(code)); + CLEANUP_EXECUTE(); + pSub = NULL; -_pSql_failed: - tfree(pSql); - tfree(pSub); - return NULL; + } END_TRY + + return pSub; } @@ -159,60 +179,68 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) { } -int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { - int code = (uint8_t)tsParseSql(pSub->pSql, false); +static SArray* getTableList( SSqlObj* pSql ) { + const char* p = strstr( pSql->sqlstr, " from " ); + char* sql = alloca(strlen(p) + 32); + sprintf(sql, "select tbid(tbname)%s", p); + int code = taos_query( pSql->pTscObj, sql ); if (code != TSDB_CODE_SUCCESS) { - tscError("failed to parse sql statement: %s", pSub->topic); - return 0; + tscError("failed to retrieve table id: %s", tstrerror(code)); + return NULL; } - SSqlCmd* pCmd = &pSub->pSql->cmd; - if (pCmd->command != TSDB_SQL_SELECT) { - tscError("only 'select' statement is allowed in subscription: %s", pSub->topic); - return 0; + TAOS_RES* res = taos_use_result( pSql->pTscObj ); + TAOS_ROW row; + SArray* result = taosArrayInit( 128, sizeof(STidTags) ); + while ((row = taos_fetch_row(res))) { + STidTags tags; + memcpy(&tags, row[0], sizeof(tags)); + taosArrayPush(result, &tags); } - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0); - int numOfTables = 0; - if (!UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { -// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta; -// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { -// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); -// numOfTables += pVnodeSidList->numOfSids; -// } - } + return result; +} - SSubscriptionProgress* progress = (SSubscriptionProgress*)calloc(numOfTables, sizeof(SSubscriptionProgress)); - if (progress == NULL) { - tscError("failed to allocate memory for progress: %s", pSub->topic); - return 0; + +static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { + SSqlObj* pSql = pSub->pSql; + + SSqlCmd* pCmd = &pSql->cmd; + + pSub->lastSyncTime = taosGetTimestampMs(); + + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { + STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; + SSubscriptionProgress target = {.uid = pTableMeta->uid, .key = 0}; + SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target); + if (p == NULL) { + taosArrayClear(pSub->progress); + taosArrayPush(pSub->progress, &target); + } + return 1; } - if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { - numOfTables = 1; - int64_t uid = pTableMetaInfo->pTableMeta->uid; - progress[0].uid = uid; - progress[0].key = tscGetSubscriptionProgress(pSub, uid); - } else { -// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta; -// numOfTables = 0; -// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { -// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); -// for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) { -// STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j); -// int64_t uid = pTableMetaInfo->uid; -// progress[numOfTables].uid = uid; -// progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid); -// } -// } - qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); + SArray* tables = getTableList(pSql); + size_t numOfTables = taosArrayGetSize(tables); + + SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress)); + for( size_t i = 0; i < numOfTables; i++ ) { + STidTags* tt = taosArrayGet( tables, i ); + SSubscriptionProgress p = { .uid = tt->uid }; + p.key = tscGetSubscriptionProgress(pSub, tt->uid, INT64_MIN); + taosArrayPush(progress, &p); } + taosArraySort(progress, tscCompareSubscriptionProgress); - free(pSub->progress); - pSub->numOfTables = numOfTables; + taosArrayDestroy(pSub->progress); pSub->progress = progress; - pSub->lastSyncTime = taosGetTimestampMs(); + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + taosArraySort( tables, tscCompareTidTags ); + tscBuildVgroupTableInfo( pTableMetaInfo, tables ); + } + taosArrayDestroy(tables); return 1; } @@ -248,32 +276,22 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { return 0; } - if (fgets(buf, sizeof(buf), fp) == NULL || atoi(buf) < 0) { - tscTrace("invalid subscription progress file: %s", pSub->topic); - fclose(fp); - return 0; - } - - int numOfTables = atoi(buf); - SSubscriptionProgress* progress = calloc(numOfTables, sizeof(SSubscriptionProgress)); - for (int i = 0; i < numOfTables; i++) { + SArray* progress = pSub->progress; + taosArrayClear(progress); + while( 1 ) { if (fgets(buf, sizeof(buf), fp) == NULL) { fclose(fp); - free(progress); return 0; } - int64_t uid, key; - sscanf(buf, "%" SCNd64 ":%" SCNd64, &uid, &key); - progress[i].uid = uid; - progress[i].key = key; + SSubscriptionProgress p; + sscanf(buf, "%" SCNd64 ":%" SCNd64, &p.uid, &p.key); + taosArrayPush(progress, &p); } fclose(fp); - qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); - pSub->numOfTables = numOfTables; - pSub->progress = progress; - tscTrace("subscription progress loaded, %d tables: %s", numOfTables, pSub->topic); + taosArraySort(progress, tscCompareSubscriptionProgress); + tscTrace("subscription progress loaded, %d tables: %s", taosArrayGetSize(progress), pSub->topic); return 1; } @@ -294,11 +312,10 @@ void tscSaveSubscriptionProgress(void* sub) { } fputs(pSub->pSql->sqlstr, fp); - fprintf(fp, "\n%d\n", pSub->numOfTables); - for (int i = 0; i < pSub->numOfTables; i++) { - int64_t uid = pSub->progress[i].uid; - TSKEY key = pSub->progress[i].key; - fprintf(fp, "%" PRId64 ":%" PRId64 "\n", uid, key); + fprintf(fp, "\n"); + for(size_t i = 0; i < taosArrayGetSize(pSub->progress); i++) { + SSubscriptionProgress* p = taosArrayGet(pSub->progress, i); + fprintf(fp, "%" PRId64 ":%" PRId64 "\n", p->uid, p->key); } fclose(fp); @@ -363,35 +380,34 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { tscRemoveFromSqlList(pSql); if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) { - tscTrace("begin meter synchronization"); - char* sqlstr = pSql->sqlstr; - pSql->sqlstr = NULL; - taos_free_result_imp(pSql, 0); - pSql->sqlstr = sqlstr; - taosCacheEmpty(tscCacheHandle); + tscTrace("begin table synchronization"); if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; - tscTrace("meter synchronization completed"); - } else { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - - uint32_t type = pQueryInfo->type; - taos_free_result_imp(pSql, 1); - pRes->numOfRows = 1; - pRes->numOfTotal = 0; - pRes->qhandle = 0; - pSql->cmd.command = TSDB_SQL_SELECT; - pQueryInfo->type = type; - - tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0; + tscTrace("table synchronization completed"); } + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + uint32_t type = pQueryInfo->type; + tscFreeSqlResult(pSql); + pRes->numOfRows = 1; + pRes->qhandle = 0; + pSql->cmd.command = TSDB_SQL_SELECT; + pQueryInfo->type = type; + + tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0; + + pSql->fp = asyncCallback; + pSql->param = pSql; tscDoQuery(pSql); - if (pRes->code != TSDB_CODE_NOT_ACTIVE_TABLE) { - break; + sem_wait(&pSql->rspSem); + + if (pRes->code != TSDB_CODE_SUCCESS) { + continue; } // meter was removed, make sync time zero, so that next retry will // do synchronization first pSub->lastSyncTime = 0; + break; } if (pRes->code != TSDB_CODE_SUCCESS) { @@ -421,7 +437,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { } tscFreeSqlObj(pSub->pSql); - free(pSub->progress); + taosArrayDestroy(pSub->progress); memset(pSub, 0, sizeof(*pSub)); free(pSub); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 1e4a142a906cb852dffdebca799f0cfbbab4dece..02af71ee66a8b24f6a0a42170316c00296c2abb7 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -330,7 +330,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { pNewQueryInfo->limit = pSupporter->limit; // fetch the join tag column - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, 0); assert(pQueryInfo->tagCond.joinInfo.hasJoin); @@ -463,77 +463,51 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* } } -int32_t tagsOrderCompar(const void* p1, const void* p2) { - STidTags* t1 = (STidTags*) p1; - STidTags* t2 = (STidTags*) p2; +int32_t tscCompareTidTags(const void* p1, const void* p2) { + const STidTags* t1 = (const STidTags*) p1; + const STidTags* t2 = (const STidTags*) p2; if (t1->vgId != t2->vgId) { - return (t1->vgId > t2->vgId)? 1:-1; - } else { - if (t1->tid != t2->tid) { - return (t1->tid > t2->tid)? 1:-1; - } else { - return 0; - } + return (t1->vgId > t2->vgId) ? 1 : -1; + } + if (t1->tid != t2->tid) { + return (t1->tid > t2->tid) ? 1 : -1; } + return 0; } -static void doBuildVgroupTableInfo(SArray* res, STableMetaInfo* pTableMetaInfo) { - SArray* pGroup = taosArrayInit(4, sizeof(SVgroupTableInfo)); - - SArray* vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo)); - int32_t size = taosArrayGetSize(res); - - STidTags* prev = taosArrayGet(res, 0); - int32_t prevVgId = prev->vgId; - - STableIdInfo item = {.uid = prev->uid, .tid = prev->tid, .key = INT64_MIN}; - taosArrayPush(vgTableIdItem, &item); - - for(int32_t k = 1; k < size; ++k) { - STidTags* t1 = taosArrayGet(res, k); - if (prevVgId != t1->vgId) { - - SVgroupTableInfo info = {0}; - +void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables) { + SArray* result = taosArrayInit( 4, sizeof(SVgroupTableInfo) ); + SArray* vgTables = NULL; + STidTags* prev = NULL; + + size_t numOfTables = taosArrayGetSize( tables ); + for( size_t i = 0; i < numOfTables; i++ ) { + STidTags* tt = taosArrayGet( tables, i ); + + if( prev == NULL || tt->vgId != prev->vgId ) { SVgroupsInfo* pvg = pTableMetaInfo->vgroupList; - for(int32_t m = 0; m < pvg->numOfVgroups; ++m) { - if (prevVgId == pvg->vgroups[m].vgId) { + + SVgroupTableInfo info = { 0 }; + for( int32_t m = 0; m < pvg->numOfVgroups; ++m ) { + if( tt->vgId == pvg->vgroups[m].vgId ) { info.vgInfo = pvg->vgroups[m]; break; } } - - assert(info.vgInfo.numOfIps != 0); - info.itemList = vgTableIdItem; - taosArrayPush(pGroup, &info); - - vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo)); - STableIdInfo item1 = {.uid = t1->uid, .tid = t1->tid, .key = INT64_MIN}; - taosArrayPush(vgTableIdItem, &item1); - prevVgId = t1->vgId; - } else { - taosArrayPush(vgTableIdItem, &item); - } - } - - if (taosArrayGetSize(vgTableIdItem) > 0) { - SVgroupTableInfo info = {0}; - SVgroupsInfo* pvg = pTableMetaInfo->vgroupList; - - for(int32_t m = 0; m < pvg->numOfVgroups; ++m) { - if (prevVgId == pvg->vgroups[m].vgId) { - info.vgInfo = pvg->vgroups[m]; - break; - } + assert( info.vgInfo.numOfIps != 0 ); + + vgTables = taosArrayInit( 4, sizeof(STableIdInfo) ); + info.itemList = vgTables; + taosArrayPush( result, &info ); } - - assert(info.vgInfo.numOfIps != 0); - info.itemList = vgTableIdItem; - taosArrayPush(pGroup, &info); + + STableIdInfo item = { .uid = tt->uid, .tid = tt->tid, .key = INT64_MIN }; + taosArrayPush( vgTables, &item ); + prev = tt; } - - pTableMetaInfo->pVgroupTables = pGroup; + + pTableMetaInfo->pVgroupTables = result; } static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) { @@ -627,8 +601,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param; - qsort(p1->pIdTagList, p1->num, p1->tagSize, tagsOrderCompar); - qsort(p2->pIdTagList, p2->num, p2->tagSize, tagsOrderCompar); + qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags); + qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -668,11 +642,11 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0); - doBuildVgroupTableInfo(s1, pTableMetaInfo1); + tscBuildVgroupTableInfo(pTableMetaInfo1, s1); SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); - doBuildVgroupTableInfo(s2, pTableMetaInfo2); + tscBuildVgroupTableInfo(pTableMetaInfo2, s2); pSupporter->pState->numOfCompleted = 0; pSupporter->pState->code = 0; @@ -1096,7 +1070,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter tscInitQueryInfo(pNewQueryInfo); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // return the tableId & tag + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag SSchema s = {0}; SColumnIndex index = {0}; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 42c537eb0cfceebd095e04483a84fec6d67fb5ae..f9531ec1aa57688860006770ba375fd599eddeb4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -157,7 +157,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { } // for select query super table, the super table vgroup list can not be null in any cases. - if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { assert(pTableMetaInfo->vgroupList != NULL); } @@ -172,7 +172,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { if (((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) != TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->command == TSDB_SQL_SELECT) { - return UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); + return UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); } return false; @@ -187,7 +187,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { * 4. show queries, instead of a select query */ size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); - if (pTableMetaInfo == NULL || !UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || + if (pTableMetaInfo == NULL || !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || pQueryInfo->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || numOfExprs == 0) { return false; } @@ -1350,7 +1350,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) { return false; } - if (colId == -1 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (colId == -1 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { return true; } @@ -1461,10 +1461,11 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) { } /* - * the following three kinds of SqlObj should not be freed + * the following four kinds of SqlObj should not be freed * 1. SqlObj for stream computing * 2. main SqlObj * 3. heartbeat SqlObj + * 4. SqlObj for subscription * * If res code is error and SqlObj does not belong to above types, it should be * automatically freed for async query, ignoring that connection should be kept. @@ -1477,7 +1478,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) { } STscObj* pTscObj = pSql->pTscObj; - if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql) { + if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql || pSql->pSubscription != NULL) { return false; } @@ -1874,7 +1875,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void assert(pNewQueryInfo->numOfTables == 1); - if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { assert(pFinalInfo->vgroupList != NULL); } @@ -2009,7 +2010,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { // SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); // STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); -// if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) { +// if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) { return false; // } @@ -2169,3 +2170,26 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column } } +void* malloc_throw(size_t size) { + void* p = malloc(size); + if (p == NULL) { + THROW(TSDB_CODE_CLI_OUT_OF_MEMORY); + } + return p; +} + +void* calloc_throw(size_t nmemb, size_t size) { + void* p = calloc(nmemb, size); + if (p == NULL) { + THROW(TSDB_CODE_CLI_OUT_OF_MEMORY); + } + return p; +} + +char* strdup_throw(const char* str) { + char* p = strdup(str); + if (p == NULL) { + THROW(TSDB_CODE_CLI_OUT_OF_MEMORY); + } + return p; +} diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 2088e5a49ddd4d249effdc915234c70efff961cc..991b3b73f7d7820dc5d292f95a815e734cfdb64b 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -182,6 +182,7 @@ typedef struct SQInfo { SQueryRuntimeEnv runtimeEnv; int32_t groupIndex; int32_t offset; // offset in group result set of subgroup, todo refactor + SArray* arrTableIdInfo; T_REF_DECLARE() /* diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 4b48d7931690f037282db1e043674fe562cbfb97..be86f64899786aa5bbe68e62971aa997fa6799eb 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -113,7 +113,6 @@ static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols); static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); -static void createTableQueryInfo(SQInfo *pQInfo); static void buildTagQueryResult(SQInfo *pQInfo); static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTableId, STableQueryInfo *pTableQueryInfo); @@ -3463,7 +3462,11 @@ static bool hasMainOutput(SQuery *pQuery) { return false; } -STableQueryInfo *createTableQueryInfoImpl(SQueryRuntimeEnv *pRuntimeEnv, STableId tableId, STimeWindow win) { +static STableQueryInfo *createTableQueryInfo( + SQueryRuntimeEnv *pRuntimeEnv, + STableId tableId, + STimeWindow win +) { STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); pTableQueryInfo->win = win; @@ -3870,7 +3873,18 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data data += bytes * numOfRows; } - + int32_t numOfTables = (int32_t)taosArrayGetSize(pQInfo->arrTableIdInfo); + *(int32_t*)data = htonl(numOfTables); + data += sizeof(int32_t); + for(int32_t i = 0; i < numOfTables; i++) { + STableIdInfo* pSrc = taosArrayGet(pQInfo->arrTableIdInfo, i); + STableIdInfo* pDst = (STableIdInfo*)data; + pDst->uid = htobe64(pSrc->uid); + pDst->tid = htonl(pSrc->tid); + pDst->key = htobe64(pSrc->key); + data += sizeof(STableIdInfo); + } + // all data returned, set query over if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (pQInfo->runtimeEnv.stableQuery && isIntervalQuery(pQuery)) { @@ -4149,6 +4163,42 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) { return true; } + +static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + + if (onlyQueryTags(pQuery)) { + return; + } + + if (isSTableQuery && (!isIntervalQuery(pQuery)) && (!isFixedOutputQuery(pQuery))) { + return; + } + + STsdbQueryCond cond = { + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, + }; + + if (!isSTableQuery + && (pQInfo->groupInfo.numOfTables == 1) + && (cond.order == TSDB_ORDER_ASC) + && (!isIntervalQuery(pQuery)) + && (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) + && (!isFixedOutputQuery(pQuery)) + ) { + SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); + SGroupItem* pItem = taosArrayGet(pa, 0); + cond.twindow = pItem->info->win; + } + + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); +} + + int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; @@ -4157,26 +4207,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool setScanLimitationByResultBuffer(pQuery); changeExecuteScanOrder(pQuery, false); - - STsdbQueryCond cond = { - .twindow = pQuery->window, - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - - // normal query setup the queryhandle here - if (!onlyQueryTags(pQuery)) { - if (!isSTableQuery && isFirstLastRowQuery(pQuery)) { // in case of last_row query, invoke a different API. - pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo); - } else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) { - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); - } - - // create the table query support structures - createTableQueryInfo(pQInfo); - } + setupQueryHandle(tsdb, pQInfo, isSTableQuery); pQInfo->tsdb = tsdb; pQInfo->vgId = vgId; @@ -4595,6 +4626,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * to ensure that, we can reset the query range once query on a meter is completed. */ pQInfo->tableIndex++; + + STableIdInfo tidInfo; + tidInfo.uid = item->id.uid; + tidInfo.tid = item->id.tid; + tidInfo.key = pQuery->current->lastKey; + taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); + // if the buffer is full or group by each table, we need to jump out of the loop if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*|| isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) { @@ -4664,35 +4702,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQuery->limit.offset); } -static void createTableQueryInfo(SQInfo *pQInfo) { - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - - // todo make sure the table are added the reference count to gauranteed that all involved tables are valid - size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - - int32_t index = 0; - for (int32_t i = 0; i < numOfGroups; ++i) { // load all meter meta info - SArray *group = *(SArray **)taosArrayGet(pQInfo->groupInfo.pGroupList, i); - - size_t s = taosArrayGetSize(group); - for (int32_t j = 0; j < s; ++j) { - SGroupItem* item = (SGroupItem *)taosArrayGet(group, j); - - // STableQueryInfo has been created for each table - if (item->info != NULL) { - return; - } - - STableQueryInfo* pInfo = createTableQueryInfoImpl(&pQInfo->runtimeEnv, item->id, pQuery->window); - pInfo->groupIdx = i; - pInfo->tableIndex = index; - - item->info = pInfo; - index += 1; - } - } -} - static void doSaveContext(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -4914,6 +4923,12 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->current->lastKey, pQuery->window.ekey); + } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + STableIdInfo tidInfo; + tidInfo.uid = pQuery->current->id.uid; + tidInfo.tid = pQuery->current->id.tid; + tidInfo.key = pQuery->current->lastKey; + taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); } if (!isTSCompQuery(pQuery)) { @@ -5197,20 +5212,10 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **pTableIdList) { assert(pQueryMsg->numOfTables > 0); - *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableId)); - - STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; - pTableIdInfo->tid = htonl(pTableIdInfo->tid); - pTableIdInfo->uid = htobe64(pTableIdInfo->uid); - pTableIdInfo->key = htobe64(pTableIdInfo->key); - - STableId id = {.uid = pTableIdInfo->uid, .tid = pTableIdInfo->tid}; - taosArrayPush(*pTableIdList, &id); - - pMsg += sizeof(STableIdInfo); + *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo)); - for (int32_t j = 1; j < pQueryMsg->numOfTables; ++j) { - pTableIdInfo = (STableIdInfo *)pMsg; + for (int32_t j = 0; j < pQueryMsg->numOfTables; ++j) { + STableIdInfo* pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableIdInfo->tid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid); @@ -5661,7 +5666,16 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { } } -static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, + +static int compareTableIdInfo( const void* a, const void* b ) { + const STableIdInfo* x = (const STableIdInfo*)a; + const STableIdInfo* y = (const STableIdInfo*)b; + if (x->uid > y->uid) return 1; + if (x->uid < y->uid) return -1; + return 0; +} + +static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, STableGroupInfo *groupInfo, SColumnInfo* pTagCols) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { @@ -5754,6 +5768,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pQInfo->groupInfo.numOfTables = groupInfo->numOfTables; + int tableIndex = 0; + STimeWindow window = pQueryMsg->window; + taosArraySort( pTableIdList, compareTableIdInfo ); for(int32_t i = 0; i < numOfGroups; ++i) { SArray* pa = taosArrayGetP(groupInfo->pGroupList, i); size_t s = taosArrayGetSize(pa); @@ -5761,13 +5778,26 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou SArray* p1 = taosArrayInit(s, sizeof(SGroupItem)); for(int32_t j = 0; j < s; ++j) { - SGroupItem item = { .id = *(STableId*) taosArrayGet(pa, j), .info = NULL, }; + STableId id = *(STableId*) taosArrayGet(pa, j); + SGroupItem item = { .id = id }; + // NOTE: compare STableIdInfo with STableId + // not a problem at present because we only use their 1st int64_t field + STableIdInfo* pTableId = taosArraySearch( pTableIdList, compareTableIdInfo, &id ); + if (pTableId != NULL ) { + window.skey = pTableId->key; + } else { + window.skey = pQueryMsg->window.skey; + } + item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window); + item.info->groupIdx = i; + item.info->tableIndex = tableIndex++; taosArrayPush(p1, &item); } - taosArrayPush(pQInfo->groupInfo.pGroupList, &p1); } + pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); + pQuery->pos = -1; pQuery->window = pQueryMsg->window; @@ -5919,6 +5949,7 @@ static void freeQInfo(SQInfo *pQInfo) { } taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList); + taosArrayDestroy(pQInfo->arrTableIdInfo); if (pQuery->pGroupbyExpr != NULL) { taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo); @@ -6046,32 +6077,48 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) { isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY); - STableId *id = taosArrayGet(pTableIdList, 0); + STableIdInfo *id = taosArrayGet(pTableIdList, 0); if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { goto _over; } } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) { isSTableQuery = true; - STableId *id = taosArrayGet(pTableIdList, 0); - - // group by normal column, do not pass the group by condition to tsdb to group table into different group - int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; - if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) { - numOfGroupByCols = 0; - } - - // todo handle the error - /*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, - numOfGroupByCols); - if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query - code = TSDB_CODE_SUCCESS; - goto _over; + // TODO: need a macro from TSDB to check if table is super table, + // also note there's possiblity that only one table in the super table + if (taosArrayGetSize(pTableIdList) == 1) { + STableIdInfo *id = taosArrayGet(pTableIdList, 0); + // if array size is 1 and assert super table + + // group by normal column, do not pass the group by condition to tsdb to group table into different group + int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; + if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) { + numOfGroupByCols = 0; + } + + // todo handle the error + /*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, + numOfGroupByCols); + if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query + code = TSDB_CODE_SUCCESS; + goto _over; + } + } else { + groupInfo.numOfTables = taosArrayGetSize(pTableIdList); + SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); + + SArray* sa = taosArrayInit(groupInfo.numOfTables, sizeof(STableId)); + for(int32_t i = 0; i < groupInfo.numOfTables; ++i) { + STableIdInfo* tableId = taosArrayGet(pTableIdList, i); + taosArrayPush(sa, tableId); + } + taosArrayPush(pTableGroup, &sa); + groupInfo.pGroupList = pTableGroup; } } else { assert(0); } - (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo); + (*pQInfo) = createQInfoImpl(pQueryMsg, pTableIdList, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo); if ((*pQInfo) == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; goto _over; @@ -6169,6 +6216,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co SQuery *pQuery = pQInfo->runtimeEnv.pQuery; size_t size = getResultSize(pQInfo, &pQuery->rec.rows); + size += sizeof(int32_t); + size += sizeof(STableIdInfo) * taosArrayGetSize(pQInfo->arrTableIdInfo); *contLen = size + sizeof(SRetrieveTableRsp); // todo handle failed to allocate memory diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index b01407024da637344bdb146c256f635492a60d10..1349095ea56fca9668e300675f5d43b57257b68f 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1523,7 +1523,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC } if (pTable->type != TSDB_SUPER_TABLE) { - uError("%p query normal tag not allowed, uid:%, tid:%d, name:%s" PRIu64, + uError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid, pTable->name); return TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client diff --git a/src/util/inc/exception.h b/src/util/inc/exception.h index 41f01d68dd5fddd800c20b5d23c6b20fa1bb7b73..52cd03d83082ae44b163d92f12f017b853b90e3b 100644 --- a/src/util/inc/exception.h +++ b/src/util/inc/exception.h @@ -73,6 +73,7 @@ void cleanupPush_void_ptr_bool ( bool failOnly, void* func, void* arg1, bool ar void cleanupPush_void_ptr ( bool failOnly, void* func, void* arg ); void cleanupPush_int_int ( bool failOnly, void* func, int arg ); void cleanupPush_void ( bool failOnly, void* func ); +void cleanupPush_int_ptr ( bool failOnly, void* func, void* arg ); int32_t cleanupGetActionCount(); void cleanupExecuteTo( int32_t anchor, bool failed ); @@ -83,8 +84,10 @@ void cleanupExecute( SExceptionNode* node, bool failed ); #define CLEANUP_PUSH_VOID_PTR( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (void*)(arg) ) #define CLEANUP_PUSH_INT_INT( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (int)(arg) ) #define CLEANUP_PUSH_VOID( failOnly, func ) cleanupPush_void( (failOnly), (void*)(func) ) +#define CLEANUP_PUSH_INT_PTR( failOnly, func, arg ) cleanupPush_int_ptr( (failOnly), (void*)(func), (void*)(arg) ) #define CLEANUP_PUSH_FREE( failOnly, arg ) cleanupPush_void_ptr( (failOnly), free, (void*)(arg) ) #define CLEANUP_PUSH_CLOSE( failOnly, arg ) cleanupPush_int_int( (failOnly), close, (int)(arg) ) +#define CLEANUP_PUSH_FCLOSE( failOnly, arg ) cleanupPush_int_ptr( (failOnly), fclose, (void*)(arg) ) #define CLEANUP_GET_ANCHOR() cleanupGetActionCount() #define CLEANUP_EXECUTE_TO( anchor, failed ) cleanupExecuteTo( (anchor), (failed) ) @@ -95,7 +98,7 @@ void cleanupExecute( SExceptionNode* node, bool failed ); void exceptionPushNode( SExceptionNode* node ); int32_t exceptionPopNode(); -void exceptionThrow( int code ); +void exceptionThrow( int32_t code ); #define TRY(maxCleanupActions) do { \ SExceptionNode exceptionNode = { 0 }; \ @@ -106,10 +109,10 @@ void exceptionThrow( int code ); int caughtException = setjmp( exceptionNode.jb ); \ if( caughtException == 0 ) -#define CATCH( code ) int code = exceptionPopNode(); \ +#define CATCH( code ) int32_t code = exceptionPopNode(); \ if( caughtException == 1 ) -#define FINALLY( code ) int code = exceptionPopNode(); +#define FINALLY( code ) int32_t code = exceptionPopNode(); #define END_TRY } while( 0 ); diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h index 7edd032034c18a3120647525f38ed64e86d0cd08..866bde0938499c68a357a86fe90972603708008a 100644 --- a/src/util/inc/tarray.h +++ b/src/util/inc/tarray.h @@ -106,6 +106,12 @@ void taosArrayCopy(SArray* pDst, const SArray* pSrc); */ SArray* taosArrayClone(const SArray* pSrc); +/** + * clear the array (remove all element) + * @param pArray + */ +void taosArrayClear(SArray* pArray); + /** * destroy array list * @param pArray diff --git a/src/util/src/exception.c b/src/util/src/exception.c index 7f8f91c784b7a07078a2801a373d608693178db7..3d2949c093d47aeb41d4e10cd566ba9975f44457 100644 --- a/src/util/src/exception.c +++ b/src/util/src/exception.c @@ -14,7 +14,7 @@ int32_t exceptionPopNode() { return node->code; } -void exceptionThrow( int code ) { +void exceptionThrow( int32_t code ) { expList->code = code; longjmp( expList->jb, 1 ); } @@ -38,21 +38,27 @@ static void cleanupWrapper_void_ptr( SCleanupAction* ca ) { static void cleanupWrapper_int_int( SCleanupAction* ca ) { int (*func)( int ) = ca->func; - func( (int)(intptr_t)(ca->arg1.Int) ); + func( ca->arg1.Int ); } -static void cleanupWrapper_void_void( SCleanupAction* ca ) { +static void cleanupWrapper_void( SCleanupAction* ca ) { void (*func)() = ca->func; func(); } +static void cleanupWrapper_int_ptr( SCleanupAction* ca ) { + int (*func)( void* ) = ca->func; + func( ca->arg1.Ptr ); +} + typedef void (*wrapper)(SCleanupAction*); static wrapper wrappers[] = { cleanupWrapper_void_ptr_ptr, cleanupWrapper_void_ptr_bool, cleanupWrapper_void_ptr, cleanupWrapper_int_int, - cleanupWrapper_void_void, + cleanupWrapper_void, + cleanupWrapper_int_ptr, }; @@ -107,6 +113,15 @@ void cleanupPush_void( bool failOnly, void* func ) { ca->func = func; } +void cleanupPush_int_ptr( bool failOnly, void* func, void* arg ) { + assert( expList->numCleanupAction < expList->maxCleanupAction ); + + SCleanupAction *ca = expList->cleanupActions + expList->numCleanupAction++; + ca->wrapper = 5; + ca->failOnly = failOnly; + ca->func = func; + ca->arg1.Ptr = arg; +} int32_t cleanupGetActionCount() { @@ -118,8 +133,9 @@ static void doExecuteCleanup( SExceptionNode* node, int32_t anchor, bool failed while( node->numCleanupAction > anchor ) { --node->numCleanupAction; SCleanupAction *ca = node->cleanupActions + node->numCleanupAction; - if( failed || !(ca->failOnly) ) + if( failed || !(ca->failOnly) ) { wrappers[ca->wrapper]( ca ); + } } } diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index 5ef4417710adbaa7151ee19b2f276653d69a3067..5198597ff71dbc14e622f5b2a8fa9189ddaaf2b4 100755 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -176,6 +176,11 @@ SArray* taosArrayClone(const SArray* pSrc) { return dst; } +void taosArrayClear(SArray* pArray) { + assert( pArray != NULL ); + pArray->size = 0; +} + void taosArrayDestroy(SArray* pArray) { if (pArray == NULL) { return; diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index 0bf93f6f2ddd81e715e7d9cf0b5abfd054635060..f9acf2bb10d36740dc5b82704e18256c4421ba5e 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -56,32 +56,46 @@ void run_test(TAOS* taos) { taos_query(taos, "drop database if exists test;"); usleep(100000); - taos_query(taos, "create database test tables 5;"); + //taos_query(taos, "create database test tables 5;"); + taos_query(taos, "create database test;"); usleep(100000); taos_query(taos, "use test;"); + usleep(100000); - taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);"); - - taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');"); - taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');"); - taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');"); - taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');"); - taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');"); - taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');"); - taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');"); - taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');"); - taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');"); - taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');"); - taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');"); - taos_query(taos, "insert into t3 using meters tags('tianjin', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t4 using meters tags('wuhan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t5 using meters tags('jinan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t6 using meters tags('haikou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t7 using meters tags('nanjing', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t8 using meters tags('lanzhou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); - taos_query(taos, "insert into t9 using meters tags('tokyo', 0) values('2020-01-01 00:01:02.000', 0, 'japan');"); + taos_query(taos, "create table meters(ts timestamp, a int) tags(area int);"); + + taos_query(taos, "create table t0 using meters tags(0);"); + taos_query(taos, "create table t1 using meters tags(1);"); + taos_query(taos, "create table t2 using meters tags(2);"); + taos_query(taos, "create table t3 using meters tags(3);"); + taos_query(taos, "create table t4 using meters tags(4);"); + taos_query(taos, "create table t5 using meters tags(5);"); + taos_query(taos, "create table t6 using meters tags(6);"); + taos_query(taos, "create table t7 using meters tags(7);"); + taos_query(taos, "create table t8 using meters tags(8);"); + taos_query(taos, "create table t9 using meters tags(9);"); + + taos_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);"); + taos_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);"); + taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);"); + taos_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);"); + taos_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);"); + taos_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);"); + taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);"); + taos_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);"); + taos_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);"); + taos_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);"); + taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);"); + taos_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);"); // super tables subscription + usleep(1000000); TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); TAOS_RES* res = taos_consume(tsub); @@ -90,23 +104,23 @@ void run_test(TAOS* taos) { res = taos_consume(tsub); check_row_count(__LINE__, res, 0); - taos_query(taos, "insert into t0 values('2020-01-01 00:03:00.000', 0, 'china');"); - taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0, 'china');"); + taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);"); + taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 2); - taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0, 'UK');"); - taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0, 'UK');"); + taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);"); + taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 2); - taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0, 'china');"); + taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 1); // keep progress information and restart subscription taos_unsubscribe(tsub, 1); - taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0, 'china');"); + taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);"); tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); res = taos_consume(tsub); check_row_count(__LINE__, res, 24); @@ -133,7 +147,7 @@ void run_test(TAOS* taos) { res = taos_consume(tsub); check_row_count(__LINE__, res, 0); - taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0, 'china');"); + taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 1); @@ -197,7 +211,7 @@ int main(int argc, char *argv[]) { // init TAOS taos_init(); - TAOS* taos = taos_connect(host, user, passwd, "test", 0); + TAOS* taos = taos_connect(host, user, passwd, "", 0); if (taos == NULL) { printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); exit(1); @@ -209,6 +223,7 @@ int main(int argc, char *argv[]) { exit(0); } + taos_query(taos, "use test;"); TAOS_SUB* tsub = NULL; if (async) { // create an asynchronized subscription, the callback function will be called every 1s diff --git a/tests/script/general/cache/new_metrics.sim b/tests/script/general/cache/new_metrics.sim index 64170ec67ee7465dd75c87bca694048788ee183a..0e304d9acb0ec3ceba18993846f9f6d8269ff966 100644 --- a/tests/script/general/cache/new_metrics.sim +++ b/tests/script/general/cache/new_metrics.sim @@ -27,7 +27,7 @@ sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol bool) $i = 0 while $i < 5 $tb = $tbPrefix . $i - sql create table $tb using $mt tags( 0 ) + sql create table $tb using $mt tags( $i ) $x = 0 while $x < $rowNum $val = $x * 60000