Skip to content

Commit

Permalink
fix escaping logic to handle nested calls to populate_with_tokens (#503)
Browse files Browse the repository at this point in the history
fix escaping logic to handle nested calls to populate_with_tokens
  • Loading branch information
stlava authored May 26, 2020
1 parent 6e08d0b commit 6348ff9
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 32 deletions.
28 changes: 20 additions & 8 deletions kingpin/actors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ class BaseActor(object):
# second 'global runtime context object'.
strict_init_context = True

# Controls whether to remove escape characters from tokens that have been
# escaped.
remove_escape_sequence = True

def __init__(self, desc=None, options={}, dry=False, warn_on_failure=False,
condition=True, init_context={}, init_tokens={},
timeout=None):
Expand Down Expand Up @@ -149,17 +153,21 @@ def __init__(self, desc=None, options={}, dry=False, warn_on_failure=False,

# strict about this -- but in the future, when we have a
# runtime_context object, we may loosen this restriction).
self._fill_in_contexts(context=self._init_context,
strict=self.strict_init_context)
self._fill_in_contexts(
context=self._init_context,
strict=self.strict_init_context,
remove_escape_sequence=self.remove_escape_sequence)

self._setup_log()
self._setup_defaults()
self._validate_options() # Relies on _setup_log() above

# Fill in any options with the supplied initialization context. Be
self.log.debug('Initialized (warn_on_failure=%s, '
'strict_init_context=%s)' %
(warn_on_failure, self.strict_init_context))
'strict_init_context=%s,'
'remove_escape_sequence=%s)' %
(warn_on_failure, self.strict_init_context,
self.remove_escape_sequence))

def __repr__(self):
"""Returns a nice name/description of the actor.
Expand Down Expand Up @@ -373,7 +381,8 @@ def _check_condition(self):
self._condition, check))
return check

def _fill_in_contexts(self, context={}, strict=True):
def _fill_in_contexts(self, context={}, strict=True,
remove_escape_sequence=True):
"""Parses self._options and updates it with the supplied context.
Parses the objects self._options dict (by converting it into a JSON
Expand All @@ -395,7 +404,8 @@ def _fill_in_contexts(self, context={}, strict=True):
context,
self.left_context_separator,
self.right_context_separator,
strict=strict)
strict=strict,
remove_escape_sequence=remove_escape_sequence)
except LookupError as e:
msg = 'Context for description failed: %s' % e
raise exceptions.InvalidOptions(msg)
Expand All @@ -407,7 +417,8 @@ def _fill_in_contexts(self, context={}, strict=True):
context,
self.left_context_separator,
self.right_context_separator,
strict=strict)
strict=strict,
remove_escape_sequence=remove_escape_sequence)
except LookupError as e:
msg = 'Context for condition failed: %s' % e
raise exceptions.InvalidOptions(msg)
Expand All @@ -426,7 +437,8 @@ def _fill_in_contexts(self, context={}, strict=True):
self.left_context_separator,
self.right_context_separator,
strict=strict,
escape_sequence='\\\\')
escape_sequence='\\\\',
remove_escape_sequence=remove_escape_sequence)
except LookupError as e:
msg = 'Context for options failed: %s' % e
raise exceptions.InvalidOptions(msg)
Expand Down
5 changes: 5 additions & 0 deletions kingpin/actors/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ class BaseGroupActor(base.BaseActor):
# the moment that this actor is instantiated.
strict_init_context = False

# Do not remove remove escape sequence from escaped tokens. This will be
# done later by another actor. Otherwise we risk remove the escapes and
# failing because the token isn't found by a sub actor.
remove_escape_sequence = False

def __init__(self, *args, **kwargs):
"""Initializes all of the sub actors.
Expand Down
9 changes: 9 additions & 0 deletions kingpin/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ def test_populate_with_escape_non_strict(self):
result = utils.populate_with_tokens(string, tokens, strict=False)
self.assertEquals(result, expect)

def test_populate_with_escape_non_strict_no_escape(self):
tokens = {'UNIT_TEST': 'FOOBAR'}
string = 'Unit \%UNIT_TEST\% Test'
expect = 'Unit \%UNIT_TEST\% Test'
result = utils.populate_with_tokens(string, tokens,
strict=False,
remove_escape_sequence=False)
self.assertEquals(result, expect)

def test_convert_script_to_dict(self):
# Should work with string path to a file
dirname, filename = os.path.split(os.path.abspath(__file__))
Expand Down
45 changes: 21 additions & 24 deletions kingpin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ def tornado_sleep(seconds=1.0):


def populate_with_tokens(string, tokens, left_wrapper='%', right_wrapper='%',
strict=True, escape_sequence='\\'):
strict=True, escape_sequence='\\',
remove_escape_sequence=True):
"""Insert token variables into the string.
Will match any token wrapped in '%'s and replace it with the value of that
Expand All @@ -262,6 +263,8 @@ def populate_with_tokens(string, tokens, left_wrapper='%', right_wrapper='%',
strict: (bool) whether or not to make sure all tokens were replaced
escape_sequence: character string to use as the escape sequence for
left and right wrappers
remove_escape_sequence: (bool) whether or not to remove the escape
sequence if it found. For example \\%FOO\\% would turn into %FOO%.
Example:
export ME=biz
Expand Down Expand Up @@ -300,35 +303,29 @@ def populate_with_tokens(string, tokens, left_wrapper='%', right_wrapper='%',
left_wrapper,
right_wrapper)

# If we aren't strict, we return after replacing escape sequence
if not strict:
# Find text that's between the wrappers and escape sequence and replace
# with just the wrappers and text.
string = re.sub(
escape_pattern,
r'{0}\2{1}'.format(left_wrapper, right_wrapper),
string)
return string

# If we are strict, we check if we missed anything. If we did, raise an
# exception.
missed_tokens = list(set(re.findall(r'%s[\w]+%s' %
(left_wrapper, right_wrapper), string)))
if strict:
missed_tokens = list(set(re.findall(r'%s[\w]+%s' %
(left_wrapper, right_wrapper), string)))

# Remove the escaped tokens from the missing tokens
escape_findings = re.finditer(escape_pattern, string)
escaped_tokens = [m.groups()[0] for m in escape_findings]
missed_tokens = list(set(missed_tokens) - set(escaped_tokens))
# Remove the escaped tokens from the missing tokens
escape_findings = re.finditer(escape_pattern, string)

if missed_tokens:
raise LookupError(
'Found un-matched tokens in JSON string: %s' % missed_tokens)
escaped_tokens = [m.groups()[1] for m in escape_findings]
missed_tokens = list(set(missed_tokens) - set(escaped_tokens))

# Replace the escaped tokens
string = re.sub(escape_pattern,
r'{0}\2{1}'.format(left_wrapper, right_wrapper),
string)
if missed_tokens:
raise LookupError(
'Found un-matched tokens in JSON string: %s' % missed_tokens)

# Find text that's between the wrappers and escape sequence and
# replace with just the wrappers and text.
if remove_escape_sequence:
string = re.sub(
escape_pattern,
r'{0}\2{1}'.format(left_wrapper, right_wrapper),
string)
return string


Expand Down

0 comments on commit 6348ff9

Please sign in to comment.