From 81bde487f5695634e4a94a2da42fe79759fd9780 Mon Sep 17 00:00:00 2001 From: Balazs Meszaros Date: Wed, 10 Dec 2025 16:05:37 +0100 Subject: [PATCH] HBASE-29764 Make client connection headers accessible inside co-processors --- .../hbase/coprocessor/CoprocessorHost.java | 66 +++++++++---------- .../hbase/coprocessor/ObserverContext.java | 20 ++++-- .../coprocessor/ObserverContextImpl.java | 22 ++++--- .../coprocessor/ObserverRpcCallContext.java | 44 +++++++++++++ .../ObserverRpcCallContextImpl.java | 44 +++++++++++++ .../org/apache/hadoop/hbase/ipc/RpcCall.java | 2 + .../hadoop/hbase/ipc/RpcCoprocessorHost.java | 12 ++-- .../apache/hadoop/hbase/ipc/RpcServer.java | 4 ++ .../hbase/master/MasterCoprocessorHost.java | 12 ++-- .../regionserver/RegionCoprocessorHost.java | 12 ++-- .../RegionServerCoprocessorHost.java | 4 +- .../security/access/AccessController.java | 11 ++-- .../SnapshotScannerHDFSAclController.java | 11 ++-- 13 files changed, 187 insertions(+), 77 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverRpcCallContext.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverRpcCallContextImpl.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index 137fe3b061df..ecce28fc0994 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -27,7 +27,6 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; @@ -477,19 +476,18 @@ protected void handleCoprocessorThrowable(final E env, final Throwable e) throws } } - /** - * Used to limit legacy handling to once per Coprocessor class per classloader. - */ - private static final Set> legacyWarning = - new ConcurrentSkipListSet<>(new Comparator>() { - @Override - public int compare(Class c1, Class c2) { - if (c1.equals(c2)) { - return 0; - } - return c1.getName().compareTo(c2.getName()); - } - }); + protected static Optional createObserverRpcCallContext() { + return createObserverRpcCallContext(RpcServer.getRequestUser().orElse(null)); + } + + protected static Optional createObserverRpcCallContext(User user) { + if (user == null) { + /* not in RPC context */ + return Optional.empty(); + } else { + return Optional.of(new ObserverRpcCallContextImpl(user, RpcServer.getConnectionAttributes())); + } + } /** * Implementations defined function to get an observer of type {@code O} from a coprocessor of @@ -506,19 +504,17 @@ private abstract class ObserverOperation extends ObserverContextImpl { ObserverGetter observerGetter; ObserverOperation(ObserverGetter observerGetter) { - this(observerGetter, null); - } - - ObserverOperation(ObserverGetter observerGetter, User user) { - this(observerGetter, user, false); + this(observerGetter, createObserverRpcCallContext()); } - ObserverOperation(ObserverGetter observerGetter, boolean bypassable) { - this(observerGetter, null, bypassable); + ObserverOperation(ObserverGetter observerGetter, + Optional rpcCallContext) { + this(observerGetter, rpcCallContext, false); } - ObserverOperation(ObserverGetter observerGetter, User user, boolean bypassable) { - super(user != null ? user : RpcServer.getRequestUser().orElse(null), bypassable); + ObserverOperation(ObserverGetter observerGetter, + Optional rpcCallContext, boolean bypassable) { + super(rpcCallContext, bypassable); this.observerGetter = observerGetter; } @@ -538,13 +534,14 @@ public ObserverOperationWithoutResult(ObserverGetter observerGetter) { super(observerGetter); } - public ObserverOperationWithoutResult(ObserverGetter observerGetter, User user) { - super(observerGetter, user); + public ObserverOperationWithoutResult(ObserverGetter observerGetter, + Optional rpcCallContext) { + super(observerGetter, rpcCallContext); } - public ObserverOperationWithoutResult(ObserverGetter observerGetter, User user, - boolean bypassable) { - super(observerGetter, user, bypassable); + public ObserverOperationWithoutResult(ObserverGetter observerGetter, + Optional rpcCallContext, boolean bypassable) { + super(observerGetter, rpcCallContext, bypassable); } /** @@ -572,16 +569,17 @@ public ObserverOperationWithResult(ObserverGetter observerGetter, R result public ObserverOperationWithResult(ObserverGetter observerGetter, R result, boolean bypassable) { - this(observerGetter, result, null, bypassable); + this(observerGetter, result, createObserverRpcCallContext(), bypassable); } - public ObserverOperationWithResult(ObserverGetter observerGetter, R result, User user) { - this(observerGetter, result, user, false); + public ObserverOperationWithResult(ObserverGetter observerGetter, R result, + Optional rpcCallContext) { + this(observerGetter, result, rpcCallContext, false); } - private ObserverOperationWithResult(ObserverGetter observerGetter, R result, User user, - boolean bypassable) { - super(observerGetter, user, bypassable); + private ObserverOperationWithResult(ObserverGetter observerGetter, R result, + Optional rpcCallContext, boolean bypassable) { + super(observerGetter, rpcCallContext, bypassable); this.result = result; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java index c0fd791bcef8..e18f9aee6ef2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java @@ -70,10 +70,20 @@ public interface ObserverContext { void bypass(); /** - * Returns the active user for the coprocessor call. If an explicit {@code User} instance was - * provided to the constructor, that will be returned, otherwise if we are in the context of an - * RPC call, the remote user is used. May not be present if the execution is outside of an RPC - * context. + * Returns the {@link ObserverRpcCallContext} of an RPC call. May not be present if the execution + * is outside an RPC context. + * @return the context. */ - Optional getCaller(); + Optional getRpcCallContext(); + + /** + * Returns the active user for the coprocessor call. May not be present if the execution is + * outside an RPC context. + * @return the {@link User}. + * @deprecated will be removed in 4.0.0. Use {@link #getRpcCallContext()} instead. + */ + @Deprecated + default Optional getCaller() { + return getRpcCallContext().map(ObserverRpcCallContext::getUser); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContextImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContextImpl.java index a3a4a93005c0..cc9d7f499a0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContextImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContextImpl.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.coprocessor; +import java.util.Map; +import java.util.Objects; import java.util.Optional; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.ipc.RpcServer; @@ -35,14 +37,10 @@ public class ObserverContextImpl implements Ob * Is this operation bypassable? */ private final boolean bypassable; - private final User caller; + private final Optional rpcCallContext; - public ObserverContextImpl(User caller) { - this(caller, false); - } - - public ObserverContextImpl(User caller, boolean bypassable) { - this.caller = caller; + public ObserverContextImpl(Optional rpcCallContext, boolean bypassable) { + this.rpcCallContext = Objects.requireNonNull(rpcCallContext, "rpcCallContext cannot be null."); this.bypassable = bypassable; } @@ -83,8 +81,8 @@ public boolean shouldBypass() { } @Override - public Optional getCaller() { - return Optional.ofNullable(caller); + public Optional getRpcCallContext() { + return rpcCallContext; } /** @@ -98,7 +96,11 @@ public Optional getCaller() { @Deprecated // TODO: Remove this method, ObserverContext should not depend on RpcServer public static ObserverContext createAndPrepare(E env) { - ObserverContextImpl ctx = new ObserverContextImpl<>(RpcServer.getRequestUser().orElse(null)); + Optional user = RpcServer.getRequestUser(); + Optional rpcCallContext = + user.map(value -> new ObserverRpcCallContextImpl(value, Map.of())); + + ObserverContextImpl ctx = new ObserverContextImpl<>(rpcCallContext, false); ctx.prepare(env); return ctx; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverRpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverRpcCallContext.java new file mode 100644 index 000000000000..c5dc11969b40 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverRpcCallContext.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.util.Map; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * RPC Call parameters for coprocessor context. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface ObserverRpcCallContext { + /** + * Returns the active user for the coprocessor call. + * @return the {@link User}, it must not be {@code null}. + */ + User getUser(); + + /** + * Returns the connection attributes for the coprocessor call. These parameters are passed by the + * client through {@code ConnectionHeader} protobuf. + * @return the attributes, it must not be {@code null}. + */ + Map getAttributes(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverRpcCallContextImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverRpcCallContextImpl.java new file mode 100644 index 000000000000..1979474922df --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverRpcCallContextImpl.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.util.Map; +import java.util.Objects; +import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class ObserverRpcCallContextImpl implements ObserverRpcCallContext { + private final User user; + private final Map attributes; + + public ObserverRpcCallContextImpl(User user, Map attributes) { + this.user = Objects.requireNonNull(user, "user must not be null."); + this.attributes = Objects.requireNonNull(attributes, "attributes must not be null."); + } + + @Override + public User getUser() { + return user; + } + + @Override + public Map getAttributes() { + return attributes; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index ff3bae19e296..670edb73cb58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -19,9 +19,11 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ExtendedCellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCoprocessorHost.java index 2ced23119890..c844348a50a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCoprocessorHost.java @@ -84,19 +84,19 @@ public RpcCoprocessor checkAndGetInstance(Class implClass) abstract class RpcObserverOperation extends ObserverOperationWithoutResult { public RpcObserverOperation() { - super(rpcObserverGetter); + this(null); } - public RpcObserverOperation(boolean bypassable) { - this(null, bypassable); + public RpcObserverOperation(User user) { + this(user, false); } - public RpcObserverOperation(User user) { - super(rpcObserverGetter, user); + public RpcObserverOperation(boolean bypassable) { + this(null, bypassable); } public RpcObserverOperation(User user, boolean bypassable) { - super(rpcObserverGetter, user, bypassable); + super(rpcObserverGetter, createObserverRpcCallContext(user), bypassable); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 6dfb5bfb4113..f308f6be4dcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -799,6 +799,10 @@ public static Optional getRequestUser() { return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty(); } + public static Map getConnectionAttributes() { + return getCurrentCall().map(RpcCall::getConnectionAttributes).orElse(Map.of()); + } + /** * The number of open RPC conections * @return the number of open rpc connections diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index e3d269973f8f..794f8e94eea8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -191,19 +191,19 @@ public MasterCoprocessor checkAndGetInstance(Class implClass) abstract class MasterObserverOperation extends ObserverOperationWithoutResult { public MasterObserverOperation() { - super(masterObserverGetter); + this(null); } - public MasterObserverOperation(boolean bypassable) { - this(null, bypassable); + public MasterObserverOperation(User user) { + this(user, false); } - public MasterObserverOperation(User user) { - super(masterObserverGetter, user); + public MasterObserverOperation(boolean bypassable) { + this(null, bypassable); } public MasterObserverOperation(User user, boolean bypassable) { - super(masterObserverGetter, user, bypassable); + super(masterObserverGetter, createObserverRpcCallContext(user), bypassable); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index b300496e1d7c..cf2742ccb67a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -493,26 +493,26 @@ public RegionCoprocessor checkAndGetInstance(Class implClass) abstract class RegionObserverOperationWithoutResult extends ObserverOperationWithoutResult { public RegionObserverOperationWithoutResult() { - super(regionObserverGetter); + this(null); } public RegionObserverOperationWithoutResult(User user) { - super(regionObserverGetter, user); + this(user, false); } public RegionObserverOperationWithoutResult(boolean bypassable) { - super(regionObserverGetter, null, bypassable); + this(null, bypassable); } public RegionObserverOperationWithoutResult(User user, boolean bypassable) { - super(regionObserverGetter, user, bypassable); + super(regionObserverGetter, createObserverRpcCallContext(user), bypassable); } } abstract class BulkLoadObserverOperation extends ObserverOperationWithoutResult { public BulkLoadObserverOperation(User user) { - super(RegionCoprocessor::getBulkLoadObserver, user); + super(RegionCoprocessor::getBulkLoadObserver, createObserverRpcCallContext(user)); } } @@ -678,7 +678,7 @@ public InternalScanner preCompact(final HStore store, final InternalScanner scan return defaultResult; } return execOperationWithResult(new ObserverOperationWithResult( - regionObserverGetter, defaultResult, user) { + regionObserverGetter, defaultResult, createObserverRpcCallContext(user)) { @Override public InternalScanner call(RegionObserver observer) throws IOException { InternalScanner scanner = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index 06eabdad67d4..b2271a3d7c86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -102,11 +102,11 @@ public RegionServerCoprocessor checkAndGetInstance(Class implClass) abstract class RegionServerObserverOperation extends ObserverOperationWithoutResult { public RegionServerObserverOperation() { - super(rsObserverGetter); + this(null); } public RegionServerObserverOperation(User user) { - super(rsObserverGetter, user); + super(rsObserverGetter, createObserverRpcCallContext(user)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index d03670543438..d81bb3189fe6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.ObserverRpcCallContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; @@ -2342,11 +2343,13 @@ public void preSwitchExceedThrottleQuota(ObserverContext ctx) throws IOException { // for non-rpc handling, fallback to system user - Optional optionalUser = ctx.getCaller(); - if (optionalUser.isPresent()) { - return optionalUser.get(); + Optional rpcCallContext = ctx.getRpcCallContext(); + + if (rpcCallContext.isPresent()) { + return rpcCallContext.get().getUser(); + } else { + return userProvider.getCurrent(); } - return userProvider.getCurrent(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java index f4e4a4a9ffb3..860fd311e902 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclController.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.ObserverRpcCallContext; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -579,11 +580,13 @@ private boolean needHandleTableHdfsAcl(TableDescriptor tableDescriptor, String o private User getActiveUser(ObserverContext ctx) throws IOException { // for non-rpc handling, fallback to system user - Optional optionalUser = ctx.getCaller(); - if (optionalUser.isPresent()) { - return optionalUser.get(); + Optional rpcCallContext = ctx.getRpcCallContext(); + + if (rpcCallContext.isPresent()) { + return rpcCallContext.get().getUser(); + } else { + return userProvider.getCurrent(); } - return userProvider.getCurrent(); } /**