提交 71a10b91 编写于 作者: wmmhello's avatar wmmhello

Merge branch 'main' of https://github.com/taosdata/TDengine into fix/TS-2385

......@@ -2,7 +2,7 @@
# taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 4776778
GIT_TAG 4efbc10
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
......@@ -82,6 +82,10 @@ extern bool tsEnableTelem;
extern int32_t tsTelemInterval;
extern char tsTelemServer[];
extern uint16_t tsTelemPort;
extern bool tsEnableCrashReport;
extern char* tsTelemUri;
extern char* tsClientCrashReportUri;
extern char* tsSvrCrashReportUri;
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
......@@ -24,7 +24,7 @@ extern "C" {
typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag;
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag);
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag);
#ifdef __cplusplus
......@@ -46,27 +46,73 @@ void taosSetTerminalMode();
int32_t taosGetOldTerminalMode();
void taosResetTerminalMode();
#define STACKSIZE 100
#if !defined(WINDOWS)
#define taosPrintTrace(flags, level, dflag) \
{ \
void* array[100]; \
int32_t size = backtrace(array, 100); \
char** strings = backtrace_symbols(array, size); \
if (strings != NULL) { \
taosPrintLog(flags, level, dflag, "obtained %d stack frames", size); \
for (int32_t i = 0; i < size; i++) { \
taosPrintLog(flags, level, dflag, "frame:%d, %s", i, strings[i]); \
} \
} \
taosMemoryFree(strings); \
#define taosLogTraceToBuf(buf, bufSize, ignoreNum) { \
void* array[STACKSIZE]; \
int32_t size = backtrace(array, STACKSIZE); \
char** strings = backtrace_symbols(array, size); \
int32_t offset = 0; \
if (strings != NULL) { \
offset = snprintf(buf, bufSize - 1, "obtained %d stack frames\n", (ignoreNum > 0) ? size - ignoreNum : size); \
for (int32_t i = (ignoreNum > 0) ? ignoreNum : 0; i < size; i++) { \
offset += snprintf(buf + offset, bufSize - 1 - offset, "frame:%d, %s\n", (ignoreNum > 0) ? i - ignoreNum : i, strings[i]); \
} \
} \
taosMemoryFree(strings); \
#define taosPrintTrace(flags, level, dflag, ignoreNum) \
{ \
void* array[STACKSIZE]; \
int32_t size = backtrace(array, STACKSIZE); \
char** strings = backtrace_symbols(array, size); \
if (strings != NULL) { \
taosPrintLog(flags, level, dflag, "obtained %d stack frames", (ignoreNum > 0) ? size - ignoreNum : size); \
for (int32_t i = (ignoreNum > 0) ? ignoreNum : 0; i < size; i++) { \
taosPrintLog(flags, level, dflag, "frame:%d, %s", (ignoreNum > 0) ? i - ignoreNum : i, strings[i]); \
} \
} \
taosMemoryFree(strings); \
#include <windows.h>
#include <dbghelp.h>
#define STACKSIZE 64
#define taosPrintTrace(flags, level, dflag) \
#define taosLogTraceToBuf(buf, bufSize, ignoreNum) { \
unsigned int i; \
void* stack[STACKSIZE]; \
unsigned short frames; \
SYMBOL_INFO* symbol; \
HANDLE process; \
int32_t offset = 0; \
process = GetCurrentProcess(); \
SymInitialize(process, NULL, TRUE); \
frames = CaptureStackBackTrace(0, STACKSIZE, stack, NULL); \
symbol = (SYMBOL_INFO*)calloc(sizeof(SYMBOL_INFO) + 256 * sizeof(char), 1); \
if (symbol != NULL) { \
symbol->MaxNameLen = 255; \
symbol->SizeOfStruct = sizeof(SYMBOL_INFO); \
if (frames > 0) { \
offset = snprintf(buf, bufSize - 1, "obtained %d stack frames\n", (ignoreNum > 0) ? frames - ignoreNum : frames); \
for (i = (ignoreNum > 0) ? ignoreNum : 0; i < frames; i++) { \
SymFromAddr(process, (DWORD64)(stack[i]), 0, symbol); \
offset += snprintf(buf + offset, bufSize - 1 - offset, "frame:%i, %s - 0x%0X\n", (ignoreNum > 0) ? i - ignoreNum : i, symbol->Name, symbol->Address); \
} \
} \
free(symbol); \
} \
#define taosPrintTrace(flags, level, dflag, ignoreNum) \
{ \
unsigned int i; \
void* stack[STACKSIZE]; \
......@@ -85,10 +131,10 @@ void taosResetTerminalMode();
symbol->SizeOfStruct = sizeof(SYMBOL_INFO); \
if (frames > 0) { \
taosPrintLog(flags, level, dflag, "obtained %d stack frames", frames); \
for (i = 0; i < frames; i++) { \
taosPrintLog(flags, level, dflag, "obtained %d stack frames\n", (ignoreNum > 0) ? frames - ignoreNum : frames); \
for (i = (ignoreNum > 0) ? ignoreNum : 0; i < frames; i++) { \
SymFromAddr(process, (DWORD64)(stack[i]), 0, symbol); \
taosPrintLog(flags, level, dflag, "frame:%i: %s - 0x%0X", frames - i - 1, symbol->Name, symbol->Address); \
taosPrintLog(flags, level, dflag, "frame:%i, %s - 0x%0X\n", (ignoreNum > 0) ? i - ignoreNum : i, symbol->Name, symbol->Address); \
} \
} \
free(symbol); \
......@@ -99,6 +99,11 @@ bool taosAssertRelease(bool condition);
void taosLogCrashInfo(char* nodeType, char* pMsg, int64_t msgLen, int signum, void *sigInfo);
void taosReadCrashInfo(char* filepath, char** pMsg, int64_t* pMsgLen, TdFilePtr* pFd);
void taosReleaseCrashLogFile(TdFilePtr pFile, bool truncateFile);
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime);
// clang-format off
#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", DEBUG_FATAL, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}
......@@ -43,6 +43,9 @@
# Switch for allowing TDengine to collect and report service usage information
# telemetryReporting 1
# Switch for allowing TDengine to collect and report crash information
# crashReporting 1
# The maximum number of vnodes supported by this dnode
# supportVnodes 0
......@@ -313,6 +313,8 @@ extern SAppInfo appInfo;
extern int32_t clientReqRefPool;
extern int32_t clientConnRefPool;
extern int32_t timestampDeltaLimit;
extern int64_t lastClusterId;
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
......@@ -340,6 +342,7 @@ void resetConnectDB(STscObj* pTscObj);
int taos_options_imp(TSDB_OPTION option, const char* str);
void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
void tscStopCrashReport();
typedef struct AsyncArg {
SRpcMsg msg;
......@@ -28,13 +28,16 @@
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#include "thttp.h"
SAppInfo appInfo;
int64_t lastClusterId = 0;
int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1;
int32_t clientStop = 0;
int32_t timestampDeltaLimit = 900; // s
......@@ -385,6 +388,146 @@ void destroyRequest(SRequestObj *pRequest) {
void taosClientCrash(int signum, void *sigInfo, void *context) {
#if !defined(WINDOWS)
char *pMsg = NULL;
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;
int32_t dflag = 255;
int64_t msgLen= -1;
if (tsEnableCrashReport) {
if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
goto _return;
} else {
msgLen = strlen(pMsg);
taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }
static void *tscCrashReportThreadFp(void *param) {
char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
char *pMsg = NULL;
int64_t msgLen = 0;
TdFilePtr pFile = NULL;
bool truncateFile = false;
int32_t sleepTime = 200;
int32_t reportPeriodNum = 3600 * 1000 / sleepTime;
int32_t loopTimes = reportPeriodNum;
#ifdef WINDOWS
if (taosCheckCurrentInDll()) {
while (1) {
if (clientStop) break;
if (loopTimes++ < reportPeriodNum) {
taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
if (pMsg && msgLen > 0) {
if (taosSendHttpReport(tsTelemServer, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
tscError("failed to send crash report");
if (pFile) {
taosReleaseCrashLogFile(pFile, false);
} else {
tscInfo("succeed to send crash report");
truncateFile = true;
} else {
tscDebug("no crash info");
if (pMsg && msgLen > 0) {
pMsg = NULL;
if (pFile) {
taosReleaseCrashLogFile(pFile, truncateFile);
truncateFile = false;
loopTimes = 0;
clientStop = -1;
return NULL;
int32_t tscCrashReportInit() {
if (!tsEnableCrashReport) {
return 0;
TdThreadAttr thAttr;
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
TdThread crashReportThread;
if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
tscError("failed to create crashReport thread since %s", strerror(errno));
return -1;
return 0;
void tscStopCrashReport() {
if (!tsEnableCrashReport) {
if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
tscDebug("hb thread already stopped");
while (atomic_load_32(&clientStop) > 0) {
static void tscSetSignalHandle() {
#if !defined(WINDOWS)
taosSetSignal(SIGBUS, taosClientCrash);
taosSetSignal(SIGABRT, taosClientCrash);
taosSetSignal(SIGFPE, taosClientCrash);
taosSetSignal(SIGSEGV, taosClientCrash);
void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
......@@ -392,6 +535,10 @@ void taos_init_imp(void) {
appInfo.pid = taosGetPId();
appInfo.startTime = taosGetTimestampMs();
appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
......@@ -404,6 +551,8 @@ void taos_init_imp(void) {
if (taosConvInit() != 0) {
......@@ -433,9 +582,8 @@ void taos_init_imp(void) {
taosGetAppName(appInfo.appName, NULL);
taosThreadMutexInit(&appInfo.mutex, NULL);
appInfo.pid = taosGetPId();
appInfo.startTime = taosGetTimestampMs();
appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tscDebug("client is initialized successfully");
......@@ -1253,7 +1253,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char* errorMsg =
......@@ -55,6 +55,8 @@ void taos_cleanup(void) {
int32_t id = clientReqRefPool;
clientReqRefPool = -1;
......@@ -106,7 +108,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
if (pass == NULL) {
STscObj *pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
if (pObj) {
int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
......@@ -119,6 +119,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
// update the appInstInfo
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
lastClusterId = connectRsp.clusterId;
pTscObj->connType = connectRsp.connType;
......@@ -73,6 +73,11 @@ bool tsEnableTelem = true;
int32_t tsTelemInterval = 43200;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
uint16_t tsTelemPort = 80;
char* tsTelemUri = "/report";
bool tsEnableCrashReport = true;
char* tsClientCrashReportUri = "/ccrashreport";
char* tsSvrCrashReportUri = "/dcrashreport";
// schemaless
char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null";
......@@ -202,7 +207,9 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
int32_t taosSetTfsCfg(SConfig *pCfg);
struct SConfig *taosGetCfg() { return tsCfg; }
struct SConfig *taosGetCfg() {
return tsCfg;
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
char *apolloUrl) {
......@@ -314,6 +321,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
......@@ -377,7 +385,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS);
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1;
tsNumOfCommitThreads = tsNumOfCores / 2;
......@@ -434,6 +442,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, 0) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, 0) != 0) return -1;
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryInterval", tsTelemInterval, 1, 200000, 0) != 0) return -1;
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1;
......@@ -665,6 +674,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
return 0;
......@@ -715,7 +725,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
tsSIMDBuiltins = (bool) cfgGetItem(pCfg, "SIMD-builtins")->bval;
tsSIMDBuiltins = (bool)cfgGetItem(pCfg, "SIMD-builtins")->bval;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
......@@ -726,6 +736,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsQueryRspPolicy = cfgGetItem(pCfg, "queryRspPolicy")->i32;
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
......@@ -795,6 +806,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
} else if (strcasecmp("cDebugFlag", name) == 0) {
cDebugFlag = cfgGetItem(pCfg, "cDebugFlag")->i32;
} else if (strcasecmp("crashReporting", name) == 0) {
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
......@@ -44,6 +44,7 @@ static struct {
char apolloUrl[PATH_MAX];
const char **envCmd;
SArray *pArgs; // SConfigPair
int64_t startTime;
} global = {0};
static void dmSetDebugFlag(int32_t signum, void *sigInfo, void *context) { taosSetAllDebugFlag(143, true); }
......@@ -67,23 +68,67 @@ static void dmStopDnode(int signum, void *sigInfo, void *context) {
void dmLogCrash(int signum, void *sigInfo, void *context) {
#ifndef WINDOWS
char *pMsg = NULL;
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;
int32_t dflag = 255;
int64_t msgLen= -1;
if (tsEnableCrashReport) {
if (taosGenCrashJsonMsg(signum, &pMsg, dmGetClusterId(), global.startTime)) {
taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
goto _return;
} else {
msgLen = strlen(pMsg);
taosLogCrashInfo("taosd", pMsg, msgLen, signum, sigInfo);
static void dmSetSignalHandle() {
taosSetSignal(SIGUSR1, dmSetDebugFlag);
taosSetSignal(SIGUSR2, dmSetAssert);
taosSetSignal(SIGTERM, dmStopDnode);
taosSetSignal(SIGHUP, dmStopDnode);
taosSetSignal(SIGINT, dmStopDnode);
taosSetSignal(SIGABRT, dmStopDnode);
taosSetSignal(SIGBREAK, dmStopDnode);
#ifndef WINDOWS
taosSetSignal(SIGTSTP, dmStopDnode);
taosSetSignal(SIGQUIT, dmStopDnode);
#ifndef WINDOWS
taosSetSignal(SIGBUS, dmLogCrash);
taosSetSignal(SIGABRT, dmLogCrash);
taosSetSignal(SIGFPE, dmLogCrash);
taosSetSignal(SIGSEGV, dmLogCrash);
static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
global.startTime = taosGetTimestampMs();
int32_t cmdEnvIndex = 0;
if (argc < 2) return 0;
global.envCmd = taosMemoryMalloc((argc - 1) * sizeof(char *));
memset(global.envCmd, 0, (argc - 1) * sizeof(char *));
for (int32_t i = 1; i < argc; ++i) {
......@@ -29,6 +29,7 @@ typedef struct SDnodeMgmt {
const char *name;
TdThread statusThread;
TdThread monitorThread;
TdThread crashReportThread;
SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp;
ProcessDropNodeFp processDropNodeFp;
......@@ -55,6 +56,8 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
void dmStopStatusThread(SDnodeMgmt *pMgmt);
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt);
void dmStopMonitorThread(SDnodeMgmt *pMgmt);
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt);
void dmStopCrashReportThread(SDnodeMgmt *pMgmt);
int32_t dmStartWorker(SDnodeMgmt *pMgmt);
void dmStopWorker(SDnodeMgmt *pMgmt);
......@@ -23,6 +23,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
if (dmStartMonitorThread(pMgmt) != 0) {
return -1;
if (dmStartCrashReportThread(pMgmt) != 0) {
return -1;
return 0;
......@@ -30,6 +33,7 @@ static void dmStopMgmt(SDnodeMgmt *pMgmt) {
pMgmt->pData->stopped = true;
static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
......@@ -15,6 +15,7 @@
#include "dmInt.h"
#include "thttp.h"
static void *dmStatusThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
......@@ -63,6 +64,63 @@ static void *dmMonitorThreadFp(void *param) {
return NULL;
static void *dmCrashReportThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
char *pMsg = NULL;
int64_t msgLen = 0;
TdFilePtr pFile = NULL;
bool truncateFile = false;
int32_t sleepTime = 200;
int32_t reportPeriodNum = 3600 * 1000 / sleepTime;;
int32_t loopTimes = reportPeriodNum;
while (1) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
if (loopTimes++ < reportPeriodNum) {
taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
if (pMsg && msgLen > 0) {
if (taosSendHttpReport(tsTelemServer, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
dError("failed to send crash report");
if (pFile) {
taosReleaseCrashLogFile(pFile, false);
} else {
dInfo("succeed to send crash report");
truncateFile = true;
} else {
dDebug("no crash info");
if (pMsg && msgLen > 0) {
pMsg = NULL;
if (pFile) {
taosReleaseCrashLogFile(pFile, truncateFile);
truncateFile = false;
loopTimes = 0;
return NULL;
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
TdThreadAttr thAttr;
......@@ -105,6 +163,36 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
if (!tsEnableCrashReport) {
return 0;
TdThreadAttr thAttr;
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
dError("failed to create crashReport thread since %s", strerror(errno));
return -1;
tmsgReportStartup("dnode-crashReport", "initialized");
return 0;
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
if (!tsEnableCrashReport) {
if (taosCheckPthreadValid(pMgmt->crashReportThread)) {
taosThreadJoin(pMgmt->crashReportThread, NULL);
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SDnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
......@@ -15,6 +15,7 @@
#include "mmInt.h"
#include "tjson.h"
int32_t mmReadFile(const char *path, SMnodeOpt *pOption) {
......@@ -130,56 +131,70 @@ _OVER:
return code;
int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption) {
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", path, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%smnode.json", path, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write %s since %s", file, terrstr());
return -1;
int32_t len = 0;
int32_t maxLen = 4096;
char *content = taosMemoryCalloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
static int32_t mmEncodeOption(SJson *pJson, const SMnodeOpt *pOption) {
if (pOption->deploy && pOption->numOfReplicas > 0) {
len += snprintf(content + len, maxLen - len, " \"selfIndex\": %d,\n", pOption->selfIndex);
len += snprintf(content + len, maxLen - len, " \"replicas\": [{\n");
if (tjsonAddDoubleToObject(pJson, "selfIndex", pOption->selfIndex) < 0) return -1;
SJson *replicas = tjsonCreateArray();
if (replicas == NULL) return -1;
if (tjsonAddItemToObject(pJson, "replicas", replicas) < 0) return -1;
for (int32_t i = 0; i < pOption->numOfReplicas; ++i) {
SJson *replica = tjsonCreateObject();
if (replica == NULL) return -1;
const SReplica *pReplica = pOption->replicas + i;
if (pReplica != NULL && pReplica->id > 0) {
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
if (i < pOption->numOfReplicas - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }],\n");
if (tjsonAddDoubleToObject(replica, "id", pReplica->id) < 0) return -1;
if (tjsonAddStringToObject(replica, "fqdn", pReplica->fqdn) < 0) return -1;
if (tjsonAddDoubleToObject(replica, "port", pReplica->port) < 0) return -1;
if (tjsonAddItemToArray(replicas, replica) < 0) return -1;
len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", pOption->deploy);
len += snprintf(content + len, maxLen - len, "}\n");
taosWriteFile(pFile, content, len);
if (tjsonAddDoubleToObject(pJson, "deployed", pOption->deploy) < 0) return -1;
return 0;
int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption) {
int32_t code = -1;
char *buffer = NULL;
SJson *pJson = NULL;
TdFilePtr pFile = NULL;
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", path, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%smnode.json", path, TD_DIRSEP);
pJson = tjsonCreateObject();
if (pJson == NULL) goto _OVER;
if (mmEncodeOption(pJson, pOption) != 0) goto _OVER;
buffer = tjsonToString(pJson);
if (buffer == NULL) goto _OVER;
terrno = 0;
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) goto _OVER;
int32_t len = strlen(buffer);
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
if (taosFsyncFile(pFile) < 0) goto _OVER;
if (taosRenameFile(file, realfile) != 0) goto _OVER;
if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to rename %s since %s", file, terrstr());
return -1;
code = 0;
dInfo("succeed to write mnode file:%s, deloyed:%d", realfile, pOption->deploy);
dDebug("succeed to write %s, deployed:%d", realfile, pOption->deploy);
return 0;
if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write mnode file:%s since %s, deloyed:%d", realfile, terrstr(), pOption->deploy);
return code;
......@@ -15,6 +15,7 @@
#include "vmInt.h"
#include "tjson.h"
#define MAX_CONTENT_LEN 2 * 1024 * 1024
......@@ -144,65 +145,66 @@ _OVER:
return code;
int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
int32_t code = 0;
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP);
snprintf(realfile, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write %s since %s", file, terrstr());
return -1;
int32_t numOfVnodes = 0;
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
if (ppVnodes == NULL) {
code = -1;
dError("failed to write %s while get vnodelist", file);
goto _OVER;
int32_t len = 0;
int32_t maxLen = MAX_CONTENT_LEN;
char *content = taosMemoryCalloc(1, maxLen + 1);
if (content == NULL) {
code = -1;
dError("failed to write %s while malloc content", file);
goto _OVER;
static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t numOfVnodes) {
SJson *vnodes = tjsonCreateArray();
if (vnodes == NULL) return -1;
if (tjsonAddItemToObject(pJson, "vnodes", vnodes) < 0) return -1;
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n");
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = ppVnodes[i];
if (pVnode == NULL) continue;
len += snprintf(content + len, maxLen - len, " {\n");
len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", pVnode->vgId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pVnode->dropped);
len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d\n", pVnode->vgVersion);
if (i < numOfVnodes - 1) {
len += snprintf(content + len, maxLen - len, " },\n");
} else {
len += snprintf(content + len, maxLen - len, " }\n");
SJson *vnode = tjsonCreateObject();
if (vnode == NULL) return -1;
if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1;
if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1;
len += snprintf(content + len, maxLen - len, " ]\n");
len += snprintf(content + len, maxLen - len, "}\n");
return 0;
int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
int32_t code = -1;
char *buffer = NULL;
SJson *pJson = NULL;
TdFilePtr pFile = NULL;
SVnodeObj **ppVnodes = NULL;
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
int32_t numOfVnodes = 0;
ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
if (ppVnodes == NULL) goto _OVER;
pJson = tjsonCreateObject();
if (pJson == NULL) goto _OVER;
if (vmEncodeVnodeList(pJson, ppVnodes, numOfVnodes) != 0) goto _OVER;
buffer = tjsonToString(pJson);
if (buffer == NULL) goto _OVER;
terrno = 0;
taosWriteFile(pFile, content, len);
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) goto _OVER;
int32_t len = strlen(buffer);
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
if (taosFsyncFile(pFile) < 0) goto _OVER;
if (taosRenameFile(file, realfile) != 0) goto _OVER;
code = 0;
dInfo("succeed to write vnodes file:%s, vnodes:%d", realfile, numOfVnodes);
if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile);
if (ppVnodes != NULL) {
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = ppVnodes[i];
......@@ -213,14 +215,9 @@ _OVER:
if (code != 0) return -1;
dInfo("succeed to write %s, numOfVnodes:%d", realfile, numOfVnodes);
code = taosRenameFile(file, realfile);
if (code != 0) {
dError("failed to rename %s to %s", file, realfile);
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write vnodes file:%s since %s, vnodes:%d", realfile, terrstr(), numOfVnodes);
return code;
\ No newline at end of file
......@@ -85,6 +85,7 @@ typedef struct SDnode {
// dmEnv.c
SDnode *dmInstance();
void dmReportStartup(const char *pName, const char *pDesc);
int64_t dmGetClusterId();
// dmMgmt.c
int32_t dmInitDnode(SDnode *pDnode);
......@@ -268,3 +268,8 @@ void dmReportStartup(const char *pName, const char *pDesc) {
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
dDebug("step:%s, %s", pStartup->name, pStartup->desc);
int64_t dmGetClusterId() {
return global.data.clusterId;
......@@ -111,6 +111,7 @@ static int32_t dmStartNodes(SDnode *pDnode) {
dInfo("TDengine initialized successfully");
dmReportStartup("TDengine", "initialized successfully");
return 0;
......@@ -15,6 +15,7 @@
#include "dmUtil.h"
#include "tjson.h"
#include "tmisce.h"
static void dmPrintEps(SDnodeData *pData);
......@@ -181,81 +182,73 @@ _OVER:
return code;
int32_t dmWriteEps(SDnodeData *pData) {
int32_t code = -1;
char *content = NULL;
TdFilePtr pFile = NULL;
static int32_t dmEncodeEps(SJson *pJson, SDnodeData *pData) {
if (tjsonAddDoubleToObject(pJson, "dnodeId", pData->dnodeId) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "dnodeVer", pData->dnodeVer) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "clusterId", pData->clusterId) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "dropped", pData->dropped) < 0) return -1;
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
dError("failed to open %s since %s", file, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _OVER;
int32_t len = 0;
int32_t maxLen = 256 * 1024;
content = taosMemoryCalloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pData->dnodeId);
len += snprintf(content + len, maxLen - len, " \"dnodeVer\": \"%" PRId64 "\",\n", pData->dnodeVer);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pData->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pData->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
SJson *dnodes = tjsonCreateArray();
if (dnodes == NULL) return -1;
if (tjsonAddItemToObject(pJson, "dnodes", dnodes) < 0) return -1;
int32_t numOfEps = (int32_t)taosArrayGetSize(pData->dnodeEps);
for (int32_t i = 0; i < numOfEps; ++i) {
SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port);
len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode);
if (i < numOfEps - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
SJson *dnode = tjsonCreateObject();
if (dnode == NULL) return -1;
if (tjsonAddDoubleToObject(dnode, "id", pDnodeEp->id) < 0) return -1;
if (tjsonAddStringToObject(dnode, "fqdn", pDnodeEp->ep.fqdn) < 0) return -1;
if (tjsonAddDoubleToObject(dnode, "port", pDnodeEp->ep.port) < 0) return -1;
if (tjsonAddDoubleToObject(dnode, "isMnode", pDnodeEp->isMnode) < 0) return -1;
if (tjsonAddItemToArray(dnodes, dnode) < 0) return -1;
len += snprintf(content + len, maxLen - len, "}\n");
if (taosWriteFile(pFile, content, len) != len) {
dError("failed to write %s since %s", file, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _OVER;
return 0;
if (taosFsyncFile(pFile) < 0) {
dError("failed to fsync %s since %s", file, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _OVER;
int32_t dmWriteEps(SDnodeData *pData) {
int32_t code = -1;
char *buffer = NULL;
SJson *pJson = NULL;
TdFilePtr pFile = NULL;
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
pJson = tjsonCreateObject();
if (pJson == NULL) goto _OVER;
if (dmEncodeEps(pJson, pData) != 0) goto _OVER;
buffer = tjsonToString(pJson);
if (buffer == NULL) goto _OVER;
terrno = 0;
if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to rename %s since %s", file, terrstr());
goto _OVER;
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) goto _OVER;
int32_t len = strlen(buffer);
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
if (taosFsyncFile(pFile) < 0) goto _OVER;
if (taosRenameFile(file, realfile) != 0) goto _OVER;
code = 0;
pData->updateTime = taosGetTimestampMs();
dInfo("succeed to write %s, dnodeVer:%" PRId64, realfile, pData->dnodeVer);
dInfo("succeed to write dnode file:%s, dnodeVer:%" PRId64, realfile, pData->dnodeVer);
if (content != NULL) taosMemoryFreeClear(content);
if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {
dError("failed to write file %s since %s", realfile, terrstr());
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
dInfo("succeed to write dnode file:%s since %s, dnodeVer:%" PRId64, realfile, terrstr(), pData->dnodeVer);
return code;
......@@ -15,6 +15,7 @@
#include "dmUtil.h"
#include "tjson.h"
#define MAXLEN 1024
......@@ -63,56 +64,51 @@ _OVER:
return code;
static int32_t dmEncodeFile(SJson *pJson, bool deployed) {
if (tjsonAddDoubleToObject(pJson, "deployed", deployed) < 0) return -1;
return 0;
int32_t dmWriteFile(const char *path, const char *name, bool deployed) {
int32_t code = -1;
int32_t len = 0;
char content[MAXLEN + 1] = {0};
char *buffer = NULL;
SJson *pJson = NULL;
TdFilePtr pFile = NULL;
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
snprintf(realfile, sizeof(realfile), "%s%s%s.json", path, TD_DIRSEP, name);
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write %s since %s", file, terrstr());
goto _OVER;
len += snprintf(content + len, MAXLEN - len, "{\n");
len += snprintf(content + len, MAXLEN - len, " \"deployed\": %d\n", deployed);
len += snprintf(content + len, MAXLEN - len, "}\n");
pJson = tjsonCreateObject();
if (pJson == NULL) goto _OVER;
if (dmEncodeFile(pJson, deployed) != 0) goto _OVER;
buffer = tjsonToString(pJson);
if (buffer == NULL) goto _OVER;
terrno = 0;
if (taosWriteFile(pFile, content, len) != len) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write file:%s since %s", file, terrstr());
goto _OVER;
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) goto _OVER;
if (taosFsyncFile(pFile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to fsync file:%s since %s", file, terrstr());
goto _OVER;
int32_t len = strlen(buffer);
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
if (taosFsyncFile(pFile) < 0) goto _OVER;
if (taosRenameFile(file, realfile) != 0) goto _OVER;
if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to rename %s since %s", file, terrstr());
return -1;
dInfo("succeed to write %s, deployed:%d", realfile, deployed);
code = 0;
dInfo("succeed to write file:%s, deloyed:%d", realfile, deployed);
if (pFile != NULL) {
if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write file:%s since %s, deloyed:%d", realfile, terrstr(), deployed);
return code;
......@@ -34,6 +34,8 @@ SHashObj *mndDupDbHash(SHashObj *pOld);
SHashObj *mndDupTopicHash(SHashObj *pOld);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db);
int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic);
#ifdef __cplusplus
......@@ -1051,17 +1051,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
SUserObj *pUser = mndAcquireUser(pMnode, pDb->createUser);
if (pUser != NULL) {
SSdbRaw *pCommitRaw = mndUserActionEncode(pUser);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
goto _OVER;
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndUserRemoveDb(pMnode, pTrans, pDb->name) != 0) goto _OVER;
int32_t rspLen = 0;
void *pRsp = NULL;
......@@ -131,7 +131,7 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
if (pCont != NULL) {
if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) {
if (taosSendHttpReport(tsTelemServer, tsTelemUri, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) {
mError("failed to send telemetry report");
} else {
mInfo("succeed to send telemetry report");
......@@ -604,22 +604,19 @@ _OVER:
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
int32_t code = -1;
if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) goto _OVER;
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
return -1;
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
return 0;
return code;
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
......@@ -890,6 +887,7 @@ int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) {
return 0;
#if 0
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
......@@ -917,3 +915,4 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
return code;
\ No newline at end of file
......@@ -285,14 +285,35 @@ static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) {
return 0;
static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser);
static int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) {
memcpy(pNew, pUser, sizeof(SUserObj));
pNew->updateTime = taosGetTimestampMs();
pNew->readDbs = mndDupDbHash(pUser->readDbs);
pNew->writeDbs = mndDupDbHash(pUser->writeDbs);
pNew->topics = mndDupTopicHash(pUser->topics);
if (pNew->readDbs == NULL || pNew->writeDbs == NULL || pNew->topics == NULL) {
return -1;
return 0;
static void mndUserFreeObj(SUserObj *pUser) {
pUser->readDbs = NULL;
pUser->writeDbs = NULL;
pUser->topics = NULL;
static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser);
return 0;
......@@ -516,19 +537,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
goto _OVER;
memcpy(&newUser, pUser, sizeof(SUserObj));
newUser.updateTime = taosGetTimestampMs();
newUser.readDbs = mndDupDbHash(pUser->readDbs);
newUser.writeDbs = mndDupDbHash(pUser->writeDbs);
newUser.topics = mndDupTopicHash(pUser->topics);
if (newUser.readDbs == NULL || newUser.writeDbs == NULL || newUser.topics == NULL) {
goto _OVER;
if (mndUserDupObj(pUser, &newUser) != 0) goto _OVER;
if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) {
char pass[TSDB_PASSWORD_LEN + 1] = {0};
......@@ -654,9 +663,7 @@ _OVER:
mndReleaseUser(pMnode, pOperUser);
mndReleaseUser(pMnode, pUser);
return code;
......@@ -1007,3 +1014,74 @@ _OVER:
return code;
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
int32_t len = strlen(db) + 1;
void *pIter = NULL;
SUserObj *pUser = NULL;
SUserObj newUser = {0};
while (1) {
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser);
if (pIter == NULL) break;
code = -1;
if (mndUserDupObj(pUser, &newUser) != 0) break;
bool inRead = (taosHashGet(newUser.readDbs, db, len) != NULL);
bool inWrite = (taosHashGet(newUser.writeDbs, db, len) != NULL);
if (inRead || inWrite) {
(void)taosHashRemove(newUser.readDbs, db, len);
(void)taosHashRemove(newUser.writeDbs, db, len);
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break;
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
sdbRelease(pSdb, pUser);
code = 0;
if (pUser != NULL) sdbRelease(pSdb, pUser);
if (pIter != NULL) sdbCancelFetch(pSdb, pIter);
return code;
int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
int32_t len = strlen(topic) + 1;
void *pIter = NULL;
SUserObj *pUser = NULL;
SUserObj newUser = {0};
while (1) {
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser);
if (pIter == NULL) break;
code = -1;
if (mndUserDupObj(pUser, &newUser) != 0) break;
bool inTopic = (taosHashGet(newUser.topics, topic, len) != NULL);
if (inTopic) {
(void)taosHashRemove(newUser.topics, topic, len);
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break;
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
sdbRelease(pSdb, pUser);
code = 0;
if (pUser != NULL) sdbRelease(pSdb, pUser);
if (pIter != NULL) sdbCancelFetch(pSdb, pIter);
return code;
......@@ -636,15 +636,20 @@ int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, int64_t term, int64_t config) {
int32_t code = 0;
int32_t code = -1;
if (!isApply) {
mInfo("sdbiter:%p, not apply to sdb", pIter);
return 0;
code = 0;
goto _OVER;
if (taosFsyncFile(pIter->file) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("sdbiter:%p, failed to fasync file %s since %s", pIter, pIter->name, terrstr());
goto _OVER;
pIter->file = NULL;
......@@ -653,14 +658,12 @@ int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, i
if (taosRenameFile(pIter->name, datafile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr());
return -1;
goto _OVER;
if (sdbReadFile(pSdb) != 0) {
mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr());
return -1;
goto _OVER;
if (config > 0) {
......@@ -674,8 +677,11 @@ int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, i
mInfo("sdbiter:%p, success applyed to sdb", pIter);
code = 0;
return 0;
return code;
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
......@@ -395,6 +395,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
int32_t code = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
goto _exit;
#if 0
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
......@@ -192,6 +192,7 @@ static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
pIter->rInfo.suid = pIter->bData.suid;
pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
goto _out;
......@@ -406,8 +406,10 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
SVnode *pVnode = pWriter->pVnode;
SVnodeStats vndStats = pWriter->info.config.vndStats;
SVnode *pVnode = pWriter->pVnode;
pWriter->info.config = pVnode->config;
pWriter->info.config.vndStats = vndStats;
vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId);
if (vnodeSaveInfo(dir, &pWriter->info) < 0) {
code = terrno;
......@@ -483,7 +483,7 @@ int32_t ctgInitTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* tas
CTG_UNLOCK(CTG_WRITE, &pJob->taskLock);
return code;
......@@ -905,6 +905,14 @@ int32_t ctgCallUserCb(void* param) {
void ctgUpdateJobErrCode(SCtgJob* pJob, int32_t errCode) {
if (!NEED_CLIENT_REFRESH_VG_ERROR(errCode) || errCode == TSDB_CODE_SUCCESS) return;
atomic_store_32(&pJob->jobResCode, errCode);
qDebug("QID:0x%" PRIx64 " ctg job errCode updated to %s", pJob->queryId, tstrerror(errCode));
int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
SCtgJob* pJob = pTask->pJob;
int32_t code = 0;
......@@ -924,6 +932,8 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
if (taskDone < taosArrayGetSize(pJob->pTasks)) {
qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone,
ctgUpdateJobErrCode(pJob, rspCode);
......@@ -931,7 +941,8 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
pJob->jobResCode = code;
ctgUpdateJobErrCode(pJob, rspCode);
// pJob->jobResCode = code;
// taosSsleep(2);
// qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId);
......@@ -1098,7 +1109,8 @@ _return:
if (code) {
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code));
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
if (pTask->res || code) {
ctgHandleTaskEnd(pTask, code);
......@@ -1286,7 +1298,8 @@ _return:
TSWAP(pTask->res, ctx->pResList);
taskDone = true;
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code));
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
if (pTask->res && taskDone) {
......@@ -108,7 +108,7 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
void tsortClearOrderdSource(SArray *pOrderedSource) {
void tsortClearOrderdSource(SArray* pOrderedSource) {
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
if (NULL == *pSource) {
......@@ -121,6 +121,12 @@ void tsortClearOrderdSource(SArray *pOrderedSource) {
if ((*pSource)->param && !(*pSource)->onlyRef) {
if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) {
(*pSource)->src.pBlock = NULL;
......@@ -629,9 +635,9 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
SSortSource* source = *pSource;
SSortSource* source = *pSource;
*pSource = NULL;
while (1) {
......@@ -659,6 +665,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
if (source->param && !source->onlyRef) {
if (!source->onlyRef && source->src.pBlock) {
source->src.pBlock = NULL;
return code;
......@@ -672,6 +682,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
if (source->param && !source->onlyRef) {
if (!source->onlyRef && source->src.pBlock) {
source->src.pBlock = NULL;
return code;
......@@ -849,8 +863,8 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
SSortExecInfo info = {0};
if (pHandle == NULL) {
info.sortMethod = SORT_QSORT_T; // by default
info.sortBuffer = 2 * 1048576; // 2mb by default
info.sortMethod = SORT_QSORT_T; // by default
info.sortBuffer = 2 * 1048576; // 2mb by default
} else {
info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
info.sortMethod = pHandle->inMemSort ? SORT_QSORT_T : SORT_SPILLED_MERGE_SORT_T;
......@@ -73,10 +73,10 @@ void tMemBucketDestroy(tMemBucket *pBucket);
int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size);
double getPercentile(tMemBucket *pMemBucket, double percent);
int32_t getPercentile(tMemBucket *pMemBucket, double percent, double *result);
#ifdef __cplusplus
\ No newline at end of file
......@@ -1670,15 +1670,14 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
tMemBucket* pMemBucket = ppInfo->pMemBucket;
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
SET_DOUBLE_VAL(&ppInfo->result, getPercentile(pMemBucket, v));
int32_t code = getPercentile(pMemBucket, v, &ppInfo->result);
if (code != TSDB_CODE_SUCCESS) {
return code;
if (ppInfo->result < 0) {
return functionFinalize(pCtx, pBlock);
......@@ -90,7 +90,7 @@ static void resetPosInfo(SSlotInfo *pInfo) {
pInfo->data = NULL;
double findOnlyResult(tMemBucket *pMemBucket) {
int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) {
ASSERT(pMemBucket->total == 1);
for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
......@@ -108,17 +108,17 @@ double findOnlyResult(tMemBucket *pMemBucket) {
int32_t *pageId = taosArrayGet(list, 0);
SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId);
if (pPage == NULL) {
return -1;
ASSERT(pPage->num == 1);
double v = 0;
GET_TYPED_DATA(v, double, pMemBucket->type, pPage->data);
return v;
GET_TYPED_DATA(*result, double, pMemBucket->type, pPage->data);
return 0;
*result = 0.0;
int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
......@@ -440,7 +440,7 @@ static double getIdenticalDataVal(tMemBucket *pMemBucket, int32_t slotIndex) {
return finalResult;
double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) {
int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction, double *result) {
int32_t num = 0;
for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
......@@ -473,15 +473,15 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
ASSERT(minOfNextSlot > maxOfThisSlot);
double val = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot;
return val;
*result = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot;
if (pSlot->info.size <= pMemBucket->maxCapacity) {
// data in buffer and file are merged together to be processed.
SFilePage *buffer = loadDataFromFilePage(pMemBucket, i);
if (buffer == NULL) {
return -1;
int32_t currentIdx = count - num;
......@@ -492,13 +492,14 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
GET_TYPED_DATA(td, double, pMemBucket->type, thisVal);
GET_TYPED_DATA(nd, double, pMemBucket->type, nextVal);
double val = (1 - fraction) * td + fraction * nd;
*result = (1 - fraction) * td + fraction * nd;
return val;
} else { // incur a second round bucket split
if (isIdenticalData(pMemBucket, i)) {
return getIdenticalDataVal(pMemBucket, i);
*result = getIdenticalDataVal(pMemBucket, i);
// try next round
......@@ -518,37 +519,37 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
int32_t *pageId = taosArrayGet(list, f);
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
if (pg == NULL) {
return -1;
int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);
if (code != TSDB_CODE_SUCCESS) {
return -1;
return code;
setBufPageDirty(pg, true);
releaseBufPage(pMemBucket->pBuffer, pg);
return getPercentileImpl(pMemBucket, count - num, fraction);
return getPercentileImpl(pMemBucket, count - num, fraction, result);
} else {
num += pSlot->info.size;
return 0;
*result = 0;
double getPercentile(tMemBucket *pMemBucket, double percent) {
int32_t getPercentile(tMemBucket *pMemBucket, double percent, double *result) {
if (pMemBucket->total == 0) {
return 0.0;
*result = 0.0;
// if only one elements exists, return it
if (pMemBucket->total == 1) {
if (findOnlyResult(pMemBucket) < 0) {
return -1;
return findOnlyResult(pMemBucket, result);
percent = fabs(percent);
......@@ -558,21 +559,21 @@ double getPercentile(tMemBucket *pMemBucket, double percent) {
MinMaxEntry *pRange = &pMemBucket->range;
if (IS_SIGNED_NUMERIC_TYPE(pMemBucket->type)) {
double v = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->i64MaxVal : pRange->i64MinVal);
return v;
*result = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->i64MaxVal : pRange->i64MinVal);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pMemBucket->type)) {
double v = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->u64MaxVal : pRange->u64MinVal);
return v;
*result = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->u64MaxVal : pRange->u64MinVal);
} else {
return fabs(percent - 100) < DBL_EPSILON ? pRange->dMaxVal : pRange->dMinVal;
*result = fabs(percent - 100) < DBL_EPSILON ? pRange->dMaxVal : pRange->dMinVal;
double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0);
// do put data by using buckets
int32_t orderIdx = (int32_t)percentVal;
return getPercentileImpl(pMemBucket, orderIdx, percentVal - orderIdx);
return getPercentileImpl(pMemBucket, orderIdx, percentVal - orderIdx, result);
......@@ -20,6 +20,7 @@
#include "ttime.h"
static SMonitor tsMonitor = {0};
static char* tsMonUri = "/report";
void monRecordLog(int64_t ts, ELogLevel level, const char *content) {
......@@ -550,7 +551,7 @@ void monSendReport() {
// uDebugL("report cont:%s\n", pCont);
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonUri, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
uError("failed to send monitor msg");
......@@ -1334,6 +1334,7 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO
if (TSDB_CODE_SUCCESS == code) {
pSetOp->precision = pSetOperator->precision;
*pLogicNode = (SLogicNode*)pSetOp;
} else {
......@@ -57,7 +57,6 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
......@@ -82,7 +81,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver)
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
// on message
int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
......@@ -1247,7 +1247,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
#if 0
if (pSyncNode->pNewNodeReceiver != NULL) {
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
......@@ -1270,7 +1270,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
void syncNodePostClose(SSyncNode* pSyncNode) {
if (pSyncNode->pNewNodeReceiver != NULL) {
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
......@@ -1325,7 +1325,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
if (pSyncNode->pNewNodeReceiver != NULL) {
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
......@@ -1855,7 +1855,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// close receiver
if (pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL &&
snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
// stop elect timer
......@@ -71,31 +71,23 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) {
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s.bak", realfile);
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to open sync cfg file:%s since %s", pNode->vgId, realfile, terrstr());
goto _OVER;
pJson = tjsonCreateObject();
if (pJson == NULL) goto _OVER;
if (tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg) < 0) goto _OVER;
buffer = tjsonToString(pJson);
if (buffer == NULL) goto _OVER;
terrno = 0;
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) goto _OVER;
int32_t len = strlen(buffer);
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
if (taosFsyncFile(pFile) < 0) goto _OVER;
if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to rename sync cfg file:%s to %s since %s", pNode->vgId, file, realfile, terrstr());
goto _OVER;
if (taosRenameFile(file, realfile) != 0) goto _OVER;
code = 0;
sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d", pNode->vgId, realfile, len);
......@@ -106,6 +98,7 @@ _OVER:
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, realfile, terrstr());
return code;
......@@ -150,7 +150,7 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// when sender receive ack, call this function to send msg from seq
// seq = ack + 1, already updated
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// free memory last time (current seq - 1)
if (pSender->pCurrentBlock != NULL) {
......@@ -342,23 +342,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// force stop
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver force stop, writer:%p");
// force close, abandon incomplete data
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
if (ret != 0) {
sRInfo(pReceiver, "snapshot receiver force stop failed since %s", terrstr());
pReceiver->pWriter = NULL;
pReceiver->start = false;
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (pReceiver->pWriter != NULL) {
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null");
......@@ -590,7 +573,7 @@ _START_RECEIVER:
if (snapshotReceiverIsStart(pReceiver)) {
sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender
......@@ -842,7 +825,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// force close, no response
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
code = syncNodeOnSnapshotReceive(pSyncNode, pMsg);
......@@ -989,6 +972,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg);
if (pSender->pReader == NULL || pSender->finish) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish);
terrno = pMsg->code;
goto _ERROR;
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
......@@ -100,14 +100,7 @@ typedef void* queue[2];
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
#define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
#define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
typedef SRpcMsg STransMsg;
......@@ -35,6 +35,7 @@ typedef struct SHttpModule {
typedef struct SHttpMsg {
queue q;
char* server;
char* uri;
int32_t port;
char* cont;
int32_t len;
......@@ -63,26 +64,26 @@ static void httpHandleReq(SHttpMsg* msg);
static void httpHandleQuit(SHttpMsg* msg);
static int32_t httpSendQuit();
static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen,
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag);
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
EHttpCompFlag flag) {
if (flag == HTTP_FLAT) {
return snprintf(pHead, headLen,
"POST /report HTTP/1.1\n"
"POST %s HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
server, contLen);
uri, server, contLen);
} else if (flag == HTTP_GZIP) {
return snprintf(pHead, headLen,
"POST /report HTTP/1.1\n"
"POST %s HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Encoding: gzip\n"
"Content-Length: %d\n\n",
server, contLen);
uri, server, contLen);
} else {
return -1;
......@@ -181,6 +182,7 @@ static void httpDestroyMsg(SHttpMsg* msg) {
if (msg == NULL) return;
......@@ -293,10 +295,11 @@ int32_t httpSendQuit() {
return 0;
static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen,
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag) {
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
msg->server = strdup(server);
msg->uri = strdup(uri);
msg->port = port;
msg->cont = taosMemoryMalloc(contLen);
memcpy(msg->cont, pCont, contLen);
......@@ -309,12 +312,10 @@ static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* p
tError("http-report already released");
return -1;
} else {
msg->http = load;
transAsyncSend(load->asyncPool, &(msg->q));
return 0;
msg->http = load;
return transAsyncSend(load->asyncPool, &(msg->q));
static void httpDestroyClientCb(uv_handle_t* handle) {
......@@ -360,7 +361,7 @@ static void httpHandleReq(SHttpMsg* msg) {
int32_t len = 2048;
char* header = taosMemoryCalloc(1, len);
int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag);
int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, len, msg->flag);
if (headLen < 0) {
goto END;
......@@ -380,6 +381,7 @@ static void httpHandleReq(SHttpMsg* msg) {
cli->port = msg->port;
cli->dest = dest;
uv_tcp_init(http->loop, &cli->tcp);
......@@ -406,9 +408,9 @@ END:
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
taosThreadOnce(&transHttpInit, transHttpEnvInit);
return taosSendHttpReportImpl(server, port, pCont, contLen, flag);
return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag);
static void transHttpEnvInit() {
......@@ -33,7 +33,7 @@ TEST(osTest, osSystem) {
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;
int32_t dflag = 255; // tsLogEmbedded ? 255 : uDebugFlag
taosPrintTrace(flags, level, dflag);
taosPrintTrace(flags, level, dflag, 0);
void fileOperateOnFree(void *param) {
......@@ -18,6 +18,8 @@
#include "os.h"
#include "tconfig.h"
#include "tutil.h"
#include "tjson.h"
#include "tglobal.h"
#define LOG_MAX_LINE_SIZE (1024)
......@@ -808,7 +810,7 @@ bool taosAssertDebug(bool condition, const char *file, int32_t line, const char
taosPrintLogImp(1, 255, buffer, len);
taosPrintLog(flags, level, dflag, "tAssert at file %s:%d exit:%d", file, line, tsAssert);
taosPrintTrace(flags, level, dflag);
taosPrintTrace(flags, level, dflag, -1);
if (tsAssert) {
// taosCloseLog();
......@@ -824,6 +826,216 @@ bool taosAssertDebug(bool condition, const char *file, int32_t line, const char
return true;
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
SJson* pJson = tjsonCreateObject();
if (pJson == NULL) return -1;
char tmp[4096] = {0};
tjsonAddDoubleToObject(pJson, "reportVersion", 1);
tjsonAddIntegerToObject(pJson, "clusterId", clusterId);
tjsonAddIntegerToObject(pJson, "startTime", startTime);
tjsonAddStringToObject(pJson, "fqdn", tmp);
tjsonAddIntegerToObject(pJson, "pid", taosGetPId());
taosGetAppName(tmp, NULL);
tjsonAddStringToObject(pJson, "appName", tmp);
if (taosGetOsReleaseName(tmp, sizeof(tmp)) == 0) {
tjsonAddStringToObject(pJson, "os", tmp);
float numOfCores = 0;
if (taosGetCpuInfo(tmp, sizeof(tmp), &numOfCores) == 0) {
tjsonAddStringToObject(pJson, "cpuModel", tmp);
tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores);
} else {
tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores);
snprintf(tmp, sizeof(tmp), "%" PRId64 " kB", tsTotalMemoryKB);
tjsonAddStringToObject(pJson, "memory", tmp);
tjsonAddStringToObject(pJson, "version", version);
tjsonAddStringToObject(pJson, "buildInfo", buildinfo);
tjsonAddStringToObject(pJson, "gitInfo", gitinfo);
tjsonAddIntegerToObject(pJson, "crashSig", signum);
tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs());
#ifdef _TD_DARWIN_64
taosLogTraceToBuf(tmp, sizeof(tmp), 4);
#elif !defined(WINDOWS)
taosLogTraceToBuf(tmp, sizeof(tmp), 3);
taosLogTraceToBuf(tmp, sizeof(tmp), 8);
tjsonAddStringToObject(pJson, "stackInfo", tmp);
char* pCont = tjsonToString(pJson);
*pMsg = pCont;
void taosLogCrashInfo(char* nodeType, char* pMsg, int64_t msgLen, int signum, void *sigInfo) {
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;
int32_t dflag = 255;
char filepath[PATH_MAX] = {0};
TdFilePtr pFile = NULL;
if (pMsg && msgLen > 0) {
snprintf(filepath, sizeof(filepath), "%s%s.%sCrashLog", tsLogDir, TD_DIRSEP, nodeType);
pFile = taosOpenFile(filepath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pFile == NULL) {
taosPrintLog(flags, level, dflag, "failed to open file:%s since %s", filepath, terrstr());
goto _return;
int64_t writeSize = taosWriteFile(pFile, &msgLen, sizeof(msgLen));
if (sizeof(msgLen) != writeSize) {
taosPrintLog(flags, level, dflag, "failed to write len to file:%s,%p wlen:%" PRId64 " tlen:%lu since %s",
filepath, pFile, writeSize, sizeof(msgLen), terrstr());
goto _return;
writeSize = taosWriteFile(pFile, pMsg, msgLen);
if (msgLen != writeSize) {
taosPrintLog(flags, level, dflag, "failed to write file:%s,%p wlen:%" PRId64 " tlen:%" PRId64 " since %s",
filepath, pFile, writeSize, msgLen, terrstr());
goto _return;
if (pFile) taosCloseFile(&pFile);
terrno = TAOS_SYSTEM_ERROR(errno);
taosPrintLog(flags, level, dflag, "crash signal is %d", signum);
#ifdef _TD_DARWIN_64
taosPrintTrace(flags, level, dflag, 4);
#elif !defined(WINDOWS)
taosPrintLog(flags, level, dflag, "sender PID:%d cmdline:%s", ((siginfo_t *)sigInfo)->si_pid,
taosGetCmdlineByPID(((siginfo_t *)sigInfo)->si_pid));
taosPrintTrace(flags, level, dflag, 3);
taosPrintTrace(flags, level, dflag, 8);
void taosReadCrashInfo(char* filepath, char** pMsg, int64_t* pMsgLen, TdFilePtr* pFd) {
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;
int32_t dflag = 255;
TdFilePtr pFile = NULL;
bool truncateFile = false;
char* buf = NULL;
if (NULL == *pFd) {
int64_t filesize = 0;
if (taosStatFile(filepath, &filesize, NULL) < 0) {
if (ENOENT == errno) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosPrintLog(flags, level, dflag, "failed to stat file:%s since %s", filepath, terrstr());
if (filesize <= 0) {
pFile = taosOpenFile(filepath, TD_FILE_READ|TD_FILE_WRITE);
if (pFile == NULL) {
if (ENOENT == errno) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosPrintLog(flags, level, dflag, "failed to open file:%s since %s", filepath, terrstr());
} else {
pFile = *pFd;
int64_t msgLen = 0;
int64_t readSize = taosReadFile(pFile, &msgLen, sizeof(msgLen));
if (sizeof(msgLen) != readSize) {
truncateFile = true;
if (readSize < 0) {
taosPrintLog(flags, level, dflag, "failed to read len from file:%s,%p wlen:%" PRId64 " tlen:%lu since %s",
filepath, pFile, readSize, sizeof(msgLen), terrstr());
goto _return;
buf = taosMemoryMalloc(msgLen);
if (NULL == buf) {
taosPrintLog(flags, level, dflag, "failed to malloc buf, size:%" PRId64, msgLen);
goto _return;
readSize = taosReadFile(pFile, buf, msgLen);
if (msgLen != readSize) {
truncateFile = true;
taosPrintLog(flags, level, dflag, "failed to read file:%s,%p wlen:%" PRId64 " tlen:%" PRId64 " since %s",
filepath, pFile, readSize, msgLen, terrstr());
goto _return;
*pMsg = buf;
*pMsgLen = msgLen;
*pFd = pFile;
if (truncateFile) {
taosFtruncateFile(pFile, 0);
*pMsg = NULL;
*pMsgLen = 0;
*pFd = NULL;
void taosReleaseCrashLogFile(TdFilePtr pFile, bool truncateFile) {
if (truncateFile) {
taosFtruncateFile(pFile, 0);
#ifdef NDEBUG
bool taosAssertRelease(bool condition) {
if (condition) return false;
......@@ -842,4 +1054,4 @@ bool taosAssertRelease(bool condition) {
return true;
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册