You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »


// Create the ActorSystem.
var actorSystem = akkaService.CreateActorSystem();

// Create a round-robin pool router with zero initial routees;
// workers are attached explicitly below via AddRoutee.
var roundrobin = actorSystem.ActorOf(Props.Create<BasicActor>().WithRouter(new RoundRobinPool(0)));

// Create the throttle (tps messages per second) and point it at the router.
int tps = 1;
var throttle = actorSystem.ActorOf(Props.Create(() => new ThrottleActor(tps)));
throttle.Tell(new SetTarget(roundrobin));

// Create a worker and register it as a routee of the router.
// NOTE(review): NodeName is not defined in this snippet - presumably supplied by the surrounding code.
var workActor = actorSystem.ActorOf(Props.Create<BasicActor>(), NodeName);
var routee = Routee.FromActorRef(workActor);
roundrobin.Tell(new AddRoutee(routee));

// Say hello: the message goes through the throttle before reaching the router.
// NOTE(review): `i` is not defined in this snippet - presumably a loop counter from the surrounding code.
throttle.Tell(new TodoQueue()
{
    Todo = new Todo()
    {
        Id = i.ToString(),
        Title = "Hello"
    }
});



ThrottleActor
    /// <summary>
    /// Actor that rate-limits <see cref="Todo"/> work items before handing them to a
    /// downstream consumer.
    /// </summary>
    /// <remarks>
    /// Incoming <see cref="TodoQueue"/> messages are pushed into an Akka.Streams pipeline
    /// (<c>Source.ActorRef -> Throttle -> Sink.ActorRef(Self)</c>). The throttled elements come
    /// back to this actor as <see cref="Todo"/> messages, which are logged and forwarded to the
    /// consumer registered through <see cref="SetTarget"/> (if any).
    /// Supported messages: <see cref="SetTarget"/>, <see cref="TPSInfoReq"/>,
    /// <see cref="ChangeTPS"/>, <see cref="TodoQueue"/> and <see cref="Todo"/>.
    /// </remarks>
    public class ThrottleActor : ReceiveActor
    {
        // Capacity of the buffer in front of the throttle stage; with OverflowStrategy.DropNew
        // the newest element is dropped once this many elements are waiting.
        private const int BufferSize = 1000;

        private readonly ILoggingAdapter logger = Context.GetLogger();

        // Downstream receiver of throttled items; null until a SetTarget message arrives.
        private IActorRef? consumer;

        // Entry point (Source.ActorRef) of the currently active throttling stream.
        private IActorRef _throttler;

        private readonly IMaterializer _materializer;

        // Current rate limit, in elements per second.
        private int _processCountPerSec;

        /// <summary>
        /// Creates the actor and starts a throttling stream at the given rate.
        /// </summary>
        /// <param name="processCouuntPerSec">
        /// Number of elements let through per second; also used as the maximum burst size.
        /// </param>
        public ThrottleActor(int processCouuntPerSec)
        {
            _materializer = Context.Materializer();

            _processCountPerSec = processCouuntPerSec;

            _throttler = CreateThrottler(_processCountPerSec);

            // None of the handlers await anything, so plain Receive<T> is used instead of
            // ReceiveAsync<T> with an await-less async lambda.
            Receive<SetTarget>(target =>
            {
                consumer = target.Ref;
            });

            Receive<TPSInfoReq>(_ =>
            {
                Sender.Tell(_processCountPerSec);
            });

            Receive<ChangeTPS>(msg =>
            {
                var oldThrottler = _throttler;

                _processCountPerSec = msg.processCouuntPerSec;

                // Start the replacement stream first so new work is accepted immediately.
                _throttler = CreateThrottler(_processCountPerSec);

                // NOTE(review): per the Source.ActorRef docs a PoisonPill completes the stream
                // immediately, so elements still buffered in the old stream are presumably
                // discarded - confirm this is the intended behavior on a rate change.
                oldThrottler.Tell(PoisonPill.Instance);
            });

            Receive<TodoQueue>(msg =>
            {
                // Only Id and Title are copied into the element pushed through the stream.
                _throttler.Tell(new Todo()
                {
                    Id = msg.Todo.Id,
                    Title = msg.Todo.Title
                });
            });

            Receive<Todo>(msg =>
            {
                logger.Info($"{msg.Id} - {msg.Title}");
                // TODO Something

                consumer?.Tell(msg);
            });

            // Sink.ActorRef sends NotUsed.Instance to Self when a stream completes (e.g. the
            // stream replaced by ChangeTPS); swallow it so it is not reported as unhandled.
            Receive<NotUsed>(_ => { });
        }

        /// <summary>
        /// Materializes a new throttling stream whose output is delivered to this actor.
        /// </summary>
        /// <param name="elementsPerSecond">Rate limit and maximum burst for the stream.</param>
        /// <returns>The actor reference that feeds elements into the stream.</returns>
        private IActorRef CreateThrottler(int elementsPerSecond)
        {
            return Source.ActorRef<object>(BufferSize, OverflowStrategy.DropNew)
                         .Throttle(elementsPerSecond, TimeSpan.FromSeconds(1), elementsPerSecond, ThrottleMode.Shaping)
                         .To(Sink.ActorRef<object>(Self, NotUsed.Instance))
                         .Run(_materializer);
        }
    }





  • No labels