小言_互联网的博客

Mysql和Elasticsearch的数据同步

301人阅读  评论(0)

Elasticsearch的数据来自Mysql数据库中,所以当我们的MySQL发生改变时,Elasticsearch也要跟着改变,这时候我们的es的数据就要和mysql同步了


同步实现思路

 常见的数据同步方案有三种:

  • 同步调用

  • 异步通知

  • 监听binlog

 方案一:
      

 

  • hotel-demo对外提供接口,用来修改elasticsearch中的数据

  • 酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口,

        也就是说MySQL修改完去修改es的数据 

  • 优点:实现简单,粗暴

  • 缺点:业务耦合度高

 方案二

 

 

  • hotel-admin对mysql数据库数据完成增、删、改后,发送MQ消息

  • hotel-demo监听MQ,接收到消息后完成elasticsearch数据修改

  • 优点:低耦合,实现难度一般

  • 缺点:依赖mq的可靠性

  • 这个实现方式也就是使用mq进行操纵,当我们修改MySQL的服务器修改完以后会将信息发送给MQ,然后修改ES的会进行监听,当监听到了以后就进行修改es的操作

方式三:

 

 

  • 给mysql开启binlog功能

  • mysql完成增、删、改操作都会记录在binlog中

  • hotel-demo基于canal监听binlog变化,实时更新elasticsearch中的内容

  • 也就是监听mysql,如果MySQL的数据有变化那么就直接去改变es的数据

  • 优点:完全解除服务间耦合

  • 缺点:开启binlog增加数据库负担、实现复杂度高

 在这里使用的是第二种实现方案:使用MQ来写


同步案例代码
 

用来操控ES的代码(负责监听MQ队列)


  
  1. /**
  2. * 监听增加和修改的队列
  3. * 因为我们的ES中可以进行全量修改,当有这个id的数据的时候那么就先删除再新增,没有这个数据那么就直接新增
  4. * 所以队列过来的id不管是新增还是修改es都可以判断如果有这个数据id那么就先删除再新增,如果没有这个数据就直接新增,所以新增和修改他俩用一个方法就行了
  5. *
  6. * @param id 队列中需要进行操作的id
  7. */
  8. @RabbitListener(bindings = @QueueBinding(
  9. value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE),
  10. exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
  11. key = MqConstants.HOTEL_INSERT_KEY
  12. ))
  13. public void insertAndUpdate (Long id) {
  14. if (id == null) {
  15. return;
  16. }
  17. log.info( "入参:{}", id);
  18. //监听到以后拿到id去数据库查询整个数据
  19. Hotel hotel = iHotelService.getById(id);
  20. //因为查的mysql数据和es的数据有些不一样所以需要做转换
  21. HotelDoc hotelDoc = new HotelDoc(hotel);
  22. //转换为json
  23. String hotelDocJson = JSON.toJSONString(hotelDoc);
  24. System.out.println( "hotelDocJson = " + hotelDocJson);
  25. //发送到ES中,因为我们的ES中可以进行全量修改,当有这个id的数据的时候那么就先删除再新增,没有这个数据那么就直接新增
  26. //创建请求语义对象 添加文档数据
  27. IndexRequest request = new IndexRequest( "hotel");
  28. //这个新增就是PUT在es中
  29. request.id(hotel.getId().toString()).source(hotelDocJson, XContentType.JSON);
  30. //发送请求
  31. try {
  32. IndexResponse response = client.index(request, RequestOptions.DEFAULT);
  33. RestStatus status = response.status();
  34. log.info( "响应结果为:{}", status);
  35. } catch (IOException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. /**
  40. * 监听删除队列
  41. *
  42. * @param id 队列中需要进行操作的id
  43. */
  44. @RabbitListener(bindings = @QueueBinding(
  45. value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE),
  46. exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
  47. key = MqConstants.HOTEL_DELETE_KEY
  48. ))
  49. public void deleteByMqId (Long id) {
  50. if (id == null) {
  51. return;
  52. }
  53. log.info( "入参:{}", id);
  54. //先创建语义对象,直接就可以给里面写id的字段
  55. DeleteRequest request = new DeleteRequest( "hotel", id.toString());
  56. //发送请求
  57. try {
  58. DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
  59. RestStatus status = response.status();
  60. log.info( "响应结果为:{}", status);
  61. } catch (IOException e) {
  62. e.printStackTrace();
  63. }

 用来操作MySQL的代码:
 


  
  1. @RestController
  2. @RequestMapping("hotel")
  3. public class HotelController {
  4. //注入和RabbitMQ链接
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @Autowired
  8. private IHotelService hotelService;
  9. @GetMapping("/{id}")
  10. public Hotel queryById (@PathVariable("id") Long id) {
  11. return hotelService.getById(id);
  12. }
  13. @GetMapping("/list")
  14. public PageResult hotelList (
  15. @RequestParam(value = "page", defaultValue = "1") Integer page,
  16. @RequestParam(value = "size", defaultValue = "1") Integer size
  17. ) {
  18. Page<Hotel> result = hotelService.page( new Page<>(page, size));
  19. return new PageResult(result.getTotal(), result.getRecords());
  20. }
  21. @PostMapping
  22. public void saveHotel (@RequestBody Hotel hotel) {
  23. hotelService.save(hotel);
  24. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
  25. }
  26. @PutMapping()
  27. public void updateById (@RequestBody Hotel hotel) {
  28. if (hotel.getId() == null) {
  29. throw new InvalidParameterException( "id不能为空");
  30. }
  31. hotelService.updateById(hotel);
  32. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
  33. }
  34. @DeleteMapping("/{id}")
  35. public void deleteById (@PathVariable("id") Long id) {
  36. hotelService.removeById(id);
  37. rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
  38. }
  39. }

当然也是可以不使用注解来写,直接在配置文件中写队列绑定的交换机
RabbitMQ_小白要变大牛的博客-CSDN博客RabbitMqhttps://blog.csdn.net/hnhroot/article/details/125463948?spm=1001.2014.3001.5502


转载:https://blog.csdn.net/hnhroot/article/details/125531758
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场