# CV_AG/Test_Track_updated.py
import cv2
from ultralytics import YOLO
from collections import deque
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point, WriteOptions  # InfluxDB 2.x client (the old v1 "influxdb" import shadowed this one and has been removed)
import time
from datetime import datetime
import ssl
import os
import tkinter as tk
from tkinter import ttk
from PIL import Image, ImageTk
import threading
# InfluxDB Configuration
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "duVTQHPpHqr6WmdYfpSStqm-pxnvZHs-W0-3lXDnk8Tn6PGt59MlnTSR6egjMWdYvmL_ZI6xt3YUzGVBZHvc7w=="  # token only; the shell prefix "export INFLUX_TOKEN=" pasted into the original string would break authentication
INFLUX_ORG = "GAAIM"
INFLUX_BUCKET = "AGVIGNETTE"
# Connect to InfluxDB
client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = client.write_api(write_options=WriteOptions(batch_size=1))
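# NOTE: write_api is created here but never used below; classifications reach
# InfluxDB only indirectly, via the MQTT line-protocol messages published in
# the main loop. A minimal sketch of a direct write, if one were wanted
# (measurement/field names here are illustrative assumptions):
#
#   point = Point("lemon_classification").field("classification", "GoodLemon")
#   write_api.write(bucket=INFLUX_BUCKET, org=INFLUX_ORG, record=point)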
# MQTT Setup
MQTT_BROKER = "192.168.8.172"
MQTT_TOPIC = "fruit/classification"
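# For local testing, a throwaway subscriber (hypothetical helper, not part of
# this script) can confirm what the broker receives on this topic:
#
#   import paho.mqtt.client as mqtt
#   sub = mqtt.Client()
#   sub.on_message = lambda c, u, msg: print(msg.topic, msg.payload.decode())
#   sub.connect(MQTT_BROKER, 1883, 60)
#   sub.subscribe(MQTT_TOPIC)
#   sub.loop_forever()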
def start_loading():
    for i in range(101):  # 0 to 100%
        time.sleep(0.38)  # 0.38 s x 101 steps is roughly 38 seconds total
        progress_var.set(i)
        progress_bar.update_idletasks()
    # Hand the destroy back to the Tk main loop; calling root.destroy()
    # directly from this worker thread is not thread-safe.
    root.after(0, root.destroy)
# Set up full-screen window
root = tk.Tk()
root.title("Starting Up")
root.attributes('-fullscreen', True)
# Get screen size
screen_width = root.winfo_screenwidth()
screen_height = root.winfo_screenheight()
# Load and resize the background image
try:
    bg_img = Image.open("comicrobodog.png")  # Replace with your image
    # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the equivalent filter
    bg_img = bg_img.resize((screen_width, screen_height), Image.LANCZOS)
    bg_photo = ImageTk.PhotoImage(bg_img)
    # Set as background using a label
    bg_label = tk.Label(root, image=bg_photo)
    bg_label.place(x=0, y=0, relwidth=1, relheight=1)
except Exception as e:
    print("Error loading background image:", e)
    root.configure(bg='black')  # Fallback
# Overlay content frame (Tk has no transparent frames; bg='' raises an
# "unknown color name" error, so use a solid background instead)
overlay = tk.Frame(root, bg='black', padx=20, pady=20)
overlay.place(relx=0.5, rely=0.5, anchor='center')
# Message label
label = tk.Label(
    overlay,
    text="Computer Vision Vignette is Starting Up",
    font=("Helvetica", 32, "bold"),
    fg="white",
    bg="black",  # match the overlay so the white text stays legible
)
label.pack(pady=10)
# Progress bar
progress_var = tk.IntVar()
progress_bar = ttk.Progressbar(
    overlay,
    maximum=100,
    variable=progress_var,
    length=800,
)
progress_bar.pack(pady=20)
# Start the progress in a thread
threading.Thread(target=start_loading, daemon=True).start()
# Close on ESC
root.bind("<Escape>", lambda e: root.destroy())
root.mainloop()
mqtt_client = mqtt.Client()
# Plain (non-TLS) MQTT on the default port; the keepalive argument is in
# seconds, so the original value of 60000 (about 16.7 hours) was almost
# certainly meant to be 60
mqtt_client.connect(MQTT_BROKER, 1883, keepalive=60)
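# If TLS is actually required (the ssl import suggests it was planned), a
# minimal sketch, assuming the broker listens on 8883 with a known CA cert
# (the file path is an illustrative assumption):
#
#   mqtt_client.tls_set(ca_certs="ca.crt", cert_reqs=ssl.CERT_REQUIRED)
#   mqtt_client.connect(MQTT_BROKER, 8883, keepalive=60)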
# Allow duplicate loading of OpenMP runtime
os.environ["KMP_DUPLICATE_LIB_OK"] = "True"
# Define the official YAML configuration file path (adjust as needed)
yaml_path = "botsort.yaml"
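# A stock ultralytics botsort.yaml typically sets "tracker_type: botsort" plus
# threshold knobs such as track_high_thresh, new_track_thresh, track_buffer and
# match_thresh (the exact keys depend on the ultralytics version in use).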
# Camera index (0 is the default built-in camera; 1 would select an external one)
camera_index = 0
# Probe the camera once for its frame size, then release it so the YOLO
# tracker below can reopen the same device.
cap = cv2.VideoCapture(camera_index)
cap.set(cv2.CAP_PROP_FPS, 30)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# Load the YOLO model
model = YOLO(r"/Users/ag_cv_gaaim/Desktop/CV_AG/runs/detect/train4/weights/best.pt") # Load custom model
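# Optional sanity check: model.names holds the id -> name mapping baked into
# the checkpoint; it should agree with the hard-coded class_labels below.
#
#   print(model.names)  # expect {0: 'Bruised', 1: 'DefectiveLemon', ...}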
# Define class labels
class_labels = {
    0: "Bruised",
    1: "DefectiveLemon",
    2: "GoodLemon",
    3: "NotRipeLemon",
    4: "Rotten",
}
# Apply smoothing to "DefectiveLemon", "GoodLemon", and "NotRipeLemon"
smoothing_labels = ["DefectiveLemon", "GoodLemon", "NotRipeLemon"]
# Smoothing parameters for sliding window
HISTORY_LENGTH = 20 # Number of recent frames
DEFECT_THRESHOLD = 0.3 # Threshold for "DefectiveLemon" proportion
GOOD_THRESHOLD = 0.7 # Threshold for "GoodLemon" and "NotRipeLemon" proportion
# State history for each target (used for smoothing), format: {ID: deque([...], maxlen=HISTORY_LENGTH)}
lemon_history = {}
lemon_send_history = set()  # IDs already counted/published (a set gives O(1) membership checks)
# Set the display window to be resizable
cv2.namedWindow("Live Detection", cv2.WINDOW_NORMAL)
cv2.setWindowProperty("Live Detection", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
# Smoothing function:
# If the current detected label is not in smoothing_labels, clear the target's history and return the current label;
# Otherwise, add the current label to the history and return a smoothed label based on the proportion.
def get_smoothed_label(obj_id, current_label):
    if current_label not in smoothing_labels:
        if obj_id in lemon_history:
            lemon_history[obj_id].clear()
        return current_label
    if obj_id not in lemon_history:
        lemon_history[obj_id] = deque(maxlen=HISTORY_LENGTH)
    lemon_history[obj_id].append(current_label)
    history = lemon_history[obj_id]
    defect_count = history.count("DefectiveLemon")
    good_count = history.count("GoodLemon")
    notripe_count = history.count("NotRipeLemon")
    total = len(history)  # defensive; total is at least 1 here since we just appended
    if total == 0:
        return current_label
    if defect_count / total >= DEFECT_THRESHOLD:
        return "DefectiveLemon"
    elif good_count / total >= GOOD_THRESHOLD:
        return "GoodLemon"
    elif notripe_count / total >= GOOD_THRESHOLD:
        return "NotRipeLemon"
    else:
        return history[-1]
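# Worked example of the rule above (illustrative numbers): with a full window
# of 20 frames holding 7x "DefectiveLemon" and 13x "GoodLemon", the defect
# ratio is 7/20 = 0.35 >= DEFECT_THRESHOLD (0.3), so the smoothed label is
# "DefectiveLemon" even though "GoodLemon" has the majority -- the defective
# check runs first and deliberately uses a lower bar.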
# Use streaming tracking mode to maintain tracker state
results = model.track(
    source=camera_index,  # Read the video stream directly from the camera
    conf=0.3,
    tracker=yaml_path,    # Use the BoT-SORT YAML configuration file
    persist=True,         # Persist tracking state between frames (do not reset)
    stream=True,          # Generator mode: yields one result per frame
    show=False,
    device="mps",         # Apple Metal backend; use "cpu" if MPS is unavailable
)
# Running totals for classified lemons and the most recent classification
num_defective = 0
num_good = 0
num_notripe = 0
last_classification = None
# Iterate over streaming tracking results
for result in results:
    frame = result.orig_img  # Current frame
    frame = cv2.flip(frame, 1)  # Mirror horizontally for display
    detections = result.boxes  # Detection box information
    # Draw the HUD: black text panels and the classification-area box
    cv2.rectangle(frame, (0, 370), (1000, 700), (0, 0, 0), -1)  # Black background for text
    cv2.rectangle(frame, (0, 0), (1000, 200), (0, 0, 0), -1)    # Black background for text
    cv2.rectangle(frame, (600, 200), (660, 370), (255, 255, 255), 2)
    cv2.putText(frame, "Classification Area", (560, 190), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
    # Display the lemon counts in the top-left corner
    cv2.putText(frame, f"Defective: {num_defective}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
    cv2.putText(frame, f"Good: {num_good}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(frame, f"Not Ripe: {num_notripe}", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 100, 80), 2)
    cv2.putText(frame, f"Last Classification: {last_classification}", (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
    cv2.putText(frame, f"Total Lemons: {num_defective + num_good + num_notripe}", (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
    for box in detections:
        x1, y1, x2, y2 = map(int, box.xyxy[0])  # Detection box coordinates
        # Adjust x-coordinates for the flipped frame
        x1, x2 = width - x2, width - x1
        obj_id = int(box.id) if box.id is not None else -1  # Tracking ID
        class_id = int(box.cls)  # Class ID
        score = float(box.conf)  # Confidence score (currently unused)
        label = class_labels.get(class_id, "Unknown")  # Get class name
        # If the target ID is valid
        if obj_id != -1:
            # If the detected label requires smoothing, use the smoothing function
            if label in smoothing_labels:
                final_label = get_smoothed_label(obj_id, label)
                display_text = f"ID {obj_id} | {final_label}"
                # Only print targets with smoothed labels (only these three classes matter)
                if final_label in smoothing_labels:
                    position = f"({x1}, {y1}, {x2}, {y2})"
                    print(f"ID: {obj_id}, Position: {position}, Label: {display_text}")
                # Pick the box colour from the classification (OpenCV uses BGR)
                if final_label == "DefectiveLemon":
                    box_color = (100, 100, 255)  # Red for defective
                elif final_label == "NotRipeLemon":
                    box_color = (255, 100, 80)   # Blue for unripe
                elif final_label == "GoodLemon":
                    box_color = (0, 255, 0)      # Green for good
                else:
                    box_color = (255, 255, 255)  # White for unknown or other classes
                # Add a background rectangle for the label text
                text_size = cv2.getTextSize(display_text, cv2.FONT_HERSHEY_TRIPLEX, 0.6, 2)[0]
                text_x, text_y = x1, y1 - 10
                text_w, text_h = text_size[0], text_size[1]
                cv2.rectangle(frame, (text_x, text_y - text_h - 5), (text_x + text_w, text_y + 5), (0, 0, 0), -1)
                # Draw the detection box and text
                cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)
                cv2.putText(frame, display_text, (text_x, text_y),
                            cv2.FONT_HERSHEY_TRIPLEX, 0.6, box_color, 2)
                cv2.rectangle(frame, (500, 0), (1000, 170), (0, 0, 0), -1)  # Black background for text
                # Count each tracked lemon once, when it enters the classification area
                if 600 < x1 < 660 and y2 < 410 and y1 > 190 and obj_id not in lemon_send_history:
                    if final_label in ["DefectiveLemon", "NotRipeLemon", "GoodLemon"]:
                        # The payload is InfluxDB line protocol, presumably consumed
                        # by a Telegraf MQTT listener on the broker side
                        mqtt_message = f"lemon_classification classification=\"{final_label}\" {int(time.time()*1e9)}"
                        lemon_send_history.add(obj_id)
                        mqtt_client.publish(MQTT_TOPIC, mqtt_message)
                        # Update the running counters
                        if final_label == "DefectiveLemon":
                            num_defective += 1
                        elif final_label == "GoodLemon":
                            num_good += 1
                        elif final_label == "NotRipeLemon":
                            num_notripe += 1
                        last_classification = final_label
            else:
                # For other classes, display the raw detection and clear any history
                if obj_id in lemon_history:
                    lemon_history[obj_id].clear()
                display_text = label
        else:
            display_text = label
    # Display the processed frame
    cv2.imshow("Live Detection", frame)
    # Exit the program when the ESC key is pressed
    if cv2.waitKey(1) & 0xFF == 27:
        print("ESC key detected. Exiting the program.")
        break
cv2.destroyAllWindows()
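# Close the network clients as well (a reasonable addition; the original script
# only closed the OpenCV windows)
mqtt_client.disconnect()
write_api.close()
client.close()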
print("Camera video processing complete. Program terminated.")