多线程实现ftp 文件下载

  • 2025-08-20 18:18:57

1 需求:

某个接口的记录存放在 FTP 服务器上,按 类别/日期/时间 的目录结构存放文件,一天可能产生几百个文件,需要把这些文件下载下来保存。

问题:

1. 如果要同时获取几个类别在某个时间段内的数据,需要下载大量文件;如果是单线程下载,会非常慢。

2. FTP 服务器一般会限制同一个用户名同时建立的连接数(只允许少数几个连接)。

3. 在同一个连接上循环读取多个文件时,经常出现只有第一个文件能拿到字节流、后面文件的字节流都是 null 的情况,无论使用 sun 的 FtpClient 还是 Apache commons-net 的 FTPClient 都是如此。

4. 文件读取完后,多线程之间如何通知,或者主线程如何拿到各个线程的结果。

解决办法:

1. 第一个问题:使用多线程下载。先取出符合条件的文件名,放到文件名集合中,再根据文件名集合分配给多个线程去下载文件。

2. 第二个问题:给每个线程分配一个独立的 FTP 连接(注意线程数不要超过服务器允许的同时连接数)。

3. 第三个问题:每读取完一个文件,就先关闭流,再执行

completePending 方法(Apache commons-net 中对应的是 completePendingCommand),使连接可以继续读取下一个文件;否则读取一个文件后,再读取其他文件时就会一直得到 null。

ftp.enablePassiveMode(true);//被动模式。读取文件之前,建议把连接设置为被动模式:主动模式下需要服务器反向连接客户端的数据端口,客户端位于防火墙或 NAT 之后时经常因此失败而报错。

4. 第四个问题:使用 Executors 类和 Future 来获得通知,并使用

future.get() 方法获取结果。这个方法会阻塞当前线程,直到对应的任务执行完毕并返回结果。

具体代码:

package com.my;

import java.io.BufferedReader;

import java.io.File;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.net.InetSocketAddress;

import java.net.SocketAddress;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.concurrent.*;

import org.dom4j.*;

import sun.net.ftp.FtpClient;

import sun.net.ftp.FtpProtocolException;

public class FtpUtil {

public static void main(String[] args) {

long startTime = System.currentTimeMillis();

List list = history("201810230000","201810231200");

long endTime = System.currentTimeMillis();

System.out.println((endTime - startTime) + "毫秒");

System.out.println(list.size());

}

public static FtpClient connectFTP(String url, int port, String username, String password) {

//创建ftp

FtpClient ftp = null;

try {

//创建地址

SocketAddress addr = new InetSocketAddress(url, port);

//连接

ftp = FtpClient.create();

ftp.connect(addr);

//登陆

ftp.login(username, password.toCharArray());

ftp.setBinaryType();

} catch (FtpProtocolException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

return ftp;

}

public static void close (FtpClient ftp) {

if(ftp != null) {

try {

ftp.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

// ======================================== 对 需要 的文件编译的字符串进行 解析成Map ========================================================

//对时间进行转换 转换为美东时间

// 找出对应的日期的文件有哪些

//对四个顶层目录,分别遍历查找对应时间的文件有哪些,将这些文件名放入 对应的四个map中

// 对每个map 中的文件,分别进行解析,并放入map,然后将map 放入list

//最后将四个list 放入list

//顶层目录有四个

private static String [] parentFile = {"文件夹1","文件夹2","文件夹3", "文件夹4"};

private static final String url = "ftp网址";

private static final int port = 23;

private static final String username = "用户名";

private static final String password = "密码";

/**

*

* @param fromDate

* @param toDate

*/

public static List history(String fromDate,String toDate) {

FtpClient ftp = FtpUtil.connectFTP(url, port, username, password);//获得ftp 连接

List list = new ArrayList<>();

List nameLists = new ArrayList<>();

if(ftp.isConnected() && ftp.isLoggedIn()) {

for (int i = 0; i < parentFile.length; i++) {

String filePath = "/";

filePath = filePath + parentFile[i] +"/" + fromDate.substring(0,8);

List nameList = getChildFileName(filePath, ftp, fromDate.substring(0,12), toDate.substring(0,12));

if(nameList != null && nameList.size() > 0) {

nameLists.add(nameList);

}

}

}

close(ftp);

if(nameLists != null && nameLists.size() > 0) {

for (List nameList: nameLists ) {

FtpUtil ftpUtil = new FtpUtil();

List>>> futureList = ftpUtil.threadSearch(nameList,10);

List> childList = new ArrayList<>();

list.add(childList);

for (Future>> future : futureList) {

try {

System.out.println(future.isDone() + "---" + future.toString());

List> reList = future.get();

System.out.println(future.toString() + "---单个线程所得的结果---" + reList.size());

if (reList != null) {

childList.addAll(reList);

}

System.out.println(future.isDone() + "---" + future.toString());

} catch (InterruptedException | ExecutionException e) {

e.printStackTrace();

}

}

}

}

return list;

}

/**

* 根据父文件名,获得列出其下的文件的字节流

* 读取字节流的同时,根据 开始时间和结束时间判断是否是需要的文件,

* 将需要的文件放入list中

* @param ftpFile

* @param ftp

* @param fromDate

* @param toDate

* @return

*/

public static List getChildFileName(String ftpFile,FtpClient ftp,String fromDate,String toDate) {

long startTime = Long.valueOf(fromDate);

long endTime = Long.valueOf(toDate);

InputStream is = null;

List list = new ArrayList<>();

try {

// 获取ftp上的文件

is = ftp.nameList(ftpFile);

if(is != null) {

BufferedReader in = new BufferedReader(new InputStreamReader(is,"UTF-8"));

while (true) {

String line = in.readLine();

if (line == null) {

break;

}

else {

long dateNumber = Long.valueOf(line.substring(line.lastIndexOf("/") + 1,line.length()).replace(".xml",""));

if(startTime <= dateNumber && dateNumber <= endTime ) {

list.add(line);

}

}

}

in.close();

}

} catch (FtpProtocolException | IOException e) {

e.printStackTrace();

} finally {

try {

if(is!=null){

is.close();

}

} catch (IOException e) {

e.printStackTrace();

}

}

return list;

}

public static String parseXmlFile(String ftpFile,FtpClient ftp) {

InputStream is = null;

StringBuffer sbf = new StringBuffer();

ftp.enablePassiveMode(true);//被动模式

try {

// 获取ftp上的文件

is = ftp.getFileStream(ftpFile);

if (is != null) {

BufferedReader in = new BufferedReader(new InputStreamReader(is, "UTF-8"));

while (true) {

String line = in.readLine();

if (line == null) {

break;

} else {

sbf.append(line);

sbf.append("\r\n");

// valList.add(parseXml(line));

}

}

in.close();

ftp.completePending();

}

} catch (FtpProtocolException | IOException e) {

e.printStackTrace();

}

return sbf.toString();

}

/**

* 多线程分配

*/

public List>>> threadSearch(List list,int threadCount) {

int len = threadCount;//定义分割多少份线程

if(list.size() < len){

len = list.size();

}

List> splitList = splitList(list, len);//分割一个线程执行多少个类型

TimerThread tt = null;

ExecutorService exec = Executors.newCachedThreadPool();//工头

ArrayList>>> results = new ArrayList<>();//结果通知

for (int i = 0; i < len; i++) {

TimerThread timerThread = new TimerThread(splitList.get(i),connectFTP(url,port,username,password));

results.add(exec.submit(timerThread));//submit返回一个Future,代表了即将要返回的结果

}

return results;

}

/**

* 多线程具体类

* @author 12198

*

*/

public class TimerThread implements

Callable>> {

public List li;

public FtpClient ftp;

public List> list;

public TimerThread(List li,FtpClient ftp){

this.li=li;

this.ftp = ftp;

this.list = new ArrayList<>();

}

@Override

public List> call() throws Exception {

StringBuffer sbf = new StringBuffer();

if(null != li && li.size() > 0){

for(String filePath : li){

System.out.println(Thread.currentThread().getName() + "--" + ftp.toString());

// list.addAll(parseXmlFile(filePath,ftp));

sbf.append(parseXmlFile(filePath,ftp));

}

}

close(ftp);

//最后一起解析字符串

String [] str = sbf.toString().split("\r\n");

for (String xmlString:str) {

Map map = parseXml(xmlString);

if(map != null && map.size() > 0) list.add(map);

}

return this.list;

}

}

// ========================= 引用的其他类方法 ====================================

public static Map parseXml(String str) {

Map xmlMap = new HashMap<>();

Document doc = null;

try {

doc = DocumentHelper.parseText(str);

} catch (DocumentException e) {

e.printStackTrace();

}

if(doc != null) {

Element rootElement = doc.getRootElement();

List list = rootElement.attributes();

if(list != null && list.size() > 0) {

if(list != null && list.size() > 0 ) {

for (Attribute attr : list) {

xmlMap.put(attr.getName(), attr.getValue());

}

}

}

}

return xmlMap;

}

public static List> splitList(List list, int n) {

List> strList = new ArrayList<>();

if (list == null) return strList;

int size = list.size();

int quotient = size / n; // 商数

int remainder = size % n; // 余数

int offset; // 偏移量

int len = quotient > 0 ? n : remainder; // 循环长度

int start = 0; // 起始下标

int end; // 结束下标

List tempList;

for (int i = 0; i < len; i++) {

if (remainder != 0) {

remainder--;

offset = 1;

} else {

offset = 0;

}

end = start + quotient + offset;

tempList = list.subList(start, end);

start = end;

strList.add(tempList);

}

return strList;

}

}

Apache commons-net FTP 版本:

package com.my;

import org.apache.commons.net.ftp.FTPClient;

import org.dom4j.*;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.concurrent.*;

/**

* 使用apache-commons-net 包的ftp 连接工具

*

* 指定每个线程获得一个FtpClient 连接实例

*

*/

public class ApacheFtpUtils {

public static void main(String[] args) {

List list = history("201810170200","201810170300");

System.out.println(list.size());

}

public static FTPClient connectFTP(String hostName,int port,String username,String password) {

FTPClient ftp = new FTPClient();

boolean isLogin = false;

try {

ftp.connect(url,port);

isLogin = ftp.login(username,password);

} catch (IOException e) {

e.printStackTrace();

}

if(isLogin) return ftp;

else return null;

}

/**

* 根据父文件名,获得列出其下的文件的字节流

* 读取字节流的同时,根据 开始时间和结束时间判断是否是需要的文件,

* 将需要的文件放入list中

* @param ftpFile

* @param ftp

* @param fromDate

* @param toDate

* @return

*/

public static List getChildFileName(String ftpFile,FTPClient ftp,String fromDate,String toDate) {

long startTime = Long.valueOf(fromDate);

long endTime = Long.valueOf(toDate);

List list = new ArrayList<>();

String [] nameStr = null;

ftp.enterLocalPassiveMode();//解决 net 包因中文 年/月,无法匹配,而导致的文件查找为空问题

try {

nameStr = ftp.listNames(ftpFile);

} catch (IOException e) {

e.printStackTrace();

}

if(nameStr != null && nameStr.length >0 ) {

for (int i = 0; i < nameStr.length; i++) {

String line = nameStr[i];

long dateNumber = Long.valueOf(line.substring(line.lastIndexOf("/") + 1,line.length()).replace(".xml",""));

if(startTime <= dateNumber && dateNumber <= endTime ) {

list.add(line);

}

}

}

ftp.enterLocalActiveMode();

return list;

}

/**

* 下载ftp 文件,并隔行解析,放入list

* @param ftpFile

* @param ftp

* @return

*/

public static List> parseXmlFile(String ftpFile,FTPClient ftp) {

List> valList = new ArrayList<>();

InputStream is = null;

StringBuffer sbf = new StringBuffer();

try {

// 获取ftp上的文件

ftp.enterLocalPassiveMode();

is = ftp.retrieveFileStream(ftpFile);

if (is != null) {

BufferedReader in = new BufferedReader(new InputStreamReader(is, "UTF-8"));

while (true) {

String line = in.readLine();

if (line == null) {

break;

} else {

valList.add(parseXml(line));

}

}

in.close();

ftp.completePendingCommand();//完成等待方法 ,如果一个连接中要连续读写多个文件,需要强行关流,在执行completePendingCommand 方法

}

} catch ( IOException e) {

e.printStackTrace();

}

ftp.enterLocalActiveMode();

return valList;

}

private static String [] parentFile = {"文件名1","文件名2","文件名3", "文件名4"};

private static final String url = "ftp网址";

private static final int port = 23;

private static final String username = "用户名";

private static final String password = "密码";

/**

*

* @param fromDate

* @param toDate

*/

public static List history(String fromDate,String toDate) {

FTPClient ftp = ApacheFtpUtils.connectFTP(url, port, username, password);//获得ftp 连接

List list = new ArrayList<>();// 具体的List>> 集合

List nameLists = new ArrayList<>();// 文件名列集合

if(ftp != null ) {

for (int i = 0; i < parentFile.length; i++) {

String filePath = "/";

filePath = filePath + parentFile[i] +"/" + fromDate.substring(0,8);

List nameList = getChildFileName(filePath, ftp, fromDate.substring(0,12), toDate.substring(0,12));

//nameList 为0 判断

if(nameList != null && nameList.size() > 0) {

nameLists.add(nameList);

}

}

}

try {

ftp.logout();

} catch (IOException e) {

e.printStackTrace();

}

//将得到的符合查询要求的文件名集合,进行遍历,每个元素,循环处理,每个元素,分多个线程去下载解析

if(nameLists != null && nameLists.size() > 0) {

for (List nameList:nameLists) {

ApacheFtpUtils ftpUtils = new ApacheFtpUtils();

List>>> futureList = ftpUtils.threadSearch(nameList);

List> childList = new ArrayList<>();

list.add(childList);

for (Future>> future : futureList) {

try {

System.out.println(future.isDone() + "---" + future.toString());

List> reList = future.get();

System.out.println(future.toString() + "---单个线程所得的结果---" + reList.size());

if (reList != null) {

childList.addAll(reList);

}

System.out.println(future.isDone() + "---" + future.toString());

} catch (InterruptedException | ExecutionException e) {

e.printStackTrace();

}

}

}

}

return list;

}

/**

* 多线程分配

* @param list

* @return

*/

public List>>> threadSearch(List list) {

int len = 5;//定义分割多少份线程

if(list.size() < len){

len = list.size();

}

List> splitList = splitList(list, len);//分割一个线程执行多少个类型

TimerThread tt = null;

ExecutorService exec = Executors.newCachedThreadPool();//工头

ArrayList>>> results = new ArrayList<>();//结果通知

for (int i = 0; i < len; i++) {

TimerThread timerThread = new TimerThread(splitList.get(i),connectFTP(url,port,username,password));

results.add(exec.submit(timerThread));//submit返回一个Future,代表了即将要返回的结果

}

return results;

}

/**

* 多线程 具体类

*/

public class TimerThread implements

Callable>> {

public List li;

public FTPClient ftp;

public List> list;

public TimerThread(List li,FTPClient ftp){

this.li=li;

this.ftp = ftp;

this.list = new ArrayList<>();

}

@Override

public List> call() throws Exception {

if(null != li && li.size() > 0){

// for(String filePath : li){

for (int i = 0; i < li.size(); i++) {

System.out.println(Thread.currentThread().getName() + "--" + ftp.toString());

list.addAll(parseXmlFile(li.get(i),ftp));

}

}

//执行完毕,关闭连接

ftp.logout();

return this.list;

}

}

//====================================== 引入的其他类的方法 ============================================================

/**

* 解析xml 字符串

* @param str

* @return

*/

public static Map parseXml(String str) {

Map xmlMap = new HashMap<>();

Document doc = null;

try {

doc = DocumentHelper.parseText(str);

} catch (DocumentException e) {

e.printStackTrace();

}

if(doc != null) {

Element rootElement = doc.getRootElement();

List list = rootElement.attributes();

if(list != null && list.size() > 0) {

if(list != null && list.size() > 0 ) {

for (Attribute attr : list) {

xmlMap.put(attr.getName(), attr.getValue());

}

}

}

}

return xmlMap;

}

public static List> splitList(List list, int n) {

List> strList = new ArrayList<>();

if (list == null) return strList;

int size = list.size();

int quotient = size / n; // 商数

int remainder = size % n; // 余数

int offset; // 偏移量

int len = quotient > 0 ? n : remainder; // 循环长度

int start = 0; // 起始下标

int end; // 结束下标

List tempList;

for (int i = 0; i < len; i++) {

if (remainder != 0) {

remainder--;

offset = 1;

} else {

offset = 0;

}

end = start + quotient + offset;

tempList = list.subList(start, end);

start = end;

strList.add(tempList);

}

return strList;

}

}