ในบทความ Ruby Magic ก่อนหน้านี้เกี่ยวกับ Mastering Concurrency เราได้แนะนำวิธีการสามวิธีในการบรรลุการทำงานพร้อมกันซึ่งมีให้บริการสำหรับเราในฐานะนักพัฒนา Ruby บทความนี้เป็นบทความแรกในซีรีส์สามตอนที่เราเจาะลึกข้อมูลแต่ละวิธี
อันดับแรก:หลายกระบวนการ . ด้วยวิธีนี้ กระบวนการหลักจะแยกตัวเองไปยังกระบวนการของผู้ปฏิบัติงานหลายคน กระบวนการของผู้ปฏิบัติงานทำงานจริง ในขณะที่ต้นแบบจัดการผู้ปฏิบัติงาน
ซอร์สโค้ดแบบเต็มที่ใช้ในตัวอย่างในบทความนี้มีอยู่ใน GitHub ดังนั้นคุณจึงทดลองใช้งานได้ด้วยตนเอง
มาสร้างระบบแชทกันเถอะ!
การสร้างระบบแชทเป็นวิธีที่ดีในการดำดิ่งสู่การทำงานพร้อมกัน เราจำเป็นต้องมีส่วนประกอบเซิร์ฟเวอร์ของระบบแชทที่สามารถรักษาการเชื่อมต่อกับลูกค้าหลายราย ซึ่งจะทำให้เราสามารถกระจายข้อความที่ได้รับจากลูกค้ารายหนึ่งไปยังไคลเอ็นต์ที่เชื่อมต่ออื่นๆ ทั้งหมดได้
เซิร์ฟเวอร์แชทของเรากำลังทำงานในแท็บด้านซ้าย มีโปรแกรมแชทสองตัวที่ทำงานอยู่ในแท็บด้านขวา ข้อความใด ๆ ที่ส่งโดยไคลเอนต์จะได้รับโดยไคลเอนต์อื่น ๆ ทั้งหมด
โปรแกรมแชท
บทความนี้เน้นที่เซิร์ฟเวอร์แชท แต่หากต้องการสื่อสารกับเซิร์ฟเวอร์นั้น เราจำเป็นต้องมีโปรแกรมแชทก่อน รหัสต่อไปนี้จะเป็นลูกค้าที่เรียบง่ายของเรา (ดูตัวอย่างที่สมบูรณ์กว่านี้ได้ใน GitHub)
# client.rb
# $ ruby client.rb
require 'socket'
client = TCPSocket.open(ARGV[0], 2000)
Thread.new do
while line = client.gets
puts line.chop
end
end
while input = STDIN.gets.chomp
client.puts input
end
ไคลเอนต์เปิดการเชื่อมต่อ TCP กับเซิร์ฟเวอร์ที่ทำงานบนพอร์ต 2000 เมื่อเชื่อมต่อ มันจะวางไข่เธรดที่จะ puts
ทุกสิ่งที่เซิร์ฟเวอร์ส่ง ดังนั้นการแชทจะปรากฏในเอาต์พุตของเทอร์มินัล สุดท้าย มี while loop ที่ส่งบรรทัดใดๆ ที่คุณพิมพ์ไปยังเซิร์ฟเวอร์ ซึ่งจะส่งไปยังไคลเอ็นต์ที่เชื่อมต่ออื่นๆ ทั้งหมด
เซิร์ฟเวอร์แชท
ในตัวอย่างนี้ ลูกค้าเชื่อมต่อกับเซิร์ฟเวอร์การแชทเพื่อสื่อสารกับลูกค้ารายอื่น สำหรับวิธีการทำงานพร้อมกันทั้งสามวิธี เราจะใช้เซิร์ฟเวอร์ TCP เดียวกันจากไลบรารีมาตรฐานของ Ruby
# server_processes.rb
# $ ruby server_processes.rb
require 'socket'
puts 'Starting server on port 2000'
server = TCPServer.open(2000)
จนถึงตอนนี้ โค้ดจะเหมือนกันสำหรับโมเดลการทำงานพร้อมกันทั้งสามแบบ เซิร์ฟเวอร์แชทในทุกโมเดลจะต้องจัดการกับสองสถานการณ์:
- ยอมรับการเชื่อมต่อใหม่จากลูกค้า
- รับข้อความจากลูกค้าและส่งไปยังไคลเอ็นต์อื่นๆ ทั้งหมด
เซิร์ฟเวอร์แชทแบบหลายกระบวนการ
เพื่อจัดการกับสถานการณ์จำลองทั้งสองนี้ด้วยเซิร์ฟเวอร์การแชทแบบหลายกระบวนการ เราจะสร้างกระบวนการต่อการเชื่อมต่อไคลเอนต์ กระบวนการนี้จะจัดการข้อความทั้งหมดที่ส่งและรับสำหรับลูกค้ารายนั้น เราสามารถสร้างกระบวนการเหล่านี้ได้โดยการฟอร์กกระบวนการเซิร์ฟเวอร์ดั้งเดิม
กระบวนการฟอร์ก
เมื่อคุณเรียกใช้เมธอด fork จะสร้างสำเนาของกระบวนการปัจจุบันด้วยสถานะเดียวกันกับกระบวนการนั้น
กระบวนการที่แยกกันมีรหัสกระบวนการของตัวเอง และจะมองเห็นแยกต่างหากในเครื่องมือเช่น top
หรือการตรวจสอบกิจกรรม หน้าตาประมาณนี้:
กระบวนการที่คุณเริ่มต้นเรียกว่ากระบวนการหลัก และกระบวนการที่แยกออกจากกระบวนการหลักจะเรียกว่ากระบวนการของผู้ปฏิบัติงาน
เนื่องจากกระบวนการของผู้ปฏิบัติงานที่เพิ่งแยกใหม่เหล่านี้เป็นกระบวนการที่แยกจากกันอย่างแท้จริง เราจึงไม่สามารถแบ่งปันหน่วยความจำระหว่างกระบวนการเหล่านี้กับกระบวนการหลักได้ เราต้องการบางสิ่งบางอย่างเพื่อสื่อสารระหว่างกัน
ท่อยูนิกซ์
ในการสื่อสารระหว่างกระบวนการ เราจะใช้ท่อยูนิกซ์ ไพพ์ Unix ตั้งค่าสตรีมแบบสองทางของไบต์ระหว่างสองกระบวนการ และคุณสามารถใช้เพื่อส่งข้อมูลจากกระบวนการหนึ่งไปยังอีกกระบวนการหนึ่งได้ โชคดีที่ Ruby มีผ้าห่อรอบๆ ท่อเหล่านี้อย่างดี ดังนั้นเราจึงไม่จำเป็นต้องประดิษฐ์วงล้อขึ้นใหม่
ในตัวอย่างต่อไปนี้ เราตั้งค่าไพพ์ใน Ruby –ด้วยการอ่านและการเขียนจบ– และเรา fork
กระบวนการหลัก รหัสภายในบล็อกที่ส่งผ่านไปยัง fork
กำลังทำงานอยู่ในกระบวนการทางแยก กระบวนการเดิมยังคงดำเนินต่อไปหลังจากบล็อกนี้ จากนั้นเราจะเขียนข้อความไปยังกระบวนการเดิมจากขั้นตอนที่แยกจากกัน
reader, writer = IO.pipe
fork do
# This is running in the forked process.
writer.puts 'Hello from the forked process'
end
# This is running in the original process, it will puts the
# message from the forked process.
puts reader.gets
การใช้ไพพ์ทำให้เราสามารถสื่อสารระหว่างกระบวนการที่แยกจากกัน แม้ว่ากระบวนการจะถูกแยกออกจากกันโดยสิ้นเชิง
การใช้งานเซิร์ฟเวอร์แชท
ขั้นแรก เราตั้งค่าอาร์เรย์เพื่อติดตามไพพ์สำหรับลูกค้าทั้งหมดและ "ผู้เขียน" ของพวกเขา (จุดสิ้นสุดการเขียนของไพพ์) เพื่อให้เราสามารถสื่อสารกับลูกค้าได้ จากนั้นเราจะตรวจสอบให้แน่ใจว่าข้อความขาเข้าทั้งหมดจากลูกค้าจะถูกส่งไปยังไคลเอ็นต์อื่นๆ ทั้งหมด
client_writers = []
master_reader, master_writer = IO.pipe
write_incoming_messages_to_child_processes(master_reader, client_writers)
คุณสามารถค้นหาการใช้งาน write_incoming_messages_to_child_processes
บน GitHub หากคุณต้องการดูรายละเอียดว่ามันทำงานอย่างไร
ยอมรับการเชื่อมต่อใหม่
เราจะต้องยอมรับการเชื่อมต่อที่เข้ามาและติดตั้งท่อ ผู้เขียนใหม่จะถูกผลักไปที่ client_writers
อาร์เรย์ กระบวนการหลักจะสามารถวนซ้ำผ่านอาร์เรย์และส่งข้อความไปยังแต่ละกระบวนการของผู้ปฏิบัติงานโดยการเขียนไปยังไพพ์
จากนั้นเราจะแยกกระบวนการหลัก และรหัสภายในกระบวนการของผู้ปฏิบัติงานที่แยกออกจะจัดการการเชื่อมต่อไคลเอ็นต์
loop do
while socket = server.accept
# Create a client reader and writer so that the master
# process can write messages back to us.
client_reader, client_writer = IO.pipe
# Put the client writer on the list of writers so the
# master process can write to them.
client_writers.push(client_writer)
# Fork child process, everything in the fork block
# only runs in the child process.
fork do
# Handle connection
end
end
end
การจัดการการเชื่อมต่อไคลเอ็นต์
เราต้องจัดการกับการเชื่อมต่อไคลเอ็นต์ด้วย
กระบวนการแยกกันเริ่มต้นด้วยการรับชื่อเล่นจากไคลเอนต์ (ไคลเอนต์ส่งชื่อเล่นโดยค่าเริ่มต้น) หลังจากนั้นจะเริ่มเธรดใน write_incoming_messages_to_client
ที่รับฟังข้อความจากกระบวนการหลัก
สุดท้าย กระบวนการที่แยกจากกันจะเริ่มต้นการวนซ้ำที่รับฟังข้อความที่เข้ามาและส่งไปยังกระบวนการหลัก กระบวนการหลักทำให้แน่ใจว่ากระบวนการของผู้ปฏิบัติงานได้รับข้อความ
nickname = read_line_from(socket)
puts "#{Process.pid}: Accepted connection from #{nickname}"
write_incoming_messages_to_client(nickname, client_reader, socket)
# Read incoming messages from the client.
while incoming = read_line_from(socket)
master_writer.puts "#{nickname}: #{incoming}"
end
puts "#{Process.pid}: Disconnected #{nickname}"
ระบบแชทที่ใช้งานได้
ตอนนี้ระบบแชททั้งหมดใช้งานได้แล้ว! แต่อย่างที่คุณเห็น การเขียนโปรแกรมที่ใช้การประมวลผลหลายตัวนั้นค่อนข้างซับซ้อนและใช้ทรัพยากรจำนวนมาก ข้อดีคือมันแข็งแกร่งมาก หากกระบวนการย่อยตัวใดตัวหนึ่งขัดข้อง ส่วนที่เหลือของระบบก็จะยังทำงานต่อไป คุณสามารถลองใช้โค้ดตัวอย่างและเรียกใช้ kill -9 <process-id>
ในกระบวนการใดกระบวนการหนึ่ง (คุณสามารถค้นหารหัสกระบวนการได้ในเอาต์พุตบันทึกของเซิร์ฟเวอร์)
ในบทความถัดไป เราจะใช้ระบบแชทเดียวกันโดยใช้เธรดเท่านั้น ดังนั้นเราจึงสามารถเรียกใช้เซิร์ฟเวอร์ที่มีคุณสมบัติเดียวกันโดยใช้กระบวนการเดียวและใช้หน่วยความจำน้อยลง