未验证 提交 09c10df4 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10087 from taosdata/feature/tq

fix mem leak
...@@ -92,17 +92,6 @@ typedef struct taosField { ...@@ -92,17 +92,6 @@ typedef struct taosField {
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t;
typedef struct tmq_message_topic_t tmq_message_topic_t;
typedef struct tmq_message_tb_t tmq_message_tb_t;
typedef struct tmq_tb_iter_t tmq_tb_iter_t;
typedef struct tmq_message_col_t tmq_message_col_t;
typedef struct tmq_col_iter_t tmq_col_iter_t;
typedef struct TAOS_BIND { typedef struct TAOS_BIND {
int buffer_type; int buffer_type;
void * buffer; void * buffer;
...@@ -205,27 +194,59 @@ DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); ...@@ -205,27 +194,59 @@ DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
/* --------------------------TMQ INTERFACE------------------------------- */ /* --------------------------TMQ INTERFACE------------------------------- */
typedef struct tmq_resp_err_t tmq_resp_err_t;
enum tmq_resp_err_t {
TMQ_RESP_ERR__SUCCESS = 0,
TMQ_RESP_ERR__FAIL = 1,
};
typedef enum tmq_resp_err_t tmq_resp_err_t;
typedef struct tmq_t tmq_t;
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t; typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param)); typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
DLL_EXPORT tmq_list_t* tmq_list_new(); DLL_EXPORT tmq_list_t* tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*); DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*);
DLL_EXPORT tmq_conf_t* tmq_conf_new();
DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb);
DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
DLL_EXPORT tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen); DLL_EXPORT tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen);
DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
#endif
DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time); DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async); #if 0
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
#endif
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
#endif
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
};
typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t* tmq_conf_new();
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t* conf);
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -119,7 +119,7 @@ static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) { ...@@ -119,7 +119,7 @@ static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) {
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SColumnInfoData data; SColumnInfoData data = {0};
buf = taosDecodeFixedI16(buf, &data.info.colId); buf = taosDecodeFixedI16(buf, &data.info.colId);
buf = taosDecodeFixedI16(buf, &data.info.type); buf = taosDecodeFixedI16(buf, &data.info.type);
buf = taosDecodeFixedI16(buf, &data.info.bytes); buf = taosDecodeFixedI16(buf, &data.info.bytes);
...@@ -167,13 +167,46 @@ static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { ...@@ -167,13 +167,46 @@ static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) {
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock)); pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSDataBlock block; SSDataBlock block = {0};
tDecodeDataBlock(buf, &block); tDecodeDataBlock(buf, &block);
taosArrayPush(pRsp->pBlockData, &block); taosArrayPush(pRsp->pBlockData, &block);
} }
return buf; return buf;
} }
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
if (pBlock == NULL) {
return;
}
//int32_t numOfOutput = pBlock->info.numOfCols;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
for(int32_t i = 0; i < sz; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
//tfree(pBlock);
}
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
if (pRsp->schemas) {
if (pRsp->schemas->nCols) {
tfree(pRsp->schemas->pSchema);
}
free(pRsp->schemas);
}
taosArrayDestroyEx(pRsp->pBlockData, (void(*)(void*))tDeleteSSDataBlock);
pRsp->pBlockData = NULL;
//for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
//SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
//tDeleteSSDataBlock(pDataBlock);
//}
}
//====================================================================================================================== //======================================================================================================================
// the following structure shared by parser and executor // the following structure shared by parser and executor
typedef struct SColumn { typedef struct SColumn {
......
...@@ -1752,6 +1752,10 @@ typedef struct SMqCMGetSubEpRsp { ...@@ -1752,6 +1752,10 @@ typedef struct SMqCMGetSubEpRsp {
SArray* topics; // SArray<SMqSubTopicEp> SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp; } SMqCMGetSubEpRsp;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
taosArrayDestroy(pSubTopicEp->vgs);
}
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId); tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
...@@ -1765,6 +1769,10 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { ...@@ -1765,6 +1769,10 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
return buf; return buf;
} }
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp);
}
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic); tlen += taosEncodeString(buf, pTopicEp->topic);
......
...@@ -26,6 +26,51 @@ ...@@ -26,6 +26,51 @@
#include "tpagedfile.h" #include "tpagedfile.h"
#include "tref.h" #include "tref.h"
struct tmq_list_t {
int32_t cnt;
int32_t tot;
char* elems[];
};
struct tmq_topic_vgroup_t {
char* topic;
int32_t vgId;
int64_t commitOffset;
};
struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
};
struct tmq_conf_t {
char clientId[256];
char groupId[256];
/*char* ip;*/
/*uint16_t port;*/
tmq_commit_cb* commit_cb;
};
struct tmq_t {
char groupId[256];
char clientId[256];
SRWLatch lock;
int64_t consumerId;
int64_t epoch;
int64_t status;
tsem_t rspSem;
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
SArray* clientTopics; //SArray<SMqClientTopic>
//stat
int64_t pollCnt;
};
struct tmq_message_t {
SMqConsumeRsp rsp;
};
typedef struct SMqClientVg { typedef struct SMqClientVg {
// statistics // statistics
int64_t pollCnt; int64_t pollCnt;
...@@ -47,83 +92,43 @@ typedef struct SMqClientTopic { ...@@ -47,83 +92,43 @@ typedef struct SMqClientTopic {
SArray* vgs; //SArray<SMqClientVg> SArray* vgs; //SArray<SMqClientVg>
} SMqClientTopic; } SMqClientTopic;
typedef struct SMqAskEpCbParam { typedef struct SMqAskEpCbParam {
tmq_t* tmq; tmq_t* tmq;
int32_t wait; int32_t wait;
} SMqAskEpCbParam; } SMqAskEpCbParam;
struct tmq_resp_err_t {
int32_t code;
};
struct tmq_topic_vgroup_t {
char* topic;
int32_t vgId;
int64_t commitOffset;
};
struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
};
typedef struct SMqConsumeCbParam { typedef struct SMqConsumeCbParam {
tmq_t* tmq; tmq_t* tmq;
SMqClientVg* pVg; SMqClientVg* pVg;
tmq_message_t** retMsg; tmq_message_t** retMsg;
} SMqConsumeCbParam; } SMqConsumeCbParam;
struct tmq_conf_t { typedef struct SMqSubscribeCbParam {
char clientId[256]; tmq_t* tmq;
char groupId[256]; tsem_t rspSem;
char* ip; tmq_resp_err_t rspErr;
uint16_t port; } SMqSubscribeCbParam;
tmq_commit_cb* commit_cb;
};
struct tmq_message_t {
SMqConsumeRsp rsp;
};
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
return conf; return conf;
} }
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { void tmq_conf_destroy(tmq_conf_t* conf) {
if(conf) free(conf);
}
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id") == 0) { if (strcmp(key, "group.id") == 0) {
strcpy(conf->groupId, value); strcpy(conf->groupId, value);
} }
if (strcmp(key, "client.id") == 0) { if (strcmp(key, "client.id") == 0) {
strcpy(conf->clientId, value); strcpy(conf->clientId, value);
} }
return 0; return TMQ_CONF_OK;
} }
struct tmq_t {
char groupId[256];
char clientId[256];
SRWLatch lock;
int64_t consumerId;
int64_t epoch;
int64_t status;
tsem_t rspSem;
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
SArray* clientTopics; //SArray<SMqClientTopic>
//stat
int64_t pollCnt;
};
struct tmq_list_t {
int32_t cnt;
int32_t tot;
char* elems[];
};
tmq_list_t* tmq_list_new() { tmq_list_t* tmq_list_new() {
tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*)); tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*));
if (ptr == NULL) { if (ptr == NULL) {
...@@ -141,6 +146,12 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) { ...@@ -141,6 +146,12 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
return 0; return 0;
} }
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code;
tsem_post(&pParam->rspSem);
return 0;
}
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = calloc(sizeof(tmq_t), 1); tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
...@@ -161,7 +172,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -161,7 +172,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return pTmq; return pTmq;
} }
TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
int32_t sz = topic_list->cnt; int32_t sz = topic_list->cnt;
//destroy ex //destroy ex
...@@ -219,27 +230,31 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -219,27 +230,31 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
} }
SMqSubscribeCbParam param = {
.rspErr = TMQ_RESP_ERR__SUCCESS,
.tmq = tmq
};
tsem_init(&param.rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
/*sendInfo->fp*/ sendInfo->param = &param;
sendInfo->fp = tmqSubscribeCb;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&param.rspSem);
tsem_destroy(&param.rspSem);
_return: _return:
/*if (sendInfo != NULL) {*/ /*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/ /*destroySendMsgInfo(sendInfo);*/
/*}*/ /*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { return param.rspErr;
pRequest->code = terrno;
}
return pRequest;
} }
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
...@@ -441,15 +456,17 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) { ...@@ -441,15 +456,17 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) {
int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
if (code == -1) { if (code == -1) {
printf("msg discard\n"); printf("msg discard\n");
free(param);
return 0; return 0;
} }
char pBuf[128]; char pBuf[128];
SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param;
SMqClientVg* pVg = pParam->pVg; SMqClientVg* pVg = pParam->pVg;
SMqConsumeRsp rsp; SMqConsumeRsp rsp = {0};
tDecodeSMqConsumeRsp(pMsg->pData, &rsp); tDecodeSMqConsumeRsp(pMsg->pData, &rsp);
if (rsp.numOfTopics == 0) { if (rsp.numOfTopics == 0) {
/*printf("no data\n");*/ /*printf("no data\n");*/
free(param);
return 0; return 0;
} }
int32_t colNum = rsp.schemas->nCols; int32_t colNum = rsp.schemas->nCols;
...@@ -486,6 +503,8 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -486,6 +503,8 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
printf("\n"); printf("\n");
} }
} }
tDeleteSMqConsumeRsp(&rsp);
free(param);
/*printf("\n-----msg end------\n");*/ /*printf("\n-----msg end------\n");*/
return 0; return 0;
} }
...@@ -542,6 +561,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -542,6 +561,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
if (pParam->wait) { if (pParam->wait) {
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
} }
tDeleteSMqCMGetSubEpRsp(&rsp);
free(pParam); free(pParam);
return 0; return 0;
} }
...@@ -550,6 +570,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { ...@@ -550,6 +570,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
int32_t tlen = sizeof(SMqCMGetSubEpReq); int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* buf = malloc(tlen); SMqCMGetSubEpReq* buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) {
goto END;
tscError("failed to malloc get subscribe ep buf"); tscError("failed to malloc get subscribe ep buf");
} }
buf->consumerId = htobe64(tmq->consumerId); buf->consumerId = htobe64(tmq->consumerId);
...@@ -557,6 +578,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { ...@@ -557,6 +578,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
if (pRequest == NULL) { if (pRequest == NULL) {
goto END;
tscError("failed to malloc subscribe ep request"); tscError("failed to malloc subscribe ep request");
} }
...@@ -564,7 +586,6 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { ...@@ -564,7 +586,6 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam)); SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam));
if (pParam == NULL) { if (pParam == NULL) {
free(buf);
goto END; goto END;
} }
pParam->tmq = tmq; pParam->tmq = tmq;
...@@ -611,10 +632,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -611,10 +632,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* tmq_message = NULL; tmq_message_t* tmq_message = NULL;
int64_t status = atomic_load_64(&tmq->status); int64_t status = atomic_load_64(&tmq->status);
tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics)); tmqAsyncAskEp(tmq, taosArrayGetSize(tmq->clientTopics));
/*if (blocking_time < 0) blocking_time = 500;*/ /*if (blocking_time < 0) blocking_time = 500;*/
blocking_time = 1000; blocking_time = 1;
if (taosArrayGetSize(tmq->clientTopics) == 0) { if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
...@@ -674,16 +695,15 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -674,16 +695,15 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
/*return pRequest;*/ /*return pRequest;*/
} }
tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
SMqConsumeReq req = {0}; SMqConsumeReq req = {0};
return NULL; return 0;
} }
void tmq_message_destroy(tmq_message_t* tmq_message) { void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return; if (tmq_message == NULL) return;
} }
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL); assert(pMsgBody != NULL);
tfree(pMsgBody->msgInfo.pData); tfree(pMsgBody->msgInfo.pData);
......
...@@ -606,7 +606,7 @@ TEST(testCase, create_topic_stb_Test) { ...@@ -606,7 +606,7 @@ TEST(testCase, create_topic_stb_Test) {
taos_free_result(pRes); taos_free_result(pRes);
char* sql = "select ts, k from st1"; char* sql = "select * from st1";
pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
......
...@@ -393,6 +393,12 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu ...@@ -393,6 +393,12 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
return buf; return buf;
} }
static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) {
if (pConsumerEp) {
tfree(pConsumerEp->qmsg);
}
}
// unit for rebalance // unit for rebalance
typedef struct SMqSubscribeObj { typedef struct SMqSubscribeObj {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
...@@ -520,7 +526,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) ...@@ -520,7 +526,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp; SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp); buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->assigned, &cEp); taosArrayPush(pSub->assigned, &cEp);
} }
...@@ -533,7 +539,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) ...@@ -533,7 +539,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp; SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp); buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->lostConsumer, &cEp); taosArrayPush(pSub->lostConsumer, &cEp);
} }
...@@ -547,7 +553,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) ...@@ -547,7 +553,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp; SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp); buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->idleConsumer, &cEp); taosArrayPush(pSub->idleConsumer, &cEp);
} }
...@@ -563,7 +569,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) ...@@ -563,7 +569,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp; SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp); buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->unassignedVg, &cEp); taosArrayPush(pSub->unassignedVg, &cEp);
} }
...@@ -571,6 +577,29 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) ...@@ -571,6 +577,29 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return buf; return buf;
} }
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
if (pSub->availConsumer) {
taosArrayDestroy(pSub->availConsumer);
pSub->availConsumer = NULL;
}
if (pSub->assigned) {
taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp);
pSub->assigned = NULL;
}
if (pSub->unassignedVg) {
taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
pSub->unassignedVg = NULL;
}
if (pSub->idleConsumer) {
taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
pSub->idleConsumer = NULL;
}
if (pSub->lostConsumer) {
taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
pSub->lostConsumer = NULL;
}
}
typedef struct SMqCGroup { typedef struct SMqCGroup {
char name[TSDB_CONSUMER_GROUP_LEN]; char name[TSDB_CONSUMER_GROUP_LEN];
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
......
...@@ -75,6 +75,7 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { ...@@ -75,6 +75,7 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
CM_ENCODE_OVER: CM_ENCODE_OVER:
tfree(buf);
if (terrno != 0) { if (terrno != 0) {
mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
...@@ -117,6 +118,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { ...@@ -117,6 +118,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
CM_DECODE_OVER: CM_DECODE_OVER:
tfree(buf);
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
tfree(pRow); tfree(pRow);
......
...@@ -69,7 +69,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { ...@@ -69,7 +69,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
SMqCMGetSubEpRsp rsp; SMqCMGetSubEpRsp rsp = {0};
int64_t consumerId = be64toh(pReq->consumerId); int64_t consumerId = be64toh(pReq->consumerId);
int64_t currentTs = taosGetTimestampMs(); int64_t currentTs = taosGetTimestampMs();
...@@ -122,7 +122,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -122,7 +122,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
if (changed || found) { if (changed || found) {
SSdbRaw *pRaw = mndSubActionEncode(pSub); SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWriteNotFree(pMnode->pSdb, pRaw); sdbWrite(pMnode->pSdb, pRaw);
} }
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
} }
...@@ -134,7 +134,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { ...@@ -134,7 +134,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
} }
void *abuf = buf; void *abuf = buf;
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
// TODO: free rsp tDeleteSMqCMGetSubEpRsp(&rsp);
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
return 0; return 0;
...@@ -292,7 +292,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas ...@@ -292,7 +292,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
return -1; return -1;
} }
SMqConsumerEp CEp; SMqConsumerEp CEp = {0};
CEp.status = 0; CEp.status = 0;
CEp.consumerId = -1; CEp.consumerId = -1;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
...@@ -388,6 +388,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { ...@@ -388,6 +388,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
SUB_ENCODE_OVER: SUB_ENCODE_OVER:
tfree(buf);
if (terrno != 0) { if (terrno != 0) {
mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
...@@ -431,6 +432,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { ...@@ -431,6 +432,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
SUB_DECODE_OVER: SUB_DECODE_OVER:
tfree(buf);
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
// TODO free subscribeobj // TODO free subscribeobj
...@@ -448,6 +450,7 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) { ...@@ -448,6 +450,7 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) { static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
mTrace("subscribe:%s, perform delete action", pSub->key); mTrace("subscribe:%s, perform delete action", pSub->key);
tDeleteSMqSubscribeObj(pSub);
return 0; return 0;
} }
......
...@@ -240,15 +240,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq ...@@ -240,15 +240,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
topicObj.dbUid = pDb->uid; topicObj.dbUid = pDb->uid;
topicObj.version = 1; topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql); topicObj.sql = pCreate->sql;
topicObj.physicalPlan = strdup(pCreate->physicalPlan); topicObj.physicalPlan = pCreate->physicalPlan;
topicObj.logicalPlan = strdup(pCreate->logicalPlan); topicObj.logicalPlan = pCreate->logicalPlan;
topicObj.sqlLen = strlen(pCreate->sql); topicObj.sqlLen = strlen(pCreate->sql);
SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj);
if (pTopicRaw == NULL) return -1; if (pTopicRaw == NULL) return -1;
if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1;
// TODO: replace with trans to support recovery
return sdbWrite(pMnode->pSdb, pTopicRaw); return sdbWrite(pMnode->pSdb, pTopicRaw);
} }
......
...@@ -785,6 +785,15 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -785,6 +785,15 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
} }
void* abuf = buf; void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp); tEncodeSMqConsumeRsp(&abuf, &rsp);
if (rsp.pBlockData) {
taosArrayDestroyEx(rsp.pBlockData, (void(*)(void*))tDeleteSSDataBlock);
rsp.pBlockData = NULL;
/*for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) {*/
/*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/
/*tDeleteSSDataBlock(pBlock);*/
/*}*/
/*taosArrayDestroy(rsp.pBlockData);*/
}
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
...@@ -916,6 +925,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -916,6 +925,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int32_t numOfCols = pHandle->pSchema->numOfCols; int32_t numOfCols = pHandle->pSchema->numOfCols;
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
//TODO: stable case
if (colNumNeed > pSchemaWrapper->nCols) {
colNumNeed = pSchemaWrapper->nCols;
}
SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (pArray == NULL) { if (pArray == NULL) {
return NULL; return NULL;
...@@ -928,7 +942,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -928,7 +942,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
j++; j++;
} }
SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
ASSERT(pColSchema->colId == colId);
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};
int sz = numOfRows * pColSchema->bytes; int sz = numOfRows * pColSchema->bytes;
colInfo.info.bytes = pColSchema->bytes; colInfo.info.bytes = pColSchema->bytes;
......
...@@ -92,7 +92,7 @@ int32_t debugFlag = 0; ...@@ -92,7 +92,7 @@ int32_t debugFlag = 0;
int32_t sDebugFlag = 135; int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135; int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131; int32_t tsdbDebugFlag = 131;
int32_t tqDebugFlag = 131; int32_t tqDebugFlag = 135;
int32_t cqDebugFlag = 131; int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册