본문 바로가기
학습장/Data Engineering

pyspark groupBy 샘플코드

by daedoo_ 2021. 3. 1.

DW용으로 사용중인 DB의 용량 확보를 위해

잘 사용되지 않는 로그성 데이터를 Sqoop 사용하여 HDFS로 이전 진행 예정임.

 

 

Sqoop ETL

Sqoop (SQL to Hadoop) Sqoop은 RBMS HDFS 사이에 데이터 ETL을 위해 만들어진 프로젝트이며, 하둡의 YARN, MapReduce 위에서 동작하는 하둡 에코시스템의 툴 중의 하나입니다. CLI로 간단하게 DB와 HDFS사이에 ET..

ourhistory160109.tistory.com

 

아래는 sqoop으로 적재 후 pyspark으로 건수 확인을 위한 코드

(curdate 기준 월별 건수 체크)

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

click_log_2016 = HDFS경로
 
 
if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
 
    df = spark.read.parquet(click_log_2016).repartition(
        F.from_unixtime(F.col('curdate')/1000,'yyyy-MM')
    ).select(
        "*"
    )
    
    df.withColumn("curdate_yyyymm",F.from_unixtime(F.col('curdate')/1000,"yyyy-MM")
    ).groupBy(
        "curdate_yyyymm"
    ).agg(
       {"*":"count"}
    ).orderBy('curdate_yyyyMM').show()
    df.printSchema()
    
    exit(0)

curdate는 date 타입의 컬럼인데, sqoop으로 import하면 (parquet포맷으로)

13자리 unix timestamp 값으로 바뀌어서 들어감. 뒤에 3자리 빼고 변환하니 맞게 나옴

(다시 export 하면 알아서 date값으로 들어가는지는 확인 필요..)

 

읽을 수 있는 날짜로 변환해서 집계 해보고자 했는데,

read 하면서 기준이 되는 컬럼을 변환해서 groupBy 할 수는 없는 거 같음.. 

그래서 dataframe으로 먼저 뽑아낸 후에 변환 해야 하는 듯

'학습장 > Data Engineering' 카테고리의 다른 글

까먹었을때 참고할 linux 명령어 정리  (0) 2021.03.13
sqoop export  (0) 2021.03.12
Sqoop ETL  (0) 2021.02.21
DataStage Job Xml export  (0) 2021.01.17
Hadoop 설치(2)  (0) 2021.01.10

댓글