# 可迭代式 DataPipes > 原文:[`pytorch.org/data/beta/torchdata.datapipes.iter.html`](https://pytorch.org/data/beta/torchdata.datapipes.iter.html) 可迭代式数据集是 IterableDataset 子类的实例,实现了`__iter__()`协议,并表示数据样本的可迭代。这种类型的数据集特别适用于随机读取昂贵甚至不太可能的情况,批量大小取决于获取的数据。 例如,这样一个数据集,当调用`iter(iterdatapipe)`时,可以返回从数据库、远程服务器或实时生成的日志中读取的数据流。 这是`torch`中`IterableDataset`的更新版本。 ```py class torchdata.datapipes.iter.IterDataPipe(*args, **kwds) ``` 可迭代式 DataPipe。 所有表示数据样本可迭代的 DataPipes 都应该是这样的子类。当数据来自流时,或者样本数量太大无法全部放入内存时,这种 DataPipes 风格特别有用。`IterDataPipe`是惰性初始化的,只有在对`IterDataPipe`的迭代器调用`next()`时才计算其元素。 所有子类应该重写`__iter__()`,它将返回此 DataPipe 中样本的迭代器。调用`IterDataPipe`的`__iter__`会自动调用其方法`reset()`,默认情况下不执行任何操作。当编写自定义`IterDataPipe`时,用户应该根据需要重写`reset()`。常见用法包括重置自定义`IterDataPipe`中的缓冲区、指针和各种状态变量。 注意 每次只能有一个迭代器对`IterDataPipe`有效,创建第二个迭代器将使第一个迭代器无效。这个约束是必要的,因为一些`IterDataPipe`具有内部缓冲区,如果有多个迭代器,其状态可能会变得无效。下面的代码示例详细介绍了这个约束在实践中的样子。如果您对这个约束有任何反馈,请参阅[GitHub IterDataPipe Single Iterator Issue](https://github.com/pytorch/data/issues/45)。 这些 DataPipes 可以以两种方式调用,使用类构造函数或将它们的函数形式应用于现有的`IterDataPipe`(推荐,大多数但不是所有 DataPipes 都可用)。您可以将多个 IterDataPipe 链接在一起,形成一个连续执行多个操作的管道。 注意 当子类与`DataLoader`一起使用时,DataPipe 中的每个项目将从`DataLoader`迭代器中产生。当`num_workers > 0`时,每个工作进程将拥有 DataPipe 对象的不同副本,因此通常希望配置每个副本独立以避免从工作进程返回重复数据。`get_worker_info()`在工作进程中调用时,返回有关工作进程的信息。它可以在数据集的`__iter__()`方法或`DataLoader`的`worker_init_fn`选项中使用,以修改每个副本的行为。 示例 通用用法: ```py >>> # xdoctest: +SKIP >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended) >>> list(map_dp_1) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list(map_dp_2) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0) >>> list(filter_dp) [2, 4, 6, 8, 10] ``` 单迭代器约束示例: ```py >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> source_dp = IterableWrapper(range(10)) >>> it1 = iter(source_dp) >>> list(it1) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> it1 = iter(source_dp) >>> it2 = iter(source_dp) # The creation of a new iterator invalidates `it1` >>> next(it2) 0 >>> next(it1) # Further usage of `it1` will raise a `RunTimeError` ``` 我们有不同类型的 Iterable DataPipes: 1. 存档 - 打开和解压不同格式的存档文件。 1. 增强 - 增强您的样本(例如添加索引,或无限循环)。 1. 组合 - 执行组合操作(例如采样、洗牌)。 1. 组合/拆分 - 通过组合多个 DataPipes 或将一个 DataPipe 拆分为多个来进行交互。 1. 分组 - 在 DataPipe 中对样本进行分组 1. IO - 与文件系统或远程服务器交互(例如下载、打开、保存文件,并列出目录中的文件)。 1. 映射 - 将给定函数应用于 DataPipe 中的每个元素。 1. 其他 - 执行各种操作。 1. 选择 - 在 DataPipe 中选择特定样本。 1. 文本 - 解析、读取和转换文本文件和数据 ## 存档 DataPipes 这些 DataPipes 帮助打开和解压不同格式的存档文件。 | `Bz2FileLoader` | 从包含路径名和 bz2 二进制流元组的可迭代 DataPipe 中解压缩 bz2 二进制流,并产生一个路径名和提取的二进制流元组(函数名:`load_from_bz2`)。 | | --- | --- | | `Decompressor` | 接受路径和压缩数据流的元组,并返回路径和解压缩数据流的元组(函数名:`decompress`)。 | | `RarArchiveLoader` | 从包含路径名和 rar 二进制流元组的输入可迭代 DataPipe 中解压缩 rar 二进制流,并产生一个路径名和提取的二进制流元组(函数名:`load_from_rar`)。 | | `TarArchiveLoader` | 从包含路径名和 tar 二进制流元组的可迭代 DataPipe 中打开/解压缩 tar 二进制流,并产生一个路径名和提取的二进制流元组(函数名:`load_from_tar`)。 | | `TFRecordLoader` | 从包含路径名和 tfrecord 二进制流元组的可迭代 DataPipe 中打开/解压缩 tfrecord 二进制流,并产生存储的记录(函数名:`load_from_tfrecord`)。 | | `WebDataset` | 接受(路径,数据)元组流的可迭代 DataPipe,通常表示 tar 存档的路径名和文件(函数名:`webdataset`)。 | | `XzFileLoader` | 从包含路径名和 xz 二进制流元组的可迭代 DataPipe 中解压缩 xz(lzma)二进制流,并产生一个路径名和提取的二进制流元组(函数名:`load_from_xz`)。 | | `ZipArchiveLoader` | 从包含路径名和 zip 二进制流元组的可迭代 DataPipe 中打开/解压缩 zip 二进制流,并产生一个路径名和提取的二进制流元组(函数名:`load_from_zip`)。 | ## 增强 DataPipes 这些 DataPipes 有助于增强您的样本。 | `Cycler` | 默认情况下永久循环指定的输入,或者循环指定次数(函数名:`cycle`)。 | | --- | --- | | `Enumerator` | 通过枚举向现有 DataPipe 添加索引,默认情况下索引从 0 开始(函数名:`enumerate`)。 | | `IndexAdder` | 向现有可迭代 DataPipe 添加索引(函数名:`add_index`)。 | | `Repeater` | 在移动到下一个元素之前,重复为源 DataPipe 的每个元素指定次数的输出(功能名称:`repeat`)。 | ## 组合式 DataPipes 这些 DataPipes 有助于执行组合操作。 | `InBatchShuffler` | 对来自先前 DataPipe 的每个小批次进行洗牌(功能名称:`in_batch_shuffle`)。 | | --- | --- | | `Sampler` | 使用提供的`Sampler`生成样本元素(默认为`SequentialSampler`)。 | | `Shuffler` | 使用缓冲区对输入 DataPipe 进行洗牌(功能名称:`shuffle`)。 | ## 组合/拆分 DataPipes 这些通常涉及多个 DataPipes,将它们组合在一起或将一个拆分为多个。 | `Concater` | 连接多个 Iterable DataPipes(功能名称:`concat`)。 | | --- | --- | | `Demultiplexer` | 使用给定的分类函数将输入 DataPipe 拆分为多个子 DataPipes(功能名称:`demux`)。 | | `Forker` | 创建相同 Iterable DataPipe 的多个实例(功能名称:`fork`)。 | | `IterKeyZipper` | 根据匹配的键将两个 IterDataPipes 一起压缩(功能名称:`zip_with_iter`)。 | | `MapKeyZipper` | 将源 IterDataPipe 的项目与 MapDataPipe 的项目结合(功能名称:`zip_with_map`)。 | | `Multiplexer` | 从输入的每个 Iterable DataPipe 中一次产生一个元素(功能名称:`mux`)。 | | `MultiplexerLongest` | 从输入的每个 Iterable DataPipe 中一次产生一个元素(功能名称:`mux_longest`)。 | | `RoundRobinDemultiplexer` | 按照轮询顺序将输入 DataPipe 拆分为多个子 DataPipes(功能名称:`round_robin_demux`)。 | | `SampleMultiplexer` | 接受一个(IterDataPipe, Weight)字典,并根据权重从这些 DataPipes 中进行采样生成项目。 | | `UnZipper` | 接受一个序列的 DataPipe,解压每个序列,并根据序列中的位置将元素分别返回到不同的 DataPipes 中(功能名称:`unzip`)。 | | `Zipper` | 从每个输入 DataPipe 中聚合元素为元组(功能名称:`zip`)。 | | `ZipperLongest` | 从每个输入 DataPipe 中聚合元素为元组(功能名称:`zip_longest`)。 | ## Grouping DataPipes 这些 DataPipes 让您在 DataPipe 中对样本进行分组。 | `Batcher` | 创建数据的小批次(功能名称:`batch`)。 | | --- | --- | | `BucketBatcher` | 从排序的桶中创建数据的小批次(功能名称:`bucketbatch`)。 | | `Collator` | 通过自定义整理函数将 DataPipe 中的样本整理为张量(功能名称:`collate`)。 | | `Grouper` | 通过从`group_key_fn`生成的键对来自输入 IterDataPipe 的数据进行分组,并在定义了`group_size`的情况下生成具有最大批量大小的`DataChunk`(功能名称:`groupby`)。 | | `MaxTokenBucketizer` | 从具有限制大小的最小堆中创建数据的小批次,并且每个批次中由`len_fn`返回的样本的总长度将受到`max_token_count`的限制(功能名称:`max_token_bucketize`)。 | | `UnBatcher` | 撤消数据的批处理(功能名称:`unbatch`)。 | ## IO DataPipes 这些 DataPipes 有助于与文件系统或远程服务器进行交互(例如下载、打开、保存文件以及列出目录中的文件)。 | `AISFileLister` | 可迭代的 Datapipe,列出具有给定 URL 前缀的 AIStore 后端的文件(功能名称:`list_files_by_ais`)。 | | --- | --- | | `AISFileLoader` | 可迭代的 DataPipe,从具有给定 URL 的 AIStore 中加载文件(功能名称:`load_files_by_ais`)。 | | `FSSpecFileLister` | 列出提供的`root`路径名或 URL 的目录内容,并为目录中的每个文件生成完整的路径名或 URL(功能名称:`list_files_by_fsspec`)。 | | `FSSpecFileOpener` | 从包含 fsspec 路径的输入 datapipe 中打开文件,并生成路径名和打开的文件流的元组(功能名称:`open_files_by_fsspec`)。 | | `FSSpecSaver` | 接收元数据和数据元组的 DataPipe,将数据保存到目标路径(由 filepath_fn 和元数据生成),并产生结果的 fsspec 路径(函数名:`save_by_fsspec`)。 | | `FileLister` | 给定根目录的路径,产生根目录中文件的路径名(路径+文件名)。 | | `FileOpener` | 给定路径名,打开文件并以元组形式产生路径名和文件流(函数名:`open_files`)。 | | `GDriveReader` | 接收指向 GDrive 文件的 URL,并产生文件名和 IO 流的元组(函数名:`read_from_gdrive`)。 | | `HttpReader` | 接收文件 URL(指向文件的 HTTP URL),并产生文件 URL 和 IO 流的元组(函数名:`read_from_http`)。 | | `HuggingFaceHubReader` | 接收数据集名称并返回一个可迭代的 HuggingFace 数据集。 | | `IoPathFileLister` | 列出提供的`root`路径名或 URL 的目录内容,并为目录中的每个文件产生完整的路径名或 URL(函数名:`list_files_by_iopath`)。 | | `IoPathFileOpener` | 从包含路径名或 URL 的输入 datapipe 中打开文件,并产生路径名和已打开文件流的元组(函数名:`open_files_by_iopath`)。 | | `IoPathSaver` | 接收元数据和数据元组的 DataPipe,将数据保存到由`filepath_fn`和元数据生成的目标路径,并以 iopath 格式(函数名:`save_by_iopath`)产生结果路径。 | | `OnlineReader` | 接收文件 URL(可以是指向文件的 HTTP URL 或指向 GDrive 文件的 URL),并产生文件 URL 和 IO 流的元组(函数名:`read_from_remote`)。 | | `ParquetDataFrameLoader` | 接收 Parquet 文件的路径,并为 Parquet 文件中的每个行组返回一个 TorchArrow DataFrame(函数名:`load_parquet_as_df`)。 | | `S3FileLister` | 可迭代的 DataPipe,列出具有给定前缀的 Amazon S3 文件 URL(函数名:`list_files_by_s3`)。 | | `S3FileLoader` | 可迭代的 DataPipe,从给定的 S3 URL 加载 Amazon S3 文件(函数名:`load_files_by_s3`)。 | | `Saver` | 接收元数据和数据元组的 DataPipe,将数据保存到由`filepath_fn`生成的目标路径和元数据中,并在本地文件系统上生成文件路径(函数名称:`save_to_disk`)。 | ## Mapping DataPipes 这些 DataPipes 将给定的函数应用于 DataPipe 中的每个元素。 | `BatchAsyncMapper` | 将源 DataPipe 中的元素组合成批次,并对每个批次中的每个元素并发地应用协程函数,然后将输出展平为单个、非嵌套的 IterDataPipe(函数名称:`async_map_batches`)。 | | --- | --- | | `BatchMapper` | 将源 DataPipe 中的元素组合成批次,并对每个批次应用函数,然后将输出展平为单个、非嵌套的 IterDataPipe(函数名称:`map_batches`)。 | | `FlatMapper` | 对源 DataPipe 中的每个项目应用函数,然后将输出展平为单个、非嵌套的 IterDataPipe(函数名称:`flatmap`)。 | | `Mapper` | 对源 DataPipe 中的每个项目应用函数(函数名称:`map`)。 | | `ShuffledFlatMapper` | 对源 DataPipe 中的每个项目应用函数,然后将返回的可迭代对象收集到缓冲区中,然后,在每次迭代时,随机选择缓冲区中的一个可迭代对象,并从该可迭代对象中产生一个项目(函数名称:`shuffled_flatmap`)。 | | `ThreadPoolMapper` | 并发地对源 DataPipe 中的每个项目应用函数,使用`ThreadPoolExecutor`(函数名称:`threadpool_map`)。 | ## 其他 DataPipes 一组具有不同功能的杂项 DataPipes。 | `DataFrameMaker` | 获取数据行,将其中一些数据批量处理并创建 TorchArrow 数据框(函数名称:`dataframe`)。 | | --- | --- | | `EndOnDiskCacheHolder` | 指示先前 DataPipe 的结果将保存在由`filepath_fn`指定的本地文件中(函数名称:`end_caching`)。 | | `FullSync` | 同步分布式进程中的数据,以防止训练过程中出现挂起,这是由不均匀的分片数据引起的(函数名称:`fullsync`)。 | | `HashChecker` | 计算并检查每个文件的哈希值,从文件名和数据/流的元组输入 DataPipe 中(函数名称:`check_hash`)。 | | `InMemoryCacheHolder` | 将来自源 DataPipe 的元素存储在内存中,如果指定了大小限制,则存储在内存中(功能名称:`in_memory_cache`)。 | | `IterableWrapper` | 包装可迭代对象以创建 IterDataPipe。 | | `LengthSetter` | 设置 DataPipe 的长度属性,该属性由`__len__`返回(功能名称:`set_length`)。 | | `MapToIterConverter` | 将`MapDataPipe`转换为`IterDataPipe`(功能名称:`to_iter_datapipe`)。 | | `OnDiskCacheHolder` | 将多个 DataPipe 操作的输出缓存到本地文件中,这些操作通常是性能瓶颈,如下载、解压等(功能名称:`on_disk_cache`)。 | | `PinMemory` | 预取源 DataPipe 中的一个元素并将其移动到固定内存中(功能名称:`pin_memory`)。 | | `Prefetcher` | 预取来自源 DataPipe 的元素并将它们放入缓冲区(功能名称:`prefetch`)。 | | `RandomSplitter` | 将源 DataPipe 中的样本随机分成组(功能名称:`random_split`)。 | | `ShardExpander` | 将传入的分片字符串扩展为分片。 | | `ShardingFilter` | 允许 DataPipe 被分片的包装器(功能名称:`sharding_filter`)。 | | `ShardingRoundRobinDispatcher` | 包装器,指示`DataPipe`图的前一部分是不可复制的,并且在使用多处理时将以循环方式将数据分发到工作进程中(功能名称:`sharding_round_robin_dispatcher`)。 | ## 选择 DataPipes 这些 DataPipes 帮助您在 DataPipe 中选择特定的样本。 | `Filter` | 根据输入的`filter_fn`从源 datapipe 中过滤出元素(功能名称:`filter`)。 | | --- | --- | | `Header` | 从源 DataPipe 中产生元素,直到达到指定的限制为止(功能名称:`header`)。 | | `Dropper` | 通过其索引在输入 DataPipe 中删除列/元素(功能名称:`drop`)。 | | `Slicer` | 通过起始/停止/步长或索引返回输入 DataPipe 中元素的切片(功能名称:`slice`)。 | | `Flattener` | 根据提供的索引,在每个样本/元素级别返回输入 DataPipe 的扁平副本(功能名称:`flatten`)。 | ## 文本 DataPipes 这些 DataPipes 帮助您解析、读取和转换文本文件和数据。 | `CSVDictParser` | 接受由文件名和 CSV 数据流元组组成的 DataPipe,逐行读取并返回 CSV 文件中的内容(功能名称:`parse_csv_as_dict`)。 | | --- | --- | | `CSVParser` | 接受由文件名和 CSV 数据流元组组成的 DataPipe,逐行读取并返回 CSV 文件中的内容(功能名称:`parse_csv`)。 | | `JsonParser` | 从 JSON 数据流中读取并产生一个由文件名和 JSON 数据组成的元组(功能名称:`parse_json_files`)。 | | `LineReader` | 接受由文件名和字符串数据流元组组成的 DataPipe,对流中的每一行,产生一个由文件名和该行组成的元组(功能名称:`readlines`)。 | | `ParagraphAggregator` | 将同一文件中的文本行聚合成一个段落(功能名称:`lines_to_paragraphs`)。 | | `RoutedDecoder` | 从输入 DataPipe 解码二进制流,以元组形式产生路径名和解码数据(功能名称:`routed_decode`)。 | | `Rows2Columnar` | 接受一个带有数据批次的输入 DataPipe,逐批处理并为每批产生一个字典,其中`column_names`作为键,每行对应值的列表作为值(功能名称:`rows2columnar`)。 | | `StreamReader` | 给定 IO 流及其标签名称,以元组形式产生带有标签名称的字节(功能名称:`read_from_stream`)。 |