提交 d57b28ac 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/main' into fix/TD-21663

...@@ -15,13 +15,13 @@ ...@@ -15,13 +15,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndMnode.h" #include "mndMnode.h"
#include "mndCluster.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndPrivilege.h" #include "mndPrivilege.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "tmisce.h" #include "tmisce.h"
#include "mndCluster.h"
#define MNODE_VER_NUMBER 1 #define MNODE_VER_NUMBER 1
#define MNODE_RESERVE_SIZE 64 #define MNODE_RESERVE_SIZE 64
...@@ -181,9 +181,8 @@ _OVER: ...@@ -181,9 +181,8 @@ _OVER:
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) { static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj); mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id); pObj->pDnode = sdbAcquireNotReadyObj(pSdb, SDB_DNODE, &pObj->id);
if (pObj->pDnode == NULL) { if (pObj->pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr()); mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
return -1; return -1;
} }
......
...@@ -291,6 +291,7 @@ int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw); ...@@ -291,6 +291,7 @@ int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw);
* @return void* The object of the row. * @return void* The object of the row.
*/ */
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey); void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey);
void *sdbAcquireNotReadyObj(SSdb *pSdb, ESdbType type, const void *pKey);
/** /**
* @brief Release a row from sdb. * @brief Release a row from sdb.
......
...@@ -270,7 +270,7 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) { ...@@ -270,7 +270,7 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
return code; return code;
} }
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { void *sdbAcquireAll(SSdb *pSdb, ESdbType type, const void *pKey, bool onlyReady) {
terrno = 0; terrno = 0;
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
...@@ -306,10 +306,24 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { ...@@ -306,10 +306,24 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
break; break;
} }
if (pRet == NULL) {
if (!onlyReady) {
terrno = 0;
atomic_add_fetch_32(&pRow->refCount, 1);
pRet = pRow->pObj;
sdbPrintOper(pSdb, pRow, "acquire");
}
}
sdbUnLock(pSdb, type); sdbUnLock(pSdb, type);
return pRet; return pRet;
} }
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { return sdbAcquireAll(pSdb, type, pKey, true); }
void *sdbAcquireNotReadyObj(SSdb *pSdb, ESdbType type, const void *pKey) {
return sdbAcquireAll(pSdb, type, pKey, false);
}
static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
int32_t type = pRow->type; int32_t type = pRow->type;
sdbWriteLock(pSdb, type); sdbWriteLock(pSdb, type);
......
...@@ -192,6 +192,8 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { ...@@ -192,6 +192,8 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
return SYNC_TERM_INVALID; return SYNC_TERM_INVALID;
} }
static inline bool raftLogForceSync(SSyncRaftEntry* pEntry) { return (pEntry->originalRpcType == TDMT_VND_COMMIT); }
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
...@@ -219,9 +221,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -219,9 +221,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
ASSERT(pEntry->index == index); ASSERT(pEntry->index == index);
if (pEntry->originalRpcType == TDMT_VND_COMMIT) { bool forceSync = raftLogForceSync(pEntry);
walFsync(pWal, true); walFsync(pWal, forceSync);
}
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
......
...@@ -1316,11 +1316,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, ...@@ -1316,11 +1316,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
} }
TDB_CELLDECODER_SET_FREE_KEY(pDecoder); TDB_CELLDECODER_SET_FREE_KEY(pDecoder);
memcpy(pDecoder->pKey, pCell + nHeader, nLocal - 4); memcpy(pDecoder->pKey, pCell + nHeader, nLocal - nHeader - sizeof(pgno));
nLeft -= nLocal - 4; nLeft -= nLocal - nHeader - sizeof(pgno);
nLeftKey -= nLocal - 4; nLeftKey -= nLocal - nHeader - sizeof(pgno);
memcpy(&pgno, pCell + nHeader + nLocal - 4, sizeof(pgno)); memcpy(&pgno, pCell + nLocal - sizeof(pgno), sizeof(pgno));
int lastKeyPageSpace = 0; int lastKeyPageSpace = 0;
// load left key & val to ovpages // load left key & val to ovpages
...@@ -1346,9 +1346,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, ...@@ -1346,9 +1346,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
if (lastKeyPage) { if (lastKeyPage) {
if (lastKeyPageSpace >= vLen) { if (lastKeyPageSpace >= vLen) {
pDecoder->pVal = ofpCell + kLen - nLeftKey; if (vLen > 0) {
pDecoder->pVal = ofpCell + kLen - nLeftKey;
nLeft -= vLen; nLeft -= vLen;
}
pgno = 0; pgno = 0;
} else { } else {
// read partial val to local // read partial val to local
......
...@@ -637,11 +637,6 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in ...@@ -637,11 +637,6 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
void walFsync(SWal *pWal, bool forceFsync) { void walFsync(SWal *pWal, bool forceFsync) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".idx, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (taosFsyncFile(pWal->pIdxFile) < 0) {
wError("vgId:%d, file:%" PRId64 ".idx, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
strerror(errno));
}
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (taosFsyncFile(pWal->pLogFile) < 0) { if (taosFsyncFile(pWal->pLogFile) < 0) {
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
......
...@@ -445,6 +445,7 @@ ...@@ -445,6 +445,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/database_pre_suf.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/database_pre_suf.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/InsertFuturets.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/InsertFuturets.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/information_schema.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/and_or_for_byte.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/and_or_for_byte.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.setsql = TDSetSql()
self.dbname = 'db'
self.stbname = 'stb'
self.binary_length = 20 # the length of binary for column_dict
self.nchar_length = 20 # the length of nchar for column_dict
self.ts = 1537146000000
self.column_dict = {
'ts' : 'timestamp',
'col1': 'tinyint',
'col2': 'smallint',
'col3': 'int',
'col4': 'bigint',
'col5': 'tinyint unsigned',
'col6': 'smallint unsigned',
'col7': 'int unsigned',
'col8': 'bigint unsigned',
'col9': 'float',
'col10': 'double',
'col11': 'bool',
'col12': f'binary({self.binary_length})',
'col13': f'nchar({self.nchar_length})'
}
self.tbnum = 20
self.rowNum = 10
self.tag_dict = {
't0':'int'
}
self.tag_values = [
f'1'
]
self.binary_str = 'taosdata'
self.nchar_str = '涛思数据'
self.ins_list = ['ins_dnodes','ins_mnodes','ins_modules','ins_qnodes','ins_snodes','ins_cluster','ins_databases','ins_functions',\
'ins_indexes','ins_stables','ins_tables','ins_tags','ins_users','ins_grants','ins_vgroups','ins_configs','ins_dnode_variables',\
'ins_topics','ins_subscriptions','ins_streams','ins_stream_tasks','ins_vnodes','ins_user_privileges']
self.perf_list = ['perf_connections','perf_queries','perf_consumers','perf_trans','perf_apps']
def insert_data(self,column_dict,tbname,row_num):
insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str)
for i in range(row_num):
insert_list = []
self.setsql.insert_values(column_dict,i,insert_sql,insert_list,self.ts)
def prepare_data(self):
tdSql.execute(f"create database if not exists {self.dbname} vgroups 2")
tdSql.execute(f'use {self.dbname}')
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
for i in range(self.tbnum):
tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})")
self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum)
def count_check(self):
tdSql.query('select count(*) from information_schema.ins_tables')
tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum+len(self.ins_list)+len(self.perf_list))
tdSql.query(f'select count(*) from information_schema.ins_tables where db_name = "{self.dbname}"')
tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
tdSql.query(f'select count(*) from information_schema.ins_tables where db_name = "{self.dbname}" and stable_name = "{self.stbname}"')
tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
tdSql.execute('create database db1')
tdSql.execute('create table stb1 (ts timestamp,c0 int) tags(t0 int)')
tdSql.execute('create table tb1 using stb1 tags(1)')
tdSql.query(f'select db_name, stable_name, count(*) from information_schema.ins_tables group by db_name, stable_name')
for i in tdSql.queryResult:
if i[0].lower() == 'information_schema':
tdSql.checkEqual(i[2],len(self.ins_list))
elif i[0].lower() == self.dbname and i[1] == self.stbname:
tdSql.checkEqual(i[2],self.tbnum)
elif i[0].lower() == self.dbname and i[1] == 'stb1':
tdSql.checkEqual(i[2],1)
elif i[0].lower() == 'performance_schema':
tdSql.checkEqual(i[2],len(self.perf_list))
tdSql.execute('create table db1.ntb (ts timestamp,c0 int)')
tdSql.query(f'select db_name, count(*) from information_schema.ins_tables group by db_name')
print(tdSql.queryResult)
for i in tdSql.queryResult:
if i[0].lower() == 'information_schema':
tdSql.checkEqual(i[1],len(self.ins_list))
elif i[0].lower() == 'performance_schema':
tdSql.checkEqual(i[1],len(self.perf_list))
elif i[0].lower() == self.dbname:
tdSql.checkEqual(i[1],self.tbnum+1)
def run(self):
self.prepare_data()
self.count_check()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -112,7 +112,8 @@ class TDTestCase: ...@@ -112,7 +112,8 @@ class TDTestCase:
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;" cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;"
tdLog.debug(cmd) tdLog.debug(cmd)
os.system(cmd) if os.system(cmd) != 0:
raise Exception("failed to execute system command. cmd: %s" % cmd)
time.sleep(2) time.sleep(2)
tdLog.info(" create cluster with %d dnode done! " %dnodes_nums) tdLog.info(" create cluster with %d dnode done! " %dnodes_nums)
...@@ -292,6 +293,8 @@ class TDTestCase: ...@@ -292,6 +293,8 @@ class TDTestCase:
tdLog.debug("drop mnode %d successfully"%(i+1)) tdLog.debug("drop mnode %d successfully"%(i+1))
break break
count+=1 count+=1
self.wait_for_transactions(20)
tdLog.debug("create mnode on dnode %d"%(i+1)) tdLog.debug("create mnode on dnode %d"%(i+1))
tdSql.execute("create mnode on dnode %d"%(i+1)) tdSql.execute("create mnode on dnode %d"%(i+1))
count=0 count=0
...@@ -299,12 +302,24 @@ class TDTestCase: ...@@ -299,12 +302,24 @@ class TDTestCase:
time.sleep(1) time.sleep(1)
tdSql.query("select * from information_schema.ins_mnodes;") tdSql.query("select * from information_schema.ins_mnodes;")
if tdSql.checkRows(3): if tdSql.checkRows(3):
tdLog.debug("drop mnode %d successfully"%(i+1)) tdLog.debug("create mnode %d successfully"%(i+1))
break break
count+=1 count+=1
self.wait_for_transactions(20)
dropcount+=1 dropcount+=1
self.check3mnode() self.check3mnode()
def wait_for_transactions(self, timeout):
count=0
while count<timeout:
time.sleep(1)
tdSql.query("show transactions;")
if tdSql.checkRows(0):
tdLog.debug("transactions completed successfully")
break
count+=1
if count >= timeout:
tdLog.debug("transactions not finished before timeout (%d secs)", timeout)
def getConnection(self, dnode): def getConnection(self, dnode):
host = dnode.cfgDict["fqdn"] host = dnode.cfgDict["fqdn"]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册