Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Greenplum
Gpdb
提交
5d408fc1
G
Gpdb
项目概览
Greenplum
/
Gpdb
通知
7
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
G
Gpdb
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
5d408fc1
编写于
6月 13, 2016
作者:
G
Gang Xiong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Remove dead code
上级
dc68b3d8
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
4 addition
and
372 deletion
+4
-372
src/backend/cdb/dispatcher/cdbdisp.c
src/backend/cdb/dispatcher/cdbdisp.c
+1
-1
src/backend/cdb/dispatcher/cdbdisp_thread.c
src/backend/cdb/dispatcher/cdbdisp_thread.c
+2
-318
src/backend/cdb/dispatcher/cdbdispatchresult.c
src/backend/cdb/dispatcher/cdbdispatchresult.c
+0
-43
src/include/cdb/cdbdisp_thread.h
src/include/cdb/cdbdisp_thread.h
+1
-2
src/include/cdb/cdbdispatchresult.h
src/include/cdb/cdbdispatchresult.h
+0
-8
未找到文件。
src/backend/cdb/dispatcher/cdbdisp.c
浏览文件 @
5d408fc1
...
...
@@ -111,7 +111,7 @@ CdbCheckDispatchResult(struct CdbDispatcherState *ds,
{
PG_TRY
();
{
CdbCheckDispatchResult_internal
(
ds
,
NULL
,
NULL
,
waitMode
);
CdbCheckDispatchResult_internal
(
ds
,
waitMode
);
}
PG_CATCH
();
{
...
...
src/backend/cdb/dispatcher/cdbdisp_thread.c
浏览文件 @
5d408fc1
...
...
@@ -71,7 +71,6 @@ cdbdisp_signalQE(SegmentDatabaseDescriptor * segdbDesc,
static
void
*
thread_DispatchCommand
(
void
*
arg
);
static
void
thread_DispatchOut
(
DispatchCommandParms
*
pParms
);
static
void
thread_DispatchWait
(
DispatchCommandParms
*
pParms
);
static
void
thread_DispatchWaitSingle
(
DispatchCommandParms
*
pParms
);
static
void
handlePollError
(
DispatchCommandParms
*
pParms
,
int
db_count
,
int
sock_errno
);
...
...
@@ -160,12 +159,6 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
elog
(
FATAL
,
"could not allocate resources for segworker communication"
);
}
/*
* Transfer any connection errors from segdbDesc.
*/
if
(
segdbDesc
->
errcode
||
segdbDesc
->
error_message
.
len
)
cdbdisp_mergeConnectionErrors
(
qeResult
,
segdbDesc
);
parmsIndex
=
segdbs_in_thread_pool
/
gp_connections_per_thread
;
pParms
=
ds
->
dispatchThreads
->
dispatchCommandParmsAr
+
ds
->
dispatchThreads
->
threadCount
+
parmsIndex
;
pParms
->
dispatchResultPtrArray
[
pParms
->
db_count
++
]
=
qeResult
;
...
...
@@ -240,23 +233,15 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
void
CdbCheckDispatchResult_internal
(
struct
CdbDispatcherState
*
ds
,
struct
SegmentDatabaseDescriptor
***
failedSegDB
,
int
*
numOfFailed
,
DispatchWaitMode
waitMode
)
DispatchWaitMode
waitMode
)
{
int
i
;
int
j
;
int
nFailed
=
0
;
DispatchCommandParms
*
pParms
;
CdbDispatchResult
*
dispatchResult
;
SegmentDatabaseDescriptor
*
segdbDesc
;
Assert
(
ds
!=
NULL
);
if
(
failedSegDB
)
*
failedSegDB
=
NULL
;
if
(
numOfFailed
)
*
numOfFailed
=
0
;
/*
* No-op if no work was dispatched since the last time we were called.
*/
...
...
@@ -332,48 +317,18 @@ CdbCheckDispatchResult_internal(struct CdbDispatcherState *ds,
continue
;
}
segdbDesc
=
dispatchResult
->
segdbDesc
;
/*
* segdbDesc error message is unlikely here, but check anyway.
*/
if
(
segdbDesc
->
errcode
||
segdbDesc
->
error_message
.
len
)
cdbdisp_mergeConnectionErrors
(
dispatchResult
,
segdbDesc
);
/*
* Log the result
*/
if
(
DEBUG2
>=
log_min_messages
)
cdbdisp_debugDispatchResult
(
dispatchResult
,
DEBUG2
,
DEBUG3
);
/*
* Notify FTS to reconnect if connection lost or never connected.
*/
if
(
failedSegDB
&&
PQstatus
(
segdbDesc
->
conn
)
==
CONNECTION_BAD
)
{
/*
* Allocate storage. Caller should pfree() it.
*/
if
(
!*
failedSegDB
)
*
failedSegDB
=
palloc
(
sizeof
(
**
failedSegDB
)
*
(
2
*
getgpsegmentCount
()
+
1
));
/*
* Append to broken connection list.
*/
(
*
failedSegDB
)[
nFailed
++
]
=
segdbDesc
;
(
*
failedSegDB
)[
nFailed
]
=
NULL
;
if
(
numOfFailed
)
*
numOfFailed
=
nFailed
;
}
/*
* Zap our SegmentDatabaseDescriptor ptr because it may be
* invalidated by the call to FtsHandleNetFailure() below.
* Anything we need from there, we should get before this.
*/
dispatchResult
->
segdbDesc
=
NULL
;
}
}
...
...
@@ -562,103 +517,11 @@ thread_DispatchOut(DispatchCommandParms * pParms)
continue
;
}
#ifdef USE_NONBLOCKING
/*
* In 2000, Tom Lane said:
* "I believe that the nonblocking-mode code is pretty buggy, and don't
* recommend using it unless you really need it and want to help debug
* it.."
*
* Reading through the code, I'm not convinced the situation has
* improved in 2007... I still see some very questionable things
* about nonblocking mode, so for now, I'm disabling it.
*/
PQsetnonblocking
(
dispatchResult
->
segdbDesc
->
conn
,
TRUE
);
#endif
dispatchCommand
(
dispatchResult
,
pParms
->
query_text
,
pParms
->
query_text_len
);
}
}
#ifdef USE_NONBLOCKING
/*
* Is everything sent? Well, if the network stack was too busy, and we are using
* nonblocking mode, some of the sends
* might not have completed. We can't use SELECT to wait unless they have
* received their work, or we will wait forever. Make sure they do.
*/
{
bool
allsent
=
true
;
/*
* debug loop to check to see if this really is needed
*/
for
(
i
=
0
;
i
<
db_count
;
i
++
)
{
dispatchResult
=
pParms
->
dispatchResultPtrArray
[
i
];
if
(
!
dispatchResult
->
stillRunning
||
!
dispatchResult
->
hasDispatched
)
continue
;
if
(
PQstatus
(
dispatchResult
->
segdbDesc
->
conn
)
==
CONNECTION_BAD
)
continue
;
if
(
dispatchResult
->
segdbDesc
->
conn
->
outCount
>
0
)
{
write_log
(
"Yes, extra flushing is necessary %d"
,
i
);
break
;
}
}
/*
* Check to see if any needed extra flushing.
*/
for
(
i
=
0
;
i
<
db_count
;
i
++
)
{
int
flushResult
;
dispatchResult
=
pParms
->
dispatchResultPtrArray
[
i
];
if
(
!
dispatchResult
->
stillRunning
||
!
dispatchResult
->
hasDispatched
)
continue
;
if
(
PQstatus
(
dispatchResult
->
segdbDesc
->
conn
)
==
CONNECTION_BAD
)
continue
;
/*
* If data remains unsent, send it. Else we might be waiting for the
* result of a command the backend hasn't even got yet.
*/
flushResult
=
PQflush
(
dispatchResult
->
segdbDesc
->
conn
);
/*
* First time, go through the loop without waiting if we can't
* flush, in case we are using multiple network adapters, and
* other connections might be able to flush
*/
if
(
flushResult
>
0
)
{
allsent
=
false
;
write_log
(
"flushing didn't finish the work %d"
,
i
);
}
}
/*
* our first attempt at doing more flushes didn't get everything out,
* so we need to continue to try.
*/
for
(
i
=
0
;
i
<
db_count
;
i
++
)
{
dispatchResult
=
pParms
->
dispatchResultPtrArray
[
i
];
while
(
PQisnonblocking
(
dispatchResult
->
segdbDesc
->
conn
))
{
PQflush
(
dispatchResult
->
segdbDesc
->
conn
);
PQsetnonblocking
(
dispatchResult
->
segdbDesc
->
conn
,
FALSE
);
}
}
}
#endif
}
static
void
...
...
@@ -851,179 +714,6 @@ thread_DispatchWait(DispatchCommandParms * pParms)
}
}
static
void
thread_DispatchWaitSingle
(
DispatchCommandParms
*
pParms
)
{
SegmentDatabaseDescriptor
*
segdbDesc
;
CdbDispatchResult
*
dispatchResult
;
char
*
msg
=
NULL
;
/*
* Assert() cannot be used in threads
*/
if
(
pParms
->
db_count
!=
1
)
write_log
(
"Bug... thread_dispatchWaitSingle called with db_count %d"
,
pParms
->
db_count
);
dispatchResult
=
pParms
->
dispatchResultPtrArray
[
0
];
segdbDesc
=
dispatchResult
->
segdbDesc
;
if
(
PQstatus
(
segdbDesc
->
conn
)
==
CONNECTION_BAD
)
{
msg
=
PQerrorMessage
(
segdbDesc
->
conn
);
/*
* Save error info for later.
*/
cdbdisp_appendMessage
(
dispatchResult
,
DEBUG1
,
ERRCODE_GP_INTERCONNECTION_ERROR
,
"Lost connection to %s. %s"
,
segdbDesc
->
whoami
,
msg
?
msg
:
""
);
/*
* Free the PGconn object.
*/
PQfinish
(
segdbDesc
->
conn
);
segdbDesc
->
conn
=
NULL
;
dispatchResult
->
stillRunning
=
false
;
}
else
{
PQsetnonblocking
(
segdbDesc
->
conn
,
FALSE
);
for
(;;)
{
PGresult
*
pRes
;
ExecStatusType
resultStatus
;
int
resultIndex
=
cdbdisp_numPGresult
(
dispatchResult
);
if
(
DEBUG4
>=
log_min_messages
)
write_log
(
"PQgetResult, resultIndex = %d"
,
resultIndex
);
/*
* Get one message.
*/
pRes
=
PQgetResult
(
segdbDesc
->
conn
);
CollectQEWriterTransactionInformation
(
segdbDesc
,
dispatchResult
);
/*
* Command is complete when PGgetResult() returns NULL. It is critical
* that for any connection that had an asynchronous command sent thru
* it, we call PQgetResult until it returns NULL. Otherwise, the next
* time a command is sent to that connection, it will return an error
* that there's a command pending.
*/
if
(
!
pRes
)
{
if
(
DEBUG4
>=
log_min_messages
)
{
/*
* Don't use elog, it's not thread-safe
*/
write_log
(
"%s -> idle"
,
segdbDesc
->
whoami
);
}
break
;
}
/*
* Attach the PGresult object to the CdbDispatchResult object.
*/
cdbdisp_appendResult
(
dispatchResult
,
pRes
);
/*
* Did a command complete successfully?
*/
resultStatus
=
PQresultStatus
(
pRes
);
if
(
resultStatus
==
PGRES_COMMAND_OK
||
resultStatus
==
PGRES_TUPLES_OK
||
resultStatus
==
PGRES_COPY_IN
||
resultStatus
==
PGRES_COPY_OUT
)
{
/*
* Save the index of the last successful PGresult. Can be given to
* cdbdisp_getPGresult() to get tuple count, etc.
*/
dispatchResult
->
okindex
=
resultIndex
;
if
(
DEBUG3
>=
log_min_messages
)
{
/*
* Don't use elog, it's not thread-safe
*/
char
*
cmdStatus
=
PQcmdStatus
(
pRes
);
write_log
(
"%s -> ok %s"
,
segdbDesc
->
whoami
,
cmdStatus
?
cmdStatus
:
"(no cmdStatus)"
);
}
if
(
resultStatus
==
PGRES_COPY_IN
||
resultStatus
==
PGRES_COPY_OUT
)
return
;
}
/*
* Note QE error. Cancel the whole statement if requested.
*/
else
{
char
*
sqlstate
=
PQresultErrorField
(
pRes
,
PG_DIAG_SQLSTATE
);
int
errcode
=
0
;
msg
=
PQresultErrorMessage
(
pRes
);
if
(
DEBUG2
>=
log_min_messages
)
{
/*
* Don't use elog, it's not thread-safe
*/
write_log
(
"%s -> %s %s %s"
,
segdbDesc
->
whoami
,
PQresStatus
(
resultStatus
),
sqlstate
?
sqlstate
:
"(no SQLSTATE)"
,
msg
?
msg
:
""
);
}
/*
* Convert SQLSTATE to an error code (ERRCODE_xxx). Use a generic
* nonzero error code if no SQLSTATE.
*/
if
(
sqlstate
&&
strlen
(
sqlstate
)
==
5
)
errcode
=
sqlstate_to_errcode
(
sqlstate
);
/*
* Save first error code and the index of its PGresult buffer
* entry.
*/
cdbdisp_seterrcode
(
errcode
,
resultIndex
,
dispatchResult
);
}
}
if
(
DEBUG4
>=
log_min_messages
)
write_log
(
"processResultsSingle says we are finished with : %s"
,
segdbDesc
->
whoami
);
dispatchResult
->
stillRunning
=
false
;
if
(
DEBUG1
>=
log_min_messages
)
{
char
msec_str
[
32
];
switch
(
check_log_duration
(
msec_str
,
false
))
{
case
1
:
case
2
:
write_log
(
"duration to dispatch result received from thread (seg %d): %s ms"
,
dispatchResult
->
segdbDesc
->
segindex
,
msec_str
);
break
;
}
}
if
(
PQisBusy
(
dispatchResult
->
segdbDesc
->
conn
))
write_log
(
"We thought we were done, because finished==true, but libpq says we are still busy"
);
}
}
/*
* Cleanup routine for the dispatching thread. This will indicate the thread
* is not running any longer.
...
...
@@ -1071,13 +761,7 @@ thread_DispatchCommand(void *arg)
pthread_cleanup_push
(
DecrementRunningCount
,
NULL
);
{
thread_DispatchOut
(
pParms
);
/*
* thread_DispatchWaitSingle might have a problem with interupts
*/
if
(
pParms
->
db_count
==
1
&&
false
)
thread_DispatchWaitSingle
(
pParms
);
else
thread_DispatchWait
(
pParms
);
thread_DispatchWait
(
pParms
);
}
pthread_cleanup_pop
(
1
);
...
...
src/backend/cdb/dispatcher/cdbdispatchresult.c
浏览文件 @
5d408fc1
...
...
@@ -283,40 +283,6 @@ cdbdisp_seterrcode(int errcode, /* ERRCODE_xxx or 0 */
}
}
/*
* Transfer connection error messages to dispatchResult from segdbDesc.
* returns true if segdbDesc had err info
*/
bool
cdbdisp_mergeConnectionErrors
(
CdbDispatchResult
*
dispatchResult
,
struct
SegmentDatabaseDescriptor
*
segdbDesc
)
{
if
(
!
segdbDesc
)
return
false
;
if
(
segdbDesc
->
errcode
==
0
&&
segdbDesc
->
error_message
.
len
==
0
)
return
false
;
/*
* Error code should always be accompanied by text and vice-versa.
*/
Assert
(
segdbDesc
->
errcode
!=
0
&&
segdbDesc
->
error_message
.
len
>
0
);
/*
* Append error message text and save error code.
*/
cdbdisp_appendMessage
(
dispatchResult
,
0
,
segdbDesc
->
errcode
,
"%s"
,
segdbDesc
->
error_message
.
data
);
/*
* Reset connection object's error info.
*/
segdbDesc
->
errcode
=
0
;
segdbDesc
->
error_message
.
len
=
0
;
segdbDesc
->
error_message
.
data
[
0
]
=
'\0'
;
return
true
;
}
/*
* Format a message, printf-style, and append to the error_message buffer.
* Also write it to stderr if logging is enabled for messages of the
...
...
@@ -550,7 +516,6 @@ void
cdbdisp_dumpDispatchResult
(
CdbDispatchResult
*
dispatchResult
,
bool
verbose
,
struct
StringInfoData
*
buf
)
{
SegmentDatabaseDescriptor
*
segdbDesc
=
dispatchResult
->
segdbDesc
;
int
ires
;
int
nres
;
...
...
@@ -630,14 +595,6 @@ cdbdisp_dumpDispatchResult(CdbDispatchResult *dispatchResult,
}
}
/*
* segdbDesc error message shouldn't be possible here, but check anyway.
* Ordinarily dispatchResult->segdbDesc is NULL here because we are
* called after it has been cleared by CdbCheckDispatchResult().
*/
if
(
dispatchResult
->
segdbDesc
)
cdbdisp_mergeConnectionErrors
(
dispatchResult
,
segdbDesc
);
/*
* Error found on our side of the libpq interface?
*/
...
...
src/include/cdb/cdbdisp_thread.h
浏览文件 @
5d408fc1
...
...
@@ -79,8 +79,7 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
void
CdbCheckDispatchResult_internal
(
struct
CdbDispatcherState
*
ds
,
struct
SegmentDatabaseDescriptor
***
failedSegDB
,
int
*
numOfFailed
,
DispatchWaitMode
waitMode
);
DispatchWaitMode
waitMode
);
void
cdbdisp_waitThreads
(
void
);
...
...
src/include/cdb/cdbdispatchresult.h
浏览文件 @
5d408fc1
...
...
@@ -194,14 +194,6 @@ cdbdisp_seterrcode(int errcode, /* ERRCODE_xxx or 0 */
int
resultIndex
,
/* -1 if no PGresult */
CdbDispatchResult
*
dispatchResult
);
/*
* Transfer connection error messages to dispatchResult from segdbDesc.
* Returns true if segdbDesc had err info
*/
bool
cdbdisp_mergeConnectionErrors
(
CdbDispatchResult
*
dispatchResult
,
struct
SegmentDatabaseDescriptor
*
segdbDesc
);
/*
* Format a message, printf-style, and append to the error_message buffer.
* Also write it to stderr if logging is enabled for messages of the
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录