提交 a3693762 编写于 作者: weixin_48148422's avatar weixin_48148422

super table subscribe is done

上级 9dd3e62b
...@@ -589,25 +589,25 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -589,25 +589,25 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} else { } else {
pVgroupInfo = &pTableMeta->vgroupInfo; pVgroupInfo = &pTableMeta->vgroupInfo;
} }
tscSetDnodeIpList(pSql, pVgroupInfo); tscSetDnodeIpList(pSql, pVgroupInfo);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->sid); pTableIdInfo->tid = htonl(pTableMeta->sid);
pTableIdInfo->uid = htobe64(pTableMeta->uid); pTableIdInfo->uid = htobe64(pTableMeta->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pQueryMsg->numOfTables = htonl(1); // set the number of tables pQueryMsg->numOfTables = htonl(1); // set the number of tables
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} else { } else {
int32_t index = pTableMetaInfo->vgroupIndex; int32_t index = pTableMetaInfo->vgroupIndex;
int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(index >= 0 && index < numOfVgroups); assert(index >= 0 && index < numOfVgroups);
tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups); tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info // set the vgroup info
...@@ -624,7 +624,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -624,7 +624,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pItem->tid); pTableIdInfo->tid = htonl(pItem->tid);
pTableIdInfo->uid = htobe64(pItem->uid); pTableIdInfo->uid = htobe64(pItem->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid));
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} }
} }
......
...@@ -236,8 +236,10 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { ...@@ -236,8 +236,10 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
taosArrayDestroy(pSub->progress); taosArrayDestroy(pSub->progress);
pSub->progress = progress; pSub->progress = progress;
taosArraySort( tables, tscCompareTidTags ); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
tscBuildVgroupTableInfo( pTableMetaInfo, tables ); taosArraySort( tables, tscCompareTidTags );
tscBuildVgroupTableInfo( pTableMetaInfo, tables );
}
taosArrayDestroy(tables); taosArrayDestroy(tables);
return 1; return 1;
...@@ -274,16 +276,9 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { ...@@ -274,16 +276,9 @@ static int tscLoadSubscriptionProgress(SSub* pSub) {
return 0; return 0;
} }
if (fgets(buf, sizeof(buf), fp) == NULL || atoi(buf) < 0) {
tscTrace("invalid subscription progress file: %s", pSub->topic);
fclose(fp);
return 0;
}
int numOfTables = atoi(buf);
SArray* progress = pSub->progress; SArray* progress = pSub->progress;
taosArrayClear(progress); taosArrayClear(progress);
for (int i = 0; i < numOfTables; i++) { while( 1 ) {
if (fgets(buf, sizeof(buf), fp) == NULL) { if (fgets(buf, sizeof(buf), fp) == NULL) {
fclose(fp); fclose(fp);
return 0; return 0;
...@@ -296,7 +291,7 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { ...@@ -296,7 +291,7 @@ static int tscLoadSubscriptionProgress(SSub* pSub) {
fclose(fp); fclose(fp);
taosArraySort(progress, tscCompareSubscriptionProgress); taosArraySort(progress, tscCompareSubscriptionProgress);
tscTrace("subscription progress loaded, %d tables: %s", numOfTables, pSub->topic); tscTrace("subscription progress loaded, %d tables: %s", taosArrayGetSize(progress), pSub->topic);
return 1; return 1;
} }
...@@ -317,6 +312,7 @@ void tscSaveSubscriptionProgress(void* sub) { ...@@ -317,6 +312,7 @@ void tscSaveSubscriptionProgress(void* sub) {
} }
fputs(pSub->pSql->sqlstr, fp); fputs(pSub->pSql->sqlstr, fp);
fprintf(fp, "\n");
for(size_t i = 0; i < taosArrayGetSize(pSub->progress); i++) { for(size_t i = 0; i < taosArrayGetSize(pSub->progress); i++) {
SSubscriptionProgress* p = taosArrayGet(pSub->progress, i); SSubscriptionProgress* p = taosArrayGet(pSub->progress, i);
fprintf(fp, "%" PRId64 ":%" PRId64 "\n", p->uid, p->key); fprintf(fp, "%" PRId64 ":%" PRId64 "\n", p->uid, p->key);
...@@ -387,20 +383,19 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -387,20 +383,19 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
tscTrace("begin table synchronization"); tscTrace("begin table synchronization");
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
tscTrace("table synchronization completed"); tscTrace("table synchronization completed");
} else {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
uint32_t type = pQueryInfo->type;
// taos_free_result_imp(pSql, 1);
pRes->numOfRows = 1;
//pRes->numOfTotal = 0;
pRes->qhandle = 0;
pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type;
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0;
} }
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
uint32_t type = pQueryInfo->type;
tscFreeSqlResult(pSql);
pRes->numOfRows = 1;
pRes->qhandle = 0;
pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type;
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0;
pSql->fp = asyncCallback; pSql->fp = asyncCallback;
pSql->param = pSql; pSql->param = pSql;
tscDoQuery(pSql); tscDoQuery(pSql);
......
...@@ -4184,11 +4184,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool ...@@ -4184,11 +4184,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
if (!isSTableQuery && isFirstLastRowQuery(pQuery)) { // in case of last_row query, invoke a different API. if (!isSTableQuery && isFirstLastRowQuery(pQuery)) { // in case of last_row query, invoke a different API.
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo); pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo);
} else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) { } else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
} }
// create the table query support structures
createTableQueryInfo(pQInfo);
} }
pQInfo->tsdb = tsdb; pQInfo->tsdb = tsdb;
...@@ -4608,12 +4606,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4608,12 +4606,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* to ensure that, we can reset the query range once query on a meter is completed. * to ensure that, we can reset the query range once query on a meter is completed.
*/ */
pQInfo->tableIndex++; pQInfo->tableIndex++;
pInfo->lastKey = pQuery->lastKey;
STableIdInfo tidInfo; STableIdInfo tidInfo;
tidInfo.uid = item->id.uid; tidInfo.uid = item->id.uid;
tidInfo.tid = item->id.tid; tidInfo.tid = item->id.tid;
tidInfo.key = pInfo->lastKey; tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
// if the buffer is full or group by each table, we need to jump out of the loop // if the buffer is full or group by each table, we need to jump out of the loop
...@@ -5198,6 +5195,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p ...@@ -5198,6 +5195,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key); pTableIdInfo->key = htobe64(pTableIdInfo->key);
printf("createTableIdList: uid = %ld, key = %ld\n", pTableIdInfo->uid, pTableIdInfo->key);
taosArrayPush(*pTableIdList, pTableIdInfo); taosArrayPush(*pTableIdList, pTableIdInfo);
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} }
...@@ -5761,9 +5759,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5761,9 +5759,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
// not a problem at present because we only use their 1st int64_t field // not a problem at present because we only use their 1st int64_t field
STableIdInfo* pTableId = taosArraySearch( pTableIdList, compareTableIdInfo, &id ); STableIdInfo* pTableId = taosArraySearch( pTableIdList, compareTableIdInfo, &id );
if (pTableId != NULL ) { if (pTableId != NULL ) {
printf("create QInfoImpl: %ld %ld\n", pTableId->uid, pTableId->key);
window.skey = pTableId->key; window.skey = pTableId->key;
} else { } else {
window.skey = 0; window.skey = INT64_MIN;
} }
item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window); item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window);
item.info->groupIdx = i; item.info->groupIdx = i;
...@@ -6060,20 +6059,36 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi ...@@ -6060,20 +6059,36 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
} }
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) { } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true; isSTableQuery = true;
STableIdInfo *id = taosArrayGet(pTableIdList, 0); // TODO: need a macro from TSDB to check if table is super table,
// also note there's possiblity that only one table in the super table
// group by normal column, do not pass the group by condition to tsdb to group table into different group if (taosArrayGetSize(pTableIdList) == 1) {
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; STableIdInfo *id = taosArrayGet(pTableIdList, 0);
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) { // if array size is 1 and assert super table
numOfGroupByCols = 0;
} // group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
// todo handle the error if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) {
/*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, numOfGroupByCols = 0;
numOfGroupByCols); }
if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
code = TSDB_CODE_SUCCESS; // todo handle the error
goto _over; /*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex,
numOfGroupByCols);
if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
code = TSDB_CODE_SUCCESS;
goto _over;
}
} else {
groupInfo.numOfTables = taosArrayGetSize(pTableIdList);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
SArray* sa = taosArrayInit(groupInfo.numOfTables, sizeof(STableId));
for(int32_t i = 0; i < groupInfo.numOfTables; ++i) {
STableIdInfo* tableId = taosArrayGet(pTableIdList, i);
taosArrayPush(sa, tableId);
}
taosArrayPush(pTableGroup, &sa);
groupInfo.pGroupList = pTableGroup;
} }
} else { } else {
assert(0); assert(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册