Files
django-vue3-admin-gd/backend/system/views/user.py
2025-09-02 17:50:32 +08:00

212 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from django.db.models import Prefetch, F
from django.utils import timezone
from rest_framework import serializers
from rest_framework.authtoken.models import Token
from rest_framework.authtoken.views import ObtainAuthToken
from rest_framework.response import Response
from rest_framework.views import APIView
from django.contrib.auth.hashers import make_password
from rest_framework.permissions import IsAuthenticated
from django_filters import rest_framework as filters
from ip2geotools.databases.noncommercial import DbIpCity
from system.models import User, Menu, LoginLog, Dept
from utils.ip_utils import get_client_ip
from utils.serializers import CustomModelSerializer
from utils.custom_model_viewSet import CustomModelViewSet
class UserSerializer(CustomModelSerializer):
roles = serializers.SerializerMethodField() # 新增字段
depts = serializers.SerializerMethodField() # 新增字段
"""
用户数据 序列化器
"""
class Meta:
model = User
fields = '__all__'
read_only_fields = ['id', 'create_time', 'update_time']
def get_roles(self, obj):
"""
返回用户所有角色的名称列表
"""
return list(obj.role.values_list('name', flat=True))
def get_depts(self, obj):
# 返回所有部门名称列表
return list(obj.dept.values_list('name', flat=True))
def create(self, validated_data):
if 'password' in validated_data:
validated_data['password'] = make_password(validated_data['password'])
return super().create(validated_data)
def update(self, instance, validated_data):
if 'password' in validated_data:
validated_data['password'] = make_password(validated_data['password'])
return super().update(instance, validated_data)
class UserLogin(ObtainAuthToken):
def post(self, request, *args, **kwargs):
serializer = self.serializer_class(data=request.data,
context={'request': request})
serializer.is_valid(raise_exception=True)
user = serializer.validated_data['user']
token, created = Token.objects.get_or_create(user=user)
# 获取真实IP地址
client_ip = get_client_ip(request)
# 获取IP地址的地理位置信息
# location_info = self.get_location_from_ip(client_ip)
location_info = ''
# 更新登录IP和登录时间
user.login_ip = client_ip
user.last_login = timezone.now()
user.save(update_fields=['login_ip', 'last_login'])
user_data = UserSerializer(user).data
# 记录登录日志
LoginLog.objects.create(
username=user.username,
result=LoginLog.LoginResult.SUCCESS,
user_ip=client_ip,
location=location_info,
user_agent=request.META.get('HTTP_USER_AGENT', '')
)
# 在序列化后的数据中加入 accessToken
user_data['accessToken'] = token.key
return Response({
"code": 0,
"data": user_data,
"error": None,
"message": "ok"
})
def get_location_from_ip(self, ip_address):
"""根据IP地址获取地理位置信息"""
try:
# 对于本地IP地址返回默认值
if ip_address in ['127.0.0.1', 'localhost']:
return "本地网络"
# 查询IP地址信息
response = DbIpCity.get(ip_address, api_key='free')
# 构建地区信息字符串
location_parts = []
if response.city:
location_parts.append(response.city)
if response.region:
location_parts.append(response.region)
if response.country:
location_parts.append(response.country)
return ', '.join(location_parts) if location_parts else "未知位置"
except Exception as e:
# 记录错误日志
import logging
logger = logging.getLogger(__name__)
logger.error(f"获取IP地址位置信息失败: {str(e)}")
return "位置获取失败"
class UserInfo(APIView):
def get(self, request, *args, **kwargs):
user = self.request.user
user_data = UserSerializer(user).data
if user.is_superuser:
roles = ['admin']
# menus = Menu.objects.filter(pid__isnull=True).order_by('sort')
permissions = Menu.objects.filter(type='button').order_by('auth_code').values_list('auth_code', flat=True)
else:
roles = user.get_role_name
# menus = Menu.objects.filter(pid__isnull=True, role__users=user).order_by('sort').distinct()
permissions = Menu.objects.filter(type='button', role__users=user).order_by('auth_code').distinct().values_list('auth_code', flat=True)
# menus_data = MenuSerializer(menus, many=True).data
user_data['roles'] = roles
user_data['permissions'] = permissions
return Response({
"code": 0,
"data": user_data,
"error": None,
"message": "ok"
})
class Codes(APIView):
def get(self, request, *args, **kwargs):
return Response({
"code": 0,
"data": [
"AC_100100",
"AC_100110",
"AC_100120",
"AC_100010"
],
"error": None,
"message": "ok"
})
class UserFilter(filters.FilterSet):
# 多部门过滤,假设字段为 dept支持 ?dept=1,2,3
dept = filters.CharFilter(method='filter_dept')
# 名称和手机号模糊查询
username = filters.CharFilter(field_name='username', lookup_expr='icontains')
mobile = filters.CharFilter(field_name='mobile', lookup_expr='icontains')
class Meta:
model = User
fields = ['dept', 'username', 'nickname', 'mobile', 'status']
def filter_dept(self, queryset, name, value):
# value 可能是单个id或逗号分隔的多个id
dept_ids = [int(i) for i in value.split(',') if i]
all_ids = set()
for dept_id in dept_ids:
all_ids.update(get_dept_and_children_ids(dept_id))
return queryset.filter(dept__in=all_ids)
def get_dept_and_children_ids(dept_id):
# 递归查找所有子部门id
ids = [dept_id]
children = Dept.objects.filter(pid_id=dept_id)
for child in children:
ids.extend(get_dept_and_children_ids(child.id))
return ids
class UserViewSet(CustomModelViewSet):
"""
用户数据 视图集
"""
queryset = User.objects.filter(is_deleted=False).order_by('-id').prefetch_related('role', 'dept', 'post')
serializer_class = UserSerializer
read_only_fields = ['id', 'create_time', 'update_time', 'login_ip']
filterset_class = UserFilter
search_fields = ['username', 'nickname', 'mobile'] # 支持模糊搜索
ordering_fields = ['create_time', 'id']
ordering = ['-create_time']
class Logout(APIView):
permission_classes = [IsAuthenticated]
def post(self, request, *args, **kwargs):
# user = request.user
# 删除用户的Token
# Token.objects.filter(user=user).delete()
return Response({
"code": 0,
"data": None,
"error": None,
"message": "登出成功"
})