19 changes: 18 additions & 1 deletion src/spock_functions.c
@@ -3002,8 +3002,16 @@ get_channel_stats(PG_FUNCTION_ARGS)
values[i++] = ObjectIdGetDatum(entry->key.subid);
values[i++] = ObjectIdGetDatum(entry->key.relid);

/*
* Acquire spinlock before reading counter values to prevent torn
* reads. The writer (handle_stats_counter) uses entry->mutex to
* protect counter updates, so we must use the same lock for reads
* to ensure atomic access to 64-bit counter values.
*/
SpinLockAcquire(&entry->mutex);
for (j = 0; j < SPOCK_STATS_NUM_COUNTERS; j++)
values[i++] = Int64GetDatum(entry->counter[j]);
SpinLockRelease(&entry->mutex);

tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
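For context, a minimal sketch of the writer side that the comment above refers to; the counter-update code in handle_stats_counter() is not part of this diff, so the exact update shape below is an assumption:

/* Assumed shape of the update in handle_stats_counter(); sketch only. */
SpinLockAcquire(&entry->mutex);
entry->counter[typ] += ntup;
SpinLockRelease(&entry->mutex);

Keeping both sides on entry->mutex matters because PostgreSQL spinlocks provide no deadlock detection and must only guard short, straight-line sections; a handful of 64-bit increments or reads satisfies that.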
@@ -3031,10 +3039,19 @@ reset_channel_stats(PG_FUNCTION_ARGS)

LWLockAcquire(SpockCtx->lock, LW_EXCLUSIVE);

/*
* In principle we could reset only specific channel statistics; but that
* would be more complicated, and it's probably not worth the trouble.
* So for now, just reset all entries.
*/
hash_seq_init(&hash_seq, SpockHash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
hash_search(SpockHash, &entry->key, HASH_REMOVE, NULL);
if (hash_search(SpockHash,
&entry->key,
HASH_REMOVE,
NULL) == NULL)
elog(ERROR, "hash table corrupted");
}

LWLockRelease(SpockCtx->lock);
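The loop above relies on a documented dynahash guarantee: the entry most recently returned by hash_seq_search() may be removed while the scan is in progress, but removing any other entry mid-scan is not allowed. A generic sketch of that contract, with placeholder names (some_hash is any dynahash table whose key is the first field of the entry):

/*
 * Drain an entire dynahash table. Safe because only the entry just
 * returned by hash_seq_search() is removed on each iteration; since the
 * key must be the first field of the entry, the entry pointer doubles
 * as the key pointer.
 */
HASH_SEQ_STATUS scan;
void	   *cur;

hash_seq_init(&scan, some_hash);
while ((cur = hash_seq_search(&scan)) != NULL)
{
	if (hash_search(some_hash, cur, HASH_REMOVE, NULL) == NULL)
		elog(ERROR, "hash table corrupted");
}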
30 changes: 27 additions & 3 deletions src/spock_group.c
@@ -215,8 +215,20 @@ spock_group_attach(Oid dbid, Oid node_id, Oid remote_node_id)
SpockGroupEntry *entry;
bool found;

entry = (SpockGroupEntry *) hash_search(SpockGroupHash, &key,
HASH_ENTER, &found);
entry = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, HASH_ENTER, &found);

/*
* HASH_FIXED_SIZE hash tables can return NULL when full. Check for this
* to prevent dereferencing a NULL pointer.
*/
if (entry == NULL)
{
elog(ERROR, "SpockGroupHash is full, cannot attach to group "
"(dbid=%u, node_id=%u, remote_node_id=%u)",
dbid, node_id, remote_node_id);
return NULL;
}

if (!found)
{
/*
@@ -357,7 +369,19 @@ spock_group_progress_update(const SpockApplyProgress *sap)
entry = (SpockGroupEntry *) hash_search(SpockGroupHash, &sap->key,
HASH_ENTER, &found);

if (!found)
/*
* HASH_FIXED_SIZE hash tables can return NULL when full. Check for this
* to prevent dereferencing a NULL pointer.
*/
if (entry == NULL)
{
elog(WARNING, "SpockGroupHash is full, cannot update progress for group "
"(dbid=%u, node_id=%u, remote_node_id=%u)",
sap->key.dbid, sap->key.node_id, sap->key.remote_node_id);
return false;
}

if (!found) /* New Entry */
{
/*
* New entry: the hash table already copied sap->key into
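For background on the checks above: HASH_FIXED_SIZE caps a shared-memory hash table at its preallocated size, so once every entry is in use, further inserts cannot grow the table. A hypothetical sketch of how such a table is typically created at shared-memory initialization time; the real SpockGroupHash setup is not part of this diff, and the key/entry type names and max_groups sizing below are assumptions:

/* Sketch only: creating a fixed-size hash table in shared memory. */
HASHCTL		ctl;

memset(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(SpockGroupKey);	/* assumed key struct */
ctl.entrysize = sizeof(SpockGroupEntry);
SpockGroupHash = ShmemInitHash("spock group hash",	/* assumed name */
							   max_groups, max_groups,
							   &ctl,
							   HASH_ELEM | HASH_BLOBS | HASH_FIXED_SIZE);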
70 changes: 45 additions & 25 deletions src/spock_output_plugin.c
@@ -1488,8 +1488,6 @@ relmetacache_init(MemoryContext decoding_context)

if (RelMetaCache == NULL)
{
MemoryContext old_ctxt;

RelMetaCacheContext = AllocSetContextCreate(TopMemoryContext,
"spock output relmetacache",
ALLOCSET_DEFAULT_SIZES);
@@ -1502,11 +1500,9 @@
ctl.entrysize = sizeof(struct SPKRelMetaCacheEntry);
ctl.hcxt = RelMetaCacheContext;
hash_flags |= HASH_BLOBS;
old_ctxt = MemoryContextSwitchTo(RelMetaCacheContext);
RelMetaCache = hash_create("spock relation metadata cache",
RELMETACACHE_INITIAL_SIZE,
&ctl, hash_flags);
(void) MemoryContextSwitchTo(old_ctxt);

Assert(RelMetaCache != NULL);
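The MemoryContextSwitchTo() calls removed in relmetacache_init() and relmetacache_get_relation() are safe to drop because dynahash never allocates from CurrentMemoryContext: with HASH_CONTEXT set, the table and its entries are allocated from ctl.hcxt, and otherwise from TopMemoryContext. A condensed sketch of the resulting setup, assuming hash_flags already includes HASH_ELEM and HASH_CONTEXT (the lines that build hash_flags and set ctl.keysize are outside this hunk):

/* Sketch only: the cache lives entirely inside RelMetaCacheContext. */
memset(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(Oid);		/* keyed by relation OID */
ctl.entrysize = sizeof(struct SPKRelMetaCacheEntry);
ctl.hcxt = RelMetaCacheContext;
RelMetaCache = hash_create("spock relation metadata cache",
						   RELMETACACHE_INITIAL_SIZE,
						   &ctl,
						   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);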

@@ -1530,14 +1526,11 @@ relmetacache_get_relation(struct SpockOutputData *data,
{
struct SPKRelMetaCacheEntry *hentry;
bool found;
MemoryContext old_mctx;

/* Find cached function info, creating if not found */
old_mctx = MemoryContextSwitchTo(RelMetaCacheContext);
hentry = (struct SPKRelMetaCacheEntry *) hash_search(RelMetaCache,
(void *) (&RelationGetRelid(rel)),
HASH_ENTER, &found);
(void) MemoryContextSwitchTo(old_mctx);

/* If not found or not valid, it can't be cached. */
if (!found || !hentry->is_valid)
@@ -1567,17 +1560,22 @@ relmetacache_flush(void)
HASH_SEQ_STATUS status;
struct SPKRelMetaCacheEntry *hentry;

if (RelMetaCache != NULL)
{
hash_seq_init(&status, RelMetaCache);
if (RelMetaCache == NULL)
return;

while ((hentry = (struct SPKRelMetaCacheEntry *) hash_seq_search(&status)) != NULL)
{
if (hash_search(RelMetaCache,
(void *) &hentry->relid,
HASH_REMOVE, NULL) == NULL)
elog(ERROR, "hash table corrupted");
}
/*
* In principle we could flush only cache entries relating to specific
* relations; but that would be more complicated, and it's probably not
* worth the trouble. So for now, just flush all entries.
*/
hash_seq_init(&status, RelMetaCache);
while ((hentry = (struct SPKRelMetaCacheEntry *) hash_seq_search(&status)) != NULL)
{
if (hash_search(RelMetaCache,
(void *) &hentry->relid,
HASH_REMOVE,
NULL) == NULL)
elog(ERROR, "hash table corrupted");
}
}

@@ -1592,27 +1590,49 @@ relmetacache_prune(void)
{
HASH_SEQ_STATUS status;
struct SPKRelMetaCacheEntry *hentry;
Oid *relids_to_remove;
int num_entries;
int i;
int idx;

/*
* Since the pruning can be expensive, do it only if ig we invalidated at
* Since the pruning can be expensive, do it only if we invalidated at
* least half of initial cache size.
*/
if (InvalidRelMetaCacheCnt < RELMETACACHE_INITIAL_SIZE / 2)
return;

hash_seq_init(&status, RelMetaCache);
/*
* hash_seq_search() only guarantees that the entry it just returned may
* be removed during the scan; removing any other entry while the scan
* is in progress is undefined. To keep this simple and robust, use a
* two-pass approach: collect the relids of invalid entries first, then
* remove them.
*/
num_entries = hash_get_num_entries(RelMetaCache);
if (num_entries == 0)
return;

relids_to_remove = (Oid *) palloc(num_entries * sizeof(Oid));
idx = 0;

/* First pass: collect invalid entries */
hash_seq_init(&status, RelMetaCache);
while ((hentry = (struct SPKRelMetaCacheEntry *) hash_seq_search(&status)) != NULL)
{
if (!hentry->is_valid)
{
if (hash_search(RelMetaCache,
(void *) &hentry->relid,
HASH_REMOVE, NULL) == NULL)
elog(ERROR, "hash table corrupted");
}
relids_to_remove[idx++] = hentry->relid;
}

/* Second pass: remove entries */
for (i = 0; i < idx; i++)
{
if (hash_search(RelMetaCache,
(void *) &relids_to_remove[i],
HASH_REMOVE, NULL) == NULL)
elog(ERROR, "hash table corrupted");
}

pfree(relids_to_remove);
InvalidRelMetaCacheCnt = 0;
}

12 changes: 12 additions & 0 deletions src/spock_worker.c
@@ -1018,6 +1018,18 @@ handle_stats_counter(Relation relation, Oid subid, spockStatsType typ, int ntup)

entry = (spockStatsEntry *) hash_search(SpockHash, &key,
HASH_ENTER, &found);

/*
* HASH_FIXED_SIZE hash tables can return NULL when full. Check for
* this to prevent dereferencing a NULL pointer.
*/
if (entry == NULL)
{
LWLockRelease(SpockCtx->lock);
elog(WARNING, "SpockHash is full, cannot add stats entry");
spock_stats_hash_full = true;
return;
}
}

if (!found)
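One detail in this hunk is easy to miss: elog(WARNING) returns to the caller, whereas elog(ERROR) longjmps and lets the error machinery release any held LWLocks, so here the lock must be released explicitly before bailing out. A sketch of the same pattern with a hypothetical de-duplication of the warning via the spock_stats_hash_full flag (how that flag is consumed elsewhere is not shown in this diff):

if (entry == NULL)
{
	/* WARNING does not unwind the stack, so drop the lock ourselves. */
	LWLockRelease(SpockCtx->lock);
	if (!spock_stats_hash_full)		/* hypothetical: warn only once */
		elog(WARNING, "SpockHash is full, cannot add stats entry");
	spock_stats_hash_full = true;
	return;
}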