From 9dd3e62b4b11b96e8f6db1b51ee09a64deace8cf Mon Sep 17 00:00:00 2001 From: localvar Date: Mon, 11 May 2020 17:48:52 +0800 Subject: [PATCH] build VgroupTableInfo --- src/client/inc/tsclient.h | 3 + src/client/src/tscSub.c | 222 +++++++++++++++++++++++------------ src/client/src/tscSubquery.c | 100 ++++++---------- src/client/src/tscUtil.c | 2 +- src/tsdb/src/tsdbRead.c | 2 +- 5 files changed, 186 insertions(+), 143 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 6ea1ee6440..a7eec31388 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -438,6 +438,9 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); +int32_t tscCompareTidTags(const void* p1, const void* p2); +void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables); + #ifdef __cplusplus } #endif diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 009372f3f7..5f20ee4fbc 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -61,14 +61,11 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) { return 0; } SSub* pSub = (SSub*)sub; - if (pSub->progress == NULL) { - return 0; - } SSubscriptionProgress target = {.uid = uid, .key = 0}; SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target); if (p == NULL) { - return 0; + return INT64_MIN; } return p->key; } @@ -82,61 +79,90 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target); if (p != NULL) { p->key = ts; - return; } +} - taosArrayPush(pSub->progress, &target); - taosArraySort(pSub->progress, tscCompareSubscriptionProgress); + +static void asyncCallback(void *param, TAOS_RES *tres, int code) { + assert(param != NULL); + SSqlObj *pSql = ((SSqlObj *)param); + + pSql->res.code = code; + sem_post(&pSql->rspSem); } static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) { - SSub* pSub = calloc(1, sizeof(SSub)); - if (pSub == NULL) { - terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; - tscError("failed to allocate memory for subscription"); - return NULL; - } + SSub* pSub = NULL; - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); - if (pSql == NULL) { - terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; - tscError("failed to allocate SSqlObj for subscription"); - goto _pSql_failed; - } + TRY( 8 ) { + SSqlObj* pSql = calloc_throw(1, sizeof(SSqlObj)); + CLEANUP_PUSH_FREE(true, pSql); + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; - pSql->signature = pSql; - pSql->pTscObj = pObj; + if (tsem_init(&pSql->rspSem, 0, 0) == -1) { + THROW(TAOS_SYSTEM_ERROR(errno)); + } + CLEANUP_PUSH_INT_PTR(true, tsem_destroy, &pSql->rspSem); - char* sqlstr = (char*)malloc(strlen(sql) + 1); - if (sqlstr == NULL) { - tscError("failed to allocate sql string for subscription"); - goto failed; - } - strcpy(sqlstr, sql); - strtolower(sqlstr, sqlstr); - pSql->sqlstr = sqlstr; + pSql->signature = pSql; + pSql->param = pSql; + pSql->pTscObj = pObj; + pSql->maxRetry = TSDB_MAX_REPLICA_NUM; + pSql->fp = asyncCallback; - tsem_init(&pSql->rspSem, 0, 0); + int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); + if (code != TSDB_CODE_SUCCESS) { + THROW(code); + } + CLEANUP_PUSH_FREE(true, pCmd->payload); - SSqlRes *pRes = &pSql->res; - pRes->numOfRows = 1; - pRes->numOfTotal = 0; - - pSql->pSubscription = pSub; - pSub->pSql = pSql; - pSub->signature = pSub; - strncpy(pSub->topic, topic, sizeof(pSub->topic)); - pSub->topic[sizeof(pSub->topic) - 1] = 0; - return pSub; + pRes->qhandle = 0; + pRes->numOfRows = 1; -failed: - tfree(sqlstr); + pSql->sqlstr = strdup_throw(sql); + CLEANUP_PUSH_FREE(true, pSql->sqlstr); + strtolower(pSql->sqlstr, pSql->sqlstr); + + code = tsParseSql(pSql, false); + if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + // wait for the callback function to post the semaphore + sem_wait(&pSql->rspSem); + code = pSql->res.code; + } + if (code != TSDB_CODE_SUCCESS) { + tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code)); + THROW( code ); + } + + if (pSql->cmd.command != TSDB_SQL_SELECT) { + tscError("only 'select' statement is allowed in subscription: %s", pSub->topic); + THROW( -1 ); // TODO + } -_pSql_failed: - tfree(pSql); - tfree(pSub); - return NULL; + pSub = calloc_throw(1, sizeof(SSub)); + CLEANUP_PUSH_FREE(true, pSub); + pSql->pSubscription = pSub; + pSub->pSql = pSql; + pSub->signature = pSub; + strncpy(pSub->topic, topic, sizeof(pSub->topic)); + pSub->topic[sizeof(pSub->topic) - 1] = 0; + pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress)); + if (pSub->progress == NULL) { + THROW(TSDB_CODE_CLI_OUT_OF_MEMORY); + } + + CLEANUP_EXECUTE(); + + } CATCH( code ) { + tscError("failed to create subscription object: %s", tstrerror(code)); + CLEANUP_EXECUTE(); + pSub = NULL; + + } END_TRY + + return pSub; } @@ -152,41 +178,67 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) { taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer); } -static void waitParseComplete(void *param, TAOS_RES *tres, int code) { - assert(param != NULL); - SSqlObj *pSql = ((SSqlObj *)param); - - pSql->res.code = code; - sem_post(&pSql->rspSem); + +static SArray* getTableList( SSqlObj* pSql ) { + const char* p = strstr( pSql->sqlstr, " from " ); + char* sql = alloca(strlen(p) + 32); + sprintf(sql, "select tbid(area)%s", p); + int code = taos_query( pSql->pTscObj, sql ); + if (code != TSDB_CODE_SUCCESS) { + tscError("failed to retrieve table id: %s", tstrerror(code)); + return NULL; + } + + TAOS_RES* res = taos_use_result( pSql->pTscObj ); + TAOS_ROW row; + SArray* result = taosArrayInit( 128, sizeof(STidTags) ); + while ((row = taos_fetch_row(res))) { + STidTags tags; + memcpy(&tags, row[0], sizeof(tags)); + taosArrayPush(result, &tags); + } + + return result; } -int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { + +static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { SSqlObj* pSql = pSub->pSql; SSqlCmd* pCmd = &pSql->cmd; - tscAllocPayload( pCmd, TSDB_DEFAULT_PAYLOAD_SIZE ); - pSql->fp = waitParseComplete; - pSql->param = pSql; - int code = tsParseSql(pSql, false); - if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - // wait for the callback function to post the semaphore - sem_wait(&pSql->rspSem); - code = pSql->res.code; - } - if (code != TSDB_CODE_SUCCESS) { - tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code)); - return 0; + pSub->lastSyncTime = taosGetTimestampMs(); + + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { + STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; + SSubscriptionProgress target = {.uid = pTableMeta->uid, .key = 0}; + SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target); + if (p == NULL) { + taosArrayClear(pSub->progress); + taosArrayPush(pSub->progress, &target); + } + return 1; } - if (pCmd->command != TSDB_SQL_SELECT) { - tscError("only 'select' statement is allowed in subscription: %s", pSub->topic); - return 0; + SArray* tables = getTableList(pSql); + size_t numOfTables = taosArrayGetSize(tables); + + SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress)); + for( size_t i = 0; i < numOfTables; i++ ) { + STidTags* tt = taosArrayGet( tables, i ); + SSubscriptionProgress p = { .uid = tt->uid }; + p.key = tscGetSubscriptionProgress(pSub, tt->uid); + taosArrayPush(progress, &p); } + taosArraySort(progress, tscCompareSubscriptionProgress); taosArrayDestroy(pSub->progress); - pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress)); - pSub->lastSyncTime = taosGetTimestampMs(); + pSub->progress = progress; + + taosArraySort( tables, tscCompareTidTags ); + tscBuildVgroupTableInfo( pTableMetaInfo, tables ); + taosArrayDestroy(tables); return 1; } @@ -229,11 +281,11 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { } int numOfTables = atoi(buf); - SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress)); + SArray* progress = pSub->progress; + taosArrayClear(progress); for (int i = 0; i < numOfTables; i++) { if (fgets(buf, sizeof(buf), fp) == NULL) { fclose(fp); - free(progress); return 0; } SSubscriptionProgress p; @@ -244,8 +296,6 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { fclose(fp); taosArraySort(progress, tscCompareSubscriptionProgress); - taosArrayDestroy(pSub->progress); - pSub->progress = progress; tscTrace("subscription progress loaded, %d tables: %s", numOfTables, pSub->topic); return 1; } @@ -333,12 +383,28 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { for (int retry = 0; retry < 3; retry++) { tscRemoveFromSqlList(pSql); - pSql->fp = waitParseComplete; + if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) { + tscTrace("begin table synchronization"); + if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; + tscTrace("table synchronization completed"); + } else { + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + uint32_t type = pQueryInfo->type; + // taos_free_result_imp(pSql, 1); + pRes->numOfRows = 1; + //pRes->numOfTotal = 0; + pRes->qhandle = 0; + pSql->cmd.command = TSDB_SQL_SELECT; + pQueryInfo->type = type; + + tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0; + } + + pSql->fp = asyncCallback; pSql->param = pSql; tscDoQuery(pSql); - if (pRes->code == TSDB_CODE_ACTION_IN_PROGRESS) { - sem_wait(&pSql->rspSem); - } + sem_wait(&pSql->rspSem); if (pRes->code != TSDB_CODE_SUCCESS) { continue; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 073fffbc57..fbe19fc030 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -463,77 +463,51 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* } } -int32_t tagsOrderCompar(const void* p1, const void* p2) { - STidTags* t1 = (STidTags*) p1; - STidTags* t2 = (STidTags*) p2; +int32_t tscCompareTidTags(const void* p1, const void* p2) { + const STidTags* t1 = (const STidTags*) p1; + const STidTags* t2 = (const STidTags*) p2; if (t1->vgId != t2->vgId) { - return (t1->vgId > t2->vgId)? 1:-1; - } else { - if (t1->tid != t2->tid) { - return (t1->tid > t2->tid)? 1:-1; - } else { - return 0; - } + return (t1->vgId > t2->vgId) ? 1 : -1; + } + if (t1->tid != t2->tid) { + return (t1->tid > t2->tid) ? 1 : -1; } + return 0; } -static void doBuildVgroupTableInfo(SArray* res, STableMetaInfo* pTableMetaInfo) { - SArray* pGroup = taosArrayInit(4, sizeof(SVgroupTableInfo)); - - SArray* vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo)); - int32_t size = taosArrayGetSize(res); - - STidTags* prev = taosArrayGet(res, 0); - int32_t prevVgId = prev->vgId; - - STableIdInfo item = {.uid = prev->uid, .tid = prev->tid, .key = INT64_MIN}; - taosArrayPush(vgTableIdItem, &item); - - for(int32_t k = 1; k < size; ++k) { - STidTags* t1 = taosArrayGet(res, k); - if (prevVgId != t1->vgId) { - - SVgroupTableInfo info = {0}; - +void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables) { + SArray* result = taosArrayInit( 4, sizeof(SVgroupTableInfo) ); + SArray* vgTables = NULL; + STidTags* prev = NULL; + + size_t numOfTables = taosArrayGetSize( tables ); + for( size_t i = 0; i < numOfTables; i++ ) { + STidTags* tt = taosArrayGet( tables, i ); + + if( prev == NULL || tt->vgId != prev->vgId ) { SVgroupsInfo* pvg = pTableMetaInfo->vgroupList; - for(int32_t m = 0; m < pvg->numOfVgroups; ++m) { - if (prevVgId == pvg->vgroups[m].vgId) { + + SVgroupTableInfo info = { 0 }; + for( int32_t m = 0; m < pvg->numOfVgroups; ++m ) { + if( tt->vgId == pvg->vgroups[m].vgId ) { info.vgInfo = pvg->vgroups[m]; break; } } - - assert(info.vgInfo.numOfIps != 0); - info.itemList = vgTableIdItem; - taosArrayPush(pGroup, &info); - - vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo)); - STableIdInfo item1 = {.uid = t1->uid, .tid = t1->tid, .key = INT64_MIN}; - taosArrayPush(vgTableIdItem, &item1); - prevVgId = t1->vgId; - } else { - taosArrayPush(vgTableIdItem, &item); - } - } - - if (taosArrayGetSize(vgTableIdItem) > 0) { - SVgroupTableInfo info = {0}; - SVgroupsInfo* pvg = pTableMetaInfo->vgroupList; - - for(int32_t m = 0; m < pvg->numOfVgroups; ++m) { - if (prevVgId == pvg->vgroups[m].vgId) { - info.vgInfo = pvg->vgroups[m]; - break; - } + assert( info.vgInfo.numOfIps != 0 ); + + vgTables = taosArrayInit( 4, sizeof(STableIdInfo) ); + info.itemList = vgTables; + taosArrayPush( result, &info ); } - - assert(info.vgInfo.numOfIps != 0); - info.itemList = vgTableIdItem; - taosArrayPush(pGroup, &info); + + STableIdInfo item = { .uid = tt->uid, .tid = tt->tid, .key = INT64_MIN }; + taosArrayPush( vgTables, &item ); + prev = tt; } - - pTableMetaInfo->pVgroupTables = pGroup; + + pTableMetaInfo->pVgroupTables = result; } static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) { @@ -627,8 +601,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param; - qsort(p1->pIdTagList, p1->num, p1->tagSize, tagsOrderCompar); - qsort(p2->pIdTagList, p2->num, p2->tagSize, tagsOrderCompar); + qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags); + qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -668,11 +642,11 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0); - doBuildVgroupTableInfo(s1, pTableMetaInfo1); + tscBuildVgroupTableInfo(pTableMetaInfo1, s1); SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); - doBuildVgroupTableInfo(s2, pTableMetaInfo2); + tscBuildVgroupTableInfo(pTableMetaInfo2, s2); pSupporter->pState->numOfCompleted = 0; pSupporter->pState->code = 0; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 2ef7342d08..85735b14e1 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2165,7 +2165,7 @@ void* malloc_throw(size_t size) { } void* calloc_throw(size_t nmemb, size_t size) { - void* p = malloc(size); + void* p = calloc(nmemb, size); if (p == NULL) { THROW(TSDB_CODE_CLI_OUT_OF_MEMORY); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 6576e7d7f1..a999502897 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1522,7 +1522,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC } if (pTable->type != TSDB_SUPER_TABLE) { - uError("%p query normal tag not allowed, uid:%, tid:%d, name:%s" PRIu64, + uError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid, pTable->name); return TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client -- GitLab