提交 dffe02e3 编写于 作者: wmmhello's avatar wmmhello

fix:modify delete msg type to tmq meta

上级 51a6aa51
...@@ -179,7 +179,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { ...@@ -179,7 +179,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
} }
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
end: end:
cJSON_Delete(json); cJSON_Delete(json);
tFreeSMAltertbReq(&req); tFreeSMAltertbReq(&req);
return string; return string;
...@@ -200,7 +200,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { ...@@ -200,7 +200,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
} }
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
_err: _err:
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return string;
} }
...@@ -220,7 +220,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { ...@@ -220,7 +220,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
} }
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
_err: _err:
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return string;
} }
...@@ -302,7 +302,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { ...@@ -302,7 +302,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
cJSON_AddItemToArray(tags, tag); cJSON_AddItemToArray(tags, tag);
} }
end: end:
cJSON_AddItemToObject(json, "tags", tags); cJSON_AddItemToObject(json, "tags", tags);
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
} }
...@@ -360,7 +360,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { ...@@ -360,7 +360,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
} }
} }
_exit: _exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment); taosMemoryFreeClear(pCreateReq->comment);
...@@ -393,7 +393,7 @@ static char* processAutoCreateTable(STaosxRsp* rsp) { ...@@ -393,7 +393,7 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
} }
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); string = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
_exit: _exit:
for (int i = 0; i < rsp->createTableNum; i++) { for (int i = 0; i < rsp->createTableNum; i++) {
tDecoderClear(&decoder[i]); tDecoderClear(&decoder[i]);
taosMemoryFreeClear(pCreateReq[i].comment); taosMemoryFreeClear(pCreateReq[i].comment);
...@@ -515,7 +515,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { ...@@ -515,7 +515,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
} }
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
...@@ -548,7 +548,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { ...@@ -548,7 +548,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
...@@ -590,7 +590,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { ...@@ -590,7 +590,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
...@@ -678,7 +678,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -678,7 +678,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code; code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
end: end:
destroyRequest(pRequest); destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq); tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder); tDecoderClear(&coder);
...@@ -748,7 +748,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -748,7 +748,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code; code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
end: end:
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
return code; return code;
...@@ -809,9 +809,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -809,9 +809,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table // loop to create table
...@@ -891,7 +891,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -891,7 +891,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code; code = pRequest->code;
end: end:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment); taosMemoryFreeClear(pCreateReq->comment);
...@@ -961,9 +961,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -961,9 +961,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table // loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
...@@ -1015,7 +1015,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -1015,7 +1015,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
} }
code = pRequest->code; code = pRequest->code;
end: end:
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
...@@ -1073,7 +1073,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -1073,7 +1073,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
char sql[256] = {0}; char sql[256] = {0};
snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
req.tsColName, req.skey, req.tsColName, req.ekey); req.tsColName, req.skey, req.tsColName, req.ekey);
printf("delete sql:%s\n", sql); uDebug("delete sql:%s\n", sql);
TAOS_RES* res = taos_query(taos, sql); TAOS_RES* res = taos_query(taos, sql);
SRequestObj* pRequest = (SRequestObj*)res; SRequestObj* pRequest = (SRequestObj*)res;
...@@ -1083,7 +1083,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -1083,7 +1083,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
} }
taos_free_result(res); taos_free_result(res);
end: end:
tDecoderClear(&coder); tDecoderClear(&coder);
return code; return code;
} }
...@@ -1130,9 +1130,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -1130,9 +1130,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
} }
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
SVgroupInfo pInfo = {0}; SVgroupInfo pInfo = {0};
SName pName = {0}; SName pName = {0};
...@@ -1191,7 +1191,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -1191,7 +1191,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
code = handleAlterTbExecRes(pRes->res, pCatalog); code = handleAlterTbExecRes(pRes->res, pCatalog);
} }
} }
end: end:
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
if (pVgData) taosMemoryFreeClear(pVgData->pData); if (pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData); taosMemoryFreeClear(pVgData);
...@@ -1267,14 +1267,14 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch ...@@ -1267,14 +1267,14 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
uint16_t fLen = 0; uint16_t fLen = 0;
int32_t rowSize = 0; int32_t rowSize = 0;
int16_t nVar = 0; int16_t nVar = 0;
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
SSchema* schema = pTableMeta->schema + i; SSchema* schema = pTableMeta->schema + i;
fLen += TYPE_BYTES[schema->type]; fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes; rowSize += schema->bytes;
if (IS_VAR_DATA_TYPE(schema->type)) { if (IS_VAR_DATA_TYPE(schema->type)) {
nVar++; nVar++;
}
} }
}
fLen -= sizeof(TSKEY); fLen -= sizeof(TSKEY);
...@@ -1597,7 +1597,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1597,7 +1597,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
taosMemoryFree(subReq); taosMemoryFree(subReq);
...@@ -1652,7 +1652,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1652,7 +1652,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
conn.requestObjRefId = pRequest->self; conn.requestObjRefId = pRequest->self;
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
printf("raw data block num:%d\n", rspObj.rsp.blockNum); uDebug("raw data block num:%d\n", rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) { while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) { if (!rspObj.rsp.withSchema) {
...@@ -1675,7 +1675,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1675,7 +1675,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
goto end; goto end;
} }
printf("raw data tbname:%s\n", tbName); uDebug("raw data tbname:%s\n", tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb); strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName); strcpy(pName.tname, tbName);
...@@ -1861,7 +1861,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1861,7 +1861,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
tDeleteSMqDataRsp(&rspObj.rsp); tDeleteSMqDataRsp(&rspObj.rsp);
rspObj.resInfo.pRspMsg = NULL; rspObj.resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&rspObj.resInfo); doFreeReqResultInfo(&rspObj.resInfo);
...@@ -1922,7 +1922,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1922,7 +1922,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
conn.requestObjRefId = pRequest->self; conn.requestObjRefId = pRequest->self;
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
printf("raw data block num:%d\n", rspObj.rsp.blockNum); uDebug("raw data block num:%d\n", rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) { while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) { if (!rspObj.rsp.withSchema) {
...@@ -1945,7 +1945,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1945,7 +1945,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end; goto end;
} }
printf("raw data tbname:%s\n", tbName); uDebug("raw data tbname:%s\n", tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb); strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName); strcpy(pName.tname, tbName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册