سؤال

كما في الإجابة على سؤالي, لدي الوضع حيث أنا تجهيز عدد كبير من الأحداث التي تصل في طابور.كل حدث يتم التعامل معها بنفس الطريقة و كل حتى يمكن التعامل معها بشكل مستقل من جميع الأحداث الأخرى.

البرنامج يستفيد من سكالا التزامن إطار العديد من العمليات المعنية على غرار كما Actors.كما Actors عملية رسائلهم بالتتابع ، فهي ليست مناسبة تماما أن هذه المشكلة بالذات (على الرغم من بلدي أخرى الجهات المنفذة اإلجراءات التي هي متتابعة).كما أريد سكالا إلى "السيطرة" كل موضوع إنشاء (والتي أفترض هو الهدف من ذلك وجود نظام التزامن في المقام الأول) يبدو أنني لديك 2 الخيارات:

  1. إرسال الأحداث إلى مجموعة من الحدث المعالجات التي يمكنني التحكم
  2. الحصول على بلدي Actor لمعالجتها بشكل متزامن من خلال آلية أخرى

أنا أعتقد أن #1 ينفي الفائدة من استخدام الجهات الفاعلة الفرعي: كيف العديد من المعالج الفاعلة علي خلق ؟ يتم سؤال واحد واضح.هذه الأمور يفترض أن تكون مخفية عن لي و حلها من قبل النظام الفرعي.

وكان جوابي أن تفعل ما يلي:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

هل هناك أفضل النهج ؟ هذا غير صحيح ؟

تحرير:ربما أفضل نهج هو:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
هل كانت مفيدة؟

المحلول

هذا يبدو وكأنه نسخة مكررة من سؤال آخر.لذلك سوف مكررة جوابي

الجهات الفاعلة في العملية رسالة واحدة في كل مرة.الكلاسيكية نمط عملية رسائل متعددة هو واحد المنسق الممثل أمام مجموعة من المستهلكين الفاعلة.إذا كنت تستخدم تتفاعل ثم المستهلك بركة يمكن أن تكون كبيرة ولكن سوف لا تزال لا تستخدم سوى عدد قليل من JVM المواضيع.هنا مثال أين يمكنني إنشاء مجموعة من 10 المستهلكين واحد منسق الجبهة بالنسبة لهم.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

هذا الرمز اختبارات لمعرفة المستهلك هو متاح و يرسل الطلب إلى أن المستهلك.البدائل هي فقط عشوائيا تعيين إلى المستهلكين أو استخدام جدولة راوند روبن.

اعتمادا على ما تقومون به, قد يكون من الأفضل مع سكالا الآجلة.على سبيل المثال, إذا كنت حقا لا تحتاج الجهات الفاعلة ثم كل ما سبق الآلات يمكن أن تكون مكتوبة كما

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

نصائح أخرى

إن الأحداث التي يمكن التعامل معها بشكل مستقل ، لماذا هم على قائمة الانتظار ؟ معرفة شيء عن التصميم الخاص بك, هذا يبدو وكأنه خطوة غير ضرورية.إذا كنت يمكن أن يؤلف process وظيفة مع أي إطلاق هذه الأحداث يمكن أن يحتمل أن يلغي الطابور.

فاعل أساسا هو المتزامنة تأثير مجهزة الانتظار.إذا كنت تريد أن عملية رسائل متعددة في وقت واحد ، أنت لا تريد حقا فاعل.كنت ترغب فقط في وظيفة (أي => ()) أن يكون المقرر تنفيذها في وقت مناسب.

أما وقد قلت ذلك ، النهج الخاص بك هو معقول إذا كنت ترغب في البقاء داخل الجهات المكتبة وإذا كان انتظار الحدث ليس ضمن عنصر التحكم الخاص بك.

Scalaz يميز بين الجهات الفاعلة المتزامنة الآثار.في حين Actor هي خفيفة جدا في الوزن ، scalaz.concurrent.Effect هو أخف وزنا لا يزال.هنا هو رمز تترجم تقريبا إلى Scalaz المكتبة:

val eventProcessor = effect (x => process x)

هذا مع أحدث الجذع والرأس ، لم يفرج عنه حتى الآن.

هذا يبدو وكأنه بسيط المستهلك والمنتج المشكلة.كنت استخدم طابور مع مجموعة من المستهلكين.وربما كنت يمكن أن يكتب هذا مع بضعة أسطر من التعليمات البرمجية باستخدام جافا.util.المتزامنة.

الغرض من فاعل (أحدها) أن تكفل الدولة داخل الفاعل لا يمكن الوصول إليها إلا من خلال موضوع واحد في وقت واحد.إن معالجة رسالة لا تعتمد على أي قابلة للتغيير دولة داخل الفاعل ، ثم إنه قد يكون من المناسب أن مجرد تقديم مهمة جدولة أو تجمع مؤشرات الترابط في العملية.اضافية التجريد أن الفاعل هو في الواقع الحصول على طريقك.

هناك طرق سهلة في سكالا.الجهات الفاعلة.جدولة لهذا أو يمكنك استخدام منفذ من جافا.util.المتزامنة.

الجهات الفاعلة أكثر وخفيفة الوزن من المواضيع ، مثل خيار آخر هو استخدام الممثل الكائنات مثل Runnable الكائنات كنت تستخدم الخضوع تجمع مؤشرات الترابط.والفرق الرئيسي هو أنك لا داعي للقلق حول ThreadPool - تجمع مؤشرات الترابط تدار لك من قبل الممثل الإطار هو في الغالب التكوين القلق.

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

ثم تقديم رسالة ، أقول هذا:

submit(new MyEvent(x))

, الذي يتوافق مع

eventProcessor ! new MyEvent(x)

من سؤالك.

اختبار هذا النمط بنجاح مع 1 مليون الرسائل المرسلة والمستلمة في حوالي 10 ثواني على quad-core i7 laptop.

ويساعد هذا الأمل.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top