Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

第九章测试章节遇到问题 #19

Open
amao-code opened this issue Jun 28, 2023 · 3 comments
Open

第九章测试章节遇到问题 #19

amao-code opened this issue Jun 28, 2023 · 3 comments

Comments

@amao-code
Copy link

  • 第九章的测试代码无法正常运行,修改后出现的其他问题能麻烦帮忙看一下么,因为我对flask还不是很熟悉
  • 使用的程序版本
    · Flask == 2.3.2
    · Flask-Login == 0.6.2
    · Flask-SQLAlchemy == 3.0.5
    · SQLAlchemy == 2.0.17
  • 程序主代码
#!/bin/bash/env python3


from flask import Flask, url_for, render_template, redirect, flash, request
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager,UserMixin,login_user,login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from markupsafe import escape
import os
import sys
import click

app = Flask(__name__)

app.config['SECRET_KEY'] = 'dev'  # 等同于 app.secret_key = 'dev'
login_manager = LoginManager(app)  # 实例化扩展类
login_manager.login_view = 'login'

WIN = sys.platform.startswith('win')
if WIN:  # 如果是 Windows 系统,使用三个斜线
    prefix = 'sqlite:///'
else:  # 否则使用四个斜线
    prefix = 'sqlite:////'

app.config['SQLALCHEMY_DATABASE_URI'] = prefix + os.path.join(app.root_path, 'data.db')
# app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False  # 关闭对模型修改的监控
db = SQLAlchemy(app) # 初始化扩展,传入程序实例 app

# @app.route('/')
# def hello():
#     return 'Hello'

# @app.route('/user/<name>')
# def user_page(name):
#     return f'User: {escape(name)}'

# @app.route('/test')
# def test_url_for():
#     # 下面是一些调用示例(请访问 http://localhost:5000/test 后在命令行窗口查看输出的 URL):
#     print(url_for('hello'))  # 生成 hello 视图函数对应的 URL,将会输出:/
#     # 注意下面两个调用是如何生成包含 URL 变量的 URL 的
#     print(url_for('user_page', name='greyli'))  # 输出:/user/greyli
#     print(url_for('user_page', name='peter'))  # 输出:/user/peter
#     print(url_for('test_url_for'))  # 输出:/test
#     # 下面这个调用传入了多余的关键字参数,它们会被作为查询字符串附加到 URL 后面。
#     print(url_for('test_url_for', num=2))  # 输出:/test?num=2
#     return 'Test page'

# name = 'Grey Li'
# movies = [
#     {'title': 'My Neighbor Totoro', 'year': '1988'},
#     {'title': 'Dead Poets Society', 'year': '1989'},
#     {'title': 'A Perfect World', 'year': '1993'},
#     {'title': 'Leon', 'year': '1994'},
#     {'title': 'Mahjong', 'year': '1996'},
#     {'title': 'Swallowtail Butterfly', 'year': '1996'},
#     {'title': 'King of Comedy', 'year': '1999'},
#     {'title': 'Devils on the Doorstep', 'year': '1999'},
#     {'title': 'WALL-E', 'year': '2008'},
#     {'title': 'The Pork of Music', 'year': '2012'},
# ]
# @app.route('/')
# def index():
#     return render_template('index.html', name=name, movies=movies)


class Movie(db.Model):
    id = db.Column(db.Integer, primary_key=True)  # 主键
    title = db.Column(db.String(60)) #电影标题
    year = db.Column(db.String(4))  #电影年份

class User(db.Model, UserMixin):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(20))
    username = db.Column(db.String(20))  # 用户名
    password_hash = db.Column(db.String(128))  # 密码散列值

    def set_password(self, password):  # 用来设置密码的方法,接受密码作为参数
        self.password_hash = generate_password_hash(password)  # 将生成的密码保持到对应字段

    def validate_password(self, password):  # 用于验证密码的方法,接受密码作为参数
        return check_password_hash(self.password_hash, password)  # 返回布尔值

@app.cli.command() # 注册为命令,可以传入 name 参数来自定义命令
@click.option('--drop', is_flag=True, help='Create after drop') # 设置选项
def initdb(drop):
    """Initialize the databases."""
    if drop:  #判断是否输入了选项
        db.drop_all()
    db.create_all()
    click.echo('Initalize databases.') # 输出提示信息


@app.cli.command()
def forge():
    """Generate fake data."""
    db.create_all()

    # 全局的两个变量移动到这个函数内
    name = 'Grey Li'
    movies = [
        {'title': 'My Neighbor Totoro', 'year': '1988'},
        {'title': 'Dead Poets Society', 'year': '1989'},
        {'title': 'A Perfect World', 'year': '1993'},
        {'title': 'Leon', 'year': '1994'},
        {'title': 'Mahjong', 'year': '1996'},
        {'title': 'Swallowtail Butterfly', 'year': '1996'},
        {'title': 'King of Comedy', 'year': '1999'},
        {'title': 'Devils on the Doorstep', 'year': '1999'},
        {'title': 'WALL-E', 'year': '2008'},
        {'title': 'The Pork of Music', 'year': '2012'},
    ]

    user = User(name=name)
    db.session.add(user)
    for m in movies:
        movie = Movie(title=m['title'], year=m['year'])
        db.session.add(movie)

    db.session.commit()
    click.echo('Done.')

@app.cli.command()
@click.option('--username', prompt=True, help='The username used to login.')
@click.option('--password', prompt=True, hide_input=True, confirmation_prompt=True, help='The password used to login.')
def admin(username, password):
    """Create user."""
    db.create_all()

    user = User.query.first()
    if user is not None:
        click.echo('Updating user...')
        user.username = username
        user.set_password(password)  # 设置密码
    else:
        click.echo('Creating user...')
        user = User(username=username, name='Admin')
        user.set_password(password)  # 设置密码
        db.session.add(user)

    db.session.commit()  # 提交数据库会话
    click.echo('Done.')

@app.context_processor
def inject_user():  # 函数名可以随意修改
    user = User.query.first()
    return dict(user=user)  # 需要返回字典,等同于 return {'user': user}


@app.errorhandler(404)  # 传入要处理的错误代码
def page_not_found(e):  # 接受异常对象作为参数
    user = User.query.first()
    return render_template('404.html'), 404  # 返回模板和状态码

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'POST':  # 判断是否是 POST 请求
        if not current_user.is_authenticated:  # 如果当前用户未认证
            return redirect(url_for('index'))  # 重定向到主页
        # 获取表单数据
        title = request.form.get('title')  # 传入表单对应输入字段的 name 值
        year = request.form.get('year')
        # 验证数据
        if not title or not year or len(year) != 4 or len(title) > 60:
            flash('Invalid input.')  # 显示错误提示
            return redirect(url_for('index'))  # 重定向回主页
        # 保存表单数据到数据库
        movie = Movie(title=title, year=year)  # 创建记录
        db.session.add(movie)  # 添加到数据库会话
        db.session.commit()  # 提交数据库会话
        flash('Item created.')  # 显示成功创建的提示
        return redirect(url_for('index'))  # 重定向回主页
    movies = Movie.query.all()
    return render_template('index.html', movies=movies)
    user = User.query.first()  # 读取用户记录
    movies = Movie.query.all()  # 读取所有电影记录
    return render_template('index.html', user=user, movies=movies)

@app.route('/movie/edit/<int:movie_id>', methods=['GET', 'POST'])
@login_required
def edit(movie_id):
    movie = Movie.query.get_or_404(movie_id)

    if request.method == 'POST':  # 处理编辑表单的提交请求
        title = request.form['title']
        year = request.form['year']

        if not title or not year or len(year) != 4 or len(title) > 60:
            flash('Invalid input.')
            return redirect(url_for('edit', movie_id=movie_id))  # 重定向回对应的编辑页面

        movie.title = title  # 更新标题
        movie.year = year  # 更新年份
        db.session.commit()  # 提交数据库会话
        flash('Item updated.')
        return redirect(url_for('index'))  # 重定向回主页

    return render_template('edit.html', movie=movie)  # 传入被编辑的电影记录

@app.route('/movie/delete/<int:movie_id>', methods=['POST'])  # 限定只接受 POST 请求
@login_required
def delete(movie_id):
    movie = Movie.query.get_or_404(movie_id)  # 获取电影记录
    db.session.delete(movie)  # 删除对应的记录
    db.session.commit()  # 提交数据库会话
    flash('Item deleted.')
    return redirect(url_for('index'))  # 重定向回主页

@login_manager.user_loader
def load_user(user_id):  # 创建用户加载回调函数,接受用户 ID 作为参数
    user = User.query.get(int(user_id))  # 用 ID 作为 User 模型的主键查询对应的用户
    return user  # 返回用户对象

@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        if not username or not password:
            flash('Invalid input.')
            return redirect(url_for('login'))

        user = User.query.first()
        # 验证用户名和密码是否一致
        if username == user.username and user.validate_password(password):
            login_user(user)  # 登入用户
            flash('Login success.')
            return redirect(url_for('index'))  # 重定向到主页

        flash('Invalid username or password.')  # 如果验证失败,显示错误消息
        return redirect(url_for('login'))  # 重定向回登录页面

    return render_template('login.html')

@app.route('/logout')
@login_required  # 用于视图保护,后面会详细介绍
def logout():
    logout_user()  # 登出用户
    flash('Goodbye.')
    return redirect(url_for('index'))  # 重定向回首页

@app.route('/settings', methods=['GET', 'POST'])
@login_required
def settings():
    if request.method == 'POST':
        name = request.form['name']

        if not name or len(name) > 20:
            flash('Invalid input.')
            return redirect(url_for('settings'))

        current_user.name = name
        # current_user 会返回当前登录用户的数据库记录对象
        # 等同于下面的用法
        # user = User.query.first()
        # user.name = name
        db.session.commit()
        flash('Settings updated.')
        return redirect(url_for('index'))

    return render_template('settings.html')
  • 使用文章第九章上的测试代码
import unittest

from app import app, db, Movie, User


class WatchlistTestCase(unittest.TestCase):

    def setUp(self):
        # 更新配置
        app.config.update(
            TESTING=True,
            SQLALCHEMY_DATABASE_URI='sqlite:///:memory:'
        )
        # 创建数据库和表
        db.create_all()
        # 创建测试数据,一个用户,一个电影条目
        user = User(name='Test', username='test')
        user.set_password('123')
        movie = Movie(title='Test Movie Title', year='2019')
        # 使用 add_all() 方法一次添加多个模型类实例,传入列表
        db.session.add_all([user, movie])
        db.session.commit()

        self.client = app.test_client()  # 创建测试客户端
        self.runner = app.test_cli_runner()  # 创建测试命令运行器

    def tearDown(self):
        db.session.remove()  # 清除数据库会话
        db.drop_all()  # 删除数据库表

    # 测试程序实例是否存在
    def test_app_exist(self):
        self.assertIsNotNone(app)

    # 测试程序是否处于测试模式
    def test_app_is_testing(self):
        self.assertTrue(app.config['TESTING'])

if __name__ == '__main__':
    unittest.main()

出现如下报错

PS D:\python-script\flask-learning> & "D:/Program Files/Python3.10/python.exe" d:/python-script/flask-learning/test_watchlist.py
EE
======================================================================
ERROR: test_app_exist (__main__.WatchlistTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "d:\python-script\flask-learning\test_watchlist.py", line 15, in setUp
    db.create_all()
  File "D:\Program Files\Python3.10\Lib\site-packages\flask_sqlalchemy\extension.py", line 887, in create_all
    self._call_for_binds(bind_key, "create_all")
  File "D:\Program Files\Python3.10\Lib\site-packages\flask_sqlalchemy\extension.py", line 858, in _call_for_binds
    engine = self.engines[key]
  File "D:\Program Files\Python3.10\Lib\site-packages\flask_sqlalchemy\extension.py", line 639, in engines
    app = current_app._get_current_object()  # type: ignore[attr-defined]
  File "D:\Program Files\Python3.10\Lib\site-packages\werkzeug\local.py", line 508, in _get_current_object
    raise RuntimeError(unbound_message) from None
RuntimeError: Working outside of application context.

This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.

** 看报错似乎为上下文的异常,我修改后的代码运行,可以通过测试,但是存在的问题是app.config.update里面的配置未生效,使用的还是原文件的SQLALCHEMY_DATABASE_URI配置,导致会覆盖开发环境的数据

  • 修改后的代码
import unittest

from app import app, db, Movie, User


class WatchlistTestCase(unittest.TestCase):

    def setUp(self):
        # 更新配置
        self.app = app
        app.config.update(
            TESTING=True,
            SQLALCHEMY_DATABASE_URI='sqlite:///:memory:'
        )
        self.app_ctxt = self.app.app_context()
        self.app_ctxt.push()
        # 创建数据库和表
        db.create_all()
        # 创建测试数据,一个用户,一个电影条目
        user = User(name='Test', username='test')
        user.set_password('123')
        movie = Movie(title='Test Movie Title', year='2019')
        # 使用 add_all() 方法一次添加多个模型类实例,传入列表
        db.session.add_all([user, movie])
        db.session.commit()

        self.client = app.test_client()  # 创建测试客户端
        self.runner = app.test_cli_runner()  # 创建测试命令运行器

    def tearDown(self):
        db.session.remove()  # 清除数据库会话
        db.drop_all()  # 删除数据库表
        self.app_ctxt.pop()

    # 测试程序实例是否存在
    def test_app_exist(self):
        self.assertIsNotNone(app)

    # 测试程序是否处于测试模式
    def test_app_is_testing(self):
        self.assertTrue(app.config['TESTING'])

if __name__ == '__main__':
    unittest.main()

** 测试通过,但是开发数据库被覆盖了

PS D:\python-script\flask-learning> & "D:/Program Files/Python3.10/python.exe" d:/python-script/flask-learning/test_watchlist.py
..
----------------------------------------------------------------------
Ran 2 tests in 1.145s

OK
PS D:\python-script\flask-learning> 
@1evergreen
Copy link

遇到了同样的问题, 使用 with app.app_context(): 上下文环境 可以解决测试error,但是开发数据库数据被覆盖.

@greyli
Copy link
Member

greyli commented Dec 11, 2023

Flask-SQLAlchemy 新版本有一些行为变化导致了这个问题,除了把程序改为使用工厂函数来创建程序,暂时没有好方法。临时可以固定旧版本的 Flask-SQLAlchemy。

我想先等等看这个 issue 的最终结果:pallets-eco/flask-sqlalchemy#1172

@TANG839
Copy link

TANG839 commented May 5, 2024

什么时候可以解决这个issue呢?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants