-
Notifications
You must be signed in to change notification settings - Fork 15
Implement listen / notify for Postgres backend #105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Assorted snippets with the conversation with qwenIt may be of use to others to see how I arrived at the present state. Here are some useful snippets of the chat I had with qwen; I've edited them lightly for readability. Define overall approachIn this conversation qwen sketched out the overall approach. 🎯 Goal: Add
|
| Feature | Design |
|---|---|
LISTEN / UNLISTEN |
Connection methods returning Result<Nothing> |
NOTIFY |
Connection method with optional payload |
| Notifications | Retrieved via get_notifications() |
| Safety | Validate channel names, escape payload |
| Style | Matches existing sqlgen PostgreSQL API (sync, functional, no hidden threads) |
Discussions on retrieving results
In this conversation we discussed the pro's and con's of owning the loop. We decided against adding any threading etc within sqlgen as it seems to go against the library design.
✅ Summary of Your Approach
- User owns the loop (e.g., in a CLI tool, daemon, or Asio-compatible I/O loop).
select()(orpoll()/epoll()on Linux) blocks until either:- Data arrives on the PostgreSQL socket (potentially a
NOTIFY), - A timeout occurs,
- The loop is explicitly broken (e.g., by signal or shutdown flag).
- Data arrives on the PostgreSQL socket (potentially a
- After
select()indicates readability, you:- Call
PQconsumeInput()to buffer incoming data, - Drain all available
PGnotify*objects viaPQnotifies().
- Call
This is exactly how libpq documentation recommends handling asynchronous notifications in a single-threaded app.
Explanation of LISTEN / NOTIFYActually it just occurred to me that you may not be entirely familiar with this pattern as it appears to be a postgres specific feature. Let me sketch it out quickly. From the docs [1]:
Given this, the approach we intend to use in our code is as follows, we define triggers on a per table basis like so: -- Trigger function to send notifications on currency changes
CREATE OR REPLACE FUNCTION oresdb.notify_currency_changes()
RETURNS TRIGGER AS $$
DECLARE
notification_payload jsonb;
entity_name text := 'ores.risk.currency';
change_timestamp timestamptz := NOW();
BEGIN
-- Construct the JSON payload
notification_payload := jsonb_build_object(
'entity', entity_name,
'timestamp', to_char(change_timestamp, 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"')
);
-- Notify on the 'ores_currencies' channel
PERFORM pg_notify('ores_currencies', notification_payload::text);
RETURN NULL; -- AFTER triggers can return NULL
END;
$$ LANGUAGE plpgsql;
-- Trigger to fire after INSERT, UPDATE, or DELETE on the currencies table
CREATE OR REPLACE TRIGGER currency_change_notify_trigger
AFTER INSERT OR UPDATE OR DELETE ON oresdb.currencies
FOR EACH ROW EXECUTE FUNCTION oresdb.notify_currency_changes();When an insert is performed, we see it as follows: The JSON object will then be retrieved via the sqlgen machinery, deserialised with rfl and processed by the application. |
Multiple connections requiredImportant point:
|
Exposing the native socketAfter playing with this for a bit, it seems to me that we need to expose the native socket so that client code can wait on it. Qwen analysis on exposing socketRather than embedding platform-specific I/O logic ( This approach: ✅ Matches sqlgen’s design philosophy
✅ Avoids cross-platform I/O complexity in sqlgen
✅ What to exposeAdd one method to // Returns the underlying socket descriptor used by libpq.
// Valid only when connection is active.
// On POSIX: int; on Windows: SOCKET (but libpq returns int for portability).
// Returns -1 if no active connection.
int native_socket() const noexcept {
return PQsocket(conn_);
}
So this is already cross-platform. ✅ User code handles waitingThe user then integrates like this (example with Asio): auto conn = sqlgen::postgres::connect(creds).value();
conn->listen("mychan").value();
asio::io_context io;
asio::posix::stream_descriptor sock(io, conn->native_socket());
sock.async_wait(asio::posix::stream_descriptor::wait_read, [&](auto ec) {
if (!ec) {
// Now safe to call conn->get_notifications()
auto notices = conn->get_notifications();
// process...
}
});
io.run();Or in a CLI loop with ✅ What to remove
✅ Precedent in sqlgensqlgen already follows this pattern:
✅ RecommendationDo this: // In Connection.hpp
int native_socket() const noexcept;Don’t do this:
Instead, document the pattern:
This keeps sqlgen lean, portable, and composable — exactly as designed. Would you like a PR-ready patch for |
Distillation of the work in [1]. Should be sufficient to implement listen/notify support over v0.4.0. [1] getml/sqlgen#105
Accessing the connection from the sessionOK when I finally tried using all of this in anger, I immediately bumped into a problem :-) so I was thinking I could just start a session off of the pool as I do for normal code and just keep that session alive within my Postgres listener. However, from the session I don't seem to be able to access the connection with the present session interface. Not quite sure what the right thing is here:
For the latter, we would need: std::list<Notification> get_notifications() noexcept;
rfl::Result<Nothing> listen(const std::string& channel) noexcept;
rfl::Result<Nothing> unlisten(const std:: string& channel) noexcept;
rfl::Result<Nothing> notify(const std::string& channel, const std::string& payload = "") noexcept;
int native_socket() const noexcept;I think I will start by just forwarding these calls from the session to the connection since that seems to be in keeping with the current approach whilst I wait for any commentary from you guys. |
Accessing the connection from the session
Actually this is not even that sensible because the notification methods are Postgres specific. I am just going to do the quick thing and expose a getter for the connection for now and perhaps you guys can help me with the right solution for this at some point later on. |
Storing session as a long lived connectionSecond problem: storing the session as a member variable in a listener class does not seem like the greatest of ideas. I mean, to be fair, you already say this in your docs [1]:
So this is making me believe that we should not really use the session interface for this use case:
I'll try to put as much of this stuff in client code to avoid polluting sqlgen with it, get it to work end to end, and then present my findings so we can brainstorm as to what belongs where. [1] Connection Pool |
As per PR 105 [1]. [1] getml/sqlgen#105
As per PR 105 [1]. [1] getml/sqlgen#105
b5d0482 to
84e5f3b
Compare
Ready for review - Final thoughtsI have now removed all the unnecessary bits and added documentation. Let me know what you think. Build is still red due to some intermittent issues. I will try to force a rebuild tomorrow, hopefully it will go green. |
68de425 to
74db215
Compare
Hi sqlgen developers,
As per discussion in [1], I thought it may be easier to discuss this topic with some kind of experimental evidence. I had a quick go at this using qwen and came with with something which, at first glance, is not entirely unusable :-) As with OFFSET, I will iterate on it until it all goes green etc - but if you have any comments as I go along, or if you disagree with the approach entirely do let me know.
Cheers
Marco
[1] Using sqlgen with LISTEN/NOTIFY #104