worker.collector.comm

base_comm_collector

Please refer to ding/worker/collector/comm/base_comm_collector.py for usage.

BaseCommCollector

class ding.worker.collector.comm.base_comm_collector.BaseCommCollector(cfg)
Overview:

Abstract base class for comm collectors.

Interfaces:

__init__, get_policy_update_info, send_metadata, send_stepdata, start, close, _create_collector

Property:

collector_uid

__init__(cfg)
Overview:

Initialization method.

Arguments:
  • cfg (EasyDict): Config dict

_create_collector(task_info: dict) → ding.worker.collector.base_parallel_collector.BaseParallelCollector
Overview:

Receive task_info passed from coordinator and create a collector.

Arguments:
  • task_info (dict): Task info dict from coordinator. Should be like

Returns:
  • collector (BaseParallelCollector): Created base collector.

Note:

Three methods (‘send_metadata’, ‘send_stepdata’, ‘get_policy_update_info’) and the policy are set. They are set here rather than in the base collector because they depend heavily on the specific task: only after task info is passed from the coordinator to the comm collector through the collector slave can they be determined and initialized.
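The late binding described in this note can be sketched in a few lines. This is a minimal illustration, not the ding implementation: SketchCollector, SketchComm, and bind_comm_methods are hypothetical names, and the task_info contents are made up.

```python
from typing import Any, Dict, List, Tuple


class SketchCollector:
    """Hypothetical stand-in for a parallel collector; not the ding class."""

    def __init__(self, task_info: Dict[str, Any]) -> None:
        self.task_info = task_info
        self.policy = None


class SketchComm:
    """Hypothetical comm object that records what it is asked to send."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    def send_metadata(self, metadata: Any) -> None:
        self.sent.append(("meta", metadata))

    def send_stepdata(self, stepdata: Any) -> None:
        self.sent.append(("step", stepdata))

    def get_policy_update_info(self, path: str) -> Any:
        return {"path": path}


def bind_comm_methods(collector: SketchCollector, comm: SketchComm, policy: Any) -> SketchCollector:
    # Attach the task-dependent callables only once the task is known.
    for name in ("send_metadata", "send_stepdata", "get_policy_update_info"):
        setattr(collector, name, getattr(comm, name))
    collector.policy = policy
    return collector


comm = SketchComm()
collector = bind_comm_methods(SketchCollector({"task_id": "t0"}), comm, policy="dummy_policy")
collector.send_metadata({"n": 1})
```

After binding, the collector calls send_metadata as if it were its own method, while the comm object decides where the data actually goes.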

close() → None
Overview:

Close comm collector.

abstract get_policy_update_info(path: str) → Any
Overview:

Get policy information in corresponding path. Will be registered in base collector.

Arguments:
  • path (str): path to policy update information.

abstract send_metadata(metadata: Any) → None
Overview:

Store metadata in a queue. The callback function “deal_with_collector_data” in the collector slave retrieves it, and it is then sent to the coordinator. Will be registered in base collector.

Arguments:
  • metadata (Any): meta data.

abstract send_stepdata(stepdata: Any) → None
Overview:

Save step data in corresponding path. Will be registered in base collector.

Arguments:
  • stepdata (Any): step data.

start() → None
Overview:

Start comm collector.
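To illustrate how the abstract interface above constrains subclasses, here is a small self-contained sketch. It mirrors only the documented method names; SketchBaseComm, InMemoryComm, and the _running flag are hypothetical and are not taken from the ding source.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class SketchBaseComm(ABC):
    """Hypothetical mirror of the documented interface; not ding's BaseCommCollector."""

    def __init__(self, cfg: dict) -> None:
        self._cfg = cfg
        self._running = False  # assumed flag, for illustration only

    def start(self) -> None:
        self._running = True

    def close(self) -> None:
        self._running = False

    @abstractmethod
    def get_policy_update_info(self, path: str) -> Any:
        raise NotImplementedError

    @abstractmethod
    def send_metadata(self, metadata: Any) -> None:
        raise NotImplementedError

    @abstractmethod
    def send_stepdata(self, stepdata: Any) -> None:
        raise NotImplementedError


class InMemoryComm(SketchBaseComm):
    """Toy subclass that keeps everything in Python lists and dicts."""

    def __init__(self, cfg: dict) -> None:
        super().__init__(cfg)
        self.metadata: List[Any] = []
        self.stepdata: List[Any] = []
        self.policy_store = {"latest": {"version": 0}}

    def get_policy_update_info(self, path: str) -> Any:
        return self.policy_store[path]

    def send_metadata(self, metadata: Any) -> None:
        self.metadata.append(metadata)

    def send_stepdata(self, stepdata: Any) -> None:
        self.stepdata.append(stepdata)


comm = InMemoryComm({})
comm.start()
comm.send_stepdata({"obs": 0})
comm.close()
```

Instantiating SketchBaseComm directly raises TypeError, because the three abstract methods have no concrete implementation there.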

create_comm_collector

Overview:

Given the key (comm_collector_name), create a new comm collector instance if the key is registered in comm_map; otherwise raise a KeyError. In other words, a derived comm collector must be registered first; only then can create_comm_collector be called to get an instance.

Arguments:
  • cfg (EasyDict): Collector config. Necessary keys: [import_names, comm_collector_type].

Returns:
  • collector (BaseCommCollector): The created new comm collector, should be an instance of one of comm_map’s values.
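The register-then-create pattern described above might look roughly like the following. This is a sketch under stated assumptions: the local comm_map dict, register_comm_collector, create_from_registry, and DummyComm are hypothetical names, and the sketch carries import_names in the config without using it.

```python
from typing import Any, Dict

# Hypothetical local registry that mirrors the idea of comm_map.
comm_map: Dict[str, type] = {}


def register_comm_collector(name: str, cls: type) -> None:
    comm_map[name] = cls


def create_from_registry(cfg: Dict[str, Any]) -> Any:
    comm_type = cfg["comm_collector_type"]
    if comm_type not in comm_map:
        # Unregistered types are rejected, as the documentation describes.
        raise KeyError(f"comm collector type not registered: {comm_type}")
    return comm_map[comm_type](cfg)


class DummyComm:
    """Toy comm collector used only to exercise the registry."""

    def __init__(self, cfg: Dict[str, Any]) -> None:
        self.cfg = cfg


register_comm_collector("dummy", DummyComm)
instance = create_from_registry({"comm_collector_type": "dummy", "import_names": []})
```

Calling create_from_registry with a type that was never registered raises KeyError, which matches the documented failure mode.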

flask_fs_collector

Please refer to ding/worker/collector/comm/flask_fs_collector.py for usage.

CollectorSlave

class ding.worker.collector.comm.flask_fs_collector.CollectorSlave(*args, callback_fn: Dict[str, Callable], **kwargs)
Overview:

A slave whose master is the coordinator. Used to pass messages between the comm collector and the coordinator.

Interfaces:

__init__, _process_task

__init__(*args, callback_fn: Dict[str, Callable], **kwargs) → None
Overview:

Initialize the slave and additionally store the callback functions. The callback functions are methods of the comm collector.

_process_task(task: dict) → Union[dict, ding.interaction.slave.action.TaskFail]
Overview:

Process a task according to the input task info dict, which is passed in by the master coordinator. For each type of task, refer to the corresponding callback function in the comm collector for details.

Arguments:
  • task (dict): Task dict. Must contain key “name”.

Returns:
  • result (Union[dict, TaskFail]): Task result dict, or task fail exception.
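Dispatching on the task's “name” key to a callback can be sketched as follows. This is only an illustration: SketchTaskFail stands in for ding's TaskFail, the task name "resource" is made up, and the sketch returns the failure object to mirror the documented return type, although the real class may signal failure differently.

```python
from typing import Any, Callable, Dict, Union


class SketchTaskFail(Exception):
    """Hypothetical stand-in for a task-failure result; not ding's TaskFail."""


def process_task(
    task: Dict[str, Any], callback_fn: Dict[str, Callable[[Dict[str, Any]], Any]]
) -> Union[dict, SketchTaskFail]:
    # The task dict must carry a "name" key that selects the callback.
    name = task.get("name")
    handler = callback_fn.get(name)
    if handler is None:
        return SketchTaskFail(f"unknown task name: {name!r}")
    return {"name": name, "result": handler(task)}


# Hypothetical callback table; in the documented design these are comm collector methods.
callbacks = {"resource": lambda task: {"cpu": 1, "gpu": 0}}
ok = process_task({"name": "resource"}, callbacks)
bad = process_task({"name": "nope"}, callbacks)
```

Keeping the callbacks in a dict means the slave needs no knowledge of the comm collector beyond the names it can dispatch to.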

FlaskFileSystemCollector

class ding.worker.collector.comm.flask_fs_collector.FlaskFileSystemCollector(cfg: dict)
Overview:

An implementation of BaseCommCollector, using Flask and the file system.

Interfaces:

__init__, deal_with_resource, deal_with_collector_start, deal_with_collector_data, deal_with_collector_close, get_policy_update_info, send_stepdata, send_metadata, start, close

__init__(cfg: dict) → None
Overview:

Initialization method.

Arguments:
  • cfg (EasyDict): Config dict

close() → None
Overview:

Close comm collector itself and the collector slave.

deal_with_collector_data() → dict
Overview:

Callback function in CollectorSlave. Get data sample dict from _metadata_queue, which will be sent to coordinator afterwards.

Returns:
  • data (dict): Data sample dict.

deal_with_collector_start(task_info: dict) → None
Overview:

Callback function in CollectorSlave. Create a collector and start a collector thread of the created one.

Arguments:
  • task_info (dict): Task info dict.

Note:

In the _create_collector method of the base class BaseCommCollector, three methods (‘send_metadata’, ‘send_stepdata’, ‘get_policy_update_info’) and the policy are set. Refer to it for details.

deal_with_resource() → dict
Overview:

Callback function in CollectorSlave. Return how many resources are needed to start current collector.

Returns:
  • resource (dict): Resource info dict, including [‘gpu’, ‘cpu’].

get_policy_update_info(path: str) → dict
Overview:

Get policy information in corresponding path.

Arguments:
  • path (str): path to policy update information.

send_metadata(metadata: dict) → None
Overview:

Store metadata dict in queue, which will be retrieved by callback function “deal_with_collector_data” in collector slave, then will be sent to coordinator.

Arguments:
  • metadata (dict): Metadata dict.
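The hand-off between send_metadata and deal_with_collector_data is a producer/consumer queue. The sketch below shows that idea with the standard library only. MetadataBuffer, the queue size, and the timeout behaviour (returning None when the queue is empty) are assumptions for illustration, not the ding implementation.

```python
import queue
from typing import Any, Dict, Optional


class MetadataBuffer:
    """Hypothetical holder for a _metadata_queue shared by producer and consumer."""

    def __init__(self, maxsize: int = 8) -> None:
        self._metadata_queue = queue.Queue(maxsize=maxsize)

    def send_metadata(self, metadata: Dict[str, Any]) -> None:
        # Producer side: called by the collector when a data sample is ready.
        self._metadata_queue.put(metadata)

    def deal_with_collector_data(self, timeout: float = 0.1) -> Optional[Dict[str, Any]]:
        # Consumer side: called from the slave callback; None means nothing was queued in time.
        try:
            return self._metadata_queue.get(timeout=timeout)
        except queue.Empty:
            return None


buf = MetadataBuffer()
buf.send_metadata({"data_id": "sample_0", "length": 16})
first = buf.deal_with_collector_data()
second = buf.deal_with_collector_data(timeout=0.01)
```

queue.Queue is thread-safe, so the collector thread and the slave's request handler can share the buffer without additional locking.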

send_stepdata(path: str, stepdata: list) → None
Overview:

Save collector’s step data in corresponding path.

Arguments:
  • path (str): Path to save data.
  • stepdata (list): Data of one step.
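Saving step data to a path on a shared file system could be done along these lines. The serialization format is not stated here, so pickle is an assumption, as are the helper names save_stepdata and load_stepdata and the file name step_0.pkl.

```python
import os
import pickle
import tempfile
from typing import Any, List


def save_stepdata(path: str, stepdata: List[Any]) -> None:
    # Write to a temporary file first, then rename, so readers never see a partial file.
    tmp_path = path + ".tmp"
    with open(tmp_path, "wb") as f:
        pickle.dump(stepdata, f)
    os.replace(tmp_path, path)


def load_stepdata(path: str) -> List[Any]:
    with open(path, "rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as root:
    target = os.path.join(root, "step_0.pkl")
    save_stepdata(target, [{"obs": [0.0, 1.0], "reward": 1.0}])
    restored = load_stepdata(target)
    leftover_tmp = os.path.exists(target + ".tmp")
```

The write-then-rename step matters when another process, such as a learner, polls the same directory and might otherwise read a half-written file.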

start() → None
Overview:

Start comm collector itself and the collector slave.

utils

Please refer to ding/worker/collector/comm/utils.py for usage.

NaiveCollector

class ding.worker.collector.comm.utils.NaiveCollector(*args, prefix='', **kwargs)
Overview:

A slave, whose master is coordinator. Used to pass message between comm collector and coordinator.

Interfaces:

_process_task, _get_timestep

_process_task(task)
Overview:

Process a task according to the input task info dict, which is passed in by the master coordinator. For each type of task, refer to the corresponding callback function in the comm collector for details.

Arguments:
  • task (dict): Task dict. Must contain key “name”.

Returns:
  • result (Union[dict, TaskFail]): Task result dict, or task fail exception.