Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
4ed51b9a
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4ed51b9a
编写于
7月 21, 2021
作者:
Z
zhaoyanggh
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' of
https://github.com/taosdata/TDengine
into test/TD-4816
上级
7472194e
b538d580
变更
52
隐藏空白更改
内联
并排
Showing
52 changed file
with
302 addition
and
44 deletion
+302
-44
src/balance/src/bnThread.c
src/balance/src/bnThread.c
+2
-0
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+1
-0
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+3
-2
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+1
-1
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+133
-14
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+19
-17
src/common/src/tname.c
src/common/src/tname.c
+1
-1
src/dnode/src/dnodeMPeer.c
src/dnode/src/dnodeMPeer.c
+2
-0
src/dnode/src/dnodeMRead.c
src/dnode/src/dnodeMRead.c
+2
-0
src/dnode/src/dnodeMWrite.c
src/dnode/src/dnodeMWrite.c
+3
-1
src/dnode/src/dnodeTelemetry.c
src/dnode/src/dnodeTelemetry.c
+2
-0
src/dnode/src/dnodeVMgmt.c
src/dnode/src/dnodeVMgmt.c
+2
-0
src/dnode/src/dnodeVRead.c
src/dnode/src/dnodeVRead.c
+5
-0
src/dnode/src/dnodeVWrite.c
src/dnode/src/dnodeVWrite.c
+2
-0
src/dnode/src/dnodeVnodes.c
src/dnode/src/dnodeVnodes.c
+2
-0
src/inc/taos.h
src/inc/taos.h
+2
-0
src/kit/shell/src/shellCheck.c
src/kit/shell/src/shellCheck.c
+2
-0
src/kit/shell/src/shellDarwin.c
src/kit/shell/src/shellDarwin.c
+2
-0
src/kit/shell/src/shellImport.c
src/kit/shell/src/shellImport.c
+2
-0
src/kit/shell/src/shellLinux.c
src/kit/shell/src/shellLinux.c
+2
-0
src/kit/shell/src/shellMain.c
src/kit/shell/src/shellMain.c
+2
-0
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+17
-2
src/kit/taosdump/taosdump.c
src/kit/taosdump/taosdump.c
+4
-0
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+1
-0
src/os/inc/osDef.h
src/os/inc/osDef.h
+19
-0
src/os/inc/osInc.h
src/os/inc/osInc.h
+1
-0
src/os/src/darwin/dwSemaphore.c
src/os/src/darwin/dwSemaphore.c
+2
-0
src/os/src/darwin/dwTimer.c
src/os/src/darwin/dwTimer.c
+1
-0
src/os/src/detail/osTimer.c
src/os/src/detail/osTimer.c
+2
-0
src/plugins/http/src/httpQueue.c
src/plugins/http/src/httpQueue.c
+2
-0
src/plugins/http/src/httpServer.c
src/plugins/http/src/httpServer.c
+2
-0
src/plugins/monitor/src/monMain.c
src/plugins/monitor/src/monMain.c
+1
-0
src/plugins/mqtt/src/mqttSystem.c
src/plugins/mqtt/src/mqttSystem.c
+3
-1
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+6
-0
src/rpc/src/rpcUdp.c
src/rpc/src/rpcUdp.c
+2
-0
src/rpc/test/rclient.c
src/rpc/test/rclient.c
+2
-0
src/rpc/test/rsclient.c
src/rpc/test/rsclient.c
+3
-1
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+1
-0
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+1
-0
src/sync/src/syncTcp.c
src/sync/src/syncTcp.c
+3
-0
src/sync/test/syncClient.c
src/sync/test/syncClient.c
+2
-0
src/sync/test/syncServer.c
src/sync/test/syncServer.c
+2
-0
src/tsdb/src/tsdbCommitQueue.c
src/tsdb/src/tsdbCommitQueue.c
+3
-1
src/util/src/tcache.c
src/util/src/tcache.c
+2
-0
src/util/src/tlog.c
src/util/src/tlog.c
+4
-0
src/util/src/tnettest.c
src/util/src/tnettest.c
+6
-2
src/util/src/tnote.c
src/util/src/tnote.c
+2
-0
src/util/src/tsched.c
src/util/src/tsched.c
+3
-1
src/util/tests/trefTest.c
src/util/tests/trefTest.c
+8
-0
src/vnode/src/vnodeBackup.c
src/vnode/src/vnodeBackup.c
+2
-0
src/vnode/src/vnodeWorker.c
src/vnode/src/vnodeWorker.c
+2
-0
src/wal/src/walMgmt.c
src/wal/src/walMgmt.c
+1
-0
未找到文件。
src/balance/src/bnThread.c
浏览文件 @
4ed51b9a
...
...
@@ -23,6 +23,8 @@
static
SBnThread
tsBnThread
;
static
void
*
bnThreadFunc
(
void
*
arg
)
{
setThreadName
(
"bnThreadd"
);
while
(
1
)
{
pthread_mutex_lock
(
&
tsBnThread
.
mutex
);
if
(
tsBnThread
.
stop
)
{
...
...
src/client/inc/tscUtil.h
浏览文件 @
4ed51b9a
...
...
@@ -110,6 +110,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
void
tscDestroyDataBlock
(
STableDataBlocks
*
pDataBlock
,
bool
removeMeta
);
void
tscSortRemoveDataBlockDupRowsRaw
(
STableDataBlocks
*
dataBuf
);
int
tscSortRemoveDataBlockDupRows
(
STableDataBlocks
*
dataBuf
,
SBlockKeyInfo
*
pBlkKeyInfo
);
int32_t
tsSetBlockInfo
(
SSubmitBlk
*
pBlocks
,
const
STableMeta
*
pTableMeta
,
int32_t
numOfRows
);
void
tscDestroyBoundColumnInfo
(
SParsedDataColInfo
*
pColInfo
);
void
doRetrieveSubqueryData
(
SSchedMsg
*
pMsg
);
...
...
src/client/inc/tsclient.h
浏览文件 @
4ed51b9a
...
...
@@ -138,7 +138,8 @@ typedef struct STableDataBlocks {
uint32_t
size
;
STableMeta
*
pTableMeta
;
// the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char
*
pData
;
bool
cloned
;
SParsedDataColInfo
boundColumnInfo
;
// for parameter ('?') binding
...
...
@@ -436,4 +437,4 @@ int32_t getExtendedRowSize(STableComInfo *tinfo);
}
#endif
#endif
\ No newline at end of file
#endif
src/client/src/tscParseInsert.c
浏览文件 @
4ed51b9a
...
...
@@ -1056,7 +1056,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsSetBlockInfo
(
SSubmitBlk
*
pBlocks
,
const
STableMeta
*
pTableMeta
,
int32_t
numOfRows
)
{
int32_t
FORCE_INLINE
tsSetBlockInfo
(
SSubmitBlk
*
pBlocks
,
const
STableMeta
*
pTableMeta
,
int32_t
numOfRows
)
{
pBlocks
->
tid
=
pTableMeta
->
id
.
tid
;
pBlocks
->
uid
=
pTableMeta
->
id
.
uid
;
pBlocks
->
sversion
=
pTableMeta
->
sversion
;
...
...
src/client/src/tscPrepare.c
浏览文件 @
4ed51b9a
...
...
@@ -47,6 +47,7 @@ typedef struct SNormalStmt {
typedef
struct
SMultiTbStmt
{
bool
nameSet
;
bool
tagSet
;
bool
subSet
;
uint64_t
currentUid
;
char
*
sqlstr
;
uint32_t
tbNum
;
...
...
@@ -54,6 +55,7 @@ typedef struct SMultiTbStmt {
SStrToken
stbname
;
SStrToken
values
;
SArray
*
tags
;
STableDataBlocks
*
lastBlock
;
SHashObj
*
pTableHash
;
SHashObj
*
pTableBlockHashList
;
// data block for each table
}
SMultiTbStmt
;
...
...
@@ -347,11 +349,11 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) {
////////////////////////////////////////////////////////////////////////////////
// functions for insertion statement preparation
static
int
doBindParam
(
STableDataBlocks
*
pBlock
,
char
*
data
,
SParamInfo
*
param
,
TAOS_BIND
*
bind
,
int32_t
colNum
)
{
if
(
bind
->
is_null
!=
NULL
&&
*
(
bind
->
is_null
))
{
setNull
(
data
+
param
->
offset
,
param
->
type
,
param
->
bytes
);
return
TSDB_CODE_SUCCESS
;
}
static
FORCE_INLINE
int
doBindParam
(
STableDataBlocks
*
pBlock
,
char
*
data
,
SParamInfo
*
param
,
TAOS_BIND
*
bind
,
int32_t
colNum
)
{
if
(
bind
->
is_null
!=
NULL
&&
*
(
bind
->
is_null
))
{
setNull
(
data
+
param
->
offset
,
param
->
type
,
param
->
bytes
);
return
TSDB_CODE_SUCCESS
;
}
#if 0
if (0) {
...
...
@@ -746,25 +748,25 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_UTINYINT
:
size
=
1
;
*
(
uint8_t
*
)(
data
+
param
->
offset
)
=
*
(
uint8_t
*
)
bind
->
buffer
;
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_USMALLINT
:
size
=
2
;
*
(
uint16_t
*
)(
data
+
param
->
offset
)
=
*
(
uint16_t
*
)
bind
->
buffer
;
break
;
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_UINT
:
case
TSDB_DATA_TYPE_FLOAT
:
size
=
4
;
*
(
uint32_t
*
)(
data
+
param
->
offset
)
=
*
(
uint32_t
*
)
bind
->
buffer
;
break
;
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_UBIGINT
:
case
TSDB_DATA_TYPE_DOUBLE
:
case
TSDB_DATA_TYPE_TIMESTAMP
:
size
=
8
;
*
(
uint64_t
*
)(
data
+
param
->
offset
)
=
*
(
uint64_t
*
)
bind
->
buffer
;
break
;
case
TSDB_DATA_TYPE_BINARY
:
...
...
@@ -790,7 +792,6 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
memcpy
(
data
+
param
->
offset
,
bind
->
buffer
,
size
);
if
(
param
->
offset
==
0
)
{
if
(
tsCheckTimestamp
(
pBlock
,
data
+
param
->
offset
)
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"invalid timestamp"
);
...
...
@@ -801,6 +802,58 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
insertStmtGenLastBlock
(
STableDataBlocks
**
lastBlock
,
STableDataBlocks
*
pBlock
)
{
*
lastBlock
=
(
STableDataBlocks
*
)
malloc
(
sizeof
(
STableDataBlocks
));
memcpy
(
*
lastBlock
,
pBlock
,
sizeof
(
STableDataBlocks
));
(
*
lastBlock
)
->
cloned
=
true
;
(
*
lastBlock
)
->
pData
=
NULL
;
(
*
lastBlock
)
->
ordered
=
true
;
(
*
lastBlock
)
->
prevTS
=
INT64_MIN
;
(
*
lastBlock
)
->
size
=
sizeof
(
SSubmitBlk
);
(
*
lastBlock
)
->
tsSource
=
-
1
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
insertStmtGenBlock
(
STscStmt
*
pStmt
,
STableDataBlocks
**
pBlock
,
STableMeta
*
pTableMeta
,
SName
*
name
)
{
int32_t
code
=
0
;
if
(
pStmt
->
mtb
.
lastBlock
==
NULL
)
{
tscError
(
"no previous data block"
);
return
TSDB_CODE_TSC_APP_ERROR
;
}
int32_t
msize
=
tscGetTableMetaSize
(
pTableMeta
);
int32_t
tsize
=
sizeof
(
STableDataBlocks
)
+
msize
;
void
*
t
=
malloc
(
tsize
);
*
pBlock
=
t
;
memcpy
(
*
pBlock
,
pStmt
->
mtb
.
lastBlock
,
sizeof
(
STableDataBlocks
));
t
=
(
char
*
)
t
+
sizeof
(
STableDataBlocks
);
(
*
pBlock
)
->
pTableMeta
=
t
;
memcpy
((
*
pBlock
)
->
pTableMeta
,
pTableMeta
,
msize
);
(
*
pBlock
)
->
pData
=
malloc
((
*
pBlock
)
->
nAllocSize
);
(
*
pBlock
)
->
vgId
=
(
*
pBlock
)
->
pTableMeta
->
vgId
;
tNameAssign
(
&
(
*
pBlock
)
->
tableName
,
name
);
SSubmitBlk
*
blk
=
(
SSubmitBlk
*
)(
*
pBlock
)
->
pData
;
memset
(
blk
,
0
,
sizeof
(
*
blk
));
code
=
tsSetBlockInfo
(
blk
,
pTableMeta
,
0
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
STMT_RET
(
code
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int
doBindBatchParam
(
STableDataBlocks
*
pBlock
,
SParamInfo
*
param
,
TAOS_MULTI_BIND
*
bind
,
int32_t
rowNum
)
{
if
(
bind
->
buffer_type
!=
param
->
type
||
!
isValidDataType
(
param
->
type
))
{
...
...
@@ -1172,7 +1225,7 @@ static void insertBatchClean(STscStmt* pStmt) {
static
int
insertBatchStmtExecute
(
STscStmt
*
pStmt
)
{
int32_t
code
=
0
;
if
(
pStmt
->
mtb
.
nameSet
==
false
)
{
tscError
(
"0x%"
PRIx64
" no table name set"
,
pStmt
->
pSql
->
self
);
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
&
pStmt
->
pSql
->
cmd
),
"no table name set"
);
...
...
@@ -1227,11 +1280,11 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
pStmt
->
mtb
.
tbname
=
sToken
;
pStmt
->
mtb
.
nameSet
=
false
;
if
(
pStmt
->
mtb
.
pTableHash
==
NULL
)
{
pStmt
->
mtb
.
pTableHash
=
taosHashInit
(
1
6
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
pStmt
->
mtb
.
pTableHash
=
taosHashInit
(
1
28
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
false
);
}
if
(
pStmt
->
mtb
.
pTableBlockHashList
==
NULL
)
{
pStmt
->
mtb
.
pTableBlockHashList
=
taosHashInit
(
1
6
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
pStmt
->
mtb
.
pTableBlockHashList
=
taosHashInit
(
1
28
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
}
pStmt
->
mtb
.
tagSet
=
true
;
...
...
@@ -1522,6 +1575,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_BIND
*
tags
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
int32_t
code
=
0
;
if
(
stmt
==
NULL
||
pStmt
->
pSql
==
NULL
||
pStmt
->
taos
==
NULL
)
{
STMT_RET
(
TSDB_CODE_TSC_DISCONNECTED
);
...
...
@@ -1559,6 +1613,9 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
SSubmitBlk
*
pBlk
=
(
SSubmitBlk
*
)
(
*
t1
)
->
pData
;
pCmd
->
batchSize
=
pBlk
->
numOfRows
;
if
(
pBlk
->
numOfRows
==
0
)
{
(
*
t1
)
->
prevTS
=
INT64_MIN
;
}
taosHashPut
(
pCmd
->
insertParam
.
pTableBlockHashList
,
(
void
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
),
(
void
*
)
t1
,
POINTER_BYTES
);
...
...
@@ -1566,6 +1623,51 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STMT_RET
(
TSDB_CODE_SUCCESS
);
}
if
(
pStmt
->
mtb
.
subSet
&&
taosHashGetSize
(
pStmt
->
mtb
.
pTableHash
)
>
0
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
char
sTableName
[
TSDB_TABLE_FNAME_LEN
];
strncpy
(
sTableName
,
pTableMeta
->
sTableName
,
sizeof
(
sTableName
));
SStrToken
tname
=
{
0
};
tname
.
type
=
TK_STRING
;
tname
.
z
=
(
char
*
)
name
;
tname
.
n
=
(
uint32_t
)
strlen
(
name
);
SName
fullname
=
{
0
};
tscSetTableFullName
(
&
fullname
,
&
tname
,
pSql
);
memcpy
(
&
pTableMetaInfo
->
name
,
&
fullname
,
sizeof
(
fullname
));
code
=
tscGetTableMeta
(
pSql
,
pTableMetaInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
STMT_RET
(
code
);
}
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
strcmp
(
sTableName
,
pTableMeta
->
sTableName
))
{
tscError
(
"0x%"
PRIx64
" only tables belongs to one stable is allowed"
,
pSql
->
self
);
STMT_RET
(
TSDB_CODE_TSC_APP_ERROR
);
}
STableDataBlocks
*
pBlock
=
NULL
;
insertStmtGenBlock
(
pStmt
,
&
pBlock
,
pTableMeta
,
&
pTableMetaInfo
->
name
);
pCmd
->
batchSize
=
0
;
pStmt
->
mtb
.
currentUid
=
pTableMeta
->
id
.
uid
;
pStmt
->
mtb
.
tbNum
++
;
taosHashPut
(
pCmd
->
insertParam
.
pTableBlockHashList
,
(
void
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
),
(
void
*
)
&
pBlock
,
POINTER_BYTES
);
taosHashPut
(
pStmt
->
mtb
.
pTableBlockHashList
,
(
void
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
),
(
void
*
)
&
pBlock
,
POINTER_BYTES
);
taosHashPut
(
pStmt
->
mtb
.
pTableHash
,
name
,
strlen
(
name
),
(
char
*
)
&
pTableMeta
->
id
.
uid
,
sizeof
(
pTableMeta
->
id
.
uid
));
tscDebug
(
"0x%"
PRIx64
" table:%s is prepared, uid:%"
PRIx64
,
pSql
->
self
,
name
,
pStmt
->
mtb
.
currentUid
);
STMT_RET
(
TSDB_CODE_SUCCESS
);
}
if
(
pStmt
->
mtb
.
tagSet
)
{
pStmt
->
mtb
.
tbname
=
tscReplaceStrToken
(
&
pSql
->
sqlstr
,
&
pStmt
->
mtb
.
tbname
,
name
);
}
else
{
...
...
@@ -1594,7 +1696,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
pCmd
->
insertParam
.
pTableBlockHashList
=
hashList
;
}
int32_t
code
=
tsParseSql
(
pStmt
->
pSql
,
true
);
code
=
tsParseSql
(
pStmt
->
pSql
,
true
);
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
// wait for the callback function to post the semaphore
tsem_wait
(
&
pStmt
->
pSql
->
rspSem
);
...
...
@@ -1622,6 +1724,10 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
taosHashPut
(
pStmt
->
mtb
.
pTableBlockHashList
,
(
void
*
)
&
pStmt
->
mtb
.
currentUid
,
sizeof
(
pStmt
->
mtb
.
currentUid
),
(
void
*
)
&
pBlock
,
POINTER_BYTES
);
taosHashPut
(
pStmt
->
mtb
.
pTableHash
,
name
,
strlen
(
name
),
(
char
*
)
&
pTableMeta
->
id
.
uid
,
sizeof
(
pTableMeta
->
id
.
uid
));
if
(
pStmt
->
mtb
.
lastBlock
==
NULL
)
{
insertStmtGenLastBlock
(
&
pStmt
->
mtb
.
lastBlock
,
pBlock
);
}
tscDebug
(
"0x%"
PRIx64
" table:%s is prepared, uid:%"
PRIx64
,
pSql
->
self
,
name
,
pStmt
->
mtb
.
currentUid
);
}
...
...
@@ -1629,7 +1735,17 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
}
int
taos_stmt_set_sub_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
pStmt
->
mtb
.
subSet
=
true
;
return
taos_stmt_set_tbname_tags
(
stmt
,
name
,
NULL
);
}
int
taos_stmt_set_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
pStmt
->
mtb
.
subSet
=
false
;
return
taos_stmt_set_tbname_tags
(
stmt
,
name
,
NULL
);
}
...
...
@@ -1653,6 +1769,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
if
(
pStmt
->
pSql
&&
pStmt
->
pSql
->
res
.
code
!=
0
)
{
rmMeta
=
true
;
}
tscDestroyDataBlock
(
pStmt
->
mtb
.
lastBlock
,
rmMeta
);
pStmt
->
mtb
.
pTableBlockHashList
=
tscDestroyBlockHashTable
(
pStmt
->
mtb
.
pTableBlockHashList
,
rmMeta
);
taosHashCleanup
(
pStmt
->
pSql
->
cmd
.
insertParam
.
pTableBlockHashList
);
pStmt
->
pSql
->
cmd
.
insertParam
.
pTableBlockHashList
=
NULL
;
...
...
@@ -1687,6 +1804,8 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
pStmt
->
last
=
STMT_BIND
;
tscDebug
(
"tableId:%"
PRIu64
", try to bind one row"
,
pStmt
->
mtb
.
currentUid
);
STMT_RET
(
insertStmtBindParam
(
pStmt
,
bind
));
}
else
{
STMT_RET
(
normalStmtBindParam
(
pStmt
,
bind
));
...
...
src/client/src/tscUtil.c
浏览文件 @
4ed51b9a
...
...
@@ -1517,12 +1517,6 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
}
tfree
(
pDataBlock
->
pData
);
tfree
(
pDataBlock
->
params
);
// free the refcount for metermeta
if
(
pDataBlock
->
pTableMeta
!=
NULL
)
{
tfree
(
pDataBlock
->
pTableMeta
);
}
if
(
removeMeta
)
{
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
...
...
@@ -1531,7 +1525,17 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
taosHashRemove
(
tscTableMetaInfo
,
name
,
strnlen
(
name
,
TSDB_TABLE_FNAME_LEN
));
}
tscDestroyBoundColumnInfo
(
&
pDataBlock
->
boundColumnInfo
);
if
(
!
pDataBlock
->
cloned
)
{
tfree
(
pDataBlock
->
params
);
// free the refcount for metermeta
if
(
pDataBlock
->
pTableMeta
!=
NULL
)
{
tfree
(
pDataBlock
->
pTableMeta
);
}
tscDestroyBoundColumnInfo
(
&
pDataBlock
->
boundColumnInfo
);
}
tfree
(
pDataBlock
);
}
...
...
@@ -1710,12 +1714,14 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff
dataBuf
->
nAllocSize
=
dataBuf
->
headerSize
*
2
;
}
dataBuf
->
pData
=
calloc
(
1
,
dataBuf
->
nAllocSize
);
//dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf
->
pData
=
malloc
(
dataBuf
->
nAllocSize
);
if
(
dataBuf
->
pData
==
NULL
)
{
tscError
(
"failed to allocated memory, reason:%s"
,
strerror
(
errno
));
tfree
(
dataBuf
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
memset
(
dataBuf
->
pData
,
0
,
sizeof
(
SSubmitBlk
));
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf
->
pTableMeta
=
tscTableMetaDup
(
pTableMeta
);
...
...
@@ -1956,16 +1962,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
static
void
extractTableNameList
(
SInsertStatementParam
*
pInsertParam
,
bool
freeBlockMap
)
{
pInsertParam
->
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pInsertParam
->
pTableBlockHashList
);
if
(
pInsertParam
->
pTableNameList
==
NULL
)
{
pInsertParam
->
pTableNameList
=
calloc
(
pInsertParam
->
numOfTables
,
POINTER_BYTES
);
}
else
{
memset
(
pInsertParam
->
pTableNameList
,
0
,
pInsertParam
->
numOfTables
*
POINTER_BYTES
);
pInsertParam
->
pTableNameList
=
malloc
(
pInsertParam
->
numOfTables
*
POINTER_BYTES
);
}
STableDataBlocks
**
p1
=
taosHashIterate
(
pInsertParam
->
pTableBlockHashList
,
NULL
);
int32_t
i
=
0
;
while
(
p1
)
{
STableDataBlocks
*
pBlocks
=
*
p1
;
tfree
(
pInsertParam
->
pTableNameList
[
i
]);
//
tfree(pInsertParam->pTableNameList[i]);
pInsertParam
->
pTableNameList
[
i
++
]
=
tNameDup
(
&
pBlocks
->
tableName
);
p1
=
taosHashIterate
(
pInsertParam
->
pTableBlockHashList
,
p1
);
...
...
@@ -2009,14 +2013,12 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
int64_t
destSize
=
dataBuf
->
size
+
pOneTableBlock
->
size
+
pBlocks
->
numOfRows
*
expandSize
+
sizeof
(
STColumn
)
*
tscGetNumOfColumns
(
pOneTableBlock
->
pTableMeta
);
if
(
dataBuf
->
nAllocSize
<
destSize
)
{
while
(
dataBuf
->
nAllocSize
<
destSize
)
{
dataBuf
->
nAllocSize
=
(
uint32_t
)(
dataBuf
->
nAllocSize
*
1
.
5
);
}
dataBuf
->
nAllocSize
=
(
uint32_t
)(
destSize
*
1
.
5
);
char
*
tmp
=
realloc
(
dataBuf
->
pData
,
dataBuf
->
nAllocSize
);
if
(
tmp
!=
NULL
)
{
dataBuf
->
pData
=
tmp
;
memset
(
dataBuf
->
pData
+
dataBuf
->
size
,
0
,
dataBuf
->
nAllocSize
-
dataBuf
->
size
);
//
memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
}
else
{
// failed to allocate memory, free already allocated memory and return error code
tscError
(
"0x%"
PRIx64
" failed to allocate memory for merging submit block, size:%d"
,
pInsertParam
->
objectId
,
dataBuf
->
nAllocSize
);
...
...
@@ -4384,7 +4386,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) {
assert
(
pTableMeta
!=
NULL
);
size_t
size
=
tscGetTableMetaSize
(
pTableMeta
);
STableMeta
*
p
=
calloc
(
1
,
size
);
STableMeta
*
p
=
malloc
(
size
);
memcpy
(
p
,
pTableMeta
,
size
);
return
p
;
}
...
...
src/common/src/tname.c
浏览文件 @
4ed51b9a
...
...
@@ -306,7 +306,7 @@ bool tIsValidName(const SName* name) {
SName
*
tNameDup
(
const
SName
*
name
)
{
assert
(
name
!=
NULL
);
SName
*
p
=
calloc
(
1
,
sizeof
(
SName
));
SName
*
p
=
malloc
(
sizeof
(
SName
));
memcpy
(
p
,
name
,
sizeof
(
SName
));
return
p
;
}
...
...
src/dnode/src/dnodeMPeer.c
浏览文件 @
4ed51b9a
...
...
@@ -150,6 +150,8 @@ static void *dnodeProcessMPeerQueue(void *param) {
SMnodeMsg
*
pPeerMsg
;
int32_t
type
;
void
*
unUsed
;
setThreadName
(
"dnodeMPeerQ"
);
while
(
1
)
{
if
(
taosReadQitemFromQset
(
tsMPeerQset
,
&
type
,
(
void
**
)
&
pPeerMsg
,
&
unUsed
)
==
0
)
{
...
...
src/dnode/src/dnodeMRead.c
浏览文件 @
4ed51b9a
...
...
@@ -155,6 +155,8 @@ static void *dnodeProcessMReadQueue(void *param) {
int32_t
type
;
void
*
unUsed
;
setThreadName
(
"dnodeMReadQ"
);
while
(
1
)
{
if
(
taosReadQitemFromQset
(
tsMReadQset
,
&
type
,
(
void
**
)
&
pRead
,
&
unUsed
)
==
0
)
{
dDebug
(
"qset:%p, mnode read got no message from qset, exiting"
,
tsMReadQset
);
...
...
src/dnode/src/dnodeMWrite.c
浏览文件 @
4ed51b9a
...
...
@@ -168,7 +168,9 @@ static void *dnodeProcessMWriteQueue(void *param) {
SMnodeMsg
*
pWrite
;
int32_t
type
;
void
*
unUsed
;
setThreadName
(
"dnodeMWriteQ"
);
while
(
1
)
{
if
(
taosReadQitemFromQset
(
tsMWriteQset
,
&
type
,
(
void
**
)
&
pWrite
,
&
unUsed
)
==
0
)
{
dDebug
(
"qset:%p, mnode write got no message from qset, exiting"
,
tsMWriteQset
);
...
...
src/dnode/src/dnodeTelemetry.c
浏览文件 @
4ed51b9a
...
...
@@ -245,6 +245,8 @@ static void* telemetryThread(void* param) {
clock_gettime
(
CLOCK_REALTIME
,
&
end
);
end
.
tv_sec
+=
300
;
// wait 5 minutes before send first report
setThreadName
(
"telemetryThrd"
);
while
(
!
tsExit
)
{
int
r
=
0
;
struct
timespec
ts
=
end
;
...
...
src/dnode/src/dnodeVMgmt.c
浏览文件 @
4ed51b9a
...
...
@@ -103,6 +103,8 @@ static void *dnodeProcessMgmtQueue(void *wparam) {
int32_t
qtype
;
void
*
handle
;
setThreadName
(
"dnodeMgmtQ"
);
while
(
1
)
{
if
(
taosReadQitemFromQset
(
pPool
->
qset
,
&
qtype
,
(
void
**
)
&
pMgmt
,
&
handle
)
==
0
)
{
dDebug
(
"qdnode mgmt got no message from qset:%p, , exit"
,
pPool
->
qset
);
...
...
src/dnode/src/dnodeVRead.c
浏览文件 @
4ed51b9a
...
...
@@ -118,6 +118,11 @@ static void *dnodeProcessReadQueue(void *wparam) {
SVReadMsg
*
pRead
;
int32_t
qtype
;
void
*
pVnode
;
char
name
[
16
];
memset
(
name
,
0
,
16
);
snprintf
(
name
,
16
,
"%s-dnReadQ"
,
pPool
->
name
);
setThreadName
(
name
);
while
(
1
)
{
if
(
taosReadQitemFromQset
(
pPool
->
qset
,
&
qtype
,
(
void
**
)
&
pRead
,
&
pVnode
)
==
0
)
{
...
...
src/dnode/src/dnodeVWrite.c
浏览文件 @
4ed51b9a
...
...
@@ -191,6 +191,8 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
taosBlockSIGPIPE
();
dDebug
(
"dnode vwrite worker:%d is running"
,
pWorker
->
workerId
);
setThreadName
(
"dnodeWriteQ"
);
while
(
1
)
{
numOfMsgs
=
taosReadAllQitemsFromQset
(
pWorker
->
qset
,
pWorker
->
qall
,
&
pVnode
);
if
(
numOfMsgs
==
0
)
{
...
...
src/dnode/src/dnodeVnodes.c
浏览文件 @
4ed51b9a
...
...
@@ -91,6 +91,8 @@ static void *dnodeOpenVnode(void *param) {
dDebug
(
"thread:%d, start to open %d vnodes"
,
pThread
->
threadIndex
,
pThread
->
vnodeNum
);
setThreadName
(
"dnodeOpenVnode"
);
for
(
int32_t
v
=
0
;
v
<
pThread
->
vnodeNum
;
++
v
)
{
int32_t
vgId
=
pThread
->
vnodeList
[
v
];
snprintf
(
stepDesc
,
TSDB_STEP_DESC_LEN
,
"vgId:%d, start to restore, %d of %d have been opened"
,
vgId
,
tsOpenVnodes
,
tsTotalVnodes
);
...
...
src/inc/taos.h
浏览文件 @
4ed51b9a
...
...
@@ -111,10 +111,12 @@ typedef struct TAOS_MULTI_BIND {
}
TAOS_MULTI_BIND
;
DLL_EXPORT
TAOS_STMT
*
taos_stmt_init
(
TAOS
*
taos
);
DLL_EXPORT
int
taos_stmt_prepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
);
DLL_EXPORT
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_BIND
*
tags
);
DLL_EXPORT
int
taos_stmt_set_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
DLL_EXPORT
int
taos_stmt_set_sub_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
DLL_EXPORT
int
taos_stmt_is_insert
(
TAOS_STMT
*
stmt
,
int
*
insert
);
DLL_EXPORT
int
taos_stmt_num_params
(
TAOS_STMT
*
stmt
,
int
*
nums
);
int
taos_stmt_get_param
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
);
...
...
src/kit/shell/src/shellCheck.c
浏览文件 @
4ed51b9a
...
...
@@ -104,6 +104,8 @@ static void shellFreeTbnames() {
static
void
*
shellCheckThreadFp
(
void
*
arg
)
{
ShellThreadObj
*
pThread
=
(
ShellThreadObj
*
)
arg
;
setThreadName
(
"shellCheckThrd"
);
int32_t
interval
=
tbNum
/
pThread
->
totalThreads
+
1
;
int32_t
start
=
pThread
->
threadIndex
*
interval
;
int32_t
end
=
(
pThread
->
threadIndex
+
1
)
*
interval
;
...
...
src/kit/shell/src/shellDarwin.c
浏览文件 @
4ed51b9a
...
...
@@ -336,6 +336,8 @@ void *shellLoopQuery(void *arg) {
TAOS
*
con
=
(
TAOS
*
)
arg
;
setThreadName
(
"shellLoopQuery"
);
pthread_cleanup_push
(
cleanup_handler
,
NULL
);
char
*
command
=
malloc
(
MAX_COMMAND_SIZE
);
...
...
src/kit/shell/src/shellImport.c
浏览文件 @
4ed51b9a
...
...
@@ -223,6 +223,8 @@ static void shellSourceFile(TAOS *con, char *fptr) {
void
*
shellImportThreadFp
(
void
*
arg
)
{
ShellThreadObj
*
pThread
=
(
ShellThreadObj
*
)
arg
;
setThreadName
(
"shellImportThrd"
);
for
(
int
f
=
0
;
f
<
shellSQLFileNum
;
++
f
)
{
if
(
f
%
pThread
->
totalThreads
==
pThread
->
threadIndex
)
{
char
*
SQLFileName
=
shellSQLFiles
[
f
];
...
...
src/kit/shell/src/shellLinux.c
浏览文件 @
4ed51b9a
...
...
@@ -336,6 +336,8 @@ void *shellLoopQuery(void *arg) {
TAOS
*
con
=
(
TAOS
*
)
arg
;
setThreadName
(
"shellLoopQuery"
);
pthread_cleanup_push
(
cleanup_handler
,
NULL
);
char
*
command
=
malloc
(
MAX_COMMAND_SIZE
);
...
...
src/kit/shell/src/shellMain.c
浏览文件 @
4ed51b9a
...
...
@@ -26,6 +26,8 @@ void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) {
}
void
*
cancelHandler
(
void
*
arg
)
{
setThreadName
(
"cancelHandler"
);
while
(
1
)
{
if
(
tsem_wait
(
&
cancelSem
)
!=
0
)
{
taosMsleep
(
10
);
...
...
src/kit/taosdemo/taosdemo.c
浏览文件 @
4ed51b9a
...
...
@@ -3025,10 +3025,11 @@ static void* createTable(void *sarg)
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
setThreadName
(
"createTable"
);
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
int
buff_len
;
buff_len
=
BUFFER_SIZE
;
int
buff_len
=
BUFFER_SIZE
;
pThreadInfo
->
buffer
=
calloc
(
buff_len
,
1
);
if
(
pThreadInfo
->
buffer
==
NULL
)
{
...
...
@@ -6428,6 +6429,8 @@ static void* syncWrite(void *sarg) {
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
setThreadName
(
"syncWrite"
);
uint32_t
interlaceRows
;
if
(
superTblInfo
)
{
...
...
@@ -6513,6 +6516,8 @@ static void *asyncWrite(void *sarg) {
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
SSuperTable
*
superTblInfo
=
pThreadInfo
->
superTblInfo
;
setThreadName
(
"asyncWrite"
);
pThreadInfo
->
st
=
0
;
pThreadInfo
->
et
=
0
;
pThreadInfo
->
lastTs
=
pThreadInfo
->
start_time
;
...
...
@@ -6913,6 +6918,7 @@ static void *readTable(void *sarg) {
#if 1
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
TAOS
*
taos
=
pThreadInfo
->
taos
;
setThreadName
(
"readTable"
);
char
command
[
BUFFER_SIZE
]
=
"
\0
"
;
uint64_t
sTime
=
pThreadInfo
->
start_time
;
char
*
tb_prefix
=
pThreadInfo
->
tb_prefix
;
...
...
@@ -6985,6 +6991,7 @@ static void *readMetric(void *sarg) {
#if 1
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
TAOS
*
taos
=
pThreadInfo
->
taos
;
setThreadName
(
"readMetric"
);
char
command
[
BUFFER_SIZE
]
=
"
\0
"
;
FILE
*
fp
=
fopen
(
pThreadInfo
->
filePath
,
"a"
);
if
(
NULL
==
fp
)
{
...
...
@@ -7161,6 +7168,8 @@ static int insertTestProcess() {
static
void
*
specifiedTableQuery
(
void
*
sarg
)
{
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
setThreadName
(
"specTableQuery"
);
if
(
pThreadInfo
->
taos
==
NULL
)
{
TAOS
*
taos
=
NULL
;
taos
=
taos_connect
(
g_queryInfo
.
host
,
...
...
@@ -7260,6 +7269,8 @@ static void *superTableQuery(void *sarg) {
char
sqlstr
[
MAX_QUERY_SQL_LENGTH
];
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
setThreadName
(
"superTableQuery"
);
if
(
pThreadInfo
->
taos
==
NULL
)
{
TAOS
*
taos
=
NULL
;
taos
=
taos_connect
(
g_queryInfo
.
host
,
...
...
@@ -7562,6 +7573,8 @@ static void *superSubscribe(void *sarg) {
TAOS_SUB
*
tsub
[
MAX_QUERY_SQL_COUNT
]
=
{
0
};
uint64_t
tsubSeq
;
setThreadName
(
"superSub"
);
if
(
pThreadInfo
->
ntables
>
MAX_QUERY_SQL_COUNT
)
{
errorPrint
(
"The table number(%"
PRId64
") of the thread is more than max query sql count: %d
\n
"
,
pThreadInfo
->
ntables
,
MAX_QUERY_SQL_COUNT
);
...
...
@@ -7708,6 +7721,8 @@ static void *specifiedSubscribe(void *sarg) {
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
// TAOS_SUB* tsub = NULL;
setThreadName
(
"specSub"
);
if
(
pThreadInfo
->
taos
==
NULL
)
{
pThreadInfo
->
taos
=
taos_connect
(
g_queryInfo
.
host
,
g_queryInfo
.
user
,
...
...
src/kit/taosdump/taosdump.c
浏览文件 @
4ed51b9a
...
...
@@ -1474,6 +1474,8 @@ static void* taosDumpOutWorkThreadFp(void *arg)
STableRecord
tableRecord
;
int
fd
;
setThreadName
(
"dumpOutWorkThrd"
);
char
tmpBuf
[
4096
]
=
{
0
};
sprintf
(
tmpBuf
,
".tables.tmp.%d"
,
pThread
->
threadIndex
);
fd
=
open
(
tmpBuf
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRGRP
|
S_IXGRP
|
S_IROTH
);
...
...
@@ -2571,6 +2573,8 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
static
void
*
taosDumpInWorkThreadFp
(
void
*
arg
)
{
SThreadParaObj
*
pThread
=
(
SThreadParaObj
*
)
arg
;
setThreadName
(
"dumpInWorkThrd"
);
for
(
int32_t
f
=
0
;
f
<
g_tsSqlFileNum
;
++
f
)
{
if
(
f
%
pThread
->
totalThreads
==
pThread
->
threadIndex
)
{
char
*
SQLFileName
=
g_tsDumpInSqlFiles
[
f
];
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
4ed51b9a
...
...
@@ -1113,6 +1113,7 @@ static void *sdbWorkerFp(void *pWorker) {
void
*
unUsed
;
taosBlockSIGPIPE
();
setThreadName
(
"sdbWorker"
);
while
(
1
)
{
int32_t
numOfMsgs
=
taosReadAllQitemsFromQset
(
tsSdbWQset
,
tsSdbWQall
,
&
unUsed
);
...
...
src/os/inc/osDef.h
浏览文件 @
4ed51b9a
...
...
@@ -210,6 +210,25 @@ extern "C" {
#define PRIzu "zu"
#endif
#if defined(_TD_LINUX_64) || defined(_TD_LINUX_32) || defined(_TD_MIPS_64) || defined(_TD_ARM_32) || defined(_TD_ARM_64) || defined(_TD_DARWIN_64)
#if defined(_TD_DARWIN_64)
// MacOS
#if !defined(_GNU_SOURCE)
#define setThreadName(name) do { pthread_setname_np((name)); } while (0)
#else
// pthread_setname_np not defined
#define setThreadName(name)
#endif
#else
// Linux, length of name must <= 16 (the last '\0' included)
#define setThreadName(name) do { prctl(PR_SET_NAME, (name)); } while (0)
#endif
#else
// Windows
#define setThreadName(name)
#endif
#ifdef __cplusplus
}
#endif
...
...
src/os/inc/osInc.h
浏览文件 @
4ed51b9a
...
...
@@ -85,6 +85,7 @@ extern "C" {
#include <sys/eventfd.h>
#include <sys/resource.h>
#include <sys/sendfile.h>
#include <sys/prctl.h>
#if !(defined(_ALPINE))
#include <error.h>
...
...
src/os/src/darwin/dwSemaphore.c
浏览文件 @
4ed51b9a
...
...
@@ -41,6 +41,8 @@ static semaphore_t sem_exit;
static
void
*
sem_thread_routine
(
void
*
arg
)
{
(
void
)
arg
;
setThreadName
(
"sem_thrd"
);
sem_port
=
mach_task_self
();
kern_return_t
ret
=
semaphore_create
(
sem_port
,
&
sem_exit
,
SYNC_POLICY_FIFO
,
0
);
if
(
ret
!=
KERN_SUCCESS
)
{
...
...
src/os/src/darwin/dwTimer.c
浏览文件 @
4ed51b9a
...
...
@@ -32,6 +32,7 @@ static volatile int timer_stop = 0;
static
void
*
timer_routine
(
void
*
arg
)
{
(
void
)
arg
;
setThreadName
(
"timer"
);
int
r
=
0
;
struct
timespec
to
=
{
0
};
...
...
src/os/src/detail/osTimer.c
浏览文件 @
4ed51b9a
...
...
@@ -38,6 +38,8 @@ static void *taosProcessAlarmSignal(void *tharg) {
struct
sigevent
sevent
=
{{
0
}};
setThreadName
(
"alarmSignal"
);
#ifdef _ALPINE
sevent
.
sigev_notify
=
SIGEV_THREAD
;
sevent
.
sigev_value
.
sival_int
=
syscall
(
__NR_gettid
);
...
...
src/plugins/http/src/httpQueue.c
浏览文件 @
4ed51b9a
...
...
@@ -70,6 +70,8 @@ static void *httpProcessResultQueue(void *param) {
int32_t
type
;
void
*
unUsed
;
setThreadName
(
"httpResultQ"
);
while
(
1
)
{
if
(
taosReadQitemFromQset
(
tsHttpQset
,
&
type
,
(
void
**
)
&
pMsg
,
&
unUsed
)
==
0
)
{
httpDebug
(
"qset:%p, http queue got no message from qset, exiting"
,
tsHttpQset
);
...
...
src/plugins/http/src/httpServer.c
浏览文件 @
4ed51b9a
...
...
@@ -117,6 +117,7 @@ static void httpProcessHttpData(void *param) {
int32_t
fdNum
;
taosSetMaskSIGPIPE
();
setThreadName
(
"httpData"
);
while
(
1
)
{
struct
epoll_event
events
[
HTTP_MAX_EVENTS
];
...
...
@@ -208,6 +209,7 @@ static void *httpAcceptHttpConnection(void *arg) {
int32_t
totalFds
=
0
;
taosSetMaskSIGPIPE
();
setThreadName
(
"httpAcceptConn"
);
pServer
->
fd
=
taosOpenTcpServerSocket
(
pServer
->
serverIp
,
pServer
->
serverPort
);
...
...
src/plugins/monitor/src/monMain.c
浏览文件 @
4ed51b9a
...
...
@@ -114,6 +114,7 @@ int32_t monStartSystem() {
static
void
*
monThreadFunc
(
void
*
param
)
{
monDebug
(
"starting to initialize monitor module ..."
);
setThreadName
(
"monThrd"
);
while
(
1
)
{
static
int32_t
accessTimes
=
0
;
...
...
src/plugins/mqtt/src/mqttSystem.c
浏览文件 @
4ed51b9a
...
...
@@ -100,6 +100,8 @@ void mqttPublishCallback(void** unused, struct mqtt_response_publish* published)
}
void
*
mqttClientRefresher
(
void
*
client
)
{
setThreadName
(
"mqttCliRefresh"
);
while
(
tsMqttIsRuning
)
{
mqtt_sync
((
struct
mqtt_client
*
)
client
);
taosMsleep
(
100
);
...
...
@@ -141,4 +143,4 @@ void mqttReconnectClient(struct mqtt_client* client, void** unused) {
mqtt_reinit
(
client
,
sockfd
,
tsMqttStatus
.
sendbuf
,
tsMqttStatus
.
sendbufsz
,
tsMqttStatus
.
recvbuf
,
tsMqttStatus
.
recvbufsz
);
mqtt_connect
(
client
,
tsMqttClientId
,
NULL
,
NULL
,
0
,
tsMqttUser
,
tsMqttPass
,
MQTT_CONNECT_CLEAN_SESSION
,
400
);
mqtt_subscribe
(
client
,
tsMqttTopic
,
0
);
}
\ No newline at end of file
}
src/rpc/src/rpcTcp.c
浏览文件 @
4ed51b9a
...
...
@@ -242,6 +242,7 @@ static void *taosAcceptTcpConnection(void *arg) {
pServerObj
=
(
SServerObj
*
)
arg
;
tDebug
(
"%s TCP server is ready, ip:0x%x:%hu"
,
pServerObj
->
label
,
pServerObj
->
ip
,
pServerObj
->
port
);
setThreadName
(
"acceptTcpConn"
);
while
(
1
)
{
socklen_t
addrlen
=
sizeof
(
caddr
);
...
...
@@ -528,6 +529,11 @@ static void *taosProcessTcpData(void *param) {
SFdObj
*
pFdObj
;
struct
epoll_event
events
[
maxEvents
];
SRecvInfo
recvInfo
;
char
name
[
16
];
memset
(
name
,
0
,
sizeof
(
name
));
snprintf
(
name
,
16
,
"%s-tcpData"
,
pThreadObj
->
label
);
setThreadName
(
name
);
while
(
1
)
{
int
fdNum
=
epoll_wait
(
pThreadObj
->
pollFd
,
events
,
maxEvents
,
TAOS_EPOLL_WAIT_TIME
);
...
...
src/rpc/src/rpcUdp.c
浏览文件 @
4ed51b9a
...
...
@@ -195,6 +195,8 @@ static void *taosRecvUdpData(void *param) {
tDebug
(
"%s UDP thread is created, index:%d"
,
pConn
->
label
,
pConn
->
index
);
char
*
msg
=
pConn
->
buffer
;
setThreadName
(
"recvUdpData"
);
while
(
1
)
{
dataLen
=
recvfrom
(
pConn
->
fd
,
pConn
->
buffer
,
RPC_MAX_UDP_SIZE
,
0
,
(
struct
sockaddr
*
)
&
sourceAdd
,
&
addLen
);
if
(
dataLen
<=
0
)
{
...
...
src/rpc/test/rclient.c
浏览文件 @
4ed51b9a
...
...
@@ -47,6 +47,8 @@ static int tcount = 0;
static
void
*
sendRequest
(
void
*
param
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
param
;
SRpcMsg
rpcMsg
=
{
0
};
setThreadName
(
"sendCliReq"
);
tDebug
(
"thread:%d, start to send request"
,
pInfo
->
index
);
...
...
src/rpc/test/rsclient.c
浏览文件 @
4ed51b9a
...
...
@@ -39,8 +39,10 @@ static int terror = 0;
static
void
*
sendRequest
(
void
*
param
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
param
;
SRpcMsg
rpcMsg
,
rspMsg
;
SRpcMsg
rpcMsg
,
rspMsg
;
setThreadName
(
"sendSrvReq"
);
tDebug
(
"thread:%d, start to send request"
,
pInfo
->
index
);
while
(
pInfo
->
numOfReqs
==
0
||
pInfo
->
num
<
pInfo
->
numOfReqs
)
{
...
...
src/sync/src/syncRestore.c
浏览文件 @
4ed51b9a
...
...
@@ -263,6 +263,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
}
void
*
syncRestoreData
(
void
*
param
)
{
setThreadName
(
"syncRestoreData"
);
int64_t
rid
=
(
int64_t
)
param
;
SSyncPeer
*
pPeer
=
syncAcquirePeer
(
rid
);
if
(
pPeer
==
NULL
)
{
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
4ed51b9a
...
...
@@ -415,6 +415,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
}
void
*
syncRetrieveData
(
void
*
param
)
{
setThreadName
(
"syncRetrievData"
);
int64_t
rid
=
(
int64_t
)
param
;
SSyncPeer
*
pPeer
=
syncAcquirePeer
(
rid
);
if
(
pPeer
==
NULL
)
{
...
...
src/sync/src/syncTcp.c
浏览文件 @
4ed51b9a
...
...
@@ -195,6 +195,8 @@ static void *syncProcessTcpData(void *param) {
SConnObj
*
pConn
=
NULL
;
struct
epoll_event
events
[
maxEvents
];
setThreadName
(
"syncTcpData"
);
void
*
buffer
=
malloc
(
pInfo
->
bufferSize
);
taosBlockSIGPIPE
();
...
...
@@ -257,6 +259,7 @@ static void *syncAcceptPeerTcpConnection(void *argv) {
SPoolInfo
*
pInfo
=
&
pPool
->
info
;
taosBlockSIGPIPE
();
setThreadName
(
"acceptTcpConn"
);
while
(
1
)
{
struct
sockaddr_in
clientAddr
;
...
...
src/sync/test/syncClient.c
浏览文件 @
4ed51b9a
...
...
@@ -48,6 +48,8 @@ void *sendRequest(void *param) {
SInfo
*
pInfo
=
(
SInfo
*
)
param
;
SRpcMsg
rpcMsg
=
{
0
};
setThreadName
(
"sendCliReq"
);
uDebug
(
"thread:%d, start to send request"
,
pInfo
->
index
);
while
(
pInfo
->
numOfReqs
==
0
||
pInfo
->
num
<
pInfo
->
numOfReqs
)
{
...
...
src/sync/test/syncServer.c
浏览文件 @
4ed51b9a
...
...
@@ -178,6 +178,8 @@ void *processWriteQueue(void *param) {
int
type
;
void
*
item
;
setThreadName
(
"writeQ"
);
while
(
1
)
{
int
ret
=
taosReadQitem
(
qhandle
,
&
type
,
&
item
);
if
(
ret
<=
0
)
{
...
...
src/tsdb/src/tsdbCommitQueue.c
浏览文件 @
4ed51b9a
...
...
@@ -158,6 +158,8 @@ static void *tsdbLoopCommit(void *arg) {
STsdbRepo
*
pRepo
=
NULL
;
TSDB_REQ_T
req
;
setThreadName
(
"tsdbCommit"
);
while
(
true
)
{
pthread_mutex_lock
(
&
(
pQueue
->
lock
));
...
...
@@ -208,4 +210,4 @@ void tsdbDecCommitRef(int vgId) {
int
refCount
=
atomic_sub_fetch_32
(
&
tsCommitQueue
.
refCount
,
1
);
pthread_cond_broadcast
(
&
(
tsCommitQueue
.
queueNotEmpty
));
tsdbDebug
(
"vgId:%d, dec commit queue ref to %d"
,
vgId
,
refCount
);
}
\ No newline at end of file
}
src/util/src/tcache.c
浏览文件 @
4ed51b9a
...
...
@@ -656,6 +656,8 @@ void* taosCacheTimedRefresh(void *handle) {
return
NULL
;
}
setThreadName
(
"cacheTimedRefre"
);
const
int32_t
SLEEP_DURATION
=
500
;
//500 ms
int64_t
totalTick
=
pCacheObj
->
refreshTime
/
SLEEP_DURATION
;
...
...
src/util/src/tlog.c
浏览文件 @
4ed51b9a
...
...
@@ -178,6 +178,8 @@ static void *taosThreadToOpenNewFile(void *param) {
char
keepName
[
LOG_FILE_NAME_LEN
+
20
];
sprintf
(
keepName
,
"%s.%d"
,
tsLogObj
.
logName
,
tsLogObj
.
flag
);
setThreadName
(
"openNewFile"
);
tsLogObj
.
flag
^=
1
;
tsLogObj
.
lines
=
0
;
char
name
[
LOG_FILE_NAME_LEN
+
20
];
...
...
@@ -687,6 +689,8 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
static
void
*
taosAsyncOutputLog
(
void
*
param
)
{
SLogBuff
*
tLogBuff
=
(
SLogBuff
*
)
param
;
setThreadName
(
"asyncOutputLog"
);
while
(
1
)
{
//tsem_wait(&(tLogBuff->buffNotEmpty));
...
...
src/util/src/tnettest.c
浏览文件 @
4ed51b9a
...
...
@@ -50,7 +50,9 @@ static void *taosNetBindUdpPort(void *sarg) {
struct
sockaddr_in
server_addr
;
struct
sockaddr_in
clientAddr
;
if
((
serverSocket
=
socket
(
AF_INET
,
SOCK_DGRAM
,
IPPROTO_UDP
))
<
0
)
{
setThreadName
(
"netBindUdpPort"
);
if
((
serverSocket
=
socket
(
AF_INET
,
SOCK_DGRAM
,
IPPROTO_UDP
))
<
0
)
{
uError
(
"failed to create UDP socket since %s"
,
strerror
(
errno
));
return
NULL
;
}
...
...
@@ -106,13 +108,15 @@ static void *taosNetBindTcpPort(void *sarg) {
struct
sockaddr_in
server_addr
;
struct
sockaddr_in
clientAddr
;
STestInfo
*
pinfo
=
sarg
;
STestInfo
*
pinfo
=
sarg
;
int32_t
port
=
pinfo
->
port
;
SOCKET
serverSocket
;
int32_t
addr_len
=
sizeof
(
clientAddr
);
SOCKET
client
;
char
buffer
[
BUFFER_SIZE
];
setThreadName
(
"netBindTcpPort"
);
if
((
serverSocket
=
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
))
<
0
)
{
uError
(
"failed to create TCP socket since %s"
,
strerror
(
errno
));
return
NULL
;
...
...
src/util/src/tnote.c
浏览文件 @
4ed51b9a
...
...
@@ -84,6 +84,8 @@ static void *taosThreadToOpenNewNote(void *param) {
char
name
[
NOTE_FILE_NAME_LEN
*
2
];
SNoteObj
*
pNote
=
(
SNoteObj
*
)
param
;
setThreadName
(
"openNewNote"
);
pNote
->
flag
^=
1
;
pNote
->
lines
=
0
;
sprintf
(
name
,
"%s.%d"
,
pNote
->
name
,
pNote
->
flag
);
...
...
src/util/src/tsched.c
浏览文件 @
4ed51b9a
...
...
@@ -122,6 +122,8 @@ void *taosProcessSchedQueue(void *scheduler) {
SSchedQueue
*
pSched
=
(
SSchedQueue
*
)
scheduler
;
int
ret
=
0
;
setThreadName
(
"schedQ"
);
while
(
1
)
{
if
((
ret
=
tsem_wait
(
&
pSched
->
fullSem
))
!=
0
)
{
uFatal
(
"wait %s fullSem failed(%s)"
,
pSched
->
label
,
strerror
(
errno
));
...
...
@@ -234,4 +236,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
}
taosTmrReset
(
taosDumpSchedulerStatus
,
DUMP_SCHEDULER_TIME_WINDOW
,
pSched
,
pSched
->
pTmrCtrl
,
&
pSched
->
pTimer
);
}
\ No newline at end of file
}
src/util/tests/trefTest.c
浏览文件 @
4ed51b9a
...
...
@@ -35,6 +35,8 @@ void *addRef(void *param) {
SRefSpace
*
pSpace
=
(
SRefSpace
*
)
param
;
int
id
;
setThreadName
(
"addRef"
);
for
(
int
i
=
0
;
i
<
pSpace
->
steps
;
++
i
)
{
printf
(
"a"
);
id
=
random
()
%
pSpace
->
refNum
;
...
...
@@ -52,6 +54,8 @@ void *removeRef(void *param) {
SRefSpace
*
pSpace
=
(
SRefSpace
*
)
param
;
int
id
,
code
;
setThreadName
(
"removeRef"
);
for
(
int
i
=
0
;
i
<
pSpace
->
steps
;
++
i
)
{
printf
(
"d"
);
id
=
random
()
%
pSpace
->
refNum
;
...
...
@@ -70,6 +74,8 @@ void *acquireRelease(void *param) {
SRefSpace
*
pSpace
=
(
SRefSpace
*
)
param
;
int
id
;
setThreadName
(
"acquireRelease"
);
for
(
int
i
=
0
;
i
<
pSpace
->
steps
;
++
i
)
{
printf
(
"a"
);
...
...
@@ -91,6 +97,8 @@ void myfree(void *p) {
void
*
openRefSpace
(
void
*
param
)
{
SRefSpace
*
pSpace
=
(
SRefSpace
*
)
param
;
setThreadName
(
"openRefSpace"
);
printf
(
"c"
);
pSpace
->
rsetId
=
taosOpenRef
(
50
,
myfree
);
...
...
src/vnode/src/vnodeBackup.c
浏览文件 @
4ed51b9a
...
...
@@ -61,6 +61,8 @@ static void vnodeProcessBackupMsg(SVBackupMsg *pMsg) {
}
static
void
*
vnodeBackupFunc
(
void
*
param
)
{
setThreadName
(
"vnodeBackup"
);
while
(
1
)
{
SVBackupMsg
*
pMsg
=
NULL
;
if
(
taosReadQitemFromQset
(
tsVBackupQset
,
NULL
,
(
void
**
)
&
pMsg
,
NULL
)
==
0
)
{
...
...
src/vnode/src/vnodeWorker.c
浏览文件 @
4ed51b9a
...
...
@@ -188,6 +188,8 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) {
}
static
void
*
vnodeMWorkerFunc
(
void
*
param
)
{
setThreadName
(
"vnodeMWorker"
);
while
(
1
)
{
SVMWorkerMsg
*
pMsg
=
NULL
;
if
(
taosReadQitemFromQset
(
tsVMWorkerQset
,
NULL
,
(
void
**
)
&
pMsg
,
NULL
)
==
0
)
{
...
...
src/wal/src/walMgmt.c
浏览文件 @
4ed51b9a
...
...
@@ -192,6 +192,7 @@ static void walFsyncAll() {
static
void
*
walThreadFunc
(
void
*
param
)
{
int
stop
=
0
;
setThreadName
(
"walThrd"
);
while
(
1
)
{
walUpdateSeq
();
walFsyncAll
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录