提交 0d262b87 编写于 作者: S Shengliang Guan

fix: definite lost in tlog.c found by valgrind

上级 884bf377
......@@ -39,7 +39,7 @@ int32_t taosMemorySize(void *ptr);
#define taosMemoryFreeClear(ptr) \
do { \
if (ptr) { \
taosMemoryFree((void*)ptr); \
taosMemoryFree((void *)ptr); \
(ptr) = NULL; \
} \
} while (0)
......
......@@ -69,9 +69,9 @@ void taos_cleanup(void) {
rpcCleanup();
catalogDestroy();
schedulerDestroy();
taosCloseLog();
tscInfo("all local resources released");
taosCloseLog();
}
setConfRet taos_set_config(const char *config) {
......
......@@ -47,7 +47,6 @@ typedef struct {
int32_t stop;
TdThread asyncThread;
TdThreadMutex buffMutex;
tsem_t buffNotEmpty;
} SLogBuff;
typedef struct {
......@@ -100,7 +99,7 @@ int64_t dbgBigWN = 0;
int64_t dbgWSize = 0;
static void *taosAsyncOutputLog(void *param);
static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, const char *msg, int32_t msgLen);
static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen);
static SLogBuff *taosLogBuffNew(int32_t bufSize);
static void taosCloseLogByFd(TdFilePtr pFile);
static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum);
......@@ -136,16 +135,24 @@ static void taosStopLog() {
}
}
static void taosLogBuffDestroy() {
taosThreadMutexDestroy(&tsLogObj.logHandle->buffMutex);
taosCloseFile(&tsLogObj.logHandle->pFile);
taosMemoryFreeClear(tsLogObj.logHandle->buffer);
memset(&tsLogObj.logHandle->buffer, 0, sizeof(tsLogObj.logHandle->buffer));
taosThreadMutexDestroy(&tsLogObj.logMutex);
taosMemoryFreeClear(tsLogObj.logHandle);
memset(&tsLogObj.logHandle, 0, sizeof(tsLogObj.logHandle));
tsLogObj.logHandle = NULL;
}
void taosCloseLog() {
taosStopLog();
if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL);
}
tsLogInited = 0;
// In case that other threads still use log resources causing invalid write in valgrind
// we comment two lines below.
// taosLogBuffDestroy(tsLogObj.logHandle);
// taosCloseLog();
taosLogBuffDestroy(tsLogObj.logHandle);
}
static bool taosLockLogFile(TdFilePtr pFile) {
......@@ -506,45 +513,45 @@ static void taosCloseLogByFd(TdFilePtr pFile) {
}
static SLogBuff *taosLogBuffNew(int32_t bufSize) {
SLogBuff *tLogBuff = NULL;
SLogBuff *pLogBuf = NULL;
tLogBuff = taosMemoryCalloc(1, sizeof(SLogBuff));
if (tLogBuff == NULL) return NULL;
pLogBuf = taosMemoryCalloc(1, sizeof(SLogBuff));
if (pLogBuf == NULL) return NULL;
LOG_BUF_BUFFER(tLogBuff) = taosMemoryMalloc(bufSize);
if (LOG_BUF_BUFFER(tLogBuff) == NULL) goto _err;
LOG_BUF_BUFFER(pLogBuf) = taosMemoryMalloc(bufSize);
if (LOG_BUF_BUFFER(pLogBuf) == NULL) goto _err;
LOG_BUF_START(tLogBuff) = LOG_BUF_END(tLogBuff) = 0;
LOG_BUF_SIZE(tLogBuff) = bufSize;
tLogBuff->minBuffSize = bufSize / 10;
tLogBuff->stop = 0;
LOG_BUF_START(pLogBuf) = LOG_BUF_END(pLogBuf) = 0;
LOG_BUF_SIZE(pLogBuf) = bufSize;
pLogBuf->minBuffSize = bufSize / 10;
pLogBuf->stop = 0;
if (taosThreadMutexInit(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err;
// tsem_init(&(tLogBuff->buffNotEmpty), 0, 0);
if (taosThreadMutexInit(&LOG_BUF_MUTEX(pLogBuf), NULL) < 0) goto _err;
// tsem_init(&(pLogBuf->buffNotEmpty), 0, 0);
return tLogBuff;
return pLogBuf;
_err:
taosMemoryFreeClear(LOG_BUF_BUFFER(tLogBuff));
taosMemoryFreeClear(tLogBuff);
taosMemoryFreeClear(LOG_BUF_BUFFER(pLogBuf));
taosMemoryFreeClear(pLogBuf);
return NULL;
}
static void taosCopyLogBuffer(SLogBuff *tLogBuff, int32_t start, int32_t end, const char *msg, int32_t msgLen) {
static void taosCopyLogBuffer(SLogBuff *pLogBuf, int32_t start, int32_t end, const char *msg, int32_t msgLen) {
if (start > end) {
memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, msgLen);
memcpy(LOG_BUF_BUFFER(pLogBuf) + end, msg, msgLen);
} else {
if (LOG_BUF_SIZE(tLogBuff) - end < msgLen) {
memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, LOG_BUF_SIZE(tLogBuff) - end);
memcpy(LOG_BUF_BUFFER(tLogBuff), msg + LOG_BUF_SIZE(tLogBuff) - end, msgLen - LOG_BUF_SIZE(tLogBuff) + end);
if (LOG_BUF_SIZE(pLogBuf) - end < msgLen) {
memcpy(LOG_BUF_BUFFER(pLogBuf) + end, msg, LOG_BUF_SIZE(pLogBuf) - end);
memcpy(LOG_BUF_BUFFER(pLogBuf), msg + LOG_BUF_SIZE(pLogBuf) - end, msgLen - LOG_BUF_SIZE(pLogBuf) + end);
} else {
memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, msgLen);
memcpy(LOG_BUF_BUFFER(pLogBuf) + end, msg, msgLen);
}
}
LOG_BUF_END(tLogBuff) = (LOG_BUF_END(tLogBuff) + msgLen) % LOG_BUF_SIZE(tLogBuff);
LOG_BUF_END(pLogBuf) = (LOG_BUF_END(pLogBuf) + msgLen) % LOG_BUF_SIZE(pLogBuf);
}
static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, const char *msg, int32_t msgLen) {
static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen) {
int32_t start = 0;
int32_t end = 0;
int32_t remainSize = 0;
......@@ -552,13 +559,13 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, const char *msg, int32_t ms
char tmpBuf[40] = {0};
int32_t tmpBufLen = 0;
if (tLogBuff == NULL || tLogBuff->stop) return -1;
if (pLogBuf == NULL || pLogBuf->stop) return -1;
taosThreadMutexLock(&LOG_BUF_MUTEX(tLogBuff));
start = LOG_BUF_START(tLogBuff);
end = LOG_BUF_END(tLogBuff);
taosThreadMutexLock(&LOG_BUF_MUTEX(pLogBuf));
start = LOG_BUF_START(pLogBuf);
end = LOG_BUF_END(pLogBuf);
remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1);
remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(pLogBuf) - end - 1);
if (lostLine > 0) {
sprintf(tmpBuf, "...Lost %" PRId64 " lines here...\n", lostLine);
......@@ -568,47 +575,47 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, const char *msg, int32_t ms
if (remainSize <= msgLen || ((lostLine > 0) && (remainSize <= (msgLen + tmpBufLen)))) {
lostLine++;
tsAsyncLogLostLines++;
taosThreadMutexUnlock(&LOG_BUF_MUTEX(tLogBuff));
taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf));
return -1;
}
if (lostLine > 0) {
taosCopyLogBuffer(tLogBuff, start, end, tmpBuf, tmpBufLen);
taosCopyLogBuffer(pLogBuf, start, end, tmpBuf, tmpBufLen);
lostLine = 0;
}
taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen);
taosCopyLogBuffer(pLogBuf, LOG_BUF_START(pLogBuf), LOG_BUF_END(pLogBuf), msg, msgLen);
// int32_t w = atomic_sub_fetch_32(&waitLock, 1);
/*
if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) {
tsem_post(&(tLogBuff->buffNotEmpty));
if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(pLogBuf) * 4 /5))) {
tsem_post(&(pLogBuf->buffNotEmpty));
dbgPostN++;
} else {
dbgNoPostN++;
}
*/
taosThreadMutexUnlock(&LOG_BUF_MUTEX(tLogBuff));
taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf));
return 0;
}
static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff, int32_t start, int32_t end) {
static int32_t taosGetLogRemainSize(SLogBuff *pLogBuf, int32_t start, int32_t end) {
int32_t rSize = end - start;
return rSize >= 0 ? rSize : LOG_BUF_SIZE(tLogBuff) + rSize;
return rSize >= 0 ? rSize : LOG_BUF_SIZE(pLogBuf) + rSize;
}
static void taosWriteLog(SLogBuff *tLogBuff) {
static void taosWriteLog(SLogBuff *pLogBuf) {
static int32_t lastDuration = 0;
int32_t remainChecked = 0;
int32_t start, end, pollSize;
do {
if (remainChecked == 0) {
start = LOG_BUF_START(tLogBuff);
end = LOG_BUF_END(tLogBuff);
start = LOG_BUF_START(pLogBuf);
end = LOG_BUF_END(pLogBuf);
if (start == end) {
dbgEmptyW++;
......@@ -616,8 +623,8 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
return;
}
pollSize = taosGetLogRemainSize(tLogBuff, start, end);
if (pollSize < tLogBuff->minBuffSize) {
pollSize = taosGetLogRemainSize(pLogBuf, start, end);
if (pollSize < pLogBuf->minBuffSize) {
lastDuration += tsWriteInterval;
if (lastDuration < LOG_MAX_WAIT_MSEC) {
break;
......@@ -628,38 +635,38 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
}
if (start < end) {
taosWriteFile(tLogBuff->pFile, LOG_BUF_BUFFER(tLogBuff) + start, pollSize);
taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, pollSize);
} else {
int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start;
taosWriteFile(tLogBuff->pFile, LOG_BUF_BUFFER(tLogBuff) + start, tsize);
int32_t tsize = LOG_BUF_SIZE(pLogBuf) - start;
taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, tsize);
taosWriteFile(tLogBuff->pFile, LOG_BUF_BUFFER(tLogBuff), end);
taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf), end);
}
dbgWN++;
dbgWSize += pollSize;
if (pollSize < tLogBuff->minBuffSize) {
if (pollSize < pLogBuf->minBuffSize) {
dbgSmallWN++;
if (tsWriteInterval < LOG_MAX_INTERVAL) {
tsWriteInterval += LOG_INTERVAL_STEP;
}
} else if (pollSize > LOG_BUF_SIZE(tLogBuff) / 3) {
} else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 3) {
dbgBigWN++;
tsWriteInterval = LOG_MIN_INTERVAL;
} else if (pollSize > LOG_BUF_SIZE(tLogBuff) / 4) {
} else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 4) {
if (tsWriteInterval > LOG_MIN_INTERVAL) {
tsWriteInterval -= LOG_INTERVAL_STEP;
}
}
LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + pollSize) % LOG_BUF_SIZE(tLogBuff);
LOG_BUF_START(pLogBuf) = (LOG_BUF_START(pLogBuf) + pollSize) % LOG_BUF_SIZE(pLogBuf);
start = LOG_BUF_START(tLogBuff);
end = LOG_BUF_END(tLogBuff);
start = LOG_BUF_START(pLogBuf);
end = LOG_BUF_END(pLogBuf);
pollSize = taosGetLogRemainSize(tLogBuff, start, end);
if (pollSize < tLogBuff->minBuffSize) {
pollSize = taosGetLogRemainSize(pLogBuf, start, end);
if (pollSize < pLogBuf->minBuffSize) {
break;
}
......@@ -670,16 +677,16 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
}
static void *taosAsyncOutputLog(void *param) {
SLogBuff *tLogBuff = (SLogBuff *)param;
SLogBuff *pLogBuf = (SLogBuff *)param;
setThreadName("log");
while (1) {
taosMsleep(tsWriteInterval);
// Polling the buffer
taosWriteLog(tLogBuff);
taosWriteLog(pLogBuf);
if (tLogBuff->stop) break;
if (pLogBuf->stop) break;
}
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册