大数据时代,数据实时同步解决方案的思考—最全的数据同步总结

  • 时间:
  • 浏览:2
  • 来源:大发快3_快3计划网_大发快3计划网

1、 早期关系型数据库之间的数据同步

1)、全量同步

比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法某些 分页查询源端的表,否则通过 jdbc的batch 方法插入到目标表,你你是什么地方须要注意的是,分页查询时,一定要按照主键id来排序分页,除理重复插入。

2)、基于数据文件导出和导入的全量同步,你你是什么同步方法一般只适用于同种数据库之间的同步,肯能是不同的数据库,你你是什么方法肯能会处于大疑问。

3)、基于触发器的增量同步

增量同步一般是做实时的同步,早期某些数据同步不会 基于关系型数据库的触发器trigger来做的。

使用触发器实时同步数据的步骤:

A、 基于原表创触发器,触发器涵盖insert,modify,delete 一种生活类型的操作,数据库的触发器分Before和After一种生活情形,一种生活是在insert,modify,delete 一种生活类型的操作处于完后 触发(比如记录日志操作,一般是Before),一种生活是在insert,modify,delete 一种生活类型的操作完后 触发。

B、 创建增量表,增量表中的字段和原表中的字段详细一样,否则须要多一另1个多多多操作类型字段(分表代表insert,modify,delete 一种生活类型的操作),否则须要一另1个多多多唯一自增ID,代表数据原表中数据操作的顺序,你你是什么自增id非常重要,不然数据同步就会错乱。

C、 原表中出現insert,modify,delete 一种生活类型的操作时,通过触发器自动产生增量数据,插入增量表中。

D、除理增量表中的数据,除理时,一定是按照自增id的顺序来除理,你你是什么速度会非常低,没方法做批量操作,不然数据会错乱。  一帮人肯能会说,是不会 还须要把insert操作合并在一并,modify合并在一并,delete操作合并在一并,否则批量除理,我给的答案是不行,肯能数据的增详细是有顺序的,合并后,就都不可以可以 顺序了,同一根绳子 绳子 数据的增详细顺序一旦错了,那数据同步就肯定错了。

市面上某些数据etl数据交换产品不会 基于你你是什么思想来做的。

E、 你你是什么思想使用kettle 很容易就还须要实现,笔者原先在此人 的博客中写过 kettle的文章,https://www.cnblogs.com/laoqing/p/7360 673.html

4)、基于时间戳的增量同步

A、首先一帮人都儿须要一张临时temp表,用来存取每次读取的待同步的数据,也某些 把每次从原表中根据时间戳读取到数据先插入到临时表中,每次在插入前,先清空临时表的数据

B、一帮人都儿还须要创建一另1个多多多时间戳配置表,用于存放每次读取的除理完的数据的最后的时间戳。

C、每次从原表中读取数据时,先查询时间戳配置表,否则就知道了查询原表时的现在开始 时间戳。

D、根据时间戳读取到原表的数据,插入到临时表中,否则再将临时表中的数据插入到目标表中。

E、从缓存表中读取出数据的最大时间戳,否则更新到时间戳配置表中。缓存表的作用某些 使用sql获取每次读取到的数据的最大的时间戳,当然哪几个不会 详细基于sql句子在kettle中来配置,才须要原先的一张临时表。

2、    大数据时代下的数据同步

1)、基于数据库日志(比如mysql的binlog)的同步

一帮人都儿都知道某些数据库都支持了主从自动同步,尤其是mysql,还须要支持多主多从的模式。都不可以可以 一帮人都儿是不会 还须要利用你你是什么思想呢,答案当然是肯定的,mysql的主从同步的过程是原先的。

  A、master将改变记录到二进制日志(binary log)中(哪几个记录叫做二进制日志事件,binary log events,还须要通过show binlog events进行查看);

  B、slave将master的binary log events拷贝到它的中继日志(relay log);

  C、slave重做中继日志中的事件,将改变反映它此人 的数据。

阿里巴巴开源的canal就完美的使用你你是什么方法,canal 伪装了一另1个多多多Slave 去喝Master进行同步。

A、 canal模拟mysql slave的交互协议,伪装此人 为mysql slave,向mysql master发送dump协议

B、 mysql master收到dump请求,现在开始 推送binary log给slave(也某些 canal)

C、 canal解析binary log对象(原始为byte流)

另外canal 在设计时,有点儿设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample

canal c# 客户端: https://github.com/dotnetcore/CanalSharp

canal go客户端: https://github.com/CanalClient/canal-go

canal php客户端: https://github.com/xingwenge/canal-php、

github的地址:https://github.com/alibaba/canal/

另外canal 1.1.1版本完后 , 默认支持将canal server接收到的binlog数据直接投递到MQ   https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

D、在使用canal时,mysql须要开启binlog,否则binlog-format须要为row,还须要在mysql的my.cnf文件中增加如下配置

log-bin=E:/mysql5.5/bin_log/mysql-bin.log

binlog-format=ROW

server-id=123、

E、 部署canal的服务端,配置canal.properties文件,否则 启动 bin/startup.sh 或bin/startup.bat

#设置要监听的mysql服务器的地址和端口

canal.instance.master.address = 127.0.0.1:360 6

#设置一另1个多多多可访问mysql的用户名和密码并具有相应的权限,本示例用户名、密码都为canal

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

#连接的数据库

canal.instance.defaultDatabaseName =test

#订阅实例中所有的数据库和表

canal.instance.filter.regex = .*\\..*

#连接canal的端口

canal.port= 11111

#监听到的数据变更发送的队列

canal.destinations= example

F、 客户端开发,在maven中引入canal的依赖

   <dependency>
         <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.0.21</version>
      </dependency>

代码示例:

package com.example;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

 
public class CanalClientExample {

    public static void main(String[] args) {
        while (true) {
            //连接canal
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");
            connector.connect();
            //订阅 监控的 数据库.表
            connector.subscribe("demo_db.user_tab");
            //一次取10条
            Message msg = connector.getWithoutAck(10);

            long batchId = msg.getId();
            int size = msg.getEntries().size();
            if (batchId < 0 || size == 0) {
                System.out.println("都不可以可以


消息,休眠5秒");
                try {
                    Thread.sleep(60

00);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                //
                CanalEntry.RowChange row = null;
                for (CanalEntry.Entry entry : msg.getEntries()) {
                    try {
                        row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDatasList = row.getRowDatasList();
                        for (CanalEntry.RowData rowdata : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList();
                            Map<String, Object> dataMap = transforListToMap(afterColumnsList);
                            if (row.getEventType() == CanalEntry.EventType.INSERT) {
                                //具体业务操作
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.UPDATE) {
                                //具体业务操作
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.DELETE) {
                                List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    if ("id".equals(column.getName())) {
                                        //具体业务操作
                                        System.out.println("删除的id:" + column.getValue());
                                    }
                                }
                            } else {
                                System.out.println("某些操作类型不做除理");
                            }

                        }

                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
                //确认消息
                connector.ack(batchId);
            }


        }
    }

    public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) {
        Map map = new HashMap();
        if (afterColumnsList != null && afterColumnsList.size() > 0) {
            for (CanalEntry.Column column : afterColumnsList) {
                map.put(column.getName(), column.getValue());
            }
        }
        return map;
    }


}

2)、基于BulkLoad的数据同步,比如从hive同步数据到hbase

 

一帮人都儿有一种生活方法还须要实现,

A、 使用spark任务,通过HQl读取数据,否则再通过hbase的Api插入到hbase中。

否则你你是什么做法,速度很低,否则大批量的数据一并插入Hbase,对Hbase的性能影响很大。

在大数据量的情形下,使用BulkLoad还须要快速导入,BulkLoad主某些 借用了hbase的存储设计思想,肯能hbase本质是存储在hdfs上的一另1个多多多文件夹,否则底层是以一另1个多多多个的Hfile处于的。HFile的形式处于。Hfile的路径格式一般是原先的:

/hbase/data/default(默认是你你是什么,肯能hbase的表都不可以可以 指定命名空间句子,肯能指定了,你你是什么某些 命名空间的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>

B、 BulkLoad实现的原理某些 按照HFile格式存储数据到HDFS上,生成Hfile还须要使用hadoop的MapReduce来实现。肯能不会 hive中的数据,比如內部的数据,都不可以可以 一帮人都儿还须要将內部的数据生成文件,否则上传到hdfs中,组装RowKey,否则将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。

 

当然一帮人都儿也还须要不完后 生成hfile,还须要使用spark任务直接从hive中读取数据转加进去RDD,否则使用HbaseContext的自动生成Hfile文件,累积关键代码如下:

…
//将DataFrame转换bulkload须要的RDD格式
    val rddnew = datahiveDF.rdd.map(row => {
      val rowKey = row.getAs[String](rowKeyField)
 
      fields.map(field => {
        val fieldValue = row.getAs[String](field)
        (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
      })
    }).flatMap(array => {
      (array)
    })
…
//使用HBaseContext的bulkload生成HFile文件
    hbaseContext.bulkLoad[Put](rddnew.map(record => {
      val put = new Put(record._1)
      record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")
 
    val conn = ConnectionFactory.createConnection(hBaseConf)
    val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
    val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
    val realTable = conn.getTable(hbTableName)
    HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
 
    // bulk load start
    val loader = new LoadIncrementalHFiles(hBaseConf)
    val admin = conn.getAdmin()
    loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)
 
    sc.stop()
  }
…
  def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
    import scala.collection.JavaConversions._
    for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
      val family = cells.getKey
      for (value <- cells.getValue) {
        val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
        ret.+=((kfq, CellUtil.cloneValue(value)))
      }
    }
    ret.iterator
  }
}

…

C、pg_bulkload的使用

这是一另1个多多多支持pg库(PostgreSQL)批量导入的插件工具,它的思想也是通过內部文件加载的方法,你你是什么工具笔者都不可以可以 亲自去用过,详细的介绍还须要参考:https://my.oschina.net/u/3317105/blog/852785   pg_bulkload项目的地址:http://pgfoundry.org/projects/pgbulkload/

3)、基于sqoop的全量导入

Sqoop 是hadoop生态中的一另1个多多多工具,专门用于內部数据导入进入到hdfs中,內部数据导出时,支持某些常见的关系型数据库,也是在大数据中常用的一另1个多多多数据导出导入的交换工具。

 

Sqoop从內部导入数据的流程图如下:

Sqoop将hdfs中的数据导出的流程如下:

本质不会 用了大数据的数据分布式除理来快速的导入和导出数据。

4)、HBase中建表,否则Hive中建一另1个多多多內部表,原先当Hive中写入数据后,HBase中也会一并更新,否则须要注意

A、hbase中的空cell在hive中会补null

B、hive和hbase中不匹配的字段会补null

一帮人都儿还须要在hbase的shell 交互模式下,创建一张hbse表

create 'bokeyuan','zhangyongqing'

使用你你是什么命令,一帮人都儿还须要创建一张叫bokeyuan的表,否则里面有一另1个多多多列族zhangyongqing,hbase创建表时,还须要不想指定字段,否则须要指定表名以及列族

一帮人都儿还须要使用的hbase的put命令插入某些数据

put 'bokeyuan','001','zhangyongqing:name','robot'

put 'bokeyuan','001','zhangyongqing:age','20'

put 'bokeyuan','002','zhangyongqing:name','spring'

put 'bokeyuan','002','zhangyongqing:age','18'

还须要通过hbase的scan 全表扫描的方法查看一帮人都儿插入的数据

scan ' bokeyuan'

一帮人都儿继续创建一张hive內部表

create external table bokeyuan (id int, name string, age int) 

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 

WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,user:name,user:age") 

TBLPROPERTIES("hbase.table.name" = " bokeyuan");

內部表创建好了后,一帮人都儿还须要使用HQL句子来查询hive中的数据了

select * from classes;

OK

1 robot 20

2 spring 18

Debezium是一另1个多多多开源项目,为捕获数据更改(change data capture,CDC)提供了一另1个多多多低延迟的流式除理平台。让他安装否则配置Debezium去监控你的数据库,否则你的应用就还须要消费对数据库的每一另1个多多多行级别(row-level)的更改。都不可以可以 已提交的更改才是可见的,某些你的应用不想担心事务(transaction)肯能更改被回滚(roll back)。Debezium为所有的数据库更改事件提供了一另1个多多多统一的模型,某些你的应用不想担心每一种生活数据库管理系统的复杂性性性。另外,肯能Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,否则,你的应用还须要随时停止再重启,而不想错过它停止运行时处于的事件,保证了所有的事件都能被正确地、详细处于理掉。

该项目的GitHub地址为:https://github.com/debezium/debezium   这是一另1个多多多开源的项目。

 

  原先监控数据库,否则在数据变动的完后 获得通知随便说说总是是一件很复杂性的事情。关系型数据库的触发器还须要做到,否则只对特定的数据库有效,否则通常都不可以可以 更新数据库内的情形(无法和內部的线程池通信)。某些数据库提供了监控数据变动的API肯能框架,否则都不可以可以 一另1个多多多标准,累积数据库的实现方法不会 不同的,否则须要血块特定的知识和理解特定的代码不可以运用。确保以相同的顺序查看和除理所有更改,一并最小化影响数据库仍然非常具有挑战性。

       Debezium正好提供了模块为你做哪几个复杂性的工作。某些模块是通用的,否则不想可以适用多种数据库管理系统,但在功能和性能方面仍有某些限制。另某些模块是为特定的数据库管理系统定制的,某些一帮人都通常还须要更多地利用数据库系统一种生活的特征来提供更多功能,Debezium提供了对MongoDB,mysql,pg,sqlserver的支持。

Debezium是一另1个多多多捕获数据更改(CDC)平台,否则利用Kafka和Kafka Connect实现了此人 的持久性、可靠性和容错性。每一另1个多多多部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一另1个多多多上游数据库服务器,捕获所有的数据库更改,否则记录到一另1个多多多肯能多个Kafka topic(通常一另1个多多多数据库表对应一另1个多多多kafka topic)。Kafka确保所有哪几个数据更改事件都不想可以多副本否则总体上有序(Kafka都不可以可以 保证一另1个多多多topic的单个分区内有序),原先,更多的客户端还须要独立消费同样的数据更改事件而对上游数据库系统造成的影响降到很小(肯能N个应用都直接去监控数据库更改,对数据库的压力为N,而用debezium汇报数据库更改事件到kafka,所有的应用都去消费kafka中的消息,还须要把对数据库的压力降到1)。另外,客户端还须要随时停止消费,否则重启,从上次停止消费的地方接着消费。每个客户端还须要自行决定一帮人都在否须要exactly-once肯能at-least-once消息交付语义保证,否则所有的数据库肯能表的更改事件是按照上游数据库处于的顺序被交付的。

       对于不须要肯能不想我你你是什么容错级别、性能、可扩展性、可靠性的应用,一帮人都还须要使用内嵌的Debezium connector引擎来直接在应用內部运行connector。你你是什么应用仍须要消费数据库更改事件,但更希望connector直接传递给它,而不会 持久化到Kafka里。

更详细的介绍还须要参考:https://www.jianshu.com/p/f86219b1ab98

bireme 的github 地址  https://github.com/HashDataInc/bireme

bireme 的介绍:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md

另外Maxwell也是还须要实现MySQL到Kafka的消息里面件,消息格式采用Json:

Download:

https://github.com/zendesk/maxwell/releases/download/v1.22.5/maxwell-1.22.5.tar.gz 

Source:https://github.com/zendesk/maxwell 

datax 是阿里开源的etl 工具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能,采用java+python进行开发,核心是java语言实现。

github地址:https://github.com/alibaba/DataX    

A、设计架构:

数据交换通过DataX进行中转,任何数据源只要和DataX连接上即还须要和已实现的任意数据源同步

B、框架

 

核心模块介绍:

  1. DataX完成单个数据同步的作业,一帮人都儿称之为Job,DataX接受到一另1个多多多Job完后 ,将启动一另1个多线程池池来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一另1个多多多Task不会负责一累积数据的同步工作。
  3. 切分多个Task完后 ,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一另1个多多多TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一另1个多多多Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来完后 , Job监控并停留多个TaskGroup模块任务完成,停留所有TaskGroup任务完成后Job成功退出。否则,异常退出,线程池退出值非0

DataX调度流程:

举例来说,用户提交了一另1个多多多DataX作业,否则配置了20个并发,目的是将一另1个多多多60 张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了60 个Task。
  2. 根据20个并发,DataX计算共须要分配另1个多多多TaskGroup。
  3. 另1个多多多TaskGroup平分切分好的60 个Task,每一另1个多多多TaskGroup负责以八个并发共计运行2八个Task。

优势:

  • 累积插件不会 此人 的数据转换策略,放置数据失真;
  • 提供作业全链路的流量以及数据量运行时监控,包括作业一种生活情形、数据流量、数据速度、执行进度等。
  • 肯能各种原困原困传输报错的脏数据,DataX还须要实现精确的过滤、识别、派发、展示,为用户提太满种脏数据除理模式;
  • 精确的速度控制
  • 健壮的容错机制,包括线程內部重试、线程级别重试;

从插件视角看框架

  • Job:是DataX用来描述从一另1个多多多源头到目的的同步作业,是DataX数据同步的最小业务单元;
  • Task:为最大化而把Job拆分得到最小的执行单元,进行并发执行;
  • TaskGroup:一组Task集合,在同一另1个多多多TaskGroupContainer执行下的Task集合称为TaskGroup;
  • JobContainer:Job执行器,负责Job全局拆分、调度、前置句子和后置句子等工作的工作单元。你是什么Yarn中的JobTracker;
  • TaskGroupContainer:TaskGroup执行器,负责执行一组Task的工作单元,你是什么Yarn中的TAskTacker。

    总之,Job拆分为Task,分别在框架提供的容器中执行,插件只须要实现Job和Task两累积逻辑。

    物理执行有一种生活运行模式:

  • Standalone:单线程池运行,都不可以可以 內部依赖;
  • Local:单线程池运行,统计信息,错误信息汇报到集中存储;
  • Distrubuted:分布式线程池运行,依赖DataX Service服务;

    总体来说,当JobContainer和TaskGroupContainer运行在同一另1个多线程池池内的完后 某些 单机模式,在不同线程池执行某些 分布式模式。

肯能须要开发插件,还须要看zhege你你是什么插件开发指南:   https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md 

数据源支持情形:

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库 MySQL 读 、写
            Oracle         √         √     读 、写
  SQLServer 读 、写
  PostgreSQL 读 、写
  DRDS 读 、写
  通用RDBMS(支持所有关系型数据库) 读 、写
阿里云数仓数据存储 ODPS 读 、写
  ADS  
  OSS 读 、写
  OCS 读 、写
NoSQL数据存储 OTS 读 、写
  Hbase0.94 读 、写
  Hbase1.1 读 、写
  Phoenix4.x 读 、写
  Phoenix5.x 读 、写
  MongoDB 读 、写
  Hive 读 、写
无特征化数据存储 TxtFile 读 、写
  FTP 读 、写
  HDFS 读 、写
  Elasticsearch  
时间序列数据库 OpenTSDB  
  TSDB  

猜你喜欢

将良心进行到底!Oculus商店在英国推出退款政策

并非 他们儿赶在taobao上疯狂购物,其中一大意味着着是不满意都时需退,但会 目前绝大每项的软件销售平台能不能 像内部销售平台那样都时需退货退款。不过现在,O

2020-01-17

M. Zischler数据,M. Zischler新闻,M. Zischler视频,M. Zischler身价

M.ZischlerM.Zischler俱乐部:皮平斯列特国籍:德国身高:CM位置:中场年龄:50岁体重:KG号码:23号生日:1989-09-24惯用脚:赛季俱乐部上场首发进

2020-01-17

大冶市追回失盗文物清代香案

核心提示:邹氏宗祠一角汉白玉香案大冶市邹氏宗祠镇祠之宝汉白玉香案不翼而飞,村民自发组成寻宝队,历时3年发现其踪迹后报案,后在文物部门和警方努力下,终将其找回。 大冶市邹氏宗祠“

2020-01-17

楷和医心\前列腺癌的风险因素\泌尿外科专科医生 林建文

前列腺癌是香港男性最常见癌症之一,二○一六年有超过一千九百宗新症。多年来研究人员不断尝试找出癌症的成因,近期《国际癌症杂志(InternationalJournalofCanc

2020-01-17

绝地求生刺激战场催化剂怎么获得 刺激战场催化剂获得方法

绝地求生刺激战场催化剂为什么在么在获得?绝地求生刺激战场小春雷为什么在么在制作?一块儿来看看刺激战场催化剂获得法子 。刺激战场催化剂获得法子 1、每天通过帮助好友制作武器,

2020-01-17