diff --git a/include/client/taos.h b/include/client/taos.h index 49cfbb52b80e88103fe6befc6d2818641e731fcf..270b647a77d18fd10e97954e435000799bdd6007 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -254,7 +254,7 @@ enum tmq_res_t { TMQ_RES_INVALID = -1, TMQ_RES_DATA = 1, TMQ_RES_TABLE_META = 2, - TMQ_RES_TAOSX = 3, + TMQ_RES_METADATA = 3, }; typedef struct tmq_raw_data { diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b8fa9580e70c1c7aa17a1402ce6c8113a7f8e094..574d8188fef5b2866b17f7a9f8f7842f162421a3 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -52,7 +52,7 @@ enum { RES_TYPE__QUERY = 1, RES_TYPE__TMQ, RES_TYPE__TMQ_META, - RES_TYPE__TAOSX, + RES_TYPE__TMQ_METADATA, }; #define SHOW_VARIABLES_RESULT_COLS 2 @@ -60,9 +60,9 @@ enum { #define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) -#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ || *(int8_t*)res == RES_TYPE__TAOSX) +#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) #define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) -#define TD_RES_TMQ_TAOSX(res) (*(int8_t*)res == RES_TYPE__TAOSX) +#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA) typedef struct SAppInstInfo SAppInstInfo; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0e1d82b273523a02a40ee0605f7ab259013bfe2a..5128695359b9628d873af0a75373e530f78a6403 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -189,6 +189,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, tscError("%d failed to add to request container, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); + taosMemoryFree(param); destroyRequest(*pRequest); *pRequest = NULL; return TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 73636e7372b97d9cc13926ac9cafccef315e3d70..13cbaa0e2290eb186ed439ccdc965ebeb736a40f 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -148,7 +148,7 @@ int taos_errno(TAOS_RES *res) { return terrno; } - if (TD_RES_TMQ(res)) { + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { return 0; } @@ -162,7 +162,7 @@ const char *taos_errstr(TAOS_RES *res) { return (const char *)tstrerror(terrno); } - if (TD_RES_TMQ(res)) { + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { return "success"; } @@ -184,7 +184,7 @@ void taos_free_result(TAOS_RES *res) { SRequestObj *pRequest = (SRequestObj *)res; tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId); destroyRequest(pRequest); - } else if (TD_RES_TMQ_TAOSX(res)) { + } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); @@ -264,7 +264,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return doFetchRows(pRequest, true, true); #endif - } else if (TD_RES_TMQ(res)) { + } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { SMqRspObj *msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo; if (msg->resIter == -1) { @@ -437,7 +437,7 @@ const char *taos_data_type(int type) { const char *taos_get_client_info() { return version; } int taos_affected_rows(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { return 0; } @@ -454,7 +454,7 @@ int taos_result_precision(TAOS_RES *res) { if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; return pRequest->body.resInfo.precision; - } else if (TD_RES_TMQ(res)) { + } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { SReqResultInfo *info = tmqGetCurResInfo(res); return info->precision; } @@ -487,7 +487,7 @@ int taos_select_db(TAOS *taos, const char *db) { } void taos_stop_query(TAOS_RES *res) { - if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) { + if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { return; } @@ -559,7 +559,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { (*rows) = pResultInfo->row; (*numOfRows) = pResultInfo->numOfRows; return pRequest->code; - } else if (TD_RES_TMQ(res)) { + } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, true); if (pResultInfo == NULL) return -1; @@ -578,7 +578,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { return 0; } - if (TD_RES_TMQ(res)) { + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, false); if (pResultInfo == NULL) { (*numOfRows) = 0; diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index c135965f07454c46e20780d029c18b2359110877..eb7b45cc05d6158368af00d7dd7bc7397d7f20b3 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -30,7 +30,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch char* string = NULL; cJSON* json = cJSON_CreateObject(); if (json == NULL) { - return string; + return NULL; } cJSON* type = cJSON_CreateString("create"); cJSON_AddItemToObject(json, "type", type); @@ -39,10 +39,10 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch // sprintf(uid, "%"PRIi64, id); // cJSON* id_ = cJSON_CreateString(uid); // cJSON_AddItemToObject(json, "id", id_); - cJSON* tableName = cJSON_CreateString(name); - cJSON_AddItemToObject(json, "tableName", tableName); cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super"); cJSON_AddItemToObject(json, "tableType", tableType); + cJSON* tableName = cJSON_CreateString(name); + cJSON_AddItemToObject(json, "tableName", tableName); // cJSON* version = cJSON_CreateNumber(1); // cJSON_AddItemToObject(json, "version", version); @@ -112,10 +112,10 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { // cJSON_AddItemToObject(json, "uid", uid); SName name = {0}; tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - cJSON* tableName = cJSON_CreateString(name.tname); - cJSON_AddItemToObject(json, "tableName", tableName); cJSON* tableType = cJSON_CreateString("super"); cJSON_AddItemToObject(json, "tableType", tableType); + cJSON* tableName = cJSON_CreateString(name.tname); + cJSON_AddItemToObject(json, "tableName", tableName); cJSON* alterType = cJSON_CreateNumber(req.alterType); cJSON_AddItemToObject(json, "alterType", alterType); @@ -199,8 +199,6 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { goto _err; } string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); - tDecoderClear(&coder); - return string; _err: tDecoderClear(&coder); @@ -221,32 +219,22 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { goto _err; } string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); - tDecoderClear(&coder); - return string; _err: tDecoderClear(&coder); return string; } -static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum) { - char* string = NULL; - SArray* pTagVals = NULL; - cJSON* json = cJSON_CreateObject(); - if (json == NULL) { - return string; - } - cJSON* type = cJSON_CreateString("create"); - cJSON_AddItemToObject(json, "type", type); - // char cid[32] = {0}; - // sprintf(cid, "%"PRIi64, id); - // cJSON* cid_ = cJSON_CreateString(cid); - // cJSON_AddItemToObject(json, "id", cid_); +static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq){ + STag* pTag = (STag*)pCreateReq->ctb.pTag; + char* sname = pCreateReq->ctb.name; + char* name = pCreateReq->name; + SArray* tagName = pCreateReq->ctb.tagName; + int64_t id = pCreateReq->uid; + uint8_t tagNum = pCreateReq->ctb.tagNum; cJSON* tableName = cJSON_CreateString(name); cJSON_AddItemToObject(json, "tableName", tableName); - cJSON* tableType = cJSON_CreateString("child"); - cJSON_AddItemToObject(json, "tableType", tableType); cJSON* using = cJSON_CreateString(sname); cJSON_AddItemToObject(json, "using", using); cJSON* tagNumJson = cJSON_CreateNumber(tagNum); @@ -255,6 +243,7 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* // cJSON_AddItemToObject(json, "version", version); cJSON* tags = cJSON_CreateArray(); + SArray* pTagVals = NULL; int32_t code = tTagToValArray(pTag, &pTagVals); if (code) { goto end; @@ -313,11 +302,37 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* cJSON_AddItemToArray(tags, tag); } -end: + end: cJSON_AddItemToObject(json, "tags", tags); + taosArrayDestroy(pTagVals); +} + +static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { + char* string = NULL; + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + return NULL; + } + cJSON* type = cJSON_CreateString("create"); + cJSON_AddItemToObject(json, "type", type); + // char cid[32] = {0}; + // sprintf(cid, "%"PRIi64, id); + // cJSON* cid_ = cJSON_CreateString(cid); + // cJSON_AddItemToObject(json, "id", cid_); + + cJSON* tableType = cJSON_CreateString("child"); + cJSON_AddItemToObject(json, "tableType", tableType); + + buildChildElement(json, pCreateReq); + cJSON* createList = cJSON_CreateArray(); + for(int i = 0; nReqs > 1 && i < nReqs; i++){ + cJSON* create = cJSON_CreateObject(); + buildChildElement(create, pCreateReq + i); + cJSON_AddItemToArray(createList, create); + } + cJSON_AddItemToObject(json, "createList", createList); string = cJSON_PrintUnformatted(json); cJSON_Delete(json); - taosArrayDestroy(pTagVals); return string; } @@ -335,21 +350,58 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { } // loop to create table - for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { - pCreateReq = req.pReqs + iReq; + if (req.nReqs > 0) { + pCreateReq = req.pReqs; if (pCreateReq->type == TSDB_CHILD_TABLE) { - string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name, - pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum); + string = buildCreateCTableJson(req.pReqs, req.nReqs); } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { - string = - buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); + string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); } } +_exit: + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + pCreateReq = req.pReqs + iReq; + taosMemoryFreeClear(pCreateReq->comment); + if (pCreateReq->type == TSDB_CHILD_TABLE) { + taosArrayDestroy(pCreateReq->ctb.tagName); + } + } tDecoderClear(&decoder); + return string; +} + +static char* processAutoCreateTable(STaosxRsp* rsp) { + ASSERT(rsp->createTableNum != 0); + + SDecoder* decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder)); + SVCreateTbReq* pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq)); + char* string = NULL; + + // loop to create table + for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) { + // decode + void** data = taosArrayGet(rsp->createTableReq, iReq); + int32_t *len = taosArrayGet(rsp->createTableLen, iReq); + tDecoderInit(&decoder[iReq], *data, *len); + if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) { + goto _exit; + } + + ASSERT(pCreateReq[iReq].type == TSDB_CHILD_TABLE); + } + string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); _exit: - tDecoderClear(&decoder); + for(int i = 0; i < rsp->createTableNum; i++){ + tDecoderClear(&decoder[i]); + taosMemoryFreeClear(pCreateReq[i].comment); + if (pCreateReq[i].type == TSDB_CHILD_TABLE) { + taosArrayDestroy(pCreateReq[i].ctb.tagName); + } + } + taosMemoryFree(decoder); + taosMemoryFree(pCreateReq); return string; } @@ -374,10 +426,10 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { cJSON_AddItemToObject(json, "type", type); // cJSON* uid = cJSON_CreateNumber(id); // cJSON_AddItemToObject(json, "uid", uid); - cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName); - cJSON_AddItemToObject(json, "tableName", tableName); cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal"); cJSON_AddItemToObject(json, "tableType", tableType); + cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName); + cJSON_AddItemToObject(json, "tableName", tableName); cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action); cJSON_AddItemToObject(json, "alterType", alterType); @@ -462,6 +514,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); _exit: + cJSON_Delete(json); tDecoderClear(&decoder); return string; } @@ -485,14 +538,15 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { } cJSON* type = cJSON_CreateString("drop"); cJSON_AddItemToObject(json, "type", type); - cJSON* tableName = cJSON_CreateString(req.name); - cJSON_AddItemToObject(json, "tableName", tableName); cJSON* tableType = cJSON_CreateString("super"); cJSON_AddItemToObject(json, "tableType", tableType); + cJSON* tableName = cJSON_CreateString(req.name); + cJSON_AddItemToObject(json, "tableName", tableName); string = cJSON_PrintUnformatted(json); _exit: + cJSON_Delete(json); tDecoderClear(&decoder); return string; } @@ -533,6 +587,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); _exit: + cJSON_Delete(json); tDecoderClear(&decoder); return string; } @@ -549,6 +604,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { goto end; } + pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; goto end; @@ -637,6 +693,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { goto end; } + pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; goto end; @@ -717,6 +774,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } + pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; goto end; @@ -830,6 +888,14 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; end: + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + pCreateReq = req.pReqs + iReq; + taosMemoryFreeClear(pCreateReq->comment); + if (pCreateReq->type == TSDB_CHILD_TABLE) { + taosArrayDestroy(pCreateReq->ctb.tagName); + } + } + taosHashCleanup(pVgroupHashmap); destroyRequest(pRequest); tDecoderClear(&coder); @@ -860,7 +926,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { if (code != TSDB_CODE_SUCCESS) { goto end; } - + pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; goto end; @@ -1033,6 +1099,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } + pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; goto end; @@ -1152,6 +1219,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) goto end; } + pRequest->syncQuery = true; if (!pRequest->pDb) { uError("WriteRaw:not use db"); code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; @@ -1339,6 +1407,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { return terrno; } + pRequest->syncQuery = true; rspObj.resIter = -1; rspObj.resType = RES_TYPE__TMQ; @@ -1529,6 +1598,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen; subReq->numOfBlocks++; taosMemoryFreeClear(pTableMeta); + rspObj.resInfo.pRspMsg = NULL; + doFreeReqResultInfo(&rspObj.resInfo); } pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); @@ -1578,6 +1649,313 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { code = pRequest->code; end: + tDeleteSMqDataRsp(&rspObj.rsp); + rspObj.resInfo.pRspMsg = NULL; + doFreeReqResultInfo(&rspObj.resInfo); + tDecoderClear(&decoder); + qDestroyQuery(pQuery); + destroyRequest(pRequest); + taosHashCleanup(pVgHash); + taosMemoryFreeClear(pTableMeta); + return code; +} + +static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { + int32_t code = TSDB_CODE_SUCCESS; + SHashObj* pVgHash = NULL; + SQuery* pQuery = NULL; + SMqTaosxRspObj rspObj = {0}; + SDecoder decoder = {0}; + STableMeta* pTableMeta = NULL; + + terrno = TSDB_CODE_SUCCESS; + SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); + if (!pRequest) { + uError("WriteRaw:createRequest error request is null"); + return terrno; + } + + pRequest->syncQuery = true; + rspObj.resIter = -1; + rspObj.resType = RES_TYPE__TMQ_METADATA; + + tDecoderInit(&decoder, data, dataLen); + code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp); + if (code != 0) { + uError("WriteRaw:decode smqDataRsp error"); + code = TSDB_CODE_INVALID_MSG; + goto end; + } + + if (!pRequest->pDb) { + uError("WriteRaw:not use db"); + code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + goto end; + } + + pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + taosHashSetFreeFp(pVgHash, destroyVgHash); + struct SCatalog* pCatalog = NULL; + code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw: get gatlog error"); + goto end; + } + + SRequestConnInfo conn = {0}; + conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; + conn.requestId = pRequest->requestId; + conn.requestObjRefId = pRequest->self; + conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + + printf("raw data block num:%d\n", rspObj.rsp.blockNum); + while (++rspObj.resIter < rspObj.rsp.blockNum) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); + if (!rspObj.rsp.withSchema) { + uError("WriteRaw:no schema, iter:%d", rspObj.resIter); + goto end; + } + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols); + + code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw: setQueryResultFromRsp error"); + goto end; + } + + const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); + if (!tbName) { + uError("WriteRaw: tbname is null"); + code = TSDB_CODE_TMQ_INVALID_MSG; + goto end; + } + + printf("raw data tbname:%s\n", tbName); + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + strcpy(pName.dbname, pRequest->pDb); + strcpy(pName.tname, tbName); + + VgData vgData = {0}; + code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg)); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); + goto end; + } + + // find schema data info + int32_t schemaLen = 0; + void* schemaData = NULL; + for(int j = 0; j < rspObj.rsp.createTableNum; j++){ + void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j); + int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j); + + SDecoder decoderTmp = {0}; + SVCreateTbReq pCreateReq = {0}; + + tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); + if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { + tDecoderClear(&decoderTmp); + taosMemoryFreeClear(pCreateReq.comment); + taosArrayDestroy(pCreateReq.ctb.tagName); + goto end; + } + + ASSERT (pCreateReq.type == TSDB_CHILD_TABLE); + if(strcmp(tbName, pCreateReq.name) == 0){ + schemaLen = *lenTmp; + schemaData = *dataTmp; + strcpy(pName.tname, pCreateReq.ctb.name); + tDecoderClear(&decoderTmp); + taosMemoryFreeClear(pCreateReq.comment); + taosArrayDestroy(pCreateReq.ctb.tagName); + break; + } + tDecoderClear(&decoderTmp); + taosMemoryFreeClear(pCreateReq.comment); + taosArrayDestroy(pCreateReq.ctb.tagName); + } + + code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); + code = TSDB_CODE_SUCCESS; + continue; + } + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName); + goto end; + } + + uint16_t fLen = 0; + int32_t rowSize = 0; + int16_t nVar = 0; + for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { + SSchema* schema = &pTableMeta->schema[i]; + fLen += TYPE_BYTES[schema->type]; + rowSize += schema->bytes; + if (IS_VAR_DATA_TYPE(schema->type)) { + nVar++; + } + } + + int32_t rows = rspObj.resInfo.numOfRows; + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + + (int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1); + + int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; + + SSubmitReq* subReq = NULL; + SSubmitBlk* blk = NULL; + void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId)); + if (hData) { + vgData = *(VgData*)hData; + + int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen; + void* tmp = taosMemoryRealloc(vgData.data, totalLen); + if (tmp == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + vgData.data = tmp; + ((VgData*)hData)->data = tmp; + subReq = (SSubmitReq*)(vgData.data); + blk = POINTER_SHIFT(vgData.data, subReq->length); + } else { + int32_t totalLen = sizeof(SSubmitReq) + submitLen; + void* tmp = taosMemoryCalloc(1, totalLen); + if (tmp == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + vgData.data = tmp; + taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData)); + subReq = (SSubmitReq*)(vgData.data); + subReq->length = sizeof(SSubmitReq); + subReq->numOfBlocks = 0; + + blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq)); + } + + // pSW->pSchema should be same as pTableMeta->schema + // ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns); + uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); + uint64_t uid = pTableMeta->uid; + int16_t sver = pTableMeta->sversion; + + void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); + if(schemaData){ + memcpy(blkSchema, schemaData, schemaLen); + } + STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); + + SRowBuilder rb = {0}; + tdSRowInit(&rb, sver); + tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen); + int32_t totalLen = 0; + + SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + for (int i = 0; i < pSW->nCols; i++) { + SSchema* schema = &pSW->pSchema[i]; + taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t)); + } + + for (int32_t j = 0; j < rows; j++) { + tdSRowResetBuf(&rb, rowData); + + doSetOneRowPtr(&rspObj.resInfo); + rspObj.resInfo.current += 1; + + int32_t offset = 0; + for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) { + const SSchema* pColumn = &pTableMeta->schema[k]; + int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name)); + if (!index) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } else { + char* colData = rspObj.resInfo.row[*index]; + if (!colData) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } else { + if (IS_VAR_DATA_TYPE(pColumn->type)) { + colData -= VARSTR_HEADER_SIZE; + } + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k); + } + } + + offset += TYPE_BYTES[pColumn->type]; + } + tdSRowEnd(&rb); + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + totalLen += rowLen; + } + + taosHashCleanup(schemaHash); + blk->uid = htobe64(uid); + blk->suid = htobe64(suid); + blk->sversion = htonl(sver); + blk->schemaLen = htonl(schemaLen); + blk->numOfRows = htonl(rows); + blk->dataLen = htonl(totalLen); + subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen; + subReq->numOfBlocks++; + taosMemoryFreeClear(pTableMeta); + rspObj.resInfo.pRspMsg = NULL; + doFreeReqResultInfo(&rspObj.resInfo); + } + + pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pQuery) { + uError("create SQuery error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + pQuery->haveResultSet = false; + pQuery->msgType = TDMT_VND_SUBMIT; + pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + if (NULL == pQuery->pRoot) { + uError("create pQuery->pRoot error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); + nodeStmt->payloadType = PAYLOAD_TYPE_KV; + + int32_t numOfVg = taosHashGetSize(pVgHash); + nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); + + VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL); + while (vData) { + SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + if (NULL == dst) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + dst->vg = vData->vg; + SSubmitReq* subReq = (SSubmitReq*)(vData->data); + dst->numOfTables = subReq->numOfBlocks; + dst->size = subReq->length; + dst->pData = (char*)subReq; + vData->data = NULL; // no need free + subReq->header.vgId = htonl(dst->vg.vgId); + subReq->version = htonl(1); + subReq->header.contLen = htonl(subReq->length); + subReq->length = htonl(subReq->length); + subReq->numOfBlocks = htonl(subReq->numOfBlocks); + taosArrayPush(nodeStmt->pDataBlocks, &dst); + vData = (VgData*)taosHashIterate(pVgHash, vData); + } + + launchQueryImpl(pRequest, pQuery, true, NULL); + code = pRequest->code; + + end: + tDeleteSTaosxRsp(&rspObj.rsp); + rspObj.resInfo.pRspMsg = NULL; + doFreeReqResultInfo(&rspObj.resInfo); tDecoderClear(&decoder); qDestroyQuery(pQuery); destroyRequest(pRequest); @@ -1587,10 +1965,15 @@ end: } char* tmq_get_json_meta(TAOS_RES* res) { - if (!TD_RES_TMQ_META(res)) { + if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) { return NULL; } + if(TD_RES_TMQ_METADATA(res)){ + SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res; + return processAutoCreateTable(&pMetaDataRspObj->rsp); + } + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) { return processCreateStb(&pMetaRspObj->metaRsp); @@ -1638,6 +2021,25 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { raw->raw = buf; raw->raw_len = len; raw->raw_type = RES_TYPE__TMQ; + } else if (TD_RES_TMQ_METADATA(res)) { + SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res); + + int32_t len = 0; + int32_t code = 0; + tEncodeSize(tEncodeSTaosxRsp, &rspObj->rsp, len, code); + if (code < 0) { + return -1; + } + + void* buf = taosMemoryCalloc(1, len); + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, len); + tEncodeSTaosxRsp(&encoder, &rspObj->rsp); + tEncoderClear(&encoder); + + raw->raw = buf; + raw->raw_len = len; + raw->raw_type = RES_TYPE__TMQ_METADATA; } else { return TSDB_CODE_TMQ_INVALID_MSG; } @@ -1645,7 +2047,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { } void tmq_free_raw(tmq_raw_data raw) { - if (raw.raw_type == RES_TYPE__TMQ) { + if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) { taosMemoryFree(raw.raw); } } @@ -1671,6 +2073,8 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { return taosDeleteData(taos, raw.raw, raw.raw_len); } else if (raw.raw_type == RES_TYPE__TMQ) { return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len); + } else if (raw.raw_type == RES_TYPE__TMQ_METADATA) { + return tmqWriteRawMetaDataImpl(taos, raw.raw, raw.raw_len); } return TSDB_CODE_INVALID_PARA; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fc95bd86dcbbe991dd4c006026d055deccd8e7b0..c9c02a77e19b1984c39d3f4752620c505a69da37 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -515,6 +515,10 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg; topic = pMetaRspObj->topic; vgId = pMetaRspObj->vgId; + } else if(TD_RES_TMQ_METADATA(msg)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg; + topic = pRspObj->topic; + vgId = pRspObj->vgId; } else { return TSDB_CODE_TMQ_INVALID_MSG; } @@ -1471,16 +1475,16 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); - pRspObj->resType = RES_TYPE__TAOSX; + pRspObj->resType = RES_TYPE__TMQ_METADATA; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->resIter = -1; - memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqTaosxRspObj)); + memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - if (!pWrapper->dataRsp.withSchema) { + if (!pWrapper->taosxRsp.withSchema) { setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); } @@ -1654,8 +1658,14 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { rspWrapper = NULL; continue; } + // build rsp - SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + void* pRsp = NULL; + if(pollRspWrapper->taosxRsp.createTableNum == 0){ + pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + }else{ + pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper); + } taosFreeQitem(pollRspWrapper); return pRsp; } else { @@ -1775,11 +1785,11 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) { - return TMQ_RES_TAOSX; + return TMQ_RES_DATA; } return TMQ_RES_TABLE_META; - } else if (TD_RES_TMQ_TAOSX(res)) { - return TMQ_RES_DATA; + } else if (TD_RES_TMQ_METADATA(res)) { + return TMQ_RES_METADATA; } else { return TMQ_RES_INVALID; } @@ -1792,6 +1802,9 @@ const char* tmq_get_topic_name(TAOS_RES* res) { } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; return strchr(pMetaRspObj->topic, '.') + 1; + } else if (TD_RES_TMQ_METADATA(res)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; + return strchr(pRspObj->topic, '.') + 1; } else { return NULL; } @@ -1804,6 +1817,9 @@ const char* tmq_get_db_name(TAOS_RES* res) { } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; return strchr(pMetaRspObj->db, '.') + 1; + } else if (TD_RES_TMQ_METADATA(res)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; + return strchr(pRspObj->db, '.') + 1; } else { return NULL; } @@ -1816,6 +1832,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; return pMetaRspObj->vgId; + } else if (TD_RES_TMQ_METADATA(res)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; + return pRspObj->vgId; } else { return -1; } @@ -1829,7 +1848,14 @@ const char* tmq_get_table_name(TAOS_RES* res) { return NULL; } return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); - } + } else if (TD_RES_TMQ_METADATA(res)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; + if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || + pRspObj->resIter >= pRspObj->rsp.blockNum) { + return NULL; + } + return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + } return NULL; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index fffaf214edbca55b8bc02c09c090725599872c97..09908150248c7ad1f098d4751ea5fee655ad078f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6018,12 +6018,18 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { void tDeleteSTaosxRsp(STaosxRsp *pRsp) { taosArrayDestroy(pRsp->blockDataLen); + pRsp->blockDataLen = NULL; taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); + pRsp->blockData = NULL; taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper); + pRsp->blockSchema = NULL; taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); + pRsp->blockTbName = NULL; taosArrayDestroy(pRsp->createTableLen); + pRsp->createTableLen = NULL; taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); + pRsp->createTableReq = NULL; } int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a9723abacc19835aa55db5f9a21f8b3d679f02e8..537c9ca83480f0b23120f9d60920de98c763c00c 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -540,7 +540,7 @@ typedef struct { } SMqConsumerEp; SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp); -void tDeleteSMqConsumerEp(SMqConsumerEp* pEp); +void tDeleteSMqConsumerEp(void* pEp); int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp); void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4470a6b064f63e6b10d866495ce8579a5fd39e04..cdfa20866599925445b324bddca2049478bc0568 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -197,11 +197,12 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); pLostMsg->consumerId = pConsumer->consumerId; - SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); - pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_LOST; - pRpcMsg->pCont = pLostMsg; - pRpcMsg->contLen = sizeof(SMqConsumerLostMsg); - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg); + SRpcMsg pRpcMsg = { + .msgType = TDMT_MND_MQ_CONSUMER_LOST, + .pCont = pLostMsg, + .contLen = sizeof(SMqConsumerLostMsg), + }; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) { // do nothing @@ -280,11 +281,12 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); pRecoverMsg->consumerId = consumerId; - SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); - pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER; - pRpcMsg->pCont = pRecoverMsg; - pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg); - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg); + SRpcMsg pRpcMsg = { + .msgType = TDMT_MND_MQ_CONSUMER_RECOVER, + .pCont = pRecoverMsg, + .contLen = sizeof(SMqConsumerRecoverMsg), + }; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } mndReleaseConsumer(pMnode, pConsumer); @@ -318,11 +320,12 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); pRecoverMsg->consumerId = consumerId; - SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); - pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER; - pRpcMsg->pCont = pRecoverMsg; - pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg); - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg); + SRpcMsg pRpcMsg = { + .msgType = TDMT_MND_MQ_CONSUMER_RECOVER, + .pCont = pRecoverMsg, + .contLen = sizeof(SMqConsumerRecoverMsg), + }; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } #endif diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index f1280700f0969e252d04a0531638029bfc9bb676..50d26ce9a51dfb8488020a0fd257074f723bbbb6 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -338,8 +338,8 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { return pConsumerEpNew; } -void tDeleteSMqConsumerEp(SMqConsumerEp *pConsumerEp) { - // +void tDeleteSMqConsumerEp(void *data) { + SMqConsumerEp *pConsumerEp = (SMqConsumerEp*)data; taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0e85e7bfb60313b106d8838986fa685eedf2c409..4c8045d651b8f895113b3349a7dd39a7958d487e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -184,7 +184,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); -SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, +SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq); // sma diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index a34569b08ecdd87483dbf31e8c6d7e406e8ae766..24ade017d37f73e83b09578ea5b2211c5b2c43a5 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -319,8 +319,12 @@ _query: pSchema = tCloneSSchemaWrapper(&meNew.stbEntry.schemaRow); tDecoderClear(&dcNew); tdbTbcClose(pCur); + tdbFree(pKey); + tdbFree(pVal); goto _exit; } + tdbFree(pKey); + tdbFree(pVal); tdbTbcClose(pCur); } } else if (me.type == TSDB_CHILD_TABLE) { @@ -347,11 +351,13 @@ _query: tDecoderClear(&dc); _exit: + tDecoderClear(&dc); metaULock(pMeta); tdbFree(pData); return pSchema; _err: + tDecoderClear(&dc); metaULock(pMeta); tdbFree(pData); return NULL; @@ -383,10 +389,8 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) { ttlKey = *(STtlIdxKey *)pKey; taosArrayPush(uidList, &ttlKey.uid); } - tdbTbcClose(pCur); - tdbFree(pKey); - + tdbTbcClose(pCur); return 0; } diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 0edbd092e6b06883cc1e2b6be66e0ea55b8563a1..9fdbe50f88ab9fbdd2570ea76f51d7cb26772d06 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -353,6 +353,8 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index); } + tdbFree(pKey); + tdbFree(pVal); return TDB_CODE_SUCCESS; } @@ -528,6 +530,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in } } } + taosArrayDestroy(pTagVals); } // SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t)); // if(sidInfo->version >= idInfo->version){ diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 22ec8118a2911f738b2aa7ce27f8b32b7fd4d461..ce2a1c4b89336dcd48ae481960e614978f532030 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -1192,10 +1192,11 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { const void *pTagData = NULL; // int32_t nTagData = 0; SDecoder dc = {0}; - + int32_t ret = 0; // get super table if (tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0) { - return -1; + ret = -1; + goto end; } tbDbKey.uid = pCtbEntry->ctbEntry.suid; tbDbKey.version = ((SUidIdxVal *)pData)[0].version; @@ -1221,17 +1222,20 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { // nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len; pTagData = pCtbEntry->ctbEntry.pTags; nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len; - return metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn); + ret = metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn); + goto end; } if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) { - return -1; + ret = -1; + goto end; } tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn); +end: metaDestroyTagIdxKey(pTagIdxKey); tDecoderClear(&dc); tdbFree(pData); - return 0; + return ret; } static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index e2cb51f586b2ec0306d15a39ed3048a321ff1179..6c32fbbc847c2cb1d65831c34689cfc6939e7497 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -204,7 +204,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char } SBatchDeleteReq deleteReq; - SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, + SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true, pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq); if (!pSubmitReq) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index cbe916d0f772eea4d0cb238c0369b49df7b63b51..98b7dd7163089e5a7e067ca069fb5f61959a74d3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -51,6 +51,20 @@ void tqCleanUp() { } } +static void destroySTqHandle(void* data) { + STqHandle* pData = (STqHandle*)data; + qDestroyTask(pData->execHandle.task); + if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) { + tqCloseReader(pData->execHandle.pExecReader); + walCloseReader(pData->pWalReader); + taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid); + } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE){ + walCloseReader(pData->pWalReader); + tqCloseReader(pData->execHandle.pExecReader); + } +} + STQ* tqOpen(const char* path, SVnode* pVnode) { STQ* pTq = taosMemoryCalloc(1, sizeof(STQ)); if (pTq == NULL) { @@ -62,6 +76,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + taosHashSetFreeFp(pTq->pHandle, destroySTqHandle); + pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -517,6 +533,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int64_t fetchVer = fetchOffsetNew.version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { + tDeleteSTaosxRsp(&taosxRsp); return -1; } @@ -577,14 +594,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { code = -1; taosMemoryFree(pCkHead); + tDeleteSTaosxRsp(&taosxRsp); return code; } code = 0; if (pCkHead) taosMemoryFree(pCkHead); + tDeleteSTaosxRsp(&taosxRsp); return code; } } } + tDeleteSTaosxRsp(&taosxRsp); return 0; } diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index d00907f6778d176d1eabe77f92a243a1c5dd3cbd..a24f92023584710e642d931131d9f707a08c52da 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -243,14 +243,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp } if (pHandle->fetchMeta) { SSubmitBlk* pBlk = pReader->pBlock; - if (pBlk->schemaLen > 0) { + int32_t schemaLen = htonl(pBlk->schemaLen); + if (schemaLen > 0) { if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); } - void* createReq = taosMemoryCalloc(1, pBlk->schemaLen); - memcpy(createReq, pBlk->data, pBlk->schemaLen); - taosArrayPush(pRsp->createTableLen, &pBlk->schemaLen); + void* createReq = taosMemoryCalloc(1, schemaLen); + memcpy(createReq, pBlk->data, schemaLen); + taosArrayPush(pRsp->createTableLen, &schemaLen); taosArrayPush(pRsp->createTableReq, &createReq); pRsp->createTableNum++; } @@ -277,14 +278,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp } if (pHandle->fetchMeta) { SSubmitBlk* pBlk = pReader->pBlock; - if (pBlk->schemaLen > 0) { + int32_t schemaLen = htonl(pBlk->schemaLen); + if (schemaLen > 0) { if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); } - void* createReq = taosMemoryCalloc(1, pBlk->schemaLen); - memcpy(createReq, pBlk->data, pBlk->schemaLen); - taosArrayPush(pRsp->createTableLen, &pBlk->schemaLen); + void* createReq = taosMemoryCalloc(1, schemaLen); + memcpy(createReq, pBlk->data, schemaLen); + taosArrayPush(pRsp->createTableLen, &schemaLen); taosArrayPush(pRsp->createTableReq, &createReq); pRsp->createTableNum++; } diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index d8d8025151a6b366914aa1549711af5d1862637c..7097591c35820398547bc6093ee74c4e89a02ad8 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -145,6 +145,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { ASSERT(0); tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen); taosHashCancelIterate(pStore->pHash, pIter); + taosMemoryFree(buf); return -1; } taosMemoryFree(buf); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 375130fa2c34f5b954e3fe3710914dc01e8f6363..8e5f328efa14454f9d89de3f8e77bcb8a368dbd9 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -80,6 +80,13 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ void* buf = taosMemoryMalloc(tlen); if (NULL == buf) { taosArrayDestroy(reqNew.pArray); + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + pCreateReq = req.pReqs + iReq; + taosMemoryFreeClear(pCreateReq->comment); + if (pCreateReq->type == TSDB_CHILD_TABLE) { + taosArrayDestroy(pCreateReq->ctb.tagName); + } + } goto end; } SEncoder coderNew = {0}; @@ -91,6 +98,14 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ taosMemoryFree(buf); taosArrayDestroy(reqNew.pArray); } + + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + pCreateReq = req.pReqs + iReq; + taosMemoryFreeClear(pCreateReq->comment); + if (pCreateReq->type == TSDB_CHILD_TABLE) { + taosArrayDestroy(pCreateReq->ctb.tagName); + } + } } else if (msgType == TDMT_VND_ALTER_TABLE) { SVAlterTbReq req = {0}; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 522bf46aa1fb9d28225f3118b9b6e1bed7a6cd75..5ddca1f37bc4b8d9ac2dbde00c63956e8cdbdc75 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -48,7 +48,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl return 0; } -SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb, +SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) { SSubmitReq* ret = NULL; SArray* schemaReqs = NULL; @@ -89,6 +89,32 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem return NULL; } + SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + +// STag* pTag = NULL; +// taosArrayClear(tagArray); +// SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); +// for(int j = 0; j < pTagSchemaWrapper->nCols; j++){ +// STagVal tagVal = { +// .cid = pTagSchemaWrapper->pSchema[j].colId, +// .type = pTagSchemaWrapper->pSchema[j].type, +// .i64 = (int64_t)pDataBlock->info.groupId, +// }; +// taosArrayPush(tagArray, &tagVal); +// taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name); +// } +// +// tTagNew(tagArray, 1, false, &pTag); +// if (pTag == NULL) { +// terrno = TSDB_CODE_OUT_OF_MEMORY; +// taosArrayDestroy(tagArray); +// taosArrayDestroy(tagName); +// return NULL; +// } + SVCreateTbReq createTbReq = {0}; SName name = {0}; tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -99,6 +125,8 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem createTbReq.type = TSDB_CHILD_TABLE; createTbReq.ctb.suid = suid; createTbReq.ctb.pTag = (uint8_t*)pTag; + createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); + createTbReq.ctb.tagName = tagName; int32_t code; int32_t schemaLen; @@ -113,6 +141,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem void* schemaStr = taosMemoryMalloc(schemaLen); if (schemaStr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tdDestroySVCreateTbReq(&createTbReq); return NULL; } taosArrayPush(schemaReqs, &schemaStr); @@ -123,6 +152,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem code = tEncodeSVCreateTbReq(&encoder, &createTbReq); if (code < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tdDestroySVCreateTbReq(&createTbReq); return NULL; } tEncoderClear(&encoder); @@ -231,7 +261,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ASSERT(pTask->tbSink.pTSchema); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, + SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b88589740d63460cd6057f385f02f755851ee254..b26603b394f1527334f4e4d490edb69e996053e1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1342,7 +1342,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi return TSDB_CODE_SUCCESS; } -void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) { taosMemoryFree(pCond->colList); } +void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) { taosMemoryFreeClear(pCond->colList); } int32_t convertFillType(int32_t mode) { int32_t type = TSDB_FILL_NONE; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 44541ee27c86d93b7448bbc79fce785d51764394..45b88ec6a5c8143ba192aa7924cd873e4b406e6c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -876,6 +876,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); + cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); pTaskInfo->streamInfo.schema = mtInfo.schema; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f6bb1ffe4278d7bdec42fce85d1d65523e52f28f..a1ff79bd03738a1f0195879985a9da9544630c03 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3350,6 +3350,10 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { tDeleteSSchemaWrapper(pSchemaInfo->qsw); } +static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { + tDeleteSSchemaWrapper(pStreamInfo->schema); +} + static int32_t sortTableGroup(STableListInfo* pTableListInfo) { taosArrayClear(pTableListInfo->pGroupList); SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); @@ -4043,6 +4047,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { doDestroyTableList(&pTaskInfo->tableqinfoList); destroyOperatorInfo(pTaskInfo->pRoot); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); + cleanupStreamInfo(&pTaskInfo->streamInfo); nodesDestroyNode((SNode*)pTaskInfo->pSubplan); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0c51f95284a243fdcfb008888fa0930c2182b7ad..8f9a78db83994e0d4252ecca3a5fd9ec76855aa8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1780,6 +1780,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { qDebug("tmqsnap change get data uid:%ld", mtInfo.uid); qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType); } + tDeleteSSchemaWrapper(mtInfo.schema); qDebug("tmqsnap stream scan tsdb return null"); return NULL; } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) { diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 1af61b0ca833d33f36983bce9f4898da369e9bfb..ca9bb16c05fcbcd1edacf380f986e94eba1ca102 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1059,7 +1059,7 @@ end: for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { STagVal* p = (STagVal*)taosArrayGet(pTagVals, i); if (IS_VAR_DATA_TYPE(p->type)) { - taosMemoryFree(p->pData); + taosMemoryFreeClear(p->pData); } } taosArrayDestroy(pTagVals); @@ -2040,7 +2040,7 @@ end: for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { STagVal* p = (STagVal*)taosArrayGet(pTagArray, i); if (p->type == TSDB_DATA_TYPE_NCHAR) { - taosMemoryFree(p->pData); + taosMemoryFreeClear(p->pData); } } taosArrayDestroy(pTagArray); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ea9eefbb2999da482202e99de1075cd3588f2b26..f0d5e5d67e90f0e2a820e9a5de328717eac57873 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6664,12 +6664,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS break; } } while (0); - for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { - STagVal* p = (STagVal*)taosArrayGet(pTagVals, i); - if (IS_VAR_DATA_TYPE(p->type)) { - taosMemoryFree(p->pData); - } - } + taosArrayDestroy(pTagVals); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index daab80667ca88b6f1d1929813d98b2941fad2c57..3075be1a0087b550bd9919c4be4f4427957b8c80 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -410,6 +410,12 @@ end: if (retCode == TSDB_CODE_SUCCESS) { tTagNew(pTagVals, 1, true, ppTag); } + for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { + STagVal* p = (STagVal*)taosArrayGet(pTagVals, i); + if (IS_VAR_DATA_TYPE(p->type)) { + taosMemoryFreeClear(p->pData); + } + } cJSON_Delete(root); return retCode; } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index ecc25e9c2df6fd1d0efb35cc21d871fde64784c5..c38c9ac1cf522d747188e5e3fff5c7a81944a490 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -632,6 +632,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes)); + taosMemoryFreeClear(req.msg); QW_SCH_TASK_DLOG("processDelete end, node:%p", node); _return: diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 07602ec29f69f9fbd0dab90935e0922996c80f80..626e935733a19b47a9524c474614ac816ef38bbe 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -31,18 +31,20 @@ class TDTestCase: while True: dst = queryFile.readline() src = consumeFile.readline() - - if dst: + if src: if dst != src: - tdLog.exit("compare error: %s != %s"%src, dst) + tdLog.exit("compare error: %s != %s"%(src, dst)) else: break return - def checkDropData(self): + def checkDropData(self, drop): tdSql.execute('use db_taosx') tdSql.query("show tables") - tdSql.checkRows(2) + if drop: + tdSql.checkRows(10) + else: + tdSql.checkRows(15) tdSql.query("select * from jt order by i") tdSql.checkRows(2) tdSql.checkData(0, 1, 1) @@ -50,15 +52,72 @@ class TDTestCase: tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}') tdSql.checkData(1, 2, None) + tdSql.query("select * from sttb order by ts") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 13) + tdSql.checkData(1, 1, 16) + tdSql.checkData(0, 2, 22) + tdSql.checkData(1, 2, 25) + tdSql.checkData(0, 5, "sttb3") + tdSql.checkData(1, 5, "sttb4") + + tdSql.query("select * from stt order by ts") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, 21) + tdSql.checkData(0, 2, 2) + tdSql.checkData(1, 2, 21) + tdSql.checkData(0, 5, "stt3") + tdSql.checkData(1, 5, "stt4") + tdSql.execute('use abc1') tdSql.query("show tables") - tdSql.checkRows(2) + if drop: + tdSql.checkRows(10) + else: + tdSql.checkRows(15) tdSql.query("select * from jt order by i") tdSql.checkRows(2) tdSql.checkData(0, 1, 1) tdSql.checkData(1, 1, 11) tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}') tdSql.checkData(1, 2, None) + + tdSql.query("select * from sttb order by ts") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 13) + tdSql.checkData(1, 1, 16) + tdSql.checkData(0, 2, 22) + tdSql.checkData(1, 2, 25) + tdSql.checkData(0, 5, "sttb3") + tdSql.checkData(1, 5, "sttb4") + + tdSql.query("select * from stt order by ts") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, 21) + tdSql.checkData(0, 2, 2) + tdSql.checkData(1, 2, 21) + tdSql.checkData(0, 5, "stt3") + tdSql.checkData(1, 5, "stt4") + + return + + def checkDataTable(self): + tdSql.execute('use db_taosx') + tdSql.query("select * from meters_summary") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 120) + tdSql.checkData(0, 2, 1) + tdSql.checkData(0, 3, "San Francisco") + + tdSql.execute('use abc1') + tdSql.query("select * from meters_summary") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 120) + tdSql.checkData(0, 2, 1) + tdSql.checkData(0, 3, "San Francisco") + return def checkData(self): @@ -145,6 +204,19 @@ class TDTestCase: self.checkJson(cfgPath, "tmq_taosx_tmp") self.checkData() + self.checkDropData(False) + + return + + def checkWal1VgroupTable(self): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -t'%(buildPath, cfgPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + self.checkJson(cfgPath, "tmq_taosx_tmp") + self.checkDataTable() return @@ -155,6 +227,7 @@ class TDTestCase: os.system(cmdStr) self.checkData() + self.checkDropData(False) return @@ -164,7 +237,7 @@ class TDTestCase: tdLog.info(cmdStr) os.system(cmdStr) - self.checkDropData() + self.checkDropData(True) return @@ -177,6 +250,19 @@ class TDTestCase: self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot") self.checkData() + self.checkDropData(False) + + return + + def checkSnapshot1VgroupTable(self): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s -t'%(buildPath, cfgPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot") + self.checkDataTable() return @@ -187,6 +273,7 @@ class TDTestCase: os.system(cmdStr) self.checkData() + self.checkDropData(False) return @@ -196,7 +283,7 @@ class TDTestCase: tdLog.info(cmdStr) os.system(cmdStr) - self.checkDropData() + self.checkDropData(True) return @@ -205,6 +292,9 @@ class TDTestCase: self.checkWal1Vgroup() self.checkSnapshot1Vgroup() + self.checkWal1VgroupTable() + self.checkSnapshot1VgroupTable() + self.checkWalMultiVgroups() self.checkSnapshotMultiVgroups() diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index 71b31ba1071c977d9fd3d2ceb046bdff02ca53df..a043aa7a6d75effa224d77d1de518a35abd06249 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -653,23 +653,23 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn int32_t code = tmq_get_raw(msg, &raw); if(code == TSDB_CODE_SUCCESS){ - int retCode = queryDB(pInfo->taos, "use metadb"); - if (retCode != 0) { - taosFprintfFile(g_fp, "error when use metadb\n"); - taosCloseFile(&g_fp); - exit(-1); - } - taosFprintfFile(g_fp, "raw:%p\n", &raw); - - tmq_write_raw(pInfo->taos, raw); +// int retCode = queryDB(pInfo->taos, "use metadb"); +// if (retCode != 0) { +// taosFprintfFile(g_fp, "error when use metadb\n"); +// taosCloseFile(&g_fp); +// exit(-1); +// } +// taosFprintfFile(g_fp, "raw:%p\n", &raw); +// +// tmq_write_raw(pInfo->taos, raw); } char* result = tmq_get_json_meta(msg); - if(result){ + if(result && strcmp(result, "") != 0){ //printf("meta result: %s\n", result); taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result); - taosMemoryFree(result); } + tmq_free_json_meta(result); } totalRows++; @@ -818,8 +818,12 @@ void loop_consume(SThreadInfo* pInfo) { tmq_res_t msgType = tmq_get_res_type(tmqMsg); if (msgType == TMQ_RES_TABLE_META) { totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs); - } else if (msgType == TMQ_RES_DATA) - totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs); + } else if (msgType == TMQ_RES_DATA){ + totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs); + } else if (msgType == TMQ_RES_METADATA){ + meta_msg_process(tmqMsg, pInfo, totalMsgs); + totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs); + } } taos_free_result(tmqMsg); diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index f917b9159e9914682c277329ddcfa4e269dc4908..3a598ba98b59d4e55f1a12cc7dd60745ac915d74 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -54,24 +54,24 @@ static void msg_process(TAOS_RES* msg) { printf("db: %s\n", tmq_get_db_name(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg)); TAOS *pConn = use_db(); - if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { + if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META || tmq_get_res_type(msg) == TMQ_RES_METADATA) { char* result = tmq_get_json_meta(msg); if (result) { printf("meta result: %s\n", result); + if(g_fp && strcmp(result, "") != 0){ + taosFprintfFile(g_fp, result); + taosFprintfFile(g_fp, "\n"); + } } - if(g_fp){ - taosFprintfFile(g_fp, result); - taosFprintfFile(g_fp, "\n"); - } - tmq_free_json_meta(result); } tmq_raw_data raw = {0}; tmq_get_raw(msg, &raw); + printf("write raw data type: %d\n", raw.raw_type); int32_t ret = tmq_write_raw(pConn, raw); printf("write raw data: %s\n", tmq_err2str(ret)); - + tmq_free_raw(raw); taos_close(pConn); } @@ -309,6 +309,41 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes){ } taos_free_result(pRes); } + + pRes = taos_query(pConn, + "create stable if not exists stt (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " + "nchar(8), t4 bool)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table stt, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, + "create stable if not exists sttb (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " + "nchar(8), t4 bool)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table sttb, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists stt1 using stt tags(2, \"stt1\", true) sttb1 using sttb tags(4, \"sttb1\", true) " + "stt2 using stt tags(43, \"stt2\", false) sttb2 using sttb tags(54, \"sttb2\", true)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') " + "stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + return 0; } @@ -542,47 +577,83 @@ void initLogFile() { } if(g_conf.snapShot){ - char *result[] = { - "{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}", - "{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}", - "{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}", - "{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[]}", - "{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":5000}]}", - "{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c2\",\"type\":10,\"length\":8},{\"name\":\"cc3\",\"type\":5}],\"tags\":[]}", - "{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}", - "{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}", - "{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}", - }; - - for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ - taosFprintfFile(pFile2, result[i]); - taosFprintfFile(pFile2, "\n"); + if(g_conf.subTable){ + char *result[] = { + "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"meters_summary\",\"columns\":[{\"name\":\"_wstart\",\"type\":9},{\"name\":\"current\",\"type\":6},{\"name\":\"groupid\",\"type\":4},{\"name\":\"location\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"group_id\",\"type\":14}]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"t_d2a450ee819dcf7576f0282d9ac22dbc\",\"using\":\"meters_summary\",\"tagNum\":1,\"tags\":[{\"name\":\"group_id\",\"type\":14,\"value\":1.313555008277358e+19}],\"createList\":[]}" + }; + for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ + taosFprintfFile(pFile2, result[i]); + taosFprintfFile(pFile2, "\n"); + } + }else{ + char *result[] = { + "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":5000}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c2\",\"type\":10,\"length\":8},{\"name\":\"cc3\",\"type\":5}],\"tags\":[]}", + "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"jt\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt1\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt2\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"stt\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}", + "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"sttb\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt1\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb1\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb1\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt2\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":43},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":0}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb2\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":54},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt3\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb3\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb3\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt4\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":433},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt4\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":0}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb4\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":543},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb4\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}" + }; + for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ + taosFprintfFile(pFile2, result[i]); + taosFprintfFile(pFile2, "\n"); + } } }else{ - char *result[] = { - "{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}", - "{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}", - "{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}", - "{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[]}", - "{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":3000}]}", - "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}", - "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":7,\"colName\":\"c3\",\"colType\":8,\"colLength\":64}", - "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":1,\"colName\":\"t2\",\"colType\":8,\"colLength\":64}", - "{\"type\":\"alter\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"alterType\":4,\"colName\":\"t1\",\"colValue\":\"5000\",\"colValueNull\":false}", - "{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":10,\"length\":4}],\"tags\":[]}", - "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":5,\"colName\":\"c3\",\"colType\":5}", - "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":7,\"colName\":\"c2\",\"colType\":10,\"colLength\":8}", - "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":10,\"colName\":\"c3\",\"colNewName\":\"cc3\"}", - "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":9}", - "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":6,\"colName\":\"c1\"}", - "{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}", - "{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}", - "{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}" - }; - - for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ - taosFprintfFile(pFile2, result[i]); - taosFprintfFile(pFile2, "\n"); + if(g_conf.subTable){ + char *result[] = { + "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"meters_summary\",\"columns\":[{\"name\":\"_wstart\",\"type\":9},{\"name\":\"current\",\"type\":6},{\"name\":\"groupid\",\"type\":4},{\"name\":\"location\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"group_id\",\"type\":14}]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"t_d2a450ee819dcf7576f0282d9ac22dbc\",\"using\":\"meters_summary\",\"tagNum\":1,\"tags\":[{\"name\":\"group_id\",\"type\":14,\"value\":1.313555008277358e+19}],\"createList\":[]}" + }; + + for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ + taosFprintfFile(pFile2, result[i]); + taosFprintfFile(pFile2, "\n"); + } + }else{ + char *result[] = { + "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}", + "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}", + "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\",\"colType\":8,\"colLength\":64}", + "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\",\"colType\":8,\"colLength\":64}", + "{\"type\":\"alter\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"alterType\":4,\"colName\":\"t1\",\"colValue\":\"5000\",\"colValueNull\":false}", + "{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":10,\"length\":4}],\"tags\":[]}", + "{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":5,\"colName\":\"c3\",\"colType\":5}", + "{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":7,\"colName\":\"c2\",\"colType\":10,\"colLength\":8}", + "{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":10,\"colName\":\"c3\",\"colNewName\":\"cc3\"}", + "{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":9}", + "{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":6,\"colName\":\"c1\"}", + "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"jt\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt1\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt2\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[],\"createList\":[]}", + "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"stt\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}", + "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"sttb\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt1\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt1\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"sttb1\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb1\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"stt2\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":43},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":0}]},{\"tableName\":\"sttb2\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":54},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}]}", + "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt3\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt3\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"sttb3\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb3\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"stt4\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":433},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt4\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":0}]},{\"tableName\":\"sttb4\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":543},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb4\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}]}" + }; + + for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ + taosFprintfFile(pFile2, result[i]); + taosFprintfFile(pFile2, "\n"); + } } } @@ -619,5 +690,6 @@ int main(int argc, char* argv[]) { tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); basic_consume_loop(tmq, topic_list); + tmq_list_destroy(topic_list); taosCloseFile(&g_fp); }