localstack/services/lambda_/event_source_listeners/utils.py
import json
import logging
import re
from localstack import config
from localstack.aws.api.lambda_ import FilterCriteria
from localstack.services.events.event_ruler import matches_rule
from localstack.utils.strings import first_char_to_lower
LOG = logging.getLogger(__name__)
class InvalidEventPatternException(Exception):
reason: str
def __init__(self, reason=None, message=None) -> None:
self.reason = reason
self.message = message or f"Event pattern is not valid. Reason: {reason}"
def filter_stream_records(records, filters: list[FilterCriteria]):
filtered_records = []
for record in records:
for filter in filters:
for rule in filter["Filters"]:
if config.EVENT_RULE_ENGINE == "java":
event_str = json.dumps(record)
event_pattern_str = rule["Pattern"]
match_result = matches_rule(event_str, event_pattern_str)
else:
filter_pattern: dict[str, any] = json.loads(rule["Pattern"])
match_result = does_match_event(filter_pattern, record)
if match_result:
filtered_records.append(record)
break
return filtered_records
def does_match_event(event_pattern: dict[str, any], event: dict[str, any]) -> bool:
"""Decides whether an event pattern matches an event or not.
Returns True if the `event_pattern` matches the given `event` and False otherwise.
Implements "Amazon EventBridge event patterns":
https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html
Used in different places:
* EventBridge: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html
* Lambda ESM: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html
* EventBridge Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html
* SNS: https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html
Open source AWS rule engine: https://github.com/aws/event-ruler
"""
# TODO: test this conditional: https://coveralls.io/builds/66584026/source?filename=localstack%2Fservices%2Flambda_%2Fevent_source_listeners%2Futils.py#L25
if not event_pattern:
return True
does_match_results = []
for key, value in event_pattern.items():
# check if rule exists in event
event_value = event.get(key) if isinstance(event, dict) else None
does_pattern_match = False
if event_value is not None:
# check if filter rule value is a list (leaf of rule tree) or a dict (recursively call function)
if isinstance(value, list):
if len(value) > 0:
if isinstance(value[0], (str, int)):
does_pattern_match = event_value in value
if isinstance(value[0], dict):
does_pattern_match = verify_dict_filter(event_value, value[0])
else:
LOG.warning(f"Empty lambda filter: {key}")
elif isinstance(value, dict):
does_pattern_match = does_match_event(value, event_value)
else:
# special case 'exists'
def _filter_rule_value_list(val):
if isinstance(val[0], dict):
return not val[0].get("exists", True)
elif val[0] is None:
# support null filter
return True
def _filter_rule_value_dict(val):
for k, v in val.items():
return (
_filter_rule_value_list(val[k])
if isinstance(val[k], list)
else _filter_rule_value_dict(val[k])
)
return True
if isinstance(value, list) and len(value) > 0:
does_pattern_match = _filter_rule_value_list(value)
elif isinstance(value, dict):
# special case 'exists' for S type, e.g. {"S": [{"exists": false}]}
# https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.Lambda.Tutorial2.html
does_pattern_match = _filter_rule_value_dict(value)
does_match_results.append(does_pattern_match)
return all(does_match_results)
def verify_dict_filter(record_value: any, dict_filter: dict[str, any]) -> bool:
# https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax
does_match_filter = False
for key, filter_value in dict_filter.items():
if key == "anything-but":
does_match_filter = record_value not in filter_value
elif key == "numeric":
does_match_filter = handle_numeric_conditions(record_value, filter_value)
elif key == "exists":
does_match_filter = bool(
filter_value
) # exists means that the key exists in the event record
elif key == "prefix":
if not isinstance(record_value, str):
LOG.warning(f"Record Value {record_value} does not seem to be a valid string.")
does_match_filter = isinstance(record_value, str) and record_value.startswith(
str(filter_value)
)
if does_match_filter:
return True
return does_match_filter
def handle_numeric_conditions(
first_operand: int | float, conditions: list[str | int | float]
) -> bool:
"""Implements numeric matching for a given list of conditions.
Example: { "numeric": [ ">", 0, "<=", 5 ] }
Numeric matching works with values that are JSON numbers.
It is limited to values between -5.0e9 and +5.0e9 inclusive, with 15 digits of precision,
or six digits to the right of the decimal point.
https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns-content-based-filtering.html#filtering-numeric-matchinghttps://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns-content-based-filtering.html#filtering-numeric-matching
"""
# Invalid example for uneven list: { "numeric": [ ">", 0, "<" ] }
if len(conditions) % 2 > 0:
raise InvalidEventPatternException("Bad numeric range operator")
if not isinstance(first_operand, (int, float)):
raise InvalidEventPatternException(
f"The value {first_operand} for the numeric comparison {conditions} is not a valid number"
)
for i in range(0, len(conditions), 2):
operator = conditions[i]
second_operand_str = conditions[i + 1]
try:
second_operand = float(second_operand_str)
except ValueError:
raise InvalidEventPatternException(
f"Could not convert filter value {second_operand_str} to a valid number"
) from ValueError
if operator == ">" and not (first_operand > second_operand):
return False
if operator == ">=" and not (first_operand >= second_operand):
return False
if operator == "=" and not (first_operand == second_operand):
return False
if operator == "<" and not (first_operand < second_operand):
return False
if operator == "<=" and not (first_operand <= second_operand):
return False
return True
def contains_list(filter: dict) -> bool:
if isinstance(filter, dict):
for key, value in filter.items():
if isinstance(value, list) and len(value) > 0:
return True
return contains_list(value)
return False
def validate_filters(filter: FilterCriteria) -> bool:
# filter needs to be json serializeable
for rule in filter["Filters"]:
try:
if not (filter_pattern := json.loads(rule["Pattern"])):
return False
return contains_list(filter_pattern)
except json.JSONDecodeError:
return False
# needs to contain on what to filter (some list with citerias)
# https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax
return True
def message_attributes_to_lower(message_attrs):
"""Convert message attribute details (first characters) to lower case (e.g., stringValue, dataType)."""
message_attrs = message_attrs or {}
for _, attr in message_attrs.items():
if not isinstance(attr, dict):
continue
for key, value in dict(attr).items():
attr[first_char_to_lower(key)] = attr.pop(key)
return message_attrs
def event_source_arn_matches(mapped: str, searched: str) -> bool:
if not mapped:
return False
if not searched or mapped == searched:
return True
# Some types of ARNs can end with a path separated by slashes, for
# example the ARN of a DynamoDB stream is tableARN/stream/ID. It's
# a little counterintuitive that a more specific mapped ARN can
# match a less specific ARN on the event, but some integration tests
# rely on it for things like subscribing to a stream and matching an
# event labeled with the table ARN.
if re.match(r"^%s$" % searched, mapped):
return True
if mapped.startswith(searched):
suffix = mapped[len(searched) :]
return suffix[0] == "/"
return False
def has_data_filter_criteria(filters: list[FilterCriteria]) -> bool:
for filter in filters:
for rule in filter.get("Filters", []):
parsed_pattern = json.loads(rule["Pattern"])
if "data" in parsed_pattern:
return True
return False