pyspark를 활용하면, spark을 분산 병렬 데이터처리를 유연하게 할수 있으며
보너스로 AI 툴킷인 텐셔플로우를 연동하여 사용할수 있습니다.
import tensorflow as tf import numpy as np
# 텐서플로우의 기본적인 구성을 익힙니다. # tf.constant: 말 그대로 상수입니다. hello = tf.constant('Hello, TensorFlow!') print(hello) a = tf.constant(10) b = tf.constant(32) c = tf.add(a, b) # a + b 로도 쓸 수 있음 print(c) # 위에서 변수와 수식들을 정의했지만, 실행이 정의한 시점에서 실행되는 것은 아닙니다. # 다음처럼 Session 객제와 run 메소드를 사용할 때 계산이 됩니다. # 따라서 모델을 구성하는 것과, 실행하는 것을 분리하여 프로그램을 깔끔하게 작성할 수 있습니다. # 그래프를 실행할 세션을 구성합니다. sess = tf.Session() # sess.run: 설정한 텐서 그래프(변수나 수식 등등)를 실행합니다. print(sess.run(hello)) print(sess.run([a, b, c])) # 세션을 닫습니다. sess.close()
# 플레이스홀더와 변수의 개념을 익혀봅니다 # tf.placeholder: 계산을 실행할 때 입력값을 받는 변수로 사용합니다. # None 은 크기가 정해지지 않았음을 의미합니다. X = tf.placeholder(tf.float32, [None, 3]) print(X) # X 플레이스홀더에 넣을 값 입니다. # 플레이스홀더에서 설정한 것 처럼, 두번째 차원의 요소의 갯수는 3개 입니다. x_data = [[1, 2, 3], [4, 5, 6]] # tf.Variable: 그래프를 계산하면서 최적화 할 변수들입니다. 이 값이 바로 신경망을 좌우하는 값들입니다. # tf.random_normal: 각 변수들의 초기값을 정규분포 랜덤 값으로 초기화합니다. W = tf.Variable(tf.random_normal([3, 2])) b = tf.Variable(tf.random_normal([2, 1])) # 입력값과 변수들을 계산할 수식을 작성합니다. # tf.matmul 처럼 mat* 로 되어 있는 함수로 행렬 계산을 수행합니다. expr = tf.matmul(X, W) + b sess = tf.Session() # 위에서 설정한 Variable 들의 값들을 초기화 하기 위해 # 처음에 tf.global_variables_initializer 를 한 번 실행해야 합니다. sess.run(tf.global_variables_initializer()) print("=== x_data ===") print(x_data) print("=== W ===") print(sess.run(W)) print("=== b ===") print(sess.run(b)) print("=== expr ===") # expr 수식에는 X 라는 입력값이 필요합니다. # 따라서 expr 실행시에는 이 변수에 대한 실제 입력값을 다음처럼 넣어줘야합니다. print(sess.run(expr, feed_dict={X: x_data})) sess.close()
# X 와 Y 의 상관관계를 분석하는 기초적인 선형 회귀 모델을 만들고 실행해봅니다. # X 와 Y 의 상관관계를 분석하는 기초적인 선형 회귀 모델을 만들고 실행해봅니다. x_data = [1, 2, 3] y_data = [1, 2, 3] W = tf.Variable(tf.random_uniform([1], -1.0, 1.0)) b = tf.Variable(tf.random_uniform([1], -1.0, 1.0)) # name: 나중에 텐서보드등으로 값의 변화를 추적하거나 살펴보기 쉽게 하기 위해 이름을 붙여줍니다. X = tf.placeholder(tf.float32, name="X") Y = tf.placeholder(tf.float32, name="Y") print(X) print(Y) # X 와 Y 의 상관 관계를 분석하기 위한 가설 수식을 작성합니다. # y = W * x + b # W 와 X 가 행렬이 아니므로 tf.matmul 이 아니라 기본 곱셈 기호를 사용했습니다. hypothesis = W * X + b # 손실 함수를 작성합니다. # mean(h - Y)^2 : 예측값과 실제값의 거리를 비용(손실) 함수로 정합니다. cost = tf.reduce_mean(tf.square(hypothesis - Y)) # 텐서플로우에 기본적으로 포함되어 있는 함수를 이용해 경사 하강법 최적화를 수행합니다. optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.1) # 비용을 최소화 하는 것이 최종 목표 train_op = optimizer.minimize(cost) # 세션을 생성하고 초기화합니다. with tf.Session() as sess: sess.run(tf.global_variables_initializer()) # 최적화를 100번 수행합니다. for step in range(100): # sess.run 을 통해 train_op 와 cost 그래프를 계산합니다. # 이 때, 가설 수식에 넣어야 할 실제값을 feed_dict 을 통해 전달합니다. _, cost_val = sess.run([train_op, cost], feed_dict={X: x_data, Y: y_data}) print(step, cost_val, sess.run(W), sess.run(b)) # 최적화가 완료된 모델에 테스트 값을 넣고 결과가 잘 나오는지 확인해봅니다. print("\n=== Test ===") print("X: 5, Y:", sess.run(hypothesis, feed_dict={X: 5})) print("X: 2.5, Y:", sess.run(hypothesis, feed_dict={X: 2.5}))
# 털과 날개가 있는지 없는지에 따라, 포유류인지 조류인지 분류하는 신경망 모델을 만들어봅니다. # [털, 날개] x_data = np.array( [[0, 0], [1, 0], [1, 1], [0, 0], [0, 0], [0, 1]]) # [기타, 포유류, 조류] # 다음과 같은 형식을 one-hot 형식의 데이터라고 합니다. y_data = np.array([ [1, 0, 0], # 기타 [0, 1, 0], # 포유류 [0, 0, 1], # 조류 [1, 0, 0], [1, 0, 0], [0, 0, 1] ]) ######### # 신경망 모델 구성 ###### X = tf.placeholder(tf.float32) Y = tf.placeholder(tf.float32) # 신경망은 2차원으로 [입력층(특성), 출력층(레이블)] -> [2, 3] 으로 정합니다. W = tf.Variable(tf.random_uniform([2, 3], -1., 1.)) # 편향을 각각 각 레이어의 아웃풋 갯수로 설정합니다. # 편향은 아웃풋의 갯수, 즉 최종 결과값의 분류 갯수인 3으로 설정합니다. b = tf.Variable(tf.zeros([3])) # 신경망에 가중치 W과 편향 b을 적용합니다 L = tf.add(tf.matmul(X, W), b) # 가중치와 편향을 이용해 계산한 결과 값에 # 텐서플로우에서 기본적으로 제공하는 활성화 함수인 ReLU 함수를 적용합니다. L = tf.nn.relu(L) # 마지막으로 softmax 함수를 이용하여 출력값을 사용하기 쉽게 만듭니다 # softmax 함수는 다음처럼 결과값을 전체합이 1인 확률로 만들어주는 함수입니다. # 예) [8.04, 2.76, -6.52] -> [0.53 0.24 0.23] model = tf.nn.softmax(L) # 신경망을 최적화하기 위한 비용 함수를 작성합니다. # 각 개별 결과에 대한 합을 구한 뒤 평균을 내는 방식을 사용합니다. # 전체 합이 아닌, 개별 결과를 구한 뒤 평균을 내는 방식을 사용하기 위해 axis 옵션을 사용합니다. # axis 옵션이 없으면 -1.09 처럼 총합인 스칼라값으로 출력됩니다. # Y model Y * tf.log(model) reduce_sum(axis=1) # 예) [[1 0 0] [[0.1 0.7 0.2] -> [[-1.0 0 0] -> [-1.0, -0.09] # [0 1 0]] [0.2 0.8 0.0]] [ 0 -0.09 0]] # 즉, 이것은 예측값과 실제값 사이의 확률 분포의 차이를 비용으로 계산한 것이며, # 이것을 Cross-Entropy 라고 합니다. cost = tf.reduce_mean(-tf.reduce_sum(Y * tf.log(model), axis=1)) optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01) train_op = optimizer.minimize(cost) ######### # 신경망 모델 학습 ###### init = tf.global_variables_initializer() sess = tf.Session() sess.run(init) for step in range(100): sess.run(train_op, feed_dict={X: x_data, Y: y_data}) if (step + 1) % 10 == 0: print(step + 1, sess.run(cost, feed_dict={X: x_data, Y: y_data})) ######### # 결과 확인 # 0: 기타 1: 포유류, 2: 조류 ###### # tf.argmax: 예측값과 실제값의 행렬에서 tf.argmax 를 이용해 가장 큰 값을 가져옵니다. # 예) [[0 1 0] [1 0 0]] -> [1 0] # [[0.2 0.7 0.1] [0.9 0.1 0.]] -> [1 0] prediction = tf.argmax(model, 1) target = tf.argmax(Y, 1) print('예측값:', sess.run(prediction, feed_dict={X: x_data})) print('실제값:', sess.run(target, feed_dict={Y: y_data})) is_correct = tf.equal(prediction, target) accuracy = tf.reduce_mean(tf.cast(is_correct, tf.float32)) print('정확도: %.2f' % sess.run(accuracy * 100, feed_dict={X: x_data, Y: y_data}))
출처: https://github.com/golbin/TensorFlow-Tutorials