You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 25 Next »

대용량 데이터 분석에 있어서 RDB에서 SQL문만을 사용하여 추출하는 컨셉에서 탈출하고자

NOSQL진영은 무엇을 사용하고 활용하는가? 의 물음에 SPARK를 알게되었고

단순한 집계와 필터처리를 PYSPARK에서 어떻게 작동시키는가란 단순한 문서정리입니다.


사용준비

import pyspark
import requests
import pandas as pd
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SQLContext

sc = pyspark.SparkContext('local[*]')
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

spark 객체는 분산컴퓨팅을위해 여러개를 선택할수도 있으나, 여기서는 로컬 객체 하나만 사용하겠습니다.

spark를 설치하고 셋팅하는방법은 다양하니.. 여기서 언급하지는 않으며 

도커의 all-spark-notebook 을 이용하였습니다. 


설치참고: spark with akka


샘플데이터

API를통해 호텔정보 획득

import requests
import json

chkein = '2018-01-18'
stays = 1
url = 'http://mobile.xxxxx.co.kr/api/v3/hotels/sales?dateCheckIn={}&stays={}&provinceIdx=5&areaIdx=&page=1&limit=200&persons=2&details=true'.format(chkein,stays)
r = requests.get(url)
data = r.json()
hotels=data['data']['hotelSales']

pyspark를 간단하게 소개하기 위해 샘플데이터라고 표현하였지만, 사실 이 단계가 가장 중요합니다.

필요로하는 집계에 이용되는 데이터의 전체량을 범위한정지어야 하는문제와 동시에, 어디서로부터(DB?,실시간 크롤링,혹은 로그) 온데이터를 가지고

데이터 처리를 할것인가란 주제이며, 여기서는 실시간 API로부터 얻은데이터를 바로 소비를 하여 Spark을 통해 분석하는 방법을 이용하겠습니다.   


파이썬주제:

  • Request를 통해 크롤링하기

  • 크롤링된 데이터를 , 원하는 Json으로 변환하기


기본 사용

Json List객체를  Spark 병렬처리용 RDD로  변환

rddHotels = sc.parallelize(hotels)

rddHotels = sc.parallelize(hotels,5) //더 빠른 연산을 위해 5개로 쪼게겠다.

hotels라는 json 데이터를 분산처리를 위한 RDD로 만들어 줍니다.  

몇개로 쪼갤지의 결정에따라, 단일지점에서 집계처리(일반적인 RDBMS) 보다 수백배 또는 수천배 빨라질수 있습니다. 

SPARK의 장점을 설명하는 가장 심플하고 강력한 코드라고 생각됩니다.

고성능 분산연산을위해 몇개로 쪼갤지 결정하기만 하면됩니다.

( 최적화된 연산처리를위해 몇가지 고급 디플로이셋팅이 필요하긴합니다.)

Spark RDD를 전체 리스트로 반환

rddHotels.collect()

collect를 수행한다고 전체를 반환하는 과정자체가 수행되지 않습니다. (이 단계에 어떻게 하면 빨리가져올까 Spark이 고민을 하는 단계인지는 모르겠습니다.)

접근할수 있는 참조객체만 얻어오며 실제는 take(5) 를 수행했을때 가져오는 실행문이 수행됩니다.



SparkRDD 를 데이터셋으로

schema = StructType( [
    StructField('hotelIdx',LongType(),True),
    StructField('discount',LongType(),True),
    StructField('name',StringType(),True)
])

dfHotels = sqlContext.createDataFrame(rddHotels, schema)

데이터셋으로 변경하면 , 집계처리에서 RDB에 사용하는 SQL문및 관계형 DB에 집합을 정복하는

JOIN문 활용도 가능합니다.  여기서 NOSQL의 의미를 알아야 합니다. Not Only SQL로 SQL문만을

사용하지 않겠다란 의미며 사실은 SQL문은 집계처리에 있어서 중요한 위치를 차지하며

RDB가 하듯이 데이터 엔티티정의를 통해 관계형 DB처럼 집계처리를 할수가 있습니다.  


|


데이터 셋을 통해 SQL문사용하기

dfHotels.createOrReplaceTempView('tblHotel')

spark.sql('select * from tblHotel').collect()

RDB에서 사용하는 SQL문을 모두 모두 지원하지 않을테지만, 

대용량 데이터속에서 sql문을 통해 추출을 진행할수가 있습니다.


SparkData의 시각화 

Spark에서 처리된 데이터를 시각화하거나? 엑셀에 저장할때 이용합니다.

pdHotels = pd.read_json( json.dumps(rddHotels.collect()) )

pdHotels
addrSummaryavailableRoomscategorydiscountgradehotelIdxlatitudelongitudenameregionNamereviewCount
07hotel169900special123430.130.1메이필드호텔서울223


여기서 의미 있는 값이 무엇인지? 필드명을 파악합니다.


필요하면 엑셀로도 저장해줍니다.

pdHotels.to_excel('output.xls', index=False)

즉시 데이터의 결과를 확인하고, 몇가지 파이썬툴을 통해 정적 웹페이지로 만들어

랭킹처리에대한 결과를 사용자에게 제공해줄수도 있습니다.


Spark RDD 기본 API

필터-비싼호텔 찾기

expesiveHotel = rddHotels.filter( lambda row : row['discount'] > 300000 )
#전체를 다 가져오거나?
expesiveHotel.collect()
#특정 개수만 획득
expesiveHotel.take(5)


==> 데이터 예
[{'addrSummary': '강남 | 삼성역 도보 1분',
  'availableRooms': 5,
  'displayText': '서울 > 파크 하얏트 서울',
  'distance': 0.0,
  'districtName': '강남구',


중복제거-호텔 관리데이터 확인

grades = rddHotels.map( lambda row: row['grade'] ).distinct()
grades.take(5)

==> 결과 : 해당 데이터가 관리하는 grade파악이 가능합니다.
['special1', 'special2', 'grade2', 'motel', 'design']


regions = rddHotels.map( lambda row: row['districtName'] ).distinct()
regions.take(5)
==> 결과
['중구', '강남구', '서초구', '강서구', '구로구']

사용자 알고리즘 적용

Spark의 장점은 , 간단한 알고리즘 뿐만 아니라 여러가지 수학적 알고리즘을 사용하여 필터/집계처리에

활용할수 있다란 점입니다. 더 나아가 텐셔플로우와 상호연동도 가능할테이지만

커스텀한 알고리즘이 어떻게 상호연동이 될수 있나 간단하게 살펴보겠습니다. 

math-util

  • getDistance(52.2296756,21.0122287,52.406374,16.9251681) : 위경도 기반 얼마나 가깝나?
  • edit_distance('강남호텔', '강남호텔a')   : 얼마나 편집을 해야 같아지나 간단한 문자열 유사도검사?


근처 호텔찾기

#서울의 어느 호텔 'latitude': 37.5700718, 'longitude': 127.0089115
nearHotel= rddHotels.filter( lambda row : 5 > getDistance(37.5700718,127.0089115,row['latitude'],row['longitude'] ) )
nearHotel.take(10)
nearHotel.count()


반경검색을 더 빠르게 작동 시키기 : pyspark 반경검색


비슷한 이름 호텔찾기

siHotel = rddHotels.filter( lambda row : 3 > edit_distance('메아 펠드',row['name'] ) )
siHotel.take(100)







  • No labels