Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.util.NetUtils;
Expand All @@ -32,7 +33,9 @@
import java.io.IOException;
import java.net.BindException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/** Tools for starting the Actor Systems used to run the JobManager and TaskManager actors. */
public class ActorSystemBootstrapTools {
Expand Down Expand Up @@ -241,6 +244,20 @@ public static ActorSystem startLocalActorSystem(
}
}

/**
* Converts the given Pekko {@link Config} into a flattened {@link Map}.
*
* @param config The Pekko configuration
* @return A map of configuration keys to string values
*/
private static Map<String, String> toMap(Config config) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could have the ConfigurationUtils.hideSensitiveValues in this method , then you could add a unit test to ensure the config comes out as expected.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, thanks! I've moved the masking logic into the method to encapsulate it and added a unit test to verify that sensitive values are properly masked.

Updated the PR, could you please have a look?

return config.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> String.valueOf(entry.getValue().unwrapped())));
}

/**
* Starts an Actor System with given Pekko config.
*
Expand All @@ -251,7 +268,11 @@ public static ActorSystem startLocalActorSystem(
*/
private static ActorSystem startActorSystem(
Config config, String actorSystemName, Logger logger) {
logger.debug("Using pekko configuration\n {}", config);
if (logger.isDebugEnabled()) {
logger.debug(
"Using pekko configuration\n {}",
ConfigurationUtils.hideSensitiveValues(toMap(config)));
}
ActorSystem actorSystem = PekkoUtils.createActorSystem(actorSystemName, config);

logger.info("Actor system started at {}", PekkoUtils.getAddress(actorSystem));
Expand Down