From fc7443107c2b44f57b849a84c595ff95e556aca7 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Fri, 15 Dec 2023 09:07:02 +0000 Subject: [PATCH] style: format code with Autopep8, Black, isort and Yapf This commit fixes the style issues introduced in e4085d6 according to the output from Autopep8, Black, isort and Yapf. Details: None --- setup.py | 36 ++++---- trustauthx/authlite.py | 190 ++++++++++++++++++++++------------------- trustauthx/cli.py | 111 ++++++++++++++---------- 3 files changed, 186 insertions(+), 151 deletions(-) diff --git a/setup.py b/setup.py index 7105715..dfc5f56 100644 --- a/setup.py +++ b/setup.py @@ -1,27 +1,27 @@ -from setuptools import setup, find_packages import os +from setuptools import find_packages, setup + this_directory = os.path.abspath(os.path.dirname(__file__)) -with open(os.path.join(this_directory, 'README.md'), encoding='utf-8') as f: +with open(os.path.join(this_directory, "README.md"), encoding="utf-8") as f: long_description = f.read() - setup( - name='trustauthx', - version='0.5.5', - description='Official connector SDK for TrustAuthx', + name="trustauthx", + version="0.5.5", + description="Official connector SDK for TrustAuthx", long_description=long_description, - long_description_content_type='text/markdown', # This is important! - author='moonlightnexus', - author_email='nexus@trustauthx.com', + long_description_content_type="text/markdown", # This is important! + author="moonlightnexus", + author_email="nexus@trustauthx.com", url="https://github.com/One-Click-Auth/TrustAuthx-Py-SDK.git", license="MIT", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3.9', - ], + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.9", + ], packages=find_packages(), install_requires=[ "certifi>=2023.5.7", @@ -37,11 +37,11 @@ "urllib3<=3.0.0", "charset-normalizer>=3.2.0", "python-jose>=3.3.0", - "python-dotenv==1.0.0" - ], + "python-dotenv==1.0.0", + ], entry_points={ - 'console_scripts': [ - 'trustauthx = trustauthx.cli:main', + "console_scripts": [ + "trustauthx = trustauthx.cli:main", ], }, ) diff --git a/trustauthx/authlite.py b/trustauthx/authlite.py index 59ffa64..a2e0cdc 100644 --- a/trustauthx/authlite.py +++ b/trustauthx/authlite.py @@ -1,13 +1,14 @@ +import json + import requests -from requests.exceptions import HTTPError from jose import JWTError, jwt from jose.constants import ALGORITHMS -import json +from requests.exceptions import HTTPError # authx/authlite.py -class AuthLiteClient: +class AuthLiteClient: """ AuthLiteClient is a Python client for the TrustAuthX authentication service. @@ -59,9 +60,10 @@ class TokenCheck: refresh (str): The refresh token. state (bool): The state of the tokens (True if valid, False otherwise). """ - access :str - refresh:str - state:bool + + access: str + refresh: str + state: bool def __init__(self, api_key, secret_key, org_id=None): """ @@ -75,12 +77,15 @@ def __init__(self, api_key, secret_key, org_id=None): Returns: None """ - self.jwt_encode = lambda key, data: jwt.encode(data, key=key, algorithm= ALGORITHMS.HS256) - self.jwt_decode = lambda key, data: jwt.decode(str(data), key=key, algorithms=ALGORITHMS.HS256) + self.jwt_encode = lambda key, data: jwt.encode( + data, key=key, algorithm=ALGORITHMS.HS256) + self.jwt_decode = lambda key, data: jwt.decode( + str(data), key=key, algorithms=ALGORITHMS.HS256) self.secret_key = secret_key self.api_key = api_key self.org_id = org_id - self.signed_key = self.jwt_encode(key=self.secret_key, data={"api_key":self.api_key}) + self.signed_key = self.jwt_encode(key=self.secret_key, + data={"api_key": self.api_key}) def generate_url(self) -> str: """ @@ -88,13 +93,15 @@ def generate_url(self) -> str: Returns: str: The generated authentication URL. - + Raises: ValueError: If org_id is not provided. """ # Generate an authentication url for the given org - if self.org_id:return f"https://app.trustauthx.com/widget/login/?org_id={self.org_id}" - else:raise ValueError("must provide org_id") + if self.org_id: + return f"https://app.trustauthx.com/widget/login/?org_id={self.org_id}" + else: + raise ValueError("must provide org_id") def generate_edit_user_url(self, access_token, url) -> str: """ @@ -108,15 +115,16 @@ def generate_edit_user_url(self, access_token, url) -> str: str: The generated authentication URL. """ # Generate an authentication url for the given org - headers = {'accept': 'application/json'} + headers = {"accept": "application/json"} params = { - 'AccessToken': access_token, - 'api_key': self.api_key, - 'signed_key': self.signed_key, - 'url':url - } + "AccessToken": access_token, + "api_key": self.api_key, + "signed_key": self.signed_key, + "url": url, + } url = "https://api.trustauthx.com/api/user/me/settings/" - req = requests.Request('GET', url, params=params, headers=headers).prepare() + req = requests.Request("GET", url, params=params, + headers=headers).prepare() return req.url def re_auth(self, code): @@ -135,23 +143,22 @@ def re_auth(self, code): url = "https://api.trustauthx.com/api/user/me/widget/re-auth/token" params = { "code": code, - 'api_key': self.api_key, - 'signed_key': self.signed_key + "api_key": self.api_key, + "signed_key": self.signed_key } headers = {"accept": "application/json"} response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self.secret_key,response.json()) + rtn = self.jwt_decode(self.secret_key, response.json()) sub = json.loads(rtn["sub"]) rtn.pop("sub") rtn["email"] = sub["email"] rtn["uid"] = sub["uid"] return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}" + .format(response.status_code, response.text)) def get_user(self, token) -> dict: """ @@ -167,26 +174,25 @@ def get_user(self, token) -> dict: HTTPError: If the request fails with an HTTP error status code. """ # Validate the given authentication token - url = 'https://api.trustauthx.com/api/user/me/auth/data' - headers = {'accept': 'application/json'} + url = "https://api.trustauthx.com/api/user/me/auth/data" + headers = {"accept": "application/json"} params = { - 'UserToken': token, - 'api_key': self.api_key, - 'signed_key': self.signed_key - } + "UserToken": token, + "api_key": self.api_key, + "signed_key": self.signed_key, + } response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self.secret_key,response.json()) + rtn = self.jwt_decode(self.secret_key, response.json()) sub = json.loads(rtn["sub"]) rtn.pop("sub") rtn["email"] = sub["email"] rtn["uid"] = sub["uid"] return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}" + .format(response.status_code, response.text)) def get_user_data(self, AccessToken) -> dict: """ @@ -203,22 +209,21 @@ def get_user_data(self, AccessToken) -> dict: """ # Validate the given authentication token """returns a dict containing 'access_token', 'refresh_token', 'img', 'sub'""" - url = 'https://api.trustauthx.com/api/user/me/data' - headers = {'accept': 'application/json'} + url = "https://api.trustauthx.com/api/user/me/data" + headers = {"accept": "application/json"} params = { - 'AccessToken': AccessToken, - 'api_key': self.api_key, - 'signed_key': self.signed_key - } + "AccessToken": AccessToken, + "api_key": self.api_key, + "signed_key": self.signed_key, + } response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self.secret_key,response.json()) + rtn = self.jwt_decode(self.secret_key, response.json()) return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}" + .format(response.status_code, response.text)) def get_access_token_from_refresh_token(self, refresh_token): """ @@ -234,20 +239,20 @@ def get_access_token_from_refresh_token(self, refresh_token): HTTPError: If the request fails with an HTTP error status code. """ # Store the given authentication token - url = 'https://api.trustauthx.com/api/user/me/access/token/' - headers = {'accept': 'application/json'} + url = "https://api.trustauthx.com/api/user/me/access/token/" + headers = {"accept": "application/json"} params = { - 'RefreshToken': refresh_token, - 'api_key': self.api_key, - 'signed_key': self.signed_key - } + "RefreshToken": refresh_token, + "api_key": self.api_key, + "signed_key": self.signed_key, + } response = requests.get(url, headers=headers, params=params) - if response.status_code == 200:return response.json() - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + if response.status_code == 200: + return response.json() + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}" + .format(response.status_code, response.text)) def validate_access_token(self, access_token) -> bool: """ @@ -260,17 +265,22 @@ def validate_access_token(self, access_token) -> bool: bool: True if the access token is valid, False otherwise. """ # Store the given authentication token - url = 'https://api.trustauthx.com/api/user/me/auth/validate/token' - headers = {'accept': 'application/json'} + url = "https://api.trustauthx.com/api/user/me/auth/validate/token" + headers = {"accept": "application/json"} params = { - 'AccessToken': access_token, - 'api_key': self.api_key, - 'signed_key': self.signed_key - } + "AccessToken": access_token, + "api_key": self.api_key, + "signed_key": self.signed_key, + } response = requests.get(url, headers=headers, params=params) return response.status_code == 200 - def revoke_token(self,AccessToken:str=None, RefreshToken:str = None, revoke_all_tokens:bool = False) -> bool: + def revoke_token( + self, + AccessToken: str = None, + RefreshToken: str = None, + revoke_all_tokens: bool = False, + ) -> bool: """ Revokes an access token or refresh token. @@ -286,22 +296,27 @@ def revoke_token(self,AccessToken:str=None, RefreshToken:str = None, revoke_all_ HTTPError: If the request fails with an HTTP error status code. AttributeError: If neither AccessToken nor RefreshToken is provided. """ - url = 'https://api.trustauthx.com/api/user/me/token/' - headers = {'accept': 'application/json'} - if not AccessToken and not RefreshToken:raise AttributeError("must provide either AccessToken or RefreshToken") - tt=True if AccessToken else False + url = "https://api.trustauthx.com/api/user/me/token/" + headers = {"accept": "application/json"} + if not AccessToken and not RefreshToken: + raise AttributeError( + "must provide either AccessToken or RefreshToken") + tt = True if AccessToken else False t = AccessToken if AccessToken else RefreshToken params = { - 'Token': t, - 'api_key': self.api_key, - 'signed_key': self.signed_key, - 'AccessToken': tt, - 'SpecificTokenOnly':not revoke_all_tokens, - } + "Token": t, + "api_key": self.api_key, + "signed_key": self.signed_key, + "AccessToken": tt, + "SpecificTokenOnly": not revoke_all_tokens, + } response = requests.delete(url, headers=headers, params=params) - if response.status_code == 200:return response.json() - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format(response.status_code, response.text)) + if response.status_code == 200: + return response.json() + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}" + .format(response.status_code, response.text)) def validate_token_set(self, access_token, refresh_token) -> TokenCheck: """ @@ -322,10 +337,11 @@ def validate_token_set(self, access_token, refresh_token) -> TokenCheck: is_valid = self.validate_access_token(access_token) if not is_valid: if refresh_token: - new_tokens = self.get_access_token_from_refresh_token(refresh_token) + new_tokens = self.get_access_token_from_refresh_token( + refresh_token) d.state = False - d.access = new_tokens['access_token'] - d.refresh = new_tokens['refresh_token'] + d.access = new_tokens["access_token"] + d.refresh = new_tokens["refresh_token"] return d else: d.state = True @@ -333,4 +349,4 @@ def validate_token_set(self, access_token, refresh_token) -> TokenCheck: d.refresh = refresh_token return d except: - raise HTTPError('both tokens are invalid login again') \ No newline at end of file + raise HTTPError("both tokens are invalid login again") diff --git a/trustauthx/cli.py b/trustauthx/cli.py index 0921aeb..14b03e0 100644 --- a/trustauthx/cli.py +++ b/trustauthx/cli.py @@ -1,32 +1,38 @@ import argparse -from .llmai import LLMAI_Inter +import os +import platform import subprocess -import sys, os, time +import sys +import time + from dotenv import load_dotenv -import platform + +from .llmai import LLMAI_Inter + def is_mac(): - return platform.system() == 'Darwin' + return platform.system() == "Darwin" + def main(): - parser = argparse.ArgumentParser(prog='trustauthx') - load_dotenv(dotenv_path='./.env', override=True, verbose=True) - api_key = os.environ.get('API_KEY') - api_secret = os.environ.get('API_SECRET') - org_id = os.environ.get('ORG_ID') - - parser.add_argument('command') - parser.add_argument('framework') - - parser.add_argument('-k', required=not api_key, help='API key') - parser.add_argument('-s', required=not api_secret, help='API secret') - parser.add_argument('-o', required=not org_id, help='Organization ID') + parser = argparse.ArgumentParser(prog="trustauthx") + load_dotenv(dotenv_path="./.env", override=True, verbose=True) + api_key = os.environ.get("API_KEY") + api_secret = os.environ.get("API_SECRET") + org_id = os.environ.get("ORG_ID") + + parser.add_argument("command") + parser.add_argument("framework") + + parser.add_argument("-k", required=not api_key, help="API key") + parser.add_argument("-s", required=not api_secret, help="API secret") + parser.add_argument("-o", required=not org_id, help="Organization ID") args = parser.parse_args() # try: if args.k and args.s and args.o: if api_key or api_secret or org_id: - file_path = './.env' + file_path = "./.env" if os.path.isfile(file_path): os.remove(file_path) else: @@ -40,42 +46,51 @@ def main(): "API_KEY": args.k, "API_SECRET": args.s, "ORG_ID": args.o - } + } if is_mac(): - with open('.env', 'w') as f: + with open(".env", "w") as f: for key, value in env_vars.items(): - f.write(f'export {key}={value}\n') + f.write(f"export {key}={value}\n") else: - with open('.env', 'w') as f: + with open(".env", "w") as f: for key, value in env_vars.items(): - f.write(f'{key}={value}\n') - load_dotenv(dotenv_path='./.env', override=True, verbose=True) - api_key = os.environ.get('API_KEY') - api_secret = os.environ.get('API_SECRET') - org_id = os.environ.get('ORG_ID') + f.write(f"{key}={value}\n") + load_dotenv(dotenv_path="./.env", override=True, verbose=True) + api_key = os.environ.get("API_KEY") + api_secret = os.environ.get("API_SECRET") + org_id = os.environ.get("ORG_ID") - if api_key or api_secret or org_id: pass - else: print(f"no .env found, api_key {not bool(api_key)}, api_secret {not bool(api_secret)}, org_id {not bool(org_id)}") + if api_key or api_secret or org_id: + pass + else: + print( + f"no .env found, api_key {not bool(api_key)}, api_secret {not bool(api_secret)}, org_id {not bool(org_id)}" + ) client = LLMAI_Inter(api_key, api_secret, org_id, "") - print("\ngetting auth status ...") - if not client.arb_login():raise ConnectionRefusedError("user not found, invalid credential") + print("\ngetting auth status ...") + if not client.arb_login(): + raise ConnectionRefusedError("user not found, invalid credential") print("\nauthorization success, credentials valid") - - if args.command == 'neuroform': - client.framework=args.framework + + if args.command == "neuroform": + client.framework = args.framework sdk = client print("\ngetting req. dependencies:") list_depends = sdk.Install_dependancies() print(list_depends) print("\nInstalling dependencies...") + def install(packages): if packages == "pip install fastapi['all']": subprocess.call("pip install fastapi[all]") return if isinstance(packages, list): - for package in packages:subprocess.call(package, shell=True) - else:subprocess.call(packages, shell=True) + for package in packages: + subprocess.call(package, shell=True) + else: + subprocess.call(packages, shell=True) + install(list_depends) print("\nDependencies installed.") a = sdk.Create_App(path=os.getcwd()) @@ -83,26 +98,30 @@ def install(packages): print("\nApp named --> authx.py") print(f"\napp located at --> {os.path.join(os.getcwd(), 'authx.py')}") print("\nApp creation Successful...") - print(f"\nyou could start the server with command trustauthx start {args.framework}") - - if args.command == 'start': - client.framework=args.framework + print( + f"\nyou could start the server with command trustauthx start {args.framework}" + ) + + if args.command == "start": + client.framework = args.framework sdk = client print("\nTrying to start local server ...") - print("\nthis command might fail in case of few frameworks in such cases consider installing req. lib. and starting server manually") + print( + "\nthis command might fail in case of few frameworks in such cases consider installing req. lib. and starting server manually" + ) b = sdk.Start_server() process = subprocess.Popen(b, shell=True) process.wait() - if args.command == 'login': + if args.command == "login": time.sleep(0.5) print("\nattempt to Login TrustAuthx Build AI successful") print("\nExecuting Rate-Limit") time.sleep(1) print("\nEverything Done Status 200, Ready To Start") - if args.command == 'logout': - file_path = './.env' + if args.command == "logout": + file_path = "./.env" if os.path.isfile(file_path): os.remove(file_path) else: @@ -112,9 +131,9 @@ def install(packages): time.sleep(1) print("\nEverything Done Status 200, Successfully logged out") -if __name__ == '__main__': - main() +if __name__ == "__main__": + main() # if args.command == 'fabricate': # client.framework=args.framework @@ -129,4 +148,4 @@ def install(packages): # install(list_depends) # print("\nDependencies installed.") # a = sdk.Create_App(out=args.out) - # print(a) \ No newline at end of file + # print(a)