您现在的位置是:网站首页> 编程资料编程资料

SQL Server 批量插入数据的完美解决方案_MsSql_

2023-05-26 394人已围观

简介 SQL Server 批量插入数据的完美解决方案_MsSql_

一、Sql Server插入方案介绍

关于 SqlServer 批量插入的方式,有三种比较常用的插入方式,InsertBatchInsertSqlBulkCopy,下面我们对比以下三种方案的速度

1.普通的Insert插入方法

 public static void Insert(IEnumerable persons) { using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { con.Open(); foreach (var person in persons) { using (var com = new SqlCommand( "INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES(@Id,@Name,@Age,@CreateTime,@Sex)", con)) { com.Parameters.AddRange(new[] { new SqlParameter("@Id", SqlDbType.BigInt) {Value = person.Id}, new SqlParameter("@Name", SqlDbType.VarChar, 64) {Value = person.Name}, new SqlParameter("@Age", SqlDbType.Int) {Value = person.Age}, new SqlParameter("@CreateTime", SqlDbType.DateTime) {Value = person.CreateTime ?? (object) DBNull.Value}, new SqlParameter("@Sex", SqlDbType.Int) {Value = (int)person.Sex}, }); com.ExecuteNonQuery(); } } } }

2.拼接BatchInsert插入语句

 public static void BatchInsert(Person[] persons) { using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { con.Open(); var pageCount = (persons.Length - 1) / 1000 + 1; for (int i = 0; i < pageCount; i++) { var personList = persons.Skip(i * 1000).Take(1000).ToArray(); var values = personList.Select(p => $"({p.Id},'{p.Name}',{p.Age},{(p.CreateTime.HasValue ? $"'{p.CreateTime:yyyy-MM-dd HH:mm:ss}'" : "NULL")},{(int) p.Sex})"); var insertSql = $"INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES{string.Join(",", values)}"; using (var com = new SqlCommand(insertSql, con)) { com.ExecuteNonQuery(); } } } }

3.SqlBulkCopy插入方案

 public static void BulkCopy(IEnumerable persons) { using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { con.Open(); var table = new DataTable(); table.Columns.AddRange(new [] { new DataColumn("Id", typeof(long)), new DataColumn("Name", typeof(string)), new DataColumn("Age", typeof(int)), new DataColumn("CreateTime", typeof(DateTime)), new DataColumn("Sex", typeof(int)), }); foreach (var p in persons) { table.Rows.Add(new object[] {p.Id, p.Name, p.Age, p.CreateTime, (int) p.Sex}); } using (var copy = new SqlBulkCopy(con)) { copy.DestinationTableName = "Person"; copy.WriteToServer(table); } } }

3.三种方案速度对比

方案数量时间
Insert1千条145.4351ms
BatchInsert1千条103.9061ms
SqlBulkCopy1千条7.021ms
Insert1万条1501.326ms
BatchInsert1万条850.6274ms
SqlBulkCopy1万条30.5129ms
Insert10万条13875.4934ms
BatchInsert10万条8278.9056ms
SqlBulkCopy10万条314.8402ms

两者插入效率对比,Insert明显比SqlBulkCopy要慢太多,大概20~40倍性能差距,下面我们将SqlBulkCopy封装一下,让批量插入更加方便

二、SqlBulkCopy封装代码

1.方法介绍

批量插入扩展方法签名

方法方法参数介绍
BulkCopy同步的批量插入方法
SqlConnection connectionsql server 连接对象
IEnumerable source需要批量插入的数据源
string tableName = null插入表名称【为NULL默认为实体名称】
int bulkCopyTimeout = 30批量插入超时时间
int batchSize = 0写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default批量复制参数
SqlTransaction externalTransaction = null执行的事务对象
BulkCopyAsync异步的批量插入方法
SqlConnection connectionsql server 连接对象
IEnumerable source需要批量插入的数据源
string tableName = null插入表名称【为NULL默认为实体名称】
int bulkCopyTimeout = 30批量插入超时时间
int batchSize = 0写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default批量复制参数
SqlTransaction externalTransaction = null执行的事务对象

这个方法主要解决了两个问题:

  • 免去了手动构建DataTable或者IDataReader接口实现类,手动构建的转换比较难以维护,如果修改字段就得把这些地方都进行修改,特别是还需要将枚举类型特殊处理,转换成他的基础类型(默认int
  • 不用亲自创建SqlBulkCopy对象,和配置数据库列的映射,和一些属性的配置

此方案也是在我公司中使用,以满足公司的批量插入数据的需求,例如第三方的对账数据此方法使用的是Expression动态生成数据转换函数,其效率和手写的原生代码差不多,和原生手写代码相比,多余的转换损失很小【最大的性能损失都是在值类型拆装箱上】

此方案和其他网上的方案有些不同的是:不是将List先转换成DataTable,然后写入SqlBulkCopy的,而是使用一个实现IDataReader的读取器包装List,每往SqlBulkCopy插入一行数据才会转换一行数据

IDataReader方案和DataTable方案相比优点

效率高:DataTable方案需要先完全转换后,才能交由SqlBulkCopy写入数据库,而IDataReader方案可以边转换边交给SqlBulkCopy写入数据库(例如:10万数据插入速度可提升30%)

占用内存少:DataTable方案需要先完全转换后,才能交由SqlBulkCopy写入数据库,需要占用大量内存,而IDataReader方案可以边转换边交给SqlBulkCopy写入数据库,无须占用过多内存

强大:因为是边写入边转换,而且EnumerableReader传入的是一个迭代器,可以实现持续插入数据的效果

2.实现原理

① 实体Model与表映射

数据库表代码

 CREATE TABLE [dbo].[Person]( [Id] [BIGINT] NOT NULL, [Name] [VARCHAR](64) NOT NULL, [Age] [INT] NOT NULL, [CreateTime] [DATETIME] NULL, [Sex] [INT] NOT NULL, PRIMARY KEY CLUSTERED ( [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY]

实体类代码

 public class Person { public long Id { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime? CreateTime { get; set; } public Gender Sex { get; set; } } public enum Gender { Man = 0, Woman = 1 }

  • 创建字段映射【如果没有此字段映射会导致数据填错位置,如果类型不对还会导致报错】【因为:没有此字段映射默认是按照列序号对应插入的】
  • 创建映射使用的SqlBulkCopy类型的ColumnMappings属性来完成,数据列与数据库中列的映射
 //创建批量插入对象 using (var copy = new SqlBulkCopy(connection, options, externalTransaction)) { foreach (var column in ModelToDataTable.Columns) { //创建字段映射 copy.ColumnMappings.Add(column.ColumnName, column.ColumnName); } }

② 实体转换成数据行

将数据转换成数据行采用的是:反射+Expression来完成

其中反射是用于获取编写Expression所需程序类,属性等信息

其中Expression是用于生成高效转换函数其中ModelToDataTable类型利用了静态泛型类特性,实现泛型参数的缓存效果

ModelToDataTable的静态构造函数中,生成转换函数,获取需要转换的属性信息,并存入静态只读字段中,完成缓存

③ 使用IDataReader插入数据的重载

EnumerableReader是实现了IDataReader接口的读取类,用于将模型对象,在迭代器中读取出来,并转换成数据行,可供SqlBulkCopy读取

SqlBulkCopy只会调用三个方法:GetOrdinalReadGetValue

  • 其中GetOrdinal只会在首行读取每个列所代表序号【需要填写:SqlBulkCopy类型的ColumnMappings属性】
  • 其中Read方法是迭代到下一行,并调用ModelToDataTable.ToRowData.Invoke()来将模型对象转换成数据行object[]
  • 其中GetValue方法是获取当前行指定下标位置的值

3.完整代码

扩展方法类

 public static class SqlConnectionExtension { ///  /// 批量复制 ///  /// 插入的模型对象 /// 需要批量插入的数据源 /// 数据库连接对象 /// 插入表名称【为NULL默认为实体名称】 /// 插入超时时间 /// 写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】 /// 批量复制参数 /// 执行的事务对象 /// 插入数量 public static int BulkCopy(this SqlConnection connection, IEnumerable source, string tableName = null, int bulkCopyTimeout = 30, int batchSize = 0, SqlBulkCopyOptions options = SqlBulkCopyOptions.Default, SqlTransaction external
                
                

-六神源码网