Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8","3.9", "3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v4
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
> * v1.00, 11.2020 -- Adjustments for OpenSource - Andreas Graber
> * v1.3, 06.2021 -- Snapshot creation added - Dario Kaelin
> * v1.5, 01.2023 -- Dependency Update
> * v1.6, 06.2024 -- Improvement for subscription - Dario Kaelin
> * v1.6, 06.2024 -- Improvement for subscription - Dario Kaelin
> * v1.7, 12.2025 -- Dependency Update - Dario Kaelin
5 changes: 1 addition & 4 deletions aciClient/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
from aciClient.aci import ACI
from aciClient.aciCertClient import ACICert

__all__ = [
'ACI',
'ACICert'
]
__all__ = ["ACI", "ACICert"]
162 changes: 100 additions & 62 deletions aciClient/aci.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ class ACI:
# constructor
# ==============================================================================
def __init__(self, apicIp, apicUser, apicPasword, refresh=False, proxies=None):
self.__logger.debug('Constructor called')
self.__logger.debug("Constructor called")
self.apicIp = apicIp
self.apicUser = apicUser
self.apicPassword = apicPasword
self.proxies = proxies

self.baseUrl = 'https://' + self.apicIp + '/api/'
self.__logger.debug(f'BaseUrl set to: {self.baseUrl}')
self.baseUrl = "https://" + self.apicIp + "/api/"
self.__logger.debug(f"BaseUrl set to: {self.baseUrl}")

self.refresh_auto = refresh
self.refresh_next = None
Expand All @@ -52,17 +52,27 @@ def __init__(self, apicIp, apicUser, apicPasword, refresh=False, proxies=None):
self.retry_backoff_factor = 10 # in seconds; multiplied by previous attempts.

def __refresh_session_timer(self, response):
self.__logger.debug(f'refreshing the token {self.refresh_offset}s before it expires')
self.refresh_next = int(response.json()['imdata'][0]['aaaLogin']['attributes']['refreshTimeoutSeconds'])
self.refresh_thread = threading.Timer(self.refresh_next - self.refresh_offset, self.renewCookie)
self.__logger.debug(f'starting thread to refresh token in {self.refresh_next - self.refresh_offset}s')
self.__logger.debug(
f"refreshing the token {self.refresh_offset}s before it expires"
)
self.refresh_next = int(
response.json()["imdata"][0]["aaaLogin"]["attributes"][
"refreshTimeoutSeconds"
]
)
self.refresh_thread = threading.Timer(
self.refresh_next - self.refresh_offset, self.renewCookie
)
self.__logger.debug(
f"starting thread to refresh token in {self.refresh_next - self.refresh_offset}s"
)
self.refresh_thread.start()

# ==============================================================================
# login
# ==============================================================================
def login(self) -> bool:
self.__logger.debug('login called')
self.__logger.debug("login called")

retry_strategy = urllib3.Retry(
total=self.total_retry_attempts,
Expand All @@ -72,30 +82,38 @@ def login(self) -> bool:
adapter = HTTPAdapter(max_retries=retry_strategy)

self.session = requests.Session()
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
self.__logger.debug('Session Object Created')
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
self.__logger.debug("Session Object Created")

if self.proxies is not None:
self.session.proxies = self.proxies

# create credentials structure
userPass = json.dumps({'aaaUser': {'attributes': {'name': self.apicUser, 'pwd': self.apicPassword}}})
userPass = json.dumps(
{
"aaaUser": {
"attributes": {"name": self.apicUser, "pwd": self.apicPassword}
}
}
)

self.__logger.info(f'Login to apic {self.baseUrl}')
response = self.session.post(self.baseUrl + 'aaaLogin.json', data=userPass, verify=False, timeout=5)
self.__logger.info(f"Login to apic {self.baseUrl}")
response = self.session.post(
self.baseUrl + "aaaLogin.json", data=userPass, verify=False, timeout=5
)

# Don't raise an exception for 401
if response.status_code == 401:
self.__logger.error(f'Login not possible due to Error: {response.text}')
self.__logger.error(f"Login not possible due to Error: {response.text}")
self.session = False
return False

# Raise a exception for all other 4xx and 5xx status_codes
response.raise_for_status()

self.token = response.json()['imdata'][0]['aaaLogin']['attributes']['token']
self.__logger.debug('Successful get Token from APIC')
self.token = response.json()["imdata"][0]["aaaLogin"]["attributes"]["token"]
self.__logger.debug("Successful get Token from APIC")

if self.refresh_auto:
self.__refresh_session_timer(response=response)
Expand All @@ -105,31 +123,34 @@ def login(self) -> bool:
# logout
# ==============================================================================
def logout(self):
self.__logger.debug('logout called')
self.__logger.debug("logout called")
self.refresh_auto = False
if self.refresh_thread is not None:
if self.refresh_thread.is_alive():
self.__logger.debug('Stoping refresh_auto thread')
self.__logger.debug("Stoping refresh_auto thread")
self.refresh_thread.cancel()
self.postJson(jsonData={'aaaUser': {'attributes': {'name': self.apicUser}}}, url='aaaLogout.json')
self.__logger.debug('Logout from APIC sucessfull')
self.postJson(
jsonData={"aaaUser": {"attributes": {"name": self.apicUser}}},
url="aaaLogout.json",
)
self.__logger.debug("Logout from APIC sucessfull")

# ==============================================================================
# renew cookie (aaaRefresh)
# ==============================================================================
def renewCookie(self) -> bool:
self.__logger.debug('Renew Cookie called')
response = self.session.post(self.baseUrl + 'aaaRefresh.json', verify=False)
self.__logger.debug("Renew Cookie called")
response = self.session.post(self.baseUrl + "aaaRefresh.json", verify=False)

if response.status_code == 200:
if self.refresh_auto:
self.__refresh_session_timer(response=response)
self.token = response.json()['imdata'][0]['aaaLogin']['attributes']['token']
self.__logger.debug('Successfuly renewed the token')
self.token = response.json()["imdata"][0]["aaaLogin"]["attributes"]["token"]
self.__logger.debug("Successfuly renewed the token")
else:
self.token = False
self.refresh_auto = False
self.__logger.error(f'Could not renew token. {response.text}')
self.__logger.error(f"Could not renew token. {response.text}")
response.raise_for_status()
return False
return True
Expand All @@ -138,101 +159,118 @@ def renewCookie(self) -> bool:
# getToken
# ==============================================================================
def getToken(self) -> str:
self.__logger.debug('Get Token called')
self.__logger.debug("Get Token called")
return self.token

# ==============================================================================
# getJson
# ==============================================================================
def getJson(self, uri, subscription=False) -> {}:
url = self.baseUrl + uri
self.__logger.debug(f'Get Json called url: {url}')
self.__logger.debug(f"Get Json called url: {url}")

if subscription:
url = '{}?subscription=yes'.format(url)
url = "{}?subscription=yes".format(url)
response = self.session.get(url, verify=False)

if response.ok:
responseJson = response.json()
self.__logger.debug(f'Successful get Data from APIC: {responseJson}')
self.__logger.debug(f"Successful get Data from APIC: {responseJson}")
if subscription:
subscription_id = responseJson['subscriptionId']
self.__logger.debug(f'Returning Subscription Id: {subscription_id}')
subscription_id = responseJson["subscriptionId"]
self.__logger.debug(f"Returning Subscription Id: {subscription_id}")
return subscription_id
return responseJson['imdata']
return responseJson["imdata"]

elif response.status_code == 400:
resp_text = response.json()['imdata'][0]['error']['attributes']['text']
self.__logger.error(f'Error 400 during get occured: {resp_text}')
if resp_text == 'Unable to process the query, result dataset is too big':
resp_text = response.json()["imdata"][0]["error"]["attributes"]["text"]
self.__logger.error(f"Error 400 during get occured: {resp_text}")
if resp_text == "Unable to process the query, result dataset is too big":
# Dataset was too big, we try to grab all the data with pagination
self.__logger.debug(f'Trying with Pagination, uri: {uri}')
self.__logger.debug(f"Trying with Pagination, uri: {uri}")
return self.getJsonPaged(uri)
return resp_text
else:
self.__logger.error(f'Error during get occured: {response.json()}')
self.__logger.error(f"Error during get occured: {response.json()}")
return response.json()

# ==============================================================================
# getJson with Pagination
# ==============================================================================
def getJsonPaged(self, uri) -> {}:
url = self.baseUrl + uri
self.__logger.debug(f'Get Json Pagination called url: {url}')
self.__logger.debug(f"Get Json Pagination called url: {url}")
parsed_url = urlparse(url)
parsed_query = parse_qsl(parsed_url.query)

return_data = []
page = 0

while True:
parsed_query.extend([('page', page), ('page-size', '50000')])
parsed_query.extend([("page", page), ("page-size", "50000")])
page += 1
url_to_call = urlunparse((parsed_url[0], parsed_url[1], parsed_url[2], parsed_url[3],
urlencode(parsed_query), parsed_url[5]))
url_to_call = urlunparse(
(
parsed_url[0],
parsed_url[1],
parsed_url[2],
parsed_url[3],
urlencode(parsed_query),
parsed_url[5],
)
)
response = self.session.get(url_to_call, verify=False)

if response.ok:
responseJson = response.json()
self.__logger.debug(f'Successful get Data from APIC: {responseJson}')
if responseJson['imdata']:
return_data.extend(responseJson['imdata'])
self.__logger.debug(f"Successful get Data from APIC: {responseJson}")
if responseJson["imdata"]:
return_data.extend(responseJson["imdata"])
else:
return return_data

elif response.status_code == 400:
resp_text = '400: ' + response.json()['imdata'][0]['error']['attributes']['text']
self.__logger.error(f'Error 400 during get occured: {resp_text}')
resp_text = (
"400: "
+ response.json()["imdata"][0]["error"]["attributes"]["text"]
)
self.__logger.error(f"Error 400 during get occured: {resp_text}")
return resp_text

else:
self.__logger.error(f'Error during get occured: {response.json()}')
self.__logger.error(f"Error during get occured: {response.json()}")
return False

# ==============================================================================
# postJson
# ==============================================================================
def postJson(self, jsonData, url='mo.json') -> {}:
self.__logger.debug(f'Post Json called data: {jsonData}')
response = self.session.post(self.baseUrl + url, verify=False, data=json.dumps(jsonData, sort_keys=True))
def postJson(self, jsonData, url="mo.json") -> {}:
self.__logger.debug(f"Post Json called data: {jsonData}")
response = self.session.post(
self.baseUrl + url, verify=False, data=json.dumps(jsonData, sort_keys=True)
)
if response.status_code == 200:
self.__logger.debug(f'Successful Posted Data to APIC: {response.json()}')
self.__logger.debug(f"Successful Posted Data to APIC: {response.json()}")
return response.status_code
elif response.status_code == 400:
resp_text = '400: ' + response.json()['imdata'][0]['error']['attributes']['text']
self.__logger.error(f'Error 400 during get occured: {resp_text}')
resp_text = (
"400: " + response.json()["imdata"][0]["error"]["attributes"]["text"]
)
self.__logger.error(f"Error 400 during get occured: {resp_text}")
return resp_text
else:
self.__logger.error(f'Error during get occured: {response.json()}')
self.__logger.error(f"Error during get occured: {response.json()}")
response.raise_for_status()
return response.status_code

# ==============================================================================
# deleteMo
# ==============================================================================
def deleteMo(self, dn) -> int:
self.__logger.debug(f'Delete Mo called DN: {dn}')
response = self.session.delete(self.baseUrl + "mo/" + dn + ".json", verify=False)
self.__logger.debug(f"Delete Mo called DN: {dn}")
response = self.session.delete(
self.baseUrl + "mo/" + dn + ".json", verify=False
)

# Raise Exception if http Error occurred
response.raise_for_status()
Expand All @@ -243,7 +281,7 @@ def deleteMo(self, dn) -> int:
# snapshot
# ==============================================================================
def snapshot(self, description="snapshot", target_dn="") -> bool:
self.__logger.debug(f'snapshot called {description}')
self.__logger.debug(f"snapshot called {description}")

json_payload = [
{
Expand All @@ -258,21 +296,21 @@ def snapshot(self, description="snapshot", target_dn="") -> bool:
"name": "aciclient",
"nameAlias": "",
"snapshot": "yes",
"targetDn": f"{target_dn}"
"targetDn": f"{target_dn}",
}
}
}
]

response = self.postJson(json_payload)
if response == 200:
self.__logger.debug('snapshot created and triggered')
self.__logger.debug("snapshot created and triggered")
return True
else:
self.__logger.error(f'snapshot creation not succesfull: {response}')
self.__logger.error(f"snapshot creation not succesfull: {response}")
return False

# ==============================================================================
# ==============================================================================
# subscribe
# ==============================================================================
def subscribe(
Expand Down
Loading