199 Matching Annotations
  1. Nov 2024
    1. Which is why you need to agree, as a group, on a collaborative tool everyone is comfortable with (TL:DR; it doesn’t exist for groups larger than 4), and then co-create on one platform while chatting on the other.
  2. Oct 2023
  3. Sep 2023
    1. permit streams to be transferred between workers, frames and anywhere else that postMessage() can be used. Chunks can be anything which is cloneable by postMessage(). Initially chunks enqueued in such a stream will always be cloned, ie. all data will be copied. Future work will extend the Streams APIs to support transferring objects (ie. zero copy).

      js const rs = new ReadableStream({ start(controller) { controller.enqueue('hello'); } }); const w = new Worker('worker.js'); w.postMessage(rs, [rs]);

      js onmessage = async (evt) => { const rs = evt.data; const reader = rs.getReader(); const {value, done} = await reader.read(); console.log(value); // logs 'hello'. };

    1. ``js // SSE only supports GET request export async function GET({ url }) { const stream = new ReadableStream({ start(controller) { // You can enqueue multiple data asynchronously here. const myData = ["abc", "def"] myData.forEach(data => { controller.enqueue(data: ${data}\n\n`) }) controller.close() }, cancel() { // cancel your resources here } });

      return new Response(stream, {
          headers: {
              // Denotes the response as SSE
              'Content-Type': 'text/event-stream', 
              // Optional. Request the GET request not to be cached.
              'Cache-Control': 'no-cache', 
          }
      })
      

      } ```

    1. On subsequent page loads using client-side navigation, we can allow the slow data to be streamed in. Client-side navigation will only occur if JavaScript is available, so there are no downsides to using promise streaming. If the user doesn’t have JavaScript, each navigation will trigger a full page load, and we will again wait for all data to resolve.We can switch between these two behaviors using the isDataRequest property on the RequestEvent passed to the load function. When this property is false, the request is for the HTML for the initial page load, and we should wait for all data to resolve. If the property is true, then the request is from SvelteKit’s client-side router and we can stream the data in.

      js export async function load({isDataRequest}) { const slowData = getSlowData(); return { nested: { slow: isDataRequest ? slowData : await slowData } }; }

    2. We can use Promise.race to give our promise a few hundred milliseconds to resolve. Promise.race takes an array of promises, and resolves when any one of those promises resolve. We can pass our delay call and our data fetching call, and conditionally await the result depending on which one resolves first.

      In this example, we race two promises: a 200ms delay and the actual data call we want to make. If the delay resolves first, then the data call is taking longer than 200ms and we should go ahead and render the page with partial data. If the data call resolves first, then we got the data under the time limit and we can render the page with complete data.

      ```js const TIME_TO_RESOLVE_MS = 200; export async function load() { const slowData = getSlowData();

      const result = await Promise.race([delay(TIME_TO_RESOLVE_MS), slowData]);

      return { nested: { slow: result ? result : slowData } }; }

      async function getSlowData() { // randomly delay either 50ms or 1s // this simulates variable API response times await delay(Math.random() < 0.5 ? 50 : 1000); return '😴'; }

      function delay(ms) { return new Promise(res => setTimeout(res, ms)); } ```

    1. Did you know you can now use streaming promises in SvelteKit? No need to import anything - it just works out of the box

      Every property of an object returned by the load function can be a promise which itself can return an object that can have a promise as property, and so on.

      Can build a tree of promises based on data delivery priority.

  4. Aug 2023
    1. SvelteKit will automatically await the fetchPost call before it starts rendering the page, since it’s at the top level. However, it won’t wait for the nested fetchComments call to complete – the page will render and data.streamed.comments will be a promise that will resolve as the request completes. We can show a loading state in the corresponding +page.svelte using Svelte’s await block:

      js export const load: PageServerLoad = () => { return { post: fetchPost(), streamed: { comments: fetchComments() } }; }; ```svelte

      <script lang="ts"> import type { PageData } from './$types'; export let data: PageData; </script> <article> {data.post} </article>

      {#await data.streamed.comments} Loading... {:then value} <br />

        {#each value as comment}
      1. {comment}
      2. {/each}

      {/await} ```

  5. Jul 2023
    1. ```js async function main() { const blob = new Blob([new Uint8Array(10 * 1024 * 1024)]); // any Blob, including a File const uploadProgress = document.getElementById("upload-progress"); const downloadProgress = document.getElementById("download-progress");

      const totalBytes = blob.size; let bytesUploaded = 0;

      // Use a custom TransformStream to track upload progress const progressTrackingStream = new TransformStream({ transform(chunk, controller) { controller.enqueue(chunk); bytesUploaded += chunk.byteLength; console.log("upload progress:", bytesUploaded / totalBytes); uploadProgress.value = bytesUploaded / totalBytes; }, flush(controller) { console.log("completed stream"); }, }); const response = await fetch("https://httpbin.org/put", { method: "PUT", headers: { "Content-Type": "application/octet-stream" }, body: blob.stream().pipeThrough(progressTrackingStream), duplex: "half", });

      // After the initial response headers have been received, display download progress for the response body let success = true; const totalDownloadBytes = response.headers.get("content-length"); let bytesDownloaded = 0; const reader = response.body.getReader(); while (true) { try { const { value, done } = await reader.read(); if (done) { break; } bytesDownloaded += value.length; if (totalDownloadBytes != undefined) { console.log("download progress:", bytesDownloaded / totalDownloadBytes); downloadProgress.value = bytesDownloaded / totalDownloadBytes; } else { console.log("download progress:", bytesDownloaded, ", unknown total"); } } catch (error) { console.error("error:", error); success = false; break; } }

      console.log("success:", success); } main().catch(console.error); ```

    1. On any Web page run the following code

      js await startLocalServer(); let abortable = new AbortController; let {signal} = abortable; (await fetch('https://localhost:8443', { method: 'post', body: 'cat local_server_export.js', // Code executed in server, piped to browser duplex: 'half', headers: { 'Access-Control-Request-Private-Network': true }, signal })).body.pipeThrough(new TextDecoderStream()).pipeTo(new WritableStream({ write(v) { console.log(v); }, close() { console.log('close'); }, abort(reason) { console.log(reason); } })).catch(console.warn); await resetLocalServer();

  6. Jun 2023
    1. ```js /* * Response from cache / self.addEventListener('fetch', event => { const response = self.caches.open('example') .then(caches => caches.match(event.request)) .then(response => response || fetch(event.request));

      event.respondWith(response); });

      /* * Response to SSE by text / self.addEventListener('fetch', event => { const { headers } = event.request; const isSSERequest = headers.get('Accept') === 'text/event-stream';

      if (!isSSERequest) { return; }

      event.respondWith(new Response('Hello!')); });

      /* * Response to SSE by stream / self.addEventListener('fetch', event => { const { headers } = event.request; const isSSERequest = headers.get('Accept') === 'text/event-stream';

      if (!isSSERequest) { return; }

      const responseText = 'Hello!'; const responseData = Uint8Array.from(responseText, x => x.charCodeAt(0)); const stream = new ReadableStream({ start: controller => controller.enqueue(responseData) }); const response = new Response(stream);

      event.respondWith(response); });

      /* * SSE chunk data / const sseChunkData = (data, event, retry, id) => Object.entries({ event, id, data, retry }) .filter(([, value]) => ![undefined, null].includes(value)) .map(([key, value]) => ${key}: ${value}) .join('\n') + '\n\n';

      /* * Success response to SSE from SW / self.addEventListener('fetch', event => { const { headers } = event.request; const isSSERequest = headers.get('Accept') === 'text/event-stream';

      if (!isSSERequest) { return; }

      const sseChunkData = (data, event, retry, id) => Object.entries({ event, id, data, retry }) .filter(([, value]) => ![undefined, null].includes(value)) .map(([key, value]) => ${key}: ${value}) .join('\n') + '\n\n';

      const sseHeaders = { 'content-type': 'text/event-stream', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', };

      const responseText = sseChunkData('Hello!'); const responseData = Uint8Array.from(responseText, x => x.charCodeAt(0)); const stream = new ReadableStream({ start: controller => controller.enqueue(responseData) }); const response = new Response(stream, { headers: sseHeaders });

      event.respondWith(response); });

      /* * Result / self.addEventListener('fetch', event => { const { headers, url } = event.request; const isSSERequest = headers.get('Accept') === 'text/event-stream';

      // Process only SSE connections if (!isSSERequest) { return; }

      // Headers for SSE response const sseHeaders = { 'content-type': 'text/event-stream', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', }; // Function for formatting message to SSE response const sseChunkData = (data, event, retry, id) => Object.entries({ event, id, data, retry }) .filter(([, value]) => ![undefined, null].includes(value)) .map(([key, value]) => ${key}: ${value}) .join('\n') + '\n\n';

      // Map with server connections, where key - url, value - EventSource const serverConnections = {}; // For each request opens only one server connection and use it for next requests with the same url const getServerConnection = url => { if (!serverConnections[url]) { serverConnections[url] = new EventSource(url); }

      return serverConnections[url];
      

      }; // On message from server forward it to browser const onServerMessage = (controller, { data, type, retry, lastEventId }) => { const responseText = sseChunkData(data, type, retry, lastEventId); const responseData = Uint8Array.from(responseText, x => x.charCodeAt(0)); controller.enqueue(responseData); }; const stream = new ReadableStream({ start: controller => getServerConnection(url).onmessage = onServerMessage.bind(null, controller) }); const response = new Response(stream, { headers: sseHeaders });

      event.respondWith(response); }); ```

    1. ```js self.addEventListener('fetch', event => { const { headers, url } = event.request; const isSSERequest = headers.get('Accept') === 'text/event-stream';

      // We process only SSE connections if (!isSSERequest) { return; }

      // Response Headers for SSE const sseHeaders = { 'content-type': 'text/event-stream', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', }; // Function formatting data for SSE const sseChunkData = (data, event, retry, id) => Object.entries({ event, id, data, retry }) .filter(([, value]) => ![undefined, null].includes(value)) .map(([key, value]) => ${key}: ${value}) .join('\n') + '\n\n'; // Table with server connections, where key is url, value is EventSource const serverConnections = {}; // For each url, we open only one connection to the server and use it for subsequent requests const getServerConnection = url => { if (!serverConnections[url]) serverConnections[url] = new EventSource(url);

      return serverConnections[url];
      

      }; // When we receive a message from the server, we forward it to the browser const onServerMessage = (controller, { data, type, retry, lastEventId }) => { const responseText = sseChunkData(data, type, retry, lastEventId); const responseData = Uint8Array.from(responseText, x => x.charCodeAt(0)); controller.enqueue(responseData); }; const stream = new ReadableStream({ start: controller => getServerConnection(url).onmessage = onServerMessage.bind(null, controller) }); const response = new Response(stream, { headers: sseHeaders });

      event.respondWith(response); }); ```

  7. May 2023
  8. Mar 2023
    1. ```js // Set up some pub/sub on the server

      import { EventEmitter } from "events"; export let emitter = new EventEmitter();

      // Set up an event stream with cleanup and queues // and stuff that subscribes to it and streams the // events when new stuff comes through:

      import { emitter } from "../some-emitter.server";

      type InitFunction = (send: SendFunction) => CleanupFunction; type SendFunction = (event: string, data: string) => void; type CleanupFunction = () => void;

      export function eventStream(request: Request, init: InitFunction) { let stream = new ReadableStream({ start(controller) { let encoder = new TextEncoder(); let send = (event: string, data: string) => { controller.enqueue(encoder.encode(event: ${event}\n)); controller.enqueue(encoder.encode(data: ${data}\n\n)); }; let cleanup = init(send);

        let closed = false;
        let close = () => {
          if (closed) return;
          cleanup();
          closed = true;
          request.signal.removeEventListener("abort", close);
          controller.close();
        };
      
        request.signal.addEventListener("abort", close);
        if (request.signal.aborted) {
          close();
          return;
        }
      },
      

      });

      return new Response(stream, { headers: { "Content-Type": "text/event-stream" }, }); }

      // Return the event stream from a loader in // a resource route:

      import { eventStream } from "./event-stream";

      export let loader: LoaderFunction = ({ request }) => { return eventStream(request, send => { emitter.addListener("messageReceived", handleChatMessage);

      function handleChatMessage(chatMessage: string) {
        send("message", chatMessage);
      }
      
      return () => {
        emitter.removeListener("messageReceived", handleChatMessage);
      };
      

      }); };

      // Push into the event emitter in actions:

      import { emitter } from "./some-emitter.server";

      export let action: ActionFunction = async ({ request }) => { let formData = await request.formData(); emitter.emit("messageReceived", formData.get("something"); return { ok: true }; };

      // And finally, set up an EventSource in the browser

      function useEventSource(href: string) { let [data, setData] = useState("");

      useEffect(() => { let eventSource = new EventSource(href); eventSource.addEventListener("message", handler);

      function handler(event: MessageEvent) {
        setData(event.data || "unknown");
      }
      
      return () => {
        eventSource.removeEventListener("message", handler);
      };
      

      }, []);

      return data; } ```

    1. ```js // Adds an entry to the event log on the page, optionally applying a specified // CSS class.

      let currentTransport, streamNumber, currentTransportDatagramWriter;

      // "Connect" button handler. async function connect() { const url = document.getElementById('url').value; try { var transport = new WebTransport(url); addToEventLog('Initiating connection...'); } catch (e) { addToEventLog('Failed to create connection object. ' + e, 'error'); return; }

      try { await transport.ready; addToEventLog('Connection ready.'); } catch (e) { addToEventLog('Connection failed. ' + e, 'error'); return; }

      transport.closed .then(() => { addToEventLog('Connection closed normally.'); }) .catch(() => { addToEventLog('Connection closed abruptly.', 'error'); });

      currentTransport = transport; streamNumber = 1; try { currentTransportDatagramWriter = transport.datagrams.writable.getWriter(); addToEventLog('Datagram writer ready.'); } catch (e) { addToEventLog('Sending datagrams not supported: ' + e, 'error'); return; } readDatagrams(transport); acceptUnidirectionalStreams(transport); document.forms.sending.elements.send.disabled = false; document.getElementById('connect').disabled = true; }

      // "Send data" button handler. async function sendData() { let form = document.forms.sending.elements; let encoder = new TextEncoder('utf-8'); let rawData = sending.data.value; let data = encoder.encode(rawData); let transport = currentTransport; try { switch (form.sendtype.value) { case 'datagram': await currentTransportDatagramWriter.write(data); addToEventLog('Sent datagram: ' + rawData); break; case 'unidi': { let stream = await transport.createUnidirectionalStream(); let writer = stream.getWriter(); await writer.write(data); await writer.close(); addToEventLog('Sent a unidirectional stream with data: ' + rawData); break; } case 'bidi': { let stream = await transport.createBidirectionalStream(); let number = streamNumber++; readFromIncomingStream(stream, number);

          let writer = stream.writable.getWriter();
          await writer.write(data);
          await writer.close();
          addToEventLog(
              'Opened bidirectional stream #' + number +
              ' with data: ' + rawData);
          break;
        }
      }
      

      } catch (e) { addToEventLog('Error while sending data: ' + e, 'error'); } }

      // Reads datagrams from |transport| into the event log until EOF is reached. async function readDatagrams(transport) { try { var reader = transport.datagrams.readable.getReader(); addToEventLog('Datagram reader ready.'); } catch (e) { addToEventLog('Receiving datagrams not supported: ' + e, 'error'); return; } let decoder = new TextDecoder('utf-8'); try { while (true) { const { value, done } = await reader.read(); if (done) { addToEventLog('Done reading datagrams!'); return; } let data = decoder.decode(value); addToEventLog('Datagram received: ' + data); } } catch (e) { addToEventLog('Error while reading datagrams: ' + e, 'error'); } }

      async function acceptUnidirectionalStreams(transport) { let reader = transport.incomingUnidirectionalStreams.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) { addToEventLog('Done accepting unidirectional streams!'); return; } let stream = value; let number = streamNumber++; addToEventLog('New incoming unidirectional stream #' + number); readFromIncomingStream(stream, number); } } catch (e) { addToEventLog('Error while accepting streams: ' + e, 'error'); } }

      async function readFromIncomingStream(stream, number) { let decoder = new TextDecoderStream('utf-8'); let reader = stream.pipeThrough(decoder).getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) { addToEventLog('Stream #' + number + ' closed'); return; } let data = value; addToEventLog('Received data on stream #' + number + ': ' + data); } } catch (e) { addToEventLog( 'Error while reading from stream #' + number + ': ' + e, 'error'); addToEventLog(' ' + e.message); } }

      function addToEventLog(text, severity = 'info') { let log = document.getElementById('event-log'); let mostRecentEntry = log.lastElementChild; let entry = document.createElement('li'); entry.innerText = text; entry.className = 'log-' + severity; log.appendChild(entry);

      // If the most recent entry in the log was visible, scroll the log to the // newly added element. if (mostRecentEntry != null && mostRecentEntry.getBoundingClientRect().top < log.getBoundingClientRect().bottom) { entry.scrollIntoView(); } } ```

    1. Streaming allows you to break down the page's HTML into smaller chunks and progressively send those chunks from the server to the client.

    1. HTML templating and streaming response library for Worker Runtimes such as Cloudflare Workers.

      js function handleRequest(event: FetchEvent) { return new HTMLResponse(pageLayout('Hello World!', html` <h1>Hello World!</h1> ${async () => { const timeStamp = new Date( await fetch('https://time.api/now').then(r => r.text()) ); return html`<p>The current time is ${timeEl(timeStamp)}.</p>` }} `)); }

    1. Fortunately with webpackChunkName in an inline comment, we can instruct webpack to name the file something much friendlier.

      js const HeavyComponent = lazy(() => import(/* webpackChunkName: "HeavyComponent" */ './HeavyComponent');

    1. Similar to prefetching, you can specify the chunk name with an inline comment using the webpackChunkName key

      js const LazyLoad = lazy(() => import(/* webpackChunkName: 'lazyload' */ './LazyLoad') );

    2. For example, if the user is on the login page, you can load the post-login home page. An inline comment with the webpackPrefetch key within the dynamic import function will instruct Webpack to do just this.

      ```js const PrefetchLoad = lazy(() => import( / webpackPrefetch: true / './PrefetchLoad' ) );

      const App = () => { return ( <Suspense fallback="Loading"> {condition && <PrefetchLoad />} </Suspense> ); }; ```

    3. Now when App is rendered and a request is initiated to get the LazyLoad code, the fallback Loading is rendered. When this request completes, React will then render LazyLoad.

      js const App = () => { return ( <Suspense fallback="Loading"> <LazyLoad /> </Suspense> ); };

    4. Now, App and LazyLoad are in separate code chunks. A request is sent to fetch the LazyLoad code chunk only when App is rendered. When the request completes, React will then renderLazyLoad. You can verify this by looking at the Javascript requests in the network inspector.

      ```js import React, { lazy } from 'react';

      const LazyLoad = lazy(() => import('./LazyLoad'));

      const App = () => { return <LazyLoad />; }; ```

    1. You can have multiple nested React.lazy components inside React.Suspense.

      ```js

      const CatAvatar = React.lazy(() => import('./path/to/cat/avatar')); const ThorAvatar = React.lazy(() => import('./path/to/cat/thor-avatar'));

      const AppContainer = () => ( <React.Suspense fallback="loading..."> <CatAvatar /> <ThorAvatar /> </React.Suspense> ); ```

    1. Chunked encoding is useful when larger amounts of data are sent to the client and the total size of the response may not be known until the request has been fully processed. For example, when generating a large HTML table resulting from a database query or when transmitting large images.A chunked response looks like this:

      ```http HTTP/1.1 200 OK Content-Type: text/plain Transfer-Encoding: chunked

      7\r\n Mozilla\r\n 11\r\n Developer Network\r\n 0\r\n \r\n ```

    2. chunked Data is sent in a series of chunks. The Content-Length header is omitted in this case and at the beginning of each chunk you need to add the length of the current chunk in hexadecimal format, followed by '\r\n' and then the chunk itself, followed by another '\r\n'. The terminating chunk is a regular chunk, with the exception that its length is zero. It is followed by the trailer, which consists of a (possibly empty) sequence of header fields.
    3. ```abnf Transfer-Encoding: chunked Transfer-Encoding: compress Transfer-Encoding: deflate Transfer-Encoding: gzip

      // Several values can be listed, separated by a comma Transfer-Encoding: gzip, chunked ```

    1. ```js // FromStream component

      import { PureComponent } from 'react';

      export default class FromStream extends PureComponent { constructor(props) { super(props); this.state = { value: false }; }

      componentDidMount() { this.initStream(); }

      componentDidUpdate(prevProps) { if (prevProps.stream !== this.props.stream) { this.initStream(); } }

      componentWillUnmount() { if (this.unSubscribe) { this.unSubscribe(); } }

      initStream() { if (this.unSubscribe) { this.unSubscribe(); this.unSubscribe = null; }

      if (this.props.stream) {
        const onValue = (value) => {
          this.setState(() => ({ value: map(value) }));
        };
      
        this.props.stream.onValue(onValue);
        this.unSubscribe = () => stream.offValue(onValue);
      }
      

      }

      render() { return this.props.children(this.state && this.state.value); } } ```

      ```js // Date/Time import React from 'react'; import Kefir from 'kefir'; import FromStream from './FromStream';

      const dateStream = Kefir.interval(1000).map(() => new Date().toString());

      export default () => ( <FromStream stream={dateStream}> {(dateString) => dateString} </FromStream> ); ```

      ```js // Scroll import React from 'react'; import Kefir from 'kefir'; import FromStream from './FromStream';

      const scrolledPercentStream = Kefir.fromEvents(document, 'scroll').map((e) => { const scrollY = window.document.pageYOffset; const scrollHeight = e.target.body.scrollHeight; return scrollY / (scrollHeight - windiw.innerHeight) * 100; });

      export default () => ( <FromStream stream={scrolledPercentStream}> {(percent) => ( <div className="bar" style={{ top: <code>${percent}% }}></div> )} </FromStream> ); ```

    1. ```js import { useState, useEffect } from 'react';

      interface StreamState { data: Uint8Array | null; error: Error | null; controller: AbortController; }

      const useAbortableStreamFetch = (url: string, options?: RequestInit): { data: Uint8Array | null, error: Error | null, abort: () => void, } => {

      const [state, setState] = useState<StreamState>({ data: null, error: null, controller: new AbortController(), });

      useEffect(() => { (async () => { try { const resp = await fetch(url, { ...options, signal: state.controller.signal, }); if (!resp.ok || !resp.body) { throw resp.statusText; }

          const reader = resp.body.getReader();
          while (true) {
            const { value, done } = await reader.read();
            if (done) {
              break;
            }
      
            setState(prevState => ({ ...prevState, ...{ data: value } }));
          }
        } catch (err) {
          if (err.name !== 'AbortError') {
            setState(prevState => ({ ...prevState, ...{ error: err } }));
          }
        }
      })();
      
      return () => state.controller.abort();
      

      }, [url, options]);

      return { data: state.data, error: state.error, abort: () => state.controller && state.controller.abort(), }; };

      export default useAbortableStreamFetch; ```

    1. ```js import { renderToReadableStream } from 'react-dom/server'; import type { EntryContext } from '@remix-run/cloudflare'; import { RemixServer } from '@remix-run/react'; import { renderHeadToString } from 'remix-island'; import { Head } from './root';

      const readableString = (value: string) => { const te = new TextEncoder(); return new ReadableStream({ start(controller) { controller.enqueue(te.encode(value)); controller.close(); }, }); };

      export default async function handleRequest( request: Request, responseStatusCode: number, responseHeaders: Headers, remixContext: EntryContext, ) { const { readable, writable } = new TransformStream(); const head = readableString( <!DOCTYPE html><html><head>${renderHeadToString({ request, remixContext, Head, })}</head><body><div id="root">, ); const end = readableString(</div></body></html>);

      const body = await renderToReadableStream( <RemixServer context={remixContext} url={request.url} />, );

      Promise.resolve() .then(() => head.pipeTo(writable, { preventClose: true })) .then(() => body.pipeTo(writable, { preventClose: true })) .then(() => end.pipeTo(writable));

      responseHeaders.set('Content-Type', 'text/html');

      return new Response(readable, { status: responseStatusCode, headers: responseHeaders, }); } ```

    1. ```js import React, { Component } from 'react'; import './style.css'; import ndjsonStream from 'can-ndjson-stream';

      class App extends Component { constructor(props) { super(props);

      this.state = {
        todos: []
      };
      

      }

      componentDidMount(){ fetch('http://localhost:5000/api/10', { method: 'get' }).then(data => { return ndjsonStream(data.body); }).then((todoStream) => { const streamReader = todoStream.getReader(); const read = result => { if (result.done) return;

          this.setState({ 
            todos: this.state.todos.concat([result.value.user])
          });
      
          streamReader.read().then(read);
        };
      
        streamReader.read().then(read); 
      }).catch(err => {
        console.error(err)
      });
      

      }

      render() { return ( <div className="App">

      React + NDJSON Stream Demo

        {this.state.todos.map((todo, i) =>
      • {todo}
      • )}
      </div> ); } }

      export default App; ```

    1. Streaming across worker threads

      ```js import { ReadableStream } from 'node:stream/web'; import { Worker } from 'node:worker_threads';

      const readable = new ReadableStream(getSomeSource());

      const worker = new Worker('/path/to/worker.js', { workerData: readable, transferList: [readable], }); ```

      ```js const { workerData: stream } = require('worker_threads');

      const reader = stream.getReader(); reader.read().then(console.log); ```

    2. Consuming web streams

      ```js import { arrayBuffer, blob, buffer, json, text, } from 'node:stream/consumers';

      const data1 = await arrayBuffer(getReadableStreamSomehow());

      const data2 = await blob(getReadableStreamSomehow());

      const data3 = await buffer(getReadableStreamSomehow());

      const data4 = await json(getReadableStreamSomehow());

      const data5 = await text(getReadableStreamSomehow()); ```

    3. Adapting to the Node.js Streams API

      ```js /* * For instance, given a ReadableStream object, the stream.Readable.fromWeb() method * will create an return a Node.js stream.Readable object that can be used to consume * the ReadableStream's data: / import { Readable } from 'node:stream';

      const readable = new ReadableStream(getSomeSource());

      const nodeReadable = Readable.fromWeb(readable);

      nodeReadable.on('data', console.log); ```

      ```js /* * The adaptation can also work the other way -- starting with a Node.js * stream.Readable and acquiring a web streams ReadableStream: / import { Readable } from 'node:stream';

      const readable = new Readable({ read(size) { reader.push(Buffer.from('hello')); } });

      const readableStream = Readable.toWeb(readable);

      await readableStream.read();

      ```

    1. Async iteration of a stream using for await...ofThis example shows how you can process the fetch() response using a for await...of loop to iterate through the arriving chunks.

      ```js const response = await fetch("https://www.example.org"); let total = 0;

      // Iterate response.body (a ReadableStream) asynchronously for await (const chunk of response.body) { // Do something with each chunk // Here we just accumulate the size of the response. total += chunk.length; }

      // Do something with the total console.log(total); ```

    1. The body read-only property of the Response interface is a ReadableStream of the body contents.
  9. Feb 2023
    1. ```js import type { EntryContext } from "@remix-run/cloudflare"; import { RemixServer } from "@remix-run/react"; import isbot from "isbot"; import { renderToReadableStream } from "react-dom/server";

      const ABORT_DELAY = 5000;

      const handleRequest = async ( request: Request, responseStatusCode: number, responseHeaders: Headers, remixContext: EntryContext ) => { let didError = false;

      const stream = await renderToReadableStream( <RemixServer context={remixContext} url={request.url} abortDelay={ABORT_DELAY} />, { onError: (error: unknown) => { didError = true; console.error(error);

          // You can also log crash/error report
        },
        signal: AbortSignal.timeout(ABORT_DELAY),
      }
      

      );

      if (isbot(request.headers.get("user-agent"))) { await stream.allReady; }

      responseHeaders.set("Content-Type", "text/html"); return new Response(stream, { headers: responseHeaders, status: didError ? 500 : responseStatusCode, }); };

      export default handleRequest; ```

    1. js const wss = new WebSocketStream(url); const { readable } = await wss.connection; const reader = readable.getReader(); while (true) { const { value, done } = await reader.read(); if (done) break; await process(value); } done();


      js const wss = new WebSocketStream(url); const { writable } = await wss.connection; const writer = writable.getWriter(); for await (const message of messages) { await writer.write(message); }


      js const controller = new AbortController(); const wss = new WebSocketStream(url, { signal: controller.signal }); setTimeout(() => controller.abort(), 1000);

    1. ```js const supportsRequestStreams = (() => { let duplexAccessed = false;

      const hasContentType = new Request('', { body: new ReadableStream(), method: 'POST', get duplex() { duplexAccessed = true; return 'half'; }, }).headers.has('Content-Type');

      return duplexAccessed && !hasContentType; })();

      if (supportsRequestStreams) { // … } else { // … } ```

    1. ```js /* * Fetch and process the stream / async function process() { // Retrieve NDJSON from the server const response = await fetch('http://localhost:3000/request');

      const results = response.body // From bytes to text: .pipeThrough(new TextDecoderStream()) // Buffer until newlines: .pipeThrough(splitStream('\n')) // Parse chunks as JSON: .pipeThrough(parseJSON());

      // Loop through the results and write to the DOM writeToDOM(results.getReader()); }

      /* * Read through the results and write to the DOM * @param {object} reader / function writeToDOM(reader) { reader.read().then(({ value, done }) => { if (done) { console.log("The stream was already closed!");

      } else {
        // Build up the values
        let result = document.createElement('div');
        result.innerHTML =
          `<div>ID: ${value.id} - Phone: ${value.phone} - Result: $
              {value.result}</div><br>`;
      
        // Prepend to the target
        targetDiv.insertBefore(result, targetDiv.firstChild);
      
        // Recursively call
        writeToDOM(reader);
      }
      

      }, e => console.error("The stream became errored and cannot be read from!", e) ); } ```

    1. Node.js

      js import { renderToPipeableStream } from "react-dom/server.node"; import React from "react"; import http from "http"; const App = () => ( <html> <body> <h1>Hello World</h1> <p>This is an example.</p> </body> </html> ); var didError = false; http .createServer(function (req, res) { const stream = renderToPipeableStream(<App />, { onShellReady() { res.statusCode = didError ? 500 : 200; res.setHeader("Content-type", "text/html"); res.setHeader("Cache-Control", "no-transform"); stream.pipe(res); }, onShellError(error) { res.statusCode = 500; res.send( '<!doctype html><p>Loading...</p><script src="clientrender.js"></script>', ); }, onAllReady() { }, onError(err) { didError = true; console.error(err); }, }); }) .listen(3000);

      Deno

      ```js import { renderToReadableStream } from "https://esm.run/react-dom/server"; import * as React from "https://esm.run/react";

      const App = () => ( <html> <body>

      Hello World

      This is an example.

      </body> </html> );

      const headers = { headers: { "Content-Type": "text/html", "Cache-Control": "no-transform", }, };

      Deno.serve( async (req) => { return new Response(await renderToReadableStream(<App />), headers); }, { port: 3000 }, ); ```

      Bun

      ```js import { renderToReadableStream } from "react-dom/server"; const headers = { headers: { "Content-Type": "text/html", }, };

      const App = () => ( <html> <body>

      Hello World

      This is an example.

      </body> </html> );

      Bun.serve({ port: 3000, async fetch(req) { return new Response(await renderToReadableStream(<App />), headers); }, }); ```