diff --git a/libs/datafusion/jni/src/lib.rs b/libs/datafusion/jni/src/lib.rs
index 743ccf1ad4ffb..536611ef9e74c 100644
--- a/libs/datafusion/jni/src/lib.rs
+++ b/libs/datafusion/jni/src/lib.rs
@@ -411,7 +411,7 @@ impl DataFusionAggregator {
             .as_ref()
             .and_then(|df| df.clone()
                 .aggregate(vec![col("ord")], vec![sum(col("count")).alias("count")]).unwrap()
-                .sort(vec![col("ord").sort(false, false)]).ok())
+                .sort(vec![col("ord").sort(true, false)]).ok())
     }
 
     pub fn get_results_with_limit(&self, limit: i32) -> Option {
diff --git a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java
index 62966d859b785..53d038d56630b 100644
--- a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java
@@ -163,7 +163,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
                 performFinalReduce,
                 streamExecutor.orElse(executor)
             );
-            logger.info("Will reduce results for {}", results.get(0));
+//            logger.info("Will reduce results for {}", results.get(0));
         } else {
             final SearchPhaseController.TopDocsStats topDocsStats = pendingMerges.consumeTopDocsStats();
             final List<TopDocs> topDocsList = pendingMerges.consumeTopDocs();
diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java
index 4ebf8863e10ed..d656d68a88441 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseController.java
@@ -748,10 +748,8 @@ public ReducedQueryPhase reducedFromStream(
 
         for (CompletableFuture<TicketProcessor.TicketProcessorResult> future : producerFutures) {
             TicketProcessor.TicketProcessorResult result = future.get();
-            logger.info("Row at coord {}", result.getRowCount());
             totalRows += result.getRowCount();
         }
-        logger.info("Total {}", totalRows);
 
         TotalHits totalHits = new TotalHits(totalRows, Relation.EQUAL_TO);
         List<ScoreDoc> scoreDocs = new ArrayList<>();
diff --git a/server/src/main/java/org/opensearch/search/query/BatchProcessor.java b/server/src/main/java/org/opensearch/search/query/BatchProcessor.java
index 557da4ddcf03c..a37f93411616f 100644
--- a/server/src/main/java/org/opensearch/search/query/BatchProcessor.java
+++ b/server/src/main/java/org/opensearch/search/query/BatchProcessor.java
@@ -10,7 +10,6 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.opensearch.action.search.SearchPhaseController;
 import org.opensearch.search.aggregations.bucket.terms.StringTerms;
 
 import java.util.ArrayList;
@@ -28,7 +27,6 @@ public BatchProcessor() {
         this.batchQueue = new LinkedBlockingQueue<>();
         this.mergedResult = new ArrayList<>();
        this.producersComplete = false;
-
     }
 
     public static Logger logger = LogManager.getLogger(BatchProcessor.class);
@@ -37,26 +35,20 @@ public BlockingQueue<List<StringTerms.Bucket>> getBatchQueue() {
     }
 
     public void processBatches() {
-        int cnt = 0;
+//        int cnt = 0;
         while (!producersComplete || !batchQueue.isEmpty()) {
             try {
                 List<StringTerms.Bucket> currentBatch = batchQueue.poll(100, TimeUnit.MILLISECONDS);
                 if (currentBatch != null) {
-                    for (int i = 0; i < currentBatch.size(); i++) {
-                        StringTerms.Bucket bucket = currentBatch.get(i);
-                        if (bucket.termBytes.utf8ToString().equalsIgnoreCase("quartzheron")) {
-                            logger.info("quartzheron in batch: {}", bucket.getDocCount());
-                        }
-                    }
                     mergeSortedLists(currentBatch);
-                    cnt++;
+//                    cnt++;
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new RuntimeException("Interrupted while processing batches", e);
             }
         }
-        System.out.println("##Total batches processed: " + cnt);
+//        System.out.println("##Total batches processed: " + cnt);
     }
 
     public void markProducersComplete() {
@@ -68,8 +60,6 @@ public List<StringTerms.Bucket> getMergedBuckets() {
     }
 
     private void mergeSortedLists(List<StringTerms.Bucket> newBatch) {
-        logger.info("##Merging a new batch of size: " + newBatch.size());
-
         List<StringTerms.Bucket> merged = new ArrayList<>();
         int i = 0;
         int j = 0;
@@ -104,12 +94,6 @@ private void mergeSortedLists(List<StringTerms.Bucket> newBatch) {
 
         mergedResult.clear();
         mergedResult.addAll(merged);
-        for (int wtf = 0; wtf < mergedResult.size(); wtf++) {
-            StringTerms.Bucket bucket = mergedResult.get(wtf);
-            if (bucket.termBytes.utf8ToString().equalsIgnoreCase("quartzheron")) {
-                logger.info("quartzheron post merge???: {}", bucket.getDocCount());
-            }
-        }
     }
 
 }
diff --git a/server/src/main/java/org/opensearch/search/query/TicketProcessor.java b/server/src/main/java/org/opensearch/search/query/TicketProcessor.java
index e064d5a6a227f..30648cb8540f1 100644
--- a/server/src/main/java/org/opensearch/search/query/TicketProcessor.java
+++ b/server/src/main/java/org/opensearch/search/query/TicketProcessor.java
@@ -53,15 +53,9 @@ public TicketProcessorResult call() throws Exception {
                 VarCharVector termVector = (VarCharVector) root.getVector("ord");
                 UInt8Vector countVector = (UInt8Vector) root.getVector("count");
                 for (int row = 0; row < termVector.getValueCount(); row++) {
-                    String s = new String(termVector.get(row));
-                    BytesRef term = new BytesRef(s.getBytes());
-                    long docCount = countVector.get(row);
-                    if (term.utf8ToString().equalsIgnoreCase("quartzheron")) {
-                        logger.info("quartzheron at coord: {}", docCount);
-                    }
                     StringTerms.Bucket bucket = new StringTerms.Bucket(
-                        term,
-                        docCount,
+                        new BytesRef(termVector.get(row)),
+                        countVector.get(row),
                         new InternalAggregations(List.of()),
                         false,
                         0,
diff --git a/server/src/main/java/org/opensearch/search/stream/collector/PushStreamingCollector.java b/server/src/main/java/org/opensearch/search/stream/collector/PushStreamingCollector.java
index 1a0e49a3cb6d5..e09a550785383 100644
--- a/server/src/main/java/org/opensearch/search/stream/collector/PushStreamingCollector.java
+++ b/server/src/main/java/org/opensearch/search/stream/collector/PushStreamingCollector.java
@@ -147,20 +147,19 @@ public void finish() throws IOException {
         VarCharVector ordVector = (VarCharVector) bucketRoot.getVector(ORD);
         BigIntVector countVector = (BigIntVector) bucketRoot.getVector(COUNT);
         int row = 0;
+
         while (recordBatchStream.loadNextBatch().join()) {
+
             UInt8Vector dfVector = (UInt8Vector) root.getVector(ORD);
             FieldVector cv = root.getVector(COUNT);
             for (int i = 0; i < dfVector.getValueCount(); i++) {
-//                BytesRef bytesRef = dv.lookupOrd(dfVector.get(i));
-//                ordVector.setSafe(row, bytesRef.bytes, 0, bytesRef.length);
-                long ordKey = dfVector.get(i);
-                BytesRef term = BytesRef.deepCopyOf(dv.lookupOrd(ordKey));
-                byte[] bytes = DocValueFormat.RAW.format(term).toString().getBytes(StandardCharsets.UTF_8);
-                ordVector.setSafe(row, bytes);
+                BytesRef bytesRef = dv.lookupOrd(dfVector.get(i));
+                ordVector.setSafe(row, bytesRef.bytes, 0, bytesRef.length);
+//                long ordKey = dfVector.get(i);
+//                BytesRef term = BytesRef.deepCopyOf(dv.lookupOrd(ordKey));
+//                byte[] bytes = DocValueFormat.RAW.format(term).toString().getBytes(StandardCharsets.UTF_8);
+//                ordVector.setSafe(row, bytes);
                 long value = ((BigIntVector) cv).get(i);
-                if (term.utf8ToString().equalsIgnoreCase("quartzheron")) {
-                    logger.info("quartzheron: {}", value);
-                }
                 countVector.setSafe(row, value);
                 row++;
             }