diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9431a09bb0d1ece53ba5df78c03dcaee0f537aa0..32eee0854136aef9e449117c49f4aae586cee85f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -589,25 +589,25 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } else { pVgroupInfo = &pTableMeta->vgroupInfo; } - + tscSetDnodeIpList(pSql, pVgroupInfo); pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); - + STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableMeta->sid); pTableIdInfo->uid = htobe64(pTableMeta->uid); pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); - + pQueryMsg->numOfTables = htonl(1); // set the number of tables - + pMsg += sizeof(STableIdInfo); } else { int32_t index = pTableMetaInfo->vgroupIndex; int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); assert(index >= 0 && index < numOfVgroups); - + tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups); - + SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); // set the vgroup info @@ -624,7 +624,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pItem->tid); pTableIdInfo->uid = htobe64(pItem->uid); - pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); + pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pItem->uid)); pMsg += sizeof(STableIdInfo); } } diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 5f20ee4fbcf5ca2c67697207181a51bf2ba47c15..8918338265bcfa32498f6127326c0847ce4954f5 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -236,8 +236,10 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { taosArrayDestroy(pSub->progress); pSub->progress = progress; - taosArraySort( tables, tscCompareTidTags ); - tscBuildVgroupTableInfo( pTableMetaInfo, tables ); + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + taosArraySort( tables, tscCompareTidTags ); + tscBuildVgroupTableInfo( pTableMetaInfo, tables ); + } taosArrayDestroy(tables); return 1; @@ -274,16 +276,9 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { return 0; } - if (fgets(buf, sizeof(buf), fp) == NULL || atoi(buf) < 0) { - tscTrace("invalid subscription progress file: %s", pSub->topic); - fclose(fp); - return 0; - } - - int numOfTables = atoi(buf); SArray* progress = pSub->progress; taosArrayClear(progress); - for (int i = 0; i < numOfTables; i++) { + while( 1 ) { if (fgets(buf, sizeof(buf), fp) == NULL) { fclose(fp); return 0; @@ -296,7 +291,7 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { fclose(fp); taosArraySort(progress, tscCompareSubscriptionProgress); - tscTrace("subscription progress loaded, %d tables: %s", numOfTables, pSub->topic); + tscTrace("subscription progress loaded, %d tables: %s", taosArrayGetSize(progress), pSub->topic); return 1; } @@ -317,6 +312,7 @@ void tscSaveSubscriptionProgress(void* sub) { } fputs(pSub->pSql->sqlstr, fp); + fprintf(fp, "\n"); for(size_t i = 0; i < taosArrayGetSize(pSub->progress); i++) { SSubscriptionProgress* p = taosArrayGet(pSub->progress, i); fprintf(fp, "%" PRId64 ":%" PRId64 "\n", p->uid, p->key); @@ -387,20 +383,19 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { tscTrace("begin table synchronization"); if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; tscTrace("table synchronization completed"); - } else { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - - uint32_t type = pQueryInfo->type; - // taos_free_result_imp(pSql, 1); - pRes->numOfRows = 1; - //pRes->numOfTotal = 0; - pRes->qhandle = 0; - pSql->cmd.command = TSDB_SQL_SELECT; - pQueryInfo->type = type; - - tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0; } + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + uint32_t type = pQueryInfo->type; + tscFreeSqlResult(pSql); + pRes->numOfRows = 1; + pRes->qhandle = 0; + pSql->cmd.command = TSDB_SQL_SELECT; + pQueryInfo->type = type; + + tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0; + pSql->fp = asyncCallback; pSql->param = pSql; tscDoQuery(pSql); diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index d400d72a38539e3e2dab970d1f84d96c93e43368..8bd2052ef810009d423560612360c021cbfb4459 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -4184,11 +4184,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool if (!isSTableQuery && isFirstLastRowQuery(pQuery)) { // in case of last_row query, invoke a different API. pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo); } else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) { + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); } - - // create the table query support structures - createTableQueryInfo(pQInfo); } pQInfo->tsdb = tsdb; @@ -4608,12 +4606,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * to ensure that, we can reset the query range once query on a meter is completed. */ pQInfo->tableIndex++; - pInfo->lastKey = pQuery->lastKey; STableIdInfo tidInfo; tidInfo.uid = item->id.uid; tidInfo.tid = item->id.tid; - tidInfo.key = pInfo->lastKey; + tidInfo.key = pQuery->current->lastKey; taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); // if the buffer is full or group by each table, we need to jump out of the loop @@ -5198,6 +5195,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->key = htobe64(pTableIdInfo->key); +printf("createTableIdList: uid = %ld, key = %ld\n", pTableIdInfo->uid, pTableIdInfo->key); taosArrayPush(*pTableIdList, pTableIdInfo); pMsg += sizeof(STableIdInfo); } @@ -5761,9 +5759,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, // not a problem at present because we only use their 1st int64_t field STableIdInfo* pTableId = taosArraySearch( pTableIdList, compareTableIdInfo, &id ); if (pTableId != NULL ) { + printf("create QInfoImpl: %ld %ld\n", pTableId->uid, pTableId->key); window.skey = pTableId->key; } else { - window.skey = 0; + window.skey = INT64_MIN; } item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window); item.info->groupIdx = i; @@ -6060,20 +6059,36 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi } } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) { isSTableQuery = true; - STableIdInfo *id = taosArrayGet(pTableIdList, 0); - - // group by normal column, do not pass the group by condition to tsdb to group table into different group - int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; - if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) { - numOfGroupByCols = 0; - } - - // todo handle the error - /*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, - numOfGroupByCols); - if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query - code = TSDB_CODE_SUCCESS; - goto _over; + // TODO: need a macro from TSDB to check if table is super table, + // also note there's possiblity that only one table in the super table + if (taosArrayGetSize(pTableIdList) == 1) { + STableIdInfo *id = taosArrayGet(pTableIdList, 0); + // if array size is 1 and assert super table + + // group by normal column, do not pass the group by condition to tsdb to group table into different group + int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; + if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) { + numOfGroupByCols = 0; + } + + // todo handle the error + /*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, + numOfGroupByCols); + if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query + code = TSDB_CODE_SUCCESS; + goto _over; + } + } else { + groupInfo.numOfTables = taosArrayGetSize(pTableIdList); + SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); + + SArray* sa = taosArrayInit(groupInfo.numOfTables, sizeof(STableId)); + for(int32_t i = 0; i < groupInfo.numOfTables; ++i) { + STableIdInfo* tableId = taosArrayGet(pTableIdList, i); + taosArrayPush(sa, tableId); + } + taosArrayPush(pTableGroup, &sa); + groupInfo.pGroupList = pTableGroup; } } else { assert(0);