Skip to content

Commit

Permalink
fix sort order
Browse files: browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Feb 18, 2025
1 parent 4298abf commit cce37c5
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 40 deletions.
2 changes: 1 addition & 1 deletion libs/datafusion/jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ impl DataFusionAggregator {
.as_ref()
.and_then(|df| df.clone()
.aggregate(vec![col("ord")], vec![sum(col("count")).alias("count")]).unwrap()
.sort(vec![col("ord").sort(false, false)]).ok())
.sort(vec![col("ord").sort(true, false)]).ok())
}

pub fn get_results_with_limit(&self, limit: i32) -> Option<DataFrame> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
performFinalReduce,
streamExecutor.orElse(executor)
);
logger.info("Will reduce results for {}", results.get(0));
// logger.info("Will reduce results for {}", results.get(0));
} else {
final SearchPhaseController.TopDocsStats topDocsStats = pendingMerges.consumeTopDocsStats();
final List<TopDocs> topDocsList = pendingMerges.consumeTopDocs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,10 +748,8 @@ public ReducedQueryPhase reducedFromStream(

for (CompletableFuture<TicketProcessor.TicketProcessorResult> future : producerFutures) {
TicketProcessor.TicketProcessorResult result = future.get();
logger.info("Row at coord {}", result.getRowCount());
totalRows += result.getRowCount();
}
logger.info("Total {}", totalRows);

TotalHits totalHits = new TotalHits(totalRows, Relation.EQUAL_TO);
List<ScoreDoc> scoreDocs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchPhaseController;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;

import java.util.ArrayList;
Expand All @@ -28,7 +27,6 @@ public BatchProcessor() {
this.batchQueue = new LinkedBlockingQueue<>();
this.mergedResult = new ArrayList<>();
this.producersComplete = false;

}
/** Class-wide logger for batch processing; final so it cannot be reassigned by callers. */
public static final Logger logger = LogManager.getLogger(BatchProcessor.class);

Expand All @@ -37,26 +35,20 @@ public BlockingQueue<List<StringTerms.Bucket>> getBatchQueue() {
}

/**
 * Drains the batch queue, merging every received batch into the accumulated result.
 *
 * <p>The loop runs until {@code producersComplete} is set and the queue is empty. Polling
 * uses a 100 ms timeout so the completion flag is re-checked regularly instead of blocking
 * indefinitely on an empty queue. Each non-null batch is handed to {@code mergeSortedLists}.
 *
 * @throws RuntimeException if the thread is interrupted while polling; the interrupt flag
 *         is restored before the exception is thrown
 */
public void processBatches() {
    int batchesProcessed = 0;
    // NOTE(review): producersComplete is presumably set from another thread via
    // markProducersComplete(); confirm the field is volatile so this loop sees the update.
    while (!producersComplete || !batchQueue.isEmpty()) {
        final List<StringTerms.Bucket> currentBatch;
        try {
            currentBatch = batchQueue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while processing batches", e);
        }
        // poll() returns null on timeout; go around again to re-check the loop condition.
        if (currentBatch != null) {
            mergeSortedLists(currentBatch);
            batchesProcessed++;
        }
    }
    // Diagnostic only: routed through the logger at debug level rather than stdout.
    logger.debug("Total batches processed: {}", batchesProcessed);
}

public void markProducersComplete() {
Expand All @@ -68,8 +60,6 @@ public List<StringTerms.Bucket> getMergedBuckets() {
}

private void mergeSortedLists(List<StringTerms.Bucket> newBatch) {
logger.info("##Merging a new batch of size: " + newBatch.size());

List<StringTerms.Bucket> merged = new ArrayList<>();
int i = 0;
int j = 0;
Expand Down Expand Up @@ -104,12 +94,6 @@ private void mergeSortedLists(List<StringTerms.Bucket> newBatch) {

mergedResult.clear();
mergedResult.addAll(merged);
for (int wtf = 0; wtf < mergedResult.size(); wtf++) {
StringTerms.Bucket bucket = mergedResult.get(wtf);
if (bucket.termBytes.utf8ToString().equalsIgnoreCase("quartzheron")) {
logger.info("quartzheron post merge???: {}", bucket.getDocCount());
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,9 @@ public TicketProcessorResult call() throws Exception {
VarCharVector termVector = (VarCharVector) root.getVector("ord");
UInt8Vector countVector = (UInt8Vector) root.getVector("count");
for (int row = 0; row < termVector.getValueCount(); row++) {
String s = new String(termVector.get(row));
BytesRef term = new BytesRef(s.getBytes());
long docCount = countVector.get(row);
if (term.utf8ToString().equalsIgnoreCase("quartzheron")) {
logger.info("quartzheron at coord: {}", docCount);
}
StringTerms.Bucket bucket = new StringTerms.Bucket(
term,
docCount,
new BytesRef(termVector.get(row)),
countVector.get(row),
new InternalAggregations(List.of()),
false,
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,19 @@ public void finish() throws IOException {
VarCharVector ordVector = (VarCharVector) bucketRoot.getVector(ORD);
BigIntVector countVector = (BigIntVector) bucketRoot.getVector(COUNT);
int row = 0;

while (recordBatchStream.loadNextBatch().join()) {

UInt8Vector dfVector = (UInt8Vector) root.getVector(ORD);
FieldVector cv = root.getVector(COUNT);
for (int i = 0; i < dfVector.getValueCount(); i++) {
// BytesRef bytesRef = dv.lookupOrd(dfVector.get(i));
// ordVector.setSafe(row, bytesRef.bytes, 0, bytesRef.length);
long ordKey = dfVector.get(i);
BytesRef term = BytesRef.deepCopyOf(dv.lookupOrd(ordKey));
byte[] bytes = DocValueFormat.RAW.format(term).toString().getBytes(StandardCharsets.UTF_8);
ordVector.setSafe(row, bytes);
BytesRef bytesRef = dv.lookupOrd(dfVector.get(i));
ordVector.setSafe(row, bytesRef.bytes, 0, bytesRef.length);
// long ordKey = dfVector.get(i);
// BytesRef term = BytesRef.deepCopyOf(dv.lookupOrd(ordKey));
// byte[] bytes = DocValueFormat.RAW.format(term).toString().getBytes(StandardCharsets.UTF_8);
// ordVector.setSafe(row, bytes);
long value = ((BigIntVector) cv).get(i);
if (term.utf8ToString().equalsIgnoreCase("quartzheron")) {
logger.info("quartzheron: {}", value);
}
countVector.setSafe(row, value);
row++;
}
Expand Down

0 comments on commit cce37c5

Please sign in to comment.