Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
87a73142
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
87a73142
编写于
8月 07, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-1057
上级
1db9831c
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
85 addition
and
82 deletion
+85
-82
deps/MsvcLibX/include/msvclibx.h
deps/MsvcLibX/include/msvclibx.h
+1
-1
src/os/inc/osWindows.h
src/os/inc/osWindows.h
+1
-0
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+3
-3
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+7
-7
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+4
-4
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+6
-6
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+27
-27
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+36
-34
未找到文件。
deps/MsvcLibX/include/msvclibx.h
浏览文件 @
87a73142
...
...
@@ -58,7 +58,7 @@
/* Generate the OS-and-debug-mode-specific library name */
#define _MSVCLIBX_LIB "MsvcLibX" _MSVCLIBX_LIB_OS_SUFFIX _MSVCLIBX_LIB_DBG_SUFFIX ".lib"
#pragma message("Adding pragma comment(lib, \"" _MSVCLIBX_LIB "\")")
//
#pragma message("Adding pragma comment(lib, \"" _MSVCLIBX_LIB "\")")
#pragma comment(lib, _MSVCLIBX_LIB)
/* Library-specific routine used internally by many standard routines */
...
...
src/os/inc/osWindows.h
浏览文件 @
87a73142
...
...
@@ -144,6 +144,7 @@ char * getpass(const char *prefix);
int
flock
(
int
fd
,
int
option
);
int
fsync
(
int
filedes
);
char
*
strndup
(
const
char
*
s
,
size_t
n
);
char
*
dirname
(
char
*
pszPathname
);
// for access function in io.h
#define F_OK 00 //Existence only
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
87a73142
...
...
@@ -305,8 +305,8 @@ void tsdbFitRetention(STsdbRepo *pRepo) {
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
*
pGroup
=
pFileH
->
pFGroup
;
int
mfid
=
TSDB_KEY_FILEID
(
taosGetTimestamp
(
pCfg
->
precision
),
pCfg
->
daysPerFile
,
pCfg
->
precision
)
-
TSDB_MAX_FILE
(
pCfg
->
keep
,
pCfg
->
daysPerFile
);
int
mfid
=
(
int
)(
TSDB_KEY_FILEID
(
taosGetTimestamp
(
pCfg
->
precision
),
pCfg
->
daysPerFile
,
pCfg
->
precision
)
-
TSDB_MAX_FILE
(
pCfg
->
keep
,
pCfg
->
daysPerFile
)
)
;
pthread_rwlock_wrlock
(
&
(
pFileH
->
fhlock
));
...
...
@@ -371,7 +371,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
SFileGroup
fileGroup
=
*
pFGroup
;
int
nFilesLeft
=
pFileH
->
nFGroups
-
(
POINTER_DISTANCE
(
pFGroup
,
pFileH
->
pFGroup
)
/
sizeof
(
SFileGroup
)
+
1
);
int
nFilesLeft
=
pFileH
->
nFGroups
-
(
int
)(
POINTER_DISTANCE
(
pFGroup
,
pFileH
->
pFGroup
)
/
sizeof
(
SFileGroup
)
+
1
);
if
(
nFilesLeft
>
0
)
{
memmove
((
void
*
)
pFGroup
,
POINTER_SHIFT
(
pFGroup
,
sizeof
(
SFileGroup
)),
sizeof
(
SFileGroup
)
*
nFilesLeft
);
}
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
87a73142
...
...
@@ -211,7 +211,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
char
*
sdup
=
strdup
(
pRepo
->
rootDir
);
char
*
prefix
=
dirname
(
sdup
);
int
prefixLen
=
strlen
(
prefix
);
int
prefixLen
=
(
int
)
strlen
(
prefix
);
taosTFree
(
sdup
);
if
(
name
[
0
]
==
0
)
{
// get the file from index or after, but not larger than eindex
...
...
@@ -232,7 +232,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
fname
=
strdup
(
pFGroup
->
files
[(
*
index
)
%
TSDB_FILE_TYPE_MAX
].
fname
);
magic
=
pFGroup
->
files
[(
*
index
)
%
TSDB_FILE_TYPE_MAX
].
info
.
magic
;
}
else
{
if
((
pFGroup
->
fileId
+
1
)
*
TSDB_FILE_TYPE_MAX
-
1
<
eindex
)
{
if
((
pFGroup
->
fileId
+
1
)
*
TSDB_FILE_TYPE_MAX
-
1
<
(
int
)
eindex
)
{
fname
=
strdup
(
pFGroup
->
files
[
0
].
fname
);
*
index
=
pFGroup
->
fileId
*
TSDB_FILE_TYPE_MAX
;
magic
=
pFGroup
->
files
[
0
].
info
.
magic
;
...
...
@@ -327,7 +327,7 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
// ----------------- INTERNAL FUNCTIONS -----------------
char
*
tsdbGetMetaFileName
(
char
*
rootDir
)
{
int
tlen
=
strlen
(
rootDir
)
+
strlen
(
TSDB_META_FILE_NAME
)
+
2
;
int
tlen
=
(
int
)(
strlen
(
rootDir
)
+
strlen
(
TSDB_META_FILE_NAME
)
+
2
)
;
char
*
fname
=
calloc
(
1
,
tlen
);
if
(
fname
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
...
@@ -366,7 +366,7 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
}
char
*
tsdbGetDataDirName
(
char
*
rootDir
)
{
int
tlen
=
strlen
(
rootDir
)
+
strlen
(
TSDB_DATA_DIR_NAME
)
+
2
;
int
tlen
=
(
int
)(
strlen
(
rootDir
)
+
strlen
(
TSDB_DATA_DIR_NAME
)
+
2
)
;
char
*
fname
=
calloc
(
1
,
tlen
);
if
(
fname
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
...
@@ -627,7 +627,7 @@ _err:
}
static
char
*
tsdbGetCfgFname
(
char
*
rootDir
)
{
int
tlen
=
strlen
(
rootDir
)
+
strlen
(
TSDB_CFG_FILE_NAME
)
+
2
;
int
tlen
=
(
int
)(
strlen
(
rootDir
)
+
strlen
(
TSDB_CFG_FILE_NAME
)
+
2
)
;
char
*
fname
=
calloc
(
1
,
tlen
);
if
(
fname
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
...
@@ -853,8 +853,8 @@ static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
return
-
1
;
}
int
mfid
=
TSDB_KEY_FILEID
(
taosGetTimestamp
(
pCfg
->
precision
),
pCfg
->
daysPerFile
,
pCfg
->
precision
)
-
TSDB_MAX_FILE
(
keep
,
pCfg
->
daysPerFile
);
int
mfid
=
(
int
)(
TSDB_KEY_FILEID
(
taosGetTimestamp
(
pCfg
->
precision
),
pCfg
->
daysPerFile
,
pCfg
->
precision
)
-
TSDB_MAX_FILE
(
keep
,
pCfg
->
daysPerFile
)
)
;
int
i
=
0
;
for
(;
i
<
pFileH
->
nFGroups
;
i
++
)
{
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
87a73142
...
...
@@ -307,7 +307,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
keyNext
=
tsdbNextIterKey
(
pIter
);
if
(
keyNext
<
0
||
keyNext
>
maxKey
)
return
numOfRows
;
void
*
ptr
=
taosbsearch
((
void
*
)(
&
keyNext
),
(
void
*
)
filterKeys
,
nFilterKeys
,
sizeof
(
TSKEY
),
compTSKEY
,
TD_GE
);
filterIter
=
(
ptr
==
NULL
)
?
nFilterKeys
:
(
POINTER_DISTANCE
(
ptr
,
filterKeys
)
/
sizeof
(
TSKEY
));
filterIter
=
(
ptr
==
NULL
)
?
nFilterKeys
:
(
int
)((
POINTER_DISTANCE
(
ptr
,
filterKeys
)
/
sizeof
(
TSKEY
)
));
}
do
{
...
...
@@ -364,7 +364,7 @@ static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
tsdbTrace
(
"vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d"
,
REPO_ID
(
pRepo
),
bytes
,
listNEles
(
pRepo
->
mem
->
bufBlockList
),
pBufBlock
->
offset
,
pBufBlock
->
remain
);
}
else
{
SListNode
*
pNode
=
(
SListNode
*
)
POINTER_SHIFT
(
ptr
,
-
sizeof
(
SListNode
));
SListNode
*
pNode
=
(
SListNode
*
)
POINTER_SHIFT
(
ptr
,
-
(
int
)(
sizeof
(
SListNode
)
));
ASSERT
(
listTail
(
pRepo
->
mem
->
extraBuffList
)
==
pNode
);
tdListPopNode
(
pRepo
->
mem
->
extraBuffList
,
pNode
);
free
(
pNode
);
...
...
@@ -495,8 +495,8 @@ static void *tsdbCommitData(void *arg) {
goto
_exit
;
}
int
sfid
=
TSDB_KEY_FILEID
(
pMem
->
keyFirst
,
pCfg
->
daysPerFile
,
pCfg
->
precision
);
int
efid
=
TSDB_KEY_FILEID
(
pMem
->
keyLast
,
pCfg
->
daysPerFile
,
pCfg
->
precision
);
int
sfid
=
(
int
)(
TSDB_KEY_FILEID
(
pMem
->
keyFirst
,
pCfg
->
daysPerFile
,
pCfg
->
precision
)
);
int
efid
=
(
int
)(
TSDB_KEY_FILEID
(
pMem
->
keyLast
,
pCfg
->
daysPerFile
,
pCfg
->
precision
)
);
// Loop to commit to each file
for
(
int
fid
=
sfid
;
fid
<=
efid
;
fid
++
)
{
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
87a73142
...
...
@@ -443,7 +443,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
goto
_err
;
}
pMeta
->
uidMap
=
taosHashInit
(
TSDB_INIT_NTABLES
*
1
.
1
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
pMeta
->
uidMap
=
taosHashInit
(
(
size_t
)(
TSDB_INIT_NTABLES
*
1
.
1
)
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
if
(
pMeta
->
uidMap
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -672,7 +672,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
STR_WITH_SIZE_TO_VARSTR
(
pTable
->
name
,
pCfg
->
sname
,
tsize
);
STR_WITH_SIZE_TO_VARSTR
(
pTable
->
name
,
pCfg
->
sname
,
(
VarDataLenT
)
tsize
);
TABLE_UID
(
pTable
)
=
pCfg
->
superUid
;
TABLE_TID
(
pTable
)
=
-
1
;
TABLE_SUID
(
pTable
)
=
-
1
;
...
...
@@ -690,7 +690,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
}
pTable
->
tagVal
=
NULL
;
STColumn
*
pCol
=
schemaColAt
(
pTable
->
tagSchema
,
DEFAULT_TAG_INDEX_COLUMN
);
pTable
->
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
colType
(
pCol
),
colBytes
(
pCol
),
1
,
0
,
1
,
getTagIndexKey
);
pTable
->
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
colType
(
pCol
),
(
uint8_t
)(
colBytes
(
pCol
)
),
1
,
0
,
1
,
getTagIndexKey
);
if
(
pTable
->
pIndex
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -703,7 +703,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
STR_WITH_SIZE_TO_VARSTR
(
pTable
->
name
,
pCfg
->
name
,
tsize
);
STR_WITH_SIZE_TO_VARSTR
(
pTable
->
name
,
pCfg
->
name
,
(
VarDataLenT
)
tsize
);
TABLE_UID
(
pTable
)
=
pCfg
->
tableId
.
uid
;
TABLE_TID
(
pTable
)
=
pCfg
->
tableId
.
tid
;
...
...
@@ -1165,7 +1165,7 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
buf
=
tdDecodeSchema
(
buf
,
&
(
pTable
->
tagSchema
));
STColumn
*
pCol
=
schemaColAt
(
pTable
->
tagSchema
,
DEFAULT_TAG_INDEX_COLUMN
);
pTable
->
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
colType
(
pCol
),
colBytes
(
pCol
),
1
,
0
,
1
,
getTagIndexKey
);
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
colType
(
pCol
),
(
uint8_t
)(
colBytes
(
pCol
)
),
1
,
0
,
1
,
getTagIndexKey
);
if
(
pTable
->
pIndex
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbFreeTable
(
pTable
);
...
...
@@ -1191,7 +1191,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) {
tlen
=
sizeof
(
SListNode
)
+
sizeof
(
SActObj
)
+
sizeof
(
SActCont
)
+
tsdbEncodeTable
(
NULL
,
pTable
)
+
sizeof
(
TSCKSUM
);
}
else
{
if
(
TABLE_TYPE
(
pTable
)
==
TSDB_SUPER_TABLE
)
{
tlen
=
(
sizeof
(
SListNode
)
+
sizeof
(
SActObj
))
*
(
tSkipListGetSize
(
pTable
->
pIndex
)
+
1
);
tlen
=
(
int
)((
sizeof
(
SListNode
)
+
sizeof
(
SActObj
))
*
(
tSkipListGetSize
(
pTable
->
pIndex
)
+
1
)
);
}
else
{
tlen
=
sizeof
(
SListNode
)
+
sizeof
(
SActObj
);
}
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
87a73142
...
...
@@ -302,7 +302,7 @@ int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols
if
(
tsdbLoadCompInfo
(
pHelper
,
NULL
)
<
0
)
return
-
1
;
while
(
true
)
{
ASSERT
(
blkIdx
<=
pIdx
->
numOfBlocks
);
ASSERT
(
blkIdx
<=
(
int
)
pIdx
->
numOfBlocks
);
TSKEY
keyFirst
=
tsdbNextIterKey
(
pCommitIter
->
pIter
);
if
(
keyFirst
<
0
||
keyFirst
>
maxKey
)
break
;
// iter over
...
...
@@ -405,7 +405,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pIdx
->
tid
=
pHelper
->
tableInfo
.
tid
;
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
if
(
taosTWrite
(
pFile
->
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
{
if
(
taosTWrite
(
pFile
->
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
(
int
)
pIdx
->
len
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -462,7 +462,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
pFile
->
info
.
offset
=
offset
;
if
(
taosTWrite
(
pFile
->
fd
,
(
void
*
)
pHelper
->
pWIdx
,
pFile
->
info
.
len
)
<
pFile
->
info
.
len
)
{
if
(
taosTWrite
(
pFile
->
fd
,
(
void
*
)
pHelper
->
pWIdx
,
pFile
->
info
.
len
)
<
(
int
)
pFile
->
info
.
len
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
info
.
len
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -495,7 +495,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
return
-
1
;
}
if
(
taosTRead
(
fd
,
(
void
*
)(
pHelper
->
pBuffer
),
pFile
->
info
.
len
)
<
pFile
->
info
.
len
)
{
if
(
taosTRead
(
fd
,
(
void
*
)(
pHelper
->
pBuffer
),
pFile
->
info
.
len
)
<
(
int
)
pFile
->
info
.
len
)
{
tsdbError
(
"vgId:%d failed to read %d bytes from file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
info
.
len
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -512,7 +512,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
// Decode it
pHelper
->
idxH
.
numOfIdx
=
0
;
void
*
ptr
=
pHelper
->
pBuffer
;
while
(
POINTER_DISTANCE
(
ptr
,
pHelper
->
pBuffer
)
<
(
pFile
->
info
.
len
-
sizeof
(
TSCKSUM
)))
{
while
(
POINTER_DISTANCE
(
ptr
,
pHelper
->
pBuffer
)
<
(
int
)(
pFile
->
info
.
len
-
sizeof
(
TSCKSUM
)))
{
size_t
tlen
=
taosTSizeof
(
pHelper
->
idxH
.
pIdxArray
);
pHelper
->
idxH
.
numOfIdx
++
;
...
...
@@ -535,7 +535,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
ASSERT
(
pHelper
->
idxH
.
numOfIdx
==
1
||
pHelper
->
idxH
.
pIdxArray
[
pHelper
->
idxH
.
numOfIdx
-
1
].
tid
>
pHelper
->
idxH
.
pIdxArray
[
pHelper
->
idxH
.
numOfIdx
-
2
].
tid
);
ASSERT
(
POINTER_DISTANCE
(
ptr
,
pHelper
->
pBuffer
)
<=
pFile
->
info
.
len
-
sizeof
(
TSCKSUM
));
ASSERT
(
POINTER_DISTANCE
(
ptr
,
pHelper
->
pBuffer
)
<=
(
int
)(
pFile
->
info
.
len
-
sizeof
(
TSCKSUM
)
));
}
}
}
...
...
@@ -566,7 +566,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
}
pHelper
->
pCompInfo
=
taosTRealloc
((
void
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
);
if
(
taosTRead
(
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
{
if
(
taosTRead
(
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
(
int
)
pIdx
->
len
)
{
tsdbError
(
"vgId:%d failed to read %d bytes from file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
helperHeadF
(
pHelper
)
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -594,7 +594,7 @@ int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
ASSERT
(
pCompBlock
->
numOfSubBlocks
<=
1
);
SFile
*
pFile
=
(
pCompBlock
->
last
)
?
helperLastF
(
pHelper
)
:
helperDataF
(
pHelper
);
if
(
lseek
(
pFile
->
fd
,
pCompBlock
->
offset
,
SEEK_SET
)
<
0
)
{
if
(
lseek
(
pFile
->
fd
,
(
off_t
)
pCompBlock
->
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -614,7 +614,7 @@ int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
return
-
1
;
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
pHelper
->
pCompData
,
tsize
))
{
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
pHelper
->
pCompData
,
(
uint32_t
)
tsize
))
{
tsdbError
(
"vgId:%d file %s is broken, offset %"
PRId64
" size %zu"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
(
int64_t
)
pCompBlock
->
offset
,
tsize
);
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
...
...
@@ -787,8 +787,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
}
flen
=
(
*
(
tDataTypeDesc
[
pDataCol
->
type
].
compFunc
))((
char
*
)
pDataCol
->
pData
,
tlen
,
rowsToWrite
,
tptr
,
taosTSizeof
(
pHelper
->
pBuffer
)
-
lsize
,
pCfg
->
compression
,
pHelper
->
compBuffer
,
taosTSizeof
(
pHelper
->
compBuffer
));
(
int32_t
)
taosTSizeof
(
pHelper
->
pBuffer
)
-
lsize
,
pCfg
->
compression
,
pHelper
->
compBuffer
,
(
int32_t
)
taosTSizeof
(
pHelper
->
compBuffer
));
}
else
{
flen
=
tlen
;
memcpy
(
tptr
,
pDataCol
->
pData
,
flen
);
...
...
@@ -879,7 +879,7 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
static
int
tsdbInsertSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
)
{
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<=
pIdx
->
numOfBlocks
);
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<=
(
int
)
pIdx
->
numOfBlocks
);
ASSERT
(
pCompBlock
->
numOfSubBlocks
==
1
);
// Adjust memory if no more room
...
...
@@ -887,7 +887,7 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
if
(
tsdbAdjustInfoSizeIfNeeded
(
pHelper
,
pIdx
->
len
+
sizeof
(
SCompInfo
))
<
0
)
goto
_err
;
// Change the offset
for
(
in
t
i
=
0
;
i
<
pIdx
->
numOfBlocks
;
i
++
)
{
for
(
uint32_
t
i
=
0
;
i
<
pIdx
->
numOfBlocks
;
i
++
)
{
SCompBlock
*
pTCompBlock
=
&
pHelper
->
pCompInfo
->
blocks
[
i
];
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
pTCompBlock
->
offset
+=
sizeof
(
SCompBlock
);
}
...
...
@@ -906,7 +906,7 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
pIdx
->
len
+=
sizeof
(
SCompBlock
);
ASSERT
(
pIdx
->
len
<=
taosTSizeof
(
pHelper
->
pCompInfo
));
pIdx
->
maxKey
=
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
)
->
keyLast
;
pIdx
->
hasLast
=
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
)
->
last
;
pIdx
->
hasLast
=
(
uint32_t
)
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
)
->
last
;
if
(
pIdx
->
numOfBlocks
>
1
)
{
ASSERT
(
pHelper
->
pCompInfo
->
blocks
[
0
].
keyLast
<
pHelper
->
pCompInfo
->
blocks
[
1
].
keyFirst
);
...
...
@@ -925,7 +925,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
ASSERT
(
pCompBlock
->
numOfSubBlocks
==
0
);
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<
pIdx
->
numOfBlocks
);
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<
(
int
)
pIdx
->
numOfBlocks
);
SCompBlock
*
pSCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
blkIdx
;
ASSERT
(
pSCompBlock
->
numOfSubBlocks
>=
1
&&
pSCompBlock
->
numOfSubBlocks
<
TSDB_MAX_SUBBLOCKS
);
...
...
@@ -943,7 +943,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
memmove
((
void
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pSCompBlock
->
offset
+
pSCompBlock
->
len
+
sizeof
(
SCompBlock
)),
(
void
*
)((
char
*
)(
pHelper
->
pCompInfo
)
+
pSCompBlock
->
offset
+
pSCompBlock
->
len
),
tsize
);
for
(
in
t
i
=
blkIdx
+
1
;
i
<
pIdx
->
numOfBlocks
;
i
++
)
{
for
(
uint32_
t
i
=
blkIdx
+
1
;
i
<
pIdx
->
numOfBlocks
;
i
++
)
{
SCompBlock
*
pTCompBlock
=
&
pHelper
->
pCompInfo
->
blocks
[
i
];
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
pTCompBlock
->
offset
+=
sizeof
(
SCompBlock
);
}
...
...
@@ -960,7 +960,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
pIdx
->
len
+=
sizeof
(
SCompBlock
);
}
else
{
// Need to create two sub-blocks
void
*
ptr
=
NULL
;
for
(
in
t
i
=
blkIdx
+
1
;
i
<
pIdx
->
numOfBlocks
;
i
++
)
{
for
(
uint32_
t
i
=
blkIdx
+
1
;
i
<
pIdx
->
numOfBlocks
;
i
++
)
{
SCompBlock
*
pTCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
i
;
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
{
ptr
=
POINTER_SHIFT
(
pHelper
->
pCompInfo
,
pTCompBlock
->
offset
);
...
...
@@ -973,7 +973,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
size_t
tsize
=
pIdx
->
len
-
((
char
*
)
ptr
-
(
char
*
)(
pHelper
->
pCompInfo
));
if
(
tsize
>
0
)
{
memmove
(
POINTER_SHIFT
(
ptr
,
sizeof
(
SCompBlock
)
*
2
),
ptr
,
tsize
);
for
(
in
t
i
=
blkIdx
+
1
;
i
<
pIdx
->
numOfBlocks
;
i
++
)
{
for
(
uint32_
t
i
=
blkIdx
+
1
;
i
<
pIdx
->
numOfBlocks
;
i
++
)
{
SCompBlock
*
pTCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
i
;
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
pTCompBlock
->
offset
+=
(
sizeof
(
SCompBlock
)
*
2
);
}
...
...
@@ -995,7 +995,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
}
pIdx
->
maxKey
=
pHelper
->
pCompInfo
->
blocks
[
pIdx
->
numOfBlocks
-
1
].
keyLast
;
pIdx
->
hasLast
=
pHelper
->
pCompInfo
->
blocks
[
pIdx
->
numOfBlocks
-
1
].
last
;
pIdx
->
hasLast
=
(
uint32_t
)
pHelper
->
pCompInfo
->
blocks
[
pIdx
->
numOfBlocks
-
1
].
last
;
tsdbDebug
(
"vgId:%d tid:%d a subblock is added at index %d"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
tableInfo
.
tid
,
blkIdx
);
...
...
@@ -1010,7 +1010,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<
pIdx
->
numOfBlocks
);
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<
(
int
)
pIdx
->
numOfBlocks
);
SCompBlock
*
pSCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
blkIdx
;
...
...
@@ -1024,7 +1024,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
POINTER_SHIFT
(
pHelper
->
pCompInfo
,
pSCompBlock
->
offset
+
pSCompBlock
->
len
),
tsize
);
}
for
(
in
t
i
=
blkIdx
+
1
;
i
<
pIdx
->
numOfBlocks
;
i
++
)
{
for
(
uint32_
t
i
=
blkIdx
+
1
;
i
<
pIdx
->
numOfBlocks
;
i
++
)
{
SCompBlock
*
pTCompBlock
=
&
pHelper
->
pCompInfo
->
blocks
[
i
];
if
(
pTCompBlock
->
numOfSubBlocks
>
1
)
pTCompBlock
->
offset
-=
(
sizeof
(
SCompBlock
)
*
pSCompBlock
->
numOfSubBlocks
);
}
...
...
@@ -1035,7 +1035,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
*
pSCompBlock
=
*
pCompBlock
;
pIdx
->
maxKey
=
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
)
->
keyLast
;
pIdx
->
hasLast
=
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
)
->
last
;
pIdx
->
hasLast
=
(
uint32_t
)
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
)
->
last
;
tsdbDebug
(
"vgId:%d tid:%d a super block is updated at index %d"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
tableInfo
.
tid
,
blkIdx
);
...
...
@@ -1197,7 +1197,7 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
}
int64_t
offset
=
pCompBlock
->
offset
+
TSDB_GET_COMPCOL_LEN
(
pCompBlock
->
numOfCols
)
+
pCompCol
->
offset
;
if
(
lseek
(
pFile
->
fd
,
offset
,
SEEK_SET
)
<
0
)
{
if
(
lseek
(
pFile
->
fd
,
(
off_t
)
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -1212,7 +1212,7 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
if
(
tsdbCheckAndDecodeColumnData
(
pDataCol
,
pHelper
->
pBuffer
,
pCompCol
->
len
,
pCompBlock
->
algorithm
,
pCompBlock
->
numOfRows
,
pHelper
->
pRepo
->
config
.
maxRowsPerFileBlock
,
pHelper
->
compBuffer
,
taosTSizeof
(
pHelper
->
compBuffer
))
<
0
)
{
pHelper
->
compBuffer
,
(
int32_t
)
taosTSizeof
(
pHelper
->
compBuffer
))
<
0
)
{
tsdbError
(
"vgId:%d file %s is broken at column %d offset %"
PRId64
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
pCompCol
->
colId
,
offset
);
return
-
1
;
...
...
@@ -1312,7 +1312,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
SCompData
*
pCompData
=
(
SCompData
*
)
pHelper
->
pBuffer
;
int
fd
=
pFile
->
fd
;
if
(
lseek
(
fd
,
pCompBlock
->
offset
,
SEEK_SET
)
<
0
)
{
if
(
lseek
(
fd
,
(
off_t
)
pCompBlock
->
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d tid:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
tableInfo
.
tid
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -1375,7 +1375,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
}
if
(
tsdbCheckAndDecodeColumnData
(
pDataCol
,
(
char
*
)
pCompData
+
tsize
+
toffset
,
tlen
,
pCompBlock
->
algorithm
,
pCompBlock
->
numOfRows
,
pDataCols
->
maxPoints
,
pHelper
->
compBuffer
,
taosTSizeof
(
pHelper
->
compBuffer
))
<
0
)
{
(
int32_t
)
taosTSizeof
(
pHelper
->
compBuffer
))
<
0
)
{
tsdbError
(
"vgId:%d file %s is broken at column %d block offset %"
PRId64
" column offset %d"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
tcolId
,
(
int64_t
)
pCompBlock
->
offset
,
toffset
);
goto
_err
;
...
...
@@ -1499,7 +1499,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
SCompBlock
*
pCompBlock
=
taosbsearch
((
void
*
)(
&
keyFirst
),
(
void
*
)
blockAtIdx
(
pHelper
,
*
blkIdx
),
pIdx
->
numOfBlocks
-
*
blkIdx
,
sizeof
(
SCompBlock
),
compareKeyBlock
,
TD_GE
);
ASSERT
(
pCompBlock
!=
NULL
);
int
tblkIdx
=
TSDB_GET_COMPBLOCK_IDX
(
pHelper
,
pCompBlock
);
int
tblkIdx
=
(
int32_t
)(
TSDB_GET_COMPBLOCK_IDX
(
pHelper
,
pCompBlock
)
);
if
(
pCompBlock
->
last
)
{
ASSERT
(
pCompBlock
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
&&
tblkIdx
==
pIdx
->
numOfBlocks
-
1
);
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
87a73142
...
...
@@ -235,9 +235,10 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
STableCheckInfo
info
=
{
.
lastKey
=
pQueryHandle
->
window
.
skey
,
.
tableId
=
pTable
->
tableId
,
//
.tableId = pTable->tableId,
.
pTableObj
=
pTable
,
};
info
.
tableId
=
pTable
->
tableId
;
assert
(
info
.
pTableObj
!=
NULL
&&
(
info
.
pTableObj
->
type
==
TSDB_NORMAL_TABLE
||
info
.
pTableObj
->
type
==
TSDB_CHILD_TABLE
||
info
.
pTableObj
->
type
==
TSDB_STREAM_TABLE
));
...
...
@@ -523,7 +524,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
fid
=
INT32_MAX
;
}
return
fid
;
return
(
int32_t
)
fid
;
}
static
int32_t
binarySearchForBlock
(
SCompBlock
*
pBlock
,
int32_t
numOfBlocks
,
TSKEY
skey
,
int32_t
order
)
{
...
...
@@ -571,7 +572,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
continue
;
// no data blocks in the file belongs to pCheckInfo->pTable
}
if
(
pCheckInfo
->
compSize
<
compIndex
->
len
)
{
if
(
pCheckInfo
->
compSize
<
(
int32_t
)
compIndex
->
len
)
{
assert
(
compIndex
->
len
>
0
);
char
*
t
=
realloc
(
pCheckInfo
->
pCompInfo
,
compIndex
->
len
);
...
...
@@ -604,7 +605,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
}
// todo speedup the procedure of located end block
while
(
end
<
compIndex
->
numOfBlocks
&&
(
pCompInfo
->
blocks
[
end
].
keyFirst
<=
e
))
{
while
(
end
<
(
int32_t
)
compIndex
->
numOfBlocks
&&
(
pCompInfo
->
blocks
[
end
].
keyFirst
<=
e
))
{
end
+=
1
;
}
...
...
@@ -644,7 +645,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tdInitDataCols
(
pQueryHandle
->
rhelper
.
pDataCols
[
1
],
pSchema
);
int16_t
*
colIds
=
pQueryHandle
->
defaultLoadColumn
->
pData
;
int32_t
ret
=
tsdbLoadBlockDataCols
(
&
(
pQueryHandle
->
rhelper
),
pBlock
,
pCheckInfo
->
pCompInfo
,
colIds
,
QH_GET_NUM_OF_COLS
(
pQueryHandle
));
int32_t
ret
=
tsdbLoadBlockDataCols
(
&
(
pQueryHandle
->
rhelper
),
pBlock
,
pCheckInfo
->
pCompInfo
,
colIds
,
(
int
)(
QH_GET_NUM_OF_COLS
(
pQueryHandle
)
));
if
(
ret
==
TSDB_CODE_SUCCESS
)
{
SDataBlockLoadInfo
*
pBlockLoadInfo
=
&
pQueryHandle
->
dataBlockLoadInfo
;
...
...
@@ -846,7 +847,7 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
return
numOfRows
;
}
int32_t
requiredNumOfCols
=
taosArrayGetSize
(
pQueryHandle
->
pColumns
);
int32_t
requiredNumOfCols
=
(
int32_t
)
taosArrayGetSize
(
pQueryHandle
->
pColumns
);
//data in buffer has greater timestamp, copy data in file block
int32_t
i
=
0
,
j
=
0
;
...
...
@@ -862,15 +863,15 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
int32_t
bytes
=
pColInfo
->
info
.
bytes
;
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
pData
=
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
pData
=
(
char
*
)
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
}
else
{
pData
=
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
num
)
*
pColInfo
->
info
.
bytes
;
pData
=
(
char
*
)
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
num
)
*
pColInfo
->
info
.
bytes
;
}
if
(
pColInfo
->
info
.
colId
==
src
->
colId
)
{
if
(
pColInfo
->
info
.
type
!=
TSDB_DATA_TYPE_BINARY
&&
pColInfo
->
info
.
type
!=
TSDB_DATA_TYPE_NCHAR
)
{
memmove
(
pData
,
src
->
pData
+
bytes
*
start
,
bytes
*
num
);
memmove
(
pData
,
(
char
*
)
src
->
pData
+
bytes
*
start
,
bytes
*
num
);
}
else
{
// handle the var-string
char
*
dst
=
pData
;
...
...
@@ -902,9 +903,9 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
while
(
i
<
requiredNumOfCols
)
{
// the remain columns are all null data
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
pData
=
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
pData
=
(
char
*
)
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
}
else
{
pData
=
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
num
)
*
pColInfo
->
info
.
bytes
;
pData
=
(
char
*
)
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
num
)
*
pColInfo
->
info
.
bytes
;
}
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_BINARY
||
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
...
...
@@ -944,13 +945,13 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
}
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
pData
=
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
pData
=
(
char
*
)
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
}
else
{
pData
=
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
1
)
*
pColInfo
->
info
.
bytes
;
pData
=
(
char
*
)
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
1
)
*
pColInfo
->
info
.
bytes
;
}
if
(
pSchema
->
columns
[
j
].
colId
==
pColInfo
->
info
.
colId
)
{
void
*
value
=
tdGetRowDataOfCol
(
row
,
pColInfo
->
info
.
type
,
TD_DATA_ROW_HEAD_SIZE
+
pSchema
->
columns
[
j
].
offset
);
void
*
value
=
tdGetRowDataOfCol
(
row
,
(
int8_t
)
pColInfo
->
info
.
type
,
TD_DATA_ROW_HEAD_SIZE
+
pSchema
->
columns
[
j
].
offset
);
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_BINARY
||
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
memcpy
(
pData
,
value
,
varDataTLen
(
value
));
}
else
{
...
...
@@ -972,9 +973,9 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
while
(
i
<
numOfCols
)
{
// the remain columns are all null data
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
pData
=
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
pData
=
(
char
*
)
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
}
else
{
pData
=
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
1
)
*
pColInfo
->
info
.
bytes
;
pData
=
(
char
*
)
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
1
)
*
pColInfo
->
info
.
bytes
;
}
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_BINARY
||
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
...
...
@@ -997,7 +998,7 @@ static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, i
int32_t
emptySize
=
pQueryHandle
->
outputCapacity
-
numOfRows
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
memmove
(
pColInfo
->
pData
,
pColInfo
->
pData
+
emptySize
*
pColInfo
->
info
.
bytes
,
numOfRows
*
pColInfo
->
info
.
bytes
);
memmove
(
(
char
*
)
pColInfo
->
pData
,
(
char
*
)
pColInfo
->
pData
+
emptySize
*
pColInfo
->
info
.
bytes
,
numOfRows
*
pColInfo
->
info
.
bytes
);
}
}
}
...
...
@@ -1075,7 +1076,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t
order
=
(
pQueryHandle
->
order
==
TSDB_ORDER_ASC
)
?
TSDB_ORDER_DESC
:
TSDB_ORDER_ASC
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
?
1
:-
1
;
int32_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
int32_t
numOfCols
=
(
int32_t
)(
QH_GET_NUM_OF_COLS
(
pQueryHandle
)
);
STable
*
pTable
=
pCheckInfo
->
pTableObj
;
int32_t
endPos
=
cur
->
pos
;
...
...
@@ -1326,7 +1327,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
size_t
size
=
sizeof
(
STableBlockInfo
)
*
numOfBlocks
;
if
(
pQueryHandle
->
allocSize
<
size
)
{
pQueryHandle
->
allocSize
=
size
;
pQueryHandle
->
allocSize
=
(
int32_t
)
size
;
char
*
tmp
=
realloc
(
pQueryHandle
->
pDataBlockInfo
,
pQueryHandle
->
allocSize
);
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
...
@@ -1338,7 +1339,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
memset
(
pQueryHandle
->
pDataBlockInfo
,
0
,
size
);
*
numOfAllocBlocks
=
numOfBlocks
;
int32_t
numOfTables
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
int32_t
numOfTables
=
(
int32_t
)
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
SBlockOrderSupporter
sup
=
{
0
};
sup
.
numOfTables
=
numOfTables
;
...
...
@@ -1445,7 +1446,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
numOfBlocks
=
0
;
int32_t
numOfTables
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
int32_t
numOfTables
=
(
int32_t
)
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
STsdbCfg
*
pCfg
=
&
pQueryHandle
->
pTsdb
->
config
;
STimeWindow
win
=
TSWINDOW_INITIALIZER
;
...
...
@@ -1615,10 +1616,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
if
(
pQueryHandle
->
cur
.
win
.
ekey
==
pQueryHandle
->
window
.
skey
)
{
// data already retrieve, discard other data rows and return
int32_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
int32_t
numOfCols
=
(
int32_t
)(
QH_GET_NUM_OF_COLS
(
pQueryHandle
)
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
memcpy
(
pCol
->
pData
,
pCol
->
pData
+
pCol
->
info
.
bytes
*
(
pQueryHandle
->
cur
.
rows
-
1
),
pCol
->
info
.
bytes
);
memcpy
(
(
char
*
)
pCol
->
pData
,
(
char
*
)
pCol
->
pData
+
pCol
->
info
.
bytes
*
(
pQueryHandle
->
cur
.
rows
-
1
),
pCol
->
info
.
bytes
);
}
pQueryHandle
->
cur
.
win
=
(
STimeWindow
){
pQueryHandle
->
window
.
skey
,
pQueryHandle
->
window
.
skey
};
...
...
@@ -1646,7 +1647,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
tsdbTakeMemSnapshot
(
pSecQueryHandle
->
pTsdb
,
&
pSecQueryHandle
->
mem
,
&
pSecQueryHandle
->
imem
);
// allocate buffer in order to load data blocks from file
int32_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
int32_t
numOfCols
=
(
int32_t
)(
QH_GET_NUM_OF_COLS
(
pQueryHandle
)
);
pSecQueryHandle
->
statis
=
calloc
(
numOfCols
,
sizeof
(
SDataStatis
));
pSecQueryHandle
->
pColumns
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
...
...
@@ -1669,9 +1670,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
STableCheckInfo
*
pCheckInfo
=
(
STableCheckInfo
*
)
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
j
);
STableCheckInfo
info
=
{
.
lastKey
=
pSecQueryHandle
->
window
.
skey
,
.
tableId
=
pCheckInfo
->
tableId
,
//
.tableId = pCheckInfo->tableId,
.
pTableObj
=
pCheckInfo
->
pTableObj
,
};
info
.
tableId
=
pCheckInfo
->
tableId
;
taosArrayPush
(
pSecQueryHandle
->
pTableCheckInfo
,
&
info
);
}
...
...
@@ -1688,12 +1690,12 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
memcpy
(
pCol
->
pData
,
pCol
->
pData
+
pCol
->
info
.
bytes
*
(
pQueryHandle
->
cur
.
rows
-
1
),
pCol
->
info
.
bytes
);
memcpy
(
(
char
*
)
pCol
->
pData
,
(
char
*
)
pCol
->
pData
+
pCol
->
info
.
bytes
*
(
pQueryHandle
->
cur
.
rows
-
1
),
pCol
->
info
.
bytes
);
SColumnInfoData
*
pCol1
=
taosArrayGet
(
pSecQueryHandle
->
pColumns
,
i
);
assert
(
pCol
->
info
.
colId
==
pCol1
->
info
.
colId
);
memcpy
(
pCol
->
pData
+
pCol
->
info
.
bytes
,
pCol1
->
pData
,
pCol1
->
info
.
bytes
);
memcpy
(
(
char
*
)
pCol
->
pData
+
pCol
->
info
.
bytes
,
pCol1
->
pData
,
pCol1
->
info
.
bytes
);
}
SColumnInfoData
*
pTSCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
0
);
...
...
@@ -1839,7 +1841,7 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
static
int
tsdbReadRowsFromCache
(
STableCheckInfo
*
pCheckInfo
,
TSKEY
maxKey
,
int
maxRowsToRead
,
STimeWindow
*
win
,
STsdbQueryHandle
*
pQueryHandle
)
{
int
numOfRows
=
0
;
int32_t
numOfCols
=
taosArrayGetSize
(
pQueryHandle
->
pColumns
);
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pQueryHandle
->
pColumns
);
win
->
skey
=
TSKEY_INITIAL_VAL
;
int64_t
st
=
taosGetTimestampUs
();
...
...
@@ -1881,7 +1883,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
memmove
(
pColInfo
->
pData
,
pColInfo
->
pData
+
emptySize
*
pColInfo
->
info
.
bytes
,
numOfRows
*
pColInfo
->
info
.
bytes
);
memmove
(
(
char
*
)
pColInfo
->
pData
,
(
char
*
)
pColInfo
->
pData
+
emptySize
*
pColInfo
->
info
.
bytes
,
numOfRows
*
pColInfo
->
info
.
bytes
);
}
}
...
...
@@ -1910,7 +1912,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p
pDataBlockInfo
->
tid
=
pTable
->
tableId
.
tid
;
pDataBlockInfo
->
rows
=
cur
->
rows
;
pDataBlockInfo
->
window
=
cur
->
win
;
pDataBlockInfo
->
numOfCols
=
QH_GET_NUM_OF_COLS
(
pHandle
);
pDataBlockInfo
->
numOfCols
=
(
int32_t
)(
QH_GET_NUM_OF_COLS
(
pHandle
)
);
}
/*
...
...
@@ -1945,7 +1947,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
pHandle
->
statis
[
i
].
colId
=
colIds
[
i
];
}
tsdbGetDataStatis
(
&
pHandle
->
rhelper
,
pHandle
->
statis
,
numOfCols
);
tsdbGetDataStatis
(
&
pHandle
->
rhelper
,
pHandle
->
statis
,
(
int
)
numOfCols
);
// always load the first primary timestamp column data
SDataStatis
*
pPrimaryColStatis
=
&
pHandle
->
statis
[
0
];
...
...
@@ -2010,11 +2012,11 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if
(
!
ASCENDING_TRAVERSE
(
pHandle
->
order
)
&&
numOfRows
<
pHandle
->
outputCapacity
)
{
int32_t
emptySize
=
pHandle
->
outputCapacity
-
numOfRows
;
int32_t
reqNumOfCols
=
taosArrayGetSize
(
pHandle
->
pColumns
);
int32_t
reqNumOfCols
=
(
int32_t
)
taosArrayGetSize
(
pHandle
->
pColumns
);
for
(
int32_t
i
=
0
;
i
<
reqNumOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pHandle
->
pColumns
,
i
);
memmove
(
pColInfo
->
pData
,
pColInfo
->
pData
+
emptySize
*
pColInfo
->
info
.
bytes
,
numOfRows
*
pColInfo
->
info
.
bytes
);
memmove
(
(
char
*
)
pColInfo
->
pData
,
(
char
*
)
pColInfo
->
pData
+
emptySize
*
pColInfo
->
info
.
bytes
,
numOfRows
*
pColInfo
->
info
.
bytes
);
}
}
...
...
@@ -2332,7 +2334,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
THROW
(
TSDB_CODE_TDB_OUT_OF_MEMORY
);
}
expr
->
nodeType
=
TSQL_NODE_EXPR
;
expr
->
_node
.
optr
=
tagNameRelType
;
expr
->
_node
.
optr
=
(
uint8_t
)
tagNameRelType
;
expr
->
_node
.
pLeft
=
tagExpr
;
expr
->
_node
.
pRight
=
tbnameExpr
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录