diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java
index d49744a2f..667519137 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java
@@ -30,20 +30,19 @@
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -62,11 +61,12 @@
import org.apache.hc.core5.util.Timeout;
/**
- * Lock-free, route-segmented connection pool with tiny, conditional round-robin assistance.
+ * Route-segmented connection pool with reduced contention and a small, conditional
+ * round-robin helper for busy multi-route scenarios.
*
*
Per-route state is kept in independent segments. Disposal of connections is offloaded
- * to a bounded executor so slow closes do not block threads leasing on other routes.
- * A minimal round-robin drainer is engaged only when there are many pending routes and
+ * to a bounded executor so slow graceful closes do not block threads leasing on other routes.
+ * A minimal round-robin helper is engaged only when there are many pending routes and
* there is global headroom; it never scans all routes.
*
* @param route key type
@@ -80,14 +80,19 @@
@Experimental
public final class RouteSegmentedConnPool implements ManagedConnPool {
- // Tiny RR assist: only engage when there are many distinct routes waiting and there is headroom.
- private static final int RR_MIN_PENDING_ROUTES = 12;
+ // Engage RR helper only when there are enough pending routes to benefit.
+ private static final int RR_MIN_PENDING_ROUTES = 8;
private static final int RR_BUDGET = 64;
+ private static final int RR_INLINE_FALLBACK_BUDGET = 8;
private final PoolReusePolicy reusePolicy;
private final TimeValue timeToLive;
+ private final long ttlMillis;
+ private final boolean hasTtl;
private final DisposalCallback disposal;
+ private final ConnPoolListener connPoolListener;
+
private final AtomicInteger defaultMaxPerRoute = new AtomicInteger(5);
private final ConcurrentHashMap segments = new ConcurrentHashMap<>();
@@ -97,7 +102,7 @@ public final class RouteSegmentedConnPool implement
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final ScheduledExecutorService timeouts;
+ private final ScheduledThreadPoolExecutor timeouts;
/**
* Dedicated executor for asynchronous, best-effort disposal.
@@ -116,24 +121,46 @@ public RouteSegmentedConnPool(
final TimeValue timeToLive,
final PoolReusePolicy reusePolicy,
final DisposalCallback disposal) {
+ this(defaultMaxPerRoute, maxTotal, timeToLive, reusePolicy, disposal, null);
+ }
+
+ public RouteSegmentedConnPool(
+ final int defaultMaxPerRoute,
+ final int maxTotal,
+ final TimeValue timeToLive,
+ final PoolReusePolicy reusePolicy,
+ final DisposalCallback disposal,
+ final ConnPoolListener connPoolListener) {
this.defaultMaxPerRoute.set(defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 5);
this.maxTotal.set(maxTotal > 0 ? maxTotal : 25);
this.timeToLive = timeToLive != null ? timeToLive : TimeValue.NEG_ONE_MILLISECOND;
+
+ if (this.timeToLive.getDuration() >= 0) {
+ this.ttlMillis = this.timeToLive.toMilliseconds();
+ this.hasTtl = true;
+ } else {
+ this.ttlMillis = -1L;
+ this.hasTtl = false;
+ }
+
this.reusePolicy = reusePolicy != null ? reusePolicy : PoolReusePolicy.LIFO;
this.disposal = Args.notNull(disposal, "disposal");
+ this.connPoolListener = connPoolListener;
final ThreadFactory tf = r -> {
final Thread t = new Thread(r, "seg-pool-timeouts");
t.setDaemon(true);
return t;
};
- this.timeouts = Executors.newSingleThreadScheduledExecutor(tf);
+ this.timeouts = new ScheduledThreadPoolExecutor(1, tf);
+ this.timeouts.setRemoveOnCancelPolicy(true);
// Asynchronous disposer for slow GRACEFUL closes.
final int cores = Math.max(2, Runtime.getRuntime().availableProcessors());
- final int nThreads = Math.min(8, Math.max(2, cores)); // allow up to 8 on bigger boxes
+ final int nThreads = Math.min(8, cores);
final int qsize = 1024;
+
final ThreadFactory df = r -> {
final Thread t = new Thread(r, "seg-pool-disposer");
t.setDaemon(true);
@@ -144,7 +171,7 @@ public RouteSegmentedConnPool(
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(qsize),
df,
- new ThreadPoolExecutor.AbortPolicy()); // but we preflight capacity to avoid exception storms
+ new ThreadPoolExecutor.AbortPolicy());
}
final class Segment {
@@ -161,18 +188,30 @@ int limitPerRoute(final R route) {
final class Waiter extends CompletableFuture> {
final R route;
+ final Segment seg;
final Timeout requestTimeout;
final Object state;
volatile boolean cancelled;
volatile ScheduledFuture> timeoutTask;
- Waiter(final R route, final Timeout t, final Object s) {
+ Waiter(final R route, final Segment seg, final Timeout t, final Object s) {
this.route = route;
+ this.seg = seg;
this.requestTimeout = t != null ? t : Timeout.DISABLED;
this.state = s;
this.cancelled = false;
this.timeoutTask = null;
}
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ this.cancelled = true;
+ cancelTimeout(this);
+ this.seg.waiters.remove(this);
+ dequeueIfDrained(this.seg);
+ maybeCleanupSegment(this.route, this.seg);
+ return super.cancel(mayInterruptIfRunning);
+ }
}
@Override
@@ -193,13 +232,14 @@ public Future> lease(
break;
}
final long now = System.currentTimeMillis();
- if (hit.getExpiryDeadline().isBefore(now) || isPastTtl(hit)) {
+ if (hit.getExpiryDeadline().isBefore(now) || isPastTtl(hit, now)) {
discardAndDecr(hit, CloseMode.GRACEFUL);
continue;
}
break;
}
if (hit != null) {
+ fireOnLease(route);
if (callback != null) {
callback.completed(hit);
}
@@ -209,6 +249,7 @@ public Future> lease(
// 2) Try to allocate new within caps
if (tryAllocateOne(route, seg)) {
final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal);
+ fireOnLease(route);
if (callback != null) {
callback.completed(entry);
}
@@ -216,7 +257,7 @@ public Future> lease(
}
// 3) Enqueue waiter with timeout
- final Waiter w = new Waiter(route, requestTimeout, state);
+ final Waiter w = new Waiter(route, seg, requestTimeout, state);
seg.waiters.addLast(w);
enqueueIfNeeded(route, seg);
@@ -225,6 +266,7 @@ public Future> lease(
if (late != null) {
if (seg.waiters.remove(w)) {
cancelTimeout(w);
+ fireOnLease(route);
if (callback != null) {
callback.completed(late);
}
@@ -238,6 +280,7 @@ public Future> lease(
cancelTimeout(other);
handedOff = other.complete(late);
if (handedOff) {
+ fireOnLease(other.route);
break;
}
}
@@ -253,7 +296,13 @@ public Future> lease(
if (callback != null) {
w.whenComplete((pe, ex) -> {
if (ex != null) {
- callback.failed(ex instanceof Exception ? (Exception) ex : new Exception(ex));
+ final Exception failure;
+ if (ex instanceof Exception) {
+ failure = (Exception) ex;
+ } else {
+ failure = new Exception(ex);
+ }
+ callback.failed(failure);
} else {
callback.completed(pe);
}
@@ -272,13 +321,13 @@ public void release(final PoolEntry entry, final boolean reusable) {
final R route = entry.getRoute();
final Segment seg = segments.get(route);
if (seg == null) {
- // Segment got removed; dispose off-thread and bail.
discardEntry(entry, CloseMode.GRACEFUL);
+ fireOnRelease(route);
return;
}
final long now = System.currentTimeMillis();
- final boolean stillValid = reusable && !isPastTtl(entry) && !entry.getExpiryDeadline().isBefore(now);
+ final boolean stillValid = reusable && !isPastTtl(entry, now) && !entry.getExpiryDeadline().isBefore(now);
if (stillValid) {
if (!handOffToCompatibleWaiter(entry, seg)) {
@@ -291,6 +340,7 @@ public void release(final PoolEntry entry, final boolean reusable) {
}
maybeCleanupSegment(route, seg);
+ fireOnRelease(route);
}
@Override
@@ -319,9 +369,10 @@ public void close(final CloseMode closeMode) {
pendingRouteCount.decrementAndGet();
}
- // discard available
for (final PoolEntry p : seg.available) {
- discardEntry(p, closeMode);
+ if (seg.available.remove(p)) {
+ discardEntry(p, closeMode);
+ }
}
seg.available.clear();
@@ -349,13 +400,14 @@ public void closeIdle(final TimeValue idleTime) {
int processed = 0;
final int cap = 64;
- for (final Iterator> it = seg.available.iterator(); it.hasNext(); ) {
- final PoolEntry p = it.next();
+
+ for (final PoolEntry p : seg.available) {
if (p.getUpdated() <= cutoff) {
- it.remove();
- discardAndDecr(p, CloseMode.GRACEFUL);
- if (++processed == cap) {
- break;
+ if (seg.available.remove(p)) {
+ discardAndDecr(p, CloseMode.GRACEFUL);
+ if (++processed == cap) {
+ break;
+ }
}
}
}
@@ -373,13 +425,14 @@ public void closeExpired() {
int processed = 0;
final int cap = 64;
- for (final Iterator> it = seg.available.iterator(); it.hasNext(); ) {
- final PoolEntry p = it.next();
- if (p.getExpiryDeadline().isBefore(now) || isPastTtl(p)) {
- it.remove();
- discardAndDecr(p, CloseMode.GRACEFUL);
- if (++processed == cap) {
- break;
+
+ for (final PoolEntry p : seg.available) {
+ if (p.getExpiryDeadline().isBefore(now) || isPastTtl(p, now)) {
+ if (seg.available.remove(p)) {
+ discardAndDecr(p, CloseMode.GRACEFUL);
+ if (++processed == cap) {
+ break;
+ }
}
}
}
@@ -466,11 +519,11 @@ private void ensureOpen() {
}
}
- private boolean isPastTtl(final PoolEntry p) {
- if (timeToLive == null || timeToLive.getDuration() < 0) {
+ private boolean isPastTtl(final PoolEntry entry, final long now) {
+ if (!hasTtl) {
return false;
}
- return System.currentTimeMillis() - p.getCreated() >= timeToLive.toMilliseconds();
+ return now - entry.getCreated() >= ttlMillis;
}
private void scheduleTimeout(final Waiter w, final Segment seg) {
@@ -489,7 +542,6 @@ private void scheduleTimeout(final Waiter w, final Segment seg) {
final PoolEntry p = pollAvailable(seg, w.state);
if (p != null) {
- // Try to hand off that available entry to some other compatible waiter.
if (!handOffToCompatibleWaiter(p, seg)) {
offerAvailable(seg, p);
}
@@ -516,11 +568,11 @@ private PoolEntry pollAvailable(final Segment seg, final Object neededStat
if (neededState == null) {
return seg.available.pollFirst();
}
- for (final Iterator> it = seg.available.iterator(); it.hasNext(); ) {
- final PoolEntry p = it.next();
+ for (final PoolEntry p : seg.available) {
if (compatible(neededState, p.getState())) {
- it.remove();
- return p;
+ if (seg.available.remove(p)) {
+ return p;
+ }
}
}
return null;
@@ -533,6 +585,7 @@ private boolean compatible(final Object needed, final Object have) {
private boolean handOffToCompatibleWaiter(final PoolEntry entry, final Segment seg) {
final Deque skipped = new ArrayDeque<>();
boolean handedOff = false;
+
for (; ; ) {
final Waiter w = seg.waiters.pollFirst();
if (w == null) {
@@ -545,6 +598,7 @@ private boolean handOffToCompatibleWaiter(final PoolEntry entry, final Seg
cancelTimeout(w);
handedOff = w.complete(entry);
if (handedOff) {
+ fireOnLease(w.route);
dequeueIfDrained(seg);
break;
}
@@ -552,7 +606,7 @@ private boolean handOffToCompatibleWaiter(final PoolEntry entry, final Seg
skipped.addLast(w);
}
}
- // Restore non-compatible waiters to the head to preserve ordering.
+
while (!skipped.isEmpty()) {
seg.waiters.addFirst(skipped.pollLast());
}
@@ -568,10 +622,6 @@ private void discardAndDecr(final PoolEntry p, final CloseMode mode) {
discardEntry(p, mode);
}
- private CloseMode orImmediate(final CloseMode m) {
- return m != null ? m : CloseMode.IMMEDIATE;
- }
-
private void maybeCleanupSegment(final R route, final Segment seg) {
if (seg.allocated.get() == 0 && seg.available.isEmpty() && seg.waiters.isEmpty()) {
segments.remove(route, seg);
@@ -617,7 +667,6 @@ private void dequeueIfDrained(final Segment seg) {
}
private void triggerDrainIfMany() {
- // Engage RR only if there is global headroom and many distinct routes pending
if (pendingRouteCount.get() < RR_MIN_PENDING_ROUTES) {
return;
}
@@ -627,7 +676,8 @@ private void triggerDrainIfMany() {
if (!draining.compareAndSet(false, true)) {
return;
}
- disposer.execute(() -> {
+
+ final Runnable task = () -> {
try {
serveRoundRobin(RR_BUDGET);
} finally {
@@ -638,11 +688,23 @@ private void triggerDrainIfMany() {
triggerDrainIfMany();
}
}
- });
+ };
+
+ try {
+ disposer.execute(task);
+ } catch (final RejectedExecutionException rejected) {
+ // Disposer saturated: keep pool correct; do a tiny bounded inline drain.
+ try {
+ serveRoundRobin(RR_INLINE_FALLBACK_BUDGET);
+ } finally {
+ draining.set(false);
+ }
+ }
}
private void serveRoundRobin(final int budget) {
int created = 0;
+
for (; created < budget; ) {
final R route = pendingQueue.poll();
if (route == null) {
@@ -660,19 +722,19 @@ private void serveRoundRobin(final int budget) {
}
if (!tryAllocateOne(route, seg)) {
- // No headroom or hit per-route cap. Re-queue for later.
pendingQueue.offer(route);
continue;
}
final Waiter w = seg.waiters.pollFirst();
- if (w == null || w.cancelled) {
+ if (w == null || w.cancelled || w.isDone()) {
seg.allocated.decrementAndGet();
totalAllocated.decrementAndGet();
} else {
final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal);
cancelTimeout(w);
w.complete(entry);
+ fireOnLease(w.route);
created++;
}
@@ -687,16 +749,24 @@ private void serveRoundRobin(final int budget) {
}
/**
- * Dispose a pool entry's connection asynchronously if possible; under pressure fall back to IMMEDIATE on caller.
+ * Dispose a pool entry's connection asynchronously if possible; under pressure fall back
+ * to IMMEDIATE close on the caller.
*/
private void discardEntry(final PoolEntry p, final CloseMode preferred) {
- final CloseMode mode = orImmediate(preferred);
- // Pre-flight capacity to avoid exception storms under saturation
+ final CloseMode mode = preferred != null ? preferred : CloseMode.IMMEDIATE;
+
+ // Keep immediate closes predictable: do it now.
+ if (mode == CloseMode.IMMEDIATE) {
+ p.discardConnection(CloseMode.IMMEDIATE);
+ return;
+ }
+
if (disposer.isShutdown()) {
p.discardConnection(CloseMode.IMMEDIATE);
return;
}
- final LinkedBlockingQueue q = (LinkedBlockingQueue) disposer.getQueue();
+
+ final BlockingQueue q = disposer.getQueue();
if (q.remainingCapacity() == 0) {
p.discardConnection(CloseMode.IMMEDIATE);
return;
@@ -714,4 +784,26 @@ private void discardEntry(final PoolEntry p, final CloseMode preferred) {
p.discardConnection(CloseMode.IMMEDIATE);
}
}
+
+ private void fireOnLease(final R route) {
+ final ConnPoolListener listener = this.connPoolListener;
+ if (listener != null) {
+ try {
+ listener.onLease(route, this);
+ } catch (final RuntimeException ignore) {
+ // ignore
+ }
+ }
+ }
+
+ private void fireOnRelease(final R route) {
+ final ConnPoolListener listener = this.connPoolListener;
+ if (listener != null) {
+ try {
+ listener.onRelease(route, this);
+ } catch (final RuntimeException ignore) {
+ // ignore
+ }
+ }
+ }
}