提交 9551a269 编写于 作者: D dapan1121

feat: support log slow query by default

上级 5fe99c5a
...@@ -103,7 +103,7 @@ int32_t* taosGetErrno(); ...@@ -103,7 +103,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x011F) // internal #define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x011F) // internal
#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0120) #define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0120)
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121) // #define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121)
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0122) #define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0122)
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0123) #define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0123)
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0124) #define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0124)
...@@ -118,9 +118,10 @@ int32_t* taosGetErrno(); ...@@ -118,9 +118,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) #define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D)
#define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E) #define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E)
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) // #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130)
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) // #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131)
#define TSDB_CODE_IVLD_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132) // #define TSDB_CODE_INVALID_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132)
#define TSDB_CODE_INVALID_CFG_VALUE TAOS_DEF_ERROR_CODE(0, 0x0133)
//client //client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
......
...@@ -83,6 +83,12 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons ...@@ -83,6 +83,12 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
#endif #endif
; ;
void taosPrintSlowLog(const char *format, ...)
#ifdef __GNUC__
__attribute__((format(printf, 1, 2)))
#endif
;
bool taosAssertDebug(bool condition, const char *file, int32_t line, const char *format, ...); bool taosAssertDebug(bool condition, const char *file, int32_t line, const char *format, ...);
bool taosAssertRelease(bool condition); bool taosAssertRelease(bool condition);
......
...@@ -42,7 +42,7 @@ SAppInfo appInfo; ...@@ -42,7 +42,7 @@ SAppInfo appInfo;
int64_t lastClusterId = 0; int64_t lastClusterId = 0;
int32_t clientReqRefPool = -1; int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1; int32_t clientConnRefPool = -1;
int32_t clientStop = 0; int32_t clientStop = -1;
int32_t timestampDeltaLimit = 900; // s int32_t timestampDeltaLimit = 900; // s
...@@ -69,7 +69,6 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) { ...@@ -69,7 +69,6 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
} }
static void deregisterRequest(SRequestObj *pRequest) { static void deregisterRequest(SRequestObj *pRequest) {
const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable
if (pRequest == NULL) { if (pRequest == NULL) {
tscError("pRequest == NULL"); tscError("pRequest == NULL");
return; return;
...@@ -80,6 +79,7 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -80,6 +79,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int32_t reqType = SLOW_LOG_TYPE_OTHERS;
int64_t duration = taosGetTimestampUs() - pRequest->metric.start; int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64
...@@ -95,6 +95,7 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -95,6 +95,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
pRequest->metric.planCostUs, pRequest->metric.execCostUs); pRequest->metric.planCostUs, pRequest->metric.execCostUs);
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
reqType = SLOW_LOG_TYPE_INSERT;
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us", "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
...@@ -102,12 +103,16 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -102,12 +103,16 @@ static void deregisterRequest(SRequestObj *pRequest) {
pRequest->metric.planCostUs, pRequest->metric.execCostUs); pRequest->metric.planCostUs, pRequest->metric.execCostUs);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
reqType = SLOW_LOG_TYPE_QUERY;
} }
} }
if (duration >= SLOW_QUERY_INTERVAL) { if (duration >= (tsSlowLogThreshold * 1000000UL)) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
tscWarnL("slow query: %s, duration:%" PRId64, pRequest->sqlstr, duration); if (tsSlowLogScope & reqType) {
taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s",
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr);
}
} }
releaseTscObj(pTscObj->id); releaseTscObj(pTscObj->id);
...@@ -427,8 +432,12 @@ static void *tscCrashReportThreadFp(void *param) { ...@@ -427,8 +432,12 @@ static void *tscCrashReportThreadFp(void *param) {
} }
#endif #endif
if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
return NULL;
}
while (1) { while (1) {
if (clientStop) break; if (clientStop > 0) break;
if (loopTimes++ < reportPeriodNum) { if (loopTimes++ < reportPeriodNum) {
taosMsleep(sleepTime); taosMsleep(sleepTime);
continue; continue;
...@@ -466,7 +475,7 @@ static void *tscCrashReportThreadFp(void *param) { ...@@ -466,7 +475,7 @@ static void *tscCrashReportThreadFp(void *param) {
loopTimes = 0; loopTimes = 0;
} }
clientStop = -1; clientStop = -2;
return NULL; return NULL;
} }
......
...@@ -1248,6 +1248,11 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -1248,6 +1248,11 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return NULL; return NULL;
} }
pRequest->sqlstr = taosStrdup("taos_connect");
if (pRequest->sqlstr) {
pRequest->sqlLen = strlen(pRequest->sqlstr);
}
SMsgSendInfo* body = buildConnectMsg(pRequest); SMsgSendInfo* body = buildConnectMsg(pRequest);
int64_t transporterId = 0; int64_t transporterId = 0;
......
...@@ -660,6 +660,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, ...@@ -660,6 +660,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
SMCreateStbReq pReq = {0}; SMCreateStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SCmdMsgInfo pCmdMsg = {0}; SCmdMsgInfo pCmdMsg = {0};
char *pSql = NULL;
// put front for free // put front for free
pReq.numOfColumns = taosArrayGetSize(pColumns); pReq.numOfColumns = taosArrayGetSize(pColumns);
...@@ -667,32 +668,35 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, ...@@ -667,32 +668,35 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pReq.numOfTags = taosArrayGetSize(pTags); pReq.numOfTags = taosArrayGetSize(pTags);
pReq.pTags = pTags; pReq.pTags = pTags;
code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
if (action == SCHEMA_ACTION_CREATE_STABLE) { if (action == SCHEMA_ACTION_CREATE_STABLE) {
pReq.colVer = 1; pReq.colVer = 1;
pReq.tagVer = 1; pReq.tagVer = 1;
pReq.suid = 0; pReq.suid = 0;
pReq.source = TD_REQ_FROM_APP; pReq.source = TD_REQ_FROM_APP;
pSql = "sml_create_stable";
} else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) { } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
pReq.colVer = pTableMeta->sversion; pReq.colVer = pTableMeta->sversion;
pReq.tagVer = pTableMeta->tversion + 1; pReq.tagVer = pTableMeta->tversion + 1;
pReq.suid = pTableMeta->uid; pReq.suid = pTableMeta->uid;
pReq.source = TD_REQ_FROM_TAOX; pReq.source = TD_REQ_FROM_TAOX;
pSql = (action == SCHEMA_ACTION_ADD_TAG) ? "sml_add_tag" : "sml_modify_tag_size";
} else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) { } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
pReq.colVer = pTableMeta->sversion + 1; pReq.colVer = pTableMeta->sversion + 1;
pReq.tagVer = pTableMeta->tversion; pReq.tagVer = pTableMeta->tversion;
pReq.suid = pTableMeta->uid; pReq.suid = pTableMeta->uid;
pReq.source = TD_REQ_FROM_TAOX; pReq.source = TD_REQ_FROM_TAOX;
pSql = (action == SCHEMA_ACTION_ADD_COLUMN) ? "sml_add_column" : "sml_modify_column_size";
}
code = buildRequest(info->taos->id, pSql, strlen(pSql), NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
} }
if (pReq.numOfTags == 0) { if (pReq.numOfTags == 0) {
...@@ -1514,6 +1518,44 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL ...@@ -1514,6 +1518,44 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
return code; return code;
} }
void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) {
if (tsSlowLogScope & SLOW_LOG_TYPE_INSERT) {
int32_t len = 0;
int32_t rlen = 0;
char* p = NULL;
if (lines && lines[0]) {
len = strlen(lines[0]);
p = lines[0];
} else if (rawLine) {
if (rawLineEnd) {
len = rawLineEnd - rawLine;
} else {
len = strlen(rawLine);
}
p = rawLine;
}
if (NULL == p) {
return;
}
rlen = TMIN(len, TSDB_MAX_ALLOWED_SQL_LEN);
rlen = TMAX(rlen, 0);
char *sql = taosMemoryMalloc(rlen + 1);
if (NULL == sql) {
uError("malloc %d for sml sql failed", rlen + 1);
return;
}
memcpy(sql, p, rlen);
sql[rlen] = 0;
request->sqlstr = sql;
request->sqlLen = rlen;
}
}
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd, int numLines, TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd, int numLines,
int protocol, int precision, int32_t ttl, int64_t reqid) { int protocol, int precision, int32_t ttl, int64_t reqid) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -1546,6 +1588,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, ...@@ -1546,6 +1588,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
info->lineNum = numLines; info->lineNum = numLines;
smlSetReqSQL(request, lines, rawLine, rawLineEnd);
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
if (request->pDb == NULL) { if (request->pDb == NULL) {
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
......
...@@ -500,7 +500,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) ...@@ -500,7 +500,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
break; break;
default: default:
ASSERTS(0, "invalid row format"); ASSERTS(0, "invalid row format");
return TSDB_CODE_IVLD_DATA_FMT; return TSDB_CODE_INVALID_DATA_FMT;
} }
if (bv == BIT_FLG_NONE) { if (bv == BIT_FLG_NONE) {
...@@ -938,7 +938,7 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData * ...@@ -938,7 +938,7 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *
break; break;
default: default:
ASSERTS(0, "Invalid row flag"); ASSERTS(0, "Invalid row flag");
return TSDB_CODE_IVLD_DATA_FMT; return TSDB_CODE_INVALID_DATA_FMT;
} }
while (pColData) { while (pColData) {
...@@ -963,7 +963,7 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData * ...@@ -963,7 +963,7 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *
break; break;
default: default:
ASSERTS(0, "Invalid row flag"); ASSERTS(0, "Invalid row flag");
return TSDB_CODE_IVLD_DATA_FMT; return TSDB_CODE_INVALID_DATA_FMT;
} }
if (bv == BIT_FLG_NONE) { if (bv == BIT_FLG_NONE) {
...@@ -1054,7 +1054,7 @@ static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aCo ...@@ -1054,7 +1054,7 @@ static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aCo
pData = pv + ((uint32_t *)pKVIdx->idx)[iCol]; pData = pv + ((uint32_t *)pKVIdx->idx)[iCol];
} else { } else {
ASSERTS(0, "Invalid KV row format"); ASSERTS(0, "Invalid KV row format");
return TSDB_CODE_IVLD_DATA_FMT; return TSDB_CODE_INVALID_DATA_FMT;
} }
int16_t cid; int16_t cid;
......
...@@ -724,7 +724,13 @@ static int32_t taosSetSlowLogScope(char *pScope) { ...@@ -724,7 +724,13 @@ static int32_t taosSetSlowLogScope(char *pScope) {
return 0; return 0;
} }
if (0 == strcasecmp(pScope, "none")) {
tsSlowLogScope = 0;
return 0;
}
uError("Invalid slowLog scope value:%s", pScope); uError("Invalid slowLog scope value:%s", pScope);
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1; return -1;
} }
...@@ -1199,7 +1205,9 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -1199,7 +1205,9 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
} else if (strcasecmp("slowLogThreshold", name) == 0) { } else if (strcasecmp("slowLogThreshold", name) == 0) {
tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32; tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32;
} else if (strcasecmp("slowLogScope", name) == 0) { } else if (strcasecmp("slowLogScope", name) == 0) {
taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str) if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) {
return -1;
}
} }
break; break;
} }
......
...@@ -98,7 +98,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space" ...@@ -98,7 +98,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space"
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
TAOS_DEFINE_ERROR(TSDB_CODE_IVLD_DATA_FMT, "Invalid data format") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DATA_FMT, "Invalid data format")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration value")
//client //client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
#define LOG_MAX_LINE_SIZE (10024) #define LOG_MAX_LINE_SIZE (10024)
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3) #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024) #define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3) #define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 128)
#define LOG_FILE_NAME_LEN 300 #define LOG_FILE_NAME_LEN 300
#define LOG_DEFAULT_BUF_SIZE (20 * 1024 * 1024) // 20MB #define LOG_DEFAULT_BUF_SIZE (20 * 1024 * 1024) // 20MB
...@@ -52,6 +52,8 @@ typedef struct { ...@@ -52,6 +52,8 @@ typedef struct {
int32_t stop; int32_t stop;
TdThread asyncThread; TdThread asyncThread;
TdThreadMutex buffMutex; TdThreadMutex buffMutex;
int32_t writeInterval;
int32_t lastDuration;
} SLogBuff; } SLogBuff;
typedef struct { typedef struct {
...@@ -71,7 +73,6 @@ extern SConfig *tsCfg; ...@@ -71,7 +73,6 @@ extern SConfig *tsCfg;
static int8_t tsLogInited = 0; static int8_t tsLogInited = 0;
static SLogObj tsLogObj = {.fileNum = 1}; static SLogObj tsLogObj = {.fileNum = 1};
static int64_t tsAsyncLogLostLines = 0; static int64_t tsAsyncLogLostLines = 0;
static int32_t tsWriteInterval = LOG_DEFAULT_INTERVAL;
static int32_t tsDaylightActive; /* Currently in daylight saving time. */ static int32_t tsDaylightActive; /* Currently in daylight saving time. */
bool tsLogEmbedded = 0; bool tsLogEmbedded = 0;
...@@ -84,6 +85,7 @@ int64_t tsNumOfErrorLogs = 0; ...@@ -84,6 +85,7 @@ int64_t tsNumOfErrorLogs = 0;
int64_t tsNumOfInfoLogs = 0; int64_t tsNumOfInfoLogs = 0;
int64_t tsNumOfDebugLogs = 0; int64_t tsNumOfDebugLogs = 0;
int64_t tsNumOfTraceLogs = 0; int64_t tsNumOfTraceLogs = 0;
int64_t tsNumOfSlowLogs = 0;
// log // log
int32_t dDebugFlag = 131; int32_t dDebugFlag = 131;
...@@ -203,7 +205,6 @@ void taosCloseLog() { ...@@ -203,7 +205,6 @@ void taosCloseLog() {
taosThreadMutexDestroy(&tsLogObj.slowHandle->buffMutex); taosThreadMutexDestroy(&tsLogObj.slowHandle->buffMutex);
taosCloseFile(&tsLogObj.slowHandle->pFile); taosCloseFile(&tsLogObj.slowHandle->pFile);
taosMemoryFreeClear(tsLogObj.slowHandle->buffer); taosMemoryFreeClear(tsLogObj.slowHandle->buffer);
memset(&tsLogObj.slowHandle->buffer, 0, sizeof(tsLogObj.slowHandle->buffer));
taosMemoryFreeClear(tsLogObj.slowHandle); taosMemoryFreeClear(tsLogObj.slowHandle);
} }
...@@ -217,7 +218,6 @@ void taosCloseLog() { ...@@ -217,7 +218,6 @@ void taosCloseLog() {
taosThreadMutexDestroy(&tsLogObj.logHandle->buffMutex); taosThreadMutexDestroy(&tsLogObj.logHandle->buffMutex);
taosCloseFile(&tsLogObj.logHandle->pFile); taosCloseFile(&tsLogObj.logHandle->pFile);
taosMemoryFreeClear(tsLogObj.logHandle->buffer); taosMemoryFreeClear(tsLogObj.logHandle->buffer);
memset(&tsLogObj.logHandle->buffer, 0, sizeof(tsLogObj.logHandle->buffer));
taosThreadMutexDestroy(&tsLogObj.logMutex); taosThreadMutexDestroy(&tsLogObj.logMutex);
taosMemoryFreeClear(tsLogObj.logHandle); taosMemoryFreeClear(tsLogObj.logHandle);
} }
...@@ -555,10 +555,9 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons ...@@ -555,10 +555,9 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
va_list argpointer; va_list argpointer;
va_start(argpointer, format); va_start(argpointer, format);
len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - len, format, argpointer); len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer);
va_end(argpointer); va_end(argpointer);
if (len > LOG_MAX_LINE_DUMP_SIZE) len = LOG_MAX_LINE_DUMP_SIZE;
buffer[len++] = '\n'; buffer[len++] = '\n';
buffer[len] = 0; buffer[len] = 0;
...@@ -566,6 +565,31 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons ...@@ -566,6 +565,31 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
taosMemoryFree(buffer); taosMemoryFree(buffer);
} }
void taosPrintSlowLog(const char *format, ...) {
if (!osLogSpaceAvailable()) return;
char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE);
int32_t len = taosBuildLogHead(buffer, "");
va_list argpointer;
va_start(argpointer, format);
len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer);
va_end(argpointer);
buffer[len++] = '\n';
buffer[len] = 0;
atomic_add_fetch_64(&tsNumOfSlowLogs, 1);
if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.slowHandle, buffer, len);
} else {
taosWriteFile(tsLogObj.slowHandle->pFile, buffer, len);
}
taosMemoryFree(buffer);
}
void taosDumpData(unsigned char *msg, int32_t len) { void taosDumpData(unsigned char *msg, int32_t len) {
if (!osLogSpaceAvailable()) return; if (!osLogSpaceAvailable()) return;
taosUpdateLogNums(DEBUG_DUMP); taosUpdateLogNums(DEBUG_DUMP);
...@@ -610,6 +634,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) { ...@@ -610,6 +634,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) {
LOG_BUF_SIZE(pLogBuf) = bufSize; LOG_BUF_SIZE(pLogBuf) = bufSize;
pLogBuf->minBuffSize = bufSize / 10; pLogBuf->minBuffSize = bufSize / 10;
pLogBuf->stop = 0; pLogBuf->stop = 0;
pLogBuf->writeInterval = LOG_DEFAULT_INTERVAL;
if (taosThreadMutexInit(&LOG_BUF_MUTEX(pLogBuf), NULL) < 0) goto _err; if (taosThreadMutexInit(&LOG_BUF_MUTEX(pLogBuf), NULL) < 0) goto _err;
// tsem_init(&(pLogBuf->buffNotEmpty), 0, 0); // tsem_init(&(pLogBuf->buffNotEmpty), 0, 0);
...@@ -693,31 +718,24 @@ static int32_t taosGetLogRemainSize(SLogBuff *pLogBuf, int32_t start, int32_t en ...@@ -693,31 +718,24 @@ static int32_t taosGetLogRemainSize(SLogBuff *pLogBuf, int32_t start, int32_t en
} }
static void taosWriteLog(SLogBuff *pLogBuf) { static void taosWriteLog(SLogBuff *pLogBuf) {
static int32_t lastDuration = 0; int32_t start = LOG_BUF_START(pLogBuf);
int32_t remainChecked = 0; int32_t end = LOG_BUF_END(pLogBuf);
int32_t start, end, pollSize;
do {
if (remainChecked == 0) {
start = LOG_BUF_START(pLogBuf);
end = LOG_BUF_END(pLogBuf);
if (start == end) { if (start == end) {
dbgEmptyW++; dbgEmptyW++;
tsWriteInterval = LOG_MAX_INTERVAL; pLogBuf->writeInterval = LOG_MAX_INTERVAL;
return; return;
} }
pollSize = taosGetLogRemainSize(pLogBuf, start, end); int32_t pollSize = taosGetLogRemainSize(pLogBuf, start, end);
if (pollSize < pLogBuf->minBuffSize) { if (pollSize < pLogBuf->minBuffSize) {
lastDuration += tsWriteInterval; pLogBuf->lastDuration += pLogBuf->writeInterval;
if (lastDuration < LOG_MAX_WAIT_MSEC) { if (pLogBuf->lastDuration < LOG_MAX_WAIT_MSEC) {
break; return;
} }
} }
lastDuration = 0; pLogBuf->lastDuration = 0;
}
if (start < end) { if (start < end) {
taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, pollSize); taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, pollSize);
...@@ -733,15 +751,15 @@ static void taosWriteLog(SLogBuff *pLogBuf) { ...@@ -733,15 +751,15 @@ static void taosWriteLog(SLogBuff *pLogBuf) {
if (pollSize < pLogBuf->minBuffSize) { if (pollSize < pLogBuf->minBuffSize) {
dbgSmallWN++; dbgSmallWN++;
if (tsWriteInterval < LOG_MAX_INTERVAL) { if (pLogBuf->writeInterval < LOG_MAX_INTERVAL) {
tsWriteInterval += LOG_INTERVAL_STEP; pLogBuf->writeInterval += LOG_INTERVAL_STEP;
} }
} else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 3) { } else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 3) {
dbgBigWN++; dbgBigWN++;
tsWriteInterval = LOG_MIN_INTERVAL; pLogBuf->writeInterval = LOG_MIN_INTERVAL;
} else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 4) { } else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 4) {
if (tsWriteInterval > LOG_MIN_INTERVAL) { if (pLogBuf->writeInterval > LOG_MIN_INTERVAL) {
tsWriteInterval -= LOG_INTERVAL_STEP; pLogBuf->writeInterval -= LOG_INTERVAL_STEP;
} }
} }
...@@ -752,24 +770,26 @@ static void taosWriteLog(SLogBuff *pLogBuf) { ...@@ -752,24 +770,26 @@ static void taosWriteLog(SLogBuff *pLogBuf) {
pollSize = taosGetLogRemainSize(pLogBuf, start, end); pollSize = taosGetLogRemainSize(pLogBuf, start, end);
if (pollSize < pLogBuf->minBuffSize) { if (pollSize < pLogBuf->minBuffSize) {
break; return;
} }
tsWriteInterval = LOG_MIN_INTERVAL; pLogBuf->writeInterval = 0;
remainChecked = 1;
} while (1);
} }
static void *taosAsyncOutputLog(void *param) { static void *taosAsyncOutputLog(void *param) {
SLogBuff *pLogBuf = (SLogBuff *)param; SLogBuff *pLogBuf = (SLogBuff *)tsLogObj.logHandle;
SLogBuff *pSlowBuf = (SLogBuff *)tsLogObj.slowHandle;
setThreadName("log"); setThreadName("log");
int32_t count = 0; int32_t count = 0;
int32_t updateCron = 0; int32_t updateCron = 0;
int32_t writeInterval = 0;
while (1) { while (1) {
count += tsWriteInterval; writeInterval = TMIN(pLogBuf->writeInterval, pSlowBuf->writeInterval);
count += writeInterval;
updateCron++; updateCron++;
taosMsleep(tsWriteInterval); taosMsleep(writeInterval);
if (count > 1000) { if (count > 1000) {
osUpdate(); osUpdate();
count = 0; count = 0;
...@@ -777,13 +797,14 @@ static void *taosAsyncOutputLog(void *param) { ...@@ -777,13 +797,14 @@ static void *taosAsyncOutputLog(void *param) {
// Polling the buffer // Polling the buffer
taosWriteLog(pLogBuf); taosWriteLog(pLogBuf);
taosWriteLog(pSlowBuf);
if (updateCron >= 3600 * 24 * 40 / 2) { if (updateCron >= 3600 * 24 * 40 / 2) {
taosUpdateDaylight(); taosUpdateDaylight();
updateCron = 0; updateCron = 0;
} }
if (pLogBuf->stop) break; if (pLogBuf->stop || pSlowBuf->stop) break;
} }
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册