Skip to content

Commit

Permalink
Merge pull request #6862 from planetf1/issue6858
Browse files Browse the repository at this point in the history
#6858 downgrade kafka in-loop debug info to DEBUG from INFO - too noisy
  • Loading branch information
planetf1 authored Aug 30, 2022
2 parents 7f3d849 + f4fa1a8 commit a8c7cf6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void run()
//The connector queue is too big. Wait until the size goes down until
//polling again. If we let the events just accumulate, we will
//eventually run out of memory if the consumer cannot keep up.
log.info("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", nUnprocessedEvents, maxQueueSize);
log.debug("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", nUnprocessedEvents, maxQueueSize);
awaitNextPollingTime();
continue;

Expand All @@ -183,7 +183,7 @@ public void run()
String json = consumerRecord.value();
log.debug("Received message: {}" ,json);
countReceivedMessages++;
log.info("Metrics: receivedMessages: {}", countReceivedMessages);
log.debug("Metrics: receivedMessages: {}", countReceivedMessages);
final KafkaIncomingEvent event = new KafkaIncomingEvent(json, consumerRecord.offset());
final String recordKey=consumerRecord.key();
final String recordValue=consumerRecord.value();
Expand All @@ -194,12 +194,12 @@ public void run()
addUnprocessedEvent(consumerRecord.partition(), consumerRecord.topic(), event);
connector.distributeToListeners(event);
countMessagesToProcess++;
log.info("Metrics: messagesToProcess: {}", countMessagesToProcess);
log.debug("Metrics: messagesToProcess: {}", countMessagesToProcess);
}
catch (Exception error)
{
countMessagesFailedToProcess++;
log.info("Metrics: messagesFailedToProcess: {}", countMessagesFailedToProcess);
log.debug("Metrics: messagesFailedToProcess: {}", countMessagesFailedToProcess);
log.warn("Error distributing inbound event: {}", error.getMessage());

if (auditLog != null)
Expand All @@ -217,7 +217,7 @@ public void run()
{
log.debug("Ignoring message with key: {} and value: {}",recordKey, recordValue);
countIgnoredMessages++;
log.info("Metrics: ignoredMessages: {}", countIgnoredMessages);
log.debug("Metrics: ignoredMessages: {}", countIgnoredMessages);
}

if ( isAutoCommitEnabled) {
Expand All @@ -231,14 +231,14 @@ public void run()
final TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
currentOffsets.put(partition, new OffsetAndMetadata(consumerRecord.offset() + 1));
countCommits++;
log.info("Metrics: messageCommits: {}", countCommits);
log.debug("Metrics: messageCommits: {}", countCommits);

}
}
}
catch (WakeupException e)
{
log.info("Received wakeup call, proceeding with graceful shutdown");
log.debug("Received wakeup call, proceeding with graceful shutdown");
}
catch (Exception error)
{
Expand Down Expand Up @@ -313,7 +313,7 @@ public void run()
}
consumer = null;
}
log.debug("Exiting main loop for topic {} & cleaning up", this.topicToSubscribe);
log.info("Exiting main loop for topic {} & cleaning up", this.topicToSubscribe);

}

Expand Down Expand Up @@ -358,7 +358,7 @@ private boolean checkForFullyProcessedMessages() {
if (isAutoCommitEnabled) {
return false;
}
log.info("Checking for fully processed messages whose offsets need to be committed");
log.debug("Checking for fully processed messages whose offsets need to be committed");

//Check all the queues to see they have events initial events
//that are fully processed
Expand All @@ -375,7 +375,7 @@ private boolean checkForFullyProcessedMessages() {

if (! commitData.isEmpty()) {
currentOffsets.putAll(commitData);
log.info("Committing: {}", commitData);
log.debug("Committing: {}", commitData);
try {
consumer.commitSync(commitData);
return true;
Expand Down Expand Up @@ -417,15 +417,15 @@ private KafkaIncomingEvent removeFullyProcessedEventsFromBeginningOfQueue(Queue<
//The message at the beginning of the queue has been fully processed. Remove
//it from the queue and repeat the check.
lastRemoved = queue.remove();
log.info("Message with offset {} has been fully processed.",lastRemoved.getOffset() );
log.debug("Message with offset {} has been fully processed.",lastRemoved.getOffset() );
countCommits++;
log.info("Metrics: commits: {}", countCommits);
log.debug("Metrics: commits: {}", countCommits);
}
KafkaIncomingEvent firstEvent = queue.peek();
if (firstEvent != null) {
//Queue is not empty, so we're waiting for the processing of first message in
//the queue to finish
log.info("Waiting for completing of processing of message with offset {}",firstEvent.getOffset());
log.debug("Waiting for completing of processing of message with offset {}",firstEvent.getOffset());
}
return lastRemoved;
}
Expand Down Expand Up @@ -551,12 +551,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Check if we need to rewind to handle initial startup case -- but only on first assignment
try {
if (initialPartitionAssignment) {
log.info("Received initial PartitionsAssigned event");
log.debug("Received initial PartitionsAssigned event");

long partitionCount = partitions.size();

if (partitionCount != 1) {
log.info("Received PartitionsAssigned event with {} partitions. This is not supported.",partitionCount);
log.warn("Received PartitionsAssigned event with {} partitions. This is not supported.",partitionCount);
} else {
// there is only one partition, so we can just grab the first one - and we'll try this once only
initialPartitionAssignment = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
long eventRetryCount = 0;

messagePublishRequestCount++;
log.info("Metrics: messagePublishRequestCount {}", messagePublishRequestCount);
log.debug("Metrics: messagePublishRequestCount {}", messagePublishRequestCount);

if (producer == null) {
try {
Expand All @@ -107,11 +107,11 @@ private void publishEvent(String event) throws ConnectorCheckedException {
log.debug("Sending message try {} [0 based] : {}", eventRetryCount,event);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, localServerId, event);
kafkaSendAttemptCount++;
log.info("Metrics: kafkaSendAttemptCount {}", kafkaSendAttemptCount);
log.debug("Metrics: kafkaSendAttemptCount {}", kafkaSendAttemptCount);
producer.send(producerRecord).get();
eventSent = true;
messageSendCount++;
log.info("Metrics: messageSendCount {}", messageSendCount);
log.debug("Metrics: messageSendCount {}", messageSendCount);
} catch (ExecutionException error) {
kafkaSendFailCount++;
log.debug("Metrics: kafkaSendFailCount {}", kafkaSendFailCount);
Expand All @@ -129,7 +129,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
producer = null;

messageFailedSendCount++;
log.info(messageFailedCountString, messageFailedSendCount);
log.warn(messageFailedCountString, messageFailedSendCount);

throw new ConnectorCheckedException(
KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(
Expand All @@ -141,7 +141,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
producer.close();
producer = null;
messageFailedSendCount++;
log.info(messageFailedCountString, messageFailedSendCount);
log.warn(messageFailedCountString, messageFailedSendCount);
log.error("Retryable Exception closed producer after {} tries", eventRetryCount);
break;
} else {
Expand Down Expand Up @@ -171,7 +171,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
}

messageFailedSendCount++;
log.info(messageFailedCountString, messageFailedSendCount);
log.warn(messageFailedCountString, messageFailedSendCount);

throw new ConnectorCheckedException(
KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(
Expand Down Expand Up @@ -225,7 +225,7 @@ public void run() {
}
}
} catch (InterruptedException error) {
log.info("Woken up from sleep ");
log.debug("Woken up from sleep ");
Thread.currentThread().interrupt();
} catch (Exception error) {
log.warn("Bad exception from sending events: {}",error.getMessage());
Expand All @@ -240,7 +240,7 @@ public void run() {
}
}
}
log.debug("Exiting main loop for topic {} & cleaning up", topicName);
log.info("Exiting main loop for topic {} & cleaning up", topicName);

/* producer may have already closed by exception handler in publishEvent */
if (producer != null) {
Expand All @@ -265,8 +265,8 @@ public void run() {
*/
private void putEvent(String newEvent) {
inmemoryPutMessageCount++;
log.info("Metrics: inmemoryPutMessageCount {}", inmemoryPutMessageCount);
log.info("Metrics: sendBufferSize {}", sendBuffer.size());
log.debug("Metrics: inmemoryPutMessageCount {}", inmemoryPutMessageCount);
log.debug("Metrics: sendBufferSize {}", sendBuffer.size());
sendBuffer.add(newEvent);
}

Expand Down

0 comments on commit a8c7cf6

Please sign in to comment.