About me
home
Portfolio
home
πŸͺ›

Process module

λ‚ μ§œ
2023/07/19
νƒœκ·Έ
파이썬
이둠

from multiprocessing import Process

multiprocessing λͺ¨λ“ˆμ€ νŒŒμ΄μ¬μ—μ„œ 병렬 처리λ₯Ό μœ„ν•΄ μ œκ³΅ν•˜λŠ” λͺ¨λ“ˆ 쀑 ν•˜λ‚˜λ‘œ, λ³„λ„μ˜ ν”„λ‘œμ„ΈμŠ€λ₯Ό μƒμ„±ν•˜κ³  관리할 수 μžˆλŠ” κΈ°λŠ₯을 μ œκ³΅ν•œλ‹€. 이 λͺ¨λ“ˆμ€ 특히 CPU bound μž‘μ—…μ— μ ν•©ν•˜λ©°, Global Interpreter Lock (GIL) λ•Œλ¬Έμ— λ©€ν‹°μŠ€λ ˆλ”©μ΄ 효과λ₯Ό 보이지 λͺ»ν•˜λŠ” μƒν™©μ—μ„œ μ‚¬μš©ν•˜λ©΄ μ’‹λ‹€.
Process ν΄λž˜μŠ€λŠ” 각각 독립적인 ν”„λ‘œμ„ΈμŠ€λ₯Ό λ‚˜νƒ€λ‚΄λŠ” ν΄λž˜μŠ€μ΄λ‹€. Process μΈμŠ€ν„΄μŠ€λ₯Ό μƒμ„±ν•˜κ³ , 이λ₯Ό μ‹œμž‘ν•˜λŠ” κ²ƒμœΌλ‘œ μƒˆλ‘œμš΄ ν”„λ‘œμ„ΈμŠ€λ₯Ό μƒμ„±ν•˜κ³  μ‹€ν–‰ν•  수 μžˆλ‹€.
from multiprocessing import Process def print_func(num): for i in range(num): print("Print number %d" % i) if __name__ == "__main__": p = Process(target=print_func, args=(5,)) # Process μΈμŠ€ν„΄μŠ€ 생성 p.start() # μƒˆλ‘œμš΄ ν”„λ‘œμ„ΈμŠ€ μ‹œμž‘ p.join() # ν”„λ‘œμ„ΈμŠ€μ˜ μ’…λ£Œλ₯Ό κΈ°λ‹€λ¦Ό
Python
볡사
이 μ½”λ“œλŠ” μƒˆλ‘œμš΄ ν”„λ‘œμ„ΈμŠ€λ₯Ό μƒμ„±ν•˜μ—¬ print_func ν•¨μˆ˜λ₯Ό μ‹€ν–‰ν•œλ‹€. 이 ν•¨μˆ˜λŠ” 전달받은 수만큼 숫자λ₯Ό 좜λ ₯ν•œλ‹€. 이처럼 multiprocessing λͺ¨λ“ˆκ³Ό Process 클래슀λ₯Ό μ‚¬μš©ν•˜λ©΄, μ—¬λŸ¬ ν”„λ‘œμ„ΈμŠ€λ₯Ό λ™μ‹œμ— μ‹€ν–‰ν•˜λŠ” 병렬 처리λ₯Ό μ‰½κ²Œ κ΅¬ν˜„ν•  수 μžˆλ‹€.
multiprocessing.Process 클래슀 μ£Όμš” 인자
β€’
target: ν”„λ‘œμ„ΈμŠ€μ— μ˜ν•΄ 싀행될 ν•¨μˆ˜μ΄λ‹€. 이 ν•¨μˆ˜λŠ” λ³„λ„μ˜ ν”„λ‘œμ„ΈμŠ€μ—μ„œ μ‹€ν–‰λ˜λ©°, ν•¨μˆ˜μ˜ μ΄λ¦„λ§Œ 전달해야 ν•©λ‹ˆλ‹€ (κ΄„ν˜Έ 없이).
β€’
args: target ν•¨μˆ˜μ— 전달될 μΈμžλ“€μ˜ νŠœν”Œμž…λ‹ˆλ‹€. ν•¨μˆ˜κ°€ 인자λ₯Ό ν•„μš”λ‘œ ν•˜μ§€ μ•ŠλŠ” κ²½μš°μ—λ„ 이 μΈμžλŠ” 제곡될 수 있으며, 이 경우 빈 νŠœν”Œμ„ μ‚¬μš©ν•œλ‹€.
β€’
kwargs: target ν•¨μˆ˜μ— 전달될 ν‚€μ›Œλ“œ μΈμžλ“€μ˜ 사전(dict)이닀.
β€’
name: ν”„λ‘œμ„ΈμŠ€μ˜ 이름이닀. 이 μΈμžλŠ” 선택적이며, 이름이 μ œκ³΅λ˜μ§€ μ•ŠλŠ” 경우 μžλ™μœΌλ‘œ "Process-N"κ³Ό 같은 ν˜•νƒœμ˜ 이름이 ν• λ‹Ήλœλ‹€.
β€’
daemon: 이 μΈμžκ°€ True둜 μ„€μ •λ˜λ©΄, ν•΄λ‹Ή ν”„λ‘œμ„ΈμŠ€λŠ” 데λͺ¬ ν”„λ‘œμ„ΈμŠ€λ‘œ μ„€μ •λœλ‹€. 데λͺ¬ ν”„λ‘œμ„ΈμŠ€λŠ” λΆ€λͺ¨ ν”„λ‘œμ„ΈμŠ€κ°€ μ’…λ£Œλ˜λ©΄ μžλ™μœΌλ‘œ μ’…λ£Œλ˜λŠ” ν”„λ‘œμ„ΈμŠ€μ΄λ‹€.

Value

μ•„λž˜ μ½”λ“œλŠ” ν”„λ‘œμ„ΈμŠ€ κ°„ 곡유 λ©”λͺ¨λ¦¬(Value)λ₯Ό κ΅¬ν˜„ν•œ 예제
# ν”„λ‘œμ„ΈμŠ€ λ©”λͺ¨λ¦¬ 곡유 예제(곡유 성곡!) from multiprocessing import Process, current_process, Value import os # μ‹€ν–‰ ν•¨μˆ˜ def generate_update_number(v : int): for i in range(50): v.value += 1 print(current_process().name, "data", v.value) def main(): # λΆ€λͺ¨ ν”„λ‘œμ„ΈμŠ€ 아이디 parent_process_id = os.getpid() # 좜λ ₯ print(f"Parent process ID {parent_process_id}") # ν”„λ‘œμ„ΈμŠ€ 리슀트 μ„ μ–Έ processes = list() # ν”„λ‘œμ„ΈμŠ€ λ©”λͺ¨λ¦¬ 곡유 λ³€μˆ˜ μ„ μ–Έ share_value = Value('i', 0) # iλŠ” μ •μˆ˜ν˜•μ„ 의미, 0은 μ΄ˆκΈ°κ°’ for _ in range(1,10): # 생성 p = Process(target=generate_update_number, args=(share_value,)) # 배열에 λ‹΄κΈ° processes.append(p) # μ‹€ν–‰ p.start() # Join for p in processes: p.join() # μ΅œμ’… ν”„λ‘œμ„ΈμŠ€ λΆ€λͺ¨ λ³€μˆ˜ 확인 print("Final Data(share_value) in parent process", share_value.value) if __name__ == '__main__': main()
Python
볡사
main() ν•¨μˆ˜μ—μ„œ μš°μ„  processes λ¦¬μŠ€νŠΈμ— μžμ‹ ν”„λ‘œμ„ΈμŠ€λ“€μ„ λ‹΄κ³ , 이듀을 μ‹€ν–‰μ‹œν‚¨λ‹€.
generate_update_number() ν•¨μˆ˜μ—μ„œλŠ”, 전달받은 Value 객체에 λŒ€ν•΄ 50번의 λ°˜λ³΅μ„ μˆ˜ν–‰ν•˜λ©°, ν•΄λ‹Ή 객체의 값을 1μ”© 더해쀀닀. 이후, ν˜„μž¬ ν”„λ‘œμ„ΈμŠ€μ˜ 이름과 ν•΄λ‹Ή 객체의 κ°’ 증가 κ²°κ³Όλ₯Ό 좜λ ₯ν•œλ‹€.
λ§ˆμ§€λ§‰μ— join() ν•¨μˆ˜λ₯Ό μ΄μš©ν•˜μ—¬ μžμ‹ ν”„λ‘œμ„ΈμŠ€λ“€μ˜ 싀행이 μ’…λ£Œλ  λ•ŒκΉŒμ§€ λŒ€κΈ°ν•˜κ³ , μ΅œμ’…μ μœΌλ‘œ share_value 객체의 μ΅œμ’… 값을 좜λ ₯ν•œλ‹€.
ν”„λ‘œμ„ΈμŠ€ λ©”λͺ¨λ¦¬ 곡유(μ‹€νŒ¨).py
1.0KB
dataκ°€ κ³΅μœ λ˜μ§€ λͺ»ν•œ 것을 확인할 수 μžˆλ‹€.
ν”„λ‘œμ„ΈμŠ€ λ©”λͺ¨λ¦¬ 곡유(성곡).py
1.0KB
dataκ°€ λˆ„μ  λ§μ…ˆλ˜μ–΄ 450이 좜λ ₯된 것을 확인할 수 μžˆλ‹€.

Pipe

파이썬의 multiprocessing λͺ¨λ“ˆμ˜ Pipe ν•¨μˆ˜λŠ” 두 ν”„λ‘œμ„ΈμŠ€ 간에 μ–‘λ°©ν–₯ 톡신을 κ°€λŠ₯ν•˜κ²Œ ν•˜λŠ” 연결을 μƒμ„±ν•œλ‹€. Pipe ν•¨μˆ˜λŠ” 두 개의 μ—°κ²° 객체λ₯Ό λ°˜ν™˜ν•˜λŠ”λ°, 각 κ°μ²΄λŠ” νŒŒμ΄ν”„μ˜ 두 끝을 λ‚˜νƒ€λ‚Έλ‹€.
# ν”„λ‘œμ„ΈμŠ€ κ°„ 톡신을 μœ„ν•œ νŒŒμ΄ν”„ μ‚¬μš© 예제 from multiprocessing import Process, Pipe # λ©”μ‹œμ§€λ₯Ό λ³΄λ‚΄λŠ” ν•¨μˆ˜ def send_messages(conn, messages): for message in messages: print(f"Sending: {message}") conn.send(message) conn.close() # λ©”μ‹œμ§€λ₯Ό λ°›λŠ” ν•¨μˆ˜ def receive_messages(conn, num_messages): print("Receiving...") for _ in range(num_messages): message = conn.recv() print(f"Received: {message}") if __name__ == '__main__': # νŒŒμ΄ν”„ 생성 parent_conn, child_conn = Pipe() # λ©”μ‹œμ§€ 생성 messages = ["Hello", "World", "!"] # λ©”μ‹œμ§€ 전솑 ν”„λ‘œμ„ΈμŠ€ 생성 p1 = Process(target=send_messages, args=(child_conn, messages)) p1.start() # λ©”μ‹œμ§€ μˆ˜μ‹  ν”„λ‘œμ„ΈμŠ€ 생성 p2 = Process(target=receive_messages, args=(parent_conn, len(messages))) p2.start() # ν”„λ‘œμ„ΈμŠ€κ°€ μ’…λ£Œλ  λ•ŒκΉŒμ§€ κΈ°λ‹€λ¦Ό p1.join() p2.join()
Python
볡사
ν”„λ‘œμ„ΈμŠ€ Pipe.py
1.0KB
send_messages() ν•¨μˆ˜λŠ” messages λ¦¬μŠ€νŠΈμ— μžˆλŠ” λ©”μ‹œμ§€λ₯Ό conn.send() λ©”μ†Œλ“œλ₯Ό μ΄μš©ν•΄ λ³΄λ‚΄λŠ” ν•¨μˆ˜μ΄λ‹€.
receive_messages() ν•¨μˆ˜λŠ” num_messages 횟수만큼 conn.recv() λ©”μ†Œλ“œλ₯Ό μ΄μš©ν•΄ λ©”μ‹œμ§€λ₯Ό λ°›λŠ” ν•¨μˆ˜μ΄λ‹€.
Pipe() ν•¨μˆ˜λ₯Ό μ΄μš©ν•˜μ—¬ parent_connκ³Ό child_conn 객체λ₯Ό μƒμ„±ν•©λ‹ˆλ‹€. 이 κ°μ²΄λŠ” νŒŒμ΄ν”„μ˜ μ–‘μͺ½ 끝을 λ‚˜νƒ€λ‚Έλ‹€.
send_messages() ν•¨μˆ˜μ™€ receive_messages() ν•¨μˆ˜ 각각을 ν”„λ‘œμ„ΈμŠ€λ‘œ μƒμ„±ν•˜μ—¬ μ‹€ν–‰ν•œλ‹€.
join() λ©”μ†Œλ“œλ₯Ό μ΄μš©ν•΄ ν”„λ‘œμ„ΈμŠ€κ°€ μ’…λ£Œλ  λ•ŒκΉŒμ§€ κΈ°λ‹€λ¦° λ’€, ν”„λ‘œμ„ΈμŠ€κ°€ μ’…λ£Œλ˜λ©΄ μ½”λ“œκ°€ μ’…λ£Œλœλ‹€.
μ΄λŸ¬ν•œ λ°©μ‹μœΌλ‘œ, Pipeλ₯Ό μ‚¬μš©ν•˜λ©΄ 두 ν”„λ‘œμ„ΈμŠ€ 간에 데이터λ₯Ό μ‰½κ²Œ κ΅ν™˜ν•  수 μžˆλ‹€.

Queue

multiprocessing λͺ¨λ“ˆμ˜ Queue ν΄λž˜μŠ€λŠ” ν”„λ‘œμ„ΈμŠ€ 간에 μ•ˆμ „ν•˜κ²Œ 데이터λ₯Ό κ΅ν™˜ν•  수 μžˆλŠ” 방법을 μ œκ³΅ν•œλ‹€. Queue ν΄λž˜μŠ€λŠ” FIFO (First In, First Out) 정책을 λ”°λ₯΄λŠ” ν‘œμ€€ 큐 데이터 ꡬ쑰λ₯Ό μ‚¬μš©ν•œλ‹€.
Queue 클래슀의 μ£Όμš” λ©”μ„œλ“œ
β€’
put(item): 큐에 ν•­λͺ©μ„ μΆ”κ°€ν•œλ‹€.
β€’
get(): νμ—μ„œ ν•­λͺ©μ„ μ œκ±°ν•˜κ³  λ°˜ν™˜ν•œλ‹€. 큐가 λΉ„μ–΄ 있으면, ν•­λͺ©μ΄ 도착할 λ•ŒκΉŒμ§€ μ°¨λ‹¨λœλ‹€.
β€’
empty(): 큐가 λΉ„μ–΄ 있으면 Trueλ₯Ό λ°˜ν™˜ν•œλ‹€.
β€’
full(): 큐가 꽉 μ°¨ 있으면 Trueλ₯Ό λ°˜ν™˜ν•œλ‹€.
# ν”„λ‘œμ„ΈμŠ€ κ°„ 톡신을 μœ„ν•œ 큐 from multiprocessing import Process, Queue def func(q, message): q.put(message) if __name__ == '__main__': # 큐 생성 q = Queue() # ν”„λ‘œμ„ΈμŠ€ 생성 a = Process(target=func, args=(q,[42, None, 'hello'])) b = Process(target=func, args=(q,[111, True, 'world'])) c = Process(target=func, args=(q,"wow")) a.start() b.start() print(q.get()) # prints "[42, None, 'hello']" print(q.get()) # prints "[111, True, 'world']" c.start() a.join() b.join() print(q.get()) # prints "wow" c.join()
Python
볡사
ν”„λ‘œμ„ΈμŠ€ Queue.py
0.6KB
Queue κ°μ²΄λŠ” μžλ£Œκ΅¬μ‘°μ—μ„œ λ§ν•˜λŠ” 큐(queue)와 μœ μ‚¬ν•œ κ°œλ…μœΌλ‘œ, 데이터λ₯Ό μ €μž₯ν•  수 μžˆλŠ” 곡간을 μ œκ³΅ν•˜λ©°, 데이터λ₯Ό μ €μž₯ν•  λ•Œμ—λŠ” put() λ©”μ†Œλ“œλ₯Ό μ‚¬μš©ν•˜κ³ , 데이터λ₯Ό μΆ”μΆœν•  λ•Œμ—λŠ” get() λ©”μ†Œλ“œλ₯Ό μ‚¬μš©ν•œλ‹€.
μœ„ μ˜ˆμ œμ—μ„œλŠ” 3개의 ν”„λ‘œμ„ΈμŠ€(a, b, c)κ°€ ν•˜λ‚˜μ˜ Queue 객체(q)λ₯Ό κ³΅μœ ν•˜κ³  μžˆλ‹€. λ¨Όμ € a ν”„λ‘œμ„ΈμŠ€μ™€ b ν”„λ‘œμ„ΈμŠ€κ°€ 각자의 데이터λ₯Ό q에 μ €μž₯ν•˜κ³ , 메인 ν”„λ‘œμ„ΈμŠ€μ—μ„œ 이λ₯Ό μˆœμ„œλŒ€λ‘œ μΆ”μΆœν•˜λ©° 좜λ ₯ν•œλ‹€. λ§ˆμ§€λ§‰μœΌλ‘œ c ν”„λ‘œμ„ΈμŠ€κ°€ q에 데이터λ₯Ό μ €μž₯ν•˜κ³ , 이λ₯Ό 메인 ν”„λ‘œμ„ΈμŠ€μ—μ„œ μΆ”μΆœν•˜λ©° 좜λ ₯ν•œλ‹€.
Queue 객체λ₯Ό μ΄μš©ν•˜λ©΄ λ³΅μž‘ν•œ ν”„λ‘œμ„ΈμŠ€ κ°„ 톡신을 μ‰½κ²Œ κ΅¬ν˜„ν•  수 μžˆλ‹€. λ‹€λ§Œ, put() λ©”μ†Œλ“œμ™€ get() λ©”μ†Œλ“œλŠ” 데이터λ₯Ό μ €μž₯ν•˜κ³  μΆ”μΆœν•  λ•Œ λΈ”λ‘œν‚Ή(blocking) λ°©μ‹μœΌλ‘œ λ™μž‘ν•˜κΈ° λ•Œλ¬Έμ—, ν•œ μͺ½ ν”„λ‘œμ„ΈμŠ€κ°€ 데이터λ₯Ό μΆ”κ°€ν•˜κ³  μžˆμ„ λ•Œ λ‹€λ₯Έ μͺ½ ν”„λ‘œμ„ΈμŠ€κ°€ 데이터λ₯Ό μΆ”μΆœν•˜λ €κ³  ν•˜λ©΄, ν•΄λ‹Ή ν”„λ‘œμ„ΈμŠ€λŠ” λΈ”λ‘œν‚Ήλ˜μ–΄ λŒ€κΈ°ν•˜κ²Œ λœλ‹€.

Next β†’ logging