import os, time, copy, shutil, Queue
import Image, ImageFilter, ImageEnhance, ImageDraw, ImageOps
class capture:
gauge_box = None # bounding box of the gauge panel
dial_centers = None # Dictionary keyed by dial unit, of dial center coordinates
dial_radius = None # Radius in pixels of a dial
dev = None # '/dev/video0'
tmpfile = None # temporary location for captured jpeg
debug = None # print a bit more information
debug_nocap = None # If true, no new images will be acquired
debug_logcappath = None # write all captured images to this path
full = None # full image
dials = None # image of all dials
# current reading
cur_time = None # unix time of most recent reading
dial_values = None # current gauge values
dial_angles = None # current gauge angles
reading = None # The final value, as one float
# a past reading, with difference great enough to avoid bouncing
prev_values = None
rot_count_adj = None # rotation counts, adjusted to assure
# non-decreasing values
rot_count = None # rotation counts, never calibrated. for rate
cumm_time = None # last cumulative reset
cumm_count = None # cumulative rotation count
def reset_cumm_count(self):
self.cumm_time = self.cur_time
self.cumm_count = {0.5: 0, 2: 0}
def get_cumm_count(self):
cnt_tmp = copy.copy(self.cumm_count)
self.reset_cumm_count()
return cnt_tmp
def __init__(self, gauge_box, dial_centers, dial_radius,
dev='/dev/video0',
tempfile='temp.jpeg',
debug = False, debug_nocap = False,
debug_logcappath = None):
self.gauge_box = gauge_box
self.dial_centers = dial_centers
self.dial_radius = dial_radius
self.dev = dev
self.tempfile = tempfile
self.debug = debug
self.debug_nocap = debug_nocap
self.debug_logcappath = debug_logcappath
def get_reading(self):
"""Snap a picture and process a new reading. Returns (value,"""
self.snap()
self.reading = 0
self.cur_time = time.time()
da = self.dial_angles = {}
cv = self.dial_values = {}
self.dials.save('0_dials.jpeg')
dr = self.dial_radius
for (factor, (dx,dy,rot)) in dial_centers.items():
# crop and enhance the dial image
#
#FIXME: better thresholding techniques should certainly be
#used, not to mention proper registration of the dial
#coordiantes. For now, the camera is well secured, and
#lighting constant.
im_dial = self.dials.crop( (dx-dr, dy-dr, dx+dr, dy+dr))
#im_dial.save('1_crop_%s.jpeg' % factor)
im_dial = ImageOps.equalize(im_dial)
#im_dial.save('2_eq_%s.jpeg' % factor)
im_dial = ImageOps.grayscale(im_dial).point(lambda i: 0 if i < 60 else 255)
#im_dial.save('3_thresh_%s.jpeg' % factor)
im_dial = im_dial.filter(ImageFilter.ModeFilter)
#im_dial.save('4_filter_%s.jpeg' % factor)
im_dial = ImageOps.invert(im_dial)
#im_dial.save('5_invert_%s.jpeg' % factor)
(angle, arcsize) = self.findangle_pie(im_dial, arcsize=15)
da[factor] = angle + float(arcsize)/2
cv[factor] = self.angle_to_float( da[factor], rot)
if self.debug:
print "Factor angle: ", factor, angle
self.show_pie(im_dial, angle, arcsize)
# process the raw values
self.count_rotations()
self.disambiguate_dials()
# compute the final value
# factors are per revolution, so divide by ten
for factor in (1000000, 100000, 10000, 1000):
self.reading += factor/10 * int(self.prev_values[factor])
self.reading += 2 * (self.rot_count_adj[2] % 50) + (cv[2] * 0.2)
#value += 0.5 * (self.cnt_half % 4)
self.show_overlay()
return (self.cur_time, self.reading)
def disambiguate_dials(self):
"Reconcile ambiguous dial values with finer dial readings."
cv = self.dial_values
pv = self.prev_values
# disambiguate the 2ft dial using the half count
if int(cv[2] + 0.2) > int(cv[2]) and (self.rot_count_adj[0.5] % 4) > 1:
cv[2] = (int(cv[2]) + 1) % 10
else:
cv[2] = int(cv[2])
# if close to the next value, rely upon the next dial to
# disambiguate only really applies to the upper row. Increment
# only once
for factor in (1000, 10000, 100000, 1000000):
# increased by >1, or passed zero
if int(cv[factor]) > int(pv[factor]) or (
int(pv[factor] > 5) and int(cv[factor]) < 5):
# reset the counters, if on the 1000 dial
if factor==1000:
pv[factor] = cv[factor]
self.rot_count_adj[2] += 50 - (self.rot_count_adj[2] % 50)
self.rot_count_adj[0.5] += 4 - (self.rot_count_adj[0.5] % 4)
# increment only if near a digit, and the next smaller
# dial is at a low value.
# FIXME: just update whenever the previous dial wraps
elif pv[factor/10] < 5:
elif cv[factor] > (pv[factor] + 0.5) % 10:
return
def count_rotations(self):
"""Count rotations of the half and two foot dials. Also,
synchronize with the 1000ft dial when it is near a digit."""
# Note that if more than about 15 seconds have passed, the
# half gauge may have looped. This is not currently
# considered
t = self.cur_time
# if no previous value, store the current and move on
if self.prev_values is None:
self.rot_count_adj = { 0.5: 0, 2: 0}
self.rot_count = { 0.5: 0, 2: 0}
self.prev_values = copy.copy(cv)
cv[1000] = int(cv[1000])
# increment these counts if they passed zero
for factor in (0.5, 2):
# passed zero?
if int(pv[factor]) > 5 and int(cv[factor]) < 5:
self.rot_count_adj[factor] += 1
self.rot_count[factor] += 1
self.cumm_count[factor] += 1
self.prev_values[factor] = cv[factor]
# if more than 0.5 past last, update. 0.5 should be greater than any noise
def snap(self, nocapture=False):
"""Use v4lctl to capture a new image to disk."""
if not self.debug_nocap:
os.system( "v4lctl -c %s snap jpeg full %s" % (self.dev, self.tempfile))
if self.debug_logcappath is not None:
shutil.copyfile(
self.tempfile,
self.debug_logcappath + '/' + str(int(self.cur_time)) + '.jpeg')
self.full = Image.open(self.tempfile)
#self.full = self.full.rotate(-90)
self.dials = self.full.crop(self.gauge_box)
def angle_to_float(self, angle, rot):
"""Convert a gauge angle to a float. rot is either 'cw' or 'ccw'."""
if rot == 'ccw':
return (-float(angle) / 36 + 7.5) % 10
elif rot == 'cw':
return (float(angle) / 36 + 2.5) % 10
def show_pie(self, img, angle, arcsize):
"""Draw a pie slice at the center of an image."""
print "Drawing Angle: %d - %d" % (angle, angle+arcsize)
imgc = img.copy()
imgc = imgc.convert("RGB")
draw = ImageDraw.Draw(imgc)
draw.pieslice( (0,0,imgc.size[0],imgc.size[1]), angle, angle+arcsize,
outline="#00ff00")
imgc.show()
def show_overlay(self):
"""Show the current value overlaid on the current image.
Mostly for debugging."""
img_c = self.full.copy().convert("RGB")
draw = ImageDraw.Draw(img_c)
draw.rectangle(self.gauge_box, outline='#00FF00')
bx, by = self.gauge_box[:2]
for (factor,(dx,dy,rot)) in self.dial_centers.items():
if self.dial_angles is not None:
angle = self.dial_angles[factor]
angle = 0
draw.pieslice((dx-dr+bx, dy-dr+by, dx+dr+bx, dy+dr+by),
angle, angle-1, outline='#00FF00')
if self.dial_values is not None:
digit = self.dial_values[factor]
draw.text((dx+bx-10, dy+by-dr-15), "%0.2f" % digit, fill='#00FF00')
# show only a portion around the bounding box
crop_box = [-30, -30, 30, 30]
for (i,val) in enumerate(self.gauge_box): crop_box[i] += val
img_c.crop(crop_box)
img_c.show()
def findangle_pie(self, img, arcsize = 10, verbose=False):
"""Fine the needle angle by fitting a small pie slice to it"""
angles = range(0,360, max(arcsize/2,1))
mincnt = img.size[0]*img.size[1]
minangle = None
for angle in angles:
draw.pieslice( (0,0,img.size[0],img.size[1]), angle, angle+arcsize, fill=0)
count = imgc.histogram()[255]
if count < mincnt:
mincnt = count
minangle = angle
if self.debug and verbose:
print "Angle, count:", angle, count
self.show_pie(img, angle, arcsize)
return (minangle, arcsize)
# This didn't work so well...
def findangle_line(self, img):
xs = [float(a) * 20 for a in range(-10,10)]
ys = [float(a) * 20 for a in range(-10,10)]
minoffset = (None,None)
center = tuple([a / 2 for a in img.size])
for x_off in xs:
for y_off in ys:
endpoint = (center[0] + x_off,
center[1] + y_off)
draw.line( (center, endpoint), fill=0)
print "min, count:", mincnt, count
minoffset = (x_off, y_off)
print "x_off, y_off:", x_off, y_off
if __name__ == '__main__':
# hardcoded coordinates of an arbitrary bounding box to containing
# the dials, dial centers, and the dial radius.
gb = (30, 275, 330, 450)
dial_centers = {1000000 : (59, 48, 'ccw'),
100000 : (126, 47, 'cw'),
10000 : (193, 47, 'ccw'),
1000 : (260, 46, 'cw'),
2 : (39, 137, 'ccw'),
0.5 : (112, 137, 'ccw')}
dial_radius = 31
# Print debug information while parsing, and display images
debug=False
# Don't capture new images. Use temp.jpeg from the current directory
debug_nocap=True
# Store captured images in this path
debug_logcappath=os.path.expandvars('$HOME/tmp/images/')
# Initialize the capture class
c_cap = capture(gb, dial_centers, dial_radius, debug=debug,
debug_nocap=debug_nocap, debug_logcappath=debug_logcappath)
t_last_cumm = None
t_last = None
value_last = None
dial_last = None
log_out = open("meter.out",'a', buffering=1)
while True:
(t, value) = c_cap.get_reading()
t = int(t)
dial_values = c_cap.dial_values
prev_values = c_cap.prev_values
if dial_last is None or value != value_last:
print dial_values
t_last = t
dial_last = dial_values
value_last = value
print >> log_out,"%d %0.2f - %0.3f - %0.3f %d %d - %0.3f %d %d" % (
t, value,
prev_values[1000],
prev_values[2], c_cap.rot_count_adj[2], c_cap.rot_count[2],
prev_values[0.5], c_cap.rot_count_adj[0.5], c_cap.rot_count[0.5])
if debug_nocap: break