提交 9dd3e62b 编写于 作者: weixin_48148422's avatar weixin_48148422

build VgroupTableInfo

上级 50f432c6
...@@ -438,6 +438,9 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); ...@@ -438,6 +438,9 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
int32_t tscCompareTidTags(const void* p1, const void* p2);
void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -61,14 +61,11 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) { ...@@ -61,14 +61,11 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) {
return 0; return 0;
} }
SSub* pSub = (SSub*)sub; SSub* pSub = (SSub*)sub;
if (pSub->progress == NULL) {
return 0;
}
SSubscriptionProgress target = {.uid = uid, .key = 0}; SSubscriptionProgress target = {.uid = uid, .key = 0};
SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target); SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target);
if (p == NULL) { if (p == NULL) {
return 0; return INT64_MIN;
} }
return p->key; return p->key;
} }
...@@ -82,61 +79,90 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { ...@@ -82,61 +79,90 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target); SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target);
if (p != NULL) { if (p != NULL) {
p->key = ts; p->key = ts;
return;
} }
}
taosArrayPush(pSub->progress, &target);
taosArraySort(pSub->progress, tscCompareSubscriptionProgress); static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param);
pSql->res.code = code;
sem_post(&pSql->rspSem);
} }
static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) { static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) {
SSub* pSub = calloc(1, sizeof(SSub)); SSub* pSub = NULL;
if (pSub == NULL) {
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("failed to allocate memory for subscription");
return NULL;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); TRY( 8 ) {
if (pSql == NULL) { SSqlObj* pSql = calloc_throw(1, sizeof(SSqlObj));
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; CLEANUP_PUSH_FREE(true, pSql);
tscError("failed to allocate SSqlObj for subscription"); SSqlCmd *pCmd = &pSql->cmd;
goto _pSql_failed; SSqlRes *pRes = &pSql->res;
}
pSql->signature = pSql; if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
pSql->pTscObj = pObj; THROW(TAOS_SYSTEM_ERROR(errno));
}
CLEANUP_PUSH_INT_PTR(true, tsem_destroy, &pSql->rspSem);
char* sqlstr = (char*)malloc(strlen(sql) + 1); pSql->signature = pSql;
if (sqlstr == NULL) { pSql->param = pSql;
tscError("failed to allocate sql string for subscription"); pSql->pTscObj = pObj;
goto failed; pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
} pSql->fp = asyncCallback;
strcpy(sqlstr, sql);
strtolower(sqlstr, sqlstr);
pSql->sqlstr = sqlstr;
tsem_init(&pSql->rspSem, 0, 0); int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
THROW(code);
}
CLEANUP_PUSH_FREE(true, pCmd->payload);
SSqlRes *pRes = &pSql->res; pRes->qhandle = 0;
pRes->numOfRows = 1; pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pSql->pSubscription = pSub;
pSub->pSql = pSql;
pSub->signature = pSub;
strncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->topic[sizeof(pSub->topic) - 1] = 0;
return pSub;
failed: pSql->sqlstr = strdup_throw(sql);
tfree(sqlstr); CLEANUP_PUSH_FREE(true, pSql->sqlstr);
strtolower(pSql->sqlstr, pSql->sqlstr);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
// wait for the callback function to post the semaphore
sem_wait(&pSql->rspSem);
code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) {
tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code));
THROW( code );
}
if (pSql->cmd.command != TSDB_SQL_SELECT) {
tscError("only 'select' statement is allowed in subscription: %s", pSub->topic);
THROW( -1 ); // TODO
}
_pSql_failed: pSub = calloc_throw(1, sizeof(SSub));
tfree(pSql); CLEANUP_PUSH_FREE(true, pSub);
tfree(pSub); pSql->pSubscription = pSub;
return NULL; pSub->pSql = pSql;
pSub->signature = pSub;
strncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->topic[sizeof(pSub->topic) - 1] = 0;
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (pSub->progress == NULL) {
THROW(TSDB_CODE_CLI_OUT_OF_MEMORY);
}
CLEANUP_EXECUTE();
} CATCH( code ) {
tscError("failed to create subscription object: %s", tstrerror(code));
CLEANUP_EXECUTE();
pSub = NULL;
} END_TRY
return pSub;
} }
...@@ -152,41 +178,67 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) { ...@@ -152,41 +178,67 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) {
taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer); taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer);
} }
static void waitParseComplete(void *param, TAOS_RES *tres, int code) {
assert(param != NULL); static SArray* getTableList( SSqlObj* pSql ) {
SSqlObj *pSql = ((SSqlObj *)param); const char* p = strstr( pSql->sqlstr, " from " );
char* sql = alloca(strlen(p) + 32);
pSql->res.code = code; sprintf(sql, "select tbid(area)%s", p);
sem_post(&pSql->rspSem); int code = taos_query( pSql->pTscObj, sql );
if (code != TSDB_CODE_SUCCESS) {
tscError("failed to retrieve table id: %s", tstrerror(code));
return NULL;
}
TAOS_RES* res = taos_use_result( pSql->pTscObj );
TAOS_ROW row;
SArray* result = taosArrayInit( 128, sizeof(STidTags) );
while ((row = taos_fetch_row(res))) {
STidTags tags;
memcpy(&tags, row[0], sizeof(tags));
taosArrayPush(result, &tags);
}
return result;
} }
int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
SSqlObj* pSql = pSub->pSql; SSqlObj* pSql = pSub->pSql;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
tscAllocPayload( pCmd, TSDB_DEFAULT_PAYLOAD_SIZE );
pSql->fp = waitParseComplete; pSub->lastSyncTime = taosGetTimestampMs();
pSql->param = pSql;
int code = tsParseSql(pSql, false); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) { if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
// wait for the callback function to post the semaphore STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
sem_wait(&pSql->rspSem); SSubscriptionProgress target = {.uid = pTableMeta->uid, .key = 0};
code = pSql->res.code; SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target);
} if (p == NULL) {
if (code != TSDB_CODE_SUCCESS) { taosArrayClear(pSub->progress);
tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code)); taosArrayPush(pSub->progress, &target);
return 0; }
return 1;
} }
if (pCmd->command != TSDB_SQL_SELECT) { SArray* tables = getTableList(pSql);
tscError("only 'select' statement is allowed in subscription: %s", pSub->topic); size_t numOfTables = taosArrayGetSize(tables);
return 0;
SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress));
for( size_t i = 0; i < numOfTables; i++ ) {
STidTags* tt = taosArrayGet( tables, i );
SSubscriptionProgress p = { .uid = tt->uid };
p.key = tscGetSubscriptionProgress(pSub, tt->uid);
taosArrayPush(progress, &p);
} }
taosArraySort(progress, tscCompareSubscriptionProgress);
taosArrayDestroy(pSub->progress); taosArrayDestroy(pSub->progress);
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress)); pSub->progress = progress;
pSub->lastSyncTime = taosGetTimestampMs();
taosArraySort( tables, tscCompareTidTags );
tscBuildVgroupTableInfo( pTableMetaInfo, tables );
taosArrayDestroy(tables);
return 1; return 1;
} }
...@@ -229,11 +281,11 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { ...@@ -229,11 +281,11 @@ static int tscLoadSubscriptionProgress(SSub* pSub) {
} }
int numOfTables = atoi(buf); int numOfTables = atoi(buf);
SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress)); SArray* progress = pSub->progress;
taosArrayClear(progress);
for (int i = 0; i < numOfTables; i++) { for (int i = 0; i < numOfTables; i++) {
if (fgets(buf, sizeof(buf), fp) == NULL) { if (fgets(buf, sizeof(buf), fp) == NULL) {
fclose(fp); fclose(fp);
free(progress);
return 0; return 0;
} }
SSubscriptionProgress p; SSubscriptionProgress p;
...@@ -244,8 +296,6 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { ...@@ -244,8 +296,6 @@ static int tscLoadSubscriptionProgress(SSub* pSub) {
fclose(fp); fclose(fp);
taosArraySort(progress, tscCompareSubscriptionProgress); taosArraySort(progress, tscCompareSubscriptionProgress);
taosArrayDestroy(pSub->progress);
pSub->progress = progress;
tscTrace("subscription progress loaded, %d tables: %s", numOfTables, pSub->topic); tscTrace("subscription progress loaded, %d tables: %s", numOfTables, pSub->topic);
return 1; return 1;
} }
...@@ -333,12 +383,28 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -333,12 +383,28 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
for (int retry = 0; retry < 3; retry++) { for (int retry = 0; retry < 3; retry++) {
tscRemoveFromSqlList(pSql); tscRemoveFromSqlList(pSql);
pSql->fp = waitParseComplete; if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) {
tscTrace("begin table synchronization");
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
tscTrace("table synchronization completed");
} else {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
uint32_t type = pQueryInfo->type;
// taos_free_result_imp(pSql, 1);
pRes->numOfRows = 1;
//pRes->numOfTotal = 0;
pRes->qhandle = 0;
pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type;
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0;
}
pSql->fp = asyncCallback;
pSql->param = pSql; pSql->param = pSql;
tscDoQuery(pSql); tscDoQuery(pSql);
if (pRes->code == TSDB_CODE_ACTION_IN_PROGRESS) { sem_wait(&pSql->rspSem);
sem_wait(&pSql->rspSem);
}
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
continue; continue;
......
...@@ -463,77 +463,51 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* ...@@ -463,77 +463,51 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
} }
} }
int32_t tagsOrderCompar(const void* p1, const void* p2) { int32_t tscCompareTidTags(const void* p1, const void* p2) {
STidTags* t1 = (STidTags*) p1; const STidTags* t1 = (const STidTags*) p1;
STidTags* t2 = (STidTags*) p2; const STidTags* t2 = (const STidTags*) p2;
if (t1->vgId != t2->vgId) { if (t1->vgId != t2->vgId) {
return (t1->vgId > t2->vgId)? 1:-1; return (t1->vgId > t2->vgId) ? 1 : -1;
} else { }
if (t1->tid != t2->tid) { if (t1->tid != t2->tid) {
return (t1->tid > t2->tid)? 1:-1; return (t1->tid > t2->tid) ? 1 : -1;
} else {
return 0;
}
} }
return 0;
} }
static void doBuildVgroupTableInfo(SArray* res, STableMetaInfo* pTableMetaInfo) { void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables) {
SArray* pGroup = taosArrayInit(4, sizeof(SVgroupTableInfo)); SArray* result = taosArrayInit( 4, sizeof(SVgroupTableInfo) );
SArray* vgTables = NULL;
SArray* vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo)); STidTags* prev = NULL;
int32_t size = taosArrayGetSize(res);
size_t numOfTables = taosArrayGetSize( tables );
STidTags* prev = taosArrayGet(res, 0); for( size_t i = 0; i < numOfTables; i++ ) {
int32_t prevVgId = prev->vgId; STidTags* tt = taosArrayGet( tables, i );
STableIdInfo item = {.uid = prev->uid, .tid = prev->tid, .key = INT64_MIN}; if( prev == NULL || tt->vgId != prev->vgId ) {
taosArrayPush(vgTableIdItem, &item);
for(int32_t k = 1; k < size; ++k) {
STidTags* t1 = taosArrayGet(res, k);
if (prevVgId != t1->vgId) {
SVgroupTableInfo info = {0};
SVgroupsInfo* pvg = pTableMetaInfo->vgroupList; SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;
for(int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (prevVgId == pvg->vgroups[m].vgId) { SVgroupTableInfo info = { 0 };
for( int32_t m = 0; m < pvg->numOfVgroups; ++m ) {
if( tt->vgId == pvg->vgroups[m].vgId ) {
info.vgInfo = pvg->vgroups[m]; info.vgInfo = pvg->vgroups[m];
break; break;
} }
} }
assert( info.vgInfo.numOfIps != 0 );
assert(info.vgInfo.numOfIps != 0);
info.itemList = vgTableIdItem; vgTables = taosArrayInit( 4, sizeof(STableIdInfo) );
taosArrayPush(pGroup, &info); info.itemList = vgTables;
taosArrayPush( result, &info );
vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo));
STableIdInfo item1 = {.uid = t1->uid, .tid = t1->tid, .key = INT64_MIN};
taosArrayPush(vgTableIdItem, &item1);
prevVgId = t1->vgId;
} else {
taosArrayPush(vgTableIdItem, &item);
}
}
if (taosArrayGetSize(vgTableIdItem) > 0) {
SVgroupTableInfo info = {0};
SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;
for(int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (prevVgId == pvg->vgroups[m].vgId) {
info.vgInfo = pvg->vgroups[m];
break;
}
} }
assert(info.vgInfo.numOfIps != 0); STableIdInfo item = { .uid = tt->uid, .tid = tt->tid, .key = INT64_MIN };
info.itemList = vgTableIdItem; taosArrayPush( vgTables, &item );
taosArrayPush(pGroup, &info); prev = tt;
} }
pTableMetaInfo->pVgroupTables = pGroup; pTableMetaInfo->pVgroupTables = result;
} }
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) { static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
...@@ -627,8 +601,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -627,8 +601,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
qsort(p1->pIdTagList, p1->num, p1->tagSize, tagsOrderCompar); qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags);
qsort(p2->pIdTagList, p2->num, p2->tagSize, tagsOrderCompar); qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
...@@ -668,11 +642,11 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -668,11 +642,11 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0); STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
doBuildVgroupTableInfo(s1, pTableMetaInfo1); tscBuildVgroupTableInfo(pTableMetaInfo1, s1);
SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
doBuildVgroupTableInfo(s2, pTableMetaInfo2); tscBuildVgroupTableInfo(pTableMetaInfo2, s2);
pSupporter->pState->numOfCompleted = 0; pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->code = 0; pSupporter->pState->code = 0;
......
...@@ -2165,7 +2165,7 @@ void* malloc_throw(size_t size) { ...@@ -2165,7 +2165,7 @@ void* malloc_throw(size_t size) {
} }
void* calloc_throw(size_t nmemb, size_t size) { void* calloc_throw(size_t nmemb, size_t size) {
void* p = malloc(size); void* p = calloc(nmemb, size);
if (p == NULL) { if (p == NULL) {
THROW(TSDB_CODE_CLI_OUT_OF_MEMORY); THROW(TSDB_CODE_CLI_OUT_OF_MEMORY);
} }
......
...@@ -1522,7 +1522,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC ...@@ -1522,7 +1522,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC
} }
if (pTable->type != TSDB_SUPER_TABLE) { if (pTable->type != TSDB_SUPER_TABLE) {
uError("%p query normal tag not allowed, uid:%, tid:%d, name:%s" PRIu64, uError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s",
tsdb, uid, pTable->tableId.tid, pTable->name); tsdb, uid, pTable->tableId.tid, pTable->name);
return TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client return TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册