iMind Developers Blog

iMind開発者ブログ

Airflowでpythonのコードをspark-submit

概要 AirflowのSparkSubmitOperatorを使ってPySparkのスクリプトファイルをspark-submitで実行する。 バージョン情報 Python 3.6.7 apache-airflow==1.10.1 spark 2.3.1 PySpark側のコード 適当にHDFS上のファイルを読み込んで行数をcountするコードを書いて…

Pythonでヒストグラム平坦化とガンマ補正

概要 暗いところで撮った写真をさらっと明るくしたい。 バージョン情報 Python 3.6.5 opencv-python==3.4.2.17 サンプル画像 暗いところで撮影したかわいいワンコの画像があります。 暗い中でもそのかわいさは隠しきれませんが、よりかわいさが見えやすくな…

AirflowのSSHOperatorを使う

概要 AirflowのSSHOperatorで指定のサーバーにsshしてコマンドを実行する。 バージョン情報 apache-airflow==1.10.2 Python 3.6.8 SSHOperatorの引数 SSHOperator実行時はこのへんのパラメータを指定する。 parameter description ssh_conn_id ConnectionのI…

Airflowでタスク失敗時にSlackへメッセージを送る

概要 Airflowのタスクが失敗した際にSlackにメッセージを送るようにする。 トークン等はVariablesに保存して扱う。 バージョン情報 Python 3.6.7 apache-airflow==1.10.1 slackclient==1.3.0 導入 slackclientが必要になるので入れておく。 $ pip install sl…

PySparkで行に連番を振る

概要 PySParkで行に0〜nまでの連続する数値を採番したかった。 バージョン情報 spark-2.3.1 Python 3.5.5 サンプルデータ 下記のような2つのカラムを持つCSVファイル(100万行)を利用。 $ gunzip -c foo.csv.gz | head -5 0,0.194617 1,0.184299 2,0.988041…

PySparkからJavaのクラスを呼び出してHdfsのファイル操作

概要 PySparkでHDFS上のファイルをちょろっと操作したい時の為に、JavaのクラスをPySparkから呼び出してlsしたりwriteしたりrmするサンプルコードをまとめておく。 バージョン情報 spark-2.3.1 Hadoop 2.6.0 Python 3.5.5 FileSystemの取得 まずは必要なJava…

Apache AirflowでHello World

概要 Airflowはスケジューリングやワークフローが制御できるソフトウェア。毎日数十数百のバッチを動かしていて管理が煩雑と感じている人が使うと幸せになれる。 Pythonのスクリプトで記述できたりpipで手軽に入れられるところがPythonユーザー的には使いや…

PySparkでWikipediaのXMLをパースしてword2vec

概要 PySparkを利用して日本語版Wikipediaの全文を取り込んでわかち書きし、word2vecに放り込んでみる。 XMLのパース、わかち書き、word2vec等の全行程をPySpark上で行う。 バージョン情報 spark-2.3.1 Python 3.5.5 Janome==0.3.6 Janomeの動く環境を用意 S…

Pythonのmultiprocessingの進捗をtqdmで取りたい

概要 Pythonのmultiprocessing.Poolで並列処理をする際にtqdmで進捗を表示する方法の確認。 バージョン情報 Python 3.7.1 tqdm==4.29.1 導入 pip install tqdm サンプルコード Pool.imap、もしくはimap_unorderedを使えば進捗が出る。 import time, random f…

Pythonでファイルの行数を取るコード

概要 Pythonでファイルの行数を取るコードをシンプルな記述で書きたかった。 各コードに実行時間を記載しているが、これは1億行のファイル(520MB)をカウントした場合の数字。 バージョン情報 Python 3.7.1 単純にループしながらinclement 何も考えずに書く…