提交 ee6bdd1e 编写于 作者: Q qq_38870145

Mon Aug 25 18:24:00 CST 2025 inscode

上级 03fcb3f7
此差异已折叠。
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
import os
import json
# Load the URL data once at import time; the encoding is given explicitly to
# avoid mojibake on platforms whose default encoding is not UTF-8.
with open('emailData.json', 'r', encoding='utf-8') as file:
    data = json.loads(file.read())  # parsed JSON (Python dict or list)
def run(url_index=13):
    """Open one page in Chrome and dump the text of its embedded editor to a file.

    The page URL is ``data[url_index]`` (``data`` is loaded from
    ``emailData.json`` at module level).  After the page has had time to load,
    the function switches into the ``parcel-embed`` iframe, scrolls the Monaco
    editor's line container to the bottom, and writes the text of the editor's
    scrollable element to ``were-crowdfunding.html``.

    Args:
        url_index: Index/key into ``data`` of the URL to open.  Defaults to 13,
            the value that used to be hard-coded.

    The browser is always closed on exit, including when loading the page fails.
    """
    global driver  # kept module-visible, as in the original script

    options = webdriver.ChromeOptions()
    options.add_experimental_option("detach", True)
    # "User Data" path, as shown on chrome://version
    options.add_argument(r'--user-data-dir=C:\Users\v_bymyma\AppData\Local\Google\Chrome\User Data\Profile 5')
    # Profile directory name, also taken from chrome://version
    options.add_argument("--profile-directory=auto_py")
    options.add_argument("--remote-debugging-port=9222")
    options.add_argument('--disable-infobars')
    options.add_argument('--disable-blink-features=AutomationControlled')

    driver = webdriver.Chrome(options=options)
    try:
        driver.get(data[url_index])
        time.sleep(30)  # fixed wait for the page to finish loading
        try:
            # Switch into the iframe that hosts the editor.
            driver.switch_to.frame('parcel-embed')
            time.sleep(20)
            htmlHandle = driver.find_element(By.XPATH, '//div[@class="monaco-scrollable-element editor-scrollable vs"]')
            time.sleep(2)
            # Scroll the line container to the bottom before reading the text.
            # NOTE(review): presumably meant to force all lines to render --
            # confirm that the text captured below is actually complete.
            run_js_content = '''const editorLines = document.querySelector('div.view-lines.monaco-mouse-cursor-text');
if (editorLines) {
editorLines.scrollTop = editorLines.scrollHeight;
} else {
console.error('未找到 .view-lines.monaco-mouse-cursor-text 元素');
}'''
            driver.execute_script(run_js_content)
            print('iframeHandle', htmlHandle)
            print('iframeHandle.text', htmlHandle.text)
            time.sleep(2)
            # Save the captured editor text.
            with open('were-crowdfunding.html', "w", encoding="utf-8") as f:
                f.write(htmlHandle.text)
        except Exception as e:
            # Best-effort scrape: report the problem and still shut down cleanly.
            print(e)
        time.sleep(60)
    finally:
        # Runs even when driver.get() raises, so no Chrome process is left behind.
        driver.quit()
# Script entry point: only scrape when executed directly, not when imported.
if __name__ == "__main__":
    run()
\ No newline at end of file
// Collect the link of every e-mail card on the listing page and derive the
// matching "/live" URL for each one.
const emailListDoms = document.querySelectorAll('div[data-testid="email grid item"]')
const hrefList = []
// Example value: href="/emails/astro-bot-has-arrived"
emailListDoms.forEach((cardDom) => {
  const aDom = cardDom.querySelector('a')
  if (!aDom) {
    return // card without an anchor: nothing to collect
  }
  const href = aDom.getAttribute('href')
  console.log('href', href)
  hrefList.push(href)
})
// Example result: https://www.reallygoodemails.com/emails/astro-bot-has-arrived/live
// On the live page the editor text can be read with:
// document.querySelector('div[class="view-lines monaco-mouse-cursor-text"]').innerText
const liveHrefList = hrefList.map((href) => `https://www.reallygoodemails.com${href}/live`)
// Bare expression as the last statement (presumably so a console echoes the list).
liveHrefList
\ No newline at end of file
# -*- coding: utf-8 -*-  # Recommended form; also recognised by editors such as Emacs/Vim
import imaplib,email,os,time,xlrd,threading
from datetime import datetime, timedelta
from email.header import decode_header
from selenium import webdriver
from concurrent.futures import ThreadPoolExecutor
import requests,csv
from bs4 import BeautifulSoup
def get_url_action(html_content, timeout=10):
    """Request every absolute link and image URL found in an HTML document.

    All ``<a href>`` targets are requested first, then all ``<img src>``
    targets.  Only values starting with ``http`` are requested; the outcome of
    each request is printed and request failures never propagate.

    Args:
        html_content: HTML text to scan.
        timeout: Seconds to wait for each request.  Without a timeout a single
            unresponsive server would block the calling worker indefinitely.

    NOTE(review): every matching link is requested without filtering, which
    would include e.g. unsubscribe links -- confirm that this is intended.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    # (tag name, attribute holding the URL), in the order they are processed.
    for tag_name, attribute in (('a', 'href'), ('img', 'src')):
        links = soup.find_all(tag_name)
        print('links', links)
        for link in links:
            url = link.get(attribute)
            if not (url and url.startswith('http')):
                continue
            print(attribute + '\n', url)
            try:
                link_response = requests.get(url, timeout=timeout)
                print(f'Successfully requested URL: {url} with status code: {link_response.status_code}')
            except requests.exceptions.RequestException as e:
                print(f'Failed to request URL: {url} with error: {e}')
def get_folder_name():
    """Return a folder name built from the current local date and time.

    The format is ``<year>_<month>_<day>_<hour>-<minute>-<second>`` with no
    zero padding, e.g. ``2025_8_25_18-24-0``.
    """
    now = time.localtime()
    date_part = f"{now.tm_year}_{now.tm_mon}_{now.tm_mday}"
    clock_part = f"{now.tm_hour}-{now.tm_min}-{now.tm_sec}"
    return f"{date_part}_{clock_part}"
def only_letters(s):
    """Return *s* with every character that is not alphabetic removed."""
    return ''.join(filter(str.isalpha, s))
class Email_parse:
    """Fetch matching e-mails from one IMAP account, save them and request their links.

    Each message found by the search is saved as an HTML file under
    ``emails/<global_folder_name>/<email_address>/``, its links and images are
    requested through ``get_url_action`` and the message is flagged as seen.
    """

    # Opening tag written at the start of every saved message.
    _HTML_OPEN = '<!DOCTYPE html><html lang="en" dir="auto" xmlns="http://www.w3.org/1999/xhtml" xmlns:v="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office" >'

    def __init__(self, imap_protocol, port, email_address, password, category="promotions"):
        """Store the connection settings.

        Args:
            imap_protocol: Host name of the IMAP server.
            port: Port of the IMAP-over-SSL service.
            email_address: Login name of the account.
            password: Password (or app code) used to log in.
            category: Stored on the instance; not used by ``gmail_read``.
        """
        self.imap_protocol = imap_protocol
        self.email_address = email_address
        self.password = password
        self.category = category
        self.port = port

    @staticmethod
    def _decode_subject(msg):
        """Return the first decoded chunk of the Subject header ('' if absent)."""
        raw_subject = msg["Subject"]
        if raw_subject is None:
            # decode_header() cannot handle None; treat it as an empty subject.
            return ""
        subject, encoding = decode_header(raw_subject)[0]
        if isinstance(subject, bytes):
            try:
                subject = subject.decode(encoding if encoding else "utf-8", errors="replace")
            except LookupError:
                # Unknown charset name in the header: fall back to UTF-8.
                subject = subject.decode("utf-8", errors="replace")
        return subject

    @staticmethod
    def _decode_payload(part):
        """Return the text of *part*, decoded with its declared charset."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # Unknown charset name: fall back to UTF-8.
            return payload.decode("utf-8", errors="replace")

    @classmethod
    def _render_body(cls, msg):
        """Return the HTML fragment representing the body of *msg*.

        For multipart messages, HTML parts are kept verbatim, plain-text parts
        are wrapped in ``<pre>`` and attachments are skipped.  A non-multipart
        message is always wrapped in ``<pre>``.
        """
        if not msg.is_multipart():
            body = cls._decode_payload(msg)
            print('body', body)
            return f"<pre>{body}</pre>"
        fragments = []
        for part in msg.walk():
            content_type = part.get_content_type()
            content_disposition = str(part.get("Content-Disposition"))
            # Skip attachments.
            if "attachment" in content_disposition:
                continue
            if content_type == "text/html":
                fragments.append(cls._decode_payload(part))
            elif content_type == "text/plain":
                fragments.append(f"<pre>{cls._decode_payload(part)}</pre>")
        return "".join(fragments)

    def _save_and_visit(self, msg):
        """Write *msg* to an HTML file and request every link it contains."""
        subject = self._decode_subject(msg)
        target_dir = 'emails' + '/' + global_folder_name + '/' + self.email_address
        os.makedirs(target_dir, exist_ok=True)
        # Letters of the subject plus a millisecond timestamp.
        html_name = only_letters(str(subject)) + str(int(time.time() * 1000))
        file_name = target_dir + '/' + html_name + ".html"
        # The saved file and the link scan use the same text, so links inside
        # multipart messages are requested as well.
        html_content = self._HTML_OPEN + self._render_body(msg) + '</html>'
        with open(file_name, "w", encoding="utf-8") as f:
            f.write(html_content)
        from_ = msg.get("From")
        print(f"email: {self.email_address}")
        print(f"主题 Subject: {subject}")
        print(f"来源 From: {from_}")
        print('file_path\t', os.path.abspath(file_name))
        get_url_action(html_content)

    def gmail_read(self):
        """Process every message matching the search, newest first.

        Any error is printed together with the account address instead of
        being raised; the connection is logged out in all cases.
        """
        mail = None
        try:
            # Connect to the IMAP server and open the inbox.
            mail = imaplib.IMAP4_SSL(self.imap_protocol, self.port)
            mail.login(self.email_address, self.password)
            mail.select("INBOX")
            now = datetime.now()
            # NOTE(review): the window starts on 8 August of the current year,
            # so a run before that date searches an empty range.  "%b" is also
            # locale dependent -- confirm the process runs with English month names.
            start = datetime(now.year, 8, 8).strftime("%d-%b-%Y")
            end = (datetime(now.year, now.month, now.day) + timedelta(days=1)).strftime("%d-%b-%Y")
            search_criteria = f'(SINCE "{start}" BEFORE "{end}" FROM "dragonheir-global.com")'
            status, messages = mail.search(None, search_criteria)
            mail_ids = messages[0].split()
            # Reverse the returned ids so the last ones are processed first.
            mail_ids.reverse()
            for count, mail_id in enumerate(mail_ids, start=1):
                if count > 500000:  # upper bound on messages per account
                    break
                status, msg_data = mail.fetch(mail_id, "(RFC822)")
                raw_email = msg_data[0][1]
                msg = email.message_from_bytes(raw_email)
                self._save_and_visit(msg)
                # Mark the message as read.
                mail.store(mail_id, '+FLAGS', r'\Seen')
                calc_count_time()
                print('time.sleep delay 2 s')
                time.sleep(2)
            mail.close()
        except Exception as e:
            print('ERROR EMAIL\t', self.email_address, '\te\t', e)
        finally:
            if mail is not None:
                try:
                    mail.logout()
                except (imaplib.IMAP4.error, OSError) as e:
                    print('ERROR EMAIL\t', self.email_address, '\te\t', e)
def calc_count_time(start=None):
    """Print and return the elapsed run time in a human-readable form.

    Args:
        start: ``datetime`` to measure from.  Defaults to the module-level
            ``startTime``.

    Returns:
        The formatted duration: ``"<s>s"`` below one minute,
        ``"<m>min <s>s"`` below one hour, otherwise ``"<h>h <m>min <s>s"``.
    """
    if start is None:
        start = startTime
    # total_seconds() keeps whole days; timedelta.seconds would drop them and
    # under-report any run longer than 24 hours.
    total = max(int((datetime.now() - start).total_seconds()), 0)
    hours, remainder = divmod(total, 3600)
    minutes, seconds = divmod(remainder, 60)
    if hours:
        diffTime = f"{hours}h {minutes}min {seconds}s"
    elif minutes:
        diffTime = f"{minutes}min {seconds}s"
    else:
        diffTime = f"{seconds}s"
    print("运行时间:diffTime\t" + diffTime)
    return diffTime
def read_email_action(imap_protocol, port, email_address, password):
    """Build an ``Email_parse`` for one account and run its ``gmail_read``."""
    Email_parse(imap_protocol, port, email_address, password).gmail_read()
def read_sheet(data_xls, shee_name, email_list, app_code_list):
    """Collect the authorised accounts listed on one worksheet.

    Rows are read from the fourth row onwards.  Column 0 holds the address,
    column 2 the app code and column 4 the status; rows whose status is '是'
    are appended to ``email_list`` / ``app_code_list`` (both mutated in place).

    Args:
        data_xls: Workbook object providing ``sheet_by_name``.
        shee_name: Name of the worksheet to read.
        email_list: List receiving the accepted addresses.
        app_code_list: List receiving the matching app codes.
    """
    sheet = data_xls.sheet_by_name(shee_name)
    print(sheet)
    row_total, col_total = sheet.nrows, sheet.ncols
    print(col_total, row_total)
    for row_idx in range(3, row_total):
        address, app_code, status = (
            sheet.cell(row_idx, col).value for col in (0, 2, 4)
        )
        print(address, app_code, status)
        if status != '是':
            continue
        email_list.append(address)
        app_code_list.append(app_code)
        print('有效 email', address, app_code, status)
def readCsvEmailData(veryfilEmail, csv_path='email.csv'):
    """Append ``[address, app_code]`` pairs read from a CSV file to ``veryfilEmail``.

    The first row is treated as a header and skipped.  Column 0 is the
    address and column 2 the app code; column 1 (the password) is not used.
    Blank rows are ignored and rows with fewer than three columns are reported
    and skipped instead of raising ``IndexError``.  An empty file yields no
    entries instead of raising ``StopIteration``.

    Args:
        veryfilEmail: List that receives the pairs (mutated in place).
        csv_path: File to read.  Defaults to ``email.csv``.
    """
    # utf-8-sig also accepts plain UTF-8 and strips a leading BOM if present.
    with open(csv_path, 'r', newline='', encoding='utf-8-sig') as file:
        csv_reader = csv.reader(file)
        next(csv_reader, None)  # skip the header row, if there is one
        for row in csv_reader:
            if not row:
                continue
            if len(row) < 3:
                print('skip incomplete row', row)
                continue
            # Local names chosen so the imported ``email`` module is not shadowed.
            address, appcode = row[0], row[2]
            print(address, appcode)
            veryfilEmail.append([address, appcode])
def read_excel_email():
    """Read the account list from the CSV file and process every account in a thread pool.

    The IMAP host is chosen from the address suffix; addresses matching none
    of the known suffixes are not submitted.  The elapsed time is printed once
    the pool has finished.
    """
    veryfilEmail = []
    readCsvEmailData(veryfilEmail)
    # (address suffix, IMAP host), checked in this order.
    imap_hosts = (
        ('gmail.com', 'imap.gmail.com'),
        ('qq.com', 'imap.qq.com'),
        ('163.com', 'imap.163.com'),
        ('outlook.com', 'imap.outlook.com'),
    )
    # One task per account, processed concurrently.
    with ThreadPoolExecutor(max_workers=80) as executor:
        for email_address, password in veryfilEmail:
            print(' email_address, password ', email_address, password)
            host = next(
                (server for suffix, server in imap_hosts if email_address.endswith(suffix)),
                None,
            )
            if host is not None:
                executor.submit(read_email_action, host, 993, email_address, password)
    calc_count_time()
# Script entry point.  The names assigned here are module globals that the
# functions above read (startTime, global_folder_name).
if __name__ == '__main__':
    startTime = datetime.now()  # reference point for calc_count_time()
    global_folder_name = get_folder_name()  # one output folder per run
    threads_event = list()
    read_excel_email()
import imaplib,email,os,time,xlrd,threading,json,csv
from datetime import datetime
from email.header import decode_header
from selenium import webdriver
from concurrent.futures import ThreadPoolExecutor
class Email_parse:
    """Check whether one account can log in to its IMAP server.

    The result is recorded in the module-level ``success_email`` /
    ``error_email`` lists, and the password that was tried is stored in
    ``email_store``.
    """

    def __init__(self, imap_protocol, port, email_address, password, category="promotions"):
        """Store the connection settings.

        Args:
            imap_protocol: Host name of the IMAP server.
            port: Port of the IMAP-over-SSL service.
            email_address: Login name of the account.
            password: Password (or app code) used to log in.
            category: Stored on the instance; not used by ``gmail_read``.
        """
        self.imap_protocol = imap_protocol
        self.email_address = email_address
        self.password = password
        self.category = category
        self.port = port

    def gmail_read(self):
        """Log in, open the inbox and record whether that worked.

        Any failure adds the address to ``error_email`` and prints the error;
        nothing is raised.  The connection is logged out on success and on
        failure so that a failed check does not leave a socket open.
        """
        mail = None
        try:
            # Remember the code that was tried for this address.
            email_store[self.email_address] = self.password
            # Connect to the IMAP server.
            mail = imaplib.IMAP4_SSL(self.imap_protocol, self.port)
            mail.login(self.email_address, self.password)
            # Open and close the inbox.
            mail.select("INBOX")
            mail.close()
            success_email.append(self.email_address)
            time.sleep(2)
        except Exception as e:
            error_email.append(self.email_address)
            print(e)
        finally:
            if mail is not None:
                try:
                    mail.logout()
                except (imaplib.IMAP4.error, OSError) as e:
                    # A failed logout does not change the verification result.
                    print(e)
def calc_count_time(start=None):
    """Print and return the elapsed run time in a human-readable form.

    Args:
        start: ``datetime`` to measure from.  Defaults to the module-level
            ``startTime``.

    Returns:
        The formatted duration: ``"<s>s"`` below one minute,
        ``"<m>min <s>s"`` below one hour, otherwise ``"<h>h <m>min <s>s"``.
    """
    if start is None:
        start = startTime
    # total_seconds() keeps whole days; timedelta.seconds would drop them and
    # under-report any run longer than 24 hours.
    total = max(int((datetime.now() - start).total_seconds()), 0)
    hours, remainder = divmod(total, 3600)
    minutes, seconds = divmod(remainder, 60)
    if hours:
        diffTime = f"{hours}h {minutes}min {seconds}s"
    elif minutes:
        diffTime = f"{minutes}min {seconds}s"
    else:
        diffTime = f"{seconds}s"
    print("运行时间:diffTime\t" + diffTime)
    return diffTime
def read_email_action(imap_protocol, port, email_address, password):
    """Build an ``Email_parse`` for one account and run its ``gmail_read``."""
    Email_parse(imap_protocol, port, email_address, password).gmail_read()
# Inbox JSON log: kept in its own file.
def genInboxJsonFileName():
    """Return the path of the inbox verification log, creating its directory.

    Returns:
        ``./json/api_log/verify_log_inbox.json``.
    """
    dirs = "./json/" + 'api_log'
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(dirs, exist_ok=True)
    return dirs + "/" + 'verify_log' + "_inbox.json"
# Write the inbox JSON log.
def write_inbox_json(data):
    """Serialise ``data`` as indented, non-ASCII-escaped JSON into the inbox log file.

    The target path comes from ``genInboxJsonFileName()``; an existing file is
    overwritten.
    """
    target = genInboxJsonFileName()
    with open(target, "w", encoding="utf-8") as file:
        file.write(json.dumps(data, ensure_ascii=False, indent=4))
def read_sheet(data_xls, shee_name, email_list, app_code_list):
    """Collect the registered accounts listed on one worksheet.

    Rows are read from the fourth row onwards.  Column 0 holds the address,
    column 1 the password, column 2 the app code and column 4 the status.
    Every row's password is stored in the module-level
    ``email_password_store``; rows whose status is '是' are appended to
    ``email_list`` / ``app_code_list`` (both mutated in place).

    Args:
        data_xls: Workbook object providing ``sheet_by_name``.
        shee_name: Name of the worksheet to read.
        email_list: List receiving the accepted addresses.
        app_code_list: List receiving the matching app codes.
    """
    sheet = data_xls.sheet_by_name(shee_name)
    print(sheet)
    row_total, col_total = sheet.nrows, sheet.ncols
    print(col_total, row_total)
    for row_idx in range(3, row_total):
        address = sheet.cell(row_idx, 0).value
        email_password_store[address] = sheet.cell(row_idx, 1).value
        app_code = sheet.cell(row_idx, 2).value
        status = sheet.cell(row_idx, 4).value
        print(address, app_code, status)
        if status != '是':
            continue
        email_list.append(address)
        app_code_list.append(app_code)
        print('注册的 email', address, app_code, status)
def read_excel_email():
    """Verify every registered account from the workbook and write the reports.

    Accounts are read from the 'Gmail', 'QQ' and '163' sheets, each one is
    checked in a thread pool, and the results are written to the JSON log and
    the CSV reports.  Addresses whose suffix matches no known provider are not
    checked.
    """
    global success_email, error_email

    email_list = []
    app_code_list = []
    data_xls = xlrd.open_workbook("./Warmup预热邮箱采集-批量.xls")
    # The 'Outlook' sheet is intentionally not read (it was disabled in the
    # original script).
    for sheet_name in ('Gmail', 'QQ', '163'):
        read_sheet(data_xls, sheet_name, email_list, app_code_list)
    print('email_list', email_list)

    # (address suffix, IMAP host), checked in this order.
    imap_hosts = (
        ('gmail.com', 'imap.gmail.com'),
        ('qq.com', 'imap.qq.com'),
        ('163.com', 'imap.163.com'),
        ('outlook.com', 'imap.outlook.com'),
    )
    # One verification task per account, run concurrently.
    with ThreadPoolExecutor(max_workers=80) as executor:
        for email_address, password in zip(email_list, app_code_list):
            print(' email_address, password ', email_address, password)
            host = next(
                (server for suffix, server in imap_hosts if email_address.endswith(suffix)),
                None,
            )
            if host is not None:
                executor.submit(read_email_action, host, 993, email_address, password)
    calc_count_time()

    # De-duplicate while keeping first-seen order, so the reports are stable
    # between runs (set() would reorder the addresses arbitrarily).
    success_email = list(dict.fromkeys(success_email))
    error_email = list(dict.fromkeys(error_email))
    write_inbox_json({
        'successEmail': success_email,
        'successEmailNum': len(success_email),
        'errorEmail': error_email,
        'errorEmailNum': len(error_email),
    })
    write_csv()
    print(error_email)
def _write_verify_csv(path, first_column, rows):
    """Write one report: a header row followed by ``rows``; the file is closed on exit."""
    with open(path, 'w', encoding='utf-8-sig', newline="") as f:
        csv_write = csv.writer(f)
        csv_write.writerow([first_column, '密码', 'app code', '是否有效'])
        csv_write.writerows(rows)


def write_csv():
    """Write the valid / invalid / combined CSV reports and print the totals.

    Reads the module-level ``success_email``, ``error_email``,
    ``email_password_store`` and ``email_store``.  Each row holds the address,
    its password, the app code that was tried and the verdict.  The ratios
    are printed as 0 when there are no accounts at all, instead of raising
    ``ZeroDivisionError``.
    """
    valid_rows = [
        [e, email_password_store[e], email_store[e], '有效'] for e in success_email
    ]
    invalid_rows = [
        [e, email_password_store[e], email_store[e], '无效'] for e in error_email
    ]
    _write_verify_csv('email__verify_有效20250819——1.csv', '有效Email', valid_rows)
    _write_verify_csv('email__verify_无效20250819——1.csv', '无效Email', invalid_rows)
    _write_verify_csv('email__verify_所有20250819——1.csv', '所有Email', valid_rows + invalid_rows)

    total = len(success_email) + len(error_email)
    print('所有邮箱数量:', total)
    print('成功邮箱数量:', len(success_email))
    print('失败邮箱数量:', len(error_email))
    print('无效有效占比:', len(error_email) / total if total else 0)
    print('有效有效占比:', len(success_email) / total if total else 0)
# Script entry point.  The names assigned here are module globals that the
# functions above read and update.
if __name__ == '__main__':
    startTime = datetime.now()  # reference point for calc_count_time()
    success_email, error_email = [], []  # addresses that passed / failed the login check
    email_store, email_password_store = {}, {}  # address -> app code / address -> password
    threads_event = list()
    read_excel_email()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册