
Rough draft of the code for the Skynet project.
General summary:
- importing of libraries
- initialization of parameters
- video-stream class that runs in a parallel thread
- several helper/data-conversion functions
- aim-monitoring functions that run in parallel threads
- setup
- main loop containing the modes: manual control, calibration, mapping/testing and overwatch
# import the necessary packages
import imutils
import threading
import datetime
import time
import math
import cv2
import pickle
import RPi.GPIO as GPIO
from picamera.array import PiRGBArray
from picamera import PiCamera
from Adafruit_PWM_Servo_Driver import PWM
from imutils.video.pivideostream import PiVideoStream
channelLaser = 5    # GPIO pin of coaxial laser (through transistor)
channelYaw = 6      # AdaFruit PWM channel of yaw servo
channelPitch = 7    # AdaFruit PWM channel of pitch servo
freq = 50           # PWM frequency [Hz]; could be higher, but this is plenty and less taxing
pwm = PWM(0x40, debug=False)  # setting up Adafruit PWM HAT
pwm.setPWMFreq(freq)
window_name1 = "Live feed"        # continuous video feed
config_file = "calibration.dat"   # pickle file which contains calibration data
camera_resolution = (800, 600)    # resolution of camera itself
camera_fps = 24                   # framerate of camera
resolution = (800, 600)           # resized resolution (used for display & analysis)
center = [int(resolution[0] / 2), int(resolution[1] / 2)]  # center of the screen
gun_pos = center        # global variable which is monitored by the gun monitor threads
center_areas = [0, 0]   # compound center of identified contours in overwatch routine
refPx = [0, 0]          # reference point for mouseclick events
(ow_dthresh, ow_minarea, ow_maxarea) = (5, 300, 2000)   # overwatch: delta threshold, min/max contour area
(ow_minUploadseconds, ow_minmotionframes) = (6.0, 3)    # overwatch: report rate limit [s], consecutive motion frames
# Load calibration parameters from the pickle written by the calibration routine.
try:
    with open(config_file, "rb") as calfile:
        calibration = pickle.load(calfile)
    (alpha_offset, alpha_cf, alpha_min, origin, L1, La) = calibration
    print("[SKYNET] Calibration data loaded")
    print(calibration)
    print(" ")
except (IOError, OSError, EOFError, ValueError, pickle.UnpicklingError):
    # If the calibration file doesn't exist (or is corrupt), fall back to standard
    # parameters: (offset [deg], correction factor, min pitch [deg], origin [px],
    # L-constant [virtual px], px-per-degree).
    # NOTE: the last value was previously written "12,6" -- a typo that produced a
    # 7-tuple and would have broken the 6-element unpacking used everywhere else.
    calibration = (4.9, 0.64, -20.0, (319, 1615), 1148.0, 12.6)
    (alpha_offset, alpha_cf, alpha_min, origin, L1, La) = calibration
    print("[SKYNET] No calibration file found, reverting to standard parameters")
    print(" ")
calibrate_positionsD = ((-18.0, -18.0), (-18.0, 18.0), (18.0, -18.0), (18.0, 18.0))  # positions to calibrate on [degrees]
calr = [[0.0, 0.0], [0.0, 0.0], [0.0, 0.0], [0.0, 0.0]]  # raw results of calibration [pixels]
# Test sequences of [x px, y px, laser on/off]; only the LAST assignment is the
# active one -- the first is kept for reference while drafting.
test_sequence = ([300, 350, 0], [300, 250, 1], [50, 350, 1], [150, 300, 1], [50, 250, 1], [450, 300, 1], [250, 188, 0])
test_sequence = ([350, 300, 1], [450, 300, 1], [450, 400, 1], [350, 400, 1], [350, 300, 1])
#Threaded capturing of the camera output
class PiVideoStream:
    """Continuously capture frames from the Pi camera in a background thread.

    NOTE(review): this local class shadows the PiVideoStream imported from
    imutils above; the local version is the one actually instantiated.
    """

    def __init__(self, resolution=camera_resolution, framerate=camera_fps):
        # initialize the camera and stream
        self.camera = PiCamera()
        self.camera.resolution = resolution
        self.camera.framerate = framerate
        self.rawCapture = PiRGBArray(self.camera, size=resolution)
        self.stream = self.camera.capture_continuous(self.rawCapture, format="bgr", use_video_port=True)
        # initialize the frame and the variable used to indicate
        # if the thread should be stopped
        self.frame = None
        self.stopped = False
        self.thread = None

    def start(self):
        """Start the capture thread and return self for chaining."""
        # daemon=True so a forgotten/failed stop() cannot keep the process
        # alive at exit (previously the thread was non-daemon and unreferenced)
        self.thread = threading.Thread(target=self.update, args=())
        self.thread.daemon = True
        self.thread.start()
        return self

    def update(self):
        """Capture loop: keep the newest frame until stop() is requested."""
        for f in self.stream:
            # grab the frame from the stream and clear the stream in
            # preparation for the next frame
            self.frame = f.array
            self.rawCapture.truncate(0)
            # if the thread indicator variable is set, stop the thread
            # and release camera resources
            if self.stopped:
                self.stream.close()
                self.rawCapture.close()
                self.camera.close()
                return

    def read(self):
        """Return the frame most recently read (None until the first capture)."""
        return self.frame

    def stop(self):
        """Ask the capture loop to terminate and release the camera."""
        self.stopped = True
#Capturing the mouseposition and releasing the calibration sequence to continue
def mouse_click(event, x, y, flags, param):
    """OpenCV mouse callback: remember the click position and raise the flag.

    Writes the globals refPx (clicked pixel) and clicked (handshake flag
    polled by the calibration and manual-mode loops).
    """
    global refPx, clicked
    # only a left-button press is of interest; ignore everything else
    if event != cv2.EVENT_LBUTTONDOWN:
        return
    refPx = [x, y]
    clicked = True
#Easily change laser_state
def Laser(laser_state):
    """Switch the coaxial laser on (1) or off (0) via its GPIO pin.

    Any other value switches the laser off and logs an error, matching the
    original fall-through behavior. (The error message previously used a
    Python-2-only print statement; the rest of the file uses parentheses.)
    """
    if laser_state == 1:
        GPIO.output(channelLaser, GPIO.HIGH)
    else:
        GPIO.output(channelLaser, GPIO.LOW)
        if laser_state != 0:
            print("[ERROR] incorrect laser_state parameter. Laser switched off.")
#A small function to translate pixels to degrees, taking calibration results into account
def RemapP2D(pixel, servo):
    """Translate a screen pixel to a servo angle [degrees], using calibration.

    pixel -- (x, y) screen coordinates
    servo -- "yaw" or "pitch"
    Raises ValueError for any other servo name; previously that branch fell
    through and crashed with an UnboundLocalError on the return statement.
    """
    # shift to the calibrated origin; screen y grows downwards, hence the flip
    pixel = [pixel[0] - origin[0], origin[1] - pixel[1]]
    if servo == "yaw":
        # angle of the pixel around the origin, corrected by the calibration
        # offset and factor. 3.14 (not math.pi) is kept deliberately: the
        # calibration routine uses the same constant, so they must agree.
        degree_r = math.atan2(pixel[0], pixel[1]) / 3.14 * 180.0
        degree = (degree_r - alpha_offset) / alpha_cf
    elif servo == "pitch":
        # distance from the origin maps linearly onto the pitch angle
        Lt = math.sqrt(math.pow(pixel[0], 2) + math.pow(pixel[1], 2))
        degree = ((Lt - L1) / La) + alpha_min
    else:
        print("[WEAPONS_SYS] Bad servo allocation")
        raise ValueError("unknown servo: %s" % servo)
    return degree
#A small function to translate degrees to ticks for the PWM-driver
def RemapD2T(degree, servo):
    """Translate a servo angle [degrees] to a PCA9685 tick count.

    degree -- requested angle; out-of-range requests are parked at -30 and a
              warning is printed, then a [-50, 50] clamp is applied
    servo  -- "yaw" or "pitch" (both currently share the same range)
    Raises ValueError for an unknown servo name; previously that branch fell
    through and crashed on the unbound range variables below.
    """
    if degree > 50 or degree < -50:
        print("[WEAPONS_SYS] Servo limits exceeded. Servo: %s Degree: %s" % (servo, degree))
        degree = -30  # park at a safe angle when the request is out of range
    degree = min(degree, 50)   # safety limiter (redundant after the park above,
    degree = max(degree, -50)  # but kept as a second line of defence)
    if servo == "yaw":
        (degreemin, degreemax) = (-60, 60)
        (pulsemin, pulsemax) = (900, 2100)
    elif servo == "pitch":
        (degreemin, degreemax) = (-60, 60)
        (pulsemin, pulsemax) = (900, 2100)
    else:
        print("[WEAPONS_SYS] Bad servo allocation")
        raise ValueError("unknown servo: %s" % servo)
    degreerange = float(degreemax - degreemin)
    pulserange = pulsemax - pulsemin
    # linear map: degree -> pulse length [us]
    pulseus = (((degree - degreemin) * pulserange) / degreerange) + pulsemin
    # duration of one of the PCA9685's 4096 ticks [us]. This MUST be a float
    # division: under Python 2 the original integer division truncated
    # 4.88us to 4us, stretching every pulse by ~22% (2100us became ~2560us,
    # beyond the servos' rated range).
    tickus = 1000000.0 / freq / 4096
    tick = int(pulseus / tickus)  # number of ticks for the required pulse length
    return tick
#Monitor yaw position
def MonitorGunYaw(monitoring_wait_event):
    """Daemon loop: steer the yaw servo toward the global gun_pos target.

    Moves one PWM tick per iteration. Within `steps` ticks of the target the
    per-step delay ramps linearly from dtmin up to dtmax, decelerating into
    (and accelerating out of) position. Runs only while the event is set.
    """
    dtmax = 0.03    # slowest per-step delay, at start/end of a move [s]
    dtmin = 0.0015  # 'cruising' per-step delay [s]
    steps = 100     # distance [ticks] at which decel-/acceleration starts
    # Initializing values
    degree = RemapP2D(gun_pos, "yaw")
    currentPosition = RemapD2T(degree, "yaw")
    while True:
        monitoring_wait_event.wait()
        degree = RemapP2D(gun_pos, "yaw")         # degree, taking calibration into account
        newPosition = RemapD2T(degree, "yaw")     # position is measured in pwm-'ticks'
        distance = newPosition - currentPosition  # ticks left to the target position
        if currentPosition == newPosition:
            # on target: idle briefly before polling gun_pos again
            time.sleep(0.3)
        else:
            direction = 1 if distance > 0 else -1  # step direction (int, unlike
            currentPosition = currentPosition + direction  # py3's true division)
            if abs(distance) < steps:
                # ramp dtmin (far) -> dtmax (on target). The original multiplied
                # the whole sum by (dtmax - dtmin) and used integer division,
                # which flattened the ramp to a constant ~dtmax sleep.
                time.sleep(dtmin + (1.0 - abs(distance) / float(steps)) * (dtmax - dtmin))
            else:
                time.sleep(dtmin)
        pwm.setPWM(channelYaw, 0, currentPosition)
#Monitor pitch position
def MonitorGunPitch(monitoring_wait_event):
    """Daemon loop: steer the pitch servo toward the global gun_pos target.

    Mirror of MonitorGunYaw with its own speed constants. Moves one PWM tick
    per iteration; within `steps` ticks of the target the per-step delay
    ramps linearly from dtmin up to dtmax. Runs only while the event is set.
    """
    dtmax = 0.02    # slowest per-step delay, at start/end of a move [s]
    dtmin = 0.0015  # 'cruising' per-step delay [s]
    steps = 50      # distance [ticks] at which deceleration starts
    # Initializing values
    degree = RemapP2D(gun_pos, "pitch")
    currentPosition = RemapD2T(degree, "pitch")
    while True:
        monitoring_wait_event.wait()
        degree = RemapP2D(gun_pos, "pitch")       # degree, taking calibration into account
        newPosition = RemapD2T(degree, "pitch")   # position is measured in pwm-'ticks'
        distance = newPosition - currentPosition  # ticks left to the target position
        if currentPosition == newPosition:
            # on target: idle briefly before polling gun_pos again
            time.sleep(0.3)
        else:
            direction = 1 if distance > 0 else -1  # step direction (int, unlike
            currentPosition = currentPosition + direction  # py3's true division)
            if abs(distance) < steps:
                # ramp dtmin (far) -> dtmax (on target). The original multiplied
                # the whole sum by (dtmax - dtmin) and used integer division,
                # which flattened the ramp to a constant ~dtmax sleep.
                time.sleep(dtmin + (1.0 - abs(distance) / float(steps)) * (dtmax - dtmin))
            else:
                time.sleep(dtmin)
        pwm.setPWM(channelPitch, 0, currentPosition)
#Direct control of gun_pos by setting degrees directly (needs disabling of monitoring threads)
def SetGunPosD(gun_posD):
    """Drive both servos straight to a position given in degrees.

    gun_posD -- (yaw_degrees, pitch_degrees)
    Unlike the monitor threads there is no easing: each servo jumps to the
    target in a single PWM update, so the monitoring threads must be paused
    (monitoring_wait_event cleared) before calling this.
    (A large block of dead, commented-out easing code was removed here.)
    """
    # Pitch
    currentPosition = RemapD2T(gun_posD[1], "pitch")
    pwm.setPWM(channelPitch, 0, currentPosition)
    # Yaw
    currentPosition = RemapD2T(gun_posD[0], "yaw")
    pwm.setPWM(channelYaw, 0, currentPosition)
#Initiate all threads/parameters/etc.
# Initiate window, video feed, monitor threads, laser GPIO and overwatch state.
# (The original wrapped this one-shot setup in a `while True: ... break` loop
# that ran exactly once; plain straight-line code is equivalent.)
# Setting up window and callback to capture mouse clicks
cv2.namedWindow(window_name1)
#cv2.startWindowThread() #this seems to throw an error, it didn't in previous programs (where the camera didn't run in a thread)
cv2.setMouseCallback(window_name1, mouse_click)
# Start videofeed
print("[SKYNET] Warming up sensors...")
feed = PiVideoStream().start()
time.sleep(1.0)
frame = feed.read()
frame = imutils.resize(frame, width=resolution[0])
cv2.imshow(window_name1, frame)
print("[SKYNET] Sensors online")
time.sleep(0.1)
# Events and threads
monitoring_wait_event = threading.Event()
gunyawthread = threading.Thread(target=MonitorGunYaw, args=(monitoring_wait_event,))
gunyawthread.daemon = True
gunyawthread.start()
gunpitchthread = threading.Thread(target=MonitorGunPitch, args=(monitoring_wait_event,))
gunpitchthread.daemon = True
gunpitchthread.start()
# set flag to start monitoring of gun_pos
monitoring_wait_event.set()
print("[WEAPON_SYS] Monitoring online")
time.sleep(0.1)
# GPIO laser (setup must precede the first GPIO.output call inside Laser)
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(channelLaser, GPIO.OUT)
Laser(0)
print("[WEAPON_SYS] Coaxial laser online")
time.sleep(0.1)
# surveillance setup
avg = None                              # running-average background model (rebuilt in overwatch)
lastUploaded = datetime.datetime.now()  # last time a tracking report was printed
motionCounter = 0                       # consecutive frames containing motion
# Preparation completed, initiating continuous loop
returned = True
#The continuous loop
# The continuous loop: show the live feed and dispatch on keystrokes to one of
# the modes (quit / calibration / manual / test sequence / overwatch).
while True:
    # print the menu once every time a mode returns
    if returned == True:
        returned = False
        print(" ")
        print("[SKYNET] System ready:")
        print("(q) quit")
        print("(c) calibration")
        print("(m) manual mode")
        print("(t) testing sequence")
        print("(s) overwatch mode")
        print(" ")
    # keep live feed updated
    frame = feed.read()
    cv2.imshow(window_name1, frame)
    # enter one of the modes by keystroke
    key = cv2.waitKey(42) & 0xFF
    if key == ord('q'):  # quit
        print("[SKYNET] Logging off...")
        break
    elif key == ord('c'):  # calibration mode
        # In calibration mode, the continuous feed is halted. For all calibration
        # positions (normally 4) the program targets the laser and asks for the
        # corresponding pixel.
        print("[SKYNET] Calibration sequence started")
        monitoring_wait_event.clear()
        print("[WEAPON_SYS] Monitoring offline")  # allow direct setting of servo positions
        print("[SKYNET] Assuming direct control")
        print(" ")
        j = 0
        # Loop through all calibration positions (normally 4)
        for i in calibrate_positionsD:
            print("Setting point[%s]: Please wait..." % (str(i)))
            Laser(1)       # switch on laser
            SetGunPosD(i)  # change gun position
            time.sleep(2)
            frame = feed.read()
            cv2.imshow(window_name1, frame)  # show picture for calibration input
            time.sleep(0.5)
            clicked = False
            print("Please click on coordinates")
            while True:
                frame = feed.read()
                cv2.imshow(window_name1, frame)
                key_c = cv2.waitKey(42) & 0xFF
                if clicked == True:  # wait for click to continue in program
                    clicked = False  # immediately clear click
                    Laser(0)         # switch off laser (to prevent overheating & annoyance)
                    break
            calr[j] = refPx  # save clicked position as result
            print("coordinates entered: %s" % (str(calr[j])))
            print(" ")
            j = j + 1
        # Calculate the calibration parameters from the results.
        # float() guards against Python 2 integer division: the clicked pixel
        # coordinates are ints, so the slopes a1/a2 were silently floored.
        a1 = float(calr[1][1] - calr[0][1]) / (calr[1][0] - calr[0][0])
        a2 = float(calr[3][1] - calr[2][1]) / (calr[3][0] - calr[2][0])
        b1 = calr[0][1] - (calr[0][0] * a1)
        b2 = calr[2][1] - (calr[2][0] * a2)
        alpha1 = math.atan(a1) * 180.0 / 3.14
        alpha2 = math.atan(a2) * 180.0 / 3.14
        alpha_offset = (alpha1 + alpha2) / 2.0
        alpha_cf = (90.0 - (alpha1 - alpha_offset)) / abs(calibrate_positionsD[0][0])
        # intersection of the two fitted lines = virtual rotation origin [px]
        origin = ((b2 - b1) / (a1 - a2), a1 * ((b2 - b1) / (a1 - a2)) + b1)
        L11 = math.sqrt(math.pow(calr[0][0] - origin[0], 2) + math.pow(origin[1] - calr[0][1], 2))
        L12 = math.sqrt(math.pow(calr[2][0] - origin[0], 2) + math.pow(origin[1] - calr[2][1], 2))
        L21 = math.sqrt(math.pow(calr[1][0] - origin[0], 2) + math.pow(origin[1] - calr[1][1], 2))
        L22 = math.sqrt(math.pow(calr[3][0] - origin[0], 2) + math.pow(origin[1] - calr[3][1], 2))
        L1 = (L11 + L12) / 2
        L2 = (L21 + L22) / 2
        La = (L2 - L1) / (calibrate_positionsD[1][1] - calibrate_positionsD[0][1])
        alpha_min = calibrate_positionsD[0][1]  # NOTE(review): pitch of the first calibration point -- verify intent
        calibration = (alpha_offset, alpha_cf, alpha_min, origin, L1, La)
        # write results to a pickle
        with open(config_file, "wb") as calfile:
            pickle.dump(calibration, calfile)
        # Write all results to a readable log file (same output as before)
        with open("calibration_log.txt", "w") as logf:
            for label, value in zip(("alpha_offset", "alpha_cf", "alpha_min", "origin", "L1", "LA"), calibration):
                logf.write("%s \n%s\n\n" % (label, str(value)))
            for l in range(0, j):
                logf.write("Raw results %s \n" % (l + 1))
                logf.write(str(calibrate_positionsD[l]) + "\n")
                logf.write(str(calr[l]) + "\n\n")
        gun_pos = center
        print("[SKYNET] Calibration sequence ended successfully. Parameters stored in file(s).")
        monitoring_wait_event.set()
        print("[WEAPON_SYS] Monitoring back online")
        print("[SKYNET] Calibration complete, returing to main loop")
        returned = True
    elif key == ord('m'):  # manual mode
        print("[SKYNET] Manual mode, please select target...")
        clicked = False
        while True:
            frame = feed.read()  # keep updating live feed
            cv2.imshow(window_name1, frame)
            key = cv2.waitKey(42) & 0xFF
            if clicked == True:  # when clicked continue
                clicked = False  # immediately clear click
                break
        print("[SKYNET] Target aqcuired")
        gun_pos = refPx  # set click position as target
        Laser(1)
        # lase the target for 4 seconds while keeping the feed alive
        tstart = time.time()
        while time.time() < tstart + 4:
            frame = feed.read()
            cv2.imshow(window_name1, frame)
            key = cv2.waitKey(42) & 0xFF
        Laser(0)
        print("[SKYNET] Returning to main loop")
        returned = True
    elif key == ord('t'):  # testing sequence
        print("[SKYNET] Test sequence initiating")
        for j in range(0, len(test_sequence)):
            print("Setting point[%s]: Please wait..." % (j))
            gun_pos = [test_sequence[j][0], test_sequence[j][1]]
            Laser(test_sequence[j][2])
            # hold each point for 3 seconds while keeping the feed alive
            tstart = time.time()
            while time.time() < tstart + 3:
                frame = feed.read()
                cv2.imshow(window_name1, frame)
                key = cv2.waitKey(42) & 0xFF
        Laser(0)
        print("[SKYNET] Test sequence finished, returning to main loop")
        returned = True
    elif key == ord('s'):  # overwatch mode
        print("[SKYNET] Entering overwatch")
        print("(r) return to menu")
        print(" ")
        # Reset the background model; it is (re)built from the first frame of
        # the loop below. (The original seeded it from `gray`/`timestamp` here,
        # before either variable existed, which raised a NameError.)
        print("[SENSOR] starting background model")
        avg = None
        lastUploaded = datetime.datetime.now()
        while True:
            motion = False
            timestamp = datetime.datetime.now()
            frame = feed.read()
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            gray = cv2.GaussianBlur(gray, (21, 21), 0)
            if avg is None:
                avg = gray.copy().astype("float")
            cv2.accumulateWeighted(gray, avg, 0.3)
            frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(avg))
            # threshold the delta image, dilate the thresholded image to fill in
            # holes, then find contours on the thresholded image
            thresh = cv2.threshold(frameDelta, ow_dthresh, 255, cv2.THRESH_BINARY)[1]
            thresh = cv2.dilate(thresh, None, iterations=3)
            (cnts, _) = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            center_areas = [0, 0]
            cntsareacount = 0
            # determine total magnitude of areas (for the calculation of a compound centre)
            for c in cnts:
                # if the contour is too small (or too big), ignore it
                if cv2.contourArea(c) > ow_minarea and cv2.contourArea(c) < ow_maxarea:
                    cntsareacount = cntsareacount + cv2.contourArea(c)
            # analyse all areas
            for c in cnts:
                if cv2.contourArea(c) < ow_minarea or cv2.contourArea(c) > ow_maxarea:
                    continue
                # compute the bounding box for the contour, draw it on the frame
                # and update the area-weighted compound centre
                (x, y, w, h) = cv2.boundingRect(c)
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
                center_areas = [int(center_areas[0] + (x + w / 2) * cv2.contourArea(c) / cntsareacount),
                                int(center_areas[1] + (y + h / 2) * cv2.contourArea(c) / cntsareacount)]
                motion = True
            # draw a single, compound centre of all areas
            # ('!=' replaces the Python-2-only '<>' operator)
            if center_areas != [0, 0]:
                cv2.circle(frame, tuple(center_areas), 20, (255, 0, 0), 2)
            # draw the timestamp on the frame
            ts = timestamp.strftime("%A %d %B %Y %I:%M:%S%p")
            cv2.putText(frame, ts, (10, frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 0, 255), 1)
            # check to see if the room is occupied
            if motion == True:
                # Increment the motion counter
                motionCounter += 1
                # only engage after enough consecutive motion frames
                if motionCounter >= ow_minmotionframes:
                    gun_pos = center_areas
                    Laser(1)
                    # rate-limit the tracking reports. The original line was a
                    # syntax error (missing colon) AND compared a timedelta to
                    # a float; total_seconds() makes the comparison valid.
                    if (timestamp - lastUploaded).total_seconds() > ow_minUploadseconds:
                        print("[SKYNET] Tracking target {} {}".format(ts, motionCounter))
                        # update the last reported timestamp
                        lastUploaded = timestamp
            # otherwise, the room is not occupied
            else:
                Laser(0)
                gun_pos = center
                if motionCounter > 0:
                    print("[SENSOR] Target lost")
                motionCounter = 0
            cv2.imshow(window_name1, frame)
            key = cv2.waitKey(10) & 0xFF
            if key == ord('r'):
                break
        Laser(0)
        gun_pos = center
        print(" ")
        print("[SKYNET] Exiting overwatch, returning to main loop")
        returned = True
#Quitting
gun_pos = center
Laser(0)
time.sleep(3)
monitoring_wait_event.clear()
feed.stop
time.sleep(0.2)
#Let's keep trying to destroy those windows...
for q in range(0,3):
cv2.waitKey(1)
cv2.waitKey(1)
cv2.destroyAllWindows
cv2.waitKey(1)
time.sleep(0.5)
print ("[SKYNET] Terminated")