Rti perform #288

Draft · wants to merge 19 commits into base: main
Changes from 8 commits
@@ -262,7 +262,7 @@ public void handleMessageForElrXml(String message,
iReportStatusRepository.save(reportStatusIdData);

if (dataProcessingApplied) {
kafkaProducerService.sendMessageAfterConvertedToXml(String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "elr_unprocessed", 0);
kafkaProducerService.sendMessageAfterConvertedToXml(String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "dp_elr_unprocessed", 0);
}
else {
kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0);
@@ -538,7 +538,7 @@ public void xmlConversionHandlerProcessing(String message, String operation, Str
}

if (dataProcessingApplied) {
kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), "elr_unprocessed", 0); //NOSONAR
kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), "dp_elr_unprocessed", 0); //NOSONAR
} else {
kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0);
}
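
The new topic name "dp_elr_unprocessed" is now hard-coded at both call sites (note the //NOSONAR on one of them). A minimal sketch, assuming a Spring @Value property (the key name is illustrative), of externalizing it the way convertedToXmlTopic already is:

    @Value("${kafka.topic.dp-elr-unprocessed:dp_elr_unprocessed}") // hypothetical key
    private String dpElrUnprocessedTopic;

    // ...both branches then share the injected name:
    kafkaProducerService.sendMessageAfterConvertedToXml(
            nbsInterfaceModel.getNbsInterfaceUid().toString(), dpElrUnprocessedTopic, 0);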
12 changes: 6 additions & 6 deletions data-processing-service/Readme.md
@@ -4,7 +4,7 @@ Kafka Config
- kafka-topics.sh --alter --bootstrap-server [your-kafka-broker]:9092 --partitions [new-num-partitions] --topic [your-topic-name]
-
-
kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 10 --topic elr_action_tracker
kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 2 --topic elr_action_tracker
kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 10 --topic elr_duplicate
kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 10 --topic elr_edx_log
kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 10 --topic elr_processing_handle_lab
@@ -34,9 +34,9 @@ docker exec -t broker kafka-topics --bootstrap-server broker:29092 --topic elr_e
docker exec -t broker kafka-topics --bootstrap-server broker:29092 --topic elr_action_tracker --describe


docker exec -t broker kafka-topics -alter --bootstrap-server broker:29092 --topic elr_unprocessed --partitions 10
docker exec -t broker kafka-topics -alter --bootstrap-server broker:29092 --topic elr_processing_handle_lab --partitions 10
docker exec -t broker kafka-topics -alter --bootstrap-server broker:29092 --topic elr_processing_public_health_case --partitions 10
docker exec -t broker kafka-topics -alter --bootstrap-server broker:29092 --topic elr_edx_log --partitions 10
docker exec -t broker kafka-topics -alter --bootstrap-server broker:29092 --topic elr_action_tracker --partitions 10
docker exec -t broker kafka-topics --alter --bootstrap-server broker:29092 --topic dp_elr_unprocessed --partitions 100
docker exec -t broker kafka-topics --alter --bootstrap-server broker:29092 --topic dp_elr_processing_handle_lab --partitions 100
docker exec -t broker kafka-topics --alter --bootstrap-server broker:29092 --topic dp_elr_processing_public_health_case --partitions 100
docker exec -t broker kafka-topics --alter --bootstrap-server broker:29092 --topic dp_elr_edx_log --partitions 100
docker exec -t broker kafka-topics --alter --bootstrap-server broker:29092 --topic dp_elr_action_tracker --partitions 100
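
Kafka only allows increasing a topic's partition count, so the elr_action_tracker change from 10 down to 2 partitions in the first hunk will be rejected on an existing topic (it would have to be deleted and recreated). For the increases, a minimal sketch of the same operation through the Kafka AdminClient, with the broker address assumed:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewPartitions;
    import java.util.Map;
    import java.util.Properties;

    public class AlterPartitions {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            try (AdminClient admin = AdminClient.create(props)) {
                // Partition counts can only grow; this raises dp_elr_unprocessed to 100.
                admin.createPartitions(Map.of("dp_elr_unprocessed", NewPartitions.increaseTo(100)))
                        .all().get();
            }
        }
    }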

2 changes: 1 addition & 1 deletion data-processing-service/build.gradle
@@ -196,7 +196,7 @@ dependencies {

// Misc
implementation 'com.google.code.gson:gson:2.11.0'
implementation 'com.zaxxer:HikariCP:5.0.1'
implementation 'com.zaxxer:HikariCP:6.2.1'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.18.2'
implementation 'org.hibernate.validator:hibernate-validator'
implementation 'jakarta.xml.bind:jakarta.xml.bind-api:4.0.0'
gov/cdc/dataprocessing/config/KafkaConsumerConfig.java
@@ -46,6 +46,18 @@ public class KafkaConsumerConfig {
@Value("${spring.kafka.group-id}")
private String groupId = "";

@Value("${spring.kafka.group-id-phc}")
private String groupId2 = "";

@Value("${spring.kafka.group-id-lab}")
private String groupId3 = "";

@Value("${spring.kafka.group-id-edx}")
private String groupId4 = "";

@Value("${spring.kafka.partition}")
private Integer partition = 1;

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers = "";

@@ -74,7 +86,7 @@ public ConsumerFactory<String, String> consumerFactory() {
// Session timeout configurations
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30-second session timeout
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // Send heartbeat every 10 seconds
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Disable auto commit for manual control
// config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Disable auto commit for manual control
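// Note: since Spring for Apache Kafka 2.3, enable.auto.commit defaults to false when
// unset; with AckMode.MANUAL also commented out below, the container falls back to its
// default AckMode.BATCH, committing offsets after each poll's records are processed.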

return new DefaultKafkaConsumerFactory<>(config);
}
@@ -84,10 +96,124 @@ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerCont
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setConcurrency(partition);
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

return factory;
}



@Bean
public ConsumerFactory<String, String> consumerFactory2() {
final Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId2);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());

// High-throughput configurations
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // Fetch at least 1MB of data per request
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // Wait up to 500ms for more data before responding
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10 * 1024 * 1024); // Fetch up to 10MB per partition

// Polling configurations
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Limit records fetched per poll to 500
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // Allow up to 10 minutes for processing

// Session timeout configurations
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30-second session timeout
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // Send heartbeat every 10 seconds
// config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Disable auto commit for manual control

return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory2());
factory.setConcurrency(partition);
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory3() {
final Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId3);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());

// High-throughput configurations
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // Fetch at least 1MB of data per request
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // Wait up to 500ms for more data before responding
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10 * 1024 * 1024); // Fetch up to 10MB per partition

// Polling configurations
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Limit records fetched per poll to 500
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // Allow up to 10 minutes for processing

// Session timeout configurations
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30-second session timeout
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // Send heartbeat every 10 seconds
// config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Disable auto commit for manual control

return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory3() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory3());
factory.setConcurrency(partition);
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory4() {
final Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId4);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());

// High-throughput configurations
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // Fetch at least 1MB of data per request
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // Wait up to 500ms for more data before responding
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10 * 1024 * 1024); // Fetch up to 10MB per partition

// Polling configurations
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Limit records fetched per poll to 500
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // Allow up to 10 minutes for processing

// Session timeout configurations
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30-second session timeout
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // Send heartbeat every 10 seconds
// config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Disable auto commit for manual control

return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory4() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory4());
factory.setConcurrency(partition);
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

return factory;
}


}
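
The four consumer factories differ only in the injected group id. A minimal sketch, with helper names illustrative, of folding them into one builder; setConcurrency(partition) is kept, since listener threads beyond the topic's partition count would sit idle:

    private ConsumerFactory<String, String> buildConsumerFactory(String group) {
        final Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
        // ...same fetch, poll, and session settings as consumerFactory() above...
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(buildConsumerFactory(groupId2));
        factory.setConcurrency(partition);
        return factory;
    }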
gov/cdc/dataprocessing/config/NbsDataSourceConfig.java
@@ -1,5 +1,7 @@
package gov.cdc.dataprocessing.config;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import jakarta.persistence.EntityManagerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
@@ -61,16 +63,52 @@ public class NbsDataSourceConfig {
@Value("${spring.datasource.password}")
private String dbUserPassword;

@Value("${spring.datasource.hikari.maximum-pool-size:100}")
private int maximumPoolSize;

@Value("${spring.datasource.hikari.minimum-idle:50}")
private int minimumIdle;

@Value("${spring.datasource.hikari.idle-timeout:120000}")
private long idleTimeout;

@Value("${spring.datasource.hikari.max-lifetime:1200000}")
private long maxLifetime;

@Value("${spring.datasource.hikari.connection-timeout:300000}")
private long connectionTimeout;

@Value("${spring.datasource.hikari.pool-name:OdseHikariCP}")
private String poolName;


@Bean(name = "nbsDataSource")
public DataSource nbsDataSource() {
DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create();
// DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create();
//
// dataSourceBuilder.driverClassName(driverClassName);
// dataSourceBuilder.url(dbUrl);
// dataSourceBuilder.username(dbUserName);
// dataSourceBuilder.password(dbUserPassword);
//
// return dataSourceBuilder.build();


HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setDriverClassName(driverClassName);
hikariConfig.setJdbcUrl(dbUrl);
hikariConfig.setUsername(dbUserName);
hikariConfig.setPassword(dbUserPassword);

dataSourceBuilder.driverClassName(driverClassName);
dataSourceBuilder.url(dbUrl);
dataSourceBuilder.username(dbUserName);
dataSourceBuilder.password(dbUserPassword);
// HikariCP-specific settings
hikariConfig.setMaximumPoolSize(maximumPoolSize);
hikariConfig.setMinimumIdle(minimumIdle);
hikariConfig.setIdleTimeout(idleTimeout);
hikariConfig.setMaxLifetime(maxLifetime);
hikariConfig.setConnectionTimeout(connectionTimeout);
hikariConfig.setPoolName(poolName);

return dataSourceBuilder.build();
return new HikariDataSource(hikariConfig);
}

@Bean(name = "nbsEntityManagerFactoryBuilder")
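
An alternative sketch, assuming Spring Boot's property binding is acceptable here: DataSourceBuilder can still build the pool if the type is pinned to HikariDataSource, and @ConfigurationProperties binds the spring.datasource.hikari.* keys onto it, replacing the six @Value fields:

    @Bean(name = "nbsDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.hikari")
    public DataSource nbsDataSource() {
        // Pool-size, timeout, and pool-name keys are bound onto the
        // HikariDataSource after construction by Spring Boot.
        return DataSourceBuilder.create()
                .type(HikariDataSource.class)
                .driverClassName(driverClassName)
                .url(dbUrl)
                .username(dbUserName)
                .password(dbUserPassword)
                .build();
    }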
gov/cdc/dataprocessing/config/OdseDataSourceConfig.java
@@ -1,5 +1,7 @@
package gov.cdc.dataprocessing.config;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import jakarta.persistence.EntityManagerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
@@ -63,16 +65,50 @@ public class OdseDataSourceConfig {
@Value("${spring.datasource.password}")
private String dbUserPassword;

@Value("${spring.datasource.hikari.maximum-pool-size:100}")
private int maximumPoolSize;

@Value("${spring.datasource.hikari.minimum-idle:50}")
private int minimumIdle;

@Value("${spring.datasource.hikari.idle-timeout:120000}")
private long idleTimeout;

@Value("${spring.datasource.hikari.max-lifetime:1200000}")
private long maxLifetime;

@Value("${spring.datasource.hikari.connection-timeout:300000}")
private long connectionTimeout;

@Value("${spring.datasource.hikari.pool-name:OdseHikariCP}")
private String poolName;

@Bean(name = "odseDataSource")
public DataSource odseDataSource() {
DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create();
// DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create();
//
// dataSourceBuilder.driverClassName(driverClassName);
// dataSourceBuilder.url(dbUrl);
// dataSourceBuilder.username(dbUserName);
// dataSourceBuilder.password(dbUserPassword);
//
// return dataSourceBuilder.build();

HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setDriverClassName(driverClassName);
hikariConfig.setJdbcUrl(dbUrl);
hikariConfig.setUsername(dbUserName);
hikariConfig.setPassword(dbUserPassword);

dataSourceBuilder.driverClassName(driverClassName);
dataSourceBuilder.url(dbUrl);
dataSourceBuilder.username(dbUserName);
dataSourceBuilder.password(dbUserPassword);
// HikariCP-specific settings
hikariConfig.setMaximumPoolSize(maximumPoolSize);
hikariConfig.setMinimumIdle(minimumIdle);
hikariConfig.setIdleTimeout(idleTimeout);
hikariConfig.setMaxLifetime(maxLifetime);
hikariConfig.setConnectionTimeout(connectionTimeout);
hikariConfig.setPoolName(poolName);

return dataSourceBuilder.build();
return new HikariDataSource(hikariConfig);
}

@Bean(name = "odseEntityManagerFactoryBuilder")
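
One caveat: both datasource configs read the identical spring.datasource.hikari.* keys, so the two pools can never be sized or named independently from configuration. Splitting the prefixes would fix that; the keys below are hypothetical:

    @Value("${spring.datasource.odse.hikari.maximum-pool-size:100}") // hypothetical per-datasource key
    private int maximumPoolSize;

    @Value("${spring.datasource.odse.hikari.pool-name:OdseHikariCP}") // hypothetical per-datasource key
    private String poolName;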
gov/cdc/dataprocessing/controller/Controller.java
@@ -1,5 +1,8 @@
package gov.cdc.dataprocessing.controller;

import com.google.gson.Gson;
import gov.cdc.dataprocessing.repository.nbs.odse.repos.person.PersonRepository;
import gov.cdc.dataprocessing.utilities.GsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
@@ -32,11 +35,22 @@
@SuppressWarnings({"java:S125", "java:S3776", "java:S6204", "java:S1141", "java:S1118", "java:S1186", "java:S6809", "java:S6541", "java:S2139", "java:S3740",
"java:S1149", "java:S112", "java:S107", "java:S1195", "java:S1135", "java:S6201", "java:S1192", "java:S135", "java:S117"})
public class Controller {
@Autowired
public Controller( ) {

private final PersonRepository personRepository;

public Controller(PersonRepository personRepository) {
this.personRepository = personRepository;
}

@GetMapping("/test")
public ResponseEntity<String> getPersonTest() {
log.info("Data Processing Service Status OK");
var test = personRepository.findById(10599457L); // hard-coded test id
Gson gsonUtil = new Gson();
return test.map(person -> ResponseEntity.status(HttpStatus.OK).body(gsonUtil.toJson(person)))
        .orElseGet(() -> ResponseEntity.status(HttpStatus.NOT_FOUND).body("person not found"));
}


@GetMapping("/status")
public ResponseEntity<String> getDataPipelineStatusHealth() {
log.info("Data Processing Service Status OK");
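
A variant sketch of the test endpoint that returns the entity directly and lets Spring's registered message converter serialize it, dropping the manual Gson step (assumes the Person entity type is importable):

    @GetMapping("/test")
    public ResponseEntity<Person> getPersonTest() {
        // 200 with the serialized entity, or 404 when the hard-coded id is absent.
        return personRepository.findById(10599457L)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }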