确保已安装 Python 以及以下依赖项:pandas、openpyxl、requests、fake-useragent 和 tenacity。
把需要下载的图片链接和修改后的名字分别填入 Excel 文件 Down.xlsx 的 "Image Src" 列和 "New Name" 列。
pip install pandas openpyxl requests fake-useragent tenacity
把以下代码保存为 download_images.py 文件:
"""Download the images listed in an Excel workbook and record the outcome.

The workbook must provide an ``Image Src`` column (image URL) and a
``New Name`` column (target file name without extension).  Every image is
saved into a timestamped folder and a copy of the workbook, extended with
``Full Path`` and ``Status`` columns, is written next to the images.
"""
import logging
import os
import random
import time
import urllib.parse
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from datetime import datetime
from pathlib import Path

import pandas as pd
import requests
from fake_useragent import UserAgent
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

# Logging: only errors are recorded, into a file in the working directory.
logging.basicConfig(
    filename='download_errors.log',
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Download directory: every run gets its own timestamped sub-folder.
base_dir = r"D:\Down"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
download_dir = os.path.join(base_dir, f"Task_{timestamp}")
os.makedirs(download_dir, exist_ok=True)

# Hard-coded path of the input workbook.
excel_file = r"C:\Users\XXX\Downloads\Down.xlsx"

# Source of randomized User-Agent strings, one per request.
ua = UserAgent()

# Optional proxy settings (uncomment and configure if a proxy is required,
# then pass ``proxies=proxies`` to ``session.get``).
# proxies = {
#     'http': 'http://your_proxy:port',
#     'https': 'https://your_proxy:port'
# }

# Upper bound on simultaneously running downloads.
MAX_WORKERS = 10
# Per-request timeout in seconds.
REQUEST_TIMEOUT = 10


def _is_blank(value):
    """Return True for an empty cell: NaN/None or whitespace-only text."""
    # pandas reads empty Excel cells as NaN, which is truthy, so a plain
    # ``not value`` check would let them through.
    return bool(pd.isna(value)) or not str(value).strip()


@retry(
    retry=retry_if_exception_type(requests.exceptions.RequestException),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True,
)
def _fetch_image(session, image_url):
    """GET ``image_url``, retrying up to three times on network errors.

    The retry decorator lives here, on a function that lets
    ``RequestException`` propagate, because a retry wrapper around a
    function that swallows its own exceptions never fires.  With
    ``reraise=True`` the last ``RequestException`` is raised unchanged once
    the attempts are exhausted.
    """
    headers = {'User-Agent': ua.random}
    # Add proxies=proxies here if a proxy is needed.
    return session.get(image_url, timeout=REQUEST_TIMEOUT, headers=headers)


def download_image(row, index, session):
    """Download one image described by a worksheet row.

    Args:
        row: Mapping with the keys ``'Image Src'`` and ``'New Name'``.
        index: Row label, passed through so the caller can match the result.
        session: ``requests.Session`` used to perform the request.

    Returns:
        A tuple ``(index, full_path, status)``.  ``full_path`` is the saved
        file path on success and ``''`` otherwise; ``status`` is
        ``'Success'``, ``'Missing URL or Name'``, ``'HTTP <code>'`` or
        ``'Error: <message>'``.  Failures are reported through the status
        rather than raised.
    """
    image_url = row['Image Src']
    new_name = row['New Name']

    if _is_blank(image_url) or _is_blank(new_name):
        return index, '', 'Missing URL or Name'

    image_url = str(image_url).strip()
    try:
        # Take the extension from the URL path; fall back to .jpg.
        url_path = urllib.parse.urlparse(image_url).path
        ext = os.path.splitext(url_path)[1] or '.jpg'
        full_path = os.path.join(download_dir, f"{new_name}{ext}")

        response = _fetch_image(session, image_url)
        if response.status_code != 200:
            return index, '', f"HTTP {response.status_code}"

        with open(full_path, 'wb') as f:
            f.write(response.content)
        return index, full_path, 'Success'
    except requests.exceptions.RequestException as e:
        logging.error("Failed to download %s: %s", image_url, e)
        return index, '', f"Error: {e}"
    except Exception as e:
        # Worker boundary: report anything unexpected (e.g. a file-system
        # error) in the Status column instead of losing the row.
        logging.error("Unexpected error for %s: %s", image_url, e)
        return index, '', f"Error: {e}"


def _record_results(df, finished):
    """Copy the outcome of each finished future into the result columns."""
    for future in finished:
        index, full_path, status = future.result()
        df.at[index, 'Full Path'] = full_path
        df.at[index, 'Status'] = status


def process_downloads():
    """Download every image listed in the workbook and save the results.

    Reads ``excel_file``, downloads with at most ``MAX_WORKERS`` requests in
    flight, fills the ``Full Path`` and ``Status`` columns for every row and
    writes the table to ``updated_excel.xlsx`` inside ``download_dir``.
    Prints a message and returns early when the workbook does not exist.
    """
    try:
        df = pd.read_excel(excel_file)
    except FileNotFoundError:
        print(f"Error: Excel file {excel_file} not found.")
        return

    # Make sure all required columns exist.
    for col in ('Image Src', 'New Name', 'Full Path', 'Status'):
        if col not in df.columns:
            df[col] = ''
    # An existing but empty result column is read as float64; cast to object
    # so that string results can be stored in it.
    for col in ('Full Path', 'Status'):
        df[col] = df[col].astype(object)

    # One Session so connections are reused across requests.
    with requests.Session() as session:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            pending = set()
            for index, row in df.iterrows():
                pending.add(
                    executor.submit(download_image, row, index, session)
                )

                # At the concurrency limit: block until at least one task
                # finishes and record it right away, so that no result is
                # dropped when the pending set is trimmed.
                if len(pending) >= MAX_WORKERS:
                    finished, pending = wait(
                        pending, return_when=FIRST_COMPLETED
                    )
                    _record_results(df, finished)

                # Random 1-3 second pause between submissions.
                # NOTE(review): assumed to be a per-submission throttle; the
                # original layout did not show whether the pause belonged
                # inside the branch above -- confirm.
                time.sleep(random.uniform(1, 3))

            # Wait for all remaining tasks to finish.
            print("Waiting for remaining tasks...")
            finished, _ = wait(pending)
            _record_results(df, finished)

    # Save the updated workbook next to the downloaded images.
    output_file = os.path.join(download_dir, "updated_excel.xlsx")
    df.to_excel(output_file, index=False)
    print(f"处理完成,更新后的 Excel 已保存至: {output_file}")


if __name__ == "__main__":
    process_downloads()