diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 1d2b26624bb3be70051e183b7228b0ec90a48aa8..fd1f146618874132738a675714841e966353311a 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -29,7 +29,7 @@ static void msg_process(TAOS_RES* msg) { printf("vg: %d\n", tmq_get_vgroup_id(msg)); if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { tmq_raw_data raw = {0}; - int32_t code = tmq_get_raw_meta(msg, &raw); + int32_t code = tmq_get_raw(msg, &raw); if (code == 0) { TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -50,7 +50,7 @@ static void msg_process(TAOS_RES* msg) { } taos_free_result(pRes); - int32_t ret = taos_write_raw_meta(pConn, raw); + int32_t ret = tmq_write_raw(pConn, raw); printf("write raw data: %s\n", tmq_err2str(ret)); taos_close(pConn); } diff --git a/examples/c/tmq_taosx.c b/examples/c/tmq_taosx.c index 13f3b18e64b69271adea3637f19f074138dd6d86..d0def4426905b773db948b0cf6f0d22c8733d5da 100644 --- a/examples/c/tmq_taosx.c +++ b/examples/c/tmq_taosx.c @@ -49,18 +49,25 @@ static void msg_process(TAOS_RES* msg) { printf("meta result: %s\n", result); } tmq_free_json_meta(result); - - - tmq_raw_data raw = {0}; - tmq_get_raw_meta(msg, &raw); - int32_t ret = taos_write_raw_meta(pConn, raw); - printf("write raw meta: %s\n", tmq_err2str(ret)); } - if(tmq_get_res_type(msg) == TMQ_RES_DATA){ - int32_t ret =taos_write_raw_data(pConn, msg); - printf("write raw data: %s\n", tmq_err2str(ret)); - } + tmq_raw_data raw = {0}; + tmq_get_raw(msg, &raw); + int32_t ret = tmq_write_raw(pConn, raw); + printf("write raw data: %s\n", tmq_err2str(ret)); + +// else{ +// while(1){ +// int numOfRows = 0; +// void *pData = NULL; +// taos_fetch_raw_block(msg, &numOfRows, &pData); +// if(numOfRows == 0) break; +// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows); +// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg)); +// printf("write raw data: %s\n", tmq_err2str(ret)); +// } +// } + taos_close(pConn); } @@ -121,7 +128,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct0 values(now, 1, 2, 'a')"); + pRes = taos_query(pConn, "insert into ct0 values(1626006833600, 1, 2, 'a')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); return -1; @@ -142,7 +149,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct1 values(now, 3, 4, 'b')"); + pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); return -1; @@ -156,7 +163,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct3 values(now, 5, 6, 'c') ct1 values(now+1s, 2, 3, 'sds') (now+2s, 4, 5, 'ddd') ct0 values(now+1s, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); + pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); return -1; @@ -177,7 +184,14 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct3 values(now+7s, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (now+9s, 51, 62, 'c333', 940)"); + pRes = taos_query(pConn, "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct3 select * from ct1"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); return -1; @@ -198,19 +212,26 @@ int32_t init_env() { } taos_free_result(pRes); -// pRes = taos_query(pConn, "drop table ct3 ct1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); -// return -1; -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "drop table st1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); -// return -1; -// } -// taos_free_result(pRes); + pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table ct3 ct1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table st1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); if (taos_errno(pRes) != 0) { @@ -261,12 +282,12 @@ int32_t init_env() { } taos_free_result(pRes); -// pRes = taos_query(pConn, "drop table n1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); -// return -1; -// } -// taos_free_result(pRes); + pRes = taos_query(pConn, "drop table n1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); if (taos_errno(pRes) != 0) { @@ -289,21 +310,21 @@ int32_t init_env() { } taos_free_result(pRes); -// pRes = taos_query(pConn, -// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " -// "nchar(8), t4 bool)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); -// return -1; -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "drop table st1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); -// return -1; -// } -// taos_free_result(pRes); + pRes = taos_query(pConn, + "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " + "nchar(8), t4 bool)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table st1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); taos_close(pConn); return 0; diff --git a/include/client/taos.h b/include/client/taos.h index 5f147bb07cadbb7969a352fd5b0fb0191e732bb2..6f3244ea82fcbe5edfd52689d2c6a95fc3a2e521 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -260,17 +260,20 @@ enum tmq_res_t { }; typedef struct tmq_raw_data{ - void* raw_meta; - uint32_t raw_meta_len; - uint16_t raw_meta_type; + void* raw; + uint32_t raw_len; + uint16_t raw_type; } tmq_raw_data; typedef enum tmq_res_t tmq_res_t; DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); -DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, tmq_raw_data *raw_meta); -DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta); -DLL_EXPORT int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *res); +DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw); +DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw); +DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char* tbname); + + +DLL_EXPORT void tmq_free_raw(tmq_raw_data raw); DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta DLL_EXPORT void tmq_free_json_meta(char* jsonMeta); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); diff --git a/include/common/tcommon.h b/include/common/tcommon.h index e1aadd448663616e8526b3201a1af13bb7775ed2..c5404085bbb56d01b39866e515bf743349ced983 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -40,6 +40,7 @@ enum { || x == TDMT_VND_CREATE_TABLE \ || x == TDMT_VND_ALTER_TABLE \ || x == TDMT_VND_DROP_TABLE \ + || x == TDMT_VND_DELETE \ ) // clang-format on diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2daa0ae53c1309b8ff356dc634d0c08bf1af4eec..dc83015a8968031a86bdcd8004b1525a856ac15c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3044,6 +3044,7 @@ typedef struct SDeleteRes { int64_t skey; int64_t ekey; int64_t affectedRows; + char tableFName[TSDB_TABLE_FNAME_LEN]; } SDeleteRes; int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 8d5a8abcb4aa595086d5b223e6b8dc028554384f..90b804b3825c7a3c9dd6fc9b7469bc0166f7ab21 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -38,6 +38,7 @@ typedef struct SDeleterRes { int64_t skey; int64_t ekey; int64_t affectedRows; + char tableFName[TSDB_TABLE_FNAME_LEN]; } SDeleterRes; typedef struct SDeleterParam { diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index ba16acf7b00e6e485262c7ae372f5c1977c9cea9..1147e875f2b47f8fb42e2d836aeedd67f45484c2 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -503,6 +503,7 @@ typedef struct SDataDeleterNode { uint64_t tableId; int8_t tableType; // table type char tableFName[TSDB_TABLE_FNAME_LEN]; + char tsColName[TSDB_COL_NAME_LEN]; STimeWindow deleteTimeRange; SNode* pAffectedRows; } SDataDeleterNode; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index cc040594b1d2478fa469d95fbb09b0f0d358b629..b62f822313efee9968d1910cc7835e9181b72b50 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -251,8 +251,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t (_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK) #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ - ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ - (_type) == TDMT_VND_DROP_STB) + ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ + (_type) == TDMT_MND_DROP_STB) #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \ diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 74d5d4270b9e8ba6fa9f5cc549a69ec62e628347..df9072fe1ac05cb63e5317c1c60beb9f79143fb3 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1206,6 +1206,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp); + tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); } else { ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); @@ -1859,6 +1860,10 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { if (TD_RES_TMQ(res)) { return TMQ_RES_DATA; } else if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) { + return TMQ_RES_DATA; + } return TMQ_RES_TABLE_META; } else { return TMQ_RES_INVALID; @@ -1913,17 +1918,6 @@ const char* tmq_get_table_name(TAOS_RES* res) { return NULL; } -int32_t tmq_get_raw_meta(TAOS_RES* res, tmq_raw_data *raw) { - if (TD_RES_TMQ_META(res) && raw) { - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - raw->raw_meta = pMetaRspObj->metaRsp.metaRsp; - raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen; - raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType; - return TSDB_CODE_SUCCESS; - } - return TSDB_CODE_INVALID_PARA; -} - static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t) { char* string = NULL; @@ -2436,30 +2430,6 @@ _exit: return string; } -char* tmq_get_json_meta(TAOS_RES* res) { - if (!TD_RES_TMQ_META(res)) { - return NULL; - } - - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) { - return processCreateStb(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) { - return processAlterStb(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) { - return processDropSTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) { - return processCreateTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) { - return processAlterTable(&pMetaRspObj->metaRsp); - } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) { - return processDropTable(&pMetaRspObj->metaRsp); - } - return NULL; -} - -void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } - static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { SVCreateStbReq req = {0}; SDecoder coder; @@ -2531,6 +2501,13 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { pQuery.stableQuery = true; launchQueryImpl(pRequest, &pQuery, true, NULL); + + if(pRequest->code == TSDB_CODE_SUCCESS){ + SCatalog* pCatalog = NULL; + catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + catalogRemoveTableMeta(pCatalog, &tableName); + } + code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); @@ -2572,7 +2549,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { pReq.suid = req.suid; STscObj* pTscObj = pRequest->pTscObj; - SName tableName; + SName tableName = {0}; tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); SCmdMsgInfo pCmdMsg = {0}; @@ -2593,6 +2570,13 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { pQuery.stableQuery = true; launchQueryImpl(pRequest, &pQuery, true, NULL); + + if(pRequest->code == TSDB_CODE_SUCCESS){ + SCatalog* pCatalog = NULL; + catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + catalogRemoveTableMeta(pCatalog, &tableName); + } + code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); @@ -2659,17 +2643,20 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { .requestId = pRequest->requestId, .requestObjRefId = pRequest->self, .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + + pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; SVgroupInfo pInfo = {0}; - SName pName; + SName pName = {0}; toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName); code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); if (code != TSDB_CODE_SUCCESS) { goto end; } + taosArrayPush(pRequest->tableList, &pName); SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId)); if (pTableBatch == NULL) { @@ -2703,8 +2690,11 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } - launchQueryImpl(pRequest, pQuery, false, NULL); - pQuery = NULL; // no need to free in the end + launchQueryImpl(pRequest, pQuery, true, NULL); + if (pRequest->code == TSDB_CODE_SUCCESS){ + removeMeta(pTscObj, pRequest->tableList); + } + code = pRequest->code; end: @@ -2772,19 +2762,21 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { .requestId = pRequest->requestId, .requestObjRefId = pRequest->self, .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pDropReq = req.pReqs + iReq; pDropReq->igNotExists = true; SVgroupInfo pInfo = {0}; - SName pName; + SName pName = {0}; toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName); code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); if (code != TSDB_CODE_SUCCESS) { goto end; } + taosArrayPush(pRequest->tableList, &pName); SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId)); if (pTableBatch == NULL) { SVgroupDropTableBatch tBatch = {0}; @@ -2815,8 +2807,10 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } - launchQueryImpl(pRequest, pQuery, false, NULL); - pQuery = NULL; // no need to free in the end + launchQueryImpl(pRequest, pQuery, true, NULL); + if (pRequest->code == TSDB_CODE_SUCCESS){ + removeMeta(pTscObj, pRequest->tableList); + } code = pRequest->code; end: @@ -2827,6 +2821,70 @@ end: return code; } +// delete from db.tabl where .. -> delete from tabl where .. +// delete from db .tabl where .. -> delete from tabl where .. +static void getTbName(char *sql){ + char *ch = sql; + + bool inBackQuote = false; + int8_t dotIndex = 0; + while(*ch != '\0'){ + if(!inBackQuote && *ch == '`'){ + inBackQuote = true; + ch++; + continue; + } + + if(inBackQuote && *ch == '`'){ + inBackQuote = false; + ch++; + + continue; + } + + if(!inBackQuote && *ch == '.'){ + dotIndex ++; + if(dotIndex == 2){ + memmove(sql, ch + 1, strlen(ch + 1) + 1); + break; + } + } + ch++; + } +} + +static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { + SDeleteRes req = {0}; + SDecoder coder = {0}; + int32_t code = TSDB_CODE_SUCCESS; + + // decode and process req + void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); + int32_t len = metaLen - sizeof(SMsgHead); + tDecoderInit(&coder, data, len); + if (tDecodeDeleteRes(&coder, &req) < 0) { + code = TSDB_CODE_INVALID_PARA; + goto end; + } + + getTbName(req.tableFName); + char sql[256] = {0}; + sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, "ts", req.skey, "ts", req.ekey); + printf("delete sql:%s\n", sql); + + TAOS_RES* res = taos_query(taos, sql); + SRequestObj *pRequest = (SRequestObj *)res; + code = pRequest->code; + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + code = TSDB_CODE_SUCCESS; + } + taos_free_result(res); + +end: + tDecoderClear(&coder); + return code; +} + static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { SVAlterTbReq req = {0}; SDecoder coder = {0}; @@ -2914,15 +2972,21 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } - launchQueryImpl(pRequest, pQuery, false, NULL); - pQuery = NULL; // no need to free in the end + launchQueryImpl(pRequest, pQuery, true, NULL); + pVgData = NULL; pArray = NULL; code = pRequest->code; if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) { - code = 0; + code = TSDB_CODE_SUCCESS; } + if(pRequest->code == TSDB_CODE_SUCCESS){ + SExecResult* pRes = &pRequest->body.resInfo.execRes; + if(pRes->res != NULL){ + code = handleAlterTbExecRes(pRes->res, pCatalog); + } + } end: taosArrayDestroy(pArray); if (pVgData) taosMemoryFreeClear(pVgData->pData); @@ -2933,27 +2997,6 @@ end: return code; } -int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){ - if (!taos) { - return TSDB_CODE_INVALID_PARA; - } - - if(raw_meta.raw_meta_type == TDMT_VND_CREATE_STB) { - return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - }else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_STB){ - return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - }else if(raw_meta.raw_meta_type == TDMT_VND_DROP_STB){ - return taosDropStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - }else if(raw_meta.raw_meta_type == TDMT_VND_CREATE_TABLE){ - return taosCreateTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - }else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_TABLE){ - return taosAlterTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - }else if(raw_meta.raw_meta_type == TDMT_VND_DROP_TABLE){ - return taosDropTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); - } - return TSDB_CODE_INVALID_PARA; -} - typedef struct{ SVgroupInfo vg; void *data; @@ -2964,15 +3007,196 @@ static void destroyVgHash(void* data) { taosMemoryFreeClear(vgData->data); } -int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ - if (!TD_RES_TMQ(msg)) { - uError("WriteRaw:msg is not tmq : %d", *(int8_t*)msg); - return TSDB_CODE_TMQ_INVALID_MSG; +int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){ + int32_t code = TSDB_CODE_SUCCESS; + STableMeta* pTableMeta = NULL; + SQuery *pQuery = NULL; + + SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); + if(!pRequest){ + uError("WriteRaw:createRequest error request is null"); + code = terrno; + goto end; } + if (!pRequest->pDb) { + uError("WriteRaw:not use db"); + code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + goto end; + } + + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + strcpy(pName.dbname, pRequest->pDb); + strcpy(pName.tname, tbname); + + struct SCatalog *pCatalog = NULL; + code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if(code != TSDB_CODE_SUCCESS){ + uError("WriteRaw: get gatlog error"); + goto end; + } + + SRequestConnInfo conn = {0}; + conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; + conn.requestId = pRequest->requestId; + conn.requestObjRefId = pRequest->self; + conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + + SVgroupInfo vgData = {0}; + code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname); + goto end; + } + + code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname); + goto end; + } + uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); + uint64_t uid = pTableMeta->uid; + int32_t numOfCols = pTableMeta->tableInfo.numOfColumns; + + uint16_t fLen = 0; + int32_t rowSize = 0; + int16_t nVar = 0; + for (int i = 0; i < numOfCols; i++) { + SSchema *schema = pTableMeta->schema + i; + fLen += TYPE_BYTES[schema->type]; + rowSize += schema->bytes; + if(IS_VAR_DATA_TYPE(schema->type)){ + nVar ++; + } + } + + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + + (int32_t)TD_BITMAP_BYTES(numOfCols - 1); + int32_t schemaLen = 0; + int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; + + int32_t totalLen = sizeof(SSubmitReq) + submitLen; + SSubmitReq* subReq = taosMemoryCalloc(1, totalLen); + SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq)); + void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); + STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); + + SRowBuilder rb = {0}; + tdSRowInit(&rb, pTableMeta->sversion); + tdSRowSetTpInfo(&rb, numOfCols, fLen); + int32_t dataLen = 0; + + char* pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)); + int32_t* colLength = (int32_t*)pStart; + pStart += sizeof(int32_t) * numOfCols; + + SResultColumn *pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn)); + + for (int32_t i = 0; i < numOfCols; ++i) { + if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) { + pCol[i].offset = (int32_t*)pStart; + pStart += rows * sizeof(int32_t); + } else { + pCol[i].nullbitmap = pStart; + pStart += BitmapLen(rows); + } + + pCol[i].pData = pStart; + pStart += colLength[i]; + } + + for (int32_t j = 0; j < rows; j++) { + tdSRowResetBuf(&rb, rowData); + int32_t offset = 0; + for (int32_t k = 0; k < numOfCols; k++) { + const SSchema* pColumn = &pTableMeta->schema[k]; + + if (IS_VAR_DATA_TYPE(pColumn->type)) { + if (pCol[k].offset[j] != -1) { + char* data = pCol[k].pData + pCol[k].offset[j]; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + } else { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } + } else { + if (!colDataIsNull_f(pCol[k].nullbitmap, j)) { + char* data = pCol[k].pData + pColumn->bytes * j; + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + } else { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } + } + + offset += TYPE_BYTES[pColumn->type]; + } + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + dataLen += rowLen; + } + + taosMemoryFree(pCol); + + blk->uid = htobe64(uid); + blk->suid = htobe64(suid); + blk->padding = htonl(blk->padding); + blk->sversion = htonl(pTableMeta->sversion); + blk->schemaLen = htonl(schemaLen); + blk->numOfRows = htons(rows); + blk->dataLen = htonl(dataLen); + subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen; + subReq->numOfBlocks = 1; + + pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pQuery) { + uError("create SQuery error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + pQuery->haveResultSet = false; + pQuery->msgType = TDMT_VND_SUBMIT; + pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + if (NULL == pQuery->pRoot) { + uError("create pQuery->pRoot error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot); + nodeStmt->payloadType = PAYLOAD_TYPE_KV; + nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES); + + SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + if (NULL == dst) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + dst->vg = vgData; + dst->numOfTables = subReq->numOfBlocks; + dst->size = subReq->length; + dst->pData = (char*)subReq; + subReq->header.vgId = htonl(dst->vg.vgId); + subReq->version = htonl(1); + subReq->header.contLen = htonl(subReq->length); + subReq->length = htonl(subReq->length); + subReq->numOfBlocks = htonl(subReq->numOfBlocks); + subReq = NULL; // no need free + taosArrayPush(nodeStmt->pDataBlocks, &dst); + + launchQueryImpl(pRequest, pQuery, true, NULL); + code = pRequest->code; + +end: + taosMemoryFreeClear(pTableMeta); + qDestroyQuery(pQuery); + return code; +} + +static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ int32_t code = TSDB_CODE_SUCCESS; SHashObj *pVgHash = NULL; SQuery *pQuery = NULL; + SMqRspObj rspObj = {0}; + SDecoder decoder = {0}; terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); @@ -2981,6 +3205,17 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ return terrno; } + rspObj.resIter = -1; + rspObj.resType = RES_TYPE__TMQ; + + tDecoderInit(&decoder, data, dataLen); + code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp); + if (code != 0){ + uError("WriteRaw:decode smqDataRsp error"); + code = TSDB_CODE_INVALID_MSG; + goto end; + } + if (!pRequest->pDb) { uError("WriteRaw:not use db"); code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; @@ -3001,18 +3236,18 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ conn.requestId = pRequest->requestId; conn.requestObjRefId = pRequest->self; conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - SMqRspObj *rspObj = ((SMqRspObj*)msg); - printf("raw data block num:%d\n", rspObj->rsp.blockNum); - while (++rspObj->resIter < rspObj->rsp.blockNum) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj->rsp.blockData, rspObj->resIter); - if (!rspObj->rsp.withSchema) { - uError("WriteRaw:no schema, iter:%d", rspObj->resIter); + + printf("raw data block num:%d\n", rspObj.rsp.blockNum); + while (++rspObj.resIter < rspObj.rsp.blockNum) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); + if (!rspObj.rsp.withSchema) { + uError("WriteRaw:no schema, iter:%d", rspObj.resIter); goto end; } - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj->rsp.blockSchema, rspObj->resIter); - setResSchemaInfo(&rspObj->resInfo, pSW->pSchema, pSW->nCols); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols); - code = setQueryResultFromRsp(&rspObj->resInfo, pRetrieve, false, false); + code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false); if(code != TSDB_CODE_SUCCESS){ uError("WriteRaw: setQueryResultFromRsp error"); goto end; @@ -3030,13 +3265,13 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ } } - int32_t rows = rspObj->resInfo.numOfRows; + int32_t rows = rspObj.resInfo.numOfRows; int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1); int32_t schemaLen = 0; int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; - const char* tbName = tmq_get_table_name(msg); + const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); if(!tbName){ uError("WriteRaw: tbname is null"); code = TSDB_CODE_TMQ_INVALID_MSG; @@ -3108,13 +3343,13 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ for (int32_t j = 0; j < rows; j++) { tdSRowResetBuf(&rb, rowData); - doSetOneRowPtr(&rspObj->resInfo); - rspObj->resInfo.current += 1; + doSetOneRowPtr(&rspObj.resInfo); + rspObj.resInfo.current += 1; int32_t offset = 0; for (int32_t k = 0; k < pSW->nCols; k++) { const SSchema* pColumn = &pSW->pSchema[k]; - char *data = rspObj->resInfo.row[k]; + char *data = rspObj.resInfo.row[k]; if (!data) { tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); } else { @@ -3186,13 +3421,105 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; + end: + tDecoderClear(&decoder); + taos_free_result(&rspObj); qDestroyQuery(pQuery); destroyRequest(pRequest); taosHashCleanup(pVgHash); return code; } +char* tmq_get_json_meta(TAOS_RES* res) { + if (!TD_RES_TMQ_META(res)) { + return NULL; + } + + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) { + return processCreateStb(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) { + return processAlterStb(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) { + return processDropSTable(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) { + return processCreateTable(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) { + return processAlterTable(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) { + return processDropTable(&pMetaRspObj->metaRsp); + } + return NULL; +} + +void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } + +int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) { + if (!raw || !res){ + return TSDB_CODE_INVALID_PARA; + } + if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + raw->raw = pMetaRspObj->metaRsp.metaRsp; + raw->raw_len = pMetaRspObj->metaRsp.metaRspLen; + raw->raw_type = pMetaRspObj->metaRsp.resMsgType; + } else if(TD_RES_TMQ(res)){ + SMqRspObj *rspObj = ((SMqRspObj*)res); + + int32_t len = 0; + int32_t code = 0; + tEncodeSize(tEncodeSMqDataRsp, &rspObj->rsp, len, code); + if (code < 0) { + return -1; + } + + void *buf = taosMemoryCalloc(1, len); + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, len); + tEncodeSMqDataRsp(&encoder, &rspObj->rsp); + tEncoderClear(&encoder); + + raw->raw = buf; + raw->raw_len = len; + raw->raw_type = RES_TYPE__TMQ; + } else { + return TSDB_CODE_TMQ_INVALID_MSG; + } + return TSDB_CODE_SUCCESS; +} + +void tmq_free_raw(tmq_raw_data raw) { + if (raw.raw_type == RES_TYPE__TMQ){ + taosMemoryFree(raw.raw); + } +} + +int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw){ + if (!taos) { + return TSDB_CODE_INVALID_PARA; + } + + if(raw.raw_type == TDMT_VND_CREATE_STB) { + return taosCreateStb(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_ALTER_STB){ + return taosCreateStb(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_DROP_STB){ + return taosDropStb(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_CREATE_TABLE){ + return taosCreateTable(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_ALTER_TABLE){ + return taosAlterTable(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_DROP_TABLE) { + return taosDropTable(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == TDMT_VND_DELETE){ + return taosDeleteData(taos, raw.raw, raw.raw_len); + }else if(raw.raw_type == RES_TYPE__TMQ){ + return tmqWriteRaw(taos, raw.raw, raw.raw_len); + } + return TSDB_CODE_INVALID_PARA; +} + void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { // tmqCommitInner2(tmq, msg, 0, 1, cb, param); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 35eb58d039ae8d533a93121d52a9bb3f692139bb..22b7dd827d50a133c2d1f2ea3e3be3929e9171c9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5681,6 +5681,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) { if (tEncodeI64(pCoder, pRes->ekey) < 0) return -1; if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1; + if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1; return 0; } @@ -5692,12 +5693,13 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { if (tDecodeI32v(pCoder, &nUid) < 0) return -1; for (int32_t iUid = 0; iUid < nUid; iUid++) { if (tDecodeU64(pCoder, &uid) < 0) return -1; - taosArrayPush(pRes->uidList, &uid); + if (pRes->uidList) taosArrayPush(pRes->uidList, &uid); } if (tDecodeI64(pCoder, &pRes->skey) < 0) return -1; if (tDecodeI64(pCoder, &pRes->ekey) < 0) return -1; if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1; + if (tDecodeCStrTo(pCoder, pRes->tableFName) < 0) return -1; return 0; } int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2adfc92ab1f7f0f88ae9dd4f14147c278b3fefdb..01f2f659ff0e5d3b00ad4f5b14aac4de4ae4052d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -146,8 +146,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con } } - int32_t len; - int32_t code; + int32_t len = 0; + int32_t code = 0; tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); if (code < 0) { return -1; @@ -164,9 +164,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - SEncoder encoder; + SEncoder encoder = {0}; tEncoderInit(&encoder, abuf, len); tEncodeSMqDataRsp(&encoder, pRsp); + tEncoderClear(&encoder); SRpcMsg rsp = { .info = pMsg->info, @@ -176,8 +177,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con }; tmsgSendRsp(&rsp); - char buf1[80]; - char buf2[80]; + char buf1[80] = {0}; + char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 9d06fbffdd961f51f7a8c153eae8879411b70919..b5bf92ee7589d6a8a943a84179921504579a74e1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -106,7 +106,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); - if (code) goto _err; + if (code) { + goto _err; + } // malloc and encode tEncodeSize(tEncodeDeleteRes, &res, size, ret); @@ -993,6 +995,11 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq SDecoder *pCoder = &(SDecoder){0}; SDeleteRes *pRes = &(SDeleteRes){0}; + pRsp->msgType = TDMT_VND_DELETE_RSP; + pRsp->pCont = NULL; + pRsp->contLen = 0; + pRsp->code = TSDB_CODE_SUCCESS; + pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); if (pRes->uidList == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1010,6 +1017,15 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq tDecoderClear(pCoder); taosArrayDestroy(pRes->uidList); + + SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows}; + int32_t ret = 0; + tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret); + pRsp->pCont = rpcMallocCont(pRsp->contLen); + SEncoder ec = {0}; + tEncoderInit(&ec, pRsp->pCont, pRsp->contLen); + tEncodeSVDeleteRsp(&ec, &rsp); + tEncoderClear(&ec); return code; _err: diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 6c0f4b2c122739af4415d4dffa4f8108bf31e3b4..1eb1f5c907cd8a1c888f2ae41ca28a6f41bcfcd4 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1021,6 +1021,7 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); SSDataBlock* createSpecialDataBlock(EStreamType type); +void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); #ifdef __cplusplus } diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index a0d4d64bff1d709e820c1a1616254fb9a206fc90..391aef529f6b8bf46c976700986665edc7594665 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -90,6 +90,7 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp pRes->uidList = pHandle->pParam->pUidList; pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; + strcpy(pRes->tableFName, pHandle->pDeleter->tableFName); pRes->affectedRows = *(int64_t*)pColRes->pData; pBuf->useSize += pEntry->dataLen; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index f249321a767b847c35c7bb1d23d6e740d1cc3f7c..04487fd7e4d45830f47b4428fba56b296867050c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -347,6 +347,9 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, } code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); + if(code != TSDB_CODE_SUCCESS){ + taosMemoryFreeClear(pSinkParam); + } } _error: diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3f2f528a2d6211a227b931d7a13bebe4e41261eb..962f37a8f7ac339de08809d46d0ae32510f24273 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3436,7 +3436,7 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) { initResultRowInfo(&pInfo->resultRowInfo); } -static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { +void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { if (pCtx == NULL) { return NULL; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c2cf19167a54abde07b2b8281138889f8d7c87a0..cd05aa9ca35721665fe6ed8192476b4d428785aa 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3147,6 +3147,8 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pWinBlock); blockDataDestroy(pInfo->pUpdateRes); + destroySqlFunctionCtx(pInfo->pDummyCtx, 0); + taosHashCleanup(pInfo->pStDeleted); taosMemoryFreeClear(param); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index e09887e6515cf1b75e97e0503eac3c846aac4c72..df57d0fef11d96ed10cb4c3def0f4786d3f4e18d 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -283,7 +283,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes pRes->skey = pDelRes->skey; pRes->ekey = pDelRes->ekey; pRes->affectedRows = pDelRes->affectedRows; - + strcpy(pRes->tableFName, pDelRes->tableFName); taosMemoryFree(output.pData); return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index b794cb91f549964000cd506a7f766151d4d285da..67050241e36c688414cca38cf2f033cc4c58b1c7 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -230,6 +230,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SVDeleteRsp rsp = {0}; tDecoderInit(&coder, msg, msgSize); tDecodeSVDeleteRsp(&coder, &rsp); + tDecoderClear(&coder); atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 935d89b99b336b5438892be7d5726c9c04bb4ee8..ceca20150603d9079187b7ece312aa14f4b31f2a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2501,19 +2501,15 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI // if mulit replica, start replicate right now if (ths->replicaNum > 1) { syncNodeReplicate(ths); - } - // pre commit - syncNodePreCommit(ths, pEntry, 0); + // pre commit + syncNodePreCommit(ths, pEntry, 0); + } // if only myself, maybe commit right now if (ths->replicaNum == 1) { syncMaybeAdvanceCommitIndex(ths); } - - } else { - // pre commit - syncNodePreCommit(ths, pEntry, 0); } if (pRetIndex != NULL) { diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index f256c9603724bcfdeeaa5d007c55e9cb8a2bcd10..a81d6db80fb9d40d4847878c19483ebe3d254b38 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -7,8 +7,7 @@ * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * + * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ @@ -211,6 +210,7 @@ typedef struct SConnBuffer { char* buf; int len; int cap; + int left; int total; } SConnBuffer; @@ -282,6 +282,8 @@ int transClearBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); +int transResetBuffer(SConnBuffer* connBuf); +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf); int transSetConnOption(uv_tcp_t* stream); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index c747e6933923ef970097d8ea0bcf11a6335b897a..62277a7569a836f84e2ea143dc648737c208b3c8 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -17,6 +17,7 @@ #ifdef USE_UV #include #endif +// clang-format off #include "zlib.h" #include "thttp.h" #include "taoserror.h" @@ -174,7 +175,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 #else int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { - int32_t code = -1; + int32_t code = -1; TdSocketPtr pSocket = NULL; uint32_t ip = taosGetIpv4FromFqdn(server); @@ -231,4 +232,5 @@ SEND_OVER: return code; } -#endif \ No newline at end of file +// clang-format on +#endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 70d56dca139972d33aa3f0e633727dadb0b35ea9..54ffcabc8dae0ae903ee2d6d08a4e162b924d37e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -323,7 +323,8 @@ void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); + STransMsgHead* pHead = NULL; + transDumpFromBuffer(&conn->readBuf, (char**)&pHead); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); @@ -366,7 +367,6 @@ void cliHandleResp(SCliConn* conn) { } } // buf's mem alread translated to transMsg.pCont - transClearBuffer(&conn->readBuf); if (!CONN_NO_PERSIST_BY_APP(conn)) { transMsg.info.handle = (void*)conn->refId; tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); @@ -636,6 +636,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { transReqQueueInit(&conn->wreqQueue); transQueueInit(&conn->cliMsgs, NULL); + + transInitBuffer(&conn->readBuf); QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; conn->status = ConnNormal; @@ -651,8 +653,9 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); transRemoveExHandle(transGetRefMgt(), conn->refId); - conn->refId = -1; + transDestroyBuffer(&conn->readBuf); + conn->refId = -1; if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); if (clear) { @@ -678,7 +681,6 @@ static void cliDestroy(uv_handle_t* handle) { tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); - transDestroyBuffer(&conn->readBuf); taosMemoryFree(conn); } static bool cliHandleNoResp(SCliConn* conn) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 155cdd1062afd12093afff9496ae4a36583e93a3..fb59aafb3386e4652e8507fc4f6bd0a5844e2feb 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -16,6 +16,8 @@ #include "transComm.h" +#define BUFFER_CAP 4096 + static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; @@ -111,12 +113,56 @@ int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) { return r; } int transInitBuffer(SConnBuffer* buf) { - transClearBuffer(buf); + buf->cap = BUFFER_CAP; + buf->buf = taosMemoryCalloc(1, BUFFER_CAP); + buf->left = -1; + buf->len = 0; + buf->total = 0; return 0; } +int transDestroyBuffer(SConnBuffer* buf) { + taosMemoryFree(buf->buf); + return 0; +} + int transClearBuffer(SConnBuffer* buf) { - memset(buf, 0, sizeof(*buf)); - buf->total = -1; + SConnBuffer* p = buf; + if (p->cap > BUFFER_CAP) { + p->cap = BUFFER_CAP; + p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP); + } + p->left = -1; + p->len = 0; + p->total = 0; + return 0; +} + +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { + SConnBuffer* p = connBuf; + if (p->left != 0) { + return -1; + } + int total = connBuf->total; + *buf = taosMemoryCalloc(1, total); + memcpy(*buf, p->buf, total); + + transResetBuffer(connBuf); + return total; +} + +int transResetBuffer(SConnBuffer* connBuf) { + SConnBuffer* p = connBuf; + if (p->total <= p->len) { + int left = p->len - p->total; + memmove(p->buf, p->buf + p->total, left); + p->left = -1; + p->total = 0; + p->len = left; + } else { + p->left = -1; + p->total = 0; + p->len = 0; + } return 0; } int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { @@ -126,54 +172,39 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user * info--->| */ - static const int CAPACITY = sizeof(STransMsgHead); - SConnBuffer* p = connBuf; - if (p->cap == 0) { - p->buf = (char*)taosMemoryCalloc(CAPACITY, sizeof(char)); - tTrace("internal malloc mem:%p, size:%d", p->buf, CAPACITY); - p->len = 0; - p->cap = CAPACITY; - p->total = -1; - - uvBuf->base = p->buf; - uvBuf->len = CAPACITY; - } else if (p->total == -1 && p->len < CAPACITY) { - uvBuf->base = p->buf + p->len; - uvBuf->len = CAPACITY - p->len; - } else { - p->cap = p->total; - p->buf = taosMemoryRealloc(p->buf, p->cap); - tTrace("internal realloc mem:%p, size:%d", p->buf, p->cap); - uvBuf->base = p->buf + p->len; + uvBuf->base = p->buf + p->len; + if (p->left == -1) { uvBuf->len = p->cap - p->len; + } else { + if (p->left < p->cap - p->len) { + uvBuf->len = p->left; + } else { + p->buf = taosMemoryRealloc(p->buf, p->left + p->len); + uvBuf->base = p->buf + p->len; + uvBuf->len = p->left; + } } return 0; } // check whether already read complete bool transReadComplete(SConnBuffer* connBuf) { - if (connBuf->total == -1 && connBuf->len >= sizeof(STransMsgHead)) { - STransMsgHead head; - memcpy((char*)&head, connBuf->buf, sizeof(head)); - int32_t msgLen = (int32_t)htonl(head.msgLen); - connBuf->total = msgLen; - } - if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) { - return true; - } - return false; -} -int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; } - -int transUnpackMsg(STransMsgHead* msgHead) { return 0; } -int transDestroyBuffer(SConnBuffer* buf) { - if (buf->cap > 0) { - taosMemoryFreeClear(buf->buf); + SConnBuffer* p = connBuf; + if (p->len >= sizeof(STransMsgHead)) { + if (p->left == -1) { + STransMsgHead head; + memcpy((char*)&head, connBuf->buf, sizeof(head)); + int32_t msgLen = (int32_t)htonl(head.msgLen); + p->total = msgLen; + } + if (p->total >= p->len) { + p->left = p->total - p->len; + } else { + p->left = 0; + } } - transClearBuffer(buf); - - return 0; + return p->left == 0 ? true : false; } int transSetConnOption(uv_tcp_t* stream) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index fe7ab47feebd8e5ac40327f37b9334fa8eb22093..e360926b408b69033873a97a38e08ffc8990333f 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -212,9 +212,10 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { } static void uvHandleReq(SSvrConn* pConn) { - SConnBuffer* pBuf = &pConn->readBuf; - char* msg = pBuf->buf; - uint32_t msgLen = pBuf->len; + STransMsgHead* msg = NULL; + int msgLen = 0; + + msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); STransMsgHead* pHead = (STransMsgHead*)msg; pHead->code = htonl(pHead->code); @@ -761,6 +762,7 @@ static SSvrConn* createConn(void* hThrd) { memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; pConn->status = ConnNormal; + transInitBuffer(&pConn->readBuf); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); exh->handle = pConn; diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 8450e8baea60bca7d1ca2f5d5dde1f1fc1953629..fa94bc6a13d289349f687fd214247151c6a2d6c9 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -374,9 +374,10 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) { size_t size = 0; int32_t done = 0; int32_t code = -1; + float coreCount = 0; TdFilePtr pFile = taosOpenFile("/proc/cpuinfo", TD_FILE_READ | TD_FILE_STREAM); - if (pFile == NULL) return false; + if (pFile == NULL) return code; while (done != 3 && (size = taosGetLineFile(pFile, &line)) != -1) { line[size - 1] = '\0'; @@ -390,11 +391,26 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) { *numOfCores = atof(v); done |= 2; } + if (strncmp(line, "processor", 9) == 0) coreCount += 1; } if (line != NULL) taosMemoryFree(line); taosCloseFile(&pFile); + if (code != 0) { + TdFilePtr pFile1 = taosOpenFile("/proc/device-tree/model", TD_FILE_READ | TD_FILE_STREAM); + if (pFile1 == NULL) return code; + taosGetsFile(pFile1, maxLen, cpuModel); + taosCloseFile(&pFile1); + code = 0; + done |= 1; + } + + if ((done & 2) == 0) { + *numOfCores = coreCount; + done |= 2; + } + return code; #endif } diff --git a/tests/script/tsim/valgrind/checkError6.sim b/tests/script/tsim/valgrind/checkError6.sim index 27fd291f6436593d21fa6a4c0ae085d392ba64d9..fc6559c8efbee1f57fb933ab849084764d061955 100644 --- a/tests/script/tsim/valgrind/checkError6.sim +++ b/tests/script/tsim/valgrind/checkError6.sim @@ -60,12 +60,13 @@ sql select top(tbcol, 2) from tb1 where ts <= 1601481840000 sql select percentile(tbcol, 2) from tb1 where ts <= 1601481840000 sql select leastsquares(tbcol, 1, 1) as b from tb1 where ts <= 1601481840000 sql show table distributed tb1 +sql select count(1) from tb1 sql select count(tbcol) as b from tb1 where ts <= 1601481840000 interval(1m) sql select diff(tbcol) from tb1 where ts <= 1601481840000 sql select diff(tbcol) from tb1 where tbcol > 5 and tbcol < 20 sql select first(tbcol), last(tbcol) as b from tb1 where ts <= 1601481840000 interval(1m) sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), sum(tbcol), stddev(tbcol) from tb1 where ts <= 1601481840000 partition by tgcol interval(1m) -#sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from tb1 where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0) +sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from tb1 where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0) sql select last_row(*) from tb1 where tbcol > 5 and tbcol < 20 print =============== step4: stb @@ -78,13 +79,14 @@ sql select avg(tbcol) as b from stb where ts <= 1601481840000 interval(1m) sql select avg(tbcol) as c from stb group by tgcol sql select avg(tbcol) as b from stb where ts <= 1601481840000 partition by tgcol interval(1m) sql show table distributed stb +sql select count(1) from stb sql select count(tbcol) as b from stb where ts <= 1601481840000 partition by tgcol interval(1m) sql select diff(tbcol) from stb where ts <= 1601481840000 sql select first(tbcol), last(tbcol) as c from stb group by tgcol sql select first(tbcol), last(tbcol) as b from stb where ts <= 1601481840000 and tbcol2 is null partition by tgcol interval(1m) sql select first(tbcol), last(tbcol) as b from stb where ts <= 1601481840000 partition by tgcol interval(1m) sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), sum(tbcol), stddev(tbcol) from stb where ts <= 1601481840000 partition by tgcol interval(1m) -#sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from stb where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0) +sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from stb where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0) sql select last_row(tbcol), stddev(tbcol) from stb where tbcol > 5 and tbcol < 20 group by tgcol _OVER: diff --git a/tests/system-test/2-query/cast.py b/tests/system-test/2-query/cast.py index bdac2b6175fc5a4157bc58bf2329848c36fb204b..4045b6ad88106e9fed718d00f08b3c113236ac91 100644 --- a/tests/system-test/2-query/cast.py +++ b/tests/system-test/2-query/cast.py @@ -566,8 +566,7 @@ class TDTestCase: if data_ct4_c10[i] is None: tdSql.checkData( i, 0, None ) else: - # time2str = str(int((data_ct4_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000)) - time2str = str(int((datetime.datetime.timestamp(data_ct4_c10[i])-datetime.datetime.timestamp(datetime.datetime.fromtimestamp(0)))*1000)) + time2str = str(int((data_ct4_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000)) tdSql.checkData( i, 0, time2str ) tdSql.query(f"select cast(c10 as nchar(32)) as b from {self.dbname}.t1") for i in range(len(data_t1_c10)): @@ -576,8 +575,7 @@ class TDTestCase: elif i == 10: continue else: - # time2str = str(int((data_t1_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000)) - time2str = str(int((datetime.datetime.timestamp(data_t1_c10[i])-datetime.datetime.timestamp(datetime.datetime.fromtimestamp(0)))*1000)) + time2str = str(int((data_t1_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000)) tdSql.checkData( i, 0, time2str ) tdLog.printNoPrefix("==========step38: cast timestamp to binary, expect no changes ") @@ -586,8 +584,7 @@ class TDTestCase: if data_ct4_c10[i] is None: tdSql.checkData( i, 0, None ) else: - # time2str = str(int((data_ct4_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000)) - time2str = str(int((datetime.datetime.timestamp(data_ct4_c10[i])-datetime.datetime.timestamp(datetime.datetime.fromtimestamp(0)))*1000)) + time2str = str(int((data_ct4_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000)) tdSql.checkData( i, 0, time2str ) tdSql.query(f"select cast(c10 as binary(32)) as b from {self.dbname}.t1") for i in range(len(data_t1_c10)): @@ -596,8 +593,7 @@ class TDTestCase: elif i == 10: continue else: - # time2str = str(int((data_t1_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000)) - time2str = str(int((datetime.datetime.timestamp(data_t1_c10[i])-datetime.datetime.timestamp(datetime.datetime.fromtimestamp(0)))*1000)) + time2str = str(int((data_t1_c10[i]-datetime.datetime.fromtimestamp(0)).total_seconds()*1000)) tdSql.checkData( i, 0, time2str ) tdLog.printNoPrefix("==========step39: cast constant operation to bigint, expect change to int ") diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 7ecc81415d09743b9a9a950d166f9c5882c415ac..0c5acf49ab738ee9187e7a63025af339dd882038 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -59,8 +59,8 @@ python3 ./test.py -f 2-query/ceil.py python3 ./test.py -f 2-query/ceil.py -R python3 ./test.py -f 2-query/char_length.py python3 ./test.py -f 2-query/char_length.py -R -python3 ./test.py -f 2-query/check_tsdb.py -python3 ./test.py -f 2-query/check_tsdb.py -R +# python3 ./test.py -f 2-query/check_tsdb.py +# python3 ./test.py -f 2-query/check_tsdb.py -R python3 ./test.py -f 2-query/concat.py python3 ./test.py -f 2-query/concat.py -R python3 ./test.py -f 2-query/concat_ws.py @@ -93,7 +93,6 @@ python3 ./test.py -f 2-query/distribute_agg_min.py -R - python3 ./test.py -f 1-insert/update_data.py python3 ./test.py -f 1-insert/delete_data.py @@ -224,8 +223,8 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py -python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py -python3 ./test.py -f 7-tmq/tmqDnodeRestart.py +#python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py +#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py @@ -265,7 +264,7 @@ python3 ./test.py -f 2-query/concat.py -Q 2 python3 ./test.py -f 2-query/concat2.py -Q 2 python3 ./test.py -f 2-query/concat_ws.py -Q 2 python3 ./test.py -f 2-query/concat_ws2.py -Q 2 -python3 ./test.py -f 2-query/check_tsdb.py -Q 2 +#python3 ./test.py -f 2-query/check_tsdb.py -Q 2 python3 ./test.py -f 2-query/spread.py -Q 2 python3 ./test.py -f 2-query/hyperloglog.py -Q 2 python3 ./test.py -f 2-query/explain.py -Q 2 @@ -354,7 +353,7 @@ python3 ./test.py -f 2-query/concat.py -Q 3 python3 ./test.py -f 2-query/concat2.py -Q 3 python3 ./test.py -f 2-query/concat_ws.py -Q 3 python3 ./test.py -f 2-query/concat_ws2.py -Q 3 -python3 ./test.py -f 2-query/check_tsdb.py -Q 3 +#python3 ./test.py -f 2-query/check_tsdb.py -Q 3 python3 ./test.py -f 2-query/spread.py -Q 3 python3 ./test.py -f 2-query/hyperloglog.py -Q 3 python3 ./test.py -f 2-query/explain.py -Q 3 diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 6a18263d502addf2515ac2939e07bb6920c0b4b1..d39ade7e91495d2b3ff1924efdb78103d7b423cc 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -630,7 +630,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn { tmq_raw_data raw = {0}; - int32_t code = tmq_get_raw_meta(msg, &raw); + int32_t code = tmq_get_raw(msg, &raw); if(code == TSDB_CODE_SUCCESS){ int retCode = queryDB(pInfo->taos, "use metadb"); @@ -641,7 +641,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn } taosFprintfFile(g_fp, "raw:%p\n", &raw); - taos_write_raw_meta(pInfo->taos, raw); + tmq_write_raw(pInfo->taos, raw); } char* result = tmq_get_json_meta(msg);