Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
97024f14
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
97024f14
编写于
2月 24, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(query): fix coverity issue.
上级
6d216ec6
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
80 addition
and
35 deletion
+80
-35
examples/c/tmq.c
examples/c/tmq.c
+32
-8
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+6
-1
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+4
-2
source/dnode/vnode/src/tsdb/tsdbDataIter.c
source/dnode/vnode/src/tsdb/tsdbDataIter.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+3
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+2
-2
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+0
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+5
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+2
-2
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+6
-5
source/libs/executor/src/sysscanoperator.c
source/libs/executor/src/sysscanoperator.c
+1
-3
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+1
-0
source/util/src/tpagedbuf.c
source/util/src/tpagedbuf.c
+13
-3
source/util/src/tutil.c
source/util/src/tutil.c
+1
-1
utils/test/c/tmqSim.c
utils/test/c/tmqSim.c
+1
-1
未找到文件。
examples/c/tmq.c
浏览文件 @
97024f14
...
...
@@ -191,21 +191,45 @@ tmq_t* build_consumer() {
tmq_conf_res_t
code
;
tmq_conf_t
*
conf
=
tmq_conf_new
();
code
=
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
if
(
TMQ_CONF_OK
!=
code
)
{
tmq_conf_destroy
(
conf
);
return
NULL
;
}
code
=
tmq_conf_set
(
conf
,
"auto.commit.interval.ms"
,
"1000"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
if
(
TMQ_CONF_OK
!=
code
)
{
tmq_conf_destroy
(
conf
);
return
NULL
;
}
code
=
tmq_conf_set
(
conf
,
"group.id"
,
"cgrpName"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
if
(
TMQ_CONF_OK
!=
code
)
{
tmq_conf_destroy
(
conf
);
return
NULL
;
}
code
=
tmq_conf_set
(
conf
,
"client.id"
,
"user defined name"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
if
(
TMQ_CONF_OK
!=
code
)
{
tmq_conf_destroy
(
conf
);
return
NULL
;
}
code
=
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
if
(
TMQ_CONF_OK
!=
code
)
{
tmq_conf_destroy
(
conf
);
return
NULL
;
}
code
=
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
if
(
TMQ_CONF_OK
!=
code
)
{
tmq_conf_destroy
(
conf
);
return
NULL
;
}
code
=
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"earliest"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
if
(
TMQ_CONF_OK
!=
code
)
{
tmq_conf_destroy
(
conf
);
return
NULL
;
}
code
=
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"false"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
if
(
TMQ_CONF_OK
!=
code
)
{
tmq_conf_destroy
(
conf
);
return
NULL
;
}
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
...
...
source/common/src/tdatablock.c
浏览文件 @
97024f14
...
...
@@ -1033,6 +1033,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock*
offset
+=
pInfo
->
pColData
->
info
.
bytes
;
}
taosMemoryFree
(
buf
);
return
phelper
;
}
...
...
@@ -2370,7 +2371,11 @@ _end:
taosArrayDestroy
(
pVals
);
if
(
terrno
!=
0
)
{
*
ppReq
=
NULL
;
if
(
pReq
)
tDestroySSubmitReq2
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
if
(
pReq
)
{
tDestroySSubmitReq2
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
taosMemoryFreeClear
(
pReq
);
}
return
TSDB_CODE_FAILED
;
}
*
ppReq
=
pReq
;
...
...
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
浏览文件 @
97024f14
...
...
@@ -433,8 +433,10 @@ _end:
tsdbUntakeReadSnap
((
STsdbReader
*
)
pr
,
pr
->
pReadSnap
,
true
);
taosThreadMutexUnlock
(
&
pr
->
readerMutex
);
for
(
int32_t
j
=
0
;
j
<
pr
->
numOfCols
;
++
j
)
{
taosMemoryFree
(
pRes
[
j
]);
if
(
pRes
!=
NULL
)
{
for
(
int32_t
j
=
0
;
j
<
pr
->
numOfCols
;
++
j
)
{
taosMemoryFree
(
pRes
[
j
]);
}
}
taosMemoryFree
(
pRes
);
...
...
source/dnode/vnode/src/tsdb/tsdbDataIter.c
浏览文件 @
97024f14
...
...
@@ -219,7 +219,7 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo*
}
ASSERT
(
pIter
->
rowInfo
.
suid
==
pIter
->
dIter
.
bData
.
suid
);
ASSERT
(
pIter
->
rowInfo
.
uid
=
pIter
->
dIter
.
bData
.
uid
);
ASSERT
(
pIter
->
rowInfo
.
uid
=
=
pIter
->
dIter
.
bData
.
uid
);
pIter
->
rowInfo
.
row
=
tsdbRowFromBlockData
(
&
pIter
->
dIter
.
bData
,
pIter
->
dIter
.
iRow
);
pIter
->
dIter
.
iRow
++
;
goto
_exit
;
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
97024f14
...
...
@@ -148,10 +148,10 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2) { return pDelFi
int32_t
tsdbDFileRollback
(
STsdb
*
pTsdb
,
SDFileSet
*
pSet
,
EDataFileT
ftype
)
{
int32_t
code
=
0
;
int64_t
size
;
int64_t
size
=
0
;
int64_t
n
;
TdFilePtr
pFD
;
char
fname
[
TSDB_FILENAME_LEN
];
char
fname
[
TSDB_FILENAME_LEN
]
=
{
0
}
;
char
hdr
[
TSDB_FHDR_SIZE
]
=
{
0
};
// truncate
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
97024f14
...
...
@@ -473,7 +473,7 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
int8_t
forward
)
{
int32_t
code
=
0
;
int8_t
level
;
SMemSkipListNode
*
pNode
;
SMemSkipListNode
*
pNode
=
NULL
;
SVBufPool
*
pPool
=
pMemTable
->
pTsdb
->
pVnode
->
inUse
;
int64_t
nSize
;
...
...
@@ -591,7 +591,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
pBlockData
->
aColData
=
vnodeBufPoolMalloc
(
pPool
,
sizeof
(
SColData
)
*
pBlockData
->
nColData
);
if
(
pBlockData
->
aColData
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
for
(
int32_t
iColData
=
0
;
iColData
<
pBlockData
->
nColData
;
++
iColData
)
{
code
=
tColDataCopy
(
&
aColData
[
iColData
+
1
],
&
pBlockData
->
aColData
[
iColData
],
(
xMallocFn
)
vnodeBufPoolMalloc
,
pPool
);
if
(
code
)
goto
_exit
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
97024f14
...
...
@@ -874,7 +874,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
pBlockNum
->
numOfBlocks
+=
1
;
}
if
(
pScanInfo
->
pBlockList
!=
NULL
&&
taosArrayGetSize
(
pScanInfo
->
pBlockList
)
>
0
)
{
if
(
(
pScanInfo
->
pBlockList
!=
NULL
)
&&
(
taosArrayGetSize
(
pScanInfo
->
pBlockList
)
>
0
)
)
{
numOfQTable
+=
1
;
}
}
...
...
@@ -4532,7 +4532,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
SSDataBlock
*
pResBlock
=
pReader
->
pResBlock
;
if
(
pResBlock
->
pBlockAgg
==
NULL
)
{
size_t
num
=
taosArrayGetSize
(
pResBlock
->
pDataBlock
);
pResBlock
->
pBlockAgg
=
taosMemoryCalloc
(
num
,
sizeof
(
SColumnDataAgg
)
);
pResBlock
->
pBlockAgg
=
taosMemoryCalloc
(
num
,
POINTER_BYTES
);
}
// do fill all null column value SMA info
...
...
source/libs/executor/src/executil.c
浏览文件 @
97024f14
...
...
@@ -1148,7 +1148,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode,
if
(
TSDB_CODE_SUCCESS
==
code
)
{
REPLACE_NODE
(
pNew
);
}
else
{
taosMemoryFree
(
keyBuf
);
nodesDestroyList
(
groupNew
);
metaReaderClear
(
&
mr
);
return
code
;
...
...
@@ -1166,7 +1165,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode,
if
(
pValue
->
node
.
resType
.
type
==
TSDB_DATA_TYPE_JSON
)
{
if
(
tTagIsJson
(
data
))
{
terrno
=
TSDB_CODE_QRY_JSON_IN_GROUP_ERROR
;
taosMemoryFree
(
keyBuf
);
nodesDestroyList
(
groupNew
);
metaReaderClear
(
&
mr
);
return
terrno
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
97024f14
...
...
@@ -2035,7 +2035,11 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode,
tDecoderClear
(
&
mr
.
coder
);
tb_uid_t
suid
=
mr
.
me
.
ctbEntry
.
suid
;
metaGetTableEntryByUidCache
(
&
mr
,
suid
);
code
=
metaGetTableEntryByUidCache
(
&
mr
,
suid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
terrno
;
}
pSchemaInfo
->
sw
=
tCloneSSchemaWrapper
(
&
mr
.
me
.
stbEntry
.
schemaRow
);
pSchemaInfo
->
tversion
=
mr
.
me
.
stbEntry
.
schemaTag
.
version
;
}
else
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
97024f14
...
...
@@ -3050,8 +3050,8 @@ int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCoun
}
}
}
else
{
strncpy
(
supp
->
dbNameFilter
,
tNameGetDbNameP
(
tableName
),
TSDB_DB_NAME_LEN
);
strncpy
(
supp
->
stbNameFilter
,
tNameGetTableName
(
tableName
),
TSDB_TABLE_NAME_LEN
);
t
strncpy
(
supp
->
dbNameFilter
,
tNameGetDbNameP
(
tableName
),
TSDB_DB_NAME_LEN
);
t
strncpy
(
supp
->
stbNameFilter
,
tNameGetTableName
(
tableName
),
TSDB_TABLE_NAME_LEN
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
97024f14
...
...
@@ -46,8 +46,9 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pOperator
->
pTaskInfo
=
pTaskInfo
;
SDataBlockDescNode
*
pDescNode
=
pSortNode
->
node
.
pOutputDataBlockDesc
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSortNode
->
pExprs
,
NULL
,
&
numOfCols
);
int32_t
numOfCols
=
0
;
pOperator
->
exprSupp
.
pExprInfo
=
createExprInfo
(
pSortNode
->
pExprs
,
NULL
,
&
numOfCols
);
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
int32_t
numOfOutputCols
=
0
;
int32_t
code
=
...
...
@@ -56,7 +57,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
goto
_error
;
}
pOperator
->
exprSupp
.
pCtx
=
createSqlFunctionCtx
(
pExprInfo
,
numOfCols
,
&
pOperator
->
exprSupp
.
rowEntryInfoOffset
);
pOperator
->
exprSupp
.
pCtx
=
createSqlFunctionCtx
(
pOperator
->
exprSupp
.
pExprInfo
,
numOfCols
,
&
pOperator
->
exprSupp
.
rowEntryInfoOffset
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
1024
);
code
=
filterInitFromNode
((
SNode
*
)
pSortNode
->
node
.
pConditions
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -68,8 +70,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
initLimitInfo
(
pSortNode
->
node
.
pLimit
,
pSortNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
setOperatorInfo
(
pOperator
,
"SortOperator"
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
// lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
...
...
source/libs/executor/src/sysscanoperator.c
浏览文件 @
97024f14
...
...
@@ -1608,9 +1608,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
if
(
pInfo
->
tbnameSlotId
!=
-
1
)
{
SColumnInfoData
*
pColumnInfoData
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
tbnameSlotId
);
char
varTbName
[
TSDB_TABLE_FNAME_LEN
-
1
+
VARSTR_HEADER_SIZE
]
=
{
0
};
memcpy
(
varDataVal
(
varTbName
),
name
,
strlen
(
name
));
varDataSetLen
(
varTbName
,
strlen
(
name
));
STR_TO_VARSTR
(
varTbName
,
name
);
colDataAppendNItems
(
pColumnInfoData
,
0
,
varTbName
,
pBlock
->
info
.
rows
);
}
...
...
source/libs/executor/src/tsort.c
浏览文件 @
97024f14
...
...
@@ -212,6 +212,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
int32_t
pageId
=
-
1
;
void
*
pPage
=
getNewBufPage
(
pHandle
->
pBuf
,
&
pageId
);
if
(
pPage
==
NULL
)
{
taosArrayDestroy
(
pPageIdList
);
blockDataDestroy
(
p
);
return
terrno
;
}
...
...
source/util/src/tpagedbuf.c
浏览文件 @
97024f14
...
...
@@ -391,7 +391,7 @@ _error:
return
TSDB_CODE_OUT_OF_MEMORY
;
}
static
char
*
doExtractPage
(
SDiskbasedBuf
*
pBuf
)
{
static
char
*
doExtractPage
(
SDiskbasedBuf
*
pBuf
,
bool
*
newPage
)
{
char
*
availablePage
=
NULL
;
if
(
NO_IN_MEM_AVAILABLE_PAGES
(
pBuf
))
{
availablePage
=
evictBufPage
(
pBuf
);
...
...
@@ -405,6 +405,7 @@ static char* doExtractPage(SDiskbasedBuf* pBuf) {
if
(
availablePage
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
}
*
newPage
=
true
;
}
return
availablePage
;
...
...
@@ -413,7 +414,8 @@ static char* doExtractPage(SDiskbasedBuf* pBuf) {
void
*
getNewBufPage
(
SDiskbasedBuf
*
pBuf
,
int32_t
*
pageId
)
{
pBuf
->
statis
.
getPages
+=
1
;
char
*
availablePage
=
doExtractPage
(
pBuf
);
bool
newPage
=
false
;
char
*
availablePage
=
doExtractPage
(
pBuf
,
&
newPage
);
if
(
availablePage
==
NULL
)
{
return
NULL
;
}
...
...
@@ -432,6 +434,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
// register page id info
pi
=
registerNewPageInfo
(
pBuf
,
*
pageId
);
if
(
pi
==
NULL
)
{
if
(
newPage
)
{
taosMemoryFree
(
availablePage
);
}
return
NULL
;
}
...
...
@@ -492,7 +497,8 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
ASSERT
((
!
BUF_PAGE_IN_MEM
(
*
pi
))
&&
(
*
pi
)
->
pn
==
NULL
&&
(((
*
pi
)
->
length
>=
0
&&
(
*
pi
)
->
offset
>=
0
)
||
((
*
pi
)
->
length
==
-
1
&&
(
*
pi
)
->
offset
==
-
1
)));
(
*
pi
)
->
pData
=
doExtractPage
(
pBuf
);
bool
newPage
=
false
;
(
*
pi
)
->
pData
=
doExtractPage
(
pBuf
,
&
newPage
);
// failed to evict buffer page, return with error code.
if
((
*
pi
)
->
pData
==
NULL
)
{
...
...
@@ -509,6 +515,10 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
if
(
HAS_DATA_IN_DISK
(
*
pi
))
{
int32_t
code
=
loadPageFromDisk
(
pBuf
,
*
pi
);
if
(
code
!=
0
)
{
if
(
newPage
)
{
taosMemoryFree
((
*
pi
)
->
pData
);
}
terrno
=
code
;
return
NULL
;
}
...
...
source/util/src/tutil.c
浏览文件 @
97024f14
...
...
@@ -118,7 +118,7 @@ char **strsplit(char *z, const char *delim, int32_t *num) {
if
((
*
num
)
>=
size
)
{
size
=
(
size
<<
1
);
split
=
taosMemoryRealloc
(
split
,
POINTER_BYTES
*
size
);
ASSERTS
(
NULL
!=
split
,
"realloc memory failed. size=%d"
,
POINTER_BYTES
*
size
);
ASSERTS
(
NULL
!=
split
,
"realloc memory failed. size=%d"
,
(
int32_t
)
POINTER_BYTES
*
size
);
}
}
...
...
utils/test/c/tmqSim.c
浏览文件 @
97024f14
...
...
@@ -1381,7 +1381,7 @@ void startOmbConsume() {
printf
(
"SQL: %s
\n
"
,
sql
);
queryDbExec
(
taos
,
sql
,
NO_INSERT_TYPE
);
int32_t
producerRate
=
ceil
(
g_stConfInfo
.
producerRate
/
g_stConfInfo
.
producers
);
int32_t
producerRate
=
ceil
(
((
double
)
g_stConfInfo
.
producerRate
)
/
g_stConfInfo
.
producers
);
printf
(
"==== create %d produce thread ====
\n
"
,
g_stConfInfo
.
producers
);
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
producers
;
++
i
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录