Skip to content

Commit

Permalink
Merge pull request #15727 from cdapio/DP_AUDITLOG_CDAP-20852_tms_M2
Browse files Browse the repository at this point in the history
[CDAP-20852] : Dataplane Audit Logging Milestone 2 : Process audit log events via cdap and publish to ext
  • Loading branch information
sahusanket authored Nov 14, 2024
2 parents 8771c9c + 711822b commit e9562fc
Show file tree
Hide file tree
Showing 74 changed files with 1,335 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cdap.cdap.app.runtime.spark.distributed.SparkTwillRunnable;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.guice.NoOpAuditLogModule;
import io.cdap.cdap.common.test.MockTwillContext;
import io.cdap.cdap.internal.app.runtime.BasicArguments;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
Expand Down Expand Up @@ -100,7 +101,7 @@ protected ServiceAnnouncer getServiceAnnouncer() {
}
}.createModule(CConfiguration.create(), new Configuration(),
createProgramOptions(programRunId), programRunId);
Injector injector = Guice.createInjector(module);
Injector injector = Guice.createInjector(module, new NoOpAuditLogModule());
injector.getInstance(ServiceProgramRunner.class);
injector.getInstance(ProgramStateWriter.class);
}
Expand Down Expand Up @@ -151,7 +152,8 @@ protected ServiceAnnouncer getServiceAnnouncer() {
}
}.createModule(CConfiguration.create(), new Configuration(),
createProgramOptions(programRunId), programRunId);
Injector injector = Guice.createInjector(module);

Injector injector = Guice.createInjector(module,new NoOpAuditLogModule());
injector.getInstance(SparkProgramRunner.class);
injector.getInstance(ProgramStateWriter.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/

package io.cdap.cdap.app.guice;

import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.cdap.cdap.api.auditlogging.AuditLogWriter;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.guice.NoOpAuditLogModule;
import io.cdap.cdap.common.runtime.RuntimeModule;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.security.auth.MessagingAuditLogWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AuditLogWriterModule extends RuntimeModule {

private static final Logger LOG = LoggerFactory.getLogger(AuditLogWriterModule.class);

private final boolean auditLoggingEnabled;
private final boolean securityEnabled;

@Inject
public AuditLogWriterModule(CConfiguration cConf) {
FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
this.auditLoggingEnabled = Feature.DATAPLANE_AUDIT_LOGGING.isEnabled(featureFlagsProvider) ;

this.securityEnabled = cConf.getBoolean(Constants.Security.ENABLED);
}

/**
* Guice modules for In Memory use case of AuditLogWriter.
* Returns No Op for now. This may change in the future.
*/
@Override
public Module getInMemoryModules() {
return new NoOpAuditLogModule();
}

/**
* Guice modules for Standalone use case of AuditLogWriter.
* Returns No Op for now. This may change in the future.
*/
@Override
public Module getStandaloneModules() {
return new NoOpAuditLogModule();
}

/**
* Guice modules for Distributed use case where the audit events would be written to a messaging queue like tms.
*/
@Override
public Module getDistributedModules() {

if (auditLoggingEnabled && securityEnabled) {
LOG.info("Audit Logging feature is ENABLED. Injecting an audit message writer");
return new AbstractModule() {
@Override
protected void configure() {
bind(AuditLogWriter.class).to(MessagingAuditLogWriter.class).in(Scopes.SINGLETON);
}
};
}

LOG.debug("Audit Logging feature or Instance security is DISABLED.");
return new NoOpAuditLogModule();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.cdap.cdap.common.guice.DFSLocationModule;
import io.cdap.cdap.common.guice.IOModule;
import io.cdap.cdap.common.guice.KafkaClientModule;
import io.cdap.cdap.common.guice.NoOpAuditLogModule;
import io.cdap.cdap.common.guice.SupplierProviderBridge;
import io.cdap.cdap.common.guice.ZkClientModule;
import io.cdap.cdap.common.guice.ZkDiscoveryModule;
Expand Down Expand Up @@ -133,7 +134,6 @@ public DistributedProgramContainerModule(CConfiguration cConf, Configuration hCo
@Override
protected void configure() {
List<Module> modules = getCoreModules();

RuntimeMonitorType runtimeMonitorType = SystemArguments.getRuntimeMonitorType(cConf,
programOpts);
modules.add(RuntimeMonitors.getRemoteAuthenticatorModule(runtimeMonitorType, programOpts));
Expand All @@ -159,6 +159,7 @@ private List<Module> getCoreModules() {

List<Module> modules = new ArrayList<>();

modules.add(new NoOpAuditLogModule());
modules.add(new ConfigModule(cConf, hConf));
modules.add(new IOModule());
modules.add(new DFSLocationModule());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.inject.name.Named;
import com.google.inject.util.Modules;
import io.cdap.cdap.api.security.store.SecureStore;
import io.cdap.cdap.app.guice.AuditLogWriterModule;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
Expand Down Expand Up @@ -175,6 +176,7 @@ public Injector createPreviewInjector() {
new IOModule(),
RemoteAuthenticatorModules.getDefaultModule(),
new CoreSecurityRuntimeModule().getInMemoryModules(),
new AuditLogWriterModule(previewCConf).getInMemoryModules(),
new AuthenticationContextModules().getMasterWorkerModule(),
new PreviewSecureStoreModule(secureStore),
new PreviewDiscoveryRuntimeModule(discoveryServiceClient),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.api.security.AccessException;
import io.cdap.cdap.app.guice.AuditLogWriterModule;
import io.cdap.cdap.app.preview.PreviewConfigModule;
import io.cdap.cdap.app.preview.PreviewManager;
import io.cdap.cdap.app.preview.PreviewRequest;
Expand Down Expand Up @@ -312,6 +313,7 @@ Injector createPreviewInjector() {
new PreviewDataModules().getDataFabricModule(transactionSystemClient,
previewLevelDBTableService),
new PreviewDataModules().getDataSetsModule(datasetFramework),
new AuditLogWriterModule(previewCConf).getInMemoryModules(),
new AuthenticationContextModules().getMasterModule(),
new LocalLocationModule(),
new PreviewDiscoveryRuntimeModule(discoveryServiceClient),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.inject.assistedinject.FactoryModuleBuilder;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.app.deploy.Configurator;
import io.cdap.cdap.app.guice.AuditLogWriterModule;
import io.cdap.cdap.app.preview.PreviewConfigModule;
import io.cdap.cdap.app.preview.PreviewRunner;
import io.cdap.cdap.app.preview.PreviewRunnerManager;
Expand Down Expand Up @@ -247,6 +248,7 @@ protected void configure() {

modules.add(new AuthenticationContextModules().getMasterWorkerModule());
modules.add(new AuthorizationEnforcementModule().getNoOpModules());
modules.add(new AuditLogWriterModule(cConf).getInMemoryModules());

byte[] pollerInfoBytes = Bytes.toBytes(new Gson().toJson(pollerInfo));
modules.add(new AbstractModule() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.cdap.cdap.internal.sysapp.SystemAppManagementService;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.scheduler.CoreSchedulerService;
import io.cdap.cdap.security.auth.AuditLogSubscriberService;
import io.cdap.cdap.sourcecontrol.RepositoryCleanupService;
import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class AppFabricServer extends AbstractIdleService {
private final ApplicationLifecycleService applicationLifecycleService;
private final Set<String> servicesNames;
private final Set<String> handlerHookNames;
private final AuditLogSubscriberService auditLogSubscriberService;
private final ProgramNotificationSubscriberService programNotificationSubscriberService;
private final ProgramStopSubscriberService programStopSubscriberService;
private final RunRecordCorrectorService runRecordCorrectorService;
Expand Down Expand Up @@ -110,31 +112,32 @@ public class AppFabricServer extends AbstractIdleService {
*/
@Inject
public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
DiscoveryService discoveryService,
@Named(Constants.Service.MASTER_SERVICES_BIND_ADDRESS) InetAddress hostname,
@Named(Constants.AppFabric.HANDLERS_BINDING) Set<HttpHandler> handlers,
@Nullable MetricsCollectionService metricsCollectionService,
ProgramRuntimeService programRuntimeService,
RunRecordCorrectorService runRecordCorrectorService,
ProgramRunStatusMonitorService programRunStatusMonitorService,
ApplicationLifecycleService applicationLifecycleService,
ProgramNotificationSubscriberService programNotificationSubscriberService,
ProgramStopSubscriberService programStopSubscriberService,
@Named("appfabric.services.names") Set<String> servicesNames,
@Named("appfabric.handler.hooks") Set<String> handlerHookNames,
CoreSchedulerService coreSchedulerService,
CredentialProviderService credentialProviderService,
NamespaceCredentialProviderService namespaceCredentialProviderService,
ProvisioningService provisioningService,
BootstrapService bootstrapService,
SystemAppManagementService systemAppManagementService,
TransactionRunner transactionRunner,
RunRecordMonitorService runRecordCounterService,
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory,
RunDataTimeToLiveService runDataTimeToLiveService,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService,
OperationNotificationSubscriberService operationNotificationSubscriberService) {
DiscoveryService discoveryService,
@Named(Constants.Service.MASTER_SERVICES_BIND_ADDRESS) InetAddress hostname,
@Named(Constants.AppFabric.HANDLERS_BINDING) Set<HttpHandler> handlers,
@Nullable MetricsCollectionService metricsCollectionService,
ProgramRuntimeService programRuntimeService,
RunRecordCorrectorService runRecordCorrectorService,
ProgramRunStatusMonitorService programRunStatusMonitorService,
ApplicationLifecycleService applicationLifecycleService,
ProgramNotificationSubscriberService programNotificationSubscriberService,
ProgramStopSubscriberService programStopSubscriberService,
@Named("appfabric.services.names") Set<String> servicesNames,
@Named("appfabric.handler.hooks") Set<String> handlerHookNames,
AuditLogSubscriberService auditLogSubscriberService,
CoreSchedulerService coreSchedulerService,
CredentialProviderService credentialProviderService,
NamespaceCredentialProviderService namespaceCredentialProviderService,
ProvisioningService provisioningService,
BootstrapService bootstrapService,
SystemAppManagementService systemAppManagementService,
TransactionRunner transactionRunner,
RunRecordMonitorService runRecordCounterService,
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory,
RunDataTimeToLiveService runDataTimeToLiveService,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService,
OperationNotificationSubscriberService operationNotificationSubscriberService) {
this.hostname = hostname;
this.discoveryService = discoveryService;
this.handlers = handlers;
Expand All @@ -150,6 +153,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.runRecordCorrectorService = runRecordCorrectorService;
this.programRunStatusMonitorService = programRunStatusMonitorService;
this.sslEnabled = cConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED);
this.auditLogSubscriberService = auditLogSubscriberService;
this.coreSchedulerService = coreSchedulerService;
this.credentialProviderService = credentialProviderService;
this.namespaceCredentialProviderService = namespaceCredentialProviderService;
Expand Down Expand Up @@ -179,6 +183,11 @@ protected void startUp() throws Exception {
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
futuresList.add(namespaceCredentialProviderService.start());
}
// Only for RBAC instances
if (Feature.DATAPLANE_AUDIT_LOGGING.isEnabled(featureFlagsProvider)
&& cConf.getBoolean(Constants.Security.ENABLED)) {
futuresList.add(auditLogSubscriberService.start());
}
futuresList.addAll(ImmutableList.of(
provisioningService.start(),
applicationLifecycleService.start(),
Expand Down Expand Up @@ -256,6 +265,7 @@ protected void shutDown() throws Exception {
credentialProviderService.stopAndWait();
namespaceCredentialProviderService.stopAndWait();
operationNotificationSubscriberService.stopAndWait();
auditLogSubscriberService.stopAndWait();
}

private Cancellable startHttpService(NettyHttpService httpService) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.inject.Injector;
import com.google.inject.Module;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.guice.AuditLogWriterModule;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.guice.ConfigModule;
Expand Down Expand Up @@ -97,6 +98,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf) {
modules.add(new MessagingServiceModule(cConf));
modules.add(new SystemAppModule());
modules.add(new MetricsClientRuntimeModule().getDistributedModules());
modules.add(new AuditLogWriterModule(cConf).getDistributedModules());

// If MasterEnvironment is not available, assuming it is the old hadoop stack with ZK, Kafka
MasterEnvironment masterEnv = MasterEnvironments.getMasterEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.inject.Injector;
import com.google.inject.Module;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.app.guice.AuditLogWriterModule;
import io.cdap.cdap.app.guice.DistributedArtifactManagerModule;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
Expand Down Expand Up @@ -114,6 +115,7 @@ public static Injector createInjector(CConfiguration cConf, Configuration hConf)
modules.add(coreSecurityModule);
modules.add(new MessagingServiceModule(cConf));
modules.add(new MetricsClientRuntimeModule().getDistributedModules());
modules.add(new AuditLogWriterModule(cConf).getInMemoryModules());

// If MasterEnvironment is not available, assuming it is the old hadoop stack with ZK, Kafka
MasterEnvironment masterEnv = MasterEnvironments.getMasterEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.cdap.cdap.api.artifact.ArtifactManager;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule;
import io.cdap.cdap.app.guice.AuditLogWriterModule;
import io.cdap.cdap.app.guice.AuthorizationModule;
import io.cdap.cdap.app.guice.DistributedArtifactManagerModule;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
Expand Down Expand Up @@ -154,6 +155,7 @@ protected void bindKeyManager(Binder binder) {
new MessagingServiceModule(cConf),
new AuthorizationModule(),
new AuthorizationEnforcementModule().getMasterModule(),
new AuditLogWriterModule(cConf).getDistributedModules(),
Modules.override(new AppFabricServiceRuntimeModule(cConf).getDistributedModules())
.with(new AbstractModule() {
// To enable localisation of artifacts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.inject.Key;
import com.google.inject.Module;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.guice.AuditLogWriterModule;
import io.cdap.cdap.app.preview.PreviewConfigModule;
import io.cdap.cdap.common.app.MainClassLoader;
import io.cdap.cdap.common.conf.CConfiguration;
Expand Down Expand Up @@ -168,6 +169,7 @@ public final void init(String[] args) throws Exception {
modules.add(new PreviewConfigModule(cConf, hConf, sConf));
modules.add(new IOModule());
modules.add(new MetricsClientRuntimeModule().getDistributedModules());
modules.add(new AuditLogWriterModule(cConf).getDistributedModules());
modules.add(new AbstractModule() {
@Override
protected void configure() {
Expand Down
Loading

0 comments on commit e9562fc

Please sign in to comment.