Add mongoDB support #8

Merged
Sirherobrine23 merged 6 commits from db into main 2023-01-11 04:44:07 +00:00
6 changed files with 74 additions and 32 deletions
Showing only changes of commit cc3409ef48 - Show all commits

9
.vscode/launch.json vendored

@ -11,10 +11,11 @@
"internalConsoleOptions": "openOnSessionStart", "internalConsoleOptions": "openOnSessionStart",
"skipFiles": ["<node_internals>/**", "node_modules/**"], "skipFiles": ["<node_internals>/**", "node_modules/**"],
"cwd": "${workspaceRoot}", "cwd": "${workspaceRoot}",
"runtimeExecutable": "ts-node", "runtimeExecutable": "node",
"args": ["src/index.ts", "server"], "args": ["src/index.js", "server", "--cpus=0"],
"env": { "preLaunchTask": {
"DISABLE_CLUSTER": "true" "type": "npm",
"script": "build"
} }
} }
] ]

@ -1,7 +1,7 @@
FROM node:latest FROM node:lts-alpine
WORKDIR /app WORKDIR /app
COPY package*.json ./ COPY package*.json ./
RUN npm ci RUN npm ci
COPY . . COPY . .
RUN npm run build RUN npm run build && npm link
ENTRYPOINT [ "node", "src/index.js", "server" ] ENTRYPOINT [ "apt-stream", "server" ]

@ -30,8 +30,9 @@
"apt-stream": "./src/index.js" "apt-stream": "./src/index.js"
}, },
"scripts": { "scripts": {
"start": "ts-node src/index.ts", "start": "npm run build && node ./src/index.js",
"build": "tsc" "dev": "npm run build && node --inspect ./src/index.js server --cpu=0",
"build": "tsc --build --clean && tsc"
}, },
"devDependencies": { "devDependencies": {
"@types/express": "^4.17.15", "@types/express": "^4.17.15",

@ -22,12 +22,12 @@ export default async function initApp(config: string) {
res.on("close", () => console.log("[%s, cluserID: %s]: Path: %s, Method: %s, IP: %s, Status: %f", new Date().toISOString(), cluserID, req.path, req.method, req.ip, res.statusCode)); res.on("close", () => console.log("[%s, cluserID: %s]: Path: %s, Method: %s, IP: %s, Status: %f", new Date().toISOString(), cluserID, req.path, req.method, req.ip, res.statusCode));
next(); next();
}); });
app.get("/pool/:dist/:suite/:package/:arch/:version/download.deb", async ({params: {dist, suite, package: packageName, arch, version}}, {writeHead}, next) => { app.get("/pool/:dist/:suite/:package/:arch/:version/download.deb", async ({params: {dist, suite, package: packageName, arch, version}}, res, next) => {
try { try {
const data = (await packageManeger.getPackages(dist, suite, packageName, arch, version))?.at(-1); const data = (await packageManeger.getPackages(dist, suite, packageName, arch, version))?.at(-1);
if (!data) return next(new Error("Not Found")); if (!data) return next(new Error("Not Found"));
const fileStream = await data.getFileStream(); const fileStream = await data.getFileStream();
fileStream.pipe(writeHead(200, { fileStream.pipe(res.writeHead(200, {
"Content-Type": "application/x-debian-package", "Content-Type": "application/x-debian-package",
"Content-Length": data.control.Size, "Content-Length": data.control.Size,
"Content-Disposition": `attachment; filename="${packageName}_${version}_${arch}.deb"`, "Content-Disposition": `attachment; filename="${packageName}_${version}_${arch}.deb"`,
@ -55,6 +55,10 @@ export default async function initApp(config: string) {
return old; return old;
}, {}))).catch(next)); }, {}))).catch(next));
// Dists info
app.get("/dists", ({}, res, next) => packageManeger.getDists().then(data => res.json(data)).catch(next));
app.get("/dists/:dist", ({params: {dist}}, res, next) => packageManeger.getDistInfo(dist).then(data => res.json(data)).catch(next));
// Create Package, Package.gz and Package.xz // Create Package, Package.gz and Package.xz
async function createPackages(dist: string, suite: string, arch: string) { async function createPackages(dist: string, suite: string, arch: string) {
if (!await packageManeger.existsDist(dist)) throw new Error("Distribution not exists"); if (!await packageManeger.existsDist(dist)) throw new Error("Distribution not exists");
@ -74,7 +78,7 @@ export default async function initApp(config: string) {
if (!(control.Size && (control.MD5sum || control.SHA256 || control.SHA1))) continue; if (!(control.Size && (control.MD5sum || control.SHA256 || control.SHA1))) continue;
if (fist) fist = false; else mainReadstream.push("\n\n"); if (fist) fist = false; else mainReadstream.push("\n\n");
control.Filename = `pool/${dist}/${suite}/${control.Package}/${control.Architecture}/${control.Version}/download.deb`; control.Filename = `pool/${dist}/${suite}/${control.Package}/${control.Architecture}/${control.Version}/download.deb`;
mainReadstream.push(Object.keys(control).map(key => mainReadstream.push(`${key}: ${control[key]}`)).join("\n")); mainReadstream.push(Object.keys(control).map(key => `${key}: ${control[key]}`).join("\n"));
} }
mainReadstream.push(null); mainReadstream.push(null);
@ -89,6 +93,7 @@ export default async function initApp(config: string) {
} }
}; };
} }
app.get("/dists/(./)?:dist/:suite/binary-:arch/Packages(.(xz|gz)|)", async ({params: {dist, suite, arch}, path: reqPath}, res, next) => createPackages(dist, suite, arch).then(packages => { app.get("/dists/(./)?:dist/:suite/binary-:arch/Packages(.(xz|gz)|)", async ({params: {dist, suite, arch}, path: reqPath}, res, next) => createPackages(dist, suite, arch).then(packages => {
if (reqPath.endsWith(".gz")) return packages.gzip.pipe(res); if (reqPath.endsWith(".gz")) return packages.gzip.pipe(res);
else if (reqPath.endsWith(".xz")) return packages.lzma.pipe(res); else if (reqPath.endsWith(".xz")) return packages.lzma.pipe(res);

@ -90,9 +90,17 @@ yargs(process.argv.slice(2)).version(false).help().demandCommand().strictCommand
console.log("base64:%s", base64); console.log("base64:%s", base64);
}); });
}).command("server", "Run HTTP serber", async yargs => { }).command("server", "Run HTTP serber", async yargs => {
const options = yargs.parseSync(); const options = yargs.option("cpus", {
const envs = Object.keys(process.env).filter(key => key.startsWith("APT_STREAM")); type: "number",
const { app, packageConfig, packageManeger } = await repo(envs.length > 0 ? `env:${envs[0]}` : options.cofigPath); default: os.cpus().length/2,
alias: "C",
demandOption: false,
description: "Number of cpus to use in Cluster"
}).parseSync();
console.log("Starting server...");
process.on("unhandledRejection", err => console.error(err));
process.on("uncaughtException", err => console.error(err));
const { app, packageConfig, packageManeger } = await repo(Object.keys(process.env).find(key => key.startsWith("APT_STREAM")) ? `env:${Object.keys(process.env).find(key => key.startsWith("APT_STREAM"))}` : options.cofigPath);
app.all("*", ({res}) => res.status(404).json({ app.all("*", ({res}) => res.status(404).json({
error: "Endpoint not exists", error: "Endpoint not exists",
message: "Endpoint not exists, check the documentation for more information" message: "Endpoint not exists, check the documentation for more information"
@ -110,7 +118,7 @@ yargs(process.argv.slice(2)).version(false).help().demandCommand().strictCommand
}); });
}); });
const port = process.env.PORT ?? packageConfig["apt-config"]?.portListen ?? 3000; const port = process.env.PORT ?? packageConfig["apt-config"]?.portListen ?? 3000;
if (!(Boolean(process.env["DISABLE_CLUSTER"]))) { if (options.cpus > 1) {
if (cluster.isWorker) { if (cluster.isWorker) {
console.log("Worker %d running, PID: %f", cluster.worker?.id ?? "No ID", process.pid); console.log("Worker %d running, PID: %f", cluster.worker?.id ?? "No ID", process.pid);
app.listen(port, function() { app.listen(port, function() {
@ -119,23 +127,28 @@ yargs(process.argv.slice(2)).version(false).help().demandCommand().strictCommand
return; return;
} }
console.log("Work master, PID %f, starting workers ...", process.pid); console.log("Work master, PID %f, starting workers ...", process.pid);
os.cpus().forEach(() => cluster.fork()); for (let i = 0; i < options.cpus; i++) cluster.fork({...process.env, workNumber: i});
cluster.on("error", console.error).on("exit", (worker, code, signal: NodeJS.Signals) => { cluster.on("error", err => {
console.log(err?.stack ?? String(err));
// process.exit(1);
}).on("exit", (worker, code, signal: NodeJS.Signals) => {
// if (process[Symbol.for("ts-node.register.instance")]) cluster.setupPrimary({/* Fix for ts-node */ execArgv: ["--loader", "ts-node/esm"]}); // if (process[Symbol.for("ts-node.register.instance")]) cluster.setupPrimary({/* Fix for ts-node */ execArgv: ["--loader", "ts-node/esm"]});
if (signal === "SIGKILL") return console.log("Worker %d was killed", worker?.id ?? "No ID"); if (signal === "SIGKILL") return console.log("Worker %d was killed", worker?.id ?? "No ID");
else if (signal === "SIGABRT") return console.log("Worker %d was aborted", worker?.id ?? "No ID"); else if (signal === "SIGABRT") return console.log("Worker %d was aborted", worker?.id ?? "No ID");
else if (signal === "SIGTERM") return console.log("Worker %d was terminated", worker?.id ?? "No ID"); else if (signal === "SIGTERM") return console.log("Worker %d was terminated", worker?.id ?? "No ID");
console.log("Worker %d died with code: %s, Signal: %s", worker?.id ?? "No ID", code, signal ?? "No Signal"); console.log("Worker %d died with code: %s, Signal: %s", worker?.id ?? "No ID", code, signal ?? "No Signal");
cluster.fork();
}).on("online", worker => console.log("Worker %d is online", worker?.id ?? "No ID")); }).on("online", worker => console.log("Worker %d is online", worker?.id ?? "No ID"));
} else app.listen(port, function() {console.log("Apt Stream Port listen on %f", this.address()?.port)}); } else {
console.warn("Running without cluster, this is not recommended for production");
app.listen(port, function() {console.log("Apt Stream Port listen on %f", this.address()?.port)});
}
// large ram available
if (os.freemem() > 2 * 1024 * 1024 * 1024) await Promise.all(Object.keys(packageConfig.repositories).map(async distName => {const dist = packageConfig.repositories[distName]; return Promise.all(dist.targets.map(async target => packageManeger.loadRepository(distName, target, packageConfig["apt-config"], packageConfig).catch(console.error)));})).catch(console.error);
console.warn("Not enough RAM to load all repositories, loading one by one");
for (const distName in packageConfig.repositories) { for (const distName in packageConfig.repositories) {
const dist = packageConfig.repositories[distName]; const dist = packageConfig.repositories[distName];
for (const target of dist.targets) { for (const target of dist.targets) {
await packageManeger.loadRepository(distName, target, packageConfig["apt-config"], packageConfig).catch(console.error); await packageManeger.loadRepository(distName, target, packageConfig["apt-config"], packageConfig).catch(err => console.error(String(err)));
console.log("Complete load repository '%s'", distName);
} }
} }
}).parseAsync(); }).parseAsync();

@ -7,6 +7,7 @@ import { Readable } from "node:stream";
import cluster from "node:cluster"; import cluster from "node:cluster";
import path from "node:path"; import path from "node:path";
import tar from "tar"; import tar from "tar";
import { format } from "node:util";
export type packageSave = { export type packageSave = {
dist: string, dist: string,
@ -28,6 +29,13 @@ export type packageManegerV2 = {
addPackage: (repo: packageSave) => Promise<void>, addPackage: (repo: packageSave) => Promise<void>,
existsDist: (dist: string) => Promise<boolean>, existsDist: (dist: string) => Promise<boolean>,
existsSuite: (dist: string, suite: string) => Promise<boolean>, existsSuite: (dist: string, suite: string) => Promise<boolean>,
getDists: () => Promise<string[]>,
getDistInfo: (dist: string) => Promise<{
packagesCount: number,
arch: string[],
packagesName: string[],
suites: string[],
}>,
}; };
/** /**
@ -36,6 +44,20 @@ export type packageManegerV2 = {
*/ */
export default async function packageManeger(config: backendConfig): Promise<packageManegerV2> { export default async function packageManeger(config: backendConfig): Promise<packageManegerV2> {
const partialConfig: Partial<packageManegerV2> = {}; const partialConfig: Partial<packageManegerV2> = {};
partialConfig.getDists = async function getDists() {
const packages = await partialConfig.getPackages();
return [...new Set(packages.map(U => U.dist))];
}
partialConfig.getDistInfo = async function getDistInfo(dist: string) {
const packages = await partialConfig.getPackages(dist);
return {
packagesCount: packages.length,
packagesName: [...new Set(packages.map(U => U.control.Package))],
arch: [...new Set(packages.map(U => U.control.Architecture))],
suites: [...new Set(packages.map(U => U.suite))],
};
}
partialConfig.loadRepository = async function loadRepository(distName: string, repository: repository, packageAptConfig?: apt_config, aptConfig?: backendConfig) { partialConfig.loadRepository = async function loadRepository(distName: string, repository: repository, packageAptConfig?: apt_config, aptConfig?: backendConfig) {
const saveFile = aptConfig["apt-config"]?.saveFiles ?? false; const saveFile = aptConfig["apt-config"]?.saveFiles ?? false;
@ -71,7 +93,7 @@ export default async function packageManeger(config: backendConfig): Promise<pac
from: "mirror", from: "mirror",
fileUrl: control.Filename, fileUrl: control.Filename,
} }
}); }).catch(err => err);
}); });
return Promise.all(partialPromises); return Promise.all(partialPromises);
@ -154,7 +176,7 @@ export default async function packageManeger(config: backendConfig): Promise<pac
from: "github_release", from: "github_release",
fileUrl: browser_download_url, fileUrl: browser_download_url,
} }
}); }).catch(err => {});
})))).then(data => data.flat(2).filter(Boolean)); })))).then(data => data.flat(2).filter(Boolean));
} }
const release = await httpRequestGithub.getRelease({owner: repository.owner, repository: repository.repository, token: repository.token, peer: repository.assetsLimit, all: false}); const release = await httpRequestGithub.getRelease({owner: repository.owner, repository: repository.repository, token: repository.token, peer: repository.assetsLimit, all: false});
@ -184,7 +206,7 @@ export default async function packageManeger(config: backendConfig): Promise<pac
from: "github_release", from: "github_release",
fileUrl: browser_download_url, fileUrl: browser_download_url,
} }
}); }).catch(err => {});
})))).then(data => data.flat(2).filter(Boolean)); })))).then(data => data.flat(2).filter(Boolean));
} else if (repository.from === "github_tree") { } else if (repository.from === "github_tree") {
const { tree } = await httpRequestGithub.githubTree(repository.owner, repository.repository, repository.tree); const { tree } = await httpRequestGithub.githubTree(repository.owner, repository.repository, repository.tree);
@ -225,7 +247,7 @@ export default async function packageManeger(config: backendConfig): Promise<pac
from: "github_tree", from: "github_tree",
fileUrl: downloadUrl, fileUrl: downloadUrl,
} }
}); }).catch(err => {});
})); }));
} else if (repository.from === "google_drive") { } else if (repository.from === "google_drive") {
const client_id = repository.appSettings.client_id; const client_id = repository.appSettings.client_id;
@ -264,7 +286,7 @@ export default async function packageManeger(config: backendConfig): Promise<pac
from: "google_drive", from: "google_drive",
fileId: fileData.id, fileId: fileData.id,
} }
}); }).catch(err => {});
})); }));
} else if (repository.from === "oracle_bucket") { } else if (repository.from === "oracle_bucket") {
const oracleBucket = await coreUtils.oracleBucket(repository.region as any, repository.bucketName, repository.bucketNamespace, repository.auth); const oracleBucket = await coreUtils.oracleBucket(repository.region as any, repository.bucketName, repository.bucketNamespace, repository.auth);
@ -293,7 +315,7 @@ export default async function packageManeger(config: backendConfig): Promise<pac
from: "oracle_bucket", from: "oracle_bucket",
fileName: fileData.name, fileName: fileData.name,
} }
}); }).catch(err => {});
})); }));
} }
@ -317,7 +339,7 @@ export default async function packageManeger(config: backendConfig): Promise<pac
// Add package to database // Add package to database
partialConfig.addPackage = async function addPackage(repo) { partialConfig.addPackage = async function addPackage(repo) {
const existsPackage = await collection.findOne({dist: repo.dist, suite: repo.suite, "control.Package": repo.control.Package, "control.Version": repo.control.Version, "control.Architecture": repo.control.Architecture}); const existsPackage = await collection.findOne({dist: repo.dist, suite: repo.suite, "control.Package": repo.control.Package, "control.Version": repo.control.Version, "control.Architecture": repo.control.Architecture});
if (existsPackage) await partialConfig.deletePackage(repo); if (existsPackage) throw new Error(format("Package (%s/%s: %s) already exists!", repo.control.Package, repo.control.Version, repo.control.Architecture));
await collection.insertOne(repo); await collection.insertOne(repo);
console.log("Added '%s', version: %s, Arch: %s, in to %s/%s", repo.control.Package, repo.control.Version, repo.control.Architecture, repo.dist, repo.suite); console.log("Added '%s', version: %s, Arch: %s, in to %s/%s", repo.control.Package, repo.control.Version, repo.control.Architecture, repo.dist, repo.suite);
} }
@ -350,7 +372,7 @@ export default async function packageManeger(config: backendConfig): Promise<pac
} else if (data.restoreFileStream.from === "oracle_bucket" && data.repository.from === "oracle_bucket") { } else if (data.restoreFileStream.from === "oracle_bucket" && data.repository.from === "oracle_bucket") {
const oracleBucket = await coreUtils.oracleBucket(data.repository.region as any, data.repository.bucketName, data.repository.bucketNamespace, data.repository.auth); const oracleBucket = await coreUtils.oracleBucket(data.repository.region as any, data.repository.bucketName, data.repository.bucketNamespace, data.repository.auth);
return oracleBucket.getFileStream(data.restoreFileStream.fileName); return oracleBucket.getFileStream(data.restoreFileStream.fileName);
} else if (data.restoreFileStream.from === "mirror" && data.repository.from === "mirror") return coreUtils.httpRequest.pipeFetch(data.restoreFileStream.url); } else if (data.restoreFileStream.from === "mirror" && data.repository.from === "mirror") return coreUtils.httpRequest.pipeFetch(data.restoreFileStream.fileUrl);
else if (data.restoreFileStream.from === "oci" && data.repository.from === "oci") { else if (data.restoreFileStream.from === "oci" && data.repository.from === "oci") {
const oci = await coreUtils.DockerRegistry(data.repository.image); const oci = await coreUtils.DockerRegistry(data.repository.image);
return new Promise((done, reject) => { return new Promise((done, reject) => {
@ -390,7 +412,7 @@ export default async function packageManeger(config: backendConfig): Promise<pac
// Add package to array // Add package to array
partialConfig.addPackage = async function addPackage(repo) { partialConfig.addPackage = async function addPackage(repo) {
const existsPackage = packagesArray.find((x) => x.control.Package === repo.control.Package && x.control.Version === repo.control.Version && x.control.Architecture === repo.control.Architecture && x.dist === repo.dist && x.suite === repo.suite && x.repository === repo.repository); const existsPackage = packagesArray.find((x) => x.control.Package === repo.control.Package && x.control.Version === repo.control.Version && x.control.Architecture === repo.control.Architecture && x.dist === repo.dist && x.suite === repo.suite && x.repository === repo.repository);
if (existsPackage) await partialConfig.deletePackage(repo); if (existsPackage) throw new Error("Package already exists!");
packagesArray.push(repo); packagesArray.push(repo);
console.log("Added '%s', version: %s, Arch: %s, in to %s/%s", repo.control.Package, repo.control.Version, repo.control.Architecture, repo.dist, repo.suite); console.log("Added '%s', version: %s, Arch: %s, in to %s/%s", repo.control.Package, repo.control.Version, repo.control.Architecture, repo.dist, repo.suite);
} }