diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0e309a86311bbd02a1219664a4a6a043a0a99a60..dcb30a65767737b264ea08cbdd47e08c6e84ba72 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -288,7 +288,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req return TSDB_CODE_SUCCESS; } -int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {} void hbMgrInitMqHbHandle() { clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; @@ -448,7 +448,7 @@ static void hbStopThread() { } SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { - /*return NULL;*/ + return NULL; hbMgrInit(); SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { @@ -506,7 +506,7 @@ void appHbMgrCleanup(void) { } int hbMgrInit() { - /*return 0;*/ + return 0; // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; @@ -524,7 +524,7 @@ int hbMgrInit() { } void hbMgrCleanUp() { - /*return;*/ + return; hbStopThread(); // destroy all appHbMgr @@ -563,7 +563,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * } int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { - /*return 0;*/ + return 0; SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; SHbConnInfo info = {0}; @@ -586,7 +586,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3 } void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { - /*return;*/ + return; int32_t code = 0; code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index d9ab23b9fa29ce819d96afd90bae5b0a06bb3bd9..4d740bcae8f45953ae237464b67c5d8494b44276 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -659,9 +659,12 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { printf("no data\n"); +<<<<<<< Updated upstream if (pParam->epoch == tmq->epoch) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } +======= +>>>>>>> Stashed changes taosFreeQitem(pRsp); return 0; } @@ -979,6 +982,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { while (1) { /*printf("cycle\n");*/ taosReadAllQitems(tmq->mqueue, tmq->qall); +<<<<<<< Updated upstream rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); if (rspMsg) { return rspMsg; @@ -990,6 +994,14 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { return NULL; } } +======= + tmqHandleAllRsp(tmq, blocking_time, true); + /*if (blocking_time != 0 && endTime - startTime > blocking_time) {*/ + /*int64_t endTime = taosGetTimestampMs();*/ + /*printf("normal exit\n");*/ + /*return NULL;*/ + /*}*/ +>>>>>>> Stashed changes } } diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 751bbdbb09d3c2ca5792043d204a12b5c79b0acc..aaabf40e05ed7d6b3a34d886b8b67bd96370d3d4 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -196,11 +196,11 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { char *mode = NULL; if (tdFileOptions & TD_FILE_APPEND) { mode = (tdFileOptions & TD_FILE_TEXT) ? "at+" : "ab+"; - }else if (tdFileOptions & TD_FILE_TRUNC) { + } else if (tdFileOptions & TD_FILE_TRUNC) { mode = (tdFileOptions & TD_FILE_TEXT) ? "wt+" : "wb+"; - }else if ((tdFileOptions & TD_FILE_READ) && !(tdFileOptions & TD_FILE_WRITE)) { + } else if ((tdFileOptions & TD_FILE_READ) && !(tdFileOptions & TD_FILE_WRITE)) { mode = (tdFileOptions & TD_FILE_TEXT) ? "rt" : "rb"; - }else { + } else { mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+"; } assert(!(tdFileOptions & TD_FILE_EXCL)); @@ -637,7 +637,7 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) { } assert(pFile->fp != NULL); - char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0}; + char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0}; va_list ap; va_start(ap, format); vfprintf(pFile->fp, format, ap); @@ -675,11 +675,15 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict__ ptrBuf) { size_t len = 0; return getline(ptrBuf, &len, pFile->fp); } -int32_t taosEOFFile(TdFilePtr pFile) { +int32_t taosEOFFile(TdFilePtr pFile) { if (pFile == NULL) { return 0; } assert(pFile->fp != NULL); +<<<<<<< Updated upstream return feof(pFile->fp); +======= + return feof(pFile->fp); +>>>>>>> Stashed changes }