diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/PartitionSizeAnomalyFinder.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/PartitionSizeAnomalyFinder.java index fe5ef303e..048ffddfb 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/PartitionSizeAnomalyFinder.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/PartitionSizeAnomalyFinder.java @@ -108,9 +108,9 @@ public void configure(Map configs) { String topicExcludedFromCheck = (String) configs.get(TOPIC_EXCLUDED_FROM_PARTITION_SIZE_CHECK); _topicExcludedFromCheck = Pattern.compile(topicExcludedFromCheck == null ? DEFAULT_TOPIC_EXCLUDED_FROM_PARTITION_SIZE_CHECK : topicExcludedFromCheck); - Integer partitionSizeThreshold = (Integer) configs.get(SELF_HEALING_PARTITION_SIZE_THRESHOLD_MB_CONFIG); + String partitionSizeThreshold = (String) configs.get(SELF_HEALING_PARTITION_SIZE_THRESHOLD_MB_CONFIG); _partitionSizeThresholdInMb = partitionSizeThreshold == null ? DEFAULT_SELF_HEALING_PARTITION_SIZE_THRESHOLD_MB - : partitionSizeThreshold; + : Integer.parseInt(partitionSizeThreshold); String topicPartitionSizeAnomalyClass = (String) configs.get(TOPIC_PARTITION_SIZE_ANOMALY_CLASS_CONFIG); if (topicPartitionSizeAnomalyClass == null) { _topicPartitionSizeAnomalyClass = DEFAULT_TOPIC_PARTITION_SIZE_ANOMALY_CLASS;