-
Notifications
You must be signed in to change notification settings - Fork 18
Address review comments #188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
168b23c
remove host:port
talvasconcelos 88329d8
fix parallel lists
talvasconcelos 96cc69b
private methods in bottom
talvasconcelos dc352e6
extract functions to helpers and services
talvasconcelos f2b9deb
chore: linter
talvasconcelos 6fea733
images are a csv of ids
talvasconcelos b9624af
chore: linter
talvasconcelos File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| import json | ||
|
|
||
| from loguru import logger | ||
|
|
||
|
|
||
| def _from_csv(value: str | None, separator: str = ",") -> list[str]: | ||
| if not value: | ||
| return [] | ||
| parts = [part.strip() for part in value.split(separator)] | ||
| return [part for part in parts if part] | ||
|
|
||
|
|
||
| def _serialize_inventory_tags(tags: list[str] | str | None) -> str | None: | ||
| if isinstance(tags, list): | ||
| return ",".join([tag for tag in tags if tag]) | ||
| return tags | ||
|
|
||
|
|
||
| def _inventory_tags_to_list(raw_tags: str | list[str] | None) -> list[str]: | ||
| if raw_tags is None: | ||
| return [] | ||
| if isinstance(raw_tags, list): | ||
| return [tag.strip() for tag in raw_tags if tag and tag.strip()] | ||
| return [tag.strip() for tag in raw_tags.split(",") if tag and tag.strip()] | ||
|
|
||
|
|
||
| def _inventory_tags_to_string(raw_tags: str | list[str] | None) -> str | None: | ||
| if raw_tags is None: | ||
| return None | ||
| if isinstance(raw_tags, str): | ||
| return raw_tags | ||
| return ",".join([tag for tag in raw_tags if tag]) | ||
|
|
||
|
|
||
| def _first_image(images: str | list[str] | None) -> str | None: | ||
| if not images: | ||
| return None | ||
| if isinstance(images, list): | ||
| return _normalize_image(images[0]) if images else None | ||
| raw = str(images).strip() | ||
| if not raw: | ||
| return None | ||
| try: | ||
| parsed = json.loads(raw) | ||
| if isinstance(parsed, list) and parsed: | ||
| return _normalize_image(parsed[0]) | ||
| except Exception as exc: | ||
| logger.exception(f"Exception occurred while parsing image JSON: {exc}") | ||
|
|
||
| if "|||" in raw: | ||
| return _normalize_image(raw.split("|||")[0]) | ||
|
|
||
| if "," in raw: | ||
| return _normalize_image(raw.split(",")[0]) | ||
| return _normalize_image(raw) | ||
|
|
||
|
|
||
| def _normalize_image(val: str | None) -> str | None: | ||
| if not val: | ||
| return None | ||
| val = str(val).strip() | ||
| if not val: | ||
| return None | ||
| if val.startswith("http") or val.startswith("/api/") or val.startswith("data:"): | ||
| return val | ||
| return f"/api/v1/assets/{val}/binary" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| from typing import Any | ||
|
|
||
| import httpx | ||
| from lnbits.core.crud import get_wallet | ||
| from lnbits.core.models import User | ||
| from lnbits.helpers import create_access_token | ||
| from lnbits.settings import settings | ||
|
|
||
| from .helpers import _from_csv, _inventory_tags_to_list | ||
|
|
||
|
|
||
| async def _deduct_inventory_stock(wallet_id: str, inventory_payload: dict) -> None: | ||
| wallet = await get_wallet(wallet_id) | ||
| if not wallet: | ||
| return | ||
| inventory_id = inventory_payload.get("inventory_id") | ||
| items = inventory_payload.get("items") or [] | ||
| if not inventory_id or not items: | ||
| return | ||
| items_to_update = [] | ||
| for item in items: | ||
| item_id = item.get("id") | ||
| qty = item.get("quantity") or 0 | ||
| if not item_id or qty <= 0: | ||
| continue | ||
| items_to_update.append({"id": item_id, "quantity": int(qty)}) | ||
| if not items_to_update: | ||
| return | ||
|
|
||
| ids = [item["id"] for item in items_to_update] | ||
| quantities = [item["quantity"] for item in items_to_update] | ||
|
|
||
| # Needed to accomodate admin users, as using user ID is not possible | ||
| access = create_access_token( | ||
| {"sub": "", "usr": wallet.user}, token_expire_minutes=1 | ||
| ) | ||
| async with httpx.AsyncClient() as client: | ||
| await client.patch( | ||
| url=f"http://{settings.host}:{settings.port}/inventory/api/v1/items/{inventory_id}/quantities", | ||
| headers={"Authorization": f"Bearer {access}"}, | ||
| params={"source": "tpos", "ids": ids, "quantities": quantities}, | ||
| ) | ||
| return | ||
|
|
||
|
|
||
| async def _get_default_inventory(user_id: str) -> dict[str, Any] | None: | ||
| access = create_access_token({"sub": "", "usr": user_id}, token_expire_minutes=1) | ||
| async with httpx.AsyncClient() as client: | ||
| resp = await client.get( | ||
| url=f"http://{settings.host}:{settings.port}/inventory/api/v1", | ||
| headers={"Authorization": f"Bearer {access}"}, | ||
| ) | ||
| inventory = resp.json() | ||
| if not inventory: | ||
| return None | ||
| if isinstance(inventory, list): | ||
| inventory = inventory[0] if inventory else None | ||
| if not isinstance(inventory, dict): | ||
| return None | ||
| inventory["tags"] = _inventory_tags_to_list(inventory.get("tags")) | ||
| inventory["omit_tags"] = _inventory_tags_to_list(inventory.get("omit_tags")) | ||
| return inventory | ||
|
|
||
|
|
||
| async def _get_inventory_items_for_tpos( | ||
| user_id: str, | ||
| inventory_id: str, | ||
| tags: str | list[str] | None, | ||
| omit_tags: str | list[str] | None, | ||
| ) -> list[Any]: | ||
| tag_list = _inventory_tags_to_list(tags) | ||
| omit_list = [tag.lower() for tag in _inventory_tags_to_list(omit_tags)] | ||
| allowed_tags = [tag.lower() for tag in tag_list] | ||
| access = create_access_token({"sub": "", "usr": user_id}, token_expire_minutes=1) | ||
| async with httpx.AsyncClient() as client: | ||
| resp = await client.get( | ||
| url=f"http://{settings.host}:{settings.port}/inventory/api/v1/items/{inventory_id}/paginated", | ||
| headers={"Authorization": f"Bearer {access}"}, | ||
| params={"limit": 500, "offset": 0, "is_active": True}, | ||
| ) | ||
| payload = resp.json() | ||
| items = payload.get("data", []) if isinstance(payload, dict) else payload | ||
|
|
||
| # item images are a comma separated string; make a list | ||
| for item in items: | ||
| images = item.get("images") | ||
| item["images"] = _from_csv(images) | ||
|
|
||
| def has_allowed_tag(item_tags: str | list[str] | None) -> bool: | ||
| # When no tags are configured for this TPoS, show no items | ||
| if not tag_list: | ||
| return False | ||
| item_tag_list = [tag.lower() for tag in _inventory_tags_to_list(item_tags)] | ||
| return any(tag in item_tag_list for tag in allowed_tags) | ||
|
|
||
| def has_omit_tag(item_omit_tags: str | list[str] | None) -> bool: | ||
| if not omit_list: | ||
| return False | ||
| item_tag_list = [tag.lower() for tag in _inventory_tags_to_list(item_omit_tags)] | ||
| return any(tag in item_tag_list for tag in omit_list) | ||
|
|
||
| filtered = [ | ||
| item | ||
| for item in items | ||
| if has_allowed_tag(item.get("tags")) and not has_omit_tag(item.get("omit_tags")) | ||
| ] | ||
| # If no items matched the provided tags, fall back to all items minus omitted ones. | ||
| if tag_list and not filtered: | ||
| filtered = [item for item in items if not has_omit_tag(item.get("omit_tags"))] | ||
|
|
||
| # hide items with no stock when stock tracking is enabled | ||
| return [ | ||
| item | ||
| for item in filtered | ||
| if item.get("quantity_in_stock") is None or item.get("quantity_in_stock") > 0 | ||
| ] | ||
|
|
||
|
|
||
| def _inventory_available_for_user(user: User | None) -> bool: | ||
| return bool(user and "inventory" in (user.extensions or [])) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these should be public