fix:es滚动查询

上级 815d0a00
......@@ -13,8 +13,6 @@ public class EsController {
@PostMapping("es")
public Boolean query(@RequestParam String query, Long fetchSize) {
return esService.query(query, fetchSize);
}
}
......@@ -13,16 +13,32 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* 查询处理器
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/7/30 18:11
*/
@Component
public class EsQueryProcessor {
//1. 我们要用stream 返回 为了节省内存
/**
* 用stream 返回 为了节省内存
*
* @param query
* @param fetchSize
* @return
*/
public Stream<Map<String, Object>> scrollEsStream(String query, Long fetchSize) {
return StreamSupport.stream(Spliterators
.spliteratorUnknownSize(new ScrollIterator(query, fetchSize), 0), false);
}
//2. 我们要 迭代器
/**
* 要用迭代器
*/
private class ScrollIterator implements Iterator<Map<String, Object>> {
private String scrollId;
private List<String> columns;
......@@ -36,18 +52,23 @@ public class EsQueryProcessor {
new EsSqlQuery(query, fetchSize), EsSqlResult.class);//第一次访问的结果出来了
this.scrollId = esSqlResult.getCursor();
this.columns = esSqlResult.getColumns()
.stream().map(x->x.get("name"))
.stream().map(x -> x.get("name"))
.collect(Collectors.toList());
this.iterator = convert(columns, esSqlResult).iterator();
}
// hasNext 根据 是否 scrollId 为null进行后续的 第二次,第三次,,,的访问,直到 scrollId 为null
/**
* hasNext 根据 是否 scrollId 为null进行后续的 第二次,第三次,,,的访问,直到 scrollId 为null
*
* @return
*/
@Override
public boolean hasNext() {
return iterator.hasNext() || scrollNext();
}
private boolean scrollNext() {
if(iterator == null || this.scrollId == null) {
if (iterator == null || this.scrollId == null) {
return false;
}
EsSqlResult esSqlResult = restTemplate.postForObject("http://localhost:9200/_sql?format=json",
......@@ -64,13 +85,18 @@ public class EsQueryProcessor {
}
//3. 返回结果传统一点 List<map>
/**
* 返回结果传统一点 List<map>
*
* @param columns
* @param esSqlResult
* @return
*/
private List<Map<String, Object>> convert(List<String> columns, EsSqlResult esSqlResult) {
List<Map<String, Object>> results = new ArrayList<>();
for(List<Object> row : esSqlResult.getRows()) {
for (List<Object> row : esSqlResult.getRows()) {
Map<String, Object> map = new HashMap<>();
for(int i = 0; i < columns.size(); i++) {
for (int i = 0; i < columns.size(); i++) {
map.put(columns.get(i), row.get(i));
}
results.add(map);
......
......@@ -2,10 +2,27 @@ package com.study.design.esquery;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
* 查询参数
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/7/30 18:10
*/
@JsonIgnoreProperties
public class EsSqlQuery {
/**
* 查询的sql
*/
private String query;
/**
* 一次获取的数量
*/
private Long fetchSize;
/**
* 游标
*/
private String cursor;
public EsSqlQuery(String cursor) {
......
......@@ -3,9 +3,27 @@ package com.study.design.esquery;
import java.util.List;
import java.util.Map;
/**
* 查询的结果集
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/7/30 18:11
*/
public class EsSqlResult {
/**
* 列名
*/
private List<Map<String, String>> columns;
/**
* 行数据信息
*/
private List<List<Object>> rows;
/**
* 游标
*/
private String cursor;
public List<Map<String, String>> getColumns() {
......
package com.study.design.service;
import com.study.design.esquery.EsQueryProcessor;
import com.study.design.order.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
......@@ -16,7 +12,7 @@ public class EsService {
@Autowired
private EsQueryProcessor esQueryProcessor;
public Boolean query(String query, Long fetchSize) {
Stream<Map<String, Object>> mapStream = esQueryProcessor
.scrollEsStream(query, fetchSize);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册