worker.collector.base_serial_collector

base_serial_collector

Please Reference ding/worker/collector/base_serial_collector.py for usage

ISerialCollector

class ding.worker.collector.base_serial_collector.ISerialCollector[source]
Overview:

Abstract baseclass for serial collector.

Interfaces:

default_config, reset_env, reset_policy, reset, collect

Property:

envstep

abstract collect(per_collect_target: Any) List[Any][source]
Overview:

Collect the corresponding data according to the specified target and return. There are different definitions in episode and sample mode.

classmethod default_config() easydict.EasyDict[source]
Overview:

Get collector’s default config. We merge collector’s default config with other default configs and user’s config to get the final config.

Return:

cfg: (EasyDict): collector’s default config

abstract property envstep: int
Overview:

Get the total envstep num.

abstract reset(_policy: Optional[collections.namedtuple] = None, _env: Optional[ding.envs.env_manager.base_env_manager.BaseEnvManager] = None) None[source]
Overview:

Reset collector’s policy and environment. Use new policy and environment to collect data.

abstract reset_env(_env: Optional[ding.envs.env_manager.base_env_manager.BaseEnvManager] = None) None[source]
Overview:

Reset collector’s environment. In some case, we need collector use the same policy to collect data in different environments. We can use reset_env to reset the environment.

abstract reset_policy(_policy: Optional[collections.namedtuple] = None) None[source]
Overview:

Reset collector’s policy. In some case, we need collector work in this same environment but use different policy to collect data. We can use reset_policy to reset the policy.

create_serial_collector

Overview:

Create a specific collector instance based on the config.

get_serial_collector_cls

Overview:

Get the specific collector class according to the config.

CachePool

class ding.worker.collector.base_serial_collector.CachePool(name: str, env_num: int, deepcopy: bool = False)[source]
Overview:

CachePool is the repository of cache items.

Interfaces:

__init__, update, __getitem__, reset

__getitem__(idx: int) Any[source]
Overview:

Get item in cache pool.

Arguments:
  • idx (int): The index of the item we need to get.

Return:
  • item (Any): The item we get.

__init__(name: str, env_num: int, deepcopy: bool = False) None[source]
Overview:

Initialization method.

Arguments:
  • name (str): name of cache

  • env_num (int): number of environments

  • deepcopy (bool): whether to deepcopy data

reset(idx: int) None[source]
Overview:

Reset the cache pool.

Arguments:
  • idx (int): The index of the position we need to reset.

update(data: Union[Dict[int, Any], list]) None[source]
Overview:

Update elements in cache pool.

Arguments:
  • data (Dict[int, Any]): A dict containing update index-value pairs. Key is index in cache pool, and value is the new element.

TrajBuffer

class ding.worker.collector.base_serial_collector.TrajBuffer(maxlen: int, *args, **kwargs)[source]
Overview:

TrajBuffer is used to store traj_len pieces of transitions.

Interfaces:

__init__, append

__init__(maxlen: int, *args, **kwargs) None[source]
Overview:

Initialization trajBuffer.

Arguments:
  • maxlen (int): The maximum length of trajectory buffer.

append(data: Any) None[source]
Overview:

Append data to trajBuffer.

to_tensor_transitions

Overview:

transitions collected data to tensor.

Argument:
  • data (List[Dict[str, Any]]): the data that will be transited to tensor.

Return:
  • data (List[Dict[str, Any]]): the data that can be transited to tensor.

Tip

In order to save memory, If there are next_obs in the passed data, we do special treatment on next_obs so that the next_obs of each state in the data fragment is the next state’s obs and the next_obs of the last state is its own next_obs, and we make transform_scalar is False.