From 23a18b5019f3ee3c7dd327b472b688980bc98fe9 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Fri, 12 Dec 2025 16:52:54 +0100 Subject: [PATCH] Add ConnPoolListener support to RouteSegmentedConnPool Add optional ConnPoolListener callbacks to RouteSegmentedConnPool to keep pool event subscription consistent across pool implementations. Callbacks are invoked on successful lease and on release. When no listener is configured (default), the hot path is a single null-check. Listener failures are ignored to avoid affecting pool correctness. --- .../hc/core5/pool/RouteSegmentedConnPool.java | 202 +++++++++++++----- 1 file changed, 147 insertions(+), 55 deletions(-) diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java index d49744a2f..667519137 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java @@ -30,20 +30,19 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -62,11 +61,12 @@ import org.apache.hc.core5.util.Timeout; /** - * Lock-free, route-segmented connection pool with tiny, conditional round-robin assistance. + * Route-segmented connection pool with reduced contention and a small, conditional + * round-robin helper for busy multi-route scenarios. * *

Per-route state is kept in independent segments. Disposal of connections is offloaded - * to a bounded executor so slow closes do not block threads leasing on other routes. - * A minimal round-robin drainer is engaged only when there are many pending routes and + * to a bounded executor so slow graceful closes do not block threads leasing on other routes. + * A minimal round-robin helper is engaged only when there are many pending routes and * there is global headroom; it never scans all routes.

* * @param route key type @@ -80,14 +80,19 @@ @Experimental public final class RouteSegmentedConnPool implements ManagedConnPool { - // Tiny RR assist: only engage when there are many distinct routes waiting and there is headroom. - private static final int RR_MIN_PENDING_ROUTES = 12; + // Engage RR helper only when there are enough pending routes to benefit. + private static final int RR_MIN_PENDING_ROUTES = 8; private static final int RR_BUDGET = 64; + private static final int RR_INLINE_FALLBACK_BUDGET = 8; private final PoolReusePolicy reusePolicy; private final TimeValue timeToLive; + private final long ttlMillis; + private final boolean hasTtl; private final DisposalCallback disposal; + private final ConnPoolListener connPoolListener; + private final AtomicInteger defaultMaxPerRoute = new AtomicInteger(5); private final ConcurrentHashMap segments = new ConcurrentHashMap<>(); @@ -97,7 +102,7 @@ public final class RouteSegmentedConnPool implement private final AtomicBoolean closed = new AtomicBoolean(false); - private final ScheduledExecutorService timeouts; + private final ScheduledThreadPoolExecutor timeouts; /** * Dedicated executor for asynchronous, best-effort disposal. @@ -116,24 +121,46 @@ public RouteSegmentedConnPool( final TimeValue timeToLive, final PoolReusePolicy reusePolicy, final DisposalCallback disposal) { + this(defaultMaxPerRoute, maxTotal, timeToLive, reusePolicy, disposal, null); + } + + public RouteSegmentedConnPool( + final int defaultMaxPerRoute, + final int maxTotal, + final TimeValue timeToLive, + final PoolReusePolicy reusePolicy, + final DisposalCallback disposal, + final ConnPoolListener connPoolListener) { this.defaultMaxPerRoute.set(defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 5); this.maxTotal.set(maxTotal > 0 ? maxTotal : 25); this.timeToLive = timeToLive != null ? timeToLive : TimeValue.NEG_ONE_MILLISECOND; + + if (this.timeToLive.getDuration() >= 0) { + this.ttlMillis = this.timeToLive.toMilliseconds(); + this.hasTtl = true; + } else { + this.ttlMillis = -1L; + this.hasTtl = false; + } + this.reusePolicy = reusePolicy != null ? reusePolicy : PoolReusePolicy.LIFO; this.disposal = Args.notNull(disposal, "disposal"); + this.connPoolListener = connPoolListener; final ThreadFactory tf = r -> { final Thread t = new Thread(r, "seg-pool-timeouts"); t.setDaemon(true); return t; }; - this.timeouts = Executors.newSingleThreadScheduledExecutor(tf); + this.timeouts = new ScheduledThreadPoolExecutor(1, tf); + this.timeouts.setRemoveOnCancelPolicy(true); // Asynchronous disposer for slow GRACEFUL closes. final int cores = Math.max(2, Runtime.getRuntime().availableProcessors()); - final int nThreads = Math.min(8, Math.max(2, cores)); // allow up to 8 on bigger boxes + final int nThreads = Math.min(8, cores); final int qsize = 1024; + final ThreadFactory df = r -> { final Thread t = new Thread(r, "seg-pool-disposer"); t.setDaemon(true); @@ -144,7 +171,7 @@ public RouteSegmentedConnPool( 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(qsize), df, - new ThreadPoolExecutor.AbortPolicy()); // but we preflight capacity to avoid exception storms + new ThreadPoolExecutor.AbortPolicy()); } final class Segment { @@ -161,18 +188,30 @@ int limitPerRoute(final R route) { final class Waiter extends CompletableFuture> { final R route; + final Segment seg; final Timeout requestTimeout; final Object state; volatile boolean cancelled; volatile ScheduledFuture timeoutTask; - Waiter(final R route, final Timeout t, final Object s) { + Waiter(final R route, final Segment seg, final Timeout t, final Object s) { this.route = route; + this.seg = seg; this.requestTimeout = t != null ? t : Timeout.DISABLED; this.state = s; this.cancelled = false; this.timeoutTask = null; } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + this.cancelled = true; + cancelTimeout(this); + this.seg.waiters.remove(this); + dequeueIfDrained(this.seg); + maybeCleanupSegment(this.route, this.seg); + return super.cancel(mayInterruptIfRunning); + } } @Override @@ -193,13 +232,14 @@ public Future> lease( break; } final long now = System.currentTimeMillis(); - if (hit.getExpiryDeadline().isBefore(now) || isPastTtl(hit)) { + if (hit.getExpiryDeadline().isBefore(now) || isPastTtl(hit, now)) { discardAndDecr(hit, CloseMode.GRACEFUL); continue; } break; } if (hit != null) { + fireOnLease(route); if (callback != null) { callback.completed(hit); } @@ -209,6 +249,7 @@ public Future> lease( // 2) Try to allocate new within caps if (tryAllocateOne(route, seg)) { final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal); + fireOnLease(route); if (callback != null) { callback.completed(entry); } @@ -216,7 +257,7 @@ public Future> lease( } // 3) Enqueue waiter with timeout - final Waiter w = new Waiter(route, requestTimeout, state); + final Waiter w = new Waiter(route, seg, requestTimeout, state); seg.waiters.addLast(w); enqueueIfNeeded(route, seg); @@ -225,6 +266,7 @@ public Future> lease( if (late != null) { if (seg.waiters.remove(w)) { cancelTimeout(w); + fireOnLease(route); if (callback != null) { callback.completed(late); } @@ -238,6 +280,7 @@ public Future> lease( cancelTimeout(other); handedOff = other.complete(late); if (handedOff) { + fireOnLease(other.route); break; } } @@ -253,7 +296,13 @@ public Future> lease( if (callback != null) { w.whenComplete((pe, ex) -> { if (ex != null) { - callback.failed(ex instanceof Exception ? (Exception) ex : new Exception(ex)); + final Exception failure; + if (ex instanceof Exception) { + failure = (Exception) ex; + } else { + failure = new Exception(ex); + } + callback.failed(failure); } else { callback.completed(pe); } @@ -272,13 +321,13 @@ public void release(final PoolEntry entry, final boolean reusable) { final R route = entry.getRoute(); final Segment seg = segments.get(route); if (seg == null) { - // Segment got removed; dispose off-thread and bail. discardEntry(entry, CloseMode.GRACEFUL); + fireOnRelease(route); return; } final long now = System.currentTimeMillis(); - final boolean stillValid = reusable && !isPastTtl(entry) && !entry.getExpiryDeadline().isBefore(now); + final boolean stillValid = reusable && !isPastTtl(entry, now) && !entry.getExpiryDeadline().isBefore(now); if (stillValid) { if (!handOffToCompatibleWaiter(entry, seg)) { @@ -291,6 +340,7 @@ public void release(final PoolEntry entry, final boolean reusable) { } maybeCleanupSegment(route, seg); + fireOnRelease(route); } @Override @@ -319,9 +369,10 @@ public void close(final CloseMode closeMode) { pendingRouteCount.decrementAndGet(); } - // discard available for (final PoolEntry p : seg.available) { - discardEntry(p, closeMode); + if (seg.available.remove(p)) { + discardEntry(p, closeMode); + } } seg.available.clear(); @@ -349,13 +400,14 @@ public void closeIdle(final TimeValue idleTime) { int processed = 0; final int cap = 64; - for (final Iterator> it = seg.available.iterator(); it.hasNext(); ) { - final PoolEntry p = it.next(); + + for (final PoolEntry p : seg.available) { if (p.getUpdated() <= cutoff) { - it.remove(); - discardAndDecr(p, CloseMode.GRACEFUL); - if (++processed == cap) { - break; + if (seg.available.remove(p)) { + discardAndDecr(p, CloseMode.GRACEFUL); + if (++processed == cap) { + break; + } } } } @@ -373,13 +425,14 @@ public void closeExpired() { int processed = 0; final int cap = 64; - for (final Iterator> it = seg.available.iterator(); it.hasNext(); ) { - final PoolEntry p = it.next(); - if (p.getExpiryDeadline().isBefore(now) || isPastTtl(p)) { - it.remove(); - discardAndDecr(p, CloseMode.GRACEFUL); - if (++processed == cap) { - break; + + for (final PoolEntry p : seg.available) { + if (p.getExpiryDeadline().isBefore(now) || isPastTtl(p, now)) { + if (seg.available.remove(p)) { + discardAndDecr(p, CloseMode.GRACEFUL); + if (++processed == cap) { + break; + } } } } @@ -466,11 +519,11 @@ private void ensureOpen() { } } - private boolean isPastTtl(final PoolEntry p) { - if (timeToLive == null || timeToLive.getDuration() < 0) { + private boolean isPastTtl(final PoolEntry entry, final long now) { + if (!hasTtl) { return false; } - return System.currentTimeMillis() - p.getCreated() >= timeToLive.toMilliseconds(); + return now - entry.getCreated() >= ttlMillis; } private void scheduleTimeout(final Waiter w, final Segment seg) { @@ -489,7 +542,6 @@ private void scheduleTimeout(final Waiter w, final Segment seg) { final PoolEntry p = pollAvailable(seg, w.state); if (p != null) { - // Try to hand off that available entry to some other compatible waiter. if (!handOffToCompatibleWaiter(p, seg)) { offerAvailable(seg, p); } @@ -516,11 +568,11 @@ private PoolEntry pollAvailable(final Segment seg, final Object neededStat if (neededState == null) { return seg.available.pollFirst(); } - for (final Iterator> it = seg.available.iterator(); it.hasNext(); ) { - final PoolEntry p = it.next(); + for (final PoolEntry p : seg.available) { if (compatible(neededState, p.getState())) { - it.remove(); - return p; + if (seg.available.remove(p)) { + return p; + } } } return null; @@ -533,6 +585,7 @@ private boolean compatible(final Object needed, final Object have) { private boolean handOffToCompatibleWaiter(final PoolEntry entry, final Segment seg) { final Deque skipped = new ArrayDeque<>(); boolean handedOff = false; + for (; ; ) { final Waiter w = seg.waiters.pollFirst(); if (w == null) { @@ -545,6 +598,7 @@ private boolean handOffToCompatibleWaiter(final PoolEntry entry, final Seg cancelTimeout(w); handedOff = w.complete(entry); if (handedOff) { + fireOnLease(w.route); dequeueIfDrained(seg); break; } @@ -552,7 +606,7 @@ private boolean handOffToCompatibleWaiter(final PoolEntry entry, final Seg skipped.addLast(w); } } - // Restore non-compatible waiters to the head to preserve ordering. + while (!skipped.isEmpty()) { seg.waiters.addFirst(skipped.pollLast()); } @@ -568,10 +622,6 @@ private void discardAndDecr(final PoolEntry p, final CloseMode mode) { discardEntry(p, mode); } - private CloseMode orImmediate(final CloseMode m) { - return m != null ? m : CloseMode.IMMEDIATE; - } - private void maybeCleanupSegment(final R route, final Segment seg) { if (seg.allocated.get() == 0 && seg.available.isEmpty() && seg.waiters.isEmpty()) { segments.remove(route, seg); @@ -617,7 +667,6 @@ private void dequeueIfDrained(final Segment seg) { } private void triggerDrainIfMany() { - // Engage RR only if there is global headroom and many distinct routes pending if (pendingRouteCount.get() < RR_MIN_PENDING_ROUTES) { return; } @@ -627,7 +676,8 @@ private void triggerDrainIfMany() { if (!draining.compareAndSet(false, true)) { return; } - disposer.execute(() -> { + + final Runnable task = () -> { try { serveRoundRobin(RR_BUDGET); } finally { @@ -638,11 +688,23 @@ private void triggerDrainIfMany() { triggerDrainIfMany(); } } - }); + }; + + try { + disposer.execute(task); + } catch (final RejectedExecutionException rejected) { + // Disposer saturated: keep pool correct; do a tiny bounded inline drain. + try { + serveRoundRobin(RR_INLINE_FALLBACK_BUDGET); + } finally { + draining.set(false); + } + } } private void serveRoundRobin(final int budget) { int created = 0; + for (; created < budget; ) { final R route = pendingQueue.poll(); if (route == null) { @@ -660,19 +722,19 @@ private void serveRoundRobin(final int budget) { } if (!tryAllocateOne(route, seg)) { - // No headroom or hit per-route cap. Re-queue for later. pendingQueue.offer(route); continue; } final Waiter w = seg.waiters.pollFirst(); - if (w == null || w.cancelled) { + if (w == null || w.cancelled || w.isDone()) { seg.allocated.decrementAndGet(); totalAllocated.decrementAndGet(); } else { final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal); cancelTimeout(w); w.complete(entry); + fireOnLease(w.route); created++; } @@ -687,16 +749,24 @@ private void serveRoundRobin(final int budget) { } /** - * Dispose a pool entry's connection asynchronously if possible; under pressure fall back to IMMEDIATE on caller. + * Dispose a pool entry's connection asynchronously if possible; under pressure fall back + * to IMMEDIATE close on the caller. */ private void discardEntry(final PoolEntry p, final CloseMode preferred) { - final CloseMode mode = orImmediate(preferred); - // Pre-flight capacity to avoid exception storms under saturation + final CloseMode mode = preferred != null ? preferred : CloseMode.IMMEDIATE; + + // Keep immediate closes predictable: do it now. + if (mode == CloseMode.IMMEDIATE) { + p.discardConnection(CloseMode.IMMEDIATE); + return; + } + if (disposer.isShutdown()) { p.discardConnection(CloseMode.IMMEDIATE); return; } - final LinkedBlockingQueue q = (LinkedBlockingQueue) disposer.getQueue(); + + final BlockingQueue q = disposer.getQueue(); if (q.remainingCapacity() == 0) { p.discardConnection(CloseMode.IMMEDIATE); return; @@ -714,4 +784,26 @@ private void discardEntry(final PoolEntry p, final CloseMode preferred) { p.discardConnection(CloseMode.IMMEDIATE); } } + + private void fireOnLease(final R route) { + final ConnPoolListener listener = this.connPoolListener; + if (listener != null) { + try { + listener.onLease(route, this); + } catch (final RuntimeException ignore) { + // ignore + } + } + } + + private void fireOnRelease(final R route) { + final ConnPoolListener listener = this.connPoolListener; + if (listener != null) { + try { + listener.onRelease(route, this); + } catch (final RuntimeException ignore) { + // ignore + } + } + } }