diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c index ae12e25d5666bd3d6b3f9f996d4429c1609bea88..9770da49acf70fdc339ee5b6497ac213602e3f76 100644 --- a/src/util/src/tlog.c +++ b/src/util/src/tlog.c @@ -28,10 +28,13 @@ #define MAX_LOGLINE_DUMP_CONTENT_SIZE (MAX_LOGLINE_DUMP_SIZE - 100) #define LOG_FILE_NAME_LEN 300 -#define TSDB_DEFAULT_LOG_BUF_SIZE (512 * 1024) // 512K -#define TSDB_MIN_LOG_BUF_SIZE 1024 // 1K -#define TSDB_MAX_LOG_BUF_SIZE (1024 * 1024) // 1M -#define TSDB_DEFAULT_LOG_BUF_UNIT 1024 // 1K +#define TSDB_DEFAULT_LOG_BUF_SIZE (20 * 1024 * 1024) // 20MB + +#define DEFAULT_LOG_INTERVAL 25 +#define LOG_INTERVAL_STEP 5 +#define MIN_LOG_INTERVAL 5 +#define MAX_LOG_INTERVAL 25 +#define LOG_MAX_WAIT_MSEC 1000 #define LOG_BUF_BUFFER(x) ((x)->buffer) #define LOG_BUF_START(x) ((x)->buffStart) @@ -44,6 +47,7 @@ typedef struct { int32_t buffStart; int32_t buffEnd; int32_t buffSize; + int32_t minBuffSize; int32_t fd; int32_t stop; pthread_t asyncThread; @@ -68,6 +72,15 @@ int8_t tsAsyncLog = 1; float tsTotalLogDirGB = 0; float tsAvailLogDirGB = 0; float tsMinimalLogDirGB = 1.0f; +int64_t asyncLogLostLines = 0; +int32_t writeInterval = DEFAULT_LOG_INTERVAL; + +int64_t dbgEmptyW = 0; +int64_t dbgWN = 0; +int64_t dbgSmallWN = 0; +int64_t dbgBigWN = 0; +int64_t dbgWSize = 0; + #ifdef _TD_POWER_ char tsLogDir[TSDB_FILENAME_LEN] = "/var/log/power"; #else @@ -108,7 +121,8 @@ static void taosStopLog() { void taosCloseLog() { taosStopLog(); - tsem_post(&(tsLogObj.logHandle->buffNotEmpty)); + //tsem_post(&(tsLogObj.logHandle->buffNotEmpty)); + taosMsleep(MAX_LOG_INTERVAL/1000); if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { pthread_join(tsLogObj.logHandle->asyncThread, NULL); } @@ -497,8 +511,6 @@ static void taosCloseLogByFd(int32_t fd) { static SLogBuff *taosLogBuffNew(int32_t bufSize) { SLogBuff *tLogBuff = NULL; - if (bufSize < TSDB_MIN_LOG_BUF_SIZE || bufSize > TSDB_MAX_LOG_BUF_SIZE) return NULL; - tLogBuff = calloc(1, sizeof(SLogBuff)); if (tLogBuff == NULL) return NULL; @@ -507,10 +519,11 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) { LOG_BUF_START(tLogBuff) = LOG_BUF_END(tLogBuff) = 0; LOG_BUF_SIZE(tLogBuff) = bufSize; + tLogBuff->minBuffSize = bufSize / 10; tLogBuff->stop = 0; if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err; - tsem_init(&(tLogBuff->buffNotEmpty), 0, 0); + //tsem_init(&(tLogBuff->buffNotEmpty), 0, 0); return tLogBuff; @@ -529,10 +542,27 @@ static void taosLogBuffDestroy(SLogBuff *tLogBuff) { } #endif +static void taosCopyLogBuffer(SLogBuff *tLogBuff, int32_t start, int32_t end, char *msg, int32_t msgLen) { + if (start > end) { + memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, msgLen); + } else { + if (LOG_BUF_SIZE(tLogBuff) - end < msgLen) { + memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, LOG_BUF_SIZE(tLogBuff) - end); + memcpy(LOG_BUF_BUFFER(tLogBuff), msg + LOG_BUF_SIZE(tLogBuff) - end, msgLen - LOG_BUF_SIZE(tLogBuff) + end); + } else { + memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, msgLen); + } + } + LOG_BUF_END(tLogBuff) = (LOG_BUF_END(tLogBuff) + msgLen) % LOG_BUF_SIZE(tLogBuff); +} + static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) { int32_t start = 0; int32_t end = 0; int32_t remainSize = 0; + static int64_t lostLine = 0; + char tmpBuf[40] = {0}; + int32_t tmpBufLen = 0; if (tLogBuff == NULL || tLogBuff->stop) return -1; @@ -540,79 +570,128 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) start = LOG_BUF_START(tLogBuff); end = LOG_BUF_END(tLogBuff); - remainSize = (start > end) ? (end - start - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1); + remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1); + + if (lostLine > 0) { + sprintf(tmpBuf, "...Lost %"PRId64" lines here...\n", lostLine); + tmpBufLen = (int32_t)strlen(tmpBuf); + } - if (remainSize <= msgLen) { + if (remainSize <= msgLen || ((lostLine > 0) && (remainSize <= (msgLen + tmpBufLen)))) { + lostLine++; + asyncLogLostLines++; pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff)); return -1; } - if (start > end) { - memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, msgLen); - } else { - if (LOG_BUF_SIZE(tLogBuff) - end < msgLen) { - memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, LOG_BUF_SIZE(tLogBuff) - end); - memcpy(LOG_BUF_BUFFER(tLogBuff), msg + LOG_BUF_SIZE(tLogBuff) - end, msgLen - LOG_BUF_SIZE(tLogBuff) + end); - } else { - memcpy(LOG_BUF_BUFFER(tLogBuff) + end, msg, msgLen); - } + if (lostLine > 0) { + taosCopyLogBuffer(tLogBuff, start, end, tmpBuf, tmpBufLen); + lostLine = 0; } - LOG_BUF_END(tLogBuff) = (LOG_BUF_END(tLogBuff) + msgLen) % LOG_BUF_SIZE(tLogBuff); - // TODO : put string in the buffer + taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen); - tsem_post(&(tLogBuff->buffNotEmpty)); + //int32_t w = atomic_sub_fetch_32(&waitLock, 1); + /* + if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) { + tsem_post(&(tLogBuff->buffNotEmpty)); + dbgPostN++; + } else { + dbgNoPostN++; + } + */ pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff)); + return 0; } -static int32_t taosPollLogBuffer(SLogBuff *tLogBuff, char *buf, int32_t bufSize) { - int32_t start = LOG_BUF_START(tLogBuff); - int32_t end = LOG_BUF_END(tLogBuff); - int32_t pollSize = 0; +static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff, int32_t start, int32_t end) { + int32_t rSize = end - start; - if (start == end) { - return 0; - } else if (start < end) { - pollSize = MIN(end - start, bufSize); + return rSize >= 0 ? rSize : LOG_BUF_SIZE(tLogBuff) + rSize; +} - memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); - return pollSize; - } else { - pollSize = MIN(end + LOG_BUF_SIZE(tLogBuff) - start, bufSize); - if (pollSize > LOG_BUF_SIZE(tLogBuff) - start) { - int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start; - memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, tsize); - memcpy(buf + tsize, LOG_BUF_BUFFER(tLogBuff), pollSize - tsize); +static void taosWriteLog(SLogBuff *tLogBuff) { + static int32_t lastDuration = 0; + int32_t remainChecked = 0; + int32_t start, end, pollSize; + + do { + if (remainChecked == 0) { + start = LOG_BUF_START(tLogBuff); + end = LOG_BUF_END(tLogBuff); + + if (start == end) { + dbgEmptyW++; + writeInterval = MAX_LOG_INTERVAL; + return; + } + pollSize = taosGetLogRemainSize(tLogBuff, start, end); + if (pollSize < tLogBuff->minBuffSize) { + lastDuration += writeInterval; + if (lastDuration < LOG_MAX_WAIT_MSEC) { + break; + } + } + + lastDuration = 0; + } + + if (start < end) { + taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); } else { - memcpy(buf, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); + int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start; + taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize); + + taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end); } - return pollSize; - } + + dbgWN++; + dbgWSize+=pollSize; + + if (pollSize < tLogBuff->minBuffSize) { + dbgSmallWN++; + if (writeInterval < MAX_LOG_INTERVAL) { + writeInterval += LOG_INTERVAL_STEP; + } + } else if (pollSize > LOG_BUF_SIZE(tLogBuff)/3) { + dbgBigWN++; + writeInterval = MIN_LOG_INTERVAL; + } else if (pollSize > LOG_BUF_SIZE(tLogBuff)/4) { + if (writeInterval > MIN_LOG_INTERVAL) { + writeInterval -= LOG_INTERVAL_STEP; + } + } + + LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + pollSize) % LOG_BUF_SIZE(tLogBuff); + + start = LOG_BUF_START(tLogBuff); + end = LOG_BUF_END(tLogBuff); + + pollSize = taosGetLogRemainSize(tLogBuff, start, end); + if (pollSize < tLogBuff->minBuffSize) { + break; + } + + writeInterval = MIN_LOG_INTERVAL; + + remainChecked = 1; + }while (1); } static void *taosAsyncOutputLog(void *param) { SLogBuff *tLogBuff = (SLogBuff *)param; - int32_t log_size = 0; - - char tempBuffer[TSDB_DEFAULT_LOG_BUF_UNIT]; - + while (1) { - tsem_wait(&(tLogBuff->buffNotEmpty)); + //tsem_wait(&(tLogBuff->buffNotEmpty)); + + taosMsleep(writeInterval); // Polling the buffer - while (1) { - log_size = taosPollLogBuffer(tLogBuff, tempBuffer, TSDB_DEFAULT_LOG_BUF_UNIT); - if (log_size) { - taosWrite(tLogBuff->fd, tempBuffer, log_size); - LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + log_size) % LOG_BUF_SIZE(tLogBuff); - } else { - break; - } - } + taosWriteLog(tLogBuff); if (tLogBuff->stop) break; }