본문 바로가기

Programmer Jinyo/Scala & FP

AKKA(아카)with Scala 튜토리얼 02


투명한 기부를 하고싶다면 이 링크로 와보세요! 🥰 (클릭!)

바이낸스(₿) 수수료 평생 20% 할인받는 링크로 가입하기! 🔥 (클릭!)

Typed AKKA가 새로 나와서 이 시리즈 글은 번역하다 말았습니다.

여튼 이 튜토리얼도 좋은 튜토리얼이니 의지가 있으시다면 따라가보셔도 좋을 것 같습니다.

2019/12/01 - [Programmer Jinyo/Scala & AKKA] - AKKA(아카)with Scala 튜토리얼 01

2019/12/01 - [Programmer Jinyo/Scala & AKKA] - AKKA(아카)with Scala 튜토리얼 02

2019/12/02 - [Programmer Jinyo/Scala & AKKA] - AKKA(아카)with Scala 튜토리얼 03 (Routers)


자 이어서 가즈아

 

Ask Pattern mapTo

 

이 튜토리얼은 아카의 AskPattern 의 연장선이다. ActorSystem, protocol, Actot는 이전 튜토리얼에서 진행했던 코드를 그대로 이어서 사용한다. 우리는 이 튜토리얼의 Future.mapTo() 메소드에 더 집중 할 것이다. 이것은 Ask Pattern이 future type을 리턴하는데에 쓰일 수 있다.

 

(Future을 아무래도 좀 알아야 할 것 같아 한글로 된 자료를 좀 찾아봤다)

https://yehongj.tistory.com/55 

https://la-stranger.tistory.com/entry/Scala-Future-Tutorial

그리고 이걸로 부족해서 글을 그냥 썼다.

2019/12/15 - [Programmer Jinyo/Scala & AKKA] - Scala Future에 대해 기본은 배워보자! 튜토리얼 ~~

 

 

우리의 이전 예제와 마찬가지로, 우리는 도넛가게 액터시스템을 만들 것이다.

println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")

 

우리는 우리의 간단한 Info case class를 통해 actor가 react 할 메시지를 생성할 것이다.

println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
  case class Info(name: String)
}

DonutInfoActor는 이전 Akka Ask Pattern 예제와 동일하다. 액터는 Info type의 메시지에 react한다. 이름 속성이 바닐라 인 경우 bool 응답 값을 true로 보내고 name 속성의 다른 모든 값에 대해 false 값을 보낸다.

println("\nStep 3: Create DonutInfoActor")
class DonutInfoActor extends Actor with ActorLogging {

  import Tutorial_04_Ask_Pattern_MapTo.DonutStoreProtocol._

  def receive = {
    case Info(name) if name == "vanilla" =>
      log.info(s"Found valid $name donut")
      sender ! true

    case Info(name) =>
      log.info(s"$name donut is not supported")
      sender ! false
  }
}

actorOf() 메소드를 사용하여 액터 시스템에 DonutInfoActor를 만든다.

println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")

Future.mapTo() 메소드를 사용하여 액터의 리턴 타입을 특정 타입으로 mapping 할 수 있다. 이 예시에서 리턴 타입은 bool type에 매핑된다.

println("\nStep 5: Akka Ask Pattern and future mapTo()")
import DonutStoreProtocol._
import akka.pattern._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)

val vanillaDonutFound: Future[Boolean] = (donutInfoActor ? Info("vanilla")).mapTo[Boolean]

for {
  found <- vanillaDonutFound
} yield println(s"Vanilla donut found = $found")

Thread.sleep(5000)

마지막으로 terminate()를 써서 액터 시스템을 종료한다.

println("\nStep 6: close the actor system")
val isTerminated = system.terminate()

내 코드는 아래와 같이 완성됐다

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import DonutStoreProtocol._
import akka.pattern._
import akka.util.Timeout

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._

object DonutStoreActorSystem extends App{
  println("Step 1: Create an actor system")
  val system = ActorSystem("DonutStoreActorSystem")
  println("\nStep 4: Create DonutInfoActor")
  val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")

  println("\nStep 5: Akka Ask Pattern and future mapTo()")
  implicit val timeout = Timeout(5 second)

  val vanillaDonutFound: Future[Boolean] = (donutInfoActor ? Info("vanilla")).mapTo[Boolean]

  for {
    found <- vanillaDonutFound
  } yield println(s"Vanilla donut found = $found")

  Thread.sleep(5000)

  println("\nStep 6: close the actor system")
  val isTerminated = system.terminate()
}

//println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
  case class Info(name: String)
}

//println("\nStep 3: Create DonutInfoActor")
class DonutInfoActor extends Actor with ActorLogging {

  def receive = {
    case Info(name) if name == "vanilla" =>
      log.info(s"Found valid $name donut")
      sender ! true

    case Info(name) =>
      log.info(s"$name donut is not supported")
      sender ! false
  }
}

 

 

 

 

 

 

Ask Pattern pipeTo

계속해서 AKKA Ask pattern에 대해서 볼 것이고, pipeTo()라는 쓸모있는 유틸리티에 대해 알아 볼 것이다. Future.andThen 콜백을 등록하여 Future operation에 연결하여 결과를 보낸 sender에게 쉽게 다시 보낼 수 있다. pipeTo() 사용에 대한 자세한 내용은 공식 Akka 설명서를 참고하라고 한다. 새1끼가.

이전 예제와 같이 Akka 액터를 보유 할 DonutStoreActorSystem이라는 액터 시스템을 만들자.

 

println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")

이 단계에서는 CheckStock이라는 다른 기능을 정의하여 message passing protocol을 보강한다. 실제 응용 프로그램에서는 CheckStock 메시지가 주어진 도넛의 재고 수량을 확인한다고 상상할 수 있을 것이다.

println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
  case class Info(name: String)

  case class CheckStock(name: String)
}

CheckStock 메시지에 react하는 DonutStockActor를 만들자. 그런 다음 실제 남은 수량 조회를 수행하는 findStock()이라는 future operation에 call을 위임한다. 결과를 sender에게 다시 보내기 위해 다음과 같이 편리한 pipeTo() 메서드를 사용한다.

println("\nStep 3: Create DonutStockActor")
class DonutStockActor extends Actor with ActorLogging {
  import Tutorial_05_Ask_Pattern_pipeTo.DonutStoreProtocol._

  def receive = {
    case CheckStock(name) =>
      log.info(s"Checking stock for $name donut")
      findStock(name).pipeTo(sender)
  }

  def findStock(name: String): Future[Int] = Future {
    // assume a long running database operation to find stock for the given donut
    100
  }
}

우리는 도넛스톡액터를 우리의 도넛스토어액터시스템에 actorOf()메소드를 실행해서 만든다.

println("\nStep 4: Create DonutStockActor")
val donutStockActor = system.actorOf(Props[DonutStockActor], name = "DonutStockActor")

mapTo()예제와 비슷하게 우리는 AKKA Ask 패턴을 도넛 수량을 체크하기 위해 사용한다. mapTo() 메소드를 통해서 우리는 액터로부터 나온 리턴 타입을 특정 타입으로 변경할 수 있다는 것을 기억하라. (이 경우에는 Int type이다)

(바닐라 도넛 스톡 변수를 도넛 스톡 액터에게 ask 하여 리턴받은 체크스톡을 int형으로 변환해서 넣는 과정)

println("\nStep 5: Akka Ask Pattern using mapTo() method")
import DonutStoreProtocol._
import akka.pattern._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)

val vanillaDonutStock: Future[Int] = (donutStockActor ? CheckStock("vanilla")).mapTo[Int]

for {
  found <- vanillaDonutStock
} yield println(s"Vanilla donut stock = $found")

Thread.sleep(5000)

마지막으로, 우리는 액터 시스템을 terminate()를 통해 끌 것이다.

println("\nStep 6: Close the actor system")
val isTerminated = system.terminate()

* 참고 링크

스택오버플로우에 좋은 질문이 있더라. mapping vs pipeTo 에 관한 비교 질문이다.

https://stackoverflow.com/questions/47747026/akka-scala-mapping-future-vs-pipeto

 

그리고 그 아래에 보며는 pipeTo란 어떻게 정의되어있는지에 대해 나와있는데 답변이 써억~ 괜찮았다.

결론은 pipeTo를 쓰면 결국 recipient(수령인)에게 성공 / 실패에 따라 onSuccess/Failure을 통해 결과를 tell 해주고 있는 것이므로 슷비슷비하다는 말이었다.

def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = 
  Actor.noSender): Future[T] = {
    future andThen {
      case Success(r) ⇒ recipient ! r
      case Failure(f) ⇒ recipient ! Status.Failure(f)
    }
  }
}

 

 

 

Actor Hierarchy

 

이 튜토리얼에서는 Tell Pattern 예제를 다시 사용할 것이며 Akka Actor Hierarchy에 대해 조금 더 논의 해 볼 것이다. 다시 언급하건데, 앞에서 본 것 처럼 Actor Model은 actor-to-actor 디자인 원칙을 선호한다. 따라서, 우리의 액터시스템 안의 액터들이 위계(Hierarchy)를 가지고 있다는 것을 명심하는게 중요하다.

 

아래 다이어그램은 AKKA로부터 지원되는 actor hierarchy에 대한 좋은 visual overview이다.

 

 

계층의 맨 위에는 root guardian이 있다. 전체 액터 시스템을 모니터링하는 "최상위"액터라고 생각할 수 있다. 다음은 모든 시스템 레벨 액터를 담당하는 "최상위 레벨"액터 인 system guardian이다. 왼쪽에는 user guardian이 표시되며 이는 액터 시스템에서 생성 한 액터의 "최상위"액터 계층 구조이다. someChild 액터에 표시된 것처럼 계층을 중첩 할 수도 있다.

 

지금은 사용자 가디언에 초점을 맞추겠다. Tell Pattern 예제를 다시 불러와보면 DonutInfoActor가 Info 메시지를 수신했을 때 아래 명령문을 기록하고있었다.

 

[INFO] [05/09/2018 11:36:47.095] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla donut

 

위의 로그 문에서 DonutInfoActor의 계층 구조를 볼 수 있다 : akka : // DonutStoreActorSystem / user / DonutInfoActor 이다. 더 나은 비교가 부족하기에, DonutInfoActor의 경로를 파일 시스템의 일부 파일 경로 또는 위치와 유사하게 생각해도 된다. 따라서 DonutInfoActor는 user guardian 아래에 있으며, 액터 시스템은 DonutStoreActorSystem (액터 시스템을 만들 때 사용한 이름)이라는 이름으로도 표시된다.

 

Fail-Fast 원칙을 따르는 시스템을 설계하는 데 도움이되도록 행위자 계층을 이해하는 것이 중요하다. 다음 튜토리얼에서는 장애를 격리하고 복구 옵션으로 미리 계획하는 액터 시스템을 설계하는 방법을 보여준다. 액터 계층에 대해 배운 것은 장애를 처리 할 수있는 탄력적 인 액터 시스템을 설계하는 기초가 될 것이다.

 

 

 

Actor Lookup

 

이 튜토리얼에서는 Actor Hierarchy에 대해 더 알아보고 액터 시스템에서 actor path를 사용하여 액터의 path를 찾는 방법을 보여준다. 이 예제의 목적을 위해 Tell Pattern 튜토리얼의 코드 snippets을 재사용한다. 액터 path에 대한 추가 정보는 액터 경로 및 주소 지정에 대한 공식 Akka 설명서에서 찾을 수 있다.

이제 당신은 아래와 같은 액터 시스템 생성에 익숙할 것이다.

 

println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")

다음으로 액터와의 메시지 전달을 위한 간단한 프로토콜을 정의한다.

println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
  case class Info(name: String)
}

이번에는 도넛 인포 액터를 akka.actor trait을 확장해서 생성 해 볼 것이다.

println("\nStep 3: Define DonutInfoActor")
class DonutInfoActor extends Actor with ActorLogging {

  def receive = {
    case Info(name) =>
      log.info(s"Found $name donut")
  }
}

Step 3에서 정의한 도넛 인포 액터로 우리는 액터 시스템 안에서 actorOf() 메소드를 통해 액터를 생성 할 것이다.

println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")

Tell Pattern 예제에서는 bang 연산자 !를 통해 DonutInfoActor에 대한 reference를 사용하여 Info 메시지를 보냈다.

println("\nStep 5: Akka Tell Pattern")
import DonutStoreProtocol._
donutInfoActor ! Info("vanilla")

Actor Hierarchy에 대한 이전 튜토리얼에서, 액터 시스템에서 생성 한 액터가 / user / 계층 아래에 ​​있다고 했다. 따라서 / user / DonutInfoActor 경로로 DonutInfoActor를 찾을 수 있어야 한다. ActorRef와 유사하게 bang (!) 연산자를 사용할 수 있다. Tell Pattern을 사용하여 메시지를 보낸다.

println("\nStep 6: Find Actor using actorSelection() method")
system.actorSelection("/user/DonutInfoActor") ! Info("chocolate")

actorSelection을 사용해서 보낼 수 있다.

 

[INFO] [06/26/2018 20:09:54.763] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found chocolate donut

 

이런 결과가 나온다.

 

다른 예시로, actorSelection() method 를 액터 시스템의 액터들에게 와일드 카드를 이용해서 한번에 메시지를 보낼 수도 (해당 되는 친구가 하나면 하나에게만 보냄) 있다. /user/* 이렇게.

system.actorSelection("/user/*") ! Info("vanilla and chocolate")

그러면 이런 결과가 나온다.

[INFO] [06/26/2018 20:09:54.763] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla and chocolate donut

 

 

아마도 system.actorOf() 메소드를 사용하여 Actor를 만들 때 지금까지 본 ActorRef를 단순히 사용하지 않는 이유를 물을 수 있다. Akka가 제공하는 또 다른 기능은 액터 시스템이 JVM 내에서 로컬이거나 JVM에서 remotely 호스팅 될 수 있다는 것이다. 액터간에 message passing은 최종 사용자에게 투명하며 Akka는 기본 전송을 처리한다. 다음 튜토리얼에서는 remote 액터의 예를 볼 수 있다.

 

마지막으로 terminate()를 사용해 종료하는걸 잊지 말자.

println("\nStep 7: close the actor system")
val isTerminated = system.terminate()

 

 

 

 

Child Actors

 

이 튜토리얼에서는 액터 계층과 경로를 더 잘 이해하기위한 여정을 계속할 것이다. 보다 정확하게는 우리는 child actor에 대해 소개할 것이다. 즉, 액터 내에서 스폰(spawn. 알 낳는 그 스폰. 리스폰할때 그 스폰) (및 잠재적으로 관리 및 감독되는 액터) 되는 하위 액터를 소개한다. 앞으로 다가오는 튜토리얼에서 Akka Actor supervision strategies에 대해 논의 할 것이므로 지금은 supervision(감독)에 집중하지 않겠다.

일반적인 첫 단계로 DonutStoreActorSystem이라는 액터 시스템을 만든다.

println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")

그런 다음 액터와 메시지 전달을위한 프로토콜을 정의한다.

println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
  case class Info(name: String)
}

이 단계에서는 일반적인 akka.actor trait을 확장하여 BakingActor라는 새 액터를 만든다. 이 액터는 아래 DonutInfoActor의 하위 액터(child actor)가 된다.

println("\nStep 3: Define a BakingActor and a DonutInfoActor")
class BakingActor extends Actor with ActorLogging {

  def receive = {
    case Info(name) =>
      log.info(s"BakingActor baking $name donut")
  }
}

DonutInfoActor 생성은 이전 예제와 유사하다. 즉, akka.actor trait을 확장한다. 그러나 액터 context 값을 사용해야하고 BakingActor를 생성하기 위해 actorOf() 메소드를 사용할 것이다. 액터 context value을 사용하면 BakingActor가 DonutInfoActor의 child actor가 된다.

 

hierarchical 관점에서 DonutInfoActor가 / user 계층 : / user / DonutInfoActor에 있다고 생각 할 수 있다. 이것은 자식 BakingActor의 계층과 경로가 / user / DonutInfoActor / BakingActor라는 것을 의미한다. 이 예제를 단순하게 유지하기 위해 DonutInfoActor의 receive 메소드 내에서 동일한 Info 메시지를 BakingActor로 전달(forward)한다.

 

(저 msg @ Info(name)의 at 기호에 대한 글 https://stackoverflow.com/questions/2359014/scala-operator case 에서 해당 값을 추출하기 위해 사용한다고 합니다 ㅎㅎ)

class DonutInfoActor extends Actor with ActorLogging {

val bakingActor = context.actorOf(Props[BakingActor], name = "BakingActor")

  def receive = {
    case msg @ Info(name) =>
      log.info(s"Found $name donut")
      bakingActor forward msg
  }
}

자식 액터 BakingActor의 생성을 내부적으로 담당하므로 DonutInfoActor 만 생성하면 된다.

println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")

Tell Pattern (fire-and-forget 메시지 전달 스타일)을 사용하여 DonutInfoActor에 Info 메시지를 보내면 안쪽의 자식 액터인  BakingActor에게 메시지를 전달(forward)한다.

println("\nStep 5: Akka Tell Pattern")
import DonutStoreProtocol._
donutInfoActor ! Info("vanilla")

Thread.sleep(3000)

마지막으로 terminate 하자.

println("\nStep 6: close the actor system")
val isTerminated = system.terminate()

 

아래와 같은 결과가 나와야 정상이다.

 

[INFO] [06/29/2018 20:46:10.626] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla donut [INFO] [06/29/2018 20:46:10.627] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] BakingActor baking vanilla donut

 

 

 

 

Actor Lifecycle

 

지금까지는 액터 생성, 액터 계층 및 경로 이해 및 하위 액터 생성에 대한 다양한 예를 살펴 보았다. 이제 Actor Lifecycles를 소개하는 것이 좋을 것 같다. 액터 시스템 내의 모든 Akka 액터는 수명주기를 따른다. 라이프 사이클은 생성에서 삭제까지 액터의 주요 이벤트를 나타낸다. 이를 위해 액터가 언제 시작되는지 아는 것이 어디서 데이터베이스 세션에 대한 연결을 열어야하는지 등의 관점에서 실제 응용 프로그램에서 중요 할 수 있다. 마찬가지로 액터가 중지되거나 충돌 한 경우 데이터베이스 세션을 닫아야 한다.

 

공식 Akka 문서의 아래 다이어그램은 액터 수명주기에 대한 시각적 인 설명을 제공한다.

 

이전 튜토리얼과 비슷한 DonutStoreActorSystem을 만들자.

println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")

액터와의 메시지 전달 프로토콜을 정의한다.

println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
  case class Info(name: String)
}

BackingActor에서 액터 라이프 사이클 이벤트, 즉 preStart (), postStop (), preRestart () 및 postRestart () 메서드를 재정의 한다. (시작 전, 멈춘 후, 재시작 전, 재시작 후에 각각 호출되는 메서드이다)

println("\nStep 3: Define a BakingActor and a DonutInfoActor")
class BakingActor extends Actor with ActorLogging {

  override def preStart(): Unit = log.info("prestart")

  override def postStop(): Unit = log.info("postStop")

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = log.info("preRestart")

  override def postRestart(reason: Throwable): Unit = log.info("postRestart")

  def receive = {
    case Info(name) =>
      log.info(s"BakingActor baking $name donut")
  }
}

BackingActor와 마찬가지로 액터 라이프 사이클 이벤트를 override하고 액터의 수명 동안 어떤 이벤트가 트리거되는지 알기 위해 메시지를 간단히 기록한다.

class DonutInfoActor extends Actor with ActorLogging {

  override def preStart(): Unit = log.info("prestart")

  override def postStop(): Unit = log.info("postStop")

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = log.info("preRestart")

  override def postRestart(reason: Throwable): Unit = log.info("postRestart")

  val bakingActor = context.actorOf(Props[BakingActor], name = "BakingActor")

  def receive = {
    case msg @ Info(name) =>
      log.info(s"Found $name donut")
      bakingActor forward msg
  }
}

system.actorOf() 메서드를 사용하여 DonutInfoActor를 만든다.

println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")

Tell Pattern을 사용하여 DonutInfoActor에 Info 메시지를 보낸다.

println("\nStep 5: Akka Tell Pattern")
import DonutStoreProtocol._
donutInfoActor ! Info("vanilla")

Thread.sleep(5000)

마지막으로 terminate() 메소드를 호출하여 액터 시스템을 닫는다.

println("\nStep 6: close the actor system")
val isTerminated = system.terminate()

이제 다음과 같은 아웃풋을 얻는다.

 

[INFO] [06/29/2018 21:26:19.880] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] prestart

[INFO] [06/29/2018 21:26:19.880] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor] prestart

[INFO] [06/29/2018 21:26:19.882] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla donut

[INFO] [06/29/2018 21:26:19.883] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] BakingActor baking vanilla donut

[INFO] [06/29/2018 21:26:24.885] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] postStop

[INFO] [06/29/2018 21:26:24.886] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutInfoActor] postStop

 

 

 

 

 

 

Actor PoisonPill

 

이전의 액터 라이프 사이클 튜토리얼에 대해 계속 알아보자. 마지막 예제에서 preStart () 또는 postStop () 메서드와 같은 액터 이벤트를 재정의하는 방법을 보여주었다. 이 튜토리얼에서는 액터를 종료하거나 중지하기 위해 보내는 특별한 메시지 인 akka.actor.PoisonPill을 사용하는 방법을 보여준다. 행위자가 PoisonPill 메시지를 받으면 로그 메시지에서 actor 중지 이벤트가 트리거되는 것을 볼 수 있다.

이전 튜토리얼과 비슷한 DonutStoreActorSystem을 만든다.

println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")

다음으로 액터와 메시지 전달을위한 프로토콜을 정의한다.

println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
  case class Info(name: String)
}

BackingActor에서 액터 라이프 사이클 이벤트, 즉 preStart (), postStop (), preRestart () postRestart () 메서드를 재정의 한다.

println("\nStep 3: Define a BakingActor and a DonutInfoActor")
class BakingActor extends Actor with ActorLogging {

  override def preStart(): Unit = log.info("prestart")

  override def postStop(): Unit = log.info("postStop")

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = log.info("preRestart")

  override def postRestart(reason: Throwable): Unit = log.info("postRestart")

  def receive = {
    case Info(name) =>
      log.info(s"BakingActor baking $name donut")
  }
}

BackingActor와 마찬가지로 액터 라이프 사이클 이벤트를 오버라이드 하고 액터의 수명 동안 어떤 이벤트가 트리거되는지 알기 위해 메시지를 간단히 기록한다.

class DonutInfoActor extends Actor with ActorLogging {

  override def preStart(): Unit = log.info("prestart")

  override def postStop(): Unit = log.info("postStop")

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = log.info("preRestart")

  override def postRestart(reason: Throwable): Unit = log.info("postRestart")

  val bakingActor = context.actorOf(Props[BakingActor], name = "BakingActor")

  def receive = {
    case msg @ Info(name) =>
      log.info(s"Found $name donut")
      bakingActor forward msg
  }
}

system.actorOf() 메서드를 사용하여 DonutInfoActor를 만든다.

println("\nStep 4: Create DonutInfoActor")
val donutInfoActor = system.actorOf(Props[DonutInfoActor], name = "DonutInfoActor")

Tell Pattern을 사용하여 Info 메시지를 DonutInfoActor에 보낸 다음 PoisonPill 메시지를 액터에 보낸다. PosionPill 메시지에 이어 액터에게 다른 Info 메시지를 보내려고 할 것이다.

println("\nStep 5: Akka Tell Pattern")
import DonutStoreProtocol._
donutInfoActor ! Info("vanilla")

이어서 PoisonPill 메시지를 통해 도넛 인포 액터를 죽이자.

donutInfoActor ! PoisonPill

donutInfoActor ! Info("plain")

Thread.sleep(5000)

액터 시스템 내에 DonutInfoActor가 더 이상 존재하지 않기 때문에 Info 메시지가 소비되지 않는다. 대신 액터 시스템 내에서 전달할 수없는 메시지의 placeholder인 별도의 dead-letters hierarchy에 로그가 찍히게 된다. Akka dead-letters에 대한 추가 정보는 공식 Akka 설명서에서 찾아봐라.

마지막으로 terminate () 메소드를 호출하여 액터 시스템을 닫는다.

println("\nStep 6: close the actor system")
val isTerminated = system.terminate()

결과는 아래와 같다.

[INFO] [07/03/2018 20:29:44.475] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] prestart

[INFO] [07/03/2018 20:29:44.475] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutInfoActor] prestart

[INFO] [07/03/2018 20:29:44.477] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutInfoActor] Found vanilla donut

[INFO] [07/03/2018 20:29:44.477] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] BakingActor baking vanilla donut

[INFO] [07/03/2018 20:29:44.477] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutInfoActor/BakingActor] postStop

[INFO] [07/03/2018 20:29:44.477] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutInfoActor] postStop

[INFO] [07/03/2018 20:29:44.477] [DonutStoreActorSystem-akka.actor.default-dispatcher-5] [akka://DonutStoreActorSystem/user/DonutInfoActor] Message [com.nb.actors.Tutorial_10_Kill_Actor_Using_PoisonPill$DonutStoreProtocol$Info] without sender to Actor[akka://DonutStoreActorSystem/user/DonutInfoActor#342625185] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

 

 

 

 

 

 

Error Kernel Supervision

 

이 튜토리얼은 Actor PoisonPill을 사용하여 액터 계층, 액터 라이프 사이클 및 액터 중지에 대한 이전 튜토리얼에 대한 좋은 공부가 될 것이다. 전체 시스템 충돌과 달리 오류를 격리하고 지역화해야하는 오류 커널 패턴을 소개한다. Akka 공식 문서에서 Error Kernel Approach에 대한 추가 세부 사항을 찾을 수 있다.

 

다시 말하지만, Akka Actors는 계층 구조를 형성하며,이 기능을 사용하여 Actor가 하위 액터의 실패에 대해 감독하고 대응하도록 한다. 이전의 DonutStockActor 예제를 확장하고 DonutStockWorkerActor라는 하위 액터에게 작업을 전달한다. 또한 DonutStockWorkerActor는 DonutStockActor의 하위 액터이므로 DonutStockActor가 감독한다.
이것은 DonutStoreActorSystem이라는 액터 시스템을 생성하는 initial step이다.

println("Step 1: Create an actor system")
val system = ActorSystem("DonutStoreActorSystem")

우리는 DonutStoreProtocol을 확장하고 이전 Info() 케이스 클래스와 함께 CheckStock() case class를 추가한다. 이 클래스는 Actors와의 메시지 전달에 사용된다. 또한 Error Kernel Approach에 따라 오류를 격리하는 방법을 보여주는 custom exception (WorkerFailedException)를 만든다.

println("\nStep 2: Define the message passing protocol for our DonutStoreActor")
object DonutStoreProtocol {
    case class Info(name: String)

   case class CheckStock(name: String)

   case class WorkerFailedException(error: String) extends Exception(error)
}

DonutStockActor가 CheckStock 유형의 메시지를 수신하면 실제 메시지 처리를 위해 요청을 하위 Actor에게 전달한다. 자식 액터는 DonutStockWorkerActor type이며 setp 4 에서 정의한다. Error Kernel Approach에 따라 DonutStockActor는 자식 액터 DonutStockWorkerActor를 supervise한다. SupervisorStrategy를 제공함으로써 그렇게 행동한다. 이 간단한 예제에서 액터는 WorkerFailedException type의 예외에 react하고 하위 액터 DonutStockWorkerActor를 restart하려고 시도한다. 다른 모든 예외의 경우 DonutStockActor가 해당 예외를 처리 할 수 ​​없다고 가정하여 해당 예외를 액터 계층 구조로 escalate(올려보낸다는뜻?)한다.

(여기에서 supervisorStrategy 라는 미리 정의된 함수를 오버라이딩 하고, Restart, Escalate는 이때 사용할 수 있는 만들어져 있는 함수이다. OneForOneStrategy 는.. 따로 찾아봐야 할 것 같다. 느낌상 걍 각각 적용되는 Strategy라는 뜻 인것 같다.)

println("\nStep 3: Create DonutStockActor")
class DonutStockActor extends Actor with ActorLogging {

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 seconds) {
      case _: WorkerFailedException =>
        log.error("Worker failed exception, will restart.")
        Restart

      case _: Exception =>
        log.error("Worker failed, will need to escalate up the hierarchy")
        Escalate
    }

  val workerActor = context.actorOf(Props[DonutStockWorkerActor], name = "DonutStockWorkerActor")

  def receive = {
    case checkStock @ CheckStock(name) =>
      log.info(s"Checking stock for $name donut")
      workerActor forward checkStock
  }
}

액터 라이프 사이클을 시작하고 DonutStockWorkerActor가 언제 다시 시작되는지 알기 위해 postRestart() 메서드를 재정의한다. DonutStockWorkerActor가 CheckStock 유형의 메시지를 수신하면 내부적으로 작업을 findStock() 메소드에 위임(delegate)한다. 이 예제를 위해 CheckStock 메시지를 처리 ​​한 후 DonutStockWorkerActor는 context.stop(self)를 호출하여 다른 메시지 처리를 중지한다.  (아얘 삭제와는 다르다.)

findStock () 메소드 내에서 조작이 특정 도넛의 재고 수량을 찾기 위해 데이터베이스와 같은 외부 자원 액세스를 시뮬레이션한다고 가정 할 수 있다. 이 예에서는 임의의 Int 값 100을 반환한다.

println("\nStep 4: Worker Actor called DonutStockWorkerActor")
class DonutStockWorkerActor extends Actor with ActorLogging {

  @throws[Exception](classOf[Exception])
  override def postRestart(reason: Throwable): Unit = {
    log.info(s"restarting ${self.path.name} because of $reason")
  }

  def receive = {
    case CheckStock(name) =>
      findStock(name)
      context.stop(self)
  }

  def findStock(name: String): Int = {
    log.info(s"Finding stock for donut = $name")
    100
// throw new IllegalStateException("boom") // Will Escalate the exception up the hierarchy
// throw new WorkerFailedException("boom") // Will Restart DonutStockWorkerActor
  }
}

도넛스톡액터를 앞선 예제와 비슷하게 만들자.

println("\nStep 5: Define DonutStockActor")
val donutStockActor = system.actorOf(Props[DonutStockActor], name = "DonutStockActor")

 

우리는 CheckStock 메시지를 DonutStockActor로 Ask Pattern을 사용해서 보낼 수 있다.

println("\nStep 6: Akka Ask Pattern")
import DonutStoreProtocol._
import akka.pattern._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
implicit val timeout = Timeout(5 second)

val vanillaDonutStock: Future[Int] = (donutStockActor ? CheckStock("vanilla")).mapTo[Int]
for {
  found <- vanillaDonutStock
} yield (println(s"Vanilla donut stock = $found"))

Thread.sleep(5000)

마지막으로 역시 terminate() 하자.

println("\nStep 7: Close the actor system")
val isTerminated = system.terminate()

throw new IllegalStateException( "boom")의 주석 처리를 제거하거나 new WorkerFailedException( "boom")을 throw 하여 실패를 시뮬레이션 할 수 있다. step 3 의 SupervisorStrategy에 따라 DonutStockWorkerActors가 IllegalStateException을 발생시키는 경우, Supervisor Actor DonutStockActor는 예외를 Actor Hierarchy 위로 escalate 한다. 프로그램을 실행할 때 에스컬레이션을 보여주는 다음 statements이 표시된다.

 

[INFO] [05/15/2018 20:35:47.163] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut

[INFO] [05/15/2018 20:35:47.163] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerActor] Finding stock for donut = vanilla [ERROR] [05/15/2018 20:35:47.163] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] Worker failed, will need to escalate up the hierarchy

[ERROR] [05/15/2018 20:35:47.178] [DonutStoreActorSystem-akka.actor.default-dispatcher-2] [akka://DonutStoreActorSystem/user/DonutStockActor] boom

 

WorkerFailedException 유형의 예외의 경우 SupervisorStrategy가 다음과 같이 하위 액터 DonutStockWorkerActor를 다시 시작하려고 한다.

 

[INFO] [05/15/2018 20:37:13.503] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor] Checking stock for vanilla donut
[INFO] [05/15/2018 20:37:13.503] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerActor] Finding stock for donut = vanilla
[ERROR] [05/15/2018 20:37:13.519] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutStockActor] Worker failed exception, will restart.
[ERROR] [05/15/2018 20:37:13.519] [DonutStoreActorSystem-akka.actor.default-dispatcher-4] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerActor] boom
[INFO] [05/15/2018 20:37:13.519] [DonutStoreActorSystem-akka.actor.default-dispatcher-3] [akka://DonutStoreActorSystem/user/DonutStockActor/DonutStockWorkerActor] restarting DonutStockWorkerActor because of com.nb.actors.Tutorial_11_ErrorKernel$DonutStoreProtocol$WorkerFailedException: boom

 

 

여기까지가 일단 6가지의 튜토리얼 중

AKKA Actors에 대한 튜토리얼이다.

고생했다. (내가)

 

이어서 다음엔 AKKA Routers에 대해서 알아보도록 한다.

 

 

 

다음글은

2019/12/02 - [Programmer Jinyo/Scala & AKKA] - AKKA(아카)with Scala 튜토리얼 03 (Routers)