Elasticsearch的数据来自Mysql数据库中,所以当我们的MySQL发生改变时,Elasticsearch也要跟着改变,这时候我们的es的数据就要和mysql同步了
同步实现思路
常见的数据同步方案有三种:
同步调用
异步通知
监听binlog
方案一:
hotel-demo对外提供接口,用来修改elasticsearch中的数据
酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口,
也就是说MySQL修改完去修改es的数据
优点:实现简单,粗暴
缺点:业务耦合度高
方案二
hotel-admin对mysql数据库数据完成增、删、改后,发送MQ消息
hotel-demo监听MQ,接收到消息后完成elasticsearch数据修改
优点:低耦合,实现难度一般
缺点:依赖mq的可靠性
这个实现方式也就是使用mq进行操纵,当我们修改MySQL的服务器修改完以后会将信息发送给MQ,然后修改ES的会进行监听,当监听到了以后就进行修改es的操作
方式三:
给mysql开启binlog功能
mysql完成增、删、改操作都会记录在binlog中
hotel-demo基于canal监听binlog变化,实时更新elasticsearch中的内容
也就是监听mysql,如果MySQL的数据有变化那么就直接去改变es的数据
优点:完全解除服务间耦合
缺点:开启binlog增加数据库负担、实现复杂度高
在这里使用的是第二种实现方案:使用MQ来写
同步案例代码
用来操控ES的代码(负责监听MQ队列)
-
/**
-
* 监听增加和修改的队列
-
* 因为我们的ES中可以进行全量修改,当有这个id的数据的时候那么就先删除再新增,没有这个数据那么就直接新增
-
* 所以队列过来的id不管是新增还是修改es都可以判断如果有这个数据id那么就先删除再新增,如果没有这个数据就直接新增,所以新增和修改他俩用一个方法就行了
-
*
-
* @param id 队列中需要进行操作的id
-
*/
-
@RabbitListener(bindings = @QueueBinding(
-
value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE),
-
exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
-
key = MqConstants.HOTEL_INSERT_KEY
-
))
-
public
void
insertAndUpdate
(Long id) {
-
if (id ==
null) {
-
return;
-
}
-
log.info(
"入参:{}", id);
-
//监听到以后拿到id去数据库查询整个数据
-
Hotel
hotel
= iHotelService.getById(id);
-
//因为查的mysql数据和es的数据有些不一样所以需要做转换
-
HotelDoc
hotelDoc
=
new
HotelDoc(hotel);
-
//转换为json
-
String
hotelDocJson
= JSON.toJSONString(hotelDoc);
-
System.out.println(
"hotelDocJson = " + hotelDocJson);
-
//发送到ES中,因为我们的ES中可以进行全量修改,当有这个id的数据的时候那么就先删除再新增,没有这个数据那么就直接新增
-
//创建请求语义对象 添加文档数据
-
IndexRequest
request
=
new
IndexRequest(
"hotel");
-
//这个新增就是PUT在es中
-
request.id(hotel.getId().toString()).source(hotelDocJson, XContentType.JSON);
-
//发送请求
-
try {
-
IndexResponse
response
= client.index(request, RequestOptions.DEFAULT);
-
RestStatus
status
= response.status();
-
log.info(
"响应结果为:{}", status);
-
}
catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* 监听删除队列
-
*
-
* @param id 队列中需要进行操作的id
-
*/
-
@RabbitListener(bindings = @QueueBinding(
-
value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE),
-
exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
-
key = MqConstants.HOTEL_DELETE_KEY
-
))
-
public
void
deleteByMqId
(Long id) {
-
if (id ==
null) {
-
return;
-
}
-
log.info(
"入参:{}", id);
-
//先创建语义对象,直接就可以给里面写id的字段
-
DeleteRequest
request
=
new
DeleteRequest(
"hotel", id.toString());
-
//发送请求
-
try {
-
DeleteResponse
response
= client.delete(request, RequestOptions.DEFAULT);
-
RestStatus
status
= response.status();
-
log.info(
"响应结果为:{}", status);
-
}
catch (IOException e) {
-
e.printStackTrace();
-
}
用来操作MySQL的代码:
-
@RestController
-
@RequestMapping("hotel")
-
public
class
HotelController {
-
-
//注入和RabbitMQ链接
-
@Autowired
-
private RabbitTemplate rabbitTemplate;
-
-
@Autowired
-
private IHotelService hotelService;
-
-
@GetMapping("/{id}")
-
public Hotel
queryById
(@PathVariable("id") Long id) {
-
return hotelService.getById(id);
-
}
-
-
@GetMapping("/list")
-
public PageResult
hotelList
(
-
@RequestParam(value = "page", defaultValue = "1") Integer page,
-
@RequestParam(value = "size", defaultValue = "1") Integer size
-
) {
-
Page<Hotel> result = hotelService.page(
new
Page<>(page, size));
-
-
return
new
PageResult(result.getTotal(), result.getRecords());
-
}
-
-
@PostMapping
-
public
void
saveHotel
(@RequestBody Hotel hotel) {
-
hotelService.save(hotel);
-
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
-
}
-
-
@PutMapping()
-
public
void
updateById
(@RequestBody Hotel hotel) {
-
if (hotel.getId() ==
null) {
-
throw
new
InvalidParameterException(
"id不能为空");
-
}
-
hotelService.updateById(hotel);
-
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
-
}
-
-
@DeleteMapping("/{id}")
-
public
void
deleteById
(@PathVariable("id") Long id) {
-
hotelService.removeById(id);
-
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
-
}
-
}
当然也是可以不使用注解来写,直接在配置文件中写队列绑定的交换机
RabbitMQ_小白要变大牛的博客-CSDN博客RabbitMqhttps://blog.csdn.net/hnhroot/article/details/125463948?spm=1001.2014.3001.5502
转载:https://blog.csdn.net/hnhroot/article/details/125531758