diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 6fde7b48a29779b2a0c26fb58d360bbfc7ff221f..25a386b23dd2183c2af2161cc713a912046adda8 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -24,6 +24,12 @@ extern "C" { #endif +#define SLOW_LOG_TYPE_QUERY 0x1 +#define SLOW_LOG_TYPE_INSERT 0x2 +#define SLOW_LOG_TYPE_OTHERS 0x4 +#define SLOW_LOG_TYPE_ALL 0xFFFFFFFF + + // cluster extern char tsFirst[]; extern char tsSecond[]; @@ -118,6 +124,8 @@ extern int32_t tsRedirectFactor; extern int32_t tsRedirectMaxPeriod; extern int32_t tsMaxRetryWaitTime; extern bool tsUseAdapter; +extern int32_t tsSlowLogThreshold; +extern int32_t tsSlowLogScope; // client extern int32_t tsMinSlidingTime; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b22ce30894e93bbda986c829c8567138c61d31ea..142427ca6b24f130dd8cd73c22dd586036891f85 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -103,7 +103,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x011F) // internal #define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0120) -#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121) // +#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121) #define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0122) #define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0123) #define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0124) @@ -118,9 +118,10 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) #define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E) -#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) // -#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) // -#define TSDB_CODE_IVLD_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132) // +#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) +#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) +#define TSDB_CODE_INVALID_DATA_FMT TAOS_DEF_ERROR_CODE(0, 0x0132) +#define TSDB_CODE_INVALID_CFG_VALUE TAOS_DEF_ERROR_CODE(0, 0x0133) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) diff --git a/include/util/tlog.h b/include/util/tlog.h index 0071b3d32cd72be79ab5adf8bdba20b92c18f4f7..d267d5876a265d0aea8fae911f0ff0187ff5f9e3 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -83,6 +83,12 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons #endif ; +void taosPrintSlowLog(const char *format, ...) +#ifdef __GNUC__ + __attribute__((format(printf, 1, 2))) +#endif + ; + bool taosAssertDebug(bool condition, const char *file, int32_t line, const char *format, ...); bool taosAssertRelease(bool condition); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 99569fdb574bd2860e8163c9a1ba0e2d30eb5750..07ca3d89093c721aca6b51bfe0f107e01ca3240f 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -42,7 +42,7 @@ SAppInfo appInfo; int64_t lastClusterId = 0; int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; -int32_t clientStop = 0; +int32_t clientStop = -1; int32_t timestampDeltaLimit = 900; // s @@ -69,7 +69,6 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) { } static void deregisterRequest(SRequestObj *pRequest) { - const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable if (pRequest == NULL) { tscError("pRequest == NULL"); return; @@ -80,6 +79,7 @@ static void deregisterRequest(SRequestObj *pRequest) { int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); + int32_t reqType = SLOW_LOG_TYPE_OTHERS; int64_t duration = taosGetTimestampUs() - pRequest->metric.start; tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 @@ -95,6 +95,7 @@ static void deregisterRequest(SRequestObj *pRequest) { duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); + reqType = SLOW_LOG_TYPE_INSERT; } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", @@ -102,12 +103,16 @@ static void deregisterRequest(SRequestObj *pRequest) { pRequest->metric.planCostUs, pRequest->metric.execCostUs); atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); + reqType = SLOW_LOG_TYPE_QUERY; } } - if (duration >= SLOW_QUERY_INTERVAL) { + if (duration >= (tsSlowLogThreshold * 1000000UL)) { atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); - tscWarnL("slow query: %s, duration:%" PRId64, pRequest->sqlstr, duration); + if (tsSlowLogScope & reqType) { + taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s", + taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr); + } } releaseTscObj(pTscObj->id); @@ -427,8 +432,12 @@ static void *tscCrashReportThreadFp(void *param) { } #endif + if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) { + return NULL; + } + while (1) { - if (clientStop) break; + if (clientStop > 0) break; if (loopTimes++ < reportPeriodNum) { taosMsleep(sleepTime); continue; @@ -466,7 +475,7 @@ static void *tscCrashReportThreadFp(void *param) { loopTimes = 0; } - clientStop = -1; + clientStop = -2; return NULL; } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index ce174744ef740e13dffb4ab4a44cdf5a601b0f29..f8eade1d7c62f3da22d765aea5b98622b34a0ba7 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1248,6 +1248,11 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t return NULL; } + pRequest->sqlstr = taosStrdup("taos_connect"); + if (pRequest->sqlstr) { + pRequest->sqlLen = strlen(pRequest->sqlstr); + } + SMsgSendInfo* body = buildConnectMsg(pRequest); int64_t transporterId = 0; @@ -1257,7 +1262,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t if (pRequest->code != TSDB_CODE_SUCCESS) { const char* errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); - fprintf(stderr, "failed to connect to server, reason: %s\n\n", errorMsg); + tscError("failed to connect to server, reason: %s", errorMsg); terrno = pRequest->code; destroyRequest(pRequest); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 17150286e1407e65a38e040877da7e053ca2e946..011a1b31119c2a69c50dd83d85a948388d2e25c0 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -660,6 +660,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SMCreateStbReq pReq = {0}; int32_t code = TSDB_CODE_SUCCESS; SCmdMsgInfo pCmdMsg = {0}; + char *pSql = NULL; // put front for free pReq.numOfColumns = taosArrayGetSize(pColumns); @@ -667,32 +668,35 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, pReq.numOfTags = taosArrayGetSize(pTags); pReq.pTags = pTags; - code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - - pRequest->syncQuery = true; - if (!pRequest->pDb) { - code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; - goto end; - } - if (action == SCHEMA_ACTION_CREATE_STABLE) { pReq.colVer = 1; pReq.tagVer = 1; pReq.suid = 0; pReq.source = TD_REQ_FROM_APP; + pSql = "sml_create_stable"; } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) { pReq.colVer = pTableMeta->sversion; pReq.tagVer = pTableMeta->tversion + 1; pReq.suid = pTableMeta->uid; pReq.source = TD_REQ_FROM_TAOX; + pSql = (action == SCHEMA_ACTION_ADD_TAG) ? "sml_add_tag" : "sml_modify_tag_size"; } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) { pReq.colVer = pTableMeta->sversion + 1; pReq.tagVer = pTableMeta->tversion; pReq.suid = pTableMeta->uid; pReq.source = TD_REQ_FROM_TAOX; + pSql = (action == SCHEMA_ACTION_ADD_COLUMN) ? "sml_add_column" : "sml_modify_column_size"; + } + + code = buildRequest(info->taos->id, pSql, strlen(pSql), NULL, false, &pRequest, 0); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + + pRequest->syncQuery = true; + if (!pRequest->pDb) { + code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + goto end; } if (pReq.numOfTags == 0) { @@ -1514,6 +1518,44 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL return code; } +void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) { + if (tsSlowLogScope & SLOW_LOG_TYPE_INSERT) { + int32_t len = 0; + int32_t rlen = 0; + char* p = NULL; + + if (lines && lines[0]) { + len = strlen(lines[0]); + p = lines[0]; + } else if (rawLine) { + if (rawLineEnd) { + len = rawLineEnd - rawLine; + } else { + len = strlen(rawLine); + } + p = rawLine; + } + + if (NULL == p) { + return; + } + + rlen = TMIN(len, TSDB_MAX_ALLOWED_SQL_LEN); + rlen = TMAX(rlen, 0); + + char *sql = taosMemoryMalloc(rlen + 1); + if (NULL == sql) { + uError("malloc %d for sml sql failed", rlen + 1); + return; + } + memcpy(sql, p, rlen); + sql[rlen] = 0; + + request->sqlstr = sql; + request->sqlLen = rlen; + } +} + TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd, int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) { int32_t code = TSDB_CODE_SUCCESS; @@ -1546,6 +1588,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; info->lineNum = numLines; + smlSetReqSQL(request, lines, rawLine, rawLineEnd); + SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; if (request->pDb == NULL) { request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f379084cf558a6e9d4225f52163beb1cfea8a8ce..796c5cf8ff5990b438a48dc6fa54037fe44807b4 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -500,7 +500,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) break; default: ASSERTS(0, "invalid row format"); - return TSDB_CODE_IVLD_DATA_FMT; + return TSDB_CODE_INVALID_DATA_FMT; } if (bv == BIT_FLG_NONE) { @@ -938,7 +938,7 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData * break; default: ASSERTS(0, "Invalid row flag"); - return TSDB_CODE_IVLD_DATA_FMT; + return TSDB_CODE_INVALID_DATA_FMT; } while (pColData) { @@ -963,7 +963,7 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData * break; default: ASSERTS(0, "Invalid row flag"); - return TSDB_CODE_IVLD_DATA_FMT; + return TSDB_CODE_INVALID_DATA_FMT; } if (bv == BIT_FLG_NONE) { @@ -1054,7 +1054,7 @@ static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aCo pData = pv + ((uint32_t *)pKVIdx->idx)[iCol]; } else { ASSERTS(0, "Invalid KV row format"); - return TSDB_CODE_IVLD_DATA_FMT; + return TSDB_CODE_INVALID_DATA_FMT; } int16_t cid; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index da4a91223815af34c5dfb679046a9e81bbe46edc..0a29a415f0bbba452e3722ff82eaaca0376e3f8b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -117,6 +117,10 @@ int32_t tsRedirectFactor = 2; int32_t tsRedirectMaxPeriod = 1000; int32_t tsMaxRetryWaitTime = 10000; bool tsUseAdapter = false; +int32_t tsSlowLogThreshold = 3; // seconds +int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; + + /* @@ -345,6 +349,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1; if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, true) != 0) return -1; + if (cfgAddString(pCfg, "slowLogScope", "", true) != 0) return -1; tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); @@ -692,6 +698,42 @@ static void taosSetServerLogCfg(SConfig *pCfg) { metaDebugFlag = cfgGetItem(pCfg, "metaDebugFlag")->i32; } +static int32_t taosSetSlowLogScope(char *pScope) { + if (NULL == pScope || 0 == strlen(pScope)) { + tsSlowLogScope = SLOW_LOG_TYPE_ALL; + return 0; + } + + if (0 == strcasecmp(pScope, "all")) { + tsSlowLogScope = SLOW_LOG_TYPE_ALL; + return 0; + } + + if (0 == strcasecmp(pScope, "query")) { + tsSlowLogScope = SLOW_LOG_TYPE_QUERY; + return 0; + } + + if (0 == strcasecmp(pScope, "insert")) { + tsSlowLogScope = SLOW_LOG_TYPE_INSERT; + return 0; + } + + if (0 == strcasecmp(pScope, "others")) { + tsSlowLogScope = SLOW_LOG_TYPE_OTHERS; + return 0; + } + + if (0 == strcasecmp(pScope, "none")) { + tsSlowLogScope = 0; + return 0; + } + + uError("Invalid slowLog scope value:%s", pScope); + terrno = TSDB_CODE_INVALID_CFG_VALUE; + return -1; +} + static int32_t taosSetClientCfg(SConfig *pCfg) { tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN); tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32; @@ -742,6 +784,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64; + tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32; + if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) { + return -1; + } tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; @@ -1156,6 +1202,12 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { sDebugFlag = cfgGetItem(pCfg, "sDebugFlag")->i32; } else if (strcasecmp("smaDebugFlag", name) == 0) { smaDebugFlag = cfgGetItem(pCfg, "smaDebugFlag")->i32; + } else if (strcasecmp("slowLogThreshold", name) == 0) { + tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32; + } else if (strcasecmp("slowLogScope", name) == 0) { + if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) { + return -1; + } } break; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index dbdb40b93d39851c422e9fe2f4f1246d183ad25b..1d9d45f26f342277101ec1bd5eac82c0306e55a7 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -98,7 +98,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space" TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down") -TAOS_DEFINE_ERROR(TSDB_CODE_IVLD_DATA_FMT, "Invalid data format") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DATA_FMT, "Invalid data format") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration value") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index a3d3c399abcd6040cf042dec045024549e2d3179..baf6a9f319e013af20fa00cb8e23dc3dfc900afd 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -24,10 +24,11 @@ #define LOG_MAX_LINE_SIZE (10024) #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3) #define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024) -#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3) +#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 128) #define LOG_FILE_NAME_LEN 300 #define LOG_DEFAULT_BUF_SIZE (20 * 1024 * 1024) // 20MB +#define LOG_SLOW_BUF_SIZE (10 * 1024 * 1024) // 10MB #define LOG_DEFAULT_INTERVAL 25 #define LOG_INTERVAL_STEP 5 @@ -51,6 +52,8 @@ typedef struct { int32_t stop; TdThread asyncThread; TdThreadMutex buffMutex; + int32_t writeInterval; + int32_t lastDuration; } SLogBuff; typedef struct { @@ -62,6 +65,7 @@ typedef struct { pid_t pid; char logName[LOG_FILE_NAME_LEN]; SLogBuff *logHandle; + SLogBuff *slowHandle; TdThreadMutex logMutex; } SLogObj; @@ -69,7 +73,6 @@ extern SConfig *tsCfg; static int8_t tsLogInited = 0; static SLogObj tsLogObj = {.fileNum = 1}; static int64_t tsAsyncLogLostLines = 0; -static int32_t tsWriteInterval = LOG_DEFAULT_INTERVAL; static int32_t tsDaylightActive; /* Currently in daylight saving time. */ bool tsLogEmbedded = 0; @@ -82,6 +85,7 @@ int64_t tsNumOfErrorLogs = 0; int64_t tsNumOfInfoLogs = 0; int64_t tsNumOfDebugLogs = 0; int64_t tsNumOfTraceLogs = 0; +int64_t tsNumOfSlowLogs = 0; // log int32_t dDebugFlag = 131; @@ -136,6 +140,34 @@ static int32_t taosStartLog() { return 0; } +int32_t taosInitSlowLog() { + char fullName[PATH_MAX] = {0}; + char logFileName[64] = {0}; +#ifdef CUS_PROMPT + snprintf(logFileName, 64, "%sSlowLog", CUS_PROMPT); +#else + snprintf(logFileName, 64, "taosSlowLog"); +#endif + + if (strlen(tsLogDir) != 0) { + snprintf(fullName, PATH_MAX, "%s" TD_DIRSEP "%s", tsLogDir, logFileName); + } else { + snprintf(fullName, PATH_MAX, "%s", logFileName); + } + + tsLogObj.slowHandle = taosLogBuffNew(LOG_SLOW_BUF_SIZE); + if (tsLogObj.slowHandle == NULL) return -1; + + taosUmaskFile(0); + tsLogObj.slowHandle->pFile = taosOpenFile(fullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (tsLogObj.slowHandle->pFile == NULL) { + printf("\nfailed to open slow log file:%s, reason:%s\n", fullName, strerror(errno)); + return -1; + } + + return 0; +} + int32_t taosInitLog(const char *logName, int32_t maxFiles) { if (atomic_val_compare_exchange_8(&tsLogInited, 0, 1) != 0) return 0; osUpdate(); @@ -151,6 +183,8 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles) { tsLogObj.logHandle = taosLogBuffNew(LOG_DEFAULT_BUF_SIZE); if (tsLogObj.logHandle == NULL) return -1; if (taosOpenLogFile(fullName, tsNumOfLogLines, maxFiles) < 0) return -1; + + if (taosInitSlowLog() < 0) return -1; if (taosStartLog() < 0) return -1; return 0; } @@ -159,25 +193,34 @@ static void taosStopLog() { if (tsLogObj.logHandle) { tsLogObj.logHandle->stop = 1; } + if (tsLogObj.slowHandle) { + tsLogObj.slowHandle->stop = 1; + } } void taosCloseLog() { + taosStopLog(); + + if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { + taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL); + taosThreadClear(&tsLogObj.logHandle->asyncThread); + } + + if (tsLogObj.slowHandle != NULL) { + taosThreadMutexDestroy(&tsLogObj.slowHandle->buffMutex); + taosCloseFile(&tsLogObj.slowHandle->pFile); + taosMemoryFreeClear(tsLogObj.slowHandle->buffer); + taosMemoryFreeClear(tsLogObj.slowHandle); + } + if (tsLogObj.logHandle != NULL) { - taosStopLog(); - if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { - taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL); - taosThreadClear(&tsLogObj.logHandle->asyncThread); - } tsLogInited = 0; taosThreadMutexDestroy(&tsLogObj.logHandle->buffMutex); taosCloseFile(&tsLogObj.logHandle->pFile); taosMemoryFreeClear(tsLogObj.logHandle->buffer); - memset(&tsLogObj.logHandle->buffer, 0, sizeof(tsLogObj.logHandle->buffer)); taosThreadMutexDestroy(&tsLogObj.logMutex); taosMemoryFreeClear(tsLogObj.logHandle); - memset(&tsLogObj.logHandle, 0, sizeof(tsLogObj.logHandle)); - tsLogObj.logHandle = NULL; } } @@ -513,10 +556,9 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons va_list argpointer; va_start(argpointer, format); - len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - len, format, argpointer); + len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer); va_end(argpointer); - if (len > LOG_MAX_LINE_DUMP_SIZE) len = LOG_MAX_LINE_DUMP_SIZE; buffer[len++] = '\n'; buffer[len] = 0; @@ -524,6 +566,31 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons taosMemoryFree(buffer); } +void taosPrintSlowLog(const char *format, ...) { + if (!osLogSpaceAvailable()) return; + + char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); + int32_t len = taosBuildLogHead(buffer, ""); + + va_list argpointer; + va_start(argpointer, format); + len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer); + va_end(argpointer); + + buffer[len++] = '\n'; + buffer[len] = 0; + + atomic_add_fetch_64(&tsNumOfSlowLogs, 1); + + if (tsAsyncLog) { + taosPushLogBuffer(tsLogObj.slowHandle, buffer, len); + } else { + taosWriteFile(tsLogObj.slowHandle->pFile, buffer, len); + } + + taosMemoryFree(buffer); +} + void taosDumpData(unsigned char *msg, int32_t len) { if (!osLogSpaceAvailable()) return; taosUpdateLogNums(DEBUG_DUMP); @@ -568,6 +635,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) { LOG_BUF_SIZE(pLogBuf) = bufSize; pLogBuf->minBuffSize = bufSize / 10; pLogBuf->stop = 0; + pLogBuf->writeInterval = LOG_DEFAULT_INTERVAL; if (taosThreadMutexInit(&LOG_BUF_MUTEX(pLogBuf), NULL) < 0) goto _err; // tsem_init(&(pLogBuf->buffNotEmpty), 0, 0); @@ -651,83 +719,78 @@ static int32_t taosGetLogRemainSize(SLogBuff *pLogBuf, int32_t start, int32_t en } static void taosWriteLog(SLogBuff *pLogBuf) { - static int32_t lastDuration = 0; - int32_t remainChecked = 0; - int32_t start, end, pollSize; - - do { - if (remainChecked == 0) { - start = LOG_BUF_START(pLogBuf); - end = LOG_BUF_END(pLogBuf); - - if (start == end) { - dbgEmptyW++; - tsWriteInterval = LOG_MAX_INTERVAL; - return; - } + int32_t start = LOG_BUF_START(pLogBuf); + int32_t end = LOG_BUF_END(pLogBuf); - pollSize = taosGetLogRemainSize(pLogBuf, start, end); - if (pollSize < pLogBuf->minBuffSize) { - lastDuration += tsWriteInterval; - if (lastDuration < LOG_MAX_WAIT_MSEC) { - break; - } - } + if (start == end) { + dbgEmptyW++; + pLogBuf->writeInterval = LOG_MAX_INTERVAL; + return; + } - lastDuration = 0; + int32_t pollSize = taosGetLogRemainSize(pLogBuf, start, end); + if (pollSize < pLogBuf->minBuffSize) { + pLogBuf->lastDuration += pLogBuf->writeInterval; + if (pLogBuf->lastDuration < LOG_MAX_WAIT_MSEC) { + return; } + } - if (start < end) { - taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, pollSize); - } else { - int32_t tsize = LOG_BUF_SIZE(pLogBuf) - start; - taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, tsize); + pLogBuf->lastDuration = 0; - taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf), end); - } + if (start < end) { + taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, pollSize); + } else { + int32_t tsize = LOG_BUF_SIZE(pLogBuf) - start; + taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf) + start, tsize); - dbgWN++; - dbgWSize += pollSize; + taosWriteFile(pLogBuf->pFile, LOG_BUF_BUFFER(pLogBuf), end); + } - if (pollSize < pLogBuf->minBuffSize) { - dbgSmallWN++; - if (tsWriteInterval < LOG_MAX_INTERVAL) { - tsWriteInterval += LOG_INTERVAL_STEP; - } - } else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 3) { - dbgBigWN++; - tsWriteInterval = LOG_MIN_INTERVAL; - } else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 4) { - if (tsWriteInterval > LOG_MIN_INTERVAL) { - tsWriteInterval -= LOG_INTERVAL_STEP; - } - } + dbgWN++; + dbgWSize += pollSize; - LOG_BUF_START(pLogBuf) = (LOG_BUF_START(pLogBuf) + pollSize) % LOG_BUF_SIZE(pLogBuf); + if (pollSize < pLogBuf->minBuffSize) { + dbgSmallWN++; + if (pLogBuf->writeInterval < LOG_MAX_INTERVAL) { + pLogBuf->writeInterval += LOG_INTERVAL_STEP; + } + } else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 3) { + dbgBigWN++; + pLogBuf->writeInterval = LOG_MIN_INTERVAL; + } else if (pollSize > LOG_BUF_SIZE(pLogBuf) / 4) { + if (pLogBuf->writeInterval > LOG_MIN_INTERVAL) { + pLogBuf->writeInterval -= LOG_INTERVAL_STEP; + } + } - start = LOG_BUF_START(pLogBuf); - end = LOG_BUF_END(pLogBuf); + LOG_BUF_START(pLogBuf) = (LOG_BUF_START(pLogBuf) + pollSize) % LOG_BUF_SIZE(pLogBuf); - pollSize = taosGetLogRemainSize(pLogBuf, start, end); - if (pollSize < pLogBuf->minBuffSize) { - break; - } + start = LOG_BUF_START(pLogBuf); + end = LOG_BUF_END(pLogBuf); - tsWriteInterval = LOG_MIN_INTERVAL; + pollSize = taosGetLogRemainSize(pLogBuf, start, end); + if (pollSize < pLogBuf->minBuffSize) { + return; + } - remainChecked = 1; - } while (1); + pLogBuf->writeInterval = 0; } static void *taosAsyncOutputLog(void *param) { - SLogBuff *pLogBuf = (SLogBuff *)param; + SLogBuff *pLogBuf = (SLogBuff *)tsLogObj.logHandle; + SLogBuff *pSlowBuf = (SLogBuff *)tsLogObj.slowHandle; + setThreadName("log"); int32_t count = 0; int32_t updateCron = 0; + int32_t writeInterval = 0; + while (1) { - count += tsWriteInterval; + writeInterval = TMIN(pLogBuf->writeInterval, pSlowBuf->writeInterval); + count += writeInterval; updateCron++; - taosMsleep(tsWriteInterval); + taosMsleep(writeInterval); if (count > 1000) { osUpdate(); count = 0; @@ -735,13 +798,14 @@ static void *taosAsyncOutputLog(void *param) { // Polling the buffer taosWriteLog(pLogBuf); + taosWriteLog(pSlowBuf); if (updateCron >= 3600 * 24 * 40 / 2) { taosUpdateDaylight(); updateCron = 0; } - if (pLogBuf->stop) break; + if (pLogBuf->stop || pSlowBuf->stop) break; } return NULL; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 910b067d4e0bce5e289c136de29470591a446f2b..478801d3e7878996553c4ef2020d73810dc17088 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1115,6 +1115,7 @@ int32_t shellExecute() { } if (shell.conn == NULL) { + printf("failed to connect to server, reason: %s\n", taos_errstr(NULL)); fflush(stdout); return -1; }