본문 바로가기

Hadoop/빅데이터 파일럿 프로젝트

빅데이터 적재 - 실시간 로그 파일 적재 / 환경 구성(2)

 

  • 빅데이터 실시간 적재 개요
  • 빅데이터 실시간 적재에 활용되는 기술
  • 실시간 적재 파일럿 실행 1단계 - 적재 아키텍처
  • 실시간 적재 파일럿 실행 2단계 - 적재 환경 구성

         프로그램 설치 및 설정, 테스트

    • 01 HBase 설치
    • 02 Redis 설치
    • 03 Storm 설치
  • 실시간 적재 파일럿 실행 3단계 - 적재 기능 구현

         스톰의 Spout와 Bolt 프로그램 구현 단계

    • 01 - Kafk Spout 구현
    • 02 Split Bolt 구현
    • 03 HBase Bolt 구현
    • 04 Esper Bolt 구현
    • 05 Redis Bolt 구현
    • 06 Redis 클라이언트 구현
    • 07 HBase 테이블 생성
    • 08 Storm 토폴로지 배포
  • 실시간 적재 파일럿 실행 4단계 - 적재 기능 테스트
    • CM 각 서버 정상 확인
    • 스톰, 레디스 서비스 정상 확인
    • service redis_6379 status

어제 수업에서 Redis 를 Server02에 설치하고, 잘 설치가 되었는지 확인하였습니다.

오늘은 이어서 Storm 을 설치 및 설정(+테스트) 합니다.

 

하둡을 배우는 요즘은, 수업 시작 전에 아래와 같이 기본셋팅을 합니다

1) Virtual Box 를 켜고, 서버1과 서버2를 시작

2) MobaXterm 을 켜고, 세션 접속(서버1, 서버2, 서버2-1), 패스워드 입력

  • 서버2-1 : 서버2 세션을 하나 더 엽니다.

3) 크롬을 켜고, 클라우드 매니저 사이트로 들어가서 로그인


 

스톰 설치 - Server02

  • CM 포함 컴포넌트 아님
# cd cd /home/pilot-pjt/
# wget http://archive.apache.org/dist/storm/apache-storm-1.2.3/apache-storm-1.2.3.tar.gz
# tar -xvf apache-storm-1.2.3.tar.gz
# ln -s apache-storm-1.2.3 storm

환경 설정 파일 변경

# cd /home/pilot-pjt/storm/conf/
# vi storm.yaml
# 주키퍼 정보(주석 풀고 수정)
storm.zookeeper.servers:
     - "server02.hadoop.com"
# 스톰이 작동하는 데이터 저장소 (추가)
storm.local.dir: "/home/pilot-pjt/storm/data"
# 스톰 Nimbus 정보
nimbus.seeds: ["server02.hadoop.com"]
# Worker 포트(포트 갯수 만큼 멀티 Worker 만들어짐)
supervisor.slots.ports:     
   - 6700
# 스톰 UI 접속 포트
ui.port: 8087

스톰 로그 레벨 조정 : INOF => ERROR

  • 대규모 트랜잭션에 로그 성능저하 및 디스크 공간 부족
# cd /home/pilot-pjt/storm/log4j2/
# vi cluster.xml
# vi worker.xml
level="error" # info => error
level="ERROR" # INFO => ERROR

Path 설정

# vi /root/.bash_profile
PATH=$PATH:/home/pilot-pjt/storm/bin
# source /root/.bash_profile

- 교재에는 위와 같이 나와있는데, 저는 범용으로 설정하기 위해 아래 코드로 입력했습니다.

 

스톰서비스 자동 실행 스크립트 작성

storm-nimbus / storm-supervisor / storm-ui

위 파일3개  - 권한 변경 : 755

첫 번째 수 -  소유자 권한

두 번째 수 - 그룹 사용자 권한 

세 번째 수 - 기타 사용자 권한

 

755

=> 소유자만 모든 권한이 가능하고, 그 외 사용자는 '읽고, 쓰기'만 가능합니다.

# chmod 755 /etc/rc.d/init.d/storm-nimbus
# chmod 755 /etc/rc.d/init.d/storm-supervisor
# chmod 755 /etc/rc.d/init.d/storm-ui


로그 및 Pid 디렉토리 생성

# mkdir /var/log/storm
# mkdir /var/run/storm

 

자동 실행 명령 및 구동 확인(시간이 조금 걸림)

# service storm-nimbus start
# service storm-supervisor start
# service storm-ui start
# service storm-nimbus status
# service storm-supervisor status
# service storm-ui status

스톰 UI 접속

 

 


실시간 적재 파일럿 실행 3단계 - 적재 기능 구현

 

  • 스톰의 Spout와 Bolt 프로그램 구현 단계

 

스톰에서 중요한 것 3가지

1. Spout

2. Bolt (볼트끼리 연결이 가능 ex: 스톰->HBase로 가는 볼트 ) 

 - Java 로 구현해야 한다

3. 잘 적재되어 있는지 확인

토폴리지 안에 아래 그림의 Workers의 내용이 있는 것.

어떤 종류의 Spout를 만들지가 1차


StormTopology - SmartCarDriver

  • 실시간 적재 기능 구현 5개의 자바 컴포 넌트(java)
    • 카프카 out
    •  스플릿 out (과속 자동차 체크해서 레디스로 보내야 함)
    • 에스퍼(HBase or Memory / 둘 중 하나로)
  • 카프카 Spout => 스플릿 Bolt => HBase Bolt 
  • 카프카 Spout => 에스퍼 Bolt => 레디스 Bolt

 

카프카 Spout 기능 구현

  • 스톰 Spout의 기본 기능은 외부 시스템과의 연동을 통해 스톰의 Topology로 데이터 가져오기
    • Spout 와 비슷한 기능을 하는 것이 Java에서는 JDBC와 같음
    • 만들어주기만 하면 어디서든지 기능을 한다
    • Spout이 지원해기만 하면 데이터를 다 읽을 수 있다.
    • Topology 로 가져오기 위해 
  • 운행 정보 => 카프카에 적재된 데이터 => 스톰 Spout 이 읽어와서 Bolt 에 전달
  • 카프카에 적재된 데이터를 가져오기 위해 카프카- Spout 사용

 


<실습>

자동 실행 명령 및 구동 확인(시간이 조금 걸림)

# service storm-nimbus start
# service storm-supervisor start
# service storm-ui start
# service storm-nimbus status
# service storm-supervisor status
# service storm-ui status

 

정상적으로 실행이 되었다면, 위와 같은 화면을 확인할 수 있음

1차 목표는 Topology Summary 

 

SmartCarDriverTopology.java

// 카프카에 접속하기 위한 서버와 토픽 정보 정의
String zkHost = "server02.hadoop.com:2181";

// 스톰 Topology 생성
TopologyBuilder driverCarTopologyBuilder = new TopologyBuilder();

// Spout Bolt
BrokerHosts brkBost = new ZkHosts(zkHost);
String topicName = "SmartCar-Topic";
String zkPathName = "/SmartCar-Topic";

SpoutConfig spoutConf = new SpoutConfig(brkBost, topicName, zkPathName, UUID.randomUUID().toString());
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

// KafkaSpout 객체 생성
KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);

// KafkaSpout 객체를 Topology 에 설정
driverCarTopologyBuilder.setSpout("kafkaSpout", kafkaSpout, 1);

// Grouping - SplitBolt & EsperBolt
//  KafkaSpout에서 받은 데이터를 2개의 볼트(splitBolt/esperBolt)로 전달
driverCarTopologyBuilder.setBolt("splitBolt", new SplitBolt(),1).allGrouping("kafkaSpout");
driverCarTopologyBuilder.setBolt("esperBolt", new EsperBolt(),1).allGrouping("kafkaSpout");

 

SplitBolt

  • KafkaSpout => SplitBolt

SplitBolt.java

package com.wikibook.bigdata.smartcar.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class SplitBolt extends BaseBasicBolt{ 
// BaseBasicBolt 라는 볼트만들 때 필요한 객체를 상속받음

    private static final long serialVersionUID = 1L;
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {

        String tValue = tuple.getString(0);  

        //  KafkaSpout 에서 전달된 데이터 / tuple "," 로 분리
        // Tuple : 스톰 레이어간  메세지 전달 단위(Spout => Bolt, Bolt => Bolt)
        
        String[] receiveData = tValue.split("\\,");

        // 운전자의 실시간 운행정보 데이터셋 형식 정의
        //발생일시(14자리), 차량번호, 가속페달, 브레이크페달, 운전대회적각, 방향지시등, 주행속도, 뮤직번호        
        collector.emit(new Values(new StringBuffer(receiveData[0]).reverse() + "-" + receiveData[1]  , receiveData[0], receiveData[1], receiveData[2], receiveData[3],
receiveData[4], receiveData[5], receiveData[6], receiveData[7]));
    }

	// 필드에 대한 정의
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("r_key"    , "date", "car_number",
        "speed_pedal", "break_pedal", "steer_angle","direct_light", "speed"    ,"area_number"));
    }

}

extends 베이스를 상속받으면 볼트가 된다.

excute 변수 - 자료형을 넣어준다.

 

import backtype.storm.tuple.Tuple;

- tuple 에 들어가보면 사용할 때, get 을 사용하는 구나.

 

 String[] receiveData = tValue.split("\\,");

- 자바의 split

스트링 배열이 반환하는 값. -> split

 

   // 운전자의 실시간 운행정보 데이터셋 형식 정의
 //발생일시(14자리), 차량번호, 가속페달, 브레이크페달, 운전대회적각, 방향지시등, 주행속도, 뮤직번호        
        collector.emit(new Values(new StringBuffer(receiveData[0]).reverse() + "-" + receiveData[1]  , receiveData[0], receiveData[1], receiveData[2], receiveData[3],
receiveData[4], receiveData[5], receiveData[6], receiveData[7]));
    }

 

  • 초기값 -> new StringBuffer

- 메모리를 새로 할당하는 단계가 없기 때문에 10배 이상 훨씬 빠르다

- 문자열의 연산을 하기 위해 StringBuffer 사용

- reverse() => 역순으로 

 

 


HBase Bolt 기능 구현

  • KafkaSpout => SplitBolt => HBase Bolt

SmartCarDriverTopology.java

// HBase Bolt
// 테이블명과 로우키 설정
TupleTableConfig hTableConfig = new TupleTableConfig("DriverCarInfo", "r_key");

// HBase 연결된 주키퍼 정보 필요
hTableConfig.setZkQuorum("server02.hadoop.com");
hTableConfig.setZkClientPort("2181");
hTableConfig.setBatch(false);

// 저장하고자 하는 테이블의 컬럼패밀리 cf1의 정보 설정
hTableConfig.addColumn("cf1", "date");
hTableConfig.addColumn("cf1", "car_number");
hTableConfig.addColumn("cf1", "speed_pedal");
hTableConfig.addColumn("cf1", "break_pedal");
hTableConfig.addColumn("cf1", "steer_angle");
hTableConfig.addColumn("cf1", "direct_light");
hTableConfig.addColumn("cf1", "speed");
hTableConfig.addColumn("cf1", "area_number");

// HBaseBolt 객체 생성
HBaseBolt hbaseBolt = new HBaseBolt(hTableConfig);

// HBaseBolt 객체를 Topology에 설정 데이터 전달 받을 Bolt(splitBolt) 설정
driverCarTopologyBuilder.setBolt("HBASE", hbaseBolt, 1).shuffleGrouping("splitBolt");


 

EsperBolt

에스퍼 Bolt는 스마트카 운전자 가운데 과속을 하는 운전자를 찾아 이벤트를 발생시키는 기능입니다.

  • KafkaSpout => EsperBolt
  • 에스퍼 Bort의 이벤트 구현은 "에스퍼 CEP 엔진"을 이용
  • EPL 쿼리로 30초 기준으로 평균 속도 80Km/h 초과 운전자 룰(규칙) 작성
  • 해당 차량정보평균 속도를 초과한 시점의 시간정보다음 Bolt( 레디스 Bolt)에 전달합니다.

 

 

EsperBolt.java

package com.wikibook.bigdata.smartcar.storm;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;


public class EsperBolt extends BaseBasicBolt{

    private static final long serialVersionUID = 1L;

    private EPServiceProvider espService;

    private boolean isOverSpeedEvent = false;

    public void prepare(Map stormConf, TopologyContext context) {

        Configuration configuration = new Configuration();
        configuration.addEventType("DriverCarInfoBean", DriverCarInfoBean.class.getName());

		// EPServiceProviderManager => espService 를 꺼내오는 매니저
        espService = EPServiceProviderManager.getDefaultProvider(configuration);
        espService.initialize();
  
        int avgOverSpeed = 80;
        int windowTime  = 30;

        // EPL 쿼리 정의
        // From 절에서 윈도우 타임이라는 기능을 이용해 실시간 스트림 데이터를 Group By 한 데이터를 메모리상에 올려 놓고 30초 단위로 평균 속도를 계산
        String overSpeedEpl =  "SELECT date, carNumber, speedPedal, breakPedal, "
                                + "steerAngle, directLight, speed , areaNumber "
                                + " FROM DriverCarInfoBean.win:time_batch("+windowTime+" sec) "
                                + " GROUP BY carNumber HAVING AVG(speed) > " + avgOverSpeed;

        // EPL 조건 실행
        EPStatement driverCarinfoStmt = espService.getEPAdministrator().createEPL(overSpeedEpl);

        // 조건 발생시 처리할 이벤트 함수 등록
        driverCarinfoStmt.addListener((UpdateListener) new OverSpeedEventListener());
    }



    public void execute(Tuple tuple, BasicOutputCollector collector) {

        // TODO Auto-generated method stub
        String tValue = tuple.getString(0);

        // Tuple tuple 받은 데이터 처리
        //발생일시(14자리), 차량번호, 가속페달, 브레이크페달, 운전대회적각, 방향지시등, 주행속도, 뮤직번호
        String[] receiveData = tValue.split("\\,");

        // VO(Value Object) 생성
        DriverCarInfoBean driverCarInfoBean =new DriverCarInfoBean();

        driverCarInfoBean.setDate(receiveData[0]);
        driverCarInfoBean.setCarNumber(receiveData[1]);
        driverCarInfoBean.setSpeedPedal(receiveData[2]);
        driverCarInfoBean.setBreakPedal(receiveData[3]);
        driverCarInfoBean.setSteerAngle(receiveData[4]);
        driverCarInfoBean.setDirectLight(receiveData[5]);
        driverCarInfoBean.setSpeed(Integer.parseInt(receiveData[6]));
        driverCarInfoBean.setAreaNumber(receiveData[7]);

        espService.getEPRuntime().sendEvent(driverCarInfoBean);


        if(isOverSpeedEvent) {
            // 다음 Bolt(레디스 Bolt)로 emit(전달)
            //발생일시(14자리), 차량번호
            collector.emit(new Values(    driverCarInfoBean.getDate().substring(0,8),
                                        driverCarInfoBean.getCarNumber()+"-"+driverCarInfoBean.getDate()));
            isOverSpeedEvent = false;
        }

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("date", "car_number"));
    }


    private class OverSpeedEventListener implements UpdateListener
    {
        @Override
        public void update(EventBean[] newEvents, EventBean[] oldEvents) {
            if (newEvents != null) {
                try {
                    isOverSpeedEvent = true;
                } catch (Exception e) {
                    System.out.println("Failed to Listener Update" + e);
                }
            }
        }
    }

}

위 코드를 나누어서 살펴봅니다.

 

에스퍼 Bolt 소스 1 - EsperBolt.java에서 에스퍼의 EPL 퀴리 정의 및 이벤트 함수 등록

public class EsperBolt extends BaseBasicBolt{

    private static final long serialVersionUID = 1L;
    private EPServiceProvider espService;
    private boolean isOverSpeedEvent = false;
    public void prepare(Map stormConf, TopologyContext context) {

        Configuration configuration = new Configuration();
        configuration.addEventType("DriverCarInfoBean", DriverCarInfoBean.class.getName());

		// EPServiceProviderManager => espService 를 꺼내오는 매니저
        espService = EPServiceProviderManager.getDefaultProvider(configuration);
        espService.initialize();
  
        int avgOverSpeed = 80;
        int windowTime  = 30;

        // EPL 쿼리 정의
        // From 절에서 윈도우 타임이라는 기능을 이용해 실시간 스트림 데이터를 Group By 한 데이터를 메모리상에 올려 놓고 30초 단위로 평균 속도를 계산
        String overSpeedEpl =  "SELECT date, carNumber, speedPedal, breakPedal, "
                                + "steerAngle, directLight, speed , areaNumber "
                                + " FROM DriverCarInfoBean.win:time_batch("+windowTime+" sec) "
                                + " GROUP BY carNumber HAVING AVG(speed) > " + avgOverSpeed;

        // EPL 조건 실행
        EPStatement driverCarinfoStmt = espService.getEPAdministrator().createEPL(overSpeedEpl);

        // 조건 발생시 처리할 이벤트 함수 등록
        driverCarinfoStmt.addListener((UpdateListener) new OverSpeedEventListener());
    }

 

 

 

// EPServiceProviderManager => espService 를 꺼내오는 매니저
        espService = EPServiceProviderManager.getDefaultProvider(configuration);
        espService.initialize();

 

- JDBC 를 꺼내왔을 때처럼

(exv: getActionDB)

 

 // EPL 쿼리 정의
        // From 절에서 윈도우 타임이라는 기능을 이용해 실시간 스트림 데이터를 Group By 한 데이터를 메모리상에 올려 놓고 30초 단위로 평균 속도를 계산
        String overSpeedEpl =  "SELECT date, carNumber, speedPedal, breakPedal, "
                                + "steerAngle, directLight, speed , areaNumber "
                                + " FROM DriverCarInfoBean.win:time_batch("+windowTime+" sec) "
                                + " GROUP BY carNumber HAVING AVG(speed) > " + avgOverSpeed;

- 차량번호로 Group By. / 평균속도로 주행하는 자동차으 넘버들  > + 과속 

        // EPL 조건 실행
        EPStatement driverCarinfoStmt = espService.getEPAdministrator().createEPL(overSpeedEpl);

        // 조건 발생시 처리할 이벤트 함수 등록
        driverCarinfoStmt.addListener((UpdateListener) new OverSpeedEventListener());
    }

이벤트의 가장 큰 특징은, 이벤트가 발생할 때 어떤 일을 한다.

이벤트 프로그램의 핵심은 언제 누가 이걸 누르는지 -> 알 수 없다

이벤트 핸들러를 인터페이스를 많이 넣어둔다. 

 

인터페이스 구현하면, 구현하도록 자동으로 내용이 생성이 되고 구현을 해야 하는데

 new OverSpeedEventListener 

이렇게 나온다는 건, 이미 구현해놓은 내용이 있다는 것. 

 


에스퍼EPL 동적 로딩

앞의 예제 소스를 보면 에스퍼의 룰인 EPL을 EsperBolt.java에 직접 작성했습니다. 하지만 EPL은 업무 룰이 바뀔 때마다 빈번하게 수정되므로 프로그램 안에 직접 하드코딩하는 것이 좋은 방법은 아닙니다. 실제 환경에서는 EPL 쿼리를 별도의 공유 저장소를 구축해 통합 보관하고, 스톰의 Bolt 같은 프로그램이 이 저장소로부터 EPL 쿼리를 주기적으로 로딩하거나 역으로 스톰의 Bolt로 푸시하는 아키텍처를 구성합니다. 

 

 

OverSpeedEventListener

 


Redis Bolt

  • KafkaSpout => EsperBolt => RedisBolt
  • 레디스 라이브러리 제디스(Jdeis) 구현
  • JedisPoolConfig를 이용해서 RedisBolt 생성과 Topology 등록

SmartCarDriverTopology.java

// Redis Bolt
String redisServer = "server02.hadoop.com";
int redisPort = 6379;
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder().setHost(redisServer).setPort(redisPort).build();

//  JedisPoolConfig를 이용해서 RedisBolt 생성
RedisBolt redisBolt = new RedisBolt(jedisPoolConfig);

// Topology 등록
driverCarTopologyBuilder.setBolt("REDIS", redisBolt, 1).shuffleGrouping("esperBolt");

에스퍼가 emit 한 내용을 


레디스 클라이언트 애플리케이션 구현

  • 레디스 => 업무시스템 1
  • 레디스 실시간 저장 정보를 주변 업무 시스템에서 곧바로 활용
  •  
package com.wikibook.bigdata.smartcar.redis;

import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisClient extends Thread{
    private String key;
    private Jedis jedis;

    public RedisClient(String k) {

        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        JedisPool jPool = new JedisPool(jedisPoolConfig, "server02.hadoop.com", 6379);
        jedis = jPool.getResource();

        this.key = k;
    }

    @Override    
    public void run() {
    
        Set<String> overSpeedCarList = null;
        int cnt = 1;

        try {
            while(true) {

                // 과속 차량의 데이터셋 저장
                overSpeedCarList = jedis.smembers(key);

                System.out.println("################################################");
                System.out.println("#####   Start of The OverSpeed SmartCar    #####");
                System.out.println("################################################");

                System.out.println("\n[ Try No." + cnt++ + "]");

                // 과속 차량의 데이터셋이 발생하면
                if(overSpeedCarList.size() > 0) {
                    for (String list : overSpeedCarList) {
                        System.out.println(list); // 정보 출력
                    }
                    System.out.println("");

                    jedis.del(key); // 출력 후  데이터 삭제
                }else{
                    System.out.println("\nEmpty Car List...\n");
                }
                System.out.println("################################################");
                System.out.println("######   End of The OverSpeed SmartCar    ######");
                System.out.println("################################################");
                System.out.println("\n\n");

                Thread.sleep(10 * 1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if( jedis != null ) jedis.close();
        }
    }
}

 

jdis의 smembers 는 'select'와 같다

Thread.sleep(10 * 1000); // 10초에 1번씩 run 해

HBase 테이블 생성

hbase 명령으로 테이블 생성

  • SSH : Server01(HBase Master Server) 에서 실행
  • Server02에서도 가능
  • Table Name : DriverCarInfo
  • Region : 3개(미리 만들어 놓아 안정성과 특정 크기 도달시 자동으로 분리-샤딩 )
    • Region - 일정부분 쌓이면 자동으로 뾱뾱 분리한다.
  • 칼럼 패밀리 : cf1
  • 3개의 Region에 접근하는 방식은 로우키의 HexString
  • 실행 확인 : CREATE, Table Name: default:DriverCarInfo ...
# hbase org.apache.hadoop.hbase.util.RegionSplitter DriverCarInfo HexStringSplit -c 3 -f cf1

CM으로 테이블 잘 만들었는지 확인

스톰 Topology 배포 및 실행

  • SSH : Server02 에서 실행

배포

  • ftp upload: bigdata.smartcar.storm-1.0.jar => /home/pilot-pjt/working

실행

# cd /home/pilot-pjt/working
# storm jar bigdata.smartcar.storm-1.0.jar com.wikibook.bigdata.smartcar.storm.SmartCarDriverTopology DriverCarInfo

storm jar bigdata.smartcar.storm-1.0.jar com.wikibook.bigdata.smartcar.storm.SmartCarDriverTopology DriverCarInfo

 

com.wikibook.bigdata.smartcar.storm : 자바 클래스에서 import 해 준 내용 복사

SmartCarDriverTopology  : java 클래스명

DriverCarInfo : 테이블명

 

 


레디스 적재된 데이터 확인

  • 30초 윈도우 타임동안 평균 속도 80Km/h 초과 시 적재
  • Sever02 SSH
#  redis-cli
- smembers 20220908
- smembers key-value

 

레디스 적재된 데이터 10초 간격으로 가져오는 클라이언트 애플리케이션 실행

  • ftp upload: bigdata.smartcar.redis-1.0.jar => /home/pilot-pjt/working

- cd /home/pilot-pjt/working
- java -cp bigdata.smartcar.redis-1.0.jar com.wikibook.bigdata.smartcar.redis.OverSpeedCarInfo 20220908

 

 

  • smembers 20220908 : 데이터 삭제 확인

서버 2-1

 

 

 


실시간 적재 파일럿 실행 4단계 - 실시간 적재 기능 테스트

  • CM 각 서버 정상 확인
  • 스톰, 레디스 서비스 정상 확인
  • service redis_6379 status

SmartCar 로그 시뮬레이터 작동 100대

  • 플럼 => 카프카 => 스톰 => HBase
  • Sever02 SSH 접속 시뮬레이터 위치로 이동
- cd  /home/pilot-pjt/working
- tail -f /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
- java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.DriverLogMain 20210101 10 &
  • Topology Visualization 실시간 모니터링

적재 데이터 확인

Storm UI 에서 확인 및 모니터링**

HBase에 적재 데이터 확인

# hbase shell
hbase(main)> count 'DriverCarInfo' => 1000단위씩 출력
hbase(main):002:0> scan 'DriverCarInfo', {LIMIT => 20} => 20개
hbase(main):002:0> scan 'DriverCarInfo', {STARTROW => '00000030106102-Z0007', LIMIT => 1}
00000030106102-Z0007 column=cf1:area_number, timestamp=1573392537325, value=D05
00000030106102-Z0007 column=cf1:break_pedal, timestamp=1573392537325, value=0                             
00000030106102-Z0007 column=cf1:car_number, timestamp=1573392537325, value=Z0007                          
00000030106102-Z0007 column=cf1:date, timestamp=1573392537325, value=20160103000000                      
00000030106102-Z0007 column=cf1:direct_light, timestamp=1573392537325, value=L                           
00000030106102-Z0007 column=cf1:speed, timestamp=1573392537325, value=10
00000030106102-Z0007 column=cf1:speed_pedal, timestamp=1573392537325, value=2                                                                 
00000030106102-Z0007 column=cf1:steer_angle, timestamp=1573392537325, value=L1         
  • area_number : D04 지역 운행
  • break_pedal: 0 => 브레이크 밟지 않은 상태
  • car_number: 차량번호
  • date: 운행 날짜 정보
  • direct_light: N => 깜빡이지 않은 상태
  • speed: 시속
  • speed_peda: 1 => 가속 페달 1단계 진행l
  • steer_angle: F => 핸들 직진 중

** 20210101 D04 지역 운행했던 차량 번화 지역 번호 출력**

scan 'DriverCarInfo', {COLUMNS=>['cf1:car_number','cf1:area_number'], FILTER=>"RowFilter(=,'regexstring:10101202') AND SingleColumnValueFilter('cf1', 'area_number',=,'regexstring:D04')"}

시뮬레이터 종료

  • ps -ef |grep smartcar.log
  • kill -9 pid

저사양 서버 서비스 정지

  • 플럼, 카프카, HBase => CM
  • 스톰 : service sotorm-ui, storm-supervisor, stom-nimbus stop
  • 레디스: service redis_6379 stop

실시간 개발 환경 구성 : P210 참고

  • 프로젝트 java 소스 load 및 수정

 


우리에겐 하이브가 있다. 

 

어제 쌓은 하둡 데이터와, 오늘 적재한 데이터를 토대로

다음 주 수업을 진행