##问题描述
从github(https://github.com/num10/transfile)上下载的一个用flask+mysql实现的转pdf成word的web应用,其中
涉及到新用户的注册:
可是在注册页面填完信息点击注册后前端报错如下:
sqlalchemy.orm.exc.ObjectDeletedError: Instance '<User at 0x58072f0>' has been deleted, or its row is otherwise not present.
根据traceback的信息,定位到出错的位置应该是下图中的models.py和views.py文件
后台关于这两个文件的报错信息为:
File "C:\Users\26808\transfile-master\app\auth\views.py", line 68, in register
token = user.generate_confirmation_token()
File "C:\Users\26808\transfile-master\app\models.py", line 142, in generate_confirmation_token
return s.dumps({'confirm': self.id}).decode('utf-8')
对应文件中:
views.py:
@auth.route('/register', methods=['GET', 'POST'])
def register():
form = RegistrationForm()
if form.validate_on_submit():
user = User(email=form.email.data,
username=form.username.data,
password=form.password.data)
db.session.add(user)
db.session.commit()
token = user.generate_confirmation_token()
send_email(user.email, '认证您的账户',
'auth/email/confirm', user=user, token=token)
flash('认证信息已发送至您的邮箱')
return redirect(url_for('auth.login'))
return render_template('auth/register.html', form=form)
models.py:
def generate_confirmation_token(self, expiration=3600):
s = Serializer(current_app.config['SECRET_KEY'], expiration)
return s.dumps({'confirm': self.id}).decode('utf-8')
##补充
1.按照requirements文件的要求,flask都安装了正确的包
2.正如register函数中所写的那样,点击注册后用户信息会被加入mysql数据库中但是会被重复加入,即多次运行每次都出错,并且相同的用户被反复加入mysql数据库中
3.网上大多数都是通过app.run()来启动文件的,但是这个项目里面先要设置set FLASK_APP = manage.py
在flask run
才能运行
能解释一下为什么吗?
manage.py代码如下:
#!/usr/bin/ python
# -*- coding: UTF-8 -*-
import os
COV = None
if os.environ.get('FLASK_COVERAGE'):
import coverage
COV = coverage.coverage(branch=True, include='app/*')
COV.start()
import sys
import click
from flask_migrate import Migrate,upgrade
from app import create_app, db
from app.models import User, Follow, Role, Permission, Post, Comment
from flask_uploads import configure_uploads
from app.auth.forms import pdfs,photos,words
app = create_app(os.getenv('FLASK_CONFIG') or 'default')
migrate = Migrate(app, db)
configure_uploads(app, pdfs)
configure_uploads(app, photos)
configure_uploads(app, words)
@app.shell_context_processor
def make_shell_context():
return dict(db=db, User=User, Follow=Follow, Role=Role,
Permission=Permission, Post=Post, Comment=Comment)
@app.cli.command()
@click.option('--coverage/--no-coverage', default=False,
help='Run tests under code coverage.')
def test(coverage=False):
"""Run the unit tests."""
if coverage and not os.environ.get('FLASK_COVERAGE'):
import subprocess
os.environ['FLASK_COVERAGE'] = '1'
sys.exit(subprocess.call(sys.argv))
import unittest
tests = unittest.TestLoader().discover('tests')
unittest.TextTestRunner(verbosity=2).run(tests)
if COV:
COV.stop()
COV.save()
print('Coverage Summary:')
COV.report()
basedir = os.path.abspath(os.path.dirname(__file__))
covdir = os.path.join(basedir, 'tmp/coverage')
COV.html_report(directory=covdir)
print('HTML version: file://%s/index.html' % covdir)
COV.erase()
@app.cli.command()
@click.option('--length', default=25,
help='Number of functions to include in the profiler report.')
@click.option('--profile-dir', default=None,
help='Directory where profiler data files are saved.')
def profile(length=25, profile_dir=None):
"""Start the application under the code profiler."""
from werkzeug.contrib.profiler import ProfilerMiddleware
app.wsgi_app = ProfilerMiddleware(app.wsgi_app, restrictions=[length],
profile_dir=profile_dir)
app.run(debug=False)
@app.cli.command()
def deploy():
"""Run deployment tasks."""
# migrate database to latest revision
upgrade()
# create or update user roles
Role.insert_roles()
# ensure all users are following themselves
User.add_self_follows()
if __name__ == '__main__':
deploy()
数据模型models.py:
#!/usr/bin/ python
# -*- coding: UTF-8 -*-
from datetime import datetime
import hashlib
from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from markdown import markdown
import bleach
from flask import current_app, request,url_for
from flask_login import UserMixin, AnonymousUserMixin
from . import db, login_manager
class Permission:
FOLLOW = 1
COMMENT = 2
WRITE = 4
MODERATE = 8
ADMIN = 16
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
default = db.Column(db.Boolean, default=False, index=True)
permissions = db.Column(db.Integer)
users = db.relationship('User', backref='role', lazy='dynamic')
def __init__(self, **kwargs):
super(Role, self).__init__(**kwargs)
if self.permissions is None:
self.permissions = 0
@staticmethod
def insert_roles():
roles = {
'User': [Permission.FOLLOW, Permission.COMMENT, Permission.WRITE],
'Moderator': [Permission.FOLLOW, Permission.COMMENT,
Permission.WRITE, Permission.MODERATE],
'Administrator': [Permission.FOLLOW, Permission.COMMENT,
Permission.WRITE, Permission.MODERATE,
Permission.ADMIN],
}
default_role = 'User'
for r in roles:
role = Role.query.filter_by(name=r).first()
if role is None:
role = Role(name=r)
role.reset_permissions()
for perm in roles[r]:
role.add_permission(perm)
role.default = (role.name == default_role)
db.session.add(role)
db.session.commit()
def add_permission(self, perm):
if not self.has_permission(perm):
self.permissions += perm
def remove_permission(self, perm):
if self.has_permission(perm):
self.permissions -= perm
def reset_permissions(self):
self.permissions = 0
def has_permission(self, perm):
return self.permissions & perm == perm
def __repr__(self):
return '<Role %r>' % self.name
class Follow(db.Model):
__tablename__ = 'follows'
follower_id = db.Column(db.Integer, db.ForeignKey('users.id'),
primary_key=True)
followed_id = db.Column(db.Integer, db.ForeignKey('users.id'),
primary_key=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
class User(UserMixin, db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(64), unique=True, index=True)
username = db.Column(db.String(64), unique=True, index=True)
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
password_hash = db.Column(db.String(128))
confirmed = db.Column(db.Boolean, default=False)
name = db.Column(db.String(64))
location = db.Column(db.String(64))
about_me = db.Column(db.Text())
member_since = db.Column(db.DateTime(), default=datetime.utcnow)
last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
avatar_hash = db.Column(db.String(32))
posts = db.relationship('Post', backref='author', lazy='dynamic')
followed = db.relationship('Follow',
foreign_keys=[Follow.follower_id],
backref=db.backref('follower', lazy='joined'),
lazy='dynamic',
cascade='all, delete-orphan')
followers = db.relationship('Follow',
foreign_keys=[Follow.followed_id],
backref=db.backref('followed', lazy='joined'),
lazy='dynamic',
cascade='all, delete-orphan')
comments = db.relationship('Comment', backref='author', lazy='dynamic')
@staticmethod
def add_self_follows():
for user in User.query.all():
if not user.is_following(user):
user.follow(user)
db.session.add(user)
db.session.commit()
def __init__(self, **kwargs):
super(User, self).__init__(**kwargs)
if self.role is None:
if self.email == current_app.config['FLASKY_ADMIN']:
self.role = Role.query.filter_by(name='Administrator').first()
if self.role is None:
self.role = Role.query.filter_by(default=True).first()
if self.email is not None and self.avatar_hash is None:
self.avatar_hash = self.gravatar_hash()
self.follow(self)
@property
def password(self):
raise AttributeError('password is not a readable attribute')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
return check_password_hash(self.password_hash, password)
def generate_confirmation_token(self, expiration=3600):
s = Serializer(current_app.config['SECRET_KEY'], expiration)
return s.dumps({'confirm': self.id}).decode('utf-8')
def confirm(self, token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token.encode('utf-8'))
except:
return False
if data.get('confirm') != self.id:
return False
self.confirmed = True
db.session.add(self)
return True
def generate_reset_token(self, expiration=3600):
s = Serializer(current_app.config['SECRET_KEY'], expiration)
return s.dumps({'reset': self.id}).decode('utf-8')
@staticmethod
def reset_password(token, new_password):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token.encode('utf-8'))
except:
return False
user = User.query.get(data.get('reset'))
if user is None:
return False
user.password = new_password
db.session.add(user)
return True
def can(self, perm):
return self.role is not None and self.role.has_permission(perm)
def is_administrator(self):
return self.can(Permission.ADMIN)
def ping(self):
self.last_seen = datetime.utcnow()
db.session.add(self)
def gravatar_hash(self):
return hashlib.md5(self.email.lower().encode('utf-8')).hexdigest()
def gravatar(self, size=100, default='identicon', rating='g'):
url = 'https://secure.gravatar.com/avatar'
hash = self.avatar_hash or self.gravatar_hash()
return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(
url=url, hash=hash, size=size, default=default, rating=rating)
def follow(self, user):
if not self.is_following(user):
f = Follow(follower=self, followed=user)
db.session.add(f)
def unfollow(self, user):
f = self.followed.filter_by(followed_id=user.id).first()
if f:
db.session.delete(f)
def is_following(self, user):
if user.id is None:
return False
return self.followed.filter_by(
followed_id=user.id).first() is not None
def is_followed_by(self, user):
if user.id is None:
return False
return self.followers.filter_by(
follower_id=user.id).first() is not None
@property
def followed_posts(self):
return Post.query.join(Follow, Follow.followed_id == Post.author_id)\
.filter(Follow.follower_id == self.id)
def to_json(self):
json_user = {
'url': url_for('api.get_user', id=self.id),
'username': self.username,
'member_since': self.member_since,
'last_seen': self.last_seen,
'posts_url': url_for('api.get_user_posts', id=self.id),
'followed_posts_url': url_for('api.get_user_followed_posts',
id=self.id),
'post_count': self.posts.count()
}
return json_user
def generate_auth_token(self, expiration):
s = Serializer(current_app.config['SECRET_KEY'],
expires_in=expiration)
return s.dumps({'id': self.id}).decode('utf-8')
@staticmethod
def verify_auth_token(token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return None
return User.query.get(data['id'])
def __repr__(self):
return '<User %r>' % self.username
class AnonymousUser(AnonymousUserMixin):
def can(self, permissions):
return False
def is_administrator(self):
return False
login_manager.anonymous_user = AnonymousUser
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
class Post(db.Model):
__tablename__ = 'posts'
id = db.Column(db.Integer, primary_key=True)
body = db.Column(db.Text)
body_html = db.Column(db.Text)
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
comments = db.relationship('Comment', backref='post', lazy='dynamic')
@staticmethod
def on_changed_body(target, value, oldvalue, initiator):
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code',
'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul',
'h1', 'h2', 'h3', 'p']
target.body_html = bleach.linkify(bleach.clean(
markdown(value, output_format='html'),
tags=allowed_tags, strip=True))
def to_json(self):
json_post = {
'url': url_for('api.get_post', id=self.id),
'body': self.body,
'body_html': self.body_html,
'timestamp': self.timestamp,
'author_url': url_for('api.get_user', id=self.author_id),
'comments_url': url_for('api.get_post_comments', id=self.id),
'comment_count': self.comments.count()
}
return json_post
@staticmethod
def from_json(json_post):
body = json_post.get('body')
if body is None or body == '':
raise ValidationError('post does not have a body')
return Post(body=body)
db.event.listen(Post.body, 'set', Post.on_changed_body)
class Comment(db.Model):
__tablename__ = 'comments'
id = db.Column(db.Integer, primary_key=True)
body = db.Column(db.Text)
body_html = db.Column(db.Text)
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
disabled = db.Column(db.Boolean)
author_id = db.Column(db.Integer, db.ForeignKey('users.id'))
post_id = db.Column(db.Integer, db.ForeignKey('posts.id'))
@staticmethod
def on_changed_body(target, value, oldvalue, initiator):
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'code', 'em', 'i',
'strong']
target.body_html = bleach.linkify(bleach.clean(
markdown(value, output_format='html'),
tags=allowed_tags, strip=True))
db.event.listen(Comment.body, 'set', Comment.on_changed_body)