An Elasticsearch Utility Class with Tree-Structure Support
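- 1. Preface
- 2. Goals
- 3. Problems and Solutions
- 4. Code Structure Design
- 4.1 Interface for plain documents - EsRepository
- 4.2 Interface for tree-structured documents - EsTreeRepository
- 4.3 Abstract class for plain documents - AbstractEsRepository
- 4.4 Abstract class for tree-structured documents - AbstractEsTreeRepository
- 4.5 Tree node base class - TreeNode
- 4.6 Test entity class - TestTreeEntity
- 5. Implementation
- 5.1 saveAll
- 5.2 deleteById
- 5.3 deleteByQuery
- 5.4 updateAllById
- 5.5 existsById
- 5.6 Fetching a List by a collection of ids
- 5.7 countGroupBy
- 5.8 countByParentId for tree structures
- Summary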
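1. Preface
Several recent projects of mine used ES as the database. One used the open-source Jest as its ES client, which worked reasonably well, but Jest has not been updated in a long time. Another used a home-grown utility class that was quite rough, and my lead wanted it to support both ES 5.6 and ES 6.8, so I implemented the common CRUD methods with the ES 5.6 Low Level Java API. Later I tried Spring Data Elasticsearch and found it a pleasure to use: the API is very rich, and Spring's work really is excellent. Since I use Spring Data JPA a lot, it also felt familiar right away.
I'm not very strong technically; in several years of Java work I have mostly written CRUD code, so reading the Spring Data ES source was hard going and I simply could not follow it. So I decided to write a simple ES utility class of my own, mainly to get familiar with the ES Java API. Compared with the elegant Spring Data ES, it is worlds apart.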
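2. Goals
As it happens, the first Java feature I ever wrote was an ORM utility that generated CRUD methods from entity classes, so I still remember a little about Java generics and reflection, and writing this ES ORM tool brought back that familiar feeling. First, the goals for this project:
2.1 Goal: entity-class-based CRUD
I have read plenty of articles arguing whether JPA or MyBatis is better, and I find the debate pointless: whatever suits you is best. Personally I like JPA's object-oriented style, and it still lets you write queries by hand when you need to. So what I want to implement is an interface in this style: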
```java
T save(T t);
T findById(String id);
long count(QueryBuilder queryBuilder);
```
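2.2 Goal: support querying tree-structured data in ES
By a tree structure in ES I do not mean ES's built-in parent/child documents. I find the ES parent syntax hard to use, though that may just be because I don't know parent/child documents well.
The tree structure I mean follows my previous post, "Storing tree structures in ES with Spring Data Elasticsearch" (ES保存树形结构 结合Spring Data Elasticsearch).
As an aside, in a tree each node should ideally have exactly one parent. Nodes with several parents do come up at work, but they are full of pitfalls and painful to maintain, so for my part I refuse multi-parent trees.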
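3. Problems and Solutions
3.1 Problem: get the class of the generic type T instead of passing it explicitly
T is already supplied as a type argument, so obtaining its class at runtime makes the code much cleaner.
Otherwise you end up like a lot of code found online, which has to pass an extra clazz argument on every call. Isn't that annoying?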
```java
T getById(M id, Class<T> clazz);
boolean exists(M id, Class<T> clazz);
long count(QueryBuilder queryBuilder, Class<T> clazz);
List<T> searchMore(QueryBuilder queryBuilder, int limitSize, Class<T> clazz);
```
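3.1 Solution: copy Spring Data's homework
Spring Data ES has a piece of code that looked impressive even though I did not fully understand it. My rough reading is that the subclass (AbstractElasticsearchRepository<T, ID>) implements the interface (ElasticsearchRepository<T, ID>) and obtains the type of T from its parent. Concretely, the method below starts at the concrete repository class and walks up the superclass chain until it finds the parameterized superclass whose raw type is SimpleElasticsearchRepository; T is the first type argument of that type. I only copied about half of it.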
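```java
// Walks up from the concrete repository class until it reaches
// SimpleElasticsearchRepository<T, ID>, and returns that parameterized type
private ParameterizedType resolveReturnedClassFromGenericType(Class<?> clazz) {
    Object genericSuperclass = clazz.getGenericSuperclass();
    if (genericSuperclass instanceof ParameterizedType) {
        ParameterizedType parameterizedType = (ParameterizedType) genericSuperclass;
        Type rawtype = parameterizedType.getRawType();
        if (SimpleElasticsearchRepository.class.equals(rawtype)) {
            // getActualTypeArguments()[0] of this type is the entity class T
            return parameterizedType;
        }
    }
    // Not found at this level: try the next superclass
    return resolveReturnedClassFromGenericType(clazz.getSuperclass());
}
```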
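The same idea can be applied to this tool's own base class. Below is a minimal, self-contained sketch, assuming the base repository resolves T once in its constructor; BaseRepository, Person and the two repository classes are hypothetical stand-ins for AbstractEsRepository<T> and a concrete entity. Unlike the Spring snippet, it loops instead of recursing and stops at Object, so an unresolvable T gives a clear exception instead of a NullPointerException from calling getGenericSuperclass() on null.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class GenericTypeDemo {

    // Hypothetical stand-in for AbstractEsRepository<T>
    public abstract static class BaseRepository<T> {
        private final Class<T> entityClass;

        protected BaseRepository() {
            // getClass() is the concrete subclass, e.g. PersonRepository
            this.entityClass = resolveEntityClass(getClass());
        }

        @SuppressWarnings("unchecked")
        private static <E> Class<E> resolveEntityClass(Class<?> start) {
            Class<?> clazz = start;
            // Walk up the hierarchy until we find BaseRepository<Something>
            while (clazz != null && clazz != Object.class) {
                Type generic = clazz.getGenericSuperclass();
                if (generic instanceof ParameterizedType) {
                    ParameterizedType pt = (ParameterizedType) generic;
                    if (pt.getRawType() == BaseRepository.class) {
                        Type arg = pt.getActualTypeArguments()[0];
                        if (arg instanceof Class) {
                            return (Class<E>) arg;
                        }
                        break; // T is still a type variable here, nothing to resolve
                    }
                }
                clazz = clazz.getSuperclass();
            }
            throw new IllegalStateException("Cannot resolve entity class of " + start.getName());
        }

        public Class<T> getEntityClass() {
            return entityClass;
        }
    }

    public static class Person {}

    // The concrete type argument is recorded in the class file, so it survives erasure
    public static class PersonRepository extends BaseRepository<Person> {}

    // One more level of inheritance still resolves, because the loop keeps walking up
    public static class VipPersonRepository extends PersonRepository {}

    public static void main(String[] args) {
        System.out.println(new PersonRepository().getEntityClass().getSimpleName());    // Person
        System.out.println(new VipPersonRepository().getEntityClass().getSimpleName()); // Person
    }
}
```

The lookup only works when some class in the hierarchy binds T to a concrete class, as PersonRepository does; a subclass that is itself still generic in T leaves only a type variable to find, and the sketch throws in that case.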
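3.2 Problem: how to design the tree structure
For tree-structured data, these are the scenarios we commonly need:
- Given an id, get its direct children
- Given an id, get all of its descendants, e.g. the total number of descendants
- Given an id, get all of its ancestors
- When a node changes parent, update the path of that node and of all its descendants
- When deleting a node, check whether it has descendants, and refuse the deletion if it does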
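To make scenarios 1, 2, 3 and 5 concrete, here is a hedged in-memory model of the design chosen below: each node stores its parentId plus the list of its ancestor ids (path, root first). In ES the loops become a term query on parentId and a nested query on path.id; the Node class, STORE and the method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TreeQueryDemo {

    public static final class Node {
        final String id;
        final String parentId;   // null for a root node
        final List<String> path; // ancestor ids, root first

        Node(String id, String parentId, List<String> path) {
            this.id = id;
            this.parentId = parentId;
            this.path = path;
        }
    }

    // Hypothetical in-memory stand-in for the index
    public static final Map<String, Node> STORE = new LinkedHashMap<>();

    public static void put(String id, String parentId, String... path) {
        STORE.put(id, new Node(id, parentId, Arrays.asList(path)));
    }

    // Scenario 1: direct children (ES: term query on parentId)
    public static List<String> children(String id) {
        return STORE.values().stream()
                .filter(n -> id.equals(n.parentId))
                .map(n -> n.id)
                .collect(Collectors.toList());
    }

    // Scenario 2: all descendants (ES: nested query on path.id)
    public static List<String> descendants(String id) {
        return STORE.values().stream()
                .filter(n -> n.path.contains(id))
                .map(n -> n.id)
                .collect(Collectors.toList());
    }

    // Scenario 3: all ancestors (ES: read the node itself, its path already lists them)
    public static List<String> ancestors(String id) {
        Node node = STORE.get(id);
        return node == null ? Collections.<String>emptyList() : new ArrayList<>(node.path);
    }

    // Scenario 5: a node may only be deleted when nothing has it in its path
    public static boolean canDelete(String id) {
        return descendants(id).isEmpty();
    }

    public static void main(String[] args) {
        put("A", null);
        put("B", "A", "A");
        put("C", "B", "A", "B");
        put("D", "A", "A");

        System.out.println(children("A"));    // [B, D]
        System.out.println(descendants("A")); // [B, C, D]
        System.out.println(ancestors("C"));   // [A, B]
        System.out.println(canDelete("B"));   // false
        System.out.println(canDelete("C"));   // true
    }
}
```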
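3.2 Solution: use the ES nested type to record the ids of ancestor nodes
Following "Storing tree structures in ES with Spring Data Elasticsearch", here are the ES mapping and a sample document. Each node keeps its direct parentId plus a nested path array holding the id and level of every ancestor:
```
PUT /pigg_tree/_mapping/_doc
{
  "properties": {
    "id": {
      "type": "keyword"
    },
    "level": {
      "type": "keyword"
    },
    "name": {
      "type": "keyword"
    },
    "parentId": {
      "type": "keyword"
    },
    "path": {
      "type": "nested",
      "properties": {
        "id": {
          "type": "keyword"
        },
        "level": {
          "type": "keyword"
        }
      }
    }
  }
}
```
A sample document, as one hit of a search sorted by name and level:
```json
{
  "_index" : "pigg_tree",
  "_type" : "_doc",
  "_id" : "5ebdf2a8551fa08956079179",
  "_score" : null,
  "_source" : {
    "parentId" : "5ebdf263551fd81d52158964",
    "level" : 3,
    "path" : [
      {
        "level" : 1,
        "id" : "5ebdf241551f9ae2328fa452"
      },
      {
        "level" : 2,
        "id" : "5ebdf263551fd81d52158964"
      }
    ],
    "id" : "5ebdf2a8551fa08956079179",
    "name" : "夏夏夏"
  },
  "sort" : [
    "夏夏夏",
    "3"
  ]
}
```
Because level is mapped as keyword, its numeric values are indexed as strings, which is why the second sort value in the hit is "3".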
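Keeping path correct is the job of saving a node and of moving one (scenario 4). A minimal sketch, assuming a child's level is its parent's level + 1 and its path is the parent's path plus the parent itself, which matches the sample document above (level 3, path entries at levels 1 and 2). PathEntry, Node, STORE, attach and move are hypothetical names; against ES, the descendant rewrite would be a bulk update or an update-by-query rather than a loop.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TreePathDemo {

    // One entry of the nested "path" field: an ancestor's id and level
    public static final class PathEntry {
        public final String id;
        public final int level;

        public PathEntry(String id, int level) {
            this.id = id;
            this.level = level;
        }

        @Override
        public String toString() {
            return id + "@" + level;
        }
    }

    public static final class Node {
        public final String id;
        public String parentId;
        public int level;
        public List<PathEntry> path = new ArrayList<>();

        public Node(String id) {
            this.id = id;
        }
    }

    // Hypothetical in-memory stand-in for the index
    public static final Map<String, Node> STORE = new LinkedHashMap<>();

    // Derive level and path from the parent; a root has level 1 and an empty path
    public static Node attach(Node node, Node parent) {
        if (parent == null) {
            node.parentId = null;
            node.level = 1;
            node.path = new ArrayList<>();
        } else {
            node.parentId = parent.id;
            node.level = parent.level + 1;
            node.path = new ArrayList<>(parent.path);
            node.path.add(new PathEntry(parent.id, parent.level));
        }
        STORE.put(node.id, node);
        return node;
    }

    static boolean containsId(List<PathEntry> path, String id) {
        for (PathEntry e : path) {
            if (e.id.equals(id)) {
                return true;
            }
        }
        return false;
    }

    // Scenario 4: move a node under a new parent (null = make it a root), then rebuild descendants
    public static void move(String id, String newParentId) {
        Node node = STORE.get(id);
        Node newParent = newParentId == null ? null : STORE.get(newParentId);
        if (node == null || (newParentId != null && newParent == null)) {
            throw new IllegalArgumentException("Unknown node id");
        }
        if (newParent != null && (newParent.id.equals(id) || containsId(newParent.path, id))) {
            throw new IllegalArgumentException("Cannot move a node under itself or its descendant");
        }
        // Collect descendants first (the same test as the nested query on path.id)
        List<Node> descendants = new ArrayList<>();
        for (Node n : STORE.values()) {
            if (containsId(n.path, id)) {
                descendants.add(n);
            }
        }
        attach(node, newParent);
        // Shallower nodes first (by old level), so each parent is already updated
        descendants.sort((a, b) -> Integer.compare(a.level, b.level));
        for (Node d : descendants) {
            attach(d, STORE.get(d.parentId));
        }
    }

    public static void main(String[] args) {
        Node a = attach(new Node("A"), null);
        Node b = attach(new Node("B"), a);
        attach(new Node("C"), b);
        Node x = attach(new Node("X"), null);
        attach(new Node("Y"), x);

        move("B", "Y");
        Node c = STORE.get("C");
        System.out.println(c.level + " " + c.path); // 4 [X@1, Y@2, B@3]
    }
}
```

Rejecting a move under the node's own subtree matters here: without that check the path would become cyclic, the same kind of trouble the author avoids by refusing multi-parent trees.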
4. Code Structure Design
4.1 Interface for plain documents - EsRepository
```java
@NoRepositoryBean
public interface EsRepository<T> {
    T save(T t);
    T saveWithoutRefresh(T t);
    Iterable<T> saveAll(Iterable<T> entities);
    boolean deleteById(String id);
    void deleteByQuery(QueryBuilder query);
    boolean updateById(String id, Map<String, Object> doc);
    void updateAllById(Iterable<String> ids, Map<String, Object> doc);
    void updateByQuery(QueryBuilder query, Script script);
    boolean existsById(String id);
    Optional<T> findById(String id);
    Optional<T> findById(String id, SourceFilter sourceFilter);
    List<T> findAllById(Iterable<String> ids);
    List<T> findAllById(Iterable<String> ids, SourceFilter sourceFilter);
    List<T> findByQuery(QueryBuilder query);
    List<T> findByQuery(QueryBuilder query, SourceFilter sourceFilter);
    List<T> findByQuery(SearchQuery searchQuery);
    PageInfo<T> pageQuery(SearchQuery searchQuery);
    Long count(QueryBuilder query);
    Map<String, Long> countGroupBy(String field, QueryBuilder query, Integer resultSize);
    Class<T> getEntityClass();
}
```
4.2 Interface for tree-structured documents - EsTreeRepository
```java
@NoRepositoryBean
public interface EsTreeRepository<T extends TreeNode> extends EsRepository<T> {
    T saveNode(T t);
    Iterable<T> saveAllNodeOfParent(String parentId, Iterable<T> entities);
    boolean deleteNodeById(String id);
    // onlyNextLevel = true: direct children only; false: all descendants
    List<T> findChildrenByParentId(String parentId, boolean onlyNextLevel, SearchQuery searchQuery);
    List<T> findForefathersById(String id, SourceFilter sourceFilter);
    Long countByParentId(String parentId, boolean onlyNextLevel, QueryBuilder query);
    Map<String, Long> countByParentId(List<String> parentIds, boolean onlyNextLevel, QueryBuilder query);
}
```
4.3 Abstract class for plain documents - AbstractEsRepository
```java
public abstract class AbstractEsRepository<T> implements EsRepository<T> {
    // ... implementation omitted ...
}
```
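4.4 Abstract class for tree-structured documents - AbstractEsTreeRepository
```java
// Declared abstract, as its name says: concrete repositories extend it with a concrete
// entity type, which the generic-type lookup from 3.1 needs. Component scanning skips
// abstract classes, so a @Component annotation here would have no effect.
public abstract class AbstractEsTreeRepository<T extends TreeNode> extends AbstractEsRepository<T> implements EsTreeRepository<T> {
    // ... implementation omitted ...
}
```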
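4.5 Tree node base class - TreeNode
```java
@Data
public class TreeNode {
    // id of the direct parent, stored in the parentId keyword field
    @EsNodeParentId
    private String parentId;
    // depth of the node (the root is level 1 in the sample data)
    @EsNodeLevel
    private int level;
    // ancestors from the root down to the direct parent, stored in the nested path field;
    // each ParentNode holds an ancestor's id and level
    @EsPath
    private List<ParentNode> path;
}
```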
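4.6 Test entity class - TestTreeEntity
Note the @ToString(callSuper = true) below. Because I used @Data, the objects I printed after deserialization seemed to be missing the properties inherited from TreeNode. After some digging it turned out the fields were populated; the toString() that Lombok generates simply includes only the class's own fields by default, not those of its superclass. So either add @ToString(callSuper = true) or don't use Lombok here.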
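```java
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString(callSuper = true)
// Without this, Lombok's generated equals()/hashCode() also ignore the TreeNode fields
@EqualsAndHashCode(callSuper = true)
@EsDocument(indexName = "pigg_tree", type = "_doc")
public class TestTreeEntity extends TreeNode {
    @EsId
    private String id;
    private String name;
}
```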
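The effect is easy to reproduce without Lombok. The two hand-written toString() methods below imitate what Lombok generates with the default callSuper = false and with callSuper = true (an approximation of Lombok's output format, not Lombok itself). The parent field is set in both cases; it is just not printed in the first. Fields are public only to keep the demo short.

```java
public class ToStringDemo {

    public static class TreeNode {
        public String parentId;

        @Override
        public String toString() {
            return "TreeNode(parentId=" + parentId + ")";
        }
    }

    // Like @Data alone: only the subclass's own fields are printed
    public static class EntityWithoutSuper extends TreeNode {
        public String name;

        @Override
        public String toString() {
            return "EntityWithoutSuper(name=" + name + ")";
        }
    }

    // Like @ToString(callSuper = true): the parent's toString() is included
    public static class EntityWithSuper extends TreeNode {
        public String name;

        @Override
        public String toString() {
            return "EntityWithSuper(super=" + super.toString() + ", name=" + name + ")";
        }
    }

    public static void main(String[] args) {
        EntityWithoutSuper a = new EntityWithoutSuper();
        a.parentId = "p1";
        a.name = "n1";
        EntityWithSuper b = new EntityWithSuper();
        b.parentId = "p1";
        b.name = "n1";
        System.out.println(a);          // EntityWithoutSuper(name=n1)
        System.out.println(a.parentId); // p1: the field was there all along
        System.out.println(b);          // EntityWithSuper(super=TreeNode(parentId=p1), name=n1)
    }
}
```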
5. Implementation
There is far too much code to paste all of it into this post, so here are a few of the methods I consider most important.
5.1 saveAll
```java
@Override
public Iterable<T> saveAll(Iterable<T> entities) {
    Assert.notNull(entities, "entities can't be null.");
    Iterator<T> iterator = entities.iterator();
    if (!iterator.hasNext()) {
        // ES rejects a bulk request without any actions, so return early
        return entities;
    }
    // All entities share one class, so resolve the index/type metadata once
    Metadata metadata = MetadataUtils.getMetadata(iterator.next().getClass());
    BulkRequest bulkRequest = new BulkRequest();
    for (T t : entities) {
        IndexRequest indexRequest = prepareIndex(t, metadata);
        if (indexRequest != null) {
            bulkRequest.add(indexRequest);
        }
    }
    if (bulkRequest.numberOfActions() == 0) {
        return entities;
    }
    try {
        checkForBulkUpdateFailure(client.bulk(bulkRequest, RequestOptions.DEFAULT));
    } catch (IOException e) {
        throw new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e);
    }
    return entities;
}
```
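For reference, a BulkRequest travels as the newline-delimited JSON body of the _bulk endpoint: one action line plus one source line per document, with a trailing newline. The sketch below builds such a body by hand in the ES 6.x form (which still carries _type). It assumes each source is already serialized, single-line JSON and that ids need no JSON escaping; buildIndexBody is a hypothetical helper meant only to show the wire format, not to replace the client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BulkBodyDemo {

    // Builds an NDJSON body of "index" actions for the _bulk endpoint (ES 6.x style, with _type)
    public static String buildIndexBody(String index, String type, Map<String, String> sourcesById) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> e : sourcesById.entrySet()) {
            // Action/metadata line; ids are assumed not to need JSON escaping
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type)
                .append("\",\"_id\":\"").append(e.getKey()).append("\"}}\n");
            // Source line: must be a single line of JSON
            body.append(e.getValue()).append('\n');
        }
        return body.toString(); // every line, including the last, ends with a newline
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1", "{\"name\":\"a\",\"level\":1}");
        docs.put("2", "{\"name\":\"b\",\"level\":2,\"parentId\":\"1\"}");
        System.out.print(buildIndexBody("pigg_tree", "_doc", docs));
    }
}
```

Sending everything in one request is what makes saveAll cheaper than calling save in a loop: one round trip instead of one per entity.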
5.2 deleteById
```java
@Override
public boolean deleteById(String id) {
    if (StringUtils.isEmpty(id)) {
        throw new ElasticsearchException("ID cannot be empty");
    }
    Metadata metadata = MetadataUtils.getMetadata(getEntityClass());
    DeleteRequest request = new DeleteRequest(
            metadata.getIndexName(),
            metadata.getTypeName(),
            id);
    // Do not wait for a refresh; the deletion becomes visible to searches after the next refresh
    request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
    try {
        DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
        // NOT_FOUND (no document with this id) yields false
        return deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
    } catch (IOException e) {
        throw new ElasticsearchException("Error while deleting item request: " + request.toString(), e);
    }
}
```
5.3 deleteByQuery
```java
@Override
public void deleteByQuery(QueryBuilder query) {
    if (query == null) {
        throw new ElasticsearchException("query cannot be empty");
    }
    Metadata metadata = MetadataUtils.getMetadata(getEntityClass());
    DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(metadata.getIndexName())
            .setDocTypes(metadata.getTypeName())
            .setQuery(query)
            // Equivalent to conflicts=proceed: skip documents changed concurrently instead of aborting
            .setAbortOnVersionConflict(false)
            // Refresh the affected index once the deletion has finished
            .setRefresh(true);
    try {
        client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new ElasticsearchException("Error for delete request: " + deleteByQueryRequest.toString(), e);
    }
}
```
5.4 updateAllById
```java
@Override
public void updateAllById(Iterable<String> ids, Map<String, Object> doc) {
    Assert.notNull(ids, "ids can't be null.");
    Assert.notNull(doc, "doc can't be null.");
    List<String> idList = stringIdsRepresentation(ids);
    if (CollectionUtils.isEmpty(idList)) {
        // Nothing to update; ES would reject an empty bulk request
        return;
    }
    Metadata metadata = MetadataUtils.getMetadata(getEntityClass());
    BulkRequest bulkRequest = new BulkRequest();
    for (String id : idList) {
        // Partial update: the fields in doc are merged into each existing document
        UpdateRequest request = new UpdateRequest(metadata.getIndexName(), metadata.getTypeName(), id);
        request.doc(doc);
        bulkRequest.add(request);
    }
    try {
        checkForBulkUpdateFailure(client.bulk(bulkRequest, RequestOptions.DEFAULT));
    } catch (IOException e) {
        throw new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e);
    }
}
```
5.5 existsById
```java
@Override
public boolean existsById(String id) {
    String thisId = stringIdRepresentation(id);
    if (StringUtils.isEmpty(thisId)) {
        throw new ElasticsearchException("ID cannot be empty");
    }
    Metadata metadata = MetadataUtils.getMetadata(getEntityClass());
    GetRequest getRequest = new GetRequest(
            metadata.getIndexName(),
            metadata.getTypeName(),
            thisId);
    // Only existence matters: fetch neither _source nor any stored fields
    getRequest.fetchSourceContext(new FetchSourceContext(false));
    getRequest.storedFields("_none_");
    try {
        return client.exists(getRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new ElasticsearchException("Error for existsById request: " + getRequest.toString(), e);
    }
}
```
5.6 Fetching a List by a collection of ids
```java
@Override
public List<T> findAllById(Iterable<String> ids, SourceFilter sourceFilter) {
    Assert.notNull(ids, "ids can't be null.");
    List<String> idList = stringIdsRepresentation(ids);
    if (CollectionUtils.isEmpty(idList)) {
        // ES rejects a multi-get request with no documents
        return new ArrayList<>();
    }
    Metadata metadata = MetadataUtils.getMetadata(getEntityClass());
    if (metadata != null) {
        MultiGetRequest request = new MultiGetRequest();
        for (String id : idList) {
            MultiGetRequest.Item item = new MultiGetRequest.Item(metadata.getIndexName(), metadata.getTypeName(), id);
            // Apply source filtering only when includes or excludes are given
            if (sourceFilter != null && !(sourceFilter.getIncludes() == null && sourceFilter.getExcludes() == null)) {
                item.fetchSourceContext(new FetchSourceContext(true, sourceFilter.getIncludes(), sourceFilter.getExcludes()));
            }
            request.add(item);
        }
        try {
            MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
            return EsResponseUtils.multiGetResponse2Obj(response, this.entityClass);
        } catch (IOException e) {
            throw new ElasticsearchException("Error for findAllById request: " + request.toString(), e);
        }
    }
    return null;
}
```
5.7 countGroupBy
```java
@Override
public Map<String, Long> countGroupBy(String field, QueryBuilder query, Integer resultSize) {
    if (StringUtils.isEmpty(field)) {
        throw new ElasticsearchException("field cannot be empty");
    }
    if (resultSize == null || resultSize <= 0) {
        resultSize = 1000;
    }
    // LinkedHashMap keeps the buckets in the order ES returns them
    Map<String, Long> groupMap = new LinkedHashMap<>();
    Metadata metadata = MetadataUtils.getMetadata(getEntityClass());
    // A second order(...) call replaces the first, so both criteria go into one compound order:
    // most frequent first, ties broken by key ascending
    AggregationBuilder agg = AggregationBuilders.terms("agg")
            .field(field)
            .size(resultSize)
            .order(BucketOrder.compound(BucketOrder.count(false), BucketOrder.key(true)));
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    if (null != query) {
        boolQueryBuilder.filter(query);
    }
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(boolQueryBuilder);
    // Only the aggregation is needed, not the hits
    searchSourceBuilder.size(0);
    searchSourceBuilder.aggregation(agg);
    SearchRequest request = new SearchRequest(metadata.getIndexName());
    request.types(metadata.getTypeName());
    request.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
        Terms groups = searchResponse.getAggregations().get("agg");
        for (Terms.Bucket entry : groups.getBuckets()) {
            groupMap.put(entry.getKey().toString(), entry.getDocCount());
        }
    } catch (IOException e) {
        throw new ElasticsearchException("Error for countGroupBy request: " + request.toString(), e);
    }
    return groupMap;
}
```
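Ordering groups by count descending with a key-ascending tie-breaker, and keeping that order in a LinkedHashMap, can be mimicked in memory. A sketch over plain strings, where size plays the role of resultSize; it does not reproduce the approximate per-shard counting a real terms aggregation may apply on multi-shard indices.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByDemo {

    // Count values, order by count desc then key asc, keep at most `size` groups
    public static Map<String, Long> countGroupBy(List<String> values, int size) {
        Map<String, Long> counts = new HashMap<>();
        for (String v : values) {
            counts.merge(v, 1L, Long::sum);
        }
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        Map<String, Long> result = new LinkedHashMap<>(); // preserves the sorted order
        for (Map.Entry<String, Long> e : entries.subList(0, Math.min(size, entries.size()))) {
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("b", "a", "b", "c", "a", "d");
        System.out.println(countGroupBy(values, 3)); // {a=2, b=2, c=1}
    }
}
```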
5.8 countByParentId for tree structures
This method takes a group of nodes and, for each of them, counts either its direct children (onlyNextLevel = true) or all of its descendants (onlyNextLevel = false).
```java
@Override
public Map<String, Long> countByParentId(List<String> parentIds, boolean onlyNextLevel, QueryBuilder query) {
    if (CollectionUtils.isEmpty(parentIds)) {
        throw new ElasticsearchException("parentIds cannot be empty");
    }
    Metadata metadata = MetadataUtils.getMetadata(getEntityClass());
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    // Apply the caller's extra filter in both branches
    if (query != null) {
        boolQueryBuilder.filter(query);
    }
    if (onlyNextLevel) {
        // Direct children: group the nodes by parentId
        boolQueryBuilder.filter(QueryBuilders.termsQuery("parentId", parentIds));
        return countGroupBy("parentId", boolQueryBuilder, parentIds.size());
    }
    // All descendants: nodes whose nested path contains one of the parentIds
    BoolQueryBuilder boolQueryBuilderForNested = QueryBuilders.boolQuery();
    boolQueryBuilderForNested.filter(QueryBuilders.termsQuery("path.id", parentIds));
    boolQueryBuilder.filter(QueryBuilders.nestedQuery("path", boolQueryBuilderForNested, ScoreMode.None));
    // The nested terms agg sees every ancestor id of each matching node, so keep only the
    // requested ids; otherwise ancestors of the parents can take the size(parentIds.size()) slots
    TermsAggregationBuilder termsByPath = AggregationBuilders.terms("terms_by_path")
            .field("path.id")
            .size(parentIds.size())
            .includeExclude(new IncludeExclude(parentIds.toArray(new String[0]), null));
    NestedAggregationBuilder nestedAggregationBuilder = AggregationBuilders.nested("group_by_path", "path")
            .subAggregation(termsByPath);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(boolQueryBuilder);
    searchSourceBuilder.size(0);
    searchSourceBuilder.aggregation(nestedAggregationBuilder);
    SearchRequest request = new SearchRequest(metadata.getIndexName());
    request.types(metadata.getTypeName());
    request.source(searchSourceBuilder);
    Map<String, Long> result = new HashMap<>();
    try {
        SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
        Aggregations aggregations = searchResponse.getAggregations();
        ParsedNested groupByPath = null;
        if (aggregations != null) {
            groupByPath = aggregations.get("group_by_path");
        }
        if (groupByPath != null) {
            Terms termsByPathResult = groupByPath.getAggregations().get("terms_by_path");
            // A path lists each ancestor once, so nested doc counts equal node counts
            for (Terms.Bucket bucket : termsByPathResult.getBuckets()) {
                result.put(bucket.getKeyAsString(), bucket.getDocCount());
            }
        }
    } catch (IOException e) {
        throw new ElasticsearchException("Error for countByParentId request: " + request.toString(), e);
    }
    return result;
}
```
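A caveat worth seeing in the descendant branch: the nested terms aggregation on path.id counts every ancestor id of every matching node, not only the requested ones, so ancestors of the requested parents appear as buckets too. With size(parentIds.size()) and the default count-descending order, such an ancestor can tie with, or outrank, a requested id and push it out of the result. The in-memory simulation below (data and names made up for illustration) shows the buckets with and without restricting them to the requested ids.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NestedCountDemo {

    // Simulates: nested query on path.id IN parentIds, then a terms agg on path.id
    public static Map<String, Long> countDescendants(Map<String, List<String>> pathsById,
                                                     List<String> parentIds,
                                                     boolean onlyRequested) {
        Set<String> wanted = new HashSet<>(parentIds);
        Map<String, Long> buckets = new HashMap<>();
        for (List<String> path : pathsById.values()) {
            // The query keeps nodes that have at least one requested id in their path
            boolean matches = false;
            for (String ancestor : path) {
                if (wanted.contains(ancestor)) {
                    matches = true;
                    break;
                }
            }
            if (!matches) {
                continue;
            }
            // The terms agg then sees EVERY ancestor id of the matching node
            for (String ancestor : path) {
                if (!onlyRequested || wanted.contains(ancestor)) {
                    buckets.merge(ancestor, 1L, Long::sum);
                }
            }
        }
        return buckets;
    }

    public static void main(String[] args) {
        Map<String, List<String>> paths = new HashMap<>();
        paths.put("A", Collections.<String>emptyList());
        paths.put("B", Arrays.asList("A"));
        paths.put("C", Arrays.asList("A", "B"));
        paths.put("D", Arrays.asList("A", "B"));

        // Unrestricted: bucket A=2 appears next to the requested B=2
        System.out.println(countDescendants(paths, Arrays.asList("B"), false));
        // Restricted to the requested ids: {B=2}
        System.out.println(countDescendants(paths, Arrays.asList("B"), true));
    }
}
```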
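Summary
- When learning a technology, go for breadth first and depth later. Don't expect to reach a high level all at once; get the simple things done first.
- For example, this ORM does not yet handle ES index and mapping settings, the version field, operations across multiple indices and so on; those can be added gradually later.
- Know the difference between getDeclaredFields() and getFields() in reflection: getFields() returns only public fields (inherited public ones included), while getDeclaredFields() returns every field declared by the class itself but none from its superclasses. To also collect superclass fields, you can use Hutool's ReflectUtil.getFieldsDirectly(clazz, true).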
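A quick illustration of that difference with plain JDK reflection. Collecting entity fields including inherited ones means walking getSuperclass() yourself, which is roughly what the Hutool call does for you (its exact ordering and filtering may differ). The two classes are hypothetical copies of TreeNode and TestTreeEntity without the annotations.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class FieldsDemo {

    public static class TreeNode {
        private String parentId;
        private int level;
    }

    public static class TestTreeEntity extends TreeNode {
        private String id;
        private String name;
    }

    // Own fields first, then each superclass's, stopping at Object
    public static List<String> allFieldNames(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (!f.isSynthetic()) { // skip compiler/tool-generated fields
                    names.add(f.getName());
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(TestTreeEntity.class.getFields().length);         // 0: no public fields
        System.out.println(TestTreeEntity.class.getDeclaredFields().length); // 2: id and name only
        // All four names; reflection does not guarantee their order
        System.out.println(allFieldNames(TestTreeEntity.class));
    }
}
```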
Reposted from: https://blog.csdn.net/winterking3/article/details/106140122