diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 1d2b26624bb3be70051e183b7228b0ec90a48aa8..fd1f146618874132738a675714841e966353311a 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -29,7 +29,7 @@ static void msg_process(TAOS_RES* msg) { printf("vg: %d\n", tmq_get_vgroup_id(msg)); if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { tmq_raw_data raw = {0}; - int32_t code = tmq_get_raw_meta(msg, &raw); + int32_t code = tmq_get_raw(msg, &raw); if (code == 0) { TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -50,7 +50,7 @@ static void msg_process(TAOS_RES* msg) { } taos_free_result(pRes); - int32_t ret = taos_write_raw_meta(pConn, raw); + int32_t ret = tmq_write_raw(pConn, raw); printf("write raw data: %s\n", tmq_err2str(ret)); taos_close(pConn); } diff --git a/examples/c/tmq_taosx.c b/examples/c/tmq_taosx.c index 13f3b18e64b69271adea3637f19f074138dd6d86..d0def4426905b773db948b0cf6f0d22c8733d5da 100644 --- a/examples/c/tmq_taosx.c +++ b/examples/c/tmq_taosx.c @@ -49,18 +49,25 @@ static void msg_process(TAOS_RES* msg) { printf("meta result: %s\n", result); } tmq_free_json_meta(result); - - - tmq_raw_data raw = {0}; - tmq_get_raw_meta(msg, &raw); - int32_t ret = taos_write_raw_meta(pConn, raw); - printf("write raw meta: %s\n", tmq_err2str(ret)); } - if(tmq_get_res_type(msg) == TMQ_RES_DATA){ - int32_t ret =taos_write_raw_data(pConn, msg); - printf("write raw data: %s\n", tmq_err2str(ret)); - } + tmq_raw_data raw = {0}; + tmq_get_raw(msg, &raw); + int32_t ret = tmq_write_raw(pConn, raw); + printf("write raw data: %s\n", tmq_err2str(ret)); + +// else{ +// while(1){ +// int numOfRows = 0; +// void *pData = NULL; +// taos_fetch_raw_block(msg, &numOfRows, &pData); +// if(numOfRows == 0) break; +// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows); +// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg)); +// printf("write raw data: %s\n", tmq_err2str(ret)); +// } +// } + taos_close(pConn); } @@ -121,7 +128,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct0 values(now, 1, 2, 'a')"); + pRes = taos_query(pConn, "insert into ct0 values(1626006833600, 1, 2, 'a')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); return -1; @@ -142,7 +149,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct1 values(now, 3, 4, 'b')"); + pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); return -1; @@ -156,7 +163,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct3 values(now, 5, 6, 'c') ct1 values(now+1s, 2, 3, 'sds') (now+2s, 4, 5, 'ddd') ct0 values(now+1s, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); + pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); return -1; @@ -177,7 +184,14 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct3 values(now+7s, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (now+9s, 51, 62, 'c333', 940)"); + pRes = taos_query(pConn, "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct3 select * from ct1"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); return -1; @@ -198,19 +212,26 @@ int32_t init_env() { } taos_free_result(pRes); -// pRes = taos_query(pConn, "drop table ct3 ct1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); -// return -1; -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "drop table st1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); -// return -1; -// } -// taos_free_result(pRes); + pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table ct3 ct1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table st1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); if (taos_errno(pRes) != 0) { @@ -261,12 +282,12 @@ int32_t init_env() { } taos_free_result(pRes); -// pRes = taos_query(pConn, "drop table n1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); -// return -1; -// } -// taos_free_result(pRes); + pRes = taos_query(pConn, "drop table n1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); if (taos_errno(pRes) != 0) { @@ -289,21 +310,21 @@ int32_t init_env() { } taos_free_result(pRes); -// pRes = taos_query(pConn, -// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " -// "nchar(8), t4 bool)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); -// return -1; -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "drop table st1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); -// return -1; -// } -// taos_free_result(pRes); + pRes = taos_query(pConn, + "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " + "nchar(8), t4 bool)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table st1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); taos_close(pConn); return 0; diff --git a/include/client/taos.h b/include/client/taos.h index 5f147bb07cadbb7969a352fd5b0fb0191e732bb2..6f3244ea82fcbe5edfd52689d2c6a95fc3a2e521 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -260,17 +260,20 @@ enum tmq_res_t { }; typedef struct tmq_raw_data{ - void* raw_meta; - uint32_t raw_meta_len; - uint16_t raw_meta_type; + void* raw; + uint32_t raw_len; + uint16_t raw_type; } tmq_raw_data; typedef enum tmq_res_t tmq_res_t; DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); -DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, tmq_raw_data *raw_meta); -DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta); -DLL_EXPORT int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *res); +DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw); +DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw); +DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char* tbname); + + +DLL_EXPORT void tmq_free_raw(tmq_raw_data raw); DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta DLL_EXPORT void tmq_free_json_meta(char* jsonMeta); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 9e7aea03ea28e6b2a4c43a39681a6556fd367051..46e065d356e0f5583644bbf9df0d6e50ade923ee 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -40,6 +40,7 @@ enum { || x == TDMT_VND_CREATE_TABLE \ || x == TDMT_VND_ALTER_TABLE \ || x == TDMT_VND_DROP_TABLE \ + || x == TDMT_VND_DELETE \ ) // clang-format on diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ee19969e5004836f60d84b52aed7ba963e9675b7..60ddb1639c369723561b0d5c59c121e37bad79a1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3036,6 +3036,7 @@ typedef struct SDeleteRes { int64_t skey; int64_t ekey; int64_t affectedRows; + char tableFName[TSDB_TABLE_FNAME_LEN]; } SDeleteRes; int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 8d5a8abcb4aa595086d5b223e6b8dc028554384f..90b804b3825c7a3c9dd6fc9b7469bc0166f7ab21 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -38,6 +38,7 @@ typedef struct SDeleterRes { int64_t skey; int64_t ekey; int64_t affectedRows; + char tableFName[TSDB_TABLE_FNAME_LEN]; } SDeleterRes; typedef struct SDeleterParam { diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index e382fa4efd4eea7f22cf41da92ec751bd7a2fa4a..bc0c3fe81bc1e02cd65c94b7b552c9d97dd06f62 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -502,6 +502,7 @@ typedef struct SDataDeleterNode { uint64_t tableId; int8_t tableType; // table type char tableFName[TSDB_TABLE_FNAME_LEN]; + char tsColName[TSDB_COL_NAME_LEN]; STimeWindow deleteTimeRange; SNode* pAffectedRows; } SDataDeleterNode; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index cc040594b1d2478fa469d95fbb09b0f0d358b629..b62f822313efee9968d1910cc7835e9181b72b50 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -251,8 +251,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t (_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK) #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ - ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ - (_type) == TDMT_VND_DROP_STB) + ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ + (_type) == TDMT_MND_DROP_STB) #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \ diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 74d5d4270b9e8ba6fa9f5cc549a69ec62e628347..df9072fe1ac05cb63e5317c1c60beb9f79143fb3 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1206,6 +1206,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp); + tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); } else { ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); @@ -1859,6 +1860,10 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { if (TD_RES_TMQ(res)) { return TMQ_RES_DATA; } else if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) { + return TMQ_RES_DATA; + } return TMQ_RES_TABLE_META; } else { return TMQ_RES_INVALID; @@ -1913,17 +1918,6 @@ const char* tmq_get_table_name(TAOS_RES* res) { return NULL; } -int32_t tmq_get_raw_meta(TAOS_RES* res, tmq_raw_data *raw) { - if (TD_RES_TMQ_META(res) && raw) { - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - raw->raw_meta = pMetaRspObj->metaRsp.metaRsp; - raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen; - raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType; - return TSDB_CODE_SUCCESS; - } - return TSDB_CODE_INVALID_PARA; -} - static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t) { char* string = NULL; @@ -2436,30 +2430,6 @@ _exit: return string; } -char* tmq_get_json_meta(TAOS_RES* res) { - if (!TD_RES_TMQ_META(res)) { - return NULL; - } - - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) { - return processCreateStb(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) { - return processAlterStb(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) { - return processDropSTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) { - return processCreateTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) { - return processAlterTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) { - return processDropTable(&pMetaRspObj->metaRsp); - } - return NULL; -} - -void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } - static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { SVCreateStbReq req = {0}; SDecoder coder; @@ -2531,6 +2501,13 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { pQuery.stableQuery = true; launchQueryImpl(pRequest, &pQuery, true, NULL); + + if(pRequest->code == TSDB_CODE_SUCCESS){ + SCatalog* pCatalog = NULL; + catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + catalogRemoveTableMeta(pCatalog, &tableName); + } + code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); @@ -2572,7 +2549,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { pReq.suid = req.suid; STscObj* pTscObj = pRequest->pTscObj; - SName tableName; + SName tableName = {0}; tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); SCmdMsgInfo pCmdMsg = {0}; @@ -2593,6 +2570,13 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { pQuery.stableQuery = true; launchQueryImpl(pRequest, &pQuery, true, NULL); + + if(pRequest->code == TSDB_CODE_SUCCESS){ + SCatalog* pCatalog = NULL; + catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + catalogRemoveTableMeta(pCatalog, &tableName); + } + code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); @@ -2659,17 +2643,20 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { .requestId = pRequest->requestId, .requestObjRefId = pRequest->self, .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + + pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; SVgroupInfo pInfo = {0}; - SName pName; + SName pName = {0}; toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName); code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); if (code != TSDB_CODE_SUCCESS) { goto end; } + taosArrayPush(pRequest->tableList, &pName); SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId)); if (pTableBatch == NULL) { @@ -2703,8 +2690,11 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } - launchQueryImpl(pRequest, pQuery, false, NULL); - pQuery = NULL; // no need to free in the end + launchQueryImpl(pRequest, pQuery, true, NULL); + if (pRequest->code == TSDB_CODE_SUCCESS){ + removeMeta(pTscObj, pRequest->tableList); + } + code = pRequest->code; end: @@ -2772,19 +2762,21 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { .requestId = pRequest->requestId, .requestObjRefId = pRequest->self, .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pDropReq = req.pReqs + iReq; pDropReq->igNotExists = true; SVgroupInfo pInfo = {0}; - SName pName; + SName pName = {0}; toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName); code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); if (code != TSDB_CODE_SUCCESS) { goto end; } + taosArrayPush(pRequest->tableList, &pName); SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId)); if (pTableBatch == NULL) { SVgroupDropTableBatch tBatch = {0}; @@ -2815,8 +2807,10 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } - launchQueryImpl(pRequest, pQuery, false, NULL); - pQuery = NULL; // no need to free in the end + launchQueryImpl(pRequest, pQuery, true, NULL); + if (pRequest->code == TSDB_CODE_SUCCESS){ + removeMeta(pTscObj, pRequest->tableList); + } code = pRequest->code; end: @@ -2827,6 +2821,70 @@ end: return code; } +// delete from db.tabl where .. -> delete from tabl where .. +// delete from db .tabl where .. -> delete from tabl where .. +static void getTbName(char *sql){ + char *ch = sql; + + bool inBackQuote = false; + int8_t dotIndex = 0; + while(*ch != '\0'){ + if(!inBackQuote && *ch == '`'){ + inBackQuote = true; + ch++; + continue; + } + + if(inBackQuote && *ch == '`'){ + inBackQuote = false; + ch++; + + continue; + } + + if(!inBackQuote && *ch == '.'){ + dotIndex ++; + if(dotIndex == 2){ + memmove(sql, ch + 1, strlen(ch + 1) + 1); + break; + } + } + ch++; + } +} + +static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { + SDeleteRes req = {0}; + SDecoder coder = {0}; + int32_t code = TSDB_CODE_SUCCESS; + + // decode and process req + void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); + int32_t len = metaLen - sizeof(SMsgHead); + tDecoderInit(&coder, data, len); + if (tDecodeDeleteRes(&coder, &req) < 0) { + code = TSDB_CODE_INVALID_PARA; + goto end; + } + + getTbName(req.tableFName); + char sql[256] = {0}; + sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, "ts", req.skey, "ts", req.ekey); + printf("delete sql:%s\n", sql); + + TAOS_RES* res = taos_query(taos, sql); + SRequestObj *pRequest = (SRequestObj *)res; + code = pRequest->code; + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + code = TSDB_CODE_SUCCESS; + } + taos_free_result(res); + +end: + tDecoderClear(&coder); + return code; +} + static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { SVAlterTbReq req = {0}; SDecoder coder = {0}; @@ -2914,15 +2972,21 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } - launchQueryImpl(pRequest, pQuery, false, NULL); - pQuery = NULL; // no need to free in the end + launchQueryImpl(pRequest, pQuery, true, NULL); + pVgData = NULL; pArray = NULL; code = pRequest->code; if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) { - code = 0; + code = TSDB_CODE_SUCCESS; } + if(pRequest->code == TSDB_CODE_SUCCESS){ + SExecResult* pRes = &pRequest->body.resInfo.execRes; + if(pRes->res != NULL){ + code = handleAlterTbExecRes(pRes->res, pCatalog); + } + } end: taosArrayDestroy(pArray); if (pVgData) taosMemoryFreeClear(pVgData->pData); @@ -2933,27 +2997,6 @@ end: return code; } -int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){ - if (!taos) { - return TSDB_CODE_INVALID_PARA; - } - - if(raw_meta.raw_meta_type == TDMT_VND_CREATE_STB) { - return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - }else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_STB){ - return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - }else if(raw_meta.raw_meta_type == TDMT_VND_DROP_STB){ - return taosDropStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - }else if(raw_meta.raw_meta_type == TDMT_VND_CREATE_TABLE){ - return taosCreateTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - }else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_TABLE){ - return taosAlterTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - }else if(raw_meta.raw_meta_type == TDMT_VND_DROP_TABLE){ - return taosDropTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - } - return TSDB_CODE_INVALID_PARA; -} - typedef struct{ SVgroupInfo vg; void *data; @@ -2964,15 +3007,196 @@ static void destroyVgHash(void* data) { taosMemoryFreeClear(vgData->data); } -int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ - if (!TD_RES_TMQ(msg)) { - uError("WriteRaw:msg is not tmq : %d", *(int8_t*)msg); - return TSDB_CODE_TMQ_INVALID_MSG; +int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){ + int32_t code = TSDB_CODE_SUCCESS; + STableMeta* pTableMeta = NULL; + SQuery *pQuery = NULL; + + SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); + if(!pRequest){ + uError("WriteRaw:createRequest error request is null"); + code = terrno; + goto end; } + if (!pRequest->pDb) { + uError("WriteRaw:not use db"); + code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + goto end; + } + + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + strcpy(pName.dbname, pRequest->pDb); + strcpy(pName.tname, tbname); + + struct SCatalog *pCatalog = NULL; + code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if(code != TSDB_CODE_SUCCESS){ + uError("WriteRaw: get gatlog error"); + goto end; + } + + SRequestConnInfo conn = {0}; + conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; + conn.requestId = pRequest->requestId; + conn.requestObjRefId = pRequest->self; + conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + + SVgroupInfo vgData = {0}; + code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname); + goto end; + } + + code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname); + goto end; + } + uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); + uint64_t uid = pTableMeta->uid; + int32_t numOfCols = pTableMeta->tableInfo.numOfColumns; + + uint16_t fLen = 0; + int32_t rowSize = 0; + int16_t nVar = 0; + for (int i = 0; i < numOfCols; i++) { + SSchema *schema = pTableMeta->schema + i; + fLen += TYPE_BYTES[schema->type]; + rowSize += schema->bytes; + if(IS_VAR_DATA_TYPE(schema->type)){ + nVar ++; + } + } + + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + + (int32_t)TD_BITMAP_BYTES(numOfCols - 1); + int32_t schemaLen = 0; + int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; + + int32_t totalLen = sizeof(SSubmitReq) + submitLen; + SSubmitReq* subReq = taosMemoryCalloc(1, totalLen); + SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq)); + void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); + STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); + + SRowBuilder rb = {0}; + tdSRowInit(&rb, pTableMeta->sversion); + tdSRowSetTpInfo(&rb, numOfCols, fLen); + int32_t dataLen = 0; + + char* pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)); + int32_t* colLength = (int32_t*)pStart; + pStart += sizeof(int32_t) * numOfCols; + + SResultColumn *pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn)); + + for (int32_t i = 0; i < numOfCols; ++i) { + if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) { + pCol[i].offset = (int32_t*)pStart; + pStart += rows * sizeof(int32_t); + } else { + pCol[i].nullbitmap = pStart; + pStart += BitmapLen(rows); + } + + pCol[i].pData = pStart; + pStart += colLength[i]; + } + + for (int32_t j = 0; j < rows; j++) { + tdSRowResetBuf(&rb, rowData); + int32_t offset = 0; + for (int32_t k = 0; k < numOfCols; k++) { + const SSchema* pColumn = &pTableMeta->schema[k]; + + if (IS_VAR_DATA_TYPE(pColumn->type)) { + if (pCol[k].offset[j] != -1) { + char* data = pCol[k].pData + pCol[k].offset[j]; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + } else { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } + } else { + if (!colDataIsNull_f(pCol[k].nullbitmap, j)) { + char* data = pCol[k].pData + pColumn->bytes * j; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + } else { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } + } + + offset += TYPE_BYTES[pColumn->type]; + } + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + dataLen += rowLen; + } + + taosMemoryFree(pCol); + + blk->uid = htobe64(uid); + blk->suid = htobe64(suid); + blk->padding = htonl(blk->padding); + blk->sversion = htonl(pTableMeta->sversion); + blk->schemaLen = htonl(schemaLen); + blk->numOfRows = htons(rows); + blk->dataLen = htonl(dataLen); + subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen; + subReq->numOfBlocks = 1; + + pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pQuery) { + uError("create SQuery error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + pQuery->haveResultSet = false; + pQuery->msgType = TDMT_VND_SUBMIT; + pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + if (NULL == pQuery->pRoot) { + uError("create pQuery->pRoot error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot); + nodeStmt->payloadType = PAYLOAD_TYPE_KV; + nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES); + + SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + if (NULL == dst) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + dst->vg = vgData; + dst->numOfTables = subReq->numOfBlocks; + dst->size = subReq->length; + dst->pData = (char*)subReq; + subReq->header.vgId = htonl(dst->vg.vgId); + subReq->version = htonl(1); + subReq->header.contLen = htonl(subReq->length); + subReq->length = htonl(subReq->length); + subReq->numOfBlocks = htonl(subReq->numOfBlocks); + subReq = NULL; // no need free + taosArrayPush(nodeStmt->pDataBlocks, &dst); + + launchQueryImpl(pRequest, pQuery, true, NULL); + code = pRequest->code; + +end: + taosMemoryFreeClear(pTableMeta); + qDestroyQuery(pQuery); + return code; +} + +static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ int32_t code = TSDB_CODE_SUCCESS; SHashObj *pVgHash = NULL; SQuery *pQuery = NULL; + SMqRspObj rspObj = {0}; + SDecoder decoder = {0}; terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); @@ -2981,6 +3205,17 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ return terrno; } + rspObj.resIter = -1; + rspObj.resType = RES_TYPE__TMQ; + + tDecoderInit(&decoder, data, dataLen); + code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp); + if (code != 0){ + uError("WriteRaw:decode smqDataRsp error"); + code = TSDB_CODE_INVALID_MSG; + goto end; + } + if (!pRequest->pDb) { uError("WriteRaw:not use db"); code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; @@ -3001,18 +3236,18 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ conn.requestId = pRequest->requestId; conn.requestObjRefId = pRequest->self; conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - SMqRspObj *rspObj = ((SMqRspObj*)msg); - printf("raw data block num:%d\n", rspObj->rsp.blockNum); - while (++rspObj->resIter < rspObj->rsp.blockNum) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj->rsp.blockData, rspObj->resIter); - if (!rspObj->rsp.withSchema) { - uError("WriteRaw:no schema, iter:%d", rspObj->resIter); + + printf("raw data block num:%d\n", rspObj.rsp.blockNum); + while (++rspObj.resIter < rspObj.rsp.blockNum) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); + if (!rspObj.rsp.withSchema) { + uError("WriteRaw:no schema, iter:%d", rspObj.resIter); goto end; } - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj->rsp.blockSchema, rspObj->resIter); - setResSchemaInfo(&rspObj->resInfo, pSW->pSchema, pSW->nCols); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols); - code = setQueryResultFromRsp(&rspObj->resInfo, pRetrieve, false, false); + code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false); if(code != TSDB_CODE_SUCCESS){ uError("WriteRaw: setQueryResultFromRsp error"); goto end; @@ -3030,13 +3265,13 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ } } - int32_t rows = rspObj->resInfo.numOfRows; + int32_t rows = rspObj.resInfo.numOfRows; int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1); int32_t schemaLen = 0; int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; - const char* tbName = tmq_get_table_name(msg); + const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); if(!tbName){ uError("WriteRaw: tbname is null"); code = TSDB_CODE_TMQ_INVALID_MSG; @@ -3108,13 +3343,13 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ for (int32_t j = 0; j < rows; j++) { tdSRowResetBuf(&rb, rowData); - doSetOneRowPtr(&rspObj->resInfo); - rspObj->resInfo.current += 1; + doSetOneRowPtr(&rspObj.resInfo); + rspObj.resInfo.current += 1; int32_t offset = 0; for (int32_t k = 0; k < pSW->nCols; k++) { const SSchema* pColumn = &pSW->pSchema[k]; - char *data = rspObj->resInfo.row[k]; + char *data = rspObj.resInfo.row[k]; if (!data) { tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); } else { @@ -3186,13 +3421,105 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; + end: + tDecoderClear(&decoder); + taos_free_result(&rspObj); qDestroyQuery(pQuery); destroyRequest(pRequest); taosHashCleanup(pVgHash); return code; } +char* tmq_get_json_meta(TAOS_RES* res) { + if (!TD_RES_TMQ_META(res)) { + return NULL; + } + + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) { + return processCreateStb(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) { + return processAlterStb(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) { + return processDropSTable(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) { + return processCreateTable(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) { + return processAlterTable(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) { + return processDropTable(&pMetaRspObj->metaRsp); + } + return NULL; +} + +void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } + +int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) { + if (!raw || !res){ + return TSDB_CODE_INVALID_PARA; + } + if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + raw->raw = pMetaRspObj->metaRsp.metaRsp; + raw->raw_len = pMetaRspObj->metaRsp.metaRspLen; + raw->raw_type = pMetaRspObj->metaRsp.resMsgType; + } else if(TD_RES_TMQ(res)){ + SMqRspObj *rspObj = ((SMqRspObj*)res); + + int32_t len = 0; + int32_t code = 0; + tEncodeSize(tEncodeSMqDataRsp, &rspObj->rsp, len, code); + if (code < 0) { + return -1; + } + + void *buf = taosMemoryCalloc(1, len); + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, len); + tEncodeSMqDataRsp(&encoder, &rspObj->rsp); + tEncoderClear(&encoder); + + raw->raw = buf; + raw->raw_len = len; + raw->raw_type = RES_TYPE__TMQ; + } else { + return TSDB_CODE_TMQ_INVALID_MSG; + } + return TSDB_CODE_SUCCESS; +} + +void tmq_free_raw(tmq_raw_data raw) { + if (raw.raw_type == RES_TYPE__TMQ){ + taosMemoryFree(raw.raw); + } +} + +int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw){ + if (!taos) { + return TSDB_CODE_INVALID_PARA; + } + + if(raw.raw_type == TDMT_VND_CREATE_STB) { + return taosCreateStb(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_ALTER_STB){ + return taosCreateStb(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_DROP_STB){ + return taosDropStb(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_CREATE_TABLE){ + return taosCreateTable(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_ALTER_TABLE){ + return taosAlterTable(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_DROP_TABLE) { + return taosDropTable(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_DELETE){ + return taosDeleteData(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == RES_TYPE__TMQ){ + return tmqWriteRaw(taos, raw.raw, raw.raw_len); + } + return TSDB_CODE_INVALID_PARA; +} + void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { // tmqCommitInner2(tmq, msg, 0, 1, cb, param); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c94c624f8953e4317ca95e23dfa462dbe49a3dc1..82c400fe6a06ec1fdc5bb69bd92d1e3ea69d88f4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5664,6 +5664,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) { if (tEncodeI64(pCoder, pRes->ekey) < 0) return -1; if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1; + if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1; return 0; } @@ -5675,12 +5676,13 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { if (tDecodeI32v(pCoder, &nUid) < 0) return -1; for (int32_t iUid = 0; iUid < nUid; iUid++) { if (tDecodeU64(pCoder, &uid) < 0) return -1; - taosArrayPush(pRes->uidList, &uid); + if (pRes->uidList) taosArrayPush(pRes->uidList, &uid); } if (tDecodeI64(pCoder, &pRes->skey) < 0) return -1; if (tDecodeI64(pCoder, &pRes->ekey) < 0) return -1; if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1; + if (tDecodeCStrTo(pCoder, pRes->tableFName) < 0) return -1; return 0; } int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 002b629660935cf08597ccc320cc294b9aae771b..33617c96beb0e710faa0ffb32f5e98252ed7e60a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -138,8 +138,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); } - int32_t len; - int32_t code; + int32_t len = 0; + int32_t code = 0; tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); if (code < 0) { return -1; @@ -156,9 +156,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - SEncoder encoder; + SEncoder encoder = {0}; tEncoderInit(&encoder, abuf, len); tEncodeSMqDataRsp(&encoder, pRsp); + tEncoderClear(&encoder); SRpcMsg rsp = { .info = pMsg->info, @@ -168,8 +169,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con }; tmsgSendRsp(&rsp); - char buf1[80]; - char buf2[80]; + char buf1[80] = {0}; + char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5463f42f6af608942e8aa6db18dd05189e401767..5048d86ba60f7a540a305183cf7f81faa4faa467 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -106,7 +106,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); - if (code) goto _err; + if (code) { + goto _err; + } // malloc and encode tEncodeSize(tEncodeDeleteRes, &res, size, ret); @@ -981,6 +983,11 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq SDecoder *pCoder = &(SDecoder){0}; SDeleteRes *pRes = &(SDeleteRes){0}; + pRsp->msgType = TDMT_VND_DELETE_RSP; + pRsp->pCont = NULL; + pRsp->contLen = 0; + pRsp->code = TSDB_CODE_SUCCESS; + pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); if (pRes->uidList == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -998,6 +1005,15 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq tDecoderClear(pCoder); taosArrayDestroy(pRes->uidList); + + SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows}; + int32_t ret = 0; + tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret); + pRsp->pCont = rpcMallocCont(pRsp->contLen); + SEncoder ec = {0}; + tEncoderInit(&ec, pRsp->pCont, pRsp->contLen); + tEncodeSVDeleteRsp(&ec, &rsp); + tEncoderClear(&ec); return code; _err: diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 3c56abbd15395d7c1a1f8942e8ea9587a4e1a2ce..c1841bfd21285784b339b735727428cd34e6d8b1 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -90,6 +90,7 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp pRes->uidList = pHandle->pParam->pUidList; pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; + strcpy(pRes->tableFName, pHandle->pDeleter->tableFName); pRes->affectedRows = *(int64_t*)pColRes->pData; pBuf->useSize += pEntry->dataLen; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d8cd76d31e236eaeac9bc4fc01f72f1e24c66566..08088fd015031582ba8de2722578e04eed4c48c1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -347,6 +347,9 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, } code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); + if(code != TSDB_CODE_SUCCESS){ + taosMemoryFreeClear(pSinkParam); + } } _error: diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ebccb7950cf537d255881a81443e5723873a1ab3..c58eb76f8cd9b2df604a6aca9e19e37179ea8702 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -279,7 +279,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes pRes->skey = pDelRes->skey; pRes->ekey = pDelRes->ekey; pRes->affectedRows = pDelRes->affectedRows; - + strcpy(pRes->tableFName, pDelRes->tableFName); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index b794cb91f549964000cd506a7f766151d4d285da..67050241e36c688414cca38cf2f033cc4c58b1c7 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -230,6 +230,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SVDeleteRsp rsp = {0}; tDecoderInit(&coder, msg, msgSize); tDecodeSVDeleteRsp(&coder, &rsp); + tDecoderClear(&coder); atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows); diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 6a18263d502addf2515ac2939e07bb6920c0b4b1..d39ade7e91495d2b3ff1924efdb78103d7b423cc 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -630,7 +630,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn { tmq_raw_data raw = {0}; - int32_t code = tmq_get_raw_meta(msg, &raw); + int32_t code = tmq_get_raw(msg, &raw); if(code == TSDB_CODE_SUCCESS){ int retCode = queryDB(pInfo->taos, "use metadb"); @@ -641,7 +641,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn } taosFprintfFile(g_fp, "raw:%p\n", &raw); - taos_write_raw_meta(pInfo->taos, raw); + tmq_write_raw(pInfo->taos, raw); } char* result = tmq_get_json_meta(msg);