提交 4402833a 编写于 作者: L Liu Jicong

fix: memory error

上级 f5427856
...@@ -14,7 +14,9 @@ ...@@ -14,7 +14,9 @@
*/ */
#include <assert.h> #include <assert.h>
#include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <time.h> #include <time.h>
#include "taos.h" #include "taos.h"
...@@ -24,7 +26,7 @@ static void msg_process(TAOS_RES* msg) { ...@@ -24,7 +26,7 @@ static void msg_process(TAOS_RES* msg) {
char buf[1024]; char buf[1024];
memset(buf, 0, 1024); memset(buf, 0, 1024);
printf("topic: %s\n", tmq_get_topic_name(msg)); printf("topic: %s\n", tmq_get_topic_name(msg));
printf("vg:%d\n", tmq_get_vgroup_id(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg));
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
...@@ -141,7 +143,7 @@ int32_t create_topic() { ...@@ -141,7 +143,7 @@ int32_t create_topic() {
} }
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
printf("commit %d\n", resp); printf("commit %d tmq %p offsets %p param %p\n", resp, tmq, offsets, param);
} }
tmq_t* build_consumer() { tmq_t* build_consumer() {
...@@ -232,6 +234,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -232,6 +234,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
msg_process(tmqmessage); msg_process(tmqmessage);
taos_free_result(tmqmessage); taos_free_result(tmqmessage);
tmq_commit(tmq, NULL, 1);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
} }
} }
......
...@@ -377,7 +377,15 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -377,7 +377,15 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
if (pParam->tmq->commitCb) { if (pParam->tmq->commitCb) {
pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, NULL, pParam->tmq->commitCbUserParam); pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, NULL, pParam->tmq->commitCbUserParam);
} }
if (!pParam->async) tsem_post(&pParam->rspSem); if (!pParam->async)
tsem_post(&pParam->rspSem);
else {
tsem_destroy(&pParam->rspSem);
/*if (pParam->pArray) {*/
/*taosArrayDestroy(pParam->pArray);*/
/*}*/
taosMemoryFree(pParam);
}
return 0; return 0;
} }
...@@ -560,7 +568,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -560,7 +568,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
tscError("failed to malloc request"); tscError("failed to malloc request");
} }
SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam)); SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
if (pParam == NULL) { if (pParam == NULL) {
return -1; return -1;
} }
...@@ -575,6 +583,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -575,6 +583,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
}; };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam; sendInfo->param = pParam;
sendInfo->fp = tmqCommitCb; sendInfo->fp = tmqCommitCb;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
...@@ -585,13 +594,12 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -585,13 +594,12 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
if (!async) { if (!async) {
tsem_wait(&pParam->rspSem); tsem_wait(&pParam->rspSem);
resp = pParam->rspErr; resp = pParam->rspErr;
} tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
if (pArray) { if (pArray) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
}
} }
return resp; return resp;
...@@ -1313,7 +1321,7 @@ const char* tmq_err2str(tmq_resp_err_t err) { ...@@ -1313,7 +1321,7 @@ const char* tmq_err2str(tmq_resp_err_t err) {
const char* tmq_get_topic_name(TAOS_RES* res) { const char* tmq_get_topic_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) { if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res; SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->topic; return strchr(pRspObj->topic, '.') + 1;
} else { } else {
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册