提交 b42a8214 编写于 作者: weixin_48148422's avatar weixin_48148422

TD-191: WIP

上级 95df099a
...@@ -624,7 +624,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -624,7 +624,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pItem->tid); pTableIdInfo->tid = htonl(pItem->tid);
pTableIdInfo->uid = htobe64(pItem->uid); pTableIdInfo->uid = htobe64(pItem->uid);
// pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} }
} }
...@@ -2354,6 +2354,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2354,6 +2354,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
for (int i = 0; i < numOfTables; i++) { for (int i = 0; i < numOfTables; i++) {
int64_t uid = htobe64(*(int64_t*)p); int64_t uid = htobe64(*(int64_t*)p);
p += sizeof(int64_t); p += sizeof(int64_t);
p += sizeof(int32_t); // skip tid
TSKEY key = htobe64(*(TSKEY*)p); TSKEY key = htobe64(*(TSKEY*)p);
p += sizeof(TSKEY); p += sizeof(TSKEY);
tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key); tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key);
......
...@@ -44,8 +44,7 @@ typedef struct SSub { ...@@ -44,8 +44,7 @@ typedef struct SSub {
int interval; int interval;
TAOS_SUBSCRIBE_CALLBACK fp; TAOS_SUBSCRIBE_CALLBACK fp;
void * param; void * param;
int numOfTables; SArray* progress;
SSubscriptionProgress * progress;
} SSub; } SSub;
...@@ -58,41 +57,36 @@ static int tscCompareSubscriptionProgress(const void* a, const void* b) { ...@@ -58,41 +57,36 @@ static int tscCompareSubscriptionProgress(const void* a, const void* b) {
} }
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) { TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) {
if (sub == NULL) if (sub == NULL) {
return 0; return 0;
}
SSub* pSub = (SSub*)sub; SSub* pSub = (SSub*)sub;
for (int s = 0, e = pSub->numOfTables; s < e;) { if (pSub->progress == NULL) {
int m = (s + e) / 2; return 0;
SSubscriptionProgress* p = pSub->progress + m;
if (p->uid > uid)
e = m;
else if (p->uid < uid)
s = m + 1;
else
return p->key;
} }
return 0; SSubscriptionProgress target = {.uid = uid, .key = 0};
SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target);
if (p == NULL) {
return 0;
}
return p->key;
} }
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
if( sub == NULL) if( sub == NULL)
return; return;
SSub* pSub = (SSub*)sub; SSub* pSub = (SSub*)sub;
for (int s = 0, e = pSub->numOfTables; s < e;) {
int m = (s + e) / 2; SSubscriptionProgress target = {.uid = uid, .key = ts};
SSubscriptionProgress* p = pSub->progress + m; SSubscriptionProgress* p = taosArraySearch(pSub->progress, tscCompareSubscriptionProgress, &target);
if (p->uid > uid) if (p != NULL) {
e = m; p->key = ts;
else if (p->uid < uid) return;
s = m + 1;
else {
if (ts >= p->key) p->key = ts;
break;
}
} }
taosArrayPush(pSub->progress, &target);
taosArraySort(pSub->progress, tscCompareSubscriptionProgress);
} }
...@@ -158,60 +152,40 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) { ...@@ -158,60 +152,40 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) {
taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer); taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer);
} }
static void waitParseComplete(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param);
pSql->res.code = code;
sem_post(&pSql->rspSem);
}
int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
int code = (uint8_t)tsParseSql(pSub->pSql, false); SSqlObj* pSql = pSub->pSql;
SSqlCmd* pCmd = &pSql->cmd;
tscAllocPayload( pCmd, TSDB_DEFAULT_PAYLOAD_SIZE );
pSql->fp = waitParseComplete;
pSql->param = pSql;
int code = tsParseSql(pSql, false);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
// wait for the callback function to post the semaphore
sem_wait(&pSql->rspSem);
code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("failed to parse sql statement: %s", pSub->topic); tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code));
return 0; return 0;
} }
SSqlCmd* pCmd = &pSub->pSql->cmd;
if (pCmd->command != TSDB_SQL_SELECT) { if (pCmd->command != TSDB_SQL_SELECT) {
tscError("only 'select' statement is allowed in subscription: %s", pSub->topic); tscError("only 'select' statement is allowed in subscription: %s", pSub->topic);
return 0; return 0;
} }
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0); taosArrayDestroy(pSub->progress);
int numOfTables = 0; pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (!UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
// numOfTables += pVnodeSidList->numOfSids;
// }
}
SSubscriptionProgress* progress = (SSubscriptionProgress*)calloc(numOfTables, sizeof(SSubscriptionProgress));
if (progress == NULL) {
tscError("failed to allocate memory for progress: %s", pSub->topic);
return 0;
}
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
numOfTables = 1;
int64_t uid = pTableMetaInfo->pTableMeta->uid;
progress[0].uid = uid;
progress[0].key = tscGetSubscriptionProgress(pSub, uid);
} else {
// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
// numOfTables = 0;
// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
// for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
// STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
// int64_t uid = pTableMetaInfo->uid;
// progress[numOfTables].uid = uid;
// progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
// }
// }
qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress);
}
free(pSub->progress);
pSub->numOfTables = numOfTables;
pSub->progress = progress;
pSub->lastSyncTime = taosGetTimestampMs(); pSub->lastSyncTime = taosGetTimestampMs();
return 1; return 1;
...@@ -255,23 +229,22 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { ...@@ -255,23 +229,22 @@ static int tscLoadSubscriptionProgress(SSub* pSub) {
} }
int numOfTables = atoi(buf); int numOfTables = atoi(buf);
SSubscriptionProgress* progress = calloc(numOfTables, sizeof(SSubscriptionProgress)); SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress));
for (int i = 0; i < numOfTables; i++) { for (int i = 0; i < numOfTables; i++) {
if (fgets(buf, sizeof(buf), fp) == NULL) { if (fgets(buf, sizeof(buf), fp) == NULL) {
fclose(fp); fclose(fp);
free(progress); free(progress);
return 0; return 0;
} }
int64_t uid, key; SSubscriptionProgress p;
sscanf(buf, "%" SCNd64 ":%" SCNd64, &uid, &key); sscanf(buf, "%" SCNd64 ":%" SCNd64, &p.uid, &p.key);
progress[i].uid = uid; taosArrayPush(progress, &p);
progress[i].key = key;
} }
fclose(fp); fclose(fp);
qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); taosArraySort(progress, tscCompareSubscriptionProgress);
pSub->numOfTables = numOfTables; taosArrayDestroy(pSub->progress);
pSub->progress = progress; pSub->progress = progress;
tscTrace("subscription progress loaded, %d tables: %s", numOfTables, pSub->topic); tscTrace("subscription progress loaded, %d tables: %s", numOfTables, pSub->topic);
return 1; return 1;
...@@ -294,11 +267,9 @@ void tscSaveSubscriptionProgress(void* sub) { ...@@ -294,11 +267,9 @@ void tscSaveSubscriptionProgress(void* sub) {
} }
fputs(pSub->pSql->sqlstr, fp); fputs(pSub->pSql->sqlstr, fp);
fprintf(fp, "\n%d\n", pSub->numOfTables); for(size_t i = 0; i < taosArrayGetSize(pSub->progress); i++) {
for (int i = 0; i < pSub->numOfTables; i++) { SSubscriptionProgress* p = taosArrayGet(pSub->progress, i);
int64_t uid = pSub->progress[i].uid; fprintf(fp, "%" PRId64 ":%" PRId64 "\n", p->uid, p->key);
TSKEY key = pSub->progress[i].key;
fprintf(fp, "%" PRId64 ":%" PRId64 "\n", uid, key);
} }
fclose(fp); fclose(fp);
...@@ -362,36 +333,20 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -362,36 +333,20 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
for (int retry = 0; retry < 3; retry++) { for (int retry = 0; retry < 3; retry++) {
tscRemoveFromSqlList(pSql); tscRemoveFromSqlList(pSql);
if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) { pSql->fp = waitParseComplete;
tscTrace("begin meter synchronization"); pSql->param = pSql;
char* sqlstr = pSql->sqlstr; tscDoQuery(pSql);
pSql->sqlstr = NULL; if (pRes->code == TSDB_CODE_ACTION_IN_PROGRESS) {
taos_free_result_imp(pSql, 0); sem_wait(&pSql->rspSem);
pSql->sqlstr = sqlstr;
taosCacheEmpty(tscCacheHandle);
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
tscTrace("meter synchronization completed");
} else {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
uint32_t type = pQueryInfo->type;
taos_free_result_imp(pSql, 1);
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->qhandle = 0;
pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type;
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0;
} }
tscDoQuery(pSql); if (pRes->code != TSDB_CODE_SUCCESS) {
if (pRes->code != TSDB_CODE_NOT_ACTIVE_TABLE) { continue;
break;
} }
// meter was removed, make sync time zero, so that next retry will // meter was removed, make sync time zero, so that next retry will
// do synchronization first // do synchronization first
pSub->lastSyncTime = 0; pSub->lastSyncTime = 0;
break;
} }
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
...@@ -421,7 +376,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { ...@@ -421,7 +376,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
} }
tscFreeSqlObj(pSub->pSql); tscFreeSqlObj(pSub->pSql);
free(pSub->progress); taosArrayDestroy(pSub->progress);
memset(pSub, 0, sizeof(*pSub)); memset(pSub, 0, sizeof(*pSub));
free(pSub); free(pSub);
} }
...@@ -1459,10 +1459,11 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) { ...@@ -1459,10 +1459,11 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
} }
/* /*
* the following three kinds of SqlObj should not be freed * the following four kinds of SqlObj should not be freed
* 1. SqlObj for stream computing * 1. SqlObj for stream computing
* 2. main SqlObj * 2. main SqlObj
* 3. heartbeat SqlObj * 3. heartbeat SqlObj
* 4. SqlObj for subscription
* *
* If res code is error and SqlObj does not belong to above types, it should be * If res code is error and SqlObj does not belong to above types, it should be
* automatically freed for async query, ignoring that connection should be kept. * automatically freed for async query, ignoring that connection should be kept.
...@@ -1475,7 +1476,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) { ...@@ -1475,7 +1476,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
} }
STscObj* pTscObj = pSql->pTscObj; STscObj* pTscObj = pSql->pTscObj;
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql) { if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql || pSql->pSubscription != NULL) {
return false; return false;
} }
......
...@@ -182,6 +182,7 @@ typedef struct SQInfo { ...@@ -182,6 +182,7 @@ typedef struct SQInfo {
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
int32_t groupIndex; int32_t groupIndex;
int32_t offset; // offset in group result set of subgroup, todo refactor int32_t offset; // offset in group result set of subgroup, todo refactor
SArray* arrTableIdInfo;
T_REF_DECLARE() T_REF_DECLARE()
/* /*
......
...@@ -113,7 +113,6 @@ static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); ...@@ -113,7 +113,6 @@ static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols); static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery); static bool hasMainOutput(SQuery *pQuery);
static void createTableQueryInfo(SQInfo *pQInfo);
static void buildTagQueryResult(SQInfo *pQInfo); static void buildTagQueryResult(SQInfo *pQInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTableId, STableQueryInfo *pTableQueryInfo); static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTableId, STableQueryInfo *pTableQueryInfo);
...@@ -3460,7 +3459,11 @@ static bool hasMainOutput(SQuery *pQuery) { ...@@ -3460,7 +3459,11 @@ static bool hasMainOutput(SQuery *pQuery) {
return false; return false;
} }
STableQueryInfo *createTableQueryInfoImpl(SQueryRuntimeEnv *pRuntimeEnv, STableId tableId, STimeWindow win) { static STableQueryInfo *createTableQueryInfo(
SQueryRuntimeEnv *pRuntimeEnv,
STableId tableId,
STimeWindow win
) {
STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
pTableQueryInfo->win = win; pTableQueryInfo->win = win;
...@@ -3870,7 +3873,18 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -3870,7 +3873,18 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
data += bytes * numOfRows; data += bytes * numOfRows;
} }
int32_t numOfTables = (int32_t)taosArrayGetSize(pQInfo->arrTableIdInfo);
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
for(int32_t i = 0; i < numOfTables; i++) {
STableIdInfo* pSrc = taosArrayGet(pQInfo->arrTableIdInfo, i);
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(pSrc->uid);
pDst->tid = htonl(pSrc->tid);
pDst->key = htobe64(pSrc->key);
data += sizeof(STableIdInfo);
}
// all data returned, set query over // all data returned, set query over
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
if (pQInfo->runtimeEnv.stableQuery && isIntervalQuery(pQuery)) { if (pQInfo->runtimeEnv.stableQuery && isIntervalQuery(pQuery)) {
...@@ -4594,6 +4608,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4594,6 +4608,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* to ensure that, we can reset the query range once query on a meter is completed. * to ensure that, we can reset the query range once query on a meter is completed.
*/ */
pQInfo->tableIndex++; pQInfo->tableIndex++;
pInfo->lastKey = pQuery->lastKey;
STableIdInfo tidInfo;
tidInfo.uid = item->id.uid;
tidInfo.tid = item->id.tid;
tidInfo.key = pInfo->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
// if the buffer is full or group by each table, we need to jump out of the loop // if the buffer is full or group by each table, we need to jump out of the loop
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*|| if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*||
isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) { isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) {
...@@ -4663,33 +4685,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4663,33 +4685,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQuery->limit.offset); pQuery->limit.offset);
} }
static void createTableQueryInfo(SQInfo *pQInfo) { static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; // SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid
size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
int32_t index = 0; // for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
for (int32_t i = 0; i < numOfGroups; ++i) { // load all meter meta info // STableQueryInfo *pTableQueryInfo = pQInfo->pTableQueryInfo[i].pTableQInfo;
SArray *group = *(SArray **)taosArrayGet(pQInfo->groupInfo.pGroupList, i); // changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo);
// }
size_t s = taosArrayGetSize(group);
for (int32_t j = 0; j < s; ++j) {
SGroupItem* item = (SGroupItem *)taosArrayGet(group, j);
// STableQueryInfo has been created for each table
if (item->info != NULL) {
return;
}
STableQueryInfo* pInfo = createTableQueryInfoImpl(&pQInfo->runtimeEnv, item->id, pQuery->window);
pInfo->groupIdx = i;
pInfo->tableIndex = index;
item->info = pInfo;
index += 1;
}
}
} }
static void doSaveContext(SQInfo *pQInfo) { static void doSaveContext(SQInfo *pQInfo) {
...@@ -5196,20 +5198,10 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx ...@@ -5196,20 +5198,10 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx
static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **pTableIdList) { static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **pTableIdList) {
assert(pQueryMsg->numOfTables > 0); assert(pQueryMsg->numOfTables > 0);
*pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableId)); *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo));
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableIdInfo->tid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key);
STableId id = {.uid = pTableIdInfo->uid, .tid = pTableIdInfo->tid}; for (int32_t j = 0; j < pQueryMsg->numOfTables; ++j) {
taosArrayPush(*pTableIdList, &id); STableIdInfo* pTableIdInfo = (STableIdInfo *)pMsg;
pMsg += sizeof(STableIdInfo);
for (int32_t j = 1; j < pQueryMsg->numOfTables; ++j) {
pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableIdInfo->tid); pTableIdInfo->tid = htonl(pTableIdInfo->tid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
...@@ -5660,7 +5652,16 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { ...@@ -5660,7 +5652,16 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
} }
} }
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
static int compareTableIdInfo( const void* a, const void* b ) {
const STableIdInfo* x = (const STableIdInfo*)a;
const STableIdInfo* y = (const STableIdInfo*)b;
if (x->uid > y->uid) return 1;
if (x->uid < y->uid) return -1;
return 0;
}
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
STableGroupInfo *groupInfo, SColumnInfo* pTagCols) { STableGroupInfo *groupInfo, SColumnInfo* pTagCols) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) { if (pQInfo == NULL) {
...@@ -5753,6 +5754,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5753,6 +5754,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
pQInfo->groupInfo.numOfTables = groupInfo->numOfTables; pQInfo->groupInfo.numOfTables = groupInfo->numOfTables;
int tableIndex = 0;
STimeWindow window = pQueryMsg->window;
taosArraySort( pTableIdList, compareTableIdInfo );
for(int32_t i = 0; i < numOfGroups; ++i) { for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(groupInfo->pGroupList, i); SArray* pa = taosArrayGetP(groupInfo->pGroupList, i);
size_t s = taosArrayGetSize(pa); size_t s = taosArrayGetSize(pa);
...@@ -5760,13 +5764,26 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5760,13 +5764,26 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
SArray* p1 = taosArrayInit(s, sizeof(SGroupItem)); SArray* p1 = taosArrayInit(s, sizeof(SGroupItem));
for(int32_t j = 0; j < s; ++j) { for(int32_t j = 0; j < s; ++j) {
SGroupItem item = { .id = *(STableId*) taosArrayGet(pa, j), .info = NULL, }; STableId id = *(STableId*) taosArrayGet(pa, j);
SGroupItem item = { .id = id };
// NOTE: compare STableIdInfo with STableId
// not a problem at present because we only use their 1st int64_t field
STableIdInfo* pTableId = taosArraySearch( pTableIdList, compareTableIdInfo, &id );
if (pTableId != NULL ) {
window.skey = pTableId->key;
} else {
window.skey = 0;
}
item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window);
item.info->groupIdx = i;
item.info->tableIndex = tableIndex++;
taosArrayPush(p1, &item); taosArrayPush(p1, &item);
} }
taosArrayPush(pQInfo->groupInfo.pGroupList, &p1); taosArrayPush(pQInfo->groupInfo.pGroupList, &p1);
} }
pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo));
pQuery->pos = -1; pQuery->pos = -1;
pQuery->window = pQueryMsg->window; pQuery->window = pQueryMsg->window;
...@@ -5918,6 +5935,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5918,6 +5935,7 @@ static void freeQInfo(SQInfo *pQInfo) {
} }
taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList); taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList);
taosArrayDestroy(pQInfo->arrTableIdInfo);
if (pQuery->pGroupbyExpr != NULL) { if (pQuery->pGroupbyExpr != NULL) {
taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo); taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo);
...@@ -6045,13 +6063,13 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi ...@@ -6045,13 +6063,13 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) { if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) {
isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY); isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
STableId *id = taosArrayGet(pTableIdList, 0); STableIdInfo *id = taosArrayGet(pTableIdList, 0);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
goto _over; goto _over;
} }
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) { } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true; isSTableQuery = true;
STableId *id = taosArrayGet(pTableIdList, 0); STableIdInfo *id = taosArrayGet(pTableIdList, 0);
// group by normal column, do not pass the group by condition to tsdb to group table into different group // group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
...@@ -6070,7 +6088,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi ...@@ -6070,7 +6088,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
assert(0); assert(0);
} }
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo); (*pQInfo) = createQInfoImpl(pQueryMsg, pTableIdList, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo);
if ((*pQInfo) == NULL) { if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY; code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _over; goto _over;
...@@ -6168,6 +6186,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6168,6 +6186,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t size = getResultSize(pQInfo, &pQuery->rec.rows); size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
size += sizeof(int32_t);
size += sizeof(STableIdInfo) * taosArrayGetSize(pQInfo->arrTableIdInfo);
*contLen = size + sizeof(SRetrieveTableRsp); *contLen = size + sizeof(SRetrieveTableRsp);
// todo handle failed to allocate memory // todo handle failed to allocate memory
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册