模式的限制同时异步调用的数量
-
25-09-2019 - |
题
我需要从外部系统中检索多个对象。外部系统支持多个并发的请求(即线程),但有可能淹没外部系统 - 因此,我希望能够异步检索多个对象,但是我希望能够节流的同时异步请求的数量。即我需要检索100个项目,但不希望的检索的超过25人在一次。当25所完成的每一个请求,我想引发另一检索,一旦它们都是完整的我要回所有的结果中,他们被要求的顺序(即没有点返回的结果,直到返回整个呼叫)。是否有这样的事情任何推荐的模式?
将这样的事情是适当的(伪代码,很明显)?
private List<externalSystemObjects> returnedObjects = new List<externalSystemObjects>;
public List<externalSystemObjects> GetObjects(List<string> ids)
{
int callCount = 0;
int maxCallCount = 25;
WaitHandle[] handles;
foreach(id in itemIds to get)
{
if(callCount < maxCallCount)
{
WaitHandle handle = executeCall(id, callback);
addWaitHandleToWaitArray(handle)
}
else
{
int returnedCallId = WaitHandle.WaitAny(handles);
removeReturnedCallFromWaitHandles(handles);
}
}
WaitHandle.WaitAll(handles);
return returnedObjects;
}
public void callback(object result)
{
returnedObjects.Add(result);
}
解决方案
考虑项工艺作为其25个处理线程出列任务队列的列表,处理任务中,添加结果,然后重复,直到队列是空的:
class Program
{
class State
{
public EventWaitHandle Done;
public int runningThreads;
public List<string> itemsToProcess;
public List<string> itemsResponses;
}
static void Main(string[] args)
{
State state = new State();
state.itemsResponses = new List<string>(1000);
state.itemsToProcess = new List<string>(1000);
for (int i = 0; i < 1000; ++i)
{
state.itemsToProcess.Add(String.Format("Request {0}", i));
}
state.runningThreads = 25;
state.Done = new AutoResetEvent(false);
for (int i = 0; i < 25; ++i)
{
Thread t =new Thread(new ParameterizedThreadStart(Processing));
t.Start(state);
}
state.Done.WaitOne();
foreach (string s in state.itemsResponses)
{
Console.WriteLine("{0}", s);
}
}
private static void Processing(object param)
{
Debug.Assert(param is State);
State state = param as State;
try
{
do
{
string item = null;
lock (state.itemsToProcess)
{
if (state.itemsToProcess.Count > 0)
{
item = state.itemsToProcess[0];
state.itemsToProcess.RemoveAt(0);
}
}
if (null == item)
{
break;
}
// Simulate some processing
Thread.Sleep(10);
string response = String.Format("Response for {0} on thread: {1}", item, Thread.CurrentThread.ManagedThreadId);
lock (state.itemsResponses)
{
state.itemsResponses.Add(response);
}
} while (true);
}
catch (Exception)
{
// ...
}
finally
{
int threadsLeft = Interlocked.Decrement(ref state.runningThreads);
if (0 == threadsLeft)
{
state.Done.Set();
}
}
}
}
您可以使用异步回调做同样的,也没有必要使用线程。
其他提示
有一些队列状结构以容纳待处理的请求是一个很常见的图案。在Web应用程序,其中有可能是处理你看到具有较大队列处理变化的早期部分一个“漏斗”式的方法几层。也可能有某种优先次序施加到队列,较高优先级请求被混洗到队列的顶部。
在解决方案中要考虑的一个重要的事情是,如果请求到达率是比你的处理速度更高(这可能是由于拒绝服务攻击,或者只是今天处理的某些部分是异常缓慢),那么您队列会增加无界。你需要有一些政策,例如,当队列深度超过一定值立即拒绝新的请求。
不隶属于 StackOverflow