diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md index bd1858d93fd52c2324e6e3eaccefa9ce06018e7a..07db5f4877cba0b6bd01487346544a5ef4c97307 100644 --- a/docs/en/12-taos-sql/14-stream.md +++ b/docs/en/12-taos-sql/14-stream.md @@ -89,6 +89,7 @@ create stream if not exists s1 fill_history 1 into st1 as select count(*) from If some streams are totally outdated, and you do not want it to monitor or process anymore, those streams can be manually dropped and output data will be still kept. + ## Delete a Stream ```sql diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 91ec5f52e5f948f45410977d5db3111d4b2183f9..3f11d2a21832ab5b5bab3c0edc8c0fd5ae085a86 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -58,8 +58,7 @@ typedef enum { #define QUERY_RSP_POLICY_QUICK 1 #define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0) -#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0) - +#define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0) typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema @@ -128,7 +127,7 @@ typedef struct SDBVgInfo { int8_t hashMethod; int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT int64_t stateTs; - SHashObj* vgHash; // key:vgId, value:SVgroupInfo + SHashObj* vgHash; // key:vgId, value:SVgroupInfo SArray* vgArray; } SDBVgInfo; @@ -262,23 +261,26 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) -#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED || (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING) -#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) -#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND) +#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) \ + ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || \ + (_code) == TSDB_CODE_VND_STOPPED || (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING) +#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) \ + ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) +#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND) #define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) -#define NEED_REDIRECT_ERROR(_code) \ +#define NEED_REDIRECT_ERROR(_code) \ (NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code)) - #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ (_type) == TDMT_MND_DROP_STB) -#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ - (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code)) +#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ + (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \ + SYNC_OTHER_LEADER_REDIRECT_ERROR(_code)) #define REQUEST_TOTAL_EXEC_TIMES 2 diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 02a268afda28698aba56148eacb8b205f12e0af2..b6cce249ea935a55e0515a84955702e5e463ca4d 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -214,6 +214,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { case SNODE: terrno = TSDB_CODE_SNODE_NOT_FOUND; break; + case VNODE: + terrno = TSDB_CODE_VND_STOPPED; + break; default: terrno = TSDB_CODE_APP_IS_STOPPING; break; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 117a9f5e67dd258b5f29a43b92f2ef6867766d2a..854535c82f72bf5ca4c3340355980ecad07ff7df 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -663,7 +663,9 @@ _OVER: const STraceId *trace = &pMsg->info.traceId; SEpSet epSet = {0}; + int32_t tmpCode = terrno; mndGetMnodeEpSet(pMnode, &epSet); + terrno = tmpCode; mGDebug( "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d " diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 42037304b941a5280ea7f591317125673729784d..d2fc2dc9b1ffed70f80a1571d22479e36e38d423 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -927,7 +927,8 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } } else { if (pTrans->stage == TRN_STAGE_REDO_ACTION) { - if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_APP_IS_STARTING) { + if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_APP_IS_STARTING || + code == TSDB_CODE_SYN_PROPOSE_NOT_READY) { if (pTrans->failedTimes > 60) sendRsp = true; } else { if (pTrans->failedTimes > 6) sendRsp = true; @@ -1336,6 +1337,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { } if (mndCannotExecuteTransAction(pMnode)) return false; + terrno = code; if (code == 0) { pTrans->code = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 5adb2eb3592e7bcba7803b5e2972f0bdd3689adb..c96d01f93afd3feeb126f60e2f675fc849856f68 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -115,7 +115,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; @@ -256,7 +255,9 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { } tjsonGetNumberValue(pJson, "tsdbPageSize", pCfg->tsdbPageSize, code); - if (code < 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE * 1024; + if (code < 0 || pCfg->tsdbPageSize < TSDB_MIN_PAGESIZE_PER_VNODE * 1024) { + pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE * 1024; + } return 0; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0736b763a58dde1dc7cef5897f4379cdb23a84e1..6ad126162eb88f3979e5e9f3990d96d8552c78ee 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1598,7 +1598,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || - code == TSDB_CODE_APP_IS_STOPPING) { + code == TSDB_CODE_APP_IS_STOPPING || code == TSDB_CODE_VND_STOPPED) { tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, true); transFreeMsg(pResp->pCont); diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 8c2170239f95ca7c70c9c3228dfb1736f29414aa..d8cccc83ed7ce0942576873859b27b33cc4621ec 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -464,7 +464,9 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { #if FILE_WITH_LOCK taosThreadRwlockWrlock(&(pFile->rwlock)); #endif - assert(pFile->fd >= 0); // Please check if you have closed the file. + if (pFile->fd < 0) { + return 0; + } int64_t nleft = count; int64_t nwritten = 0; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 747187254fad1a9bb18275d1f46795aa2a1a8f85..f6f814d82b8f1de69e28790bb6407c629446590e 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -445,6 +445,9 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) { static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *buffer, int32_t len) { if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL && osLogSpaceAvailable()) { taosUpdateLogNums(level); +#if 0 + // DEBUG_FATAL and DEBUG_ERROR are duplicated + // fsync will cause thread blocking and may also generate log misalignment in case of asyncLog if (tsAsyncLog && level != DEBUG_FATAL) { taosPushLogBuffer(tsLogObj.logHandle, buffer, len); } else { @@ -453,6 +456,13 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b taosFsyncFile(tsLogObj.logHandle->pFile); } } +#else + if (tsAsyncLog) { + taosPushLogBuffer(tsLogObj.logHandle, buffer, len); + } else { + taosWriteFile(tsLogObj.logHandle->pFile, buffer, len); + } +#endif if (tsLogObj.maxLines > 0) { atomic_add_fetch_32(&tsLogObj.lines, 1);