hadoop mapreduce 示例

关于hadoop集群的搭建,请参考我的上一篇 hadoop 集群的搭建,这里将说说如何写一个简单的统计单词个数的mapReduce示例程序,并部署在YARN上面运行。

代码托管在:https://github.com/hewentian/bigdata

下面详细说明。

第一步:将要统计单词个数的文件放到HDFS中

例如我们将hadoop安装目录下的README.txt文件放到HDFS中的/目录下,在master节点上面执行:

1
2
$ cd /home/hadoop/hadoop-2.7.3/
$ ./bin/hdfs dfs -put /home/hadoop/hadoop-2.7.3/README.txt /

README.txt文件的内容如下:

For the latest information about Hadoop, please visit our website at:

   http://hadoop.apache.org/core/

and our wiki, at:

   http://wiki.apache.org/hadoop/

This distribution includes cryptographic software.  The country in 
which you currently reside may have restrictions on the import, 
possession, use, and/or re-export to another country, of 
encryption software.  BEFORE using any encryption software, please 
check your country's laws, regulations and policies concerning the
import, possession, or use, and re-export of encryption software, to 
see if this is permitted.  See <http://www.wassenaar.org/> for more
information.

The U.S. Government Department of Commerce, Bureau of Industry and
Security (BIS), has classified this software as Export Commodity 
Control Number (ECCN) 5D002.C.1, which includes information security
software using or performing cryptographic functions with asymmetric
algorithms.  The form and manner of this Apache Software Foundation
distribution makes it eligible for export under the License Exception
ENC Technology Software Unrestricted (TSU) exception (see the BIS 
Export Administration Regulations, Section 740.13) for both object 
code and source code.

The following provides more details on the included cryptographic
software:
  Hadoop Core uses the SSL libraries from the Jetty project written 
by mortbay.org.

第二步:建立一个maven工程

新建一个maven工程,目录结构如下:

其中,pom.xml内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hewentian</groupId>
<artifactId>hadoop</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>

<name>hadoop/</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

</plugins>
</build>
</project>

第三步:编写mapper程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.hewentian.hadoop.mr;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.StringTokenizer;

/**
* <p>
* <b>WordCountMapper</b> 是
* </p>
*
* @author <a href="mailto:wentian.he@qq.com">hewentian</a>
* @date 2018-12-18 23:06:02
* @since JDK 1.8
*/
public class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
/**
* 每次调用map方法会传入split中的一行数据
*
* @param key 该行数据在文件中的位置下标
* @param value 这行数据
* @param outputCollector
* @param reporter
* @throws IOException
*/
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
String line = value.toString();
if (StringUtils.isNotBlank(line)) {
StringTokenizer st = new StringTokenizer(line);

while (st.hasMoreTokens()) {
String word = st.nextToken();
outputCollector.collect(new Text(word), new IntWritable(1)); // map 的输出
}
}
}
}

第四步:编写reducer程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.hewentian.hadoop.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Iterator;

/**
* <p>
* <b>WordCountReducer</b> 是
* </p>
*
* @author <a href="mailto:wentian.he@qq.com">hewentian</a>
* @date 2018-12-18 23:47:12
* @since JDK 1.8
*/
public class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}

outputCollector.collect(key, new IntWritable(sum));
}
}

第五步:编写job程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.hewentian.hadoop.mr;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;


/**
* <p>
* <b>WordCountJob</b> 是
* </p>
*
* @author <a href="mailto:wentian.he@qq.com">hewentian</a>
* @date 2018-12-19 09:05:18
* @since JDK 1.8
*/
public class WordCountJob {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("need: input file and output dir.");
System.out.println("eg: {HADOOP_HOME}/bin/hadoop jar /home/hadoop/wordCount.jar /README.txt /output/wc/");
System.exit(1);
}

JobConf jobConf = new JobConf(WordCountJob.class);
jobConf.setJobName("word count mapreduce demo");

jobConf.setMapperClass(WordCountMapper.class);
jobConf.setReducerClass(WordCountReducer.class);
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);

// mapreduce 输入数据所在的目录或文件
FileInputFormat.addInputPath(jobConf, new Path(args[0]));
// mr执行之后的输出数据的目录
FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

JobClient.runJob(jobConf);
}
}

第六步:将程序打包成JAR文件

将上述工程打包成JAR文件,并设置默认运行的类为WordCountJob,打包后得文件wordCount.jar,我们将它上传到master节点的home目录下:

1
$ scp wordCount.jar hadoop@hadoop-host-master:~/

第七步:登录master节点执行JAR文件

登录master节点:

1
$ ssh hadoop@hadoop-host-master

执行JAR文件,若指定的输出目录不存在,HDFS会自动创建:

1
2
$ cd /home/hadoop/hadoop-2.7.3/
$ ./bin/hadoop jar /home/hadoop/wordCount.jar /README.txt /output/wc/

执行过程中部分输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
18/12/07 02:48:57 INFO client.RMProxy: Connecting to ResourceManager at hadoop-host-master/192.168.56.110:8032
18/12/07 02:48:58 INFO client.RMProxy: Connecting to ResourceManager at hadoop-host-master/192.168.56.110:8032
18/12/07 02:48:58 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/12/07 02:49:00 INFO mapred.FileInputFormat: Total input paths to process : 1

18/12/07 02:49:00 INFO mapreduce.JobSubmitter: number of splits:2
18/12/07 02:49:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1544066791145_0005
18/12/07 02:49:00 INFO impl.YarnClientImpl: Submitted application application_1544066791145_0005
18/12/07 02:49:01 INFO mapreduce.Job: The url to track the job: http://hadoop-host-master:8088/proxy/application_1544066791145_0005/
18/12/07 02:49:01 INFO mapreduce.Job: Running job: job_1544066791145_0005
18/12/07 02:49:10 INFO mapreduce.Job: Job job_1544066791145_0005 running in uber mode : false
18/12/07 02:49:10 INFO mapreduce.Job: map 0% reduce 0%
18/12/07 02:49:20 INFO mapreduce.Job: map 100% reduce 0%
18/12/07 02:49:27 INFO mapreduce.Job: map 100% reduce 100%
18/12/07 02:49:28 INFO mapreduce.Job: Job job_1544066791145_0005 completed successfully
18/12/07 02:49:28 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=2419
FILE: Number of bytes written=360364
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2235
HDFS: Number of bytes written=1306
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=16581
Total time spent by all reduces in occupied slots (ms)=4407
Total time spent by all map tasks (ms)=16581
Total time spent by all reduce tasks (ms)=4407
Total vcore-milliseconds taken by all map tasks=16581
Total vcore-milliseconds taken by all reduce tasks=4407
Total megabyte-milliseconds taken by all map tasks=16978944
Total megabyte-milliseconds taken by all reduce tasks=4512768
Map-Reduce Framework
Map input records=31
Map output records=179
Map output bytes=2055
Map output materialized bytes=2425
Input split bytes=186
Combine input records=0
Combine output records=0
Reduce input groups=131
Reduce shuffle bytes=2425
Reduce input records=179
Reduce output records=131
Spilled Records=358
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=364
CPU time spent (ms)=1510
Physical memory (bytes) snapshot=480575488
Virtual memory (bytes) snapshot=5843423232
Total committed heap usage (bytes)=262725632
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2049
File Output Format Counters
Bytes Written=1306

等执行成功后,在master节点上查看结果(部分):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ cd /home/hadoop/hadoop-2.7.3/
$ ./bin/hdfs dfs -cat /output/wc/*
(BIS), 1
(ECCN) 1
(TSU) 1
(see 1
5D002.C.1, 1
740.13) 1
<http://www.wassenaar.org/> 1
Administration 1
Apache 1
BEFORE 1
BIS 1
Bureau 1
Commerce, 1
Commodity 1
Control 1
Core 1
Department 1
ENC 1
Exception 1
Export 2
For 1
Foundation 1

我们在浏览器中查看HDFS和YARN中的数据

在HDFS管理器中查看:

在YARN管理器中查看:

大功告成!!! (hadoop集群中的时间与我本机的时间不一致,毕竟,很久没启动集群了)

hadoop 集群的搭建

本篇将说说hadoop集群的搭建,这里说的集群是真集群,不是伪集群。不过,这里的真集群是在虚拟机环境中搭建的。

在我的笔记本电脑中,安装虚拟机VirtualBox,在虚拟机中安装三台服务器:master、slave1、slave2来搭建hadoop集群。安装好VirtualBox后,启动它。依次点File -> Host Network Manager -> Create,来创建一个网络和虚拟机中的机器通讯,这个地址是:192.168.56.1,也就是我们外面实体机的地址(仅和虚拟机中的机器通讯使用)。如下图:

我们使用ubuntu 18.04来作为我们的服务器,先在虚拟机中安装好一台服务器master,将Jdk、hadoop在上面安装好,然后将master克隆出slave1、slave2。以master为namenode节点,slave1、slave2作为datanode节点。相关配置如下:

master:
    ip: 192.168.56.110
    hostname: hadoop-host-master
slave1:
    ip: 192.168.56.111
    hostname: hadoop-host-slave-1
slave2:
    ip: 192.168.56.112
    hostname: hadoop-host-slave-2

下面开始master的安装

在虚拟机中安装master的过程中我们会设置一个用户用于登录,我们将用户名、密码都设为hadoop,当然也可以为其他名字,其他安装过程略。安装好之后,使用默认的网关配置NAT,NAT可以访问外网,我们将jdk-8u102-linux-x64.tar.gzhadoop-2.7.3.tar.gz从它们的官网下载到用户的/home/hadoop/目录下。或在实体机中通过SCP命令传进去。然后将网关设置为Host-only Adapter,如下图所示。

网关设置好了之后,我们接下来配置IP地址。在master[Settings] -> [Network] -> [Wired 这里打开] -> [IPv4]按如下设置:

管理集群

在上面的IP等配置好之后,我们选择关闭master,注意不是直接关闭,而是在关闭的时候选择Save the machine state。然后在虚拟机中选中master -> Start 下拉箭头 -> Headless start,然后在我们实体机中通过ssh直接登录到master。

1
$ ssh hadoop@192.168.56.110

我们可以在实体机通过配置/etc/hosts,加上如下配置:

192.168.56.110    hadoop-host-master

然后就可以通过如下方式登录了

1
$ ssh hadoop@hadoop-host-master

在实体机中通过下面的配置,就可以无密码登录了:

1
$ ssh-copy-id hadoop@hadoop-host-master

下面的操作,均是在实体机中通过SSH到虚拟机执行的操作。

安装ssh openssh rsync

如系统已安装,则勿略下面的安装操作

1
$ sudo apt-get install ssh openssh-server rsync

如果上述命令无法执行,请先执行如下命令:

1
$ sudo apt-get update

JDK的安装请参考我之前的笔记:安装 JDK,这里不再赘述。安装到此目录/usr/local/jdk1.8.0_102/下,记住此路径,下面会用到。下在进行hadoop的安装。

1
2
$ cd /home/hadoop/
$ tar xf hadoop-2.7.3.tar.gz

解压后得到hadoop-2.7.3目录,hadoop的程序和相关配置就在此目录中。

建保存数据的目录

1
2
3
4
5
6
$ cd /home/hadoop/hadoop-2.7.3
$ mkdir -p hdfs/tmp
$ mkdir -p hdfs/name
$ mkdir -p hdfs/data
$
$ chmod -R 777 hdfs/

配置文件浏览

hadoop的配置文件都位于下面的目录下:

1
2
3
4
5
6
7
8
9
10
11
12
$ cd /home/hadoop/hadoop-2.7.3/etc/hadoop
$ ls
capacity-scheduler.xml httpfs-env.sh mapred-env.sh
configuration.xsl httpfs-log4j.properties mapred-queues.xml.template
container-executor.cfg httpfs-signature.secret mapred-site.xml.template
core-site.xml httpfs-site.xml slaves
hadoop-env.cmd kms-acls.xml ssl-client.xml.example
hadoop-env.sh kms-env.sh ssl-server.xml.example
hadoop-metrics2.properties kms-log4j.properties yarn-env.cmd
hadoop-metrics.properties kms-site.xml yarn-env.sh
hadoop-policy.xml log4j.properties yarn-site.xml
hdfs-site.xml mapred-env.cmd

配置hadoop-env.sh,加上JDK绝对路径

JDK的路径就是上面安装JDK的时候的路径:

1
export JAVA_HOME=/usr/local/jdk1.8.0_102/

配置core-site.xml,在该文件中加入如下内容

1
2
3
4
5
6
7
8
9
10
11
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>file:/home/hadoop/hadoop-2.7.3/hdfs/tmp</value>
<description>Abase for other temporary directories.</description>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop-host-master:9000</value>
</property>
</configuration>

配置hdfs-site.xml,在该文件中加入如下内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<configuration>
<property>
<name>dfs.nameservices</name>
<value>hadoop-cluster</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/home/hadoop/hadoop-2.7.3/hdfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/home/hadoop/hadoop-2.7.3/hdfs/data</value>
</property>
</configuration>

至此,master中要安装的通用环境配置完成。在虚拟机中将master复制出slave1、slave2。并参考上面配置IP地址的方法将slave1的ip配置为:192.168.56.111,slave2的ip配置为:192.168.56.112

配置主机名

配置master的主机名为hadoop-host-master,在master节点执行如下操作:

1
2
3
4
$ sudo vi /etc/hostname

修改为如下内容
hadoop-host-master

配置slave1的主机名为hadoop-host-slave-1,在slave1节点执行如下操作:

1
2
3
4
$ sudo vi /etc/hostname

修改为如下内容
hadoop-host-slave-1

配置slave2的主机名为hadoop-host-slave-2,在slave2节点执行如下操作:

1
2
3
4
$ sudo vi /etc/hostname

修改为如下内容
hadoop-host-slave-2

注意:各个节点的主机名一定要不同,否则相同主机名的节点,只会有一个连得上namenode节点,并且集群会报错,修改主机名后,要重启才生效。

配置域名解析

分别对master、slave1和slave2都执行如下操作:

1
2
3
4
5
6
7
$ sudo vi /etc/hosts

修改为如下内容
127.0.0.1 localhost
192.168.56.110 hadoop-host-master
192.168.56.111 hadoop-host-slave-1
192.168.56.112 hadoop-host-slave-2

至此,集群配置完成,下面将启动集群。

启动集群

首先启动namenode节点,也就是master,首次启动的时候,要格式化namenode。

1
2
3
4
5
6
7
$ cd /home/hadoop/hadoop-2.7.3/
$ ./bin/hdfs namenode -format # 再次启动的时候不需要执行此操作
$ ./sbin/hadoop-daemon.sh start namenode
$
$ jps # 查看是否启动成功
4016 Jps
2556 NameNode

接下来启动datanode节点,也就是slave1、slave2,在这两台服务器上都执行如下启动脚本。

1
2
3
4
5
6
$ cd /home/hadoop/hadoop-2.7.3/
$ ./sbin/hadoop-daemon.sh start datanode
$
$ jps # 查看是否启动成功
2451 Jps
2162 DataNode

可以在实体机的浏览器中输入:
http://hadoop-host-master:50070/
来查看是否启动成功。

切换tab到Datanodes可以看到有2个datanode节点,如下图所示:

切换到Utilities -> Browse the file system,如下图所示:

从上面的界面可以,目前HDFS中没有任何文件。我们尝试往其中放一个文件,就将我们的hadoop压缩包放进去,在namenode节点中执行如下操作:

1
2
3
4
5
6
$ cd /home/hadoop/hadoop-2.7.3/
$ ./bin/hdfs dfs -put /home/hadoop/hadoop-2.7.3.tar.gz /

$ ./bin/hdfs dfs -ls /
Found 1 items
-rw-r--r-- 2 hadoop supergroup 214092195 2018-12-06 12:20 /hadoop-2.7.3.tar.gz

我们在图形界面中查看,如下图:

我们点击列表中的文件,将会显示它的数据具体分布在哪些节点上,如下图:

停掉集群

先停掉datanode节点,在slave1、slave2上面执行如下命令:

1
2
$ cd /home/hadoop/hadoop-2.7.3/
$ ./sbin/hadoop-daemon.sh stop datanode

然后停掉namenode节点,在master上面执行如下命令:

1
2
$ cd /home/hadoop/hadoop-2.7.3/
$ ./sbin/hadoop-daemon.sh stop namenode

集中式管理集群

如果我们的集群里面有成千上万台机器,在每一台机器上面都这样来启动,肯定是不行的。下面我们将通过配置,只在一台机器上面执行一个脚本,就将整个集群启动。

配置SSH无密码登陆,分别在master、slave1和slave2上面执行如下脚本:

1
$ ssh-keygen -t rsa -P ""

在master上面执行如下脚本:

1
2
3
$ ssh-copy-id hadoop-host-master
$ ssh-copy-id hadoop-host-slave-1
$ ssh-copy-id hadoop-host-slave-2

每执行一条命令的时候,都先输入yes,然后再输入目标机器的登录密码。

如果能成功运行如下命令,则配置免密登录其他机器成功。

1
2
3
$ ssh hadoop-host-master
$ ssh hadoop-host-slave-1
$ ssh hadoop-host-slave-2

在master上面执行如下脚本:

1
2
3
4
5
$ cd /home/hadoop/hadoop-2.7.3/etc/hadoop/
$ vi slaves # 加入如下内容
$
hadoop-host-slave-1
hadoop-host-slave-2

当执行start-dfs.sh时,它会去slaves文件中找从节点。

在master上面启动整个集群:

1
2
$ cd /home/hadoop/hadoop-2.7.3/
$ ./sbin/start-dfs.sh

分别在master、slave1和slave2上面通过jps命令可以看到整个集群已经成功启动。同样的,停掉整个集群的命令,如下,同样是在master上面执行:

1
2
$ cd /home/hadoop/hadoop-2.7.3/
$ ./sbin/stop-dfs.sh

相关操作,如下图所示:

注意:在主节点执行start-dfs.sh,主节点的用户名必须和所有从节点的用户名相同。因为主节点服务器以这个用户名去远程登录到其他从节点的服务器中,所以在所有的生产环境中控制同一类集群的用户一定要相同。

启动YARN

分别在master、slave1和slave2上面都建如下目录:

1
2
3
4
$ cd /home/hadoop/hadoop-2.7.3
$ mkdir -p yarn/nm
$
$ chmod -R 777 yarn/

分别在master、slave1和slave2上面都按如下方式配置mapred-site.xml,刚解压的hadoop是没有mapred-site.xml的,但是有mapred-site.xml.template,我们修改文件名,并作如下配置:

1
2
3
4
5
6
7
8
9
10
$ cd /home/hadoop/hadoop-2.7.3/etc/hadoop/
$ mv mapred-site.xml.template mapred-site.xml
$ vi mapred-site.xml

<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>

分别在master、slave1和slave2上面都按如下方式配置yarn-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ cd /home/hadoop/hadoop-2.7.3/etc/hadoop/
$ vi yarn-site.xml

<configuration>
<!-- 指定ResourceManager的地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop-host-master</value>
</property>
<!-- 指定reducer获取数据的方式 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.local-dirs</name>
<value>file:/home/hadoop/hadoop-2.7.3/yarn/nm</value>
</property>
</configuration>

配置好了,下面开始启动:
在master上面启动resourcemanager

1
2
$ cd /home/hadoop/hadoop-2.7.3/
$ ./sbin/yarn-daemon.sh start resourcemanager

在slave1、slave2上面分别启动nodemanager

1
2
$ cd /home/hadoop/hadoop-2.7.3/
$ ./sbin/yarn-daemon.sh start nodemanager

我们可以通过浏览器,查看资源管理器:
http://hadoop-host-master:8088/

点击图中的Active Nodes可以看到下图的详情,(如果Unhealthy Nodes有节点,则可能是由于虚拟机中主机的磁盘空间不足所致)。

当然相应的停止命令如下:

1
2
$ ./sbin/yarn-daemon.sh stop resourcemanager
$ ./sbin/yarn-daemon.sh stop nodemanager

如果有配置集中式管理,我们也可以通过在master上面通过一个命令启动、停止YARN

1
2
$ ./sbin/start-yarn.sh    # 启动yarn
$ ./sbin/stop-yarn.sh # 停止yarn

或者在master上面,通过一个命令启动hadoop和yarn

1
2
3
4
$ ./sbin/start-all.sh
或者按顺序执行如下两个命令
$ ./sbin/start-dfs.sh
$ ./sbin/start-yarn.sh

启动MR作业日志管理器

在namenode节点,也就是master,启动MR作业日志管理器。

1
2
$ cd /home/hadoop/hadoop-2.7.3/
$ ./sbin/mr-jobhistory-daemon.sh start historyserver

它同样有自已的图形界面:
http://hadoop-host-master:19888/

尝试向集群中提交一个mapReduce任务

我们在namenode节点中向集群提交一个计算圆周率的mapReduce任务:

1
2
$ cd /home/hadoop/hadoop-2.7.3/
$ ./bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar pi 4 10

从上图可以看出,圆周率已经被计算出来:3.40。另外,在yarn中也可以看到任务的执行情况:

至此, 集群搭建完毕。

kafka 介绍

这是一篇译文,因英文水平有限,翻译未免有不足之处。如果想看原文,请访问这里:
http://kafka.apache.org/intro

如果想简单体验一下kafka,可以阅读我上两篇介绍的 kafka 单节点安装kafka 集群的搭建

Apache Kafka是一个分布式的流媒体平台,那么,它到底指的是什么呢?

流媒体平台有3个主要的性能指标:

  1. 发布和订阅消息流,类似于消息队列或者企业消息系统;
  2. 以容错方式、持久化存储流数据;
  3. 实时处理流数据。

kafka通常应用于两种广泛的场景:

  1. 在系统或应用程序之间构建可靠的用于传输实时数据的管道;
  2. 构建实时的流数据处理程序,来转换或处理数据流。

为了弄清楚kafka到底是怎样完成这些功能的,从下面开始我们钻研和探究一下kafka的功能

首先,了解一下几个概念:

  1. kafka可以以集群方式运行于一台或者多台服务器,这些服务器可以分布在不同的数据中心;
  2. kafka集群将流式数据分类存储,这种类别通常被称为主题;
  3. 每一条消息由键、值和时间戳组成。

kafka有4个核心API:

  1. Producer API允许应用程序将一条消息发布到一个或者多个kafka主题中;
  2. Consumer API允许应用程序订阅一个或者多个主题,并且处理被发往其中的数据流;
  3. Streams API允许一个应用充当流式处理器:从作为输入流的一个或多个主题中消费消息,然后将处理过的消息输出到另一个或多个主题中。高效地将输入流的数据转换、传输到输出流中;
  4. Connector API允许建立和重用已有的生产者或消费者,它们连接着某个kafka主题,而这些主题是和已存在的应用和数据系统连接着的。例如,关系数据库的连接器将会捕获对表的每一个修改。

在kafka中,客户端和服务器端的通讯是通过一个简单、高效、语言无关的TCP协议完成的。此协议是有版本代差的,但新版本向后兼容旧版本。我们提供一个JAVA客户端连接kafka,但是其他语言的客户端也提供。消费者和服务端建立的是长连接。

主题和日志(存储策略)

我们首先钻研一下kafka中为处理流记录而提供的核心抽象概念–主题。

主题就是一个分类,或者说是专为发布消息而命名的。在kafka中主题通常有多个订阅者,也就是说一个主题可以有零个、一个或者多个消费者,这些消费者都订阅写往其中的消息。

对每一个主题,kafka集群都使用分区存储,像下面这样:

每一个分区中的消息都是按顺序存储的,持续往该分区中存放的数据的顺序都是不可改变的,结构化存储。分区中的每一条消息都会被分配一个有序的ID号,被称为偏移量,用于唯一标示该分区中的每一条消息。

kafka集群会持久化发布到它的每一条消息,无论它们是否已经被消费过,可以通过配置文件配置该消息存放多久。例如,如果保存策略被设置为2天,那么当一条消息发布2天之内,它都是可以被消费的,只是一旦被消费之后,它就会被删掉以释放空间。kafka是持续高性能的,这与存储于它的数据大小关系不大,因此长期保存数据,都是没问题的。

实际上,唯一存储于每一个消费者中的元数据是偏移量或者该消费者在这个分区中访问存储数据的位置。偏移量由消费者控制:通常,消费者中保存的偏移量随着它消费消息,将呈线性增长。但是,实际上,由于这个偏移量是由消费者控制的,所以它可以指定它消费的任何位置上的消息。例如:一个消费者可以重置到一个旧的偏移量来处理旧的数据或者跳过大部分记录,然后从当前位置开始消费消息。每个消费者的偏移量在kafka服务器中也是有存储的。

这个组合功能意味着kafka消费者是轻量级的,它们的连接和断开对集群和其他消费者影响极小。例如,你可以使用我们的命令行工具去不停地显示最新添加到某个主题的内容,而这,不会对任何订阅这个主题的消费者产生影响。

分区在存储中扮演着不同的目的:首先,它允许存储的数据量超过单台服务器允许的规模。每个单独的分区能储存的数据量取决于它所在的服务器磁盘的大小等因素,但是一个主题可以有多个分区,因此它能储存任意数量的数据。其次,分区充当并行处理的单元–同时能处理的并发数。

分布式

一个主题的分区分布于kafka集群中的多台服务器中,每一台服务器都可以处理数据和向共享分区发送请求。为了容错,每一个分区都可以配置一定的副本数。

每一个分区都有一台服务器担当主服务器,可以有零个或者多个从服务器。主服务器处理对分区的所有读和写请求,从服务器由主服务器调度。如果主服务器挂掉了,从服务器中会自动产生一个新的主服务器。集群中的每一台服务器既充当某个分区的主服务器,又充当其他分区的从服务器,所以整个集群是负载均衡的。

地域复制

kafka的MirrorMaker为你的集群提供跨地域复制支持。使用MirrorMaker,消息可以跨越多个数据中心或者不同的云区进行同步。你可以在主从模式下用于备份和恢复,或者在主主模式下使数据更靠近你的用户,或者支持数据本地请求。

生产者

生产者往它们选定的主题中发送消息的时候,应该为每一条消息指定它要发送到的分区。我们可以使用环形策略,简单地使数据平均分配于所有分区中,也可以根据消息中的语义来自动地选择分区。接下来将会说下分区的使用。

消息者

我们可以对消费者分组,每个组有一个组名。每一条发送到指定主题中的消息,都会被订阅了这个主题的同一个组中的一个消费者消费。同一个组中的消费者可以在同一台机器或者多台机器中。

如果所有消费者实例都在同一个组中,那么所有消息都会高效地平均发送到所有消费者实例。
如果所有消费者实例分布在不同的组中,那么每条消息都会被广播到所有组中的一个消费者。

上图所示:该kafka集群有2台服务器,4个分区(P0-P3),有2个消费者组。消费者组A有2个消费者实例,而消费者组B有4个。

通常,主题都会有少量的消费者组,在逻辑上看,一个消费者组就是一个订阅者。每个组包含多个消费者,这能很好的实现扩展和容错。在订阅的语义上:订阅者只不过是一群消费者,而不是一个。

消费的方式在kafka中的实现是通过将分区分配给所有消费者实例,因此在任何时刻,每一个实例都是一个”公平共享“分区的唯一消费者。维护组中成员关系的方式在kafak中是通过kafka协议自动实现的:如果新的实例加进组,那么它将从其他组员中获取一个分区(如果这个组员处理两个以上分区);如果一个实例挂掉了,那么它所处理的分区将被分配给组中剩下的成员们。

kafka对每一个分区中的消息都只提供一个总的顺序,同一个主题中不同分区中的顺序各不相同。每个分区排序组织该分区中数据的能力能满足大部分应用的需求。但是,如果你想要一个所有消息的总顺序,可以通过为这个主题设置一个分区来实现,不过,这意味着一个消费者组中只能有一个消费者来处理该主题的消息。

多租户架构

你可以以多租户架构方式部署kafka。多租户架构可以通过配置,来指定哪个主题可以生产和消费数据,并且支持设置操作指标。管理员可以定义和限制所有请求的指标,以控制客户端能使用的服务器资源数。

更多相关信息,请访问这里

保证

在高层次看kafka提供以下保证:

  1. 一个生产者发往指定主题中指定分区中的消息,将会按照它们发送的顺序出现。例如:如果一个生产者发送消息M1、M2,如果M1先发送,那么M1将会首先出现在那个分区中,并且M1的顺序号要比M2的小;
  2. 消费者按顺序读取存储在主题中的消息;
  3. 如果一个主题有 N 个副本,那么我们能承受高达 N-1 台服务器同时挂掉,而不会丢失任何消息。

更多关于这些保证的描述将在相关章节中详细说明。

kafka作为一个消息系统

kafka的流媒体概念与传统的企业消息系统相比有什么不同?

消息传输在传统上有2种模型:队列和发布-订阅。在队列模型中,一组消费者从一台服务器读取消息,每个消息只会发送到其中一个消费者;在发布-订阅模型中,每条消息都会广播到所有消费者。这两种模型各自都有优势和不足。队列的优势是允许你将消息平均分配给所有消费者处理,这能扩展系统的处理能力。不幸的是,队列不能有多个订阅者,消息一旦被其中一个消费者读取就会被删掉。发布-订阅模型允许你将消息广播到所有消费者,这种方式不能扩展处理能力,因为每条消息都会被发送到所有订阅者。

消费者组的概念在kafka中通常包含上述两种概念。对队列来说,消费者组允许你将数据平均分配给所有消费者组来处理;对发布-订阅来说,kafka允许你将消息广播到所有消费者组。

kafka模型的优势是每个主题都有这两种属性:它能扩展处理能力,同时也支持多个订阅者。我们不需要选择其中一个,或者另外一个。

kafka相比于传统的消息系统,它更能保证消息的顺序。

传统队列会将消息按顺序保存在服务器上,如果多个消费者同时消费这个队列,那么服务器将按消息的存储顺序来分发给消费者。然而,尽管服务器按顺序分发消息,但是消息是异步的发送到每个消费者的,所以不同的消费者接收到消息的顺序可能不同。这意味着,在并行处理的情况下,消息的顺序将不能保证。在消息系统中通常有一个概念:唯一消费者,它允许只有一个消费者消费一个队列,当然这也意味着这种情况下不存在并行处理。

kafka在这方面做得比较好。在主题中,它有一个并行的概念–分区。kafka既能保证消息的顺序,又能在多个消费者之间保持负载均衡。通过将主题的分区分配给指定的消费者组,每个分区只能被消费者组中的一个消费者消费,来实现的。这样,我们能确保这个消费者是这个分区的唯一消费者和按顺序消费这个分区中的消息。尽管主题有很多个分区,我们仍能在多个消费者实例之间保持负载均衡。值得注意的是,消费者组中消费者的数量不能多于分区数。

kafka作为一个存储系统

任何允许发布与消费消息分离的消息队列,实际上充当了目前使用的消息存储系统。kafka的不同之处在于它还是一个非常优秀的存储系统。

写入kafka的数据将会写入磁盘,并且进行副本复制以实现容错。kafka允许生产者等待确认,在收到回复之后才会认为写成功,并且即使写入的服务器失败了,也能保证这条消息是存在的。

kafka能很好地使用磁盘结构来扩容:无论服务器上有 50KB 还是 50TB 的持久化数据,kafka的性能都是一样的,不会随着数据的增多而出现性能下降。

由于kafka可以大规模的存储数据,并且允许客户控制其读取位置,您可以将kafka作为一种专用于高性能、低延迟提交日志存储,并且能复制和传播的分布式文件系统。

更多关于kafka的提交日志存储和副本复制的设计,请访问这里

kafka作为一个流媒体处理系统

kafka仅仅提供读取、写入和存储数据流是不够的,最终目的是实现流的实时处理。

在kafka中,流处理器是指持续地从输入主题获取数据流,对获取到的数据流执行某种处理,并将处理过的数据,持续地输出到输出主题中。

例如,零售店的应用程序可能会将销售额和货物作为输入流,通过相关计算,然后输出重新排序和根据此数据计算的价格调整的流。

可以直接使用生产者和消费者的相关API进行简单处理。但是,对于更复杂的转换,kafka提供了完全集成的Streams API。这允许构建一些应用程序去执行非普通处理任务、计算流的聚合或者将流连接在一起。

它能有效地解决此类应用程序面临的难题:处理无序数据,在代码更改时重新处理输入流,执行有状态计算等。

流式API构建在kafka提供的核心基础功能上:它使用生产者和消费者API进行输入,使用kafka进行有状态存储,并在流处理器实例之间使用相同的组机制来实现容错。

把碎片整合在一起

将消息传递、存储和流处理组合在一起可能看起来没多大用处,但它对于kafka作为流媒体平台的作用至关重要。

像HDFS这种分布式文件系统允许存储静态文件以进行批处理。kafka系统是高效的,它允许存储和处理过去的历史数据。

传统的企业消息系统允许处理你订阅之后到达的数据。以这种方式构建的应用程序只能处理在它订阅之后到达的未来数据。

kafka结合了这两种功能,这种组合对于kafka作为流媒体应用程序平台以及流数据管道的使用至关重要。

通过组合存储和低延迟订阅,流应用程序可以以相同的方式处理过去和未来的数据。也就是说,单个应用程序也可以处理历史存储的数据,而不是在它处理到达最后一条记录时结束,它可以在未来数据到达时继续处理。这就是流处理包含批处理以及消息驱动应用程序的一般概念。

同样,对于流数据管道,通过组合订阅实时事件,可以将kafka用作极低延迟的管道; 另外,能够可靠地存储数据,也使得可以将其用于必须保证安全的核心数据的传输,或者与仅定期加载数据的离线系统或可能长时间停机以进行扩展和维护集成。它的流处理能力使它可以实时的转换数据。

更多关于kafka提供的保证、API和功能的信息,请参阅其余的文档

这里介绍一个很好用的kafka可视化工具kafkatool,下载地址:
http://www.kafkatool.com/

kafkatool连接kafka服务器后,记得要将kafkatool中topic的Message类型设置为String,否则将看到字节码。

创建主题

1
2
> bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my-topic --partitions 1 \
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

修改主题

增加分区数,分区只能增加不能减少

1
2
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
--partitions 40

删除主题

1
> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name

kafka 集群的搭建

本篇将说说kafka集群的搭建,如果你只是想简单体验一下kafka,可以直接使用我在上一篇介绍的 kafka 单节点安装 即可。

但是,如果你想在生产环境中使用,那么搭建一个集群可能更适合你。下面将说说kafka集群的安装使用,kafka同样是使用前面例子使用的2.0.0版本,我在一台机器上安装,所以这是伪集群,当修改为真集群的时候,只要将IP地址修改下即可,下面会说明。

首先,你得搭建 zookeeper 集群,因为高版本的kafka中内置了zookeeper组件,所以我们直接使用kafka中内置的zookeeper组件搭建zookeeper集群。但是,你也可以使用zookeeper独立的安装包来搭建zookeeper集群。两者的搭建方法都是一样的,可以参考 zookeeper集群版安装方法

计划在一台Ubuntu Linux服务器上部署3台kafka服务器,分别为kafka1, kafka2, kafka3

因为三台kafka服务器的配置都差不多,所以我们先设置好一台kafka1的配置,再将其复制成kafka2, kafka3并修改其中的配置即可。

下面使用kafka内置的zookeeper组件搭建zookeeper集群,我们将kafka的所有服务器都放在同一个目录下:

1.建目录,如下:

1
2
$ cd /home/hewentian/ProjectD
$ mkdir kafkaCluster

2.将kafka_2.12-2.0.0.tgz放到/home/hewentian/ProjectD/kafkaCluster目录下,并执行如下脚本解压

1
2
3
4
5
6
7
8
9
10
11
12
13
$ cd /home/hewentian/ProjectD/kafkaCluster
$ tar xzvf kafka_2.12-2.0.0.tgz

$ ls
kafka_2.12-2.0.0 kafka_2.12-2.0.0.tgz

$ rm kafka_2.12-2.0.0.tgz
$ mv kafka_2.12-2.0.0/ kafka1 # 为方便起见,将其命名为 kafka1

$ cd kafka1/
$ mkdir -p data/zk # 存放zookeeper数据的目录
$ mkdir -p data/kafka # 存放kafka数据的目录
$ mkdir logs # 新解压的 kafka 没有此目录,需手动创建。因为重定向的日志logs/zookeeper.log需要此目录

3.修改/home/hewentian/ProjectD/kafkaCluster/kafka1/config/zookeeper.properties并在其中修改如下内容:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/hewentian/ProjectD/kafkaCluster/kafka1/data/zk  # 这里必须为绝对路径,否则有可能无法启动
clientPort=2181                                               # 这台服务器的端口为2181这里为默认值
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

4.在/home/hewentian/ProjectD/kafkaCluster/kafka1/data/zk目录下建myid文件并在其中输入1,只输入1,代表server.1

1
2
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka1/data/zk
$ vi myid

这样第一台服务器已经配置完毕。

5.接下来我们将kafka1复制为kafka2, kafka3

1
2
3
$ cd /home/hewentian/ProjectD/kafkaCluster
$ cp -r kafka1 kafka2
$ cp -r kafka1 kafka3

6.将kafka2/data/zk目录下的myid的内容修改为2

1
2
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka2/data/zk
$ vi myid

同理,将将kafka3/data/zk目录下的myid的内容修改为3

1
2
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka3/data/zk
$ vi myid

7.修改kafka2的配置文件

1
2
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka2/config
$ vi zookeeper.properties

仅修改两处地方即可,要修改的地方如下:

dataDir=/home/hewentian/ProjectD/kafkaCluster/kafka2/data/zk  # 这里是数据保存的位置
clientPort=2182                                               # 这台服务器的端口为2182

同理,修改kafka3的配置文件

1
2
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka3/config
$ vi zookeeper.properties

仅修改两处地方即可,要修改的地方如下:

dataDir=/home/hewentian/ProjectD/kafkaCluster/kafka3/data/zk  # 这里是数据保存的位置
clientPort=2183                                               # 这台服务器的端口为2183

8.到目前为此,我们已经将3台zookeeper服务器都配置好了。接下来,我们要将他们都启动

启动kafka1的zookeeper服务器

1
2
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka1
$ nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper.log 2>&1 &

启动kafka2的zookeeper服务器

1
2
3
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka2
$ mkdir logs
$ nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper.log 2>&1 &

启动kafka3的zookeeper服务器

1
2
3
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka3
$ mkdir logs
$ nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper.log 2>&1 &

9.当三台服务器都启动好了,我们分别连到三台zookeeper服务器:

连接到kafka1的zookeeper服务器

1
2
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka1
$ ./bin/zookeeper-shell.sh 127.0.0.1:2181

连接到kafka2的zookeeper服务器

1
2
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka2
$ ./bin/zookeeper-shell.sh 127.0.0.1:2182

连接到kafka3的zookeeper服务器

1
2
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka3
$ ./bin/zookeeper-shell.sh 127.0.0.1:2183

可以通过查看logs/zookeeper.log文件,如果没有报错就说明zookeeper集群启动成功。

这样你在kafka1中的zookeeper所作的修改,都会同步到kafka2, kafka3
例如你在kafka1的zookeeper服务器

1
$ create /zk_test_cluster my_data_cluster

你在kafka2, kafka3的zookeeper客户端用

1
$ ls /

都会看到节点zk_test_cluster

至此,zookeeper集群部署结束。

10.搭建kafka集群

配置kafka1服务器:

1
2
3
4
5
6
7
8
9
10
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka1
$ vi config/server.properties

broker.id=1 # 这里设置为1,另外两台分别设置为2、3

listeners=PLAINTEXT://127.0.0.1:9092 # IP地址和端口,这里使用默认的 9092,另外两台分别使用9093、9094

log.dirs=/home/hewentian/ProjectD/kafkaCluster/kafka1/data/kafka

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

配置kafka2服务器:

1
2
3
4
5
6
7
8
9
10
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka2
$ vi config/server.properties

broker.id=2

listeners=PLAINTEXT://127.0.0.1:9093

log.dirs=/home/hewentian/ProjectD/kafkaCluster/kafka2/data/kafka

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

配置kafka3服务器:

1
2
3
4
5
6
7
8
9
10
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka3
$ vi config/server.properties

broker.id=3

listeners=PLAINTEXT://127.0.0.1:9094

log.dirs=/home/hewentian/ProjectD/kafkaCluster/kafka3/data/kafka

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

11.启动三台kafka服务器

1
2
3
4
5
6
7
8
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka1
$ ./bin/kafka-server-start.sh -daemon /home/hewentian/ProjectD/kafkaCluster/kafka1/config/server.properties

$ cd /home/hewentian/ProjectD/kafkaCluster/kafka2
$ ./bin/kafka-server-start.sh -daemon /home/hewentian/ProjectD/kafkaCluster/kafka2/config/server.properties

$ cd /home/hewentian/ProjectD/kafkaCluster/kafka3
$ ./bin/kafka-server-start.sh -daemon /home/hewentian/ProjectD/kafkaCluster/kafka3/config/server.properties

分别从三台kafka服务器中查看启动日志logs/server.log,如果没报错,并且看到如下输出,则启动成功:

# kafka1 的输出
[2018-10-27 15:48:54,890] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-27 15:48:54,890] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-27 15:48:54,895] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

# kafka2 的输出
[2018-10-27 15:49:22,694] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-27 15:49:22,694] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-27 15:49:22,697] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)

# kafka3 的输出
[2018-10-27 15:49:41,746] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-27 15:49:41,746] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-27 15:49:41,749] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)

至此,kafka集群搭建成功。下面,我们简单的试用一下。

12.创建topic

在任意一台kafka服务器上面创建topic,例如在kafka1上面创建一个名为 my-replicated-topic 的 topic,指定 1 个分区,3 个副本:

1
2
3
4
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka1
$ ./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Created topic "my-replicated-topic".

上面的参数--zookeeper是集群列表,可以指定所有节点,也可以指定为部分列表。

查看topic的情况:

1
2
3
4
5
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka1
$ ./bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1

13.发送消息

往我们刚才创建的toipc中发送消息,在任意一台kafka上面都可以的,我们在kafka2上面执行:

1
2
3
4
5
6
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka2
$ ./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic my-replicated-topic
>
>my test message 1
>my test message 2
>

14.消费消息

将我们刚刚发送的消息消费掉,我们从kafka3上面执行:

1
2
3
4
5
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka3
$ ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --from-beginning --topic my-replicated-topic

my test message 1
my test message 2

我们在生产者中发送消息,在消费者中就能实时的看到消息。

15.容错测试

从上面可知my-replicated-topic的leader为3,那我们将broker.id=3的进程杀掉:

1
2
3
4
5
6
7
8
9
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka3
$ ps -ef | grep kafka3/config/server.properties
hewenti+ 22018 1897 5 17:19 pts/23 00:00:16 /usr/local/java/jdk1.8.0_102/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true

[中间省略部分]

-0.10.jar:/home/hewentian/ProjectD/kafkaCluster/kafka3/bin/../libs/zookeeper-3.4.13.jar kafka.Kafka /home/hewentian/ProjectD/kafkaCluster/kafka3/config/server.properties

$ kill -9 22018 # 单机环境下不能通过执行: ./bin/kafka-server-stop.sh 来杀掉当前目录下的kafka,它会杀掉全部kafka

再查看my-replicated-topic的情况:

1
2
3
4
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka3
$ ./bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1

由上面可见,leader已经变为1。并且,生产消息和消费消息一样可用,不受影响:

1
2
3
4
5
6
7
8
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka2
$ ./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic my-replicated-topic
>
>my test message 1
>my test message 2
>
> Tim Ho
>

1
2
3
4
5
6
7
$ cd /home/hewentian/ProjectD/kafkaCluster/kafka3
$ ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --from-beginning --topic my-replicated-topic

my test message 1
my test message 2

Tim Ho

未完,待续……

kafka 单节点安装

本文将说下kafka的单节点安装,我的机器为Ubuntu 16.04 LTS,下面的安装过程参考:
http://kafka.apache.org/quickstart

第一步:我们要将kafka安装包下载回来

截止本文写时,它的最新版本为2.0.0,可以在它的官网下载。

1
2
3
4
5
6
7
8
9
$ cd /home/hewentian/ProjectD/
$ wget https://www.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz
$ wget https://www.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz.sha512

验证下载文件的完整性,在下载的时候要将 SHA512 文件也下载回来
$ sha512sum -c kafka_2.12-2.0.0.tgz.sha512
kafka_2.12-2.0.0.tgz: OK

$ tar xzf kafka_2.12-2.0.0.tgz

第二步:启动服务器

kafka需要用到zookeeper,所以必须首先启动zookeeper。在高版本的kafka发行包中,已经内置zookeeper,我们直接使用即可。

1
2
$ cd /home/hewentian/ProjectD/kafka_2.12-2.0.0/
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

启动成功后,会看到如下输出:

[2018-10-24 09:14:29,072] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,072] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,072] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,072] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,072] INFO Server environment:os.version=4.13.0-32-generic (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,072] INFO Server environment:user.name=hewentian (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,073] INFO Server environment:user.home=/home/hewentian (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,073] INFO Server environment:user.dir=/home/hewentian/ProjectD/kafka_2.12-2.0.0 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,091] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,091] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,091] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-10-24 09:14:29,111] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
[2018-10-24 09:14:29,121] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

接着,打开另外一个终端,启动kafka服务器:

1
2
$ cd /home/hewentian/ProjectD/kafka_2.12-2.0.0/
$ ./bin/kafka-server-start.sh config/server.properties

启动成功后,会看到如下输出:

[2018-10-24 11:01:45,462] INFO [SocketServer brokerId=0] Started processors for 1 acceptors (kafka.network.SocketServer)
[2018-10-24 11:01:45,494] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-24 11:01:45,494] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-24 11:01:45,497] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

第三步:创建topic

创建一个名字叫test的topic,只有一个分区和一个副本,打开另外一个终端:

1
2
3
4
5
6
7
$ cd /home/hewentian/ProjectD/kafka_2.12-2.0.0/
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

查看所有创建的topic
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
test

第四步:往topic发送消息

kafka自带一个命令行的客户端,用于从文件中或者标准输入中读取消息并且发送到kafka集群,默认每一行会被作为一条消息发送:

1
2
3
4
5
$ cd /home/hewentian/ProjectD/kafka_2.12-2.0.0/
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>This is a message
>This is another message
>

第五步:消费topic中的消息

kafka同样自带一个命令行的消费者,它会将消息输出到标准输出:

1
2
3
4
$ cd /home/hewentian/ProjectD/kafka_2.12-2.0.0/
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

这样,一个简单的单节点kafka服务器就搭建完成了,接下来我们将尝试搭建多节点的集群。

jenkins 学习笔记

本篇将说说jenkins的使用,通过阅读本POST,你将拥有一台属于自已的jenkins服务器。

首先,我们要将jenkins的安装包下载回来,可以在它的官网下载最新稳定版:

1
2
3
4
5
6
7
$ cd /home/hewentian/ProjectD/
$ wget http://mirrors.jenkins.io/war-stable/latest/jenkins.war
$ wget http://mirrors.jenkins.io/war-stable/latest/jenkins.war.sha256

验证下载文件的完整性
$ sha256sum -c jenkins.war.sha256
jenkins.war: OK

我们将它安装在当前目录(/home/hewentian/ProjectD)下,在当前目录下创建一个jenkins目录,用作JENKINS_HOME目录,我们将相关命令放到一个脚本start_jenkins.sh中:

1
2
3
$ cd /home/hewentian/ProjectD
$ touch start_jenkins.sh
$ vi start_jenkins.sh

其中start_jenkins.sh脚本的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/bin/sh

JENKINS_HOME=/home/hewentian/ProjectD/jenkins
JENKINS_WAR=/home/hewentian/ProjectD/jenkins.war
LOG_ROOT=$JENKINS_HOME/logs
LOG_FILE=$LOG_ROOT/jenkins.log
WEB_ROOT=$JENKINS_HOME/war

echo "Starting Jenkins ..."
echo "JENKINS_HOME: $JENKINS_HOME"
echo "JENKINS_WAR: $JENKINS_WAR"
echo "LOG_FILE: $LOG_FILE"
echo "WEB_ROOT: $WEB_ROOT"

if [ ! -d $JENKINS_HOME ]; then
echo "creating: $JENKINS_HOME"
mkdir $JENKINS_HOME
fi

if [ ! -d $LOG_ROOT ]; then
echo "creating: $LOG_ROOT"
mkdir $LOG_ROOT
fi

if [ ! -e $LOG_FILE ]; then
echo "creating: $LOG_FILE"
touch $LOG_FILE
fi

if [ ! -d $WEB_ROOT ]; then
echo "creating: $WEB_ROOT"
mkdir $WEB_ROOT
fi

java -Xms1024m -Xmx1024m -Djava.awt.headless=true -DJENKINS_HOME=$JENKINS_HOME -jar $JENKINS_WAR --logfile=$LOG_FILE --webroot=$WEB_ROOT --httpPort=8080 --daemon >> $LOG_FILE

tail -f $LOG_FILE

启动jenkins:

1
2
$ chmod +x start_jenkins.sh
$ . start_jenkins.sh

如果你看到如下输出:

Starting Jenkins ...
JENKINS_HOME: /home/hewentian/ProjectD/jenkins
JENKINS_WAR: /home/hewentian/ProjectD/jenkins.war
LOG_FILE: /home/hewentian/ProjectD/jenkins/logs/jenkins.log
WEB_ROOT: /home/hewentian/ProjectD/jenkins/war
creating: /home/hewentian/ProjectD/jenkins
creating: /home/hewentian/ProjectD/jenkins/logs
creating: /home/hewentian/ProjectD/jenkins/logs/jenkins.log
creating: /home/hewentian/ProjectD/jenkins/war
Forking into background to run as a daemon.
Running from: /home/hewentian/ProjectD/jenkins.war
Oct 06, 2018 10:48:18 AM org.eclipse.jetty.util.log.Log initialized
INFO: Logging initialized @780ms to org.eclipse.jetty.util.log.JavaUtilLog
Oct 06, 2018 10:48:18 AM winstone.Logger logInternal
.
.
. 中间省略部分日志
.
Oct 06, 2018 10:48:29 AM jenkins.install.SetupWizard init
INFO: 

*************************************************************
*************************************************************
*************************************************************

Jenkins initial setup is required. An admin user has been created and a password generated.
Please use the following password to proceed to installation:

02b24053bc4844f4a348fdbbbf65c347

This may also be found at: /home/hewentian/ProjectD/jenkins/secrets/initialAdminPassword

*************************************************************
*************************************************************
*************************************************************

Oct 06, 2018 10:48:36 AM hudson.model.UpdateSite updateData
INFO: Obtained the latest update center data file for UpdateSource default
Oct 06, 2018 10:48:37 AM hudson.model.UpdateSite updateData
INFO: Obtained the latest update center data file for UpdateSource default
Oct 06, 2018 10:48:37 AM jenkins.InitReactorRunner$1 onAttained
INFO: Completed initialization
Oct 06, 2018 10:48:37 AM hudson.WebAppMain$3 run
INFO: Jenkins is fully up and running
Oct 06, 2018 10:48:37 AM hudson.model.DownloadService$Downloadable load
INFO: Obtained the updated data file for hudson.tasks.Maven.MavenInstaller
Oct 06, 2018 10:48:37 AM hudson.model.AsyncPeriodicWork$1 run
INFO: Finished Download metadata. 10,126 ms

则证明启动成功,我们按上面的提示打开浏览器,输入:
http://localhost:8080

你将会见到如下界面:

将上面日志中的密码输入到上述界面,并点击[Continue]按钮,将出现下图界面:

为简单起见,选择Install suggested plugins安装即可,安装进度如下:

接下来是设置admin用户和密码:
Username: hewentian
Password: abc123

点击[Save and Continue],并在接下来的界面点击[Save and Finish]完成设置。

下面进行简单的配置

按下图所示设置JDK、Maven:[Manage Jenkins]->[Global Tool Configuration]

下面安装插件

[Manage Jenkins]->[Manage Plugins]
安装Maven Integration插件,如下图,直接点击Install without restart,该插件是用于建立maven job

安装Deploy to container插件,用于将构建好的应用部署到容器中:

下面演示构建项目

示例A、构建一个从gitHub中拉取原码的maven项目

[New Item]->选择[Maven project],并在[Enter an item name]中输入mvn-test,然后点击[ok],如下图:

在弹出的界面中选中[Discard old builds]并将Max of builds to keep设为10,然后设置源码仓库,如下所示:

注意: 如果我们的仓库中包含有多个项目,而我们此处要构建的只是其中一个,则我们需要指定构建哪一个:Additional Behaviours -> Add -> Sparse Checkout paths,在Path处填入: /{repository_name}/{need_to_build_project}/**

例如:
Repository URL: https://github.com/jenkins-docs/simple-java-maven-app.git
Path: /simple-java-maven-app/my-app/**
如果是这种方式,则下面的Root POM也要修改成对应的项目:
Root POM: simple-java-maven-app/my-app/pom.xml
上面的Path开头是有/的,而Root POM开头是没有/的。

设置Build的Goals and optionsclean install,如下:

其他设置保持默认,点击[Save],在弹出的界面点击[Build Now],然后再点击下方构建历史中正在构建的任务的[Console Output]


如上图所示,构建成功了。切换到上图中的目录中查看目标文件,并运行它:

这样,一个简单的maven项目就构建完成了。

示例B、构建一个从gitHub中拉取原码的maven web项目,并部署到运行中的tomcat

首先,我们创建一个最简单的maven web项目,并推到:
https://github.com/hewentian/web-test
web-test项目只有三个文件:

pom.xml
src/main/webapp/WEB-INF/web.xml
src/main/webapp/index.jsp

pom.xml文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hewentian</groupId>
<artifactId>web-test</artifactId>
<packaging>war</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>web-test Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>web-test</finalName>
</build>
</project>

src/main/webapp/WEB-INF/web.xml文件内容如下:

1
2
3
4
5
6
7
<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
<display-name>Archetype Created Web Application</display-name>
</web-app>

src/main/webapp/index.jsp文件内容如下:

1
2
3
4
5
<html>
<body>
<h2>Hello World!</h2>
</body>
</html>

接着,我们准备一台tomcat,我已经准备好了一台,位于:

/home/hewentian/ProjectD/apache-tomcat-8.0.47

因为jenkins使用了8080端口,所以tomcat不能使用默认的8080端口,我们将其修改为8867

1
2
3
4
5
$ cd /home/hewentian/ProjectD/apache-tomcat-8.0.47/conf
$ vi server.xml

只修改此处即可
<Connector port="8867" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />

配置tomcat的管理员帐号:

1
2
3
4
5
6
7
8
9
10
11
$ cd /home/hewentian/ProjectD/apache-tomcat-8.0.47/conf
$ vi tomcat-users.xml

在<tomcat-users>节点里添加如下内容:

<role rolename="manager-gui"/>
<role rolename="manager-script"/>
<role rolename="manager-jmx"/>
<role rolename="manager-status"/>

<user username="hwt" password="pwd123" roles="manager-gui,manager-script,manager-jmx,manager-status"/>

其中的username="hwt" password="pwd123"是用于登录Tomcat用的,下面会用到,重启tomcat。

回到jenkins,我们新建一个Item,命名为web-app-test:

配置代码仓库,如下图。点击Credentials右边的Add->jenkins

在弹出的对话框中,选择SSH Username with private key,将~/.ssh/id_rsa文件的内容复制到Key中,点Add

在配置代码仓库中,选择刚才创建的Credentials

配置构建触发器:

说明:

  1. Build whenever a SNAPSHOT dependency is built:在构建的时候,会根据pom.xml文件的继承关系构建发生一个构建引起其他构建的;
  2. Poll SCM:这是CI系统中常见的选项。当您选择此选项,您可以指定一个定时作业表达式来定义Jenkins每隔多久检查一下您源代码仓库的变化。如果发现变化,就执行一次构建。例如,表达式中填写0,15,30,45 将使Jenkins每隔15分钟就检查一次您源码仓库的变化;
  3. Build periodically:此选项仅仅通知Jenkins按指定的频率对项目进行构建,而不管SCM是否有变化。如果想在这个Job中运行一些测试用例的话,它就很有帮助。

配置构建设置:

接着我们试着点击Build Now试下能否成功构建:

当你看到如下输出时,证明构建成功:

接着我们配置部署到tomcat,回到web-app-test的jenkins配置,在Add post-build action中选择Deploy war/ear to a container,如下图:

Credentials右则点击Add->Jenkins,并在弹出的对话框中输入上面在tomcat中配置的用户名:

说明:

  1. 首先tomcat是启动的,并且Tomcat中没有部署web-test.war;
  2. WAR/EAR files:war文件的存放位置,如:target/web-test.war 注意:相对路径,target前是没有/的;
  3. Context path:访问时需要输入的内容,如wt访问时如下:http://127.0.0.1:8867/wt/,如果为空,默认是war包的名字;
  4. Container:选择你的web容器,如tomca 8.x;
  5. Credentials: 在右边的下拉页面中选择访问Tomcat的用户名、密码,如果没有,则点【Add】;
  6. Tomcat URL:填入你Tomcat的访问地址,如:http://127.0.0.1:8867/;
  7. svn、git、tomcat的用户名和密码设置了是没有办法在web界面修改的。如果要修改则先去Jenkins目录删除hudson.scm.SubversionSCM.xml文件,或者在jenkins用户页中删掉该用户,虽然jenkins页面提供修改方法,但是,无效。

接着我们点击Build Now开始构建:

如果你看到上面输出,则证明构建和部署成功,可以打开浏览器查看:

到此,大功告成。

示例C、将示例A产生的JAR包部署到远程机器上面运行

我们回到示例A的jenkins配置,在Post Steps下选择Run only if build succeeds,点Add post-build step并选择Execute shell,在Command中填入如下脚本,此脚本由我同事严忠思编写,我稍作修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 1.定义变量
# SSH 端口
export SSH_PORT="12022"

# 运行 jar 包的机器,多个IP以空格分隔,如: 192.168.30.241 192.168.30.242
export SSH_IP_LIST="192.168.30.241"

# 运行 jar 的用户
export USERNAME="root"

# 环境 dev,test,gray,prod
export RUN_SERVER="dev"

# 远程存放 jar 包文件路径,注这个路径要先手动创建, mkdir -p /www/web/my-app && chown root.root /www/web/my-app
export REMOTE_JAR_DIR="/www/web/my-app/${RUN_SERVER}"

# Jenkins (-DJENKINS_HOME)用 maven 编译打包程序的路径与文件
export JENKINS_JAR_FILE="/home/hewentian/ProjectD/jenkins/workspace/mvn-test/target/my-app-1.0-SNAPSHOT.jar"

#jar 打包文件名
export JAR_FILE="my-app-1.0-SNAPSHOT.jar"

# 运行 JAR 的端口,我这里并不使用这个端口号,故可不填
export JAR_PORT="8802"

# 日志路径
export LOG_PATH="/www/logs/my-app"

#jvm参数
export JAR_JAVA_OPTS="-XX:-UseGCOverheadLimit -Xmx1024m"

# jar 运行命令
export JAR_COMMOND="nohup java ${JAR_JAVA_OPTS} \
-jar ${REMOTE_JAR_DIR}/${JAR_FILE} \
--spring.cloud.config.profile=${RUN_SERVER} \
--server.port=${JAR_PORT} \
--logging.path=${LOG_PATH} \
> ${LOG_PATH}/my-app.log &"

# 等待时间,如果不配置,则脚本默认为 40 秒
export SLEEP_SEC=20

# 2.主程序
/bin/bash -x /home/hewentian/ProjectD/jenkins/script/jar.sh

在我的本机执行如下命令,创建存放脚本的目录:

1
2
3
4
5
$ cd /home/hewentian/ProjectD/jenkins
$ mkdir script
$ cd script
$ touch jar.sh
$ chmod +x jar.sh

在jar.sh中输入如下脚本,此脚本同样由我的同事严忠思编写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/bin/env sh

source /etc/profile > /dev/null 2>&1

echo "-------------------- start print env var --------------------"
echo "SSH_PORT: $SSH_PORT"
echo "SSH_IP_LIST: $SSH_IP_LIST"
echo "USERNAME: $USERNAME"
echo "RUN_SERVER: $RUN_SERVER"
echo "REMOTE_JAR_DIR: $REMOTE_JAR_DIR"
echo "JENKINS_JAR_FILE: $JENKINS_JAR_FILE"
echo "JAR_COMMOND: $JAR_COMMOND"
echo "-------------------- end print env var --------------------"

#### IP 数量
IP_LIST=1
HOST_COUNT=$(echo ${SSH_IP_LIST} | wc -w)

#### 默认定义时间为40秒
if [ "${SLEEP_SEC}" == "" ];then
SLEEP_SEC=40
fi

#### 检查是否添加公钥
function SSH_CHECK(){
ssh -p ${SSH_PORT} -o "StrictHostKeyChecking=no" ${USERNAME}@${SSH_HOST} "uname -n"
if [ "$?" -ne 0 ];then
echo -e "Jenkins 登录失败, ${SSH_HOST} 没有添加 SSH 公钥,请把 Jenkins 公钥添加到 ${SSH_HOST} \n"
echo "或检查 ${SSH_HOST} ~/.ssh 目录与 ~/.ssh/authorized_keys 文件权限(chmod 700 ~/.ssh && chmod 600 ~/.ssh/authorized_keys)"
exit 1
fi
}

#### 构建目录,如果失败可以验证客户端没权限
function RUN_DIR(){
ssh -p ${SSH_PORT} -o "StrictHostKeyChecking=no" ${USERNAME}@${SSH_HOST} "uname -n;/bin/mkdir -p ${REMOTE_JAR_DIR}"
RUN_ID=`echo $?`
if [ "${RUN_ID}" -ne 0 ];then
echo "${SSH_HOST}${USERNAME} 用户创建目录失败,请检查 ${USERNAME} 用户是否有权限"
exit 1
fi
}

#### 存放日志目录
function LOG_PATH_DIR(){
ssh -p ${SSH_PORT} -o "StrictHostKeyChecking=no" ${USERNAME}@${SSH_HOST} "uname -n;/bin/mkdir -p ${LOG_PATH}"
RUN_ID=`echo $?`
if [ "${RUN_ID}" -ne 0 ];then
echo "${SSH_HOST}${USERNAME} 用户创建目录失败,请检查 ${USERNAME} 用户是否有权限"
exit 1
fi
}

function RSYNC_JAR(){
rsync -azP --delete -e "ssh -p $SSH_PORT -o 'StrictHostKeyChecking=no'" ${JENKINS_JAR_FILE} ${USERNAME}@${SSH_HOST}:${REMOTE_JAR_DIR} > /dev/null
}

function JAR_PID(){
PID=$(ssh -p ${SSH_PORT} -o "StrictHostKeyChecking=no" ${USERNAME}@${SSH_HOST} "/usr/sbin/lsof -i:${JAR_PORT} | grep -vi PID | awk '{print \$2}'")
echo $PID
}

function STOP_JAR(){
if [ -n "${PID}" ] || [ -n "${PID2}" ];then
ssh -p ${SSH_PORT} -o "StrictHostKeyChecking=no" ${USERNAME}@${SSH_HOST} "kill -9 $PID > /dev/null 2>&1"
echo "INFO: ${JAR_FILE} 进程已杀"
else
echo "INFO: ${JAR_FILE} is Down"
fi
PID=""
}

function START_JAR(){
ssh -p ${SSH_PORT} -o "StrictHostKeyChecking=no" ${USERNAME}@${SSH_HOST} "source /etc/profile > /dev/null; cd ${REMOTE_JAR_DIR}; ${JAR_COMMOND} "
}

function CHECK_JAR(){
PID=$(ssh -p ${SSH_PORT} -o "StrictHostKeyChecking=no" ${USERNAME}@${SSH_HOST} "/usr/sbin/lsof -i:${JAR_PORT} | grep -vi PID | awk '{print \$2}'")
if [ "$PID" != "" ];then
echo "${SSH_HOST}${JAR_FILE} 启动成功"
else
echo "${SSH_HOST}${JAR_FILE} 启动失败,请运维登录服务器查看进程或相关启动日志"
exit 1
fi
}

for SSH_HOST in $SSH_IP_LIST
do
SSH_CHECK
RUN_DIR
LOG_PATH_DIR
RSYNC_JAR
JAR_PID
#STOP_JAR
START_JAR

if [ "${IP_LIST}" -le "${HOST_COUNT}" ];then
echo "正在检测 ${SSH_HOST}${JAR_FILE} 程序是否成功启动,请等待 ${SLEEP_SEC} 秒!"
sleep ${SLEEP_SEC}
#CHECK_JAR
fi
if [ "${IP_LIST}" -gt "${HOST_COUNT}" ];then
echo "INFO: <${IP_LIST}> 更新下一台机..."
fi
let IP_LIST=IP_LIST+1
done

回到jenkins去点击[Build Now],在[Console Output]观看它的构建情况。等构建成功后,我们登录192.168.30.241查看情况:

1
2
3
4
5
6
7
8
9
10
$ cd /home/hewentian/
$ ssh -p 12022 root@192.168.30.241

Last login: Thu Oct 11 11:18:50 2018 from 10.1.23.231
[root@192.168.30.241 ~]# ls /www/
logs web
[root@192.168.30.241 ~]# ls /www/web/my-app/dev/
my-app-1.0-SNAPSHOT.jar
[root@192.168.30.241 ~]# more /www/logs/my-app/my-app.log
Hello World!

从上述输出可知,我们的构建已经成功!!!

将脚本存放在jar.sh中的好处是此脚本可以供多个项目共同使用,只要在Execute shell中根据不同项目定义不同的变量值即可。

示例D、构建指定的git分支

要实现这个功能,我们要在jenkins安装一个插件Git Parameter

我们还是以示例A的为例,去到它的配置中,选中This project is parameterized,点Add parameter->Git Parameter,设置如下:

并在Branches to build按下图所示填:

回到mvn-test这个job,你会发现原先的Build Now已经变成了Build with Parameters,我们点它:

至此,就可以构建我们想构建的分支了。

示例E、当构建出错的时候,如何回滚(rollback)到上一个版本

要实现这个功能,我们要在jenkins安装一个插件Copy Artifact

未完待续……

ELK 日志系统的搭建

本篇将介绍 ELK 日志系统的搭建,我们将在一台机器上面搭建,系统配置如下:

logstash的整体结构图如下:

我们将使用redis作为上图中的INPUTS,而elasticsearch作为上图中的OUTPUTS,这也是logstash官方的推荐。而它们的安装可以参考以下例子:
redis的安装请参考:redis 的安装使用
elasticsearch的安装请参考:elasticsearch 单节点安装

注意:elasticsearch、logstash、kibana它们的版本最好保持一致,这里都是使用6.4.0版本。

kibana的安装将在本篇的稍后介绍,下面先介绍下logstash的安装

首先,我们要将logstash安装包下载回来,可以在它的官网下载,当然,我们也可以从这里下载 logstash-6.4.0.tar.gz,推荐从logstash官网下载对应版本。

1
2
3
4
5
6
7
8
9
$ cd /home/hewentian/ProjectD/
$ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.0.tar.gz
$ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.0.tar.gz.sha512

验证下载文件的完整性,在下载的时候要将 SHA512 文件也下载回来
$ sha512sum -c logstash-6.4.0.tar.gz.sha512
logstash-6.4.0.tar.gz: OK

$ tar xzf logstash-6.4.0.tar.gz

解压后,得到目录logstash-6.4.0,可以查看下它包含有哪些文件

1
2
3
4
5
6
$ cd /home/hewentian/ProjectD/logstash-6.4.0
$ ls

bin data lib logstash-core NOTICE.TXT x-pack
config Gemfile LICENSE.txt logstash-core-plugin-api tools
CONTRIBUTORS Gemfile.lock logs modules vendor

测试安装是否成功:以标准输入、标准输出作为input, output

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ cd /home/hewentian/ProjectD/logstash-6.4.0/bin
$ ./logstash -e 'input { stdin { } } output { stdout { } }'

Sending Logstash logs to /home/hewentian/ProjectD/logstash-6.4.0/logs which is now configured via log4j2.properties
[2018-10-02T14:25:37,017][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-10-02T14:25:38,201][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.4.0"}
[2018-10-02T14:25:41,748][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-10-02T14:25:41,919][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4c1685e4 run>"}
The stdin plugin is now waiting for input:
[2018-10-02T14:25:41,990][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-10-02T14:25:42,396][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

#此时窗口在等待输入

hello world

#下面是logstash的输出结果

{
"@version" => "1",
"@timestamp" => 2018-10-02T06:25:59.608Z,
"message" => "hello world",
"host" => "hewentian-Lenovo-IdeaPad-Y470"
}

从上面的测试结果可知,软件安装正确,下面开始我们的定制配置。

配置文件放在config目录下,此目录下已经有一个示例配置,因为我们要将redis作为我们的INPUTS,所以我们要建立它的配置文件:

1
2
3
4
$ cd /home/hewentian/ProjectD/logstash-6.4.0/config
$ cp logstash-sample.conf logstash-redis.conf
$
$ vi logstash-redis.conf

logstash-redis.conf中配置如下,这里暂未配置FILTERS(后面会讲到如何配置):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# Sample Logstash configuration for creating a simple
# Redis -> Logstash -> Elasticsearch pipeline.

input {
redis {
type => "systemlog"
host => "127.0.0.1"
port => 6379
password => "abc123"
db => 0
data_type => "list"
key => "systemlog"
}
}

output {
if [type] == "systemlog" {
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
index => "redis-systemlog-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}
}

在启动logstash前,验证一下配置文件是否正确,这是一个好习惯:

1
2
$ cd /home/hewentian/ProjectD/logstash-6.4.0/bin
$ ./logstash -f ../config/logstash-redis.conf -t

如果你见到如下输出,则配置正确:

Sending Logstash logs to /home/hewentian/ProjectD/logstash-6.4.0/logs which is now configured via log4j2.properties
[2018-09-30T16:32:45,043][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/home/hewentian/ProjectD/logstash-6.4.0/data/queue"}
[2018-09-30T16:32:45,064][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/home/hewentian/ProjectD/logstash-6.4.0/data/dead_letter_queue"}
[2018-09-30T16:32:46,030][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
Configuration OK
[2018-09-30T16:32:50,630][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

接下来,就可以启动logstash了:

1
2
$ cd /home/hewentian/ProjectD/logstash-6.4.0/bin
$ ./logstash -f ../config/logstash-redis.conf

如果见到如下输出,则启动成功:

[2018-09-30T16:34:44,175][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

下面进行简单的测试

我们首先,往redis中推入3条记录:

1
2
3
4
5
6
7
8
$ cd /home/hewentian/ProjectD/redis-4.0.11_master/src
$ ./redis-cli -h 127.0.0.1
127.0.0.1:6379> AUTH abc123
OK
127.0.0.1:6379> lpush systemlog hello world
(integer) 2
127.0.0.1:6379> lpush systemlog '{"name":"Tim Ho","age":23,"student":true}'
(integer) 1

启动elastchsearch-head可以看到数据已经进入到es中了:

你会发现上面推到systemlog中的信息如果是JSON格式,则在elasticsearch中会自动解析到相应的field中,否则会放到默认的field:message中。

kibana的安装

kibana的安装很简单,将kibana安装包下载回来,可以在它的官网下载,当然,我们也可以从这里下载 kibana-6.4.0-linux-x86_64.tar.gz,推荐从kibana官网下载对应版本。

1
2
3
4
5
6
7
8
9
$ cd /home/hewentian/ProjectD/
$ wget https://artifacts.elastic.co/downloads/kibana/kibana-6.4.0-linux-x86_64.tar.gz
$ wget https://artifacts.elastic.co/downloads/kibana/kibana-6.4.0-linux-x86_64.tar.gz.sha512

验证下载文件的完整性,在下载的时候要将 SHA512 文件也下载回来
$ sha512sum -c kibana-6.4.0-linux-x86_64.tar.gz.sha512
kibana-6.4.0-linux-x86_64.tar.gz: OK

$ tar xzf kibana-6.4.0-linux-x86_64.tar.gz

kibana配置要查看的elasticsearch,只需修改如下配置项即可,如果是在本机安装elasticsearch,并且使用默认的9200端口,则无需配置。

1
2
3
4
5
6
7
8
$ cd /home/hewentian/ProjectD/kibana-6.4.0-linux-x86_64/config
$ vi kibana.yml

#修改如下配置项,如果使用默认的,则无需修改
#server.port: 5601
#elasticsearch.url: "http://localhost:9200"
#elasticsearch.username: "user"
#elasticsearch.password: "pass"

接着启动kibana

1
2
$ cd /home/hewentian/ProjectD/kibana-6.4.0-linux-x86_64/bin
$ ./kibana # 或者以后台方式运行 nohup ./kibana &

打开浏览器,并输入下面的地址:
http://localhost:5601

你将看到如下界面:

点击上图中的[Management]->[Index Patterns]->[Create index pattern],输入index name:redis-systemlog-*,如下图

点击[Next step]按钮,并在接下来的界面中的Time Filter field name中选择I don't want to user the Time Filter,最后点击Create index pattern完成创建。接着点击左则的[Discover]并在左则的界面中选择中redis-systemlog-*,你将看到如下结果:

至此,简单的 ELK 基本搭建完毕。下面展示一个简单的配置示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Sample Logstash configuration for creating a simple
# Redis -> Logstash -> Elasticsearch pipeline.

input {
# system log
redis {
type => "systemlog"
host => "127.0.0.1"
port => 6379
password => "abc123"
db => 0
data_type => "list"
key => "systemlog"
codec => "json"
}

# user log
redis {
type => "userlog"
host => "127.0.0.1"
port => 6379
password => "abc123"
db => 0
data_type => "list"
key => "userlog"
codec => "json"
}
}

output {
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
index => "%{type}-%{+YYYY.MM}"
}
}

下面我们将继续探索它的高级功能。

很多时候,对于systemlog中的某条信息(不一定是JSON格式),如果我们只需要某些信息,那我们又怎样做呢?这里就需要使用FILTERS了。

在FILTERS中使用grok正则表达式,关于grok,可以参见这里的说明:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

未完,待续……

elasticsearch 学习笔记

参考资料:
Elasticsearch 权威指南
Elasticsearch Reference
http://es.xiaoleilu.com/080_Structured_Search/20_contains.html
https://github.com/searchbox-io/Jest/tree/master/jest/src/test/java/io/searchbox/core

首先,你必须至少有一台elasticsearch服务器可以使用,如果还没安装,可以参考我的上两篇 elasticsearch 单节点安装elasticsearch 集群的搭建

使用JAVA API来操作elasticsearch的例子可以在这里找到:EsJestUtil.javaEsJestDemo.java

要单独创建一个索引

curl -XPUT 'http://localhost:9200/user_index' -H 'Content-Type: application/json' -d '{
    "settings" : {
        "index" : {
            "number_of_shards" : 4,
            "number_of_replicas" : 1
        }
    }
}'

{"acknowledged":true,"shards_acknowledged":true,"index":"user_index"}

删除索引的命令

curl -XDELETE 'http://localhost:9200/user_index/'

{"acknowledged":true}

为user_index中的user创建mapping

curl -XPUT 'http://localhost:9200/user_index/_mapping/user' -H 'Content-Type: application/json' -d '{
    "properties": {
        "id": {
            "type": "long",
            "index": "false"
        },
        "name": {
            "type": "keyword"
        },
        "age": {
            "type": "integer"
        },
        "tags": {
            "type": "keyword",
            "boost": 3.0
       },
       "birthday": {
            "type": "date",
            "format": "strict_date_optional_time || epoch_millis || yyyy-MM-dd HH:mm:ss"
       }
   }
}'

修改索引的副本数

Es在索引数据的时候,如果存在副本,那么主分片会将数据同时同步到副本。所以,如果当前插入大量数据,那么会对es集群造成一定的压力,所以我们最好把副本数设置为0;等数据建立完索引之后,在手动的将副本数更改到2,这样可以提高数据的索引效率。副本数量最好是1-2个。

curl -XPUT 'http://localhost:9200/user_index/_settings' -H 'Content-Type: application/json' -d '
{
    "index" : {
        "number_of_replicas" : 2
    }
}'

ES别名

别名的好处,就是更换索引的时候,系统业务代码无需修改。
查询所有别名:

curl -XGET 'http://localhost:9200/_alias?pretty=true' -H 'Content-Type: application/json'

查询某个索引的别名:

curl -XGET 'http://localhost:9200/user_index/_alias?pretty=true' -H 'Content-Type: application/json'

添加别名:

curl -XPOST 'http://localhost:9200/_aliases' -H 'Content-Type: application/json' -d '
{
    "actions" : [{"add" : {"index" : "user_index" , "alias" : "user"}}]
}'

删除别名:

curl -XPOST 'http://localhost:9200/_aliases' -H 'Content-Type: application/json' -d '
{
    "actions" : [{"remove" : {"index" : "user_index" , "alias" : "user"}}]
}'

修改别名,ES没有修改别名的操作,只能先删除后添加:

curl -XPOST 'http://localhost:9200/_aliases' -H 'Content-Type: application/json' -d '
{
    "actions" : [
                {"remove" : {"index" : "user_index" , "alias" : "user"}},
                {"add" : {"index" : "user_index" , "alias" : "user2"}}
    ]
}'

ES中的一些概念

cluster
代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,你与任何一个节点的通信和与整个es集群通信是等价的。

shards
代表索引分片,es可以把一个完整的索引分成多个分片,这样的好处是可以把一个大的索引拆分成多个,分布到不同的节点上。构成分布式搜索。分片的数量只能在索引创建前指定,并且索引创建后不能更改。

replicas
代表索引副本,es可以设置多个索引的副本,副本的作用一是提高系统的容错性,当某个节点某个分片损坏或丢失时可以从副本中恢复。二是提高es的查询效率,es会自动对搜索请求进行负载均衡。

recovery
代表数据恢复或叫数据重新分布,es在有节点加入或退出时会根据机器的负载对索引分片进行重新分配,挂掉的节点重新启动时也会进行数据恢复。

river
代表es的一个数据源,也是其它存储方式(如:数据库)同步数据到es的一个方法。它是以插件方式存在的一个es服务,通过读取river中的数据并把它索引到es中,官方的river有couchDB的,RabbitMQ的,Twitter的,Wikipedia的。

gateway
代表es索引快照的存储方式,es默认是先把索引存放到内存中,当内存满了时再持久化到本地硬盘。gateway对索引快照进行存储,当这个es集群关闭再重新启动时就会从gateway中读取索引备份数据。es支持多种类型的gateway,有本地文件系统(默认),分布式文件系统,Hadoop的HDFS和amazon的s3云存储服务。

discovery.zen
代表es的自动发现节点机制,es是一个基于p2p的系统,它先通过广播寻找存在的节点,再通过多播协议来进行节点之间的通信,同时也支持点对点的交互。

Transport
代表es内部节点或集群与客户端的交互方式,默认内部是使用tcp协议进行交互,同时它支持http协议(json格式)、thrift、servlet、memcached、zeroMQ等的传输协议(通过插件方式集成)。

Date formats can be customised, but if no format is specified then it uses the default:

"strict_date_optional_time||epoch_millis"

if you set it like this:

PUT my_index
{
  "mappings": {
    "_doc": {
      "properties": {
        "date": {
          "type":   "date",
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        }
      }
    }
  }
}

you can use the below method to set date:

PUT my_index/_doc/1
{ "date": "2015-01-01 12:10:30" } 

PUT my_index/_doc/2
{ "date": "2015-01-01T12:10:30Z" } 

PUT my_index/_doc/3
{ "date": 1420070400001 }

文件的部分更新

文档是不可变的:他们不能被修改,只能被替换。 update API 必须遵循同样的规则。 从外部来看,我们在一个文档的某个位置进行部分更新。然而在内部, update API 简单使用与之前描述相同的 检索-修改-重建索引 的处理过程。 区别在于这个过程发生在分片内部,这样就避免了多次请求的网络开销。通过减少检索和重建索引步骤之间的时间,我们也减少了其他进程的变更带来冲突的可能性。

方法一:update 请求最简单的一种形式是接收文档的一部分作为 doc 的参数, 它只是与现有的文档进行合并。对象被合并到一起,覆盖现有的字段,增加新的字段。 例如,我们增加字段 tags 和 views 到我们的博客文章,如下所示:

POST /website/blog/1/_update
{
       "doc" : {
          "tags" : [ "testing" ],
          "views": 0
       }
}

测试示例:

先插件一条数据:
curl -XPUT 'http://localhost:9200/facebook/tuser/1?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "tim",
    "post_date": "2009-11-15T13:12:00",
    "message": "Elasticsearch, so far so good?"
}'

再将这条数所的 message 字段修改一下
curl -XPOST 'http://localhost:9200/facebook/tuser/1/_update' -H 'Content-Type: application/json' -d '
{
    "doc":{
        "message": "Elasticsearch, so far so good? yes"
    }
}'

方法二:使用脚本部分更新文档编辑
脚本可以在 update API中用来改变 _source 的字段内容, 它在更新脚本中称为 ctx._source 。 例如,我们可以使用脚本来增加博客文章中 views 的数量:

POST /website/blog/1/_update
{
       "script" : "ctx._source.views+=1"
}
curl -XPOST 'http://127.0.0.1:9200/facebook/tuser/1/_update' -H 'Content-Type: application/json' -d '
{
    "script" : "ctx._source.message=\"yes, you are right.\""
}'

下面的命令可以列出每个 Index 所包含的 Type

1
$ curl -XGET 'http://127.0.0.1:9200/user_index/_mapping?pretty=true'

  1. cluster.name
    配置es的集群名称,默认是elasticsearch,不同的集群用名字来区分,es会自动发现在同一网段下的es,配置成相同集群名字的各个节点形成一个集群。如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。
  2. http.port
    设置对外服务的http端口,默认为9200。不能相同,否则会冲突。

ES有很多插件,我们可以选择安装一些,例如,以安装head插件为例。有两种方式安装,一种为在线安装,另一种为本地安装,本地安装要下载插件(git clone)。
插件下载地址为:
https://github.com/mobz/elasticsearch-head

这里以在线安装为例,我之前在介绍elasticsearch 单节点安装中使用的是本地安装,推荐使用本地安装。旧版本安装过程如下:
进入ES的HOME目录,执行plugin命令,如下,

cd ${ES_HOME}/bin
./elasticsearch-plugin install mobz/elasticsearch-head

安装完毕后,要重启ES。在浏览器中输入:http://localhost:9200/_plugin/head/,如果看到了页面,则表明安装成功。

ES一次查询,最多返回10条,但hits会显示total一共有多少条,要使用from, size指定。
在ES里面删除数据的时候要非常小心,如果全部都清空了,可能整个库的MAPPING都会有问题。这时,一些原先可以执行的语句可能会无法执行

清空数据

注意是清空,不是删除,示例:
在kibana下命令:

1
2
3
4
5
6
POST /my_index/my_type/_delete_by_query?refresh&slices=3&pretty
{
"query": {
"match_all": {}
}
}

在curl下命令:

curl -XPOST 'http://127.0.0.1:9200/my_index/my_type/_delete_by_query?refresh&slices=3&pretty' -H 'Content-Type: application/json' -d '
{
  "query": {
    "match_all": {}
  }
}'

以下是一些常查询:

{
  "query": {
    "bool": {
      "should": [
        {
          "match": {
            "_id": "http://www.abc.com"
          }
        },
        {
          "match": {
            "_id": "http://www.csdn.net/tag/scala"
          }
        }
      ]
    }
  }
}


{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "address": "*canton*"
          }
        },
        {
          "match": {
            "name": "Tim"
          }
        }
      ]
    }
  }
}


{
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match": {
                  "title": {
                    "minimum_should_match": "100%",
                    "query": "Air Quality"
                  }
                }
              },
              {
                "match": {
                  "body_text": {
                    "minimum_should_match": "100%",
                    "query": "Air Quality"
                  }
                }
              }
            ]
          }
        },
        {
          "wildcard": {
            "user_ids": "*760aa069-2ed2-40d6-89da-f62e83f82887*"
          }
        }
      ]
    }
  },
  "from": 0,
  "size": 20
}


{
  "sort": [
    {
      "updatetime_6h": {
        "order": "desc"
      }
    },
    {
      "_score": {
        "order": "desc"
      }
    }
  ],
  "query": {
    "filtered": {
      "query": {
        "bool": {
          "must_not": [],
          "should": [
            {
              "bool": {
                "should": [
                  {
                    "match_phrase": {
                      "app_type.title": {
                        "query": "china"
                      }
                    }
                  },
                  {
                    "match_phrase": {
                      "app_type.title": {
                        "query": "中国"
                      }
                    }
                  }
                ]
              }
            },
            {
              "bool": {
                "should": [
                  {
                    "match_phrase": {
                      "app_type.body_text": {
                        "query": "china"
                      }
                    }
                  },
                  {
                    "match_phrase": {
                      "app_type.body_text": {
                        "query": "中国"
                      }
                    }
                  }
                ]
              }
            }
          ],
          "must": []
        }
      },
      "filter": {
        "bool": {
          "should": [],
          "must": [
            {
              "range": {
                "updatetime": {
                  "lte": 1474617163524
                }
              }
            },
            {
              "query": {
                "wildcard": {
                  "user_ids": "*760aa069-2ed2-40d6-89da-f62e83f82887*"
                }
              }
            }
          ]
        }
      }
    }
  },
  "from": 0,
  "size": 20
}

GET /people/user/_search
{
  "query": {
    "bool":{
      "must":[{
        "match":{"birthYear":1989}}
        ]
    }
  },
  "aggregations" : {
    "CATEGORY" : {
      "terms" : {
        "field" : "deposit",
        "size" : 100,
        "order" : {
          "_count" : "desc"
        }
      }
    }
  },
  "size":0
}


GET /people/user/_search
{
  "aggs": {
    "NAME": {
      "terms": {
        "field": "username",
        "size": 100
      }
    }
  },
  "query": {
    "bool": {
      "must": [
        {
          "wildcard": {
            "username": {
              "value": "*文*"
            }
          }
        }
      ]
    }
  },
  "size":100
}


GET /people/user/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "city": "深圳市"
          }
        },
        {
          "bool": {
            "should": [
              {
                "match": {
                  "industry": "制造业"
                }
              },
              {
                "match": {
                  "industry": "通用设备制造业"
                }
              },
              {
                "match": {
                  "industry": "专用设备制造业"
                }
              }
            ]
          }
        },
        {
          "range": {
            "patentCount": {
              "gte": 5
            }
          }
        },
        {
          "range": {
            "regCapital": {
               "gte": 200,
              "lte": 5000
            }
          }
        },
        {
          "match": {
            "isGaoXin": 0
          }
        },
        {
          "range": {
            "regYear": {
              "gte": 2015,
              "lte": 2017
            }
          }
        }
      ]
    }
  },
  "size": 20
}


查询 username 必须存在的记录
GET /people/user/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "exists": {
            "field": "username"
          }
        }
      ]
    }
  }
}

查询返回某些指定的字段

如果查询的结果字段很多,而我们仅需要其中的某些字段的时候,我们可能通过_source来指定,比如我们只要userName, address:

GET people/user/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "userName": "张三"
          }
        }
      ]
    }
  },
  "_source": [
    "userName",
    "address"
  ],
  "size": 2
}

深度翻页问题

ES默认的分页机制一个不足的地方是,比如有5010条数据,当你仅想取第5000到5010条数据的时候,ES也会将前5000条数据加载到内存当中。从价值观上来看,使用大量的CPU,内存和带宽,分类过程确实会变得非常重要。 为此,我们强烈建议不要进行深度分页。

{
    "query": {
        "bool": {
            "must": [],
            "must_not": [],
            "should": [{
                "wildcard": {
                    "orgName": "*有限公司"
                }
            }, {
                "wildcard": {
                    "orgName": "*株式会社"
                }
            }]
        }
    },
    "from": 1000000,
    "size": 100,
    "sort": [],
    "aggs": {}
}
Result window is too large, from + size must be less than or equal to: [1000000] but was [1000100]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.

要解决这个问题,可以使用下面的方式来改变ES默认深度分页的index.max_result_window最大窗口值

curl -XPUT http://127.0.0.1:9200/my_index/_settings -d '{ "index" : { "max_result_window" : 500000}}'

其中my_index为要修改的index名,500000为要调整的新的窗口数

或者分页使用ES的scroll api实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import static org.elasticsearch.index.query.QueryBuilders.*;

QueryBuilder qb = termQuery("multi", "test");

SearchResponse scrollResp = client.prepareSearch(test)
.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
.setScroll(new TimeValue(60000))
.setQuery(qb)
.setSize(100).get(); //max of 100 hits will be returned for each scroll
//Scroll until no hits are returned
do {
for (SearchHit hit : scrollResp.getHits().getHits()) {
//Handle the hit...
}

scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
} while(scrollResp.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.

统计指定索引下的文档数:

1
2
3
4
$ curl -X GET "127.0.0.1:9200/_cat/count/my_index?v"

epoch timestamp count
1559718460 15:07:40 123

未完待续……

elasticsearch 集群的搭建

下面说说elasticsearch集群的搭建,同样是使用前面例子elasticsearch 单节点安装使用的elasticsearch-6.4.0.tar.gz版本,我在一台机器上安装,所以这是伪集群,当修改为真集群的时候,只要将IP地址修改下即可,下面会说明。

下面开始搭建elasticsearch集群

创建一个目录用于存放集群使用到的所有实例信息

1
2
$ cd /home/hewentian/ProjectD
$ mkdir elasticsearchCluster # 集群的文件都放在这里

将一个elasticsearch压缩包放到这个目录,我之前已在ProjectD目录下载好了

1
2
3
4
5
6
7
$ cd /home/hewentian/ProjectD/elasticsearchCluster
$ cp /home/hewentian/ProjectD/elasticsearch-6.4.0.tar.gz ./
$ tar xzvf elasticsearch-6.4.0.tar.gz

为方便起见,这里将其重命名为elasticsearch-node1,先将elasticsearch-node1配置好,
后面会将其复制为elasticsearch-node2, elasticsearch-node3
$ mv elasticsearch-6.4.0 elasticsearch-node1

elasticsearch-node1进行设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ cd /home/hewentian/ProjectD/elasticsearchCluster/elasticsearch-node1/config
$ vi elasticsearch.yml # 增加下面的配置

cluster.name: hewentian-cluster # 配置集群的名字
node.name: node-1 # 配置集群下的节点的名字

node.master: true # 是否有资格被选举为master节点, 默认为 true
node.data: true # 设置该节点是否存储数据, 默认为 true

network.host: 127.0.0.1 # 设置本机IP地址
http.port: 9201 # 将端口设置成9201
transport.tcp.port: 9301 # 内部节点之间沟通的端口

discovery.zen.ping.unicast.hosts: ["127.0.0.1:9301", "127.0.0.1:9302", "127.0.0.1:9303"]

discovery.zen.minimum_master_nodes: 2 # total number of master-eligible nodes / 2 + 1

action.destructive_requires_name: true

允许跨域,否则 elasticsearch head 不能访问 elasticsearch
http.cors.enabled: true
http.cors.allow-origin: "*"

这样一个节点就配置好了,我们只要以这个为蓝本,复制出两份,并修改其中的三点:node.name、http.port、transport.tcp.port即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
$ cd /home/hewentian/ProjectD/elasticsearchCluster
$ cp -r elasticsearch-node1 elasticsearch-node2
$ cp -r elasticsearch-node1 elasticsearch-node3

$ ls
elasticsearch-6.4.0.tar.gz elasticsearch-node1 elasticsearch-node2 elasticsearch-node3

对 elasticsearch-node2 进行设置,只修改如下三项
$ vi /home/hewentian/ProjectD/elasticsearchCluster/elasticsearch-node2/config/elasticsearch.yml

node.name: node-2
http.port: 9202
transport.tcp.port: 9302


对 elasticsearch-node3 进行设置,只修改如下三项
$ vi /home/hewentian/ProjectD/elasticsearchCluster/elasticsearch-node3/config/elasticsearch.yml

node.name: node-3
http.port: 9203
transport.tcp.port: 9303

内存大小的设置,根据机器内存大小而设置,一般不超过系统总内存的一半,分别对三个节点进行:

1
2
3
4
5
$ cd /home/hewentian/ProjectD/elasticsearchCluster/elasticsearch-node1/config/
$ vi jvm.options

-Xms1g
-Xmx1g

在每个节点所在的机器上都作下面的配置(下面是根据我机器的情况作的配置,我的是伪集群,所以只配置一次):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ su root			# 必须在 root 下才有权限修改系统配置文件
Password:
$ vi /etc/security/limits.conf # 添加如下配置
* soft nofile 65536 # 上面第一个错误有提示
* hard nofile 131072 # 一般为 soft nofile 的2倍
* soft nproc 4096 # 这个设置线程数
* hard nproc 8192

$ vi /etc/sysctl.conf # 添加如下配置
vm.max_map_count=262144

$ sysctl -p # 最后执行这个命令,你会见到如下输出
vm.max_map_count=262144

$ exit # 退出 root

分别启动三个节点:

1
2
3
4
5
6
7
8
$ cd /home/hewentian/ProjectD/elasticsearchCluster/elasticsearch-node1/bin
$ ./elasticsearch

$ cd /home/hewentian/ProjectD/elasticsearchCluster/elasticsearch-node2/bin
$ ./elasticsearch

$ cd /home/hewentian/ProjectD/elasticsearchCluster/elasticsearch-node3/bin
$ ./elasticsearch

部分输出如下:

[2018-09-17T17:27:08,325][INFO ][o.e.n.Node               ] [node-1] initializing ...
[2018-09-17T17:27:08,430][INFO ][o.e.e.NodeEnvironment    ] [node-1] using [1] data paths, mounts [[/ (/dev/sda2)]], net usable_space [59.7gb], net total_space [101.7gb], types [ext4]

[2018-09-17T17:27:19,552][DEBUG][o.e.a.ActionModule       ] Using REST wrapper from plugin org.elasticsearch.xpack.security.Security
[2018-09-17T17:27:19,878][INFO ][o.e.d.DiscoveryModule    ] [node-1] using discovery type [zen]
[2018-09-17T17:27:21,289][INFO ][o.e.n.Node               ] [node-1] initialized
[2018-09-17T17:27:21,289][INFO ][o.e.n.Node               ] [node-1] starting ...
[2018-09-17T17:27:21,496][INFO ][o.e.t.TransportService   ] [node-1] publish_address {127.0.0.1:9301}, bound_addresses {127.0.0.1:9301}
[2018-09-17T17:27:24,571][WARN ][o.e.d.z.ZenDiscovery     ] [node-1] not enough master nodes discovered during pinging (found [[Candidate{node={node-1}{D7fISYxgQFahYdTEbqMO4g}{7x0Hu4tQTP-N73j_A073DA}{127.0.0.1}{127.0.0.1:9301}{ml.machine_memory=8305086464, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}, clusterStateVersion=-1}]], but needed [2]), pinging again

发现节点 node-2
[2018-09-17T17:28:37,362][INFO ][o.e.c.s.MasterService    ] [node-1] zen-disco-elected-as-master ([1] nodes joined)[, ], reason: new_master {node-1}{D7fISYxgQFahYdTEbqMO4g}{7x0Hu4tQTP-N73j_A073DA}{127.0.0.1}{127.0.0.1:9301}{ml.machine_memory=8305086464, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}, added {{node-2}{LNHhP-WPSDaGZJeRHYwBQQ}{zRaTzS0OQkyfCdaRhczR9A}{127.0.0.1}{127.0.0.1:9302}{ml.machine_memory=8305086464, ml.max_open_jobs=20, xpack.installed=true, ml.enabled=true},}

发现节点 node-3
[node-1] zen-disco-node-join, reason: added {{node-3}{4rqo1NL0R8KVCAkzI0w4UQ}{s599uwThRGi74YE4WFULIg}{127.0.0.1}{127.0.0.1:9303}{ml.machine_memory=8305086464, ml.max_open_jobs=20, xpack.installed=true, ml.enabled=true},}
[2018-09-17T17:29:27,022][INFO ][o.e.c.s.ClusterApplierService] [node-1] added {{node-3}{4rqo1NL0R8KVCAkzI0w4UQ}{s599uwThRGi74YE4WFULIg}{127.0.0.1}{127.0.0.1:9303}{ml.machine_memory=8305086464, ml.max_open_jobs=20, xpack.installed=true, ml.enabled=true},}, reason: apply cluster state (from master [master {node-1}{D7fISYxgQFahYdTEbqMO4g}{7x0Hu4tQTP-N73j_A073DA}{127.0.0.1}{127.0.0.1:9301}{ml.machine_memory=8305086464, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true} committed version [15] source [zen-disco-node-join]])

在浏览器中输入如下地址:
http://localhost:9201/
http://localhost:9202/
http://localhost:9203/

{
  "name" : "node-1",
  "cluster_name" : "hewentian-cluster",
  "cluster_uuid" : "odUSNw8jS4q-w_Vl66q1qg",
  "version" : {
    "number" : "6.4.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "595516e",
    "build_date" : "2018-08-17T23:18:47.308994Z",
    "build_snapshot" : false,
    "lucene_version" : "7.4.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

{
  "name" : "node-2",
  "cluster_name" : "hewentian-cluster",
  "cluster_uuid" : "odUSNw8jS4q-w_Vl66q1qg",
  "version" : {
    "number" : "6.4.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "595516e",
    "build_date" : "2018-08-17T23:18:47.308994Z",
    "build_snapshot" : false,
    "lucene_version" : "7.4.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

{
  "name" : "node-3",
  "cluster_name" : "hewentian-cluster",
  "cluster_uuid" : "odUSNw8jS4q-w_Vl66q1qg",
  "version" : {
    "number" : "6.4.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "595516e",
    "build_date" : "2018-08-17T23:18:47.308994Z",
    "build_snapshot" : false,
    "lucene_version" : "7.4.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

启动elasticsearch-head,我们就可以看到集群如下图所示:

从图中可以看到node-1已经成为master节点。我们尝试往集群中PUT一些数据,分别往3个不同的端口中PUT数据:

curl -XPUT 'http://localhost:9201/twitter/doc/1?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T13:12:00",
    "message": "Trying out Elasticsearch, so far so good?"
}'

curl -XPUT 'http://localhost:9202/twitter/doc/2?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T14:12:12",
    "message": "Another tweet, will it be indexed?"
}'

curl -XPUT 'http://localhost:9203/twitter/doc/3?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "elastic",
    "post_date": "2010-01-15T01:46:38",
    "message": "Building the site, should be kewl"
}'

输出结果如下:

{
  "_index" : "twitter",
  "_type" : "doc",
  "_id" : "1",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

{
  "_index" : "twitter",
  "_type" : "doc",
  "_id" : "2",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

{
  "_index" : "twitter",
  "_type" : "doc",
  "_id" : "3",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

我们在elasticsearch-head中查看数据

elasticsearch-集群状态:

elasticsearch-集群数据

至此,集群搭建结束。

elasticsearch 单节点安装

本文将说下elasticsearch的单节点安装,我的机器为Ubuntu 16.04 LTS,当前用户为hewentian

首先,我们要将elasticsearch安装包下载回来,截止本文写时,它的最新版本为6.4.0,可以在它的官网下载,当然,我们也可以从这里下载 elasticsearch-6.4.0.tar.gz,推荐从elasticsearch官网下载最新版本。

1
2
3
4
5
6
7
8
9
$ cd /home/hewentian/ProjectD/
$ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.0.tar.gz
$ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.0.tar.gz.sha512

验证下载文件的完整性,在下载的时候要将 SHA512 文件也下载回来
$ sha512sum -c elasticsearch-6.4.0.tar.gz.sha512
elasticsearch-6.4.0.tar.gz: OK

$ tar xzf elasticsearch-6.4.0.tar.gz

elasticsearch进行设置(目前是单节点,所以也可以不对elasticsearch.yml进行设置,可直接跳过):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ cd /home/hewentian/ProjectD/elasticsearch-6.4.0/config
$ vi elasticsearch.yml # 增加下面的配置

cluster.name: hewentian-cluster # 配置集群的名字
node.name: node-1 # 配置集群下的节点的名字

network.host: 127.0.0.1 # 设置本机IP地址,这里可不设置,但是在集群环境中必须设置。如果在这里指定了IP,则localhost可能无法使用,这个要注意

http.port: 9200 # 默认也是这个端口

下面的配置保持默认即可,它会将数据和日志保存到elasticsearch-6.4.0目录下的data和logs目录
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs

内存大小的设置,根据机器内存大小而设置,一般不超过系统总内存的一半:

1
2
3
4
5
$ cd /home/hewentian/ProjectD/elasticsearch-6.4.0/config
$ vi jvm.options

-Xms1g
-Xmx1g

启动elasticsearch

1
2
$ cd /home/hewentian/ProjectD/elasticsearch-6.4.0/bin
$ ./elasticsearch

我们也可以在启动的时候加上参数-d,以后台运行方式启动:./elasticsearch -d。这样单节点版本的elasticsearch就安装完了,在浏览器中输入如下地址:http://localhost:9200/

{
"name" : "node-1",
  "cluster_name" : "hewentian-cluster",
  "cluster_uuid" : "ihLk1iOlTEis2PkQrLhmLQ",
  "version" : {
    "number" : "6.4.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "595516e",
    "build_date" : "2018-08-17T23:18:47.308994Z",
    "build_snapshot" : false,
    "lucene_version" : "7.4.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

如果你见到上面的输出,证明安装成功了。

不过,你在安装的过程中,有可能会遇到下面的问题之一:

ERROR: [3] bootstrap checks failed
[1]: max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
[2]: max number of threads [2048] for user [hewentian] is too low, increase to at least [4096]
[3]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

解决方法如下,就算它不报上面的错误,我们也应该根据机器的性能,对下面的选项作相应配置(下面是根据我机器的情况作的配置):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ su root			# 必须在 root 下才有权限修改系统配置文件
Password:
$ vi /etc/security/limits.conf # 添加如下配置
* soft nofile 65536 # 上面第一个错误有提示
* hard nofile 131072 # 一般为 soft nofile 的2倍
* soft nproc 4096 # 这个设置线程数
* hard nproc 8192

$ vi /etc/sysctl.conf # 添加如下配置
vm.max_map_count=262144

$ sysctl -p # 最后执行这个命令,你会见到如下输出
vm.max_map_count=262144

$ exit # 退出 root

elasticsearch-head的安装

为了更方便的与elasticsearch交互,我们还要安装elasticsearch-head插件,安装步骤如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ cd /home/hewentian/ProjectD/gitHub
$ git clone https://github.com/mobz/elasticsearch-head.git
$ cd elasticsearch-head
$ npm install # 这个安装过程可能会报错,但是一般不影响。安装完之后,运行下面的命令即可
$ npm run start # 运行此命令,你会看到如下输出

> elasticsearch-head@0.0.0 start /home/hewentian/ProjectD/gitHub/elasticsearch-head
> grunt server

Running "connect:server" (connect) task
Waiting forever...
Started connect web server on http://localhost:9100

也可以使用下面这种方式启动
$ nohup npm run start &

安装结束之后,可以试着打开这个连接:http://localhost:9100/
这个连接可以打开,就证明elasticsearch-head安装成功,但是你可能会发现,它无法连上elasticsearch。因为,我们还没有对elasticsearch进行设置:

1
2
3
4
5
6
$ cd /home/hewentian/ProjectD/elasticsearch-6.4.0/config
$ vi elasticsearch.yml # 增加下面的配置

允许跨域,否则 elasticsearch head 不能访问 elasticsearch
http.cors.enabled: true
http.cors.allow-origin: "*"

配置好之后,重启elasticsearch,就可以使用elasticsearch-head访问elasticsearch了,如下图所示:

elasticsearch使用示例

使用示例我不打算自已写,因为elasticsearch官方的README.textile已经写得非常详细了。下面的示例摘自elasticsearch-6.4.0/README.textile
下面代码的缩进尽量不要使用tab,要使用空格

Indexing

Let’s try and index some twitter like information. First, let’s index some tweets (the @twitter@ index will be created automatically):

curl -XPUT 'http://localhost:9200/twitter/doc/1?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T13:12:00",
    "message": "Trying out Elasticsearch, so far so good?"
}'

curl -XPUT 'http://localhost:9200/twitter/doc/2?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T14:12:12",
    "message": "Another tweet, will it be indexed?"
}'

curl -XPUT 'http://localhost:9200/twitter/doc/3?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "elastic",
    "post_date": "2010-01-15T01:46:38",
    "message": "Building the site, should be kewl"
}'

Getting

Now, let’s see if the information was added by GETting it:

curl -XGET 'http://localhost:9200/twitter/doc/1?pretty=true'
curl -XGET 'http://localhost:9200/twitter/doc/2?pretty=true'
curl -XGET 'http://localhost:9200/twitter/doc/3?pretty=true'

The results show below:

{
  "_index" : "twitter",
  "_type" : "doc",
  "_id" : "1",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "user" : "kimchy",
    "post_date" : "2009-11-15T13:12:00",
    "message" : "Trying out Elasticsearch, so far so good?"
  }
}

{
  "_index" : "twitter",
  "_type" : "doc",
  "_id" : "2",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "Another tweet, will it be indexed?"
  }
}

{
  "_index" : "twitter",
  "_type" : "doc",
  "_id" : "3",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "user" : "elastic",
    "post_date" : "2010-01-15T01:46:38",
    "message" : "Building the site, should be kewl"
  }
}

Updating

The updating operation is the same as Indexing.

curl -XPUT 'http://localhost:9200/twitter/doc/1?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T13:12:00",
    "message": "Trying out Elasticsearch, so far so good? yes"
}'

{
  "_index" : "twitter",
  "_type" : "doc",
  "_id" : "1",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}

Deleting

The deleting operation is also easy.

curl -XDELETE 'http://localhost:9200/twitter/doc/1'

{"_index":"twitter","_type":"doc","_id":"1","_version":3,"result":"deleted","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1}

Searching

Mmm search…, shouldn’t it be elastic?
Let’s find all the tweets that @kimchy@ posted:

curl -XGET 'http://localhost:9200/twitter/_search?q=user:kimchy&pretty=true'

{
  "took" : 42,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.2876821,
    "hits" : [
      {
        "_index" : "twitter",
        "_type" : "doc",
        "_id" : "2",
        "_score" : 0.2876821,
        "_source" : {
          "user" : "kimchy",
          "post_date" : "2009-11-15T14:12:12",
          "message" : "Another tweet, will it be indexed?"
        }
      },
      {
        "_index" : "twitter",
        "_type" : "doc",
        "_id" : "1",
        "_score" : 0.2876821,
        "_source" : {
          "user" : "kimchy",
          "post_date" : "2009-11-15T13:12:00",
          "message" : "Trying out Elasticsearch, so far so good?"
        }
      }
    ]
  }
}

We can also use the JSON query language Elasticsearch provides instead of a query string:

curl -XGET 'http://localhost:9200/twitter/_search?pretty=true' -H 'Content-Type: application/json' -d '
{
    "query" : {
        "match" : { "user": "kimchy" }
    }
}'

Just for kicks, let’s get all the documents stored (we should see the tweet from @elastic@ as well):

curl -XGET 'http://localhost:9200/twitter/_search?pretty=true' -H 'Content-Type: application/json' -d '
{
    "query" : {
        "match_all" : {}
    }
}'

We can also do range search (the @post_date@ was automatically identified as date)

curl -XGET 'http://localhost:9200/twitter/_search?pretty=true' -H 'Content-Type: application/json' -d '
{
    "query" : {
        "range" : {
            "post_date" : { "from" : "2009-11-15T13:00:00", "to" : "2009-11-15T14:00:00" }
        }
    }
}'

There are many more options to perform search, after all, it’s a search product no? All the familiar Lucene queries are available through the JSON query language, or through the query parser.

Multi Tenant - Indices and Types

Man, that twitter index might get big (in this case, index size == valuation). Let’s see if we can structure our twitter system a bit differently in order to support such large amounts of data.

Elasticsearch supports multiple indices. In the previous example we used an index called @twitter@ that stored tweets for every user.

Another way to define our simple twitter system is to have a different index per user (note, though that each index has an overhead). Here is the indexing curl’s in this case:

curl -XPUT 'http://localhost:9200/kimchy/doc/1?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T13:12:00",
    "message": "Trying out Elasticsearch, so far so good?"
}'

curl -XPUT 'http://localhost:9200/kimchy/doc/2?pretty' -H 'Content-Type: application/json' -d '
{
    "user": "kimchy",
    "post_date": "2009-11-15T14:12:12",
    "message": "Another tweet, will it be indexed?"
}'

The above will index information into the @kimchy@ index. Each user will get their own special index.

Complete control on the index level is allowed. As an example, in the above case, we would want to change from the default 5 shards with 1 replica per index, to only 1 shard with 1 replica per index (== per twitter user). Here is how this can be done (the configuration can be in yaml as well):

curl -XPUT http://localhost:9200/another_user?pretty -H 'Content-Type: application/json' -d '
{
    "index" : {
        "number_of_shards" : 1,
        "number_of_replicas" : 1
    }
}'

Search (and similar operations) are multi index aware. This means that we can easily search on more than one
index (twitter user), for example:

curl -XGET 'http://localhost:9200/kimchy,another_user/_search?pretty=true' -H 'Content-Type: application/json' -d '
{
    "query" : {
        "match_all" : {}
    }
}'

Or on all the indices:

curl -XGET 'http://localhost:9200/_search?pretty=true' -H 'Content-Type: application/json' -d '
{
    "query" : {
        "match_all" : {}
    }
}'

{One liner teaser}: And the cool part about that? You can easily search on multiple twitter users (indices), with different boost levels per user (index), making social search so much simpler (results from my friends rank higher than results from friends of my friends).

Cat Index

curl -XGET 'http://localhost:9200/_cat/indices?v'

health status index   uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   twitter 1aZo0hSfRkKG1INAEZgpnQ   5   1          2            0       14kb           14kb
yellow open   music   ytI7cirvQXOi1hsrvAMGGA   5   1          0            0      1.2kb          1.2kb