Bulk Insert در SQL با استفاده از زبان سی شارپ ، مقاله آموزشی که یکی از کاربران در دنیای برنامه نویسی با مشکلی که برای انتقال دیتای سنگین نیاز داشت از آن استفاده میکند و از این داستانی را برای شما شرح می دهد.
روزی در محل کار، وظیفه ای داشتم که نیاز بود درج انبوهی از داده ها را در یک جدول پایگاه داده ی SQL Server انجام دهم.
بطور واضح با ابزار خط فرمان bcp.exe روبرو شدم (و در گذشته استفاده کرده بودم).
برای زمانیکه میخواهید اسکریپت هایی را اجرا کنید و … خوب و مناسب است.
با این حال این بار، میخواستم درج انبوه را بصورت برنامه نویسی با استفاده از برخی کدهای استاندارد .NET انجام دهم.
همانطور که گفتم، این کاری نیست که پیشتر باید با کد انجام میدادم.
بنابراین شروع به کار برای یافتن نحوه ی انجام آن کردم، و پس از چند دقیقه جستجو در گوگل، پاسخی که به دنبال آن میگشتم را پیدا کردم، که این است:
کلاس SqlBulkCopy
این کلاس از نسخه ی ۲.۰ در .NET وجود داشته است، به نظرم اگر نیازی به این ها نداشته باشید، گاهی از دست شما در میروند، که در این مورد، برای من، اتفاق افتاد!
متد اصلی که در این کلاس استفاده خواهید کرد WriteToServer(..)ها هستند که در آن چند سربارگذاری وجود دارد که DataTable/DataRow[] و IDataReader را بکار میگیرند.
- WriteToServer(DataRow[])
- WriteToServer(DataTable)
- WriteToServer(IDataReader)
- WriteToServer(DataTable, DataRowState)
- WriteToServerAsync(DataRow[])
- WriteToServerAsync(DataTable)
- WriteToServerAsync(IDataReader)
- WriteToServerAsync(DataRow[], CancellationToken)
- WriteToServerAsync(DataTable, DataRowState)
- WriteToServerAsync(DataTable, CancellationToken)
- WriteToServerAsync(IDataReader, CancellationToken)
بطور کلی میخواهید متدهای بالا را که از IDataReader استفاده میکنند را بکار بگیرید، به این دلیل که DataReader فقط رو به جلو و فقط خواندنی است.
داده ها را نگه نمیدارد و از این رو بسیار سریعتر از DataTable و DataRows[] است.
سناریوی من این بود که میخواستم یک درج انبوه انجام دهم، پس به سراغ Bulk Insert رفتم.
درج انبوه (Bulk Insert)
ترفند انجام این کار، استفاده از SqlBulkCopy برای انجام یک درج انبوه است، نیاز است یک IDataReader دلخواه ایجاد کنیم.
این کار آسان میبود اگر میتوانستیم کاری مثل ObjectDataReader<SomeObject> انجام داده و آن را برای تغذیه ی WriteToServer() با مجموعه ای از اشیاء استفاده کنیم.
متأسفانه، این امر وجود ندارد، بنابراین باید آن را برای شخص خود پیاده سازی کنید.
public interface IDataReader : IDisposable, IDataRecord { int Depth { get; } bool IsClosed { get; } int RecordsAffected { get; } void Close(); DataTable GetSchemaTable(); bool NextResult(); bool Read(); }
Mike Goatly نمونه پیاده سازی در حال کار آن را به ما میدهد، که بصورت زیر است:
namespace SqlBulkCopyExample { using System; using System.Collections.Generic; using System.Data; using System.Linq; using System.Linq.Expressions; using System.Reflection; public class ObjectDataReader<TData> : IDataReader { /// <summary> /// The enumerator for the IEnumerable{TData} passed to the constructor for /// this instance. /// </summary> private IEnumerator<TData> dataEnumerator; /// <summary> /// The lookup of accessor functions for the properties on the TData type. /// </summary> private Func<TData, object>[] accessors; /// <summary> /// The lookup of property names against their ordinal positions. /// </summary> private Dictionary<string, int> ordinalLookup; /// <summary> /// Initializes a new instance of the <see cref="ObjectDataReader<TData>"/> class. /// </summary> /// <param name="data">The data this instance should enumerate through.</param> public ObjectDataReader(IEnumerable<TData> data) { this.dataEnumerator = data.GetEnumerator(); // Get all the readable properties for the class and // compile an expression capable of reading it var propertyAccessors = typeof(TData) .GetProperties(BindingFlags.Instance | BindingFlags.Public) .Where(p => p.CanRead) .Select((p, i) => new { Index = i, Property = p, Accessor = CreatePropertyAccessor(p) }) .ToArray(); this.accessors = propertyAccessors.Select(p => p.Accessor).ToArray(); this.ordinalLookup = propertyAccessors.ToDictionary( p => p.Property.Name, p => p.Index, StringComparer.OrdinalIgnoreCase); } /// <summary> /// Creates a property accessor for the given property information. /// </summary> /// <param name="p">The property information to generate the accessor for.</param> /// <returns>The generated accessor function.</returns> private Func<TData, object> CreatePropertyAccessor(PropertyInfo p) { // Define the parameter that will be passed - will be the current object var parameter = Expression.Parameter(typeof(TData), "input"); // Define an expression to get the value from the property var propertyAccess = Expression.Property(parameter, p.GetGetMethod()); // Make sure the result of the get method is cast as an object var castAsObject = Expression.TypeAs(propertyAccess, typeof(object)); // Create a lambda expression for the property access and compile it var lamda = Expression.Lambda<Func<TData, object>>(castAsObject, parameter); return lamda.Compile(); } #region IDataReader Members public void Close() { this.Dispose(); } public int Depth { get { return 1; } } public DataTable GetSchemaTable() { return null; } public bool IsClosed { get { return this.dataEnumerator == null; } } public bool NextResult() { return false; } public bool Read() { if (this.dataEnumerator == null) { throw new ObjectDisposedException("ObjectDataReader"); } return this.dataEnumerator.MoveNext(); } public int RecordsAffected { get { return -1; } } #endregion #region IDisposable Members public void Dispose() { this.Dispose(true); GC.SuppressFinalize(this); } protected void Dispose(bool disposing) { if (disposing) { if (this.dataEnumerator != null) { this.dataEnumerator.Dispose(); this.dataEnumerator = null; } } } #endregion #region IDataRecord Members public int FieldCount { get { return this.accessors.Length; } } public bool GetBoolean(int i) { throw new NotImplementedException(); } public byte GetByte(int i) { throw new NotImplementedException(); } public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) { throw new NotImplementedException(); } public char GetChar(int i) { throw new NotImplementedException(); } public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) { throw new NotImplementedException(); } public IDataReader GetData(int i) { throw new NotImplementedException(); } public string GetDataTypeName(int i) { throw new NotImplementedException(); } public DateTime GetDateTime(int i) { throw new NotImplementedException(); } public decimal GetDecimal(int i) { throw new NotImplementedException(); } public double GetDouble(int i) { throw new NotImplementedException(); } public Type GetFieldType(int i) { throw new NotImplementedException(); } public float GetFloat(int i) { throw new NotImplementedException(); } public Guid GetGuid(int i) { throw new NotImplementedException(); } public short GetInt16(int i) { throw new NotImplementedException(); } public int GetInt32(int i) { throw new NotImplementedException(); } public long GetInt64(int i) { throw new NotImplementedException(); } public string GetName(int i) { throw new NotImplementedException(); } public int GetOrdinal(string name) { int ordinal; if (!this.ordinalLookup.TryGetValue(name, out ordinal)) { throw new InvalidOperationException("Unknown parameter name " + name); } return ordinal; } public string GetString(int i) { throw new NotImplementedException(); } public object GetValue(int i) { if (this.dataEnumerator == null) { throw new ObjectDisposedException("ObjectDataReader"); } return this.accessors[i](this.dataEnumerator.Current); } public int GetValues(object[] values) { throw new NotImplementedException(); } public bool IsDBNull(int i) { throw new NotImplementedException(); } public object this[string name] { get { throw new NotImplementedException(); } } public object this[int i] { get { throw new NotImplementedException(); } } #endregion } }
با این کد بسیار کارآمد ، تنها کاری که نیاز است انجام دهیم به این شکل است تا درج انبوه ( Bulk Insert ) با استفاده از یک IDataReader به وسیله ی کلاس SqlBulkCopy را انجام دهیم.
namespace SqlBulkCopyExample { using System; using System.Collections.Generic; using System.Data; using System.Data.SqlClient; using System.Diagnostics; using System.Linq; using SqlBulkCopyExample.Properties; class Program { static void Main(string[] args) { var people = CreateSamplePeople(10000); using (var connection = new SqlConnection( "Server=.;Database=MostWanted;Integrated Security=SSPI")) { connection.Open(); InsertDataUsingSqlBulkCopy(people, connection); } } private static void InsertDataUsingSqlBulkCopy( IEnumerable<Person> people, SqlConnection connection) { var bulkCopy = new SqlBulkCopy(connection); bulkCopy.DestinationTableName = "Person"; bulkCopy.ColumnMappings.Add("Name", "Name"); bulkCopy.ColumnMappings.Add("DateOfBirth", "DateOfBirth"); using (var dataReader = new ObjectDataReader<Person>(people)) { bulkCopy.WriteToServer(dataReader); } } private static IEnumerable<Person> CreateSamplePeople(int count) { return Enumerable.Range(0, count) .Select(i => new Person { Name = "Person" + i, DateOfBirth = new DateTime( ۱۹۵۰ + (i % 50), ((i * 3) % 12) + 1, ((i * 7) % 29) + 1) }); } } }
در این آموزش ما یک نمونه کد برای انتقال دیتا با Bulk Insert نوشتیم شما نیز میتوانید از کد های فوق در پروژه های خود استفاده کنید.
دیدگاهها