Implement automatic local file sync for desktop (#1803)

* Implement local file sync WIP

* keep dropzone

* leave in failure checks and update sync comments

* update job worker with new bree

* revert to non-debug settings

* update generateChunkSources to single location

* always winston logger for desktop

* update warning comment
This commit is contained in:
Timothy Carambat
2024-07-11 13:23:47 -07:00
committed by GitHub
parent 9f85df39ed
commit f8d61e9061
29 changed files with 573 additions and 100 deletions

View File

@@ -1,4 +1,5 @@
const { getLinkText } = require("../../processLink");
const { processFileByReference } = require("../../processSingleFile");
/**
* Fetches the content of a raw link. Returns the content as a text string of the link in question.
@@ -75,8 +76,8 @@ async function resyncConfluence({ chunkSource }, response) {
}
/**
* Fetches the content of a specific confluence page via its chunkSource.
* Returns the content as a text string of the page in question and only that page.
* Fetches the content of a specific Github file via its chunkSource.
* Returns the content as a text string of the file in question and only that file.
* @param {object} data - metadata from document (eg: chunkSource)
* @param {import("../../middleware/setDataSigner").ResponseWithSigner} response
*/
@@ -105,9 +106,31 @@ async function resyncGithub({ chunkSource }, response) {
}
}
/**
 * Re-reads a locally referenced file and replies with its current text content.
 * The file is parsed through `processFileByReference`; any failure while processing is
 * logged and answered with HTTP 200 and `{ success: false, content: null }`.
 * A missing `source` is rejected by throwing before any response is written.
 * @param {object} data - source from document (eg: source)
 * @param {import("../../middleware/setDataSigner").ResponseWithSigner} response
 */
async function resyncLocalfile({ source }, response) {
  if (!source) throw new Error('Invalid source property provided');

  try {
    const result = await processFileByReference(source);
    if (!result.success) {
      throw new Error(`Failed to resync local file content. ${result.reason}`);
    }
    response.status(200).json({ success: result.success, content: result.content });
  } catch (error) {
    console.error(error);
    const failure = {
      success: false,
      content: null,
    };
    response.status(200).json(failure);
  }
}
module.exports = {
link: resyncLink,
youtube: resyncYouTube,
confluence: resyncConfluence,
github: resyncGithub,
localfile: resyncLocalfile,
}

View File

@@ -70,4 +70,4 @@ async function asAudio({ fullFilePath = "", filename = "", options = {} }) {
return { success: true, reason: null, documents: [document] };
}
module.exports = asAudio;
module.exports = { asAudio };

View File

@@ -7,8 +7,9 @@ const {
} = require("../../utils/files");
const { tokenizeString } = require("../../utils/tokenizer");
const { default: slugify } = require("slugify");
const { generateLocalfileChunkSource } = require("../../utils/metadata");
async function asDocX({ fullFilePath = "", filename = "" }) {
async function asDocx({ fullFilePath = "", filename = "", options = {} }) {
const loader = new DocxLoader(fullFilePath);
console.log(`-- Working ${filename} --`);
@@ -38,7 +39,7 @@ async function asDocX({ fullFilePath = "", filename = "" }) {
docAuthor: "no author found",
description: "No description found.",
docSource: "pdf file uploaded by the user.",
chunkSource: "",
chunkSource: generateLocalfileChunkSource({ filename, ...options }, ""),
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
@@ -54,4 +55,31 @@ async function asDocX({ fullFilePath = "", filename = "" }) {
return { success: true, reason: null, documents: [document] };
}
module.exports = asDocX;
/**
 * Re-extracts the text of a docx file for syncing.
 * Loads the file with DocxLoader, keeps every non-empty page and joins them without a separator.
 * @param {{fullFilePath?: string, filename?: string}} params
 * @returns {Promise<{success: boolean, reason: string|null, content: string|null}>}
 */
async function resyncDocx({ fullFilePath = "", filename = "" }) {
  const loader = new DocxLoader(fullFilePath);
  console.log(`-- Syncing ${filename} --`);

  const docs = await loader.load();
  const pages = [];
  for (const doc of docs) {
    console.log(doc.metadata);
    console.log(`-- Parsing content from docx page --`);
    if (doc.pageContent.length) pages.push(doc.pageContent);
  }

  if (pages.length === 0) {
    console.error(`Resulting text content was empty for ${filename}.`);
    return {
      success: false,
      reason: `No text content found in ${filename}.`,
      content: null,
    };
  }

  console.log(`[SYNC SUCCESS]: ${filename} content was able to be synced.\n`);
  return { success: true, reason: null, content: pages.join("") };
}
module.exports = { asDocx, resyncDocx };

View File

@@ -7,8 +7,9 @@ const {
writeToServerDocuments,
} = require("../../utils/files");
const { default: slugify } = require("slugify");
const { generateLocalfileChunkSource } = require("../../utils/metadata");
async function asEPub({ fullFilePath = "", filename = "" }) {
async function asEPub({ fullFilePath = "", filename = "", options = {} }) {
let content = "";
try {
const loader = new EPubLoader(fullFilePath, { splitChapters: false });
@@ -36,7 +37,7 @@ async function asEPub({ fullFilePath = "", filename = "" }) {
docAuthor: "Unknown", // TODO: Find a better author
description: "Unknown", // TODO: Find a better description
docSource: "a epub file uploaded by the user.",
chunkSource: "",
chunkSource: generateLocalfileChunkSource({ filename, ...options }, ""),
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
@@ -52,4 +53,28 @@ async function asEPub({ fullFilePath = "", filename = "" }) {
return { success: true, reason: null, documents: [document] };
}
module.exports = asEPub;
/**
 * Re-extracts the text of an epub file for syncing.
 * Read errors are logged and treated the same as an empty file.
 * @param {{fullFilePath?: string, filename?: string}} params
 * @returns {Promise<{success: boolean, reason: string|null, content: string|null}>}
 */
async function resyncEPub({ fullFilePath = "", filename = "" }) {
  let content = "";
  console.log(`-- Syncing ${filename} --`);

  try {
    const docs = await new EPubLoader(fullFilePath, {
      splitChapters: false,
    }).load();
    docs.forEach((doc) => {
      content += doc.pageContent;
    });
  } catch (err) {
    console.error("Could not read epub file!", err);
  }

  if (content?.length) {
    console.log(`[SYNC SUCCESS]: ${filename} content was able to be synced.\n`);
    return { success: true, reason: null, content };
  }

  console.error(`Resulting text content was empty for ${filename}.`);
  return {
    success: false,
    reason: `No text content found in ${filename}.`,
    content: null,
  };
}
module.exports = { asEPub, resyncEPub };

View File

@@ -71,4 +71,4 @@ async function asMbox({ fullFilePath = "", filename = "" }) {
return { success: true, reason: null, documents };
}
module.exports = asMbox;
module.exports = { asMbox };

View File

@@ -7,8 +7,13 @@ const {
} = require("../../utils/files");
const { tokenizeString } = require("../../utils/tokenizer");
const { default: slugify } = require("slugify");
const { generateLocalfileChunkSource } = require("../../utils/metadata");
async function asOfficeMime({ fullFilePath = "", filename = "" }) {
async function asOfficeMime({
fullFilePath = "",
filename = "",
options = {},
}) {
console.log(`-- Working ${filename} --`);
let content = "";
try {
@@ -34,7 +39,10 @@ async function asOfficeMime({ fullFilePath = "", filename = "" }) {
docAuthor: "no author found",
description: "No description found.",
docSource: "Office file uploaded by the user.",
chunkSource: "",
chunkSource: generategenerateLocalfileChunkSourceChunkSource(
{ filename, ...options },
""
),
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
@@ -50,4 +58,26 @@ async function asOfficeMime({ fullFilePath = "", filename = "" }) {
return { success: true, reason: null, documents: [document] };
}
module.exports = asOfficeMime;
/**
 * Re-extracts the text of an office or office-like file for syncing.
 * Parser errors are logged and treated the same as an empty file.
 * @param {{fullFilePath?: string, filename?: string}} params
 * @returns {Promise<{success: boolean, reason: string|null, content: string|null}>}
 */
async function resyncOfficeMime({ fullFilePath = "", filename = "" }) {
  console.log(`-- Syncing ${filename} --`);
  let content = "";
  try {
    content = await officeParser.parseOfficeAsync(fullFilePath);
  } catch (error) {
    console.error(`Could not parse office or office-like file`, error);
  }

  // Optional chaining (as in the other resync helpers) so a parser that resolves to
  // undefined/null yields the structured failure below instead of a TypeError.
  if (!content?.length) {
    console.error(`Resulting text content was empty for ${filename}.`);
    return {
      success: false,
      reason: `No text content found in ${filename}.`,
      content: null,
    };
  }

  console.log(`[SYNC SUCCESS]: ${filename} content was able to be synced.\n`);
  return { success: true, reason: null, content };
}
module.exports = { asOfficeMime, resyncOfficeMime };

View File

@@ -6,9 +6,10 @@ const {
} = require("../../../utils/files");
const { tokenizeString } = require("../../../utils/tokenizer");
const { default: slugify } = require("slugify");
const { generateLocalfileChunkSource } = require("../../../utils/metadata");
const PDFLoader = require("./PDFLoader");
async function asPdf({ fullFilePath = "", filename = "" }) {
async function asPdf({ fullFilePath = "", filename = "", options = {} }) {
const pdfLoader = new PDFLoader(fullFilePath, {
splitPages: true,
});
@@ -45,7 +46,7 @@ async function asPdf({ fullFilePath = "", filename = "" }) {
docAuthor: docs[0]?.metadata?.pdf?.info?.Creator || "no author found",
description: docs[0]?.metadata?.pdf?.info?.Title || "No description found.",
docSource: "pdf file uploaded by the user.",
chunkSource: "",
chunkSource: generateLocalfileChunkSource({ filename, ...options }, ""),
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
@@ -61,4 +62,37 @@ async function asPdf({ fullFilePath = "", filename = "" }) {
return { success: true, reason: null, documents: [document] };
}
module.exports = asPdf;
/**
 * Re-extracts the text of a PDF for syncing.
 * Pages are loaded individually (splitPages), empty pages are dropped and the rest are
 * joined without a separator.
 * @param {{fullFilePath?: string, filename?: string}} params
 * @returns {Promise<{success: boolean, reason: string|null, content: string|null}>}
 */
async function resyncPdf({ fullFilePath = "", filename = "" }) {
  const pdfLoader = new PDFLoader(fullFilePath, { splitPages: true });
  console.log(`-- Syncing ${filename} --`);

  const pages = [];
  const docs = await pdfLoader.load();
  for (const doc of docs) {
    const pageLabel = doc.metadata?.loc?.pageNumber || "unknown";
    console.log(`-- Parsing content from pg ${pageLabel} --`);
    if (doc.pageContent?.length) pages.push(doc.pageContent);
  }

  if (pages.length === 0) {
    console.error(`Resulting text content was empty for ${filename}.`);
    return {
      success: false,
      reason: `No text content found in ${filename}.`,
      content: null,
    };
  }

  console.log(`[SYNC SUCCESS]: ${filename} content was able to be synced.\n`);
  return { success: true, reason: null, content: pages.join("") };
}
module.exports = { asPdf, resyncPdf };

View File

@@ -7,8 +7,9 @@ const {
writeToServerDocuments,
} = require("../../utils/files");
const { default: slugify } = require("slugify");
const { generateLocalfileChunkSource } = require("../../utils/metadata");
async function asTxt({ fullFilePath = "", filename = "" }) {
async function asText({ fullFilePath = "", filename = "", options = {} }) {
let content = "";
try {
content = fs.readFileSync(fullFilePath, "utf8");
@@ -34,7 +35,7 @@ async function asTxt({ fullFilePath = "", filename = "" }) {
docAuthor: "Unknown", // TODO: Find a better author
description: "Unknown", // TODO: Find a better description
docSource: "a text file uploaded by the user.",
chunkSource: "",
chunkSource: generateLocalfileChunkSource({ filename, ...options }, ""),
published: createdDate(fullFilePath),
wordCount: content.split(" ").length,
pageContent: content,
@@ -50,4 +51,26 @@ async function asTxt({ fullFilePath = "", filename = "" }) {
return { success: true, reason: null, documents: [document] };
}
module.exports = asTxt;
/**
 * Re-reads a plain-text file from disk (utf8) for syncing.
 * Read errors are logged and treated the same as an empty file.
 * @param {{fullFilePath?: string, filename?: string}} params
 * @returns {Promise<{success: boolean, reason: string|null, content: string|null}>}
 */
async function resyncText({ fullFilePath = "", filename = "" }) {
  let text = "";
  try {
    text = fs.readFileSync(fullFilePath, "utf8");
  } catch (err) {
    console.error("Could not read file!", err);
  }

  if (text?.length) {
    console.log(`-- Syncing ${filename} --`);
    console.log(`[SYNC SUCCESS]: ${filename} content was able to be synced.\n`);
    return { success: true, reason: null, content: text };
  }

  console.error(`Resulting text content was empty for ${filename}.`);
  return {
    success: false,
    reason: `No text content found in ${filename}.`,
    content: null,
  };
}
module.exports = { asText, resyncText };

View File

@@ -3,6 +3,7 @@ const fs = require("fs");
const {
WATCH_DIRECTORY,
SUPPORTED_FILETYPE_CONVERTERS,
SUPPORTED_RESYNC_CONVERTERS,
} = require("../utils/constants");
const {
trashFile,
@@ -75,6 +76,40 @@ async function processSingleFile(targetFilename, options = {}) {
});
}
/**
 * Resolves a file by its location on disk and returns the result of the matching resync converter.
 * Extensions without an entry in SUPPORTED_RESYNC_CONVERTERS are processed as `.txt` when
 * `isTextType` accepts the file; otherwise an error is thrown.
 * @param {string} fileLocation - path reference to the file to re-read
 * @returns {Promise<object>} whatever the selected converter resolves to
 * @throws {Error} when the file does not exist, has no extension, or is unsupported and not text
 */
async function processFileByReference(fileLocation) {
  const fullFilePath = path.resolve(normalizePath(fileLocation));
  if (!fs.existsSync(fullFilePath)) {
    throw new Error(`File ${fullFilePath} does not exist.`);
  }

  const fileExtension = path.extname(fullFilePath).toLowerCase();
  if (!fileExtension) {
    throw new Error(
      `No file extension found for ${fullFilePath}. This file cannot be processed.`
    );
  }

  const hasPreset = SUPPORTED_RESYNC_CONVERTERS.hasOwnProperty(fileExtension);
  if (!hasPreset) {
    if (!isTextType(fullFilePath)) {
      throw new Error(
        `File extension ${fileExtension} not supported for parsing and cannot be assumed as text file type.`
      );
    }
    console.log(
      `\x1b[33m[Collector]\x1b[0m The provided filetype of ${fileExtension} does not have a preset and will be processed as .txt.`
    );
  }

  const converterKey = hasPreset ? fileExtension : ".txt";
  const FileTypeProcessor = SUPPORTED_RESYNC_CONVERTERS[converterKey];
  return await FileTypeProcessor({
    fullFilePath,
    filename: path.basename(fullFilePath),
    options: { contentOnly: true },
  });
}
module.exports = {
processSingleFile,
processFileByReference,
};

View File

@@ -29,13 +29,25 @@ const ACCEPTED_MIMES = {
"application/epub+zip": [".epub"],
};
const asText = require("../processSingleFile/convert/asTxt.js");
const asPdf = require("../processSingleFile/convert/asPDF/index.js");
const asDocx = require("../processSingleFile/convert/asDocx.js");
const asOfficeMime = require("../processSingleFile/convert/asOfficeMime.js");
const asMbox = require("../processSingleFile/convert/asMbox.js");
const asAudio = require("../processSingleFile/convert/asAudio.js");
const asEPub = require("../processSingleFile/convert/asEPub.js");
const { asText, resyncText } = require("../processSingleFile/convert/asTxt.js");
const {
asPdf,
resyncPdf,
} = require("../processSingleFile/convert/asPDF/index.js");
const {
asDocx,
resyncDocx,
} = require("../processSingleFile/convert/asDocx.js");
const {
asOfficeMime,
resyncOfficeMime,
} = require("../processSingleFile/convert/asOfficeMime.js");
const { asMbox } = require("../processSingleFile/convert/asMbox.js");
const { asAudio } = require("../processSingleFile/convert/asAudio.js");
const {
asEPub,
resyncEPub,
} = require("../processSingleFile/convert/asEPub.js");
const SUPPORTED_FILETYPE_CONVERTERS = {
".txt": asText,
@@ -63,8 +75,28 @@ const SUPPORTED_FILETYPE_CONVERTERS = {
".mpeg": asAudio,
};
// Lookup table of file extension (lowercase, with leading dot) -> resync handler.
// Each handler is one of the `resync*` functions imported above from the convert modules.
// Several plain-text style extensions share `resyncText`, and the office-like
// formats (.pptx/.odt/.odp) share `resyncOfficeMime`.
const SUPPORTED_RESYNC_CONVERTERS = {
".txt": resyncText,
".md": resyncText,
".org": resyncText,
".adoc": resyncText,
".rst": resyncText,
".html": resyncText,
".pdf": resyncPdf,
".docx": resyncDocx,
".pptx": resyncOfficeMime,
".odt": resyncOfficeMime,
".odp": resyncOfficeMime,
".epub": resyncEPub,
};
module.exports = {
SUPPORTED_FILETYPE_CONVERTERS,
SUPPORTED_RESYNC_CONVERTERS,
WATCH_DIRECTORY,
ACCEPTED_MIMES,
};

View File

@@ -6,6 +6,7 @@ const UrlPattern = require("url-pattern");
const { writeToServerDocuments, sanitizeFileName } = require("../../files");
const { tokenizeString } = require("../../tokenizer");
const { ConfluencePagesLoader } = require("./ConfluenceLoader");
const { generateConfluenceChunkSource } = require("../../metadata");
/**
* Load Confluence documents from a spaceID and Confluence credentials
@@ -83,7 +84,7 @@ async function loadConfluence({ pageUrl, username, accessToken }, response) {
docAuthor: subdomain,
description: doc.metadata.title,
docSource: `${subdomain} Confluence`,
chunkSource: generateChunkSource(
chunkSource: generateConfluenceChunkSource(
{ doc, baseUrl, accessToken, username },
response.locals.encryptionWorker
),
@@ -278,28 +279,6 @@ function validSpaceUrl(spaceUrl = "") {
return { valid: false, result: null };
}
/**
* Generate the full chunkSource for a specific Confluence page so that we can resync it later.
* This data is encrypted into a single `payload` query param so we can replay credentials later
* since this was encrypted with the systems persistent password and salt.
* @param {object} chunkSourceInformation
* @param {import("../../EncryptionWorker").EncryptionWorker} encryptionWorker
* @returns {string}
*/
function generateChunkSource(
{ doc, baseUrl, accessToken, username },
encryptionWorker
) {
const payload = {
baseUrl,
token: accessToken,
username,
};
return `confluence://${doc.metadata.url}?payload=${encryptionWorker.encrypt(
JSON.stringify(payload)
)}`;
}
module.exports = {
loadConfluence,
fetchConfluencePage,

View File

@@ -5,6 +5,7 @@ const { default: slugify } = require("slugify");
const { v4 } = require("uuid");
const { writeToServerDocuments } = require("../../files");
const { tokenizeString } = require("../../tokenizer");
const { generateGitHubChunkSource } = require("../../metadata");
/**
* Load in a Github Repo recursively or just the top level if no PAT is provided
@@ -58,7 +59,7 @@ async function loadGithubRepo(args, response) {
docAuthor: repo.author,
description: "No description found.",
docSource: doc.metadata.source,
chunkSource: generateChunkSource(
chunkSource: generateGitHubChunkSource(
repo,
doc,
response.locals.encryptionWorker
@@ -134,26 +135,4 @@ async function fetchGithubFile({
};
}
/**
* Generate the full chunkSource for a specific file so that we can resync it later.
* This data is encrypted into a single `payload` query param so we can replay credentials later
* since this was encrypted with the systems persistent password and salt.
* @param {RepoLoader} repo
* @param {import("@langchain/core/documents").Document} doc
* @param {import("../../EncryptionWorker").EncryptionWorker} encryptionWorker
* @returns {string}
*/
function generateChunkSource(repo, doc, encryptionWorker) {
const payload = {
owner: repo.author,
project: repo.project,
branch: repo.branch,
path: doc.metadata.source,
pat: !!repo.accessToken ? repo.accessToken : null,
};
return `github://${repo.repo}?payload=${encryptionWorker.encrypt(
JSON.stringify(payload)
)}`;
}
module.exports = { loadGithubRepo, fetchGithubFile };

View File

@@ -5,8 +5,7 @@ class Logger {
static _instance;
constructor() {
if (Logger._instance) return Logger._instance;
this.logger =
process.env.NODE_ENV === "production" ? this.getWinstonLogger() : console;
this.logger = this.getWinstonLogger();
Logger._instance = this;
}

View File

@@ -0,0 +1,62 @@
/**
 * Generate the full chunkSource for a locally uploaded/referenced file so that we can resync it later.
 * Returns `localfile://<localPath>` only when `props` has its own `localPath` key and both
 * `filename` and `localPath` are truthy; otherwise the fallback is returned unchanged.
 * @param {{localPath?: string, filename?: string}} props
 * @param {string} fallbackValue - the fallback value for if the filename or localPath is empty
 * @returns {string}
 */
function generateLocalfileChunkSource(props = {}, fallbackValue = "") {
  // Call hasOwnProperty through Object.prototype so null-prototype objects (or objects that
  // shadow `hasOwnProperty`) are handled instead of throwing or giving a wrong answer.
  if (
    props == null ||
    !Object.prototype.hasOwnProperty.call(props, "localPath")
  )
    return fallbackValue;

  const { filename, localPath } = props;
  if (!filename || !localPath) return fallbackValue;
  return `localfile://${localPath}`;
}
/**
 * Build the resync-able chunkSource for a single Confluence page.
 * The base URL, access token and username are JSON-serialized, passed through
 * `encryptionWorker.encrypt` and appended as the `payload` query param of a
 * `confluence://` reference to the page URL.
 * @param {object} chunkSourceInformation
 * @param {import("../EncryptionWorker").EncryptionWorker} encryptionWorker
 * @returns {string}
 */
function generateConfluenceChunkSource(
  { doc, baseUrl, accessToken, username },
  encryptionWorker
) {
  const pageUrl = doc.metadata.url;
  const credentials = JSON.stringify({
    baseUrl,
    token: accessToken,
    username,
  });
  const encrypted = encryptionWorker.encrypt(credentials);
  return `confluence://${pageUrl}?payload=${encrypted}`;
}
/**
 * Build the resync-able chunkSource for a single GitHub file.
 * Owner, project, branch, file path and (optional) access token are JSON-serialized,
 * passed through `encryptionWorker.encrypt` and appended as the `payload` query param
 * of a `github://` reference to the repo.
 * @param {RepoLoader} repo
 * @param {import("@langchain/core/documents").Document} doc
 * @param {import("../EncryptionWorker").EncryptionWorker} encryptionWorker
 * @returns {string}
 */
function generateGitHubChunkSource(repo, doc, encryptionWorker) {
  const fileReference = JSON.stringify({
    owner: repo.author,
    project: repo.project,
    branch: repo.branch,
    path: doc.metadata.source,
    // Any falsy token (undefined, "", null) is stored as null.
    pat: repo.accessToken || null,
  });
  const repoUrl = repo.repo;
  const encrypted = encryptionWorker.encrypt(fileReference);
  return `github://${repoUrl}?payload=${encrypted}`;
}
module.exports = {
generateLocalfileChunkSource,
generateConfluenceChunkSource,
generateGitHubChunkSource,
};

View File

@@ -46,7 +46,11 @@ function FileUploadProgressComponent({
}, 100);
// Chunk streaming not working in production so we just sit and wait
const { response, data } = await Workspace.uploadFile(slug, formData);
const { response, data } = await Workspace.uploadFile(
slug,
formData,
file.path
);
if (!response.ok) {
setStatus("failed");
clearInterval(timer);

View File

@@ -1,5 +1,5 @@
import { CloudArrowUp } from "@phosphor-icons/react";
import { useEffect, useState } from "react";
import { useEffect, useRef, useState } from "react";
import showToast from "../../../../../utils/toast";
import System from "../../../../../models/system";
import { useDropzone } from "react-dropzone";
@@ -14,6 +14,7 @@ export default function UploadFile({
setLoading,
setLoadingMessage,
}) {
const inputRef = useRef(null);
const [ready, setReady] = useState(false);
const [files, setFiles] = useState([]);
const [fetchingUrl, setFetchingUrl] = useState(false);
@@ -43,7 +44,15 @@ export default function UploadFile({
// Don't spam fetchKeys, wait 1s between calls at least.
const handleUploadSuccess = debounce(() => fetchKeys(true), 1000);
const handleUploadError = (_msg) => null; // stubbed.
// Handles selection through the hidden native file input: every picked file is wrapped
// with a fresh uid and appended to the files already listed.
const handleInputChange = (event) => {
  const pickedFiles = Array.from(event.target.files);
  const wrapped = pickedFiles.map((file) => ({ uid: v4(), file }));
  setFiles([...files, ...wrapped]);
};
const onDrop = async (acceptedFiles, rejections) => {
const newAccepted = acceptedFiles.map((file) => {
return {
@@ -70,9 +79,11 @@ export default function UploadFile({
checkProcessorOnline();
}, []);
const { getRootProps, getInputProps } = useDropzone({
const { getRootProps } = useDropzone({
onDrop,
disabled: !ready,
noClick: true,
noKeyboard: true,
});
return (
@@ -81,9 +92,16 @@ export default function UploadFile({
className={`w-[560px] border-2 border-dashed rounded-2xl bg-zinc-900/50 p-3 ${
ready ? "cursor-pointer" : "cursor-not-allowed"
} hover:bg-zinc-900/90`}
{...getRootProps()}
onClick={() => inputRef.current.click()}
>
<input {...getInputProps()} />
<input
ref={inputRef}
type="file"
hidden={true}
name="files"
multiple={true}
onChange={handleInputChange}
/>
{ready === false ? (
<div className="flex flex-col items-center justify-center h-full">
<CloudArrowUp className="w-8 h-8 text-white/80" />
@@ -96,7 +114,10 @@ export default function UploadFile({
</div>
</div>
) : files.length === 0 ? (
<div className="flex flex-col items-center justify-center">
<div
{...getRootProps()}
className="flex flex-col items-center justify-center"
>
<CloudArrowUp className="w-8 h-8 text-white/80" />
<div className="text-white text-opacity-80 text-sm font-semibold py-1">
Click to upload or drag and drop

View File

@@ -228,8 +228,10 @@ const DocumentWatchAlert = memo(() => {
content in every workspace where this file is managed.
</p>
<p>
This feature currently supports online-based content and will not
be available for manually uploaded documents.
This feature currently supports online-based content (links,
GitHub, Confluence) and <b>locally uploaded files</b>. In general,
online-based content is synced every hour and local files are
synced every 10 minutes.
</p>
<p>
You can manage what documents are watched from the{" "}

View File

@@ -198,11 +198,14 @@ const Workspace = {
.then((res) => res.ok)
.catch(() => false);
},
uploadFile: async function (slug, formData) {
uploadFile: async function (slug, formData, localPath = null) {
const response = await fetch(`${API_BASE()}/workspace/${slug}/upload`, {
method: "POST",
body: formData,
headers: baseHeaders(),
headers: {
"x-file-origin": localPath,
...baseHeaders(),
},
});
const data = await response.json();

View File

@@ -57,8 +57,10 @@ export default function LiveSyncToggle({ enabled = false, onToggle }) {
are referenced in at the same time of update.
</p>
<p className="text-white/80 text-xs italic">
This feature only applies to web-based content, such as websites,
Confluence, YouTube, and GitHub files.
You can sync both web-based content like websites, Confluence, and
GitHub files as well as locally uploaded documents. Web-based
documents are checked hourly and locally referenced documents are
checked every 5 minutes.
</p>
</div>
</div>

View File

@@ -115,7 +115,7 @@ function workspaceEndpoints(app) {
async function (request, response) {
try {
const Collector = new CollectorApi();
const { originalname } = request.file;
const { originalname, localPath = null } = request.file;
const processingOnline = await Collector.online();
if (!processingOnline) {
@@ -129,8 +129,10 @@ function workspaceEndpoints(app) {
return;
}
const { success, reason } =
await Collector.processDocument(originalname);
const { success, reason } = await Collector.processDocument(
originalname,
localPath
);
if (!success) {
response.status(500).json({ success: false, error: reason }).end();
return;

View File

@@ -8,7 +8,7 @@ const { DocumentSyncRun } = require('../models/documentSyncRun.js');
(async () => {
try {
const queuesToProcess = await DocumentSyncQueue.staleDocumentQueues();
const queuesToProcess = await DocumentSyncQueue.staleDocumentQueues('remote');
if (queuesToProcess.length === 0) {
log('No outstanding documents to sync. Exiting.');
return;

View File

@@ -0,0 +1,147 @@
const { Document } = require('../models/documents.js');
const { DocumentSyncQueue } = require('../models/documentSyncQueue.js');
const { CollectorApi } = require('../utils/collectorApi');
const { fileData } = require("../utils/files");
const { log, conclude, updateSourceDocument } = require('./helpers/index.js');
const { getVectorDbClass } = require('../utils/helpers/index.js');
const { DocumentSyncRun } = require('../models/documentSyncRun.js');
const fs = require('fs');
/**
 * Background job: refreshes watched documents whose sync queue type is 'local'.
 *
 * For every stale queue it:
 *  1. drops the watch when the document has no metadata / unsupported type, or when its
 *     source path no longer exists on disk;
 *  2. asks the collector to re-read the source file;
 *  3. records a failed run when no content comes back (and unwatches after
 *     `maxRepeatFailures` failures among the most recent runs);
 *  4. records an exited run when the content is unchanged;
 *  5. otherwise re-embeds the document in its workspace, rewrites the stored source
 *     document, and re-embeds every other workspace document with the same filename.
 *
 * Any thrown error is logged; `conclude()` is always called at the end.
 */
(async () => {
  try {
    const queuesToProcess = await DocumentSyncQueue.staleDocumentQueues('local');
    if (queuesToProcess.length === 0) {
      log('No outstanding documents to sync. Exiting.');
      return;
    }

    const collector = new CollectorApi();
    if (!(await collector.online())) {
      log('Could not reach collector API. Exiting.');
      return;
    }

    log(`${queuesToProcess.length} watched documents have been found to be stale and will be updated now.`);
    for (const queue of queuesToProcess) {
      const document = queue.workspaceDoc;
      const workspace = document.workspace;
      const { metadata, type, source } = Document.parseDocumentTypeAndSource(document);

      if (!metadata || !DocumentSyncQueue.validFileTypes.includes(type)) {
        // Document is either broken, invalid, or not supported so drop it from future queues.
        log(`Document ${document.filename} has no metadata, is broken, or invalid and has been removed from all future runs.`);
        await DocumentSyncQueue.unwatch(document);
        continue;
      }

      if (!fs.existsSync(source)) {
        // Document reference is either broken, invalid, or not supported so drop it from future queues.
        log(`Document ${document.filename} has moved and its known source is unable to be found - removing from queue.`);
        await DocumentSyncQueue.unwatch(document);
        continue;
      }

      const response = await collector.forwardExtensionRequest({
        endpoint: "/ext/resync-source-document",
        method: "POST",
        body: JSON.stringify({
          type,
          options: { source }
        })
      });
      const newContent = response?.content;

      if (!newContent) {
        // Check if the last "x" runs were all failures (not exits!). If so - remove the job entirely since it is broken.
        const recentRuns = await DocumentSyncRun.where(
          { queueId: queue.id },
          DocumentSyncQueue.maxRepeatFailures,
          { createdAt: 'desc' }
        );
        const failedRunCount = recentRuns.filter((run) => run.status === DocumentSyncRun.statuses.failed).length;
        if (failedRunCount >= DocumentSyncQueue.maxRepeatFailures) {
          log(`Document ${document.filename} has failed to refresh ${failedRunCount} times continuously and will now be removed from the watched document set.`);
          await DocumentSyncQueue.unwatch(document);
          continue;
        }

        // The current failure has not been saved yet, so this attempt is number (previous failures + 1).
        log(`Failed to get a new content response from collector for source ${source}. Skipping, but will retry next worker interval. Attempt ${failedRunCount + 1}/${DocumentSyncQueue.maxRepeatFailures}`);
        await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.failed, { filename: document.filename, workspacesModified: [], reason: 'No content found.' });
        continue;
      }

      const currentDocumentData = await fileData(document.docpath);
      if (currentDocumentData.pageContent === newContent) {
        const nextSync = DocumentSyncQueue.calcNextSync(queue);
        log(`Source ${source} is unchanged and will be skipped. Next sync will be ${nextSync.toLocaleString()}.`);
        await DocumentSyncQueue._update(
          queue.id,
          {
            lastSyncedAt: new Date().toISOString(),
            nextSyncAt: nextSync.toISOString(),
          }
        );
        await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.exited, { filename: document.filename, workspacesModified: [], reason: 'Content unchanged.' });
        continue;
      }

      // update the defined document and workspace vectorDB with the latest information
      // it will skip cache and create a new vectorCache file.
      const vectorDatabase = getVectorDbClass();
      await vectorDatabase.deleteDocumentFromNamespace(workspace.slug, document.docId);
      await vectorDatabase.addDocumentToNamespace(
        workspace.slug,
        { ...currentDocumentData, pageContent: newContent, docId: document.docId },
        document.docpath,
        true
      );
      updateSourceDocument(
        document.docpath,
        {
          ...currentDocumentData,
          pageContent: newContent,
          docId: document.docId,
          published: (new Date).toLocaleString(),
          // Todo: Update word count and token_estimate?
        }
      );
      log(`Workspace "${workspace.name}" vectors of ${source} updated. Document and vector cache updated.`);

      // Now we can bloom the results to all matching documents in all other workspaces
      const workspacesModified = [workspace.slug];
      const moreReferences = await Document.where({
        id: { not: document.id },
        filename: document.filename
      }, null, null, { workspace: true });

      if (moreReferences.length !== 0) {
        log(`${source} is referenced in ${moreReferences.length} other workspaces. Updating those workspaces as well...`);
        for (const additionalDocumentRef of moreReferences) {
          const additionalWorkspace = additionalDocumentRef.workspace;
          workspacesModified.push(additionalWorkspace.slug);

          await vectorDatabase.deleteDocumentFromNamespace(additionalWorkspace.slug, additionalDocumentRef.docId);
          await vectorDatabase.addDocumentToNamespace(
            additionalWorkspace.slug,
            { ...currentDocumentData, pageContent: newContent, docId: additionalDocumentRef.docId },
            additionalDocumentRef.docpath,
          );
          log(`Workspace "${additionalWorkspace.name}" vectors for ${source} was also updated with the new content from cache.`);
        }
      }

      const nextRefresh = DocumentSyncQueue.calcNextSync(queue);
      log(`${source} has been refreshed in all workspaces it is currently referenced in. Next refresh will be ${nextRefresh.toLocaleString()}.`);
      await DocumentSyncQueue._update(
        queue.id,
        {
          lastSyncedAt: new Date().toISOString(),
          nextSyncAt: nextRefresh.toISOString(),
        }
      );
      await DocumentSyncQueue.saveRun(queue.id, DocumentSyncRun.statuses.success, { filename: document.filename, workspacesModified });
    }
  } catch (e) {
    console.error(e);
    log(`errored with ${e.message}`);
  } finally {
    conclude();
  }
})();

View File

@@ -1,16 +1,17 @@
const { BackgroundService } = require("../utils/BackgroundWorkers");
const { safeJsonParse } = require("../utils/http");
const prisma = require("../utils/prisma");
const { SystemSettings } = require("./systemSettings");
const { Telemetry } = require("./telemetry");
/**
* @typedef {('link'|'youtube'|'confluence'|'github')} validFileType
* @typedef {('link'|'youtube'|'confluence'|'github'|'localfile')} validFileType
*/
const DocumentSyncQueue = {
featureKey: "experimental_live_file_sync",
// update the validFileTypes and .canWatch properties when adding elements here.
validFileTypes: ["link", "youtube", "confluence", "github"],
validFileTypes: ["link", "youtube", "confluence", "github", "localfile"],
defaultStaleAfter: 604800000,
maxRepeatFailures: 5, // How many times a run can fail in a row before pruning.
writable: [],
@@ -44,6 +45,7 @@ const DocumentSyncQueue = {
if (chunkSource.startsWith("youtube://")) return true; // If is a youtube link
if (chunkSource.startsWith("confluence://")) return true; // If is a confluence document link
if (chunkSource.startsWith("github://")) return true; // If is a Github file reference
if (chunkSource.startsWith("localfile://")) return true; // If is a local file reference
return false;
},
@@ -69,10 +71,16 @@ const DocumentSyncQueue = {
`Cannot watch this document again - it already has a queue set.`
);
const metadata = safeJsonParse(document.metadata, { chunkSource: "" });
const isLocalFile = metadata?.chunkSource?.startsWith("localfile:");
const queue = await prisma.document_sync_queues.create({
data: {
workspaceDocId: document.id,
nextSyncAt: new Date(Number(new Date()) + this.defaultStaleAfter),
staleAfterMs: isLocalFile ? 0 : this.defaultStaleAfter,
nextSyncAt: isLocalFile
? new Date()
: new Date(Number(new Date()) + this.defaultStaleAfter),
type: isLocalFile ? "local" : "remote",
},
});
await Document._updateAll(
@@ -190,12 +198,13 @@ const DocumentSyncQueue = {
* { workspace: import("@prisma/client").workspaces }
* })[]}>}
*/
staleDocumentQueues: async function () {
staleDocumentQueues: async function (type = null) {
const queues = await this.where(
{
nextSyncAt: {
lte: new Date().toISOString(),
},
...(!!type ? { type } : {}), // sync by "remote" || "local". Default is all.
},
null,
null,

View File

@@ -0,0 +1,18 @@
-- RedefineTables
-- Rebuilds "document_sync_queues" via a shadow table so that the resulting
-- table carries a "type" column (TEXT NOT NULL DEFAULT 'remote').
-- Sequence: create shadow table -> copy rows -> drop old table -> rename
-- shadow table into place -> recreate the unique index.

-- Turn off foreign key enforcement for the duration of the drop/rename below.
PRAGMA foreign_keys=OFF;
CREATE TABLE "new_document_sync_queues" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
-- 604800000 ms = 7 days.
"staleAfterMs" INTEGER NOT NULL DEFAULT 604800000,
"nextSyncAt" DATETIME NOT NULL,
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"lastSyncedAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- Not populated by the row copy below, so every copied row gets 'remote'.
"type" TEXT NOT NULL DEFAULT 'remote',
"workspaceDocId" INTEGER NOT NULL,
-- Deleting (or re-keying) the referenced workspace document cascades to its queue row.
CONSTRAINT "document_sync_queues_workspaceDocId_fkey" FOREIGN KEY ("workspaceDocId") REFERENCES "workspace_documents" ("id") ON DELETE CASCADE ON UPDATE CASCADE
);
-- Copy every existing row. "type" is intentionally absent from the column list
-- so that the column default ('remote') is applied to all copied rows.
INSERT INTO "new_document_sync_queues" ("createdAt", "id", "lastSyncedAt", "nextSyncAt", "staleAfterMs", "workspaceDocId") SELECT "createdAt", "id", "lastSyncedAt", "nextSyncAt", "staleAfterMs", "workspaceDocId" FROM "document_sync_queues";
DROP TABLE "document_sync_queues";
ALTER TABLE "new_document_sync_queues" RENAME TO "document_sync_queues";
-- The index is created on the renamed table; it enforces at most one queue row
-- per "workspaceDocId".
CREATE UNIQUE INDEX "document_sync_queues_workspaceDocId_key" ON "document_sync_queues"("workspaceDocId");
-- Report any foreign key violations left behind by the rebuild, then restore enforcement.
PRAGMA foreign_key_check;
PRAGMA foreign_keys=ON;

View File

@@ -284,6 +284,7 @@ model document_sync_queues {
nextSyncAt DateTime
createdAt DateTime @default(now())
lastSyncedAt DateTime @default(now())
type String @default("remote")
workspaceDocId Int @unique
workspaceDoc workspace_documents? @relation(fields: [workspaceDocId], references: [id], onDelete: Cascade)
runs document_sync_executions[]

View File

@@ -67,6 +67,11 @@ class BackgroundService {
timeout: "1min", // Wait 60s to check job on boot to make sure there are no pending jobs since last app close.
interval: "1hr",
},
{
name: "sync-watched-local-documents",
timeout: false,
interval: "5min",
},
];
}

View File

@@ -42,12 +42,15 @@ class CollectorApi {
});
}
async processDocument(filename = "") {
async processDocument(filename = "", localPath = null) {
if (!filename) return false;
const data = JSON.stringify({
filename,
options: this.#attachOptions(),
options: {
...this.#attachOptions(),
localPath,
},
});
return await fetch(`${this.endpoint}/process`, {

View File

@@ -11,10 +11,18 @@ const fileUploadStorage = multer.diskStorage({
: path.resolve(process.env.STORAGE_DIR, `hotdir`); // specific for desktop.
cb(null, uploadOutput);
},
filename: function (_, file, cb) {
filename: function (req, file, cb) {
file.originalname = Buffer.from(file.originalname, "latin1").toString(
"utf8"
);
// Set origin for watching
if (
req.headers.hasOwnProperty("x-file-origin") &&
typeof req.headers["x-file-origin"] === "string"
)
file.localPath = req.headers["x-file-origin"];
cb(null, file.originalname);
},
});

View File

@@ -5,8 +5,7 @@ class Logger {
static _instance;
constructor() {
if (Logger._instance) return Logger._instance;
this.logger =
process.env.NODE_ENV === "production" ? this.getWinstonLogger() : console;
this.logger = this.getWinstonLogger();
Logger._instance = this;
}