Page History
Info |
---|
NetCore(Akka)를 이용하여 벌크InsertOrUpdate를 처리해보겠습니다. |
이 아티컬의 목표
불특정 시간, 또는 일정하게 대량으로 발생하는 이벤트를 유연한 벌크처리를 하여
...
https://entityframework-extensions.net/bulk-update 를 활용하여 ORM버전을 활용하여 더 간단한 코드로
BulkUpdate가 가능하지만, EntityContext를 사용하여 Repository(ORM버전아닌ORM사용가능하지만, NativeSQL모드) 를 구현하였습니다.
벌크 처리 가능 저장소 구현 - ELASTICSEARCH
...
.DefaultMappingFor<TestEvent>(m => m
.IdProperty(p => p.id)
);ElasticSearch의 문서는 기본적으로 유니크한 도큐먼트 id를 , 랜덤해시값을 생성하며
유니크한 id를 고정하려면, Id맵핑을 지정하면 됩니다. ( for InsetOrUpdate)
도큐먼트집합을 먼저 만들고, BulkAsync 명령을 통해 벌크처리가 가능합니다.
인입액터
Code Block | ||
---|---|---|
| ||
using Akka.Actor; using AkkaDotModule.ActorUtils; using AkkaDotModule.Models; using BulkBatchApp.Entity; using Microsoft.Extensions.DependencyInjection; namespace BulkBatchApp.Actors { public class InsertActor : ReceiveActor { private readonly IActorRef _bulkWriterActor; private readonly IActorRef _batchActor; //벌크옵션, bulkSec 동안모아두고 처리하거나, bulkCount만큼찼을때 int bulkSec = 3; int bulkCount = 1000; int eventCount = 0; public InsertActor(IServiceScopeFactory scopeFactory) { _bulkWriterActor = Context.ActorOf(Props.Create(() => new BulkWriterActor(scopeFactory))); _batchActor = Context.ActorOf(Props.Create(() => new BatchActor(bulkSec))); _batchActor.Tell(new SetTarget(_bulkWriterActor)); //배치처리기 연결( 롤설정된 배치만큼 처리를 요청) ReceiveAsync<InsertOrUpdateTestEvent>(async insertOrUpdateTestEvent => { _batchActor.Tell(new Queue(insertOrUpdateTestEvent)); eventCount++; if (eventCount > bulkCount) { eventCount = 0; //버퍼오버플로우 방지를 위해, 지금까지 받은 데이터 처리 _batchActor.Tell(new Flush()); } }); } } } |
...
RDB와 ELK에 동시에 벌크처리가 가능함을 확인할수 있습니다.
추가참고 자료
- 실습 작동가능 예제 코드 : https://github.com/psmon/AkkaNetBulkBatch
- FSM배치 처리기 : 00.Finite State Machines
- RDB 유료 벌크 처리기 : https://entityframework-extensions.net/bulk-update - 이 아티클을 통해 유료버전없이 우아한 배치업데이트 처리가가능합니다.
...