本博客介绍如何进行文件的分块上传。本文侧重介绍客户端,服务器端请参考博客《Java 文件分块上传服务器端源代码》。建议读者朋友在阅读本文代码前先了解一下 MIME 协议。
所谓分块上传并非把大文件进行物理分块,然后挨个上传,而是依次读取大文件的一部分文件流进行上传。分块,倒不如说分流比较切实。本文通过一个项目中的示例,说明使用 Apache 的 HttpComponents/HttpClient 对大文件进行分块上传的过程。示例使用的版本是
HttpComponents Client 4.2.1。
本文仅以一小 demo 功能性地解释 HttpComponents/HttpClient 分块上传,没有考虑 I/O 关闭、多线程等资源因素,读者可以根据自己的项目酌情处理。
本文核心思想及流程:以 100 MB 大小为例,大于 100 MB 的进行分块上传,否则整块上传。对于大于 100 MB 的文件,又以 100 MB 为单位进行分割,保证每次以不大于 100 MB 的大小进行上传。比如 304 MB 的一个文件会分为 100 MB、100
MB、100 MB、4 MB 等四块依次上传。第一次读取 0 字节开始的 100 MB 个字节,上传;第二次读取第 100 MB 字节开始的 100 MB 个字节,上传;第三次读取第 200 MB 字节开始的 100 MB 个字节,上传;第四次读取最后剩下的 4 MB 个字节进行上传。
自定义的 ContentBody 源码如下,其中定义了流的读取和输出:
packagecom.mon.util.block;
importjava.io.File;
importjava.io.IOException;
importjava.io.OutputStream;
importjava.io.RandomAccessFile;
importorg.apache.http.entity.mime.content.AbstractContentBody;
importcom.defonds.rtupload.GlobalConstant;
publicclassBlockStreamBodyextendsAbstractContentBody{
//给MultipartEntity看的2个参数
privatelongblockSize=0;//本次分块上传的大小
privateStringfileName=null;//上传文件名
//writeTo需要的3个参数
privateintblockNumber=0,blockIndex=0;//blockNumber分块数;blockIndex当前第几块
privateFiletargetFile=null;//要上传的文件
privateBlockStreamBody(StringmimeType){
super(mimeType);
//TODOAuto-generatedconstructorstub
}
/**
*自定义的ContentBody构造子
*@paramblockNumber分块数
*@paramblockIndex当前第几块
*@paramtargetFile要上传的文件
*/
publicBlockStreamBody(intblockNumber,intblockIndex,FiletargetFile){
this("application/octet-stream");
this.blockNumber=blockNumber;//blockNumber初始化
this.blockIndex=blockIndex;//blockIndex初始化
this.targetFile=targetFile;//targetFile初始化
this.fileName=targetFile.getName();//fileName初始化
//blockSize初始化
if(blockIndex
this.blockSize=GlobalConstant.CLOUD_API_LOGON_SIZE;
}else{//最后一块
this.blockSize=targetFile.length()-GlobalConstant.CLOUD_API_LOGON_SIZE*(blockNumber-1);
}
}
@Override
publicvoidwriteTo(OutputStreamout)throwsIOException{
byteb[]=newbyte[1024];//暂存容器
RandomAccessFileraf=newRandomAccessFile(targetFile,"r");//负责读取数据
if(blockIndex==1){//第一块
intn=0;
longreadLength=0;//记录已读字节数
while(readLength<=blockSize-1024){//大部分字节在这里读取
n=raf.read(b,0,1024);
readLength+=1024;
out.write(b,0,n);
}
if(readLength<=blockSize){//余下的不足1024个字节在这里读取
n=raf.read(b,0,(int)(blockSize-readLength));
out.write(b,0,n);
}
}elseif(blockIndex
raf.seek(GlobalConstant.CLOUD_API_LOGON_SIZE*(blockIndex-1));//跳过前[块数*固定大小]个字节
intn=0;
longreadLength=0;//记录已读字节数
while(readLength<=blockSize-1024){//大部分字节在这里读取
n=raf.read(b,0,1024);
readLength+=1024;
out.write(b,0,n);
}
if(readLength<=blockSize){//余下的不足1024个字节在这里读取
n=raf.read(b,0,(int)(blockSize-readLength));
out.write(b,0,n);
}
}else{//最后一块
raf.seek(GlobalConstant.CLOUD_API_LOGON_SIZE*(blockIndex-1));//跳过前[块数*固定大小]个字节
intn=0;
while((n=raf.read(b,0,1024))!=-1){
out.write(b,0,n);
}
}
//TODO最后不要忘掉关闭out/raf
}
@Override
publicStringgetCharset(){
//TODOAuto-generatedmethodstub
returnnull;
}
@Override
publicStringgetTransferEncoding(){
//TODOAuto-generatedmethodstub
return"binary";
}
@Override
publicStringgetFilename(){
//TODOAuto-generatedmethodstub
returnfileName;
}
@Override
publiclonggetContentLength(){
//TODOAuto-generatedmethodstub
returnblockSize;
}
}
在自定义的 HttpComponents/HttpClient 工具类 HttpClient4Util 里进行分块上传的封装:
publicstaticStringrestPost(StringserverURL,FiletargetFile,MapmediaInfoMap){
Stringcontent="";
try{
DefaultHttpClienthttpClient=newDefaultHttpClient();
HttpPostpost=newHttpPost(serverURL+"?");
httpClient.getParams().setParameter("http.socket.timeout",60*60*1000);
MultipartEntitympEntity=newMultipartEntity();
Listkeys=newArrayList(mediaInfoMap.keySet());
Collections.sort(keys,String.CASE_INSENSITIVE_ORDER);
for(Iteratoriterator=keys.iterator();iterator.hasNext();){
Stringkey=iterator.next();
if(StringUtils.isNotBlank(mediaInfoMap.get(key))){
mpEntity.addPart(key,newStringBody(mediaInfoMap.get(key)));
}
}
if(targetFile!=null&&targetFile.exists()){
ContentBodycontentBody=newFileBody(targetFile);
mpEntity.addPart("file",contentBody);
}
post.setEntity(mpEntity);
HttpResponseresponse=httpClient.execute(post);
content=EntityUtils.toString(response.getEntity());
httpClient.getConnectionManager().shutdown();
}catch(Exceptione){
e.printStackTrace();
}
System.out.println("=====RequestUrl==========================\n"
+getRequestUrlStrRest(serverURL,mediaInfoMap).replaceAll("&fmt=json",""));
System.out.println("=====content==========================\n"+content);
returncontent.trim();
}
其中 "file" 是分块上传服务器对分块文件参数定义的名字。细心的读者会发现,整块文件上传直接使用 Apache 官方的InputStreamBody,而分块才使用自定义的BlockStreamBody。
最后调用 HttpClient4Util 进行上传:
publicstaticMapuploadToDrive(
Mapparams,Stringdomain){
FiletargetFile=newFile(params.get("filePath"));
longtargetFileSize=targetFile.length();
intmBlockNumber=0;
if(targetFileSize
mBlockNumber=1;
}else{
mBlockNumber=(int)(targetFileSize/GlobalConstant.CLOUD_API_LOGON_SIZE);
longsomeExtra=targetFileSize
%GlobalConstant.CLOUD_API_LOGON_SIZE;
if(someExtra>0){
mBlockNumber++;
}
}
params.put("blockNumber",Integer.toString(mBlockNumber));
if(domain!=null){
LOG.debug("Drive---domain="+domain);
LOG.debug("drive---url="+"http://"+domain+"/sync"
+GlobalConstant.CLOUD_API_PRE_UPLOAD_PATH);
}else{
LOG.debug("Drive---domain=null");
}
StringresponseBodyStr=HttpClient4Util.getRest("http://"+domain
+"/sync"+GlobalConstant.CLOUD_API_PRE_UPLOAD_PATH,params);
ObjectMappermapper=newObjectMapper();
DrivePreInforesult;
try{
result=mapper.readValue(responseBodyStr,ArcDrivePreInfo.class);
}catch(IOExceptione){
LOG.error("Drive.preUploadToArcDriveerror.",e);
thrownewRtuploadException(GlobalConstant.ERROR_CODE_13001);//TODO
}
//JSONObjectjsonObject=JSONObject.fromObject(responseBodyStr);
if(Integer.valueOf(result.getRc())==0){
intuuid=result.getUuid();
StringupsServerUrl=result.getUploadServerUrl().replace("https",
"http");
if(uuid!=-1){
upsServerUrl=upsServerUrl
+GlobalConstant.CLOUD_API_UPLOAD_PATH;
params.put("uuid",String.valueOf(uuid));
for(inti=1;i<=mBlockNumber;i++){
params.put("blockIndex",""+i);
HttpClient4Util.restPostBlock(upsServerUrl,targetFile,
params);//
}
}
}else{
thrownewRtuploadException(GlobalConstant.ERROR_CODE_13001);//TODO
}
returnnull;
}
其中 params 这个 Map 里封装的是服务器分块上传所需要的一些参数,而上传块数也在这里进行确定。
本文中的示例经本人测试能够上传大文件成功,诸如 *.mp4 的文件上传成功没有出现任何问题。如果读者朋友测试时遇到问题无法上传成功,请在博客后跟帖留言,大家共同交流下。本文示例肯定还存在很多不足之处,如果读者朋友发现还请留言指出,笔者先行谢过了。
本博客将介绍如何进行文件的分块上传。如果读者还想了解文件的“分块”下载相关内容可以去参考博客《Java
服务器端支持断点续传的源代码【支持快车、迅雷】》。
本文侧重介绍服务器端,客户端端请参考本篇博客的姊妹篇《Java
文件分块上传客户端源代码》,关于分块上传的思想及其流程,已在该博客中进行了详细说明,这里不再赘述。
直接上代码。接收客户端 HTTP 分块上传请求的 Spring MVC 控制器源代码如下:
@Controller
publicclassUploadControllerextendsBaseController{
privatestaticfinalLoglog=LogFactory.getLog(UploadController.class);
privateUploadServiceuploadService;
privateAuthServiceauthService;
/**
*大文件分成小文件块上传,一次传递一块,最后一块上传成功后,将合并所有已经上传的块,保存到FileServer
*上相应的位置,并返回已经成功上传的文件的详细属性.当最后一块上传完毕,返回上传成功的信息。此时用getFileList查询该文件,
*该文件的uploadStatus为2。client请自行处理该状态下文件如何显示。(forUPSServer)
*
*/
@RequestMapping("/core/v1/file/upload")
@ResponseBody
publicObjectupload(HttpServletResponseresponse,
@RequestParam(value="client_id",required=false)Stringappkey,
@RequestParam(value="sig",required=false)Stringappsig,
@RequestParam(value="token",required=false)Stringtoken,
@RequestParam(value="uuid",required=false)Stringuuid,
@RequestParam(value="block",required=false)StringblockIndex,
@RequestParam(value="file",required=false)MultipartFilemultipartFile,
@RequestParamMapparameters){
checkEmpty(appkey,BaseException.ERROR_CODE_16002);
checkEmpty(token,BaseException.ERROR_CODE_16007);
checkEmpty(uuid,BaseException.ERROR_CODE_20016);
checkEmpty(blockIndex,BaseException.ERROR_CODE_20006);
checkEmpty(appsig,BaseException.ERROR_CODE_10010);
if(multipartFile==null){
thrownewBaseException(BaseException.ERROR_CODE_20020);//上传文件不存在
}
LonguuidL=parseLong(uuid,BaseException.ERROR_CODE_20016);
IntegerblockIndexI=parseInt(blockIndex,BaseException.ERROR_CODE_20006);
MapappMap=getAuthService().validateSigature(parameters);
AccessTokenaccessToken=CasUtil.checkAccessToken(token,appMap);
Longuid=accessToken.getUid();
StringbucketUrl=accessToken.getBucketUrl();
//从上传目录拷贝文件到工作目录
StringfileAbsulutePath=null;
try{
fileAbsulutePath=this.copyFile(multipartFile.getInputStream(),multipartFile.getOriginalFilename());
}catch(IOExceptionioe){
log.error(ioe.getMessage(),ioe);
thrownewBaseException(BaseException.ERROR_CODE_20020);//上传文件不存在
}
FileuploadedFile=newFile(Global.UPLOAD_TEMP_DIR+fileAbsulutePath);
checkEmptyFile(uploadedFile);//file非空验证
Objectrs=uploadService.upload(uuidL,blockIndexI,uid,uploadedFile,bucketUrl);
setHttpStatusOk(response);
returnrs;
}
//TODO查看下这里是否有问题
//上传文件非空验证
privatevoidcheckEmptyFile(Filefile){
if(file==null||file.getAbsolutePath()==null){
thrownewBaseException(BaseException.ERROR_CODE_20020);//上传文件不存在
}
}
/**
*写文件到本地文件夹
*
*@throwsIOException
*返回生成的文件名
*/
privateStringcopyFile(InputStreaminputStream,StringfileName){
OutputStreamoutputStream=null;
StringtempFileName=null;
intpointPosition=fileName.lastIndexOf(".");
if(pointPosition<0){//myvedio
tempFileName=UUID.randomUUID().toString();//94d1d2e0-9aad-4dd8-a0f6-494b0099ff26
}else{//myvedio.flv
tempFileName=UUID.randomUUID()+fileName.substring(pointPosition);//94d1d2e0-9aad-4dd8-a0f6-494b0099ff26.flv
}
try{
outputStream=newFileOutputStream(Global.UPLOAD_TEMP_DIR+tempFileName);
intreadBytes=0;
byte[]buffer=newbyte[10000];
while((readBytes=inputStream.read(buffer,0,10000))!=-1){
outputStream.write(buffer,0,readBytes);
}
returntempFileName;
}catch(IOExceptionioe){
//log.error(ioe.getMessage(),ioe);
thrownewBaseException(BaseException.ERROR_CODE_20020);//上传文件不存在
}finally{
if(outputStream!=null){
try{
outputStream.close();
}catch(IOExceptione){
}
}
if(inputStream!=null){
try{
inputStream.close();
}catch(IOExceptione){
}
}
}
}
/**
*测试此服务是否可用
*
*@paramresponse
*@return
*@authorzwq7978
*/
@RequestMapping("/core/v1/file/testServer")
@ResponseBody
publicObjecttestServer(HttpServletResponseresponse){
setHttpStatusOk(response);
returnGlobal.SUCCESS_RESPONSE;
}
publicUploadServicegetUploadService(){
returnuploadService;
}
publicvoidsetUploadService(UploadServiceuploadService){
this.uploadService=uploadService;
}
publicvoidsetAuthService(AuthServiceauthService){
this.authService=authService;
}
publicAuthServicegetAuthService(){
returnauthService;
}
}
比如要上传的文件是 test450k.mp4。对照《Java
文件分块上传客户端源代码》中分块上传服务器对分块文件参数定义的名字"file",upload 方法里使用的是 MultipartFile 接收该对象。对于每次的 HTTP 请求,使用 copyFile 方法将文件流输出到服务器本地的一个临时文件夹里,比如作者的是 D:/defonds/syncPath/uploadTemp,该文件下会有
50127019-b63b-4a54-8f53-14efd1e58ada.mp4 临时文件生成用于保存上传文件流。
分块依次上传。当所有块都上传完毕之后,将这些临时文件都转移到服务器指定目录中,比如作者的这个目录是 D:/defonds/syncPath/file,在该文件夹下会有/1/temp_dir_5_1 目录生成,而 uploadTemp 的临时文件则被挨个转移到这个文件夹下,生成形如 5.part0001 的文件。以下是文件转移的源代码:
/**
*把所有块从临时文件目录移到指定本地目录或S2/S3
*
*@parampreUpload
*/
privatevoidmoveBlockFiles(BlockPreuploadFileInfopreUpload){
@SuppressWarnings("unchecked")
String[]s3BlockUrl=newString[preUpload.getBlockNumber()];
String[]localBlockUrl=newString[preUpload.getBlockNumber()];//本地的块文件路径以便以后删除
Listblocks=(List)getBaseDao().queryForList(
"upload.getBlockUploadFileByUuid",preUpload.getUuid());
StringtempDirName=SyncUtil.getTempDirName(preUpload.getUuid(),preUpload.getUid());
StringparentPath=Global.UPLOAD_ABSOLUTE_PAHT_+Global.PATH_SEPARATIVE_SIGN
+String.valueOf(preUpload.getUid());
StringdirPath=parentPath+Global.PATH_SEPARATIVE_SIGN+tempDirName;
newFile(dirPath).mkdirs();//创建存放块文件的文件夹(本地)
intj=0;
for(BlockUploadInfoinfo:blocks){
try{
StringstrBlockIndex=createStrBlockIndex(info.getBlockIndex());
StringsuffixPath=preUpload.getUuid()+".part"+strBlockIndex;
StringtempFilePath=info.getTempFile();
FiletempFile=newFile(tempFilePath);
FiletmpFile=newFile(dirPath+suffixPath);
if(tmpFile.exists()){
FileUtils.deleteQuietly(tmpFile);
}
FileUtils.moveFile(tempFile,tmpFile);
localBlockUrl[j]=dirPath+suffixPath;
j++;
info.setStatus(Global.MOVED_TO_NEWDIR);
getBaseDao().update("upload.updateBlockUpload",info);
if(log.isInfoEnabled())
log.info(preUpload.getUuid()+""+info.getBuId()+"moveBlockFiles");
}catch(IOExceptione){
log.error(e.getMessage(),e);
thrownewBaseException("filenotfound");
}
}
preUpload.setLocalBlockUrl(localBlockUrl);
preUpload.setDirPath(dirPath);
preUpload.setStatus(Global.MOVED_TO_NEWDIR);
getBaseDao().update("upload.updatePreUploadInfo",preUpload);
}
privateStringcreateStrBlockIndex(intblockIndex){
StringstrBlockIndex;
if(blockIndex<10){
strBlockIndex="000"+blockIndex;
}elseif(10<=blockIndex&&blockIndex<100){
strBlockIndex="00"+blockIndex;
}elseif(100<=blockIndex&&blockIndex<1000){
strBlockIndex="0"+blockIndex;
}else{
strBlockIndex=""+blockIndex;
}
returnstrBlockIndex;
}
最后是文件的组装源代码:
/**
*组装文件
*
*/
privatevoidassembleFileWithBlock(BlockPreuploadFileInfopreUpload){
StringdirPath=preUpload.getDirPath();
//开始在指定目录组装文件
StringuploadedUrl=null;
String[]separatedFiles;
String[][]separatedFilesAndSize;
intfileNum=0;
Filefile=newFile(dirPath);
separatedFiles=file.list();
separatedFilesAndSize=newString[separatedFiles.length][2];
Arrays.sort(separatedFiles);
fileNum=separatedFiles.length;
for(inti=0;i
separatedFilesAndSize[i][0]=separatedFiles[i];
StringfileName=dirPath+separatedFiles[i];
FiletmpFile=newFile(fileName);
longfileSize=tmpFile.length();
separatedFilesAndSize[i][1]=String.valueOf(fileSize);
}
RandomAccessFilefileReader=null;
RandomAccessFilefileWrite=null;
longalreadyWrite=0;
intlen=0;
byte[]buf=newbyte[1024];
try{
uploadedUrl=Global.UPLOAD_ABSOLUTE_PAHT_+Global.PATH_SEPARATIVE_SIGN+preUpload.getUid()+Global.PATH_SEPARATIVE_SIGN+preUpload.getUuid();
fileWrite=newRandomAccessFile(uploadedUrl,"rw");
for(inti=0;i
fileWrite.seek(alreadyWrite);
//读取
fileReader=newRandomAccessFile((dirPath+separatedFilesAndSize[i][0]),"r");
//写入
while((len=fileReader.read(buf))!=-1){
fileWrite.write(buf,0,len);
}
fileReader.close();
alreadyWrite+=Long.parseLong(separatedFilesAndSize[i][1]);
}
fileWrite.close();
preUpload.setStatus(Global.ASSEMBLED);
preUpload.setServerPath(uploadedUrl);
getBaseDao().update("upload.updatePreUploadInfo",preUpload);
if(Global.BLOCK_UPLOAD_TO!=Global.BLOCK_UPLOAD_TO_LOCAL)
{
//组装完毕没有问题删除掉S2/S3上的block
String[]path=preUpload.getS3BlockUrl();
for(Stringstring:path){
try{
if(Global.BLOCK_UPLOAD_TO==Global.BLOCK_UPLOAD_TO_S2)
{
S2Util.deleteFile(preUpload.getBucketUrl(),string);
}else
{
S3Util.deleteFile(preUpload.getBucketUrl(),string);
}
}catch(Exceptione){
log.error(e.getMessage(),e);
}
}
}
if(log.isInfoEnabled())
log.info(preUpload.getUuid()+"assembleFileWithBlock");
}catch(IOExceptione){
log.error(e.getMessage(),e);
try{
if(fileReader!=null){
fileReader.close();
}
if(fileWrite!=null){
fileWrite.close();
}
}catch(IOExceptionex){
log.error(e.getMessage(),e);
}
}
}
BlockPreuploadFileInfo 是我们自定义的业务文件处理 bean。
OK,分块上传的服务器、客户端源代码及其工作流程至此已全部介绍完毕,以上源代码全部是经过项目实践过的,大部分现在仍运行于一些项目之中。有兴趣的朋友可以自己动手,将以上代码自行改造,看看能否运行成功。如果遇到问题可以在本博客下跟帖留言,大家一起讨论讨论。
如果觉得《java 分块上传_Java 文件分块上传客户端和服务器端源代码》对你有帮助,请点赞、收藏,并留下你的观点哦!