Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions tests/test_sqlite_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import pytest
import tempfile
from pathlib import Path
from tix.storage.sqlite_storage import SQLiteTaskStorage

@pytest.fixture
def sqlite_storage():
"""Create temporary SQLite storage for tests"""
db_path = Path(tempfile.mktemp(suffix=".db"))
storage = SQLiteTaskStorage(db_path)
yield storage
db_path.unlink(missing_ok=True)

def test_add_and_get(sqlite_storage):
task = sqlite_storage.add_task("SQLite test", "high", ["work"])
retrieved = sqlite_storage.get_task(task.id)
assert retrieved is not None
assert retrieved.text == "SQLite test"
assert retrieved.priority == "high"

def test_update(sqlite_storage):
task = sqlite_storage.add_task("Old text")
task.text = "Updated text"
sqlite_storage.update_task(task)
retrieved = sqlite_storage.get_task(task.id)
assert retrieved.text == "Updated text"

def test_delete(sqlite_storage):
task = sqlite_storage.add_task("To delete")
deleted = sqlite_storage.delete_task(task.id)
assert deleted
assert sqlite_storage.get_task(task.id) is None

def test_active_and_completed(sqlite_storage):
t1 = sqlite_storage.add_task("Active task")
t2 = sqlite_storage.add_task("Done task")
t2.mark_done()
sqlite_storage.update_task(t2)

active = sqlite_storage.get_active_tasks()
completed = sqlite_storage.get_completed_tasks()

assert any(t.text == "Active task" for t in active)
assert any(t.text == "Done task" for t in completed)

def test_pagination_and_list(sqlite_storage):
for i in range(25):
sqlite_storage.add_task(f"Task {i}")

page1 = sqlite_storage.list_tasks(page=1, page_size=10)
page2 = sqlite_storage.list_tasks(page=2, page_size=10)

assert len(page1) == 10
assert page1[0].text == "Task 0"
assert len(page2) == 10
assert page2[0].text == "Task 10"
25 changes: 25 additions & 0 deletions tix/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,31 @@ def from_dict(cls, data: dict):
links=data.get('links', [])
)

# --- NEW: SQLite support ---
def to_row(self):
"""Convert Task to a SQLite row tuple (without id)"""
return (
self.text,
self.priority,
int(self.completed), # store bool as 0/1
self.created_at,
self.completed_at,
",".join(self.tags),
)

@classmethod
def from_row(cls, row):
"""Create Task from a SQLite row"""
return cls(
id=row["id"],
text=row["text"],
priority=row["priority"],
completed=bool(row["completed"]),
created_at=row["created_at"],
completed_at=row["completed_at"],
tags=row["tags"].split(",") if row["tags"] else []
)

def mark_done(self):
"""Mark task as completed with timestamp"""
self.completed = True
Expand Down
76 changes: 76 additions & 0 deletions tix/storage/sqlite_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import sqlite3
from pathlib import Path
from typing import List, Optional
from tix.models import Task

class SQLiteTaskStorage:
"""SQLite-based storage for tasks"""

def __init__(self, db_path: Path = None):
self.db_path = db_path or (Path.home() / ".tix" / "tasks.db")
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
self._init_db()

def _init_db(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
text TEXT NOT NULL,
priority TEXT,
completed INTEGER DEFAULT 0,
created_at TEXT,
completed_at TEXT,
tags TEXT
)
""")
self.conn.commit()

def add_task(self, text: str, priority: str = "medium", tags: List[str] = None) -> Task:
task = Task(id=0, text=text, priority=priority, tags=tags or [])
cur = self.conn.execute(
"INSERT INTO tasks (text, priority, completed, created_at, completed_at, tags) VALUES (?, ?, ?, ?, ?, ?)",
task.to_row()
)
self.conn.commit()
task.id = cur.lastrowid
return task

def get_task(self, task_id: int) -> Optional[Task]:
cur = self.conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,))
row = cur.fetchone()
return Task.from_row(row) if row else None

def load_tasks(self) -> List[Task]:
cur = self.conn.execute("SELECT * FROM tasks ORDER BY id")
return [Task.from_row(row) for row in cur.fetchall()]

def update_task(self, task: Task):
self.conn.execute(
"UPDATE tasks SET text=?, priority=?, completed=?, created_at=?, completed_at=?, tags=? WHERE id=?",
(*task.to_row(), task.id)
)
self.conn.commit()

def delete_task(self, task_id: int) -> bool:
cur = self.conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
self.conn.commit()
return cur.rowcount > 0

def get_active_tasks(self) -> List[Task]:
cur = self.conn.execute("SELECT * FROM tasks WHERE completed=0 ORDER BY id")
return [Task.from_row(row) for row in cur.fetchall()]

def get_completed_tasks(self) -> List[Task]:
cur = self.conn.execute("SELECT * FROM tasks WHERE completed=1 ORDER BY id")
return [Task.from_row(row) for row in cur.fetchall()]

def list_tasks(self, page: int = 1, page_size: int = 20) -> List[Task]:
offset = (page - 1) * page_size
cur = self.conn.execute("SELECT * FROM tasks ORDER BY id LIMIT ? OFFSET ?", (page_size, offset))
return [Task.from_row(row) for row in cur.fetchall()]

def iter_tasks(self, start: int = 0, count: int = 20):
cur = self.conn.execute("SELECT * FROM tasks ORDER BY id LIMIT ? OFFSET ?", (count, start))
for row in cur.fetchall():
yield Task.from_row(row)