storm技术内幕和大数据实践(storm经典案例分析)

技术Storm可靠性acker案例分析本篇内容主要讲解“Storm可靠性acker案例分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Storm可靠性acker案例分析”吧

本文主要解释“风暴可靠性指标案例分析”。感兴趣的朋友不妨看看。本文介绍的方法简单、快速、实用。让边肖带你学习“风暴可靠性acker案例分析”!

00-1010

Storm可靠性分析

工作进程死亡。

当工作进程挂起时,风暴集群将重新启动工作进程。

主管流程死亡

如果主管进程挂机,不会影响之前提交的拓扑,但是以后不能给这个节点分配任务,因为这个节点不再是集群的成员。

Nimbus进程死亡(HA有问题)并且很快失败。

如果nimbus进程挂起,不会影响之前已经提交的拓扑,但是以后不能向集群提交新的拓扑。1.0以下的版本有HA的问题,1.0以后已经修复了,可以有多个备选光轮。

节点停机时间

确认/失败消息确认机制(确保元组被完全处理)

在spout中启动tuple时,需要同时发送messageid,相当于打开了消息确认机制。

如果拓扑中有更多的元组,那么将acker的数量设置得多一点会提高效率。

通过config.setNumAckers(num)设置拓扑中的acker数量,默认值为1。

注意:acker使用了一种特殊的算法,因此跟踪每个spout元组的状态所需的内存量是恒定的(20字节)(您可以了解它的算法。目前百度风暴acker在没有深入了解这个算法的情况下可以找到相关分析文章)

注意:如果元组没有在指定的超时(配置的默认值)内成功处理。topology _ message _ timeout _ secs为30秒),则该元组将被视为处理失败。

完整处理元组

在storm中,一个元组被完全处理,这意味着这个元组和从这个元组派生的所有元组都被成功处理。

00-1010之前也提到过,如果要使用qck/fail确认机制,需要做好以下几点:

1.在我们的喷口重写ack和fail方法。

2.spout在发送元组时需要携带messageId。

3.3.bolt成功或失败后主动回电,根据以上描述,程序代码如下,注意这几点:

package cn . xpleaf . big data . storm . acker;

import cn . xpleaf . big data . storm . utils . storm util;

import org . Apache . storm . config;

import org . Apache . storm . local CLuster;

import org . Apache . storm . storm submitter;

import org . Apache . storm . generated . storm topology;

import org . Apache . storm . spout . PubToutCollector;

import org . Apache . storm . task . output collector;

import org . Apache . storm . task . topologycontext;

import org . Apache . storm . topology . OutPutfields clarer;

import org . Apache . storm . topology . topology builder;

import org . Apache . storm . topology . base . baserichbolt;

import org . Apache . storm . topology . base . baserichspout;

import org . Apache . storm . tuple . field;

import org . Apache . storm . tuple . tuple;

import org . Apache . storm . tuple . values;

import Java . util . date;

import Java . util . map;

import Java . util . Uuid;

/**

* 1、数字累加求和的情况:数据源不断生成递增的数字,并将生成的数字累加求和。

*p

*

Storm组件:Spout、Bolt、数据是Tuple,使用main中的Topology将spout和bolt进行关联
 * MapReduce的组件:Mapper和Reducer、数据是Writable,通过一个main中的job将二者关联
 * <p>
 * 适配器模式(Adapter):BaseRichSpout,其对继承接口中一些没必要的方法进行了重写,但其重写的代码没有实现任何功能。
 * 我们称这为适配器模式
 * <p>
 * storm消息确认机制---可靠性分析
 * acker
 * fail
 */
public class AckerSumTopology {

    /**
     * 数据源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 当前组件配置信息
        private TopologyContext context;    // 当前组件上下文对象
        private SpoutOutputCollector collector; // 发送tuple的组件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

         private long num = 0;
        /**
         * 接收数据的核心方法
         */
        @Override
        public void nextTuple() {
            String messageId = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
//            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("当前时间" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "产生的订单金额:" + num);
                this.collector.emit(new Values(num), messageId);
//            }
        }

        /**
         * 是对发送出去的数据的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }

        @Override
        public void ack(Object msgId) {
            System.out.println(msgId + "对应的消息被处理成功了");
        }

        @Override
        public void fail(Object msgId) {
            System.out.println(msgId + "---->对应的消息被处理失败了");
        }
    }

    /**
     * 计算和的Bolt节点
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 当前组件配置信息
        private TopologyContext context;    // 当前组件上下文对象
        private OutputCollector collector; // 发送tuple的组件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 处理数据的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;
            if (orderCost % 10 == 1) {   // 每10次模拟消息失败一次
                collector.fail(input);
            } else {
                System.out.println("线程ID:" + Thread.currentThread().getId() + " ,商城网站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品总交易额" + sumOrderCost);
                collector.ack(input);
            }
            StormUtil.sleep(1000);
        }

        /**
         * 如果当前bolt为最后一个处理单元,该方法可以不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 构建拓扑,相当于在MapReduce中构建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 设置spout和bolt的dag(有向无环图)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt(), 1)
                .shuffleGrouping("id_order_spout"); // 通过不同的数据流转方式,来指定数据的上游组件
        // 使用builder构建topology
        StormTopology topology = builder.createTopology();
        String topologyName = AckerSumTopology.class.getSimpleName();  // 拓扑的名称
        Config config = new Config();   // Config()对象继承自HashMap,但本身封装了一些基本的配置
        // 启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter
        if (args == null || args.length < 1) {  // 没有参数时使用本地模式,有参数时使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地开发模式,创建的对象为LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

运行后(本地运行或上传到集群上提交作业),输出如下:

当前时间20180413215706产生的订单金额:1
当前时间20180413215707产生的订单金额:2
7a4ce596fd3a40659f2d7f80a7738f55---->对应的消息被处理失败了
线程ID:133 ,商城网站到目前20180413215707的商品总交易额3
当前时间20180413215708产生的订单金额:3
0555a933a49f413e94480be201a55615对应的消息被处理成功了
线程ID:133 ,商城网站到目前20180413215708的商品总交易额6
当前时间20180413215709产生的订单金额:4
4b923132e4034e939c875aca368a8897对应的消息被处理成功了
线程ID:133 ,商城网站到目前20180413215709的商品总交易额10
当前时间20180413215710产生的订单金额:5
51f159472e854ba282ab84a2218459b8对应的消息被处理成功了
线程ID:133 ,商城网站到目前20180413215710的商品总交易额15
......

Storm定时任务

一般的业务数据存储,最终还是要落地,存储到RDBMS,但是RDBMS无法达到高访问量,能力达不到实时处理,或者说处理能力是有限的,会造成连接中断等问题,为了数据落地,我们可以采取迂回方式,可以采用比如说先缓存到高速内存数据库(如redis),然后再将内存数据库中的数据定时同步到rdbms中,而且可以定期定时来做。

  • 可以每隔指定的时间将数据整合一次存入数据库。

  • 或者每隔指定的时间执行一些

可以在storm中使用定时任务来实现这些定时数据落地的功能,不过需要先了解storm定时任务。

全局定时任务

在main中设置

conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);  // 设置多久发送一个系统的tuple定时发射数据

但是我们一般都会对特定的bolt设置定时任务,而没有必要对全局每一个bolt都发送系统的tuple,这样非常的耗费资源,所以就有了局部定时任务,也是我们常用的。

注意:storm会按照用户设置的时间间隔给拓扑中的所有bolt发送系统级别的tuple。在main函数中设置定时器,storm会定时给拓扑中的所有bolt都发送系统级别的tuple,如果只需要给某一个bolt设置定时功能的话,只需要在这个bolt中覆盖getComponentConfiguration方法,里面设置定时间隔即可。

测试代码如下:

package cn.xpleaf.bigdata.storm.quartz;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.util.*;

/**
 * 2°、单词计数:监控一个目录下的文件,当发现有新文件的时候,
        把文件读取过来,解析文件中的内容,统计单词出现的总次数
        E:\data\storm

        研究storm的定时任务
        有两种方式:
            1.main中设置,全局有效
            2.在特定bolt中设置,bolt中有效
 */
public class QuartzWordCountTopology {

    /**
     * Spout,获取数据源,这里是持续读取某一目录下的文件,并将每一行输出到下一个Bolt中
     */
    static class FileSpout extends BaseRichSpout {
        private Map conf;   // 当前组件配置信息
        private TopologyContext context;    // 当前组件上下文对象
        private SpoutOutputCollector collector; // 发送tuple的组件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            File directory = new File("D:/data/storm");
            // 第二个参数extensions的意思就是,只采集某些后缀名的文件
            Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true);
            for (File file : files) {
                try {
                    List<String> lines = FileUtils.readLines(file, "utf-8");
                    for(String line : lines) {
                        this.collector.emit(new Values(line));
                    }
                    // 当前文件被消费之后,需要重命名,同时为了防止相同文件的加入,重命名后的文件加了一个随机的UUID,或者加入时间戳也可以的
                    File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".completed");
                    FileUtils.moveFile(file, destFile);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    /**
     * Bolt节点,将接收到的每一行数据切割为一个个单词并发送到下一个节点
     */
    static class SplitBolt extends BaseRichBolt {

        private Map conf;   // 当前组件配置信息
        private TopologyContext context;    // 当前组件上下文对象
        private OutputCollector collector; // 发送tuple的组件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID) ) { // 确保不是系统发送的tuple,才使用我们的业务逻辑
                String line = input.getStringByField("line");
                String[] words = line.split(" ");
                for (String word : words) {
                    this.collector.emit(new Values(word, 1));
                }
            } else {
                System.out.println("splitBolt: " + input.getSourceComponent().toString());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Bolt节点,执行单词统计计算
     */
    static class WCBolt extends BaseRichBolt {

        private Map conf;   // 当前组件配置信息
        private TopologyContext context;    // 当前组件上下文对象
        private OutputCollector collector; // 发送tuple的组件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Map<String, Integer> map = new HashMap<>();

        @Override
        public void execute(Tuple input) {
            if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID) ) { // 确保不是系统发送的tuple,才使用我们的业务逻辑
                String word = input.getStringByField("word");
                Integer count = input.getIntegerByField("count");
            /*if (map.containsKey(word)) {
                map.put(word, map.get(word) + 1);
            } else {
                map.put(word, 1);
            }*/
                map.put(word, map.getOrDefault(word, 0) + 1);

                System.out.println("====================================");
                map.forEach((k, v) -> {
                    System.out.println(k + ":::" + v);
                });
            } else {
                System.out.println("sumBolt: " + input.getSourceComponent().toString());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 构建拓扑,组装Spout和Bolt节点,相当于在MapReduce中构建Job
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // dag
        builder.setSpout("id_file_spout", new FileSpout());
        builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout");
        builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt");

        StormTopology stormTopology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        String topologyName = QuartzWordCountTopology.class.getSimpleName();
        Config config = new Config();
        config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        cluster.submitTopology(topologyName, config, stormTopology);
    }
}

输出:

splitBolt: __system
sumBolt: __system
splitBolt: __system
sumBolt: __system
......

局部定时任务

在bolt中使用下面代码判断是否是触发用的bolt

tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)

如果为true,则执行定时任务需要执行的代码,最后return,如果为false,则执行正常的tuple处理的业务逻辑。

即对于需要进行数据落地的bolt,我们可以只给该bolt设置定时任务,这样系统会定时给该bolt发送系统级别的tuple,在我们该bolt的代码中进行判断,如果接收到的是系统级别的bolt,则进行数据落地的操作,比如将数据写入数据库或其它操作等,否则就按照正常的逻辑来执行我们的业务代码。

工作中常用这一种方式进行操作。

测试程序如下:

package cn.xpleaf.bigdata.storm.quartz;

import clojure.lang.Obj;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.util.*;

/**
 * 2°、单词计数:监控一个目录下的文件,当发现有新文件的时候,
        把文件读取过来,解析文件中的内容,统计单词出现的总次数
        E:\data\storm

        研究storm的定时任务
        有两种方式:
            1.main中设置,全局有效
            2.在特定bolt中设置,bolt中有效
 */
public class QuartzPartWCTopology {

    /**
     * Spout,获取数据源,这里是持续读取某一目录下的文件,并将每一行输出到下一个Bolt中
     */
    static class FileSpout extends BaseRichSpout {
        private Map conf;   // 当前组件配置信息
        private TopologyContext context;    // 当前组件上下文对象
        private SpoutOutputCollector collector; // 发送tuple的组件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            File directory = new File("D:/data/storm");
            // 第二个参数extensions的意思就是,只采集某些后缀名的文件
            Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true);
            for (File file : files) {
                try {
                    List<String> lines = FileUtils.readLines(file, "utf-8");
                    for(String line : lines) {
                        this.collector.emit(new Values(line));
                    }
                    // 当前文件被消费之后,需要重命名,同时为了防止相同文件的加入,重命名后的文件加了一个随机的UUID,或者加入时间戳也可以的
                    File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".completed");
                    FileUtils.moveFile(file, destFile);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    /**
     * Bolt节点,将接收到的每一行数据切割为一个个单词并发送到下一个节点
     */
    static class SplitBolt extends BaseRichBolt {

        private Map conf;   // 当前组件配置信息
        private TopologyContext context;    // 当前组件上下文对象
        private OutputCollector collector; // 发送tuple的组件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word, 1));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Bolt节点,执行单词统计计算
     */
    static class WCBolt extends BaseRichBolt {

        private Map conf;   // 当前组件配置信息
        private TopologyContext context;    // 当前组件上下文对象
        private OutputCollector collector; // 发送tuple的组件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Map<String, Integer> map = new HashMap<>();

        @Override
        public void execute(Tuple input) {
            if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID) ) { // 确保不是系统发送的tuple,才使用我们的业务逻辑
                String word = input.getStringByField("word");
                Integer count = input.getIntegerByField("count");
            /*if (map.containsKey(word)) {
                map.put(word, map.get(word) + 1);
            } else {
                map.put(word, 1);
            }*/
                map.put(word, map.getOrDefault(word, 0) + 1);

                System.out.println("====================================");
                map.forEach((k, v) -> {
                    System.out.println(k + ":::" + v);
                });
            } else {
                System.out.println("sumBolt: " + input.getSourceComponent().toString() + "---" + System.currentTimeMillis());
            }
        }

        @Override
        public Map<String, Object> getComponentConfiguration() { // 修改局部bolt的配置信息
            Map<String, Object> config = new HashMap<>();
            config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return config;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 构建拓扑,组装Spout和Bolt节点,相当于在MapReduce中构建Job
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // dag
        builder.setSpout("id_file_spout", new FileSpout());
        builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout");
        builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt");

        StormTopology stormTopology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        String topologyName = QuartzPartWCTopology.class.getSimpleName();
        Config config = new Config();
        cluster.submitTopology(topologyName, config, stormTopology);
    }
}

输出如下:

sumBolt: __system---1523631954330
sumBolt: __system---1523631964330
sumBolt: __system---1523631974329
sumBolt: __system---1523631984329
sumBolt: __system---1523631994330
sumBolt: __system---1523632004330
sumBolt: __system---1523632014329
sumBolt: __system---1523632024330
......

Storm UI参数介绍

Storm可靠性acker案例分析

  • deactive:未激活(暂停)

  • emitted: emitted tuple数

  • transferred: transferred tuple数

    emitted的区别:如果一个task,emitted一个tuple到2个task中,则 transferred tuple数是emitted tuple数的两倍

  • complete latency: spout emitting 一个tuple到spout ack这个tuple的平均时间(可以认为是tuple以及该tuple树的整个处理时间)

  • process latency: bolt收到一个tuple到bolt ack这个tuple的平均时间,如果没有启动acker机制,那么值为0

  • execute latency:bolt处理一个tuple的平均时间,不包含acker操作,单位是毫秒(也就是bolt 执行 execute 方法的平均时间)

  • capacity:这个值越接近1,说明bolt或者spout基本一直在调用execute方法,说明并行度不够,需要扩展这个组件的executor数量。

到此,相信大家对“Storm可靠性acker案例分析”有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/156277.html

(0)

相关推荐

  • 怎么理解MySQL的API接口

    技术怎么理解MySQL的API接口这篇文章给大家介绍怎么理解MySQL的API接口,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。MySQL 的 API 接口为了方便应用程序的开发,MySQL 提供了

    攻略 2021年11月17日
  • go restful接口开发步骤(go语言调用第三方restful api)

    技术gorm+gin怎么实现restful分页本篇内容主要讲解“gorm+gin怎么实现restful分页”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“gorm+gin怎么实

    攻略 2021年12月22日
  • php.ini状态设置在哪里(php.ini怎么配置)

    技术php.ini如何禁用方法这篇文章将为大家详细讲解有关php.ini如何禁用方法,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。 php.ini禁用方法:

    攻略 2021年12月19日
  • 加拿大动物,加拿大的代表动物是什么

    技术加拿大动物,加拿大的代表动物是什么加拿大的国兽,海狸(英语叫BEAVER),也叫河狸,它是一种躯体肥胖的啮齿类动物,外型酷似大老鼠。海狸身上有两宝,一种是用于制作香水的原料,属于海狸性腺分泌的液体;另一种是海狸油光水

    生活 2021年10月27日
  • Axios中值得学习的核心知识点有哪些

    技术Axios中值得学习的核心知识点有哪些本篇内容介绍了“Axios中值得学习的核心知识点有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔

    攻略 2021年10月27日
  • kvm总结(2) : 配置文件和磁盘

    技术kvm总结(2) : 配置文件和磁盘 kvm总结(2) : 配置文件和磁盘KVM是一个专题系列,建议按照顺序阅读这些文章,以便站在前文的基础上去理解,专题链接如下:
    https://www.zsyt

    礼包 2021年11月10日