arranger.cpp 5.9 KB
Newer Older
W
init  
wangyunlai.wyl 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase Migration Service LogProxy is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#include <mutex>
#include <memory>
15
#include "common/version.h"
W
init  
wangyunlai.wyl 已提交
16 17 18
#include "common/log.h"
#include "arranger/source_invoke.h"
#include "arranger/arranger.h"
19 20
#include "obaccess/ob_access.h"
#include "obaccess/oblog_config.h"
W
init  
wangyunlai.wyl 已提交
21 22 23 24 25 26 27 28

namespace oceanbase {
namespace logproxy {

// File-local reference to the object returned by Config::instance(); the methods
// below read service_port and auth_user through it.
static Config& _s_conf = Config::instance();

int Arranger::init()
{
29 30 31 32
  if (!localhostip(_localhost, _localip) || _localhost.empty() || _localip.empty()) {
    OMS_ERROR << "Failed to fetch localhost name or localip";
    return OMS_FAILED;
  }
W
init  
wangyunlai.wyl 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54

  int ret = _accepter.init();
  if (ret == OMS_FAILED) {
    return ret;
  }
  _accepter.set_read_callback([this](const PeerInfo& peer, const Message& msg) { return on_msg(peer, msg); });
  return OMS_OK;
}

/**
 * Listen on the configured service port and start the accepter.
 *
 * @return the result of _accepter.start() when listening succeeded, otherwise
 *         the non-OMS_OK code returned by _accepter.listen().
 */
int Arranger::run_foreground()
{
  const int listen_result = _accepter.listen(_s_conf.service_port.val());
  if (listen_result == OMS_OK) {
    return _accepter.start();
  }
  return listen_result;
}

/**
 * Read callback registered on the accepter in init().
 *
 * Handles MessageType::HANDSHAKE_REQUEST_CLIENT: authenticates the client,
 * answers with a handshake response and then creates the client's source via
 * create(). Any other message type is logged and otherwise ignored.
 *
 * @param peer  the peer the message was received from
 * @param msg   the received message
 * @return EventResult::ER_CLOSE_CHANNEL when authentication, sending the
 *         handshake response or create() fails; EventResult::ER_SUCCESS
 *         otherwise (including for unknown message types).
 */
EventResult Arranger::on_msg(const PeerInfo& peer, const Message& msg)
{
  OMS_INFO << "Arranger on_msg fired: " << peer.to_string();

  if (msg.type() != MessageType::HANDSHAKE_REQUEST_CLIENT) {
    OMS_WARN << "Unknown message type: " << static_cast<int>(msg.type());
    return EventResult::ER_SUCCESS;
  }

  // Downcast to the concrete handshake type. Constness is dropped on purpose so
  // the reference stays non-const, exactly as the previous C-style cast did.
  auto& handshake =
      const_cast<ClientHandshakeRequestMessage&>(static_cast<const ClientHandshakeRequestMessage&>(msg));
  OMS_INFO << "Handshake request from peer: " << peer.to_string() << ", msg: " << handshake.to_string();

  const auto packet_version = msg.version();
  ClientMeta client = ClientMeta::from_handshake(peer, handshake);
  client.packet_version = packet_version;

  std::string auth_error;
  if (auth(client, auth_error) != OMS_OK) {
    response_error(peer, packet_version, ErrorCode::NO_AUTH, auth_error);
    return EventResult::ER_CLOSE_CHANNEL;
  }

  // The handshake response is sent before the source is created.
  ClientHandshakeResponseMessage resp(0, _localip, __OMS_VERSION__);
  resp.set_version(packet_version);
  if (_accepter.send_message(peer, resp, true) != OMS_OK) {
    OMS_WARN << "Failed to send handshake response message. peer=" << peer.to_string();
    return EventResult::ER_CLOSE_CHANNEL;
  }

  if (create(client) != OMS_OK) {
    response_error(peer, packet_version, E_INNER, "Failed to create oblogreader");
    return EventResult::ER_CLOSE_CHANNEL;
  }

  return EventResult::ER_SUCCESS;
}

87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
/**
 * Authenticate a client when the auth_user configuration item is enabled.
 *
 * Builds an OblogConfig from client.configuration, initializes an ObAccess
 * with it and calls ObAccess::auth(). When auth_user is disabled nothing is
 * checked.
 *
 * @param client  metadata of the connecting client
 * @param errmsg  set to a short description when a step fails; left untouched
 *                on success
 * @return OMS_OK when authentication is disabled or succeeds, otherwise the
 *         code returned by the failing ObAccess call.
 */
int Arranger::auth(ClientMeta& client, std::string& errmsg)
{
  if (!_s_conf.auth_user.val()) {
    return OMS_OK;
  }

  OblogConfig oblog_config(client.configuration);
  ObAccess ob_access;

  const int init_result = ob_access.init(oblog_config);
  if (init_result != OMS_OK) {
    errmsg = "Failed to parse configuration";
    return init_result;
  }

  const int auth_result = ob_access.auth();
  if (auth_result != OMS_OK) {
    errmsg = "Failed to auth";
    return auth_result;
  }

  return OMS_OK;
}

W
init  
wangyunlai.wyl 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
/**
 * Register a client and start its source, holding _op_mutex throughout.
 *
 * - If the client id is already registered with a different peer, the
 *   registration is closed through close_client_locked() and OMS_FAILED is
 *   returned.
 * - If it is already registered with the same peer, nothing is done and
 *   OMS_OK is returned.
 * - Otherwise the id/peer pair is recorded in _client_peers and
 *   start_source() is called; on failure the client is closed again and the
 *   failing code is returned.
 *
 * NOTE(review): close_client_locked() looks the peer up by client.id, so in
 * the duplicate-id case it is the previously registered peer that gets the
 * error message and is removed - confirm this is intended.
 *
 * @param client  metadata of the connecting client
 * @return OMS_OK on success, OMS_FAILED on a conflicting duplicate id, or the
 *         code returned by start_source().
 */
int Arranger::create(const ClientMeta& client)
{
  std::lock_guard<std::mutex> guard(_op_mutex);

  OMS_INFO << "Client connecting: " << client.to_string();

  const ClientId& client_id = client.id;
  const auto existing = _client_peers.find(client_id);
  if (existing != _client_peers.end()) {
    if (existing->second != client.peer) {
      OMS_ERROR << "duplication exist clientId: " << client_id.to_string();
      close_client_locked(client, "duplication exist client_id");
      return OMS_FAILED;
    }
    OMS_WARN << "duplication exist clientId and channel";
    return OMS_OK;
  }
  _client_peers.emplace(client_id, client.peer);

  const int start_result = start_source(client, client.configuration);
  if (start_result != OMS_OK) {
    close_client_locked(client, "failed to invoke");
    return start_result;
  }

  // Disabled monitor registration, kept for reference:
  //    if (client.getEnableMonitor()) {
  //        LogProxyMetric.instance().registerRuntimeStatusCallback(client.getClientId(), status -> {
  //            status = status.toBuilder().setStreamCount(clients.size()).setWorkerCount(sources.size()).build();
  //            clientDataChannel.pushRuntimeStatus(status);
  //            return 0;
  //        });
  //        logger.info("Client registered Monitor: {}", client.getClientId());
  //    }

  OMS_INFO << "Client connected: " << client_id.to_string();
  return OMS_OK;
}

148 149 150 151 152 153 154 155 156
/**
 * Start the source for a client by delegating to SourceInvoke::invoke().
 *
 * @param client         metadata of the client the source is started for
 * @param configuration  configuration string forwarded to the invocation
 * @return OMS_OK on success, otherwise the code returned by
 *         SourceInvoke::invoke().
 */
int Arranger::start_source(const ClientMeta& client, const std::string& configuration)
{
  // The invoke result is already either OMS_OK or the error to report.
  return SourceInvoke::invoke(_accepter, client, configuration);
}

157
void Arranger::response_error(const PeerInfo& peer, MessageVersion version, ErrorCode code, const std::string& errmsg)
158
{
159
  ErrorMessage error(code, errmsg);
160
  error.set_version(version);
161
  int ret = _accepter.send_message(peer, error, true);
162 163 164 165 166
  if (ret != OMS_OK) {
    OMS_WARN << "Failed to send error response message to peer:" << peer.to_string() << " for message:" << errmsg;
  }
}

W
init  
wangyunlai.wyl 已提交
167 168 169 170 171
/**
 * Close a client while holding _op_mutex.
 *
 * Acquires _op_mutex and forwards to close_client_locked().
 *
 * @param client  metadata of the client to close
 * @param msg     error text sent to the client before its channel is removed
 * @return the result of close_client_locked().
 */
int Arranger::close_client(const ClientMeta& client, const std::string& msg)
{
  std::lock_guard<std::mutex> guard(_op_mutex);
  return close_client_locked(client, msg);
}
172

W
init  
wangyunlai.wyl 已提交
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
/**
 * Close the channel registered for client.id, if any.
 *
 * Does not take _op_mutex itself; close_client() and create() call it with
 * the mutex already held. When the id is registered, an ErrorMessage carrying
 * msg is sent to the registered peer (send failures are only logged), the
 * peer's channel is removed from the accepter and the registration is erased.
 *
 * @param client  metadata of the client to close
 * @param msg     error text sent to the registered peer
 * @return always OMS_OK.
 */
int Arranger::close_client_locked(const ClientMeta& client, const std::string& msg)
{
  auto registered = _client_peers.find(client.id);
  if (registered == _client_peers.end()) {
    return OMS_OK;
  }

  auto& registered_peer = registered->second;

  // Best effort: try to tell the client why it is being closed, ignore send failures.
  // TODO... error code
  ErrorMessage err(OMS_FAILED, msg);
  err.set_version(client.packet_version);
  if (_accepter.send_message(registered_peer, err, true) != OMS_OK) {
    OMS_WARN << "Failed to send error response message. client=" << client.peer.id();
  }

  _accepter.remove_channel(registered_peer);
  _client_peers.erase(registered);
  return OMS_OK;
}

}  // namespace logproxy
}  // namespace oceanbase