Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 10 additions & 27 deletions examples/oauth_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from pytfe import TFEClient, TFEConfig
from pytfe.errors import NotFound
from pytfe.models import OAuthTokenListOptions, OAuthTokenUpdateOptions
from pytfe.models import OAuthTokenUpdateOptions


def main():
Expand All @@ -55,34 +55,18 @@ def main():
# =====================================================
print("\n1. Testing list() function:")
try:
# Test basic list without options
token_list = client.oauth_tokens.list(organization_name)
print(f" ✓ Found {len(token_list.items)} OAuth tokens")

# Show token details
for i, token in enumerate(token_list.items[:3], 1): # Show first 3
print(f" {i}. Token ID: {token.id}")
print(f" UID: {token.uid}")
print(f" Service Provider User: {token.service_provider_user}")
print(f" Has SSH Key: {token.has_ssh_key}")
print(f" Created: {token.created_at}")
for token in client.oauth_tokens.list(organization_name):
print(f" Token ID: {token.id}")
print(f" Service Provider User: {token.service_provider_user}")
print(f" Has SSH Key: {token.has_ssh_key}")
print(f" Created: {token.created_at}")
if token.oauth_client:
print(f" OAuth Client: {token.oauth_client.id}")

# Store first token for subsequent tests
if token_list.items:
test_token_id = token_list.items[0].id
print(f"\n Using token {test_token_id} for subsequent tests")

# Test list with options
print("\n Testing list() with pagination options:")
options = OAuthTokenListOptions(page_size=10, page_number=1)
token_list_with_options = client.oauth_tokens.list(organization_name, options)
print(f" ✓ Found {len(token_list_with_options.items)} tokens with options")
if token_list_with_options.current_page:
print(f" Current page: {token_list_with_options.current_page}")
if token_list_with_options.total_count:
print(f" Total count: {token_list_with_options.total_count}")
# Store first token for subsequent tests
if token and not test_token_id:
test_token_id = token.id
print(f"\n Using token {test_token_id} for subsequent tests \n")

except NotFound:
print(
Expand All @@ -99,7 +83,6 @@ def main():
try:
token = client.oauth_tokens.read(test_token_id)
print(f" ✓ Read OAuth token: {token.id}")
print(f" UID: {token.uid}")
print(f" Service Provider User: {token.service_provider_user}")
print(f" Has SSH Key: {token.has_ssh_key}")
print(f" Created: {token.created_at}")
Expand Down
94 changes: 43 additions & 51 deletions examples/policy_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def main():
required=True,
help="Task stage ID to list policy evaluations for",
)
parser.add_argument("--page", type=int, default=1)
parser.add_argument("--page-size", type=int, default=20)
args = parser.parse_args()

Expand All @@ -41,62 +40,55 @@ def main():
_print_header(f"Listing policy evaluations for task stage: {args.task_stage_id}")

options = PolicyEvaluationListOptions(
page_number=args.page,
page_size=args.page_size,
)

try:
pe_list = client.policy_evaluations.list(args.task_stage_id, options)

print(f"Total policy evaluations: {pe_list.total_count}")
print(f"Page {pe_list.current_page} of {pe_list.total_pages}")
print()

if not pe_list.items:
pe_count = 0
for pe in client.policy_evaluations.list(args.task_stage_id, options):
pe_count += 1
print(f"- ID: {pe.id}")
print(f" Status: {pe.status}")
print(f" Policy Kind: {pe.policy_kind}")

if pe.result_count:
print(" Result Count:")
if pe.result_count.passed is not None:
print(f" - Passed: {pe.result_count.passed}")
if pe.result_count.advisory_failed is not None:
print(f" - Advisory Failed: {pe.result_count.advisory_failed}")
if pe.result_count.mandatory_failed is not None:
print(f" - Mandatory Failed: {pe.result_count.mandatory_failed}")
if pe.result_count.errored is not None:
print(f" - Errored: {pe.result_count.errored}")

if pe.status_timestamp:
print(" Status Timestamps:")
if pe.status_timestamp.passed_at:
print(f" - Passed At: {pe.status_timestamp.passed_at}")
if pe.status_timestamp.failed_at:
print(f" - Failed At: {pe.status_timestamp.failed_at}")
if pe.status_timestamp.running_at:
print(f" - Running At: {pe.status_timestamp.running_at}")
if pe.status_timestamp.canceled_at:
print(f" - Canceled At: {pe.status_timestamp.canceled_at}")
if pe.status_timestamp.errored_at:
print(f" - Errored At: {pe.status_timestamp.errored_at}")

if pe.policy_attachable:
print(f" Task Stage: {pe.task_stage.id} ({pe.task_stage.type})")

if pe.created_at:
print(f" Created At: {pe.created_at}")
if pe.updated_at:
print(f" Updated At: {pe.updated_at}")

print()

if pe_count == 0:
print("No policy evaluations found for this task stage.")
else:
for pe in pe_list.items:
print(f"- ID: {pe.id}")
print(f" Status: {pe.status}")
print(f" Policy Kind: {pe.policy_kind}")

if pe.result_count:
print(" Result Count:")
if pe.result_count.passed is not None:
print(f" - Passed: {pe.result_count.passed}")
if pe.result_count.advisory_failed is not None:
print(
f" - Advisory Failed: {pe.result_count.advisory_failed}"
)
if pe.result_count.mandatory_failed is not None:
print(
f" - Mandatory Failed: {pe.result_count.mandatory_failed}"
)
if pe.result_count.errored is not None:
print(f" - Errored: {pe.result_count.errored}")

if pe.status_timestamp:
print(" Status Timestamps:")
if pe.status_timestamp.passed_at:
print(f" - Passed At: {pe.status_timestamp.passed_at}")
if pe.status_timestamp.failed_at:
print(f" - Failed At: {pe.status_timestamp.failed_at}")
if pe.status_timestamp.running_at:
print(f" - Running At: {pe.status_timestamp.running_at}")
if pe.status_timestamp.canceled_at:
print(f" - Canceled At: {pe.status_timestamp.canceled_at}")
if pe.status_timestamp.errored_at:
print(f" - Errored At: {pe.status_timestamp.errored_at}")

if pe.task_stage:
print(f" Task Stage: {pe.task_stage.id} ({pe.task_stage.type})")

if pe.created_at:
print(f" Created At: {pe.created_at}")
if pe.updated_at:
print(f" Updated At: {pe.updated_at}")

print()
print(f"\nTotal: {pe_count} policy evaluations")

except Exception as e:
print(f"Error listing policy evaluations: {e}")
Expand Down
20 changes: 8 additions & 12 deletions examples/reserved_tag_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ def main():
try:
# 1. List existing reserved tag keys
print("\n1. Listing reserved tag keys...")
reserved_tag_keys = client.reserved_tag_key.list(TFE_ORG)
print(f"✅ Found {len(reserved_tag_keys.items)} reserved tag keys:")
for rtk in reserved_tag_keys.items:
for rtk in client.reserved_tag_key.list(TFE_ORG):
print(
f" - ID: {rtk.id}, Key: {rtk.key}, Disable Overrides: {rtk.disable_overrides}"
)
Expand Down Expand Up @@ -87,18 +85,16 @@ def main():

# 5. Verify deletion by listing again
print("\n5. Verifying deletion...")
reserved_tag_keys_after = client.reserved_tag_key.list(TFE_ORG)
print(
f"✅ Reserved tag keys after deletion: {len(reserved_tag_keys_after.items)}"
)
reserved_tag_keys_after = list(client.reserved_tag_key.list(TFE_ORG))
print(f"Reserved tag keys after deletion: {len(reserved_tag_keys_after)}")

# 6. Demonstrate pagination with options
print("\n6. Demonstrating pagination options...")
list_options = ReservedTagKeyListOptions(page_size=5, page_number=1)
paginated_rtks = client.reserved_tag_key.list(TFE_ORG, list_options)
print(f"✅ Page 1 with page size 5: {len(paginated_rtks.items)} keys")
print(f" Total pages: {paginated_rtks.total_pages}")
print(f" Total count: {paginated_rtks.total_count}")
list_options = ReservedTagKeyListOptions(page_size=5)
for rtk in client.reserved_tag_key.list(TFE_ORG, list_options):
print(
f" - ID: {rtk.id}, Key: {rtk.key}, Disable Overrides: {rtk.disable_overrides}"
)

print("\n🎉 Reserved Tag Keys API example completed successfully!")

Expand Down
6 changes: 3 additions & 3 deletions src/pytfe/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
from .resources.policy_check import PolicyChecks
from .resources.policy_evaluation import PolicyEvaluations
from .resources.policy_set import PolicySets
from .resources.policy_set_outcome import PolicySets as PolicySetOutcomes
from .resources.policy_set_outcome import PolicySetOutcomes
from .resources.policy_set_version import PolicySetVersions
from .resources.projects import Projects
from .resources.query_run import QueryRuns
from .resources.registry_module import RegistryModules
from .resources.registry_provider import RegistryProviders
from .resources.reserved_tag_key import ReservedTagKey
from .resources.reserved_tag_key import ReservedTagKeys
from .resources.run import Runs
from .resources.run_event import RunEvents
from .resources.run_task import RunTasks
Expand Down Expand Up @@ -91,7 +91,7 @@ def __init__(self, config: TFEConfig | None = None):
self.ssh_keys = SSHKeys(self._transport)

# Reserved Tag Key
self.reserved_tag_key = ReservedTagKey(self._transport)
self.reserved_tag_key = ReservedTagKeys(self._transport)

def close(self) -> None:
try:
Expand Down
8 changes: 8 additions & 0 deletions src/pytfe/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,11 @@ class InvalidPolicyEvaluationIDError(InvalidValues):

def __init__(self, message: str = "invalid value for policy evaluation ID"):
super().__init__(message)


# Policy Set Outcome errors
class InvalidPolicySetOutcomeIDError(InvalidValues):
"""Raised when an invalid policy set outcome ID is provided."""

def __init__(self, message: str = "invalid value for policy set outcome ID"):
super().__init__(message)
6 changes: 0 additions & 6 deletions src/pytfe/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
)
from .oauth_token import (
OAuthToken,
OAuthTokenList,
OAuthTokenListOptions,
OAuthTokenUpdateOptions,
)
Expand Down Expand Up @@ -110,7 +109,6 @@
from .policy_evaluation import (
PolicyAttachable,
PolicyEvaluation,
PolicyEvaluationList,
PolicyEvaluationListOptions,
PolicyEvaluationStatus,
PolicyEvaluationStatusTimestamps,
Expand Down Expand Up @@ -208,7 +206,6 @@
from .reserved_tag_key import (
ReservedTagKey,
ReservedTagKeyCreateOptions,
ReservedTagKeyList,
ReservedTagKeyListOptions,
ReservedTagKeyUpdateOptions,
)
Expand Down Expand Up @@ -349,7 +346,6 @@
"ServiceProviderType",
# OAuth token
"OAuthToken",
"OAuthTokenList",
"OAuthTokenListOptions",
"OAuthTokenUpdateOptions",
# SSH keys
Expand All @@ -361,7 +357,6 @@
# Reserved tag keys
"ReservedTagKey",
"ReservedTagKeyCreateOptions",
"ReservedTagKeyList",
"ReservedTagKeyListOptions",
"ReservedTagKeyUpdateOptions",
# Agent & pools
Expand Down Expand Up @@ -559,7 +554,6 @@
# Policy Evaluation
"PolicyAttachable",
"PolicyEvaluation",
"PolicyEvaluationList",
"PolicyEvaluationListOptions",
"PolicyEvaluationStatus",
"PolicyEvaluationStatusTimestamps",
Expand Down
20 changes: 2 additions & 18 deletions src/pytfe/models/oauth_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class OAuthToken(BaseModel):
model_config = ConfigDict(extra="forbid")

id: str = Field(..., description="OAuth token ID")
uid: str = Field(..., description="OAuth token UID")
created_at: datetime = Field(..., description="Creation timestamp")
has_ssh_key: bool = Field(..., description="Whether the token has an SSH key")
service_provider_user: str = Field(..., description="Service provider user")
Expand All @@ -26,26 +25,12 @@ class OAuthToken(BaseModel):
)


class OAuthTokenList(BaseModel):
"""List of OAuth tokens with pagination information."""

model_config = ConfigDict(extra="forbid")

items: list[OAuthToken] = Field(default_factory=list, description="OAuth tokens")
current_page: int | None = Field(None, description="Current page number")
prev_page: int | None = Field(None, description="Previous page number")
next_page: int | None = Field(None, description="Next page number")
total_pages: int | None = Field(None, description="Total number of pages")
total_count: int | None = Field(None, description="Total count of items")


class OAuthTokenListOptions(BaseModel):
"""Options for listing OAuth tokens."""

model_config = ConfigDict(extra="forbid")
model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

page_number: int | None = Field(None, description="Page number")
page_size: int | None = Field(None, description="Page size")
page_size: int | None = Field(None, alias="page[size]", description="Page size")


class OAuthTokenUpdateOptions(BaseModel):
Expand All @@ -63,7 +48,6 @@ class OAuthTokenUpdateOptions(BaseModel):
from .oauth_client import OAuthClient # noqa: F401

OAuthToken.model_rebuild()
OAuthTokenList.model_rebuild()
except ImportError:
# If OAuthClient is not available, create a dummy class
pass
16 changes: 1 addition & 15 deletions src/pytfe/models/policy_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class PolicyEvaluation(BaseModel):
updated_at: datetime | None = Field(None, alias="updated-at")

# The task stage the policy evaluation belongs to
task_stage: PolicyAttachable | None = Field(None, alias="policy-attachable")
policy_attachable: PolicyAttachable | None = Field(None, alias="policy-attachable")


class PolicyEvaluationStatusTimestamps(BaseModel):
Expand Down Expand Up @@ -72,23 +72,9 @@ class PolicyResultCount(BaseModel):
errored: int | None = Field(None, alias="errored")


class PolicyEvaluationList(BaseModel):
"""PolicyEvaluationList represents a list of policy evaluations"""

model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

items: list[PolicyEvaluation] | None = Field(default_factory=list)
current_page: int | None = None
next_page: str | None = None
prev_page: str | None = None
total_count: int | None = None
total_pages: int | None = None


class PolicyEvaluationListOptions(BaseModel):
"""PolicyEvaluationListOptions represents the options for listing policy evaluations"""

model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

page_number: int | None = Field(None, alias="page[number]")
page_size: int | None = Field(None, alias="page[size]")
Loading
Loading