// BinaryProtoLookupService.cc
/**
 * Copyright 2016 Yahoo Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "DestinationName.h"
#include "BinaryProtoLookupService.h"
#include "LogUtils.h"
#include "SharedBuffer.h"

#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include "ConnectionPool.h"

#include <string>

DECLARE_LOG_OBJECT()

namespace pulsar {

    /*
     * Constructor
     *
     * @param cnxPool connection pool used to obtain connections for lookup requests
     * @param lookupUrl service url to do lookup
     */
    BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool, const std::string& lookupUrl)
        : cnxPool_(cnxPool),          // pool queried for a connection on every lookup
          serviceUrl_(lookupUrl),     // url the initial lookup requests are sent to
          mutex_(),                   // guards requestIdGenerator_ in newRequestId()
          requestIdGenerator_(0) {    // first id handed out is 1 (pre-incremented)
    }

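    /*
     * @param destinationName topic name to get broker for
     * @return future that completes with the lookup data, or fails with
     *         ResultInvalidTopicName when the destination name cannot be parsed
     *
     * Looks up the owner broker for the given destination name
     */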
    Future<Result, LookupDataResultPtr> BinaryProtoLookupService::lookupAsync(const std::string& destinationName) {
        // A single promise backs the returned future on both the failure and the success path.
        LookupDataResultPromisePtr lookupResult = boost::make_shared<LookupDataResultPromise>();

        DestinationNamePtr parsedName = DestinationName::get(destinationName);
        if (parsedName) {
            // Once a connection to the service url is available, send the
            // (non-authoritative) topic lookup through it.
            const std::string nameForLookup = parsedName->toString();
            cnxPool_.getConnectionAsync(serviceUrl_).addListener(
                    boost::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this,
                                nameForLookup, false, _1, _2, lookupResult));
        } else {
            LOG_ERROR("Unable to parse destination - " << destinationName);
            lookupResult->setFailed(ResultInvalidTopicName);
        }
        return lookupResult->getFuture();
    }

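    /*
     * @param dn parsed destination name of the topic to get number of partitions for
     * @return future that completes with the partition metadata lookup result, or
     *         fails with ResultInvalidTopicName when dn is null
     */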
    Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetadataAsync(const DestinationNamePtr& dn) {
        LookupDataResultPromisePtr metadataResult = boost::make_shared<LookupDataResultPromise>();

        if (dn) {
            // Send the partition metadata request once a connection to the service url is available.
            const std::string nameForLookup = dn->toString();
            cnxPool_.getConnectionAsync(serviceUrl_).addListener(
                    boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this,
                                nameForLookup, _1, _2, metadataResult));
        } else {
            // A null destination name cannot be looked up.
            metadataResult->setFailed(ResultInvalidTopicName);
        }
        return metadataResult->getFuture();
    }


    void BinaryProtoLookupService::sendTopicLookupRequest(const std::string& destinationName, bool authoritative, Result result, const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise) {
        if (result != ResultOk) {
            promise->setFailed(ResultConnectError);
            return;
        }
        LookupDataResultPromisePtr lookupPromise = boost::make_shared<LookupDataResultPromise>();
        ClientConnectionPtr conn = clientCnx.lock();
        uint64_t requestId = newRequestId();
        conn->newTopicLookup(destinationName, authoritative, requestId, lookupPromise);
        lookupPromise->getFuture().addListener(boost::bind(&BinaryProtoLookupService::handleLookup, this, destinationName, _1, _2, clientCnx, promise));
    }

    /*
     * Completion handler for a topic lookup response.
     *
     * @param destinationName topic name that was looked up
     * @param result result reported with the response; propagated when no data is present
     * @param data lookup response data, null on failure
     * @param clientCnx connection the request was sent on (not used here)
     * @param promise promise backing the future handed to the original caller
     */
    void BinaryProtoLookupService::handleLookup(const std::string& destinationName,
            Result result, LookupDataResultPtr data, const ClientConnectionWeakPtr& clientCnx,
            LookupDataResultPromisePtr promise) {
        // No data: propagate the reported result as the failure.
        if (!data) {
            LOG_DEBUG("Lookup failed for " << destinationName << ", result " << result);
            promise->setFailed(result);
            return;
        }

        // Final answer: hand the data to the caller.
        if (!data->isRedirect()) {
            LOG_DEBUG("Lookup response for " << destinationName << ", lookup-broker-url " << data->getBrokerUrl());
            promise->setValue(data);
            return;
        }

        // Redirect: repeat the lookup against the broker named in the response,
        // carrying over the authoritative flag it returned.
        LOG_DEBUG("Lookup request is for " << destinationName << " redirected to " << data->getBrokerUrl());
        Future<Result, ClientConnectionWeakPtr> redirectCnx = cnxPool_.getConnectionAsync(data->getBrokerUrl());
        redirectCnx.addListener(
                boost::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this,
                            destinationName, data->isAuthoritative(), _1, _2, promise));
    }

    void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& destinationName, Result result, const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise) {
        if (result != ResultOk) {
            promise->setFailed(ResultConnectError);
            Future<Result, LookupDataResultPtr> future = promise->getFuture();
            return;
        }
        LookupDataResultPromisePtr lookupPromise = boost::make_shared<LookupDataResultPromise>();
        ClientConnectionPtr conn = clientCnx.lock();
        uint64_t requestId = newRequestId();
        conn->newPartitionedMetadataLookup(destinationName, requestId, lookupPromise);
        lookupPromise->getFuture().addListener(boost::bind(&BinaryProtoLookupService::handleLookup, this, destinationName, _1, _2, clientCnx, promise));
    }

    void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& destinationName,
            Result result, LookupDataResultPtr data, const ClientConnectionWeakPtr& clientCnx,
            LookupDataResultPromisePtr promise) {
        if (data) {
            if(data->isRedirect()) {
R
Rajan 已提交
128
                LOG_DEBUG("PartitionMetadataLookup request is for " << destinationName << " redirected to " << data->getBrokerUrl());
S
saandrews 已提交
129 130 131
                Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(data->getBrokerUrl());
                future.addListener(boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this, destinationName, _1, _2, promise));
            } else {
R
Rajan 已提交
132
                LOG_DEBUG("PartitionMetadataLookup response for " << destinationName << ", lookup-broker-url " << data->getBrokerUrl());
S
saandrews 已提交
133 134 135
                promise->setValue(data);
            }
        } else {
R
Rajan 已提交
136
            LOG_DEBUG("PartitionMetadataLookup failed for " << destinationName << ", result " << result);
S
saandrews 已提交
137 138 139 140 141 142 143 144 145 146
            promise->setFailed(result);
        }
    }

    /*
     * Produces the next request id: increments the counter while holding mutex_
     * and returns the incremented value.
     */
    uint64_t BinaryProtoLookupService::newRequestId() {
        Lock guard(mutex_);
        const uint64_t nextId = ++requestIdGenerator_;
        return nextId;
    }

}