diff --git a/Makefile b/Makefile
index f595b9e..e3a88f3 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,5 @@
 BINARY=es-query-csv
-VERSION=1.3.1
+VERSION=v1.4.0
 BUILD_TIME=`date +%FT%T%z`
 GOX_OSARCH="darwin/amd64 darwin/arm64 linux/386 linux/amd64 windows/386 windows/amd64"
 
diff --git a/README.md b/README.md
index c184db6..ac5c088 100644
--- a/README.md
+++ b/README.md
@@ -26,6 +26,7 @@ es-query-export -c "http://localhost:9200" -i "logstash-*" --startdate="2019-04-
 | `-q --query` | | Lucene query to match documents (same as in Kibana) |
 | ` --fields` | | define a comma separated list of fields to export |
 | `-o --outfile` | output.csv | name of output file |
+| `-f --outformat` | csv | format of the output data; possible values: csv, json, raw |
 | `-r --rawquery`| | optional raw ElasticSearch query JSON string |
 | `-s --start` | | optional start date - Format: YYYY-MM-DDThh:mm:ss.SSSZ. or any other Elasticsearch default format |
 | `-e --end` | | optional end date - Format: YYYY-MM-DDThh:mm:ss.SSSZ. or any other Elasticsearch default format |
@@ -33,3 +34,9 @@ es-query-export -c "http://localhost:9200" -i "logstash-*" --startdate="2019-04-
 | `--verifySSL` | true | optional define how to handle SSL certificates |
 | `--user` | | optional username |
 | `--pass` | | optional password |
+
+## Output Formats
+
+- `csv` - all or selected fields, separated by commas (,)
+- `json` - all or selected fields as JSON objects, one per line
+- `raw` - a full JSON dump of every matching document, including its id, index and a `_source` field that holds the document data; one JSON object per line
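+
+For example, to export all documents that match a Lucene query as line-delimited JSON instead of CSV (the index pattern and query are placeholders for your own values):
+
+```
+es-query-export -c "http://localhost:9200" -i "logstash-*" -q "status:error" -f json -o output.json
+```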
\ No newline at end of file
diff --git a/export.go b/export.go
deleted file mode 100644
index 3cdf544..0000000
--- a/export.go
+++ /dev/null
@@ -1,238 +0,0 @@
-package main
-
-import (
-	"context"
-	"crypto/tls"
-	"encoding/csv"
-	"encoding/json"
-	"fmt"
-	"io"
-	"log"
-	"net/http"
-	"os"
-	"regexp"
-	"strings"
-	"time"
-
-	"github.com/olivere/elastic/v7"
-	"golang.org/x/sync/errgroup"
-	"gopkg.in/cheggaaa/pb.v2"
-)
-
-func export(ctx context.Context, conf *Flags) {
-	tr := &http.Transport{
-		TLSClientConfig: &tls.Config{InsecureSkipVerify: !conf.ElasticVerifySSL},
-	}
-	httpClient := &http.Client{Transport: tr}
-
-	esOpts := make([]elastic.ClientOptionFunc, 0)
-	esOpts = append(esOpts,
-		elastic.SetHttpClient(httpClient),
-		elastic.SetURL(conf.ElasticURL),
-		elastic.SetSniff(false),
-		elastic.SetHealthcheckInterval(60*time.Second),
-		elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
-	)
-
-	if conf.ElasticUser != "" && conf.ElasticPass != "" {
-		esOpts = append(esOpts, elastic.SetBasicAuth(conf.ElasticUser, conf.ElasticPass))
-	}
-
-	client, err := elastic.NewClient(esOpts...)
-	if err != nil {
-		log.Fatalf("Error connecting to ElasticSearch: %s", err)
-	}
-	defer client.Stop()
-
-	if conf.Fieldlist != "" {
-		conf.fields = strings.Split(conf.Fieldlist, ",")
-	}
-
-	outfile, err := os.Create(conf.Outfile)
-	if err != nil {
-		log.Fatalf("Error creating output file - %s", err)
-	}
-	defer outfile.Close()
-
-	g, ctx := errgroup.WithContext(ctx)
-
-	var rangeQuery *elastic.RangeQuery
-
-	esQuery := elastic.NewBoolQuery()
-
-	if conf.StartDate != "" && conf.EndDate != "" {
-		rangeQuery = elastic.NewRangeQuery(conf.Timefield).Gte(conf.StartDate).Lte(conf.EndDate)
-	} else if conf.StartDate != "" {
-		rangeQuery = elastic.NewRangeQuery(conf.Timefield).Gte(conf.StartDate)
-	} else if conf.EndDate != "" {
-		rangeQuery = elastic.NewRangeQuery(conf.Timefield).Lte(conf.EndDate)
-	} else {
-		rangeQuery = nil
-	}
-
-	if rangeQuery != nil {
-		esQuery = esQuery.Filter(rangeQuery)
-	}
-
-	if conf.RAWQuery != "" {
-		esQuery = esQuery.Must(elastic.NewRawStringQuery(conf.RAWQuery))
-	} else if conf.Query != "" {
-		esQuery = esQuery.Must(elastic.NewQueryStringQuery(conf.Query))
-	} else {
-		esQuery = esQuery.Must(elastic.NewMatchAllQuery())
-	}
-
-	/*
-		source, _ := esQuery.Source()
-		data, _ := json.Marshal(source)
-		fmt.Println(string(data))
-	*/
-
-	// Count total and setup progress
-	total, err := client.Count(conf.Index).Query(esQuery).Do(ctx)
-	if err != nil {
-		log.Printf("Error counting ElasticSearch documents - %v", err)
-	}
-	bar := pb.StartNew(int(total))
-
-	// one goroutine to receive hits from Elastic and send them to hits channel
-	hits := make(chan json.RawMessage)
-	g.Go(func() error {
-		defer close(hits)
-
-		scroll := client.Scroll(conf.Index).Size(conf.ScrollSize).Query(esQuery)
-
-		// include selected fields otherwise export all
-		if conf.fields != nil {
-			fetchSource := elastic.NewFetchSourceContext(true)
-			for _, field := range conf.fields {
-				fetchSource.Include(field)
-			}
-			scroll = scroll.FetchSourceContext(fetchSource)
-		}
-
-		for {
-			results, err := scroll.Do(ctx)
-			if err == io.EOF {
-				return nil // all results retrieved
-			}
-			if err != nil {
-				return err // something went wrong
-			}
-
-			// Send the hits to the hits channel
-			for _, hit := range results.Hits.Hits {
-				hits <- hit.Source
-			}
-
-			// Check if we need to terminate early
-			select {
-			default:
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-		}
-	})
-
-	// goroutine outside of the errgroup to receive csv outputs from csvout channel and write to file
-	csvout := make(chan []string, 8)
-	go func() {
-		w := csv.NewWriter(outfile)
-
-		var csvheader []string
-		if conf.fields != nil {
-			for _, field := range conf.fields {
-				csvheader = append(csvheader, field)
-			}
-			if err := w.Write(csvheader); err != nil {
-				log.Printf("Error writing CSV header - %v", err)
-			}
-		}
-
-		for csvdata := range csvout {
-			if err := w.Write(csvdata); err != nil {
-				log.Printf("Error writing CSV data - %v", err)
-			}
-
-			w.Flush()
-			bar.Increment()
-		}
-
-	}()
-
-	// some more goroutines in the errgroup context to do the transformation, room to add more work here in future
-	for i := 0; i < 8; i++ {
-		g.Go(func() error {
-			var document map[string]interface{}
-
-			for hit := range hits {
-				var csvdata []string
-				var outdata string
-
-				if err := json.Unmarshal(hit, &document); err != nil {
-					log.Printf("Error unmarshal JSON from ElasticSearch - %v", err)
-				}
-
-				if conf.fields != nil {
-					for _, field := range conf.fields {
-						if val, ok := document[field]; ok {
-							if val == nil {
-								csvdata = append(csvdata, "")
-								continue
-							}
-
-							// this type switch is probably not really needed anymore
-							switch val.(type) {
-							case int64:
-								outdata = fmt.Sprintf("%d", val)
-							case float64:
-								f := val.(float64)
-								d := int(f)
-								if f == float64(d) {
-									outdata = fmt.Sprintf("%d", d)
-								} else {
-									outdata = fmt.Sprintf("%f", f)
-								}
-
-							default:
-								outdata = removeLBR(fmt.Sprintf("%v", val))
-							}
-
-							csvdata = append(csvdata, outdata)
-						} else {
-							csvdata = append(csvdata, "")
-						}
-					}
-
-				} else {
-					for _, val := range document {
-						outdata = removeLBR(fmt.Sprintf("%v", val))
-						csvdata = append(csvdata, outdata)
-					}
-				}
-
-				// send string array to csv output
-				csvout <- csvdata
-
-				select {
-				default:
-				case <-ctx.Done():
-					return ctx.Err()
-				}
-			}
-			return nil
-		})
-	}
-
-	// Check if any goroutines failed.
-	if err := g.Wait(); err != nil {
-		log.Printf("Error - %v", err)
-	}
-
-	bar.Finish()
-}
-
-func removeLBR(text string) string {
-	re := regexp.MustCompile(`\x{000D}\x{000A}|[\x{000A}\x{000B}\x{000C}\x{000D}\x{0085}\x{2028}\x{2029}]`)
-	return re.ReplaceAllString(text, ``)
-}
diff --git a/export/export.go b/export/export.go
new file mode 100644
index 0000000..853eafc
--- /dev/null
+++ b/export/export.go
@@ -0,0 +1,165 @@
+package export
+
+import (
+	"context"
+	"crypto/tls"
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/olivere/elastic/v7"
+	"gopkg.in/cheggaaa/pb.v2"
+
+	"github.com/pteich/elastic-query-export/flags"
+	"github.com/pteich/elastic-query-export/formats"
+)
+
+const workers = 8
+
+// Formatter defines the interface every output formatter has to implement
+type Formatter interface {
+	Run(context.Context, <-chan *elastic.SearchHit) error
+}
+
+// Run starts the export of Elastic data
+func Run(ctx context.Context, conf *flags.Flags) {
+	exportCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	tr := &http.Transport{
+		TLSClientConfig: &tls.Config{InsecureSkipVerify: !conf.ElasticVerifySSL},
+	}
+	httpClient := &http.Client{Transport: tr}
+
+	esOpts := make([]elastic.ClientOptionFunc, 0)
+	esOpts = append(esOpts,
+		elastic.SetHttpClient(httpClient),
+		elastic.SetURL(conf.ElasticURL),
+		elastic.SetSniff(false),
+		elastic.SetHealthcheckInterval(60*time.Second),
+		elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
+	)
+
+	if conf.ElasticUser != "" && conf.ElasticPass != "" {
+		esOpts = append(esOpts, elastic.SetBasicAuth(conf.ElasticUser, conf.ElasticPass))
+	}
+
+	client, err := elastic.NewClient(esOpts...)
+	if err != nil {
+		log.Fatalf("Error connecting to ElasticSearch: %s", err)
+	}
+	defer client.Stop()
+
+	if conf.Fieldlist != "" {
+		conf.Fields = strings.Split(conf.Fieldlist, ",")
+	}
+
+	outfile, err := os.Create(conf.Outfile)
+	if err != nil {
+		log.Fatalf("Error creating output file - %s", err)
+	}
+	defer outfile.Close()
+
+	var rangeQuery *elastic.RangeQuery
+
+	esQuery := elastic.NewBoolQuery()
+
+	if conf.StartDate != "" && conf.EndDate != "" {
+		rangeQuery = elastic.NewRangeQuery(conf.Timefield).Gte(conf.StartDate).Lte(conf.EndDate)
+	} else if conf.StartDate != "" {
+		rangeQuery = elastic.NewRangeQuery(conf.Timefield).Gte(conf.StartDate)
+	} else if conf.EndDate != "" {
+		rangeQuery = elastic.NewRangeQuery(conf.Timefield).Lte(conf.EndDate)
+	} else {
+		rangeQuery = nil
+	}
+
+	if rangeQuery != nil {
+		esQuery = esQuery.Filter(rangeQuery)
+	}
+
+	if conf.RAWQuery != "" {
+		esQuery = esQuery.Must(elastic.NewRawStringQuery(conf.RAWQuery))
+	} else if conf.Query != "" {
+		esQuery = esQuery.Must(elastic.NewQueryStringQuery(conf.Query))
+	} else {
+		esQuery = esQuery.Must(elastic.NewMatchAllQuery())
+	}
+
+	// Count total hits and set up the progress bar
+	total, err := client.Count(conf.Index).Query(esQuery).Do(ctx)
+	if err != nil {
+		log.Printf("Error counting ElasticSearch documents - %v", err)
+	}
+	bar := pb.StartNew(int(total))
+
+	hits := make(chan *elastic.SearchHit)
+
+	go func() {
+		defer close(hits)
+
+		scroll := client.Scroll(conf.Index).Size(conf.ScrollSize).Query(esQuery)
+
+		// include selected fields, otherwise export all
+		if conf.Fields != nil {
+			fetchSource := elastic.NewFetchSourceContext(true)
+			for _, field := range conf.Fields {
+				fetchSource.Include(field)
+			}
+			scroll = scroll.FetchSourceContext(fetchSource)
+		}
+
+		for {
+			results, err := scroll.Do(ctx)
+			if err == io.EOF {
+				return // all results retrieved
+			}
+			if err != nil {
+				log.Println(err)
+				cancel()
+				return // something went wrong
+			}
+
+			// Send the hits to the hits channel
+			for _, hit := range results.Hits.Hits {
+				// Check if we need to terminate early
+				select {
+				case hits <- hit:
+				case <-exportCtx.Done():
+					return
+				}
+			}
+		}
+	}()
+
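+	// Choose the output formatter for the configured format. CSV stays the
+	// default so existing setups that do not pass -f keep their old behavior.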
+	var output Formatter
+	switch conf.OutFormat {
+	case flags.FormatJSON:
+		output = formats.JSON{
+			Outfile: outfile,
+			ProgressBar: bar,
+		}
+	case flags.FormatRAW:
+		output = formats.Raw{
+			Outfile: outfile,
+			ProgressBar: bar,
+		}
+	default:
+		output = formats.CSV{
+			Conf: conf,
+			Outfile: outfile,
+			Workers: workers,
+			ProgressBar: bar,
+		}
+	}
+
+	err = output.Run(exportCtx, hits)
+	if err != nil {
+		log.Println(err)
+	}
+
+	bar.Finish()
+}
diff --git a/flags.go b/flags/flags.go
similarity index 78%
rename from flags.go
rename to flags/flags.go
index d8d1c99..11d0f05 100644
--- a/flags.go
+++ b/flags/flags.go
@@ -1,4 +1,10 @@
-package main
+package flags
+
+const (
+	FormatCSV  = "csv"
+	FormatJSON = "json"
+	FormatRAW  = "raw"
+)
 
 type Flags struct {
 	ElasticURL string `cli:"connect" cliAlt:"c" usage:"ElasticSearch URL"`
@@ -8,11 +14,12 @@ type Flags struct {
 	Index string `cli:"index" cliAlt:"i" usage:"ElasticSearch Index (or Index Prefix)"`
 	RAWQuery string `cli:"rawquery" cliAlt:"r" usage:"ElasticSearch raw query string"`
 	Query string `cli:"query" cliAlt:"q" usage:"Lucene query same that is used in Kibana search input"`
-	Outfile string `cli:"outfile" cliAlt:"o" usage:"Path to CSV output file"`
+	OutFormat string `cli:"outformat" cliAlt:"f" usage:"Format of the output data. [csv|json|raw]"`
+	Outfile string `cli:"outfile" cliAlt:"o" usage:"Path to output file"`
 	StartDate string `cli:"start" cliAlt:"s" usage:"Start date for included documents"`
 	EndDate string `cli:"end" cliAlt:"e" usage:"End date for included documents"`
 	ScrollSize int `cli:"size" usage:"Number of documents that will be returned per shard"`
 	Timefield string `cli:"timefield" usage:"Field name to use for start and end date query"`
-	Fieldlist string `cli:"fields" usage:"fields to include in export as comma separated list"`
-	fields []string
+	Fieldlist string `cli:"fields" usage:"Fields to include in export as comma separated list"`
+	Fields []string
 }
diff --git a/formats/csv.go b/formats/csv.go
new file mode 100644
index 0000000..d125d62
--- /dev/null
+++ b/formats/csv.go
@@ -0,0 +1,122 @@
+package formats
+
+import (
+	"context"
+	"encoding/csv"
+	"encoding/json"
+	"fmt"
+	"log"
+	"os"
+	"regexp"
+
+	"github.com/olivere/elastic/v7"
+	"golang.org/x/sync/errgroup"
+	"gopkg.in/cheggaaa/pb.v2"
+
+	"github.com/pteich/elastic-query-export/flags"
+)
+
+type CSV struct {
+	Conf *flags.Flags
+	Outfile *os.File
+	Workers int
+	ProgressBar *pb.ProgressBar
+}
+
+func (c CSV) Run(ctx context.Context, hits <-chan *elastic.SearchHit) error {
+	g, ctx := errgroup.WithContext(ctx)
+
+	csvout := make(chan []string, c.Workers)
+	writerDone := make(chan struct{})
+
+	go func() {
+		defer close(writerDone)
+
+		w := csv.NewWriter(c.Outfile)
+
+		var csvheader []string
+		if c.Conf.Fields != nil {
+			csvheader = append(csvheader, c.Conf.Fields...)
+			if err := w.Write(csvheader); err != nil {
+				log.Printf("Error writing CSV header - %v", err)
+			}
+		}
+
+		for csvdata := range csvout {
+			if err := w.Write(csvdata); err != nil {
+				log.Printf("Error writing CSV data - %v", err)
+			}
+
+			w.Flush()
+			c.ProgressBar.Increment()
+		}
+	}()
+
+	for i := 0; i < c.Workers; i++ {
+		g.Go(func() error {
+			for hit := range hits {
+				// start with a nil map for every hit - json.Unmarshal would
+				// otherwise keep fields of previously decoded documents
+				var document map[string]interface{}
+				var csvdata []string
+				var outdata string
+
+				if err := json.Unmarshal(hit.Source, &document); err != nil {
+					log.Printf("Error unmarshalling JSON from ElasticSearch - %v", err)
+				}
+
+				if c.Conf.Fields != nil {
+					for _, field := range c.Conf.Fields {
+						if val, ok := document[field]; ok {
+							if val == nil {
+								csvdata = append(csvdata, "")
+								continue
+							}
+
+							// encoding/json decodes all JSON numbers into float64,
+							// so print whole numbers without a fractional part
+							switch val := val.(type) {
+							case int64:
+								outdata = fmt.Sprintf("%d", val)
+							case float64:
+								d := int(val)
+								if val == float64(d) {
+									outdata = fmt.Sprintf("%d", d)
+								} else {
+									outdata = fmt.Sprintf("%f", val)
+								}
+
+							default:
+								outdata = removeLBR(fmt.Sprintf("%v", val))
+							}
+
+							csvdata = append(csvdata, outdata)
+						} else {
+							csvdata = append(csvdata, "")
+						}
+					}
+
+				} else {
+					for _, val := range document {
+						outdata = removeLBR(fmt.Sprintf("%v", val))
+						csvdata = append(csvdata, outdata)
+					}
+				}
+
+				// send string slice to csv output
+				csvout <- csvdata
+
+				select {
+				default:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+			return nil
+		})
+	}
+
+	err := g.Wait()
+
+	// all workers are done - close the output channel and wait for the
+	// writer to drain it so no buffered rows are lost
+	close(csvout)
+	<-writerDone
+
+	return err
+}
+
+func removeLBR(text string) string {
+	re := regexp.MustCompile(`\x{000D}\x{000A}|[\x{000A}\x{000B}\x{000C}\x{000D}\x{0085}\x{2028}\x{2029}]`)
+	return re.ReplaceAllString(text, ``)
+}
diff --git a/formats/json.go b/formats/json.go
new file mode 100644
index 0000000..e520c32
--- /dev/null
+++ b/formats/json.go
@@ -0,0 +1,27 @@
+package formats
+
+import (
+	"context"
+	"fmt"
+	"os"
+
+	"github.com/olivere/elastic/v7"
+	"gopkg.in/cheggaaa/pb.v2"
+)
+
+type JSON struct {
+	Outfile *os.File
+	ProgressBar *pb.ProgressBar
+}
+
+func (j JSON) Run(ctx context.Context, hits <-chan *elastic.SearchHit) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case hit, ok := <-hits:
+			// a closed hits channel means all documents have been received
+			if !ok {
+				return nil
+			}
+			fmt.Fprintln(j.Outfile, string(hit.Source))
+			j.ProgressBar.Increment()
+		}
+	}
+}
diff --git a/formats/raw.go b/formats/raw.go
new file mode 100644
index 0000000..def8dc3
--- /dev/null
+++ b/formats/raw.go
@@ -0,0 +1,34 @@
+package formats
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"log"
+	"os"
+
+	"github.com/olivere/elastic/v7"
+	"gopkg.in/cheggaaa/pb.v2"
+)
+
+type Raw struct {
+	Outfile *os.File
+	ProgressBar *pb.ProgressBar
+}
+
+func (r Raw) Run(ctx context.Context, hits <-chan *elastic.SearchHit) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case hit, ok := <-hits:
+			// a closed hits channel means all documents have been received
+			if !ok {
+				return nil
+			}
+			data, err := json.Marshal(hit)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			fmt.Fprintln(r.Outfile, string(data))
+			r.ProgressBar.Increment()
+		}
+	}
+}
diff --git a/main.go b/main.go
index 8cee29c..e623854 100644
--- a/main.go
+++ b/main.go
@@ -8,16 +8,20 @@ import (
 	"syscall"
 
 	"github.com/pteich/configstruct"
+
+	"github.com/pteich/elastic-query-export/export"
+	"github.com/pteich/elastic-query-export/flags"
 )
 
 var Version string
 
 func main() {
-	flags := Flags{
+	conf := flags.Flags{
 		ElasticURL: "http://localhost:9200",
 		ElasticVerifySSL: true,
 		Index: "logs-*",
 		Query: "*",
+		OutFormat: flags.FormatCSV,
 		Outfile: "output.csv",
 		ScrollSize: 1000,
 		Timefield: "@timestamp",
@@ -28,10 +32,10 @@ func main() {
 
 	cmd := configstruct.NewCommand(
 		"",
-		"CLI tool to export data from ElasticSearch into a CSV file. https://github.com/pteich/elastic-query-export",
-		&flags,
+		"CLI tool to export data from ElasticSearch into a CSV or JSON file. https://github.com/pteich/elastic-query-export",
+		&conf,
 		func(c *configstruct.Command, cfg interface{}) error {
-			export(ctx, cfg.(*Flags))
+			export.Run(ctx, cfg.(*flags.Flags))
 			return nil
 		},
 	)