前几篇文章介绍了一些大数据的相关内容。但在实际生产中,数仓的构建往往是基于数据库的变更(CDC)来做的。Oracle 和 SQL Server 都有基于 CDC 的数据仓库构建方案;而 MySQL 目前已知的方案就是基于 binlog 来构建数仓,也可以结合 binlog 和前文所讲的 Flink 做一些实时计算。
代码
BinlogDto
package com.example.demo.dto;
/**
 * Wrapper for a single binlog change record published to Kafka.
 * {@code event} is the event key (e.g. "db.table.insert") and
 * {@code value} carries the row payload.
 */
public class BinlogDto {

    private String event;
    private Object value;

    /** No-arg constructor, required for JSON deserialization. */
    public BinlogDto() {
    }

    /**
     * Creates a fully populated DTO.
     *
     * @param event event key, typically "database.table.operation"
     * @param value row data associated with the event
     */
    public BinlogDto(String event, Object value) {
        this.event = event;
        this.value = value;
    }

    public String getEvent() {
        return event;
    }

    public void setEvent(String event) {
        this.event = event;
    }

    public Object getValue() {
        return value;
    }

    public void setValue(Object value) {
        this.value = value;
    }
}
Message
package com.example.demo.dto;
import java.util.Date;
/**
 * Simple Kafka message envelope: a numeric id, the message text,
 * and the time the message was sent.
 */
public class Message {

    private Long id;
    private String msg;
    private Date sendTime;

    /** No-arg constructor, required for JSON deserialization. */
    public Message() {
    }

    /**
     * Creates a fully populated message.
     *
     * @param id       message identifier
     * @param msg      message body
     * @param sendTime timestamp at which the message was produced
     */
    public Message(Long id, String msg, Date sendTime) {
        this.id = id;
        this.msg = msg;
        this.sendTime = sendTime;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}
KafkaSender
package com.example.demo.kafka;
import com.alibaba.fastjson.JSON;
import com.example.demo.dto.Message;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@Component
public class KafkaSender {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Creates a Kafka topic with the given partition and replication settings.
     * The AdminClient is closed even if topic creation throws
     * (the original leaked it on exception).
     *
     * @param host      Kafka bootstrap servers, e.g. "host1:9092,host2:9092"
     * @param topic     topic name to create
     * @param partNum   number of partitions
     * @param repeatNum replication factor
     */
    public void createTopic(String host, String topic, int partNum, short repeatNum) {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host);
        // try-with-resources: AdminClient is AutoCloseable; close() waits for
        // pending operations, replacing the deprecated close(long, TimeUnit).
        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic newTopic = new NewTopic(topic, partNum, repeatNum);
            adminClient.createTopics(Arrays.asList(newTopic));
        }
    }

    /**
     * Wraps {@code msg} in a {@link Message} envelope and publishes it to Kafka.
     *
     * Fixes two defects in the original: (1) the Message envelope was built but
     * never serialized — the raw string was serialized instead; (2) the message
     * was sent twice (once JSON-quoted, once raw). Now exactly one record, the
     * serialized envelope, is published per call.
     *
     * @param topic destination topic
     * @param msg   message body to wrap and send
     */
    public void send(String topic, String msg) {
        Message message = new Message();
        // currentTimeMillis doubles as a (coarse) message id, as in the original
        message.setId(System.currentTimeMillis());
        message.setMsg(msg);
        message.setSendTime(new Date());
        String payload = JSON.toJSONString(message);
        System.out.println("发送消息的topic" + topic + "发送的消息内容" + payload);
        kafkaTemplate.send(topic, payload);
    }
}
BinlogClientRunner
package com.example.demo.runner;
import com.alibaba.fastjson.JSON;
import com.example.demo.dto.BinlogDto;
import com.example.demo.kafka.KafkaSender;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Connects to a MySQL server as a replication client, listens to binlog row
 * events (insert/update/delete) on a configured set of tables, and forwards
 * each changed row to Kafka as a JSON-serialized {@link BinlogDto}.
 */
@Component
public class BinlogClientRunner implements CommandLineRunner {
    // MySQL host to read the binlog from
    @Value("${binlog.host}")
    private String host;
    @Value("${binlog.port}")
    private int port;
    @Value("${binlog.user}")
    private String user;
    @Value("${binlog.password}")
    private String password;
    // binlog server_id — must be unique among the master's replication clients
    @Value("${server.id}")
    private long serverId;
    // Kafka topic that receives all change events
    @Value("${kafka.topic}")
    private String topic;
    // Kafka partition count for the topic
    @Value("${kafka.partNum}")
    private int partNum;
    // Kafka replication factor for the topic
    @Value("${kafka.repeatNum}")
    private short repeatNum;
    // Kafka bootstrap servers
    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaHost;
    // Comma-separated list of watched tables, each as "database.table"
    @Value("${binlog.database.table}")
    private String database_table;
    @Autowired
    KafkaSender kafkaSender;

    /**
     * Creates the destination topic, then connects the binlog client and
     * streams events until the connection drops.
     *
     * NOTE(review): client.connect() blocks the calling thread; @Async only
     * moves this off the startup thread if @EnableAsync is configured
     * elsewhere — confirm, otherwise application startup never completes.
     */
    @Async
    @Override
    public void run(String... args) throws Exception {
        // Ensure the destination topic exists before producing to it
        kafkaSender.createTopic(kafkaHost, topic, partNum, repeatNum);
        // Tables we forward events for, as "database.table" strings
        List<String> watchedTables = Arrays.asList(database_table.split(","));
        // tableId -> "database.table", populated from TABLE_MAP events; row
        // events only carry the numeric tableId, so this map resolves names
        Map<Long, String> tableMap = new HashMap<>();
        BinaryLogClient client = new BinaryLogClient(host, port, user, password);
        client.setServerId(serverId);
        client.registerEventListener(event -> handleEvent(event, tableMap, watchedTables));
        client.connect();
    }

    /**
     * Dispatches one binlog event: records table-id mappings and forwards
     * insert/update/delete rows for watched tables.
     */
    private void handleEvent(Event event, Map<Long, String> tableMap, List<String> watchedTables) {
        EventData data = event.getData();
        if (data == null) {
            return;
        }
        if (data instanceof TableMapEventData) {
            // TABLE_MAP precedes each row event; remember the id -> name mapping
            TableMapEventData tableMapEventData = (TableMapEventData) data;
            tableMap.put(tableMapEventData.getTableId(),
                    tableMapEventData.getDatabase() + "." + tableMapEventData.getTable());
        }
        if (data instanceof UpdateRowsEventData) {
            UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
            String tableName = tableMap.get(updateRowsEventData.getTableId());
            if (tableName != null && watchedTables.contains(tableName)) {
                for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
                    // entry key = before-image, value = after-image; forward the after-image
                    sendRow(tableName + ".update", row.getValue());
                }
            }
        } else if (data instanceof WriteRowsEventData) {
            WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;
            String tableName = tableMap.get(writeRowsEventData.getTableId());
            if (tableName != null && watchedTables.contains(tableName)) {
                for (Serializable[] row : writeRowsEventData.getRows()) {
                    sendRow(tableName + ".insert", row);
                }
            }
        } else if (data instanceof DeleteRowsEventData) {
            DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data;
            String tableName = tableMap.get(deleteRowsEventData.getTableId());
            if (tableName != null && watchedTables.contains(tableName)) {
                for (Serializable[] row : deleteRowsEventData.getRows()) {
                    sendRow(tableName + ".delete", row);
                }
            }
        }
    }

    /** Serializes one changed row as a BinlogDto and publishes it to Kafka. */
    private void sendRow(String eventKey, Object value) {
        kafkaSender.send(topic, JSON.toJSONString(new BinlogDto(eventKey, value)));
    }
}
本文转载自他人博客https://www.jianshu.com/p/5acb30ec8347
转载:https://blog.csdn.net/u011342403/article/details/101477655
查看评论