数据流处理数据

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;//引用命名空间

namespace TPL数据流处理数据
{
    class Program
    {
        /*建表脚本
         
CREATE TABLE [dbo].[Persons](
    [ID] [int] NULL,
    [Name] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL,
    [Age] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL,
    [Sex] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL,
    [Addres] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL,
    [Height] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL,
    [Weight] [varchar](50) COLLATE Chinese_PRC_CI_AS NULL
) 
         
         */
        //https://msdn.microsoft.com/zh-cn/vstudio/hh228603(v=vs.96)#predefined_types
        static readonly string connectionString = "Data Source=.;Initial Catalog=TplDB;Persist Security Info=True;User ID=sa;Password=xxxxx";
        public class Person
        {
            public int ID { get; set; }
            public string Name { get; set; }
            public int Age { get; set; }
            public string Sex { get; set; }
            public string Addres { get; set; }
            public string Height { get; set; }
            public string Weight { get; set; }
            static int RandomSeed()
            {
                byte[] bytes = new byte[4];
                System.Security.Cryptography.RNGCryptoServiceProvider rng = new System.Security.Cryptography.RNGCryptoServiceProvider();
                rng.GetBytes(bytes);
                return BitConverter.ToInt32(bytes, 0);
            }
            //待插入生成对象
            public static ITargetBlock<Person> GetPerson(ITargetBlock<Person> list, int count)
            {
                for (int i = 0; i < count; i++)
                {
                    //生成不重复随机信息
                    Random ran = new Random(RandomSeed());
                    list.Post(new Person { ID = i, Name = "Jhon" + i, Age = ran.Next(18, 70), Sex = i % 2 == 0 ? "" : "", Addres = "滨江滨河路" + i, Height = ran.Next(155, 210) + "cm", Weight = ran.Next(40, 200) + "kg" });
                }
                return list;
            }
            //使用缓冲区
            public static void ADDPersons(int count)
            {
                //创建一个ActionBlock添加多个对象,并设置批处理大小
                var batchs = new BatchBlock<Person>(1024 * 1024 * 1024);
                var insertEmployees = new ActionBlock<Person[]>(a => InsertPersons(a));
                //链接批块动作块
                batchs.LinkTo(insertEmployees);
                // 当批块完成时,设置动作块也完成。
                batchs.Completion.ContinueWith(delegate { insertEmployees.Complete(); });
                //对象批处理块。
                GetPerson(batchs,count);
                //设置批块完成状态,等待,插入操作完成。
                batchs.Complete();
                insertEmployees.Completion.Wait();
            }
            //不使用缓冲区
            public static void AddPersons(string connectionString, int count)
            {
                var insertEmployee = new ActionBlock<Person>(e => InsertPersons(new Person[] { e }));
                GetPerson(insertEmployee, count);
                //设置状态
                insertEmployee.Complete();
                insertEmployee.Completion.Wait();
            }

            //插入信息
            public static void InsertPersons(Person[] list)
            {
                SqlConnection sqlConn = new SqlConnection(connectionString);//连接数据库  
                try
                {
                    SqlCommand sqlComm = new SqlCommand();
                    sqlComm.CommandText = @"INSERT INTO [TplDB].[dbo].[Persons]([ID] ,[Name],[Age] ,[Sex] ,[Addres],[Height],[Weight]) VALUES(@ID,@Name,@Age ,@Sex ,@Addres ,@Height ,@Weight)";//参数化SQL  
                    sqlComm.Parameters.Add("@ID", SqlDbType.Int);
                    sqlComm.Parameters.Add("@Name", SqlDbType.VarChar);
                    sqlComm.Parameters.Add("@Age", SqlDbType.VarChar);
                    sqlComm.Parameters.Add("@Sex", SqlDbType.VarChar);
                    sqlComm.Parameters.Add("@Addres", SqlDbType.VarChar);
                    sqlComm.Parameters.Add("@Height", SqlDbType.VarChar);
                    sqlComm.Parameters.Add("@Weight", SqlDbType.VarChar);
                    sqlComm.CommandType = CommandType.Text;
                    sqlComm.Connection = sqlConn;
                    sqlConn.Open();
                    for (int i = 0; i < list.Length; i++)
                    {
                        sqlComm.Parameters.Clear();
                        sqlComm.Parameters.Add("@ID", list[i].ID);
                        sqlComm.Parameters.Add("@Name", list[i].Name);
                        sqlComm.Parameters.Add("@Age", list[i].Age);
                        sqlComm.Parameters.Add("@Sex", list[i].Sex);
                        sqlComm.Parameters.Add("@Addres", list[i].Addres);
                        sqlComm.Parameters.Add("@Height", list[i].Height);
                        sqlComm.Parameters.Add("@Weight", list[i].Weight);
                        int m = sqlComm.ExecuteNonQuery();
                    }

                }
                catch (Exception ex)
                {
                    throw ex;
                }
                finally
                {
                    sqlConn.Close();
                }


            }

        }

        static void Main(string[] args)
        {
           
            Person.ADDPersons(10000);
            Console.ReadKey();
        }
   
      
    }

 


}
View Code
原文地址:https://www.cnblogs.com/BABLOVE/p/4466196.html