
Conversation

@FarisZR (Owner) commented Nov 18, 2025

This PR implements the "Device Tokens" feature, ensuring that each API token can only have one active whitelisted IP address at a time. When a new IP is whitelisted using a token that already has an active entry, the old entry is automatically removed.

Changes

  • Updated src/core.py to support storing token_id in the whitelist entries.
  • Updated add_ip_to_whitelist in src/core.py to check for existing entries with the same token_id and remove them.
  • Updated src/main.py to generate a deterministic token_id (hash of the API key) and pass it to the core logic.
  • Added tests/test_device_tokens.py to verify the new behavior and ensure backward compatibility.

Sequence Diagram

Before (Legacy Behavior)

sequenceDiagram
    participant Client
    participant API
    participant Whitelist

    Client->>API: Knock(IP=1.2.3.4, Token=A)
    API->>Whitelist: Add(1.2.3.4)
    Whitelist-->>API: OK
    API-->>Client: 200 OK

    Client->>API: Knock(IP=5.6.7.8, Token=A)
    API->>Whitelist: Add(5.6.7.8)
    Note right of Whitelist: Both 1.2.3.4 and 5.6.7.8 are now active for Token A
    Whitelist-->>API: OK
    API-->>Client: 200 OK

After (Device Tokens)

sequenceDiagram
    participant Client
    participant API
    participant Whitelist
    participant Firewalld

    Client->>API: Knock(IP=1.2.3.4, Token=A)
    API->>API: Generate token_id = Hash(Token=A)
    API->>Whitelist: Add(IP=1.2.3.4, token_id=Hash(A))
    Whitelist-->>API: OK
    API-->>Client: 200 OK

    Client->>API: Knock(IP=5.6.7.8, Token=A)
    API->>API: Generate token_id = Hash(Token=A)
    API->>Whitelist: Add(IP=5.6.7.8, token_id=Hash(A))
    Whitelist->>Whitelist: Find existing entry with token_id=Hash(A)
    Whitelist->>Whitelist: Remove old IP (1.2.3.4)
    Whitelist->>Firewalld: Remove Rule(1.2.3.4)
    Whitelist->>Whitelist: Save new IP (5.6.7.8)
    Whitelist-->>API: OK (Old IP removed)
    API-->>Client: 200 OK

Closes #21

Summary by CodeRabbit

  • New Features

    • Token-based IP whitelisting: each token now keeps one active IP; new additions replace prior token-bound IPs.
  • Improvements

    • Whitelist supports mixed legacy/new entries and automatic migration.
    • Safer persistence with atomic writes (see the sketch after this list), size/expiry-aware pruning, and firewall rule cleanup on replacement.
    • Improved IP/CIDR, path, and API-key handling (masked names when unnamed).
  • Tests

    • Added tests for legacy migration, per-token replacement, rolling/adoption, and cross-token behavior.
  • Documentation

    • Updated forwarded verification path used by the proxy.
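
As a rough illustration of the atomic-write pattern mentioned above, here is a minimal sketch; the function name and path handling are illustrative, not the project's actual code:

import json
import os
import tempfile

def save_whitelist_atomic(whitelist: dict, path: str) -> None:
    """Write whitelist JSON to a temp file, then atomically swap it in."""
    directory = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(whitelist, f)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes reach disk before the swap
        os.replace(tmp, path)     # atomic rename: readers never see a partial file
    except BaseException:
        os.unlink(tmp)
        raise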


- Update core.py to support token_id in whitelist entries and enforce uniqueness
- Update main.py to generate token_id from API key hash
- Add tests covering new behavior and legacy fallback
- Resolves #21
@coderabbitai bot (Contributor) commented Nov 18, 2025

Walkthrough

Implements one-IP-per-token device tokens and mixed-format whitelist support: whitelist entries may be legacy ints or dicts with expiry and token_id; adding an IP with a token_id replaces the previous IP for that token. Persistence, validation, firewalld integration, and tests updated accordingly.
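
For illustration, a mixed whitelist might look like the sketch below; the dict field names (such as "expires") are assumptions inferred from this walkthrough, not a confirmed schema:

# A possible in-memory whitelist mixing both entry shapes:
whitelist = {
    "203.0.113.7": 1763500000,      # legacy entry: bare int expiry, unowned
    "198.51.100.9": {               # new entry: dict owned by a token
        "expires": 1763500000,
        "token_id": "3f1a...e9c2",  # SHA-256 of the API key (truncated here)
    },
}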

Changes

  • Core whitelist model & persistence (src/core.py): Whitelist accepts int or Dict[str, Any] entries; load_whitelist/save_whitelist handle mixed types, atomic writes, expiry-aware sorting, and size limiting; cleanup and cross-process locking improved.
  • Token-aware add/remove & firewalld (src/core.py): add_ip_to_whitelist(..., token_id: Optional[str]) can replace an existing IP for the same token and returns the removed IP; add_ip_to_whitelist_with_firewalld(..., token_id: Optional[str]) removes replaced firewall rules and improves rollback/error logging.
  • Validation & API-key helpers (src/core.py): is_ip_whitelisted, CIDR/IP/path validation, and API-key helpers updated for list-based api_keys and new whitelist shapes; get_api_key_name masks the key when its name is missing.
  • Main app: token_id integration (src/main.py): Computes token_id as the SHA-256 of the API key and passes it to add_ip_to_whitelist_with_firewalld; request-flow formatting updated; behavior preserved aside from token ownership handling.
  • Tests: migration & device-token behavior (tests/test_migration.py, tests/test_device_tokens.py): New tests cover mixed legacy/new whitelist migration, cleanup of expired legacy entries, conversion/adoption to token-owned dict entries, per-token one-IP replacement, and cross-token isolation; see the sketch after this list.
  • Docs & config tweak (README.md, dev/Caddyfile, docs/Project-OVERVIEW.md): forward_auth URI adjusted from /verify to /verify? in examples/docs to allow optional query usage.
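
A minimal sketch of the per-token replacement test, assuming a settings fixture pointing at a temporary whitelist file and the keyword arguments shown (the real tests may differ):

import time

from src.core import add_ip_to_whitelist, load_whitelist

def test_same_token_replaces_previous_ip(settings):
    expiry = int(time.time()) + 3600
    # First knock whitelists 1.2.3.4 for token "hash-a".
    add_ip_to_whitelist("1.2.3.4", expiry, settings, token_id="hash-a")
    # A second knock with the same token should evict the old IP...
    removed = add_ip_to_whitelist("5.6.7.8", expiry, settings, token_id="hash-a")
    assert removed == "1.2.3.4"  # ...and report it for firewall cleanup
    whitelist = load_whitelist(settings)
    assert "5.6.7.8" in whitelist
    assert "1.2.3.4" not in whitelist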

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant Main as src/main.py
    participant Core as src/core.py
    participant FS as Filesystem
    participant FW as Firewalld

    Client->>Main: Knock request (api_key, ip)
    Main->>Main: token_id = SHA256(api_key) (if api_key)
    Main->>Core: add_ip_to_whitelist_with_firewalld(ip, expiry, settings, token_id)
    Core->>FS: load_whitelist()
    FS-->>Core: whitelist (mixed legacy/new entries)

    alt token_id present and maps to old_ip
        Core->>FW: remove firewall rule for old_ip
        FW-->>Core: removed / error
        Core->>Core: record removed IP
    else no replacement
        Core->>Core: no prior owner found
    end

    Core->>Core: insert/update entry for ip with expiry and token_id
    Core->>FW: add firewall rule for new ip
    FW-->>Core: added / error
    Core->>FS: save_whitelist(atomic)
    FS-->>Core: persisted
    Core-->>Main: return removed IP (or None)
    Main-->>Client: response (success / error)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Areas to focus on:
    • Correctness of token-id replacement and returned removed-IP behavior in add_ip_to_whitelist
    • Firewalld removal/rollback/error handling when replacing rules
    • Parsing, migration, and expiry semantics in load_whitelist / save_whitelist
    • Tests in tests/test_migration.py and tests/test_device_tokens.py for coverage completeness

Suggested labels

enhancement

Poem

🐰 I hopped a path through whitelist and file,
One token, one IP — a tidy style.
I nudged old hops and wrote with care,
Atomically saved — no mess to spare.
A rabbit's wink: one device, one chair.

Pre-merge checks

✅ Passed checks (5 passed)

  • Description Check (Passed): check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check (Passed): the title accurately reflects the main change, implementing device tokens to limit each API token to one active whitelisted IP.
  • Linked Issues Check (Passed): the PR implements all requirements from issue #21: one-IP-per-token enforcement, automatic removal of previous IPs, firewall rule cleanup, and backward compatibility through lazy migration.
  • Out of Scope Changes Check (Passed): minor Caddy configuration changes (/verify to /verify?) and documentation updates appear to support the device tokens feature rather than being unrelated scope creep.
  • Docstring Coverage (Passed): docstring coverage is 88.24%, above the required 80.00% threshold.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cd90c3b and 3d6267a.

📒 Files selected for processing (3)
  • README.md (1 hunks)
  • dev/Caddyfile (1 hunks)
  • docs/Project-OVERVIEW.md (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-08T22:31:37.677Z
Learnt from: CR
Repo: FarisZR/knocker PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-09-08T22:31:37.677Z
Learning: Applies to {knocker.yaml,knocker.example.yaml} : Configure trusted_proxies correctly in knocker.yaml to prevent IP spoofing via X-Forwarded-For

Applied to files:

  • dev/Caddyfile
  • README.md
  • docs/Project-OVERVIEW.md
📚 Learning: 2025-09-08T22:31:37.677Z
Learnt from: CR
Repo: FarisZR/knocker PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-09-08T22:31:37.677Z
Learning: Applies to {knocker.yaml,knocker.example.yaml} : Application requires KNOCKER_CONFIG_PATH to point to a valid knocker.yaml; use knocker.example.yaml as the schema reference

Applied to files:

  • README.md
  • docs/Project-OVERVIEW.md
🔇 Additional comments (2)
README.md (1)

116-116: Clarify the purpose of adding ? to the verify URI.

The change from /verify to /verify? in the forward_auth configuration is not explained in the PR objectives or description. Since the device token feature derives token_id from the API key header (not query parameters), it's unclear why the verify endpoint needs query parameter support.

Please clarify:

  1. Does the /verify endpoint now accept or require query parameters?
  2. How does this change relate to the device token functionality?
  3. Is this change necessary for the feature to work, or is it preparing for future functionality?

If query parameters are not needed, consider reverting this change to avoid confusion.

dev/Caddyfile (1)

29-29: Verify the necessity of ? in the actual configuration file.

Unlike the documentation changes in README.md and docs/Project-OVERVIEW.md, this is an actual configuration file used in the development/test environment. The addition of ? to the verify URI will affect runtime behavior.

Please confirm:

  1. Are there integration tests that verify this change is necessary?
  2. Does the behavior differ with vs. without the ? suffix?
  3. Is this change mentioned in the integration test updates that are noted as "still missing" in the PR comments?

If the /verify endpoint doesn't actually use query parameters for the device token feature, this change should be reverted to maintain clarity and avoid introducing unnecessary configuration differences.


@FarisZR (Owner, Author) commented Nov 18, 2025

gemini 3 let's gooooooooooooooooooooooooooooooo



# Generate token_id from API key to enforce one-IP-per-token policy
token_id = hashlib.sha256(api_key.encode()).hexdigest() if api_key else None

Check failure

Code scanning / CodeQL

Use of a broken or weak cryptographic hashing algorithm on sensitive data (High)

Sensitive data (password) is used in a hashing algorithm (SHA256) that is insecure for password hashing, since it is not a computationally expensive hash function.

Copilot Autofix (about 2 months ago)

To address this issue, we should replace the use of hashlib.sha256 with a modern, computationally expensive key derivation function when hashing API keys, even for identification purposes. This ensures resistance to brute-force and pre-image attacks, aligning with best security practices for handling sensitive keys. The recommended approach is to use a KDF such as PBKDF2 from the built-in hashlib module (hashlib.pbkdf2_hmac) because it is readily available, widely accepted, and does not require third-party packages. For enhanced security, a unique per-application salt should be used. This requires defining a salt value, updating the hashing logic, and handling byte output as hexadecimal for consistency.

The changes are confined to the region where token_id is computed (line 510). We will introduce a static salt as a module-level variable (best stored securely or derived from configuration), implement the PBKDF2-based hash, and update token_id accordingly. We only need to add the definition for the salt and switch hashlib.sha256(...).hexdigest() to hashlib.pbkdf2_hmac(...).


Suggested changeset 1: src/main.py

Autofix patch. Run the following command in your local git repository to apply it:
cat << 'EOF' | git apply
diff --git a/src/main.py b/src/main.py
--- a/src/main.py
+++ b/src/main.py
@@ -507,7 +507,14 @@
     # Use SHA256 to generate a deterministic ID for the token.
     # This is NOT for password hashing, but for unique identification of the token
     # to enforce the one-IP-per-token policy. The token itself is the secret.
-    token_id = hashlib.sha256(api_key.encode()).hexdigest() if api_key else None
+    token_id = (
+        hashlib.pbkdf2_hmac(
+            "sha256",
+            api_key.encode(),
+            TOKEN_ID_SALT,
+            100_000,
+        ).hex() if api_key else None
+    )
 
     # Add to whitelist with firewalld integration
     # This will add firewalld rules BEFORE updating whitelist.json if firewalld is enabled
EOF
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/core.py (1)

198-217: Add storage_path validation to prevent path injection attacks.

The config file path is properly validated in load_config() (lines 40-49 of src/config.py), but the storage_path value extracted from settings is never validated. This allows directory traversal via payloads like storage_path: "../../../etc/passwd" in the config YAML.

Add validation in load_config() after parsing the config (before line 84) to sanitize the whitelist storage_path:

  • Resolve the path using Path.resolve()
  • Validate it doesn't escape a safe base directory (e.g., application root)
  • Use Path.is_relative_to() to confirm containment (Python 3.9+)

Alternatively, validate at the point of use in load_whitelist() and save_whitelist() (src/core.py lines 201, 224).
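
A minimal sketch of that containment check, assuming a trusted base directory (BASE_DIR here is illustrative; the project would choose its own root):

from pathlib import Path

BASE_DIR = Path("/data").resolve()  # assumed safe root for whitelist storage

def resolve_storage_path(settings: dict) -> Path:
    raw = settings.get("whitelist", {}).get("storage_path", "whitelist.json")
    candidate = (BASE_DIR / raw).resolve()
    # Path.is_relative_to() (Python 3.9+) rejects paths that escaped the base
    # directory via ".." segments or an absolute storage_path.
    if not candidate.is_relative_to(BASE_DIR):
        raise ValueError(f"storage_path escapes {BASE_DIR}: {candidate}")
    return candidate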

🧹 Nitpick comments (2)
src/core.py (2)

374-387: Consider using logging.exception for better debugging.

The error handling correctly removes the old firewall rule when an IP is replaced. However, using logging.error instead of logging.exception loses the stack trace.

Apply this diff to improve error logging:

             except Exception as e:
-                logging.error(f"Failed to remove old firewall rule for {old_ip}: {e}")
+                logging.exception(f"Failed to remove old firewall rule for {old_ip}: {e}")

As per static analysis hints.


389-403: Consider using logging.exception for better debugging.

Similar to the previous comment, the rollback error handlers would benefit from stack traces for troubleshooting failed rollbacks.

Apply this diff:

             try:
                 firewalld_integration.remove_whitelist_rule(ip_or_cidr)
-                logging.error(
+                logging.exception(
                     f"Rolled back firewalld rules for {ip_or_cidr} due to whitelist persistence failure: {e}"
                 )
             except Exception as rollback_error:
-                logging.error(
+                logging.exception(
                     f"Failed to rollback firewalld rules for {ip_or_cidr}: {rollback_error}"
                 )

-        logging.error(f"Failed to persist whitelist entry for {ip_or_cidr}: {e}")
+        logging.exception(f"Failed to persist whitelist entry for {ip_or_cidr}: {e}")

As per static analysis hints.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cb18240 and c3a7f03.

📒 Files selected for processing (3)
  • src/core.py (10 hunks)
  • src/main.py (20 hunks)
  • tests/test_device_tokens.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
src/main.py (3)
src/core.py (10)
  • cleanup_expired_ips (406-437)
  • load_whitelist (198-216)
  • is_valid_api_key (459-489)
  • can_whitelist_remote (443-448)
  • is_valid_ip_or_cidr (48-54)
  • is_safe_cidr_range (57-78)
  • get_max_ttl_for_key (451-456)
  • add_ip_to_whitelist_with_firewalld (335-403)
  • save_whitelist (219-259)
  • is_ip_whitelisted (108-147)
src/firewalld.py (4)
  • initialize_firewalld (591-595)
  • is_enabled (78-80)
  • setup_knocker_zone (222-306)
  • restore_missing_rules (515-565)
src/models.py (4)
  • KnockResponse (41-54)
  • ErrorResponse (65-70)
  • KnockRequest (9-38)
  • HealthResponse (57-62)
tests/test_device_tokens.py (1)
src/core.py (2)
  • add_ip_to_whitelist (262-332)
  • load_whitelist (198-216)
src/core.py (1)
src/firewalld.py (4)
  • get_firewalld_integration (586-588)
  • is_enabled (78-80)
  • add_whitelist_rule (357-407)
  • remove_whitelist_rule (419-461)
🪛 GitHub Check: CodeQL
src/main.py

[failure] 507-507: Use of a broken or weak cryptographic hashing algorithm on sensitive data
Sensitive data (password) is used in a hashing algorithm (SHA256) that is insecure for password hashing, since it is not a computationally expensive hash function.


[failure] 536-536: Clear-text logging of sensitive information
This expression logs sensitive data (password) as clear text.

src/core.py

[failure] 37-37: Uncontrolled data used in path expression
This path depends on a user-provided value.


[failure] 202-202: Uncontrolled data used in path expression
This path depends on a user-provided value.


[failure] 206-206: Uncontrolled data used in path expression
This path depends on a user-provided value.


[failure] 245-245: Uncontrolled data used in path expression
This path depends on a user-provided value.

🪛 Ruff (0.14.5)
src/main.py

170-170: Do not catch blind exception: Exception

(BLE001)


178-178: Do not catch blind exception: Exception

(BLE001)


179-181: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


300-300: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


370-370: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


589-591: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


602-602: Do not catch blind exception: Exception

(BLE001)


603-605: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


613-613: Do not catch blind exception: Exception

(BLE001)


614-614: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


624-624: Do not catch blind exception: Exception

(BLE001)


625-625: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


657-657: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

tests/test_device_tokens.py

49-49: Possible hardcoded password assigned to: "token_id"

(S105)


75-75: Possible hardcoded password assigned to: "other_token"

(S105)

src/core.py

289-289: Avoid specifying long messages outside the exception class

(TRY003)


294-296: Avoid specifying long messages outside the exception class

(TRY003)


385-385: Do not catch blind exception: Exception

(BLE001)


386-386: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


388-388: Consider moving this statement to an else block

(TRY300)


389-389: Do not catch blind exception: Exception

(BLE001)


394-396: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


397-397: Do not catch blind exception: Exception

(BLE001)


398-400: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


402-402: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (11)
src/core.py (6)

108-147: LGTM! Enhanced whitelist checking with dict support.

The updated logic correctly handles both legacy integer expiry values and new dict-based entries with token_id. The type checking with isinstance(value, dict) ensures backward compatibility.


219-260: Verify path injection risk is mitigated.

CodeQL flags line 245 for potential path injection. Similar to the load_whitelist function, the temp file path is derived from user-controlled settings without runtime validation.

Same verification as previous comments - check if config loading validates paths properly.


229-238: LGTM! Smart sorting helper for mixed value types.

The get_expiry helper function correctly extracts expiry times from both dict and int entries for proper sorting when the whitelist exceeds size limits.
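
A plausible shape for that helper, shown here with an oldest-first trim; the "expires" key is an assumption, not confirmed against the actual code:

def get_expiry(value):
    # Dict entries carry their expiry under a key; legacy entries ARE the expiry.
    return value["expires"] if isinstance(value, dict) else value

def trim(whitelist: dict, max_entries: int) -> dict:
    # Sort oldest-first and keep only the newest max_entries items.
    items = sorted(whitelist.items(), key=lambda kv: get_expiry(kv[1]))
    return dict(items[-max_entries:])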


262-332: LGTM! Token-based IP replacement logic is correct.

The implementation properly enforces one-IP-per-token:

  • Lines 309-322: Correctly finds and removes existing IPs for the same token_id
  • Lines 325-329: Properly stores new entries with token_id or as legacy int for backward compatibility
  • Line 332: Returns the replaced IP for firewalld cleanup

The logic handles the edge case where the same IP is re-whitelisted (just updates TTL without replacement).
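
A rough sketch of the replacement scan described above, assuming dict entries carry a token_id field (the real function does more, such as persistence and locking):

def replace_for_token(whitelist: dict, new_ip: str, token_id: str):
    removed = None
    # Iterate over a copy so the dict can be mutated mid-scan.
    for ip, value in list(whitelist.items()):
        if isinstance(value, dict) and value.get("token_id") == token_id:
            if ip != new_ip:  # re-knocking the same IP only refreshes its TTL
                removed = ip
                del whitelist[ip]
            break  # each token owns at most one entry
    return removed  # caller uses this to remove the old firewall rule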


424-437: LGTM! Cleanup properly handles mixed value types.

The cleanup logic correctly extracts expiry times from both dict and int entries, preserving the original value format when building the fresh whitelist.


459-489: LGTM! Constant-time comparison prevents timing attacks.

The implementation correctly uses bitwise OR to prevent short-circuiting and iterates through all keys to maintain constant-time behavior. This is a security best practice for API key validation.
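
A self-contained sketch of that pattern; the real is_valid_api_key signature may differ, and hmac.compare_digest supplies the per-key constant-time comparison:

import hmac

def constant_time_key_check(candidate: str, api_keys: list) -> bool:
    found = 0
    # Visit every configured key so timing does not reveal where a match sits.
    for key in api_keys:
        # compare_digest runs in constant time for equal-length inputs;
        # bitwise OR accumulates matches without short-circuiting.
        found |= hmac.compare_digest(candidate.encode(), key.encode())
    return bool(found)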

tests/test_device_tokens.py (1)

1-81: LGTM! Comprehensive test coverage for device token feature.

The tests properly verify:

  1. Legacy behavior (lines 27-41): Multiple IPs allowed without token_id
  2. Per-token replacement (lines 44-72): Old IP removed when new IP added for same token_id, correct return value
  3. Cross-token isolation (lines 74-81): Different tokens don't interfere with each other

The fixture properly manages temporary whitelist files. Assertions are clear and comprehensive.

src/main.py (4)

507-507: SHA-256 usage is appropriate for token identification.

CodeQL flags this as weak cryptography for password hashing, but this is a false positive. The code is not storing passwords—it's generating a deterministic identifier from the API key to enforce the one-IP-per-token policy. SHA-256 is appropriate here since:

  • It's used for identification, not authentication
  • Deterministic output is required (same token always generates same ID)
  • The API keys themselves are already secrets stored securely

533-540: Review logging of potentially sensitive information.

CodeQL flags line 536 for logging sensitive data. The code logs ip_to_whitelist at DEBUG level. While IP addresses are generally not considered sensitive, in some contexts they could reveal user identity. The current DEBUG-level logging is a reasonable balance, but consider:

  • The comment mentions avoiding logging at INFO level, which is good
  • ip_to_whitelist might be sensitive depending on deployment context
  • Consider whether even DEBUG-level logging is necessary in production

Based on learnings or coding guidelines, is IP address logging at DEBUG level acceptable for this project's security requirements?


511-523: LGTM! Token ID properly passed to firewalld integration.

The implementation correctly:

  • Generates token_id from the API key (line 507)
  • Passes it to add_ip_to_whitelist_with_firewalld (line 512)
  • Enforces the one-IP-per-token policy at the core level

300-301: False positive: Depends() in defaults is FastAPI pattern.

Ruff flags B008 here, but using Depends() in function parameter defaults is the standard FastAPI dependency injection pattern. This is not an issue.
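
For context, a minimal example of the pattern Ruff is flagging; the endpoint and dependency names are illustrative, not taken from src/main.py:

from fastapi import Depends, FastAPI

app = FastAPI()

def get_settings() -> dict:
    return {"whitelist": {"storage_path": "whitelist.json"}}

@app.get("/example")
def example(settings: dict = Depends(get_settings)):
    # Depends() in a parameter default is how FastAPI wires dependency
    # injection, so Ruff's B008 warning is expected noise here.
    return {"storage": settings["whitelist"]["storage_path"]}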

- Validate storage_path using .resolve() to prevent directory traversal
- Redact API keys in logs (src/core.py: get_api_key_name)
- Use logging.exception for better error tracing
- Clarify SHA256 usage for token_id (not for auth)
Comment on lines +201 to +203
path = Path(
settings.get("whitelist", {}).get("storage_path", "whitelist.json")
).resolve()

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix (about 2 months ago)

To mitigate uncontrolled path usage, we should ensure that settings["whitelist"]["storage_path"] always resolves to a path within an intended “safe” directory—by default, the directory containing the application or a specific subdirectory, such as /var/app_data or similar. The resolution is best achieved by always joining the (possibly user-supplied) filename to a trusted base directory, normalizing the resulting path, and verifying that the final resolved path starts with the base directory path. If not, we should raise an exception or log and return an error, refusing to proceed. This fix involves defining a safe base directory (e.g., using a constant, or a setting initialized at startup and not modifiable by external users), updating the logic in load_whitelist() (and everywhere else that processes or writes to the whitelist file) to check/normalize the resolved file path, and enforcing that the whitelist is always read and written only within this directory. Additional imports are not strictly required, as pathlib is already imported, but we will need to insert code blocks for validation and error handling just prior to accessing the derived file.


Suggested changeset 1: src/core.py

Autofix patch. Run the following command in your local git repository to apply it:
cat << 'EOF' | git apply
diff --git a/src/core.py b/src/core.py
--- a/src/core.py
+++ b/src/core.py
@@ -6,10 +6,10 @@
 import logging
 import hmac
 from pathlib import Path
+import os
 from typing import Dict, Any, Optional, Union
 from contextlib import contextmanager
 
-# Thread lock for whitelist operations
 # Using RLock (reentrant lock) to allow nested lock acquisition
 # in read-modify-write sequences
 _whitelist_lock = threading.RLock()
@@ -198,10 +195,15 @@
 def load_whitelist(settings: Dict[str, Any]) -> Dict[str, Union[int, Dict[str, Any]]]:
     """Loads the whitelist from the JSON file with thread safety."""
     with _whitelist_lock:
-        path = Path(
-            settings.get("whitelist", {}).get("storage_path", "whitelist.json")
-        ).resolve()
-        if not path.exists():
+        # Always join to SAFE_WHITELIST_DIR and resolve, to prevent directory traversal
+        raw_storage_path = settings.get("whitelist", {}).get("storage_path", "whitelist.json")
+        storage_path = Path(raw_storage_path)
+        # Prevent absolute paths or traversal by always joining to the safe base
+        full_path = (SAFE_WHITELIST_DIR / storage_path).resolve()
+        # Ensure the resulting path is within the safe base directory
+        if not str(full_path).startswith(str(SAFE_WHITELIST_DIR) + os.sep):
+            raise Exception("Whitelist storage_path must be inside the designated directory.")
+        if not full_path.exists():
             return {}
 
         try:
EOF
Comment on lines +226 to +228
path = Path(
settings.get("whitelist", {}).get("storage_path", "whitelist.json")
).resolve()

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix (about 2 months ago)

To fix this issue, we need to ensure that any path derived from user input is validated before being used in file I/O operations, in particular for both saving and loading the whitelist. The recommended practice, especially if the path may span nested subfolders, is to resolve the computed path and ensure it remains inside a designated safe directory (base_path). We should:

  • Define a root directory (base_path) for whitelist storage (e.g., /server/static/whitelist, or configured similarly).
  • After joining base_path with the user-provided relative path, resolve and normalize the result.
  • Ensure that the resolved path starts with the base_path (string or Path prefix).
  • Refuse operations (raise or fallback to default) if the path is outside the safe root.

Steps in code:

  • In both load_whitelist and save_whitelist, compute the path as Path(base_path) / storage_path, resolve it, and check it is beneath base_path.
  • If not, raise an exception or fallback to default safe file.
  • Optionally define base_path via settings with a secure default (never allow empty or /).

Regions to change:

  • Inside load_whitelist and save_whitelist functions in src/core.py.

Imports:
No new external libraries are needed. Only standard Python (pathlib).

Suggested changeset 1: src/core.py

Autofix patch. Run the following command in your local git repository to apply it:
cat << 'EOF' | git apply
diff --git a/src/core.py b/src/core.py
--- a/src/core.py
+++ b/src/core.py
@@ -196,11 +196,17 @@
 
 
 def load_whitelist(settings: Dict[str, Any]) -> Dict[str, Union[int, Dict[str, Any]]]:
-    """Loads the whitelist from the JSON file with thread safety."""
+    """Loads the whitelist from the JSON file with thread safety and path validation."""
     with _whitelist_lock:
-        path = Path(
-            settings.get("whitelist", {}).get("storage_path", "whitelist.json")
-        ).resolve()
+        # Get base path and user path
+        base_path = Path(settings.get("whitelist", {}).get("storage_root", "whitelists/")).resolve()
+        storage_name = settings.get("whitelist", {}).get("storage_path", "whitelist.json")
+        # Compute the file path
+        path = (base_path / storage_name).resolve()
+        # Check that path is inside base_path
+        if not str(path).startswith(str(base_path)):
+            logging.error(f"Whitelist file path '{path}' is outside base directory '{base_path}'")
+            return {}
         if not path.exists():
             return {}
 
@@ -221,11 +226,14 @@
 def save_whitelist(
     whitelist: Dict[str, Union[int, Dict[str, Any]]], settings: Dict[str, Any]
 ):
-    """Saves the whitelist to the JSON file with thread safety and size limits."""
+    """Saves the whitelist to the JSON file with thread safety, size limits, and path validation."""
     with _whitelist_lock:
-        path = Path(
-            settings.get("whitelist", {}).get("storage_path", "whitelist.json")
-        ).resolve()
+        base_path = Path(settings.get("whitelist", {}).get("storage_root", "whitelists/")).resolve()
+        storage_name = settings.get("whitelist", {}).get("storage_path", "whitelist.json")
+        path = (base_path / storage_name).resolve()
+        if not str(path).startswith(str(base_path)):
+            logging.error(f"Whitelist file path '{path}' is outside base directory '{base_path}'")
+            raise Exception("Whitelist file path is outside the allowed directory")
 
         # Security check: limit whitelist size to prevent DoS
         max_entries = settings.get("security", {}).get("max_whitelist_entries", 10000)
EOF
Comment on lines +304 to +305
settings.get("whitelist", {}).get("storage_path", "whitelist.json")
).resolve()

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix (about 2 months ago)

To fix this issue, we must ensure that the user-controlled value used to construct the whitelist file path is strictly contained within a safe, predefined directory and cannot escape via path traversal (e.g., ../), absolute paths, or symlinks. The "safe root" technique from the background is most appropriate here. In particular:

  • Determine a constant, safe directory where whitelist storage files must live (e.g., /var/app/whitelists or just the directory of the running application). This directory root should be hardcoded.
  • When constructing the file path from settings.get("whitelist", {}).get("storage_path", ...), normalize it (using os.path.normpath or equivalent), join it to the safe root, resolve it, and check that it starts with the safe root dir after resolution.
  • If the check fails, raise an exception (or fallback to a default file, or deny the request).

The code to update is the block at line 303–305 in add_ip_to_whitelist (in src/core.py).
You will also need to add an import for os (if not present) for path normalization, and define a constant safe root folder (e.g., WHITELIST_ROOT = Path(__file__).parent.resolve() or a fixed path).

Changes required:

  • Add WHITELIST_ROOT (as a module-level constant).
  • Validate the constructed path before use, ensuring containment inside WHITELIST_ROOT.
  • Import os if needed.

Suggested changeset 1: src/core.py

Autofix patch. Run the following command in your local git repository to apply it:
cat << 'EOF' | git apply
diff --git a/src/core.py b/src/core.py
--- a/src/core.py
+++ b/src/core.py
@@ -6,9 +6,14 @@
 import logging
 import hmac
 from pathlib import Path
+import os
 from typing import Dict, Any, Optional, Union
 from contextlib import contextmanager
 
+# Root folder for all whitelist storage.
+# You may change this to an absolute path if you want e.g. "/var/app/whitelists"
+WHITELIST_ROOT = Path(__file__).parent.resolve()
+
 # Thread lock for whitelist operations
 # Using RLock (reentrant lock) to allow nested lock acquisition
 # in read-modify-write sequences
@@ -300,9 +302,13 @@
         )
 
     # Get whitelist path for cross-process locking
-    whitelist_path = Path(
-        settings.get("whitelist", {}).get("storage_path", "whitelist.json")
-    ).resolve()
+    raw_storage_path = settings.get("whitelist", {}).get("storage_path", "whitelist.json")
+    # Prevent path traversal: only allow files within WHITELIST_ROOT
+    candidate_path = WHITELIST_ROOT / raw_storage_path
+    whitelist_path = candidate_path.resolve()
+    # Validate that the resolved path is inside WHITELIST_ROOT
+    if not str(whitelist_path).startswith(str(WHITELIST_ROOT) + os.sep):
+        raise ValueError(f"Whitelist storage path resolves outside allowed directory: {whitelist_path}")
 
     # Use both in-process and cross-process locks
     with _whitelist_lock:
EOF
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/main.py (1)

593-593: Remove redundant exception objects from logging.exception calls.

The logging.exception() method automatically includes exception information in the traceback. Passing the exception object in the message string (e.g., f"... {e}") is redundant and can make logs less clean.

Apply this pattern to clean up the logging calls:

-logging.exception(f"Health check failed: Could not read whitelist storage: {e}")
+logging.exception("Health check failed: Could not read whitelist storage")

Similar changes should be applied to lines 607, 618, and 630.

As per static analysis hints.

Also applies to: 607-607, 618-618, 630-630

src/core.py (1)

391-391: Remove redundant exception objects from logging.exception calls.

Similar to src/main.py, these logging.exception() calls include redundant exception objects in the message string. The exception information is automatically included in the traceback.

Apply this pattern:

-logging.exception(f"Failed to remove old firewall rule for {old_ip}: {e}")
+logging.exception(f"Failed to remove old firewall rule for {old_ip}")

Similar cleanup should be applied to lines 401, 405, and 408.

As per static analysis hints.

Also applies to: 401-401, 405-405, 408-408

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c3a7f03 and 1744379.

📒 Files selected for processing (2)
  • src/core.py (10 hunks)
  • src/main.py (20 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-08T22:31:37.677Z
Learnt from: CR
Repo: FarisZR/knocker PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-09-08T22:31:37.677Z
Learning: Applies to knocker.yaml : Set the whitelist storage path in knocker.yaml (JSON at /data/whitelist.json in container)

Applied to files:

  • src/core.py
🧬 Code graph analysis (2)
src/main.py (2)
src/core.py (10)
  • cleanup_expired_ips (412-443)
  • load_whitelist (198-218)
  • is_valid_api_key (465-495)
  • can_whitelist_remote (449-454)
  • is_valid_ip_or_cidr (48-54)
  • is_safe_cidr_range (57-78)
  • get_max_ttl_for_key (457-462)
  • add_ip_to_whitelist_with_firewalld (339-409)
  • save_whitelist (221-263)
  • is_ip_whitelisted (108-147)
src/firewalld.py (4)
  • initialize_firewalld (591-595)
  • is_enabled (78-80)
  • setup_knocker_zone (222-306)
  • restore_missing_rules (515-565)
src/core.py (1)
src/firewalld.py (4)
  • get_firewalld_integration (586-588)
  • is_enabled (78-80)
  • add_whitelist_rule (357-407)
  • remove_whitelist_rule (419-461)
🪛 GitHub Check: CodeQL
src/main.py

[failure] 510-510: Use of a broken or weak cryptographic hashing algorithm on sensitive data
Sensitive data (password) is used in a hashing algorithm (SHA256) that is insecure for password hashing, since it is not a computationally expensive hash function.

src/core.py

[failure] 201-203: Uncontrolled data used in path expression
This path depends on a user-provided value.


[failure] 204-204: Uncontrolled data used in path expression
This path depends on a user-provided value.


[failure] 226-228: Uncontrolled data used in path expression
This path depends on a user-provided value.


[failure] 303-305: Uncontrolled data used in path expression
This path depends on a user-provided value.


[failure] 420-422: Uncontrolled data used in path expression
This path depends on a user-provided value.

🪛 Ruff (0.14.5)
src/main.py

170-170: Do not catch blind exception: Exception

(BLE001)


178-178: Do not catch blind exception: Exception

(BLE001)


179-181: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


300-300: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


370-370: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


593-593: Redundant exception object included in logging.exception call

(TRY401)


607-607: Redundant exception object included in logging.exception call

(TRY401)


618-618: Redundant exception object included in logging.exception call

(TRY401)


630-630: Redundant exception object included in logging.exception call

(TRY401)


662-662: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

src/core.py

293-293: Avoid specifying long messages outside the exception class

(TRY003)


298-300: Avoid specifying long messages outside the exception class

(TRY003)


391-391: Redundant exception object included in logging.exception call

(TRY401)


394-394: Consider moving this statement to an else block

(TRY300)


401-401: Redundant exception object included in logging.exception call

(TRY401)


405-405: Redundant exception object included in logging.exception call

(TRY401)


408-408: Redundant exception object included in logging.exception call

(TRY401)

🔇 Additional comments (7)
src/main.py (2)

506-510: Acknowledge security scanner warning: SHA256 usage is appropriate here.

GitHub Advanced Security flags SHA256 as weak for password hashing. However, this is a false positive in this context. The code correctly uses SHA256 to generate a deterministic identifier from the API key to enforce the one-IP-per-token policy. The API key itself remains the secret; the hash is only used for identification, not for security. This is an appropriate use of SHA256.

Based on past review comments from static analysis.


514-516: LGTM: Token ID integration.

The token_id is correctly passed to the core whitelisting function, enabling the one-IP-per-token behavior.

src/core.py (5)

201-203: LGTM: Path normalization addresses security concerns.

The addition of .resolve() to normalize whitelist paths addresses the path injection concerns flagged by CodeQL and previous reviews. This prevents directory traversal attacks by resolving .. components and normalizing the path before file operations.

Based on past review comments.

Also applies to: 226-228, 303-305, 420-422


133-145: LGTM: Backward-compatible whitelist checking.

The updated logic correctly handles both legacy entries (integer expiry) and new token-based entries (dict with expiry and token_id), maintaining backward compatibility while supporting the new device token feature.


234-242: LGTM: Expiry extraction helper for size limiting.

The helper function cleanly extracts expiry timestamps from both legacy (int) and new (dict) formats, ensuring the size-limiting logic works correctly across both entry types.


312-336: LGTM: Token replacement logic implements device token behavior.

The logic correctly implements one-IP-per-token behavior:

  1. Searches for existing entries with the same token_id
  2. Removes the old IP if different from the new one
  3. Returns the removed IP for firewall rule cleanup
  4. Maintains backward compatibility by storing token-less entries as integers

The use of list(whitelist.items()) prevents modification during iteration, and breaking after the first match is appropriate since each token_id should have at most one entry.


382-392: LGTM: Firewall rule cleanup for replaced IPs.

The logic correctly removes firewall rules for the old IP when a token's whitelisted IP is replaced. The defensive error handling ensures that failures in rule removal don't prevent the overall operation from succeeding.
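
In outline, the defensive cleanup likely resembles this sketch; remove_whitelist_rule follows the src/firewalld.py helper named in this review, while the wrapper itself is hypothetical:

import logging

def cleanup_replaced_rule(firewalld_integration, removed_ip):
    if not removed_ip:
        return
    try:
        firewalld_integration.remove_whitelist_rule(removed_ip)
    except Exception:
        # Log with a traceback, but never let a failed rule removal
        # abort the knock that triggered the replacement.
        logging.exception(f"Failed to remove old firewall rule for {removed_ip}")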

@FarisZR (Owner, Author) commented Nov 18, 2025

I've added tests/test_migration.py which confirms that the "lazy migration" strategy works as intended for backward compatibility:

  1. Legacy Support: The system correctly reads and validates existing whitelist.json files containing the old integer format {"ip": expiry}.
  2. Mixed Content: The system functions correctly with whitelist.json files containing both old (integer) and new (dict with token_id) entries.
  3. Adoption:
    • Legacy entries (which have no associated token_id) remain active until they expire, effectively being "unowned".
    • If a client knocks again for a legacy IP using a token, that entry is seamlessly updated to the new format, becoming "owned" by that token.
    • Once "owned", the one-IP-per-token rule applies, and subsequent knocks for different IPs with that token will properly cleanup the old entry.

No manual migration script is required; the system will naturally migrate active users to the new format as they continue to use the service.
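
A hedged sketch of that adoption step; field names such as "expires" are assumed rather than taken from the implementation:

from typing import Optional

def adopt_or_refresh(whitelist: dict, ip: str, expiry: int, token_id: Optional[str]) -> None:
    value = whitelist.get(ip)
    if token_id is None:
        whitelist[ip] = expiry  # token-less knock: keep the legacy int form
    elif not isinstance(value, dict):
        # Legacy "unowned" (or brand-new) entry: rewrite as a token-owned dict.
        whitelist[ip] = {"expires": expiry, "token_id": token_id}
    else:
        value.update(expires=expiry, token_id=token_id)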

@FarisZR (Owner, Author) commented Nov 18, 2025

integration test updates are still missing


Development

Successfully merging this pull request may close these issues:

  • One whitelist per Token AKA Device Tokens

2 participants