diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..3e28428 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,142 @@ +name: CI + +on: + push: + branches: + - "main" + pull_request: + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + lint-python: + name: Lint Python + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + cache: "pip" + + - name: Run flake8 + uses: py-actions/flake8@v2 + + validate-compute-block: + name: Validate Compute Block Config + runs-on: ubuntu-latest + needs: lint-python + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + + - name: Install dependencies + run: | + pip install -r requirements.txt + + - name: Check cbcs + run: | + python3 - <<'EOF' + import main + + from scystream.sdk.config import load_config, get_compute_block + from scystream.sdk.config.config_loader import _compare_configs + from pathlib import Path + + CBC_PATH = Path("cbc.yaml") + + if not CBC_PATH.exists(): + raise FileNotFoundError("cbc.yaml not found in repo root.") + + block_from_code = get_compute_block() + block_from_yaml = load_config(str(CBC_PATH)) + + _compare_configs(block_from_code, block_from_yaml) + + print("cbc.yaml matches python code definition") + EOF + + run-test: + name: Run Tests + runs-on: ubuntu-latest + needs: validate-compute-block + services: + minio: + image: lazybit/minio + ports: + - 9000:9000 + env: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + options: >- + --health-cmd "curl -f http://localhost:9000/minio/health/live || exit 1" + --health-interval 5s + --health-retries 5 + --health-timeout 5s + postgres: + image: postgres:15 + ports: + - 5432:5432 + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres + options: >- + --health-cmd="pg_isready -U postgres" + --health-interval=5s + 
--health-retries=10 + --health-timeout=5s + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + cache: "pip" + + - name: Install dependencies + run: | + pip install -r requirements.txt + + - name: Run Tests + run: pytest -vv + + build: + name: Build Docker Image + runs-on: ubuntu-latest + needs: run-test + permissions: + contents: read + packages: write + steps: + - name: Checkout Repository + uses: actions/checkout@v4 + + - name: Log in to Docker Hub + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata for docker + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/language-preprocessing + tags: | + type=ref, event=pr + type=raw, value=latest, enable=${{ (github.ref == format('refs/heads/{0}', 'main')) }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml deleted file mode 100644 index df0d4cf..0000000 --- a/.github/workflows/docker.yaml +++ /dev/null @@ -1,44 +0,0 @@ -name: Docker -on: - push: - branches: - - "main" - pull_request: - -env: - REGISTRY: ghcr.io - IMAGE_NAME: ${{ github.repository }} - -jobs: - build: - name: Build docker image - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - steps: - - name: Checkout Repository - uses: actions/checkout@v4 - - - name: Log in to Docker Hub - uses: docker/login-action@v3 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata for docker - id: meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/language-preprocessing - tags: | - type=ref, 
event=pr - type=raw, value=latest, enable=${{ (github.ref == format('refs/heads/{0}', 'main')) }} - - - name: Build and push Docker image - uses: docker/build-push-action@v5 - with: - push: true - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} diff --git a/cbc.yaml b/cbc.yaml index 0932961..4df19d8 100644 --- a/cbc.yaml +++ b/cbc.yaml @@ -1,15 +1,16 @@ -author: Paul Kalhorn +author: Paul Kalhorn description: Language preprocessing for .txt or .bib files docker_image: ghcr.io/rwth-time/language-preprocessing/language-preprocessing entrypoints: preprocess_bib_file: - description: Entrypoint for preprocessing a .bib file + description: Entrypoint for preprocessing an attribute of a .bib file envs: + BIB_DOWNLOAD_PATH: /tmp/input.bib FILTER_STOPWORDS: true LANGUAGE: en NGRAM_MAX: 3 NGRAM_MIN: 2 - UNIGRAM_NORMALIZER: porter + UNIGRAM_NORMALIZER: lemma USE_NGRAMS: true inputs: bib_input: @@ -23,33 +24,18 @@ entrypoints: bib_file_S3_PORT: null bib_file_S3_SECRET_KEY: null bib_file_SELECTED_ATTRIBUTE: Abstract - description: The bib file, aswell as one attribute selected for preprocessing + description: The bib file, aswell as one attribute selected for preprocessing type: file outputs: - dtm_output: + normalized_docs_output: config: - dtm_output_BUCKET_NAME: null - dtm_output_FILE_EXT: pkl - dtm_output_FILE_NAME: null - dtm_output_FILE_PATH: null - dtm_output_S3_ACCESS_KEY: null - dtm_output_S3_HOST: null - dtm_output_S3_PORT: null - dtm_output_S3_SECRET_KEY: null - description: Numpy representation of document-term matrix as .pkl file - type: file - vocab_output: - config: - vocab_output_BUCKET_NAME: null - vocab_output_FILE_EXT: pkl - vocab_output_FILE_NAME: null - vocab_output_FILE_PATH: null - vocab_output_S3_ACCESS_KEY: null - vocab_output_S3_HOST: null - vocab_output_S3_PORT: null - vocab_output_S3_SECRET_KEY: null - description: Pkl file of a dictionary that maps all words to their index in the DTM - type: file + 
normalized_docs_DB_TABLE: null + normalized_docs_PG_HOST: null + normalized_docs_PG_PASS: null + normalized_docs_PG_PORT: null + normalized_docs_PG_USER: null + description: Database Output, containing bib_id aswell as the normalized text + type: pg_table preprocess_txt_file: description: Entrypoint to preprocess a .txt file envs: @@ -57,7 +43,8 @@ entrypoints: LANGUAGE: en NGRAM_MAX: 3 NGRAM_MIN: 2 - UNIGRAM_NORMALIZER: porter + TXT_DOWNLOAD_PATH: /tmp/input.txt + UNIGRAM_NORMALIZER: lemma USE_NGRAMS: true inputs: txt_input: @@ -70,31 +57,16 @@ entrypoints: txt_file_S3_HOST: null txt_file_S3_PORT: null txt_file_S3_SECRET_KEY: null - description: A .txt file + description: A .txt file, each line will be treated as a document type: file outputs: - dtm_output: + normalized_docs_output: config: - dtm_output_BUCKET_NAME: null - dtm_output_FILE_EXT: pkl - dtm_output_FILE_NAME: null - dtm_output_FILE_PATH: null - dtm_output_S3_ACCESS_KEY: null - dtm_output_S3_HOST: null - dtm_output_S3_PORT: null - dtm_output_S3_SECRET_KEY: null - description: Numpy representation of document-term matrix as .pkl file - type: file - vocab_output: - config: - vocab_output_BUCKET_NAME: null - vocab_output_FILE_EXT: pkl - vocab_output_FILE_NAME: null - vocab_output_FILE_PATH: null - vocab_output_S3_ACCESS_KEY: null - vocab_output_S3_HOST: null - vocab_output_S3_PORT: null - vocab_output_S3_SECRET_KEY: null - description: Pkl file of a dictionary that maps all words to their index in the DTM - type: file + normalized_docs_DB_TABLE: null + normalized_docs_PG_HOST: null + normalized_docs_PG_PASS: null + normalized_docs_PG_PORT: null + normalized_docs_PG_USER: null + description: Database Output, containing bib_id aswell as the normalized text + type: pg_table name: Language-Preprocessing diff --git a/docker-compose.yml b/docker-compose.yml index 3c73fe4..d75aa02 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,13 +13,16 @@ services: ports: - "9000:9000" - "9001:9001" - networks: 
- - scystream-net -networks: - scystream-net: - driver: bridge + postgres: + image: postgres:13 + container_name: postgres + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + - POSTGRES_DB=postgres + ports: + - "5432:5432" volumes: minio_data: - search_query: diff --git a/main.py b/main.py index 230d15f..3223be6 100644 --- a/main.py +++ b/main.py @@ -1,29 +1,31 @@ -import pickle -import tempfile +import logging +import pandas as pd +from sqlalchemy import create_engine +from typing import List from scystream.sdk.core import entrypoint from scystream.sdk.env.settings import ( - EnvSettings, - InputSettings, - OutputSettings, - FileSettings + EnvSettings, + InputSettings, + OutputSettings, + FileSettings, + PostgresSettings ) from scystream.sdk.file_handling.s3_manager import S3Operations from preprocessing.core import Preprocessor from preprocessing.loader import TxtLoader, BibLoader +from preprocessing.models import DocumentRecord, PreprocessedDocument +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) -class DTMFileOutput(FileSettings, OutputSettings): - __identifier__ = "dtm_output" - - FILE_EXT: str = "pkl" - - -class VocabFileOutput(FileSettings, OutputSettings): - __identifier__ = "vocab_output" - FILE_EXT: str = "pkl" +class NormalizedDocsOutput(PostgresSettings, OutputSettings): + __identifier__ = "normalized_docs" class TXTFileInput(FileSettings, InputSettings): @@ -46,9 +48,10 @@ class PreprocessTXT(EnvSettings): NGRAM_MIN: int = 2 NGRAM_MAX: int = 3 + TXT_DOWNLOAD_PATH: str = "/tmp/input.txt" + txt_input: TXTFileInput - dtm_output: DTMFileOutput - vocab_output: VocabFileOutput + normalized_docs_output: NormalizedDocsOutput class PreprocessBIB(EnvSettings): @@ -59,13 +62,41 @@ class PreprocessBIB(EnvSettings): NGRAM_MIN: int = 2 NGRAM_MAX: int = 3 + BIB_DOWNLOAD_PATH: str = "/tmp/input.bib" + bib_input: BIBFileInput - dtm_output: 
DTMFileOutput - vocab_output: VocabFileOutput + normalized_docs_output: NormalizedDocsOutput + + +def _write_preprocessed_docs_to_postgres( + preprocessed_ouput: List[PreprocessedDocument], + settings: PostgresSettings +): + """Replace settings.DB_TABLE with one (doc_id, tokens) row per doc.""" + df = pd.DataFrame([ + {"doc_id": d.doc_id, "tokens": d.tokens} + for d in preprocessed_ouput + ]) + + # Lazy %-args: multi-line f-string fields need Python >= 3.12. + logger.info("Writing %d processed documents to DB table '%s'…", + len(df), settings.DB_TABLE) + engine = create_engine( + f"postgresql+psycopg2://{settings.PG_USER}:{settings.PG_PASS}" + f"@{settings.PG_HOST}:{int(settings.PG_PORT)}/" + ) + + df.to_sql(settings.DB_TABLE, engine, if_exists="replace", index=False) + engine.dispose() + logger.info("Successfully stored normalized documents into '%s'.", + settings.DB_TABLE) -def _preprocess_and_store(texts, settings): + +def _preprocess_and_store(documents: List[DocumentRecord], settings): """Shared preprocessing logic for TXT and BIB.""" + logger.info(f"Starting preprocessing with {len(documents)} documents") + pre = Preprocessor( language=settings.LANGUAGE, filter_stopwords=settings.FILTER_STOPWORDS, @@ -74,74 +105,33 @@ def _preprocess_and_store(texts, settings): ngram_min=settings.NGRAM_MIN, ngram_max=settings.NGRAM_MAX, ) - pre.texts = texts - pre.analyze_texts() - pre.generate_bag_of_words() - dtm, vocab = pre.generate_document_term_matrix() + pre.documents = documents + result = pre.generate_normalized_output() - with tempfile.NamedTemporaryFile(suffix="_dtm.pkl") as tmp_dtm, \ - tempfile.NamedTemporaryFile(suffix="_vocab.pkl") as tmp_vocab: + _write_preprocessed_docs_to_postgres( + result, settings.normalized_docs_output) - pickle.dump(dtm, tmp_dtm) - tmp_dtm.flush() - - pickle.dump(vocab, tmp_vocab) - tmp_vocab.flush() - - S3Operations.upload(settings.dtm_output, tmp_dtm.name) - S3Operations.upload(settings.vocab_output, tmp_vocab.name) + logger.info("Preprocessing completed successfully.") @entrypoint(PreprocessTXT) def preprocess_txt_file(settings): - S3Operations.download(settings.txt_input, 
"input.txt") - texts = TxtLoader.load("./input.txt") + logger.info("Downloading TXT input from S3...") + S3Operations.download(settings.txt_input, settings.TXT_DOWNLOAD_PATH) + + texts = TxtLoader.load(settings.TXT_DOWNLOAD_PATH) + _preprocess_and_store(texts, settings) @entrypoint(PreprocessBIB) def preprocess_bib_file(settings): - S3Operations.download(settings.bib_input, "input.bib") + logger.info("Downloading BIB input from S3...") + S3Operations.download(settings.bib_input, settings.BIB_DOWNLOAD_PATH) + texts = BibLoader.load( - "./input.bib", + settings.BIB_DOWNLOAD_PATH, attribute=settings.bib_input.SELECTED_ATTRIBUTE, ) _preprocess_and_store(texts, settings) - - -""" -if __name__ == "__main__": - test = PreprocessBIB( - bib_input=BIBFileInput( - S3_HOST="http://localhost", - S3_PORT="9000", - S3_ACCESS_KEY="minioadmin", - S3_SECRET_KEY="minioadmin", - BUCKET_NAME="input-bucket", - FILE_PATH="input_file_path", - FILE_NAME="wos_export", - SELECTED_ATTRIBUTE="abstract" - ), - dtm_output=DTMFileOutput( - S3_HOST="http://localhost", - S3_PORT="9000", - S3_ACCESS_KEY="minioadmin", - S3_SECRET_KEY="minioadmin", - BUCKET_NAME="output-bucket", - FILE_PATH="output_file_path", - FILE_NAME="dtm_file_bib" - ), - vocab_output=VocabFileOutput( - S3_HOST="http://localhost", - S3_PORT="9000", - S3_ACCESS_KEY="minioadmin", - S3_SECRET_KEY="minioadmin", - BUCKET_NAME="output-bucket", - FILE_PATH="output_file_path", - FILE_NAME="vocab_file_bib" - ) - ) - - preprocess_bib_file(test) -""" diff --git a/preprocessing/__init__.py b/preprocessing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/preprocessing/core.py b/preprocessing/core.py index 4db4585..f819839 100644 --- a/preprocessing/core.py +++ b/preprocessing/core.py @@ -1,14 +1,15 @@ +import logging import spacy -import numpy as np -from typing import Literal +from typing import Literal, List from nltk.stem.porter import PorterStemmer -from collections import Counter +from preprocessing.models import 
PreprocessedDocument, DocumentRecord LANG_TO_SPACY_MODELS = { "en": "en_core_web_sm", "de": "de_core_news_sm" } +logger = logging.getLogger(__name__) class Preprocessor: @@ -21,6 +22,12 @@ def __init__( ngram_min: int = 2, ngram_max: int = 3, ): + logger.info( + "Init Preprocessor (lang=%s, filter_stopwords=%s, ngrams=%s)", + language, + filter_stopwords, + use_ngrams, + ) self.language = language self.filter_stopwords = filter_stopwords self.unigram_normalizer = unigram_normalizer @@ -29,22 +36,14 @@ def __init__( self.ngram_max = ngram_max self.nlp_model = LANG_TO_SPACY_MODELS.get(language, "en_core_web_sm") - - self.ngram_frequency = Counter() - self.ngram_document_frequency = Counter() - self.token_frequency = Counter() - self.token_document_frequency = Counter() - - self.texts: list[str] = [] - - self.bag_of_words = [] - try: self.nlp = spacy.load(self.nlp_model, disable=["ner"]) except OSError: spacy.cli.download(self.nlp_model) self.nlp = spacy.load(self.nlp_model, disable=["ner"]) + self.documents: List[DocumentRecord] = [] + def filter_tokens( self, tokens: list[spacy.tokens.Token], @@ -57,42 +56,39 @@ def filter_tokens( and len(t.text) > 2 ] - def analyze_texts(self): + def generate_normalized_output(self) -> List[PreprocessedDocument]: + logger.info("Generating normalized output...") porter = PorterStemmer() - for text in self.texts: - doc = self.nlp(text) - token_list = [] - ngram_list = [] + processed_docs: List[PreprocessedDocument] = [] + + for record in self.documents: + doc = self.nlp(record.text) + doc_terms = [] - for sentence in doc.sents: - filtered_tokens = self.filter_tokens( - list(sentence), - self.filter_stopwords + # Process each sentence + for sent in doc.sents: + filtered = self.filter_tokens( + list(sent), self.filter_stopwords ) - normalized_tokens = [ - self.normalize_token(t, porter) for t in filtered_tokens + normalized = [ + self.normalize_token(t, porter) for t in filtered ] - token_list.extend(normalized_tokens) + 
doc_terms.extend(normalized) - if ( - self.use_ngrams and - self.ngram_min > 1 and - self.ngram_max > 1 - ): + # Generate n-grams + if self.use_ngrams and self.ngram_min > 1: for n in range(self.ngram_min, self.ngram_max + 1): - for i in range(len(normalized_tokens) - n + 1): - ngram = " ".join(normalized_tokens[i:i+n]) - ngram_list.append(ngram) + for i in range(len(normalized) - n + 1): + ngram = " ".join(normalized[i:i+n]) + doc_terms.append(ngram) - # update unigram counters - self.token_frequency.update(token_list) - self.token_document_frequency.update(set(token_list)) + processed_docs.append(PreprocessedDocument( + doc_id=record.doc_id, + tokens=doc_terms + )) - # update n-gram counters if any - if ngram_list: - self.ngram_frequency.update(ngram_list) - self.ngram_document_frequency.update(set(ngram_list)) + return processed_docs def normalize_token( self, @@ -108,90 +104,3 @@ def normalize_token( elif self.unigram_normalizer == "lemma": return token.lemma_.lower() return word - - def generate_bag_of_words(self): - porter = PorterStemmer() - self.bag_of_words = [] - - for text in self.texts: - doc = self.nlp(text) - doc_terms = [] - - for sent in doc.sents: - tokens = self.filter_tokens(list(sent), self.filter_stopwords) - - # Handle unigrams - for token in tokens: - normalized = self.normalize_token(token, porter) - - token_dict = { - "term": normalized, - "type": "word", - "span": 1, - "freq": self.token_frequency.get(normalized, 0), - "docs": ( - self.token_document_frequency.get(normalized, 0) - ), - "filters": ( - ["stop"] if not self.filter_stopwords - and token.is_stop else [] - ) - } - doc_terms.append(token_dict) - - # Handle ngrams - if self.use_ngrams and self.ngram_min > 1: - added_ngrams = set() # avoid duplicates - for n in range(self.ngram_min, self.ngram_max + 1): - for i in range(len(tokens) - n + 1): - ngram_tokens = tokens[i:i+n] - ngram_str = " ".join( - [self.normalize_token(t, porter) - for t in ngram_tokens] - ) - - if ngram_str in 
added_ngrams: - continue - added_ngrams.add(ngram_str) - - ngram_dict = { - "term": ngram_str, - "type": "ngram", - "span": n, - "freq": self.ngram_frequency.get(ngram_str, 0), - "docs": ( - self.ngram_document_frequency.get( - ngram_str, 0 - ) - ), - "filters": [] - } - doc_terms.append(ngram_dict) - - self.bag_of_words.append(doc_terms) - - def generate_document_term_matrix(self) -> (np.ndarray, dict): - """ - Converts bag_of_words into document-term matrix - - dtm (np.ndarray): shape = (num_docs, num_terms) - vocab (dict): mapping term -> column index - """ - - all_terms = set() - for doc in self.bag_of_words: - for t in doc: - all_terms.add(t["term"]) - - vocab = {term: idx for idx, term in enumerate(sorted(all_terms))} - - num_docs = len(self.bag_of_words) - num_terms = len(vocab) - dtm = np.zeros((num_docs, num_terms), dtype=int) - - for doc_idx, doc in enumerate(self.bag_of_words): - for token in doc: - term_idx = vocab[token["term"]] - dtm[doc_idx, term_idx] += 1 - - return dtm, vocab diff --git a/preprocessing/loader.py b/preprocessing/loader.py index d55aac3..ecb5193 100644 --- a/preprocessing/loader.py +++ b/preprocessing/loader.py @@ -1,21 +1,26 @@ +import logging import re import bibtexparser +from preprocessing.models import DocumentRecord + +logger = logging.getLogger(__name__) + def normalize_text(text: str) -> str: if not text: return "" - # Remove curly braces - text = re.sub(r"[{}]", "", text) - # Remove LaTeX commands - text = re.sub(r"\\[a-zA-Z]+\s*(\{[^}]*\})?", "", text) + text = re.sub(r"\\[a-zA-Z]+\{([^}]*)\}", r"\1", text) + + text = re.sub(r"\\[a-zA-Z]+", "", text) + + text = re.sub(r"[{}]", "", text) - # Remove LaTeX escaped quotes/accents - text = re.sub(r"\\""[a-zA-Z]", lambda m: m.group(0)[-1], text) + text = re.sub(r'\\"([a-zA-Z])', r'\1', text) text = re.sub(r"\\'", "", text) - text = text.replace("'", "") + text = re.sub(r"\s+", " ", text) return text.strip() @@ -23,21 +28,46 @@ def normalize_text(text: str) -> str: class 
TxtLoader: @staticmethod - def load(file_path: str) -> list[str]: + def load(file_path: str) -> list[DocumentRecord]: with open(file_path, "r", encoding="utf-8") as f: lines = f.readlines() - return [normalize_text(line) for line in lines] + + return [ + DocumentRecord( + doc_id=str(i), + text=normalize_text(line) + ) + for i, line in enumerate(lines, start=1) + ] class BibLoader: @staticmethod - def load(file_path: str, attribute: str) -> list[str]: + def load(file_path: str, attribute: str) -> list[DocumentRecord]: + logger.info(f"Loading BIB file (attribute={attribute})...") + with open(file_path, "r", encoding="utf-8") as f: bib_database = bibtexparser.load(f) results = [] + attribute_lower = attribute.lower() + for entry in bib_database.entries: - value = entry.get(attribute.lower(), "") - results.append(normalize_text(value)) + bib_id = ( + entry.get("id") + or entry.get("ID") + or entry.get("citekey") + or entry.get("entrykey") + or entry.get("Unique-ID") + or "UNKNOWN_ID" + ) + + raw_value = entry.get(attribute_lower, "") + normalized = normalize_text(raw_value) + + results.append(DocumentRecord( + doc_id=bib_id, + text=normalized + )) return results diff --git a/preprocessing/models.py b/preprocessing/models.py new file mode 100644 index 0000000..48ddc4c --- /dev/null +++ b/preprocessing/models.py @@ -0,0 +1,14 @@ +from typing import List +from dataclasses import dataclass + + +@dataclass +class DocumentRecord: + doc_id: str # "1", "2", ... 
for TXT OR bib_id for BIB + text: str # normalized text + + +@dataclass +class PreprocessedDocument: + doc_id: str + tokens: List[str] diff --git a/requirements.txt b/requirements.txt index 3493ec1..1804e39 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,8 @@ scystream-sdk==1.2.2 spacy==3.8.7 nltk==3.9.1 -numpy==2.3.3 bibtexparser==1.4.3 +pytest==9.0.1 +pandas==2.3.3 +SQLAlchemy==2.0.43 +psycopg2-binary==2.9.10 diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..5ce5cca --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,20 @@ +import pytest + +from preprocessing.core import Preprocessor + + +@pytest.fixture +def simple_texts(): + return ["This is a test sentence.", "Another test sentence."] + + +@pytest.fixture +def preprocessor(): + return Preprocessor( + language="en", + filter_stopwords=True, + unigram_normalizer="porter", + use_ngrams=True, + ngram_min=2, + ngram_max=3, + ) diff --git a/test/files/test.txt b/test/files/input.txt similarity index 100% rename from test/files/test.txt rename to test/files/input.txt diff --git a/test/test_full.py b/test/test_full.py new file mode 100644 index 0000000..6a04e61 --- /dev/null +++ b/test/test_full.py @@ -0,0 +1,179 @@ +import os +import boto3 +import pytest +import pandas as pd + +from pathlib import Path +from main import preprocess_bib_file, preprocess_txt_file +from botocore.exceptions import ClientError +from sqlalchemy import create_engine + +MINIO_USER = "minioadmin" +MINIO_PWD = "minioadmin" +BUCKET_NAME = "testbucket" + +PG_USER = "postgres" +PG_PASS = "postgres" + + +def parse_pg_array(arr: str) -> list[str]: + # Convert Postgres literal → Python list + arr = arr.strip("{}") + if not arr: + return [] + # handle quoted items + return [a.strip('"') for a in arr.split(",")] + + +def ensure_bucket(s3, bucket): + try: + s3.head_bucket(Bucket=bucket) + except ClientError as e: + 
error_code = e.response["Error"]["Code"] + if error_code in ("404", "NoSuchBucket"): + s3.create_bucket(Bucket=bucket) + else: + raise + + +def download_to_tmp(s3, bucket, key): + tmp_path = Path("/tmp") / key.replace("/", "_") + s3.download_file(bucket, key, str(tmp_path)) + return tmp_path + + +@pytest.fixture +def s3_minio(): + client = boto3.client( + "s3", + endpoint_url="http://localhost:9000", + aws_access_key_id=MINIO_USER, + aws_secret_access_key=MINIO_PWD + ) + ensure_bucket(client, BUCKET_NAME) + return client + + +def test_full_bib(s3_minio): + input_file_name = "input" + + bib_path = Path(__file__).parent / "files" / f"{input_file_name}.bib" + bib_bytes = bib_path.read_bytes() + + # Upload to MinIO + s3_minio.put_object( + Bucket=BUCKET_NAME, + Key=f"{input_file_name}.bib", + Body=bib_bytes + ) + + # ENV for preprocess_bib_file + env = { + # Preprocessor config + "UNIGRAM_NORMALIZER": "porter", + + # BIB INPUT S3 + "bib_file_S3_HOST": "http://127.0.0.1", + "bib_file_S3_PORT": "9000", + "bib_file_S3_ACCESS_KEY": MINIO_USER, + "bib_file_S3_SECRET_KEY": MINIO_PWD, + "bib_file_BUCKET_NAME": BUCKET_NAME, + "bib_file_FILE_PATH": "", + "bib_file_FILE_NAME": input_file_name, + "bib_file_SELECTED_ATTRIBUTE": "abstract", + + # PostgreSQL output + "normalized_docs_PG_HOST": "localhost", + "normalized_docs_PG_PORT": "5432", + "normalized_docs_PG_USER": PG_USER, + "normalized_docs_PG_PASS": PG_PASS, + "normalized_docs_DB_TABLE": "normalized_docs_bib", + } + + for k, v in env.items(): + os.environ[k] = v + + # Run block + preprocess_bib_file() + + # Query PostgreSQL for inserted documents + engine = create_engine( + f"postgresql+psycopg2://{PG_USER}:{PG_PASS}@localhost:5432/" + ) + df = pd.read_sql_table("normalized_docs_bib", engine) + + # Assertions + assert len(df) > 0 + assert "doc_id" in df.columns + assert "tokens" in df.columns + + # doc_id increments + assert len(df["doc_id"]) == len(df) # doc_id count matches rows + assert df["doc_id"].is_unique # no 
duplicates + assert all(isinstance(x, str) for x in df["doc_id"]) # Bib IDs are strings + + assert set(df["doc_id"]) == { + "WOS:001016714700004", + "WOS:001322577100012" + } + + df["tokens"] = df["tokens"].apply(parse_pg_array) + + assert isinstance(df.iloc[0]["tokens"], list) + assert all(isinstance(t, str) for t in df.iloc[0]["tokens"]) + + +def test_full_txt(s3_minio): + input_file_name = "input" + + txt_path = Path(__file__).parent / "files" / f"{input_file_name}.txt" + txt_bytes = txt_path.read_bytes() + + # Upload input to MinIO + s3_minio.put_object( + Bucket=BUCKET_NAME, + Key=f"{input_file_name}.txt", + Body=txt_bytes + ) + + env = { + "UNIGRAM_NORMALIZER": "porter", + + # TXT input S3 + "txt_file_S3_HOST": "http://127.0.0.1", + "txt_file_S3_PORT": "9000", + "txt_file_S3_ACCESS_KEY": MINIO_USER, + "txt_file_S3_SECRET_KEY": MINIO_PWD, + "txt_file_BUCKET_NAME": BUCKET_NAME, + "txt_file_FILE_PATH": "", + "txt_file_FILE_NAME": input_file_name, + + # Postgres output + "normalized_docs_PG_HOST": "localhost", + "normalized_docs_PG_PORT": "5432", + "normalized_docs_PG_USER": PG_USER, + "normalized_docs_PG_PASS": PG_PASS, + "normalized_docs_DB_TABLE": "normalized_docs_txt", + } + + for k, v in env.items(): + os.environ[k] = v + + preprocess_txt_file() + + # Query PostgreSQL + engine = create_engine( + f"postgresql+psycopg2://{PG_USER}:{PG_PASS}@localhost:5432/" + ) + df = pd.read_sql_table("normalized_docs_txt", engine) + + # Assertions + assert len(df) > 0 + assert "doc_id" in df.columns + assert "tokens" in df.columns + assert len(df["doc_id"]) == len(df) + + df["tokens"] = df["tokens"].apply(parse_pg_array) + + assert isinstance(df.iloc[0]["tokens"], list) + assert all(isinstance(t, str) for t in df.iloc[0]["tokens"]) diff --git a/test/test_loaders.py b/test/test_loaders.py new file mode 100644 index 0000000..c55827c --- /dev/null +++ b/test/test_loaders.py @@ -0,0 +1,52 @@ +import os +import tempfile + +from preprocessing.loader import TxtLoader, BibLoader 
+from preprocessing.models import DocumentRecord + + +def test_txt_loader_reads_and_normalizes(): + with tempfile.NamedTemporaryFile("w+", delete=False) as f: + f.write("Hello {World}\nSecond line") + fname = f.name + + result = TxtLoader.load(fname) + os.unlink(fname) + + # Expect list of DocumentRecord + assert len(result) == 2 + + assert isinstance(result[0], DocumentRecord) + assert result[0].doc_id == "1" + assert result[0].text == "Hello World" + + assert isinstance(result[1], DocumentRecord) + assert result[1].doc_id == "2" + assert result[1].text == "Second line" + + +def test_bib_loader_extracts_attribute(): + bib_content = r""" + @article{a, + abstract = {This is {Bib} \textbf{text}.}, + title = {Ignore me} + } + """ + + with tempfile.NamedTemporaryFile("w+", delete=False) as f: + f.write(bib_content) + fname = f.name + + result = BibLoader.load(fname, "abstract") + os.unlink(fname) + + assert len(result) == 1 + + record = result[0] + assert isinstance(record, DocumentRecord) + + # ID taken from bib entry key: "@article{a,..." + assert record.doc_id == "a" + + # Normalized abstract text + assert record.text == "This is Bib text." 
diff --git a/test/test_normalize.py b/test/test_normalize.py new file mode 100644 index 0000000..33dd6e5 --- /dev/null +++ b/test/test_normalize.py @@ -0,0 +1,17 @@ +from preprocessing.loader import normalize_text + + +def test_normalize_removes_braces(): + assert normalize_text("{abc}") == "abc" + + +def test_normalize_removes_latex_commands(): + assert normalize_text(r"\textbf{Hello}") == "Hello" + + +def test_normalize_removes_accents(): + assert normalize_text(r"\'a") == "a" + + +def test_normalize_collapses_whitespace(): + assert normalize_text("a b c") == "a b c" diff --git a/test/test_preprocessor_unit.py b/test/test_preprocessor_unit.py new file mode 100644 index 0000000..0833a14 --- /dev/null +++ b/test/test_preprocessor_unit.py @@ -0,0 +1,43 @@ +from preprocessing.core import Preprocessor +from preprocessing.models import DocumentRecord + + +def test_preprocessor_generate_normalized_output(): + # Prepare input documents as dataclasses + docs = [ + DocumentRecord(doc_id="1", text="Dogs are running fast."), + DocumentRecord(doc_id="2", text="Cats jump high.") + ] + + pre = Preprocessor( + language="en", + filter_stopwords=True, + unigram_normalizer="lemma", + use_ngrams=True, + ngram_min=2, + ngram_max=2, + ) + + pre.documents = docs + output = pre.generate_normalized_output() + + # Basic structure checks + assert len(output) == 2 + assert output[0].doc_id == "1" + assert output[1].doc_id == "2" + + # Tokens must not be empty + assert len(output[0].tokens) > 0 + assert len(output[1].tokens) > 0 + + # Check that lemmatization worked + # "running" → "run" + assert "run" in output[0].tokens + + # Stopwords filtered → "are" removed + assert "are" not in output[0].tokens + + # Check n-gram generation (bigram because ngram_min=ngram_max=2) + # Example bigram from doc1: "dog run" (if spacy lemmatizes) + bigrams_doc1 = [tok for tok in output[0].tokens if " " in tok] + assert len(bigrams_doc1) > 0 # at least one n-gram produced