Skip to content

Commit 185ea80

Browse files
committed
Split topic.Receive to topic.ReceiveAll and topic.ReceiveSince.
In the old system, ReceiveAll was implemented by giviing an `0`-id to Receive. This was vague. The new system uses different function to make it more clear.
1 parent 72ed12f commit 185ea80

File tree

5 files changed

+67
-71
lines changed

5 files changed

+67
-71
lines changed

benchmark_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func benchmarkPublishWithXReceivers(count int, b *testing.B) {
1818
var id uint64
1919
var values []string
2020
for {
21-
id, values, _ = top.Receive(ctx, id)
21+
id, values, _ = top.ReceiveSince(ctx, id)
2222
if len(values) == 0 {
2323
return
2424
}
@@ -47,7 +47,7 @@ func benchmarkRetrieveBigTopic(count int, b *testing.B) {
4747
b.ResetTimer()
4848

4949
for b.Loop() {
50-
top.Receive(b.Context(), 0)
50+
top.ReceiveSince(b.Context(), 0)
5151
}
5252
}
5353

@@ -69,7 +69,7 @@ func benchmarkRetrieveLastBigTopic(count int, b *testing.B) {
6969
b.ResetTimer()
7070

7171
for b.Loop() {
72-
top.Receive(ctx, id-1)
72+
top.ReceiveSince(ctx, id-1)
7373
}
7474
}
7575

doc.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ of being pushed.
55
It solves the problem, that you want to publish data to many goroutines. The
66
standard way in go uses channels to push values to the readers. But channels
77
have the problems, that either the goroutine sending the data has to wait for
8-
the reader or has to discard messages,if the reader is too slow. A buffered
8+
the reader or has to discard messages, if the reader is too slow. A buffered
99
channel can help to delay the problem, but eventually the buffer could be full.
1010
1111
The idea of pulling updates is inspired by Kafka or Redis-Streams. A subscriber
@@ -44,30 +44,30 @@ ignored.
4444
4545
# Receive messages
4646
47-
Messages can be received with the Receive()-method:
47+
Messages can be received with the ReceiveAll()- or ReceiveSince()-method:
4848
49-
id, values, err := top.Receive(context.Background(), 0)
49+
id, values := topic.ReceiveAll()
50+
id, values, err := top.ReceiveSince(context.Background(), 42)
5051
51-
The first returned value is the id created by the last Publish()-call. The
52-
second value is a slice of all messages that were published before.
52+
The returned id is the number of values in the topic. It can only increase.
5353
54-
To receive newer values, Receive() can be called again with the id from the last
54+
The returned values are a slice of the published messages.
55+
56+
To receive newer values, ReceiveSince() can be called again with the id from the last
5557
call:
5658
57-
id, values, err := top.Receive(context.Background(), 0)
59+
id, values, err := top.ReceiveAll()
5860
...
59-
id, values, err = top.Receive(context.Background(), id)
61+
id, values, err = top.ReceiveSince(context.Background(), id)
6062
61-
If the given id is zero, then all messages are returned. If the id is greater
62-
than zero, then only messages are returned that were published by the topic
63-
after the id was created.
63+
Only messages, that were published after the given id are returned.
6464
65-
When there are no new values in the topic, then the Receive()-call blocks until
65+
When there are no new values in the topic, then the ReceiveSince()-call blocks until
6666
there are new values. To add a timeout to the call, the context can be used:
6767
6868
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
6969
defer cancel()
70-
id, values, err = top.Receive(ctx, id)
70+
id, values, err = top.ReceiveSince(ctx, id)
7171
7272
If there are no new values before the context is canceled, the topic returns the error
7373
of the context. For example `context.DeadlineExceeded` or `context.Canceled`.
@@ -81,7 +81,7 @@ The usual pattern to subscribe to a topic is:
8181
var values []string
8282
var err error
8383
for {
84-
id, values, err = top.Receive(ctx, id)
84+
id, values, err = top.ReceiveSince(ctx, id)
8585
if err != nil {
8686
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
8787
// Timeout
@@ -92,7 +92,7 @@ The usual pattern to subscribe to a topic is:
9292
// Process values
9393
}
9494
95-
The loop will process all values published by the topic for one minute.
95+
The loop will process all values published to the topic for one minute.
9696
9797
# Get Last ID
9898
@@ -101,18 +101,19 @@ should be processed that were published after the loop starts, the method
101101
LastID() can be used:
102102
103103
id := top.LastID()
104-
id, values, err = top.Receive(context.Background(), id)
104+
id, values, err = top.ReceiveSince(context.Background(), id)
105105
106-
The return value of LastID() is the highest id in the topic. So a Receive() call
107-
on top.LastID() will only return data that was published after the call.
106+
The return value of LastID() is the highest id in the topic. So
107+
a ReceiveSince() call on top.LastID() will only return data that
108+
was published after the call.
108109
109110
A pattern to receive only new data is:
110111
111112
id := top.LastID()
112113
var values []string
113114
var err error
114115
for {
115-
id, values, err = top.Receive(context.Background(), id)
116+
id, values, err = top.ReceiveSince(context.Background(), id)
116117
if err != nil {
117118
// Handle error
118119
}

example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func ExampleTopic() {
2424
var values []string
2525
var err error
2626
for {
27-
id, values, err = top.Receive(ctx, id)
27+
id, values, err = top.ReceiveSince(ctx, id)
2828
if err != nil {
2929
if errors.Is(err, context.Canceled) {
3030
// shutdown was called.

topic.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,28 @@ func (t *Topic[T]) Publish(value ...T) uint64 {
6464
return t.lastID()
6565
}
6666

67-
// Receive returns all values from the topic. If id is 0, all values are
68-
// returned. Otherwise, all values that were inserted after the id are returned.
67+
// ReceiveAll returns all values from the topic, that is not pruned.
68+
//
69+
// For performance reasons, this function returns the internal slice of the
70+
// topic. It is not allowed to manipulate the values.
71+
func (t *Topic[T]) ReceiveAll() (uint64, []T) {
72+
t.mu.RLock()
73+
defer t.mu.RUnlock()
74+
75+
return t.lastID(), t.data
76+
}
77+
78+
// ReceiveSince returns all values from the topic after the given id.
6979
//
7080
// If the id is lower than the lowest id in the topic, an error of type
7181
// UnknownIDError is returned.
7282
//
7383
// If there is no new data, Receive() blocks until there is new data or the
74-
// given channel is done. The same happens with id 0, when there is no data at
75-
// all in the topic.
84+
// given channel is done.
7685
//
7786
// For performance reasons, this function returns the internal slice of the
7887
// topic. It is not allowed to manipulate the values.
79-
func (t *Topic[T]) Receive(ctx context.Context, id uint64) (uint64, []T, error) {
88+
func (t *Topic[T]) ReceiveSince(ctx context.Context, id uint64) (uint64, []T, error) {
8089
t.mu.RLock()
8190
lastIDWhenStarted := t.lastID()
8291

@@ -88,18 +97,14 @@ func (t *Topic[T]) Receive(ctx context.Context, id uint64) (uint64, []T, error)
8897

8998
select {
9099
case <-c:
91-
return t.Receive(ctx, lastIDWhenStarted)
100+
return t.ReceiveSince(ctx, lastIDWhenStarted)
92101
case <-ctx.Done():
93102
return 0, nil, ctx.Err()
94103
}
95104
}
96105

97106
defer t.mu.RUnlock()
98107

99-
if id == 0 {
100-
return t.lastID(), t.data, nil
101-
}
102-
103108
if id < t.offset {
104109
return 0, nil, UnknownIDError{ID: id, FirstID: t.offset + 1}
105110
}

0 commit comments

Comments
 (0)