diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java index 1a78a7fb8aa..c21f14dc03c 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/AuditLogSubscriberService.java @@ -32,6 +32,7 @@ import io.cdap.cdap.messaging.subscriber.AbstractMessagingSubscriberService; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.security.authorization.AccessControllerInstantiator; +import io.cdap.cdap.security.spi.authorization.AccessControllerSpi; import io.cdap.cdap.security.spi.authorization.AuditLogContext; import io.cdap.cdap.security.spi.authorization.AuditLoggerSpi; import io.cdap.cdap.spi.data.StructuredTableContext; @@ -40,10 +41,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayDeque; import java.util.Iterator; import java.util.Queue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; /** @@ -71,7 +71,7 @@ public class AuditLogSubscriberService extends AbstractMessagingSubscriberServic cConf.getInt(Constants.AuditLogging.AUDIT_LOG_FETCH_SIZE), cConf.getInt(TxConstants.Manager.CFG_TX_TIMEOUT), cConf.getInt(Constants.AuditLogging.AUDIT_LOG_POLL_DELAY_MILLIS), - RetryStrategies.exponentialDelay(10, 200, TimeUnit.MILLISECONDS), + RetryStrategies.fromConfiguration(cConf, Constants.AuditLogging.AUDIT_LOG_WRITER_RETRY_PREFIX), metricsCollectionService.getContext(ImmutableMap.of( Constants.Metrics.Tag.COMPONENT, Constants.Service.MASTER_SERVICES, Constants.Metrics.Tag.INSTANCE_ID, "0", @@ -93,8 +93,8 @@ protected TransactionRunner getTransactionRunner() { } /** - * Loads last persisted message id. This method will be called from a transaction. The returned - * message id will be used as the starting message id (exclusive) for the first fetch. + * Loads last persisted message id within a transaction for the topic of AUDIT LOG EVENTS. + * The returned message id will be used as the starting message id (exclusive) for the first fetch. */ @Nullable @Override @@ -105,8 +105,8 @@ protected String loadMessageId(StructuredTableContext context) throws Exception } /** - * Persists the given message id. This method will be called from a transaction, which is the same - * transaction for the call to {@link #processMessages(StructuredTableContext, Iterator)}. + * Persists the given message id after the Audit Log events are processed successfully within `processMessages` + * method. This also runs in the same transaction as `processMessages` */ @Override protected void storeMessageId(StructuredTableContext context, String messageId) throws Exception { @@ -116,31 +116,30 @@ protected void storeMessageId(StructuredTableContext context, String messageId) } /** - * Processes the give list of messages. This method will be called from the same transaction as - * the {@link #storeMessageId(StructuredTableContext, String)} call. If {@link Exception} is - * raised from this method, the messages as provided through the {@code messages} parameter will - * be replayed in the next call. + * This does the actual processing of Audit Log events fetched from the messaging topic. + * If the audit event is required to be published, we add it to a queue and then each event is published via + * {@link AccessControllerSpi} acquired through {@link AccessControllerInstantiator} + * It If {@link Exception} is raised from this method, the messages as provided through the {@code messages} parameter + * will be replayed in the next call. */ @Override protected void processMessages(StructuredTableContext structuredTableContext, Iterator> messages) throws Exception { - Queue auditLogContextQueue = new LinkedBlockingDeque<>(); + Queue auditLogContextQueue = new ArrayDeque<>(); - int count = 0 ; while (messages.hasNext()) { ImmutablePair next = messages.next(); AuditLogContext auditLogContext = next.getSecond(); if (auditLogContext.isAuditLoggingRequired()){ auditLogContextQueue.add(auditLogContext); } - count++; } if (!auditLogContextQueue.isEmpty()) { LOG.debug("Publishing a queue of Audit Log events of size {} events.", auditLogContextQueue.size()); AuditLoggerSpi.PublishStatus publishStatus = - this.accessControllerInstantiator.get().publish(auditLogContextQueue); + this.accessControllerInstantiator.get().publishAuditLogs(auditLogContextQueue); // TODO : This logic can change based on how Auth Ext publishes a batch. if (publishStatus.equals(AuditLoggerSpi.PublishStatus.UNSUCCESSFUL)) { throw new Exception("The publishing of audit log events Failed."); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java index ecaee462827..0f014b47fbf 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/security/auth/MessagingAuditLogWriter.java @@ -59,7 +59,7 @@ public class MessagingAuditLogWriter implements AuditLogWriter { public MessagingAuditLogWriter(CConfiguration cConf, MessagingService messagingService) { this.topic = NamespaceId.SYSTEM.topic(cConf.get(Constants.AuditLogging.AUDIT_LOG_EVENT_TOPIC)); this.messagingService = messagingService; - this.retryStrategy = RetryStrategies.exponentialDelay(10, 30, TimeUnit.MILLISECONDS); + this.retryStrategy = RetryStrategies.fromConfiguration(cConf, Constants.AuditLogging.AUDIT_LOG_WRITER_RETRY_PREFIX); } /** diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 18bb4669a22..86f5f5422e3 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -2557,5 +2557,7 @@ public static final class AuditLogging { public static final String AUDIT_LOG_CONSUMER_WRITER_SUBSCRIBER = "auditlog.consumer.publisher"; public static final String AUDIT_LOG_WRITER_SUBSCRIBER = "auditlog.subscriber"; + public static final String AUDIT_LOG_WRITER_RETRY_PREFIX = "system.auditlog"; + } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/http/AuthenticationChannelHandler.java b/cdap-common/src/main/java/io/cdap/cdap/common/http/AuthenticationChannelHandler.java index 5cd7aaf1edd..4fa787b2fa4 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/http/AuthenticationChannelHandler.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/http/AuthenticationChannelHandler.java @@ -125,7 +125,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception SecurityRequestContext.setUserIp(currentUserIp); } - ctx.fireChannelRead(msg); + try { + ctx.fireChannelRead(msg); + } finally { + ctx.writeAndFlush(msg); + } } /** diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index ca4a9bae743..12801ff0e64 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -4442,6 +4442,47 @@ + + system.auditlog.retry.policy.base.delay.ms + 100 + + The base delay between retries in milliseconds + + + + + system.auditlog.retry.policy.max.delay.ms + 2000 + + The maximum delay between retries in milliseconds + + + + + system.auditlog.retry.policy.max.retries + 100 + + The maximum number of retries to attempt before aborting + + + + + system.auditlog.retry.policy.max.time.secs + 2147483647 + + The maximum elapsed time in seconds before retries are aborted + + + + + system.auditlog.retry.policy.type + exponential.backoff + + The type of retry policy for workers. Allowed options: "none", + "fixed.delay", or "exponential.backoff". + + + diff --git a/cdap-security-spi/src/main/java/io/cdap/cdap/security/spi/authorization/AuditLoggerSpi.java b/cdap-security-spi/src/main/java/io/cdap/cdap/security/spi/authorization/AuditLoggerSpi.java index e4d61f4340c..7fa1e04258a 100644 --- a/cdap-security-spi/src/main/java/io/cdap/cdap/security/spi/authorization/AuditLoggerSpi.java +++ b/cdap-security-spi/src/main/java/io/cdap/cdap/security/spi/authorization/AuditLoggerSpi.java @@ -40,6 +40,6 @@ enum PublishStatus { * If the auth ext is able to publish a batch all together vs needs to publish one by one. * @return {@link PublishStatus} */ - PublishStatus publish(Queue auditLogContexts); + PublishStatus publishAuditLogs(Queue auditLogContexts); } diff --git a/cdap-security/src/main/java/io/cdap/cdap/security/authorization/AccessControllerWrapper.java b/cdap-security/src/main/java/io/cdap/cdap/security/authorization/AccessControllerWrapper.java index 3b563e08e11..b170fbf0ad8 100644 --- a/cdap-security/src/main/java/io/cdap/cdap/security/authorization/AccessControllerWrapper.java +++ b/cdap-security/src/main/java/io/cdap/cdap/security/authorization/AccessControllerWrapper.java @@ -267,7 +267,7 @@ private AuthorizationResponse createAuthResultFromUnauthorizedExp(UnauthorizedEx * @return {@link PublishStatus} */ @Override - public PublishStatus publish(Queue auditLogContexts) { + public PublishStatus publishAuditLogs(Queue auditLogContexts) { return PublishStatus.PUBLISHED; } } diff --git a/cdap-security/src/main/java/io/cdap/cdap/security/authorization/NoOpAccessControllerV2.java b/cdap-security/src/main/java/io/cdap/cdap/security/authorization/NoOpAccessControllerV2.java index ff517e70ffd..c0d0cd713bd 100644 --- a/cdap-security/src/main/java/io/cdap/cdap/security/authorization/NoOpAccessControllerV2.java +++ b/cdap-security/src/main/java/io/cdap/cdap/security/authorization/NoOpAccessControllerV2.java @@ -128,7 +128,7 @@ public AuthorizedResult> listGrants(Principal principal, * @return {@link PublishStatus} */ @Override - public PublishStatus publish(Queue auditLogContexts) { + public PublishStatus publishAuditLogs(Queue auditLogContexts) { //no-op return PublishStatus.PUBLISHED; } diff --git a/cdap-security/src/test/java/io/cdap/cdap/security/authorization/AccessControllerInstantiatorTest.java b/cdap-security/src/test/java/io/cdap/cdap/security/authorization/AccessControllerInstantiatorTest.java index 1e2f900b979..56f17775795 100644 --- a/cdap-security/src/test/java/io/cdap/cdap/security/authorization/AccessControllerInstantiatorTest.java +++ b/cdap-security/src/test/java/io/cdap/cdap/security/authorization/AccessControllerInstantiatorTest.java @@ -600,7 +600,7 @@ public AuthorizedResult> listGrants(Principal caller, Pri } @Override - public PublishStatus publish(Queue auditLogContexts) { + public PublishStatus publishAuditLogs(Queue auditLogContexts) { return null; } } diff --git a/cdap-security/src/test/java/io/cdap/cdap/security/authorization/InMemoryAccessControllerV2.java b/cdap-security/src/test/java/io/cdap/cdap/security/authorization/InMemoryAccessControllerV2.java index fc3b91914a8..14b43230fb6 100644 --- a/cdap-security/src/test/java/io/cdap/cdap/security/authorization/InMemoryAccessControllerV2.java +++ b/cdap-security/src/test/java/io/cdap/cdap/security/authorization/InMemoryAccessControllerV2.java @@ -294,7 +294,7 @@ private boolean isParent(EntityId guessingParent, Map guessi * @return {@link PublishStatus} */ @Override - public PublishStatus publish(Queue auditLogContexts) { + public PublishStatus publishAuditLogs(Queue auditLogContexts) { return PublishStatus.PUBLISHED; }