Многопоточное с .net httplistener
-
10-10-2019 - |
Вопрос
У меня есть слушатель:
listener = new HttpListener();
listener.Prefixes.Add(@"http://+:8077/");
listener.Start();
listenerThread = new Thread(HandleRequests);
listenerThread.Start();
И я обращаюсь с просьбами:
private void HandleRequests()
{
while (listener.IsListening)
{
var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener);
context.AsyncWaitHandle.WaitOne();
}
}
private void ListenerCallback(IAsyncResult ar)
{
var listener = ar.AsyncState as HttpListener;
var context = listener.EndGetContext(ar);
//do some stuff
}
Я хотел бы написать void Stop()
таким образом, что:
- Он будет блокироваться до тех пор, пока все просьбы в настоящее время обрабатываются в настоящее время (т.е. будет ждать, пока все потоки «сделают кое -что»).
- Хотя он будет ждать уже запускаемых запросов, он не позволит больше запросов (т.е. вернуться в начале
ListenerCallback
). - После этого это позвонит
listener.Stop()
(listener.IsListening
стал ложным).
Как это можно было написать?
РЕДАКТИРОВАТЬ: Что вы думаете об этом решении? Это безопасно?
public void Stop()
{
lock (this)
{
isStopping = true;
}
resetEvent.WaitOne(); //initially set to true
listener.Stop();
}
private void ListenerCallback(IAsyncResult ar)
{
lock (this)
{
if (isStopping)
return;
resetEvent.Reset();
numberOfRequests++;
}
var listener = ar.AsyncState as HttpListener;
var context = listener.EndGetContext(ar);
//do some stuff
lock (this)
{
if (--numberOfRequests == 0)
resetEvent.Set();
}
}
Решение 3
Я консультировался с моим кодом в редактировании части моего вопроса, и я решил принять его с некоторыми модификациями:
public void Stop()
{
lock (locker)
{
isStopping = true;
}
resetEvent.WaitOne(); //initially set to true
listener.Stop();
}
private void ListenerCallback(IAsyncResult ar)
{
lock (locker) //locking on this is a bad idea, but I forget about it before
{
if (isStopping)
return;
resetEvent.Reset();
numberOfRequests++;
}
try
{
var listener = ar.AsyncState as HttpListener;
var context = listener.EndGetContext(ar);
//do some stuff
}
finally //to make sure that bellow code will be executed
{
lock (locker)
{
if (--numberOfRequests == 0)
resetEvent.Set();
}
}
}
Другие советы
Для полноты, вот как бы это выглядело, если бы вы управляли своими собственными темами работников:
class HttpServer : IDisposable
{
private readonly HttpListener _listener;
private readonly Thread _listenerThread;
private readonly Thread[] _workers;
private readonly ManualResetEvent _stop, _ready;
private Queue<HttpListenerContext> _queue;
public HttpServer(int maxThreads)
{
_workers = new Thread[maxThreads];
_queue = new Queue<HttpListenerContext>();
_stop = new ManualResetEvent(false);
_ready = new ManualResetEvent(false);
_listener = new HttpListener();
_listenerThread = new Thread(HandleRequests);
}
public void Start(int port)
{
_listener.Prefixes.Add(String.Format(@"http://+:{0}/", port));
_listener.Start();
_listenerThread.Start();
for (int i = 0; i < _workers.Length; i++)
{
_workers[i] = new Thread(Worker);
_workers[i].Start();
}
}
public void Dispose()
{ Stop(); }
public void Stop()
{
_stop.Set();
_listenerThread.Join();
foreach (Thread worker in _workers)
worker.Join();
_listener.Stop();
}
private void HandleRequests()
{
while (_listener.IsListening)
{
var context = _listener.BeginGetContext(ContextReady, null);
if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle }))
return;
}
}
private void ContextReady(IAsyncResult ar)
{
try
{
lock (_queue)
{
_queue.Enqueue(_listener.EndGetContext(ar));
_ready.Set();
}
}
catch { return; }
}
private void Worker()
{
WaitHandle[] wait = new[] { _ready, _stop };
while (0 == WaitHandle.WaitAny(wait))
{
HttpListenerContext context;
lock (_queue)
{
if (_queue.Count > 0)
context = _queue.Dequeue();
else
{
_ready.Reset();
continue;
}
}
try { ProcessRequest(context); }
catch (Exception e) { Console.Error.WriteLine(e); }
}
}
public event Action<HttpListenerContext> ProcessRequest;
}
Ну, есть несколько способов решить это ... Это простой пример, который использует семафор для отслеживания текущей работы, и сигнал, который повышается, когда все работники закончат. Это должно дать вам основную идею для работы.
Решение ниже не идеально, в идеале мы должны приобрести семафор, прежде чем вызовать BeggeTcontext. Это затрудняет выключение, поэтому я решил использовать этот более упрощенный подход. Если бы я делал это для «настоящего», я бы, вероятно, писал свое собственное управление потоками, а не полагаться на тренас. Это позволило бы более надежного отключения.
В любом случае, вот полный пример:
class TestHttp
{
static void Main()
{
using (HttpServer srvr = new HttpServer(5))
{
srvr.Start(8085);
Console.WriteLine("Press [Enter] to quit.");
Console.ReadLine();
}
}
}
class HttpServer : IDisposable
{
private readonly int _maxThreads;
private readonly HttpListener _listener;
private readonly Thread _listenerThread;
private readonly ManualResetEvent _stop, _idle;
private readonly Semaphore _busy;
public HttpServer(int maxThreads)
{
_maxThreads = maxThreads;
_stop = new ManualResetEvent(false);
_idle = new ManualResetEvent(false);
_busy = new Semaphore(maxThreads, maxThreads);
_listener = new HttpListener();
_listenerThread = new Thread(HandleRequests);
}
public void Start(int port)
{
_listener.Prefixes.Add(String.Format(@"http://+:{0}/", port));
_listener.Start();
_listenerThread.Start();
}
public void Dispose()
{ Stop(); }
public void Stop()
{
_stop.Set();
_listenerThread.Join();
_idle.Reset();
//aquire and release the semaphore to see if anyone is running, wait for idle if they are.
_busy.WaitOne();
if(_maxThreads != 1 + _busy.Release())
_idle.WaitOne();
_listener.Stop();
}
private void HandleRequests()
{
while (_listener.IsListening)
{
var context = _listener.BeginGetContext(ListenerCallback, null);
if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle }))
return;
}
}
private void ListenerCallback(IAsyncResult ar)
{
_busy.WaitOne();
try
{
HttpListenerContext context;
try
{ context = _listener.EndGetContext(ar); }
catch (HttpListenerException)
{ return; }
if (_stop.WaitOne(0, false))
return;
Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl);
context.Response.SendChunked = true;
using (TextWriter tw = new StreamWriter(context.Response.OutputStream))
{
tw.WriteLine("<html><body><h1>Hello World</h1>");
for (int i = 0; i < 5; i++)
{
tw.WriteLine("<p>{0} @ {1}</p>", i, DateTime.Now);
tw.Flush();
Thread.Sleep(1000);
}
tw.WriteLine("</body></html>");
}
}
finally
{
if (_maxThreads == 1 + _busy.Release())
_idle.Set();
}
}
}
Просто вызов слушателя.stop () должен сделать свое дело. Это не прекратит никаких соединений, которые уже установлены, но предотвратит какие -либо новые соединения.
Это использует вводную очередь блокировки для запросов на обслуживание. Это используется как есть. Вы должны получить класс из этого и переопределить ответ.
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using System.Threading;
namespace Service
{
class HttpServer : IDisposable
{
private HttpListener httpListener;
private Thread listenerLoop;
private Thread[] requestProcessors;
private BlockingCollection<HttpListenerContext> messages;
public HttpServer(int threadCount)
{
requestProcessors = new Thread[threadCount];
messages = new BlockingCollection<HttpListenerContext>();
httpListener = new HttpListener();
}
public virtual int Port { get; set; } = 80;
public virtual string[] Prefixes
{
get { return new string[] {string.Format(@"http://+:{0}/", Port )}; }
}
public void Start(int port)
{
listenerLoop = new Thread(HandleRequests);
foreach( string prefix in Prefixes ) httpListener.Prefixes.Add( prefix );
listenerLoop.Start();
for (int i = 0; i < requestProcessors.Length; i++)
{
requestProcessors[i] = StartProcessor(i, messages);
}
}
public void Dispose() { Stop(); }
public void Stop()
{
messages.CompleteAdding();
foreach (Thread worker in requestProcessors) worker.Join();
httpListener.Stop();
listenerLoop.Join();
}
private void HandleRequests()
{
httpListener.Start();
try
{
while (httpListener.IsListening)
{
Console.WriteLine("The Linstener Is Listening!");
HttpListenerContext context = httpListener.GetContext();
messages.Add(context);
Console.WriteLine("The Linstener has added a message!");
}
}
catch(Exception e)
{
Console.WriteLine (e.Message);
}
}
private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages)
{
Thread thread = new Thread(() => Processor(number, messages));
thread.Start();
return thread;
}
private void Processor(int number, BlockingCollection<HttpListenerContext> messages)
{
Console.WriteLine ("Processor {0} started.", number);
try
{
for (;;)
{
Console.WriteLine ("Processor {0} awoken.", number);
HttpListenerContext context = messages.Take();
Console.WriteLine ("Processor {0} dequeued message.", number);
Response (context);
}
} catch { }
Console.WriteLine ("Processor {0} terminated.", number);
}
public virtual void Response(HttpListenerContext context)
{
SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implementd.</body></html>") );
}
public static void SendReply(HttpListenerContext context, StringBuilder responseString )
{
byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString());
context.Response.ContentLength64 = buffer.Length;
System.IO.Stream output = context.Response.OutputStream;
output.Write(buffer, 0, buffer.Length);
output.Close();
}
}
}
Это образец того, как его использовать. Не нужно использовать события или какие -либо блоки блокировки. Блокировка решает все эти проблемы.
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
namespace Service
{
class Server
{
public static void Main (string[] args)
{
HttpServer Service = new QuizzServer (8);
Service.Start (80);
for (bool coninute = true; coninute ;)
{
string input = Console.ReadLine ().ToLower();
switch (input)
{
case "stop":
Console.WriteLine ("Stop command accepted.");
Service.Stop ();
coninute = false;
break;
default:
Console.WriteLine ("Unknown Command: '{0}'.",input);
break;
}
}
}
}
}