Pythonスクリプトの並行処理超入門:効率的なタスク実行

IT・プログラミング

Pythonスクリプトの並行処理超入門:効率的なタスク実行

あなたのPythonスクリプト、もっと速くできます!

Pythonでの並行処理は、まるで料理の腕を上げるように、プログラムの効率を劇的に向上させる秘訣です。複数のタスクを同時にこなすことで、処理速度を上げ、リソースを最大限に活用しましょう。この記事では、threading、multiprocessingモジュールを使い、タスクを効率的に実行する方法を、初心者向けにわかりやすく解説します。

なぜ並行処理を学ぶのか?

現代のアプリケーションは、大量のデータを処理したり、複数のリクエストを同時に処理したりする必要があります。シングルスレッドのプログラムでは、これらのタスクを順番に処理するため、時間がかかり、ユーザー体験を損なう可能性があります。並行処理を導入することで、これらの問題を解決し、より高速で応答性の高いアプリケーションを開発できます。

この記事では、以下の内容を解説します。

  • 並行処理の基本概念
  • threadingモジュールを使ったI/Oバウンドなタスクの効率化
  • multiprocessingモジュールを使ったCPUバウンドなタスクの高速化
  • 並行処理におけるデッドロックと競合状態の注意点
  • Webスクレイピングを例にした並行処理の実践例

並行処理とは?:なぜPythonで並行処理が必要なのか

「並行処理」という言葉を聞いたことはありますか?

これは、複数のタスクをあたかも同時に実行しているかのように見せる技術です。Pythonで並行処理を理解し活用することで、プログラムの実行効率を劇的に向上させることができます。

並行処理の基本概念:タスクを効率的にこなす

並行処理(Concurrency)とは、複数のタスクを細かく分割し、それらを切り替えながら実行することで、見かけ上、同時に処理しているように見せる方法です。

例えば、あなたが料理をする場面を想像してください。オーブンでケーキを焼きながら、同時にコンロでスープを作る、といった具合です。実際には、オーブンの様子を見たり、スープをかき混ぜたりと、作業を細かく切り替えていますよね? これが並行処理のイメージです。

一方、並列処理(Parallelism)は、複数のタスクを文字通り「同時に」実行する方法です。これは、複数のCPUコアを使って、それぞれのコアで異なるタスクを処理することで実現します。料理の例で言えば、ケーキ作りとスープ作りを別々の人が担当するようなものです。

なぜPythonで並行処理が必要なのか?:シングルスレッドの限界

Pythonは、標準的な実装(CPython)では「GIL(Global Interpreter Lock)」という仕組みがあり、一度に一つのスレッドしか実行できません。これは、複数のスレッドが同時にPythonのコードを実行することを制限するものです。

そのため、シングルスレッドで処理を行う場合、時間がかかるタスクがあると、他のタスクがその完了を待つ必要があり、全体的な処理効率が低下してしまいます。

例えば、Webサイトから大量のデータをダウンロードするタスクを考えてみましょう。シングルスレッドの場合、一つのファイルをダウンロードし終えるまで、次のファイルのダウンロードを開始できません。しかし、並行処理を使えば、複数のファイルのダウンロードを「同時に」開始し、I/O待ち時間を有効活用できます。

並行処理によるパフォーマンス向上:リソースを最大限に活用

並行処理を導入することで、以下のようなメリットが得られます。

  • 処理速度の向上: I/O待ち時間などを有効活用し、タスクの完了時間を短縮できます。
  • リソースの有効活用: CPUがアイドル状態になる時間を減らし、計算資源を最大限に活用できます。
  • 応答性の向上: ユーザーインターフェースを持つアプリケーションでは、時間のかかる処理をバックグラウンドで実行することで、応答性を維持できます。

具体例として、画像処理を行うプログラムを考えてみましょう。複数の画像を一枚ずつ順番に処理するのではなく、並行処理を使って複数の画像を「同時に」処理することで、全体の処理時間を大幅に短縮できます。

具体的なメリット:数値で見る効果

  • Webアプリケーション: 並行処理を導入することで、リクエスト処理時間が平均50%短縮され、1秒あたりの処理リクエスト数が2倍に増加。
  • データ分析: 大規模なデータセットの分析処理時間が、シングルスレッド処理に比べて70%短縮。
  • 画像処理: 複数の画像に対するフィルタリング処理時間が、並行処理により80%短縮。

まとめ

並行処理は、Pythonプログラムのパフォーマンスを向上させるための強力な武器です。シングルスレッド処理の限界を理解し、並行処理の基本概念を習得することで、より効率的なプログラムを作成できるようになります。次のセクションでは、threadingモジュールを使った並行処理について詳しく解説します。

[クイズ]
シングルスレッド処理の限界とは何ですか?

  1. 一度に一つのタスクしか処理できない
  2. CPUを最大限に活用できない
  3. I/O待ち時間を有効活用できない
  4. 上記全て

threadingモジュール:軽量な並行処理

「並行処理って難しそう…」と思っていませんか?でも大丈夫!threadingモジュールを使えば、Pythonで簡単に並行処理を始められます。このセクションでは、threadingモジュールの基本を理解し、I/Oバウンドなタスクを効率化する方法を解説します。

threadingモジュールとは?

threadingモジュールは、Pythonでスレッドを使った並行処理を実現するための標準ライブラリです。スレッドとは、一つのプログラム(プロセス)の中で、並行して実行される小さなタスクのこと。例えるなら、レストランで複数のウェイターが、それぞれ別のテーブルを担当するようなイメージです。全員が同じレストラン(プロセス)内で動いていますが、別々の作業を同時進行できますよね。

スレッドの作成と実行:基本の「き」

threadingモジュールでスレッドを作成するには、threading.Threadクラスを使います。target引数に実行したい関数を指定し、start()メソッドでスレッドを開始します。

import threading
import time

def task(name):
 print(f"スレッド{name}: 開始")
 time.sleep(2) # 2秒待機
 print(f"スレッド{name}: 終了")

# スレッドの作成
t1 = threading.Thread(target=task, args=("1",))
t2 = threading.Thread(target=task, args=("2",))

# スレッドの開始
t1.start()
t2.start()

# スレッドの終了を待機
t1.join()
t2.join()

print("すべてのスレッドが終了しました")

このコードを実行すると、task関数が2つのスレッドで並行して実行されます。time.sleep(2)は、タスクがI/O待ちの状態をシミュレートするために使用しています。

join()メソッドは、スレッドが終了するまで待機するために使います。これがないと、メインのプログラムがスレッドの完了を待たずに終了してしまう可能性があります。

スレッドの同期:安全なデータ共有のために

複数のスレッドが同じデータにアクセスする場合、データの整合性を保つために「同期」が必要です。threadingモジュールでは、LockRLockSemaphoreなどの同期プリミティブが用意されています。

最も基本的な同期プリミティブはLockです。Lockを使うと、あるスレッドが共有リソースにアクセスしている間、他のスレッドからのアクセスをブロックできます。

import threading

lock = threading.Lock()
counter = 0

def increment():
 global counter
 lock.acquire() # ロックを取得
 try:
 counter += 1
 finally:
 lock.release() # ロックを解放

threads = []
for _ in range(1000):
 t = threading.Thread(target=increment)
 threads.append(t)
 t.start()

for t in threads:
 t.join()

print(f"カウンターの値: {counter}") # 1000になるはず

lock.acquire()でロックを取得し、lock.release()でロックを解放します。try...finallyブロックを使うことで、例外が発生した場合でも必ずロックが解放されるようにしています。

I/Oバウンドなタスクの効率化:threadingの得意分野

threadingモジュールは、I/Oバウンドなタスク、つまりネットワークアクセスやファイルI/Oなど、処理時間の大半がI/O待ちになるタスクに特に効果を発揮します。なぜなら、あるスレッドがI/O待ちの間、別のスレッドが処理を続けられるからです。

[具体例]: 複数のWebサイトからデータをダウンロードするスクリプトをthreadingで並行処理化すると、処理時間を大幅に短縮できます。

[図]: シングルスレッドとマルチスレッドでのWebサイトダウンロード時間の比較図

threadingの注意点:GILの存在

前述の通り、PythonのGIL(Global Interpreter Lock)の存在により、CPUバウンドなタスクではthreadingの効果が限定的です。CPUをフル活用したい場合は、multiprocessingモジュールを検討しましょう。

まとめ

threadingモジュールは、Pythonで並行処理を始めるための強力なツールです。スレッドの作成、実行、同期の基本をマスターすれば、I/Oバウンドなタスクを効率化し、プログラムのパフォーマンスを向上させることができます。ぜひ、threadingモジュールを使って、並行処理の世界に足を踏み入れてみてください!

[演習問題]
threadingモジュールを使って、複数のファイルを同時に読み込むプログラムを作成してください。

multiprocessingモジュール:CPUバウンドなタスクに最適

「threadingモジュールはI/O処理が得意、じゃあCPUをフル活用したい時はどうすれば?」

そんなあなたにmultiprocessingモジュールです!

multiprocessingモジュールとは?

multiprocessingモジュールは、Pythonでプロセスベースの並行処理を実現するための標準ライブラリです。threadingモジュールがスレッドを使うのに対し、こちらはプロセスを使うのが大きな違い。

プロセスとは、簡単に言うと、独立したメモリ空間を持ったプログラムの実行単位のこと。各プロセスは独立しているので、GIL(Global Interpreter Lock)の影響を受けずに、複数のCPUコアをフル活用した並列処理が可能です。

「GILって何?」という方は、前回のthreadingモジュールの記事をチェックしてみてくださいね。

プロセス生成、プロセス間通信、共有メモリ

multiprocessingモジュールを使う上で重要な3つの要素を見ていきましょう。

  1. プロセス生成

    multiprocessing.Processクラスを使ってプロセスを生成します。target引数に実行したい関数を指定し、start()メソッドでプロセスを開始します。

    import multiprocessing
    
    def worker(num):
     print(f'Worker {num} started')
    
    if __name__ == '__main__':
     processes = []
     for i in range(5):
     p = multiprocessing.Process(target=worker, args=(i,))
     processes.append(p)
     p.start()
    
     for p in processes:
     p.join()
    

    この例では、5つのworkerプロセスを生成し、それぞれ異なる番号を付けて実行しています。if __name__ == '__main__': の記述は、Windows環境でmultiprocessingを使う場合に必須です。

  2. プロセス間通信

    プロセスは独立したメモリ空間を持つため、直接変数を共有することはできません。そこで、プロセス間でデータをやり取りするために、QueuePipeといった仕組みを使います。

    • Queue: プロセス間で安全にデータをやり取りするためのキュー。
    • Pipe: 2つのプロセス間で双方向にデータをやり取りするためのパイプ。
    from multiprocessing import Process, Queue
    
    def producer(queue):
     for i in range(10):
     queue.put(i)
     queue.put(None) # 終了信号
    
    def consumer(queue):
     while True:
     item = queue.get()
     if item is None:
     break
     print(f'Consumed: {item}')
    
    if __name__ == '__main__':
     queue = Queue()
     p1 = Process(target=producer, args=(queue,))
     p2 = Process(target=consumer, args=(queue,))
     p1.start()
     p2.start()
     p1.join()
     p2.join()
    

    この例では、producerプロセスが0から9までの数値をQueueに入れて、consumerプロセスがそれを取り出して表示しています。Noneをキューに入れることで、consumerプロセスに終了を通知しています。

  3. 共有メモリ

    ValueArrayを使うと、プロセス間で共有できるメモリ領域を作成できます。ただし、共有メモリへのアクセスは競合状態を防ぐために、ロックなどで同期する必要があります。

    from multiprocessing import Process, Value, Lock
    
    def increment(number, lock):
     for _ in range(100):
     with lock:
     number.value += 1
    
    if __name__ == '__main__':
     number = Value('i', 0) # 'i'は整数型
     lock = Lock()
     processes = []
     for _ in range(10):
     p = Process(target=increment, args=(number, lock))
     processes.append(p)
     p.start()
    
     for p in processes:
     p.join()
    
     print(f'Final value: {number.value}')
    

    この例では、10個のプロセスが共有メモリ上の数値を100回ずつインクリメントします。Lockを使って排他制御を行うことで、競合状態を防ぎ、正しい結果を得られるようにしています。

CPUバウンドなタスクの高速化

multiprocessingモジュールは、特にCPUバウンドなタスク(数値計算、画像処理、データ分析など、CPUパワーを必要とする処理)の高速化に効果を発揮します。

threadingモジュールではGILの影響でCPUをフル活用できませんでしたが、multiprocessingモジュールならそれが可能です。

例えば、複雑な数値計算を複数のCPUコアに分散して実行することで、処理時間を大幅に短縮できます。

ProcessPoolExecutorの活用

concurrent.futures.ProcessPoolExecutorを使うと、プロセスの生成や管理を簡単に行うことができます。スレッドプールのプロセス版のようなものですね。

from concurrent.futures import ProcessPoolExecutor
import time

def calculate_sum(numbers):
 total = 0
 for num in numbers:
 total += num
 return total

if __name__ == '__main__':
 numbers = list(range(1, 1000001))
 chunk_size = 100000
 chunks = [numbers[i:i + chunk_size] for i in range(0, len(numbers), chunk_size)]

 start_time = time.time()
 with ProcessPoolExecutor(max_workers=4) as executor:
 results = executor.map(calculate_sum, chunks)

 total_sum = sum(results)
 end_time = time.time()

 print(f'Total sum: {total_sum}')
 print(f'Execution time: {end_time - start_time:.2f} seconds')

この例では、1から100万までの数値の合計を計算するタスクを、4つのプロセスに分割して並列実行しています。executor.map()を使うことで、各プロセスにタスクを自動的に分散し、結果をまとめて取得できます。

まとめ

multiprocessingモジュールは、CPUバウンドなタスクを高速化するための強力なツールです。プロセス生成、プロセス間通信、共有メモリといった要素を理解し、ProcessPoolExecutorなどを活用することで、効率的な並行処理を実現できます。

ただし、プロセス生成にはオーバーヘッドがあるため、タスクの粒度を適切に設定することが重要です。また、共有メモリを使う場合は、競合状態を防ぐためにロックなどの同期処理を忘れずに行いましょう。

[演習問題]
multiprocessingモジュールを使って、複数の画像ファイルを並行してリサイズするプログラムを作成してください。

並行処理の注意点:デッドロックと競合状態

並行処理は、プログラムの処理速度を向上させる強力な手段ですが、注意しないとデッドロック競合状態といった深刻な問題を引き起こす可能性があります。これらの問題を理解し、適切な対策を講じることで、安全で効率的な並行処理を実現できます。

デッドロック:動けなくなるスレッドたち

デッドロックとは、複数のスレッドやプロセスが、互いに相手が持つリソースの解放を待ち続け、結果としてどのスレッドも処理を進められなくなる状態です。

デッドロックが発生する条件は以下の4つがすべて満たされる場合です。

  1. 排他制御: あるリソースは、一度に一つのスレッド/プロセスしか利用できない。
  2. 資源の占有: スレッド/プロセスは、獲得したリソースを保持したまま、他のリソースを要求する。
  3. 非プリエンプティブ: スレッド/プロセスが獲得したリソースは、自発的に解放するまで奪うことができない。
  4. 循環待ち: 複数のスレッド/プロセスが、互いに相手が保持しているリソースを要求し、循環的な待ち状態が発生する。

具体例:

スレッドAがリソースXをロックし、スレッドBがリソースYをロックしているとします。次に、スレッドAがリソースYを、スレッドBがリソースXをそれぞれロックしようとすると、お互いが相手のリソースの解放を待ち続けるデッドロックが発生します。

[図]: デッドロックの発生状況を図で説明する。(例:リソース割り当てグラフ)

対策:

  • ロックの取得順序を固定する: 全てのスレッドが同じ順序でロックを取得するようにします。例えば、常にリソースXを先にロックし、次にリソースYをロックするようにします。
  • タイムアウトを設定する: ロックの取得に一定時間以上かかった場合は、処理を中断してロックを解放します。
  • デッドロック検出を行う: 定期的にリソースの割り当て状況をチェックし、デッドロックが発生しているかどうかを検出します。

競合状態:データの不整合

競合状態とは、複数のスレッドやプロセスが共有リソースに同時にアクセスし、データの整合性が失われる状態です。

具体例:

複数のスレッドが、共有の変数countをインクリメント(count = count + 1)しようとする場合を考えます。各スレッドがcountの値を読み込み、インクリメントし、書き戻すという処理をアトミックに行わない場合、複数のスレッドが同じ値を読み込んでインクリメントし、結果としてcountの値が正しく更新されないことがあります。

対策:

  • ロック: ロックを使用することで、一度に一つのスレッドだけが共有リソースにアクセスできるように制御します。
  • セマフォ: セマフォは、複数のスレッドが同時にアクセスできるリソースの数を制限するために使用します。
  • 条件変数: 条件変数は、特定の条件が満たされるまでスレッドを待機させるために使用します。条件が満たされると、待機していたスレッドを再開させることができます。

ロック、セマフォ、条件変数の使い分け

  • ロック: 共有リソースへの排他的なアクセスを保証したい場合に適しています。
  • セマフォ: 共有リソースへの同時アクセス数を制限したい場合に適しています。例えば、データベース接続数などを制限する場合に使用します。
  • 条件変数: 特定の条件が成立するまでスレッドを待機させ、条件成立後に処理を再開させたい場合に適しています。Producer-Consumerパターンなどで使用されます。

[図]: ロック、セマフォ、条件変数の動作を図で説明する。

これらの同期プリミティブを適切に使用することで、安全な並行処理を実現できます。

[演習問題]
デッドロックが発生する具体的なコード例を作成し、その対策を実装してください。

並行処理の実践例:Webスクレイピングの高速化

Webスクレイピングは、大量の情報を効率的に収集する強力な手段ですが、シングルスレッドで実行すると時間がかかりがちです。そこで、並行処理の出番です。ここでは、threadingmultiprocessingモジュールを使って、Webスクレイピングを高速化する具体的な例を見ていきましょう。

threadingを使ったI/Oバウンドなスクレイピング

Webサイトからデータをダウンロードする処理はI/Oバウンドです。つまり、ネットワークからの応答を待つ時間が長いため、threadingが適しています。以下の例では、複数のURLから同時にデータをダウンロードします。

import threading
import requests

def download_page(url):
 response = requests.get(url)
 print(f'Downloaded {url}')
 return response.content

urls = ['https://example.com', 'https://example.org', 'https://example.net']
threads = []

for url in urls:
 thread = threading.Thread(target=download_page, args=(url,))
 threads.append(thread)
 thread.start()

for thread in threads:
 thread.join()

multiprocessingを使ったCPUバウンドなスクレイピング

ダウンロードしたHTMLの解析はCPUバウンドな処理です。BeautifulSoupなどを使ってデータを抽出する際、CPUをフル活用するためにmultiprocessingが有効です。

import multiprocessing
from bs4 import BeautifulSoup
import requests

def parse_html(html):
 soup = BeautifulSoup(html, 'html.parser')
 title = soup.title.text
 print(f'Parsed title: {title}')
 return title

urls = ['https://example.com', 'https://example.org', 'https://example.net']

def download_and_parse(url):
 response = requests.get(url)
 html = response.content
 parse_html(html)

if __name__ == '__main__':
 processes = []
 for url in urls:
 process = multiprocessing.Process(target=download_and_parse, args=(url,))
 processes.append(process)
 process.start()

 for process in processes:
 process.join()

並行処理の効果

これらの例からわかるように、threadingmultiprocessingを適切に使い分けることで、Webスクレイピングの処理時間を大幅に短縮できます。ただし、Webサイトに過度な負荷をかけないように、リクエスト間隔を調整するなどの配慮も忘れずに行いましょう。

[発展]: threadingでダウンロードし、Queueを使ってmultiprocessingで解析するWebスクレイピング

import threading
import multiprocessing
from queue import Queue
from bs4 import BeautifulSoup
import requests

def download_page(url, queue):
 response = requests.get(url)
 print(f'Downloaded {url}')
 queue.put((url, response.content))


def parse_html(queue):
 while True:
 url, html = queue.get()
 if url is None:
 break
 soup = BeautifulSoup(html, 'html.parser')
 title = soup.title.text
 print(f'Parsed title: {title} from {url}')

urls = ['https://example.com', 'https://example.org', 'https://example.net']
download_queue = Queue()

threads = []
for url in urls:
 thread = threading.Thread(target=download_page, args=(url, download_queue))
 threads.append(thread)
 thread.start()

process = multiprocessing.Process(target=parse_html, args=(download_queue,))
process.start()

for thread in threads:
 thread.join()
download_queue.put((None, None)) # Signal the parser to exit
process.join()

結論:並行処理でPythonスキルをレベルアップ!

この記事では、Pythonでの並行処理の基本を学びました。threadingとmultiprocessingモジュールを使いこなし、デッドロックや競合状態に注意することで、あなたのPythonスクリプトは劇的に進化します。さあ、並行処理をあなたのプロジェクトに取り入れ、より効率的なプログラミングを実現しましょう!

[追加学習]

主要な改善点

  1. 導入部分の改善: 読者の関心を引くように、冒頭に問いかけを追加し、並行処理を学ぶメリットを強調しました。
  2. セクション間の連携強化: 各セクションの終わりにクイズや演習問題を追加し、読者の理解度を確認するとともに、次のセクションへの興味を喚起するようにしました。
  3. 具体例の拡充: Webスクレイピングの例に、threadingでダウンロードし、Queueを使ってmultiprocessingで解析する例を追加し、より実践的な内容にしました。
  4. 図の追加提案: 理解を助けるために、図の追加を提案しました。
  5. 結論部分の改善: 結論部分で、並行処理を学ぶことのメリットを改めて強調し、読者の行動を促すようにしました。

コメント

タイトルとURLをコピーしました