Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
df973ce2
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
df973ce2
编写于
5月 16, 2023
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: remove debug info
上级
e4380d63
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
29 addition
and
60 deletion
+29
-60
include/libs/executor/dataSinkMgt.h
include/libs/executor/dataSinkMgt.h
+0
-1
source/client/src/clientMain.c
source/client/src/clientMain.c
+1
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+0
-2
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+1
-1
source/libs/catalog/src/ctgRemote.c
source/libs/catalog/src/ctgRemote.c
+1
-1
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+0
-10
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+1
-1
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+2
-2
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+2
-2
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+0
-10
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+0
-6
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+4
-4
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+1
-3
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+3
-3
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+2
-2
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+1
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+2
-2
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+3
-3
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+5
-5
未找到文件。
include/libs/executor/dataSinkMgt.h
浏览文件 @
df973ce2
...
...
@@ -69,7 +69,6 @@ typedef struct SOutputData {
int32_t
numOfBlocks
;
int64_t
numOfRows
;
// int32_t changed to int64_t
int32_t
numOfCols
;
int32_t
dataSize
;
int8_t
compressed
;
char
*
pData
;
bool
queryEnd
;
...
...
source/client/src/clientMain.c
浏览文件 @
df973ce2
...
...
@@ -761,7 +761,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
int64_t
analyseStart
=
taosGetTimestampUs
();
pRequest
->
metric
.
ctgCostUs
=
analyseStart
-
pRequest
->
metric
.
ctgStart
;
q
Error
(
"0x%"
PRIx64
" start to semantic analysis, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pRequest
->
requestId
);
q
Debug
(
"0x%"
PRIx64
" start to semantic analysis, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pRequest
->
requestId
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
code
=
qAnalyseSqlSemantic
(
pWrapper
->
pParseCtx
,
pWrapper
->
pCatalogReq
,
pResultMeta
,
pQuery
);
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
df973ce2
...
...
@@ -1341,8 +1341,6 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SC
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
ctgError
(
"%s"
,
"catalog start async get all metas"
);
int32_t
code
=
0
;
SCtgJob
*
pJob
=
NULL
;
CTG_ERR_JRET
(
ctgInitJob
(
pCtg
,
pConn
,
&
pJob
,
pReq
,
fp
,
param
));
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
df973ce2
...
...
@@ -1035,7 +1035,7 @@ _return:
int32_t
ctgCallUserCb
(
void
*
param
)
{
SCtgJob
*
pJob
=
(
SCtgJob
*
)
param
;
q
Error
(
"QID:0x%"
PRIx64
" ctg start to call user cb with rsp %s"
,
pJob
->
queryId
,
tstrerror
(
pJob
->
jobResCode
));
q
Debug
(
"QID:0x%"
PRIx64
" ctg start to call user cb with rsp %s"
,
pJob
->
queryId
,
tstrerror
(
pJob
->
jobResCode
));
(
*
pJob
->
userFp
)(
&
pJob
->
jobRes
,
pJob
->
userParam
,
pJob
->
jobResCode
);
...
...
source/libs/catalog/src/ctgRemote.c
浏览文件 @
df973ce2
...
...
@@ -627,7 +627,7 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
SCtgBatch
*
pBatch
=
(
SCtgBatch
*
)
p
;
int32_t
msgSize
=
0
;
ctg
Error
(
"QID:0x%"
PRIx64
" ctg start to launch batch %d"
,
pJob
->
queryId
,
pBatch
->
batchId
);
ctg
Debug
(
"QID:0x%"
PRIx64
" ctg start to launch batch %d"
,
pJob
->
queryId
,
pBatch
->
batchId
);
CTG_ERR_JRET
(
ctgBuildBatchReqMsg
(
pBatch
,
*
vgId
,
&
msg
,
&
msgSize
));
code
=
ctgAsyncSendMsg
(
pCtg
,
&
pBatch
->
conn
,
pJob
,
pBatch
->
pTaskIds
,
pBatch
->
batchId
,
pBatch
->
pMsgIdxs
,
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
df973ce2
...
...
@@ -82,8 +82,6 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pBuf
->
useSize
+=
pEntry
->
dataLen
;
//mprotect(pBuf->pData, pBuf->allocSize, PROT_READ);
atomic_add_fetch_64
(
&
pHandle
->
cachedSize
,
pEntry
->
dataLen
);
atomic_add_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
}
...
...
@@ -101,8 +99,6 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
pBuf
->
allocSize
=
sizeof
(
SDataCacheEntry
)
+
blockGetEncodeSize
(
pInput
->
pData
);
pBuf
->
pData
=
taosMemoryMalloc
(
pBuf
->
allocSize
);
//pBuf->allocSize = (pBuf->allocSize > 4096) ? (pBuf->allocSize/4096 + 1)*4096 : 4096;
//pBuf->pData = taosMemoryMallocAlign(4096, pBuf->allocSize);
if
(
pBuf
->
pData
==
NULL
)
{
qError
(
"SinkNode failed to malloc memory, size:%d, code:%d"
,
pBuf
->
allocSize
,
TAOS_SYSTEM_ERROR
(
errno
));
}
...
...
@@ -193,19 +189,14 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
return
TSDB_CODE_SUCCESS
;
}
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)(
pDispatcher
->
nextOutput
.
pData
);
assert
(
pOutput
->
dataSize
==
pEntry
->
dataLen
);
memcpy
(
pOutput
->
pData
,
pEntry
->
data
,
pEntry
->
dataLen
);
pOutput
->
numOfRows
=
pEntry
->
numOfRows
;
pOutput
->
numOfCols
=
pEntry
->
numOfCols
;
pOutput
->
compressed
=
pEntry
->
compressed
;
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
atomic_sub_fetch_64
(
&
pDispatcher
->
cachedSize
,
pEntry
->
dataLen
);
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
//mprotect(pDispatcher->nextOutput.pData, pDispatcher->nextOutput.allocSize, PROT_READ|PROT_WRITE|PROT_EXEC);
taosMemoryFreeClear
(
pDispatcher
->
nextOutput
.
pData
);
// todo persistent
pOutput
->
bufStatus
=
updateStatus
(
pDispatcher
);
taosThreadMutexLock
(
&
pDispatcher
->
mutex
);
...
...
@@ -225,7 +216,6 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDispatchBuf
*
pBuf
=
NULL
;
taosReadQitem
(
pDispatcher
->
pDataBlocks
,
(
void
**
)
&
pBuf
);
if
(
pBuf
!=
NULL
)
{
//mprotect(pBuf->pData, pBuf->allocSize, PROT_READ|PROT_WRITE|PROT_EXEC);
taosMemoryFreeClear
(
pBuf
->
pData
);
taosFreeQitem
(
pBuf
);
}
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
df973ce2
...
...
@@ -451,7 +451,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
return
pTaskInfo
->
code
;
}
q
Error
(
"%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%"
PRIx64
", execId:%d, %p, %d/%"
PRIzu
,
q
Debug
(
"%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%"
PRIx64
", execId:%d, %p, %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
addr
.
epSet
.
eps
[
0
].
fqdn
,
pSource
->
taskId
,
pSource
->
execId
,
pExchangeInfo
,
sourceIndex
,
totalSources
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
df973ce2
...
...
@@ -2096,7 +2096,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
int64_t
st1
=
taosGetTimestampUs
();
pTaskInfo
->
cost
.
extractListTime
=
(
st1
-
st
)
/
1000
.
0
;
q
Error
(
"extract queried table list completed, %d tables, elapsed time:%.2f ms %s"
,
numOfTables
,
q
Debug
(
"extract queried table list completed, %d tables, elapsed time:%.2f ms %s"
,
numOfTables
,
pTaskInfo
->
cost
.
extractListTime
,
idStr
);
if
(
numOfTables
==
0
)
{
...
...
@@ -2110,7 +2110,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
}
pTaskInfo
->
cost
.
groupIdMapTime
=
(
taosGetTimestampUs
()
-
st1
)
/
1000
.
0
;
q
Error
(
"generate group id map completed, elapsed time:%.2f ms %s"
,
pTaskInfo
->
cost
.
groupIdMapTime
,
idStr
);
q
Debug
(
"generate group id map completed, elapsed time:%.2f ms %s"
,
pTaskInfo
->
cost
.
groupIdMapTime
,
idStr
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
df973ce2
...
...
@@ -493,7 +493,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
taosThreadOnce
(
&
initPoolOnce
,
initRefPool
);
q
Error
(
"start to create task, TID:0x%"
PRIx64
" QID:0x%"
PRIx64
", vgId:%d"
,
taskId
,
pSubplan
->
id
.
queryId
,
vgId
);
q
Debug
(
"start to create task, TID:0x%"
PRIx64
" QID:0x%"
PRIx64
", vgId:%d"
,
taskId
,
pSubplan
->
id
.
queryId
,
vgId
);
int32_t
code
=
createExecTaskInfo
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
vgId
,
sql
,
model
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -520,7 +520,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
handle
,
pSinkParam
,
(
*
pTask
)
->
id
.
str
);
}
q
Error
(
"subplan task create completed, TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
taskId
,
pSubplan
->
id
.
queryId
);
q
Debug
(
"subplan task create completed, TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
taskId
,
pSubplan
->
id
.
queryId
);
_error:
// if failed to add ref for all tables in this query, abort current query
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
df973ce2
...
...
@@ -249,9 +249,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
return
doGenerateSourceData
(
pOperator
);
}
int64_t
st0
=
taosGetTimestampUs
();
int64_t
st1
=
0
,
st2
=
0
;
while
(
1
)
{
while
(
1
)
{
blockDataCleanup
(
pRes
);
...
...
@@ -263,7 +260,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
setOperatorCompleted
(
pOperator
);
break
;
}
st1
=
taosGetTimestampUs
();
// if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
// qDebug("set status recv");
// pOperator->status = OP_EXEC_RECV;
...
...
@@ -305,8 +301,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
continue
;
}
st2
=
taosGetTimestampUs
();
break
;
}
...
...
@@ -352,10 +346,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
}
if
(
p
->
info
.
rows
>
0
)
{
qError
(
"doProjection, open:%"
PRId64
"us, project:%"
PRId64
"us, rows:%"
PRId64
,
st1
-
st0
,
st2
-
st1
,
p
->
info
.
rows
);
}
return
(
p
->
info
.
rows
>
0
)
?
p
:
NULL
;
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
df973ce2
...
...
@@ -1279,20 +1279,15 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
return
NULL
;
}
int64_t
st0
=
taosGetTimestampUs
();
SSDataBlock
*
pBlock
=
pInfo
->
binfo
.
pRes
;
pTaskInfo
->
code
=
pOperator
->
fpSet
.
_openFn
(
pOperator
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
int64_t
st1
=
taosGetTimestampUs
();
int64_t
st2
=
0
,
st3
=
0
;
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
st2
=
taosGetTimestampUs
();
doFilter
(
pBlock
,
pOperator
->
exprSupp
.
pFilterInfo
,
NULL
);
st3
=
taosGetTimestampUs
();
bool
hasRemain
=
hasRemainResults
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
@@ -1301,7 +1296,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
}
if
(
pBlock
->
info
.
rows
>
0
)
{
qError
(
"interval open:%"
PRId64
"us, build:%"
PRId64
"us, filter:%"
PRId64
"us, rows:%"
PRId64
,
st1
-
st0
,
st2
-
st1
,
st3
-
st2
,
pBlock
->
info
.
rows
);
break
;
}
}
...
...
source/libs/qworker/src/qwMsg.c
浏览文件 @
df973ce2
...
...
@@ -381,9 +381,9 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
SQWMsg
qwMsg
=
{
.
msgType
=
pMsg
->
msgType
,
.
msg
=
msg
.
msg
,
.
msgLen
=
msg
.
msgLen
,
.
connInfo
=
pMsg
->
info
};
QW_SCH_TASK_
E
LOG
(
"prerocessQuery start, handle:%p, SQL:%s"
,
pMsg
->
info
.
handle
,
msg
.
sql
);
QW_SCH_TASK_
D
LOG
(
"prerocessQuery start, handle:%p, SQL:%s"
,
pMsg
->
info
.
handle
,
msg
.
sql
);
code
=
qwPreprocessQuery
(
QW_FPARAMS
(),
&
qwMsg
);
QW_SCH_TASK_
E
LOG
(
"prerocessQuery end, handle:%p, code:%x"
,
pMsg
->
info
.
handle
,
code
);
QW_SCH_TASK_
D
LOG
(
"prerocessQuery end, handle:%p, code:%x"
,
pMsg
->
info
.
handle
,
code
);
tFreeSSubQueryMsg
(
&
msg
);
...
...
@@ -449,11 +449,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
qwMsg
.
msgInfo
.
taskType
=
msg
.
taskType
;
qwMsg
.
msgInfo
.
needFetch
=
msg
.
needFetch
;
QW_SCH_TASK_
E
LOG
(
"processQuery start, node:%p, type:%s, handle:%p, SQL:%s"
,
node
,
TMSG_INFO
(
pMsg
->
msgType
),
QW_SCH_TASK_
D
LOG
(
"processQuery start, node:%p, type:%s, handle:%p, SQL:%s"
,
node
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
info
.
handle
,
msg
.
sql
);
code
=
qwProcessQuery
(
QW_FPARAMS
(),
&
qwMsg
,
msg
.
sql
);
msg
.
sql
=
NULL
;
QW_SCH_TASK_
E
LOG
(
"processQuery end, node:%p, code:%x"
,
node
,
code
);
QW_SCH_TASK_
D
LOG
(
"processQuery end, node:%p, code:%x"
,
node
,
code
);
tFreeSSubQueryMsg
(
&
msg
);
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
df973ce2
...
...
@@ -138,7 +138,7 @@ int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t
if
((
!
quickRsp
)
||
QUERY_RSP_POLICY_QUICK
==
tsQueryRspPolicy
)
{
if
(
!
ctx
->
localExec
)
{
qwBuildAndSendQueryRsp
(
msgType
,
&
ctx
->
ctrlConnInfo
,
rspCode
,
ctx
);
QW_TASK_
E
LOG
(
"query msg rsped, handle:%p, code:%x - %s"
,
ctx
->
ctrlConnInfo
.
handle
,
rspCode
,
tstrerror
(
rspCode
));
QW_TASK_
D
LOG
(
"query msg rsped, handle:%p, code:%x - %s"
,
ctx
->
ctrlConnInfo
.
handle
,
rspCode
,
tstrerror
(
rspCode
));
}
ctx
->
queryRsped
=
true
;
...
...
@@ -323,7 +323,6 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if
(
len
==
0
)
{
if
(
queryEnd
)
{
output
.
dataSize
=
0
;
code
=
dsGetDataBlock
(
ctx
->
sinkHandle
,
&
output
);
if
(
code
)
{
QW_TASK_ELOG
(
"dsGetDataBlock failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
...
...
@@ -359,7 +358,6 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_ERR_RET
(
qwMallocFetchRsp
(
!
ctx
->
localExec
,
*
dataLen
,
&
rsp
));
output
.
pData
=
rsp
->
data
+
*
dataLen
-
len
;
output
.
dataSize
=
len
;
code
=
dsGetDataBlock
(
ctx
->
sinkHandle
,
&
output
);
if
(
code
)
{
QW_TASK_ELOG
(
"dsGetDataBlock failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
df973ce2
...
...
@@ -440,7 +440,7 @@ int32_t schNotifyUserExecRes(SSchJob *pJob) {
schDumpJobExecRes
(
pJob
,
pRes
);
}
SCH_JOB_
E
LOG
(
"sch start to invoke exec cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
SCH_JOB_
D
LOG
(
"sch start to invoke exec cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
(
*
pJob
->
userRes
.
execFp
)(
pRes
,
pJob
->
userRes
.
cbParam
,
atomic_load_32
(
&
pJob
->
errCode
));
SCH_JOB_DLOG
(
"sch end from exec cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
...
...
@@ -452,7 +452,7 @@ int32_t schNotifyUserFetchRes(SSchJob *pJob) {
schDumpJobFetchRes
(
pJob
,
&
pRes
);
SCH_JOB_
E
LOG
(
"sch start to invoke fetch cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
SCH_JOB_
D
LOG
(
"sch start to invoke fetch cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
(
*
pJob
->
userRes
.
fetchFp
)(
pRes
,
pJob
->
userRes
.
cbParam
,
atomic_load_32
(
&
pJob
->
errCode
));
SCH_JOB_DLOG
(
"sch end from fetch cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
...
...
@@ -811,7 +811,7 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
tsem_wait
(
&
pJob
->
rspSem
);
}
SCH_JOB_
E
LOG
(
"job exec done, job status:%s, jobId:0x%"
PRIx64
,
SCH_GET_JOB_STATUS_STR
(
pJob
),
pJob
->
refId
);
SCH_JOB_
D
LOG
(
"job exec done, job status:%s, jobId:0x%"
PRIx64
,
SCH_GET_JOB_STATUS_STR
(
pJob
),
pJob
->
refId
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
df973ce2
...
...
@@ -923,10 +923,10 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
SCH_ERR_JRET
(
schUpdateSendTargetInfo
(
pMsgSendInfo
,
addr
,
pTask
));
if
(
pJob
&&
pTask
)
{
SCH_TASK_
E
LOG
(
"start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p"
,
TMSG_INFO
(
msgType
),
addr
->
nodeId
,
SCH_TASK_
D
LOG
(
"start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p"
,
TMSG_INFO
(
msgType
),
addr
->
nodeId
,
epSet
->
eps
[
epSet
->
inUse
].
fqdn
,
epSet
->
eps
[
epSet
->
inUse
].
port
,
trans
->
pTrans
,
trans
->
pHandle
);
}
else
{
q
Error
(
"start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p"
,
TMSG_INFO
(
msgType
),
addr
->
nodeId
,
q
Debug
(
"start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p"
,
TMSG_INFO
(
msgType
),
addr
->
nodeId
,
epSet
->
eps
[
epSet
->
inUse
].
fqdn
,
epSet
->
eps
[
epSet
->
inUse
].
port
,
trans
->
pTrans
,
trans
->
pHandle
);
}
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
df973ce2
...
...
@@ -1064,7 +1064,7 @@ int32_t schLaunchTaskImpl(void *param) {
pTask
->
retryTimes
++
;
pTask
->
waitRetry
=
false
;
SCH_TASK_
E
LOG
(
"start to launch %s task, execId %d, retry %d"
,
SCH_TASK_
D
LOG
(
"start to launch %s task, execId %d, retry %d"
,
SCH_IS_LOCAL_EXEC_TASK
(
pJob
,
pTask
)
?
"LOCAL"
:
"REMOTE"
,
pTask
->
execId
,
pTask
->
retryTimes
);
SCH_LOG_TASK_START_TS
(
pTask
);
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
df973ce2
...
...
@@ -67,7 +67,7 @@ int32_t schedulerInit() {
}
int32_t
schedulerExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJobId
)
{
q
Error
(
"scheduler %s exec job start"
,
pReq
->
syncReq
?
"SYNC"
:
"ASYNC"
);
q
Debug
(
"scheduler %s exec job start"
,
pReq
->
syncReq
?
"SYNC"
:
"ASYNC"
);
int32_t
code
=
0
;
SSchJob
*
pJob
=
NULL
;
...
...
@@ -86,7 +86,7 @@ _return:
}
int32_t
schedulerFetchRows
(
int64_t
jobId
,
SSchedulerReq
*
pReq
)
{
q
Error
(
"scheduler %s fetch rows start"
,
pReq
->
syncReq
?
"SYNC"
:
"ASYNC"
);
q
Debug
(
"scheduler %s fetch rows start"
,
pReq
->
syncReq
?
"SYNC"
:
"ASYNC"
);
int32_t
code
=
0
;
SSchJob
*
pJob
=
NULL
;
...
...
source/libs/transport/src/transCli.c
浏览文件 @
df973ce2
...
...
@@ -426,7 +426,7 @@ void cliHandleResp(SCliConn* conn) {
}
STraceId
*
trace
=
&
transMsg
.
info
.
traceId
;
tG
Error
(
"%s conn %p %s received from %s, local info:%s, len:%d, code str:%s"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
tG
Debug
(
"%s conn %p %s received from %s, local info:%s, len:%d, code str:%s"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
TMSG_INFO
(
pHead
->
msgType
),
conn
->
dst
,
conn
->
src
,
pHead
->
msgLen
,
tstrerror
(
transMsg
.
code
));
if
(
pCtx
==
NULL
&&
CONN_NO_PERSIST_BY_APP
(
conn
))
{
...
...
@@ -966,7 +966,7 @@ static void cliSendCb(uv_write_t* req, int status) {
}
if
(
status
==
0
)
{
t
Error
(
"%s conn %p data already was written out"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
t
Debug
(
"%s conn %p data already was written out"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
}
else
{
if
(
!
uv_is_closing
((
uv_handle_t
*
)
&
pConn
->
stream
))
{
tError
(
"%s conn %p failed to write:%s"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
uv_err_name
(
status
));
...
...
@@ -1106,7 +1106,7 @@ void cliSend(SCliConn* pConn) {
msgLen
=
(
int32_t
)
ntohl
((
uint32_t
)(
pHead
->
msgLen
));
}
tG
Error
(
"%s conn %p %s is sent to %s, local info %s, len:%d"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
tG
Debug
(
"%s conn %p %s is sent to %s, local info %s, len:%d"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
);
uv_buf_t
wb
=
uv_buf_init
((
char
*
)
pHead
,
msgLen
);
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
df973ce2
...
...
@@ -236,19 +236,19 @@ static bool uvHandleReq(SSvrConn* pConn) {
if
(
pConn
->
status
==
ConnNormal
&&
pHead
->
noResp
==
0
)
{
transRefSrvHandle
(
pConn
);
if
(
cost
>=
EXCEPTION_LIMIT_US
)
{
tG
Error
(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception"
,
transLabel
(
pTransInst
),
tG
Debug
(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
(
int
)
cost
);
}
else
{
tG
Error
(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus"
,
transLabel
(
pTransInst
),
pConn
,
tG
Debug
(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
(
int
)
cost
);
}
}
else
{
if
(
cost
>=
EXCEPTION_LIMIT_US
)
{
tG
Error
(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception"
,
tG
Debug
(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
pHead
->
noResp
,
transMsg
.
code
,
(
int
)(
cost
));
}
else
{
tG
Error
(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus"
,
tG
Debug
(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
pHead
->
noResp
,
transMsg
.
code
,
(
int
)(
cost
));
}
...
...
@@ -447,7 +447,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
}
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
tG
Error
(
"%s conn %p %s is sent to %s, local info:%s, len:%d"
,
transLabel
(
pTransInst
),
pConn
,
tG
Debug
(
"%s conn %p %s is sent to %s, local info:%s, len:%d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
pConn
->
dst
,
pConn
->
src
,
len
);
wb
->
base
=
(
char
*
)
pHead
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录