244 lines
7.9 KiB
Python
244 lines
7.9 KiB
Python
|
import piexif
|
|||
|
from PIL import Image
|
|||
|
import face_recognition
|
|||
|
from datetime import datetime
|
|||
|
from models import db, User, Photo
|
|||
|
from werkzeug.utils import secure_filename
|
|||
|
import os
|
|||
|
import numpy as np
|
|||
|
|
|||
|
def process_face_encoding(photo_path):
    """Encode the largest face in an image and save a crop of that face.

    Args:
        photo_path: Path of the image file to analyse.

    Returns:
        A ``(encoding, cropped_face_path)`` tuple for the largest detected
        face, or ``None`` when no face is detected or no encoding could be
        computed for it.
    """
    image = face_recognition.load_image_file(photo_path)
    face_locations = face_recognition.face_locations(image)
    if not face_locations:
        return None

    # Each location is (top, right, bottom, left), so the width is
    # right - left. Using left - right yields a negative "area" and makes
    # max() select the smallest face instead of the largest.
    largest_face = max(
        face_locations,
        key=lambda loc: (loc[2] - loc[0]) * (loc[1] - loc[3]),
    )

    face_encodings = face_recognition.face_encodings(image, [largest_face])
    if not face_encodings:
        return None

    top, right, bottom, left = largest_face

    # Insert the suffix in front of whatever extension the file has, so a
    # path that does not end in ".jpg" is never overwritten by its own crop.
    stem, extension = os.path.splitext(photo_path)
    cropped_face_path = f"{stem}_cropped{extension}"

    # The context manager releases the file handle once the crop is saved.
    with Image.open(photo_path) as source_image:
        # PIL expects the box as (left, upper, right, lower).
        source_image.crop((left, top, right, bottom)).save(cropped_face_path)

    return face_encodings[0], cropped_face_path
|
|||
|
|
|||
|
|
|||
|
def add_user(username, userphoto):
    """Create a user from an uploaded portrait and store its face encoding.

    Args:
        username: Name of the new user; also used as the name of the user's
            upload sub-directory.
        userphoto: Uploaded file object exposing ``save(path)``.

    Returns:
        True when the user row was committed. False when no photo was
        supplied, the username cannot be used as a directory name, or no
        face could be encoded from the photo.
    """
    if not userphoto:
        return False

    # The username becomes a path component, so reject anything that could
    # point outside the uploads directory.
    if (
        not username
        or username in (".", "..")
        or "/" in username
        or "\\" in username
    ):
        print("用户名无效,无法添加用户")
        return False

    user_dir = os.path.join("static/uploads/users", username).replace("\\", "/")
    os.makedirs(user_dir, exist_ok=True)
    photo_path = os.path.join(user_dir, "photo.jpg").replace("\\", "/")

    # Save the portrait first; face detection reads it back from disk.
    userphoto.save(photo_path)

    result = process_face_encoding(photo_path)
    if result is None:
        print("未检测到人脸,无法添加用户")
        return False

    # process_face_encoding returns (encoding, cropped_face_path); only the
    # encoding itself belongs in the encoding column.
    # NOTE(review): the column type of User.encoding is not visible here —
    # confirm it can hold a NumPy array (otherwise convert with .tolist()).
    face_encoding, _cropped_face_path = result

    new_user = User(username=username, encoding=face_encoding, photo=photo_path)
    db.session.add(new_user)
    db.session.commit()
    return True
|
|||
|
|
|||
|
|
|||
|
|
|||
|
# 删除用户
|
|||
|
def delete_user(user_id):
    """Delete the user with the given primary key.

    Args:
        user_id: Primary key passed to ``User.query.get``.

    Returns:
        True when the user was deleted and the change committed. False when
        no such user exists or the database operation raised.
    """
    try:
        user = User.query.get(user_id)
        if not user:
            return False
        db.session.delete(user)
        db.session.commit()
        return True
    except Exception as e:
        # A failed flush/commit leaves the session unusable until it is
        # rolled back, so reset it before reporting the failure.
        db.session.rollback()
        print(e)
        return False
|
|||
|
|
|||
|
# 编辑用户
|
|||
|
def edit_user(user_id, username, userphoto):
    """Rename a user and optionally replace the stored photo.

    Args:
        user_id: Primary key passed to ``User.query.get``.
        username: New username to assign.
        userphoto: Optional uploaded file object exposing ``filename`` and
            ``save(path)``; falsy means "leave the photo alone".

    Returns:
        True when the changes were committed. False when the user does not
        exist, the upload has no usable filename, the user has no ``Photo``
        row to update, or any step raised.
    """
    try:
        user = User.query.get(user_id)
        if not user:
            return False

        user.username = username

        if userphoto:
            # Keep only the last component of the client-supplied name so it
            # cannot escape the user's upload directory.
            safe_name = os.path.basename(
                (userphoto.filename or "").replace("\\", "/")
            )
            if safe_name in ("", ".", ".."):
                db.session.rollback()
                print("照片文件名无效,无法更新用户")
                return False

            # Look the record up before touching the disk so a missing row
            # does not leave an orphaned file behind.
            photo = Photo.query.filter_by(user_id=user.id).first()
            if photo is None:
                db.session.rollback()
                print("未找到用户的照片记录,无法更新用户")
                return False

            photo_dir = os.path.join('static', 'uploads', 'users', str(user.id))
            os.makedirs(photo_dir, exist_ok=True)
            photo_file = os.path.join(photo_dir, safe_name)
            userphoto.save(photo_file)

            # Point the existing photo record at the new file.
            photo.photo_path = photo_file

        db.session.commit()
        return True
    except Exception as e:
        # Discard the pending username/photo changes so they cannot be
        # committed by accident by a later operation on the same session.
        db.session.rollback()
        print(e)
        return False
|
|||
|
|
|||
|
# 获取用户
|
|||
|
def get_user(user_id):
    """Look up a user by primary key.

    Args:
        user_id: Primary key passed to ``User.query.get``.

    Returns:
        Whatever ``User.query.get`` yields for ``user_id``.
    """
    user = User.query.get(user_id)
    return user
|
|||
|
|
|||
|
def save_temp_photo(photo):
    """Save an uploaded photo into the temporary upload directory.

    Args:
        photo: Uploaded file object exposing ``filename`` and ``save(path)``.

    Returns:
        The forward-slash path the photo was saved to, inside
        ``static/uploads/temp``.

    Raises:
        ValueError: If the upload has no usable filename.
    """
    temp_dir = 'static/uploads/temp'
    os.makedirs(temp_dir, exist_ok=True)

    # The filename comes from the client: keep only its last component
    # (treating both "/" and "\\" as separators) so a name such as
    # "../../app.py" cannot be written outside the temp directory.
    safe_name = os.path.basename((photo.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise ValueError(f"invalid upload filename: {photo.filename!r}")

    photo_path = os.path.join(temp_dir, safe_name).replace("\\", "/")
    photo.save(photo_path)
    return photo_path
|
|||
|
|
|||
|
|
|||
|
def get_largest_face(image_path):
    """Detect the largest face in an image and save a crop of it.

    Args:
        image_path: Path of the image file to analyse.

    Returns:
        Path of the saved face crop, or ``None`` when no face is detected.
    """
    image = face_recognition.load_image_file(image_path)
    face_locations = face_recognition.face_locations(image)
    if not face_locations:
        return None

    # Each location is (top, right, bottom, left), so the width is
    # right - left. Using left - right yields a negative "area" and makes
    # max() select the smallest face instead of the largest.
    top, right, bottom, left = max(
        face_locations,
        key=lambda loc: (loc[2] - loc[0]) * (loc[1] - loc[3]),
    )

    # Insert the suffix in front of whatever extension the file has, so a
    # path that does not end in ".jpg" is never overwritten by its own crop.
    stem, extension = os.path.splitext(image_path)
    face_crop_path = f"{stem}_face{extension}"

    with Image.open(image_path) as img:
        # PIL expects the box as (left, upper, right, lower).
        img.crop((left, top, right, bottom)).save(face_crop_path)

    return face_crop_path
|
|||
|
|
|||
|
def match_face(face_path, tolerance=0.35):
    """Find the stored user whose face encoding is closest to an image.

    Args:
        face_path: Path of an image containing the face to identify.
        tolerance: Maximum face distance accepted as a match; lower values
            are stricter.

    Returns:
        The ``User`` with the smallest face distance not exceeding
        ``tolerance``, or ``None`` when the image yields no face encoding or
        no stored user is close enough.
    """
    face_image = face_recognition.load_image_file(face_path)
    face_encodings = face_recognition.face_encodings(face_image)
    if not face_encodings:
        return None

    probe_encoding = face_encodings[0]  # only the first detected face is used

    best_user = None
    best_distance = None
    for user in User.query.all():
        stored = user.encoding
        # Compare against None explicitly: truth-testing a multi-element
        # NumPy array raises ValueError.
        if stored is None:
            continue
        try:
            known_encoding = np.asarray(stored, dtype=float)
            if known_encoding.size == 0:
                continue
            distance = face_recognition.face_distance(
                [known_encoding], probe_encoding
            )[0]
        except (TypeError, ValueError) as e:
            # One malformed stored encoding must not abort the whole scan.
            print(f"用户 {user.id} 的人脸编码无效,已跳过: {e}")
            continue

        # Keep the closest candidate rather than the first one in range, so
        # the result does not depend on row order.
        if distance <= tolerance and (
            best_distance is None or distance < best_distance
        ):
            best_user, best_distance = user, distance

    return best_user
|
|||
|
|
|||
|
|
|||
|
|
|||
|
def get_exif_time(photo_path):
    """Return the capture time of a photo as ``YYYY-MM-DD HH:MM:SS``.

    The EXIF ``DateTimeOriginal`` tag (36867) is used when it is present and
    parseable; otherwise the current local time is returned. The result is
    always in the same format regardless of which source was used.

    Args:
        photo_path: Path of the image file to inspect.

    Returns:
        The timestamp string.
    """
    output_format = "%Y-%m-%d %H:%M:%S"
    try:
        with Image.open(photo_path) as img:
            exif_data = img._getexif()
        raw_timestamp = exif_data.get(36867, None) if exif_data else None
        if raw_timestamp:
            # EXIF stores the date with colons ("YYYY:MM:DD HH:MM:SS");
            # re-format it so both code paths return the same layout.
            taken_at = datetime.strptime(
                str(raw_timestamp).strip("\x00 "), "%Y:%m:%d %H:%M:%S"
            )
            return taken_at.strftime(output_format)
    except Exception as e:
        # Best effort: unreadable files, missing EXIF support and malformed
        # timestamps all fall back to the current time.
        print(f"无法提取 EXIF 时间: {e}")
    return datetime.now().strftime(output_format)
|
|||
|
|
|||
|
|
|||
|
def save_classified_photo(photo, user, timestamp):
    """Store a classified photo under the user's directory and record it.

    The image is re-saved to
    ``static/uploads/users/<user.id>/<timestamp>/<filename>`` and a ``Photo``
    row pointing at that file is committed. Nothing is recorded when the
    image could not be written.

    Args:
        photo: Uploaded file object exposing ``filename``; it is passed to
            ``Image.open``.
        user: Matched user; only ``user.id`` is read.
        timestamp: Timestamp string used both as the directory name and as
            the ``created_at`` value.
            NOTE(review): it is used verbatim as a directory name — confirm
            that is valid on the deployment platform.
    """
    # The filename comes from the client: keep only its last component so it
    # cannot escape the user's directory.
    safe_name = os.path.basename((photo.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        print(f"保存图片时出错: invalid filename {photo.filename!r}")
        return

    user_dir = os.path.join('static/uploads/users', str(user.id), timestamp)
    os.makedirs(user_dir, exist_ok=True)
    new_photo_path = os.path.join(user_dir, safe_name)

    try:
        with Image.open(photo) as img:
            img.save(new_photo_path, quality=95)
    except Exception as e:
        print(f"保存图片时出错: {e}")
        # Do not create a database row for a file that was never written.
        return

    photo_record = Photo(
        user_id=user.id,
        created_at=timestamp,
        classification=True,
        classification_image_path=new_photo_path,
    )
    db.session.add(photo_record)
    db.session.commit()

    print(f"照片 {photo.filename} 已保存至 {new_photo_path}")
|
|||
|
|
|||
|
|
|||
|
def classify_photos(photos):
    """Classify each uploaded photo by the largest face it contains.

    Every photo is saved to the temp directory, its largest face is cropped
    and matched against stored users, and on a match the photo is filed
    under that user with its capture timestamp. Photos without a face or
    without a matching user are reported and skipped.

    Args:
        photos: Iterable of uploaded file objects exposing ``filename``.
    """
    for upload in photos:
        temp_path = save_temp_photo(upload)

        face_path = get_largest_face(temp_path)
        if not face_path:
            print(f"没有检测到人脸:{upload.filename}")
            continue

        owner = match_face(face_path)
        taken_at = get_exif_time(temp_path)

        if not owner:
            print(f"没有找到匹配的人脸:{upload.filename}")
            continue

        save_classified_photo(upload, owner, taken_at)
|
|||
|
|
|||
|
|
|||
|
# 根据姓名和时间查找照片
|
|||
|
def search_photos(name, date):
    """Return a user's photos whose EXIF capture date contains ``date``.

    Args:
        name: Username to look up.
        date: Date fragment to search for. EXIF stores dates as
            ``YYYY:MM:DD``; a dashed form such as ``2024-05-01`` is also
            accepted. ``None`` is treated like an empty string, which
            matches every photo that has a capture date.

    Returns:
        List of matching ``Photo`` records; empty when the user does not
        exist. Photos whose EXIF data cannot be read are skipped.
    """
    user = User.query.filter_by(username=name).first()
    if not user:
        return []

    date_text = "" if date is None else str(date)
    exif_style_date = date_text.replace("-", ":")

    filtered_photos = []
    for photo in Photo.query.filter_by(user_id=user.id).all():
        try:
            exif_data = piexif.load(photo.photo_path)
        except Exception as e:
            # A missing or non-EXIF file must not break the whole search.
            print(f"无法读取 EXIF 数据: {e}")
            continue

        # piexif.load returns a dict of IFDs; DateTimeOriginal lives in the
        # "Exif" IFD, not at the top level of that dict.
        raw_date = exif_data.get("Exif", {}).get(
            piexif.ExifIFD.DateTimeOriginal, b""
        )
        date_taken = raw_date.decode("utf-8", errors="replace")

        if date_taken and (
            date_text in date_taken or exif_style_date in date_taken
        ):
            filtered_photos.append(photo)

    return filtered_photos
|
|||
|
|
|||
|
def find_match(image_path, known_face_encodings, tolerance=0.6):
    """Return the index of the known encoding closest to the face in an image.

    Args:
        image_path: Path of the image containing the unknown face.
        known_face_encodings: Sequence of known face encodings.
        tolerance: Maximum face distance accepted as a match. Lower values
            are stricter (fewer matches), higher values are looser.

    Returns:
        Index into ``known_face_encodings`` of the closest encoding whose
        distance does not exceed ``tolerance``, or ``None`` when the image
        yields no face encoding or nothing is close enough.
    """
    unknown_image = face_recognition.load_image_file(image_path)
    unknown_encodings = face_recognition.face_encodings(unknown_image)
    if not unknown_encodings:
        return None

    # Only the first detected face is compared.
    distances = face_recognition.face_distance(
        known_face_encodings, unknown_encodings[0]
    )
    if len(distances) == 0:
        return None

    # Pick the nearest candidate instead of the first one inside the
    # tolerance, so the answer does not depend on the order of the list.
    best_index = int(np.argmin(distances))
    if distances[best_index] <= tolerance:
        return best_index
    return None
|