提交 192ad4df 编写于 作者: weixin_48148422's avatar weixin_48148422

return last key of each meter to client

上级 517fb883
...@@ -1531,7 +1531,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn ...@@ -1531,7 +1531,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn
SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg; SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg;
pMeterInfo->sid = htonl(pMeterMeta->sid); pMeterInfo->sid = htonl(pMeterMeta->sid);
pMeterInfo->uid = htobe64(pMeterMeta->uid); pMeterInfo->uid = htobe64(pMeterMeta->uid);
pMeterInfo->skey = tscGetSubscriptionProgress(pSql, pMeterMeta->uid); pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql, pMeterMeta->uid));
pMsg += sizeof(SMeterSidExtInfo); pMsg += sizeof(SMeterSidExtInfo);
} else { } else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
...@@ -1542,7 +1542,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn ...@@ -1542,7 +1542,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn
pMeterInfo->sid = htonl(pQueryMeterInfo->sid); pMeterInfo->sid = htonl(pQueryMeterInfo->sid);
pMeterInfo->uid = htobe64(pQueryMeterInfo->uid); pMeterInfo->uid = htobe64(pQueryMeterInfo->uid);
pMeterInfo->skey = tscGetSubscriptionProgress(pSql, pMeterMeta->uid); pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql, pQueryMeterInfo->uid));
pMsg += sizeof(SMeterSidExtInfo); pMsg += sizeof(SMeterSidExtInfo);
...@@ -3526,6 +3526,7 @@ int tscProcessQueryRsp(SSqlObj *pSql) { ...@@ -3526,6 +3526,7 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
return 0; return 0;
} }
void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts);
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -3536,11 +3537,25 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { ...@@ -3536,11 +3537,25 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes->numOfRows = htonl(pRetrieve->numOfRows); pRes->numOfRows = htonl(pRetrieve->numOfRows);
pRes->precision = htons(pRetrieve->precision); pRes->precision = htons(pRetrieve->precision);
pRes->offset = htobe64(pRetrieve->offset); pRes->offset = htobe64(pRetrieve->offset);
pRes->uid = pRetrieve->uid;
pRes->useconds = htobe64(pRetrieve->useconds); pRes->useconds = htobe64(pRetrieve->useconds);
pRes->data = pRetrieve->data; pRes->data = pRetrieve->data;
tscSetResultPointer(pCmd, pRes); tscSetResultPointer(pCmd, pRes);
TAOS_FIELD *pField = tscFieldInfoGetField(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1);
int16_t offset = tscFieldInfoGetOffset(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1);
char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows;
int32_t numOfMeters = htonl(*(int32_t*)p);
p += sizeof(int32_t);
for (int i = 0; i < numOfMeters; i++) {
int64_t uid = htobe64(*(int64_t*)p);
p += sizeof(int64_t);
TSKEY key = htobe64(*(TSKEY*)p);
p += sizeof(TSKEY);
tscUpdateSubscriptionProgress(pSql, uid, key);
}
pRes->row = 0; pRes->row = 0;
/** /**
......
...@@ -627,11 +627,6 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -627,11 +627,6 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
rows = taos_fetch_row_impl(res); rows = taos_fetch_row_impl(res);
} }
if (rows != NULL && pSql->pSubscription != NULL) {
TSKEY ts = *(TSKEY*)rows[pCmd->fieldsInfo.numOfOutputCols - 1];
tscUpdateSubscriptionProgress(pMeterMetaInfo->pMeterMeta->uid, ts);
}
// check!!! // check!!!
if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
break; break;
...@@ -866,61 +861,63 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -866,61 +861,63 @@ void taos_stop_query(TAOS_RES *res) {
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
int len = 0; int len = 0;
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
if (i > 0) {
str[len++] = ' ';
}
if (row[i] == NULL) { if (row[i] == NULL) {
len += sprintf(str + len, "%s ", TSDB_DATA_NULL_STR); len += sprintf(str + len, "%s", TSDB_DATA_NULL_STR);
continue; continue;
} }
switch (fields[i].type) { switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
len += sprintf(str + len, "%d ", *((char *)row[i])); len += sprintf(str + len, "%d", *((char *)row[i]));
break; break;
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(str + len, "%d ", *((short *)row[i])); len += sprintf(str + len, "%d", *((short *)row[i]));
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
len += sprintf(str + len, "%d ", *((int *)row[i])); len += sprintf(str + len, "%d", *((int *)row[i]));
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
len += sprintf(str + len, "%" PRId64 " ", *((int64_t *)row[i])); len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
float fv = 0; float fv = 0;
fv = GET_FLOAT_VAL(row[i]); fv = GET_FLOAT_VAL(row[i]);
len += sprintf(str + len, "%f ", fv); len += sprintf(str + len, "%f", fv);
} }
break; break;
case TSDB_DATA_TYPE_DOUBLE:{ case TSDB_DATA_TYPE_DOUBLE:{
double dv = 0; double dv = 0;
dv = GET_DOUBLE_VAL(row[i]); dv = GET_DOUBLE_VAL(row[i]);
len += sprintf(str + len, "%lf ", dv); len += sprintf(str + len, "%lf", dv);
} }
break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
/* limit the max length of string to no greater than the maximum length, size_t xlen = 0;
* in case of not null-terminated string */ for (xlen = 0; xlen <= fields[i].bytes; xlen++) {
size_t xlen = strlen(row[i]); char c = ((char*)row[i])[xlen];
size_t trueLen = MIN(xlen, fields[i].bytes); if (c == 0) break;
str[len++] = c;
memcpy(str + len, (char *)row[i], trueLen); }
str[len] = 0;
str[len + trueLen] = ' '; }
len += (trueLen + 1); break;
} break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%" PRId64 " ", *((int64_t *)row[i])); len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
len += sprintf(str + len, "%d ", *((int8_t *)row[i])); len += sprintf(str + len, "%d", *((int8_t *)row[i]));
default: default:
break; break;
} }
......
...@@ -80,7 +80,7 @@ void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts) { ...@@ -80,7 +80,7 @@ void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts) {
else if (p->uid < uid) else if (p->uid < uid)
s = m + 1; s = m + 1;
else { else {
p->key = ts + 1; if (ts >= p->key) p->key = ts + 1;
break; break;
} }
} }
...@@ -219,6 +219,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -219,6 +219,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pRes->numOfTotal = 0; pRes->numOfTotal = 0;
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->thandle = NULL; pSql->thandle = NULL;
pSql->cmd.command = TSDB_SQL_SELECT;
tscDoQuery(pSql); tscDoQuery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
......
...@@ -490,7 +490,7 @@ typedef struct SColumnInfo { ...@@ -490,7 +490,7 @@ typedef struct SColumnInfo {
typedef struct SMeterSidExtInfo { typedef struct SMeterSidExtInfo {
int32_t sid; int32_t sid;
int64_t uid; int64_t uid;
TSKEY skey; // start key for subscription TSKEY key; // key for subscription
char tags[]; char tags[];
} SMeterSidExtInfo; } SMeterSidExtInfo;
...@@ -573,7 +573,6 @@ typedef struct { ...@@ -573,7 +573,6 @@ typedef struct {
int16_t precision; int16_t precision;
int64_t offset; // updated offset value for multi-vnode projection query int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds; int64_t useconds;
int64_t uid;
char data[]; char data[];
} SRetrieveMeterRsp; } SRetrieveMeterRsp;
......
...@@ -750,6 +750,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { ...@@ -750,6 +750,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
pQuery->ekey = pSupporter->rawEKey; pQuery->ekey = pSupporter->rawEKey;
pSupporter->meterIdx++; pSupporter->meterIdx++;
pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
// if the buffer is full or group by each table, we need to jump out of the loop // if the buffer is full or group by each table, we need to jump out of the loop
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) || if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) ||
isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) { isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) {
...@@ -765,6 +767,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { ...@@ -765,6 +767,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
continue; continue;
} else { } else {
pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey;
// buffer is full, wait for the next round to retrieve data from current meter // buffer is full, wait for the next round to retrieve data from current meter
assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL));
break; break;
......
...@@ -753,7 +753,6 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE ...@@ -753,7 +753,6 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
taosAddIntHash(pSupporter->pMeterObj, pMetersObj[i]->sid, (char *)&pMetersObj[i]); taosAddIntHash(pSupporter->pMeterObj, pMetersObj[i]->sid, (char *)&pMetersObj[i]);
} }
pSupporter->pMeterSidExtInfo = (SMeterSidExtInfo **)pQueryMsg->pSidExtInfo;
int32_t sidElemLen = pQueryMsg->tagLength + sizeof(SMeterSidExtInfo); int32_t sidElemLen = pQueryMsg->tagLength + sizeof(SMeterSidExtInfo);
int32_t size = POINTER_BYTES * pQueryMsg->numOfSids + sidElemLen * pQueryMsg->numOfSids; int32_t size = POINTER_BYTES * pQueryMsg->numOfSids + sidElemLen * pQueryMsg->numOfSids;
...@@ -767,12 +766,16 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE ...@@ -767,12 +766,16 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
char *px = ((char *)pSupporter->pMeterSidExtInfo) + POINTER_BYTES * pQueryMsg->numOfSids; char *px = ((char *)pSupporter->pMeterSidExtInfo) + POINTER_BYTES * pQueryMsg->numOfSids;
for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) {
pSupporter->pMeterSidExtInfo[i] = (SMeterSidExtInfo *)px; SMeterSidExtInfo* pSrc = ((SMeterSidExtInfo **)pQueryMsg->pSidExtInfo)[i];
pSupporter->pMeterSidExtInfo[i]->sid = ((SMeterSidExtInfo **)pQueryMsg->pSidExtInfo)[i]->sid; SMeterSidExtInfo* pDst = (SMeterSidExtInfo *)px;
pSupporter->pMeterSidExtInfo[i] = pDst;
pDst->sid = pSrc->sid;
pDst->uid = pSrc->uid;
pDst->key = pSrc->key;
if (pQueryMsg->tagLength > 0) { if (pQueryMsg->tagLength > 0) {
memcpy(pSupporter->pMeterSidExtInfo[i]->tags, ((SMeterSidExtInfo **)pQueryMsg->pSidExtInfo)[i]->tags, memcpy(pDst->tags, pSrc->tags, pQueryMsg->tagLength);
pQueryMsg->tagLength);
} }
px += sidElemLen; px += sidElemLen;
} }
...@@ -1107,6 +1110,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) { ...@@ -1107,6 +1110,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
pSids[j] = (SMeterSidExtInfo *)((char *)pSids[j - 1] + sizeof(SMeterSidExtInfo) + pQueryMsg->tagLength); pSids[j] = (SMeterSidExtInfo *)((char *)pSids[j - 1] + sizeof(SMeterSidExtInfo) + pQueryMsg->tagLength);
pSids[j]->sid = htonl(pSids[j]->sid); pSids[j]->sid = htonl(pSids[j]->sid);
pSids[j]->uid = htobe64(pSids[j]->uid); pSids[j]->uid = htobe64(pSids[j]->uid);
pSids[j]->key = htobe64(pSids[j]->key);
} }
pMsg = (char *)pSids[pQueryMsg->numOfSids - 1]; pMsg = (char *)pSids[pQueryMsg->numOfSids - 1];
......
...@@ -412,6 +412,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { ...@@ -412,6 +412,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
int code = 0; int code = 0;
pRetrieve = (SRetrieveMeterMsg *)pMsg; pRetrieve = (SRetrieveMeterMsg *)pMsg;
SQInfo* pQInfo = (SQInfo*)pRetrieve->qhandle;
pRetrieve->free = htons(pRetrieve->free); pRetrieve->free = htons(pRetrieve->free);
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
...@@ -438,7 +439,11 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { ...@@ -438,7 +439,11 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
size = vnodeGetResultSize((void *)(pRetrieve->qhandle), &numOfRows); size = vnodeGetResultSize((void *)(pRetrieve->qhandle), &numOfRows);
} }
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, size + 100); // buffer size for progress information, including meter count,
// and for each meter, including 'uid' and 'TSKEY'.
int progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t);
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100);
if (pStart == NULL) { if (pStart == NULL) {
taosSendSimpleRsp(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRsp(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
goto _exit; goto _exit;
...@@ -456,7 +461,6 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { ...@@ -456,7 +461,6 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pRsp->offset = htobe64(vnodeGetOffsetVal((void*)pRetrieve->qhandle)); pRsp->offset = htobe64(vnodeGetOffsetVal((void*)pRetrieve->qhandle));
pRsp->useconds = htobe64(((SQInfo *)(pRetrieve->qhandle))->useconds); pRsp->useconds = htobe64(((SQInfo *)(pRetrieve->qhandle))->useconds);
pRsp->uid = ((SQInfo *)(pRetrieve->qhandle))->pObj->uid;
} else { } else {
pRsp->offset = 0; pRsp->offset = 0;
pRsp->useconds = 0; pRsp->useconds = 0;
...@@ -469,11 +473,23 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { ...@@ -469,11 +473,23 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
} }
pMsg += size; pMsg += size;
// write the progress information of each meter to response
// this is required by subscriptions
*((int32_t*)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters);
pMsg += sizeof(int32_t);
for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) {
*((int64_t*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->uid);
pMsg += sizeof(int64_t);
*((TSKEY*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key);
pMsg += sizeof(TSKEY);
}
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
assert(code != TSDB_CODE_ACTION_IN_PROGRESS); assert(code != TSDB_CODE_ACTION_IN_PROGRESS);
if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS) && pRetrieve->qhandle != NULL) {
dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
vnodeDecRefCount(pObj->qhandle); vnodeDecRefCount(pObj->qhandle);
pObj->qhandle = NULL; pObj->qhandle = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册