概况

Splunk是一个可以让你去搜索、分析、可视化机器数据的软件,当定义好数据源之后,splunk会对数据源的数据进行索引,并将他们组织到一系列个性化的事件中供用户查看及搜索。

大多数用户使用Web浏览器连接到Splunk Enterprise,并使用Splunk Web来管理其部署,管理和创建知识对象,运行搜索,创建数据透视表和报表等等。您还可以使用命令行界面来管理Splunk Enterprise部署。

Splunk组件

  • Forwarder : 转发器先使用数据,然后再将数据转发到索引器。转发器通常需要最少的资源,从而使它们可以轻松地驻留在生成数据的计算机上。
  • Indexer: 索引器对通常从一组转发器接收的传入数据进行索引。索引器将数据转换为事件并将事件存储在索引中。索引器还响应于来自Search Head的搜索请求来搜索索引数据。为了确保高数据可用性并防止数据丢失,或者只是为了简化多个索引器的管理,您可以在索引器群集中部署多个索引器。
  • Search Head:与用户交互,将搜索请求定向到一组索引器,然后将结果合并回用户。为了确保高可用性并简化水平扩展,您可以在搜索头集群中部署多个Search Head

简单的分布式例子

相关博客推荐

遇到问题可以先到Splunk社区问答中搜索一下,看一下是否有人已经提出这个问题并得到解决。

搜索教程

上传数据

上传数据时,将数据转换的过程称为索引,处理传入的数据以实现快速搜索和分析,处理后的记过作为事件存储在索引中。

默认情况下,所有数据都放入一个名为main的预配置索引中。当您将数据添加到Splunk实例时,可以创建索引来存储数据

搜索教程

  • 字段:作为索引过程的一部分,信息将从数据中提取出来,并格式化为名称和值对,称为字段

    字段搜索格式为 字段名=字段值,注意事项:

    • 字段名区分大小写,字段值不区分大小写
    • 字段值可以使用通配符
    • 字段值中含有空格,必须使用引号
  • 关键字:在搜索中需要用到多个关键字查询,用ORANDNOT连接。默认空格=AND,注意的是,连接词是大写的。一个多关键字的例子,星号(*)字符用作通配符匹配failfailurefailedfailing,等等。

    1
    buttercupgames (error OR fail* OR severe)

    在评估布尔表达式时,括号内的术语优先。NOT子句先于OR子句进行求值。AND子句的优先级最低。

  • 搜索结果:

    事件下方有一个时间柱状图,标识了搜索条件在该时间段出现的次数,出现次数越多,柱状图就越高。还可以通过-缩小按钮来获取更多时间段的统计。

  • 模式:将结构相似的时间进行统计。

  • 搜索处理语言(SPL):Splunk开发了与Splunk软件一起使用的搜索处理语言(SPL)。SPL包含所有搜索命令及其功能,参数和子句。

    在首选项中开启搜索助理完全模式,来学习。

    1
    sourcetype=access_* status=200 action=purchase | top categoryId
    • | : 管道搜索,以左边的结果为搜索输入。
    • top : 通用命令,返回字段中最常见的字段,该top命令是**转换命令**。转换命令将搜索结果组织到一个表中。使用转换命令生成可用于创建可视化的结果,例如柱形图,折线图,面积图和饼图。
  • 子搜索: 配合管道和转换命令来执行子查询。还可以用AS标识符来对搜索结果列名进行重命名。

创建报告和图表

1.保存并共享报告

执行搜索

1
sourcetype=access_* status=200 action=purchase [search sourcetype=access_* status=200 action=purchase | top limit=1 clientip | table clientip] | stats count AS "Total Purchased", dc(productId) AS "Total Products", values(productName) AS "Product Names" BY clientip | rename clientip AS "VIP Customer"

将报告另存为报表,在报表栏中可以查看或编辑该报告。

将报告设置成共享,只需要设置报表的权限为 应用 即可。

2.创建基本图表

开始搜索

1
sourcetype=access_* status=200 | chart count AS views count(eval(action="addtocart")) AS addtocart count(eval(action="purchase")) AS purchases by productName | rename productName AS "Product Name", views AS "Views", addtocart AS "Adds to Cart", purchases AS "Purchases"

该搜索使用chart命令来算那些事件的数量action=purchaseaction=addtocart。然后,搜索使用rename命令来重命名结果中显示的字段。

chart命令是变换命令。搜索结果显示在“统计信息”选项卡上。

创建仪表板

仪表板是由面板组成的视图。面板可以包含诸如搜索框,字段,图表,表格和列表之类的模块。仪表板面板通常连接到报表。

  1. 创建一个搜索
1
sourcetype=access_* status=200 action=purchase | top categoryId
  1. 点击可视化,设置为饼图,并保存为仪表版面板

  2. 创建一个新的仪表板面板

  3. 编辑仪表板

    可在 添加面板 中,为仪表板添加更多视图。

搜索语言(SPL)

Splunk SDK For Java

概况

使用Splunk Enterprise SDK for Java,您可以编写Java应用程序以编程方式与Splunk引擎进行交互。该SDK建立在REST API的基础上,提供了REST API端点的包装。

详细说明请看:Splunk SDK For Java 官方文档

命名空间

为了在整个Splunk安装过程中考虑到用户查看应用程序,系统文件和其他实体资源的权限,Splunk提供了基于名称空间对实体资源的访问。这类似于使用端点访问资源时Splunk REST API使用的应用程序/用户上下文。命名空间用法为ower,app,共享模式

  • ower:Splunk用户名,如 admin 。值为 nobody 表示没有特定用户。
  • app:应用程序,如 search 。 值为 - 表示所有应用程序。
  • 共享模式:
    • “user”: 资源是特定用户的资源
    • “app”: 资源是由app共享的,由app指定
    • “global”: 资源是所有app共享的
    • “system”: 资源是系统资源

列出所有保存的针对Kramer用户的搜索: kramer,-,user

快速入门

1.安装依赖

在项目的 pom.xml文件中添加SDK依赖。

1
2
3
4
5
6
7
8
<repositories>
...
<repository>
<id>splunk-artifactory</id>
<name>Splunk Releases</name>
<url>https://splunk.jfrog.io/splunk/ext-releases-local</url>
</repository>
</repositories>
1
2
3
4
5
6
7
8
<dependencies>
...
<dependency>
<groupId>com.splunk</groupId>
<artifactId>splunk</artifactId>
<version>1.6.0.0</version>
</dependency>
</dependencies>
2.连接到Splunk Enterprise
1
2
3
4
5
6
7
8
9
//解决SSL3报错,需要加入下面语句
HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);

ServiceArgs loginArgs = new ServiceArgs();
loginArgs.setUsername("admin");
loginArgs.setPassword("tangseng233");
loginArgs.setHost("192.168.215.131");
loginArgs.setPort(8089);
Service service = Service.connect(loginArgs);
3.处理搜索和作业

搜索以不同的方式运行,确定了它何时运行及如何检索结果:

  • 普通:普通搜索异步运行。它立即返回搜索作业。轮询作业以确定其状态。搜索完成后,您可以检索结果。如果启用了“预览”,您还可以预览结果。普通模式适用于实时搜索。
  • 阻止:阻止搜索同步运行。在搜索完成之前,它不会返回搜索作业,因此无需轮询状态。阻止模式不适用于实时搜索。
  • Oneshot:oneshot搜索是计划立即运行的阻止搜索。此模式不返回搜索作业,而是在完成后返回搜索结果。因为这是一个阻塞搜索,所以结果直到搜索完成才可用。
  • 实时:实时搜索以正常模式运行,并在实时事件流入Splunk Enterprise进行索引时对其进行搜索。返回的事件在指定的时间范围内符合您的条件。
  • 导出:导出搜索将立即运行,不返回搜索作业,并立即开始流式传输结果。此搜索对于从Splunk Enterprise导出大量数据很有用。

对于那些产生搜索作业(正常,阻止和实时)的搜索,搜索结果将在服务器上保存一段时间,并可根据要求进行检索。对于流式传输结果(oneshot和导出)的那些搜索,搜索结果不会保留在服务器上。如果流由于某种原因而中断,则无法再次运行搜索结果就无法恢复。

搜索基本代码

普通搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.splunk.*;
...

//开始一个普通搜索 (搜索所有,展示头100条)
String query = "search * | head 100"
JobArgs jobargs = new JobArgs();
//设置作业模式为普通搜索
jobargs.setExecutionMode(JobArgs.ExecutionMode.NORMAL);
Job job = service.getJobs().create(query, jobargs);
// xml结果接收(后面详细讲)
InputStream resultsNormalSearch = job.getResults();
ResultsReaderXml resultsReaderNormalSearch;
try {
resultsReaderNormalSearch = new ResultsReaderXml(resultsNormalSearch);
HashMap<String, String> event;
//普通搜索是异步的,搜索结果需要用循环来确定是否还有下一个事件返回。
while ((event = resultsReaderNormalSearch.getNextEvent()) != null) {
System.out.println("\n*********EVENT****************\n");
for (String key: event.keySet())
System.out.println(" " + key + ": " + event.get(key));
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("\nSearch job properties\n---------------------");
System.out.println("Search job ID: " + job.getSid());
System.out.println("The number of events: " + job.getEventCount());
System.out.println("The number of results: " + job.getResultCount());
System.out.println("Search duration: " + job.getRunDuration() + " seconds");
System.out.println("This job expires in: " + job.getTtl() + " seconds");

阻止搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
String searchQuery_blocking = "search * | head 100"; // 
JobArgs jobargs = new JobArgs();
//设置为阻止搜索
jobargs.setExecutionMode(JobArgs.ExecutionMode.BLOCKING);
//阻止搜索为同步搜索,当搜索完成后job也完全返回了,无需循环
System.out.println("Wait for the search to finish...");
Job job = service.getJobs().create(searchQuery_blocking, jobargs);
System.out.println("...done!\n");
System.out.println("Search job properties:\n---------------------");
System.out.println("Search job ID: " + job.getSid());
System.out.println("The number of events: " + job.getEventCount());
System.out.println("The number of results: " + job.getResultCount());
System.out.println("Search duration: " + job.getRunDuration() + " seconds");
System.out.println("This job expires in: " + job.getTtl() + " seconds");

OneShot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//设置搜索时间等参数
Args oneshotSearchArgs = new Args();
oneshotSearchArgs.put("earliest_time", "2012-06-19T12:00:00.000-07:00");
oneshotSearchArgs.put("latest_time", "2012-06-20T12:00:00.000-07:00");
String oneshotSearchQuery = "search * | head 10";

//与其他搜索不同,oneshot搜索不会创建搜索作业,因此无法使用Job和JobCollection类访问它。而是使用Service.oneshotSearch方法
InputStream results_oneshot = service.oneshotSearch(oneshotSearchQuery, oneshotSearchArgs);

// 直接处理结果
try {
ResultsReaderXml resultsReader = new ResultsReaderXml(results_oneshot);
System.out.println("Searching everything in a 24-hour time range starting June 19, 12:00pm and displaying 10 results in XML:\n");
HashMap<String, String> event;
while ((event = resultsReader.getNextEvent()) != null) {
System.out.println("\n********EVENT********");
for (String key: event.keySet())
System.out.println(" " + key + ": " + event.get(key));
}
resultsReader.close();
} catch (Exception e) {
e.printStackTrace();
}

实时搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//在设置的时间范围内,搜索作业会不断的返回新的结果,直到搜索结束,
JobArgs jobArgs = new JobArgs();
//执行模式设置为普通
jobArgs.setExecutionMode(JobArgs.ExecutionMode.NORMAL);
//查询模式设置为实时
jobArgs.setSearchMode(JobArgs.SearchMode.REALTIME);
//当前时间 rt -1m 减1分钟
jobArgs.setEarliestTime("rt-1m");
//当前时间
jobArgs.setLatestTime("rt");
jobArgs.setStatusBuckets(300);

//创建搜索作业
String mySearch = "search index=_internal";
Job job = service.search(mySearch, jobArgs);

// 等待作业准备完成
while (!job.isReady()) {
Thread.sleep(500);
}

//接收预览结果
JobResultsPreviewArgs previewArgs = new JobResultsPreviewArgs();
previewArgs.setCount(300); // Retrieve 300 previews at a time

//实时输出
while (true) {
InputStream stream = job.getResultsPreview(previewArgs);
String line = null;
BufferedReader reader = new BufferedReader(new InputStreamReader(
stream, "UTF-8"));
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
reader.close();
stream.close();
Thread.sleep(500);
}

导出搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//创建搜索参数
JobExportArgs exportArgs = new JobExportArgs();
exportArgs.setEarliestTime("-1h");
exportArgs.setLatestTime("now");
exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);

//执行导出查询
String mySearch = "search index=_internal";
InputStream exportSearch = service.export(mySearch, exportArgs);

//MultiResultsReaderXml 接收导出结果
MultiResultsReaderXml multiResultsReader = new MultiResultsReaderXml(exportSearch);

int counter = 0;
for (SearchResults searchResults : multiResultsReader)
{
for (Event event : searchResults) {
System.out.println("***** Event " + counter++ + " *****");
for (String key: event.keySet())
System.out.println(" " + key + ": " + event.get(key));
}
}
multiResultsReader.close();
4.显示处理结果

运行搜索后,可以从搜索作业中检索不同的输出:

  • 事件:搜索的未转换事件。
  • 结果:处理完成后搜索的转换结果。如果搜索没有转换命令,则结果与事件相同。如果存在转换命令,则结果计数将小于事件计数。
  • 结果预览:仍在进行中的搜索预览,或实时搜索的结果。搜索完成后,预览结果与结果相同。您必须为非实时搜索启用预览(为实时搜索自动启用预览)。
  • 摘要:有关到目前为止已读取结果的搜索字段的摘要信息。将status_buckets搜索作业上的“ ”设置为正值以访问此数据。
  • 时间轴:到目前为止已读取的未转换事件随时间的事件分布。将status_buckets搜索作业上的“ ”设置为正值以访问此数据。

此输出作为流以XML,JSON,JSON_COLS,JSON_ROWS,CSV,ATOM或RAW格式返回。您可以使用标准Java类显示直接结果,也可以创建自己的解析器。但是为方便起见,SDK包含XML,CSV和JSON的结果读取器,它们可以为您正确地解析和格式化结果,并为每个Splunk Enterprise版本处理每种输出类型的特性。

搜索结果API

  • Job.getEvents方法从搜索作业中检索事件。使用JobEventsArgs该类可以为方法指定其他参数。
  • Job.getResults方法从搜索作业中检索结果。使用JobResultsArgs该类可以为方法指定其他参数。
  • Job.getResultsPreview方法从搜索作业中检索结果预览。使用JobResultsPreviewArgs该类可以为方法指定其他参数。
  • Job.getSummary方法从搜索作业中检索摘要数据。使用JobSummaryArgs该类可以为方法指定其他参数。
  • Job.getTimeline方法从搜索作业中检索时间轴数据。

使用以下类与结果阅读器一起显示结果。对于导出搜索的结果,请使用多结果阅读器来解析返回的多个结果集。

  • ResultsReader是基类。
    • ResultsReaderCsv显示CSV结果。
    • ResultsReaderJson显示JSON结果。
    • ResultsReaderXml显示XML结果。
  • MultiResultsReader是多结果的基类。(导出大量结果时用,返回数据流而不是job结果时用)
    • MultiResultsReaderJson类显示多组JSON的结果。
    • MultiResultsReaderXml级显示多组XML结果。

示例

无阅读器直接接收结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
String mySearch = "search * | head 5";
Job job = service.getJobs().create(mySearch);
while (!job.isDone()) {
Thread.sleep(500);
}
//
InputStream results = job.getResults();
String line = null;
System.out.println("Results from the search job as XML:\n");
//用原生字符缓冲流读取 (默认为xml格式结果流)
BufferedReader br = new BufferedReader(new InputStreamReader(results, "UTF-8"));
while ((line = br.readLine()) != null) {
System.out.println(line);
}
br.close();

ResultsReaderJson 读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
String mySearch = "search * | head 5";
Job job = service.getJobs().create(mySearch);

while (!job.isDone()) {
Thread.sleep(500);
}

JobResultsArgs resultsArgs = new JobResultsArgs();
//设置结果 格式化成Json格式
resultsArgs.setOutputMode(JobResultsArgs.OutputMode.JSON);

//ResultsReaderJson 接收结果
InputStream results = job.getResults(resultsArgs);
ResultsReaderJson resultsReader = new ResultsReaderJson(results);
HashMap<String, String> event;
System.out.println("\nFormatted results from the search job as JSON\n");
while ((event = resultsReader.getNextEvent()) != null) {
for (String key: event.keySet())
System.out.println(" " + key + ": " + event.get(key));
}
resultsReader.close();

显示预览结果

只要满足以下两个条件,就可以显示正在进行的搜索结果的预览:

  • 搜索必须在正常执行模式下运行(“ exec_mode“是” normal“)。预览不适用于阻止搜索(搜索作业ID在完成搜索后才可用)或流式搜索(无论如何都将返回结果)。
  • 必须启用预览(“ preview”为“ 1”)。默认情况下,仅对实时搜索启用预览,并且将“ status_buckets”设置为正值。使用该Job.enablePreview方法可以为现有搜索作业启用预览。

要显示预览,请运行搜索,为搜索作业启用预览,然后从中检索预览结果。默认情况下,将检索最近的100个预览。要更改此数字,请为“ count”设置一个值。使用“ offset”值可以浏览大量预览。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
String mySearch = "search * | head 50000";

JobArgs jobArgs = new JobArgs();
jobArgs.setExecutionMode(JobArgs.ExecutionMode.NORMAL);

Job job = service.search(mySearch, jobArgs);
job.enablePreview();
job.update();

while (!job.isReady()) {
Thread.sleep(500);
}

int countPreview=0;
int countBatch=0;
while (!job.isDone()) {
JobResultsPreviewArgs previewargs = new JobResultsPreviewArgs();
previewargs.setCount(500); // Get 500 previews at a time
previewargs.setOutputMode(JobResultsPreviewArgs.OutputMode.XML);

InputStream results = job.getResultsPreview(previewargs);
ResultsReaderXml resultsReader = new ResultsReaderXml(results);
HashMap<String, String> event;
while ((event = resultsReader.getNextEvent()) != null) {
System.out.println("BATCH " + countBatch + "\nPREVIEW " + countPreview++ + " ********");
for (String key: event.keySet())
System.out.println(" " + key + ": " + event.get(key));
}
countBatch++;
resultsReader.close();
}
System.out.println("Job is done with " + job.getResultCount() + " results");