提交 4d0b8966 编写于 作者: L Liu Jicong

merge from 3.0

上级 b5f5400d
...@@ -288,7 +288,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req ...@@ -288,7 +288,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {}
void hbMgrInitMqHbHandle() { void hbMgrInitMqHbHandle() {
clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle;
...@@ -448,7 +448,7 @@ static void hbStopThread() { ...@@ -448,7 +448,7 @@ static void hbStopThread() {
} }
SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
/*return NULL;*/ return NULL;
hbMgrInit(); hbMgrInit();
SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) { if (pAppHbMgr == NULL) {
...@@ -506,7 +506,7 @@ void appHbMgrCleanup(void) { ...@@ -506,7 +506,7 @@ void appHbMgrCleanup(void) {
} }
int hbMgrInit() { int hbMgrInit() {
/*return 0;*/ return 0;
// init once // init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0; if (old == 1) return 0;
...@@ -524,7 +524,7 @@ int hbMgrInit() { ...@@ -524,7 +524,7 @@ int hbMgrInit() {
} }
void hbMgrCleanUp() { void hbMgrCleanUp() {
/*return;*/ return;
hbStopThread(); hbStopThread();
// destroy all appHbMgr // destroy all appHbMgr
...@@ -563,7 +563,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * ...@@ -563,7 +563,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
} }
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
/*return 0;*/ return 0;
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
SHbConnInfo info = {0}; SHbConnInfo info = {0};
...@@ -586,7 +586,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3 ...@@ -586,7 +586,7 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3
} }
void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
/*return;*/ return;
int32_t code = 0; int32_t code = 0;
code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
......
...@@ -659,9 +659,12 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -659,9 +659,12 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if (pRsp->consumeRsp.numOfTopics == 0) { if (pRsp->consumeRsp.numOfTopics == 0) {
printf("no data\n"); printf("no data\n");
<<<<<<< Updated upstream
if (pParam->epoch == tmq->epoch) { if (pParam->epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
} }
=======
>>>>>>> Stashed changes
taosFreeQitem(pRsp); taosFreeQitem(pRsp);
return 0; return 0;
} }
...@@ -979,6 +982,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -979,6 +982,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
while (1) { while (1) {
/*printf("cycle\n");*/ /*printf("cycle\n");*/
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
<<<<<<< Updated upstream
rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); rspMsg = tmqHandleAllRsp(tmq, blocking_time, true);
if (rspMsg) { if (rspMsg) {
return rspMsg; return rspMsg;
...@@ -990,6 +994,14 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -990,6 +994,14 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
return NULL; return NULL;
} }
} }
=======
tmqHandleAllRsp(tmq, blocking_time, true);
/*if (blocking_time != 0 && endTime - startTime > blocking_time) {*/
/*int64_t endTime = taosGetTimestampMs();*/
/*printf("normal exit\n");*/
/*return NULL;*/
/*}*/
>>>>>>> Stashed changes
} }
} }
......
...@@ -196,11 +196,11 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { ...@@ -196,11 +196,11 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
char *mode = NULL; char *mode = NULL;
if (tdFileOptions & TD_FILE_APPEND) { if (tdFileOptions & TD_FILE_APPEND) {
mode = (tdFileOptions & TD_FILE_TEXT) ? "at+" : "ab+"; mode = (tdFileOptions & TD_FILE_TEXT) ? "at+" : "ab+";
}else if (tdFileOptions & TD_FILE_TRUNC) { } else if (tdFileOptions & TD_FILE_TRUNC) {
mode = (tdFileOptions & TD_FILE_TEXT) ? "wt+" : "wb+"; mode = (tdFileOptions & TD_FILE_TEXT) ? "wt+" : "wb+";
}else if ((tdFileOptions & TD_FILE_READ) && !(tdFileOptions & TD_FILE_WRITE)) { } else if ((tdFileOptions & TD_FILE_READ) && !(tdFileOptions & TD_FILE_WRITE)) {
mode = (tdFileOptions & TD_FILE_TEXT) ? "rt" : "rb"; mode = (tdFileOptions & TD_FILE_TEXT) ? "rt" : "rb";
}else { } else {
mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+"; mode = (tdFileOptions & TD_FILE_TEXT) ? "rt+" : "rb+";
} }
assert(!(tdFileOptions & TD_FILE_EXCL)); assert(!(tdFileOptions & TD_FILE_EXCL));
...@@ -637,7 +637,7 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) { ...@@ -637,7 +637,7 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
} }
assert(pFile->fp != NULL); assert(pFile->fp != NULL);
char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0}; char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0};
va_list ap; va_list ap;
va_start(ap, format); va_start(ap, format);
vfprintf(pFile->fp, format, ap); vfprintf(pFile->fp, format, ap);
...@@ -675,11 +675,15 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict__ ptrBuf) { ...@@ -675,11 +675,15 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict__ ptrBuf) {
size_t len = 0; size_t len = 0;
return getline(ptrBuf, &len, pFile->fp); return getline(ptrBuf, &len, pFile->fp);
} }
int32_t taosEOFFile(TdFilePtr pFile) { int32_t taosEOFFile(TdFilePtr pFile) {
if (pFile == NULL) { if (pFile == NULL) {
return 0; return 0;
} }
assert(pFile->fp != NULL); assert(pFile->fp != NULL);
<<<<<<< Updated upstream
return feof(pFile->fp); return feof(pFile->fp);
=======
return feof(pFile->fp);
>>>>>>> Stashed changes
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册