-
-
Notifications
You must be signed in to change notification settings - Fork 67
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #440 from rakesh-jain/elastic
Elasticsearch
- Loading branch information
Showing
10 changed files
with
555 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
plugins { | ||
id 'java' | ||
} | ||
|
||
repositories { | ||
mavenCentral() | ||
} | ||
|
||
dependencies { | ||
api project(":sbk-api") | ||
|
||
// Add your storage driver specific dependencies here | ||
api 'co.elastic.clients:elasticsearch-java:8.15.0' | ||
} |
171 changes: 171 additions & 0 deletions
171
drivers/elasticsearch/src/main/java/io/sbk/driver/Elasticsearch/Elasticsearch.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
/** | ||
* Copyright (c) KMG. All Rights Reserved.. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
package io.sbk.driver.Elasticsearch; | ||
|
||
import co.elastic.clients.elasticsearch.ElasticsearchClient; | ||
import co.elastic.clients.elasticsearch._types.ElasticsearchException; | ||
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest; | ||
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse; | ||
import co.elastic.clients.json.jackson.JacksonJsonpMapper; | ||
import co.elastic.clients.transport.ElasticsearchTransport; | ||
import co.elastic.clients.transport.endpoints.BooleanResponse; | ||
import co.elastic.clients.transport.rest_client.RestClientTransport; | ||
import com.fasterxml.jackson.databind.DeserializationFeature; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.dataformat.javaprop.JavaPropsFactory; | ||
import io.sbk.api.DataReader; | ||
import io.sbk.api.DataWriter; | ||
import io.sbk.data.impl.SbkString; | ||
import io.sbk.params.ParameterOptions; | ||
import io.sbk.api.Storage; | ||
import io.sbk.data.DataType; | ||
import io.sbk.params.InputOptions; | ||
import io.sbk.system.Printer; | ||
import org.apache.http.HttpHost; | ||
import org.apache.http.auth.AuthScope; | ||
import org.apache.http.auth.UsernamePasswordCredentials; | ||
import org.apache.http.impl.client.BasicCredentialsProvider; | ||
import org.elasticsearch.client.RestClient; | ||
import org.elasticsearch.client.RestClientBuilder; | ||
|
||
import java.io.IOException; | ||
import java.util.Objects; | ||
|
||
/** | ||
* Class for Elasticsearch storage driver. | ||
* | ||
* Incase if your data type in other than byte[] (Byte Array) | ||
* then change the datatype and getDataType. | ||
*/ | ||
public class Elasticsearch implements Storage<String> { | ||
private final static String CONFIGFILE = "Elasticsearch.properties"; | ||
private ElasticsearchConfig config; | ||
private ElasticsearchClient elasticsearchClient; | ||
|
||
public static long generateStartKey(int id) { | ||
return (long) id * (long) Integer.MAX_VALUE; | ||
} | ||
|
||
@Override | ||
public void addArgs(final InputOptions params) throws IllegalArgumentException { | ||
final ObjectMapper mapper = new ObjectMapper(new JavaPropsFactory()) | ||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); | ||
try { | ||
config = mapper.readValue( | ||
Objects.requireNonNull(Elasticsearch.class.getClassLoader().getResourceAsStream(CONFIGFILE)), | ||
ElasticsearchConfig.class); | ||
} catch (Exception ex) { | ||
ex.printStackTrace(); | ||
throw new IllegalArgumentException(ex); | ||
} | ||
|
||
// change and uncomment the below code as per your driver specific parameters | ||
// params.addOption("param", true, "Elasticsearch parameter, default param: " + config.param); | ||
params.addOption("user", true, "ElasticSearch user : " + config.user); | ||
params.addOption("password", true, "ElasticSearch Password " + config.password); | ||
params.addOption("url", true, "ElasticSearch URL:" + config.url); | ||
params.addOption("index", true, "ElasticSearch Index: " + config.index); | ||
} | ||
|
||
@Override | ||
public void parseArgs(final ParameterOptions params) throws IllegalArgumentException { | ||
// change and uncommnet the below code as per your driver specific parameters | ||
// config.param = params.getOptionValue("param", config.param); | ||
config.user = params.getOptionValue("user", config.user); | ||
config.password = params.getOptionValue("password", config.password); | ||
config.url = params.getOptionValue("url", config.url); | ||
config.index = params.getOptionValue("index", config.index); | ||
} | ||
|
||
@Override | ||
public void openStorage(final ParameterOptions params) throws IOException { | ||
try { | ||
elasticsearchClient = connect(); | ||
Printer.log.info("ElasticSearch Client Connected....."); | ||
String index1 = config.index.trim(); | ||
if (!indexExists(index1)) { | ||
createIndex(elasticsearchClient, index1); | ||
} | ||
} catch (ElasticsearchException e ) { | ||
Printer.log.error(e.getMessage()); | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
private ElasticsearchClient connect() { | ||
Printer.log.info("Attempting to connect to Elasticsearch..."); | ||
try { | ||
final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); | ||
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(config.user, config.password)); | ||
|
||
RestClientBuilder builder = RestClient.builder(HttpHost.create(config.url)) | ||
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); | ||
|
||
RestClient restClient = builder.build(); | ||
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); | ||
return new ElasticsearchClient(transport); | ||
} catch (ElasticsearchException e) { | ||
Printer.log.error("Error connecting to Elasticsearch: " + e.getMessage()); | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
private boolean indexExists(String indexName) { | ||
try { | ||
BooleanResponse response = elasticsearchClient. | ||
indices().exists(e -> e.index(indexName)); | ||
return response.value(); | ||
} catch (IOException e) { | ||
e.printStackTrace(); | ||
return false; | ||
} | ||
} | ||
|
||
private void createIndex(ElasticsearchClient client, String indexName) { | ||
try { | ||
CreateIndexRequest createIndexRequest = CreateIndexRequest.of(c -> c.index(indexName)); | ||
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); | ||
|
||
if (createIndexResponse.acknowledged()) { | ||
Printer.log.info(indexName + "Created Successfully"); | ||
} else { | ||
Printer.log.info("Index creation was not acknowledged."); | ||
} | ||
} catch (IOException e) { | ||
e.printStackTrace(); | ||
} | ||
} | ||
|
||
@Override | ||
public void closeStorage(final ParameterOptions params) throws IOException { | ||
try { | ||
elasticsearchClient._transport().close(); | ||
} catch (ElasticsearchException e) { | ||
e.printStackTrace(); | ||
Printer.log.error("Failed to close the connection"); | ||
} | ||
|
||
} | ||
|
||
@Override | ||
public DataWriter<String> createWriter(final int id, final ParameterOptions params) { | ||
return new ElasticsearchWriter(id, params, config, elasticsearchClient); | ||
} | ||
|
||
@Override | ||
public DataReader<String> createReader(final int id, final ParameterOptions params) { | ||
return new ElasticsearchReader(id, params, config, elasticsearchClient); | ||
} | ||
|
||
@Override | ||
public DataType<String> getDataType() { | ||
return new SbkString(); | ||
} | ||
} |
22 changes: 22 additions & 0 deletions
22
drivers/elasticsearch/src/main/java/io/sbk/driver/Elasticsearch/ElasticsearchConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/** | ||
* Copyright (c) KMG. All Rights Reserved.. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
|
||
package io.sbk.driver.Elasticsearch; | ||
|
||
/** | ||
* Class for Elasticsearch storage configuration. | ||
*/ | ||
public class ElasticsearchConfig { | ||
// Add Elasticsearch Storage driver configuration parameters | ||
public String user; | ||
public String password; | ||
public String url; | ||
public String index; | ||
} |
55 changes: 55 additions & 0 deletions
55
drivers/elasticsearch/src/main/java/io/sbk/driver/Elasticsearch/ElasticsearchReader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/** | ||
* Copyright (c) KMG. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
package io.sbk.driver.Elasticsearch; | ||
|
||
import co.elastic.clients.elasticsearch.ElasticsearchClient; | ||
import co.elastic.clients.elasticsearch._types.ElasticsearchException; | ||
import co.elastic.clients.elasticsearch.core.GetRequest; | ||
import co.elastic.clients.elasticsearch.core.GetResponse; | ||
import io.sbk.params.ParameterOptions; | ||
import io.sbk.api.Reader; | ||
import io.sbk.system.Printer; | ||
|
||
import java.io.IOException; | ||
import java.util.Map; | ||
|
||
/** | ||
* Class for Elasticsearch Reader. | ||
*/ | ||
public class ElasticsearchReader implements Reader<String> { | ||
private final ElasticsearchConfig config; | ||
private final ElasticsearchClient client; | ||
private long id; | ||
|
||
public ElasticsearchReader(int readerId, ParameterOptions params, ElasticsearchConfig config, ElasticsearchClient client) { | ||
this.id = Elasticsearch.generateStartKey(readerId); | ||
this.config = config; | ||
this.client = client; | ||
} | ||
|
||
@Override | ||
public String read() throws IOException { | ||
try { | ||
GetRequest request = GetRequest.of(g -> g | ||
.index(config.index.trim()) | ||
.id(String.valueOf(id++)) | ||
); | ||
GetResponse<Map> response = client.get(request, Map.class); | ||
return response.fields().toString(); | ||
} catch (ElasticsearchException e) { | ||
Printer.log.error("Elastic Search: recordRead failed !"); | ||
throw new IOException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
} | ||
} |
74 changes: 74 additions & 0 deletions
74
drivers/elasticsearch/src/main/java/io/sbk/driver/Elasticsearch/ElasticsearchWriter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
/** | ||
* Copyright (c) KMG. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
package io.sbk.driver.Elasticsearch; | ||
|
||
import co.elastic.clients.elasticsearch.ElasticsearchClient; | ||
import co.elastic.clients.elasticsearch._types.ElasticsearchException; | ||
import co.elastic.clients.elasticsearch.core.IndexRequest; | ||
import io.sbk.params.ParameterOptions; | ||
import io.sbk.api.Writer; | ||
import io.sbk.system.Printer; | ||
|
||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
|
||
/** | ||
* Class for Elasticsearch Writer. | ||
*/ | ||
public class ElasticsearchWriter implements Writer<String> { | ||
private final ElasticsearchConfig config; | ||
private final ElasticsearchClient client; | ||
private long id; | ||
|
||
public ElasticsearchWriter(int writerID, ParameterOptions params, ElasticsearchConfig config, ElasticsearchClient client) { | ||
this.id = Elasticsearch.generateStartKey(writerID); | ||
this.config = config; | ||
this.client = client; | ||
} | ||
|
||
@Override | ||
public CompletableFuture writeAsync(String data) throws IOException { | ||
try { | ||
writeData(data); | ||
} catch (ElasticsearchException ex ) { | ||
Printer.log.error("Elastic Search: recordWrite failed !"); | ||
throw new IOException(ex); | ||
} | ||
return null; | ||
} | ||
|
||
@Override | ||
public void sync() throws IOException { | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
} | ||
|
||
|
||
private void writeData(String data) { | ||
Map<String, String> document = new HashMap<>(); | ||
document.put("data", data); | ||
try { | ||
IndexRequest<Map<String, String>> request = IndexRequest.of(i -> i | ||
.index(config.index.trim()) | ||
.id(String.valueOf(id++)) | ||
.document(document) | ||
); | ||
client.index(request); | ||
} catch (ElasticsearchException | IOException ex ) { | ||
Printer.log.error("Elastic Search: recordWrite failed !"); | ||
throw new RuntimeException(ex); | ||
} | ||
} | ||
} |
13 changes: 13 additions & 0 deletions
13
drivers/elasticsearch/src/main/resources/Elasticsearch.properties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
#Copyright (c) KMG. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
# Elasticsearch storage driver default Properties/parameters | ||
user= rakesh | ||
password= jain@88 | ||
url= http://localhost:9200 | ||
index=sbkj |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters