作者 朱兆平

用户心跳续期及在线监测功能

package com.tianbo.warehouse.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tianbo.warehouse.controller.response.ResultJson;
import com.tianbo.warehouse.dao.KakoUserMapper;
import com.tianbo.warehouse.model.KakoUser;
import com.tianbo.warehouse.util.RedisUtils;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Date;
@RestController()
@Slf4j
public class HeartBeatController {
@Autowired
private RedisUtils redisUtils;
@Resource
private KakoUserMapper kakoUserMapper;
//token头部标识类型,Bearer代表Bearer TOKEN
static final String AUTHORIZATION_HEADER = "Bearer ";
//检查token时效是否低于标准线
static final long TOKEN_TTL_CHECK_MIN= 36000L;
//重置token时效为标准线
static final long TOKEN_TTL_ADD= 86400L;
//心跳每次续费时长
static final long HEARTBEAT_TOKEN_TTL_ADD= 10L;
@ApiOperation(value = "用户心跳接口", notes = "心跳续期")
@PostMapping("/heartbeat")
public ResultJson heartbeat(HttpServletRequest request, HttpServletResponse response){
try {
//1. 获取客户端IP,因为有反向代理所以要从头部获取代理过来的头部IP
String clientIP =null;
clientIP = request.getRemoteAddr();
String header_forwarded = request.getHeader("x-forwarded-for");
if (StringUtils.isNotBlank(header_forwarded)) {
clientIP = request.getHeader("x-forwarded-for");
// 多次反向代理后会有多个ip值,第一个ip才是真实ip
if (clientIP.contains(",")) {
clientIP = clientIP.split(",")[0];
}
}
//2.获取token
String token = request.getHeader("Authorization");
/**
* key样式
* accessToken:token
*/
if (token!=null && !token.isEmpty() && token.startsWith(AUTHORIZATION_HEADER)){
token = token.substring(AUTHORIZATION_HEADER.length());
String accessToken = token;
String userDetailStr = redisUtils.get(accessToken);
//4. 更新用户心跳时间及在线状态IP等资料
if (StringUtils.isNotBlank(userDetailStr)){
JSONObject u = JSON.parseObject(userDetailStr);
String userId= u.getString("id");
String userInfo = u.getString("name");
String username = u.getString("username");
// userDetailStr = userDetailStr.replace("@","");
/**3.续期token过期时间
* 增加过期时间,考虑到程序及网络传输中间的时间损耗,
* 每10秒一个的心跳直接续费10秒的话,token的过期时间还是会随着时间逐步减少
*/
long tokenExpireTime= redisUtils.getExpire(accessToken);
if(tokenExpireTime < TOKEN_TTL_CHECK_MIN){
redisUtils.expire(accessToken, TOKEN_TTL_ADD);
redisUtils.expire(username, TOKEN_TTL_ADD);
}else{
redisUtils.expire(accessToken,tokenExpireTime+HEARTBEAT_TOKEN_TTL_ADD);
redisUtils.expire(username, tokenExpireTime+HEARTBEAT_TOKEN_TTL_ADD);
}
/**
* 多式联运用户表
*/
// Integer dsly_userId = u.getInteger("id");
// USER user = new USER();
// user.setId(dsly_userId);
// user.setLoginip(clientIP);
// user.setLogintime(new Date());
// user.setOnline(true);
// int ii = userMapper.updateByPrimaryKeySelective(user);
KakoUser kakoUser = new KakoUser();
kakoUser.setId(userId);
kakoUser.setLoginIp(clientIP);
kakoUser.setLoginDate(new Date());
kakoUser.setOnline(true);
int i = kakoUserMapper.updateByPrimaryKeySelective(kakoUser);
return i > 0 ? new ResultJson("200","心跳成功"): new ResultJson("400","心跳失败");
}
}
return new ResultJson("400","心跳失败");
}catch (Exception e){
log.error("[HEART-BEAT-ERROR]-",e);
return new ResultJson("400","心跳失败");
}
}
}
... ...
package com.tianbo.warehouse.heatbeat;
import com.tianbo.warehouse.dao.KakoUserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import com.tianbo.warehouse.model.KakoUser;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
@Component
@Slf4j
public class OfflineTheardJob implements Runnable {
private static OfflineTheardJob offlineTheardJob;
private KakoUser user;
//用户掉线判定时间差
static final long OFFLINE_= 60L;
@Resource
private KakoUserMapper userMapper;
OfflineTheardJob() {
}
OfflineTheardJob(KakoUser user) {
this.user = user;
}
@PostConstruct
public void init(){
offlineTheardJob = this;
}
@Override
public void run(){
Date userLoginTime = user.getLoginDate();
if(userLoginTime!=null){
long diff= Math.abs(System.currentTimeMillis() - userLoginTime.getTime());
long s = diff / 1000;
log.info("[HEAT-BEAT]-用户{}心跳-时间相差{}秒",user.getName(),s);
if (s > OFFLINE_){
setOffline();
}
}else {
setOffline();
}
}
private void setOffline(){
user.setOnline(false);
int i = offlineTheardJob.userMapper.updateByPrimaryKeySelective(user);
if (i>0){
log.info("用户id:{},用户名称:{},从IP:{}掉线",user.getId(),user.getName(),user.getLoginIp());
}
}
}
... ...
package com.tianbo.warehouse.heatbeat;
import com.tianbo.warehouse.dao.KakoUserMapper;
import com.tianbo.warehouse.model.KakoUser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 清理心跳超时的在线用户,判定为离线
* @author xyh
* @date
* 记得给用户ID,用户名称,用户心跳时间,用户登录ip,用户在线状态的数据库字段设置索引。
*/
@Slf4j
@Component
public class OfflineUserTask {
@Resource
private KakoUserMapper userMapper;
@Scheduled(fixedRate = 60000)
private void offlineUserHeartBeat(){
//初始化线程池
ThreadPoolExecutor threadPool = XMLThreadPoolFactory.instance();
List<KakoUser> userList = userMapper.selectOnlineUser();
if (userList!=null && !userList.isEmpty()){
log.trace("用户掉线判定开始,共需判定{}个在线标识用户",userList.size());
for (KakoUser user:userList) {
OfflineTheardJob offlineTheardJob = new OfflineTheardJob(user);
threadPool.execute(offlineTheardJob);
}
}
}
}
... ...
package com.tianbo.warehouse.heatbeat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadFactory;
public class XMLThreadFactory implements ThreadFactory {
private int counter;
private String name;
private List<String> stats;
public XMLThreadFactory(String name)
{
counter = 1;
this.name = name;
stats = new ArrayList<String>();
}
@Override
public Thread newThread(Runnable runnable)
{
Thread t = new Thread(runnable, name + "-Thread_" + counter);
counter++;
stats.add(String.format("Created thread %d with name %s on %s \n", t.getId(), t.getName(), new Date()));
return t;
}
public String getStats()
{
StringBuffer buffer = new StringBuffer();
Iterator<String> it = stats.iterator();
while (it.hasNext())
{
buffer.append(it.next());
}
return buffer.toString();
}
}
... ...
package com.tianbo.warehouse.heatbeat;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class XMLThreadPoolFactory {
private static ThreadPoolExecutor threadPool;
public static ThreadPoolExecutor instance(){
if (threadPool==null){
XMLThreadFactory xmlThreadFactory = new XMLThreadFactory("heartbeatTask");
threadPool = new ThreadPoolExecutor(12, 64,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024),
xmlThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
return threadPool;
}
}
... ...