Computer >> คอมพิวเตอร์ >  >> การเขียนโปรแกรม >> Ruby

Concurrency Deep Dive:ลูปเหตุการณ์

ยินดีต้อนรับสู่บทความ Ruby Magic ล่าสุดในชุดของเราเกี่ยวกับการทำงานพร้อมกัน ในรุ่นก่อนหน้านี้ เราได้ติดตั้งเซิร์ฟเวอร์แชทโดยใช้หลายกระบวนการและหลายเธรด คราวนี้เราจะทำสิ่งเดียวกันโดยใช้การวนซ้ำของเหตุการณ์

สรุป

เราจะใช้ไคลเอนต์เดียวกันและการตั้งค่าเซิร์ฟเวอร์เดียวกันกับที่เราใช้ในบทความก่อนหน้านี้ เป้าหมายของเราคือการสร้างระบบแชทที่มีลักษณะดังนี้:

โปรดดูบทความก่อนหน้าสำหรับรายละเอียดเพิ่มเติมเกี่ยวกับการตั้งค่าพื้นฐาน ซอร์สโค้ดแบบเต็มที่ใช้ในตัวอย่างในบทความนี้มีอยู่ใน GitHub ดังนั้นคุณจึงทดลองใช้งานได้ด้วยตนเอง

เซิร์ฟเวอร์แชทโดยใช้การวนรอบเหตุการณ์

การใช้วนรอบเหตุการณ์สำหรับเซิร์ฟเวอร์แชทของเรา คุณจะต้องมีโมเดลทางจิตที่แตกต่างจากการใช้เธรดหรือกระบวนการ ในแนวทางแบบคลาสสิก เธรดหรือกระบวนการมีหน้าที่ในการจัดการการเชื่อมต่อเดียว การใช้การวนซ้ำของเหตุการณ์ คุณจะมีเธรดเดียวในกระบวนการเดียวที่จัดการการเชื่อมต่อหลายรายการ มาดูกันว่ามันทำงานอย่างไรโดยแยกย่อย

วนรอบกิจกรรม

วนรอบเหตุการณ์ที่ใช้โดย EventMachine หรือ NodeJS ตัวอย่างเช่นทำงานดังนี้ เราเริ่มต้นด้วยการแจ้งระบบปฏิบัติการที่เราสนใจในบางเหตุการณ์ ตัวอย่างเช่น เมื่อเปิดการเชื่อมต่อกับซ็อกเก็ต เราทำสิ่งนี้โดยเรียกใช้ฟังก์ชันที่ลงทะเบียนความสนใจในวัตถุ IO บางอย่าง เช่น การเชื่อมต่อหรือซ็อกเก็ต

เมื่อมีบางอย่างเกิดขึ้นกับวัตถุ IO นี้ ระบบปฏิบัติการจะส่งเหตุการณ์ไปยังโปรแกรมของเรา เราวางเหตุการณ์เหล่านี้ไว้ในคิว การวนซ้ำของเหตุการณ์ทำให้เหตุการณ์หลุดออกจากรายการและจัดการทีละรายการ

ในแง่หนึ่งการวนซ้ำของเหตุการณ์ไม่ได้เกิดขึ้นพร้อมกันอย่างแท้จริง มันทำงานตามลำดับในชุดที่เล็กมากเพื่อจำลองเอฟเฟกต์

ในการลงทะเบียนความสนใจและให้ระบบปฏิบัติการส่งเหตุการณ์ IO มาให้เรา เราต้องเขียนส่วนขยาย C เนื่องจากไม่มี API สำหรับสิ่งนั้นในไลบรารีมาตรฐาน Ruby เจาะลึกลงไปว่าอยู่นอกขอบเขตของบทความนี้ ดังนั้นเราจะใช้ IO.select แทนที่จะสร้างเหตุการณ์ IO.select รับอาร์เรย์ของ IO วัตถุที่จะตรวจสอบ รอจนกว่าวัตถุอย่างน้อยหนึ่งรายการจากอาร์เรย์จะพร้อมสำหรับการอ่านหรือเขียน และจะส่งกลับอาร์เรย์ที่มีเพียง IO เหล่านั้น วัตถุ

รหัสที่ดูแลทุกอย่างที่เกี่ยวข้องกับการเชื่อมต่อถูกนำมาใช้เป็น Fiber :เราจะเรียกรหัสนี้ว่า "ตัวจัดการ" ต่อจากนี้ไป A Fiber เป็นบล็อกรหัสที่สามารถหยุดชั่วคราวและกลับมาทำงานต่อได้ Ruby VM ไม่ได้ทำสิ่งนี้โดยอัตโนมัติ ดังนั้นเราจึงต้องดำเนินการต่อและยอมจำนนด้วยตนเอง เราจะใช้อินพุตจาก IO.select เพื่อแจ้งให้ผู้ดูแลของเราทราบเมื่อคนรู้จักพร้อมสำหรับการอ่านหรือเขียน

เช่นเดียวกับในตัวอย่างเธรดและหลายกระบวนการจากโพสต์ก่อนหน้านี้ เราต้องการพื้นที่เก็บข้อมูลบางส่วนเพื่อติดตามลูกค้าและข้อความที่ส่ง เราไม่ต้องการ Mutex เวลานี้. วนรอบเหตุการณ์ของเราทำงานในเธรดเดียว ดังนั้นจึงไม่มีความเสี่ยงที่อ็อบเจ็กต์จะถูกกลายพันธุ์พร้อมกันโดยเธรดที่ต่างกัน

client_handlers = {}
messages = []

ตัวจัดการไคลเอ็นต์ถูกนำมาใช้ใน Fiber . ต่อไปนี้ . เมื่อซ็อกเก็ตสามารถอ่านหรือเขียนถึง เหตุการณ์จะถูกทริกเกอร์ซึ่ง Fiber ตอบสนอง เมื่อสถานะเป็น :readable มันอ่านบรรทัดจากซ็อกเก็ตและกดไปที่ messages อาร์เรย์ เมื่อสถานะเป็น :writable มันเขียนข้อความใด ๆ ที่ได้รับจากลูกค้ารายอื่นตั้งแต่การเขียนถึงลูกค้าครั้งล่าสุด หลังจากจัดการเหตุการณ์แล้วจะเรียก Fiber.yield ดังนั้นจะหยุดชั่วคราวและรอกิจกรรมถัดไป

def create_client_handler(nickname, socket)
  Fiber.new do
    last_write = Time.now
    loop do
      state = Fiber.yield
 
      if state == :readable
        # Read a message from the socket
        incoming = read_line_from(socket)
        # All good, add it to the list to write
        $messages.push(
          :time => Time.now,
          :nickname => nickname,
          :text => incoming
        )
      elsif state == :writable
        # Write messages to the socket
        get_messages_to_send(last_write, nickname, $messages).each do |message|
          socket.puts "#{message[:nickname]}: #{message[:text]}"
        end
        last_write = Time.now
      end
    end
  end
end

แล้วเราจะเรียก Fiber . ได้อย่างไร เพื่ออ่านหรือเขียนในเวลาที่เหมาะสมเมื่อ Socket พร้อมหรือยัง? เราใช้การวนซ้ำเหตุการณ์ที่มีสี่ขั้นตอน:

loop do
  # Step 1: Accept incoming connections
  accept_incoming_connections
 
  # Step 2: Get connections that are ready for reading or writing
  get_ready_connections
 
  # Step 3: Read from readable connections
  read_from_readable_connections
 
  # Step 4: Write to writable connections
  write_to_writable_connections
end

สังเกตว่าไม่มีเวทย์มนตร์ที่นี่ นี่คือรูบี้ลูปปกติ

ขั้นตอนที่ 1:ยอมรับการเชื่อมต่อที่เข้ามา

ดูว่าเรามีการเชื่อมต่อเข้ามาใหม่หรือไม่ เราใช้ accept_nonblock ซึ่งจะไม่รอให้ไคลเอ็นต์เชื่อมต่อ มันจะทำให้เกิดข้อผิดพลาดแทนหากไม่มีไคลเอนต์ใหม่ และหากเกิดข้อผิดพลาดนั้น เราจะจับมันและไปที่ขั้นตอนถัดไป หากมีไคลเอนต์ใหม่ เราสร้างตัวจัดการสำหรับมันและวางไว้ใน clients เก็บ. เราจะใช้วัตถุซ็อกเก็ตเป็นคีย์ของ Hash เพื่อให้เราสามารถหาตัวจัดการไคลเอ็นต์ได้ในภายหลัง

begin
  socket = server.accept_nonblock
  nickname = socket.gets.chomp
  $client_handlers[socket] = create_client_handler(nickname, socket)
  puts "Accepted connection from #{nickname}"
rescue IO::WaitReadable, Errno::EINTR
  # No new incoming connections at the moment
end

ขั้นตอนที่ 2:รับการเชื่อมต่อที่พร้อมสำหรับการอ่านหรือเขียน

ต่อไป เราขอให้ระบบปฏิบัติการแจ้งให้เราทราบเมื่อการเชื่อมต่อพร้อม เราส่งกุญแจของ client_handlers ร้านค้าสำหรับการอ่าน การเขียน และการจัดการข้อผิดพลาด คีย์เหล่านี้เป็นวัตถุซ็อกเก็ตที่เรายอมรับในขั้นตอนที่ 1 เรารอ 10 มิลลิวินาทีเพื่อให้สิ่งนี้เกิดขึ้น

readable, writable = IO.select(
  $client_handlers.keys,
  $client_handlers.keys,
  $client_handlers.keys,
  0.01
)

ขั้นตอนที่ 3:อ่านจากการเชื่อมต่อที่อ่านได้

หากมีการเชื่อมต่อใด ๆ ของเราสามารถอ่านได้ เราจะเรียกตัวจัดการไคลเอ็นต์และดำเนินการต่อไปด้วย readable สถานะ. เราสามารถค้นหาตัวจัดการไคลเอ็นต์เหล่านี้ได้เนื่องจาก Socket วัตถุที่ส่งคืนโดย IO.select ใช้เป็นกุญแจของตัวจัดการสโตร์

if readable
  readable.each do |ready_socket|
    # Get the client from storage
    client = $client_handlers[ready_socket]
 
    client.resume(:readable)
  end
end

ขั้นตอนที่ 4:เขียนถึงการเชื่อมต่อที่เขียนได้

หากการเชื่อมต่อใด ๆ ของเราสามารถเขียนได้ เราจะเรียกตัวจัดการไคลเอ็นต์และดำเนินการต่อด้วย writable รัฐ.

if writable
  writable.each do |ready_socket|
    # Get the client from storage
    client = $client_handlers[ready_socket]
    next unless client
 
    client.resume(:writable)
  end
end

โดยใช้สี่ขั้นตอนเหล่านี้ในลูปซึ่งสร้างตัวจัดการและเรียก readable และ writable บนตัวจัดการเหล่านี้ในเวลาที่เหมาะสม เราได้สร้างเซิร์ฟเวอร์แชทที่มีการจัดกิจกรรมที่ทำงานได้อย่างสมบูรณ์ มีค่าใช้จ่ายต่อการเชื่อมต่อน้อยมาก และเราสามารถขยายได้ถึงไคลเอนต์พร้อมกันจำนวนมาก

วิธีนี้ใช้ได้ผลดีตราบใดที่เรารักษาปริมาณงานต่อติ๊กของลูปให้น้อย นี่เป็นสิ่งสำคัญอย่างยิ่งสำหรับงานที่เกี่ยวข้องกับการคำนวณ เนื่องจากการวนรอบเหตุการณ์ทำงานในเธรดเดียว ดังนั้นจึงสามารถใช้ CPU เดียวได้ ในระบบที่ใช้งานจริงมักมีหลายกระบวนการที่รันการวนรอบเหตุการณ์เพื่อแก้ไขข้อจำกัดนี้

สรุป

หลังจากทั้งหมดนี้ คุณอาจถามว่าฉันควรใช้วิธีใดในสามวิธีนี้

  • สำหรับแอปส่วนใหญ่ การทำเธรดก็สมเหตุสมผลดี เป็นแนวทางที่ง่ายที่สุดในการทำงานด้วย
  • หากคุณเรียกใช้แอปที่มีการทำงานพร้อมกันสูงกับสตรีมที่ใช้เวลานาน การวนซ้ำของเหตุการณ์จะช่วยให้คุณปรับขนาดได้
  • หากคุณคาดหวังว่ากระบวนการของคุณจะพัง ให้เลือกหลายกระบวนการแบบเก่าที่ดี เนื่องจากเป็นแนวทางที่แข็งแกร่งที่สุด

นี้สรุปชุดของเราเกี่ยวกับการทำงานพร้อมกัน หากคุณต้องการสรุปทั้งหมด ให้ตรวจสอบบทความเกี่ยวกับการทำงานพร้อมกันของมาสเตอร์ต้นฉบับ รวมถึงบทความโดยละเอียดเกี่ยวกับการใช้กระบวนการหลายขั้นตอนและหลายเธรด