作者 王勇

提交,做备份

... ... @@ -32,6 +32,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 引入websocket依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- SpringBoot end -->
<!-- Security start -->
... ... @@ -95,6 +101,12 @@
<!-- tools start -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.0.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
... ...
package com.sunyo.wlpt.cgonms.provide.controller;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.sunyo.wlpt.cgonms.provide.domain.*;
... ... @@ -7,14 +8,19 @@ import com.sunyo.wlpt.cgonms.provide.feign.GetCgoAsmFeign;
import com.sunyo.wlpt.cgonms.provide.feign.GetDataWareHouseFeign;
import com.sunyo.wlpt.cgonms.provide.feign.GetTransportFeign;
import com.sunyo.wlpt.cgonms.provide.response.ResultJson;
import com.sunyo.wlpt.cgonms.provide.response.ResultWs;
import com.sunyo.wlpt.cgonms.provide.service.*;
import com.sunyo.wlpt.cgonms.provide.thread.ExitThreadPoolFactory;
import com.sunyo.wlpt.cgonms.provide.websocket.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.propertyeditors.CustomDateEditor;
import org.springframework.web.bind.WebDataBinder;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
... ... @@ -55,6 +61,7 @@ public class NmsController {
@Resource
GetDataWareHouseFeign getDataWareHouseFeign;
/**
* 线程数量
*/
... ... @@ -81,10 +88,18 @@ public class NmsController {
@GetMapping("/getInfo")
public ResultJson getData(
@RequestParam(value = "flightDate", required = false) Date flightDate,
@RequestParam(value = "flightNo", required = false) String flightNo) {
@RequestParam(value = "flightNo", required = false) String flightNo,
HttpServletRequest request) {
/**
* 1.取token作爲websocket的sid
* 2.每個可以返回給前端的消息通過websocket發送回去
*/
String sid = request.getHeader("Authorization");
log.info("token的值:"+sid);
final String startTime = sdf.format(new Date());
System.out.println("开始时间:" + startTime);
sendMsgByWebsocket("开始时间:" + startTime,sid);
System.out.println("出港数据,开始获取");
ResultJson resultJson = new ResultJson();
... ... @@ -110,6 +125,9 @@ public class NmsController {
for (i = 0; i < THREAD_ACCOUNT; i++) {
ResultExitData result = threadJob(resultList.get(i + index), latch, threadPool);
resultList.set(i, result);
int temp=i+index;
String resultJs=JSON.toJSONString(new ResultWs(sid,"获取数据,第"+temp+"条",result));
sendMsgByWebsocket(resultJs,sid);
}
index = index + i;
//完成一次,就等待。每次减1,为0的时候往下执行
... ... @@ -122,7 +140,9 @@ public class NmsController {
int i;
for (i = 0; i < resultList.size() - index; i++) {
ResultExitData result = threadJob(resultList.get(i + index), latch, threadPool);
resultList.set(i, result);
int temp=i+index;
String resultJs=JSON.toJSONString(new ResultWs(sid,"获取数据,第"+temp+"条",result));
sendMsgByWebsocket(resultJs,sid);
}
index = index + i;
//完成一次,就等待。每次减1,为0的时候往下执行
... ... @@ -137,6 +157,8 @@ public class NmsController {
for (int i = 0; i < resultList.size(); i++) {
ResultExitData result = threadJob(resultList.get(i), latch, threadPool);
resultList.set(i, result);
String resultJs=JSON.toJSONString(new ResultWs(sid,"获取数据,第"+i+"条",result));
sendMsgByWebsocket(resultJs,sid);
}
//完成一次,就等待。等所有的全部完成,再一起返回
latch.await();
... ... @@ -146,10 +168,14 @@ public class NmsController {
log.error("获取目录文件出错", e);
}
final String endTime = sdf.format(new Date());
System.out.println("结束时间:" + endTime);
System.out.println("出港信息,获取完毕");
System.out.println("index的值为:" + index);
final String endTime = sdf.format(new Date());
System.out.println("结束时间:" + endTime);
sendMsgByWebsocket("出港信息,获取完毕,结束时间:" + endTime,sid);
resultJson.setCode("200");
resultJson.setData(resultList);
return resultJson;
... ... @@ -278,4 +304,13 @@ public class NmsController {
dateFormat.setLenient(false);
binder.registerCustomEditor(Date.class, new CustomDateEditor(dateFormat, true));
}
private void sendMsgByWebsocket(String msg,String sid){
try {
WebSocketServer.sendInfo(msg,sid);
} catch (IOException e) {
e.printStackTrace();
}
}
}
... ...
package com.sunyo.wlpt.cgonms.provide.response;
import lombok.Data;
import java.io.Serializable;
/**
* @author 子诚
* Description:
* 时间:2020/5/28 16:59
*/
@Data
public class ResultWs<T> implements Serializable {
private static final long serialVersionUID = -1412519603409652504L;
String sid;
String message;
T data;
String status;
public ResultWs() {
}
public ResultWs(String sid, T data) {
this.sid = sid;
this.data = data;
}
public ResultWs(String sid, String message, T data) {
this.sid = sid;
this.message = message;
this.data = data;
}
public ResultWs(String sid, T data, String status) {
this.sid = sid;
this.data = data;
this.status = status;
}
public ResultWs(String sid, String message, T data, String status) {
this.sid = sid;
this.message = message;
this.data = data;
this.status = status;
}
}
... ...
package com.sunyo.wlpt.cgonms.provide.socket;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author 子诚
* Description:WebSocket服务端
* 时间:2020/5/28 11:24
*/
//@Component
@Slf4j
//@ServerEndpoint(value = "/websocket")
public class WebSocketServer {
/**
* 在线数量
*/
private static final AtomicInteger OnlineCount = new AtomicInteger(0);
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
*/
private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<>();
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) throws IOException {
SessionSet.add(session);
// 在线数量,加1
int cnt = OnlineCount.incrementAndGet();
log.info("有连接加入,当前连接数为:{}", cnt);
SendMessage(session, "服务端回消息:连接成功");
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
SessionSet.remove(session);
// 在线数量,减1
int cnt = OnlineCount.decrementAndGet();
log.info("有连接关闭,当前连接数为:{}", cnt);
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws IOException {
log.info("来自客户端的消息:{}", message);
// JSONObject jsonObject = JSONObject.parseObject(message);
// Object flightNo = jsonObject.get("flightNo");
// Object flightDate = jsonObject.get("flightDate");
// log.info("flight: {},flightDate: {}", flightNo,flightDate);
SendMessage(session, "服务端收到消息,消息内容:" + message);
}
/**
* 出现错误
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误:{},Session ID: {}", error.getMessage(), session.getId());
}
/**
* 发送消息,实践表明,每次浏览器刷新,session会发生变化。
*
* @param session session
* @param message 消息
*/
private static void SendMessage(Session session, String message) throws IOException {
session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)", message, session.getId()));
}
/**
* 群发消息
*
* @param message 消息
*/
public static void BroadCastInfo(String message) throws IOException {
for (Session session : SessionSet) {
if (session.isOpen()) {
SendMessage(session, message);
}
}
}
/**
* 指定Session发送消息
*
* @param sessionId sessionId
* @param message 消息
*/
public static void SendMessage(String sessionId, String message) throws IOException {
Session session = null;
for (Session s : SessionSet) {
if (s.getId().equals(sessionId)) {
session = s;
break;
}
}
if (session != null) {
SendMessage(session, message);
} else {
log.warn("没有找到你指定ID的会话:{}", sessionId);
}
}
}
... ...
package com.sunyo.wlpt.cgonms.provide.websocket;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import javax.websocket.server.ServerEndpointConfig;
/**
*
* @author XYH
* @date 2019/12/24
*/
public class MySpringConfigurator extends ServerEndpointConfig.Configurator implements ApplicationContextAware {
private static volatile BeanFactory context;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
MySpringConfigurator.context = applicationContext;
}
@Override
public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
return context.getBean(clazz);
}
}
... ...
package com.sunyo.wlpt.cgonms.provide.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
*
* @author XYH
* @date 2019/12/23
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Bean
public MySpringConfigurator mySpringConfigurator() {
return new MySpringConfigurator();
}
}
... ...
package com.sunyo.wlpt.cgonms.provide.websocket;
import com.sunyo.wlpt.cgonms.provide.response.ResultJson;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
/**
*
* @author XYH
* @date 2019/12/24
*/
@RestController
@RequestMapping("/checkcenter")
public class WebSocketController {
//页面请求
@GetMapping("/socket/{cid}")
public ResultJson socket(@PathVariable String cid) {
return new ResultJson("200","success",cid);
}
//推送数据接口
@ResponseBody
@RequestMapping("/socket/push/{cid}/{message}")
public ResultJson pushToWeb(@PathVariable String cid,@PathVariable String message) {
try {
WebSocketServer.sendInfo(message,cid);
} catch (IOException e) {
e.printStackTrace();
return new ResultJson("200","success",cid+"#"+e.getMessage());
}
return new ResultJson("200","success",cid);
}
}
... ...
package com.sunyo.wlpt.cgonms.provide.websocket;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.sunyo.wlpt.cgonms.provide.controller.NmsController;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by XYH on 2019/12/23.
*/
@ServerEndpoint(value = "/websocket/{sid}", configurator = MySpringConfigurator.class)
@Component
public class WebSocketServer {
static Log log = LogFactory.get(WebSocketServer.class);
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//接收sid
private String sid = "";
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
log.info("有新窗口开始监听:" + sid + ",当前在线人数为" + getOnlineCount());
this.sid = sid;
try {
String openMessage= "{\n" +
" \"sid\":\""+sid+"\",\n" +
" \"session\":\""+session+"\",\n" +
" \"message\": \"连接成功\",\n" +
" \"data\":{\"flightdate\":\"2015-02-05\",\"flightNo\":\"CV987\"}\n" +
"}";
sendMessage(openMessage);
} catch (IOException e) {
log.error("websocket IO异常");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息,以json形式传递接收,
* 如:{
* "sid":"aaa",
* "session":"",
* "message": "",
* "data":{"flightdate":"2015-02-05","flightNo":"CV987"}
*
*
* }
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口" + sid + "的信息:" + message);
//群发消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群发自定义消息
*/
public static void sendInfo(String message, @PathParam("sid") String sid) throws IOException {
//log.info("推送消息到窗口"+sid+",推送内容:"+message);
for (WebSocketServer item : webSocketSet) {
try {
//这里可以设定只推送给这个sid的,为null则全部推送
if (sid == null) {
item.sendMessage(message);
} else if (item.sid.equals(sid)) {
item.sendMessage(message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
... ...
... ... @@ -34,10 +34,10 @@ mybatis:
type-aliases-package: com.sunyo.wlpt.cgonms.provide.domain
# \u65E5\u5FD7\u6253\u5370
#logging:
logging:
# config: config/logback-dev.xml
# level:
# com.sunyo.wlpt.cgonms.provide.mapper: debug
level:
com.sunyo.wlpt.cgonms.provide.mapper: debug
#logback:
# appname: cgonms-provide
# logdir: ./log
... ...