提交 414021a8 编写于 作者: wmmhello's avatar wmmhello

fix:error in auto create table for taosX

上级 0afef12c
...@@ -2954,7 +2954,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE ...@@ -2954,7 +2954,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
} }
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
// taosMemoryFree(pSubTopicEp->schema.pSchema); taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
taosArrayDestroy(pSubTopicEp->vgs); taosArrayDestroy(pSubTopicEp->vgs);
} }
......
...@@ -362,12 +362,19 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { ...@@ -362,12 +362,19 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
} }
_exit: _exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
}
}
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
} }
static char* processAutoCreateTable(STaosxRsp* rsp) { static char* processAutoCreateTable(STaosxRsp* rsp) {
if(rsp->createTableNum == 0) return NULL; if(rsp->createTableNum == 0) return strdup("");
SDecoder* decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder)); SDecoder* decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
SVCreateTbReq* pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq)); SVCreateTbReq* pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
...@@ -873,6 +880,14 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -873,6 +880,14 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code; code = pRequest->code;
end: end:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
}
}
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
...@@ -1621,6 +1636,9 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1621,6 +1636,9 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
code = pRequest->code; code = pRequest->code;
end: end:
tDeleteSMqDataRsp(&rspObj.rsp);
rspObj.resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&rspObj.resInfo);
tDecoderClear(&decoder); tDecoderClear(&decoder);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
destroyRequest(pRequest); destroyRequest(pRequest);
...@@ -1629,7 +1647,7 @@ end: ...@@ -1629,7 +1647,7 @@ end:
return code; return code;
} }
static int32_t tmqWriteRaqMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pVgHash = NULL; SHashObj* pVgHash = NULL;
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
...@@ -1711,6 +1729,33 @@ static int32_t tmqWriteRaqMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1711,6 +1729,33 @@ static int32_t tmqWriteRaqMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end; goto end;
} }
// find schema data info
int32_t schemaLen = 0;
void* schemaData = NULL;
for(int j = 0; j < rspObj.rsp.createTableNum; j++){
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
SDecoder decoderTmp = {0};
SVCreateTbReq* pCreateReq;
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
if (tDecodeSVCreateTbReq(&decoderTmp, pCreateReq) < 0) {
tDecoderClear(&decoderTmp);
goto end;
}
ASSERT (pCreateReq->type == TSDB_CHILD_TABLE);
if(strcmp(tbName, pCreateReq->name) == 0){
schemaLen = *lenTmp;
schemaData = *dataTmp;
strcpy(pName.tname, pCreateReq->ctb.name);
tDecoderClear(&decoderTmp);
break;
}
tDecoderClear(&decoderTmp);
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
...@@ -1738,32 +1783,6 @@ static int32_t tmqWriteRaqMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1738,32 +1783,6 @@ static int32_t tmqWriteRaqMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1); (int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1);
// find schema data info
int32_t schemaLen = 0;
void* schemaData = NULL;
for(int j = 0; j < rspObj.rsp.createTableNum; j++){
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
SDecoder decoderTmp = {0};
SVCreateTbReq* pCreateReq;
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
if (tDecodeSVCreateTbReq(&decoderTmp, pCreateReq) < 0) {
tDecoderClear(&decoderTmp);
goto end;
}
ASSERT (pCreateReq->type == TSDB_CHILD_TABLE);
if(strcmp(tbName, pCreateReq->name) == 0){
schemaLen = *lenTmp;
schemaData = *dataTmp;
tDecoderClear(&decoderTmp);
break;
}
tDecoderClear(&decoderTmp);
}
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
SSubmitReq* subReq = NULL; SSubmitReq* subReq = NULL;
...@@ -1912,6 +1931,9 @@ static int32_t tmqWriteRaqMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1912,6 +1931,9 @@ static int32_t tmqWriteRaqMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
code = pRequest->code; code = pRequest->code;
end: end:
tDeleteSTaosxRsp(&rspObj.rsp);
rspObj.resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&rspObj.resInfo);
tDecoderClear(&decoder); tDecoderClear(&decoder);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
destroyRequest(pRequest); destroyRequest(pRequest);
...@@ -2003,7 +2025,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { ...@@ -2003,7 +2025,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
} }
void tmq_free_raw(tmq_raw_data raw) { void tmq_free_raw(tmq_raw_data raw) {
if (raw.raw_type == RES_TYPE__TMQ) { if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
taosMemoryFree(raw.raw); taosMemoryFree(raw.raw);
} }
} }
...@@ -2030,7 +2052,7 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { ...@@ -2030,7 +2052,7 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
} else if (raw.raw_type == RES_TYPE__TMQ) { } else if (raw.raw_type == RES_TYPE__TMQ) {
return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len); return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len);
} else if (raw.raw_type == RES_TYPE__TMQ_METADATA) { } else if (raw.raw_type == RES_TYPE__TMQ_METADATA) {
return tmqWriteRaqMetaDataImpl(taos, raw.raw, raw.raw_len); return tmqWriteRawMetaDataImpl(taos, raw.raw, raw.raw_len);
} }
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
...@@ -184,7 +184,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); ...@@ -184,7 +184,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid,
const char* stbFullName, SBatchDeleteReq* pDeleteReq); const char* stbFullName, SBatchDeleteReq* pDeleteReq);
// sma // sma
......
...@@ -204,7 +204,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char ...@@ -204,7 +204,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
} }
SBatchDeleteReq deleteReq; SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq); pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
if (!pSubmitReq) { if (!pSubmitReq) {
......
...@@ -243,14 +243,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -243,14 +243,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
} }
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
SSubmitBlk* pBlk = pReader->pBlock; SSubmitBlk* pBlk = pReader->pBlock;
if (pBlk->schemaLen > 0) { int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
} }
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen); void* createReq = taosMemoryCalloc(1, schemaLen);
memcpy(createReq, pBlk->data, pBlk->schemaLen); memcpy(createReq, pBlk->data, schemaLen);
taosArrayPush(pRsp->createTableLen, &pBlk->schemaLen); taosArrayPush(pRsp->createTableLen, &schemaLen);
taosArrayPush(pRsp->createTableReq, &createReq); taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++; pRsp->createTableNum++;
} }
...@@ -277,14 +278,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -277,14 +278,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
} }
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
SSubmitBlk* pBlk = pReader->pBlock; SSubmitBlk* pBlk = pReader->pBlock;
if (pBlk->schemaLen > 0) { int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
} }
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen); void* createReq = taosMemoryCalloc(1, schemaLen);
memcpy(createReq, pBlk->data, pBlk->schemaLen); memcpy(createReq, pBlk->data, schemaLen);
taosArrayPush(pRsp->createTableLen, &pBlk->schemaLen); taosArrayPush(pRsp->createTableLen, &schemaLen);
taosArrayPush(pRsp->createTableReq, &createReq); taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++; pRsp->createTableNum++;
} }
......
...@@ -48,7 +48,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -48,7 +48,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0; return 0;
} }
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb, SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb,
int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) { int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL; SArray* schemaReqs = NULL;
...@@ -89,6 +89,30 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -89,6 +89,30 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
return NULL; return NULL;
} }
SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
taosArrayPush(tagName, "group_id");
// STag* pTag = NULL;
// taosArrayClear(tagArray);
// SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
// for(int j = 0; j < pTagSchemaWrapper->nCols; j++){
// STagVal tagVal = {
// .cid = pTagSchemaWrapper->pSchema[j].colId,
// .type = pTagSchemaWrapper->pSchema[j].type,
// .i64 = (int64_t)pDataBlock->info.groupId,
// };
// taosArrayPush(tagArray, &tagVal);
// taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name);
// }
//
// tTagNew(tagArray, 1, false, &pTag);
// if (pTag == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// taosArrayDestroy(tagArray);
// taosArrayDestroy(tagName);
// return NULL;
// }
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SName name = {0}; SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
...@@ -99,6 +123,8 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -99,6 +123,8 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
createTbReq.type = TSDB_CHILD_TABLE; createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid; createTbReq.ctb.suid = suid;
createTbReq.ctb.pTag = (uint8_t*)pTag; createTbReq.ctb.pTag = (uint8_t*)pTag;
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
createTbReq.ctb.tagName = tagName;
int32_t code; int32_t code;
int32_t schemaLen; int32_t schemaLen;
...@@ -113,6 +139,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -113,6 +139,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
void* schemaStr = taosMemoryMalloc(schemaLen); void* schemaStr = taosMemoryMalloc(schemaLen);
if (schemaStr == NULL) { if (schemaStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq);
return NULL; return NULL;
} }
taosArrayPush(schemaReqs, &schemaStr); taosArrayPush(schemaReqs, &schemaStr);
...@@ -123,6 +150,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -123,6 +150,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
code = tEncodeSVCreateTbReq(&encoder, &createTbReq); code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
if (code < 0) { if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq);
return NULL; return NULL;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
...@@ -231,7 +259,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -231,7 +259,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, &deleteReq); pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
......
...@@ -1059,7 +1059,7 @@ end: ...@@ -1059,7 +1059,7 @@ end:
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagVals, i); STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
if (IS_VAR_DATA_TYPE(p->type)) { if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFree(p->pData); taosMemoryFreeClear(p->pData);
} }
} }
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
...@@ -2039,7 +2039,7 @@ end: ...@@ -2039,7 +2039,7 @@ end:
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i); STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
if (p->type == TSDB_DATA_TYPE_NCHAR) { if (p->type == TSDB_DATA_TYPE_NCHAR) {
taosMemoryFree(p->pData); taosMemoryFreeClear(p->pData);
} }
} }
taosArrayDestroy(pTagArray); taosArrayDestroy(pTagArray);
......
...@@ -6633,12 +6633,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS ...@@ -6633,12 +6633,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
break; break;
} }
} while (0); } while (0);
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFree(p->pData);
}
}
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -410,6 +410,12 @@ end: ...@@ -410,6 +410,12 @@ end:
if (retCode == TSDB_CODE_SUCCESS) { if (retCode == TSDB_CODE_SUCCESS) {
tTagNew(pTagVals, 1, true, ppTag); tTagNew(pTagVals, 1, true, ppTag);
} }
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFreeClear(p->pData);
}
}
cJSON_Delete(root); cJSON_Delete(root);
return retCode; return retCode;
} }
......
...@@ -1153,7 +1153,8 @@ int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu ...@@ -1153,7 +1153,8 @@ int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
memcpy(tmp, varDataVal(input), varDataLen(input)); memcpy(tmp, varDataVal(input), varDataLen(input));
tmp[varDataLen(input)] = 0; tmp[varDataLen(input)] = 0;
if(parseJsontoTagData(tmp, pTagVals, &pTag, NULL)){ if(parseJsontoTagData(tmp, pTagVals, &pTag, NULL)){
tTagNew(pTagVals, 1, true, &pTag); taosArrayDestroy(pTagVals);
return TSDB_CODE_FAILED;
} }
} }
......
...@@ -38,10 +38,13 @@ class TDTestCase: ...@@ -38,10 +38,13 @@ class TDTestCase:
break break
return return
def checkDropData(self): def checkDropData(self, drop):
tdSql.execute('use db_taosx') tdSql.execute('use db_taosx')
tdSql.query("show tables") tdSql.query("show tables")
tdSql.checkRows(6) if drop:
tdSql.checkRows(10)
else:
tdSql.checkRows(15)
tdSql.query("select * from jt order by i") tdSql.query("select * from jt order by i")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 1, 1) tdSql.checkData(0, 1, 1)
...@@ -49,15 +52,72 @@ class TDTestCase: ...@@ -49,15 +52,72 @@ class TDTestCase:
tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}') tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}')
tdSql.checkData(1, 2, None) tdSql.checkData(1, 2, None)
tdSql.query("select * from sttb order by ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 13)
tdSql.checkData(1, 1, 16)
tdSql.checkData(0, 2, 22)
tdSql.checkData(1, 2, 25)
tdSql.checkData(0, 5, "sttb3")
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 21)
tdSql.checkData(0, 2, 2)
tdSql.checkData(1, 2, 21)
tdSql.checkData(0, 5, "stt3")
tdSql.checkData(1, 5, "stt4")
tdSql.execute('use abc1') tdSql.execute('use abc1')
tdSql.query("show tables") tdSql.query("show tables")
tdSql.checkRows(6) if drop:
tdSql.checkRows(10)
else:
tdSql.checkRows(15)
tdSql.query("select * from jt order by i") tdSql.query("select * from jt order by i")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 1, 1) tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 11) tdSql.checkData(1, 1, 11)
tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}') tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}')
tdSql.checkData(1, 2, None) tdSql.checkData(1, 2, None)
tdSql.query("select * from sttb order by ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 13)
tdSql.checkData(1, 1, 16)
tdSql.checkData(0, 2, 22)
tdSql.checkData(1, 2, 25)
tdSql.checkData(0, 5, "sttb3")
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 21)
tdSql.checkData(0, 2, 2)
tdSql.checkData(1, 2, 21)
tdSql.checkData(0, 5, "stt3")
tdSql.checkData(1, 5, "stt4")
return
def checkDataTable(self):
tdSql.execute('use db_taosx')
tdSql.query("select * from meters_summary")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 120)
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, "San Francisco")
tdSql.execute('use abc1')
tdSql.query("select * from meters_summary")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 120)
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, "San Francisco")
return return
def checkData(self): def checkData(self):
...@@ -144,6 +204,19 @@ class TDTestCase: ...@@ -144,6 +204,19 @@ class TDTestCase:
self.checkJson(cfgPath, "tmq_taosx_tmp") self.checkJson(cfgPath, "tmq_taosx_tmp")
self.checkData() self.checkData()
self.checkDropData(False)
return
def checkWal1VgroupTable(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -t'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp")
self.checkDataTable()
return return
...@@ -154,6 +227,7 @@ class TDTestCase: ...@@ -154,6 +227,7 @@ class TDTestCase:
os.system(cmdStr) os.system(cmdStr)
self.checkData() self.checkData()
self.checkDropData(False)
return return
...@@ -163,7 +237,7 @@ class TDTestCase: ...@@ -163,7 +237,7 @@ class TDTestCase:
tdLog.info(cmdStr) tdLog.info(cmdStr)
os.system(cmdStr) os.system(cmdStr)
self.checkDropData() self.checkDropData(True)
return return
...@@ -176,6 +250,19 @@ class TDTestCase: ...@@ -176,6 +250,19 @@ class TDTestCase:
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot") self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
self.checkData() self.checkData()
self.checkDropData(False)
return
def checkSnapshot1VgroupTable(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s -t'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
self.checkDataTable()
return return
...@@ -186,6 +273,7 @@ class TDTestCase: ...@@ -186,6 +273,7 @@ class TDTestCase:
os.system(cmdStr) os.system(cmdStr)
self.checkData() self.checkData()
self.checkDropData(False)
return return
...@@ -195,7 +283,7 @@ class TDTestCase: ...@@ -195,7 +283,7 @@ class TDTestCase:
tdLog.info(cmdStr) tdLog.info(cmdStr)
os.system(cmdStr) os.system(cmdStr)
self.checkDropData() self.checkDropData(True)
return return
...@@ -204,6 +292,9 @@ class TDTestCase: ...@@ -204,6 +292,9 @@ class TDTestCase:
self.checkWal1Vgroup() self.checkWal1Vgroup()
self.checkSnapshot1Vgroup() self.checkSnapshot1Vgroup()
self.checkWal1VgroupTable()
self.checkSnapshot1VgroupTable()
self.checkWalMultiVgroups() self.checkWalMultiVgroups()
self.checkSnapshotMultiVgroups() self.checkSnapshotMultiVgroups()
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册