Add mongoDB support #8

Merged
Sirherobrine23 merged 6 commits from db into main 2023-01-11 04:44:07 +00:00
7 changed files with 93 additions and 35 deletions
Showing only changes of commit 943bf1f39a - Show all commits

@ -4,5 +4,4 @@ COPY package*.json ./
RUN npm ci
COPY . .
RUN npm run build
ENV NODE_OPTIONS="--max_old_space_size=4096"
ENTRYPOINT [ "node", "src/index.js", "server" ]

@ -13,11 +13,13 @@ export default async function initApp(config: string) {
const packageManeger = await package_maneger(packageConfig);
const app = express();
app.disable("x-powered-by").disable("etag").use(express.json()).use(express.urlencoded({extended: true})).use((req, res, next) => {
res.json = data => res.setHeader("Content-Type", "application/json").send(JSON.stringify(data, null, 2));
const cluserID = cluster.worker?.id ?? 0;
// [%s TIME STAMP, cluserID: %f]: Path: %s, Method: %s, IP: %s
console.log("[%s, cluserID: %f]: Path: %s, Method: %s, IP: %s", new Date().toISOString(), cluserID, req.path, req.method, req.ip);
res.on("close", () => console.log("[%s, cluserID: %f]: Path: %s, Method: %s, IP: %s, Status: %f", new Date().toISOString(), cluserID, req.path, req.method, req.ip, res.statusCode));
res.json = data => {
Promise.resolve(data).then(data => res.setHeader("Content-Type", "application/json").send(JSON.stringify(data, null, 2))).catch(next);
return res;
};
const cluserID = (cluster.worker?.id === 1 ? "Primary" : cluster.worker?.id) ?? "Primary";
console.log("[%s, cluserID: %s]: Path: %s, Method: %s, IP: %s", new Date().toISOString(), cluserID, req.path, req.method, req.ip);
res.on("close", () => console.log("[%s, cluserID: %s]: Path: %s, Method: %s, IP: %s, Status: %f", new Date().toISOString(), cluserID, req.path, req.method, req.ip, res.statusCode));
next();
});
app.get("/pool/:dist/:suite/:package/:arch/:version/download.deb", async ({params: {dist, suite, package: packageName, arch, version}}, {writeHead}, next) => {
@ -55,6 +57,10 @@ export default async function initApp(config: string) {
// Create Package, Package.gz and Package.xz
async function createPackages(dist: string, suite: string, arch: string) {
if (!await packageManeger.existsDist(dist)) throw new Error("Distribution not exists");
if (!await packageManeger.existsSuite(dist, suite)) throw new Error("Suite not exists");
const packages = (await packageManeger.getPackages(dist, suite, undefined, arch)).concat(arch !== "all" ? await packageManeger.getPackages(dist, suite, undefined, "all") : []);
if (!packages.length) throw new Error("Check is dist or suite have packages");
let rawSize = 0, gzipSize = 0, lzmaSize = 0;
const mainReadstream = new Readable({read(){}}), rawSUMs = coreUtils.extendsCrypto.createHashAsync("all", mainReadstream).then(hash => ({size: rawSize, hash}));
const gzip = mainReadstream.pipe(createGzip()), gzipSUMs = coreUtils.extendsCrypto.createHashAsync("all", gzip).then(hash => ({size: gzipSize, hash}));
@ -63,8 +69,6 @@ export default async function initApp(config: string) {
gzip.on("data", data => gzipSize += data.length);
lzma.on("data", data => lzmaSize += data.length);
const packages = await packageManeger.getPackages(dist, suite, undefined, arch);
if (!packages.length) throw new Error("Check is dist or suite have packages");
let fist = true;
for (const {control} of packages) {
if (!(control.Size && (control.MD5sum || control.SHA256 || control.SHA1))) continue;
@ -93,8 +97,8 @@ export default async function initApp(config: string) {
// Release
async function createRelease(dist: string) {
if (!await packageManeger.existsDist(dist)) throw new Error("Dist exists");
const packagesArray = await packageManeger.getPackages(dist);
if (!packagesArray.length) throw new Error("Check is dist have packages");
const Release: {[key: string]: string|string[]} = {};
// Date

@ -100,7 +100,7 @@ yargs(process.argv.slice(2)).version(false).help().demandCommand().strictCommand
app.use((err, {}, res, {}) => {
console.error(err);
const stack: string = err?.stack ?? "No stack";
res.status(500).json({
res.status(400).json({
error: "Internal Server Error",
message: "There was an error on our part, sorry for the inconvenience",
stack: {
@ -112,21 +112,25 @@ yargs(process.argv.slice(2)).version(false).help().demandCommand().strictCommand
const port = process.env.PORT ?? packageConfig["apt-config"]?.portListen ?? 3000;
if (!(Boolean(process.env["DISABLE_CLUSTER"]))) {
if (cluster.isWorker) {
app.listen(port, function() {console.log("Apt Stream Port listen on %f", this.address()?.port)});
return console.log("Worker %d running, PID: %f", cluster.worker?.id ?? "No ID", process.pid);
console.log("Worker %d running, PID: %f", cluster.worker?.id ?? "No ID", process.pid);
app.listen(port, function() {
console.log("Work apt Stream Port listen on %f", this.address()?.port);
});
return;
}
console.log("Master %f is running", process.pid);
console.log("Work master, PID %f, starting workers ...", process.pid);
os.cpus().forEach(() => cluster.fork());
cluster.on("exit", (worker, code, signal: NodeJS.Signals) => {
cluster.on("error", console.error).on("exit", (worker, code, signal: NodeJS.Signals) => {
// if (process[Symbol.for("ts-node.register.instance")]) cluster.setupPrimary({/* Fix for ts-node */ execArgv: ["--loader", "ts-node/esm"]});
if (signal === "SIGKILL") return console.log("Worker %d was killed", worker?.id ?? "No ID");
else if (signal === "SIGABRT") return console.log("Worker %d was aborted", worker?.id ?? "No ID");
else if (signal === "SIGTERM") return console.log("Worker %d was terminated", worker?.id ?? "No ID");
else if (code )
console.log("Worker %d died with code: %s, Signal: %s", worker?.id ?? "No ID", code, signal ?? "No Signal");
});
}).on("online", worker => console.log("Worker %d is online", worker?.id ?? "No ID"));
} else app.listen(port, function() {console.log("Apt Stream Port listen on %f", this.address()?.port)});
// large ram available
if (os.freemem() > 2 * 1024 * 1024 * 1024) return Promise.all(Object.keys(packageConfig.repositories).map(async distName => {const dist = packageConfig.repositories[distName]; return Promise.all(dist.targets.map(async target => packageManeger.loadRepository(distName, target, packageConfig["apt-config"], packageConfig).catch(console.error)));})).catch(console.error);
if (os.freemem() > 2 * 1024 * 1024 * 1024) await Promise.all(Object.keys(packageConfig.repositories).map(async distName => {const dist = packageConfig.repositories[distName]; return Promise.all(dist.targets.map(async target => packageManeger.loadRepository(distName, target, packageConfig["apt-config"], packageConfig).catch(console.error)));})).catch(console.error);
console.warn("Not enough RAM to load all repositories, loading one by one");
for (const distName in packageConfig.repositories) {
const dist = packageConfig.repositories[distName];

@ -91,14 +91,14 @@ export async function getPackages(uri: string, options: {dist: string, suite?: s
reject(err);
}
});
let data = "";
let data: string;
stream.pipe(new Writable({
final(callback) {
done();
callback();
},
write(chunkR, encoding, callback) {
data = data + (encoding === "binary" ? chunkR.toString("utf8") : Buffer.from(chunkR).toString("utf8"));
data = (data ?? "") + (encoding === "binary" ? chunkR.toString("utf8") : Buffer.from(chunkR).toString("utf8"));
data.split(/^\n/).forEach((v) => {
if (v.trim()) {
data = data.replace(v, "");

@ -2,8 +2,9 @@ import coreUtils, { DebianPackage, DockerRegistry, extendFs, httpRequest, httpRe
import { createReadStream, createWriteStream, promises as fs } from "node:fs";
import { MongoClient, ServerApiVersion, Filter } from "mongodb";
import { apt_config, backendConfig, repository } from "./repoConfig.js";
import { getPackages as mirror } from "./mirror.js";
import { Readable } from "node:stream";
import { getPackages } from "./mirror.js";
import cluster from "node:cluster";
import path from "node:path";
import tar from "tar";
@ -25,6 +26,8 @@ export type packageManegerV2 = {
getPackages: (dist?: string, suite?: string, Package?: string, Arch?: string, Version?: string) => Promise<packageSave[]>,
deletePackage: (repo: Partial<packageSave>) => Promise<packageSave>,
addPackage: (repo: packageSave) => Promise<void>,
existsDist: (dist: string) => Promise<boolean>,
existsSuite: (dist: string, suite: string) => Promise<boolean>,
};
/**
@ -38,10 +41,13 @@ export default async function packageManeger(config: backendConfig): Promise<pac
const saveFile = aptConfig["apt-config"]?.saveFiles ?? false;
const rootPool = aptConfig["apt-config"]?.poolPath ?? path.join(process.cwd(), "pool");
if (repository.from === "mirror") {
return Promise.all(Object.keys(repository.dists).map(async distName => {
const distInfo = repository.dists[distName];
const packagesData = distInfo.suites ? await Promise.all(distInfo.suites.map(async suite => getPackages(repository.uri, {dist: distName, suite}))).then(U => U.flat()) : await getPackages(repository.uri, {dist: distName});
return packagesData.forEach(({Package: control}) => {
// Ingore fast load data for low ram memory
for (const repoDistName in repository.dists) {
const distInfo = repository.dists[repoDistName];
const packagesData: Awaited<ReturnType<typeof mirror>> = [];
if (!distInfo.suites) await mirror(repository.uri, {dist: distName}).then(U => packagesData.push(...U));
else for (const suite of distInfo.suites) await mirror(repository.uri, {dist: repoDistName, suite}).then(U => packagesData.push(...U));
const partialPromises = packagesData.map(({Package: control}) => {
const filePool = path.join(rootPool, control.Package.slice(0, 1), `${control.Package}_${control.Architecture}_${control.Version}.deb`);
const getStream = async () => {
if (saveFile && await extendFs.exists(filePool)) return createReadStream(filePool);
@ -56,7 +62,7 @@ export default async function packageManeger(config: backendConfig): Promise<pac
}
return partialConfig.addPackage({
dist: distName,
suite: repository.suite,
suite: repository.suite ?? "main",
repository: repository,
control,
aptConfig: packageAptConfig ?? aptConfig["apt-config"],
@ -67,7 +73,9 @@ export default async function packageManeger(config: backendConfig): Promise<pac
}
});
});
}));
return Promise.all(partialPromises);
}
} else if (repository.from === "oci") {
const registry = await DockerRegistry.Manifest.Manifest(repository.image, repository.platfom_target);
return registry.layersStream((data) => {
@ -298,18 +306,37 @@ export default async function packageManeger(config: backendConfig): Promise<pac
const mongoClient = await (new MongoClient(mongoConfig.uri, {serverApi: ServerApiVersion.v1})).connect();
const collection = mongoClient.db(mongoConfig.db ?? "aptStream").collection<packageSave>(mongoConfig.collection ?? "packagesData");
// Drop collection
if (cluster.isPrimary) {
if (mongoConfig.dropCollention && await collection.findOne()) {
await collection.drop();
console.log("Drop collection: %s", mongoConfig.collection ?? "packagesData");
}
}
// Add package to database
partialConfig.addPackage = async function addPackage(repo) {
const existsPackage = await collection.findOne({dist: repo.dist, suite: repo.suite, "control.Package": repo.control.Package, "control.Version": repo.control.Version, "control.Architecture": repo.control.Architecture});
if (existsPackage) await partialConfig.deletePackage(repo);
await collection.insertOne(repo);
console.log("Added '%s', version: %s, Arch: %s, in to %s/%s", repo.control.Package, repo.control.Version, repo.control.Architecture, repo.dist, repo.suite);
}
// Delete package
partialConfig.deletePackage = async function deletePackage(repo) {
const packageDelete = await collection.findOneAndDelete({dist: repo.dist, suite: repo.suite, "control.Package": repo.control.Package, "control.Version": repo.control.Version, "control.Architecture": repo.control.Architecture});
if (!packageDelete.value) throw new Error("Package not found!");
return packageDelete.value;
const packageDelete = (await collection.findOneAndDelete({dist: repo.dist, suite: repo.suite, "control.Package": repo.control.Package, "control.Version": repo.control.Version, "control.Architecture": repo.control.Architecture}))?.value;
if (!packageDelete) throw new Error("Package not found!");
console.info("Deleted '%s', version: %s, Arch: %s, from %s/%s", packageDelete.control.Package, packageDelete.control.Version, packageDelete.control.Architecture, packageDelete.dist, packageDelete.suite);
return packageDelete;
}
// Exists
partialConfig.existsDist = async function existsDist(dist) {
return (await collection.findOne({dist})) ? true : false;
}
partialConfig.existsSuite = async function existsSuite(dist, suite) {
if (await partialConfig.existsDist(dist)) return (await collection.findOne({dist, suite})) ? true : false;
return false;
}
// Packages
@ -341,8 +368,14 @@ export default async function packageManeger(config: backendConfig): Promise<pac
}
partialConfig.getPackages = async function getPackages(dist, suite, Package, Arch, Version) {
const doc: Filter<packageSave> = {};
if (dist) doc.dist = dist;
if (suite) doc.suite = suite;
if (dist) {
if (!await partialConfig.existsDist(dist)) throw new Error("Distribution not found!");
doc.dist = dist;
}
if (suite) {
if (!await partialConfig.existsSuite(dist, suite)) throw new Error("Suite/Component not found!");
doc.suite = suite;
}
if (Package) doc["control.Package"] = Package;
if (Arch) doc["control.Architecture"] = Arch;
if (Version) doc["control.Version"] = Version;
@ -359,18 +392,31 @@ export default async function packageManeger(config: backendConfig): Promise<pac
const existsPackage = packagesArray.find((x) => x.control.Package === repo.control.Package && x.control.Version === repo.control.Version && x.control.Architecture === repo.control.Architecture && x.dist === repo.dist && x.suite === repo.suite && x.repository === repo.repository);
if (existsPackage) await partialConfig.deletePackage(repo);
packagesArray.push(repo);
console.log("Added '%s', version: %s, Arch: %s, in to %s/%s", repo.control.Package, repo.control.Version, repo.control.Architecture, repo.dist, repo.suite);
}
// Delete package
partialConfig.deletePackage = async function deletePackage(repo) {
const index = packagesArray.findIndex((x) => x.control.Package === repo.control.Package && x.control.Version === repo.control.Version && x.control.Architecture === repo.control.Architecture && x.dist === repo.dist && x.suite === repo.suite && x.repository === repo.repository);
if (index === -1) throw new Error("Package not found!");
const packageDelete = packagesArray.splice(index, 1);
return packageDelete.at(-1);
const packageDelete = packagesArray.splice(index, 1).at(-1);
console.info("Deleted '%s', version: %s, Arch: %s, from %s/%s", packageDelete.control.Package, packageDelete.control.Version, packageDelete.control.Architecture, packageDelete.dist, packageDelete.suite);
return packageDelete;
}
// Exists
partialConfig.existsDist = async function existsDist(dist) {
return packagesArray.find(x => x.dist === dist) ? true : false;
}
partialConfig.existsSuite = async function existsSuite(dist, suite) {
if (await partialConfig.existsDist(dist)) return packagesArray.find(x => x.dist === dist && x.suite === suite) ? true : false;
return false;
}
// Packages
partialConfig.getPackages = async function getPackages(dist, suite, Package, Arch, Version) {
if (dist && !await partialConfig.existsDist(dist)) throw new Error("Distribution not found!");
if (suite && !await partialConfig.existsSuite(dist, suite)) throw new Error("Suite/Component not found!");
const packageInfo = packagesArray.filter(x => (!dist || x.dist === dist) && (!suite || x.suite === suite) && (!Package || x.control.Package === Package) && (!Arch || x.control.Architecture === Arch) && (!Version || x.control.Version === Version));
if (!packageInfo.length) throw new Error("Package not found!");
return packageInfo;

@ -82,6 +82,8 @@ export type backendConfig = Partial<{
uri: string,
db?: string,
collection?: string,
/** On connect to database drop collection to run in empty data */
dropCollention?: boolean
}
},
repositories: {
@ -165,7 +167,8 @@ export async function getConfig(config: string) {
fixedConfig["apt-config"].mongodb = {
uri: rootData.mongodb.uri,
db: rootData.mongodb.db ?? "apt-stream",
collection: rootData.mongodb.collection ?? "packages"
collection: rootData.mongodb.collection ?? "packages",
dropCollention: Boolean(rootData.mongodb.dropCollention ?? false)
};
}
}

@ -21,6 +21,8 @@
"src/**/*.test.ts"
],
"ts-node": {
"esm": true
"esm": true,
"experimentalResolver": true,
"experimentalSpecifierResolution": "node"
}
}