提交 2f91256a 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

......@@ -36,7 +36,6 @@ database_option: {
| TSDB_PAGESIZE value
| WAL_RETENTION_PERIOD value
| WAL_RETENTION_SIZE value
| WAL_ROLL_PERIOD value
| WAL_SEGMENT_SIZE value
}
```
......
......@@ -77,7 +77,7 @@ static inline bool tmsgIsValid(tmsg_t type) {
}
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT);
}
static inline bool syncUtilUserCommit(tmsg_t msgType) {
......
......@@ -547,6 +547,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916)
#define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917)
#define TSDB_CODE_SYN_NEGO_WIN_EXCEEDED TAOS_DEF_ERROR_CODE(0, 0X0918)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
......
......@@ -285,8 +285,11 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_REPLICA 5
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_SYNC_LOG_BUFFER_RETENTION (TSDB_SYNC_LOG_BUFFER_SIZE >> 4)
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
#define TSDB_SYNC_NEGOTIATION_WIN 512
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
......
......@@ -197,7 +197,8 @@ if [[ $productName == "TDengine" ]]; then
mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ "$osType" != "Darwin" ]; then
[ -f ${build_dir}/lib/*.jar ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
jars=$(ls ${build_dir}/lib/*.jar 2>/dev/null|wc -l)
[ "${jars}" != "0" ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
fi
git clone --depth 1 https://github.com/taosdata/driver-go ${install_dir}/connector/go
rm -rf ${install_dir}/connector/go/.git ||:
......
......@@ -338,7 +338,20 @@ if [ "$verMode" == "cluster" ] || [ "$verMode" == "cloud" ]; then
connector_dir="${code_dir}/connector"
mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
[ -f ${build_dir}/lib/*.jar ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
tmp_pwd=`pwd`
cd ${install_dir}/connector
if [ ! -d taos-connector-jdbc ];then
git clone -b main --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||:
fi
cd taos-connector-jdbc
mvn clean package -Dmaven.test.skip=true
echo ${build_dir}/lib/
cp target/*.jar ${build_dir}/lib/
cd ${install_dir}/connector
rm -rf taos-connector-jdbc
cd ${tmp_pwd}
jars=$(ls ${build_dir}/lib/*.jar 2>/dev/null|wc -l)
[ "${jars}" != "0" ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
git clone --depth 1 https://github.com/taosdata/driver-go ${install_dir}/connector/go
rm -rf ${install_dir}/connector/go/.git ||:
......
......@@ -97,7 +97,6 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode, bool atExit);
void vnodeUpdCommitSched(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir);
......
......@@ -378,7 +378,6 @@ struct SVnode {
STQ* pTq;
SSink* pSink;
tsem_t canCommit;
SVCommitSched commitSched;
int64_t sync;
TdThreadMutex lock;
bool blocked;
......@@ -387,9 +386,6 @@ struct SVnode {
int32_t blockSec;
int64_t blockSeq;
SQHandle* pQuery;
#if 0
SRpcHandleInfo blockInfo;
#endif
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
......
......@@ -150,23 +150,13 @@ _exit:
return code;
}
void vnodeUpdCommitSched(SVnode *pVnode) {
int64_t randNum = taosRand();
pVnode->commitSched.commitMs = taosGetMonoTimestampMs();
pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs);
}
int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
SVCommitSched *pSched = &pVnode->commitSched;
int64_t nowMs = taosGetMonoTimestampMs();
bool diskAvail = osDataSpaceAvailable();
bool needCommit = false;
taosThreadMutexLock(&pVnode->mutex);
if (pVnode->inUse && diskAvail) {
needCommit =
((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
((pVnode->inUse->size > 0) && atExit);
needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) || (pVnode->inUse->size > 0 && atExit);
}
taosThreadMutexUnlock(&pVnode->mutex);
return needCommit;
......@@ -442,8 +432,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);
vnodeUpdCommitSched(pVnode);
// persist wal before starting
if (walPersist(pVnode->pWal) < 0) {
vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());
......
......@@ -286,8 +286,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
vnodeUpdCommitSched(pVnode);
int8_t rollback = vnodeShouldRollback(pVnode);
// open buffer pool
......
......@@ -112,9 +112,6 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq;
#if 0
pVnode->blockInfo = pMsg->info;
#endif
}
taosThreadMutexUnlock(&pVnode->lock);
......@@ -157,8 +154,6 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
} else {
tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
vnodeUpdCommitSched(pVnode);
}
#if BATCH_ENABLE
......
......@@ -3035,12 +3035,13 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect
}
static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pGroupByList && NULL != pSelect->pHaving) {
if (NULL == pSelect->pGroupByList && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow &&
NULL != pSelect->pHaving) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION);
}
pCxt->currClause = SQL_CLAUSE_HAVING;
int32_t code = translateExpr(pCxt, &pSelect->pHaving);
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code && (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow)) {
code = checkExprForGroupBy(pCxt, &pSelect->pHaving);
}
return code;
......
......@@ -740,6 +740,13 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
code = createColumnByRewriteExprs(pWindow->pFuncs, &pWindow->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
pWindow->node.pConditions = nodesCloneNode(pSelect->pHaving);
if (NULL == pWindow->node.pConditions) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
pSelect->hasAggFuncs = false;
if (TSDB_CODE_SUCCESS == code) {
......@@ -1132,6 +1139,14 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving && !pSelect->hasAggFuncs && NULL == pSelect->pGroupByList &&
NULL == pSelect->pWindow) {
pPartition->node.pConditions = nodesCloneNode(pSelect->pHaving);
if (NULL == pPartition->node.pConditions) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pPartition;
} else {
......
......@@ -53,8 +53,15 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
goto _err;
}
if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
terrno = TSDB_CODE_SYN_NEGO_WIN_EXCEEDED;
sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, terrstr(),
index, pBuf->commitIndex);
goto _err;
}
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= pBuf->size) {
if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) {
terrno = TSDB_CODE_SYN_WRITE_STALL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex);
......@@ -83,6 +90,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
_err:
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
taosMsleep(1);
return -1;
}
......
......@@ -202,21 +202,22 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
// restore error code
terrno = errCode;
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
if (pNode != NULL) {
taosPrintLog(flags, level, dflag,
"vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64
", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
"vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", applied-index:%" PRId64
", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s",
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum,
pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems,
pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing,
pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser,
bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr);
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, appliedIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->electNum, pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum,
aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing,
pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock,
pNode->heartbeatTimerLogicClockUser, bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr);
}
}
......
......@@ -426,6 +426,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync leader is restor
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGO_WIN_EXCEEDED, "Sync negotiation win exceeded")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册