*Refactor to a Web Stream based architecture for processing IPFS content
*Implement a concept of content filters to support modifying stream data before rendering it *Add a CSP filter to remove CSP meta tags
This commit is contained in:
parent
a9faa22b19
commit
33c6302205
|
@ -0,0 +1,4 @@
|
|||
export default abstract class ContentFilterBase {
|
||||
abstract getMimeTypes(): string[];
|
||||
abstract filter(data: string): Promise<string>;
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
import ContentFilterBase from "./contentFilterBase.js";
|
||||
|
||||
export default class ContentFilterRegistry {
|
||||
private static contentFilters: ContentFilterBase[] = [];
|
||||
|
||||
public static hasFilters(type: string): boolean {
|
||||
return (
|
||||
ContentFilterRegistry.contentFilters.filter((item) =>
|
||||
item.getMimeTypes().includes(type)
|
||||
).length > 0
|
||||
);
|
||||
}
|
||||
|
||||
public static filter(type: string) {
|
||||
return async (data: Uint8Array) => {
|
||||
const filters = ContentFilterRegistry.contentFilters.filter((item) =>
|
||||
item.getMimeTypes().includes(type)
|
||||
);
|
||||
|
||||
if (!filters.length) {
|
||||
return data;
|
||||
}
|
||||
|
||||
let filterData = new TextDecoder().decode(data);
|
||||
|
||||
for (const filter of filters) {
|
||||
filterData = await filter.filter(filterData);
|
||||
}
|
||||
|
||||
return new TextEncoder().encode(filterData);
|
||||
};
|
||||
}
|
||||
|
||||
public static registerFilter(contentFilter: ContentFilterBase): void {
|
||||
ContentFilterRegistry.contentFilters.push(contentFilter);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
import ContentFilterBase from "../contentFilterBase.js";
|
||||
|
||||
export default class Csp extends ContentFilterBase {
|
||||
async filter(data: string): Promise<string> {
|
||||
let htmlDoc = new DOMParser().parseFromString(
|
||||
data as string,
|
||||
this.getMimeTypes().shift() as any
|
||||
);
|
||||
let found = htmlDoc.documentElement.querySelectorAll(
|
||||
'meta[http-equiv="Content-Security-Policy"]'
|
||||
);
|
||||
|
||||
if (found.length) {
|
||||
found.forEach((item) => item.remove());
|
||||
data = htmlDoc.documentElement.outerHTML;
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
getMimeTypes(): string[] {
|
||||
return ["text/html"];
|
||||
}
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
import ContentFilterRegistry from "../contentFilterRegistry.js";
|
||||
import Csp from "./csp.js";
|
||||
|
||||
ContentFilterRegistry.registerFilter(new Csp());
|
|
@ -7,8 +7,7 @@ import {
|
|||
OnRequestDetailsType,
|
||||
StreamFilter,
|
||||
} from "../types.js";
|
||||
import { getRelayProxies } from "../util.js";
|
||||
import browser from "@lumeweb/webextension-polyfill";
|
||||
import { getRelayProxies, streamToArray } from "../util.js";
|
||||
import { ipfsPath, ipnsPath, path } from "is-ipfs";
|
||||
import {
|
||||
fetchIpfs,
|
||||
|
@ -17,13 +16,10 @@ import {
|
|||
statIpns,
|
||||
} from "@lumeweb/kernel-ipfs-client";
|
||||
import ejs from "ejs";
|
||||
import {
|
||||
CONTENT_MODE_BUFFERED,
|
||||
CONTENT_MODE_CHUNKED,
|
||||
contentModes,
|
||||
} from "../mimes.js";
|
||||
import { cacheDb } from "../databases.js";
|
||||
import { DNS_RECORD_TYPE, DNSResult } from "@lumeweb/libresolver";
|
||||
import RequestStream from "../requestStream.js";
|
||||
import ContentFilterRegistry from "../contentFilterRegistry.js";
|
||||
|
||||
const INDEX_HTML_FILES = ["index.html", "index.htm", "index.shtml"];
|
||||
|
||||
|
@ -144,6 +140,7 @@ export default class IpfsProvider extends BaseProvider {
|
|||
urlObj.protocol = "http";
|
||||
return { redirectUrl: urlObj.toString() };
|
||||
}
|
||||
let contentSize = 0;
|
||||
|
||||
let cachedPage: { contentType: string; data: Blob } | null = null;
|
||||
|
||||
|
@ -173,12 +170,12 @@ export default class IpfsProvider extends BaseProvider {
|
|||
if (contentType?.includes(";")) {
|
||||
contentType = contentType?.split(";").shift() as string;
|
||||
}
|
||||
contentSize = resp?.size as number;
|
||||
} else {
|
||||
contentType = cachedPage.contentType;
|
||||
contentSize = cachedPage.data.size;
|
||||
}
|
||||
|
||||
let status = "200";
|
||||
|
||||
if (resp) {
|
||||
if (!resp.exists) {
|
||||
err = "404";
|
||||
|
@ -190,77 +187,26 @@ export default class IpfsProvider extends BaseProvider {
|
|||
|
||||
this.setData(details, "contentType", contentType);
|
||||
|
||||
if (err) {
|
||||
if (err === "NOT_FOUND") {
|
||||
err = "404";
|
||||
}
|
||||
if (err === "timeout") {
|
||||
err = "408";
|
||||
}
|
||||
if (err.includes("no link")) {
|
||||
err = "404";
|
||||
}
|
||||
|
||||
this.setData(details, "error", err);
|
||||
|
||||
if (!isNaN(parseInt(err))) {
|
||||
status = err;
|
||||
}
|
||||
}
|
||||
|
||||
this.setData(details, "status", status);
|
||||
|
||||
let filterPromiseResolve: any;
|
||||
let filterPromise = new Promise((resolve) => {
|
||||
filterPromiseResolve = resolve;
|
||||
});
|
||||
let streamPromise = Promise.resolve();
|
||||
const filter: StreamFilter = browser.webRequest.filterResponseData(
|
||||
details.requestId
|
||||
const isSmallFile = contentSize <= MAX_CACHE_SIZE;
|
||||
const reqStream = new RequestStream(
|
||||
details,
|
||||
isSmallFile && ContentFilterRegistry.hasFilters(contentType)
|
||||
? ContentFilterRegistry.filter(contentType)
|
||||
: undefined
|
||||
);
|
||||
filter.ondata = () => {};
|
||||
filter.onstop = () => {
|
||||
filterPromiseResolve();
|
||||
};
|
||||
|
||||
let buffer: Uint8Array[] = [];
|
||||
let cacheBuffer: Uint8Array[] | Uint8Array = [];
|
||||
|
||||
const receiveUpdate = (chunk: Uint8Array) => {
|
||||
if (!chunk.buffer.byteLength && chunk.byteOffset === 0) {
|
||||
return filterPromise;
|
||||
}
|
||||
if (
|
||||
Object.keys(contentModes).includes(contentType as string) &&
|
||||
[CONTENT_MODE_CHUNKED, CONTENT_MODE_BUFFERED].includes(
|
||||
contentModes[contentType as string]
|
||||
)
|
||||
) {
|
||||
buffer.push(chunk);
|
||||
resp = resp as StatFileResponse;
|
||||
cacheBuffer = cacheBuffer as Uint8Array[];
|
||||
if (!cachedPage && resp.size <= MAX_CACHE_SIZE) {
|
||||
cacheBuffer.push(chunk);
|
||||
}
|
||||
|
||||
return filterPromise;
|
||||
}
|
||||
|
||||
return filterPromise.then(() => {
|
||||
streamPromise = streamPromise.then(() => {
|
||||
filter.write(chunk);
|
||||
cacheBuffer = cacheBuffer as Uint8Array[];
|
||||
cacheBuffer.push(chunk);
|
||||
});
|
||||
});
|
||||
};
|
||||
reqStream.start();
|
||||
|
||||
if (err) {
|
||||
// receiveUpdate(new TextEncoder().encode(serverErrorTemplate()));
|
||||
filterPromise.then(() => streamPromise).then(() => filter.close());
|
||||
reqStream.close();
|
||||
return {};
|
||||
}
|
||||
|
||||
if (cachedPage) {
|
||||
(
|
||||
cachedPage?.data.stream() as unknown as ReadableStream<Uint8Array>
|
||||
).pipeThrough(reqStream.stream);
|
||||
return {};
|
||||
}
|
||||
if (resp?.directory) {
|
||||
let indexFiles =
|
||||
resp?.files.filter((item) => INDEX_HTML_FILES.includes(item.name)) ||
|
||||
|
@ -268,106 +214,30 @@ export default class IpfsProvider extends BaseProvider {
|
|||
|
||||
if (indexFiles.length > 0) {
|
||||
urlPath += `/${indexFiles[0].name}`;
|
||||
} else {
|
||||
const renderedDirectory = DIRECTORY_TEMPLATE(resp?.files);
|
||||
receiveUpdate(new TextEncoder().encode(renderedDirectory));
|
||||
filterPromise
|
||||
.then(() => streamPromise)
|
||||
.then(() => {
|
||||
filter.close();
|
||||
});
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
const handleBuffer = () => {
|
||||
if (buffer.length) {
|
||||
return filterPromise.then(() => {
|
||||
streamPromise = streamPromise.then(() => {
|
||||
let mode = contentModes[contentType as string];
|
||||
buffer = buffer.map((item: Uint8Array | ArrayBuffer) => {
|
||||
if (item instanceof ArrayBuffer) {
|
||||
return new Uint8Array(item);
|
||||
}
|
||||
return item;
|
||||
});
|
||||
if (mode === CONTENT_MODE_BUFFERED) {
|
||||
let data: string | Uint8Array = Uint8Array.from(
|
||||
buffer.reduce(
|
||||
(previousValue: Uint8Array, currentValue: Uint8Array) => {
|
||||
return Uint8Array.from([...previousValue, ...currentValue]);
|
||||
},
|
||||
new Uint8Array()
|
||||
)
|
||||
);
|
||||
filter.write(data);
|
||||
} else if (mode == CONTENT_MODE_CHUNKED) {
|
||||
buffer.forEach((data) => filter.write(data));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return Promise.resolve();
|
||||
};
|
||||
if (cachedPage) {
|
||||
// @ts-ignore
|
||||
cachedPage.data.arrayBuffer().then((data: ArrayBuffer) => {
|
||||
if (isSmallFile) {
|
||||
streamToArray(reqStream.readableStream).then((data: Uint8Array) => {
|
||||
// @ts-ignore
|
||||
receiveUpdate(new Uint8Array(data))
|
||||
?.then(() => {
|
||||
return handleBuffer();
|
||||
})
|
||||
.then(() => {
|
||||
return filterPromise.then(() => filter.close());
|
||||
});
|
||||
return cacheDb.items.put({
|
||||
url: details.url,
|
||||
contentType,
|
||||
data: new Blob([data.buffer], { type: contentType }),
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
});
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
const streamWriter = reqStream.stream.writable.getWriter();
|
||||
|
||||
// @ts-ignore
|
||||
fetchMethod?.(hash, urlPath, receiveUpdate)
|
||||
.then(() => streamPromise)
|
||||
.then(() => handleBuffer())
|
||||
.then(() => {
|
||||
filterPromise.then(() => streamPromise).then(() => filter.close());
|
||||
resp = resp as StatFileResponse;
|
||||
if (resp.size <= MAX_CACHE_SIZE) {
|
||||
cacheBuffer = Uint8Array.from(
|
||||
(cacheBuffer as Uint8Array[]).reduce(
|
||||
(previousValue: Uint8Array, currentValue: Uint8Array) => {
|
||||
return Uint8Array.from([...previousValue, ...currentValue]);
|
||||
},
|
||||
new Uint8Array()
|
||||
)
|
||||
);
|
||||
|
||||
// @ts-ignore
|
||||
return cacheDb.items.put({
|
||||
url: details.url,
|
||||
contentType,
|
||||
data: new Blob([cacheBuffer.buffer], { type: contentType }),
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error("page error", urlPath, e.message);
|
||||
/* if (
|
||||
urlPath.endsWith(".html") ||
|
||||
urlPath.endsWith(".htm") ||
|
||||
urlPath.endsWith(".xhtml") ||
|
||||
urlPath.endsWith(".shtml")
|
||||
) {
|
||||
this.setData(details, "contentType", "text/html");
|
||||
let template = serverErrorTemplate();
|
||||
contentLength = template.length;
|
||||
receiveUpdate(new TextEncoder().encode(template));
|
||||
this.setData(details, "contentLength", contentLength);
|
||||
}*/
|
||||
filterPromise.then(() => streamPromise).then(() => filter.close());
|
||||
});
|
||||
fetchMethod?.(hash, urlPath, (data: Buffer) => {
|
||||
streamWriter.write(data);
|
||||
}).then(() => {
|
||||
streamWriter.releaseLock();
|
||||
return reqStream.close();
|
||||
});
|
||||
|
||||
return {};
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import {
|
|||
var browser: any; // tsc
|
||||
|
||||
const defaultKernelResolverLink =
|
||||
"AQBFjXpEBwbMwkBwYg0gdkeAM-yy9vlajfLtZSee9f-MDg";
|
||||
"AQDJDoXMJiiEMBxXodQvUV89qtQHsnXWyV1ViQ9M1pMjUg";
|
||||
|
||||
document.title = "kernel.skynet";
|
||||
let header = document.createElement("h1");
|
||||
|
@ -400,7 +400,7 @@ function loadKernel() {
|
|||
}
|
||||
|
||||
try {
|
||||
window.eval(kernelCode);
|
||||
eval(kernelCode);
|
||||
kernelLoaded = "success";
|
||||
sendAuthUpdate();
|
||||
log("kernel successfully loaded");
|
||||
|
|
16
src/mimes.ts
16
src/mimes.ts
|
@ -1,16 +0,0 @@
|
|||
export const CONTENT_MODE_BUFFERED = "buffered";
|
||||
export const CONTENT_MODE_CHUNKED = "chunked";
|
||||
//export const CONTENT_MODE_STREAMED = "streamed";
|
||||
|
||||
export const contentModes: { [mimeType: string]: string } = {
|
||||
// Images
|
||||
"image/png": CONTENT_MODE_BUFFERED,
|
||||
"image/jpeg": CONTENT_MODE_BUFFERED,
|
||||
"image/x-citrix-jpeg": CONTENT_MODE_BUFFERED,
|
||||
"image/gif": CONTENT_MODE_BUFFERED,
|
||||
"image/webp": CONTENT_MODE_BUFFERED,
|
||||
|
||||
//JS
|
||||
"application/javascript": CONTENT_MODE_BUFFERED,
|
||||
"application/json": CONTENT_MODE_BUFFERED,
|
||||
};
|
|
@ -0,0 +1,55 @@
|
|||
import { OnBeforeRequestDetailsType, StreamFilter } from "./types.js";
|
||||
import browser from "@lumeweb/webextension-polyfill";
|
||||
import { iterateStream, streamToArray } from "./util.js";
|
||||
|
||||
export default class RequestStream {
|
||||
private _request: OnBeforeRequestDetailsType;
|
||||
private _filter: StreamFilter;
|
||||
private _contentFilter?: (data: Uint8Array) => Promise<Uint8Array>;
|
||||
private _readableStream: ReadableStream;
|
||||
|
||||
constructor(
|
||||
request: OnBeforeRequestDetailsType,
|
||||
contentFilter?: (data: Uint8Array) => Promise<Uint8Array>
|
||||
) {
|
||||
this._request = request;
|
||||
this._contentFilter = contentFilter;
|
||||
this._filter = browser.webRequest.filterResponseData(request.requestId);
|
||||
this._stream = new TransformStream<Uint8Array>();
|
||||
this._readableStream = this._stream.readable;
|
||||
}
|
||||
|
||||
private _stream: TransformStream<Uint8Array>;
|
||||
|
||||
get stream(): TransformStream {
|
||||
return this._stream;
|
||||
}
|
||||
|
||||
get readableStream(): ReadableStream {
|
||||
const [a, b] = this._readableStream.tee();
|
||||
this._readableStream = a;
|
||||
|
||||
return b;
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
this._filter.onstop = async () => {
|
||||
if (this._contentFilter) {
|
||||
const data = await this._contentFilter(
|
||||
await streamToArray(this._readableStream)
|
||||
);
|
||||
this._filter.write(data);
|
||||
this._filter.close();
|
||||
return;
|
||||
}
|
||||
for await (const chunk of iterateStream(this._readableStream)) {
|
||||
this._filter.write(chunk);
|
||||
}
|
||||
this._filter.close();
|
||||
};
|
||||
}
|
||||
|
||||
public async close() {
|
||||
await this._stream.writable.close();
|
||||
}
|
||||
}
|
27
src/util.ts
27
src/util.ts
|
@ -129,3 +129,30 @@ export function downloadSkylink(
|
|||
});
|
||||
});
|
||||
}
|
||||
|
||||
export async function* iterateStream(
|
||||
stream: ReadableStream<any>
|
||||
): AsyncGenerator<Uint8Array> {
|
||||
let chunk;
|
||||
const reader = stream.getReader();
|
||||
do {
|
||||
chunk = await reader.read();
|
||||
if (chunk.value) {
|
||||
yield chunk.value;
|
||||
}
|
||||
} while (!chunk.done);
|
||||
|
||||
reader.releaseLock();
|
||||
}
|
||||
|
||||
export async function streamToArray(
|
||||
stream: ReadableStream<Uint8Array>
|
||||
): Promise<Uint8Array> {
|
||||
let buffer = new Uint8Array();
|
||||
|
||||
for await (const chunk of iterateStream(stream)) {
|
||||
buffer = Uint8Array.from([...buffer, ...chunk]);
|
||||
}
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import browser from "@lumeweb/webextension-polyfill";
|
||||
|
||||
import BaseProvider from "./contentProviders/baseProvider.js";
|
||||
import {
|
||||
BlockingResponse,
|
||||
|
@ -18,6 +17,8 @@ import { blake2b, bufToHex } from "libskynet";
|
|||
import { getAuthStatus } from "./main/vars.js";
|
||||
import { DNSResult } from "@lumeweb/libresolver";
|
||||
|
||||
import "./contentFilters/index.js";
|
||||
|
||||
export default class WebEngine {
|
||||
private contentProviders: BaseProvider[] = [];
|
||||
private requests: Map<string, BaseProvider> = new Map();
|
||||
|
|
Reference in New Issue