From 1045f203b5c91ff9f7ea7298096546986036e812 Mon Sep 17 00:00:00 2001 From: Jordi Bagot Date: Tue, 25 Nov 2025 18:16:52 +0100 Subject: [PATCH 1/3] format --- jamf2snipe | 1387 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 1015 insertions(+), 372 deletions(-) diff --git a/jamf2snipe b/jamf2snipe index eb17500..a92da09 100755 --- a/jamf2snipe +++ b/jamf2snipe @@ -33,17 +33,17 @@ version = "1.0.7" validsubset = [ - "general", - "location", - "purchasing", - "peripherals", - "hardware", - "certificates", - "software", - "extension_attributes", - "groups_accounts", - "iphones", - "configuration_profiles" + "general", + "location", + "purchasing", + "peripherals", + "hardware", + "certificates", + "software", + "extension_attributes", + "groups_accounts", + "iphones", + "configuration_profiles", ] @@ -60,24 +60,96 @@ from urllib3 import Retry # Set us up for using runtime arguments by defining them. runtimeargs = argparse.ArgumentParser() -runtimeargs.add_argument("-v", "--verbose", help="Sets the logging level to INFO and gives you a better idea of what the script is doing.", action="store_true") -runtimeargs.add_argument("--auto_incrementing", help="You can use this if you have auto-incrementing enabled in your snipe instance to utilize that instead of adding the Jamf ID for the asset tag.", action="store_true") -runtimeargs.add_argument("--dryrun", help="This checks your config and tries to contact both the JAMFPro and Snipe-it instances, and will generate the assets for debugging, but not update or sync anything but exits before updating or syncing any assets.", action="store_true") -runtimeargs.add_argument("--connection_test", help="This checks your config and tries to contact both the JAMFPro and Snipe-it instances.", action="store_true") -runtimeargs.add_argument("-d", "--debug", help="Sets logging to include additional DEBUG messages.", action="store_true") -runtimeargs.add_argument("--do_not_update_jamf", help="Does not update Jamf with the asset tags stored in Snipe.", action="store_false") -runtimeargs.add_argument('--do_not_verify_ssl', help="Skips SSL verification for all requests. Helpful when you use self-signed certificate.", action="store_false") -runtimeargs.add_argument("-r", "--ratelimited", help="Puts a half second delay between API calls to adhere to the standard 120/minute rate limit", action="store_true") -runtimeargs.add_argument("-f", "--force", help="Updates the Snipe asset with information from Jamf every time, despite what the timestamps indicate.", action="store_true") -runtimeargs.add_argument("--version", help="Prints the version and exits.", action="store_true") +runtimeargs.add_argument( + "-v", + "--verbose", + help="Sets the logging level to INFO and gives you a better idea of what the script is doing.", + action="store_true", +) +runtimeargs.add_argument( + "--auto_incrementing", + help="You can use this if you have auto-incrementing enabled in your snipe instance to utilize that instead of adding the Jamf ID for the asset tag.", + action="store_true", +) +runtimeargs.add_argument( + "--dryrun", + help="This checks your config and tries to contact both the JAMFPro and Snipe-it instances, and will generate the assets for debugging, but not update or sync anything but exits before updating or syncing any assets.", + action="store_true", +) +runtimeargs.add_argument( + "--connection_test", + help="This checks your config and tries to contact both the JAMFPro and Snipe-it instances.", + action="store_true", +) +runtimeargs.add_argument( + "-d", + "--debug", + help="Sets logging to include additional DEBUG messages.", + action="store_true", +) +runtimeargs.add_argument( + "--do_not_update_jamf", + help="Does not update Jamf with the asset tags stored in Snipe.", + action="store_false", +) +runtimeargs.add_argument( + "--do_not_verify_ssl", + help="Skips SSL verification for all requests. Helpful when you use self-signed certificate.", + action="store_false", +) +runtimeargs.add_argument( + "-r", + "--ratelimited", + help="Puts a half second delay between API calls to adhere to the standard 120/minute rate limit", + action="store_true", +) +runtimeargs.add_argument( + "-f", + "--force", + help="Updates the Snipe asset with information from Jamf every time, despite what the timestamps indicate.", + action="store_true", +) +runtimeargs.add_argument( + "--version", help="Prints the version and exits.", action="store_true" +) user_opts = runtimeargs.add_mutually_exclusive_group() -user_opts.add_argument("-u", "--users", help="Checks out the item to the current user in Jamf if it's not already deployed", action="store_true") -user_opts.add_argument("-ui", "--users_inverse", help="Checks out the item to the current user in Jamf if it's already deployed", action="store_true") -user_opts.add_argument("-uf", "--users_force", help="Checks out the item to the user specified in Jamf no matter what", action="store_true") -runtimeargs.add_argument("-uns", "--users_no_search", help="Doesn't search for any users if the specified fields in Jamf and Snipe don't match. (case insensitive)", action="store_true") +user_opts.add_argument( + "-u", + "--users", + help="Checks out the item to the current user in Jamf if it's not already deployed", + action="store_true", +) +user_opts.add_argument( + "-ui", + "--users_inverse", + help="Checks out the item to the current user in Jamf if it's already deployed", + action="store_true", +) +user_opts.add_argument( + "-uf", + "--users_force", + help="Checks out the item to the user specified in Jamf no matter what", + action="store_true", +) +runtimeargs.add_argument( + "-uns", + "--users_no_search", + help="Doesn't search for any users if the specified fields in Jamf and Snipe don't match. (case insensitive)", + action="store_true", +) type_opts = runtimeargs.add_mutually_exclusive_group() -type_opts.add_argument("-m", "--mobiles", help="Runs against the Jamf mobiles endpoint only.", action="store_true") -type_opts.add_argument("-c", "--computers", help="Runs against the Jamf computers endpoint only.", action="store_true") +type_opts.add_argument( + "-m", + "--mobiles", + help="Runs against the Jamf mobiles endpoint only.", + action="store_true", +) +type_opts.add_argument( + "-c", + "--computers", + help="Runs against the Jamf computers endpoint only.", + action="store_true", +) user_args = runtimeargs.parse_args() if user_args.version: @@ -94,17 +166,25 @@ else: # Notify users if we're doing a connection test. if user_args.connection_test and user_args.dryrun: - logging.error("You can't use --connection_test and --dryrun at the same time. Please choose one or the other.") + logging.error( + "You can't use --connection_test and --dryrun at the same time. Please choose one or the other." + ) raise SystemExit("Error: Invalid runtime arguments - Exiting.") if user_args.connection_test and user_args.force: - logging.error("You can't use --connection_test and --force at the same time. Please choose one or the other.") + logging.error( + "You can't use --connection_test and --force at the same time. Please choose one or the other." + ) raise SystemExit("Error: Invalid runtime arguments - Exiting.") if user_args.connection_test: - print("Connection test: Starting jamf2snipe with a connection test where we'll try to contact both the JAMFPro and Snipe-it instances.") + print( + "Connection test: Starting jamf2snipe with a connection test where we'll try to contact both the JAMFPro and Snipe-it instances." + ) # Notify users if we're doing a dry run. if user_args.dryrun and user_args.force: - print("Running a dry run with force enabled. This will generate assets for debugging, but not update or sync anything.") + print( + "Running a dry run with force enabled. This will generate assets for debugging, but not update or sync anything." + ) elif user_args.dryrun: print("Dryrun: Starting jamf2snipe with a dry run where no assets will be updated.") @@ -113,72 +193,100 @@ logging.info("Searching for a valid settings.conf file.") config = configparser.ConfigParser() logging.debug("Checking for a settings.conf in /opt/jamf2snipe ...") config.read("/opt/jamf2snipe/settings.conf") -if 'snipe-it' not in set(config): - logging.debug("No valid config found in: /opt Checking for a settings.conf in /etc/jamf2snipe ...") - config.read('/etc/jamf2snipe/settings.conf') -if 'snipe-it' not in set(config): - logging.debug("No valid config found in /etc Checking for a settings.conf in current directory ...") +if "snipe-it" not in set(config): + logging.debug( + "No valid config found in: /opt Checking for a settings.conf in /etc/jamf2snipe ..." + ) + config.read("/etc/jamf2snipe/settings.conf") +if "snipe-it" not in set(config): + logging.debug( + "No valid config found in /etc Checking for a settings.conf in current directory ..." + ) config.read("settings.conf") -if 'snipe-it' not in set(config): +if "snipe-it" not in set(config): logging.debug("No valid config found in current folder.") - logging.error("No valid settings.conf was found. We'll need to quit while you figure out where the settings are at. You can check the README for valid locations.") + logging.error( + "No valid settings.conf was found. We'll need to quit while you figure out where the settings are at. You can check the README for valid locations." + ) raise SystemExit("Error: No valid settings.conf - Exiting.") -logging.info("Great, we found a settings file. Let's get started by parsing all of the settings.") +logging.info( + "Great, we found a settings file. Let's get started by parsing all of the settings." +) # While setting the variables, use a try loop so we can raise a error if something goes wrong. try: # Set some Variables from the settings.conf: # This is the address, cname, or FQDN for your JamfPro instance. logging.info("Setting the Jamf Pro Base url.") - jamfpro_base = config['jamf']['url'] + jamfpro_base = config["jamf"]["url"] logging.debug("The configured Jamf Pro base url is: {}".format(jamfpro_base)) logging.info("Setting the username to request an api key.") - jamf_user = config['jamf']['username'] + jamf_user = config["jamf"]["username"] logging.debug("The user you provided for Jamf is: {}".format(jamf_user)) logging.info("Setting the password to request an api key.") - jamf_password = config['jamf']['password'] + jamf_password = config["jamf"]["password"] logging.debug("The password you provided for Jamf is: {}".format(jamf_user)) # This is the address, cname, or FQDN for your snipe-it instance. logging.info("Setting the base URL for SnipeIT.") - snipe_base = config['snipe-it']['url'] + snipe_base = config["snipe-it"]["url"] logging.debug("The configured Snipe-IT base url is: {}".format(snipe_base)) logging.info("Setting the API key for SnipeIT.") - snipe_apiKey = config['snipe-it']['apikey'] + snipe_apiKey = config["snipe-it"]["apikey"] logging.debug("The API key you provided for Snipe is: {}".format(snipe_apiKey)) logging.info("Setting the default status for SnipeIT assets.") - defaultStatus = config['snipe-it']['defaultStatus'] - logging.debug("The default status we'll be setting updated assets to is: {} (I sure hope this is a number or something is probably wrong)".format(defaultStatus)) + defaultStatus = config["snipe-it"]["defaultStatus"] + logging.debug( + "The default status we'll be setting updated assets to is: {} (I sure hope this is a number or something is probably wrong)".format( + defaultStatus + ) + ) logging.info("Setting the Snipe ID for Apple Manufacturer devices.") - apple_manufacturer_id = config['snipe-it']['manufacturer_id'] - logging.debug("The configured manufacturer ID for Apple computers in snipe is: {} (Pretty sure this needs to be a number too)".format(apple_manufacturer_id)) + apple_manufacturer_id = config["snipe-it"]["manufacturer_id"] + logging.debug( + "The configured manufacturer ID for Apple computers in snipe is: {} (Pretty sure this needs to be a number too)".format( + apple_manufacturer_id + ) + ) except: - logging.error("Some of the required settings from the settings.conf were missing or invalid. Re-run jamf2snipe with the --verbose or --debug flag to get more details on which setting is missing or misconfigured.") + logging.error( + "Some of the required settings from the settings.conf were missing or invalid. Re-run jamf2snipe with the --verbose or --debug flag to get more details on which setting is missing or misconfigured." + ) raise SystemExit("Error: Missing or invalid settings in settings.conf - Exiting.") # Check the config file for correct headers # Do some tests to see if the user has updated their settings.conf file SETTINGS_CORRECT = True -if 'api-mapping' in config: - logging.error("Looks like you're using the old method for api-mapping. Please use computers-api-mapping and mobile_devices-api-mapping.") +if "api-mapping" in config: + logging.error( + "Looks like you're using the old method for api-mapping. Please use computers-api-mapping and mobile_devices-api-mapping." + ) SETTINGS_CORRECT = False -if not 'user-mapping' in config and (user_args.users or user_args.users_force or user_args.users_inverse): - logging.error("""You've chosen to check out assets to users in some capacity using a cmdline switch, but not specified how you want to - search Snipe IT for the users from Jamf. Make sure you have a 'user-mapping' section in your settings.conf file.""") +if not "user-mapping" in config and ( + user_args.users or user_args.users_force or user_args.users_inverse +): + logging.error( + """You've chosen to check out assets to users in some capacity using a cmdline switch, but not specified how you want to + search Snipe IT for the users from Jamf. Make sure you have a 'user-mapping' section in your settings.conf file.""" + ) SETTINGS_CORRECT = False if snipe_base.endswith("/"): - logging.error("""You have a trailing forward slash in the snipe url. Please remove it.""") + logging.error( + """You have a trailing forward slash in the snipe url. Please remove it.""" + ) SETTINGS_CORRECT = False if jamfpro_base.endswith("/"): - logging.error("""You have a trailing forward slash in the JamfPro url. Please remove it.""") + logging.error( + """You have a trailing forward slash in the JamfPro url. Please remove it.""" + ) SETTINGS_CORRECT = False @@ -186,14 +294,20 @@ if not SETTINGS_CORRECT: raise SystemExit # Check the config file for valid jamf subsets. This is based off the JAMF API and if it's not right we can't map fields over to SNIPE properly. -logging.debug("Checking the settings.conf file for valid JAMF subsets of the JAMF API so mapping can occur properly.") -for key in config['computers-api-mapping']: - jamfsplit = config['computers-api-mapping'][key].split() +logging.debug( + "Checking the settings.conf file for valid JAMF subsets of the JAMF API so mapping can occur properly." +) +for key in config["computers-api-mapping"]: + jamfsplit = config["computers-api-mapping"][key].split() if jamfsplit[0] in validsubset: - logging.info('Found subset {}: Acceptable'.format(jamfsplit[0])) + logging.info("Found subset {}: Acceptable".format(jamfsplit[0])) continue else: - logging.error("Found invalid subset: {} in the settings.conf file.\nThis is not in the acceptable list of subsets. Check your settings.conf\n Valid subsets are: {}".format(jamfsplit[0], ', '.join(validsubset))) + logging.error( + "Found invalid subset: {} in the settings.conf file.\nThis is not in the acceptable list of subsets. Check your settings.conf\n Valid subsets are: {}".format( + jamfsplit[0], ", ".join(validsubset) + ) + ) raise SystemExit("Invalid Subset found in settings.conf") ### Setup Some Functions ### @@ -202,16 +316,25 @@ first_api_call = None # Headers for the API call. logging.info("Creating the headers we'll need for API calls") -jamfbasicheaders = {'Accept': 'application/json','Content-Type':'application/json'} -snipeheaders = {'Authorization': 'Bearer {}'.format(snipe_apiKey),'Accept': 'application/json','Content-Type':'application/json'} -logging.debug('Request headers for JamfPro will be: {}\nRequest headers for Snipe will be: {}'.format(jamfbasicheaders, snipeheaders)) +jamfbasicheaders = {"Accept": "application/json", "Content-Type": "application/json"} +snipeheaders = { + "Authorization": "Bearer {}".format(snipe_apiKey), + "Accept": "application/json", + "Content-Type": "application/json", +} +logging.debug( + "Request headers for JamfPro will be: {}\nRequest headers for Snipe will be: {}".format( + jamfbasicheaders, snipeheaders + ) +) session = Session() retries = Retry( total=3, - allowed_methods={'GET'}, + allowed_methods={"GET"}, ) -session.mount('https://', HTTPAdapter(max_retries=retries)) +session.mount("https://", HTTPAdapter(max_retries=retries)) + # Use Basic Auth to request a Jamf Token. def request_jamf_token(): @@ -223,33 +346,68 @@ def request_jamf_token(): global expires_time token_request_time = time.time() logging.info("Requesting a new token at {}.".format(token_request_time)) - api_url = '{0}/api/v1/auth/token'.format(jamfpro_base) + api_url = "{0}/api/v1/auth/token".format(jamfpro_base) # No hook for this api call. - logging.debug('Calling for a token against: {}\n The username and password can be found earlier in the script.'.format(api_url)) + logging.debug( + "Calling for a token against: {}\n The username and password can be found earlier in the script.".format( + api_url + ) + ) # No hook for this API call. - response = session.post(api_url, auth=(jamf_user, jamf_password), headers=jamfbasicheaders, verify=user_args.do_not_verify_ssl) + response = session.post( + api_url, + auth=(jamf_user, jamf_password), + headers=jamfbasicheaders, + verify=user_args.do_not_verify_ssl, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") jsonresponse = response.json() logging.debug(jsonresponse) # So we have our token and Expires time. Set the expires time globably so we can reset later. try: - expires_time = datetime.datetime.fromisoformat(jsonresponse['expires'].replace("Z", "+00:00")) + expires_time = datetime.datetime.fromisoformat( + jsonresponse["expires"].replace("Z", "+00:00") + ) except: # APIs are awful and Jamf doesn't always send enough ms digits. UGH. try: - expires_time = datetime.datetime.fromisoformat(jsonresponse['expires'].replace("Z", "0+00:00")) + expires_time = datetime.datetime.fromisoformat( + jsonresponse["expires"].replace("Z", "0+00:00") + ) except: - logging.error("Jamf sent a malformed timestamp: {}\n Please feel free to complain to Jamf support.".format(jsonresponse['expires'])) + logging.error( + "Jamf sent a malformed timestamp: {}\n Please feel free to complain to Jamf support.".format( + jsonresponse["expires"] + ) + ) raise SystemExit("Unable to grok Jamf Timestamp - Exiting") - logging.debug("Token expires in: {}".format(expires_time - datetime.datetime.now(datetime.timezone.utc))) + logging.debug( + "Token expires in: {}".format( + expires_time - datetime.datetime.now(datetime.timezone.utc) + ) + ) # The headers are also global, because they get used elsewhere. logging.info("Setting new jamf headers with bearer token") - jamfheaders = {'Authorization': 'Bearer {}'.format(jsonresponse['token']),'Accept': 'application/json','Content-Type':'application/json'} - jamfxmlheaders = {'Authorization': 'Bearer {}'.format(jsonresponse['token']),'Accept': 'application/xml','Content-Type':'application/xml'} - logging.debug('Request headers for JamfPro will be: {}\nRequest headers for Snipe will be: {}'.format(jamfheaders, snipeheaders)) + jamfheaders = { + "Authorization": "Bearer {}".format(jsonresponse["token"]), + "Accept": "application/json", + "Content-Type": "application/json", + } + jamfxmlheaders = { + "Authorization": "Bearer {}".format(jsonresponse["token"]), + "Accept": "application/xml", + "Content-Type": "application/xml", + } + logging.debug( + "Request headers for JamfPro will be: {}\nRequest headers for Snipe will be: {}".format( + jamfheaders, snipeheaders + ) + ) else: - logging.error("Could not obtain a token for use with Jamf's classic API. Please check your username and password.") + logging.error( + "Could not obtain a token for use with Jamf's classic API. Please check your username and password." + ) raise SystemExit("Unable to obtain Jamf Token") @@ -268,7 +426,9 @@ def request_handler(r, *args, **kwargs): # Slow and steady wins the race. Limit all API calls (not just to snipe) to the Rate limit. if user_args.ratelimited: if '"messages":429' in r.text: - logging.warn("Despite respecting the rate limit of Snipe, we've still been limited. Trying again after sleeping for 2 seconds.") + logging.warn( + "Despite respecting the rate limit of Snipe, we've still been limited. Trying again after sleeping for 2 seconds." + ) time.sleep(2) re_req = r.request return session.send(re_req) @@ -276,407 +436,712 @@ def request_handler(r, *args, **kwargs): first_api_call = time.time() time.sleep(0.5) api_count += 1 - time_elapsed = (time.time() - first_api_call) + time_elapsed = time.time() - first_api_call api_rate = api_count / time_elapsed if api_rate > 1.95: sleep_time = 0.5 + (api_rate - 1.95) - logging.debug('Going over snipe rate limit of 120/minute ({}/minute), sleeping for {}'.format(api_rate,sleep_time)) + logging.debug( + "Going over snipe rate limit of 120/minute ({}/minute), sleeping for {}".format( + api_rate, sleep_time + ) + ) time.sleep(sleep_time) - logging.debug("Made {} requests to Snipe IT in {} seconds, with a request being sent every {} seconds".format(api_count, time_elapsed, api_rate)) + logging.debug( + "Made {} requests to Snipe IT in {} seconds, with a request being sent every {} seconds".format( + api_count, time_elapsed, api_rate + ) + ) if '"messages":429' in r.text: logging.error(r.content) - raise SystemExit("We've been rate limited. Use option -r to respect the built in Snipe IT API rate limit of 120/minute.") + raise SystemExit( + "We've been rate limited. Use option -r to respect the built in Snipe IT API rate limit of 120/minute." + ) return r + # Function to make the API call for all JAMF devices def get_jamf_computers(): - api_url = '{0}/JSSResource/computers'.format(jamfpro_base) - logging.debug('Calling for JAMF computers against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.'.format(api_url)) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{0}/JSSResource/computers".format(jamfpro_base) + logging.debug( + "Calling for JAMF computers against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.".format( + api_url + ) + ) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") return response.json() - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of computers Waiting a bit to retry the lookup.'.format(response)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of computers Waiting a bit to retry the lookup.".format( + response + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying lookup...") newresponse = get_jamf_computers() return newresponse else: - logging.warning('Received an invalid status code when trying to retreive JAMF Device list:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Received an invalid status code when trying to retreive JAMF Device list:{} - {}".format( + response.status_code, response.content + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to make the API call for JAMF devices in group def get_jamf_computers_by_group(jamf_id): - api_url = '{0}/JSSResource/computergroups/id/{1}'.format(jamfpro_base, jamf_id) - logging.debug('Calling for JAMF computers against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.'.format(api_url)) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{0}/JSSResource/computergroups/id/{1}".format(jamfpro_base, jamf_id) + logging.debug( + "Calling for JAMF computers against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.".format( + api_url + ) + ) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") jsonresponse = response.json() - logging.debug("Returning: {}".format(jsonresponse['computer_group'])) - return jsonresponse['computer_group'] - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of computers Waiting a bit to retry the lookup.'.format(response)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + logging.debug("Returning: {}".format(jsonresponse["computer_group"])) + return jsonresponse["computer_group"] + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of computers Waiting a bit to retry the lookup.".format( + response + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying lookup...") newresponse = get_jamf_computers_by_group(jamf_id) return newresponse else: - logging.warning('Received an invalid status code when trying to retreive JAMF Device list:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Received an invalid status code when trying to retreive JAMF Device list:{} - {}".format( + response.status_code, response.content + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to make the API call for all JAMF mobile devices def get_jamf_mobiles(): - api_url = '{0}/JSSResource/mobiledevices'.format(jamfpro_base) - logging.debug('Calling for JAMF mobiles against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.'.format(api_url)) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{0}/JSSResource/mobiledevices".format(jamfpro_base) + logging.debug( + "Calling for JAMF mobiles against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.".format( + api_url + ) + ) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") return response.json() - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of mobiles Waiting a bit to retry the lookup.'.format(response)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of mobiles Waiting a bit to retry the lookup.".format( + response + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying lookup...") newresponse = get_jamf_mobiles() return newresponse else: - logging.warning('Received an invalid status code when trying to retreive JAMF Device list:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Received an invalid status code when trying to retreive JAMF Device list:{} - {}".format( + response.status_code, response.content + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to make the API call for all JAMF mobile devices in group def get_jamf_mobiles_by_group(jamf_id): - api_url = '{0}/JSSResource/mobiledevicegroups/id/{1}'.format(jamfpro_base, jamf_id) - logging.debug('Calling for JAMF mobiles against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.'.format(api_url)) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{0}/JSSResource/mobiledevicegroups/id/{1}".format(jamfpro_base, jamf_id) + logging.debug( + "Calling for JAMF mobiles against: {}\n The username, passwords, and headers for this GET requestcan be found near the beginning of the output.".format( + api_url + ) + ) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") jsonresponse = response.json() - logging.debug("Returning: {}".format(jsonresponse['mobile_device_group'])) - return jsonresponse['mobile_device_group'] - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of mobiles Waiting a bit to retry the lookup.'.format(response)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + logging.debug("Returning: {}".format(jsonresponse["mobile_device_group"])) + return jsonresponse["mobile_device_group"] + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to get a list of mobiles Waiting a bit to retry the lookup.".format( + response + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying lookup...") newresponse = get_jamf_mobiles_by_group(jamf_id) return newresponse else: - logging.warning('Received an invalid status code when trying to retreive JAMF Device list:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Received an invalid status code when trying to retreive JAMF Device list:{} - {}".format( + response.status_code, response.content + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to lookup a JAMF asset by id. def search_jamf_asset(jamf_id): api_url = "{}/JSSResource/computers/id/{}".format(jamfpro_base, jamf_id) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") jsonresponse = response.json() - logging.debug("Returning: {}".format(jsonresponse['computer'])) - return jsonresponse['computer'] - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.'.format(response, jamf_id)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + logging.debug("Returning: {}".format(jsonresponse["computer"])) + return jsonresponse["computer"] + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.".format( + response, jamf_id + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying lookup...") newresponse = search_jamf_asset(jamf_id) return newresponse else: - logging.warning('JAMFPro responded with error code:{} when we tried to look up id: {}'.format(response, jamf_id)) + logging.warning( + "JAMFPro responded with error code:{} when we tried to look up id: {}".format( + response, jamf_id + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to lookup a JAMF mobile asset by id. def search_jamf_mobile(jamf_id): api_url = "{}/JSSResource/mobiledevices/id/{}".format(jamfpro_base, jamf_id) - response = session.get(api_url, headers=jamfheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + response = session.get( + api_url, + headers=jamfheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: logging.debug("Got back a valid 200 response code.") jsonresponse = response.json() - logging.debug("Returning: {}".format(jsonresponse['mobile_device'])) - return jsonresponse['mobile_device'] - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.'.format(response, jamf_id)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + logging.debug("Returning: {}".format(jsonresponse["mobile_device"])) + return jsonresponse["mobile_device"] + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.".format( + response, jamf_id + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retyring lookup...") newresponse = search_jamf_asset(jamf_id) return newresponse else: - logging.warning('JAMFPro responded with error code:{} when we tried to look up id: {}'.format(response, jamf_id)) + logging.warning( + "JAMFPro responded with error code:{} when we tried to look up id: {}".format( + response, jamf_id + ) + ) logging.debug("Returning a null value for the function.") return None + # Function to update the asset tag of computers in JAMF with an number passed from Snipe. def update_jamf_asset_tag(jamf_id, asset_tag): if user_args.dryrun: - logging.debug("Would have updated JAMF asset id: {} with asset tag: {}".format(jamf_id, asset_tag)) + logging.debug( + "Would have updated JAMF asset id: {} with asset tag: {}".format( + jamf_id, asset_tag + ) + ) return True api_url = "{}/JSSResource/computers/id/{}".format(jamfpro_base, jamf_id) - payload = """{}{}""".format(jamf_id, asset_tag) - logging.debug('Making Get request against: {}\nPayload for the PUT request is: {}\nThe username, password, and headers can be found near the beginning of the output.'.format(api_url, payload)) - response = session.put(api_url, data=payload, headers=jamfxmlheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + payload = """{}{}""".format( + jamf_id, asset_tag + ) + logging.debug( + "Making Get request against: {}\nPayload for the PUT request is: {}\nThe username, password, and headers can be found near the beginning of the output.".format( + api_url, payload + ) + ) + response = session.put( + api_url, + data=payload, + headers=jamfxmlheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 201: logging.debug("Got a 201 response. Returning: True") return True - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.'.format(response, jamf_id)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.".format( + response, jamf_id + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying update...") newresponse = update_jamf_asset_tag(jamf_id, asset_tag) return newresponse if response.status_code == 200: - logging.debug("Got a 200 response code. Returning the response: {}".format(response)) + logging.debug( + "Got a 200 response code. Returning the response: {}".format(response) + ) return response.json() else: - logging.warning('Got back an error response code:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Got back an error response code:{} - {}".format( + response.status_code, response.content + ) + ) return None + # Function to update the asset tag of mobile devices in JAMF with an number passed from Snipe. def update_jamf_mobiledevice_asset_tag(jamf_id, asset_tag): if user_args.dryrun: - logging.debug("Would have updated JAMF asset id: {} with asset tag: {}".format(jamf_id, asset_tag)) + logging.debug( + "Would have updated JAMF asset id: {} with asset tag: {}".format( + jamf_id, asset_tag + ) + ) return True api_url = "{}/JSSResource/mobiledevices/id/{}".format(jamfpro_base, jamf_id) - payload = """{}{}""".format(jamf_id, asset_tag) - logging.debug('Making Get request against: {}\nPayload for the PUT request is: {}\nThe username, password, and headers can be found near the beginning of the output.'.format(api_url, payload)) - response = session.put(api_url, data=payload, headers=jamfxmlheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + payload = """{}{}""".format( + jamf_id, asset_tag + ) + logging.debug( + "Making Get request against: {}\nPayload for the PUT request is: {}\nThe username, password, and headers can be found near the beginning of the output.".format( + api_url, payload + ) + ) + response = session.put( + api_url, + data=payload, + headers=jamfxmlheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 201: logging.debug("Got a 201 response. Returning: True") return True - elif b'policies.ratelimit.QuotaViolation' in response.content: - logging.info('JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.'.format(response, jamf_id)) - logging.warning('JAMFPro Ratelimit exceeded: pausing ') + elif b"policies.ratelimit.QuotaViolation" in response.content: + logging.info( + "JAMFPro responded with error code: {} - Policy Ratelimit Quota Violation - when we tried to look up id: {} Waiting a bit to retry the lookup.".format( + response, jamf_id + ) + ) + logging.warning("JAMFPro Ratelimit exceeded: pausing ") time.sleep(75) logging.info("Finished waiting. Retrying update...") newresponse = update_jamf_mobiledevice_asset_tag(jamf_id, asset_tag) return newresponse if response.status_code == 200: - logging.debug("Got a 200 response code. Returning the response: {}".format(response)) + logging.debug( + "Got a 200 response code. Returning the response: {}".format(response) + ) return response.json() else: - logging.warning('Got back an error response code:{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Got back an error response code:{} - {}".format( + response.status_code, response.content + ) + ) return None + # Function to lookup a snipe asset by serial number. def search_snipe_asset(serial): - api_url = '{}/api/v1/hardware/byserial/{}'.format(snipe_base, serial) - response = session.get(api_url, headers=snipeheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/hardware/byserial/{}".format(snipe_base, serial) + response = session.get( + api_url, + headers=snipeheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: jsonresponse = response.json() # Check to make sure there's actually a result if "total" in jsonresponse: - if jsonresponse['total'] == 1: + if jsonresponse["total"] == 1: return jsonresponse - elif jsonresponse['total'] == 0: + elif jsonresponse["total"] == 0: logging.info("No assets match {}".format(serial)) return "NoMatch" else: - logging.warning('FOUND {} matching assets while searching for: {}'.format(jsonresponse['total'], serial)) + logging.warning( + "FOUND {} matching assets while searching for: {}".format( + jsonresponse["total"], serial + ) + ) return "MultiMatch" else: logging.info("No assets match {}".format(serial)) return "NoMatch" else: - logging.warning('Snipe-IT responded with error code:{} when we tried to look up: {}'.format(response.text, serial)) - logging.debug('{} - {}'.format(response.status_code, response.content)) + logging.warning( + "Snipe-IT responded with error code:{} when we tried to look up: {}".format( + response.text, serial + ) + ) + logging.debug("{} - {}".format(response.status_code, response.content)) return "ERROR" + # Function to get all the asset models def get_snipe_models(): - api_url = '{}/api/v1/models'.format(snipe_base) - logging.debug('Calling against: {}'.format(api_url)) - response = session.get(api_url, headers=snipeheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/models".format(snipe_base) + logging.debug("Calling against: {}".format(api_url)) + response = session.get( + api_url, + headers=snipeheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: jsonresponse = response.json() - logging.info("Got a valid response that should have {} models.".format(jsonresponse['total'])) - if jsonresponse['total'] <= len(jsonresponse['rows']) : + logging.info( + "Got a valid response that should have {} models.".format( + jsonresponse["total"] + ) + ) + if jsonresponse["total"] <= len(jsonresponse["rows"]): return jsonresponse else: logging.info("We didn't get enough results so we need to get them again.") - api_url = '{}/api/v1/models?limit={}'.format(snipe_base, jsonresponse['total']) - newresponse = session.get(api_url, headers=snipeheaders, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/models?limit={}".format( + snipe_base, jsonresponse["total"] + ) + newresponse = session.get( + api_url, + headers=snipeheaders, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: newjsonresponse = newresponse.json() - if newjsonresponse['total'] == len(newjsonresponse['rows']) : + if newjsonresponse["total"] == len(newjsonresponse["rows"]): return newjsonresponse else: logging.error("We couldn't seem to get all of the model numbers") - raise SystemExit("Unable to get all model objects from Snipe-IT instanace") + raise SystemExit( + "Unable to get all model objects from Snipe-IT instanace" + ) else: - logging.error('When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}'.format(response.status_code, response.content)) + logging.error( + "When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}".format( + response.status_code, response.content + ) + ) raise SystemExit("Snipe models API endpoint failed.") else: - logging.error('When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}'.format(response.status_code, response.content)) + logging.error( + "When we tried to retreive a list of models, Snipe-IT responded with error status code:{} - {}".format( + response.status_code, response.content + ) + ) raise SystemExit("Snipe models API endpoint failed.") + # Recursive function returns all users in a Snipe Instance, 100 at a time. def get_snipe_users(previous=[]): - user_id_url = '{}/api/v1/users'.format(snipe_base) - payload = { - 'limit': 100, - 'offset': len(previous) - } - logging.debug('The payload for the snipe users GET is {}'.format(payload)) - response = session.get(user_id_url, headers=snipeheaders, params=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + user_id_url = "{}/api/v1/users".format(snipe_base) + payload = {"limit": 100, "offset": len(previous)} + logging.debug("The payload for the snipe users GET is {}".format(payload)) + response = session.get( + user_id_url, + headers=snipeheaders, + params=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) response_json = response.json() - current = response_json['rows'] + current = response_json["rows"] if len(previous) != 0: current = previous + current - if response_json['total'] > len(current): - logging.debug('We have more than 100 users, get the next page - total: {} current: {}'.format(response_json['total'], len(current))) + if response_json["total"] > len(current): + logging.debug( + "We have more than 100 users, get the next page - total: {} current: {}".format( + response_json["total"], len(current) + ) + ) return get_snipe_users(current) else: return current + # Function to search snipe for a user def get_snipe_user_id(username): - if username == '': + if username == "": return "NotFound" username = username.lower() for user in snipe_users: for value in user.values(): if str(value).lower() == username: - id = user['id'] + id = user["id"] return id if user_args.users_no_search: - logging.debug("No matches in snipe_users for {}, not querying the API for the next closest match since we've been told not to".format(username)) + logging.debug( + "No matches in snipe_users for {}, not querying the API for the next closest match since we've been told not to".format( + username + ) + ) return "NotFound" - logging.debug('No matches in snipe_users for {}, querying the API for the next closest match'.format(username)) - user_id_url = '{}/api/v1/users'.format(snipe_base) - payload = { - 'search':username, - 'limit':1, - 'sort':'username', - 'order':'asc' - } - logging.debug('The payload for the snipe user search is: {}'.format(payload)) - response = session.get(user_id_url, headers=snipeheaders, params=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + logging.debug( + "No matches in snipe_users for {}, querying the API for the next closest match".format( + username + ) + ) + user_id_url = "{}/api/v1/users".format(snipe_base) + payload = {"search": username, "limit": 1, "sort": "username", "order": "asc"} + logging.debug("The payload for the snipe user search is: {}".format(payload)) + response = session.get( + user_id_url, + headers=snipeheaders, + params=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) try: - return response.json()['rows'][0]['id'] + return response.json()["rows"][0]["id"] except: return "NotFound" + # Function that creates a new Snipe Model - not an asset - with a JSON payload def create_snipe_model(payload): - api_url = '{}/api/v1/models'.format(snipe_base) - logging.debug('Calling to create new snipe model type against: {}\nThe payload for the POST request is:{}\nThe request headers can be found near the start of the output.'.format(api_url, payload)) - response = session.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/models".format(snipe_base) + logging.debug( + "Calling to create new snipe model type against: {}\nThe payload for the POST request is:{}\nThe request headers can be found near the start of the output.".format( + api_url, payload + ) + ) + response = session.post( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) if response.status_code == 200: jsonresponse = response.json() - modelnumbers[jsonresponse['payload']['model_number']] = jsonresponse['payload']['id'] + modelnumbers[jsonresponse["payload"]["model_number"]] = jsonresponse["payload"][ + "id" + ] return True else: - logging.warning('Error code: {} while trying to create a new model.'.format(response.status_code)) + logging.warning( + "Error code: {} while trying to create a new model.".format( + response.status_code + ) + ) return False + # Function to create a new asset by passing array def create_snipe_asset(payload): - api_url = '{}/api/v1/hardware'.format(snipe_base) - logging.debug('Calling to create a new asset against: {}\nThe payload for the POST request is:{}\nThe request headers can be found near the start of the output.'.format(api_url, payload)) - response = session.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/hardware".format(snipe_base) + logging.debug( + "Calling to create a new asset against: {}\nThe payload for the POST request is:{}\nThe request headers can be found near the start of the output.".format( + api_url, payload + ) + ) + response = session.post( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) logging.debug(response.text) if response.status_code == 200: logging.debug("Got back status code: 200 - {}".format(response.content)) jsonresponse = response.json() - if jsonresponse['status'] == "error": - logging.error('Asset creation failed for asset {} with error {}'.format(payload['name'],jsonresponse['messages'])) - return 'ERROR', response - return 'AssetCreated', response + if jsonresponse["status"] == "error": + logging.error( + "Asset creation failed for asset {} with error {}".format( + payload["name"], jsonresponse["messages"] + ) + ) + return "ERROR", response + return "AssetCreated", response else: - logging.error('Asset creation failed for asset {} with error {}'.format(payload['name'],response.text)) - return 'ERROR', response + logging.error( + "Asset creation failed for asset {} with error {}".format( + payload["name"], response.text + ) + ) + return "ERROR", response + # Function that updates a snipe asset with a JSON payload def update_snipe_asset(snipe_id, payload): if user_args.dryrun: - logging.debug("Dry run mode is enabled. We would have updated ID: {} with the following payload: {}".format(snipe_id, payload)) + logging.debug( + "Dry run mode is enabled. We would have updated ID: {} with the following payload: {}".format( + snipe_id, payload + ) + ) return True - api_url = '{}/api/v1/hardware/{}'.format(snipe_base, snipe_id) - logging.debug('The payload for the snipe update is: {}'.format(payload)) - response = session.patch(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) + api_url = "{}/api/v1/hardware/{}".format(snipe_base, snipe_id) + logging.debug("The payload for the snipe update is: {}".format(payload)) + response = session.patch( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) # Verify that the payload updated properly. goodupdate = True if response.status_code == 200: - logging.debug("Got back status code: 200 - Checking the payload updated properly: If you error here it's because you configure the API mapping right.") + logging.debug( + "Got back status code: 200 - Checking the payload updated properly: If you error here it's because you configure the API mapping right." + ) jsonresponse = response.json() # Check if there's an Error and Log it, or parse the payload. - if jsonresponse['status'] == "error": - logging.error('Unable to update ID: {}. Error "{}"'.format(snipe_id, jsonresponse['messages'])) + if jsonresponse["status"] == "error": + logging.error( + 'Unable to update ID: {}. Error "{}"'.format( + snipe_id, jsonresponse["messages"] + ) + ) goodupdate = False else: for key in payload: - if key == 'purchase_date': + if key == "purchase_date": payload[key] = payload[key] + " 00:00:00" - if payload[key] == '': + if payload[key] == "": payload[key] = None - if jsonresponse['payload'][key] != payload[key]: - logging.warning('Unable to update ID: {}. We failed to update the {} field with "{}"'.format(snipe_id, key, payload[key])) + if jsonresponse["payload"][key] != payload[key]: + logging.warning( + 'Unable to update ID: {}. We failed to update the {} field with "{}"'.format( + snipe_id, key, payload[key] + ) + ) goodupdate = False else: - logging.info("Sucessfully updated {} with: {}".format(key, payload[key])) + logging.info( + "Sucessfully updated {} with: {}".format(key, payload[key]) + ) return goodupdate else: - logging.error('Whoops. Got an error status code while updating ID {}: {} - {}'.format(snipe_id, response.status_code, response.content)) + logging.error( + "Whoops. Got an error status code while updating ID {}: {} - {}".format( + snipe_id, response.status_code, response.content + ) + ) return False + # Function that checks in an asset in snipe def checkin_snipe_asset(asset_id): - api_url = '{}/api/v1/hardware/{}/checkin'.format(snipe_base, asset_id) - payload = { - 'note':'checked in by script from Jamf' - } - logging.debug('The payload for the snipe checkin is: {}'.format(payload)) - response = session.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) - logging.debug('The response from Snipe IT is: {}'.format(response.json())) + api_url = "{}/api/v1/hardware/{}/checkin".format(snipe_base, asset_id) + payload = {"note": "checked in by script from Jamf"} + logging.debug("The payload for the snipe checkin is: {}".format(payload)) + response = session.post( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) + logging.debug("The response from Snipe IT is: {}".format(response.json())) if response.status_code == 200: logging.debug("Got back status code: 200 - {}".format(response.content)) return "CheckedOut" else: return response + # Function that checks out an asset in snipe def checkout_snipe_asset(user, asset_id, checked_out_user=None): - logging.debug('Asset {} is being checked out to {}'.format(user, asset_id)) + logging.debug("Asset {} is being checked out to {}".format(user, asset_id)) user_id = get_snipe_user_id(user) - if user_id == 'NotFound': + if user_id == "NotFound": logging.info("User {} not found".format(user)) return "NotFound" if checked_out_user == None: logging.info("Not checked out, checking out to {}".format(user)) elif checked_out_user == "NewAsset": - logging.info("First time this asset will be checked out, checking out to {}".format(user)) - elif checked_out_user['id'] == user_id: + logging.info( + "First time this asset will be checked out, checking out to {}".format(user) + ) + elif checked_out_user["id"] == user_id: logging.info(str(asset_id) + " already checked out to user " + user) - return 'CheckedOut' + return "CheckedOut" else: - logging.info("Checking in {} to check it out to {}".format(asset_id,user)) + logging.info("Checking in {} to check it out to {}".format(asset_id, user)) checkin_snipe_asset(asset_id) - api_url = '{}/api/v1/hardware/{}/checkout'.format(snipe_base, asset_id) - logging.info("Checking out {} to check it out to {}".format(asset_id,user)) + api_url = "{}/api/v1/hardware/{}/checkout".format(snipe_base, asset_id) + logging.info("Checking out {} to check it out to {}".format(asset_id, user)) payload = { - 'checkout_to_type':'user', - 'assigned_user':user_id, - 'note':'Assignment made automatically, via script from Jamf.' + "checkout_to_type": "user", + "assigned_user": user_id, + "note": "Assignment made automatically, via script from Jamf.", } - logging.debug('The payload for the snipe checkin is: {}'.format(payload)) - response = session.post(api_url, headers=snipeheaders, json=payload, verify=user_args.do_not_verify_ssl, hooks={'response': request_handler}) - logging.debug('The response from Snipe IT is: {}'.format(response.json())) + logging.debug("The payload for the snipe checkin is: {}".format(payload)) + response = session.post( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) + logging.debug("The response from Snipe IT is: {}".format(response.json())) if response.status_code == 200: logging.debug("Got back status code: 200 - {}".format(response.content)) return "CheckedOut" else: - logging.error('Asset checkout failed for asset {} with error {}'.format(asset_id,response.text)) + logging.error( + "Asset checkout failed for asset {} with error {}".format( + asset_id, response.text + ) + ) return response + ### Run Testing ### # Report if we're verifying SSL or not. logging.info("SSL Verification is set to: {}".format(user_args.do_not_verify_ssl)) @@ -684,27 +1149,41 @@ logging.info("SSL Verification is set to: {}".format(user_args.do_not_verify_ssl # Do some tests to see if the hosts are up. Don't use hooks for these as we don't have tokens yet. logging.info("Running tests to see if hosts are up.") try: - SNIPE_UP = True if session.get(snipe_base, verify=user_args.do_not_verify_ssl).status_code == 200 else False + SNIPE_UP = ( + True + if session.get(snipe_base, verify=user_args.do_not_verify_ssl).status_code + == 200 + else False + ) except Exception as e: logging.exception(e) SNIPE_UP = False try: - JAMF_UP = True if session.get(jamfpro_base, verify=user_args.do_not_verify_ssl).status_code in (200, 401) else False + JAMF_UP = ( + True + if session.get(jamfpro_base, verify=user_args.do_not_verify_ssl).status_code + in (200, 401) + else False + ) except Exception as e: logging.exception(e) JAMF_UP = False if not SNIPE_UP: - logging.error('Snipe-IT looks like it is down from here. \nPlease check your config in the settings.conf file, or your instance.') + logging.error( + "Snipe-IT looks like it is down from here. \nPlease check your config in the settings.conf file, or your instance." + ) else: - logging.info('We were able to get a good response from your Snipe-IT instance.') + logging.info("We were able to get a good response from your Snipe-IT instance.") if not JAMF_UP: - logging.error('JAMFPro looks down from here. \nPlease check the your config in the settings.conf file, or your hosted JAMFPro instance.') + logging.error( + "JAMFPro looks down from here. \nPlease check the your config in the settings.conf file, or your hosted JAMFPro instance." + ) else: - logging.info('We were able to get a good response from your JAMFPro instance.') + logging.info("We were able to get a good response from your JAMFPro instance.") # Exit if you can't contact SNIPE -if ( JAMF_UP == False ) or ( SNIPE_UP == False ): +if (JAMF_UP == False) or (SNIPE_UP == False): raise SystemExit("Error: Host could not be contacted.") # Test that we can actually connect with the API keys by getting a bearer token. @@ -717,32 +1196,43 @@ logging.info("Finished running our tests.") # Get a list of known models from Snipe logging.info("Getting a list of computer models that snipe knows about.") snipemodels = get_snipe_models() -logging.debug("Parsing the {} model results for models with model numbers.".format(len(snipemodels['rows']))) +logging.debug( + "Parsing the {} model results for models with model numbers.".format( + len(snipemodels["rows"]) + ) +) modelnumbers = {} -for model in snipemodels['rows']: - if model['model_number'] == "": - logging.debug("The model, {}, did not have a model number. Skipping.".format(model['name'])) +for model in snipemodels["rows"]: + if model["model_number"] == "": + logging.debug( + "The model, {}, did not have a model number. Skipping.".format( + model["name"] + ) + ) continue - modelnumbers[model['model_number']] = model['id'] + modelnumbers[model["model_number"]] = model["id"] logging.info("Our list of models has {} entries.".format(len(modelnumbers))) -logging.debug("Here's the list of the {} models and their id's that we were able to collect:\n{}".format(len(modelnumbers), modelnumbers)) +logging.debug( + "Here's the list of the {} models and their id's that we were able to collect:\n{}".format( + len(modelnumbers), modelnumbers + ) +) # Get the IDS of all active assets. -if 'computer_group_id' in config['jamf'] and config['jamf']['computer_group_id']: +if "computer_group_id" in config["jamf"] and config["jamf"]["computer_group_id"]: logging.info("Getting list of computers from JAMF by computer group id.") - jamf_computer_list = get_jamf_computers_by_group(config['jamf']['computer_group_id']) + jamf_computer_list = get_jamf_computers_by_group( + config["jamf"]["computer_group_id"] + ) else: jamf_computer_list = get_jamf_computers() -if 'mobile_group_id' in config['jamf'] and config['jamf']['mobile_group_id']: +if "mobile_group_id" in config["jamf"] and config["jamf"]["mobile_group_id"]: logging.info("Getting list of mobiles from JAMF by mobile group id.") - jamf_mobile_list = get_jamf_mobiles_by_group(config['jamf']['mobile_group_id']) + jamf_mobile_list = get_jamf_mobiles_by_group(config["jamf"]["mobile_group_id"]) else: jamf_mobile_list = get_jamf_mobiles() -jamf_types = { - 'computers': jamf_computer_list, - 'mobile_devices': jamf_mobile_list -} +jamf_types = {"computers": jamf_computer_list, "mobile_devices": jamf_mobile_list} # Get a list of users from Snipe if the user has specified # they're syncing users @@ -752,18 +1242,22 @@ if user_args.users or user_args.users_force or user_args.users_inverse: TotalNumber = 0 if user_args.computers: - TotalNumber = len(jamf_types['computers']['computers']) + TotalNumber = len(jamf_types["computers"]["computers"]) elif user_args.mobiles: - TotalNumber = len(jamf_types['mobile_devices']['mobile_devices']) + TotalNumber = len(jamf_types["mobile_devices"]["mobile_devices"]) else: for jamf_type in jamf_types: TotalNumber += len(jamf_types[jamf_type][jamf_type]) # Make sure we have a good list. if jamf_computer_list != None: - logging.info('Received a list of JAMF assets that had {} entries.'.format(TotalNumber)) + logging.info( + "Received a list of JAMF assets that had {} entries.".format(TotalNumber) + ) else: - logging.error("We were not able to retreive a list of assets from your JAMF instance. It's likely that your settings, or credentials are incorrect. Check your settings.conf and verify you can make API calls outside of this system with the credentials found in your settings.conf") + logging.error( + "We were not able to retreive a list of assets from your JAMF instance. It's likely that your settings, or credentials are incorrect. Check your settings.conf and verify you can make API calls outside of this system with the credentials found in your settings.conf" + ) raise SystemExit("Unable to get JAMF Computers.") # After this point we start editing data, so quit if this is a dryrun @@ -771,110 +1265,174 @@ if user_args.connection_test: raise SystemExit("Connection Test: Complete.") # From this point on, we're editing data. -logging.info('Starting to Update Inventory') +logging.info("Starting to Update Inventory") CurrentNumber = 0 for jamf_type in jamf_types: if user_args.computers: - if jamf_type != 'computers': + if jamf_type != "computers": continue if user_args.mobiles: - if jamf_type != 'mobile_devices': + if jamf_type != "mobile_devices": continue for jamf_asset in jamf_types[jamf_type][jamf_type]: CurrentNumber += 1 - logging.info("Processing entry {} out of {} - JAMFID: {} - NAME: {}".format(CurrentNumber, TotalNumber, jamf_asset['id'], jamf_asset['name'])) + logging.info( + "Processing entry {} out of {} - JAMFID: {} - NAME: {}".format( + CurrentNumber, TotalNumber, jamf_asset["id"], jamf_asset["name"] + ) + ) # Search through the list by ID for all asset information\ - if jamf_type == 'computers': - jamf = search_jamf_asset(jamf_asset['id']) - elif jamf_type == 'mobile_devices': - jamf = search_jamf_mobile(jamf_asset['id']) + if jamf_type == "computers": + jamf = search_jamf_asset(jamf_asset["id"]) + elif jamf_type == "mobile_devices": + jamf = search_jamf_mobile(jamf_asset["id"]) if jamf == None: continue # If the entry doesn't contain a serial, then we need to skip this entry. - if jamf['general']['serial_number'] == 'Not Available': - logging.warning("The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now.") + if jamf["general"]["serial_number"] == "Not Available": + logging.warning( + "The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now." + ) continue - if jamf['general']['serial_number'] == None: - logging.warning("The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now.") + if jamf["general"]["serial_number"] == None: + logging.warning( + "The serial number is not available in JAMF. This is normal for DEP enrolled devices that have not yet checked in for the first time and for personal mobile devices. Since there's no serial number yet, we'll skip it for now." + ) continue # Check that the model number exists in snipe, if not create it. - if jamf_type == 'computers': - if jamf['hardware']['model_identifier'] not in modelnumbers and jamf['hardware']['model_identifier']: - logging.info("Could not find a model ID in snipe for: {}".format(jamf['hardware']['model_identifier'])) - newmodel = {"category_id":config['snipe-it']['computer_model_category_id'],"manufacturer_id":apple_manufacturer_id,"name": jamf['hardware']['model'],"model_number":jamf['hardware']['model_identifier']} - if 'computer_custom_fieldset_id' in config['snipe-it']: - fieldset_split = config['snipe-it']['computer_custom_fieldset_id'] - newmodel['fieldset_id'] = fieldset_split + if jamf_type == "computers": + if ( + jamf["hardware"]["model_identifier"] not in modelnumbers + and jamf["hardware"]["model_identifier"] + ): + logging.info( + "Could not find a model ID in snipe for: {}".format( + jamf["hardware"]["model_identifier"] + ) + ) + newmodel = { + "category_id": config["snipe-it"]["computer_model_category_id"], + "manufacturer_id": apple_manufacturer_id, + "name": jamf["hardware"]["model"], + "model_number": jamf["hardware"]["model_identifier"], + } + if "computer_custom_fieldset_id" in config["snipe-it"]: + fieldset_split = config["snipe-it"]["computer_custom_fieldset_id"] + newmodel["fieldset_id"] = fieldset_split create_snipe_model(newmodel) - elif jamf_type == 'mobile_devices': - if jamf['general']['model_identifier'] not in modelnumbers and jamf['general']['model_identifier']: - logging.info("Could not find a model ID in snipe for: {}".format(jamf['general']['model_identifier'])) - newmodel = {"category_id":config['snipe-it']['mobile_model_category_id'],"manufacturer_id":apple_manufacturer_id,"name": jamf['general']['model'],"model_number":jamf['general']['model_identifier']} - if 'mobile_custom_fieldset_id' in config['snipe-it']: - fieldset_split = config['snipe-it']['mobile_custom_fieldset_id'] - newmodel['fieldset_id'] = fieldset_split + elif jamf_type == "mobile_devices": + if ( + jamf["general"]["model_identifier"] not in modelnumbers + and jamf["general"]["model_identifier"] + ): + logging.info( + "Could not find a model ID in snipe for: {}".format( + jamf["general"]["model_identifier"] + ) + ) + newmodel = { + "category_id": config["snipe-it"]["mobile_model_category_id"], + "manufacturer_id": apple_manufacturer_id, + "name": jamf["general"]["model"], + "model_number": jamf["general"]["model_identifier"], + } + if "mobile_custom_fieldset_id" in config["snipe-it"]: + fieldset_split = config["snipe-it"]["mobile_custom_fieldset_id"] + newmodel["fieldset_id"] = fieldset_split create_snipe_model(newmodel) # Pass the SN from JAMF to search for a match in Snipe - snipe = search_snipe_asset(jamf['general']['serial_number']) + snipe = search_snipe_asset(jamf["general"]["serial_number"]) # Create a new asset if there's no match: - if snipe == 'NoMatch': - logging.info("Creating a new asset in snipe for JAMF ID {} - {}".format(jamf['general']['id'], jamf['general']['name'])) + if snipe == "NoMatch": + logging.info( + "Creating a new asset in snipe for JAMF ID {} - {}".format( + jamf["general"]["id"], jamf["general"]["name"] + ) + ) # This section checks to see if the asset tag was already put into JAMF, if not it creates one with with Jamf's ID. - if jamf['general']['asset_tag'] == '': + if jamf["general"]["asset_tag"] == "": jamf_asset_tag = None - logging.debug('No asset tag found in Jamf, checking settings.conf for alternative specified field.') - if 'asset_tag' in config['snipe-it']: - tag_split = config['snipe-it']['asset_tag'].split() + logging.debug( + "No asset tag found in Jamf, checking settings.conf for alternative specified field." + ) + if "asset_tag" in config["snipe-it"]: + tag_split = config["snipe-it"]["asset_tag"].split() try: - jamf_asset_tag = jamf['{}'.format(tag_split[0])]['{}'.format(tag_split[1])] + jamf_asset_tag = jamf["{}".format(tag_split[0])][ + "{}".format(tag_split[1]) + ] except: - if jamf_type == 'mobile_devices': - jamf_asset_tag = 'jamfid-m-{}'.format(jamf['general']['id']) - elif jamf_type == 'computers': - jamf_asset_tag = 'jamfid-{}'.format(jamf['general']['id']) + if jamf_type == "mobile_devices": + jamf_asset_tag = "jamfid-m-{}".format(jamf["general"]["id"]) + elif jamf_type == "computers": + jamf_asset_tag = "jamfid-{}".format(jamf["general"]["id"]) else: - logging.error("Could not generate an asset tag for this device. Skipping") + logging.error( + "Could not generate an asset tag for this device. Skipping" + ) # Dump the object for debugging. logging.verbose(jamf) continue - #raise SystemError('No such attribute {} in the jamf payload. Please check your settings.conf file'.format(tag_split)) - if jamf_asset_tag == None or jamf_asset_tag == '': - logging.debug('No custom configuration found in settings.conf for asset tag name upon asset creation.') - if jamf_type == 'mobile_devices': - jamf_asset_tag = 'jamfid-m-{}'.format(jamf['general']['id']) - elif jamf_type == 'computers': - jamf_asset_tag = 'jamfid-{}'.format(jamf['general']['id']) + # raise SystemError('No such attribute {} in the jamf payload. Please check your settings.conf file'.format(tag_split)) + if jamf_asset_tag == None or jamf_asset_tag == "": + logging.debug( + "No custom configuration found in settings.conf for asset tag name upon asset creation." + ) + if jamf_type == "mobile_devices": + jamf_asset_tag = "jamfid-m-{}".format(jamf["general"]["id"]) + elif jamf_type == "computers": + jamf_asset_tag = "jamfid-{}".format(jamf["general"]["id"]) else: - jamf_asset_tag = jamf['general']['asset_tag'] - logging.info("Asset tag found in Jamf, setting it to: {}".format(jamf_asset_tag)) + jamf_asset_tag = jamf["general"]["asset_tag"] + logging.info( + "Asset tag found in Jamf, setting it to: {}".format(jamf_asset_tag) + ) # Create the payload - if jamf_type == 'mobile_devices': + if jamf_type == "mobile_devices": logging.debug("Payload is being made for a mobile device") - newasset = {'asset_tag': jamf_asset_tag, 'model_id': modelnumbers['{}'.format(jamf['general']['model_identifier'])], 'name': jamf['general']['name'], 'status_id': defaultStatus,'serial': jamf['general']['serial_number']} - elif jamf_type == 'computers': + newasset = { + "asset_tag": jamf_asset_tag, + "model_id": modelnumbers[ + "{}".format(jamf["general"]["model_identifier"]) + ], + "name": jamf["general"]["name"], + "status_id": defaultStatus, + "serial": jamf["general"]["serial_number"], + } + elif jamf_type == "computers": logging.debug("Payload is being made for a computer") - newasset = {'asset_tag': jamf_asset_tag,'model_id': modelnumbers['{}'.format(jamf['hardware']['model_identifier'])], 'name': jamf['general']['name'], 'status_id': defaultStatus,'serial': jamf['general']['serial_number']} + newasset = { + "asset_tag": jamf_asset_tag, + "model_id": modelnumbers[ + "{}".format(jamf["hardware"]["model_identifier"]) + ], + "name": jamf["general"]["name"], + "status_id": defaultStatus, + "serial": jamf["general"]["serial_number"], + } else: - for snipekey in config['{}-api-mapping'.format(jamf_type)]: - jamfsplit = config['{}-api-mapping'.format(jamf_type)][snipekey].split() + for snipekey in config["{}-api-mapping".format(jamf_type)]: + jamfsplit = config["{}-api-mapping".format(jamf_type)][ + snipekey + ].split() try: for i, item in enumerate(jamfsplit): try: item = int(item) except ValueError: - logging.debug('{} is not an integer'.format(item)) + logging.debug("{} is not an integer".format(item)) if i == 0: jamf_value = jamf[item] else: - if jamfsplit[0] == 'extension_attributes': + if jamfsplit[0] == "extension_attributes": for attribute in jamf_value: - if attribute['id'] == item: - jamf_value = attribute['value'] + if attribute["id"] == item: + jamf_value = attribute["value"] else: jamf_value = jamf_value[item] newasset[snipekey] = jamf_value @@ -882,109 +1440,194 @@ for jamf_type in jamf_types: continue # Reset the payload without the asset_tag if auto_incrementing flag is set. if user_args.auto_incrementing: - newasset.pop('asset_tag', None) + newasset.pop("asset_tag", None) new_snipe_asset = create_snipe_asset(newasset) logging.debug(new_snipe_asset) if new_snipe_asset[0] != "AssetCreated": continue if user_args.users or user_args.users_force or user_args.users_inverse: - jamfsplit = config['user-mapping']['jamf_api_field'].split() + jamfsplit = config["user-mapping"]["jamf_api_field"].split() if jamfsplit[1] not in jamf[jamfsplit[0]]: - logging.info("Couldn't find {} for this device in {}, not checking it out.".format(jamfsplit[1], jamfsplit[0])) + logging.info( + "Couldn't find {} for this device in {}, not checking it out.".format( + jamfsplit[1], jamfsplit[0] + ) + ) continue - logging.info('Checking out new item {} to user {}'.format(jamf['general']['name'], jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])])) - checkout_snipe_asset(jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])],new_snipe_asset[1].json()['payload']['id'], "NewAsset") + logging.info( + "Checking out new item {} to user {}".format( + jamf["general"]["name"], + jamf["{}".format(jamfsplit[0])]["{}".format(jamfsplit[1])], + ) + ) + checkout_snipe_asset( + jamf["{}".format(jamfsplit[0])]["{}".format(jamfsplit[1])], + new_snipe_asset[1].json()["payload"]["id"], + "NewAsset", + ) # Log an error if there's an issue, or more than once match. - elif snipe == 'MultiMatch': - logging.warning("WARN: You need to resolve multiple assets with the same serial number in your inventory. If you can't find them in your inventory, you might need to purge your deleted records. You can find that in the Snipe Admin settings. Skipping serial number {} for now.".format(jamf['general']['serial_number'])) - elif snipe == 'ERROR': - logging.error("We got an error when looking up serial number {} in snipe, which shouldn't happen at this point. Check your snipe instance and setup. Skipping for now.".format(jamf['general']['serial_number'])) + elif snipe == "MultiMatch": + logging.warning( + "WARN: You need to resolve multiple assets with the same serial number in your inventory. If you can't find them in your inventory, you might need to purge your deleted records. You can find that in the Snipe Admin settings. Skipping serial number {} for now.".format( + jamf["general"]["serial_number"] + ) + ) + elif snipe == "ERROR": + logging.error( + "We got an error when looking up serial number {} in snipe, which shouldn't happen at this point. Check your snipe instance and setup. Skipping for now.".format( + jamf["general"]["serial_number"] + ) + ) else: # Only update if JAMF has more recent info. - snipe_id = snipe['rows'][0]['id'] - snipe_time = snipe['rows'][0]['updated_at']['datetime'] - if jamf_type == 'computers': - jamf_time = jamf['general']['report_date'] - elif jamf_type == 'mobile_devices': - jamf_time = jamf['general']['last_inventory_update'] + snipe_id = snipe["rows"][0]["id"] + snipe_time = snipe["rows"][0]["updated_at"]["datetime"] + if jamf_type == "computers": + jamf_time = jamf["general"]["report_date"] + elif jamf_type == "mobile_devices": + jamf_time = jamf["general"]["last_inventory_update"] # Check to see that the JAMF record is newer than the previous Snipe update, or if it is a new record in Snipe - if ( jamf_time > snipe_time ) or ( user_args.force ): + if (jamf_time > snipe_time) or (user_args.force): if user_args.force: - logging.debug("Forced the Update regardless of the timestamps below.") - logging.debug("Updating the Snipe asset because JAMF has a more recent timestamp: {} > {} or the Snipe Record is new".format(jamf_time, snipe_time)) + logging.debug( + "Forced the Update regardless of the timestamps below." + ) + logging.debug( + "Updating the Snipe asset because JAMF has a more recent timestamp: {} > {} or the Snipe Record is new".format( + jamf_time, snipe_time + ) + ) updates = {} - for snipekey in config['{}-api-mapping'.format(jamf_type)]: + for snipekey in config["{}-api-mapping".format(jamf_type)]: try: - jamfsplit = config['{}-api-mapping'.format(jamf_type)][snipekey].split() + jamfsplit = config["{}-api-mapping".format(jamf_type)][ + snipekey + ].split() for i, item in enumerate(jamfsplit): try: item = int(item) except ValueError: - logging.debug('{} is not an integer'.format(item)) + logging.debug("{} is not an integer".format(item)) if i == 0: jamf_value = jamf[item] else: - if jamfsplit[0] == 'extension_attributes': + if jamfsplit[0] == "extension_attributes": for attribute in jamf_value: - if attribute['id'] == item: - jamf_value = attribute['value'] + if attribute["id"] == item: + jamf_value = attribute["value"] else: jamf_value = jamf_value[item] payload = {snipekey: jamf_value} latestvalue = jamf_value except (KeyError, TypeError): - logging.debug("Skipping the payload, because the JAMF key we're mapping to doesn't exist") + logging.debug( + "Skipping the payload, because the JAMF key we're mapping to doesn't exist" + ) continue # Need to check that we're not needlessly updating the asset. # If it's a custom value it'll fail the first section and send it to except section that will parse custom sections. try: - if snipe['rows'][0][snipekey] != latestvalue: + if snipe["rows"][0][snipekey] != latestvalue: updates.update(payload) else: - logging.debug("Skipping the payload, because it already exits.") + logging.debug( + "Skipping the payload, because it already exits." + ) except: - logging.debug("The snipekey lookup failed, which means it's a custom field. Parsing those to see if it needs to be updated or not.") + logging.debug( + "The snipekey lookup failed, which means it's a custom field. Parsing those to see if it needs to be updated or not." + ) needsupdate = False - for CustomField in snipe['rows'][0]['custom_fields']: - if snipe['rows'][0]['custom_fields'][CustomField]['field'] == snipekey : - if snipe['rows'][0]['custom_fields'][CustomField]['value'] != str(latestvalue): - logging.debug("Found the field, and the value needs to be updated from {} to {}".format(snipe['rows'][0]['custom_fields'][CustomField]['value'], latestvalue)) + for CustomField in snipe["rows"][0]["custom_fields"]: + if ( + snipe["rows"][0]["custom_fields"][CustomField]["field"] + == snipekey + ): + if snipe["rows"][0]["custom_fields"][CustomField][ + "value" + ] != str(latestvalue): + logging.debug( + "Found the field, and the value needs to be updated from {} to {}".format( + snipe["rows"][0]["custom_fields"][ + CustomField + ]["value"], + latestvalue, + ) + ) needsupdate = True if needsupdate == True: updates.update(payload) else: - logging.debug("Skipping the payload, because it already exists, or the Snipe key we're mapping to doesn't.") + logging.debug( + "Skipping the payload, because it already exists, or the Snipe key we're mapping to doesn't." + ) if updates: update_snipe_asset(snipe_id, updates) - if ((user_args.users or user_args.users_inverse) and (snipe['rows'][0]['assigned_to'] == None) == user_args.users) or user_args.users_force: + if ( + (user_args.users or user_args.users_inverse) + and (snipe["rows"][0]["assigned_to"] == None) == user_args.users + ) or user_args.users_force: - if snipe['rows'][0]['status_label']['status_meta'] in ('deployable', 'deployed'): - jamfsplit = config['user-mapping']['jamf_api_field'].split() + if snipe["rows"][0]["status_label"]["status_meta"] in ( + "deployable", + "deployed", + ): + jamfsplit = config["user-mapping"]["jamf_api_field"].split() if jamfsplit[1] not in jamf[jamfsplit[0]]: - logging.info("Couldn't find {} for this device in {}, not checking it out.".format(jamfsplit[1], jamfsplit[0])) + logging.info( + "Couldn't find {} for this device in {}, not checking it out.".format( + jamfsplit[1], jamfsplit[0] + ) + ) continue - checkout_snipe_asset(jamf['{}'.format(jamfsplit[0])]['{}'.format(jamfsplit[1])], snipe_id, snipe['rows'][0]['assigned_to']) + checkout_snipe_asset( + jamf["{}".format(jamfsplit[0])]["{}".format(jamfsplit[1])], + snipe_id, + snipe["rows"][0]["assigned_to"], + ) else: - logging.info("Can't checkout {} since the status isn't set to deployable".format(jamf['general']['name'])) + logging.info( + "Can't checkout {} since the status isn't set to deployable".format( + jamf["general"]["name"] + ) + ) else: - logging.info("Snipe Record is newer than the JAMF record. Nothing to sync. If this wrong, then force an inventory update in JAMF") - logging.debug("Not updating the Snipe asset because Snipe has a more recent timestamp: {} < {}".format(jamf_time, snipe_time)) + logging.info( + "Snipe Record is newer than the JAMF record. Nothing to sync. If this wrong, then force an inventory update in JAMF" + ) + logging.debug( + "Not updating the Snipe asset because Snipe has a more recent timestamp: {} < {}".format( + jamf_time, snipe_time + ) + ) # Update/Sync the Snipe Asset Tag Number back to JAMF # The user arg below is set to false if it's called, so this would fail if the user called it. - if (jamf['general']['asset_tag'] != snipe['rows'][0]['asset_tag']) and user_args.do_not_update_jamf : - logging.info("JAMF doesn't have the same asset tag as SNIPE so we'll update it because it should be authoritative.") - if snipe['rows'][0]['asset_tag'][0]: - if jamf_type == 'computers': - update_jamf_asset_tag("{}".format(jamf['general']['id']), '{}'.format(snipe['rows'][0]['asset_tag'])) - logging.info("Device is a computer, updating computer record") - elif jamf_type == 'mobile_devices': - update_jamf_mobiledevice_asset_tag("{}".format(jamf['general']['id']), '{}'.format(snipe['rows'][0]['asset_tag'])) - logging.info("Device is a mobile device, updating the mobile device record") - -logging.debug('Total amount of API calls made: {}'.format(api_count)) + if ( + jamf["general"]["asset_tag"] != snipe["rows"][0]["asset_tag"] + ) and user_args.do_not_update_jamf: + logging.info( + "JAMF doesn't have the same asset tag as SNIPE so we'll update it because it should be authoritative." + ) + if snipe["rows"][0]["asset_tag"][0]: + if jamf_type == "computers": + update_jamf_asset_tag( + "{}".format(jamf["general"]["id"]), + "{}".format(snipe["rows"][0]["asset_tag"]), + ) + logging.info("Device is a computer, updating computer record") + elif jamf_type == "mobile_devices": + update_jamf_mobiledevice_asset_tag( + "{}".format(jamf["general"]["id"]), + "{}".format(snipe["rows"][0]["asset_tag"]), + ) + logging.info( + "Device is a mobile device, updating the mobile device record" + ) + +logging.debug("Total amount of API calls made: {}".format(api_count)) From 89dbcbbe900ffe853e29a0013606486ff49b22d3 Mon Sep 17 00:00:00 2001 From: Jordi Bagot Date: Tue, 16 Dec 2025 09:41:59 +0100 Subject: [PATCH 2/3] Add a new feature to checkout assets to buildings specified in JAMF --- jamf2snipe | 113 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 110 insertions(+), 3 deletions(-) diff --git a/jamf2snipe b/jamf2snipe index a92da09..47b83e8 100755 --- a/jamf2snipe +++ b/jamf2snipe @@ -949,6 +949,61 @@ def get_snipe_user_id(username): return "NotFound" +def get_snipe_locations(previous=[]): + locations_url = f"{snipe_base}/api/v1/locations" + payload = {"limit": 100, "offset": len(previous)} + logging.debug("The payload for the snipe locations GET is {}".format(payload)) + response = session.get( + locations_url, + headers=snipeheaders, + params=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) + response_json = response.json() + current = response_json["rows"] + if previous: + current = previous + current + if response_json["total"] > len(current): + logging.debug( + "We have more than 100 locations, get the next page - total: {} current: {}".format( + response_json["total"], len(current) + ) + ) + return get_snipe_locations(current) + else: + return current + + +def get_snipe_location_id(location_name): + if location_name == "": + return "NotFound" + location_name = location_name.lower() + for location in snipe_locations: + for value in location.values(): + if str(value).lower() == location_name: + id = location["id"] + return id + logging.debug( + "No matches in snipe_locations for {}, querying the API for the next closest match".format( + location_name + ) + ) + location_id_url = "{}/api/v1/locations".format(snipe_base) + payload = {"search": location_name, "limit": 1, "sort": "name", "order": "asc"} + logging.debug("The payload for the snipe location search is: {}".format(payload)) + response = session.get( + location_id_url, + headers=snipeheaders, + params=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) + try: + return response.json()["rows"][0]["id"] + except: + return "NotFound" + # Function that creates a new Snipe Model - not an asset - with a JSON payload def create_snipe_model(payload): api_url = "{}/api/v1/models".format(snipe_base) @@ -1096,7 +1151,28 @@ def checkin_snipe_asset(asset_id): # Function that checks out an asset in snipe -def checkout_snipe_asset(user, asset_id, checked_out_user=None): +def checkout_snipe_asset(location, asset_id, checked_out_user=None): + jamfsplit = config["user-mapping"]["jamf_api_field"].split() + checked_out = None + + user = None + building = None + for field in jamfsplit: + if field in location: + if field == "username": + user = location[field] + elif field == "building": + building = location[field] + + if user: + checked_out = checkout_snipe_asset_to_user(user, asset_id, checked_out_user) + + if not checked_out and building: + checked_out = checkout_snipe_asset_to_location(building, asset_id, checked_out_user) + + return checked_out + +def checkout_snipe_asset_to_user(user, asset_id, checked_out_user=None): logging.debug("Asset {} is being checked out to {}".format(user, asset_id)) user_id = get_snipe_user_id(user) if user_id == "NotFound": @@ -1142,6 +1218,36 @@ def checkout_snipe_asset(user, asset_id, checked_out_user=None): return response +def checkout_snipe_asset_to_location(location, asset_id, checked_out_user=None): + location_id = get_snipe_location_id(location) + api_url = "{}/api/v1/hardware/{}/checkout".format(snipe_base, asset_id) + logging.info("Checking out {} to check it out to {}".format(asset_id, location)) + payload = { + "checkout_to_type": "location", + "assigned_location": location_id, + "note": "Assignment made automatically, via script from Jamf.", + } + logging.debug("The payload for the snipe checkin is: {}".format(payload)) + response = session.post( + api_url, + headers=snipeheaders, + json=payload, + verify=user_args.do_not_verify_ssl, + hooks={"response": request_handler}, + ) + logging.debug("The response from Snipe IT is: {}".format(response.json())) + if response.status_code == 200: + logging.debug("Got back status code: 200 - {}".format(response.content)) + return "CheckedOut" + else: + logging.error( + "Asset checkout failed for asset {} with error {}".format( + asset_id, response.text + ) + ) + return response + + ### Run Testing ### # Report if we're verifying SSL or not. logging.info("SSL Verification is set to: {}".format(user_args.do_not_verify_ssl)) @@ -1239,6 +1345,7 @@ jamf_types = {"computers": jamf_computer_list, "mobile_devices": jamf_mobile_lis if user_args.users or user_args.users_force or user_args.users_inverse: snipe_users = get_snipe_users() + snipe_locations = get_snipe_locations() TotalNumber = 0 if user_args.computers: @@ -1461,7 +1568,7 @@ for jamf_type in jamf_types: ) ) checkout_snipe_asset( - jamf["{}".format(jamfsplit[0])]["{}".format(jamfsplit[1])], + jamf[jamfsplit[0]], new_snipe_asset[1].json()["payload"]["id"], "NewAsset", ) @@ -1585,7 +1692,7 @@ for jamf_type in jamf_types: ) continue checkout_snipe_asset( - jamf["{}".format(jamfsplit[0])]["{}".format(jamfsplit[1])], + jamf[jamfsplit[0]], snipe_id, snipe["rows"][0]["assigned_to"], ) From e30cf84e64d38ef08d56ac6d23ffde151f6a473e Mon Sep 17 00:00:00 2001 From: Jordi Bagot Date: Tue, 16 Dec 2025 23:24:24 +0100 Subject: [PATCH 3/3] Fix format with Black --- jamf2snipe | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/jamf2snipe b/jamf2snipe index 47b83e8..303a943 100755 --- a/jamf2snipe +++ b/jamf2snipe @@ -1004,6 +1004,7 @@ def get_snipe_location_id(location_name): except: return "NotFound" + # Function that creates a new Snipe Model - not an asset - with a JSON payload def create_snipe_model(payload): api_url = "{}/api/v1/models".format(snipe_base) @@ -1168,10 +1169,13 @@ def checkout_snipe_asset(location, asset_id, checked_out_user=None): checked_out = checkout_snipe_asset_to_user(user, asset_id, checked_out_user) if not checked_out and building: - checked_out = checkout_snipe_asset_to_location(building, asset_id, checked_out_user) + checked_out = checkout_snipe_asset_to_location( + building, asset_id, checked_out_user + ) return checked_out + def checkout_snipe_asset_to_user(user, asset_id, checked_out_user=None): logging.debug("Asset {} is being checked out to {}".format(user, asset_id)) user_id = get_snipe_user_id(user)