diff --git a/ayon_api/server_api.py b/ayon_api/server_api.py index 3390a287e..f1b7ff49c 100644 --- a/ayon_api/server_api.py +++ b/ayon_api/server_api.py @@ -282,7 +282,7 @@ def __contains__(self, key): return key in self.data def __repr__(self): - return "<{} [{}]>".format(self.__class__.__name__, self.status) + return f"<{self.__class__.__name__} [{self.status}]>" def __len__(self): return int(200 <= self.status < 400) @@ -314,10 +314,9 @@ def __len__(self): def __repr__(self): if self.errors: - return "<{} errors={}>".format( - self.__class__.__name__, self.errors[0]['message'] - ) - return "<{}>".format(self.__class__.__name__) + message = self.errors[0]["message"] + return f"<{self.__class__.__name__} errors={message}>" + return f"<{self.__class__.__name__}>" def fill_own_attribs(entity): @@ -475,12 +474,12 @@ def __init__( max_retries: Optional[int] = None, ): if not base_url: - raise ValueError("Invalid server URL {}".format(str(base_url))) + raise ValueError(f"Invalid server URL {str(base_url)}") base_url = base_url.rstrip("/") self._base_url: str = base_url - self._rest_url: str = "{}/api".format(base_url) - self._graphql_url: str = "{}/graphql".format(base_url) + self._rest_url: str = f"{base_url}/api" + self._graphql_url: str = f"{base_url}/graphql" self._log: logging.Logger = logging.getLogger(self.__class__.__name__) self._access_token: Optional[str] = token # Allow to have 'site_id' to 'None' @@ -939,9 +938,9 @@ def has_valid_token(self) -> bool: def validate_server_availability(self): if not self.is_server_available: - raise ServerNotReached("Server \"{}\" can't be reached".format( - self._base_url - )) + raise ServerNotReached( + f"Server \"{self._base_url}\" can't be reached" + ) def validate_token(self) -> bool: try: @@ -1236,7 +1235,7 @@ def get_user( raise UnauthorizedError("User is not authorized.") return output - response = self.get("users/{}".format(username)) + response = self.get(f"users/{username}") response.raise_for_status() return response.data @@ -1270,8 +1269,7 @@ def get_headers( if username: headers["X-as-user"] = username else: - headers["Authorization"] = "Bearer {}".format( - self._access_token) + headers["Authorization"] = f"Bearer {self._access_token}" return headers def login( @@ -1318,9 +1316,9 @@ def login( _detail = response.data.get("detail") details = "" if _detail: - details = " {}".format(_detail) + details = f" {_detail}" - raise AuthenticationError("Login failed {}".format(details)) + raise AuthenticationError(f"Login failed {details}") finally: self._token_validation_started = False @@ -1433,12 +1431,12 @@ def _do_rest_request(self, function, url, **kwargs): else: new_response = RestApiResponse(response) - self.log.debug("Response {}".format(str(new_response))) + self.log.debug(f"Response {str(new_response)}") return new_response def raw_post(self, entrypoint: str, **kwargs): url = self._endpoint_to_url(entrypoint) - self.log.debug("Executing [POST] {}".format(url)) + self.log.debug(f"Executing [POST] {url}") return self._do_rest_request( RequestTypes.post, url, @@ -1447,7 +1445,7 @@ def raw_post(self, entrypoint: str, **kwargs): def raw_put(self, entrypoint: str, **kwargs): url = self._endpoint_to_url(entrypoint) - self.log.debug("Executing [PUT] {}".format(url)) + self.log.debug(f"Executing [PUT] {url}") return self._do_rest_request( RequestTypes.put, url, @@ -1456,7 +1454,7 @@ def raw_put(self, entrypoint: str, **kwargs): def raw_patch(self, entrypoint: str, **kwargs): url = self._endpoint_to_url(entrypoint) - self.log.debug("Executing [PATCH] {}".format(url)) + self.log.debug(f"Executing [PATCH] {url}") return self._do_rest_request( RequestTypes.patch, url, @@ -1465,7 +1463,7 @@ def raw_patch(self, entrypoint: str, **kwargs): def raw_get(self, entrypoint: str, **kwargs): url = self._endpoint_to_url(entrypoint) - self.log.debug("Executing [GET] {}".format(url)) + self.log.debug(f"Executing [GET] {url}") return self._do_rest_request( RequestTypes.get, url, @@ -1474,7 +1472,7 @@ def raw_get(self, entrypoint: str, **kwargs): def raw_delete(self, entrypoint: str, **kwargs): url = self._endpoint_to_url(entrypoint) - self.log.debug("Executing [DELETE] {}".format(url)) + self.log.debug(f"Executing [DELETE] {url}") return self._do_rest_request( RequestTypes.delete, url, @@ -1509,7 +1507,7 @@ def get_event(self, event_id: str) -> Optional[Dict[str, Any]]: dict[str, Any]: Full event data. """ - response = self.get("events/{}".format(event_id)) + response = self.get(f"events/{event_id}") response.raise_for_status() return response.data @@ -1666,16 +1664,16 @@ def update_event( args.append("progress") if retries is not None: args.append("retries") - fields = ", ".join("'{}'".format(f) for f in args) + fields = ", ".join(f"'{f}'" for f in args) ending = "s" if len(args) > 1 else "" - raise ValueError(( - "Your server version '{}' does not support update" - " of {} field{} on event. The fields are supported since" - " server version '0.5'." - ).format(self.get_server_version(), fields, ending)) + raise ValueError( + f"Your server version '{self.server_version}' does not" + f" support update of {fields} field{ending} on event." + " The fields are supported since server version '0.5'." + ) response = self.patch( - "events/{}".format(event_id), + f"events/{event_id}", **kwargs ) response.raise_for_status() @@ -2487,7 +2485,7 @@ def upload_reviewable( filename = os.path.basename(filepath) headers["x-file-name"] = filename - query = f"?label={label}" if label else "" + query = prepare_query_string({"label": label or None}) endpoint = ( f"/projects/{project_name}" f"/versions/{version_id}/reviewables{query}" @@ -2551,7 +2549,7 @@ def get_server_schema(self) -> Optional[Dict[str, Any]]: dict[str, Any]: Full server schema. """ - url = "{}/openapi.json".format(self._base_url) + url = f"{self._base_url}/openapi.json" response = self._do_rest_request(RequestTypes.get, url) if response: return response.data @@ -2611,7 +2609,7 @@ def set_attribute_config( position = len(attributes) response = self.put( - "attributes/{}".format(attribute_name), + f"attributes/{attribute_name}", data=data, scope=scope, position=position, @@ -2620,9 +2618,8 @@ def set_attribute_config( if response.status_code != 204: # TODO raise different exception raise ValueError( - "Attribute \"{}\" was not created/updated. {}".format( - attribute_name, response.detail - ) + f"Attribute \"{attribute_name}\" was not created/updated." + f" {response.detail}" ) self.reset_attributes_schema() @@ -2636,11 +2633,10 @@ def remove_attribute_config(self, attribute_name: str): attribute_name (str): Name of attribute to remove. """ - response = self.delete("attributes/{}".format(attribute_name)) + response = self.delete(f"attributes/{attribute_name}") response.raise_for_status( - "Attribute \"{}\" was not created/updated. {}".format( - attribute_name, response.detail - ) + f"Attribute \"{attribute_name}\" was not created/updated." + f" {response.detail}" ) self.reset_attributes_schema() @@ -2709,7 +2705,7 @@ def get_attributes_fields_for_type( """ attributes = self.get_attributes_for_type(entity_type) return { - "attrib.{}".format(attr) + f"attrib.{attr}" for attr in attributes } @@ -2783,7 +2779,7 @@ def get_default_fields_for_type(self, entity_type: str) -> Set[str]: entity_type_defaults = set(DEFAULT_USER_FIELDS) else: - raise ValueError("Unknown entity type \"{}\"".format(entity_type)) + raise ValueError(f"Unknown entity type \"{entity_type}\"") return ( entity_type_defaults | self.get_attributes_fields_for_type(entity_type) @@ -2832,11 +2828,7 @@ def get_addon_endpoint( ending = "" if subpaths: ending = "/{}".format("/".join(subpaths)) - return "addons/{}/{}{}".format( - addon_name, - addon_version, - ending - ) + return f"addons/{addon_name}/{addon_version}{ending}" def get_addon_url( self, @@ -2938,19 +2930,11 @@ def get_installers( InstallersInfoDict: Information about installers known for server. """ - query_fields = [ - "{}={}".format(key, value) - for key, value in ( - ("version", version), - ("platform", platform_name), - ) - if value - ] - query = "" - if query_fields: - query = "?{}".format(",".join(query_fields)) - - response = self.get("desktop/installers{}".format(query)) + query = prepare_query_string({ + "version": version or None, + "platform": platform_name or None, + }) + response = self.get(f"desktop/installers{query}") response.raise_for_status() return response.data @@ -3019,7 +3003,7 @@ def update_installer(self, filename: str, sources: List[Dict[str, Any]]): """ response = self.patch( - "desktop/installers/{}".format(filename), + f"desktop/installers/{filename}", sources=sources ) response.raise_for_status() @@ -3031,7 +3015,7 @@ def delete_installer(self, filename: str): filename (str): Installer filename. """ - response = self.delete("desktop/installers/{}".format(filename)) + response = self.delete(f"desktop/installers/{filename}") response.raise_for_status() def download_installer( @@ -3052,7 +3036,7 @@ def download_installer( """ self.download_file( - "desktop/installers/{}".format(filename), + f"desktop/installers/{filename}", dst_filepath, chunk_size=chunk_size, progress=progress @@ -3077,7 +3061,7 @@ def upload_installer( """ return self.upload_file( - "desktop/installers/{}".format(dst_filename), + f"desktop/installers/{dst_filename}", src_filepath, progress=progress ) @@ -3087,7 +3071,7 @@ def _get_dependency_package_route( ) -> str: endpoint = "desktop/dependencyPackages" if filename: - return "{}/{}".format(endpoint, filename) + return f"{endpoint}/{filename}" return endpoint def get_dependency_packages(self) -> "DependencyPackagesDict": @@ -3313,10 +3297,9 @@ def delete_addon(self, addon_name: str, purge: Optional[bool] = None): purge (Optional[bool]): Purge all data related to the addon. """ - query_data = {} if purge is not None: - query_data["purge"] = "true" if purge else "false" - query = prepare_query_string(query_data) + purge = "true" if purge else "false" + query = prepare_query_string({"purge": purge}) response = self.delete(f"addons/{addon_name}{query}") response.raise_for_status() @@ -3337,10 +3320,9 @@ def delete_addon_version( purge (Optional[bool]): Purge all data related to the addon. """ - query_data = {} if purge is not None: - query_data["purge"] = "true" if purge else "false" - query = prepare_query_string(query_data) + purge = "true" if purge else "false" + query = prepare_query_string({"purge": purge}) response = self.delete(f"addons/{addon_name}/{addon_version}{query}") response.raise_for_status() @@ -3671,7 +3653,7 @@ def get_project_anatomy_preset( if (major, minor, patch) < (1, 0, 8): preset_name = self.get_default_anatomy_preset_name() - result = self.get("anatomy/presets/{}".format(preset_name)) + result = self.get(f"anatomy/presets/{preset_name}") result.raise_for_status() return result.data @@ -3715,7 +3697,7 @@ def get_project_root_overrides( dict[str, dict[str, str]]: Root values by root name by site id. """ - result = self.get("projects/{}/roots".format(project_name)) + result = self.get(f"projects/{project_name}/roots") result.raise_for_status() return result.data @@ -3843,12 +3825,9 @@ def _get_project_roots_values( platform_name = platform.system() query_data["platform"] = platform_name.lower() - query = "?{}".format(",".join([ - "{}={}".format(key, value) - for key, value in query_data.items() - ])) + query = prepare_query_string(query_data) response = self.get( - "projects/{}/siteRoots{}".format(project_name, query) + f"projects/{project_name}/siteRoots{query}" ) response.raise_for_status() return response.data @@ -3945,9 +3924,9 @@ def get_addon_site_settings_schema( dict[str, Any]: Schema of site settings. """ - result = self.get("addons/{}/{}/siteSettings/schema".format( - addon_name, addon_version - )) + result = self.get( + f"addons/{addon_name}/{addon_version}/siteSettings/schema" + ) result.raise_for_status() return result.data @@ -3974,10 +3953,7 @@ def get_addon_studio_settings( if variant is None: variant = self.default_settings_variant - query_items = {} - if variant: - query_items["variant"] = variant - query = prepare_query_string(query_items) + query = prepare_query_string({"variant": variant or None}) result = self.get( f"addons/{addon_name}/{addon_version}/settings{query}" @@ -4025,21 +4001,16 @@ def get_addon_project_settings( elif not site_id: site_id = self.site_id - query_items = {} - if site_id: - query_items["site"] = site_id - if variant is None: variant = self.default_settings_variant - if variant: - query_items["variant"] = variant - - query = prepare_query_string(query_items) + query = prepare_query_string({ + "site": site_id or None, + "variant": variant or None, + }) result = self.get( - "addons/{}/{}/settings/{}{}".format( - addon_name, addon_version, project_name, query - ) + f"addons/{addon_name}/{addon_version}" + f"/settings/{project_name}{query}" ) result.raise_for_status() return result.data @@ -4112,9 +4083,9 @@ def get_addon_site_settings( return {} query = prepare_query_string({"site": site_id}) - result = self.get("addons/{}/{}/siteSettings{}".format( - addon_name, addon_version, query - )) + result = self.get( + f"addons/{addon_name}/{addon_version}/siteSettings{query}" + ) result.raise_for_status() return result.data @@ -4157,23 +4128,18 @@ def get_bundle_settings( dict[str, Any]: All settings for single bundle. """ - query_values = { - key: value - for key, value in ( - ("project_name", project_name), - ("variant", variant or self.default_settings_variant), - ("bundle_name", bundle_name), - ) - if value - } - if use_site: - if not site_id: - site_id = self.site_id - if site_id: - query_values["site_id"] = site_id - - query = prepare_query_string(query_values) - response = self.get("settings{}".format(query)) + if not use_site: + site_id = None + elif not site_id: + site_id = self.site_id + + query = prepare_query_string({ + "project_name": project_name or None, + "bundle_name": bundle_name or None, + "variant": variant or self.default_settings_variant or None, + "site_id": site_id, + }) + response = self.get(f"settings{query}") response.raise_for_status() return response.data @@ -4412,7 +4378,7 @@ def delete_secret(self, secret_name: str): secret_name (str): Name of secret to delete. """ - response = self.delete("secrets/{}".format(secret_name)) + response = self.delete(f"secrets/{secret_name}") response.raise_for_status() return response.data @@ -4435,7 +4401,7 @@ def get_rest_project( if not project_name: return None - response = self.get("projects/{}".format(project_name)) + response = self.get(f"projects/{project_name}") # TODO ignore only error about not existing project if response.status != 200: return None @@ -4562,11 +4528,11 @@ def get_rest_folders( "Function 'get_folders_rest' is supported" " for AYON server 1.0.8 and above." ) - query = "?attrib={}".format( - "true" if include_attrib else "false" - ) + query = prepare_query_string({ + "attrib": "true" if include_attrib else "false" + }) response = self.get( - "projects/{}/folders{}".format(project_name, query) + f"projects/{project_name}/folders{query}" ) response.raise_for_status() return response.data["folders"] @@ -4612,20 +4578,15 @@ def get_project_names( list[str]: List of available project names. """ - query_keys = {} if active is not None: - query_keys["active"] = "true" if active else "false" + active = "true" if active else "false" if library is not None: - query_keys["library"] = "true" if library else "false" - query = "" - if query_keys: - query = "?{}".format(",".join([ - f"{key}={value}" - for key, value in query_keys.items() - ])) - - response = self.get(f"projects{query}", **query_keys) + library = "true" if library else "false" + + query = prepare_query_string({"active": active, "library": library}) + + response = self.get(f"projects{query}") response.raise_for_status() data = response.data project_names = [] @@ -4779,18 +4740,10 @@ def get_folders_hierarchy( if folder_types: folder_types = ",".join(folder_types) - query_fields = [ - "{}={}".format(key, value) - for key, value in ( - ("search", search_string), - ("types", folder_types), - ) - if value - ] - query = "" - if query_fields: - query = "?{}".format(",".join(query_fields)) - + query = prepare_query_string({ + "search": search_string or None, + "types": folder_types or None, + }) response = self.get( f"projects/{project_name}/hierarchy{query}" ) @@ -5209,7 +5162,7 @@ def create_folder( create_data[key] = value response = self.post( - "projects/{}/folders".format(project_name), + f"projects/{project_name}/folders", **create_data ) response.raise_for_status() @@ -5295,7 +5248,7 @@ def delete_folder( folder, products, versions and representations. """ - url = "projects/{}/folders/{}".format(project_name, folder_id) + url = f"projects/{project_name}/folders/{folder_id}" if force: url += "?force=true" response = self.delete(url) @@ -5715,7 +5668,7 @@ def create_task( create_data[key] = value response = self.post( - "projects/{}/tasks".format(project_name), + f"projects/{project_name}/tasks", **create_data ) response.raise_for_status() @@ -5787,7 +5740,7 @@ def update_task( update_data[key] = value response = self.patch( - "projects/{}/tasks/{}".format(project_name, task_id), + f"projects/{project_name}/tasks/{task_id}", **update_data ) response.raise_for_status() @@ -5801,7 +5754,7 @@ def delete_task(self, project_name: str, task_id: str): """ response = self.delete( - "projects/{}/tasks/{}".format(project_name, task_id) + f"projects/{project_name}/tasks/{task_id}" ) response.raise_for_status() @@ -6204,7 +6157,7 @@ def create_product( create_data[key] = value response = self.post( - "projects/{}/products".format(project_name), + f"projects/{project_name}/products", **create_data ) response.raise_for_status() @@ -6258,7 +6211,7 @@ def update_product( update_data[key] = value response = self.patch( - "projects/{}/products/{}".format(project_name, product_id), + f"projects/{project_name}/products/{product_id}", **update_data ) response.raise_for_status() @@ -6272,7 +6225,7 @@ def delete_product(self, project_name: str, product_id: str): """ response = self.delete( - "projects/{}/products/{}".format(project_name, product_id) + f"projects/{project_name}/products/{product_id}" ) response.raise_for_status() @@ -6814,7 +6767,7 @@ def create_version( create_data[key] = value response = self.post( - "projects/{}/versions".format(project_name), + f"projects/{project_name}/versions", **create_data ) response.raise_for_status() @@ -7451,7 +7404,7 @@ def get_repre_ids_by_context_filters( """ if not isinstance(context_filters, dict): raise TypeError( - "Expected 'dict' got {}".format(str(type(context_filters))) + f"Expected 'dict' got {str(type(context_filters))}" ) filter_body = {} @@ -7484,7 +7437,7 @@ def get_repre_ids_by_context_filters( }) response = self.post( - "projects/{}/repreContextFilter".format(project_name), + f"projects/{project_name}/repreContextFilter", context=body_context_filters, **filter_body ) @@ -7542,7 +7495,7 @@ def create_representation( create_data[key] = value response = self.post( - "projects/{}/representations".format(project_name), + f"projects/{project_name}/representations", **create_data ) response.raise_for_status() @@ -7597,9 +7550,7 @@ def update_representation( update_data[key] = value response = self.patch( - "projects/{}/representations/{}".format( - project_name, representation_id - ), + f"projects/{project_name}/representations/{representation_id}", **update_data ) response.raise_for_status() @@ -7615,9 +7566,7 @@ def delete_representation( """ response = self.delete( - "projects/{}/representation/{}".format( - project_name, representation_id - ) + f"projects/{project_name}/representation/{representation_id}" ) response.raise_for_status() @@ -7886,11 +7835,9 @@ def get_thumbnail( ): entity_type += "s" - response = self.raw_get("projects/{}/{}/{}/thumbnail".format( - project_name, - entity_type, - entity_id - )) + response = self.raw_get( + f"projects/{project_name}/{entity_type}/{entity_id}/thumbnail" + ) return self._prepare_thumbnail_content(project_name, response) def get_folder_thumbnail( @@ -7998,7 +7945,7 @@ def create_thumbnail( mime_type = get_media_mime_type(src_filepath) response = self.upload_file( - "projects/{}/thumbnails".format(project_name), + f"projects/{project_name}/thumbnails", src_filepath, request_type=RequestTypes.post, headers={"Content-Type": mime_type}, @@ -8069,14 +8016,14 @@ def create_project( """ if self.get_project(project_name): - raise ValueError("Project with name \"{}\" already exists".format( - project_name - )) + raise ValueError( + f"Project with name \"{project_name}\" already exists" + ) if not PROJECT_NAME_REGEX.match(project_name): - raise ValueError(( - "Project name \"{}\" contain invalid characters" - ).format(project_name)) + raise ValueError( + f"Project name \"{project_name}\" contain invalid characters" + ) preset = self.get_project_anatomy_preset(preset_name) @@ -8089,12 +8036,12 @@ def create_project( ) if result.status != 201: - details = "Unknown details ({})".format(result.status) + details = f"Unknown details ({result.status})" if result.data: details = result.data.get("detail") or details - raise ValueError("Failed to create project \"{}\": {}".format( - project_name, details - )) + raise ValueError( + f"Failed to create project \"{project_name}\": {details}" + ) return self.get_project(project_name) @@ -8157,7 +8104,7 @@ def update_project( if value is not None }) response = self.patch( - "projects/{}".format(project_name), + f"projects/{project_name}", **changes ) response.raise_for_status() @@ -8172,16 +8119,15 @@ def delete_project(self, project_name: str): """ if not self.get_project(project_name): - raise ValueError("Project with name \"{}\" was not found".format( - project_name - )) + raise ValueError( + f"Project with name \"{project_name}\" was not found" + ) - result = self.delete("projects/{}".format(project_name)) + result = self.delete(f"projects/{project_name}") if result.status_code != 204: + detail = result.data["detail"] raise ValueError( - "Failed to delete project \"{}\". {}".format( - project_name, result.data["detail"] - ) + f"Failed to delete project \"{project_name}\". {detail}" ) # --- Links --- @@ -8222,7 +8168,7 @@ def get_link_types(self, project_name: str) -> List[Dict[str, Any]]: list[dict[str, Any]]: Link types available on project. """ - response = self.get("projects/{}/links/types".format(project_name)) + response = self.get(f"projects/{project_name}/links/types") response.raise_for_status() return response.data["types"] @@ -8295,7 +8241,7 @@ def create_link_type( link_type_name, input_type, output_type ) response = self.put( - "projects/{}/links/types/{}".format(project_name, full_type_name), + f"projects/{project_name}/links/types/{full_type_name}", **data ) response.raise_for_status() @@ -8323,7 +8269,8 @@ def delete_link_type( link_type_name, input_type, output_type ) response = self.delete( - "projects/{}/links/types/{}".format(project_name, full_type_name)) + f"projects/{project_name}/links/types/{full_type_name}" + ) response.raise_for_status() def make_sure_link_type_exists( @@ -8407,10 +8354,10 @@ def create_link( ): kwargs["link"] = full_link_type_name if link_name: - raise UnsupportedServerVersion(( + raise UnsupportedServerVersion( "Link name is not supported" - " for version of AYON server {}" - ).format(self.server_version)) + f" for version of AYON server {self.server_version}" + ) else: kwargs["linkType"] = full_link_type_name @@ -8418,7 +8365,7 @@ def create_link( kwargs["name"] = link_name response = self.post( - "projects/{}/links".format(project_name), **kwargs + f"projects/{project_name}/links", **kwargs ) response.raise_for_status() return response.data @@ -8435,7 +8382,7 @@ def delete_link(self, project_name: str, link_id: str): """ response = self.delete( - "projects/{}/links/{}".format(project_name, link_id) + f"projects/{project_name}/links/{link_id}" ) response.raise_for_status() @@ -8587,7 +8534,7 @@ def get_entities_links( fields.discard("name") link_fields.discard("links") link_fields |= { - "links.{}".format(field) + f"links.{field}" for field in fields } # --------- @@ -9016,21 +8963,21 @@ def _prepare_fields( if "folderTypes" in fields: fields.remove("folderTypes") fields |= { - "folderTypes.{}".format(name) + f"folderTypes.{name}" for name in self.get_default_fields_for_type("folderType") } if "taskTypes" in fields: fields.remove("taskTypes") fields |= { - "taskTypes.{}".format(name) + f"taskTypes.{name}" for name in self.get_default_fields_for_type("taskType") } if "productTypes" in fields: fields.remove("productTypes") fields |= { - "productTypes.{}".format(name) + f"productTypes.{name}" for name in self.get_default_fields_for_type( "productType" ) diff --git a/ayon_api/utils.py b/ayon_api/utils.py index a0033672c..bd391b3cb 100644 --- a/ayon_api/utils.py +++ b/ayon_api/utils.py @@ -139,19 +139,29 @@ def is_valid(self) -> bool: ) -def prepare_query_string(key_values: Dict[str, Any]): +def prepare_query_string( + key_values: Dict[str, Any], skip_none: bool = True +) -> str: """Prepare data to query string. If there are any values a query starting with '?' is returned otherwise an empty string. Args: - dict[str, Any]: Query values. + key_values (dict[str, Any]): Query values. + skip_none (bool): Filter values which are 'None'. Returns: str: Query string. """ + if skip_none: + key_values = { + key: value + for key, value in key_values.items() + if value is not None + } + if not key_values: return "" return "?{}".format(urlencode(key_values))