منفذي جافا:كيف يتم إعلامك، دون حظر، عند اكتمال المهمة؟

StackOverflow https://stackoverflow.com/questions/826212

سؤال

لنفترض أن لدي قائمة انتظار مليئة بالمهام التي أحتاج إلى تقديمها إلى خدمة المنفذ.أريد معالجتها واحدة تلو الأخرى.إن أبسط طريقة يمكنني التفكير بها هي:

  1. خذ مهمة من قائمة الانتظار
  2. أرسله إلى المنفذ
  3. اتصل بـ .get على المستقبل الذي تم إرجاعه وقم بحظره حتى تتوفر نتيجة
  4. قم بمهمة أخرى من قائمة الانتظار...

ومع ذلك، أحاول تجنب الحظر تمامًا.إذا كان لدي 10000 قائمة انتظار من هذا القبيل، والتي تحتاج إلى معالجة مهامها واحدة تلو الأخرى، فسوف تنفد مساحة المكدس لأن معظمها سيحتفظ بسلاسل الرسائل المحظورة.

ما أريده هو إرسال مهمة وتقديم معاودة اتصال يتم الاتصال بها عند اكتمال المهمة.سأستخدم إشعار معاودة الاتصال هذا كعلامة لإرسال المهمة التالية.(يبدو أن جافا الوظيفية وجيتلانج تستخدمان مثل هذه الخوارزميات غير المحظورة، لكن لا يمكنني فهم الكود الخاص بهما)

كيف يمكنني القيام بذلك باستخدام java.util.concurrent الخاص بـ JDK، دون كتابة خدمة المنفذ الخاصة بي؟

(قد يتم حظر قائمة الانتظار التي تغذيني بهذه المهام، ولكن هذه مشكلة يجب معالجتها لاحقًا)

هل كانت مفيدة؟

المحلول

حدد واجهة رد الاتصال لتلقي أي معلمات تريد تمريرها في إشعار الإكمال.ثم استدعائه في نهاية المهمة.

يمكنك أيضًا كتابة غلاف عام للمهام القابلة للتشغيل وإرسالها إلى ExecutorService.أو انظر أدناه للتعرف على الآلية المضمنة في Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

مع CompletableFuture, ، يتضمن Java 8 وسيلة أكثر تفصيلاً لإنشاء خطوط الأنابيب حيث يمكن إكمال العمليات بشكل غير متزامن ومشروط.فيما يلي مثال مفتعل ولكنه كامل للإخطار.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

نصائح أخرى

في Java 8 يمكنك استخدام المستقبل الكامل.فيما يلي مثال لدي في الكود الخاص بي حيث أستخدمه لجلب المستخدمين من خدمة المستخدم الخاصة بي، وتعيينهم لكائنات العرض الخاصة بي، ثم تحديث العرض الخاص بي أو إظهار مربع حوار الخطأ (هذا تطبيق واجهة المستخدم الرسومية):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

يتم تنفيذه بشكل غير متزامن.أنا أستخدم طريقتين خاصتين: mapUsersToUserViews و updateView.

يستخدم واجهة برمجة التطبيقات المستقبلية القابلة للاستماع لـ Guava وإضافة رد الاتصال.راجع.من الموقع :

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

هل يمكن أن تمتد FutureTask الطبقة، وتجاوز done() الطريقة، ثم قم بإضافة FutureTask يعترض على ExecutorService, ، لذلك done() سيتم استدعاء الطريقة عندما FutureTask الانتهاء على الفور.

ThreadPoolExecutor لديها أيضا beforeExecute و afterExecute طرق الخطاف التي يمكنك تجاوزها والاستفادة منها.هنا الوصفة من ThreadPoolExecutorجافادوكس.

طرق هوك

توفر هذه الفئة الحماية القابلة للتجاوز beforeExecute(java.lang.Thread, java.lang.Runnable) و afterExecute(java.lang.Runnable, java.lang.Throwable) الأساليب التي يتم استدعاؤها قبل وبعد تنفيذ كل مهمة.يمكن استخدامها لمعالجة بيئة التنفيذ؛على سبيل المثال، إعادة التهيئة ThreadLocals, أو جمع الإحصائيات أو إضافة إدخالات السجل.بالإضافة إلى ذلك، طريقة terminated() يمكن تجاوزها لإجراء أي معالجة خاصة يجب إجراؤها بمجرد Executor تم إنهاؤه بالكامل.إذا طرحت أساليب الخطاف أو رد الاتصال استثناءات، فقد تفشل سلاسل العمليات الداخلية بدورها وتنتهي بشكل مفاجئ.

إستخدم CountDownLatch.

انها من java.util.concurrent وهي بالضبط طريقة الانتظار حتى تكتمل عدة سلاسل عمليات التنفيذ قبل المتابعة.

من أجل تحقيق تأثير رد الاتصال الذي تبحث عنه، فإن ذلك يتطلب القليل من العمل الإضافي.وهي التعامل مع هذا بنفسك في موضوع منفصل يستخدم ملف CountDownLatch وينتظر ذلك، ثم يستمر في إخطار كل ما تحتاج إلى إخطاره.لا يوجد دعم أصلي لرد الاتصال، أو أي شيء مشابه لهذا التأثير.


يحرر: والآن بعد أن فهمت سؤالك بشكل أكبر، أعتقد أنك قد وصلت إلى أبعد من ذلك، دون داع.إذا كنت تأخذ العادية SingleThreadExecutor, ، وقم بتكليفه بجميع المهام، وسوف يقوم بالانتظار محليًا.

إذا كنت تريد التأكد من عدم تشغيل أي مهام في نفس الوقت، فاستخدم ملف تنفيذي مفرد.ستتم معالجة المهام بالترتيب الذي تم إرسالها به.لا تحتاج حتى إلى الاحتفاظ بالمهام، فقط أرسلها إلى المسؤول التنفيذي.

فقط للإضافة إلى إجابة مات، التي ساعدت، إليك مثال أكثر تفصيلاً لإظهار استخدام رد الاتصال.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

الإخراج هو:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

كود بسيط للتنفيذ Callback آلية باستخدام ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

انتاج:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

الملاحظات الرئيسية:

  1. إذا كنت تريد تنفيذ مهام المعالجة بالتسلسل حسب ترتيب FIFO، فاستبدلها newFixedThreadPool(5) مع newFixedThreadPool(1)
  2. إذا كنت تريد معالجة المهمة التالية بعد تحليل النتيجة منها callback للمهمة السابقة، فقط قم بإلغاء التعليق أدناه

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. يمكنك استبدال newFixedThreadPool() مع واحد من

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    اعتمادا على حالة الاستخدام الخاصة بك.

  4. إذا كنت تريد التعامل مع طريقة رد الاتصال بشكل غير متزامن

    أ.تمرير مشترك ExecutorService or ThreadPoolExecutor إلى مهمة قابلة للاستدعاء

    ب.تحويل الخاص بك Callable طريقة ل Callable/Runnable مهمة

    ج.دفع مهمة رد الاتصال إلى ExecutorService or ThreadPoolExecutor

هذا امتداد لإجابة باتشي باستخدام إجابة الجوافة ListenableFuture.

بخاصة، Futures.transform() عائدات ListenableFuture لذلك يمكن استخدامها لسلسلة المكالمات غير المتزامنة. Futures.addCallback() عائدات void, ، لذلك لا يمكن استخدامه للتسلسل، ولكنه جيد للتعامل مع النجاح/الفشل عند الإكمال غير المتزامن.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

ملحوظة: بالإضافة إلى تسلسل المهام غير المتزامنة، Futures.transform() يسمح لك أيضًا بجدولة كل مهمة على منفذ منفصل (غير موضح في هذا المثال).

يمكنك استخدام تطبيق Callable من هذا القبيل

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

حيث يعتبر CallbackInterface شيئًا أساسيًا جدًا

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

والآن ستبدو الطبقة الرئيسية بهذا الشكل

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top