未验证 提交 1acddc34 编写于 作者: L lxsbupt 提交者: GitHub

Merge gpugraph to develop (#48507)

* merge gpugraph to develop, fix code style

* update for untrainable params for stage3. (#48577)

* merge gpugraph to develop, trigger ci

* [CodeStyle][isort][Dy2St] sort imports in test_error (#48746)

* [CodeStyle][isort][Dy2St] sort imports in test_error

* update lineno

* Clear extra input (Bias, ResidualData) in OpMaker of conv2d (#47579)

* delete Bias and ResidualData in OpMaker of conv2d

* delete extra input of conv3d

* refactor pass of conv_bias_fusion

* fix mkldnn dependency

* fix mkldnn compile

* fix test_conv_bias_mkldnn_fuse_pass

* police some code

* remove useless log

* fix analyzer_vit_ocr_tester

* fix conv_activation_mkldnn_fuse_pass

* fix test_analyzer_ocr

* add fused_conv_sig

* fix performence regression

* fix performance regression

* make bilinear interpolate stable. (#48644)

* make bilinear interpolate stable.

* fix code

* clear tmp var in ptq (#48660)

* merge gpugraph to develop, fix py-api comment

* merge gpugraph to develop, fix mac-python3

* merge gpugraph to develop, fix mac-python3

* [Dy2St] replace deprecated `load_module` with `exec_module` (#48679)

* merge gpugraph to develop, fix mac-python3

* modify d2d copy to xpu::copy in xpu kernel, test=kunlun (#48710)

* rm _test_eager_guard (#48767)

* delete sampling_id api (#48543)

* [NPU] add FLAGS_npu_storage_format env to enable npu storage format, test=develop (#48774)

* optimize nchw<->nhwc kernel in fp16 model (#48692)

* fix: oss just support sm>=75 (#48731)

* update kl1 op list and optimize matmul unitest for kunlun (#48775)

*test=kunlun

* Fix accuracy fp16 kernel return fp32 tensor error (#48803)

* [phi::DenseTensor] Replace Tensor with phi::DenseTensor (#48682)

* [Zero-Dim] Support 0D for paddle.diagflat (#48735)

* [Zero-Dim] Support 0D for paddle.diagflat

* 【fluid api clear】Move batch norm1 (#47965)

* modify slice infershape

* code style

* modify slice_unittest

* temp fix

* batch_norm api move

* code_style

* codestyle

* ci_static

* add __init__

* reset other change

* revert .cc

* add import batchnorm

* conflict and revert

* fix bug

* fix third conflict one day

* fix conflict

* fix conflict bug

* fix conflict bug

* modify api

* code_style

* modify doc

* add lost doc stable

* fix conflict bug

* ci lack of gpu

* [remove fluid] PRelu BilinearTensorProduct Conv2DTranspose SequenceConv RowConv (#48654)

* [remove fluid] PRelu BilinearTensorProduct

* [remove fluid] PRelu BilinearTensorProduct Conv2DTranspose SequenceConv RowConv

* [remove fluid] PRelu BilinearTensorProduct Conv2DTranspose SequenceConv RowConv

* [remove fluid] PRelu BilinearTensorProduct Conv2DTranspose SequenceConv RowConv

* [remove fluid] PRelu BilinearTensorProduct Conv2DTranspose SequenceConv RowConv

* [remove fluid] PRelu BilinearTensorProduct Conv2DTranspose SequenceConv RowConv

* [remove fluid] PRelu BilinearTensorProduct Conv2DTranspose SequenceConv RowConv

* [remove fluid] PRelu BilinearTensorProduct Conv2DTranspose SequenceConv RowConv

* merge gpugraph to develop, rollback graph_send_recv

* fix ci (#48730)

* Remove reduntant numpy output in Example code (1/3), test=document_fix (#48678)

* 修改了英文API文档 (#48219)

* 修改paddle.nn.dynamic_decode,paddle.nn.functional.diag_embed 示例

* mma qk tensor_core (#48087)

* use mma for QK dot computing in fused_multi_transformer.
* Update fused_multi_transformer_op.cu.h

* remove lrn which is not used in paddle 2.0 (#47945)

* replace scatter_nd and scatter_nd_add with paddle.scatter_nd and (#47960)

paddle.scatter_nd_add

* [PHI] Migrate mul_grad kernel (#48061)

* cleanup unused code

* unify is_int8 is_bfloat16

* Simplify matmul_v2 FWD kernel

* remove RunKernel methods

* remove import namespace

* remove headers

* clean fluid/phi cross imports

* remove fluid axpy_handler

* delete fluid methods

* activations

* OneDNNMemDesc

* MKLDNNFormatForSize

* MatchShapeToLayout

* MKLDNNMemoryFormat

* MKLDNNFormat

* ReorderMKLDNNHandler

* to_void_cast

* review suggestions

* interpolate

* remove fluid depedency

* init

* ExecuteMatMulV2

* rm fluid kernel

* matmul_grad

* remove mutable_data

* mul_grad

* delete unnecessary shape and slice op (#48112)

* 修改英文文档。

* 修改segment operator等英文文档。

* 重新修改了paddle.einsum,paddle.unique_consecutive,
paddle.disable_signal_handler的英文文档格式。

* 重新修改了英文文档格式。;test=docs_preview

* Update extension.py

* 重新修改了英文文档格式。;test=docs_preview

* 重新修改了英文文档格式。
待验收:
- paddle.linalg.svd
- paddle.nn.functional.diag_embed
- paddle.set_grad_enabled
- paddle.disable_signal_handler
- paddle.cumprod
- paddle.devaice.cuda.stream_guard

待修改:
- paddle.nn.dynamic_decode
- paddle.einsum
- paddle.unique_consecutive
- paddle.linalg.svd
- paddle.uncubate.segment_min
- paddle.uncubate.segment_max
- paddle.uncubate.segment_sum
- paddle.uncubate.segment_mean

;test=docs_preview

* 重新修改了英文文档格式。
待验收:
- paddle.linalg.svd
- paddle.nn.functional.diag_embed
- paddle.set_grad_enabled
- paddle.disable_signal_handler
- paddle.cumprod
- paddle.devaice.cuda.stream_guard
- paddle.nn.dynamic_decode
- paddle.unique_consecutive
- paddle.linalg.svd

待修改:
- paddle.einsum
- paddle.incubate.segment_min
- paddle.incubate.segment_max
- paddle.incubate.segment_sum
- paddle.incubate.segment_mean

;test=docs_preview

* 重新修改了英文文档格式。
待验收:
- paddle.linalg.svd
- paddle.nn.functional.diag_embed
- paddle.set_grad_enabled
- paddle.disable_signal_handler
- paddle.cumprod
- paddle.devaice.cuda.stream_guard
- paddle.nn.dynamic_decode
- paddle.unique_consecutive
- paddle.linalg.svd

待修改:
- paddle.einsum
- paddle.incubate.segment_min
- paddle.incubate.segment_max
- paddle.incubate.segment_sum
- paddle.incubate.segment_mean

;test=docs_preview

* update

* test=docs_preview

* update formula; test=docs_preview

* update formula; test=docs_preview

* remove this operator; test=docs_preview

* add hyper link; test=docs_preview

* add default value; test=docs_preview

* update format; test=docs_preview

* empty commit; test=docs_preview

* fix codestyle issues; test=docs_preview

* empty commit; test=docs_preview
Co-authored-by: Nlzy <569782149@qq.com>
Co-authored-by: NVvsmile <450864116@qq.com>
Co-authored-by: NSławomir Siwek <slawomir.siwek@intel.com>
Co-authored-by: NRichardWooSJTU <37864677+RichardWooSJTU@users.noreply.github.com>
Co-authored-by: NLigoml <39876205+Ligoml@users.noreply.github.com>
Co-authored-by: NNyakku Shigure <sigure.qaq@gmail.com>

* [PHI] Migrate squeeze and squeeze_grad kernels (#48634)

* squeeze kernel

* squeze fwd

* whitespace

* 修复paddle.nn.functinal包和paddle.nn包下API文档 (#48581)

* assign cve number to pdsa, test=document_fix (#48846)

* [fluid remove]: remove paddle.fluid.layers.yolo_box and paddle.fluid.layers.yolov3_loss (#48722)

* remove paddle.fluid.layers.nn.temporal_shift

* code check

* rm unittest

* remove fluid.yolo_box

* remove fluid.yolov3_loss

* change the comments of yolov3_loss to yolo_loss

* merge gpugraph to develop, fix windows compile

* merge gpugraph to develop, fix windows compile

* merge gpugraph to develop, fix windows compile

* Try add eval() to speedup the eigen performance. (#48855)

* [Fluid Clean]move inplace_apis_indygraph_only from paddle.flud.dygraph.inplace_utils to paddle.utils (#48744)

* move inplace_apis_indygraph_only from paddle.flud.dygraph.inplace_utils to paddle.utils

* modify conflict

* modify conflict

* modify conflict

* modify conflict

* modify conflict

* modify conflict

* modify conflict

* modify static-check ci error

* fix conflict

* modify failed tests

* fix conflict

* fix conflict

* fix pool2d examples

* modify conflict

* fix failed tests

* fix conflict

* fix failed tests

* modfiy problem of deleting pool2d

* merge gpugraph to develop, fix windows compile

* clean fluid task: transfer gaussian random api (#48529)

* Delete duplicate quant nodes in QAT (#48751)

* rm autograd func dynamic eager tests (#48788)

* Setuptools optimization (#48770)

* optimize setup.py

* modify setup.py

* modify setup.py

* modify setup.py

* modify setup.py after zhangbo reviewed

* [CodeStyle][F811] fix some test cases shadowed by the same name (#48745)

* [CodeStyle][F811] fix some unittests

* fix setup.py

* remove ignore from flake8 config

* remove repeat TestAbsDoubleGradCheck

* fix rrelu test

* fix fft ut

* add noqa in fluid.lstm ut

* add rtol and atol in test_matmul_v2_op

* update rtol

* empty commit

* empty commit

* revert changes in matmul ut and add noqa

* rename test case name

* set free_when_no_cache_hit default value to true (#48815)

* [Clean Fluid] Rm and mv some fluid dygrah apis (#48576)

Remove fluid dygrah apis
GroupNorm
TreeConv
Move fluid dygraph apis
Flatten
SpectralNorm

* [Inference] inference add cinn interface (#48741)

* Clean and migrate fluid APIs of paddle.fluid.layers.control_flow (#48233)

* Merge branch 'reduce_sum' of https://github.com/GhostScreaming/Paddle into mine_fluid_clean_common.

* Fix some bugs.

* Clean APIs in python/paddle/fluid/layers/control_flow.py

* Polish code style.

* Change API.

* Fix some bugs.

* Fix some bugs.

* remove gpu_info.h from phi dependencies (#48811)

* [Paddle Inference] Add add onehot trt converter (#48655)

* add onehot trt converter

* add unitest

* fix bug

* opt code

* fix bug

* fix depth_tensor

* fix unitest

* fix bug

* fix unitest

* fix bug

* fix bug

* fix bug

* fix bug

* [PHI decoupling] remove  bbox_util.h from phi dependencies (#48761)

* remove bbox_util.h from phi

* add file bbox_util.h

* reframe bbox_util.h

* Optimize Paddle diagonal (#47904)

* [API Clean]Clean __all__ to avoid exposing usless API (#48713)

* [API Clean]Clean __all__ to avoid exposing usless API

* fix import

* fix typo

* remove tracedLayer unittest

* Clean fluid APIs in distributed and fleet files (#48851)

* Fix bug of reduce_sum op. When input.numel() > INT32_MAX, its result
is wrong.

* Remove climits.

* Clean fluid API in paddle/distributed and paddle/fleetx folders.
Include following files:
python/paddle/distributed/__init__.py
python/paddle/distributed/collective.py
python/paddle/distributed/fleet/utils/fs.py
python/paddle/distributed/fleet/utils/hybrid_parallel_inference.py
python/paddle/distributed/fleet/utils/hybrid_parallel_util.py
python/paddle/distributed/fleet/utils/internal_storage.py
python/paddle/distributed/launch/context/device.py
python/paddle/distributed/parallel.py
python/paddle/distributed/parallel_with_gloo.py
python/paddle/distributed/spawn.py
python/paddle/framework/__init__.py
To be mentioned, 'paddle.fluid.dygraph.parallel.ParallelEnv'
 and 'fluid.framework.core' keeps unchanged in those files.
ParallelEnv is used by paddle.fluid.dygraph.parallel.DataParallel.
However, APIs in paddle.fluid.dygraph.parallel can't be
migrated to paddle.distributed, as there exists cyclic import
dependencies in modules like paddle.static, paddle.tensor. And
'fluid.framework.core' will be changed to import framework.core
after fluid.core is transmitted.

* Change TODO authors.

* rm kunlun xpu2_op_list (#48826)

*test=kunlun

* remove detection_output, iou_similarity and bipartite_match (#48773)

* Set WaiterType of kGpuSync to kCPU (#48758)

* [Migrate Fluid] Migrate Decoder, BeamSearchDecoder (#48754)

* [Inference] Enable infer shape cache. (#48312)

* [Fluid Clean] remove unfold, deformable_roi_pooling, shard_index, hard_swish, mish, uniform_random, unbind (#48451)

* fix-gpups setup.py (#48888)

* fix-gpups

* test=document_fix

* [PHI decoupling] move cuda_graph from fluid to phi (#48686)

* move cuda_graph from fluid to phi

* move device_memory_aligment from fluid to phi

* Revert "move device_memory_aligment from fluid to phi"

This reverts commit b92fcd39a0a50fdac13278f49be0237a85f3a13f.

* update xpu cmake

* fix english docs typo errors (#48599)

* fix english docs typo errors

the errors in docs as same as chinese pr 5468

* update docs; test=docs_preview
Co-authored-by: NLigoml <39876205+Ligoml@users.noreply.github.com>

* [XPU] add load op into oplist. (#48860)

* [XPU] add load op into oplist.

* remove test_sampling_id_op_xpu.py

* 【fluid clean】remove fluid.dygraph.rnn.lstmcell and fluid.dygraph.rnn.grucell (#48719)

* refine bsd doc (#48882)

* [Paddle Inference] General optimization for no_varlen embedding layernorm (#48580)

* general optimization no_varlen embedding layernorm

* fix tmp directories (#48863)

* rm dygraph_to_static eager guard tests part2 minst2ptb_lm (#48793)

* rm dygraph_to_static eager guard tests part2 minst2ptb_lm

* merge gpugraph to develop, fix the_one_ps.py for gpups

* [remove fluid] under unittesets of linear api (#48564)

* [remove fluid] under unittesets of linear api

* [remove fluid] under unittesets of linear api

* [remove fluid] under unittesets of linear api

* [remove fluid] under unittesets of linear api

* [remove fluid] under unittesets of linear api

* [remove fluid] under unittesets of linear api

* [remove fluid] fluid dygrapn linear api

* [remove fluid] fluid dygrapn linear api

* [remove fluid] fluid dygrapn linear api

* [remove fluid.layers.cross_entropy] remove unit tests (part 1) (#48726)

* replace layers.cross_entropy with paddle.entropy

* fix args

* fix codestyle

* proper fix (#48360)

Reenabled ext_reorder recording for TransDataLayoutFromOneDNN

* [remove fluid.layers.matmul] remove fluid.layers.matmul in example code (#48818)

* replace fluid.layers.matmul in fluid/io.py

* fix doc error in fluid.layers.nn.sampling_id

* remove test_auto_search_dist_matmul_op.py (#48794)

* delete mean api (#48764)

* clean test_op_name_conflict (#48704)

* opt kernel_selection error msg (#48864)

* rewrite delete_weight_dequant_linear_op_encoder/decoder pass (#48650)

* rewrite delete_weight_deqquant_linear_op_encoder/decoder pass

* [XPU] add set_value and set_value_grad (#48845)

* merge gpugraph to develop, fix gpups ut

* Add QuantizedMatmul in QAT (#47997)

* fix 'BlasAXPBY unimplemented' error with custom device (#48762)

* fix 'BlasAXPBY unimplemented' error with custom device

* fix utils CmakeLists bug

* first commit (#38143)

* [Auto Parallel] Add cluster partition and dm to pm (#48320)

* add cluster_partition and device_meshes to process_meshes funcs

* add unitest

* fix paddle2cinn float16 type support bug (#48249)

* remove pool2d from fluid (#48512)

* remove pool2d

* [fluid remove]: remove paddle.fluid.layers.detection_map, paddle.fluid.metrics.DetectionMAP and paddle.fluid.evaluator.DetectionMAP (#48674)

* remove paddle.fluid.layers.nn.temporal_shift

* code check

* rm unittest

* remove paddle.fluid.layers.detection_map and the class:DetectionMAP

* [PHI decoupling] move "flags.h" from fluid to phi (#48696)

* add set_lr & get_lr for stage2 optimizer. (#48857)

* move share_buffer kernel to phi (#48858)

* move share_buffer kernel to phi

* fix ut

* add source file

* fix window links

* [Kernel Selection] Simplify kernel selection process in phi, reduce search number to half (#47771)

* simplify SelectKernelOrThrowError function in phi

* opt kernel_selection process

* polish code, fix backend error

* Support static graph code-gen for scalar and int_array (#48792)

* add suppport_tensor for code_gen to static graph

* support code-gen for int_array

* polish code

* fix bug of data_type

* clean unittest test_model_cast_to_bf16 (#48705)

* rm dy2static eager tests part1 bert2loop (#48790)

* rm dygraph_to_static eager guard tests part3 reinforce2yolo (#48795)

* rm distribution uniform eager guard test (#48768)

* rm distribution uniform eager guard test

* review

* replace cross_entropy in python/paddle/fluid/tests/unittests/test_[a-n]*.py except test_dist_transpiler.py (#48913)

* replace cross_entropy except in python/paddle/fluid/tests/unittests/*.py && unittests/*/*.py (#48922)

* [Paddle Inference]add cutlass act set in conv_elementwise_add_act_fuse_pass (#48838)

* add cutlass act set in conv_elementwise_add_act_fuse_pass

* move fluid.layers.create_global_var to static.create_global_var (#48777)

* Modified the Kernel policy. When the compute is NHWC (#48563)

* temporally disable set_value (#48942)

* xpu support inplace flatten (#48909)

This is a PR to catch up with latest xpu white list strategy
(https://github.com/PaddlePaddle/Paddle/pull/48606)
, since original list only include 'fluid' fashion names, but new list
must include 'phi' fashion as well.
Refer to paddle/phi/core/kernel_factory.cc for more details.

* fix:vit_attention ut (#48884)

* mv fused_bias_dropout_residual_ln to fluid manual dir (#48824)

* mv fused_bias_dropout_residual_ln to fluid manual dir

* rm useless comments

* bug fix (#48829)

* move ops_extra_info_gen.py from phi to fluid (#48926)

* fix scale type in alpha and beta (#48887)

* [inference][trt] upgrade prelu op  (#48528)

* add prelu

* 对多个文档按照要求修改 对应中文的#5453 (#48886)

* fix doc

* test=document_fix
Co-authored-by: NLigoml <39876205+Ligoml@users.noreply.github.com>

* replace cross_entropy in python/paddle/fluid/tests/unittests/*.py except test*.py (#48919)

* [remove fluid] Remove fluid APIs (#48641)

* [CodeStyle] fix renamed files not being monitored by Codestyle Check (#48892)

* [fluid remove]: remove paddle.fluid.layers.box_coder and paddle.fluid.layers.polygon_box_transform (#48896)

* remove fluid_box_coder and polygon_box_transform

* code check

* [Custom XPU Support] Custom extension support xpu backend (#48733)

* support custom_xpu

* update cmake to test xpu

* support custom_xpu, verify mechanism

* fix test_custom_relu_op_xpu_setup.py, test=kunlun

* fix FLAGS_init_allocated_mem

* cancel TIMEOUT property

* reset FLAGS_init_allocated_mem property

* rm mlu ops eager guard tests (#48769)

* rm npu instance_np op for eager guard tests (#48785)

* remove xpu eager guard tests (#48786)

* [remove fluid.layers.cross_entropy] remove unit tests (part 3)  (#48918)

* replace cross_entropy in python/paddle/fluid/tests/unittests/test_[o-z]*.py plus test_dist_transpiler.py

* fix test_prune

* [Inference] optimize some code and fix some bug (#48780)

* clean ir_pass_manager and fix map_depthwise_conv_to_conv_pass

* fix unitest timeout

* [PHI] Migrate reshape kernel (#48749)

* reshape

* typo

* remove header

* support py3 in setup.py (#48905)

* support py3 in setup.py

* support setup.py bdist_wheel in py3

* support py3 in setup.py

* modify run_setup

* [Paddle-TRT] add cast between  int64 tensor  and Paddle-TRT (#45547)

* Add cast between int64 tensor and Paddle-TRT
* Add Unit testing.

* fix sharding_stage1 amp O2 decorate bug (#48960)

* [remove fluid] fluid dygraph Embedding (#48806)

* [remove fluid] fluid dygraph Embedding

* [remove fluid] fluid dygraph Embedding

* [remove fluid] fluid dygraph Embedding

* [remove fluid] fluid dygraph Embedding

* [remove fluid] fluid dygraph Embedding

* [remove fluid] fluid dygraph Embedding

* fix for mkldnn (#48852)

* H2D data transfer optimization with usage of structure type for stack kernel (#48899)

* first commit.

* refine performance with fast_divmod

* refine performance with fast_divmod

* rm accuracy and auc in extra __all__ (#48986)

* Add dynamic checks for collective communication on NCCL  (#48915)

* chore: unify `SingleTensor`

* feat: dynamic check

* support sharding in fp16 on xpu,  (#48897)

* support sharding in fp16 on xpu, change reduce_max to reduce_sum for found nan or inf

* update

* Support cross-step stream synchronization for standalone executor (#48809)

* Add UT

* Support cross-step stream synchronization for standalone executor

* Fix typos

* Fix typos

* Update UTs

* Generate static graph code of some ops by yaml (#48771)

* generate static graph code of some ops by yaml, test = develop

* fix 'take_along_axis' yaml style

* reset scatter/scatter_nd_add

* delete the comments of put_along_axis

* fix a bug in GetTrtWeight (#48993)

* add static_ops.yaml for static op (#48991)

* [PHI decoupling] move norm_utils.cu.h from fluid to phi and remove norm_utils.h in fluid (#48930)

* move norm_utils.cu.h from fluid to phi

* remove norm_utils.h in fluid

* fix bugs and replace mutable_data with Alloc

* replace mutable_data with Alloc

* forbid conv op whose weight is not a persistable weight into Paddle-TRT (#48763)

* fix: Move the pass location to the appropriate location (#48951)

* Enhance check_nan_inf implementation for CPU. (#48591)

* Enable to print device info.

* Enhance the nan and inf checking for cpu.

* Implement a common print function.

* Unify the check of complex numbers.

* Rewrite the omp method.

* Count and print the number of nan and inf.

* Change the print content.

* Add unittest.

* [PHI] OneDNN version of Copy (#48539)

* OneDNN version of Copy, tranpose kernels adjusted

* style fixes in tranpose_grad

* redundant headers deleted

* fix: there are some bugs with trt 8.0 (#48921)

* fix: there are some bugs with trt 8.0

* fix:windows CI trt is too old

* Optimization of Eigh op with ssyevj_batched runtime api (#48560)

* fix codestyle

* add double complex<float> complex<double> dtype support for syevj_batched

* fix use_syevj flag for precision loss when input dtype of syevj_batch is complex128 in some case

* optimize eigh in different case

* fix missing ; bug

* fix use_syevj bug

* fix use_cusolver_syevj_batched flag

* replace cross_entropy in python/paddle/fluid/tests/unittests/*/*.py except unittests/*.py (#48920)

* [PHI decoupling] replace dependency of inclusive_scan.h from phi (#48980)

* replace dependency of inclusive_scan.h from phi

* format code

* fluid API magration : Assert, increment, cond (#48885)

* [Clean fluid] Add inner function _elementwise_op_with_axis (#48748)

* add inner function _elementwise_op_with_axis

* fix transformer_model

* polish API code

* remove elementwise_div/mul api

* delete API in __all__

* delete elementwise_mul completely

* polish elementwise_mul call

* polish internal api

* resolve conflict, fix rnn.py

* use non-inplace call

* delete elementwise_mul api test

* delete elementwise_mul api test

* clean elementwise_add/sub

* restore _elementwise_op_in_dygraph in nn.py

* test_convert_to_mixed_precision.py use tempfile for temporary models/params (#48819)

* Tighten the Interception strategy (#48947)

* test approve ,test=document_fix

* test approve ,test=document_fix

* test approve ,test=document_fix

* [CodeStyle][isort][F401] fix some regression issues (#48936)

* [CodeStyle][isort][F401] fix some regression issues

* add import paddle to fix eval call

* rm multinode eager guard tests (#48766)

* rm multinode eager guard tests

* remove unwanted tests

* reset process_mpi test

* rm unittests eager guard tests part5 dataloader2dygraph_mnist (#48816)

* [PHI]Add new Tensor type and migrate save_combine kernel (#47856)

* add new tensor

* fix windows compile bugs

* fix ci bugs

* fix ci bugs

* fix ci bugs

* perfect according comment

* fix ci compile bugs

* add raw tensor

* fix ci bugs

* modify code by comment

* delete String

* [Fluid Clean]move BatchNorm from flud.dygraph.nn to paddle.nn.layer.norm (#48734)

* move BatchNorm from flud.dygraph.nn to paddle.nn.layer.norm

* modfiy conflict

* modify pre-commit error

* modify static-check ci error

* fix failed tests

* modify conflict

* modify conflict

* delete import modelu GRUUnit

* fix falied test

* fix failed testes

* fix failed tests

* fix failed tests

* fix failed test

* fix error in test_fused_resenet_basic_block_op_xpu.py

* modify after xiaoguang reviewed

* [Setup] Ignore @PADDLE_BINARY_DIR@ files (#49002)

* [Setup] Ignore @PADDLE_BINARY_DIR@ files

* test=document_fix

* reshape onednn test reimplemented (#48850)

* - UT reshape onednn

- Fix

test

test2

- test4

- test5

- test6

test7

- test8

- Ut reinvented

- cosmetic

* - fix

* - fix

* - fix

* - fix

* - Fix

* - fix

* - fix

* - fix

* - Fix

* lint

* update fused_multi_transformer_encoder_pass support GPT new matmul API (#48953)

* fit paddle.matmul in fleetx.gpt

* Revert "set free_when_no_cache_hit default value to true (#48815)" (#48968)

This reverts commit 592ed40b.

* [Paddle Inference]fix some transformer unitest (#48929)

* fix some transformer unitest

* Enable Generic-Plugin support FP16 (#48807)

* support conv1d quant & skip calibrate zero-size tensor (#48912)

* enable custom device save model on device memory && fix conflict (#48221)

* [api move] cvm (#48989)

* [api move] cvm

* [api move] cvm

* [api move] cvm

* [api move] cvm

* [api move] cvm

* [api move] cvm

* [api move] cvm

* [api move] ci test

* [api move] ci test

* [api move] ci test

* Bugfix: xpu now only support single node multi-card, bkcl_comm_num should always set to 1 (#48961)

* rm unittests eager guard tests part23 where2zeros (#48895)

* rm unittests eager guard tests part17 number2pool1d (#48840)

* [NPU] fix FLAGS_npu_storage_format flag in python, test=develop (#48976)

* remove fleet eager guard tests (#48765)

* rm unittests eager guard tests part6 eager_run2expand_v2 (#48817)

* rm unittests eager guard tests part12 imperative_optimizer2resnet (#48833)

* [fluid clean] remove 4 fluid.layers api and imigrate 2 fluid.layer api (#48972)

* fluid clean layer

* docs

* remove reset reference in unittest for `fluid.layers.cross_entropy` (#49012)

* replace cross_entropy in test*.py except python/paddle/fluid/tests/unittests/*.py (#48978)

* remove linear_chain_crf and crf_decoding from fluid (#48996)

* remove linear_chain_crf and crf_decoding

* Generate static graph code of some ops by yaml (#48977)

* generate static graph code of some ops by yaml

* fix the code-style of yaml

* fix the framework_ci for triangular_solve

* change the 'data_type' of scatter

* add the 'out: Out' of scatter_nd_add

* [tools] Update summary env (#48627)

* [tools] remove deprecated api , fix macOS get version error

* [tools] Rename the value that returns null

* [tools] add gcc, clang, cmak, libc version

* [tools] fix cudnn read error

* [tools] add gpu devices list, drive based

* [issue] update 3_build-installation-issue.yml

* [tools] fix get gpu list AttributeError

* [Dy2St] transforms.RandomVerticalFlip Support static mode (#49024)

* add static RandomVerticalFlip

* object => unittest.TestCase

* Save fused_attention op memory when dropout_rate = 0.0 (#48902)

* save fused_attention memory when dropout_rate = 0.0

* add ut

* fix ut bug

* fix fused_layernorm_residual_dropout_bias_test.cu

* Correct multiple inputs and outputs (#48872)

* [CodeStyle][isort][Dy2St] sort imports for paddle.jit (#48637)

* isort jit

* refine comment

* remove non-public apis from __all__ (#48952)

* remove non-public apis from __all__

* fix code style

* fix rmsprop_ yaml bug (#49026)

* fix rmsprop_ yaml bug

* Fixed the dead link bug in the API documentation (#48969)

* first pr

* Revise nn.py

* Revise nn.py 2.0

* Revise rnn.py;test=document_fix

* test=document_fix
Co-authored-by: NLigoml <39876205+Ligoml@users.noreply.github.com>

* Change mutable_data to ctx.Alloc. (#49001)

* [inference][trt] add more unary op and square (#48534)

* add more unary op and square

* Support ninja (#48932)

* move inplace_apis_indygraph_only from paddle.flud.dygraph.inplace_utils to paddle.utils

* modify conflict

* modify conflict

* modify conflict

* modify conflict

* modify conflict

* modify conflict

* modify conflict

* modify static-check ci error

* fix conflict

* modify failed tests

* fix conflict

* fix conflict

* fix pool2d examples

* modify conflict

* fix failed tests

* fix conflict

* fix failed tests

* modfiy problem of deleting pool2d

* support Ninja in setup.py

* support different cmake_generators

* modify after reviewed

* delete unused denotes

* Deleted mkldnn_inplace_pass code (#47818)

* Deleted mkldnn_inplace_pass code

* Fixed error with cmake

* Resolve conflicts

* hide log (#49045)

* test=doucment_fix

* test=document_fix

* [Sparse]Optimize performance of sparse conv on T4 (#49009)

* modify cmake file for cuda11.8 compile (#49020)

* modify cmake file for cuda11.8 compile

* add op_library(fused_embedding_eltwise_layernorm_op DEPS bert_encoder_functor)

* remove dropout from fluid (#48319)

* remove dropout

* nullptr bugfix for XPU pg mode (#49043)

* nullptr bugfix for XPU pg mode

Also a few kernels is added to xpu whitelist

* increase error msg length

* Divide elementwise case from BroadcastKernel and refine transpose autotune (#33051)

* First Commit.

* add some codes

* add elementwise loader

* fix code styles

* merge with develop

* add some changes both in elementwise and transpose

* add init operation in broadcast kernel.

* change codes according to pr suggestions about transpose file

* fix error for op-benchmark ci

* fix according to ci

* add condition of skipif (#48791)

* add condition of skipif

* fix code format error

* Update test_fused_gate_attention_op.py

update

* rm unittests eager guard tests part9 histogram2imperative_dataloader (#48825)

* rm unittests eager guard tests part9 histogram2imperative_dataloader

* rm basic

* rm unittests eager guard test part14 initializer2layer_norm (#48835)

* rm unittests eager guard test part14 initializer2layer_norm

* monior change

* [Bugfix] recompute dep filter param (#49010)

* recompute dep filter param

* recompute dep for reshard

* [Paddle Inference] rewrite convert_to_mixed_precision (#48853)

* [CodeStyle] fix c++17-extensions warning on macos (#49017)

* fix c++17-extensions warning on macos

* fix type

fix c++17-extensions warning on macos

fix c++17-extensions warning on macos

* Add custom CUDNN finding paths for 64bit Windows (#49066)

* remove prior_box (#49006)

* remove prior_box

* modify the sequence of paras of prior_box in multi_box_head api

* InstanceNorm1D、InstanceNorm2D、InstanceNorm3D (#48940)

* modified:   python/paddle/nn/layer/norm.py

* modified:   python/paddle/nn/layer/norm.py

* modified:   python/paddle/nn/layer/norm.py

* modified:   python/paddle/nn/layer/norm.py

* modified:   python/paddle/nn/layer/norm.py

* modified:   python/paddle/nn/layer/norm.py

* test=docs_preview

* InstanceNorm2D中文档格式修改

* test=docs_preview

* modified:   python/paddle/nn/functional/loss.py
	modified:   python/paddle/nn/functional/norm.py
	modified:   python/paddle/nn/layer/loss.py
	modified:   python/paddle/nn/layer/norm.py

* test=docs_preview

* test=docs_preview

* [AutoParallel] recompute tuning (#48608)

* [AutoParallel] recompute tuning

* fix conflict

* update comment

* bug fix

* update rc algo

* tiny fix

* fix clear process_group

* remove comment

* update segment print

* fix import OpRole

* adapt amp pass and grad_clip pass for opt_tuner

* update tuning config

* fix import

* annotate recompute info on ops and upgrade recompute pass

* add op_namescope for seed op

* record reserved vars

* fix recompute var's dist_attr

* fix strategy unittest

* adapt for fp16

* update unittest

* revert copy opt

* update unittest

* rename set_recompute_segments

* fix unittest

* fluid API magration : array_read, array_write (#49022)

* del array_write & array_read

* fix import err

* fix import err

* fix example codes

* Keep double-buffer reader for static mode  (#49068)

* Fix nullptr to TestFuseGemmEpilogueReluBWDFP* (#48997)

* support fp16 index sample (#47897)

* add index sample fp16 support

* remove fluid APIs in distributed_strategy.py and role_maker.py

* Revert "remove fluid APIs in distributed_strategy.py and role_maker.py"

This reverts commit 223bbee990d3bf69e252fc3c0f19e3873550a264.

* fix instantiated more than once

* clean codes

* rm unittest eager guard tests part20 sparse_mv2split (#48879)

* rm unittests eager guard tests part11 imperative_layer2ocr (#48828)

* rm unittests eager guard tests part11 imperative_layer2ocr

* review

* rm eager guard tests part3_1 (#49059)

* fix: gloo compatible (#49084)

* rm eager guard tests part3_3 (#49061)

* fix bug (#49081)

* [Inference] memory_optimize and mkdlnn  problem (#49054)

* memory_optimize and mkdlnn problem

* update

* update

* update

* Remove/move 16 fluid APIs (#48377)

* remove density_prior_box

* remove anchor_generator

* remove roi_perspective_transform

* remove generate_proposal_labels

* remove generate_mask_labels

* remove generate_proposals

* remove box_clip

* remove retinanet_detection_output

* remove multiclass_nms

* remove locality_aware_nms

* remove matrix_nms

* remove distribute_fpn_proposals

* remove box_decoder_and_assign

* remove collect_fpn_proposals

* remove 2 trt files

* move prior_box to static/nn/common.py

* move multi_box_head to static/nn/common.py

* fix for CI/CE

* remove retinanet_detection_output

* restore compile_vs_runtime_white_list.py

* restore test_retinanet_detection_output to white list

* replace nn.flatten by paddle.flatten, and fix doc for retinanet_target_assign

* add enable_static in demo and fix bug

* remove roi_perspective_transform in test_layers

* remove multi_box_head

* change self.multiclass_nms to _legacy_C_ops.multiclass_nms

* empty commit

* empty commit

* check code style

* fix prior_box

* fix CI

* remove redundant prior_box in detection.py

* fix docs

* remove detection

* fix prior_box en doc

* delete prior_box in common

* remote proir_box from __init__.py

* fix embedding multihead (#49085)

* SetDeviceId in StreamSafeCUDAAllocation (#49080)

* [PHI decoupling] Remove fluid imports from MKLDNN code (#48981)

* fix wrong handler name

* mkldnn_engine -> onednn_engine

* remove fluid/errors.h imports

* remove fluid/enforce.h imports

* remove note and unnecessary import

* remove fluid/pretty_log.h imports

* remove fluid/place.h imports

* remove fluid/data_layout_transform.h imports

* remove fluid/device_context.h imports

* remove mkldnn_helper code

* remove fluid/mkldnn_reuse.h imports

* pretty_log import

* replace cross_entropy in python/paddle/fluid/tests/unittests/*.py (#48975)

* 修复paddle.amp.decorate等API的文档 (#48983)

* 涉及到的api有
paddle.amp.decorate
paddle.static.npu_places
paddle.signal.istft
paddle.signal.stft
paddle.linalg.eigvalsh
paddle.randint_like

* change signal.stft

* randint_like的low增加optional

* ; test=docs_preview

* 修改了注解格式; test=docs_preview

* 修改了公式格式

* 修改了decorate的models等

* test=document_fix
Co-authored-by: NLigoml <39876205+Ligoml@users.noreply.github.com>

* 按在线文档需求 61~70 更新了部分文档 (#49014)

* Update docstring:
1. 去除 python/paddle/tensor/manipulation.py 中 cast 函数描述中的 This OP;
2. 调整 python/paddle/fluid/layers/control_flow.py 中 Print 函数中参数描述的顺序,添加 optional 描述;
3. 为 python/paddle/tensor/logic.py 中 logical_and 函数添加 optional 描述;
4. 为 python/paddle/fluid/reader.py 中 DataLoader 类中 from_generator、from_dataset 函数添加 optional 描述;
5. 在 python/paddle/fluid/layers/nn.py 中 crf_decoding 函数的 param_attr 在使用中确实可视为存在默认值 None,故添加 optional 描述;
6. 修复 python/paddle/static/nn/common.py 中 data_norm 函数描述里 tex 语法错误的问题,并一并修复同一文件中的相同问题。

* 根据 review 意见修改部分内容。

* 将谓语动词去掉第三人称单数形式。

* 同步中文文档变更。

* string-->str; test=document_fix
Co-authored-by: NLigoml <39876205+Ligoml@users.noreply.github.com>

* merge gpugraph to develop, fix gloo wrapper

* merge gpugraph to develop, fix ci

* merge gpugraph to develop, fix gloo wrapper

* merge gpugraph to develop, fix ci

* merge gpugraph to develop, fix fleet.py

* merge gpugraph to develop, fix merge error

* merge gpugraph to develop, fix merge error

* merge gpugraph to develop, add python ut

* merge gpugraph to develop, fix code style

* merge gpugraph to develop, add c++ ut

* merge gpugraph to develop, fix code style

* merge gpugraph to develop, fix data_feed.h

* merge gpugraph to develop, fix code style

* merge gpugraph to develop, fix code style

* merge gpugraph to develop, fix code style

* merge gpugraph to develop, fix code style
Co-authored-by: Nwuhuachaocoding <77733235+wuhuachaocoding@users.noreply.github.com>
Co-authored-by: NNyakku Shigure <sigure.qaq@gmail.com>
Co-authored-by: Nzyfncg <zhangyunfei07@baidu.com>
Co-authored-by: Nxiongkun <xiongkun03@baidu.com>
Co-authored-by: Nceci3 <ceci3@users.noreply.github.com>
Co-authored-by: Nzhangyikun02 <48021248+zhangyk0314@users.noreply.github.com>
Co-authored-by: NWeilong Wu <veyron_wu@163.com>
Co-authored-by: N201716010711 <87008376+201716010711@users.noreply.github.com>
Co-authored-by: NQi Li <qili93@qq.com>
Co-authored-by: Nzhoutianzi666 <39978853+zhoutianzi666@users.noreply.github.com>
Co-authored-by: Nfeng_shuai <fengshuai03@baidu.com>
Co-authored-by: NQingshuChen <chenqingshu@baidu.com>
Co-authored-by: NWangZhen <23097963+0x45f@users.noreply.github.com>
Co-authored-by: N张春乔 <83450930+Liyulingyue@users.noreply.github.com>
Co-authored-by: N傅剑寒 <Xs1580802568@gmail.com>
Co-authored-by: Nxiaoguoguo626807 <100397923+xiaoguoguo626807@users.noreply.github.com>
Co-authored-by: Nwangzhen38 <41941775+wangzhen38@users.noreply.github.com>
Co-authored-by: zhouweiwei2014's avatarZhou Wei <1183042833@qq.com>
Co-authored-by: NKevin吴嘉文 <417333277@qq.com>
Co-authored-by: NZman <35071129+Atlantisming@users.noreply.github.com>
Co-authored-by: Nlzy <569782149@qq.com>
Co-authored-by: NVvsmile <450864116@qq.com>
Co-authored-by: NSławomir Siwek <slawomir.siwek@intel.com>
Co-authored-by: NRichardWooSJTU <37864677+RichardWooSJTU@users.noreply.github.com>
Co-authored-by: NLigoml <39876205+Ligoml@users.noreply.github.com>
Co-authored-by: Nhjyp <53164956+Tomoko-hjf@users.noreply.github.com>
Co-authored-by: NVigi Zhang <VigiZhang@users.noreply.github.com>
Co-authored-by: Nzqw_1997 <118182234+zhengqiwen1997@users.noreply.github.com>
Co-authored-by: NYiqun Liu <Xreki@users.noreply.github.com>
Co-authored-by: Nrisemeup1 <62429225+risemeup1@users.noreply.github.com>
Co-authored-by: NGuanghua Yu <742925032@qq.com>
Co-authored-by: N姜永久 <34344716+yjjiang11@users.noreply.github.com>
Co-authored-by: Nwanghuancoder <wanghuan29@baidu.com>
Co-authored-by: NRoc <30228238+sljlp@users.noreply.github.com>
Co-authored-by: NWilber <jiweibo@baidu.com>
Co-authored-by: NGhost Screaming <mofengshenjieII@163.com>
Co-authored-by: NNetpunk <69072522+Patrick-Star125@users.noreply.github.com>
Co-authored-by: N六个骨头 <46243324+zrr1999@users.noreply.github.com>
Co-authored-by: NAurelius84 <zhangliujie@baidu.com>
Co-authored-by: NRuibiao Chen <chenruibiao@baidu.com>
Co-authored-by: Nliu zhengxi <380185688@qq.com>
Co-authored-by: Nheyanru <81976792+heyanru01@users.noreply.github.com>
Co-authored-by: Ntianshuo78520a <707759223@qq.com>
Co-authored-by: Nhuangjiyi <43315610+huangjiyi@users.noreply.github.com>
Co-authored-by: NInfinity_lee <luhputu0815@gmail.com>
Co-authored-by: Nhouj04 <35131887+houj04@users.noreply.github.com>
Co-authored-by: Nlugimzzz <63761690+lugimzzz@users.noreply.github.com>
Co-authored-by: NWangzheee <634486483@qq.com>
Co-authored-by: Nsneaxiy <32832641+sneaxiy@users.noreply.github.com>
Co-authored-by: Nkangguangli <kangguangli@hotmail.com>
Co-authored-by: Njakpiase <jakpia21@gmail.com>
Co-authored-by: NHongyuJia <jiahongyu@baidu.com>
Co-authored-by: Nhaosicheng <47998305+HarperCy@users.noreply.github.com>
Co-authored-by: NChang Xu <molixu7@gmail.com>
Co-authored-by: NKai Song <50285351+USTCKAY@users.noreply.github.com>
Co-authored-by: Nlimingshu <61349199+JamesLim-sy@users.noreply.github.com>
Co-authored-by: NJianghai <72591262+CjhHa1@users.noreply.github.com>
Co-authored-by: Njiangcheng <thisjiang@qq.com>
Co-authored-by: Nccrrong <101700995+ccrrong@users.noreply.github.com>
Co-authored-by: NPuQing <me@puqing.work>
Co-authored-by: NLeo Chen <chenqiuliang@baidu.com>
Co-authored-by: Ncyber-pioneer <116002591+cyber-pioneer@users.noreply.github.com>
Co-authored-by: Nniuliling123 <51102941+niuliling123@users.noreply.github.com>
Co-authored-by: Njames <zhangxiaoci@baidu.com>
Co-authored-by: Nwenbin <wang3323032@qq.com>
Co-authored-by: MarDino's avatarZZK <359521840@qq.com>
Co-authored-by: NZhang Jun <ewalker@live.cn>
Co-authored-by: Nyjphhw <43883055+yjphhw@users.noreply.github.com>
Co-authored-by: NYuanle Liu <yuanlehome@163.com>
Co-authored-by: NWen Sun <35923278+HermitSun@users.noreply.github.com>
Co-authored-by: HappyHeavyRain's avatarlzydev <1528794076@qq.com>
Co-authored-by: NPaulina Gacek <paulina.gacek@intel.com>
Co-authored-by: Nfeifei-111 <2364819892@qq.com>
Co-authored-by: NYuanRisheng <yuanrisheng@baidu.com>
Co-authored-by: NJacek Czaja <jacek.czaja@intel.com>
Co-authored-by: Nweishengying <63448337+weishengying@users.noreply.github.com>
Co-authored-by: Nengineer1109 <jialiang.wang@xdxct.com>
Co-authored-by: Ngouzil <66515297+gouzil@users.noreply.github.com>
Co-authored-by: NRyan <44900829+DrRyanHuang@users.noreply.github.com>
Co-authored-by: Njoanna.wozna.intel <joanna.wozna@intel.com>
Co-authored-by: NJYChen <zoooo0820@qq.com>
Co-authored-by: Njjyaoao <88936287+jjyaoao@users.noreply.github.com>
Co-authored-by: NHulek <jakub.hulek@intel.com>
Co-authored-by: Nzhangkaihuo <zhangkaihuo@baidu.com>
Co-authored-by: NYUNSHEN XIE <1084314248@qq.com>
Co-authored-by: NJZ-LIANG <jianzhongliang10@gmail.com>
Co-authored-by: NTinson Lai <laitingsheng@hotmail.com>
Co-authored-by: NAyuan <79981115+Ayuan2021@users.noreply.github.com>
Co-authored-by: Nzhaoyingli <86812880+zhaoyinglia@users.noreply.github.com>
Co-authored-by: NMing-Xu Huang <mingh@nvidia.com>
Co-authored-by: Nwangxiaoning <71813629+wangxn12138@users.noreply.github.com>
Co-authored-by: NHaohongxiang <86215757+haohongxiang@users.noreply.github.com>
Co-authored-by: NHydrogenSulfate <490868991@qq.com>
Co-authored-by: Nmjxs <52824616+kk-2000@users.noreply.github.com>
Co-authored-by: 学渣戊's avatar学渣戊 <x19403@163.com>
上级 19303281
include(ExternalProject)
set(JEMALLOC_PROJECT "extern_jemalloc")
set(JEMALLOC_URL
https://github.com/jemalloc/jemalloc/releases/download/5.1.0/jemalloc-5.1.0.tar.bz2
)
set(JEMALLOC_BUILD ${THIRD_PARTY_PATH}/jemalloc/src/extern_jemalloc)
set(JEMALLOC_SOURCE_DIR "${THIRD_PARTY_PATH}/jemalloc")
set(JEMALLOC_INSTALL ${THIRD_PARTY_PATH}/install/jemalloc)
set(JEMALLOC_INCLUDE_DIR ${JEMALLOC_INSTALL}/include)
set(JEMALLOC_DOWNLOAD_DIR "${JEMALLOC_SOURCE_DIR}/src/${JEMALLOC_PROJECT}")
set(JEMALLOC_STATIC_LIBRARIES
${THIRD_PARTY_PATH}/install/jemalloc/lib/libjemalloc_pic.a)
set(JEMALLOC_LIBRARIES
${THIRD_PARTY_PATH}/install/jemalloc/lib/libjemalloc_pic.a)
ExternalProject_Add(
extern_jemalloc
PREFIX ${JEMALLOC_SOURCE_DIR}
URL ${JEMALLOC_URL}
INSTALL_DIR ${JEMALLOC_INSTALL}
DOWNLOAD_DIR "${JEMALLOC_DOWNLOAD_DIR}"
BUILD_COMMAND $(MAKE)
BUILD_IN_SOURCE 1
INSTALL_COMMAND $(MAKE) install
CONFIGURE_COMMAND "${JEMALLOC_DOWNLOAD_DIR}/configure"
--prefix=${JEMALLOC_INSTALL} --disable-initial-exec-tls)
add_library(jemalloc STATIC IMPORTED GLOBAL)
set_property(TARGET jemalloc PROPERTY IMPORTED_LOCATION
${JEMALLOC_STATIC_LIBRARIES})
include_directories(${JEMALLOC_INCLUDE_DIR})
add_dependencies(jemalloc extern_jemalloc)
......@@ -14,6 +14,13 @@
include(ExternalProject)
# find_package(jemalloc REQUIRED)
set(JEMALLOC_INCLUDE_DIR ${THIRD_PARTY_PATH}/install/jemalloc/include)
set(JEMALLOC_LIBRARIES
${THIRD_PARTY_PATH}/install/jemalloc/lib/libjemalloc_pic.a)
message(STATUS "rocksdb jemalloc:" ${JEMALLOC_LIBRARIES})
set(ROCKSDB_PREFIX_DIR ${THIRD_PARTY_PATH}/rocksdb)
set(ROCKSDB_INSTALL_DIR ${THIRD_PARTY_PATH}/install/rocksdb)
set(ROCKSDB_INCLUDE_DIR
......@@ -22,21 +29,39 @@ set(ROCKSDB_INCLUDE_DIR
set(ROCKSDB_LIBRARIES
"${ROCKSDB_INSTALL_DIR}/lib/librocksdb.a"
CACHE FILEPATH "rocksdb library." FORCE)
set(ROCKSDB_CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
set(ROCKSDB_COMMON_FLAGS
"-g -pipe -O2 -W -Wall -Wno-unused-parameter -fPIC -fno-builtin-memcmp -fno-omit-frame-pointer"
)
set(ROCKSDB_FLAGS
"-DNDEBUG -DROCKSDB_JEMALLOC -DJEMALLOC_NO_DEMANGLE -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DOS_LINUX -DROCKSDB_FALLOCATE_PRESENT -DHAVE_SSE42 -DHAVE_PCLMUL -DZLIB -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_PTHREAD_ADAPTIVE_MUTEX -DROCKSDB_BACKTRACE -DROCKSDB_SUPPORT_THREAD_LOCAL -DROCKSDB_USE_RTTI -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_AUXV_GETAUXVAL_PRESENT"
)
set(ROCKSDB_CMAKE_CXX_FLAGS
"${ROCKSDB_COMMON_FLAGS} -DROCKSDB_LIBAIO_PRESENT -msse -msse4.2 -mpclmul ${ROCKSDB_FLAGS} -fPIC -I${JEMALLOC_INCLUDE_DIR} -lz -ldl"
)
set(ROCKSDB_CMAKE_C_FLAGS
"${ROCKSDB_COMMON_FLAGS} ${ROCKSDB_FLAGS} -DROCKSDB_LIBAIO_PRESENT -fPIC -I${JEMALLOC_INCLUDE_DIR}"
)
include_directories(${ROCKSDB_INCLUDE_DIR})
set(CMAKE_CXX_LINK_EXECUTABLE
"${CMAKE_CXX_LINK_EXECUTABLE} -pthread -ldl -lrt -lz")
ExternalProject_Add(
extern_rocksdb
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${ROCKSDB_PREFIX_DIR}
GIT_REPOSITORY "https://github.com/facebook/rocksdb"
GIT_TAG v6.10.1
GIT_REPOSITORY "https://github.com/Thunderbrook/rocksdb"
GIT_TAG 6.19.fb
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
-DWITH_BZ2=OFF
-DPORTABLE=1
-DWITH_GFLAGS=OFF
-DWITH_TESTS=OFF
-DWITH_JEMALLOC=ON
-DWITH_BENCHMARK_TOOLS=OFF
-DJeMalloc_LIBRARIES=${JEMALLOC_LIBRARIES}
-DJeMalloc_INCLUDE_DIRS=${JEMALLOC_INCLUDE_DIR}
-DCMAKE_CXX_FLAGS=${ROCKSDB_CMAKE_CXX_FLAGS}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
INSTALL_COMMAND
......
......@@ -423,6 +423,9 @@ if(WITH_PSCORE)
include(external/rocksdb) # download, build, install rocksdb
list(APPEND third_party_deps extern_rocksdb)
include(external/jemalloc) # download, build, install jemalloc
list(APPEND third_party_deps extern_jemalloc)
endif()
if(WITH_RPC
......
......@@ -66,6 +66,10 @@ class FsReadChannel {
return 0;
}
inline int read(char* data, size_t size) {
return fread(data, 1, size, _file.get());
}
private:
uint32_t _buffer_size;
FsChannelConfig _config;
......@@ -114,6 +118,14 @@ class FsWriteChannel {
return write_line(data.c_str(), data.size());
}
inline uint32_t write(const char* data, size_t size) {
size_t write_count = fwrite(data, 1, size, _file.get());
if (write_count != size) {
return -1;
}
return 0;
}
private:
uint32_t _buffer_size;
FsChannelConfig _config;
......
......@@ -148,10 +148,12 @@ class PSClient {
return fut;
}
virtual ::std::future<int32_t> PullSparsePtr(char **select_values,
virtual ::std::future<int32_t> PullSparsePtr(int shard_id,
char **select_values,
size_t table_id,
const uint64_t *keys,
size_t num) {
size_t num,
uint16_t pass_id) {
VLOG(0) << "Did not implement";
std::promise<int32_t> promise;
std::future<int> fut = promise.get_future();
......@@ -160,6 +162,15 @@ class PSClient {
}
virtual std::future<int32_t> PrintTableStat(uint32_t table_id) = 0;
virtual std::future<int32_t> SaveCacheTable(uint32_t table_id,
uint16_t pass_id,
size_t threshold) {
VLOG(0) << "Did not implement";
std::promise<int32_t> promise;
std::future<int> fut = promise.get_future();
promise.set_value(-1);
return fut;
}
// 确保所有积攒中的请求都发起发送
virtual std::future<int32_t> Flush() = 0;
......
......@@ -260,10 +260,12 @@ int32_t PsLocalClient::Initialize() {
// return done();
//}
::std::future<int32_t> PsLocalClient::PullSparsePtr(char** select_values,
::std::future<int32_t> PsLocalClient::PullSparsePtr(int shard_id,
char** select_values,
size_t table_id,
const uint64_t* keys,
size_t num) {
size_t num,
uint16_t pass_id) {
// FIXME
// auto timer =
// std::make_shared<CostTimer>("pslib_downpour_client_pull_sparse");
......@@ -278,6 +280,8 @@ int32_t PsLocalClient::Initialize() {
table_context.pull_context.ptr_values = select_values;
table_context.use_ptr = true;
table_context.num = num;
table_context.shard_id = shard_id;
table_context.pass_id = pass_id;
// table_ptr->PullSparsePtr(select_values, keys, num);
table_ptr->Pull(table_context);
......@@ -285,6 +289,28 @@ int32_t PsLocalClient::Initialize() {
return done();
}
::std::future<int32_t> PsLocalClient::PrintTableStat(uint32_t table_id) {
auto* table_ptr = GetTable(table_id);
std::pair<int64_t, int64_t> ret = table_ptr->PrintTableStat();
VLOG(0) << "table id: " << table_id << ", feasign size: " << ret.first
<< ", mf size: " << ret.second;
return done();
}
::std::future<int32_t> PsLocalClient::SaveCacheTable(uint32_t table_id,
uint16_t pass_id,
size_t threshold) {
auto* table_ptr = GetTable(table_id);
std::pair<int64_t, int64_t> ret = table_ptr->PrintTableStat();
VLOG(0) << "table id: " << table_id << ", feasign size: " << ret.first
<< ", mf size: " << ret.second;
if (ret.first > (int64_t)threshold) {
VLOG(0) << "run cache table";
table_ptr->CacheTable(pass_id);
}
return done();
}
::std::future<int32_t> PsLocalClient::PushSparseRawGradient(
size_t table_id,
const uint64_t* keys,
......
......@@ -76,18 +76,19 @@ class PsLocalClient : public PSClient {
return fut;
}
virtual ::std::future<int32_t> PullSparsePtr(char** select_values,
virtual ::std::future<int32_t> PullSparsePtr(int shard_id,
char** select_values,
size_t table_id,
const uint64_t* keys,
size_t num);
size_t num,
uint16_t pass_id);
virtual ::std::future<int32_t> PrintTableStat(uint32_t table_id) {
std::promise<int32_t> prom;
std::future<int32_t> fut = prom.get_future();
prom.set_value(0);
virtual ::std::future<int32_t> PrintTableStat(uint32_t table_id);
virtual ::std::future<int32_t> SaveCacheTable(uint32_t table_id,
uint16_t pass_id,
size_t threshold);
return fut;
}
virtual ::std::future<int32_t> PushSparse(size_t table_id,
const uint64_t* keys,
const float** update_values,
......
......@@ -53,16 +53,6 @@ cc_library(
set_source_files_properties(
tensor_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(
tensor_table
SRCS
DEPS eigen3
ps_framework_proto
executor
scope
device_context
tensor
${TABLE_DEPS})
set_source_files_properties(table.cc PROPERTIES COMPILE_FLAGS
${DISTRIBUTE_COMPILE_FLAGS})
......@@ -98,7 +88,6 @@ cc_library(
table.cc
DEPS ${TABLE_DEPS}
common_table
tensor_table
ps_framework_proto
string_helper
device_context
......
......@@ -121,6 +121,9 @@ class ValueAccessor {
virtual void UpdateStatAfterSave(float* value, int param) {}
// 判断该value是否保存到ssd
virtual bool SaveSSD(float* value) = 0;
// 判断热启时是否过滤slot对应的feasign
virtual bool FilterSlot(float* value) { return false; }
//
virtual bool SaveCache(float* value,
int param,
......@@ -162,9 +165,18 @@ class ValueAccessor {
return 0;
}
virtual bool SaveMemCache(float* value,
int param,
double global_cache_threshold,
uint16_t pass_id) {
return true;
}
virtual void UpdatePassId(float* value, uint16_t pass_id) {}
virtual float GetField(float* value, const std::string& name) { return 0.0; }
#define DEFINE_GET_INDEX(class, field) \
virtual int get_##field##_index() override { return class ::field##_index(); }
virtual int get_##field##_index() { return class ::field##_index(); }
protected:
size_t _value_size;
......
......@@ -60,7 +60,7 @@ class GraphShard {
std::vector<Node *> get_batch(int start, int end, int step);
void get_ids_by_range(int start, int end, std::vector<uint64_t> *res) {
res->reserve(res->size() + end - start);
for (int i = start; i < end && i < (int)bucket.size(); i++) {
for (int i = start; i < end && i < static_cast<int>(bucket.size()); i++) {
res->emplace_back(bucket[i]->get_id());
}
}
......@@ -93,7 +93,7 @@ class GraphShard {
size_t get_all_feature_ids(std::vector<std::vector<uint64_t>> *total_res,
int slice_num) {
std::vector<uint64_t> keys;
for (int i = 0; i < (int)bucket.size(); i++) {
for (size_t i = 0; i < bucket.size(); i++) {
bucket[i]->get_feature_ids(&keys);
}
return dedup2shard_keys(&keys, total_res, slice_num);
......@@ -130,7 +130,29 @@ class GraphShard {
return node_location;
}
private:
void shrink_to_fit() {
bucket.shrink_to_fit();
for (size_t i = 0; i < bucket.size(); i++) {
bucket[i]->shrink_to_fit();
}
}
void merge_shard(GraphShard *&shard) { // NOLINT
bucket.reserve(bucket.size() + shard->bucket.size());
for (size_t i = 0; i < shard->bucket.size(); i++) {
auto node_id = shard->bucket[i]->get_id();
if (node_location.find(node_id) == node_location.end()) {
node_location[node_id] = bucket.size();
bucket.push_back(shard->bucket[i]);
}
}
shard->node_location.clear();
shard->bucket.clear();
delete shard;
shard = NULL;
}
public:
std::unordered_map<uint64_t, int> node_location;
std::vector<Node *> bucket;
};
......@@ -161,7 +183,7 @@ class SampleResult {
public:
size_t actual_size;
std::shared_ptr<char> buffer;
SampleResult(size_t _actual_size, std::shared_ptr<char> &_buffer)
SampleResult(size_t _actual_size, std::shared_ptr<char> &_buffer) // NOLINT
: actual_size(_actual_size), buffer(_buffer) {}
SampleResult(size_t _actual_size, char *_buffer)
: actual_size(_actual_size),
......@@ -187,7 +209,7 @@ class ScaledLRU;
template <typename K, typename V>
class RandomSampleLRU {
public:
RandomSampleLRU(ScaledLRU<K, V> *_father) {
explicit RandomSampleLRU(ScaledLRU<K, V> *_father) {
father = _father;
remove_count = 0;
node_size = 0;
......@@ -204,7 +226,9 @@ class RandomSampleLRU {
node_head = p;
}
}
LRUResponse query(K *keys, size_t length, std::vector<std::pair<K, V>> &res) {
LRUResponse query(K *keys,
size_t length,
std::vector<std::pair<K, V>> &res) { // NOLINT
if (pthread_rwlock_tryrdlock(&father->rwlock) != 0)
return LRUResponse::blocked;
// pthread_rwlock_rdlock(&father->rwlock);
......@@ -271,7 +295,6 @@ class RandomSampleLRU {
remove(node_head);
remove_count--;
}
// std::cerr<<"after remove_count = "<<remove_count<<std::endl;
}
void move_to_tail(LRUNode<K, V> *node) {
......@@ -356,25 +379,25 @@ class ScaledLRU {
LRUResponse query(size_t index,
K *keys,
size_t length,
std::vector<std::pair<K, V>> &res) {
std::vector<std::pair<K, V>> &res) { // NOLINT
return lru_pool[index].query(keys, length, res);
}
LRUResponse insert(size_t index, K *keys, V *data, size_t length) {
return lru_pool[index].insert(keys, data, length);
}
int Shrink() {
int node_size = 0;
size_t node_size = 0;
for (size_t i = 0; i < lru_pool.size(); i++) {
node_size += lru_pool[i].node_size - lru_pool[i].remove_count;
}
if ((size_t)node_size <= size_t(1.1 * size_limit) + 1) return 0;
if (node_size <= static_cast<size_t>(1.1 * size_limit) + 1) return 0;
if (pthread_rwlock_wrlock(&rwlock) == 0) {
global_count = 0;
for (size_t i = 0; i < lru_pool.size(); i++) {
global_count += lru_pool[i].node_size - lru_pool[i].remove_count;
}
if ((size_t)global_count > size_limit) {
if (static_cast<size_t>(global_count) > size_limit) {
size_t remove = global_count - size_limit;
for (size_t i = 0; i < lru_pool.size(); i++) {
lru_pool[i].total_diff = 0;
......@@ -392,7 +415,7 @@ class ScaledLRU {
void handle_size_diff(int diff) {
if (diff != 0) {
__sync_fetch_and_add(&global_count, diff);
if (global_count > int(1.25 * size_limit)) {
if (global_count > static_cast<int>(1.25 * size_limit)) {
thread_pool->enqueue([this]() -> int { return Shrink(); });
}
}
......@@ -507,8 +530,8 @@ class GraphTable : public Table {
int idx,
int start,
int size,
std::unique_ptr<char[]> &buffer,
int &actual_size,
std::unique_ptr<char[]> &buffer, // NOLINT
int &actual_size, // NOLINT
bool need_feature,
int step);
......@@ -516,40 +539,48 @@ class GraphTable : public Table {
int idx,
uint64_t *node_ids,
int sample_size,
std::vector<std::shared_ptr<char>> &buffers,
std::vector<int> &actual_sizes,
std::vector<std::shared_ptr<char>> &buffers, // NOLINT
std::vector<int> &actual_sizes, // NOLINT
bool need_weight);
int32_t random_sample_nodes(int type_id,
int idx,
int sample_size,
std::unique_ptr<char[]> &buffers,
int &actual_sizes);
std::unique_ptr<char[]> &buffers, // NOLINT
int &actual_sizes); // NOLINT
virtual int32_t get_nodes_ids_by_ranges(
int type_id,
int idx,
std::vector<std::pair<int, int>> ranges,
std::vector<uint64_t> &res);
std::vector<uint64_t> &res); // NOLINT
virtual int32_t Initialize() { return 0; }
virtual int32_t Initialize(const TableParameter &config,
const FsClientParameter &fs_config);
virtual int32_t Initialize(const GraphParameter &config);
void init_worker_poll(int gpu_num);
int32_t Load(const std::string &path, const std::string &param);
int32_t load_node_and_edge_file(std::string etype,
std::string ntype,
std::string epath,
std::string npath,
int32_t load_node_and_edge_file(std::string etype2files,
std::string ntype2files,
std::string graph_data_local_path,
int part_num,
bool reverse);
std::string get_inverse_etype(std::string &etype);
int32_t parse_edge_and_load(std::string etype2files,
std::string graph_data_local_path,
int part_num,
bool reverse);
int32_t parse_node_and_load(std::string ntype2files,
std::string graph_data_local_path,
int part_num);
std::string get_inverse_etype(std::string &etype); // NOLINT
int32_t parse_type_to_typepath(
std::string &type2files, // NOLINT
std::string graph_data_local_path,
std::vector<std::string> &res_type, // NOLINT
std::unordered_map<std::string, std::string> &res_type2path); // NOLINT
int32_t load_edges(const std::string &path,
bool reverse,
const std::string &edge_type);
int get_all_id(int type,
int slice_num,
std::vector<std::vector<uint64_t>> *output);
......@@ -568,6 +599,8 @@ class GraphTable : public Table {
int idx,
int slice_num,
std::vector<std::vector<uint64_t>> *output);
int get_node_embedding_ids(int slice_num,
std::vector<std::vector<uint64_t>> *output);
int32_t load_nodes(const std::string &path,
std::string node_type = std::string());
std::pair<uint64_t, uint64_t> parse_edge_file(const std::string &path,
......@@ -578,23 +611,23 @@ class GraphTable : public Table {
int idx);
std::pair<uint64_t, uint64_t> parse_node_file(const std::string &path);
int32_t add_graph_node(int idx,
std::vector<uint64_t> &id_list,
std::vector<bool> &is_weight_list);
std::vector<uint64_t> &id_list, // NOLINT
std::vector<bool> &is_weight_list); // NOLINT
int32_t remove_graph_node(int idx, std::vector<uint64_t> &id_list);
int32_t remove_graph_node(int idx, std::vector<uint64_t> &id_list); // NOLINT
int32_t get_server_index_by_id(uint64_t id);
Node *find_node(int type_id, int idx, uint64_t id);
Node *find_node(int type_id, uint64_t id);
virtual int32_t Pull(TableContext &context) { return 0; }
virtual int32_t Push(TableContext &context) { return 0; }
virtual int32_t Pull(TableContext &context) { return 0; } // NOLINT
virtual int32_t Push(TableContext &context) { return 0; } // NOLINT
virtual int32_t clear_nodes(int type, int idx);
virtual void Clear() {}
virtual int32_t Flush() { return 0; }
virtual int32_t Shrink(const std::string &param) { return 0; }
//指定保存路径
// 指定保存路径
virtual int32_t Save(const std::string &path, const std::string &converter) {
return 0;
}
......@@ -617,19 +650,28 @@ class GraphTable : public Table {
size_t len,
FeatureNode *node);
virtual int32_t get_node_feat(int idx,
const std::vector<uint64_t> &node_ids,
const std::vector<std::string> &feature_names,
std::vector<std::vector<std::string>> &res);
virtual int32_t set_node_feat(
virtual int32_t get_node_feat(
int idx,
const std::vector<uint64_t> &node_ids,
const std::vector<std::string> &feature_names,
const std::vector<std::vector<std::string>> &res);
std::vector<std::vector<std::string>> &res); // NOLINT
virtual int32_t set_node_feat(
int idx,
const std::vector<uint64_t> &node_ids, // NOLINT
const std::vector<std::string> &feature_names, // NOLINT
const std::vector<std::vector<std::string>> &res); // NOLINT
size_t get_server_num() { return server_num; }
void clear_graph();
void clear_graph(int idx);
void clear_edge_shard();
void clear_feature_shard();
void feature_shrink_to_fit();
void merge_feature_shard();
void release_graph();
void release_graph_edge();
void release_graph_node();
virtual int32_t make_neighbor_sample_cache(size_t size_limit, size_t ttl) {
{
std::unique_lock<std::mutex> lock(mutex_);
......@@ -662,20 +704,24 @@ class GraphTable : public Table {
uint64_t id,
int sample_size,
const std::shared_ptr<std::mt19937_64> rng,
int &actual_size);
int &actual_size); // NOLINT
virtual int32_t add_node_to_ssd(
int type_id, int idx, uint64_t src_id, char *data, int len);
virtual paddle::framework::GpuPsCommGraph make_gpu_ps_graph(
int idx, std::vector<uint64_t> ids);
int idx, const std::vector<uint64_t> &ids);
virtual paddle::framework::GpuPsCommGraphFea make_gpu_ps_graph_fea(
std::vector<uint64_t> &node_ids, int slot_num);
int gpu_id, std::vector<uint64_t> &node_ids, int slot_num); // NOLINT
int32_t Load_to_ssd(const std::string &path, const std::string &param);
int64_t load_graph_to_memory_from_ssd(int idx, std::vector<uint64_t> &ids);
int64_t load_graph_to_memory_from_ssd(int idx,
std::vector<uint64_t> &ids); // NOLINT
int32_t make_complementary_graph(int idx, int64_t byte_size);
int32_t dump_edges_to_ssd(int idx);
int32_t get_partition_num(int idx) { return partitions[idx].size(); }
std::vector<uint64_t> get_partition(int idx, int index) {
if (idx >= (int)partitions.size() || index >= (int)partitions[idx].size())
std::vector<int> slot_feature_num_map() const {
return slot_feature_num_map_;
}
std::vector<uint64_t> get_partition(size_t idx, size_t index) {
if (idx >= partitions.size() || index >= partitions[idx].size())
return std::vector<uint64_t>();
return partitions[idx][index];
}
......@@ -691,10 +737,19 @@ class GraphTable : public Table {
#endif
virtual int32_t add_comm_edge(int idx, uint64_t src_id, uint64_t dst_id);
virtual int32_t build_sampler(int idx, std::string sample_type = "random");
void set_slot_feature_separator(const std::string &ch);
void set_feature_separator(const std::string &ch);
void build_graph_total_keys();
void build_graph_type_keys();
std::vector<uint64_t> graph_total_keys_;
std::vector<std::vector<uint64_t>> graph_type_keys_;
std::unordered_map<int, int> type_to_index_;
std::vector<std::vector<GraphShard *>> edge_shards, feature_shards;
size_t shard_start, shard_end, server_num, shard_num_per_server, shard_num;
int task_pool_size_ = 24;
int task_pool_size_ = 64;
int load_thread_num = 160;
const int random_sample_nodes_ranges = 3;
......@@ -708,8 +763,10 @@ class GraphTable : public Table {
std::vector<std::string> id_to_feature, id_to_edge;
std::string table_name;
std::string table_type;
std::vector<std::string> edge_type_size;
std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
std::vector<std::shared_ptr<::ThreadPool>> _cpu_worker_pool;
std::vector<std::shared_ptr<std::mt19937_64>> _shards_task_rng_pool;
std::shared_ptr<::ThreadPool> load_node_edge_task_pool;
std::shared_ptr<ScaledLRU<SampleKey, SampleResult>> scaled_lru;
......@@ -720,6 +777,7 @@ class GraphTable : public Table {
int cache_ttl;
mutable std::mutex mutex_;
bool build_sampler_on_cpu;
bool is_load_reverse_edge = false;
std::shared_ptr<pthread_rwlock_t> rw_lock;
#ifdef PADDLE_WITH_HETERPS
// paddle::framework::GpuPsGraphTable gpu_graph_table;
......@@ -728,7 +786,9 @@ class GraphTable : public Table {
// std::shared_ptr<GraphSampler> graph_sampler;
// REGISTER_GRAPH_FRIEND_CLASS(2, CompleteGraphSampler, BasicBfsGraphSampler)
#endif
std::string slot_feature_separator_ = std::string(" ");
std::string feature_separator_ = std::string(" ");
std::vector<int> slot_feature_num_map_;
};
/*
......
......@@ -43,6 +43,12 @@ int CtrDymfAccessor::Initialize() {
if (_config.ctr_accessor_param().show_scale()) {
_show_scale = true;
}
for (int i = 0; i < _config.ctr_accessor_param().load_filter_slots_size();
i++) {
_filtered_slots.insert(_config.ctr_accessor_param().load_filter_slots(i));
VLOG(0) << "CtrDymfAccessor::Initialize() load filter slot:"
<< _config.ctr_accessor_param().load_filter_slots(i);
}
VLOG(0) << " INTO CtrDymfAccessor::Initialize(); embed_sgd_dim:"
<< common_feature_value.embed_sgd_dim
<< " embedx_dim:" << common_feature_value.embedx_dim
......@@ -104,6 +110,15 @@ bool CtrDymfAccessor::SaveSSD(float* value) {
return false;
}
bool CtrDymfAccessor::FilterSlot(float* value) {
// 热启时过滤掉_filtered_slots中的feasign
if (_filtered_slots.find(common_feature_value.Slot(value)) !=
_filtered_slots.end()) {
return true;
}
return false;
}
bool CtrDymfAccessor::Save(float* value, int param) {
auto base_threshold = _config.ctr_accessor_param().base_threshold();
auto delta_threshold = _config.ctr_accessor_param().delta_threshold();
......@@ -177,7 +192,8 @@ void CtrDymfAccessor::UpdateStatAfterSave(float* value, int param) {
int32_t CtrDymfAccessor::Create(float** values, size_t num) {
for (size_t value_item = 0; value_item < num; ++value_item) {
float* value = values[value_item];
value[common_feature_value.UnseenDaysIndex()] = 0;
common_feature_value.UnseenDays(value) = 0;
common_feature_value.PassId(value) = 0;
value[common_feature_value.DeltaScoreIndex()] = 0;
value[common_feature_value.ShowIndex()] = 0;
value[common_feature_value.ClickIndex()] = 0;
......@@ -292,7 +308,8 @@ std::string CtrDymfAccessor::ParseToString(const float* v, int param) {
thread_local std::ostringstream os;
os.clear();
os.str("");
os << v[0] << " " << v[1] << " " << v[2] << " " << v[3] << " " << v[4];
os << common_feature_value.UnseenDays(const_cast<float*>(v)) << " " << v[1]
<< " " << v[2] << " " << v[3] << " " << v[4];
// << v[5] << " " << v[6];
for (int i = common_feature_value.EmbedG2SumIndex();
i < common_feature_value.EmbedxG2SumIndex();
......@@ -302,7 +319,8 @@ std::string CtrDymfAccessor::ParseToString(const float* v, int param) {
auto show = common_feature_value.Show(const_cast<float*>(v));
auto click = common_feature_value.Click(const_cast<float*>(v));
auto score = ShowClickScore(show, click);
auto mf_dim = int(common_feature_value.MfDim(const_cast<float*>(v)));
auto mf_dim =
static_cast<int>(common_feature_value.MfDim(const_cast<float*>(v)));
if (score >= _config.embedx_threshold() &&
param > common_feature_value.EmbedxG2SumIndex()) {
for (auto i = common_feature_value.EmbedxG2SumIndex();
......@@ -316,9 +334,24 @@ std::string CtrDymfAccessor::ParseToString(const float* v, int param) {
int CtrDymfAccessor::ParseFromString(const std::string& str, float* value) {
auto ret = paddle::string::str_to_float(str.data(), value);
float unseen_day = value[common_feature_value.UnseenDaysIndex()];
common_feature_value.UnseenDays(value) = (uint16_t)(unseen_day);
common_feature_value.PassId(value) = 0;
CHECK(ret >= 7) << "expect more than 7 real:" << ret;
return ret;
}
bool CtrDymfAccessor::SaveMemCache(float* value,
int param,
double global_cache_threshold,
uint16_t pass_id) {
return common_feature_value.Show(value) > global_cache_threshold ||
common_feature_value.PassId(value) >= pass_id;
}
void CtrDymfAccessor::UpdatePassId(float* value, uint16_t pass_id) {
common_feature_value.PassId(value) = pass_id;
}
} // namespace distributed
} // namespace paddle
......@@ -21,6 +21,7 @@
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps/table/accessor.h"
#include "paddle/fluid/distributed/ps/table/sparse_sgd_rule.h"
#include "paddle/fluid/distributed/ps/thirdparty/round_robin.h"
#include "paddle/fluid/distributed/the_one_ps.pb.h"
namespace paddle {
......@@ -30,7 +31,7 @@ namespace distributed {
class CtrDymfAccessor : public ValueAccessor {
public:
struct CtrDymfFeatureValue {
/*
/*v1: old version
float unseen_days;
float delta_score;
float show;
......@@ -44,6 +45,20 @@ class CtrDymfAccessor : public ValueAccessor {
// float embedx_g2sum;
std::vector<float> embedx_w;
*/
/* V2: support pass_id
uint16_t pass_id;
uint16_t unseen_days;
float show;
float click;
float embed_w;
// float embed_g2sum;
std::vector<float> embed_g2sum;
float slot;
float mf_dim
std::<vector>float embedx_g2sum;
// float embedx_g2sum;
std::vector<float> embedx_w;
*/
int Dim() { return 7 + embed_sgd_dim + embedx_sgd_dim + embedx_dim; }
int DimSize(size_t dim, int embedx_dim) { return sizeof(float); }
......@@ -60,7 +75,7 @@ class CtrDymfAccessor : public ValueAccessor {
int EmbedxWIndex() { return EmbedxG2SumIndex() + embedx_sgd_dim; }
// 根据mf_dim计算的总长度
int Dim(int& mf_dim) {
int Dim(int mf_dim) {
int tmp_embedx_sgd_dim = 1;
if (optimizer_name == "SparseAdamSGDRule") { // adam
tmp_embedx_sgd_dim = mf_dim * 2 + 2;
......@@ -71,9 +86,19 @@ class CtrDymfAccessor : public ValueAccessor {
}
// 根据mf_dim计算的总byte数
int Size(int& mf_dim) { return (Dim(mf_dim)) * sizeof(float); }
int Size(int mf_dim) { return (Dim(mf_dim)) * sizeof(float); }
float& UnseenDays(float* val) { return val[UnseenDaysIndex()]; }
uint16_t& PassId(float* val) {
uint16_t* int16_val =
reinterpret_cast<uint16_t*>(val + UnseenDaysIndex());
return int16_val[0];
}
uint16_t& UnseenDays(float* val) {
uint16_t* int16_val =
reinterpret_cast<uint16_t*>(val + UnseenDaysIndex());
return int16_val[1];
}
float& DeltaScore(float* val) { return val[DeltaScoreIndex()]; }
float& Show(float* val) { return val[ShowIndex()]; }
float& Click(float* val) { return val[ClickIndex()]; }
......@@ -184,6 +209,7 @@ class CtrDymfAccessor : public ValueAccessor {
int param,
double global_cache_threshold) override;
bool SaveSSD(float* value) override;
bool FilterSlot(float* value);
// update delta_score and unseen_days after save
void UpdateStatAfterSave(float* value, int param) override;
// keys不存在时,为values生成随机值
......@@ -217,6 +243,14 @@ class CtrDymfAccessor : public ValueAccessor {
return 0.0;
}
// 根据pass_id和show_threashold阈值来判断cache到ssd
bool SaveMemCache(float* value,
int param,
double global_cache_threshold,
uint16_t pass_id);
// 更新pass_id
void UpdatePassId(float* value, uint16_t pass_id);
private:
// float ShowClickScore(float show, float click);
......@@ -233,6 +267,7 @@ class CtrDymfAccessor : public ValueAccessor {
float ShowClickScore(float show, float click);
SparseValueSGDRule* _embed_sgd_rule;
SparseValueSGDRule* _embedx_sgd_rule;
robin_hood::unordered_set<float> _filtered_slots;
};
} // namespace distributed
} // namespace paddle
......@@ -16,8 +16,12 @@
#include <glog/logging.h>
#include <rocksdb/db.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/iostats_context.h>
#include <rocksdb/options.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/perf_level.h>
#include <rocksdb/slice.h>
#include <rocksdb/slice_transform.h>
#include <rocksdb/table.h>
#include <rocksdb/write_batch.h>
......@@ -27,6 +31,55 @@
namespace paddle {
namespace distributed {
class Uint64Comparator : public rocksdb::Comparator {
int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const {
uint64_t A = *(reinterpret_cast<const uint64_t*>(a.data()));
uint64_t B = *(reinterpret_cast<const uint64_t*>(b.data()));
if (A < B) {
return -1;
}
if (A > B) {
return 1;
}
return 0;
}
const char* Name() const { return "Uint64Comparator"; }
void FindShortestSeparator(std::string*, const rocksdb::Slice&) const {}
void FindShortSuccessor(std::string*) const {}
};
class RocksDBItem {
public:
RocksDBItem() {}
~RocksDBItem() {}
void reset() {
batch_keys.clear();
batch_index.clear();
batch_values.clear();
status.clear();
}
std::vector<rocksdb::Slice> batch_keys;
std::vector<int> batch_index;
std::vector<rocksdb::PinnableSlice> batch_values;
std::vector<rocksdb::Status> status;
};
class RocksDBCtx {
public:
RocksDBCtx() {
items[0].reset();
items[1].reset();
cur_index = 0;
}
~RocksDBCtx() {}
RocksDBItem* switch_item() {
cur_index = (cur_index + 1) % 2;
return &items[cur_index];
}
RocksDBItem items[2];
int cur_index;
};
class RocksDBHandler {
public:
RocksDBHandler() {}
......@@ -38,55 +91,69 @@ class RocksDBHandler {
}
int initialize(const std::string& db_path, const int colnum) {
VLOG(3) << "db path: " << db_path << " colnum: " << colnum;
rocksdb::Options options;
rocksdb::BlockBasedTableOptions bbto;
bbto.block_size = 4 * 1024;
bbto.block_cache = rocksdb::NewLRUCache(64 * 1024 * 1024);
bbto.block_cache_compressed = rocksdb::NewLRUCache(64 * 1024 * 1024);
bbto.cache_index_and_filter_blocks = false;
bbto.filter_policy.reset(rocksdb::NewBloomFilterPolicy(20, false));
bbto.whole_key_filtering = true;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbto));
options.keep_log_file_num = 100;
options.max_log_file_size = 50 * 1024 * 1024; // 50MB
options.create_if_missing = true;
options.use_direct_reads = true;
options.max_background_flushes = 5;
options.max_background_compactions = 5;
options.base_background_compactions = 10;
options.write_buffer_size = 256 * 1024 * 1024; // 256MB
options.max_write_buffer_number = 8;
options.max_bytes_for_level_base =
options.max_write_buffer_number * options.write_buffer_size;
options.min_write_buffer_number_to_merge = 1;
options.target_file_size_base = 1024 * 1024 * 1024; // 1024MB
options.memtable_prefix_bloom_size_ratio = 0.02;
options.num_levels = 4;
options.max_open_files = -1;
options.compression = rocksdb::kNoCompression;
options.level0_file_num_compaction_trigger = 8;
options.level0_slowdown_writes_trigger =
1.8 * options.level0_file_num_compaction_trigger;
options.level0_stop_writes_trigger =
3.6 * options.level0_file_num_compaction_trigger;
if (!db_path.empty()) {
std::string rm_cmd = "rm -rf " + db_path;
system(rm_cmd.c_str());
}
rocksdb::Status s = rocksdb::DB::Open(options, db_path, &_db);
assert(s.ok());
_handles.resize(colnum);
VLOG(0) << "db path: " << db_path << " colnum: " << colnum;
_dbs.resize(colnum);
for (int i = 0; i < colnum; i++) {
s = _db->CreateColumnFamily(
options, "shard_" + std::to_string(i), &_handles[i]);
rocksdb::Options options;
options.comparator = &_comparator;
rocksdb::BlockBasedTableOptions bbto;
// options.memtable_factory.reset(rocksdb::NewHashSkipListRepFactory(65536));
// options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(2));
bbto.format_version = 5;
bbto.use_delta_encoding = false;
bbto.block_size = 4 * 1024;
bbto.block_restart_interval = 6;
bbto.block_cache = rocksdb::NewLRUCache(64 * 1024 * 1024);
// bbto.block_cache_compressed = rocksdb::NewLRUCache(64 * 1024 * 1024);
bbto.cache_index_and_filter_blocks = false;
bbto.filter_policy.reset(rocksdb::NewBloomFilterPolicy(15, false));
bbto.whole_key_filtering = true;
options.statistics = rocksdb::CreateDBStatistics();
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbto));
// options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
options.keep_log_file_num = 100;
// options.db_log_dir = "./log/rocksdb";
options.max_log_file_size = 50 * 1024 * 1024; // 50MB
// options.threads = 8;
options.create_if_missing = true;
options.use_direct_reads = true;
options.max_background_flushes = 37;
options.max_background_compactions = 64;
options.base_background_compactions = 10;
options.write_buffer_size = 256 * 1024 * 1024; // 256MB
options.max_write_buffer_number = 8;
options.max_bytes_for_level_base =
options.max_write_buffer_number * options.write_buffer_size;
options.min_write_buffer_number_to_merge = 1;
options.target_file_size_base = 1024 * 1024 * 1024; // 1024MB
// options.verify_checksums_in_compaction = false;
// options.disable_auto_compactions = true;
options.memtable_prefix_bloom_size_ratio = 0.02;
options.num_levels = 4;
options.max_open_files = -1;
options.compression = rocksdb::kNoCompression;
// options.compaction_options_fifo = rocksdb::CompactionOptionsFIFO();
// options.compaction_style =
// rocksdb::CompactionStyle::kCompactionStyleFIFO;
options.level0_file_num_compaction_trigger = 5;
options.level0_slowdown_writes_trigger =
1.8 * options.level0_file_num_compaction_trigger;
options.level0_stop_writes_trigger =
3.6 * options.level0_file_num_compaction_trigger;
std::string shard_path = db_path + "_" + std::to_string(i);
if (!shard_path.empty()) {
std::string rm_cmd = "rm -rf " + shard_path;
system(rm_cmd.c_str());
}
rocksdb::Status s = rocksdb::DB::Open(options, shard_path, &_dbs[i]);
assert(s.ok());
}
LOG(INFO) << "DB initialize success, colnum:" << colnum;
VLOG(0) << "DB initialize success, colnum:" << colnum;
return 0;
}
......@@ -94,36 +161,32 @@ class RocksDBHandler {
int id, const char* key, int key_len, const char* value, int value_len) {
rocksdb::WriteOptions options;
options.disableWAL = true;
rocksdb::Status s = _db->Put(options,
_handles[id],
rocksdb::Slice(key, key_len),
rocksdb::Slice(value, value_len));
rocksdb::Status s = _dbs[id]->Put(options,
rocksdb::Slice(key, key_len),
rocksdb::Slice(value, value_len));
assert(s.ok());
return 0;
}
int put_batch(int id,
std::vector<std::pair<char*, int>>& ssd_keys,
std::vector<std::pair<char*, int>>& ssd_values,
std::vector<std::pair<char*, int>>& ssd_keys, // NOLINT
std::vector<std::pair<char*, int>>& ssd_values, // NOLINT
int n) {
rocksdb::WriteOptions options;
options.disableWAL = true;
rocksdb::WriteBatch batch(n * 128);
for (int i = 0; i < n; i++) {
batch.Put(_handles[id],
rocksdb::Slice(ssd_keys[i].first, ssd_keys[i].second),
batch.Put(rocksdb::Slice(ssd_keys[i].first, ssd_keys[i].second),
rocksdb::Slice(ssd_values[i].first, ssd_values[i].second));
}
rocksdb::Status s = _db->Write(options, &batch);
rocksdb::Status s = _dbs[id]->Write(options, &batch);
assert(s.ok());
return 0;
}
int get(int id, const char* key, int key_len, std::string& value) {
rocksdb::Status s = _db->Get(rocksdb::ReadOptions(),
_handles[id],
rocksdb::Slice(key, key_len),
&value);
int get(int id, const char* key, int key_len, std::string& value) { // NOLINT
rocksdb::Status s = _dbs[id]->Get(
rocksdb::ReadOptions(), rocksdb::Slice(key, key_len), &value);
if (s.IsNotFound()) {
return 1;
}
......@@ -131,33 +194,63 @@ class RocksDBHandler {
return 0;
}
void multi_get(int id,
const size_t num_keys,
const rocksdb::Slice* keys,
rocksdb::PinnableSlice* values,
rocksdb::Status* status,
const bool sorted_input = true) {
rocksdb::ColumnFamilyHandle* handle = _dbs[id]->DefaultColumnFamily();
auto read_opt = rocksdb::ReadOptions();
read_opt.fill_cache = false;
_dbs[id]->MultiGet(
read_opt, handle, num_keys, keys, values, status, sorted_input);
}
int del_data(int id, const char* key, int key_len) {
rocksdb::WriteOptions options;
options.disableWAL = true;
rocksdb::Status s =
_db->Delete(options, _handles[id], rocksdb::Slice(key, key_len));
rocksdb::Status s = _dbs[id]->Delete(options, rocksdb::Slice(key, key_len));
assert(s.ok());
return 0;
}
int flush(int id) {
rocksdb::Status s = _db->Flush(rocksdb::FlushOptions(), _handles[id]);
rocksdb::Status s = _dbs[id]->Flush(rocksdb::FlushOptions());
assert(s.ok());
return 0;
}
rocksdb::Iterator* get_iterator(int id) {
return _db->NewIterator(rocksdb::ReadOptions(), _handles[id]);
return _dbs[id]->NewIterator(rocksdb::ReadOptions());
}
int get_estimate_key_num(uint64_t& num_keys) {
_db->GetAggregatedIntProperty("rocksdb.estimate-num-keys", &num_keys);
int get_estimate_key_num(uint64_t& num_keys) { // NOLINT
num_keys = 0;
for (size_t i = 0; i < _dbs.size(); i++) {
uint64_t cur_keys = 0;
_dbs[i]->GetAggregatedIntProperty("rocksdb.estimate-num-keys", &cur_keys);
num_keys += cur_keys;
}
return 0;
}
Uint64Comparator* get_comparator() { return &_comparator; }
int ingest_externel_file(int id,
const std::vector<std::string>& sst_filelist) {
rocksdb::IngestExternalFileOptions ifo;
ifo.move_files = true;
rocksdb::Status s = _dbs[id]->IngestExternalFile(sst_filelist, ifo);
assert(s.ok());
return 0;
}
private:
std::vector<rocksdb::ColumnFamilyHandle*> _handles;
rocksdb::DB* _db;
// rocksdb::DB* _db;
std::vector<rocksdb::DB*> _dbs;
Uint64Comparator _comparator;
};
} // namespace distributed
} // namespace paddle
......@@ -31,7 +31,7 @@ namespace distributed {
class Node {
public:
Node() {}
Node(uint64_t id) : id(id) {}
explicit Node(uint64_t id) : id(id) {}
virtual ~Node() {}
static int id_size, int_size, weight_size;
uint64_t get_id() { return id; }
......@@ -56,8 +56,14 @@ class Node {
virtual int get_feature_ids(int slot_idx, std::vector<uint64_t> *res) const {
return 0;
}
virtual int get_feature_ids(int slot_idx,
std::vector<uint64_t> &feature_id, // NOLINT
std::vector<uint8_t> &slot_id) const { // NOLINT
return 0;
}
virtual void set_feature(int idx, const std::string &str) {}
virtual void set_feature_size(int size) {}
virtual void shrink_to_fit() {}
virtual int get_feature_size() { return 0; }
virtual size_t get_neighbor_size() { return 0; }
......@@ -69,7 +75,8 @@ class Node {
class GraphNode : public Node {
public:
GraphNode() : Node(), sampler(nullptr), edges(nullptr) {}
GraphNode(uint64_t id) : Node(id), sampler(nullptr), edges(nullptr) {}
explicit GraphNode(uint64_t id)
: Node(id), sampler(nullptr), edges(nullptr) {}
virtual ~GraphNode();
virtual void build_edges(bool is_weighted);
virtual void build_sampler(std::string sample_type);
......@@ -92,13 +99,13 @@ class GraphNode : public Node {
class FeatureNode : public Node {
public:
FeatureNode() : Node() {}
FeatureNode(uint64_t id) : Node(id) {}
explicit FeatureNode(uint64_t id) : Node(id) {}
virtual ~FeatureNode() {}
virtual int get_size(bool need_feature);
virtual void to_buffer(char *buffer, bool need_feature);
virtual void recover_from_buffer(char *buffer);
virtual std::string get_feature(int idx) {
if (idx < (int)this->feature.size()) {
if (idx < static_cast<int>(this->feature.size())) {
return this->feature[idx];
} else {
return std::string("");
......@@ -135,7 +142,7 @@ class FeatureNode : public Node {
"get_feature_ids res should not be null"));
res->clear();
errno = 0;
if (slot_idx < (int)this->feature.size()) {
if (slot_idx < static_cast<int>(this->feature.size())) {
const std::string &s = this->feature[slot_idx];
const uint64_t *feas = (const uint64_t *)(s.c_str());
......@@ -155,21 +162,51 @@ class FeatureNode : public Node {
return 0;
}
virtual int get_feature_ids(int slot_idx,
std::vector<uint64_t> &feature_id, // NOLINT
std::vector<uint8_t> &slot_id) const { // NOLINT
errno = 0;
size_t num = 0;
if (slot_idx < static_cast<int>(this->feature.size())) {
const std::string &s = this->feature[slot_idx];
const uint64_t *feas = (const uint64_t *)(s.c_str());
num = s.length() / sizeof(uint64_t);
CHECK((s.length() % sizeof(uint64_t)) == 0)
<< "bad feature_item: [" << s << "]";
for (size_t i = 0; i < num; ++i) {
feature_id.push_back(feas[i]);
slot_id.push_back(slot_idx);
}
}
PADDLE_ENFORCE_EQ(
errno,
0,
paddle::platform::errors::InvalidArgument(
"get_feature_ids get errno should be 0, but got %d.", errno));
return num;
}
virtual std::string *mutable_feature(int idx) {
if (idx >= (int)this->feature.size()) {
if (idx >= static_cast<int>(this->feature.size())) {
this->feature.resize(idx + 1);
}
return &(this->feature[idx]);
}
virtual void set_feature(int idx, const std::string &str) {
if (idx >= (int)this->feature.size()) {
if (idx >= static_cast<int>(this->feature.size())) {
this->feature.resize(idx + 1);
}
this->feature[idx] = str;
}
virtual void set_feature_size(int size) { this->feature.resize(size); }
virtual int get_feature_size() { return this->feature.size(); }
virtual void shrink_to_fit() {
feature.shrink_to_fit();
for (auto &slot : feature) {
slot.shrink_to_fit();
}
}
template <typename T>
static std::string parse_value_to_bytes(std::vector<std::string> feat_str) {
......@@ -179,7 +216,8 @@ class FeatureNode : public Node {
for (size_t i = 0; i < feat_str.size(); i++) {
std::stringstream ss(feat_str[i]);
ss >> v;
std::memcpy(buffer + sizeof(T) * i, (char *)&v, sizeof(T));
std::memcpy(
buffer + sizeof(T) * i, reinterpret_cast<char *>(&v), sizeof(T));
}
return std::string(buffer, Tsize);
}
......@@ -196,7 +234,8 @@ class FeatureNode : public Node {
for (size_t i = 0; i < feat_str_size; i++) {
std::stringstream ss(*(feat_str_begin + i));
ss >> v;
std::memcpy(buffer + sizeof(T) * i, (char *)&v, sizeof(T));
std::memcpy(
buffer + sizeof(T) * i, reinterpret_cast<char *>(&v), sizeof(T));
}
output->assign(buffer);
}
......@@ -208,7 +247,7 @@ class FeatureNode : public Node {
size_t start = 0;
const char *buffer = feat_str.data();
while (start < feat_str.size()) {
std::memcpy((char *)&v, buffer + start, sizeof(T));
std::memcpy(reinterpret_cast<char *>(&v), buffer + start, sizeof(T));
start += sizeof(T);
out.push_back(v);
}
......@@ -225,7 +264,7 @@ class FeatureNode : public Node {
size_t num = output->length();
output->resize(num + Tsize);
T *fea_ptrs = (T *)(&(*output)[num]);
T *fea_ptrs = reinterpret_cast<T *>(&(*output)[num]);
thread_local paddle::string::str_ptr_stream ss;
for (size_t i = 0; i < feat_str_size; i++) {
......
......@@ -137,7 +137,12 @@ int32_t MemorySparseTable::Load(const std::string &path,
size_t feature_value_size =
_value_accesor->GetAccessorInfo().size / sizeof(float);
#ifdef PADDLE_WITH_HETERPS
int thread_num = _real_local_shard_num;
#else
int thread_num = _real_local_shard_num < 15 ? _real_local_shard_num : 15;
#endif
omp_set_num_threads(thread_num);
#pragma omp parallel for schedule(dynamic)
for (int i = 0; i < _real_local_shard_num; ++i) {
......@@ -167,12 +172,6 @@ int32_t MemorySparseTable::Load(const std::string &path,
value.resize(feature_value_size);
int parse_size = _value_accesor->ParseFromString(++end, value.data());
value.resize(parse_size);
// for debug
for (int ii = 0; ii < parse_size; ++ii) {
VLOG(2) << "MemorySparseTable::load key: " << key << " value " << ii
<< ": " << value.data()[ii] << " local_shard: " << i;
}
}
read_channel->close();
if (err_no == -1) {
......@@ -340,7 +339,7 @@ int32_t MemorySparseTable::Save(const std::string &dirname,
size_t file_start_idx = _avg_local_shard_num * _shard_idx;
#ifdef PADDLE_WITH_GPU_GRAPH
#ifdef PADDLE_WITH_HETERPS
int thread_num = _real_local_shard_num;
#else
int thread_num = _real_local_shard_num < 20 ? _real_local_shard_num : 20;
......@@ -723,7 +722,8 @@ int32_t MemorySparseTable::Pull(TableContext &context) {
if (context.use_ptr) {
char **pull_values = context.pull_context.ptr_values;
const uint64_t *keys = context.pull_context.keys;
return PullSparsePtr(pull_values, keys, context.num);
return PullSparsePtr(
context.shard_id, pull_values, keys, context.num, context.pass_id);
} else {
float *pull_values = context.pull_context.values;
const PullSparseValue &pull_value = context.pull_context.pull_value;
......@@ -820,9 +820,11 @@ int32_t MemorySparseTable::PullSparse(float *pull_values,
return 0;
}
int32_t MemorySparseTable::PullSparsePtr(char **pull_values,
int32_t MemorySparseTable::PullSparsePtr(int shard_id, // fake num
char **pull_values,
const uint64_t *keys,
size_t num) {
size_t num,
uint16_t pass_id) {
CostTimer timer("pscore_sparse_select_all");
size_t value_size = _value_accesor->GetAccessorInfo().size / sizeof(float);
size_t mf_value_size =
......
......@@ -90,7 +90,11 @@ class MemorySparseTable : public Table {
std::pair<int64_t, int64_t> PrintTableStat() override;
int32_t PullSparse(float* values, const PullSparseValue& pull_value);
int32_t PullSparsePtr(char** pull_values, const uint64_t* keys, size_t num);
int32_t PullSparsePtr(int shard_id,
char** pull_values,
const uint64_t* keys,
size_t num,
uint16_t pass_id);
int32_t PushSparse(const uint64_t* keys, const float* values, size_t num);
......
......@@ -21,6 +21,41 @@
namespace paddle {
namespace distributed {
class MemRegion {
public:
MemRegion() {
_cap = 2 * 1024 * 1024;
_buf = reinterpret_cast<char*>(malloc(_cap));
_cur = 0;
_file_idx = -1;
}
virtual ~MemRegion() { free(_buf); }
bool buff_remain(int len) {
if (_cap - _cur < len) {
return false;
} else {
return true;
}
}
char* acquire(int len) {
if (_cap - _cur < len) {
return nullptr;
} else {
char* ret = _buf + _cur;
_cur += len;
return ret;
}
}
void reset() {
_cur = 0;
_file_idx = -1;
}
int _cap;
int _cur;
int _file_idx;
char* _buf;
};
class SSDSparseTable : public MemorySparseTable {
public:
typedef SparseTableShard<uint64_t, FixedFeatureValue> shard_type;
......@@ -38,27 +73,34 @@ class SSDSparseTable : public MemorySparseTable {
int32_t Push(TableContext& context) override;
int32_t PullSparse(float* pull_values, const uint64_t* keys, size_t num);
int32_t PullSparsePtr(char** pull_values, const uint64_t* keys, size_t num);
int32_t PullSparsePtr(int shard_id,
char** pull_values,
const uint64_t* keys,
size_t num,
uint16_t pass_id);
int32_t PushSparse(const uint64_t* keys, const float* values, size_t num);
int32_t PushSparse(const uint64_t* keys, const float** values, size_t num);
int32_t Flush() override { return 0; }
virtual int32_t Shrink(const std::string& param) override;
virtual void Clear() override {
int32_t Shrink(const std::string& param) override;
void Clear() override {
for (int i = 0; i < _real_local_shard_num; ++i) {
_local_shards[i].clear();
}
}
virtual int32_t Save(const std::string& path,
const std::string& param) override;
virtual int32_t SaveCache(
int32_t Save(const std::string& path, const std::string& param) override;
int32_t SaveWithString(const std::string& path, const std::string& param);
int32_t SaveWithStringMultiOutput(const std::string& path,
const std::string& param);
int32_t SaveWithBinary(const std::string& path, const std::string& param);
int32_t SaveCache(
const std::string& path,
const std::string& param,
paddle::framework::Channel<std::pair<uint64_t, std::string>>&
shuffled_channel) override;
virtual double GetCacheThreshold() override { return _local_show_threshold; }
virtual int64_t CacheShuffle(
double GetCacheThreshold() override { return _local_show_threshold; }
int64_t CacheShuffle(
const std::string& path,
const std::string& param,
double cache_threshold,
......@@ -67,20 +109,25 @@ class SSDSparseTable : public MemorySparseTable {
paddle::framework::Channel<std::pair<uint64_t, std::string>>&
shuffled_channel,
const std::vector<Table*>& table_ptrs) override;
//加载path目录下数据
virtual int32_t Load(const std::string& path,
const std::string& param) override;
//加载path目录下数据[start_idx, end_idx)
virtual int32_t Load(size_t start_idx,
size_t end_idx,
const std::vector<std::string>& file_list,
const std::string& param);
// 加载path目录下数据
int32_t Load(const std::string& path, const std::string& param) override;
int32_t LoadWithString(size_t file_start_idx,
size_t end_idx,
const std::vector<std::string>& file_list,
const std::string& param);
int32_t LoadWithBinary(const std::string& path, int param);
int64_t LocalSize();
std::pair<int64_t, int64_t> PrintTableStat() override;
int32_t CacheTable(uint16_t pass_id) override;
private:
RocksDBHandler* _db;
int64_t _cache_tk_size;
double _local_show_threshold{0.0};
std::vector<paddle::framework::Channel<std::string>> _fs_channel;
std::mutex _table_mutex;
};
} // namespace distributed
......
......@@ -26,16 +26,15 @@
#include "paddle/fluid/distributed/ps/table/sparse_accessor.h"
#include "paddle/fluid/distributed/ps/table/ssd_sparse_table.h"
#include "paddle/fluid/distributed/ps/table/tensor_accessor.h"
#include "paddle/fluid/distributed/ps/table/tensor_table.h"
namespace paddle {
namespace distributed {
REGISTER_PSCORE_CLASS(Table, GraphTable);
REGISTER_PSCORE_CLASS(Table, MemoryDenseTable);
REGISTER_PSCORE_CLASS(Table, BarrierTable);
REGISTER_PSCORE_CLASS(Table, TensorTable);
REGISTER_PSCORE_CLASS(Table, DenseTensorTable);
REGISTER_PSCORE_CLASS(Table, GlobalStepTable);
// REGISTER_PSCORE_CLASS(Table, TensorTable);
// REGISTER_PSCORE_CLASS(Table, DenseTensorTable);
// REGISTER_PSCORE_CLASS(Table, GlobalStepTable);
REGISTER_PSCORE_CLASS(Table, MemorySparseTable);
REGISTER_PSCORE_CLASS(Table, SSDSparseTable);
REGISTER_PSCORE_CLASS(Table, MemorySparseGeoTable);
......
......@@ -62,6 +62,8 @@ struct TableContext {
size_t num;
bool use_ptr = false;
uint32_t trainer_id; // for GEO and global step
int shard_id; // for gpups
uint16_t pass_id; // for gpups ssd
};
class Table {
......@@ -147,6 +149,7 @@ class Table {
virtual void *GetShard(size_t shard_idx) = 0;
virtual std::pair<int64_t, int64_t> PrintTableStat() { return {0, 0}; }
virtual int32_t CacheTable(uint16_t pass_id) { return 0; }
// for patch model
virtual void Revert() {}
......
......@@ -747,6 +747,17 @@ void FleetWrapper::PrintTableStat(const uint64_t table_id) {
}
}
void FleetWrapper::SaveCacheTable(const uint64_t table_id,
uint16_t pass_id,
size_t threshold) {
auto ret = worker_ptr_->SaveCacheTable(table_id, pass_id, threshold);
ret.wait();
int32_t err_code = ret.get();
if (err_code == -1) {
LOG(ERROR) << "save cache table stat failed";
}
}
void FleetWrapper::ShrinkSparseTable(int table_id, int threshold) {
auto ret = worker_ptr_->Shrink(table_id, std::to_string(threshold));
ret.wait();
......
......@@ -242,6 +242,9 @@ class FleetWrapper {
void BarrierWithTable(uint32_t barrier_type);
void PrintTableStat(const uint64_t table_id);
void SaveCacheTable(const uint64_t table_id,
uint16_t pass_id,
size_t threshold);
// mode = 0, load all feature
// mode = 1, load delta feature, which means load diff
void LoadModel(const std::string& path, const int mode);
......
......@@ -169,6 +169,7 @@ message CtrAccessorParameter {
[ default = 1 ]; // threshold to save ssd
optional bool show_scale = 10 [ default = true ];
optional bool zero_init = 11 [ default = true ];
repeated float load_filter_slots = 12;
}
message TensorAccessorParameter {
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#ifdef __LINUX__
#include <pthread.h>
#include <semaphore.h>
#endif
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace framework {
class Barrier {
public:
explicit Barrier(int count = 1) {
#ifdef __LINUX__
CHECK_GE(count, 1);
CHECK_EQ(pthread_barrier_init(&_barrier, NULL, count), 0);
#endif
}
~Barrier() {
#ifdef __LINUX__
CHECK_EQ(pthread_barrier_destroy(&_barrier), 0);
#endif
}
void reset(int count) {
#ifdef __LINUX__
CHECK_GE(count, 1);
CHECK_EQ(pthread_barrier_destroy(&_barrier), 0);
CHECK_EQ(pthread_barrier_init(&_barrier, NULL, count), 0);
#endif
}
void wait() {
#ifdef __LINUX__
int err = pthread_barrier_wait(&_barrier);
if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD)) {
CHECK_EQ(1, 0);
}
#endif
}
private:
#ifdef __LINUX__
pthread_barrier_t _barrier;
#endif
};
// Call func(args...). If interrupted by signal, recall the function.
template <class FUNC, class... ARGS>
auto ignore_signal_call(FUNC &&func, ARGS &&...args) ->
typename std::result_of<FUNC(ARGS...)>::type {
for (;;) {
auto err = func(args...);
if (err < 0 && errno == EINTR) {
LOG(INFO) << "Signal is caught. Ignored.";
continue;
}
return err;
}
}
class Semaphore {
public:
Semaphore() {
#ifdef __LINUX__
CHECK_EQ(sem_init(&_sem, 0, 0), 0);
#endif
}
~Semaphore() {
#ifdef __LINUX__
CHECK_EQ(sem_destroy(&_sem), 0);
#endif
}
void post() {
#ifdef __LINUX__
CHECK_EQ(sem_post(&_sem), 0);
#endif
}
void wait() {
#ifdef __LINUX__
CHECK_EQ(ignore_signal_call(sem_wait, &_sem), 0);
#endif
}
bool try_wait() {
int err = 0;
#ifdef __LINUX__
CHECK((err = ignore_signal_call(sem_trywait, &_sem),
err == 0 || errno == EAGAIN));
#endif
return err == 0;
}
private:
#ifdef __LINUX__
sem_t _sem;
#endif
};
} // namespace framework
} // namespace paddle
......@@ -2114,6 +2114,16 @@ void SlotRecordInMemoryDataFeed::Init(const DataFeedDesc& data_feed_desc) {
#endif
}
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
void SlotRecordInMemoryDataFeed::InitGraphResource() {
gpu_graph_data_generator_.AllocResource(thread_id_, feed_vec_);
}
void SlotRecordInMemoryDataFeed::InitGraphTrainResource() {
gpu_graph_data_generator_.AllocTrainResource(thread_id_);
}
#endif
void SlotRecordInMemoryDataFeed::LoadIntoMemory() {
VLOG(3) << "SlotRecord LoadIntoMemory() begin, thread_id=" << thread_id_;
if (!so_parser_name_.empty()) {
......@@ -2650,7 +2660,7 @@ bool SlotRecordInMemoryDataFeed::Start() {
pack_ = BatchGpuPackMgr().get(this->GetPlace(), used_slots_info_);
#endif
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
gpu_graph_data_generator_.AllocResource(this->place_, feed_vec_);
gpu_graph_data_generator_.SetFeedVec(feed_vec_);
#endif
return true;
}
......@@ -2692,6 +2702,12 @@ int SlotRecordInMemoryDataFeed::Next() {
#endif
}
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
void SlotRecordInMemoryDataFeed::DoWalkandSage() {
gpu_graph_data_generator_.DoWalkandSage();
}
#endif
#if defined(PADDLE_WITH_CUDA) && defined(PADDLE_WITH_HETERPS)
void SlotRecordInMemoryDataFeed::BuildSlotBatchGPU(const int ins_num) {
int offset_cols_size = (ins_num + 1);
......
此差异已折叠。
......@@ -4,7 +4,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
......@@ -60,6 +60,8 @@ class Scope;
class Variable;
class NeighborSampleResult;
class NodeQueryResult;
template <typename KeyType, typename ValType>
class HashTable;
} // namespace framework
} // namespace paddle
......@@ -878,6 +880,9 @@ struct BufState {
int GetNextBatch() {
cursor += len;
if (row_num - cursor < 0) {
return 0;
}
int tmp_len = cursor + batch_size > row_num ? row_num - cursor : batch_size;
if (tmp_len == 0) {
return 0;
......@@ -895,11 +900,16 @@ class GraphDataGenerator {
GraphDataGenerator() {}
virtual ~GraphDataGenerator() {}
void SetConfig(const paddle::framework::DataFeedDesc& data_feed_desc);
void AllocResource(const paddle::platform::Place& place,
std::vector<phi::DenseTensor*> feed_vec);
void AllocResource(int thread_id, std::vector<phi::DenseTensor*> feed_vec);
void AllocTrainResource(int thread_id);
void SetFeedVec(std::vector<phi::DenseTensor*> feed_vec);
int AcquireInstance(BufState* state);
int GenerateBatch();
int FillWalkBuf(std::shared_ptr<phi::Allocation> d_walk);
int FillWalkBuf();
int FillWalkBufMultiPath();
int FillInferBuf();
void DoWalkandSage();
int FillSlotFeature(uint64_t* d_walk);
int FillFeatureBuf(uint64_t* d_walk, uint64_t* d_feature, size_t key_num);
int FillFeatureBuf(std::shared_ptr<phi::Allocation> d_walk,
std::shared_ptr<phi::Allocation> d_feature);
......@@ -910,57 +920,129 @@ class GraphDataGenerator {
int cur_degree,
int step,
int* len_per_row);
int FillInsBuf();
int FillInsBuf(cudaStream_t stream);
int FillIdShowClkTensor(int total_instance,
bool gpu_graph_training,
size_t cursor = 0);
int FillGraphIdShowClkTensor(int uniq_instance,
int total_instance,
int index);
int FillGraphSlotFeature(
int total_instance,
bool gpu_graph_training,
std::shared_ptr<phi::Allocation> final_sage_nodes = nullptr);
int FillSlotFeature(uint64_t* d_walk, size_t key_num);
int MakeInsPair(cudaStream_t stream);
uint64_t CopyUniqueNodes();
int GetPathNum() { return total_row_; }
void ResetPathNum() { total_row_ = 0; }
void ResetEpochFinish() { epoch_finish_ = false; }
void ClearSampleState();
void SetDeviceKeys(std::vector<uint64_t>* device_keys, int type) {
type_to_index_[type] = h_device_keys_.size();
h_device_keys_.push_back(device_keys);
}
// type_to_index_[type] = h_device_keys_.size();
// h_device_keys_.push_back(device_keys);
}
std::vector<std::shared_ptr<phi::Allocation>> SampleNeighbors(
int64_t* uniq_nodes,
int len,
int sample_size,
std::vector<int>& edges_split_num, // NOLINT
int64_t* neighbor_len);
std::shared_ptr<phi::Allocation> FillReindexHashTable(int64_t* input,
int num_input,
int64_t len_hashtable,
int64_t* keys,
int* values,
int* key_index,
int* final_nodes_len);
std::shared_ptr<phi::Allocation> GetReindexResult(int64_t* reindex_src_data,
int64_t* center_nodes,
int* final_nodes_len,
int node_len,
int64_t neighbor_len);
std::shared_ptr<phi::Allocation> GenerateSampleGraph(
uint64_t* node_ids,
int len,
int* uniq_len,
std::shared_ptr<phi::Allocation>& inverse); // NOLINT
int InsertTable(const uint64_t* d_keys,
uint64_t len,
std::shared_ptr<phi::Allocation> d_uniq_node_num);
std::vector<uint64_t>& GetHostVec() { return host_vec_; }
bool get_epoch_finish() { return epoch_finish_; }
void clear_gpu_mem();
protected:
HashTable<uint64_t, uint64_t>* table_;
int walk_degree_;
int walk_len_;
int window_;
int once_sample_startid_len_;
int gpuid_;
// start ids
// int64_t* device_keys_;
// size_t device_key_size_;
std::vector<std::vector<uint64_t>*> h_device_keys_;
std::unordered_map<int, int> type_to_index_;
// point to device_keys_
size_t cursor_;
int thread_id_;
size_t jump_rows_;
int edge_to_id_len_;
int64_t* id_tensor_ptr_;
int* index_tensor_ptr_;
int64_t* show_tensor_ptr_;
int64_t* clk_tensor_ptr_;
cudaStream_t stream_;
cudaStream_t train_stream_;
cudaStream_t sample_stream_;
paddle::platform::Place place_;
std::vector<phi::DenseTensor*> feed_vec_;
std::vector<size_t> offset_;
std::shared_ptr<phi::Allocation> d_prefix_sum_;
std::vector<std::shared_ptr<phi::Allocation>> d_device_keys_;
std::shared_ptr<phi::Allocation> d_train_metapath_keys_;
std::shared_ptr<phi::Allocation> d_walk_;
std::shared_ptr<phi::Allocation> d_feature_list_;
std::shared_ptr<phi::Allocation> d_feature_;
std::shared_ptr<phi::Allocation> d_len_per_row_;
std::shared_ptr<phi::Allocation> d_random_row_;
//
std::shared_ptr<phi::Allocation> d_uniq_node_num_;
std::shared_ptr<phi::Allocation> d_slot_feature_num_map_;
std::shared_ptr<phi::Allocation> d_actual_slot_id_map_;
std::shared_ptr<phi::Allocation> d_fea_offset_map_;
std::vector<std::shared_ptr<phi::Allocation>> d_sampleidx2rows_;
int cur_sampleidx2row_;
// record the keys to call graph_neighbor_sample
std::shared_ptr<phi::Allocation> d_sample_keys_;
int sample_keys_len_;
std::set<int> finish_node_type_;
std::unordered_map<int, size_t> node_type_start_;
std::vector<int> infer_node_type_start_;
std::shared_ptr<phi::Allocation> d_ins_buf_;
std::shared_ptr<phi::Allocation> d_feature_buf_;
std::shared_ptr<phi::Allocation> d_feature_size_list_buf_;
std::shared_ptr<phi::Allocation> d_feature_size_prefixsum_buf_;
std::shared_ptr<phi::Allocation> d_pair_num_;
std::shared_ptr<phi::Allocation> d_slot_tensor_ptr_;
std::shared_ptr<phi::Allocation> d_slot_lod_tensor_ptr_;
std::shared_ptr<phi::Allocation> d_reindex_table_key_;
std::shared_ptr<phi::Allocation> d_reindex_table_value_;
std::shared_ptr<phi::Allocation> d_reindex_table_index_;
std::vector<std::shared_ptr<phi::Allocation>> edge_type_graph_;
std::shared_ptr<phi::Allocation> d_sorted_keys_;
std::shared_ptr<phi::Allocation> d_sorted_idx_;
std::shared_ptr<phi::Allocation> d_offset_;
std::shared_ptr<phi::Allocation> d_merged_cnts_;
std::shared_ptr<phi::Allocation> d_buf_;
// sage mode batch data
std::vector<std::shared_ptr<phi::Allocation>> inverse_vec_;
std::vector<std::shared_ptr<phi::Allocation>> final_sage_nodes_vec_;
std::vector<int> uniq_instance_vec_;
std::vector<int> total_instance_vec_;
std::vector<std::vector<std::shared_ptr<phi::Allocation>>> graph_edges_vec_;
std::vector<std::vector<std::vector<int>>> edges_split_num_vec_;
int64_t reindex_table_size_;
int sage_batch_count_;
int sage_batch_num_;
int ins_buf_pair_len_;
// size of a d_walk buf
size_t buf_size_;
int repeat_time_;
......@@ -968,11 +1050,23 @@ class GraphDataGenerator {
BufState buf_state_;
int batch_size_;
int slot_num_;
std::vector<int> h_slot_feature_num_map_;
int fea_num_per_node_;
int shuffle_seed_;
int debug_mode_;
std::vector<int> first_node_type_;
std::vector<std::vector<int>> meta_path_;
bool gpu_graph_training_;
bool sage_mode_;
std::vector<int> samples_;
bool epoch_finish_;
std::vector<uint64_t> host_vec_;
std::vector<uint64_t> h_device_keys_len_;
uint64_t h_train_metapath_keys_len_;
uint64_t train_table_cap_;
uint64_t infer_table_cap_;
uint64_t copy_unique_len_;
int total_row_;
size_t infer_node_start_;
size_t infer_node_end_;
};
class DataFeed {
......@@ -1037,11 +1131,14 @@ class DataFeed {
virtual void SetParseLogKey(bool parse_logkey) {}
virtual void SetEnablePvMerge(bool enable_pv_merge) {}
virtual void SetCurrentPhase(int current_phase) {}
virtual void SetDeviceKeys(std::vector<uint64_t>* device_keys, int type) {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
virtual void InitGraphResource() {}
virtual void InitGraphTrainResource() {}
virtual void SetDeviceKeys(std::vector<uint64_t>* device_keys, int type) {
gpu_graph_data_generator_.SetDeviceKeys(device_keys, type);
#endif
}
#endif
virtual void SetGpuGraphMode(int gpu_graph_mode) {
gpu_graph_mode_ = gpu_graph_mode;
}
......@@ -1058,6 +1155,42 @@ class DataFeed {
return ins_content_vec_;
}
virtual int GetCurBatchSize() { return batch_size_; }
virtual int GetGraphPathNum() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
return gpu_graph_data_generator_.GetPathNum();
#else
return 0;
#endif
}
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
virtual const std::vector<uint64_t>* GetHostVec() {
return &(gpu_graph_data_generator_.GetHostVec());
}
virtual void clear_gpu_mem() { gpu_graph_data_generator_.clear_gpu_mem(); }
virtual bool get_epoch_finish() {
return gpu_graph_data_generator_.get_epoch_finish();
}
virtual void ResetPathNum() { gpu_graph_data_generator_.ResetPathNum(); }
virtual void ClearSampleState() {
gpu_graph_data_generator_.ClearSampleState();
}
virtual void ResetEpochFinish() {
gpu_graph_data_generator_.ResetEpochFinish();
}
virtual void DoWalkandSage() {
PADDLE_THROW(platform::errors::Unimplemented(
"This function(DoWalkandSage) is not implemented."));
}
#endif
virtual bool IsTrainMode() { return train_mode_; }
virtual void LoadIntoMemory() {
PADDLE_THROW(platform::errors::Unimplemented(
"This function(LoadIntoMemory) is not implemented."));
......@@ -1132,6 +1265,7 @@ class DataFeed {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
GraphDataGenerator gpu_graph_data_generator_;
#endif
bool train_mode_;
};
// PrivateQueueDataFeed is the base virtual class for ohther DataFeeds.
......@@ -1669,6 +1803,13 @@ class SlotRecordInMemoryDataFeed : public InMemoryDataFeed<SlotRecord> {
const int float_slot_size,
const UsedSlotGpuType* used_slots);
#endif
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
virtual void InitGraphResource(void);
virtual void InitGraphTrainResource(void);
virtual void DoWalkandSage();
#endif
float sample_rate_ = 1.0f;
int use_slot_size_ = 0;
int float_use_slot_size_ = 0;
......
......@@ -38,6 +38,10 @@ message GraphConfig {
optional string first_node_type = 8;
optional string meta_path = 9;
optional bool gpu_graph_training = 10 [ default = true ];
optional bool sage_mode = 11 [ default = false ];
optional string samples = 12;
optional int64 train_table_cap = 13 [ default = 80000 ];
optional int64 infer_table_cap = 14 [ default = 80000 ];
}
message DataFeedDesc {
......
......@@ -36,7 +36,9 @@
#endif
USE_INT_STAT(STAT_total_feasign_num_in_mem);
USE_INT_STAT(STAT_epoch_finish);
DECLARE_bool(graph_get_neighbor_id);
DECLARE_int32(gpugraph_storage_mode);
namespace paddle {
namespace framework {
......@@ -456,80 +458,56 @@ void DatasetImpl<T>::LoadIntoMemory() {
std::vector<std::thread> load_threads;
if (gpu_graph_mode_) {
VLOG(0) << "in gpu_graph_mode";
#ifdef PADDLE_WITH_HETERPS
graph_all_type_total_keys_.clear();
auto gpu_graph_ptr = GraphGpuWrapper::GetInstance();
auto node_to_id = gpu_graph_ptr->feature_to_id;
auto edge_to_id = gpu_graph_ptr->edge_to_id;
graph_all_type_total_keys_.resize(node_to_id.size());
int cnt = 0;
for (auto& iter : node_to_id) {
int node_idx = iter.second;
std::vector<std::vector<uint64_t>> gpu_graph_device_keys;
gpu_graph_ptr->get_all_id(
1, node_idx, thread_num_, &gpu_graph_device_keys);
auto& type_total_key = graph_all_type_total_keys_[cnt];
type_total_key.resize(thread_num_);
for (size_t i = 0; i < gpu_graph_device_keys.size(); i++) {
VLOG(2) << "node type: " << node_idx << ", gpu_graph_device_keys[" << i
<< "] = " << gpu_graph_device_keys[i].size();
for (size_t j = 0; j < gpu_graph_device_keys[i].size(); j++) {
gpu_graph_total_keys_.push_back(gpu_graph_device_keys[i][j]);
type_total_key[i].push_back(gpu_graph_device_keys[i][j]);
}
}
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
for (size_t i = 0; i < readers_.size(); i++) {
readers_[i]->SetGpuGraphMode(gpu_graph_mode_);
}
if (STAT_GET(STAT_epoch_finish) == 1) {
VLOG(0) << "get epoch finish true";
STAT_RESET(STAT_epoch_finish, 0);
for (size_t i = 0; i < readers_.size(); i++) {
readers_[i]->SetDeviceKeys(&type_total_key[i], node_idx);
readers_[i]->SetGpuGraphMode(gpu_graph_mode_);
readers_[i]->ResetPathNum();
readers_[i]->ResetEpochFinish();
}
cnt++;
return;
}
VLOG(2) << "begin add feature_id into gpu_graph_total_keys_ size["
<< gpu_graph_total_keys_.size() << "]";
for (auto& iter : node_to_id) {
std::vector<std::vector<uint64_t>> gpu_graph_device_keys;
int node_idx = iter.second;
gpu_graph_ptr->get_all_feature_ids(
1, node_idx, thread_num_, &gpu_graph_device_keys);
for (size_t i = 0; i < gpu_graph_device_keys.size(); i++) {
VLOG(2) << "begin node type: " << node_idx << ", gpu_graph_device_keys["
<< i << "] = " << gpu_graph_device_keys[i].size();
for (size_t j = 0; j < gpu_graph_device_keys[i].size(); j++) {
gpu_graph_total_keys_.push_back(gpu_graph_device_keys[i][j]);
}
VLOG(2) << "end node type: " << node_idx << ", gpu_graph_device_keys["
<< i << "] = " << gpu_graph_device_keys[i].size();
for (int64_t i = 0; i < thread_num_; ++i) {
load_threads.push_back(std::thread(
&paddle::framework::DataFeed::DoWalkandSage, readers_[i].get()));
}
for (std::thread& t : load_threads) {
t.join();
}
uint64_t node_num = 0;
for (int i = 0; i < thread_num_; i++) {
auto host_vec = readers_[i]->GetHostVec();
node_num += host_vec->size();
}
gpu_graph_total_keys_.reserve(node_num);
for (int i = 0; i < thread_num_; i++) {
auto host_vec = readers_[i]->GetHostVec();
for (size_t j = 0; j < host_vec->size(); j++) {
gpu_graph_total_keys_.push_back((*host_vec)[j]);
}
}
VLOG(2) << "end add feature_id into gpu_graph_total_keys_ size["
<< gpu_graph_total_keys_.size() << "]";
// FIX: trick for iterate edge table
for (auto& iter : edge_to_id) {
int edge_idx = iter.second;
std::vector<std::vector<uint64_t>> gpu_graph_device_keys;
gpu_graph_ptr->get_all_id(
0, edge_idx, thread_num_, &gpu_graph_device_keys);
for (size_t i = 0; i < gpu_graph_device_keys.size(); i++) {
VLOG(1) << "edge type: " << edge_idx << ", gpu_graph_device_keys[" << i
<< "] = " << gpu_graph_device_keys[i].size();
for (size_t j = 0; j < gpu_graph_device_keys[i].size(); j++) {
gpu_graph_total_keys_.push_back(gpu_graph_device_keys[i][j]);
}
if (GetEpochFinish() == true) {
VLOG(0) << "epoch finish, set stat and clear sample stat!";
STAT_RESET(STAT_epoch_finish, 1);
for (size_t i = 0; i < readers_.size(); i++) {
readers_[i]->ClearSampleState();
}
if (FLAGS_graph_get_neighbor_id) {
std::vector<std::vector<uint64_t>> gpu_graph_neighbor_keys;
gpu_graph_ptr->get_all_neighbor_id(
0, edge_idx, thread_num_, &gpu_graph_neighbor_keys);
for (size_t i = 0; i < gpu_graph_neighbor_keys.size(); i++) {
for (size_t k = 0; k < gpu_graph_neighbor_keys[i].size(); k++) {
gpu_graph_total_keys_.push_back(gpu_graph_neighbor_keys[i][k]);
}
}
}
if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) {
for (size_t i = 0; i < readers_.size(); i++) {
readers_[i]->clear_gpu_mem();
}
}
VLOG(2) << "end add edge into gpu_graph_total_keys_ size["
<< gpu_graph_total_keys_.size() << "]";
#endif
} else {
for (int64_t i = 0; i < thread_num_; ++i) {
......@@ -1126,7 +1104,30 @@ void DatasetImpl<T>::DestroyPreLoadReaders() {
template <typename T>
int64_t DatasetImpl<T>::GetMemoryDataSize() {
return input_channel_->Size();
if (gpu_graph_mode_) {
int64_t total_path_num = 0;
for (int i = 0; i < thread_num_; i++) {
total_path_num += readers_[i]->GetGraphPathNum();
}
return total_path_num;
} else {
return input_channel_->Size();
}
}
template <typename T>
bool DatasetImpl<T>::GetEpochFinish() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
bool is_epoch_finish = true;
if (gpu_graph_mode_) {
for (int i = 0; i < thread_num_; i++) {
is_epoch_finish = is_epoch_finish && readers_[i]->get_epoch_finish();
}
}
return is_epoch_finish;
#else
return false;
#endif
}
template <typename T>
......@@ -1783,6 +1784,9 @@ void SlotRecordDataset::CreateReaders() {
readers_[i]->SetParseLogKey(parse_logkey_);
readers_[i]->SetEnablePvMerge(enable_pv_merge_);
readers_[i]->SetCurrentPhase(current_phase_);
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
readers_[i]->InitGraphResource();
#endif
if (input_channel_ != nullptr) {
readers_[i]->SetInputChannel(input_channel_.get());
}
......
......@@ -167,6 +167,10 @@ class Dataset {
virtual void SetGpuGraphMode(int is_graph_mode) = 0;
virtual int GetGpuGraphMode() = 0;
virtual bool GetEpochFinish() = 0;
virtual void SetPassId(uint32_t pass_id) = 0;
virtual uint32_t GetPassID() = 0;
protected:
virtual int ReceiveFromClient(int msg_type,
......@@ -260,11 +264,7 @@ class DatasetImpl : public Dataset {
virtual void DynamicAdjustReadersNum(int thread_num);
virtual void SetFleetSendSleepSeconds(int seconds);
virtual std::vector<std::string> GetSlots();
/* for enable_heterps_
virtual void EnableHeterps(bool enable_heterps) {
enable_heterps_ = enable_heterps;
}
*/
virtual bool GetEpochFinish();
std::vector<paddle::framework::Channel<T>>& GetMultiOutputChannel() {
return multi_output_channel_;
......@@ -280,7 +280,9 @@ class DatasetImpl : public Dataset {
std::vector<uint64_t>& GetGpuGraphTotalKeys() {
return gpu_graph_total_keys_;
}
Channel<T>& GetInputChannelRef() { return input_channel_; }
virtual void SetPassId(uint32_t pass_id) { pass_id_ = pass_id; }
virtual uint32_t GetPassID() { return pass_id_; }
protected:
virtual int ReceiveFromClient(int msg_type,
......@@ -341,9 +343,9 @@ class DatasetImpl : public Dataset {
std::vector<std::string> use_slots_;
bool enable_heterps_ = false;
int gpu_graph_mode_ = 0;
// std::vector<std::vector<int64_t>> gpu_graph_device_keys_;
std::vector<std::vector<std::vector<uint64_t>>> graph_all_type_total_keys_;
std::vector<std::vector<std::vector<uint64_t>>> gpu_graph_type_keys_;
std::vector<uint64_t> gpu_graph_total_keys_;
uint32_t pass_id_ = 0;
};
// use std::vector<MultiSlotType> or Record as data type
......
......@@ -242,6 +242,7 @@ class DeviceWorker {
ChannelWriter<std::string> writer_;
const size_t tensor_iterator_thread_num = 16;
platform::DeviceContext* dev_ctx_ = nullptr;
int thread_num_;
};
class CPUWorkerBase : public DeviceWorker {
......@@ -289,6 +290,7 @@ class HogwildWorker : public CPUWorkerBase {
HogwildWorkerParameter param_;
std::vector<std::string> skip_ops_;
std::map<std::string, int> stat_var_name_map_;
static std::atomic<uint64_t> worker_num_stat_;
};
class DownpourWorker : public HogwildWorker {
......
......@@ -53,5 +53,34 @@ TEST(DisMultiTrainerTest, test1) {
tmp1->Finalize();
#endif
}
TEST(DisMultiTrainerTest, testforgpugraph) {
#ifdef _LINUX
TrainerDesc t;
t.set_class_name("MultiTrainer");
t.set_device_worker_name("HogwildWorker");
t.set_thread_num(1);
auto* m = t.mutable_downpour_param()->add_program_config();
m->set_program_id("123");
std::string str;
str += "name: \"MultiSlotDataFeed\"\nbatch_size: 2\nmulti_slot_desc {\n";
str += "slots {\nname: \"words\"\ntype: \"uint64\"\nis_dense: false\n";
str += "is_used: true\n}\nslots {\nname: \"label\"\ntype: \"uint64\"\n";
str += "is_dense: false\nis_used: true\n}\n}\n";
std::shared_ptr<MultiSlotDataset> dataset =
std::make_shared<MultiSlotDataset>();
dataset->SetFileList(std::vector<std::string>());
dataset->SetThreadNum(1);
dataset->SetTrainerNum(1);
dataset->SetDataFeedDesc(str);
dataset->CreateReaders();
dataset->SetGpuGraphMode(true);
dataset->GetMemoryDataSize();
dataset->SetPassId(2);
dataset->GetPassID();
dataset->GetEpochFinish();
#endif
}
} // namespace framework
} // namespace paddle
......@@ -287,6 +287,7 @@ message CtrAccessorParameter {
optional float delete_after_unseen_days = 8 [ default = 30 ];
optional int32 ssd_unseenday_threshold = 9 [ default = 1 ];
optional bool show_scale = 10 [ default = true ];
repeated float load_filter_slots = 11;
}
message TableAccessorSaveParameter {
......
......@@ -29,7 +29,8 @@ if(WITH_HETERPS)
nv_library(
ps_gpu_wrapper
SRCS ps_gpu_wrapper.cu ps_gpu_wrapper.cc
DEPS heter_ps gloo_wrapper ps_framework_proto ${BRPC_DEPS})
DEPS heter_ps gloo_wrapper ps_framework_proto graph_gpu_wrapper
${BRPC_DEPS})
else()
nv_library(
ps_gpu_wrapper
......
......@@ -352,7 +352,8 @@ void GlooWrapper::Init() {
}
#endif
is_initialized_ = true;
VLOG(3) << "gloo initialized done.";
VLOG(0) << "gloo initialized done, rank=" << rank_ << ", size=" << size_
<< ", store_type=" << store_type_;
}
template std::vector<int64_t> GlooWrapper::AllReduce<int64_t>(
......
......@@ -85,7 +85,9 @@ class HeterContext {
std::vector<std::vector<std::mutex*>> dim_mutex_;
int multi_mf_dim_ = 0;
void* sub_graph_feas = NULL;
uint32_t shard_num_ = 37;
uint16_t pass_id_ = 0;
uint64_t size() {
uint64_t total_size = 0;
for (auto& keys : feature_keys_) {
......
......@@ -524,6 +524,7 @@ class concurrent_unordered_map : public managed {
__forceinline__ __device__ iterator
insert(const value_type& x,
aggregation_type op,
uint64_t* local_count = NULL,
comparison_type keys_equal = key_equal(),
bool precomputed_hash = false,
hash_value_type precomputed_hash_value = 0) {
......@@ -548,7 +549,6 @@ class concurrent_unordered_map : public managed {
const key_type insert_key = x.first;
bool insert_success = false;
size_type counter = 0;
while (false == insert_success) {
if (counter++ >= hashtbl_size) {
......@@ -577,19 +577,20 @@ class concurrent_unordered_map : public managed {
if (keys_equal(unused_key, old_key) || keys_equal(insert_key, old_key)) {
update_existing_value(existing_value, x, op);
insert_success = true;
if (m_enable_collision_stat) {
atomicAdd(&m_insert_times, 1);
if (local_count != NULL && keys_equal(unused_key, old_key)) {
atomicAdd(local_count, 1);
}
break;
}
if (m_enable_collision_stat) {
atomicAdd(&m_insert_collisions, 1);
}
current_index = (current_index + 1) % hashtbl_size;
current_hash_bucket = &(hashtbl_values[current_index]);
}
if (m_enable_collision_stat) {
atomicAdd(&m_insert_times, 1);
atomicAdd(&m_insert_collisions, uint64_t(counter + 1));
}
return iterator(
m_hashtbl_values, m_hashtbl_values + hashtbl_size, current_hash_bucket);
}
......@@ -675,15 +676,13 @@ x.second );
begin_ptr = m_hashtbl_values + m_hashtbl_size;
break;
}
if (m_enable_collision_stat) {
atomicAdd(&m_query_collisions, 1);
}
hash_tbl_idx = (hash_tbl_idx + 1) % m_hashtbl_size;
++counter;
}
if (m_enable_collision_stat) {
atomicAdd(&m_query_times, 1);
atomicAdd(&m_query_collisions, (uint64_t)(counter + 1));
}
return const_iterator(
......
......@@ -55,7 +55,7 @@ template <typename GPUAccessor, template <typename T> class GPUOptimizer>
HeterPs<GPUAccessor, GPUOptimizer>::HeterPs(
size_t capacity,
std::shared_ptr<HeterPsResource> resource,
GPUAccessor& gpu_accessor) {
const GPUAccessor& gpu_accessor) {
comm_ = std::make_shared<HeterComm<FeatureKey, float*, float*, GPUAccessor>>(
capacity, resource);
opt_ = GPUOptimizer<GPUAccessor>(gpu_accessor);
......
......@@ -54,7 +54,7 @@ template <typename GPUAccessor, template <typename T> class GPUOptimizer>
HeterPs<GPUAccessor, GPUOptimizer>::HeterPs(
size_t capacity,
std::shared_ptr<HeterPsResource> resource,
GPUAccessor& gpu_accessor) {
const GPUAccessor& gpu_accessor) {
comm_ = std::make_shared<HeterComm<FeatureKey, float*, float*, GPUAccessor>>(
capacity, resource, gpu_accessor);
opt_ = GPUOptimizer<GPUAccessor>(gpu_accessor);
......@@ -122,8 +122,9 @@ template <typename GPUAccessor, template <typename T> class GPUOptimizer>
void HeterPs<GPUAccessor, GPUOptimizer>::set_nccl_comm_and_size(
const std::vector<ncclComm_t>& inner_comms,
const std::vector<ncclComm_t>& inter_comms,
int comm_size) {
comm_->set_nccl_comm_and_size(inner_comms, inter_comms, comm_size);
int comm_size,
int rank_id) {
comm_->set_nccl_comm_and_size(inner_comms, inter_comms, comm_size, rank_id);
}
template <typename GPUAccessor, template <typename T> class GPUOptimizer>
......
......@@ -32,7 +32,7 @@ class HeterPs : public HeterPsBase {
HeterPs() {}
HeterPs(size_t capacity,
std::shared_ptr<HeterPsResource> resource,
GPUAccessor& gpu_accessor);
const GPUAccessor& gpu_accessor);
virtual ~HeterPs();
HeterPs(const HeterPs&) = delete;
HeterPs& operator=(const HeterPs&) = delete;
......@@ -41,8 +41,6 @@ class HeterPs : public HeterPsBase {
FeatureKey* d_keys,
float* d_vals,
size_t len) override;
// void build_ps(int num, FeatureKey* h_keys, float* h_vals, size_t len,
// size_t chunk_size, int stream_num) override;
void build_ps(int num,
FeatureKey* h_keys,
char* pool,
......@@ -53,7 +51,8 @@ class HeterPs : public HeterPsBase {
#if defined(PADDLE_WITH_CUDA)
void set_nccl_comm_and_size(const std::vector<ncclComm_t>& inner_comms,
const std::vector<ncclComm_t>& inter_comms,
int comm_size) override;
int comm_size,
int rank_id) override;
void set_multi_mf_dim(int multi_mf_dim, int max_mf_dim) override;
#endif
......@@ -79,6 +78,16 @@ class HeterPs : public HeterPsBase {
uint32_t* d_merged_cnts,
bool filter_zero);
#endif
// reset table
void reset_table(const int dev_id,
size_t capacity,
const OptimizerConfig& sgd_config,
const OptimizerConfig& embedx_config,
bool infer_mode) {
comm_->reset_table(dev_id, capacity, sgd_config, embedx_config, infer_mode);
}
void set_mode(bool infer_mode) { comm_->set_mode(infer_mode); }
private:
std::shared_ptr<HeterComm<FeatureKey, float*, float*, GPUAccessor>> comm_;
#if defined(PADDLE_WITH_CUDA)
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册