Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions agent/bin/logger.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ typedef struct Logger
char *csv_name; /* log file name */
long csv_offset; /* parsed bytes in log file */
FILE *fp; /* open log file */
long last_log_size; /* track the last read log file size*/
bool log_incomplete; /* True if the last read_csv() failed (partial write likely) */


/* Text log file */
FILE *textlog;
Expand Down Expand Up @@ -734,9 +737,21 @@ logger_next(Logger *logger, const char *pg_log)
struct stat st;
bool ret;

/*
* Step 1: Check if the file size remains unchanged.
* If logger->last_log_size is the same as the current file size (st.st_size),
* it indicates that no new logs have been written since the last check.
*
* Step 2: Use logger->log_incomplete to determine if this is due to a read_csv() failure.
* If the previous read_csv() attempt failed, we assume a partial write.
* This helps differentiate between:
* - A fully flushed but stagnant log file (safe to continue reading)
* - An incomplete write (where we should wait before retrying)
*/
if (logger->fp == NULL ||
stat(logger->csv_path, &st) != 0 ||
logger->csv_offset >= st.st_size)
logger->csv_offset >= st.st_size ||
(logger->last_log_size == st.st_size && logger->log_incomplete))
{
char csvlog[MAXPGPATH];
char textlog[MAXPGPATH];
Expand All @@ -748,8 +763,10 @@ logger_next(Logger *logger, const char *pg_log)

if (!csvlog[0])
return false; /* logfile not found */
if (logger->fp && strcmp(logger->csv_name, csvlog) == 0)
return false; /* no more logs */

/* If the same file is already being processed and log size is unchanged, wait */
if (logger->fp && strcmp(logger->csv_name, csvlog) == 0 && logger->last_log_size == st.st_size)
return false; /* no more logs */

join_path_components(textlog, pg_log, csvlog);
replace_extension(textlog, ".log");
Expand All @@ -771,6 +788,14 @@ logger_next(Logger *logger, const char *pg_log)
if (!logger_open(logger, csvlog, 0))
return false;

/* Reset tracking variables for the new file */
logger->last_log_size = 0;
logger->log_incomplete = false;

/* Refresh file stats for the new log */
if (stat(logger->csv_path, &st) != 0)
return false; /* Failed to get new log size */

/*
* if writer thread working in fallback mode,
* write in the textlog that agent is working in fallback mode.
Expand All @@ -784,15 +809,27 @@ logger_next(Logger *logger, const char *pg_log)
elog(DEBUG2, "read csvlog \"%s\"", logger->csv_path);
}

/* Read the next log entry */
clearerr(logger->fp);
fseek(logger->fp, logger->csv_offset, SEEK_SET);
ret = read_csv(logger->fp, &logger->buf, CSV_COLS, logger->fields);
logger->csv_offset = ftell(logger->fp);

if (!ret)
/* Update last_log_size AFTER reading, to prevent infinite WARNING loops */
logger->last_log_size = st.st_size;

if (ret)
{
/* Update offset only if read_csv() was successful */
logger->csv_offset = ftell(logger->fp);
logger->log_incomplete = false;
}
else
{
int save_errno = errno;

/* Track partial write if read_csv() fails */
logger->log_incomplete = true;

/* close the file unless EOF; it means an error */
if (!feof(logger->fp))
{
Expand Down
49 changes: 43 additions & 6 deletions agent/bin/logger_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ typedef struct Logger
char *csv_name; /* log file name */
long csv_offset; /* parsed bytes in log file */
FILE *fp; /* open log file */
long last_log_size; /* track the last read log file size*/
bool log_incomplete; /* True if the last read_csv() failed (partial write likely) */

/* temp buffer */
StringInfoData buf; /* log buffer */
Expand Down Expand Up @@ -284,33 +286,68 @@ logger_next(Logger *logger)
struct stat st;
bool ret;

if (logger->fp == NULL ||
/*
* Step 1: Check if the file size remains unchanged.
* If logger->last_log_size is the same as the current file size (st.st_size),
* it indicates that no new logs have been written since the last check.
*
* Step 2: Use logger->log_incomplete to determine if this is due to a read_csv() failure.
* If the previous read_csv() attempt failed, we assume a partial write.
* This helps differentiate between:
* - A fully flushed but stagnant log file (safe to continue reading)
* - An incomplete write (where we should wait before retrying)
*/
if (logger->fp == NULL ||
stat(logger->csv_path, &st) != 0 ||
logger->csv_offset >= st.st_size)
logger->csv_offset >= st.st_size ||
(logger->last_log_size == st.st_size && logger->log_incomplete))
{
char csvlog[MAXPGPATH];

get_csvlog(csvlog, logger->csv_name, my_log_directory);

if (!csvlog[0])
return false; /* logfile not found */
if (logger->fp && strcmp(logger->csv_name, csvlog) == 0)
return false; /* no more logs */

/* If the same file is already being processed and log size is unchanged, wait */
if (logger->fp && strcmp(logger->csv_name, csvlog) == 0 && logger->last_log_size == st.st_size)
return false;

/* Switch to the new log file */
logger_close(logger);
if (!logger_open(logger, csvlog, 0))
return false;

/* Reset tracking variables for the new file */
logger->last_log_size = 0;
logger->log_incomplete = false;

/* Refresh file stats for the new log */
if (stat(logger->csv_path, &st) != 0)
return false; /* Failed to get new log size */
}

/* Read the next log entry */
clearerr(logger->fp);
fseek(logger->fp, logger->csv_offset, SEEK_SET);
ret = read_csv(logger->fp, &logger->buf, CSV_COLS, logger->fields);
logger->csv_offset = ftell(logger->fp);

if (!ret)
/* Update last_log_size AFTER reading, to prevent infinite WARNING loops */
logger->last_log_size = st.st_size;

if (ret)
{
/* Update offset only if read_csv() was successful */
logger->csv_offset = ftell(logger->fp);
logger->log_incomplete = false;
}
else
{
int save_errno = errno;

/* Track partial write if read_csv() fails */
logger->log_incomplete = true;

/* close the file unless EOF; it means an error */
if (!feof(logger->fp))
{
Expand Down
53 changes: 53 additions & 0 deletions tools/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# pg_statsinfo Partial Write Reproducer

This script simulates a **partially written PostgreSQL CSV log entry** to reproduce a known issue in `pg_statsinfo`, where reading an incomplete log line causes parsing errors and incorrect offset tracking.

It helps validate the robustness of log parsing logic in `pg_statsinfo`, especially in scenarios where the PostgreSQL backend log entry is flushed in two parts.

---

## Prerequisites

- PostgreSQL with `pg_statsinfo` installed and configured
- Python 3.6 or higher
- Python packages:
  - `psycopg2`
  - `pytz` (optional, if timestamps are involved)

> ⚠️ If your PostgreSQL installation does **not** embed `RPATH`, make sure to set `LD_LIBRARY_PATH` to include the `lib` directory of PostgreSQL:

```bash
export LD_LIBRARY_PATH=/path/to/postgres/lib:$LD_LIBRARY_PATH
```



## Configuration

Make sure PostgreSQL is running with the following parameters set in `postgresql.conf`:

    logging_collector = on
    log_destination = 'csvlog'
    log_directory = 'log'
    log_filename = 'postgresql-%Y-%m-%d_%H%M%S.csv'
    log_statement = 'all'
    log_min_messages = LOG

    # pg_statsinfo-specific settings
    pg_statsinfo.repolog_buffer = 1     # Set a small buffer size so pg_statsinfo inserts
                                        # parsed logs immediately, making errors easier to
                                        # reproduce promptly
    pg_statsinfo.repolog_interval = 60

------
After updating postgresql.conf, restart the PostgreSQL instance for the changes to take effect.




## Usage

    python3 tools/reproduce_partial_write.py \
        --pg-bin /path/to/postgres/bin \
        --log-dir /path/to/pg_log \
        --pg-port 5432

Loading