# -*- coding: UTF-8 -*-
# 作者:huanhuilong
# 标题:Python 生产者消费者
# 描述:实用生产者消费者例子,使用 post 接口投递下载每个图片,下载完退出程序

import threading
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue

from image_download import download_img
from lock_counter import LockCounter


class ImageDownloader:
    """Producer/consumer image downloader backed by a thread pool.

    Producers hand URLs to :meth:`post`; :meth:`start` launches
    ``worker_count`` consumer tasks that pull URLs from a bounded queue and
    pass each one to ``download_img``. After every loop pass a consumer calls
    ``progress(counter.size())``; the first truthy result sets ``event`` and
    all consumers leave their loops.

    ``event`` is a pure stop signal: it is set once, when the work is
    finished, and never cleared. Using it only for that purpose (instead of
    also as a "new item" wake-up) removes the lost-wakeup and premature-exit
    races that a shared set/clear event causes between several consumers.
    """

    # Seconds a consumer blocks on the queue before re-checking the stop event.
    _POLL_INTERVAL = 0.1

    def __init__(self, worker_count, progress) -> None:
        """Create the queue, stop event, counter and thread pool.

        Args:
            worker_count: Number of consumer tasks that :meth:`start` submits.
            progress: Callable invoked with ``counter.size()``; a truthy
                return value means all work is done and consumers must stop.
        """
        # Bounded queue: Queue.put blocks once 1024 URLs are pending.
        self.queue = Queue(1024)
        # Set exactly once, when ``progress`` reports completion.
        self.event = threading.Event()
        self.worker_count = worker_count
        # Only step() and size() are used here; presumably a lock-protected
        # counter (defined in lock_counter, not visible in this file).
        self.counter = LockCounter()
        self.progress = progress
        self.pool = ThreadPoolExecutor(max_workers=self.worker_count+2)

    def __consumer(self):
        """Worker loop: download queued URLs until the stop event is set.

        NOTE(review): an exception raised by ``download_img`` ends this worker;
        the pool stores it in the Future that :meth:`start` discards.
        """
        idle_reported = False
        while not self.event.is_set():
            try:
                # A timed get() is atomic with respect to other consumers
                # (unlike empty() followed by get()) and lets the loop notice
                # the stop event while the queue stays empty.
                url = self.queue.get(timeout=self._POLL_INTERVAL)
            except Empty:
                # Report the idle state once per idle streak, not per poll.
                if not idle_reported:
                    print("[consumer] queue is empty, just wait")
                    idle_reported = True
            else:
                idle_reported = False
                print(f"[consumer] get url:{url}")
                download_img(url)
                self.counter.step()

            if self.progress(self.counter.size()):
                print('done')
                # Tell every other consumer to stop as well.
                self.event.set()
                break

    def post(self, url):
        """Enqueue ``url`` for download; blocks while the queue is full.

        Consumers block on the queue itself, so no separate wake-up signal is
        needed and the stop event is left untouched.
        """
        print(f"[producer] put url:{url}")
        self.queue.put(url)

    def start(self):
        """Submit ``worker_count`` consumer loops to the thread pool."""
        for _ in range(self.worker_count):
            self.pool.submit(self.__consumer)


if __name__ == '__main__':
    # Demo run: download two sample images with three consumer workers.
    urls = [
        "https://img-ask.csdnimg.cn/upload/1623844642974.jpg",
        "https://img-ask.csdn.net/upload/201510/22/1445491909_384819.jpg",
    ]

    def all_downloaded(count):
        """Return True once ``count`` equals the number of sample URLs."""
        return count == len(urls)

    downloader = ImageDownloader(worker_count=3, progress=all_downloaded)
    # Workers are started first; the URLs are then posted one by one.
    downloader.start()
    for image_url in urls:
        downloader.post(image_url)