บริษัทต่างๆ ต้องการตอบสนองต่อความต้องการในการประมวลผลและแบ่งปันข้อมูลจำนวนมากในแบบเรียลไทม์อย่างรวดเร็ว เพื่อรับข้อมูลเชิงลึกและสร้างประสบการณ์ที่ลูกค้ามีส่วนร่วมมากขึ้น ดังนั้นการประมวลผลข้อมูลแบบเดิมจึงไม่สามารถทำได้ในโลกปัจจุบันอีกต่อไป
เพื่อให้บรรลุผลดังกล่าว คุณต้องประมวลผลข้อมูลจำนวนมากให้เร็วที่สุด แล้วจึงส่งไปยังบริการอื่นๆ เพื่อการประมวลผลเพิ่มเติม แต่ในระหว่างการดำเนินการอย่างรวดเร็วเหล่านี้ จำเป็นต้องแจ้งให้ผู้บริโภคทราบเมื่อมีเหตุการณ์เกิดขึ้น และเราสามารถทำได้โดยใช้การสตรีมกิจกรรม
นี่คือ repo ใน GitHub ที่เราจะใช้
เหตุการณ์
ก่อนพูดถึงการสตรีมกิจกรรม เรามาคุยกันก่อนว่ากิจกรรมคืออะไร เหตุการณ์ที่เกิดขึ้นภายในแอปพลิเคชันอาจเกี่ยวข้องกับกระบวนการของผู้ใช้หรือเพียงแค่การกระทำที่ส่งผลต่อธุรกิจ
เหตุการณ์แสดงถึงการเปลี่ยนแปลงสถานะ ไม่ใช่คำถามเกี่ยวกับวิธีการแก้ไขแอปพลิเคชัน พิจารณาสิ่งเหล่านี้เป็นตัวอย่าง:
- ผู้ใช้ที่เข้าสู่ระบบบริการ
- ธุรกรรมการชำระเงิน
- นักเขียนที่เผยแพร่โพสต์ในบล็อก
ในกรณีส่วนใหญ่ เหตุการณ์จะทำให้เกิดเหตุการณ์มากขึ้น ตัวอย่างเช่น เมื่อผู้ใช้สมัครใช้บริการ แอปจะส่งการแจ้งเตือนไปยังอุปกรณ์ แทรกบันทึกลงในฐานข้อมูล และส่งอีเมลต้อนรับ
การสตรีมกิจกรรม
การสตรีมกิจกรรม เป็นรูปแบบการเก็บข้อมูลแบบเรียลไทม์จากแหล่งที่มาของเหตุการณ์ เช่น ฐานข้อมูล ส่วนหลักของการสตรีมกิจกรรมมีดังนี้:
- นายหน้า :ระบบที่รับผิดชอบการจัดเก็บเหตุการณ์
- หัวข้อ :หมวดหมู่ของเหตุการณ์
- โปรดิวเซอร์ :ส่งกิจกรรมไปยังนายหน้าในหัวข้อเฉพาะ
- ผู้บริโภค :อ่านเหตุการณ์
- เหตุการณ์ :ข้อมูลที่ผู้ผลิตต้องการสื่อสารกับผู้บริโภค
หลีกเลี่ยงไม่ได้ที่จะพูดถึง เผยแพร่และสมัครรูปแบบสถาปัตยกรรม (รูปแบบ pub/sub) ณ จุดนี้; การสตรีมเหตุการณ์เป็นการนำรูปแบบดังกล่าวไปใช้ แต่มีการเปลี่ยนแปลงเหล่านี้:
- เหตุการณ์เกิดขึ้นแทนข้อความ
- กิจกรรมจะเรียงลำดับ โดยปกติตามเวลา
- ผู้บริโภคสามารถอ่านเหตุการณ์จากจุดเฉพาะในหัวข้อ
- เหตุการณ์มีความคงทนชั่วขณะ
การไหลเริ่มต้นเมื่อ ผู้ผลิต เผยแพร่ กิจกรรมใหม่ เป็น หัวข้อ (ดังที่เราเห็นก่อนหน้านี้ หัวข้อเป็นเพียงการจัดหมวดหมู่สำหรับประเภทเหตุการณ์เฉพาะ) จากนั้น ผู้บริโภค สนใจในกิจกรรมของหมวดหมู่เฉพาะสมัครสมาชิกหัวข้อนั้น สุดท้ายนี้ โบรกเกอร์ ระบุผู้บริโภคของหัวข้อและทำให้กิจกรรมที่ต้องการพร้อมใช้งาน
ข้อดีของการสตรีมเหตุการณ์
-
ดีคัปปลิ้ง ไม่มีการพึ่งพาระหว่างผู้เผยแพร่และผู้บริโภคเพราะพวกเขาไม่จำเป็นต้องรู้จักกัน นอกจากนี้ เหตุการณ์ไม่ได้ระบุการกระทำ ดังนั้นผู้บริโภคจำนวนมากจึงสามารถรับเหตุการณ์เดียวกันและดำเนินการต่างๆ ได้
-
เวลาแฝงต่ำ เหตุการณ์ถูกแยกออกและให้ผู้บริโภคใช้งานได้ทุกเวลา มันสามารถเกิดขึ้นได้ในหน่วยมิลลิวินาที
-
อิสรภาพ อย่างที่เราทราบกันดีว่าผู้เผยแพร่โฆษณาและผู้บริโภคมีความเป็นอิสระ ดังนั้นทีมต่างๆ จึงสามารถทำงานร่วมกับพวกเขาได้โดยใช้เหตุการณ์เดียวกันสำหรับการดำเนินการหรือวัตถุประสงค์อื่น
-
ความทนทานต่อความผิดพลาด แพลตฟอร์มการสตรีมเหตุการณ์บางอย่างช่วยให้คุณจัดการกับความล้มเหลวของผู้บริโภค ตัวอย่างเช่น ผู้บริโภคสามารถบันทึกตำแหน่งของตนและเริ่มต้นจากจุดนั้นอีกครั้งหากมีข้อผิดพลาดเกิดขึ้น
-
การจัดการตามเวลาจริง จะได้รับคำติชมแบบเรียลไทม์ ดังนั้นผู้ใช้จึงไม่ต้องรอเป็นนาทีหรือชั่วโมงเพื่อดูการตอบกลับของกิจกรรม
-
ประสิทธิภาพสูง แพลตฟอร์มกิจกรรมสามารถรองรับข้อความจำนวนมากได้เนื่องจากเวลาในการตอบสนองต่ำ เช่น หลายพันเหตุการณ์ในหนึ่งวินาที
ข้อเสียของการสตรีมเหตุการณ์
-
การตรวจสอบ เครื่องมือสตรีมเหตุการณ์บางอย่างไม่มีเครื่องมือตรวจสอบที่สมบูรณ์ พวกเขาเรียกร้องให้มีการติดตั้งเครื่องมือเพิ่มเติม เช่น Datadog หรือ New Relic
-
การกำหนดค่า การกำหนดค่าในเครื่องมือบางอย่างอาจดูล้นหลามแม้กระทั่งกับคนที่มีประสบการณ์ มีพารามิเตอร์มากมาย และบางครั้ง คุณจำเป็นต้องรู้เชิงลึกเกี่ยวกับตัวแบบเพื่อนำไปใช้
-
ไลบรารีไคลเอนต์ การนำ Kafka ไปใช้ในภาษาอื่นที่ไม่ใช่ Java ไม่ใช่เรื่องง่าย บางครั้ง ไลบรารีของไคลเอ็นต์ไม่ทันสมัย แสดงความไม่เสถียร หรือไม่มีทางเลือกมากมายให้เลือก
เครื่องมือยอดนิยมสำหรับการสตรีมกิจกรรมคือ Apache Kafka . เครื่องมือนี้อนุญาตให้ผู้ใช้ส่ง จัดเก็บ และขอข้อมูลได้ทุกที่ทุกเวลาที่ต้องการ มาคุยกันครับ
อาปาเช่ คาฟคา
"Apache Kafka เป็นแพลตฟอร์มการสตรีมเหตุการณ์แบบโอเพนซอร์สแบบกระจายที่ใช้โดยบริษัทหลายพันแห่งสำหรับไปป์ไลน์ข้อมูลประสิทธิภาพสูง การวิเคราะห์การสตรีม การรวมข้อมูล และแอปพลิเคชันที่สำคัญต่อภารกิจ"
Apache Kafka ได้รับการออกแบบมาโดยเฉพาะสำหรับการส่งบันทึกแบบเรียลไทม์ เหมาะสำหรับแอปพลิเคชันที่ต้องการสิ่งต่อไปนี้:
- การแลกเปลี่ยนข้อมูลที่เชื่อถือได้ระหว่างส่วนประกอบต่างๆ
- ความสามารถในการแบ่งปริมาณงานการส่งข้อความเมื่อข้อกำหนดของแอปพลิเคชันเปลี่ยนไป
- การรับส่งข้อมูลแบบเรียลไทม์สำหรับการประมวลผลข้อมูล
มาใช้ Kafka ในแอปพลิเคชัน Rails กันเถอะ!
การใช้ Kafka กับ Rails
อัญมณีที่มีชื่อเสียงที่สุดที่ใช้ Kafka ใน Ruby เรียกว่า ruby-kafka โดย Zendesk และยอดเยี่ยมมาก! ถึงกระนั้น คุณต้องดำเนินการทั้งหมดด้วยตนเอง ซึ่งเป็นเหตุผลว่าทำไมเราจึงมี "เฟรมเวิร์ก" ที่สร้างขึ้นด้วย ruby-kafka นอกจากนี้ยังช่วยเราในการกำหนดค่าและขั้นตอนการดำเนินการทั้งหมด
Karafka เป็นเฟรมเวิร์กที่ใช้เพื่อทำให้การพัฒนาแอปพลิเคชัน Ruby บน Apache Kafka ง่ายขึ้น
ในการทำงานกับ Kafka จำเป็นต้องติดตั้ง Java เนื่องจาก Kafka เป็นแอปพลิเคชัน Scala และ Java จึงต้องติดตั้ง Zookeeper
ก่อนการติดตั้ง ฉันต้องการอธิบายเล็กน้อยเกี่ยวกับ Zookeeper Zookeeper เป็นบริการแบบรวมศูนย์ที่จำเป็นสำหรับ Kafka; จะส่งการแจ้งเตือนในกรณีที่มีการเปลี่ยนแปลง เช่น การสร้างหัวข้อใหม่ ความผิดพลาดของนายหน้า การลบนายหน้า การลบหัวข้อ และอื่นๆ
งานหลักคือการจัดการโบรกเกอร์ Kafka รักษารายการด้วยเมตาดาต้าที่เกี่ยวข้อง และอำนวยความสะดวกกลไกการตรวจสอบสภาพ นอกจากนี้ยังช่วยในการเลือกโบรกเกอร์ชั้นนำสำหรับพาร์ติชันต่างๆ ของหัวข้อ
ข้อกำหนด
สำหรับ MacOS:
ตอนนี้ มาติดตั้ง Java และ Zookeeper ด้วยคำสั่งต่อไปนี้:
brew install java
brew install zookeeper
จากนั้น เราสามารถดำเนินการติดตั้ง Kafka ต่อไปได้:
brew install kafka
เมื่อเราติดตั้ง Kafka และ Zookeeper แล้ว จำเป็นต้องเริ่มบริการด้วยวิธีนี้:
brew services start zookeeper
brew services start kafka
สำหรับ Windows และ Linux:
คำแนะนำ:
- การติดตั้งจาวา
- ดาวน์โหลด Zookeeper
การติดตั้งราง
เพียงสร้างแอปพลิเคชัน Rails อย่างง่ายตามปกติ:
rails new karafka_example
และเพิ่ม karafka gem ภายใน Gemfile:
gem 'karafka'
จากนั้นเรียกใช้ bundle install
เพื่อติดตั้งอัญมณีที่เพิ่งเพิ่มเข้ามา และอย่าลืมเรียกใช้คำสั่งต่อไปนี้เพื่อรับทุกสิ่งของ Karafka:
bundle exec karafka install
คำสั่งนั้นควรสร้างไฟล์ที่น่าสนใจ:อันแรกคือ karafka.rb
ในไดเรกทอรีราก app/consumers/application_consumer.rb
และ app/responders/application_responder.rb
.
โปรแกรมเริ่มต้น Karafka
karafka.rb
file เหมือนกับโปรแกรม initializer ที่แยกจาก Rails config ช่วยให้คุณสามารถกำหนดค่าแอปพลิเคชัน Karafka และวาดเส้นทางบางเส้นทาง ซึ่งคล้ายกับ API เป็นเส้นทางแอปพลิเคชัน Rails แต่สำหรับหัวข้อและผู้บริโภค
โปรดิวเซอร์
โปรดิวเซอร์ มีหน้าที่สร้างกิจกรรม และเราสามารถเพิ่มลงใน app/responders
โฟลเดอร์ ตอนนี้ มาสร้างผู้ผลิตอย่างง่ายสำหรับผู้ใช้:
# app/responders/users_responder.rb
class UsersResponder < ApplicationResponder
topic :users
def respond(event_payload)
respond_to :users, event_payload
end
end
ผู้บริโภค
ผู้บริโภค มีหน้าที่อ่านเหตุการณ์/ข้อความทั้งหมดที่ส่งจากผู้ผลิต นี่เป็นเพียงผู้บริโภคที่บันทึกข้อความที่ได้รับ
# app/consumers/users_consumer.rb
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params}"
end
end
เราใช้ params
เพื่อรับเหตุการณ์ แต่ถ้าคุณจะอ่านเหตุการณ์เป็นชุดและคุณมี config config.batch_fetching
ตามจริง คุณควรใช้ params_batch
.
กำลังทดสอบ
ในการเรียกใช้บริการ Karafka ของเรา (บริการที่จะได้ยินเหตุการณ์) ให้ไปที่คอนโซล เปิดแท็บใหม่ ไปที่โครงการ Rails และเรียกใช้:
bundle exec karafka server
กิจกรรมที่ประสบความสำเร็จ
ตอนนี้ เปิดแท็บคอนโซลอื่น ไปที่โปรเจ็กต์ Rails แล้วพิมพ์สิ่งนี้:
rails c
ที่นั่น มาสร้างกิจกรรมด้วยการตอบกลับของเรา:
> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })
หากคุณตรวจสอบคอนโซล Rails เราจะได้รับข้อความนี้หลังจากสร้างกิจกรรมแล้ว:
Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}
และในแท็บบริการ Karafka คุณจะเห็นสิ่งนี้:
New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092
แต่ถ้าคุณต้องการแค่เพย์โหลดข้อความ คุณสามารถเพิ่ม params.payload
ในผู้บริโภคของคุณและคุณจะมีสิ่งนี้:
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer
เหตุการณ์ที่ล้มเหลว
คุณสร้างโมเดลผู้ใช้ที่มีแอตทริบิวต์บางอย่างได้ เช่น email
, first_name
และ last_name
รันคำสั่งต่อไปนี้:
rails g model User email first_name last_name
จากนั้น คุณสามารถเรียกใช้การย้ายข้อมูลด้วยสิ่งนี้:
rails db:migrate
ตอนนี้ ให้เพิ่มการตรวจสอบดังนี้:
class User < ApplicationRecord
validates :email, uniqueness: true
end
สุดท้าย เราสามารถเปลี่ยนแปลงผู้บริโภคได้:
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params.payload}"
User.create!(params.payload['user'])
end
end
ดังนั้น มาสร้างสองกิจกรรมด้วยอีเมลเดียวกัน:
UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )
UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )
ด้วยสิ่งนี้ เหตุการณ์แรกจะถูกสร้างขึ้นในฐานข้อมูล:
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.1ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (9.6ms) INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id" [["user_id", "1"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.0ms) COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer
แต่อันที่สองจะล้มเหลวเพราะเรามีการตรวจสอบที่ระบุว่าอีเมลนั้นไม่ซ้ำกัน หากคุณพยายามเพิ่มระเบียนอื่นด้วยอีเมลที่มีอยู่ คุณจะเห็นดังนี้:
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.2ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Exists? (0.3ms) SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2 [["email", "[email protected]"], ["LIMIT", 1]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (0.2ms) ROLLBACK
↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken
คุณสามารถดูข้อผิดพลาดในบรรทัดสุดท้าย ActiveRecord::RecordInvalid: Validation failed: Email has already been taken
. แต่สิ่งที่น่าสนใจของที่นี่คือ Kafka จะพยายามจัดการกิจกรรมครั้งแล้วครั้งเล่า แม้ว่าคุณจะรีสตาร์ทเซิร์ฟเวอร์ Karafka เซิร์ฟเวอร์จะพยายามประมวลผลเหตุการณ์ล่าสุด คาฟคารู้ได้อย่างไรว่าจะเริ่มต้นอย่างไร
หากคุณเห็นคอนโซลของคุณ หลังจากเกิดข้อผิดพลาด คุณจะเห็นสิ่งนี้:
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42
มันจะบอกคุณว่าออฟเซ็ตใดถูกประมวลผล:ในกรณีนี้ มันถูกออฟเซ็ต 42 ดังนั้น หากคุณเริ่มบริการ Karafka ใหม่ บริการจะเริ่มในออฟเซ็ตนั้น
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches
จะยังคงล้มเหลวเนื่องจากเรามีการตรวจสอบอีเมลในรูปแบบผู้ใช้ของเรา ณ จุดนี้ ให้หยุดเซิร์ฟเวอร์ Karafka ลบหรือแสดงความคิดเห็นการตรวจสอบนั้น และเริ่มเซิร์ฟเวอร์ของคุณอีกครั้ง คุณจะเห็นว่ากิจกรรมดำเนินการสำเร็จอย่างไร:
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
TRANSACTION (0.2ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (3.8ms) INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id" [["user_id", "2"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.5ms) COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed
สุดท้าย คุณสามารถเห็นข้อความนี้ในบรรทัดสุดท้าย:Marking users/0:43 as processed
.
โทรกลับ
นี่คือสิ่งที่ Karafka นำเสนอ:คุณสามารถใช้การโทรกลับใน Consumer ของคุณได้ ในการทำเช่นนั้น คุณจะต้องนำเข้าโมดูลและใช้งานเท่านั้น จากนั้นเปิด UserConsumer
และเพิ่มสิ่งนี้:
class UsersConsumer < ApplicationConsumer
include Karafka::Consumers::Callbacks
before_poll do
Karafka.logger.info "*** Checking something new for #{topic.name}"
end
after_poll do
Karafka.logger.info '*** We just checked for new messages!'
end
def consume
Karafka.logger.info "New [User] event: #{params.payload}"
User.create!(params.payload['user'])
end
end
โพลเป็นสื่อกลางที่เราดึงข้อมูลบันทึกตามการชดเชยพาร์ติชั่นปัจจุบัน ดังนั้นการโทรกลับเหล่านั้น before_poll
และ after_poll
เหมือนกับชื่อของพวกเขา ถูกประหารชีวิตในขณะนั้น เราเพิ่งบันทึกข้อความ และคุณสามารถเห็นข้อความเหล่านั้นในเซิร์ฟเวอร์ Karafka ของคุณ อันหนึ่งก่อนดึงข้อมูล และอีกอันหลังจากนั้น:
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!
การเต้นของหัวใจ
การเต้นของหัวใจเป็นเพียงวิธีที่เราในฐานะผู้บริโภคพูดกับคาฟคาว่าเรายังมีชีวิตอยู่ มิฉะนั้น คาฟคาจะถือว่าผู้บริโภคเสียชีวิต
ใน Karafka เรามีการกำหนดค่าเริ่มต้นที่จะทำสิ่งนี้ในช่วงเวลาหนึ่ง มันคือ kafka.heartbeat_interval
และค่าเริ่มต้นคือ 10 วินาที คุณสามารถเห็นการเต้นของหัวใจนี้ในเซิร์ฟเวอร์ Karafka ของคุณ
*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!
ด้วย Sending heartbeat...
, Kafka รู้ว่าเรายังมีชีวิตอยู่และเราเป็นสมาชิกที่ถูกต้องของกลุ่มผู้บริโภค นอกจากนี้เรายังสามารถใช้บันทึกได้มากขึ้น
มุ่งมั่น
การทำเครื่องหมายออฟเซ็ตว่าใช้แล้วเรียกว่าการออฟเซ็ต ใน Kafka เราบันทึกการชดเชยโดยการเขียนไปยังหัวข้อ Kafka ภายในที่เรียกว่าหัวข้อ offsets ระบบจะถือว่าข้อความถูกใช้ไปก็ต่อเมื่อมีการคอมมิตออฟเซ็ตกับหัวข้อออฟเซ็ต
Karafka มีการกำหนดค่าเพื่อดำเนินการนี้โดยอัตโนมัติในแต่ละครั้ง การกำหนดค่าคือ kafka.offset_commit_interval
และค่าของมันคือ 10 วินาทีโดยค่าเริ่มต้น ด้วยสิ่งนี้ Karakfa จะทำการชดเชยทุก ๆ 10 วินาที และคุณสามารถดูข้อความนั้นในเซิร์ฟเวอร์ Karafka ของคุณ:
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!
Committing offsets: users/0:44
บอกเราว่าการชดเชยใดที่กระทำ; ในกรณีของฉัน มันบอก Kafka ว่าสามารถคอมมิตหมายเลขออฟเซ็ต 44 จากหัวข้อ 0 ได้ ด้วยวิธีนี้ หากมีอะไรเกิดขึ้นกับบริการของเรา Karafka สามารถเริ่มต้นอีกครั้งเพื่อประมวลผลกิจกรรมจากออฟเซ็ตนั้น
บทสรุป
การสตรีมกิจกรรมช่วยให้เราเร็วขึ้น ใช้ข้อมูลได้ดีขึ้น และออกแบบประสบการณ์ผู้ใช้ที่ดียิ่งขึ้น ตามความเป็นจริง หลายบริษัทใช้รูปแบบนี้เพื่อสื่อสารบริการทั้งหมดของตนและเพื่อให้สามารถตอบสนองต่อเหตุการณ์ต่างๆ ได้แบบเรียลไทม์ ดังที่ฉันได้กล่าวไว้ก่อนหน้านี้ มีทางเลือกอื่นนอกเหนือจาก Karafka ที่คุณสามารถใช้กับ Rails ได้ คุณมีพื้นฐานอยู่แล้ว ตอนนี้ อย่าลังเลที่จะทดลองกับพวกเขา
ข้อมูลอ้างอิง
- https://kafka.apache.org/
- https://github.com/karafka/karafka
- https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern