26 Matching Annotations
  1. Last 7 days
    1. ```js
      // SSE only supports GET request
      export async function GET({ url }) {
        const stream = new ReadableStream({
          start(controller) {
            // You can enqueue multiple data asynchronously here.
            const myData = ["abc", "def"]
            myData.forEach(data => {
              controller.enqueue(`data: ${data}\n\n`)
            })
            controller.close()
          },
          cancel() {
            // cancel your resources here
          }
        });

        return new Response(stream, {
          headers: {
            // Denotes the response as SSE
            'Content-Type': 'text/event-stream',
            // Optional. Request the GET request not to be cached.
            'Cache-Control': 'no-cache',
          }
        })
      }
      ```

  2. Sep 2023
  3. Jul 2023
  4. Jun 2023
    1. ```js
      /**
       * Response from cache
       */
      self.addEventListener('fetch', event => {
        const response = self.caches.open('example')
          .then(caches => caches.match(event.request))
          .then(response => response || fetch(event.request));

        event.respondWith(response);
      });

      /**
       * Response to SSE by text
       *
       * Service-worker `fetch` listener that answers requests whose `Accept`
       * header is exactly `text/event-stream` with the plain text body
       * `Hello!`. Any other request is ignored by this listener.
       */
      self.addEventListener('fetch', event => {
        const { headers } = event.request;
        const isSSERequest = headers.get('Accept') === 'text/event-stream';

        if (!isSSERequest) {
          // Returning without respondWith() leaves the request to the
          // browser's default handling.
          return;
        }

        event.respondWith(new Response('Hello!'));
      });

      /**
       * Response to SSE by stream
       *
       * Service-worker `fetch` listener that answers requests whose `Accept`
       * header is exactly `text/event-stream` with a `ReadableStream` body
       * carrying the bytes of `Hello!`. The stream is never closed by this
       * code, so the response stays open after the first chunk.
       */
      self.addEventListener('fetch', event => {
        const { headers } = event.request;
        const isSSERequest = headers.get('Accept') === 'text/event-stream';

        if (!isSSERequest) {
          return;
        }

        const responseText = 'Hello!';
        // TextEncoder yields UTF-8 bytes; mapping charCodeAt() into a
        // Uint8Array would truncate any code unit above 0xFF.
        const responseData = new TextEncoder().encode(responseText);
        const stream = new ReadableStream({
          start: controller => controller.enqueue(responseData)
        });
        const response = new Response(stream);

        event.respondWith(response);
      });

      /**
       * SSE chunk data
       *
       * Formats one Server-Sent Events message. Fields are emitted in the
       * order `event`, `id`, `data`, `retry`; a field whose value is
       * `undefined` or `null` is omitted. The message is terminated by a
       * blank line.
       *
       * @param data  Message payload. A payload containing line breaks is
       *              written as one `data:` line per payload line, because a
       *              raw newline inside a field would end that field early.
       * @param event Optional event name.
       * @param retry Optional reconnection time.
       * @param id    Optional event id.
       * @returns The formatted message text, ending in `\n\n`.
       */
      const sseChunkData = (data, event, retry, id) =>
        Object.entries({ event, id, data, retry })
          .filter(([, value]) => value != null)
          .flatMap(([key, value]) =>
            key === 'data'
              ? String(value).split(/\r\n|\r|\n/).map(line => `data: ${line}`)
              : [`${key}: ${value}`])
          .join('\n') + '\n\n';

      /**
       * Success response to SSE from SW
       *
       * Service-worker `fetch` listener that answers requests whose `Accept`
       * header is exactly `text/event-stream` with a streamed response that
       * carries SSE headers and one SSE-formatted `Hello!` message.
       */
      self.addEventListener('fetch', event => {
        const { headers } = event.request;
        const isSSERequest = headers.get('Accept') === 'text/event-stream';

        if (!isSSERequest) {
          return;
        }

        // Formats one SSE message; undefined/null fields are left out and the
        // message ends with a blank line.
        const sseChunkData = (data, event, retry, id) =>
          Object.entries({ event, id, data, retry })
            .filter(([, value]) => ![undefined, null].includes(value))
            .map(([key, value]) => `${key}: ${value}`)
            .join('\n') + '\n\n';

        const sseHeaders = {
          'content-type': 'text/event-stream',
          'Transfer-Encoding': 'chunked',
          'Connection': 'keep-alive',
        };

        const responseText = sseChunkData('Hello!');
        // TextEncoder yields UTF-8 bytes; mapping charCodeAt() into a
        // Uint8Array would truncate any code unit above 0xFF.
        const responseData = new TextEncoder().encode(responseText);
        const stream = new ReadableStream({
          start: controller => controller.enqueue(responseData)
        });
        const response = new Response(stream, { headers: sseHeaders });

        event.respondWith(response);
      });

      /**
       * Result
       */
      self.addEventListener('fetch', event => {
        const { headers, url } = event.request;
        const isSSERequest = headers.get('Accept') === 'text/event-stream';

        // Process only SSE connections
        if (!isSSERequest) {
          return;
        }

        // Headers for SSE response
        const sseHeaders = {
          'content-type': 'text/event-stream',
          'Transfer-Encoding': 'chunked',
          'Connection': 'keep-alive',
        };
        // Function for formatting message to SSE response
        const sseChunkData = (data, event, retry, id) =>
          Object.entries({ event, id, data, retry })
            .filter(([, value]) => ![undefined, null].includes(value))
            .map(([key, value]) => `${key}: ${value}`)
            .join('\n') + '\n\n';

        // Map with server connections, where key - url, value - EventSource
        const serverConnections = {};
        // For each request opens only one server connection and use it for next requests with the same url
        const getServerConnection = url => {
          if (!serverConnections[url]) {
            serverConnections[url] = new EventSource(url);
          }

          return serverConnections[url];
        };
        // On message from server forward it to browser
        const onServerMessage = (controller, { data, type, retry, lastEventId }) => {
          const responseText = sseChunkData(data, type, retry, lastEventId);
          const responseData = Uint8Array.from(responseText, x => x.charCodeAt(0));
          controller.enqueue(responseData);
        };
        const stream = new ReadableStream({
          start: controller => getServerConnection(url).onmessage = onServerMessage.bind(null, controller)
        });
        const response = new Response(stream, { headers: sseHeaders });

        event.respondWith(response);
      });
      ```

    1. ```js
      self.addEventListener('fetch', event => {
        const { headers, url } = event.request;
        const isSSERequest = headers.get('Accept') === 'text/event-stream';

        // We process only SSE connections
        if (!isSSERequest) {
          return;
        }

        // Response Headers for SSE
        const sseHeaders = {
          'content-type': 'text/event-stream',
          'Transfer-Encoding': 'chunked',
          'Connection': 'keep-alive',
        };
        // Function formatting data for SSE
        const sseChunkData = (data, event, retry, id) =>
          Object.entries({ event, id, data, retry })
            .filter(([, value]) => ![undefined, null].includes(value))
            .map(([key, value]) => `${key}: ${value}`)
            .join('\n') + '\n\n';
        // Table with server connections, where key is url, value is EventSource
        const serverConnections = {};
        // For each url, we open only one connection to the server and use it for subsequent requests
        const getServerConnection = url => {
          if (!serverConnections[url]) serverConnections[url] = new EventSource(url);

          return serverConnections[url];
        };
        // When we receive a message from the server, we forward it to the browser
        const onServerMessage = (controller, { data, type, retry, lastEventId }) => {
          const responseText = sseChunkData(data, type, retry, lastEventId);
          const responseData = Uint8Array.from(responseText, x => x.charCodeAt(0));
          controller.enqueue(responseData);
        };
        const stream = new ReadableStream({
          start: controller => getServerConnection(url).onmessage = onServerMessage.bind(null, controller)
        });
        const response = new Response(stream, { headers: sseHeaders });

        event.respondWith(response);
      });
      ```

  5. Mar 2023
    1. ```js
      // Set up some pub/sub on the server

      import { EventEmitter } from "events";
      export let emitter = new EventEmitter();

      // Set up an event stream with cleanup and queues
      // and stuff that subscribes to it and streams the
      // events when new stuff comes through:

      import { emitter } from "../some-emitter.server";

      /** Called once with `send`; returns the function that undoes its subscriptions. */
      type InitFunction = (send: SendFunction) => CleanupFunction;
      /** Writes one SSE message with the given event name and data. */
      type SendFunction = (event: string, data: string) => void;
      /** Releases whatever the init function set up. */
      type CleanupFunction = () => void;

      /**
       * Builds a `text/event-stream` response whose body is fed by `init`.
       *
       * `init` is invoked once, while the stream is being constructed, with a
       * `send` function that enqueues `event: <event>` and `data: <data>`
       * lines. When `request.signal` aborts (or is already aborted), the
       * cleanup returned by `init` runs exactly once and the stream is closed.
       *
       * @param request Request whose `signal` controls the stream's lifetime.
       * @param init    Sets up the event source and returns its cleanup.
       * @returns A `Response` with `Content-Type: text/event-stream`.
       */
      export function eventStream(request: Request, init: InitFunction) {
        let stream = new ReadableStream({
          start(controller) {
            let encoder = new TextEncoder();
            let closed = false;

            let send = (event: string, data: string) => {
              // Enqueueing on a closed controller throws, so late events that
              // arrive after the stream was closed are dropped instead.
              if (closed) return;
              controller.enqueue(encoder.encode(`event: ${event}\n`));
              controller.enqueue(encoder.encode(`data: ${data}\n\n`));
            };
            let cleanup = init(send);

            let close = () => {
              if (closed) return;
              cleanup();
              closed = true;
              request.signal.removeEventListener("abort", close);
              controller.close();
            };

            request.signal.addEventListener("abort", close);
            // The abort event will not fire again for an already-aborted
            // signal, so close immediately in that case.
            if (request.signal.aborted) {
              close();
              return;
            }
          },
        });

        return new Response(stream, {
          headers: { "Content-Type": "text/event-stream" },
        });
      }

      // Return the event stream from a loader in
      // a resource route:

      import { eventStream } from "./event-stream";

      /**
       * Resource-route loader that returns an event stream for the request.
       *
       * While the stream is open, every `messageReceived` event on `emitter`
       * is forwarded to the client as a `message` event. The returned cleanup
       * removes that listener again.
       */
      export let loader: LoaderFunction = ({ request }) =>
        eventStream(request, (send) => {
          const handleChatMessage = (chatMessage: string) => {
            send("message", chatMessage);
          };

          emitter.addListener("messageReceived", handleChatMessage);

          return () => {
            emitter.removeListener("messageReceived", handleChatMessage);
          };
        });

      // Push into the event emitter in actions:

      import { emitter } from "./some-emitter.server";

      /**
       * Action that publishes the submitted `something` form field as a
       * `messageReceived` event on `emitter`, then reports success.
       *
       * NOTE(review): `formData.get` returns `null` when the field is absent;
       * that value is emitted as-is — confirm listeners tolerate it.
       */
      export let action: ActionFunction = async ({ request }) => {
        let formData = await request.formData();
        emitter.emit("messageReceived", formData.get("something"));
        return { ok: true };
      };

      // And finally, set up an EventSource in the browser

      function useEventSource(href: string) {
        let [data, setData] = useState("");

        useEffect(() => {
          let eventSource = new EventSource(href);
          eventSource.addEventListener("message", handler);

          function handler(event: MessageEvent) {
            setData(event.data || "unknown");
          }

          return () => {
            eventSource.removeEventListener("message", handler);
          };
        }, []);

        return data;
      }
      ```

    1. We present deferred data by using React Suspense to conditionally show the content when it's ready. Suspense provides a fallback element to show when the data is not yet ready. Normally a loading spinner would go here, but we can use that to show our streamed progress instead.

      ```js
      export default function Index() {
        const data = useLoaderData()
        const params = useParams()
        const stream = useEventSource(
          `/items/${params.hash}/progress`,
          {
            event: "progress",
          },
        )
        return (
          <div>
            <Suspense fallback={<span> {stream}% </span>}>
              <Await
                resolve={data.promise}
                errorElement={<p>Error loading img!</p>}
              >
                {(promise) => <img alt="" src={promise.img} />}
              </Await>
            </Suspense>
          </div>
        )
      }
      ```

    2. On the client, while we're waiting for our deferred promise to resolve, we can consume that stream to know how far along our process is.

      ```js
      const stream = useEventSource(
        `/items/${params.hash}/progress`,
        {
          event: "progress",
        },
      )
      ```

    3. In Remix, we can use a resource route to make this endpoint, and our loader will return a stream that constantly checks our JSON file for its progress.

      ```js
      export async function loader({
        request,
        params,
      }: LoaderArgs) {
        const hash = params.hash
        return eventStream(request.signal, function setup(send) {
          const interval = setInterval(() => {
            const file = fs.readFileSync(
              path.join("public", "items", `${hash}.json`),
            )
            if (file.toString()) {
              const data = JSON.parse(file.toString())
              const progress = data.progress
              send({ event: "progress", data: String(progress) })
              if (progress === 100) {
                clearInterval(interval)
              }
            }
          }, 200)
          return function clear(timer: number) {
            clearInterval(interval)
            clearInterval(timer)
          }
        })
      }
      ```

    4. Server-sent events work by having an endpoint that does not immediately close its connection and that sends a content type of `text/event-stream`.
  6. Jan 2023
  7. Dec 2022
  8. Oct 2018