Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
e8933e23
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e8933e23
编写于
7月 06, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/2.0tsdb
上级
c86b6877
2418c537
变更
44
展开全部
隐藏空白更改
内联
并排
Showing
44 changed file
with
406 addition
and
241 deletion
+406
-241
cmake/define.inc
cmake/define.inc
+4
-0
cmake/input.inc
cmake/input.inc
+5
-0
src/client/inc/tscLog.h
src/client/inc/tscLog.h
+1
-3
src/client/src/TSDBJNIConnector.c
src/client/src/TSDBJNIConnector.c
+0
-3
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+1
-1
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+3
-5
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+1
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+6
-4
src/client/src/tscStream.c
src/client/src/tscStream.c
+1
-1
src/common/inc/tname.h
src/common/inc/tname.h
+2
-0
src/common/inc/tulog.h
src/common/inc/tulog.h
+0
-3
src/common/src/tglobal.c
src/common/src/tglobal.c
+1
-1
src/common/src/tname.c
src/common/src/tname.c
+30
-0
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+10
-6
src/dnode/src/dnodeVWrite.c
src/dnode/src/dnodeVWrite.c
+5
-0
src/inc/taosmsg.h
src/inc/taosmsg.h
+2
-0
src/inc/tsdb.h
src/inc/tsdb.h
+3
-1
src/inc/vnode.h
src/inc/vnode.h
+3
-0
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+2
-0
src/os/linux/inc/os.h
src/os/linux/inc/os.h
+19
-0
src/os/linux/src/linuxPlatform.c
src/os/linux/src/linuxPlatform.c
+46
-0
src/os/linux/src/linuxSysPara.c
src/os/linux/src/linuxSysPara.c
+9
-10
src/plugins/http/inc/httpLog.h
src/plugins/http/inc/httpLog.h
+1
-3
src/plugins/http/src/httpHandle.c
src/plugins/http/src/httpHandle.c
+3
-3
src/plugins/http/src/httpServer.c
src/plugins/http/src/httpServer.c
+3
-3
src/plugins/http/src/httpSql.c
src/plugins/http/src/httpSql.c
+4
-4
src/plugins/monitor/src/monitorMain.c
src/plugins/monitor/src/monitorMain.c
+0
-3
src/plugins/mqtt/inc/mqttLog.h
src/plugins/mqtt/inc/mqttLog.h
+0
-3
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+7
-2
src/query/inc/qfill.h
src/query/inc/qfill.h
+0
-2
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+154
-98
src/query/src/qUtil.c
src/query/src/qUtil.c
+25
-10
src/query/src/qextbuffer.c
src/query/src/qextbuffer.c
+2
-1
src/query/src/qfill.c
src/query/src/qfill.c
+1
-36
src/rpc/inc/rpcLog.h
src/rpc/inc/rpcLog.h
+1
-3
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+2
-2
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+25
-2
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+2
-0
src/util/inc/tlog.h
src/util/inc/tlog.h
+1
-0
src/util/src/tfile.c
src/util/src/tfile.c
+4
-4
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+10
-21
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+1
-1
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+5
-0
tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
+1
-1
未找到文件。
cmake/define.inc
浏览文件 @
e8933e23
...
...
@@ -28,3 +28,7 @@ ENDIF ()
IF
(
TD_RANDOM_FILE_FAIL
)
ADD_DEFINITIONS
(
-
DTAOS_RANDOM_FILE_FAIL
)
ENDIF
()
IF
(
TD_RANDOM_NETWORK_FAIL
)
ADD_DEFINITIONS
(
-
DTAOS_RANDOM_NETWORK_FAIL
)
ENDIF
()
cmake/input.inc
浏览文件 @
e8933e23
...
...
@@ -36,3 +36,8 @@ IF (${RANDOM_FILE_FAIL} MATCHES "true")
SET
(
TD_RANDOM_FILE_FAIL
TRUE
)
MESSAGE
(
STATUS
"build with random-file-fail enabled"
)
ENDIF
()
IF
(
$
{
RANDOM_NETWORK_FAIL
}
MATCHES
"true"
)
SET
(
TD_RANDOM_NETWORK_FAIL
TRUE
)
MESSAGE
(
STATUS
"build with random-network-fail enabled"
)
ENDIF
()
src/client/inc/tscLog.h
浏览文件 @
e8933e23
...
...
@@ -31,9 +31,7 @@ extern int32_t tscEmbedded;
#define tscInfo(...) { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC INFO ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }}
#define tscDebug(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }}
#define tscTrace(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC TRACE ", cDebugFlag, __VA_ARGS__); }}
#define tscDebugDump(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }}
#define tscTraceDump(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLongString("TSC TRACE ", cDebugFlag, __VA_ARGS__); }}
#define tscDebugL(...){ if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }}
#ifdef __cplusplus
}
...
...
src/client/src/TSDBJNIConnector.c
浏览文件 @
e8933e23
...
...
@@ -29,9 +29,6 @@
#define jniDebug(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLog("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }}
#define jniTrace(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLog("JNI TRACE ", jniDebugFlag, __VA_ARGS__); }}
#define jniDebugDump(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLongString("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }}
#define jniTraceDump(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLongString("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }}
int
__init
=
0
;
JavaVM
*
g_vm
=
NULL
;
...
...
src/client/src/tscAsync.c
浏览文件 @
e8933e23
...
...
@@ -55,7 +55,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
strtolower
(
pSql
->
sqlstr
,
sqlstr
);
tscDebug
Dump
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
tscDebug
L
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
pSql
->
cmd
.
curSql
=
pSql
->
sqlstr
;
int32_t
code
=
tsParseSql
(
pSql
,
true
);
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
e8933e23
...
...
@@ -364,7 +364,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
TSKEY
stime
=
MIN
(
pQueryInfo
->
window
.
skey
,
pQueryInfo
->
window
.
ekey
);
int64_t
revisedSTime
=
taosGetIntervalStartTimestamp
(
stime
,
pQueryInfo
->
intervalTime
,
pQueryInfo
->
slidingTimeUnit
,
tinfo
.
precision
);
taosGetIntervalStartTimestamp
(
stime
,
pQueryInfo
->
slidingTime
,
pQueryInfo
->
intervalTime
,
pQueryInfo
->
slidingTimeUnit
,
tinfo
.
precision
);
if
(
pQueryInfo
->
fillType
!=
TSDB_FILL_NONE
)
{
SFillColInfo
*
pFillCol
=
createFillColInfo
(
pQueryInfo
);
...
...
@@ -831,7 +831,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
if
(
pFillInfo
!=
NULL
)
{
int64_t
stime
=
(
pQueryInfo
->
window
.
skey
<
pQueryInfo
->
window
.
ekey
)
?
pQueryInfo
->
window
.
skey
:
pQueryInfo
->
window
.
ekey
;
int64_t
revisedSTime
=
taosGetIntervalStartTimestamp
(
stime
,
pQueryInfo
->
slidingTime
,
pQueryInfo
->
slidingTimeUnit
,
tinfo
.
precision
);
taosGetIntervalStartTimestamp
(
stime
,
pQueryInfo
->
slidingTime
,
pQueryInfo
->
intervalTime
,
pQueryInfo
->
slidingTimeUnit
,
tinfo
.
precision
);
taosResetFillInfo
(
pFillInfo
,
revisedSTime
);
}
...
...
@@ -1301,9 +1301,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
if
(
pQueryInfo
->
fillType
!=
TSDB_FILL_NONE
)
{
TSKEY
skey
=
MIN
(
pQueryInfo
->
window
.
skey
,
pQueryInfo
->
window
.
ekey
);
int64_t
newTime
=
taosGetIntervalStartTimestamp
(
skey
,
pQueryInfo
->
intervalTime
,
pQueryInfo
->
slidingTimeUnit
,
precision
);
// taosResetFillInfo(pLocalReducer->pFillInfo, pQueryInfo->order.order, newTime,
// pQueryInfo->groupbyExpr.numOfGroupCols, 4096, 0, NULL, pLocalReducer->rowSize);
taosGetIntervalStartTimestamp
(
skey
,
pQueryInfo
->
slidingTime
,
pQueryInfo
->
intervalTime
,
pQueryInfo
->
slidingTimeUnit
,
precision
);
taosResetFillInfo
(
pLocalReducer
->
pFillInfo
,
newTime
);
}
}
...
...
src/client/src/tscPrepare.c
浏览文件 @
e8933e23
...
...
@@ -538,7 +538,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pRes
->
numOfRows
=
1
;
strtolower
(
pSql
->
sqlstr
,
sql
);
tscDebug
Dump
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
tscDebug
L
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
if
(
tscIsInsertData
(
pSql
->
sqlstr
))
{
pStmt
->
isInsert
=
true
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
e8933e23
...
...
@@ -4487,10 +4487,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SUpdateTableTagValMsg
*
pUpdateMsg
=
(
SUpdateTableTagValMsg
*
)
pCmd
->
payload
;
pUpdateMsg
->
head
.
vgId
=
htonl
(
pTableMeta
->
vgroupInfo
.
vgId
);
pUpdateMsg
->
tid
=
htonl
(
pTableMeta
->
sid
);
pUpdateMsg
->
uid
=
htobe64
(
pTableMeta
->
uid
);
pUpdateMsg
->
colId
=
htons
(
pTagsSchema
->
colId
);
pUpdateMsg
->
tversion
=
htons
(
pTableMeta
->
tversion
);
pUpdateMsg
->
tid
=
htonl
(
pTableMeta
->
sid
);
pUpdateMsg
->
uid
=
htobe64
(
pTableMeta
->
uid
);
pUpdateMsg
->
colId
=
htons
(
pTagsSchema
->
colId
);
pUpdateMsg
->
type
=
htons
(
pTagsSchema
->
type
);
pUpdateMsg
->
bytes
=
htons
(
pTagsSchema
->
bytes
);
pUpdateMsg
->
tversion
=
htons
(
pTableMeta
->
tversion
);
pUpdateMsg
->
numOfTags
=
htons
(
numOfTags
);
pUpdateMsg
->
schemaLen
=
htonl
(
schemaLen
);
...
...
src/client/src/tscStream.c
浏览文件 @
e8933e23
...
...
@@ -503,7 +503,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
strtolower
(
pSql
->
sqlstr
,
sqlstr
);
tscDebug
Dump
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
tscDebug
L
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
int32_t
code
=
tsParseSql
(
pSql
,
true
);
...
...
src/common/inc/tname.h
浏览文件 @
e8933e23
...
...
@@ -29,4 +29,6 @@ bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo
*
tscFilterInfoClone
(
const
SColumnFilterInfo
*
src
,
int32_t
numOfFilters
);
int64_t
taosGetIntervalStartTimestamp
(
int64_t
startTime
,
int64_t
slidingTime
,
int64_t
intervalTime
,
char
timeUnit
,
int16_t
precision
);
#endif // TDENGINE_NAME_H
src/common/inc/tulog.h
浏览文件 @
e8933e23
...
...
@@ -32,9 +32,6 @@ extern int32_t tscEmbedded;
#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL DEBUG ", uDebugFlag, __VA_ARGS__); }}
#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL TRACE ", uDebugFlag, __VA_ARGS__); }}
#define uDebugDump(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLongString("UTL DEBUG ", uDebugFlag, __VA_ARGS__); }}
#define uTraceDump(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLongString("UTL TRACE ", uDebugFlag, __VA_ARGS__); }}
#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP INFO ", 255, __VA_ARGS__); }
...
...
src/common/src/tglobal.c
浏览文件 @
e8933e23
...
...
@@ -1210,7 +1210,7 @@ void taosInitGlobalCfg() {
}
bool
taosCheckGlobalCfg
()
{
if
(
debugFlag
&
DEBUG_TRACE
||
debugFlag
&
DEBUG_DEBUG
)
{
if
(
debugFlag
&
DEBUG_TRACE
||
debugFlag
&
DEBUG_DEBUG
||
debugFlag
&
DEBUG_DUMP
)
{
taosSetAllDebugFlag
();
}
...
...
src/common/src/tname.c
浏览文件 @
e8933e23
...
...
@@ -75,3 +75,33 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO
return
pFilter
;
}
int64_t
taosGetIntervalStartTimestamp
(
int64_t
startTime
,
int64_t
slidingTime
,
int64_t
intervalTime
,
char
timeUnit
,
int16_t
precision
)
{
if
(
slidingTime
==
0
)
{
return
startTime
;
}
int64_t
start
=
((
startTime
-
intervalTime
)
/
slidingTime
+
1
)
*
slidingTime
;
if
(
!
(
timeUnit
==
'a'
||
timeUnit
==
'm'
||
timeUnit
==
's'
||
timeUnit
==
'h'
))
{
/*
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t
timezone
=
_timezone
;
int32_t
daylight
=
_daylight
;
char
**
tzname
=
_tzname
;
#endif
int64_t
t
=
(
precision
==
TSDB_TIME_PRECISION_MILLI
)
?
MILLISECOND_PER_SECOND
:
MILLISECOND_PER_SECOND
*
1000L
;
start
+=
timezone
*
t
;
}
int64_t
end
=
start
+
intervalTime
-
1
;
if
(
end
<
startTime
)
{
start
+=
slidingTime
;
}
return
start
;
}
src/dnode/src/dnodeMgmt.c
浏览文件 @
e8933e23
...
...
@@ -106,6 +106,12 @@ int32_t dnodeInitMgmt() {
}
}
int32_t
code
=
vnodeInitResources
();
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
dnodeCleanupMgmt
();
return
-
1
;
}
// create the queue and thread to handle the message
tsMgmtQset
=
taosOpenQset
();
if
(
tsMgmtQset
==
NULL
)
{
...
...
@@ -127,7 +133,7 @@ int32_t dnodeInitMgmt() {
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
int32_t
code
=
pthread_create
(
&
tsQthread
,
&
thAttr
,
dnodeProcessMgmtQueue
,
NULL
);
code
=
pthread_create
(
&
tsQthread
,
&
thAttr
,
dnodeProcessMgmtQueue
,
NULL
);
pthread_attr_destroy
(
&
thAttr
);
if
(
code
!=
0
)
{
dError
(
"failed to create thread to process mgmt queue, reason:%s"
,
strerror
(
errno
));
...
...
@@ -282,13 +288,12 @@ static void *dnodeOpenVnode(void *param) {
}
static
int32_t
dnodeOpenVnodes
()
{
int32_t
*
vnodeList
=
calloc
(
TSDB_MAX_VNODES
,
sizeof
(
int32_t
))
;
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
}
;
int32_t
numOfVnodes
=
0
;
int32_t
status
=
dnodeGetVnodeList
(
vnodeList
,
&
numOfVnodes
);
if
(
status
!=
TSDB_CODE_SUCCESS
)
{
dInfo
(
"get dnode list failed"
);
free
(
vnodeList
);
return
status
;
}
...
...
@@ -334,7 +339,6 @@ static int32_t dnodeOpenVnodes() {
free
(
pThread
->
vnodeList
);
}
free
(
vnodeList
);
free
(
threads
);
dInfo
(
"there are total vnodes:%d, openned:%d failed:%d"
,
numOfVnodes
,
openVnodes
,
failedVnodes
);
...
...
@@ -342,7 +346,7 @@ static int32_t dnodeOpenVnodes() {
}
void
dnodeStartStream
()
{
int32_t
vnodeList
[
TSDB_MAX_VNODES
];
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
}
;
int32_t
numOfVnodes
=
0
;
int32_t
status
=
vnodeGetVnodeList
(
vnodeList
,
&
numOfVnodes
);
...
...
@@ -359,7 +363,7 @@ void dnodeStartStream() {
}
static
void
dnodeCloseVnodes
()
{
int32_t
vnodeList
[
TSDB_MAX_VNODES
];
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
}
;
int32_t
numOfVnodes
=
0
;
int32_t
status
;
...
...
src/dnode/src/dnodeVWrite.c
浏览文件 @
e8933e23
...
...
@@ -247,6 +247,11 @@ static void *dnodeProcessWriteQueue(void *param) {
if
(
type
==
TAOS_QTYPE_RPC
)
{
pWrite
=
(
SWriteMsg
*
)
item
;
dnodeSendRpcVnodeWriteRsp
(
pVnode
,
item
,
pWrite
->
rpcMsg
.
code
);
}
else
if
(
type
==
TAOS_QTYPE_FWD
)
{
pHead
=
(
SWalHead
*
)
item
;
vnodeConfirmForward
(
pVnode
,
pHead
->
version
,
0
);
taosFreeQitem
(
item
);
vnodeRelease
(
pVnode
);
}
else
{
taosFreeQitem
(
item
);
vnodeRelease
(
pVnode
);
...
...
src/inc/taosmsg.h
浏览文件 @
e8933e23
...
...
@@ -284,6 +284,8 @@ typedef struct {
int32_t
tid
;
int16_t
tversion
;
int16_t
colId
;
int16_t
type
;
int16_t
bytes
;
int32_t
tagValLen
;
int16_t
numOfTags
;
int32_t
schemaLen
;
...
...
src/inc/tsdb.h
浏览文件 @
e8933e23
...
...
@@ -108,7 +108,9 @@ void tsdbClearTableCfg(STableCfg *config);
void
*
tsdbGetTableTagVal
(
const
void
*
pTable
,
int32_t
colId
,
int16_t
type
,
int16_t
bytes
);
char
*
tsdbGetTableName
(
void
*
pTable
);
STableId
tsdbGetTableId
(
void
*
pTable
);
#define TSDB_TABLEID(_table) ((STableId*) (_table))
STableCfg
*
tsdbCreateTableCfgFromMsg
(
SMDCreateTableMsg
*
pMsg
);
int
tsdbCreateTable
(
TSDB_REPO_T
*
repo
,
STableCfg
*
pCfg
);
...
...
src/inc/vnode.h
浏览文件 @
e8933e23
...
...
@@ -60,7 +60,10 @@ void* vnodeGetWal(void *pVnode);
int32_t
vnodeProcessWrite
(
void
*
pVnode
,
int
qtype
,
void
*
pHead
,
void
*
item
);
int32_t
vnodeGetVnodeList
(
int32_t
vnodeList
[],
int32_t
*
numOfVnodes
);
void
vnodeBuildStatusMsg
(
void
*
param
);
void
vnodeConfirmForward
(
void
*
param
,
uint64_t
version
,
int32_t
code
);
void
vnodeSetAccess
(
SDMVgroupAccess
*
pAccess
,
int32_t
numOfVnodes
);
int32_t
vnodeInitResources
();
void
vnodeCleanupResources
();
int32_t
vnodeProcessRead
(
void
*
pVnode
,
SReadMsg
*
pReadMsg
);
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
e8933e23
...
...
@@ -165,6 +165,8 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
}
mnodeDecDnodeRef
(
pDnode
);
}
free
(
pNew
);
}
mnodeVgroupUpdateIdPool
(
pVgroup
);
...
...
src/os/linux/inc/os.h
浏览文件 @
e8933e23
...
...
@@ -86,9 +86,28 @@ extern "C" {
} \
}
#ifdef TAOS_RANDOM_NETWORK_FAIL
ssize_t
taos_send_random_fail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ssize_t
taos_sendto_random_fail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
int
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
);
ssize_t
taos_read_random_fail
(
int
fd
,
void
*
buf
,
size_t
count
);
ssize_t
taos_write_random_fail
(
int
fd
,
const
void
*
buf
,
size_t
count
);
#define send(sockfd, buf, len, flags) taos_send_random_fail(sockfd, buf, len, flags)
#define sendto(sockfd, buf, len, flags, dest_addr, addrlen) \
taos_sendto_random_fail(sockfd, buf, len, flags, dest_addr, addrlen)
#define taosWriteSocket(fd, buf, len) taos_write_random_fail(fd, buf, len)
#define taosReadSocket(fd, buf, len) taos_read_random_fail(fd, buf, len)
#else
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
#endif
/* TAOS_RANDOM_NETWORK_FAIL */
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
...
...
src/os/linux/src/linuxPlatform.c
浏览文件 @
e8933e23
...
...
@@ -270,3 +270,49 @@ int tSystem(const char * cmd)
}
}
#ifdef TAOS_RANDOM_NETWORK_FAIL
#define RANDOM_NETWORK_FAIL_FACTOR 20
ssize_t
taos_send_random_fail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
int
flags
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
}
return
send
(
sockfd
,
buf
,
len
,
flags
);
}
ssize_t
taos_sendto_random_fail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
int
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
}
return
sendto
(
sockfd
,
buf
,
len
,
flags
,
dest_addr
,
addrlen
);
}
ssize_t
taos_read_random_fail
(
int
fd
,
void
*
buf
,
size_t
count
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
}
return
read
(
fd
,
buf
,
count
);
}
ssize_t
taos_write_random_fail
(
int
fd
,
const
void
*
buf
,
size_t
count
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
EINTR
;
return
-
1
;
}
return
write
(
fd
,
buf
,
count
);
}
#endif
/* TAOS_RANDOM_NETWORK_FAIL */
src/os/linux/src/linuxSysPara.c
浏览文件 @
e8933e23
...
...
@@ -160,7 +160,7 @@ static void taosGetSystemTimezone() {
/* load time zone string from /etc/timezone */
FILE
*
f
=
fopen
(
"/etc/timezone"
,
"r"
);
char
buf
[
6
5
]
=
{
0
};
char
buf
[
6
8
]
=
{
0
};
if
(
f
!=
NULL
)
{
int
len
=
fread
(
buf
,
64
,
1
,
f
);
if
(
len
<
64
&&
ferror
(
f
))
{
...
...
@@ -170,18 +170,17 @@ static void taosGetSystemTimezone() {
}
fclose
(
f
);
}
char
*
lineEnd
=
strstr
(
buf
,
"
\n
"
);
if
(
lineEnd
!=
NULL
)
{
*
lineEnd
=
0
;
}
char
*
lineEnd
=
strstr
(
buf
,
"
\n
"
);
if
(
lineEnd
!=
NULL
)
{
*
lineEnd
=
0
;
}
// for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
if
(
strlen
(
buf
)
>
0
)
{
setenv
(
"TZ"
,
buf
,
1
);
// for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
if
(
strlen
(
buf
)
>
0
)
{
setenv
(
"TZ"
,
buf
,
1
);
}
}
// get and set default timezone
tzset
();
...
...
src/plugins/http/inc/httpLog.h
浏览文件 @
e8933e23
...
...
@@ -26,8 +26,6 @@ extern int32_t httpDebugFlag;
#define httpInfo(...) { if (httpDebugFlag & DEBUG_INFO) { taosPrintLog("HTP INFO ", 255, __VA_ARGS__); }}
#define httpDebug(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLog("HTP DEBUG ", httpDebugFlag, __VA_ARGS__); }}
#define httpTrace(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLog("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }}
#define httpDebugDump(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLongString("HTP DEBUG ", httpDebugFlag, __VA_ARGS__); }}
#define httpTraceDump(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }}
#define httpTraceL(...){ if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }}
#endif
src/plugins/http/src/httpHandle.c
浏览文件 @
e8933e23
...
...
@@ -313,9 +313,9 @@ bool httpParseRequest(HttpContext* pContext) {
return
true
;
}
httpTrace
Dump
(
"context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:
\n
%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
pThread
->
label
,
pContext
->
pThread
->
numOfFds
,
pContext
->
parser
.
bufsize
,
pContext
->
parser
.
buffer
);
httpTrace
L
(
"context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:
\n
%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
pThread
->
label
,
pContext
->
pThread
->
numOfFds
,
pContext
->
parser
.
bufsize
,
pContext
->
parser
.
buffer
);
if
(
!
httpGetHttpMethod
(
pContext
))
{
return
false
;
...
...
src/plugins/http/src/httpServer.c
浏览文件 @
e8933e23
...
...
@@ -108,7 +108,7 @@ bool httpReadDataImp(HttpContext *pContext) {
static
bool
httpDecompressData
(
HttpContext
*
pContext
)
{
if
(
pContext
->
contentEncoding
!=
HTTP_COMPRESS_GZIP
)
{
httpTrace
Dump
(
"context:%p, fd:%d, ip:%s, content:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
parser
.
data
.
pos
);
httpTrace
L
(
"context:%p, fd:%d, ip:%s, content:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
parser
.
data
.
pos
);
return
true
;
}
...
...
@@ -124,8 +124,8 @@ static bool httpDecompressData(HttpContext *pContext) {
if
(
ret
==
0
)
{
memcpy
(
pContext
->
parser
.
data
.
pos
,
decompressBuf
,
decompressBufLen
);
pContext
->
parser
.
data
.
pos
[
decompressBufLen
]
=
0
;
httpTrace
Dump
(
"context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
parser
.
data
.
len
,
decompressBufLen
,
decompressBuf
);
httpTrace
L
(
"context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
parser
.
data
.
len
,
decompressBufLen
,
decompressBuf
);
pContext
->
parser
.
data
.
len
=
decompressBufLen
;
}
else
{
httpError
(
"context:%p, fd:%d, ip:%s, failed to decompress data, rawSize:%d, error:%d"
,
...
...
src/plugins/http/src/httpSql.c
浏览文件 @
e8933e23
...
...
@@ -166,8 +166,8 @@ void httpProcessMultiSql(HttpContext *pContext) {
HttpSqlCmd
*
cmd
=
multiCmds
->
cmds
+
multiCmds
->
pos
;
char
*
sql
=
httpGetCmdsString
(
pContext
,
cmd
->
sql
);
httpTrace
Dump
(
"context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start query, sql:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
user
,
multiCmds
->
pos
,
sql
);
httpTrace
L
(
"context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start query, sql:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
user
,
multiCmds
->
pos
,
sql
);
taosNotePrintHttp
(
sql
);
taos_query_a
(
pContext
->
session
->
taos
,
sql
,
httpProcessMultiSqlCallBack
,
(
void
*
)
pContext
);
}
...
...
@@ -306,8 +306,8 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) {
return
;
}
httpTrace
Dump
(
"context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
user
,
sql
);
httpTrace
L
(
"context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
user
,
sql
);
taosNotePrintHttp
(
sql
);
taos_query_a
(
pSession
->
taos
,
sql
,
httpProcessSingleSqlCallBack
,
(
void
*
)
pContext
);
}
...
...
src/plugins/monitor/src/monitorMain.c
浏览文件 @
e8933e23
...
...
@@ -35,9 +35,6 @@
#define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON DEBUG ", monitorDebugFlag, __VA_ARGS__); }}
#define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON TRACE ", monitorDebugFlag, __VA_ARGS__); }}
#define monitorDebugDump(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLongString("MON DEBUG ", monitorDebugFlag, __VA_ARGS__); }}
#define monitorTraceDump(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLongString("MON TRACE ", monitorDebugFlag, __VA_ARGS__); }}
#define SQL_LENGTH 1024
#define LOG_LEN_STR 100
#define IP_LEN_STR 18
...
...
src/plugins/mqtt/inc/mqttLog.h
浏览文件 @
e8933e23
...
...
@@ -27,7 +27,4 @@ extern int32_t mqttDebugFlag;
#define mqttDebug(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLog("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }}
#define mqttTrace(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLog("MQT TRACE ", mqttDebugFlag, __VA_ARGS__); }}
#define mqttDebugDump(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLongString("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }}
#define mqttTraceDump(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLongString("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }}
#endif
src/query/inc/qExecutor.h
浏览文件 @
e8933e23
...
...
@@ -154,6 +154,7 @@ typedef struct SQuery {
}
SQuery
;
typedef
struct
SQueryRuntimeEnv
{
jmp_buf
env
;
SResultInfo
*
resultInfo
;
// todo refactor to merge with SWindowResInfo
SQuery
*
pQuery
;
SQLFunctionCtx
*
pCtx
;
...
...
@@ -169,6 +170,8 @@ typedef struct SQueryRuntimeEnv {
void
*
pSecQueryHandle
;
// another thread for
bool
stableQuery
;
// super table query or not
bool
topBotQuery
;
// false
bool
groupbyNormalCol
;
// denote if this is a groupby normal column query
bool
hasTagResults
;
// if there are tag values in final result or not
int32_t
prevGroupId
;
// previous executed group id
SDiskbasedResultBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
}
SQueryRuntimeEnv
;
...
...
@@ -197,8 +200,10 @@ typedef struct SQInfo {
*/
int32_t
tableIndex
;
int32_t
numOfGroupResultPages
;
_qinfo_free_fn_t
freeFn
;
jmp_buf
env
;
_qinfo_free_fn_t
freeFn
;
//todo remove it
void
*
pBuf
;
// allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
}
SQInfo
;
#endif // TDENGINE_QUERYEXECUTOR_H
src/query/inc/qfill.h
浏览文件 @
e8933e23
...
...
@@ -60,8 +60,6 @@ typedef struct SPoint {
void
*
val
;
}
SPoint
;
int64_t
taosGetIntervalStartTimestamp
(
int64_t
startTime
,
int64_t
slidingTime
,
char
timeUnit
,
int16_t
precision
);
SFillInfo
*
taosInitFillInfo
(
int32_t
order
,
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
int64_t
slidingTime
,
int8_t
slidingUnit
,
int8_t
precision
,
int32_t
fillType
,
SFillColInfo
*
pFillCol
);
...
...
src/query/src/qExecutor.c
浏览文件 @
e8933e23
此差异已折叠。
点击以展开。
src/query/src/qUtil.c
浏览文件 @
e8933e23
...
...
@@ -32,7 +32,6 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo
->
threshold
=
threshold
;
pWindowResInfo
->
type
=
type
;
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
type
);
pWindowResInfo
->
hashList
=
taosHashInit
(
threshold
,
fn
,
false
);
...
...
@@ -54,7 +53,8 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
if
(
pWindowRes
==
NULL
)
{
return
;
}
// TODO opt malloc strategy
for
(
int32_t
i
=
0
;
i
<
nOutputCols
;
++
i
)
{
free
(
pWindowRes
->
resultInfo
[
i
].
interResultBuf
);
}
...
...
@@ -180,19 +180,34 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
/*
* remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order
*/
void
removeRedundantWindow
(
SWindowResInfo
*
pWindowResInfo
,
TSKEY
lastKey
,
int32_t
order
)
{
assert
(
pWindowResInfo
->
size
>=
0
&&
pWindowResInfo
->
capacity
>=
pWindowResInfo
->
size
);
if
(
pWindowResInfo
->
size
<=
1
)
{
return
;
}
// get the result order
int32_t
resultOrder
=
(
pWindowResInfo
->
pResult
[
0
].
window
.
skey
<
pWindowResInfo
->
pResult
[
1
].
window
.
skey
)
?
TSDB_ORDER_ASC:
TSDB_ORDER_DESC
;
if
(
order
!=
resultOrder
)
{
return
;
}
int32_t
i
=
0
;
while
(
i
<
pWindowResInfo
->
size
&&
((
pWindowResInfo
->
pResult
[
i
].
window
.
ekey
<
lastKey
&&
order
==
QUERY_ASC_FORWARD_STEP
)
||
(
pWindowResInfo
->
pResult
[
i
].
window
.
skey
>
lastKey
&&
order
==
QUERY_DESC_FORWARD_STEP
)))
{
++
i
;
if
(
order
==
QUERY_ASC_FORWARD_STEP
)
{
while
(
i
<
pWindowResInfo
->
size
&&
(
pWindowResInfo
->
pResult
[
i
].
window
.
ekey
<
lastKey
))
{
++
i
;
}
}
else
if
(
order
==
QUERY_DESC_FORWARD_STEP
)
{
while
(
i
<
pWindowResInfo
->
size
&&
(
pWindowResInfo
->
pResult
[
i
].
window
.
skey
>
lastKey
))
{
++
i
;
}
}
// assert(i < pWindowResInfo->size);
if
(
i
<
pWindowResInfo
->
size
)
{
pWindowResInfo
->
size
=
(
i
+
1
);
}
...
...
src/query/src/qextbuffer.c
浏览文件 @
e8933e23
...
...
@@ -118,7 +118,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) {
* To flush data to disk to accommodate more data
*/
if
(
pMemBuffer
->
numOfInMemPages
>
0
&&
pMemBuffer
->
numOfInMemPages
==
pMemBuffer
->
inMemCapacity
)
{
if
(
!
tExtMemBufferFlush
(
pMemBuffer
)
)
{
if
(
tExtMemBufferFlush
(
pMemBuffer
)
!=
0
)
{
return
false
;
}
}
...
...
@@ -268,6 +268,7 @@ int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) {
size_t
retVal
=
fwrite
((
char
*
)
&
(
first
->
item
),
pMemBuffer
->
pageSize
,
1
,
pMemBuffer
->
file
);
if
(
retVal
<=
0
)
{
// failed to write to buffer, may be not enough space
ret
=
TAOS_SYSTEM_ERROR
(
errno
);
return
ret
;
}
pMemBuffer
->
fileMeta
.
numOfElemsInFile
+=
first
->
item
.
num
;
...
...
src/query/src/qfill.c
浏览文件 @
e8933e23
...
...
@@ -22,41 +22,6 @@
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
int64_t
taosGetIntervalStartTimestamp
(
int64_t
startTime
,
int64_t
slidingTime
,
char
timeUnit
,
int16_t
precision
)
{
if
(
slidingTime
==
0
)
{
return
startTime
;
}
if
(
timeUnit
==
'a'
||
timeUnit
==
'm'
||
timeUnit
==
's'
||
timeUnit
==
'h'
)
{
return
(
startTime
/
slidingTime
)
*
slidingTime
;
}
else
{
/*
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*
* TODO dynamically decide the start time of a day, move to common module
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t
timezone
=
_timezone
;
int32_t
daylight
=
_daylight
;
char
**
tzname
=
_tzname
;
#endif
int64_t
t
=
(
precision
==
TSDB_TIME_PRECISION_MILLI
)
?
MILLISECOND_PER_SECOND
:
MILLISECOND_PER_SECOND
*
1000L
;
int64_t
revStartime
=
(
startTime
/
slidingTime
)
*
slidingTime
+
timezone
*
t
;
int64_t
revEndtime
=
revStartime
+
slidingTime
-
1
;
if
(
revEndtime
<
startTime
)
{
revStartime
+=
slidingTime
;
}
return
revStartime
;
}
}
SFillInfo
*
taosInitFillInfo
(
int32_t
order
,
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
int64_t
slidingTime
,
int8_t
slidingUnit
,
int8_t
precision
,
int32_t
fillType
,
SFillColInfo
*
pFillCol
)
{
if
(
fillType
==
TSDB_FILL_NONE
)
{
...
...
@@ -128,7 +93,7 @@ static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterva
if
(
order
==
TSDB_ORDER_ASC
)
{
return
ekey
;
}
else
{
return
taosGetIntervalStartTimestamp
(
ekey
,
timeInterval
,
slidingTimeUnit
,
precision
);
return
taosGetIntervalStartTimestamp
(
ekey
,
timeInterval
,
timeInterval
,
slidingTimeUnit
,
precision
);
}
}
...
...
src/rpc/inc/rpcLog.h
浏览文件 @
e8933e23
...
...
@@ -31,9 +31,7 @@ extern int32_t tscEmbedded;
#define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC INFO ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
#define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC DEBUG ", rpcDebugFlag, __VA_ARGS__); }}
#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC TRACE ", rpcDebugFlag, __VA_ARGS__); }}
#define tDebugDump(x, y) { if (rpcDebugFlag & DEBUG_DEBUG) { taosDumpData((unsigned char *)x, y); }}
#define tTraceDump(x, y) { if (rpcDebugFlag & DEBUG_TRACE) { taosDumpData((unsigned char *)x, y); }}
#define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); }}
#ifdef __cplusplus
}
...
...
src/rpc/src/rpcMain.c
浏览文件 @
e8933e23
...
...
@@ -973,7 +973,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
pRecv
->
shandle
;
SRpcConn
*
pConn
=
(
SRpcConn
*
)
pRecv
->
thandle
;
t
Trace
Dump
(
pRecv
->
msg
,
pRecv
->
msgLen
);
tDump
(
pRecv
->
msg
,
pRecv
->
msgLen
);
// underlying UDP layer does not know it is server or client
pRecv
->
connType
=
pRecv
->
connType
|
pRpc
->
connType
;
...
...
@@ -1247,7 +1247,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
tError
(
"%s, failed to send, msgLen:%d written:%d, reason:%s"
,
pConn
->
info
,
msgLen
,
writtenLen
,
strerror
(
errno
));
}
t
Trace
Dump
(
msg
,
msgLen
);
tDump
(
msg
,
msgLen
);
}
static
void
rpcProcessConnError
(
void
*
param
,
void
*
id
)
{
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
e8933e23
...
...
@@ -47,9 +47,9 @@ extern int tsdbDebugFlag;
// Definitions
// ------------------ tsdbMeta.c
typedef
struct
STable
{
STableId
tableId
;
ETableType
type
;
tstr
*
name
;
// NOTE: there a flexible string here
STableId
tableId
;
uint64_t
suid
;
struct
STable
*
pSuper
;
// super table pointer
uint8_t
numOfSchemas
;
...
...
@@ -294,11 +294,34 @@ typedef struct {
#define TABLE_SUID(t) (t)->suid
#define TABLE_LASTKEY(t) (t)->lastKey
static
FORCE_INLINE
STSchema
*
tsdbGetTableSchema
(
STable
*
pTable
)
{
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
// check child table first
STable
*
pSuper
=
pTable
->
pSuper
;
if
(
pSuper
==
NULL
)
return
NULL
;
return
pSuper
->
schema
[
pSuper
->
numOfSchemas
-
1
];
}
else
if
(
pTable
->
type
==
TSDB_NORMAL_TABLE
||
pTable
->
type
==
TSDB_SUPER_TABLE
||
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
return
pTable
->
schema
[
pTable
->
numOfSchemas
-
1
];
}
else
{
return
NULL
;
}
}
static
FORCE_INLINE
STSchema
*
tsdbGetTableTagSchema
(
STable
*
pTable
)
{
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
// check child table first
STable
*
pSuper
=
pTable
->
pSuper
;
if
(
pSuper
==
NULL
)
return
NULL
;
return
pSuper
->
tagSchema
;
}
else
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
return
pTable
->
tagSchema
;
}
else
{
return
NULL
;
}
}
STsdbMeta
*
tsdbNewMeta
(
STsdbCfg
*
pCfg
);
void
tsdbFreeMeta
(
STsdbMeta
*
pMeta
);
int
tsdbOpenMeta
(
STsdbRepo
*
pRepo
);
int
tsdbCloseMeta
(
STsdbRepo
*
pRepo
);
STSchema
*
tsdbGetTableSchema
(
STable
*
pTable
);
STable
*
tsdbGetTableByUid
(
STsdbMeta
*
pMeta
,
uint64_t
uid
);
STSchema
*
tsdbGetTableSchemaByVersion
(
STable
*
pTable
,
int16_t
version
);
STSchema
*
tsdbGetTableTagSchema
(
STable
*
pTable
);
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
e8933e23
...
...
@@ -282,6 +282,8 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
pMsg
->
tid
=
htonl
(
pMsg
->
tid
);
pMsg
->
tversion
=
htons
(
pMsg
->
tversion
);
pMsg
->
colId
=
htons
(
pMsg
->
colId
);
pMsg
->
type
=
htons
(
pMsg
->
type
);
pMsg
->
bytes
=
htons
(
pMsg
->
bytes
);
pMsg
->
tagValLen
=
htonl
(
pMsg
->
tagValLen
);
pMsg
->
numOfTags
=
htons
(
pMsg
->
numOfTags
);
pMsg
->
schemaLen
=
htonl
(
pMsg
->
schemaLen
);
...
...
src/util/inc/tlog.h
浏览文件 @
e8933e23
...
...
@@ -26,6 +26,7 @@ extern "C" {
#define DEBUG_INFO DEBUG_WARN
#define DEBUG_DEBUG 4U
#define DEBUG_TRACE 8U
#define DEBUG_DUMP 16U
#define DEBUG_SCREEN 64U
#define DEBUG_FILE 128U
...
...
src/util/src/tfile.c
浏览文件 @
e8933e23
...
...
@@ -26,12 +26,12 @@
#include "os.h"
#define RANDOM_FACTOR 5
#define RANDOM_F
ILE_FAIL_F
ACTOR 5
ssize_t
taos_tread
(
int
fd
,
void
*
buf
,
size_t
count
)
{
#ifdef TAOS_RANDOM_FILE_FAIL
if
(
rand
()
%
RANDOM_FACTOR
==
0
)
{
if
(
rand
()
%
RANDOM_F
ILE_FAIL_F
ACTOR
==
0
)
{
errno
=
EIO
;
return
-
1
;
}
...
...
@@ -43,7 +43,7 @@ ssize_t taos_tread(int fd, void *buf, size_t count)
ssize_t
taos_twrite
(
int
fd
,
void
*
buf
,
size_t
count
)
{
#ifdef TAOS_RANDOM_FILE_FAIL
if
(
rand
()
%
RANDOM_FACTOR
==
0
)
{
if
(
rand
()
%
RANDOM_F
ILE_FAIL_F
ACTOR
==
0
)
{
errno
=
EIO
;
return
-
1
;
}
...
...
@@ -55,7 +55,7 @@ ssize_t taos_twrite(int fd, void *buf, size_t count)
off_t
taos_lseek
(
int
fd
,
off_t
offset
,
int
whence
)
{
#ifdef TAOS_RANDOM_FILE_FAIL
if
(
rand
()
%
RANDOM_FACTOR
==
0
)
{
if
(
rand
()
%
RANDOM_F
ILE_FAIL_F
ACTOR
==
0
)
{
errno
=
EIO
;
return
-
1
;
}
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
e8933e23
...
...
@@ -34,8 +34,7 @@
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
static
int32_t
tsOpennedVnodes
;
static
void
*
tsDnodeVnodesHash
;
static
SHashObj
*
tsDnodeVnodesHash
;
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
);
static
int32_t
vnodeSaveCfg
(
SMDCreateVnodeMsg
*
pVnodeCfg
);
static
int32_t
vnodeReadCfg
(
SVnodeObj
*
pVnode
);
...
...
@@ -47,8 +46,6 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
static
void
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
);
static
pthread_once_t
vnodeModuleInit
=
PTHREAD_ONCE_INIT
;
#ifndef _SYNC
tsync_h
syncStart
(
const
SSyncInfo
*
info
)
{
return
NULL
;
}
int32_t
syncForwardToPeer
(
tsync_h
shandle
,
void
*
pHead
,
void
*
mhandle
,
int
qtype
)
{
return
0
;
}
...
...
@@ -58,25 +55,28 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
void
syncConfirmForward
(
tsync_h
shandle
,
uint64_t
version
,
int32_t
code
)
{}
#endif
static
void
vnodeInit
()
{
int32_t
vnodeInitResources
()
{
vnodeInitWriteFp
();
vnodeInitReadFp
();
tsDnodeVnodesHash
=
taosHashInit
(
TSDB_MAX_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
);
if
(
tsDnodeVnodesHash
==
NULL
)
{
vError
(
"failed to init vnode list"
);
return
TSDB_CODE_VND_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
}
void
vnodeCleanupResources
()
{
taosHashCleanup
(
tsDnodeVnodesHash
);
vnodeModuleInit
=
PTHREAD_ONCE_INIT
;
tsDnodeVnodesHash
=
NULL
;
if
(
tsDnodeVnodesHash
!=
NULL
)
{
taosHashCleanup
(
tsDnodeVnodesHash
);
tsDnodeVnodesHash
=
NULL
;
}
}
int32_t
vnodeCreate
(
SMDCreateVnodeMsg
*
pVnodeCfg
)
{
int32_t
code
;
pthread_once
(
&
vnodeModuleInit
,
vnodeInit
);
SVnodeObj
*
pTemp
=
(
SVnodeObj
*
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
pVnodeCfg
->
cfg
.
vgId
,
sizeof
(
int32_t
));
if
(
pTemp
!=
NULL
)
{
...
...
@@ -144,11 +144,6 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
}
int32_t
vnodeDrop
(
int32_t
vgId
)
{
if
(
tsDnodeVnodesHash
==
NULL
)
{
vDebug
(
"vgId:%d, failed to drop, vgId not exist"
,
vgId
);
return
TSDB_CODE_VND_INVALID_VGROUP_ID
;
}
SVnodeObj
**
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
{
vDebug
(
"vgId:%d, failed to drop, vgId not find"
,
vgId
);
...
...
@@ -187,7 +182,6 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
int32_t
vnodeOpen
(
int32_t
vnode
,
char
*
rootDir
)
{
char
temp
[
TSDB_FILENAME_LEN
];
pthread_once
(
&
vnodeModuleInit
,
vnodeInit
);
SVnodeObj
*
pVnode
=
calloc
(
sizeof
(
SVnodeObj
),
1
);
if
(
pVnode
==
NULL
)
{
...
...
@@ -195,7 +189,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return
TAOS_SYSTEM_ERROR
(
errno
);
}
atomic_add_fetch_32
(
&
tsOpennedVnodes
,
1
);
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
pVnode
->
vgId
=
vnode
;
...
...
@@ -366,13 +359,11 @@ void vnodeRelease(void *pVnodeRaw) {
free
(
pVnode
);
int32_t
count
=
atomic_sub_fetch_32
(
&
tsOpennedVnodes
,
1
);
int32_t
count
=
taosHashGetSize
(
tsDnodeVnodesHash
);
vDebug
(
"vgId:%d, vnode is released, vnodes:%d"
,
vgId
,
count
);
}
void
*
vnodeGetVnode
(
int32_t
vgId
)
{
if
(
tsDnodeVnodesHash
==
NULL
)
return
NULL
;
SVnodeObj
**
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
...
...
@@ -434,8 +425,6 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
}
int32_t
vnodeGetVnodeList
(
int32_t
vnodeList
[],
int32_t
*
numOfVnodes
)
{
if
(
tsDnodeVnodesHash
==
NULL
)
return
TSDB_CODE_SUCCESS
;
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
tsDnodeVnodesHash
);
while
(
taosHashIterNext
(
pIter
))
{
SVnodeObj
**
pVnode
=
taosHashIterGet
(
pIter
);
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
e8933e23
...
...
@@ -110,13 +110,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken
if
(
code
==
TSDB_CODE_SUCCESS
)
{
// add lock here
handle
=
qRegisterQInfo
(
pVnode
->
qMgmt
,
pQInfo
);
if
(
handle
==
NULL
)
{
// failed to register qhandle
pRsp
->
code
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
qKillQuery
(
pQInfo
);
qKillQuery
(
pQInfo
);
pQInfo
=
NULL
;
}
else
{
assert
(
*
handle
==
pQInfo
);
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
(
handle
));
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
e8933e23
...
...
@@ -89,6 +89,11 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
return
syncCode
;
}
void
vnodeConfirmForward
(
void
*
param
,
uint64_t
version
,
int32_t
code
)
{
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
param
;
syncConfirmForward
(
pVnode
->
sync
,
version
,
code
);
}
static
int32_t
vnodeProcessSubmitMsg
(
SVnodeObj
*
pVnode
,
void
*
pCont
,
SRspRet
*
pRet
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
浏览文件 @
e8933e23
...
...
@@ -94,7 +94,7 @@
<dependency>
<groupId>
com.google.guava
</groupId>
<artifactId>
guava
</artifactId>
<version>
18.0
</version>
<version>
24.1.1
</version>
</dependency>
<dependency>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录