提交 44536fdc 编写于 作者: weixin_48148422's avatar weixin_48148422

TBASE-1427

上级 3537de84
...@@ -3543,18 +3543,20 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { ...@@ -3543,18 +3543,20 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
tscSetResultPointer(pCmd, pRes); tscSetResultPointer(pCmd, pRes);
TAOS_FIELD *pField = tscFieldInfoGetField(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1); if (pSql->pSubscription != NULL) {
int16_t offset = tscFieldInfoGetOffset(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1); TAOS_FIELD *pField = tscFieldInfoGetField(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1);
char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows; int16_t offset = tscFieldInfoGetOffset(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1);
char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;
int32_t numOfMeters = htonl(*(int32_t*)p);
p += sizeof(int32_t); int32_t numOfMeters = htonl(*(int32_t*)p);
for (int i = 0; i < numOfMeters; i++) { p += sizeof(int32_t);
int64_t uid = htobe64(*(int64_t*)p); for (int i = 0; i < numOfMeters; i++) {
p += sizeof(int64_t); int64_t uid = htobe64(*(int64_t*)p);
TSKEY key = htobe64(*(TSKEY*)p); p += sizeof(int64_t);
p += sizeof(TSKEY); TSKEY key = htobe64(*(TSKEY*)p);
tscUpdateSubscriptionProgress(pSql, uid, key); p += sizeof(TSKEY);
tscUpdateSubscriptionProgress(pSql, uid, key);
}
} }
pRes->row = 0; pRes->row = 0;
......
...@@ -32,6 +32,7 @@ typedef struct SSubscriptionProgress { ...@@ -32,6 +32,7 @@ typedef struct SSubscriptionProgress {
} SSubscriptionProgress; } SSubscriptionProgress;
typedef struct SSub { typedef struct SSub {
int64_t lastSyncTime;
void * signature; void * signature;
TAOS * taos; TAOS * taos;
void * pTimer; void * pTimer;
...@@ -152,45 +153,44 @@ static void tscProcessSubscribeTimer(void *handle, void *tmrId) { ...@@ -152,45 +153,44 @@ static void tscProcessSubscribeTimer(void *handle, void *tmrId) {
} }
TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
STscObj* pObj = (STscObj*)taos;
if (pObj == NULL || pObj->signature != pObj) {
globalCode = TSDB_CODE_DISCONNECTED;
tscError("connection disconnected");
return NULL;
}
SSub* pSub = tscCreateSubscription(pObj, sql);
if (pSub == NULL) {
return NULL;
}
int code = (uint8_t)tsParseSql(pSub->pSql, pObj->acctId, pObj->db, false); int code = (uint8_t)tsParseSql(pSub->pSql, pObj->acctId, pObj->db, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taos_unsubscribe(pSub); taos_unsubscribe(pSub);
return NULL; return false;
} }
int numOfMeters = 0;
SSubscriptionProgress* progress = NULL;
// ??? if there's more than one vnode // ??? if there's more than one vnode
SSqlCmd* pCmd = &pSub->pSql->cmd; SSqlCmd* pCmd = &pSub->pSql->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
pSub->numOfMeters = 1; numOfMeters = 1;
pSub->progress = calloc(1, sizeof(SSubscriptionProgress)); progress = calloc(1, sizeof(SSubscriptionProgress));
pSub->progress[0].uid = pMeterMetaInfo->pMeterMeta->uid; int64_t uid = pMeterMetaInfo->pMeterMeta->uid;
progress[0].uid = uid;
progress[0].key = tscGetSubscriptionProgress(pSub->pSql, uid);
} else { } else {
SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta;
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
pSub->numOfMeters = pVnodeSidList->numOfSids; numOfMeters = pVnodeSidList->numOfSids;
pSub->progress = calloc(pSub->numOfMeters, sizeof(SSubscriptionProgress)); progress = calloc(numOfMeters, sizeof(SSubscriptionProgress));
for (int32_t i = 0; i < pSub->numOfMeters; ++i) { for (int32_t i = 0; i < numOfMeters; ++i) {
SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i); SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i);
pSub->progress[i].uid = pMeterInfo->uid; int64_t uid = pMeterInfo->uid;
progress[i].uid = uid;
progress[i].key = tscGetSubscriptionProgress(pSub->pSql, uid);
} }
qsort(pSub->progress, pSub->numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); qsort(progress, numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress);
} }
free(pSub->progress);
pSub->numOfMeters = numOfMeters;
pSub->progress = progress;
// timestamp must in the output column // timestamp must in the output column
SFieldInfo* pFieldInfo = &pCmd->fieldsInfo; SFieldInfo* pFieldInfo = &pCmd->fieldsInfo;
tscFieldInfoSetValue(pFieldInfo, pFieldInfo->numOfOutputCols, TSDB_DATA_TYPE_TIMESTAMP, "_c0", TSDB_KEYSIZE); tscFieldInfoSetValue(pFieldInfo, pFieldInfo->numOfOutputCols, TSDB_DATA_TYPE_TIMESTAMP, "_c0", TSDB_KEYSIZE);
...@@ -198,6 +198,30 @@ TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp ...@@ -198,6 +198,30 @@ TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp
tscFieldInfoUpdateVisible(pFieldInfo, pFieldInfo->numOfOutputCols - 1, false); tscFieldInfoUpdateVisible(pFieldInfo, pFieldInfo->numOfOutputCols - 1, false);
tscFieldInfoCalOffset(pCmd); tscFieldInfoCalOffset(pCmd);
pSub->lastSyncTime = taosGetTimestampMs();
return true;
}
TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) {
STscObj* pObj = (STscObj*)taos;
if (pObj == NULL || pObj->signature != pObj) {
globalCode = TSDB_CODE_DISCONNECTED;
tscError("connection disconnected");
return NULL;
}
SSub* pSub = tscCreateSubscription(pObj, sql);
if (pSub == NULL) {
return NULL;
}
pSub->taos = taos;
if (!tscUpdateSubscription(pObj, pSub)) {
return NULL;
}
if (fp != NULL) { if (fp != NULL) {
pSub->fp = fp; pSub->fp = fp;
pSub->interval = interval; pSub->interval = interval;
...@@ -212,6 +236,12 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -212,6 +236,12 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSub *pSub = (SSub *)tsub; SSub *pSub = (SSub *)tsub;
if (pSub == NULL) return NULL; if (pSub == NULL) return NULL;
if (taosGetTimestampMs() - pSub->lastSyncTime > 30 * 10 * 1000) {
taos_query(pSub->taos, "reset query cache;");
// TODO: clear memory
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
}
SSqlObj* pSql = pSub->pSql; SSqlObj* pSql = pSub->pSql;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
......
...@@ -1105,6 +1105,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) { ...@@ -1105,6 +1105,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
pSids[0] = (SMeterSidExtInfo *)pMsg; pSids[0] = (SMeterSidExtInfo *)pMsg;
pSids[0]->sid = htonl(pSids[0]->sid); pSids[0]->sid = htonl(pSids[0]->sid);
pSids[0]->uid = htobe64(pSids[0]->uid); pSids[0]->uid = htobe64(pSids[0]->uid);
pSids[0]->key = htobe64(pSids[0]->key);
for (int32_t j = 1; j < pQueryMsg->numOfSids; ++j) { for (int32_t j = 1; j < pQueryMsg->numOfSids; ++j) {
pSids[j] = (SMeterSidExtInfo *)((char *)pSids[j - 1] + sizeof(SMeterSidExtInfo) + pQueryMsg->tagLength); pSids[j] = (SMeterSidExtInfo *)((char *)pSids[j - 1] + sizeof(SMeterSidExtInfo) + pQueryMsg->tagLength);
......
...@@ -441,7 +441,9 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { ...@@ -441,7 +441,9 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
// buffer size for progress information, including meter count, // buffer size for progress information, including meter count,
// and for each meter, including 'uid' and 'TSKEY'. // and for each meter, including 'uid' and 'TSKEY'.
int progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t); int progressSize = 0;
if (pQInfo->pMeterQuerySupporter != NULL)
progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t);
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100); pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100);
if (pStart == NULL) { if (pStart == NULL) {
...@@ -476,13 +478,15 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { ...@@ -476,13 +478,15 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
// write the progress information of each meter to response // write the progress information of each meter to response
// this is required by subscriptions // this is required by subscriptions
*((int32_t*)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters); if (progressSize > 0) {
pMsg += sizeof(int32_t); *((int32_t*)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters);
for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) { pMsg += sizeof(int32_t);
*((int64_t*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->uid); for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) {
pMsg += sizeof(int64_t); *((int64_t*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->uid);
*((TSKEY*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key); pMsg += sizeof(int64_t);
pMsg += sizeof(TSKEY); *((TSKEY*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key);
pMsg += sizeof(TSKEY);
}
} }
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册