提交 9265ea6a 编写于 作者: N Ning Yu 提交者: Hubert Zhang

ic-proxy: refresh peers on demand

The user can adjust the ic-proxy peer addresses at runtime and reload by
sending SIGHUP, if an address is modified or removed, the corresponding
peer connection must be closed or reestablished.  The same to the peer
listener, if the listener port is changed, then must re-setup the
listener.
上级 854c4b84
......@@ -27,6 +27,8 @@
#include <unistd.h>
static void ic_proxy_server_peer_listener_init(uv_loop_t *loop);
static uv_loop_t ic_proxy_server_loop;
static uv_signal_t ic_proxy_server_signal_hup;
static uv_signal_t ic_proxy_server_signal_int;
......@@ -36,6 +38,7 @@ static uv_timer_t ic_proxy_server_timer;
static uv_tcp_t ic_proxy_peer_listener;
static bool ic_proxy_peer_listening;
static bool ic_proxy_peer_relistening;
static uv_pipe_t ic_proxy_client_listener;
static bool ic_proxy_client_listening;
......@@ -55,6 +58,13 @@ ic_proxy_server_peer_listener_on_closed(uv_handle_t *handle)
/* A new peer listener will be created on the next timer callback */
ic_proxy_peer_listening = false;
/* If relisten is requested, do it now */
if (ic_proxy_peer_relistening)
{
ic_proxy_peer_relistening = false;
ic_proxy_server_peer_listener_init(handle->loop);
}
}
/*
......@@ -200,6 +210,40 @@ ic_proxy_server_peer_listener_init(uv_loop_t *loop)
ic_proxy_peer_listening = true;
}
/*
* Reinit the peer listener.
*/
static void
ic_proxy_server_peer_listener_reinit(uv_loop_t *loop)
{
const ICProxyAddr *myaddr = ic_proxy_get_my_addr();
if (ic_proxy_peer_relistening)
return;
if (ic_proxy_peer_listening)
{
/*
* We are listening already, so must first close the current one, we
* keep the ic_proxy_peer_listening as true during the process to
* prevent double connect.
*/
ic_proxy_log(LOG, "ic-proxy-server: closing the legacy peer listener");
/* Only recreate a new listener if an address is assigned to us */
ic_proxy_peer_relistening = !!myaddr;
uv_close((uv_handle_t *) &ic_proxy_peer_listener,
ic_proxy_server_peer_listener_on_closed);
}
else if (myaddr)
{
/* Otherwise simply establish a new one */
ic_proxy_peer_relistening = false;
ic_proxy_server_peer_listener_init(loop);
}
}
/*
* The client listener is closed.
*/
......@@ -353,6 +397,57 @@ ic_proxy_server_ensure_peers(uv_loop_t *loop)
}
}
/*
* Drop legacy peers.
*
* The list ic_proxy_removed_addrs contains both removed and updated addresses,
* the corresponding peers must be disconnected before taking further actions.
*/
static void
ic_proxy_server_drop_legacy_peers(uv_loop_t *loop)
{
ListCell *cell;
const ICProxyAddr *myaddr = ic_proxy_get_my_addr();
/*
* Also take the chance to check the peer listener.
*
* If myaddr cannot be found at all, the address must have been removed,
* close the current listener.
*/
if (!myaddr)
ic_proxy_server_peer_listener_reinit(loop);
foreach(cell, ic_proxy_removed_addrs)
{
ICProxyAddr *addr = lfirst(cell);
ICProxyPeer *peer;
/*
* Also take the chance to check the peer listener.
*
* If myaddr appears in the removed list, then the address must have
* been changed or removed, no need to compare the sockaddrs again.
*/
if (myaddr && myaddr->dbid == addr->dbid)
ic_proxy_server_peer_listener_reinit(loop);
/*
* Refer to ic_proxy_server_ensure_peers() on why we need below checks.
*/
if (addr->content >= GpIdentity.segindex)
continue;
if (addr->dbid == GpIdentity.dbid)
continue; /* do not connect to my primary / mirror */
peer = ic_proxy_peer_lookup(addr->content, addr->dbid);
if (!peer)
continue;
ic_proxy_peer_disconnect(peer);
}
}
/*
* Timer handler.
*
......@@ -383,6 +478,7 @@ ic_proxy_server_on_signal(uv_signal_t *handle, int signum)
ProcessConfigFile(PGC_SIGHUP);
ic_proxy_reload_addresses(handle->loop);
ic_proxy_server_drop_legacy_peers(handle->loop);
ic_proxy_server_peer_listener_init(handle->loop);
ic_proxy_server_ensure_peers(handle->loop);
......@@ -434,6 +530,7 @@ ic_proxy_server_main(void)
ic_proxy_client_table_init();
ic_proxy_peer_listening = false;
ic_proxy_peer_relistening = false;
ic_proxy_client_listening = false;
uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_hup);
......@@ -510,6 +607,9 @@ ic_proxy_server_quit(uv_loop_t *loop, bool relaunch)
*/
if (ic_proxy_peer_listening)
{
/* cancel pending relistening request */
ic_proxy_peer_relistening = false;
uv_unref((uv_handle_t *) &ic_proxy_peer_listener);
uv_close((uv_handle_t *) &ic_proxy_peer_listener, NULL);
}
......
......@@ -813,6 +813,27 @@ ic_proxy_peer_connect(ICProxyPeer *peer, struct sockaddr_in *dest)
ic_proxy_peer_on_connected);
}
/*
* Disconnect a peer.
*
* The peer can be in any state, the caller only needs to ensure not to call
* this function from a peer callback.
*/
void
ic_proxy_peer_disconnect(ICProxyPeer *peer)
{
/* No such a peer yet */
if (!peer)
return;
/* No connection is made or being made */
if (!(peer->state & IC_PROXY_PEER_STATE_CONNECTING))
return;
ic_proxy_log(LOG, "%s: disconnecting", peer->name);
ic_proxy_peer_shutdown(peer);
}
/*
* Send a packet to a remote peer.
*/
......
......@@ -127,6 +127,7 @@ extern ICProxyPeer *ic_proxy_peer_new(uv_loop_t *loop,
extern void ic_proxy_peer_free(ICProxyPeer *peer);
extern void ic_proxy_peer_read_hello(ICProxyPeer *peer);
extern void ic_proxy_peer_connect(ICProxyPeer *peer, struct sockaddr_in *dest);
extern void ic_proxy_peer_disconnect(ICProxyPeer *peer);
extern void ic_proxy_peer_route_data(ICProxyPeer *peer, ICProxyPkt *pkt,
ic_proxy_sent_cb callback, void *opaque);
extern ICProxyPeer *ic_proxy_peer_lookup(int16 content, uint16 dbid);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册