未验证 提交 f72930ee 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10880 from taosdata/feature/tq

fix memory error
...@@ -101,13 +101,27 @@ void* blockDataDestroy(SSDataBlock* pBlock); ...@@ -101,13 +101,27 @@ void* blockDataDestroy(SSDataBlock* pBlock);
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock); void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
if (pBlock == NULL) { // WARNING: do not use info.numOfCols,
return; // sometimes info.numOfCols != array size
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
tfree(pColInfoData->varmeta.offset);
} else {
tfree(pColInfoData->nullbitmap);
}
tfree(pColInfoData->pData);
} }
blockDataDestroy(pBlock);
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
} }
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { blockDestroyInner(pBlock); }
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
int32_t tlen = 0; int32_t tlen = 0;
int32_t sz = 0; int32_t sz = 0;
...@@ -157,7 +171,7 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { ...@@ -157,7 +171,7 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) {
} }
free(pRsp->schema); free(pRsp->schema);
} }
taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock); taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))blockDestroyInner);
pRsp->pBlockData = NULL; pRsp->pBlockData = NULL;
} }
......
...@@ -2145,11 +2145,12 @@ typedef struct { ...@@ -2145,11 +2145,12 @@ typedef struct {
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
// TODO: replace with topic name
int32_t numOfTopics;
// TODO: remove from msg // TODO: remove from msg
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t numOfTopics;
SSchemaWrapper* schema; SSchemaWrapper* schema;
SArray* pBlockData; // SArray<SSDataBlock> SArray* pBlockData; // SArray<SSDataBlock>
} SMqPollRsp; } SMqPollRsp;
......
...@@ -735,7 +735,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -735,7 +735,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqClientVg* pVg = pParam->pVg; SMqClientVg* pVg = pParam->pVg;
tmq_t* tmq = pParam->tmq; tmq_t* tmq = pParam->tmq;
if (code != 0) { if (code != 0) {
printf("msg discard\n"); printf("msg discard %x\n", code);
goto WRITE_QUEUE_FAIL; goto WRITE_QUEUE_FAIL;
} }
......
此差异已折叠。
...@@ -337,9 +337,9 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) ...@@ -337,9 +337,9 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup)
while (key[i] != TMQ_SEPARATOR) { while (key[i] != TMQ_SEPARATOR) {
i++; i++;
} }
memcpy(topic, key, i - 1); memcpy(cgroup, key, i);
topic[i] = 0; cgroup[i] = 0;
strcpy(cgroup, &key[i + 1]); strcpy(topic, &key[i + 1]);
return 0; return 0;
} }
...@@ -539,8 +539,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { ...@@ -539,8 +539,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
mndSplitSubscribeKey(pSub->key, topic, cgroup); mndSplitSubscribeKey(pSub->key, topic, cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, topic, mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId,
pConsumerEp->consumerId); topic, pConsumerEp->consumerId, cgroup);
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
......
...@@ -241,12 +241,12 @@ static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) { ...@@ -241,12 +241,12 @@ static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SNode* pAst = NULL; SNode *pAst = NULL;
int32_t code = nodesStringToNode(pCreate->ast, &pAst); int32_t code = nodesStringToNode(pCreate->ast, &pAst);
SQueryPlan* pPlan = NULL; SQueryPlan *pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = true }; SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
code = qCreateQueryPlan(&cxt, &pPlan, NULL); code = qCreateQueryPlan(&cxt, &pPlan, NULL);
} }
...@@ -274,7 +274,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq ...@@ -274,7 +274,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj.logicalPlan = ""; topicObj.logicalPlan = "";
topicObj.sqlLen = strlen(pCreate->sql); topicObj.sqlLen = strlen(pCreate->sql);
char* pPlanStr = NULL; char *pPlanStr = NULL;
if (TSDB_CODE_SUCCESS != mndGetPlanString(pCreate, &pPlanStr)) { if (TSDB_CODE_SUCCESS != mndGetPlanString(pCreate, &pPlanStr)) {
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
return -1; return -1;
......
...@@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp); tEncodeSMqPollRsp(&abuf, &rsp);
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); /*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
......
...@@ -306,7 +306,7 @@ int32_t init_env() { ...@@ -306,7 +306,7 @@ int32_t init_env() {
} }
//const char* sql = "select * from tu1"; //const char* sql = "select * from tu1";
sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s%d", g_stConfInfo.stbName, 0); sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s", g_stConfInfo.stbName);
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/ /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
pRes = taos_query(pConn, sqlStr); pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册