سؤال

أنا جديد جدًا على Kafka ونحن نستخدم Kafka 0.8.1.

ما يجب علي فعله هو استهلاك رسالة من الموضوع.لذلك، سيتعين علي أن أكتب مستهلكًا واحدًا في Java والذي سيستهلك رسالة من الموضوع ثم يحفظ تلك الرسالة في قاعدة البيانات.بعد حفظ الرسالة، سيتم إرسال بعض الإقرارات إلى عميل Java.إذا كان الإقرار صحيحا، فيجب استهلاك الرسالة التالية من الموضوع.إذا كان الإقرار خاطئًا (مما يعني أنه بسبب بعض رسائل الخطأ، لا يمكن حفظ القراءة من الموضوع في قاعدة البيانات)، فيجب قراءة هذه الرسالة مرة أخرى.

أعتقد أنني بحاجة لاستخدام Simple Consumer، للتحكم في إزاحة الرسالة والاطلاع على مثال المستهلك البسيط كما هو موضح في هذا الرابط https://cwiki.Apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.

في هذا المثال، يتم تقييم الإزاحة في طريقة التشغيل كـ 'readOffset'.هل أحتاج للعب مع ذلك؟على سبيل المثال.استطيع ان استخدم LatestTime() بدلاً من EarliestTime() وفي حالة الخطأ، سأقوم بإعادة ضبط الإزاحة على ما كان عليه قبل الاستخدام offset - 1.

هل هذه هي الطريقة التي يجب أن أمضي بها؟

هل كانت مفيدة؟

المحلول

أعتقد أنه يمكنك الانسجام مع استخدام المستهلك عالي المستوى (http://kafka.apache.org/documentation.html#highlevelconsumerapi)، والذي يجب أن يكون أسهل في الاستخدام من SimpleConsumer.لا أعتقد أن المستهلك يحتاج إلى إعادة قراءة الرسائل من كافكا عند فشل قاعدة البيانات، حيث أن المستهلك لديه بالفعل تلك الرسائل ويمكنه إعادة إرسالها إلى قاعدة البيانات أو القيام بأي شيء آخر يراه مناسبًا.

يقوم المستهلكون رفيعو المستوى بتخزين آخر إزاحة تمت قراءتها من قسم معين في Zookeeper (استنادًا إلى اسم مجموعة المستهلك)، بحيث عندما تموت عملية المستهلك ويتم إعادة تشغيلها لاحقًا (من المحتمل على مضيف آخر)، يمكنها الاستمرار في معالجة الرسائل حيث يتم متروك مهمل.من الممكن حفظ هذه الإزاحة تلقائيًا في Zookeeper بشكل دوري (راجع خصائص المستهلك auto.commit.enable وauto.commit.interval.ms)، أو حفظها عن طريق منطق التطبيق عن طريق الاتصال ConsumerConnector.commitOffsets .أنظر أيضا https://cwiki.Apache.org/confluence/display/KAFKA/Consumer+Group+Example .

أقترح عليك إيقاف تشغيل الالتزام التلقائي والالتزام بالإزاحات الخاصة بك بأنفسكم بمجرد تلقي إقرار قاعدة البيانات.وبالتالي، يمكنك التأكد من إعادة قراءة الرسائل غير المعالجة من Kafka في حالة فشل المستهلك وستصل جميع الرسائل الملتزمة بـ Kafka في النهاية إلى قاعدة البيانات مرة واحدة على الأقل (ولكن ليس "مرة واحدة بالضبط").

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top