using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using AllocsFixes.JSON;

namespace AllocsFixes.NetConnections.Servers.Web.SSE {
    /// <summary>
    /// Base class for a named SSE event source. Subclasses queue events through
    /// <see cref="SendData" />; <see cref="ProcessSendQueue" /> serializes each queued event as
    /// an "event: ... / data: ..." text block and writes it to every registered response stream.
    /// </summary>
    /// <remarks>
    /// NOTE(review): no synchronization is done on the listener list or the counters in this
    /// class. Confirm that <see cref="AddListener" /> and <see cref="ProcessSendQueue" /> are
    /// never executed concurrently by the owning handler.
    /// </remarks>
    public abstract class EventBase {
        /// <summary>Size in bytes of the optional shared UTF-8 encoding buffer (1 MiB).</summary>
        private const int EncodingBufferSize = 1024 * 1024;

        private readonly SseHandler Parent;

        /// <summary>Name of this event source; used as prefix in log messages.</summary>
        public readonly string Name;

        /// <summary>Shared output buffer, or null when every message gets a freshly allocated array.</summary>
        private readonly byte[] encodingBuffer;

        private readonly StringBuilder stringBuilder = new StringBuilder ();

        /// <summary>Responses of all clients currently subscribed to this event source.</summary>
        private readonly List<HttpListenerResponse> openStreams = new List<HttpListenerResponse> ();

        private readonly global::BlockingQueue<(string _eventName, object _data)> sendQueue =
            new global::BlockingQueue<(string _eventName, object _data)> ();

        // Bookkeeping that is only used for the log output.
        private int currentlyOpen;
        private int totalOpened;
        private int totalClosed;

        /// <summary>
        /// Creates the event source.
        /// </summary>
        /// <param name="_parent">Handler that gets signalled whenever data has been queued.</param>
        /// <param name="_reuseEncodingBuffer">
        /// If true a single buffer of <see cref="EncodingBufferSize" /> bytes is allocated once and reused
        /// for every message; messages that do not fit are dropped. If false each message is encoded
        /// into a newly allocated array of exactly the required size.
        /// </param>
        /// <param name="_name">Name of the event source; defaults to the runtime type name.</param>
        protected EventBase (SseHandler _parent, bool _reuseEncodingBuffer = true, string _name = null) {
            Name = _name ?? GetType ().Name;
            Parent = _parent;

            if (_reuseEncodingBuffer) {
                encodingBuffer = new byte[EncodingBufferSize];
            }
        }

        /// <summary>
        /// Registers a client response; all events sent from now on are also written to it.
        /// </summary>
        /// <param name="_resp">Response whose output stream receives the events.</param>
        public virtual void AddListener (HttpListenerResponse _resp) {
            totalOpened++;
            currentlyOpen++;

            openStreams.Add (_resp);
        }

        /// <summary>
        /// Queues an event for delivery and signals the parent handler that data is pending.
        /// </summary>
        /// <param name="_eventName">Value written to the "event:" line.</param>
        /// <param name="_data">
        /// Payload; has to be a <see cref="string" /> or a <see cref="JSONNode" />, anything else is
        /// rejected (and logged) when the queue is processed.
        /// </param>
        protected void SendData (string _eventName, object _data) {
            sendQueue.Enqueue ((_eventName, _data));
            Parent.SignalSendQueue ();
        }

        /// <summary>
        /// Drains the send queue: every queued event is serialized, UTF-8 encoded and written to all
        /// open streams. Events that can not be serialized or encoded are logged and skipped without
        /// affecting the events queued behind them. Streams that can not be written to are removed
        /// and closed.
        /// </summary>
        public void ProcessSendQueue () {
            while (sendQueue.HasData ()) {
                (string eventName, object data) = sendQueue.Dequeue ();

                string output = BuildMessage (eventName, data);
                if (output == null) {
                    continue;
                }

                // Skip only the offending event; returning here would leave all remaining
                // queued events stranded until the queue gets signalled again.
                if (!TryEncode (output, out byte[] buf, out int bytesToSend)) {
                    continue;
                }

                Broadcast (buf, bytesToSend);
            }
        }

        /// <summary>
        /// Returns 0 in this base implementation; subclasses may override.
        /// NOTE(review): the meaning of the level is defined by the callers, which are not part of this file.
        /// </summary>
        public virtual int DefaultPermissionLevel () {
            return 0;
        }

        /// <summary>Builds the text block for a single event, or returns null if the payload type is unsupported.</summary>
        private string BuildMessage (string _eventName, object _data) {
            // Always start from an empty builder: an earlier message may have been aborted
            // (e.g. by an exception during JSON serialization) after text was already appended.
            stringBuilder.Clear ();

            stringBuilder.Append ("event: ");
            stringBuilder.AppendLine (_eventName);
            stringBuilder.Append ("data: ");

            if (_data is string dataString) {
                stringBuilder.AppendLine (dataString);
            } else if (_data is JSONNode dataJson) {
                dataJson.ToString (stringBuilder);
                stringBuilder.AppendLine ("");
            } else {
                Log.Error ($"SSE ({Name}): Data is neither string nor JSON.");
                // Drop the already written header so it does not get prepended to the next event.
                stringBuilder.Clear ();
                return null;
            }

            // The empty line terminates the event.
            stringBuilder.AppendLine ("");

            string output = stringBuilder.ToString ();
            stringBuilder.Clear ();
            return output;
        }

        /// <summary>
        /// UTF-8 encodes a message, either into the shared buffer or into a new array.
        /// Returns false (after logging) if the message does not fit into the shared buffer.
        /// </summary>
        private bool TryEncode (string _output, out byte[] _buf, out int _bytesToSend) {
            if (encodingBuffer == null) {
                _buf = Encoding.UTF8.GetBytes (_output);
                _bytesToSend = _buf.Length;
                return true;
            }

            _buf = encodingBuffer;
            try {
                _bytesToSend = Encoding.UTF8.GetBytes (_output, 0, _output.Length, _buf, 0);
                return true;
            } catch (ArgumentException e) {
                Log.Error ($"SSE ({Name}): Exception while encoding data for output, most likely exceeding buffer size:");
                Log.Exception (e);
                _bytesToSend = 0;
                return false;
            }
        }

        /// <summary>Writes the encoded message to every open stream, dropping the streams that fail.</summary>
        private void Broadcast (byte[] _buf, int _bytesToSend) {
            // Walk backwards so that removing entry i does not shift the entries still to be visited.
            for (int i = openStreams.Count - 1; i >= 0; i--) {
                HttpListenerResponse resp = openStreams [i];
                bool writable = false;

                // Only the stream access is guarded; removal and closing happen outside of the
                // try block so that a failure while closing can never remove a second entry.
                try {
                    writable = resp.OutputStream.CanWrite;
                    if (writable) {
                        resp.OutputStream.Write (_buf, 0, _bytesToSend);
                        resp.OutputStream.Flush ();
                    }
                } catch (IOException e) {
                    RemoveStream (i);

                    if (e.InnerException is SocketException se) {
                        // A connection aborted by the client is the regular way for a stream to end, so it is not logged.
                        if (se.SocketErrorCode != SocketError.ConnectionAborted) {
                            Log.Error ($"SSE ({Name}): SocketError ({se.SocketErrorCode}) while trying to write: (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed}");
                        }
                    } else {
                        Log.Error ($"SSE ({Name}): IOException while trying to write: (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed}");
                        Log.Exception (e);
                    }

                    CloseStream (resp);
                    continue;
                } catch (Exception e) {
                    RemoveStream (i);

                    Log.Error ($"SSE ({Name}): Exception while trying to write: (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed}");
                    Log.Exception (e);

                    CloseStream (resp);
                    continue;
                }

                if (!writable) {
                    RemoveStream (i);
                    Log.Out ($"SSE ({Name}): Can not write to endpoint, closing. (Left open: {currentlyOpen}, total opened: {totalOpened}, closed: {totalClosed}");
                    CloseStream (resp);
                }
            }
        }

        /// <summary>Removes the stream at the given index from the listener list and updates the counters.</summary>
        private void RemoveStream (int _index) {
            currentlyOpen--;
            totalClosed++;
            openStreams.RemoveAt (_index);
        }

        /// <summary>Closes a response; a failure is logged so that the remaining streams are still served.</summary>
        private void CloseStream (HttpListenerResponse _resp) {
            try {
                _resp.Close ();
            } catch (Exception e) {
                Log.Error ($"SSE ({Name}): Exception while closing endpoint:");
                Log.Exception (e);
            }
        }
    }
}