سؤال

يبدو أن من المستحيل لجعل مؤقتا تجمع مؤشرات الترابط مع الحد إلى عدد من المواضيع التي يمكن أن تخلق.

هنا هو كيف ثابتة والمنفذين.newCachedThreadPool يتم تنفيذها في مكتبة جافا القياسية:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

وذلك باستخدام هذا القالب أن أذهب إلى خلق ثابت الحجم مؤقتا تجمع مؤشر الترابط:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

الآن إذا كنت تستخدم هذا و تقديم 3 المهام ، كل شيء سيكون على ما يرام.تقديم أي مهام أخرى سوف يؤدي إلى رفض تنفيذ الاستثناءات.

يحاول هذا:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

سيؤدي في جميع المواضيع تنفيذ بالتتابع.I. e., تجمع مؤشرات الترابط لن تجعل أكثر من موضوع واحد في التعامل مع المهام الخاصة بك.

هذا هو الخلل في تنفيذ طريقة ThreadPoolExecutor?أو ربما هذا هو مقصود ؟ أو هناك طريقة أخرى ؟

تحرير:أريد شيئا تماما مثل مؤقتا تجمع مؤشرات الترابط (يخلق المواضيع على الطلب ثم يقتل منهم بعد المهلة) ولكن مع حد على عدد من المواضيع التي يمكن أن تخلق و القدرة على الاستمرار في طابور المهام الإضافية مرة واحدة وقد ضرب الخيط الحد.وفقا sjlee رد هذا أمر مستحيل.النظر في تنفيذ() طريقة ThreadPoolExecutor هو في الواقع من المستحيل.سوف تحتاج إلى فرعية ThreadPoolExecutor و تجاوز تنفيذ() إلى حد ما مثل SwingWorker ، ولكن ما SwingWorker هل في تنفيذ() هو استكمال هاك.

هل كانت مفيدة؟

المحلول

على ThreadPoolExecutor التالية عدة الرئيسية السلوكيات و المشاكل الخاصة بك يمكن تفسير هذه السلوكيات.

عندما المهام المقدمة ،

  1. إذا تجمع مؤشرات الترابط لم يبلغ حجم النواة ، فإنه يخلق مواضيع جديدة.
  2. إذا كان حجم النواة تم التوصل إليه و لا يوجد المواضيع الخمول ، قوائم المهام.
  3. إذا كان حجم النواة تم التوصل إليه ، لا يوجد المواضيع الخمول و الانتظار يصبح كامل ، فإنه يخلق مواضيع جديدة (حتى تصل إلى الحجم الأقصى).
  4. إذا كان الحد الأقصى لحجم تم التوصل إليه ، لا يوجد المواضيع الخمول و الانتظار يصبح كامل ، ورفض سياسة في ركلات.

في المثال الأول ، علما أن SynchronousQueue أساسا حجم 0.ولذلك لحظة تصل إلى الحجم الأقصى (3) ، ورفض سياسة جزاء في (#4).

في المثال الثاني ، طابور من الاختيار هو linkedblockingqueue العلامة التي لديها حجم غير محدود.ولذلك واجهتك مشكلة مع سلوك #2.

لا يمكنك حقا العبث كثيرا مع ذاكرة التخزين المؤقت نوع أو نوع ثابت ، كما سلوكهم هو تماما تقريبا تحديدها.

إذا كنت تريد أن يكون لها يحدها ديناميكية تجمع مؤشرات الترابط ، تحتاج إلى استخدام إيجابي حجم النواة و ماكس حجم جنبا إلى جنب مع طابور من حجم محدود.على سبيل المثال ،

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

الإضافة:هذا هو إلى حد ما قديمة الإجابة ، ويبدو أن جدك تغير سلوكها عندما يتعلق الأمر حجم النواة من 0.منذ JDK 1.6, إذا كان حجم النواة هو 0 و المسبح لا توجد أية مواضيع ، ThreadPoolExecutor سيتم إضافة الموضوع إلى تنفيذ هذه المهمة.وبالتالي فإن حجم النواة 0 هو استثناء من القاعدة أعلاه.شكرا ستيف بالنسبة وبذلك التي انتباهي.

نصائح أخرى

وإذا لم لقد غاب عن شيء، وإيجاد حل لمسألة الأصلي هو بسيط. التعليمات البرمجية التالية تطبق السلوك المطلوب كما وصفها الملصق الأصلي. وسوف تفرخ تصل إلى 5 المواضيع للعمل على قائمة انتظار غير محدود وسوف المواضيع الخمول إنهاء بعد 60 ثانية.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

كان نفس القضية. لأن أي إجابة أخرى يضع كل القضايا معا ، أنا إضافة لي:

فمن الواضح الآن أنه مكتوب في مستندات:إذا كنت تستخدم قائمة الانتظار التي لا كتل (LinkedBlockingQueue) ماكس المواضيع الإعداد لا يؤثر فقط المواضيع الأساسية المستخدمة.

لذلك:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

هذا المنفذ له:

  1. لا مفهوم ماكس المواضيع كما نقوم باستخدام غير محدود الانتظار.وهذا أمر جيد لأن مثل هذا الانتظار قد يسبب المنفذ إلى إنشاء عدد هائل من غير الأساسية, المواضيع إضافية إذا كان يلي المعتاد السياسة.

  2. طابور من ماكس الحجم Integer.MAX_VALUE. Submit() سوف رمي RejectedExecutionException إذا كان عدد من المهام العالقة يتجاوز Integer.MAX_VALUE.غير متأكد من أننا سوف ينفد من الذاكرة الأولى أو هذا سوف يحدث.

  3. 4 كور المواضيع ممكن.الخمول الأساسية المواضيع تلقائيا الخروج إذا كان خاملا لمدة 5 ثوان.لذا, نعم, بدقة على الطلب المواضيع.عدد يمكن أن تكون متنوعة باستخدام setThreads() الأسلوب.

  4. يتأكد مين عدد من المواضيع الأساسية ليست أبدا أقل من واحد أو آخر submit() يرفض كل مهمة.منذ الأساسية المواضيع تحتاج إلى أن يكون >= ماكس المواضيع الطريقة setThreads() مجموعات ماكس المواضيع كذلك ، على الرغم من ماكس وضع موضوع لا طائل منه على غير انتظار.

في المثال الأول، يتم رفض المهام اللاحقة لأن AbortPolicy هو RejectedExecutionHandler الافتراضية. يحتوي على ThreadPoolExecutor السياسات التالية، والتي يمكنك تغيير من خلال طريقة setRejectedExecutionHandler:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

وهذا يبدو وكأنه كنت تريد ترابط التجمع مؤقتا مع CallerRunsPolicy.

وأي من الإجابات هنا ثابتة مشكلتي، الذي كان علي القيام به مع إنشاء كمية محدودة من اتصالات HTTP باستخدام عميل HTTP أباتشي (إصدار 3.x). منذ استغرق مني بضع ساعات لمعرفة الإعداد الجيد، وسوف أشارك:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

وهذا يخلق ThreadPoolExecutor الذي يبدأ مع خمسة ويحمل بحد أقصى عشرة مؤشرات الترابط قيد التشغيل في وقت واحد باستخدام CallerRunsPolicy لتنفيذ.

ولكل من جافادوك لThreadPoolExecutor:

<اقتباس فقرة>   

وإذا كان هناك أكثر من corePoolSize ولكن أقل من تشغيلها، سيتم إنشاء المواضيع maximumPoolSize موضوع جديد <م> إلا إذا قائمة الانتظار الكامل . عن طريق وضع corePoolSize وmaximumPoolSize نفسه، يمكنك إنشاء تجمع مؤشرات ترابط ذات حجم ثابت.

و(التشديد من الألغام).

والجواب غضب هو ما تريد، وعلى الرغم من أن الألغام يجيب عن الأخرى الخاصة بك. :)

وهناك أكثر من خيار. بدلا من استخدام SynchronousQueue الجديدة يمكنك استخدام أي قائمة انتظار أخرى أيضا، ولكن لديك للتأكد من حجمه 1، بحيث سيجبر executorservice لخلق ترابط جديد.

لا تبدو كما لو أي من الإجابات الإجابة في الواقع السؤال - في الحقيقة لا استطيع ان ارى وسيلة للقيام بذلك - حتى لو كنت فئة فرعية من PooledExecutorService لأن العديد من الأساليب / الخصائص مثل الخاص صنع addIfUnderMaximumPoolSize كان محميا هل يمكن أن تفعل ما يلي:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

والأقرب ما حصلت عليه هو هذا - ولكن حتى هذا ليس حلا جيدا للغاية

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

وp.s. لم تختبر أعلى

هذا هو ما تريد (على الأقل أعتقد ذلك).تفسيرا الاختيار جوناثان فاينبيرغ الإجابة

Executors.newFixedThreadPool(int n)

يخلق تجمع مؤشر الترابط الذي يعيد استخدام عدد محدد من المواضيع التشغيل إيقاف مشترك غير محدود الانتظار.في أي نقطة على الأكثر nThreads المواضيع سوف تكون نشطة معالجة المهام.إن المهام الإضافية المقدمة عند كل المواضيع النشطة ، وسوف ننتظر في طابور حتى الخيط هو متاح.إذا كان أي موضوع ينهي بسبب فشل أثناء التنفيذ قبل الاغلاق ، واحدة جديدة سوف تأخذ مكانها إذا لزم الأمر لتنفيذ المهام اللاحقة.المواضيع في المسبح موجودة حتى ذلك صراحة الاغلاق.

وهنا حل آخر. وأعتقد أن هذا الحل تتصرف كما تريد أن (وإن لم يكن فخورا هذا الحل):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
  1. يمكنك استخدام ThreadPoolExecutor كما اقترح @sjlee

    يمكنك التحكم في حجم تجمع حيوي.إلقاء نظرة على هذا السؤال لمزيد من التفاصيل :

    ديناميكية تجمع مؤشرات الترابط

    أو

  2. يمكنك استخدام newWorkStealingPool API التي أدخلت مع جافا 8.

    public static ExecutorService newWorkStealingPool()
    

    يخلق العمل-سرقة تجمع مؤشرات الترابط باستخدام كل ما هو متاح المعالجات المستهدفة التوازي المستوى.

بشكل افتراضي ، التوازي على مستوى تعيين عدد النوى وحدة المعالجة المركزية في الخادم الخاص بك.إذا كان لديك 4 وحدة المعالجة المركزية الأساسية خادم تجمع الموضوع سيكون حجم 4.هذا API ForkJoinPool نوع من ExecutorService والسماح بالعمل سرقة من الخمول المواضيع التي كتبها سرقة المهام من مشغول المواضيع في ForkJoinPool.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top