# -*- coding: utf-8 -*-# # ------------------------------------------------------------------------------- # Name: models # Description: 模型 # Author: GaoNingNing # Date: 2022/11/27 # ------------------------------------------------------------------------------- # -*- coding: utf-8 -*-# # ------------------------------------------------------------------------------- # Name: models # Description: # Author: GaoNingNing # Date: 2022/11/20 # ------------------------------------------------------------------------------- import hashlib from datetime import datetime import bleach from flask import current_app, request, url_for from authlib.jose import jwt, JoseError from markdown import markdown from wtforms import ValidationError from app import db from werkzeug.security import generate_password_hash, check_password_hash from flask_login import UserMixin, AnonymousUserMixin from . import login_manager class Permission: FOLLOW = 1 COMMENT = 2 WRITE = 4 MODERATE = 8 ADMIN = 16 class Role(db.Model): __tablename__ = 'roles' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) default = db.Column(db.Boolean, default=False, index=True) permissions = db.Column(db.Integer) users = db.relationship('User', backref='role', lazy='dynamic') def __init__(self, **kwargs): super(Role, self).__init__(**kwargs) if self.permissions is None: self.permissions = 0 def add_permission(self, perm): if not self.has_permission(perm): self.permissions += perm def remove_permission(self, perm): if self.has_permission(perm): self.permissions -= perm def reset_permissions(self): self.permissions = 0 def has_permission(self, perm): return self.permissions & perm == perm @staticmethod def insert_roles(): roles = { 'User': [Permission.FOLLOW, Permission.COMMENT, Permission.WRITE], 'Moderator': [Permission.FOLLOW, Permission.COMMENT, Permission.WRITE, Permission.MODERATE], 'Administrator': [Permission.FOLLOW, Permission.COMMENT, Permission.WRITE, Permission.MODERATE, Permission.ADMIN], } default_role = 'User' for r in roles: role = Role.query.filter_by(name=r).first() if role is None: role = Role(name=r) role.reset_permissions() for perm in roles[r]: role.add_permission(perm) role.default = (role.name == default_role) db.session.add(role) db.session.commit() class Post(db.Model): __tablename__ = 'posts' id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String) body = db.Column(db.Text) body_html = db.Column(db.Text) timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) update_timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow, onupdate=datetime.utcnow) author_id = db.Column(db.Integer, db.ForeignKey('users.id')) comments = db.relationship('Comment', backref='post', lazy='dynamic') @staticmethod def on_changed_body(target, value, oldvalue, initiator): allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'h3', 'p'] target.body_html = bleach.linkify(bleach.clean( markdown(value, output_format='html'), tags=allowed_tags, strip=True)) def to_json(self): json_post = { 'url': url_for('api.get_post', id=self.id), 'title': self.title, 'body': self.body, 'body_html': self.body_html, 'timestamp': self.timestamp, 'update_timestamp': self.update_timestamp, 'author_url': url_for('api.get_user', id=self.author_id), 'comments_url': url_for('api.get_post_comments', id=self.id), 'comment_count': self.comments.count() } return json_post @staticmethod def from_json(json_post): body = json_post.get('body') if body is None or body == '': raise ValidationError('post does not have a body') return Post(body=body) db.event.listen(Post.body, 'set', Post.on_changed_body) class Follow(db.Model): __tablename__ = 'follows' follower_id = db.Column(db.Integer, db.ForeignKey('users.id'), primary_key=True) followed_id = db.Column(db.Integer, db.ForeignKey('users.id'), primary_key=True) timestamp = db.Column(db.DateTime, default=datetime.utcnow) class User(UserMixin, db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, index=True) real_avatar = db.Column(db.String(128), default=None) password_hash = db.Column(db.String(128)) role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) name = db.Column(db.String(64)) location = db.Column(db.String(64)) about_me = db.Column(db.Text()) member_since = db.Column(db.DateTime(), default=datetime.utcnow) last_seen = db.Column(db.DateTime(), default=datetime.utcnow) posts = db.relationship('Post', backref='author', lazy='dynamic') followed = db.relationship('Follow', foreign_keys=[Follow.follower_id], backref=db.backref('follower', lazy='joined'), lazy='dynamic', cascade='all, delete-orphan') followers = db.relationship('Follow', foreign_keys=[Follow.followed_id], backref=db.backref('followed', lazy='joined'), lazy='dynamic', cascade='all, delete-orphan') comments = db.relationship('Comment', backref='author', lazy='dynamic') def __init__(self, **kwargs): super(User, self).__init__(**kwargs) if self.role is None: if self.role is None: self.role = Role.query.filter_by(default=True).first() self.follow(self) @staticmethod def add_self_follows(): for user in User.query.all(): if not user.is_following(user): user.follow(user) db.session.add(user) db.session.commit() @property def password(self): raise AttributeError('password is not a readable attribute') @password.setter def password(self, password): self.password_hash = generate_password_hash(password) def verify_password(self, password): return check_password_hash(self.password_hash, password) def __repr__(self): return '' % self.username def can(self, perm): return self.role is not None and self.role.has_permission(perm) def is_administrator(self): return self.can(Permission.ADMIN) def ping(self): self.last_seen = datetime.utcnow() db.session.add(self) db.session.commit() def follow(self, user): if not self.is_following(user): f = Follow(follower=self, followed=user) db.session.add(f) def unfollow(self, user): f = self.followed.filter_by(followed_id=user.id).first() if f: db.session.delete(f) def is_following(self, user): if user.id is None: return False return self.followed.filter_by( followed_id=user.id).first() is not None def is_followed_by(self, user): if user.id is None: return False return self.followers.filter_by( follower_id=user.id).first() is not None @property def followed_posts(self): return Post.query.join(Follow, Follow.followed_id == Post.author_id) \ .filter(Follow.follower_id == self.id) # def generate_auth_token(self, expiration): # s = Serializer(current_app.config['SECRET_KEY']) # return s.dumps({'id': self.id}) # # @staticmethod # def verify_auth_token(token): # s = Serializer(current_app.config['SECRET_KEY']) # try: # data = s.loads(token) # except: # return None # return User.query.get(data['id']) def generate_auth_token(self, **kwargs): """生成用于邮箱验证的JWT(json web token)""" # 签名算法 header = {'alg': 'HS256'} # 待签名的数据负载 data = {'id': self.id} data.update(**kwargs) token = jwt.encode(header=header, payload=data, key=current_app.config['SECRET_KEY']) return token.decode() @staticmethod def verify_auth_token(token): """用于验证用户注册和用户修改密码或邮箱的token, 并完成相应的确认操作""" try: data = jwt.decode(token, current_app.config['SECRET_KEY']) except JoseError: return False return User.query.get(data['id']) def to_json(self): json_user = { 'url': url_for('api.get_user', id=self.id), 'username': self.username, 'member_since': self.member_since, 'last_seen': self.last_seen, 'posts_url': url_for('api.get_user_posts', id=self.id), 'followed_posts_url': url_for('api.get_user_followed_posts', id=self.id), 'post_count': self.posts.count() } return json_user class AnonymousUser(AnonymousUserMixin): def can(self, permissions): return False def is_administrator(self): return False login_manager.anonymous_user = AnonymousUser class Comment(db.Model): __tablename__ = 'comments' id = db.Column(db.Integer, primary_key=True) body = db.Column(db.Text) body_html = db.Column(db.Text) timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) disabled = db.Column(db.Boolean, default=False) author_id = db.Column(db.Integer, db.ForeignKey('users.id')) post_id = db.Column(db.Integer, db.ForeignKey('posts.id')) @staticmethod def on_changed_body(target, value, oldvalue, initiator): allowed_tags = ['a', 'abbr', 'acronym', 'b', 'code', 'em', 'i', 'strong'] target.body_html = bleach.linkify(bleach.clean( markdown(value, output_format='html'), tags=allowed_tags, strip=True)) def to_json(self): json_comment = { 'url': url_for('api.get_comment', id=self.id), 'post_url': url_for('api.get_post', id=self.post_id), 'body': self.body, 'body_html': self.body_html, 'timestamp': self.timestamp, 'author_url': url_for('api.get_user', id=self.author_id), } return json_comment @staticmethod def from_json(json_comment): body = json_comment.get('body') if body is None or body == '': raise ValidationError('comment does not have a body') return Comment(body=body) db.event.listen(Comment.body, 'set', Comment.on_changed_body) @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id))