본문 바로가기

Hadoop/빅데이터 파일럿 프로젝트

[Hadoop] 빅데이터수집

< 빅데이터 파일럿 프로젝트 -  빅데이터 수집>

수집 파일럿 실행 3단계 - 플럼 수집 기능 구현


0906

1. 일단위 수집(=대용량 데이터)

2. 실시간 수집(

- 저장하고 처리하는 방식이 다르다


Zookeeper 설치

 

대용량 데이터, 유실될 가능성을 염두하여 바로 hadoop으로 저장하지 않고

1차적으로 Flume 으로 저장

- 하둡으로 보내는 싱크

- 카프카로 보내는 싱크

이 싱크들이 이미 만들어져있다. 

용어적으로 에이전트. 소스채널 에이전트.

 

중간으로 보내는 채널

어디로 내보낼 건데~ 싱크!

 

플럼에 저장되어 있는 것을 로그파일로 볼 거고, 

싱크를 거쳐 하둡으로 뿅 들어가는 걸 볼 거다

 

*실시간데이터

Flume 으로 저장하지만, Kafka에 임시저장하도록 중간의 저장소를 하나 더 두었다.

카프카에서 Hbase로 그냥 내보낼 수 있는데,

위험한 데이터는 Redis, 일반적인 실시간 데이터는 하둡기반의 Hbase로 보낸다.

 

이벤트프로그램을 하는 애들

좀 더 간단하게 분기처리하는 것이 Storm.

좀 더 복잡할 때는 Esper

 

프로젝트에서는 Esper 사용

과속하는 애들을 Redis, 과속안하는 애들을 Hbase

 

마트를 만든다.

마트를 자동화시켜서 매일 그 일을 하도록 Oozie를 통해 워크 플로우를 만들 것이고

 

분석/응용

분석하고 머신러닝을 하는 과정을 볼 것이다.

 

Sqoop으로 데이터베이스를 빼는 작업을 하면서 마무리가 된다.

 

수집의 목적은 적재를 하기 전에, 데이터를 유실하지 않고 잘 담으려고 하는 것.

데이터의 종류가 다르다. 크기가 다르다.


 

수집 파일럿 실행 3단계 - 플럼 수집 기능 구현

  • 2개의 에이전트 구현(대용량 데이터, 실시간 데이터)

SmartCar 에이전트 생성

  • CM > Flume > 구성 > 구성 파일 부분 찾기
  • CM 에서 제공하는 플럼 기본 에이전트
  • Agent 이름 : tier1 => SmartCar_Agent
  • bigdata2nd-master\CH03\예제-3.1\SmartCar_Agent.conf
SmartCar_Agent.sources  = SmartCarInfo_SpoolSource
SmartCar_Agent.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000


SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000


SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel

SmartCar_Agent.sources  = SmartCarInfo_SpoolSource
SmartCar_Agent.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir // spooldir 이라는 디렉토리에 있어. 플럼은 이 폴더를 감시하고 있다. 뭔가가 들어온다. 데이터가 들어왔을 때, 
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log //
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate // 일이 끝나면 즉각 지워버린다. 로그에서 다 읽고 나면 사라진다. 
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000 // 얼만큼 읽을 것인지

SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory // 채널에 담는 타입 : 메모리or 파일
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000

SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger
 // 메모리에 데이터를 임시로 가지고 있는데, 어디로 보내줄까? 플럼로그, 내 자신의 로그 


SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel

 


리눅스에 무브명령 파일만들고 클론 설정.

매일 몇 시마다 무슨 일 하라고 설정할 수 있다.

spooldir 이라는 디렉토리에 있어


interceptors 추가

  • bigdata2nd-master\CH03\예제-3.2\SmartCar_Agent.conf
  • Source와 Channel 중간에서 데이터 가공하는 역할
  • - 메모리에 올라가는 데이터를 가공할 수 있다
  • 플럼 Source에서 유입되는 데이터 중 일부를 수정/추가/가공/정제 등
  • 플럼 데이터 전송 단위 : Event = Header + Body //헤더와 바디로 나눠서 들어온다
  • interceptors는 Header 특정값 추가, Body 데이터 가공
  • interceptors = filterInterceptor : 변수 선언
  • type = regex_filter: 정규 표현식을 이용해서 필터링
  • regex = ^\d{14} : 14자리 날짜 형식으로 시작하는 데이터
  • excludeEvents = false : true 이면 반대로 제외된 값 수집
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false

 

- 데이터 크롤링 할 때, 정규표현식을 사용하면 막강하다.

- 정규표현식의 기본문법이 있고, 자바, 파이썬 등 언어들이 이를 지원하는 것

 

 

 

Source와 Channel Sink 연결

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel

 

DriverCarInfo 에이전트 생성

변수 추가

  • bigdata2nd-master\CH03\예제-3.3\SmartCar_Agent.conf
SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink

Source.type = exec

- 명령도 소스로 받을 수 있다

  • bigdata2nd-master\CH03\예제-3.4\SmartCar_Agent.conf
  • 외부 수행 명령 결과를 플럼 Event로 가져와 수집
  • tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
    • 실시간
SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000

interceptors 추가

filterInterceptor2
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false

 

ink.type = org.apache.flume.sink.kafka.KafkaSink

  • 카프카(브로커) 설치 서버 : server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000

1 플럼이 카프카와 연결할 때 사용하는 싱크.

2 카프카에 데이터를 받을 공간 / 카프카는 Topic 이 중요하다

3 브로커 : (카프카와 토픽은 그 안에 들어있는데) 브로커는 위치를 알려줘야 한다 / 브로커리스트에 카프카가 어디에 있는지 설정한다. 9092가 카프카의 디폴트 포트.

5 디폴트. (서버가 힘들어하면 줄어준다)

 

Channel.type = memory

SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000

Source와 Channel Sink 연결

SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel

플럼 에이전트 최종

  • bigdata2nd-master\CH03\예제-3.4\SmartCar_Agent.conf
  • 구성 파일 입력 => 변경 내용 저장


 

수집 파일럿 실행 4단계 - 카프카 기능 구현

  • 카프카 명령어를 이용 카프카 Broker 안에서 사용할 Topic 생성
  • Producer 명령어를 통해 데이터 전송
  • Consumer 명령어로 수신

카프카 Topic 생성

서버1 / 서버2 / 서버2-1

  • Server02 SSH 접속
    • 모바텀 접속 
kafka-topics --create --zookeeper server02.hadoop.com:2181 --replication-factor 1 --partitions 1 --topic SmartCar-Topic
  • 결과: Created topic SmartCar-Topic 확인
  • replication-factor 1: 다중 복제 수
  • partitions 1: 분산 저장 수
  • Topic의 메타 정보들이 zookeeper의 z노드에 생성 관리 됨
  • SmartCar-Topic 명은 플럼 구성파일명과 동일해야 함
  •  

카프카 Topic 삭제 명령

# kafka-topics --delete --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic
  • 결과: Topic SmartCar-Topic is marked for deletion.

카프카 Producer 사용

# kafka-console-producer --broker-list server02.hadoop.com:9092 --topic SmartCar-Topic

카프카 Consumer 사용

  • 새로운 2개의 Server02 SSH 접속
# kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0 --from-beginning
  • Producer => Broker(Topic) => Consumer1, Consumer2
  • --from-beginning 옵션 : 토픽 첫 저장 데이터 부터

카프카 Producer, consumer 사용

  • 송수신 테스트
  • 수신 안될시 카프카 > 구성 > offsets.topic.replication.factor = 1로 변경

ctrl + z 명령어로 나온다.

 


수집 파일럿 실행 5단계 - 수집 기능 테스트

  • Sever02 SSH 접속 시뮬레이터 위치로 이동
  • cd /home/pilot-pjt/working
  • 3대 스마트카 상태 정보 수집

SmartCar 로그 시뮬레이트 작동

운전정보 시뮬레이터 File 실행 DriverLogMain 날짜 차량수 &(backg)

# java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.DriverLogMain 20220906 3

Car정보 시뮬레이터 File 실행 CarLogMain 날짜 차량수 &(backg)

# java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.CarLogMain 20220906 3

 

서버2  :이동시키는 역할

서버2-1  : 보는 역할

 

시뮬레이터 정상 작동 확인

  • ls -l /home/pilot-pjt/working/SmartCar/
  • SmartCarStatusInfo_20210101.txt 생성 확인
  • ls -l /home/pilot-pjt/working/driver-realtime-log/
  • SmartCarDriverInfo.log 생성 확인
  • 실시간 확인 : tail -f /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log

로그파일 Spooldir 경로로 이동

  • mv /home/pilot-pjt/working/SmartCar/SmartCarStatusInfo_20210101.txt /home/pilot-pjt/working/car-batch-log/

플럼 에이전트 작동

  • 플럼 > 재시작

카프카 Consumer 작동

  • 카프카 > 재시작

수집 기능 점검

스마트카 상태 정보 로그파일 => 표준 출력로그 tail 로 확인

  • tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log
  • Event, Header, Body

-

오류 p

해결 - 775로 되어 있었고, 777 로 바꿔주니까. 권한을 주니까 되었다.

 

서버2-1

실시간 운전정보 DriverCarInfo 수집 확인

  • kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic

시뮬레이터 종료

  • ps -ef |grep smartcar.log
  • kill -9 pid

Tip _ 파일럿 환경 로그 확인

  • Hadoop 에코시스템 서버들 위치 : /var/log/각서버(cloudera,Hadoop, Oozie)등
  • Redis: /var/log/redis_6379.log
  • Storm : /home/pilot-pjt/storm/logs/
    • 한땀 한땀..
  • Zeppelin: /home/pilot-pjt/zeppelin-x.x.x-bin-all/logs
    • 내 관리 프로그램의 log들 잘 알고 있는 것이 시스템엔지니어의 기본