Welcome to WuJiGu Developer Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
627 views
in Technique[技术] by (71.8m points)

node.js - Nodejs Resumable file upload using busboy

I am implementing an upload endpoint using express and busboy. The code works like a charm. I am now trying to implement resumable uploads. For example, if a user is uploading a 10 GB or 50 GB file and their internet connection drops, the upload should resume from where it stopped the next time they upload the same file.

I understand that I need to implement another endpoint that tells the client how many bytes have been uploaded so far, so that the client can send only the remaining bytes.

I am not sure how to proceed from here, because the first problem I am facing is that when the upload happens, express stores the temp file in the tmp directory of the OS. Is it possible to store the temp file in my current script directory instead?

Following is my upload endpoint code.

// Streams each uploaded file to disk in this script's directory and replies
// with the file name once the data has been written.
router.post("/upload", (req, res, next) => {
  const busboy = new Busboy({ headers: req.headers });

  busboy.on("file", (fieldname, file, filename) => {
    // Keep only the base name so a client-supplied name such as
    // "../../etc/passwd" cannot escape this directory.
    const filepath = path.join(__dirname, path.basename(filename));
    const writeStream = fs.createWriteStream(filepath);

    file.pipe(writeStream);

    // Without an error handler a failed write would leave the request hanging.
    writeStream.on("error", next);
    writeStream.on("close", () => {
      if (!res.headersSent) {
        res.send(filename);
      }
    });
  });

  // Malformed multipart bodies are reported through the parser's error event.
  busboy.on("error", next);

  req.pipe(busboy);
});

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Answering my own question.

I have managed to crack the solution. Basically, you need to resume uploading the file from the point where the previous upload stopped. I am not sure if this is the best way of handling it, but it does the job for me.

server.js

const express = require("express");
const busboy = require("connect-busboy");
const path = require("path");
const fs = require("fs");
const cors = require("cors");

const app = express(); // Initialize the express web server
app.use(cors());

// Insert the busboy middleware exactly once, with the larger buffer.
// Registering it a second time with default options is redundant.
app.use(
  busboy({
    highWaterMark: 2 * 1024 * 1024, // Set 2MiB buffer
  })
);

const uploadPath = path.join(__dirname, "upload_data");
// fs.createWriteStream does not create missing directories, so make sure the
// upload target exists before the first request arrives.
fs.mkdirSync(uploadPath, { recursive: true });

// In-memory map of file id -> stored file name (lost when the process exits).
// A prototype-less object keeps ids such as "constructor" from matching
// inherited properties.
const database = Object.create(null);

// This endpoint tells the client how many bytes have already been uploaded for
// the given file id. It responds with { bytes: 0 } when the id is unknown or the
// stored file cannot be read (meaning the file is new).
app.route("/:id").get((req, res, next) => {
  const fileId = req.params.id;
  const dbFileName = database[fileId];

  // Only string entries are real uploads; this also ignores inherited
  // properties (e.g. "constructor") when database is a plain object.
  if (typeof dbFileName !== "string") {
    return res.json({ bytes: 0 });
  }

  try {
    // statSync reads the size without opening a file descriptor, so there is
    // nothing to close afterwards.
    const { size } = fs.statSync(path.join(uploadPath, dbFileName));
    return res.json({ bytes: size });
  } catch (error) {
    console.error(error);
    return res.json({ bytes: 0 });
  }
});



// Handle the upload post request.
//
// Required headers:
//   x-file-id    - id chosen by the client to identify the file across attempts
//   x-start-byte - offset of the first byte contained in this request
//   x-file-size  - total size of the file in bytes
//
// Responses:
//   200 "ok"                    - the data of this request was written to disk
//   200 "File already uploaded" - the start offset is at or past the file size
//   400                         - headers, body type or file name are invalid
//   409 { bytes }               - the start offset does not match the bytes on disk
//   500                         - parsing or writing failed
app.route("/upload").post((req, res, next) => {
  const xFileId = req.headers["x-file-id"];
  const xStartByte = parseInt(req.headers["x-start-byte"], 10);
  const xFileSize = parseInt(req.headers["x-file-size"], 10);

  if (
    !xFileId ||
    Number.isNaN(xStartByte) ||
    Number.isNaN(xFileSize) ||
    xStartByte < 0
  ) {
    return res
      .status(400)
      .json("Missing or invalid x-file-id, x-start-byte or x-file-size header");
  }

  if (xStartByte >= xFileSize) {
    return res.json("File already uploaded");
  }

  // req.busboy is attached by the connect-busboy middleware; without it there
  // is nothing to pipe into (presumably the body was not multipart/form-data).
  if (!req.busboy) {
    return res.status(400).json("Expected a multipart/form-data body");
  }

  // Several events can end the request; make sure only the first one answers.
  let responded = false;
  const respond = (status, body) => {
    if (responded || res.headersSent) {
      return;
    }
    responded = true;
    res.status(status).json(body);
  };

  // "ok" is sent only when parsing is done AND every write stream is closed,
  // so a follow-up size query sees all bytes of this request.
  let openWrites = 0;
  let parsingDone = false;
  const finishIfDone = () => {
    if (parsingDone && openWrites === 0) {
      respond(200, "ok");
    }
  };

  req.busboy.on("file", (fieldname, file, filename) => {
    // Older busboy releases pass the file name as a string, newer ones pass an
    // info object carrying a `filename` property.
    const clientName =
      typeof filename === "string" ? filename : filename && filename.filename;

    // Keep only the base name so a crafted name cannot escape uploadPath.
    const safeName = path.basename(String(clientName || ""));
    if (safeName === "" || safeName === "." || safeName === "..") {
      file.resume(); // drain the part so the parser can finish
      return respond(400, "Invalid file name");
    }

    // Reuse the name stored for this id on earlier attempts, otherwise register it.
    const isKnownId = Object.prototype.hasOwnProperty.call(database, xFileId);
    const storedName = isKnownId ? database[xFileId] : safeName;
    database[xFileId] = storedName;

    const completeFilePath = path.join(uploadPath, storedName);

    // Appending is only safe when the client continues exactly where the file
    // on disk ends; any other offset would duplicate or skip bytes.
    if (xStartByte > 0) {
      let bytesOnDisk = 0;
      try {
        bytesOnDisk = fs.statSync(completeFilePath).size;
      } catch (err) {
        // A missing file simply means nothing has been stored yet.
        if (err.code !== "ENOENT") {
          console.log("stat error", err);
        }
      }
      if (bytesOnDisk !== xStartByte) {
        file.resume();
        return respond(409, { bytes: bytesOnDisk });
      }
    }

    console.log(`Upload of '${storedName}' started`);

    // Create a write stream of the new file
    const isResume = xStartByte > 0;
    console.log(isResume ? "APPEND Mode" : "WRITE Mode");
    const fstream = fs.createWriteStream(completeFilePath, {
      flags: isResume ? "a" : "w",
    });
    openWrites += 1;

    // Pipe it trough
    file.pipe(fstream);

    file.on("error", (e) => console.log("file.on.error", e));

    file.on("limit", (e) => console.log("Limit reached", e));

    fstream.on("error", function (err) {
      console.log("fileStream error>>>>>", err);
      // Stop feeding the broken stream but keep draining the upload.
      file.unpipe(fstream);
      file.resume();
      respond(500, "Failed to write file");
    });

    // If the client disconnects mid-upload the part never ends, so close the
    // write stream here to flush what was received and release the descriptor.
    req.on("close", () => {
      if (!req.complete && !fstream.destroyed && !fstream.writableEnded) {
        file.unpipe(fstream);
        fstream.end();
      }
    });

    // On finish of the upload
    fstream.on("close", () => {
      openWrites -= 1;
      console.log(`Upload of '${storedName}' finished`);
      finishIfDone();
    });
  });

  req.busboy.on("finish", function () {
    parsingDone = true;
    finishIfDone();
  });

  req.busboy.on("error", (err) => {
    console.log(`Busboy error`, err);
    respond(500, "Upload failed");
  });

  req.pipe(req.busboy); // Pipe it trough busboy
});
 

// Port the upload server listens on; the client snippet targets the same port.
const PORT = 6969;

app.listen(PORT, () => console.log(`listening on ${PORT}`));

client.js

var request = require("request");
var fs = require("fs");
var path = require("path");

const filebasename = "35gb.zip";
const filePath = path.join(__dirname, filebasename);

/**
 * Get the information about the file to upload: its size and a unique id
 * built from the file name, size and modification time.
 *
 * @returns {{ fileId: string, size: number }}
 * @throws {Error} when the file cannot be read, since nothing can be uploaded
 *   without it.
 */
function getFileInfo() {
  let fileStat;
  try {
    // statSync needs no file descriptor, so nothing is left open afterwards.
    fileStat = fs.statSync(filePath);
  } catch (error) {
    throw new Error(`Cannot read file to upload '${filePath}': ${error.message}`);
  }

  return {
    fileId: `${filebasename}-${fileStat.size}-${fileStat.mtimeMs}`,
    size: fileStat.size,
  };
}

const { fileId, size } = getFileInfo();

/**
 * Send api request to server asking how many bytes have already been uploaded
 * (if any).
 *
 * @returns {Promise<number>} resolves with the byte count reported by the
 *   server; rejects on a network error, a non-2xx status or an unusable body.
 */
function info() {
  // Encode the id: it contains the file name, which may hold characters that
  // are not valid inside a URL path segment.
  const url = `http://localhost:6969/${encodeURIComponent(fileId)}`;

  const options = {
    method: "GET",
    url,
    timeout: 200000,
    headers: {
      "Content-Type": "application/json",
    },
  };

  return new Promise((resolve, reject) => {
    request(options, function (err, res, body) {
      if (err) {
        console.log(err);
        return reject(err);
      }

      if (res.statusCode < 200 || res.statusCode >= 300) {
        return reject(
          new Error(`Unexpected status ${res.statusCode} from ${url}`)
        );
      }

      // JSON.parse throws on a malformed body; inside this callback that would
      // be an uncaught exception, so turn it into a rejection instead.
      try {
        const { bytes } = JSON.parse(body);
        if (!Number.isInteger(bytes) || bytes < 0) {
          throw new Error(`Unexpected response body from ${url}: ${body}`);
        }
        resolve(bytes);
      } catch (parseError) {
        reject(parseError);
      }
    });
  });
}

 
/**
 * Send upload request, starting at the offset the server reports for this file.
 *
 * Any failure (EPIPE, connection timed out, the size query failing, ...) is
 * retried after a capped exponential delay, resuming from the point where the
 * upload last stopped.
 *
 * @param {number} [attempt=0] number of consecutive failed attempts so far;
 *   only used to compute the retry delay.
 * @returns {Promise<void>} resolves once the server has answered the upload.
 */
async function upload(attempt = 0) {
  // Wait before retrying so an unreachable server is not hammered in a tight loop.
  const retryLater = (err) => {
    const delayMs = Math.min(30000, 1000 * 2 ** attempt);
    console.log(`Error ${err.code}. Resuming upload...`);
    return new Promise((resolve) => setTimeout(resolve, delayMs)).then(() =>
      upload(attempt + 1)
    );
  };

  let bytesAlreadyUploaded;
  try {
    bytesAlreadyUploaded = await info();
  } catch (err) {
    return retryLater(err);
  }

  // Nothing left to send; skip opening the file and the request altogether.
  if (bytesAlreadyUploaded >= size) {
    console.log("File already uploaded");
    return;
  }

  const url = "http://localhost:6969/upload";
  const uploadStream = fs.createReadStream(filePath, {
    start: bytesAlreadyUploaded, // this will be 0 incase file is new
    highWaterMark: 2 * 1024 * 1024,
  });

  const options = {
    method: "POST",
    url,
    timeout: 200000,
    headers: {
      "Content-Type": "multipart/form-data",
      "x-file-id": fileId,
      "x-start-byte": bytesAlreadyUploaded,
      "x-file-size": size,
      "x-file-name": filebasename,
    },
    formData: {
      image: uploadStream,
    },
  };

  return new Promise((resolve) => {
    request(options, (err, res, body) => {
      if (err) {
        // Release the file descriptor of the interrupted read before retrying.
        uploadStream.destroy();
        resolve(retryLater(err));
        return;
      }
      console.log("body", typeof body, body);
      resolve();
    });
  });
}

// Start the upload and report a failure instead of leaving the promise floating.
upload().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to WuJiGu Developer Q&A Community for programmer and developer-Open, Learning and Share
...