HDFS는 RDBMS와는 다르게 기존 데이터를 업데이트해서 사용하는데 적합하지 않다.
하지만 로그성 데이터가 아니라 상태값이나 변할 수 있는 값을 가지는 데이터인 경우 변경 사항이 계속 발생하고
이것을 주기적으로 반영해야할 필요성이 있을 수 있다.
예를 들어 상품 판매 순위를 집계하려면, 상품 판매 로그와 상품 자체에 대한 정보가 있어야 한다.
(로그에 상품에 대한 정보를 포함시킬 수도 있지만 최신 정보를 얻으려면 별도 정보가 필요할 것이다.)
로그는 변하지 않는 데이터이지만 상품에 대한 정보는 상품명, 카테고리 등이 계속 변할 수 있다.
이런 경우 데이터 전체 크기가 그렇게 크지 않은 경우 전체 데이터를 주기적으로 새로 dump할 수 있다. (Sqoop 등을 활용해서)
하지만 전체 데이터 크기가 커서 dump하는데 비용이 많이 들거나, 극히 일부만 변경되어 비효율적이라고 판단되는 경우
업데이트 방식을 고려해볼 수 있다.
업데이트 방법으로 Hive를 설치하여 Hive의 업데이트 기능을 활용할 수 있지만
Hive도 원래 기존 테이블에 insert, update, delete 하는 것에 주된 용도가 있는 것이 아니기 때문에 다른 대안을 생각해보게 된다.
이런 용도로 사용할 수 있는 프레임워크들이 존재하는데 그 중 하나가 Apache Hudi이다.
Apache Hudi는 하둡 스토리지에 스트림 프로세스를 통한 upsert, delete를 제공하는 프레임워크이다.
Hudi = Hadoop Upserts Deletes Incrementals
최초 Uber에서 개발하였으며 Amazon EMR(Elastic MapReduce)에도 제공되고 있다고 한다.
https://docs.aws.amazon.com/ko_kr/emr/latest/ReleaseGuide/emr-hudi.html
OLTP로 사용하거나 In-memory DB를 대체하는 용도가 목표는 아니며 Spark, Hive, PrestoDB와 통합해서 사용할 수 있다.
Hive나 SparkSQL과 통합해서 사용시 Hive Metastore와 테이블, 파티션 정보를 동기화 해야한다.
(Hudi에서 이를 위한 동기화 스크립트를 제공하고, Spark 처리 시 연동되도록 옵션을 설정할 수도 있다.)
제공 기능에 대한 대략적인 그림이다.
변경사항을 Hudi에 반영하고 Hudi에 쿼리를 요청해서 특정 시점의 스냅샷 데이터(Snapshot Query, 최신 시점도 가능) 혹은 특정 기간 동안의 변경 데이터 목록(Incremental Query)을 가져올 수 있다.
# 동작 방식에 대한 대략적인 설명
데이터를 디렉토리 구조로 파티셔닝하여 지정된 basepath에 저장한다. (Hive 테이블과 유사)
레코드들이 여러개의 파티션내에 여러개의 parquet 파일로 분산되어 저장된다.
데이터에 변경이 발생하면 변경사항이 반영된 새로운 parquet 파일을 생성하며 파일에 timestamp를 기록해둔다.
(delta가 아니라 매번 전체 내용을 가지는 파일을 생성한다, 그리고 밑에 나오겠지만 Merge On Read 방식은 동작 방식이 조금 다르다.)
파티션의 데이터가 변경될 때마다 parquet 파일 갯수가 늘어나는 것이다.
그리고 데이터 조회 시 timestamp를 이용해서 가장 최근의 파일 혹은 특정 시점의 파일을 선택해서 최신/이전 데이터를 가져오게 된다.
# Table Types
테이블 타입은 두 가지가 있는데 데이터의 특성에 맞게 선택하면 될 것 같다.
Copy On Write
- 기본 유형으로 데이터 commit할 때마다 그 시점에 변경 사항이 처리된다
- commit 마다 새로운 버전의 파일을 계속해서 생성하는 방식
- 데이터가 자주 변경되지 않지만 조회가 많은 경우 유리하다
- 열지향 파일 형태(Parquet)로 데이터 저장
한 파티션 내에 데이터가 4개 파일(fileid 1, 2, 3, 4)로 나뉘어 저장되어있다.
한 파티션 내의 데이터가 무조건 하나의 파일로 되어있지 않아서 index를 통해서 변경 레코드가 속한 파일을 찾아 그 파일만 새로 파일을 생성하는 방식인 것 같다. (파티션 내 파일 갯수는 데이터 크기와 옵션 값을 통해 결정되는 것 같다)
여기서 10:10 이라고 되어있는 것은 10시 10분을 의미한다.
10:10 커밋 이전의 데이터를 조회한다면 file 1의 10:05 데이터, file2의 10:05 데이터, file3의 10:05 데이터를 조회하게 된다.
(file4는 10:10 이전 데이터가 없으므로 스킵)
10:10 커밋 이후의 데이터를 조회한다면 file 1의 10:10 데이터, file2의 10:10 데이터, file3의 10:05 데이터, file4의 10:10의 데이터를 조회하게 된다.
(file3은 10:05 이후 변하지 않았기 때문에 10:10 이후에도 10:05 시점 데이터와 같다)
Merge On Read
- commit 시는 delta 로그에만 기록하고 조회 시 변경 사항을 merge하여 보여준다. (parquet 파일이 새로 생기지는 않는다)
- 조회는 적으나 데이터 변경이 많은 경우 유리하다
- Parquet 파일(데이터)과 Avro 파일(delta 로그) 조합으로 데이터 저장
- 변경사항을 반영한 parquet 파일을 새로 생성하는 작업은 compaction이라고 하며, compaction 주기(기본값 1시간), commit 개수(기본값 5개)에 따라 비동기로 처리된다
# Query Types
3가지 타입의 쿼리를 지원한다.
- Snapshot query : 특정 시점의 스냅샷 데이터 조회 (최신 데이터도 조회 가능함)
- Incremental query : 특정 기간 동안 추가/수정된 데이터 조회 (그 기간 내 최신 내용으로 조회됨)
- Read Optimized query : Merge On Read 방식에서 사용되며, Snapshot query와 유사하지만 아직 compaction되지 않은 데이터는 제외하고 조회하여 성능을 높인다
Hive, Spark SQL, PrestoDB, Impala 등을 이용해서도 데이터를 읽을 수 있는데
읽기 위해서는 쿼리 엔진이 Hudi 포맷을 처리할 수 있도록 Hudi 라이브러리를 제공해야 한다 (jar 파일 제공됨)
# 데이터 구조
지정한 basepath 내에 다음과 같은 디렉토리 구조로 저장된다.
Table basepath > Partitions > File Groups
하나의 File Group은 다음 요소들을 가지고 있다.
- File Slices : Parquet 포맷의 데이터 파일
- Log Files : Avro 포맷의 Delta Log (Merge On Read 방식일 경우만 생성됨)
# Writing Data
지원하는 연산
- Upsert
- Insert
- Bulk-insert
delete도 가능하다
지원하는 방식
- Datasource Writer
- DeltaStreamer : DFS, Kafka, Sqoop(incremental imports) 등으로 부터 변경 사항을 받아 처리하는 방식
- MultiDeltaStreamer
- Flink SQL Writer
# 성능
조회 성능
- Read Optimized query : Hive/Spark/Presto parquet 테이블 처리와 유사한 수준
- Incremental query : 주어진 시간동안 보통 얼마나 데이터가 변하는지에 따라 다르다. 예를 들어 1000개 파일로된 파티션에 100개 파일 변경만 있었다면 전체 파티션 읽는 것보다 10배 빠르다
- Real time view : Hive/Spark/Presto avro 테이블 처리와 유사한 수준
의문점
Hive처럼 서버를 구동시키는 방식이 아니라서 백그라운드 프로세스가 없는데 MOR에서 어떻게 주기적으로 merge가 가능한지는 의문점이다.
Spark를 활용한 예제 코드
작성해본 예제 코드에서는 HDFS를 스토리지로 사용하지 않고 로컬 디스크를 사용해서 작성하였다.
다음 공식 사이트 가이드 문서를 활용해서 작성하였다.
https://hudi.apache.org/docs/quick-start-guide
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("Hudi_Data_Processing_Framework")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.hive.convertMetastoreParquet", "false")
.config(
"spark.jars.packages",
"org.apache.hudi:hudi-spark-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:2.4.4"
)
.getOrCreate()
)
basePath = "file:///Users/user/hudi"
# 최초 데이터 insert, 테이블이 생성된다
def insert():
input_df = spark.createDataFrame(
[
("100", "2015-01-01", "refactoring", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "apache hadoop guide", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "clean python", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "hadoop the definitive guide", "2015-01-01T13:51:40.519832Z"),
("104", "2015-01-02", "fluent python", "2015-01-01T12:15:00.512679Z"),
("105", "2015-01-02", "introducing python", "2015-01-01T13:51:42.248818Z"),
],
("id", "register_ymd", "book_name", "modify_date"),
)
hudi_options = {
'hoodie.table.name': 'hudi_book',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.partitionpath.field': 'register_ymd',
'hoodie.datasource.write.table.name': 'hudi_book',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'modify_date',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}
(
input_df.write.format("hudi")
.options(**hudi_options)
.mode("overwrite")
.save(basePath)
)
# 최신 데이터 조회
def read():
output_df = spark.read.format("hudi").load(
basePath + "/*/*"
)
output_df.show()
# 데이터 업데이트
def update():
df = spark.createDataFrame(
[
("101", "2015-01-01", "apache hadoop guide @@@", "2015-01-11T12:14:58.597216Z"),
("102", "2015-01-01", "clean python @@@", "2015-01-11T13:51:40.417052Z"),
],
("id", "register_ymd", "book_name", "modify_date"),
)
hudi_options = {
'hoodie.table.name': 'hudi_book',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.partitionpath.field': 'register_ymd',
'hoodie.datasource.write.table.name': 'hudi_book',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'modify_date',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}
df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)
# incremental query
def incremental_read():
incremental_read_options = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.begin.instanttime': '20210629175947',
}
output_df = spark.read.format("hudi"). \
options(**incremental_read_options). \
load(basePath + "/*/*")
output_df.show()
# snapshot query, 과거 시점의 데이터 조회
def point_in_time_read():
incremental_read_options = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.begin.instanttime': '000',
'hoodie.datasource.read.end.instanttime': '20210629175949',
}
output_df = spark.read.format("hudi"). \
options(**incremental_read_options). \
load(basePath + "/*/*")
output_df.show()
# 데이터 삭제
def delete():
hudi_options = {
'hoodie.table.name': 'hudi_book',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.partitionpath.field': 'register_ymd',
'hoodie.datasource.write.table.name': 'hudi_book',
'hoodie.datasource.write.operation': 'delete',
'hoodie.datasource.write.precombine.field': 'modify_date',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}
to_delete_df = spark.createDataFrame(
[
("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
],
("id", "register_ymd", "modify_date"),
)
to_delete_df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)
# insert()
# update()
# read()
# incremental_read()
# point_in_time_read()
유사 프레임워크
비슷한 목적의 다른 프레임워크들도 있다.
Delta Lake
- https://docs.delta.io/
- Databricks에서 개발하였고 Microsoft Azure에 제공하고 있다. (https://docs.microsoft.com/ko-kr/azure/databricks/delta/)
Apache Iceberg
- https://iceberg.apache.org/
- Netflix에서 개발하여 Apache로 옮겨졌다.
자세히 보지는 않았지만 두 가지 다 제공하는 기능이나 동작은 Hudi와 크게 다르지는 않고 세부적인 부분만 다른 것으로 보인다.
비교 자료들을 봤을 때 큰 차이점은 없는 것 같고
아직 나온지 오래되지 않은 프레임워크들이라 최근 자료가 아니면 신뢰성이 떨어지는 부분도 있을 것 같다.
- https://blogs.oracle.com/developers/deltalake-vs-hudi-on-oracle-cloud-infrastructure-part-2
- https://www.slideshare.net/databricks/a-thorough-comparison-of-delta-lake-iceberg-and-hudi
참고 자료
'개발 > Data Engineering' 카테고리의 다른 글
Apache Airflow 재수행 방법 정리 (0) | 2022.04.21 |
---|---|
Apache Airflow DAG 간의 dependency 설정 (0) | 2022.01.25 |
Avro와 Parquet (0) | 2021.06.24 |
태블로 데이터 원본 구성 방법 - 관계와 조인 (0) | 2021.04.16 |
HDFS 네임노드 OutOfMemory 에러 (0) | 2016.09.27 |