Introduction

References and Links

Apache

Tine Aine: CSDN Blog

Apache Documentation

Explore - Docker Hub

Apache Nifi - Wikipedia

Flow-based programming - Wikipedia

Low-code development platform - Wikipedia

How An Arcane Coding Method From 1970s Banking Software Could Save The (fastcompany.com)


Deployment

Single-Node Deployment

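By default, the official NiFi package can be started directly (out of the box) with no extra configuration. However, check whether local port 8080 is already in use: by default NiFi serves its web interface on port 8080, and if that port is occupied you can change it in the configuration files in the conf directory.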
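To find out whether port 8080 is already taken before starting NiFi, one quick check is to try binding the port yourself. The following is a minimal sketch in plain Java; the class PortCheck and its method are hypothetical helpers, not part of NiFi, and a successful bind is only a heuristic (another process could grab the port right afterwards). In NiFi 1.x the HTTP port itself is the nifi.web.http.port property in conf/nifi.properties.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical helper (not part of NiFi): reports whether a local TCP port can be bound right now.
class PortCheck {
    static boolean isPortFree(int port) {
        // try-with-resources releases the port again immediately after the test bind
        try (ServerSocket socket = new ServerSocket()) {
            socket.setReuseAddress(false);
            // Bind on all interfaces, as a server listening on this port would
            socket.bind(new InetSocketAddress(port));
            return true;
        } catch (IOException e) {
            // The bind failed: most likely another process is already listening on the port
            return false;
        }
    }

    public static void main(String[] args) {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8080;
        System.out.println("Port " + port + (isPortFree(port) ? " is free" : " is already in use"));
    }
}
```

Run it with the port to check as the first argument; without an argument it checks 8080.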

Download

Download URL

Apache NiFi official download page

Unless you have special requirements, simply download the latest release.

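Local Deployment

Extracting the archive produces the NiFi installation directory; among other things it contains the bin directory (startup scripts) and the conf directory (configuration files).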

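Testing

The steps below were tested on Windows only. To start NiFi on Linux, run sudo ./nifi.sh start in NiFi's bin directory.

Go to the bin directory and double-click run-nifi.bat to start NiFi. A CMD window opens automatically; wait about 3 to 5 minutes.

Open a browser and enter localhost:8080/nifi in the address bar to open NiFi. This applies to releases such as the 1.12.x used in this article; since NiFi 1.14.0 the default configuration is secured, and the UI is served over HTTPS at https://localhost:8443/nifi instead.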

Cluster Deployment

A cluster gives NiFi better load balancing; for most production environments, a clustered deployment is recommended.

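K8S Docker Cluster

A K8S (Kubernetes) cluster works much like Docker Compose, but K8S is more widely used and better suited to load balancing and migration in large networks.

This article builds a NiFi cluster in Kubernetes from YAML files, using an external ZooKeeper. Setting up the Kubernetes cluster itself is not covered; we assume you already have a working Kubernetes cluster.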

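Prerequisites and Preparation

  • A Kubernetes environment (assumed to be already set up and reachable)

  • A local Docker installation (either on a Linux distribution or in WSL2)

  • Local Docker images (ZooKeeper and NiFi; you can also build the NiFi image yourself)

  • The YAML configuration files (ZooKeeper Deployment/Service files and NiFi Deployment/Service files)

  • The Apache NiFi source code (downloaded from the Apache NiFi website or GitHub)

Note: you only need the NiFi source code if you want to build the image yourself or modify NiFi's functionality.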

Versions

  • ZooKeeper image: bitnami/zookeeper:latest

  • Apache NiFi image: apache/nifi:latest

Resources

NiFi Deployment YAML

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nifi-131-v1
  namespace: olmp
  labels:
    applicationName: nifi
    applicationType: throughImage
    envId: '8'
    kubernetes.customized/pod-affinity: nifi-131
    versionName: v1
spec:
  replicas: 2
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      applicationName: nifi
      versionName: v1
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        applicationName: nifi
        applicationType: throughImage
        envId: '8'
        kubernetes.customized/pod-affinity: nifi-131
        parentKind: Deployment
        versionName: v1
    spec:
      volumes:
      - name: v1614844117709
        configMap:
          name: nifi-config
          items:
          - key: start.sh
            path: start.sh
          defaultMode: 511
      containers:
      - name: nifi
        image: bjharbor.sccba.org/public/olmp/nifi:1.12.1-dockermaven
        env:
        - name: NIFI_CLUSTER_ADDRESS
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: NIFI_ELECTION_MAX_CANDIDATES
          value: '1'
        # Format: <ZooKeeper Service name>.<namespace>:2181; adjust to your ZooKeeper Service
        - name: NIFI_ZK_CONNECT_STRING
          value: olmp-zk.olmp:2181
        - name: NIFI_CLUSTER_IS_NODE
          value: 'true'
        - name: NIFI_CLUSTER_NODE_PROTOCOL_PORT
          value: '8082'
        - name: NIFI_ELECTION_MAX_WAIT
          value: 1 min
        - name: NIFI_WEB_HTTP_HOST
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: NIFI_REMOTE_INPUT_HOST
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        resources:
          limits:
            cpu: '1'
            memory: 2G
          requests:
            cpu: '1'
            memory: 2G
        volumeMounts:
        - name: v1614844117709
          mountPath: /opt/nifi/scripts/start.sh
          subPath: start.sh
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      securityContext: {}
      imagePullSecrets:
      - name: olmp
      schedulerName: default-scheduler

NiFi Service YAML

apiVersion: v1
kind: Service
metadata:
  # Must be the same namespace as the NiFi Deployment (olmp)
  namespace: olmp
  annotations:
    kompose.cmd: kompose -f docker-compose.yml convert
    kompose.version: 1.1.0 (36652f6)
  labels:
    io.kompose.service: nifi
  name: nifi
spec:
  ports:
  - name: "8080"
    port: 8080
    targetPort: 8080
  # Must match the Pod labels of the NiFi Deployment
  selector:
    applicationName: nifi
    versionName: v1

ZooKeeper Deployment YAML

apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: zjyw-st-zjyw
  annotations:
    kompose.cmd: kompose -f docker-compose.yml convert
    kompose.version: 1.1.0 (36652f6)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  replicas: 1
  strategy: {}
  selector:
    matchLabels:
      io.kompose.service: zookeeper
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.service: zookeeper
    spec:
      containers:
      - env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        image: bitnami/zookeeper:latest
        name: zookeeper
        resources: {}
      restartPolicy: Always
status: {}

ZooKeeper Service YAML

apiVersion: v1
kind: Service
metadata:
  namespace: zjyw-st-zjyw
  annotations:
    kompose.cmd: kompose -f docker-compose.yml convert
    kompose.version: 1.1.0 (36652f6)
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  clusterIP: None
  ports:
  # ZooKeeper client port (bitnami/zookeeper listens on 2181 by default)
  - name: client
    port: 2181
    targetPort: 2181
  selector:
    io.kompose.service: zookeeper

Steps

Overview

  • Create the ZooKeeper Deployment

  • Create the ZooKeeper Service

  • Create the NiFi Deployment

  • Create the NiFi Service


NiFi Performance Tuning

1. Configure the maximum number of file handles

(Maximum File Handles)

sudo vi /etc/security/limits.conf

Add the following lines (the leading * applies the limit to all users):

*  hard  nofile  50000
*  soft  nofile  50000
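limits.conf is applied by pam_limits when a user logs in, so the new value is typically only visible in new login sessions, and services started by systemd take their limit from the unit's LimitNOFILE setting instead. To confirm what the NiFi JVM actually received, a sketch like the following can be used. It assumes an OpenJDK/HotSpot JVM, where com.sun.management.UnixOperatingSystemMXBean is available on Unix-like systems; the class name FdLimit is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import com.sun.management.UnixOperatingSystemMXBean;

// Hypothetical helper: reports the open-file limit the running JVM sees.
class FdLimit {
    // Returns the maximum file descriptor count, or -1 when not running on a Unix-like OS
    static long maxFileDescriptors() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            return ((UnixOperatingSystemMXBean) os).getMaxFileDescriptorCount();
        }
        return -1;
    }

    public static void main(String[] args) {
        long max = maxFileDescriptors();
        // Run as the NiFi user after the change; the value should reflect the new limit
        System.out.println(max < 0 ? "Not a Unix-like OS" : "Max open files: " + max);
    }
}
```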

2. Configure the maximum number of forked processes

(Maximum Forked Processes)

sudo vi /etc/security/limits.conf

Add the following lines:

*  hard  nproc  10000
*  soft  nproc  10000

Some distributions also require you to edit (or create) the following file:

sudo vi /etc/security/limits.d/90-nproc.conf

and add:

*  soft  nproc  10000

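3. Increase the number of available TCP socket ports

(Increase the number of TCP socket ports available)

sudo sysctl -w net.ipv4.ip_local_port_range="10000 65000"

4. Shorten the socket TIME_WAIT period

(Reduce how long sockets stay in the TIME_WAIT state, so that their ports can be reused sooner)

sudo sysctl -w net.ipv4.netfilter.ip_conntrack_tcp_timeout_time_wait="1"
# on newer kernels the equivalent key is net.netfilter.nf_conntrack_tcp_timeout_time_wait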

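5. Disable swapping

Setting vm.swappiness to 0 tells the kernel to avoid swapping as far as possible, which prevents slowdowns caused by disk I/O; ideally, also add more RAM.

sudo vi /etc/sysctl.conf

Add the following line: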

vm.swappiness = 0

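6. Disable atime on the repository partitions

sudo vi /etc/fstab

Example contents (add the noatime option to the partition that holds the repositories):

proc /proc proc defaults 0 0
none /dev/pts devpts gid=5,mode=620 0 0
/dev/md0 /boot ext3 defaults 0 0
/dev/md1 none swap sw 0 0
# add the noatime option
/dev/md2 / ext3 defaults,noatime 0 0

Finally, remount the partition:

sudo mount -o remount /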

7. Exclude directories from antivirus scanning

  • content_repository

  • flowfile_repository

  • provenance_repository

  • logs

  • state

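If any of these directories is locked by another program, NiFi may stall. Antivirus software in particular should be configured to exclude them from its scans.

8. Hardware sizing principles

The more processors run concurrently, the stronger the CPU needs to be (in particular, more cores/threads).

The larger the data being processed and distributed, the more memory you need; avoid relying on system swap.

Hardware bottlenecks usually appear in memory capacity and disk I/O: FlowFile attributes are held in memory, while content and provenance data live in the repositories on disk. If you frequently need to look back at historical data (provenance replay), put the repositories on SSDs or even in-memory storage.

For very large projects with many processors, provision a high-performance CPU and plenty of memory up front.

If NiFi runs too slowly, check:

1. Whether memory swapping is happening; in general, add memory so that the system does not need to swap.

2. Whether the CPU is the bottleneck; if so, use a more powerful CPU.

Both startup and shutdown include a short default delay (typically 3 to 5 seconds). At startup, this preparation delay prevents a burst of data from being pushed to the console all at once; at shutdown, it waits for NiFi Core to exit, and if that times out (typically after 5 seconds), the Bootstrap process kills it.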


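Usage

Scheduled Tasks

NiFi scheduled tasks do not need to be implemented in the processor code; simply configure them on the Scheduling tab of the processor's configuration, with the Scheduling Strategy set to CRON driven.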


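Basic form of the expression

Seconds Minutes Hours Day-of-month Month Day-of-week Year (optional)

Special characters

* is the wildcard (any value). ? means "no specific value" and is allowed only in the day-of-month and day-of-week fields; when one of these two fields is given a value, the other must be ?.

Value notation

  • Individual numbers (separate several values with ,): for example, 1,2,5 in the minutes field means minutes 1, 2 and 5

  • Ranges (a span written with -): for example, 1-10 in the minutes field means every minute from 1 through 10

  • Increments (start value and increment separated by /): for example, 0/10 in the minutes field means 0, 10, 20, 30, 40 and 50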
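The three value notations can be made concrete by expanding a single field into the values it selects. The sketch below is a hypothetical helper written for illustration, not Quartz or NiFi code; it handles only numbers, *, ?, lists, ranges and increments, and treats ? like * because it looks at one field in isolation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper: expands one cron field into the concrete values it selects.
class CronField {
    // min/max are the bounds of the field, e.g. 0..59 for seconds and minutes
    static List<Integer> expand(String field, int min, int max) {
        TreeSet<Integer> values = new TreeSet<>();
        for (String part : field.split(",")) {           // "1,2,5": a list of sub-expressions
            int step = 1;
            String range = part;
            int slash = part.indexOf('/');
            if (slash >= 0) {                             // "0/10": start value / increment
                step = Integer.parseInt(part.substring(slash + 1));
                range = part.substring(0, slash);
            }
            int start;
            int end;
            if (range.equals("*") || range.equals("?")) { // wildcard: the whole field range
                start = min;
                end = max;
            } else if (range.contains("-")) {             // "1-10": inclusive range
                String[] bounds = range.split("-");
                start = Integer.parseInt(bounds[0]);
                end = Integer.parseInt(bounds[1]);
            } else {
                start = Integer.parseInt(range);
                // "n/step" runs to the top of the field; a plain "n" is a single value
                end = slash >= 0 ? max : start;
            }
            if (start < min || end > max || start > end || step < 1) {
                throw new IllegalArgumentException("Invalid cron field part: " + part);
            }
            for (int v = start; v <= end; v += step) {
                values.add(v);
            }
        }
        return new ArrayList<>(values);
    }

    public static void main(String[] args) {
        System.out.println(expand("1,2,5", 0, 59));  // [1, 2, 5]
        System.out.println(expand("1-10", 0, 59));   // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(expand("0/10", 0, 59));   // [0, 10, 20, 30, 40, 50]
    }
}
```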

Examples

# Fires at 12:00 noon every day
0 0 12 * * ?
# Fires at 12:12:12 every day
12 12 12 * * ?
# Fires at 12:12:12 every day, only in 2021
12 12 12 * * ? 2021
# Fires at 12:12:12 every day, only in 2021 through 2025
12 12 12 * * ? 2021-2025
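To check one of these expressions against a specific moment, the fields can be matched one by one. The following simplified matcher is a hypothetical sketch, not the Quartz implementation NiFi uses: it supports numbers, lists, ranges, increments, * and ?, the optional year field, and Quartz's day-of-week numbering (1 = Sunday), but not names such as MON or JAN, nor the special characters L, W and #.

```java
import java.time.LocalDateTime;

// Hypothetical, simplified cron matcher (not Quartz).
// Format: seconds minutes hours day-of-month month day-of-week [year]
class SimpleCron {
    static boolean matches(String expression, LocalDateTime t) {
        String[] f = expression.trim().split("\\s+");
        if (f.length != 6 && f.length != 7) {
            throw new IllegalArgumentException("Expected 6 or 7 fields but got " + f.length);
        }
        // Quartz numbers days of the week 1-7 starting on Sunday; java.time uses 1-7 starting on Monday
        int quartzDayOfWeek = t.getDayOfWeek().getValue() % 7 + 1;
        return fieldMatches(f[0], t.getSecond(), 0)
                && fieldMatches(f[1], t.getMinute(), 0)
                && fieldMatches(f[2], t.getHour(), 0)
                && fieldMatches(f[3], t.getDayOfMonth(), 1)
                && fieldMatches(f[4], t.getMonthValue(), 1)
                && fieldMatches(f[5], quartzDayOfWeek, 1)
                // The year field is optional: a 6-field expression matches every year
                && (f.length == 6 || fieldMatches(f[6], t.getYear(), 1970));
    }

    // One field: comma-separated parts, each *, ?, n, a-b, n/step, a-b/step or */step
    static boolean fieldMatches(String field, int value, int min) {
        for (String part : field.split(",")) {
            int step = 1;
            String range = part;
            int slash = part.indexOf('/');
            if (slash >= 0) {
                step = Integer.parseInt(part.substring(slash + 1));
                range = part.substring(0, slash);
            }
            int start;
            int end;
            if (range.equals("*") || range.equals("?")) {
                // Any value from the field minimum upwards
                start = min;
                end = Integer.MAX_VALUE;
            } else if (range.contains("-")) {
                String[] bounds = range.split("-");
                start = Integer.parseInt(bounds[0]);
                end = Integer.parseInt(bounds[1]);
            } else {
                start = Integer.parseInt(range);
                // "n/step" runs from n upwards; a plain "n" matches only n
                end = slash >= 0 ? Integer.MAX_VALUE : start;
            }
            if (value >= start && value <= end && (value - start) % step == 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2021, 3, 4, 12, 12, 12);
        System.out.println(matches("12 12 12 * * ?", t));            // true
        System.out.println(matches("12 12 12 * * ? 2021-2025", t));  // true
        System.out.println(matches("0 0 12 * * ?", t));              // false
    }
}
```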


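NiFi Development

Processor Development

How to make a processor accept no incoming connections

// Required imports
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;

// Source processor (accepts no incoming data, only emits data):
// add the following annotation to the processor class
@InputRequirement(Requirement.INPUT_FORBIDDEN)

How to create processor properties and relationships

In this example, property 2 only takes effect when property 1 is true; otherwise it is not displayed.

1. Declare the properties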

// Property 1
public static final PropertyDescriptor USE_FILE = new PropertyDescriptor.Builder()
    // Property name
    .name("Use File")
    // Display name
    .displayName("Use File")
    // Required (the processor is invalid if it is not set)
    .required(true)
    // Allowable values (the value can only be one of these)
    .allowableValues("true", "false")
    // Default value
    .defaultValue("true")
    .build();

// Property 2
public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
    .name("File to Use")
    .displayName("File to Use")
    .required(true)
    // Validator (checks that the file exists; otherwise the processor is invalid)
    .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    // Dependency: this property is only shown (and validated) when USE_FILE is "true"
    .dependsOn(USE_FILE, "true")
    .build();

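2. Declare the relationships

// Relationship 1
public static final Relationship REL_SUCCESS = new Relationship.Builder()
    // Name (as displayed in NiFi)
    .name("success")
    // Description
    .description("The flowfile contains the original content with one or more attributes added containing the respective counts")
    .build();
// Relationship 2
public static final Relationship REL_FAILURE = new Relationship.Builder()
    // Name (as displayed in NiFi)
    .name("failure")
    // Description
    .description("If the flowfile text cannot be counted for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
    .build();

3. Add them to the processor

// Holds the properties
private static final List<PropertyDescriptor> properties;
// Holds the relationships
private static final Set<Relationship> relationships;
// Initialize both in a static block
static {
    // Put the properties into an unmodifiable list
    properties = Collections.unmodifiableList(Arrays.asList(USE_FILE, FILE));
    // Put the relationships into an unmodifiable set
    relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS,
            REL_FAILURE)));
}

// NiFi reads the properties and relationships through these two overrides
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return properties;
}

@Override
public Set<Relationship> getRelationships() {
    return relationships;
}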

How to read processor properties

1. First, define the property

    public static final PropertyDescriptor TEXT = new PropertyDescriptor
            .Builder().name("Text")
            .displayName("text")
            .description("This is the description")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

2. Add the property to descriptors

    // getSupportedPropertyDescriptors() and getRelationships() must return the fields set here
    @Override
    protected void init(final ProcessorInitializationContext context) {
        // Properties
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(TEXT);
        this.descriptors = Collections.unmodifiableList(descriptors);
        // Relationships
        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

3. Read the property in onTrigger

final String text = context.getProperty(TEXT).getValue();

How to read content

// Read the FlowFile content into a String (UTF-8).
// Note: the fixed 1024-byte buffer only captures the first 1024 bytes of the content.
final Charset charset = StandardCharsets.UTF_8;
final byte[] buffer = new byte[1024];
final AtomicInteger bufferedByteCount = new AtomicInteger(0);
session.read(flowFile, new InputStreamCallback() {
    @Override
    public void process(final InputStream in) throws IOException {
        bufferedByteCount.set(StreamUtils.fillBuffer(in, buffer, false));
    }
});
// This variable now holds the content
String config = new String(buffer, 0, bufferedByteCount.get(), charset);
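Because the buffer above holds at most 1024 bytes, any content beyond that is silently cut off. A minimal alternative, assuming Java 9+ for InputStream.readAllBytes(), reads the whole stream and decodes it as UTF-8. It is shown against a plain InputStream so it runs without NiFi; inside a processor the same call would go into the read callback, for example session.read(flowFile, in -> content.set(new String(in.readAllBytes(), StandardCharsets.UTF_8))) with content being an AtomicReference<String>. This loads the entire content into memory, so it only suits reasonably small FlowFiles. ContentReader is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: reads an entire stream (not just the first 1024 bytes) as UTF-8 text.
class ContentReader {
    static String readAllUtf8(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // 2000 bytes of content: more than a 1024-byte buffer can hold
        byte[] content = "a".repeat(2000).getBytes(StandardCharsets.UTF_8);
        String text = readAllUtf8(new ByteArrayInputStream(content));
        System.out.println(text.length()); // 2000
    }
}
```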

How to write content

// flowFile = session.write(flowFile, out -> out.write("content to write".getBytes(StandardCharsets.UTF_8)));
flowFile = session.write(flowFile, out -> out.write(value.get().getBytes(StandardCharsets.UTF_8)));

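How to read an attribute

// key is the attribute name; this returns its value (or null if the attribute is not set)
String test = flowFile.getAttribute(key);

How to write attributes

1. Define the attribute key in the class

// Attribute key
public static final String TEST = "test";

2. Write it to the FlowFile

// A map holding the new attributes (any variable name will do)
Map<String, String> metricAttributes = new HashMap<>();
// Put each attribute into the map (call put once per attribute)
metricAttributes.put(TEST, "Hello");
// Add all attributes to the FlowFile. FlowFiles are immutable: this returns an updated FlowFile that
// keeps the existing content and attributes, plus the new attributes (existing keys are overwritten)
FlowFile updatedFlowFile = processSession.putAllAttributes(flowFile, metricAttributes);
// Transfer the updated FlowFile to the downstream processor
processSession.transfer(updatedFlowFile, REL_SUCCESS);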
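Since FlowFiles are immutable, putAllAttributes does not modify the object it is given; it returns a new FlowFile reference, and that returned reference is the one to transfer. The plain-Java sketch below imitates these semantics with a hypothetical SimpleFlowFile class (not the NiFi API): existing attributes are kept, keys present in the map are added or overwritten, and the original object stays unchanged.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a FlowFile, only to illustrate the semantics; not the NiFi API.
final class SimpleFlowFile {
    private final Map<String, String> attributes;

    SimpleFlowFile(Map<String, String> attributes) {
        // Defensive, unmodifiable copy: the object can never change after construction
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }

    String getAttribute(String key) {
        return attributes.get(key);
    }

    // Mirrors the idea of session.putAllAttributes(flowFile, map): returns a NEW object that keeps
    // the existing attributes and adds/overwrites the given ones; this object is left unchanged.
    SimpleFlowFile putAllAttributes(Map<String, String> newAttributes) {
        Map<String, String> merged = new HashMap<>(attributes);
        merged.putAll(newAttributes);
        return new SimpleFlowFile(merged);
    }
}
```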

A Simple Processor Example

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.tineaine.processor.demo;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

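/**
 * Concepts
 * FlowFile (data flow): a single piece of data moving through NiFi (comparable to a single request in TCP)
 * FlowFile attributes: the key/value Map describing the FlowFile itself; for example, if a FlowFile carries a file, its attributes are the file name, file size, whether it is readable/writable, and so on
 * FlowFile content: the actual data the FlowFile carries; in the example above, the content is the file's contents
 * Processor properties (called "parameters" here, declared with PropertyDescriptor): the user's configuration of the processor
 * Processor relationships (outgoing relationships): the paths through which a FlowFile leaves the processor once it has been processed
 */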

// Tags: declares the processor's tags so that users can filter by them when choosing a processor
@Tags({"Aine"})
@CapabilityDescription("")
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
public class LearnProcessor extends AbstractProcessor {
    // Processor property; by convention these constants are named in all caps
    public static final PropertyDescriptor TEXT = new PropertyDescriptor
            // Property name (must be unique)
            .Builder().name("Str")
            // Display name (the name users see in the UI, e.g. in the JStudio platform)
            .displayName("Str")
            // Property description (help text shown for this property in the configuration dialog)
            .description("String to append")
            // Whether the property is required; if true, it must be set or the processor cannot run
            .required(true)
            // Validator; here the value must not be empty
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    // Relationship: an outlet through which FlowFiles leave the processor after processing
    public static final Relationship SUCCESS = new Relationship.Builder()
            // Relationship name (shown in the UI)
            .name("Success")
            // Relationship description
            .description("Success")
            .build();
    // Names of the currently configured dynamic properties
    private volatile Set<String> dynamicPropertyNames = new HashSet<>();
    // Holds all supported properties
    private List<PropertyDescriptor> descriptors;
    // Holds all outgoing relationships
    private Set<Relationship> relationships;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // The list built in init()
        return descriptors;
    }

    // Cache of dynamic property values, keyed by a Relationship named after each property
    private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();

    // Called once by NiFi when the processor instance is initialized; registers the properties and relationships
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        // Every property must be added to descriptors
        descriptors.add(TEXT);
        this.descriptors = Collections.unmodifiableList(descriptors);
        final Set<Relationship> relationships = new HashSet<Relationship>();
        // Every relationship must be added to relationships
        relationships.add(SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .required(false)
                .name(propertyDescriptorName)
                .dynamic(true)
                .build();
    }

    @Override
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
        if (newValue == null) {
            newDynamicPropertyNames.remove(descriptor.getName());
        } else if (oldValue == null) {    // new property
            newDynamicPropertyNames.add(descriptor.getName());
        }

        this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        final Map<Relationship, PropertyValue> newPropertyMap = new HashMap<>();
        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
            if (!descriptor.isDynamic()) {
                continue;
            }
            getLogger().debug("Adding new dynamic property: {}", new Object[]{descriptor});
            newPropertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
        }

        this.propertyMap = newPropertyMap;
    }

    // Called each time the processor is triggered
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // Get a FlowFile from the session
        FlowFile flowFile = session.get();
        // If there is no FlowFile, simply return
        if (flowFile == null) {
            return;
        }
        // Atomic holders: lambdas can only capture effectively final variables, so results are stored in these
        final AtomicReference<String> value = new AtomicReference<>();
        // Character set
        final Charset charset = StandardCharsets.UTF_8;
        final byte[] buffer = new byte[1024];
        // Number of bytes read into the buffer
        final AtomicInteger bufferedByteCount = new AtomicInteger(0);
        // Read the content into the buffer (only the first 1024 bytes fit)
        session.read(flowFile, in -> bufferedByteCount.set(StreamUtils.fillBuffer(in, buffer, false)));
        // Read the user-configured property
        final String text = context.getProperty(TEXT).getValue();
        // Append the property value to the content
        value.set(new String(buffer, 0, bufferedByteCount.get(), charset) + text);
        // Copy each dynamic property into a FlowFile attribute
        Map<String, String> map = new HashMap<>();
        for (Relationship key : propertyMap.keySet()) {
            map.put(key.getName(), propertyMap.get(key).getValue());
        }
        flowFile = session.putAllAttributes(flowFile, map);
        // Write the content
        flowFile = session.write(flowFile, out -> out.write(value.get().getBytes(StandardCharsets.UTF_8)));
        // Transfer the FlowFile downstream
        session.transfer(flowFile, SUCCESS);
    }
}
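The onPropertyModified override above keeps the set of dynamic property names copy-on-write: NiFi passes newValue == null when a property is removed and oldValue == null when it is set for the first time. The same bookkeeping can be pulled out into a plain-Java function and checked without a NiFi runtime; DynamicNames and update are hypothetical names.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper isolating the bookkeeping done in LearnProcessor.onPropertyModified.
class DynamicNames {
    static Set<String> update(Set<String> current, String name, String oldValue, String newValue) {
        // Copy first: threads still reading the old set keep seeing a consistent, unmodifiable snapshot
        Set<String> next = new HashSet<>(current);
        if (newValue == null) {
            next.remove(name);   // property removed
        } else if (oldValue == null) {
            next.add(name);      // property set for the first time
        }
        // A changed value (both non-null) leaves the set of names as it is
        return Collections.unmodifiableSet(next);
    }
}
```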

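Controller Service Development

Developing a Controller Service is broadly similar to developing a Processor.

A Controller Service can be roughly divided into an API, a Service (Impl), and NARs (an API NAR and a service NAR).

The API part plays the role of the interfaces you write in the service layer of an MVC application, and the Controller Service Impl is the implementation of those interfaces. The NARs are simply the packages built from the API and the Service. Note that building a Controller Service with Maven produces two NAR packages, so copy both when copying them into NiFi's lib directory.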
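The API/Impl split can be illustrated in plain Java without NiFi: the consumer is written against the interface only, so any implementation can be plugged in. All names below are hypothetical. In NiFi itself, a processor would declare a property with identifiesControllerService(ConfigServer.class) and obtain the service with context.getProperty(...).asControllerService(ConfigServer.class), and the processor's NAR would depend on the API NAR rather than on the implementation.

```java
// Hypothetical names throughout; plain Java, no NiFi dependency.

// "API" module: only the interface (compare ConfigServer in the API NAR)
interface ConfigSource {
    String getVal();
}

// "Impl" module: one implementation (compare NacosService in the service NAR)
class StaticConfigSource implements ConfigSource {
    private final String value;

    StaticConfigSource(String value) {
        this.value = value;
    }

    @Override
    public String getVal() {
        return value;
    }
}

// Consumer (compare a Processor): depends only on the interface, so implementations are interchangeable
class ConfigConsumer {
    private final ConfigSource source;

    ConfigConsumer(ConfigSource source) {
        this.source = source;
    }

    String describe() {
        return "config=" + source.getVal();
    }
}
```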

Nifi Api

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.example;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.processor.exception.ProcessException;

@Tags({"example"})
@CapabilityDescription("Example Service API.")
public interface ConfigServer extends ControllerService {

    public void flush()  throws ProcessException;

    public String getVal() throws ProcessException;

}

Nifi Impl

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.example;

import cn.hutool.http.HttpUtil;
import com.alibaba.nacos.api.common.Constants;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;

import java.util.*;

@Tags({"config", "aine"})
@CapabilityDescription("Read Config Server.")
public class NacosService extends AbstractControllerService implements ConfigServer {

    private String config = "Error";

    public static final Validator NON_EMPTY_VALIDATOR = (subject, value, context) -> {
        return new ValidationResult.Builder()
                .subject(subject)
                .input(value)
                .valid(value != null && !value.isEmpty())
                .explanation(subject + " cannot be empty")
                .build();
    };
    // Nacos地址
    public static final PropertyDescriptor SERVER_ADDR = new PropertyDescriptor
            .Builder().name("nacos")
            .displayName("nacos")
            .description("Nacos Server Address")
            .required(true)
            .addValidator(NON_EMPTY_VALIDATOR)
            .build();
    // 配置 ID
    public static final PropertyDescriptor DATA_ID = new PropertyDescriptor
            .Builder().name("dataId")
            .displayName("Data Id")
            .description("Data id")
            .required(true)
            .addValidator(NON_EMPTY_VALIDATOR)
            .build();

    // 配置分组
    public static final PropertyDescriptor GROUP = new PropertyDescriptor
            .Builder().name("group")
            .displayName("Group")
            .description("Group")
            .required(true)
            .addValidator(NON_EMPTY_VALIDATOR)
            .build();

    // 命名空间
    public static final PropertyDescriptor NAMESPACE = new PropertyDescriptor
            .Builder().name("namespace")
            .displayName("namespace")
            .description("namespace")
            .required(true)
            .addValidator(NON_EMPTY_VALIDATOR)
            .build();

    private static final List<PropertyDescriptor> properties;

    String serverAddr = "";
    String dataId = "";
    String group = "";
    String namespace = "";

    static {
        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(SERVER_ADDR);
        props.add(NAMESPACE);
        props.add(DATA_ID);
        props.add(GROUP);
        properties = Collections.unmodifiableList(props);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Reads the Nacos connection settings and loads the configuration once.
     *
     * @param context the configuration context
     * @throws InitializationException if the service cannot be initialized
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) throws InitializationException {
        try {
            serverAddr = context.getProperty(SERVER_ADDR).getValue();
            dataId = context.getProperty(DATA_ID).getValue();
            group = context.getProperty(GROUP).getValue();
            namespace = context.getProperty(NAMESPACE).getValue();
            flush();
        } catch (Exception e) {
            // Log through NiFi's component logger so the error appears in nifi-app.log and as a bulletin
            getLogger().error("Failed to load configuration from Nacos", e);
        }
    }

    @OnDisabled
    public void shutdown() {

    }

    @Override
    public void flush() throws ProcessException {
        try {
            Map<String, Object> paramMap = new HashMap<>(3);
            paramMap.put("tenant", namespace);
            paramMap.put(Constants.DATAID, dataId);
            paramMap.put(Constants.GROUP, group);
            // GET <serverAddr>/nacos + the Nacos config API path, with namespace (tenant), dataId and group as query parameters
            config = HttpUtil.get(serverAddr + "/nacos" + Constants.CONFIG_CONTROLLER_PATH, paramMap);
        } catch (Exception e) {
            getLogger().error("Failed to fetch configuration from Nacos", e);
        }
    }

    @Override
    public String getVal() throws ProcessException {
        return this.config;
    }


}
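To see which URL flush() actually requests, the query string can be built by hand. This sketch assumes that the Nacos constants resolve to dataId, group and /v1/cs/configs (the v1 config Open API path); check them against the Nacos client version you use. NacosUrl and configUrl are hypothetical names, and the parameters are URL-encoded with java.net.URLEncoder (the Java 10+ overload that takes a Charset).

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: builds the request URL that NacosService.flush() sends.
// Assumption: Constants.CONFIG_CONTROLLER_PATH == "/v1/cs/configs"; verify for your Nacos version.
class NacosUrl {
    static String configUrl(String serverAddr, String namespace, String dataId, String group) {
        Map<String, String> params = new LinkedHashMap<>();   // keeps the parameter order stable
        params.put("tenant", namespace);
        params.put("dataId", dataId);
        params.put("group", group);
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return serverAddr + "/nacos/v1/cs/configs?" + query;
    }
}
```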


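Troubleshooting

Environment Issues

NiFi pods in a Kubernetes cluster cannot reach each other

Do not connect pods to each other by their default hostnames; use pod IPs instead. The symptom is that each pod can reach ZooKeeper (because ZooKeeper is exposed through a Service), but the pods cannot reach each other, so they cannot elect a primary node or communicate.

In the YAML: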

        - name: NIFI_WEB_HTTP_HOST
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: NIFI_REMOTE_INPUT_HOST
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP

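Development Issues

Garbled Chinese text in FlowFiles

Whenever garbled text appears in NiFi, first inspect the data flow: check whether the incoming and outgoing data of each processor is garbled to locate the processor that introduces it. The most common cause is not using UTF-8 encoding, for example: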

flowFile = session.write(flowFile, out -> out.write(outputs.getBytes()));

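Here the text is written to the FlowFile's content without being encoded as UTF-8: getBytes() without an argument uses the JVM's default charset. The fix is to encode it as UTF-8 when converting it to bytes: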

flowFile = session.write(flowFile, out -> out.write(outputs.getBytes(StandardCharsets.UTF_8)));
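The difference is easy to reproduce outside NiFi. getBytes() without an argument uses the JVM's default charset, which on Chinese-locale Windows systems was typically GBK until JDK 18 made UTF-8 the default; whenever the writing side and the reading side disagree, the text comes out garbled. The sketch below (EncodingDemo is a hypothetical name) uses ISO-8859-1 as the mismatched charset because every JVM is guaranteed to support it.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: what happens when the writer and the reader use different charsets.
class EncodingDemo {
    // Encodes with one charset and decodes with another
    static String roundTrip(String text, Charset writeAs, Charset readAs) {
        return new String(text.getBytes(writeAs), readAs);
    }

    public static void main(String[] args) {
        String text = "中文";
        // Each of these two characters takes 3 bytes in UTF-8
        System.out.println(text.getBytes(StandardCharsets.UTF_8).length);  // 6
        // Same charset on both sides: the text survives
        System.out.println(roundTrip(text, StandardCharsets.UTF_8, StandardCharsets.UTF_8).equals(text));  // true
        // Mismatched charsets: the text is garbled
        System.out.println(roundTrip(text, StandardCharsets.UTF_8, StandardCharsets.ISO_8859_1).equals(text));  // false
    }
}
```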

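XML is lost when HttpPost sends XML to a remote service

Do not declare a JSON content type

// Remove or comment out the line that sets the JSON request header
// httpPost.setHeader("Content-Type","application/json;charset=utf-8");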
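If the receiving service expects a content type at all, declaring the one that matches the body is usually safer than sending none. The sketch below swaps in the JDK HTTP client (java.net.http, Java 11+) for the Apache HttpPost used above and only builds the request; XmlPost is a hypothetical name, the endpoint URL is a placeholder, and the request would be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch using java.net.http instead of Apache HttpClient's HttpPost.
class XmlPost {
    // Builds a POST whose declared Content-Type matches the XML body
    static HttpRequest buildXmlPost(String url, String xml) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/xml; charset=UTF-8")
                // Encode the body as UTF-8, consistent with the declared charset
                .POST(HttpRequest.BodyPublishers.ofString(xml, StandardCharsets.UTF_8))
                .build();
    }
}
```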

Appendix

Overview of Added Packages

Processor

JsonAnalysis

JSON analysis processor: parses JSON and extracts values from it.

JsonToXml

AttributeToJson

ReadFile

Split

WriteFile

AttributeToXml

MapToXml

XmlAnalysis

XmlToJson

XmlToMap

ModAttribute

ModContent

SocketClient

RelayHttp

RelaySocket

AES

DES

Execute

Controller Service

NacosService

Using the processor code generator

    // Project name (fill in your project name)
    static String projectName = "Jeesing";
    // Final package name (name of the resulting NiFi processor NAR; describes the feature and must not clash with other NARs)
    static String ProcessorName = "Demo";
    // Project location (the directory where the project is created)
    static String projectPath = "F:\\Project\\develop\\work\\Nifi-Plus\\Demo";
    // Group ID, usually your company's domain name in reverse order (any value works, but avoid unusual characters)
    static String groupId = "cn.tineaine";
    // Processor source file name (also the class name; it is simply a basic processor file)
    static String processorFileName = "LearnProcessor";
    // NAR version (must be changed for every release)
    static String NarVersion = "1.0.0";

    //-----------------------------------------------
    // Normally, do not modify anything below
    //-----------------------------------------------

    // Package of the processor
    static String processorPath = groupId + ".processor." + ProcessorName;
    // Fully qualified name of the generated processor class
    static String processorFullPath = processorPath + "." + processorFileName;
    // NiFi processor module
    static String processorModule = "nifi-" + ProcessorName + "-processors";
    // NiFi NAR module
    static String narModule = "nifi-" + ProcessorName + "-nar";
    // Artifact ID, usually the project name (same as projectName; changing it is not recommended)
    static String artifactId = projectName;
    // Secondary version (Processor)
    static String aceVersion = NarVersion;

TODO

  • Complete the cluster deployment documentation

  • Add documentation on Reporting Task development

  • Add more development examples