From 4776b33c921db68dbe501432f83ad7d1b97858ee Mon Sep 17 00:00:00 2001 From: Steve Gilberd Date: Sun, 7 Sep 2025 21:50:52 +1200 Subject: [PATCH 01/10] Add packet replay feature This feature causes rebroadcasting nodes to periodically advertise a zero-hop summary of packets that they have recently rebroadcast. Other nodes can use this information to notice that they have missed packets, and request that the advertising node retransmit them again. This feature is currently enabled by default, pending implementation of the necessary config elements to adjust it remotely. In the meantime, it can be disabled entirely by defining MESHTASTIC_EXCLUDE_REPLAY to a non-zero value when building. All tunables are currently statically defined in ReplayModule.h. In order to minimise overhead on-air, this feature uses a non-protobuf payload: | Offset | Size | Description | |--------|------|------------------------------------------------------------------------| | 0 | 2 | Advert or request type | | 2 | 1 | This message advertises or requests high-priority packets only | | 3 | 1 | (adverts only) This is the first advert since the sender booted | | 4 | 1 | The sender is using an infrastructure rebroadcast role | | 5 | 1 | (adverts only) This is an aggregate of specific prior advertisements | | 6 | 1 | (adverets only) This advertisement contains a list of throttled clients| | 7 | 1 | Reserved for future use | | 8 | 5 | The base sequence number to which this advertisement or request refers | | 13 | 1 | Reserved for future use | | 14 | 1 | Reserved for future use | | 15 | 1 | Reserved for future use | Advertisements consist of the standard 2-byte header, followed by: - A 2-byte bitmap, indicating which of the 16 possible cache ranges are present - For each range indicated in the bitmap: - A 2-byte bitmap, indicating which of the packets in this range are referenced - A 2-byte priority bitmap, indicating which packets in this range are high priority - For each included packet, a 2-byte hash: ((from ^ id) >> 16 & 0xFFFF) ^ ((from ^ id) & 0xFFFF) - If the 'aggregate' flag is set, a 2-byte bitmap indicating which other adverts are included in this one - If the 'throttled' flag is set, a list of truncated-to-one-byte node IDs that have been throttled A typical advertisement will have a payload size of 8 bytes plus 2 bytes per included packet. If packets are requested, but no longer cached, then the sender will send a state advertisement indicating which of its advertised packets are no longer available. This consists of the standard 2-byte header, followed by: - A 2-byte bitmap, indicating which of the 16 possible cache ranges are present - For each range indicated in the bitmap: - A 2-byte bitmap, indicating which of the packets in this range are no longer available Requests consist of the standard 2-byte header, followed by: - A 2-byte bitmap, indicating which of the 16 possible cache ranges are being requested - For each range indicated in the bitmap: - A 2-byte bitmap, indicating which of the packets in this range are being requested --- src/mesh/FloodingRouter.cpp | 5 + src/mesh/MemoryPool.h | 46 ++ src/mesh/RadioInterface.cpp | 6 + src/mesh/RadioLibInterface.cpp | 67 +- src/mesh/RadioLibInterface.h | 2 + src/mesh/Router.cpp | 6 +- src/modules/Modules.cpp | 6 + src/modules/ReplayModule.cpp | 1064 ++++++++++++++++++++++++++++++++ src/modules/ReplayModule.h | 218 +++++++ 9 files changed, 1410 insertions(+), 10 deletions(-) create mode 100644 src/modules/ReplayModule.cpp create mode 100644 src/modules/ReplayModule.h diff --git a/src/mesh/FloodingRouter.cpp b/src/mesh/FloodingRouter.cpp index f805055c8c1..1ae5bbdba49 100644 --- a/src/mesh/FloodingRouter.cpp +++ b/src/mesh/FloodingRouter.cpp @@ -66,6 +66,11 @@ bool FloodingRouter::roleAllowsCancelingDupe(const meshtastic_MeshPacket *p) void FloodingRouter::perhapsCancelDupe(const meshtastic_MeshPacket *p) { + if (p->is_replay_cached) { + // This is a replayed packet, so we have already transmitted it before, and any further retransmissions + // are explicitly requested by a replay client and therefore should not be cancelled or delayed. + return; + } if (p->transport_mechanism == meshtastic_MeshPacket_TransportMechanism_TRANSPORT_LORA && roleAllowsCancelingDupe(p)) { // cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater! // But only LoRa packets should be able to trigger this. diff --git a/src/mesh/MemoryPool.h b/src/mesh/MemoryPool.h index eb5ac5109de..aefbeb84c90 100644 --- a/src/mesh/MemoryPool.h +++ b/src/mesh/MemoryPool.h @@ -105,6 +105,29 @@ template class MemoryDynamic : public Allocator } }; +/** + * A version of MemoryDynamic that plays nicely with the replay cache + */ +template class MemoryDynamicReplayAware : public MemoryDynamic +{ + public: + virtual void release(T *p) override + { + if (p->is_replay_cached) + // Don't free packets that are in the replay cache + return; + MemoryDynamic::release(p); + } + + T *allocCopy(const T &src, TickType_t maxWait = portMAX_DELAY) + { + T *p = MemoryDynamic::allocCopy(src, maxWait); + if (p) + p->is_replay_cached = false; + return p; + } +}; + /** * A static memory pool that uses a fixed buffer instead of heap allocation */ @@ -141,6 +164,14 @@ template class MemoryPool : public Allocator } } + T *allocCopy(const T &src, TickType_t maxWait = portMAX_DELAY) + { + T *p = MemoryPool::allocCopy(src, maxWait); + if (p) + p->is_replay_cached = false; + return p; + } + protected: // Alloc some storage from our static pool virtual T *alloc(TickType_t maxWait) override @@ -159,3 +190,18 @@ template class MemoryPool : public Allocator return nullptr; } }; + +/** + * A version of MemoryPool that plays nicely with the replay cache + */ +template class MemoryPoolReplayAware : public MemoryPool +{ + public: + virtual void release(T *p) override + { + if (p->is_replay_cached) + // Don't free packets that are in the replay cache + return; + MemoryPool::release(p); + } +}; diff --git a/src/mesh/RadioInterface.cpp b/src/mesh/RadioInterface.cpp index 71fcf1e7415..3956202c5e7 100644 --- a/src/mesh/RadioInterface.cpp +++ b/src/mesh/RadioInterface.cpp @@ -7,6 +7,9 @@ #include "Router.h" #include "configuration.h" #include "main.h" +#if !MESHTASTIC_EXCLUDE_REPLAY +#include "modules/ReplayModule.h" +#endif #include "sleep.h" #include #include @@ -355,6 +358,9 @@ void printPacket(const char *prefix, const meshtastic_MeshPacket *p) std::string out = DEBUG_PORT.mt_sprintf("%s (id=0x%08x fr=0x%08x to=0x%08x, transport = %u, WantAck=%d, HopLim=%d Ch=0x%x", prefix, p->id, p->from, p->to, p->transport_mechanism, p->want_ack, p->hop_limit, p->channel); +#if !MESHTASTIC_EXCLUDE_REPLAY + out += DEBUG_PORT.mt_sprintf(" hash=0x%04x", REPLAY_HASH(p->from, p->id)); +#endif if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) { auto &s = p->decoded; diff --git a/src/mesh/RadioLibInterface.cpp b/src/mesh/RadioLibInterface.cpp index 0db101ce69f..9d37d321b70 100644 --- a/src/mesh/RadioLibInterface.cpp +++ b/src/mesh/RadioLibInterface.cpp @@ -11,6 +11,10 @@ #include #include +#if !MESHTASTIC_EXCLUDE_REPLAY +#include "modules/ReplayModule.h" +#endif + #if ARCH_PORTDUINO #include "PortduinoGlue.h" #include "meshUtils.h" @@ -260,11 +264,11 @@ void RadioLibInterface::onNotify(uint32_t notification) // If we are not currently in receive mode, then restart the random delay (this can happen if the main thread // has placed the unit into standby) FIXME, how will this work if the chipset is in sleep mode? - if (!txQueue.empty()) { + if (getNextTXPacket()) { if (!canSendImmediately()) { setTransmitDelay(); // currently Rx/Tx-ing: reset random delay } else { - meshtastic_MeshPacket *txp = txQueue.getFront(); + meshtastic_MeshPacket *txp = getNextTXPacket(); assert(txp); long delay_remaining = txp->tx_after ? txp->tx_after - millis() : 0; if (delay_remaining > 0) { @@ -277,7 +281,7 @@ void RadioLibInterface::onNotify(uint32_t notification) } else { // Send any outgoing packets we have ready as fast as possible to keep the time between channel scan and // actual transmission as short as possible - txp = txQueue.dequeue(); + txp = getNextTXPacket(true); assert(txp); bool sent = startSend(txp); if (sent) { @@ -286,6 +290,9 @@ void RadioLibInterface::onNotify(uint32_t notification) airTime->logAirtime(TX_LOG, xmitMsec); } LOG_DEBUG("%d packets remain in the TX queue", txQueue.getMaxLen() - txQueue.getFree()); +#if !MESHTASTIC_EXCLUDE_REPLAY + LOG_DEBUG("%u packets remain in the replay queue", replayModule->queueLength()); +#endif } } } @@ -302,6 +309,10 @@ void RadioLibInterface::setTransmitDelay() { meshtastic_MeshPacket *p = txQueue.getFront(); if (!p) { +#if !MESHTASTIC_EXCLUDE_REPLAY + if (replayModule->queuePeek()) + notify(TRANSMIT_DELAY_COMPLETED, true); +#endif return; // noop if there's nothing in the queue } @@ -313,7 +324,13 @@ void RadioLibInterface::setTransmitDelay() unsigned long add_delay = p->rx_rssi ? getTxDelayMsecWeighted(p) : getTxDelayMsec(); unsigned long now = millis(); p->tx_after = min(max(p->tx_after + add_delay, now + add_delay), now + 2 * getTxDelayMsecWeightedWorst(p->rx_snr)); - notifyLater(p->tx_after - now, TRANSMIT_DELAY_COMPLETED, false); +#if !MESHTASTIC_EXCLUDE_REPLAY + // If the head of the queue is delayed, but there are replay packets waiting, notify TX immediately + if (replayModule->queuePeek()) + notify(TRANSMIT_DELAY_COMPLETED, true); + else +#endif + notifyLater(p->tx_after - now, TRANSMIT_DELAY_COMPLETED, false); } else if (p->rx_snr == 0 && p->rx_rssi == 0) { /* We assume if rx_snr = 0 and rx_rssi = 0, the packet was generated locally. * This assumption is valid because of the offset generated by the radio to account for the noise @@ -380,8 +397,14 @@ void RadioLibInterface::completeSending() if (p) { txGood++; - if (!isFromUs(p)) + if (!isFromUs(p)) { txRelay++; +#if !MESHTASTIC_EXCLUDE_REPLAY + replayModule->adopt(p); // If we relayed it, then we might be asked to replay it later + } else { + replayModule->remember(p); // If we sent it, remember it so we don't ask for someone else to replay it +#endif + } printPacket("Completed sending", p); // We are done sending that packet, release it @@ -471,10 +494,17 @@ void RadioLibInterface::handleReceiveInterrupt() memcpy(mp->encrypted.bytes, radioBuffer.payload, payloadLen); mp->encrypted.size = payloadLen; - printPacket("Lora RX", mp); - airTime->logAirtime(RX_LOG, xmitMsec); +#if !MESHTASTIC_EXCLUDE_REPLAY + if (REPLAY_FAKE_PACKET_LOSS_PERCENT && (rand() % 100 < REPLAY_FAKE_PACKET_LOSS_PERCENT)) { + packetPool.release(mp); + return; // Fake some packet loss to test replay functionality + } + + replayModule->remember(mp); +#endif + deliverToReceiver(mp); } } @@ -531,4 +561,27 @@ bool RadioLibInterface::startSend(meshtastic_MeshPacket *txp) return res == RADIOLIB_ERR_NONE; } +} + +meshtastic_MeshPacket *RadioLibInterface::getNextTXPacket(bool dequeue) +{ + meshtastic_MeshPacket *p = txQueue.getFront(); + if (p && p->tx_after <= millis()) { + if (dequeue) { + p = txQueue.dequeue(); + } + return p; + } else { +#if !MESHTASTIC_EXCLUDE_REPLAY + // If there's nothing ready to send in the main TX queue, see if there's a replay packet pending + p = replayModule->queuePeek(); + if (p) { + if (dequeue) { + replayModule->queuePop(); + } + return p; + } +#endif + return NULL; + } } \ No newline at end of file diff --git a/src/mesh/RadioLibInterface.h b/src/mesh/RadioLibInterface.h index 9f497812f27..a5ad7ccce60 100644 --- a/src/mesh/RadioLibInterface.h +++ b/src/mesh/RadioLibInterface.h @@ -178,6 +178,8 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified meshtastic_QueueStatus getQueueStatus(); + meshtastic_MeshPacket *getNextTXPacket(bool dequeue = false); + protected: uint32_t activeReceiveStart = 0; diff --git a/src/mesh/Router.cpp b/src/mesh/Router.cpp index 09fb079c599..9f308881191 100644 --- a/src/mesh/Router.cpp +++ b/src/mesh/Router.cpp @@ -35,7 +35,7 @@ (MAX_RX_TOPHONE + MAX_RX_FROMRADIO + 2 * MAX_TX_QUEUE + \ 2) // max number of packets which can be in flight (either queued from reception or queued for sending) -static MemoryDynamic dynamicPool; +static MemoryDynamicReplayAware dynamicPool; Allocator &packetPool = dynamicPool; #else // Embedded targets use static memory pools with compile-time constants @@ -43,7 +43,7 @@ Allocator &packetPool = dynamicPool; (MAX_RX_TOPHONE + MAX_RX_FROMRADIO + 2 * MAX_TX_QUEUE + \ 2) // max number of packets which can be in flight (either queued from reception or queued for sending) -static MemoryPool staticPool; +static MemoryPoolReplayAware staticPool; Allocator &packetPool = staticPool; #endif @@ -692,7 +692,7 @@ void Router::handleReceived(meshtastic_MeshPacket *p, RxSource src) meshtastic_PortNum_POSITION_APP, meshtastic_PortNum_NODEINFO_APP, meshtastic_PortNum_ROUTING_APP, meshtastic_PortNum_TELEMETRY_APP, meshtastic_PortNum_ADMIN_APP, meshtastic_PortNum_ALERT_APP, meshtastic_PortNum_KEY_VERIFICATION_APP, meshtastic_PortNum_WAYPOINT_APP, - meshtastic_PortNum_STORE_FORWARD_APP, meshtastic_PortNum_TRACEROUTE_APP)) { + meshtastic_PortNum_STORE_FORWARD_APP, meshtastic_PortNum_TRACEROUTE_APP, meshtastic_PortNum_REPLAY_APP)) { LOG_DEBUG("Ignore packet on non-standard portnum for CORE_PORTNUMS_ONLY"); cancelSending(p->from, p->id); skipHandle = true; diff --git a/src/modules/Modules.cpp b/src/modules/Modules.cpp index bd899749303..7af8f6ba24e 100644 --- a/src/modules/Modules.cpp +++ b/src/modules/Modules.cpp @@ -48,6 +48,9 @@ #if !MESHTASTIC_EXCLUDE_POWERSTRESS #include "modules/PowerStressModule.h" #endif +#if !MESHTASTIC_EXCLUDE_REPLAY +#include "modules/ReplayModule.h" +#endif #include "modules/RoutingModule.h" #include "modules/TextMessageModule.h" #if !MESHTASTIC_EXCLUDE_TRACEROUTE @@ -171,6 +174,9 @@ void setupModules() #endif #if !MESHTASTIC_EXCLUDE_POWERSTRESS new PowerStressModule(); +#endif +#if !MESHTASTIC_EXCLUDE_REPLAY + replayModule = new ReplayModule(); #endif // Example: Put your module here // new ReplyModule(); diff --git a/src/modules/ReplayModule.cpp b/src/modules/ReplayModule.cpp new file mode 100644 index 00000000000..e2392d57fb1 --- /dev/null +++ b/src/modules/ReplayModule.cpp @@ -0,0 +1,1064 @@ +#include "ReplayModule.h" +#include "MeshService.h" +#include "configuration.h" +#include "main.h" +#include "memGet.h" + +/** + * TODO: + * - Which tunables should be configurable? + * - What should the tunable default values be? + * - Scale replay rate based on modem settings + * - Prioritise replay of packets requested by routers + * - Lots of testing (and likely a bunch of bugfixes) + * - Implement a periodic stats packet that includes: + * - Number of adverts sent + * - Number of replays sent + * - Number of replay requests received + * - Number of replays requested + * - Number of adverts received + * - For each server we are tracking: + * - Age of last advert + * - Number of adverts received + * - Number of packets requested from this server + * - Number of packets requested by this server + * - router flag + */ + +ReplayModule *replayModule = NULL; + +/** + * Copy a packet into the replay buffer + */ +ReplayEntry *ReplayBuffer::adopt(meshtastic_MeshPacket *p) +{ + if (p->is_replay_cached || search(p)) + return NULL; // Already cached + + // Free the tail entry before overwriting it + if (next && (next & REPLAY_BUFFER_MASK) == (last & REPLAY_BUFFER_MASK)) { + ReplayEntry *oldEntry = &entries[(last++) & REPLAY_BUFFER_MASK]; + if (oldEntry->p) { + meshtastic_MeshPacket *oldp = oldEntry->p; + oldEntry->p = NULL; + packets.release(oldp); + num_cached--; + } + } + + // Ensure we don't use too much memory + unsigned int cacheHeapPct = getNumCached() * sizeof(meshtastic_MeshPacket) * 100 / memGet.getHeapSize(); + unsigned int freeHeapPct = (memGet.getFreeHeap() * 100) / memGet.getHeapSize(); + if (cacheHeapPct >= REPLAY_HEAP_THRESHOLD_PCT && freeHeapPct < REPLAY_HEAP_FREE_MIN_PCT) { + unsigned int wantPct = REPLAY_HEAP_FREE_TARGET_PCT - freeHeapPct; + if (wantPct > cacheHeapPct - REPLAY_HEAP_RESERVE_PCT) + wantPct = cacheHeapPct - REPLAY_HEAP_RESERVE_PCT; + unsigned int reduceBy = (wantPct * memGet.getHeapSize()) / (sizeof(meshtastic_MeshPacket) * 100); + LOG_DEBUG("Replay: Pruning %u packets from the replay cache to reduce memory pressure", reduceBy); + prune(getNumCached() - reduceBy); + } else if (num_cached >= REPLAY_BUFFER_CACHE_MAX) { + prune(REPLAY_BUFFER_CACHE_MAX - 1); + } + + // Add the new entry + meshtastic_MeshPacket *newPacket = packets.allocCopy(*p); + newPacket->is_replay_cached = true; + ReplayEntry *newEntry = &entries[next++ & REPLAY_BUFFER_MASK]; + *newEntry = {}; + newEntry->hash = REPLAY_HASH(p->from, p->id); + newEntry->p = newPacket; + num_cached++; + LOG_DEBUG("Replay: packets=%u, cached=%u, cache=%lu, heap=%lu, heap_free=%lu, last=%lu, next=%lu", getLength(), + getNumCached(), num_cached * sizeof(meshtastic_MeshPacket), memGet.getHeapSize(), memGet.getFreeHeap(), last, next); + + return newEntry; +} + +/** + * Search for a packet in the replay buffer by its hash + */ +ReplayEntry *ReplayBuffer::search(ReplayHash hash) +{ + if (next == last) + return NULL; // The buffer is empty + for (unsigned int i = next + REPLAY_BUFFER_SIZE - 1; i >= last + REPLAY_BUFFER_SIZE; i--) { + unsigned int idx = i & REPLAY_BUFFER_MASK; + if (entries[idx].hash == hash) + return &entries[idx]; + } + return NULL; +} + +/** + * Search for a packet in the replay buffer by its (from,id) tuple + */ +ReplayEntry *ReplayBuffer::search(NodeNum from, uint32_t id) +{ + if (next == last) + return NULL; // The buffer is empty + for (unsigned int i = next + REPLAY_BUFFER_SIZE - 1; i >= last + REPLAY_BUFFER_SIZE; i--) { + unsigned int idx = i & REPLAY_BUFFER_MASK; + if (!entries[idx].p) + continue; // This entry does not have an associated cached packet + if (entries[idx].p->id == id && entries[idx].p->from == from) + return &entries[idx]; + } + return NULL; +} + +/** + * Search for a packet in the replay buffer by its pointer + * If strict, only an exact pointer match will do. Otherwise matches on (from,id) tuple. + */ +ReplayEntry *ReplayBuffer::search(meshtastic_MeshPacket *p, bool strict) +{ + if (!p) + return NULL; // Invalid search pointer + if (next == last) + return NULL; // The buffer is empty + if (!strict) + return search(p->from, p->id); + for (unsigned int i = next + REPLAY_BUFFER_SIZE - 1; i >= last + REPLAY_BUFFER_SIZE; i--) { + unsigned int idx = i & REPLAY_BUFFER_MASK; + if (entries[idx].p == p) + return &entries[idx]; + } + return NULL; +} + +/** + * Prune the replay buffer to contain no more than 'keep' cached packets + * Hashes are still retained. + */ +void ReplayBuffer::prune(unsigned int keep) +{ + if (getLength() <= keep) + return; // Nothing to do + unsigned int priority[meshtastic_MeshPacket_Priority_MAX + 1] = {}; + // Count the number of packets at each priority level + for (unsigned int i = last; i < next; i++) { + unsigned int idx = i & REPLAY_BUFFER_MASK; + if (entries[idx].p) { + priority[entries[idx].p->priority]++; + } + } + if (num_cached <= keep) + return; // Nothing to do + size_t threshold = 0; + // Find the lowest priority threshold that will release enough packets + for (unsigned int prunable = 0; threshold <= meshtastic_MeshPacket_Priority_MAX;) { + prunable += priority[threshold]; + if (num_cached - prunable <= keep) + break; + threshold++; + } + // Release all packets at or below the priority threshold until we have pruned enough + for (unsigned int i = last; i < next && num_cached > keep; i++) { + unsigned int idx = i & REPLAY_BUFFER_MASK; + if (entries[idx].p && entries[idx].p->priority <= threshold) { + meshtastic_MeshPacket *p = entries[idx].p; + entries[idx].p = NULL; + packets.release(p); + num_cached--; + } + } +} + +/** + * Truncate the replay buffer to contain no more than the most recent 'keep' entries + * Both metadata and cached packets are released + */ +void ReplayBuffer::truncate(unsigned int keep) +{ + while (getLength() > keep) { + ReplayEntry *oldEntry = &entries[last++ & REPLAY_BUFFER_MASK]; + if (oldEntry->p) { + meshtastic_MeshPacket *oldp = oldEntry->p; + oldEntry->p = NULL; + packets.release(oldp); + num_cached--; + } + } +} + +/** + * Add a packet to the replay buffer + */ +void ReplayModule::adopt(meshtastic_MeshPacket *p) +{ + if (p->decoded.portnum == meshtastic_PortNum_REPLAY_APP) + return; // Don't cache replay packets + + ReplayEntry *entry = buffer.adopt(p); + if (!entry) + return; // Already cached + + LOG_DEBUG("Replay: Adopting packet from=0x%08x id=0x%08x priority=%u packets=%u cached=%u cache_bytes=%u", p->from, p->id, + p->priority, buffer.getLength(), buffer.getNumCached(), buffer.getNumCached() * sizeof(meshtastic_MeshPacket)); + unsigned int idx = buffer.getHeadCursor() & REPLAY_BUFFER_MASK; + want_replay.reset(idx); + dirty.set(idx); + if (p->priority >= REPLAY_CHUTIL_PRIORITY) + dirty_prio.set(idx); + packets_since_advert++; + notify(REPLAY_NOTIFY_ADOPT, true); +} + +/** + * Check if we have seen this packet before + */ +bool ReplayModule::isKnown(ReplayHash hash) +{ + for (ReplayCursor i = memory_next + REPLAY_REMEMBER_SIZE; i >= memory_next; i--) { + if (memory[i & REPLAY_REMEMBER_MASK] == hash) { + return true; + } + } + return false; +} + +/** + * Get client metadata entry + */ +ReplayClientInfo *ReplayModule::client(NodeNum id) +{ + ReplayClientInfo *target = clients; + for (unsigned int i = 0; i < REPLAY_CLIENT_SIZE; i++) { + if (clients[i].id == id) { + return &clients[i]; + unsigned int bucket = clients[i].bucket + ((millis() - clients[i].last_request_millis) / REPLAY_CLIENT_RATE_MS); + if (bucket >= REPLAY_CLIENT_BURST && clients[i].last_request_millis < target->last_request_millis) + target = &clients[i]; + } + if (target->id != id) { + *target = {}; + target->id = id; + } + } + return target; +} + +/** + * Send an advertisement of available packets + */ +void ReplayModule::advertise(bool aggregate, unsigned int from_sequence, ReplayMap aggregate_mask) +{ + LOG_INFO("Replay: Triggered advertisement: dirty=%u, dirty_prio=%u, packets_since_advert=%u, seq=%u", dirty.count(), + dirty_prio.count(), packets_since_advert, (next_sequence + 1) & REPLAY_SEQUENCE_MASK); + if (last_advert_cursor < buffer.getTailCursor()) + last_advert_cursor = buffer.getTailCursor(); // Clamp the advertisement cursor to the start of the buffer + if (last_advert_cursor >= buffer.getHeadCursor() && !aggregate) + return; // No new packets since last advertisement & not responding to an aggregate request + ReplayWire wire = {}; + wire.header.type = REPLAY_ADVERT_TYPE_AVAILABLE; + wire.header.priority = (airTime->channelUtilizationPercent() >= REPLAY_CHUTIL_THRESHOLD_PCT); + wire.header.boot = !last_advert_millis; + wire.header.router = IS_ONE_OF(config.device.role, meshtastic_Config_DeviceConfig_Role_ROUTER, + meshtastic_Config_DeviceConfig_Role_ROUTER_LATE); + wire.header.aggregate = aggregate; + + uint8_t throttled_clients[REPLAY_CLIENT_SIZE] = {}; + uint8_t throttled_count = 0; + for (unsigned int i = 0; i < REPLAY_CLIENT_SIZE; i++) { + ReplayClientInfo *client = &clients[i]; + if (!(client->bucket + ((millis() - client->last_request_millis) / REPLAY_CLIENT_RATE_MS))) { + LOG_DEBUG("Replay: Telling client 0x%08x not to make requests", client->id); + throttled_clients[throttled_count++] = client->id & 0x000F; + } + if (throttled_count >= REPLAY_CLIENT_THROTTLE_ADVERT_MAX) + break; + } + if (throttled_count) + wire.header.throttle = 1; + + last_advert_millis = millis(); + std::bitset b = {}; + ReplayMap aggregate_mask_local = 0; + if (aggregate) { + LOG_INFO("Replay: Sending aggregate advertisement from_sequence=%u mask=0x%04x", from_sequence, aggregate_mask); + wire.header.sequence = from_sequence & REPLAY_SEQUENCE_MASK; + b.reset(); + for (unsigned int bit = 0; bit < 15; bit++) { + if (!(aggregate_mask & (1 << bit))) + continue; + ReplayAdvertisement *record = &advertisements[(from_sequence + bit) & REPLAY_SEQUENCE_MASK]; + if (b.count() + record->dirty.count() > REPLAY_ADVERT_MAX_PACKETS) { + LOG_DEBUG("Replay: Requested aggregate exceeds max packets per advert"); + break; // Avoid exceeding the maximum number of packets in a single advert + } + for (unsigned int i = record->tail; i <= record->head && i < buffer.getTailCursor(); i++) { + record->dirty.reset(i & REPLAY_BUFFER_MASK); // Clear expired packets + } + b |= record->dirty; + aggregate_mask_local |= (1 << bit); + } + for (unsigned int i = 0; i < REPLAY_BUFFER_SIZE; i++) { + if (b.test(i)) { + if (!buffer.get(i)->p) + b.reset(i); // Don't advertise pruned entries + else if (wire.header.priority && buffer.get(i)->p->priority < REPLAY_CHUTIL_PRIORITY) + b.reset(i); // Don't advertise non-priority entries + } + } + } else { + wire.header.sequence = next_sequence & REPLAY_SEQUENCE_MASK; + b = wire.header.priority ? dirty_prio : dirty; + ReplayAdvertisement *record = &advertisements[wire.header.sequence]; + record->sequence = next_sequence; + record->head = buffer.getHeadCursor(); + record->tail = buffer.getTailCursor(); + record->dirty = dirty; + } + + uint16_t ranges = 0; + unsigned int packets = 0; + bool again = false; + for (unsigned int i = 0; i < REPLAY_BUFFER_SIZE; i++) { + const unsigned int bit = i / 16; + if (b.test(i) && buffer.get(i)->p) { + ranges |= (1 << bit); + packets++; + if (packets >= REPLAY_ADVERT_MAX_PACKETS) { + again = true; + break; + } + } + } + if (!ranges) + return; // No cached dirty packets at current priority level + + const unsigned int payload_max = + __builtin_popcount(ranges) * 2 + packets + 1 /*ranges*/ + 1 /*header*/ + (aggregate ? 1 : 0) /*aggregate mask*/; + uint16_t payload[payload_max] = {wire.header.bitfield, ranges}; + off_t payload_cursor = 2; + for (unsigned int bit = 0; bit < 16; bit++) { + if (!(ranges & (1 << bit))) + continue; + ReplayMap *packet_map = &payload[payload_cursor++]; + ReplayMap *priority_map = &payload[payload_cursor++]; + for (unsigned int j = 0; j < 16; j++) { + unsigned int idx = bit * 16 + j; + ReplayEntry *entry = buffer.get(idx); + if (!b.test(idx) || !entry->p) + continue; + LOG_DEBUG("Advertising packet hash=0x%04x from=0x%08x id=0x%08x", entry->hash, entry->p->from, entry->p->id); + payload[payload_cursor++] = entry->hash; + *packet_map |= (1 << j); + if (entry->p->priority >= REPLAY_CHUTIL_PRIORITY) + *priority_map |= (1 << j); + dirty.reset(idx); + dirty_prio.reset(idx); + } + } + if (aggregate) + payload[payload_cursor++] = aggregate_mask_local; + else + next_sequence++; + + if (throttled_count) { + uint8_t *payload_throttled = (uint8_t *)&payload[payload_cursor]; + for (unsigned int i = 0; i < throttled_count; i++) { + payload_throttled[i] = throttled_clients[i]; + } + payload_cursor += throttled_count / 2 + (throttled_count & 0x01); + } + + LOG_INFO("Replay: Advertising %u of %u/%u cached packets (chutil=%4.2f%%)", packets, buffer.getNumCached(), + buffer.getLength(), airTime->channelUtilizationPercent()); + meshtastic_MeshPacket *p = allocDataPacket(); + p->to = NODENUM_BROADCAST; + p->priority = meshtastic_MeshPacket_Priority_REPLAY; + p->hop_limit = 0; + p->decoded.payload.size = payload_cursor * sizeof(uint16_t); + memcpy(p->decoded.payload.bytes, payload, p->decoded.payload.size); + + last_advert_cursor = buffer.getHeadCursor(); + if (!aggregate) + packets_since_advert -= packets; + service->sendToMesh(p); + + if (again) { + advertise(); + } +} + +/** + * Send an advertisement of expired packets (i.e. which packets have been pruned from the cache) + */ +void ReplayModule::advertiseExpired() +{ + ReplayWire wire = {}; + wire.header.type = REPLAY_ADVERT_TYPE_EXPIRED; + wire.header.priority = (airTime->channelUtilizationPercent() >= REPLAY_CHUTIL_THRESHOLD_PCT); + wire.header.boot = !last_advert_millis; + wire.header.router = IS_ONE_OF(config.device.role, meshtastic_Config_DeviceConfig_Role_ROUTER, + meshtastic_Config_DeviceConfig_Role_ROUTER_LATE); + + uint16_t payload[18] = {wire.header.bitfield}; + ReplayMap *map = &payload[1]; + ReplayMap *range = &payload[2]; + + for (unsigned int i = 0; i < REPLAY_BUFFER_SIZE; i++) { + const unsigned int bit = i / 16; + if (!(i & 0x04) && *range) + range++; + if (!buffer.get(i)->p) + *range |= (1 << (i & 0x04)); + } + + meshtastic_MeshPacket *p = allocDataPacket(); + p->to = NODENUM_BROADCAST; + p->priority = meshtastic_MeshPacket_Priority_REPLAY; + p->hop_limit = 0; + p->decoded.payload.size = sizeof(uint16_t) * (1 /*header*/ + 1 /*map*/ + __builtin_popcount(*map) /*ranges*/); + memcpy(p->decoded.payload.bytes, &payload, p->decoded.payload.size); + + service->sendToMesh(p); + want_replay_expired = false; +} + +/** + * Send the next pending packet for which a replay has been requested, priority packets first + */ +void ReplayModule::replay() +{ + LOG_WARN("Replay: Triggered replay: from=%u, want_replay=%u, want_replay_prio=%u, want_replay_expired=%u", replay_from, + want_replay.count(), want_replay_prio, want_replay_expired); + + if (!replay_from) + return; // No replay in progress + + if (want_replay_expired && last_expired_millis + REPLAY_EXPIRED_SPACING_SECS * 1000 < millis()) { + advertiseExpired(); + return; + } + + if (!want_replay.any()) { + LOG_DEBUG("Replay: There is nothing left to replay"); + replay_from = 0; // All done + return; + } + + ReplayEntry *to_send = NULL; + ReplayCursor to_send_idx = 0; + for (ReplayCursor i = replay_from + REPLAY_BUFFER_SIZE; !to_send && i >= buffer.getTailCursor() + REPLAY_BUFFER_SIZE; i--) { + // Replay priority packets first + ReplayCursor idx = i & REPLAY_BUFFER_MASK; + if (want_replay.test(idx)) { + ReplayEntry *entry = buffer.get(idx); + if (!entry->p) + want_replay_expired = true; + else if (want_replay_prio && !(entry->p->priority >= REPLAY_CHUTIL_PRIORITY)) { + if (entry->last_replay_millis > last_advert_millis) + continue; // Already replayed this packet since last advert + to_send = entry; + to_send_idx = idx; + } + } + } + if (!to_send && airTime->channelUtilizationPercent() < REPLAY_CHUTIL_THRESHOLD_PCT) { + // No more priority packets to send, so now send non-priority packets if chutil allows + want_replay_prio = false; + for (ReplayCursor i = replay_from + REPLAY_BUFFER_SIZE; !to_send && i >= buffer.getTailCursor() + REPLAY_BUFFER_SIZE; + i--) { + ReplayCursor idx = i & REPLAY_BUFFER_MASK; + if (want_replay.test(idx) && buffer.get(idx)->p) { + ReplayEntry *entry = buffer.get(idx); + if (entry->last_replay_millis > last_advert_millis) + continue; // Already replayed this packet since last advert + to_send = entry; + to_send_idx = idx; + } + } + } + + if (to_send) { + LOG_INFO("Replay: Replaying packet hash=0x%04x from=0x%08x id=0x%08x count=%u", to_send->hash, to_send->p->from, + to_send->p->id, to_send->replay_count + 1); + // router->rawSend(to_send->p); + if (!queuePush(to_send_idx)) { + LOG_WARN("Replay: Unable to queue replay packet hash=0x%04x from=0x%08x id=0x%08x: queue full", to_send->hash, + to_send->p->from, to_send->p->id); + return; + } else { + to_send->last_replay_millis = millis(); + to_send->replay_count++; + want_replay.reset(to_send_idx); + } + } else { + LOG_DEBUG("Triggered replay, but there is nothing to send"); + replay_from = 0; // All done + } +} + +/** + * Request replay of missing packets from a server + */ +void ReplayModule::requestReplay(ReplayServerInfo *server) +{ + std::bitset request = server->missing & server->available; + if (server->flag_priority) + request &= server->priority; + if (!request.any()) + return; // Nothing to request + unsigned long request_millis = millis() + REPLAY_REQUEST_TIMEOUT_SECS * 1000; + ReplayRequestInfo *requests[REPLAY_BUFFER_SIZE] = {}; + for (int i = 0; i < REPLAY_BUFFER_SIZE; i++) { + if (request.test(i)) { + if (isKnown(server->packets[i])) { + server->missing.reset(i); + request.reset(i); + continue; + } + ReplayRequestInfo *r = requestInfo(server->packets[i]); + if (!r) { + LOG_WARN("Replay: Not requesting missing packet 0x%04x from server 0x%08x: too many outstanding requests", + server->packets[i], server->id); + request.reset(i); + } else if (r->timeout_millis >= millis()) { + LOG_DEBUG("Replay: Not requesting missing packet 0x%04x from server 0x%08x: already requested this recently", + server->packets[i], server->id); + request.reset(i); + } else { + requests[i] = r; + r->timeout_millis = request_millis; + } + } + } + if (!request.any()) + return; // Nothing to request + int requested = request.count(); + if (requested > REPLAY_REQUEST_MAX_PACKETS) { + // Limit the number of requested packets to avoid overloading the server + for (int i = 0; i < REPLAY_BUFFER_SIZE && requested > REPLAY_REQUEST_MAX_PACKETS; i++) { + if (request.test(i) && !server->priority.test(i)) { + // Skip non-priority packets first + request.reset(i); + requested--; + requests[i]->timeout_millis = 0; + } + } + for (int i = 0; i < REPLAY_BUFFER_SIZE && requested > REPLAY_REQUEST_MAX_PACKETS; i++) { + if (request.test(i)) { + request.reset(i); + requested--; + requests[i]->timeout_millis = 0; + } + } + } + + ReplayWire wire = {}; + wire.header.type = REPLAY_REQUEST_TYPE_PACKETS; + wire.header.priority = airTime->channelUtilizationPercent() >= REPLAY_CHUTIL_THRESHOLD_PCT; + wire.header.router = IS_ONE_OF(config.device.role, meshtastic_Config_DeviceConfig_Role_ROUTER, + meshtastic_Config_DeviceConfig_Role_ROUTER_LATE); + wire.header.sequence = server->last_sequence; // Echo the server's last sequence number for tracking & future-proofing + + meshtastic_MeshPacket *p = allocDataPacket(); + p->to = server->id; + p->priority = meshtastic_MeshPacket_Priority_REPLAY; + p->hop_limit = 0; + uint16_t *payload = (uint16_t *)p->decoded.payload.bytes; + *payload++ = wire.header.bitfield; + ReplayMap *map = payload++; + for (unsigned int i = 0; i < 16; i++) { + for (unsigned int j = 0; j < 16; j++) { + unsigned int idx = i * 16 + j; + if (request.test(idx)) { + LOG_DEBUG("Replay: Requesting replay of packet hash=0x%04x via=0x%08x", server->packets[idx], server->id); + *map |= (1 << i); + *payload |= (1 << j); + server->replays_requested++; + } + } + if (*map & (1 << i)) + payload++; + } + p->decoded.payload.size = (payload - (uint16_t *)p->decoded.payload.bytes) * sizeof(uint16_t); + + LOG_INFO("Replay: Requesting %u missing packets server=0x%08x prio=%u ranges=%u size=%u", request.count(), server->id, + wire.header.priority, (uint16_t)*map, p->decoded.payload.size); + service->sendToMesh(p); +} + +/** + * Request that a server send an aggregate advertisement covering specific prior adverts that have been missed + */ +void ReplayModule::requestMissingAdvertisements(ReplayServerInfo *server) +{ + if (!server->missing_sequence) + return; // Nothing to request + meshtastic_MeshPacket *p = allocDataPacket(); + p->to = server->id; + p->priority = meshtastic_MeshPacket_Priority_REPLAY; + p->hop_limit = 0; + uint16_t *payload = (uint16_t *)p->decoded.payload.bytes; + + ReplayWire wire = {}; + wire.header.type = REPLAY_REQUEST_TYPE_ADVERTISEMENT; + wire.header.priority = airTime->channelUtilizationPercent() >= REPLAY_CHUTIL_THRESHOLD_PCT; + wire.header.router = IS_ONE_OF(config.device.role, meshtastic_Config_DeviceConfig_Role_ROUTER, + meshtastic_Config_DeviceConfig_Role_ROUTER_LATE); + wire.header.sequence = server->last_sequence; + *payload++ = wire.header.bitfield; + *payload++ = server->missing_sequence & 0xFFFF; + p->decoded.payload.size = (unsigned char *)payload - p->decoded.payload.bytes; + + LOG_INFO("Replay: Requesting missing advertisements from server=0x%08x sequence=%lu missing=%u", server->id, + server->last_sequence, server->missing_sequence); + service->sendToMesh(p); +} + +/** + * Handle all incoming replay protocol packets + */ +ProcessMessage ReplayModule::handleReceived(const meshtastic_MeshPacket &mp) +{ + if (mp.decoded.payload.size < sizeof(uint16_t)) + return ProcessMessage::STOP; // Not enough data for even the header + if (mp.hop_limit != mp.hop_start) { + LOG_DEBUG("Replay: Ignoring indirect packet from=0x%08x hop_limit=%u hop_start=%u", mp.from, mp.hop_limit, mp.hop_start); + return ProcessMessage::STOP; // Replay packets must be from a direct neighbor + } + if (isFromUs(&mp)) + return ProcessMessage::STOP; // Ignore our own packets + + else if (isToUs(&mp)) + handleRequest(&mp); + else + handleAdvertisement(&mp); + + return ProcessMessage::STOP; +} + +/** + * Handle a replay request packet + */ +void ReplayModule::handleRequest(const meshtastic_MeshPacket *p) +{ + uint16_t *payload = (uint16_t *)p->decoded.payload.bytes; + int payload_words = p->decoded.payload.size / sizeof(uint16_t); + ReplayWire *wire = (ReplayWire *)payload; + LOG_INFO("Replay: Received request from=0x%08x size=%u type=%u", p->from, p->decoded.payload.size, wire->header.type); + + ReplayClientInfo *client = this->client(p->from); + client->bucket += (millis() - client->last_request_millis) / REPLAY_CLIENT_RATE_MS; + if (client->bucket > REPLAY_CLIENT_BURST) + client->bucket = REPLAY_CLIENT_BURST; + client->last_request_millis = millis(); + + switch (wire->header.type) { + case REPLAY_REQUEST_TYPE_ADVERTISEMENT: { + if (payload_words < 2) { + LOG_WARN("Replay: Advertisement request payload too small"); + break; + } + uint32_t missing = payload[1]; + if (!missing) { + LOG_WARN("Replay: Advertisement request for zero missing advertisements"); + break; + } + LOG_INFO("Replay: Advertisement request from=0x%08x seq=%u missing=%u", p->from, wire->header.sequence, missing); + advertise(true, wire->header.sequence, missing); + } break; + case REPLAY_REQUEST_TYPE_PACKETS: { + if (payload_words < 3 || payload_words < 1 /*header*/ + 1 /*map*/ + __builtin_popcount(payload[1]) /*ranges*/) { + LOG_WARN("Replay: Packet request payload too small"); + break; + } + ReplayMap map = payload[1]; + ReplayMap *range = &payload[2]; + unsigned int requested = 0; + for (unsigned int i = 0; i < 16; i++) { + if (!(map & (1 << i))) + continue; + for (unsigned int j = 0; j < 16; j++) { + if (*range & (1 << j)) { + if (!client->bucket) + break; + ReplayCursor idx = (i * 16 + j) & REPLAY_BUFFER_MASK; + ReplayEntry *entry = buffer.get(idx); + if (router->findInTxQueue(entry->p->from, entry->p->id)) + continue; // Don't replay packets that are already in our TX queue + if (!wire->header.priority || (entry->p && entry->p->priority >= REPLAY_CHUTIL_PRIORITY)) { + want_replay.set(idx); + requested++; + client->bucket--; + LOG_INFO("Replay: Request for %s packet hash=0x%04x client=0x%08x", entry->p ? "cached" : "expired", + entry->hash, client->id); + if (!entry->p) + want_replay_expired = true; + } + } + } + range++; + } + if (!client->bucket) + LOG_WARN("Replay: Client 0x%08x is being rate limited", client->id); + replay_from = buffer.getHeadCursor(); + LOG_INFO("Replay: Pending replay of %u packets, requested=%u, want_expired=%u", want_replay.count(), requested, + want_replay_expired); + notify(REPLAY_NOTIFY_REPLAY, true); + } break; + default: + LOG_WARN("Replay: Unknown request type %u", wire->header.type); + break; + } +} + +/** + * Handle a replay advertisement packet + */ +void ReplayModule::handleAdvertisement(const meshtastic_MeshPacket *p) +{ + LOG_INFO("Received replay advertisement from=0x%08x id=0x%08x size=%u", p->from, p->id, p->decoded.payload.size); + if (isFromUs(p)) + return; // Ignore our own advertisements + + if (p->decoded.payload.size < sizeof(uint16_t)) { + LOG_WARN("Replay: Advertisement payload too small"); + return; // Not enough data for even the header + } + uint16_t *payload = (uint16_t *)p->decoded.payload.bytes; + ReplayWire *wire = (ReplayWire *)payload++; + int payload_words = p->decoded.payload.size / sizeof(uint16_t) - 1; + ReplayServerInfo _server = {}; + _server.id = p->from; + ReplayServerInfo *server = &_server; + for (unsigned int i = 0; i < REPLAY_TRACK_SERVERS; i++) { + if (servers[i].id == p->from) { + server = &servers[i]; + break; + } + } + server->last_advert_millis = millis(); + server->flag_priority = wire->header.priority; + server->flag_router = wire->header.router; + + if (wire->header.boot) { + // The server has rebooted, so reset its availability state + server->available.reset(); + server->priority.reset(); + server->missing.reset(); + server->last_sequence = 0; + server->missing_sequence = 0; + } + + switch (wire->header.type) { + case REPLAY_ADVERT_TYPE_AVAILABLE: + handleAvailabilityAdvertisement(wire, (unsigned char *)payload, + ((unsigned char *)p->decoded.payload.bytes) + p->decoded.payload.size, server); + break; + case REPLAY_ADVERT_TYPE_EXPIRED: + if (payload_words < 1 || payload_words < 1 /*map*/ + __builtin_popcount(payload[0]) /*ranges*/) { + LOG_WARN("Replay: Expired advert payload too small"); + return; + } + handleExpiredAdvertisement(wire, (unsigned char *)payload, + ((unsigned char *)p->decoded.payload.bytes) + p->decoded.payload.size, server); + break; + default: + LOG_WARN("Replay: Unknown advertisement type %u", wire->header.type); + return; + } + + server->adverts_received++; + if (!server->is_tracked) { + // Start tracking this server if we have space or it is more useful than an existing tracked server + server->discovered_millis = millis(); + ReplayServerInfo *target = servers; + for (unsigned int i = 0; i < REPLAY_TRACK_SERVERS; i++) { + if (!servers[i].is_tracked) { + target = &servers[i]; + break; // Always use empty slots first + } + if (!servers[i].flag_router && target->flag_router) + target = &servers[i]; // Prefer replacing non-routers + else if (servers[i].last_advert_millis < target->last_advert_millis) + target = &servers[i]; // Prefer replacing older entries + } + if (!target->is_tracked // Target is an empty slot + || (target->last_advert_millis + REPLAY_SERVER_STALE_SECS * 1000 < millis()) // Target is stale + || (target->replays_requested < server->replays_requested) // Target is less useful + ) { + memcpy(target, server, sizeof(ReplayServerInfo)); + server = target; + server->is_tracked = true; + LOG_INFO("Replay: Now tracking server=0x%08x", target->id); + } + } + LOG_INFO("Replay: server=0x%08x adverts=%u requests=%u missing=%u/%u seq=%u prio=%u router=%u agg=%u boot=%u", server->id, + server->adverts_received, server->replays_requested, server->missing.count(), server->available.count(), + wire->header.sequence, server->flag_priority, server->flag_router, wire->header.aggregate, wire->header.boot); +} + +/** + * Handle an availability advertisement (i.e. which packets the server has available) + */ +void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned char *data, unsigned char *data_end, + ReplayServerInfo *server) +{ + int payload_words = (data_end - data) / sizeof(uint16_t); + if (payload_words < 2 || payload_words < 1 /*map*/ + __builtin_popcount(((uint16_t *)data)[0]) * 2 /*ranges*/) { + LOG_WARN("Replay: Availability advert payload too small"); + return; + } + uint16_t *payload = (uint16_t *)data; + uint16_t *payload_start = payload; + ReplayMap map = *payload++; + for (unsigned int i = 0; i < 16; i++) { + if (!(map & (1 << i))) + continue; + ReplayMap *packet_map = payload++; + ReplayMap *priority_map = payload++; + if (payload - payload_start > payload_words || + payload - payload_start > payload_words - __builtin_popcount(*packet_map)) { + LOG_WARN("Replay: Availability advert payload too small"); + return; + } + for (unsigned int j = 0; j < 16; j++) { + if (*packet_map & (1 << j)) { + ReplayCursor idx = (i * 16 + j) & REPLAY_BUFFER_MASK; + server->available.set(idx); + if (*priority_map & (1 << j)) + server->priority.set(idx); + else + server->priority.reset(idx); + server->packets[idx] = *payload++; + if (!isKnown(server->packets[idx])) { + LOG_WARN("Replay: Discovered missing packet hash=0x%04x via=0x%08x", server->packets[idx], server->id); + server->missing.set(idx); + } else { + LOG_DEBUG("Replay: Discovered known packet hash=0x%04x via=0x%08x", server->packets[idx], server->id); + server->missing.reset(idx); + } + } + } + } + + if (!wire->header.boot) { + unsigned int this_sequence = (server->last_sequence & ~REPLAY_SEQUENCE_MASK) | wire->header.sequence; + if (!server->last_sequence) { + // First ever advert received from this server + server->last_sequence = REPLAY_SEQUENCE_MASK + 1 + wire->header.sequence; + this_sequence = (server->last_sequence & ~REPLAY_SEQUENCE_MASK) | wire->header.sequence; + LOG_INFO("Replay: First advertisement from server=0x%08x seq=%u, last_seq=%u", server->id, this_sequence, + server->last_sequence); + } + if (this_sequence <= server->last_sequence - 15) + this_sequence += REPLAY_SEQUENCE_MASK + 1; // This is a forward wrap, not a reference to an old sequence + if (this_sequence < server->last_sequence && !wire->header.aggregate) { + // If the sequence number went backwards, then we have likely missed many intervening + // adverts and should reset our tracking state & start with a blank slate. Do not ask + // for missing adverts, because we have missed way too much for this to be sensible. + LOG_WARN("Replay: Advertisement sequence went backwards from server=0x%08x seq=%u, last_seq=%u", server->id, + this_sequence, server->last_sequence); + server->available.reset(); + server->priority.reset(); + server->missing.reset(); + server->last_sequence = REPLAY_SEQUENCE_MASK + 1 + wire->header.sequence; + this_sequence = (server->last_sequence & ~REPLAY_SEQUENCE_MASK) | wire->header.sequence; + server->max_sequence = this_sequence; + } + if (this_sequence > server->max_sequence) + server->max_sequence = this_sequence; + for (unsigned int i = 0; i < 32; i++) { + unsigned int seq = server->last_sequence + i; + if (seq <= server->last_sequence) + continue; // We already received the advert with this sequence + if (seq >= this_sequence) + break; + server->missing_sequence |= (1 << i); + LOG_WARN("Replay: Noticed missing advertisement seq=%u from server=0x%08x", seq, server->id); + } + while (server->last_sequence < server->max_sequence && !(server->missing_sequence & 3)) { + server->missing_sequence >>= 1; + server->last_sequence++; + } + + // Handle aggregate advertisements + if (wire->header.aggregate && data_end - (unsigned char *)payload >= (int)sizeof(ReplayMap)) { + ReplayMap aggregate_mask = *payload++; + for (unsigned int i = 0; i < 16; i++) { + unsigned int seq = this_sequence + i; + if (seq <= server->last_sequence) + continue; // We already received the advert with this sequence + if (aggregate_mask & (1 << i)) { + LOG_DEBUG("Replay: Caught up on missed advertisement server=0x%08x seq=%u", server->id, seq); + server->missing_sequence &= ~(1 << (seq - server->last_sequence)); + if (seq > server->max_sequence) + server->max_sequence = seq; + } + } + while (server->last_sequence < server->max_sequence && !(server->missing_sequence & 3)) { + server->missing_sequence >>= 1; + server->last_sequence++; + } + } + + // Catch up on missing advertisements + if (server->missing_sequence) { + requestMissingAdvertisements(server); + } else { + LOG_DEBUG("Replay: Sequence assert last=%u this=%u max=%u, wire=%u", server->last_sequence, this_sequence, + server->max_sequence, wire->header.sequence); + assert(server->last_sequence == server->max_sequence); + } + } + + if (wire->header.throttle && (unsigned char *)payload < data_end) { + uint8_t *throttled = (uint8_t *)payload; + uint8_t me = nodeDB->getNodeNum() & 0x000F; + while (throttled <= data_end) { + if (*throttled++ == me) + return; // We are being throttled by the server, so don't ask for anything + } + } + + if (server->missing.any()) { + for (unsigned int i = 0; i < REPLAY_BUFFER_SIZE; i++) { + if (server->missing.test(i)) { + if (!server->available.test(i)) { + // This packet is missing but the server does not claim to have it, so stop tracking it + server->missing.reset(i); + } else if (isKnown(server->packets[i])) { + // This packet was previously missing, but we have since received it + server->missing.reset(i); + } + } + } + if (server->missing.any()) + requestReplay(server); + } +} + +/** + * Handle an expiry advertisement (i.e. which packets the server has pruned from its cache) + */ +void ReplayModule::handleExpiredAdvertisement(ReplayWire *wire, unsigned char *data, unsigned char *data_end, + ReplayServerInfo *server) +{ + unsigned int expired = 0; + uint16_t *payload = (uint16_t *)data; + ReplayMap map = *payload++; + for (unsigned int i = 0; i < 16; i++) { + if (!(map & (1 << i))) + continue; + for (unsigned int j = 0; j < 16; j++) { + if (*payload & (1 << j)) { + ReplayCursor idx = (i * 16 + j) & REPLAY_BUFFER_MASK; + server->available.reset(idx); + expired++; + } + } + payload++; + } + LOG_INFO("Replay: Received expiry advertisement from=0x%08x expired_count=%u", server->id, expired); +} + +/** + * Get or allocate a request slot for a specific packet hash + */ +ReplayRequestInfo *ReplayModule::requestInfo(ReplayHash hash) +{ + ReplayRequestInfo *target = NULL; + for (unsigned int i = 0; i < REPLAY_REQUEST_MAX_OUTSTANDING; i++) { + if (requests[i].hash == hash && requests[i].timeout_millis >= millis()) { + return &requests[i]; + } + if (!target && requests[i].timeout_millis < millis()) + target = &requests[i]; + } + if (target) { + target->hash = hash; + target->timeout_millis = 0; + } + return target; +} + +/** + * Add a packet to the replay TX queue for sending + */ +bool ReplayModule::queuePush(ReplayCursor idx) +{ + for (unsigned int i = queue_tail; queue_length && i < queue_next; i++) { + if (queue[i & REPLAY_QUEUE_MASK] == idx) { + LOG_INFO("Replay: Packet already in TX queue hash=0x%04x", buffer.get(idx & REPLAY_BUFFER_MASK)->hash); + return true; // Already queued + } + } + if (queueLength() >= REPLAY_QUEUE_SIZE) { + LOG_WARN("Replay: Queue full, cannot queue packet hash=0x%04x", buffer.get(idx & REPLAY_BUFFER_MASK)->hash); + return false; // Queue is full + } + idx = (buffer.getTailCursor() & ~REPLAY_BUFFER_MASK) | (idx & REPLAY_BUFFER_MASK); + queue[queue_next++ & REPLAY_QUEUE_MASK] = idx; + queue_length++; + LOG_DEBUG("Replay: Queued packet for TX hash=0x%04x queue=%u", buffer.get(idx & REPLAY_BUFFER_MASK)->hash, queueLength()); + return true; +} + +/** + * Peek at the next packet in the replay TX queue without removing it + */ +meshtastic_MeshPacket *ReplayModule::queuePeek() +{ + if (!queueLength()) + return NULL; // Queue is empty + ReplayCursor idx = queue[queue_tail & REPLAY_QUEUE_MASK]; + if (idx < buffer.getTailCursor()) { + LOG_ERROR("Replay: Peeked at a packet older than the buffer tail"); + queue_tail++; // This entry has fallen out of the buffer + queue_length--; + return queuePeek(); + } + ReplayEntry *entry = buffer.get(idx & REPLAY_BUFFER_MASK); + if (!entry->p) { + LOG_ERROR("Replay: Peeked at a pruned packet hash=0x%04x", entry->hash); + queue_tail++; // This entry has been pruned + queue_length--; + return queuePeek(); + } + return entry->p; +} + +/** + * Pop the next packet from the replay TX queue + */ +meshtastic_MeshPacket *ReplayModule::queuePop() +{ + meshtastic_MeshPacket *p = queuePeek(); + if (p) { + queue_tail++; + queue_length--; + } + return p; +} + +/** + * Handle thread notifications + */ +void ReplayModule::onNotify(uint32_t notification) +{ + LOG_DEBUG("Replay: onNotify %u", notification); + + if (replay_from) + replay(); + + unsigned int now = millis(); + uint32_t deadline = last_advert_millis + REPLAY_FLUSH_SECS * 1000; + if (now < REPLAY_STARTUP_DELAY_SECS * 1000) + deadline = REPLAY_STARTUP_DELAY_SECS * 1000; // Ensure we don't advertise too quickly on boot + + if (packets_since_advert > REPLAY_FLUSH_PACKETS || deadline <= now) + advertise(); + + if (replay_from >= buffer.getTailCursor() && replay_from) { + // We still have packets pending replay + notifyLater(REPLAY_SPACING_MS, REPLAY_NOTIFY_REPLAY, true); + } else if (deadline > now) { + // Sleep until the next advert deadline + LOG_DEBUG("Sleep to deadline %ld", deadline - now); + notifyLater(deadline - now, REPLAY_NOTIFY_INTERVAL, false); + } +} \ No newline at end of file diff --git a/src/modules/ReplayModule.h b/src/modules/ReplayModule.h new file mode 100644 index 00000000000..046620822a1 --- /dev/null +++ b/src/modules/ReplayModule.h @@ -0,0 +1,218 @@ +#pragma once +#include "SinglePortModule.h" +#include "concurrency/NotifiedWorkerThread.h" +#include + +#define REPLAY_FAKE_PACKET_LOSS_PERCENT 0 // Simulate this percentage of packet loss for testing + +#define REPLAY_REMEMBER_MASK 0x3FF /*1024*/ // Mask for wrapping packet memory index +#define REPLAY_REMEMBER_SIZE (REPLAY_REMEMBER_MASK + 1) // Remember the most recent n received packets +#define REPLAY_BUFFER_MASK 0xFF /*256*/ // Mask for wrapping buffer indices +#define REPLAY_BUFFER_SIZE (REPLAY_BUFFER_MASK + 1) // Track at most this many packets +#define REPLAY_BUFFER_CACHE_MAX REPLAY_BUFFER_SIZE // Cache at most this many packets +#define REPLAY_QUEUE_MASK 0x0F // Mask for wrapping the replay queue index +#define REPLAY_QUEUE_SIZE (REPLAY_QUEUE_MASK + 1) // Size of the replay +#define REPLAY_FLUSH_PACKETS 16 // Send an advertisement after at most this many packets +#define REPLAY_FLUSH_SECS 15 // Send an advertisement after at most this many seconds (if unadvertised packets are pending) +#define REPLAY_STARTUP_DELAY_SECS 30 // Wait this many seconds after boot before sending the first advertisement +#define REPLAY_ADVERT_MAX_PACKETS 64 // Advertise at most this many packets at a time +#define REPLAY_CHUTIL_THRESHOLD_PCT 35 // If chutil is >= this, only advertise high-priority packets +#define REPLAY_CHUTIL_PRIORITY meshtastic_MeshPacket_Priority_RELIABLE // Packets with priority >= this are high-priority +#define REPLAY_HEAP_THRESHOLD_PCT 10 // If we are using more than this much of the heap on cache, enable proactive pruning +#define REPLAY_HEAP_RESERVE_PCT 5 // Don't prune the cache to below this much of the heap +#define REPLAY_HEAP_FREE_MIN_PCT 10 // Prune packets if free heap is below this +#define REPLAY_HEAP_FREE_TARGET_PCT 15 // Prune packets until free heap is above this +#define REPLAY_SPACING_MS 500 // Spacing between replayed packets (TODO: scale based on radio settings) +#define REPLAY_EXPIRED_SPACING_SECS 10 // Minimum spacing between advertisements of expired packets +#define REPLAY_SEQUENCE_MASK 0x1F // Mask for wrapping advertisement sequence number +#define REPLAY_TRACK_SERVERS 8 // Keep track of state for this many servers +#define REPLAY_REQUEST_MAX_PACKETS 16 // Request at most this many packets at a time +#define REPLAY_REQUEST_MAX_OUTSTANDING 32 // Keep track of this many outstanding requested packets +#define REPLAY_REQUEST_TIMEOUT_SECS 30 // Consider a requested packet lost or unfilled after this many seconds +#define REPLAY_SERVER_STALE_SECS 900 // Consider a server stale if we haven't heard from it in this many seconds +#define REPLAY_CLIENT_BURST 16 // Allow at most this many replay requests per client in a burst +#define REPLAY_CLIENT_RATE_MS 100 // Allow at most one replay request per client every this many milliseconds on average +#define REPLAY_CLIENT_SIZE 128 // Track at most this many clients +#define REPLAY_CLIENT_THROTTLE_ADVERT_MAX 64 // Advertise at most this many throttled clients at a time + +#define REPLAY_REQUEST_TYPE_ADVERTISEMENT 0 // Request an advertisement +#define REPLAY_REQUEST_TYPE_PACKETS 1 // Request a replay of the specified packets +#define REPLAY_REQUEST_TYPE_RESERVED_2 2 // Reserved for future use +#define REPLAY_REQUEST_TYPE_RESERVED_3 3 // Reserved for future use +#define REPLAY_ADVERT_TYPE_AVAILABLE 0 // Advertise available packets +#define REPLAY_ADVERT_TYPE_EXPIRED 1 // Advertise expired packets (i.e. cannot be replayed) +#define REPLAY_ADVERT_TYPE_STATISTICS 2 // Transmit statistics about the replay system +#define REPLAY_ADVERT_TYPE_RESERVED_3 3 // Reserved for future use + +#define REPLAY_NOTIFY_ADOPT 1 // A packet has been adopted into the cache +#define REPLAY_NOTIFY_INTERVAL 2 // The interval timer fired +#define REPLAY_NOTIFY_REPLAY 3 // Trigger replay of wanted packets + +#define REPLAY_HASH(a, b) ((((a ^ b) >> 16) & 0xFFFF) ^ ((a ^ b) & 0xFFFF)) + +typedef uint16_t ReplayHash; +typedef uint16_t ReplayMap; +typedef unsigned long ReplayCursor; + +typedef struct ReplayWire { + union { + uint16_t bitfield = 0; + struct { + uint8_t type : 2; // Request or advertisement type + uint8_t priority : 1; // Please only request / send high-priority packets + uint8_t boot : 1; // (adverts only) The sending node just booted + uint8_t router : 1; // The sending node is a router (prioritise following & replaying for) + uint8_t aggregate : 1; // (adverts only) This is an aggregate replay of prior adverts + uint8_t throttle : 1; // (adverts only) Lists clients that should not request replays in response to this advert + uint8_t reserved_0 : 1; // Reserved for future use + uint8_t sequence : 5; // Incremented with each advertisement + uint8_t reserved_1 : 3; // Reserved for future use + }; + } header; + /** + * Advertisement payload is: + * - uint16 range map (which 16-packet ranges are included) + * - for each range: + * - uint16 packet bitmap (which packets in the range are included) + * - uint16 priority bitmap (which packets in the range are high priority) + * - uint16[] packet hashes + * - (aggregate only) uint16 aggregate mask (which adverts are included in this aggregate) + * - (throttle only) uint8[] list of clients that should not request replays in response to this advert + * + * Expired advertisement payload is: + * - uint16 range map (which 16-packet ranges are included) + * - for each included range: + * - uint16 packet bitmap (which packets in the range are expired) + * + * Request payload is: + * - uint16 range map (which 16-packet ranges are included) + * - for each included range: + * - uint16 packet bitmap (which packets in the range are requested) + */ + +} ReplayWire; +static_assert(sizeof(ReplayWire::header) == sizeof(ReplayWire::header.bitfield)); + +typedef struct ReplayEntry { + meshtastic_MeshPacket *p = NULL; + uint32_t last_replay_millis = 0; + uint16_t replay_count = 0; + ReplayHash hash = 0; +} ReplayEntry; + +typedef struct ReplayAdvertisement { + unsigned int sequence = 0; + ReplayCursor head = 0; + ReplayCursor tail = 0; + std::bitset dirty = {}; +} ReplayAdvertisement; + +typedef struct ReplayServerInfo { + NodeNum id = 0; + unsigned int discovered_millis = 0; + unsigned long last_advert_millis = 0; + unsigned int last_sequence = 0; + unsigned int max_sequence = 0; + unsigned long missing_sequence = 0; + unsigned int replays_requested = 0; + unsigned int adverts_received = 0; + bool flag_priority = false; + bool flag_router = false; + bool is_tracked = false; + ReplayHash packets[REPLAY_BUFFER_SIZE] = {}; + std::bitset available = {}; + std::bitset priority = {}; + std::bitset missing = {}; +} ReplayServerInfo; + +typedef struct ReplayClientInfo { + NodeNum id = 0; + unsigned long last_request_millis = 0; + unsigned int bucket = REPLAY_CLIENT_BURST; + unsigned int requests = 0; +} ReplayClientInfo; + +typedef struct ReplayRequestInfo { + ReplayHash hash = 0; + unsigned long timeout_millis = 0; +} ReplayRequestInfo; + +class ReplayBuffer +{ + public: + ReplayBuffer(){}; + ReplayEntry *adopt(meshtastic_MeshPacket *p); + unsigned int getLength() const { return next - last; }; + unsigned int getNumCached() const { return num_cached; }; + ReplayCursor getHeadCursor() const { return next ? next - 1 : 0; }; + ReplayCursor getTailCursor() const { return last; }; + ReplayEntry *get(unsigned int idx) { return &entries[idx & REPLAY_BUFFER_MASK]; }; + ReplayEntry *search(ReplayHash hash); + ReplayEntry *search(NodeNum from, uint32_t id); + ReplayEntry *search(meshtastic_MeshPacket *p, bool strict = false); + + private: + unsigned int num_cached = 0; + ReplayCursor next = 0; + ReplayCursor last = 0; + ReplayEntry entries[REPLAY_BUFFER_SIZE]; + MemoryDynamicReplayAware packets; + void prune(unsigned int keep = REPLAY_BUFFER_CACHE_MAX); // Free up memory by releasing cached packets + void truncate(unsigned int keep = REPLAY_BUFFER_SIZE); // Discard all but the most recent n packets +}; + +class ReplayModule : public SinglePortModule, private concurrency::NotifiedWorkerThread +{ + public: + ReplayModule() : SinglePortModule("replay", meshtastic_PortNum_REPLAY_APP), concurrency::NotifiedWorkerThread("replay") {} + void adopt(meshtastic_MeshPacket *p); + bool isKnown(ReplayHash hash); + bool isKnown(meshtastic_MeshPacket *p) { return isKnown(REPLAY_HASH(p->from, p->id)); }; + void remember(ReplayHash hash) { memory[memory_next++ & REPLAY_REMEMBER_MASK] = hash; }; + void remember(meshtastic_MeshPacket *p) { remember(REPLAY_HASH(p->from, p->id)); }; + meshtastic_MeshPacket *queuePeek(); + meshtastic_MeshPacket *queuePop(); + unsigned int queueLength() { return queue_length; }; + + private: + ReplayBuffer buffer; + ReplayCursor last_advert_cursor = 0; + unsigned long last_advert_millis = 0; + unsigned long last_expired_millis = 0; + unsigned int packets_since_advert = 0; + unsigned int next_sequence = 0; + std::bitset dirty = {}; + std::bitset dirty_prio = {}; + std::bitset want_replay = {}; + ReplayHash memory[REPLAY_REMEMBER_SIZE] = {}; + ReplayAdvertisement advertisements[32] = {}; + ReplayServerInfo servers[REPLAY_TRACK_SERVERS] = {}; + ReplayClientInfo clients[REPLAY_CLIENT_SIZE] = {}; + ReplayRequestInfo requests[REPLAY_REQUEST_MAX_OUTSTANDING] = {}; + ReplayCursor memory_next = 1; + ReplayCursor replay_from = 0; + ReplayCursor queue[REPLAY_QUEUE_SIZE] = {}; + ReplayCursor queue_next = 0; + ReplayCursor queue_tail = 0; + ReplayCursor queue_length = 0; + bool want_replay_prio = false; + bool want_replay_expired = false; + ReplayClientInfo *client(NodeNum id); + void advertise(bool aggregate = false, unsigned int from_sequence = 0, ReplayMap aggregate_mask = 0); + void advertiseExpired(); + void replay(); + void requestReplay(ReplayServerInfo *server); + void requestMissingAdvertisements(ReplayServerInfo *server); + ProcessMessage handleReceived(const meshtastic_MeshPacket &mp) override; + void handleRequest(const meshtastic_MeshPacket *p); + void handleAdvertisement(const meshtastic_MeshPacket *p); + void handleAvailabilityAdvertisement(ReplayWire *wire, unsigned char *payload, unsigned char *payload_end, + ReplayServerInfo *server); + void handleExpiredAdvertisement(ReplayWire *wire, unsigned char *payload, unsigned char *payload_end, + ReplayServerInfo *server); + ReplayRequestInfo *requestInfo(ReplayHash hash); + bool queuePush(ReplayCursor idx); + void onNotify(uint32_t notification); +}; + +extern ReplayModule *replayModule; \ No newline at end of file From 35529b609ce29ce55a093887f45298ee470d877f Mon Sep 17 00:00:00 2001 From: Steve Gilberd Date: Sat, 20 Sep 2025 20:36:57 +1200 Subject: [PATCH 02/10] Fix wire sequence wrap --- src/modules/ReplayModule.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/modules/ReplayModule.cpp b/src/modules/ReplayModule.cpp index e2392d57fb1..039d711e77a 100644 --- a/src/modules/ReplayModule.cpp +++ b/src/modules/ReplayModule.cpp @@ -842,6 +842,8 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch // First ever advert received from this server server->last_sequence = REPLAY_SEQUENCE_MASK + 1 + wire->header.sequence; this_sequence = (server->last_sequence & ~REPLAY_SEQUENCE_MASK) | wire->header.sequence; + if (!wire->header.sequence) + this_sequence += REPLAY_SEQUENCE_MASK + 1; // The wire sequence just wrapped LOG_INFO("Replay: First advertisement from server=0x%08x seq=%u, last_seq=%u", server->id, this_sequence, server->last_sequence); } From 94576b79cedf0605d24b87f0ea5f08a471eacd15 Mon Sep 17 00:00:00 2001 From: Steve Gilberd Date: Sat, 20 Sep 2025 22:06:41 +1200 Subject: [PATCH 03/10] Fix heap leak --- src/modules/ReplayModule.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/modules/ReplayModule.cpp b/src/modules/ReplayModule.cpp index 039d711e77a..eb5abfe0845 100644 --- a/src/modules/ReplayModule.cpp +++ b/src/modules/ReplayModule.cpp @@ -41,6 +41,7 @@ ReplayEntry *ReplayBuffer::adopt(meshtastic_MeshPacket *p) if (oldEntry->p) { meshtastic_MeshPacket *oldp = oldEntry->p; oldEntry->p = NULL; + oldp->is_replay_cached = false; packets.release(oldp); num_cached--; } @@ -158,6 +159,7 @@ void ReplayBuffer::prune(unsigned int keep) if (entries[idx].p && entries[idx].p->priority <= threshold) { meshtastic_MeshPacket *p = entries[idx].p; entries[idx].p = NULL; + p->is_replay_cached = false; packets.release(p); num_cached--; } @@ -175,6 +177,7 @@ void ReplayBuffer::truncate(unsigned int keep) if (oldEntry->p) { meshtastic_MeshPacket *oldp = oldEntry->p; oldEntry->p = NULL; + oldp->is_replay_cached = false; packets.release(oldp); num_cached--; } From f353858217918e3bceb4cd197ff09d9868b764de Mon Sep 17 00:00:00 2001 From: Steve Gilberd Date: Sun, 21 Sep 2025 13:01:30 +1200 Subject: [PATCH 04/10] Better handling of stale servers --- src/modules/ReplayModule.cpp | 63 +++++++++++++++++++++++++++--------- src/modules/ReplayModule.h | 1 + 2 files changed, 48 insertions(+), 16 deletions(-) diff --git a/src/modules/ReplayModule.cpp b/src/modules/ReplayModule.cpp index eb5abfe0845..161cb202e72 100644 --- a/src/modules/ReplayModule.cpp +++ b/src/modules/ReplayModule.cpp @@ -504,6 +504,11 @@ void ReplayModule::requestReplay(ReplayServerInfo *server) request &= server->priority; if (!request.any()) return; // Nothing to request + if (server->last_advert_millis + REPLAY_SERVER_STALE_SECS * 1000 < millis()) { + LOG_DEBUG("Replay: Cancelling requests for missing packets from stale server=0x%08x", server->id); + invalidateServer(server); + return; + } unsigned long request_millis = millis() + REPLAY_REQUEST_TIMEOUT_SECS * 1000; ReplayRequestInfo *requests[REPLAY_BUFFER_SIZE] = {}; for (int i = 0; i < REPLAY_BUFFER_SIZE; i++) { @@ -731,6 +736,11 @@ void ReplayModule::handleAdvertisement(const meshtastic_MeshPacket *p) for (unsigned int i = 0; i < REPLAY_TRACK_SERVERS; i++) { if (servers[i].id == p->from) { server = &servers[i]; + if (server->last_advert_millis + REPLAY_SERVER_STALE_SECS * 1000 < millis()) { + LOG_INFO("Replay: Stale server 0x%08x has become active again after %u seconds", server->id, + (millis() - server->last_advert_millis) / 1000); + invalidateServer(server); + } break; } } @@ -738,14 +748,9 @@ void ReplayModule::handleAdvertisement(const meshtastic_MeshPacket *p) server->flag_priority = wire->header.priority; server->flag_router = wire->header.router; - if (wire->header.boot) { + if (wire->header.boot) // The server has rebooted, so reset its availability state - server->available.reset(); - server->priority.reset(); - server->missing.reset(); - server->last_sequence = 0; - server->missing_sequence = 0; - } + invalidateServer(server); switch (wire->header.type) { case REPLAY_ADVERT_TYPE_AVAILABLE: @@ -852,15 +857,22 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch } if (this_sequence <= server->last_sequence - 15) this_sequence += REPLAY_SEQUENCE_MASK + 1; // This is a forward wrap, not a reference to an old sequence - if (this_sequence < server->last_sequence && !wire->header.aggregate) { - // If the sequence number went backwards, then we have likely missed many intervening - // adverts and should reset our tracking state & start with a blank slate. Do not ask - // for missing adverts, because we have missed way too much for this to be sensible. - LOG_WARN("Replay: Advertisement sequence went backwards from server=0x%08x seq=%u, last_seq=%u", server->id, - this_sequence, server->last_sequence); - server->available.reset(); - server->priority.reset(); - server->missing.reset(); + if (!wire->header.aggregate && + ((this_sequence < server->last_sequence) || + (server->max_sequence > server->last_sequence && server->max_sequence - server->last_sequence > 15))) { + if (this_sequence < server->last_sequence) + // If the sequence number went backwards, then we have likely missed many intervening + // adverts and should reset our tracking state & start with a blank slate. Do not ask + // for missing adverts, because we have missed way too much for this to be sensible. + LOG_WARN("Replay: Advertisement sequence went backwards from server=0x%08x seq=%u, last_seq=%u", server->id, + this_sequence, server->last_sequence); + else if (server->max_sequence - server->last_sequence > 15) + // If we have missed so many adverts that we are this far behind, we are probably never + // going to catch up via aggregates, so reset our tracking state & start with a blank slate. + LOG_WARN("Replay: Too many missed adverts from server=0x%08x seq=%u, last_seq=%u, max_seq=%u", server->id, + this_sequence, server->last_sequence, server->max_sequence); + + invalidateServer(server); server->last_sequence = REPLAY_SEQUENCE_MASK + 1 + wire->header.sequence; this_sequence = (server->last_sequence & ~REPLAY_SEQUENCE_MASK) | wire->header.sequence; server->max_sequence = this_sequence; @@ -1040,6 +1052,25 @@ meshtastic_MeshPacket *ReplayModule::queuePop() return p; } +/** + * Invalidate a server record's state and prepare it for reuse + */ +void ReplayModule::invalidateServer(ReplayServerInfo *server, bool stats) +{ + server->last_sequence = 0; + server->max_sequence = 0; + server->missing_sequence = 0; + server->available.reset(); + server->priority.reset(); + server->missing.reset(); + + if (stats) { + server->adverts_received = 0; + server->replays_requested = 0; + server->last_advert_millis = 0; + } +} + /** * Handle thread notifications */ diff --git a/src/modules/ReplayModule.h b/src/modules/ReplayModule.h index 046620822a1..a29cccfc950 100644 --- a/src/modules/ReplayModule.h +++ b/src/modules/ReplayModule.h @@ -212,6 +212,7 @@ class ReplayModule : public SinglePortModule, private concurrency::NotifiedWorke ReplayServerInfo *server); ReplayRequestInfo *requestInfo(ReplayHash hash); bool queuePush(ReplayCursor idx); + void invalidateServer(ReplayServerInfo *server, bool stats = false); void onNotify(uint32_t notification); }; From 22ad9f9f644e643ec07370c9efae1c90b0f3a7fb Mon Sep 17 00:00:00 2001 From: Steve Gilberd Date: Sun, 21 Sep 2025 13:01:48 +1200 Subject: [PATCH 05/10] Update bug comment --- src/modules/ReplayModule.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/modules/ReplayModule.cpp b/src/modules/ReplayModule.cpp index 161cb202e72..b66d6983e54 100644 --- a/src/modules/ReplayModule.cpp +++ b/src/modules/ReplayModule.cpp @@ -11,6 +11,8 @@ * - Scale replay rate based on modem settings * - Prioritise replay of packets requested by routers * - Lots of testing (and likely a bunch of bugfixes) + * - WARN | 23:30:46 4214 [Router] Replay: Advertisement sequence went backwards from server=0x056191db seq=36, last_seq=48 + * - Back off repeated replay requests? * - Implement a periodic stats packet that includes: * - Number of adverts sent * - Number of replays sent From 86ca8441dd913626ad5e6bb0791e9ad9525ef490 Mon Sep 17 00:00:00 2001 From: Steve Gilberd Date: Sun, 21 Sep 2025 13:02:10 +1200 Subject: [PATCH 06/10] Update tunables --- src/modules/ReplayModule.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/modules/ReplayModule.h b/src/modules/ReplayModule.h index a29cccfc950..d15a2cc746c 100644 --- a/src/modules/ReplayModule.h +++ b/src/modules/ReplayModule.h @@ -13,7 +13,7 @@ #define REPLAY_QUEUE_MASK 0x0F // Mask for wrapping the replay queue index #define REPLAY_QUEUE_SIZE (REPLAY_QUEUE_MASK + 1) // Size of the replay #define REPLAY_FLUSH_PACKETS 16 // Send an advertisement after at most this many packets -#define REPLAY_FLUSH_SECS 15 // Send an advertisement after at most this many seconds (if unadvertised packets are pending) +#define REPLAY_FLUSH_SECS 20 // Send an advertisement after at most this many seconds (if unadvertised packets are pending) #define REPLAY_STARTUP_DELAY_SECS 30 // Wait this many seconds after boot before sending the first advertisement #define REPLAY_ADVERT_MAX_PACKETS 64 // Advertise at most this many packets at a time #define REPLAY_CHUTIL_THRESHOLD_PCT 35 // If chutil is >= this, only advertise high-priority packets @@ -22,16 +22,16 @@ #define REPLAY_HEAP_RESERVE_PCT 5 // Don't prune the cache to below this much of the heap #define REPLAY_HEAP_FREE_MIN_PCT 10 // Prune packets if free heap is below this #define REPLAY_HEAP_FREE_TARGET_PCT 15 // Prune packets until free heap is above this -#define REPLAY_SPACING_MS 500 // Spacing between replayed packets (TODO: scale based on radio settings) +#define REPLAY_SPACING_MS 1000 // Spacing between replayed packets (TODO: scale based on radio settings) #define REPLAY_EXPIRED_SPACING_SECS 10 // Minimum spacing between advertisements of expired packets #define REPLAY_SEQUENCE_MASK 0x1F // Mask for wrapping advertisement sequence number #define REPLAY_TRACK_SERVERS 8 // Keep track of state for this many servers #define REPLAY_REQUEST_MAX_PACKETS 16 // Request at most this many packets at a time #define REPLAY_REQUEST_MAX_OUTSTANDING 32 // Keep track of this many outstanding requested packets -#define REPLAY_REQUEST_TIMEOUT_SECS 30 // Consider a requested packet lost or unfilled after this many seconds -#define REPLAY_SERVER_STALE_SECS 900 // Consider a server stale if we haven't heard from it in this many seconds +#define REPLAY_REQUEST_TIMEOUT_SECS 45 // Consider a requested packet lost or unfilled after this many seconds +#define REPLAY_SERVER_STALE_SECS 300 // Consider a server stale if we haven't heard from it in this many seconds #define REPLAY_CLIENT_BURST 16 // Allow at most this many replay requests per client in a burst -#define REPLAY_CLIENT_RATE_MS 100 // Allow at most one replay request per client every this many milliseconds on average +#define REPLAY_CLIENT_RATE_MS 1000 // Allow at most one replay request per client every this many milliseconds on average #define REPLAY_CLIENT_SIZE 128 // Track at most this many clients #define REPLAY_CLIENT_THROTTLE_ADVERT_MAX 64 // Advertise at most this many throttled clients at a time From 208dc41809cbb04f885fb838ab82df1f39f77b47 Mon Sep 17 00:00:00 2001 From: Steve Gilberd Date: Tue, 23 Sep 2025 12:52:49 +1200 Subject: [PATCH 07/10] Properly log last expiry time --- src/modules/ReplayModule.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/modules/ReplayModule.cpp b/src/modules/ReplayModule.cpp index b66d6983e54..d4c6bddca5c 100644 --- a/src/modules/ReplayModule.cpp +++ b/src/modules/ReplayModule.cpp @@ -419,6 +419,7 @@ void ReplayModule::advertiseExpired() memcpy(p->decoded.payload.bytes, &payload, p->decoded.payload.size); service->sendToMesh(p); + last_expired_millis = millis(); want_replay_expired = false; } From ac8190088126e3bc024a5287937680e01fe0457b Mon Sep 17 00:00:00 2001 From: Steve Gilberd Date: Tue, 23 Sep 2025 15:07:41 +1200 Subject: [PATCH 08/10] Add stats packet --- src/modules/ReplayModule.cpp | 210 ++++++++++++++++++++++++++++++++--- src/modules/ReplayModule.h | 43 +++++++ 2 files changed, 239 insertions(+), 14 deletions(-) diff --git a/src/modules/ReplayModule.cpp b/src/modules/ReplayModule.cpp index d4c6bddca5c..374a792a55c 100644 --- a/src/modules/ReplayModule.cpp +++ b/src/modules/ReplayModule.cpp @@ -10,21 +10,11 @@ * - What should the tunable default values be? * - Scale replay rate based on modem settings * - Prioritise replay of packets requested by routers + * - Cache replay stats packets normally * - Lots of testing (and likely a bunch of bugfixes) * - WARN | 23:30:46 4214 [Router] Replay: Advertisement sequence went backwards from server=0x056191db seq=36, last_seq=48 * - Back off repeated replay requests? - * - Implement a periodic stats packet that includes: - * - Number of adverts sent - * - Number of replays sent - * - Number of replay requests received - * - Number of replays requested - * - Number of adverts received - * - For each server we are tracking: - * - Age of last advert - * - Number of adverts received - * - Number of packets requested from this server - * - Number of packets requested by this server - * - router flag + * - Frequent reboots since implementing stats */ ReplayModule *replayModule = NULL; @@ -198,6 +188,10 @@ void ReplayModule::adopt(meshtastic_MeshPacket *p) if (!entry) return; // Already cached + metrics.packets_rebroadcast++; + if (p->priority >= REPLAY_CHUTIL_PRIORITY) + metrics.packets_rebroadcast_prio++; + LOG_DEBUG("Replay: Adopting packet from=0x%08x id=0x%08x priority=%u packets=%u cached=%u cache_bytes=%u", p->from, p->id, p->priority, buffer.getLength(), buffer.getNumCached(), buffer.getNumCached() * sizeof(meshtastic_MeshPacket)); unsigned int idx = buffer.getHeadCursor() & REPLAY_BUFFER_MASK; @@ -382,6 +376,10 @@ void ReplayModule::advertise(bool aggregate, unsigned int from_sequence, ReplayM packets_since_advert -= packets; service->sendToMesh(p); + metrics.adverts_sent++; + if (aggregate) + metrics.adverts_sent_agg++; + if (again) { advertise(); } @@ -420,6 +418,7 @@ void ReplayModule::advertiseExpired() service->sendToMesh(p); last_expired_millis = millis(); + metrics.adverts_sent_expired++; want_replay_expired = false; } @@ -489,6 +488,9 @@ void ReplayModule::replay() } else { to_send->last_replay_millis = millis(); to_send->replay_count++; + metrics.packets_replayed++; + if (to_send->p->priority >= REPLAY_CHUTIL_PRIORITY) + metrics.packets_replayed_prio++; want_replay.reset(to_send_idx); } } else { @@ -580,6 +582,9 @@ void ReplayModule::requestReplay(ReplayServerInfo *server) *map |= (1 << i); *payload |= (1 << j); server->replays_requested++; + metrics.packets_requested++; + if (server->priority.test(idx)) + metrics.packets_requested_prio++; } } if (*map & (1 << i)) @@ -590,6 +595,7 @@ void ReplayModule::requestReplay(ReplayServerInfo *server) LOG_INFO("Replay: Requesting %u missing packets server=0x%08x prio=%u ranges=%u size=%u", request.count(), server->id, wire.header.priority, (uint16_t)*map, p->decoded.payload.size); service->sendToMesh(p); + getStats(server->id)->requests_to++; } /** @@ -658,6 +664,13 @@ void ReplayModule::handleRequest(const meshtastic_MeshPacket *p) client->bucket = REPLAY_CLIENT_BURST; client->last_request_millis = millis(); + ReplayStats *stats = getStats(p->from); + stats->requests_from++; + if (wire->header.router) + stats->is_router = true; + if (wire->header.priority) + stats->priority = true; + switch (wire->header.type) { case REPLAY_REQUEST_TYPE_ADVERTISEMENT: { if (payload_words < 2) { @@ -671,6 +684,7 @@ void ReplayModule::handleRequest(const meshtastic_MeshPacket *p) } LOG_INFO("Replay: Advertisement request from=0x%08x seq=%u missing=%u", p->from, wire->header.sequence, missing); advertise(true, wire->header.sequence, missing); + stats->replays_for++; } break; case REPLAY_REQUEST_TYPE_PACKETS: { if (payload_words < 3 || payload_words < 1 /*header*/ + 1 /*map*/ + __builtin_popcount(payload[1]) /*ranges*/) { @@ -693,6 +707,7 @@ void ReplayModule::handleRequest(const meshtastic_MeshPacket *p) continue; // Don't replay packets that are already in our TX queue if (!wire->header.priority || (entry->p && entry->p->priority >= REPLAY_CHUTIL_PRIORITY)) { want_replay.set(idx); + stats->replays_for++; requested++; client->bucket--; LOG_INFO("Replay: Request for %s packet hash=0x%04x client=0x%08x", entry->p ? "cached" : "expired", @@ -704,8 +719,10 @@ void ReplayModule::handleRequest(const meshtastic_MeshPacket *p) } range++; } - if (!client->bucket) + if (!client->bucket) { LOG_WARN("Replay: Client 0x%08x is being rate limited", client->id); + stats->throttled = true; + } replay_from = buffer.getHeadCursor(); LOG_INFO("Replay: Pending replay of %u packets, requested=%u, want_expired=%u", want_replay.count(), requested, want_replay_expired); @@ -768,6 +785,18 @@ void ReplayModule::handleAdvertisement(const meshtastic_MeshPacket *p) handleExpiredAdvertisement(wire, (unsigned char *)payload, ((unsigned char *)p->decoded.payload.bytes) + p->decoded.payload.size, server); break; + case REPLAY_ADVERT_TYPE_STATISTICS: { + meshtastic_ReplayStats rs = {}; + bool success = pb_decode_from_bytes((unsigned char *)payload, p->decoded.payload.size - sizeof(ReplayWire), + meshtastic_ReplayStats_fields, &rs); + if (!success) + LOG_WARN("Replay: Failed to decode invalid stats advertisement from=0x%08x", p->from); + else { + LOG_INFO("Replay: Received stats summary from=0x%08x", p->from); + printStats(&rs); + } + } + return; // Stats packets aren't a normal advertisement, so don't set up tracking for this node default: LOG_WARN("Replay: Unknown advertisement type %u", wire->header.type); return; @@ -809,6 +838,13 @@ void ReplayModule::handleAdvertisement(const meshtastic_MeshPacket *p) void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned char *data, unsigned char *data_end, ReplayServerInfo *server) { + ReplayStats *stats = getStats(server->id); + stats->adverts_from++; + if (wire->header.router) + stats->is_router = true; + if (wire->header.priority) + stats->priority = true; + int payload_words = (data_end - data) / sizeof(uint16_t); if (payload_words < 2 || payload_words < 1 /*map*/ + __builtin_popcount(((uint16_t *)data)[0]) * 2 /*ranges*/) { LOG_WARN("Replay: Availability advert payload too small"); @@ -839,6 +875,8 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch if (!isKnown(server->packets[idx])) { LOG_WARN("Replay: Discovered missing packet hash=0x%04x via=0x%08x", server->packets[idx], server->id); server->missing.set(idx); + server->packets_missed++; + stats->missed_from++; } else { LOG_DEBUG("Replay: Discovered known packet hash=0x%04x via=0x%08x", server->packets[idx], server->id); server->missing.reset(idx); @@ -889,6 +927,9 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch if (seq >= this_sequence) break; server->missing_sequence |= (1 << i); + server->packets_missed++; + stats->missed_from++; + metrics.packets_requested++; LOG_WARN("Replay: Noticed missing advertisement seq=%u from server=0x%08x", seq, server->id); } while (server->last_sequence < server->max_sequence && !(server->missing_sequence & 3)) { @@ -930,8 +971,10 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch uint8_t *throttled = (uint8_t *)payload; uint8_t me = nodeDB->getNodeNum() & 0x000F; while (throttled <= data_end) { - if (*throttled++ == me) + if (*throttled++ == me) { + stats->throttled_from++; return; // We are being throttled by the server, so don't ask for anything + } } } @@ -958,6 +1001,13 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch void ReplayModule::handleExpiredAdvertisement(ReplayWire *wire, unsigned char *data, unsigned char *data_end, ReplayServerInfo *server) { + ReplayStats *stats = getStats(server->id); + stats->expired_from++; + if (wire->header.router) + stats->is_router = true; + if (wire->header.priority) + stats->priority = true; + unsigned int expired = 0; uint16_t *payload = (uint16_t *)data; ReplayMap map = *payload++; @@ -1074,6 +1124,132 @@ void ReplayModule::invalidateServer(ReplayServerInfo *server, bool stats) } } +/** + * Get the current stats object for a node + */ +ReplayStats *ReplayModule::getStats(NodeNum id) +{ + for (unsigned int i = 0; i < REPLAY_STATS_SIZE; i++) { + if (servers[i].id == id) + return &stats[i]; + } + ReplayStats *s = &stats[stats_next++ & REPLAY_STATS_MASK]; + *s = {}; + s->id = id; + return s; +} + +/** + * Reset stats for all nodes + */ +void ReplayModule::resetStats() +{ + metrics = {}; + stats_next = 0; + memset(stats, 0, sizeof(stats)); + metrics.window_start_millis = millis(); +} + +/** + * Broadcast a stats packet to the mesh + */ +void ReplayModule::sendStats() +{ + if (!metrics.adverts_sent && !IS_ONE_OF(config.device.role, meshtastic_Config_DeviceConfig_Role_ROUTER, + meshtastic_Config_DeviceConfig_Role_ROUTER_LATE)) { + LOG_DEBUG("Replay: Skipping stats broadcast because no adverts sent and not a router"); + resetStats(); + return; + } + + ReplayWire wire = {}; + wire.header.type = REPLAY_ADVERT_TYPE_STATISTICS; + wire.header.priority = airTime->channelUtilizationPercent() >= REPLAY_CHUTIL_THRESHOLD_PCT; + wire.header.router = IS_ONE_OF(config.device.role, meshtastic_Config_DeviceConfig_Role_ROUTER, + meshtastic_Config_DeviceConfig_Role_ROUTER_LATE); + + meshtastic_ReplayStats rs = {}; + rs.window_length_secs = (millis() - metrics.window_start_millis) / 1000; + rs.current_size = buffer.getLength(); + rs.current_cached = buffer.getNumCached(); + rs.adverts_sent = metrics.adverts_sent; + rs.expired_sent = metrics.adverts_sent_expired; + rs.requests_sent_packets = metrics.packets_requested; + rs.requests_sent_packets_prio = metrics.packets_requested_prio; + rs.packets_replayed = metrics.packets_replayed; + rs.packets_replayed_prio = metrics.packets_replayed_prio; + rs.packets_rebroadcast = metrics.packets_rebroadcast; + rs.packets_rebroadcast_prio = metrics.packets_rebroadcast_prio; + + for (unsigned int i = 0; i < REPLAY_STATS_SIZE; i++) { + ReplayStats *s = &stats[i]; + rs.expired_received += s->expired_from; + rs.requests_sent += s->requests_to; + rs.packets_missed += s->missed_from; + + if (s->adverts_from) { + rs.unique_advertisers++; + rs.adverts_received += s->adverts_from; + } + if (s->requests_from) { + rs.requests_received += s->requests_from; + rs.unique_requestors++; + } + if (s->throttled) + rs.throttled_requestors++; + } + + for (unsigned int i = 0; i < REPLAY_TRACK_SERVERS; i++) { + ReplayServerInfo *server = &servers[i]; + if (!server->is_tracked) + continue; + meshtastic_ReplayServerStats *ss = &rs.servers[rs.servers_count++]; + ss->id = server->id; + ss->adverts_received = server->adverts_received; + ss->requests_sent = server->replays_requested; + ss->packets_missed = server->packets_missed; + ss->last_advert_secs = (millis() - server->last_advert_millis) / 1000; + ss->is_router = server->flag_router; + ss->priority = server->flag_priority; + } + + meshtastic_MeshPacket *p = allocDataPacket(); + assert(p); + unsigned char *pos = p->decoded.payload.bytes; + p->to = NODENUM_BROADCAST; + p->priority = meshtastic_MeshPacket_Priority_DEFAULT; + memcpy(pos, &wire.header.bitfield, sizeof(wire.header.bitfield)); + pos += sizeof(wire.header.bitfield); + pos += pb_encode_to_bytes(pos, sizeof(p->decoded.payload.bytes) - (pos - p->decoded.payload.bytes), + meshtastic_ReplayStats_fields, &rs); + p->decoded.payload.size = pos - p->decoded.payload.bytes; + + LOG_INFO("Replay: Broadcasting statistics to mesh"); + printStats(&rs); + + service->sendToMesh(p); + resetStats(); + last_stats_millis = millis(); +} + +void ReplayModule::printStats(meshtastic_ReplayStats *rs) +{ + LOG_INFO("Replay statistics (last %u seconds):", rs->window_length_secs); + LOG_INFO(" Buffer: size=%u cached=%u", rs->current_size, rs->current_cached); + LOG_INFO(" Advertisements: sent=%u expired=%u received=%u advertisers=%u missed_packets=%u", rs->adverts_sent, + rs->expired_sent, rs->adverts_received, rs->unique_advertisers, rs->packets_missed); + LOG_INFO(" Requests: sent=%u packets=%u prio=%u received=%u requestors=%u throttled=%u", rs->requests_sent, + rs->requests_sent_packets, rs->requests_sent_packets_prio, rs->requests_received, rs->unique_requestors, + rs->throttled_requestors); + LOG_INFO(" Replays: packets=%u prio=%u", rs->packets_replayed, rs->packets_replayed_prio); + LOG_INFO(" Rebroadcasts: packets=%u prio=%u", rs->packets_rebroadcast, rs->packets_rebroadcast_prio); + for (unsigned int i = 0; i < rs->servers_count; i++) { + meshtastic_ReplayServerStats *s = &rs->servers[i]; + LOG_INFO(" Server 0x%08x: adverts=%u requests=%u missed=%u last_advert=%us router=%u prio=%u", s->id, + s->adverts_received, s->requests_sent, s->packets_missed, s->last_advert_secs, s->is_router, s->priority); + } +} + /** * Handle thread notifications */ @@ -1092,10 +1268,16 @@ void ReplayModule::onNotify(uint32_t notification) if (packets_since_advert > REPLAY_FLUSH_PACKETS || deadline <= now) advertise(); + if (last_stats_millis + REPLAY_STATS_INTERVAL_SECS * 1000 <= now) + sendStats(); + if (replay_from >= buffer.getTailCursor() && replay_from) { // We still have packets pending replay notifyLater(REPLAY_SPACING_MS, REPLAY_NOTIFY_REPLAY, true); } else if (deadline > now) { + if (last_stats_millis + REPLAY_STATS_INTERVAL_SECS * 1000 < deadline) + deadline = last_stats_millis + REPLAY_STATS_INTERVAL_SECS * 1000; + // Sleep until the next advert deadline LOG_DEBUG("Sleep to deadline %ld", deadline - now); notifyLater(deadline - now, REPLAY_NOTIFY_INTERVAL, false); diff --git a/src/modules/ReplayModule.h b/src/modules/ReplayModule.h index d15a2cc746c..d1ce6167ac1 100644 --- a/src/modules/ReplayModule.h +++ b/src/modules/ReplayModule.h @@ -12,6 +12,8 @@ #define REPLAY_BUFFER_CACHE_MAX REPLAY_BUFFER_SIZE // Cache at most this many packets #define REPLAY_QUEUE_MASK 0x0F // Mask for wrapping the replay queue index #define REPLAY_QUEUE_SIZE (REPLAY_QUEUE_MASK + 1) // Size of the replay +#define REPLAY_STATS_MASK 0x7F // Mask for wrapping the stats index +#define REPLAY_STATS_SIZE (REPLAY_STATS_MASK + 1) // Size of the stats array #define REPLAY_FLUSH_PACKETS 16 // Send an advertisement after at most this many packets #define REPLAY_FLUSH_SECS 20 // Send an advertisement after at most this many seconds (if unadvertised packets are pending) #define REPLAY_STARTUP_DELAY_SECS 30 // Wait this many seconds after boot before sending the first advertisement @@ -34,6 +36,7 @@ #define REPLAY_CLIENT_RATE_MS 1000 // Allow at most one replay request per client every this many milliseconds on average #define REPLAY_CLIENT_SIZE 128 // Track at most this many clients #define REPLAY_CLIENT_THROTTLE_ADVERT_MAX 64 // Advertise at most this many throttled clients at a time +#define REPLAY_STATS_INTERVAL_SECS 900 // Send statistics every n seconds #define REPLAY_REQUEST_TYPE_ADVERTISEMENT 0 // Request an advertisement #define REPLAY_REQUEST_TYPE_PACKETS 1 // Request a replay of the specified packets @@ -116,6 +119,7 @@ typedef struct ReplayServerInfo { unsigned long missing_sequence = 0; unsigned int replays_requested = 0; unsigned int adverts_received = 0; + unsigned int packets_missed = 0; bool flag_priority = false; bool flag_router = false; bool is_tracked = false; @@ -137,6 +141,26 @@ typedef struct ReplayRequestInfo { unsigned long timeout_millis = 0; } ReplayRequestInfo; +typedef struct ReplayStats { + NodeNum id = 0; + uint8_t adverts_from = 0; // Number of adverts received from this node + uint8_t expired_from = 0; // Number of expiry adverts received from this node + uint8_t missed_from = 0; // Number of missed adverts & packets sent by this node + uint8_t requests_from = 0; // Number of requests received from this node + uint8_t throttled_from = 0; // Number of times we were throttled by this node + uint8_t requests_to = 0; // Number of requests sent to this node + uint8_t replays_for = 0; // Number of packets replayed for this node + union { + uint8_t bitfield = 0; + struct { + uint8_t is_router : 1; // This node is a router + uint8_t throttled : 1; // This node was throttled at some point within the stats window + uint8_t priority : 1; // This node indicated priority constraints at some point within the stats window + uint8_t reserved : 5; // Reserved for future use + }; + }; +} ReplayStats; + class ReplayBuffer { public: @@ -179,6 +203,7 @@ class ReplayModule : public SinglePortModule, private concurrency::NotifiedWorke ReplayCursor last_advert_cursor = 0; unsigned long last_advert_millis = 0; unsigned long last_expired_millis = 0; + unsigned long last_stats_millis = 0; unsigned int packets_since_advert = 0; unsigned int next_sequence = 0; std::bitset dirty = {}; @@ -189,6 +214,8 @@ class ReplayModule : public SinglePortModule, private concurrency::NotifiedWorke ReplayServerInfo servers[REPLAY_TRACK_SERVERS] = {}; ReplayClientInfo clients[REPLAY_CLIENT_SIZE] = {}; ReplayRequestInfo requests[REPLAY_REQUEST_MAX_OUTSTANDING] = {}; + ReplayStats stats[REPLAY_STATS_SIZE] = {}; + ReplayCursor stats_next = 0; ReplayCursor memory_next = 1; ReplayCursor replay_from = 0; ReplayCursor queue[REPLAY_QUEUE_SIZE] = {}; @@ -197,6 +224,18 @@ class ReplayModule : public SinglePortModule, private concurrency::NotifiedWorke ReplayCursor queue_length = 0; bool want_replay_prio = false; bool want_replay_expired = false; + struct { + unsigned int adverts_sent = 0; + unsigned int adverts_sent_agg = 0; + unsigned int adverts_sent_expired = 0; + unsigned int packets_rebroadcast = 0; + unsigned int packets_rebroadcast_prio = 0; + unsigned int packets_replayed = 0; + unsigned int packets_replayed_prio = 0; + unsigned int packets_requested = 0; + unsigned int packets_requested_prio = 0; + unsigned int window_start_millis = 0; + } metrics; ReplayClientInfo *client(NodeNum id); void advertise(bool aggregate = false, unsigned int from_sequence = 0, ReplayMap aggregate_mask = 0); void advertiseExpired(); @@ -213,6 +252,10 @@ class ReplayModule : public SinglePortModule, private concurrency::NotifiedWorke ReplayRequestInfo *requestInfo(ReplayHash hash); bool queuePush(ReplayCursor idx); void invalidateServer(ReplayServerInfo *server, bool stats = false); + ReplayStats *getStats(NodeNum id); + void resetStats(); + void sendStats(); + void printStats(meshtastic_ReplayStats *rs); void onNotify(uint32_t notification); }; From 4a707f7656aeb68c253761616985c8b9864ea09c Mon Sep 17 00:00:00 2001 From: Steve Gilberd Date: Tue, 23 Sep 2025 17:04:57 +1200 Subject: [PATCH 09/10] Collate requests-to stat --- src/modules/ReplayModule.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/modules/ReplayModule.cpp b/src/modules/ReplayModule.cpp index 374a792a55c..4038d13843f 100644 --- a/src/modules/ReplayModule.cpp +++ b/src/modules/ReplayModule.cpp @@ -1195,6 +1195,8 @@ void ReplayModule::sendStats() rs.requests_received += s->requests_from; rs.unique_requestors++; } + if (s->requests_to) + rs.requests_sent += s->requests_to; if (s->throttled) rs.throttled_requestors++; } From 88647b0945eeb8387d2a10befba7731ddfcb6920 Mon Sep 17 00:00:00 2001 From: Steve Gilberd Date: Thu, 25 Sep 2025 16:04:16 +1200 Subject: [PATCH 10/10] Reinstate accidentally-removed printPacket --- src/mesh/RadioLibInterface.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mesh/RadioLibInterface.cpp b/src/mesh/RadioLibInterface.cpp index 9d37d321b70..1a4a93ad2bf 100644 --- a/src/mesh/RadioLibInterface.cpp +++ b/src/mesh/RadioLibInterface.cpp @@ -496,6 +496,8 @@ void RadioLibInterface::handleReceiveInterrupt() airTime->logAirtime(RX_LOG, xmitMsec); + printPacket("Lora RX", mp); + #if !MESHTASTIC_EXCLUDE_REPLAY if (REPLAY_FAKE_PACKET_LOSS_PERCENT && (rand() % 100 < REPLAY_FAKE_PACKET_LOSS_PERCENT)) { packetPool.release(mp);