< 빅데이터 파일럿 프로젝트 - 빅데이터 수집>
수집 파일럿 실행 3단계 - 플럼 수집 기능 구현
0906
1. 일단위 수집(=대용량 데이터)
2. 실시간 수집(
- 저장하고 처리하는 방식이 다르다
Zookeeper 설치
대용량 데이터, 유실될 가능성을 염두하여 바로 hadoop으로 저장하지 않고
1차적으로 Flume 으로 저장
- 하둡으로 보내는 싱크
- 카프카로 보내는 싱크
이 싱크들이 이미 만들어져있다.
용어적으로 에이전트. 소스채널 에이전트.
중간으로 보내는 채널
어디로 내보낼 건데~ 싱크!
플럼에 저장되어 있는 것을 로그파일로 볼 거고,
싱크를 거쳐 하둡으로 뿅 들어가는 걸 볼 거다
*실시간데이터
Flume 으로 저장하지만, Kafka에 임시저장하도록 중간의 저장소를 하나 더 두었다.
카프카에서 Hbase로 그냥 내보낼 수 있는데,
위험한 데이터는 Redis, 일반적인 실시간 데이터는 하둡기반의 Hbase로 보낸다.
이벤트프로그램을 하는 애들
좀 더 간단하게 분기처리하는 것이 Storm.
좀 더 복잡할 때는 Esper
프로젝트에서는 Esper 사용
과속하는 애들을 Redis, 과속안하는 애들을 Hbase
마트를 만든다.
마트를 자동화시켜서 매일 그 일을 하도록 Oozie를 통해 워크 플로우를 만들 것이고
분석/응용
분석하고 머신러닝을 하는 과정을 볼 것이다.
Sqoop으로 데이터베이스를 빼는 작업을 하면서 마무리가 된다.
수집의 목적은 적재를 하기 전에, 데이터를 유실하지 않고 잘 담으려고 하는 것.
데이터의 종류가 다르다. 크기가 다르다.
수집 파일럿 실행 3단계 - 플럼 수집 기능 구현
- 2개의 에이전트 구현(대용량 데이터, 실시간 데이터)
SmartCar 에이전트 생성
- CM > Flume > 구성 > 구성 파일 부분 찾기
- CM 에서 제공하는 플럼 기본 에이전트
- Agent 이름 : tier1 => SmartCar_Agent
- bigdata2nd-master\CH03\예제-3.1\SmartCar_Agent.conf
SmartCar_Agent.sources = SmartCarInfo_SpoolSource
SmartCar_Agent.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks = SmartCarInfo_LoggerSink
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000
SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity = 10000
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel
SmartCar_Agent.sources = SmartCarInfo_SpoolSource
SmartCar_Agent.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks = SmartCarInfo_LoggerSink
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir // spooldir 이라는 디렉토리에 있어. 플럼은 이 폴더를 감시하고 있다. 뭔가가 들어온다. 데이터가 들어왔을 때,
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log //
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate // 일이 끝나면 즉각 지워버린다. 로그에서 다 읽고 나면 사라진다.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000 // 얼만큼 읽을 것인지
SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory // 채널에 담는 타입 : 메모리or 파일
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity = 10000
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger
// 메모리에 데이터를 임시로 가지고 있는데, 어디로 보내줄까? 플럼로그, 내 자신의 로그
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel
리눅스에 무브명령 파일만들고 클론 설정.
매일 몇 시마다 무슨 일 하라고 설정할 수 있다.
spooldir 이라는 디렉토리에 있어
interceptors 추가
- bigdata2nd-master\CH03\예제-3.2\SmartCar_Agent.conf
- Source와 Channel 중간에서 데이터 가공하는 역할
- - 메모리에 올라가는 데이터를 가공할 수 있다
- 플럼 Source에서 유입되는 데이터 중 일부를 수정/추가/가공/정제 등
- 플럼 데이터 전송 단위 : Event = Header + Body //헤더와 바디로 나눠서 들어온다
- interceptors는 Header 특정값 추가, Body 데이터 가공
- interceptors = filterInterceptor : 변수 선언
- type = regex_filter: 정규 표현식을 이용해서 필터링
- regex = ^\d{14} : 14자리 날짜 형식으로 시작하는 데이터
- excludeEvents = false : true 이면 반대로 제외된 값 수집
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false
- 데이터 크롤링 할 때, 정규표현식을 사용하면 막강하다.
- 정규표현식의 기본문법이 있고, 자바, 파이썬 등 언어들이 이를 지원하는 것
Source와 Channel Sink 연결
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel
DriverCarInfo 에이전트 생성
변수 추가
- bigdata2nd-master\CH03\예제-3.3\SmartCar_Agent.conf
SmartCar_Agent.sources = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink
Source.type = exec
- 명령도 소스로 받을 수 있다
- bigdata2nd-master\CH03\예제-3.4\SmartCar_Agent.conf
- 외부 수행 명령 결과를 플럼 Event로 가져와 수집
- tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
- 실시간
SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000
interceptors 추가
filterInterceptor2
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false
ink.type = org.apache.flume.sink.kafka.KafkaSink
- 카프카(브로커) 설치 서버 : server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000
1 플럼이 카프카와 연결할 때 사용하는 싱크.
2 카프카에 데이터를 받을 공간 / 카프카는 Topic 이 중요하다
3 브로커 : (카프카와 토픽은 그 안에 들어있는데) 브로커는 위치를 알려줘야 한다 / 브로커리스트에 카프카가 어디에 있는지 설정한다. 9092가 카프카의 디폴트 포트.
4
5 디폴트. (서버가 힘들어하면 줄어준다)
Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000
Source와 Channel Sink 연결
SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel
플럼 에이전트 최종
- bigdata2nd-master\CH03\예제-3.4\SmartCar_Agent.conf
- 구성 파일 입력 => 변경 내용 저장
수집 파일럿 실행 4단계 - 카프카 기능 구현
- 카프카 명령어를 이용 카프카 Broker 안에서 사용할 Topic 생성
- Producer 명령어를 통해 데이터 전송
- Consumer 명령어로 수신
카프카 Topic 생성
- Server02 SSH 접속
- 모바텀 접속
kafka-topics --create --zookeeper server02.hadoop.com:2181 --replication-factor 1 --partitions 1 --topic SmartCar-Topic
- 결과: Created topic SmartCar-Topic 확인
- replication-factor 1: 다중 복제 수
- partitions 1: 분산 저장 수
- Topic의 메타 정보들이 zookeeper의 z노드에 생성 관리 됨
- SmartCar-Topic 명은 플럼 구성파일명과 동일해야 함
카프카 Topic 삭제 명령
# kafka-topics --delete --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic
- 결과: Topic SmartCar-Topic is marked for deletion.
카프카 Producer 사용
# kafka-console-producer --broker-list server02.hadoop.com:9092 --topic SmartCar-Topic
카프카 Consumer 사용
- 새로운 2개의 Server02 SSH 접속
# kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0 --from-beginning
- Producer => Broker(Topic) => Consumer1, Consumer2
- --from-beginning 옵션 : 토픽 첫 저장 데이터 부터
카프카 Producer, consumer 사용
- 송수신 테스트
- 수신 안될시 카프카 > 구성 > offsets.topic.replication.factor = 1로 변경
ctrl + z 명령어로 나온다.
수집 파일럿 실행 5단계 - 수집 기능 테스트
- Sever02 SSH 접속 시뮬레이터 위치로 이동
- cd /home/pilot-pjt/working
- 3대 스마트카 상태 정보 수집
SmartCar 로그 시뮬레이트 작동
운전정보 시뮬레이터 File 실행 DriverLogMain 날짜 차량수 &(backg)
# java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.DriverLogMain 20220906 3
Car정보 시뮬레이터 File 실행 CarLogMain 날짜 차량수 &(backg)
# java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.CarLogMain 20220906 3
서버2 :이동시키는 역할
서버2-1 : 보는 역할
시뮬레이터 정상 작동 확인
- ls -l /home/pilot-pjt/working/SmartCar/
- SmartCarStatusInfo_20210101.txt 생성 확인
- ls -l /home/pilot-pjt/working/driver-realtime-log/
- SmartCarDriverInfo.log 생성 확인
- 실시간 확인 : tail -f /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
로그파일 Spooldir 경로로 이동
- mv /home/pilot-pjt/working/SmartCar/SmartCarStatusInfo_20210101.txt /home/pilot-pjt/working/car-batch-log/
플럼 에이전트 작동
- 플럼 > 재시작
카프카 Consumer 작동
- 카프카 > 재시작
수집 기능 점검
스마트카 상태 정보 로그파일 => 표준 출력로그 tail 로 확인
- tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log
- Event, Header, Body
-
오류 p
해결 - 775로 되어 있었고, 777 로 바꿔주니까. 권한을 주니까 되었다.
실시간 운전정보 DriverCarInfo 수집 확인
- kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic
시뮬레이터 종료
- ps -ef |grep smartcar.log
- kill -9 pid
Tip _ 파일럿 환경 로그 확인
- Hadoop 에코시스템 서버들 위치 : /var/log/각서버(cloudera,Hadoop, Oozie)등
- Redis: /var/log/redis_6379.log
- Storm : /home/pilot-pjt/storm/logs/
- 한땀 한땀..
- Zeppelin: /home/pilot-pjt/zeppelin-x.x.x-bin-all/logs
- 내 관리 프로그램의 log들 잘 알고 있는 것이 시스템엔지니어의 기본
'Hadoop > 빅데이터 파일럿 프로젝트' 카테고리의 다른 글
빅데이터 적재 - 실시간 로그 파일 적재(1) (0) | 2022.09.06 |
---|---|
빅데이터 적재(1) - 대용량 로그 파일 적재 (0) | 2022.09.06 |
[Hadoop] 빅데이터 파일럿 프로젝트_ 파일럿 환경 구성 (0) | 2022.09.05 |
[Hadoop] 빅데이터 파일럿 프로젝트_ 스마트카 로그 시뮬레이터 설치 (0) | 2022.09.05 |
[Hadoop] CM(Cloudera Manager) 설치 (0) | 2022.09.05 |