文件实时对比,将数据组装入库(SQLITE)

这是自己之前实现的一个功能,主要给自己用。因为这部分逻辑要改了,但觉得直接删掉挺可惜的,所以先在博客中留下,后续再整理。(框架:PHP Laravel)

<?php
namespace App\Services;

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;

/**
 * Incrementally imports a line-oriented JSON log file into the `log` table.
 *
 * Every non-empty line of the log file is expected to be a JSON array whose
 * first element is the record uuid. Arrays with more than 6 elements are
 * treated as "web_log" entries (inserted as new rows); shorter arrays are
 * treated as "response" entries (they only carry a status for an existing
 * uuid). The last processed line is persisted in a marker file so the next
 * run only handles lines appended after it.
 */
class LogService
{
    /**
     * Rows per INSERT statement. The original author observed failures with
     * batches of 50+ rows in local testing, hence the conservative size.
     */
    const INSERT_CHUNK_SIZE = 30;

    /** Absolute path of the log file being imported. */
    private $log_path = '';

    /** Absolute path of the marker file holding the last processed log line. */
    private $last_read_path = '';

    public function __construct()
    {
        $this->log_path = base_path(config('log_record_path'));
        $this->last_read_path = base_path(config('log_last_record_path'));
    }

    /**
     * Entry point meant to be called on its own, e.g. right after the PHP
     * layer (or a background writer) appends to the log file, to push the new
     * lines into the database.
     */
    public function pushDatabase()
    {
        // Number of lines that have already been imported.
        $lines = $this->getLastLine();
        // Read everything after that line, relate the entries and store them.
        $this->readFileByLine($this->log_path, $lines);
    }

    /**
     * Imports all lines of $filename located after line number $lines.
     *
     * @param string $filename Path of the log file to read.
     * @param int    $lines    Number of leading lines to skip (already imported).
     *
     * @throws \Exception Re-thrown after rollback when a database write fails.
     */
    public function readFileByLine($filename, $lines)
    {
        if (!file_exists($filename)) {
            return;
        }

        $fp = fopen($filename, 'r');
        if ($fp === false) {
            Log::info('open log file failed: ' . $filename);
            return;
        }

        $offset = 1;
        $line_stream = array();
        $cur_last_str = '';
        while (($stream = fgets($fp)) !== false) {
            // Skip the lines that were imported by a previous run.
            if ($lines >= $offset) {
                $offset++;
                continue;
            }
            // Strip the trailing line break.
            $cur_stream = rtrim($stream);
            if ($cur_stream === '') {
                continue;
            }
            // Remember the raw line: getLastLine() compares the marker with
            // raw file lines, so a re-encoded copy might never match.
            $cur_last_str = $cur_stream;

            $decoded = json_decode($cur_stream, true);
            // Ignore lines that are not a JSON array with a usable uuid.
            if (is_array($decoded) && isset($decoded[0]) && is_scalar($decoded[0])) {
                $line_stream[] = $decoded;
            }
        }
        fclose($fp);

        if ($cur_last_str === '') {
            // Nothing new since the last run.
            return;
        }

        // Relate entries by uuid: one optional web_log plus one optional response.
        $relate_array = array();
        foreach ($line_stream as $entry) {
            $mark = count($entry) > 6 ? 'web_log' : 'response';
            $relate_array[$entry[0]][$mark] = $entry;
        }

        $save_rel_multiple = array();
        $status_updates = array();
        foreach ($relate_array as $uuid => $pair) {
            if (isset($pair['web_log'])) {
                // A web_log (with or without its response) becomes a new row.
                $save_rel_multiple[] = $this->buildLogRow($uuid, $pair);
            } elseif (isset($pair['response'][5])) {
                // A lone response only updates the status of an existing row.
                $status_updates[$uuid] = (int) $pair['response'][5];
            }
        }

        if (!empty($save_rel_multiple) || !empty($status_updates)) {
            DB::beginTransaction();
            try {
                foreach (array_chunk($save_rel_multiple, self::INSERT_CHUNK_SIZE) as $chunk) {
                    DB::table('log')->insert($chunk);
                }
                foreach ($status_updates as $uuid => $status) {
                    DB::table('log')->where('uuid', $uuid)->update(['status' => $status]);
                }
                DB::commit();
            } catch (\Exception $e) {
                DB::rollBack();
                Log::info('push database error: ' . $e->getMessage());
                throw $e;
            }
        }

        // Only advance the marker once the data has been stored successfully.
        $this->recordLastLine($cur_last_str);
    }

    /**
     * Persists the last processed log line into the marker file.
     *
     * @param string $cur_last_str Raw content of the last processed line.
     */
    public function recordLastLine($cur_last_str = '')
    {
        $fp = fopen($this->last_read_path, 'w');
        if ($fp === false) {
            Log::info('record last line failed: ' . $cur_last_str);
            return;
        }
        if (fwrite($fp, $cur_last_str) === false) {
            // The write failed; keep a trace of what should have been stored.
            Log::info('record last line failed: ' . $cur_last_str);
        }
        fclose($fp);
    }

    /**
     * Returns the 1-based number of the log line matching the stored marker.
     *
     * When the marker occurs several times the last occurrence wins.
     *
     * @return int 0 when the log file or marker is missing/empty or no line matches.
     */
    public function getLastLine()
    {
        if (!file_exists($this->log_path) || !file_exists($this->last_read_path)) {
            return 0;
        }

        $last_record = rtrim((string) file_get_contents($this->last_read_path));
        if ($last_record === '') {
            // An empty marker must not match blank lines of the log file.
            return 0;
        }

        $fp = fopen($this->log_path, 'r');
        if ($fp === false) {
            return 0;
        }

        $last_line = 0;
        $line_no = 0;
        // Stream the file instead of loading every line into memory.
        while (($line = fgets($fp)) !== false) {
            $line_no++;
            if (rtrim($line) === $last_record) {
                $last_line = $line_no;
            }
        }
        fclose($fp);

        return $last_line;
    }

    /**
     * Appends one line to the log file while holding the file lock.
     *
     * NOTE(review): lock_file()/unlock_file() are not defined in this file;
     * presumably project helpers - confirm their contract.
     *
     * @param string $logContent Line content, without the trailing line break.
     *
     * @return bool True when the line was written, false when the lock could
     *              not be acquired or the write failed.
     */
    public function writeLog($logContent)
    {
        if (!lock_file($this->log_path)) {
            return false;
        }

        try {
            $written = file_put_contents($this->log_path, $logContent . PHP_EOL, FILE_APPEND);
        } finally {
            // Always release the lock, even when the write raises.
            unlock_file($this->log_path);
        }

        return $written !== false;
    }

    /**
     * Maps one related web_log/response pair to a row of the `log` table.
     *
     * @param int|string $uuid Record uuid (element 0 of the entries).
     * @param array      $pair Holds 'web_log' and optionally 'response'.
     *
     * @return array Column => value map; every row has the same set of columns.
     */
    private function buildLogRow($uuid, array $pair)
    {
        $web_log = $pair['web_log'];

        // Column => index inside the web_log array. Index 10 (status) is
        // handled separately below because a response may override it.
        // NOTE(review): index 0 is also the uuid used as grouping key, so
        // log_desc receives the same value as uuid - confirm this is intended.
        $columns = array(
            'log_desc'       => 0,
            'log_dec_code'   => 1,
            'host_name'      => 2,
            'host_uid'       => 3,
            'operation_type' => 4,
            'operator_ip'    => 5,
            'operator_name'  => 6,
            'operator_uuid'  => 7,
            'progress'       => 8,
            'start_datetime' => 9,
            'target_type'    => 11,
            'target_name'    => 12,
            'target_uid'     => 13,
        );

        $row = array('uuid' => $uuid);
        foreach ($columns as $column => $index) {
            $row[$column] = isset($web_log[$index]) ? $web_log[$index] : null;
        }

        // Prefer the status reported by the response, fall back to the web_log's own.
        if (isset($pair['response'][5])) {
            $row['status'] = (int) $pair['response'][5];
        } else {
            $row['status'] = isset($web_log[10]) ? (int) $web_log[10] : 0;
        }

        return $row;
    }
}
原文地址:https://www.cnblogs.com/mikusnail/p/9039593.html