用celery实现一个自动发送邮件
时间: 2024-05-02 14:21:16 浏览: 62
android实现自动发送邮件
首先需要安装celery和redis:
```
pip install celery redis
```
接下来创建一个celery实例:
```python
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
```
这里的`tasks`是celery任务的名称,`broker`是指定消息代理的地址,这里使用的是redis。
然后定义一个发送邮件的任务:
```python
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
@app.task
def send_email(to_email, subject, content):
from_email = 'xxx@example.com' # 发件人邮箱
password = 'password' # 发件人邮箱密码
msg = MIMEText(content, 'plain', 'utf-8')
msg['From'] = formataddr(('Sender', from_email))
msg['To'] = formataddr(('Recipient', to_email))
msg['Subject'] = subject
try:
server = smtplib.SMTP('smtp.example.com', 587) # 发件人邮箱SMTP服务器地址和端口号
server.starttls()
server.login(from_email, password)
server.sendmail(from_email, [to_email], msg.as_string())
server.quit()
print(f'Successfully sent email to {to_email}')
except Exception as e:
print(f'Error: {e}')
```
这里使用了`smtplib`库来发送邮件,需要设置发件人邮箱、密码、SMTP服务器地址和端口号,以及收件人邮箱、邮件主题和内容。
最后,在主程序中调用这个任务:
```python
from tasks import send_email
result = send_email.delay('recipient@example.com', 'Test email', 'Hello, world!')
```
这里使用了`delay()`方法来异步执行任务,将收件人邮箱、邮件主题和内容作为参数传递给任务。任务执行完毕后,可以通过`result.get()`方法获取任务的返回值。
完整代码如下:
```python
from celery import Celery
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def send_email(to_email, subject, content):
from_email = 'xxx@example.com' # 发件人邮箱
password = 'password' # 发件人邮箱密码
msg = MIMEText(content, 'plain', 'utf-8')
msg['From'] = formataddr(('Sender', from_email))
msg['To'] = formataddr(('Recipient', to_email))
msg['Subject'] = subject
try:
server = smtplib.SMTP('smtp.example.com', 587) # 发件人邮箱SMTP服务器地址和端口号
server.starttls()
server.login(from_email, password)
server.sendmail(from_email, [to_email], msg.as_string())
server.quit()
print(f'Successfully sent email to {to_email}')
except Exception as e:
print(f'Error: {e}')
if __name__ == '__main__':
result = send_email.delay('recipient@example.com', 'Test email', 'Hello, world!')
print(result.get())
```
注意要在`if __name__ == '__main__':`中调用任务,否则会导致循环引用的问题。
阅读全文