FLask如何连接mysql数据库
时间: 2023-07-20 20:35:23 浏览: 119
要连接MySQL数据库,需要使用Flask中的扩展程序Flask-MySQLdb。首先需要安装该扩展程序,可以通过以下命令进行安装:
```
pip install flask-mysqldb
```
安装完成后,可以使用以下代码连接MySQL数据库:
```python
from flask import Flask
from flask_mysqldb import MySQL
app = Flask(__name__)
app.config['MYSQL_HOST'] = 'localhost'
app.config['MYSQL_USER'] = 'username'
app.config['MYSQL_PASSWORD'] = 'password'
app.config['MYSQL_DB'] = 'database_name'
mysql = MySQL(app)
```
其中,`MYSQL_HOST`表示MySQL数据库的主机名,`MYSQL_USER`表示登录MySQL数据库的用户名,`MYSQL_PASSWORD`表示登录MySQL数据库的密码,`MYSQL_DB`表示要连接的MySQL数据库名。连接成功后,可以使用`mysql`对象执行MySQL语句。
相关问题
如何结合Flex(Flash)与PHP后端及MySQL数据库实现用户登录功能,并确保安全的数据库连接?请提供相关代码示例。
为了实现用户登录功能和安全的数据库连接,我们需要关注几个关键点:用户界面的Flex前端开发、PHP后端逻辑处理以及MySQL数据库的安全交互。结合《Flex+PHP+MySQL:开心农场游戏开发教程》,可以学习到这些技术的综合运用。
参考资源链接:[Flex+PHP+MySQL:开心农场游戏开发教程](https://wenku.csdn.net/doc/7kqsbw0sjz?spm=1055.2569.3001.10343)
首先,在Flex前端,我们使用ActionScript创建登录界面,并使用RemoteObject与PHP后端通信。示例代码如下:
```actionscript
<mx:RemoteObject destination=
参考资源链接:[Flex+PHP+MySQL:开心农场游戏开发教程](https://wenku.csdn.net/doc/7kqsbw0sjz?spm=1055.2569.3001.10343)
改为使用mysql数据库
要将现有的Flask应用程序从SQLite迁移到MySQL,需要进行以下步骤:
1. **安装MySQL驱动**:首先,确保安装了MySQL的Python驱动程序 `mysql-connector-python` 或者 `pymysql`。可以使用以下命令安装:
```bash
pip install mysql-connector-python
```
2. **修改数据库连接配置**:将原有的SQLite数据库连接代码更改为MySQL的连接代码。
3. **调整SQL语句**:虽然大多数SQL语句在SQLite和MySQL之间是兼容的,但某些语法可能会有所不同,需要进行相应的调整。
以下是修改后的代码示例:
### 修改后的代码
```python
from flask import Flask, render_template, request, redirect, url_for, flash, session
import mysql.connector
import bcrypt
import logging
app = Flask(__name__)
app.secret_key = 'your_secret_key'
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 数据库连接
def connect_db():
conn = mysql.connector.connect(
host='localhost',
user='your_username',
password='your_password',
database='pet_weight_management'
)
return conn
# 初始化数据库
def init_db():
conn = connect_db()
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(255) UNIQUE NOT NULL,
password VARCHAR(255) NOT NULL
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS pets (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
pet_name VARCHAR(255) NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS pet_weights (
id INT AUTO_INCREMENT PRIMARY KEY,
pet_id INT NOT NULL,
weight FLOAT NOT NULL,
recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (pet_id) REFERENCES pets (id) ON DELETE CASCADE
)
''')
conn.commit()
conn.close()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
conn = connect_db()
cursor = conn.cursor()
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
try:
cursor.execute('INSERT INTO users (username, password) VALUES (%s, %s)', (username, hashed_password))
conn.commit()
flash("注册成功!", 'success')
logging.info("用户 %s 注册成功!", username)
return redirect(url_for('login'))
except mysql.connector.IntegrityError:
flash("用户名已存在!", 'warning')
logging.warning("用户名 %s 已存在!", username)
finally:
conn.close()
return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT id, password FROM users WHERE username = %s', (username,))
result = cursor.fetchone()
if result and bcrypt.checkpw(password.encode('utf-8'), result[1]):
session['user_id'] = result[0]
flash("登录成功!", 'success')
logging.info("用户 %s 登录成功!", username)
return redirect(url_for('dashboard'))
else:
flash("用户名或密码错误!", 'danger')
logging.warning("用户名或密码错误!")
conn.close()
return render_template('login.html')
@app.route('/logout')
def logout():
session.pop('user_id', None)
flash("已退出登录。", 'info')
logging.info("已退出登录。")
return redirect(url_for('index'))
@app.route('/dashboard')
def dashboard():
if 'user_id' not in session:
return redirect(url_for('login'))
user_id = session['user_id']
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT id, pet_name FROM pets WHERE user_id = %s', (user_id,))
pets = cursor.fetchall()
conn.close()
return render_template('dashboard.html', pets=pets)
@app.route('/add_pet', methods=['GET', 'POST'])
def add_pet():
if 'user_id' not in session:
return redirect(url_for('login'))
if request.method == 'POST':
user_id = session['user_id']
pet_name = request.form['pet_name']
weight = float(request.form['weight'])
conn = connect_db()
cursor = conn.cursor()
cursor.execute('INSERT INTO pets (user_id, pet_name) VALUES (%s, %s)', (user_id, pet_name))
pet_id = cursor.lastrowid
cursor.execute('INSERT INTO pet_weights (pet_id, weight) VALUES (%s, %s)', (pet_id, weight))
conn.commit()
flash("宠物添加成功,初始体重为 %.2f kg" % weight, 'success')
logging.info("宠物 %s 添加成功,初始体重为 %.2f kg", pet_name, weight)
conn.close()
return redirect(url_for('dashboard'))
return render_template('add_pet.html')
@app.route('/view_pet/<int:pet_id>')
def view_pet(pet_id):
if 'user_id' not in session:
return redirect(url_for('login'))
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT pet_name FROM pets WHERE id = %s', (pet_id,))
pet = cursor.fetchone()
if not pet:
flash("宠物不存在!", 'warning')
logging.warning("宠物不存在!")
return redirect(url_for('dashboard'))
cursor.execute('SELECT weight, recorded_at FROM pet_weights WHERE pet_id = %s ORDER BY recorded_at', (pet_id,))
weights = cursor.fetchall()
conn.close()
return render_template('view_pet.html', pet=pet, weights=weights)
@app.route('/update_pet_weight/<int:pet_id>', methods=['GET', 'POST'])
def update_pet_weight(pet_id):
if 'user_id' not in session:
return redirect(url_for('login'))
conn = connect_db()
cursor = conn.cursor()
# 获取当前宠物的所有体重记录
cursor.execute('SELECT weight, recorded_at FROM pet_weights WHERE pet_id = %s ORDER BY recorded_at DESC', (pet_id,))
weights = cursor.fetchall()
if request.method == 'POST':
weight = float(request.form['weight'])
cursor.execute('INSERT INTO pet_weights (pet_id, weight) VALUES (%s, %s)', (pet_id, weight))
conn.commit()
flash("宠物体重更新成功!", 'success')
logging.info("宠物体重更新成功!")
conn.close()
return redirect(url_for('view_pet', pet_id=pet_id))
conn.close()
return render_template('update_pet_weight.html', pet_id=pet_id, weights=weights)
@app.route('/delete_account', methods=['GET', 'POST'])
def delete_account():
if 'user_id' not in session:
return redirect(url_for('login'))
if request.method == 'POST':
user_id = session['user_id']
conn = connect_db()
cursor = conn.cursor()
try:
# 删除宠物的体重记录
cursor.execute('DELETE FROM pet_weights WHERE pet_id IN (SELECT id FROM pets WHERE user_id = %s)', (user_id,))
# 删除宠物信息
cursor.execute('DELETE FROM pets WHERE user_id = %s', (user_id,))
# 删除用户信息
cursor.execute('DELETE FROM users WHERE id = %s', (user_id,))
conn.commit()
session.pop('user_id', None)
flash("账号已注销。", 'success')
logging.info("用户 %s 账号已注销。", user_id)
except Exception as e:
flash("注销账号失败: %s" % str(e), 'danger')
logging.error("注销账号失败: %s", str(e))
finally:
conn.close()
return redirect(url_for('index'))
return render_template('delete_account.html')
if __name__ == '__main__':
init_db()
app.run(debug=True)
```
### 关键点总结
1. **数据库连接**:使用 `mysql.connector` 连接到MySQL数据库。
2. **SQL语句**:调整SQL语句以适应MySQL的语法(例如,使用 `%s` 作为占位符)。
3. **初始化数据库**:创建表时使用MySQL的语法。
通过以上步骤,你可以将现有的Flask应用从SQLite迁移到MySQL。
阅读全文