ERROR:upbmsAppid:upbmswl:20161208-16:24:52:jifenhptest:89128126:className:com.hp.upbms.slp.td.transfer.FTPTransfer functionName:doBusiness describe:Exception in FTPTransfer thread when connect to: 10.248.12.2121aiupbms
java.net.SocketException: Broken pipe (errno:32)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:97)
at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
at org.apache.commons.net.telnet.TelnetClient._flushOutputStream(TelnetClient.java:77)
at org.apache.commons.net.telnet.TelnetOutputStream.flush(TelnetOutputStream.java:137)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:123)
at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:278)
at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:122)
at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:212)
at java.io.BufferedWriter.flush(BufferedWriter.java:236)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:442)
at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:520)
at org.apache.commons.net.ftp.FTP.cwd(FTP.java:745)
at org.apache.commons.net.ftp.FTPClient.changeWorkingDirectory(FTPClient.java:725)
at com.hp.upbms.slp.td.transfer.FTPTransfer.doBusiness(FTPTransfer.java:111)
at com.hp.upbms.slp.td.transfer.FTPTransfer.run(FTPTransfer.java:55)
这是源码:
package com.hp.upbms.slp.td.transfer;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import lombok.Setter;
import lombok.extern.log4j.Log4j;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.springframework.beans.factory.InitializingBean;
import com.hp.upbms.slp.td.TransDownUtil;
import com.hp.upbms.slp.td.model.FTPConfig;
import com.hp.upbms.slp.td.model.SyncFile;
import com.hp.upbms.slp.td.transfer.dao.FtpTransferDaoImpl;
/**
-
FTPTransfer
is used transfer file to cooperator's FTP server. - A
FTPTransfer
thread dealing with a business according to -
businessConfig
. -
@author JetHan
*/
@Log4j
public class FTPTransfer extends Thread implements InitializingBean {
@Setter private DataSource dataSource;
@Setter private String ftpType = "00";
@Setter private boolean passwordSecurity = false;private static boolean isRunning = true;
private final static long ThreadSleepTime = 120000L;private FtpTransferDaoImpl ftpDao = new FtpTransferDaoImpl();
private static List provinceList = new ArrayList();
private Map ftpClients = new HashMap(0);public void afterPropertiesSet() throws Exception {
ftpDao.setDataSource(dataSource);
provinceList = ftpDao.getProvinceList();
start();
}@Override public void run() {
log.info("The FTPTransfer Thread is Started.");
while (isRunning) {
try {
doBusiness();
try {
Thread.sleep(ThreadSleepTime);
} catch (Exception ex) {
log.error("SLEEP(LONG MILLIS)", ex);
}
} catch (Throwable e) {
log.error("The FTPTransfer Thread Throwable:" + e.toString(), e);
}
}
log.info("The FTPTransfer Thread is End.");
}private void doBusiness() throws Throwable {
try {
List sfs = ftpDao.getNotSyncFile(ftpType);
log.info("The Ftp Transfer get data num is "+ sfs.size() + ".");
for (SyncFile sf : sfs) {
String partnerId = sf.getPartnerId();
if (provinceList.contains(partnerId)) {// 省公司
partnerId = "000";
}
FTPConfig fc = ftpDao.getFTPConfig(partnerId, ftpType);if (fc == null || !"00".equals(fc.getFtpStatus())) {// 没有FTP配置及配置信息不可用 ftpDao.updateSyncFileStatus(sf.getFileId(), "03", sf.getSyncTimes() + 1); continue; } String key = fc.getFtpIp() + fc.getFtpPort() + fc.getFtpUsername(); try { FTPClient ftpClient = ftpClients.get(key); if (ftpClient == null) { ftpClient = new FTPClient(); ftpClients.put(key, ftpClient); } if (!ftpClient.isConnected()) { ftpClient.connect(fc.getFtpIp(), Integer.parseInt(fc.getFtpPort())); if(passwordSecurity) { ftpClient.login(fc.getFtpUsername(), TransDownUtil.getDecoder(fc.getFtpPassword())); } else { ftpClient.login(fc.getFtpUsername(), fc.getFtpPassword()); } ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE); int reply = ftpClient.getReplyCode(); // If connect failed, try other files. if (!FTPReply.isPositiveCompletion(reply)) { log.error("THE SERVER REPLY " + reply + " ,WHEN CONNECT TO " + key); ftpDao.updateSyncFileStatus(sf.getFileId(), "02", sf.getSyncTimes() + 1); disconnect(ftpClient); continue; } } ftpClient.changeWorkingDirectory(fc.getFilePath()); log.info("The File :" + sf.getFileName() + " transfer start."); FileInputStream in = new FileInputStream(new String(sf.getFilePath().getBytes("GBK"), "ISO-8859-1")); String fileName = new String(sf.getFileName().getBytes("GBK"), "ISO-8859-1"); long startTime = System.currentTimeMillis(); boolean transferResult = ftpClient.storeFile(fileName, in); long stopTime = System.currentTimeMillis(); log.info("The File :" + sf.getFileName() + " transfer finish.Time = " + (stopTime - startTime) + " ms"); if (transferResult) { ftpDao.updateSyncFileStatus(sf.getFileId(), "01", sf.getSyncTimes() + 1); log.info(sf.getFilePath() + " has been transfered to " + key + " successfully"); } else { ftpDao.updateSyncFileStatus(sf.getFileId(), "02", sf.getSyncTimes() + 1); log.info(sf.getFilePath() + " has been transfered to " + key + " failed "); } closeFileStream(in, sf); } catch (Exception ex) { ftpDao.updateSyncFileStatus(sf.getFileId(), "02", sf.getSyncTimes() + 1); log.error("Exception in FTPTransfer thread when connect to: " + key, ex); throw ex; } finally {} } try { Iterator<FTPClient> ite = ftpClients.values().iterator(); while (ite.hasNext()) { FTPClient ftpClient = ite.next(); disconnect(ftpClient); ite.remove(); } ftpClients.clear(); } catch (Exception e) { log.error("Exception in FTPTransfer thread, clear FtpClient info:", e); throw e; } } catch (Throwable ex) { throw ex; }
}
/**
- Close connection of
client
. - @param client */ private void disconnect(FTPClient client) { try { if (client != null) { client.logout(); client.disconnect(); } } catch (Throwable e) { log.error("Exception in FTPTransfer thread when disconnect ", e); } }
/**
- Close file input stream
- @param in
- @param sf */ private void closeFileStream(FileInputStream in, SyncFile sf) { try { if (in != null) { in.close(); } } catch (Exception e) { log.error("Exception in FTPTransfer when close file: " + sf.getFilePath() + File.separator + sf.getFileName(), e); } } }
- Close connection of