# Copyright 2013-2018 Intranet AG and contributors
#
# guibot is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# guibot is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with guibot. If not, see <http://www.gnu.org/licenses/>.
"""
SUMMARY
------------------------------------------------------
Computer vision finders (CV backends) used to find targets on screen.
INTERFACE
------------------------------------------------------
"""
import os
import sys
import re
import copy
import random
import configparser as config
import PIL.Image
from .config import GlobalConfig, LocalConfig
from .imagelogger import ImageLogger
from .fileresolver import FileResolver
from .errors import *
import logging
# module-level logger used by the classes and functions in this file
log = logging.getLogger('guibot.finder')
# names exported by a wildcard import of this module
__all__ = ['CVParameter', 'Finder', 'AutoPyFinder', 'ContourFinder', 'TemplateFinder',
'FeatureFinder', 'CascadeFinder', 'TextFinder', 'TemplateFeatureFinder',
'DeepFinder', 'HybridFinder']
class CVParameter(object):
    """A class for a single parameter used for CV backend configuration."""

    def __init__(self, value,
                 min_val=None, max_val=None,
                 delta=10.0, tolerance=1.0,
                 fixed=True, enumerated=False):
        """
        Build a computer vision parameter.

        :param value: value of the parameter
        :type value: bool or int or float or str or None
        :param min_val: lower boundary for the parameter range
        :type min_val: int or float or None
        :param max_val: upper boundary for the parameter range
        :type max_val: int or float or None
        :param float delta: delta for the calibration and random value
                            (no calibration if `delta` < `tolerance`)
        :param float tolerance: tolerance of calibration
        :param bool fixed: whether the parameter is prevented from calibration
        :param bool enumerated: whether the parameter value belongs to an
                                enumeration or to a range (distance matters)
        :raises: :py:class:`ValueError` if an enumerated parameter lacks an
                 explicit lower or upper boundary

        As a rule of thumb a good choice for the parameter delta is one fourth
        of the range since the delta will be used as standard deviation when
        generating a random value for the parameter from a normal distribution.
        The delta to tolerance ratio is basically the number of failing trials
        before the parameter converges and is usually set to ten.
        """
        self.value = value

        # initial (delta) and minimal (tolerance) variation step
        self.delta = delta
        self.tolerance = tolerance

        # variation allowance range: `min_val` and `max_val` keep exactly what
        # the caller passed (possibly None) while `range` always gets a usable
        # numeric boundary for numeric values
        self.min_val = min_val
        if min_val is not None:
            assert value >= min_val
        elif isinstance(self.value, float):
            min_val = -sys.float_info.max
        elif isinstance(self.value, int):
            min_val = -sys.maxsize
        self.max_val = max_val
        if max_val is not None:
            assert value <= max_val
        elif isinstance(self.value, float):
            max_val = sys.float_info.max
        elif isinstance(self.value, int):
            max_val = sys.maxsize
        self.range = (min_val, max_val)

        # fixed or allowed to be calibrated
        self.fixed = fixed

        # enumerable (e.g. modes) or range value
        self.enumerated = enumerated
        if self.enumerated and (self.min_val is None or self.max_val is None):
            raise ValueError("Enumerated parameters must have a finite (usually small) range")

    def __repr__(self):
        """
        Provide a representation of the parameter for storing and reporting.

        :returns: special syntax representation of the parameter
        :rtype: str
        """
        return ("<value='%s' min='%s' max='%s' delta='%s' tolerance='%s' fixed='%s' enumerated='%s'>"
                % (self.value, self.min_val, self.max_val, self.delta, self.tolerance, self.fixed, self.enumerated))

    def __eq__(self, other):
        """
        Custom implementation for equality check.

        :param other: object to compare with
        :returns: whether this instance is equal to another
        :rtype: bool

        Two parameters are equal if their string representations are equal.
        """
        if not isinstance(other, CVParameter):
            return NotImplemented
        return repr(self) == repr(other)

    @staticmethod
    def from_string(raw):
        """
        Parse a CV parameter from string.

        :param str raw: string representation for the parameter
        :returns: parameter parsed from the representation
        :rtype: :py:class:`CVParameter`
        :raises: :py:class:`ValueError` if the string does not follow the
                 syntax produced by :py:func:`CVParameter.__repr__`
        """
        # both boundaries may be negative and all seven fields (including
        # the trailing `enumerated` one) are extracted
        match = re.match(r"<value='(.*)' min='(-?[\d.None]+)' max='(-?[\d.None]+)'"
                         r" delta='([\d.]+)' tolerance='([\d.]+)' fixed='(\w+)' enumerated='(\w+)'>",
                         raw)
        if match is None:
            raise ValueError("Unsupported CV parameter representation '%s'" % raw)

        args = []
        for arg in match.groups():
            if arg == "None":
                arg = None
            elif arg == "True":
                arg = True
            elif arg == "False":
                arg = False
            elif re.match(r"-?\d+$", arg):
                arg = int(arg)
            elif re.match(r"-?\d+(?:\.\d+)?$", arg):
                arg = float(arg)
            else:
                arg = str(arg)
            log.log(9, "%s %s", arg, type(arg))
            args.append(arg)
        log.log(9, "%s", args)
        return CVParameter(*args)

    def random_value(self, mu=None, sigma=None):
        """
        Return a random value of the CV parameter given its range and type.

        :param mu: mean for a normal distribution, uniform distribution if None
        :type mu: bool or int or float or str or None
        :param sigma: standard deviation for a normal distribution, quarter range if None
                      (maximal range is equivalent to maximal data type values)
        :type sigma: bool or int or float or str or None
        :returns: a random value conforming to the CV parameter range and type
        :rtype: bool or int or float or str or None

        .. note:: Only uniform distribution is used for boolean values.
        """
        start, end = self.range
        # bool has to be tested before int because bool is a subclass of int
        if isinstance(self.value, bool):
            return random.randint(0, 1) == 1
        elif isinstance(self.value, float):
            if mu is None or self.enumerated:
                return random.uniform(start, end)
            if sigma is None:
                # a quarter of the range as (positive) standard deviation
                sigma = (end - start) / 4
            # clip the normally distributed value to the allowed range
            return min(max(random.gauss(mu, sigma), start), end)
        elif isinstance(self.value, int):
            if mu is None or self.enumerated:
                return random.randint(start, end)
            if sigma is None:
                sigma = (end - start) / 4
            return min(max(int(random.gauss(mu, sigma)), start), end)
        else:
            log.warning("Cannot generate random value for CV parameters other than float, int, and bool")
            return self.value
class Finder(LocalConfig):
    """
    Base for all image matching functionality and backends.

    The image finding methods include finding one or all matches
    above the similarity defined in the configuration of each backend.

    There are many parameters that could contribute for a good match. They can
    all be manually adjusted or automatically calibrated.
    """

    @staticmethod
    def from_match_file(filename):
        """
        Read the configuration from a match file with the given filename.

        :param str filename: match filename for the configuration
        :returns: target finder with the parsed (and generated) settings
        :rtype: :py:class:`finder.Finder`
        :raises: :py:class:`IOError` if the respective match file couldn't be read
        :raises: :py:class:`UnsupportedBackendError` if the stored find backend
                 is not one of the known ones

        The influence of the read configuration is that of an overwrite, i.e.
        all parameters will be generated (if not already present) and then the
        ones read from the configuration file will be overwritten.
        """
        parser = config.RawConfigParser()
        # keep the option names case sensitive
        parser.optionxform = str

        if not filename.endswith(".match"):
            filename += ".match"
        if not os.path.exists(filename):
            filename = FileResolver().search(filename)

        # the parser reports the files it managed to read - none means failure
        if not parser.read(filename):
            raise IOError("Match file %s is corrupted and cannot be read" % filename)
        if not parser.has_section("find"):
            raise IOError("No image matching configuration can be found")

        if parser.has_option("find", 'backend'):
            backend_name = parser.get("find", 'backend')
        else:
            backend_name = GlobalConfig.find_backend

        finder_types = {
            "autopy": AutoPyFinder,
            "contour": ContourFinder,
            "template": TemplateFinder,
            "feature": FeatureFinder,
            "cascade": CascadeFinder,
            "text": TextFinder,
            "tempfeat": TemplateFeatureFinder,
            "deep": DeepFinder,
            "hybrid": HybridFinder,
        }
        if backend_name not in finder_types:
            raise UnsupportedBackendError("No '%s' backend is supported" % backend_name)
        finder = finder_types[backend_name](synchronize=False)

        for category in finder.params.keys():
            if not parser.has_section(category):
                continue
            # switch the category to the stored backend before reading its options
            stored_backend = parser.get(category, 'backend')
            if stored_backend != finder.params[category]["backend"]:
                finder.configure_backend(backend=stored_backend, category=category, reset=False)
            for option in parser.options(category):
                if option == "backend":
                    continue
                raw_value = parser.get(category, option)
                if isinstance(finder.params[category][option], CVParameter):
                    parsed = CVParameter.from_string(raw_value)
                    log.log(9, "%s %s", raw_value, parsed)
                else:
                    parsed = raw_value
                finder.params[category][option] = parsed

        finder.synchronize()
        return finder

    @staticmethod
    def to_match_file(finder, filename):
        """
        Write the configuration to a match file with the given filename.

        :param finder: match configuration to save
        :type finder: :py:class:`finder.Finder`
        :param str filename: match filename for the configuration

        The ".match" extension is appended to the filename if missing.
        """
        parser = config.RawConfigParser()
        # keep the option names case sensitive
        parser.optionxform = str

        for section, settings in finder.params.items():
            if not parser.has_section(section):
                parser.add_section(section)
            # the backend always comes first in its section
            parser.set(section, 'backend', settings["backend"])
            for option, value in settings.items():
                log.log(9, "%s %s", section, option)
                parser.set(section, option, value)

        target = filename if filename.endswith(".match") else filename + ".match"
        with open(target, 'w') as match_file:
            match_file.write("# IMAGE MATCH DATA\n")
            parser.write(match_file)

    def __init__(self, configure=True, synchronize=True):
        """
        Build a finder and its CV backend settings.

        :param bool configure: whether to also generate the default configuration
        :param bool synchronize: accepted for a uniform interface, not used here
        """
        super(Finder, self).__init__(configure=False, synchronize=False)

        # available and currently fully compatible methods
        self.categories["find"] = "find_methods"
        self.algorithms["find_methods"] = ["autopy", "contour", "template", "feature",
                                           "cascade", "text", "tempfeat", "deep", "hybrid"]

        # image logging facility reporting through this finder's log method
        self.imglog = ImageLogger()
        self.imglog.log = self.log

        # additional preparation (no synchronization available)
        if not configure:
            return
        self.__configure_backend(reset=True)

    def __configure_backend(self, backend=None, category="find", reset=False):
        # only the "find" category is configurable at this level
        if category != "find":
            raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
        if reset:
            super(Finder, self).configure_backend(backend="cv", reset=True)

        selected = GlobalConfig.find_backend if backend is None else backend
        supported = self.algorithms[self.categories[category]]
        if selected not in supported:
            raise UnsupportedBackendError("Backend '%s' is not among the supported ones: "
                                          "%s" % (selected, supported))

        log.log(9, "Setting backend for %s to %s", category, selected)
        self.params[category] = {}
        self.params[category]["backend"] = selected
        self.params[category]["similarity"] = CVParameter(0.8, 0.0, 1.0)
        log.log(9, "%s %s\n", category, self.params[category])

    def __synchronize_backend(self, backend=None, category="find", reset=False):
        # only the "find" category is synchronizable at this level
        if category != "find":
            raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
        if reset:
            super(Finder, self).synchronize_backend("cv", reset=True)
        if backend is not None and self.params[category]["backend"] != backend:
            raise UninitializedBackendError("Backend '%s' has not been configured yet" % backend)
        # the lookup itself fails if the category was never configured
        backend = self.params[category]["backend"]

    def synchronize_backend(self, backend=None, category="find", reset=False):
        """
        Custom implementation of the base method.

        See base method for details.
        """
        self.__synchronize_backend(backend, category, reset)

    def can_calibrate(self, category, mark):
        """
        Fix the parameters for a given category backend algorithm,
        i.e. disallow the calibrator to change them.

        :param str category: backend category whose parameters are marked
        :param bool mark: whether to mark for calibration
        :raises: :py:class:`UnsupportedBackendError` if `category` is not among the
                 supported backend categories
        """
        if category not in self.categories.keys():
            raise UnsupportedBackendError("Category '%s' not among the "
                                          "supported %s" % (category, self.categories.keys()))

        # BUG: force fix parameters that have internal bugs
        always_fixed = {("fextract", "bytes"), ("fdetect", "Extended"),
                        ("tdetect", "input_res_x"), ("tdetect", "input_res_y")}
        for key, value in self.params[category].items():
            if not isinstance(value, CVParameter):
                continue
            if (category, key) in always_fixed:
                value.fixed = True
            else:
                value.fixed = not mark
            log.debug("Setting %s/%s to fixed=%s for calibration", category, key, value.fixed)

    def copy(self):
        """
        Deep copy the current finder and its configuration.

        :returns: a copy of the current finder with identical configuration
        :rtype: :py:class:`Finder`
        """
        duplicate = type(self)(synchronize=False)

        # first pass: configure the same backends
        for category, settings in self.params.items():
            try:
                duplicate.configure_backend(settings["backend"], category)
            except UnsupportedBackendError:
                # some categories are not configurable
                pass

        # second pass: carry over every parameter value
        for category, settings in self.params.items():
            for name, value in settings.items():
                duplicate.params[category][name] = copy.deepcopy(value)

        # third pass: synchronize the copied configuration
        for category, settings in self.params.items():
            try:
                duplicate.synchronize_backend(settings["backend"], category)
            except UnsupportedBackendError:
                # some categories are not synchronizable
                pass

        return duplicate

    def find(self, needle, haystack):
        """
        Find all needle targets in a haystack image.

        :param needle: image, text, pattern, or a list or chain of such to look for
        :type needle: :py:class:`target.Target` or [:py:class:`target.Target`]
        :param haystack: image to look in
        :type haystack: :py:class:`target.Image`
        :returns: all found matches (one in most use cases)
        :rtype: [:py:class:`match.Match`]
        :raises: :py:class:`NotImplementedError` if the base class method is called
        """
        raise NotImplementedError("Abstract method call - call implementation of this class")

    def log(self, lvl):
        """
        Log images with an arbitrary logging level.

        :param int lvl: logging level for the message
        :raises: :py:class:`MissingHotmapError` if there are no hotmaps to dump
        """
        imglog = self.imglog
        # below selected logging level
        if lvl < imglog.logging_level:
            imglog.clear()
            return
        # logging is being collected for a specific logtype
        if ImageLogger.accumulate_logging:
            return
        # no hotmaps to log
        if len(imglog.hotmaps) == 0:
            raise MissingHotmapError("No matching was performed in order to be image logged")

        if len(imglog.similarities) > 0:
            similarity = imglog.similarities[-1]
        else:
            similarity = 0.0
        name = "imglog%s-3hotmap-%s.png" % (imglog.printable_step, similarity)
        imglog.dump_hotmap(name, imglog.hotmaps[-1])

        imglog.clear()
        ImageLogger.step += 1
class AutoPyFinder(Finder):
    """Simple matching backend provided by AutoPy."""

    def __init__(self, configure=True, synchronize=True):
        """
        Build a CV backend using AutoPy.

        :param bool configure: whether to also generate the default configuration
        :param bool synchronize: accepted for a uniform interface, not used here
        """
        super(AutoPyFinder, self).__init__(configure=False, synchronize=False)

        # bitmaps of already loaded needles keyed by their filename
        self._bitmapcache = {}

        # additional preparation (no synchronization available)
        if not configure:
            return
        self.__configure_backend(reset=True)

    def __configure_backend(self, backend=None, category="autopy", reset=False):
        # only the "autopy" category is configurable at this level
        if category != "autopy":
            raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
        if reset:
            super(AutoPyFinder, self).configure_backend(backend="autopy", reset=True)

        self.params[category] = {"backend": "none"}

    def find(self, needle, haystack):
        """
        Custom implementation of the base method.

        :param needle: target image to search for
        :type needle: :py:class:`Image`

        See base method for details.

        .. warning:: AutoPy has a bug when finding multiple matches
                     so it will currently only return a single match.
        """
        needle.match_settings = self
        needle.use_own_settings = True
        self.imglog.needle = needle
        self.imglog.haystack = haystack
        self.imglog.dump_matched_images()

        # prepare a canvas solely for image logging
        canvas = haystack.pil_image.copy()
        self.imglog.hotmaps.append(canvas)

        # class-specific dependencies
        from autopy import bitmap, screen
        from tempfile import NamedTemporaryFile

        try:
            autopy_needle = self._bitmapcache[needle.filename]
        except KeyError:
            # load and cache it
            # TODO: Use in-memory conversion
            autopy_needle = bitmap.Bitmap.open(needle.filename)
            self._bitmapcache[needle.filename] = autopy_needle

        # TODO: Use in-memory conversion
        with NamedTemporaryFile(prefix='guibot', suffix='.png') as screen_file:
            haystack.save(screen_file.name)
            autopy_screenshot = bitmap.Bitmap.open(screen_file.name)

            similarity = self.params["find"]["similarity"].value
            autopy_tolerance = 1.0 - similarity
            log.debug("Performing autopy template matching with tolerance %s (color)",
                      autopy_tolerance)

            # TODO: since only the coordinates are available and fuzzy areas of
            # matches are returned we need to ask autopy team for returning
            # the matching rates as well
            coord = autopy_screenshot.find_bitmap(autopy_needle, autopy_tolerance)
            log.debug("Best acceptable match starting at %s", coord)

        if coord is None:
            matches = []
        else:
            x, y = int(coord[0]), int(coord[1])
            # the requested similarity stands in for the unavailable matching rate
            self.imglog.locations.append((x, y))
            self.imglog.similarities.append(similarity)
            w, h = needle.width, needle.height
            dx, dy = needle.center_offset.x, needle.center_offset.y
            from .match import Match
            matches = [Match(x, y, w, h, dx, dy, similarity)]
            from PIL import ImageDraw
            draw = ImageDraw.Draw(self.imglog.hotmaps[-1])
            draw.rectangle((x, y, x+w, y+h), outline=(0, 0, 255))
            del draw

        self.imglog.log(30)
        return matches
class ContourFinder(Finder):
    """
    Contour matching backend provided by OpenCV.

    Essentially, we will find all contours in a binary image,
    preprocessed with Gaussian blur and adaptive threshold and return
    the ones with area (size) similar to the searched image.
    """

    def __init__(self, configure=True, synchronize=True):
        """
        Build a CV backend using OpenCV's contour matching.

        :param bool configure: whether to also generate the default configuration
        :param bool synchronize: accepted for a uniform interface, not used here
        """
        super(ContourFinder, self).__init__(configure=False, synchronize=False)

        # available and currently fully compatible methods
        self.categories["contour"] = "contour_extractors"
        self.categories["threshold"] = "threshold_filters"
        self.algorithms["contour_extractors"] = ("mixed",)
        self.algorithms["threshold_filters"] = ("normal", "adaptive", "canny")

        # additional preparation (no synchronization available)
        if configure:
            self.__configure(reset=True)

    def __configure_backend(self, backend=None, category="contour", reset=False):
        """
        Custom implementation of the base method.

        See base method for details.
        """
        if category not in ["contour", "threshold"]:
            raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
        if reset:
            super(ContourFinder, self).configure_backend("contour", reset=True)
        if category == "contour" and backend is None:
            backend = "mixed"
        elif category == "threshold" and backend is None:
            backend = GlobalConfig.contour_threshold_backend
        if backend not in self.algorithms[self.categories[category]]:
            raise UnsupportedBackendError("Backend '%s' is not among the supported ones: "
                                          "%s" % (backend, self.algorithms[self.categories[category]]))

        log.log(9, "Setting backend for %s to %s", category, backend)
        self.params[category] = {}
        self.params[category]["backend"] = backend

        if category == "contour":
            # 1 RETR_EXTERNAL, 2 RETR_LIST, 3 RETR_CCOMP, 4 RETR_TREE
            self.params[category]["retrievalMode"] = CVParameter(2, 1, 4, enumerated=True)
            # 1 CHAIN_APPROX_NONE, 2 CHAIN_APPROX_SIMPLE, 3 CHAIN_APPROX_TC89_L1, 4 CHAIN_APPROX_TC89_KCOS
            self.params[category]["approxMethod"] = CVParameter(2, 1, 4, enumerated=True)
            self.params[category]["minArea"] = CVParameter(0, 0, None, 100.0)
            # 1 L1 method, 2 L2 method, 3 L3 method
            self.params[category]["contoursMatch"] = CVParameter(1, 1, 3, enumerated=True)
        elif category == "threshold":
            # 1 normal, 2 median, 3 gaussian, 4 none
            self.params[category]["blurType"] = CVParameter(4, 1, 4, enumerated=True)
            self.params[category]["blurKernelSize"] = CVParameter(5, 1, None, 100.0)
            self.params[category]["blurKernelSigma"] = CVParameter(0, 0, None, 100.0)
            if backend == "normal":
                # value of the threshold since it is nonadaptive and fixed
                self.params[category]["thresholdValue"] = CVParameter(122, 0, 255, 50.0)
                self.params[category]["thresholdMax"] = CVParameter(255, 0, 255, 20.0)
                # 0 binary, 1 binar_inv, 2 trunc, 3 tozero, 4 tozero_inv, 5 mask, 6 otsu, 7 triangle
                self.params[category]["thresholdType"] = CVParameter(1, 0, 7, enumerated=True)
            elif backend == "adaptive":
                self.params[category]["thresholdMax"] = CVParameter(255, 0, 255, 20.0)
                # 0 adaptive mean threshold, 1 adaptive gaussian (weighted mean) threshold
                self.params[category]["adaptiveMethod"] = CVParameter(1, 0, 1, enumerated=True)
                # 0 normal, 1 inverted
                self.params[category]["thresholdType"] = CVParameter(1, 0, 1, enumerated=True)
                # size of the neighborhood to consider to adaptive thresholding
                self.params[category]["blockSize"] = CVParameter(11, 3, None, 200.0, 2.0)
                # constant to subtract from the (weighted) calculated mean
                self.params[category]["constant"] = CVParameter(2, -255, 255, 1.0)
            elif backend == "canny":
                self.params[category]["threshold1"] = CVParameter(100.0, 0.0, None, 50.0)
                self.params[category]["threshold2"] = CVParameter(1000.0, 0.0, None, 500.0)

    def __configure(self, threshold_filter=None, reset=True, **kwargs):
        # configure the contour extraction first (optionally resetting the
        # inherited configuration) and then the threshold filter stage
        self.__configure_backend(category="contour", reset=reset)
        self.__configure_backend(threshold_filter, "threshold")

    def find(self, needle, haystack):
        """
        Custom implementation of the base method.

        :param needle: target image to search for
        :type needle: :py:class:`Image`

        See base method for details.

        First extract all contours from a binary (boolean, threshold) version of
        the needle and haystack and then match the needle contours with one or
        more sets of contours in the haystack image. The number of needle matches
        depends on the set similarity and can be improved by requiring minimal
        area for the contours to be considered.

        No matches are returned if no contours could be extracted from either
        the needle or the haystack.
        """
        needle.match_settings = self
        needle.use_own_settings = True
        self.imglog.needle = needle
        self.imglog.haystack = haystack
        self.imglog.dump_matched_images()

        # class-specific dependencies
        import cv2
        import numpy

        orig_needle = numpy.array(needle.pil_image)
        thresh_needle = self._binarize_image(orig_needle, log=False)
        countours_needle = thresh_needle.copy()
        needle_contours = self._extract_contours(countours_needle, log=False)

        orig_haystack = numpy.array(haystack.pil_image)
        thresh_haystack = self._binarize_image(orig_haystack, log=True)
        countours_haystack = thresh_haystack.copy()
        haystack_contours = self._extract_contours(countours_haystack, log=True)
        self.imglog.hotmaps.append(numpy.array(haystack.pil_image))

        # nothing can be matched without contours on both sides and the numpy
        # reductions below are undefined for empty contour sets
        if len(needle_contours) == 0 or len(haystack_contours) == 0:
            log.debug("No contours extracted from the needle or the haystack")
            self.imglog.log(30)
            return []

        min_area = self.params["contour"]["minArea"].value
        # pairs skipped for too small area keep the initial distance of 1.0
        distances = numpy.ones((len(haystack_contours), len(needle_contours)))
        for i, hcontour in enumerate(haystack_contours):
            if cv2.contourArea(hcontour) < min_area:
                continue
            for j, ncontour in enumerate(needle_contours):
                if cv2.contourArea(ncontour) < min_area:
                    continue
                distances[i, j] = cv2.matchShapes(hcontour, ncontour, self.params["contour"]["contoursMatch"].value, 0)
                assert distances[i, j] >= 0.0

        from .match import Match
        matches = []
        nx, ny, nw, nh = cv2.boundingRect(numpy.concatenate(needle_contours, axis=0))
        required_distance = 1.0 - self.params["find"]["similarity"].value
        while True:
            matching_haystack_contours = []
            matching_haystack_distances = numpy.zeros(len(needle_contours))
            for j in range(len(needle_contours)):
                matching_haystack_distances[j] = numpy.min(distances[:, j])
                index = numpy.where(distances[:, j] == matching_haystack_distances[j])
                # we don't allow collapsing into the same needle contour, i.e.
                # the map from the needle to the haystack contours is injective
                # -> so here we cross the entire row rather than one value in it
                distances[index[0][0], :] = 1.1  # like this works even for similarity 0.0
                matching_haystack_contours.append(haystack_contours[index[0][0]])
            average_distance = numpy.average(matching_haystack_distances)
            log.debug("Average distance to next needle shape is %s of max allowed %s",
                      average_distance, required_distance)
            if average_distance > required_distance:
                break

            shape = numpy.concatenate(matching_haystack_contours, axis=0)
            x, y, w, h = cv2.boundingRect(shape)
            # calculate needle upleft and downright points to return its (0,0) location
            needle_upleft = (max(int((x-nx)*float(w)/nw), 0), max(int((y-ny)*float(h)/nh), 0))
            needle_downright = (min(int(needle_upleft[0]+needle.width*float(w)/nw), haystack.width),
                                min(int(needle_upleft[1]+needle.height*float(h)/nh), haystack.height))
            needle_center_offset = (needle.center_offset.x*float(w)/nw,
                                    needle.center_offset.y*float(h)/nh)
            cv2.rectangle(self.imglog.hotmaps[-1], needle_upleft, needle_downright, (0, 0, 0), 2)
            cv2.rectangle(self.imglog.hotmaps[-1], needle_upleft, needle_downright, (255, 255, 255), 1)
            # NOTE: to extract the region of interest just do:
            # roi = thresh_haystack[y:y+h,x:x+w]
            similarity = 1.0 - average_distance
            self.imglog.similarities.append(similarity)
            self.imglog.locations.append(needle_upleft)
            matches.append(Match(needle_upleft[0], needle_upleft[1],
                                 needle_downright[0] - needle_upleft[0],
                                 needle_downright[1] - needle_upleft[1],
                                 needle_center_offset[0], needle_center_offset[1],
                                 similarity))

        self.imglog.log(30)
        return matches

    def _binarize_image(self, image, log=False):
        # Convert an RGB image array into a thresholded one in two stages
        # (optional blur, then thresholding); the result is also appended to
        # the image logger hotmaps if `log` is set.
        # NOTE: the `log` argument shadows the module logger in this method.
        import cv2
        settings = self.params["threshold"]

        # blur first in order to avoid unwanted edges caused from noise
        blurSize = settings["blurKernelSize"].value
        blurDeviation = settings["blurKernelSigma"].value
        gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        blur_type = settings["blurType"].value
        if blur_type == 1:
            blur_image = cv2.blur(gray_image, (blurSize, blurSize))
        elif blur_type == 2:
            blur_image = cv2.medianBlur(gray_image, blurSize)
        elif blur_type == 3:
            blur_image = cv2.GaussianBlur(gray_image, (blurSize, blurSize), blurDeviation)
        elif blur_type == 4:
            blur_image = gray_image
        else:
            raise ValueError("Unsupported blur type '%s' (expected 1 to 4)" % blur_type)

        # second stage: thresholding
        if settings["backend"] == "normal":
            _, thresh_image = cv2.threshold(blur_image,
                                            settings["thresholdValue"].value,
                                            settings["thresholdMax"].value,
                                            settings["thresholdType"].value)
        elif settings["backend"] == "adaptive":
            thresh_image = cv2.adaptiveThreshold(blur_image,
                                                 settings["thresholdMax"].value,
                                                 settings["adaptiveMethod"].value,
                                                 settings["thresholdType"].value,
                                                 settings["blockSize"].value,
                                                 settings["constant"].value)
        elif settings["backend"] == "canny":
            thresh_image = cv2.Canny(blur_image,
                                     settings["threshold1"].value,
                                     settings["threshold2"].value)
        else:
            raise UnsupportedBackendError("Backend '%s' is not among the supported ones: "
                                          "%s" % (settings["backend"],
                                                  self.algorithms["threshold_filters"]))

        if log:
            self.imglog.hotmaps.append(thresh_image)
        return thresh_image

    def _extract_contours(self, countours_image, log=False):
        # Find the contours of a thresholded image and return their polygon
        # approximations; if `log` is set they are drawn onto the passed image
        # which is then appended to the image logger hotmaps.
        # NOTE: the `log` argument shadows the module logger in this method.
        import cv2
        rargs = cv2.findContours(countours_image,
                                 self.params["contour"]["retrievalMode"].value,
                                 self.params["contour"]["approxMethod"].value)
        # the call returns either three or two values, the contours and
        # their hierarchy being the last two in both cases
        if len(rargs) == 3:
            _, contours, hierarchy = rargs
        else:
            contours, hierarchy = rargs
        image_contours = [cv2.approxPolyDP(cnt, 3, True) for cnt in contours]
        if log:
            cv2.drawContours(countours_image, image_contours, -1, (255, 255, 255))
            self.imglog.hotmaps.append(countours_image)
        return image_contours

    def log(self, lvl):
        """
        Custom implementation of the base method.

        See base method for details.

        The first two hotmaps are dumped as the threshold and contour stages
        and the last one as the final result.
        """
        # below selected logging level
        if lvl < self.imglog.logging_level:
            self.imglog.clear()
            return
        # logging is being collected for a specific logtype
        elif ImageLogger.accumulate_logging:
            return
        # no hotmaps to log
        elif len(self.imglog.hotmaps) == 0:
            raise MissingHotmapError("No matching was performed in order to be image logged")

        self.imglog.dump_hotmap("imglog%s-3hotmap-1threshold.png" % self.imglog.printable_step,
                                self.imglog.hotmaps[0])
        self.imglog.dump_hotmap("imglog%s-3hotmap-2contours.png" % self.imglog.printable_step,
                                self.imglog.hotmaps[1])

        similarity = self.imglog.similarities[-1] if len(self.imglog.similarities) > 0 else 0.0
        self.imglog.dump_hotmap("imglog%s-3hotmap-%s.png" % (self.imglog.printable_step, similarity),
                                self.imglog.hotmaps[-1])

        self.imglog.clear()
        ImageLogger.step += 1
[docs]class TemplateFinder(Finder):
"""Template matching backend provided by OpenCV."""
def __init__(self, configure=True, synchronize=True):
    """
    Build a CV backend using OpenCV's template matching.

    :param bool configure: whether to also generate the default configuration
    :param bool synchronize: accepted for a uniform interface, not used here
    """
    super(TemplateFinder, self).__init__(configure=False, synchronize=False)

    # available and currently fully compatible methods
    # (only the normalized versions of "sqdiff", "ccorr", and "ccoeff" are used)
    normalized_matchers = ("sqdiff_normed", "ccorr_normed", "ccoeff_normed")
    self.categories["template"] = "template_matchers"
    self.algorithms["template_matchers"] = normalized_matchers

    # additional preparation (no synchronization available)
    if not configure:
        return
    self.__configure_backend(reset=True)
def __configure_backend(self, backend=None, category="template", reset=False):
    """
    Custom implementation of the base method.

    See base method for details.
    """
    if category != "template":
        raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
    if reset:
        super(TemplateFinder, self).configure_backend("template", reset=True)

    # fall back to the globally configured template matcher
    selected = GlobalConfig.template_match_backend if backend is None else backend
    supported = self.algorithms[self.categories[category]]
    if selected not in supported:
        raise UnsupportedBackendError("Backend '%s' is not among the supported ones: "
                                      "%s" % (selected, supported))

    log.log(9, "Setting backend for %s to %s", category, selected)
    self.params[category] = {}
    self.params[category]["backend"] = selected
    self.params[category]["nocolor"] = CVParameter(False)
    log.log(9, "%s %s\n", category, self.params[category])
def find(self, needle, haystack):
    """
    Custom implementation of the base method.

    :param needle: target image to search for
    :type needle: :py:class:`Image`
    :param haystack: image to search in (its `pil_image`, `width` and
                     `height` attributes are used)
    :returns: acceptable matches in order of decreasing similarity
    :rtype: list
    :raises: :py:class:`UnsupportedBackendError` if the choice of template
             matches is not among the supported ones

    See base method for details.

    The maxima of the template matching result are extracted one by one
    and the surroundings of each accepted maximum are wiped so that the
    next maximum is at a safe distance from the previous ones. If the
    required similarity is 0.0, only the single best match is returned.
    """
    needle.match_settings = self
    needle.use_own_settings = True
    self.imglog.needle = needle
    self.imglog.haystack = haystack
    self.imglog.dump_matched_images()

    match_template = self.params["template"]["backend"]
    if match_template not in self.algorithms["template_matchers"]:
        raise UnsupportedBackendError("Backend '%s' is not among the supported ones: "
                                      "%s" % (match_template,
                                              self.algorithms["template_matchers"]))
    no_color = self.params["template"]["nocolor"].value
    log.debug("Performing %s template matching %s color",
              match_template, "without" if no_color else "with")
    result = self._match_template(needle, haystack, no_color, match_template)
    if result is None:
        log.warning("OpenCV's template matching returned no result")
        return []

    # switch max and min for sqdiff and sqdiff_normed (to always look for max);
    # an explicit tuple is used since `in` on a bare string is a substring test
    if match_template in ("sqdiff", "sqdiff_normed"):
        result = 1.0 - result

    import cv2
    import numpy
    universal_hotmap = result * 255.0
    final_hotmap = numpy.array(self.imglog.haystack.pil_image)
    if no_color:
        final_hotmap = cv2.cvtColor(final_hotmap, cv2.COLOR_RGB2GRAY)

    # extract maxima once for each needle size region
    similarity = self.params["find"]["similarity"].value
    from .match import Match
    matches = []
    while True:

        _, maxVal, _, maxLoc = cv2.minMaxLoc(result)
        # rectify to the [0,1] interval to avoid negative values in some methods
        maxVal = min(max(maxVal, 0.0), 1.0)
        log.debug('Next best match with value %s (similarity %s) and location (x,y) %s',
                  str(maxVal), similarity, str(maxLoc))

        if maxVal < similarity:
            if len(matches) == 0:
                # nothing was accepted at all - log the best rejected candidate
                self.imglog.similarities.append(maxVal)
                self.imglog.locations.append(maxLoc)
                current_hotmap = numpy.copy(universal_hotmap)
                cv2.circle(current_hotmap, (maxLoc[0], maxLoc[1]), int(30*maxVal), (255, 255, 255))
                self.imglog.hotmaps.append(current_hotmap)
                self.imglog.hotmaps.append(final_hotmap)

            log.debug("Next best match is not acceptable")
            break

        self.imglog.similarities.append(maxVal)
        self.imglog.locations.append(maxLoc)
        current_hotmap = numpy.copy(universal_hotmap)
        cv2.circle(current_hotmap, (maxLoc[0], maxLoc[1]), int(30*maxVal), (255, 255, 255))
        x, y = maxLoc
        w, h = needle.width, needle.height
        dx, dy = needle.center_offset.x, needle.center_offset.y
        # black outline below a white one keeps the frame visible on any background
        cv2.rectangle(final_hotmap, (x, y), (x+w, y+h), (0, 0, 0), 2)
        cv2.rectangle(final_hotmap, (x, y), (x+w, y+h), (255, 255, 255), 1)
        self.imglog.hotmaps.append(current_hotmap)
        log.debug("Next best match is acceptable")
        matches.append(Match(x, y, w, h, dx, dy, maxVal))
        if similarity == 0.0:
            # return just one match if no similarity requirement
            break

        res_w = haystack.width - needle.width + 1
        res_h = haystack.height - needle.height + 1
        # wipe at least the maximum itself: for a needle that is one pixel
        # wide or high the half size truncates to 0, the wiped slice would be
        # empty and the same maximum would be extracted forever
        half_w = max(int(0.5 * needle.width), 1)
        half_h = max(int(0.5 * needle.height), 1)
        match_x0 = max(maxLoc[0] - half_w, 0)
        match_x1 = min(maxLoc[0] + half_w, res_w)
        match_y0 = max(maxLoc[1] - half_h, 0)
        match_y1 = min(maxLoc[1] + half_h, res_h)

        # log this only if performing deep internal debugging
        log.log(9, "Wipe image matches in x [%s, %s]/[%s, %s]",
                match_x0, match_x1, 0, res_w)
        log.log(9, "Wipe image matches in y [%s, %s]/[%s, %s]",
                match_y0, match_y1, 0, res_h)

        # clean found image to look for next safe distance match
        result[match_y0:match_y1, match_x0:match_x1] = 0.0

        log.log(9, "Total maxima up to the point are %i", len(matches))

    log.debug("A total of %i matches found", len(matches))
    self.imglog.hotmaps.append(final_hotmap)
    self.imglog.log(30)

    return matches
def _match_template(self, needle, haystack, nocolor, method):
    """
    EXTRA DOCSTRING: Template matching backend - wrapper.

    Run one of the OpenCV template matching methods for a needle image
    over a haystack image, either in color or converted to grayscale.

    The result of `cv2.matchTemplate` is returned, or None if the needle
    is larger than the haystack in any dimension. An unknown method name
    raises :py:class:`UnsupportedBackendError`.
    """
    # sanity check: needle size must be smaller than haystack
    if needle.width > haystack.width or needle.height > haystack.height:
        log.warning("The size of the searched image (%sx%s) does not fit the search region (%sx%s)",
                    needle.width, needle.height, haystack.width, haystack.height)
        return None

    import cv2
    import numpy
    cv_methods = {"sqdiff": cv2.TM_SQDIFF, "sqdiff_normed": cv2.TM_SQDIFF_NORMED,
                  "ccorr": cv2.TM_CCORR, "ccorr_normed": cv2.TM_CCORR_NORMED,
                  "ccoeff": cv2.TM_CCOEFF, "ccoeff_normed": cv2.TM_CCOEFF_NORMED}
    if method not in cv_methods:
        raise UnsupportedBackendError("Supported algorithms are in conflict")
    cv_method = cv_methods[method]

    needle_pixels = numpy.array(needle.pil_image)
    haystack_pixels = numpy.array(haystack.pil_image)
    if nocolor:
        # both images have to be reduced to a single channel
        needle_pixels = cv2.cvtColor(needle_pixels, cv2.COLOR_RGB2GRAY)
        haystack_pixels = cv2.cvtColor(haystack_pixels, cv2.COLOR_RGB2GRAY)

    return cv2.matchTemplate(haystack_pixels, needle_pixels, cv_method)
def log(self, lvl):
    """
    Custom implementation of the base method.

    See base method for details.

    One hotmap is dumped for each logged similarity, followed by the last
    collected hotmap named after the last similarity (0.0 if there is none).
    """
    imglog = self.imglog

    # below selected logging level
    if lvl < imglog.logging_level:
        imglog.clear()
        return
    # logging is being collected for a specific logtype
    if ImageLogger.accumulate_logging:
        return
    # no hotmaps to log
    if len(imglog.hotmaps) == 0:
        raise MissingHotmapError("No matching was performed in order to be image logged")

    for index, value in enumerate(imglog.similarities):
        name = "imglog%s-3hotmap-%stemplate-%s.png" % (imglog.printable_step,
                                                       index + 1, value)
        imglog.dump_hotmap(name, imglog.hotmaps[index])

    if len(imglog.similarities) > 0:
        similarity = imglog.similarities[-1]
    else:
        similarity = 0.0
    final_name = "imglog%s-3hotmap-%s.png" % (imglog.printable_step, similarity)
    imglog.dump_hotmap(final_name, imglog.hotmaps[-1])

    imglog.clear()
    ImageLogger.step += 1
[docs]class FeatureFinder(Finder):
"""
Feature matching backend provided by OpenCV.
.. note:: SURF and SIFT are proprietary algorithms and are not available
by default in newer OpenCV versions (>3.0).
"""
def __init__(self, configure=True, synchronize=True):
    """Build a CV backend using OpenCV's feature matching."""
    super(FeatureFinder, self).__init__(configure=False, synchronize=False)

    # backend categories handled by this finder and their algorithm groups
    for category, group in (("feature", "feature_projectors"),
                            ("fdetect", "feature_detectors"),
                            ("fextract", "feature_extractors"),
                            ("fmatch", "feature_matchers")):
        self.categories[category] = group

    # available and currently fully compatible methods
    self.algorithms["feature_projectors"] = ("mixed",)
    self.algorithms["feature_matchers"] = ("BruteForce", "BruteForce-L1",
                                           "BruteForce-Hamming",
                                           "BruteForce-Hamming(2)")
    self.algorithms["feature_detectors"] = ("ORB", "BRISK", "KAZE", "AKAZE", "MSER",
                                            "AgastFeatureDetector",
                                            "FastFeatureDetector",
                                            "GFTTDetector",
                                            "SimpleBlobDetector")
    # TODO: we could also support "StereoSGBM" but it needs initialization arguments
    # BUG: "KAZE", "AKAZE" we get internal error when using KAZE/AKAZE even though it should be possible
    self.algorithms["feature_extractors"] = ("ORB", "BRISK")

    # backend objects are only created during synchronization
    self.detector = None
    self.extractor = None
    self.matcher = None

    # additional preparation
    if configure:
        self.__configure(reset=True)
    if synchronize:
        self.__synchronize(reset=False)
def __configure_backend(self, backend=None, category="feature", reset=False):
    """
    Custom implementation of the base method.

    See base method for details.

    Besides the fixed parameters of each category, the created feature
    detector or extractor object is inspected for `get*` methods and each
    getter returning a bool, int, float or None becomes a parameter named
    after the getter without its `get` prefix.

    :raises: :py:class:`UnsupportedBackendError` if the category or the
             backend is not among the supported ones
    """
    if category not in ["feature", "fdetect", "fextract", "fmatch"]:
        raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
    if reset:
        super(FeatureFinder, self).configure_backend("feature", reset=True)
    if category == "feature" and backend is None:
        backend = "mixed"
    elif category == "fdetect" and backend is None:
        backend = GlobalConfig.feature_detect_backend
    elif category == "fextract" and backend is None:
        backend = GlobalConfig.feature_extract_backend
    elif category == "fmatch" and backend is None:
        backend = GlobalConfig.feature_match_backend
    if backend not in self.algorithms[self.categories[category]]:
        raise UnsupportedBackendError("Backend '%s' is not among the supported ones: "
                                      "%s" % (backend, self.algorithms[self.categories[category]]))

    log.log(9, "Setting backend for %s to %s", category, backend)
    self.params[category] = {}
    self.params[category]["backend"] = backend

    if category == "feature":
        # 0 for homography, 1 for fundamental matrix
        self.params[category]["projectionMethod"] = CVParameter(0, 0, 1, enumerated=True)
        self.params[category]["ransacReprojThreshold"] = CVParameter(0.0, 0.0, 200.0, 50.0)
        self.params[category]["minDetectedFeatures"] = CVParameter(4, 1, None)
        self.params[category]["minMatchedFeatures"] = CVParameter(4, 1, None)
        # 0 for matched/detected ratio, 1 for projected/matched ratio
        self.params[category]["similarityRatio"] = CVParameter(1, 0, 1, enumerated=True)
        # there is no OpenCV object to examine for this category
        return

    if category == "fmatch":
        if backend == "in-house-region":
            self.params[category]["refinements"] = CVParameter(50, 1, None)
            self.params[category]["recalc_interval"] = CVParameter(10, 1, None)
            self.params[category]["variants_k"] = CVParameter(100, 1, None)
            self.params[category]["variants_ratio"] = CVParameter(0.33, 0.0001, 1.0, 0.25)
            return
        self.params[category]["ratioThreshold"] = CVParameter(0.65, 0.0, 1.0, 0.25, 0.01)
        self.params[category]["ratioTest"] = CVParameter(False)
        self.params[category]["symmetryTest"] = CVParameter(False)
        # no other parameters are used for the in-house-raw matching
        if backend == "in-house-raw":
            return
        import cv2
        # NOTE: descriptor matcher creation is kept the old way while feature
        # detection and extraction not - example of the untidy maintenance of OpenCV
        cv2.DescriptorMatcher_create(backend)
        # BUG: a bug of OpenCV leads to crash if parameters
        # are extracted from the matcher interface although
        # the API supports it - skip fmatch for now
        return

    # only the feature detector and extractor categories remain here
    if category == "fdetect":
        self.params[category]["nzoom"] = CVParameter(1.0, 1.0, 10.0, 2.5)
        self.params[category]["hzoom"] = CVParameter(1.0, 1.0, 10.0, 2.5)
    import cv2
    backend_obj = getattr(cv2, "%s_create" % backend)()

    # extra CVParameter arguments (following the value) for some better
    # known parameters - all others are built from their value only
    known_params = {"FirstLevel": (0, None, 100, 25),
                    "MaxFeatures": (0, None, 100.0),
                    "WTA_K": (2, 4, 1.0),
                    "ScaleFactor": (1.01, 2.0, 0.25, 0.05),
                    "NLevels": (1, 100, 25, 0.5),
                    "PatchSize": (2, None, 100, 25)}

    # examine the interface of the OpenCV backend to add extra parameters
    log.log(9, "%s %s", backend_obj, dir(backend_obj))
    for attribute in dir(backend_obj):
        if not attribute.startswith("get"):
            continue
        # drop only the prefix - a plain replace would also remove
        # any further "get" occurrence inside of the parameter name
        param = attribute[len("get"):]
        val = getattr(backend_obj, attribute)()
        if type(val) not in [bool, int, float, type(None)]:
            continue
        self.params[category][param] = CVParameter(val, *known_params.get(param, ()))
        log.log(9, "%s=%s", param, val)
def __configure(self, feature_detect=None, feature_extract=None,
                feature_match=None, reset=True, **kwargs):
    """
    Configure the general feature category followed by the detection,
    extraction and matching stages, each with its optionally preselected
    backend. Additional keyword arguments are accepted but not used.
    """
    self.__configure_backend(category="feature", reset=reset)
    stages = ((feature_detect, "fdetect"),
              (feature_extract, "fextract"),
              (feature_match, "fmatch"))
    for stage_backend, stage_category in stages:
        self.__configure_backend(stage_backend, stage_category)
def __synchronize_backend(self, backend=None, category="feature", reset=False):
    """
    Custom implementation of the base method.

    See base method for details.

    A fresh OpenCV object is created for the configured backend of the
    category. For the feature detector and extractor, each configured
    parameter with a matching `get*`/`set*` pair on the object is passed
    to its setter before the object is stored as the current detector
    or extractor. The matcher is stored without any parameter syncing.

    :raises: :py:class:`UnsupportedBackendError` if the category is not supported
    :raises: :py:class:`UninitializedBackendError` if a backend different
             from the configured one is requested
    """
    if category not in ["feature", "fdetect", "fextract", "fmatch"]:
        raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
    if reset:
        super(FeatureFinder, self).synchronize_backend("feature", reset=True)
    if backend is not None and self.params[category]["backend"] != backend:
        raise UninitializedBackendError("Backend '%s' has not been configured yet" % backend)
    backend = self.params[category]["backend"]

    if category == "feature":
        # nothing to sync
        return

    import cv2
    if category == "fmatch":
        # NOTE: descriptor matcher creation is kept the old way while feature
        # detection and extraction not - example of the untidy maintenance of OpenCV
        # BUG: a bug of OpenCV leads to crash if parameters
        # are extracted from the matcher interface although
        # the API supports it - skip fmatch for now
        self.matcher = cv2.DescriptorMatcher_create(backend)
        return

    backend_obj = getattr(cv2, "%s_create" % backend)()
    for attribute in dir(backend_obj):
        if not attribute.startswith("get"):
            continue
        # swap only the prefix - a plain replace would also rewrite any
        # further "get" occurrence inside of the parameter name
        param = attribute[len("get"):]
        if param not in self.params[category]:
            continue
        # some getters might not have corresponding setters
        set_param = getattr(backend_obj, "set" + param, None)
        if set_param is None:
            continue
        val = self.params[category][param].value
        set_param(val)
        log.log(9, "Synced %s to %s", param, val)

    if category == "fdetect":
        self.detector = backend_obj
    else:
        self.extractor = backend_obj
def synchronize_backend(self, backend=None, category="feature", reset=False):
    """
    Custom implementation of the base method.

    See base method for details.
    """
    # public entry point delegating to the private implementation
    self.__synchronize_backend(backend=backend, category=category, reset=reset)
def __synchronize(self, feature_detect=None, feature_extract=None,
                  feature_match=None, reset=True):
    """
    Synchronize the general feature category followed by the detection,
    extraction and matching stages, each with its optionally preselected
    backend.
    """
    self.__synchronize_backend(category="feature", reset=reset)
    stages = ((feature_detect, "fdetect"),
              (feature_extract, "fextract"),
              (feature_match, "fmatch"))
    for stage_backend, stage_category in stages:
        self.__synchronize_backend(stage_backend, stage_category)
def synchronize(self, feature_detect=None, feature_extract=None,
                feature_match=None, reset=True):
    """
    Custom implementation of the base method.

    :param feature_detect: name of a preselected backend
    :type feature_detect: str or None
    :param feature_extract: name of a preselected backend
    :type feature_extract: str or None
    :param feature_match: name of a preselected backend
    :type feature_match: str or None
    :param bool reset: passed on to the synchronization of the general
                       feature category
    """
    # public entry point delegating to the private implementation
    self.__synchronize(feature_detect=feature_detect,
                       feature_extract=feature_extract,
                       feature_match=feature_match,
                       reset=reset)
def find(self, needle, haystack):
    """
    Custom implementation of the base method.

    :param needle: target image to search for
    :type needle: :py:class:`Image`

    See base method for details.

    .. warning:: Finding multiple matches is currently not supported
        and this will currently only return a single match.

    Available methods are: a combination of feature detector,
    extractor, and matcher.
    """
    needle.match_settings = self
    needle.use_own_settings = True
    self.imglog.needle = needle
    self.imglog.haystack = haystack
    self.imglog.dump_matched_images()

    import cv2
    import numpy
    ngray = cv2.cvtColor(numpy.array(needle.pil_image), cv2.COLOR_RGB2GRAY)
    hgray = cv2.cvtColor(numpy.array(haystack.pil_image), cv2.COLOR_RGB2GRAY)
    # a separate canvas for each of the detect, match, project and final hotmaps
    for _ in range(4):
        self.imglog.hotmaps.append(numpy.array(haystack.pil_image))

    # project more points for debugging purposes and image logging:
    # the four corners of the needle followed by its center
    needle_points = [(0, 0), (needle.width, 0), (0, needle.height),
                     (needle.width, needle.height),
                     (needle.width / 2, needle.height / 2)]

    required_similarity = self.params["find"]["similarity"].value
    projected = self._project_features(needle_points, ngray, hgray, required_similarity)

    if projected is None or len(projected) == 0:
        self.imglog.log(40)
        return []

    from .match import Match
    x, y = projected[0]
    # size from the projected upper left and lower right corners
    w, h = tuple(numpy.abs(numpy.subtract(projected[3], projected[0])))
    # TODO: projecting offset requires more effort
    found = [Match(x, y, w, h, 0, 0, self.imglog.similarities[-1])]
    self.imglog.log(30)
    return found
def _project_features(self, locations_in_needle, ngray, hgray, similarity):
    """
    EXTRA DOCSTRING: Feature matching backend - wrapper.

    Chain the detection, matching and projection stages and return the
    needle locations projected in the haystack, or None as soon as one
    of the stages does not satisfy its requirements.
    """
    # default logging in case no match is found (further overridden by match stages)
    self.imglog.locations.append((0, 0))
    self.imglog.similarities.append(0.0)

    detect_backend = self.params["fdetect"]["backend"]
    extract_backend = self.params["fextract"]["backend"]
    match_backend = self.params["fmatch"]["backend"]
    log.debug("Performing %s feature matching (no color)",
              "-".join([detect_backend, extract_backend, match_backend]))

    # stage 1: enough features have to be detected in both images
    nkp, ndc, hkp, hdc = self._detect_features(ngray, hgray,
                                               detect_backend, extract_backend)
    min_detected = self.params["feature"]["minDetectedFeatures"].value
    if len(nkp) < min_detected or len(hkp) < min_detected:
        log.debug("No acceptable best match after feature detection: "
                  "only %s\\%s needle and %s\\%s haystack features detected",
                  len(nkp), min_detected, len(hkp), min_detected)
        return None

    # stage 2: enough features have to be matched with enough similarity
    mnkp, mhkp = self._match_features(nkp, ndc, hkp, hdc, match_backend)
    min_matched = self.params["feature"]["minMatchedFeatures"].value
    if self.imglog.similarities[-1] < similarity or len(mnkp) < min_matched:
        log.debug("No acceptable best match after feature matching:\n"
                  "- matched features %s of %s required\n"
                  "- best match similarity %s of %s required",
                  len(mnkp), min_matched,
                  self.imglog.similarities[-1], similarity)
        return None

    # stage 3: the similarity may have been overridden by the projection
    locations_in_haystack = self._project_locations(locations_in_needle, mnkp, mhkp)
    if self.imglog.similarities[-1] < similarity:
        log.debug("No acceptable best match after RANSAC projection: "
                  "best match similarity %s is less than required %s",
                  self.imglog.similarities[-1], similarity)
        return None

    self._log_features(30, self.imglog.locations, self.imglog.hotmaps[-1], 3, 0, 0, 255)
    return locations_in_haystack
def _detect_features(self, ngray, hgray, detect, extract):
    """
    EXTRA DOCSTRING: Feature matching backend - detection/extraction stage (1).

    Detect all keypoints and calculate their respective descriptors.

    Both images are optionally zoomed in first and the coordinates of
    the keypoints are then reduced back to the original image sizes.
    The return value is a tuple of the needle keypoints, the needle
    descriptors, the haystack keypoints and the haystack descriptors.

    :raises: :py:class:`UnsupportedBackendError` if the detector or the
             extractor is not among the supported ones
    """
    nfactor = self.params["fdetect"]["nzoom"].value
    hfactor = self.params["fdetect"]["hzoom"].value

    # zoom in if explicitly set
    import cv2
    if nfactor > 1.0:
        log.debug("Zooming x%i needle", nfactor)
        ngray = cv2.resize(ngray, None, fx=nfactor, fy=nfactor)
    if hfactor > 1.0:
        log.debug("Zooming x%i haystack", hfactor)
        hgray = cv2.resize(hgray, None, fx=hfactor, fy=hfactor)

    # include only methods tested for compatibility and
    # report the stage that is actually not supported
    if detect not in self.algorithms["feature_detectors"]:
        raise UnsupportedBackendError("Feature detector %s is not among the supported "
                                      "ones %s" % (detect, self.algorithms[self.categories["fdetect"]]))
    if extract not in self.algorithms["feature_extractors"]:
        raise UnsupportedBackendError("Feature extractor %s is not among the supported "
                                      "ones %s" % (extract, self.algorithms[self.categories["fextract"]]))

    self.synchronize_backend(category="fdetect")
    self.synchronize_backend(category="fextract")

    # keypoints
    nkeypoints = self.detector.detect(ngray)
    hkeypoints = self.detector.detect(hgray)

    # feature vectors (descriptors)
    (nkeypoints, ndescriptors) = self.extractor.compute(ngray, nkeypoints)
    (hkeypoints, hdescriptors) = self.extractor.compute(hgray, hkeypoints)

    # reduce keypoint coordinates to the original image size
    for nkeypoint in nkeypoints:
        nkeypoint.pt = (int(nkeypoint.pt[0] / nfactor),
                        int(nkeypoint.pt[1] / nfactor))
    for hkeypoint in hkeypoints:
        hkeypoint.pt = (int(hkeypoint.pt[0] / hfactor),
                        int(hkeypoint.pt[1] / hfactor))

    log.debug("Detected %s keypoints in needle and %s in haystack",
              len(nkeypoints), len(hkeypoints))
    hkp_locations = [hkp.pt for hkp in hkeypoints]
    self._log_features(10, hkp_locations, self.imglog.hotmaps[-4], 3, 255, 0, 0)

    return (nkeypoints, ndescriptors, hkeypoints, hdescriptors)
def _match_features(self, nkeypoints, ndescriptors,
                    hkeypoints, hdescriptors, match):
    """
    EXTRA DOCSTRING: Feature matching backend - matching stage (2).

    Match two sets of keypoints based on their descriptors.

    The matches are optionally filtered through a ratio and a symmetry
    test and sorted by distance. The ratio of matched to detected needle
    keypoints is stored as the current similarity and a tuple of the
    matched needle keypoints and the matched haystack keypoints is returned.

    :raises: :py:class:`UnsupportedBackendError` if the matcher is not
             among the supported ones
    """
    def ratio_test(matches):
        """
        The ratio test checks the first and second best match. If their
        ratio is close to 1.0, there are both good candidates for the
        match and the probability of error when choosing one is greater.
        Therefore these matches are ignored and thus only matches of
        greater probability are returned.
        """
        threshold = self.params["fmatch"]["ratioThreshold"].value
        matches2 = []
        for m in matches:
            if len(m) > 1:
                # smooth to make 0/0 case also defined as 1.0
                smooth_dist1 = m[0].distance + 0.0000001
                smooth_dist2 = m[1].distance + 0.0000001
                if smooth_dist1 / smooth_dist2 < threshold:
                    matches2.append(m[0])
            elif len(m) == 1:
                # a single candidate cannot be compared - keep it
                matches2.append(m[0])
            # no candidates at all - nothing to keep for this keypoint

        log.log(9, "Ratio test result is %i/%i", len(matches2), len(matches))
        return matches2

    def symmetry_test(nmatches, hmatches):
        """
        Refines the matches with a symmetry test which extracts
        only the matches in agreement with both the haystack and needle
        sets of keypoints. The two keypoints must be best feature
        matching of each other to ensure the error by accepting the
        match is not too large.
        """
        import cv2
        # haystack-to-needle matches as (needle index, haystack index) pairs
        # so that each needle-to-haystack match is confirmed with one lookup
        confirmed = {(hm.trainIdx, hm.queryIdx) for hm in hmatches}
        matches2 = [cv2.DMatch(nm.queryIdx, nm.trainIdx, nm.distance)
                    for nm in nmatches
                    if (nm.queryIdx, nm.trainIdx) in confirmed]

        log.log(9, "Symmetry test result is %i/%i", len(matches2), len(nmatches))
        return matches2

    # include only methods tested for compatibility
    if match in self.algorithms["feature_matchers"]:
        # build matcher and match feature vectors
        self.synchronize_backend(category="fmatch")
    else:
        raise UnsupportedBackendError("Feature matcher %s is not among the supported "
                                      "ones %s" % (match, self.algorithms[self.categories["fmatch"]]))

    # find and filter matches through tests
    if match == "in-house-region":
        matches = self.matcher.regionMatch(ndescriptors, hdescriptors,
                                           nkeypoints, hkeypoints,
                                           self.params["fmatch"]["refinements"].value,
                                           self.params["fmatch"]["recalc_interval"].value,
                                           self.params["fmatch"]["variants_k"].value,
                                           self.params["fmatch"]["variants_ratio"].value)
    else:
        use_ratio_test = self.params["fmatch"]["ratioTest"].value
        if use_ratio_test:
            matches = self.matcher.knnMatch(ndescriptors, hdescriptors, 2)
            matches = ratio_test(matches)
        else:
            matches = self.matcher.knnMatch(ndescriptors, hdescriptors, 1)
            # skip keypoints left without any candidate
            matches = [m[0] for m in matches if len(m) > 0]
        if self.params["fmatch"]["symmetryTest"].value:
            if use_ratio_test:
                hmatches = self.matcher.knnMatch(hdescriptors, ndescriptors, 2)
                hmatches = ratio_test(hmatches)
            else:
                hmatches = self.matcher.knnMatch(hdescriptors, ndescriptors, 1)
                hmatches = [hm[0] for hm in hmatches if len(hm) > 0]

            matches = symmetry_test(matches, hmatches)

    # prepare final matches
    match_nkeypoints = []
    match_hkeypoints = []
    matches = sorted(matches, key=lambda x: x.distance)
    for best_match in matches:
        log.log(9, "%s", best_match.distance)
        match_nkeypoints.append(nkeypoints[best_match.queryIdx])
        match_hkeypoints.append(hkeypoints[best_match.trainIdx])

    # these matches are half the way to being good
    mhkp_locations = [mhkp.pt for mhkp in match_hkeypoints]
    self._log_features(10, mhkp_locations, self.imglog.hotmaps[-3], 2, 255, 255, 0)

    # no needle keypoints means that nothing could have been matched
    if len(nkeypoints) > 0:
        match_similarity = float(len(match_nkeypoints)) / float(len(nkeypoints))
    else:
        match_similarity = 0.0
    # update the current achieved similarity if matching similarity is used:
    # won't be updated anymore if self.params["feature"]["similarityRatio"].value == 0
    self.imglog.similarities[-1] = match_similarity
    log.log(9, "%s\\%s -> %f", len(match_nkeypoints),
            len(nkeypoints), match_similarity)

    return (match_nkeypoints, match_hkeypoints)
def _project_locations(self, locations_in_needle, mnkp, mhkp):
    """
    EXTRA DOCSTRING: Feature matching backend - projecting stage (3).

    Calculate the projection of points from the needle in the
    haystack using random sample consensus and the matched
    keypoints between the needle and the haystack.

    In particular, take the locations in the needle as (x,y) tuples
    for each point, the matched needle keypoints, and the matched
    haystack keypoints and return a list of (x,y) tuples of the
    respective locations in the haystack. Also, set the final
    similarity and returned location in the hotmap.

    .. warning:: The returned location is always the projected
        point at (0,0) needle coordinates as in template matching,
        i.e. the upper left corner of the image. In case of wild
        transformations of the needle in the haystack this has to
        be reconsidered and the needle center becomes obligatory.
    """
    # check matches consistency
    assert len(mnkp) == len(mhkp)

    import cv2
    import numpy

    # homography and fundamental matrix as options - homography is considered only
    # for rotation but currently gives better results than the fundamental matrix
    method = self.params["feature"]["projectionMethod"].value
    if method not in (0, 1):
        raise ValueError("Unsupported projection method - use 0 for homography and "
                         "1 for fundamentlal matrix")
    needle_points = numpy.array([kp.pt for kp in mnkp])
    haystack_points = numpy.array([kp.pt for kp in mhkp])
    if method == 0:
        H, mask = cv2.findHomography(needle_points, haystack_points, cv2.RANSAC,
                                     self.params["feature"]["ransacReprojThreshold"].value)
    else:
        H, mask = cv2.findFundamentalMat(needle_points, haystack_points,
                                         method=cv2.RANSAC, param1=10.0,
                                         param2=0.9)

    # measure total used features for the projected focus point
    if H is None or mask is None:
        log.log(30, "Homography error occurred during feature matching")
        self.imglog.similarities[-1] = 0.0
        return []

    # true matches are also inliers for the homography
    true_matches = [kp for i, kp in enumerate(mhkp) if mask[i][0] == 1]
    tmhkp_locations = [tmhkp.pt for tmhkp in true_matches]
    self._log_features(20, tmhkp_locations, self.imglog.hotmaps[-2], 1, 0, 255, 0)

    # calculate and project all point coordinates in the needle
    projected = []
    for location in locations_in_needle:
        source_point = numpy.array([[[location[0], location[1]]]], dtype=numpy.float32)
        log.log(9, "%s %s", source_point.shape, H.shape)
        target_point = cv2.perspectiveTransform(source_point, H)
        mx, my = target_point[0][0][0], target_point[0][0][1]
        projected.append((int(mx), int(my)))

    ransac_similarity = float(len(true_matches)) / float(len(mnkp))
    if self.params["feature"]["similarityRatio"].value == 1:
        # override the match similarity if projection-based similarity is preferred
        self.imglog.similarities[-1] = ransac_similarity
    log.log(9, "%s\\%s -> %f", len(true_matches), len(mnkp), ransac_similarity)

    self.imglog.locations.extend(projected)
    return projected
def log(self, lvl):
    """
    Custom implementation of the base method.

    See base method for details.

    The hotmaps of the detect and match stages are only dumped for a
    logging level of at most 10, the one of the project stage for a
    logging level of at most 20, and the final hotmap is always dumped.
    """
    imglog = self.imglog

    # below selected logging level
    if lvl < imglog.logging_level:
        imglog.clear()
        return
    # logging is being collected for a specific logtype
    if ImageLogger.accumulate_logging:
        return
    # no hotmaps to log
    if len(imglog.hotmaps) == 0:
        raise MissingHotmapError("No matching was performed in order to be image logged")

    # each stage with the highest logging level it is still dumped at
    # (the final unnamed stage has no such limit)
    stage_limits = (("detect", 10), ("match", 10), ("project", 20), ("", None))
    for index, (stage, limit) in enumerate(stage_limits):
        if limit is not None and imglog.logging_level > limit:
            continue
        if stage:
            name = "imglog%s-3hotmap-%s%s.png" % (imglog.printable_step,
                                                  index + 1, stage)
        else:
            name = "imglog%s-3hotmap-%s.png" % (imglog.printable_step,
                                                imglog.similarities[-1])
        imglog.dump_hotmap(name, imglog.hotmaps[index])

    imglog.clear()
    ImageLogger.step += 1
def _log_features(self, lvl, locations, hotmap, radius=0, r=255, g=255, b=255):
    """
    Draw a circle of the given radius and color on the hotmap for each
    of the (x,y) locations, unless the level is below the logging level
    of the image logger.
    """
    if lvl < self.imglog.logging_level:
        return
    import cv2
    color = (r, g, b)
    for x, y in locations:
        cv2.circle(hotmap, (int(x), int(y)), radius, color)
[docs]class CascadeFinder(Finder):
"""
Cascade matching backend provided by OpenCV.
This matcher uses Haar cascade for object detection.
It is the most advanced method for object detection
excluding convolutional neural networks. However, it
requires the generation of a Haar cascade (if such is
not already provided) of the needle to be found.
TODO: Currently no similarity requirement can be applied
due to the cascade classifier API.
"""
def __init__(self, classifier_datapath=".", configure=True, synchronize=True):
    """Build a CV backend using OpenCV's cascade matching options."""
    super(CascadeFinder, self).__init__(configure=False, synchronize=False)

    # additional preparation (no synchronization available)
    if not configure:
        return
    self.__configure_backend(reset=True)
def __configure_backend(self, backend=None, category="cascade", reset=False):
    """
    Custom implementation of the base method.

    See base method for details.
    """
    if category != "cascade":
        raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
    if reset:
        super(CascadeFinder, self).configure_backend("cascade", reset=True)

    # the backend name is fixed regardless of the requested one
    settings = {"backend": "none"}
    settings["scaleFactor"] = CVParameter(1.1, 0.0, None, 0.1)
    settings["minNeighbors"] = CVParameter(3, 0, None, 1.0)
    # size limits of the detected objects
    settings["minWidth"] = CVParameter(0, 0, None, 100.0)
    settings["maxWidth"] = CVParameter(1000, 0, None, 100.0)
    settings["minHeight"] = CVParameter(0, 0, None, 100.0)
    settings["maxHeight"] = CVParameter(1000, 0, None, 100.0)
    self.params[category] = settings
def find(self, needle, haystack):
    """
    Custom implementation of the base method.

    :param needle: target pattern (cascade) to search for
    :type needle: :py:class:`Pattern`

    See base method for details.
    """
    needle.match_settings = self
    needle.use_own_settings = True
    self.imglog.needle = needle
    self.imglog.haystack = haystack
    self.imglog.dump_matched_images()

    import cv2
    import numpy
    needle_cascade = cv2.CascadeClassifier(needle.data_file)
    if needle_cascade.empty():
        raise Exception("Could not load the cascade classifier properly")
    gray_haystack = cv2.cvtColor(numpy.array(haystack.pil_image), cv2.COLOR_RGB2GRAY)
    canvas = numpy.array(haystack.pil_image)

    from .match import Match
    cascade_params = self.params["cascade"]
    min_size = (cascade_params["minWidth"].value, cascade_params["minHeight"].value)
    max_size = (cascade_params["maxWidth"].value, cascade_params["maxHeight"].value)
    rects = needle_cascade.detectMultiScale(gray_haystack,
                                            cascade_params["scaleFactor"].value,
                                            cascade_params["minNeighbors"].value,
                                            0, min_size, max_size)

    matches = []
    for (x, y, w, h) in rects:
        # black outline below a colored one keeps the frame visible on any background
        cv2.rectangle(canvas, (x, y), (x+w, y+h), (0, 0, 0), 2)
        cv2.rectangle(canvas, (x, y), (x+w, y+h), (255, 0, 0), 1)
        dx, dy = needle.center_offset.x, needle.center_offset.y
        matches.append(Match(x, y, w, h, dx, dy))

    # the required similarity is logged as the achieved one
    self.imglog.similarities.append(self.params["find"]["similarity"].value)
    self.imglog.locations = [(found.x, found.y) for found in matches]
    self.imglog.hotmaps.append(canvas)
    self.imglog.log(30)

    return matches
[docs]class TextFinder(ContourFinder):
"""
Text matching backend provided by OpenCV.
This matcher will find a text (string) needle in the haystack,
eventually relying on Tesseract or simpler kNN-based OCR,
using extremal regions or contours before recognition, and
returning a match if the string is among the recognized strings
using string metric similar to Hamming distance.
Extremal Region Filter algorithm described in:
Neumann L., Matas J.: Real-Time Scene Text Localization and Recognition, CVPR 2012
"""
def __init__(self, configure=True, synchronize=True):
    """Build a CV backend using OpenCV's text matching options."""
    super(TextFinder, self).__init__(configure=False, synchronize=False)

    # backend categories handled by this finder and their algorithm groups
    for category, group in (("text", "text_matchers"),
                            ("tdetect", "text_detectors"),
                            ("ocr", "text_recognizers"),
                            ("threshold2", "threshold_filters2"),
                            ("threshold3", "threshold_filters3")):
        self.categories[category] = group

    # available and currently fully compatible methods
    self.algorithms["text_matchers"] = ("mixed",)
    self.algorithms["text_detectors"] = ("east", "erstat", "contours", "components")
    self.algorithms["text_recognizers"] = ("pytesseract", "tesserocr", "tesseract",
                                           "hmm", "beamSearch")
    # the extra threshold stages offer the same filters as the first one
    self.algorithms["threshold_filters2"] = tuple(self.algorithms["threshold_filters"])
    self.algorithms["threshold_filters3"] = tuple(self.algorithms["threshold_filters"])

    # other attributes
    self.erc1 = self.erf1 = None
    self.erc2 = self.erf2 = None
    self.ocr = None

    # additional preparation
    if configure:
        self.__configure(reset=True)
    if synchronize:
        self.__synchronize(reset=False)
def __configure_backend(self, backend=None, category="text", reset=False):
    """
    Custom implementation of the base method.

    The contour and first threshold categories are delegated to the contour
    finder, the second and third threshold categories reuse the threshold
    configuration under their own name, and the text, text detection, and
    OCR categories get their parameters defined here.

    See base method for details.
    """
    known_categories = ("text", "tdetect", "ocr", "contour",
                        "threshold", "threshold2", "threshold3")
    if category not in known_categories:
        raise UnsupportedBackendError("Backend category '%s' is not supported" % category)

    if category in ("contour", "threshold"):
        ContourFinder.configure_backend(self, backend, category, reset)
        return

    if category in ("threshold2", "threshold3"):
        # simply duplicate the first threshold stage configuration: configure
        # a regular threshold, store it under the requested category, and
        # then put back whatever the first stage contained before
        previous = self.params.get("threshold", None)
        ContourFinder.configure_backend(self, backend, "threshold", reset)
        self.params[category] = self.params["threshold"]
        if previous is not None:
            self.params["threshold"] = previous
        else:
            del self.params["threshold"]
        return

    if reset:
        Finder.configure_backend(self, "text", reset=True)

    # fall back to the default backend of each category
    if backend is None:
        if category == "text":
            backend = "mixed"
        elif category == "tdetect":
            backend = GlobalConfig.text_detect_backend
        elif category == "ocr":
            backend = GlobalConfig.text_ocr_backend

    available = self.algorithms[self.categories[category]]
    if backend not in available:
        raise UnsupportedBackendError("Backend '%s' is not among the supported ones: "
                                      "%s" % (backend, available))

    log.log(9, "Setting backend for %s to %s", category, backend)
    self.params[category] = {}
    settings = self.params[category]
    settings["backend"] = backend

    if category == "text":
        settings["datapath"] = CVParameter("../misc")

    elif category == "tdetect":
        if backend == "east":
            # network input dimensions - must be divisible by 32, however currently only
            # 320x320 doesn't error out from the OpenCV implementation
            settings["input_res_x"] = CVParameter(320, 32, None, 32.0)
            settings["input_res_y"] = CVParameter(320, 32, None, 32.0)
            settings["min_box_confidence"] = CVParameter(0.8, 0.0, 1.0, 0.1)
        elif backend == "erstat":
            settings["thresholdDelta"] = CVParameter(1, 1, 255, 50.0)
            settings["minArea"] = CVParameter(0.00025, 0.0, 1.0, 0.25, 0.001)
            settings["maxArea"] = CVParameter(0.13, 0.0, 1.0, 0.25, 0.001)
            settings["minProbability"] = CVParameter(0.4, 0.0, 1.0, 0.25, 0.01)
            settings["nonMaxSuppression"] = CVParameter(True)
            settings["minProbabilityDiff"] = CVParameter(0.1, 0.0, 1.0, 0.25, 0.01)
            settings["minProbability2"] = CVParameter(0.3, 0.0, 1.0, 0.25, 0.01)
        elif backend == "contours":
            settings["maxArea"] = CVParameter(10000, 0, None, 1000.0, 10.0)
            settings["minWidth"] = CVParameter(1, 0, None, 100.0)
            settings["maxWidth"] = CVParameter(100, 0, None, 100.0)
            settings["minHeight"] = CVParameter(1, 0, None, 100.0)
            settings["maxHeight"] = CVParameter(100, 0, None, 100.0)
            settings["minAspectRatio"] = CVParameter(0.1, 0.0, None, 10.0)
            settings["maxAspectRatio"] = CVParameter(2.5, 0.0, None, 10.0)
            settings["horizontalSpacing"] = CVParameter(10, 0, None, 10.0)
            settings["verticalVariance"] = CVParameter(10, 0, None, 10.0)
            # 0 horizontal, 1 vertical
            settings["orientation"] = CVParameter(0, 0, 1, enumerated=True)
            settings["minChars"] = CVParameter(3, 0, None, 2.0)
        elif backend == "components":
            # with equal delta and tolerance we ensure that only one failure will be
            # allowed and no intermediary values between 4 and 8 will be selected
            settings["connectivity"] = CVParameter(4, 4, 8, 4.0, 4.0)

    elif category == "ocr":
        if backend in ("tesseract", "tesserocr", "pytesseract"):
            # eng, deu, etc. (ISO 639-3)
            settings["language"] = CVParameter("eng")
            settings["char_whitelist"] = CVParameter(" 0123456789abcdefghijklmnopqrst"
                                                     "uvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
            # 0 original tesseract only, 1 neural nets LSTM only, 2 both, 3 anything available
            settings["oem"] = CVParameter(3, 0, 3, enumerated=True)
            # 13 different page segmentation modes - see Tesseract API
            settings["psmode"] = CVParameter(3, 0, 13, enumerated=True)
            if backend == "pytesseract":
                settings["extra_configs"] = CVParameter("")
            # 0 OCR_LEVEL_WORD, 1 OCR_LEVEL_TEXT_LINE - pytesseract and tesserocr
            # are pinned to 1 (TODO: there could be a decent way to change
            # component modes for them)
            lowest_level = 0 if backend == "tesseract" else 1
            settings["component_level"] = CVParameter(1, lowest_level, 1, enumerated=True)
            # perform custom image thresholding if set to true or leave it to the OCR
            settings["binarize_text"] = CVParameter(False)
        elif backend == "hmm":
            # 1 NM 2 CNN as classifiers for hidden markov models (see OpenCV documentation)
            settings["classifier"] = CVParameter(1, 1, 2, enumerated=True)
            # 0 OCR_LEVEL_WORD
            settings["component_level"] = CVParameter(0, 0, 1, enumerated=True)
            # perform custom image thresholding if set to true or leave it to the OCR
            settings["binarize_text"] = CVParameter(True)
        else:
            # perform custom image thresholding if set to true or leave it to the OCR
            settings["binarize_text"] = CVParameter(True)

        # preprocessing parameters shared by all OCR backends
        settings["min_confidence"] = CVParameter(0, 0, 100, 25.0)
        # zoom factor for improved OCR processing due to higher resolution
        settings["zoom_factor"] = CVParameter(1.0, 1.0, 100.0, 25.0)
        # border size to wrap around text field to improve recognition rate
        settings["border_size"] = CVParameter(10, 0, 100, 25.0)
        # 0 erode, 1 dilate, 2 both, 3 none
        settings["erode_dilate"] = CVParameter(3, 0, 3, enumerated=True)
        # 0 MORPH_RECT, 1 MORPH_ELLIPSE, 2 MORPH_CROSS
        settings["ed_kernel_type"] = CVParameter(0, 0, 2, enumerated=True)
        settings["ed_kernel_width"] = CVParameter(1, 1, 1000, 250.0, 2.0)
        settings["ed_kernel_height"] = CVParameter(1, 1, 1000, 250.0, 2.0)
        # perform distance transform if true or not if false
        settings["distance_transform"] = CVParameter(False)
        # 1 CV_DIST_L1, 2 CV_DIST_L2, 3 CV_DIST_C
        settings["dt_distance_type"] = CVParameter(1, 1, 3, enumerated=True)
        # 0 (precise) or 3x3 or 5x5 (the latest only works with Euclidean distance CV_DIST_L2)
        settings["dt_mask_size"] = CVParameter(3, 0, 5, 8.0, 2.0)
def configure_backend(self, backend=None, category="text", reset=False):
    """
    Custom implementation of the base method.

    Public entry point forwarding all arguments to the private text
    finder implementation.

    See base method for details.
    """
    self.__configure_backend(backend=backend, category=category, reset=reset)
def __configure(self, text_detector=None, text_recognizer=None,
                threshold_filter=None, threshold_filter2=None,
                threshold_filter3=None, reset=True):
    """
    Configure every category of the text finder in a fixed order.

    The "text" category goes first since it is the only one receiving
    the `reset` flag; a backend of None lets each category pick its default.
    """
    self.__configure_backend(backend=None, category="text", reset=reset)
    stages = ((text_detector, "tdetect"),
              (text_recognizer, "ocr"),
              (None, "contour"),
              (threshold_filter, "threshold"),
              (threshold_filter2, "threshold2"),
              (threshold_filter3, "threshold3"))
    for chosen_backend, stage in stages:
        self.__configure_backend(chosen_backend, stage)
def configure(self, text_detector=None, text_recognizer=None,
              threshold_filter=None, threshold_filter2=None,
              threshold_filter3=None, reset=True, **kwargs):
    """
    Custom implementation of the base method.

    :param text_detector: name of a preselected backend
    :type text_detector: str or None
    :param text_recognizer: name of a preselected backend
    :type text_recognizer: str or None
    :param threshold_filter: threshold filter for the text detection stage
    :type threshold_filter: str or None
    :param threshold_filter2: additional threshold filter for the OCR stage
    :type threshold_filter2: str or None
    :param threshold_filter3: additional threshold filter for distance transformation
    :type threshold_filter3: str or None
    :param bool reset: flag forwarded to the configuration of the "text" category

    Any further keyword arguments are accepted but not used here.
    """
    self.__configure(text_detector=text_detector,
                     text_recognizer=text_recognizer,
                     threshold_filter=threshold_filter,
                     threshold_filter2=threshold_filter2,
                     threshold_filter3=threshold_filter3,
                     reset=reset)
def __synchronize_backend(self, backend=None, category="text", reset=False):
    """
    Custom implementation of the base method.

    Instantiate the objects needed by the already configured backend of
    a category: the EAST network or the extremal region filters for text
    detection and the OCR engine for text recognition. The remaining
    categories have nothing to instantiate.

    :param backend: name of the backend expected to be configured
    :type backend: str or None
    :param str category: category of the backend to synchronize
    :param bool reset: whether to reset the base finder synchronization first
    :raises: :py:class:`UnsupportedBackendError` if the category is not supported
    :raises: :py:class:`UninitializedBackendError` if the backend differs
             from the one configured for the category
    :raises: :py:class:`ValueError` if the configured OCR backend or OCR
             classifier is not valid

    See base method for details.
    """
    if category not in ["text", "tdetect", "ocr", "contour", "threshold", "threshold2", "threshold3"]:
        raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
    if reset:
        Finder.synchronize_backend(self, "text", reset=True)
    if backend is not None and self.params[category]["backend"] != backend:
        raise UninitializedBackendError("Backend '%s' has not been configured yet" % backend)
    backend = self.params[category]["backend"]

    import cv2
    datapath = self.params["text"]["datapath"].value
    # all threshold stages (including the third one) are plain parameter
    # sets just like the text and contour categories
    if category in ["text", "contour", "threshold", "threshold2", "threshold3"]:
        # nothing to sync
        return

    elif category == "tdetect" and backend == "east":
        self.east_net = cv2.dnn.readNet(os.path.join(datapath, 'frozen_east_text_detection.pb'))

    elif category == "tdetect" and backend == "erstat":
        self.erc1 = cv2.text.loadClassifierNM1(os.path.join(datapath, 'trained_classifierNM1.xml'))
        self.erf1 = cv2.text.createERFilterNM1(self.erc1,
                                               self.params["tdetect"]["thresholdDelta"].value,
                                               self.params["tdetect"]["minArea"].value,
                                               self.params["tdetect"]["maxArea"].value,
                                               self.params["tdetect"]["minProbability"].value,
                                               self.params["tdetect"]["nonMaxSuppression"].value,
                                               self.params["tdetect"]["minProbabilityDiff"].value)
        self.erc2 = cv2.text.loadClassifierNM2(os.path.join(datapath, 'trained_classifierNM2.xml'))
        self.erf2 = cv2.text.createERFilterNM2(self.erc2, self.params["tdetect"]["minProbability2"].value)

    elif category == "tdetect":
        # nothing to sync
        return

    elif category == "ocr":
        # prefer the trained data shipped in the data path and fall back
        # to the environment (or the current directory) otherwise
        tessdata_path = os.path.join(datapath, "tessdata")
        if not os.path.exists(tessdata_path):
            tessdata_path = os.environ.get("TESSDATA_PREFIX", ".")

        if backend == "pytesseract":
            import pytesseract
            self.ocr = pytesseract
            # pytesseract is driven through a tesseract command line which
            # is built once here and reused for each recognized region
            self.ocr_config = r"--tessdata-dir '%s' --oem %s --psm %s "
            self.ocr_config %= (tessdata_path,
                                self.params["ocr"]["oem"].value,
                                self.params["ocr"]["psmode"].value)
            self.ocr_config += r"-c tessedit_char_whitelist='%s' %s"
            self.ocr_config %= (self.params["ocr"]["char_whitelist"].value,
                                self.params["ocr"]["extra_configs"].value)

        elif backend == "tesserocr":
            from tesserocr import PyTessBaseAPI
            self.ocr = PyTessBaseAPI(path=tessdata_path,
                                     lang=self.params["ocr"]["language"].value,
                                     oem=self.params["ocr"]["oem"].value,
                                     psm=self.params["ocr"]["psmode"].value)
            self.ocr.SetVariable("tessedit_char_whitelist", self.params["ocr"]["char_whitelist"].value)

        elif backend == "tesseract":
            self.ocr = cv2.text.OCRTesseract_create(tessdata_path,
                                                    language=self.params["ocr"]["language"].value,
                                                    char_whitelist=self.params["ocr"]["char_whitelist"].value,
                                                    oem=self.params["ocr"]["oem"].value,
                                                    psmode=self.params["ocr"]["psmode"].value)

        elif backend in ["hmm", "beamSearch"]:
            import numpy

            # vocabulary is strictly related with the XML data so remains hardcoded here
            vocabulary = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
            with open(os.path.join(datapath, 'OCRHMM_transitions_table.xml')) as f:
                transition_p_xml = f.read()
            # the table is extracted from the single <data> element of the file
            transition_p_data = re.search("<data>(.*)</data>",
                                          transition_p_xml.replace("\n", " "))
            assert transition_p_data is not None, "Corrupted transition probability data"
            # one row and one column per vocabulary character (62 characters)
            transition_p = numpy.fromstring(transition_p_data.group(1).strip(), sep=' ').reshape(62, 62)
            emission_p = numpy.eye(62, dtype=numpy.float64)

            if backend == "hmm":
                classifier_data = os.path.join(datapath, 'OCRHMM_knn_model_data.xml.gz')
                if self.params["ocr"]["classifier"].value == 1:
                    classifier = cv2.text.loadOCRHMMClassifierNM(classifier_data)
                elif self.params["ocr"]["classifier"].value == 2:
                    classifier = cv2.text.loadOCRHMMClassifierCNN(classifier_data)
                else:
                    raise ValueError("Invalid classifier selected for OCR - must be NM or CNN")
                self.ocr = cv2.text.OCRHMMDecoder_create(classifier, vocabulary, transition_p, emission_p)
            else:
                classifier_data = os.path.join(datapath, 'OCRBeamSearch_CNN_model_data.xml.gz')
                classifier = cv2.text.loadOCRBeamSearchClassifierCNN(classifier_data)
                self.ocr = cv2.text.OCRBeamSearchDecoder_create(classifier, vocabulary, transition_p, emission_p)

        else:
            raise ValueError("Invalid OCR backend '%s'" % backend)
def synchronize_backend(self, backend=None, category="text", reset=False):
    """
    Custom implementation of the base method.

    Public entry point forwarding all arguments to the private text
    finder implementation.

    See base method for details.
    """
    self.__synchronize_backend(backend=backend, category=category, reset=reset)
def __synchronize(self, text_detector=None, text_recognizer=None,
                  threshold_filter=None, threshold_filter2=None,
                  threshold_filter3=None, reset=True):
    """
    Synchronize every category of the text finder in a fixed order.

    The "text" category goes first since it is the only one receiving
    the `reset` flag; a backend of None accepts whatever is configured.
    """
    self.__synchronize_backend(backend=None, category="text", reset=reset)
    stages = ((text_detector, "tdetect"),
              (text_recognizer, "ocr"),
              (None, "contour"),
              (threshold_filter, "threshold"),
              (threshold_filter2, "threshold2"),
              (threshold_filter3, "threshold3"))
    for expected_backend, stage in stages:
        self.__synchronize_backend(expected_backend, stage)
def synchronize(self, text_detector=None, text_recognizer=None,
                threshold_filter=None, threshold_filter2=None,
                threshold_filter3=None, reset=True):
    """
    Custom implementation of the base method.

    :param text_detector: name of a preselected backend
    :type text_detector: str or None
    :param text_recognizer: name of a preselected backend
    :type text_recognizer: str or None
    :param threshold_filter: threshold filter for the text detection stage
    :type threshold_filter: str or None
    :param threshold_filter2: additional threshold filter for the OCR stage
    :type threshold_filter2: str or None
    :param threshold_filter3: additional threshold filter for distance transformation
    :type threshold_filter3: str or None
    :param bool reset: flag forwarded to the synchronization of the "text" category
    """
    self.__synchronize(text_detector=text_detector,
                       text_recognizer=text_recognizer,
                       threshold_filter=threshold_filter,
                       threshold_filter2=threshold_filter2,
                       threshold_filter3=threshold_filter3,
                       reset=reset)
def find(self, needle, haystack):
    """
    Custom implementation of the base method.

    :param needle: target text to search for
    :type needle: :py:class:`Text`
    :returns: matches for all text regions whose recognized text is similar
              enough to the needle, sorted by decreasing similarity
    :raises: :py:class:`UnsupportedBackendError` if the configured text
             detection backend is not among the known ones
    :raises: :py:class:`NotImplementedError` if the BeamSearch OCR backend
             is selected and at least one text region was detected

    Text regions are first located by the configured text detection
    backend, then each region is preprocessed and passed to the configured
    OCR backend, and the recognized string is finally compared with the
    needle text.

    See base method for details.
    """
    needle.match_settings = self
    needle.use_own_settings = True
    self.imglog.needle = needle
    self.imglog.haystack = haystack
    self.imglog.dump_matched_images()

    import cv2
    import numpy
    text_needle = needle.value
    img_haystack = numpy.array(haystack.pil_image)
    final_hotmap = numpy.array(haystack.pil_image)

    # detect characters and group them into detected text
    backend = self.params["tdetect"]["backend"]
    log.debug("Detecting text with %s", backend)
    if backend == "east":
        text_regions = self._detect_text_east(haystack)
    elif backend == "erstat":
        text_regions = self._detect_text_erstat(haystack)
    elif backend == "contours":
        text_regions = self._detect_text_contours(haystack)
    elif backend == "components":
        text_regions = self._detect_text_components(haystack)
    else:
        raise UnsupportedBackendError("Unsupported text detection backend %s" % backend)

    # perform optical character recognition on the final regions
    backend = self.params["ocr"]["backend"]
    log.debug("Recognizing text with %s", backend)
    from .match import Match
    matches = []

    def binarize_step(threshold, text_img):
        # the binarization always reads the "threshold" category so
        # temporarily swap in the requested stage and restore it afterwards
        if self.params["ocr"]["binarize_text"].value:
            first_threshold = self.params["threshold"]
            self.params["threshold"] = self.params[threshold]
            try:
                text_img = self._binarize_image(text_img)
            finally:
                self.params["threshold"] = first_threshold
            return text_img
        else:
            return cv2.cvtColor(text_img, cv2.COLOR_RGB2GRAY)

    for i, text_box in enumerate(text_regions):

        # main OCR preprocessing stage - crop the region together with a
        # border that is clipped to the haystack boundaries
        border = self.params["ocr"]["border_size"].value
        text_img = img_haystack[max(text_box[1]-border, 0):min(text_box[1]+text_box[3]+border, img_haystack.shape[0]),
                                max(text_box[0]-border, 0):min(text_box[0]+text_box[2]+border, img_haystack.shape[1])]
        factor = self.params["ocr"]["zoom_factor"].value
        log.debug("Zooming x%i candidate for improved OCR processing", factor)
        text_img = cv2.resize(text_img, None, fx=factor, fy=factor)
        text_img = binarize_step("threshold2", text_img)
        if self.params["ocr"]["distance_transform"].value:
            text_img = cv2.distanceTransform(text_img,
                                             self.params["ocr"]["dt_distance_type"].value,
                                             self.params["ocr"]["dt_mask_size"].value)
            text_img = cv2.cvtColor(numpy.asarray(text_img, dtype='uint8'), cv2.COLOR_GRAY2RGB)
            text_img = binarize_step("threshold3", text_img)
        # 0 erode, 1 dilate, 2 both, 3 none
        if self.params["ocr"]["erode_dilate"].value < 3:
            element = cv2.getStructuringElement(self.params["ocr"]["ed_kernel_type"].value,
                                                (self.params["ocr"]["ed_kernel_width"].value,
                                                 self.params["ocr"]["ed_kernel_height"].value))
            if self.params["ocr"]["erode_dilate"].value in [0, 2]:
                text_img = cv2.erode(text_img, element)
            if self.params["ocr"]["erode_dilate"].value in [1, 2]:
                text_img = cv2.dilate(text_img, element)
        self.imglog.hotmaps.append(text_img)

        # BUG: we hit segfault when using the BeamSearch OCR backend so disallow it
        if backend == "beamSearch":
            raise NotImplementedError("Current version of BeamSearch segfaults so it's not yet available")

        # TODO: we can do this now with pytesseract/tesserocr but have to evaluate its usefulness
        #vector<Rect> boxes;
        #vector<string> words;
        #vector<float> confidences;
        #output = ocr.run(group_img, &boxes, &words, &confidences, cv2.text.OCR_LEVEL_WORD)

        if backend == "pytesseract":
            log.debug("Running pytesseract with extra command line %s", self.ocr_config)
            output = self.ocr.image_to_string(text_img,
                                              lang=self.params["ocr"]["language"].value,
                                              config=self.ocr_config)
        elif backend == "tesserocr":
            self.ocr.SetImage(PIL.Image.fromarray(text_img))
            output = self.ocr.GetUTF8Text()
        else:
            # redirection of tesseract's streams can only be done on the file descriptor level
            stdout_fd = sys.stdout.fileno() if hasattr(sys.stdout, "fileno") else 1
            stderr_fd = sys.stderr.fileno() if hasattr(sys.stderr, "fileno") else 2
            with open(os.devnull, 'wb') as null_fo, \
                    os.fdopen(os.dup(stdout_fd), 'wb') as cpout_fo, \
                    os.fdopen(os.dup(stderr_fd), 'wb') as cperr_fo:
                try:
                    sys.stdout.flush()
                    sys.stderr.flush()
                    os.dup2(null_fo.fileno(), stdout_fd)
                    os.dup2(null_fo.fileno(), stderr_fd)
                    output = self.ocr.run(text_img, text_img,
                                          self.params["ocr"]["min_confidence"].value,
                                          self.params["ocr"]["component_level"].value)
                finally:
                    # give the process its original streams back even if
                    # the OCR run failed or else all later output is lost
                    sys.stdout.flush()
                    sys.stderr.flush()
                    os.dup2(cpout_fo.fileno(), stdout_fd)
                    os.dup2(cperr_fo.fileno(), stderr_fd)
        if self.params["ocr"]["component_level"].value == 1:
            # strip of the new line character which is never useful
            output = output.rstrip()
        log.debug("OCR output %s = '%s'", i+1, output)

        # normalize the distance by the longer string and avoid division
        # by zero if both the needle and the recognized text are empty
        longest = max(len(output), len(text_needle), 1)
        similarity = 1.0 - float(needle.distance_to(output)) / longest
        log.debug("Similarity = '%s'", similarity)
        self.imglog.similarities.append(similarity)
        if similarity >= self.params["find"]["similarity"].value:
            log.debug("Text at (%s, %s) is acceptable", text_box[0], text_box[1])
            self.imglog.locations.append((text_box[0], text_box[1]))
            x, y, w, h = text_box
            dx, dy = needle.center_offset.x, needle.center_offset.y
            cv2.rectangle(final_hotmap, (x, y), (x+w, y+h), (0, 0, 0), 2)
            cv2.rectangle(final_hotmap, (x, y), (x+w, y+h), (255, 255, 255), 1)
            matches.append(Match(x, y, w, h, dx, dy, similarity))

    matches = sorted(matches, key=lambda x: x.similarity, reverse=True)
    self.imglog.hotmaps.append(final_hotmap)
    self.imglog.log(30)
    return matches
def _detect_text_east(self, haystack):
    """
    Detect text regions using the EAST network loaded during synchronization.

    :param haystack: image to look for text regions in
    :returns: detected text regions as (x, y, width, height) sequences
              where intersecting candidates are merged together
    :rtype: [(int, int, int, int)]

    Two hotmaps are added to the image logging - one with the text
    probabilities and all candidates and one with the final regions.
    """
    #:.. note:: source implementation by Adrian Rosebrock from his post:
    #: https://www.pyimagesearch.com/2018/08/20/opencv-text-detection-east-text-detector/
    import cv2
    import numpy
    img = numpy.array(haystack.pil_image)
    char_canvas = cv2.cvtColor(numpy.array(haystack.pil_image), cv2.COLOR_RGB2GRAY)
    text_canvas = numpy.array(haystack.pil_image)
    # the canvases are logged now and drawn upon in place later on
    self.imglog.hotmaps.append(char_canvas)
    self.imglog.hotmaps.append(text_canvas)

    # resize the image to resolution compatible with the model
    inp_width, inp_height = (self.params["tdetect"]["input_res_x"].value,
                             self.params["tdetect"]["input_res_y"].value)
    width_ratio = img.shape[1] / float(inp_width)
    height_ratio = img.shape[0] / float(inp_height)
    img = cv2.resize(img, (inp_width, inp_height))

    # convert to a model-compatible input using the mean from the training
    inp = cv2.dnn.blobFromImage(img, mean=(123.68, 116.78, 103.94), swapRB=True, crop=False)
    self.east_net.setInput(inp)
    # select two output layers for the EAST detector model respectively for
    # the output probabilities and the text bounding box coordinates
    output_layers = ["feature_fusion/Conv_7/Sigmoid", "feature_fusion/concat_3"]
    probability, geometry = self.east_net.forward(output_layers)
    char_canvas[:] = cv2.resize(probability[0, 0]*255.0, (char_canvas.shape[1], char_canvas.shape[0]))

    min_box_confidence = self.params["tdetect"]["min_box_confidence"].value
    rects = []
    for row in range(0, probability.shape[2]):
        row_scores = probability[0, 0, row]
        row_data = geometry[0, :, row]
        for col in range(0, probability.shape[3]):
            # prune out subthreshold probability of being a text
            if row_scores[col] < min_box_confidence:
                continue

            # use geometry data to get input size and rescale for final bounding box width and height
            h = min(row_data[0][col] + row_data[2][col], inp_height) * height_ratio
            w = min(row_data[1][col] + row_data[3][col], inp_width) * width_ratio

            # output layer dimensions are 4x smaller than the input layer dimensions
            (dx, dy) = (col + 1) * 4.0, (row + 1) * 4.0
            # calculate the rotation angle from the prediction output
            sin, cos = numpy.sin(row_data[4][col]), numpy.cos(row_data[4][col])
            # compute the starting (from ending) coordinates for the text bounding box
            x2 = min(dx + cos * row_data[1][col] + sin * row_data[2][col], inp_width) * width_ratio
            y2 = min(dy - sin * row_data[1][col] + cos * row_data[2][col], inp_height) * height_ratio
            # the network might give unlimited region boundaries so limit by input width/height (above)
            x1, y1 = x2 - w, y2 - h

            rect = (int(x1), int(y1), int(w), int(h))
            cv2.rectangle(char_canvas, (rect[0], rect[1]), (rect[0]+rect[2], rect[1]+rect[3]), (0, 0, 0), 2)
            cv2.rectangle(char_canvas, (rect[0], rect[1]), (rect[0]+rect[2], rect[1]+rect[3]), (255, 255, 255), 1)
            rects.append(rect)
            # TODO: needed for outsourced nonmaxima suppression
            # confidences.append(row_scores[x])
    log.debug("A total of %s possible text regions found", len(rects))

    # produce a final set of merged text regions
    text_regions = []
    # TODO: apply outsourced nonmaxima suppression as the current OpenCV
    # implementation is broken in the number of python2C++ called arguments
    # indices = cv2.dnn.NMSBoxesRotated(rects, confidences, 0.5, 0.5, 1., 0)
    region_queue = [[region, True] for region in rects]
    # every queued region has to be visited (including the very last one)
    # or else a single or trailing detection would never be returned
    for position, (r1, pending) in enumerate(region_queue):
        # region was already merged into a previous one
        if not pending:
            continue
        for r2pair in region_queue[position+1:]:
            r2 = r2pair[0]
            # if the two regions intersect
            if (r1[0] < r2[0] + r2[2] and r1[0] + r1[2] > r2[0]
                    and r1[1] < r2[1] + r2[3] and r1[1] + r1[3] > r2[1]):
                # the merged region is the bounding box of both regions
                left, top = min(r1[0], r2[0]), min(r1[1], r2[1])
                right = max(r1[0] + r1[2], r2[0] + r2[2])
                bottom = max(r1[1] + r1[3], r2[1] + r2[3])
                r1 = [left, top, right - left, bottom - top]
                # second region will no longer be considered
                r2pair[1] = False
        # first region is now merged with all intersecting regions
        text_regions.append(r1)

    for rect in text_regions:
        cv2.rectangle(text_canvas, (rect[0], rect[1]), (rect[0]+rect[2], rect[1]+rect[3]), (0, 0, 0), 2)
        cv2.rectangle(text_canvas, (rect[0], rect[1]), (rect[0]+rect[2], rect[1]+rect[3]), (0, 0, 255), 1)
    log.debug("A total of %s final text regions found", len(text_regions))
    return text_regions
def _detect_text_erstat(self, haystack):
    """
    Detect text regions using the extremal region filters created during
    synchronization.

    :param haystack: image to look for text regions in
    :returns: detected text regions as (x, y, width, height) sequences
              where intersecting candidates are merged together
    :rtype: [(int, int, int, int)]

    Two hotmaps are added to the image logging - one with the character
    candidates and one with the grouped text candidates.
    """
    import cv2
    import numpy
    img = numpy.array(haystack.pil_image)
    char_canvas = numpy.array(haystack.pil_image)
    text_canvas = numpy.array(haystack.pil_image)
    # the canvases are logged now and drawn upon in place later on
    self.imglog.hotmaps.append(char_canvas)
    self.imglog.hotmaps.append(text_canvas)

    # extract channels to be processed individually - B, G, R, lightness, and gradient magnitude
    channels = list(cv2.text.computeNMChannels(img))
    # append negative channels to detect ER- (bright regions over dark background) skipping the gradient channel
    channels.extend([255 - channel for channel in channels[:-1]])

    text_regions = []
    # apply the default cascade classifier to each independent channel
    log.debug("Extracting class specific extremal regions from %s channels", len(channels))
    for i, channel in enumerate(channels):
        # one liner for "erf1.run(channel)" then "erf2.run(channel)"
        regions = cv2.text.detectRegions(channel, self.erf1, self.erf2)
        log.debug("A total of %s possible character regions found on channel %s", len(regions), i)
        rects = [cv2.boundingRect(p.reshape(-1, 1, 2)) for p in regions]
        for rect in rects:
            cv2.rectangle(char_canvas, (rect[0], rect[1]), (rect[0]+rect[2], rect[1]+rect[3]), (0, 0, 0), 2)
            cv2.rectangle(char_canvas, (rect[0], rect[1]), (rect[0]+rect[2], rect[1]+rect[3]), (0, 0, 255), 1)
        # grouping is only attempted if the channel has any character candidates
        if len(regions) == 0:
            continue
        region_groups = cv2.text.erGrouping(img, channel, [r.tolist() for r in regions])
        log.debug("A total of %s possible text regions found on channel %s", len(region_groups), i)
        for rect in region_groups:
            cv2.rectangle(text_canvas, (rect[0], rect[1]), (rect[0]+rect[2], rect[1]+rect[3]), (0, 0, 0), 2)
            cv2.rectangle(text_canvas, (rect[0], rect[1]), (rect[0]+rect[2], rect[1]+rect[3]), (0, 255, 0), 1)
        text_regions.extend(region_groups)

    # produce a final set of merged text regions
    final_regions = []
    region_queue = [[region, True] for region in text_regions]
    # every queued region has to be visited (including the very last one)
    # or else a single or trailing detection would never be returned
    for position, (r1, pending) in enumerate(region_queue):
        # region was already merged into a previous one
        if not pending:
            continue
        for r2pair in region_queue[position+1:]:
            r2 = r2pair[0]
            # if the two regions intersect
            if (r1[0] < r2[0] + r2[2] and r1[0] + r1[2] > r2[0]
                    and r1[1] < r2[1] + r2[3] and r1[1] + r1[3] > r2[1]):
                # the merged region is the bounding box of both regions
                left, top = min(r1[0], r2[0]), min(r1[1], r2[1])
                right = max(r1[0] + r1[2], r2[0] + r2[2])
                bottom = max(r1[1] + r1[3], r2[1] + r2[3])
                r1 = [left, top, right - left, bottom - top]
                # second region will no longer be considered
                r2pair[1] = False
        # first region is now merged with all intersecting regions
        final_regions.append(r1)
    return final_regions
def _detect_text_contours(self, haystack):
    """
    Detect text regions by filtering contours down to character sized
    ones and grouping neighboring characters together.

    :param haystack: image to look for text regions in
    :returns: detected text regions as [x, y, width, height] lists
    :rtype: [[int]]

    Two hotmaps are added to the image logging - one with the accepted
    character candidates and one with the accepted text regions.
    """
    import cv2
    import numpy

    source_img = numpy.array(haystack.pil_image)
    char_canvas = numpy.array(haystack.pil_image)
    text_canvas = numpy.array(haystack.pil_image)
    for canvas in (char_canvas, text_canvas):
        self.imglog.hotmaps.append(canvas)

    binary_img = self._binarize_image(source_img)
    # contours are extracted from a copy of the binarized image
    contours = self._extract_contours(binary_img.copy())

    def unlike_character(area, width, height, ratio):
        # the lower area limit is the one of the contour stage while all
        # other limits are specific to the text detection stage
        return (area < self.params["contour"]["minArea"].value
                or area > self.params["tdetect"]["maxArea"].value
                or width < self.params["tdetect"]["minWidth"].value
                or width > self.params["tdetect"]["maxWidth"].value
                or height < self.params["tdetect"]["minHeight"].value
                or height > self.params["tdetect"]["maxHeight"].value
                or ratio < self.params["tdetect"]["minAspectRatio"].value
                or ratio > self.params["tdetect"]["maxAspectRatio"].value)

    candidates = []
    for contour in contours:
        left, top, width, height = cv2.boundingRect(contour)
        area = cv2.contourArea(contour)
        ratio = float(width) / height
        if unlike_character(area, width, height, ratio):
            log.debug("Ignoring contour with area %sx%s>%s and aspect ratio %s/%s=%s",
                      width, height, area, width, height, ratio)
            continue
        corner, opposite = (left, top), (left + width, top + height)
        cv2.rectangle(char_canvas, corner, opposite, (0, 0, 0), 2)
        cv2.rectangle(char_canvas, corner, opposite, (0, 0, 255), 1)
        candidates.append([left, top, width, height])
    # characters are processed from left to right
    candidates.sort(key=lambda box: box[0])

    # group characters into horizontally-correlated regions
    spacing = self.params["tdetect"]["horizontalSpacing"].value
    variance = self.params["tdetect"]["verticalVariance"].value
    orientation = self.params["tdetect"]["orientation"].value
    min_chars = self.params["tdetect"]["minChars"].value
    text_regions = []
    for index, grown in enumerate(candidates):
        # region was already merged
        if grown is None:
            continue
        merged_chars = 0
        for other_index, other in enumerate(candidates):
            # region is compared to merged region or to itself
            if other is None or grown == other:
                continue
            x1, y1, w1, h1 = grown
            x2, y2, w2, h2 = other
            # 0 horizontal, 1 vertical
            if orientation == 0:
                is_text = (x2 - (x1 + w1) < spacing and x1 - (x2 + w2) < spacing
                           and abs(y1 - y2) < variance and abs(h1 - h2) < 2*variance)
            elif orientation == 1:
                is_text = (y2 - (y1 + h1) < variance and y1 - (y2 + h2) < variance
                           and abs(x1 - x2) < spacing and abs(w1 - w2) < 2*spacing)
            if not is_text:
                continue
            # extend the current region to the bounding box of both regions
            # and consume the other region
            new_left, new_top = min(x1, x2), min(y1, y2)
            grown = [new_left, new_top,
                     max(x1+w1, x2+w2)-new_left, max(y1+h1, y2+h2)-new_top]
            merged_chars += 1
            candidates[other_index] = None

        # regions with too few characters are ignored here but left
        # available for merging into later regions
        if merged_chars < min_chars:
            log.debug("Ignoring text contour with %s<%s characters",
                      merged_chars, min_chars)
            continue
        left, top, width, height = grown
        cv2.rectangle(text_canvas, (left, top), (left+width, top+height), (0, 0, 0), 2)
        cv2.rectangle(text_canvas, (left, top), (left+width, top+height), (0, 255, 0), 1)
        text_regions.append(grown)
        candidates[index] = None

    return text_regions
def _detect_text_components(self, haystack):
    """
    Detect text regions using connected components (not fully implemented).

    :param haystack: image to look for text regions in
    :raises: :py:class:`NotImplementedError` always, after the components
             found so far have been image logged

    The components with sufficient area are drawn on the first hotmap,
    the normalized component labels become the last hotmap, and the image
    logging is dumped right before the error is raised.
    """
    import cv2
    import numpy
    img = numpy.array(haystack.pil_image)
    char_canvas = numpy.array(haystack.pil_image)
    text_canvas = numpy.array(haystack.pil_image)
    self.imglog.hotmaps.append(char_canvas)
    self.imglog.hotmaps.append(text_canvas)

    connectivity = self.params["tdetect"]["connectivity"].value
    label_num, label_img, stats, centroids = cv2.connectedComponentsWithStats(img, connectivity, cv2.CV_32S)
    log.debug("Detected %s component labels with centroids: %s", label_num,
              ", ".join([str((int(c[0]), int(c[1]))) for c in centroids]))
    self.imglog.hotmaps.append(label_img * 255)
    for i in range(label_num):
        x, y = stats[i, cv2.CC_STAT_LEFT], stats[i, cv2.CC_STAT_TOP]
        w, h = stats[i, cv2.CC_STAT_WIDTH], stats[i, cv2.CC_STAT_HEIGHT]
        area = stats[i, cv2.CC_STAT_AREA]
        # the lower area limit is the one of the contour stage
        if area < self.params["contour"]["minArea"].value:
            continue
        else:
            rect = [x, y, w, h]
            cv2.rectangle(char_canvas, (rect[0], rect[1]), (rect[0]+rect[2], rect[1]+rect[3]), (0, 0, 0), 2)
            cv2.rectangle(char_canvas, (rect[0], rect[1]), (rect[0]+rect[2], rect[1]+rect[3]), (0, 0, 255), 1)

    # TODO: log here since not fully implemented
    self.imglog.hotmaps[-1] = cv2.normalize(label_img, label_img, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)
    self.imglog.log(30)
    raise NotImplementedError("The connected components method for text detection needs more labels")

    # TODO: alternatively use cvBlobsLib
    # myblobs = CBlobResult(binary_image, mask, 0, True)
    # myblobs.filter_blobs(325, 2000)
    # blob_count = myblobs.GetNumBlobs()
def log(self, lvl):
    """
    Custom implementation of the base method.

    The first two hotmaps are dumped as the character and text detection
    stages, the last one as the final result (named after the best
    similarity, or 0.0 without any similarity), and all hotmaps in between
    as OCR stages named after the similarity at the same position.

    See base method for details.
    """
    imglog = self.imglog

    # below selected logging level
    if lvl < imglog.logging_level:
        imglog.clear()
        return
    # logging is being collected for a specific logtype
    if ImageLogger.accumulate_logging:
        return
    # no hotmaps to log
    if len(imglog.hotmaps) == 0:
        raise MissingHotmapError("No matching was performed in order to be image logged")

    char_name = "imglog%s-3hotmap-1char.png" % imglog.printable_step
    imglog.dump_hotmap(char_name, imglog.hotmaps[0])
    text_name = "imglog%s-3hotmap-2text.png" % imglog.printable_step
    imglog.dump_hotmap(text_name, imglog.hotmaps[1])

    # everything between the two detection hotmaps and the final hotmap
    for offset, ocr_hotmap in enumerate(imglog.hotmaps[2:-1]):
        ocr_name = "imglog%s-3hotmap-3ocr-%stext-%s.png" % (imglog.printable_step, offset+1,
                                                            imglog.similarities[offset])
        imglog.dump_hotmap(ocr_name, ocr_hotmap)

    if len(imglog.similarities) > 0:
        best_similarity = max(imglog.similarities)
    else:
        best_similarity = 0.0
    final_name = "imglog%s-3hotmap-%s.png" % (imglog.printable_step, best_similarity)
    imglog.dump_hotmap(final_name, imglog.hotmaps[-1])

    imglog.clear()
    ImageLogger.step += 1
[docs]class TemplateFeatureFinder(TemplateFinder, FeatureFinder):
"""
Hybrid matcher using both OpenCV's template and feature matching.
Feature matching is robust at small regions not too abundant
of features where template matching is too picky. Template
matching is good at large feature abundant regions and can be
used as a heuristic for the feature matching. The current matcher
will perform template matching first and then feature matching on
the surviving template matches to select among them one more time.
A separate (usually lower) front similarity is used for the first
stage template matching in order to remove a lot of noise that
would otherwise be distracting for the second stage feature matching.
"""
def __init__(self, configure=True, synchronize=True):
    """
    Build a CV backend using OpenCV's template and feature matching.

    :param bool configure: whether to configure all categories (with a
                           full reset) right after construction
    :param bool synchronize: whether to run the feature finder
                             synchronization right after construction
    """
    # configuration and synchronization of the parents are skipped since
    # they are performed below once the extra category is registered
    super(TemplateFeatureFinder, self).__init__(configure=False, synchronize=False)

    # available and currently fully compatible methods
    matcher_group = "tempfeat_matchers"
    self.algorithms[matcher_group] = ("mixed",)
    self.categories["tempfeat"] = matcher_group

    # additional preparation
    if configure:
        self.__configure(reset=True)
    if synchronize:
        FeatureFinder.synchronize(self, reset=False)
def __configure_backend(self, backend=None, category="tempfeat", reset=False):
if category not in ["tempfeat", "template", "feature", "fdetect", "fextract", "fmatch"]:
raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
elif category in ["feature", "fdetect", "fextract", "fmatch"]:
FeatureFinder.configure_backend(self, backend, category, reset)
return
elif category == "template":
TemplateFinder.configure_backend(self, backend, category, reset)
return
if reset:
Finder.configure_backend(self, "tempfeat", reset=True)
if backend is None:
backend = "mixed"
if backend not in self.algorithms[self.categories[category]]:
raise UnsupportedBackendError("Backend '%s' is not among the supported ones: "
"%s" % (backend, self.algorithms[self.categories[category]]))
self.params[category] = {}
self.params[category]["backend"] = backend
self.params[category]["front_similarity"] = CVParameter(0.7, 0.0, 1.0)
def __configure(self, template_match=None, feature_detect=None,
feature_extract=None, feature_match=None, reset=True):
self.__configure_backend(category="tempfeat", reset=reset)
self.__configure_backend(template_match, "template")
self.__configure_backend(category="feature")
self.__configure_backend(feature_detect, "fdetect")
self.__configure_backend(feature_extract, "fextract")
self.__configure_backend(feature_match, "fmatch")
[docs] def synchronize(self, feature_detect=None, feature_extract=None,
feature_match=None, reset=True):
"""
Custom implementation of the base method.
See base method for details.
"""
Finder.synchronize_backend(self, "tempfeat", reset=reset)
FeatureFinder.synchronize(self,
feature_detect=feature_detect,
feature_extract=feature_extract,
feature_match=feature_match,
reset=False)
[docs] def find(self, needle, haystack):
"""
Custom implementation of the base method.
See base method for details.
Use template matching to deal with feature dense regions
and guide a final feature matching stage.
"""
# accumulate one template and multiple feature cases
ImageLogger.accumulate_logging = True
# use a different lower similarity for the template matching
template_similarity = self.params["tempfeat"]["front_similarity"].value
feature_similarity = self.params["find"]["similarity"].value
log.debug("Using tempfeat matching with template similarity %s "
"and feature similarity %s", template_similarity,
feature_similarity)
# class-specific dependencies
import cv2
import numpy
self.params["find"]["similarity"].value = template_similarity
# call specifically the template find variant here
template_maxima = TemplateFinder.find(self, needle, haystack)
self.params["find"]["similarity"].value = feature_similarity
# dump correct matching settings
self.imglog.dump_matched_images()
ngray = cv2.cvtColor(numpy.array(needle.pil_image), cv2.COLOR_RGB2GRAY)
hgray = cv2.cvtColor(numpy.array(haystack.pil_image), cv2.COLOR_RGB2GRAY)
final_hotmap = numpy.array(haystack.pil_image)
frame_points = [(0, 0)]
feature_maxima = []
is_feature_poor = False
for i, upleft in enumerate(template_maxima):
up = upleft.y
down = min(haystack.height, up + needle.height)
left = upleft.x
right = min(haystack.width, left + needle.width)
log.log(9, "Maximum up-down is %s and left-right is %s",
(up, down), (left, right))
haystack_region = hgray[up:down, left:right]
haystack_region = haystack_region.copy()
hotmap_region = final_hotmap[up:down, left:right]
hotmap_region = hotmap_region.copy()
# four smaller hotmaps for the feature matching stages (draw on same image here)
self.imglog.hotmaps.append(hotmap_region)
self.imglog.hotmaps.append(hotmap_region)
self.imglog.hotmaps.append(hotmap_region)
self.imglog.hotmaps.append(hotmap_region)
res = self._project_features(frame_points, ngray, haystack_region, feature_similarity)
# if the feature matching succeeded or is worse than satisfactory template matching
if res is not None or (self.imglog.similarities[-1] > 0.0
and self.imglog.similarities[-1] < self.imglog.similarities[i]
and self.imglog.similarities[i] > feature_similarity):
# take the template matching location rather than the feature one
# for stability (they should ultimately be the same)
log.debug("Using template result %s instead of the worse feature result %s",
self.imglog.similarities[i], self.imglog.similarities[-1])
location = (left, up)
self.imglog.locations[-1] = location
feature_maxima.append([self.imglog.hotmaps[-1],
self.imglog.similarities[-1],
self.imglog.locations[-1]])
# stitch back for a better final image logging
final_hotmap[up:down, left:right] = hotmap_region
# if similarity is not zero but we have no result, we failed the comparison
elif self.imglog.similarities[-1] == 0.0:
is_feature_poor = True
# if at least one match is feature poor, we cannot rely on feature matching
if is_feature_poor:
log.warn("Feature poor needle detected, falling back to template matching")
# NOTE: this has knowledge of the internal workings of the _template_find_all
# template matching and more specifically that it orders the matches starting
# with the best (this is ok, since this is also internal method)
# NOTE: the needle can only be feature poor if there is at lease one
# template matching
feature_maxima = []
for i, _ in enumerate(template_maxima):
# test the template match also against the actual required similarity
if self.imglog.similarities[i] >= feature_similarity:
feature_maxima.append([self.imglog.hotmaps[i],
self.imglog.similarities[i],
self.imglog.locations[i]])
# release the accumulated logging from subroutines
ImageLogger.accumulate_logging = False
if len(feature_maxima) == 0:
log.debug("No acceptable match with the given feature similarity %s",
feature_similarity)
if len(self.imglog.similarities) > 1:
# NOTE: handle cases when the matching failed at the feature stage, i.e. dump
# a hotmap for debugging also in this case
self.imglog.hotmaps.append(final_hotmap)
self.imglog.similarities.append(self.imglog.similarities[len(template_maxima)])
self.imglog.locations.append(self.imglog.locations[len(template_maxima)])
elif len(self.imglog.similarities) == 1:
# NOTE: we are only interested in the template hotmap on template failure
self.imglog.hotmaps.append(self.imglog.hotmaps[0])
self.imglog.log(30)
return []
matches = []
from .match import Match
maxima = sorted(feature_maxima, key=lambda x: x[1], reverse=True)
for maximum in maxima:
similarity = maximum[1]
x, y = maximum[2]
w, h = needle.width, needle.height
dx, dy = needle.center_offset.x, needle.center_offset.y
cv2.rectangle(final_hotmap, (x, y), (x+needle.width, y+needle.height), (0, 0, 0), 2)
cv2.rectangle(final_hotmap, (x, y), (x+needle.width, y+needle.height), (0, 0, 255), 1)
matches.append(Match(x, y, w, h, dx, dy, similarity))
self.imglog.hotmaps.append(final_hotmap)
# log one best match for final hotmap filename
best_acceptable = maxima[0]
self.imglog.similarities.append(best_acceptable[1])
self.imglog.locations.append(best_acceptable[2])
self.imglog.log(30)
return matches
[docs] def log(self, lvl):
"""
Custom implementation of the base method.
See base method for details.
"""
# below selected logging level
if lvl < self.imglog.logging_level:
self.imglog.clear()
return
# logging is being collected for a specific logtype
elif ImageLogger.accumulate_logging:
return
# no hotmaps to log
elif len(self.imglog.hotmaps) == 0:
raise MissingHotmapError("No matching was performed in order to be image logged")
# knowing how the tempfeat works this estimates
# the expected number of cases starting from 1 (i+1)
# to make sure the winner is the first alphabetically
candidate_num = int(len(self.imglog.similarities) / 2)
for i in range(candidate_num):
name = "imglog%s-3hotmap-%stemplate-%s.png" % (self.imglog.printable_step,
i + 1, self.imglog.similarities[i])
self.imglog.dump_hotmap(name, self.imglog.hotmaps[i])
ii = candidate_num + i
hii = candidate_num + i*4 + 3
#self.imglog.log_locations(30, [self.imglog.locations[ii]], self.imglog.hotmaps[hii], 4, 255, 0, 0)
name = "imglog%s-3hotmap-%sfeature-%s.png" % (self.imglog.printable_step,
i + 1, self.imglog.similarities[ii])
self.imglog.dump_hotmap(name, self.imglog.hotmaps[hii])
if len(self.imglog.similarities) % 2 == 1:
name = "imglog%s-3hotmap-%s.png" % (self.imglog.printable_step,
self.imglog.similarities[-1])
self.imglog.dump_hotmap(name, self.imglog.hotmaps[-1])
self.imglog.clear()
ImageLogger.step += 1
class DeepFinder(Finder):
    """
    Deep learning matching backend provided by PyTorch.

    The current implementation contains a basic convolutional
    neural network which can be trained to produce needle locations
    from a haystack image.
    """

    # models shared among all instances, keyed by checkpoint path or architecture name
    _cache = {}

    def __init__(self, classifier_datapath=".", configure=True, synchronize=True):
        """
        Build a CV backend using a deep learning detection model.

        :param str classifier_datapath: not used by this class body
                                        (NOTE(review): presumably kept for compatibility)
        :param bool configure: whether to (re)configure the deep backend
        :param bool synchronize: whether to load the model right away
        """
        super(DeepFinder, self).__init__(configure=False, synchronize=False)

        # available and currently fully compatible methods
        self.categories["deep"] = "deep_learners"
        self.algorithms["deep_learners"] = ("pytorch", "tensorflow")

        # other attributes
        self.net = None

        # additional preparation
        if configure:
            self.__configure_backend(reset=True)
        if synchronize:
            self.__synchronize_backend(reset=False)

    def __configure_backend(self, backend=None, category="deep", reset=False):
        """
        Custom implementation of the base method.

        See base method for details.
        """
        if category != "deep":
            raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
        if reset:
            super(DeepFinder, self).configure_backend("deep", reset=True)
        if backend is None:
            backend = GlobalConfig.deep_learn_backend
        if backend not in self.algorithms[self.categories[category]]:
            raise UnsupportedBackendError("Backend '%s' is not among the supported ones: "
                                          "%s" % (backend, self.algorithms[self.categories[category]]))

        self.params[category] = {}
        self.params[category]["backend"] = backend

        # "cpu", "cuda", or "auto"
        self.params[category]["device"] = CVParameter("auto")
        # number of anticipated classes (target patterns)
        self.params[category]["classes"] = CVParameter(91, 1, None, 1)
        # "fasterrcnn_resnet50_fpn", "maskrcnn_resnet50_fpn" or other detection models
        self.params[category]["arch"] = CVParameter("fasterrcnn_resnet50_fpn")
        # file to load pre-trained model weights from
        self.params[category]["model"] = CVParameter("")

    def __synchronize_backend(self, backend=None, category="deep", reset=False):
        """
        Load (or reuse) the detection model for the configured backend.

        :param backend: expected backend name or None to use the configured one
        :type backend: str or None
        :param str category: category of the synchronized backend
        :param bool reset: whether to also synchronize the base configuration
        :raises: :py:class:`UnsupportedBackendError` if the category is not "deep"
        :raises: :py:class:`UninitializedBackendError` if another backend is configured
        :raises: :py:class:`ValueError` if the configured backend is unknown
        """
        if category != "deep":
            raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
        if reset:
            super(DeepFinder, self).synchronize_backend("deep", reset=True)
        if backend is not None and self.params[category]["backend"] != backend:
            raise UninitializedBackendError("Backend '%s' has not been configured yet" % backend)
        backend = self.params[category]["backend"]

        # reuse or cache a unique model depending on arch and checkpoint
        # NOTE(review): the key ignores the number of classes, so the same
        # architecture with a different class count reuses the cached model
        model_classes = self.params[category]["classes"].value
        model_arch = self.params[category]["arch"].value
        model_checkpoint = self.params[category]["model"].value
        model_id = model_arch if not model_checkpoint else model_checkpoint

        # TODO: eventually think about using Catalyst and Keras
        if backend == "pytorch":
            # class-specific dependencies
            import torch
            import torchvision.models.detection as models

            # reuse weights from already loaded models to avoid one model per sync
            if model_id in self._cache:
                model = self._cache[model_id]
            else:
                # only models pretrained on the COCO dataset are available
                is_pretrained = model_checkpoint == "" and model_classes == 91
                model = models.__dict__[model_arch](pretrained=is_pretrained,
                                                    num_classes=model_classes)
                # load .pth or .pkl data file if pretrained model is available
                if model_checkpoint:
                    # SECURITY: torch.load unpickles the file, so only
                    # checkpoints from trusted sources must be configured here
                    model.load_state_dict(torch.load(model_checkpoint,
                                                     map_location="cpu"))
                self._cache[model_id] = model

            device_opt = self.params[category]["device"].value
            if device_opt == "auto":
                device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            else:
                device = torch.device(device_opt)
            model.to(device)
            model.eval()
            self.net = model

        elif backend == "tensorflow":
            # class-specific dependencies
            import tensorflow as tf
            tf.keras.backend.clear_session()
            # TODO: current TensorFlow model zoo/garden API is too unstable
            from research.object_detection.utils import config_util
            from research.object_detection.builders import model_builder

            # TODO: the model ARCH and CHECKPOINT need extra path flexibility
            #tf_models_dir = 'models/research/object_detection'
            #model_arch = os.path.join(tf_models_dir, 'configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config')
            #model_checkpoint = os.path.join(tf_models_dir, 'test_data/checkpoint/ckpt-0')

            # load pipeline config and build a detection model
            configs = config_util.get_configs_from_pipeline_file(model_arch)
            model_config = configs['model']
            self.net = model_builder.build(model_config=model_config, is_training=False)
            ckpt = tf.compat.v2.train.Checkpoint(model=self.net)
            ckpt.restore(model_checkpoint)

        else:
            raise ValueError("Invalid DL backend '%s'" % backend)

    def synchronize_backend(self, backend=None, category="deep", reset=False):
        """
        Custom implementation of the base method.

        See base method for details.
        """
        self.__synchronize_backend(backend, category, reset)

    def find(self, needle, haystack):
        """
        Custom implementation of the base method.

        :param needle: target pattern (cascade) to search for
        :type needle: :py:class:`Pattern`

        See base method for details.

        Detections below the configured similarity or with a label different
        from the needle's id are drawn on the debug hotmaps but not returned.
        """
        needle.match_settings = self
        needle.use_own_settings = True
        self.imglog.needle = needle
        self.imglog.haystack = haystack
        self.imglog.dump_matched_images()

        # prepare a canvas solely for image logging
        full_hotmap = haystack.pil_image.copy()
        filtered_hotmap = haystack.pil_image.copy()
        final_hotmap = haystack.pil_image.copy()

        needle_class = needle.id
        similarity = self.params["find"]["similarity"].value
        backend = self.params["deep"]["backend"]
        if backend == "tensorflow":
            raise NotImplementedError("The TensorFlow model zoo/garden libary "
                                      "is too unstable at present")
        assert backend == "pytorch", "Only PyTorch model zoo/garden is supported"

        # class-specific dependencies
        import torch
        from torchvision import transforms
        from PIL import ImageDraw
        from .match import Match

        if needle.data_file is not None:
            # one class name per line, indexed by the predicted label
            with open(needle.data_file, "rt") as f:
                classes_list = [line.rstrip() for line in f]
            classes = classes_list.__getitem__
        else:
            # an infinite list as a string identity map
            classes = str

        # set the module in evaluation mode
        self.net.eval()

        # convert haystack data to tensor variable
        transform = transforms.Compose([transforms.ToTensor()])
        img = transform(haystack.pil_image)
        # a bit awkward but the only current way to get the model's device
        device = next(self.net.parameters()).device
        # Tensor.to() returns the moved tensor instead of working in place
        img = img.to(device)

        # forward pass the image to obtain predictions
        with torch.no_grad():
            pred = self.net([img])
        detections = pred[0]

        full_draw = ImageDraw.Draw(full_hotmap)
        filtered_draw = ImageDraw.Draw(filtered_hotmap)
        final_draw = ImageDraw.Draw(final_hotmap)
        dx, dy = needle.center_offset.x, needle.center_offset.y

        matches = []
        for i in range(len(detections['labels'])):
            label = classes(detections['labels'][i].cpu().item())
            score = detections['scores'][i].cpu().item()
            # torchvision detection models predict boxes as two corner points
            x1, y1, x2, y2 = (int(v) for v in detections['boxes'][i].cpu().numpy())
            rect = (x1, y1, x2, y2)

            # every detection ends up on the full hotmap
            full_draw.rectangle(rect, outline=(255, 0, 0))
            full_draw.text((x1, y1), label, fill=(255, 0, 0, 0))
            if score < similarity:
                log.debug("Found %s has a low confidence score %s<%s, skipping",
                          label, score, similarity)
                continue

            # confident detections of any class end up on the filtered hotmap
            filtered_draw.rectangle(rect, outline=(0, 255, 0))
            filtered_draw.text((x1, y1), label, fill=(0, 255, 0, 0))
            if label != needle_class:
                log.debug("Found %s is not %s, skipping", label, needle_class)
                continue

            log.debug("Found %s with sufficient confidence %s at (%s, %s)",
                      label, score, x1, y1)
            final_draw.rectangle(rect, outline=(0, 0, 255))
            self.imglog.locations.append((x1, y1))
            self.imglog.similarities.append(score)
            # a match is described by its upleft corner and its size
            matches.append(Match(x1, y1, x2 - x1, y2 - y1, dx, dy, score))

        self.imglog.hotmaps.append(full_hotmap)
        self.imglog.hotmaps.append(filtered_hotmap)
        self.imglog.hotmaps.append(final_hotmap)
        self.imglog.log(30)

        return matches

    def log(self, lvl):
        """
        Custom implementation of the base method.

        See base method for details.

        :raises: :py:class:`MissingHotmapError` if there are no hotmaps to dump
        """
        # below selected logging level
        if lvl < self.imglog.logging_level:
            self.imglog.clear()
            return
        # logging is being collected for a specific logtype
        elif ImageLogger.accumulate_logging:
            return
        # no hotmaps to log
        elif len(self.imglog.hotmaps) == 0:
            raise MissingHotmapError("No matching was performed in order to be image logged")

        step = self.imglog.printable_step
        self.imglog.dump_hotmap("imglog%s-3hotmap-1full.png" % step,
                                self.imglog.hotmaps[0])
        self.imglog.dump_hotmap("imglog%s-3hotmap-2filtered.png" % step,
                                self.imglog.hotmaps[1])

        # the final hotmap is named after the last recorded similarity if any
        similarity = self.imglog.similarities[-1] if len(self.imglog.similarities) > 0 else 0.0
        name = "imglog%s-3hotmap-%s.png" % (step, similarity)
        self.imglog.dump_hotmap(name, self.imglog.hotmaps[-1])

        self.imglog.clear()
        ImageLogger.step += 1
class HybridFinder(Finder):
    """
    Match a target through a sequence of differently configured attempts.

    This matcher can work with any other matcher in the background and with
    unique or repeating matchers for each step. If a step fails, the matcher
    tries the next available along the fallback chain or fails if the end of
    the chain is reached.
    """

    def __init__(self, configure=True, synchronize=True):
        """
        Build a hybrid matcher.

        :param bool configure: whether to (re)configure the hybrid backend
        :param bool synchronize: whether to create the default matcher right away
        """
        super(HybridFinder, self).__init__(configure=False, synchronize=False)

        # every backend that the synchronization step is able to instantiate
        self.categories["hybrid"] = "hybrid_methods"
        self.algorithms["hybrid_methods"] = ("autopy", "contour", "template", "feature",
                                             "cascade", "text", "tempfeat", "deep")

        # other attributes
        self.matcher = None

        # additional preparation
        if configure:
            self.__configure_backend(reset=True)
        if synchronize:
            self.__synchronize_backend(reset=False)

    def __configure_backend(self, backend=None, category="hybrid", reset=False):
        """
        Select the default matcher backend for steps without own settings.

        :param backend: name of the backend or None for the globally configured one
        :type backend: str or None
        :param str category: category of the configured backend
        :param bool reset: whether to also reset the base configuration
        :raises: :py:class:`UnsupportedBackendError` if the category or
                 the backend is not supported
        """
        if category != "hybrid":
            raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
        if reset:
            # backends are the same as the ones for the base class
            super(HybridFinder, self).configure_backend(backend=backend, reset=True)
        if backend is None:
            backend = GlobalConfig.hybrid_match_backend
        if backend not in self.algorithms[self.categories[category]]:
            raise UnsupportedBackendError("Backend '%s' is not among the supported ones: "
                                          "%s" % (backend, self.algorithms[self.categories[category]]))

        self.params[category] = {}
        self.params[category]["backend"] = backend

    def __synchronize_backend(self, backend=None, category="hybrid", reset=False):
        """
        Instantiate the default matcher for the configured backend.

        :param backend: expected backend name or None to use the configured one
        :type backend: str or None
        :param str category: category of the synchronized backend
        :param bool reset: whether to also synchronize the base configuration
        :raises: :py:class:`UnsupportedBackendError` if the category is not "hybrid"
        :raises: :py:class:`UninitializedBackendError` if another backend is configured
        """
        if category != "hybrid":
            raise UnsupportedBackendError("Backend category '%s' is not supported" % category)
        if reset:
            super(HybridFinder, self).synchronize_backend("hybrid", reset=True)
        if backend is not None and self.params[category]["backend"] != backend:
            raise UninitializedBackendError("Backend '%s' has not been configured yet" % backend)
        backend = self.params[category]["backend"]

        # default matcher in case of a simple chain without own matching config
        matcher_types = {"autopy": AutoPyFinder,
                         "contour": ContourFinder,
                         "template": TemplateFinder,
                         "feature": FeatureFinder,
                         "cascade": CascadeFinder,
                         "text": TextFinder,
                         "tempfeat": TemplateFeatureFinder,
                         "deep": DeepFinder}
        matcher_type = matcher_types.get(backend)
        # an unknown backend leaves the current matcher untouched
        if matcher_type is not None:
            self.matcher = matcher_type()

    def synchronize_backend(self, backend=None, category="hybrid", reset=False):
        """
        Custom implementation of the base method.

        See base method for details.
        """
        self.__synchronize_backend(backend, category, reset)

    def find(self, needle, haystack):
        """
        Custom implementation of the base method.

        See base method for details.

        :returns: matches of the first successful step or an empty list
        """
        try:
            iter(needle)
        except TypeError:
            # one step chains can be of any target type
            log.debug("Defaulting to one step chain %s", needle)
            needle = [needle]

        for step_needle in needle:
            # steps without own settings (or with hybrid ones) use the default matcher
            if not step_needle.use_own_settings or isinstance(step_needle.match_settings, HybridFinder):
                matcher = self.matcher
            else:
                matcher = step_needle.match_settings
            matches = matcher.find(step_needle, haystack)
            # the first successful step ends the fallback chain
            if len(matches) > 0:
                return matches

        return []