Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
45ec75a5
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
45ec75a5
编写于
1月 11, 2023
作者:
S
Shengliang Guan
提交者:
GitHub
1月 11, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19472 from taosdata/fix/long_query
Fix/long_query
上级
e96da59c
ba68d5d7
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
480 addition
and
216 deletion
+480
-216
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+5
-16
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+13
-2
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+29
-8
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+14
-6
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+39
-54
source/dnode/vnode/src/tsdb/tsdbOpen.c
source/dnode/vnode/src/tsdb/tsdbOpen.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+90
-44
source/dnode/vnode/src/vnd/vnodeBufPool.c
source/dnode/vnode/src/vnd/vnodeBufPool.c
+141
-58
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+140
-21
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+3
-3
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/util/taoserror.h
浏览文件 @
45ec75a5
...
...
@@ -415,6 +415,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528)
#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
#define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530)
#define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
45ec75a5
...
...
@@ -65,7 +65,6 @@ typedef struct SSmaInfo SSmaInfo;
typedef
struct
SBlockCol
SBlockCol
;
typedef
struct
SVersionRange
SVersionRange
;
typedef
struct
SLDataIter
SLDataIter
;
typedef
struct
SQueryNode
SQueryNode
;
typedef
struct
SDiskCol
SDiskCol
;
typedef
struct
SDiskData
SDiskData
;
typedef
struct
SDiskDataBuilder
SDiskDataBuilder
;
...
...
@@ -209,13 +208,11 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
uint8_t
**
ppBuf
);
// tsdbMemTable ==============================================================================================
// SMemTable
typedef
int32_t
(
*
_tsdb_reseek_func_t
)(
void
*
pQHandle
);
int32_t
tsdbMemTableCreate
(
STsdb
*
pTsdb
,
SMemTable
**
ppMemTable
);
void
tsdbMemTableDestroy
(
SMemTable
*
pMemTable
);
void
tsdbMemTableDestroy
(
SMemTable
*
pMemTable
,
bool
proactive
);
STbData
*
tsdbGetTbDataFromMemTable
(
SMemTable
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
);
int32_t
tsdbRefMemTable
(
SMemTable
*
pMemTable
,
void
*
pQHandle
,
_tsdb_reseek_func_t
reseek
,
SQueryNode
**
pp
Node
);
int32_t
tsdbUnrefMemTable
(
SMemTable
*
pMemTable
,
SQueryNode
*
pNode
);
int32_t
tsdbRefMemTable
(
SMemTable
*
pMemTable
,
SQueryNode
*
pQ
Node
);
int32_t
tsdbUnrefMemTable
(
SMemTable
*
pMemTable
,
SQueryNode
*
pNode
,
bool
proactive
);
SArray
*
tsdbMemTableGetTbDataArray
(
SMemTable
*
pMemTable
);
// STbDataIter
int32_t
tsdbTbDataIterCreate
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
**
ppIter
);
...
...
@@ -293,8 +290,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t
tsdbReadDelData
(
SDelFReader
*
pReader
,
SDelIdx
*
pDelIdx
,
SArray
*
aDelData
);
int32_t
tsdbReadDelIdx
(
SDelFReader
*
pReader
,
SArray
*
aDelIdx
);
// tsdbRead.c ==============================================================================================
int32_t
tsdbTakeReadSnap
(
STsdbReader
*
pReader
,
_
tsdb
_reseek_func_t
reseek
,
STsdbReadSnap
**
ppSnap
);
void
tsdbUntakeReadSnap
(
STsdbReader
*
pReader
,
STsdbReadSnap
*
pSnap
);
int32_t
tsdbTakeReadSnap
(
STsdbReader
*
pReader
,
_
query
_reseek_func_t
reseek
,
STsdbReadSnap
**
ppSnap
);
void
tsdbUntakeReadSnap
(
STsdbReader
*
pReader
,
STsdbReadSnap
*
pSnap
,
bool
proactive
);
// tsdbMerge.c ==============================================================================================
int32_t
tsdbMerge
(
STsdb
*
pTsdb
);
...
...
@@ -369,13 +366,6 @@ struct STbData {
STbData
*
next
;
};
struct
SQueryNode
{
SQueryNode
*
pNext
;
SQueryNode
**
ppNext
;
void
*
pQHandle
;
_tsdb_reseek_func_t
reseek
;
};
struct
SMemTable
{
SRWLatch
latch
;
STsdb
*
pTsdb
;
...
...
@@ -392,7 +382,6 @@ struct SMemTable {
int32_t
nBucket
;
STbData
**
aBucket
;
};
SQueryNode
qList
;
};
struct
TSDBROW
{
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
45ec75a5
...
...
@@ -61,10 +61,19 @@ struct SVBufPoolNode {
};
struct
SVBufPool
{
SVBufPool
*
next
;
SVBufPool
*
freeNext
;
SVBufPool
*
recycleNext
;
SVBufPool
*
recyclePrev
;
// query handle list
TdThreadMutex
mutex
;
int32_t
nQuery
;
SQueryNode
qList
;
SVnode
*
pVnode
;
TdThreadSpinlock
*
lock
;
int32_t
id
;
volatile
int32_t
nRef
;
TdThreadSpinlock
*
lock
;
int64_t
size
;
uint8_t
*
ptr
;
SVBufPoolNode
*
pTail
;
...
...
@@ -74,6 +83,8 @@ struct SVBufPool {
int32_t
vnodeOpenBufPool
(
SVnode
*
pVnode
);
int32_t
vnodeCloseBufPool
(
SVnode
*
pVnode
);
void
vnodeBufPoolReset
(
SVBufPool
*
pPool
);
void
vnodeBufPoolAddToFreeList
(
SVBufPool
*
pPool
);
int32_t
vnodeBufPoolRecycle
(
SVBufPool
*
pPool
);
// vnodeQuery.c
int32_t
vnodeQueryOpen
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
45ec75a5
...
...
@@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader;
typedef
struct
SRSmaSnapWriter
SRSmaSnapWriter
;
typedef
struct
SSnapDataHdr
SSnapDataHdr
;
typedef
struct
SCommitInfo
SCommitInfo
;
typedef
struct
SQueryNode
SQueryNode
;
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
...
...
@@ -87,17 +88,29 @@ typedef struct SCommitInfo SCommitInfo;
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2"
#define VNODE_BUFPOOL_SEGMENTS 3
#define VND_INFO_FNAME "vnode.json"
// vnd.h
typedef
int32_t
(
*
_query_reseek_func_t
)(
void
*
pQHandle
);
struct
SQueryNode
{
SQueryNode
*
pNext
;
SQueryNode
**
ppNext
;
void
*
pQHandle
;
_query_reseek_func_t
reseek
;
};
void
*
vnodeBufPoolMalloc
(
SVBufPool
*
pPool
,
int
size
);
void
*
vnodeBufPoolMallocAligned
(
SVBufPool
*
pPool
,
int
size
);
void
vnodeBufPoolFree
(
SVBufPool
*
pPool
,
void
*
p
);
void
vnodeBufPoolRef
(
SVBufPool
*
pPool
);
void
vnodeBufPoolUnRef
(
SVBufPool
*
pPool
);
void
vnodeBufPoolUnRef
(
SVBufPool
*
pPool
,
bool
proactive
);
int
vnodeDecodeInfo
(
uint8_t
*
pData
,
SVnodeInfo
*
pInfo
);
int32_t
vnodeBufPoolRegisterQuery
(
SVBufPool
*
pPool
,
SQueryNode
*
pQNode
);
void
vnodeBufPoolDeregisterQuery
(
SVBufPool
*
pPool
,
SQueryNode
*
pQNode
,
bool
proactive
);
// meta
typedef
struct
SMCtbCursor
SMCtbCursor
;
typedef
struct
SMStbCursor
SMStbCursor
;
...
...
@@ -331,16 +344,24 @@ typedef struct SVCommitSched {
}
SVCommitSched
;
struct
SVnode
{
char
*
path
;
SVnodeCfg
config
;
SVState
state
;
SVStatis
statis
;
STfs
*
pTfs
;
SMsgCb
msgCb
;
char
*
path
;
SVnodeCfg
config
;
SVState
state
;
SVStatis
statis
;
STfs
*
pTfs
;
SMsgCb
msgCb
;
// Buffer Pool
TdThreadMutex
mutex
;
TdThreadCond
poolNotEmpty
;
SVBufPool
*
pPool
;
SVBufPool
*
aBufPool
[
VNODE_BUFPOOL_SEGMENTS
];
SVBufPool
*
freeList
;
SVBufPool
*
inUse
;
SVBufPool
*
onCommit
;
SVBufPool
*
recycleHead
;
SVBufPool
*
recycleTail
;
SVBufPool
*
onRecycle
;
SMeta
*
pMeta
;
SSma
*
pSma
;
STsdb
*
pTsdb
;
...
...
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
浏览文件 @
45ec75a5
...
...
@@ -208,13 +208,21 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) {
int32_t
code
=
0
;
SCacheRowsReader
*
pReader
=
pQHandle
;
taosThreadMutexLock
(
&
pReader
->
readerMutex
);
code
=
taosThreadMutexTryLock
(
&
pReader
->
readerMutex
);
if
(
code
==
0
)
{
// pause current reader's state if not paused, save ts & version for resuming
// just wait for the big all tables' snapshot untaking for now
// pause current reader's state if not paused, save ts & version for resuming
// just wait for the big all tables' snapshot untaking for now
code
=
TSDB_CODE_VND_QUERY_BUSY
;
taosThreadMutexUnlock
(
&
pReader
->
readerMutex
);
return
code
;
taosThreadMutexUnlock
(
&
pReader
->
readerMutex
);
return
code
;
}
else
if
(
code
==
EBUSY
)
{
return
TSDB_CODE_VND_QUERY_BUSY
;
}
else
{
return
-
1
;
}
}
int32_t
tsdbRetrieveCacheRows
(
void
*
pReader
,
SSDataBlock
*
pResBlock
,
const
int32_t
*
slotIds
,
SArray
*
pTableUidList
)
{
...
...
@@ -375,7 +383,7 @@ _end:
tsdbDataFReaderClose
(
&
pr
->
pDataFReader
);
resetLastBlockLoadInfo
(
pr
->
pLoadInfo
);
tsdbUntakeReadSnap
((
STsdbReader
*
)
pr
,
pr
->
pReadSnap
);
tsdbUntakeReadSnap
((
STsdbReader
*
)
pr
,
pr
->
pReadSnap
,
true
);
taosThreadMutexUnlock
(
&
pr
->
readerMutex
);
for
(
int32_t
j
=
0
;
j
<
pr
->
numOfCols
;
++
j
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
45ec75a5
...
...
@@ -175,7 +175,7 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
pTsdb
->
imem
=
NULL
;
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbUnrefMemTable
(
pMemTable
,
NULL
);
tsdbUnrefMemTable
(
pMemTable
,
NULL
,
true
);
goto
_exit
;
}
...
...
@@ -1664,7 +1664,7 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) {
// unlock
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
if
(
pMemTable
)
{
tsdbUnrefMemTable
(
pMemTable
,
NULL
);
tsdbUnrefMemTable
(
pMemTable
,
NULL
,
true
);
}
_exit:
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
45ec75a5
...
...
@@ -64,8 +64,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
taosMemoryFree
(
pMemTable
);
goto
_err
;
}
pMemTable
->
qList
.
pNext
=
&
pMemTable
->
qList
;
pMemTable
->
qList
.
ppNext
=
&
pMemTable
->
qList
.
pNext
;
//
pMemTable->qList.pNext = &pMemTable->qList;
//
pMemTable->qList.ppNext = &pMemTable->qList.pNext;
vnodeBufPoolRef
(
pMemTable
->
pPool
);
*
ppMemTable
=
pMemTable
;
...
...
@@ -76,9 +76,9 @@ _err:
return
code
;
}
void
tsdbMemTableDestroy
(
SMemTable
*
pMemTable
)
{
void
tsdbMemTableDestroy
(
SMemTable
*
pMemTable
,
bool
proactive
)
{
if
(
pMemTable
)
{
vnodeBufPoolUnRef
(
pMemTable
->
pPool
);
vnodeBufPoolUnRef
(
pMemTable
->
pPool
,
proactive
);
taosMemoryFree
(
pMemTable
->
aBucket
);
taosMemoryFree
(
pMemTable
);
}
...
...
@@ -749,42 +749,27 @@ _exit:
int32_t
tsdbGetNRowsInTbData
(
STbData
*
pTbData
)
{
return
pTbData
->
sl
.
size
;
}
int32_t
tsdbRefMemTable
(
SMemTable
*
pMemTable
,
void
*
pQHandle
,
_tsdb_reseek_func_t
reseek
,
SQueryNode
**
pp
Node
)
{
int32_t
tsdbRefMemTable
(
SMemTable
*
pMemTable
,
SQueryNode
*
pQ
Node
)
{
int32_t
code
=
0
;
int32_t
nRef
=
atomic_fetch_add_32
(
&
pMemTable
->
nRef
,
1
);
ASSERT
(
nRef
>
0
);
/*
// register handle (todo: take concurrency in consideration)
*ppNode = taosMemoryMalloc(sizeof(SQueryNode));
if (*ppNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
(*ppNode)->pQHandle = pQHandle;
(*ppNode)->reseek = reseek;
(*ppNode)->pNext = pMemTable->qList.pNext;
(*ppNode)->ppNext = &pMemTable->qList.pNext;
pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext;
pMemTable->qList.pNext = *ppNode;
*/
vnodeBufPoolRegisterQuery
(
pMemTable
->
pPool
,
pQNode
);
_exit:
return
code
;
}
int32_t
tsdbUnrefMemTable
(
SMemTable
*
pMemTable
,
SQueryNode
*
pNode
)
{
int32_t
tsdbUnrefMemTable
(
SMemTable
*
pMemTable
,
SQueryNode
*
pNode
,
bool
proactive
)
{
int32_t
code
=
0
;
/*
// unregister handle (todo: take concurrency in consideration)
if
(
pNode
)
{
pNode->pNext->ppNext = pNode->ppNext;
*pNode->ppNext = pNode->pNext;
taosMemoryFree(pNode);
vnodeBufPoolDeregisterQuery
(
pMemTable
->
pPool
,
pNode
,
proactive
);
}
*/
int32_t
nRef
=
atomic_sub_fetch_32
(
&
pMemTable
->
nRef
,
1
);
if
(
nRef
==
0
)
{
tsdbMemTableDestroy
(
pMemTable
);
if
(
atomic_sub_fetch_32
(
&
pMemTable
->
nRef
,
1
)
==
0
)
{
tsdbMemTableDestroy
(
pMemTable
,
proactive
);
}
return
code
;
...
...
@@ -828,28 +813,28 @@ _exit:
return
aTbDataP
;
}
int32_t
tsdbRecycleMemTable
(
SMemTable
*
pMemTable
)
{
int32_t
code
=
0
;
SQueryNode
*
pNode
=
pMemTable
->
qList
.
pNext
;
while
(
1
)
{
ASSERT
(
pNode
!=
&
pMemTable
->
qList
);
SQueryNode
*
pNextNode
=
pNode
->
pNext
;
if
(
pNextNode
==
&
pMemTable
->
qList
)
{
code
=
(
*
pNode
->
reseek
)(
pNode
->
pQHandle
);
if
(
code
)
goto
_exit
;
break
;
}
else
{
code
=
(
*
pNode
->
reseek
)(
pNode
->
pQHandle
);
if
(
code
)
goto
_exit
;
pNode
=
pMemTable
->
qList
.
pNext
;
ASSERT
(
pNode
==
pNextNode
);
}
}
// NOTE: Take care here, pMemTable is destroyed
_exit:
return
code
;
}
//
int32_t tsdbRecycleMemTable(SMemTable *pMemTable) {
//
int32_t code = 0;
//
SQueryNode *pNode = pMemTable->qList.pNext;
//
while (1) {
//
ASSERT(pNode != &pMemTable->qList);
//
SQueryNode *pNextNode = pNode->pNext;
//
if (pNextNode == &pMemTable->qList) {
//
code = (*pNode->reseek)(pNode->pQHandle);
//
if (code) goto _exit;
//
break;
//
} else {
//
code = (*pNode->reseek)(pNode->pQHandle);
//
if (code) goto _exit;
//
pNode = pMemTable->qList.pNext;
//
ASSERT(pNode == pNextNode);
//
}
//
}
//
// NOTE: Take care here, pMemTable is destroyed
//
_exit:
//
return code;
//
}
source/dnode/vnode/src/tsdb/tsdbOpen.c
浏览文件 @
45ec75a5
...
...
@@ -88,7 +88,7 @@ _err:
int
tsdbClose
(
STsdb
**
pTsdb
)
{
if
(
*
pTsdb
)
{
taosThreadRwlockWrlock
(
&
(
*
pTsdb
)
->
rwLock
);
tsdbMemTableDestroy
((
*
pTsdb
)
->
mem
);
tsdbMemTableDestroy
((
*
pTsdb
)
->
mem
,
true
);
(
*
pTsdb
)
->
mem
=
NULL
;
taosThreadRwlockUnlock
(
&
(
*
pTsdb
)
->
rwLock
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
45ec75a5
...
...
@@ -3953,7 +3953,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
}
qTrace
(
"tsdb/reader: %p, untake snapshot"
,
pReader
);
tsdbUntakeReadSnap
(
pReader
,
pReader
->
pReadSnap
);
tsdbUntakeReadSnap
(
pReader
,
pReader
->
pReadSnap
,
true
);
taosThreadMutexDestroy
(
&
pReader
->
readerMutex
);
...
...
@@ -4001,7 +4001,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
if
(
pStatus
->
loadFromFile
)
{
SFileDataBlockInfo
*
pBlockInfo
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
if
(
pBlockInfo
!=
NULL
)
{
pBlockScanInfo
=
taosHashGet
(
pStatus
->
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
pBlockScanInfo
=
*
(
STableBlockScanInfo
**
)
taosHashGet
(
pStatus
->
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
if
(
pBlockScanInfo
==
NULL
)
{
code
=
TSDB_CODE_INVALID_PARA
;
tsdbError
(
"failed to locate the uid:%"
PRIu64
" in query table uid list, total tables:%d, %s"
,
pBlockInfo
->
uid
,
...
...
@@ -4015,20 +4016,28 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
tsdbDataFReaderClose
(
&
pReader
->
pFileReader
);
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo
*
p
=
NULL
;
STableBlockScanInfo
*
*
p
=
NULL
;
while
((
p
=
taosHashIterate
(
pStatus
->
pTableMap
,
p
))
!=
NULL
)
{
p
->
iterInit
=
false
;
p
->
iiter
.
hasVal
=
false
;
if
(
p
->
iter
.
iter
!=
NULL
)
{
p
->
iter
.
iter
=
tsdbTbDataIterDestroy
(
p
->
iter
.
iter
);
STableBlockScanInfo
*
pInfo
=
*
(
STableBlockScanInfo
**
)
p
;
pInfo
->
iterInit
=
false
;
pInfo
->
iter
.
hasVal
=
false
;
pInfo
->
iiter
.
hasVal
=
false
;
if
(
pInfo
->
iter
.
iter
!=
NULL
)
{
pInfo
->
iter
.
iter
=
tsdbTbDataIterDestroy
(
pInfo
->
iter
.
iter
);
}
if
(
pInfo
->
iiter
.
iter
!=
NULL
)
{
pInfo
->
iiter
.
iter
=
tsdbTbDataIterDestroy
(
pInfo
->
iiter
.
iter
);
}
p
->
delSkyline
=
taosArrayDestroy
(
p
->
delSkyline
);
// p->lastKey = ts;
p
Info
->
delSkyline
=
taosArrayDestroy
(
pInfo
->
delSkyline
);
// p
Info
->lastKey = ts;
}
}
else
{
pBlockScanInfo
=
*
pStatus
->
pTableIter
;
pBlockScanInfo
=
pStatus
->
pTableIter
==
NULL
?
NULL
:
*
pStatus
->
pTableIter
;
if
(
pBlockScanInfo
)
{
// save lastKey to restore memory iterator
STimeWindow
w
=
pReader
->
pResBlock
->
info
.
window
;
...
...
@@ -4052,11 +4061,13 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
}
}
tsdbUntakeReadSnap
(
pReader
,
pReader
->
pReadSnap
);
tsdbUntakeReadSnap
(
pReader
,
pReader
->
pReadSnap
,
false
);
pReader
->
pReadSnap
=
NULL
;
pReader
->
suspended
=
true
;
tsdbDebug
(
"reader: %p suspended uid %"
PRIu64
" in this query %s"
,
pReader
,
pBlockScanInfo
->
uid
,
pReader
->
idStr
);
tsdbDebug
(
"reader: %p suspended uid %"
PRIu64
" in this query %s"
,
pReader
,
pBlockScanInfo
?
pBlockScanInfo
->
uid
:
0
,
pReader
->
idStr
);
return
code
;
_err:
...
...
@@ -4068,18 +4079,24 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
int32_t
code
=
0
;
STsdbReader
*
pReader
=
pQHandle
;
taosThreadMutexLock
(
&
pReader
->
readerMutex
);
code
=
taosThreadMutexTryLock
(
&
pReader
->
readerMutex
);
if
(
code
==
0
)
{
if
(
pReader
->
suspended
)
{
taosThreadMutexUnlock
(
&
pReader
->
readerMutex
);
return
code
;
}
tsdbReaderSuspend
(
pReader
);
if
(
pReader
->
suspended
)
{
taosThreadMutexUnlock
(
&
pReader
->
readerMutex
);
return
code
;
}
else
if
(
code
==
EBUSY
)
{
return
TSDB_CODE_VND_QUERY_BUSY
;
}
else
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
TSDB_CODE_FAILED
;
}
tsdbReaderSuspend
(
pReader
);
taosThreadMutexUnlock
(
&
pReader
->
readerMutex
);
return
code
;
}
int32_t
tsdbReaderResume
(
STsdbReader
*
pReader
)
{
...
...
@@ -4428,17 +4445,18 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
int32_t
tsdbReaderReset
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
)
{
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
qTrace
(
"tsdb/read: %p, take read mutex"
,
pReader
);
qTrace
(
"tsdb/read
er-reset
: %p, take read mutex"
,
pReader
);
taosThreadMutexLock
(
&
pReader
->
readerMutex
);
if
(
pReader
->
suspended
)
{
tsdbReaderResume
(
pReader
);
}
taosThreadMutexUnlock
(
&
pReader
->
readerMutex
);
if
(
isEmptyQueryTimeWindow
(
&
pReader
->
window
)
||
pReader
->
pReadSnap
==
NULL
)
{
tsdbDebug
(
"tsdb reader reset return %p"
,
pReader
->
pReadSnap
);
taosThreadMutexUnlock
(
&
pReader
->
readerMutex
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -4474,6 +4492,9 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbError
(
"%p reset reader failed, numOfTables:%d, query range:%"
PRId64
" - %"
PRId64
" in query %s"
,
pReader
,
numOfTables
,
pReader
->
window
.
skey
,
pReader
->
window
.
ekey
,
pReader
->
idStr
);
taosThreadMutexUnlock
(
&
pReader
->
readerMutex
);
return
code
;
}
}
...
...
@@ -4483,6 +4504,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
pReader
,
pReader
->
suid
,
numOfTables
,
pCond
->
twindows
.
skey
,
pReader
->
window
.
skey
,
pReader
->
window
.
ekey
,
pReader
->
idStr
);
taosThreadMutexUnlock
(
&
pReader
->
readerMutex
);
return
code
;
}
...
...
@@ -4639,68 +4662,91 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbTakeReadSnap
(
STsdbReader
*
pReader
,
_
tsdb
_reseek_func_t
reseek
,
STsdbReadSnap
**
ppSnap
)
{
int32_t
tsdbTakeReadSnap
(
STsdbReader
*
pReader
,
_
query
_reseek_func_t
reseek
,
STsdbReadSnap
**
ppSnap
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pReader
->
pTsdb
;
SVersionRange
*
pRange
=
&
pReader
->
verRange
;
// alloc
*
ppSnap
=
(
STsdbReadSnap
*
)
taosMemoryCalloc
(
1
,
sizeof
(
STsdbRead
Snap
));
if
(
*
p
pSnap
==
NULL
)
{
STsdbReadSnap
*
pSnap
=
(
STsdbReadSnap
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
p
Snap
));
if
(
pSnap
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
// lock
code
=
taosThreadRwlockRdlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_exit
;
}
taosThreadRwlockRdlock
(
&
pTsdb
->
rwLock
);
// take snapshot
if
(
pTsdb
->
mem
&&
(
pRange
->
minVer
<=
pTsdb
->
mem
->
maxVer
&&
pRange
->
maxVer
>=
pTsdb
->
mem
->
minVer
))
{
tsdbRefMemTable
(
pTsdb
->
mem
,
pReader
,
reseek
,
&
(
*
ppSnap
)
->
pNode
);
(
*
ppSnap
)
->
pMem
=
pTsdb
->
mem
;
pSnap
->
pMem
=
pTsdb
->
mem
;
pSnap
->
pNode
=
taosMemoryMalloc
(
sizeof
(
*
pSnap
->
pNode
));
if
(
pSnap
->
pNode
==
NULL
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
pSnap
->
pNode
->
pQHandle
=
pReader
;
pSnap
->
pNode
->
reseek
=
reseek
;
tsdbRefMemTable
(
pTsdb
->
mem
,
pSnap
->
pNode
);
}
if
(
pTsdb
->
imem
&&
(
pRange
->
minVer
<=
pTsdb
->
imem
->
maxVer
&&
pRange
->
maxVer
>=
pTsdb
->
imem
->
minVer
))
{
tsdbRefMemTable
(
pTsdb
->
imem
,
pReader
,
reseek
,
&
(
*
ppSnap
)
->
pINode
);
(
*
ppSnap
)
->
pIMem
=
pTsdb
->
imem
;
pSnap
->
pIMem
=
pTsdb
->
imem
;
pSnap
->
pINode
=
taosMemoryMalloc
(
sizeof
(
*
pSnap
->
pINode
));
if
(
pSnap
->
pINode
==
NULL
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
pSnap
->
pINode
->
pQHandle
=
pReader
;
pSnap
->
pINode
->
reseek
=
reseek
;
tsdbRefMemTable
(
pTsdb
->
imem
,
pSnap
->
pINode
);
}
// fs
code
=
tsdbFSRef
(
pTsdb
,
&
(
*
ppSnap
)
->
fs
);
code
=
tsdbFSRef
(
pTsdb
,
&
pSnap
->
fs
);
if
(
code
)
{
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
goto
_exit
;
}
// unlock
code
=
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_exit
;
}
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbTrace
(
"vgId:%d, take read snapshot"
,
TD_VID
(
pTsdb
->
pVnode
));
_exit:
if
(
code
)
{
*
ppSnap
=
NULL
;
if
(
pSnap
)
{
if
(
pSnap
->
pNode
)
taosMemoryFree
(
pSnap
->
pNode
);
if
(
pSnap
->
pINode
)
taosMemoryFree
(
pSnap
->
pINode
);
taosMemoryFree
(
pSnap
);
}
}
else
{
*
ppSnap
=
pSnap
;
}
return
code
;
}
void
tsdbUntakeReadSnap
(
STsdbReader
*
pReader
,
STsdbReadSnap
*
pSnap
)
{
void
tsdbUntakeReadSnap
(
STsdbReader
*
pReader
,
STsdbReadSnap
*
pSnap
,
bool
proactive
)
{
STsdb
*
pTsdb
=
pReader
->
pTsdb
;
if
(
pSnap
)
{
if
(
pSnap
->
pMem
)
{
tsdbUnrefMemTable
(
pSnap
->
pMem
,
pSnap
->
pNode
);
tsdbUnrefMemTable
(
pSnap
->
pMem
,
pSnap
->
pNode
,
proactive
);
}
if
(
pSnap
->
pIMem
)
{
tsdbUnrefMemTable
(
pSnap
->
pIMem
,
pSnap
->
pINode
);
tsdbUnrefMemTable
(
pSnap
->
pIMem
,
pSnap
->
pINode
,
proactive
);
}
tsdbFSUnref
(
pTsdb
,
&
pSnap
->
fs
);
if
(
pSnap
->
pNode
)
taosMemoryFree
(
pSnap
->
pNode
);
if
(
pSnap
->
pINode
)
taosMemoryFree
(
pSnap
->
pINode
);
taosMemoryFree
(
pSnap
);
}
tsdbTrace
(
"vgId:%d, untake read snapshot"
,
TD_VID
(
pTsdb
->
pVnode
));
...
...
source/dnode/vnode/src/vnd/vnodeBufPool.c
浏览文件 @
45ec75a5
...
...
@@ -16,9 +16,7 @@
#include "vnd.h"
/* ------------------------ STRUCTURES ------------------------ */
#define VNODE_BUFPOOL_SEGMENTS 3
static
int
vnodeBufPoolCreate
(
SVnode
*
pVnode
,
int64_t
size
,
SVBufPool
**
ppPool
)
{
static
int
vnodeBufPoolCreate
(
SVnode
*
pVnode
,
int32_t
id
,
int64_t
size
,
SVBufPool
**
ppPool
)
{
SVBufPool
*
pPool
;
pPool
=
taosMemoryMalloc
(
sizeof
(
SVBufPool
)
+
size
);
...
...
@@ -26,6 +24,21 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
memset
(
pPool
,
0
,
sizeof
(
SVBufPool
));
// query handle list
taosThreadMutexInit
(
&
pPool
->
mutex
,
NULL
);
pPool
->
nQuery
=
0
;
pPool
->
qList
.
pNext
=
&
pPool
->
qList
;
pPool
->
qList
.
ppNext
=
&
pPool
->
qList
.
pNext
;
pPool
->
pVnode
=
pVnode
;
pPool
->
id
=
id
;
pPool
->
ptr
=
pPool
->
node
.
data
;
pPool
->
pTail
=
&
pPool
->
node
;
pPool
->
node
.
prev
=
NULL
;
pPool
->
node
.
pnext
=
&
pPool
->
pTail
;
pPool
->
node
.
size
=
size
;
if
(
VND_IS_RSMA
(
pVnode
))
{
pPool
->
lock
=
taosMemoryMalloc
(
sizeof
(
TdThreadSpinlock
));
...
...
@@ -44,16 +57,6 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
pPool
->
lock
=
NULL
;
}
pPool
->
next
=
NULL
;
pPool
->
pVnode
=
pVnode
;
pPool
->
nRef
=
0
;
pPool
->
size
=
0
;
pPool
->
ptr
=
pPool
->
node
.
data
;
pPool
->
pTail
=
&
pPool
->
node
;
pPool
->
node
.
prev
=
NULL
;
pPool
->
node
.
pnext
=
&
pPool
->
pTail
;
pPool
->
node
.
size
=
size
;
*
ppPool
=
pPool
;
return
0
;
}
...
...
@@ -64,27 +67,25 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) {
taosThreadSpinDestroy
(
pPool
->
lock
);
taosMemoryFree
((
void
*
)
pPool
->
lock
);
}
taosThreadMutexDestroy
(
&
pPool
->
mutex
);
taosMemoryFree
(
pPool
);
return
0
;
}
int
vnodeOpenBufPool
(
SVnode
*
pVnode
)
{
SVBufPool
*
pPool
=
NULL
;
int64_t
size
=
pVnode
->
config
.
szBuf
/
VNODE_BUFPOOL_SEGMENTS
;
ASSERT
(
pVnode
->
pPool
==
NULL
);
int64_t
size
=
pVnode
->
config
.
szBuf
/
VNODE_BUFPOOL_SEGMENTS
;
for
(
int
i
=
0
;
i
<
VNODE_BUFPOOL_SEGMENTS
;
i
++
)
{
// create pool
if
(
vnodeBufPoolCreate
(
pVnode
,
size
,
&
pPool
))
{
if
(
vnodeBufPoolCreate
(
pVnode
,
i
,
size
,
&
pVnode
->
aBufPool
[
i
]
))
{
vError
(
"vgId:%d, failed to open vnode buffer pool since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
vnodeCloseBufPool
(
pVnode
);
return
-
1
;
}
// add
pool to vnode
p
Pool
->
next
=
pVnode
->
pPool
;
pVnode
->
pPool
=
pPool
;
// add
to free list
p
Vnode
->
aBufPool
[
i
]
->
freeNext
=
pVnode
->
freeList
;
pVnode
->
freeList
=
pVnode
->
aBufPool
[
i
]
;
}
vDebug
(
"vgId:%d, vnode buffer pool is opened, size:%"
PRId64
,
TD_VID
(
pVnode
),
size
);
...
...
@@ -92,23 +93,19 @@ int vnodeOpenBufPool(SVnode *pVnode) {
}
int
vnodeCloseBufPool
(
SVnode
*
pVnode
)
{
SVBufPool
*
pPool
;
for
(
pPool
=
pVnode
->
pPool
;
pPool
;
pPool
=
pVnode
->
pPool
)
{
pVnode
->
pPool
=
pPool
->
next
;
vnodeBufPoolDestroy
(
pPool
);
for
(
int32_t
i
=
0
;
i
<
VNODE_BUFPOOL_SEGMENTS
;
i
++
)
{
if
(
pVnode
->
aBufPool
[
i
])
{
vnodeBufPoolDestroy
(
pVnode
->
aBufPool
[
i
]);
pVnode
->
aBufPool
[
i
]
=
NULL
;
}
}
if
(
pVnode
->
inUse
)
{
vnodeBufPoolDestroy
(
pVnode
->
inUse
);
pVnode
->
inUse
=
NULL
;
}
vDebug
(
"vgId:%d, vnode buffer pool is closed"
,
TD_VID
(
pVnode
));
return
0
;
}
void
vnodeBufPoolReset
(
SVBufPool
*
pPool
)
{
ASSERT
(
pPool
->
nQuery
==
0
);
for
(
SVBufPoolNode
*
pNode
=
pPool
->
pTail
;
pNode
->
prev
;
pNode
=
pPool
->
pTail
)
{
ASSERT
(
pNode
->
pnext
==
&
pPool
->
pTail
);
pNode
->
prev
->
pnext
=
&
pPool
->
pTail
;
...
...
@@ -215,35 +212,121 @@ void vnodeBufPoolRef(SVBufPool *pPool) {
ASSERT
(
nRef
>
0
);
}
void
vnodeBufPoolUnRef
(
SVBufPool
*
pPool
)
{
if
(
pPool
==
NULL
)
{
return
;
}
int32_t
nRef
=
atomic_sub_fetch_32
(
&
pPool
->
nRef
,
1
);
if
(
nRef
==
0
)
{
SVnode
*
pVnode
=
pPool
->
pVnode
;
vnodeBufPoolReset
(
pPool
);
taosThreadMutexLock
(
&
pVnode
->
mutex
);
int64_t
size
=
pVnode
->
config
.
szBuf
/
VNODE_BUFPOOL_SEGMENTS
;
if
(
pPool
->
node
.
size
!=
size
)
{
SVBufPool
*
pPoolT
=
NULL
;
if
(
vnodeBufPoolCreate
(
pVnode
,
size
,
&
pPoolT
)
<
0
)
{
vWarn
(
"vgId:%d, try to change buf pools size from %"
PRId64
" to %"
PRId64
" since %s"
,
TD_VID
(
pVnode
),
pPool
->
node
.
size
,
size
,
tstrerror
(
errno
));
}
else
{
vnodeBufPoolDestroy
(
pPool
);
pPool
=
pPoolT
;
vDebug
(
"vgId:%d, change buf pools size from %"
PRId64
" to %"
PRId64
,
TD_VID
(
pVnode
),
pPool
->
node
.
size
,
size
);
}
void
vnodeBufPoolAddToFreeList
(
SVBufPool
*
pPool
)
{
SVnode
*
pVnode
=
pPool
->
pVnode
;
int64_t
size
=
pVnode
->
config
.
szBuf
/
VNODE_BUFPOOL_SEGMENTS
;
if
(
pPool
->
node
.
size
!=
size
)
{
SVBufPool
*
pNewPool
=
NULL
;
if
(
vnodeBufPoolCreate
(
pVnode
,
pPool
->
id
,
size
,
&
pNewPool
)
<
0
)
{
vWarn
(
"vgId:%d failed to change buffer pool of id %d size from %"
PRId64
" to %"
PRId64
" since %s"
,
TD_VID
(
pVnode
),
pPool
->
id
,
pPool
->
node
.
size
,
size
,
tstrerror
(
errno
));
}
else
{
vInfo
(
"vgId:%d buffer pool of id %d size changed from %"
PRId64
" to %"
PRId64
,
TD_VID
(
pVnode
),
pPool
->
id
,
pPool
->
node
.
size
,
size
);
vnodeBufPoolDestroy
(
pPool
);
pPool
=
pNewPool
;
pVnode
->
aBufPool
[
pPool
->
id
]
=
pPool
;
}
}
// add to free list
vDebug
(
"vgId:%d buffer pool %p of id %d is added to free list"
,
TD_VID
(
pVnode
),
pPool
,
pPool
->
id
);
vnodeBufPoolReset
(
pPool
);
pPool
->
freeNext
=
pVnode
->
freeList
;
pVnode
->
freeList
=
pPool
;
taosThreadCondSignal
(
&
pVnode
->
poolNotEmpty
);
}
void
vnodeBufPoolUnRef
(
SVBufPool
*
pPool
,
bool
proactive
)
{
if
(
pPool
==
NULL
)
return
;
pPool
->
next
=
pVnode
->
pPool
;
pVnode
->
pPool
=
pPool
;
taosThreadCondSignal
(
&
pVnode
->
poolNotEmpty
);
SVnode
*
pVnode
=
pPool
->
pVnode
;
taosThreadMutexUnlock
(
&
pVnode
->
mutex
);
if
(
proactive
)
taosThreadMutexLock
(
&
pVnode
->
mutex
);
if
(
atomic_sub_fetch_32
(
&
pPool
->
nRef
,
1
)
>
0
)
goto
_exit
;
// remove from recycle queue or on-recycle position
if
(
pVnode
->
onRecycle
==
pPool
)
{
pVnode
->
onRecycle
=
NULL
;
}
else
{
ASSERT
(
proactive
);
if
(
pPool
->
recyclePrev
)
{
pPool
->
recyclePrev
->
recycleNext
=
pPool
->
recycleNext
;
}
else
{
pVnode
->
recycleHead
=
pPool
->
recycleNext
;
}
if
(
pPool
->
recycleNext
)
{
pPool
->
recycleNext
->
recyclePrev
=
pPool
->
recyclePrev
;
}
else
{
pVnode
->
recycleTail
=
pPool
->
recyclePrev
;
}
pPool
->
recyclePrev
=
pPool
->
recycleNext
=
NULL
;
}
vnodeBufPoolAddToFreeList
(
pPool
);
_exit:
if
(
proactive
)
taosThreadMutexUnlock
(
&
pVnode
->
mutex
);
return
;
}
int32_t
vnodeBufPoolRegisterQuery
(
SVBufPool
*
pPool
,
SQueryNode
*
pQNode
)
{
int32_t
code
=
0
;
taosThreadMutexLock
(
&
pPool
->
mutex
);
pQNode
->
pNext
=
pPool
->
qList
.
pNext
;
pQNode
->
ppNext
=
&
pPool
->
qList
.
pNext
;
pPool
->
qList
.
pNext
->
ppNext
=
&
pQNode
->
pNext
;
pPool
->
qList
.
pNext
=
pQNode
;
pPool
->
nQuery
++
;
taosThreadMutexUnlock
(
&
pPool
->
mutex
);
_exit:
return
code
;
}
void
vnodeBufPoolDeregisterQuery
(
SVBufPool
*
pPool
,
SQueryNode
*
pQNode
,
bool
proactive
)
{
int32_t
code
=
0
;
if
(
proactive
)
taosThreadMutexLock
(
&
pPool
->
mutex
);
pQNode
->
pNext
->
ppNext
=
pQNode
->
ppNext
;
*
pQNode
->
ppNext
=
pQNode
->
pNext
;
pPool
->
nQuery
--
;
if
(
proactive
)
taosThreadMutexUnlock
(
&
pPool
->
mutex
);
}
int32_t
vnodeBufPoolRecycle
(
SVBufPool
*
pPool
)
{
int32_t
code
=
0
;
SVnode
*
pVnode
=
pPool
->
pVnode
;
vDebug
(
"vgId:%d recycle buffer pool %p of id %d"
,
TD_VID
(
pVnode
),
pPool
,
pPool
->
id
);
taosThreadMutexLock
(
&
pPool
->
mutex
);
SQueryNode
*
pNode
=
pPool
->
qList
.
pNext
;
while
(
pNode
!=
&
pPool
->
qList
)
{
SQueryNode
*
pTNode
=
pNode
->
pNext
;
int32_t
rc
=
pNode
->
reseek
(
pNode
->
pQHandle
);
if
(
rc
==
0
||
rc
==
TSDB_CODE_VND_QUERY_BUSY
)
{
pNode
=
pTNode
;
}
else
{
code
=
rc
;
goto
_exit
;
}
}
_exit:
taosThreadMutexUnlock
(
&
pPool
->
mutex
);
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
45ec75a5
...
...
@@ -21,41 +21,128 @@
static
int
vnodeEncodeInfo
(
const
SVnodeInfo
*
pInfo
,
char
**
ppData
);
static
int
vnodeCommitImpl
(
SCommitInfo
*
pInfo
);
int
vnodeBegin
(
SVnode
*
pVnode
)
{
// alloc buffer pool
taosThreadMutexLock
(
&
pVnode
->
mutex
);
#define WAIT_TIME_MILI_SEC 10 // miliseconds
static
int32_t
vnodeTryRecycleBufPool
(
SVnode
*
pVnode
)
{
int32_t
code
=
0
;
while
(
pVnode
->
pPool
==
NULL
)
{
taosThreadCondWait
(
&
pVnode
->
poolNotEmpty
,
&
pVnode
->
mutex
);
if
(
pVnode
->
onRecycle
==
NULL
)
{
if
(
pVnode
->
recycleHead
==
NULL
)
{
vDebug
(
"vgId:%d no recyclable buffer pool"
,
TD_VID
(
pVnode
));
goto
_exit
;
}
else
{
vDebug
(
"vgId:%d buffer pool %p of id %d on recycle queue, try to recycle"
,
TD_VID
(
pVnode
),
pVnode
->
recycleHead
,
pVnode
->
recycleHead
->
id
);
pVnode
->
onRecycle
=
pVnode
->
recycleHead
;
if
(
pVnode
->
recycleHead
==
pVnode
->
recycleTail
)
{
pVnode
->
recycleHead
=
pVnode
->
recycleTail
=
NULL
;
}
else
{
pVnode
->
recycleHead
=
pVnode
->
recycleHead
->
recycleNext
;
pVnode
->
recycleHead
->
recyclePrev
=
NULL
;
}
pVnode
->
onRecycle
->
recycleNext
=
pVnode
->
onRecycle
->
recyclePrev
=
NULL
;
}
}
pVnode
->
inUse
=
pVnode
->
pPool
;
pVnode
->
inUse
->
nRef
=
1
;
pVnode
->
pPool
=
pVnode
->
inUse
->
next
;
pVnode
->
inUse
->
next
=
NULL
;
code
=
vnodeBufPoolRecycle
(
pVnode
->
onRecycle
);
if
(
code
)
goto
_exit
;
_exit:
if
(
code
)
{
vError
(
"vgId:%d %s failed since %s"
,
TD_VID
(
pVnode
),
__func__
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
vnodeGetBufPoolToUse
(
SVnode
*
pVnode
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
taosThreadMutexLock
(
&
pVnode
->
mutex
);
int32_t
nTry
=
0
;
for
(;;)
{
++
nTry
;
if
(
pVnode
->
freeList
)
{
vDebug
(
"vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d"
,
TD_VID
(
pVnode
),
nTry
,
pVnode
->
freeList
,
pVnode
->
freeList
->
id
);
pVnode
->
inUse
=
pVnode
->
freeList
;
pVnode
->
inUse
->
nRef
=
1
;
pVnode
->
freeList
=
pVnode
->
inUse
->
freeNext
;
pVnode
->
inUse
->
freeNext
=
NULL
;
break
;
}
else
{
vDebug
(
"vgId:%d no free buffer pool on %d try, try to recycle..."
,
TD_VID
(
pVnode
),
nTry
);
/*
code = vnodeTryRecycleBufPool(pVnode);
TSDB_CHECK_CODE(code, lino, _exit);
*/
if
(
pVnode
->
freeList
==
NULL
)
{
vDebug
(
"vgId:%d no free buffer pool on %d try, wait %d ms..."
,
TD_VID
(
pVnode
),
nTry
,
WAIT_TIME_MILI_SEC
);
struct
timeval
tv
;
struct
timespec
ts
;
taosGetTimeOfDay
(
&
tv
);
ts
.
tv_nsec
=
tv
.
tv_usec
*
1000
+
WAIT_TIME_MILI_SEC
*
1000000
;
if
(
ts
.
tv_nsec
>
999999999l
)
{
ts
.
tv_sec
=
tv
.
tv_sec
+
1
;
ts
.
tv_nsec
-=
1000000000l
;
}
else
{
ts
.
tv_sec
=
tv
.
tv_sec
;
}
int32_t
rc
=
taosThreadCondTimedWait
(
&
pVnode
->
poolNotEmpty
,
&
pVnode
->
mutex
,
&
ts
);
if
(
rc
&&
rc
!=
ETIMEDOUT
)
{
code
=
TAOS_SYSTEM_ERROR
(
rc
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
}
}
_exit:
taosThreadMutexUnlock
(
&
pVnode
->
mutex
);
if
(
code
)
{
vError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
int
vnodeBegin
(
SVnode
*
pVnode
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
pVnode
->
state
.
commitID
++
;
// alloc buffer pool
code
=
vnodeGetBufPoolToUse
(
pVnode
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// begin meta
if
(
metaBegin
(
pVnode
->
pMeta
,
META_BEGIN_HEAP_BUFFERPOOL
)
<
0
)
{
vError
(
"vgId:%d, failed to begin meta since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
))
;
return
-
1
;
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
// begin tsdb
if
(
tsdbBegin
(
pVnode
->
pTsdb
)
<
0
)
{
vError
(
"vgId:%d, failed to begin tsdb since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
))
;
return
-
1
;
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
// begin sma
if
(
VND_IS_RSMA
(
pVnode
)
&&
smaBegin
(
pVnode
->
pSma
)
<
0
)
{
vError
(
"vgId:%d, failed to begin sma since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
))
;
return
-
1
;
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
return
0
;
_exit:
if
(
code
)
{
terrno
=
code
;
vError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
void
vnodeUpdCommitSched
(
SVnode
*
pVnode
)
{
...
...
@@ -70,7 +157,7 @@ int vnodeShouldCommit(SVnode *pVnode) {
}
SVCommitSched
*
pSched
=
&
pVnode
->
commitSched
;
int64_t
nowMs
=
taosGetMonoTimestampMs
();
int64_t
nowMs
=
taosGetMonoTimestampMs
();
return
(((
pVnode
->
inUse
->
size
>
pVnode
->
inUse
->
node
.
size
)
&&
(
pSched
->
commitMs
+
SYNC_VND_COMMIT_MIN_MS
<
nowMs
))
||
(
pVnode
->
inUse
->
size
>
0
&&
pSched
->
commitMs
+
pSched
->
maxWaitMs
<
nowMs
));
...
...
@@ -209,6 +296,12 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
tsem_wait
(
&
pVnode
->
canCommit
);
taosThreadMutexLock
(
&
pVnode
->
mutex
);
ASSERT
(
pVnode
->
onCommit
==
NULL
);
pVnode
->
onCommit
=
pVnode
->
inUse
;
pVnode
->
inUse
=
NULL
;
taosThreadMutexUnlock
(
&
pVnode
->
mutex
);
pVnode
->
state
.
commitTerm
=
pVnode
->
state
.
applyTerm
;
pInfo
->
info
.
config
=
pVnode
->
config
;
...
...
@@ -238,9 +331,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
code
=
smaPrepareAsyncCommit
(
pVnode
->
pSma
);
if
(
code
)
goto
_exit
;
vnodeBufPoolUnRef
(
pVnode
->
inUse
);
pVnode
->
inUse
=
NULL
;
_exit:
if
(
code
)
{
vError
(
"vgId:%d, %s failed at line %d since %s, commit id:%"
PRId64
,
TD_VID
(
pVnode
),
__func__
,
lino
,
...
...
@@ -251,19 +341,48 @@ _exit:
return
code
;
}
static
void
vnodeReturnBufPool
(
SVnode
*
pVnode
)
{
taosThreadMutexLock
(
&
pVnode
->
mutex
);
SVBufPool
*
pPool
=
pVnode
->
onCommit
;
int32_t
nRef
=
atomic_sub_fetch_32
(
&
pPool
->
nRef
,
1
);
pVnode
->
onCommit
=
NULL
;
if
(
nRef
==
0
)
{
vnodeBufPoolAddToFreeList
(
pPool
);
}
else
if
(
nRef
>
0
)
{
vDebug
(
"vgId:%d buffer pool %p of id %d is added to recycle queue"
,
TD_VID
(
pVnode
),
pPool
,
pPool
->
id
);
if
(
pVnode
->
recycleTail
==
NULL
)
{
pPool
->
recyclePrev
=
pPool
->
recycleNext
=
NULL
;
pVnode
->
recycleHead
=
pVnode
->
recycleTail
=
pPool
;
}
else
{
pPool
->
recyclePrev
=
pVnode
->
recycleTail
;
pPool
->
recycleNext
=
NULL
;
pVnode
->
recycleTail
->
recycleNext
=
pPool
;
pVnode
->
recycleTail
=
pPool
;
}
}
else
{
ASSERT
(
0
);
}
taosThreadMutexUnlock
(
&
pVnode
->
mutex
);
}
static
int32_t
vnodeCommitTask
(
void
*
arg
)
{
int32_t
code
=
0
;
SCommitInfo
*
pInfo
=
(
SCommitInfo
*
)
arg
;
SVnode
*
pVnode
=
pInfo
->
pVnode
;
// commit
code
=
vnodeCommitImpl
(
pInfo
);
if
(
code
)
goto
_exit
;
vnodeReturnBufPool
(
pVnode
);
_exit:
// end commit
tsem_post
(
&
p
Info
->
p
Vnode
->
canCommit
);
tsem_post
(
&
pVnode
->
canCommit
);
taosMemoryFree
(
pInfo
);
return
code
;
}
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
45ec75a5
...
...
@@ -240,7 +240,7 @@ _err:
if
(
pVnode
->
pTsdb
)
tsdbClose
(
&
pVnode
->
pTsdb
);
if
(
pVnode
->
pSma
)
smaClose
(
pVnode
->
pSma
);
if
(
pVnode
->
pMeta
)
metaClose
(
pVnode
->
pMeta
);
if
(
pVnode
->
pPool
)
vnodeCloseBufPool
(
pVnode
);
if
(
pVnode
->
freeList
)
vnodeCloseBufPool
(
pVnode
);
tsem_destroy
(
&
(
pVnode
->
canCommit
));
taosMemoryFree
(
pVnode
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
45ec75a5
...
...
@@ -191,13 +191,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
void
*
pReq
;
int32_t
len
;
int32_t
ret
;
/*
if (!pVnode->inUse) {
terrno = TSDB_CODE_VND_NO_AVAIL_BUFPOOL;
vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr());
return -1;
}
*/
if
(
version
<=
pVnode
->
state
.
applied
)
{
vError
(
"vgId:%d, duplicate write request. version: %"
PRId64
", applied: %"
PRId64
""
,
TD_VID
(
pVnode
),
version
,
pVnode
->
state
.
applied
);
...
...
@@ -1001,7 +1001,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
code
=
terrno
;
goto
_exit
;
}
pSubmitTbData
->
uid
=
pSubmitTbData
->
pCreateTbReq
->
uid
;
// update uid if table exist for using below
pSubmitTbData
->
uid
=
pSubmitTbData
->
pCreateTbReq
->
uid
;
// update uid if table exist for using below
}
}
...
...
source/util/src/terror.c
浏览文件 @
45ec75a5
...
...
@@ -320,6 +320,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subsc
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NO_AVAIL_BUFPOOL
,
"No availabe buffer pool"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_STOPPED
,
"Vnode stopped"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_DUP_REQUEST
,
"Duplicate write request"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_QUERY_BUSY
,
"Query busy"
)
// tsdb
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_INVALID_TABLE_ID
,
"Invalid table ID"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录