MongoDB in Practice

Posted by "zengchengjie" on Saturday, January 15, 2022

Introduction

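MongoDB is a database built on distributed file storage. It sits between relational and non-relational databases: its main goal is to bridge key/value stores (which offer high performance and high scalability) and traditional RDBMS systems (which offer rich functionality), combining the strengths of both.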

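MongoDB's data structures are very loose. It stores BSON, a JSON-like format, so it can hold fairly complex data types, and that same storage format makes its data very natural to work with in Node.js applications.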
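To make the "loose structure" concrete, here is a minimal sketch that uses plain Java maps as stand-ins for BSON documents. It does not use the MongoDB driver; the class name FlexibleDocuments, the helper doc(...), and the sample comment data are all hypothetical and only illustrate that documents in one collection need not share the same fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: plain maps standing in for BSON documents.
final class FlexibleDocuments {

    // Builds a document from alternating key/value arguments.
    static Map<String, Object> doc(Object... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException("expected key/value pairs");
        }
        Map<String, Object> document = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            document.put((String) keyValues[i], keyValues[i + 1]);
        }
        return document;
    }

    // Two "comments" with different shapes living in the same collection.
    static List<Map<String, Object>> sampleCollection() {
        List<Map<String, Object>> comments = new ArrayList<>();
        comments.add(doc("user", "alice", "text", "hello"));
        comments.add(doc(
                "user", "bob",
                "text", "hi",
                "tags", Arrays.asList("greeting", "short"),   // array field
                "device", doc("os", "android", "version", 12) // nested document
        ));
        return comments;
    }

    // Readers must tolerate missing fields, since nothing enforces a schema.
    static int tagCount(Map<String, Object> document) {
        Object tags = document.get("tags");
        return tags instanceof List ? ((List<?>) tags).size() : 0;
    }

    public static void main(String[] args) {
        for (Map<String, Object> comment : sampleCollection()) {
            System.out.println(comment.get("user") + " has " + tagCount(comment) + " tag(s)");
        }
    }
}
```

The flip side of this flexibility is visible in tagCount: the reading code, not the database, has to decide what a missing field means.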

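Although it is called a NoSQL database, MongoDB has a very powerful query language. Its syntax somewhat resembles an object-oriented query language, it can do nearly everything a single-table query in a relational database can, and it also supports indexes on the data.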

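That said, MongoDB is not a cure-all. Compared with relational databases such as MySQL, each has its own strengths for particular data types and transaction requirements. When choosing data storage, favor diversity: pick the better, more economical option for each case rather than imposing one store from the top down.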

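Most commonly, MongoDB can be used directly to store key/value style data, such as verification codes and sessions.

Thanks to its horizontal scaling, it also suits data whose volume will grow very large over time, such as logs and comments.

Because it stores data without a fixed schema, it also suits JSON data that changes shape, such as frequently changing JSON messages exchanged with external systems.

Operations with complex, strongly transactional requirements, such as account transactions, are not a good fit for MongoDB.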

Spring Boot provides auto-configuration for MongoDB too, as you would expect given its popularity.

Deployment

Without further ado, go straight to the GitHub documentation link or the Docker Hub link.

The docker-compose.yml is as follows:

# Use root/example as user/password credentials
version: '3.1'

services:

  mongo:
    image: mongo
    restart: always
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example

  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
      ME_CONFIG_MONGODB_URL: mongodb://root:example@mongo:27017/

With the above in place, just run docker-compose up -d.

Here, mongo is the database, listening on port 27017, and mongo-express is the web console, reachable at IP:8081, for example: http://192.168.10.44:8081/


You can also connect with Navicat.

That completes the MongoDB deployment.

Usage

Case 1: Consuming Kafka data and storing it in multiple MongoDB databases

Consume from one Kafka cluster, then write into two MongoDB databases.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zcj</groupId>
    <artifactId>mongosinkkafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mongosinkkafka</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Configuration file:


server:
  servlet:
    context-path: /
  port: 80

spring:
  kafka:
    producer:
      bootstrap-servers: 192.168.10.49:9093
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap-servers: 192.168.10.49:9093
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

  data:
    mongodb:
      primary:
        uri: "mongodb://root:123456@192.168.10.49:27017/dbnetprobe"

      secondary:
        uri: "mongodb://root:123456@192.168.10.49:27017/dbnetdev"
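
The two URIs above differ only in the trailing path segment, which names the database each template will use. As a rough illustration of how such a connection string breaks down, the sketch below parses it with the JDK's java.net.URI. The class MongoUriParts is hypothetical and not part of the original project; it only handles the single-host form shown here (no seed lists or query options), and a real application would rely on the driver's own parsing instead.

```java
import java.net.URI;

// Illustrative helper: splits a single-host MongoDB connection string
// into its parts using only the JDK.
final class MongoUriParts {
    final String user;
    final String host;
    final int port;
    final String database; // null when the URI names no database

    private MongoUriParts(String user, String host, int port, String database) {
        this.user = user;
        this.host = host;
        this.port = port;
        this.database = database;
    }

    static MongoUriParts parse(String connectionString) {
        URI uri = URI.create(connectionString);
        if (!"mongodb".equals(uri.getScheme())) {
            throw new IllegalArgumentException("not a mongodb:// URI: " + connectionString);
        }
        // User info has the form "user:password"; keep only the user name.
        String userInfo = uri.getUserInfo();
        String user = userInfo == null ? null : userInfo.split(":", 2)[0];
        // The path is "/<database>", or "/" (or empty) when no database is given.
        String path = uri.getPath();
        String database = (path == null || path.length() <= 1) ? null : path.substring(1);
        // 27017 is MongoDB's default port.
        int port = uri.getPort() == -1 ? 27017 : uri.getPort();
        return new MongoUriParts(user, uri.getHost(), port, database);
    }

    public static void main(String[] args) {
        MongoUriParts primary = parse("mongodb://root:123456@192.168.10.49:27017/dbnetprobe");
        MongoUriParts secondary = parse("mongodb://root:123456@192.168.10.49:27017/dbnetdev");
        System.out.println(primary.host + ":" + primary.port + " -> " + primary.database);
        System.out.println(secondary.host + ":" + secondary.port + " -> " + secondary.database);
    }
}
```

Both URIs point at the same server and user, so the "two databases" in this case are two logical databases on one MongoDB instance.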

Application class:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MongosinkkafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(MongosinkkafkaApplication.class, args);
    }

}

Define a configuration class for each of the two databases:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;

/**
 * @ClassName: PrimaryMongoConfig
 * @Description: The first database
 */
@Configuration
@EnableMongoRepositories(mongoTemplateRef = PrimaryMongoConfig.MONGO_TEMPLATE)
public class PrimaryMongoConfig {
    public static final String MONGO_TEMPLATE = "primaryMongoTemplate";

    @Value("${spring.data.mongodb.primary.uri}")
    private String uri;

    @Primary
    @Bean(name = MONGO_TEMPLATE)
    public MongoTemplate mongoPrimaryTemplate() {
        return new MongoTemplate(mongoPrimaryFactory());
    }

    @Bean
    @Primary
    public MongoDatabaseFactory mongoPrimaryFactory() {
        SimpleMongoClientDatabaseFactory simpleMongoClientDbFactory = new SimpleMongoClientDatabaseFactory(uri);
        return simpleMongoClientDbFactory;
    }
}

The second:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;

/**
 * @ClassName: SecondaryMongoConfig
 * @Description: The second database
 */
@Configuration
@EnableMongoRepositories(mongoTemplateRef = SecondaryMongoConfig.MONGO_TEMPLATE)
public class SecondaryMongoConfig {
    public static final String MONGO_TEMPLATE = "secondaryMongoTemplate";

    @Value("${spring.data.mongodb.secondary.uri}")
    private String uri;

    // Not marked @Primary: only the beans in PrimaryMongoConfig may be primary,
    // otherwise injection by type finds two primary candidates and fails.
    @Bean(name = MONGO_TEMPLATE)
    public MongoTemplate mongoSecondaryTemplate() {
        return new MongoTemplate(mongoSecondaryFactory());
    }

    @Bean
    public MongoDatabaseFactory mongoSecondaryFactory() {
        return new SimpleMongoClientDatabaseFactory(uri);
    }
}

Consumer:

import com.zcj.mongokafka.config.PrimaryMongoConfig;
import com.zcj.mongokafka.config.SecondaryMongoConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class KafkaConsumer {
    @Resource(name = PrimaryMongoConfig.MONGO_TEMPLATE)
    private MongoTemplate primaryMongoTemplate;     // MongoTemplate for the first database

    @Resource(name = SecondaryMongoConfig.MONGO_TEMPLATE)
    private MongoTemplate secondaryMongoTemplate;   // MongoTemplate for the second database

    // Each listener takes the raw message value (assumed to be a JSON string)
    // and saves it into the named collection of the corresponding database.
    @KafkaListener(topics = "common.vod.edge.traffic", groupId = "consumer-traffic")
    public void kafkaMsgTraffic(ConsumerRecord<String, String> consumerRecord) {
        String value = consumerRecord.value();
        primaryMongoTemplate.save(value, "traffic");
    }

    @KafkaListener(topics = "common.vod.sdk.taskstart", groupId = "consumer-taskstart")
    public void kafkaMsgNetTaskStart(ConsumerRecord<String, String> consumerRecord) {
        String value = consumerRecord.value();
        secondaryMongoTemplate.save(value, "taskstart");
    }

    @KafkaListener(topics = "common.vod.sdk.taskdata", groupId = "consumer-taskdata")
    public void kafkaMsgTaskData(ConsumerRecord<String, String> consumerRecord) {
        String value = consumerRecord.value();
        secondaryMongoTemplate.save(value, "taskdata");
    }

}
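
The consumer above amounts to a fixed routing table from Kafka topic to a (database, collection) pair. Below is a minimal sketch of that table in plain Java, assuming the database names from the configuration file (dbnetprobe behind the primary template, dbnetdev behind the secondary one). The TopicRouting and Target names are hypothetical and not part of the original project.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: the topic -> (database, collection) mapping that the
// three @KafkaListener methods implement implicitly.
final class TopicRouting {

    static final class Target {
        final String database;
        final String collection;

        Target(String database, String collection) {
            this.database = database;
            this.collection = collection;
        }

        @Override
        public String toString() {
            return database + "." + collection;
        }
    }

    private static final Map<String, Target> ROUTES;

    static {
        Map<String, Target> routes = new LinkedHashMap<>();
        routes.put("common.vod.edge.traffic", new Target("dbnetprobe", "traffic"));
        routes.put("common.vod.sdk.taskstart", new Target("dbnetdev", "taskstart"));
        routes.put("common.vod.sdk.taskdata", new Target("dbnetdev", "taskdata"));
        ROUTES = Collections.unmodifiableMap(routes);
    }

    // Looks up where messages from a topic are stored; unknown topics are an error.
    static Target targetFor(String topic) {
        Target target = ROUTES.get(topic);
        if (target == null) {
            throw new IllegalArgumentException("No route for topic: " + topic);
        }
        return target;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Target> route : ROUTES.entrySet()) {
            System.out.println(route.getKey() + " -> " + route.getValue());
        }
    }
}
```

Writing the mapping down this way makes it easy to see that one topic goes to the first database and two go to the second, and it is one possible starting point if the number of topics grows beyond what separate listener methods can comfortably handle.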