on
Embulk 사용하기
Embulk 사용하기
Embulk는 플러그인을 사용해 여러 형태의 데이터 소스에서 데이터를 가져와 병렬로 로딩 할 수 있습니다. 여러개의 파일을 동시에 로딩하거나 하나의 큰 파일을 여러 파일로 쪼개서 로딩할 수 있어서 속도적인 측면에 장점이 있습니다.
인풋 파일의 데이터 타입을 자동으로 추측하여 스키마를 예측하므로 스키마 작업을 줄일 수 있습니다.
Embulk는 자바 위에서 실행이 되므로 자바가 설치가 되어 있어야 합니다. 현재 버전인 v0.9와 v0.10은 자바8을 지원하고 자바9는 공식적으로는 지원하지 않으므로 유의해야 합니다.
이미지 출처: https://www.embulk.org/
설치
아래 명령어를 차례대로 입력하면 됩니다.
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar" $ chmod +x ~/.embulk/bin/embulk $ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc $ source ~/.bashrc
플러그인 목록은 https://plugins.embulk.org/ 에서 확인할 수 있습니다.
이후 아래 명령어를 입력했을 때, 버전이 출력되면 정상적으로 설치가 된 경우입니다.
$ embulk -version
플러그인
아래 명령어를 통해 플러그인을 설치할 수 있습니다
embulk gem install [플러그인] # 설치된 플러그인 리스트 확인 $ embulk gem list 2021-11-11 01:02:10.382 +0900: Embulk v0.9.23 Gem plugin path is: /home/.embulk/lib/gems *** LOCAL GEMS *** addressable (2.8.0) bundler (1.16.0) concurrent-ruby (1.1.9) declarative (0.0.20) declarative-option (0.1.0) did_you_mean (default: 1.0.1) embulk (0.9.23 java) embulk-output-bigquery (0.6.7) faraday (0.17.4) google-api-client (0.32.1) googleauth (0.9.0) httpclient (2.8.3) jar-dependencies (default: 0.3.10) jruby-openssl (0.9.21 java) jruby-readline (1.2.0 java) json (1.8.3 java) jwt (2.3.0) liquid (4.0.0) memoist (0.16.2) mini_mime (1.1.2) minitest (default: 5.4.1) msgpack (1.1.0 java) multi_json (1.15.0) multipart-post (2.1.1) net-telnet (default: 0.1.1) os (1.1.4) power_assert (default: 0.2.3) psych (2.2.4 java) public_suffix (4.0.6) rake (default: 10.4.2) rdoc (default: 4.2.0) representable (3.0.4) retriable (3.1.2) signet (0.11.0) test-unit (default: 3.1.1) time_with_zone (0.3.1) tzinfo (2.0.4) uber (0.1.0)
플러그인은 https://plugins.embulk.org/ 에서 목록을 확인할 수 있습니다.
예제
embulk는 예제를 제공해 주고 있어서 손쉽게 확인을 할 수 있습니다.
$ embulk example ./try1
위 명령어를 통해 ./try1 폴더에 예제 파일을 생성해 줍니다. 예제 내용은 제공된 csv 파일을 읽어와 표준출력을 해주는 간단한 작업입니다. 만약 ./try1 를 지정하지 않으면 현재 위치에서 embulk_example 폴더를 만들어 줍니다.
생성된 파일을 보면 seed.yml 파일이 있는데 아래와 같은 형식으로 작성되어 있습니다.
in: # 읽어 들이는 곳의 설정 type: file # 읽어 들이는 곳의 타입 path_prefix: '/home/embulk-example/csv/sample_' # csv 파일 경로. prefix로 되어 있어서 sample_01.csv.gz파일에 접근이 가능함 out: # 로딩할 곳의 설정 type: stdout # 로딩할 곳의 타입
이제 위의 파일을 guess 명령어를 통해 데이터에 대한 추측을 하고 파일을 생성할 수도 있습니다.
$ embulk guess seed.yml -o config.yml # seed.yml 파일을 읽어와 config.yml 파일로 생성
만약 -o config.yml 부분이 없다면 표준출력으로 보여주게 됩니다.
위 명령어를 실행하면 아래와 같이 생성이 됩니다.
in: type: file path_prefix: /home/embulk-example/csv/sample_ decoders: - {type: gzip} parser: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' null_string: 'NULL' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: id, type: long} - {name: account, type: long} - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: purchase, type: timestamp, format: '%Y%m%d'} - {name: comment, type: string} out: {type: stdout}
다음 데이터를 로딩하기 전, 제대로 동작하는지 일부 데이터를 읽어 파싱하고 결과를 보여줍니다.
$ embulk preview config.yml 021-11-11 01:24:33.714 +0900: Embulk v0.9.23 2021-11-11 01:24:34.628 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. 2021-11-11 01:24:36.721 +0900 [INFO] (main): Gem's home and path are set by default: "/home/.embulk/lib/gems" 2021-11-11 01:24:37.376 +0900 [INFO] (main): Started Embulk v0.9.23 2021-11-11 01:24:37.466 +0900 [INFO] (0001:preview): Listing local files at directory '/home/embulk-example/csv' filtering filename by prefix 'sample_' 2021-11-11 01:24:37.467 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2021-11-11 01:24:37.468 +0900 [INFO] (0001:preview): Loading files [/home/embulk-example/csv/sample_01.csv.gz] 2021-11-11 01:24:37.476 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source +---------+--------------+-------------------------+-------------------------+----------------------------+ | id:long | account:long | time:timestamp | purchase:timestamp | comment:string | +---------+--------------+-------------------------+-------------------------+----------------------------+ | 1 | 32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk | | 2 | 14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby | | 3 | 27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin | | 4 | 11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | | +---------+--------------+-------------------------+-------------------------+----------------------------+
다음은 실제로 out에 입력된 곳으로 로딩하는 명령어입니다. 현재 예제의 out은 표준출력으로 되어 있기 때문에 출력이 되는 것을 볼 수 있습니다.
$ embulk run config.yml 2021-11-11 01:27:06.212 +0900: Embulk v0.9.23 2021-11-11 01:27:07.379 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. 2021-11-11 01:27:09.874 +0900 [INFO] (main): Gem's home and path are set by default: "/home/.embulk/lib/gems" 2021-11-11 01:27:10.709 +0900 [INFO] (main): Started Embulk v0.9.23 2021-11-11 01:27:10.873 +0900 [INFO] (0001:transaction): Listing local files at directory '/home/embulk-example/csv' filtering filename by prefix 'sample_' 2021-11-11 01:27:10.875 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2021-11-11 01:27:10.876 +0900 [INFO] (0001:transaction): Loading files [/home/embulk-example/csv/sample_01.csv.gz] 2021-11-11 01:27:10.929 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=16 / output tasks 8 = input tasks 1 * 8 2021-11-11 01:27:10.937 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0} 1,32864,2015-01-27 19:23:49,20150127,embulk 2,14824,2015-01-27 19:01:23,20150127,embulk jruby 3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin 4,11270,2015-01-29 11:54:36,20150129, 2021-11-11 01:27:11.047 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0} 2021-11-11 01:27:11.056 +0900 [INFO] (main): Committed. 2021-11-11 01:27:11.057 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/home/embulk-example/csv/sample_01.csv.gz"},"out":{}}
CSV → PostgreSQL
이번에는 표준출력이 아닌 DB에 저장하는 방식을 설명하면서 속도를 올리는 방법도 소개합니다.
admin_no,deleted,admin_name 3,T, 2,F,관리자2 1,F,관리자1
와 같은 csv 형식을 admin.csv 로 저장한 뒤, 아래와 같이 seed.yml 을 생성합니다.
seed.yml 파일을 생성하는 이유는 실제 in, out을 할 상세속성을 만들기 위함인데 이를 수동으로 작성을 했다면 seed.yml 파일을 생성할 필요는 없습니다.
in: type: file path_prefix: '/home/embulk-example/csv/admin' out: type: postgresql
다음 명령어를 통해서 config.yml 파일을 생성합니다.
$ embulk guess seed.yml -o config.yml 2021-11-12 00:55:59.152 +0900: Embulk v0.9.23 2021-11-12 00:56:00.019 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. 2021-11-12 00:56:02.140 +0900 [INFO] (main): Gem's home and path are set by default: "/home/.embulk/lib/gems" 2021-11-12 00:56:02.801 +0900 [INFO] (main): Started Embulk v0.9.23 2021-11-12 00:56:02.902 +0900 [INFO] (0001:guess): Listing local files at directory '/home/embulk-example/csv' filtering filename by prefix 'admin' 2021-11-12 00:56:02.903 +0900 [INFO] (0001:guess): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2021-11-12 00:56:02.904 +0900 [INFO] (0001:guess): Loading files [/home/embulk-example/csv/admin.csv] 2021-11-12 00:56:02.916 +0900 [INFO] (0001:guess): Try to read 32,768 bytes from input source 2021-11-12 00:56:02.973 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) 2021-11-12 00:56:02.992 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) 2021-11-12 00:56:03.012 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) 2021-11-12 00:56:03.023 +0900 [INFO] (0001:guess): Loaded plugin embulk (0.9.23) in: type: file path_prefix: /home/embulk-example/csv/admin parser: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: admin_no, type: long} - {name: deleted, type: boolean} # 의도와 다르게 타입이 선언됨. 확인이 필요 - {name: admin_name, type: string} out: {type: postgresql} Created 'config.yml' file.
데이터를 보고 판단하여 자동으로 스키마를 생성해 주기 때문에 의도하는 것과 다를 수 있습니다. 여기서 is_deleted 는 string 타입으로 사용할 예정인데 embulk는 boolean으로 선언하였으니 한 번 더 확인 하는 것이 필요합니다.
다음은 out을 구성할 차례입니다. PostgreSQL에 적재를 해야 하므로 아래 명령어로 플러그인을 설치해 줍니다.
$ embulk gem install embulk-output-postgresql
마지막으로 config.yml 폴더의 out 부분에 로딩할 PostgreSQL의 접속정보와 테이블, 모드를 작성해 줍니다.
out: type: postgresql host: localhost port: 5432 user: pgadmintest password: "" database: my_database table: my_table mode: insert
my_table이란 테이블에 insert를 하는 설정입니다.
아래와 같이 더 자세하게 설정할 수도 있습니다.
out: type: postgresql host: localhost user: myuser password: "" database: my_database table: my_table options: {loglevel: 2} mode: insert_direct column_options: my_col_1: {type: 'BIGSERIAL'} my_col_3: {type: 'INT NOT NULL'} my_col_4: {value_type: string, timestamp_format: `%Y-%m-%d %H:%M:%S %z`, timezone: '-0700'} my_col_5: {type: 'DECIMAL(18,9)', value_type: pass}
추가적인 설정과 설명은 https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-postgresql 에 작성되어 있습니다.
parser-none 플러그인으로 속도 향상시키기
csv나 json 파일이 이미 out 형식에 맞게 정의되어 있다면 parser 플러그인을 사용해 속도를 향상시킬 수 있습니다. 데이터를 일일이 파싱할 필요가 없기 때문에 속도 향상이 크게 발생합니다.
$ embulk gem install embulk-parser-none
이후 config.yml 파일의 in - parser 부분을 아래와 같이 변경합니다.
in: type: file path_prefix: /home/embulk-example/csv/admin parser: type: none column_name: payload
parser-none은 https://github.com/sonots/embulk-parser-none 자세하게 설명되어 있습니다.
PostgreSQL → GCP Bigquery
이번에는 PostgreSQL에서 GCP Bigquery로 옮기는 예제입니다.
PostgreSQL에 접속하여 쿼리를 호출해 데이터를 가져와야 하므로 아래 플러그인을 설치해 줍니다.
$ embulk gem install embulk-input-postgresql
그 다음 config.yml을 수정하여 읽어올 DB의 서버와 쿼리를 작성해 줍니다.
in: type: postgresql host: localhost user: myuser password: "" database: my_database table: my_table select: "col1, col2, col3" where: "col4 != 'a'" order_by: "col1 DESC"
쿼리가 간단하다면 위의 문법으로도 충분하지만 복잡한 쿼리라면 아래와 같이 작성할 수 있습니다.
in: type: postgresql host: localhost user: myuser password: "" database: my_database query: | SELECT t1.id, t1.name, t2.id AS t2_id, t2.name AS t2_name FROM table1 AS t1 LEFT JOIN table2 AS t2 ON t1.id = t2.t1_id
또한 읽어들일 때, 타입에 따라 형변환을 진행하거나 컬럼 일부만 형변환을 진행할 수 있습니다.
in: type: postgresql host: localhost user: myuser password: "" database: my_database table: "my_table" select: "col1, col2, col3" where: "col4 != 'a'" default_column_options: TIMESTAMP: { type: string, timestamp_format: "%Y/%m/%d %H:%M:%S", timezone: "+0900"} BIGINT: { type: string } column_options: col1: {type: long} col3: {type: string, timestamp_format: "%Y/%m/%d", timezone: "+0900"} after_select: "update my_table set col5 = '1' where col4 != 'a'"
보통 embulk는 데이터를 다른 곳으로 증분할 때 많이 사용하기 때문에 아래의 옵션을 사용하면 유용하게 쓸 수 있습니다.
last_record: 증분을 위한 마지막 레코드의 값을 입력. (기본값으로는 모든 레코드를 로딩함)
fetch_rows: 한 번에 가져오는 데이터 사이즈 (기본값 10,000)
기타 더 자세한 옵션과 설명은 https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-postgresql 에서 확인할 수 있습니다.
이제 GCP Bigquery 플러그인을 설치해 줍니다.
$ embulk gem install embulk-output-bigquery
마지막으로 out을 작성해 줍니다.
out: type: bigquery mode: append auth_method: service_account json_keyfile: /path/to/json_keyfile.json project: your-project-000 dataset: your_dataset_name table: your_table_name schema_file: /data/schema_file.json
auth_method
여기서는 service_account (or json_key) 와 compute_engine 두 가지만 설명하겠습니다.
service_account
GCP service account를 json 파일 형식으로 내려 받은 다음, embulk에서 해당 파일 위치를 호출하거나 json을 직접 입력하여 사용할 수 있습니다.
out: type: bigquery auth_method: service_account json_keyfile: /path/to/json_keyfile.json 또는 out: type: bigquery auth_method: service_account json_keyfile: content: | { "private_key_id": "123456789", "private_key": "-----BEGIN PRIVATE KEY-----
ABCDEF", "client_email": "..." }
compute_engine
만약 embulk를 GCP compute engine에서 실행하는데 같은 프로젝트 내에 Bigquery가 존재한다면 인증을 생략할 수 있습니다.
out: type: bigquery auth_method: compute_engine
schema
여기서는 미리 설정한 스키마 파일과 존재하는 테이블의 스키마를 사용하는 방식 2가지를 설명합니다.
schema_file
미리 스키마 파일을 정의하여 해당 파일경로를 입력하는 형식입니다.
out: type: bigquery table: table_%Y%m%d schema_file: /path/to/schema.json
기존 테이블 스키마 사용하기
bigquery에 존재하는 기존 테이블의 스키마를 읽어서 그대로 사용하는 형식입니다.
out: type: bigquery table: table_%Y%m%d template_table: existing_table_name
GCS 경유하기
바로 Bigquery로 적재할 수 있지만 embulk는 병렬 처리로 여러 태스크에 나눠 실행이 되는데 bigquery에서는 프로젝트 당 하루 100,000개의 job만 로딩이 가능합니다. 여기서 GCS를 사용하면 GCS에서 Bigquery로 여러 파일을 옮겨도 1개의 job으로 인식하게 됩니다. 또한 병렬 처리로 진행되는 작업의 안정성을 위해선 GCS를 사용하는 것이 좋습니다.
병렬로 job이 생성되는 개수는 min_output_tasks 와 max_threads 같은 파라미터에 의존합니다.
out: type: bigquery gcs_bucket: bucket_name auto_create_gcs_bucket: true
다음은 사용하면서 유용한 옵션들입니다.
auto_create_table: 테이블이 없을 경우, 자동으로 생성
ignore_unknown_values: 알 수 없는 값을 무시
allow_quoted_newlines: 데이터에 개행문자가 있으면 true로 설정해야함. true로 할 시, 속도가 느려짐
auto_create_dataset: dataset이 없을 시, 자동 생성
path_prefix: /tmp/prefix_ 와 같은 파일 경로 접두사. 기본값은 랜덤하게 생성
더 많은 정보는 https://github.com/embulk/embulk-output-bigquery에서 확인할 수 있습니다.
멀티 쓰레드를 이용하여 로딩 속도 올리기
max_threads 와 min_output_tasks 를 설정하여 병렬로 작업을 처리할 수 있습니다.
min_output_tasks 는 최소 실행할 작업의 수를 의미하므로 4라고 정하면 최소 4개의 작업이 병렬로 작업을 처리하게 됩니다. 1개의 task에서는 여러 thread를 사용할 수 있습니다.
max_threads 는 최대 스레드를 정하여 동시성을 제어하게 됩니다. 너무 많이 설정하면 부하가 걸릴 수 있고 CPU 사용율이 낮다면 이 숫자를 높여서 속도를 향상할 수 있습니다.
exec: max_threads: 기본적으로 사용 가능한 cpu 코어 수 * 2 min_output_tasks: 기본적으로 사용 가능한 cpu 코어 수 in: type: file path_prefix: /home/embulk-example/csv/admin parser: type: none column_name: payload
from http://brownbears.tistory.com/589 by ccl(A) rewrite - 2021-11-13 01:27:58