提交 fb265ff9 编写于 作者: K kailixu

Merge branch '3.0' into feature/TD-19148-D

...@@ -62,6 +62,7 @@ int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen); ...@@ -62,6 +62,7 @@ int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen);
bool taosIsDir(const char *dirname); bool taosIsDir(const char *dirname);
char *taosDirName(char *dirname); char *taosDirName(char *dirname);
char *taosDirEntryBaseName(char *dirname); char *taosDirEntryBaseName(char *dirname);
void taosGetCwd(char *buf, int32_t len);
TdDirPtr taosOpenDir(const char *dirname); TdDirPtr taosOpenDir(const char *dirname);
TdDirEntryPtr taosReadDir(TdDirPtr pDir); TdDirEntryPtr taosReadDir(TdDirPtr pDir);
......
...@@ -62,8 +62,11 @@ void taosResetTerminalMode(); ...@@ -62,8 +62,11 @@ void taosResetTerminalMode();
taosMemoryFree(strings); \ taosMemoryFree(strings); \
} }
#else #else
#define taosPrintTrace(flags, level, dflag) \ #define taosPrintTrace(flags, level, dflag) \
{} { \
taosPrintLog(flags, level, dflag, \
"backtrace not implemented on windows, so detailed stack information cannot be printed"); \
}
#endif #endif
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -307,8 +307,9 @@ typedef enum ELogicConditionType { ...@@ -307,8 +307,9 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_DURATION_PER_FILE 60 // unit minute #define TSDB_MIN_DURATION_PER_FILE 60 // unit minute
#define TSDB_MAX_DURATION_PER_FILE (3650 * 1440) #define TSDB_MAX_DURATION_PER_FILE (3650 * 1440)
#define TSDB_DEFAULT_DURATION_PER_FILE (10 * 1440) #define TSDB_DEFAULT_DURATION_PER_FILE (10 * 1440)
#define TSDB_MIN_KEEP (1 * 1440) // data in db to be reserved. unit minute #define TSDB_MIN_KEEP (1 * 1440) // data in db to be reserved. unit minute
#define TSDB_MAX_KEEP (365000 * 1440) // data in db to be reserved. #define TSDB_MAX_KEEP (365000 * 1440) // data in db to be reserved.
#define TSDB_MAX_KEEP_NS (365 * 292 * 1440) // data in db to be reserved.
#define TSDB_DEFAULT_KEEP (3650 * 1440) // ten years #define TSDB_DEFAULT_KEEP (3650 * 1440) // ten years
#define TSDB_MIN_MINROWS_FBLOCK 10 #define TSDB_MIN_MINROWS_FBLOCK 10
#define TSDB_MAX_MINROWS_FBLOCK 1000 #define TSDB_MAX_MINROWS_FBLOCK 1000
......
...@@ -83,8 +83,8 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons ...@@ -83,8 +83,8 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
#endif #endif
; ;
bool taosAssertLog(bool condition, const char *file, int32_t line, const char *format, ...); bool taosAssert(bool condition, const char *file, int32_t line, const char *format, ...);
#define ASSERTS(condition, ...) taosAssertLog(condition, __FILE__, __LINE__, __VA_ARGS__) #define ASSERTS(condition, ...) taosAssert(condition, __FILE__, __LINE__, __VA_ARGS__)
#define ASSERT(condition) ASSERTS(condition, "assert info not provided") #define ASSERT(condition) ASSERTS(condition, "assert info not provided")
// clang-format off // clang-format off
......
...@@ -875,6 +875,7 @@ void tmqFreeImpl(void* handle) { ...@@ -875,6 +875,7 @@ void tmqFreeImpl(void* handle) {
tmq_t* tmq = (tmq_t*)handle; tmq_t* tmq = (tmq_t*)handle;
// TODO stop timer // TODO stop timer
tmqClearUnhandleMsg(tmq);
if (tmq->mqueue) taosCloseQueue(tmq->mqueue); if (tmq->mqueue) taosCloseQueue(tmq->mqueue);
if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask); if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
if (tmq->qall) taosFreeQall(tmq->qall); if (tmq->qall) taosFreeQall(tmq->qall);
...@@ -884,8 +885,7 @@ void tmqFreeImpl(void* handle) { ...@@ -884,8 +885,7 @@ void tmqFreeImpl(void* handle) {
int32_t sz = taosArrayGetSize(tmq->clientTopics); int32_t sz = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema); taosMemoryFreeClear(pTopic->schema.pSchema);
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
taosArrayDestroy(pTopic->vgs); taosArrayDestroy(pTopic->vgs);
} }
taosArrayDestroy(tmq->clientTopics); taosArrayDestroy(tmq->clientTopics);
...@@ -1304,7 +1304,6 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1304,7 +1304,6 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema); if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
taosArrayDestroy(pTopic->vgs); taosArrayDestroy(pTopic->vgs);
} }
taosArrayDestroy(tmq->clientTopics); taosArrayDestroy(tmq->clientTopics);
...@@ -1410,7 +1409,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1410,7 +1409,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
return -1; return -1;
} }
void* pReq = taosMemoryCalloc(1, tlen); void* pReq = taosMemoryCalloc(1, tlen);
if (tlen < 0) { if (pReq == NULL) {
tscError("failed to malloc askEpReq msg, size:%d", tlen); tscError("failed to malloc askEpReq msg, size:%d", tlen);
return -1; return -1;
} }
...@@ -1738,7 +1737,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1738,7 +1737,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "dmMgmt.h" #include "dmMgmt.h"
#include "mnode.h" #include "mnode.h"
#include "tconfig.h" #include "tconfig.h"
#include "tglobal.h"
// clang-format off // clang-format off
#define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'." #define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'."
...@@ -45,9 +46,30 @@ static struct { ...@@ -45,9 +46,30 @@ static struct {
SArray *pArgs; // SConfigPair SArray *pArgs; // SConfigPair
} global = {0}; } global = {0};
static void dmStopDnode(int signum, void *info, void *ctx) { dmStop(); } static void dmSetDebugFlag(int32_t signum, void *sigInfo, void *context) { taosSetAllDebugFlag(143, true); }
static void dmSetAssert(int32_t signum, void *sigInfo, void *context) { tsAssert = 1; }
static void dmStopDnode(int signum, void *sigInfo, void *context) {
// taosIgnSignal(SIGUSR1);
// taosIgnSignal(SIGUSR2);
taosIgnSignal(SIGTERM);
taosIgnSignal(SIGHUP);
taosIgnSignal(SIGINT);
taosIgnSignal(SIGABRT);
taosIgnSignal(SIGBREAK);
dInfo("shut down signal is %d", signum);
#ifndef WINDOWS
dInfo("sender PID:%d cmdline:%s", ((siginfo_t *)sigInfo)->si_pid,
taosGetCmdlineByPID(((siginfo_t *)sigInfo)->si_pid));
#endif
dmStop();
}
static void dmSetSignalHandle() { static void dmSetSignalHandle() {
taosSetSignal(SIGUSR1, dmSetDebugFlag);
taosSetSignal(SIGUSR2, dmSetAssert);
taosSetSignal(SIGTERM, dmStopDnode); taosSetSignal(SIGTERM, dmStopDnode);
taosSetSignal(SIGHUP, dmStopDnode); taosSetSignal(SIGHUP, dmStopDnode);
taosSetSignal(SIGINT, dmStopDnode); taosSetSignal(SIGINT, dmStopDnode);
...@@ -105,6 +127,19 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) { ...@@ -105,6 +127,19 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
return 0; return 0;
} }
static void dmPrintArgs(int32_t argc, char const *argv[]) {
char path[1024] = {0};
taosGetCwd(path, sizeof(path));
char args[1024] = {0};
int32_t arglen = snprintf(args, sizeof(args), "%s", argv[0]);
for (int32_t i = 1; i < argc; ++i) {
arglen = arglen + snprintf(args + arglen, sizeof(args) - arglen, " %s", argv[i]);
}
dInfo("startup path:%s args:%s", path, args);
}
static void dmGenerateGrant() { mndGenerateMachineCode(); } static void dmGenerateGrant() { mndGenerateMachineCode(); }
static void dmPrintVersion() { static void dmPrintVersion() {
...@@ -194,6 +229,8 @@ int mainWindows(int argc, char **argv) { ...@@ -194,6 +229,8 @@ int mainWindows(int argc, char **argv) {
return -1; return -1;
} }
dmPrintArgs(argc, argv);
if (taosInitCfg(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0) != 0) { if (taosInitCfg(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0) != 0) {
dError("failed to start since read config error"); dError("failed to start since read config error");
taosCloseLog(); taosCloseLog();
......
...@@ -1353,6 +1353,10 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -1353,6 +1353,10 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
goto end; goto end;
} }
if (stbEntry.stbEntry.schemaTag.pSchema == NULL) {
goto end;
}
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0]; pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
STagVal tagVal = {.cid = pTagColumn->colId}; STagVal tagVal = {.cid = pTagColumn->colId};
......
...@@ -952,7 +952,10 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -952,7 +952,10 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
SArray *pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx)); SArray *pDelIdxArray = taosArrayInit(32, sizeof(SDelIdx));
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray); code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
if (code) goto _err; if (code) {
tsdbDelFReaderClose(&pDelFReader);
goto _err;
}
SDelIdx *delIdx = taosArraySearch(pDelIdxArray, &(SDelIdx){.suid = suid, .uid = uid}, tCmprDelIdx, TD_EQ); SDelIdx *delIdx = taosArraySearch(pDelIdxArray, &(SDelIdx){.suid = suid, .uid = uid}, tCmprDelIdx, TD_EQ);
......
...@@ -962,6 +962,7 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { ...@@ -962,6 +962,7 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
} }
} }
pDFileSet->diskId = pSet->diskId;
goto _exit; goto _exit;
} }
} }
......
...@@ -863,19 +863,20 @@ static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) { ...@@ -863,19 +863,20 @@ static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
int32_t compareWinRes(void* pKey, void* data, int32_t index) { int32_t compareWinRes(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SWinKey* pos = taosArrayGet(res, index); SWinKey* pDataPos = taosArrayGet(res, index);
SResKeyPos* pData = (SResKeyPos*)pKey; SResKeyPos* pRKey = (SResKeyPos*)pKey;
if (*(int64_t*)pData->key == pos->ts) { if (pRKey->groupId > pDataPos->groupId) {
if (pData->groupId > pos->groupId) {
return 1;
} else if (pData->groupId < pos->groupId) {
return -1;
}
return 0;
} else if (*(int64_t*)pData->key > pos->ts) {
return 1; return 1;
} else if (pRKey->groupId < pDataPos->groupId) {
return -1;
} }
return -1;
if (*(int64_t*)pRKey->key > pDataPos->ts) {
return 1;
} else if (*(int64_t*)pRKey->key < pDataPos->ts){
return -1;
}
return 0;
} }
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
...@@ -1400,19 +1401,21 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { ...@@ -1400,19 +1401,21 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
int32_t compareWinKey(void* pKey, void* data, int32_t index) { int32_t compareWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data; SArray* res = (SArray*)data;
SWinKey* pos = taosArrayGet(res, index); SWinKey* pDataPos = taosArrayGet(res, index);
SWinKey* pData = (SWinKey*)pKey; SWinKey* pWKey = (SWinKey*)pKey;
if (pData->ts == pos->ts) {
if (pData->groupId > pos->groupId) { if (pWKey->groupId > pDataPos->groupId) {
return 1;
} else if (pData->groupId < pos->groupId) {
return -1;
}
return 0;
} else if (pData->ts > pos->ts) {
return 1; return 1;
} else if (pWKey->groupId < pDataPos->groupId) {
return -1;
} }
return -1;
if (pWKey->ts > pDataPos->ts) {
return 1;
} else if (pWKey->ts < pDataPos->ts) {
return -1;
}
return 0;
} }
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval, static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
......
...@@ -368,11 +368,13 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { ...@@ -368,11 +368,13 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.data = NULL; pSlot->info.data = NULL;
} }
SArray *pPageIdList = (SArray *)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId)); SArray *pPageIdList;
if (pPageIdList == NULL) { void *p = taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId));
SArray *pList = taosArrayInit(4, sizeof(int32_t)); if (p == NULL) {
taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pList, POINTER_BYTES); pPageIdList = taosArrayInit(4, sizeof(int32_t));
pPageIdList = pList; taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pPageIdList, POINTER_BYTES);
} else {
pPageIdList = *(SArray **)p;
} }
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId); pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
......
...@@ -3879,12 +3879,17 @@ static int32_t checkDbKeepOption(STranslateContext* pCxt, SDatabaseOptions* pOpt ...@@ -3879,12 +3879,17 @@ static int32_t checkDbKeepOption(STranslateContext* pCxt, SDatabaseOptions* pOpt
pOptions->keep[2] = getBigintFromValueNode((SValueNode*)nodesListGetNode(pOptions->pKeep, 2)); pOptions->keep[2] = getBigintFromValueNode((SValueNode*)nodesListGetNode(pOptions->pKeep, 2));
} }
int64_t tsdbMaxKeep = TSDB_MAX_KEEP;
if (pOptions->precision == TSDB_TIME_PRECISION_NANO) {
tsdbMaxKeep = TSDB_MAX_KEEP_NS;
}
if (pOptions->keep[0] < TSDB_MIN_KEEP || pOptions->keep[1] < TSDB_MIN_KEEP || pOptions->keep[2] < TSDB_MIN_KEEP || if (pOptions->keep[0] < TSDB_MIN_KEEP || pOptions->keep[1] < TSDB_MIN_KEEP || pOptions->keep[2] < TSDB_MIN_KEEP ||
pOptions->keep[0] > TSDB_MAX_KEEP || pOptions->keep[1] > TSDB_MAX_KEEP || pOptions->keep[2] > TSDB_MAX_KEEP) { pOptions->keep[0] > tsdbMaxKeep || pOptions->keep[1] > tsdbMaxKeep || pOptions->keep[2] > tsdbMaxKeep) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64 " valid range: [%dm, %dm]", "Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64 " valid range: [%dm, %dm]",
pOptions->keep[0], pOptions->keep[1], pOptions->keep[2], TSDB_MIN_KEEP, pOptions->keep[0], pOptions->keep[1], pOptions->keep[2], TSDB_MIN_KEEP,
TSDB_MAX_KEEP); tsdbMaxKeep);
} }
if (!((pOptions->keep[0] <= pOptions->keep[1]) && (pOptions->keep[1] <= pOptions->keep[2]))) { if (!((pOptions->keep[0] <= pOptions->keep[1]) && (pOptions->keep[1] <= pOptions->keep[2]))) {
...@@ -4036,7 +4041,10 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName ...@@ -4036,7 +4041,10 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
TSDB_MAX_MINROWS_FBLOCK); TSDB_MAX_MINROWS_FBLOCK);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkDbKeepOption(pCxt, pOptions); code = checkDbPrecisionOption(pCxt, pOptions);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbKeepOption(pCxt, pOptions); // use precision
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkDbRangeOption(pCxt, "pages", pOptions->pages, TSDB_MIN_PAGES_PER_VNODE, TSDB_MAX_PAGES_PER_VNODE); code = checkDbRangeOption(pCxt, "pages", pOptions->pages, TSDB_MIN_PAGES_PER_VNODE, TSDB_MAX_PAGES_PER_VNODE);
...@@ -4049,9 +4057,6 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName ...@@ -4049,9 +4057,6 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
code = checkDbRangeOption(pCxt, "tsdbPagesize", pOptions->tsdbPageSize, TSDB_MIN_TSDB_PAGESIZE, code = checkDbRangeOption(pCxt, "tsdbPagesize", pOptions->tsdbPageSize, TSDB_MIN_TSDB_PAGESIZE,
TSDB_MAX_TSDB_PAGESIZE); TSDB_MAX_TSDB_PAGESIZE);
} }
if (TSDB_CODE_SUCCESS == code) {
code = checkDbPrecisionOption(pCxt, pOptions);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkDbEnumOption(pCxt, "replications", pOptions->replica, TSDB_MIN_DB_REPLICA, TSDB_MAX_DB_REPLICA); code = checkDbEnumOption(pCxt, "replications", pOptions->replica, TSDB_MIN_DB_REPLICA, TSDB_MAX_DB_REPLICA);
} }
......
...@@ -497,3 +497,11 @@ int32_t taosCloseDir(TdDirPtr *ppDir) { ...@@ -497,3 +497,11 @@ int32_t taosCloseDir(TdDirPtr *ppDir) {
return 0; return 0;
#endif #endif
} }
void taosGetCwd(char *buf, int32_t len) {
#if !defined(WINDOWS)
(void)getcwd(buf, len - 1);
#else
strncpy(buf, "not implemented on windows", len -1);
#endif
}
...@@ -780,7 +780,7 @@ cmp_end: ...@@ -780,7 +780,7 @@ cmp_end:
return ret; return ret;
} }
bool taosAssertLog(bool condition, const char *file, int32_t line, const char *format, ...) { bool taosAssert(bool condition, const char *file, int32_t line, const char *format, ...) {
if (condition) return false; if (condition) return false;
const char *flags = "UTL FATAL "; const char *flags = "UTL FATAL ";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册