小言_互联网的博客

千万级数据量的插入操作(MYSQL)

318人阅读  评论(0)

前几天因为公司业务迁移需要,需要从数仓同步一张大表,数据总量大概三千多万,接近四千万的样子,当遇到这种数据量的时候,综合考虑之后,当前比较流行的框架都不能满足于生产需求,使用框架对性能的损耗过于严重,所以有了以下千万级数据量的插入方案。

当数据量达到一定规模的时候,假设一个语句为这样,还比较小的,只有三个字段。

INSERT INTO user_operation_min_temp(observer_id, access_id, access_name) VALUES (?, ?, ?)

如果使用单线程,一次INSET一条的话,那么要插入千万次,并且提交千万次,众所周知,数据库有一定的瓶颈,大量的插入提交操作会严重损耗系统性能。

所以需要使用批量插入,Mybatis的批量插入虽然是批量插入,但那只是业务层面的批量插入,真正执行的时候,还是会帮你分解成单条插入,所以必须要放弃orm框架。而使用原生的插入

批量插入的代码如下

/**
     * 数据存入新的临时表
     * @param list
     */
    private void batch(List<UserOperation> list) {
        String sql = "INSERT INTO user_operation_min_temp(observer_id, access_id, access_name) VALUES (?, ?, ?) ";
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            preparedStatement = connection.prepareStatement(sql);
            for (UserOperation obj : list) {
                preparedStatement.setInt(1, obj.getObserverId());
                preparedStatement.setInt(2, obj.getAccessId());
                preparedStatement.setString(3, obj.getAccessName());
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
            connection.commit();
        } catch (Exception e) {

        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                    preparedStatement = null;
                } catch (Exception e) {

                }
            }
            if (connection != null) {
                try {
                    connection.close();
                    connection = null;
                } catch (Exception e) {

                }
            }
        }
    }

那么问题来了,我是否可以把三千多万条list直接传入到这个batch方法呢,答案是不是不可以,但是却是不太好,第一就是你的三千多万条list这个对象或者数组一直要等待插入完成,系统中没有地方使用才可以进行释放内存,对业务或者服务器的负荷比较大,并且容易造成OOM,也就是内存溢出问题。

所以可以利用多线程对list进行分割,比如说3000万的话,可以按照每100万一个批次,也就是30个批次,使用多线程提交这三十个批次。具体的可以按照自己的情况来分割。

list分割可以使用集合的工具类

List<List<UserOperation>> batch = Lists.partition(allList, 30);

需要引入guava依赖

<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
	<version>24.1-jre</version>
</dependency>

然后

batch.parallelStream().forEach(list -> {
      //执行插入
      batch(list);
});

然后,你以为这样就可以了吗,现实会告诉你,我不要面子的吗、这样缺失不行,第一,你的三千万条数据的对象还是三千万,还多分割了30个批次。内存不要钱吗。所以在读取数据的时候就要烤炉分割,而不会一下子把三千万的数据一次性拿到内存来,然后一批处理完成之后及时释放掉。

改造之后的方案。

1、读取数据,因为我这里读取数据是使用的接口进行调用。可以根据自己的业务进行改造

public void sync() throws SQLException {
        long startTime = System.currentTimeMillis();
        //获取所有用户
        List<User> users = syncService.all();
        if(users == null || users.size() == 0) {
            return;
        }
        //复制表结构
        syncService.like();
        //按照200个人一个批次进行分
        List<List<User>> batch = Lists.partition(users, 200);
        //并发处理 数据存入新表
        batch.parallelStream().forEach(list -> {
            //使用线程安全的集合
            List<UserOperation> userOperations = Collections.synchronizedList(new ArrayList<>());
            for(User user : list) {
                Integer obId = user.getObId();
                if(obId == null || "".equalsIgnoreCase(obId.toString())) {
                    continue;
                }
                //调用接口获取每个用户的权限
                List<UserOperation> temp = syncService.syncUserPermission(obId.toString());
                //当到线程的list中
                userOperations.addAll(temp);
                //logger.info("加入员工 obId:【{}】,姓名:【{}】", obId, user.getName());
            }
            try {
                //执行保存逻辑
                syncService.save(userOperations);
                logger.info("线程批次存入数据库: 当前线程:{}", Thread.currentThread());
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                //执行释放
                userOperations = null;
            }
        });
        //数据写完之后,进行临时表的切换
        //一个事物。
        //1、删除旧表
        //2、临时表重命名为旧表的名字
        syncService.transactional();
        long endTime = System.currentTimeMillis();
        logger.info("耗时:{} ms", endTime - startTime);
    }

说明一下,这里不使用清空表再进行插入是因为避免对业务系统的使用造成影响,保证这些操作是在一个事物中发生。

调用全椒县接口的代码

public List<UserOperation> syncUserPermission(String obId) {
        Map<String, Object> params = ImmutableMap.of("data", ImmutableMap.of("observerId", obId));
        ResponseEntity<String> content = null;
        try {
            content = new RetryTemplate() {
                @Override
                protected ResponseEntity<String> doService() {
                    return RestTemplateUtils.post("", params, String.class);
                }
            }.setRetryTime(3).setSleepTime(1000).execute();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (content == null) {
            return new ArrayList<>();
        }
        String body = content.getBody();
        JSONArray jsonArray = JSONArray.parseArray(body);
        if (jsonArray == null || jsonArray.size() == 0) {
            return new ArrayList<>();
        }
        List<UserOperation> operations = jsonArray.stream().map(p -> {
            JSONObject json = (JSONObject) p;
            UserOperation userOperation = new UserOperation();
            userOperation.setObserverId(Integer.parseInt(obId));
            userOperation.setAccessId(json.getInteger("iD"));
            userOperation.setAccessName(json.getString("name"));
            userOperation.setActive(1);
            return userOperation;
        }).collect(Collectors.toList());
        return operations;
    }

表结构的方法

@Transactional
    public void transactional() throws SQLException {
        drop();
        reName();
    }

    /**
     * 复制表结构
     */
    public void like() throws SQLException {
        String sql = "CREATE TABLE user_operation_min_temp LIKE user_operation_min;";
        new QueryRunner(dataSource).update(sql);
    }

    /**
     * 删掉旧表
     * @throws SQLException
     */
    public void drop() throws SQLException {
        String sql = "drop table user_operation_min;";
        new QueryRunner(dataSource).update(sql);
    }

    /**
     * 新表重命名
     */
    public void reName() throws SQLException {
        String sql = "ALTER TABLE user_operation_min_temp RENAME TO  user_operation_min;";
        new QueryRunner(dataSource).update(sql);
    }

然后重点来了,之前提到过得批量INSET方法真的可以用吗?还需要进行一下处理,上面我的INSET语句是三个字段,也就是三个?,三个占位符,PreparedStatement ,一次提交的占位符不能超过65535个,65536 / 3 = 21845 所以我一个批次不能超过21848,多于的还是要分割。这个因为上层已经分割过了,所以就按照最粗的粒度分割。

public void save(List<UserOperation> list) throws SQLException {
        if (list == null || list.size() == 0) {
            return;
        }
        pre(list);
    }

    /**
     * 预处理占位符问题
     * @param list
     * @throws SQLException
     */
    private void pre(List<UserOperation> list) throws SQLException {
        // 21845 * 3 = 65535  占位符不可以超过这个数
        if (list.size() > 21845) {
            List<List<UserOperation>> parts = Lists.partition(list, 21845);
            parts.forEach(this::batch);
            return;
        }
        batch(list);
    }

    /**
     * 数据存入新的临时表
     * @param list
     */
    private void batch(List<UserOperation> list) {
        String sql = "INSERT INTO user_operation_min_temp(observer_id, access_id, access_name) VALUES (?, ?, ?) ";
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            preparedStatement = connection.prepareStatement(sql);
            for (UserOperation obj : list) {
                preparedStatement.setInt(1, obj.getObserverId());
                preparedStatement.setInt(2, obj.getAccessId());
                preparedStatement.setString(3, obj.getAccessName());
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
            connection.commit();
        } catch (Exception e) {

        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                    preparedStatement = null;
                } catch (Exception e) {

                }
            }
            if (connection != null) {
                try {
                    connection.close();
                    connection = null;
                } catch (Exception e) {

                }
            }
        }
    }

代码中很多我业务中的类,这里只分享技巧和经验,在使用中还是要根据自己的数据来进行处理。

最后再分享一个微信公众号,关注公众号,是您来过的仪式感。

号主为一线大厂架构师,博客访问量突破一千万。主要分享Java、golang架构,源码,分布式,高并发等技术,用大厂程序员的视角来探讨技术进阶、面试指南、职业规划等。15W技术人的选择!

 


转载:https://blog.csdn.net/qq_29777207/article/details/101602372
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场