From bc36a5b56fb86225dfdf517d3d09c05e5a962b11 Mon Sep 17 00:00:00 2001 From: dataengineervishal Date: Thu, 19 Mar 2026 04:08:42 +0530 Subject: [PATCH 1/3] [FLINK-38815] Mask sensitive values in Pekko configuration logs --- .../rpc/pekko/ActorSystemBootstrapTools.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java index 517a48669e688..7a21017025361 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java @@ -19,6 +19,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.rpc.RpcSystem; import org.apache.flink.util.NetUtils; @@ -32,7 +33,9 @@ import java.io.IOException; import java.net.BindException; import java.util.Iterator; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; /** Tools for starting the Actor Systems used to run the JobManager and TaskManager actors. */ public class ActorSystemBootstrapTools { @@ -241,6 +244,20 @@ public static ActorSystem startLocalActorSystem( } } + /** + * Converts the given Pekko {@link Config} into a flattened {@link Map}. + * + * @param config The Pekko configuration + * @return A map of configuration keys to string values + */ + private static Map toMap(Config config) { + return config.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> String.valueOf(entry.getValue().unwrapped()))); + } + /** * Starts an Actor System with given Pekko config. * @@ -251,7 +268,8 @@ public static ActorSystem startLocalActorSystem( */ private static ActorSystem startActorSystem( Config config, String actorSystemName, Logger logger) { - logger.debug("Using pekko configuration\n {}", config); + Map masked = ConfigurationUtils.hideSensitiveValues(toMap(config)); + logger.debug("Using pekko configuration\n {}", masked); ActorSystem actorSystem = PekkoUtils.createActorSystem(actorSystemName, config); logger.info("Actor system started at {}", PekkoUtils.getAddress(actorSystem)); From 985c9310e966d7e0fd77bd48b346f65e78e9f4d2 Mon Sep 17 00:00:00 2001 From: dataengineervishal Date: Sat, 21 Mar 2026 17:04:42 +0530 Subject: [PATCH 2/3] [FLINK-38815] Guard Pekko configuration logging with debug check --- .../flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java index 7a21017025361..1fa8d0bbdb1ac 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java @@ -268,8 +268,11 @@ private static Map toMap(Config config) { */ private static ActorSystem startActorSystem( Config config, String actorSystemName, Logger logger) { - Map masked = ConfigurationUtils.hideSensitiveValues(toMap(config)); - logger.debug("Using pekko configuration\n {}", masked); + if (logger.isDebugEnabled()) { + logger.debug( + "Using pekko configuration\n {}", + ConfigurationUtils.hideSensitiveValues(toMap(config))); + } ActorSystem actorSystem = PekkoUtils.createActorSystem(actorSystemName, config); logger.info("Actor system started at {}", PekkoUtils.getAddress(actorSystem)); From 907d15d8d917505a0164a1744ee3f396d9f807d0 Mon Sep 17 00:00:00 2001 From: dataengineervishal Date: Wed, 25 Mar 2026 00:44:34 +0530 Subject: [PATCH 3/3] [FLINK-38815] Encapsulate masking logic for Pekko config and add unit test - Move sensitive value masking into toMaskedMap method for better encapsulation - Update logging to use masked configuration directly - Add unit test to verify masking of sensitive values --- .../rpc/pekko/ActorSystemBootstrapTools.java | 17 +++++---- .../pekko/ActorSystemBootstrapToolsTest.java | 36 +++++++++++++++++++ 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java index 1fa8d0bbdb1ac..4e913dd4100e1 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java @@ -250,12 +250,13 @@ public static ActorSystem startLocalActorSystem( * @param config The Pekko configuration * @return A map of configuration keys to string values */ - private static Map toMap(Config config) { - return config.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entry -> String.valueOf(entry.getValue().unwrapped()))); + static Map toMaskedMap(Config config) { + return ConfigurationUtils.hideSensitiveValues( + config.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> String.valueOf(entry.getValue().unwrapped())))); } /** @@ -269,9 +270,7 @@ private static Map toMap(Config config) { private static ActorSystem startActorSystem( Config config, String actorSystemName, Logger logger) { if (logger.isDebugEnabled()) { - logger.debug( - "Using pekko configuration\n {}", - ConfigurationUtils.hideSensitiveValues(toMap(config))); + logger.debug("Using pekko configuration\n {}", toMaskedMap(config)); } ActorSystem actorSystem = PekkoUtils.createActorSystem(actorSystemName, config); diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java index 5343c4011cf1c..1ebecb27fb925 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java @@ -18,11 +18,14 @@ package org.apache.flink.runtime.rpc.pekko; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.CheckedSupplier; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.apache.pekko.actor.ActorSystem; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -32,6 +35,7 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; @@ -40,6 +44,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link ActorSystemBootstrapTools}. */ @@ -111,4 +116,35 @@ void testActorSystemInstantiationFailureWhenPortOccupied() throws Exception { portOccupier.close(); } } + + @Test + void testToMaskedMapMasksOnlySensitiveKeys() { + Config config = + ConfigFactory.parseMap( + Map.of( + "pekko.loglevel", "OFF", + "pekko.remote.artery.enabled", "false", + "pekko.remote.classic.netty.ssl.security.key-password", "secret", + "pekko.remote.classic.netty.ssl.security.key-store-password", + "secret2", + "pekko.remote.classic.netty.ssl.security.trust-store-password", + "secret3")); + + Map result = ActorSystemBootstrapTools.toMaskedMap(config); + + // Non-sensitive values should remain the same + assertThat(result.get("pekko.loglevel")).isEqualTo("OFF"); + assertThat(result.get("pekko.remote.artery.enabled")).isEqualTo("false"); + + // Sensitive values should be masked + assertThat(result.get("pekko.remote.classic.netty.ssl.security.key-password")) + .isNotEqualTo("secret") + .isEqualTo(GlobalConfiguration.HIDDEN_CONTENT); + assertThat(result.get("pekko.remote.classic.netty.ssl.security.key-store-password")) + .isNotEqualTo("secret2") + .isEqualTo(GlobalConfiguration.HIDDEN_CONTENT); + assertThat(result.get("pekko.remote.classic.netty.ssl.security.trust-store-password")) + .isNotEqualTo("secret3") + .isEqualTo(GlobalConfiguration.HIDDEN_CONTENT); + } }