Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Info

NetCore(Akka)를 이용하여 벌크InsertOrUpdate를 처리해보겠습니다.

git : https://github.com/psmon/AkkaNetBulkBatch


이 아티컬의 목표

불특정 시간, 또는 일정하게 대량으로 발생하는 이벤트를 유연한 벌크처리를 하여

RDB와 ElasticSearch에 대량인입이 가능해집니다. 일반적으로 1Event에서 1 Insert를 일반적으로 1 Event 1인입 하게 되면 커넥션 비용이 훨씬 많기 때문에 대량 인입처리에 적합하지 않습니다.

...

https://entityframework-extensions.net/bulk-update 를 활용하여 ORM버전을 활용하여 더 간단한 코드로

BulkUpdate가 가능하지만 가격이 높으며 유료입니다, EntityContext를 사용하여 Repository(ORM사용가능하지만, NativeSQL모드) 를 구현하였습니다.


벌크 처리 가능 저장소 구현 - ELASTICSEARCH

...

.DefaultMappingFor<TestEvent>(m => m
.IdProperty(p => p.id)
);

ElasticSearch의 문서는 기본적으로 유니크한 도큐먼트 id를 , 랜덤해시값을 생성하며 

유니크한 id를 고정하려면, Id맵핑을 지정하면 됩니다. ( for InsetOrUpdate)

도큐먼트집합을 먼저 만들고,  BulkAsync 명령을 통해 벌크처리가 가능합니다.

인입액터

Code Block
themeEmacs
using Akka.Actor;
using AkkaDotModule.ActorUtils;
using AkkaDotModule.Models;
using BulkBatchApp.Entity;
using Microsoft.Extensions.DependencyInjection;

namespace BulkBatchApp.Actors
{
    public class InsertActor : ReceiveActor
    {
        private readonly IActorRef _bulkWriterActor;
        private readonly IActorRef _batchActor;

        //벌크옵션, bulkSec 동안모아두고 처리하거나, bulkCount만큼찼을때
        int bulkSec = 3;
        int bulkCount = 1000;
        int eventCount = 0;

        public InsertActor(IServiceScopeFactory scopeFactory)
        {
            _bulkWriterActor = Context.ActorOf(Props.Create(() => new BulkWriterActor(scopeFactory)));
            _batchActor = Context.ActorOf(Props.Create(() => new BatchActor(bulkSec)));
            _batchActor.Tell(new SetTarget(_bulkWriterActor));  //배치처리기 연결( 롤설정된 배치만큼 처리를 요청)
            ReceiveAsync<InsertOrUpdateTestEvent>(async insertOrUpdateTestEvent =>
            {
                _batchActor.Tell(new Queue(insertOrUpdateTestEvent));
                eventCount++;
                if (eventCount > bulkCount)
                {
                    eventCount = 0;
                    //버퍼오버플로우 방지를 위해, 지금까지 받은 데이터 처리
                    _batchActor.Tell(new Flush());
                }
            });
        }
    }
}

...

Code Block
themeEmacs
title이벤트 발생 API
collapsetrue
using Akka.Actor;
using AkkaDotModule.Config;
using BulkBatchApp.Entity;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Threading.Tasks;

namespace BulkBatchApp.Controllers
{
    [Route("api/[controller]")]
    public class EventController : Controller
    {
        private readonly IActorRef _insertActor;

        public EventController()
        {
            _insertActor = AkkaLoad.ActorSelect("InsertActor"); 
        }

        /// <summary>
        /// 사용자 이벤트를 발생시킨다.
        /// </summary>        
        /// <response code="200">성공</response>
        /// <response code="412">
        /// ....         
        /// </response>
        [HttpPost("EventRaise")]
        public async Task<string> EventRaise(            
            [FromBody] InsertOrUpdateTestEvent userEvent)
        {
            _insertActor.Tell(userEvent);
            var result = "ok";
            return result;
        }

        /// <summary>
        /// 사용자 이벤트를 다수 발생시킨다.
        /// </summary>        
        /// <param name="repeat">반복</param>        
        /// <response code="200">성공</response>
        /// <response code="412">
        /// ....         
        /// </response>
        [HttpPost("EventBulkRaise")]
        public async Task<string> EventBulkRaise(
            int repeat,
            [FromBody] InsertOrUpdateTestEvent userEvent)
        {
            for (int i= 0; i<repeat; i++)
            {
                InsertOrUpdateTestEvent bulkEvent = new InsertOrUpdateTestEvent()
                {
                    id = userEvent.id + i,                    
                    action_type = userEvent.action_type,
                    action_name = userEvent.action_name + "_" + i,
                    reg_dt = DateTime.Now,
                    upd_dt = DateTime.Now
                };

                _insertActor.Tell(bulkEvent);
            }
            
            var result = "ok";
            return result;
        }

    }
}


RDB와 ELK에 동시에 벌크처리가 가능함을 확인할수 있습니다.

Image Modified

단건 또는 ORM과 비교해서 성능차이

Image Removed




추가참고 자료