提交 8a20b2fe 编写于 作者: L Liu Jicong

fix memory error

上级 feffab62
...@@ -54,25 +54,28 @@ typedef struct SColumnDataAgg { ...@@ -54,25 +54,28 @@ typedef struct SColumnDataAgg {
} SColumnDataAgg; } SColumnDataAgg;
typedef struct SDataBlockInfo { typedef struct SDataBlockInfo {
STimeWindow window; STimeWindow window;
int32_t rows; int32_t rows;
int32_t rowSize; int32_t rowSize;
int16_t numOfCols; int16_t numOfCols;
int16_t hasVarCol; int16_t hasVarCol;
union {int64_t uid; int64_t blockId;}; union {
int64_t uid;
int64_t blockId;
};
} SDataBlockInfo; } SDataBlockInfo;
//typedef struct SConstantItem { // typedef struct SConstantItem {
// SColumnInfo info; // SColumnInfo info;
// int32_t startRow; // run-length-encoding to save the space for multiple rows // int32_t startRow; // run-length-encoding to save the space for multiple rows
// int32_t endRow; // int32_t endRow;
// SVariant value; // SVariant value;
//} SConstantItem; // } SConstantItem;
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock { typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg; SColumnDataAgg* pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData> SArray* pDataBlock; // SArray<SColumnInfoData>
SDataBlockInfo info; SDataBlockInfo info;
} SSDataBlock; } SSDataBlock;
...@@ -108,13 +111,13 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { ...@@ -108,13 +111,13 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
int32_t tlen = 0; int32_t tlen = 0;
int32_t sz = 0; int32_t sz = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId); // tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics); tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics);
if (pRsp->numOfTopics == 0) return tlen; if (pRsp->numOfTopics == 0) return tlen;
tlen += tEncodeSSchemaWrapper(buf, pRsp->schemas); tlen += tEncodeSSchemaWrapper(buf, pRsp->schema);
if (pRsp->pBlockData) { if (pRsp->pBlockData) {
sz = taosArrayGetSize(pRsp->pBlockData); sz = taosArrayGetSize(pRsp->pBlockData);
} }
...@@ -128,15 +131,15 @@ static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp ...@@ -128,15 +131,15 @@ static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp
static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
int32_t sz; int32_t sz;
buf = taosDecodeFixedI64(buf, &pRsp->consumerId); // buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics); buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics);
if (pRsp->numOfTopics == 0) return buf; if (pRsp->numOfTopics == 0) return buf;
pRsp->schemas = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper)); pRsp->schema = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper));
if (pRsp->schemas == NULL) return NULL; if (pRsp->schema == NULL) return NULL;
buf = tDecodeSSchemaWrapper(buf, pRsp->schemas); buf = tDecodeSSchemaWrapper(buf, pRsp->schema);
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock)); pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
...@@ -148,11 +151,11 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { ...@@ -148,11 +151,11 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
} }
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) {
if (pRsp->schemas) { if (pRsp->schema) {
if (pRsp->schemas->nCols) { if (pRsp->schema->nCols) {
tfree(pRsp->schemas->pSchema); tfree(pRsp->schema->pSchema);
} }
free(pRsp->schemas); free(pRsp->schema);
} }
taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock); taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock);
pRsp->pBlockData = NULL; pRsp->pBlockData = NULL;
...@@ -196,7 +199,7 @@ typedef struct SGroupbyExpr { ...@@ -196,7 +199,7 @@ typedef struct SGroupbyExpr {
typedef struct SFunctParam { typedef struct SFunctParam {
int32_t type; int32_t type;
SColumn *pCol; SColumn* pCol;
SVariant param; SVariant param;
} SFunctParam; } SFunctParam;
...@@ -214,12 +217,12 @@ typedef struct SResSchame { ...@@ -214,12 +217,12 @@ typedef struct SResSchame {
typedef struct SExprBasicInfo { typedef struct SExprBasicInfo {
SResSchema resSchema; SResSchema resSchema;
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
SFunctParam *pParam; SFunctParam* pParam;
} SExprBasicInfo; } SExprBasicInfo;
typedef struct SExprInfo { typedef struct SExprInfo {
struct SExprBasicInfo base; struct SExprBasicInfo base;
struct tExprNode *pExpr; struct tExprNode* pExpr;
} SExprInfo; } SExprInfo;
typedef struct SStateWindow { typedef struct SStateWindow {
......
...@@ -1282,7 +1282,7 @@ static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) { ...@@ -1282,7 +1282,7 @@ static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
if (pRebSub == NULL) { if (pRebSub == NULL) {
goto _err; goto _err;
} }
pRebSub->key = key; pRebSub->key = strdup(key);
pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t)); pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->lostConsumers == NULL) { if (pRebSub->lostConsumers == NULL) {
goto _err; goto _err;
...@@ -2116,25 +2116,16 @@ typedef struct { ...@@ -2116,25 +2116,16 @@ typedef struct {
int8_t mqMsgType; int8_t mqMsgType;
int32_t code; int32_t code;
int32_t epoch; int32_t epoch;
int64_t consumerId;
} SMqRspHead; } SMqRspHead;
typedef struct {
int64_t consumerId;
SSchemaWrapper* schemas;
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t numOfTopics;
SArray* pBlockData; // SArray<SSDataBlock>
} SMqPollRsp;
// one req for one vg+topic
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t consumerId; int64_t consumerId;
int64_t blockingTime; int64_t blockingTime;
int32_t epoch; int32_t epoch;
int8_t withSchema;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
int64_t currentOffset; int64_t currentOffset;
...@@ -2153,19 +2144,21 @@ typedef struct { ...@@ -2153,19 +2144,21 @@ typedef struct {
} SMqSubTopicEp; } SMqSubTopicEp;
typedef struct { typedef struct {
int64_t consumerId; SMqRspHead head;
char cgroup[TSDB_CGROUP_LEN]; // TODO: remove from msg
SArray* topics; // SArray<SMqSubTopicEp> int64_t reqOffset;
} SMqCMGetSubEpRsp; int64_t rspOffset;
int32_t skipLogNum;
int32_t numOfTopics;
SSchemaWrapper* schema;
SArray* pBlockData; // SArray<SSDataBlock>
} SMqPollRsp;
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
union { char cgroup[TSDB_CGROUP_LEN];
SMqPollRsp consumeRsp; SArray* topics; // SArray<SMqSubTopicEp>
SMqCMGetSubEpRsp getEpRsp; } SMqCMGetSubEpRsp;
};
void* extra;
} SMqMsgWrapper;
typedef struct { typedef struct {
int32_t curBlock; int32_t curBlock;
...@@ -2173,11 +2166,13 @@ typedef struct { ...@@ -2173,11 +2166,13 @@ typedef struct {
void** uData; void** uData;
} SMqRowIter; } SMqRowIter;
struct tmq_message_t_v1 { struct tmq_message_t {
SMqPollRsp rsp; SMqPollRsp msg;
void* vg;
SMqRowIter iter; SMqRowIter iter;
}; };
#if 0
struct tmq_message_t { struct tmq_message_t {
SMqRspHead head; SMqRspHead head;
union { union {
...@@ -2189,6 +2184,7 @@ struct tmq_message_t { ...@@ -2189,6 +2184,7 @@ struct tmq_message_t {
int32_t curRow; int32_t curRow;
void** uData; void** uData;
}; };
#endif
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
...@@ -2241,8 +2237,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE ...@@ -2241,8 +2237,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId); // tlen += taosEncodeString(buf, pRsp->cgroup);
tlen += taosEncodeString(buf, pRsp->cgroup);
int32_t sz = taosArrayGetSize(pRsp->topics); int32_t sz = taosArrayGetSize(pRsp->topics);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
...@@ -2253,8 +2248,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu ...@@ -2253,8 +2248,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
} }
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->consumerId); // buf = taosDecodeStringTo(buf, pRsp->cgroup);
buf = taosDecodeStringTo(buf, pRsp->cgroup);
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
......
...@@ -681,7 +681,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { ...@@ -681,7 +681,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0; if (tmq_message == NULL) return 0;
SMqPollRsp* pRsp = &tmq_message->consumeRsp; SMqPollRsp* pRsp = &tmq_message->msg;
return pRsp->skipLogNum; return pRsp->skipLogNum;
} }
...@@ -690,15 +690,15 @@ void tmqShowMsg(tmq_message_t* tmq_message) { ...@@ -690,15 +690,15 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
static bool noPrintSchema; static bool noPrintSchema;
char pBuf[128]; char pBuf[128];
SMqPollRsp* pRsp = &tmq_message->consumeRsp; SMqPollRsp* pRsp = &tmq_message->msg;
int32_t colNum = pRsp->schemas->nCols; int32_t colNum = pRsp->schema->nCols;
if (!noPrintSchema) { if (!noPrintSchema) {
printf("|"); printf("|");
for (int32_t i = 0; i < colNum; i++) { for (int32_t i = 0; i < colNum; i++) {
if (i == 0) if (i == 0)
printf(" %25s |", pRsp->schemas->pSchema[i].name); printf(" %25s |", pRsp->schema->pSchema[i].name);
else else
printf(" %15s |", pRsp->schemas->pSchema[i].name); printf(" %15s |", pRsp->schema->pSchema[i].name);
} }
printf("\n"); printf("\n");
printf("===============================================\n"); printf("===============================================\n");
...@@ -778,19 +778,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -778,19 +778,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto WRITE_QUEUE_FAIL; goto WRITE_QUEUE_FAIL;
} }
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg);
pRsp->curBlock = 0; pRsp->iter.curBlock = 0;
pRsp->curRow = 0; pRsp->iter.curRow = 0;
// TODO: alloc mem // TODO: alloc mem
/*pRsp->*/ /*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if (pRsp->consumeRsp.numOfTopics == 0) { if (pRsp->msg.numOfTopics == 0) {
/*printf("no data\n");*/ /*printf("no data\n");*/
taosFreeQitem(pRsp); taosFreeQitem(pRsp);
goto WRITE_QUEUE_FAIL; goto WRITE_QUEUE_FAIL;
} }
pRsp->extra = pParam->pVg; pRsp->vg = pParam->pVg;
taosWriteQitem(tmq->mqueue, pRsp); taosWriteQitem(tmq->mqueue, pRsp);
atomic_add_fetch_32(&tmq->readyRequest, 1); atomic_add_fetch_32(&tmq->readyRequest, 1);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -860,14 +860,14 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -860,14 +860,14 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
} }
tDeleteSMqCMGetSubEpRsp(&rsp); tDeleteSMqCMGetSubEpRsp(&rsp);
} else { } else {
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));
if (pRsp == NULL) { if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1; code = -1;
goto END; goto END;
} }
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp); tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pRsp);
taosWriteQitem(tmq->mqueue, pRsp); taosWriteQitem(tmq->mqueue, pRsp);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -983,6 +983,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo ...@@ -983,6 +983,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
return pReq; return pReq;
} }
#if 0
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
tmq_message_t* msg = NULL; tmq_message_t* msg = NULL;
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
...@@ -1050,6 +1051,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -1050,6 +1051,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
} }
return NULL; return NULL;
} }
#endif
int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
/*printf("call poll\n");*/ /*printf("call poll\n");*/
...@@ -1111,11 +1113,12 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -1111,11 +1113,12 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
} }
// return // return
int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { int32_t tmqHandleRes(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) {
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) { if (rspHead->mqMsgType == TMQ_MSG_TYPE__EP_RSP) {
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) { if (rspHead->epoch > atomic_load_32(&tmq->epoch)) {
tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp); SMqCMGetSubEpRsp* rspMsg = (SMqCMGetSubEpRsp*)rspHead;
tmqUpdateEp(tmq, rspHead->epoch, rspMsg);
tmqClearUnhandleMsg(tmq); tmqClearUnhandleMsg(tmq);
*pReset = true; *pReset = true;
} else { } else {
...@@ -1129,21 +1132,22 @@ int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { ...@@ -1129,21 +1132,22 @@ int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) {
tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
while (1) { while (1) {
tmq_message_t* rspMsg = NULL; SMqRspHead* rspHead = NULL;
taosGetQitem(tmq->qall, (void**)&rspMsg); taosGetQitem(tmq->qall, (void**)&rspHead);
if (rspMsg == NULL) { if (rspHead == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&rspMsg); taosGetQitem(tmq->qall, (void**)&rspHead);
if (rspMsg == NULL) return NULL; if (rspHead == NULL) return NULL;
} }
if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { if (rspHead->mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
tmq_message_t* rspMsg = (tmq_message_t*)rspHead;
atomic_sub_fetch_32(&tmq->readyRequest, 1); atomic_sub_fetch_32(&tmq->readyRequest, 1);
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) { if (rspMsg->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
/*printf("epoch match\n");*/ /*printf("epoch match\n");*/
SMqClientVg* pVg = rspMsg->extra; SMqClientVg* pVg = rspMsg->vg;
pVg->currentOffset = rspMsg->consumeRsp.rspOffset; pVg->currentOffset = rspMsg->msg.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
return rspMsg; return rspMsg;
} else { } else {
...@@ -1153,8 +1157,8 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese ...@@ -1153,8 +1157,8 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
} else { } else {
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
bool reset = false; bool reset = false;
tmqHandleRes(tmq, rspMsg, &reset); tmqHandleRes(tmq, rspHead, &reset);
taosFreeQitem(rspMsg); taosFreeQitem(rspHead);
if (pollIfReset && reset) { if (pollIfReset && reset) {
printf("reset and repoll\n"); printf("reset and repoll\n");
tmqPollImpl(tmq, blockingTime); tmqPollImpl(tmq, blockingTime);
...@@ -1163,6 +1167,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese ...@@ -1163,6 +1167,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
} }
} }
#if 0
tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg = NULL; tmq_message_t* rspMsg = NULL;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
...@@ -1185,6 +1190,7 @@ tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { ...@@ -1185,6 +1190,7 @@ tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
return NULL; return NULL;
} }
} }
#endif
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg; tmq_message_t* rspMsg;
...@@ -1350,7 +1356,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v ...@@ -1350,7 +1356,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
void tmq_message_destroy(tmq_message_t* tmq_message) { void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return; if (tmq_message == NULL) return;
SMqPollRsp* pRsp = &tmq_message->consumeRsp; SMqPollRsp* pRsp = &tmq_message->msg;
tDeleteSMqConsumeRsp(pRsp); tDeleteSMqConsumeRsp(pRsp);
/*free(tmq_message);*/ /*free(tmq_message);*/
taosFreeQitem(tmq_message); taosFreeQitem(tmq_message);
...@@ -1366,24 +1372,24 @@ const char* tmq_err2str(tmq_resp_err_t err) { ...@@ -1366,24 +1372,24 @@ const char* tmq_err2str(tmq_resp_err_t err) {
} }
TAOS_ROW tmq_get_row(tmq_message_t* message) { TAOS_ROW tmq_get_row(tmq_message_t* message) {
SMqPollRsp* rsp = &message->consumeRsp; SMqPollRsp* rsp = &message->msg;
while (1) { while (1) {
if (message->curBlock < taosArrayGetSize(rsp->pBlockData)) { if (message->iter.curBlock < taosArrayGetSize(rsp->pBlockData)) {
SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->curBlock); SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->iter.curBlock);
if (message->curRow < pBlock->info.rows) { if (message->iter.curRow < pBlock->info.rows) {
for (int i = 0; i < pBlock->info.numOfCols; i++) { for (int i = 0; i < pBlock->info.numOfCols; i++) {
SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i);
if (colDataIsNull_s(pData, message->curRow)) if (colDataIsNull_s(pData, message->iter.curRow))
message->uData[i] = NULL; message->iter.uData[i] = NULL;
else { else {
message->uData[i] = colDataGetData(pData, message->curRow); message->iter.uData[i] = colDataGetData(pData, message->iter.curRow);
} }
} }
message->curRow++; message->iter.curRow++;
return message->uData; return message->iter.uData;
} else { } else {
message->curBlock++; message->iter.curBlock++;
message->curRow = 0; message->iter.curRow = 0;
continue; continue;
} }
} }
......
...@@ -77,8 +77,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { ...@@ -77,8 +77,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont); // TODO: handle invalid write
taosFreeQitem(pMsg); /*rpcFreeCont(pMsg->rpcMsg.pCont);*/
/*taosFreeQitem(pMsg);*/
} }
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
......
...@@ -272,7 +272,6 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { ...@@ -272,7 +272,6 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
/*sdbWrite(pMnode->pSdb, pConsumerRaw);*/ /*sdbWrite(pMnode->pSdb, pConsumerRaw);*/
strcpy(rsp.cgroup, pReq->cgroup); strcpy(rsp.cgroup, pReq->cgroup);
rsp.consumerId = consumerId;
if (epoch != pConsumer->epoch) { if (epoch != pConsumer->epoch) {
mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch); mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch);
SArray *pTopics = pConsumer->currentTopics; SArray *pTopics = pConsumer->currentTopics;
...@@ -322,6 +321,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { ...@@ -322,6 +321,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
} }
((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP; ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
((SMqRspHead *)buf)->epoch = pConsumer->epoch; ((SMqRspHead *)buf)->epoch = pConsumer->epoch;
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
...@@ -344,14 +344,14 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) ...@@ -344,14 +344,14 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup)
} }
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key)); SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
if (pRebSub == NULL) { if (pRebSub == NULL) {
pRebSub = tNewSMqRebSubscribe(key); pRebSub = tNewSMqRebSubscribe(key);
if (pRebSub == NULL) { if (pRebSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
taosHashPut(pHash, key, strlen(key), pRebSub, sizeof(SMqRebSubscribe)); taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe));
} }
return pRebSub; return pRebSub;
} }
...@@ -441,6 +441,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { ...@@ -441,6 +441,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
if (pIter == NULL) break; if (pIter == NULL) break;
SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter; SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key); SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
tfree(pRebSub->key);
mInfo("mq rebalance subscription: %s", pSub->key); mInfo("mq rebalance subscription: %s", pSub->key);
...@@ -503,7 +504,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { ...@@ -503,7 +504,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
} }
mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status); mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status,
pRebConsumer->status);
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
...@@ -543,8 +545,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { ...@@ -543,8 +545,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
} else { } else {
mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "", pConsumerEp->vgId, mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "",
pConsumerEp->oldConsumerId, pConsumerEp->consumerId); pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
} }
...@@ -1099,7 +1101,8 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { ...@@ -1099,7 +1101,8 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
bool createSub = false; bool createSub = false;
if (pSub == NULL) { if (pSub == NULL) {
mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup, newTopicName); mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup,
newTopicName);
pSub = mndCreateSubscription(pMnode, pTopic, cgroup); pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
createSub = true; createSub = true;
......
...@@ -245,7 +245,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -245,7 +245,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
SMqPollRsp rsp = { SMqPollRsp rsp = {
.consumerId = consumerId, /*.consumerId = consumerId,*/
.numOfTopics = 0, .numOfTopics = 0,
.pBlockData = NULL, .pBlockData = NULL,
}; };
...@@ -298,7 +298,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -298,7 +298,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
taosArrayPush(pRes, pDataBlock); taosArrayPush(pRes, pDataBlock);
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.schema = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset; rsp.rspOffset = fetchOffset;
rsp.numOfTopics = 1; rsp.numOfTopics = 1;
...@@ -312,6 +312,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -312,6 +312,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch; ((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp); tEncodeSMqPollRsp(&abuf, &rsp);
......
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vnd.h"
#include "sync.h" #include "sync.h"
#include "vnd.h"
// #include "vnodeInt.h" // #include "vnodeInt.h"
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
...@@ -41,6 +41,6 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { ...@@ -41,6 +41,6 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
} }
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("sync message is processed"); /*vInfo("sync message is processed");*/
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册