Skip to content

Commit

Permalink
Comment Review 2 : Better Java Docs, method name of spi, Configurable…
Browse files Browse the repository at this point in the history
… retry strategy, netty write call ensurity.
  • Loading branch information
sahusanket committed Nov 5, 2024
1 parent 394a03d commit fe12ebb
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.cdap.cdap.messaging.subscriber.AbstractMessagingSubscriberService;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.authorization.AccessControllerInstantiator;
import io.cdap.cdap.security.spi.authorization.AccessControllerSpi;
import io.cdap.cdap.security.spi.authorization.AuditLogContext;
import io.cdap.cdap.security.spi.authorization.AuditLoggerSpi;
import io.cdap.cdap.spi.data.StructuredTableContext;
Expand All @@ -40,10 +41,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -71,7 +71,7 @@ public class AuditLogSubscriberService extends AbstractMessagingSubscriberServic
cConf.getInt(Constants.AuditLogging.AUDIT_LOG_FETCH_SIZE),
cConf.getInt(TxConstants.Manager.CFG_TX_TIMEOUT),
cConf.getInt(Constants.AuditLogging.AUDIT_LOG_POLL_DELAY_MILLIS),
RetryStrategies.exponentialDelay(10, 200, TimeUnit.MILLISECONDS),
RetryStrategies.fromConfiguration(cConf, Constants.AuditLogging.AUDIT_LOG_WRITER_RETRY_PREFIX),
metricsCollectionService.getContext(ImmutableMap.of(
Constants.Metrics.Tag.COMPONENT, Constants.Service.MASTER_SERVICES,
Constants.Metrics.Tag.INSTANCE_ID, "0",
Expand All @@ -93,8 +93,8 @@ protected TransactionRunner getTransactionRunner() {
}

/**
* Loads last persisted message id. This method will be called from a transaction. The returned
* message id will be used as the starting message id (exclusive) for the first fetch.
* Loads last persisted message id within a transaction for the topic of AUDIT LOG EVENTS.
* The returned message id will be used as the starting message id (exclusive) for the first fetch.
*/
@Nullable
@Override
Expand All @@ -105,8 +105,8 @@ protected String loadMessageId(StructuredTableContext context) throws Exception
}

/**
* Persists the given message id. This method will be called from a transaction, which is the same
* transaction for the call to {@link #processMessages(StructuredTableContext, Iterator)}.
* Persists the given message id after the Audit Log events are processed successfully within `processMessages`
* method. This also runs in the same transaction as `processMessages`
*/
@Override
protected void storeMessageId(StructuredTableContext context, String messageId) throws Exception {
Expand All @@ -116,31 +116,30 @@ protected void storeMessageId(StructuredTableContext context, String messageId)
}

/**
* Processes the give list of messages. This method will be called from the same transaction as
* the {@link #storeMessageId(StructuredTableContext, String)} call. If {@link Exception} is
* raised from this method, the messages as provided through the {@code messages} parameter will
* be replayed in the next call.
* This does the actual processing of Audit Log events fetched from the messaging topic.
* If the audit event is required to be published, we add it to a queue and then each event is published via
* {@link AccessControllerSpi} acquired through {@link AccessControllerInstantiator}
* It If {@link Exception} is raised from this method, the messages as provided through the {@code messages} parameter
* will be replayed in the next call.
*/
@Override
protected void processMessages(StructuredTableContext structuredTableContext,
Iterator<ImmutablePair<String, AuditLogContext>> messages) throws Exception {

Queue<AuditLogContext> auditLogContextQueue = new LinkedBlockingDeque<>();
Queue<AuditLogContext> auditLogContextQueue = new ArrayDeque<>();

int count = 0 ;
while (messages.hasNext()) {
ImmutablePair<String, AuditLogContext> next = messages.next();
AuditLogContext auditLogContext = next.getSecond();
if (auditLogContext.isAuditLoggingRequired()){
auditLogContextQueue.add(auditLogContext);
}
count++;
}

if (!auditLogContextQueue.isEmpty()) {
LOG.debug("Publishing a queue of Audit Log events of size {} events.", auditLogContextQueue.size());
AuditLoggerSpi.PublishStatus publishStatus =
this.accessControllerInstantiator.get().publish(auditLogContextQueue);
this.accessControllerInstantiator.get().publishAuditLogs(auditLogContextQueue);
// TODO : This logic can change based on how Auth Ext publishes a batch.
if (publishStatus.equals(AuditLoggerSpi.PublishStatus.UNSUCCESSFUL)) {
throw new Exception("The publishing of audit log events Failed.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class MessagingAuditLogWriter implements AuditLogWriter {
public MessagingAuditLogWriter(CConfiguration cConf, MessagingService messagingService) {
this.topic = NamespaceId.SYSTEM.topic(cConf.get(Constants.AuditLogging.AUDIT_LOG_EVENT_TOPIC));
this.messagingService = messagingService;
this.retryStrategy = RetryStrategies.exponentialDelay(10, 30, TimeUnit.MILLISECONDS);
this.retryStrategy = RetryStrategies.fromConfiguration(cConf, Constants.AuditLogging.AUDIT_LOG_WRITER_RETRY_PREFIX);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2557,5 +2557,7 @@ public static final class AuditLogging {
public static final String AUDIT_LOG_CONSUMER_WRITER_SUBSCRIBER = "auditlog.consumer.publisher";
public static final String AUDIT_LOG_WRITER_SUBSCRIBER = "auditlog.subscriber";

public static final String AUDIT_LOG_WRITER_RETRY_PREFIX = "system.auditlog";

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
SecurityRequestContext.setUserIp(currentUserIp);
}

ctx.fireChannelRead(msg);
try {
ctx.fireChannelRead(msg);
} finally {
ctx.writeAndFlush(msg);
}
}

/**
Expand Down
41 changes: 41 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4442,6 +4442,47 @@
</description>
</property>

<property>
<name>system.auditlog.retry.policy.base.delay.ms</name>
<value>100</value>
<description>
The base delay between retries in milliseconds
</description>
</property>

<property>
<name>system.auditlog.retry.policy.max.delay.ms</name>
<value>2000</value>
<description>
The maximum delay between retries in milliseconds
</description>
</property>

<property>
<name>system.auditlog.retry.policy.max.retries</name>
<value>100</value>
<description>
The maximum number of retries to attempt before aborting
</description>
</property>

<property>
<name>system.auditlog.retry.policy.max.time.secs</name>
<value>2147483647</value>
<description>
The maximum elapsed time in seconds before retries are aborted
</description>
</property>

<property>
<name>system.auditlog.retry.policy.type</name>
<value>exponential.backoff</value>
<description>
The type of retry policy for workers. Allowed options: "none",
"fixed.delay", or "exponential.backoff".
</description>
</property>

<!-- Router Configuration -->

<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ enum PublishStatus {
* If the auth ext is able to publish a batch all together vs needs to publish one by one.
* @return {@link PublishStatus}
*/
PublishStatus publish(Queue<AuditLogContext> auditLogContexts);
PublishStatus publishAuditLogs(Queue<AuditLogContext> auditLogContexts);

}
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private AuthorizationResponse createAuthResultFromUnauthorizedExp(UnauthorizedEx
* @return {@link PublishStatus}
*/
@Override
public PublishStatus publish(Queue<AuditLogContext> auditLogContexts) {
public PublishStatus publishAuditLogs(Queue<AuditLogContext> auditLogContexts) {
return PublishStatus.PUBLISHED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public AuthorizedResult<Set<GrantedPermission>> listGrants(Principal principal,
* @return {@link PublishStatus}
*/
@Override
public PublishStatus publish(Queue<AuditLogContext> auditLogContexts) {
public PublishStatus publishAuditLogs(Queue<AuditLogContext> auditLogContexts) {
//no-op
return PublishStatus.PUBLISHED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ public AuthorizedResult<Set<GrantedPermission>> listGrants(Principal caller, Pri
}

@Override
public PublishStatus publish(Queue<AuditLogContext> auditLogContexts) {
public PublishStatus publishAuditLogs(Queue<AuditLogContext> auditLogContexts) {
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ private boolean isParent(EntityId guessingParent, Map<EntityType, String> guessi
* @return {@link PublishStatus}
*/
@Override
public PublishStatus publish(Queue<AuditLogContext> auditLogContexts) {
public PublishStatus publishAuditLogs(Queue<AuditLogContext> auditLogContexts) {
return PublishStatus.PUBLISHED;
}

Expand Down

0 comments on commit fe12ebb

Please sign in to comment.