Question

I am trying to create multiple threads for Lucene indexing. Let's say I have a list of numbers from 101 to 999. What I want to achieve is to split this list into different groups, with each group handled by a different thread.

I am able to partition it into different groups; however, when running the threads, all of them only take the first partition (l[0]). For example, my code creates 3 threads (t1, t2, t3), and I also have three partitions (p1, p2, p3), where p1 is [101, 400], p2 is [401, 600] and p3 is [601, 999]. t1 is supposed to run on p1, t2 on p2 and t3 on p3, but right now all threads run on p1.

Does anyone know how to fix this?

I made changes based on Tim's suggestion, but I still get the same result:

    final int BLOCK_SIZE = 1;
    AtomicInteger nextBlock = new AtomicInteger(0);
    int blockToProcess = nextBlock.getAndIncrement();
    int endBlocks = (blockToProcess + partitions.size()) * BLOCK_SIZE;
    for (int i = BLOCK_SIZE * blockToProcess; i < endBlocks; i++) {
        Myclass it = new Myclass(l);
        todo.add(Executors.callable(it));
    }
    taskExecutor.invokeAll(todo);

For partitioning, I used the Guava library and checked the output, so the partitions seem to be OK.

I also tried the following, with the same result. In fact, this is what I had in the first place:

    for (int i = 0; i < partitions.size(); i++) {
        Myclass it = new Myclass(partitions.get(i));
        taskExecutor.execute(it);
    }

To make things easier and cleaner, I created brand-new test files. There are two classes, Test and TestThreads. The Test class:

    import java.math.RoundingMode;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.google.common.collect.Lists;
    import com.google.common.math.IntMath;

    public class Test {

        public static void main(String[] args) throws InterruptedException {
            int numOfthreads = 2;
            List<String> originalList = new ArrayList<String>();

            for (int i = 0; i < 20; i++) {
                originalList.add(Integer.toString(i));
            }

            int partitionSize = IntMath.divide(originalList.size(), numOfthreads, RoundingMode.UP);
            List<List<String>> partitions = Lists.partition(originalList, partitionSize);
            List<Callable<Object>> todo = new ArrayList<Callable<Object>>();
            int count = 0;
            ExecutorService taskExecutor = Executors.newFixedThreadPool(numOfthreads);

            for (int i = 0; i < partitions.size(); i++) {
                TestThreads it = new TestThreads(partitions.get(i));

                todo.add(Executors.callable(it));
                System.out.println("Created thread " + count + ", containing: "
                        + partitions.get(i).size() + " files\n");
                for (String s : partitions.get(i)) {
                    System.out.print(s + " ");
                }
                count++;
                System.out.println("\n");
            }

            taskExecutor.invokeAll(todo);
            taskExecutor.shutdown(); // let the JVM exit once all tasks are done
        }
    }

The TestThreads class:

    import java.util.List;

    public class TestThreads implements Runnable {
        private static List<String> lis;

        public TestThreads(List<String> list) {
            lis = list;
        }

        public void run() {
            System.out.println("This is thread " + Thread.currentThread().getId());

            System.out.println("-----------------------------------------");
            for (String s : lis) {
                System.out.println(s);
            }
        }
    }

Update:

It was the static list that caused the issue; everything works fine after removing `static`. I appreciate the time and help from Tim and Xiezi!

Solution 3

For your new test code, two things:

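  1. You may not understand multithreading very well yet. The way you print the results can't show anything useful: output from different threads can interleave, so the lines printed after "This is thread ..." do not necessarily come from that thread. Print the thread ID on every line instead of only at the beginning: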

    public void run() {
        for (String s : lis) {
            System.out.println(Thread.currentThread().getId() + " : " + s);
        }
    }
    
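  2. Why do you declare the list as static in the TestThreads class? A static field belongs to the class rather than to each object, so every TestThreads constructor call overwrites the same field, and all tasks end up reading whichever list was stored there most recently instead of their own partition. I believe this is the cause. You can try my code, which is the same as yours except that the static is removed: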

    import java.math.RoundingMode;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.google.common.collect.Lists;
    import com.google.common.math.IntMath;
    
    public class Test {
    
        public static void main(String[] args) throws InterruptedException {
    
            int numOfthreads = 2;
            List<String> originalList = new ArrayList<String>();
    
            for (int i = 0; i < 20; i++) {
                originalList.add(Integer.toString(i));
            }
    
            int partitionSize = IntMath.divide(originalList.size(), numOfthreads,
                    RoundingMode.UP);
            List<List<String>> partitions = Lists.partition(originalList,
                    partitionSize);
            List<Callable<Object>> todo = new ArrayList<Callable<Object>>();
            int count = 0;
            ExecutorService taskExecutor = Executors
                    .newFixedThreadPool(numOfthreads);
    
            for (int i = 0; i < partitions.size(); i++) {
                TestThreads it = new TestThreads(partitions.get(i));
    
                todo.add(Executors.callable(it));
                System.out.println("Created thread " + count + ", containing: "
                        + partitions.get(i).size() + " files\n");
                for (String s : partitions.get(i)) {
                    System.out.print(s + " ");
                }
                count++;
                System.out.println("\n");
            }
    
            taskExecutor.invokeAll(todo);
            taskExecutor.shutdown(); // let the JVM exit once all tasks are done
    
        }
    
        public static class TestThreads implements Runnable {
            private List<String> lis; // instance field: each task keeps its own partition
    
            public TestThreads(List<String> list) {
                lis = list;
            }
    
            public void run() {
                for (String s : lis) {
                    System.out.println(Thread.currentThread().getId() + " : " + s);
                }
            }
        }
    }
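To see why the static field breaks things, here is a minimal standalone sketch. The class names SharedTask, OwnTask and StaticFieldDemo are hypothetical, not from the question. Like the question's code, it creates every object before anything reads the field, just as the whole todo list is built before invokeAll() runs:

```java
import java.util.Arrays;
import java.util.List;

class StaticFieldDemo {
    public static void main(String[] args) {
        List<String> p1 = Arrays.asList("0", "1");
        List<String> p2 = Arrays.asList("2", "3");

        // All objects are created before any field is read,
        // like filling the todo list before calling invokeAll()
        SharedTask s1 = new SharedTask(p1);
        SharedTask s2 = new SharedTask(p2);
        OwnTask o1 = new OwnTask(p1);
        OwnTask o2 = new OwnTask(p2);

        System.out.println("static:   " + s1.items() + " " + s2.items()); // static:   [2, 3] [2, 3]
        System.out.println("instance: " + o1.items() + " " + o2.items()); // instance: [0, 1] [2, 3]
    }
}

// Hypothetical class mirroring TestThreads as posted in the question
class SharedTask {
    // static: ONE field shared by every SharedTask object
    private static List<String> lis;

    SharedTask(List<String> list) {
        lis = list; // each constructor call overwrites the shared field
    }

    List<String> items() {
        return lis;
    }
}

// Hypothetical class mirroring the fixed version
class OwnTask {
    // instance field: every OwnTask object keeps its own reference
    private final List<String> lis;

    OwnTask(List<String> list) {
        this.lis = list;
    }

    List<String> items() {
        return lis;
    }
}
```

The static version reports the last-assigned list for both objects, while the instance version keeps each object's own list, which is exactly the difference between the question's TestThreads and the fixed one.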
    

Other suggestions

You need a way to assign the blocks to the different threads.

The simplest way may be to use an AtomicInteger to store the "next block" that needs processing. Each thread calls getAndIncrement() on the nextBlock integer and then processes the relevant block.

For example:

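    static final int BLOCK_SIZE = 100;
    AtomicInteger nextBlock = new AtomicInteger(0);

    private class Processor implements Runnable {
        public void run() {
            // Atomically claim a block number that no other thread has taken
            int blockToProcess = nextBlock.getAndIncrement();
            // Clamp so the last (possibly partial) block does not run past the data
            int end = Math.min((blockToProcess + 1) * BLOCK_SIZE, data.length);

            // data and process() stand for your own input array and per-item work
            for (int i = BLOCK_SIZE * blockToProcess; i < end; i++) {
                process(data[i]);
            }
        }
    }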

All code is from memory, so there may be a few typos or method names that need correcting.

The important point is the use of the AtomicInteger: getAndIncrement() is atomic, so each thread is safely assigned a different block to process.
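Here is a fuller, runnable sketch of that AtomicInteger approach, under a few assumptions of mine: the input is an int array (the question's numbers 101 to 999), "processing" an item just records that its index was visited, and each worker keeps claiming blocks in a loop until none are left (rather than claiming a single block per run()). BlockClaimDemo and run() are hypothetical names:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

class BlockClaimDemo {
    static final int BLOCK_SIZE = 100;

    // Runs numThreads workers over data and returns how often each index was processed.
    static AtomicIntegerArray run(int[] data, int numThreads) {
        AtomicInteger nextBlock = new AtomicInteger(0);
        AtomicIntegerArray timesProcessed = new AtomicIntegerArray(data.length);
        int numBlocks = (data.length + BLOCK_SIZE - 1) / BLOCK_SIZE; // ceiling division

        Runnable processor = () -> {
            while (true) {
                // Atomically claim a block number that no other worker has taken
                int block = nextBlock.getAndIncrement();
                if (block >= numBlocks) {
                    return; // every block has been claimed
                }
                int start = block * BLOCK_SIZE;
                int end = Math.min(start + BLOCK_SIZE, data.length); // last block may be short
                for (int i = start; i < end; i++) {
                    // Stand-in for process(data[i]): record that index i was handled
                    timesProcessed.incrementAndGet(i);
                }
            }
        };

        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int t = 0; t < numThreads; t++) {
            pool.execute(processor); // sharing one Runnable is safe: shared state is atomic or read-only
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return timesProcessed;
    }

    public static void main(String[] args) {
        // The question's numbers 101..999
        int[] data = new int[999 - 101 + 1];
        for (int i = 0; i < data.length; i++) {
            data[i] = 101 + i;
        }
        AtomicIntegerArray counts = run(data, 3);
        boolean allOnce = true;
        for (int i = 0; i < counts.length(); i++) {
            allOnce &= counts.get(i) == 1;
        }
        System.out.println("every number processed exactly once: " + allOnce);
    }
}
```

Because getAndIncrement() hands out each block number exactly once, no two workers share a block, and the number of blocks does not have to match the number of threads.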

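I meant to add this as a comment on your question, but here is fine. You tried the following, which does not do what you need. getAndIncrement() is called only once, on the main thread, before any task runs, and every Myclass is constructed with the same list l, so every task processes the same data: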

    final int BLOCK_SIZE = 1;
    AtomicInteger nextBlock = new AtomicInteger(0);
    int blockToProcess = nextBlock.getAndIncrement();
    int endBlocks = (blockToProcess + partitions.size()) * BLOCK_SIZE;
    for (int i = BLOCK_SIZE * blockToProcess; i < endBlocks; i++) {
        Myclass it = new Myclass(l);
        todo.add(Executors.callable(it));
    }
    taskExecutor.invokeAll(todo);

You actually have two options here. Since you are creating the tasks ahead of time, you can simply tell each task what to process when you create it. With the approach I suggested, each task instead picks up the next block when it runs.

To do it the way you are trying here, you can forget the AtomicInteger and just do:

    for (int i = 0; i < partitions.size(); i++) {
        // or just new MyClass(i), and MyClass then pulls its list out of partitions
        MyClass it = new MyClass(partitions.get(i));
        taskExecutor.execute(it);
    }

Where

    public class MyClass implements Runnable {
        private final List<String> toProcess;

        MyClass(List<String> toProcess) {
            this.toProcess = toProcess;
        }

        @Override
        public void run() {
            // Process the list
        }
    }

or

    public class MyClass implements Runnable {
        private final int toProcess;

        MyClass(int toProcess) {
            this.toProcess = toProcess;
        }

        @Override
        public void run() {
            // Assumes partitions is reachable here, e.g. a field of the enclosing class
            List<String> list = partitions.get(toProcess);
            // Process the list
        }
    }
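A self-contained sketch of the first option (each task is told its partition when it is created), assuming a Callable that returns its partition so the caller can check which data each task actually saw. It builds the partitions with subList instead of Guava's Lists.partition to avoid the dependency; PartitionTaskDemo, PartitionTask and runAll() are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PartitionTaskDemo {
    // Hypothetical task: each instance holds only its own partition
    static class PartitionTask implements Callable<List<String>> {
        private final List<String> toProcess; // instance field, NOT static

        PartitionTask(List<String> toProcess) {
            this.toProcess = toProcess;
        }

        @Override
        public List<String> call() {
            for (String s : toProcess) {
                // Thread name on every line, so interleaved output stays attributable
                System.out.println(Thread.currentThread().getName() + " : " + s);
            }
            return toProcess; // stand-in for real work such as indexing
        }
    }

    // Creates one task per partition, runs them all, and returns each task's result in order.
    static List<List<String>> runAll(List<List<String>> partitions, int numThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<PartitionTask> tasks = new ArrayList<>();
            for (List<String> p : partitions) {
                tasks.add(new PartitionTask(p)); // the partition is fixed at creation time
            }
            List<List<String>> results = new ArrayList<>();
            // invokeAll waits for every task; the futures come back in task order
            for (Future<List<String>> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            pool.shutdown(); // without this, the pool's threads keep the JVM alive
        }
    }

    public static void main(String[] args) {
        List<String> all = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            all.add(Integer.toString(i));
        }
        // Two partitions built with subList instead of Guava's Lists.partition
        List<List<String>> partitions = List.of(all.subList(0, 10), all.subList(10, 20));
        List<List<String>> results = runAll(partitions, 2);
        // true: each task returned its own partition, not a shared one
        System.out.println(results.equals(partitions));
    }
}
```

The key detail is that toProcess is a final instance field, so every task keeps its own list no matter how many tasks are created before the pool starts running them.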

I bet it's simply that your partitioning has a bug in it. Make sure that it's indeed partitioning correctly, or try this (it uses the Guava library to do the partitioning).

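    ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
    List<String> input = //input
    List<Future<Object>> tickets = new ArrayList<Future<Object>>();
    for (List<String> partition : Lists.partition(input, 1000)) {
        // Submit to the executor (calling submit on the input list would not compile)
        Future<Object> ticket =
            taskExecutor.submit(Executors.callable(new Myclass(partition)));
        tickets.add(ticket);
    }

    // Wait for every task; get() rethrows a task's exception as ExecutionException
    for (Future<Object> ticket : tickets) {
        ticket.get();
    }
    taskExecutor.shutdown();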

(This example divides the input into lists of length 1000; the last list may be shorter.)
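If you want to double-check the partitioning without Guava, this sketch reproduces the idea with plain subList: a ceiling division for the partition size (what IntMath.divide(..., RoundingMode.UP) computes for non-negative values) and consecutive sublists where only the last one may be shorter. PartitionDemo, partition() and partitionSize() are hypothetical names, and this is not Guava's implementation:

```java
import java.util.ArrayList;
import java.util.List;

class PartitionDemo {
    // Splits list into consecutive sublists of at most size elements, similar in
    // spirit to Guava's Lists.partition (this is not Guava's implementation).
    static <T> List<List<T>> partition(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> parts = new ArrayList<>();
        for (int start = 0; start < list.size(); start += size) {
            int end = Math.min(start + size, list.size()); // only the last part may be shorter
            parts.add(list.subList(start, end)); // a view of the original list, not a copy
        }
        return parts;
    }

    // Ceiling of n / numThreads for non-negative n, i.e. what
    // IntMath.divide(n, numThreads, RoundingMode.UP) returns; assumes no int overflow.
    static int partitionSize(int n, int numThreads) {
        return (n + numThreads - 1) / numThreads;
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 101; i <= 999; i++) {
            numbers.add(i);
        }
        int size = partitionSize(numbers.size(), 3); // 899 items -> 300 per partition
        for (List<Integer> p : partition(numbers, size)) {
            System.out.println(p.size() + " items: " + p.get(0) + ".." + p.get(p.size() - 1));
        }
    }
}
```

For the question's numbers 101 to 999 (899 items) and 3 threads, it prints partitions of 300, 300 and 299 items: 101..400, 401..700 and 701..999.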

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow