一、简介
1.1 概念
- 是什么
- The Elastic Stack, 包括 Elasticsearch、Kibana、Beats 和 Logstash(也称为 ELK Stack)。
- 能够安全可靠地获取任何来源、任何格式的数据,然后实时地对数据进行搜索、分析和可视化。
- Elaticsearch,简称为ES, ES是一个开源的高扩展的分布式全文搜索引擎,是整个Elastic Stack技术栈的核心。
- 它可以近乎实时的存储、检索数据;本身扩展性很好,可以扩展到上百台服务器,处理PB级别的数据。
- 数据格式
- Elasticsearch是面向文档型数据库,一条数据在这里就是一个文档。
- 与MySQL类比:
- Types的概念已经被逐渐弱化,Elasticsearch 6.X中,一个index下已经只能包含一个type,Elasticsearch 7.X中, Type的概念已经被删除了。
- 用JSON作为文档序列化的格式,比如一条用户信息:
{
"name" : "John",
"sex" : "Male",
"age" : 25,
"birthDate": "1990/05/01",
"about" : "I love to go rock climbing",
"interests": [ "sports", "music" ]
}
1.2 集群
# 同时在三台机子上操作,注意node.name、network.host不能重复
[omm@bigdata01 ~]$ tar -zxf /opt/soft/elasticsearch-7.8.0-linux-x86_64.tar.gz -C /opt/module/
[omm@bigdata01 ~]$ ln -s /opt/module/elasticsearch-7.8.0 /opt/module/es
[omm@bigdata01 ~]$ cp /opt/module/es/config/elasticsearch.yml{
,.bak}
[omm@bigdata01 ~]$ vi /opt/module/es/config/elasticsearch.yml
[omm@bigdata01 ~]$ cat /opt/module/es/config/elasticsearch.yml
#集群名称
cluster.name: cluster-es
#节点名称,每个节点的名称不能重复
node.name: node-1
#ip地址,每个节点的地址不能重复
network.host: bigdata01
#是不是有资格主节点
node.master: true
node.data: true
http.port: 9200
# head 插件需要这打开这两个配置
http.cors.allow-origin: "*"
http.cors.enabled: true
http.max_content_length: 200mb
#es7.x 之后新增的配置,初始化一个新的集群时需要此配置来选举master
cluster.initial_master_nodes: ["node-1","node-2","node-3"]
#es7.x 之后新增的配置,节点发现
discovery.seed_hosts: ["bigdata01:9300","bigdata02:9300","bigdata01:9300"]
gateway.recover_after_nodes: 2
network.tcp.keep_alive: true
network.tcp.no_delay: true
transport.tcp.compress: true
#集群内同时启动的数据任务个数,默认是2个
cluster.routing.allocation.cluster_concurrent_rebalance: 16
#添加或删除节点及负载均衡时并发恢复的线程个数,默认4个
cluster.routing.allocation.node_concurrent_recoveries: 16
#初始化数据恢复时,并发恢复线程的个数,默认4个
cluster.routing.allocation.node_initial_primaries_recoveries: 16
[omm@bigdata01 ~]$
# 重启shell生效,否则会报“max file descriptors [4096] for elasticsearch process is too low”
[omm@bigdata01 ~]$ sudo cp /etc/security/limits.conf{
,.bak}
[omm@bigdata01 ~]$ sudo vi /etc/security/limits.conf
[omm@bigdata01 ~]$ tail -2 /etc/security/limits.conf
omm soft nofile 65536
omm hard nofile 65536
[omm@bigdata01 ~]$ sudo cp /etc/security/limits.d/20-nproc.conf{
,.bak}
[omm@bigdata01 ~]$ sudo vi /etc/security/limits.d/20-nproc.conf
[omm@bigdata01 ~]$ tail -3 /etc/security/limits.d/20-nproc.conf
omm soft nofile 65536
omm hard nofile 65536
* hard nproc 4096
[omm@bigdata01 ~]$ sudo cp /etc/sysctl.conf{
,.bak}
[omm@bigdata01 ~]$ sudo vi /etc/sysctl.conf
[omm@bigdata01 ~]$ tail -1 /etc/sysctl.conf
vm.max_map_count=655360
[omm@bigdata01 ~]$ sudo sysctl -p
vm.max_map_count = 655360
[omm@bigdata01 ~]$ /opt/module/es/bin/elasticsearch -d
GET http://bigdata01:9200/_cluster/health
{
"cluster_name": "cluster-es",
"status": "green",
"timed_out": false,
"number_of_nodes": 3,
"number_of_data_nodes": 3,
"active_primary_shards": 0,
"active_shards": 0,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0,
"delayed_unassigned_shards": 0,
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 100.0
}
GET http://bigdata01:9200/_cat/nodes
192.168.1.102 14 46 0 0.10 0.21 0.14 dilmrt - node-2
192.168.1.101 15 35 0 0.11 0.23 0.15 dilmrt - node-1
192.168.1.103 7 47 0 0.17 0.27 0.18 dilmrt * node-3
二、HTTP 操作
2.1 索引
- 创建索引
PUT http://127.0.0.1:9200/shopping
{
"acknowledged": true,
"shards_acknowledged": true,
"index": "shopping"
}
- 查看索引
GET http://127.0.0.1:9200/shopping
{
"shopping": {
"aliases": {
},
"mappings": {
},
"settings": {
"index": {
"creation_date": "1618233655684",
"number_of_shards": "1",
"number_of_replicas": "1",
"uuid": "UAVfEyfxR3yG5xKTJ6yFoQ",
"version": {
"created": "7080099"
},
"provided_name": "shopping"
}
}
}
}
- 查看所有索引
v : verbose,打印索引的详细信息
GET http://127.0.0.1:9200/_cat/indices?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open shopping UAVfEyfxR3yG5xKTJ6yFoQ 1 1 0 0 208b 208b
- 删除索引
DELETE http://127.0.0.1:9200/shopping
{
"acknowledged": true
}
2.2 文档
- 添加文档
POST http://127.0.0.1:9200/shopping/_doc
BODY
{
"title":"小米手机",
"category":"小米",
"images":"http://www.gulixueyuan.com/xm.jpg",
"price":3999.00
}
RESULT
{
"_index": "shopping",
"_type": "_doc",
"_id": "aA5FxngBuOhsc8gNHj7x",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}
- 添加文档并指定ID
POST http://127.0.0.1:9200/shopping/_doc/1001
BODY
{
"title":"小米手机",
"category":"小米",
"images":"http://www.gulixueyuan.com/xm.jpg",
"price":3999.00
}
RESULT
{
"_index": "shopping",
"_type": "_doc",
"_id": "1001",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1
}
- 查询指定文档
GET http://127.0.0.1:9200/shopping/_doc/1001
{
"_index": "shopping",
"_type": "_doc",
"_id": "1001",
"_version": 1,
"_seq_no": 1,
"_primary_term": 1,
"found": true,
"_source": {
"title": "小米手机",
"category": "小米",
"images": "http://www.gulixueyuan.com/xm.jpg",
"price": 3999.00
}
}
- 查询所有文档
GET http://127.0.0.1:9200/shopping/_search
{
"took": 41,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 2,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "shopping",
"_type": "_doc",
"_id": "aA5FxngBuOhsc8gNHj7x",
"_score": 1.0,
"_source": {
"title": "小米手机",
"category": "小米",
"images": "http://www.gulixueyuan.com/xm.jpg",
"price": 3999.00
}
},
{
"_index": "shopping",
"_type": "_doc",
"_id": "1001",
"_score": 1.0,
"_source": {
"title": "小米手机",
"category": "小米",
"images": "http://www.gulixueyuan.com/xm.jpg",
"price": 3999.00
}
}
]
}
}
- 覆盖文档
PUT http://127.0.0.1:9200/shopping/_doc/1001
BODY {
"title":"华为手机",
"category":"华为",
"images":"http://www.gulixueyuan.com/hw.jpg",
"price":3999.00
}
RESULT {
"_index": "shopping",
"_type": "_doc",
"_id": "1001",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 1
}
- 更新文档
POST http://127.0.0.1:9200/shopping/_update/1001
BODY {
"doc": {
"price": 4999.00
}
}
RESULT {
"_index": "shopping",
"_type": "_doc",
"_id": "1001",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 3,
"_primary_term": 1
}
- 删除文档
DELETE http://127.0.0.1:9200/shopping/_doc/1001
{
"_index": "shopping",
"_type": "_doc",
"_id": "1001",
"_version": 4,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 4,
"_primary_term": 1
}
2.3 映射
- 有了索引库,等于有了数据库中的database。
- 接下来就需要建索引库(index)中的映射了,类似于数据库(database)中的表结构(table)。
- 创建数据库表需要设置字段名称,类型,长度,约束等;索引库也一样,需要知道这个类型下有哪些字段,每个字段有哪些约束信息,这就叫做映射(mapping)。
- 创建映射
PUT http://127.0.0.1:9200/student
PUT http://127.0.0.1:9200/student/_mapping
BODY {
"properties": {
"name":{
"type": "text",
"index": true
},
"sex":{
"type": "text",
"index": false
},
"age":{
"type": "long",
"index": false
}
}
}
RESULT {
"acknowledged": true
}
- type
类型 | 子类型 | 说明 |
---|---|---|
String | Text | 可分词 |
keyword | 不可分词,数据会作为完整字段进行匹配 | |
Numerical | 基本数据类型 | long、integer、short、byte、double、float、half_float |
浮点数的高精度类型 | scaled_float | |
Date | 日期类型 | |
Array | 数组类型 | |
Object | 对象 |
-
index:是否索引,默认为true,也就是说你不进行任何配置,所有字段都会被索引。
2.1 true:字段会被索引,则可以用来进行搜索
2.2 false:字段不会被索引,不能用来搜索 -
store:是否将数据进行独立存储,默认为false
3.1 原始的文本会存储在_source里面,默认情况下其他提取出来的字段都不是独立存储的,是从_source里面提取出来的。
3.2 当然你也可以独立的存储某个字段,只要设置"store": true即可,获取独立存储的字段要比从_source中解析快得多,但是也会占用更多的空间,所以要根据实际业务需求来设置。
- 查看映射
GET http://127.0.0.1:9200/student/_mapping
{
"student": {
"mappings": {
"properties": {
"age": {
"type": "long",
"index": false
},
"name": {
"type": "text"
},
"sex": {
"type": "text",
"index": false
}
}
}
}
}
2.4 高级查询
数据准备
# POST /student/_doc/1001
{
"name":"zhangsan",
"nickname":"zhangsan",
"sex":"男",
"age":30
}
# POST /student/_doc/1002
{
"name":"lisi",
"nickname":"lisi",
"sex":"男",
"age":20
}
# POST /student/_doc/1003
{
"name":"wangwu",
"nickname":"wangwu",
"sex":"女",
"age":40
}
# POST /student/_doc/1004
{
"name":"zhangsan1",
"nickname":"zhangsan1",
"sex":"女",
"age":50
}
# POST /student/_doc/1005
{
"name":"zhangsan2",
"nickname":"zhangsan2",
"sex":"女",
"age":30
}
- 查询所有文档
GET http://127.0.0.1:9200/student/_search
BODY {
"query": {
"match_all": {
}
}
}
# "query":这里的query代表一个查询对象,里面可以有不同的查询属性
# "match_all":查询类型,例如:match_all(代表查询所有), match,term , range 等等
# {查询条件}:查询条件会根据类型的不同,写法也有差异
RESULT {
"took": 864,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 5,
"relation": "eq"
},
...
]
}
}
- 匹配查询
match匹配类型查询,会把查询条件进行分词,然后进行查询,多个词条之间是or的关系
GET http://127.0.0.1:9200/student/_search
BODY {
"query": {
"match": {
"name":"zhangsan"
}
}
}
- 字段匹配查询
multi_match与match类似,不同的是它可以在多个字段中查询。
{
"query": {
"multi_match": {
"query": "zhangsan",
"fields": ["name","nickname"]
}
}
}
- 关键字精确查询
term查询,精确的关键词匹配查询,不对查询条件进行分词。
{
"query": {
"term": {
"name": {
"value": "zhangsan"
}
}
}
}
- 多关键字精确查询
terms 查询和 term 查询一样,但它允许你指定多值进行匹配。
{
"query": {
"terms": {
"name": ["zhangsan","lisi"]
}
}
}
- 指定查询字段
默认情况下,Elasticsearch在搜索的结果中,会把文档中保存在_source的所有字段都返回。如果我们只想获取其中的部分字段,我们可以添加_source的过滤
{
"_source": ["name","nickname"],
"query": {
"terms": {
"nickname": ["zhangsan"]
}
}
}
- 过滤字段
可以通过 includes:来指定想要显示的字段,excludes:来指定不想要显示的字段
{
"_source": {
"includes": ["name","nickname"]
},
"query": {
"terms": {
"nickname": ["zhangsan"]
}
}
}
- 组合查询
bool
把各种其它查询通过must
(必须 )、must_not
(必须不)、should
(应该)的方式进行组合
{
"query": {
"bool": {
"must": [
{
"match": {
"name": "zhangsan"
}
}
],
"must_not": [
{
"match": {
"age": "40"
}
}
],
"should": [
{
"match": {
"sex": "男"
}
}
]
}
}
}
- 范围查询
range 查询找出那些落在指定区间内的数字或者时间。range查询允许以下字符
操作符 | 说明 |
---|---|
gt | 大于> |
gte | 大于等于>= |
lt | 小于< |
lte | 小于等于<= |
{
"query": {
"range": {
"age": {
"gte": 30,
"lte": 35
}
}
}
}
- 模糊查询
返回包含与搜索字词相似的字词的文档。
# 编辑距离是将一个术语转换为另一个术语所需的一个字符更改的次数。这些更改可以包括:
# > 更改字符(box → fox)
# > 删除字符(black → lack)
# > 插入字符(sic → sick)
# > 转置两个相邻字符(act → cat)
# 为了找到相似的术语,fuzzy查询会在指定的编辑距离内创建一组搜索词的所有可能的变体或扩展。然后查询返回每个扩展的完全匹配。
# 通过fuzziness修改编辑距离。一般使用默认值AUTO,根据术语的长度生成编辑距离。
{
"query": {
"fuzzy": {
"title": {
"value": "zhangsan"
}
}
}
}
- 单字段排序
sort 可以让我们按照不同的字段进行排序,并且通过order指定排序的方式。desc降序,asc升序。
{
"query": {
"match": {
"name":"zhangsan"
}
},
"sort": [{
"age": {
"order":"desc"
}
}]
}
- 多字段排序
假定我们想要结合使用 age和 _score进行查询,并且匹配的结果首先按照年龄排序,然后按照相关性得分排序
{
"query": {
"match_all": {
}
},
"sort": [
{
"age": {
"order": "desc"
}
},
{
"_score":{
"order": "desc"
}
}
]
}
- 高亮查询
在进行关键字搜索时,搜索出的内容中的关键字会显示不同的颜色,称之为高亮。
# Elasticsearch可以对查询内容中的关键字部分,进行标签和样式(高亮)的设置。
# 在使用match查询的同时,加上一个highlight属性:
# >pre_tags:前置标签
# >post_tags:后置标签
# >fields:需要高亮的字段
# >title:这里声明title字段需要高亮,后面可以为这个字段设置特有配置,也可以空
{
"query": {
"match": {
"name": "zhangsan"
}
},
"highlight": {
"pre_tags": "<font color='red'>",
"post_tags": "</font>",
"fields": {
"name": {
}
}
}
}
- 分页查询
# from:当前页的起始索引,默认从0开始。 from = (pageNum - 1) * size
# size:每页显示多少条
{
"query": {
"match_all": {
}
},
"sort": [
{
"age": {
"order": "desc"
}
}
],
"from": 0,
"size": 2
}
- 聚合查询
聚合允许使用者对es文档进行统计分析,类似与关系型数据库中的group by,当然还有很多其他的聚合,例如取最大值、平均值等等。
{
"aggs":{
"max_age":{
"max":{
"field":"age"}
}
},
"size":0
}
- 桶聚合查询
桶聚和相当于sql中的group by语句:terms聚合,分组统计
{
"aggs":{
"age_groupby":{
"terms":{
"field":"age"}
}
},
"size":0
}
三、Java API
3.1 索引
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch的客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch依赖2.x的log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
<!-- junit单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
package com.simwor.bigdata.es;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import java.io.IOException;
public class ESClient {
public static void main(String[] args) throws IOException {
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200))
);
createIndex(esClient);
queryIndex(esClient);
deleteIndex(esClient);
esClient.close();
}
private static void deleteIndex(RestHighLevelClient esClient) throws IOException {
DeleteIndexRequest request = new DeleteIndexRequest("user");
AcknowledgedResponse acknowledgedResponse = esClient.indices().delete(request, RequestOptions.DEFAULT);
System.out.println("删除索引是否成功:" + acknowledgedResponse.isAcknowledged());
}
private static void queryIndex(RestHighLevelClient esClient) throws IOException {
GetIndexRequest request = new GetIndexRequest("user");
GetIndexResponse response = esClient.indices().get(request, RequestOptions.DEFAULT);
System.out.println(response.getAliases());
System.out.println(response.getMappings());
System.out.println(response.getSettings());
}
private static void createIndex(RestHighLevelClient esClient) throws IOException {
CreateIndexRequest request = new CreateIndexRequest("user");
CreateIndexResponse createIndexResponse = esClient.indices().create(request, RequestOptions.DEFAULT);
System.out.println("创建索引是否成功:" + createIndexResponse.isAcknowledged());
}
}
创建索引是否成功:true
{
user=[]}
{
user=org.elasticsearch.cluster.metadata.MappingMetadata@91416359}
{
user={
"index.creation_date":"1618280034576","index.number_of_replicas":"1","index.number_of_shards":"1","index.provided_name":"user","index.uuid":"Dus3O-NJTiekoE_Gr2Rchg","index.version.created":"7080099"}}
删除索引是否成功:true
3.2 文档
package com.simwor.bigdata.es;
public class User {
private String name;
private String sex;
private Integer age;
public User(String name, String sex, Integer age) {
this.name = name;
this.sex = sex;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}
package com.simwor.bigdata.es;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
public class ESClient {
public static void main(String[] args) throws IOException {
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200))
);
insertDoc(esClient);
updateDoc(esClient);
getDoc(esClient);
deleteDoc(esClient);
batchInsertDoc(esClient);
batchDeleteDoc(esClient);
esClient.close();
}
private static void batchDeleteDoc(RestHighLevelClient esClient) throws IOException {
BulkRequest request = new BulkRequest();
DeleteRequest r1 = new DeleteRequest().index("user").id("1001");
DeleteRequest r2 = new DeleteRequest().index("user").id("1002");
DeleteRequest r3 = new DeleteRequest().index("user").id("1003");
request.add(r1).add(r2).add(r3);
BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);
System.out.println("批量删除耗费时间:" + response.getTook());
}
private static void batchInsertDoc(RestHighLevelClient esClient) throws IOException {
ObjectMapper mapper = new ObjectMapper();
String u1Json = mapper.writeValueAsString(new User("rayslee", "male", 18));
String u2Json = mapper.writeValueAsString(new User("rachel", "female", 19));
String u3Json = mapper.writeValueAsString(new User("chandler", "male", 20));
BulkRequest request = new BulkRequest();
IndexRequest r1 = new IndexRequest().index("user").id("1001").source(u1Json, XContentType.JSON);
IndexRequest r2 = new IndexRequest().index("user").id("1002").source(u2Json, XContentType.JSON);
IndexRequest r3 = new IndexRequest().index("user").id("1003").source(u3Json, XContentType.JSON);
request.add(r1).add(r2).add(r3);
BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);
System.out.println("批量插入耗费时间:" + response.getTook());
}
private static void deleteDoc(RestHighLevelClient esClient) throws IOException {
DeleteRequest request = new DeleteRequest();
request.index("user").id("1001");
DeleteResponse response = esClient.delete(request, RequestOptions.DEFAULT);
System.out.println(response);
}
private static void getDoc(RestHighLevelClient esClient) throws IOException {
GetRequest request = new GetRequest();
request.index("user").id("1001");
GetResponse response = esClient.get(request, RequestOptions.DEFAULT);
System.out.println(response.getSourceAsString());
}
private static void updateDoc(RestHighLevelClient esClient) throws IOException {
UpdateRequest request = new UpdateRequest();
request.index("user").id("1001");
request.doc(XContentType.JSON, "sex", "男");
UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT);
System.out.println("更新文档结果:" + response.getResult());
}
private static void insertDoc(RestHighLevelClient esClient) throws IOException {
User user = new User("rayslee", "male", 18);
ObjectMapper mapper = new ObjectMapper();
String userJson = mapper.writeValueAsString(user);
IndexRequest request = new IndexRequest();
request.index("user").id("1001");
request.source(userJson, XContentType.JSON);
IndexResponse response = esClient.index(request, RequestOptions.DEFAULT);
System.out.println("插入文档结果:" + response.getResult());
}
}
插入文档结果:CREATED
更新文档结果:UPDATED
{
"name":"rayslee","sex":"男","age":18}
DeleteResponse[index=user,type=_doc,id=1001,version=45,result=deleted,shards=ShardInfo{
total=2, successful=1, failures=[]}]
批量插入耗费时间:6ms
批量删除耗费时间:5ms
3.3 高级查询
package com.simwor.bigdata.es;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
public class ESClient {
public static void main(String[] args) throws Exception {
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http")));
// 0. 准备数据
//batchInsertDoc(esClient);
SearchRequest request = new SearchRequest();
request.indices("user");
// 1. 查询索引中全部的数据
// request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
// 2. 条件查询 : termQuery
// request.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("age", 30)));
// 3. 分页查询
// SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
// builder.from(2);
// builder.size(2);
// request.source(builder);
// 4. 查询排序
// SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
// builder.sort("age", SortOrder.DESC);
// request.source(builder);
// 5. 过滤字段
// SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
// String[] excludes = {"age"};
// String[] includes = {};
// builder.fetchSource(includes, excludes);
// request.source(builder);
// 6. 组合查询
// SearchSourceBuilder builder = new SearchSourceBuilder();
// BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// //boolQueryBuilder.must(QueryBuilders.matchQuery("age", 30));
// //boolQueryBuilder.must(QueryBuilders.matchQuery("sex", "男"));
// //boolQueryBuilder.mustNot(QueryBuilders.matchQuery("sex", "男"));
// boolQueryBuilder.should(QueryBuilders.matchQuery("age", 30));
// boolQueryBuilder.should(QueryBuilders.matchQuery("age", 40));
// builder.query(boolQueryBuilder);
// request.source(builder);
// 7. 范围查询
// SearchSourceBuilder builder = new SearchSourceBuilder();
// RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age");
// rangeQuery.gte(30);
// rangeQuery.lt(50);
// builder.query(rangeQuery);
// request.source(builder);
// 8. 模糊查询
// SearchSourceBuilder builder = new SearchSourceBuilder();
// //builder.query(QueryBuilders.fuzzyQuery("name", "wangwu").fuzziness(Fuzziness.ONE));
// builder.query(QueryBuilders.fuzzyQuery("name", "wangwu").fuzziness(Fuzziness.TWO));
// request.source(builder);
// 9. 高亮查询
// SearchSourceBuilder builder = new SearchSourceBuilder();
// TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery("name", "zhangsan");
// HighlightBuilder highlightBuilder = new HighlightBuilder();
//
// highlightBuilder.preTags("<font color='red'>");
// highlightBuilder.postTags("</font>");
// highlightBuilder.field("name");
//
// builder.highlighter(highlightBuilder);
// builder.query(termsQueryBuilder);
// request.source(builder);
// 10. 聚合查询
// SearchSourceBuilder builder = new SearchSourceBuilder();
// AggregationBuilder aggregationBuilder = AggregationBuilders.max("maxAge").field("age");
// builder.aggregation(aggregationBuilder);
// request.source(builder);
// 11. 分组查询
SearchSourceBuilder builder = new SearchSourceBuilder();
AggregationBuilder aggregationBuilder = AggregationBuilders.terms("ageGroup").field("age");
builder.aggregation(aggregationBuilder);
request.source(builder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
esClient.close();
}
private static void batchInsertDoc(RestHighLevelClient esClient) throws IOException {
BulkRequest request = new BulkRequest();
request.add(new IndexRequest().index("user").id("1001").source(XContentType.JSON, "name", "zhangsan", "age",30,"sex","男"));
request.add(new IndexRequest().index("user").id("1002").source(XContentType.JSON, "name", "lisi", "age",30,"sex","女"));
request.add(new IndexRequest().index("user").id("1003").source(XContentType.JSON, "name", "wangwu", "age",40,"sex","男"));
request.add(new IndexRequest().index("user").id("1004").source(XContentType.JSON, "name", "wangwu1", "age",40,"sex","女"));
request.add(new IndexRequest().index("user").id("1005").source(XContentType.JSON, "name", "wangwu2", "age",50,"sex","男"));
request.add(new IndexRequest().index("user").id("1006").source(XContentType.JSON, "name", "wangwu3", "age",50,"sex","男"));
request.add(new IndexRequest().index("user").id("1007").source(XContentType.JSON, "name", "wangwu44", "age",60,"sex","男"));
request.add(new IndexRequest().index("user").id("1008").source(XContentType.JSON, "name", "wangwu555", "age",60,"sex","男"));
request.add(new IndexRequest().index("user").id("1009").source(XContentType.JSON, "name", "wangwu66666", "age",60,"sex","男"));
BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);
}
}
四、进阶
4.1 核心概念
# 索引
一个索引就是一个拥有几分相似特征的文档的集合。
# 文档
一个文档是一个可被索引的基础信息单元,也就是一条数据。
# 字段
字段相当于是数据表的字段,对文档数据根据不同属性进行的分类标识。
# 映射
映射(Mapping)是处理数据的方式和规则方面做一些限制
# 分片
1. 一个索引可以存储超出单个节点硬件限制的大量数据,Elasticsearch提供了将索引划分成多份的能力,每一份就称之为分片。
2. 每个分片本身也是一个功能完善并且独立的“索引”,这个“索引”可以被放置到集群中的任何节点上。
3. 分片很重要,主要有两方面的原因:
3.1)允许你水平分割 / 扩展你的内容容量。
3.2)允许你在分片之上进行分布式的、并行的操作,进而提高性能/吞吐量。
4. 一个 Lucene 索引 我们在 Elasticsearch 称作 分片 。
5. 一个 Elasticsearch 索引 是分片的集合。
6. 当 Elasticsearch 在索引中搜索的时候, 他发送查询到每一个属于索引的分片(Lucene 索引),然后合并每个分片的结果到一个全局的结果集。
# 副本
1. Elasticsearch允许你创建分片的一份或多份拷贝,这些拷贝叫做复制分片(副本)。
2. 复制分片可以扩展你的搜索量/吞吐量,因为搜索可以在所有的副本上并行运行。
# 分配
1. 分配(Allocation)是将分片分配给某个节点的过程,包括分配主分片或者副本。
2. 如果是副本,还包含从主分片复制数据的过程。这个过程是由master节点完成的。
4.2 系统架构
1. 一个运行中的 Elasticsearch 实例称为一个节点,而集群是由一个或者多个拥有相同 cluster.name 配置的节点组成, 它们共同承担数据和负载的压力。
2. 当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据。
3. 当一个节点被选举成为主节点时, 它将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等。
4. 而主节点并不需要涉及到文档级别的变更和搜索等操作,所以当集群只拥有一个主节点的情况下,即使流量的增加它也不会成为瓶颈。
5. 任何节点都可以成为主节点。
6. 作为用户,我们可以将请求发送到集群中的任何节点 ,包括主节点。
7. 每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。
8. 无论我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回給客户端。
4.3 读写流程
- 创建索引
PUT http://bigdata01:9200/users
BODY {
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 1
}
}
RESULT {
"acknowledged": true,
"shards_acknowledged": true,
"index": "users"
}
Chrome 安装 “ElasticSearch Head” 插件
- 路由计算
当索引一个文档的时候,文档会被存储到一个主分片中。 Elasticsearch 如何知道一个文档应该存放到哪个分片中呢?
- 写流程
- 读流程
在处理读取请求时,协调结点在每次请求的时候都会通过轮询所有的副本分片来达到负载均衡。
4.4 分片原理
4.4.1 倒排索引
- 正向索引
所谓的正向索引,就是搜索引擎会将待搜索的文件都对应一个文件ID,搜索时将这个ID和搜索关键字进行对应,形成K-V对,然后对关键字进行统计计数。
- 倒排索引
倒排索引,即把文件ID对应到关键词的映射转换为关键词到文件ID的映射,每个关键词都对应着一系列的文件,这些文件中都出现这个关键词。
4.4.2 文档搜索
- 倒排索引的不变性
- 不需要锁。如果你从来不更新索引,你就不需要担心多进程同时修改数据的问题。
- 一旦索引被读入内核的文件系统缓存,便会留在哪里,由于其不变性。只要文件系统缓存中还有足够的空间,那么大部分读请求会直接请求内存,而不会命中磁盘。这提供了很大的性能提升。
- 其它缓存(像filter缓存),在索引的生命周期内始终有效。它们不需要在每次数据改变时被重建,因为数据不会变化。
- 写入单个大的倒排索引允许数据被压缩,减少磁盘 I/O 和 需要被缓存到内存的索引的使用量。
- 但如果你需要让一个新的文档可被搜索,需要重建整个索引。这要么对一个索引所能包含的数据量造成了很大的限制,要么对索引可被更新的频率造成了很大的限制。
- 动态更新索引
如何在保留不变性的前提下实现倒排索引的更新?
- 用更多的索引。通过增加新的补充索引来反映新近的修改,而不是直接重写整个倒排索引。每一个倒排索引都会被轮流查询到,从最早的开始查询完后再对结果进行合并。
- 新文档被收集到内存索引缓存,不时地缓存被提交形成 新的段(本身也是倒排索引),当一个查询被触发,所有已知的段按顺序被查询。
- 段是不可改变的,当一个文档被 “删除” 时,它实际上只是在 .del 文件中被 标记删除。一个被标记删除的文档仍然可以被查询匹配到, 但它会在最终结果被返回前从结果集中移除。
- 段合并的时候会将那些旧的已删除文档从文件系统中清除。
- 近实时搜索
- 新增的文档只有在段提交后才可以被索引到,如果直接使用 fsync 刷写到磁盘代价很大,取而代之的是将其刷新到 OS Cache 文件系统缓存便对搜索可见(1s后)。
- 段刷新的频率可以通过调整,也可以手动调用
/users/_refresh
进行实时刷新。
{
"settings": {
"refresh_interval": "30s"
}
}
- 持久化变更:translog 或者叫事务日志,在每一次对 Elasticsearch 进行操作时均进行了日志记录。
4.5 文档冲突
- 创建测试文档
POST http://127.0.0.1:9200/shopping/_doc/1001
BODY
{
"title":"小米手机",
"category":"小米",
"images":"http://www.gulixueyuan.com/xm.jpg",
"price":3999.00
}
RESULT
{
"_index": "shopping",
"_type": "_doc",
"_id": "1000",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 6,
"_primary_term": 3
}
- 乐观更新
默认情况下,并发更新同一个文档会互相覆盖,通过添加版本号来乐观尝试更新、冲突报错。
POST http://127.0.0.1:9200/shopping/_doc/1000?if_seq_no=1&if_primary_term=1
RESULT {
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[1000]: version conflict, required seqNo [1], primary term [1]. current document has seqNo [6] and primary term [3]",
"index_uuid": "odvP_kiVSKuBWb4_zjrjsA",
"shard": "0",
"index": "shopping"
}
],
"type": "version_conflict_engine_exception",
"reason": "[1000]: version conflict, required seqNo [1], primary term [1]. current document has seqNo [6] and primary term [3]",
"index_uuid": "odvP_kiVSKuBWb4_zjrjsA",
"shard": "0",
"index": "shopping"
},
"status": 409
}
POST http://127.0.0.1:9200/shopping/_doc/1000?if_seq_no=6&if_primary_term=3
{
"_index": "shopping",
"_type": "_doc",
"_id": "1000",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 7,
"_primary_term": 3
}
五、集成
5.1 Spring Data
- application.properties
# es服务地址
elasticsearch.host=127.0.0.1
# es服务端口
elasticsearch.port=9200
# 配置日志级别,开启debug日志
logging.level.com.simwor.bigdata.es=debug
- dependencies
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.6.RELEASE</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies>
- SpringDataElasticSearchMainApplication.java
package com.simwor.bigdata.es;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringDataElasticSearchMainApplication {
public static void main(String[] args) {
SpringApplication.run(SpringDataElasticSearchMainApplication.class,args);
}
}
- ElasticsearchConfig.java
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
@ConfigurationProperties(prefix = "elasticsearch")
@Configuration
@Data
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
private String host ;
private Integer port ;
@Override
public RestHighLevelClient elasticsearchClient() {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));
return new RestHighLevelClient(builder);
}
}
- Product.java
package com.simwor.bigdata.es;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Document(indexName = "product", shards = 3, replicas = 1)
public class Product {
//必须有id,这里的id是全局唯一的标识,等同于es中的"_id"
@Id
private Long id;//商品唯一标识
/**
* type : 字段数据类型
* analyzer : 分词器类型
* index : 是否索引(默认:true)
* Keyword : 短语,不进行分词
*/
@Field(type = FieldType.Text)
private String title;//商品名称
@Field(type = FieldType.Keyword)
private String category;//分类名称
@Field(type = FieldType.Double)
private Double price;//商品价格
@Field(type = FieldType.Keyword, index = false)
private String images;//图片地址
}
- ProductDao.java
package com.simwor.bigdata.es;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface ProductDao extends ElasticsearchRepository<Product,Long> {
}
5.1.1 添加、删除索引
package com.simwor.bigdata.es;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESIndexTest {
//注入ElasticsearchRestTemplate
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
//创建索引并增加映射配置
@Test
public void createIndex() {
IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(Product.class);
boolean isExist = indexOperations.exists();
//创建索引,系统初始化会自动创建索引
System.out.println("自动创建创建索引" + isExist);
}
@Test
public void deleteIndex(){
//创建索引,系统初始化会自动创建索引
IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(Product.class);
boolean deleted = indexOperations.delete();
System.out.println("删除索引 = " + deleted);
}
}
5.1.2 文档操作
package com.simwor.bigdata.es;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESProductDaoTest {
@Autowired
private ProductDao productDao;
/**
* 新增
*/
@Test
public void save(){
Product product = new Product();
product.setId(2L);
product.setTitle("华为手机");
product.setCategory("手机");
product.setPrice(2999.0);
product.setImages("http://www.atguigu/hw.jpg");
productDao.save(product);
}
//修改
@Test
public void update(){
Product product = new Product();
product.setId(1L);
product.setTitle("小米2手机");
product.setCategory("手机");
product.setPrice(9999.0);
product.setImages("http://www.atguigu/xm.jpg");
productDao.save(product);
}
//根据id查询
@Test
public void findById(){
Product product = productDao.findById(1L).get();
System.out.println(product);
}
//查询所有
@Test
public void findAll(){
Iterable<Product> products = productDao.findAll();
for (Product product : products) {
System.out.println(product);
}
}
//删除
@Test
public void delete(){
Product product = new Product();
product.setId(1L);
productDao.delete(product);
}
//批量新增
@Test
public void saveAll(){
List<Product> productList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Product product = new Product();
product.setId(Long.valueOf(i));
product.setTitle("["+i+"]小米手机");
product.setCategory("手机");
product.setPrice(1999.0+i);
product.setImages("http://www.atguigu/xm.jpg");
productList.add(product);
}
productDao.saveAll(productList);
}
//分页查询
@Test
public void findByPageable(){
//设置排序(排序方式,正序还是倒序,排序的id)
Sort sort = Sort.by(Sort.Direction.DESC,"id");
int currentPage=0;//当前页,第一页从0开始,1表示第二页
int pageSize = 5;//每页显示多少条
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
//分页查询
Page<Product> productPage = productDao.findAll(pageRequest);
for (Product Product : productPage.getContent()) {
System.out.println(Product);
}
}
}
5.1.3 文档搜索
package com.simwor.bigdata.es;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESSearchTest {
@Autowired
private ProductDao productDao;
/**
* term查询
* search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
*/
@Test
public void termQuery(){
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
Iterable<Product> products = productDao.search(termQueryBuilder);
for (Product product : products) {
System.out.println(product);
}
}
/**
* term查询加分页
*/
@Test
public void termQueryByPage(){
int currentPage= 0 ;
int pageSize = 5;
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
Iterable<Product> products = productDao.search(termQueryBuilder,pageRequest);
for (Product product : products) {
System.out.println(product);
}
}
}
5.2 Spark Streaming
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch的客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch依赖2.x的log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
</dependencies>
package com.simwor.bigdata
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{
Seconds, StreamingContext}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.{
RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType
import java.util.Date
object Spark2ESTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
ds.foreachRDD(
rdd => {
println("*************** " + new Date())
rdd.foreach(
data => {
val client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 新增文档 - 请求对象
val request = new IndexRequest();
// 设置索引及唯一性标识
val ss = data.split(" ")
println("ss = " + ss.mkString(","))
request.index("sparkstreaming").id(ss(0));
val productJson =
s"""
| { "data":"${ss(1)}" }
|""".stripMargin;
// 添加文档数据,数据格式为JSON格式
request.source(productJson,XContentType.JSON);
// 客户端发送请求,获取响应对象
val response = client.index(request, RequestOptions.DEFAULT);
System.out.println("_index:" + response.getIndex());
System.out.println("_id:" + response.getId());
System.out.println("_result:" + response.getResult());
client.close()
}
)
}
)
ssc.start()
ssc.awaitTermination()
}
}
5.3 Flink
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.12.0</version>
</dependency>
<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies>
package com.simwor.bigdata;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FlinkElasticsearchSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.source(json);
}
}
);
esSinkBuilder.setBulkFlushMaxActions(1);
source.addSink(esSinkBuilder.build());
env.execute("flink-es");
}
}
六、优化
6.1 分片数量
- 控制每个分片占用的硬盘容量不超过ES的最大JVM的堆空间设置(一般设置不超过32G,参考下文的JVM设置原则);
- 分片数不超过节点数的3倍,参考公式:
节点数 <= 主分片数 *(副本数+1)
- 对于节点瞬时中断的问题,参数 delayed_timeout ,可以延长再均衡的时间:
PUT /_all/_settings
{
"settings": {
"index.unassigned.node_left.delayed_timeout": "5m"
}
}
6.2 写入
- 有大量的写任务时,可以使用 Bulk 来进行批量写入,默认设置批量提交的数据量不能超过 100M ;
- 默认情况下索引的 refresh_interval 为 1 秒,如果对搜索的实效性要求不高,可以将 Refresh 周期延长;
- 当 Translog 的数据量达到
index.translog.flush_threshold_size
512MB 或者 30 分钟时,会触发一次 Flush; - 当写索引时,需要把写入的数据都同步到副本节点,副本节点越多,写索引的效率就越慢。如果我们需要大批量进行写入操作,可以先禁止 Replica 复制,设置
index.number_of_replicas
: 0 关闭副本。在写入完成后,Replica 修改回正常的状态。
6.3 内存
- ES 堆内存分配原则
- 不要超过物理内存的 50%:Lucene 的设计目的是把底层 OS 里的数据缓存到内存中。
- 堆内存的大小最好不要超过 32GB:在 Java 中,所有对象都分配在堆上,然后有一个 Klass Pointer 指针指向它的类元数据。
- 假设你有个机器有 128 GB 的内存,你可以创建两个节点,每个节点内存分配不超过 32 GB;也就是说不超过 64 GB 内存给 ES 的堆内存,剩下的超过 64 GB 的内存给 Lucene。
6.4 参数
参数名 | 参数值 | 说明 |
---|---|---|
cluster.name | elasticsearch | 配置 ES 的集群名称,默认值是ES,建议改成与所存数据相关的名称,ES 会自动发现在同一网段下的集群名称相同的节点 |
node.name | node-1 | 集群中的节点名,在同一个集群中不能重复。节点的名称一旦设置,就不能再改变了。当然,也可以设置成服务器的主机名称,例如 node.name:${HOSTNAME}。 |
node.master | true | 指定该节点是否有资格被选举成为 Master 节点,默认是 True,如果被设置为 True,则只是有资格成为 Master 节点,具体能否成为 Master 节点,需要通过选举产生。 |
node.data | true | 指定该节点是否存储索引数据,默认为 True。数据的增、删、改、查都是在 Data 节点完成的。 |
index.number_of_shards | 1 | 设置都索引分片个数,默认是 1 片。也可以在创建索引时设置该值,具体设置为多大都值要根据数据量的大小来定。如果数据量不大,则设置成 1 时效率最高 |
index.number_of_replicas | 1 | 设置默认的索引副本个数,默认为 1 个。副本数越多,集群的可用性越好,但是写索引时需要同步的数据越多。 |
transport.tcp.compress | true | 设置在节点间传输数据时是否压缩,默认为 False,不压缩 |
discovery.zen.minimum_master_nodes | 1 | 设置在选举 Master 节点时需要参与的最少的候选主节点数,默认为 1。如果使用默认值,则当网络不稳定时有可能会出现脑裂合理的数值为(master_eligible_nodes/2)+1,其中 master_eligible_nodes 表示集群中的候选主节点数 |
discovery.zen.ping.timeout | 3s | 设置在集群中自动发现其他节点时 Ping 连接的超时时间,默认为 3 秒。在较差的网络环境下需要设置得大一点,防止因误判该节点的存活状态而导致分片的转移 |
七、面试题
7.1 Elasticsearch的master选举流程
- Elasticsearch的选主是ZenDiscovery模块负责的,主要包含Ping(节点之间通过这个RPC来发现彼此)和Unicast(单播模块包含一个主机列表以控制哪些节点需要ping通)这两部分
- 对所有可以成为master的节点(node.master: true)根据nodeId字典排序,每次选举每个节点都把自己所知道节点排一次序,然后选出第一个(第0位)节点,暂且认为它是master节点。
- 如果对某个节点的投票数达到一定的值(可以成为master节点数n/2+1)并且该节点自己也选举自己,那这个节点就是master。否则重新选举一直到满足上述条件。
- master节点的职责主要包括集群、节点和索引的管理,不负责文档级别的管理;data节点可以关闭http功能。
7.2 脑裂问题
- “脑裂”问题可能的成因:
- 网络问题:集群间的网络延迟导致一些节点访问不到master,认为master挂掉了从而选举出新的master,并对master上的分片和副本标红,分配新的主分片
- 节点负载:主节点的角色既为master又为data,访问量较大时可能会导致ES停止响应造成大面积延迟,此时其他节点得不到主节点的响应认为主节点挂掉了,会重新选取主节点。
- 内存回收:data节点上的ES进程占用的内存较大,引发JVM的大规模内存回收,造成ES进程失去响应。
- 脑裂问题解决方案
- 减少误判:
discovery.zen.ping_timeout
节点状态的响应时间,默认为3s,可以适当调大,如果master在该响应时间的范围内没有做出响应应答,判断该节点已经挂掉了。调大参数(如6s,discovery.zen.ping_timeout:6),可适当减少误判。 - 选举触发:
discovery.zen.minimum_master_nodes:1
。该参数是用于控制选举行为发生的最小集群主节点数量。当备选主节点的个数大于等于该参数的值, 且备选主节点中有该参数个节点认为主节点挂了,进行选举。官方建议为(n/2)+1,n为主节点个数 (即有资格成为主节点的节点个数) - 角色分离:即master节点与data节点分离,限制角色。主节点配置为:node.master: true node.data: false,从节点配置为:node.master: false node.data: true。
7.3 部署优化
- 请确保运行应用程序的 JVM 和服务器的 JVM 是完全一样的。
- 通过设置gateway.recover_after_nodes、gateway.expected_nodes、gateway.recover_after_time可以在集群重启的时候避免过多的分片交换,这可能会让数据恢复从数个小时缩短为几秒钟。
- Lucene 使用了大量的文件。同时,Elasticsearch 在节点和 HTTP 客户端之间进行通信也使用了大量的套接字。 所有这一切都需要足够的文件描述符。你应该增加你的文件描述符,设置一个很大的值,如 64,000。
- 使用批量请求并调整其大小:每次批量数据 5–15 MB 大是个不错的起始点。
- 段和合并:Elasticsearch 默认值是 20 MB/s,对机械磁盘应该是个不错的设置。如果你用的是 SSD,可以考虑提高到 100–200 MB/s。如果你在做批量导入,完全不在意搜索,你可以彻底关掉合并限流。
- 可以增加 index.translog.flush_threshold_size 设置,从默认的 512 MB 到更大一些的值,比如 1 GB,这可以在一次清空触发的时候在事务日志里积累出更大的段。
- 如果你的搜索结果不需要近实时的准确度,考虑把每个索引的index.refresh_interval 改到30s。
- 如果你在做大批量导入,考虑通过设置index.number_of_replicas: 0 关闭副本。
7.4 读写一致
- 可以通过版本号使用乐观并发控制,以确保新版本不会被旧版本覆盖,由应用层来处理具体的冲突;
- 另外对于写操作,一致性级别支持quorum/one/all,默认为quorum,即只有当大多数分片可用时才允许写操作。但即使大多数可用,也可能存在因为网络等原因导致写入副本失败,这样该副本被认为故障,分片将会在一个不同的节点上重建。
- 对于读操作,可以设置replication为sync(默认),这使得操作在主分片和副本分片都完成后才会返回;如果设置replication为async时,也可以通过设置搜索请求参数_preference为primary来查询主分片,确保文档是最新版本。
转载:https://blog.csdn.net/weixin_42480750/article/details/115616724