汇率多线程 草稿

1.html
<%@ Page Language="C#" AutoEventWireup="true" CodeBehind="WebForm2.aspx.cs" Inherits="WebApplication20.WebForm2" %>

<!DOCTYPE html>

<html xmlns="http://www.w3.org/1999/xhtml">
<head runat="server">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <title></title>
</head>
<body>
    <form id="form1" runat="server">
        
    <div>
        <input name="_method" value="getFxRate"  type="hidden"/>
    <input type="date" name="start" />--<input type="date" name="end" />
    </div>
        <input type="submit" />
    </form>
</body>
</html>
2.cs
using Contract.Domain;
using ETLAPP;
using Framework;
using HraWeb.Common;
using NHibernate;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;
using System.Xml;
using ThreadTemplate;
using Contract.IService;
namespace WebApplication20
{
    public enum SearchRange
    {
        th = 0,
        td = 1
    }
    class ThreadParameters
    {
        public string Url
        {
            get;
            set;
        }
        public string pairId
        {
            get;
            set;
        }
        public string Status
        {
            get;
            set;
        }
        public int PageCount
        {
            get;
            set;
        }
        public int pageIndex
        {
            get;
            set;
        }
        public string start { get; set; }

        public string end { get; set; }

        public string webCode { get; set; }
    }

    public partial class WebForm2 : BasePage
    {
        public static ArrayList contentList = new ArrayList();
        private static System.Data.DataTable table = null;
        private object InitThread(object para, SubThread th)
        {
            List<string> trList = new List<string>();
            var obj = (ThreadParameters)para;
            obj.pageIndex = 1;
            int pageCount = 0;
            //已经运行了一页的数据
            getData(para, null);
            pageCount = obj.PageCount;
            obj.pageIndex = pageCount;
            getData(obj, th);
            return "";

        }
        public static Dictionary<string, BasCurrencyPair> dic = new Dictionary<string, BasCurrencyPair>();
        private object getData(object para, SubThread th)
        {


            ABC: ThreadParameters obj = (ThreadParameters)para;
            var url = $"http://srh.bankofchina.com/search/whpj/search.jsp?erectDate={obj.start}&nothing={obj.start}&pjname=" + obj.webCode + "&page=" + obj.pageIndex; ;
            WebClient wc = new WebClient();
            List<string> trList = new List<string>();
            try
            {
                using (Stream stream = wc.OpenRead(url))
                {
                    using (StreamReader sr = new StreamReader(stream, Encoding.UTF8))
                    {

                        string content = sr.ReadToEnd();
                        string pagePatern = @"var m_nRecordCount = (.*);";
                        var pageMatch = Regex.Match(content, pagePatern);
                        int rows = int.Parse(pageMatch.Groups[1].Value);
                        int pagesize = 20;
                        int pages = (rows / pagesize) + (rows % pagesize == 0 ? 0 : 1);
                        if (obj.pageIndex == 1)
                        {
                            obj.PageCount = pages;
                        }
                        //提取div内容开始
                        string divPatern = @"(?<=<div (.*)?class=""BOC_main publish""[^>]*?>)([sS]*?)(?=</div>)";
                        MatchCollection divMatches = Regex.Matches(content, divPatern);
                        string divContent = string.Empty;
                        foreach (Match match2 in divMatches)
                        {
                            divContent = match2.Groups[0].Value;
                            break;
                        }
                        //提取div内容结束

                        //提取表格内容开始
                        string tablePatern = @"(?<=<table (.*)?[^>]*?>)([sS]*?)(?=</table>)";
                        MatchCollection tableMatches = Regex.Matches(divContent, tablePatern);
                        string tableContent = string.Empty;
                        foreach (Match match1 in tableMatches)
                        {
                            tableContent = match1.Groups[0].Value;
                            break;
                        }
                        string trPatern = @"(?<=<tr(.*)?[^>]*?>)([sS]*?)(?=</tr>)";
                        MatchCollection trMatchCollection = Regex.Matches(tableContent, trPatern);
                        Match match = trMatchCollection[trMatchCollection.Count - 2];
                        string tr1 = string.Empty;
                        tr1 = match.Groups[0].Value;
                        trList.Add(tr1);
                        //提取行结束

                    }

                    //获取表头列元素,或者内容行的单元格元素 trlist[0]是表头 SearchR,ange告诉程序要查表头 还是 内容行
                    System.Collections.ArrayList tdsList = new System.Collections.ArrayList();
                    ArrayList list = new ArrayList();
                    System.Threading.Monitor.Enter(table);
                    DataRow row = table.NewRow();
                    System.Threading.Monitor.Exit(table);

                    List<string> tr = null;
                    if (trList.Count >= 1)
                    {
                        tr = GET_TH_OR_TD_LIST(SearchRange.td, trList[trList.Count - 1]);
                    }



                    //row.FxRate = decimal.Parse(tr[6]);
                    if (tr.Count > 0)
                    {
                        var date = DateTime.Now.Date;
                        row["CURRENCY_PAIR"] = Int64.Parse(obj.pairId);
                        row["FX_RATE"] = decimal.Parse(tr[6]);
                        DateTime.TryParse(tr[7], out date);
                        row["CAPTURE_DATE"] = date.Date;
                    }


                    if ((obj.PageCount > 1 && obj.pageIndex > 1) || (obj.PageCount == 1))
                    {
                        if (th != null)
                        {
                            th.ReturnObj = row;

                        }
                    }
                    obj.Status = "完成";
                }
            }
            //应对无法连接到远程服务器,远程服务器没有响应或应答
            catch (System.Net.WebException ex)
            {

                goto ABC;
            }

            return "子完成";
        }


        private Contract.IService.IDaoService _dao;
        public Contract.IService.IDaoService Dao
        {
            get
            {
                if (_dao == null)
                {
                    lock (new object())
                    {
                        if (_dao == null)
                        {
                            _dao = (Contract.IService.IDaoService)ctx["DaoService"];
                        }
                    }
                }
                return _dao;
            }
        }
        private ISession _session;
        public ISession session
        {
            get
            {
                if (_session == null)
                {
                    lock (new object())
                    {
                        if (_session == null)
                            _session = Dao.GetSession();
                    }
                }
                return _session;
            }
        }

        private List<string> GET_TH_OR_TD_LIST(SearchRange range, string row)
        {
            string tmp = "";
            tmp = range.ToString();
            string tdPatern = $@"(?<=(<{tmp}[^>]*?>))(?<tdCell>[sS]*?)(?=</{tmp}>)";
            MatchCollection CurrenttdMatchCollection = Regex.Matches(row, tdPatern);
            string td = string.Empty;
            List<string> tdlList = new List<string>();
            List<string> contentList = new List<string>();
            foreach (Match match in CurrenttdMatchCollection)
            {

                td = match.Groups["tdCell"].Value;
                contentList.Add(td);

            }
            return contentList;

        }
        public object RateThreadCompeleted(object currentThread)
        {
            SubThread CallThread = currentThread as SubThread;

            SubThread t11 = new SubThread(Guid.NewGuid().ToString(), (a, b) =>
            {
                Monitor.Enter(table);
                if (table.Rows.Count < 100)
                    table.Rows.Add((DataRow)(CallThread.ReturnObj));
                if (table.Rows.Count == 100)
                {
                    Holworth.Utility.HraUtility.DataTableWriteToServer(table, "BAS_FX_RATE_BAK3", "ID", true);
                    table.Clear();
                }
                Monitor.Exit(table);
                return null;

            });
            t11.Start();


            return null;
        }
        protected void Page_Load(object sender, EventArgs e)
        {
            if (table == null)
            {
                table = new System.Data.DataTable();
                System.Threading.Monitor.Enter(table);
                //没想到添加列的顺序对BUlkCopy有影响
                table.Columns.Add(new System.Data.DataColumn() { ColumnName = "ID", DataType = typeof(string) });
                table.Columns.Add(new System.Data.DataColumn() { ColumnName = "CURRENCY_PAIR", DataType = typeof(Int64) });
                table.Columns.Add(new System.Data.DataColumn() { ColumnName = "FX_RATE", DataType = typeof(decimal) });
                table.Columns.Add(new System.Data.DataColumn() { ColumnName = "CAPTURE_DATE", DataType = typeof(DateTime) });
                System.Threading.Monitor.Exit(table);
            }
            string log = "";
            if (Request["_method"] == "getFxRate")
            {
                info = new Framework.QueryInfo() { CustomSQL = "select * from BASE_CURRENCY_WEB" };
                var Dao = (Contract.IService.IDaoService)ctx["DaoService"];
                dic = Dao.FindList<BasCurrencyPair>(new QueryInfo() { QueryObject = "BasCurrencyPair" }).ToDictionary(x => x.Id);
                var ds = Dao.ExcuteDataSet(info);
                var start = DateTime.Parse(Request["start"]);
                var end = DateTime.Parse(Request["end"]);
                if (string.IsNullOrEmpty(Request["start"]) || string.IsNullOrEmpty(Request["end"]))
                {
                    throw new Exception(":输个日期,我的哥!");
                }
                SubThread thread = new SubThread("1");
                //多线程的最大并发数
                int maxPoolThread = 200;
                int totalThreadNum = 0;
                //当前正在运行的线程
                var runingHt = new Dictionary<int, SubThread>();
                //处于等待队列的未运行的线程
                var unRunHt = new Dictionary<int, SubThread>();
                int k = 0;
                string url = "";

                //var code = row[""].ToString();

                for (DateTime t1 = start; t1 <= end; t1 = t1.AddDays(1))
                {
                    foreach (DataRow row in ds.Tables[0].Rows)
                    {
                        var pairId = row["CURRENCY_PAIR_ID"].ToString();
                        url = string.Format("http://srh.bankofchina.com/search/whpj/search.jsp?erectDate={0}&nothing={1}&pjname={2}", t1.ToString("yyyy-MM-dd"), t1.ToString("yyyy-MM-dd"), row["WEB_CODE"]);
                        var param = new ThreadParameters() { pairId = pairId, Url = url };
                        k++;
                        param.pageIndex = 1;
                        param.start = t1.ToString("yyyy-MM-dd");
                        param.webCode = row["WEB_CODE"].ToString();
                        SubThread th = new SubThread(k.ToString());

                        th.ThreadAction = (currentThread,inputParameter) =>
                        {
                            param.pageIndex = 1;
                            InitThread(param, th);
                            return "";
                        };
                        th.ThreadCompeleted += RateThreadCompeleted;
                        if (k <= maxPoolThread)
                        {
                            runingHt.Add(k, th);
                            th.Start();
                        }
                        else
                        {
                            unRunHt.Add(k, th);
                        }


                    }
                }
                while (true)
                {
                    Thread.Sleep(500);
                    //初始化完成队列,用于存取已经执行完的线程的id
                    var stepFinishList = new List<int>();

                    //将完成的线程放入完成队列
                    foreach (int tid in runingHt.Keys)
                    {
                        var t = runingHt[tid];
                        if (t.IsStopped)
                        {
                            t.Dispose();
                            if (t.ReturnObj == null)
                            {
                                log += "
" + "无数据" + "
";
                            }

                            //回调处理

                            stepFinishList.Add(tid);
                        }

                    }
                    System.Threading.Thread.Sleep(1000);
                    //1.遍历完成队列,从当前运行的线程队列中移除该线程
                    //2.对完成的线程执行回调,将数据持久化到数据库
                    //3.如果等待队列中还有数据,获取等待队列中的第一个,并执行该线程,将该线程从等待队列移除,加入到运行队列

                    foreach (int tid in stepFinishList)
                    {

                        runingHt.Remove(tid);
                        if (unRunHt.Count > 0)
                        {
                            SubThread unRunThread = unRunHt.First().Value;
                            var unRunTid = unRunHt.First().Key;
                            unRunThread.Start();
                            runingHt.Add(unRunTid, unRunThread);
                            unRunHt.Remove(unRunTid);
                        }
                    }
                    //所有线程都完成后,跳出循环
                    if (runingHt.Count == 0 && unRunHt.Count == 0)
                    {
                        break;
                    }

                }
                //Dao.SaveOrUpdateAll(contentList);
                Holworth.Utility.HraUtility.DataTableWriteToServer(table, "BAS_FX_RATE_BAK3", "ID", true);


            }

        }

    }


}
3.subthread.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadTemplate
{
    public class SubThread : IDisposable
    {
        //线程锁
        public readonly object _locker = new object();
        /// <summary>
        /// 当前线程类的一个字典集合,可用于传递数据,属性辅助
        /// </summary>
        public IDictionary<object, object> DataDictionary { get; set; }
        /// <summary>
        /// 可用作theadstart的传入参数
        /// </summary>
        public dynamic InputParameter { get; set; }
        /// <summary>
        /// 第一个object对象是subthread本身,第二个是输入参数,第三个是执行委托函数的返回值
        /// </summary>
        public Func<object, object, object> ThreadAction { get; set; }
        //线程完成后的回调
        /// <summary>
        /// 第一个参数是subthread本身,第二个参数是委托的返回值
        /// </summary>
        public Func<object, object> ThreadCompeleted { get; set; }
        private Thread thdSubThread = null;
        //互斥信号量 
        //互斥锁(Mutex)
        //互斥锁是一个互斥的同步对象,意味着同一时间有且仅有一个线程可以获取它。
        //互斥锁可适用于一个共享资源每次只能被一个线程访问的情况
        public Mutex mUnique = new Mutex();
        private bool blnIsStopped;
        private bool blnSuspended;
        private bool blnStarted;
        private string threadId;
        public string ThreadId { get { return threadId; } set { threadId = value; } }
        private object _returnObj;
        public bool IsStopped
        {
            get { return blnIsStopped; }
        }
        public bool IsSuspended
        {
            get { return blnSuspended; }
        }
        public object ReturnObj
        {
            get { return _returnObj; }
            set { _returnObj = value; }

        }

        //阻塞,等待信号再解除阻塞
        public bool Wait()
        {
            lock (_locker)
            {
                //调用该方法进行阻塞,当有脉冲信号通知线程可以停止了,才结束阻塞
                while (!blnIsStopped) { Monitor.Wait(_locker); }
            }
  GC.SuppressFinalize(this);
return true; } /// <summary> /// 子线程构造函数 /// </summary> /// <param name="key">必须给,为了自己控制得到线程</param> /// <param name="threadStart">第一个参数subthread,第二个输入参数,第三个返回值</param> /// <param name="threadParamter">可以在 构造函数给,也可以通过赋值属性给</param> public SubThread(string key, Func<object, object, object> threadStart = null, dynamic threadParamter = null) { threadId = key; blnIsStopped = true; blnSuspended = false; blnStarted = false; if (threadStart != null) { ThreadAction = threadStart; } } /// <summary> /// Start sub-thread 线程启动 /// </summary> public void Start() { if (!blnStarted) { thdSubThread = new Thread(ExecuteAction); blnIsStopped = false; blnStarted = true; thdSubThread.Start(); } } /// <summary> /// Thread entry function 线程执行方法,从网站中用正则表达式,抓取需要的数据 /// </summary> /// private void ExecuteAction() { //线程动作 ThreadAction?.Invoke(this, InputParameter); //线程完成回调,这里直接运行 ThreadCompeleted?.Invoke(this); //通知主线程,任务完成内部阻塞,通过脉冲通知解除锁定 this.Stop(); } /// <summary> /// Suspend sub-thread /// </summary> public void Suspend() { if (blnStarted && !blnSuspended) { blnSuspended = true; mUnique.WaitOne(); } } /// <summary> /// Resume sub-thread /// </summary> public void Resume() { if (blnStarted && blnSuspended) { blnSuspended = false; mUnique.ReleaseMutex(); } } /// <summary> /// Stop sub-thread /// </summary> public void Stop() { if (blnStarted) { if (blnSuspended) Resume(); blnStarted = false; blnIsStopped = true; //thdSubThread.Join(); } lock (_locker) { blnStarted = false; blnIsStopped = true; //运行完成,使用脉冲通知锁定代码可以放行了 Monitor.PulseAll(_locker); } } #region IDisposable Members /// <summary> /// Class resources dispose here /// </summary> public void Dispose() { // TODO: Add clsSubThread.Dispose implementation Stop();//Stop thread first GC.SuppressFinalize(this); } #endregion } } 4.bulkcopy public static void DataTableWriteToServer(DataTable dt, string targetTable, string IdField = "ID", bool NeedId = false) { var Dao = GetDao(); QueryInfo info = new QueryInfo(); DataTable table = null; Int64 increId = 0; #region 获取序列的当前值 lock (new object()) { if (NeedId) { QueryInfo searchInfo = new QueryInfo(); searchInfo.CustomSQL = "select HIBERNATE_SEQUENCE.NEXTVAL from dual"; table = Dao.ExecuteDataSet(searchInfo).Tables[0]; increId = Convert.ToInt64(table.Rows[0][0].ToString()); #endregion QueryInfo tmpInfo = new QueryInfo(); tmpInfo.CustomSQL = "drop sequence HIBERNATE_SEQUENCE"; Dao.ExecuteNonQuery(tmpInfo); tmpInfo = new QueryInfo(); tmpInfo.CustomSQL = "CREATE SEQUENCE HIBERNATE_SEQUENCE START WITH " + (increId + 1 + dt.Rows.Count).ToString(); Dao.ExecuteNonQuery(tmpInfo); for (int i = 0; i < dt.Rows.Count; i++) { var row = dt.Rows[i]; increId++; row[IdField] = increId.ToString(); } } string connOrcleString = System.Configuration.ConfigurationManager.ConnectionStrings["ConnectionString"].ConnectionString; OracleConnection conn = new OracleConnection(connOrcleString); OracleBulkCopy bulkCopy = new OracleBulkCopy(connOrcleString, OracleBulkCopyOptions.UseInternalTransaction); bulkCopy.BulkCopyTimeout = 260 * 1000; bulkCopy.DestinationTableName = targetTable; //服务器上目标表的名称 bulkCopy.BatchSize = 5000; //每一批次中的行数 try { conn.Open(); if (dt != null && dt.Rows.Count != 0) bulkCopy.WriteToServer(dt); //将提供的数据源中的所有行复制到目标表中 } catch (Exception ex) { throw ex; } finally { if (bulkCopy != null) bulkCopy.Close(); } } } 5.信号通知 private void ComputeData() { try { QueryInfo info1 = new QueryInfo(); info1.CustomSQL = "delete from SysComputerStatus where CreateUid=:uid"; info1.Parameters.Add("uid", CurrentUser.UserId); Dao.ExecuteUpdate(info1); SubThread sub = new SubThread("RskBookComputeResultManage3_RiskValueComputeData"); sub.InputParameter = new { terminationStep = int.Parse(Request["terminationStep"]), riskBookId = int.Parse(Request["riskBookId"]??"4513"), computeDate = DateTime.Parse(Request["computeDate"]) }; sub.ThreadAction = (a,b) => { RiskValueComputeService2.SaveCompute2(sub.InputParameter.computeDate, sub.InputParameter.riskBookId, sub.InputParameter.terminationStep); return null; }; sub.Start(); //阻塞知道任务完成 if(sub.Wait()) { //if (sub.IsStopped) { Response.Write(Newtonsoft.Json.JsonConvert.SerializeObject(new {msg="计算成功" })); //break; } } } catch (Exception ex) { var errorObj = new { ErrorCode = "-999", error = ex.Message }; Response.Write(Newtonsoft.Json.JsonConvert.SerializeObject(errorObj)); throw; } finally { Response.End(); } }
原文地址:https://www.cnblogs.com/kexb/p/6045178.html