ยินดีต้อนรับสู่บทความ Ruby Magic ล่าสุดในชุดของเราเกี่ยวกับการทำงานพร้อมกัน ในรุ่นก่อนหน้านี้ เราได้ติดตั้งเซิร์ฟเวอร์แชทโดยใช้หลายกระบวนการและหลายเธรด คราวนี้เราจะทำสิ่งเดียวกันโดยใช้การวนซ้ำของเหตุการณ์
สรุป
เราจะใช้ไคลเอนต์เดียวกันและการตั้งค่าเซิร์ฟเวอร์เดียวกันกับที่เราใช้ในบทความก่อนหน้านี้ เป้าหมายของเราคือการสร้างระบบแชทที่มีลักษณะดังนี้:
โปรดดูบทความก่อนหน้าสำหรับรายละเอียดเพิ่มเติมเกี่ยวกับการตั้งค่าพื้นฐาน ซอร์สโค้ดแบบเต็มที่ใช้ในตัวอย่างในบทความนี้มีอยู่ใน GitHub ดังนั้นคุณจึงทดลองใช้งานได้ด้วยตนเอง
เซิร์ฟเวอร์แชทโดยใช้การวนรอบเหตุการณ์
การใช้วนรอบเหตุการณ์สำหรับเซิร์ฟเวอร์แชทของเรา คุณจะต้องมีโมเดลทางจิตที่แตกต่างจากการใช้เธรดหรือกระบวนการ ในแนวทางแบบคลาสสิก เธรดหรือกระบวนการมีหน้าที่ในการจัดการการเชื่อมต่อเดียว การใช้การวนซ้ำของเหตุการณ์ คุณจะมีเธรดเดียวในกระบวนการเดียวที่จัดการการเชื่อมต่อหลายรายการ มาดูกันว่ามันทำงานอย่างไรโดยแยกย่อย
วนรอบกิจกรรม
วนรอบเหตุการณ์ที่ใช้โดย EventMachine หรือ NodeJS ตัวอย่างเช่นทำงานดังนี้ เราเริ่มต้นด้วยการแจ้งระบบปฏิบัติการที่เราสนใจในบางเหตุการณ์ ตัวอย่างเช่น เมื่อเปิดการเชื่อมต่อกับซ็อกเก็ต เราทำสิ่งนี้โดยเรียกใช้ฟังก์ชันที่ลงทะเบียนความสนใจในวัตถุ IO บางอย่าง เช่น การเชื่อมต่อหรือซ็อกเก็ต
เมื่อมีบางอย่างเกิดขึ้นกับวัตถุ IO นี้ ระบบปฏิบัติการจะส่งเหตุการณ์ไปยังโปรแกรมของเรา เราวางเหตุการณ์เหล่านี้ไว้ในคิว การวนซ้ำของเหตุการณ์ทำให้เหตุการณ์หลุดออกจากรายการและจัดการทีละรายการ
ในแง่หนึ่งการวนซ้ำของเหตุการณ์ไม่ได้เกิดขึ้นพร้อมกันอย่างแท้จริง มันทำงานตามลำดับในชุดที่เล็กมากเพื่อจำลองเอฟเฟกต์
ในการลงทะเบียนความสนใจและให้ระบบปฏิบัติการส่งเหตุการณ์ IO มาให้เรา เราต้องเขียนส่วนขยาย C เนื่องจากไม่มี API สำหรับสิ่งนั้นในไลบรารีมาตรฐาน Ruby เจาะลึกลงไปว่าอยู่นอกขอบเขตของบทความนี้ ดังนั้นเราจะใช้ IO.select
แทนที่จะสร้างเหตุการณ์ IO.select
รับอาร์เรย์ของ IO
วัตถุที่จะตรวจสอบ รอจนกว่าวัตถุอย่างน้อยหนึ่งรายการจากอาร์เรย์จะพร้อมสำหรับการอ่านหรือเขียน และจะส่งกลับอาร์เรย์ที่มีเพียง IO
เหล่านั้น วัตถุ
รหัสที่ดูแลทุกอย่างที่เกี่ยวข้องกับการเชื่อมต่อถูกนำมาใช้เป็น Fiber
:เราจะเรียกรหัสนี้ว่า "ตัวจัดการ" ต่อจากนี้ไป A Fiber
เป็นบล็อกรหัสที่สามารถหยุดชั่วคราวและกลับมาทำงานต่อได้ Ruby VM ไม่ได้ทำสิ่งนี้โดยอัตโนมัติ ดังนั้นเราจึงต้องดำเนินการต่อและยอมจำนนด้วยตนเอง เราจะใช้อินพุตจาก IO.select
เพื่อแจ้งให้ผู้ดูแลของเราทราบเมื่อคนรู้จักพร้อมสำหรับการอ่านหรือเขียน
เช่นเดียวกับในตัวอย่างเธรดและหลายกระบวนการจากโพสต์ก่อนหน้านี้ เราต้องการพื้นที่เก็บข้อมูลบางส่วนเพื่อติดตามลูกค้าและข้อความที่ส่ง เราไม่ต้องการ Mutex
เวลานี้. วนรอบเหตุการณ์ของเราทำงานในเธรดเดียว ดังนั้นจึงไม่มีความเสี่ยงที่อ็อบเจ็กต์จะถูกกลายพันธุ์พร้อมกันโดยเธรดที่ต่างกัน
client_handlers = {}
messages = []
ตัวจัดการไคลเอ็นต์ถูกนำมาใช้ใน Fiber
. ต่อไปนี้ . เมื่อซ็อกเก็ตสามารถอ่านหรือเขียนถึง เหตุการณ์จะถูกทริกเกอร์ซึ่ง Fiber
ตอบสนอง เมื่อสถานะเป็น :readable
มันอ่านบรรทัดจากซ็อกเก็ตและกดไปที่ messages
อาร์เรย์ เมื่อสถานะเป็น :writable
มันเขียนข้อความใด ๆ ที่ได้รับจากลูกค้ารายอื่นตั้งแต่การเขียนถึงลูกค้าครั้งล่าสุด หลังจากจัดการเหตุการณ์แล้วจะเรียก Fiber.yield
ดังนั้นจะหยุดชั่วคราวและรอกิจกรรมถัดไป
def create_client_handler(nickname, socket)
Fiber.new do
last_write = Time.now
loop do
state = Fiber.yield
if state == :readable
# Read a message from the socket
incoming = read_line_from(socket)
# All good, add it to the list to write
$messages.push(
:time => Time.now,
:nickname => nickname,
:text => incoming
)
elsif state == :writable
# Write messages to the socket
get_messages_to_send(last_write, nickname, $messages).each do |message|
socket.puts "#{message[:nickname]}: #{message[:text]}"
end
last_write = Time.now
end
end
end
end
แล้วเราจะเรียก Fiber
. ได้อย่างไร เพื่ออ่านหรือเขียนในเวลาที่เหมาะสมเมื่อ Socket
พร้อมหรือยัง? เราใช้การวนซ้ำเหตุการณ์ที่มีสี่ขั้นตอน:
loop do
# Step 1: Accept incoming connections
accept_incoming_connections
# Step 2: Get connections that are ready for reading or writing
get_ready_connections
# Step 3: Read from readable connections
read_from_readable_connections
# Step 4: Write to writable connections
write_to_writable_connections
end
สังเกตว่าไม่มีเวทย์มนตร์ที่นี่ นี่คือรูบี้ลูปปกติ
ขั้นตอนที่ 1:ยอมรับการเชื่อมต่อที่เข้ามา
ดูว่าเรามีการเชื่อมต่อเข้ามาใหม่หรือไม่ เราใช้ accept_nonblock
ซึ่งจะไม่รอให้ไคลเอ็นต์เชื่อมต่อ มันจะทำให้เกิดข้อผิดพลาดแทนหากไม่มีไคลเอนต์ใหม่ และหากเกิดข้อผิดพลาดนั้น เราจะจับมันและไปที่ขั้นตอนถัดไป หากมีไคลเอนต์ใหม่ เราสร้างตัวจัดการสำหรับมันและวางไว้ใน clients
เก็บ. เราจะใช้วัตถุซ็อกเก็ตเป็นคีย์ของ Hash
เพื่อให้เราสามารถหาตัวจัดการไคลเอ็นต์ได้ในภายหลัง
begin
socket = server.accept_nonblock
nickname = socket.gets.chomp
$client_handlers[socket] = create_client_handler(nickname, socket)
puts "Accepted connection from #{nickname}"
rescue IO::WaitReadable, Errno::EINTR
# No new incoming connections at the moment
end
ขั้นตอนที่ 2:รับการเชื่อมต่อที่พร้อมสำหรับการอ่านหรือเขียน
ต่อไป เราขอให้ระบบปฏิบัติการแจ้งให้เราทราบเมื่อการเชื่อมต่อพร้อม เราส่งกุญแจของ client_handlers
ร้านค้าสำหรับการอ่าน การเขียน และการจัดการข้อผิดพลาด คีย์เหล่านี้เป็นวัตถุซ็อกเก็ตที่เรายอมรับในขั้นตอนที่ 1 เรารอ 10 มิลลิวินาทีเพื่อให้สิ่งนี้เกิดขึ้น
readable, writable = IO.select(
$client_handlers.keys,
$client_handlers.keys,
$client_handlers.keys,
0.01
)
ขั้นตอนที่ 3:อ่านจากการเชื่อมต่อที่อ่านได้
หากมีการเชื่อมต่อใด ๆ ของเราสามารถอ่านได้ เราจะเรียกตัวจัดการไคลเอ็นต์และดำเนินการต่อไปด้วย readable
สถานะ. เราสามารถค้นหาตัวจัดการไคลเอ็นต์เหล่านี้ได้เนื่องจาก Socket
วัตถุที่ส่งคืนโดย IO.select
ใช้เป็นกุญแจของตัวจัดการสโตร์
if readable
readable.each do |ready_socket|
# Get the client from storage
client = $client_handlers[ready_socket]
client.resume(:readable)
end
end
ขั้นตอนที่ 4:เขียนถึงการเชื่อมต่อที่เขียนได้
หากการเชื่อมต่อใด ๆ ของเราสามารถเขียนได้ เราจะเรียกตัวจัดการไคลเอ็นต์และดำเนินการต่อด้วย writable
รัฐ.
if writable
writable.each do |ready_socket|
# Get the client from storage
client = $client_handlers[ready_socket]
next unless client
client.resume(:writable)
end
end
โดยใช้สี่ขั้นตอนเหล่านี้ในลูปซึ่งสร้างตัวจัดการและเรียก readable
และ writable
บนตัวจัดการเหล่านี้ในเวลาที่เหมาะสม เราได้สร้างเซิร์ฟเวอร์แชทที่มีการจัดกิจกรรมที่ทำงานได้อย่างสมบูรณ์ มีค่าใช้จ่ายต่อการเชื่อมต่อน้อยมาก และเราสามารถขยายได้ถึงไคลเอนต์พร้อมกันจำนวนมาก
วิธีนี้ใช้ได้ผลดีตราบใดที่เรารักษาปริมาณงานต่อติ๊กของลูปให้น้อย นี่เป็นสิ่งสำคัญอย่างยิ่งสำหรับงานที่เกี่ยวข้องกับการคำนวณ เนื่องจากการวนรอบเหตุการณ์ทำงานในเธรดเดียว ดังนั้นจึงสามารถใช้ CPU เดียวได้ ในระบบที่ใช้งานจริงมักมีหลายกระบวนการที่รันการวนรอบเหตุการณ์เพื่อแก้ไขข้อจำกัดนี้
สรุป
หลังจากทั้งหมดนี้ คุณอาจถามว่าฉันควรใช้วิธีใดในสามวิธีนี้
- สำหรับแอปส่วนใหญ่ การทำเธรดก็สมเหตุสมผลดี เป็นแนวทางที่ง่ายที่สุดในการทำงานด้วย
- หากคุณเรียกใช้แอปที่มีการทำงานพร้อมกันสูงกับสตรีมที่ใช้เวลานาน การวนซ้ำของเหตุการณ์จะช่วยให้คุณปรับขนาดได้
- หากคุณคาดหวังว่ากระบวนการของคุณจะพัง ให้เลือกหลายกระบวนการแบบเก่าที่ดี เนื่องจากเป็นแนวทางที่แข็งแกร่งที่สุด
นี้สรุปชุดของเราเกี่ยวกับการทำงานพร้อมกัน หากคุณต้องการสรุปทั้งหมด ให้ตรวจสอบบทความเกี่ยวกับการทำงานพร้อมกันของมาสเตอร์ต้นฉบับ รวมถึงบทความโดยละเอียดเกี่ยวกับการใช้กระบวนการหลายขั้นตอนและหลายเธรด