forcemax's

Reference

1. Storm-starter : https://github.com/apache/storm/tree/master/examples/storm-starter

2. Storm : https://github.com/apache/storm

3. Implementing Real-Time Trending Topics With a Distributed Rolling Count Algorithm in Storm : http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/


소개

회사 업무로 Apache Storm(이하 Storm)과 관련된 프로젝트를 완료하고 휴식중에, Storm 기초를 설명할 수 있을만한 예제를 만들어보기 위해서 Storm Starter를 참조하여 간단한 프로젝트를 만들었다.

이 프로젝트는 Twitter Sample Public Status API(https://dev.twitter.com/streaming/reference/get/statuses/sample)를 사용하여 Twitter realtime stream data의 일부를 Input으로 하고, HashTag 정보를 추출한 후 일정 시간 간격(emit frequency)으로 일정 시간 동안(window length)의 일정 갯수(TOP_N)의 Top HashTag를 생성하여 출력하는 프로젝트이다.

Storm-starter project에서 많은 소스코드를 가져 왔으며 Twitter Library는 Twitter4J를 사용한다.

Project source : https://github.com/forcemax/storm_twitter_hashtag


실행하기

0. Prerequisites

Java 1.7 이상, Storm 0.9.5, Maven, Git

(Twitter API를 사용하기 위한 consumerKey, consumerSecret, accessToken, accessTokenSecret을 변경하지 않으면 실행이 안된다.)

1. 소스 가져오기

git clone https://github.com/forcemax/storm_twitter_hashtag

2. 소스 빌드하기

$ mvn clean package

3. Storm Cluster에 Topology submit

$ storm jar StormTwitterHashtag-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.embian.forcemax.twitter.StormTwitterHashtagTopologyRunner server

4. Storm UI에서 확인하기

그림. topology 목록

그림. topology 상세 정보


설명

- Rolling Count Algorithm은 Reference 3번 사이트를 참조

- Topology는 다음과 같이 구성되어 있다. Twitter API를 사용하기 위해서는 Twitter에 App 등록을 해야하며, App 등록을 하면 consumerKey, consumerSecret, accessToken, accessTokenSecret 값을 얻을 수 있다. 다음 코드에 해당 값을 넣어서 사용한다.

그림. Topology 구성도

- TwitterSpout은 Twitter4J Library를 사용하며, LinkedBlockingQueue를 사용해서 새로운 Public Status가 있을때 Queue에 저장한다. nextTuple() 호출시에 Queue에서 꺼내서 ExtractHashTagBolt에 넘긴다.

- ExtractHashTagBolt는 받은 Public Status에서 HashTag만 뽑아내서, RollingCountBolt로 넘긴다. HashTag의 갯수 만큼 emit이 발생한다.

- RollingCountBolt는 생성할 때 인자로 받은 window length와 emit frequency 값을 바탕으로, emit frequency마다 window length에 속하는 데이터에서 word별 count를 계산해서 IntermediateRankingsBolt로 넘긴다. 이때, emit frequency마다 emit을 하기 위해서 TickTuple을 사용하는데, TickTuple은 Storm 0.8에 새로 들어간 기능이며 Component(Spout, Bolt)내에서 일정 주기 마다 Tuple을 발생시키는 기능이다.

IntermediateRankingsBolt와 TotalRankingsBolt는 생성할때 인자로 topN, emit frequency를 받으며, 입력된 word별 count를 바탕으로 상위 topN개의 word와 count를 뽑아내고 emit frequency 마다 emit한다. IntermediateRankingsBolt는 parallelism hint를 크게 주어 map-reduce 구조에서 map의 역할을 하고, TotalRankingsBolt는 parallelism hint를 1로 주고 reduce의 역할을 한다. emit frequency마다 emit을 하기 위해서 RollingCountBolt와 마찬가지로 TickTuple을 사용한다.

- 마지막으로 PrinterBolt는 TotalRankingsBolt에서 emit한 Tuple을 출력하기 위해서 사용하며 특별한 기능은 없다.


이 프로젝트는 Storm의 Spout(nextTuple과 open), Bolt(execute, prepare)와 Topology wiring에 대한 이해만 있다면 코드를 보는데 아무 무리가 없을 정도로 간단한 예제이다. 그러나 외부 서비스(Twitter)과의 연계를 통한 Spout 구성, Atomic하게 역할을 분리한 Bolt, parallelism hint를 조절하여 성능을 향상시키는 방법을 확인해 보기에 알맞은 예제이다.

Twitter Sample Public Status API는 10분에 20000 statuses 정도의 데이터밖에 제공하지 않으므로, 한대의 Storm에서 처리하기에 충분하다.

초보자가 알아보기 쉽게 코드가 구성되어 있으니, Storm을 이용하여 realtime CEP 엔진을 공부하려는 분들에게 많은 도움이 되었으면 한다.



Reference : https://storm.apache.org/documentation/Fault-tolerance.html


Apache Storm(이하 Storm) Cluster를 Production 단계에서 사용하기 위해 고려하다보면 Nimbus의 SPoF(Single Point of Failure) 여부에 대해서 고민하지 않을 수 없다. 자세하게 Storm Cluster의 Fault Tolerance에 대해서 살펴보면 다음과 같다.


1. Worker가 Down되면?

Worker는 Supervisor에 의해서 관리되며 Worker가 Down될 경우 Supervisor가 Worker를 재기동(restart)한다. 그러나 Worker가 기동되는 단계에서 지속적으로 문제가 발생해서 기동에 실패하면, Nimbus에 의해서 다른 Supervisor에게 넘긴다.


2. Node가 Down되면?

여기서 Node는 장비(Machine)을 칭하며, Supervisor가 구동중인 장비로 규정한다. Node가 Down될 경우 Nimbus가 이를 감지하고 해당 Supervisor에서 동작된던 task를 다른 Supervisor로 넘긴다. 이 경우 다른 Supervisor에서 Worker가 새로 기동될 수도 있고, 현재 기동되어 있는 Worker에 task를 할당하기도 한다.


3. Nimbus 또는 Supervisor가 Down되면?

Nimbus와 Supervisor는 Storm에서 제공하는 문서에 의하면 Fail-Fast & Stateless 디자인이다. monit이나 daemontools와 같은 툴을 사용해서 process가 종료되면 자동으로 재기동 되도록 구성하는 것이 좋다. 좋은 정도가 아니라 꼭 이렇게 하라고 한다. 상태 정보가 Zookeeper 또는 디스크에 저장되어 있으므로 재기동시에 이를 읽어들여 기존 상태와 같이 구성한다.


4. Nimbus는 SPoF인가?

Nimbus node가 Down되더라도 다른 Node에서 동작중이던 Worker는 영향이 없다. 만약 다른 Node에서 동작중이던 Worker가 Down되더라도 해당 Node에서 동작중인 Supervisor에 의해서 재기동된다. 그러나 Worker가 재기동될 때 지속적으로 문제가 발생해서 기동에 실패한다면, 해당 Worker는 Down상태로 남는다.

그러므로 "Nimbus는 SPoF인가?"라는 물음에 대답은 "Nimbus는 일종의 SPoF로 볼 수 있다."이다. 그러나, 실제로 Nimbus node가 Down되어 있는 중에 비극적인 일(Worker가 막 Down된다던지...)만 발생하지 않는다면 큰 문제는 아니다. 향후 Nimbus의 HA를 지원할 계획은 갖고 있다.


결과적으로,

Nimbus Node(Nimbus process가 아니라)가 Down 되더라도 다른 Node(Supervisor Node)들은 정상적으로 Topology를 실행하고 있는 상태이다. 그러나, 정상적인 상태로 복구하기 위해서는 Nimbus Node를 복구해야한다.

여기서 Nimbus Node를 복구하는 방법에 두가지 경우가 있을 수 있다.

1. Nimbus Node가 상태 정보 데이터(in Disk)를 가지고 복구되는 경우 : Nimbus가 상태 정보를 디스크에서 읽어 들일 수 있으므로, 기존 상태 정보를 가지고 기동된다.

2. Nimbus Node에 상태 정보 데이터(in Disk)를 가지고 복구 되지 않는 경우 : 기존 정보가 없으므로 Storm Cluster가 새로 기동되는 상태이다. 이 경우 기존 동작중인 Supervisor가 새로운 Nimbus에 연결되면서 기존 Worker를 제거한다. 그러므로, Topology를 다시 Submit해야 한다.


PS. Storm 1.0.0 부터는 HA Nimbus가 지원됩니다~~