From b5f5400d30a88acd46170f5125d170a67a29e2bc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 1 Mar 2022 17:49:14 +0800 Subject: [PATCH] fix --- source/client/src/clientHb.c | 166 ++++++++++++++++----------------- source/client/src/tmq.c | 27 ++++-- source/dnode/vnode/src/tq/tq.c | 6 +- source/os/src/osFile.c | 2 +- 4 files changed, 104 insertions(+), 97 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 45c1858948..0e309a8631 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -13,19 +13,17 @@ * along with this program. If not, see . */ -#include "clientInt.h" -#include "trpc.h" #include "catalog.h" +#include "clientInt.h" #include "clientLog.h" +#include "trpc.h" static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); -static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { - return 0; -} +static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t code = 0; @@ -39,8 +37,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray); for (int32_t i = 0; i < numOfBatchs; ++i) { SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i); - tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid); - + tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->uid); + if (rsp->vgVersion < 0) { code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); } else { @@ -60,8 +58,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog taosHashCleanup(vgInfo.vgHash); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - } - + } + catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, &vgInfo); } @@ -106,8 +104,8 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_SUCCESS; } -static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { - SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); +static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { + SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); return TSDB_CODE_SUCCESS; @@ -116,7 +114,7 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; tscDebug("hb got %d rsp kv", kvNum); - + for (int32_t i = 0; i < kvNum; ++i) { SKv *kv = taosArrayGet(pRsp->info, i); switch (kv->key) { @@ -126,30 +124,30 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs break; } - int64_t *clusterId = (int64_t *)info->param; + int64_t *clusterId = (int64_t *)info->param; struct SCatalog *pCatalog = NULL; - + int32_t code = catalogGetHandle(*clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); break; } hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); break; } - case HEARTBEAT_KEY_STBINFO:{ + case HEARTBEAT_KEY_STBINFO: { if (kv->valueLen <= 0 || NULL == kv->value) { tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value); break; } - int64_t *clusterId = (int64_t *)info->param; + int64_t *clusterId = (int64_t *)info->param; struct SCatalog *pCatalog = NULL; - + int32_t code = catalogGetHandle(*clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); break; } @@ -165,22 +163,22 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs return TSDB_CODE_SUCCESS; } -static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t hbMqAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) { static int32_t emptyRspNum = 0; if (code != 0) { tfree(param); return -1; } - char *key = (char *)param; + char *key = (char *)param; SClientHbBatchRsp pRsp = {0}; tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp); - + int32_t rspNum = taosArrayGetSize(pRsp.rsps); - SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); + SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); if (pInst == NULL || NULL == *pInst) { - tscError("cluster not exist, key:%s", key); + tscError("cluster not exist, key:%s", key); tfree(param); tFreeClientHbBatchRsp(&pRsp); return -1; @@ -189,13 +187,14 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code tfree(param); if (rspNum) { - tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); + tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, + atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); } else { atomic_add_fetch_32(&emptyRspNum, 1); } for (int32_t i = 0; i < rspNum; ++i) { - SClientHbRsp* rsp = taosArrayGet(pRsp.rsps, i); + SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i); code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); if (code) { break; @@ -203,14 +202,14 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code } tFreeClientHbBatchRsp(&pRsp); - + return code; } int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SDbVgVersion *dbs = NULL; - uint32_t dbNum = 0; - int32_t code = 0; + uint32_t dbNum = 0; + int32_t code = 0; code = catalogGetExpiredDBs(pCatalog, &dbs, &dbNum); if (TSDB_CODE_SUCCESS != code) { @@ -238,8 +237,8 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SSTableMetaVersion *stbs = NULL; - uint32_t stbNum = 0; - int32_t code = 0; + uint32_t stbNum = 0; + int32_t code = 0; code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum); if (TSDB_CODE_SUCCESS != code) { @@ -254,7 +253,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC SSTableMetaVersion *stb = &stbs[i]; stb->suid = htobe64(stb->suid); stb->sversion = htons(stb->sversion); - stb->tversion = htons(stb->tversion); + stb->tversion = htons(stb->tversion); } SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs}; @@ -266,17 +265,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC return TSDB_CODE_SUCCESS; } - -int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { - int64_t *clusterId = (int64_t *)param; +int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { + int64_t *clusterId = (int64_t *)param; struct SCatalog *pCatalog = NULL; int32_t code = catalogGetHandle(*clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); return code; } - + code = hbGetExpiredDBInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { return code; @@ -287,13 +285,10 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req return code; } - return TSDB_CODE_SUCCESS; } -int32_t hbMqHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { - -} +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } void hbMgrInitMqHbHandle() { clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; @@ -312,10 +307,8 @@ void hbFreeReq(void *req) { tFreeReqKvHash(pReq->info); } - - -SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { - SClientHbBatchReq* pBatchReq = calloc(1, sizeof(SClientHbBatchReq)); +SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { + SClientHbBatchReq *pBatchReq = calloc(1, sizeof(SClientHbBatchReq)); if (pBatchReq == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; @@ -324,11 +317,11 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); int32_t code = 0; - void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); + void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { - SClientHbReq* pOneReq = pIter; + SClientHbReq *pOneReq = pIter; - SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); + SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); if (info) { code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); if (code) { @@ -350,11 +343,10 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { return pBatchReq; } - void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { - SClientHbReq* pOneReq = pIter; + SClientHbReq *pOneReq = pIter; tFreeReqKvHash(pOneReq->info); taosHashClear(pOneReq->info); @@ -363,31 +355,29 @@ void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { } } - - -static void* hbThreadFunc(void* param) { +static void *hbThreadFunc(void *param) { setThreadName("hb"); while (1) { int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2); - if(1 == threadStop) { + if (1 == threadStop) { break; } pthread_mutex_lock(&clientHbMgr.lock); int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); - for(int i = 0; i < sz; i++) { - SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); + for (int i = 0; i < sz; i++) { + SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); if (connCnt == 0) { continue; } - SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr); + SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr); if (pReq == NULL) { continue; } - int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); + int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq); void *buf = malloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -395,7 +385,7 @@ static void* hbThreadFunc(void* param) { hbClearReqInfo(pAppHbMgr); break; } - + tSerializeSClientHbBatchReq(buf, tlen, pReq); SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo)); @@ -415,17 +405,17 @@ static void* hbThreadFunc(void* param) { pInfo->requestObjRefId = 0; SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo; - int64_t transporterId = 0; - SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); + int64_t transporterId = 0; + SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); - tFreeClientHbBatchReq(pReq, false); + tFreeClientHbBatchReq(pReq, false); hbClearReqInfo(pAppHbMgr); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } pthread_mutex_unlock(&clientHbMgr.lock); - + taosMsleep(HEARTBEAT_INTERVAL); } return NULL; @@ -449,17 +439,18 @@ static void hbStopThread() { tscDebug("hb thread already stopped"); return; } - + while (2 != atomic_load_8(&clientHbMgr.threadStop)) { usleep(10); } - tscDebug("hb thread stopped"); + tscDebug("hb thread stopped"); } -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { +SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { + /*return NULL;*/ hbMgrInit(); - SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); + SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -495,7 +486,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { pthread_mutex_lock(&clientHbMgr.lock); taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); pthread_mutex_unlock(&clientHbMgr.lock); - + return pAppHbMgr; } @@ -504,7 +495,7 @@ void appHbMgrCleanup(void) { int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); for (int i = 0; i < sz; i++) { - SAppHbMgr* pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); + SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); taosHashCleanup(pTarget->activeInfo); pTarget->activeInfo = NULL; taosHashCleanup(pTarget->connInfo); @@ -515,11 +506,12 @@ void appHbMgrCleanup(void) { } int hbMgrInit() { + /*return 0;*/ // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; - clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*)); + clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *)); pthread_mutex_init(&clientHbMgr.lock, NULL); // init handle funcs @@ -532,36 +524,36 @@ int hbMgrInit() { } void hbMgrCleanUp() { - return; + /*return;*/ hbStopThread(); - + // destroy all appHbMgr int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); if (old == 0) return; pthread_mutex_lock(&clientHbMgr.lock); appHbMgrCleanup(); - taosArrayDestroy(clientHbMgr.appHbMgrs); + taosArrayDestroy(clientHbMgr.appHbMgrs); pthread_mutex_unlock(&clientHbMgr.lock); - + clientHbMgr.appHbMgrs = NULL; } -int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { +int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { // init hash in activeinfo - void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (data != NULL) { return 0; } SClientHbReq hbReq; hbReq.connKey = connKey; hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - + taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); - + // init hash if (info != NULL) { - SClientHbReq * pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); info->req = pReq; taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo)); } @@ -570,9 +562,10 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo * return 0; } -int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { +int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { + /*return 0;*/ SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; - SHbConnInfo info = {0}; + SHbConnInfo info = {0}; switch (hbType) { case HEARTBEAT_TYPE_QUERY: { @@ -588,11 +581,12 @@ int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int3 default: break; } - + return hbRegisterConnImpl(pAppHbMgr, connKey, &info); } -void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { +void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { + /*return;*/ int32_t code = 0; code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); @@ -602,9 +596,11 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } -int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { +int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen, + int32_t valueLen) { + return 0; // find req by connection id - SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); ASSERT(pReq != NULL); taosHashPut(pReq->info, key, keyLen, value, valueLen); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index acb96013c9..d9ab23b9fa 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -134,7 +134,7 @@ typedef struct { tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); conf->auto_commit = false; - conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST; + conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; return conf; } @@ -651,13 +651,17 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { + printf("fail\n"); return -1; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { - /*printf("no data\n");*/ + printf("no data\n"); + if (pParam->epoch == tmq->epoch) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } taosFreeQitem(pRsp); return 0; } @@ -732,7 +736,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { return -1; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqCMGetSubEpRsp(pMsg->pData, &pRsp->getEpRsp); + tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp); taosWriteQitem(tmq->mqueue, pRsp); } return 0; @@ -973,13 +977,18 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmqPollImpl(tmq, blocking_time); while (1) { + /*printf("cycle\n");*/ taosReadAllQitems(tmq->mqueue, tmq->qall); - tmqHandleAllRsp(tmq, blocking_time, true); - int64_t endTime = taosGetTimestampMs(); - if (endTime - startTime > blocking_time) { - printf("cycle end\n"); - usleep(1000 * 1000); - return NULL; + rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); + if (rspMsg) { + return rspMsg; + } + if (blocking_time != 0) { + int64_t endTime = taosGetTimestampMs(); + if (endTime - startTime > blocking_time) { + printf("normal exit\n"); + return NULL; + } } } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5f5a434ec9..aa198d0806 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -278,14 +278,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { rsp.numOfTopics = 1; rsp.pBlockData = pRes; - int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp); void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; return -1; } + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; - void* abuf = buf; + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqConsumeRsp(&abuf, &rsp); taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); pMsg->pCont = buf; diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 2d77df9b43..751bbdbb09 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -682,4 +682,4 @@ int32_t taosEOFFile(TdFilePtr pFile) { assert(pFile->fp != NULL); return feof(pFile->fp); -} \ No newline at end of file +} -- GitLab