Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
99fd3284
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
99fd3284
编写于
10月 18, 2022
作者:
D
dapan1121
提交者:
GitHub
10月 18, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17439 from taosdata/fix/coverity.pw
fix: fix coverity issues
上级
c77bc2b9
0fe14f0b
变更
19
显示空白变更内容
内联
并排
Showing
19 changed file
with
113 addition
and
50 deletion
+113
-50
include/util/tdef.h
include/util/tdef.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+12
-4
source/client/src/clientMain.c
source/client/src/clientMain.c
+0
-1
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+5
-2
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+1
-1
source/dnode/mnode/impl/src/mndQuery.c
source/dnode/mnode/impl/src/mndQuery.c
+25
-0
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+7
-2
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+5
-0
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+5
-3
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+2
-2
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+22
-21
source/libs/command/src/command.c
source/libs/command/src/command.c
+6
-1
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+7
-1
source/libs/executor/src/dataInserter.c
source/libs/executor/src/dataInserter.c
+1
-1
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+1
-2
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+6
-6
source/libs/qworker/src/qwUtil.c
source/libs/qworker/src/qwUtil.c
+5
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+0
-1
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+2
-1
未找到文件。
include/util/tdef.h
浏览文件 @
99fd3284
...
...
@@ -498,6 +498,7 @@ enum {
#define MAX_NUM_STR_SIZE 40
#define MAX_META_MSG_IN_BATCH 1048576
#define MAX_META_BATCH_RSP_SIZE (1 * 1048576 * 1024)
#ifdef __cplusplus
}
...
...
source/client/src/clientImpl.c
浏览文件 @
99fd3284
...
...
@@ -199,7 +199,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
if
(
tsQueryUseNodeAllocator
&&
!
qIsInsertValuesSql
((
*
pRequest
)
->
sqlstr
,
(
*
pRequest
)
->
sqlLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
nodesCreateAllocator
((
*
pRequest
)
->
requestId
,
tsQueryNodeChunkSize
,
&
((
*
pRequest
)
->
allocatorRefId
)))
{
tscError
(
"%d failed to create node allocator, reqId:0x%"
PRIx64
", conn:%
d
, %s"
,
(
*
pRequest
)
->
self
,
tscError
(
"%d failed to create node allocator, reqId:0x%"
PRIx64
", conn:%
"
PRId64
"
, %s"
,
(
*
pRequest
)
->
self
,
(
*
pRequest
)
->
requestId
,
pTscObj
->
id
,
sql
);
destroyRequest
(
*
pRequest
);
...
...
@@ -955,8 +955,13 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
switch
(
pQuery
->
execMode
)
{
case
QUERY_EXEC_MODE_LOCAL
:
if
(
!
pRequest
->
validateOnly
)
{
if
(
NULL
==
pQuery
->
pRoot
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
code
=
terrno
;
}
else
{
code
=
execLocalCmd
(
pRequest
,
pQuery
);
}
}
break
;
case
QUERY_EXEC_MODE_RPC
:
if
(
!
pRequest
->
validateOnly
)
{
...
...
@@ -997,7 +1002,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
handleQueryExecRsp
(
pRequest
);
if
(
NULL
!=
pRequest
&&
TSDB_CODE_SUCCESS
!=
code
)
{
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
pRequest
->
code
=
terrno
;
}
...
...
@@ -2254,7 +2259,10 @@ void syncQueryFn(void* param, void* res, int32_t code) {
void
taosAsyncQueryImpl
(
uint64_t
connId
,
const
char
*
sql
,
__taos_async_fn_t
fp
,
void
*
param
,
bool
validateOnly
)
{
if
(
sql
==
NULL
||
NULL
==
fp
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
if
(
fp
)
{
fp
(
param
,
NULL
,
terrno
);
}
return
;
}
...
...
source/client/src/clientMain.c
浏览文件 @
99fd3284
...
...
@@ -944,7 +944,6 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
if
(
pResultInfo
->
completed
)
{
// it is a local executed query, no need to do async fetch
if
(
QUERY_EXEC_MODE_LOCAL
==
pRequest
->
body
.
execMode
)
{
ASSERT
(
pResultInfo
->
numOfRows
>=
0
);
if
(
pResultInfo
->
localResultFetched
)
{
pResultInfo
->
numOfRows
=
0
;
pResultInfo
->
current
=
0
;
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
99fd3284
...
...
@@ -292,9 +292,11 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
tDeserializeSDropDbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
dropdbRsp
);
struct
SCatalog
*
pCatalog
=
NULL
;
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
int32_t
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
catalogRemoveDB
(
pCatalog
,
dropdbRsp
.
db
,
dropdbRsp
.
uid
);
}
}
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
...
...
@@ -397,6 +399,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
size_t
rspSize
=
sizeof
(
SRetrieveTableRsp
)
+
blockGetEncodeSize
(
pBlock
);
*
pRsp
=
taosMemoryCalloc
(
1
,
rspSize
);
if
(
NULL
==
*
pRsp
)
{
blockDataDestroy
(
pBlock
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
source/client/src/clientStmt.c
浏览文件 @
99fd3284
...
...
@@ -152,7 +152,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
pStmt
->
bInfo
.
tbType
=
pTableMeta
->
tableType
;
pStmt
->
bInfo
.
boundTags
=
tags
;
pStmt
->
bInfo
.
tagsCached
=
false
;
strcpy
(
pStmt
->
bInfo
.
stbFName
,
sTableName
);
tstrncpy
(
pStmt
->
bInfo
.
stbFName
,
sTableName
,
sizeof
(
pStmt
->
bInfo
.
stbFName
)
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/dnode/mnode/impl/src/mndQuery.c
浏览文件 @
99fd3284
...
...
@@ -90,14 +90,39 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
}
for
(
int32_t
i
=
0
;
i
<
msgNum
;
++
i
)
{
if
(
offset
>=
pMsg
->
contLen
)
{
mError
(
"offset %d is bigger than contLen %d"
,
offset
,
pMsg
->
contLen
);
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
taosArrayDestroy
(
batchRsp
);
return
-
1
;
}
req
.
msgIdx
=
ntohl
(
*
(
int32_t
*
)((
char
*
)
pMsg
->
pCont
+
offset
));
offset
+=
sizeof
(
req
.
msgIdx
);
if
(
offset
>=
pMsg
->
contLen
)
{
mError
(
"offset %d is bigger than contLen %d"
,
offset
,
pMsg
->
contLen
);
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
taosArrayDestroy
(
batchRsp
);
return
-
1
;
}
req
.
msgType
=
ntohl
(
*
(
int32_t
*
)((
char
*
)
pMsg
->
pCont
+
offset
));
offset
+=
sizeof
(
req
.
msgType
);
if
(
offset
>=
pMsg
->
contLen
)
{
mError
(
"offset %d is bigger than contLen %d"
,
offset
,
pMsg
->
contLen
);
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
taosArrayDestroy
(
batchRsp
);
return
-
1
;
}
req
.
msgLen
=
ntohl
(
*
(
int32_t
*
)((
char
*
)
pMsg
->
pCont
+
offset
));
offset
+=
sizeof
(
req
.
msgLen
);
if
(
offset
>=
pMsg
->
contLen
)
{
mError
(
"offset %d is bigger than contLen %d"
,
offset
,
pMsg
->
contLen
);
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
taosArrayDestroy
(
batchRsp
);
return
-
1
;
}
req
.
msg
=
(
char
*
)
pMsg
->
pCont
+
offset
;
offset
+=
req
.
msgLen
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
99fd3284
...
...
@@ -2553,12 +2553,17 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
char
rollup
[
160
+
VARSTR_HEADER_SIZE
]
=
{
0
};
int32_t
rollupNum
=
(
int32_t
)
taosArrayGetSize
(
pStb
->
pFuncs
);
char
*
sep
=
", "
;
int32_t
sepLen
=
strlen
(
sep
);
int32_t
rollupLen
=
sizeof
(
rollup
)
-
2
;
for
(
int32_t
i
=
0
;
i
<
rollupNum
;
++
i
)
{
char
*
funcName
=
taosArrayGet
(
pStb
->
pFuncs
,
i
);
if
(
i
)
{
strcat
(
varDataVal
(
rollup
),
", "
);
strncat
(
varDataVal
(
rollup
),
sep
,
rollupLen
);
rollupLen
-=
sepLen
;
}
strcat
(
varDataVal
(
rollup
),
funcName
);
strncat
(
varDataVal
(
rollup
),
funcName
,
rollupLen
);
rollupLen
-=
strlen
(
funcName
);
}
varDataSetLen
(
rollup
,
strlen
(
varDataVal
(
rollup
)));
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
99fd3284
...
...
@@ -330,6 +330,11 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
rspSize
+=
sizeof
(
int32_t
);
offset
=
0
;
if
(
rspSize
>
MAX_META_BATCH_RSP_SIZE
)
{
code
=
TSDB_CODE_INVALID_MSG_LEN
;
goto
_exit
;
}
pRsp
=
rpcMallocCont
(
rspSize
);
if
(
pRsp
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
99fd3284
...
...
@@ -302,8 +302,10 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
_return:
if
(
output
)
{
taosMemoryFreeClear
(
output
->
tbMeta
);
taosMemoryFreeClear
(
output
);
}
CTG_RET
(
code
);
}
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
99fd3284
...
...
@@ -252,7 +252,7 @@ int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgIndexCtx
*
ctx
=
task
.
taskCtx
;
strcpy
(
ctx
->
indexFName
,
name
);
tstrncpy
(
ctx
->
indexFName
,
name
,
sizeof
(
ctx
->
indexFName
)
);
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
...
...
@@ -277,7 +277,7 @@ int32_t ctgInitGetUdfTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgUdfCtx
*
ctx
=
task
.
taskCtx
;
strcpy
(
ctx
->
udfName
,
name
);
tstrncpy
(
ctx
->
udfName
,
name
,
sizeof
(
ctx
->
udfName
)
);
taosArrayPush
(
pJob
->
pTasks
,
&
task
);
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
99fd3284
...
...
@@ -660,7 +660,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
}
msg
->
pCtg
=
pCtg
;
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
t
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
msg
->
dbId
=
dbId
;
op
->
data
=
msg
;
...
...
@@ -693,7 +693,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
}
msg
->
pCtg
=
pCtg
;
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
t
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
op
->
data
=
msg
;
...
...
@@ -721,8 +721,8 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
}
msg
->
pCtg
=
pCtg
;
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
strncpy
(
msg
->
stbName
,
stbName
,
sizeof
(
msg
->
stbName
));
t
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
t
strncpy
(
msg
->
stbName
,
stbName
,
sizeof
(
msg
->
stbName
));
msg
->
dbId
=
dbId
;
msg
->
suid
=
suid
;
...
...
@@ -751,8 +751,8 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
}
msg
->
pCtg
=
pCtg
;
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
strncpy
(
msg
->
tbName
,
tbName
,
sizeof
(
msg
->
tbName
));
t
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
t
strncpy
(
msg
->
tbName
,
tbName
,
sizeof
(
msg
->
tbName
));
msg
->
dbId
=
dbId
;
op
->
data
=
msg
;
...
...
@@ -785,7 +785,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
dbFName
=
p
+
1
;
}
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
t
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
msg
->
pCtg
=
pCtg
;
msg
->
dbId
=
dbId
;
msg
->
dbInfo
=
dbInfo
;
...
...
@@ -817,7 +817,8 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool sy
char
*
p
=
strchr
(
output
->
dbFName
,
'.'
);
if
(
p
&&
IS_SYS_DBNAME
(
p
+
1
))
{
memmove
(
output
->
dbFName
,
p
+
1
,
strlen
(
p
+
1
));
int32_t
len
=
strlen
(
p
+
1
);
memmove
(
output
->
dbFName
,
p
+
1
,
len
>=
TSDB_DB_FNAME_LEN
?
TSDB_DB_FNAME_LEN
-
1
:
len
);
}
msg
->
pCtg
=
pCtg
;
...
...
@@ -852,7 +853,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
}
msg
->
pCtg
=
pCtg
;
strcpy
(
msg
->
dbFName
,
dbFName
);
tstrncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
)
);
msg
->
vgId
=
vgId
;
msg
->
epSet
=
*
pEpSet
;
...
...
@@ -1215,7 +1216,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
CTG_CACHE_STAT_INC
(
numOfDb
,
1
);
SDbVgVersion
vgVersion
=
{.
dbId
=
newDBCache
.
dbId
,
.
vgVersion
=
-
1
};
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
t
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
ctgDebug
(
"db added to cache, dbFName:%s, dbId:0x%"
PRIx64
,
dbFName
,
dbId
);
...
...
@@ -1331,8 +1332,8 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uin
metaRent
.
smaVer
=
pCache
->
pIndex
->
version
;
}
strcpy
(
metaRent
.
dbFName
,
dbFName
);
strcpy
(
metaRent
.
stbName
,
tbName
);
tstrncpy
(
metaRent
.
dbFName
,
dbFName
,
sizeof
(
metaRent
.
dbFName
)
);
tstrncpy
(
metaRent
.
stbName
,
tbName
,
sizeof
(
metaRent
.
stbName
)
);
CTG_ERR_RET
(
ctgMetaRentUpdate
(
&
pCtg
->
stbRent
,
&
metaRent
,
metaRent
.
suid
,
sizeof
(
SSTableVersion
),
ctgStbVersionSortCompare
,
ctgStbVersionSearchCompare
));
...
...
@@ -1418,7 +1419,9 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
ctgDebug
(
"stb 0x%"
PRIx64
" updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
meta
->
suid
,
dbFName
,
tbName
,
meta
->
tableType
);
if
(
pCache
)
{
CTG_ERR_RET
(
ctgUpdateRentStbVersion
(
pCtg
,
dbFName
,
tbName
,
dbId
,
meta
->
suid
,
pCache
));
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1590,7 +1593,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
dbCache
=
NULL
;
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
t
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
CTG_ERR_JRET
(
ctgMetaRentUpdate
(
&
msg
->
pCtg
->
dbRent
,
&
vgVersion
,
vgVersion
.
dbId
,
sizeof
(
SDbVgVersion
),
ctgDbVgVersionSortCompare
,
ctgDbVgVersionSearchCompare
));
...
...
@@ -1680,9 +1683,9 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
if
(
CTG_IS_META_TABLE
(
pMeta
->
metaType
)
||
CTG_IS_META_BOTH
(
pMeta
->
metaType
))
{
int32_t
metaSize
=
CTG_META_SIZE
(
pMeta
->
tbMeta
);
CTG_ERR_JRET
(
ctgWriteTbMetaToCache
(
pCtg
,
dbCache
,
pMeta
->
dbFName
,
pMeta
->
dbId
,
pMeta
->
tbName
,
pMeta
->
tbMeta
,
metaSize
));
code
=
ctgWriteTbMetaToCache
(
pCtg
,
dbCache
,
pMeta
->
dbFName
,
pMeta
->
dbId
,
pMeta
->
tbName
,
pMeta
->
tbMeta
,
metaSize
);
pMeta
->
tbMeta
=
NULL
;
CTG_ERR_JRET
(
code
);
}
if
(
CTG_IS_META_CTABLE
(
pMeta
->
metaType
)
||
CTG_IS_META_BOTH
(
pMeta
->
metaType
))
{
...
...
@@ -1697,10 +1700,8 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
_return:
if
(
pMeta
)
{
taosMemoryFreeClear
(
pMeta
->
tbMeta
);
taosMemoryFreeClear
(
pMeta
);
}
taosMemoryFreeClear
(
msg
);
...
...
source/libs/command/src/command.c
浏览文件 @
99fd3284
...
...
@@ -361,7 +361,12 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
SArray
*
pTagVals
=
NULL
;
STag
*
pTag
=
(
STag
*
)
pCfg
->
pTags
;
if
(
pCfg
->
pTags
&&
tTagIsJson
(
pTag
))
{
if
(
NULL
==
pCfg
->
pTags
||
pCfg
->
numOfTags
<=
0
)
{
qError
(
"tag missed in table cfg, pointer:%p, numOfTags:%d"
,
pCfg
->
pTags
,
pCfg
->
numOfTags
);
return
TSDB_CODE_APP_ERROR
;
}
if
(
tTagIsJson
(
pTag
))
{
char
*
pJson
=
parseTagDatatoJson
(
pTag
);
if
(
pJson
)
{
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
"%s"
,
pJson
);
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
99fd3284
...
...
@@ -143,9 +143,15 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
static
int32_t
putDataBlock
(
SDataSinkHandle
*
pHandle
,
const
SInputData
*
pInput
,
bool
*
pContinue
)
{
SDataDispatchHandle
*
pDispatcher
=
(
SDataDispatchHandle
*
)
pHandle
;
SDataDispatchBuf
*
pBuf
=
taosAllocateQitem
(
sizeof
(
SDataDispatchBuf
),
DEF_QITEM
);
if
(
NULL
==
pBuf
||
!
allocBuf
(
pDispatcher
,
pInput
,
pBuf
)
)
{
if
(
NULL
==
pBuf
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
if
(
!
allocBuf
(
pDispatcher
,
pInput
,
pBuf
))
{
taosFreeQitem
(
pBuf
);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
toDataCacheEntry
(
pDispatcher
,
pInput
,
pBuf
);
taosWriteQitem
(
pDispatcher
->
pDataBlocks
,
pBuf
);
*
pContinue
=
(
DS_BUF_LOW
==
updateStatus
(
pDispatcher
)
?
true
:
false
);
...
...
source/libs/executor/src/dataInserter.c
浏览文件 @
99fd3284
...
...
@@ -323,7 +323,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
int32_t
code
=
tsdbGetTableSchema
(
inserter
->
pParam
->
readHandle
->
vnode
,
pInserterNode
->
tableId
,
&
inserter
->
pSchema
,
&
suid
);
if
(
code
)
{
destroyDataSinker
((
SDataSinkHandle
*
)
pInserterNode
);
destroyDataSinker
((
SDataSinkHandle
*
)
inserter
);
return
code
;
}
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
99fd3284
...
...
@@ -357,8 +357,7 @@ char* parseTagDatatoJson(void* p) {
for
(
int
j
=
0
;
j
<
nCols
;
++
j
)
{
STagVal
*
pTagVal
=
(
STagVal
*
)
taosArrayGet
(
pTagVals
,
j
);
// json key encode by binary
memset
(
tagJsonKey
,
0
,
sizeof
(
tagJsonKey
));
memcpy
(
tagJsonKey
,
pTagVal
->
pKey
,
strlen
(
pTagVal
->
pKey
));
tstrncpy
(
tagJsonKey
,
pTagVal
->
pKey
,
sizeof
(
tagJsonKey
));
// json value
char
type
=
pTagVal
->
type
;
if
(
type
==
TSDB_DATA_TYPE_NULL
)
{
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
99fd3284
...
...
@@ -173,7 +173,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
}
SDbCfgReq
dbCfgReq
=
{
0
};
str
cpy
(
dbCfgReq
.
db
,
input
);
str
ncpy
(
dbCfgReq
.
db
,
input
,
sizeof
(
dbCfgReq
.
db
)
-
1
);
int32_t
bufLen
=
tSerializeSDbCfgReq
(
NULL
,
0
,
&
dbCfgReq
);
void
*
pBuf
=
(
*
mallcFp
)(
bufLen
);
...
...
@@ -191,7 +191,7 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
}
SUserIndexReq
indexReq
=
{
0
};
str
cpy
(
indexReq
.
indexFName
,
input
);
str
ncpy
(
indexReq
.
indexFName
,
input
,
sizeof
(
indexReq
.
indexFName
)
-
1
);
int32_t
bufLen
=
tSerializeSUserIndexReq
(
NULL
,
0
,
&
indexReq
);
void
*
pBuf
=
(
*
mallcFp
)(
bufLen
);
...
...
@@ -233,7 +233,7 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
}
SGetUserAuthReq
req
=
{
0
};
strncpy
(
req
.
user
,
input
,
sizeof
(
req
.
user
));
strncpy
(
req
.
user
,
input
,
sizeof
(
req
.
user
)
-
1
);
int32_t
bufLen
=
tSerializeSGetUserAuthReq
(
NULL
,
0
,
&
req
);
void
*
pBuf
=
(
*
mallcFp
)(
bufLen
);
...
...
@@ -251,7 +251,7 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_
}
STableIndexReq
indexReq
=
{
0
};
str
cpy
(
indexReq
.
tbFName
,
input
);
str
ncpy
(
indexReq
.
tbFName
,
input
,
sizeof
(
indexReq
.
tbFName
)
-
1
);
int32_t
bufLen
=
tSerializeSTableIndexReq
(
NULL
,
0
,
&
indexReq
);
void
*
pBuf
=
(
*
mallcFp
)(
bufLen
);
...
...
@@ -271,8 +271,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
SBuildTableInput
*
pInput
=
input
;
STableCfgReq
cfgReq
=
{
0
};
cfgReq
.
header
.
vgId
=
pInput
->
vgId
;
strncpy
(
cfgReq
.
dbFName
,
pInput
->
dbFName
,
sizeof
(
cfgReq
.
dbFName
));
strncpy
(
cfgReq
.
tbName
,
pInput
->
tbName
,
sizeof
(
cfgReq
.
tbName
));
strncpy
(
cfgReq
.
dbFName
,
pInput
->
dbFName
,
sizeof
(
cfgReq
.
dbFName
)
-
1
);
strncpy
(
cfgReq
.
tbName
,
pInput
->
tbName
,
sizeof
(
cfgReq
.
tbName
)
-
1
);
int32_t
bufLen
=
tSerializeSTableCfgReq
(
NULL
,
0
,
&
cfgReq
);
void
*
pBuf
=
(
*
mallcFp
)(
bufLen
);
...
...
source/libs/qworker/src/qwUtil.c
浏览文件 @
99fd3284
...
...
@@ -412,7 +412,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
while
(
true
)
{
paramIdx
=
atomic_load_32
(
&
gQwMgmt
.
paramIdx
);
if
(
paramIdx
==
tListLen
(
gQwMgmt
.
param
))
{
newParamIdx
=
0
;
newParamIdx
=
1
;
}
else
{
newParamIdx
=
paramIdx
+
1
;
}
...
...
@@ -422,6 +422,10 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
}
}
if
(
paramIdx
==
tListLen
(
gQwMgmt
.
param
))
{
paramIdx
=
0
;
}
gQwMgmt
.
param
[
paramIdx
].
qwrId
=
gQwMgmt
.
qwRef
;
gQwMgmt
.
param
[
paramIdx
].
refId
=
refId
;
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
99fd3284
...
...
@@ -398,7 +398,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if
(
QW_EVENT_PROCESSED
(
ctx
,
QW_EVENT_DROP
))
{
QW_TASK_ELOG
(
"task already dropped at wrong phase %s"
,
qwPhaseStr
(
phase
));
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_STATUS_ERROR
);
break
;
}
if
(
QW_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
99fd3284
...
...
@@ -430,7 +430,8 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
))
{
if
(
NULL
==
pData
->
pEpSet
)
{
SCH_TASK_ELOG
(
"no epset updated while got error %s"
,
tstrerror
(
rspCode
));
SCH_ERR_JRET
(
rspCode
);
code
=
rspCode
;
goto
_return
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录