NetCore(Akka)를 이용하여 벌크InsertOrUpdate를 처리해보겠습니다.
이 아티컬의 목표
불특정 시간, 또는 일정하게 대량으로 발생하는 이벤트를 유연한 벌크처리를 하여
RDB와 ElasticSearch에 대량인입이 가능해집니다. 일반적으로 1 Event 1인입 하게 되면 커넥션 비용이 훨씬 많기 때문에 대량 인입처리에 적합하지 않습니다.
SQL과 NOSQL(ELK) 영역에 대량 인입하는 전형적인 방법 두가지 방법을 닷넷코어 어플리케이션을 통해 구현하고 작동시켜보겠습니다.
인프라 준비
도커를 통해 로컬 인프라를 준비합니다 ( 상세한 도커 구동명령 생략)
- mysql을 생성하고 준비된 이벤트 스키마및 테이블을 자동 생성합니다.
- elastic search및 키바나를 자동생성합니다.
사용된 모듈
<PackageReference Include="AkkaDotModule.Webnori" Version="1.1.1" /> <PackageReference Include="NEST" Version="7.5.1" /> <PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="3.1.1" />
- AkkaDotModule.Webnori : 저장소 종속없는 벌크 유틸리티를 제공합니다. ( AKKA를 이용)
- NEST : 엘라스틱서치의 API를 닷넷코드로 이용할수 있습니다.
- Pomelo.EntityFrameworkCore.MySql : Entity ORM + MYSQL을 사용할때 활용할수 있습니다. 여기서는 ORM이 아닌 네이티브 쿼리만을 이용하였습니다.
Application LayOut
- EventController : Event를 RestAPI를 통해 단건/복수건 받을수 있습니다.
- InsertActor : Event를 Queue에 적재합니다.
- BulkWriterActor : 정해진 롤에의해 쌓인만큼 벌크 처리를 시도합니다.
- ElasticEngine / EventRepository : 저장하는 기능을 각각 제공합니다.
이벤트 Entity정의 (+메시지)
벌크 처리할 Event를 정의하였으며, 대량의 InsertOrUpdate를 지원하도록 이벤트를 정의하였습니다.
- POCO 를 지향하여, ElasticSearch의 저장모델과,MySQL의 저장모델을 일치 하였습니다.
using System; namespace BulkBatchApp.Entity { public class TestEvent { public string id { get; set; } public int action_type { get; set; } public string action_name { get; set; } public DateTime upd_dt { get; set; } public DateTime reg_dt { get; set; } } public class InsertOrUpdateTestEvent : TestEvent { } }
벌크 처리 저장소 구현 - MYSQL
using BulkBatchApp.Config; using BulkBatchApp.Entity; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace BulkBatchApp.Repositories { public class EventRepository : DbContext { private string database = "webnori"; private readonly AppSettings _appSettings; public DbSet<QueryIntResult> Native { get; set; } //For Native Query private readonly ILogger<EventRepository> _logger; public EventRepository(AppSettings appSettings, ILogger<EventRepository> logger) { _appSettings = appSettings; _logger = logger; } protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder .Entity<QueryIntResult>(eb => { eb.HasNoKey(); }); } protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { string dbOption = "Convert Zero Datetime=true;"; string dbConnectionString = string.Empty; dbConnectionString = _appSettings.DBConnection + $"database={database};" + dbOption; optionsBuilder.UseMySql(dbConnectionString); base.OnConfiguring(optionsBuilder); } private string DateToDbString(DateTime dt) { return dt.ToString("yyyy-MM-dd hh:mm:ss"); } public async Task BulkInsertOrUpdateToEventTable(List<TestEvent> testEvents) { StringBuilder queryStr = new StringBuilder(""); queryStr.Append($"INSERT INTO tbl_test_bulk( id, action_type, action_name, reg_dt) VALUES "); int idx = 0; foreach (var testEvent in testEvents) { idx++; string appendQuery = $"('{testEvent.id}',{testEvent.action_type},'{testEvent.action_name}','{DateToDbString(testEvent.reg_dt)}')"; if (idx == testEvents.Count) { queryStr.AppendLine(appendQuery); } else { queryStr.AppendLine(appendQuery + ","); } } queryStr.AppendLine(@" ON DUPLICATE KEY UPDATE action_type= VALUES(action_type), action_name=VALUES(action_name), reg_dt=VALUES(reg_dt); SELECT 1 as result;"); _logger.LogDebug("======= BulkQuery ========"); _logger.LogDebug(queryStr.ToString()); await Native.FromSqlRaw(queryStr.ToString()).ToListAsync().ConfigureAwait(false); } } }
https://entityframework-extensions.net/bulk-update 를 활용하여 ORM버전을 활용하여 더 간단한 코드로
BulkUpdate가 가능하지만, EntityContext를 사용하여 Repository(ORM사용가능하지만, NativeSQL모드) 를 구현하였습니다.
벌크 처리 가능 저장소 구현 - ELASTICSEARCH
using BulkBatchApp.Config; using BulkBatchApp.Entity; using Nest; using System; using System.Collections.Generic; using System.Threading.Tasks; namespace BulkBatchApp.Adapters { public interface IElasticEngine { IElasticClient GetClientForTestEvent(); } public class ElasticEngine : IElasticEngine { private IElasticClient clientForTestEvent { get; set; } public ElasticEngine(AppSettings appSettings) { var defaultSettings = new ConnectionSettings(new Uri(appSettings.ElkConnection)) .DefaultIndex(appSettings.ElkIndex) .DefaultMappingFor<TestEvent>(m => m .IdProperty(p => p.id) ); clientForTestEvent = new ElasticClient(defaultSettings); } public IElasticClient GetClientForTestEvent() { return clientForTestEvent; } public async Task BulkInsertOrUpdateToEventTable(List<TestEvent> testEvents) { var descriptorProductPerPage = new BulkDescriptor(); foreach(var testEvent in testEvents) { descriptorProductPerPage.Index<TestEvent>(op => op .Document(testEvent)); } var result = await GetClientForTestEvent().BulkAsync(descriptorProductPerPage); if (!result.ApiCall.Success) { throw new Exception($"Failed Elk-Bulk Insert : messge {result.ServerError}"); } } } }
.DefaultMappingFor<TestEvent>(m => m
.IdProperty(p => p.id)
);ElasticSearch의 문서는 기본적으로 유니크한 도큐먼트 id를 , 랜덤해시값을 생성하며
유니크한 id를 고정하려면, Id맵핑을 지정하면 됩니다. ( for InsetOrUpdate)
도큐먼트집합을 먼저 만들고, BulkAsync 명령을 통해 벌크처리가 가능합니다.
인입액터
using Akka.Actor; using AkkaDotModule.ActorUtils; using AkkaDotModule.Models; using BulkBatchApp.Entity; using Microsoft.Extensions.DependencyInjection; namespace BulkBatchApp.Actors { public class InsertActor : ReceiveActor { private readonly IActorRef _bulkWriterActor; private readonly IActorRef _batchActor; //벌크옵션, bulkSec 동안모아두고 처리하거나, bulkCount만큼찼을때 int bulkSec = 3; int bulkCount = 1000; int eventCount = 0; public InsertActor(IServiceScopeFactory scopeFactory) { _bulkWriterActor = Context.ActorOf(Props.Create(() => new BulkWriterActor(scopeFactory))); _batchActor = Context.ActorOf(Props.Create(() => new BatchActor(bulkSec))); _batchActor.Tell(new SetTarget(_bulkWriterActor)); //배치처리기 연결( 롤설정된 배치만큼 처리를 요청) ReceiveAsync<InsertOrUpdateTestEvent>(async insertOrUpdateTestEvent => { _batchActor.Tell(new Queue(insertOrUpdateTestEvent)); eventCount++; if (eventCount > bulkCount) { eventCount = 0; //버퍼오버플로우 방지를 위해, 지금까지 받은 데이터 처리 _batchActor.Tell(new Flush()); } }); } } }
BatchActor 는 유연한 벌크처리를 위해 제공되는(AkkaDotModule.Webnori) 모듈이며
지정된 롤에의해 조건이 쌓이면, 해당 컬렉션이 우리가 지정할 Writer액터에게 처리할만큼만 전송됩니다.
대량처리 액터
처리할만큼 컬렉션이 오게되며 여기서는 , RDB/ELK 동시에 벌크 InsertOrUpdate를 처리합니다.
using Akka.Actor; using Akka.Event; using AkkaDotModule.Models; using BulkBatchApp.Adapters; using BulkBatchApp.Entity; using BulkBatchApp.Repositories; using Microsoft.Extensions.DependencyInjection; using System; using System.Collections.Generic; namespace BulkBatchApp.Actors { public class BulkWriterActor : ReceiveActor { private readonly ILoggingAdapter logger = Context.GetLogger(); private readonly IServiceScopeFactory _scopeFactory; public BulkWriterActor(IServiceScopeFactory scopeFactory) { _scopeFactory = scopeFactory; ReceiveAsync<object>(async message => { //각각 저장하고 싶은 DB에따라 배치처리를 수행한다. if (message is Batch batchMessage) { Batch batch = message as Batch; //Type 검사 (BulkWriteActor는 동일한 Type의 컬렉션처리) if (batch.Obj[0] is InsertOrUpdateTestEvent) { using (var scope = _scopeFactory.CreateScope()) { var eventRepository = scope.ServiceProvider.GetRequiredService<EventRepository>(); var elasitcEngine = scope.ServiceProvider.GetRequiredService<ElasticEngine>(); List<TestEvent> testEvents = new List<TestEvent>(); batch.Obj.ForEach(obj => { var AddItem = obj as InsertOrUpdateTestEvent; testEvents.Add(AddItem); }); try { await elasitcEngine.BulkInsertOrUpdateToEventTable(testEvents); await eventRepository.BulkInsertOrUpdateToEventTable(testEvents); } catch (Exception e) { logger.Error($"Failed BulkProductCateRelInsertOrUpdate: count:{testEvents.Count} e:{e.Message}"); } } } } }); } } }
이벤트발생
단건 또는 대량의 이벤트를 발생시킬수 있으며
불특정 발생하는 단건 이벤트에 대해서도 벌크처리(3초동안 모아둔) 가 가능합니다.
이벤트는 배치처리와 다르게 예상하기 힘들며 최대수를 넘게되면 나눠서 처리가 됩니다.
RDB와 ELK에 동시에 벌크처리가 가능함을 확인할수 있습니다.
추가참고 자료
- 작동가능 예제 코드 : https://github.com/psmon/AkkaNetBulkBatch
- FSM배치 처리기 : 00.Finite State Machines
- RDB 유료 벌크 처리기 : https://entityframework-extensions.net/bulk-update - 이 아티클을 통해 유료버전없이 우아한 배치업데이트 처리가가능합니다.