Question

I'm trying to use the executor plugin in Grails, but I've hit a problem I can't solve. I have a list of links that I want to crawl, and the crawler kept processing the same ones, so I simplified my example to this:

    List offerLinks = getOfferLinks(parser)
    offerLinks.each{println it}

    List futures = new Vector()
    for (def link : offerLinks) {
        def future = callAsync {
            return link
        }
        futures.add(future)
    }

    futures.each{println "FUTURE " +  it.get()}

This is what gets printed in the console

bt-ofrd-acciona-6633344.htm?
bt-ofrd-celiasiffredi-293068.htm?
bt-ofrd-clahubiz-92924.htm?
bt-ofrd-haruko-1672632.htm?
FUTURE bt-ofrd-clahubiz-92924.htm?
FUTURE bt-ofrd-haruko-1672632.htm?
FUTURE bt-ofrd-haruko-1672632.htm?
FUTURE bt-ofrd-haruko-1672632.htm?

The first four lines come from the offerLinks.each{println it} code.
The last four come from futures.each{println "FUTURE " + it.get()}.

What I'm trying to find out is why putting those links in the callAsync block and retrieving them from the future objects makes them take the last value. It seems like it's replacing the already created future objects?

This piece of code is inside a service called by a controller. I appreciate any help you can give me. Thanks

Update:
I'm thinking there is some kind of problem in the Java executor API... or maybe I don't fully understand how it really works?
Here is another test changing the code to use invokeAll:

    def threadPool = Executors.newCachedThreadPool()

    List offerLinks = getOfferLinks(parser)
    List lista = new ArrayList()
    for (enlace in offerLinks) {
        println "link " + enlace
        lista.add({enlace} as Callable)
    }
    def futures = threadPool.invokeAll(lista)

    futures.each{println "FUTURE " +  it.get()}

This is what gets printed:

link /bt-ofrd-implementar-192996.htm?
link /bt-ofrd-cdonini-864908.htm?
link /bt-ofrd-hvtalent-1493932.htm?
link /bt-ofrd-dbak-1358120.htm?
link /bt-ofrd-hexacta-100072.htm?
link /bt-ofrd-ccibelli-457472.htm?
FUTURE /bt-ofrd-ccibelli-457472.htm?
FUTURE /bt-ofrd-ccibelli-457472.htm?
FUTURE /bt-ofrd-ccibelli-457472.htm?
FUTURE /bt-ofrd-ccibelli-457472.htm?
FUTURE /bt-ofrd-ccibelli-457472.htm?
FUTURE /bt-ofrd-ccibelli-457472.htm?


Solution

It looks to me like something odd is going on with the scope of variables that are defined outside the closure but referred to from inside it: the closure is not "closing" over them properly. Does it work any better if you do this?

    def threadPool = Executors.newCachedThreadPool()

    List offerLinks = getOfferLinks(parser)
    List lista = new ArrayList()
    for (enlace in offerLinks) {
        println "link " + enlace
        lista.add(({ it }.curry(enlace)) as Callable)
    }
    def futures = threadPool.invokeAll(lista)

    futures.each{println "FUTURE " +  it.get()}

This should ensure that the right thing gets passed into the closure, and the closure itself doesn't need to refer to the externally-defined enlace variable directly.
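
As a minimal sketch of why currying helps, the following leaves out the thread pool and only shows that curry() binds the value at the moment the closure is created. The link strings are made-up placeholders standing in for whatever getOfferLinks(parser) returns, and the echo name is introduced here purely for illustration.

```groovy
// Hypothetical link values; the real ones come from getOfferLinks(parser).
def offerLinks = ['/a.htm', '/b.htm', '/c.htm']

// One-argument closure that simply returns its argument.
def echo = { it }

def tasks = []
for (enlace in offerLinks) {
    // curry() evaluates enlace right now and stores that value in the
    // new closure, so later changes to the loop variable cannot affect it.
    tasks << echo.curry(enlace)
}

// Each curried closure returns the value it was created with.
def results = tasks.collect { it.call() }
assert results == offerLinks
```

Because the value travels as a bound argument, the task no longer depends on how the loop variable is scoped.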

This doesn't on its own explain why what you've already tried didn't work, but it might give you a workaround.


Edit: I didn't spot this before, but I now notice that you aren't declaring enlace in that for loop, so it's not a local variable and the closures are (correctly) referring to a single shared variable rather than "closing" over the value in a particular loop iteration. It should work if you use a construction like this instead:

    def tasks = offerLinks.collect { link ->
        println "link " + link
        return ({ link } as Callable)
    }
    def futures = threadPool.invokeAll(tasks)
    futures.each{println "FUTURE " +  it.get()}

where the link variable is local to the collect closure, so the {...} as Callable will close over the correct value. The equivalent in terms of callAsync would be to use

    List futures = offerLinks.collect { link ->
        callAsync { link }
    }
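
To try the idea outside Grails, here is a small standalone sketch. It assumes a plain Groovy script, uses made-up link strings in place of getOfferLinks(parser), and swaps callAsync for a plain java.util.concurrent thread pool; the names current and reader are introduced only for illustration. The first part shows that a closure reads a variable when it runs, not when it is defined; the second part shows the collect form, where each task gets its own link parameter.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Part 1: a closure refers to the variable itself, so it sees later assignments.
def current = 'first'
def reader = { current }
current = 'second'
assert reader() == 'second'

// Part 2: with collect, link is a parameter of the outer closure, so every
// invocation has its own variable for the inner closure to capture.
// Hypothetical link values; the real ones come from getOfferLinks(parser).
def offerLinks = ['/a.htm', '/b.htm', '/c.htm']
def tasks = offerLinks.collect { link -> ({ link } as Callable) }

def threadPool = Executors.newFixedThreadPool(2)
def results
try {
    // invokeAll returns the futures in the same order as the submitted tasks.
    results = threadPool.invokeAll(tasks).collect { it.get() }
} finally {
    threadPool.shutdown()
}
assert results == offerLinks
```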

Other answers

Is this better?

    List offerLinks = getOfferLinks(parser)
    offerLinks.each{println it}

    List futures = new Vector()
    for (def link : offerLinks) {
        futures.add(callAsync {
            return link
        })
    }

    futures.each{println "FUTURE " +  it.get()}

Sounds like the same phenomenon I just experienced; see this answer.

For me, the problem was that "The current thread's MDC is inherited by newly spawned threads". I don't know why that happens, so I can't tell why you might run into the same problem, but maybe it's because you retrieve the links from a service?

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow