作者 王勇

使用http-client创建MQ用户

... ... @@ -29,18 +29,26 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>http-client</artifactId>
<version>3.7.0.RELEASE</version>
</dependency>
<dependency>
... ... @@ -48,6 +56,7 @@
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!-- SpringBoot end -->
<!-- SpringCloud start -->
<dependency>
<groupId>org.springframework.cloud</groupId>
... ...
... ... @@ -15,8 +15,18 @@ import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
*/
@Configuration
public class ElasticSearchConfig extends AbstractElasticsearchConfiguration {
private String hostname;
private Integer port;
private String scheme;
@Override
@Bean
public RestHighLevelClient elasticsearchClient()
{
RestHighLevelClient client = new RestHighLevelClient(
... ... @@ -29,7 +39,8 @@ public class ElasticSearchConfig extends AbstractElasticsearchConfiguration {
}
@Bean
public ElasticsearchRestTemplate elasticsearchRestTemplate() {
public ElasticsearchRestTemplate elasticsearchRestTemplate()
{
return new ElasticsearchRestTemplate(elasticsearchClient());
}
}
... ...
... ... @@ -6,7 +6,10 @@ import com.sunyo.wlpt.message.bus.service.service.UserInfoService;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
... ... @@ -40,30 +43,12 @@ public class UserInfoController {
* @return {@link ResultJson}
*/
@PostMapping("/insert")
public ResultJson insertUserByEntity(UserInfo userInfo)
public ResultJson insertUserByEntity(@RequestBody UserInfo userInfo) throws IOException, URISyntaxException, TimeoutException
{
return userInfoService.insertSelective(userInfo);
}
/**
* @param description 描述
* @param username 用户名称
* @param password 密码
* @return {@link ResultJson}
*/
@PostMapping("/insertByParam")
public ResultJson insertUserByParam(@RequestParam(value = "description", required = false) String description,
String username, String password)
{
// 接收参数
UserInfo userInfo = UserInfo.builder().username(username)
.password(password)
.description(description)
.build();
return userInfoService.insertSelective(userInfo);
}
/**
* 编辑用户信息
*
* @param userInfo {@link UserInfo}
... ...
... ... @@ -3,7 +3,10 @@ package com.sunyo.wlpt.message.bus.service.service;
import com.sunyo.wlpt.message.bus.service.domain.UserInfo;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
... ... @@ -34,7 +37,7 @@ public interface UserInfoService {
* @param record the record
* @return insert count
*/
ResultJson insertSelective(UserInfo record);
ResultJson insertSelective(UserInfo record) throws IOException, URISyntaxException, TimeoutException;
/**
* 查询,根据主键
... ...
package com.sunyo.wlpt.message.bus.service.service.impl;
import com.rabbitmq.http.client.Client;
import com.rabbitmq.http.client.domain.TopicPermissions;
import com.rabbitmq.http.client.domain.UserPermissions;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.domain.UserInfo;
import com.sunyo.wlpt.message.bus.service.domain.VirtualHost;
import com.sunyo.wlpt.message.bus.service.mapper.UserInfoMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.BusQueueService;
import com.sunyo.wlpt.message.bus.service.service.BusServerService;
import com.sunyo.wlpt.message.bus.service.service.UserInfoService;
import com.sunyo.wlpt.message.bus.service.service.VirtualHostService;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import io.netty.util.internal.StringUtil;
import org.springframework.util.DigestUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.stereotype.Service;
import org.springframework.util.DigestUtils;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
... ... @@ -23,6 +40,36 @@ public class UserInfoServiceImpl implements UserInfoService {
@Resource
private UserInfoMapper userInfoMapper;
@Resource
private RabbitProperties rabbitProperties;
@Resource
private RabbitUtils rabbitUtils;
@Resource
private BusServerService busServerService;
@Resource
private VirtualHostService virtualHostService;
@Resource
private BusQueueService busQueueService;
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String rabbitUsername;
@Value("${spring.rabbitmq.password}")
private String rabbitPassword;
@Value("${spring.rabbitmq.virtual-host}")
private String vHost;
@Override
public int deleteByPrimaryKey(String id)
{
... ... @@ -36,21 +83,84 @@ public class UserInfoServiceImpl implements UserInfoService {
}
@Override
public ResultJson insertSelective(UserInfo userInfo)
public ResultJson insertSelective(UserInfo userInfo) throws IOException, URISyntaxException, TimeoutException
{
String password = userInfo.getPassword();
VirtualHost virtualHost = virtualHostService.selectByPrimaryKey(userInfo.getVirtualHostId());
userInfo.setVirtualHostName(virtualHost.getVirtualHostName());
BusServer busServer = busServerService.selectByPrimaryKey(userInfo.getServerId());
userInfo.setServerName(busServer.getServerName());
ResultJson validateResult = validateUser(userInfo);
if (!"200".equals(validateUser(userInfo).getCode())) {
return validateResult;
}
if (StringUtil.isNullOrEmpty(userInfo.getId())) {
userInfo.setId(IdUtils.generateId());
}
userInfo.setId(IdUtils.generateId());
userInfo.setPassword(DigestUtils.md5DigestAsHex(userInfo.getPassword().getBytes()));
return userInfoMapper.insertSelective(userInfo) > 0
int num = userInfoMapper.insertSelective(userInfo);
// 1.根据用户信息,在MQ创建用户
addMQUser(userInfo, password);
// 3.在对应虚拟机下创建队列
createQueue(userInfo);
return num > 0
? new ResultJson<>("200", "添加用户信息,成功")
: new ResultJson<>("500", "添加用户信息,失败");
}
/**
* 根据用户信息,添加队列
*
* @param userInfo
* @throws IOException
* @throws TimeoutException
*/
void createQueue(UserInfo userInfo) throws IOException, TimeoutException
{
BusQueue busQueue = BusQueue.builder()
.id(IdUtils.generateId())
.userId(userInfo.getId())
.username(userInfo.getUsername())
.queueName(userInfo.getUsername() + "_R")
.virtualHostId(userInfo.getVirtualHostId())
.durability(true)
.autoDelete(false)
.description(userInfo.getUsername() + ":专属队列")
.build();
busQueueService.insertSelective(busQueue);
}
/**
* 添加MQ用户信息,并配置MQ用户与虚拟主机的关系
*
* @param userInfo {@link UserInfo}
*/
public void addMQUser(UserInfo userInfo, String password) throws MalformedURLException, URISyntaxException
{
String username = userInfo.getUsername();
String virtualHostName = userInfo.getVirtualHostName();
String url = "http://" + host + ":15672/api";
Client client = new Client(url, rabbitUsername, rabbitPassword);
ArrayList<String> list = new ArrayList<>();
client.createUser(username, password.toCharArray(), list);
UserPermissions p = new UserPermissions();
p.setConfigure(username + ".*");
p.setRead(username + ".*");
p.setWrite(username + ".*");
client.updatePermissions(virtualHostName, username, p);
TopicPermissions topicPermissions = new TopicPermissions();
topicPermissions.setVhost(virtualHostName);
topicPermissions.setExchange("");
topicPermissions.setRead(".*");
topicPermissions.setWrite(".*");
client.updateTopicPermissions(virtualHostName, username, topicPermissions);
}
@Override
public UserInfo selectByPrimaryKey(String id)
... ... @@ -70,8 +180,6 @@ public class UserInfoServiceImpl implements UserInfoService {
if (StringUtil.isNullOrEmpty(userInfo.getId())) {
return new ResultJson<>("400", "该用户不存在");
}
return userInfoMapper.updateByPrimaryKeySelective(userInfo) > 0
? new ResultJson<>("200", "修改用户信息,成功")
: new ResultJson<>("500", "修改用户信息,失败");
... ... @@ -85,6 +193,9 @@ public class UserInfoServiceImpl implements UserInfoService {
if (StringUtil.isNullOrEmpty(userInfo.getUsername()) || StringUtil.isNullOrEmpty(userInfo.getPassword())) {
return new ResultJson<>("400", "用户名称和密码,不能为空");
}
if (StringUtil.isNullOrEmpty(userInfo.getServerName()) || StringUtil.isNullOrEmpty(userInfo.getVirtualHostName())) {
return new ResultJson<>("400", "服务器和虚拟主机,不能为空");
}
String userId = userInfo.getId();
if (!StringUtil.isNullOrEmpty(userId)) {
UserInfo oldUserInfo = userInfoMapper.selectByPrimaryKey(userId);
... ...