Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f76ca26c
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f76ca26c
编写于
3月 07, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/3.0_mhli
上级
25af19bd
342ef936
变更
35
隐藏空白更改
内联
并排
Showing
35 changed file
with
1177 addition
and
73 deletion
+1177
-73
include/common/tglobal.h
include/common/tglobal.h
+1
-0
include/common/tmsg.h
include/common/tmsg.h
+61
-7
include/common/tmsgdef.h
include/common/tmsgdef.h
+3
-0
include/common/trow.h
include/common/trow.h
+3
-3
include/libs/monitor/monitor.h
include/libs/monitor/monitor.h
+1
-0
include/os/osFile.h
include/os/osFile.h
+0
-1
include/os/osSocket.h
include/os/osSocket.h
+18
-1
include/util/taoserror.h
include/util/taoserror.h
+1
-0
include/util/thttp.h
include/util/thttp.h
+3
-1
source/common/src/tglobal.c
source/common/src/tglobal.c
+3
-0
source/dnode/mgmt/impl/src/dndEnv.c
source/dnode/mgmt/impl/src/dndEnv.c
+1
-1
source/dnode/mnode/impl/src/mndTelem.c
source/dnode/mnode/impl/src/mndTelem.c
+1
-1
source/dnode/vnode/inc/meta.h
source/dnode/vnode/inc/meta.h
+1
-0
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+21
-0
source/dnode/vnode/src/inc/tsdbDef.h
source/dnode/vnode/src/inc/tsdbDef.h
+1
-0
source/dnode/vnode/src/inc/tsdbFS.h
source/dnode/vnode/src/inc/tsdbFS.h
+1
-0
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+95
-0
source/dnode/vnode/src/meta/metaBDBImpl.c
source/dnode/vnode/src/meta/metaBDBImpl.c
+52
-8
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+550
-0
source/dnode/vnode/src/tsdb/tsdbWrite.c
source/dnode/vnode/src/tsdb/tsdbWrite.c
+41
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+9
-0
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+14
-5
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+41
-4
source/libs/monitor/inc/monInt.h
source/libs/monitor/inc/monInt.h
+1
-0
source/libs/monitor/src/monitor.c
source/libs/monitor/src/monitor.c
+3
-1
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+0
-2
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+0
-2
source/libs/transport/src/rpcTcp.c
source/libs/transport/src/rpcTcp.c
+2
-0
source/libs/transport/src/rpcUdp.c
source/libs/transport/src/rpcUdp.c
+3
-0
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+1
-1
source/os/src/osFile.c
source/os/src/osFile.c
+31
-14
source/os/src/osSocket.c
source/os/src/osSocket.c
+36
-10
source/util/CMakeLists.txt
source/util/CMakeLists.txt
+7
-1
source/util/src/terror.c
source/util/src/terror.c
+1
-0
source/util/src/thttp.c
source/util/src/thttp.c
+170
-10
未找到文件。
include/common/tglobal.h
浏览文件 @
f76ca26c
...
...
@@ -58,6 +58,7 @@ extern int32_t tsMonitorInterval;
extern
char
tsMonitorFqdn
[];
extern
uint16_t
tsMonitorPort
;
extern
int32_t
tsMonitorMaxLogs
;
extern
bool
tsMonitorComp
;
// query buffer management
extern
int32_t
tsQueryBufferSize
;
// maximum allowed usage buffer size in MB for each data node during query processing
...
...
include/common/tmsg.h
浏览文件 @
f76ca26c
...
...
@@ -1866,10 +1866,59 @@ typedef struct {
uint64_t
tableUid
;
// super/common table uid
int64_t
interval
;
int64_t
sliding
;
col_id_t
*
colIds
;
//
N.B.
sorted column ids
uint16_t
*
funcIds
;
//
N.B.
sorted sma function ids
col_id_t
*
colIds
;
// sorted column ids
uint16_t
*
funcIds
;
// sorted sma function ids
}
STSma
;
// Time-range-wise SMA
typedef
struct
{
int8_t
msgType
;
// 0 create, 1 recreate
STSma
tSma
;
STimeWindow
window
;
}
SCreateTSmaMsg
;
typedef
struct
{
STimeWindow
window
;
char
indexName
[
TSDB_INDEX_NAME_LEN
+
1
];
}
SDropTSmaMsg
;
typedef
struct
{
STimeWindow
tsWindow
;
// [skey, ekey]
uint64_t
tableUid
;
// sub/common table uid
int32_t
numOfBlocks
;
// number of sma blocks for each column, total number is numOfBlocks*numOfColId
int32_t
dataLen
;
// total data length
col_id_t
*
colIds
;
// e.g. 2,4,9,10
col_id_t
numOfColIds
;
// e.g. 4
char
data
[];
// the sma blocks
}
STSmaData
;
// TODO: move to the final location afte schema of STSma/STSmaData defined
static
FORCE_INLINE
void
tdDestroySmaData
(
STSmaData
*
pSmaData
)
{
if
(
pSmaData
)
{
if
(
pSmaData
->
colIds
)
{
tfree
(
pSmaData
->
colIds
);
}
tfree
(
pSmaData
);
}
}
// RSma: Time-range-wise Rollup SMA
// TODO: refactor when rSma grammar defined finally =>
typedef
struct
{
int64_t
interval
;
int32_t
retention
;
// unit: day
uint16_t
days
;
// unit: day
int8_t
intervalUnit
;
}
SSmaParams
;
// TODO: refactor when rSma grammar defined finally <=
typedef
struct
{
// TODO: refactor to use the real schema =>
STSma
tsma
;
float
xFilesFactor
;
SArray
*
smaParams
;
// SSmaParams
// TODO: refactor to use the real schema <=
}
SRSma
;
typedef
struct
{
uint32_t
number
;
STSma
*
tSma
;
...
...
@@ -1885,12 +1934,17 @@ static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) {
}
}
static
FORCE_INLINE
void
tdDestroyTSmaWrapper
(
STSmaWrapper
*
pSW
)
{
if
(
pSW
&&
pSW
->
tSma
)
{
for
(
uint32_t
i
=
0
;
i
<
pSW
->
number
;
++
i
)
{
tdDestroyTSma
(
pSW
->
tSma
+
i
,
false
);
static
FORCE_INLINE
void
tdDestroyTSmaWrapper
(
STSmaWrapper
*
pSW
,
bool
releaseSelf
)
{
if
(
pSW
)
{
if
(
pSW
->
tSma
)
{
for
(
uint32_t
i
=
0
;
i
<
pSW
->
number
;
++
i
)
{
tdDestroyTSma
(
pSW
->
tSma
+
i
,
false
);
}
tfree
(
pSW
->
tSma
);
}
if
(
releaseSelf
)
{
free
(
pSW
);
}
tfree
(
pSW
->
tSma
);
}
}
...
...
include/common/tmsgdef.h
浏览文件 @
f76ca26c
...
...
@@ -184,6 +184,9 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqCVConsumeReq
,
SMqCVConsumeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_SMA
,
"vnode-create-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_SMA
,
"vnode-cancel-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_SMA
,
"vnode-drop-sma"
,
NULL
,
NULL
)
// Requests handled by QNODE
TD_NEW_MSG_SEG
(
TDMT_QND_MSG
)
...
...
include/common/trow.h
浏览文件 @
f76ca26c
...
...
@@ -118,6 +118,8 @@ typedef struct {
}
SKvRow
;
typedef
struct
{
/// timestamp
TSKEY
ts
;
union
{
/// union field for encode and decode
uint32_t
info
;
...
...
@@ -138,8 +140,6 @@ typedef struct {
uint32_t
len
;
/// row version
uint64_t
ver
;
/// timestamp
TSKEY
ts
;
/// the inline data, maybe a tuple or a k-v tuple
char
data
[];
}
STSRow
;
...
...
@@ -173,7 +173,7 @@ typedef struct {
#define TD_ROW_DATA(r) ((r)->data)
#define TD_ROW_LEN(r) ((r)->len)
#define TD_ROW_KEY(r) ((r)->ts)
#define TD_ROW_KEY_ADDR(r)
POINTER_SHIFT((r), 16
)
#define TD_ROW_KEY_ADDR(r)
(r
)
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and
// (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
...
...
include/libs/monitor/monitor.h
浏览文件 @
f76ca26c
...
...
@@ -130,6 +130,7 @@ typedef struct {
const
char
*
server
;
uint16_t
port
;
int32_t
maxLogs
;
bool
comp
;
}
SMonCfg
;
int32_t
monInit
(
const
SMonCfg
*
pCfg
);
...
...
include/os/osFile.h
浏览文件 @
f76ca26c
...
...
@@ -87,7 +87,6 @@ int32_t taosRemoveFile(const char *path);
void
taosGetTmpfilePath
(
const
char
*
inputTmpDir
,
const
char
*
fileNamePrefix
,
char
*
dstPath
);
int64_t
taosSendFile
(
SocketFd
fdDst
,
TdFilePtr
pFileSrc
,
int64_t
*
offset
,
int64_t
size
);
int64_t
taosFSendFile
(
TdFilePtr
pFileOut
,
TdFilePtr
pFileIn
,
int64_t
*
offset
,
int64_t
size
);
void
*
taosMmapReadOnlyFile
(
TdFilePtr
pFile
,
int64_t
length
);
...
...
include/os/osSocket.h
浏览文件 @
f76ca26c
...
...
@@ -16,6 +16,14 @@
#ifndef _TD_OS_SOCKET_H_
#define _TD_OS_SOCKET_H_
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define socket SOCKET_FUNC_TAOS_FORBID
#define bind BIND_FUNC_TAOS_FORBID
#define listen LISTEN_FUNC_TAOS_FORBID
// #define accept ACCEPT_FUNC_TAOS_FORBID
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include "winsock2.h"
#include <WS2tcpip.h>
...
...
@@ -26,10 +34,16 @@
#include <sys/epoll.h>
#endif
#ifdef USE_UV
#include <uv.h>
#endif
#ifdef __cplusplus
extern
"C"
{
#endif
#ifndef USE_UV
#define TAOS_EPOLL_WAIT_TIME 500
typedef
int32_t
SOCKET
;
typedef
SOCKET
EpollFd
;
...
...
@@ -50,7 +64,6 @@ void taosShutDownSocketRD(SOCKET fd);
void
taosShutDownSocketWR
(
SOCKET
fd
);
int32_t
taosSetNonblocking
(
SOCKET
sock
,
int32_t
on
);
void
taosIgnSIGPIPE
();
void
taosBlockSIGPIPE
();
void
taosSetMaskSIGPIPE
();
int32_t
taosSetSockOpt
(
SOCKET
socketfd
,
int32_t
level
,
int32_t
optname
,
void
*
optval
,
int32_t
optlen
);
int32_t
taosGetSockOpt
(
SOCKET
socketfd
,
int32_t
level
,
int32_t
optname
,
void
*
optval
,
int32_t
*
optlen
);
...
...
@@ -86,6 +99,10 @@ uint32_t taosGetIpv4FromFqdn(const char *);
void
tinet_ntoa
(
char
*
ipstr
,
uint32_t
ip
);
uint32_t
ip2uint
(
const
char
*
const
ip_addr
);
#endif
void
taosBlockSIGPIPE
();
#ifdef __cplusplus
}
#endif
...
...
include/util/taoserror.h
浏览文件 @
f76ca26c
...
...
@@ -84,6 +84,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x0120)
#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x0121)
#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x0122)
#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0123)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
...
...
include/util/thttp.h
浏览文件 @
f76ca26c
...
...
@@ -22,7 +22,9 @@
extern
"C"
{
#endif
int32_t
taosSendHttpReport
(
const
char
*
server
,
uint16_t
port
,
const
char
*
pCont
,
int32_t
contLen
);
typedef
enum
{
HTTP_GZIP
,
HTTP_FLAT
}
EHttpCompFlag
;
int32_t
taosSendHttpReport
(
const
char
*
server
,
uint16_t
port
,
char
*
pCont
,
int32_t
contLen
,
EHttpCompFlag
flag
);
#ifdef __cplusplus
}
...
...
source/common/src/tglobal.c
浏览文件 @
f76ca26c
...
...
@@ -52,6 +52,7 @@ int32_t tsMonitorInterval = 5;
char
tsMonitorFqdn
[
TSDB_FQDN_LEN
]
=
{
0
};
uint16_t
tsMonitorPort
=
6043
;
int32_t
tsMonitorMaxLogs
=
100
;
bool
tsMonitorComp
=
false
;
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
...
...
@@ -346,6 +347,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if
(
cfgAddString
(
pCfg
,
"monitorFqdn"
,
tsMonitorFqdn
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"monitorPort"
,
tsMonitorPort
,
1
,
65056
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"monitorMaxLogs"
,
tsMonitorMaxLogs
,
1
,
1000000
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"monitorComp"
,
tsMonitorComp
,
0
)
!=
0
)
return
-
1
;
return
0
;
}
...
...
@@ -462,6 +464,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tstrncpy
(
tsMonitorFqdn
,
cfgGetItem
(
pCfg
,
"monitorFqdn"
)
->
str
,
TSDB_FQDN_LEN
);
tsMonitorPort
=
(
uint16_t
)
cfgGetItem
(
pCfg
,
"monitorPort"
)
->
i32
;
tsMonitorMaxLogs
=
cfgGetItem
(
pCfg
,
"monitorMaxLogs"
)
->
i32
;
tsMonitorComp
=
cfgGetItem
(
pCfg
,
"monitorComp"
)
->
bval
;
if
(
tsQueryBufferSize
>=
0
)
{
tsQueryBufferSizeBytes
=
tsQueryBufferSize
*
1048576UL
;
...
...
source/dnode/mgmt/impl/src/dndEnv.c
浏览文件 @
f76ca26c
...
...
@@ -298,7 +298,7 @@ int32_t dndInit() {
return
-
1
;
}
SMonCfg
monCfg
=
{.
maxLogs
=
tsMonitorMaxLogs
,
.
port
=
tsMonitorPort
,
.
server
=
tsMonitorFqdn
};
SMonCfg
monCfg
=
{.
maxLogs
=
tsMonitorMaxLogs
,
.
port
=
tsMonitorPort
,
.
server
=
tsMonitorFqdn
,
.
comp
=
tsMonitorComp
};
if
(
monInit
(
&
monCfg
)
!=
0
)
{
dError
(
"failed to init monitor since %s"
,
terrstr
());
dndCleanup
();
...
...
source/dnode/mnode/impl/src/mndTelem.c
浏览文件 @
f76ca26c
...
...
@@ -87,7 +87,7 @@ static int32_t mndProcessTelemTimer(SMnodeMsg* pReq) {
taosWLockLatch
(
&
pMgmt
->
lock
);
char
*
pCont
=
mndBuildTelemetryReport
(
pMnode
);
if
(
pCont
!=
NULL
)
{
taosSendHttpReport
(
TELEMETRY_SERVER
,
TELEMETRY_PORT
,
pCont
,
strlen
(
pCont
));
taosSendHttpReport
(
TELEMETRY_SERVER
,
TELEMETRY_PORT
,
pCont
,
strlen
(
pCont
)
,
HTTP_FLAT
);
free
(
pCont
);
}
taosWUnLockLatch
(
&
pMgmt
->
lock
);
...
...
source/dnode/vnode/inc/meta.h
浏览文件 @
f76ca26c
...
...
@@ -57,6 +57,7 @@ STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
SSmaCfg
*
metaGetSmaInfoByName
(
SMeta
*
pMeta
,
const
char
*
indexName
);
STSmaWrapper
*
metaGetSmaInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SMTbCursor
*
metaOpenTbCursor
(
SMeta
*
pMeta
);
void
metaCloseTbCursor
(
SMTbCursor
*
pTbCur
);
...
...
source/dnode/vnode/inc/tsdb.h
浏览文件 @
f76ca26c
...
...
@@ -87,6 +87,27 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
/**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*/
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
/**
* @brief Insert RSma(Time-range-wise Rollup SMA) data.
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*/
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
SRSma
*
param
,
STSmaData
*
pData
);
// STsdbCfg
int
tsdbOptionsInit
(
STsdbCfg
*
);
void
tsdbOptionsClear
(
STsdbCfg
*
);
...
...
source/dnode/vnode/src/inc/tsdbDef.h
浏览文件 @
f76ca26c
...
...
@@ -35,6 +35,7 @@
#include "tsdbMemory.h"
#include "tsdbOptions.h"
#include "tsdbReadImpl.h"
#include "tsdbSma.h"
#ifdef __cplusplus
extern
"C"
{
...
...
source/dnode/vnode/src/inc/tsdbFS.h
浏览文件 @
f76ca26c
...
...
@@ -42,6 +42,7 @@ typedef struct {
typedef
struct
{
STsdbFSMeta
meta
;
// FS meta
SArray
*
df
;
// data file array
SArray
*
smaf
;
// sma data file array
}
SFSStatus
;
typedef
struct
{
...
...
source/dnode/vnode/src/inc/tsdbSma.h
0 → 100644
浏览文件 @
f76ca26c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_SMA_H_
#define _TD_TSDB_SMA_H_
// insert/update interface
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
SRSma
*
param
,
STSmaData
*
pData
);
// query interface
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
,
STimeWindow
*
queryWin
,
int32_t
nMaxResult
);
// management interface
int32_t
tsdbGetTSmaStatus
(
STsdb
*
pTsdb
,
STSma
*
param
,
void
*
result
);
int32_t
tsdbRemoveTSmaData
(
STsdb
*
pTsdb
,
STSma
*
param
,
STimeWindow
*
pWin
);
// internal func
static
FORCE_INLINE
int32_t
tsdbEncodeTSmaKey
(
uint64_t
tableUid
,
col_id_t
colId
,
TSKEY
tsKey
,
void
**
pData
)
{
int32_t
len
=
0
;
len
+=
taosEncodeFixedU64
(
pData
,
tableUid
);
len
+=
taosEncodeFixedU16
(
pData
,
colId
);
len
+=
taosEncodeFixedI64
(
pData
,
tsKey
);
return
len
;
}
#if 0
typedef struct {
int minFid;
int midFid;
int maxFid;
TSKEY minKey;
} SRtn;
typedef struct {
uint64_t uid;
int64_t offset;
int64_t size;
} SKVRecord;
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
return (int)((key / tsTickPerDay[precision] / days));
}
}
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
return 0;
} else if (fid >= pRtn->midFid) {
return 1;
} else if (fid >= pRtn->minFid) {
return 2;
} else {
return -1;
}
}
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo);
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
int tsdbApplyRtn(STsdbRepo *pRepo);
#endif
#endif
/* _TD_TSDB_SMA_H_ */
\ No newline at end of file
source/dnode/vnode/src/meta/metaBDBImpl.c
浏览文件 @
f76ca26c
...
...
@@ -833,6 +833,7 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
}
pCur
->
uid
=
uid
;
// TODO: lock?
ret
=
pDB
->
pCtbIdx
->
cursor
(
pDB
->
pSmaIdx
,
NULL
,
&
(
pCur
->
pCur
),
0
);
if
(
ret
!=
0
)
{
free
(
pCur
);
...
...
@@ -852,25 +853,68 @@ void metaCloseSmaCurosr(SMSmaCursor *pCur) {
}
}
const
char
*
metaSmaCursorNext
(
SMSmaCursor
*
pCur
)
{
DBT
skey
=
{
0
};
DBT
pkey
=
{
0
};
DBT
pval
=
{
0
};
void
*
pBuf
;
const
char
*
metaSmaCursorNext
(
SMSmaCursor
*
pCur
)
{
DBT
skey
=
{
0
};
DBT
pkey
=
{
0
};
DBT
pval
=
{
0
};
// Set key
skey
.
data
=
&
(
pCur
->
uid
);
skey
.
size
=
sizeof
(
pCur
->
uid
);
// TODO: lock?
if
(
pCur
->
pCur
->
pget
(
pCur
->
pCur
,
&
skey
,
&
pkey
,
&
pval
,
DB_NEXT
)
==
0
)
{
const
char
*
indexName
=
(
const
char
*
)
pkey
.
data
;
const
char
*
indexName
=
(
const
char
*
)
pkey
.
data
;
assert
(
indexName
!=
NULL
);
return
indexName
;
}
else
{
return
0
;
return
NULL
;
}
}
STSmaWrapper
*
metaGetSmaInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
STSmaWrapper
*
pSW
=
NULL
;
pSW
=
calloc
(
sizeof
(
*
pSW
),
1
);
if
(
pSW
==
NULL
)
{
return
NULL
;
}
SMSmaCursor
*
pCur
=
metaOpenSmaCursor
(
pMeta
,
uid
);
if
(
pCur
==
NULL
)
{
free
(
pSW
);
return
NULL
;
}
DBT
skey
=
{.
data
=
&
(
pCur
->
uid
)};
DBT
pval
=
{.
size
=
sizeof
(
pCur
->
uid
)};
void
*
pBuf
=
NULL
;
while
(
true
)
{
// TODO: lock?
if
(
pCur
->
pCur
->
pget
(
pCur
->
pCur
,
&
skey
,
NULL
,
&
pval
,
DB_NEXT
)
==
0
)
{
++
pSW
->
number
;
STSma
*
tptr
=
(
STSma
*
)
realloc
(
pSW
->
tSma
,
pSW
->
number
*
sizeof
(
STSma
));
if
(
tptr
==
NULL
)
{
metaCloseSmaCurosr
(
pCur
);
tdDestroyTSmaWrapper
(
pSW
,
true
);
return
NULL
;
}
pSW
->
tSma
=
tptr
;
pBuf
=
pval
.
data
;
if
(
tDecodeTSma
(
pBuf
,
pSW
->
tSma
+
pSW
->
number
-
1
)
==
NULL
)
{
metaCloseSmaCurosr
(
pCur
);
tdDestroyTSmaWrapper
(
pSW
,
true
);
return
NULL
;
}
continue
;
}
break
;
}
metaCloseSmaCurosr
(
pCur
);
return
pSW
;
}
static
void
metaDBWLock
(
SMetaDB
*
pDB
)
{
#if IMPL_WITH_LOCK
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
0 → 100644
浏览文件 @
f76ca26c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbDef.h"
#define SMA_STORAGE_TSDB_DAYS 30
#define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
#define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks
typedef
enum
{
SMA_STORAGE_LEVEL_TSDB
=
0
,
// store TSma in dir e.g. vnode${N}/tsdb/.tsma
SMA_STORAGE_LEVEL_DFILESET
=
1
// store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name}
}
ESmaStorageLevel
;
typedef
struct
{
STsdb
*
pTsdb
;
char
*
pDFile
;
// TODO: use the real DFile type, not char*
int32_t
interval
;
// interval with the precision of DB
int32_t
blockSize
;
// size of SMA block item
// TODO
}
STSmaWriteH
;
typedef
struct
{
int32_t
iter
;
}
SmaFsIter
;
typedef
struct
{
STsdb
*
pTsdb
;
char
*
pDFile
;
// TODO: use the real DFile type, not char*
int32_t
interval
;
// interval with the precision of DB
int32_t
blockSize
;
// size of SMA block item
int8_t
storageLevel
;
int8_t
days
;
SmaFsIter
smaFsIter
;
// TODO
}
STSmaReadH
;
// declaration of static functions
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
static
int32_t
tsdbJudgeStorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
);
static
int32_t
tsdbInsertTSmaDataSection
(
STSmaWriteH
*
pSmaH
,
STSmaData
*
pData
,
int32_t
sectionDataLen
,
int32_t
nBlocks
);
static
int32_t
tsdbInsertTSmaBlocks
(
void
*
bTree
,
const
char
*
smaKey
,
const
char
*
pData
,
int32_t
dataLen
);
static
int32_t
tsdbTSmaDataSplit
(
STSmaWriteH
*
pSmaH
,
STSma
*
param
,
STSmaData
*
pData
,
int32_t
days
,
int32_t
nOffset
,
int32_t
fid
,
int32_t
*
nSmaBlocks
);
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
);
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSma
*
param
,
STSmaData
*
pData
,
int32_t
storageLevel
,
int32_t
fid
);
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
pReadH
,
STSma
*
param
,
STimeWindow
*
queryWin
);
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
STSma
*
param
,
STimeWindow
*
queryWin
);
/**
* @brief Judge the tSma storage level
*
* @param interval
* @param intervalUnit
* @return int32_t
*/
static
int32_t
tsdbJudgeStorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
)
{
// TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
switch
(
intervalUnit
)
{
case
TD_TIME_UNIT_HOUR
:
if
(
interval
<
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
TD_TIME_UNIT_MINUTE
:
if
(
interval
<
60
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
TD_TIME_UNIT_SEC
:
if
(
interval
<
3600
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
TD_TIME_UNIT_MILLISEC
:
if
(
interval
<
3600
*
1e3
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
TD_TIME_UNIT_MICROSEC
:
if
(
interval
<
3600
*
1e6
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
TD_TIME_UNIT_NANOSEC
:
if
(
interval
<
3600
*
1e9
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
default:
break
;
}
return
SMA_STORAGE_LEVEL_TSDB
;
}
/**
* @brief Insert TSma data blocks to B+Tree
*
* @param bTree
* @param smaKey
* @param pData
* @param dataLen
* @return int32_t
*/
static
int32_t
tsdbInsertTSmaBlocks
(
void
*
bTree
,
const
char
*
smaKey
,
const
char
*
pData
,
int32_t
dataLen
)
{
// TODO: insert sma data blocks into B+Tree
printf
(
"insert sma data blocks into B+Tree: smaKey %"
PRIx64
"-%"
PRIu16
"-%"
PRIx64
", dataLen %d
\n
"
,
*
(
uint64_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
dataLen
);
return
TSDB_CODE_SUCCESS
;
}
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
)
{
if
(
intervalUnit
<
TD_TIME_UNIT_MILLISEC
)
{
switch
(
intervalUnit
)
{
case
TD_TIME_UNIT_YEAR
:
case
TD_TIME_UNIT_SEASON
:
case
TD_TIME_UNIT_MONTH
:
case
TD_TIME_UNIT_WEEK
:
// illegal time unit
tsdbError
(
"invalid interval unit: %d
\n
"
,
intervalUnit
);
TASSERT
(
0
);
break
;
case
TD_TIME_UNIT_DAY
:
// the interval for tSma calculation must <= day
interval
*=
86400
*
1e3
;
break
;
case
TD_TIME_UNIT_HOUR
:
interval
*=
3600
*
1e3
;
break
;
case
TD_TIME_UNIT_MINUTE
:
interval
*=
60
*
1e3
;
break
;
case
TD_TIME_UNIT_SEC
:
interval
*=
1e3
;
break
;
default:
break
;
}
}
switch
(
intervalUnit
)
{
case
TD_TIME_UNIT_MILLISEC
:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
*
1e3
;
}
else
{
// nano second
return
interval
*
1e6
;
}
break
;
case
TD_TIME_UNIT_MICROSEC
:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
/
1e3
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
;
}
else
{
// nano second
return
interval
*
1e3
;
}
break
;
case
TD_TIME_UNIT_NANOSEC
:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
/
1e6
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
/
1e3
;
}
else
{
// nano second
return
interval
;
}
break
;
default:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
*
1e3
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
*
1e6
;
}
else
{
// nano second
return
interval
*
1e9
;
}
break
;
}
return
interval
;
}
/**
* @brief Split the TSma data blocks into expected size and insert into B+Tree.
*
* @param pSmaH
* @param pData
* @param nOffset The nOffset of blocks since fid changes.
* @param nBlocks The nBlocks with the same fid since nOffset.
* @return int32_t
*/
static
int32_t
tsdbInsertTSmaDataSection
(
STSmaWriteH
*
pSmaH
,
STSmaData
*
pData
,
int32_t
nOffset
,
int32_t
nBlocks
)
{
STsdb
*
pTsdb
=
pSmaH
->
pTsdb
;
TASSERT
(
pData
->
colIds
!=
NULL
);
tsdbDebug
(
"tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d"
,
nOffset
,
nBlocks
);
printf
(
"tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d
\n
"
,
nOffset
,
nBlocks
);
int32_t
colDataLen
=
pData
->
dataLen
/
pData
->
numOfColIds
;
int32_t
sectionDataLen
=
pSmaH
->
blockSize
*
nBlocks
;
for
(
col_id_t
i
=
0
;
i
<
pData
->
numOfColIds
;
++
i
)
{
// param: pointer of B+Tree, key, value, dataLen
void
*
bTree
=
pSmaH
->
pDFile
;
#ifndef SMA_STORE_SINGLE_BLOCKS
// save tSma data blocks as a whole
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
void
*
pSmaKey
=
&
smaKey
;
tsdbEncodeTSmaKey
(
pData
->
tableUid
,
*
(
pData
->
colIds
+
i
),
pData
->
tsWindow
.
skey
+
nOffset
*
pSmaH
->
interval
,
(
void
**
)
&
pSmaKey
);
if
(
tsdbInsertTSmaBlocks
(
bTree
,
smaKey
,
pData
->
data
+
i
*
colDataLen
+
nOffset
*
pSmaH
->
blockSize
,
sectionDataLen
)
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma blocks failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
#else
// save tSma data blocks separately
for
(
int32_t
n
=
0
;
n
<
nBlocks
;
++
n
)
{
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
void
*
pSmaKey
=
&
smaKey
;
tsdbEncodeTSmaKey
(
pData
->
tableUid
,
*
(
pData
->
colIds
+
i
),
pData
->
tsWindow
.
skey
+
(
nOffset
+
n
)
*
pSmaH
->
interval
,
(
void
**
)
&
pSmaKey
);
if
(
tsdbInsertTSmaBlocks
(
bTree
,
smaKey
,
pData
->
data
+
i
*
colDataLen
+
(
nOffset
+
n
)
*
pSmaH
->
blockSize
,
pSmaH
->
blockSize
)
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma blocks failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
}
#endif
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
)
{
pSmaH
->
pTsdb
=
pTsdb
;
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
param
->
interval
,
param
->
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
);
pSmaH
->
blockSize
=
param
->
numOfFuncIds
*
sizeof
(
int64_t
);
}
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSma
*
param
,
STSmaData
*
pData
,
int32_t
storageLevel
,
int32_t
fid
)
{
// TODO
pSmaH
->
pDFile
=
"tSma_interval_file_name"
;
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Split the sma data blocks by fid.
*
* @param pSmaH
* @param param
* @param pData
* @param nOffset
* @param fid
* @param nSmaBlocks
* @return int32_t
*/
static
int32_t
tsdbTSmaDataSplit
(
STSmaWriteH
*
pSmaH
,
STSma
*
param
,
STSmaData
*
pData
,
int32_t
days
,
int32_t
nOffset
,
int32_t
fid
,
int32_t
*
nSmaBlocks
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pSmaH
->
pTsdb
);
// TODO: use binary search
for
(
int32_t
n
=
nOffset
+
1
;
n
<
pData
->
numOfBlocks
;
++
n
)
{
// TODO: The tsWindow.skey should use the precision of DB.
int32_t
tFid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
tsWindow
.
skey
+
pSmaH
->
interval
*
n
,
days
,
pCfg
->
precision
));
if
(
tFid
>
fid
)
{
*
nSmaBlocks
=
n
-
nOffset
;
break
;
}
}
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Insert/Update Time-range-wise SMA data.
* - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
* v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files.
* - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The
* days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d).
* - The destination file of one data block for some interval is determined by its start TS key.
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*/
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STSmaData
*
curData
=
pData
;
STSmaWriteH
tSmaH
=
{
0
};
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
param
,
pData
);
if
(
pData
->
numOfBlocks
<=
0
||
pData
->
numOfColIds
<=
0
||
pData
->
dataLen
<=
0
)
{
TASSERT
(
0
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
// Step 1: Judge the storage level
int32_t
storageLevel
=
tsdbJudgeStorageLevel
(
param
->
interval
,
param
->
intervalUnit
);
int32_t
daysPerFile
=
storageLevel
==
SMA_STORAGE_LEVEL_TSDB
?
SMA_STORAGE_TSDB_DAYS
:
pCfg
->
daysPerFile
;
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file
int32_t
minFid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
tsWindow
.
skey
,
daysPerFile
,
pCfg
->
precision
));
int32_t
maxFid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
tsWindow
.
ekey
,
daysPerFile
,
pCfg
->
precision
));
if
(
minFid
==
maxFid
)
{
// Save all the TSma data to one file
// TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile
(
&
tSmaH
,
param
,
pData
,
storageLevel
,
minFid
);
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
,
0
,
pData
->
numOfBlocks
);
// TODO:tsdbEndTSmaCommit();
}
else
if
(
minFid
<
maxFid
)
{
// Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files
// actually.
// TODO: tsdbStartTSmaCommit();
int32_t
tFid
=
minFid
;
int32_t
nOffset
=
0
;
int32_t
nSmaBlocks
=
0
;
do
{
tsdbTSmaDataSplit
(
&
tSmaH
,
param
,
pData
,
daysPerFile
,
nOffset
,
tFid
,
&
nSmaBlocks
);
tsdbSetTSmaDataFile
(
&
tSmaH
,
param
,
pData
,
storageLevel
,
tFid
);
if
(
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
,
nOffset
,
nSmaBlocks
)
<
0
)
{
return
terrno
;
}
++
tFid
;
nOffset
+=
nSmaBlocks
;
if
(
tFid
==
maxFid
)
{
tsdbSetTSmaDataFile
(
&
tSmaH
,
param
,
pData
,
storageLevel
,
tFid
);
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
,
nOffset
,
pData
->
numOfBlocks
-
nOffset
);
break
;
}
}
while
(
true
);
// TODO:tsdbEndTSmaCommit();
}
else
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsdbSetRSmaDataFile
(
STSmaWriteH
*
pSmaH
,
SRSma
*
param
,
STSmaData
*
pData
,
int32_t
fid
)
{
// TODO
pSmaH
->
pDFile
=
"rSma_interval_file_name"
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
SRSma
*
param
,
STSmaData
*
pData
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STSma
*
tParam
=
&
param
->
tsma
;
STSmaData
*
curData
=
pData
;
STSmaWriteH
tSmaH
=
{
0
};
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
tParam
,
pData
);
int32_t
nSmaBlocks
=
pData
->
numOfBlocks
;
int32_t
colDataLen
=
pData
->
dataLen
/
nSmaBlocks
;
// Step 2.2: Storage of SMA_STORAGE_LEVEL_DFILESET
// TODO: Use the daysPerFile for rSma data, not for TS data.
// TODO: The lifecycle of rSma data should be processed like the TS data files.
int32_t
minFid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
tsWindow
.
skey
,
pCfg
->
daysPerFile
,
pCfg
->
precision
));
int32_t
maxFid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
tsWindow
.
ekey
,
pCfg
->
daysPerFile
,
pCfg
->
precision
));
if
(
minFid
==
maxFid
)
{
// Save all the TSma data to one file
tsdbSetRSmaDataFile
(
&
tSmaH
,
param
,
pData
,
minFid
);
// TODO: tsdbStartTSmaCommit();
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
,
colDataLen
,
nSmaBlocks
);
// TODO:tsdbEndTSmaCommit();
}
else
if
(
minFid
<
maxFid
)
{
// Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files
// actually.
// TODO: tsdbStartTSmaCommit();
int32_t
tmpFid
=
0
;
int32_t
step
=
0
;
for
(
int32_t
n
=
0
;
n
<
pData
->
numOfBlocks
;
++
n
)
{
}
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
,
colDataLen
,
nSmaBlocks
);
// TODO:tsdbEndTSmaCommit();
}
else
{
TASSERT
(
0
);
return
TSDB_CODE_INVALID_PARA
;
}
// Step 4: finish
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Init of tSma ReadH
*
* @param pSmaH
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*/
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
)
{
pSmaH
->
pTsdb
=
pTsdb
;
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
param
->
interval
,
param
->
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
);
pSmaH
->
blockSize
=
param
->
numOfFuncIds
*
sizeof
(
int64_t
);
}
/**
* @brief Init of tSma FS
*
* @param pReadH
* @param param
* @param queryWin
* @return int32_t
*/
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
pReadH
,
STSma
*
param
,
STimeWindow
*
queryWin
)
{
int32_t
storageLevel
=
tsdbJudgeStorageLevel
(
param
->
interval
,
param
->
intervalUnit
);
int32_t
daysPerFile
=
storageLevel
==
SMA_STORAGE_LEVEL_TSDB
?
SMA_STORAGE_TSDB_DAYS
:
REPO_CFG
(
pReadH
->
pTsdb
)
->
daysPerFile
;
pReadH
->
storageLevel
=
storageLevel
;
pReadH
->
days
=
daysPerFile
;
pReadH
->
smaFsIter
.
iter
=
0
;
}
/**
* @brief Set and open tSma file if it has key locates in queryWin.
*
* @param pReadH
* @param param
* @param queryWin
* @return true
* @return false
*/
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
STSma
*
param
,
STimeWindow
*
queryWin
)
{
SArray
*
smaFs
=
pReadH
->
pTsdb
->
fs
->
cstatus
->
smaf
;
int32_t
nSmaFs
=
taosArrayGetSize
(
smaFs
);
pReadH
->
pDFile
=
NULL
;
while
(
pReadH
->
smaFsIter
.
iter
<
nSmaFs
)
{
void
*
pSmaFile
=
taosArrayGet
(
smaFs
,
pReadH
->
smaFsIter
.
iter
);
if
(
pSmaFile
)
{
// match(indexName, queryWindow)
// TODO: select the file by index_name ...
pReadH
->
pDFile
=
pSmaFile
;
++
pReadH
->
smaFsIter
.
iter
;
break
;
}
++
pReadH
->
smaFsIter
.
iter
;
}
if
(
pReadH
->
pDFile
!=
NULL
)
{
tsdbDebug
(
"vg%d: smaFile %s matched"
,
REPO_ID
(
pReadH
->
pTsdb
),
"[pSmaFile dir]"
);
return
true
;
}
return
false
;
}
/**
* @brief Return the data between queryWin and fill the pData.
*
* @param pTsdb
* @param param
* @param pData
* @param queryWin
* @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
* @return int32_t
*/
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
,
STimeWindow
*
queryWin
,
int32_t
nMaxResult
)
{
STSmaReadH
tReadH
=
{
0
};
tsdbInitTSmaReadH
(
&
tReadH
,
pTsdb
,
param
,
pData
);
tsdbInitTSmaFile
(
&
tReadH
,
param
,
queryWin
);
int32_t
nResult
=
0
;
int64_t
lastKey
=
0
;
while
(
true
)
{
if
(
nResult
>=
nMaxResult
)
{
break
;
}
// set and open the file according to the STSma param
if
(
tsdbSetAndOpenTSmaFile
(
&
tReadH
,
param
,
queryWin
))
{
char
bTree
[
100
]
=
"
\0
"
;
while
(
strncmp
(
bTree
,
"has more nodes"
,
100
)
==
0
)
{
if
(
nResult
>=
nMaxResult
)
{
break
;
}
// tsdbGetDataFromBTree(bTree, queryWin, lastKey)
// fill the pData
++
nResult
;
}
}
}
// read data from file and fill the result
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Get the start TS key of the last data block of one interval/sliding.
*
* @param pTsdb
* @param param
* @param result
* @return int32_t
* 1) Return 0 and fill the result if the check procedure is normal;
* 2) Return -1 if error occurs during the check procedure.
*/
int32_t
tsdbGetTSmaStatus
(
STsdb
*
pTsdb
,
STSma
*
param
,
void
*
result
)
{
const
char
*
procedure
=
""
;
if
(
strncmp
(
procedure
,
"get the start TS key of the last data block"
,
100
)
!=
0
)
{
return
-
1
;
}
// fill the result
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Remove the tSma data files related to param between pWin.
*
* @param pTsdb
* @param param
* @param pWin
* @return int32_t
*/
int32_t
tsdbRemoveTSmaData
(
STsdb
*
pTsdb
,
STSma
*
param
,
STimeWindow
*
pWin
)
{
// for ("tSmaFiles of param-interval-sliding between pWin") {
// // remove the tSmaFile
// }
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbWrite.c
浏览文件 @
f76ca26c
...
...
@@ -15,6 +15,14 @@
#include "tsdbDef.h"
/**
* @brief insert TS data
*
* @param pTsdb
* @param pMsg
* @param pRsp
* @return int
*/
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
)
{
// Check if mem is there. If not, create one.
if
(
pTsdb
->
mem
==
NULL
)
{
...
...
@@ -24,4 +32,37 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
}
}
return
tsdbMemTableInsert
(
pTsdb
,
pTsdb
->
mem
,
pMsg
,
NULL
);
}
/**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
* TODO: Who is responsible for resource allocate and release?
*/
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbInsertTSmaDataImpl
(
pTsdb
,
param
,
pData
))
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
}
/**
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*/
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
SRSma
*
param
,
STSmaData
*
pData
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbInsertRSmaDataImpl
(
pTsdb
,
param
,
pData
))
<
0
)
{
tsdbWarn
(
"vgId:%d insert rSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
f76ca26c
...
...
@@ -132,6 +132,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if
(
tqProcessRebReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
break
;
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
// 1. tdCreateSmaMeta(pVnode->pMeta,...);
// 2. tdCreateSmaDataInit();
// 3. tdCreateSmaData
}
break
;
case
TDMT_VND_CANCEL_SMA
:
{
// timeRangeSMA
}
break
;
case
TDMT_VND_DROP_SMA
:
{
// timeRangeSMA
}
break
;
default:
ASSERT
(
0
);
break
;
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
f76ca26c
...
...
@@ -95,7 +95,7 @@ TEST(testCase, tSmaEncodeDecodeTest) {
// resource release
tdDestroyTSma
(
&
tSma
,
false
);
tdDestroyTSmaWrapper
(
&
dstTSmaWrapper
);
tdDestroyTSmaWrapper
(
&
dstTSmaWrapper
,
false
);
}
TEST
(
testCase
,
tSma_DB_Put_Get_Del_Test
)
{
...
...
@@ -161,7 +161,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
EXPECT_EQ
(
qSmaCfg
->
interval
,
tSma
.
interval
);
tdDestroyTSma
(
qSmaCfg
,
true
);
// get
valu
e by table uid
// get
index nam
e by table uid
SMSmaCursor
*
pSmaCur
=
metaOpenSmaCursor
(
pMeta
,
tbUid
);
assert
(
pSmaCur
!=
NULL
);
uint32_t
indexCnt
=
0
;
...
...
@@ -176,6 +176,15 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
EXPECT_EQ
(
indexCnt
,
2
);
metaCloseSmaCurosr
(
pSmaCur
);
// get wrapper by table uid
STSmaWrapper
*
pSW
=
metaGetSmaInfoByUid
(
pMeta
,
tbUid
);
assert
(
pSW
!=
NULL
);
EXPECT_EQ
(
pSW
->
number
,
2
);
EXPECT_STRCASEEQ
(
pSW
->
tSma
->
indexName
,
smaIndexName1
);
EXPECT_EQ
(
pSW
->
tSma
->
tableUid
,
tSma
.
tableUid
);
EXPECT_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
indexName
,
smaIndexName2
);
EXPECT_EQ
((
pSW
->
tSma
+
1
)
->
tableUid
,
tSma
.
tableUid
);
// resource release
metaRemoveSmaFromDb
(
pMeta
,
smaIndexName1
);
metaRemoveSmaFromDb
(
pMeta
,
smaIndexName2
);
...
...
@@ -197,15 +206,15 @@ TEST(testCase, tSmaInsertTest) {
int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t);
int32_t numOfColIds = 3;
int32_t numOf
Sma
Blocks = 10;
int32_t numOfBlocks = 10;
int32_t dataLen = numOfColIds * numOf
Sma
Blocks * blockSize;
int32_t dataLen = numOfColIds * numOfBlocks * blockSize;
pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen);
ASSERT_EQ(pSmaData != NULL, true);
pSmaData->tableUid = 3232329230;
pSmaData->numOfColIds = numOfColIds;
pSmaData->numOf
SmaBlocks = numOfSma
Blocks;
pSmaData->numOf
Blocks = numOf
Blocks;
pSmaData->dataLen = dataLen;
pSmaData->tsWindow.skey = 1640000000;
pSmaData->tsWindow.ekey = 1645788649;
...
...
source/libs/index/test/indexTests.cc
浏览文件 @
f76ca26c
...
...
@@ -1058,7 +1058,7 @@ TEST_F(IndexEnv2, testIndex_read_performance4) {
std
::
cout
<<
"reader sz: "
<<
index
->
SearchOne
(
"tag1"
,
"Hello"
)
<<
std
::
endl
;
assert
(
3
==
index
->
SearchOne
(
"tag10"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndex_del
)
{
TEST_F
(
IndexEnv2
,
testIndex_
cache_
del
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
...
...
@@ -1068,9 +1068,46 @@ TEST_F(IndexEnv2, testIndex_del) {
index
->
Del
(
"tag10"
,
"Hello"
,
12
);
index
->
Del
(
"tag10"
,
"Hello"
,
11
);
index
->
WriteMultiMillonData
(
"tag10"
,
"xxxxxxxxxxxxxx"
,
100
*
10000
);
// index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000);
index
->
Del
(
"tag10"
,
"Hello"
,
17
);
EXPECT_EQ
(
97
,
index
->
SearchOne
(
"tag10"
,
"Hello"
));
index
->
PutOneTarge
(
"tag10"
,
"Hello"
,
17
);
// add again
EXPECT_EQ
(
98
,
index
->
SearchOne
(
"tag10"
,
"Hello"
));
// std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
// assert(3 == index->SearchOne("tag10", "Hello"));
// del all
for
(
int
i
=
0
;
i
<
200
;
i
++
)
{
index
->
Del
(
"tag10"
,
"Hello"
,
i
);
}
EXPECT_EQ
(
0
,
index
->
SearchOne
(
"tag10"
,
"Hello"
));
// add other item
for
(
int
i
=
0
;
i
<
2000
;
i
++
)
{
index
->
PutOneTarge
(
"tag10"
,
"World"
,
i
);
}
for
(
int
i
=
0
;
i
<
2000
;
i
++
)
{
index
->
PutOneTarge
(
"tag10"
,
"Hello"
,
i
);
}
EXPECT_EQ
(
2000
,
index
->
SearchOne
(
"tag10"
,
"Hello"
));
for
(
int
i
=
0
;
i
<
2000
;
i
++
)
{
index
->
Del
(
"tag10"
,
"Hello"
,
i
);
}
EXPECT_EQ
(
0
,
index
->
SearchOne
(
"tag10"
,
"Hello"
));
}
TEST_F
(
IndexEnv2
,
testIndex_del
)
{
std
::
string
path
=
"/tmp/cache_and_tfile"
;
if
(
index
->
Init
(
path
)
!=
0
)
{
}
for
(
int
i
=
0
;
i
<
100
;
i
++
)
{
index
->
PutOneTarge
(
"tag10"
,
"Hello"
,
i
);
}
index
->
Del
(
"tag10"
,
"Hello"
,
12
);
index
->
Del
(
"tag10"
,
"Hello"
,
11
);
index
->
WriteMultiMillonData
(
"tag10"
,
"xxxxxxxxxxxxxx"
,
100
*
10000
);
index
->
Del
(
"tag10"
,
"Hello"
,
17
);
EXPECT_EQ
(
97
,
index
->
SearchOne
(
"tag10"
,
"Hello"
));
}
source/libs/monitor/inc/monInt.h
浏览文件 @
f76ca26c
...
...
@@ -54,6 +54,7 @@ typedef struct {
int32_t
maxLogs
;
const
char
*
server
;
uint16_t
port
;
bool
comp
;
SMonState
state
;
}
SMonitor
;
...
...
source/libs/monitor/src/monitor.c
浏览文件 @
f76ca26c
...
...
@@ -45,6 +45,7 @@ int32_t monInit(const SMonCfg *pCfg) {
tsMonitor
.
maxLogs
=
pCfg
->
maxLogs
;
tsMonitor
.
server
=
pCfg
->
server
;
tsMonitor
.
port
=
pCfg
->
port
;
tsMonitor
.
comp
=
pCfg
->
comp
;
tsLogFp
=
monRecordLog
;
tsMonitor
.
state
.
time
=
taosGetTimestampMs
();
pthread_mutex_init
(
&
tsMonitor
.
lock
,
NULL
);
...
...
@@ -375,7 +376,8 @@ void monSendReport(SMonInfo *pMonitor) {
char
*
pCont
=
tjsonToString
(
pMonitor
->
pJson
);
if
(
pCont
!=
NULL
)
{
taosSendHttpReport
(
tsMonitor
.
server
,
tsMonitor
.
port
,
pCont
,
strlen
(
pCont
));
EHttpCompFlag
flag
=
tsMonitor
.
comp
?
HTTP_GZIP
:
HTTP_FLAT
;
taosSendHttpReport
(
tsMonitor
.
server
,
tsMonitor
.
port
,
pCont
,
strlen
(
pCont
),
flag
);
free
(
pCont
);
}
}
source/libs/transport/inc/transComm.h
浏览文件 @
f76ca26c
...
...
@@ -20,8 +20,6 @@
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
...
...
source/libs/transport/inc/transportInt.h
浏览文件 @
f76ca26c
...
...
@@ -24,8 +24,6 @@
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
...
...
source/libs/transport/src/rpcTcp.c
浏览文件 @
f76ca26c
...
...
@@ -21,6 +21,7 @@
#include "taoserror.h"
#include "tutil.h"
#ifndef USE_UV
typedef
struct
SFdObj
{
void
*
signature
;
SOCKET
fd
;
// TCP socket FD
...
...
@@ -659,3 +660,4 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree
(
pFdObj
);
}
#endif
\ No newline at end of file
source/libs/transport/src/rpcUdp.c
浏览文件 @
f76ca26c
...
...
@@ -22,6 +22,8 @@
#include "ttimer.h"
#include "tutil.h"
#ifndef USE_UV
#define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5 // mseconds
...
...
@@ -257,3 +259,4 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c
return
ret
;
}
#endif
\ No newline at end of file
source/libs/transport/src/transSrv.c
浏览文件 @
f76ca26c
...
...
@@ -56,7 +56,7 @@ typedef struct SSrvMsg {
typedef
struct
SWorkThrdObj
{
pthread_t
thread
;
uv_pipe_t
*
pipe
;
int
fd
;
uv_os_fd_t
fd
;
uv_loop_t
*
loop
;
SAsyncPool
*
asyncPool
;
// uv_async_t* workerAsync; //
...
...
source/os/src/osFile.c
浏览文件 @
f76ca26c
...
...
@@ -667,17 +667,43 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) {
#else
int64_t
taosSendFile
(
SocketFd
fdDst
,
TdFilePtr
pFileSrc
,
int64_t
*
offset
,
int64_t
size
)
{
if
(
pFileSrc
==
NULL
)
{
// int64_t taosSendFile(int fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size) {
// if (pFileSrc == NULL) {
// return 0;
// }
// assert(pFileSrc->fd >= 0);
// int64_t leftbytes = size;
// int64_t sentbytes;
// while (leftbytes > 0) {
// sentbytes = sendfile(fdDst, pFileSrc->fd, offset, leftbytes);
// if (sentbytes == -1) {
// if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// continue;
// } else {
// return -1;
// }
// } else if (sentbytes == 0) {
// return (int64_t)(size - leftbytes);
// }
// leftbytes -= sentbytes;
// }
// return size;
// }
int64_t
taosFSendFile
(
TdFilePtr
pFileOut
,
TdFilePtr
pFileIn
,
int64_t
*
offset
,
int64_t
size
)
{
if
(
pFileOut
==
NULL
||
pFileIn
==
NULL
)
{
return
0
;
}
assert
(
pFileSrc
->
fd
>=
0
);
assert
(
pFileIn
->
fd
>=
0
&&
pFileOut
->
fd
>=
0
);
int64_t
leftbytes
=
size
;
int64_t
sentbytes
;
while
(
leftbytes
>
0
)
{
sentbytes
=
sendfile
(
fdDst
,
pFileSrc
->
fd
,
offset
,
leftbytes
);
sentbytes
=
sendfile
(
pFileOut
->
fd
,
pFileIn
->
fd
,
offset
,
leftbytes
);
if
(
sentbytes
==
-
1
)
{
if
(
errno
==
EINTR
||
errno
==
EAGAIN
||
errno
==
EWOULDBLOCK
)
{
continue
;
...
...
@@ -694,15 +720,6 @@ int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_
return
size
;
}
int64_t
taosFSendFile
(
TdFilePtr
pFileOut
,
TdFilePtr
pFileIn
,
int64_t
*
offset
,
int64_t
size
)
{
if
(
pFileOut
==
NULL
||
pFileIn
==
NULL
)
{
return
0
;
}
assert
(
pFileOut
->
fd
>=
0
);
return
taosSendFile
(
pFileOut
->
fd
,
pFileIn
,
offset
,
size
);
}
#endif
void
taosFprintfFile
(
TdFilePtr
pFile
,
const
char
*
format
,
...)
{
...
...
source/os/src/osSocket.c
浏览文件 @
f76ca26c
...
...
@@ -34,6 +34,24 @@
#include <unistd.h>
#endif
#ifndef USE_UV
// typedef struct TdSocketServer {
// #if SOCKET_WITH_LOCK
// pthread_rwlock_t rwlock;
// #endif
// int refId;
// SocketFd fd;
// } * TdSocketServerPtr, TdSocketServer;
// typedef struct TdSocketConnector {
// #if SOCKET_WITH_LOCK
// pthread_rwlock_t rwlock;
// #endif
// int refId;
// SocketFd fd;
// } * TdSocketConnectorPtr, TdSocketConnector;
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags)
...
...
@@ -115,15 +133,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
void
taosIgnSIGPIPE
()
{
signal
(
SIGPIPE
,
SIG_IGN
);
}
void
taosBlockSIGPIPE
()
{
sigset_t
signal_mask
;
sigemptyset
(
&
signal_mask
);
sigaddset
(
&
signal_mask
,
SIGPIPE
);
int32_t
rc
=
pthread_sigmask
(
SIG_BLOCK
,
&
signal_mask
,
NULL
);
if
(
rc
!=
0
)
{
//printf("failed to block SIGPIPE");
}
}
void
taosSetMaskSIGPIPE
()
{
sigset_t
signal_mask
;
...
...
@@ -215,7 +224,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
}
void
taosIgnSIGPIPE
()
{}
void
taosBlockSIGPIPE
()
{}
void
taosSetMaskSIGPIPE
()
{}
int32_t
taosSetSockOpt
(
SOCKET
socketfd
,
int32_t
level
,
int32_t
optname
,
void
*
optval
,
int32_t
optlen
)
{
...
...
@@ -786,3 +794,21 @@ int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
return
len
;
}
#endif
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
void
taosBlockSIGPIPE
()
{
sigset_t
signal_mask
;
sigemptyset
(
&
signal_mask
);
sigaddset
(
&
signal_mask
,
SIGPIPE
);
int32_t
rc
=
pthread_sigmask
(
SIG_BLOCK
,
&
signal_mask
,
NULL
);
if
(
rc
!=
0
)
{
//printf("failed to block SIGPIPE");
}
}
#else
void
taosBlockSIGPIPE
()
{}
#endif
\ No newline at end of file
source/util/CMakeLists.txt
浏览文件 @
f76ca26c
...
...
@@ -10,8 +10,14 @@ target_link_libraries(
util
PRIVATE os
PUBLIC lz4_static
PUBLIC api cjson
PUBLIC api cjson
zlib
)
if
(
${
BUILD_WITH_UV
}
)
target_link_libraries
(
util
PUBLIC uv_a
)
endif
(
${
BUILD_TEST
}
)
if
(
${
BUILD_TEST
}
)
ADD_SUBDIRECTORY
(
test
)
...
...
source/util/src/terror.c
浏览文件 @
f76ca26c
...
...
@@ -68,6 +68,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t
TAOS_DEFINE_ERROR
(
TSDB_CODE_APP_NOT_READY
,
"Database not ready"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RPC_FQDN_ERROR
,
"Unable to resolve FQDN"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RPC_INVALID_VERSION
,
"Invalid app version"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_COMPRESS_ERROR
,
"Failed to compress msg"
)
//common & util
TAOS_DEFINE_ERROR
(
TSDB_CODE_OPS_NOT_SUPPORT
,
"Operation not supported"
)
...
...
source/util/src/thttp.c
浏览文件 @
f76ca26c
...
...
@@ -17,8 +17,163 @@
#include "thttp.h"
#include "taoserror.h"
#include "tlog.h"
#include "zlib.h"
int32_t
taosSendHttpReport
(
const
char
*
server
,
uint16_t
port
,
const
char
*
pCont
,
int32_t
contLen
)
{
static
int32_t
taosBuildHttpHeader
(
const
char
*
server
,
int32_t
contLen
,
char
*
pHead
,
int32_t
headLen
,
EHttpCompFlag
flag
)
{
if
(
flag
==
HTTP_FLAT
)
{
return
snprintf
(
pHead
,
headLen
,
"POST /report HTTP/1.1
\n
"
"Host: %s
\n
"
"Content-Type: application/json
\n
"
"Content-Length: %d
\n\n
"
,
server
,
contLen
);
}
else
if
(
flag
==
HTTP_GZIP
)
{
return
snprintf
(
pHead
,
headLen
,
"POST /report HTTP/1.1
\n
"
"Host: %s
\n
"
"Content-Type: application/json
\n
"
"Content-Encoding: gzip
\n
"
"Content-Length: %d
\n\n
"
,
server
,
contLen
);
}
else
{
return
-
1
;
}
}
int32_t
taosCompressHttpRport
(
char
*
pSrc
,
int32_t
srcLen
)
{
int32_t
code
=
-
1
;
int32_t
destLen
=
srcLen
;
void
*
pDest
=
malloc
(
destLen
);
if
(
pDest
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_OVER
;
}
z_stream
gzipStream
=
{
0
};
gzipStream
.
zalloc
=
(
alloc_func
)
0
;
gzipStream
.
zfree
=
(
free_func
)
0
;
gzipStream
.
opaque
=
(
voidpf
)
0
;
if
(
deflateInit2
(
&
gzipStream
,
Z_DEFAULT_COMPRESSION
,
Z_DEFLATED
,
MAX_WBITS
+
16
,
8
,
Z_DEFAULT_STRATEGY
)
!=
Z_OK
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_OVER
;
}
gzipStream
.
next_in
=
(
Bytef
*
)
pSrc
;
gzipStream
.
avail_in
=
(
uLong
)
srcLen
;
gzipStream
.
next_out
=
(
Bytef
*
)
pDest
;
gzipStream
.
avail_out
=
(
uLong
)(
destLen
);
while
(
gzipStream
.
avail_in
!=
0
&&
gzipStream
.
total_out
<
(
uLong
)(
destLen
))
{
if
(
deflate
(
&
gzipStream
,
Z_FULL_FLUSH
)
!=
Z_OK
)
{
terrno
=
TSDB_CODE_COMPRESS_ERROR
;
goto
_OVER
;
}
}
if
(
gzipStream
.
avail_in
!=
0
)
{
terrno
=
TSDB_CODE_COMPRESS_ERROR
;
goto
_OVER
;
}
int32_t
err
=
0
;
while
(
1
)
{
if
((
err
=
deflate
(
&
gzipStream
,
Z_FINISH
))
==
Z_STREAM_END
)
{
break
;
}
if
(
err
!=
Z_OK
)
{
terrno
=
TSDB_CODE_COMPRESS_ERROR
;
goto
_OVER
;
}
}
if
(
deflateEnd
(
&
gzipStream
)
!=
Z_OK
)
{
terrno
=
TSDB_CODE_COMPRESS_ERROR
;
goto
_OVER
;
}
if
(
gzipStream
.
total_out
>=
srcLen
)
{
terrno
=
TSDB_CODE_COMPRESS_ERROR
;
goto
_OVER
;
}
code
=
0
;
_OVER:
if
(
code
==
0
)
{
memcpy
(
pSrc
,
pDest
,
gzipStream
.
total_out
);
code
=
gzipStream
.
total_out
;
}
free
(
pDest
);
return
code
;
}
#ifdef USE_UV
static
void
clientConnCb
(
uv_connect_t
*
req
,
int32_t
status
)
{
if
(
status
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
status
);
uError
(
"Connection error %s
\n
"
,
uv_strerror
(
status
));
return
;
}
// impl later
uv_buf_t
*
wb
=
req
->
data
;
if
(
wb
==
NULL
)
{
uv_close
((
uv_handle_t
*
)
req
->
handle
,
NULL
);
}
uv_write_t
write_req
;
uv_write
(
&
write_req
,
req
->
handle
,
wb
,
2
,
NULL
);
uv_close
((
uv_handle_t
*
)
req
->
handle
,
NULL
);
}
int32_t
taosSendHttpReport
(
const
char
*
server
,
uint16_t
port
,
const
char
*
pCont
,
int32_t
contLen
,
EHttpCompFlag
flag
)
{
uint32_t
ipv4
=
taosGetIpv4FromFqdn
(
server
);
if
(
ipv4
==
0xffffffff
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to get http server:%s ip since %s"
,
server
,
terrstr
());
return
-
1
;
}
char
ipv4Buf
[
128
]
=
{
0
};
tinet_ntoa
(
ipv4Buf
,
ipv4
);
struct
sockaddr_in
dest
=
{
0
};
uv_ip4_addr
(
ipv4Buf
,
port
,
&
dest
);
uv_tcp_t
socket_tcp
=
{
0
};
uv_loop_t
*
loop
=
uv_default_loop
();
uv_tcp_init
(
loop
,
&
socket_tcp
);
uv_connect_t
*
connect
=
(
uv_connect_t
*
)
malloc
(
sizeof
(
uv_connect_t
));
if
(
flag
==
HTTP_GZIP
)
{
int32_t
dstLen
=
taosCompressHttpRport
(
pCont
,
contLen
);
if
(
dstLen
>
0
)
{
contLen
=
dstLen
;
}
else
{
flag
=
HTTP_FLAT
;
}
}
char
header
[
1024
]
=
{
0
};
int32_t
headLen
=
taosBuildHttpHeader
(
server
,
contLen
,
header
,
sizeof
(
header
),
flag
);
uv_buf_t
wb
[
2
];
wb
[
0
]
=
uv_buf_init
((
char
*
)
header
,
headLen
);
wb
[
1
]
=
uv_buf_init
((
char
*
)
pCont
,
contLen
);
connect
->
data
=
wb
;
uv_tcp_connect
(
connect
,
&
socket_tcp
,
(
const
struct
sockaddr
*
)
&
dest
,
clientConnCb
);
terrno
=
0
;
uv_run
(
loop
,
UV_RUN_DEFAULT
);
uv_loop_close
(
loop
);
free
(
connect
);
return
terrno
;
}
#else
int32_t
taosSendHttpReport
(
const
char
*
server
,
uint16_t
port
,
char
*
pCont
,
int32_t
contLen
,
EHttpCompFlag
flag
)
{
int32_t
code
=
-
1
;
SOCKET
fd
=
0
;
...
...
@@ -36,15 +191,19 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
goto
SEND_OVER
;
}
char
header
[
4096
]
=
{
0
};
int32_t
headLen
=
snprintf
(
header
,
sizeof
(
header
),
"POST /report HTTP/1.1
\n
"
"Host: %s
\n
"
"Content-Type: application/json
\n
"
"Content-Length: %d
\n\n
"
,
server
,
contLen
);
if
(
flag
==
HTTP_GZIP
)
{
int32_t
dstLen
=
taosCompressHttpRport
(
pCont
,
contLen
);
if
(
dstLen
>
0
)
{
contLen
=
dstLen
;
}
else
{
flag
=
HTTP_FLAT
;
}
}
char
header
[
1024
]
=
{
0
};
int32_t
headLen
=
taosBuildHttpHeader
(
server
,
contLen
,
header
,
sizeof
(
header
),
flag
);
if
(
taosWriteSocket
(
fd
,
(
void
*
)
header
,
headLen
)
<
0
)
{
if
(
taosWriteSocket
(
fd
,
header
,
headLen
)
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to send http header to %s:%u since %s"
,
server
,
port
,
terrstr
());
goto
SEND_OVER
;
...
...
@@ -63,7 +222,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
goto
SEND_OVER
;
}
uTrace
(
"send http to %s:%u, len:%d content: %s"
,
server
,
port
,
contLen
,
pCont
);
code
=
0
;
SEND_OVER:
...
...
@@ -73,3 +231,5 @@ SEND_OVER:
return
code
;
}
#endif
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录