Answering my own question.
I have managed to crack the solution. Basically, you need to resume uploading the file from where it was last uploaded. I am not sure if this is the best way of handling it, but it does the job for me.
server.js
const express = require("express");
const busboy = require("connect-busboy");
const path = require("path");
const fs = require("fs");
const cors = require("cors");
// Initialize the express web server.
const app = express();
app.use(cors());

// Insert the busboy middleware exactly once. The original registered it a
// second time without options, which was redundant.
app.use(
  busboy({
    highWaterMark: 2 * 1024 * 1024, // Set 2MiB buffer
  })
);

// Directory that receives the uploaded files. Create it up front so the
// write streams opened by the upload route do not fail with ENOENT.
const uploadPath = path.join(__dirname, "upload_data");
fs.mkdirSync(uploadPath, { recursive: true });

// In-memory map of file id -> stored file name. A prototype-less object is
// used because the keys come straight from the client: ids such as
// "constructor" or "__proto__" must not resolve to inherited properties.
const database = Object.create(null);
// This endpoint tells the client how many bytes have already been uploaded so
// far, otherwise it sends 0 bytes (meaning the file is new).
//
// Responds with JSON of the form { bytes: <number> }. A file id that is not in
// `database`, or whose file cannot be stat'ed, is reported as 0 bytes.
app.route("/:id").get((req, res) => {
  const fileId = req.params.id;
  const dbFileName = database[fileId];
  let bytes = 0;
  if (typeof dbFileName === "string" && dbFileName !== "") {
    try {
      // statSync reads the size without opening a descriptor. The previous
      // openSync/fstatSync pair never closed the descriptor, leaking one per
      // request.
      bytes = fs.statSync(path.join(uploadPath, dbFileName)).size;
    } catch (error) {
      // Known id but unreadable file: log it and report 0 so the client
      // starts over.
      console.error(error);
    }
  }
  return res.json({ bytes });
});
// Handle the upload post request.
//
// Expected request headers:
//   x-file-id    - id of the upload, used as the key into `database`
//   x-start-byte - offset of the first byte carried by this request
//   x-file-size  - total size of the file being uploaded
//
// Responses (all JSON strings):
//   200 "ok"                     - every file part was written and closed
//   200 "File already uploaded"  - x-start-byte >= x-file-size
//   400                          - missing/invalid headers or non-multipart body
//   409                          - upload already running for this id, or the
//                                  offset does not match the bytes on disk
//   500 "Upload failed"          - parsing or writing failed

// Ids that currently own an open write stream, so two requests can never
// append to the same file at the same time.
const activeUploads = new Set();

app.route("/upload").post((req, res) => {
  const xFileId = req.headers["x-file-id"];
  const xStartByte = Number.parseInt(req.headers["x-start-byte"], 10);
  const xFileSize = Number.parseInt(req.headers["x-file-size"], 10);

  // parseInt yields NaN for missing headers, and NaN compares false with
  // everything, so validate explicitly instead of relying on the >= below.
  const headersValid =
    typeof xFileId === "string" &&
    xFileId !== "" &&
    Number.isInteger(xStartByte) &&
    xStartByte >= 0 &&
    Number.isInteger(xFileSize) &&
    xFileSize >= 0;
  if (!headersValid) {
    return res
      .status(400)
      .json("Missing or invalid x-file-id, x-start-byte or x-file-size header");
  }
  if (xStartByte >= xFileSize) {
    return res.json("File already uploaded");
  }
  // The busboy middleware only attaches req.busboy for multipart bodies.
  if (!req.busboy) {
    return res.status(400).json("Expected a multipart/form-data request body");
  }
  if (activeUploads.has(xFileId)) {
    return res.status(409).json("Upload already in progress for this file");
  }

  // Own-property lookup: the id is client-controlled and must not hit
  // inherited properties of the map.
  const lookupStoredName = () =>
    Object.prototype.hasOwnProperty.call(database, xFileId)
      ? database[xFileId]
      : undefined;

  // Appending is only safe when the client resumes exactly where the stored
  // file ends; anything else would duplicate or skip bytes.
  if (xStartByte > 0) {
    const knownName = lookupStoredName();
    let bytesOnDisk = 0;
    if (knownName) {
      try {
        bytesOnDisk = fs.statSync(path.join(uploadPath, knownName)).size;
      } catch (error) {
        // Unreadable file counts as "nothing stored".
        console.error(error);
      }
    }
    if (bytesOnDisk !== xStartByte) {
      return res
        .status(409)
        .json("x-start-byte does not match the bytes stored on the server");
    }
  }

  activeUploads.add(xFileId);
  let openStreams = 0;
  let parsed = false;
  let failed = false;

  // Releases the id once no write stream is open, and answers the request
  // once its outcome is known. Safe to call repeatedly.
  const settle = () => {
    if (openStreams === 0) {
      activeUploads.delete(xFileId);
    }
    if (res.headersSent) {
      return;
    }
    if (failed) {
      res.status(500).json("Upload failed");
      return;
    }
    if (parsed && openStreams === 0) {
      res.json("ok");
    }
  };

  // Covers requests that end or abort without ever producing a file part.
  req.on("close", settle);

  req.busboy.on("file", (fieldname, file, filename) => {
    let storedName = lookupStoredName();
    if (!storedName) {
      const requestedName =
        typeof filename === "string" && filename !== "" ? filename : xFileId;
      // Keep only the last path segment so the file stays inside uploadPath.
      storedName = path.basename(requestedName);
      database[xFileId] = storedName;
    }
    const completeFilePath = path.join(uploadPath, storedName);
    console.log(`Upload of '${storedName}' started`);

    // Create a write stream of the new file
    const appendMode = xStartByte > 0;
    console.log(appendMode ? "APPEND Mode" : "WRITE Mode");
    const fstream = fs.createWriteStream(completeFilePath, {
      flags: appendMode ? "a" : "w",
    });
    openStreams += 1;

    // Pipe it through
    file.pipe(fstream);
    file.on("error", (e) => console.log("file.on.error", e));
    file.on("limit", (e) => console.log("Limit reached", e));

    fstream.on("error", (err) => {
      console.log("fileStream error>>>>>", err);
      failed = true;
      // Keep draining the part so the parser is not stalled by backpressure.
      file.unpipe(fstream);
      file.resume();
      settle();
    });

    // On finish of the upload
    fstream.on("close", () => {
      console.log(`Upload of '${storedName}' finished`);
      openStreams -= 1;
      settle();
    });

    // If the client disconnects mid-body the part never ends, so the write
    // stream has to be closed here or its descriptor stays open forever.
    req.on("close", () => {
      if (!req.complete && !fstream.destroyed && !fstream.writableEnded) {
        file.unpipe(fstream);
        fstream.end();
      }
    });
  });

  req.busboy.on("finish", () => {
    parsed = true;
    settle();
  });

  req.busboy.on("error", (err) => {
    console.log(`Busboy error`, err);
    failed = true;
    settle();
  });

  // Pipe it through busboy only after every listener is attached.
  req.pipe(req.busboy);
});
// Start the HTTP server on the port the client script connects to.
app.listen(6969, () => console.log("listening on 6969"));
client.js
var request = require("request");
var fs = require("fs");
var path = require("path");
// Name of the file to upload. It is never reassigned, hence `const`.
const filebasename = "35gb.zip";
// Absolute path of that file, resolved next to this script.
const filePath = path.join(__dirname, filebasename);
/**
 * Get the information about the file at `filePath`: its size and a unique id.
 *
 * The id is built from the file's base name, size and modification time.
 *
 * @returns {{ fileId: string, size: number }} id and size in bytes.
 * @throws {Error} when the file cannot be stat'ed; the underlying fs error is
 *   attached as `cause`. The previous version logged the error and returned
 *   undefined, which only surfaced later as an unrelated TypeError.
 */
function getFileInfo() {
  let fileStat;
  try {
    // statSync needs no file descriptor; the earlier openSync call left its
    // descriptor open.
    fileStat = fs.statSync(filePath);
  } catch (error) {
    throw new Error(`Cannot read upload source '${filePath}'`, { cause: error });
  }
  return {
    fileId: `${filebasename}-${fileStat.size}-${fileStat.mtimeMs}`,
    size: fileStat.size,
  };
}
// Resolve the id and size of the upload source once, at start-up.
const fileInfo = getFileInfo();
const fileId = fileInfo.fileId;
const size = fileInfo.size;
/**
 * Send an api request to the server asking how many bytes have already been
 * uploaded (if any) for `fileId`.
 *
 * @returns {Promise<number>} resolves with the byte count reported by the
 *   server. Rejects when the request fails, the server answers with an error
 *   status, or the body is not JSON carrying a non-negative integer `bytes`.
 */
function info() {
  // The id contains the file name, so it must be encoded to survive spaces,
  // '#', '?' or '/' in the URL path.
  const url = `http://localhost:6969/${encodeURIComponent(fileId)}`;
  const options = {
    method: "GET",
    url,
    timeout: 200000,
    headers: {
      "Content-Type": "application/json",
    },
  };
  return new Promise((resolve, reject) => {
    request(options, (err, res, body) => {
      if (err) {
        console.log(err);
        return reject(err);
      }
      if (res && res.statusCode >= 400) {
        return reject(
          new Error(`Upload status request failed with HTTP ${res.statusCode}`)
        );
      }
      let bytes;
      try {
        // A throw inside this callback would escape the promise and crash
        // the process, so parsing failures are turned into rejections.
        ({ bytes } = JSON.parse(body));
      } catch (parseError) {
        return reject(
          new Error("Upload status response is not valid JSON", {
            cause: parseError,
          })
        );
      }
      if (!Number.isInteger(bytes) || bytes < 0) {
        return reject(new Error("Upload status response has no valid 'bytes'"));
      }
      return resolve(bytes);
    });
  });
}
// Send upload request
async function upload() {
const bytesAlreadyUploaded = await info();
let url = "http://localhost:6969/upload";
const uploadStream = fs.createReadStream(filePath, {
start: bytesAlreadyUploaded, // this will be 0 incase file is new
highWaterMark: 2 * 1024 * 1024,
});
const options = {
method: "POST",
url,
timeout: 200000,
headers: {
"Content-Type": "multipart/form-data",
"x-file-id": fileId,
"x-start-byte": bytesAlreadyUploaded,
"x-file-size": size,
"x-file-name": filebasename,
},
formData: {
image: uploadStream,
},
};
request(options, async (err, res, body) => {
if (err) {
// Basically if an error occurs, EPIPE or Connection timed out or any other error, we will resume uploading from the point where it was last uploaded
console.log(`Error ${err.code}. Resuming upload...`);
await upload();
return;
}
console.log("body", typeof body, body);
});
}
// Kick off the upload. The promise is no longer left floating: a rejection is
// reported and reflected in the exit code instead of becoming an unhandled
// rejection.
upload().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});