Domanda

Il problema

Anche se il codice di cui parlerò qui ho scritto in F #, si basa sul framework .NET 4, non specificamente a seconda qualsiasi particolarità di F # (almeno sembra così!).

Ho alcuni pezzi di dati sul mio disco che devo aggiornare dalla rete, salvando la versione più recente per il disco:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

Il problema è che a loadAndSaveAndUpdate tutti i miei dati, avrei dovuto eseguire la funzione molti volte:

{1 .. 5000} |> loadAndSaveAndUpdate

Ogni passo farebbe

  • un po 'IO disco,
  • qualche scricchiolio di dati,
  • qualche rete IO (con possibilità di un sacco di latenza),
  • più dati scricchiolio,
  • e un po 'IO disco.

Non sarebbe bello avere questo fatto in parallelo, in una certa misura? Purtroppo, nessuno dei miei funzioni di lettura e di analisi sono "asincrone-workflow-ready".

I primi (non molto buono) le soluzioni sono arrivato fino a

Attività

La prima cosa che ho fatto è stato quello di impostare un Task[] e li iniziare:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

Poi mi ha colpito CTRL + ESC solo per vedere il numero di thread che stava usando. 15, 17, ..., 35, ..., 170, ... fino a quando ha ucciso l'applicazione! Qualcosa stava andando male.

Parallela

Ho fatto quasi la stessa cosa, ma utilizzando Parallel.ForEach(...) ei risultati sono stati gli stessi:. Un sacco e un sacco di discussioni

Una soluzione che funziona ... tipo di

Poi ho deciso di iniziare solo discussioni n, Task.WaitAll(of them), poi altri n, fino a quando non c'erano più attività disponibili.

Questo funziona, ma il problema è che quando si ha terminato l'elaborazione, diciamo, compiti n-1, si aspetta, aspetta, aspetta per il maledetto ultima attività che insistono sul blocco a causa di un sacco di latenza di rete. Questo non è buono!

Quindi, come è possibile attaccare questo problema ? Apprezzerei per visualizzare diverse soluzioni, coinvolgendo sia asincrone flussi di lavoro (e in questo caso come adattare le mie funzioni non asincrone), estensioni parallele, modelli paralleli strani, ecc.

Grazie.

È stato utile?

Soluzione

Sei sicuro che le attività individuali stanno completando in modo tempestivo? Credo che sia Parallel.ForEach e la classe Task già utilizzano il ThreadPool .NET. Compiti dovrebbero generalmente essere gli elementi di lavoro di breve durata, nel qual caso il pool di thread sarà solo deporre le uova un piccolo numero di thread reali, ma se i vostri compiti non stanno facendo progressi e ci sono altri compiti in coda allora il numero di thread utilizzati aumenterà continuamente fino a il massimo (che di default è 250 / processore in .NET 2.0 SP1, ma è diverso in diverse versioni del quadro). E 'anche interessante notare che (almeno in .NET 2.0 SP1) nuova creazione filo viene strozzata a 2 nuove discussioni al secondo, in modo da ottenere fino al numero di thread che stai vedendo indica che i compiti non stanno completando in un breve lasso di tempo (in modo che non può essere completamente accurato per la colpa su Parallel.ForEach).

Credo che il suggerimento di Brian di utilizzare flussi di lavoro async è buona, soprattutto se la fonte dei compiti longevi è IO, dal momento che async tornerà tuoi thread per il ThreadPool fino a quando il IO completa. Un'altra opzione è quella di accettare semplicemente che le attività non stanno completando rapidamente e consentire la deposizione delle uova di molti fili (che può essere controllato in una certa misura utilizzando System.Threading.ThreadPool.SetMaxThreads) - a seconda della situazione che non può essere un grosso problema che si sta utilizzando un sacco di fili.

Altri suggerimenti

ParallelOptions.MaxDegreeOfParallelism limiti il numero di operazioni simultanee gestito da chiamate di metodo parallele

Usando 'asincrone di vi permetterà di fare l'I / O-bound lavoro senza fili bruciore durante le varie chiamate di I / O sono 'in mare', in modo che sarebbe il mio primo suggerimento. Dovrebbe essere semplice per convertire il codice per ASYNC, di solito sulla falsariga di

  • avvolgere ogni corpo della funzione in async{...}, aggiungere return se necessario
  • creare versioni asincrone di qualsiasi primitive di I / O che non sono già nella libreria tramite Async.FromBeginEnd
  • chiamate interruttore della forma let r = Foo() a let! r = AsyncFoo()
  • Usa Async.Parallel per convertire gli oggetti asincroni 5000 in un unico asincrono che corre in parallelo

Ci sono vari tutorial per fare questo; Uno di questi webcast è qui .

Si può sempre utilizzare un ThreadPool.

http://msdn.microsoft.com/en -us / library / system.threading.threadpool.aspx

in fondo:

  1. Crea un pool di thread
  2. Imposta il numero massimo di thread
  3. coda tutte le attività utilizzando QueueUserWorkItem(WaitCallback)
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top