diff --git a/setup.py b/setup.py index 747fc65..ab26c22 100644 --- a/setup.py +++ b/setup.py @@ -1,27 +1,27 @@ -from setuptools import setup, find_packages import os +from setuptools import find_packages, setup + this_directory = os.path.abspath(os.path.dirname(__file__)) -with open(os.path.join(this_directory, 'README.md'), encoding='utf-8') as f: +with open(os.path.join(this_directory, "README.md"), encoding="utf-8") as f: long_description = f.read() - setup( - name='trustauthx', - version='0.6.3', - description='Official connector SDK for TrustAuthx', + name="trustauthx", + version="0.6.3", + description="Official connector SDK for TrustAuthx", long_description=long_description, - long_description_content_type='text/markdown', # This is important! - author='moonlightnexus', - author_email='nexus@trustauthx.com', + long_description_content_type="text/markdown", # This is important! + author="moonlightnexus", + author_email="nexus@trustauthx.com", url="https://github.com/One-Click-Auth/TrustAuthx-Py-SDK.git", license="MIT", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3.9', - ], + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.9", + ], packages=find_packages(), install_requires=[ "certifi>=2023.5.7", @@ -37,11 +37,11 @@ "urllib3<=3.0.0", "charset-normalizer>=3.2.0", "python-jose>=3.3.0", - "python-dotenv==1.0.0" - ], + "python-dotenv==1.0.0", + ], entry_points={ - 'console_scripts': [ - 'trustauthx = trustauthx.cli:main', + "console_scripts": [ + "trustauthx = trustauthx.cli:main", ], }, ) diff --git a/trustauthx/authlite.py b/trustauthx/authlite.py index 7321046..4d43b3f 100644 --- a/trustauthx/authlite.py +++ b/trustauthx/authlite.py 
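Review bot: In the `install_requires` list touched by the second setup.py hunk, `"urllib3<=3.0.0"` sets only an upper bound, so a resolver is free to pick any older urllib3 release, including ones that predate its security fixes. A floor alongside the cap would close that, for example `"urllib3>=<lowest version you test against>,<3"` (placeholder; the page does not say which version is tested). `"python-dotenv==1.0.0"` is an exact pin in a library, which stops downstream users from taking a patched release; a range is the usual choice. The list starts outside the hunk.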
@@ -1,11 +1,14 @@ +import json +import sqlite3 + import requests -from requests.exceptions import HTTPError from jose import JWTError, jwt from jose.constants import ALGORITHMS -import json -import sqlite3 +from requests.exceptions import HTTPError + from .scheme import * + class _EdgeDBRoleQuery: """ A class for querying and managing roles and permissions. @@ -29,6 +32,7 @@ class _EdgeDBRoleQuery: count_roles(self): Returns the number of roles stored. """ + def __init__(self, roles, in_memory=True): """ Initializes the _EdgeDBRoleQuery instance. @@ -39,19 +43,28 @@ def __init__(self, roles, in_memory=True): """ self.in_memory = in_memory if self.in_memory: - self.roles = {role_id: permissions for role in roles for role_id, permissions in role.items()} + self.roles = { + role_id: permissions + for role in roles + for role_id, permissions in role.items() + } else: - self.conn = sqlite3.connect(':memory:') # replace ':memory:' with your database path + # replace ':memory:' with your database path + self.conn = sqlite3.connect(":memory:") self.cursor = self.conn.cursor() - self.cursor.execute(""" + self.cursor.execute( + """ CREATE TABLE IF NOT EXISTS roles ( role_id TEXT PRIMARY KEY, permissions TEXT ) - """) + """ + ) for role in roles: for role_id, permissions in role.items(): - self.cursor.execute("INSERT INTO roles VALUES (?, ?)", (role_id, permissions)) + self.cursor.execute( + "INSERT INTO roles VALUES (?, ?)", (role_id, permissions) + ) self.conn.commit() def query(self, role_id=None, permission_key=None): 
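Review bot: In the SQLite branch of `_EdgeDBRoleQuery.__init__`, `permissions` is bound straight into a `TEXT` column, while the in-memory branch and the later `query` and `validate` hunks treat it as a mapping (`permissions[0].get(permission_key, None)`). If it is a dict the insert fails on an unsupported parameter type, and if it is a string the `.get` calls raise `AttributeError`, so permission checks with `in_memory=False` cannot return an answer. Serialising on write and parsing on read would fix it: `(role_id, json.dumps(permissions))` here and `json.loads(permissions[0]).get(permission_key)` in `query` and `validate`; `json` is already imported at the top of the file. In the `elif permission_key` branch of `query`, `permission_key in permissions` on the stored text would also be a substring test rather than a key lookup. `__init__` begins outside the hunk.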
@@ -71,21 +84,33 @@ def query(self, role_id=None, permission_key=None): elif role_id: return self.roles.get(role_id, None) elif permission_key: - return {role_id: permissions[permission_key] for role_id, permissions in self.roles.items() if permission_key in permissions} + return { + role_id: permissions[permission_key] + for role_id, permissions in self.roles.items() + if permission_key in permissions + } else: return self.roles else: if role_id and permission_key: - self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + self.cursor.execute( + "SELECT permissions FROM roles WHERE role_id = ?", (role_id,) + ) permissions = self.cursor.fetchone() if permissions: return permissions[0].get(permission_key, None) elif role_id: - self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + self.cursor.execute( + "SELECT permissions FROM roles WHERE role_id = ?", (role_id,) + ) return self.cursor.fetchone() elif permission_key: self.cursor.execute("SELECT * FROM roles") - return {role_id: permissions[permission_key] for role_id, permissions in self.cursor.fetchall() if permission_key in permissions} + return { + role_id: permissions[permission_key] + for role_id, permissions in self.cursor.fetchall() + if permission_key in permissions + } else: self.cursor.execute("SELECT * FROM roles") return self.cursor.fetchall() 
@@ -103,9 +128,13 @@ def validate(self, role_id, permission_key, permission_val): bool: True if the permission value matches the expected value, False otherwise. """ if self.in_memory: - return self.roles.get(role_id, {}).get(permission_key, None) == permission_val + return ( + self.roles.get(role_id, {}).get(permission_key, None) == permission_val + ) else: - self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + self.cursor.execute( + "SELECT permissions FROM roles WHERE role_id = ?", (role_id,) + ) permissions = self.cursor.fetchone() if permissions: return permissions[0].get(permission_key, None) == permission_val @@ -123,6 +152,7 @@ def count_roles(self): self.cursor.execute("SELECT COUNT(*) FROM roles") return self.cursor.fetchone()[0] + class _Roles(_EdgeDBRoleQuery): """ A class for managing roles and permissions in the EdgeDB system. @@ -151,7 +181,17 @@ class _Roles(_EdgeDBRoleQuery): delete_permission(self, rol_id, **Permission_): Deletes a permission from a role with the specified role ID. """ - def __init__(self, roles, org_id, api_key, signed_key, secret_key, API_BASE_URL, InMemory=True): + + def __init__( + self, + roles, + org_id, + api_key, + signed_key, + secret_key, + API_BASE_URL, + InMemory=True, + ): """ Initializes the _Roles instance. 
@@ -174,253 +214,241 @@ def __init__(self, roles, org_id, api_key, signed_key, secret_key, API_BASE_URL, def get_all_roles(self) -> GetAllRolesResponse: """ - Retrieves all roles and their permissions from the API. - - Returns: - List[Role]: A list of Role objects representing the roles and their permissions. - - - demo response ==> [ - { - "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", - "rol_id": "rol_gCD_ebc6f7715bb14554", - "name": "string", - "permissions": [ - { - "user": "administration" - } - ] - }, - { - "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", - "rol_id": "rol_Ahy_f51d73ff656545e5", - "name": "string", - "permissions": [] - }, - { - "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", - "rol_id": "rol_rce_474ae9e59b3d49ce", - "name": "string", - "permissions": [ - { - "user": "administration" - }, - { - "viewer": "administration" - }, - { - "maintainer": "administration" - } - ] - } -]""" - url = f'{self.API_BASE_URL}/rbac/role' - headers = {'accept': 'application/json'} + Retrieves all roles and their permissions from the API. + + Returns: + List[Role]: A list of Role objects representing the roles and their permissions. 
+ + + demo response ==> [ + { + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_gCD_ebc6f7715bb14554", + "name": "string", + "permissions": [ + { + "user": "administration" + } + ] + }, + { + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_Ahy_f51d73ff656545e5", + "name": "string", + "permissions": [] + }, + { + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_rce_474ae9e59b3d49ce", + "name": "string", + "permissions": [ + { + "user": "administration" + }, + { + "viewer": "administration" + }, + { + "maintainer": "administration" + } + ] + } + ]""" + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self.api_key}', - 'signed_key': f'{self._secret_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self.api_key}", + "signed_key": f"{self._secret_key}", } response = requests.get(url, headers=headers, params=params) roles = [Role(**role_data) for role_data in response.json()] return GetAllRolesResponse(roles=[role.to_dict() for role in roles]) - def add_role(self, name, **Permission_) ->AddRoleResponse: - """ - Adds a new role with the specified name and permissions. - - Args: - name (str): The name of the new role. - **Permission_: Keyword arguments representing the permissions to be added to the new role. - - Returns: - AddRoleResponse: An AddRoleResponse object representing the newly created role. 
- - demo response ==> { - "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", - "rol_id": "rol_rce_474ae9e59b3d49ce", - "name": "string", - "permissions": [ - { - "user": "administration" - }, - { - "viewer": "administration" - }, - { - "maintainer": "administration" - } - ] -}""" - url = f'{self.API_BASE_URL}/rbac/role' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + def add_role(self, name, **Permission_) -> AddRoleResponse: + """ + Adds a new role with the specified name and permissions. + + Args: + name (str): The name of the new role. + **Permission_: Keyword arguments representing the permissions to be added to the new role. + + Returns: + AddRoleResponse: An AddRoleResponse object representing the newly created role. + + demo response ==> { + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_rce_474ae9e59b3d49ce", + "name": "string", + "permissions": [ + { + "user": "administration" + }, + { + "viewer": "administration" + }, + { + "maintainer": "administration" + } + ] + }""" + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self.api_key}', - 'signed_key': f'{self._secret_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self.api_key}", + "signed_key": f"{self._secret_key}", } permissions = [{k: v} for k, v in Permission_.items()] - data = { - "org_id": f'{self.org_id}', - "name": name, - "permissions": permissions - } - response = requests.post(url, headers=headers, params=params, data=json.dumps(data)) + data = {"org_id": f"{self.org_id}", "name": name, "permissions": permissions} + response = requests.post( + url, headers=headers, params=params, data=json.dumps(data) + ) role_data = response.json() permissions = [Permission(**p) for p in role_data.get("permissions", [])] return AddRoleResponse( 
org_id=role_data.get("org_id"), rol_id=role_data.get("rol_id"), name=role_data.get("name"), - permissions=permissions + permissions=permissions, ) def delete_role(self, rol_id) -> DeleteRoleResponse: - """ - Deletes a role with the specified role ID. - - Args: - rol_id (str): The ID of the role to be deleted. - - Returns: - DeleteRoleResponse: A DeleteRoleResponse object representing the deleted role. - - demo response ==> { - "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", - "rol_id": "rol_YHV_78ae9006bcaa4c77", - "name": "string", - "permissions": [ - { - "user": "administration" - }, - { - "viewer": "administration" - }, - { - "maintainer": "administration" - } - ] -}""" - url = f'{self.API_BASE_URL}/rbac/role' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + Deletes a role with the specified role ID. + + Args: + rol_id (str): The ID of the role to be deleted. + + Returns: + DeleteRoleResponse: A DeleteRoleResponse object representing the deleted role. 
+ + demo response ==> { + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_YHV_78ae9006bcaa4c77", + "name": "string", + "permissions": [ + { + "user": "administration" + }, + { + "viewer": "administration" + }, + { + "maintainer": "administration" + } + ] + }""" + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self.api_key}', - 'signed_key': f'{self._secret_key}' - } - data = { - "org_id": f'{self.org_id}', - "rol_id": rol_id + "org_id": f"{self.org_id}", + "api_key": f"{self.api_key}", + "signed_key": f"{self._secret_key}", } - response = requests.delete(url, headers=headers, params=params, data=json.dumps(data)) + data = {"org_id": f"{self.org_id}", "rol_id": rol_id} + response = requests.delete( + url, headers=headers, params=params, data=json.dumps(data) + ) role_data = response.json() permissions = [Permission(**p) for p in role_data.get("permissions", [])] return DeleteRoleResponse( org_id=role_data.get("org_id"), rol_id=role_data.get("rol_id"), name=role_data.get("name"), - permissions=permissions + permissions=permissions, ) def add_permission(self, rol_id, **Permission_) -> AddPermissionResponse: """ - Adds a new permission to a role with the specified role ID. - - Args: - rol_id (str): The ID of the role to which the permission should be added. - **Permission_: Keyword arguments representing the permissions to be added. - - Returns: - AddPermissionResponse: An AddPermissionResponse object representing the added permission. 
- - demo response ==> { - "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", - "rol_id": "rol_rce_474ae9e59b3d49ce", - "permissions": [ - { - "any": "view" ##only return added content - } - ] -}""" - url = f'{self.API_BASE_URL}/rbac/permission' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + Adds a new permission to a role with the specified role ID. + + Args: + rol_id (str): The ID of the role to which the permission should be added. + **Permission_: Keyword arguments representing the permissions to be added. + + Returns: + AddPermissionResponse: An AddPermissionResponse object representing the added permission. + + demo response ==> { + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_rce_474ae9e59b3d49ce", + "permissions": [ + { + "any": "view" ##only return added content + } + ] + }""" + url = f"{self.API_BASE_URL}/rbac/permission" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self.api_key}', - 'signed_key': f'{self._secret_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self.api_key}", + "signed_key": f"{self._secret_key}", } permissions = [{k: v} for k, v in Permission_.items()] data = { - "org_id": f'{self.org_id}', + "org_id": f"{self.org_id}", "rol_id": rol_id, - "permissions": permissions + "permissions": permissions, } - response = requests.post(url, headers=headers, params=params, data=json.dumps(data)) + response = requests.post( + url, headers=headers, params=params, data=json.dumps(data) + ) response_data = response.json() permissions = [{k: v} for k, v in permissions.items()] return AddPermissionResponse( org_id=response_data.get("org_id"), rol_id=response_data.get("rol_id"), - permissions=permissions + permissions=permissions, ) def delete_permission(self, rol_id, **Permission_) -> DeletePermissionResponse: """ - Deletes a permission from a 
role with the specified role ID. - - Args: - rol_id (str): The ID of the role from which the permission should be deleted. - **Permission_: Keyword arguments representing the permissions to be deleted. - - Returns: - DeletePermissionResponse: A DeletePermissionResponse object representing the role with the deleted permission. - - demo response ==> { - "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", - "rol_id": "rol_rce_474ae9e59b3d49ce", - "permissions": [ - { - "user": "administration" - }, - { - "viewer": "administration" - }, - { - "maintainer": "administration" - } - ] -}""" #return full - url = f'{self.API_BASE_URL}/rbac/permission' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + Deletes a permission from a role with the specified role ID. + + Args: + rol_id (str): The ID of the role from which the permission should be deleted. + **Permission_: Keyword arguments representing the permissions to be deleted. + + Returns: + DeletePermissionResponse: A DeletePermissionResponse object representing the role with the deleted permission. 
+ + demo response ==> { + "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", + "rol_id": "rol_rce_474ae9e59b3d49ce", + "permissions": [ + { + "user": "administration" + }, + { + "viewer": "administration" + }, + { + "maintainer": "administration" + } + ] + }""" # return full + url = f"{self.API_BASE_URL}/rbac/permission" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self.api_key}', - 'signed_key': f'{self._secret_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self.api_key}", + "signed_key": f"{self._secret_key}", } permissions = [{k: v} for k, v in Permission_.items()] data = { - "org_id": f'{self.org_id}', + "org_id": f"{self.org_id}", "rol_id": rol_id, - "permissions": permissions + "permissions": permissions, } - response = requests.delete(url, headers=headers, params=params, data=json.dumps(data)) + response = requests.delete( + url, headers=headers, params=params, data=json.dumps(data) + ) return response.json() - -class AuthLiteClient(): + +class AuthLiteClient: """ AuthLiteClient is a Python client for the TrustAuthX authentication service. 
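Review bot: All five `_Roles` methods in this hunk fill the `signed_key` query parameter from `self._secret_key`, whereas `AuthLiteClient` sends `self.signed_key` (the HS256 JWT built in its `__init__`); if `_secret_key` holds the raw `secret_key` argument (the `_Roles.__init__` body is outside the hunk), the signing secret itself is placed in the request URL, where proxies and access logs can record it. The parameter should presumably be `"signed_key": self.signed_key`, assuming `__init__` stores that argument. Separately, in `add_permission` the line `permissions = [{k: v} for k, v in permissions.items()]` calls `.items()` on a list and raises `AttributeError` after the POST has already been sent; it probably meant `permissions = response_data.get("permissions", [])`. None of the calls passes `timeout=` or checks `response.status_code` before `response.json()`, so an error body reaches `Role(**role_data)` as if it were role data; the missing `timeout=` also applies to the `requests` calls in the later `AuthLiteClient` hunks.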
@@ -476,33 +504,52 @@ class TokenCheck: refresh (str): The refresh token. state (bool): The state of the tokens (True if valid, False otherwise). """ - access :str - refresh:str - state:bool - def __init__(self, api_key, secret_key, org_id=None, API_BASE_URL="https://api.trustauthx.com", in_memory=True): + access: str + refresh: str + state: bool + + def __init__( + self, + api_key, + secret_key, + org_id=None, + API_BASE_URL="https://api.trustauthx.com", + in_memory=True, + ): """ - Initializes the AuthLiteClient instance. + Initializes the AuthLiteClient instance. + + Args: + api_key (str): The API key used for authentication. + secret_key (str): The secret key used for JWT encoding. + org_id (str, optional): The organization ID for generating authentication URLs. + API_BASE_URL (str, optional): The base URL for the API. Defaults to "https://api.trustauthx.com". + in_memory (bool, optional): Flag indicating whether to store the roles in-memory or in a SQLite database. Defaults to True (ie. in-memory). - Args: - api_key (str): The API key used for authentication. - secret_key (str): The secret key used for JWT encoding. - org_id (str, optional): The organization ID for generating authentication URLs. - API_BASE_URL (str, optional): The base URL for the API. Defaults to "https://api.trustauthx.com". - in_memory (bool, optional): Flag indicating whether to store the roles in-memory or in a SQLite database. Defaults to True (ie. in-memory). 
- - """ - self.jwt_encode = lambda key, data: jwt.encode(data, key=key, algorithm= ALGORITHMS.HS256) - self.jwt_decode = lambda key, data: jwt.decode(str(data), key=key, algorithms=ALGORITHMS.HS256) + """ + self.jwt_encode = lambda key, data: jwt.encode( + data, key=key, algorithm=ALGORITHMS.HS256 + ) + self.jwt_decode = lambda key, data: jwt.decode( + str(data), key=key, algorithms=ALGORITHMS.HS256 + ) self.secret_key = secret_key self.api_key = api_key self.org_id = org_id - self.signed_key = self.jwt_encode(key=self.secret_key, data={"api_key":self.api_key}) + self.signed_key = self.jwt_encode( + key=self.secret_key, data={"api_key": self.api_key} + ) self.API_BASE_URL = API_BASE_URL - self.Roles: _Roles = _Roles(roles=self._set_edge_roles(), org_id=self.org_id, - api_key=self.api_key, signed_key=self.signed_key, - secret_key=self.secret_key, API_BASE_URL=self.API_BASE_URL, - InMemory=in_memory) + self.Roles: _Roles = _Roles( + roles=self._set_edge_roles(), + org_id=self.org_id, + api_key=self.api_key, + signed_key=self.signed_key, + secret_key=self.secret_key, + API_BASE_URL=self.API_BASE_URL, + InMemory=in_memory, + ) def generate_url(self) -> str: """ @@ -510,13 +557,15 @@ def generate_url(self) -> str: Returns: str: The generated authentication URL. - + Raises: ValueError: If org_id is not provided. """ # Generate an authentication url for the given org - if self.org_id:return f"https://app.trustauthx.com/widget/login/?org_id={self.org_id}" - else:raise ValueError("must provide org_id") + if self.org_id: + return f"https://app.trustauthx.com/widget/login/?org_id={self.org_id}" + else: + raise ValueError("must provide org_id") def generate_edit_user_url(self, access_token, url) -> str: """ 
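Review bot: `AuthLiteClient.__init__` stores `API_BASE_URL` without checking its scheme, and every request in this class appends `api_key` and `signed_key` to the URL, so a base URL configured as `http://` would send both in clear text. A guard such as `if not API_BASE_URL.startswith("https://"): raise ValueError("API_BASE_URL must use https")` would catch that misconfiguration early; if plain HTTP is needed for local testing, make it an explicit opt-in instead of the silent default behaviour. The enclosing class starts outside the hunk.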
@@ -530,15 +579,15 @@ def generate_edit_user_url(self, access_token, url) -> str: str: The generated authentication URL. """ # Generate an authentication url for the given org - headers = {'accept': 'application/json'} + headers = {"accept": "application/json"} params = { - 'AccessToken': access_token, - 'api_key': self.api_key, - 'signed_key': self.signed_key, - 'url':url - } + "AccessToken": access_token, + "api_key": self.api_key, + "signed_key": self.signed_key, + "url": url, + } url = f"{self.API_BASE_URL}/api/user/me/settings/" - req = requests.Request('GET', url, params=params, headers=headers).prepare() + req = requests.Request("GET", url, params=params, headers=headers).prepare() return req.url def re_auth(self, code): @@ -555,25 +604,22 @@ def re_auth(self, code): HTTPError: If the request fails with an HTTP error status code. """ url = f"{self.API_BASE_URL}/api/user/me/widget/re-auth/token" - params = { - "code": code, - 'api_key': self.api_key, - 'signed_key': self.signed_key - } + params = {"code": code, "api_key": self.api_key, "signed_key": self.signed_key} headers = {"accept": "application/json"} response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self.secret_key,response.json()) + rtn = self.jwt_decode(self.secret_key, response.json()) sub = json.loads(rtn["sub"]) rtn.pop("sub") rtn["email"] = sub["email"] rtn["uid"] = sub["uid"] return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def get_user(self, token) -> dict: """ 
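Review bot: `generate_edit_user_url` returns a prepared URL whose query string carries `api_key` and `signed_key` alongside the user's `AccessToken`, so wherever that URL ends up (browser history, Referer headers, logs) the tenant credentials go with it. The docstring, which starts outside the hunk, should say so, for example `Note: the returned URL embeds api_key and signed_key; treat it as a secret and do not log it.` The `url` argument is also rebound by `url = f"{self.API_BASE_URL}/api/user/me/settings/"` after being copied into `params`; a separate local name such as `endpoint` would make it clear that the caller's value is only forwarded as the `url` parameter.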
@@ -589,26 +635,27 @@ def get_user(self, token) -> dict: HTTPError: If the request fails with an HTTP error status code. """ # Validate the given authentication token - url = f'{self.API_BASE_URL}/api/user/me/auth/data' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/auth/data" + headers = {"accept": "application/json"} params = { - 'UserToken': token, - 'api_key': self.api_key, - 'signed_key': self.signed_key - } + "UserToken": token, + "api_key": self.api_key, + "signed_key": self.signed_key, + } response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self.secret_key,response.json()) + rtn = self.jwt_decode(self.secret_key, response.json()) sub = json.loads(rtn["sub"]) rtn.pop("sub") rtn["email"] = sub["email"] rtn["uid"] = sub["uid"] return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def get_user_data(self, AccessToken) -> dict: """ 
@@ -625,22 +672,23 @@ def get_user_data(self, AccessToken) -> dict: """ # Validate the given authentication token """returns a dict containing 'access_token', 'refresh_token', 'img', 'sub'""" - url = f'{self.API_BASE_URL}/api/user/me/data' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/data" + headers = {"accept": "application/json"} params = { - 'AccessToken': AccessToken, - 'api_key': self.api_key, - 'signed_key': self.signed_key - } + "AccessToken": AccessToken, + "api_key": self.api_key, + "signed_key": self.signed_key, + } response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self.secret_key,response.json()) + rtn = self.jwt_decode(self.secret_key, response.json()) return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def get_access_token_from_refresh_token(self, refresh_token): """ 
@@ -656,20 +704,22 @@ def get_access_token_from_refresh_token(self, refresh_token): HTTPError: If the request fails with an HTTP error status code. """ # Store the given authentication token - url = f'{self.API_BASE_URL}/api/user/me/access/token/' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/access/token/" + headers = {"accept": "application/json"} params = { - 'RefreshToken': refresh_token, - 'api_key': self.api_key, - 'signed_key': self.signed_key - } + "RefreshToken": refresh_token, + "api_key": self.api_key, + "signed_key": self.signed_key, + } response = requests.get(url, headers=headers, params=params) - if response.status_code == 200:return response.json() - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + if response.status_code == 200: + return response.json() + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def validate_access_token(self, access_token) -> bool: """ 
@@ -682,17 +732,22 @@ def validate_access_token(self, access_token) -> bool: bool: True if the access token is valid, False otherwise. """ # Store the given authentication token - url = f'{self.API_BASE_URL}/api/user/me/auth/validate/token' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/auth/validate/token" + headers = {"accept": "application/json"} params = { - 'AccessToken': access_token, - 'api_key': self.api_key, - 'signed_key': self.signed_key - } + "AccessToken": access_token, + "api_key": self.api_key, + "signed_key": self.signed_key, + } response = requests.get(url, headers=headers, params=params) return response.status_code == 200 - def revoke_token(self,AccessToken:str=None, RefreshToken:str = None, revoke_all_tokens:bool = False) -> bool: + def revoke_token( + self, + AccessToken: str = None, + RefreshToken: str = None, + revoke_all_tokens: bool = False, + ) -> bool: """ Revokes an access token or refresh token. 
@@ -708,22 +763,28 @@ def revoke_token(self,AccessToken:str=None, RefreshToken:str = None, revoke_all_ HTTPError: If the request fails with an HTTP error status code. AttributeError: If neither AccessToken nor RefreshToken is provided. """ - url = f'{self.API_BASE_URL}/api/user/me/token/' - headers = {'accept': 'application/json'} - if not AccessToken and not RefreshToken:raise AttributeError("must provide either AccessToken or RefreshToken") - tt=True if AccessToken else False + url = f"{self.API_BASE_URL}/api/user/me/token/" + headers = {"accept": "application/json"} + if not AccessToken and not RefreshToken: + raise AttributeError("must provide either AccessToken or RefreshToken") + tt = True if AccessToken else False t = AccessToken if AccessToken else RefreshToken params = { - 'Token': t, - 'api_key': self.api_key, - 'signed_key': self.signed_key, - 'AccessToken': tt, - 'SpecificTokenOnly':not revoke_all_tokens, - } + "Token": t, + "api_key": self.api_key, + "signed_key": self.signed_key, + "AccessToken": tt, + "SpecificTokenOnly": not revoke_all_tokens, + } response = requests.delete(url, headers=headers, params=params) - if response.status_code == 200:return response.json() - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format(response.status_code, response.text)) + if response.status_code == 200: + return response.json() + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def validate_token_set(self, access_token, refresh_token) -> TokenCheck: """ 
@@ -746,8 +807,8 @@ def validate_token_set(self, access_token, refresh_token) -> TokenCheck: if refresh_token: new_tokens = self.get_access_token_from_refresh_token(refresh_token) d.state = False - d.access = new_tokens['access_token'] - d.refresh = new_tokens['refresh_token'] + d.access = new_tokens["access_token"] + d.refresh = new_tokens["refresh_token"] return d else: d.state = True @@ -755,7 +816,7 @@ def validate_token_set(self, access_token, refresh_token) -> TokenCheck: d.refresh = refresh_token return d except: - raise HTTPError('both tokens are invalid login again') - + raise HTTPError("both tokens are invalid login again") + def _set_edge_roles(self) -> list: return [] diff --git a/trustauthx/scheme.py b/trustauthx/scheme.py index 2d902b3..5bb5cb5 100644 --- a/trustauthx/scheme.py +++ b/trustauthx/scheme.py @@ -1,5 +1,6 @@ -from dataclasses import dataclass, asdict -from typing import List, Dict, Union +from dataclasses import asdict, dataclass +from typing import Dict, List, Union + @dataclass class Permission: @@ -10,9 +11,11 @@ class Permission: name (str): The name of the permission. value (str): The value of the permission. """ + name: str value: str + @dataclass class Role: """ @@ -24,20 +27,24 @@ class Role: name (str): The name of the role. permissions (List[Permission]): A list of permissions associated with the role. """ + org_id: str rol_id: str name: str permissions: List[Permission] + @dataclass class Permission: name: str value: str + @dataclass class GetAllRolesResponse: roles: List[Dict[str, Union[str, List[Dict[str, str]]]]] + @dataclass class AddRoleResponse: org_id: str @@ -45,6 +52,7 @@ class AddRoleResponse: name: str permissions: List[Permission] + @dataclass class DeleteRoleResponse: org_id: str 
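Review bot: In `validate_token_set` (trustauthx/authlite.py, the `@@ -755,7 +816,7 @@` hunk) the bare `except:` turns every failure into `HTTPError("both tokens are invalid login again")`, which would include a connection error, a `KeyError` on `new_tokens["access_token"]` and even `KeyboardInterrupt`. Callers then cannot tell an outage or a malformed response from a rejected token, and may force users to log in again for the wrong reason. Narrowing it to `except HTTPError as err:` and raising with `from err` keeps the message while preserving the cause. The `try` body begins outside the hunk, so which calls it wraps should be confirmed before narrowing.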
@@ -52,12 +60,14 @@ class DeleteRoleResponse: name: str permissions: List[Permission] + @dataclass class AddPermissionResponse: org_id: str rol_id: str permissions: List[Dict[str, str]] + @dataclass class DeletePermissionResponse: org_id: str @@ -65,7 +75,6 @@ class DeletePermissionResponse: permissions: List[Permission] - """# Demo data demo_get_all_roles_response = GetAllRolesResponse(roles=[ { @@ -124,4 +133,4 @@ class DeletePermissionResponse: Permission(name="maintainer", value="administration") ] ) -""" \ No newline at end of file +"""