Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions docs/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,94 @@ const auto minors = query(conn);
- Error handling through `Result<T>`
- Resource management through `Ref<T>`
- Customizable connection parameters (host, port, database name, etc.)
- LISTEN/NOTIFY for real-time event notifications

## LISTEN/NOTIFY

PostgreSQL provides a simple publish-subscribe mechanism through `LISTEN` and `NOTIFY` commands. This allows database clients to receive real-time notifications when events occur, without polling. Any client can send a notification to a channel, and all clients listening on that channel will receive it asynchronously.

> **Note:** You should use a dedicated connection for LISTEN/NOTIFY, separate from your main database activity and outside any connection pool. This is because the listening connection must remain open and persistent to receive notifications, and connection pools typically recycle connections which would lose the LISTEN state.

### API

The `Connection` class provides the following methods for listen/notify:

```cpp
// Subscribe to a notification channel
rfl::Result<Nothing> listen(const std::string& channel) noexcept;

// Unsubscribe from a notification channel
rfl::Result<Nothing> unlisten(const std::string& channel) noexcept;

// Send a notification to a channel with an optional payload
rfl::Result<Nothing> notify(const std::string& channel,
const std::string& payload = "") noexcept;

// Consume input from the server (must be called before get_notifications)
bool consume_input() noexcept;

// Retrieve all pending notifications
std::list<Notification> get_notifications() noexcept;
```

The `Notification` struct contains:

```cpp
struct Notification {
std::string channel; // The channel name
std::string payload; // The notification payload (may be empty)
int backend_pid; // The PID of the notifying backend
};
```

### Subscribing to Channels

```cpp
auto conn = sqlgen::postgres::connect(creds);
if (!conn) {
// Handle error...
return;
}

// Subscribe to a channel
auto result = (*conn)->listen("my_channel");
if (!result) {
// Handle error...
}
```

### Receiving Notifications

To receive notifications, you must periodically call `consume_input()` to read data from the server, then `get_notifications()` to retrieve any pending notifications:

```cpp
while (running) {
// Consume any available input from the server
if (!(*conn)->consume_input()) {
// Connection error
break;
}

// Process any pending notifications
auto notifications = (*conn)->get_notifications();
for (const auto& notification : notifications) {
// Handle the notification
std::cout << "Channel: " << notification.channel
<< " Payload: " << notification.payload << std::endl;
}

// Sleep briefly before checking again
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
```

### Sending Notifications

```cpp
// Send a notification with a payload
auto result = (*conn)->notify("my_channel", "event data here");
if (!result) {
// Handle error...
}
```

25 changes: 25 additions & 0 deletions include/sqlgen/postgres/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <stdexcept>
#include <string>
#include <vector>
#include <list>

#include "../Iterator.hpp"
#include "../Ref.hpp"
Expand All @@ -35,6 +36,18 @@

namespace sqlgen::postgres {

enum class NotificationWaitResult {
Ready, // Data available (possibly a NOTIFY)
Timeout, // Timeout elapsed
Error // I/O or connection error
};

struct Notification {
std::string channel;
std::string payload;
int backend_pid;
};

class SQLGEN_API Connection {
using Conn = PostgresV2Connection;

Expand Down Expand Up @@ -86,6 +99,16 @@ class SQLGEN_API Connection {
[&](const auto& _data) { return write_impl(_data); }, _begin, _end);
}

std::list<Notification> get_notifications() noexcept;

rfl::Result<Nothing> listen(const std::string& channel) noexcept;

rfl::Result<Nothing> unlisten(const std:: string& channel) noexcept;

rfl::Result<Nothing> notify(const std::string& channel, const std::string& payload = "") noexcept;

bool consume_input() noexcept;

private:
Result<Nothing> insert_impl(
const dynamic::Insert& _stmt,
Expand All @@ -101,6 +124,8 @@ class SQLGEN_API Connection {
Result<Nothing> write_impl(
const std::vector<std::vector<std::optional<std::string>>>& _data);

bool is_valid_channel_name(const std::string& s) const noexcept;

private:
Conn conn_;
};
Expand Down
74 changes: 73 additions & 1 deletion src/sqlgen/postgres/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,69 @@ Result<Nothing> Connection::end_write() {
return Nothing{};
}

std::list<Notification> Connection::get_notifications() noexcept {
std::list<Notification> notices;

// Safe to call even if no data — just returns true
if (!PQconsumeInput(conn_.ptr())) {
// Note: In pure wait/consume pattern, this should rarely happen if socket is healthy
// But we don't error here — just skip
return notices;
}

PGnotify* notify;
while ((notify = PQnotifies(conn_.ptr())) != nullptr) {
notices.push_back({
.channel = std::string(notify->relname),
.payload = notify->extra[0] ? std::string(notify->extra) : "",
.backend_pid = notify->be_pid
});
PQfreemem(notify);
}

return notices;
}

rfl::Result<Nothing> Connection::listen(const std::string& channel) noexcept {
if (!is_valid_channel_name(channel)) {
return error("Invalid channel name: must be a PostgreSQL identifier");
}
const std::string sql = "LISTEN " + channel;
return execute(sql);
}

rfl::Result<Nothing> Connection::unlisten(const std::string& channel) noexcept {
if (channel == "*") {
return execute("UNLISTEN *");
}
if (!is_valid_channel_name(channel)) {
return error("Invalid channel name");
}
const std::string sql = "UNLISTEN " + channel;
return execute(sql);
}

rfl::Result<Nothing> Connection::notify(const std::string& channel, const std::string& payload) noexcept {
if (!is_valid_channel_name(channel)) {
return error("Invalid channel name");
}

auto* escaped_payload = PQescapeLiteral(conn_.ptr(), payload.c_str(), payload.size());
if (!escaped_payload) {
return error("Failed to escape NOTIFY payload");
}
const std::string sql = "NOTIFY " + channel + ", " + std::string(escaped_payload);
PQfreemem(escaped_payload);

auto result = execute(sql);
PQflush(conn_.ptr());
return result;
}

bool Connection::consume_input() noexcept {
return PQconsumeInput(conn_.ptr()) == 1;
}

Result<Nothing> Connection::insert_impl(
const dynamic::Insert& _stmt,
const std::vector<std::vector<std::optional<std::string>>>&
Expand Down Expand Up @@ -160,5 +223,14 @@ Result<Nothing> Connection::write_impl(
return Nothing{};
}

} // namespace sqlgen::postgres
bool Connection::is_valid_channel_name(const std::string& s) const noexcept {
if (s.empty()) return false;
const char first = s[0];
if (first != '_' && !std::isalpha(static_cast<unsigned char>(first)))
return false;
return std::all_of(s.begin() + 1, s.end(), [](char c) {
return c == '_' || std::isalnum(static_cast<unsigned char>(c));
});
}

} // namespace sqlgen::postgres
Loading
Loading