Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
fb6fa6ad
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
fb6fa6ad
编写于
2月 28, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
2月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10449 from taosdata/feature/config
minor changes
上级
f021015e
26adff1f
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
297 addition
and
311 deletion
+297
-311
include/os/os.h
include/os/os.h
+3
-3
include/os/osFile.h
include/os/osFile.h
+2
-2
include/os/osMemory.h
include/os/osMemory.h
+4
-4
include/os/osSemaphore.h
include/os/osSemaphore.h
+2
-2
include/os/osSocket.h
include/os/osSocket.h
+4
-4
include/os/osSysinfo.h
include/os/osSysinfo.h
+2
-2
include/os/osThread.h
include/os/osThread.h
+2
-2
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+27
-31
source/common/src/ttime.c
source/common/src/ttime.c
+61
-70
source/common/src/ttszip.c
source/common/src/ttszip.c
+190
-191
未找到文件。
include/os/os.h
浏览文件 @
fb6fa6ad
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef
TDENGINE_OS_H
#define
TDENGINE_OS_H
#ifndef
_TD_OS_H_
#define
_TD_OS_H_
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -82,4 +82,4 @@ void osInit();
}
#endif
#endif
#endif
/*_TD_OS_H_*/
include/os/osFile.h
浏览文件 @
fb6fa6ad
...
...
@@ -16,12 +16,12 @@
#ifndef _TD_OS_FILE_H_
#define _TD_OS_FILE_H_
#include "osSocket.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#include "osSocket.h"
#ifndef ALLOW_FORBID_FUNC
#define open OPEN_FUNC_TAOS_FORBID
#define fopen FOPEN_FUNC_TAOS_FORBID
...
...
include/os/osMemory.h
浏览文件 @
fb6fa6ad
...
...
@@ -20,12 +20,12 @@
extern
"C"
{
#endif
#define tfree(x) \
do { \
if (x) { \
#define tfree(x)
\
do {
\
if (x) {
\
free((void *)(x)); \
(x) = 0; \
} \
}
\
} while (0)
#ifdef __cplusplus
...
...
include/os/osSemaphore.h
浏览文件 @
fb6fa6ad
...
...
@@ -16,12 +16,12 @@
#ifndef _TD_OS_SEMPHONE_H_
#define _TD_OS_SEMPHONE_H_
#include <semaphore.h>
#ifdef __cplusplus
extern
"C"
{
#endif
#include <semaphore.h>
#if defined (_TD_DARWIN_64)
typedef
struct
tsem_s
*
tsem_t
;
int
tsem_init
(
tsem_t
*
sem
,
int
pshared
,
unsigned
int
value
);
...
...
include/os/osSocket.h
浏览文件 @
fb6fa6ad
...
...
@@ -16,10 +16,6 @@
#ifndef _TD_OS_SOCKET_H_
#define _TD_OS_SOCKET_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include "winsock2.h"
#include <WS2tcpip.h>
...
...
@@ -30,6 +26,10 @@ extern "C" {
#include <sys/epoll.h>
#endif
#ifdef __cplusplus
extern
"C"
{
#endif
#define TAOS_EPOLL_WAIT_TIME 500
typedef
int32_t
SOCKET
;
typedef
SOCKET
EpollFd
;
...
...
include/os/osSysinfo.h
浏览文件 @
fb6fa6ad
...
...
@@ -16,12 +16,12 @@
#ifndef _TD_OS_SYSINFO_H_
#define _TD_OS_SYSINFO_H_
#include "os.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#include "os.h"
typedef
struct
{
int64_t
total
;
int64_t
used
;
...
...
include/os/osThread.h
浏览文件 @
fb6fa6ad
...
...
@@ -16,12 +16,12 @@
#ifndef _TD_OS_THREAD_H_
#define _TD_OS_THREAD_H_
#include <pthread.h>
#ifdef __cplusplus
extern
"C"
{
#endif
#include <pthread.h>
#ifdef __cplusplus
}
#endif
...
...
source/common/src/tdataformat.c
浏览文件 @
fb6fa6ad
...
...
@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "tdataformat.h"
#include "tcoding.h"
#include "tcoding.h"
#include "tlog.h"
static
void
dataColSetNEleNull
(
SDataCol
*
pCol
,
int
nEle
);
...
...
@@ -25,7 +25,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
#endif
int
tdAllocMemForCol
(
SDataCol
*
pCol
,
int
maxPoints
)
{
int
spaceNeeded
=
pCol
->
bytes
*
maxPoints
;
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
spaceNeeded
+=
sizeof
(
VarDataOffsetT
)
*
maxPoints
;
}
#ifdef TD_SUPPORT_BITMAP
...
...
@@ -36,11 +36,10 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
spaceNeeded
+=
TYPE_BYTES
[
pCol
->
type
];
#endif
if
(
pCol
->
spaceSize
<
spaceNeeded
)
{
void
*
ptr
=
realloc
(
pCol
->
pData
,
spaceNeeded
);
if
(
ptr
==
NULL
)
{
uDebug
(
"malloc failure, size:%"
PRId64
" failed, reason:%s"
,
(
int64_t
)
spaceNeeded
,
strerror
(
errno
));
if
(
pCol
->
spaceSize
<
spaceNeeded
)
{
void
*
ptr
=
realloc
(
pCol
->
pData
,
spaceNeeded
);
if
(
ptr
==
NULL
)
{
uDebug
(
"malloc failure, size:%"
PRId64
" failed, reason:%s"
,
(
int64_t
)
spaceNeeded
,
strerror
(
errno
));
return
-
1
;
}
else
{
pCol
->
pData
=
ptr
;
...
...
@@ -66,8 +65,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
* Duplicate the schema and return a new object
*/
STSchema
*
tdDupSchema
(
const
STSchema
*
pSchema
)
{
int
tlen
=
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
schemaNCols
(
pSchema
);
int
tlen
=
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
schemaNCols
(
pSchema
);
STSchema
*
tSchema
=
(
STSchema
*
)
malloc
(
tlen
);
if
(
tSchema
==
NULL
)
return
NULL
;
...
...
@@ -98,8 +96,8 @@ int tdEncodeSchema(void **buf, STSchema *pSchema) {
* Decode a schema from a binary.
*/
void
*
tdDecodeSchema
(
void
*
buf
,
STSchema
**
pRSchema
)
{
int
version
=
0
;
int
numOfCols
=
0
;
int
version
=
0
;
int
numOfCols
=
0
;
STSchemaBuilder
schemaBuilder
;
buf
=
taosDecodeFixedI32
(
buf
,
&
version
);
...
...
@@ -155,7 +153,7 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int1
if
(
pBuilder
->
nCols
>=
pBuilder
->
tCols
)
{
pBuilder
->
tCols
*=
2
;
STColumn
*
columns
=
(
STColumn
*
)
realloc
(
pBuilder
->
columns
,
sizeof
(
STColumn
)
*
pBuilder
->
tCols
);
STColumn
*
columns
=
(
STColumn
*
)
realloc
(
pBuilder
->
columns
,
sizeof
(
STColumn
)
*
pBuilder
->
tCols
);
if
(
columns
==
NULL
)
return
-
1
;
pBuilder
->
columns
=
columns
;
}
...
...
@@ -166,7 +164,7 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int1
if
(
pBuilder
->
nCols
==
0
)
{
colSetOffset
(
pCol
,
0
);
}
else
{
STColumn
*
pTCol
=
&
(
pBuilder
->
columns
[
pBuilder
->
nCols
-
1
]);
STColumn
*
pTCol
=
&
(
pBuilder
->
columns
[
pBuilder
->
nCols
-
1
]);
colSetOffset
(
pCol
,
pTCol
->
offset
+
TYPE_BYTES
[
pTCol
->
type
]);
}
...
...
@@ -258,7 +256,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
pDataCol
->
type
=
colType
(
pCol
);
pDataCol
->
colId
=
colColId
(
pCol
);
pDataCol
->
bytes
=
colBytes
(
pCol
);
pDataCol
->
offset
=
colOffset
(
pCol
)
+
0
;
//
TD_DATA_ROW_HEAD_SIZE;
pDataCol
->
offset
=
colOffset
(
pCol
)
+
0
;
//
TD_DATA_ROW_HEAD_SIZE;
pDataCol
->
len
=
0
;
}
...
...
@@ -272,7 +270,7 @@ int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPo
return
0
;
}
if
(
tdAllocMemForCol
(
pCol
,
maxPoints
)
<
0
)
return
-
1
;
if
(
tdAllocMemForCol
(
pCol
,
maxPoints
)
<
0
)
return
-
1
;
if
(
numOfRows
>
0
)
{
// Find the first not null value, fill all previouse values as NULL
dataColSetNEleNull
(
pCol
,
numOfRows
);
...
...
@@ -303,7 +301,7 @@ static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row)
}
bool
isNEleNull
(
SDataCol
*
pCol
,
int
nEle
)
{
if
(
isAllRowsNull
(
pCol
))
return
true
;
if
(
isAllRowsNull
(
pCol
))
return
true
;
for
(
int
i
=
0
;
i
<
nEle
;
i
++
)
{
if
(
!
isNull
(
tdGetColDataOfRowUnsafe
(
pCol
,
i
),
pCol
->
type
))
return
false
;
}
...
...
@@ -370,7 +368,7 @@ SDataCols *tdNewDataCols(int maxCols, int maxRows) {
return
NULL
;
}
int
i
;
for
(
i
=
0
;
i
<
maxCols
;
i
++
)
{
for
(
i
=
0
;
i
<
maxCols
;
i
++
)
{
pCols
->
cols
[
i
].
spaceSize
=
0
;
pCols
->
cols
[
i
].
len
=
0
;
pCols
->
cols
[
i
].
pData
=
NULL
;
...
...
@@ -386,10 +384,10 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
int
oldMaxCols
=
pCols
->
maxCols
;
if
(
schemaNCols
(
pSchema
)
>
oldMaxCols
)
{
pCols
->
maxCols
=
schemaNCols
(
pSchema
);
void
*
ptr
=
(
SDataCol
*
)
realloc
(
pCols
->
cols
,
sizeof
(
SDataCol
)
*
pCols
->
maxCols
);
void
*
ptr
=
(
SDataCol
*
)
realloc
(
pCols
->
cols
,
sizeof
(
SDataCol
)
*
pCols
->
maxCols
);
if
(
ptr
==
NULL
)
return
-
1
;
pCols
->
cols
=
ptr
;
for
(
i
=
oldMaxCols
;
i
<
pCols
->
maxCols
;
i
++
)
{
for
(
i
=
oldMaxCols
;
i
<
pCols
->
maxCols
;
i
++
)
{
pCols
->
cols
[
i
].
pData
=
NULL
;
pCols
->
cols
[
i
].
dataOff
=
NULL
;
pCols
->
cols
[
i
].
spaceSize
=
0
;
...
...
@@ -402,16 +400,16 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
for
(
i
=
0
;
i
<
schemaNCols
(
pSchema
);
i
++
)
{
dataColInit
(
pCols
->
cols
+
i
,
schemaColAt
(
pSchema
,
i
),
pCols
->
maxPoints
);
}
return
0
;
}
SDataCols
*
tdFreeDataCols
(
SDataCols
*
pCols
)
{
int
i
;
if
(
pCols
)
{
if
(
pCols
->
cols
)
{
if
(
pCols
->
cols
)
{
int
maxCols
=
pCols
->
maxCols
;
for
(
i
=
0
;
i
<
maxCols
;
i
++
)
{
for
(
i
=
0
;
i
<
maxCols
;
i
++
)
{
SDataCol
*
pCol
=
&
pCols
->
cols
[
i
];
tfree
(
pCol
->
pData
);
}
...
...
@@ -439,7 +437,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
if
(
keepData
)
{
if
(
pDataCols
->
cols
[
i
].
len
>
0
)
{
if
(
tdAllocMemForCol
(
&
pRet
->
cols
[
i
],
pRet
->
maxPoints
)
<
0
)
{
if
(
tdAllocMemForCol
(
&
pRet
->
cols
[
i
],
pRet
->
maxPoints
)
<
0
)
{
tdFreeDataCols
(
pRet
);
return
NULL
;
}
...
...
@@ -647,9 +645,9 @@ SKVRow tdKVRowDup(SKVRow row) {
return
trow
;
}
static
int
compareColIdx
(
const
void
*
a
,
const
void
*
b
)
{
const
SColIdx
*
x
=
(
const
SColIdx
*
)
a
;
const
SColIdx
*
y
=
(
const
SColIdx
*
)
b
;
static
int
compareColIdx
(
const
void
*
a
,
const
void
*
b
)
{
const
SColIdx
*
x
=
(
const
SColIdx
*
)
a
;
const
SColIdx
*
y
=
(
const
SColIdx
*
)
b
;
if
(
x
->
colId
>
y
->
colId
)
{
return
1
;
}
...
...
@@ -659,15 +657,13 @@ static int compareColIdx(const void* a, const void* b) {
return
0
;
}
void
tdSortKVRowByColIdx
(
SKVRow
row
)
{
qsort
(
kvRowColIdx
(
row
),
kvRowNCols
(
row
),
sizeof
(
SColIdx
),
compareColIdx
);
}
void
tdSortKVRowByColIdx
(
SKVRow
row
)
{
qsort
(
kvRowColIdx
(
row
),
kvRowNCols
(
row
),
sizeof
(
SColIdx
),
compareColIdx
);
}
int
tdSetKVRowDataOfCol
(
SKVRow
*
orow
,
int16_t
colId
,
int8_t
type
,
void
*
value
)
{
SColIdx
*
pColIdx
=
NULL
;
SKVRow
row
=
*
orow
;
SKVRow
nrow
=
NULL
;
void
*
ptr
=
taosbsearch
(
&
colId
,
kvRowColIdx
(
row
),
kvRowNCols
(
row
),
sizeof
(
SColIdx
),
comparTagId
,
TD_GE
);
void
*
ptr
=
taosbsearch
(
&
colId
,
kvRowColIdx
(
row
),
kvRowNCols
(
row
),
sizeof
(
SColIdx
),
comparTagId
,
TD_GE
);
if
(
ptr
==
NULL
||
((
SColIdx
*
)
ptr
)
->
colId
>
colId
)
{
// need to add a column value to the row
int
diff
=
IS_VAR_DATA_TYPE
(
type
)
?
varDataTLen
(
value
)
:
TYPE_BYTES
[
type
];
...
...
@@ -699,7 +695,7 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
if
(
IS_VAR_DATA_TYPE
(
type
))
{
void
*
pOldVal
=
kvRowColVal
(
row
,
(
SColIdx
*
)
ptr
);
if
(
varDataTLen
(
value
)
==
varDataTLen
(
pOldVal
))
{
// just update the column value in place
if
(
varDataTLen
(
value
)
==
varDataTLen
(
pOldVal
))
{
// just update the column value in place
memcpy
(
pOldVal
,
value
,
varDataTLen
(
value
));
}
else
{
// need to reallocate the memory
int16_t
nlen
=
kvRowLen
(
row
)
+
(
varDataTLen
(
value
)
-
varDataTLen
(
pOldVal
));
...
...
source/common/src/ttime.c
浏览文件 @
fb6fa6ad
...
...
@@ -43,50 +43,46 @@
* An encoding of midnight at the end of the day as 24:00:00 - ie. midnight
* tomorrow - (allowable under ISO 8601) is supported.
*/
static
int64_t
user_mktime64
(
const
unsigned
int
year0
,
const
unsigned
int
mon0
,
const
unsigned
int
day
,
const
unsigned
int
hour
,
const
unsigned
int
min
,
const
unsigned
int
sec
,
int64_t
time_zone
)
{
unsigned
int
mon
=
mon0
,
year
=
year0
;
static
int64_t
user_mktime64
(
const
uint32_t
year0
,
const
uint32_t
mon0
,
const
uint32_t
day
,
const
uint32_t
hour
,
const
uint32_t
min
,
const
uint32_t
sec
,
int64_t
time_zone
)
{
uint32_t
mon
=
mon0
,
year
=
year0
;
/* 1..12 -> 11,12,1..10 */
if
(
0
>=
(
int
)
(
mon
-=
2
))
{
mon
+=
12
;
/* Puts Feb last since it has leap day */
if
(
0
>=
(
int
32_t
)
(
mon
-=
2
))
{
mon
+=
12
;
/* Puts Feb last since it has leap day */
year
-=
1
;
}
//int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) +
// year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
//
int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) +
//
year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
int64_t
res
;
res
=
367
*
((
int64_t
)
mon
)
/
12
;
res
+=
year
/
4
-
year
/
100
+
year
/
400
+
day
+
((
int64_t
)
year
)
*
365
-
719499
;
res
=
res
*
24
;
res
=
((
res
+
hour
)
*
60
+
min
)
*
60
+
sec
;
res
=
367
*
((
int64_t
)
mon
)
/
12
;
res
+=
year
/
4
-
year
/
100
+
year
/
400
+
day
+
((
int64_t
)
year
)
*
365
-
719499
;
res
=
res
*
24
;
res
=
((
res
+
hour
)
*
60
+
min
)
*
60
+
sec
;
return
(
res
+
time_zone
);
}
// ==== mktime() kernel code =================//
static
int64_t
m_deltaUtc
=
0
;
void
deltaToUtcInitOnce
()
{
void
deltaToUtcInitOnce
()
{
struct
tm
tm
=
{
0
};
(
void
)
strptime
(
"1970-01-01 00:00:00"
,
(
const
char
*
)(
"%Y-%m-%d %H:%M:%S"
),
&
tm
);
(
void
)
strptime
(
"1970-01-01 00:00:00"
,
(
const
char
*
)(
"%Y-%m-%d %H:%M:%S"
),
&
tm
);
m_deltaUtc
=
(
int64_t
)
mktime
(
&
tm
);
//printf("====delta:%lld\n\n", seconds);
//
printf("====delta:%lld\n\n", seconds);
}
static
int64_t
parseFraction
(
char
*
str
,
char
**
end
,
int32_t
timePrec
);
static
int32_t
parseTimeWithTz
(
const
char
*
timestr
,
int64_t
*
time
,
int32_t
timePrec
,
char
delim
);
static
int32_t
parseLocaltime
(
char
*
timestr
,
int64_t
*
time
,
int32_t
timePrec
);
static
int32_t
parseLocaltimeDst
(
char
*
timestr
,
int64_t
*
time
,
int32_t
timePrec
);
static
char
*
forwardToTimeStringEnd
(
char
*
str
);
static
bool
checkTzPresent
(
const
char
*
str
,
int32_t
len
);
static
char
*
forwardToTimeStringEnd
(
char
*
str
);
static
bool
checkTzPresent
(
const
char
*
str
,
int32_t
len
);
static
int32_t
(
*
parseLocaltimeFp
[])
(
char
*
timestr
,
int64_t
*
time
,
int32_t
timePrec
)
=
{
parseLocaltime
,
parseLocaltimeDst
};
static
int32_t
(
*
parseLocaltimeFp
[])(
char
*
timestr
,
int64_t
*
time
,
int32_t
timePrec
)
=
{
parseLocaltime
,
parseLocaltimeDst
};
int32_t
taosParseTime
(
const
char
*
timestr
,
int64_t
*
time
,
int32_t
len
,
int32_t
timePrec
,
int8_t
day_light
)
{
/* parse datatime string in with tz */
...
...
@@ -103,9 +99,9 @@ bool checkTzPresent(const char* str, int32_t len) {
char
*
seg
=
forwardToTimeStringEnd
((
char
*
)
str
);
int32_t
seg_len
=
len
-
(
int32_t
)(
seg
-
str
);
char
*
c
=
&
seg
[
seg_len
-
1
];
for
(
int
i
=
0
;
i
<
seg_len
;
++
i
)
{
if
(
*
c
==
'Z'
||
*
c
==
'z'
||
*
c
==
'+'
||
*
c
==
'-'
)
{
char
*
c
=
&
seg
[
seg_len
-
1
];
for
(
int
32_t
i
=
0
;
i
<
seg_len
;
++
i
)
{
if
(
*
c
==
'Z'
||
*
c
==
'z'
||
*
c
==
'+'
||
*
c
==
'-'
)
{
return
true
;
}
c
--
;
...
...
@@ -199,13 +195,12 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) {
i
+=
2
;
}
//return error if there're illegal charaters after min(2 Digits)
char
*
minStr
=
&
str
[
i
];
//
return error if there're illegal charaters after min(2 Digits)
char
*
minStr
=
&
str
[
i
];
if
(
minStr
[
1
]
!=
'\0'
&&
minStr
[
2
]
!=
'\0'
)
{
return
-
1
;
return
-
1
;
}
int64_t
minute
=
strnatoi
(
&
str
[
i
],
2
);
if
(
minute
>
59
)
{
return
-
1
;
...
...
@@ -233,9 +228,8 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) {
* 2013-04-12T15:52:01.123+0800
*/
int32_t
parseTimeWithTz
(
const
char
*
timestr
,
int64_t
*
time
,
int32_t
timePrec
,
char
delim
)
{
int64_t
factor
=
(
timePrec
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
timePrec
==
TSDB_TIME_PRECISION_MICRO
?
1000000
:
1000000000
);
int64_t
factor
=
(
timePrec
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
timePrec
==
TSDB_TIME_PRECISION_MICRO
?
1000000
:
1000000000
);
int64_t
tzOffset
=
0
;
struct
tm
tm
=
{
0
};
...
...
@@ -255,8 +249,8 @@ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, ch
/* mktime will be affected by TZ, set by using taos_options */
#ifdef WINDOWS
int64_t
seconds
=
user_mktime64
(
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
,
0
);
//
int64_t seconds = gmtime(&tm);
int64_t
seconds
=
user_mktime64
(
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
,
0
);
//
int64_t seconds = gmtime(&tm);
#else
int64_t
seconds
=
timegm
(
&
tm
);
#endif
...
...
@@ -320,8 +314,9 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
#endif
#endif
int64_t
seconds
=
user_mktime64
(
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
,
timezone
);
int64_t
seconds
=
user_mktime64
(
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
,
timezone
);
int64_t
fraction
=
0
;
if
(
*
str
==
'.'
)
{
...
...
@@ -331,8 +326,8 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
}
}
int64_t
factor
=
(
timePrec
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
timePrec
==
TSDB_TIME_PRECISION_MICRO
?
1000000
:
1000000000
);
int64_t
factor
=
(
timePrec
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
timePrec
==
TSDB_TIME_PRECISION_MICRO
?
1000000
:
1000000000
);
*
time
=
factor
*
seconds
+
fraction
;
return
0
;
...
...
@@ -350,7 +345,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
/* mktime will be affected by TZ, set by using taos_options */
int64_t
seconds
=
mktime
(
&
tm
);
int64_t
fraction
=
0
;
if
(
*
str
==
'.'
)
{
...
...
@@ -360,27 +355,22 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
}
}
int64_t
factor
=
(
timePrec
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
timePrec
==
TSDB_TIME_PRECISION_MICRO
?
1000000
:
1000000000
);
int64_t
factor
=
(
timePrec
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
timePrec
==
TSDB_TIME_PRECISION_MICRO
?
1000000
:
1000000000
);
*
time
=
factor
*
seconds
+
fraction
;
return
0
;
}
int64_t
convertTimePrecision
(
int64_t
time
,
int32_t
fromPrecision
,
int32_t
toPrecision
)
{
assert
(
fromPrecision
==
TSDB_TIME_PRECISION_MILLI
||
fromPrecision
==
TSDB_TIME_PRECISION_MICRO
||
assert
(
fromPrecision
==
TSDB_TIME_PRECISION_MILLI
||
fromPrecision
==
TSDB_TIME_PRECISION_MICRO
||
fromPrecision
==
TSDB_TIME_PRECISION_NANO
);
assert
(
toPrecision
==
TSDB_TIME_PRECISION_MILLI
||
toPrecision
==
TSDB_TIME_PRECISION_MICRO
||
assert
(
toPrecision
==
TSDB_TIME_PRECISION_MILLI
||
toPrecision
==
TSDB_TIME_PRECISION_MICRO
||
toPrecision
==
TSDB_TIME_PRECISION_NANO
);
static
double
factors
[
3
][
3
]
=
{
{
1
.,
1000
.,
1000000
.},
{
1
.
0
/
1000
,
1
.,
1000
.},
{
1
.
0
/
1000000
,
1
.
0
/
1000
,
1
.}
};
static
double
factors
[
3
][
3
]
=
{{
1
.,
1000
.,
1000000
.},
{
1
.
0
/
1000
,
1
.,
1000
.},
{
1
.
0
/
1000000
,
1
.
0
/
1000
,
1
.}};
return
(
int64_t
)((
double
)
time
*
factors
[
fromPrecision
][
toPrecision
]);
}
static
int32_t
getDuration
(
int64_t
val
,
char
unit
,
int64_t
*
result
,
int32_t
timePrecision
)
{
switch
(
unit
)
{
case
's'
:
(
*
result
)
=
convertTimePrecision
(
val
*
MILLISECOND_PER_SECOND
,
TSDB_TIME_PRECISION_MILLI
,
timePrecision
);
...
...
@@ -427,7 +417,8 @@ static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t time
* d - Days (24 hours)
* w - Weeks (7 days)
*/
int32_t
parseAbsoluteDuration
(
const
char
*
token
,
int32_t
tokenlen
,
int64_t
*
duration
,
char
*
unit
,
int32_t
timePrecision
)
{
int32_t
parseAbsoluteDuration
(
const
char
*
token
,
int32_t
tokenlen
,
int64_t
*
duration
,
char
*
unit
,
int32_t
timePrecision
)
{
errno
=
0
;
char
*
endPtr
=
NULL
;
...
...
@@ -474,9 +465,9 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
}
struct
tm
tm
;
time_t
tt
=
(
time_t
)(
t
/
TSDB_TICK_PER_SECOND
(
precision
));
time_t
tt
=
(
time_t
)(
t
/
TSDB_TICK_PER_SECOND
(
precision
));
localtime_r
(
&
tt
,
&
tm
);
int
mon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
+
(
in
t
)
duration
;
int
32_t
mon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
+
(
int32_
t
)
duration
;
tm
.
tm_year
=
mon
/
12
;
tm
.
tm_mon
=
mon
%
12
;
...
...
@@ -497,13 +488,13 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char
ekey
/=
(
int64_t
)(
TSDB_TICK_PER_SECOND
(
precision
));
struct
tm
tm
;
time_t
t
=
(
time_t
)
skey
;
time_t
t
=
(
time_t
)
skey
;
localtime_r
(
&
t
,
&
tm
);
int
smon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
;
int
32_t
smon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
;
t
=
(
time_t
)
ekey
;
localtime_r
(
&
t
,
&
tm
);
int
emon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
;
int
32_t
emon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
;
if
(
unit
==
'y'
)
{
interval
*=
12
;
...
...
@@ -522,7 +513,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
if
(
pInterval
->
slidingUnit
==
'n'
||
pInterval
->
slidingUnit
==
'y'
)
{
start
/=
(
int64_t
)(
TSDB_TICK_PER_SECOND
(
precision
));
struct
tm
tm
;
time_t
tt
=
(
time_t
)
start
;
time_t
tt
=
(
time_t
)
start
;
localtime_r
(
&
tt
,
&
tm
);
tm
.
tm_sec
=
0
;
tm
.
tm_min
=
0
;
...
...
@@ -531,10 +522,10 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
if
(
pInterval
->
slidingUnit
==
'y'
)
{
tm
.
tm_mon
=
0
;
tm
.
tm_year
=
(
int
)(
tm
.
tm_year
/
pInterval
->
sliding
*
pInterval
->
sliding
);
tm
.
tm_year
=
(
int
32_t
)(
tm
.
tm_year
/
pInterval
->
sliding
*
pInterval
->
sliding
);
}
else
{
int
mon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
;
mon
=
(
int
)(
mon
/
pInterval
->
sliding
*
pInterval
->
sliding
);
int
32_t
mon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
;
mon
=
(
int
32_t
)(
mon
/
pInterval
->
sliding
*
pInterval
->
sliding
);
tm
.
tm_year
=
mon
/
12
;
tm
.
tm_mon
=
mon
%
12
;
}
...
...
@@ -547,17 +538,17 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
start
=
(
delta
/
pInterval
->
sliding
+
factor
)
*
pInterval
->
sliding
;
if
(
pInterval
->
intervalUnit
==
'd'
||
pInterval
->
intervalUnit
==
'w'
)
{
/*
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*/
/*
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t
timezone
=
_timezone
;
int32_t
daylight
=
_daylight
;
char
**
tzname
=
_tzname
;
#endif
#endif
start
+=
(
int64_t
)(
timezone
*
TSDB_TICK_PER_SECOND
(
precision
));
}
...
...
@@ -568,7 +559,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
if
(
start
<
0
||
INT64_MAX
-
start
>
pInterval
->
interval
-
1
)
{
end
=
start
+
pInterval
->
interval
-
1
;
while
(
end
<
t
&&
((
start
+
pInterval
->
sliding
)
<=
INT64_MAX
))
{
// move forward to the correct time window
while
(
end
<
t
&&
((
start
+
pInterval
->
sliding
)
<=
INT64_MAX
))
{
// move forward to the correct time window
start
+=
pInterval
->
sliding
;
if
(
start
<
0
||
INT64_MAX
-
start
>
pInterval
->
interval
-
1
)
{
...
...
@@ -601,8 +592,8 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
// and the parameter can also be a variable.
const
char
*
fmtts
(
int64_t
ts
)
{
static
char
buf
[
96
];
size_t
pos
=
0
;
struct
tm
tm
;
size_t
pos
=
0
;
struct
tm
tm
;
if
(
ts
>
-
62135625943
&&
ts
<
32503651200
)
{
time_t
t
=
(
time_t
)
ts
;
...
...
@@ -619,7 +610,7 @@ const char* fmtts(int64_t ts) {
buf
[
pos
++
]
=
' '
;
}
pos
+=
strftime
(
buf
+
pos
,
sizeof
(
buf
),
"ms=%Y-%m-%d %H:%M:%S"
,
&
tm
);
pos
+=
sprintf
(
buf
+
pos
,
".%03d"
,
(
int
)(
ts
%
1000
));
pos
+=
sprintf
(
buf
+
pos
,
".%03d"
,
(
int
32_t
)(
ts
%
1000
));
}
{
...
...
@@ -631,7 +622,7 @@ const char* fmtts(int64_t ts) {
buf
[
pos
++
]
=
' '
;
}
pos
+=
strftime
(
buf
+
pos
,
sizeof
(
buf
),
"us=%Y-%m-%d %H:%M:%S"
,
&
tm
);
pos
+=
sprintf
(
buf
+
pos
,
".%06d"
,
(
int
)(
ts
%
1000000
));
pos
+=
sprintf
(
buf
+
pos
,
".%06d"
,
(
int
32_t
)(
ts
%
1000000
));
}
return
buf
;
...
...
source/common/src/ttszip.c
浏览文件 @
fb6fa6ad
...
...
@@ -19,7 +19,7 @@
#include "tcompression.h"
static
int32_t
getDataStartOffset
();
static
void
TSBufUpdateGroupInfo
(
STSBuf
*
pTSBuf
,
int32_t
index
,
STSGroupBlockInfo
*
pBlockInfo
);
static
void
TSBufUpdateGroupInfo
(
STSBuf
*
pTSBuf
,
int32_t
index
,
STSGroupBlockInfo
*
pBlockInfo
);
static
STSBuf
*
allocResForTSBuf
(
STSBuf
*
pTSBuf
);
static
int32_t
STSBufUpdateHeader
(
STSBuf
*
pTSBuf
,
STSBufFileHeader
*
pHeader
);
...
...
@@ -36,7 +36,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
}
pTSBuf
->
autoDelete
=
autoDelete
;
taosGetTmpfilePath
(
tsTempDir
,
"join"
,
pTSBuf
->
path
);
// pTSBuf->pFile = fopen(pTSBuf->path, "wb+");
pTSBuf
->
pFile
=
taosOpenFile
(
pTSBuf
->
path
,
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_TRUNC
);
...
...
@@ -48,20 +48,20 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
if
(
!
autoDelete
)
{
remove
(
pTSBuf
->
path
);
}
if
(
NULL
==
allocResForTSBuf
(
pTSBuf
))
{
return
NULL
;
}
// update the header info
STSBufFileHeader
header
=
{.
magic
=
TS_COMP_FILE_MAGIC
,
.
numOfGroup
=
pTSBuf
->
numOfGroups
,
.
tsOrder
=
TSDB_ORDER_ASC
};
STSBufUpdateHeader
(
pTSBuf
,
&
header
);
tsBufResetPos
(
pTSBuf
);
pTSBuf
->
cur
.
order
=
TSDB_ORDER_ASC
;
pTSBuf
->
tsOrder
=
order
;
return
pTSBuf
;
}
...
...
@@ -72,23 +72,23 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
}
pTSBuf
->
autoDelete
=
autoDelete
;
tstrncpy
(
pTSBuf
->
path
,
path
,
sizeof
(
pTSBuf
->
path
));
// pTSBuf->pFile = fopen(pTSBuf->path, "rb+");
pTSBuf
->
pFile
=
taosOpenFile
(
pTSBuf
->
path
,
TD_FILE_WRITE
|
TD_FILE_READ
);
if
(
pTSBuf
->
pFile
==
NULL
)
{
free
(
pTSBuf
);
return
NULL
;
}
if
(
allocResForTSBuf
(
pTSBuf
)
==
NULL
)
{
return
NULL
;
}
// validate the file magic number
STSBufFileHeader
header
=
{
0
};
int32_t
ret
=
taosLSeekFile
(
pTSBuf
->
pFile
,
0
,
SEEK_SET
);
int32_t
ret
=
taosLSeekFile
(
pTSBuf
->
pFile
,
0
,
SEEK_SET
);
UNUSED
(
ret
);
size_t
sz
=
taosReadFile
(
pTSBuf
->
pFile
,
&
header
,
sizeof
(
STSBufFileHeader
));
UNUSED
(
sz
);
...
...
@@ -98,7 +98,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
if
(
header
.
numOfGroup
>
pTSBuf
->
numOfAlloc
)
{
pTSBuf
->
numOfAlloc
=
header
.
numOfGroup
;
STSGroupBlockInfoEx
*
tmp
=
realloc
(
pTSBuf
->
pData
,
sizeof
(
STSGroupBlockInfoEx
)
*
pTSBuf
->
numOfAlloc
);
...
...
@@ -106,57 +106,58 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
pTSBuf
->
pData
=
tmp
;
}
pTSBuf
->
numOfGroups
=
header
.
numOfGroup
;
// check the ts order
pTSBuf
->
tsOrder
=
header
.
tsOrder
;
if
(
pTSBuf
->
tsOrder
!=
TSDB_ORDER_ASC
&&
pTSBuf
->
tsOrder
!=
TSDB_ORDER_DESC
)
{
// tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
// tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
size_t
infoSize
=
sizeof
(
STSGroupBlockInfo
)
*
pTSBuf
->
numOfGroups
;
STSGroupBlockInfo
*
buf
=
(
STSGroupBlockInfo
*
)
calloc
(
1
,
infoSize
);
if
(
buf
==
NULL
)
{
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
//int64_t pos = ftell(pTSBuf->pFile); //pos not used
return
NULL
;
}
//
int64_t pos = ftell(pTSBuf->pFile); //pos not used
sz
=
taosReadFile
(
pTSBuf
->
pFile
,
buf
,
infoSize
);
UNUSED
(
sz
);
// the length value for each vnode is not kept in file, so does not set the length value
for
(
int32_t
i
=
0
;
i
<
pTSBuf
->
numOfGroups
;
++
i
)
{
STSGroupBlockInfoEx
*
pBlockList
=
&
pTSBuf
->
pData
[
i
];
memcpy
(
&
pBlockList
->
info
,
&
buf
[
i
],
sizeof
(
STSGroupBlockInfo
));
}
free
(
buf
);
ret
=
taosLSeekFile
(
pTSBuf
->
pFile
,
0
,
SEEK_END
);
UNUSED
(
ret
);
int64_t
file_size
;
if
(
taosFStatFile
(
pTSBuf
->
pFile
,
&
file_size
,
NULL
)
!=
0
)
{
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
pTSBuf
->
fileSize
=
(
uint32_t
)
file_size
;
tsBufResetPos
(
pTSBuf
);
// ascending by default
pTSBuf
->
cur
.
order
=
TSDB_ORDER_ASC
;
// tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->pFile),
// pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);
// tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path,
// fileno(pTSBuf->pFile),
// pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete);
return
pTSBuf
;
}
...
...
@@ -164,22 +165,22 @@ void* tsBufDestroy(STSBuf* pTSBuf) {
if
(
pTSBuf
==
NULL
)
{
return
NULL
;
}
tfree
(
pTSBuf
->
assistBuf
);
tfree
(
pTSBuf
->
tsData
.
rawBuf
);
tfree
(
pTSBuf
->
pData
);
tfree
(
pTSBuf
->
block
.
payload
);
if
(
!
pTSBuf
->
remainOpen
)
{
taosCloseFile
(
&
pTSBuf
->
pFile
);
}
if
(
pTSBuf
->
autoDelete
)
{
// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
remove
(
pTSBuf
->
path
);
}
else
{
// tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path);
// tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path);
}
taosVariantDestroy
(
&
pTSBuf
->
block
.
tag
);
...
...
@@ -189,7 +190,7 @@ void* tsBufDestroy(STSBuf* pTSBuf) {
static
STSGroupBlockInfoEx
*
tsBufGetLastGroupInfo
(
STSBuf
*
pTSBuf
)
{
int32_t
last
=
pTSBuf
->
numOfGroups
-
1
;
assert
(
last
>=
0
);
return
&
pTSBuf
->
pData
[
last
];
}
...
...
@@ -198,40 +199,40 @@ static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) {
if
(
pTSBuf
->
numOfAlloc
<=
pTSBuf
->
numOfGroups
)
{
uint32_t
newSize
=
(
uint32_t
)(
pTSBuf
->
numOfAlloc
*
1
.
5
);
assert
((
int32_t
)
newSize
>
pTSBuf
->
numOfAlloc
);
STSGroupBlockInfoEx
*
tmp
=
(
STSGroupBlockInfoEx
*
)
realloc
(
pTSBuf
->
pData
,
sizeof
(
STSGroupBlockInfoEx
)
*
newSize
);
if
(
tmp
==
NULL
)
{
return
NULL
;
}
pTSBuf
->
pData
=
tmp
;
pTSBuf
->
numOfAlloc
=
newSize
;
memset
(
&
pTSBuf
->
pData
[
pTSBuf
->
numOfGroups
],
0
,
sizeof
(
STSGroupBlockInfoEx
)
*
(
newSize
-
pTSBuf
->
numOfGroups
));
}
if
(
pTSBuf
->
numOfGroups
>
0
)
{
STSGroupBlockInfoEx
*
pPrevBlockInfoEx
=
tsBufGetLastGroupInfo
(
pTSBuf
);
// update prev vnode length info in file
TSBufUpdateGroupInfo
(
pTSBuf
,
pTSBuf
->
numOfGroups
-
1
,
&
pPrevBlockInfoEx
->
info
);
}
// set initial value for vnode block
STSGroupBlockInfo
*
pBlockInfo
=
&
pTSBuf
->
pData
[
pTSBuf
->
numOfGroups
].
info
;
pBlockInfo
->
id
=
id
;
pBlockInfo
->
offset
=
pTSBuf
->
fileSize
;
assert
(
pBlockInfo
->
offset
>=
getDataStartOffset
());
// update vnode info in file
TSBufUpdateGroupInfo
(
pTSBuf
,
pTSBuf
->
numOfGroups
,
pBlockInfo
);
// add one vnode info
pTSBuf
->
numOfGroups
+=
1
;
// update the header info
STSBufFileHeader
header
=
{
.
magic
=
TS_COMP_FILE_MAGIC
,
.
numOfGroup
=
pTSBuf
->
numOfGroups
,
.
tsOrder
=
pTSBuf
->
tsOrder
};
STSBufUpdateHeader
(
pTSBuf
,
&
header
);
return
tsBufGetLastGroupInfo
(
pTSBuf
);
}
...
...
@@ -240,7 +241,7 @@ static void shrinkBuffer(STSList* ptsData) {
// shrink tmp buffer size if it consumes too many memory compared to the pre-defined size
if
(
ptsData
->
allocSize
>=
ptsData
->
threshold
*
2
)
{
char
*
rawBuf
=
realloc
(
ptsData
->
rawBuf
,
MEM_BUF_SIZE
);
if
(
rawBuf
)
{
if
(
rawBuf
)
{
ptsData
->
rawBuf
=
rawBuf
;
ptsData
->
allocSize
=
MEM_BUF_SIZE
;
}
...
...
@@ -260,18 +261,17 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
if
(
pTSBuf
->
tsData
.
len
==
0
)
{
return
;
}
STSBlock
*
pBlock
=
&
pTSBuf
->
block
;
STSList
*
pTsData
=
&
pTSBuf
->
tsData
;
pBlock
->
numOfElem
=
pTsData
->
len
/
TSDB_KEYSIZE
;
pBlock
->
compLen
=
tsCompressTimestamp
(
pTsData
->
rawBuf
,
pTsData
->
len
,
pTsData
->
len
/
TSDB_KEYSIZE
,
pBlock
->
payload
,
pTsData
->
allocSize
,
TWO_STAGE_COMP
,
pTSBuf
->
assistBuf
,
pTSBuf
->
bufSize
);
pBlock
->
compLen
=
tsCompressTimestamp
(
pTsData
->
rawBuf
,
pTsData
->
len
,
pTsData
->
len
/
TSDB_KEYSIZE
,
pBlock
->
payload
,
pTsData
->
allocSize
,
TWO_STAGE_COMP
,
pTSBuf
->
assistBuf
,
pTSBuf
->
bufSize
);
int64_t
r
=
taosLSeekFile
(
pTSBuf
->
pFile
,
pTSBuf
->
fileSize
,
SEEK_SET
);
assert
(
r
==
0
);
/*
* format for output data:
* 1. tags, number of ts, size after compressed, payload, size after compressed
...
...
@@ -289,10 +289,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
}
else
if
(
pBlock
->
tag
.
nType
==
TSDB_DATA_TYPE_FLOAT
)
{
metaLen
+=
(
int32_t
)
taosWriteFile
(
pTSBuf
->
pFile
,
&
pBlock
->
tag
.
nLen
,
sizeof
(
pBlock
->
tag
.
nLen
));
float
tfloat
=
(
float
)
pBlock
->
tag
.
d
;
metaLen
+=
(
int32_t
)
taosWriteFile
(
pTSBuf
->
pFile
,
&
tfloat
,
(
size_t
)
pBlock
->
tag
.
nLen
);
metaLen
+=
(
int32_t
)
taosWriteFile
(
pTSBuf
->
pFile
,
&
tfloat
,
(
size_t
)
pBlock
->
tag
.
nLen
);
}
else
if
(
pBlock
->
tag
.
nType
!=
TSDB_DATA_TYPE_NULL
)
{
metaLen
+=
(
int32_t
)
taosWriteFile
(
pTSBuf
->
pFile
,
&
pBlock
->
tag
.
nLen
,
sizeof
(
pBlock
->
tag
.
nLen
));
metaLen
+=
(
int32_t
)
taosWriteFile
(
pTSBuf
->
pFile
,
&
pBlock
->
tag
.
i
,
(
size_t
)
pBlock
->
tag
.
nLen
);
metaLen
+=
(
int32_t
)
taosWriteFile
(
pTSBuf
->
pFile
,
&
pBlock
->
tag
.
i
,
(
size_t
)
pBlock
->
tag
.
nLen
);
}
else
{
trueLen
=
0
;
metaLen
+=
(
int32_t
)
taosWriteFile
(
pTSBuf
->
pFile
,
&
trueLen
,
sizeof
(
pBlock
->
tag
.
nLen
));
...
...
@@ -303,19 +303,19 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
taosWriteFile
(
pTSBuf
->
pFile
,
pBlock
->
payload
,
(
size_t
)
pBlock
->
compLen
);
taosWriteFile
(
pTSBuf
->
pFile
,
&
pBlock
->
compLen
,
sizeof
(
pBlock
->
compLen
));
metaLen
+=
(
int32_t
)
taosWriteFile
(
pTSBuf
->
pFile
,
&
trueLen
,
sizeof
(
pBlock
->
tag
.
nLen
));
metaLen
+=
(
int32_t
)
taosWriteFile
(
pTSBuf
->
pFile
,
&
trueLen
,
sizeof
(
pBlock
->
tag
.
nLen
));
assert
(
metaLen
==
getTagAreaLength
(
&
pBlock
->
tag
));
int32_t
blockSize
=
metaLen
+
sizeof
(
pBlock
->
numOfElem
)
+
sizeof
(
pBlock
->
compLen
)
*
2
+
pBlock
->
compLen
;
pTSBuf
->
fileSize
+=
blockSize
;
pTSBuf
->
tsData
.
len
=
0
;
STSGroupBlockInfoEx
*
pGroupBlockInfoEx
=
tsBufGetLastGroupInfo
(
pTSBuf
);
pGroupBlockInfoEx
->
info
.
compLen
+=
blockSize
;
pGroupBlockInfoEx
->
info
.
numOfBlocks
+=
1
;
shrinkBuffer
(
&
pTSBuf
->
tsData
);
}
...
...
@@ -326,7 +326,7 @@ static void expandBuffer(STSList* ptsData, int32_t inputSize) {
if
(
tmp
==
NULL
)
{
// todo
}
ptsData
->
rawBuf
=
tmp
;
ptsData
->
allocSize
=
newSize
;
}
...
...
@@ -336,8 +336,8 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
STSBlock
*
pBlock
=
&
pTSBuf
->
block
;
// clear the memory buffer
pBlock
->
compLen
=
0
;
pBlock
->
padding
=
0
;
pBlock
->
compLen
=
0
;
pBlock
->
padding
=
0
;
pBlock
->
numOfElem
=
0
;
int32_t
offset
=
-
1
;
...
...
@@ -347,11 +347,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
* set the right position for the reversed traverse, the reversed traverse is started from
* the end of each comp data block
*/
int32_t
prev
=
-
(
int32_t
)
(
sizeof
(
pBlock
->
padding
)
+
sizeof
(
pBlock
->
tag
.
nLen
));
int32_t
prev
=
-
(
int32_t
)(
sizeof
(
pBlock
->
padding
)
+
sizeof
(
pBlock
->
tag
.
nLen
));
int32_t
ret
=
taosLSeekFile
(
pTSBuf
->
pFile
,
prev
,
SEEK_CUR
);
size_t
sz
=
taosReadFile
(
pTSBuf
->
pFile
,
&
pBlock
->
padding
,
sizeof
(
pBlock
->
padding
));
size_t
sz
=
taosReadFile
(
pTSBuf
->
pFile
,
&
pBlock
->
padding
,
sizeof
(
pBlock
->
padding
));
sz
=
taosReadFile
(
pTSBuf
->
pFile
,
&
pBlock
->
tag
.
nLen
,
sizeof
(
pBlock
->
tag
.
nLen
));
UNUSED
(
sz
);
UNUSED
(
sz
);
pBlock
->
compLen
=
pBlock
->
padding
;
...
...
@@ -376,11 +376,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
UNUSED
(
sz
);
}
else
if
(
pBlock
->
tag
.
nType
==
TSDB_DATA_TYPE_FLOAT
)
{
float
tfloat
=
0
;
sz
=
taosReadFile
(
pTSBuf
->
pFile
,
&
tfloat
,
(
size_t
)
pBlock
->
tag
.
nLen
);
sz
=
taosReadFile
(
pTSBuf
->
pFile
,
&
tfloat
,
(
size_t
)
pBlock
->
tag
.
nLen
);
pBlock
->
tag
.
d
=
(
double
)
tfloat
;
UNUSED
(
sz
);
}
else
if
(
pBlock
->
tag
.
nType
!=
TSDB_DATA_TYPE_NULL
)
{
//
TODO check the return value
sz
=
taosReadFile
(
pTSBuf
->
pFile
,
&
pBlock
->
tag
.
i
,
(
size_t
)
pBlock
->
tag
.
nLen
);
}
else
if
(
pBlock
->
tag
.
nType
!=
TSDB_DATA_TYPE_NULL
)
{
//
TODO check the return value
sz
=
taosReadFile
(
pTSBuf
->
pFile
,
&
pBlock
->
tag
.
i
,
(
size_t
)
pBlock
->
tag
.
nLen
);
UNUSED
(
sz
);
}
...
...
@@ -395,7 +395,7 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
tsDecompressTimestamp
(
pBlock
->
payload
,
pBlock
->
compLen
,
pBlock
->
numOfElem
,
pTSBuf
->
tsData
.
rawBuf
,
pTSBuf
->
tsData
.
allocSize
,
TWO_STAGE_COMP
,
pTSBuf
->
assistBuf
,
pTSBuf
->
bufSize
);
}
// read the comp length at the length of comp block
sz
=
taosReadFile
(
pTSBuf
->
pFile
,
&
pBlock
->
padding
,
sizeof
(
pBlock
->
padding
));
assert
(
pBlock
->
padding
==
pBlock
->
compLen
);
...
...
@@ -409,24 +409,24 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
}
UNUSED
(
sz
);
// for backwards traverse, set the start position at the end of previous block
if
(
order
==
TSDB_ORDER_DESC
)
{
int32_t
r
=
taosLSeekFile
(
pTSBuf
->
pFile
,
-
offset
,
SEEK_CUR
);
UNUSED
(
r
);
}
return
pBlock
;
}
// set the order of ts buffer if the ts order has not been set yet
static
int32_t
setCheckTSOrder
(
STSBuf
*
pTSBuf
,
const
char
*
pData
,
int32_t
len
)
{
STSList
*
ptsData
=
&
pTSBuf
->
tsData
;
if
(
pTSBuf
->
tsOrder
==
-
1
)
{
if
(
ptsData
->
len
>
0
)
{
TSKEY
lastKey
=
*
(
TSKEY
*
)(
ptsData
->
rawBuf
+
ptsData
->
len
-
TSDB_KEYSIZE
);
if
(
lastKey
>
*
(
TSKEY
*
)
pData
)
{
pTSBuf
->
tsOrder
=
TSDB_ORDER_DESC
;
}
else
{
...
...
@@ -436,7 +436,7 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
// no data in current vnode, more than one ts is added, check the orders
TSKEY
k1
=
*
(
TSKEY
*
)(
pData
);
TSKEY
k2
=
*
(
TSKEY
*
)(
pData
+
TSDB_KEYSIZE
);
if
(
k1
<
k2
)
{
pTSBuf
->
tsOrder
=
TSDB_ORDER_ASC
;
}
else
if
(
k1
>
k2
)
{
...
...
@@ -448,23 +448,23 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
}
else
{
// todo the timestamp order is set, check the asc/desc order of appended data
}
return
TSDB_CODE_SUCCESS
;
}
void
tsBufAppend
(
STSBuf
*
pTSBuf
,
int32_t
id
,
SVariant
*
tag
,
const
char
*
pData
,
int32_t
len
)
{
STSGroupBlockInfoEx
*
pBlockInfo
=
NULL
;
STSList
*
ptsData
=
&
pTSBuf
->
tsData
;
if
(
pTSBuf
->
numOfGroups
==
0
||
tsBufGetLastGroupInfo
(
pTSBuf
)
->
info
.
id
!=
id
)
{
writeDataToDisk
(
pTSBuf
);
shrinkBuffer
(
ptsData
);
pBlockInfo
=
addOneGroupInfo
(
pTSBuf
,
id
);
}
else
{
pBlockInfo
=
tsBufGetLastGroupInfo
(
pTSBuf
);
}
assert
(
pBlockInfo
->
info
.
id
==
id
);
if
((
taosVariantCompare
(
&
pTSBuf
->
block
.
tag
,
tag
)
!=
0
)
&&
ptsData
->
len
>
0
)
{
...
...
@@ -476,22 +476,22 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, i
taosVariantAssign
(
&
pTSBuf
->
block
.
tag
,
tag
);
memcpy
(
ptsData
->
rawBuf
+
ptsData
->
len
,
pData
,
(
size_t
)
len
);
// todo check return value
setCheckTSOrder
(
pTSBuf
,
pData
,
len
);
ptsData
->
len
+=
len
;
pBlockInfo
->
len
+=
len
;
pTSBuf
->
numOfTotal
+=
len
/
TSDB_KEYSIZE
;
// the size of raw data exceeds the size of the default prepared buffer, so
// during getBufBlock, the output buffer needs to be large enough.
if
(
ptsData
->
len
>=
ptsData
->
threshold
)
{
writeDataToDisk
(
pTSBuf
);
shrinkBuffer
(
ptsData
);
}
tsBufResetPos
(
pTSBuf
);
}
...
...
@@ -499,15 +499,15 @@ void tsBufFlush(STSBuf* pTSBuf) {
if
(
pTSBuf
->
tsData
.
len
<=
0
)
{
return
;
}
writeDataToDisk
(
pTSBuf
);
shrinkBuffer
(
&
pTSBuf
->
tsData
);
STSGroupBlockInfoEx
*
pBlockInfoEx
=
tsBufGetLastGroupInfo
(
pTSBuf
);
// update prev vnode length info in file
TSBufUpdateGroupInfo
(
pTSBuf
,
pTSBuf
->
numOfGroups
-
1
,
&
pBlockInfoEx
->
info
);
// save the ts order into header
STSBufFileHeader
header
=
{
.
magic
=
TS_COMP_FILE_MAGIC
,
.
numOfGroup
=
pTSBuf
->
numOfGroups
,
.
tsOrder
=
pTSBuf
->
tsOrder
};
...
...
@@ -522,7 +522,7 @@ static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t num
break
;
}
}
return
j
;
}
...
...
@@ -531,17 +531,17 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int
if
(
taosLSeekFile
(
pTSBuf
->
pFile
,
pBlockInfo
->
offset
,
SEEK_SET
)
!=
0
)
{
return
-
1
;
}
// sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode
int32_t
i
=
0
;
bool
decomp
=
false
;
while
((
i
++
)
<=
blockIndex
)
{
if
(
readDataFromDisk
(
pTSBuf
,
TSDB_ORDER_ASC
,
decomp
)
==
NULL
)
{
return
-
1
;
}
}
// set the file position to be the end of previous comp block
if
(
pTSBuf
->
cur
.
order
==
TSDB_ORDER_DESC
)
{
STSBlock
*
pBlock
=
&
pTSBuf
->
block
;
...
...
@@ -550,34 +550,34 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int
int32_t
ret
=
taosLSeekFile
(
pTSBuf
->
pFile
,
-
compBlockSize
,
SEEK_CUR
);
UNUSED
(
ret
);
}
return
0
;
}
static
int32_t
tsBufFindBlockByTag
(
STSBuf
*
pTSBuf
,
STSGroupBlockInfo
*
pBlockInfo
,
SVariant
*
tag
)
{
bool
decomp
=
false
;
int64_t
offset
=
0
;
if
(
pTSBuf
->
cur
.
order
==
TSDB_ORDER_ASC
)
{
offset
=
pBlockInfo
->
offset
;
}
else
{
// reversed traverse starts from the end of block
offset
=
pBlockInfo
->
offset
+
pBlockInfo
->
compLen
;
}
if
(
taosLSeekFile
(
pTSBuf
->
pFile
,
(
int32_t
)
offset
,
SEEK_SET
)
!=
0
)
{
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pBlockInfo
->
numOfBlocks
;
++
i
)
{
if
(
readDataFromDisk
(
pTSBuf
,
pTSBuf
->
cur
.
order
,
decomp
)
==
NULL
)
{
return
-
1
;
}
if
(
taosVariantCompare
(
&
pTSBuf
->
block
.
tag
,
tag
)
==
0
)
{
return
(
pTSBuf
->
cur
.
order
==
TSDB_ORDER_ASC
)
?
i
:
(
pBlockInfo
->
numOfBlocks
-
(
i
+
1
));
return
(
pTSBuf
->
cur
.
order
==
TSDB_ORDER_ASC
)
?
i
:
(
pBlockInfo
->
numOfBlocks
-
(
i
+
1
));
}
}
return
-
1
;
}
...
...
@@ -586,14 +586,14 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex
if
(
pBlockInfo
->
numOfBlocks
<=
blockIndex
)
{
assert
(
false
);
}
STSCursor
*
pCur
=
&
pTSBuf
->
cur
;
if
(
pCur
->
vgroupIndex
==
groupIndex
&&
((
pCur
->
blockIndex
<=
blockIndex
&&
pCur
->
order
==
TSDB_ORDER_ASC
)
||
(
pCur
->
blockIndex
>=
blockIndex
&&
pCur
->
order
==
TSDB_ORDER_DESC
)))
{
(
pCur
->
blockIndex
>=
blockIndex
&&
pCur
->
order
==
TSDB_ORDER_DESC
)))
{
int32_t
i
=
0
;
bool
decomp
=
false
;
int32_t
step
=
abs
(
blockIndex
-
pCur
->
blockIndex
);
while
((
++
i
)
<=
step
)
{
if
(
readDataFromDisk
(
pTSBuf
,
pCur
->
order
,
decomp
)
==
NULL
)
{
return
;
...
...
@@ -604,11 +604,11 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex
assert
(
false
);
}
}
STSBlock
*
pBlock
=
&
pTSBuf
->
block
;
size_t
s
=
pBlock
->
numOfElem
*
TSDB_KEYSIZE
;
/*
* In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
* may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
...
...
@@ -616,16 +616,16 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex
if
(
s
>
pTSBuf
->
tsData
.
allocSize
)
{
expandBuffer
(
&
pTSBuf
->
tsData
,
(
int32_t
)
s
);
}
pTSBuf
->
tsData
.
len
=
tsDecompressTimestamp
(
pBlock
->
payload
,
pBlock
->
compLen
,
pBlock
->
numOfElem
,
pTSBuf
->
tsData
.
rawBuf
,
pTSBuf
->
tsData
.
allocSize
,
TWO_STAGE_COMP
,
pTSBuf
->
assistBuf
,
pTSBuf
->
bufSize
);
assert
((
pTSBuf
->
tsData
.
len
/
TSDB_KEYSIZE
==
pBlock
->
numOfElem
)
&&
(
pTSBuf
->
tsData
.
allocSize
>=
pTSBuf
->
tsData
.
len
));
pCur
->
vgroupIndex
=
groupIndex
;
pCur
->
blockIndex
=
blockIndex
;
pCur
->
tsIndex
=
(
pCur
->
order
==
TSDB_ORDER_ASC
)
?
0
:
pBlock
->
numOfElem
-
1
;
}
...
...
@@ -647,7 +647,7 @@ STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) {
if
(
j
==
-
1
)
{
return
NULL
;
}
return
&
pTSBuf
->
pData
[
j
].
info
;
}
...
...
@@ -660,13 +660,14 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
int32_t
r
=
taosLSeekFile
(
pTSBuf
->
pFile
,
0
,
SEEK_SET
);
if
(
r
!=
0
)
{
// qError("fseek failed, errno:%d", errno);
// qError("fseek failed, errno:%d", errno);
return
-
1
;
}
size_t
ws
=
taosWriteFile
(
pTSBuf
->
pFile
,
pHeader
,
sizeof
(
STSBufFileHeader
));
if
(
ws
!=
1
)
{
// qError("ts update header fwrite failed, size:%d, expected size:%d", (int32_t)ws, (int32_t)sizeof(STSBufFileHeader));
if
(
ws
!=
1
)
{
// qError("ts update header fwrite failed, size:%d, expected size:%d", (int32_t)ws,
// (int32_t)sizeof(STSBufFileHeader));
return
-
1
;
}
return
0
;
...
...
@@ -676,33 +677,33 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
if
(
pTSBuf
==
NULL
||
pTSBuf
->
numOfGroups
==
0
)
{
return
false
;
}
STSCursor
*
pCur
=
&
pTSBuf
->
cur
;
// get the first/last position according to traverse order
if
(
pCur
->
vgroupIndex
==
-
1
)
{
if
(
pCur
->
order
==
TSDB_ORDER_ASC
)
{
tsBufGetBlock
(
pTSBuf
,
0
,
0
);
if
(
pTSBuf
->
block
.
numOfElem
==
0
)
{
// the whole list is empty, return
tsBufResetPos
(
pTSBuf
);
return
false
;
}
else
{
return
true
;
}
}
else
{
// get the last timestamp record in the last block of the last vnode
assert
(
pTSBuf
->
numOfGroups
>
0
);
int32_t
groupIndex
=
pTSBuf
->
numOfGroups
-
1
;
pCur
->
vgroupIndex
=
groupIndex
;
int32_t
id
=
pTSBuf
->
pData
[
pCur
->
vgroupIndex
].
info
.
id
;
STSGroupBlockInfo
*
pBlockInfo
=
tsBufGetGroupBlockInfo
(
pTSBuf
,
id
);
int32_t
blockIndex
=
pBlockInfo
->
numOfBlocks
-
1
;
tsBufGetBlock
(
pTSBuf
,
groupIndex
,
blockIndex
);
pCur
->
tsIndex
=
pTSBuf
->
block
.
numOfElem
-
1
;
if
(
pTSBuf
->
block
.
numOfElem
==
0
)
{
tsBufResetPos
(
pTSBuf
);
...
...
@@ -712,16 +713,16 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
}
}
}
int32_t
step
=
pCur
->
order
==
TSDB_ORDER_ASC
?
1
:
-
1
;
while
(
1
)
{
assert
(
pTSBuf
->
tsData
.
len
==
pTSBuf
->
block
.
numOfElem
*
TSDB_KEYSIZE
);
if
((
pCur
->
order
==
TSDB_ORDER_ASC
&&
pCur
->
tsIndex
>=
pTSBuf
->
block
.
numOfElem
-
1
)
||
(
pCur
->
order
==
TSDB_ORDER_DESC
&&
pCur
->
tsIndex
<=
0
))
{
int32_t
id
=
pTSBuf
->
pData
[
pCur
->
vgroupIndex
].
info
.
id
;
STSGroupBlockInfo
*
pBlockInfo
=
tsBufGetGroupBlockInfo
(
pTSBuf
,
id
);
if
(
pBlockInfo
==
NULL
||
(
pCur
->
blockIndex
>=
pBlockInfo
->
numOfBlocks
-
1
&&
pCur
->
order
==
TSDB_ORDER_ASC
)
||
(
pCur
->
blockIndex
<=
0
&&
pCur
->
order
==
TSDB_ORDER_DESC
))
{
...
...
@@ -730,15 +731,15 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
pCur
->
vgroupIndex
=
-
1
;
return
false
;
}
if
(
pBlockInfo
==
NULL
)
{
return
false
;
}
int32_t
blockIndex
=
(
pCur
->
order
==
TSDB_ORDER_ASC
)
?
0
:
(
pBlockInfo
->
numOfBlocks
-
1
);
tsBufGetBlock
(
pTSBuf
,
pCur
->
vgroupIndex
+
step
,
blockIndex
);
break
;
}
else
{
tsBufGetBlock
(
pTSBuf
,
pCur
->
vgroupIndex
,
pCur
->
blockIndex
+
step
);
break
;
...
...
@@ -748,7 +749,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
break
;
}
}
return
true
;
}
...
...
@@ -756,7 +757,7 @@ void tsBufResetPos(STSBuf* pTSBuf) {
if
(
pTSBuf
==
NULL
)
{
return
;
}
pTSBuf
->
cur
=
(
STSCursor
){.
tsIndex
=
-
1
,
.
blockIndex
=
-
1
,
.
vgroupIndex
=
-
1
,
.
order
=
pTSBuf
->
cur
.
order
};
}
...
...
@@ -765,14 +766,14 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
if
(
pTSBuf
==
NULL
)
{
return
elem1
;
}
STSCursor
*
pCur
=
&
pTSBuf
->
cur
;
if
(
pCur
!=
NULL
&&
pCur
->
vgroupIndex
<
0
)
{
return
elem1
;
}
STSBlock
*
pBlock
=
&
pTSBuf
->
block
;
elem1
.
id
=
pTSBuf
->
pData
[
pCur
->
vgroupIndex
].
info
.
id
;
elem1
.
ts
=
*
(
TSKEY
*
)(
pTSBuf
->
tsData
.
rawBuf
+
pCur
->
tsIndex
*
TSDB_KEYSIZE
);
elem1
.
tag
=
&
pBlock
->
tag
;
...
...
@@ -791,65 +792,65 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
if
(
pDestBuf
==
NULL
||
pSrcBuf
==
NULL
||
pSrcBuf
->
numOfGroups
<=
0
)
{
return
0
;
}
if
(
pDestBuf
->
numOfGroups
+
pSrcBuf
->
numOfGroups
>
TS_COMP_FILE_GROUP_MAX
)
{
return
-
1
;
}
// src can only have one vnode index
assert
(
pSrcBuf
->
numOfGroups
==
1
);
// there are data in buffer, flush to disk first
tsBufFlush
(
pDestBuf
);
// compared with the last vnode id
int32_t
id
=
tsBufGetLastGroupInfo
((
STSBuf
*
)
pSrcBuf
)
->
info
.
id
;
int32_t
id
=
tsBufGetLastGroupInfo
((
STSBuf
*
)
pSrcBuf
)
->
info
.
id
;
if
(
id
!=
tsBufGetLastGroupInfo
(
pDestBuf
)
->
info
.
id
)
{
int32_t
oldSize
=
pDestBuf
->
numOfGroups
;
int32_t
newSize
=
oldSize
+
pSrcBuf
->
numOfGroups
;
if
(
pDestBuf
->
numOfAlloc
<
newSize
)
{
pDestBuf
->
numOfAlloc
=
newSize
;
STSGroupBlockInfoEx
*
tmp
=
realloc
(
pDestBuf
->
pData
,
sizeof
(
STSGroupBlockInfoEx
)
*
newSize
);
if
(
tmp
==
NULL
)
{
return
-
1
;
}
pDestBuf
->
pData
=
tmp
;
}
// directly copy the vnode index information
memcpy
(
&
pDestBuf
->
pData
[
oldSize
],
pSrcBuf
->
pData
,
(
size_t
)
pSrcBuf
->
numOfGroups
*
sizeof
(
STSGroupBlockInfoEx
));
// set the new offset value
for
(
int32_t
i
=
0
;
i
<
pSrcBuf
->
numOfGroups
;
++
i
)
{
STSGroupBlockInfoEx
*
pBlockInfoEx
=
&
pDestBuf
->
pData
[
i
+
oldSize
];
pBlockInfoEx
->
info
.
offset
=
(
pSrcBuf
->
pData
[
i
].
info
.
offset
-
getDataStartOffset
())
+
pDestBuf
->
fileSize
;
pBlockInfoEx
->
info
.
id
=
id
;
}
pDestBuf
->
numOfGroups
=
newSize
;
}
else
{
STSGroupBlockInfoEx
*
pBlockInfoEx
=
tsBufGetLastGroupInfo
(
pDestBuf
);
pBlockInfoEx
->
len
+=
pSrcBuf
->
pData
[
0
].
len
;
pBlockInfoEx
->
info
.
numOfBlocks
+=
pSrcBuf
->
pData
[
0
].
info
.
numOfBlocks
;
pBlockInfoEx
->
info
.
compLen
+=
pSrcBuf
->
pData
[
0
].
info
.
compLen
;
pBlockInfoEx
->
info
.
id
=
id
;
}
int32_t
r
=
taosLSeekFile
(
pDestBuf
->
pFile
,
0
,
SEEK_END
);
assert
(
r
==
0
);
int64_t
offset
=
getDataStartOffset
();
int32_t
size
=
(
int32_t
)
pSrcBuf
->
fileSize
-
(
int32_t
)
offset
;
int64_t
written
=
taosFSendFile
(
pDestBuf
->
pFile
,
pSrcBuf
->
pFile
,
&
offset
,
size
);
if
(
written
==
-
1
||
written
!=
size
)
{
return
-
1
;
}
pDestBuf
->
numOfTotal
+=
pSrcBuf
->
numOfTotal
;
int32_t
oldSize
=
pDestBuf
->
fileSize
;
...
...
@@ -864,7 +865,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
int64_t
file_size
;
if
(
taosFStatFile
(
pDestBuf
->
pFile
,
&
file_size
,
NULL
)
!=
0
)
{
return
-
1
;
return
-
1
;
}
pDestBuf
->
fileSize
=
(
uint32_t
)
file_size
;
...
...
@@ -875,33 +876,33 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
STSBuf
*
tsBufCreateFromCompBlocks
(
const
char
*
pData
,
int32_t
numOfBlocks
,
int32_t
len
,
int32_t
order
,
int32_t
id
)
{
STSBuf
*
pTSBuf
=
tsBufCreate
(
true
,
order
);
STSGroupBlockInfo
*
pBlockInfo
=
&
(
addOneGroupInfo
(
pTSBuf
,
0
)
->
info
);
pBlockInfo
->
numOfBlocks
=
numOfBlocks
;
pBlockInfo
->
compLen
=
len
;
pBlockInfo
->
offset
=
getDataStartOffset
();
pBlockInfo
->
id
=
id
;
// update prev vnode length info in file
TSBufUpdateGroupInfo
(
pTSBuf
,
pTSBuf
->
numOfGroups
-
1
,
pBlockInfo
);
int32_t
ret
=
taosLSeekFile
(
pTSBuf
->
pFile
,
pBlockInfo
->
offset
,
SEEK_SET
);
if
(
ret
==
-
1
)
{
// qError("fseek failed, errno:%d", errno);
// qError("fseek failed, errno:%d", errno);
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
size_t
sz
=
taosWriteFile
(
pTSBuf
->
pFile
,
(
void
*
)
pData
,
len
);
if
(
sz
!=
len
)
{
// qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len);
// qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len);
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
pTSBuf
->
fileSize
+=
len
;
pTSBuf
->
tsOrder
=
order
;
assert
(
order
==
TSDB_ORDER_ASC
||
order
==
TSDB_ORDER_DESC
);
STSBufFileHeader
header
=
{
.
magic
=
TS_COMP_FILE_MAGIC
,
.
numOfGroup
=
pTSBuf
->
numOfGroups
,
.
tsOrder
=
pTSBuf
->
tsOrder
};
if
(
STSBufUpdateHeader
(
pTSBuf
,
&
header
)
<
0
)
{
...
...
@@ -910,42 +911,42 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
}
// TODO taosFsync??
// if (taosFsync(fileno(pTSBuf->pFile)) == -1) {
//// qError("fsync failed, errno:%d", errno);
// tsBufDestroy(pTSBuf);
// return NULL;
// }
// if (taosFsync(fileno(pTSBuf->pFile)) == -1) {
//// qError("fsync failed, errno:%d", errno);
// tsBufDestroy(pTSBuf);
// return NULL;
// }
return
pTSBuf
;
}
STSElem
tsBufGetElemStartPos
(
STSBuf
*
pTSBuf
,
int32_t
id
,
SVariant
*
tag
)
{
STSElem
elem
=
{.
id
=
-
1
};
if
(
pTSBuf
==
NULL
)
{
return
elem
;
}
int32_t
j
=
tsBufFindGroupById
(
pTSBuf
->
pData
,
pTSBuf
->
numOfGroups
,
id
);
if
(
j
==
-
1
)
{
return
elem
;
}
// for debug purpose
// tsBufDisplay(pTSBuf);
STSCursor
*
pCur
=
&
pTSBuf
->
cur
;
STSGroupBlockInfo
*
pBlockInfo
=
&
pTSBuf
->
pData
[
j
].
info
;
int32_t
blockIndex
=
tsBufFindBlockByTag
(
pTSBuf
,
pBlockInfo
,
tag
);
if
(
blockIndex
<
0
)
{
return
elem
;
}
pCur
->
vgroupIndex
=
j
;
pCur
->
blockIndex
=
blockIndex
;
tsBufGetBlock
(
pTSBuf
,
j
,
blockIndex
);
return
tsBufGetElem
(
pTSBuf
);
}
...
...
@@ -954,7 +955,7 @@ STSCursor tsBufGetCursor(STSBuf* pTSBuf) {
if
(
pTSBuf
==
NULL
)
{
return
c
;
}
return
pTSBuf
->
cur
;
}
...
...
@@ -962,12 +963,12 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
if
(
pTSBuf
==
NULL
||
pCur
==
NULL
)
{
return
;
}
// assert(pCur->vgroupIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0);
if
(
pCur
->
vgroupIndex
!=
-
1
)
{
tsBufGetBlock
(
pTSBuf
,
pCur
->
vgroupIndex
,
pCur
->
blockIndex
);
}
pTSBuf
->
cur
=
*
pCur
;
}
...
...
@@ -975,7 +976,7 @@ void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) {
if
(
pTSBuf
==
NULL
)
{
return
;
}
pTSBuf
->
cur
.
order
=
order
;
}
...
...
@@ -992,19 +993,19 @@ STSBuf* tsBufClone(STSBuf* pTSBuf) {
void
tsBufDisplay
(
STSBuf
*
pTSBuf
)
{
printf
(
"-------start of ts comp file-------
\n
"
);
printf
(
"number of vnode:%d
\n
"
,
pTSBuf
->
numOfGroups
);
int32_t
old
=
pTSBuf
->
cur
.
order
;
pTSBuf
->
cur
.
order
=
TSDB_ORDER_ASC
;
tsBufResetPos
(
pTSBuf
);
while
(
tsBufNextPos
(
pTSBuf
))
{
STSElem
elem
=
tsBufGetElem
(
pTSBuf
);
if
(
elem
.
tag
->
nType
==
TSDB_DATA_TYPE_BIGINT
)
{
printf
(
"%d-%"
PRId64
"-%"
PRId64
"
\n
"
,
elem
.
id
,
elem
.
tag
->
i
,
elem
.
ts
);
}
}
pTSBuf
->
cur
.
order
=
old
;
printf
(
"-------end of ts comp file-------
\n
"
);
}
...
...
@@ -1021,36 +1022,36 @@ static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInf
static
STSBuf
*
allocResForTSBuf
(
STSBuf
*
pTSBuf
)
{
const
int32_t
INITIAL_GROUPINFO_SIZE
=
4
;
pTSBuf
->
numOfAlloc
=
INITIAL_GROUPINFO_SIZE
;
pTSBuf
->
pData
=
calloc
(
pTSBuf
->
numOfAlloc
,
sizeof
(
STSGroupBlockInfoEx
));
if
(
pTSBuf
->
pData
==
NULL
)
{
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
pTSBuf
->
tsData
.
rawBuf
=
malloc
(
MEM_BUF_SIZE
);
if
(
pTSBuf
->
tsData
.
rawBuf
==
NULL
)
{
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
pTSBuf
->
bufSize
=
MEM_BUF_SIZE
;
pTSBuf
->
tsData
.
threshold
=
MEM_BUF_SIZE
;
pTSBuf
->
tsData
.
allocSize
=
MEM_BUF_SIZE
;
pTSBuf
->
assistBuf
=
malloc
(
MEM_BUF_SIZE
);
if
(
pTSBuf
->
assistBuf
==
NULL
)
{
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
pTSBuf
->
block
.
payload
=
malloc
(
MEM_BUF_SIZE
);
if
(
pTSBuf
->
block
.
payload
==
NULL
)
{
tsBufDestroy
(
pTSBuf
);
return
NULL
;
}
pTSBuf
->
fileSize
+=
getDataStartOffset
();
return
pTSBuf
;
}
...
...
@@ -1076,28 +1077,28 @@ void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) {
(
*
id
)
=
malloc
(
tsBufGetNumOfGroup
(
pTSBuf
)
*
sizeof
(
int32_t
));
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
(
*
id
)[
i
]
=
pTSBuf
->
pData
[
i
].
info
.
id
;
}
}
int32_t
dumpFileBlockByGroupId
(
STSBuf
*
pTSBuf
,
int32_t
groupIndex
,
void
*
buf
,
int32_t
*
len
,
int32_t
*
numOfBlocks
)
{
assert
(
groupIndex
>=
0
&&
groupIndex
<
pTSBuf
->
numOfGroups
);
STSGroupBlockInfo
*
pBlockInfo
=
&
pTSBuf
->
pData
[
groupIndex
].
info
;
STSGroupBlockInfo
*
pBlockInfo
=
&
pTSBuf
->
pData
[
groupIndex
].
info
;
*
len
=
0
;
*
numOfBlocks
=
0
;
if
(
taosLSeekFile
(
pTSBuf
->
pFile
,
pBlockInfo
->
offset
,
SEEK_SET
)
!=
0
)
{
int
code
=
TAOS_SYSTEM_ERROR
(
taosEOFFile
(
pTSBuf
->
pFile
));
// qError("%p: fseek failed: %s", pSql, tstrerror(code));
int
32_t
code
=
TAOS_SYSTEM_ERROR
(
taosEOFFile
(
pTSBuf
->
pFile
));
// qError("%p: fseek failed: %s", pSql, tstrerror(code));
return
code
;
}
size_t
s
=
taosReadFile
(
pTSBuf
->
pFile
,
buf
,
pBlockInfo
->
compLen
);
if
(
s
!=
pBlockInfo
->
compLen
)
{
int
code
=
TAOS_SYSTEM_ERROR
(
taosEOFFile
(
pTSBuf
->
pFile
));
// tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
int
32_t
code
=
TAOS_SYSTEM_ERROR
(
taosEOFFile
(
pTSBuf
->
pFile
));
// tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
return
code
;
}
...
...
@@ -1120,6 +1121,4 @@ STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, SVariant* pTag) {
return
el
;
}
bool
tsBufIsValidElem
(
STSElem
*
pElem
)
{
return
pElem
->
id
>=
0
;
}
bool
tsBufIsValidElem
(
STSElem
*
pElem
)
{
return
pElem
->
id
>=
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录