Skip to content

Commit

Permalink
fix all exporter issues
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Jan 23, 2025
1 parent 1d991b8 commit 955b765
Show file tree
Hide file tree
Showing 18 changed files with 539 additions and 336 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ testClusters.all {
testDistribution = "INTEG_TEST"
// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)
// numberOfNodes = 2
}

run {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,19 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_CPU_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_CPU_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_CPU_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_CPU_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY,
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER,
QueryInsightsSettings.TOP_N_EXPORTER_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ public final class DebugExporter implements QueryInsightsExporter {
* Logger of the debug exporter
*/
private final Logger logger = LogManager.getLogger();
private static final String EXPORTER_ID = "debug_exporter";

/**
* Constructor of DebugExporter
*/
private DebugExporter() {}

@Override
public String getId() {
return EXPORTER_ID;
}

private static class InstanceHolder {
private static final DebugExporter INSTANCE = new DebugExporter();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,30 @@
import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
Expand All @@ -44,21 +53,45 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
*/
private final Logger logger = LogManager.getLogger();
private final Client client;
private final ClusterService clusterService;
private final String indexMapping;
private DateTimeFormatter indexPattern;
private int deleteAfter;
private final String id;

private static final int DEFAULT_NUMBER_OF_REPLICA = 1;
private static final int DEFAULT_NUMBER_OF_SHARDS = 1;
private static final List<String> DEFAULT_SORTED_FIELDS = List.of(
"measurements.latency.number",
"measurements.cpu.number",
"measurements.memory.number"
);
private static final List<String> DEFAULT_SORTED_ORDERS = List.of(
"desc",
"desc",
"desc"
);

/**
* Constructor of LocalIndexExporter
*
* @param client OS client
* @param indexPattern the pattern of index to export to
*/
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
public LocalIndexExporter(final Client client, final ClusterService clusterService, final DateTimeFormatter indexPattern, final String indexMapping, final String id) {
this.indexPattern = indexPattern;
this.client = client;
this.clusterService = clusterService;
this.indexMapping = indexMapping;
this.id = id;
this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE;
}

@Override
public String getId() {
return id;
}

/**
* Getter of indexPattern
*
Expand Down Expand Up @@ -89,28 +122,73 @@ public void export(final List<SearchQueryRecord> records) {
}
try {
final String indexName = buildLocalIndexName();
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
for (SearchQueryRecord record : records) {
bulkRequestBuilder.add(
new IndexRequest(indexName).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))

if (!checkIndexExists(indexName)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);

createIndexRequest.settings(Settings.builder()
.putList("index.sort.field", DEFAULT_SORTED_FIELDS)
.putList("index.sort.order", DEFAULT_SORTED_ORDERS)
.put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS)
.put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA)
);
createIndexRequest.mapping(readIndexMappings());

client.admin().indices().create(createIndexRequest, new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
if (createIndexResponse.isAcknowledged()) {
try {
bulk(indexName, records);
} catch (IOException e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
logger.error("Unable to index query insights data: ", e);
}
}
}
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
try {
bulk(indexName, records);
} catch (IOException ex) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
logger.error("Unable to index query insights data: ", e);
}
} else {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
logger.error("Unable to create query insights index: ", e);
}
}
});
} else {
bulk(indexName, records);
}
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {}

@Override
public void onFailure(Exception e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_BULK_FAILURES);
logger.error("Failed to execute bulk operation for query insights data: ", e);
}
});
} catch (final Exception e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
logger.error("Unable to index query insights data: ", e);
}
}

private void bulk (final String indexName, final List<SearchQueryRecord> records) throws IOException {
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
for (SearchQueryRecord record : records) {
bulkRequestBuilder.add(
new IndexRequest(indexName).id(record.getId()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {}

@Override
public void onFailure(Exception e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_BULK_FAILURES);
logger.error("Failed to execute bulk operation for query insights data: ", e);
}
});
}

/**
* Close the exporter sink
*/
Expand Down Expand Up @@ -146,7 +224,7 @@ public void deleteExpiredTopNIndices(final Map<String, IndexMetadata> indexMetad
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter);
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
String indexName = entry.getKey();
if (isTopQueriesIndex(indexName) && entry.getValue().getCreationDate() <= expirationMillisLong) {
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
// delete this index
TopQueriesService.deleteSingleIndex(indexName, client);
}
Expand All @@ -167,4 +245,26 @@ public static String generateLocalIndexDateHash() {
// Generate a 5-digit numeric hash from the date's hashCode
return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000);
}

/**
* check if index exists
* @return boolean
*/
private boolean checkIndexExists(String indexName) {
ClusterState clusterState = clusterService.state();
return clusterState.getRoutingTable().hasIndex(indexName);
}

/**
* get correlation rule index mappings
* @return mappings of correlation rule index
* @throws IOException IOException
*/
private String readIndexMappings() throws IOException {
return new String(
Objects.requireNonNull(LocalIndexExporter.class.getClassLoader().getResourceAsStream(indexMapping))
.readAllBytes(),
Charset.defaultCharset()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public interface QueryInsightsExporter extends Closeable {
* @param records list of {@link SearchQueryRecord}
*/
void export(final List<SearchQueryRecord> records);
String getId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@

package org.opensearch.plugin.insights.core.exporter;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;

import java.io.IOException;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.reader.QueryInsightsReader;

/**
* Factory class for validating and creating exporters based on provided settings
Expand All @@ -32,39 +32,40 @@ public class QueryInsightsExporterFactory {
*/
private final Logger logger = LogManager.getLogger();
final private Client client;
final private Set<QueryInsightsExporter> exporters;
final private ClusterService clusterService;
final private Map<String, QueryInsightsExporter> exporters;

/**
* Constructor of QueryInsightsExporterFactory
*
* @param client OS client
*/
public QueryInsightsExporterFactory(final Client client) {
public QueryInsightsExporterFactory(final Client client, final ClusterService clusterService) {
this.client = client;
this.exporters = new HashSet<>();
this.clusterService = clusterService;
this.exporters = new HashMap<>();
}

/**
* Validate exporter sink config
*
* @param settings exporter sink config {@link Settings}
* @param exporterType exporter sink type
* @throws IllegalArgumentException if provided exporter sink config settings are invalid
*/
public void validateExporterConfig(final Settings settings) throws IllegalArgumentException {
public void validateExporterType(final String exporterType) throws IllegalArgumentException {
// Disable exporter if the EXPORTER_TYPE setting is null
if (settings.get(EXPORTER_TYPE) == null) {
if (exporterType == null) {
return;
}
SinkType type;
try {
type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE));
SinkType.parse(exporterType);
} catch (IllegalArgumentException e) {
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES);
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Invalid exporter type [%s], type should be one of %s",
settings.get(EXPORTER_TYPE),
exporterType,
SinkType.allSinkTypes()
)
);
Expand All @@ -78,10 +79,10 @@ public void validateExporterConfig(final Settings settings) throws IllegalArgume
* @param indexPattern the index pattern if creating a index exporter
* @return QueryInsightsExporter the created exporter sink
*/
public QueryInsightsExporter createExporter(SinkType type, String indexPattern) {
public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern, String indexMapping) {
if (SinkType.LOCAL_INDEX.equals(type)) {
QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT));
this.exporters.add(exporter);
QueryInsightsExporter exporter = new LocalIndexExporter(client, clusterService, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), indexMapping, id);
this.exporters.put(id, exporter);
return exporter;
}
return DebugExporter.getInstance();
Expand All @@ -101,6 +102,15 @@ public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, Stri
return exporter;
}

/**
* Get a exporter by id
* @param id The id of the exporter
* @return QueryInsightsReader the Reader
*/
public QueryInsightsExporter getExporter(String id) {
return this.exporters.get(id);
}

/**
* Close an exporter
*
Expand All @@ -110,7 +120,7 @@ public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, Stri
public void closeExporter(QueryInsightsExporter exporter) throws IOException {
if (exporter != null) {
exporter.close();
this.exporters.remove(exporter);
this.exporters.remove(exporter.getId());
}
}

Expand All @@ -119,7 +129,7 @@ public void closeExporter(QueryInsightsExporter exporter) throws IOException {
*
*/
public void closeAllExporters() {
for (QueryInsightsExporter exporter : exporters) {
for (QueryInsightsExporter exporter : exporters.values()) {
try {
closeExporter(exporter);
} catch (IOException e) {
Expand Down
Loading

0 comments on commit 955b765

Please sign in to comment.