[Korean Statistical Society] Statistical Computing Research Group Tutorial

Korean Statistical Society
Author

김보람

Published

June 29, 2023

Reference

This material is from Prof. 여인권's tutorial at the 2023 Korean Statistical Society Statistical Computing Research Group.

https://github.com/statfunny/Bigdata-statistical-Analysis

Quite a few packages need to be installed before running the code below.

# pip install pyarrow pyspark==3.4.0

Efficient Memory Management and Program Writing

Memory Management in Big Data Analysis

  • Prefer categorical over string types
  • Use integer types with the smallest sufficient range
  • Use the smallest sufficient floating-point type
  • For binary variables, use Boolean (True/False)
import pandas as pd
import numpy as np
def 데이터프레임생성(size):  # generate a random demo DataFrame with size rows
    df = pd.DataFrame()
    df["나이"] = np.random.choice(100,size)
    df["수행평가1"] = np.random.choice(["A","B","C","D","F"], size)
    df["수행평가2"] = np.random.choice(["상","중","하"], size)    
    df["학점"] = np.random.choice(["[0,3)","[3,3.5)","[3.5,4)","[4,4.3]"], size)
    df["합격확률"] = np.random.uniform(0,1,size)
    df["결과"] = np.random.choice(["합격","불합격"],size)
    return df
df = 데이터프레임생성(1000000)
# df = 데이터프레임생성(1_000_000)  # underscores can be used as digit separators instead of commas
df1 = df.copy()
df2 = df.copy()
df1.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 6 columns):
 #   Column  Non-Null Count    Dtype  
---  ------  --------------    -----  
 0   나이      1000000 non-null  int64  
 1   수행평가1   1000000 non-null  object 
 2   수행평가2   1000000 non-null  object 
 3   학점      1000000 non-null  object 
 4   합격확률    1000000 non-null  float64
 5   결과      1000000 non-null  object 
dtypes: float64(1), int64(1), object(4)
memory usage: 45.8+ MB

Tasks

  • Split the data by 수행평가1 and 학점 and rank 나이 within each group
  • Split the data by 수행평가1 and 학점 and rank 합격확률 within each group
  • Split the data by 수행평가1, 학점, and 결과 and rank 합격확률 within each group
  • Measure the execution time
    • %timeit: runs the statement repeatedly and reports the mean and standard deviation of the elapsed time
%timeit df1["순위1"] = df1.groupby(["수행평가1","학점"])["나이"].rank()
%timeit df1["순위2"] = df1.groupby(["수행평가1","학점"])["합격확률"].rank()
%timeit df1["순위3"] = df1.groupby(["수행평가1","학점","결과"])["합격확률"].rank()
157 ms ± 533 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
219 ms ± 2.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
256 ms ± 384 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

String \(\rightarrow\) categorical

df2["수행평가1"] = df2["수행평가1"].astype('category')
df2["수행평가2"] = df2["수행평가2"].astype('category')
df2["학점"] = df2["학점"].astype('category')
df2.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 6 columns):
 #   Column  Non-Null Count    Dtype   
---  ------  --------------    -----   
 0   나이      1000000 non-null  int64   
 1   수행평가1   1000000 non-null  category
 2   수행평가2   1000000 non-null  category
 3   학점      1000000 non-null  category
 4   합격확률    1000000 non-null  float64 
 5   결과      1000000 non-null  object  
dtypes: category(3), float64(1), int64(1), object(1)
memory usage: 25.7+ MB

Downcasting

  • int8: -128~127
    • uint8: 0~255
  • int16: -32,768 ~ 32,767
    • uint16: 0~65,535
  • int32: -2,147,483,648~2,147,483,647
    • uint32: 0~ 4,294,967,295
  • int64: -9,223,372,036,854,775,808 ~ 9,223,372,036,854,775,807
    • uint64: 0~18,446,744,073,709,551,615
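As a hedged sketch (not in the original material), pandas can also pick the smallest safe type automatically with pd.to_numeric instead of hard-coding the target dtype; the variable names below are just for illustration. The tutorial itself proceeds with explicit astype calls:

나이_축소 = pd.to_numeric(df2["나이"], downcast="integer")    # values 0-99 fit in int8
확률_축소 = pd.to_numeric(df2["합격확률"], downcast="float")   # float64 -> float32
print(나이_축소.dtype, 확률_축소.dtype)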
df2["나이"] = df2["나이"].astype('int8')
df2.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 6 columns):
 #   Column  Non-Null Count    Dtype   
---  ------  --------------    -----   
 0   나이      1000000 non-null  int8    
 1   수행평가1   1000000 non-null  category
 2   수행평가2   1000000 non-null  category
 3   학점      1000000 non-null  category
 4   합격확률    1000000 non-null  float64 
 5   결과      1000000 non-null  object  
dtypes: category(3), float64(1), int8(1), object(1)
memory usage: 19.1+ MB
df2["합격확률"] = df2["합격확률"].astype('float32')
df2.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 6 columns):
 #   Column  Non-Null Count    Dtype   
---  ------  --------------    -----   
 0   나이      1000000 non-null  int8    
 1   수행평가1   1000000 non-null  category
 2   수행평가2   1000000 non-null  category
 3   학점      1000000 non-null  category
 4   합격확률    1000000 non-null  float32 
 5   결과      1000000 non-null  object  
dtypes: category(3), float32(1), int8(1), object(1)
memory usage: 15.3+ MB
df2["결과"] = df2["결과"].map({"합격":True,"불합격":False})
df2.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 6 columns):
 #   Column  Non-Null Count    Dtype   
---  ------  --------------    -----   
 0   나이      1000000 non-null  int8    
 1   수행평가1   1000000 non-null  category
 2   수행평가2   1000000 non-null  category
 3   학점      1000000 non-null  category
 4   합격확률    1000000 non-null  float32 
 5   결과      1000000 non-null  bool    
dtypes: bool(1), category(3), float32(1), int8(1)
memory usage: 8.6 MB
%timeit df2["순위1"] = df2.groupby(["수행평가1","학점"])["나이"].rank()
%timeit df2["순위2"] = df2.groupby(["수행평가1","학점"])["합격확률"].rank()
%timeit df2["순위3"] = df2.groupby(["수행평가1","학점","결과"])["합격확률"].rank()
104 ms ± 377 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
164 ms ± 2.18 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
169 ms ± 1.55 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
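To see how much memory the downcasting saved, a quick check with memory_usage(deep=True) can be run (a minimal sketch, not part of the original):

# deep=True also counts the Python string objects held by object columns.
print(df1.memory_usage(deep=True).sum() / 1024**2, "MB")  # original dtypes
print(df2.memory_usage(deep=True).sum() / 1024**2, "MB")  # categorical/downcast dtypes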

Saving and Reading Files

변수 = ["나이","수행평가1","수행평가2","학점","합격확률","결과"] 
df1 = df1[변수]
df2 = df2[변수]
df1.to_csv("BSA03_df1.csv",index=False)
df2.to_csv('BSA03_df2.csv',index=False)
df1csv = pd.read_csv('BSA03_df1.csv')
df2csv = pd.read_csv('BSA03_df2.csv')

df1.to_csv: saves the DataFrame as a CSV file.

df1csv.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 6 columns):
 #   Column  Non-Null Count    Dtype  
---  ------  --------------    -----  
 0   나이      1000000 non-null  int64  
 1   수행평가1   1000000 non-null  object 
 2   수행평가2   1000000 non-null  object 
 3   학점      1000000 non-null  object 
 4   합격확률    1000000 non-null  float64
 5   결과      1000000 non-null  object 
dtypes: float64(1), int64(1), object(4)
memory usage: 45.8+ MB
df2csv.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 6 columns):
 #   Column  Non-Null Count    Dtype  
---  ------  --------------    -----  
 0   나이      1000000 non-null  int64  
 1   수행평가1   1000000 non-null  object 
 2   수행평가2   1000000 non-null  object 
 3   학점      1000000 non-null  object 
 4   합격확률    1000000 non-null  float64
 5   결과      1000000 non-null  bool   
dtypes: bool(1), float64(1), int64(1), object(3)
memory usage: 39.1+ MB
# pip install pyarrow
df2.to_parquet('BSA03_df2.parquet')
df2pqt = pd.read_parquet('BSA03_df2.parquet')
df2pqt.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 6 columns):
 #   Column  Non-Null Count    Dtype   
---  ------  --------------    -----   
 0   나이      1000000 non-null  int8    
 1   수행평가1   1000000 non-null  category
 2   수행평가2   1000000 non-null  category
 3   학점      1000000 non-null  category
 4   합격확률    1000000 non-null  float32 
 5   결과      1000000 non-null  bool    
dtypes: bool(1), category(3), float32(1), int8(1)
memory usage: 8.6 MB
  • Parquet I/O is provided directly by pandas (to_parquet / read_parquet), and the compact dtypes are preserved.
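If the data must round-trip through CSV instead, the compact dtypes can be restored by hand with the dtype argument of read_csv; a sketch under that assumption (결과 is left out because True/False is already inferred as bool):

df2csv = pd.read_csv(
    "BSA03_df2.csv",
    dtype={"나이": "int8", "수행평가1": "category", "수행평가2": "category",
           "학점": "category", "합격확률": "float32"},
)
df2csv.info()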

Efficient Programs

For repetitive operations, use the constructs below instead of a for loop.

Task

Assign to a new variable 평가: the value of 수행평가1 if 나이 is under 65, or if 합격확률 is at least 0.6 and 학점 is [4,4.3]; otherwise the value of 수행평가2.

def 변수추가(행자료):
    # Row-wise rule: 수행평가1 if 나이 < 65, or if 합격확률 >= 0.6 and 학점 == "[4,4.3]";
    # otherwise 수행평가2.
    if 행자료["나이"] < 65:
        return 행자료["수행평가1"]
    if (행자료["합격확률"] >= 0.6) and (행자료["학점"] == "[4,4.3]"):
        return 행자료["수행평가1"]
    return 행자료["수행평가2"]

Program using a loop

df = 데이터프레임생성(100_000)
df1 = df.copy()
df2 = df.copy()
df3 = df.copy()
%%timeit
for index, row in df1.iterrows():
    df1.loc[index,"평가"] = 변수추가(row)
17.3 s ± 1.05 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

Program using apply

%%timeit
df2["평가"] = df2.apply(변수추가,axis=1)
1.54 s ± 104 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Vectorized program

(df3["나이"] < 65) | ((df3["합격확률"] >= 0.6) & (df3["학점"] == "[4,4.3]"))
0         True
1         True
2        False
3        False
4         True
         ...  
99995     True
99996     True
99997     True
99998     True
99999    False
Length: 100000, dtype: bool
%%timeit
df3["평가"] = df3["수행평가2"]
조건 = (df3["나이"] < 65) | ((df3["합격확률"] >= 0.6) & (df3["학점"] == "[4,4.3]"))
df3.loc[조건,"평가"] = df3["수행평가1"]
23.2 ms ± 1.66 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
  • Switching to vectorized operations is by far the fastest; an np.where variant is sketched below.
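As another vectorized form (a sketch, not from the original), np.where builds the whole column in one call from the boolean mask:

조건 = (df3["나이"] < 65) | ((df3["합격확률"] >= 0.6) & (df3["학점"] == "[4,4.3]"))
df3["평가"] = np.where(조건, df3["수행평가1"], df3["수행평가2"])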

Set up the environment variables for PySpark, Java, Spark, and Python

  • If errors occur, set the paths explicitly (the paths below are for a local Windows install):
import os
import sys
os.environ['JAVA_HOME'] = r"C:\Java"
os.environ['SPARK_HOME'] = r"C:\spark-3.4.0"
os.environ['PYLIB'] = r"C:\spark-3.4.0\python\lib"
sys.path.insert(0, os.environ['PYLIB'] + r"\py4j-0.10.9.7-src.zip")
sys.path.insert(0, os.environ['PYLIB'] + r"\pyspark.zip")
import pyspark
from pyspark.sql import SparkSession
스파크 = SparkSession.builder.appName('Test').getOrCreate()
스파크

SparkSession - in-memory

SparkContext

Spark UI

Version: v3.3.2
Master: local[*]
AppName: Test

Open localhost:4040 in a web browser to view the Spark UI.

Reading HDFS Data in PySpark

  • Run start-dfs.cmd and start-yarn.cmd from CMD first
  • If the Spark installation cannot be found:
!pip install findspark
import findspark
import os
findspark.find()
findspark.init(os.environ.get("SPARK_HOME"))
sparkDF1 = 스파크.read.csv("hdfs://localhost:9000/Spark/BSA03_df1.csv")
sparkDF1.show(10)
+----+---------+---------+-------+-------------------+------+
| _c0|      _c1|      _c2|    _c3|                _c4|   _c5|
+----+---------+---------+-------+-------------------+------+
|나이|수행평가1|수행평가2|   학점|           합격확률|  결과|
|  19|        C|       하|  [0,3)| 0.1704850998911155|불합격|
|  78|        F|       중|[3,3.5)| 0.7007295241834984|불합격|
|  78|        F|       중|[4,4.3]|0.06793823954810418|불합격|
|  23|        A|       중|[3.5,4)| 0.8262506446089442|  합격|
|  97|        C|       하|[4,4.3]| 0.5911258463622218|불합격|
|  45|        B|       하|  [0,3)| 0.3677844602679712|  합격|
|  66|        A|       하|[3,3.5)| 0.9721303956886912|  합격|
|  66|        F|       중|[4,4.3]|0.33333421672000396|  합격|
|  61|        A|       상|[3,3.5)| 0.7048925310189916|불합격|
+----+---------+---------+-------+-------------------+------+
only showing top 10 rows
sparkDF1.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
sparkDF2 = 스파크.read.csv("hdfs://localhost:9000/Spark/BSA03_df1.csv", header=True, encoding="utf-8", inferSchema="true")
#sparkDF2 = 스파크.read.option('encoding','utf-8').option('header',True).option('inferSchema',True) \
#    .csv("hdfs://localhost:9000/Spark/BSA03_df1.csv")
sparkDF2.printSchema()
root
 |-- 나이: integer (nullable = true)
 |-- 수행평가1: string (nullable = true)
 |-- 수행평가2: string (nullable = true)
 |-- 학점: string (nullable = true)
 |-- 합격확률: double (nullable = true)
 |-- 결과: string (nullable = true)

When loading data stored in HDFS (Hadoop):

  • Setting inferSchema="true" loads each column with an appropriate type (without it, every column above was read as string).

Reading Multiple CSV Files

  • You can also read multiple CSV files at once by passing all file names as a single comma-separated path; other path forms are sketched after the example below.
sparkDF = 스파크.read.csv("path1,path2,path3")

Specifying Column Types Explicitly

import pyspark
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType

스키마 = StructType() \
    .add("나이",IntegerType(),True) \
    .add("수행평가1",StringType(),True) \
    .add("수행평가2",StringType(),True) \
    .add("학점",StringType(),True) \
    .add("합격확률",DoubleType(),True) \
    .add("결과",StringType(),True)

sparkDF = 스파크.read.format('csv')\
    .option('header',True).schema(스키마)\
    .load("hdfs://localhost:9000/Spark/BSA03_df1.csv")
sparkDF3 = 스파크.read.parquet("hdfs://localhost:9000/Spark/BSA03_df2.parquet")
sparkDF3.show(10)
+----+---------+---------+-------+----------+-----+
|나이|수행평가1|수행평가2|   학점|  합격확률| 결과|
+----+---------+---------+-------+----------+-----+
|  19|        C|       하|  [0,3)| 0.1704851|false|
|  78|        F|       중|[3,3.5)|0.70072955|false|
|  78|        F|       중|[4,4.3]|0.06793824|false|
|  23|        A|       중|[3.5,4)| 0.8262507| true|
|  97|        C|       하|[4,4.3]|0.59112585|false|
|  45|        B|       하|  [0,3)|0.36778447| true|
|  66|        A|       하|[3,3.5)| 0.9721304| true|
|  66|        F|       중|[4,4.3]| 0.3333342| true|
|  61|        A|       상|[3,3.5)| 0.7048925|false|
|  46|        C|       상|[3,3.5)| 0.8856867|false|
+----+---------+---------+-------+----------+-----+
only showing top 10 rows

Converting DataFrames Between Formats and Saving/Loading

  • pandas DataFrame \(\Longleftrightarrow\) spark DataFrame
  • csv \(\Longleftrightarrow\) parquet
pandasDF_spark = sparkDF2.toPandas()
pandasDF_spark.head()
   나이 수행평가1 수행평가2     학점     합격확률    결과
0   19        C       하    [0,3)  0.170485  불합격
1   78        F       중  [3,3.5)  0.700730  불합격
2   78        F       중  [4,4.3]  0.067938  불합격
3   23        A       중  [3.5,4)  0.826251    합격
4   97        C       하  [4,4.3]  0.591126  불합격

pandas DataFrame to Spark DataFrame

Apache Spark uses Apache Arrow, an in-memory columnar format, to transfer data between Python and the JVM. You need to enable it, as it is disabled by default, and have Apache Arrow (PyArrow) installed on all Spark cluster nodes, either via pip install pyspark[sql] or by downloading it directly from Apache Arrow for Python.

### !pip install pyarrow
스파크.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
## pandas 2.0: AttributeError: 'DataFrame' object has no attribute 'iteritems'
## --> iteritems was removed in pandas 2.0 ==> downgrade pandas (or see the workaround sketch below)
#sparkDF_pandas = 스파크.createDataFrame(df1csv)
#sparkDF_pandas.show(10)
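Besides downgrading pandas, a commonly used workaround is to alias the removed iteritems back to items before calling createDataFrame; this is a sketch under that assumption, not part of the original material:

# Workaround sketch: older PySpark conversion code still calls DataFrame.iteritems,
# which pandas 2.0 removed, so point it back at items().
if not hasattr(pd.DataFrame, "iteritems"):
    pd.DataFrame.iteritems = pd.DataFrame.items
sparkDF_pandas = 스파크.createDataFrame(df1csv)
sparkDF_pandas.show(10)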

When an error occurs, Spark automatically falls back to a non-Arrow implementation; this behavior is controlled by spark.sql.execution.arrow.pyspark.fallback.enabled.

스파크.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")

PySpark vs Python

  • PySpark is the Python API for Spark, which is written in Scala
  • Scala objects live on the JVM \(\rightarrow\) PySpark also works with JVM objects
  • Python works with Python objects
  • python (pandas) \(\rightarrow\) Py4J \(\rightarrow\) scala (pyspark): errors occur frequently at this boundary

Saving Files from Spark

  • sparkDF.write.csv("path")
  • sparkDF.write.format('csv').save("path")
  • If the file already exists:
    • Overwrite: sparkDF.write.mode('overwrite').csv("path")
    • Append: sparkDF.write.mode('append').csv("path")
    • Ignore: sparkDF.write.mode('ignore').csv("path")
    • Raise an error: sparkDF.write.mode('error').csv("path") \(\Leftarrow\) default
sparkDF3.write.csv("hdfs://localhost:9000/Test/csv")
sparkDF3.write.parquet("hdfs://localhost:9000/Test/parquet")
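Note that write.csv produces a folder of part files without a header row; a sketch (the coalesce call and header option are additions beyond the original, and the output path is a placeholder) for writing a single header-bearing file:

sparkDF3.coalesce(1).write.mode("overwrite").option("header", True) \
    .csv("hdfs://localhost:9000/Test/csv_header")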

Basic PySpark Examples

df_hdfs = 스파크.read.csv("hdfs://localhost:9000/Spark/Employee.csv", header=True, encoding='cp949', inferSchema='true')
df_hdfs.show(10)
+---+------+----+------+-------+--------+-------+-------+--------+
| id|gender|educ|jobcat| salary|salbegin|jobtime|prevexp|minority|
+---+------+----+------+-------+--------+-------+-------+--------+
|  1|  남성|  15|경영자|57000.0|   27000|     98|    144|      No|
|  2|  남성|  16|사무직|40200.0|   18750|     98|     36|      No|
|  3|  여성|  12|사무직|21450.0|   12000|     98|    381|      No|
|  4|  여성|   8|사무직|21900.0|   13200|     98|    190|      No|
|  5|  남성|  15|사무직|45000.0|   21000|     98|    138|      No|
|  6|  남성|  15|사무직|32100.0|   13500|     98|     67|      No|
|  7|  남성|  15|사무직|36000.0|   18750|     98|    114|      No|
|  8|  여성|  12|사무직|21900.0|    9750|     98|      0|      No|
|  9|  여성|  15|사무직|27900.0|   12750|     98|    115|      No|
| 10|  여성|  12|사무직|24000.0|   13500|     98|    244|      No|
+---+------+----+------+-------+--------+-------+-------+--------+
only showing top 10 rows
df_hdfs.where('jobcat=="경영자"').show(10)
+---+------+----+------+--------+--------+-------+-------+--------+
| id|gender|educ|jobcat|  salary|salbegin|jobtime|prevexp|minority|
+---+------+----+------+--------+--------+-------+-------+--------+
|  1|  남성|  15|경영자| 57000.0|   27000|     98|    144|      No|
| 18|  남성|  16|경영자|103750.0|   27510|     97|     70|      No|
| 27|  남성|  19|경영자| 60375.0|   27480|     96|     96|      No|
| 29|  남성|  19|경영자|135000.0|   79980|     96|    199|      No|
| 32|  남성|  19|경영자|110625.0|   45000|     96|    120|      No|
| 34|  남성|  19|경영자| 92000.0|   39990|     96|    175|      No|
| 35|  남성|  17|경영자| 81250.0|   30000|     96|     18|      No|
| 50|  남성|  16|경영자| 60000.0|   23730|     94|     59|      No|
| 53|  남성|  18|경영자| 73750.0|   26250|     94|     56|      No|
| 62|  남성|  16|경영자| 48000.0|   21750|     93|     22|      No|
+---+------+----+------+--------+--------+-------+-------+--------+
only showing top 10 rows
from pyspark.sql.functions import col, log, exp, when
df_hdfs.withColumn("Lsalary",log("salary")).withColumn("LBsalary",log("salbegin")).show(5)
+---+------+----+------+-------+--------+-------+-------+--------+------------------+------------------+
| id|gender|educ|jobcat| salary|salbegin|jobtime|prevexp|minority|           Lsalary|          LBsalary|
+---+------+----+------+-------+--------+-------+-------+--------+------------------+------------------+
|  1|  남성|  15|경영자|57000.0|   27000|     98|    144|      No|10.950806546816688|10.203592144986466|
|  2|  남성|  16|사무직|40200.0|   18750|     98|     36|      No|10.601622274607113| 9.838949031398556|
|  3|  여성|  12|사무직|21450.0|   12000|     98|    381|      No| 9.973479924356162| 9.392661928770137|
|  4|  여성|   8|사무직|21900.0|   13200|     98|    190|      No| 9.994241915804592| 9.487972108574462|
|  5|  남성|  15|사무직|45000.0|   21000|     98|    138|      No|10.714417768752456|  9.95227771670556|
+---+------+----+------+-------+--------+-------+-------+--------+------------------+------------------+
only showing top 5 rows
df_hdfs.select(["gender","jobcat"]).distinct().show()
+------+------+
|gender|jobcat|
+------+------+
|  남성|사무직|
|  여성|사무직|
|  여성|경영자|
|  남성|경영자|
|  남성|관리직|
+------+------+
df_hdfs.withColumn("Job",when(col("jobcat")=="경영자","임원").otherwise("사원")).show(5)
+---+------+----+------+-------+--------+-------+-------+--------+----+
| id|gender|educ|jobcat| salary|salbegin|jobtime|prevexp|minority| Job|
+---+------+----+------+-------+--------+-------+-------+--------+----+
|  1|  남성|  15|경영자|57000.0|   27000|     98|    144|      No|임원|
|  2|  남성|  16|사무직|40200.0|   18750|     98|     36|      No|사원|
|  3|  여성|  12|사무직|21450.0|   12000|     98|    381|      No|사원|
|  4|  여성|   8|사무직|21900.0|   13200|     98|    190|      No|사원|
|  5|  남성|  15|사무직|45000.0|   21000|     98|    138|      No|사원|
+---+------+----+------+-------+--------+-------+-------+--------+----+
only showing top 5 rows
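The same DSL also covers grouped aggregation; a brief sketch (not in the original tutorial) of headcount and average salary per job category:

from pyspark.sql.functions import avg, count

df_hdfs.groupBy("jobcat") \
    .agg(count("*").alias("n"), avg("salary").alias("mean_salary")) \
    .orderBy("mean_salary", ascending=False) \
    .show()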

How to use SQL in Pyspark

  • DF: DSL (domain-specific language)
  • Tables: pure SQL (Structured Query Language)
  • A (temporary or permanent) table can be created from a DF (DataFrame)
df_hdfs.registerTempTable("table1")
c:\users\smu\appdata\local\programs\python\python38\lib\site-packages\pyspark\sql\dataframe.py:229: FutureWarning: Deprecated in 2.0, use createOrReplaceTempView instead.
  warnings.warn("Deprecated in 2.0, use createOrReplaceTempView instead.", FutureWarning)
  • This registers the DataFrame in database (table) form
스파크.sql("select count(*) from table1 group by jobcat").show()
+--------+
|count(1)|
+--------+
|      27|
|      84|
|     363|
+--------+
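The counts above do not show which group each row belongs to; including jobcat in the select makes the result readable (a small sketch):

스파크.sql("select jobcat, count(*) as n from table1 group by jobcat").show()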
스파크.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
+---------+
스파크.sql("create database db1")
스파크.sql("show databases").show()
+---------+
|namespace|
+---------+
|      db1|
|  default|
+---------+
df_hdfs.registerTempTable("table1") 
스파크.sql("show tables in default").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |   table1|       true|
+---------+---------+-----------+
df_hdfs.registerTempTable("table1")
df_hdfs.write.saveAsTable("db1.permtable")  
## Saved under spark-warehouse/db1.db/permtable in the current Jupyter working directory
## Raises an error if the table already exists

Changing the mode:

  • Administrator privileges may be required
  • Choose among append, overwrite, error, errorifexists, ignore (default: error)
df_hdfs.write.mode('overwrite').saveAsTable("db1.permtable")
스파크.sql("show tables in db1").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|      db1|permtable|      false|
|         |   table1|       true|
+---------+---------+-----------+
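To verify the permanent table, it can be read back either via SQL or spark.table (a minimal sketch):

# Both return the same DataFrame backed by the saved table.
스파크.sql("select * from db1.permtable limit 5").show()
스파크.table("db1.permtable").show(5)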