diff --git a/include/client/taos.h b/include/client/taos.h index 49cfbb52b80e88103fe6befc6d2818641e731fcf..270b647a77d18fd10e97954e435000799bdd6007 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -254,7 +254,7 @@ enum tmq_res_t { TMQ_RES_INVALID = -1, TMQ_RES_DATA = 1, TMQ_RES_TABLE_META = 2, - TMQ_RES_TAOSX = 3, + TMQ_RES_METADATA = 3, }; typedef struct tmq_raw_data { diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b8fa9580e70c1c7aa17a1402ce6c8113a7f8e094..574d8188fef5b2866b17f7a9f8f7842f162421a3 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -52,7 +52,7 @@ enum { RES_TYPE__QUERY = 1, RES_TYPE__TMQ, RES_TYPE__TMQ_META, - RES_TYPE__TAOSX, + RES_TYPE__TMQ_METADATA, }; #define SHOW_VARIABLES_RESULT_COLS 2 @@ -60,9 +60,9 @@ enum { #define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) -#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ || *(int8_t*)res == RES_TYPE__TAOSX) +#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) #define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) -#define TD_RES_TMQ_TAOSX(res) (*(int8_t*)res == RES_TYPE__TAOSX) +#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA) typedef struct SAppInstInfo SAppInstInfo; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 30860780807a820b041e27729f8e351fb46c99b3..273bbcf7ac400fa48e797059995025f2d7604276 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -184,7 +184,7 @@ void taos_free_result(TAOS_RES *res) { SRequestObj *pRequest = (SRequestObj *)res; tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId); destroyRequest(pRequest); - } else if (TD_RES_TMQ_TAOSX(res)) { + } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index c135965f07454c46e20780d029c18b2359110877..7c78928333eb75b025052e1c20c10c363f4fdd5f 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -25,12 +25,11 @@ #include "tref.h" #include "ttimer.h" -static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, - int8_t t) { - char* string = NULL; +static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, + int8_t t, cJSON* tables) { cJSON* json = cJSON_CreateObject(); if (json == NULL) { - return string; + return; } cJSON* type = cJSON_CreateString("create"); cJSON_AddItemToObject(json, "type", type); @@ -87,10 +86,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch cJSON_AddItemToArray(tags, tag); } cJSON_AddItemToObject(json, "tags", tags); - - string = cJSON_PrintUnformatted(json); - cJSON_Delete(json); - return string; + cJSON_AddItemToArray(tables, json); } static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { @@ -189,6 +185,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { SVCreateStbReq req = {0}; SDecoder coder; char* string = NULL; + cJSON* tables = cJSON_CreateArray(); // decode and process req void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); @@ -198,11 +195,11 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) { goto _err; } - string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); - tDecoderClear(&coder); - return string; + buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, tables); + string = cJSON_PrintUnformatted(tables); _err: + cJSON_Delete(tables); tDecoderClear(&coder); return string; } @@ -229,12 +226,11 @@ _err: return string; } -static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum) { - char* string = NULL; +static void buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum, cJSON* tables) { SArray* pTagVals = NULL; cJSON* json = cJSON_CreateObject(); if (json == NULL) { - return string; + return; } cJSON* type = cJSON_CreateString("create"); cJSON_AddItemToObject(json, "type", type); @@ -315,10 +311,8 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* end: cJSON_AddItemToObject(json, "tags", tags); - string = cJSON_PrintUnformatted(json); - cJSON_Delete(json); taosArrayDestroy(pTagVals); - return string; + cJSON_AddItemToArray(tables, json); } static char* processCreateTable(SMqMetaRsp* metaRsp) { @@ -335,21 +329,54 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { } // loop to create table + cJSON* tables = cJSON_CreateArray(); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; if (pCreateReq->type == TSDB_CHILD_TABLE) { - string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name, - pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum); + buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name, + pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum, tables); } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { - string = - buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); + buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE, tables); } } + string = cJSON_PrintUnformatted(tables); + cJSON_Delete(tables); +_exit: tDecoderClear(&decoder); + return string; +} + +static char* processAutoCreateTable(STaosxRsp* rsp) { + SDecoder decoder = {0}; + SVCreateTbReq* pCreateReq; + char* string = NULL; + + + // loop to create table + cJSON* tables = cJSON_CreateArray(); + for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) { + // decode + void** data = taosArrayGet(rsp->createTableReq, iReq); + int32_t *len = taosArrayGet(rsp->createTableLen, iReq); + tDecoderInit(&decoder, *data, *len); + if (tDecodeSVCreateTbReq(&decoder, pCreateReq) < 0) { + tDecoderClear(&decoder); + goto _exit; + } + if (pCreateReq->type == TSDB_CHILD_TABLE) { + buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name, + pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum, tables); + } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { + buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE, tables); + } + tDecoderClear(&decoder); + } + + string = cJSON_PrintUnformatted(tables); _exit: - tDecoderClear(&decoder); + cJSON_Delete(tables); return string; } @@ -1586,11 +1613,307 @@ end: return code; } +static int32_t tmqWriteRaqMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { + int32_t code = TSDB_CODE_SUCCESS; + SHashObj* pVgHash = NULL; + SQuery* pQuery = NULL; + SMqTaosxRspObj rspObj = {0}; + SDecoder decoder = {0}; + STableMeta* pTableMeta = NULL; + + terrno = TSDB_CODE_SUCCESS; + SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); + if (!pRequest) { + uError("WriteRaw:createRequest error request is null"); + return terrno; + } + + rspObj.resIter = -1; + rspObj.resType = RES_TYPE__TMQ_METADATA; + + tDecoderInit(&decoder, data, dataLen); + code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp); + if (code != 0) { + uError("WriteRaw:decode smqDataRsp error"); + code = TSDB_CODE_INVALID_MSG; + goto end; + } + + if (!pRequest->pDb) { + uError("WriteRaw:not use db"); + code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + goto end; + } + + pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + taosHashSetFreeFp(pVgHash, destroyVgHash); + struct SCatalog* pCatalog = NULL; + code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw: get gatlog error"); + goto end; + } + + SRequestConnInfo conn = {0}; + conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; + conn.requestId = pRequest->requestId; + conn.requestObjRefId = pRequest->self; + conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + + printf("raw data block num:%d\n", rspObj.rsp.blockNum); + while (++rspObj.resIter < rspObj.rsp.blockNum) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); + if (!rspObj.rsp.withSchema) { + uError("WriteRaw:no schema, iter:%d", rspObj.resIter); + goto end; + } + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols); + + code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw: setQueryResultFromRsp error"); + goto end; + } + + const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); + if (!tbName) { + uError("WriteRaw: tbname is null"); + code = TSDB_CODE_TMQ_INVALID_MSG; + goto end; + } + + printf("raw data tbname:%s\n", tbName); + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + strcpy(pName.dbname, pRequest->pDb); + strcpy(pName.tname, tbName); + + VgData vgData = {0}; + code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg)); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); + goto end; + } + + code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); + code = TSDB_CODE_SUCCESS; + continue; + } + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName); + goto end; + } + + uint16_t fLen = 0; + int32_t rowSize = 0; + int16_t nVar = 0; + for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { + SSchema* schema = &pTableMeta->schema[i]; + fLen += TYPE_BYTES[schema->type]; + rowSize += schema->bytes; + if (IS_VAR_DATA_TYPE(schema->type)) { + nVar++; + } + } + + int32_t rows = rspObj.resInfo.numOfRows; + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + + (int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1); + + // find schema data info + int32_t schemaLen = 0; + void* schemaData = NULL; + for(int j = 0; j < rspObj.rsp.createTableNum; j++){ + void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j); + int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j); + + SDecoder decoderTmp = {0}; + SVCreateTbReq* pCreateReq; + + tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); + if (tDecodeSVCreateTbReq(&decoderTmp, pCreateReq) < 0) { + tDecoderClear(&decoderTmp); + goto end; + } + + ASSERT (pCreateReq->type == TSDB_CHILD_TABLE); + if(strcmp(tbName, pCreateReq->name) == 0){ + schemaLen = *lenTmp; + schemaData = *dataTmp; + tDecoderClear(&decoderTmp); + break; + } + tDecoderClear(&decoderTmp); + } + + int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; + + SSubmitReq* subReq = NULL; + SSubmitBlk* blk = NULL; + void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId)); + if (hData) { + vgData = *(VgData*)hData; + + int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen; + void* tmp = taosMemoryRealloc(vgData.data, totalLen); + if (tmp == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + vgData.data = tmp; + ((VgData*)hData)->data = tmp; + subReq = (SSubmitReq*)(vgData.data); + blk = POINTER_SHIFT(vgData.data, subReq->length); + } else { + int32_t totalLen = sizeof(SSubmitReq) + submitLen; + void* tmp = taosMemoryCalloc(1, totalLen); + if (tmp == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + vgData.data = tmp; + taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData)); + subReq = (SSubmitReq*)(vgData.data); + subReq->length = sizeof(SSubmitReq); + subReq->numOfBlocks = 0; + + blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq)); + } + + // pSW->pSchema should be same as pTableMeta->schema + // ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns); + uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); + uint64_t uid = pTableMeta->uid; + int16_t sver = pTableMeta->sversion; + + void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); + if(schemaData){ + memcpy(blkSchema, schemaData, schemaLen); + } + STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); + + SRowBuilder rb = {0}; + tdSRowInit(&rb, sver); + tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen); + int32_t totalLen = 0; + + SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + for (int i = 0; i < pSW->nCols; i++) { + SSchema* schema = &pSW->pSchema[i]; + taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t)); + } + + for (int32_t j = 0; j < rows; j++) { + tdSRowResetBuf(&rb, rowData); + + doSetOneRowPtr(&rspObj.resInfo); + rspObj.resInfo.current += 1; + + int32_t offset = 0; + for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) { + const SSchema* pColumn = &pTableMeta->schema[k]; + int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name)); + if (!index) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } else { + char* colData = rspObj.resInfo.row[*index]; + if (!colData) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } else { + if (IS_VAR_DATA_TYPE(pColumn->type)) { + colData -= VARSTR_HEADER_SIZE; + } + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); + } + } + + offset += TYPE_BYTES[pColumn->type]; + } + tdSRowEnd(&rb); + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + totalLen += rowLen; + } + + taosHashCleanup(schemaHash); + blk->uid = htobe64(uid); + blk->suid = htobe64(suid); + blk->sversion = htonl(sver); + blk->schemaLen = htonl(schemaLen); + blk->numOfRows = htonl(rows); + blk->dataLen = htonl(totalLen); + subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen; + subReq->numOfBlocks++; + taosMemoryFreeClear(pTableMeta); + } + + pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pQuery) { + uError("create SQuery error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + pQuery->haveResultSet = false; + pQuery->msgType = TDMT_VND_SUBMIT; + pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + if (NULL == pQuery->pRoot) { + uError("create pQuery->pRoot error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); + nodeStmt->payloadType = PAYLOAD_TYPE_KV; + + int32_t numOfVg = taosHashGetSize(pVgHash); + nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); + + VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL); + while (vData) { + SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + if (NULL == dst) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + dst->vg = vData->vg; + SSubmitReq* subReq = (SSubmitReq*)(vData->data); + dst->numOfTables = subReq->numOfBlocks; + dst->size = subReq->length; + dst->pData = (char*)subReq; + vData->data = NULL; // no need free + subReq->header.vgId = htonl(dst->vg.vgId); + subReq->version = htonl(1); + subReq->header.contLen = htonl(subReq->length); + subReq->length = htonl(subReq->length); + subReq->numOfBlocks = htonl(subReq->numOfBlocks); + taosArrayPush(nodeStmt->pDataBlocks, &dst); + vData = (VgData*)taosHashIterate(pVgHash, vData); + } + + launchQueryImpl(pRequest, pQuery, true, NULL); + code = pRequest->code; + + end: + tDecoderClear(&decoder); + qDestroyQuery(pQuery); + destroyRequest(pRequest); + taosHashCleanup(pVgHash); + taosMemoryFreeClear(pTableMeta); + return code; +} + char* tmq_get_json_meta(TAOS_RES* res) { - if (!TD_RES_TMQ_META(res)) { + if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) { return NULL; } + if(TD_RES_TMQ_METADATA(res)){ + SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res; + return processAutoCreateTable(&pMetaDataRspObj->rsp); + } + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) { return processCreateStb(&pMetaRspObj->metaRsp); @@ -1638,6 +1961,25 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { raw->raw = buf; raw->raw_len = len; raw->raw_type = RES_TYPE__TMQ; + } else if (TD_RES_TMQ_METADATA(res)) { + SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res); + + int32_t len = 0; + int32_t code = 0; + tEncodeSize(tEncodeSTaosxRsp, &rspObj->rsp, len, code); + if (code < 0) { + return -1; + } + + void* buf = taosMemoryCalloc(1, len); + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, len); + tEncodeSTaosxRsp(&encoder, &rspObj->rsp); + tEncoderClear(&encoder); + + raw->raw = buf; + raw->raw_len = len; + raw->raw_type = RES_TYPE__TMQ_METADATA; } else { return TSDB_CODE_TMQ_INVALID_MSG; } @@ -1671,6 +2013,8 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { return taosDeleteData(taos, raw.raw, raw.raw_len); } else if (raw.raw_type == RES_TYPE__TMQ) { return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len); + } else if (raw.raw_type == RES_TYPE__TMQ_METADATA) { + return tmqWriteRaqMetaDataImpl(taos, raw.raw, raw.raw_len); } return TSDB_CODE_INVALID_PARA; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9f9a14952e75bdac29564c39dd4ef60da0d07ef0..a84c36790a6ba1f19b4f3f1bc70f92898a4bd869 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1465,7 +1465,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); - pRspObj->resType = RES_TYPE__TAOSX; + pRspObj->resType = RES_TYPE__TMQ_METADATA; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; @@ -1649,7 +1649,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { continue; } // build rsp - SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + SMqTaosxRspObj* pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper); taosFreeQitem(pollRspWrapper); return pRsp; } else { @@ -1769,11 +1769,11 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) { - return TMQ_RES_TAOSX; + return TMQ_RES_DATA; } return TMQ_RES_TABLE_META; - } else if (TD_RES_TMQ_TAOSX(res)) { - return TMQ_RES_DATA; + } else if (TD_RES_TMQ_METADATA(res)) { + return TMQ_RES_METADATA; } else { return TMQ_RES_INVALID; }