未验证 提交 69e1edb3 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16898 from taosdata/fix/TD-18740

feat:Complete the automatic table creation function for taosX 
...@@ -254,7 +254,7 @@ enum tmq_res_t { ...@@ -254,7 +254,7 @@ enum tmq_res_t {
TMQ_RES_INVALID = -1, TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1, TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2, TMQ_RES_TABLE_META = 2,
TMQ_RES_TAOSX = 3, TMQ_RES_METADATA = 3,
}; };
typedef struct tmq_raw_data { typedef struct tmq_raw_data {
......
...@@ -52,7 +52,7 @@ enum { ...@@ -52,7 +52,7 @@ enum {
RES_TYPE__QUERY = 1, RES_TYPE__QUERY = 1,
RES_TYPE__TMQ, RES_TYPE__TMQ,
RES_TYPE__TMQ_META, RES_TYPE__TMQ_META,
RES_TYPE__TAOSX, RES_TYPE__TMQ_METADATA,
}; };
#define SHOW_VARIABLES_RESULT_COLS 2 #define SHOW_VARIABLES_RESULT_COLS 2
...@@ -60,9 +60,9 @@ enum { ...@@ -60,9 +60,9 @@ enum {
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) #define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ || *(int8_t*)res == RES_TYPE__TAOSX) #define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) #define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
#define TD_RES_TMQ_TAOSX(res) (*(int8_t*)res == RES_TYPE__TAOSX) #define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA)
typedef struct SAppInstInfo SAppInstInfo; typedef struct SAppInstInfo SAppInstInfo;
......
...@@ -189,6 +189,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, ...@@ -189,6 +189,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
tscError("%d failed to add to request container, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self, tscError("%d failed to add to request container, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
(*pRequest)->requestId, pTscObj->id, sql); (*pRequest)->requestId, pTscObj->id, sql);
taosMemoryFree(param);
destroyRequest(*pRequest); destroyRequest(*pRequest);
*pRequest = NULL; *pRequest = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
......
...@@ -148,7 +148,7 @@ int taos_errno(TAOS_RES *res) { ...@@ -148,7 +148,7 @@ int taos_errno(TAOS_RES *res) {
return terrno; return terrno;
} }
if (TD_RES_TMQ(res)) { if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
return 0; return 0;
} }
...@@ -162,7 +162,7 @@ const char *taos_errstr(TAOS_RES *res) { ...@@ -162,7 +162,7 @@ const char *taos_errstr(TAOS_RES *res) {
return (const char *)tstrerror(terrno); return (const char *)tstrerror(terrno);
} }
if (TD_RES_TMQ(res)) { if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
return "success"; return "success";
} }
...@@ -184,7 +184,7 @@ void taos_free_result(TAOS_RES *res) { ...@@ -184,7 +184,7 @@ void taos_free_result(TAOS_RES *res) {
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId); tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId);
destroyRequest(pRequest); destroyRequest(pRequest);
} else if (TD_RES_TMQ_TAOSX(res)) { } else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
...@@ -264,7 +264,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -264,7 +264,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return doFetchRows(pRequest, true, true); return doFetchRows(pRequest, true, true);
#endif #endif
} else if (TD_RES_TMQ(res)) { } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SMqRspObj *msg = ((SMqRspObj *)res); SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo; SReqResultInfo *pResultInfo;
if (msg->resIter == -1) { if (msg->resIter == -1) {
...@@ -437,7 +437,7 @@ const char *taos_data_type(int type) { ...@@ -437,7 +437,7 @@ const char *taos_data_type(int type) {
const char *taos_get_client_info() { return version; } const char *taos_get_client_info() { return version; }
int taos_affected_rows(TAOS_RES *res) { int taos_affected_rows(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
return 0; return 0;
} }
...@@ -454,7 +454,7 @@ int taos_result_precision(TAOS_RES *res) { ...@@ -454,7 +454,7 @@ int taos_result_precision(TAOS_RES *res) {
if (TD_RES_QUERY(res)) { if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
return pRequest->body.resInfo.precision; return pRequest->body.resInfo.precision;
} else if (TD_RES_TMQ(res)) { } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SReqResultInfo *info = tmqGetCurResInfo(res); SReqResultInfo *info = tmqGetCurResInfo(res);
return info->precision; return info->precision;
} }
...@@ -487,7 +487,7 @@ int taos_select_db(TAOS *taos, const char *db) { ...@@ -487,7 +487,7 @@ int taos_select_db(TAOS *taos, const char *db) {
} }
void taos_stop_query(TAOS_RES *res) { void taos_stop_query(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
return; return;
} }
...@@ -559,7 +559,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { ...@@ -559,7 +559,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
(*rows) = pResultInfo->row; (*rows) = pResultInfo->row;
(*numOfRows) = pResultInfo->numOfRows; (*numOfRows) = pResultInfo->numOfRows;
return pRequest->code; return pRequest->code;
} else if (TD_RES_TMQ(res)) { } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, true); SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, true);
if (pResultInfo == NULL) return -1; if (pResultInfo == NULL) return -1;
...@@ -578,7 +578,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { ...@@ -578,7 +578,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
return 0; return 0;
} }
if (TD_RES_TMQ(res)) { if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, false); SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, false);
if (pResultInfo == NULL) { if (pResultInfo == NULL) {
(*numOfRows) = 0; (*numOfRows) = 0;
......
...@@ -515,6 +515,10 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm ...@@ -515,6 +515,10 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
topic = pMetaRspObj->topic; topic = pMetaRspObj->topic;
vgId = pMetaRspObj->vgId; vgId = pMetaRspObj->vgId;
} else if(TD_RES_TMQ_METADATA(msg)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
topic = pRspObj->topic;
vgId = pRspObj->vgId;
} else { } else {
return TSDB_CODE_TMQ_INVALID_MSG; return TSDB_CODE_TMQ_INVALID_MSG;
} }
...@@ -1471,16 +1475,16 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { ...@@ -1471,16 +1475,16 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
pRspObj->resType = RES_TYPE__TAOSX; pRspObj->resType = RES_TYPE__TMQ_METADATA;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1; pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqTaosxRspObj)); memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
if (!pWrapper->dataRsp.withSchema) { if (!pWrapper->taosxRsp.withSchema) {
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
} }
...@@ -1654,8 +1658,14 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1654,8 +1658,14 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
rspWrapper = NULL; rspWrapper = NULL;
continue; continue;
} }
// build rsp // build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); void* pRsp = NULL;
if(pollRspWrapper->taosxRsp.createTableNum == 0){
pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
}else{
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
}
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
...@@ -1775,11 +1785,11 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { ...@@ -1775,11 +1785,11 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) { if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) {
return TMQ_RES_TAOSX; return TMQ_RES_DATA;
} }
return TMQ_RES_TABLE_META; return TMQ_RES_TABLE_META;
} else if (TD_RES_TMQ_TAOSX(res)) { } else if (TD_RES_TMQ_METADATA(res)) {
return TMQ_RES_DATA; return TMQ_RES_METADATA;
} else { } else {
return TMQ_RES_INVALID; return TMQ_RES_INVALID;
} }
...@@ -1792,6 +1802,9 @@ const char* tmq_get_topic_name(TAOS_RES* res) { ...@@ -1792,6 +1802,9 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return strchr(pMetaRspObj->topic, '.') + 1; return strchr(pMetaRspObj->topic, '.') + 1;
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
return strchr(pRspObj->topic, '.') + 1;
} else { } else {
return NULL; return NULL;
} }
...@@ -1804,6 +1817,9 @@ const char* tmq_get_db_name(TAOS_RES* res) { ...@@ -1804,6 +1817,9 @@ const char* tmq_get_db_name(TAOS_RES* res) {
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return strchr(pMetaRspObj->db, '.') + 1; return strchr(pMetaRspObj->db, '.') + 1;
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
return strchr(pRspObj->db, '.') + 1;
} else { } else {
return NULL; return NULL;
} }
...@@ -1816,6 +1832,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { ...@@ -1816,6 +1832,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return pMetaRspObj->vgId; return pMetaRspObj->vgId;
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
return pRspObj->vgId;
} else { } else {
return -1; return -1;
} }
...@@ -1829,7 +1848,14 @@ const char* tmq_get_table_name(TAOS_RES* res) { ...@@ -1829,7 +1848,14 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL; return NULL;
} }
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
} } else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
}
return NULL; return NULL;
} }
......
...@@ -6018,12 +6018,18 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { ...@@ -6018,12 +6018,18 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
void tDeleteSTaosxRsp(STaosxRsp *pRsp) { void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen); taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
pRsp->blockTbName = NULL;
taosArrayDestroy(pRsp->createTableLen); taosArrayDestroy(pRsp->createTableLen);
pRsp->createTableLen = NULL;
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree);
pRsp->createTableReq = NULL;
} }
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
......
...@@ -540,7 +540,7 @@ typedef struct { ...@@ -540,7 +540,7 @@ typedef struct {
} SMqConsumerEp; } SMqConsumerEp;
SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp); SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
void tDeleteSMqConsumerEp(SMqConsumerEp* pEp); void tDeleteSMqConsumerEp(void* pEp);
int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp); int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp); void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
......
...@@ -197,11 +197,12 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -197,11 +197,12 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
pLostMsg->consumerId = pConsumer->consumerId; pLostMsg->consumerId = pConsumer->consumerId;
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); SRpcMsg pRpcMsg = {
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_LOST; .msgType = TDMT_MND_MQ_CONSUMER_LOST,
pRpcMsg->pCont = pLostMsg; .pCont = pLostMsg,
pRpcMsg->contLen = sizeof(SMqConsumerLostMsg); .contLen = sizeof(SMqConsumerLostMsg),
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg); };
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
} }
if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) { if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) {
// do nothing // do nothing
...@@ -280,11 +281,12 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { ...@@ -280,11 +281,12 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId; pRecoverMsg->consumerId = consumerId;
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); SRpcMsg pRpcMsg = {
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER; .msgType = TDMT_MND_MQ_CONSUMER_RECOVER,
pRpcMsg->pCont = pRecoverMsg; .pCont = pRecoverMsg,
pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg); .contLen = sizeof(SMqConsumerRecoverMsg),
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg); };
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
} }
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
...@@ -318,11 +320,12 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -318,11 +320,12 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId; pRecoverMsg->consumerId = consumerId;
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); SRpcMsg pRpcMsg = {
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER; .msgType = TDMT_MND_MQ_CONSUMER_RECOVER,
pRpcMsg->pCont = pRecoverMsg; .pCont = pRecoverMsg,
pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg); .contLen = sizeof(SMqConsumerRecoverMsg),
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg); };
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
} }
#endif #endif
......
...@@ -338,8 +338,8 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { ...@@ -338,8 +338,8 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
return pConsumerEpNew; return pConsumerEpNew;
} }
void tDeleteSMqConsumerEp(SMqConsumerEp *pConsumerEp) { void tDeleteSMqConsumerEp(void *data) {
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp*)data;
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
} }
......
...@@ -184,7 +184,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); ...@@ -184,7 +184,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid,
const char* stbFullName, SBatchDeleteReq* pDeleteReq); const char* stbFullName, SBatchDeleteReq* pDeleteReq);
// sma // sma
......
...@@ -319,8 +319,12 @@ _query: ...@@ -319,8 +319,12 @@ _query:
pSchema = tCloneSSchemaWrapper(&meNew.stbEntry.schemaRow); pSchema = tCloneSSchemaWrapper(&meNew.stbEntry.schemaRow);
tDecoderClear(&dcNew); tDecoderClear(&dcNew);
tdbTbcClose(pCur); tdbTbcClose(pCur);
tdbFree(pKey);
tdbFree(pVal);
goto _exit; goto _exit;
} }
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur); tdbTbcClose(pCur);
} }
} else if (me.type == TSDB_CHILD_TABLE) { } else if (me.type == TSDB_CHILD_TABLE) {
...@@ -347,11 +351,13 @@ _query: ...@@ -347,11 +351,13 @@ _query:
tDecoderClear(&dc); tDecoderClear(&dc);
_exit: _exit:
tDecoderClear(&dc);
metaULock(pMeta); metaULock(pMeta);
tdbFree(pData); tdbFree(pData);
return pSchema; return pSchema;
_err: _err:
tDecoderClear(&dc);
metaULock(pMeta); metaULock(pMeta);
tdbFree(pData); tdbFree(pData);
return NULL; return NULL;
...@@ -383,10 +389,8 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) { ...@@ -383,10 +389,8 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
ttlKey = *(STtlIdxKey *)pKey; ttlKey = *(STtlIdxKey *)pKey;
taosArrayPush(uidList, &ttlKey.uid); taosArrayPush(uidList, &ttlKey.uid);
} }
tdbTbcClose(pCur);
tdbFree(pKey); tdbFree(pKey);
tdbTbcClose(pCur);
return 0; return 0;
} }
......
...@@ -353,6 +353,8 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t ...@@ -353,6 +353,8 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index); metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index);
} }
tdbFree(pKey);
tdbFree(pVal);
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
...@@ -528,6 +530,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in ...@@ -528,6 +530,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
} }
} }
} }
taosArrayDestroy(pTagVals);
} }
// SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t)); // SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t));
// if(sidInfo->version >= idInfo->version){ // if(sidInfo->version >= idInfo->version){
......
...@@ -1192,10 +1192,11 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -1192,10 +1192,11 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
const void *pTagData = NULL; // const void *pTagData = NULL; //
int32_t nTagData = 0; int32_t nTagData = 0;
SDecoder dc = {0}; SDecoder dc = {0};
int32_t ret = 0;
// get super table // get super table
if (tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0) { if (tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0) {
return -1; ret = -1;
goto end;
} }
tbDbKey.uid = pCtbEntry->ctbEntry.suid; tbDbKey.uid = pCtbEntry->ctbEntry.suid;
tbDbKey.version = ((SUidIdxVal *)pData)[0].version; tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
...@@ -1221,17 +1222,20 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -1221,17 +1222,20 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
// nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len; // nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
pTagData = pCtbEntry->ctbEntry.pTags; pTagData = pCtbEntry->ctbEntry.pTags;
nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len; nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
return metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn); ret = metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn);
goto end;
} }
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) { pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
return -1; ret = -1;
goto end;
} }
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn); tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
end:
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
tDecoderClear(&dc); tDecoderClear(&dc);
tdbFree(pData); tdbFree(pData);
return 0; return ret;
} }
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
......
...@@ -204,7 +204,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char ...@@ -204,7 +204,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
} }
SBatchDeleteReq deleteReq; SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq); pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
if (!pSubmitReq) { if (!pSubmitReq) {
......
...@@ -51,6 +51,20 @@ void tqCleanUp() { ...@@ -51,6 +51,20 @@ void tqCleanUp() {
} }
} }
static void destroySTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task);
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
tqCloseReader(pData->execHandle.pExecReader);
walCloseReader(pData->pWalReader);
taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
walCloseReader(pData->pWalReader);
tqCloseReader(pData->execHandle.pExecReader);
}
}
STQ* tqOpen(const char* path, SVnode* pVnode) { STQ* tqOpen(const char* path, SVnode* pVnode) {
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ)); STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
...@@ -62,6 +76,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -62,6 +76,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
...@@ -517,6 +533,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -517,6 +533,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
tDeleteSTaosxRsp(&taosxRsp);
return -1; return -1;
} }
...@@ -577,14 +594,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -577,14 +594,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1; code = -1;
taosMemoryFree(pCkHead); taosMemoryFree(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return code; return code;
} }
code = 0; code = 0;
if (pCkHead) taosMemoryFree(pCkHead); if (pCkHead) taosMemoryFree(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return code; return code;
} }
} }
} }
tDeleteSTaosxRsp(&taosxRsp);
return 0; return 0;
} }
......
...@@ -243,14 +243,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -243,14 +243,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
} }
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
SSubmitBlk* pBlk = pReader->pBlock; SSubmitBlk* pBlk = pReader->pBlock;
if (pBlk->schemaLen > 0) { int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
} }
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen); void* createReq = taosMemoryCalloc(1, schemaLen);
memcpy(createReq, pBlk->data, pBlk->schemaLen); memcpy(createReq, pBlk->data, schemaLen);
taosArrayPush(pRsp->createTableLen, &pBlk->schemaLen); taosArrayPush(pRsp->createTableLen, &schemaLen);
taosArrayPush(pRsp->createTableReq, &createReq); taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++; pRsp->createTableNum++;
} }
...@@ -277,14 +278,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -277,14 +278,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
} }
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
SSubmitBlk* pBlk = pReader->pBlock; SSubmitBlk* pBlk = pReader->pBlock;
if (pBlk->schemaLen > 0) { int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
} }
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen); void* createReq = taosMemoryCalloc(1, schemaLen);
memcpy(createReq, pBlk->data, pBlk->schemaLen); memcpy(createReq, pBlk->data, schemaLen);
taosArrayPush(pRsp->createTableLen, &pBlk->schemaLen); taosArrayPush(pRsp->createTableLen, &schemaLen);
taosArrayPush(pRsp->createTableReq, &createReq); taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++; pRsp->createTableNum++;
} }
......
...@@ -145,6 +145,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { ...@@ -145,6 +145,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
ASSERT(0); ASSERT(0);
tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen); tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
taosHashCancelIterate(pStore->pHash, pIter); taosHashCancelIterate(pStore->pHash, pIter);
taosMemoryFree(buf);
return -1; return -1;
} }
taosMemoryFree(buf); taosMemoryFree(buf);
......
...@@ -80,6 +80,13 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ ...@@ -80,6 +80,13 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
void* buf = taosMemoryMalloc(tlen); void* buf = taosMemoryMalloc(tlen);
if (NULL == buf) { if (NULL == buf) {
taosArrayDestroy(reqNew.pArray); taosArrayDestroy(reqNew.pArray);
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
}
}
goto end; goto end;
} }
SEncoder coderNew = {0}; SEncoder coderNew = {0};
...@@ -91,6 +98,14 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ ...@@ -91,6 +98,14 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
taosMemoryFree(buf); taosMemoryFree(buf);
taosArrayDestroy(reqNew.pArray); taosArrayDestroy(reqNew.pArray);
} }
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
}
}
} else if (msgType == TDMT_VND_ALTER_TABLE) { } else if (msgType == TDMT_VND_ALTER_TABLE) {
SVAlterTbReq req = {0}; SVAlterTbReq req = {0};
......
...@@ -48,7 +48,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -48,7 +48,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0; return 0;
} }
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb, SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb,
int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) { int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL; SArray* schemaReqs = NULL;
...@@ -89,6 +89,32 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -89,6 +89,32 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
return NULL; return NULL;
} }
SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
// STag* pTag = NULL;
// taosArrayClear(tagArray);
// SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
// for(int j = 0; j < pTagSchemaWrapper->nCols; j++){
// STagVal tagVal = {
// .cid = pTagSchemaWrapper->pSchema[j].colId,
// .type = pTagSchemaWrapper->pSchema[j].type,
// .i64 = (int64_t)pDataBlock->info.groupId,
// };
// taosArrayPush(tagArray, &tagVal);
// taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name);
// }
//
// tTagNew(tagArray, 1, false, &pTag);
// if (pTag == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// taosArrayDestroy(tagArray);
// taosArrayDestroy(tagName);
// return NULL;
// }
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SName name = {0}; SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
...@@ -99,6 +125,8 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -99,6 +125,8 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
createTbReq.type = TSDB_CHILD_TABLE; createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid; createTbReq.ctb.suid = suid;
createTbReq.ctb.pTag = (uint8_t*)pTag; createTbReq.ctb.pTag = (uint8_t*)pTag;
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
createTbReq.ctb.tagName = tagName;
int32_t code; int32_t code;
int32_t schemaLen; int32_t schemaLen;
...@@ -113,6 +141,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -113,6 +141,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
void* schemaStr = taosMemoryMalloc(schemaLen); void* schemaStr = taosMemoryMalloc(schemaLen);
if (schemaStr == NULL) { if (schemaStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq);
return NULL; return NULL;
} }
taosArrayPush(schemaReqs, &schemaStr); taosArrayPush(schemaReqs, &schemaStr);
...@@ -123,6 +152,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -123,6 +152,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
code = tEncodeSVCreateTbReq(&encoder, &createTbReq); code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
if (code < 0) { if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq);
return NULL; return NULL;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
...@@ -231,7 +261,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -231,7 +261,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, &deleteReq); pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
......
...@@ -1342,7 +1342,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi ...@@ -1342,7 +1342,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) { taosMemoryFree(pCond->colList); } void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) { taosMemoryFreeClear(pCond->colList); }
int32_t convertFillType(int32_t mode) { int32_t convertFillType(int32_t mode) {
int32_t type = TSDB_FILL_NONE; int32_t type = TSDB_FILL_NONE;
......
...@@ -876,6 +876,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -876,6 +876,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
&pInfo->dataReader, NULL); &pInfo->dataReader, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema; pTaskInfo->streamInfo.schema = mtInfo.schema;
......
...@@ -3350,6 +3350,10 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { ...@@ -3350,6 +3350,10 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
tDeleteSSchemaWrapper(pSchemaInfo->qsw); tDeleteSSchemaWrapper(pSchemaInfo->qsw);
} }
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
tDeleteSSchemaWrapper(pStreamInfo->schema);
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo) { static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
taosArrayClear(pTableListInfo->pGroupList); taosArrayClear(pTableListInfo->pGroupList);
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
...@@ -4043,6 +4047,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { ...@@ -4043,6 +4047,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
doDestroyTableList(&pTaskInfo->tableqinfoList); doDestroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot); destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo);
nodesDestroyNode((SNode*)pTaskInfo->pSubplan); nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
......
...@@ -1780,6 +1780,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -1780,6 +1780,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
qDebug("tmqsnap change get data uid:%ld", mtInfo.uid); qDebug("tmqsnap change get data uid:%ld", mtInfo.uid);
qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType); qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
} }
tDeleteSSchemaWrapper(mtInfo.schema);
qDebug("tmqsnap stream scan tsdb return null"); qDebug("tmqsnap stream scan tsdb return null");
return NULL; return NULL;
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
......
...@@ -1059,7 +1059,7 @@ end: ...@@ -1059,7 +1059,7 @@ end:
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagVals, i); STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
if (IS_VAR_DATA_TYPE(p->type)) { if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFree(p->pData); taosMemoryFreeClear(p->pData);
} }
} }
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
...@@ -2040,7 +2040,7 @@ end: ...@@ -2040,7 +2040,7 @@ end:
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i); STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
if (p->type == TSDB_DATA_TYPE_NCHAR) { if (p->type == TSDB_DATA_TYPE_NCHAR) {
taosMemoryFree(p->pData); taosMemoryFreeClear(p->pData);
} }
} }
taosArrayDestroy(pTagArray); taosArrayDestroy(pTagArray);
......
...@@ -6664,12 +6664,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS ...@@ -6664,12 +6664,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
break; break;
} }
} while (0); } while (0);
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFree(p->pData);
}
}
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -410,6 +410,12 @@ end: ...@@ -410,6 +410,12 @@ end:
if (retCode == TSDB_CODE_SUCCESS) { if (retCode == TSDB_CODE_SUCCESS) {
tTagNew(pTagVals, 1, true, ppTag); tTagNew(pTagVals, 1, true, ppTag);
} }
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFreeClear(p->pData);
}
}
cJSON_Delete(root); cJSON_Delete(root);
return retCode; return retCode;
} }
......
...@@ -632,6 +632,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD ...@@ -632,6 +632,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD
QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes)); QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));
taosMemoryFreeClear(req.msg);
QW_SCH_TASK_DLOG("processDelete end, node:%p", node); QW_SCH_TASK_DLOG("processDelete end, node:%p", node);
_return: _return:
......
...@@ -31,18 +31,20 @@ class TDTestCase: ...@@ -31,18 +31,20 @@ class TDTestCase:
while True: while True:
dst = queryFile.readline() dst = queryFile.readline()
src = consumeFile.readline() src = consumeFile.readline()
if src:
if dst:
if dst != src: if dst != src:
tdLog.exit("compare error: %s != %s"%src, dst) tdLog.exit("compare error: %s != %s"%(src, dst))
else: else:
break break
return return
def checkDropData(self): def checkDropData(self, drop):
tdSql.execute('use db_taosx') tdSql.execute('use db_taosx')
tdSql.query("show tables") tdSql.query("show tables")
tdSql.checkRows(2) if drop:
tdSql.checkRows(10)
else:
tdSql.checkRows(15)
tdSql.query("select * from jt order by i") tdSql.query("select * from jt order by i")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 1, 1) tdSql.checkData(0, 1, 1)
...@@ -50,15 +52,72 @@ class TDTestCase: ...@@ -50,15 +52,72 @@ class TDTestCase:
tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}') tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}')
tdSql.checkData(1, 2, None) tdSql.checkData(1, 2, None)
tdSql.query("select * from sttb order by ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 13)
tdSql.checkData(1, 1, 16)
tdSql.checkData(0, 2, 22)
tdSql.checkData(1, 2, 25)
tdSql.checkData(0, 5, "sttb3")
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 21)
tdSql.checkData(0, 2, 2)
tdSql.checkData(1, 2, 21)
tdSql.checkData(0, 5, "stt3")
tdSql.checkData(1, 5, "stt4")
tdSql.execute('use abc1') tdSql.execute('use abc1')
tdSql.query("show tables") tdSql.query("show tables")
tdSql.checkRows(2) if drop:
tdSql.checkRows(10)
else:
tdSql.checkRows(15)
tdSql.query("select * from jt order by i") tdSql.query("select * from jt order by i")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 1, 1) tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 11) tdSql.checkData(1, 1, 11)
tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}') tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}')
tdSql.checkData(1, 2, None) tdSql.checkData(1, 2, None)
tdSql.query("select * from sttb order by ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 13)
tdSql.checkData(1, 1, 16)
tdSql.checkData(0, 2, 22)
tdSql.checkData(1, 2, 25)
tdSql.checkData(0, 5, "sttb3")
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 21)
tdSql.checkData(0, 2, 2)
tdSql.checkData(1, 2, 21)
tdSql.checkData(0, 5, "stt3")
tdSql.checkData(1, 5, "stt4")
return
def checkDataTable(self):
tdSql.execute('use db_taosx')
tdSql.query("select * from meters_summary")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 120)
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, "San Francisco")
tdSql.execute('use abc1')
tdSql.query("select * from meters_summary")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 120)
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, "San Francisco")
return return
def checkData(self): def checkData(self):
...@@ -145,6 +204,19 @@ class TDTestCase: ...@@ -145,6 +204,19 @@ class TDTestCase:
self.checkJson(cfgPath, "tmq_taosx_tmp") self.checkJson(cfgPath, "tmq_taosx_tmp")
self.checkData() self.checkData()
self.checkDropData(False)
return
def checkWal1VgroupTable(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -t'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp")
self.checkDataTable()
return return
...@@ -155,6 +227,7 @@ class TDTestCase: ...@@ -155,6 +227,7 @@ class TDTestCase:
os.system(cmdStr) os.system(cmdStr)
self.checkData() self.checkData()
self.checkDropData(False)
return return
...@@ -164,7 +237,7 @@ class TDTestCase: ...@@ -164,7 +237,7 @@ class TDTestCase:
tdLog.info(cmdStr) tdLog.info(cmdStr)
os.system(cmdStr) os.system(cmdStr)
self.checkDropData() self.checkDropData(True)
return return
...@@ -177,6 +250,19 @@ class TDTestCase: ...@@ -177,6 +250,19 @@ class TDTestCase:
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot") self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
self.checkData() self.checkData()
self.checkDropData(False)
return
def checkSnapshot1VgroupTable(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s -t'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
self.checkDataTable()
return return
...@@ -187,6 +273,7 @@ class TDTestCase: ...@@ -187,6 +273,7 @@ class TDTestCase:
os.system(cmdStr) os.system(cmdStr)
self.checkData() self.checkData()
self.checkDropData(False)
return return
...@@ -196,7 +283,7 @@ class TDTestCase: ...@@ -196,7 +283,7 @@ class TDTestCase:
tdLog.info(cmdStr) tdLog.info(cmdStr)
os.system(cmdStr) os.system(cmdStr)
self.checkDropData() self.checkDropData(True)
return return
...@@ -205,6 +292,9 @@ class TDTestCase: ...@@ -205,6 +292,9 @@ class TDTestCase:
self.checkWal1Vgroup() self.checkWal1Vgroup()
self.checkSnapshot1Vgroup() self.checkSnapshot1Vgroup()
self.checkWal1VgroupTable()
self.checkSnapshot1VgroupTable()
self.checkWalMultiVgroups() self.checkWalMultiVgroups()
self.checkSnapshotMultiVgroups() self.checkSnapshotMultiVgroups()
......
...@@ -653,23 +653,23 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn ...@@ -653,23 +653,23 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t code = tmq_get_raw(msg, &raw); int32_t code = tmq_get_raw(msg, &raw);
if(code == TSDB_CODE_SUCCESS){ if(code == TSDB_CODE_SUCCESS){
int retCode = queryDB(pInfo->taos, "use metadb"); // int retCode = queryDB(pInfo->taos, "use metadb");
if (retCode != 0) { // if (retCode != 0) {
taosFprintfFile(g_fp, "error when use metadb\n"); // taosFprintfFile(g_fp, "error when use metadb\n");
taosCloseFile(&g_fp); // taosCloseFile(&g_fp);
exit(-1); // exit(-1);
} // }
taosFprintfFile(g_fp, "raw:%p\n", &raw); // taosFprintfFile(g_fp, "raw:%p\n", &raw);
//
tmq_write_raw(pInfo->taos, raw); // tmq_write_raw(pInfo->taos, raw);
} }
char* result = tmq_get_json_meta(msg); char* result = tmq_get_json_meta(msg);
if(result){ if(result && strcmp(result, "") != 0){
//printf("meta result: %s\n", result); //printf("meta result: %s\n", result);
taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result); taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result);
taosMemoryFree(result);
} }
tmq_free_json_meta(result);
} }
totalRows++; totalRows++;
...@@ -818,8 +818,12 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -818,8 +818,12 @@ void loop_consume(SThreadInfo* pInfo) {
tmq_res_t msgType = tmq_get_res_type(tmqMsg); tmq_res_t msgType = tmq_get_res_type(tmqMsg);
if (msgType == TMQ_RES_TABLE_META) { if (msgType == TMQ_RES_TABLE_META) {
totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs); totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs);
} else if (msgType == TMQ_RES_DATA) } else if (msgType == TMQ_RES_DATA){
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs); totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
} else if (msgType == TMQ_RES_METADATA){
meta_msg_process(tmqMsg, pInfo, totalMsgs);
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
}
} }
taos_free_result(tmqMsg); taos_free_result(tmqMsg);
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册