diff --git a/tests/test_sqlite_storage.py b/tests/test_sqlite_storage.py new file mode 100644 index 0000000..bc44dce --- /dev/null +++ b/tests/test_sqlite_storage.py @@ -0,0 +1,56 @@ +import pytest +import tempfile +from pathlib import Path +from tix.storage.sqlite_storage import SQLiteTaskStorage + +@pytest.fixture +def sqlite_storage(): + """Create temporary SQLite storage for tests""" + db_path = Path(tempfile.mktemp(suffix=".db")) + storage = SQLiteTaskStorage(db_path) + yield storage + db_path.unlink(missing_ok=True) + +def test_add_and_get(sqlite_storage): + task = sqlite_storage.add_task("SQLite test", "high", ["work"]) + retrieved = sqlite_storage.get_task(task.id) + assert retrieved is not None + assert retrieved.text == "SQLite test" + assert retrieved.priority == "high" + +def test_update(sqlite_storage): + task = sqlite_storage.add_task("Old text") + task.text = "Updated text" + sqlite_storage.update_task(task) + retrieved = sqlite_storage.get_task(task.id) + assert retrieved.text == "Updated text" + +def test_delete(sqlite_storage): + task = sqlite_storage.add_task("To delete") + deleted = sqlite_storage.delete_task(task.id) + assert deleted + assert sqlite_storage.get_task(task.id) is None + +def test_active_and_completed(sqlite_storage): + t1 = sqlite_storage.add_task("Active task") + t2 = sqlite_storage.add_task("Done task") + t2.mark_done() + sqlite_storage.update_task(t2) + + active = sqlite_storage.get_active_tasks() + completed = sqlite_storage.get_completed_tasks() + + assert any(t.text == "Active task" for t in active) + assert any(t.text == "Done task" for t in completed) + +def test_pagination_and_list(sqlite_storage): + for i in range(25): + sqlite_storage.add_task(f"Task {i}") + + page1 = sqlite_storage.list_tasks(page=1, page_size=10) + page2 = sqlite_storage.list_tasks(page=2, page_size=10) + + assert len(page1) == 10 + assert page1[0].text == "Task 0" + assert len(page2) == 10 + assert page2[0].text == "Task 10" diff --git a/tix/models.py b/tix/models.py index c8e2467..7e4db34 100644 --- a/tix/models.py +++ b/tix/models.py @@ -44,6 +44,31 @@ def from_dict(cls, data: dict): links=data.get('links', []) ) + # --- NEW: SQLite support --- + def to_row(self): + """Convert Task to a SQLite row tuple (without id)""" + return ( + self.text, + self.priority, + int(self.completed), # store bool as 0/1 + self.created_at, + self.completed_at, + ",".join(self.tags), + ) + + @classmethod + def from_row(cls, row): + """Create Task from a SQLite row""" + return cls( + id=row["id"], + text=row["text"], + priority=row["priority"], + completed=bool(row["completed"]), + created_at=row["created_at"], + completed_at=row["completed_at"], + tags=row["tags"].split(",") if row["tags"] else [] + ) + def mark_done(self): """Mark task as completed with timestamp""" self.completed = True diff --git a/tix/storage/sqlite_storage.py b/tix/storage/sqlite_storage.py new file mode 100644 index 0000000..83c922f --- /dev/null +++ b/tix/storage/sqlite_storage.py @@ -0,0 +1,76 @@ +import sqlite3 +from pathlib import Path +from typing import List, Optional +from tix.models import Task + +class SQLiteTaskStorage: + """SQLite-based storage for tasks""" + + def __init__(self, db_path: Path = None): + self.db_path = db_path or (Path.home() / ".tix" / "tasks.db") + self.conn = sqlite3.connect(self.db_path) + self.conn.row_factory = sqlite3.Row + self._init_db() + + def _init_db(self): + self.conn.execute(""" + CREATE TABLE IF NOT EXISTS tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + text TEXT NOT NULL, + priority TEXT, + completed INTEGER DEFAULT 0, + created_at TEXT, + completed_at TEXT, + tags TEXT + ) + """) + self.conn.commit() + + def add_task(self, text: str, priority: str = "medium", tags: List[str] = None) -> Task: + task = Task(id=0, text=text, priority=priority, tags=tags or []) + cur = self.conn.execute( + "INSERT INTO tasks (text, priority, completed, created_at, completed_at, tags) VALUES (?, ?, ?, ?, ?, ?)", + task.to_row() + ) + self.conn.commit() + task.id = cur.lastrowid + return task + + def get_task(self, task_id: int) -> Optional[Task]: + cur = self.conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)) + row = cur.fetchone() + return Task.from_row(row) if row else None + + def load_tasks(self) -> List[Task]: + cur = self.conn.execute("SELECT * FROM tasks ORDER BY id") + return [Task.from_row(row) for row in cur.fetchall()] + + def update_task(self, task: Task): + self.conn.execute( + "UPDATE tasks SET text=?, priority=?, completed=?, created_at=?, completed_at=?, tags=? WHERE id=?", + (*task.to_row(), task.id) + ) + self.conn.commit() + + def delete_task(self, task_id: int) -> bool: + cur = self.conn.execute("DELETE FROM tasks WHERE id=?", (task_id,)) + self.conn.commit() + return cur.rowcount > 0 + + def get_active_tasks(self) -> List[Task]: + cur = self.conn.execute("SELECT * FROM tasks WHERE completed=0 ORDER BY id") + return [Task.from_row(row) for row in cur.fetchall()] + + def get_completed_tasks(self) -> List[Task]: + cur = self.conn.execute("SELECT * FROM tasks WHERE completed=1 ORDER BY id") + return [Task.from_row(row) for row in cur.fetchall()] + + def list_tasks(self, page: int = 1, page_size: int = 20) -> List[Task]: + offset = (page - 1) * page_size + cur = self.conn.execute("SELECT * FROM tasks ORDER BY id LIMIT ? OFFSET ?", (page_size, offset)) + return [Task.from_row(row) for row in cur.fetchall()] + + def iter_tasks(self, start: int = 0, count: int = 20): + cur = self.conn.execute("SELECT * FROM tasks ORDER BY id LIMIT ? OFFSET ?", (count, start)) + for row in cur.fetchall(): + yield Task.from_row(row)