You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 18 Next »


사용준비

import pyspark
import requests
import pandas as pd
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SQLContext

sc = pyspark.SparkContext('local[*]')
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

spark 객체는 분산컴퓨팅을위해 여러개를 선택할수도 있으나, 여기서는 로컬 객체 하나만 사용하겠습니다.

spark를 설치하고 셋팅하는방법은 다양하니.. 여기서 언급하지는 않으며 

도커의 all-spark-notebook 을 이용하였습니다.  - Docker 설치 참고

샘플데이터

API를통해 호텔정보 획득

import requests
import json

chkein = '2018-01-18'
stays = 1
url = 'http://mobile.xxxxx.co.kr/api/v3/hotels/sales?dateCheckIn={}&stays={}&provinceIdx=5&areaIdx=&page=1&limit=200&persons=2&details=true'.format(chkein,stays)
r = requests.get(url)
data = r.json()
hotels=data['data']['hotelSales']

pyspark를 간단하게 소개하기 위해 샘플데이터라고 표현하였지만, 사실 이 단계가 가장 중요합니다.

필요로하는 집계에 이용되는 데이터의 전체량을 범위한정지어야 하는문제와 동시에, 어디서로부터(DB?,실시간 크롤링,혹은 로그) 온데이터를 가지고

데이터 처리를 할것인가란 주제이며, 여기서는 실시간 API로부터 얻은데이터를 바로 분석을 하여 Spark을 통해 분석하는 방법을 이용하겠습니다.   

  • Request를 통해 크롤링하기

  • 크롤링된 데이터를 , 원하는 Json으로 추출하기


기본 사용

전체 리스트를 Spark 병렬처리용 RDD로  변환

rddHotels = sc.parallelize(hotels)


rddHotels = sc.parallelize(hotels,5) //5개로 쪼게겠다.

hotels라는 json 데이터를 분산처리를 위한 RDD로 만들어 줍니다.  

몇개로 쪼갤지의 결정에따라, 단일지점에서 집계처리(일반적인 RDBMS) 보다 수백배 또는 수천배 빨라질수 있습니다. 

( 최적화된 연산처리를위해 몇가지 고급 디플로이셋팅이 필요하긴합니다.)


Spark RDD를 전체 리스트로 반환

rddHotels.collect()


SparkRDD 를 데이터셋으로

schema = StructType( [
    StructField('hotelIdx',LongType(),True),
    StructField('discount',LongType(),True),
    StructField('name',StringType(),True)
])

dfHotels = sqlContext.createDataFrame(rddHotels, schema)


데이터 셋을 통해 SQL문사용하기

dfHotels.createOrReplaceTempView('tblHotel')


spark.sql('select * from tblHotel').collect()


SparkData의 시각화 

Spark에서 처리된 데이터를 시각화하거나? 엑셀에 저장할때 이용합니다.

pdHotels = pd.read_json( json.dumps(rddHotels.collect()) )

pdHotels
addrSummaryavailableRoomscategorydiscountgradehotelIdxlatitudelongitudenameregionNamereviewCount
07hotel169900special123430.130.1메이필드호텔서울223


여기서 의미 있는 값이 무엇인지? 필드명을 파악합니다.


필요하면 엑셀로도 저장해줍니다.

pdHotels.to_excel('output.xls', index=False)


Spark RDD 기본 API

필터-비싼호텔 찾기

expesiveHotel = rddHotels.filter( lambda row : row['discount'] > 300000 )
#전체를 다 가져오거나?
expesiveHotel.collect()
#특정 개수만 획득
expesiveHotel.take(5)


==> 데이터 예
[{'addrSummary': '강남 | 삼성역 도보 1분',
  'availableRooms': 5,
  'displayText': '서울 > 파크 하얏트 서울',
  'distance': 0.0,
  'districtName': '강남구',


중복제거-호텔 관리데이터 확인

grades = rddHotels.map( lambda row: row['grade'] ).distinct()
grades.take(5)

==> 결과 : 해당 데이터가 관리하는 grade파악이 가능합니다.
['special1', 'special2', 'grade2', 'motel', 'design']


regions = rddHotels.map( lambda row: row['districtName'] ).distinct()
regions.take(5)
==> 결과
['중구', '강남구', '서초구', '강서구', '구로구']

사용자 알고리즘 적용

Spark의 장점은 , 간단한 알고리즘 뿐만 아니라 여러가지 수학적 알고리즘을 사용하여 필터/집계처리에

활용할수 있다란 점입니다. 더 나아가 텐셔플로우와 상호연동도 가능할테이지만

커스텀한 알고리즘이 어떻게 상호연동이 될수 있나 간단하게 살펴보겠습니다. 

math-util

  • getDistance(52.2296756,21.0122287,52.406374,16.9251681) : 위경도 기반 얼마나 가깝나?
  • edit_distance('강남호텔', '강남호텔a')   : 얼마나 편집을 해야 같아지나 간단한 문자열 유사도검사?


근처 호텔찾기

#서울의 어느 호텔 'latitude': 37.5700718, 'longitude': 127.0089115
nearHotel= rddHotels.filter( lambda row : 5 > getDistance(37.5700718,127.0089115,row['latitude'],row['longitude'] ) )
nearHotel.take(10)
nearHotel.count()




비슷한 이름 호텔찾기

siHotel = rddHotels.filter( lambda row : 3 > edit_distance('메아 펠드',row['name'] ) )
siHotel.take(100)






  • No labels