提交 8677b56a 编写于 作者: wmmhello's avatar wmmhello

fix:assert error in tqProcessSubmitReqForSubscribe if put pHandle to array twice

上级 6a889ae8
......@@ -534,7 +534,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
if (index) {
if (colField[*index].type != kv->type) {
uError("SML:0x%" PRIx64 " point type and db type mismatch. point type: %d, db type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key);
uError("SML:0x%" PRIx64 " point type and db type mismatch. db type: %d, point type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key);
return TSDB_CODE_SML_INVALID_DATA;
}
......
......@@ -114,8 +114,7 @@ struct STQ {
char* path;
int64_t walLogLastVer;
SRWLatch lock;
SHashObj* pPushMgr; // consumerId -> STqPushEntry
SArray * pPushArray;
SHashObj* pPushMgr; // consumerId -> STqHandle
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore;
......
......@@ -78,18 +78,6 @@ static void destroyTqHandle(void* data) {
}
}
static void tqPushEntryFree(void* data) {
STqPushEntry* p = *(void**)data;
if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
tDeleteSMqDataRsp(p->pDataRsp);
} else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
}
taosMemoryFree(p->pDataRsp);
taosMemoryFree(p);
}
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
pLeft->val.version <= pRight->val.version;
......@@ -109,11 +97,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
pTq->pPushArray = taosArrayInit(8, POINTER_BYTES);
taosInitRWLatch(&pTq->lock);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
......@@ -158,7 +143,6 @@ void tqClose(STQ* pTq) {
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta);
taosArrayDestroy(pTq->pPushArray);
taosMemoryFree(pTq);
}
......@@ -569,14 +553,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// remove if it has been register in the push manager, and return one empty block to consumer
// tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++) {
void* handle = taosArrayGetP(pTq->pPushArray, i);
if(handle == pHandle) {
tqInfo("vgId:%d remove handle when switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
taosArrayRemove(pTq->pPushArray, i);
break;
}
}
taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t));
if(pHandle->msg != NULL) {
rpcFreeCont(pHandle->msg->pCont);
taosMemoryFree(pHandle->msg);
......@@ -1091,8 +1069,9 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
taosWLockLatch(&pTq->lock);
for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){
STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i);
void *pIter = taosHashIterate(pTq->pPushMgr, NULL);
while(pIter){
STqHandle* pHandle = *(STqHandle**)pIter;
tqDebug("vgId:%d start set submit for pHandle:%p", vgId, pHandle);
if(ASSERT(pHandle->msg != NULL)){
tqError("pHandle->msg should not be null");
......@@ -1103,8 +1082,9 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
pIter = taosHashIterate(pTq->pPushMgr, pIter);
}
taosArrayClear(pTq->pPushArray);
taosHashClear(pTq->pPushMgr);
// unlock
taosWUnLockLatch(&pTq->lock);
......
......@@ -206,69 +206,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
}
#endif
typedef struct {
void* pKey;
int64_t keyLen;
} SItem;
static void recordPushedEntry(SArray* cachedKey, void* pIter);
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq);
static void freeItem(void* param) {
SItem* p = (SItem*)param;
taosMemoryFree(p->pKey);
}
static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData,
int32_t dataLen, SArray* pCachedKey) {
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
if (pRsp->reqOffset.version >= ver) {
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
pRsp->reqOffset.version, ver);
return;
}
qTaskInfo_t pTaskInfo = pExec->task;
// prepare scan mem data
SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver};
if (qStreamSetScanMemData(pTaskInfo, submit) != 0) {
return;
}
qStreamSetOpen(pTaskInfo);
// here start to scan submit block to extract the subscribed data
int32_t totalRows = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) {
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
}
if (pDataBlock == NULL) {
break;
}
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
}
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum,
totalRows);
if (pRsp->blockNum > 0) {
tqOffsetResetToLog(&pRsp->rspOffset, ver);
tqPushDataRsp(pTq, pPushEntry);
recordPushedEntry(pCachedKey, pIter);
}
}
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
// int32_t len = msgLen - sizeof(SSubmitReq2Msg);
......@@ -363,84 +300,3 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
return 0;
}
int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
int32_t type) {
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
STqHandle* pTqHandle = pHandle;
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
if (pPushEntry == NULL) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", vgId:%d failed to malloc, size:%d", consumerId, vgId,
(int32_t)sizeof(STqPushEntry));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pPushEntry->info = pRpcMsg->info;
memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp));
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp));
} else if (type == TMQ_MSG_TYPE__POLL_RSP) {
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp));
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp));
}
SMqRspHead* pHead = &pPushEntry->pDataRsp->head;
pHead->consumerId = consumerId;
pHead->epoch = pRequest->epoch;
pHead->mqMsgType = type;
taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*));
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d",
consumerId, pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr));
return 0;
}
int32_t tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t vgId = TD_VID(pTq->pVnode);
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
if (pEntry != NULL) {
uint64_t cId = (*pEntry)->pDataRsp->head.consumerId;
ASSERT(consumerId == cId);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId,
(*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1);
if (rspConsumer) { // rsp the old consumer with empty block.
tqPushDataRsp(pTq, *pEntry);
}
taosHashRemove(pTq->pPushMgr, pKey, keyLen);
}
return 0;
}
void recordPushedEntry(SArray* cachedKey, void* pIter) {
size_t kLen = 0;
void* key = taosHashGetKey(pIter, &kLen);
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
taosArrayPush(cachedKey, &item);
}
void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfKeys = (int32_t)taosArrayGetSize(pCachedKeys);
for (int32_t i = 0; i < numOfKeys; i++) {
SItem* pItem = taosArrayGet(pCachedKeys, i);
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*)pItem->pKey);
}
}
if (numOfKeys > 0) {
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
}
}
......@@ -191,7 +191,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
pHandle->msg->contLen = pMsg->contLen;
tqDebug("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
taosArrayPush(pTq->pPushArray, &pHandle);
taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES);
taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp);
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册