提交 b451e685 编写于 作者: H Haojun Liao

[TD-2587]<fix>: fix bugs introduced by refactor.

上级 04e5bb2c
......@@ -69,14 +69,17 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt) {
}
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
if( sub == NULL)
if( sub == NULL) {
return;
}
SSub* pSub = (SSub*)sub;
SSubscriptionProgress target = {.uid = uid, .key = ts};
SSubscriptionProgress* p = taosArraySearch(pSub->progress, &target, tscCompareSubscriptionProgress);
if (p != NULL) {
p->key = ts;
tscDebug("subscribe:%s, uid:%"PRIu64" update sub start ts:%"PRId64, pSub->topic, p->uid, p->key);
}
}
......@@ -502,6 +505,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single tabel subscription
pQueryInfo->window.skey = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, 0))->key;
tscDebug("subscribe:%s set subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
}
if (pSub->pTimer == NULL) {
......
......@@ -1722,6 +1722,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
}
// update the lastkey of current table for projection/aggregation query
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// interval query with limit applied
int32_t numOfRes = 0;
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
......@@ -4299,7 +4303,9 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
int32_t total = 0;
STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL);
while(item) {
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(item->uid);
......@@ -4307,9 +4313,14 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo);
total++;
qDebug("QInfo:%p set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo, item->tid, item->uid, item->key);
item = taosHashIterate(pQInfo->arrTableIdInfo, item);
}
qDebug("QInfo:%p set %d subscribe info", pQInfo, total);
// Check if query is completed or not for stable query or normal table query respectively.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
if (pQInfo->runtimeEnv.stableQuery) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册