有218人阅读过
使用python监测文件夹变动,发送邮件通知
发布于2024/08/28 更新于2024/08/28
[ 教程仅保证更新时有效,请自行测试。]
发布于2024/08/28 更新于2024/08/28
[ 教程仅保证更新时有效,请自行测试。]
[ 教程仅保证更新时有效,请自行测试。]
代码如下:
import os
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# 邮件配置
email_config = {
'sender': '1111@qq.com',
'receivers': ['2222@qq.com', '3333@qq.com'],
'smtp_server': 'smtp.qq.com',
'smtp_port_ssl': 465, # SSL 端口
'smtp_port_tls': 587, # STARTTLS 端口
'password': '444444', # QQ 邮箱授权码
}
class TestHandler(FileSystemEventHandler):
def __init__(self, email_config):
self.email_config = email_config
self.last_modified_time = 0
self.last_email_time = 0
self.email_interval = 10 # 限制邮件发送间隔,单位:秒
def on_modified(self, event):
current_time = time.time()
if current_time - self.last_modified_time > 2:
self.last_modified_time = current_time
print(f"文件 {event.src_path} 已被修改")
if current_time - self.last_email_time > self.email_interval:
self.last_email_time = current_time
self.send_email_notification(event.src_path)
def send_email_notification(self, file_path):
print("正在发送邮件通知...")
msg = MIMEMultipart()
msg['From'] = self.email_config['sender']
msg['To'] = ", ".join(self.email_config['receivers'])
#msg['Subject'] = "文件变动通知"
#body = f"文件 {os.path.basename(file_path)} 已被修改。"
msg['Subject'] = "新采购订单通知"
body = f"有新的采购订单需求,请及时查看并处理!"
msg.attach(MIMEText(body, 'plain'))
for attempt in range(3):
try:
print(f"尝试通过 SSL 端口 {self.email_config['smtp_port_ssl']} 连接...")
server = smtplib.SMTP_SSL(self.email_config['smtp_server'], self.email_config['smtp_port_ssl'])
server.set_debuglevel(1)
server.login(self.email_config['sender'], self.email_config['password'])
server.sendmail(self.email_config['sender'], self.email_config['receivers'], msg.as_string())
print("邮件发送成功")
break
except Exception as e:
print(f"通过 SSL 发送邮件失败,尝试第 {attempt + 1} 次:{e}")
time.sleep(2)
finally:
try:
server.quit()
except Exception as e:
print(f"关闭 SSL 连接时出错:{e}")
if attempt == 2:
print("尝试通过 STARTTLS 端口连接...")
try:
server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port_tls'])
server.set_debuglevel(1)
server.starttls()
server.login(self.email_config['sender'], self.email_config['password'])
server.sendmail(self.email_config['sender'], self.email_config['receivers'], msg.as_string())
print("通过 STARTTLS 端口邮件发送成功")
except Exception as e:
print(f"通过 STARTTLS 发送邮件失败:{e}")
finally:
try:
server.quit()
except Exception as e:
print(f"关闭 STARTTLS 连接时出错:{e}")
if __name__ == "__main__":
path = "assets"
event_handler = TestHandler(email_config)
observer = Observer()
observer.schedule(event_handler, path=path, recursive=False)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()文章对你有帮助吗?
- 一般[0]

- 很赞[0]

- 没用[0]

- 垃圾[0]

- 无语[0]


