데이터 분석에 있어서 RDB에서 SQL문만을 사용하여 추출하는 컨셉에서 업그레이드를 하고자
NOSQL진영은 무엇을 사용하고 활용하는가? 의 물음에 SPARK를 알게되었고
단순한 집계와 필터처리를 PYSPARK에서는 어떻게 코딩을 하는가란 단순한 문서정리입니다.
사용준비
import pyspark import requests import pandas as pd import json from pyspark.sql import SparkSession from pyspark.sql.types import * from pyspark.sql import SQLContext sc = pyspark.SparkContext('local[*]') spark = SparkSession(sc) sqlContext = SQLContext(sc)
spark 객체는 분산컴퓨팅을위해 여러개를 선택할수도 있으나, 여기서는 로컬 객체 하나만 사용하겠습니다.
spark를 설치하고 셋팅하는방법은 다양하니.. 여기서 언급하지는 않으며
도커의 all-spark-notebook 을 이용하였습니다.
설치참고: spark with akka
샘플데이터
API를통해 호텔정보 획득
pyspark를 간단하게 소개하기 위해 샘플데이터라고 표현하였지만, 사실 이 단계가 가장 중요합니다.
필요로하는 집계에 이용되는 데이터의 전체량을 범위한정지어야 하는문제와 동시에, 어디서로부터(DB?,실시간 크롤링,혹은 로그) 온데이터를 가지고
데이터 처리를 할것인가란 주제이며, 여기서는 실시간 API로부터 얻은데이터를 바로 소비를 하여 Spark을 통해 분석하는 방법을 이용하겠습니다.
파이썬주제:
Request를 통해 크롤링하기
- 크롤링된 데이터를 , 원하는 Json으로 변환하기
SPARK의 분석대상 소스 : 분석대상의 소스 제약이 없다고 보시면 되며, 분석결과역시 다양한 방식으로 저장이 가능하며
1차분석 결과를 다시 분석소스로 사용이가능합니다. 여기서는 단순하게 엑셀로 Export하겠습니다.
엑셀은 분석결과를 가장 심플하고 강력하게 재필터링 할수 있는 툴의 하나입니다.
- RDB( MSSQL, 오라클,MYSQL)
- KAFKA (실시간 스트림)
- NOSQL기반 DB(하둡등등등)
- 로그기반(간단한 전송시스템 필요)
- 크롱링 데이터 바로 소비
- 파일기반(엑셀,CSV등등등)
기본 사용
Json List객체를 Spark 병렬처리용 RDD로 변환
rddHotels = sc.parallelize(hotels) rddHotels = sc.parallelize(hotels,5) //더 빠른 연산을 위해 5개로 쪼게겠다.
hotels라는 json 데이터를 분산처리를 위한 RDD로 만들어 줍니다.
몇개로 쪼갤지의 결정에따라, 단일지점에서 집계처리(일반적인 RDBMS) 보다 수백배 또는 수천배 빨라질수 있습니다.
SPARK의 장점을 설명하는 가장 심플하고 강력한 코드라고 생각됩니다.
고성능 분산연산을위해 몇개로 쪼갤지 결정하기만 하면됩니다.
( 최적화된 연산처리를위해 몇가지 고급 디플로이셋팅이 필요하긴합니다.)
Spark RDD를 전체 리스트로 반환
rddHotels.collect()
collect를 수행한다고 전체를 반환하는 과정자체가 수행되지 않습니다. (이 단계에 어떻게 하면 빨리가져올까 Spark이 고민을 하는 단계인지는 모르겠습니다.)
접근할수 있는 참조객체만 얻어오며 실제는 take(5) 를 수행했을때 가져오는 실행문이 수행됩니다.
SparkRDD 를 데이터셋으로
schema = StructType( [ StructField('hotelIdx',LongType(),True), StructField('discount',LongType(),True), StructField('name',StringType(),True) ]) dfHotels = sqlContext.createDataFrame(rddHotels, schema)
데이터셋으로 변경하면 , 집계처리에서 RDB에 사용하는 SQL문및 관계형 DB에 집합을 정복하는
JOIN문 활용도 가능합니다. 여기서 NOSQL의 의미를 알아야 합니다. Not Only SQL로 SQL문만을
사용하지 않겠다란 의미며 사실은 SQL문은 집계처리에 있어서 중요한 위치를 차지하며
RDB가 하듯이 데이터 엔티티정의를 통해 관계형 DB처럼 집계처리를 할수가 있습니다.
물론 내부적으로 고성능 분산 집계처리를 위해 맵리듀스가 숨어서 작동이될것으로 기대되나, 확인해보지 못했습니다.
데이터 셋을 통해 SQL문사용하기
dfHotels.createOrReplaceTempView('tblHotel') spark.sql('select * from tblHotel').collect()
RDB에서 사용하는 SQL문을 모두 모두 지원하지 않을테지만,
대용량으로 처리된 1차데이터를 , Sql문을 통해 편리하게 2차 요약처리를 사용할수가 있습니다.
관련키워드 : Dataframe,SQL -RDB와 성능적으로 상호연동을 하려면 Datafame이라는 Spark영역을 조금더 살펴볼필요가 있습니다.
https://spark.apache.org/docs/latest/sql-programming-guide.html
SparkData의 시각화
Spark에서 처리된 데이터를 시각화하거나? 엑셀에 저장할때 이용합니다.
pdHotels = pd.read_json( json.dumps(rddHotels.collect()) )
pdHotels
addrSummary | availableRooms | category | discount | grade | hotelIdx | latitude | longitude | name | regionName | reviewCount |
---|---|---|---|---|---|---|---|---|---|---|
0 | 7 | hotel | 169900 | special1 | 234 | 30.1 | 30.1 | 메이필드호텔 | 서울 | 223 |
여기서 의미 있는 값이 무엇인지? 필드명을 파악합니다.
필요하면 엑셀로도 저장해줍니다.
pdHotels.to_excel('output.xls', index=False)
즉시 데이터의 결과를 확인하고, 몇가지 파이썬툴을 통해 정적 웹페이지로 만들어
랭킹처리에대한 결과를 사용자에게 제공해줄수도 있습니다.
쥬피터를 사용해보진 못했지만, 요 녀석이 조금더 고급적인 시각관리기능을 제공합니다.
관련 키워드 : pandas라는 dataframe이 사용되었습니다.
Spark RDD 기본 API
필터-비싼호텔 찾기
expesiveHotel = rddHotels.filter( lambda row : row['discount'] > 300000 ) # 참조객체접근 expesiveHotel.collect() #특정 개수만 획득 expesiveHotel.take(5) ==> 데이터 예 [{'addrSummary': '강남 | 삼성역 도보 1분', 'availableRooms': 5, 'displayText': '서울 > 파크 하얏트 서울', 'distance': 0.0, 'districtName': '강남구',
중복제거-호텔 관리데이터 확인
grades = rddHotels.map( lambda row: row['grade'] ).distinct() grades.take(5) ==> 결과 : 해당 데이터가 관리하는 grade파악이 가능합니다. ['special1', 'special2', 'grade2', 'motel', 'design'] regions = rddHotels.map( lambda row: row['districtName'] ).distinct() regions.take(5) ==> 결과 ['중구', '강남구', '서초구', '강서구', '구로구']
사용자 알고리즘 적용
Spark의 장점은 , 간단한 알고리즘 뿐만 아니라 여러가지 수학적 알고리즘을 사용하여 필터/집계처리에
활용할수 있다란 점입니다. 더 나아가 텐셔플로우와 상호연동도 가능할테이지만
커스텀한 알고리즘이 어떻게 상호연동이 될수 있나 간단하게 살펴보겠습니다.
알고리즘 샘플 : math-util - 구글링해서 원하는 알고리즘을 획득합니다.( 수학자가 아니기때문에 우린 검증된 공식을 이용할것입니다.)
getDistance(
52.2296756
,
21.0122287
,
52.406374
,
16.9251681
) : 위경도 기반 얼마나 가깝나?
- edit_distance('강남호텔', '강남호텔a') : 얼마나 편집을 해야 같아지나 간단한 문자열 유사도검사?
근처 호텔찾기
#서울의 어느 호텔 'latitude': 37.5700718, 'longitude': 127.0089115 nearHotel= rddHotels.filter( lambda row : 5 > getDistance(37.5700718,127.0089115,row['latitude'],row['longitude'] ) ) nearHotel.take(10) nearHotel.count()
반경검색을 더 빠르게 작동 시키기 : pyspark 반경검색
비슷한 이름 호텔찾기
siHotel = rddHotels.filter( lambda row : 3 > edit_distance('메아 펠드',row['name'] ) ) siHotel.take(100)
마치며
개인적인 의견임으로, 모두의 동의를 얻어내기 어려운 내용입니다.
RDB에서 이용되는 SQL문을 통한 집계처리의 방법은 , Spark내에서도 DataFrame을 통해 동일하게 중요한 요소이며
중요한 데이터의 소스가 대부분 RDB에 있기때문에 이것을 버리고 생각하는것은 있을수 없는일입니다.
RDB의 한계를 단점으로 지적하였지만, 사실 그 한계까지 분석/통계에 대해 이용해본 경험이 부족하기 때문이기도합니다.
대용량데이터라는 추상적인 개념에 가려져 SQL문을 쓰지않겠다, RDB를 이용하지 않겠다란 잘못된 해석을 해왔는데
SPARK를 연구하면서 RDB의 SQL문을 같이 병행해서 학습을 해야할 필요를 느끼게되었으며
- 의미있고 원하는 데이터가 무엇이냐? 명확한 질의를 한다란것은 어려운 주제이며 , 이것을 연습하는 가장좋은것은 SQL문입니다.
-NoSQL : NotOnlySQL - SQL문만을 쓰지않겠다(O) , SQL문을 안쓰겠다 (x) , Not Used SQL( X)
맵리듀스라는 개념에 가려져, SQL문을 쓰지 않겠다란 선언으로 알고 있었습니다만
아주 큰차이가 있으며 위 차이를 구분하는데 아주 오랜시간이 걸렸습니다.
다만 빅 데이터는 왜 비관계형이어야 하는가? 이 주제는 RDB를 다시 공부하게 하는 좋은 주제입니다.
참고URL : https://blog.outsider.ne.kr/519
필자는 데이터분석 전문가가 아님을 밝혀두며, 전문가가 제공한 쿼리를 어떻게 이용하고 빠르게
어플리케이션을 통해 전달할까? 메시징 처리에 조금더 중심을둔 미들웨어 개발자이며
SPARK이 데이터 분석처리, 메시징처리 둘의 컨셉을 통합하는 바람에
학습해야할 경계를 넘어야하며, 귀찮아졌네라고 생각하는 개발자중에 하나입니다.
대충 살펴보는 데이터 분석의 변천사
SQL(RDB) → NOSQL(하둡등) → SPARK(모든것 을 이용한 통합) ← 고급연산및 AI는 지원되고 봐야함
하둡은 사용해보지도 못했는데, SPARK의 등장으로 그와 관련된 기술들을 모두 공부해야하며
사용기술은 단순해졌지만, 도메인이 요구하는 스펙은 더 늘어나게 되었습니다.
단순 배치처리 어플리케이션 개발자 : 분산된 노드에, 분산된 집계처리 명령을 날리고 실시간 서비스를 만들어야함
SQL문만 사용하던 데이터 분석가(BI) : 다양한 데이터의 소스로부터 더 복잡하고 의미있는 연관관계를 만들어야함 ( N:N → 1:N or 1:1 )