Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
yma16
auto_python
提交
efc92edc
A
auto_python
项目概览
yma16
/
auto_python
与 Fork 源项目一致
Fork自
inscode / Python
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
A
auto_python
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
efc92edc
编写于
4月 01, 2025
作者:
Q
qq_38870145
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Tue Apr 1 15:15:35 CST 2025 inscode
上级
0a575cc3
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
439 addition
and
0 deletion
+439
-0
gmail.py
gmail.py
+249
-0
outlook.py
outlook.py
+190
-0
未找到文件。
gmail.py
0 → 100644
浏览文件 @
efc92edc
import
imaplib
,
email
,
os
,
time
,
xlrd
,
threading
from
datetime
import
datetime
from
email.header
import
decode_header
from
selenium
import
webdriver
from
concurrent.futures
import
ThreadPoolExecutor
import
requests
from
bs4
import
BeautifulSoup
def get_url_action(html_content, timeout=10):
    """Request every absolute link and image URL found in an HTML document.

    All ``<a href>`` targets are requested first, then all ``<img src>``
    targets. Only values starting with ``http`` are requested. The outcome of
    each request is printed; request failures are reported and do not stop
    the remaining requests.

    Args:
        html_content: HTML markup to scan.
        timeout: Seconds to wait for each request before giving up, so that a
            single unresponsive host cannot block the caller indefinitely.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    _request_tag_urls(soup, 'a', 'href', 'href\n', timeout)
    _request_tag_urls(soup, 'img', 'src', 'src\n', timeout)


def _request_tag_urls(soup, tag_name, attr_name, label, timeout):
    """GET the URL held in ``attr_name`` of every ``tag_name`` element of ``soup``."""
    tags = soup.find_all(tag_name)
    print('links', tags)
    for tag in tags:
        url = tag.get(attr_name)
        # Skip missing attributes and anything that is not an http(s) URL.
        if not (url and url.startswith('http')):
            continue
        print(label, url)
        try:
            response = requests.get(url, timeout=timeout)
            print(f'Successfully requested URL: {url} with status code: {response.status_code}')
        except requests.exceptions.RequestException as e:
            print(f'Failed to request URL: {url} with error: {e}')
def get_folder_name():
    """Build a folder name from the current local time.

    Returns:
        A string shaped like ``<year>_<month>_<day>_<hour>-<minute>-<second>``.
        The numbers are not zero-padded.
    """
    now = time.localtime()
    date_part = f"{now.tm_year}_{now.tm_mon}_{now.tm_mday}"
    clock_part = f"{now.tm_hour}-{now.tm_min}-{now.tm_sec}"
    return f"{date_part}_{clock_part}"
def only_letters(s):
    """Return ``s`` reduced to its alphabetic characters, in their original order.

    Digits, whitespace and punctuation are dropped; any character for which
    ``str.isalpha`` is true (including non-ASCII letters) is kept.
    """
    return ''.join(letter for letter in s if letter.isalpha())
class Email_parse:
    """Read matching messages from one IMAP mailbox and archive them as HTML.

    Every processed message is written to
    ``emails/<global_folder_name>/<email_address>/<subject letters>.html``,
    its links are requested through ``get_url_action`` and the message is
    flagged as seen.
    """

    # Markup written before and after the body of every archived message.
    HTML_OPEN = '<!DOCTYPE html><html lang="en" dir="auto" xmlns="http://www.w3.org/1999/xhtml" xmlns:v="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office" >'
    HTML_CLOSE = '</html>'
    # IMAP search expression selecting the messages to process.
    SEARCH_CRITERIA = '(FROM "RIDER")'
    # Upper bound on the number of messages handled per mailbox.
    MAX_MESSAGES = 200

    def __init__(self, imap_protocol, port, email_address, password, category="promotions"):
        """Store the connection settings.

        Args:
            imap_protocol: Host name of the IMAP server.
            port: Port of the IMAP-over-SSL service.
            email_address: Login name; also used as the output sub-folder.
            password: Login password / app code.
            category: Stored on the instance; not read by ``gmail_read``.
        """
        self.imap_protocol = imap_protocol
        self.email_address = email_address
        self.password = password
        self.category = category
        self.port = port

    @staticmethod
    def _decode_bytes(raw, charset):
        """Decode ``raw`` with ``charset``, falling back to lenient UTF-8."""
        if raw is None:
            return ''
        try:
            return raw.decode(charset or 'utf-8')
        except (LookupError, UnicodeDecodeError):
            # Unknown or wrong charset label: keep the text readable instead
            # of aborting the whole mailbox.
            return raw.decode('utf-8', errors='replace')

    @classmethod
    def _decode_subject(cls, msg):
        """Return the decoded Subject header, or '' when the header is absent."""
        raw_subject = msg["Subject"]
        if raw_subject is None:
            return ''
        pieces = []
        # A header may consist of several differently encoded fragments.
        for text, charset in decode_header(raw_subject):
            if isinstance(text, bytes):
                text = cls._decode_bytes(text, charset)
            pieces.append(text)
        return ''.join(pieces)

    @classmethod
    def _decode_part(cls, part):
        """Return the text payload of ``part`` decoded with its declared charset."""
        return cls._decode_bytes(part.get_payload(decode=True), part.get_content_charset())

    @classmethod
    def _build_html(cls, msg):
        """Render ``msg`` as one HTML document.

        HTML parts are included verbatim and plain-text parts are wrapped in
        ``<pre>``; attachments are skipped. The returned markup is both what
        gets saved and what is scanned for links, so multipart bodies are
        included in it.
        """
        chunks = [cls.HTML_OPEN]
        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                content_disposition = str(part.get("Content-Disposition"))
                # Skip attachments.
                if "attachment" in content_disposition:
                    continue
                if content_type == "text/html":
                    chunks.append(cls._decode_part(part))
                elif content_type == "text/plain":
                    chunks.append(f"<pre>{cls._decode_part(part)}</pre>")
        else:
            body = cls._decode_part(msg)
            print('body', body)
            chunks.append(f"<pre>{body}</pre>")
        chunks.append(cls.HTML_CLOSE)
        return ''.join(chunks)

    def gmail_read(self):
        """Archive up to ``MAX_MESSAGES`` matching INBOX messages and mark them seen.

        Relies on the module-level ``global_folder_name`` plus the helpers
        ``only_letters``, ``get_url_action`` and ``calc_count_time``. Any
        error is printed rather than raised, and the IMAP connection is
        logged out in every case.
        """
        mail = None
        try:
            # Connect to the IMAP server and open the inbox.
            mail = imaplib.IMAP4_SSL(self.imap_protocol, self.port)
            mail.login(self.email_address, self.password)
            mail.select("INBOX")

            status, messages = mail.search(None, self.SEARCH_CRITERIA)
            mail_ids = messages[0].split()

            target_dir = 'emails' + '/' + global_folder_name + '/' + self.email_address
            for mail_id in mail_ids[:self.MAX_MESSAGES]:
                status, msg_data = mail.fetch(mail_id, "(RFC822)")
                raw_email = msg_data[0][1]
                msg = email.message_from_bytes(raw_email)

                subject = self._decode_subject(msg)
                os.makedirs(target_dir, exist_ok=True)
                file_name = target_dir + '/' + only_letters(str(subject)) + ".html"

                html_content = self._build_html(msg)
                with open(file_name, "w", encoding="utf-8") as f:
                    f.write(html_content)

                from_ = msg.get("From")
                print(f"email: {self.email_address}")
                print(f"主题 Subject: {subject}")
                print(f"来源 From: {from_}")
                # Windows-style absolute path, printed for reference only.
                file_path = os.getcwd() + '\\' + file_name.replace('/', '\\')
                print('file_path\t', file_path)

                get_url_action(html_content)

                # Mark the message as read.
                mail.store(mail_id, '+FLAGS', '\\Seen')
                calc_count_time()
                print('time.sleep delay 2 s')
                time.sleep(2)

            mail.close()
        except Exception as e:
            print('ERROR EMAIL\t', self.email_address, '\te\t', e)
        finally:
            if mail is not None:
                try:
                    mail.logout()
                except Exception:
                    # Best-effort cleanup: the connection may already be gone.
                    pass
def calc_count_time(start=None):
    """Print and return the time elapsed since the run started.

    Args:
        start: ``datetime`` to measure from. When omitted, the module-level
            ``startTime`` is used.

    Returns:
        The formatted duration: ``"<s>s"`` below one minute,
        ``"<m>min <s>s"`` below one hour, otherwise ``"<h>h <m>min <s>s"``.
    """
    if start is None:
        start = startTime
    # total_seconds() keeps whole days; timedelta.seconds alone wraps every 24 h.
    elapsed = max(0, int((datetime.now() - start).total_seconds()))
    minutes, seconds = divmod(elapsed, 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        diffTime = f"{hours}h {minutes}min {seconds}s"
    elif minutes:
        diffTime = f"{minutes}min {seconds}s"
    else:
        diffTime = f"{seconds}s"
    print("运行时间:diffTime\t" + diffTime)
    return diffTime
def read_email_action(imap_protocol, port, email_address, password):
    """Process one mailbox: build an ``Email_parse`` for it and run ``gmail_read``.

    Args:
        imap_protocol: Host name of the IMAP server.
        port: Port of the IMAP server.
        email_address: Account to log in with.
        password: Password / app code for the account.
    """
    Email_parse(imap_protocol, port, email_address, password).gmail_read()
def read_sheet(data_xls, shee_name, email_list, app_code_list):
    """Collect the authorised accounts listed on one worksheet.

    Rows are read from the fourth row (index 3) onward. Column 0 holds the
    address, column 2 the login code and column 4 the authorisation flag;
    only rows whose flag equals ``'是'`` are kept.

    Args:
        data_xls: Workbook object providing ``sheet_by_name``.
        shee_name: Name of the worksheet to read.
        email_list: List that receives the accepted addresses (mutated in place).
        app_code_list: List that receives the matching login codes (mutated
            in place, kept parallel to ``email_list``).
    """
    sheet = data_xls.sheet_by_name(shee_name)
    print(sheet)
    row_count = sheet.nrows
    print(sheet.ncols, row_count)
    for row in range(3, row_count):
        address = sheet.cell(row, 0).value
        code = sheet.cell(row, 2).value
        status = sheet.cell(row, 4).value
        # The code is later used as the login password, so it is kept out of
        # the console output.
        print(address, status)
        if status == '是':
            email_list.append(address)
            app_code_list.append(code)
            print('有效 email', address, status)
def read_excel_email():
    """Load the account workbook and process every supported mailbox in parallel.

    Accounts are read from the ``Gmail``, ``QQ`` and ``163`` worksheets of
    ``./Warmup预热邮箱采集-批量.xls`` (the ``Outlook`` sheet is not read).
    Each account whose address ends with a known provider suffix is handed to
    ``read_email_action`` on a thread pool; other addresses are reported and
    skipped. The total elapsed time is printed once all workers have finished.
    """
    # Address suffix -> IMAP host. Every provider is reached on port 993.
    imap_hosts = {
        'gmail.com': 'imap.gmail.com',
        'qq.com': 'imap.qq.com',
        '163.com': 'imap.163.com',
        'outlook.com': 'imap.outlook.com',
    }
    imap_port = 993

    email_list = []
    app_code_list = []
    data_xls = xlrd.open_workbook("./Warmup预热邮箱采集-批量.xls")
    for sheet_name in ('Gmail', 'QQ', '163'):
        read_sheet(data_xls, sheet_name, email_list, app_code_list)
    print('email_list', email_list)

    # One worker per account, up to 80 at a time; leaving the ``with`` block
    # waits for every submitted mailbox to finish.
    with ThreadPoolExecutor(max_workers=80) as executor:
        for email_address, password in zip(email_list, app_code_list):
            # Log the address only: the password must not reach stdout.
            print(' email_address ', email_address)
            host = next(
                (h for suffix, h in imap_hosts.items() if email_address.endswith(suffix)),
                None,
            )
            if host is None:
                print('unsupported email domain, skipped', email_address)
                continue
            executor.submit(read_email_action, host, imap_port, email_address, password)
    calc_count_time()
if __name__ == "__main__":
    # Start of the run; calc_count_time() reads this global to report elapsed time.
    startTime = datetime.now()
    # Timestamped folder name for this run; Email_parse.gmail_read() reads this
    # global when building the output directory.
    global_folder_name = get_folder_name()
    # Not referenced by any other code visible in this file.
    threads_event = []
    read_excel_email()
outlook.py
0 → 100644
浏览文件 @
efc92edc
import
time
,
math
from
selenium
import
webdriver
from
selenium.webdriver.common.by
import
By
from
selenium.webdriver.chrome.service
import
Service
from
selenium.webdriver.common.by
import
By
from
selenium.webdriver.support.ui
import
WebDriverWait
from
selenium.webdriver.support
import
expected_conditions
as
EC
import
os
from
PIL
import
Image
from
time
import
sleep
def genDir():
    """Create a screenshot directory named after the current time.

    Returns:
        Path of the directory, ``./screen_shot/outlook/<milliseconds since
        the epoch>``. The directory exists when the function returns.
    """
    millis = int(round(time.time() * 1000))
    target = f'./screen_shot/outlook/{millis}'
    os.makedirs(target, exist_ok=True)
    return target
def cutImg(screenshot_path, element, baseDir, loc):
    """Crop a screenshot down to one element and save the result.

    Args:
        screenshot_path: Path of the screenshot image to crop.
        element: Object whose ``location`` (keys ``x``, ``y``) and ``size``
            (keys ``width``, ``height``) describe the region to keep.
        baseDir: Directory that receives the cropped image.
        loc: Value inserted into the output name ``cut_<loc>.png``.
    """
    origin = element.location
    extent = element.size
    x0, y0 = origin['x'], origin['y']
    # Pillow expects the box as (left, upper, right, lower).
    box = (x0, y0, x0 + extent['width'], y0 + extent['height'])
    Image.open(screenshot_path).crop(box).save(f"{baseDir}/cut_{loc}.png")
class ScreenShot:
    """Capture the Outlook reading pane page by page as PNG files."""

    __JS__ = {
        'scroll_to_bottom': "window.scroll({top:document.body.clientHeight,left:0,behavior:'auto'});",
        'scroll_to_y': "window.scroll({top:%d,left:0,behavior:'auto'});",
    }
    __base_end__ = 'tmp_end.png'
    __scroll_bottom__ = 'scroll_to_bottom'
    __scroll_y__ = 'scroll_to_y'
    __body__ = '//body'
    __height__ = 'height'
    __clear_shell__ = 'rm -rf *.png'
    __RGB__ = 'RGB'
    # JavaScript expression for the scrollable node inside the reading pane.
    _PANE_JS = 'document.getElementById("ConversationReadingPaneContainer").childNodes[1].childNodes[0]'

    @classmethod
    def screen_shot(cls, driver, title, uploader_url='', delete=False):
        """Screenshot the reading pane one viewport at a time.

        A new directory is created through ``genDir``. Each page is saved as
        ``custom_<i>.png`` and cropped to the pane as ``cut_<i>.png`` via
        ``cutImg``. If the pane reports nothing to scroll, a single
        ``full_screen_email.png`` is saved instead.

        Args:
            driver: Selenium WebDriver showing the message.
            title: Accepted for callers; not used by this method.
            uploader_url: Accepted for callers; not used by this method.
            delete: Accepted for callers; not used by this method.
        """
        out_dir = genDir()

        # Wait for the pane first: the measurements below fail when it is not
        # in the page yet.
        element = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.ID, "ConversationReadingPaneContainer"))
        )

        # Current scroll offset, total scrollable height and visible height.
        scrollTop = driver.execute_script(f'return {cls._PANE_JS}.scrollTop;')
        scrollHeight = driver.execute_script(f'return {cls._PANE_JS}.scrollHeight;')
        clientHeight = driver.execute_script(f'return {cls._PANE_JS}.clientHeight;')
        print('scrollTop', scrollTop, type(scrollTop))
        print('scrollHeight', scrollHeight)
        print('clientHeight', clientHeight)

        lastScrollTop = scrollTop
        # Index of the page being captured.
        i = 0
        if lastScrollTop < scrollHeight:
            img_path = f'{out_dir}/custom_{i}.png'
            driver.save_screenshot(img_path)
            cutImg(img_path, element, out_dir, i)
            # A pane without visible height can never be paged through; stop
            # here instead of looping forever.
            if not clientHeight or clientHeight <= 0:
                return
            # Advance by exactly one visible height per page so that the
            # cropped pages can be stitched afterwards.
            while lastScrollTop < scrollHeight:
                i += 1
                lastScrollTop += clientHeight
                driver.execute_script(f'{cls._PANE_JS}.scrollTop={lastScrollTop}')
                sleep(.5)
                img_path = f'{out_dir}/custom_{i}.png'
                driver.save_screenshot(img_path)
                cutImg(img_path, element, out_dir, i)
        else:
            # Nothing to scroll: one screenshot covers the whole message.
            img_path = f'{out_dir}/full_screen_email.png'
            driver.save_screenshot(img_path)

    @classmethod
    def __join_images__(cls, png1, png2, size=0, output='result.png'):
        """Stack two images vertically and save the result.

        Args:
            png1: Path of the upper image.
            png2: Path of the lower image.
            size: Overlap between the two images; the value is doubled before
                it is subtracted from the combined height.
            output: Path of the image file to write.
        """
        size = size * 2
        with Image.open(png1) as img1, Image.open(png2) as img2:
            size1, size2 = img1.size, img2.size
            joint = Image.new(cls.__RGB__, (size1[0], size1[1] + size2[1] - size))
            loc1, loc2 = (0, 0), (0, size1[1] - size)
            joint.paste(img1, loc1)
            joint.paste(img2, loc2)
            joint.save(output)
def run():
    """Launch Chrome with a saved profile, open Outlook mail and process the inbox.

    The created WebDriver is stored in the module-level ``driver`` and
    ``mapMSg`` is called once the page has been requested.
    """
    global driver
    chrome_args = (
        # User-data path of the local Chrome installation.
        r'--user-data-dir=C:\Users\v_bymyma\AppData\Local\Google\Chrome\User Data\Default',
        # Profile directory to load.
        "--profile-directory=auto_py",
        "--disable-web-security",
    )
    options = webdriver.ChromeOptions()
    options.add_experimental_option("detach", True)
    for chrome_arg in chrome_args:
        options.add_argument(chrome_arg)
    driver = webdriver.Chrome(options=options)
    driver.get('https://outlook.live.com/mail/0/')
    mapMSg()
def mapMSg():
    """Open the first listed message mentioning 'VideoCover' and screenshot it.

    Works on the module-level ``driver``. Entries with class ``EeHm8`` inside
    the ``customScrollBar`` element are scanned in order; the first one whose
    text contains ``VideoCover`` is clicked and passed to
    ``ScreenShot.screen_shot``. The browser session is ended before
    returning, whether or not a message was found or an error occurred.
    """
    try:
        # Fixed wait for the mailbox page to render.
        time.sleep(5)
        driver_msg_dom = driver.find_element(By.CLASS_NAME, "customScrollBar")
        print('driver_msg_dom', driver_msg_dom)
        print('driver_doms', driver_msg_dom)
        driver_tbody_dom = driver_msg_dom.find_elements(By.CLASS_NAME, 'EeHm8')
        print('driver_tbody_dom', driver_tbody_dom)
        for trItem in driver_tbody_dom:
            print('trItem', trItem)
            title = trItem.text
            print('title', title)
            if 'VideoCover' in str(title):
                # Open the message and give the reading pane time to load.
                trItem.click()
                time.sleep(7)
                # Read whatever the page stored in window.logs.
                logs = driver.execute_script("return window.logs;")
                print("操作日志:", logs)
                cur_timestemp = int(round(time.time() * 1000))
                ScreenShot.screen_shot(driver, 'outlook_' + str(cur_timestemp))
                break
            time.sleep(1)
    finally:
        # quit() closes every window and ends the session, so no close() call
        # may follow it.
        driver.quit()
# Script entry point: start the browser automation only when run directly.
if __name__ == '__main__':
    run()
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录