Nuestro código Python está hecho con OpenCv, DeepFace y el modelo basado en FaceNet, el cual acepta imágenes etiquetadas y lo vuelve capaz de reconocer las personas que nosotros queramos enviando el reconocimiento 1 vez por día con nombre, fecha y hora de la persona.
import cv2 from deepface import DeepFace import numpy as np from scipy.spatial.distance import cosine import requests from datetime import datetime import time import pytz # Establecer la zona horaria de Santiago, Chile chile_tz = pytz.timezone("America/Santiago") # Configuración de la URL de Magic Loops magicloops_url = "xxxxxxxxxxxxx" # Diccionario para registrar la última fecha en que se envió la solicitud para cada persona last_sent_date = {} # Función para enviar la solicitud a Magic Loops def send_to_magicloops(recognized_name, min_distance): date_time = datetime.now(chile_tz) # Usar la zona horaria de Santiago data = { "nombre": recognized_name, "distancia": round(min_distance, 2), "fecha": date_time.strftime("%Y-%m-%d"), "hora": date_time.strftime("%H:%M:%S") } response = requests.post(magicloops_url, json=data) if response.status_code == 200: print("Webhook activado con éxito") else: print(f"Error: {response.status_code}") # Función para verificar si ya se envió una solicitud hoy para una persona def should_send_notification(name): today_date = datetime.now(chile_tz).date() # Usar la zona horaria de Santiago if name in last_sent_date: if last_sent_date[name] == today_date: return False # Ya se envió la solicitud hoy last_sent_date[name] = today_date # Actualiza la fecha de envío a hoy return True # Configuración de la base de datos con imágenes de referencia database = {} reference_images = { "joaquin": [ "imagen1.jpg", "imagen2.jpg" ] } # Generar embeddings para cada imagen y guardarlos en la base de datos for name, img_paths in reference_images.items(): embeddings = [] for img_path in img_paths: embedding = DeepFace.represent(img_path=img_path, model_name="Facenet", enforce_detection=False)[0]["embedding"] embeddings.append(embedding) database[name] = embeddings # Función para encontrar la persona en la base de datos con múltiples embeddings def recognize_person(face_img, database, model_name="Facenet", threshold=0.5): embedding_result = DeepFace.represent(img_path=face_img, model_name=model_name, enforce_detection=False) if not embedding_result: return "No se pudo generar el embedding." new_embedding = embedding_result[0]["embedding"] recognized_name = None min_distance = float("inf") # Comparar con cada persona en la base de datos for name, embeddings in database.items(): for embedding in embeddings: distance = cosine(new_embedding, embedding) if distance < min_distance and distance < threshold: min_distance = distance recognized_name = name if recognized_name: return recognized_name, min_distance else: return "Desconocido", None # Inicializar la captura de video desde la cámara del PC # Inicializar la captura de video desde la cámara del PC cap = None while cap is None or not cap.isOpened(): try: cap = cv2.VideoCapture("") if not cap.isOpened(): print("No se pudo conectar a la cámara. Intentando nuevamente en 5 segundos...") time.sleep(5) # Espera antes de intentar nuevamente except Exception as e: print(f"Error al intentar conectar a la cámara: {e}. Intentando nuevamente en 5 segundos...") time.sleep(5) # Espera antes de intentar nuevamente face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') # Variables para optimización last_recognition_time = time.time() while True: # Capturar frame por frame desde la cámara del PC ret, frame = cap.read() if not ret: print("No se pudo recibir el frame. Intentando reconectar...") cap.release() # Liberar la cámara actual cap = None # Resetear la variable de captura # Intentar reconectar a la cámara while cap is None or not cap.isOpened(): try: cap = cv2.VideoCapture("xxxxxxxxxxxxxx") if not cap.isOpened(): print("No se pudo conectar a la cámara. Intentando nuevamente en 5 segundos...") time.sleep(5) except Exception as e: print(f"Error al intentar conectar a la cámara: {e}. Intentando nuevamente en 5 segundos...") time.sleep(5) continue # Volver al inicio del bucle para reintentar # Reducir la resolución del frame (opcional) frame = cv2.resize(frame, (640, 480)) # Convertir a escala de grises para la detección de rostros gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = face_cascade.detectMultiScale(gray, 1.3, 5) # Para cada rostro detectado, recortarlo y reconocerlo (solo cada cierto tiempo) for (x, y, w, h) in faces: face_img = frame[y:y+h, x:x+w] # Solo reconocer si ha pasado un cierto tiempo desde la última vez current_time = time.time() if current_time - last_recognition_time > 2: # Reconocer solo cada 2 segundos recognized_name, min_distance = recognize_person(face_img, database) last_recognition_time = current_time # Actualizar el tiempo de reconocimiento # Dibujar el rectángulo y poner el nombre en el frame de video cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2) if recognized_name != "Desconocido": cv2.putText(frame, f"{recognized_name} ({min_distance:.2f})", (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36,255,12), 2) # Enviar a Magic Loops solo si no se ha enviado una notificación hoy if should_send_notification(recognized_name): send_to_magicloops(recognized_name, min_distance) else: cv2.putText(frame, "Desconocido", (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2) # Para salir del bucle, presiona 'q' if cv2.waitKey(1) & 0xFF == ord('q'): break cap.release() cv2.destroyAllWindows()
Función en Cloud Function que se encarga de la inserción de los datos de la persona detectada en una colección de base de datos NoSQL.
import json import firebase_admin from firebase_admin import credentials, firestore import functions_framework # Inicializar Firebase Admin SDK cred = credentials.Certificate("firebase-adminsdk.json") # Ruta al archivo JSON de las credenciales de Firebase firebase_admin.initialize_app(cred) # Obtener la referencia a Firestore db = firestore.client() # Función para insertar datos en Firestore def insert_data_to_firestore(data): try: # Referencia a la colección "detecciones" detecciones_ref = db.collection("detecciones") # Crear un nuevo documento en la colección "detecciones" deteccion = { "nombre": data["nombre"], "distancia": data["distancia"], "fecha": data["fecha"], "hora": data["hora"] } # Insertar los datos como un nuevo documento detecciones_ref.add(deteccion) print("Datos insertados correctamente en Firestore.") except Exception as e: print(f"Error al insertar los datos en Firestore: {e}") @functions_framework.http def webhook(request): if request.method != 'POST': return 'Método no permitido', 405 # Obtener el JSON recibido request_json = request.get_json(silent=True) if request_json is None: return 'No JSON recibido', 400 # Insertar los datos en Firestore insert_data_to_firestore(request_json) return json.dumps({'status': 'ok'}), 200
Función en Cloud Function que se encarga de notificar a la aplicacion mobile de la deteccion de una nueva persona.
// Importar Firebase Admin SDK y Expo Server SDK const admin = require("firebase-admin"); const {Expo} = require("expo-server-sdk"); const {onDocumentCreated} = require("firebase-functions/v2/firestore"); // Inicializar Firebase Admin SDK admin.initializeApp(); // Crear una instancia de Expo SDK const expo = new Expo(); // Función para escuchar cambios en Firestore usando Firestore v2 exports.notificarLlegada = onDocumentCreated( "detecciones/{documentId}", async (event) => { const snap = event.data; // Obtener los datos del nuevo documento const deteccionData = snap.data(); console.log("Nuevo documento en detecciones:", deteccionData); // Recuperar todos los tokens desde la colección 'push_tokens' const tokensSnapshot = await admin .firestore() .collection("push_tokens") .get(); const tokens = []; tokensSnapshot.forEach((doc) => { const token = doc.data().token; if (token && Expo.isExpoPushToken(token)) { tokens.push(token); } }); if (tokens.length === 0) { console.log("No hay tokens disponibles para enviar notificaciones"); return null; } // Crear el mensaje de notificación const message = { to: tokens, // Los tokens de Expo Push sound: "default", title: "¡Llegó alguien!", body: "Se ha agregado una nueva detección.", }; // Enviar la notificación usando Expo Push Notifications try { const ticket = await expo.sendPushNotificationsAsync([message]); console.log("Notificación enviada:", ticket); return null; } catch (error) { console.error("Error al enviar la notificación:", error); return null; } }, );
Este es el servicio que se encarga de manejar los tokens para las notificaciones, maneja la logica para evitar replicar tokens.
import { useEffect } from 'react'; import { Alert, Platform } from 'react-native'; import * as Notifications from 'expo-notifications'; import { collection, addDoc, query, where, getDocs, deleteDoc, updateDoc } from 'firebase/firestore'; import { db } from '../FirebaseConfig'; import { obtenerDetecciones } from './detecciones'; // Asegúrate de importar esta función si no lo has hecho const TOKEN_COLLECTION = 'push_tokens'; // Función para verificar si un token ya existe const checkExistingToken = async (token) => { try { const q = query(collection(db, TOKEN_COLLECTION), where("token", "==", token)); const querySnapshot = await getDocs(q); return !querySnapshot.empty; } catch (error) { console.error('Error checking existing token:', error); return false; } }; // Función para eliminar tokens antiguos del mismo dispositivo const removeOldTokens = async (deviceId) => { try { const q = query(collection(db, TOKEN_COLLECTION), where("deviceId", "==", deviceId)); const querySnapshot = await getDocs(q); const deletePromises = querySnapshot.docs.map(doc => deleteDoc(doc.ref) ); await Promise.all(deletePromises); } catch (error) { console.error('Error removing old tokens:', error); } }; const registerForPushNotifications = async () => { try { // Solicitar permisos const { status: existingStatus } = await Notifications.getPermissionsAsync(); let finalStatus = existingStatus; if (existingStatus !== 'granted') { const { status } = await Notifications.requestPermissionsAsync(); finalStatus = status; } if (finalStatus !== 'granted') { Alert.alert('Error', 'Failed to get push token for push notification!'); return; } // Generar token const expoPushToken = await Notifications.getExpoPushTokenAsync({ projectId: '754445f8-5994-4a9f-82a7-11c6c302c05b' }); // Verificar si el token ya existe const tokenExists = await checkExistingToken(expoPushToken.data); if (tokenExists) { console.log('Token already registered'); return expoPushToken.data; } // Generar ID único para el dispositivo const deviceId = await Notifications.getDevicePushTokenAsync(); // Eliminar tokens antiguos del mismo dispositivo await removeOldTokens(deviceId.data); // Guardar nuevo token en Firestore await addDoc(collection(db, TOKEN_COLLECTION), { token: expoPushToken.data, deviceId: deviceId.data, createdAt: new Date().toISOString(), platform: Platform.OS, lastActive: new Date().toISOString() }); return expoPushToken.data; } catch (error) { console.error('Error in push notification setup:', error); Alert.alert('Error', 'Something went wrong while setting up notifications'); } }; // Función para actualizar la última actividad del token const updateTokenActivity = async (token) => { try { const q = query(collection(db, TOKEN_COLLECTION), where("token", "==", token)); const querySnapshot = await getDocs(q); if (!querySnapshot.empty) { await updateDoc(querySnapshot.docs[0].ref, { lastActive: new Date().toISOString() }); } } catch (error) { console.error('Error updating token activity:', error); } }; // Función para limpiar token al cerrar sesión const cleanupToken = async (token) => { try { const q = query(collection(db, TOKEN_COLLECTION), where("token", "==", token)); const querySnapshot = await getDocs(q); if (!querySnapshot.empty) { await deleteDoc(querySnapshot.docs[0].ref); } } catch (error) { console.error('Error cleaning up token:', error); } }; // Hook mejorado con cleanup y notificaciones export const useNotificationSetup = (actualizarTabla) => { useEffect(() => { let token; const setup = async () => { token = await registerForPushNotifications(); // Actualizar actividad cada hora const interval = setInterval(() => { if (token) updateTokenActivity(token); }, 3600000); // 1 hora // Escuchar la respuesta de la notificación cuando el usuario la toca const subscription = Notifications.addNotificationResponseReceivedListener(async (response) => { console.log('Notificación tocada:', response); // Aquí actualizamos los datos al tocar la notificación await actualizarTabla(); }); return () => { clearInterval(interval); if (token) cleanupToken(token); subscription.remove(); }; }; setup(); }, [actualizarTabla]); // Agregar `actualizarTabla` como dependencia };