Changeset 487 for TFP-WebServer/WebServer/src/SSE
- Timestamp:
- Jun 17, 2024, 5:25:43 PM (7 months ago)
- Location:
- TFP-WebServer/WebServer/src/SSE
- Files:
-
- 1 added
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
TFP-WebServer/WebServer/src/SSE/AbsEvent.cs
r409 r487 1 1 using System; 2 2 using System.Collections.Generic; 3 using System.IO;4 using System.Net.Sockets;5 3 using System.Text; 6 4 using Webserver.UrlHandlers; 7 using HttpListenerResponse = SpaceWizards.HttpListener.HttpListenerResponse;8 5 9 6 namespace Webserver.SSE { 10 7 public abstract class AbsEvent { 11 8 private const int encodingBufferSize = 1024 * 1024; 12 private const int keepAliveIntervalSeconds = 10;13 private static readonly byte[] keepAliveData = Encoding.UTF8.GetBytes (": KeepAlive\n\n");14 9 15 10 private readonly SseHandler parent; … … 18 13 private readonly byte[] encodingBuffer; 19 14 private readonly StringBuilder stringBuilder = new StringBuilder (); 20 private DateTime lastMessageSent;21 15 22 private readonly List< HttpListenerResponse> openStreams = new List<HttpListenerResponse> ();16 private readonly List<SseClient> openClients = new List<SseClient> (); 23 17 24 18 private readonly BlockingQueue<(string _eventName, string _data)> sendQueue = … … 37 31 } 38 32 39 public void AddListener ( HttpListenerResponse _resp) {33 public void AddListener (SseClient _client) { 40 34 totalOpened++; 41 35 currentlyOpen++; 42 36 43 open Streams.Add (_resp);37 openClients.Add (_client); 44 38 } 45 39 … … 50 44 51 45 public void ProcessSendQueue () { 52 bool dataSent = false;53 54 46 while (sendQueue.HasData ()) { 55 47 (string eventName, string data) = sendQueue.Dequeue (); … … 81 73 } 82 74 83 dataSent = true;84 85 75 sendBufToListeners (buf, bytesToSend); 86 }87 88 DateTime now = DateTime.Now;89 if (dataSent) {90 lastMessageSent = now;91 } else if ((now - lastMessageSent).TotalSeconds >= keepAliveIntervalSeconds) {92 sendBufToListeners (keepAliveData, keepAliveData.Length);93 lastMessageSent = now;94 76 } 95 77 } 96 78 97 79 private void sendBufToListeners (byte[] _bytes, int _bytesToSend) { 98 for (int i = openStreams.Count - 1; i >= 0; i--) { 99 HttpListenerResponse resp = openStreams [i]; 100 try { 101 if (resp.OutputStream.CanWrite) { 102 resp.OutputStream.Write (_bytes, 0, _bytesToSend); 103 resp.OutputStream.Flush (); 104 } else { 105 currentlyOpen--; 106 totalClosed++; 80 for (int i = openClients.Count - 1; i >= 0; i--) { 81 ESseClientWriteResult writeResult = openClients [i].Write (_bytes, _bytesToSend); 82 if (writeResult == ESseClientWriteResult.Ok) { 83 continue; 84 } 107 85 108 logError ("Can not write to endpoint, closing", true); 109 openStreams.RemoveAt (i); 110 resp.Close (); 111 } 112 } catch (IOException e) { 113 currentlyOpen--; 114 totalClosed++; 86 currentlyOpen--; 87 totalClosed++; 115 88 116 openStreams.RemoveAt (i); 117 118 if (e.InnerException is SocketException se) { 119 if (se.SocketErrorCode != SocketError.ConnectionAborted && se.SocketErrorCode != SocketError.Shutdown) { 120 logError ($"SocketError ({se.SocketErrorCode.ToStringCached ()}) while trying to write", true); 121 } 122 } else { 123 logError ("IOException while trying to write:", true); 124 Log.Exception (e); 125 } 126 } catch (Exception e) { 127 currentlyOpen--; 128 totalClosed++; 129 130 openStreams.RemoveAt (i); 131 logError ("Exception while trying to write:", true); 132 Log.Exception (e); 133 resp.Close (); 89 if (writeResult == ESseClientWriteResult.Error) { 90 logError ("Can not write to endpoint, closing", true); 134 91 } 135 92 } … … 143 100 144 101 public virtual int DefaultPermissionLevel () => 0; 102 103 public void ClientClosed (SseClient _client) { 104 openClients.Remove (_client); 105 } 145 106 } 146 107 }
Note:
See TracChangeset
for help on using the changeset viewer.