Introduction
References and links
Flow-based programming - Wikipedia
Low-code development platform - Wikipedia
How An Arcane Coding Method From 1970s Banking Software Could Save The (fastcompany.com)
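Deployment
Standalone deployment
By default the official NiFi package starts out of the box with no extra configuration. Check first that local port 8080 is free, because NiFi serves its web UI on port 8080 by default; if the port is occupied, change the port in the configuration under the conf directory.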
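Before starting NiFi, the port check described above can be automated. The following is a minimal sketch, not part of NiFi: the class name PortCheck and the method isPortFree are hypothetical, and the check simply tries to bind a server socket to the port.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical helper, not a NiFi class.
final class PortCheck {
    /** Returns true if a server socket can be bound to the given local port. */
    static boolean isPortFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true; // bind succeeded; the socket is closed again on exit
        } catch (IOException e) {
            return false; // bind failed, so something else already holds the port
        }
    }

    public static void main(String[] args) {
        int port = 8080; // the default NiFi HTTP port mentioned in the text
        System.out.println("Port " + port + (isPortFree(port) ? " is free" : " is in use"));
    }
}
```

If the port is reported as in use, either stop the other service or change NiFi's port before starting it.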
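Download
Download address
Unless you have specific requirements, download the latest package.
Local deployment
After extracting the archive you get the following directory structure:
Testing
This test was carried out on Windows only. To test on Linux, run sudo ./nifi.sh start in NiFi's bin directory.
Go to the bin directory and double-click run-nifi.bat to start NiFi. A CMD window opens automatically; wait 3-5 minutes.
Open a browser and enter localhost:8080/nifi in the address bar to reach the NiFi UI.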
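Cluster deployment
A cluster gives NiFi better load balancing; for most production environments, cluster deployment is recommended.
K8S Docker cluster
A K8S (Kubernetes) cluster is similar to Docker Compose, but K8S is more widely used and better suited to load balancing and migration on large networks.
This article builds a NiFi cluster on Kubernetes from yaml files, using an external Zookeeper. It does not cover setting up the Kubernetes cluster itself; we assume you already have a working one.
Prerequisites and preparation
Prepare a Kubernetes environment (assumed to be already set up and reachable)
Prepare a local Docker (on a Linux distribution or on local WSL2)
Prepare local Docker images (Zookeeper and NiFi; the NiFi image can also be built yourself)
Prepare the yaml configuration files (Zookeeper deployment/service files, NiFi deployment/service files)
Prepare the Apache NiFi source code (downloaded from the Apache NiFi website or from GitHub)
Note: the NiFi source code is only needed if you build the image yourself or need to modify NiFi's functionality.
Version constraints
Zookeeper image: bitnami/zookeeper:latest
Apache NiFi image: apache/nifi:latest
Resources
nifi deployment yaml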
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nifi-131-v1
  namespace: olmp
  selfLink: /apis/apps/v1/namespaces/olmp/deployments/nifi-131-v1
  uid: 7e32f310-7c96-11eb-b020-000c296c4b14
  resourceVersion: '87155117'
  generation: 45
  creationTimestamp: '2021-03-04T03:05:36Z'
  labels:
    applicationName: nifi
    applicationType: throughImage
    envId: '8'
    kubernetes.customized/pod-affinity: nifi-131
    versionName: v1
  annotations:
    deployment.kubernetes.io/revision: '18'
spec:
  replicas: 2
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      applicationName: nifi
      versionName: v1
  strategy:
    type: Recreate
  template:
    metadata:
      creationTimestamp: null
      labels:
        applicationName: nifi
        applicationType: throughImage
        envId: '8'
        kubernetes.customized/pod-affinity: nifi-131
        parentKind: Deployment
        versionName: v1
    spec:
      volumes:
        - name: v1614844117709
          configMap:
            name: nifi-config
            items:
              - key: start.sh
                path: start.sh
            defaultMode: 511
      containers:
        - name: nifi
          image: bjharbor.sccba.org/public/olmp/nifi:1.12.1-dockermaven
          env:
            - name: NIFI_CLUSTER_ADDRESS
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
            - name: NIFI_ELECTION_MAX_CANDIDATES
              value: '1'
            - name: NIFI_ZK_CONNECT_STRING
              value: olmp-zk.olmp:2181
            - name: NIFI_CLUSTER_IS_NODE
              value: 'true'
            - name: NIFI_CLUSTER_NODE_PROTOCOL_PORT
              value: '8082'
            - name: NIFI_ELECTION_MAX_WAIT
              value: 1 min
            - name: NIFI_WEB_HTTP_HOST
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
            - name: NIFI_REMOTE_INPUT_HOST
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          resources:
            limits:
              cpu: '1'
              memory: 2G
            requests:
              cpu: '1'
              memory: 2G
          volumeMounts:
            - name: v1614844117709
              mountPath: /opt/nifi/scripts/start.sh
              subPath: start.sh
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      securityContext: {}
      imagePullSecrets:
        - name: olmp
      schedulerName: default-scheduler
status:
  observedGeneration: 45
  replicas: 2
  updatedReplicas: 2
  readyReplicas: 2
  availableReplicas: 2
  conditions:
    - type: Available
      status: 'True'
      lastUpdateTime: '2021-03-04T08:12:28Z'
      lastTransitionTime: '2021-03-04T08:12:28Z'
      reason: MinimumReplicasAvailable
      message: Deployment has minimum availability.
    - type: Progressing
      status: 'True'
      lastUpdateTime: '2021-03-04T08:12:28Z'
      lastTransitionTime: '2021-03-04T03:05:36Z'
      reason: NewReplicaSetAvailable
      message: ReplicaSet "nifi-131-v1-746dfb547f" has successfully progressed.
nifi service yaml
apiVersion: v1
kind: Service
metadata:
  namespace: zjyw-st-zjyw
  annotations:
    kompose.cmd: kompose -f docker-compose.yml convert
    kompose.version: 1.1.0 (36652f6)
  creationTimestamp: null
  labels:
    io.kompose.service: nifi
  name: nifi
spec:
  ports:
    - name: "8080"
      port: 8080
      targetPort: 8080
  selector:
    io.kompose.service: nifi
status:
  loadBalancer: {}
zookeeper deployment yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: zjyw-st-zjyw
  annotations:
    kompose.cmd: kompose -f docker-compose.yml convert
    kompose.version: 1.1.0 (36652f6)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  replicas: 1
  strategy: {}
  selector:
    matchLabels:
      io.kompose.service: zookeeper
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.service: zookeeper
    spec:
      containers:
        - env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
          image: bitnami/zookeeper:latest
          name: zookeeper
          resources: {}
      restartPolicy: Always
status: {}
zookeeper service yaml
apiVersion: v1
kind: Service
metadata:
  namespace: zjyw-st-zjyw
  annotations:
    kompose.cmd: kompose -f docker-compose.yml convert
    kompose.version: 1.1.0 (36652f6)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  clusterIP: None
  ports:
    - name: headless
      port: 55555
      targetPort: 0
  selector:
    io.kompose.service: zookeeper
status:
  loadBalancer: {}
Steps
Overview
Create the Zookeeper deployment
Create the Zookeeper service
Create the NiFi deployment
Create the NiFi service
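NiFi performance tuning
1. Configure the maximum number of file handles (Maximum File Handles)
sudo vi /etc/security/limits.conf
with the following content
* hard nofile 50000
* soft nofile 50000
2. Configure the maximum number of forked processes (Maximum Forked Processes)
sudo vi /etc/security/limits.conf
with the following content
* hard nproc 10000
* soft nproc 10000
On many systems you may also need to add a new file
sudo vi /etc/security/limits.d/90-nproc.conf
and add the following to it
* soft nproc 10000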
3. Increase the number of TCP socket ports available
sudo sysctl -w net.ipv4.ip_local_port_range="10000 65000"
4. Shorten how long sockets linger (reduce the duration of the TIME_WAIT state)
sudo sysctl -w net.ipv4.netfilter.ip_conntrack_tcp_timeout_time_wait="1"
5. Disable swap
Keeping NiFi out of swap avoids the slowdown caused by disk I/O, although adding more memory is the better fix.
sudo vi /etc/sysctl.conf
Add the following line
vm.swappiness = 0
6. Disable atime for the Repository partitions
sudo vi /etc/fstab
with content such as
proc /proc proc defaults 0 0
none /dev/pts devpts gid=5,mode=620 0 0
/dev/md0 /boot ext3 defaults 0 0
/dev/md1 none swap sw 0 0
# add the noatime option
/dev/md2 / ext3 defaults,noatime 0 0
Finally, remount
mount -o remount /
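7. Exclude directories from antivirus scanning
content_repository
flowfile_repository
provenance_repository
logs
state
If another program locks any of the directories above, NiFi can stall. Antivirus software in particular should be configured to exclude these directories from scanning.
8. Hardware tuning principles
The more processors run concurrently, the stronger the CPU needs to be (a high thread count matters most)
The larger the data being processed and distributed, the more memory is needed; avoid relying on the system's memory swap
Hardware bottlenecks mostly show up in memory capacity and disk I/O, because FlowFiles can be thought of as living in memory. If data is replayed often, store the Repositories on SSDs or even on memory-backed persistence wherever possible
If there are very many processors and the project is very large, provision a high-performance CPU and plenty of memory in advance
If NiFi runs too slowly, check
1. Whether memory swapping is active; adding memory is generally recommended so that the system does not swap
2. Whether the CPU has become the bottleneck; if so, increase CPU capacity
Startup and shutdown both have a default delay (usually 3-5 seconds), which the system builds in deliberately. The startup delay prevents errors caused by data being sent to the console all at once, and the shutdown delay waits for NiFi Core to exit; on timeout (usually 5 seconds) the Bootstrap kills the process automatically.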
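Usage
Scheduled tasks
A NiFi schedule is not defined in the processor code; it is set on the Scheduling tab of the processor configuration.
Where to set it
Basic form of the expression
second minute hour day-of-month month day-of-week year(optional)
Special symbols
* is the wildcard and means any value; ? means no specific value and is only used in the day-of-month and day-of-week fields
Values
A single number, with , separating several values: setting the minutes field to 1,2,5 means minutes 1, 2 and 5
A range, written with -: setting the minutes field to 1-10 means every minute from 1 to 10
An increment, with / separating the start and the step: setting the minutes field to 0/10 means minutes 0, 10, 20, 30, 40 and 50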
Examples
# Runs every day at 12:00:00 noon
0 0 12 * * ?
# Runs every day at 12:12:12
12 12 12 * * ?
# Runs every day at 12:12:12, in 2021 only
12 12 12 * * ? 2021
# Runs every day at 12:12:12, from 2021 through 2025 only
12 12 12 * * ? 2021-2025
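To make the list, range and increment notations concrete, here is a minimal sketch that expands a single numeric field into the values it matches. It is an illustration only and not how NiFi evaluates schedules internally; the class CronFieldSketch and the method expand are hypothetical names, and only the ",", "-", "/" and "*" forms described above are handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not part of NiFi.
final class CronFieldSketch {
    /**
     * Expands one numeric cron field into the values it matches.
     * min and max bound the field, e.g. 0 and 59 for seconds or minutes.
     */
    static List<Integer> expand(String field, int min, int max) {
        List<Integer> values = new ArrayList<>();
        for (String part : field.split(",")) {
            if (part.equals("*")) {
                // Wildcard: every value in the field's range
                for (int v = min; v <= max; v++) values.add(v);
            } else if (part.contains("/")) {
                // Increment: start/step
                String[] startAndStep = part.split("/");
                int start = startAndStep[0].equals("*") ? min : Integer.parseInt(startAndStep[0]);
                int step = Integer.parseInt(startAndStep[1]);
                if (step <= 0) throw new IllegalArgumentException("step must be positive: " + part);
                for (int v = start; v <= max; v += step) values.add(v);
            } else if (part.contains("-")) {
                // Range: from-to, inclusive
                String[] bounds = part.split("-");
                int from = Integer.parseInt(bounds[0]);
                int to = Integer.parseInt(bounds[1]);
                for (int v = from; v <= to; v++) values.add(v);
            } else {
                // Single value
                values.add(Integer.parseInt(part));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(expand("1,2,5", 0, 59)); // [1, 2, 5]
        System.out.println(expand("1-10", 0, 59));  // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(expand("0/10", 0, 59));  // [0, 10, 20, 30, 40, 50]
    }
}
```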
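NiFi development
Processor development
How to make a processor accept no incoming connections
// Source processor (accepts no incoming FlowFiles, only emits data)
// Add the following annotation to the processor class
@InputRequirement(Requirement.INPUT_FORBIDDEN)
How to create Processor properties and relationships
In this example, property 2 only takes effect when property 1 is true; otherwise it is not shown.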
1. Declare the properties
// Property 1
public static final PropertyDescriptor USE_FILE = new PropertyDescriptor.Builder()
        // Property name
        .name("Use File")
        // Display name
        .displayName("Use File")
        // Required property (the processor is invalid if it is left empty)
        .required(true)
        // Allowed values (the value must be one of those listed)
        .allowableValues("true", "false")
        // Default value
        .defaultValue("true")
        .build();
// Property 2
public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
        .name("File to Use")
        .displayName("File to Use")
        .required(true)
        // Validator (checks that the file exists; the processor is invalid if it does not)
        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
        // Dependency (this property is only available when USE_FILE is set to true)
        .dependsOn(USE_FILE, "true")
        .build();
2. Declare the relationships
// Relationship 1
public static final Relationship REL_SUCCESS = new Relationship.Builder()
        // Name (as displayed in NiFi)
        .name("success")
        // Description
        .description("The flowfile contains the original content with one or more attributes added containing the respective counts")
        .build();
// Relationship 2
public static final Relationship REL_FAILURE = new Relationship.Builder()
        // Name (as displayed in NiFi)
        .name("failure")
        // Description
        .description("If the flowfile text cannot be counted for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
        .build();
3. Add them to the processor
// Holds the properties
private static final List<PropertyDescriptor> properties;
// Holds the relationships
private static final Set<Relationship> relationships;
// Initialize both in a static block
static {
    // Add the properties to the list
    properties = Collections.unmodifiableList(Arrays.asList(USE_FILE, FILE));
    // Add the relationships to the set
    relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS,
            REL_FAILURE)));
}
How to read Processor properties
1. First define the property
public static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
        .name("Text")
        .displayName("text")
        .description("This is the description")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();
2. Add the property to descriptors
@Override
protected void init(final ProcessorInitializationContext context) {
    // Properties
    final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
    descriptors.add(TEXT);
    this.descriptors = Collections.unmodifiableList(descriptors);
    // Relationships
    final Set<Relationship> relationships = new HashSet<Relationship>();
    relationships.add(SUCCESS);
    this.relationships = Collections.unmodifiableSet(relationships);
}
3. Read the property in onTrigger
final String text = context.getProperty(TEXT).getValue();
How to read Content
if (RedisConfig.equals("ERROR")) {
    // Oracle configuration
    final Charset charset = StandardCharsets.UTF_8;
    // Only the first 1024 bytes of the Content fit into this buffer
    final byte[] buffer = new byte[1024];
    final AtomicInteger bufferedByteCount = new AtomicInteger(0);
    session.read(flowFile, new InputStreamCallback() {
        @Override
        public void process(final InputStream in) throws IOException {
            bufferedByteCount.set(StreamUtils.fillBuffer(in, buffer, false));
        }
    });
    // This variable now holds the Content (up to the buffer size)
    String Config = new String(buffer, 0, bufferedByteCount.get(), charset);
}
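The snippet above reads into a fixed 1024-byte buffer, so longer Content is cut off. The following standalone sketch shows one way to read a whole InputStream and decode it as UTF-8; in a processor, the loop would sit inside the session.read callback. The class ReadAllSketch and the method readFully are hypothetical names, and only the JDK is used, so it says nothing about NiFi's own utilities.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration using only the JDK.
final class ReadAllSketch {
    /** Reads the stream to its end, then decodes all collected bytes as UTF-8. */
    static String readFully(InputStream in) throws IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        int read;
        while ((read = in.read(chunk)) != -1) {
            collected.write(chunk, 0, read);
        }
        // Decoding once at the end keeps multi-byte characters that straddle
        // a chunk boundary intact.
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < 3000; i++) {
            text.append('x');
        }
        byte[] content = text.toString().getBytes(StandardCharsets.UTF_8);
        System.out.println(readFully(new ByteArrayInputStream(content)).length());
    }
}
```

Collecting everything in memory is only reasonable for small Content; large FlowFiles are better processed as a stream.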
How to write Content
// flowFile = session.write(flowFile, out -> out.write("content to write".getBytes(StandardCharsets.UTF_8)));
flowFile = session.write(flowFile, out -> out.write(value.get().getBytes(StandardCharsets.UTF_8)));
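How to read an Attribute
// key is the attribute's key; pass it in to get the value
String test = flowFile.getAttribute(key);
How to write an Attribute
1. Define the attribute key in the class
// Attribute key
public static final String TEST = "test";
2. Write it to the FlowFile
// Create a HashMap to hold the new attributes (the variable name is arbitrary)
Map<String, String> metricAttributes = new HashMap<>();
// Put the attribute into the Map (call put once per attribute)
metricAttributes.put(TEST, "Hello");
// Apply all attributes to the FlowFile (this returns a new FlowFile reference with the same content and the added attributes)
FlowFile updatedFlowFile = processSession.putAllAttributes(flowFile, metricAttributes);
// Transfer the FlowFile to the downstream processor
processSession.transfer(updatedFlowFile, REL_SUCCESS);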
A simple Processor example
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.tineaine.processor.demo;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Concepts
 * FlowFile (data flow): one piece of data moving through NiFi (comparable to a single request in TCP)
 * FlowFile Attribute (attribute): the Map that records properties of the FlowFile itself; if the FlowFile carries a file, the attributes are things like the file name, the file size and whether reading and writing are allowed
 * FlowFile Content (content): the data the FlowFile carries; in the example above, the Content is the content of that file
 * Processor Property (parameter): the user's configuration of a processor
 * Processor Relationship (relationship, outgoing relationship, output): the path along which data leaves a processor once it has been processed
 */
// Tags: this annotation declares the processor's tags so that users can filter by them when choosing a processor
@Tags({"Aine"})
@CapabilityDescription("")
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
public class LearnProcessor extends AbstractProcessor {
    // Processor property; by convention these are named in all capitals
    public static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
            // Property name (must be unique)
            .name("Str")
            // Display name (the name users see on the JStudio platform)
            .displayName("Str")
            // Description shown to users in the UI
            .description("String to append")
            // Whether the property is required; if true it must be filled in, otherwise the processor cannot run
            .required(true)
            // Validator; here it means the property must not be empty
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    // Relationship: the exit through which data leaves the processor after processing
    public static final Relationship SUCCESS = new Relationship.Builder()
            // Relationship name (display name)
            .name("Success")
            // Relationship description
            .description("Success")
            .build();
    private volatile Set<String> dynamicPropertyNames = new HashSet<>();
    // Holds all properties
    private List<PropertyDescriptor> descriptors;
    // Holds all relationships
    private Set<Relationship> relationships;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // Return the list populated in init()
        return descriptors;
    }

    // Cache of dynamic properties
    private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();

    // Initialization: NiFi calls this before the processor is used; properties and relationships are registered here
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        // Every property must be added to descriptors
        descriptors.add(TEXT);
        this.descriptors = Collections.unmodifiableList(descriptors);
        final Set<Relationship> relationships = new HashSet<Relationship>();
        // Every relationship must be added to relationships
        relationships.add(SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .required(false)
                .name(propertyDescriptorName)
                .dynamic(true)
                .build();
    }

    @Override
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
        if (newValue == null) {
            newDynamicPropertyNames.remove(descriptor.getName());
        } else if (oldValue == null) { // new property
            newDynamicPropertyNames.add(descriptor.getName());
        }
        this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        final Map<Relationship, PropertyValue> newPropertyMap = new HashMap<>();
        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
            if (!descriptor.isDynamic()) {
                continue;
            }
            getLogger().debug("Adding new dynamic property: {}", new Object[]{descriptor});
            newPropertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
        }
        this.propertyMap = newPropertyMap;
    }

    // Called each time the processor is triggered
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // Get a FlowFile from the session
        FlowFile flowFile = session.get();
        // If there is no FlowFile, there is nothing to do
        if (flowFile == null) {
            return;
        }
        // Use an atomic reference to avoid concurrency problems as far as possible
        final AtomicReference<String> value = new AtomicReference<>();
        // Character set
        final Charset charset = StandardCharsets.UTF_8;
        final byte[] buffer = new byte[1024];
        // Atomic Integer holder
        final AtomicInteger bufferedByteCount = new AtomicInteger(0);
        // Read the FlowFile content (up to the 1024-byte buffer)
        session.read(flowFile, in -> bufferedByteCount.set(StreamUtils.fillBuffer(in, buffer, false)));
        // Read the user-supplied property
        final String text = context.getProperty(TEXT).getValue();
        // Append the property value to the data read from the stream
        value.set(new String(buffer, 0, bufferedByteCount.get(), charset) + text);
        Map<String, String> map = new HashMap<>();
        for (Relationship key : propertyMap.keySet()) {
            map.put(key.getName(), propertyMap.get(key).getValue());
        }
        flowFile = session.putAllAttributes(flowFile, map);
        // Write the Content
        flowFile = session.write(flowFile, out -> out.write(value.get().getBytes(StandardCharsets.UTF_8)));
        // Transfer the FlowFile downstream
        session.transfer(flowFile, SUCCESS);
    }
}
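Controller Service development
Developing a Controller Service is much like developing a Processor.
A Controller Service can be split into three parts: the Api, the Service (Impl) and the Nar (which consists of an api nar and a service nar).
The api part resembles the interface you write in the Service layer in MVC development, and the Controller Service Impl is the implementation of that interface. The Nar part is simply the Nar packages built from the Api and the Service. Note that the Maven build of a Controller Service produces two packages, so take care when copying them into NiFi's lib directory.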
Nifi Api
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.example;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.processor.exception.ProcessException;
@Tags({"example"})
@CapabilityDescription("Example Service API.")
public interface ConfigServer extends ControllerService {
public void flush() throws ProcessException;
public String getVal() throws ProcessException;
}
Nifi Impl
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.example;
import cn.hutool.http.HttpUtil;
import com.alibaba.nacos.api.common.Constants;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import java.util.*;
@Tags({"config", "aine"})
@CapabilityDescription("Read Config Server.")
public class NacosService extends AbstractControllerService implements ConfigServer {
private String config = "Error";
public static final Validator NON_EMPTY_VALIDATOR = (subject, value, context) -> {
return new ValidationResult.Builder()
.subject(subject)
.input(value)
.valid(value != null && !value.isEmpty())
.explanation(subject + " cannot be empty")
.build();
};
// Nacos server address
public static final PropertyDescriptor SERVER_ADDR = new PropertyDescriptor
.Builder().name("nacos")
.displayName("nacos")
.description("Nacos Server Address")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.build();
// Configuration data ID
public static final PropertyDescriptor DATA_ID = new PropertyDescriptor
.Builder().name("dataId")
.displayName("Data Id")
.description("Data id")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.build();
// Configuration group
public static final PropertyDescriptor GROUP = new PropertyDescriptor
.Builder().name("group")
.displayName("Group")
.description("Group")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.build();
// Namespace
public static final PropertyDescriptor NAMESPACE = new PropertyDescriptor
.Builder().name("namespace")
.displayName("namespace")
.description("namespace")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.build();
private static final List<PropertyDescriptor> properties;
String serverAddr = "";
String dataId = "";
String group = "";
String namespace = "";
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(SERVER_ADDR);
props.add(NAMESPACE);
props.add(DATA_ID);
props.add(GROUP);
properties = Collections.unmodifiableList(props);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
/**
* @param context the configuration context
* @throws InitializationException if unable to create a database connection
*/
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
serverAddr = context.getProperty(SERVER_ADDR).getValue();
dataId = context.getProperty(DATA_ID).getValue();
group = context.getProperty(GROUP).getValue();
namespace = context.getProperty(NAMESPACE).getValue();
flush();
} catch (Exception e) {
e.printStackTrace();
}
}
@OnDisabled
public void shutdown() {
}
@Override
public void flush() throws ProcessException {
try {
Map<String, Object> paramMap = new HashMap<>(3);
paramMap.put("tenant", namespace);
paramMap.put(Constants.DATAID, dataId);
paramMap.put(Constants.GROUP, group);
config = HttpUtil.get(serverAddr + "/nacos" + Constants.CONFIG_CONTROLLER_PATH, paramMap);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public String getVal() throws ProcessException {
return this.config;
}
}
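Issue log
Environment issues
NiFi Pods in a Kubernetes cluster cannot reach each other
Do not rely on the default hostname for connections between Pods; use the Pod IP instead. The symptom is that each Pod can reach Zookeeper (because Zookeeper is exposed through a Service), but the Pods cannot reach each other, so they can neither elect a primary node nor communicate with one another.
In the yaml this is written as follows:
- name: NIFI_WEB_HTTP_HOST
  valueFrom:
    fieldRef:
      apiVersion: v1
      fieldPath: status.podIP
- name: NIFI_REMOTE_INPUT_HOST
  valueFrom:
    fieldRef:
      apiVersion: v1
      fieldPath: status.podIP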
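Development issues
Garbled Chinese text in FlowFiles
Whenever garbled text shows up in NiFi, first inspect the data flow: check whether the data flowing into and out of each processor is garbled, and locate the processor that introduces the problem. The most common cause by far is that UTF-8 encoding was not used, as in this example: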
flowFile = session.write(flowFile, out -> out.write(outputs.getBytes()));
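Here the text is written to the FlowFile Content without being converted to UTF-8, because getBytes() with no argument uses the platform default charset. The fix is to specify UTF-8 when converting to bytes: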
flowFile = session.write(flowFile, out -> out.write(outputs.getBytes(StandardCharsets.UTF_8)));
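The effect of a charset mismatch can be reproduced outside NiFi. The sketch below is an illustration only: the class CharsetSketch and its methods are hypothetical names, and ISO-8859-1 stands in for whatever non-UTF-8 default charset a platform might use. The Chinese sample text is written with Unicode escapes so that the source file itself is encoding-neutral.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration using only the JDK.
final class CharsetSketch {
    /** Encodes and decodes with the same explicit charset, so the text survives. */
    static String roundTripUtf8(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Decodes UTF-8 bytes as ISO-8859-1, which is how garbled text arises. */
    static String decodeWithWrongCharset(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        return new String(bytes, StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        String sample = "\u4e2d\u6587"; // two Chinese characters
        System.out.println(roundTripUtf8(sample).equals(sample));          // true
        System.out.println(decodeWithWrongCharset(sample).equals(sample)); // false
        System.out.println(decodeWithWrongCharset(sample).length());       // 6: one char per UTF-8 byte
    }
}
```

Naming the charset on both the writing and the reading side removes the dependency on the platform default.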
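XML is lost when HttpPost sends it to a remote service
Do not declare the request body as JSON
// Remove or comment out the code that sets this request header
// httpPost.setHeader("Content-Type","application/json;charset=utf-8");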
Appendix
Overview of added packages
Processor
JsonAnalysis
JSON analysis processor; parses JSON and extracts values from it
JsonToXml
AttributeToJson
ReadFile
Split
WriteFile
AttributeToXml
MapToXml
XmlAnalysis
XmlToJson
XmlToMap
ModAttribute
ModContent
SocketClient
RelayHttp
RelaySocket
AES
DES
Execute
Controller Service
NacosService
Using the processor code generator
// Project name (enter your project name)
static String projectName = "Jeesing";
// Final package name (name of the resulting NiFi processor nar; it describes the feature and must not clash with other nar packages)
static String ProcessorName = "Demo";
// Project location (wherever the project should be created)
static String projectPath = "F:\\Project\\develop\\work\\Nifi-Plus\\Demo";
// Group ID, usually the company domain (any domain-style name works, but avoid unusual characters)
static String groupId = "cn.tineaine";
// Processor source file name (also the class name; it is just a simple Processor file)
static String processorFileName = "LearnProcessor";
// Nar package version (must be changed for every release)
static String NarVersion = "1.0.0";
//-----------------------------------------------
// Normally, do not modify anything in this part
//-----------------------------------------------
// Package that contains the processor
static String processorPath = groupId + ".processor." + ProcessorName;
// Fully qualified name of the generated class
static String processorFullPath = processorPath + "." + processorFileName;
// nifi processor module
static String processorModule = "nifi-" + ProcessorName + "-processors";
// nifi nar module
static String narModule = "nifi-" + ProcessorName + "-nar";
// Artifact ID, usually the project name (same as projectName; changing it is not recommended)
static String artifactId = projectName;
// Minor version (Processor)
static String aceVersion = NarVersion;
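To see what the derived settings evaluate to, the string concatenations above can be run on their own. This is a small sketch that assumes the sample values shown (groupId "cn.tineaine", ProcessorName "Demo", processorFileName "LearnProcessor"); the class GeneratorNamesSketch and its methods are hypothetical and are not part of the generator.

```java
// Hypothetical illustration that mirrors the concatenations in the generator settings.
final class GeneratorNamesSketch {
    /** Package that contains the processor: groupId + ".processor." + processorName. */
    static String processorPackage(String groupId, String processorName) {
        return groupId + ".processor." + processorName;
    }

    /** Fully qualified class name of the generated processor. */
    static String processorFullName(String groupId, String processorName, String fileName) {
        return processorPackage(groupId, processorName) + "." + fileName;
    }

    /** Name of the Maven module that holds the processor code. */
    static String processorModule(String processorName) {
        return "nifi-" + processorName + "-processors";
    }

    /** Name of the Maven module that builds the nar. */
    static String narModule(String processorName) {
        return "nifi-" + processorName + "-nar";
    }

    public static void main(String[] args) {
        System.out.println(processorFullName("cn.tineaine", "Demo", "LearnProcessor"));
        System.out.println(processorModule("Demo"));
        System.out.println(narModule("Demo"));
    }
}
```

With the sample values this yields cn.tineaine.processor.Demo.LearnProcessor, nifi-Demo-processors and nifi-Demo-nar.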
TODO
Complete the cluster deployment documentation
Add Reporting Task development documentation
Add related development examples