From c26e1a9cd33c43c3885eec0ef3a26b93017367f1 Mon Sep 17 00:00:00 2001 From: Suresh Dash Date: Wed, 30 Jul 2025 22:20:05 +0900 Subject: [PATCH 1/2] Fix: Handle Partial Log Line Writes in CSV Parsing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Description pg_statsinfo reads PostgreSQL CSV logs, parses each entry, and inserts the data into monitoring tables. In certain cases—such as delayed log flushing or during log rotation—it may encounter a partially written CSV log line and incorrectly treat it as complete. This can result in: - Skipping the remaining portion of the actual log line on the next read - Malformed parsing due to an incomplete field set - Insert failures like: ERROR: invalid input syntax for type timestamp with time zone: "03:32.829 JST" Fix Summary - File offset is now updated only after successfully parsing a complete log entry. - Partial lines are safely skipped without updating the read position. - Introduced internal tracking variables: - last_log_size: detects file growth between reads - log_incomplete: tracks possible partial writes and avoids infinite reprocessing - Improved logic for log rotation and partial flush scenarios to ensure stable and reliable log ingestion Additional Notes This fix enhances robustness, especially in environments with: - High log write latency - Small repolog_buffer or frequent log rotation The behavior was verified through both manual and automated test scenarios using a dedicated partial write reproduction script (see tools/reproduce_partial_write.py) --- agent/bin/logger.c | 47 +++++++- agent/bin/logger_send.c | 49 ++++++++- tools/README.md | 54 +++++++++ tools/reproduce_partial_write.py | 181 +++++++++++++++++++++++++++++++ 4 files changed, 320 insertions(+), 11 deletions(-) create mode 100644 tools/README.md create mode 100644 tools/reproduce_partial_write.py diff --git a/agent/bin/logger.c b/agent/bin/logger.c index dd126f4..927f99d 100644 --- a/agent/bin/logger.c +++ b/agent/bin/logger.c @@ -56,6 +56,9 @@ typedef struct Logger char *csv_name; /* log file name */ long csv_offset; /* parsed bytes in log file */ FILE *fp; /* open log file */ + long last_log_size; /* track the last read log file size*/ + bool log_incomplete; /* True if the last read_csv() failed (partial write likely) */ + /* Text log file */ FILE *textlog; @@ -734,9 +737,21 @@ logger_next(Logger *logger, const char *pg_log) struct stat st; bool ret; + /* + * Step 1: Check if the file size remains unchanged. + * If logger->last_log_size is the same as the current file size (st.st_size), + * it indicates that no new logs have been written since the last check. + * + * Step 2: Use logger->log_incomplete to determine if this is due to a read_csv() failure. + * If the previous read_csv() attempt failed, we assume a partial write. + * This helps differentiate between: + * - A fully flushed but stagnant log file (safe to continue reading) + * - An incomplete write (where we should wait before retrying) + */ if (logger->fp == NULL || stat(logger->csv_path, &st) != 0 || - logger->csv_offset >= st.st_size) + logger->csv_offset >= st.st_size || + (logger->last_log_size == st.st_size && logger->log_incomplete)) { char csvlog[MAXPGPATH]; char textlog[MAXPGPATH]; @@ -748,8 +763,10 @@ logger_next(Logger *logger, const char *pg_log) if (!csvlog[0]) return false; /* logfile not found */ - if (logger->fp && strcmp(logger->csv_name, csvlog) == 0) - return false; /* no more logs */ + + /* If the same file is already being processed and log size is unchanged, wait */ + if (logger->fp && strcmp(logger->csv_name, csvlog) == 0 && logger->last_log_size == st.st_size) + return false; /* no more logs */ join_path_components(textlog, pg_log, csvlog); replace_extension(textlog, ".log"); @@ -771,6 +788,14 @@ logger_next(Logger *logger, const char *pg_log) if (!logger_open(logger, csvlog, 0)) return false; + /* Reset tracking variables for the new file */ + logger->last_log_size = 0; + logger->log_incomplete = false; + + /* Refresh file stats for the new log */ + if (stat(logger->csv_path, &st) != 0) + return false; /* Failed to get new log size */ + /* * if writer thread working in fallback mode, * write in the textlog that agent is working in fallback mode. @@ -784,15 +809,27 @@ logger_next(Logger *logger, const char *pg_log) elog(DEBUG2, "read csvlog \"%s\"", logger->csv_path); } + /* Read the next log entry */ clearerr(logger->fp); fseek(logger->fp, logger->csv_offset, SEEK_SET); ret = read_csv(logger->fp, &logger->buf, CSV_COLS, logger->fields); - logger->csv_offset = ftell(logger->fp); - if (!ret) + /* Update last_log_size AFTER reading, to prevent infinite WARNING loops */ + logger->last_log_size = st.st_size; + + if (ret) + { + /* Update offset only if read_csv() was successful */ + logger->csv_offset = ftell(logger->fp); + logger->log_incomplete = false; + } + else { int save_errno = errno; + /* Track partial write if read_csv() fails */ + logger->log_incomplete = true; + /* close the file unless EOF; it means an error */ if (!feof(logger->fp)) { diff --git a/agent/bin/logger_send.c b/agent/bin/logger_send.c index d332fc8..236ab7b 100644 --- a/agent/bin/logger_send.c +++ b/agent/bin/logger_send.c @@ -29,6 +29,8 @@ typedef struct Logger char *csv_name; /* log file name */ long csv_offset; /* parsed bytes in log file */ FILE *fp; /* open log file */ + long last_log_size; /* track the last read log file size*/ + bool log_incomplete; /* True if the last read_csv() failed (partial write likely) */ /* temp buffer */ StringInfoData buf; /* log buffer */ @@ -284,9 +286,21 @@ logger_next(Logger *logger) struct stat st; bool ret; - if (logger->fp == NULL || + /* + * Step 1: Check if the file size remains unchanged. + * If logger->last_log_size is the same as the current file size (st.st_size), + * it indicates that no new logs have been written since the last check. + * + * Step 2: Use logger->log_incomplete to determine if this is due to a read_csv() failure. + * If the previous read_csv() attempt failed, we assume a partial write. + * This helps differentiate between: + * - A fully flushed but stagnant log file (safe to continue reading) + * - An incomplete write (where we should wait before retrying) + */ + if (logger->fp == NULL || stat(logger->csv_path, &st) != 0 || - logger->csv_offset >= st.st_size) + logger->csv_offset >= st.st_size || + (logger->last_log_size == st.st_size && logger->log_incomplete)) { char csvlog[MAXPGPATH]; @@ -294,23 +308,46 @@ logger_next(Logger *logger) if (!csvlog[0]) return false; /* logfile not found */ - if (logger->fp && strcmp(logger->csv_name, csvlog) == 0) - return false; /* no more logs */ + /* If the same file is already being processed and log size is unchanged, wait */ + if (logger->fp && strcmp(logger->csv_name, csvlog) == 0 && logger->last_log_size == st.st_size) + return false; + + /* Switch to the new log file */ logger_close(logger); if (!logger_open(logger, csvlog, 0)) return false; + + /* Reset tracking variables for the new file */ + logger->last_log_size = 0; + logger->log_incomplete = false; + + /* Refresh file stats for the new log */ + if (stat(logger->csv_path, &st) != 0) + return false; /* Failed to get new log size */ } + /* Read the next log entry */ clearerr(logger->fp); fseek(logger->fp, logger->csv_offset, SEEK_SET); ret = read_csv(logger->fp, &logger->buf, CSV_COLS, logger->fields); - logger->csv_offset = ftell(logger->fp); - if (!ret) + /* Update last_log_size AFTER reading, to prevent infinite WARNING loops */ + logger->last_log_size = st.st_size; + + if (ret) + { + /* Update offset only if read_csv() was successful */ + logger->csv_offset = ftell(logger->fp); + logger->log_incomplete = false; + } + else { int save_errno = errno; + /* Track partial write if read_csv() fails */ + logger->log_incomplete = true; + /* close the file unless EOF; it means an error */ if (!feof(logger->fp)) { diff --git a/tools/README.md b/tools/README.md new file mode 100644 index 0000000..47b3670 --- /dev/null +++ b/tools/README.md @@ -0,0 +1,54 @@ +# pg_statsinfo Partial Write Reproducer + +This script simulates a **partially written PostgreSQL CSV log entry** to reproduce a known issue in `pg_statsinfo`, where reading an incomplete log line causes parsing errors and incorrect offset tracking. + +It helps validate the robustness of log parsing logic in `pg_statsinfo`, especially in scenarios where the PostgreSQL backend log entry is flushed in two parts. + +--- + +## Prerequisites + +- PostgreSQL with `pg_statsinfo` installed and configured +- Python 3.6 or higher +- Python packages: + - `psycopg2` + - `pytz` (optional, if timestamps are involved) + +> ⚠️ If your PostgreSQL installation does **not** embed `RPATH`, make sure to set `LD_LIBRARY_PATH` to include the `lib` directory of PostgreSQL: + +```bash +export LD_LIBRARY_PATH=/path/to/postgres/lib:$LD_LIBRARY_PATH + + + +## Configuration: + +Make sure PostgreSQL is running with the following parameters enabled in postgresql.conf: + +logging_collector = on +log_destination = 'csvlog' +log_directory = 'log' +log_filename = 'postgresql-%Y-%m-%d_%H%M%S.csv' +log_statement = 'all' +log_min_messages = LOG + +# pg_statsinfo-specific settings +pg_statsinfo.repolog_buffer = 1 # Set a small buffer size so pg_statsinfo inserts + # parsed logs immediately, making errors easier to + # reproduce promptly +pg_statsinfo.repolog_interval = 60 + +------ +After updating postgresql.conf, restart the PostgreSQL instance for the changes to take effect. + + + + +## Usage: + +python3 tools/reproduce_partial_write.py \ + --pg-bin /path/to/postgres/bin \ + --log-dir /path/to/pg_log \ + --sample-log-file ./sample_log_entry.txt \ + --pg-port 5432 + diff --git a/tools/reproduce_partial_write.py b/tools/reproduce_partial_write.py new file mode 100644 index 0000000..5e234f3 --- /dev/null +++ b/tools/reproduce_partial_write.py @@ -0,0 +1,181 @@ +import argparse +import os +import time +import glob +import re +import pytz +import psycopg2 +import subprocess +import sys +from datetime import datetime + +MAX_RETRIES = 5 +RETRY_DELAY = 5 # seconds + + +def get_current_timestamp_jst(): + jst = pytz.timezone("Asia/Tokyo") + return datetime.now(jst).strftime("%Y-%m-%d %H:%M:%S JST") + + +def parse_to_hourly_timestamp(timestamp_str): + timestamp = datetime.strptime(timestamp_str.replace(" JST", ""), "%Y-%m-%d %H:%M:%S") + rounded_time = timestamp.replace(minute=0, second=0, microsecond=0) + return rounded_time.strftime("%Y-%m-%d %H:%M:%S") + + +def get_latest_logfile(log_dir): + log_files = sorted(glob.glob(os.path.join(log_dir, "postgresql-*.csv")), reverse=True) + return log_files[0] if log_files else None + + +def generate_new_logfile_name(latest_logfile): + match = re.search(r"(postgresql-\d{4}-\d{2}-\d{2}_)(\d{6})", latest_logfile) + if match: + prefix, number = match.groups() + new_number = f"{int(number) + 1:06d}" + return latest_logfile.replace(f"_{number}.csv", f"_{new_number}.csv") + return None + + +def create_partial_csvlog(log_dir): + latest_logfile = get_latest_logfile(log_dir) + if not latest_logfile: + raise RuntimeError("No CSV log files found.") + + new_logfile = generate_new_logfile_name(latest_logfile) + if not new_logfile: + raise RuntimeError("Failed to generate new log file name.") + + full_timestamp = get_current_timestamp_jst() + partial_timestamp = full_timestamp[:14] # Partial write: YYYY-MM-DD HH: + + with open(new_logfile, "w") as f: + f.write(partial_timestamp) + + print(f"Partial CSV log created: {new_logfile}") + return new_logfile, full_timestamp + + +def append_to_logfile(logfile, content): + time.sleep(5) + with open(logfile, "a") as f: + f.write(content + "\n") + print(f"✅ Appended to {logfile}:{content}") + + +def check_pg_statsinfo_log(log_dir, min_count=2): + log_path = os.path.join(log_dir, "pg_statsinfo.log") + if not os.path.exists(log_path): + return False + + with open(log_path, "r") as f: + lines = f.readlines() + + count = sum(1 for line in lines[-50:] if "WARNING: pg_statsinfo: cannot parse csvlog column" in line) + return count >= min_count + + +def execute_query(query, db_config): + conn = psycopg2.connect(**db_config) + cur = conn.cursor() + cur.execute(query) + result = cur.fetchall() + conn.commit() + cur.close() + conn.close() + return result + + +def check_pg_statsinfo_error(log_dir): + error_strings = [ + "ERROR: invalid input syntax for type timestamp with time zone", + "ERROR: pg_statsinfo: query failed: ERROR: date/time " + ] + count = 0 + for file in os.listdir(log_dir): + path = os.path.join(log_dir, file) + if os.path.isfile(path): + with open(path, "r") as f: + count += sum(1 for line in f if any(err in line for err in error_strings)) + return count > 0 + + +def check_postgres_running(pg_bin, pg_port): + pg_isready_path = os.path.join(pg_bin, "pg_isready") + try: + import pdb;pdb.set_trace() + result = subprocess.run([pg_isready_path, "-p", str(pg_port)], check=True) + if result.returncode != 0: + print("❌ PostgreSQL server does not appear to be running:") + print(result.stderr.strip()) + exit(1) + else: + print("✅ PostgreSQL server is running.") + except Exception as e: + print(f"❌ Unexpected error while checking PostgreSQL status: {e}") + sys.exit(1) + except FileNotFoundError: + print("⚠️ 'pg_isready' not found. Please ensure PostgreSQL tools are installed and in PATH.") + exit(1) + + +def main(): + parser = argparse.ArgumentParser(description="Reproduce partial write parsing issue in pg_statsinfo") + parser.add_argument("--data-dir", required=True, help="Path to PostgreSQL data directory") + parser.add_argument("--pg-bin", required=True, help="Path to PostgreSQL binaries") + parser.add_argument("--csv-log", required=True, help="Path to CSV log directory") + parser.add_argument("--pg-port", default="5432", help="PostgreSQL port") + args = parser.parse_args() + + db_config = { + "dbname": "postgres", + "user": "postgres", + "host": "localhost", + "port": args.pg_port + } + + print("🔍 Checking PostgreSQL status...") + check_postgres_running(args.pg_bin, args.pg_port) + + print("Creating partial log entry...") + partial_logfile, ts_str = create_partial_csvlog(args.csv_log) + + print("Waiting for pg_statsinfo to detect parsing warning...") + for _ in range(12): + if check_pg_statsinfo_log(args.csv_log): + break + print("Waiting for WARNING in pg_statsinfo.log...") + time.sleep(5) + else: + print("✖️ Parsing warning not detected. Test scenario failed.") + return + + print("Appending remaining part of the line...") + remaining = ts_str[14:] + content = f"{remaining},,,43168,,abcd.1234,3025,,{ts_str},,0,LOG,00000,\"checkpoint starting: time postgres invalid2\",,,,,,,,,\"\",\"checkpointer\",,0" + append_to_logfile(partial_logfile, content) + + query_ts = parse_to_hourly_timestamp(ts_str) + query = f"SELECT count(*) FROM statsrepo.log WHERE timestamp >= '{query_ts}' AND message LIKE '%postgres invalid2%';" + + for attempt in range(1, MAX_RETRIES + 1): + count = execute_query(query, db_config)[0][0] + if count > 0: + print(f"✅ Validation Passed: {count} matching entries found.") + break + else: + print(f"Attempt {attempt}/{MAX_RETRIES} failed. Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + print("✖️ Validation Failed after multiple retries.") + + if check_pg_statsinfo_error(args.csv_log): + print("\u274C [FAILED] Timestamp parse error detected.") + else: + print("\u2705 [PASSED] No timestamp errors found.") + + +if __name__ == "__main__": + main() + From 054bf2ad2611b3dfea2501111a64001aae98037b Mon Sep 17 00:00:00 2001 From: Suresh Dash Date: Thu, 31 Jul 2025 16:00:48 +0900 Subject: [PATCH 2/2] Fix: Handle Partial Log Line Writes in CSV Parsing --- tools/README.md | 1 - tools/reproduce_partial_write.py | 1 - 2 files changed, 2 deletions(-) diff --git a/tools/README.md b/tools/README.md index 47b3670..a6e64c9 100644 --- a/tools/README.md +++ b/tools/README.md @@ -49,6 +49,5 @@ After updating postgresql.conf, restart the PostgreSQL instance for the changes python3 tools/reproduce_partial_write.py \ --pg-bin /path/to/postgres/bin \ --log-dir /path/to/pg_log \ - --sample-log-file ./sample_log_entry.txt \ --pg-port 5432 diff --git a/tools/reproduce_partial_write.py b/tools/reproduce_partial_write.py index 5e234f3..56237f6 100644 --- a/tools/reproduce_partial_write.py +++ b/tools/reproduce_partial_write.py @@ -104,7 +104,6 @@ def check_pg_statsinfo_error(log_dir): def check_postgres_running(pg_bin, pg_port): pg_isready_path = os.path.join(pg_bin, "pg_isready") try: - import pdb;pdb.set_trace() result = subprocess.run([pg_isready_path, "-p", str(pg_port)], check=True) if result.returncode != 0: print("❌ PostgreSQL server does not appear to be running:")