Computer >> คอมพิวเตอร์ >  >> การเขียนโปรแกรม >> Ruby

Concurrency Deep Dive:หลายกระบวนการ

ในบทความ Ruby Magic ก่อนหน้านี้เกี่ยวกับ Mastering Concurrency เราได้แนะนำวิธีการสามวิธีในการบรรลุการทำงานพร้อมกันซึ่งมีให้บริการสำหรับเราในฐานะนักพัฒนา Ruby บทความนี้เป็นบทความแรกในซีรีส์สามตอนที่เราเจาะลึกข้อมูลแต่ละวิธี

อันดับแรก:หลายกระบวนการ . ด้วยวิธีนี้ กระบวนการหลักจะแยกตัวเองไปยังกระบวนการของผู้ปฏิบัติงานหลายคน กระบวนการของผู้ปฏิบัติงานทำงานจริง ในขณะที่ต้นแบบจัดการผู้ปฏิบัติงาน

ซอร์สโค้ดแบบเต็มที่ใช้ในตัวอย่างในบทความนี้มีอยู่ใน GitHub ดังนั้นคุณจึงทดลองใช้งานได้ด้วยตนเอง

มาสร้างระบบแชทกันเถอะ!

การสร้างระบบแชทเป็นวิธีที่ดีในการดำดิ่งสู่การทำงานพร้อมกัน เราจำเป็นต้องมีส่วนประกอบเซิร์ฟเวอร์ของระบบแชทที่สามารถรักษาการเชื่อมต่อกับลูกค้าหลายราย ซึ่งจะทำให้เราสามารถกระจายข้อความที่ได้รับจากลูกค้ารายหนึ่งไปยังไคลเอ็นต์ที่เชื่อมต่ออื่นๆ ทั้งหมดได้

เซิร์ฟเวอร์แชทของเรากำลังทำงานในแท็บด้านซ้าย มีโปรแกรมแชทสองตัวที่ทำงานอยู่ในแท็บด้านขวา ข้อความใด ๆ ที่ส่งโดยไคลเอนต์จะได้รับโดยไคลเอนต์อื่น ๆ ทั้งหมด

โปรแกรมแชท

บทความนี้เน้นที่เซิร์ฟเวอร์แชท แต่หากต้องการสื่อสารกับเซิร์ฟเวอร์นั้น เราจำเป็นต้องมีโปรแกรมแชทก่อน รหัสต่อไปนี้จะเป็นลูกค้าที่เรียบง่ายของเรา (ดูตัวอย่างที่สมบูรณ์กว่านี้ได้ใน GitHub)

# client.rb
# $ ruby client.rb
require 'socket'
client = TCPSocket.open(ARGV[0], 2000)
 
Thread.new do
  while line = client.gets
    puts line.chop
  end
end
 
while input = STDIN.gets.chomp
  client.puts input
end

ไคลเอนต์เปิดการเชื่อมต่อ TCP กับเซิร์ฟเวอร์ที่ทำงานบนพอร์ต 2000 เมื่อเชื่อมต่อ มันจะวางไข่เธรดที่จะ puts ทุกสิ่งที่เซิร์ฟเวอร์ส่ง ดังนั้นการแชทจะปรากฏในเอาต์พุตของเทอร์มินัล สุดท้าย มี while loop ที่ส่งบรรทัดใดๆ ที่คุณพิมพ์ไปยังเซิร์ฟเวอร์ ซึ่งจะส่งไปยังไคลเอ็นต์ที่เชื่อมต่ออื่นๆ ทั้งหมด

เซิร์ฟเวอร์แชท

ในตัวอย่างนี้ ลูกค้าเชื่อมต่อกับเซิร์ฟเวอร์การแชทเพื่อสื่อสารกับลูกค้ารายอื่น สำหรับวิธีการทำงานพร้อมกันทั้งสามวิธี เราจะใช้เซิร์ฟเวอร์ TCP เดียวกันจากไลบรารีมาตรฐานของ Ruby

# server_processes.rb
# $ ruby server_processes.rb
require 'socket'
 
puts 'Starting server on port 2000'
 
server = TCPServer.open(2000)

จนถึงตอนนี้ โค้ดจะเหมือนกันสำหรับโมเดลการทำงานพร้อมกันทั้งสามแบบ เซิร์ฟเวอร์แชทในทุกโมเดลจะต้องจัดการกับสองสถานการณ์:

  1. ยอมรับการเชื่อมต่อใหม่จากลูกค้า
  2. รับข้อความจากลูกค้าและส่งไปยังไคลเอ็นต์อื่นๆ ทั้งหมด

เซิร์ฟเวอร์แชทแบบหลายกระบวนการ

เพื่อจัดการกับสถานการณ์จำลองทั้งสองนี้ด้วยเซิร์ฟเวอร์การแชทแบบหลายกระบวนการ เราจะสร้างกระบวนการต่อการเชื่อมต่อไคลเอนต์ กระบวนการนี้จะจัดการข้อความทั้งหมดที่ส่งและรับสำหรับลูกค้ารายนั้น เราสามารถสร้างกระบวนการเหล่านี้ได้โดยการฟอร์กกระบวนการเซิร์ฟเวอร์ดั้งเดิม

กระบวนการฟอร์ก

เมื่อคุณเรียกใช้เมธอด fork จะสร้างสำเนาของกระบวนการปัจจุบันด้วยสถานะเดียวกันกับกระบวนการนั้น

กระบวนการที่แยกกันมีรหัสกระบวนการของตัวเอง และจะมองเห็นแยกต่างหากในเครื่องมือเช่น top หรือการตรวจสอบกิจกรรม หน้าตาประมาณนี้:

กระบวนการที่คุณเริ่มต้นเรียกว่ากระบวนการหลัก และกระบวนการที่แยกออกจากกระบวนการหลักจะเรียกว่ากระบวนการของผู้ปฏิบัติงาน

เนื่องจากกระบวนการของผู้ปฏิบัติงานที่เพิ่งแยกใหม่เหล่านี้เป็นกระบวนการที่แยกจากกันอย่างแท้จริง เราจึงไม่สามารถแบ่งปันหน่วยความจำระหว่างกระบวนการเหล่านี้กับกระบวนการหลักได้ เราต้องการบางสิ่งบางอย่างเพื่อสื่อสารระหว่างกัน

ท่อยูนิกซ์

ในการสื่อสารระหว่างกระบวนการ เราจะใช้ท่อยูนิกซ์ ไพพ์ Unix ตั้งค่าสตรีมแบบสองทางของไบต์ระหว่างสองกระบวนการ และคุณสามารถใช้เพื่อส่งข้อมูลจากกระบวนการหนึ่งไปยังอีกกระบวนการหนึ่งได้ โชคดีที่ Ruby มีผ้าห่อรอบๆ ท่อเหล่านี้อย่างดี ดังนั้นเราจึงไม่จำเป็นต้องประดิษฐ์วงล้อขึ้นใหม่

ในตัวอย่างต่อไปนี้ เราตั้งค่าไพพ์ใน Ruby –ด้วยการอ่านและการเขียนจบ– และเรา fork กระบวนการหลัก รหัสภายในบล็อกที่ส่งผ่านไปยัง fork กำลังทำงานอยู่ในกระบวนการทางแยก กระบวนการเดิมยังคงดำเนินต่อไปหลังจากบล็อกนี้ จากนั้นเราจะเขียนข้อความไปยังกระบวนการเดิมจากขั้นตอนที่แยกจากกัน

reader, writer = IO.pipe
 
fork do
  # This is running in the forked process.
  writer.puts 'Hello from the forked process'
end
 
# This is running in the original process, it will puts the
# message from the forked process.
puts reader.gets

การใช้ไพพ์ทำให้เราสามารถสื่อสารระหว่างกระบวนการที่แยกจากกัน แม้ว่ากระบวนการจะถูกแยกออกจากกันโดยสิ้นเชิง

การใช้งานเซิร์ฟเวอร์แชท

ขั้นแรก เราตั้งค่าอาร์เรย์เพื่อติดตามไพพ์สำหรับลูกค้าทั้งหมดและ "ผู้เขียน" ของพวกเขา (จุดสิ้นสุดการเขียนของไพพ์) เพื่อให้เราสามารถสื่อสารกับลูกค้าได้ จากนั้นเราจะตรวจสอบให้แน่ใจว่าข้อความขาเข้าทั้งหมดจากลูกค้าจะถูกส่งไปยังไคลเอ็นต์อื่นๆ ทั้งหมด

client_writers = []
master_reader, master_writer = IO.pipe
 
write_incoming_messages_to_child_processes(master_reader, client_writers)

คุณสามารถค้นหาการใช้งาน write_incoming_messages_to_child_processes บน GitHub หากคุณต้องการดูรายละเอียดว่ามันทำงานอย่างไร

ยอมรับการเชื่อมต่อใหม่

เราจะต้องยอมรับการเชื่อมต่อที่เข้ามาและติดตั้งท่อ ผู้เขียนใหม่จะถูกผลักไปที่ client_writers อาร์เรย์ กระบวนการหลักจะสามารถวนซ้ำผ่านอาร์เรย์และส่งข้อความไปยังแต่ละกระบวนการของผู้ปฏิบัติงานโดยการเขียนไปยังไพพ์

จากนั้นเราจะแยกกระบวนการหลัก และรหัสภายในกระบวนการของผู้ปฏิบัติงานที่แยกออกจะจัดการการเชื่อมต่อไคลเอ็นต์

loop do
  while socket = server.accept
    # Create a client reader and writer so that the master
    # process can write messages back to us.
    client_reader, client_writer = IO.pipe
 
    # Put the client writer on the list of writers so the
    # master process can write to them.
    client_writers.push(client_writer)
 
    # Fork child process, everything in the fork block
    # only runs in the child process.
    fork do
      # Handle connection
    end
  end
end

การจัดการการเชื่อมต่อไคลเอ็นต์

เราต้องจัดการกับการเชื่อมต่อไคลเอ็นต์ด้วย

กระบวนการแยกกันเริ่มต้นด้วยการรับชื่อเล่นจากไคลเอนต์ (ไคลเอนต์ส่งชื่อเล่นโดยค่าเริ่มต้น) หลังจากนั้นจะเริ่มเธรดใน write_incoming_messages_to_client ที่รับฟังข้อความจากกระบวนการหลัก

สุดท้าย กระบวนการที่แยกจากกันจะเริ่มต้นการวนซ้ำที่รับฟังข้อความที่เข้ามาและส่งไปยังกระบวนการหลัก กระบวนการหลักทำให้แน่ใจว่ากระบวนการของผู้ปฏิบัติงานได้รับข้อความ

nickname = read_line_from(socket)
puts "#{Process.pid}: Accepted connection from #{nickname}"
 
write_incoming_messages_to_client(nickname, client_reader, socket)
 
# Read incoming messages from the client.
while incoming = read_line_from(socket)
  master_writer.puts "#{nickname}: #{incoming}"
end
 
puts "#{Process.pid}: Disconnected #{nickname}"

ระบบแชทที่ใช้งานได้

ตอนนี้ระบบแชททั้งหมดใช้งานได้แล้ว! แต่อย่างที่คุณเห็น การเขียนโปรแกรมที่ใช้การประมวลผลหลายตัวนั้นค่อนข้างซับซ้อนและใช้ทรัพยากรจำนวนมาก ข้อดีคือมันแข็งแกร่งมาก หากกระบวนการย่อยตัวใดตัวหนึ่งขัดข้อง ส่วนที่เหลือของระบบก็จะยังทำงานต่อไป คุณสามารถลองใช้โค้ดตัวอย่างและเรียกใช้ kill -9 <process-id> ในกระบวนการใดกระบวนการหนึ่ง (คุณสามารถค้นหารหัสกระบวนการได้ในเอาต์พุตบันทึกของเซิร์ฟเวอร์)

ในบทความถัดไป เราจะใช้ระบบแชทเดียวกันโดยใช้เธรดเท่านั้น ดังนั้นเราจึงสามารถเรียกใช้เซิร์ฟเวอร์ที่มีคุณสมบัติเดียวกันโดยใช้กระบวนการเดียวและใช้หน่วยความจำน้อยลง