首页编程parallel.foreach(我怎样才能等到Parallel.ForEach完成)

parallel.foreach(我怎样才能等到Parallel.ForEach完成)

编程之家2023-11-0768次浏览

老铁们,大家好,相信还有很多朋友对于parallel.foreach和我怎样才能等到Parallel.ForEach完成的相关问题不太懂,没关系,今天就由我来为大家分享分享parallel.foreach以及我怎样才能等到Parallel.ForEach完成的问题,文章篇幅可能偏长,希望可以帮助到大家,下面一起来看看吧!

parallel.foreach(我怎样才能等到Parallel.ForEach完成)

我怎样才能等到Parallel.ForEach完成

当需要为多核机器进行优化的时候,最好先检查下你的程序是否有处理能够分割开来进行并行处理。(例如,有一个巨大的数据集合,其中的元素需要一个一个进行彼此独立的耗时计算)。

.net framework 4中提供了 Parallel.ForEach和 PLINQ来帮助我们进行并行处理,本文探讨这两者的差别及适用的场景。

Parallel.ForEach

Parallel.ForEach是 foreach的多线程实现,他们都能对 IEnumerable<T>类型对象进行遍历,Parallel.ForEach的特殊之处在于它使用多线程来执行循环体内的代码段。

Parallel.ForEach最常用的形式如下:

public static ParallelLoopResult ForEach<TSource>(

parallel.foreach(我怎样才能等到Parallel.ForEach完成)

IEnumerable<TSource> source,

Action<TSource> body)

PLINQ

PLINQ也是一种对数据进行并行处理的编程模型,它通过 LINQ的语法来实现类似 Parallel.ForEach的多线程并行处理。

场景一:简单数据之独立操作的并行处理(使用 Parallel.ForEach)

示例代码:

parallel.foreach(我怎样才能等到Parallel.ForEach完成)

public static void IndependentAction(IEnumerable<T> source, Action<T> action)

{

Parallel.ForEach(source, element=> action(element));

}

理由:

虽然 PLINQ也提供了一个类似的 ForAll接口,但它对于简单的独立操作太重量化了。

2.使用 Parallel.ForEach你还能够设定

ParallelOptions.MaxDegreeOfParalelism参数(指定最多需要多少个线程),这样当 ThreadPool

资源匮乏(甚至当可用线程数<MaxDegreeOfParalelism)的时候, Parallel.ForEach

依然能够顺利运行,并且当后续有更多可用线程出现时,Parallel.ForEach也能及时地利用这些线程。PLINQ

只能通过WithDegreeOfParallelism方法来要求固定的线程数,即:要求了几个就是几个,不会多也不会少。

场景二:顺序数据之并行处理(使用 PLINQ来维持数据顺序)

当输出的数据序列需要保持原始的顺序时采用 PLINQ的 AsOrdered方法非常简单高效。

示例代码:

public static void GrayscaleTransformation(IEnumerable<Frame> Movie)

{

var ProcessedMovie=

Movie

.AsParallel()

.AsOrdered()

.Select(frame=> ConvertToGrayscale(frame));

foreach(var grayscaleFrame in ProcessedMovie)

{

// Movie frames will be evaluated lazily

}

}

理由:

Parallel.ForEach实现起来需要绕一些弯路,首先你需要使用以下的重载在方法:

public static ParallelLoopResult ForEach<TSource>(

IEnumerable<TSource> source,

Action<TSource, ParallelLoopState, Int64> body)

这个重载的 Action多包含了 index参数,这样你在输出的时候就能利用这个值来维持原先的序列顺序。请看下面的例子: public static double [] PairwiseMultiply(double[] v1, double[] v2)

{

var length= Math.Min(v1.Length, v2.Lenth);

double[] result= new double[length];

Parallel.ForEach(v1,(element, loopstate, elementIndex)=>

result[elementIndex]= element* v2[elementIndex]);

return result;

}

你可能已经意识到这里有个明显的问题:我们使用了固定长度的数组。如果传入的是 IEnumerable那么你有4个解决方案:

(1)调用 IEnumerable.Count()来获取数据长度,然后用这个值实例化一个固定长度的数组,然后使用上例的代码。

(2) The second option would be to materialize the original

collection before using it; in the event that your input data set is

prohibitively large, neither of the first two options will be

feasible.(没看懂贴原文)

(3)第三种方式是采用返回一个哈希集合的方式,这种方式下通常需要至少2倍于传入数据的内存,所以处理大数据时请慎用。

(4)自己实现排序算法(保证传入数据与传出数据经过排序后次序一致)

2.相比之下 PLINQ的 AsOrdered方法如此简单,而且该方法能处理流式的数据,从而允许传入数据是延迟实现的(lazy materialized)

场景三:流数据之并行处理(使用 PLINQ)

PLINQ能输出流数据,这个特性在一下场合非常有用:

结果集不需要是一个完整的处理完毕的数组,即:任何时间点下内存中仅保持数组中的部分信息

2.你能够在一个单线程上遍历输出结果(就好像他们已经存在/处理完了)

示例:

public static void AnalyzeStocks(IEnumerable<Stock> Stocks)

{

var StockRiskPortfolio=

Stocks

.AsParallel()

.AsOrdered()

.Select(stock=> new{ Stock= stock, Risk= ComputeRisk(stock)})

.Where(stockRisk=> ExpensiveRiskAnalysis(stockRisk.Risk));

foreach(var stockRisk in StockRiskPortfolio)

{

SomeStockComputation(stockRisk.Risk);

// StockRiskPortfolio will be a stream of results

}

}

这里使用一个单线程的 foreach来对 PLINQ的输出进行后续处理,通常情况下 foreach不需要等待 PLINQ处理完所有数据就能开始运作。

PLINQ也允许指定输出缓存的方式,具体可参照 PLINQ的 WithMergeOptions方法,及 ParallelMergeOptions枚举

场景四:处理两个集合(使用 PLINQ)

PLINQ的 Zip方法提供了同时遍历两个集合并进行结合元算的方法,并且它可以与其他查询处理操作结合,实现非常复杂的机能。

示例:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)

{

return

a

.AsParallel()

.AsOrdered()

.Select(element=> ExpensiveComputation(element))

.Zip(

b

.AsParallel()

.AsOrdered()

.Select(element=> DifferentExpensiveComputation(element)),

(a_element, b_element)=> Combine(a_element,b_element));

}

示例中的两个数据源能够并行处理,当双方都有一个可用元素时提供给 Zip进行后续处理(Combine)。

Parallel.ForEach也能实现类似的 Zip处理:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)

{

var numElements= Math.Min(a.Count(), b.Count());

var result= new T[numElements];

Parallel.ForEach(a,

(element, loopstate, index)=>

{

var a_element= ExpensiveComputation(element);

var b_element= DifferentExpensiveComputation(b.ElementAt(index));

result[index]= Combine(a_element, b_element);

});

return result;

}

当然使用 Parallel.ForEach后你就得自己确认是否要维持原始序列,并且要注意数组越界访问的问题。

场景五:线程局部变量

Parallel.ForEach提供了一个线程局部变量的重载,定义如下:

public static ParallelLoopResult ForEach<TSource, TLocal>(

IEnumerable<TSource> source,

Func<TLocal> localInit,

Func<TSource, ParallelLoopState, TLocal,TLocal> body,

Action<TLocal> localFinally)

使用的示例: public static List<R> Filtering<T,R>(IEnumerable<T> source)

{

var results= new List<R>();

using(SemaphoreSlim sem= new SemaphoreSlim(1))

{

Parallel.ForEach(source,

()=> new List<R>(),

(element, loopstate, localStorage)=>

{

bool filter= filterFunction(element);

if(filter)

localStorage.Add(element);

return localStorage;

},

(finalStorage)=>

{

lock(myLock)

{

results.AddRange(finalStorage)

};

});

}

return results;

}

线程局部变量有什么优势呢?请看下面的例子(一个网页抓取程序): public static void UnsafeDownloadUrls()

{

WebClient webclient= new WebClient();

Parallel.ForEach(urls,

(url,loopstate,index)=>

{

webclient.DownloadFile(url, filenames[index]+".dat");

Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);

});

}

通常第一版代码是这么写的,但是运行时会报错“System.NotSupportedException-> WebClient

does not support concurrent I/O operations.”。这是因为多个线程无法同时访问同一个 WebClient

对象。所以我们会把 WebClient对象定义到线程中来: public static void BAD_DownloadUrls()

{

Parallel.ForEach(urls,

(url,loopstate,index)=>

{

WebClient webclient= new WebClient();

webclient.DownloadFile(url, filenames[index]+".dat");

Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);

});

}

修改之后依然有问题,因为你的机器不是服务器,大量实例化的 WebClient迅速达到你机器允许的虚拟连接上限数。线程局部变量可以解决这个问题: public static void downloadUrlsSafe()

{

Parallel.ForEach(urls,

()=> new WebClient(),

(url, loopstate, index, webclient)=>

{

webclient.DownloadFile(url, filenames[index]+".dat");

Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);

return webclient;

},

(webclient)=>{});

}

这样的写法保证了我们能获得足够的 WebClient实例,同时这些 WebClient实例彼此隔离仅仅属于各自关联的线程。

虽然 PLINQ提供了 ThreadLocal<T>对象来实现类似的功能:

public static void downloadUrl()

{

var webclient= new ThreadLocal<WebClient>(()=> new WebClient());

var res=

urls

.AsParallel()

.ForAll(

url=>

{

webclient.Value.DownloadFile(url, host[url]+".dat"));

Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);

});

}

但是请注意:ThreadLocal<T>相对而言开销更大!

场景五:退出操作(使用 Parallel.ForEach)

Parallel.ForEach有个重载声明如下,其中包含一个 ParallelLoopState对象:

public static ParallelLoopResult ForEach<TSource>(

IEnumerable<TSource> source,

Action<TSource, ParallelLoopState> body)

ParallelLoopState.Stop()提供了退出循环的方法,这种方式要比其他两种方法更快。这个方法通知循环不要再启动执行新的迭代,并尽可能快的推出循环。

ParallelLoopState.IsStopped属性可用来判定其他迭代是否调用了 Stop方法。

示例:

public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>

{

var matchFound= false;

Parallel.ForEach(TSpace,

(curValue, loopstate)=>

{

if(curValue.Equals(match))

{

matchFound= true;

loopstate.Stop();

}

});

return matchFound;

}

ParallelLoopState.Break()通知循环继续执行本元素前的迭代,但不执行本元素之后的迭代。最前调用 Break

的起作用,并被记录到 ParallelLoopState.LowestBreakIteration

属性中。这种处理方式通常被应用在一个有序的查找处理中,比如你有一个排序过的数组,你想在其中查找匹配元素的最小

index,那么可以使用以下的代码:

public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>

{

var loopResult= Parallel.ForEach(source,

(curValue, loopState, curIndex)=>

{

if(curValue.Equals(match))

{

loopState.Break();

}

});

var matchedIndex= loopResult.LowestBreakIteration;

return matchedIndex.HasValue? matchedIndex:-1;

}

何时使用 Parallel.ForEach,何时使用 PLINQ

当需要为多核机器进行优化的时候,最好先检查下你的程序是否有处理能够分割开来进行并行处理。(例如,有一个巨大的数据集合,其中的元素需要一个一个进行彼此独立的耗时计算)。

.net framework 4中提供了 Parallel.ForEach和 PLINQ来帮助我们进行并行处理,本文探讨这两者的差别及适用的场景。

Parallel.ForEach

Parallel.ForEach是 foreach的多线程实现,他们都能对 IEnumerable<T>类型对象进行遍历,Parallel.ForEach的特殊之处在于它使用多线程来执行循环体内的代码段。

Parallel.ForEach最常用的形式如下:

public static ParallelLoopResult ForEach<TSource>(

IEnumerable<TSource> source,

Action<TSource> body)

PLINQ

PLINQ也是一种对数据进行并行处理的编程模型,它通过 LINQ的语法来实现类似 Parallel.ForEach的多线程并行处理。

场景一:简单数据之独立操作的并行处理(使用 Parallel.ForEach)

示例代码:

public static void IndependentAction(IEnumerable<T> source, Action<T> action)

{

Parallel.ForEach(source, element=> action(element));

}

理由:

1.虽然 PLINQ也提供了一个类似的 ForAll接口,但它对于简单的独立操作太重量化了。

2.使用 Parallel.ForEach你还能够设定

ParallelOptions.MaxDegreeOfParalelism参数(指定最多需要多少个线程),这样当 ThreadPool

资源匮乏(甚至当可用线程数<MaxDegreeOfParalelism)的时候, Parallel.ForEach

依然能够顺利运行,并且当后续有更多可用线程出现时,Parallel.ForEach也能及时地利用这些线程。PLINQ

只能通过WithDegreeOfParallelism方法来要求固定的线程数,即:要求了几个就是几个,不会多也不会少。

场景二:顺序数据之并行处理(使用 PLINQ来维持数据顺序)

当输出的数据序列需要保持原始的顺序时采用 PLINQ的 AsOrdered方法非常简单高效。

示例代码:

public static void GrayscaleTransformation(IEnumerable<Frame> Movie)

{

var ProcessedMovie=

Movie

.AsParallel()

.AsOrdered()

.Select(frame=> ConvertToGrayscale(frame));

foreach(var grayscaleFrame in ProcessedMovie)

{

// Movie frames will be evaluated lazily

}

}

理由:

1. Parallel.ForEach实现起来需要绕一些弯路,首先你需要使用以下的重载在方法:

public static ParallelLoopResult ForEach<TSource>(

IEnumerable<TSource> source,

Action<TSource, ParallelLoopState, Int64> body)

这个重载的 Action多包含了 index参数,这样你在输出的时候就能利用这个值来维持原先的序列顺序。请看下面的例子: public static double [] PairwiseMultiply(double[] v1, double[] v2)

{

var length= Math.Min(v1.Length, v2.Lenth);

double[] result= new double[length];

Parallel.ForEach(v1,(element, loopstate, elementIndex)=>

result[elementIndex]= element* v2[elementIndex]);

return result;

}

你可能已经意识到这里有个明显的问题:我们使用了固定长度的数组。如果传入的是 IEnumerable那么你有4个解决方案:

(1)调用 IEnumerable.Count()来获取数据长度,然后用这个值实例化一个固定长度的数组,然后使用上例的代码。

(2) The second option would be to materialize the original

collection before using it; in the event that your input data set is

prohibitively large, neither of the first two options will be

feasible.(没看懂贴原文)

(3)第三种方式是采用返回一个哈希集合的方式,这种方式下通常需要至少2倍于传入数据的内存,所以处理大数据时请慎用。

(4)自己实现排序算法(保证传入数据与传出数据经过排序后次序一致)

2.相比之下 PLINQ的 AsOrdered方法如此简单,而且该方法能处理流式的数据,从而允许传入数据是延迟实现的(lazy materialized)

场景三:流数据之并行处理(使用 PLINQ)

PLINQ能输出流数据,这个特性在一下场合非常有用:

1.结果集不需要是一个完整的处理完毕的数组,即:任何时间点下内存中仅保持数组中的部分信息

2.你能够在一个单线程上遍历输出结果(就好像他们已经存在/处理完了)

示例:

public static void AnalyzeStocks(IEnumerable<Stock> Stocks)

{

var StockRiskPortfolio=

Stocks

.AsParallel()

.AsOrdered()

.Select(stock=> new{ Stock= stock, Risk= ComputeRisk(stock)})

.Where(stockRisk=> ExpensiveRiskAnalysis(stockRisk.Risk));

foreach(var stockRisk in StockRiskPortfolio)

{

SomeStockComputation(stockRisk.Risk);

// StockRiskPortfolio will be a stream of results

}

}

这里使用一个单线程的 foreach来对 PLINQ的输出进行后续处理,通常情况下 foreach不需要等待 PLINQ处理完所有数据就能开始运作。

PLINQ也允许指定输出缓存的方式,具体可参照 PLINQ的 WithMergeOptions方法,及 ParallelMergeOptions枚举

场景四:处理两个集合(使用 PLINQ)

PLINQ的 Zip方法提供了同时遍历两个集合并进行结合元算的方法,并且它可以与其他查询处理操作结合,实现非常复杂的机能。

示例:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)

{

return

a

.AsParallel()

.AsOrdered()

.Select(element=> ExpensiveComputation(element))

.Zip(

b

.AsParallel()

.AsOrdered()

.Select(element=> DifferentExpensiveComputation(element)),

(a_element, b_element)=> Combine(a_element,b_element));

}

示例中的两个数据源能够并行处理,当双方都有一个可用元素时提供给 Zip进行后续处理(Combine)。

Parallel.ForEach也能实现类似的 Zip处理:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)

{

var numElements= Math.Min(a.Count(), b.Count());

var result= new T[numElements];

Parallel.ForEach(a,

(element, loopstate, index)=>

{

var a_element= ExpensiveComputation(element);

var b_element= DifferentExpensiveComputation(b.ElementAt(index));

result[index]= Combine(a_element, b_element);

});

return result;

}

当然使用 Parallel.ForEach后你就得自己确认是否要维持原始序列,并且要注意数组越界访问的问题。

场景五:线程局部变量

Parallel.ForEach提供了一个线程局部变量的重载,定义如下:

public static ParallelLoopResult ForEach<TSource, TLocal>(

IEnumerable<TSource> source,

Func<TLocal> localInit,

Func<TSource, ParallelLoopState, TLocal,TLocal> body,

Action<TLocal> localFinally)

使用的示例: public static List<R> Filtering<T,R>(IEnumerable<T> source)

{

var results= new List<R>();

using(SemaphoreSlim sem= new SemaphoreSlim(1))

{

Parallel.ForEach(source,

()=> new List<R>(),

(element, loopstate, localStorage)=>

{

bool filter= filterFunction(element);

if(filter)

localStorage.Add(element);

return localStorage;

},

(finalStorage)=>

{

lock(myLock)

{

results.AddRange(finalStorage)

};

});

}

return results;

}

线程局部变量有什么优势呢?请看下面的例子(一个网页抓取程序): public static void UnsafeDownloadUrls()

{

WebClient webclient= new WebClient();

Parallel.ForEach(urls,

(url,loopstate,index)=>

{

webclient.DownloadFile(url, filenames[index]+".dat");

Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);

});

}

通常第一版代码是这么写的,但是运行时会报错“System.NotSupportedException-> WebClient

does not support concurrent I/O operations.”。这是因为多个线程无法同时访问同一个 WebClient

对象。所以我们会把 WebClient对象定义到线程中来: public static void BAD_DownloadUrls()

{

Parallel.ForEach(urls,

(url,loopstate,index)=>

{

WebClient webclient= new WebClient();

webclient.DownloadFile(url, filenames[index]+".dat");

Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);

});

}

修改之后依然有问题,因为你的机器不是服务器,大量实例化的 WebClient迅速达到你机器允许的虚拟连接上限数。线程局部变量可以解决这个问题: public static void downloadUrlsSafe()

{

Parallel.ForEach(urls,

()=> new WebClient(),

(url, loopstate, index, webclient)=>

{

webclient.DownloadFile(url, filenames[index]+".dat");

Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);

return webclient;

},

(webclient)=>{});

}

这样的写法保证了我们能获得足够的 WebClient实例,同时这些 WebClient实例彼此隔离仅仅属于各自关联的线程。

虽然 PLINQ提供了 ThreadLocal<T>对象来实现类似的功能:

public static void downloadUrl()

{

var webclient= new ThreadLocal<WebClient>(()=> new WebClient());

var res=

urls

.AsParallel()

.ForAll(

url=>

{

webclient.Value.DownloadFile(url, host[url]+".dat"));

Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);

});

}

但是请注意:ThreadLocal<T>相对而言开销更大!

场景五:退出操作(使用 Parallel.ForEach)

Parallel.ForEach有个重载声明如下,其中包含一个 ParallelLoopState对象:

public static ParallelLoopResult ForEach<TSource>(

IEnumerable<TSource> source,

Action<TSource, ParallelLoopState> body)

ParallelLoopState.Stop()提供了退出循环的方法,这种方式要比其他两种方法更快。这个方法通知循环不要再启动执行新的迭代,并尽可能快的推出循环。

ParallelLoopState.IsStopped属性可用来判定其他迭代是否调用了 Stop方法。

示例:

public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>

{

var matchFound= false;

Parallel.ForEach(TSpace,

(curValue, loopstate)=>

{

if(curValue.Equals(match))

{

matchFound= true;

loopstate.Stop();

}

});

return matchFound;

}

ParallelLoopState.Break()通知循环继续执行本元素前的迭代,但不执行本元素之后的迭代。最前调用 Break

的起作用,并被记录到 ParallelLoopState.LowestBreakIteration

属性中。这种处理方式通常被应用在一个有序的查找处理中,比如你有一个排序过的数组,你想在其中查找匹配元素的最小

index,那么可以使用以下的代码:

public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>

{

var loopResult= Parallel.ForEach(source,

(curValue, loopState, curIndex)=>

{

if(curValue.Equals(match))

{

loopState.Break();

}

});

var matchedIndex= loopResult.LowestBreakIteration;

return matchedIndex.HasValue? matchedIndex:-1;

}

tpl和ppl区别

TPL和PPL都是与计算机编程中的并行编程相关的概念,但它们具有不同的含义。

TPL代表任务并行库(Task Parallel Library),是.NET Framework中的一个组件,用于在多核处理器上执行并行计算。TPL使得并行编程变得更加容易,通过提供一些高级抽象来简化并行编程任务,例如任务、数据流和并发集合等。TPL的目标是提高并行应用程序的性能、可伸缩性和可维护性。

而PPL则代表并行模式库(Parallel Patterns Library),是一个C++库,可用于编写高效的并行应用程序。PPL提供了一些高级抽象来简化并行编程任务,例如并行算法、并行循环、并行foreach、并行STL和并行数据流等。PPL的目标是提高并行应用程序的性能、可伸缩性和可维护性。

因此,TPL和PPL都是用于简化并行编程的库,但它们针对不同的编程语言和环境,且提供不同的抽象和功能。

文章到此结束,希望我们对于parallel.foreach和我怎样才能等到Parallel.ForEach完成的问题能够给您带来一些启发和解决方案。如果您需要更多信息或者有其他问题,请随时联系我们。

httpclient.jar,httpclient在哪个jar包shopex系统?ECshop与shopex的区别