talentmap_api/integrations/synchronization_helpers.py
File `synchronization_helpers.py` has 427 lines of code (exceeds 250 allowed). Consider refactoring.'''This file contains a variety of helper functions for data synchronization''' from requests import Session import loggingimport osimport re import zeepfrom zeep.transports import Transport from dateutil.relativedelta import relativedelta import defusedxml.lxml as ET from django.conf import settings from talentmap_api.common.common_helpers import ensure_date, safe_navigationfrom talentmap_api.common.xml_helpers import parse_boolean, parse_date, get_nested_tag, xml_etree_to_dict, set_foreign_key_by_filters from talentmap_api.settings import get_delineated_environment_variable from talentmap_api.bidding.models import BiddingStatusfrom talentmap_api.position.models import Assignmentfrom talentmap_api.language.models import Proficiencyfrom talentmap_api.user_profile.models import SavedSearch logger = logging.getLogger(__name__) Function `get_soap_client` has a Cognitive Complexity of 12 (exceeds 7 allowed). Consider refactoring.def get_soap_client(cert=None, soap_function="", test=False): ''' Creates a properly initialized SOAP client, with unit testing support Args: - cert (str) - The location of the certificate to verify self-signed certificates (optional) - soap_function (str) - The name of the function that the loader will look for (optional unless testing) - test (bool) - Whether this is a testing SOAP client Returns: - client (Object) - The SOAP client ''' client = None if test: logger.info("Creating mock SOAP client for testing") # Create an anonymous object client = type('soapclient', (object,), {}) client.service = type('soapclientserivce', (object,), {}) # Add the test function def unit_test_function(self, **kwargs): # Get the value from kwargs for the RequestName requestname = kwargs.get("RequestName") paginationstartkey = kwargs.get("PaginationStartKey", "") parser = ET._etree.XMLParser(recover=True) # Load the soap integration test data for that request name xml = os.path.join(settings.BASE_DIR, 'talentmap_api', 'data', 'test_data', 'soap_integration', f'empty.xml') if paginationstartkey == "": xml = os.path.join(settings.BASE_DIR, 'talentmap_api', 'data', 'test_data', 'soap_integration', f'{requestname}.xml') xml_tree = ET.parse(xml, parser) return xml_tree # Bind the unit test function; Ash nazg thrakatulûk agh burzum-ishi krimpatul setattr(client.service, soap_function, unit_test_function.__get__(client.service, client.service.__class__)) else: # pragma: no cover # Initialize transport layer session = Session() # Get our Synchronization headers # This will search for any environment variables called DJANGO_SYNCHRONIZATION_HEADER_xxxx, and parse it as a Header/Value pair headers = {} env_sync_headers = [os.environ[key] for key in os.environ.keys() if key[:29] == "DJANGO_SYNCHRONIZATION_HEADER"] for header in env_sync_headers: # pragma: no cover split = header.split("=") headers[split[0]] = split[1] logger.info(f"Setting Synchronization header\t\t{split[0]}: {split[1]}") session.headers.update(headers) # Attempt to get the cert location cert = get_delineated_environment_variable('WSDL_SSL_CERT', None) if cert: logger.info(f'Setting SSL verification cert to {cert}') session.verify = cert else: logger.info(f'Ignoring self-signed certification errors.') session.verify = False transport = Transport(session=session) # Get the WSDL location wsdl_location = get_delineated_environment_variable('WSDL_LOCATION') logger.info(f'Initializing client with WSDL: {wsdl_location}') client = zeep.Client(wsdl=wsdl_location, transport=transport) # Get our Synchronization namespace overrides # This will search for any environment variables called DJANGO_SOAP_NS_OVERRIDE_xxxx, and parse it as a Header/Value pair soap_ns_overrides = [os.environ[key] for key in os.environ.keys() if key[:23] == "DJANGO_SOAP_NS_OVERRIDE"] logger.info(f"Current namespace mapping {client.wsdl.types.prefix_map}") for override in soap_ns_overrides: # pragma: no cover split = override.split("=") logger.info(f"Setting namespace override header\t\t{split[1]} -> {split[0]}") client.set_ns_prefix(split[1], client.wsdl.types.prefix_map[split[0]]) logger.info(f"New namespace mapping {client.wsdl.types.prefix_map}") return client def generate_soap_header(tag): ''' Generates and returns an XSD complex type representing the requested header ''' return zeep.xsd.Element( tag, zeep.xsd.String() ) def mode_skills(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "skill", "MaximumOutputRows": 1000, "Version": "0.01", "DataFormat": "XML", "InputParameters": "<skills><skill></skill></skills>" } # Response parsing data instance_tag = "skill" collision_field = "code" tag_map = { "code": "code", "description": "description" } return (soap_arguments, instance_tag, tag_map, collision_field, None, None) def mode_grade(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "grade", "MaximumOutputRows": 1000, "Version": "0.01", "DataFormat": "XML", "InputParameters": "<grades><grade></grade></grades>" } # Response parsing data instance_tag = "grade" collision_field = "code" tag_map = { "code": "code", } return (soap_arguments, instance_tag, tag_map, collision_field, None, None) def mode_tods(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "tods", "MaximumOutputRows": 1000, "Version": "0.01", "DataFormat": "XML", "InputParameters": "<tods><tod></tod></tods>" } # Response parsing data instance_tag = "tod" collision_field = "code" tag_map = { "code": "code", "short_description": "short_description", "long_description": lambda instance, item: setattr(instance, "long_description", re.sub('&', '&', item.text).strip()), "months": "months", "status": "_status", "is_active": parse_boolean("is_active") } return (soap_arguments, instance_tag, tag_map, collision_field, None, None) def mode_organizations(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "organization", "MaximumOutputRows": 1000, "Version": "0.01", "DataFormat": "XML", "InputParameters": "<organizations><organization></organization></organizations>" } # Response parsing data instance_tag = "organization" collision_field = "code" tag_map = { "code": "code", "short_description": "short_description", "long_description": "long_description", "parent_organization": "_parent_organization_code", "bureau_organization": "_parent_bureau_code", "city_code": "_location_code", "is_bureau": parse_boolean("is_bureau"), "is_regional": parse_boolean("is_regional") } return (soap_arguments, instance_tag, tag_map, collision_field, None, None) def mode_languages(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "language", "MaximumOutputRows": 1000, "Version": "0.01", "DataFormat": "XML", "InputParameters": "<languages><language></language></languages>" } # Response parsing data instance_tag = "language" collision_field = "code" tag_map = { "code": "code", "long_description": "long_description", "short_description": "short_description", } def post_load_function(model, new_ids, updated_ids): # We need to ensure all appropriate proficiencies are existant # as there is no current SOAP synchronization endpoint for these Proficiency.create_defaults() return (soap_arguments, instance_tag, tag_map, collision_field, post_load_function, None) def mode_countries(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "country", "MaximumOutputRows": 1000, "Version": "0.01", "DataFormat": "XML", "InputParameters": "<countries><country></country></countries>" } # Response parsing data instance_tag = "country" collision_field = "code" tag_map = tag_map = { "code": "code", "name": "name", "short_name": "short_name", "short_code": "short_code", "location_prefix": "location_prefix" } return (soap_arguments, instance_tag, tag_map, collision_field, None, None) def mode_locations(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "location", "MaximumOutputRows": 1000, "Version": "0.01", "DataFormat": "XML", "InputParameters": "<locations><location></location></locations>" } # Response parsing data instance_tag = "location" collision_field = "code" tag_map = { "code": "code", "city": "city", "state": "state", "country": "_country" } return (soap_arguments, instance_tag, tag_map, collision_field, None, None) def mode_posts(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "orgpost", "MaximumOutputRows": 1000, "Version": "0.01", "DataFormat": "XML", "InputParameters": "<orgposts><orgpost></orgpost></orgposts>" } # Response parsing data instance_tag = "orgpost" collision_field = "_location_code" tag_map = { "location_code": "_location_code", "tod_code": set_foreign_key_by_filters("tour_of_duty", "code", "__icontains"), "cost_of_living_adjustment": "cost_of_living_adjustment", "differential_rate": "differential_rate", "rest_relaxation_point": "rest_relaxation_point", "danger_pay": "danger_pay", "has_consumable_allowance": parse_boolean("has_consumable_allowance"), "has_service_needs_differential": parse_boolean("has_service_needs_differential"), } return (soap_arguments, instance_tag, tag_map, collision_field, None, None) def mode_capsule_descriptions(last_updated_date=None): # Set the appropriate use_last_updated string use_last_updated_string = "" if last_updated_date is not None: use_last_updated_string = f"<LAST_DATE_UPDATED>{last_updated_date}</LAST_DATE_UPDATED>" # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "positioncapsule", "MaximumOutputRows": 100, "Version": "0.01", "DataFormat": "XML", "InputParameters": f"<positionCapsules><positionCapsule>{use_last_updated_string}</positionCapsule></positionCapsules>" } # Response parsing data instance_tag = "positionCapsule" collision_field = "_pos_seq_num" tag_map = { "POSITION_ID": "_pos_seq_num", "CONTENT": "content", "DATE_CREATED": parse_date("date_created"), "DATE_UPDATED": parse_date("date_updated"), } return (soap_arguments, instance_tag, tag_map, collision_field, None, None) def mode_positions(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "position", "MaximumOutputRows": 1000, "Version": "0.01", "DataFormat": "XML", "InputParameters": "<positions><position></position></positions>" } # Response parsing data instance_tag = "position" collision_field = "_seq_num" tag_map = { "pos_id": "_seq_num", "position_number": "position_number", "title": "title", "org_code": "_org_code", "bureau_code": "_bureau_code", "skill_code": "_skill_code", "is_overseas": parse_boolean("is_overseas", ['O']), "grade": "_grade_code", "tod_code": set_foreign_key_by_filters("tour_of_duty", "code", "__icontains"), "language_code_1": "_language_1_code", "language_code_2": "_language_2_code", "location_code": "_location_code", "spoken_proficiency_1": "_language_1_spoken_proficiency_code", "reading_proficiency_1": "_language_1_reading_proficiency_code", "spoken_proficiency_2": "_language_2_spoken_proficiency_code", "reading_proficiency_2": "_language_2_reading_proficiency_code", "create_date": parse_date("create_date"), "update_date": parse_date("update_date"), "effective_date": parse_date("effective_date"), } def post_load_function(model, new_ids, updated_ids): # If we have any new or updated positions, update saved search counts if len(new_ids) + len(updated_ids) > 0: SavedSearch.update_counts_for_endpoint(endpoint='position', contains=True) return (soap_arguments, instance_tag, tag_map, collision_field, post_load_function, None) def mode_skill_cones(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "jobcategoryskill", "MaximumOutputRows": 1000, "Version": "0.02", "DataFormat": "XML", "InputParameters": "<jobCategories><jobCategory></jobCategory></jobCategories>" } # Response parsing data instance_tag = "jobCategory" collision_field = "_id" tag_map = { "JC_ID": "_id", "JC_NM_TXT": "name", "skills": get_nested_tag("_skill_codes", "code", many=True) } return (soap_arguments, instance_tag, tag_map, collision_field, None, None) Function `mode_cycles` has a Cognitive Complexity of 10 (exceeds 7 allowed). Consider refactoring.def mode_cycles(last_updated_date=None): # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "cycle", "MaximumOutputRows": 1000, "Version": "0.02", "DataFormat": "XML", "InputParameters": "<cycles><cycle></cycle></cycles>" } # Response parsing data instance_tag = "cycle" collision_field = "_id" tag_map = { "id": "_id", "name": "name", "category_code": "_category_code" } def override_loading_method(loader, tag, new_instances, updated_instances): # If our cycle exists, clear its position numbers xml_dict = xml_etree_to_dict(tag) extant_cycle = loader.model.objects.filter(_id=xml_dict['id']).first() if extant_cycle: extant_cycle._positions_seq_nums.clear() instance, updated = loader.default_xml_action(tag, new_instances, updated_instances) # Find the dates for this cycle for date in xml_dict["children"]: if date["DATA_TYPE"] == "CYCLE": instance.cycle_start_date = ensure_date(date["BEGIN_DATE"], utc_offset=-5) instance.cycle_end_date = ensure_date(date["END_DATE"], utc_offset=-5) elif date["DATA_TYPE"] == "BIDDUE": instance.cycle_deadline_date = ensure_date(date["BEGIN_DATE"], utc_offset=-5) if updated: instance.save() return (soap_arguments, instance_tag, tag_map, collision_field, None, override_loading_method) Function `mode_cycle_positions` has a Cognitive Complexity of 33 (exceeds 7 allowed). Consider refactoring.def mode_cycle_positions(last_updated_date=None): # Set the appropriate use_last_updated string use_last_updated_string = "" if last_updated_date is not None: use_last_updated_string = f"<LAST_DATE_UPDATED>{last_updated_date}</LAST_DATE_UPDATED>" # Request data soap_arguments = { "RequestorID": "TalentMAP", "Action": "GET", "RequestName": "availableposition", "MaximumOutputRows": 1000, "Version": "0.01", "DataFormat": "XML", "InputParameters": f"<availablePositions><availablePosition>{use_last_updated_string}</availablePosition></availablePositions>" } # Response parsing data instance_tag = "availablePosition" collision_field = "" tag_map = { } def post_load_function(model, new_ids, updated_ids): # If we have any new or updated positions, update saved search counts if len(new_ids) + len(updated_ids) > 0: SavedSearch.update_counts_for_endpoint(endpoint='position', contains=True) def override_loading_method(loader, tag, new_instances, updated_instances): data = xml_etree_to_dict(tag) # Find our matching bidcycle bc = loader.model.objects.filter(_id=data["CYCLE_ID"]).first() if bc: updated_instances.append(bc) bc._positions_seq_nums.append(data["POSITION_ID"]) bc.save() position = loader.model.positions.field.related_model.objects.filter(_seq_num=data["POSITION_ID"]).first() if position: updated_instances.append(position) bidding_status, _ = BiddingStatus.objects.get_or_create(bidcycle=bc, position=position) bidding_status.status_code = data["STATUS_CODE"] bidding_status.status = data["STATUS"] bidding_status.save() position.effective_date = ensure_date(data["DATE_UPDATED"], utc_offset=-5) position.posted_date = ensure_date(data["CP_POST_DT"], utc_offset=-5) position.save() if "TED" in data.keys(): ted = ensure_date(data["TED"], utc_offset=-5) tod = position.tour_of_duty if not tod: tod = safe_navigation(position, "post.tour_of_duty") if ted and tod and tod.months: start_date = ted - relativedelta(months=tod.months) if not position.current_assignment: Assignment.objects.create(position=position, start_date=start_date, tour_of_duty=tod, status="active") else: position.current_assignment.start_date = start_date position.current_assignment.tour_of_duty = tod position.current_assignment.save() elif ted: logger.warning(f"Attepting to set position {position} TED to {data['TED']} but no position or post TOD is available - start date will not be set") if not position.current_assignment: Assignment.objects.create(position=position, estimated_end_date=ted, status="active") else: position.current_assignment.estimated_end_date = ted position.current_assignment.state_date = None position.current_assignment.tour_of_duty = None position.current_assignment.save() else: logger.warning(f"Attempting to set position {position} TED, but TED is {ted}") return (soap_arguments, instance_tag, tag_map, collision_field, post_load_function, override_loading_method) # Model helper maps and return functionsMODEL_HELPER_MAP = { "position.Skill": [mode_skills], "position.SkillCone": [mode_skill_cones], "position.Grade": [mode_grade], "position.CapsuleDescription": [mode_capsule_descriptions], "organization.TourOfDuty": [mode_tods], "organization.Organization": [mode_organizations], "organization.Country": [mode_countries], "organization.Location": [mode_locations], "organization.Post": [mode_posts], "language.Language": [mode_languages], "position.Position": [mode_positions], "bidding.BidCycle": [mode_cycles, mode_cycle_positions],} def get_synchronization_information(model): return iter(MODEL_HELPER_MAP[model])