读取文件

package zhunneng;

import java.io.InputStream;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.alibaba.fastjson.JSON;
import com.bawei.utils.StreamUtil;
import com.liujin.cms.domain.Plan;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:producer.xml")
public class TestImportData {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;
    
    @Test
    public void testImport() {
        InputStream is = this.getClass().getResourceAsStream("/data.txt");
        //使用工具类进行解析
        List<String> read = StreamUtil.readLine(is);
        for (String line : read) {
            //System.out.println(line);
            //每个|都要转义
            String[] split = line.split("\|\|");
            String name=split[0];
            double amount=Double.parseDouble(split[1]);
            String manager=split[3];
            String content=split[2];
            int dept_id=0;
            if (split[4].contains("")) {
                dept_id=1;
            }else if (split[4].contains("准能选煤厂")) {
                dept_id=2;
            }else if (split[4].contains("洗选车间")) {
                dept_id=3;
            }else if (split[4].contains("生产服务中心")) {
                dept_id=4;
            }else if (split[4].contains("矸电公司")) {
                dept_id=5;
            }else if(split[4].contains("大准铁路公司")){
                dept_id=6;
            }
            Plan plan = new Plan();
            plan.setName(name);
            plan.setManager(manager);
            plan.setContent(content);
            plan.setAmount(amount);
            String jsonString = JSON.toJSONString(plan);
            kafkaTemplate.send("zhunneng", jsonString);
        }
    }
    
}
package com.liujin;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.liujin.cms.domain.CompanyAnnualCheck;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring-beans.xml")
public class TestportData {

    @Autowired
    RedisTemplate redisTemplate;
    
    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;
    
    @SuppressWarnings({ "resource", "unchecked" })
    @Test
    public void testImport() throws IOException, ParseException {
        //1.准备源文件
        File file = new File("G:\QQ文件\专高\专二\理论\data.txt");
        //2.读取文本文件的内容
        BufferedReader br = new BufferedReader(new FileReader(file));
        String line=null;
        br.readLine();//排除第一行
        //2.1.循环读取
        ArrayList<CompanyAnnualCheck> list = new ArrayList<CompanyAnnualCheck>();
        while ((line=br.readLine())!=null) {
            //声明一个实体类
            CompanyAnnualCheck check = new CompanyAnnualCheck();
            String[] split = line.split("    ");
            int id=Integer.parseInt(split[0]);
            String keywords=split[1];
            String description=split[2];
            String companyName=split[3];
            String mainBusinessProducts=split[4];
            String address=split[5];
            String registeredCapital=split[6];
            Date annualCheckDate=new SimpleDateFormat("yyyy/MM/dd").parse(split[7]);
            String annualCheckStatus=split[8];
            check.setId(id);
            check.setKeywords(keywords);
            check.setDescription(description);
            check.setCompanyName(companyName);
            check.setMainBusinessProducts(mainBusinessProducts);
            check.setAddress(address);
            check.setRegisteredCapital(registeredCapital);
            check.setAnnualCheckDate(annualCheckDate);
            check.setAnnualCheckStatus(annualCheckStatus);
            System.out.println(check);
            list.add(check);
            
        }
        //3.读取出来的内容一行一行的解析
        //解析,封装完数据之后,就可以把这些对象保存到redis中,并且向卡夫卡发送消息
        redisTemplate.opsForList().leftPushAll("check_list", list.toArray());
        System.err.println("保存到redis成功!!!");
        kafkaTemplate.send("check", "check_list");
        System.err.println("已经把Redis的key发送到卡夫卡");
    }
    
}
原文地址:https://www.cnblogs.com/liujinqq7/p/12693934.html