import email
import imaplib
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from email.header import decode_header

import requests
import xlrd
from bs4 import BeautifulSoup
from selenium import webdriver

# Upper bound (seconds) for each warm-up HTTP request, so that a single
# unresponsive host cannot block a worker thread indefinitely.
_REQUEST_TIMEOUT_SECONDS = 15

# Maximum number of matching messages processed per account in one run.
_MAX_MAILS_PER_ACCOUNT = 200


def _request_tag_urls(soup, tag_name, attr_name):
    """Request every absolute http(s) URL stored in *attr_name* of *tag_name* tags.

    Failures are reported on stdout and never raised, so one broken URL does
    not stop the remaining ones from being requested.
    """
    tags = soup.find_all(tag_name)
    print('links', tags)
    for tag in tags:
        url = tag.get(attr_name)
        if not (url and url.startswith('http')):
            continue
        print(f'{attr_name}\n', url)
        try:
            response = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)
            print(f'Successfully requested URL: {url} with status code: {response.status_code}')
        except requests.exceptions.RequestException as e:
            print(f'Failed to request URL: {url} with error: {e}')


def get_url_action(html_content):
    """Parse *html_content* and request every link (`a[href]`) and image (`img[src]`).

    Args:
        html_content: HTML text of one e-mail body.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    _request_tag_urls(soup, 'a', 'href')
    _request_tag_urls(soup, 'img', 'src')


def get_folder_name():
    """Return a folder name built from the local time.

    The format is ``<year>_<month>_<day>_<hour>-<minute>-<second>``; the
    numbers are not zero-padded.
    """
    now = time.localtime()
    day_part = f"{now.tm_year}_{now.tm_mon}_{now.tm_mday}"
    time_part = f"{now.tm_hour}-{now.tm_min}-{now.tm_sec}"
    return f"{day_part}_{time_part}"


def only_letters(s):
    """Return *s* with every character that is not alphabetic removed."""
    return ''.join(char for char in s if char.isalpha())


def _decode_subject(msg):
    """Return the decoded Subject header of *msg*, or '' when it is missing.

    All encoded fragments of the header are decoded and joined, not only the
    first one.
    """
    raw_subject = msg["Subject"]
    if raw_subject is None:
        return ""
    fragments = []
    for text, charset in decode_header(raw_subject):
        if isinstance(text, bytes):
            try:
                text = text.decode(charset or "utf-8", errors="replace")
            except LookupError:
                # The header declares a charset Python does not know.
                text = text.decode("utf-8", errors="replace")
        fragments.append(text)
    return "".join(fragments)


def _decode_text_part(part):
    """Return the text payload of *part*, decoded with its declared charset.

    Falls back to UTF-8 when no (or an unknown) charset is declared, and
    replaces undecodable bytes instead of raising.
    """
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        return payload.decode("utf-8", errors="replace")


def _collect_body(msg):
    """Return the concatenated text/html and text/plain bodies of *msg*.

    Attachments are skipped. Each text/plain part is prefixed with a newline.
    """
    if not msg.is_multipart():
        body = _decode_text_part(msg)
        print('body', body)
        return f"{body}"

    chunks = []
    for part in msg.walk():
        content_disposition = str(part.get("Content-Disposition"))
        if "attachment" in content_disposition:
            continue
        content_type = part.get_content_type()
        if content_type == "text/html":
            chunks.append(_decode_text_part(part))
        elif content_type == "text/plain":
            chunks.append(f"\n{_decode_text_part(part)}")
    return "".join(chunks)


class Email_parse:
    """Read matching mails of one IMAP account, save them and request their links."""

    def __init__(self, imap_protocol, port, email_address, password, category="promotions"):
        """Store the connection settings.

        Args:
            imap_protocol: IMAP server host name.
            port: IMAP SSL port.
            email_address: Account login name.
            password: Account password / app code.
            category: Stored on the instance; not used by the code in this class.
        """
        self.imap_protocol = imap_protocol
        self.email_address = email_address
        self.password = password
        self.category = category
        self.port = port

    def gmail_read(self):
        """Process up to ``_MAX_MAILS_PER_ACCOUNT`` mails matching the search criteria.

        Every error is reported on stdout together with the account address
        instead of being raised, so one failing account does not affect the
        other worker threads.
        """
        try:
            # IMAP4_SSL is a context manager: LOGOUT is issued on exit even
            # when processing raises, so the connection is not leaked.
            with imaplib.IMAP4_SSL(self.imap_protocol, self.port) as mail:
                mail.login(self.email_address, self.password)
                mail.select("INBOX")

                # Alternative criteria kept for reference:
                #   'ALL'
                #   'FROM "Golden Spatula"'
                #   '(OR FROM "Golden Spatula" FROM "uco new year" FROM "UCOPLAYSERVICE")'
                search_criteria = '(FROM "RIDER")'
                status, messages = mail.search(None, search_criteria)
                if status == "OK" and messages and messages[0]:
                    mail_ids = messages[0].split()
                else:
                    mail_ids = []
                # To process the newest mails first: mail_ids.reverse()

                for mail_id in mail_ids[:_MAX_MAILS_PER_ACCOUNT]:
                    self._process_mail(mail, mail_id)

                mail.close()
        except Exception as e:
            print('ERROR EMAIL\t', self.email_address, '\te\t', e)

    def _process_mail(self, mail, mail_id):
        """Fetch one mail, save its body as HTML, request its links, mark it seen."""
        status, msg_data = mail.fetch(mail_id, "(RFC822)")
        if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
            print('ERROR EMAIL\t', self.email_address, '\te\t', f'fetch failed for mail id {mail_id!r}')
            return

        msg = email.message_from_bytes(msg_data[0][1])
        subject = _decode_subject(msg)

        target_dir = os.path.join('emails', global_folder_name, self.email_address)
        os.makedirs(target_dir, exist_ok=True)
        # NOTE(review): mails whose subjects contain the same letters share a
        # file name, so a later mail overwrites an earlier one.
        file_name = os.path.join(target_dir, only_letters(str(subject)) + ".html")

        html_content = _collect_body(msg)
        with open(file_name, "w", encoding="utf-8") as f:
            f.write(html_content)

        from_ = msg.get("From")
        print(f"email: {self.email_address}")
        print(f"主题 Subject: {subject}")
        print(f"来源 From: {from_}")
        file_path = os.path.abspath(file_name)
        print('file_path\t', file_path)

        get_url_action(html_content)

        # Disabled alternative: open the saved file in headless Chrome.
        # options = webdriver.ChromeOptions()
        # options.add_experimental_option("detach", True)
        # options.add_argument("--headless")
        # driver = webdriver.Chrome(options=options)
        # driver.get(file_path)
        # driver.execute_script("window.scrollBy(0,1000)")  # scroll down
        # time.sleep(2)
        # driver.close()

        # Mark the mail as read.
        mail.store(mail_id, '+FLAGS', '\\Seen')
        calc_count_time()
        print('time.sleep delay 2 s')
        time.sleep(2)
def calc_count_time():
    """Print the time elapsed since the module-level ``startTime``.

    The elapsed time is printed as ``Ns``, ``Mmin Ns`` or ``Hh Mmin Ns``
    depending on its magnitude. ``total_seconds()`` is used instead of
    ``timedelta.seconds`` so that runs of one day or longer are not reported
    modulo 24 hours.
    """
    elapsed = datetime.now() - startTime
    # Clamp at zero in case the system clock was moved backwards.
    total = max(0, int(elapsed.total_seconds()))
    hours, remainder = divmod(total, 3600)
    minutes, seconds = divmod(remainder, 60)

    if hours:
        diffTime = f"{hours}h {minutes}min {seconds}s"
    elif minutes:
        diffTime = f"{minutes}min {seconds}s"
    else:
        diffTime = f"{seconds}s"
    print("运行时间:diffTime\t" + diffTime)
def read_email_action(imap_protocol, port, email_address, password):
    """Thread-pool task: build an ``Email_parse`` for one account and run it.

    Args:
        imap_protocol: IMAP server host name.
        port: IMAP SSL port.
        email_address: Account login name.
        password: Account password / app code.
    """
    Email_parse(imap_protocol, port, email_address, password).gmail_read()
def read_sheet(data_xls, shee_name, email_list, app_code_list):
    """Append the enabled accounts of one worksheet to the output lists.

    Rows are read starting at the fourth row (index 3). Column 0 holds the
    address, column 2 the code and column 4 the status; a row is collected
    only when its status equals '是'.

    Args:
        data_xls: Workbook object providing ``sheet_by_name``.
        shee_name: Name of the worksheet to read.
        email_list: List that receives the addresses (mutated in place).
        app_code_list: List that receives the matching codes (mutated in place).
    """
    sheet = data_xls.sheet_by_name(shee_name)
    print(sheet)
    row_total = sheet.nrows
    col_total = sheet.ncols
    print(col_total, row_total)

    for row in range(3, row_total):
        address, code, status = (
            sheet.cell(row, column).value for column in (0, 2, 4)
        )
        print(address, code, status)
        if status != '是':
            continue
        email_list.append(address)
        app_code_list.append(code)
        print('有效 email', address, code, status)
def read_excel_email():
    """Load the accounts from the workbook and process them concurrently.

    The 'Gmail', 'QQ' and '163' sheets are read (the 'Outlook' sheet is
    currently not read). Each account whose address ends with a known domain
    is submitted to a thread pool; the elapsed time is printed once the pool
    has finished.
    """
    addresses = []
    app_codes = []
    workbook = xlrd.open_workbook("./Warmup预热邮箱采集-批量.xls")
    for sheet_name in ('Gmail', 'QQ', '163'):
        read_sheet(workbook, sheet_name, addresses, app_codes)
    print('email_list', addresses)

    # Address suffix -> IMAP host, checked in this order; all use port 993.
    imap_hosts = (
        ('gmail.com', 'imap.gmail.com'),
        ('qq.com', 'imap.qq.com'),
        ('163.com', 'imap.163.com'),
        ('outlook.com', 'imap.outlook.com'),
    )

    # One task per account; the pool waits for all tasks when the block exits.
    with ThreadPoolExecutor(max_workers=80) as executor:
        for email_address, password in zip(addresses, app_codes):
            print(' email_address, password ', email_address, password)
            host = next(
                (server for suffix, server in imap_hosts
                 if email_address.endswith(suffix)),
                None,
            )
            if host is not None:
                executor.submit(read_email_action, host, 993, email_address, password)

    calc_count_time()
if __name__ == "__main__":
    # Module-level globals: startTime is read by calc_count_time() and
    # global_folder_name by Email_parse.gmail_read(); both must be bound
    # before read_excel_email() starts the workers.
    startTime = datetime.now()
    global_folder_name = get_folder_name()
    threads_event = []
    read_excel_email()