You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

461 lines
12 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from peewee import *
import datetime
from passlib.context import CryptContext
import re
from utils.ClientModels import database_proxy
import logging
# database_proxy = Proxy()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Username format: 3 to 20 characters drawn from ASCII letters, digits,
# "_" and "-".
# NOTE(review): "$" also matches just before a trailing newline, so
# USER_RE.match("abc\n") succeeds -- callers that need an exact match must
# check that the match spans the whole string.
USER_RE = re.compile(r"^[a-zA-Z0-9_-]{3,20}$")
# Password format: 3 to 20 characters, none of which may be a newline
# ("." does not match "\n"). The same trailing-newline caveat applies.
PASS_RE = re.compile(r"^.{3,20}$")
# passlib context used to hash and verify passwords. Only the
# pbkdf2_sha256 scheme is enabled, with 20000 default rounds.
# NOTE(review): all__vary_rounds=0.1 presumably randomizes the round count
# by about 10% -- confirm against the installed passlib version.
pwd_context = CryptContext(
    schemes=["pbkdf2_sha256", ],
    default="pbkdf2_sha256",
    all__vary_rounds=0.1,
    pbkdf2_sha256__default_rounds=20000,
)
class MyBaseModel(Model):
    """
    Base class shared by every model in this module (extends peewee's Model).

    Binds subclasses to ``database_proxy`` and gives each of them four
    bookkeeping columns: ``is_valid``, ``created``, ``modified`` and
    ``revision``.  Also adds ``get_or_none()``, a wrapper around peewee's
    ``get()`` that returns ``None`` instead of raising when nothing matches.
    """

    class Meta:
        database = database_proxy

    # Validity flag; rows start out valid. Nothing in this class reads it.
    is_valid = BooleanField(default=True)
    # Timestamps. NOTE(review): presumably default to the current time via
    # peewee's TimestampField -- confirm for the installed peewee version.
    created = TimestampField()
    modified = TimestampField()
    # Incremented on every save(), including the very first one.
    revision = IntegerField(default=0)

    @classmethod
    def get_or_none(cls, *args, **kwargs):
        """
        Fetch a single row, returning None when no row matches.

        :param args: query expressions forwarded to ``cls.get()``
        :param kwargs: keyword filters forwarded to ``cls.get()``
        :return: model instance, or None if ``DoesNotExist`` was raised
        """
        try:
            return cls.get(*args, **kwargs)
        except DoesNotExist:
            return None

    def save(self, *args, **kwargs):
        """
        Stamp ``modified``, bump ``revision`` and persist the row.

        :param args: forwarded to peewee's ``Model.save()``
        :param kwargs: forwarded to peewee's ``Model.save()``
        :return: whatever peewee's ``Model.save()`` returns (documented by
            peewee as the number of rows modified)
        """
        self.modified = datetime.datetime.now()
        self.revision += 1
        # Hand the result back to the caller instead of dropping it, so
        # callers can tell whether the write touched any row.
        return super(MyBaseModel, self).save(*args, **kwargs)
class User(MyBaseModel):
    """
    User account whose password is stored as a passlib hash.

    Usernames must match ``USER_RE``.  Passwords must match ``PASS_RE`` and
    be 5 to 14 characters long.  Both checks require the pattern to cover
    the whole string, so a value ending in a newline is rejected.
    """

    username = CharField(unique=True)
    name = CharField(default='')
    # Holds the hash written by crypt_password()/create_user(); '' until set.
    password = CharField(default='')

    class Meta:
        order_by = ('username',)

    @staticmethod
    def _matches_entirely(pattern, value):
        """
        Tell whether ``pattern`` matches ``value`` from start to end.

        ``$`` in a pattern also matches just before a trailing newline, so a
        plain ``pattern.match(value)`` would accept ``"abc\\n"``.  Comparing
        the end of the match with the length of the input closes that gap.

        :param pattern: compiled regular expression
        :param value: string to check
        :return: True/False
        """
        found = pattern.match(value)
        return found is not None and found.end() == len(value)

    def crypt_password(self, password):
        """
        Hash ``password`` and store the hash on this instance (not saved).

        :param password: plain-text password
        :return: the hash now held in ``self.password``
        """
        # NOTE(review): newer passlib releases presumably prefer
        # pwd_context.hash() over encrypt() -- confirm before migrating.
        self.password = pwd_context.encrypt(password)
        return self.password

    def verify_password(self, password):
        """
        Check a plain-text password against the stored hash.

        :param password: plain-text password
        :return: result of ``pwd_context.verify`` (True/False)
        """
        return pwd_context.verify(password, self.password)

    @classmethod
    def username_valid(cls, username):
        """
        Check the username format against ``USER_RE``.

        :param username: candidate username
        :return: True/False
        """
        return cls._matches_entirely(USER_RE, username)

    @classmethod
    def password_valid(cls, password):
        """
        Check the password format: it must match ``PASS_RE`` and be 5 to 14
        characters long (both bounds inclusive).

        :param password: candidate plain-text password
        :return: True/False
        """
        if not cls._matches_entirely(PASS_RE, password):
            return False
        return 5 <= len(password) <= 14

    @classmethod
    def create_user(cls, username, password, **kwargs):
        """
        Create a user after validating the username and password.

        :param username: must pass ``username_valid``
        :param password: plain text; must pass ``password_valid``
        :param kwargs: extra column values forwarded to ``User.create``
        :return: the new User object, or False if either check fails
        """
        if not User.username_valid(username=username):
            return False
        if not User.password_valid(password=password):
            return False
        # Only the hash is stored; the plain text never reaches the database.
        hashed_password = pwd_context.encrypt(password)
        return User.create(username=username, password=hashed_password, **kwargs)

    @classmethod
    def get_user(cls, user_id):
        """
        Fetch a user by primary key.

        :param user_id: value compared against ``cls.id``
        :return: User object, or None if there is no such row
        """
        return cls.get_or_none(cls.id == user_id)

    @classmethod
    def get_user_by_username(cls, username):
        """
        Fetch a user by username.

        :param username: value compared against ``cls.username``
        :return: User object, or None if there is no such row
        """
        return cls.get_or_none(cls.username == username)

    def assign_group(self, group_id):
        """
        Add this user to a group.

        :param group_id: id of the target group
        :return: result of ``UserGroup.add_user_to_group`` (the new UserGroup
            object, or False if the user is already a member)
        """
        return UserGroup.add_user_to_group(user_id=self.id, group_id=group_id)

    def change_password(self, old_password, new_password):
        """
        Replace the password after verifying the current one.

        The change is refused when the old password is badly formatted or
        does not verify, when the new password is badly formatted, or when
        the new password equals the old one.

        :param old_password: current plain-text password
        :param new_password: desired plain-text password
        :return: True after the new hash has been saved, otherwise False
        """
        # NOTE(review): the nesting of the original conditions was
        # reconstructed; this is the reading its own comment describes.
        # A badly formatted old password is treated as wrong, which also
        # avoids paying for a hash verification.
        if not User.password_valid(old_password):
            return False
        if not self.verify_password(old_password):
            return False
        if not User.password_valid(new_password):
            return False
        if old_password == new_password:
            return False
        # crypt_password() stores the new hash on self.password.
        self.crypt_password(password=new_password)
        self.save()
        return True
class Operation(MyBaseModel):
    """
    Operation table.

    Each row is identified by a unique ``key`` and can be attached to groups
    through ``GroupOperatePermission``.
    """

    key = CharField(unique=True)
    name = CharField(default='')
    # NOTE(review): presumably the ``key`` of a parent operation -- confirm.
    parent_key = CharField(default='')
    detail = CharField(default='')
    level = IntegerField(default=0)
    sort = IntegerField(default=0)

    @classmethod
    def get_operation(cls,name):
        """
        Fetch an operation by its ``name``.

        :param name: value compared against ``cls.name``
        :return: Operation object, or None if there is no such row
        """
        name_filter = cls.name == name
        return cls.get_or_none(name_filter)

    @classmethod
    def get_operation_id(cls,key):
        """
        Return the id of the operation with the given ``key``.

        Unlike the other lookups this uses ``cls.get`` directly, so a missing
        row raises ``DoesNotExist`` instead of yielding None.

        :param key: value compared against ``cls.key``
        :return: id of the matching row
        """
        found = cls.get(cls.key == key)
        return found.id

    @classmethod
    def create_operation(cls, key, **kwargs):
        """
        Create an operation unless one with this ``key`` already exists.

        :param key: unique key of the operation
        :param kwargs: extra column values forwarded to ``cls.create``
        :return: the existing row's ``key`` if one was found, otherwise the
            newly created Operation object
        """
        existing = cls.get_or_none(cls.key == key)
        if not existing:
            return cls.create(key=key, **kwargs)
        return existing.key

    def assign_group(self, group_id):
        """
        Attach this operation to a group.

        :param group_id: id of the target group
        :return: result of ``GroupOperatePermission.add_operation_to_group``
        """
        return GroupOperatePermission.add_operation_to_group(
            operation_id=self.id,
            group_id=group_id,
        )
class Group(MyBaseModel):
    """
    Group table: a uniquely named group that users and operations can be
    attached to through ``UserGroup`` and ``GroupOperatePermission``.
    """

    name = CharField(unique=True)
    detail = CharField(default='')

    @classmethod
    def create_group(cls, name, **kwargs):
        """
        Create a group unless one with this ``name`` already exists.

        :param name: unique group name
        :param kwargs: extra column values forwarded to ``cls.create``
        :return: the existing row's ``name`` if one was found, otherwise the
            newly created Group object
        """
        existing = cls.get_or_none(cls.name == name)
        if not existing:
            return cls.create(name=name, **kwargs)
        return existing.name

    @classmethod
    def get_group(cls, group_id):
        """
        Fetch a group by primary key.

        :param group_id: value compared against ``cls.id``
        :return: Group object, or None if there is no such row
        """
        id_filter = cls.id == group_id
        return cls.get_or_none(id_filter)

    def add_user(self, user_id):
        """
        Add a user to this group.

        :param user_id: id of the user to add
        :return: result of ``UserGroup.add_user_to_group``
        """
        return UserGroup.add_user_to_group(
            user_id=user_id,
            group_id=self.id,
        )

    def add_operation(self, operation_id):
        """
        Attach an operation to this group.

        :param operation_id: id of the operation to attach
        :return: result of ``GroupOperatePermission.add_operation_to_group``
        """
        return GroupOperatePermission.add_operation_to_group(
            operation_id=operation_id,
            group_id=self.id,
        )
class UserGroup(MyBaseModel):
    """
    User/group membership table, keyed by the (user, group) pair.
    """

    class Meta:
        primary_key = CompositeKey('user', 'group')

    user = ForeignKeyField(User)
    group = ForeignKeyField(Group)

    @classmethod
    def get_ug_by_group_id(cls, group_id):
        """
        Build the query for all memberships of one group, joined with Group.

        :param group_id: value compared against ``cls.group``
        :return: UserGroup select query (not yet executed here)
        """
        joined = cls.select(UserGroup, Group).join(Group)
        return joined.where(cls.group == group_id)

    @classmethod
    def get_ug_by_user_id(cls, user_id):
        """
        Build the query for all memberships of one user, joined with User.

        :param user_id: value compared against ``cls.user``
        :return: UserGroup select query (not yet executed here)
        """
        joined = cls.select(UserGroup, User).join(User)
        return joined.where(cls.user == user_id)

    @classmethod
    def add_user_to_group(cls, user_id, group_id):
        """
        Add a user to a group unless the membership already exists.

        :param user_id: id of the user
        :param group_id: id of the group
        :return: the new UserGroup object, or False if already a member
        """
        already_member = cls.user_is_a_member_of_group(
            user_id=user_id,
            group_id=group_id,
        )
        if already_member:
            return False
        return cls.create(user=user_id, group=group_id)

    @classmethod
    def delete_user_from_group(cls, user_id, group_id):
        """
        Remove a user from a group.

        :param user_id: id of the user
        :param group_id: id of the group
        :return: result of ``delete_instance()`` when the membership exists,
            False when it does not
        """
        try:
            membership = cls.get(cls.user == user_id, cls.group == group_id)
            return membership.delete_instance()
        except UserGroup.DoesNotExist:
            return False

    @classmethod
    def user_is_a_member_of_group(cls, user_id, group_id):
        """
        Tell whether the user belongs to the group.

        :param user_id: id of the user
        :param group_id: id of the group
        :return: True only when exactly one matching row exists, else False
        """
        matches = cls.select().where(cls.user == user_id, cls.group == group_id)
        return len(matches) == 1

    @classmethod
    def get_groups_of_user(cls, user_id):
        """
        List the names of all groups the user belongs to.

        :param user_id: id of the user
        :return: list of group names
        """
        memberships = cls.get_ug_by_user_id(user_id=user_id)
        return [membership.group.name for membership in memberships]

    @classmethod
    def get_users_in_group(cls, group_id):
        """
        List the usernames of all users in the group.

        :param group_id: id of the group
        :return: list of usernames
        """
        memberships = cls.get_ug_by_group_id(group_id=group_id)
        return [membership.user.username for membership in memberships]
class GroupOperatePermission(MyBaseModel):
    """
    Authorization table linking groups to operations, keyed by the
    (group, operation) pair.
    """

    class Meta:
        primary_key = CompositeKey('group', 'operation')

    group = ForeignKeyField(Group)
    operation = ForeignKeyField(Operation)

    @classmethod
    def get_gop_by_group_id(cls, group_id):
        """
        Build the query for all grants of one group, joined with Group.

        :param group_id: value compared against ``cls.group``
        :return: GroupOperatePermission select query (not yet executed here)
        """
        joined = cls.select(GroupOperatePermission, Group).join(Group)
        return joined.where(cls.group == group_id)

    @classmethod
    def get_gop_by_operation_id(cls, operation_id):
        """
        Build the query for all grants of one operation, joined with
        Operation.

        :param operation_id: value compared against ``cls.operation``
        :return: GroupOperatePermission select query (not yet executed here)
        """
        joined = cls.select(GroupOperatePermission, Operation).join(Operation)
        return joined.where(cls.operation == operation_id)

    @classmethod
    def add_operation_to_group(cls, operation_id, group_id):
        """
        Attach an operation to a group unless the grant already exists.

        :param operation_id: id of the operation
        :param group_id: id of the group
        :return: the new GroupOperatePermission object, or False if the
            operation is already attached to the group
        """
        already_granted = cls.operation_is_a_member_of_group(
            operation_id=operation_id,
            group_id=group_id,
        )
        if already_granted:
            return False
        return cls.create(operation=operation_id, group=group_id)

    @classmethod
    def delete_operation_from_group(cls, operation_id, group_id):
        """
        Detach an operation from a group.

        :param operation_id: id of the operation
        :param group_id: id of the group
        :return: result of ``delete_instance()`` when the grant exists,
            False when it does not
        """
        try:
            grant = cls.get(cls.operation == operation_id, cls.group == group_id)
            return grant.delete_instance()
        except GroupOperatePermission.DoesNotExist:
            return False

    @classmethod
    def operation_is_a_member_of_group(cls, operation_id, group_id):
        """
        Tell whether the operation is attached to the group.

        :param operation_id: id of the operation
        :param group_id: id of the group
        :return: True only when exactly one matching row exists, else False
        """
        matches = cls.select().where(
            cls.operation == operation_id,
            cls.group == group_id,
        )
        return len(matches) == 1

    @classmethod
    def get_groups_with_operation(cls, operation_id):
        """
        List the names of all groups that have the operation.

        :param operation_id: id of the operation
        :return: list of group names
        """
        grants = cls.get_gop_by_operation_id(operation_id=operation_id)
        return [grant.group.name for grant in grants]

    @classmethod
    def get_operations_of_group(cls, group_id):
        """
        List the names of all operations attached to the group.

        :param group_id: id of the group
        :return: list of operation names
        """
        grants = cls.get_gop_by_group_id(group_id=group_id)
        return [grant.operation.name for grant in grants]
def getPermission(userName):
    """
    Return the operation names granted to a user through their first group.

    Only the first membership row returned for the user is considered; any
    further groups the user belongs to are ignored.

    :param userName: username to look up
    :return: list of operation names, or None when the user does not exist,
        belongs to no group, or the lookup fails for any other reason
    """
    try:
        user = User.get_user_by_username(userName)
        if user is None:
            return None
        memberships = UserGroup.get_ug_by_user_id(user.id)
        # The related Group object itself is passed as the group filter.
        # NOTE(review): presumably peewee accepts a model instance when
        # comparing a foreign key -- confirm.
        first_group = memberships[0].group
        return GroupOperatePermission.get_operations_of_group(first_group)
    except IndexError:
        # The user exists but has no group membership.
        return None
    except Exception:
        # Keep the best-effort contract (None on failure) without also
        # swallowing KeyboardInterrupt/SystemExit, and leave a trace so the
        # failure is not completely silent.
        logging.getLogger(__name__).exception(
            "getPermission failed for user %r", userName
        )
        return None