未验证 提交 f22d279f 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21057 from taosdata/feat/TD-19567

feat: support log slow query by default
......@@ -24,6 +24,12 @@
extern "C" {
#endif
#define SLOW_LOG_TYPE_QUERY 0x1
#define SLOW_LOG_TYPE_INSERT 0x2
#define SLOW_LOG_TYPE_OTHERS 0x4
#define SLOW_LOG_TYPE_ALL 0xFFFFFFFF
// cluster
extern char tsFirst[];
extern char tsSecond[];
......@@ -118,6 +124,8 @@ extern int32_t tsRedirectFactor;
extern int32_t tsRedirectMaxPeriod;
extern int32_t tsMaxRetryWaitTime;
extern bool tsUseAdapter;
extern int32_t tsSlowLogThreshold;
extern int32_t tsSlowLogScope;
// client
extern int32_t tsMinSlidingTime;
......
......@@ -103,7 +103,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x011F) // internal
#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0120)
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121) //
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121)
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0122)
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0123)
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0124)
......@@ -118,9 +118,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D)
#define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E)
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) //
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) //
#define TSDB_CODE_IVLD_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132) //
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130)
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131)
#define TSDB_CODE_INVALID_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132)
#define TSDB_CODE_INVALID_CFG_VALUE TAOS_DEF_ERROR_CODE(0, 0x0133)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
......
......@@ -83,6 +83,12 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
#endif
;
void taosPrintSlowLog(const char *format, ...)
#ifdef __GNUC__
__attribute__((format(printf, 1, 2)))
#endif
;
bool taosAssertDebug(bool condition, const char *file, int32_t line, const char *format, ...);
bool taosAssertRelease(bool condition);
......
......@@ -42,7 +42,7 @@ SAppInfo appInfo;
int64_t lastClusterId = 0;
int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1;
int32_t clientStop = 0;
int32_t clientStop = -1;
int32_t timestampDeltaLimit = 900; // s
......@@ -69,7 +69,6 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
}
static void deregisterRequest(SRequestObj *pRequest) {
const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable
if (pRequest == NULL) {
tscError("pRequest == NULL");
return;
......@@ -80,6 +79,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int32_t reqType = SLOW_LOG_TYPE_OTHERS;
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64
......@@ -95,6 +95,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
reqType = SLOW_LOG_TYPE_INSERT;
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
......@@ -102,12 +103,16 @@ static void deregisterRequest(SRequestObj *pRequest) {
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
reqType = SLOW_LOG_TYPE_QUERY;
}
}
if (duration >= SLOW_QUERY_INTERVAL) {
if (duration >= (tsSlowLogThreshold * 1000000UL)) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
tscWarnL("slow query: %s, duration:%" PRId64, pRequest->sqlstr, duration);
if (tsSlowLogScope & reqType) {
taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s",
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr);
}
}
releaseTscObj(pTscObj->id);
......@@ -427,8 +432,12 @@ static void *tscCrashReportThreadFp(void *param) {
}
#endif
if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
return NULL;
}
while (1) {
if (clientStop) break;
if (clientStop > 0) break;
if (loopTimes++ < reportPeriodNum) {
taosMsleep(sleepTime);
continue;
......@@ -466,7 +475,7 @@ static void *tscCrashReportThreadFp(void *param) {
loopTimes = 0;
}
clientStop = -1;
clientStop = -2;
return NULL;
}
......
......@@ -1248,6 +1248,11 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return NULL;
}
pRequest->sqlstr = taosStrdup("taos_connect");
if (pRequest->sqlstr) {
pRequest->sqlLen = strlen(pRequest->sqlstr);
}
SMsgSendInfo* body = buildConnectMsg(pRequest);
int64_t transporterId = 0;
......@@ -1257,7 +1262,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char* errorMsg =
(pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
fprintf(stderr, "failed to connect to server, reason: %s\n\n", errorMsg);
tscError("failed to connect to server, reason: %s", errorMsg);
terrno = pRequest->code;
destroyRequest(pRequest);
......
......@@ -660,6 +660,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
SMCreateStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS;
SCmdMsgInfo pCmdMsg = {0};
char *pSql = NULL;
// put front for free
pReq.numOfColumns = taosArrayGetSize(pColumns);
......@@ -667,32 +668,35 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pReq.numOfTags = taosArrayGetSize(pTags);
pReq.pTags = pTags;
code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
if (action == SCHEMA_ACTION_CREATE_STABLE) {
pReq.colVer = 1;
pReq.tagVer = 1;
pReq.suid = 0;
pReq.source = TD_REQ_FROM_APP;
pSql = "sml_create_stable";
} else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
pReq.colVer = pTableMeta->sversion;
pReq.tagVer = pTableMeta->tversion + 1;
pReq.suid = pTableMeta->uid;
pReq.source = TD_REQ_FROM_TAOX;
pSql = (action == SCHEMA_ACTION_ADD_TAG) ? "sml_add_tag" : "sml_modify_tag_size";
} else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
pReq.colVer = pTableMeta->sversion + 1;
pReq.tagVer = pTableMeta->tversion;
pReq.suid = pTableMeta->uid;
pReq.source = TD_REQ_FROM_TAOX;
pSql = (action == SCHEMA_ACTION_ADD_COLUMN) ? "sml_add_column" : "sml_modify_column_size";
}
code = buildRequest(info->taos->id, pSql, strlen(pSql), NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
if (pReq.numOfTags == 0) {
......@@ -1514,6 +1518,44 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
return code;
}
void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) {
if (tsSlowLogScope & SLOW_LOG_TYPE_INSERT) {
int32_t len = 0;
int32_t rlen = 0;
char* p = NULL;
if (lines && lines[0]) {
len = strlen(lines[0]);
p = lines[0];
} else if (rawLine) {
if (rawLineEnd) {
len = rawLineEnd - rawLine;
} else {
len = strlen(rawLine);
}
p = rawLine;
}
if (NULL == p) {
return;
}
rlen = TMIN(len, TSDB_MAX_ALLOWED_SQL_LEN);
rlen = TMAX(rlen, 0);
char *sql = taosMemoryMalloc(rlen + 1);
if (NULL == sql) {
uError("malloc %d for sml sql failed", rlen + 1);
return;
}
memcpy(sql, p, rlen);
sql[rlen] = 0;
request->sqlstr = sql;
request->sqlLen = rlen;
}
}
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd, int numLines,
int protocol, int precision, int32_t ttl, int64_t reqid) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -1546,6 +1588,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
info->lineNum = numLines;
smlSetReqSQL(request, lines, rawLine, rawLineEnd);
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
if (request->pDb == NULL) {
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
......
......@@ -500,7 +500,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
break;
default:
ASSERTS(0, "invalid row format");
return TSDB_CODE_IVLD_DATA_FMT;
return TSDB_CODE_INVALID_DATA_FMT;
}
if (bv == BIT_FLG_NONE) {
......@@ -938,7 +938,7 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *
break;
default:
ASSERTS(0, "Invalid row flag");
return TSDB_CODE_IVLD_DATA_FMT;
return TSDB_CODE_INVALID_DATA_FMT;
}
while (pColData) {
......@@ -963,7 +963,7 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *
break;
default:
ASSERTS(0, "Invalid row flag");
return TSDB_CODE_IVLD_DATA_FMT;
return TSDB_CODE_INVALID_DATA_FMT;
}
if (bv == BIT_FLG_NONE) {
......@@ -1054,7 +1054,7 @@ static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aCo
pData = pv + ((uint32_t *)pKVIdx->idx)[iCol];
} else {
ASSERTS(0, "Invalid KV row format");
return TSDB_CODE_IVLD_DATA_FMT;
return TSDB_CODE_INVALID_DATA_FMT;
}
int16_t cid;
......
......@@ -117,6 +117,10 @@ int32_t tsRedirectFactor = 2;
int32_t tsRedirectMaxPeriod = 1000;
int32_t tsMaxRetryWaitTime = 10000;
bool tsUseAdapter = false;
int32_t tsSlowLogThreshold = 3; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
/*
......@@ -345,6 +349,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, true) != 0) return -1;
if (cfgAddString(pCfg, "slowLogScope", "", true) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
......@@ -692,6 +698,42 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
metaDebugFlag = cfgGetItem(pCfg, "metaDebugFlag")->i32;
}
static int32_t taosSetSlowLogScope(char *pScope) {
if (NULL == pScope || 0 == strlen(pScope)) {
tsSlowLogScope = SLOW_LOG_TYPE_ALL;
return 0;
}
if (0 == strcasecmp(pScope, "all")) {
tsSlowLogScope = SLOW_LOG_TYPE_ALL;
return 0;
}
if (0 == strcasecmp(pScope, "query")) {
tsSlowLogScope = SLOW_LOG_TYPE_QUERY;
return 0;
}
if (0 == strcasecmp(pScope, "insert")) {
tsSlowLogScope = SLOW_LOG_TYPE_INSERT;
return 0;
}
if (0 == strcasecmp(pScope, "others")) {
tsSlowLogScope = SLOW_LOG_TYPE_OTHERS;
return 0;
}
if (0 == strcasecmp(pScope, "none")) {
tsSlowLogScope = 0;
return 0;
}
uError("Invalid slowLog scope value:%s", pScope);
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
......@@ -742,6 +784,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64;
tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32;
if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) {
return -1;
}
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
......@@ -1156,6 +1202,12 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
sDebugFlag = cfgGetItem(pCfg, "sDebugFlag")->i32;
} else if (strcasecmp("smaDebugFlag", name) == 0) {
smaDebugFlag = cfgGetItem(pCfg, "smaDebugFlag")->i32;
} else if (strcasecmp("slowLogThreshold", name) == 0) {
tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32;
} else if (strcasecmp("slowLogScope", name) == 0) {
if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) {
return -1;
}
}
break;
}
......
......@@ -98,7 +98,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space"
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
TAOS_DEFINE_ERROR(TSDB_CODE_IVLD_DATA_FMT, "Invalid data format")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DATA_FMT, "Invalid data format")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration value")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
......
......@@ -24,10 +24,11 @@
#define LOG_MAX_LINE_SIZE (10024)
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 128)
#define LOG_FILE_NAME_LEN 300
#define LOG_DEFAULT_BUF_SIZE (20 * 1024 * 1024) // 20MB
#define LOG_SLOW_BUF_SIZE (10 * 1024 * 1024) // 10MB
#define LOG_DEFAULT_INTERVAL 25
#define LOG_INTERVAL_STEP 5
......@@ -51,6 +52,8 @@ typedef struct {
int32_t stop;
TdThread asyncThread;
TdThreadMutex buffMutex;
int32_t writeInterval;
int32_t lastDuration;
} SLogBuff;
typedef struct {
......@@ -62,6 +65,7 @@ typedef struct {
pid_t pid;
char logName[LOG_FILE_NAME_LEN];
SLogBuff *logHandle;
SLogBuff *slowHandle;
TdThreadMutex logMutex;
} SLogObj;
......@@ -69,7 +73,6 @@ extern SConfig *tsCfg;
static int8_t tsLogInited = 0;
static SLogObj tsLogObj = {.fileNum = 1};
static int64_t tsAsyncLogLostLines = 0;
static int32_t tsWriteInterval = LOG_DEFAULT_INTERVAL;
static int32_t tsDaylightActive; /* Currently in daylight saving time. */
bool tsLogEmbedded = 0;
......@@ -82,6 +85,7 @@ int64_t tsNumOfErrorLogs = 0;
int64_t tsNumOfInfoLogs = 0;
int64_t tsNumOfDebugLogs = 0;
int64_t tsNumOfTraceLogs = 0;
int64_t tsNumOfSlowLogs = 0;
// log
int32_t dDebugFlag = 131;
......@@ -136,6 +140,34 @@ static int32_t taosStartLog() {
return 0;
}
int32_t taosInitSlowLog() {
char fullName[PATH_MAX] = {0};
char logFileName[64] = {0};
#ifdef CUS_PROMPT
snprintf(logFileName, 64, "%sSlowLog", CUS_PROMPT);
#else
snprintf(logFileName, 64, "taosSlowLog");
#endif
if (strlen(tsLogDir) != 0) {
snprintf(fullName, PATH_MAX, "%s" TD_DIRSEP "%s", tsLogDir, logFileName);
} else {
snprintf(fullName, PATH_MAX, "%s", logFileName);
}
tsLogObj.slowHandle = taosLogBuffNew(LOG_SLOW_BUF_SIZE);
if (tsLogObj.slowHandle == NULL) return -1;
taosUmaskFile(0);
tsLogObj.slowHandle->pFile = taosOpenFile(fullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (tsLogObj.slowHandle->pFile == NULL) {
printf("\nfailed to open slow log file:%s, reason:%s\n", fullName, strerror(errno));
return -1;
}
return 0;
}
int32_t taosInitLog(const char *logName, int32_t maxFiles) {
if (atomic_val_compare_exchange_8(&tsLogInited, 0, 1) != 0) return 0;
osUpdate();
......@@ -151,6 +183,8 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles) {
tsLogObj.logHandle = taosLogBuffNew(LOG_DEFAULT_BUF_SIZE);
if (tsLogObj.logHandle == NULL) return -1;
if (taosOpenLogFile(fullName, tsNumOfLogLines, maxFiles) < 0) return -1;
if (taosInitSlowLog() < 0) return -1;
if (taosStartLog() < 0) return -1;
return 0;
}
......@@ -159,25 +193,34 @@ static void taosStopLog() {
if (tsLogObj.logHandle) {
tsLogObj.logHandle->stop = 1;
}
if (tsLogObj.slowHandle) {
tsLogObj.slowHandle->stop = 1;
}
}
void taosCloseLog() {
taosStopLog();
if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL);
taosThreadClear(&tsLogObj.logHandle->asyncThread);
}
if (tsLogObj.slowHandle != NULL) {
taosThreadMutexDestroy(&tsLogObj.slowHandle->buffMutex);
taosCloseFile(&tsLogObj.slowHandle->pFile);
taosMemoryFreeClear(tsLogObj.slowHandle->buffer);
taosMemoryFreeClear(tsLogObj.slowHandle);
}
if (tsLogObj.logHandle != NULL) {
taosStopLog();
if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL);
taosThreadClear(&tsLogObj.logHandle->asyncThread);
}
tsLogInited = 0;
taosThreadMutexDestroy(&tsLogObj.logHandle->buffMutex);
taosCloseFile(&tsLogObj.logHandle->pFile);
taosMemoryFreeClear(tsLogObj.logHandle->buffer);
memset(&tsLogObj.logHandle->buffer, 0, sizeof(tsLogObj.logHandle->buffer));
taosThreadMutexDestroy(&tsLogObj.logMutex);
taosMemoryFreeClear(tsLogObj.logHandle);
memset(&tsLogObj.logHandle, 0, sizeof(tsLogObj.logHandle));
tsLogObj.logHandle = NULL;
}
}
......@@ -513,10 +556,9 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
va_list argpointer;
va_start(argpointer, format);
len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - len, format, argpointer);
len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer);
va_end(argpointer);
if (len > LOG_MAX_LINE_DUMP_SIZE) len = LOG_MAX_LINE_DUMP_SIZE;
buffer[len++] = '\n';
buffer[len] = 0;
......@@ -524,6 +566,31 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
taosMemoryFree(buffer);
}
void taosPrintSlowLog(const char *format, ...) {
if (!osLogSpaceAvailable()) return;
char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE);
int32_t len = taosBuildLogHead(buffer, "");
va_list argpointer;
va_start(argpointer, format);
len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer);
va_end(argpointer);
buffer[len++] = '\n';
buffer[len] = 0;
atomic_add_fetch_64(&tsNumOfSlowLogs, 1);
if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.slowHandle, buffer, len);
} else {
taosWriteFile(tsLogObj.slowHandle->pFile, buffer, len);
}
taosMemoryFree(buffer);
}
void taosDumpData(unsigned char *msg, int32_t len) {
if (!osLogSpaceAvailable()) return;
taosUpdateLogNums(DEBUG_DUMP);
......@@ -568,6 +635,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) {
LOG_BUF_SIZE(pLogBuf) = bufSize;
pLogBuf->minBuffSize = bufSize / 10;
pLogBuf->stop = 0;
pLogBuf->writeInterval = LOG_DEFAULT_INTERVAL;
if (taosThreadMutexInit(&LOG_BUF_MUTEX(pLogBuf), NULL) < 0) goto _err;
// tsem_init(&(pLogBuf->buffNotEmpty), 0, 0);
......@@ -651,83 +719,78 @@ static int32_t taosGetLogRemainSize(SLogBuff *pLogBuf, int32_t start, int32_t en
}
static void taosWriteLog(SLogBuff *pLogBuf) {
static int32_t lastDuration = 0;
int32_t remainChecked = 0;
int32_t start, end, pollSize;
do {
if (remainChecked == 0) {
start = LOG_BUF_START(pLogBuf);
end = LOG_BUF_END(pLogBuf);
if (start == end) {
dbgEmptyW++;
tsWriteInterval = LOG_MAX_INTERVAL;
return;
}
int32_t start = LOG_BUF_START(pLogBuf);
int32_t end = LOG_BUF_END(pLogBuf);
pollSize = taosGetLogRemainSize(pLogBuf, start, end);
if (pollSize < pLogBuf->minBuffSize) {
lastDuration += tsWriteInterval;
if (lastDuration < LOG_MAX_WAIT_MSEC) {
break;
}
}
if (start == end) {
dbgEmptyW++;
pLogBuf->writeInterval = LOG_MAX_INTERVAL;
return;
}
lastDuration = 0;
int32_t pollSize = taosGetLogRemainSize(pLogBuf, start, end);
if (pollSize < pLogBuf->minBuffSize) {
pLogBuf->lastDuration += pLogBuf->writeInterval;
if (pLogBuf->lastDuration < LOG_MAX_WAIT_MSEC) {
return;
}
}
if (start < end) {
taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, pollSize);
} else {
int32_t tsize = LOG_BUF_SIZE(pLogBuf) - start;
taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, tsize);
pLogBuf->lastDuration = 0;
taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf), end);
}
if (start < end) {
taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, pollSize);
} else {
int32_t tsize = LOG_BUF_SIZE(pLogBuf) - start;
taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, tsize);
dbgWN++;
dbgWSize += pollSize;
taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf), end);
}
if (pollSize < pLogBuf->minBuffSize) {
dbgSmallWN++;
if (tsWriteInterval < LOG_MAX_INTERVAL) {
tsWriteInterval += LOG_INTERVAL_STEP;
}
} else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 3) {
dbgBigWN++;
tsWriteInterval = LOG_MIN_INTERVAL;
} else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 4) {
if (tsWriteInterval > LOG_MIN_INTERVAL) {
tsWriteInterval -= LOG_INTERVAL_STEP;
}
}
dbgWN++;
dbgWSize += pollSize;
LOG_BUF_START(pLogBuf) = (LOG_BUF_START(pLogBuf) + pollSize) % LOG_BUF_SIZE(pLogBuf);
if (pollSize < pLogBuf->minBuffSize) {
dbgSmallWN++;
if (pLogBuf->writeInterval < LOG_MAX_INTERVAL) {
pLogBuf->writeInterval += LOG_INTERVAL_STEP;
}
} else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 3) {
dbgBigWN++;
pLogBuf->writeInterval = LOG_MIN_INTERVAL;
} else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 4) {
if (pLogBuf->writeInterval > LOG_MIN_INTERVAL) {
pLogBuf->writeInterval -= LOG_INTERVAL_STEP;
}
}
start = LOG_BUF_START(pLogBuf);
end = LOG_BUF_END(pLogBuf);
LOG_BUF_START(pLogBuf) = (LOG_BUF_START(pLogBuf) + pollSize) % LOG_BUF_SIZE(pLogBuf);
pollSize = taosGetLogRemainSize(pLogBuf, start, end);
if (pollSize < pLogBuf->minBuffSize) {
break;
}
start = LOG_BUF_START(pLogBuf);
end = LOG_BUF_END(pLogBuf);
tsWriteInterval = LOG_MIN_INTERVAL;
pollSize = taosGetLogRemainSize(pLogBuf, start, end);
if (pollSize < pLogBuf->minBuffSize) {
return;
}
remainChecked = 1;
} while (1);
pLogBuf->writeInterval = 0;
}
static void *taosAsyncOutputLog(void *param) {
SLogBuff *pLogBuf = (SLogBuff *)param;
SLogBuff *pLogBuf = (SLogBuff *)tsLogObj.logHandle;
SLogBuff *pSlowBuf = (SLogBuff *)tsLogObj.slowHandle;
setThreadName("log");
int32_t count = 0;
int32_t updateCron = 0;
int32_t writeInterval = 0;
while (1) {
count += tsWriteInterval;
writeInterval = TMIN(pLogBuf->writeInterval, pSlowBuf->writeInterval);
count += writeInterval;
updateCron++;
taosMsleep(tsWriteInterval);
taosMsleep(writeInterval);
if (count > 1000) {
osUpdate();
count = 0;
......@@ -735,13 +798,14 @@ static void *taosAsyncOutputLog(void *param) {
// Polling the buffer
taosWriteLog(pLogBuf);
taosWriteLog(pSlowBuf);
if (updateCron >= 3600 * 24 * 40 / 2) {
taosUpdateDaylight();
updateCron = 0;
}
if (pLogBuf->stop) break;
if (pLogBuf->stop || pSlowBuf->stop) break;
}
return NULL;
......
......@@ -1115,6 +1115,7 @@ int32_t shellExecute() {
}
if (shell.conn == NULL) {
printf("failed to connect to server, reason: %s\n", taos_errstr(NULL));
fflush(stdout);
return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册