Merge remote-tracking branch 'origin/master'

This commit is contained in:
李强
2023-03-31 10:31:03 +08:00
312 changed files with 21647 additions and 25563 deletions

View File

@@ -33,6 +33,7 @@ class CoreInitialize:
path_file = os.path.join(apps.get_app_config(self.app.split('.')[-1]).path, 'fixtures',
f'init_{Serializer.Meta.model._meta.model_name}.json')
if not os.path.isfile(path_file):
print("文件不存在,跳过初始化")
return
with open(path_file,encoding="utf-8") as f:
for data in json.load(f):

View File

@@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny
from dvadmin.utils.json_response import DetailResponse
class FastCrudMixin:
"""
定义快速CRUD数据操作的通用方法
"""
# 需要CRUD的字段
crud_fields = None
# 排除CRUD的字段
exclude_fields = None
# 自定义CRUD的JSON
custom_crud_json = None
# 需要修改的CRUD键值对
crud_update_key_value = None
# 将Django的字段类型处理为JS类型
def __handle_type(self, type):
if type in ['BigAutoField', 'CharField']:
return "input"
if type == 'DateTimeField':
return "datetime"
if type == 'DateField':
return "date"
if type == 'IntegerField':
return "number"
if type == 'BooleanField':
return "dict-switch"
# 获取字段属性信息
def __get_field_attribute(self):
result = []
queryset = self.get_queryset()
__name = ""
__verbose_name = ""
__type = "text"
# 判断指定CRUD字段
if self.crud_fields and type(self.crud_fields == list):
for item in self.crud_fields:
try:
field = queryset.model._meta.get_field(item)
field_type = field.get_internal_type()
__name = field.name
# 判断类型是否为外键类型,外键类型需要特殊方式获取verbose_name
if field_type in ['ForeignKey', 'OneToOneField', 'ManyToManyField']:
continue
# try:
# verbose_name = Users._meta.get_field(str(field.name)).verbose_name
# except:
# pass
else:
__verbose_name = field.verbose_name
__type = self.__handle_type(field_type)
except:
continue
result.append({"key": __name, "title": __verbose_name, "type": __type})
else:
# 获取model的所有字段及属性
model_fields = queryset.model._meta.get_fields()
# 遍历所有字段属性
for field in model_fields:
field_type = field.get_internal_type()
__name = field.name
# 判断需要排除的CRUD字段
if self.exclude_fields and type(self.exclude_fields == list):
if __name in self.exclude_fields:
continue
# 判断类型是否为外键类型,外键类型需要特殊方式获取verbose_name
if field_type in ['ForeignKey', 'OneToOneField', 'ManyToManyField']:
continue
# try:
# verbose_name = Users._meta.get_field(str(field.name)).verbose_name
# except:
# pass
else:
__verbose_name = field.verbose_name
__type = self.__handle_type(field_type)
result.append({"key": __name, "title": __verbose_name, "type": __type})
return result
#获取key
def __find_key(self,dct: dict,
target_key: str,
level: int = -1,
index: int = -1) -> tuple:
"""Find a key within a nested dictionary and return its level and index."""
for k, v in dct.items():
level += 1
index += 1
if k == target_key:
return level, index
elif isinstance(v, list):
for i, dct_ in enumerate(v):
if isinstance(dct_, dict):
result = self.__find_key(dct_, target_key)
if result is not None:
return result
else:
continue
elif isinstance(v, str) or isinstance(v, int) or isinstance(v, float):
continue
# 修改字典中key的value
def __update_nested_dict(self,nested_dict: dict,
target_key: str,
new_value) -> dict:
"""Update a nested dictionary with a new value."""
split_target_key = target_key.split('.')
if len(split_target_key) > 1:
new_dict = nested_dict[split_target_key[0]]
for item in split_target_key[1:-1]:
new_dict = new_dict[item]
self.__update_nested_dict(new_dict, split_target_key[-1], new_value)
else:
nested_dict[target_key] = new_value
return nested_dict
# 处理crud,返回columns
def __handle_crud(self):
result = self.__get_field_attribute()
columns = dict()
for item in result:
key = item.get('key')
title = item.get('title')
type = item.get('type')
columns[key] = {
"title": title,
"key": key,
"type": type
}
# 对自定义的crud配置合并
if self.custom_crud_json and isinstance(self.custom_crud_json,dict):
columns = columns | self.custom_crud_json
# 对curd进行修改配置
if self.crud_update_key_value and isinstance(self.crud_update_key_value,dict):
for key, value in self.crud_update_key_value.items():
columns = self.__update_nested_dict(columns,key,value)
return columns
@action(methods=['get'], detail=False,permission_classes=[AllowAny])
def init_crud(self, request):
self.permission_classes = [AllowAny]
columns = self.__handle_crud()
expose = "({expose,dict})=>{"
ret = "return {"
res = "}}"
data = f"""{expose}
{ret}
columns:{columns}
{res}
"""
return DetailResponse(data=data)

View File

@@ -11,14 +11,19 @@ import traceback
from django.db.models import ProtectedError
from django.http import Http404
from rest_framework.exceptions import APIException as DRFAPIException, AuthenticationFailed
from rest_framework.views import set_rollback
from rest_framework.exceptions import APIException as DRFAPIException, AuthenticationFailed, NotAuthenticated
from rest_framework.status import HTTP_401_UNAUTHORIZED
from rest_framework.views import set_rollback, exception_handler
from dvadmin.utils.json_response import ErrorResponse
logger = logging.getLogger(__name__)
class CustomAuthenticationFailed(NotAuthenticated):
# 设置 status_code 属性为 400
status_code = 400
def CustomExceptionHandler(ex, context):
"""
统一异常拦截处理
@@ -30,9 +35,14 @@ def CustomExceptionHandler(ex, context):
"""
msg = ''
code = 4000
# 调用默认的异常处理函数
response = exception_handler(ex, context)
if isinstance(ex, AuthenticationFailed):
code = 401
code_type = response.data.get('detail').code
if code_type == 'no_active_account':
code=400
return ErrorResponse(status=HTTP_401_UNAUTHORIZED)
msg = ex.detail
elif isinstance(ex,Http404):
code = 400

View File

@@ -20,7 +20,7 @@ from django_filters.rest_framework import DjangoFilterBackend
from django_filters.utils import get_model_field
from rest_framework.filters import BaseFilterBackend
from dvadmin.system.models import Dept, ApiWhiteList
from dvadmin.system.models import Dept, ApiWhiteList, RoleMenuButtonPermission
def get_dept(dept_id: int, dept_all_list=None, dept_list=None):
@@ -105,11 +105,21 @@ class DataLevelPermissionsFilter(BaseFilterBackend):
# (2, "本部门数据权限"),
# (3, "全部数据权限"),
# (4, "自定数据权限")
role_list = request.user.role.filter(status=1).values("admin", "data_range")
replace_str = re.compile('\d')
re_api = replace_str.sub('{id}', api)
role_id_list = request.user.role.values_list('id', flat=True)
role_permission_list=RoleMenuButtonPermission.objects.filter(
role__in=role_id_list,
role__status=1,
menu_button__api=re_api,
menu_button__method=method).values(
'data_range',
role_admin=F('role__admin')
)
dataScope_list = [] # 权限范围列表
for ele in role_list:
for ele in role_permission_list:
# 判断用户是否为超级管理员角色/如果拥有[全部数据权限]则返回所有数据
if 3 == ele.get("data_range") or ele.get("admin") == True:
if 3 == ele.get("data_range") or ele.get("role_admin") == True:
return queryset
dataScope_list.append(ele.get("data_range"))
dataScope_list = list(set(dataScope_list))

View File

@@ -20,12 +20,10 @@ class SuccessResponse(Response):
content_type=None,page=1,limit=1,total=1):
std_data = {
"code": 2000,
"data": {
"page": page,
"limit": limit,
"total": total,
"data": data
},
"page": page,
"limit": limit,
"total": total,
"data": data,
"msg": msg
}
super().__init__(std_data, status, template_name, headers, exception, content_type)

View File

@@ -10,7 +10,7 @@
from collections import OrderedDict
from django.core import paginator
from django.core.paginator import Paginator as DjangoPaginator
from django.core.paginator import Paginator as DjangoPaginator, InvalidPage
from rest_framework.pagination import PageNumberPagination
from rest_framework.response import Response
@@ -21,23 +21,65 @@ class CustomPagination(PageNumberPagination):
max_page_size = 999
django_paginator_class = DjangoPaginator
def paginate_queryset(self, queryset, request, view=None):
"""
Paginate a queryset if required, either returning a
page object, or `None` if pagination is not configured for this view.
"""
empty = True
page_size = self.get_page_size(request)
if not page_size:
return None
paginator = self.django_paginator_class(queryset, page_size)
page_number = request.query_params.get(self.page_query_param, 1)
if page_number in self.last_page_strings:
page_number = paginator.num_pages
try:
self.page = paginator.page(page_number)
except InvalidPage as exc:
# msg = self.invalid_page_message.format(
# page_number=page_number, message=str(exc)
# )
# raise NotFound(msg)
empty = False
pass
if paginator.num_pages > 1 and self.template is not None:
# The browsable API should display pagination controls.
self.display_page_controls = True
self.request = request
if not empty:
self.page = []
return list(self.page)
def get_paginated_response(self, data):
code = 2000
msg = 'success'
res = {
"page": int(self.get_page_number(self.request, paginator)) or 1,
"total": self.page.paginator.count,
"limit": int(self.get_page_size(self.request)) or 10,
"data": data
}
page =int(self.get_page_number(self.request, paginator)) or 1
total=self.page.paginator.count if self.page else 0
limit= int(self.get_page_size(self.request)) or 10
is_next= self.page.has_next()
is_previous= self.page.has_previous()
data=data
if not data:
code = 2000
msg = "暂无数据"
res['data'] = []
data = []
return Response(OrderedDict([
('code', code),
('msg', msg),
# ('total',self.page.paginator.count),
('data', res),
('page', page),
('limit', limit),
('total',total),
('is_next',is_next),
('is_previous', is_previous),
('data', data)
]))

View File

@@ -12,7 +12,7 @@ from django.contrib.auth.models import AnonymousUser
from django.db.models import F
from rest_framework.permissions import BasePermission
from dvadmin.system.models import ApiWhiteList
from dvadmin.system.models import ApiWhiteList, RoleMenuButtonPermission
def ValidationApi(reqApi, validApi):
@@ -81,7 +81,8 @@ class CustomPermission(BasePermission):
# ********#
if not hasattr(request.user, "role"):
return False
userApiList = request.user.role.values('permission__api', 'permission__method') # 获取当前用户的角色拥有的所有接口
role_id_list = request.user.role.values_list('id',flat=True)
userApiList = RoleMenuButtonPermission.objects.filter(role__in=role_id_list).values(permission__api=F('menu_button__api'), permission__method=F('menu_button__method')) # 获取当前用户的角色拥有的所有接口
ApiList = [
str(item.get('permission__api').replace('{id}', '([a-zA-Z0-9-]+)')) + ":" + str(
item.get('permission__method')) + '$' for item in userApiList if item.get('permission__api')]

View File

@@ -20,7 +20,8 @@ from dvadmin.utils.json_response import SuccessResponse, ErrorResponse, DetailRe
from dvadmin.utils.permission import CustomPermission
from django_restql.mixins import QueryArgumentsMixin
class CustomModelViewSet(ModelViewSet,ImportSerializerMixin,ExportSerializerMixin,QueryArgumentsMixin):
class CustomModelViewSet(ModelViewSet, ImportSerializerMixin, ExportSerializerMixin, QueryArgumentsMixin):
"""
自定义的ModelViewSet:
统一标准的返回格式;新增,查询,修改可使用不同序列化器
@@ -36,13 +37,13 @@ class CustomModelViewSet(ModelViewSet,ImportSerializerMixin,ExportSerializerMixi
update_serializer_class = None
filter_fields = '__all__'
search_fields = ()
extra_filter_backends = [DataLevelPermissionsFilter]
extra_filter_class = [DataLevelPermissionsFilter]
permission_classes = [CustomPermission]
import_field_dict = {}
export_field_label = {}
def filter_queryset(self, queryset):
for backend in set(set(self.filter_backends) | set(self.extra_filter_backends or [])):
for backend in set(set(self.filter_backends) | set(self.extra_filter_class or [])):
queryset = backend().filter_queryset(self.request, queryset, self)
return queryset
@@ -51,7 +52,6 @@ class CustomModelViewSet(ModelViewSet,ImportSerializerMixin,ExportSerializerMixi
return self.values_queryset
return super().get_queryset()
def get_serializer_class(self):
action_serializer_name = f"{self.action}_serializer_class"
action_serializer_class = getattr(self, action_serializer_name, None)
@@ -107,17 +107,17 @@ class CustomModelViewSet(ModelViewSet,ImportSerializerMixin,ExportSerializerMixi
instance.delete()
return DetailResponse(data=[], msg="删除成功")
keys = openapi.Schema(description='主键列表', type=openapi.TYPE_ARRAY, items=openapi.TYPE_STRING)
keys = openapi.Schema(description='主键列表',type=openapi.TYPE_ARRAY,items=openapi.TYPE_STRING)
@swagger_auto_schema(request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
required=['keys'],
properties={'keys': keys}
), operation_summary='批量删除')
@action(methods=['delete'],detail=False)
def multiple_delete(self,request,*args,**kwargs):
@action(methods=['delete'], detail=False)
def multiple_delete(self, request, *args, **kwargs):
request_data = request.data
keys = request_data.get('keys',None)
keys = request_data.get('keys', None)
if keys:
self.get_queryset().filter(id__in=keys).delete()
return SuccessResponse(data=[], msg="删除成功")