Introduction
MongoDB is a database built on distributed file storage. It sits between relational and non-relational databases, and its main goal is to bridge key/value stores (which offer high performance and easy scalability) and traditional RDBMSs (which offer rich functionality), combining the strengths of both.
The data structures MongoDB supports are very loose: documents are stored in BSON, a JSON-like format, so fairly complex data types can be stored. This storage format also makes the data very natural to work with in Node.js applications.
Even though it is called a NoSQL database, MongoDB's query language is quite powerful. Its syntax resembles an object-oriented query language, it can do most of what a single-table query in a relational database can, and it also supports indexes on the data.
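As a quick illustration of that claim, here is a minimal sketch using Spring Data MongoDB's Query/Criteria API together with index creation; the user collection, its fields, and the injected MongoTemplate are illustrative assumptions, not part of any project described below:

import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.index.Index;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;

import java.util.List;

public class UserQueryExample {

    private final MongoTemplate mongoTemplate;

    public UserQueryExample(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    public List<org.bson.Document> findAdults() {
        // Create an ascending index on the "age" field of the hypothetical "user" collection.
        mongoTemplate.indexOps("user").ensureIndex(new Index("age", Sort.Direction.ASC));

        // Roughly equivalent to the single-table query:
        // SELECT * FROM user WHERE age >= 18 AND name LIKE 'Z%'
        Query query = Query.query(Criteria.where("age").gte(18).and("name").regex("^Z"));
        return mongoTemplate.find(query, org.bson.Document.class, "user");
    }
}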
MongoDB is not a silver bullet, however. Compared with relational databases such as MySQL, each side has its own strengths for particular data types and transaction requirements. When choosing data storage, favor diversity, picking the better and more economical option for each case, rather than imposing top-down uniformity.
Most commonly, MongoDB can be used directly to store key-value style data such as captchas and sessions;
thanks to its horizontal scalability, it also suits data whose volume may grow very large over time, such as logs and comments;
and because its storage is weakly typed, it works well for JSON data that changes shape often, such as JSON payloads exchanged with external systems.
Operations with complex, strict transactional requirements, such as account transactions, are not a good fit for MongoDB.
For such a popular database, Spring Boot naturally provides auto-configuration as well.
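Before the multi-database case later in this article, here is a minimal sketch of what that auto-configuration gives you out of the box: with a single standard property such as spring.data.mongodb.uri=mongodb://root:example@localhost:27017/test, a MongoTemplate can be injected directly. The CaptchaStore class, the URI, and the collection name are illustrative assumptions:

import org.bson.Document;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;

@Component
public class CaptchaStore {

    // Auto-configured by Spring Boot from spring.data.mongodb.uri.
    private final MongoTemplate mongoTemplate;

    public CaptchaStore(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    public void saveCaptcha(String phone, String code) {
        // Loosely structured document: fields can be added or dropped at any time.
        Document doc = new Document("phone", phone)
                .append("code", code)
                .append("createdAt", System.currentTimeMillis());
        mongoTemplate.save(doc, "captcha");
    }
}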
Deployment
Without further ado, go straight to the GitHub documentation link or the Docker Hub link.
The docker-compose.yml is as follows:
# Use root/example as user/password credentials
version: '3.1'

services:

  mongo:
    image: mongo
    restart: always
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example

  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
      ME_CONFIG_MONGODB_URL: mongodb://root:example@mongo:27017/
With that in place, just run docker-compose up -d.
Here, mongo is the database itself, listening on port 27017; mongo-express is the web console, reachable at IP:8081, e.g. http://192.168.10.44:8081/
Connecting with Navicat:
That completes the MongoDB deployment.
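If you want to verify the deployment from code rather than from the web console, a minimal connectivity check with the plain MongoDB Java driver (mongodb-driver-sync, which spring-boot-starter-data-mongodb also pulls in transitively) might look like the sketch below; the host matches the example above and should be adjusted to your environment:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

public class MongoPingExample {

    public static void main(String[] args) {
        // Credentials from the docker-compose.yml above; replace the host with your own.
        try (MongoClient client = MongoClients.create("mongodb://root:example@192.168.10.44:27017/")) {
            // Listing database names forces a round trip to the server.
            for (String name : client.listDatabaseNames()) {
                System.out.println(name);
            }
        }
    }
}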
Usage
Case 1: consume Kafka data and store it in multiple MongoDB databases
We consume messages from one Kafka cluster and write them into two MongoDB databases.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zcj</groupId>
    <artifactId>mongosinkkafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mongosinkkafka</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Configuration file (application.yml):
server:
  servlet:
    context-path: /
  port: 80

spring:
  kafka:
    producer:
      bootstrap-servers: 192.168.10.49:9093
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap-servers: 192.168.10.49:9093
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  data:
    mongodb:
      primary:
        uri: "mongodb://root:123456@192.168.10.49:27017/dbnetprobe"
      secondary:
        uri: "mongodb://root:123456@192.168.10.49:27017/dbnetdev"
Note that spring.data.mongodb.primary.uri and spring.data.mongodb.secondary.uri are custom keys rather than standard Spring Boot properties; they are bound with @Value in the two configuration classes further down.
The Application class:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MongosinkkafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(MongosinkkafkaApplication.class, args);
    }
}
Define one configuration class for each of the two databases.
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;

/**
 * @ClassName: PrimaryMongoConfig
 * @Description: the first database
 */
@Configuration
@EnableMongoRepositories(mongoTemplateRef = PrimaryMongoConfig.MONGO_TEMPLATE)
public class PrimaryMongoConfig {

    public static final String MONGO_TEMPLATE = "primaryMongoTemplate";

    @Value("${spring.data.mongodb.primary.uri}")
    private String uri;

    // @Primary makes this template the default whenever no qualifier is given.
    @Primary
    @Bean(name = MONGO_TEMPLATE)
    public MongoTemplate mongoPrimaryTemplate() {
        return new MongoTemplate(mongoPrimaryFactory());
    }

    @Bean
    @Primary
    public MongoDatabaseFactory mongoPrimaryFactory() {
        return new SimpleMongoClientDatabaseFactory(uri);
    }
}
The second one:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;

/**
 * @ClassName: SecondaryMongoConfig
 * @Description: the second database
 */
@Configuration
@EnableMongoRepositories(mongoTemplateRef = SecondaryMongoConfig.MONGO_TEMPLATE)
public class SecondaryMongoConfig {

    public static final String MONGO_TEMPLATE = "secondaryMongoTemplate";

    @Value("${spring.data.mongodb.secondary.uri}")
    private String uri;

    // No @Primary here: only the primary configuration should own the default beans,
    // otherwise two @Primary candidates of the same type make injection by type ambiguous.
    @Bean(name = MONGO_TEMPLATE)
    public MongoTemplate mongoSecondaryTemplate() {
        return new MongoTemplate(mongoSecondaryFactory());
    }

    @Bean
    public MongoDatabaseFactory mongoSecondaryFactory() {
        return new SimpleMongoClientDatabaseFactory(uri);
    }
}
The consumer:
import com.zcj.mongokafka.config.PrimaryMongoConfig;
import com.zcj.mongokafka.config.SecondaryMongoConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class KafkaConsumer {

    @Resource(name = PrimaryMongoConfig.MONGO_TEMPLATE)
    private MongoTemplate primaryMongoTemplate; // MongoTemplate for the first database

    @Resource(name = SecondaryMongoConfig.MONGO_TEMPLATE)
    private MongoTemplate secondaryMongoTemplate; // MongoTemplate for the second database

    // Each listener writes the raw message value (expected to be a JSON string, which
    // Spring Data MongoDB parses into a document) to its target collection.
    @KafkaListener(topics = "common.vod.edge.traffic", groupId = "consumer-traffic")
    public void kafkaMsgTraffic(ConsumerRecord<String, String> consumerRecord) {
        String value = consumerRecord.value();
        primaryMongoTemplate.save(value, "traffic");
    }

    @KafkaListener(topics = "common.vod.sdk.taskstart", groupId = "consumer-taskstart")
    public void kafkaMsgNetTaskStart(ConsumerRecord<String, String> consumerRecord) {
        String value = consumerRecord.value();
        secondaryMongoTemplate.save(value, "taskstart");
    }

    @KafkaListener(topics = "common.vod.sdk.taskdata", groupId = "consumer-taskdata")
    public void kafkaMsgTaskData(ConsumerRecord<String, String> consumerRecord) {
        String value = consumerRecord.value();
        secondaryMongoTemplate.save(value, "taskdata");
    }
}
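To exercise the whole pipeline end to end, a rough smoke-test sketch like the one below can publish a sample JSON message and read it back from the first database. The PipelineSmokeTest class, the sample payload, and the reliance on the auto-configured KafkaTemplate<String, String> (backed by the producer settings in application.yml) are assumptions for illustration, not part of the original project:

import com.zcj.mongokafka.config.PrimaryMongoConfig;
import org.bson.Document;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.kafka.core.KafkaTemplate;

import javax.annotation.Resource;

@Configuration
public class PipelineSmokeTest {

    @Resource(name = PrimaryMongoConfig.MONGO_TEMPLATE)
    private MongoTemplate primaryMongoTemplate;

    @Bean
    public CommandLineRunner smokeTest(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            // Publish a sample JSON payload to the traffic topic.
            kafkaTemplate.send("common.vod.edge.traffic", "{\"node\":\"edge-01\",\"bytes\":1024}");

            // Give the listener a moment to consume and persist the message.
            Thread.sleep(3000);

            // Read everything back from the "traffic" collection in the first database.
            for (Document doc : primaryMongoTemplate.findAll(Document.class, "traffic")) {
                System.out.println(doc.toJson());
            }
        };
    }
}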