Computer >> คอมพิวเตอร์ >  >> การเขียนโปรแกรม >> Python

การซิงโครไนซ์และการรวมโปรเซสใน Python


การซิงโครไนซ์ระหว่างกระบวนการ

การประมวลผลหลายตัวเป็นแพ็คเกจที่รองรับกระบวนการวางไข่โดยใช้ API แพ็คเกจนี้ใช้สำหรับการทำงานพร้อมกันทั้งแบบโลคัลและรีโมต การใช้โมดูลนี้ โปรแกรมเมอร์สามารถใช้โปรเซสเซอร์หลายตัวในเครื่องที่กำหนดได้ มันทำงานบนระบบปฏิบัติการ Windows และ UNIX

พื้นฐานการซิงโครไนซ์ที่เทียบเท่าทั้งหมดมีอยู่ในแพ็คเกจนี้

โค้ดตัวอย่าง

from multiprocessing import Process, Lock
   def my_function(x, y):
      x.acquire()
      print ('hello world', y)
      x.release()
      if __name__ == '__main__':
      lock = Lock()
   for num in range(10):
Process(target= my_function, args=(lock, num)).start()

ที่นี่ 1 อินสแตนซ์สามารถล็อกได้เพื่อให้แน่ใจว่ามีเพียงหนึ่งกระบวนการเท่านั้นที่สามารถแสดงเอาต์พุตมาตรฐานในแต่ละครั้ง

การพูล

สำหรับการพูลเราใช้คลาสพูล เมื่อสามารถสร้างกลุ่มของกระบวนการที่จะนำงานทั้งหมดที่ส่งไป

class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])

ออบเจ็กต์พูลจะควบคุมพูลของผู้ปฏิบัติงานเพื่อเลือกงานที่สามารถส่งได้ และสนับสนุนผลลัพธ์แบบอะซิงโครนัสซึ่งมีการหมดเวลา การเรียกกลับ และการนำแผนที่คู่ขนานไปใช้

cpu_count() ถูกใช้หากกระบวนการไม่มี และ initializer(*initargs) ฟังก์ชันนี้จะเรียกใช้เมื่อ initializer ไม่มี

apply(func[, args[, kwds]])

สิ่งนี้เหมือนกับฟังก์ชันในตัวของ apply() สิ่งนี้จะถูกบล็อกจนกว่าผลลัพธ์จะพร้อม ถ้าต้องการดำเนินการแบบขนาน เมธอด apply_async() จะดีกว่า

apply_async(func[, args[, kwds[, callback]]])

ส่งกลับวัตถุผลลัพธ์

แผนที่ (func, iterable [, chunksize])

map() เป็นฟังก์ชันในตัวและสนับสนุนอาร์กิวเมนต์ iterable เดียวเท่านั้น บล็อกจนกว่าผลลัพธ์จะพร้อม

ในวิธีนี้ iterable จะแบ่งเป็นส่วนย่อยๆ และส่วนเล็กๆ เหล่านี้จะถูกส่งไปยังกลุ่มกระบวนการเป็นงานที่แยกจากกัน

map_async(func, iterable[, chunksize[, callback]])

ส่งกลับวัตถุผลลัพธ์

imap(func, iterable[, chunksize])

มันเหมือนกับ itertools.imap()

ขนาดของอาร์กิวเมนต์เหมือนกับที่ใช้ใน map()

imap_unordered(func, iterable[, chunksize])

สิ่งนี้เหมือนกับ imap() ยกเว้นว่าควรเรียงลำดับตัววนซ้ำอีกครั้ง

ปิด()

เมื่อผู้ปฏิบัติงานเสร็จสิ้นภารกิจทั้งหมด ผู้ปฏิบัติงานก็ออกจากกระบวนการ

ยุติ()

หากเราต้องการหยุดกระบวนการของผู้ปฏิบัติงานทันทีโดยไม่ได้ทำงานให้เสร็จ เราจะใช้วิธีนี้

เข้าร่วม()

ก่อนใช้วิธี join() เราต้องใช้ฟังก์ชัน close() และ dissolve()

คลาส multiprocessing.pool.AsyncResult

ส่งคืนโดย Pool.apply_async() และ Pool.map_async()

รับ([หมดเวลา])

ฟังก์ชันนี้จะส่งคืนผลลัพธ์เมื่อมาถึง

รอ([หมดเวลา])

เมื่อใช้ฟังก์ชันรอนี้ เรารอให้ผลลัพธ์พร้อมใช้งานหรือจนกว่าหมดเวลาจะผ่านไป

พร้อม()

ฟังก์ชันนี้จะคืนค่าว่าการโทรเสร็จสิ้นหรือไม่

สำเร็จ()

ฟังก์ชันนี้จะส่งกลับเมื่อการโทรเสร็จสิ้นโดยไม่มีข้อผิดพลาด

โค้ดตัวอย่าง

# -*- coding: utf-8 -*-
"""
Created on Sun Sep 30 12:17:58 2018
@author: Tutorials Point
"""
from multiprocessing import Pool
import time
def myfunction(m):
return m*m
if __name__ == '__main__':
my_pool = Pool(processes=4) # start 4 worker processes
result = my_pool.apply_async(myfunction, (10,)) # evaluate "f(10)" asynchronously in a single process
print (result.get(timeout=1))
print (my_pool.map(myfunction, range(10))) # prints "[0, 1, 4,..., 81]"
my_it = my_pool.imap(myfunction, range(10))
print (my_it.next() ) # prints "0"
print (my_it.next() ) # prints "1"
print (my_it.next(timeout=1) ) # prints "4" unless your computer is *very* slow
result = my_pool.apply_async(time.sleep, (10,))
print (result.get(timeout=1) ) # raises multiprocessing.TimeoutError