2014年に某サイトで公開したテックブログですが、こちらに引っ越しました。
Actorモデルの魅力を感じていただければと思います。
概要
Actorモデルを実装した Akka Actor を Java で利用しました。その際に気づいた事や使い方などをまとめます。
Akka を Java から使っている情報が少なく苦労したので書き記しておきます。
(公式ページ)https://akka.io/
結論
複雑なマルチスレッドの概念をシンプルに変えるアクターモデルの魅力を強く感じました。
新しいフレームワークを学びながら実装することでモチベーション高く開発でき、今後も機会がある度に使おうと思っています。
何が良かったか
マルチスレッド処理が簡単に実現できる!!!
誤解を恐れずに言い切ってしまうと Akka とはノンブロッキングなマルチスレッドを簡単に記述出来るフレームワークです。
並行処理を クラスター(ActorSystem)と Singletion なスレッド(Actor)で実現しています。
各 Actor は
・Singleton なスレッドで動作する
・ActorSystem 内で各 Actor インスタンスを名前で管理されていて名前を指定するだけで Actor (Singleton スレッド)を作る or 呼び出す事が出来る
・FIFO な Queue を持っていて依頼された順番に処理出来る(QueueはFIFO以外に自由に設定可能)
・メッセージにより他Actorへ処理を移譲出来る
様々な router が便利
Actor 間の連携は Java Object のインスタンスや文字列をメッセージとしてやりとり出来ます。
メッセージには、Post するような投げっぱなしのメッセージと Send するような返事をもらうメッセージがあります。
メッセージの送信パターンは
・宛先指定のメッセージ配信
・ラウンドロビンによるメッセージ配信
・ブロードキャストによる複数Actorへのメッセージ配信
(公式ページ)http://doc.akka.io/docs/akka/2.4.1/java/routing.html
Let it crash
失敗したら捨ててしまってやり直そうを簡単に実現しています。
イミュータブルインフラストラクチャと似ている思想です。
(公式ページ)http://doc.akka.io/docs/akka/2.4.1/java/fault-tolerance.html
各 Actor にはエラー発生を検知する SuperVisior と Exception 発生時にどのように振る舞うか Strategy を定義できます。
例えば
・リトライ可能なエラーが発生したら再度 Actor インスタンスを作りなおしてメッセージを再送して処理を行う
・リトライ不能なエラーが発生したら再実行を行わずログに出力して終了させる
など可能です。
(公式ページ)http://doc.akka.io/docs/akka/2.4.1/general/supervision.html
Local および Remote Cluster を構築できる
ActorSystem という Cluster によって、 Local および Remote ホストで Actor 処理を実現できます。
処理をスケールさせたければサーバを追加して Actor によるメッセージのやりとりで処理を分散できます。
(公式ページ)http://doc.akka.io/docs/akka/2.4.1/java/remoting.html
Java エンジニアが Scala を学ぶきっかけになる(のでは)!!
Java や Scala は IDE を使って開発する事が多いと思いますが、
Eclipse では F3 キー、IntelliJ Idea では Command + B(当方Macで開発) で定義へジャンプしていると度々 Scala で実装されたソースコードに出会う事になると思います。
見ているうちに Scala への抵抗が薄れ重い腰を上げることに繋がりやすいと思います。
Akka 以外にも Play Framework2 や Spark を通して Scala を覚えるというのもお勧めです。
漠然と 新しいプログラミング言語を学ぶよりも生きたソースコードに触れるのが習得への近道ではないでしょうか。
デメリット
日本語の情報が少ない
一年前に比べるとぐっと増えてきましたがまだまだ日本語の情報が少ないです。
英語の公式サイトの情報が圧倒的なのであきらめて公式サイトを読み込みましょう。
他?
(Akka に関係ない他の事で多々苦労をしましたが・・・)
開発に使った時にバグに遭遇することもなく、学習コスト以外はとくに苦労しなかったので特に浮かばないです。
コード例
コーディング例です。
(例)EntryPoint -> Controller -> FirstActor -> SecondActor と処理を依頼
実際に業務で開発した非同期なDequeue処理をシンプル化したコードです
こちらにCodeを登録しておきました。
https://github.com/ishiyayu/Akka-Java
・EntryPoint
public class Entry {
public static void main(String[] args) {
AkkaController controller = new AkkaController();
try {
controller.execute();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
System.exit(0);
}
}
・Akka を呼び出す Controller クラス
import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class AkkaController {
private ActorSystem actorSystem;
private volatile boolean stopFlg = false;
public void execute() throws Exception {
// ActorSystem起動(パラメータはActorSystemにつける名前です)
this.actorSystem = ActorSystem.create("ActorSystem-name");
// ActorSystem起動
shutdownHook(actorSystem);
ActorRef actor = null;
while (true) {
if (stopFlg) {
break;
}
// Actor の作成(パラメータはActorにつける名前です)
actor = createActor(FirstActor.class, "actor-name");
// Actor へメッセージを送り処理を依頼する
String retMsg = null;
try {
// Actor への処理の依頼
// Actor から返信を待つ ask で msg 送り future を受取る
// Await.result で future の値を待つ
retMsg = (String) Await.result(
Patterns.ask(
actor
, "メッセージ内容"
, 30L * 1000L)
, Duration.create(
60L * 1000L
, TimeUnit.MILLISECONDS)
);
} catch (TimeoutException e) {
// エラー処理
}
}
}
/**
* Actorの作成
* @param clazz
* @param actorName
* @return
*/
@SuppressWarnings("deprecation")
protected ActorRef createActor(Class<? extends Actor> clazz, String actorName) {
// Akkaインスタンスがあれば返却
ActorRef actor = actorSystem.actorFor("/user/" + actorName);
if (actor != null && !actor.isTerminated()) {
return actor;
}
// インスタンスがないので新規作成
return actorSystem.actorOf(Props.create(clazz), actorName);
}
/**
* JVM終了時の処理
* @param actorSystem
*/
public void shutdownHook(ActorSystem actorSystem) {
// JVM 終了時に ActorSystem を shutdown する
Thread t = new Thread(() -> {
this.stopFlg = true;
actorSystem.shutdown();
System.out.println("shutdownHook!!");
actorSystem.awaitTermination();}
);
Runtime.getRuntime().addShutdownHook(t);
}
}
・FirstActor
import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
public class FirstActor extends UntypedActor {
/**
* インスタンス化される際に1度のみ実行される(costructor)
*/
@Override
public void preStart() {
// 何か初期化処理
}
/**
* インスタンスが破棄される際に1度のみ実行される(destructor)
*/
@Override
public void postStop() {
// 何か終了処理
}
/**
* EntryPoint
* msgを受信した際にここがコールされる
* @param message
* @throws Exception
*/
@Override
public void onReceive(Object message) throws Exception {
try {
// 想定外のmsgを受け取っていないかチェック
// Objectを送信するならば instanceof でチェックなどで validation
if (message == null || !(message instanceof String)) {
// messsage の破棄
unhandled(message);
return;
}
// 何かビジネス処理
// SecondActorへ処理を依頼
ActorRef secondActor = createActor(SecondActor.class, "SendActor-Name");
// 返信不要の tell() でメッセージ送信
secondActor.tell("メッセージ", null);
// 呼び元に終了メッセージ返して1件のJob終了
sender().tell("DONE", getSelf());
} catch (Exception e) {
// error処理
// 呼び元にメッセージ返して1件のJob終了
sender().tell("ERROR", getSelf());
// Actorのエラーを検知出来るようにエラーを投げる
throw e;
}
}
/**
* SecondActorインスタンスの取得
* @param clazz
* @param actorName
* @return
*/
protected ActorRef createActor(Class<? extends Actor> clazz, String actorName) {
// 既存actorあれば返却
// pathはactorSystmNameから下を指定する(akka://ActorSystem-name/user/xxx)
// 現在は actorFor() は Deprecation になっているので actorSelection() を使ってください。。
ActorRef actor = getContext().actorFor("/user/" + actorName);
if (actor != null && !actor.isTerminated()) {
return actor;
}
// 指定のActorがないので Actor インスタンスを新規作成
return getContext().system().actorOf(Props.create(clazz), actorName);
}
}
・SecondActor
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
public class SecondActor extends UntypedActor{
/**
* インスタンス化される際に1度のみ実行される(costructor)
*/
@Override
public void preStart() {
// Actor instance の自動終了も可能です
// 指定秒数Messageが届かなければReceiveTimeoutメッセージが届くように設定 -> instance終了させる
getContext().setReceiveTimeout(Duration.create(10 * 60, TimeUnit.SECONDS));
}
/**
* インスタンスが破棄される際に1度のみ実行される(destructor)
*/
@Override
public void postStop() {
}
/**
* エントリポイント
* @param message
* @throws Exception
*/
@Override
public void onReceive(Object message) throws Exception {
try {
// ReceiveTimeoutならば自分インスタンスを終了
if (message instanceof ReceiveTimeout) {
getContext().stop(getSelf());
unhandled(message);
return;
}
// 必要に応じてmsgチェック
// 何かビジネスロジック
System.out.println("hogehoge");
// 返信不要の tell() でメッセージを受け取ったので 呼び元へメッセージを返す必要はありません
} catch (Exception e) {
// 何かエラー処理
throw e;
}
}
}
コメント