accessor.h 5.9 KB
Newer Older
T
tangwei12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <stdint.h>
#include <stdio.h>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps.pb.h"

namespace paddle {
namespace distributed {
// Converter / deconverter string pair, in the form handed back by
// ValueAccessor::converter(). Kept as a plain aggregate with no invariants.
struct FsDataConverter {
  // Converter string; ValueAccessor documents it as the one used for save.
  std::string converter;
  // Deconverter string; ValueAccessor documents it as the one used for load.
  std::string deconverter;
};

// A non-owning view of a contiguous buffer: a pointer to its first byte plus
// its length in bytes. Region never allocates or frees the memory it points
// to. The typed constructors take an ELEMENT count and convert it to a byte
// count using the element type's size.
struct Region {
  // Empty region: null pointer, zero bytes.
  Region() : data(nullptr), size(0) {}
  // Raw byte buffer; data_num is already a byte count.
  Region(char* data, size_t data_num) : data(data), size(data_num) {}
  // Buffer of data_num floats.
  Region(float* data, size_t data_num)
      : data(reinterpret_cast<char*>(data)), size(data_num * sizeof(float)) {}
  // Buffer of data_num 16-bit integers.
  Region(int16_t* data, size_t data_num)
      : data(reinterpret_cast<char*>(data)),
        size(data_num * sizeof(int16_t)) {}
  // Buffer of data_num 32-bit integers.
  Region(int32_t* data, size_t data_num)
      : data(reinterpret_cast<char*>(data)),
        size(data_num * sizeof(int32_t)) {}
  // Buffer of data_num 64-bit integers.
  Region(int64_t* data, size_t data_num)
      : data(reinterpret_cast<char*>(data)),
        size(data_num * sizeof(int64_t)) {}

  char* data;   // First byte of the viewed buffer (may be null).
  size_t size;  // Length of the viewed buffer in bytes.
};

// One save-param entry: the integer param together with its converter and
// deconverter strings. Must remain an aggregate, because
// ValueAccessor::configure() fills it with a braced list
// {param, converter, deconverter} in exactly this member order.
struct DataConverter {
  // Key identifying the entry (ValueAccessor uses it as the map key).
  int param;
  // Converter string for this param.
  std::string converter;
  // Deconverter string for this param.
  std::string deconverter;
};

class ValueAccessor {
 public:
  explicit ValueAccessor(){};
  virtual ~ValueAccessor(){};

  virtual int configure(const TableAccessorParameter& parameter) {
    _config = parameter;
    // data_convert结构体初始化
    if (_config.table_accessor_save_param_size() != 0) {
      for (int i = 0; i < _config.table_accessor_save_param_size(); ++i) {
        int param = _config.table_accessor_save_param(i).param();
        std::string converter =
            _config.table_accessor_save_param(i).converter();
        std::string deconverter =
            _config.table_accessor_save_param(i).deconverter();
        _data_coverter_map[param] = std::make_shared<DataConverter>();
        *(_data_coverter_map[param]) = {param, converter, deconverter};
      }
    }
    return 0;
  }
  virtual int initialize() = 0;

  // value维度
  virtual size_t dim() = 0;
  // value各个维度的size
  virtual size_t dim_size(size_t dim) = 0;
  // value各维度相加总size
  virtual size_t size() = 0;

  // value中mf动态长度部分总size大小, sparse下生效
  virtual size_t mf_size() { return 0; }
  virtual bool need_extend_mf(float* value) { return false; }
  virtual bool has_mf(size_t size) { return false; }
  // pull value维度
  virtual size_t select_dim() = 0;
  // pull value各个维度的size
  virtual size_t select_dim_size(size_t dim) = 0;
  // pull value各维度相加总size
  virtual size_t select_size() = 0;
  // push value维度
  virtual size_t update_dim() = 0;
  // push value各个维度的size
  virtual size_t update_dim_size(size_t dim) = 0;
  // push value各维度相加总size
  virtual size_t update_size() = 0;
  // fea total for dense
  virtual size_t fea_dim() { return _config.fea_dim(); }
  // converter for save
  virtual std::string get_converter(int param) {
    auto itr = _data_coverter_map.find(param);
    if (itr == _data_coverter_map.end()) {
      return "";
    } else {
      return (*itr).second->converter;
    }
  }
  // deconverter for load
  virtual std::string get_deconverter(int param) {
    auto itr = _data_coverter_map.find(param);
    if (itr == _data_coverter_map.end()) {
      return "";
    } else {
      return (*itr).second->deconverter;
    }
  }
  // 判断该value是否进行shrink
  virtual bool shrink(float* value) = 0;

  // 判断该value是否在save阶段dump,
  // param作为参数用于标识save阶段,如downpour的xbox与batch_model
  virtual bool save(float* value, int param) = 0;
  // update delta_score and unseen_days after save
  virtual void update_stat_after_save(float* value, int param) {}

  // keys不存在时,为values生成随机值
  virtual int32_t create(float** value, size_t num) = 0;
  virtual bool create_value(int type, const float* value) { return true; }
  // 从values中选取到select_values中
  virtual int32_t select(float** select_values, const float** values,
                         size_t num) = 0;
  // 将update_values聚合到一起
  virtual int32_t merge(float** update_values,
                        const float** other_update_values, size_t num) = 0;
  // 将update_values聚合到一起,通过it.next判定是否进入下一个key
  // virtual int32_t merge(float** update_values, iterator it);
  // 将update_values更新应用到values中
  virtual int32_t update(float** values, const float** update_values,
                         size_t num) = 0;

  // used to save model, will filter feature
  virtual std::string parse_to_string(const float* value, int param) = 0;
  //  parse value from string, used to load model
  virtual int32_t parse_from_string(const std::string& data, float* value) = 0;

  virtual FsDataConverter converter(int param) {
    FsDataConverter data_convert;
    data_convert.converter = this->get_converter(param);
    data_convert.deconverter = this->get_deconverter(param);
    return data_convert;
  }

  virtual int set_weight(float** values, const float** update_values,
                         size_t num) {
    return 0;
  }

  virtual float get_field(float* value, const std::string& name) { return 0.0; }

 protected:
  size_t _value_size;
  size_t _select_value_size;
  size_t _update_value_size;
  TableAccessorParameter _config;
  std::unordered_map<int, std::shared_ptr<struct DataConverter>>
      _data_coverter_map;
};
// Invokes the REGISTER_REGISTERER macro for ValueAccessor. The macro is not
// defined in this file; presumably it comes from common/registerer.h and sets
// up the registry through which ValueAccessor implementations are looked up
// -- confirm against the macro definition.
REGISTER_REGISTERER(ValueAccessor);
}  // namespace distributed
}  // namespace paddle