利用MySqlBulkLoader实现批量插入数据的示例详解

 

介绍

最近在项目中遇到插入数据瓶颈,几万、几十万、几百万的数据保存到MYSQL数据库,使用EF插入数据速度非常慢,数据量非常大时EF插入需要几十分钟,甚至几个小时,这样子的速度肯定不是我们所期望的。

后面经过了解与研究发现MySqlBulkLoader,可以批量将数据插入到数据库并且速度上面远远优于EF。

MySqlBulkLoader主要的实现方式:将需要插入的数据转成DataTable,DataTable转成一个CSV文件,将CSV文件使用批量导入的形式导入到数据库里面去。

注意:

1).数据库连接地址需要添加配置AllowLoadLocalInfile=true,允许本地文件导入;

Data Source = 数据库地址; Port = 端口; Initial Catalog = 数据库名; User Id = 用户名; Password = 密码;AllowLoadLocalInfile=true;

2).插入的时候会返回插入行数,但是检查所有的数据都正确,也没有报异常,却返回了插入数量为0,可以检查表是否有唯一索引,插入的数据是否违反了唯一索引

(以下分块展示了代码,如果需要看完整的代码直接看5.完整的代码

 

1.将List转化为DataTable

/// <summary>
      /// 将List转化为DataTable
      /// </summary>
      /// <returns></returns>
      public DataTable ListToDataTable<T>(List<T> data)
      {
          #region 创建一个DataTable,以实体名称作为DataTable名称

          var tableName = typeof(T).Name;
          tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
          DataTable dt = new DataTable
          {
              TableName = tableName
          };

          #endregion

          #region 拿取列名,以实体的属性名作为列名       

          var properties = typeof(T).GetProperties();
          foreach (var item in properties)
          {
              var curFileName = item.Name;
              curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
              dt.Columns.Add(curFileName);
          }

          #endregion

          #region 列赋值
          foreach (var item in data)
          {
              DataRow dr = dt.NewRow();
              var columns = dt.Columns;

              var curPropertyList = item.GetType().GetProperties();
              foreach (var p in curPropertyList)
              {
                  var name = p.Name;
                  name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
                  var curValue = p.GetValue(item);

                  int i = columns.IndexOf(name);
                  dr[i] = curValue;
              }

              dt.Rows.Add(dr);
          }

          #endregion  

          return dt;
      }

 

2.将DataTable转换为标准的CSV文件

/// <summary>
  /// csv扩展
  /// </summary>
  public static class CSVEx
  {
      /// <summary>
      ///将DataTable转换为标准的CSV文件
      /// </summary>
      /// <param name="table">数据表</param>
      /// <param name="tmpPath">文件地址</param>
      /// <returns>返回标准的CSV</returns>
      public static void ToCsv(this DataTable table, string tmpPath)
      {
          //以半角逗号(即,)作分隔符,列为空也要表达其存在。
          //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。
          //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。
          StringBuilder sb = new StringBuilder();
          DataColumn colum;
          foreach (DataRow row in table.Rows)
          {
              for (int i = 0; i < table.Columns.Count; i++)
              {
                  Type _datatype = typeof(DateTime);
                  colum = table.Columns[i];
                  if (i != 0) sb.Append("\t");
                  //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))
                  //{
                  //    sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\"");
                  //}
                  if (colum.DataType == _datatype)
                  {
                      sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss"));
                  }
                  else sb.Append(row[colum].ToString());
              }
              sb.Append("\r\n");
          }
          StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8);
          sw.Write(sb.ToString());
          sw.Close();
      }

  }

 

3.CSV文件导入数据到数据库

/// <summary>
  /// 批量导入mysql帮助类
  /// </summary>
  public static class MySqlHelper
  {
      /// <summary>
      /// MySqlBulkLoader批量导入
      /// </summary>
      /// <param name="_mySqlConnection">数据库连接地址</param>
      /// <param name="table"></param>
      /// <param name="csvName"></param>
      /// <returns></returns>
      public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName)
      {
          var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList();
          MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection)
          {
              FieldTerminator = "\t",
              FieldQuotationCharacter = '"',
              EscapeCharacter = '"',
              LineTerminator = "\r\n",
              FileName = csvName,
              NumberOfLinesToSkip = 0,
              TableName = table.TableName,

          };

          bulk.Columns.AddRange(columns);
          return bulk.Load();
      }
  }

 

4.使用MySqlBulkLoader批量插入数据

/// <summary>
      /// 使用MySqlBulkLoader批量插入数据
      /// </summary>
      /// <typeparam name="T"></typeparam>
      /// <param name="data"></param>
      /// <returns></returns>
      /// <exception cref="Exception"></exception>
      public int BulkLoaderData<T>(List<T> data)
      {
          if (data.Count <= 0) return 0;

          var connectString = "数据库连接地址";
          using (MySqlConnection connection = new MySqlConnection(connectString))
          {
              MySqlTransaction sqlTransaction = null;
              try
              {
                  if (connection.State == ConnectionState.Closed)
                  {
                      connection.Open();
                  }
                  sqlTransaction = connection.BeginTransaction();


                  var dt = ListToDataTable<T>(data); //将List转成dataTable
                  string tmpPath = Path.GetTempFileName();
                  dt.ToCsv(tmpPath); //将DataTable转成CSV文件
                  var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入数据
                  sqlTransaction.Commit();

                  try
                  {
                      if (File.Exists(tmpPath)) File.Delete(tmpPath);
                  }
                  catch (Exception)
                  {
                      //删除文件失败

                  }
                  return insertCount; //返回执行成功的条数
              }
              catch (Exception e)
              {
                  if (sqlTransaction != null)
                  {
                      sqlTransaction.Rollback();
                  }
                  //执行异常 
                  throw e;
              }
          }

      }

 

5.完整的代码

namespace WebApplication1.BrantchInsert
{

  /// <summary>
  /// 批量插入
  /// </summary>
  public class BulkLoader
  {


      /// <summary>
      /// 测试批量插入入口
      /// </summary>
      /// <returns></returns>
      public int BrantchDataTest()
      {

          #region 模拟数据
          var data = new List<CrmCouponTestDto>() {
               new CrmCouponTestDto {
                   Id=1,
                   CouponCode="test001",
                   CouponId = 1,
                   MemberId=100,
                   IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),
                   UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"),
                   UsageShopId=0,
                   UsageBillNo="",
                   EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),
                   EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),
                   Status=0
               },
               new CrmCouponTestDto {
                   Id=2,
                   CouponCode="test002",
                   CouponId = 1,
                     MemberId=101,
                   IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),
                   UsageTime=Convert.ToDateTime("2022-06-27 14:30:00"),
                   UsageShopId=2,
                   UsageBillNo="CS202206271430001",
                   EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),
                   EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),
                   Status=1
               },
                new CrmCouponTestDto {
                   Id=3,
                   CouponCode="test003",
                   CouponId = 1,
                   MemberId=102,
                   IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),
                   UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"),
                   UsageShopId=0,
                   UsageBillNo="",
                   EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),
                   EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),
                   Status=0
               },
                  new CrmCouponTestDto {
                   Id=4,
                   CouponCode="test004",
                   CouponId = 1,
                   MemberId=103,
                   IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),
                   UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"),
                   UsageShopId=0,
                   UsageBillNo="",
                   EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),
                   EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),
                   Status=0
               }
           };
          #endregion
          var result = BulkLoaderData<CrmCouponTestDto>(data);
          return result;

      }


      /// <summary>
      /// 使用MySqlBulkLoader批量插入数据
      /// </summary>
      /// <typeparam name="T"></typeparam>
      /// <param name="data"></param>
      /// <returns></returns>
      /// <exception cref="Exception"></exception>
      public int BulkLoaderData<T>(List<T> data)
      {
          if (data.Count <= 0) return 0;

          var connectString = "数据库连接地址";
          using (MySqlConnection connection = new MySqlConnection(connectString))
          {
              MySqlTransaction sqlTransaction = null;
              try
              {
                  if (connection.State == ConnectionState.Closed)
                  {
                      connection.Open();
                  }
                  sqlTransaction = connection.BeginTransaction();


                  var dt = ListToDataTable<T>(data); //将List转成dataTable
                  string tmpPath = Path.GetTempFileName();
                  dt.ToCsv(tmpPath); //将DataTable转成CSV文件
                  var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入数据
                  sqlTransaction.Commit();

                  try
                  {
                      if (File.Exists(tmpPath)) File.Delete(tmpPath);
                  }
                  catch (Exception)
                  {
                      //删除文件失败

                  }
                  return insertCount; //返回执行成功的条数
              }
              catch (Exception e)
              {
                  if (sqlTransaction != null)
                  {
                      sqlTransaction.Rollback();
                  }
                  //执行异常 
                  throw e;
              }
          }

      }


      /// <summary>
      /// 将List转化为DataTable核心方法
      /// </summary>
      /// <returns></returns>
      public DataTable ListToDataTable<T>(List<T> data)
      {
          #region 创建一个DataTable,以实体名称作为DataTable名称

          var tableName = typeof(T).Name;
          tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
          DataTable dt = new DataTable
          {
              TableName = tableName
          };

          #endregion

          #region 拿取列名,以实体的属性名作为列名       

          var properties = typeof(T).GetProperties();
          foreach (var item in properties)
          {
              var curFileName = item.Name;
              curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
              dt.Columns.Add(curFileName);
          }

          #endregion

          #region 列赋值
          foreach (var item in data)
          {
              DataRow dr = dt.NewRow();
              var columns = dt.Columns;

              var curPropertyList = item.GetType().GetProperties();
              foreach (var p in curPropertyList)
              {
                  var name = p.Name;
                  name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/
                  var curValue = p.GetValue(item);

                  int i = columns.IndexOf(name);
                  dr[i] = curValue;
              }

              dt.Rows.Add(dr);
          }

          #endregion  

          return dt;
      }


  }


  /// <summary>
  /// 批量导入mysql帮助类
  /// </summary>
  public static class MySqlHelper
  {
      /// <summary>
      /// MySqlBulkLoader批量导入
      /// </summary>
      /// <param name="_mySqlConnection">数据库连接地址</param>
      /// <param name="table"></param>
      /// <param name="csvName"></param>
      /// <returns></returns>
      public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName)
      {
          var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList();
          MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection)
          {
              FieldTerminator = "\t",
              FieldQuotationCharacter = '"',
              EscapeCharacter = '"',
              LineTerminator = "\r\n",
              FileName = csvName,
              NumberOfLinesToSkip = 0,
              TableName = table.TableName,

          };

          bulk.Columns.AddRange(columns);
          return bulk.Load();
      }
  }


  /// <summary>
  /// csv扩展
  /// </summary>
  public static class CSVEx
  {
      /// <summary>
      ///将DataTable转换为标准的CSV文件
      /// </summary>
      /// <param name="table">数据表</param>
      /// <param name="tmpPath">文件地址</param>
      /// <returns>返回标准的CSV</returns>
      public static void ToCsv(this DataTable table, string tmpPath)
      {
          //以半角逗号(即,)作分隔符,列为空也要表达其存在。
          //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。
          //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。
          StringBuilder sb = new StringBuilder();
          DataColumn colum;
          foreach (DataRow row in table.Rows)
          {
              for (int i = 0; i < table.Columns.Count; i++)
              {
                  Type _datatype = typeof(DateTime);
                  colum = table.Columns[i];
                  if (i != 0) sb.Append("\t");
                  //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))
                  //{
                  //    sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\"");
                  //}
                  if (colum.DataType == _datatype)
                  {
                      sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss"));
                  }
                  else sb.Append(row[colum].ToString());
              }
              sb.Append("\r\n");
          }
          StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8);
          sw.Write(sb.ToString());
          sw.Close();
      }

  }

  /// <summary>
  /// 字符串转化
  /// </summary>
  public static class StringExtensions
  {
      /// <summary>
      /// 转换为 main_keys_id 这种形式的字符串方式
      /// </summary>
      public static string ToSnakeCase(this string input)
      {
          if (string.IsNullOrEmpty(input)) { return input; }

          var startUnderscores = Regex.Match(input, @"^_+");
          return startUnderscores + Regex.Replace(input, @"([a-z0-9])([A-Z])", "$1_$2").ToLower();
      }
  }


  /// <summary>
  /// 实体
  /// </summary>
  public class CrmCouponTestDto
  {
      /// <summary>
      /// ID
      /// </summary>
      public long Id { get; set; }

      /// <summary>
      /// 卡券号
      /// </summary>     
      public string CouponCode { get; set; }

      /// <summary>
      /// 卡券ID
      /// </summary>
      public int CouponId { get; set; }

      /// <summary>
      /// 会员ID
      /// </summary>
      public int MemberId { get; set; }

      /// <summary>
      /// 发放时间
      /// </summary>   
      public DateTime IssueTime { get; set; }

      /// <summary>
      /// 使用时间
      /// </summary>      
      public DateTime UsageTime { get; set; }

      /// <summary>
      /// 使用店铺ID
      /// </summary>      

      public int UsageShopId { get; set; }

      /// <summary>
      /// 使用单号
      /// </summary>      
      public string UsageBillNo { get; set; }

      /// <summary>
      /// 有效开始时间
      /// </summary>      
      public DateTime EffectiveStart { get; set; }

      /// <summary>
      /// 有效结束时间
      /// </summary>      
      public DateTime EffectiveEnd { get; set; }

      /// <summary>
      /// 状态
      /// CouponStatus 卡券状态:
      /// -1:未领用
      /// 0:未使用
      /// 1:已使用
      /// 2:已过期
      ///3:已作废
      ///4:转赠中
      /// </summary>

      public Int16 Status { get; set; }
  }
}

以上就是利用MySqlBulkLoader实现批量插入数据的示例详解的详细内容,更多关于MySqlBulkLoader批量插入数据的资料请关注编程宝库其它相关文章!

Apache Thrift 是 Facebook 实现的一种高效的、支持多种编程语言的远程服务调用的框架。和其它RPC框架相比,它主要具有如下连个特点:高性能。 它采用的是二进制序列化,并且 ...