提交 dfc5ce23 编写于 作者: wmmhello's avatar wmmhello

fix:error in auto create table for taosX

上级 13059c40
...@@ -219,8 +219,6 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { ...@@ -219,8 +219,6 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
goto _err; goto _err;
} }
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
tDecoderClear(&coder);
return string;
_err: _err:
tDecoderClear(&coder); tDecoderClear(&coder);
...@@ -397,6 +395,10 @@ static char* processAutoCreateTable(STaosxRsp* rsp) { ...@@ -397,6 +395,10 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
_exit: _exit:
for(int i = 0; i < rsp->createTableNum; i++){ for(int i = 0; i < rsp->createTableNum; i++){
tDecoderClear(&decoder[i]); tDecoderClear(&decoder[i]);
taosMemoryFreeClear(pCreateReq[i].comment);
if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq[i].ctb.tagName);
}
} }
taosMemoryFree(decoder); taosMemoryFree(decoder);
taosMemoryFree(pCreateReq); taosMemoryFree(pCreateReq);
...@@ -512,6 +514,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { ...@@ -512,6 +514,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
_exit: _exit:
cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
} }
...@@ -543,6 +546,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { ...@@ -543,6 +546,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
_exit: _exit:
cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
} }
...@@ -583,6 +587,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { ...@@ -583,6 +587,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
_exit: _exit:
cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
} }
...@@ -688,6 +693,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -688,6 +693,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
pRequest->syncQuery = true;
if (!pRequest->pDb) { if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end; goto end;
...@@ -920,7 +926,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -920,7 +926,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
pRequest->syncQuery = true;
if (!pRequest->pDb) { if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end; goto end;
...@@ -1093,6 +1099,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -1093,6 +1099,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
pRequest->syncQuery = true;
if (!pRequest->pDb) { if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end; goto end;
...@@ -1212,6 +1219,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1212,6 +1219,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
goto end; goto end;
} }
pRequest->syncQuery = true;
if (!pRequest->pDb) { if (!pRequest->pDb) {
uError("WriteRaw:not use db"); uError("WriteRaw:not use db");
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
...@@ -1399,6 +1407,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1399,6 +1407,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
return terrno; return terrno;
} }
pRequest->syncQuery = true;
rspObj.resIter = -1; rspObj.resIter = -1;
rspObj.resType = RES_TYPE__TMQ; rspObj.resType = RES_TYPE__TMQ;
...@@ -1664,6 +1673,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1664,6 +1673,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
return terrno; return terrno;
} }
pRequest->syncQuery = true;
rspObj.resIter = -1; rspObj.resIter = -1;
rspObj.resType = RES_TYPE__TMQ_METADATA; rspObj.resType = RES_TYPE__TMQ_METADATA;
...@@ -1739,23 +1749,29 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1739,23 +1749,29 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j); int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
SDecoder decoderTmp = {0}; SDecoder decoderTmp = {0};
SVCreateTbReq* pCreateReq; SVCreateTbReq pCreateReq = {0};
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
if (tDecodeSVCreateTbReq(&decoderTmp, pCreateReq) < 0) { if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
taosMemoryFreeClear(pCreateReq.comment);
taosArrayDestroy(pCreateReq.ctb.tagName);
goto end; goto end;
} }
ASSERT (pCreateReq->type == TSDB_CHILD_TABLE); ASSERT (pCreateReq.type == TSDB_CHILD_TABLE);
if(strcmp(tbName, pCreateReq->name) == 0){ if(strcmp(tbName, pCreateReq.name) == 0){
schemaLen = *lenTmp; schemaLen = *lenTmp;
schemaData = *dataTmp; schemaData = *dataTmp;
strcpy(pName.tname, pCreateReq->ctb.name); strcpy(pName.tname, pCreateReq.ctb.name);
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
taosMemoryFreeClear(pCreateReq.comment);
taosArrayDestroy(pCreateReq.ctb.tagName);
break; break;
} }
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
taosMemoryFreeClear(pCreateReq.comment);
taosArrayDestroy(pCreateReq.ctb.tagName);
} }
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
...@@ -1884,6 +1900,8 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1884,6 +1900,8 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen; subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen;
subReq->numOfBlocks++; subReq->numOfBlocks++;
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
rspObj.resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&rspObj.resInfo);
} }
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
......
...@@ -1816,7 +1816,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { ...@@ -1816,7 +1816,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return pMetaRspObj->vgId; return pMetaRspObj->vgId;
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
return pRspObj->vgId; return pRspObj->vgId;
} else { } else {
...@@ -1832,6 +1832,13 @@ const char* tmq_get_table_name(TAOS_RES* res) { ...@@ -1832,6 +1832,13 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL; return NULL;
} }
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
} }
return NULL; return NULL;
} }
......
...@@ -6007,12 +6007,18 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { ...@@ -6007,12 +6007,18 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
void tDeleteSTaosxRsp(STaosxRsp *pRsp) { void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen); taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
pRsp->blockTbName = NULL;
taosArrayDestroy(pRsp->createTableLen); taosArrayDestroy(pRsp->createTableLen);
pRsp->createTableLen = NULL;
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree);
pRsp->createTableReq = NULL;
} }
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
......
...@@ -539,7 +539,7 @@ typedef struct { ...@@ -539,7 +539,7 @@ typedef struct {
} SMqConsumerEp; } SMqConsumerEp;
SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp); SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
void tDeleteSMqConsumerEp(SMqConsumerEp* pEp); void tDeleteSMqConsumerEp(void* pEp);
int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp); int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp); void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
......
...@@ -197,11 +197,12 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -197,11 +197,12 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
pLostMsg->consumerId = pConsumer->consumerId; pLostMsg->consumerId = pConsumer->consumerId;
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); SRpcMsg pRpcMsg = {
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_LOST; .msgType = TDMT_MND_MQ_CONSUMER_LOST,
pRpcMsg->pCont = pLostMsg; .pCont = pLostMsg,
pRpcMsg->contLen = sizeof(SMqConsumerLostMsg); .contLen = sizeof(SMqConsumerLostMsg),
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg); };
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
} }
if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) { if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) {
// do nothing // do nothing
...@@ -280,11 +281,12 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { ...@@ -280,11 +281,12 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId; pRecoverMsg->consumerId = consumerId;
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); SRpcMsg pRpcMsg = {
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER; .msgType = TDMT_MND_MQ_CONSUMER_RECOVER,
pRpcMsg->pCont = pRecoverMsg; .pCont = pRecoverMsg,
pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg); .contLen = sizeof(SMqConsumerRecoverMsg),
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg); };
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
} }
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
...@@ -318,11 +320,12 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -318,11 +320,12 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId; pRecoverMsg->consumerId = consumerId;
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); SRpcMsg pRpcMsg = {
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER; .msgType = TDMT_MND_MQ_CONSUMER_RECOVER,
pRpcMsg->pCont = pRecoverMsg; .pCont = pRecoverMsg,
pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg); .contLen = sizeof(SMqConsumerRecoverMsg),
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg); };
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
} }
#endif #endif
......
...@@ -343,8 +343,8 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { ...@@ -343,8 +343,8 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
return pConsumerEpNew; return pConsumerEpNew;
} }
void tDeleteSMqConsumerEp(SMqConsumerEp *pConsumerEp) { void tDeleteSMqConsumerEp(void *data) {
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp*)data;
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
} }
......
...@@ -319,8 +319,12 @@ _query: ...@@ -319,8 +319,12 @@ _query:
pSchema = tCloneSSchemaWrapper(&meNew.stbEntry.schemaRow); pSchema = tCloneSSchemaWrapper(&meNew.stbEntry.schemaRow);
tDecoderClear(&dcNew); tDecoderClear(&dcNew);
tdbTbcClose(pCur); tdbTbcClose(pCur);
tdbFree(pKey);
tdbFree(pVal);
goto _exit; goto _exit;
} }
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur); tdbTbcClose(pCur);
} }
} else if (me.type == TSDB_CHILD_TABLE) { } else if (me.type == TSDB_CHILD_TABLE) {
...@@ -347,11 +351,13 @@ _query: ...@@ -347,11 +351,13 @@ _query:
tDecoderClear(&dc); tDecoderClear(&dc);
_exit: _exit:
tDecoderClear(&dc);
metaULock(pMeta); metaULock(pMeta);
tdbFree(pData); tdbFree(pData);
return pSchema; return pSchema;
_err: _err:
tDecoderClear(&dc);
metaULock(pMeta); metaULock(pMeta);
tdbFree(pData); tdbFree(pData);
return NULL; return NULL;
...@@ -382,11 +388,9 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) { ...@@ -382,11 +388,9 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
} }
ttlKey = *(STtlIdxKey *)pKey; ttlKey = *(STtlIdxKey *)pKey;
taosArrayPush(uidList, &ttlKey.uid); taosArrayPush(uidList, &ttlKey.uid);
tdbFree(pKey);
} }
tdbTbcClose(pCur); tdbTbcClose(pCur);
tdbFree(pKey);
return 0; return 0;
} }
......
...@@ -353,6 +353,8 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t ...@@ -353,6 +353,8 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index); metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index);
} }
tdbFree(pKey);
tdbFree(pVal);
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
...@@ -528,6 +530,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in ...@@ -528,6 +530,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
} }
} }
} }
taosArrayDestroy(pTagVals);
} }
// SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t)); // SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t));
// if(sidInfo->version >= idInfo->version){ // if(sidInfo->version >= idInfo->version){
......
...@@ -1191,10 +1191,11 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -1191,10 +1191,11 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
const void *pTagData = NULL; // const void *pTagData = NULL; //
int32_t nTagData = 0; int32_t nTagData = 0;
SDecoder dc = {0}; SDecoder dc = {0};
int32_t ret = 0;
// get super table // get super table
if (tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0) { if (tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0) {
return -1; ret = -1;
goto end;
} }
tbDbKey.uid = pCtbEntry->ctbEntry.suid; tbDbKey.uid = pCtbEntry->ctbEntry.suid;
tbDbKey.version = ((SUidIdxVal *)pData)[0].version; tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
...@@ -1220,17 +1221,20 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -1220,17 +1221,20 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
// nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len; // nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
pTagData = pCtbEntry->ctbEntry.pTags; pTagData = pCtbEntry->ctbEntry.pTags;
nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len; nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
return metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn); ret = metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn);
goto end;
} }
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) { pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
return -1; ret = -1;
goto end;
} }
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn); tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
end:
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
tDecoderClear(&dc); tDecoderClear(&dc);
tdbFree(pData); tdbFree(pData);
return 0; return ret;
} }
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
......
...@@ -51,6 +51,20 @@ void tqCleanUp() { ...@@ -51,6 +51,20 @@ void tqCleanUp() {
} }
} }
static void destroySTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task);
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
tqCloseReader(pData->execHandle.pExecReader);
walCloseReader(pData->pWalReader);
taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
walCloseReader(pData->pWalReader);
tqCloseReader(pData->execHandle.pExecReader);
}
}
STQ* tqOpen(const char* path, SVnode* pVnode) { STQ* tqOpen(const char* path, SVnode* pVnode) {
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ)); STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
...@@ -62,6 +76,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -62,6 +76,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
...@@ -520,6 +536,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -520,6 +536,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
tDeleteSTaosxRsp(&taosxRsp);
return -1; return -1;
} }
...@@ -580,14 +597,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -580,14 +597,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1; code = -1;
taosMemoryFree(pCkHead); taosMemoryFree(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return code; return code;
} }
code = 0; code = 0;
if (pCkHead) taosMemoryFree(pCkHead); if (pCkHead) taosMemoryFree(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return code; return code;
} }
} }
} }
tDeleteSTaosxRsp(&taosxRsp);
return 0; return 0;
} }
......
...@@ -145,8 +145,10 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { ...@@ -145,8 +145,10 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
ASSERT(0); ASSERT(0);
tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen); tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
taosHashCancelIterate(pStore->pHash, pIter); taosHashCancelIterate(pStore->pHash, pIter);
taosMemoryFree(buf);
return -1; return -1;
} }
taosMemoryFree(buf);
} }
// close and rename file // close and rename file
taosCloseFile(&pFile); taosCloseFile(&pFile);
......
...@@ -3645,6 +3645,11 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { ...@@ -3645,6 +3645,11 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
tDeleteSSchemaWrapper(pSchemaInfo->qsw); tDeleteSSchemaWrapper(pSchemaInfo->qsw);
} }
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
tDeleteSSchemaWrapper(pStreamInfo->schema);
cleanupQueryTableDataCond(&pStreamInfo->tableCond);
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo) { static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
taosArrayClear(pTableListInfo->pGroupList); taosArrayClear(pTableListInfo->pGroupList);
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
...@@ -4338,6 +4343,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { ...@@ -4338,6 +4343,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
doDestroyTableList(&pTaskInfo->tableqinfoList); doDestroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot); destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo);
nodesDestroyNode((SNode*)pTaskInfo->pSubplan); nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
......
...@@ -1769,6 +1769,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -1769,6 +1769,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
qDebug("tmqsnap change get data uid:%ld", mtInfo.uid); qDebug("tmqsnap change get data uid:%ld", mtInfo.uid);
qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType); qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
} }
tDeleteSSchemaWrapper(mtInfo.schema);
qDebug("tmqsnap stream scan tsdb return null"); qDebug("tmqsnap stream scan tsdb return null");
return NULL; return NULL;
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
......
...@@ -632,6 +632,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD ...@@ -632,6 +632,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD
QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes)); QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));
taosMemoryFreeClear(req.msg);
QW_SCH_TASK_DLOG("processDelete end, node:%p", node); QW_SCH_TASK_DLOG("processDelete end, node:%p", node);
_return: _return:
......
...@@ -68,6 +68,7 @@ static void msg_process(TAOS_RES* msg) { ...@@ -68,6 +68,7 @@ static void msg_process(TAOS_RES* msg) {
tmq_raw_data raw = {0}; tmq_raw_data raw = {0};
tmq_get_raw(msg, &raw); tmq_get_raw(msg, &raw);
printf("write raw data type: %d\n", raw.raw_type);
int32_t ret = tmq_write_raw(pConn, raw); int32_t ret = tmq_write_raw(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret)); printf("write raw data: %s\n", tmq_err2str(ret));
tmq_free_raw(raw); tmq_free_raw(raw);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册