0
点赞
收藏
分享

微信扫一扫

QT的TCP通讯

是波波呀 01-20 12:00 阅读 5

目录

引言

一、docker中安装Elasticsearch

1、创建es专有的网络

2、开放端口

3、在es-net网络上安装es和kibana

4、可能出现的问题

5、测试

6、安装IK分词器

7、测试IK分词器

二、结合业务实战

1、准备依赖

2、配置yml

3、读取yml配置

4、准备es配置类

5、编写测试代码

6、使用mq异步修改es的表数据

7、实现搜索功能

三、简单介绍Elasticsearch

1、表结构与Mysql的对比

2、Mapping映射属性

3、索引库的CRUD

创建索引库和映射( * ):

查询索引库:

修改索引库:

删除索引库:

4、文档操作的CRUD

新增文档:

查询文档:

删除文档:

修改文档:

批处理:

四、RestAPI

1、初始化RestClient

(1)引入es的RestHighLevelClient依赖

(2)初始化RestHighLevelClient

2、在kibana的客户端准备创建索引库

3、Java客户端创建索引库

五、RestClient操作文档(重在方法理解)

1、准备实体类

2、Java实现CRUD(重点)

(1)增:

(2)删:

(3)改:

(4)查:

注意:

3、批量导入文档:

六、JavaRestClient查询

基本步骤(重点)

1、叶子查询

2、复合查询

3、排序和分页

4、高亮

七、数据聚合


引言

  • Mysql:擅长事务类型操作,可以确保数据的安全和一致性

  • Elasticsearch:擅长海量数据的搜索、分析、计算

一、docker中安装Elasticsearch

先说命令,后面再说可能会出现的问题。

1、创建es专有的网络

因为测试需要部署kibana容器作为一个图形化界面,创建一个网络方便让es和kibana容器互联。

docker network create es-net
2、开放端口

宝塔:

腾讯云:

5601
9200
9300
3、在es-net网络上安装es和kibana

这里我安装7.12.1版本的es和kibana,因为之前学习有现有的镜像包安装更快

分别执行这两条指令:

docker run -d 

–name es
-e “ES_JAVA_OPTS=-Xms512m -Xmx512m”
-e “discovery.type=single-node”
-v es-data:/usr/share/elasticsearch/data
-v es-plugins:/usr/share/elasticsearch/plugins
–privileged
–network es-net
-p 9200:9200
-p 9300:9300
elasticsearch:7.12.1

docker run -d 

–name kibana
-e ELASTICSEARCH_HOSTS=http://es:9200
–network=es-net
-p 5601:5601
kibana:7.12.1

4、可能出现的问题

这里我是在宝塔上部署的,由于我之前创建容器的时候没有开启防火墙的端口,应该先去开启防火墙再去安装docker容器,我这些流程出现混淆,导致出现下面这些类似的报错:

设置失败!500 Server Error for http+docker://localhost/v1.45/containers/1e013

Error response from daemon: Failed to Setup IP tables:
Unable to enable SKIP DNAT rule:
(iptables failed: iptables --wait -t nat -I DOCKER -i br-b649822bbcff -j RETURN: iptables:
No chain/target/match by that name. (exit status 1))

解决办法是先去开放端口然后重启docker服务再去安装es和kibana

重启docker:

systemctl restart docker

然后再去重新安装就行

可以参考:【DockerCE】运行Docker的服务器报“Failed to Setup IP tables“的解决方法_error response from daemon: failed to setup ip tab-CSDN博客

5、测试

es:

服务器ip:9200

kibana:

服务器ip:5601

选择Explore on my own之后,进入主页面:

测试安装成功!

查看docker:

或者使用指令:

docker ps

记住kibana是用于你开发的时候测试使用,比较方便的图形化界面,实际开发也只是用es。

6、安装IK分词器
docker exec -it es ./bin/elasticsearch-plugin  install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip

重启es容器使其生效:

docker restart es
7、测试IK分词器

进入Dev tools:

先测试Elasticsearch官方提供的标准分词器:

POST /_analyze
{
"analyzer": "standard",
"text": "在CSDN学习java太棒了"
}

测试IK分词器:

POST /_analyze
{
"analyzer": "ik_smart",
"text": "在CSDN学习java太棒了"
}

测试成功,安装分词器成功!

二、结合业务实战

一般使用easy-es比较多,下面是基于原生es的操作,如果想看easy-es可以参考我照片博客:

SpringBoot中easy-es入门实战(结合官方文档版)-CSDN博客文章浏览阅读386次,点赞10次,收藏15次。本文主要是参考官方文档进行编写,记录一下自己一些比较常使用easy-es使用方法和内容,其实他的使用和MybatisPlus差不多的,之前我还写了一些关于es的博客可以参考一下:Springboot中使用Elasticsearch(部署+使用+讲解 最完整)_spring boot elasticsearch-CSDN博客最完整最详细的springboot中使用es,在前面有服务器部署es相关的东西,在后面有使用java的实战,对于实战的方法使用结合官网深度去研究和讲解。[这里是图片009]https://blog.csdn.net/qq_73440769/article/details/144790200?spm=1001.2014.3001.5501

原理:当mysql数据发生改变时发送消息到mq,es服务接收消息,进行更新

es操作步骤:

1.创建Request
2.准备请求参数
3.聚合参数
4.发送请求
5.解析聚合结果
5.1.获取聚合
5.2.获取聚合中的桶
5.3.遍历桶内数据
1、准备依赖
    <properties>
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>

<dependencyManagement>
<dependencies>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

</dependencies>
</dependencyManagement>
2、配置yml
quick:
elasticsearch:
host: ${quick.elasticsearch.host} # 服务器IP地址
port: ${quick.elasticsearch.port} # 服务器端口号
3、读取yml配置

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/*
* @读取yml配置
*/

@Component
@Data
@ConfigurationProperties(prefix = "quick.elasticsearch")
public class ElasticSearchProperties {

// es地址
private String host;
// es端口
private int port;

}
4、准备es配置类

import com.quick.properties.ElasticSearchProperties;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* es配置类
*/

@Configuration
public class ElasticSearchConfig {

@Bean(destroyMethod = "close") //程序开始时交给bean对象注入, 指定了当bean被销毁时应该调用其close方法
@ConditionalOnMissingBean//保证spring容器里面只有一个utils对象(当没有这个bean对象再去创建,有就没必要去创建了)
public RestHighLevelClient client(ElasticSearchProperties elasticSearchProperties){
return new RestHighLevelClient(RestClient.builder(
new HttpHost(
elasticSearchProperties.getHost(),
elasticSearchProperties.getPort(),
"http"
)
));
}

}
5、编写测试代码

UserDoc:

/*
User索引库实体类
*/


@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserDoc {

// 这里设计为String类型,因为在发送消息的时候是以字符的形式
@Schema(description = "用户ID")
private String id;

@Schema(description = "用户编号")
private String quickUserId;

@Schema(description = "姓名")
private String name;

@Schema(description = "手机号")
private String phone;

@Schema(description = "关注数")
private Long follow;

@Schema(description = "粉丝数")
private Long fan;

@Schema(description = "性别 0 女 1 男")
private String sex;

@Schema(description = "头像")
private String avatar;

@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Schema(description = "注册时间")
private LocalDateTime createTime;

@Schema(description = "用龄,单位:年")
private Long useTime;

@Schema(description = "收藏数")
private Long collectNumber;

@Schema(description = "评分数")
private Long markNumber;

@Schema(description = "个人简介")
private String briefIntroduction;

}

UserDocHandleResponseVO:

/**
* 用户文档处理响应
*/

@Data
@Builder
public class UserDocHandleResponseVO {

List<UserDoc>userDocList;
Long total;

}

controller:

@RestController
@RequestMapping("/user/es-user")
@Tag(name="C端-用户es相关接口")
@Slf4j
public class EsUserController {

@Resource
private UserService userService;

@Operation(summary = "es查询所有用户")
@GetMapping("/query-all-user")
public Result<UserDocHandleResponseVO> queryAllUser() throws IOException {
return Result.success(userService.queryAllUser());
}

}

service:

public interface UserService extends IService<User> {
UserDocHandleResponseVO queryAllUser() throws IOException;
}

impl:

@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper,User> implements UserService {

@Resource
private RestHighLevelClient restHighLevelClient;


@Override
public UserDocHandleResponseVO queryAllUser() throws IOException {
// 1.创建Request
SearchRequest request = new SearchRequest("user");
// 2.组织请求参数
request.source().query(QueryBuilders.matchAllQuery());
// 3.发送请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 4.解析响应
return handleResponse(response);
}

private static UserDocHandleResponseVO handleResponse(SearchResponse response) {
SearchHits searchHits = response.getHits();
// 4.1 获取总条数
long total = 0L;
if (searchHits.getTotalHits() != null) {
total = searchHits.getTotalHits().value;
}
// 4.2 获取命中的数据
SearchHit[] hits = searchHits.getHits();
List<UserDoc> userDocList=new ArrayList<>();
for (SearchHit hit : hits) {
// 4.2.1 获取source结果(结果是一个json对象)
String json = hit.getSourceAsString();
// 4.2.2 转为实体对象
UserDoc userDoc = JSONUtil.toBean(json, UserDoc.class);
userDocList.add(userDoc);

}
System.out.println("userDocList = " + userDocList);
System.out.println("total = " + total);
return UserDocHandleResponseVO.builder()
.userDocList(userDocList)
.total(total)
.build();
}

}

测试:

测试成功!!!

这里我将解析es的代码封装成一个工具类的方法

import com.quick.es.GenericSearchResponseVO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;

import com.alibaba.fastjson.JSON;

import java.util.ArrayList;
import java.util.List;

/**
* es处理搜索响应的工具类
*/

public class SearchResponseUtil {

/**
* 处理ES搜索响应
*
* @param response ES搜索响应对象
* @param clazz 目标文档对象的类类型
* @return 封装后的搜索响应对象
* @param <T> 泛型,表示文档的类型,用于封装返回对应类型文档的返回结果
*/

public static <T> GenericSearchResponseVO<T> handleResponse(SearchResponse response, Class<T> clazz) {
// 获取搜索命中的结果
SearchHits searchHits = response.getHits();
// 初始化总命中数为0
long total = 0L;
// 如果总命中数不为空,则赋值
if (searchHits.getTotalHits() != null) {
total = searchHits.getTotalHits().value;
}

// 初始化文档列表
List<T> docList = new ArrayList<>();
// 获取所有命中的文档
SearchHit[] hits = searchHits.getHits();
// 遍历所有命中的文档
for (SearchHit hit : hits) {
// 获取文档的JSON字符串
String json = hit.getSourceAsString();
// 将JSON字符串解析为目标类型的对象
//T doc = JSON.parseObject(json, clazz); 使用这个的话如果反序列化会报错
T doc = JSONUtil.toBean(json, clazz);
// 将解析后的文档对象添加到列表中
docList.add(doc);
}

// 构建并返回封装后的搜索响应对象
return GenericSearchResponseVO.<T>builder()
.total(total) // 设置总命中数
.docList(docList) // 设置文档列表
.build();
}
}

T doc = JSON.parseObject(json, clazz); 如果工具类用这个解析json的话反序列化会报错,具体怎么解决欢迎在评论区说一下。

将返回的对象封装成一个目标返回对象

@Data
@Builder
public class GenericSearchResponseVO<T> {
private Long total;
private List<T> docList;
}

修改impl的代码

    @Override
public GenericSearchResponseVO<UserDoc> queryAllUser() throws IOException {
// 1.创建Request
SearchRequest request = new SearchRequest("user");
// 2.组织请求参数
request.source().query(QueryBuilders.matchAllQuery());
// 3.发送请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 4.解析响应
/*return handleResponse(response);*/
return SearchResponseUtil.handleResponse(response, UserDoc.class);
}
6、使用mq异步修改es的表数据

service:

public interface EsUserDocService {

GenericSearchResponseVO<UserDoc> queryAllUserDoc() throws IOException;

// 修改UserDoc
void updateUserDocByOne(UserDoc userDoc) throws IOException;

}

impl:

import cn.hutool.json.JSONUtil;
import com.quick.vo.GenericSearchResponseVO;
import com.quick.entity.UserDoc;
import com.quick.service.EsUserDocService;
import com.quick.utils.ElasticsearchUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Slf4j
@Service
public class EsUserDocServiceImpl implements EsUserDocService {

@Resource
private RestHighLevelClient restHighLevelClient;


@Override
public GenericSearchResponseVO<UserDoc> queryAllUserDoc() throws IOException {
// 页码
int pageNumber = 2;
// 每页数量
int pageSize = 10;
// 计算起始位置
int from = ElasticsearchUtil.calculateFrom(pageNumber,pageSize);

// 1.创建Request
SearchRequest request = new SearchRequest("user");
// 2.组织请求参数
request.source()
.query(QueryBuilders.matchAllQuery())
.from(from)
.size(pageSize);
// 3.发送请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 4.解析响应
/*return handleResponse(response);*/
return ElasticsearchUtil.handleResponse(response, UserDoc.class,pageNumber);
}

@Override
public void updateUserDocByOne(UserDoc userDoc) throws IOException {
// 1.准备Request
UpdateRequest request = new UpdateRequest("user",userDoc.getId() );
// 2.准备请求参数
// 将UserDoc转json
String doc = JSONUtil.toJsonStr(userDoc);
// 准备Json文档,XContentType.JSON表示json格式
request.doc(doc, XContentType.JSON);
// 3.发送请求
restHighLevelClient.update(request, RequestOptions.DEFAULT);
log.info("更新用户在es中数据成功,修改后文档为:{}",doc);

}

}

编写mq监听:

import com.quick.entity.UserDoc;
import com.quick.service.EsUserDocService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* Es中UserDoc相关 接收消息
*/

@Component
@RequiredArgsConstructor
public class EsUserDocListener {

public static final String UPDATE_USER_DOC_QUEUE_NAME = "userDoc.updateUserDocByOne.queue";
public static final String UPDATE_USER_DOC_EXCHANGE_NAME = "updateUserDocByOne.direct";
public static final String UPDATE_USER_DOC_ROUTING_KEY = "updateUserDocByOne.success";


private final EsUserDocService esUserDocService;

@RabbitListener(bindings = @QueueBinding(
value=@Queue(name = UPDATE_USER_DOC_QUEUE_NAME,durable = "true"),
exchange = @Exchange(name = UPDATE_USER_DOC_EXCHANGE_NAME),
key = UPDATE_USER_DOC_ROUTING_KEY
),
// 在@RabbitListener注解中指定容器工厂
containerFactory = "customContainerFactory")

public void listenUpdateUserDoc(UserDoc userDoc) throws IOException {
esUserDocService.updateUserDocByOne(userDoc);
}

}

编写实现修改操作的发送消息端:

    @Override
public void update(UserDTO userDTO) {
User user=userMapper.selectById(userDTO.getUserId());
BeanUtils.copyProperties(userDTO,user);
userMapper.updateById(user);

UserDoc userDoc = BeanUtil.copyProperties(user, UserDoc.class);
//发送mq异步消息修改
try {
rabbitTemplate.convertAndSend(
EsUserDocListener.UPDATE_USER_DOC_EXCHANGE_NAME, // 交换机名称
EsUserDocListener.UPDATE_USER_DOC_ROUTING_KEY, // 路由键
userDoc // 消息内容
);
} catch (AmqpException e) {
log.error("发送消息失败", e);
}

}

测试:

7、实现搜索功能

controller:

    @Operation(summary = "搜索功能")
@GetMapping("/search")
public Result<GenericSearchResponseVO<UserDoc>> search(
@RequestParam(required = false) String searchKeyword,
@RequestParam(required = false) Integer pageNumber,
@RequestParam(required = false) Integer pageSize
) throws IOException {
return Result.success(esUserDocService.search(searchKeyword,pageNumber,pageSize));
}

service:

GenericSearchResponseVO<UserDoc> search(String searchKeyword,Integer pageNumber,Integer pageSize)throws IOException;

impl:

    @Override
public GenericSearchResponseVO<UserDoc> search(String searchKeyword,Integer pageNumber,Integer pageSize) throws IOException{

// 如果不传就是默认
if (pageNumber == null) {
// 页码
pageNumber = 1;
}
if (pageSize == null) {
// 每页数量
pageSize = 10;
}
// 计算起始位置
int from = ElasticsearchUtil.calculateFrom(pageNumber,pageSize);
// 1.创建Request
SearchRequest request=new SearchRequest("user");

// 2.组织请求参数
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
if (searchKeyword != null && !searchKeyword.isEmpty()) {
boolQueryBuilder.must(QueryBuilders.multiMatchQuery(searchKeyword, "name", "briefIntroduction", "phone","quickUserId"));
}
request.source()
.query(boolQueryBuilder) // 查询条件
.from(from)
.size(pageSize)
.sort("fan", SortOrder.DESC);
// 3.发送请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 4.解析响应
return ElasticsearchUtil.handleResponse(response, UserDoc.class);

}

测试:

quickUserId精确查询:

默认按粉丝最多排序:

名字词条查询:

其他的es业务逻辑也是差不多这两个实现, 可以参考后面的一些语法进行对应的操作,后续我还会持续更新一些es拓展和升级的操作。

三、简单介绍Elasticsearch

这里只做演示和介绍,如果只需要了解在Java中使用可跳过,去看第四部分,但是这些还是很有必要了解一下。

具体的DSL操作参考:Docs

1、表结构与Mysql的对比

MySQL

Elasticsearch

说明

Table

Index

索引(index),就是文档的集合,类似数据库的表(table)

Row

Document

文档(Document),就是一条条的数据,类似数据库中的行(Row),文档都是JSON格式

Column

Field

字段(Field),就是JSON文档中的字段,类似数据库中的列(Column)

Schema

Mapping

Mapping(映射)是索引中文档的约束,例如字段类型约束。类似数据库的表结构(Schema)

SQL

DSL

DSL是elasticsearch提供的JSON风格的请求语句,用来操作elasticsearch,实现CRUD

2、Mapping映射属性

Mapping是对索引库中文档的约束,常见的Mapping属性包括:

3、索引库的CRUD
创建索引库和映射* ):

基本语法

  • 请求方式:PUT

  • 请求路径:/索引库名,可以自定义

  • 请求参数:mapping映射

示例:

PUT /索引库名称
{
"mappings": {
"properties": {
"字段名":{
"type": "text",
"analyzer": "ik_smart"
},
"字段名2":{
"type": "keyword",
"index": "false"
},
"字段名3":{
"properties": {
"子字段": {
"type": "keyword"
}
}
},
// ...略
}
}
}

索引库的其他CRUD如下:

查询索引库
GET /索引库名
修改索引库
PUT /索引库名/_mapping
{
"properties": {
"新字段名":{
"type": "integer"
}
}
}
删除索引库
DELETE /索引库名
4、文档操作的CRUD

了解即可,毕竟是使用Java实现比较实际,但是语法的熟悉还是很重要的,就像Mysql有mybatisplus,但是还要了解sql。

新增文档:
POST /索引库名/_doc/文档id
{
"字段1": "值1",
"字段2": "值2",
"字段3": {
"子属性1": "值3",
"子属性2": "值4"
},
}
查询文档:
GET /{索引库名称}/_doc/{id}
删除文档:
DELETE /{索引库名}/_doc/id
修改文档:

全量修改(覆盖之前,如果改id不存在则为新增):

PUT /{索引库名}/_doc/文档id
{
"字段1": "值1",
"字段2": "值2",
// ... 略
}

局部修改(局部某个字段):

POST /{索引库名}/_update/文档id
{
"doc": {
"字段名": "新的值",
}
}
批处理:

批处理采用POST请求,基本语法如下:

POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

其中:

四、RestAPI

为什么要使用:

ES官方提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是组装DSL语句,通过http请求发送给ES。

官方文档地址:Elasticsearch Clients | Elastic

针对我们的版本:

在这里有该版本的各种操作API,可以参考来写代码

1、初始化RestClient
(1)引入esRestHighLevelClient依赖

依赖:

    <properties>
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>

<dependencyManagement>
<dependencies>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

</dependencies>
</dependencyManagement>
(2)初始化RestHighLevelClient

基本语法如下:

RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://服务器IP地址:9200")
));

做一个测试类测试一下:

成功有输出,测试代码参考如下:

package com.quick.es;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;

public class EsTest {

private RestHighLevelClient client;

// 初始化
@BeforeEach
void setUp() {
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://你的服务器IP地址:9200")
));
}

// 测试连接es
@Test
void testConnect() {
System.out.println("client: "+client);
}


// 销毁
@AfterEach
void tearDown() throws IOException {
this.client.close();
}

}
2、在kibana的客户端准备创建索引库

下面为我对应我的用户表创建的索引库

PUT /user
{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"quick_user_id":{
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "ik_max_word"
},
"sex":{
"type": "keyword"
},
"avatar":{
"type": "keyword",
"index": false
},
"phone":{
"type": "text",
"analyzer": "ik_max_word"
},
"follow":{
"type": "integer"
},
"fan":{
"type": "integer"
},
"use_time":{
"type": "integer"
},
"collect_number":{
"type": "integer",
"index": false
},
"mark_number":{
"type": "integer",
"index": false
},
"brief_introduction":{
"type": "text",
"index": false
},
"create_time":{
"type": "date"
}
}
}
}

拿着上面这些创建好的映射在Java客户端创建

3、Java客户端创建索引库

关于一些知识点,这里我拿之前在b站学习的PPT的内容展示一下,我觉得这个已经很直观的体现出创建索引库的一些解释:

下面给出测试类所有代码,记得服务器IP地址替换成自己的。

package com.quick.es;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;

public class EsTest {

private RestHighLevelClient client;

static final String USER_MAPPING_TEMPLATE ="{
"
+
" "mappings": {
"
+
" "properties": {
"
+
" "id": {
"
+
" "type": "keyword"
"
+
" },
"
+
" "quick_user_id":{
"
+
" "type": "keyword"
"
+
" },
"
+
" "name":{
"
+
" "type": "text",
"
+
" "analyzer": "ik_max_word"
"
+
" },
"
+
" "sex":{
"
+
" "type": "keyword"
"
+
" },
"
+
" "avatar":{
"
+
" "type": "keyword",
"
+
" "index": false
"
+
" },
"
+
" "phone":{
"
+
" "type": "text",
"
+
" "analyzer": "ik_max_word"
"
+
" },
"
+
" "follow":{
"
+
" "type": "integer"
"
+
" },
"
+
" "fan":{
"
+
" "type": "integer"
"
+
" },
"
+
" "use_time":{
"
+
" "type": "integer"
"
+
" },
"
+
" "collect_number":{
"
+
" "type": "integer",
"
+
" "index": false
"
+
" },
"
+
" "mark_number":{
"
+
" "type": "integer",
"
+
" "index": false
"
+
" },
"
+
" "brief_introduction":{
"
+
" "type": "text",
"
+
" "index": false
"
+
" },
"
+
" "create_time":{
"
+
" "type": "date"
"
+
" }
"
+
" }
"
+
" }
"
+
"}";

// 初始化
@BeforeEach
void setUp() {
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://服务器IP地址:9200")
));
}

// 测试连接es
@Test
void testConnect() {
System.out.println("client: "+client);
}

// 创建索引库
@Test
void testCreateIndex() throws IOException {
// 1.创建Request对象
CreateIndexRequest request = new CreateIndexRequest("user");
// 2.准备请求参数
request.source(USER_MAPPING_TEMPLATE, XContentType.JSON);
// 3.发送请求
client.indices().create(request, RequestOptions.DEFAULT);
}


// 销毁
@AfterEach
void tearDown() throws IOException {
this.client.close();
}

}

测试:

去kibana客户端测试:

创建成功!

五、RestClient操作文档(重在方法理解)

1、准备实体类

准备一个对接索引库的es实体类

/*
User索引库实体类
*/


@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserDoc {

// 这里设计为String类型,因为在发送消息的时候是以字符的形式
@Schema(description = "用户ID")
private String id;

@Schema(description = "用户编号")
private String quickUserId;

@Schema(description = "姓名")
private String name;

@Schema(description = "手机号")
private String phone;

@Schema(description = "关注数")
private Long follow;

@Schema(description = "粉丝数")
private Long fan;

@Schema(description = "性别 0 女 1 男")
private String sex;

@Schema(description = "头像")
private String avatar;

@Schema(description = "注册时间")
private LocalDateTime createTime;

@Schema(description = "用龄,单位:年")
private Long useTime;

@Schema(description = "收藏数")
private Long collectNumber;

@Schema(description = "评分数")
private Long markNumber;

@Schema(description = "个人简介")
private String briefIntroduction;

}

这里的id用的是String类型,因为使用RestClient去根据id查,需要传过去的是字符类型的数据,所以在这里需要进行一个转变。

2、Java实现CRUD(重点)

代码:

package com.quick.es;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.json.JSONUtil;
import com.quick.entity.User;
import com.quick.service.UserService;
import jakarta.annotation.Resource;
import org.apache.http.HttpHost;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@SpringBootTest(properties = "spring.profiles.active=dev")
public class EsDocTest {

@Resource
private UserService userService;

private RestHighLevelClient client;


// 初始化
@BeforeEach
void setUp() {
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://服务器IP地址:9200")
));
}

// 测试连接es
@Test
void testConnect() {
System.out.println("client: "+client);
}

// 测试添加文档信息
@Test
void testAddDocument() throws IOException {
// 1.根据id查询商品数据
User user = userService.getById(1L);
System.out.println("user = " + user);
// 2.转换为文档类型
UserDoc userDoc = BeanUtil.copyProperties(user, UserDoc.class);
System.out.println("userDoc = " + userDoc);
// 3.将UserDoc转json
String doc = JSONUtil.toJsonStr(userDoc);

// 1.准备Request对象
/*IndexRequest request = new IndexRequest("user").id(String.valueOf(userDoc.getId()));*/
IndexRequest request = new IndexRequest("user").id(userDoc.getId());
// 2.准备Json文档
request.source(doc, XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
}

// 测试删除文档
@Test
void testDeleteDocument() throws IOException {
// 1.准备Request,两个参数,第一个是索引库名,第二个是文档id
DeleteRequest request = new DeleteRequest("user", "1");
// 2.发送请求
client.delete(request, RequestOptions.DEFAULT);
}

// 测试更新文档
@Test
void testUpdateDocument() throws IOException {
// 1.准备Request
UpdateRequest request = new UpdateRequest("user", "1");
// 2.准备请求参数

// 方法一
/*request.doc(
"userTime", 1,
"briefIntroduction", "hello world"
);*/


// 方法二
/*UserDoc userDoc=new UserDoc();
userDoc.setUseTime(1L);
userDoc.setBriefIntroduction("hello world");

// 构造参数
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("userTime", userDoc.getUseTime());
jsonMap.put("briefIntroduction", userDoc.getBriefIntroduction());
// 将数据放入请求参数
request.doc(jsonMap);*/


// 方法三
UserDoc userDoc=new UserDoc();
//userDoc.setUseTime(1L);
userDoc.setBriefIntroduction("hello world!");
// 将UserDoc转json
String doc = JSONUtil.toJsonStr(userDoc);
// 准备Json文档,XContentType.JSON表示json格式
request.doc(doc, XContentType.JSON);

// 3.发送请求
client.update(request, RequestOptions.DEFAULT);
}

// 测试根据id查询文档
@Test
void testGetDocumentById() throws IOException {
// 1.准备Request对象
GetRequest request = new GetRequest("user").id("1");
// 2.发送请求
GetResponse response = client.get(request, RequestOptions.DEFAULT);
// 3.获取响应结果中的source
String json = response.getSourceAsString();

UserDoc userDoc = JSONUtil.toBean(json, UserDoc.class);
System.out.println("userDoc= " + userDoc);
}


// 销毁
@AfterEach
void tearDown() throws IOException {
this.client.close();
}

}

其中:

(1)增:
// 测试添加文档信息
@Test
void testAddDocument() throws IOException {
// 1.根据id查询商品数据
User user = userService.getById(1L);
System.out.println("user = " + user);
// 2.转换为文档类型
UserDoc userDoc = BeanUtil.copyProperties(user, UserDoc.class);
System.out.println("userDoc = " + userDoc);
// 3.将UserDoc转json
String doc = JSONUtil.toJsonStr(userDoc);

// 1.准备Request对象
/*IndexRequest request = new IndexRequest("user").id(String.valueOf(userDoc.getId()));*/
IndexRequest request = new IndexRequest("user").id(userDoc.getId());
// 2.准备Json文档
request.source(doc, XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
}
(2)删:
// 测试删除文档
@Test
void testDeleteDocument() throws IOException {
// 1.准备Request,两个参数,第一个是索引库名,第二个是文档id
DeleteRequest request = new DeleteRequest("user", "1");
// 2.发送请求
client.delete(request, RequestOptions.DEFAULT);
}
(3)改:
// 测试更新文档
@Test
void testUpdateDocument() throws IOException {
// 1.准备Request
UpdateRequest request = new UpdateRequest("user", "1");
// 2.准备请求参数
UserDoc userDoc=new UserDoc();
userDoc.setUseTime(1L);
userDoc.setBriefIntroduction("hello world!");
// 将UserDoc转json
String doc = JSONUtil.toJsonStr(userDoc);
// 准备Json文档,XContentType.JSON表示json格式
request.doc(doc, XContentType.JSON);

// 3.发送请求
client.update(request, RequestOptions.DEFAULT);
}
(4)查:
// 测试根据id查询文档
@Test
void testGetDocumentById() throws IOException {
// 1.准备Request对象
GetRequest request = new GetRequest("user").id("1");
// 2.发送请求
GetResponse response = client.get(request, RequestOptions.DEFAULT);
// 3.获取响应结果中的source
String json = response.getSourceAsString();

UserDoc userDoc = JSONUtil.toBean(json, UserDoc.class);
System.out.println("userDoc= " + userDoc);
}
注意:

可以看到在增加和修改那边会构造请求参数,我在改那边提供了三个方法,在上面测试类的完整代码中有那三种方法,其实添加的构造请求参数的实现也是一样的,下面我来逐一讲解一下构造的实现:

方法一:

官网的链式编程也很推荐,下面就是浓缩的修改操作:

UpdateRequest request = new UpdateRequest("posts", "1")
.doc("updated", new Date(),
"reason", "daily update");
client.update(request, RequestOptions.DEFAULT);

方法二:

因为根据方法一可知那个数据的格式类似 Map<String, Object> 这样的格式,可以通过map来构造。官网示例如下:

方法三:

官网在这里也提到,可以先构造默认Json格式,然后再换一种类型的Json

此外官网还提供了一个方法我觉得也很优雅,当然还不只这个。

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.timeField("updated", new Date());
builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "1")
.doc(builder);
3、批量导入文档:

我们需要导入我们用户表里面的数据,非常多,不可能一个一个操作,基本上是批操作,这就需要我们学会批量导入文档

在我的理解add相当于加入你的请求到那里面,然后再根据具体请求的实现来执行各样的操作

基本语法如下:

@Test
void testBulk() throws IOException {
// 1.创建Request
BulkRequest request = new BulkRequest();
// 2.准备请求参数
request.add(new IndexRequest("items").id("1").source("json doc1", XContentType.JSON));
request.add(new IndexRequest("items").id("2").source("json doc2", XContentType.JSON));
// 3.发送请求
client.bulk(request, RequestOptions.DEFAULT);
}

下面是实战,用于添加我用户表信息:

在之前那个EsDocTest测试类里面加上这么一个测试方法:

    @Test
void testLoadUserDocs() throws IOException {
// 分页查询商品数据
int pageNo = 1;
int size = 100;
while (true) {
Page<User> page = userService.lambdaQuery().page(new Page<User>(pageNo, size));
// 非空校验
List<User> users = page.getRecords();
if (CollUtil.isEmpty(users)) {
return;
}
log.info("加载第{}页数据,共{}条", pageNo, users.size());
// 1.创建Request
BulkRequest request = new BulkRequest("user");
// 2.准备参数,添加多个新增的Request
for (User user : users) {
// 2.1.转换为文档类型ItemDTO
UserDoc userDoc = BeanUtil.copyProperties(user, UserDoc.class);
// 2.2.创建新增文档的Request对象
request.add(new IndexRequest()
.id(userDoc.getId())
.source(JSONUtil.toJsonStr(userDoc), XContentType.JSON));
}
// 3.发送请求
client.bulk(request, RequestOptions.DEFAULT);

// 翻页
pageNo++;
}
}

运行:

再去随便搜一个id的用户:

来到kibana:

六、JavaRestClient查询

基本步骤(重点)
    @Test
void testSearch() throws IOException {
// 1.创建Request
SearchRequest request = new SearchRequest("user");
// 2.组织请求参数
request.source().query(QueryBuilders.matchAllQuery());

// 3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析响应
handleResponse(response);

}


private static void handleResponse(SearchResponse response) {
SearchHits searchHits = response.getHits();
// 4.1 获取总条数
long total = 0;
if (searchHits.getTotalHits() != null) {
total = searchHits.getTotalHits().value;
}
// 4.2 获取命中的数据
SearchHit[] hits = searchHits.getHits();
List<UserDoc> userDocList=new ArrayList<>();
for (SearchHit hit : hits) {
// 4.2.1 获取source结果(结果是一个json对象)
String json = hit.getSourceAsString();
// 4.2.2 转为实体对象
UserDoc userDoc = JSONUtil.toBean(json, UserDoc.class);
userDocList.add(userDoc);

}
System.out.println("userDocList = " + userDocList);
System.out.println("total = " + total);
}

下面是对一些查询的讲解,这里我用学习的资料总结展示一下,如果只想实战可以参考后面实战

1、叶子查询

全文检索查询(Full Text Queries):利用分词器对用户输入搜索条件先分词,得到词条,然后再利用倒排索引搜索词条。例如:

match(全文检索查询的一种,会对用户输入内容分词,然后去倒排索引库检索)

// 单字段查询
QueryBuilders.matchQuery("name", "脱脂牛奶");

multi_match**(**与match查询类似,只不过允许同时查询多个字段)

// 多字段查询
QueryBuilders.multiMatchQuery("脱脂牛奶", "name", "category");

精确查询(Term-level queries):不对用户输入搜索条件分词,根据字段内容精确值匹配。但只能查找keyword、数值、日期、boolean类型的字段。例如

term _(_词条查询)

equest.source().query(QueryBuilders.termQuery("brand", "华为"));

range(范围查询

request.source().query(QueryBuilders.rangeQuery("price").gte(10000).lte(30000))
2、复合查询

bool查询(基于逻辑运算组合叶子查询,实现组合条件)

// 创建布尔查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 添加filter条件
boolQuery.must(QueryBuilders.termQuery("brand", "华为"));
// 添加filter条件
boolQuery.filter(QueryBuilders.rangeQuery("price").lte(2500));
3、排序和分页
// 查询
request.source().query(QueryBuilders.matchAllQuery());
// 分页
request.source().from(0).size(5);
// 价格排序
request.source().sort("price", SortOrder.ASC);
4、高亮

    // 1.创建Request
SearchRequest request = new SearchRequest("items");
// 2.组织请求参数
// 2.1.query条件
request.source().query(QueryBuilders.matchQuery("name", "脱脂牛奶"));
// 2.2.高亮条件
request.source().highlighter(
SearchSourceBuilder.highlight()
.field("name")
.preTags("<em>")
.postTags("</em>")
);
// 3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析响应
handleResponse(response);

    SearchHits searchHits = response.getHits();
// 1.获取总条数
long total = searchHits.getTotalHits().value;
System.out.println("共搜索到" + total + "条数据");
// 2.遍历结果数组
SearchHit[] hits = searchHits.getHits();
for (SearchHit hit : hits) {
// 3.得到_source,也就是原始json文档
String source = hit.getSourceAsString();
// 4.反序列化
ItemDoc item = JSONUtil.toBean(source, ItemDoc.class);
// 5.获取高亮结果
Map<String, HighlightField> hfs = hit.getHighlightFields();
if (CollUtils.isNotEmpty(hfs)) {
// 5.1.有高亮结果,获取name的高亮结果
HighlightField hf = hfs.get("name");
if (hf != null) {
// 5.2.获取第一个高亮结果片段,就是商品名称的高亮值
String hfName = hf.getFragments()[0].string();
item.setName(hfName);
}
}
System.out.println(item);
}

七、数据聚合

聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。

request.source().size(0); // 分页
request.source().aggregation(
AggregationBuilders
.terms("brand_agg") // 聚合名称
.field("brand") // 聚合字段
.size(20)); // 聚合结果条数
        // 解析聚合结果
Aggregations aggregations = response.getAggregations();
// 根据名称获取聚合结果
Terms brandTerms = aggregations.get("brand_agg");
// 获取桶
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 遍历
for (Terms.Bucket bucket : buckets) {
// 获取key,也就是品牌信
String brandName = bucket.getKeyAsString();
System.out.println(brandName);
}
举报

相关推荐

0 条评论