diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9a88ee9fc4d3f75c1eca2fd0b66a80aa573dbc1f..1aa56169df51b82651e25ef9eb403342f4b54ae6 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1531,7 +1531,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg; pMeterInfo->sid = htonl(pMeterMeta->sid); pMeterInfo->uid = htobe64(pMeterMeta->uid); - pMeterInfo->skey = tscGetSubscriptionProgress(pSql, pMeterMeta->uid); + pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql, pMeterMeta->uid)); pMsg += sizeof(SMeterSidExtInfo); } else { SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); @@ -1542,7 +1542,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn pMeterInfo->sid = htonl(pQueryMeterInfo->sid); pMeterInfo->uid = htobe64(pQueryMeterInfo->uid); - pMeterInfo->skey = tscGetSubscriptionProgress(pSql, pMeterMeta->uid); + pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql, pQueryMeterInfo->uid)); pMsg += sizeof(SMeterSidExtInfo); @@ -3526,6 +3526,7 @@ int tscProcessQueryRsp(SSqlObj *pSql) { return 0; } +void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts); int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -3536,11 +3537,25 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { pRes->numOfRows = htonl(pRetrieve->numOfRows); pRes->precision = htons(pRetrieve->precision); pRes->offset = htobe64(pRetrieve->offset); - pRes->uid = pRetrieve->uid; pRes->useconds = htobe64(pRetrieve->useconds); pRes->data = pRetrieve->data; tscSetResultPointer(pCmd, pRes); + + TAOS_FIELD *pField = tscFieldInfoGetField(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1); + int16_t offset = tscFieldInfoGetOffset(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1); + char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows; + + int32_t numOfMeters = htonl(*(int32_t*)p); + p += sizeof(int32_t); + for (int i = 0; i < numOfMeters; i++) { + int64_t uid = htobe64(*(int64_t*)p); + p += sizeof(int64_t); + TSKEY key = htobe64(*(TSKEY*)p); + p += sizeof(TSKEY); + tscUpdateSubscriptionProgress(pSql, uid, key); + } + pRes->row = 0; /** diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 404c3f947b164ffcf90cf4b9466234c8a4d99261..9c812d4187394505d0f745e016339037e7d02bc5 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -627,11 +627,6 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { rows = taos_fetch_row_impl(res); } - if (rows != NULL && pSql->pSubscription != NULL) { - TSKEY ts = *(TSKEY*)rows[pCmd->fieldsInfo.numOfOutputCols - 1]; - tscUpdateSubscriptionProgress(pMeterMetaInfo->pMeterMeta->uid, ts); - } - // check!!! if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { break; @@ -866,61 +861,63 @@ void taos_stop_query(TAOS_RES *res) { int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { int len = 0; for (int i = 0; i < num_fields; ++i) { + if (i > 0) { + str[len++] = ' '; + } + if (row[i] == NULL) { - len += sprintf(str + len, "%s ", TSDB_DATA_NULL_STR); + len += sprintf(str + len, "%s", TSDB_DATA_NULL_STR); continue; } switch (fields[i].type) { case TSDB_DATA_TYPE_TINYINT: - len += sprintf(str + len, "%d ", *((char *)row[i])); + len += sprintf(str + len, "%d", *((char *)row[i])); break; case TSDB_DATA_TYPE_SMALLINT: - len += sprintf(str + len, "%d ", *((short *)row[i])); + len += sprintf(str + len, "%d", *((short *)row[i])); break; case TSDB_DATA_TYPE_INT: - len += sprintf(str + len, "%d ", *((int *)row[i])); + len += sprintf(str + len, "%d", *((int *)row[i])); break; case TSDB_DATA_TYPE_BIGINT: - len += sprintf(str + len, "%" PRId64 " ", *((int64_t *)row[i])); + len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); break; case TSDB_DATA_TYPE_FLOAT: { float fv = 0; fv = GET_FLOAT_VAL(row[i]); - len += sprintf(str + len, "%f ", fv); + len += sprintf(str + len, "%f", fv); } break; case TSDB_DATA_TYPE_DOUBLE:{ double dv = 0; dv = GET_DOUBLE_VAL(row[i]); - len += sprintf(str + len, "%lf ", dv); + len += sprintf(str + len, "%lf", dv); } - break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: { - /* limit the max length of string to no greater than the maximum length, - * in case of not null-terminated string */ - size_t xlen = strlen(row[i]); - size_t trueLen = MIN(xlen, fields[i].bytes); - - memcpy(str + len, (char *)row[i], trueLen); - - str[len + trueLen] = ' '; - len += (trueLen + 1); - } break; + size_t xlen = 0; + for (xlen = 0; xlen <= fields[i].bytes; xlen++) { + char c = ((char*)row[i])[xlen]; + if (c == 0) break; + str[len++] = c; + } + str[len] = 0; + } + break; case TSDB_DATA_TYPE_TIMESTAMP: - len += sprintf(str + len, "%" PRId64 " ", *((int64_t *)row[i])); + len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); break; case TSDB_DATA_TYPE_BOOL: - len += sprintf(str + len, "%d ", *((int8_t *)row[i])); + len += sprintf(str + len, "%d", *((int8_t *)row[i])); default: break; } diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index af3a57637720260795af3d86697bafde0fc863b0..a55e889c1c067647a371103d84e2fee6ef370433 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -80,7 +80,7 @@ void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts) { else if (p->uid < uid) s = m + 1; else { - p->key = ts + 1; + if (ts >= p->key) p->key = ts + 1; break; } } @@ -219,6 +219,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pRes->numOfTotal = 0; pRes->qhandle = 0; pSql->thandle = NULL; + pSql->cmd.command = TSDB_SQL_SELECT; tscDoQuery(pSql); if (pRes->code != TSDB_CODE_SUCCESS) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 89807f0a1960ffbeeed8218111b4acd732228f71..9151851330c7615f07e1fa229ae74a106c749435 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -490,7 +490,7 @@ typedef struct SColumnInfo { typedef struct SMeterSidExtInfo { int32_t sid; int64_t uid; - TSKEY skey; // start key for subscription + TSKEY key; // key for subscription char tags[]; } SMeterSidExtInfo; @@ -573,7 +573,6 @@ typedef struct { int16_t precision; int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; - int64_t uid; char data[]; } SRetrieveMeterRsp; diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index c69b27537e82fc7a58aff764276bc8d932f18a1c..2e59789b27f1c5646e50b22c4d6dc32d4466cf72 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -750,6 +750,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { pQuery->ekey = pSupporter->rawEKey; pSupporter->meterIdx++; + pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey; + // if the buffer is full or group by each table, we need to jump out of the loop if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) || isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) { @@ -765,6 +767,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); continue; } else { + pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey; // buffer is full, wait for the next round to retrieve data from current meter assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); break; diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index bbd3e9465c32174566f08b809545ae4b5d7e5f65..2a913665cd0246b2753adde882fec65a808e3a16 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -753,7 +753,6 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE taosAddIntHash(pSupporter->pMeterObj, pMetersObj[i]->sid, (char *)&pMetersObj[i]); } - pSupporter->pMeterSidExtInfo = (SMeterSidExtInfo **)pQueryMsg->pSidExtInfo; int32_t sidElemLen = pQueryMsg->tagLength + sizeof(SMeterSidExtInfo); int32_t size = POINTER_BYTES * pQueryMsg->numOfSids + sidElemLen * pQueryMsg->numOfSids; @@ -767,12 +766,16 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE char *px = ((char *)pSupporter->pMeterSidExtInfo) + POINTER_BYTES * pQueryMsg->numOfSids; for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) { - pSupporter->pMeterSidExtInfo[i] = (SMeterSidExtInfo *)px; - pSupporter->pMeterSidExtInfo[i]->sid = ((SMeterSidExtInfo **)pQueryMsg->pSidExtInfo)[i]->sid; + SMeterSidExtInfo* pSrc = ((SMeterSidExtInfo **)pQueryMsg->pSidExtInfo)[i]; + SMeterSidExtInfo* pDst = (SMeterSidExtInfo *)px; + + pSupporter->pMeterSidExtInfo[i] = pDst; + pDst->sid = pSrc->sid; + pDst->uid = pSrc->uid; + pDst->key = pSrc->key; if (pQueryMsg->tagLength > 0) { - memcpy(pSupporter->pMeterSidExtInfo[i]->tags, ((SMeterSidExtInfo **)pQueryMsg->pSidExtInfo)[i]->tags, - pQueryMsg->tagLength); + memcpy(pDst->tags, pSrc->tags, pQueryMsg->tagLength); } px += sidElemLen; } @@ -1107,6 +1110,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) { pSids[j] = (SMeterSidExtInfo *)((char *)pSids[j - 1] + sizeof(SMeterSidExtInfo) + pQueryMsg->tagLength); pSids[j]->sid = htonl(pSids[j]->sid); pSids[j]->uid = htobe64(pSids[j]->uid); + pSids[j]->key = htobe64(pSids[j]->key); } pMsg = (char *)pSids[pQueryMsg->numOfSids - 1]; diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 5177afc13a40db878dd6b79cbc1bdb093b4acdae..2c8fe35cca11116d279adfc810d232cf1acb720a 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -412,6 +412,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { int code = 0; pRetrieve = (SRetrieveMeterMsg *)pMsg; + SQInfo* pQInfo = (SQInfo*)pRetrieve->qhandle; pRetrieve->free = htons(pRetrieve->free); if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { @@ -438,7 +439,11 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { size = vnodeGetResultSize((void *)(pRetrieve->qhandle), &numOfRows); } - pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, size + 100); + // buffer size for progress information, including meter count, + // and for each meter, including 'uid' and 'TSKEY'. + int progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t); + + pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100); if (pStart == NULL) { taosSendSimpleRsp(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); goto _exit; @@ -456,7 +461,6 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { if (code == TSDB_CODE_SUCCESS) { pRsp->offset = htobe64(vnodeGetOffsetVal((void*)pRetrieve->qhandle)); pRsp->useconds = htobe64(((SQInfo *)(pRetrieve->qhandle))->useconds); - pRsp->uid = ((SQInfo *)(pRetrieve->qhandle))->pObj->uid; } else { pRsp->offset = 0; pRsp->useconds = 0; @@ -469,11 +473,23 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { } pMsg += size; + + // write the progress information of each meter to response + // this is required by subscriptions + *((int32_t*)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters); + pMsg += sizeof(int32_t); + for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) { + *((int64_t*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->uid); + pMsg += sizeof(int64_t); + *((TSKEY*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key); + pMsg += sizeof(TSKEY); + } + msgLen = pMsg - pStart; assert(code != TSDB_CODE_ACTION_IN_PROGRESS); - if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { + if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS) && pRetrieve->qhandle != NULL) { dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); vnodeDecRefCount(pObj->qhandle); pObj->qhandle = NULL;