diff --git a/agent/bin/logger.c b/agent/bin/logger.c
index dd126f4..927f99d 100644
--- a/agent/bin/logger.c
+++ b/agent/bin/logger.c
@@ -56,6 +56,9 @@ typedef struct Logger
 	char *csv_name; /* log file name */
 	long csv_offset; /* parsed bytes in log file */
 	FILE *fp; /* open log file */
+	long last_log_size; /* log file size seen by the last read attempt */
+	bool log_incomplete; /* true if the last read_csv() failed (partial write likely) */
+
 
 	/* Text log file */
 	FILE *textlog;
@@ -734,9 +737,21 @@ logger_next(Logger *logger, const char *pg_log)
 	struct stat st;
 	bool ret;
 
+	/*
+	 * Look for another log file when the current one cannot be read further:
+	 * - no file is open, or stat() fails,
+	 * - everything up to the current file size has been parsed, or
+	 * - the last read_csv() failed (log_incomplete) and the size is unchanged,
+	 *   i.e. a partially written record has not been completed yet.
+	 *
+	 * After a failed read csv_offset is left unchanged, so the partial record
+	 * is parsed again from its beginning once the file has grown. If the
+	 * lookup returns the file that is already open, we return false and retry.
+	 */
 	if (logger->fp == NULL ||
 		stat(logger->csv_path, &st) != 0 ||
-		logger->csv_offset >= st.st_size)
+		logger->csv_offset >= st.st_size ||
+		(logger->last_log_size == st.st_size && logger->log_incomplete))
 	{
 		char csvlog[MAXPGPATH];
 		char textlog[MAXPGPATH];
@@ -748,8 +763,10 @@ logger_next(Logger *logger, const char *pg_log)
 
 		if (!csvlog[0])
 			return false; /* logfile not found */
+
+		/* Same file again: wait. Do not test st here (unset if stat() failed) nor reopen the file at offset 0. */
 		if (logger->fp && strcmp(logger->csv_name, csvlog) == 0)
 			return false; /* no more logs */
 
 		join_path_components(textlog, pg_log, csvlog);
 		replace_extension(textlog, ".log");
@@ -771,6 +788,14 @@ logger_next(Logger *logger, const char *pg_log)
 		if (!logger_open(logger, csvlog, 0))
 			return false;
 
+		/* Reset tracking variables for the new file */
+		logger->last_log_size = 0;
+		logger->log_incomplete = false;
+
+		/* Refresh file stats for the new log */
+		if (stat(logger->csv_path, &st) != 0)
+			return false; /* Failed to get new log size */
+
 		/*
 		 * if writer thread working in fallback mode,
 		 * write in the textlog that agent is working in fallback mode.
@@ -784,15 +809,27 @@ logger_next(Logger *logger, const char *pg_log)
 		elog(DEBUG2, "read csvlog \"%s\"", logger->csv_path);
 	}
 
+	/* Read the next log entry */
 	clearerr(logger->fp);
 	fseek(logger->fp, logger->csv_offset, SEEK_SET);
 	ret = read_csv(logger->fp, &logger->buf, CSV_COLS, logger->fields);
-	logger->csv_offset = ftell(logger->fp);
 
-	if (!ret)
+	/* Update last_log_size AFTER reading, to prevent infinite WARNING loops */
+	logger->last_log_size = st.st_size;
+
+	if (ret)
+	{
+		/* Update offset only if read_csv() was successful */
+		logger->csv_offset = ftell(logger->fp);
+		logger->log_incomplete = false;
+	}
+	else
 	{
 		int save_errno = errno;
 
+		/* Track partial write if read_csv() fails */
+		logger->log_incomplete = true;
+
 		/* close the file unless EOF; it means an error */
 		if (!feof(logger->fp))
 		{
diff --git a/agent/bin/logger_send.c b/agent/bin/logger_send.c
index d332fc8..236ab7b 100644
--- a/agent/bin/logger_send.c
+++ b/agent/bin/logger_send.c
@@ -29,6 +29,8 @@ typedef struct Logger
 	char *csv_name; /* log file name */
 	long csv_offset; /* parsed bytes in log file */
 	FILE *fp; /* open log file */
+	long last_log_size; /* log file size seen by the last read attempt */
+	bool log_incomplete; /* true if the last read_csv() failed (partial write likely) */
 
 	/* temp buffer */
 	StringInfoData buf; /* log buffer */
@@ -284,9 +286,21 @@ logger_next(Logger *logger)
 	struct stat st;
 	bool ret;
 
+	/*
+	 * Look for another log file when the current one cannot be read further:
+	 * - no file is open, or stat() fails,
+	 * - everything up to the current file size has been parsed, or
+	 * - the last read_csv() failed (log_incomplete) and the size is unchanged,
+	 *   i.e. a partially written record has not been completed yet.
+	 *
+	 * After a failed read csv_offset is left unchanged, so the partial record
+	 * is parsed again from its beginning once the file has grown. If the
+	 * lookup returns the file that is already open, we return false and retry.
+	 */
 	if (logger->fp == NULL ||
 		stat(logger->csv_path, &st) != 0 ||
-		logger->csv_offset >= st.st_size)
+		logger->csv_offset >= st.st_size ||
+		(logger->last_log_size == st.st_size && logger->log_incomplete))
 	{
 		char csvlog[MAXPGPATH];
 
@@ -294,23 +308,46 @@ logger_next(Logger *logger)
 
 		if (!csvlog[0])
 			return false; /* logfile not found */
+
+		/* Same file again: wait. Do not test st here (unset if stat() failed) nor reopen the file at offset 0. */
 		if (logger->fp && strcmp(logger->csv_name, csvlog) == 0)
 			return false; /* no more logs */
 
+		/* Switch to the new log file */
 		logger_close(logger);
 		if (!logger_open(logger, csvlog, 0))
 			return false;
+
+		/* Reset tracking variables for the new file */
+		logger->last_log_size = 0;
+		logger->log_incomplete = false;
+
+		/* Refresh file stats for the new log */
+		if (stat(logger->csv_path, &st) != 0)
+			return false; /* Failed to get new log size */
 	}
 
+	/* Read the next log entry */
 	clearerr(logger->fp);
 	fseek(logger->fp, logger->csv_offset, SEEK_SET);
 	ret = read_csv(logger->fp, &logger->buf, CSV_COLS, logger->fields);
-	logger->csv_offset = ftell(logger->fp);
 
-	if (!ret)
+	/* Update last_log_size AFTER reading, to prevent infinite WARNING loops */
+	logger->last_log_size = st.st_size;
+
+	if (ret)
+	{
+		/* Update offset only if read_csv() was successful */
+		logger->csv_offset = ftell(logger->fp);
+		logger->log_incomplete = false;
+	}
+	else
 	{
 		int save_errno = errno;
 
+		/* Track partial write if read_csv() fails */
+		logger->log_incomplete = true;
+
 		/* close the file unless EOF; it means an error */
 		if (!feof(logger->fp))
 		{
diff --git a/tools/README.md b/tools/README.md
new file mode 100644
index 0000000..a6e64c9
--- /dev/null
+++ b/tools/README.md
@@ -0,0 +1,53 @@
+# pg_statsinfo Partial Write Reproducer
+
+This script simulates a **partially written PostgreSQL CSV log entry** to reproduce a known issue in `pg_statsinfo`, where reading an incomplete log line causes parsing errors and incorrect offset tracking.
+
+It helps validate the robustness of log parsing logic in `pg_statsinfo`, especially in scenarios where the PostgreSQL backend log entry is flushed in two parts.
+
+---
+
+## Prerequisites
+
+- PostgreSQL with `pg_statsinfo` installed and configured
+- Python 3.6 or higher
+- Python packages:
+  - `psycopg2`
+  - `pytz` (optional, if timestamps are involved)
+
+> ⚠️ If your PostgreSQL installation does **not** embed `RPATH`, make sure to set `LD_LIBRARY_PATH` to include the `lib` directory of PostgreSQL:
+
+```bash
+export LD_LIBRARY_PATH=/path/to/postgres/lib:$LD_LIBRARY_PATH
+```
+
+
+## Configuration:
+
+Make sure PostgreSQL is running with the following parameters enabled in postgresql.conf:
+```ini
+logging_collector = on
+log_destination = 'csvlog'
+log_directory = 'log'
+log_filename = 'postgresql-%Y-%m-%d_%H%M%S.csv'
+log_statement = 'all'
+log_min_messages = LOG
+
+# pg_statsinfo-specific settings
+pg_statsinfo.repolog_buffer = 1     # Set a small buffer size so pg_statsinfo inserts
+                                    # parsed logs immediately, making errors easier to
+                                    # reproduce promptly
+pg_statsinfo.repolog_interval = 60
+```
+------
+After updating postgresql.conf, restart the PostgreSQL instance for the changes to take effect.
+
+
+
+
+## Usage:
+```bash
+python3 tools/reproduce_partial_write.py \
+  --pg-bin /path/to/postgres/bin \
+  --log-dir /path/to/pg_log \
+  --pg-port 5432
+```
diff --git a/tools/reproduce_partial_write.py b/tools/reproduce_partial_write.py
new file mode 100644
index 0000000..56237f6
--- /dev/null
+++ b/tools/reproduce_partial_write.py
@@ -0,0 +1,222 @@
+"""Reproduce the pg_statsinfo partial CSV log write issue.
+
+Creates a CSV log file that ends mid-record, waits for pg_statsinfo to warn
+about it, completes the record, and then checks that the record was stored
+in the repository without timestamp parse errors.
+"""
+import argparse
+import os
+import time
+import glob
+import re
+import psycopg2
+import subprocess
+import sys
+from datetime import datetime, timedelta, timezone
+
+try:
+    import pytz
+except ImportError:  # pytz is optional; a fixed UTC+9 offset is used instead
+    pytz = None
+
+MAX_RETRIES = 5
+RETRY_DELAY = 5 # seconds
+
+
+def get_current_timestamp_jst():
+    """Return the current time in Japan as 'YYYY-MM-DD HH:MM:SS JST'."""
+    if pytz is not None:
+        jst = pytz.timezone("Asia/Tokyo")
+    else:
+        jst = timezone(timedelta(hours=9))  # Japan observes no DST
+    return datetime.now(jst).strftime("%Y-%m-%d %H:%M:%S JST")
+
+
+def parse_to_hourly_timestamp(timestamp_str):
+    """Truncate a 'YYYY-MM-DD HH:MM:SS JST' string to the start of its hour."""
+    timestamp = datetime.strptime(timestamp_str.replace(" JST", ""), "%Y-%m-%d %H:%M:%S")
+    rounded_time = timestamp.replace(minute=0, second=0, microsecond=0)
+    return rounded_time.strftime("%Y-%m-%d %H:%M:%S")
+
+
+def get_latest_logfile(log_dir):
+    """Return the lexicographically last postgresql-*.csv in log_dir, or None."""
+    log_files = sorted(glob.glob(os.path.join(log_dir, "postgresql-*.csv")), reverse=True)
+    return log_files[0] if log_files else None
+
+
+def generate_new_logfile_name(latest_logfile):
+    """Return latest_logfile with its 6-digit suffix incremented, or None if the name does not match."""
+    match = re.search(r"(postgresql-\d{4}-\d{2}-\d{2}_)(\d{6})", latest_logfile)
+    if match:
+        number = match.group(2)
+        new_number = f"{int(number) + 1:06d}"
+        return latest_logfile.replace(f"_{number}.csv", f"_{new_number}.csv")
+    return None
+
+
+def create_partial_csvlog(log_dir):
+    """Create the next CSV log file containing only the first 14 characters of a timestamp.
+
+    Returns (path of the new file, full timestamp string).
+    """
+    latest_logfile = get_latest_logfile(log_dir)
+    if not latest_logfile:
+        raise RuntimeError("No CSV log files found.")
+
+    new_logfile = generate_new_logfile_name(latest_logfile)
+    if not new_logfile:
+        raise RuntimeError("Failed to generate new log file name.")
+
+    full_timestamp = get_current_timestamp_jst()
+    partial_timestamp = full_timestamp[:14] # Partial write: YYYY-MM-DD HH:
+
+    with open(new_logfile, "w") as f:
+        f.write(partial_timestamp)
+
+    print(f"Partial CSV log created: {new_logfile}")
+    return new_logfile, full_timestamp
+
+
+def append_to_logfile(logfile, content):
+    """Wait 5 seconds, then append content plus a newline to logfile."""
+    time.sleep(5)
+    with open(logfile, "a") as f:
+        f.write(content + "\n")
+    print(f"✅ Appended to {logfile}:{content}")
+
+
+def check_pg_statsinfo_log(log_dir, min_count=2):
+    """Return True if the last 50 lines of pg_statsinfo.log hold at least min_count parse warnings."""
+    log_path = os.path.join(log_dir, "pg_statsinfo.log")
+    if not os.path.exists(log_path):
+        return False
+
+    with open(log_path, "r", errors="replace") as f:
+        lines = f.readlines()
+
+    count = sum(1 for line in lines[-50:] if "WARNING: pg_statsinfo: cannot parse csvlog column" in line)
+    return count >= min_count
+
+
+def execute_query(query, db_config):
+    """Run query on a new connection and return all rows; the connection is always closed."""
+    conn = psycopg2.connect(**db_config)
+    try:
+        with conn.cursor() as cur:
+            cur.execute(query)
+            result = cur.fetchall()
+        conn.commit()
+        return result
+    finally:
+        conn.close()
+
+
+def check_pg_statsinfo_error(log_dir):
+    """Return True if any regular file directly under log_dir contains a timestamp parse error."""
+    error_strings = [
+        "ERROR: invalid input syntax for type timestamp with time zone",
+        "ERROR: pg_statsinfo: query failed: ERROR: date/time "
+    ]
+    count = 0
+    for file in os.listdir(log_dir):
+        path = os.path.join(log_dir, file)
+        if os.path.isfile(path):
+            # errors="replace": log files may contain bytes that are not valid text
+            with open(path, "r", errors="replace") as f:
+                count += sum(1 for line in f if any(err in line for err in error_strings))
+    return count > 0
+
+
+def check_postgres_running(pg_bin, pg_port):
+    """Exit with status 1 unless pg_isready reports that the server accepts connections."""
+    pg_isready_path = os.path.join(pg_bin, "pg_isready")
+    try:
+        # No check=True: a non-zero status is reported below together with the tool's output.
+        result = subprocess.run(
+            [pg_isready_path, "-p", str(pg_port)],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            universal_newlines=True,
+        )
+    except FileNotFoundError:
+        # Must precede the generic handler, otherwise it is never reached.
+        print("⚠️ 'pg_isready' not found. Please ensure PostgreSQL tools are installed and in PATH.")
+        sys.exit(1)
+    except Exception as e:
+        print(f"❌ Unexpected error while checking PostgreSQL status: {e}")
+        sys.exit(1)
+
+    if result.returncode != 0:
+        print("❌ PostgreSQL server does not appear to be running:")
+        print((result.stdout + result.stderr).strip())
+        sys.exit(1)
+    print("✅ PostgreSQL server is running.")
+
+
+def main():
+    """Run the reproduction scenario; return 0 on success and 1 on failure."""
+    parser = argparse.ArgumentParser(description="Reproduce partial write parsing issue in pg_statsinfo")
+    # --data-dir is never read; it stays accepted (but optional) for compatibility.
+    parser.add_argument("--data-dir", help="Path to PostgreSQL data directory (unused)")
+    parser.add_argument("--pg-bin", required=True, help="Path to PostgreSQL binaries")
+    # --log-dir is the spelling used in the README; both names set args.csv_log.
+    parser.add_argument("--csv-log", "--log-dir", dest="csv_log", required=True, help="Path to CSV log directory")
+    parser.add_argument("--pg-port", default="5432", help="PostgreSQL port")
+    args = parser.parse_args()
+
+    db_config = {
+        "dbname": "postgres",
+        "user": "postgres",
+        "host": "localhost",
+        "port": args.pg_port
+    }
+
+    print("🔍 Checking PostgreSQL status...")
+    check_postgres_running(args.pg_bin, args.pg_port)
+
+    print("Creating partial log entry...")
+    partial_logfile, ts_str = create_partial_csvlog(args.csv_log)
+
+    print("Waiting for pg_statsinfo to detect parsing warning...")
+    for _ in range(12):
+        if check_pg_statsinfo_log(args.csv_log):
+            break
+        print("Waiting for WARNING in pg_statsinfo.log...")
+        time.sleep(5)
+    else:
+        print("✖️ Parsing warning not detected. Test scenario failed.")
+        return 1
+
+    print("Appending remaining part of the line...")
+    remaining = ts_str[14:]
+    content = f"{remaining},,,43168,,abcd.1234,3025,,{ts_str},,0,LOG,00000,\"checkpoint starting: time postgres invalid2\",,,,,,,,,\"\",\"checkpointer\",,0"
+    append_to_logfile(partial_logfile, content)
+
+    query_ts = parse_to_hourly_timestamp(ts_str)
+    query = f"SELECT count(*) FROM statsrepo.log WHERE timestamp >= '{query_ts}' AND message LIKE '%postgres invalid2%';"
+
+    validated = False
+    for attempt in range(1, MAX_RETRIES + 1):
+        count = execute_query(query, db_config)[0][0]
+        if count > 0:
+            print(f"✅ Validation Passed: {count} matching entries found.")
+            validated = True
+            break
+        else:
+            print(f"Attempt {attempt}/{MAX_RETRIES} failed. Retrying in {RETRY_DELAY} seconds...")
+            time.sleep(RETRY_DELAY)
+    else:
+        print("✖️ Validation Failed after multiple retries.")
+
+    has_timestamp_error = check_pg_statsinfo_error(args.csv_log)
+    if has_timestamp_error:
+        print("\u274C [FAILED] Timestamp parse error detected.")
+    else:
+        print("\u2705 [PASSED] No timestamp errors found.")
+
+    return 0 if validated and not has_timestamp_error else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())