Qaisar Nisar

Qaisar Nisar

  • NA
  • 22
  • 1.6k

how I stop all threads using manualResetEvent or any other?

Jan 16 2019 1:09 AM
I am using 10 threads each thread get 10 records whose Status is null and then update status to "Q". Now these records is requested to different APIs and on success response i update the status to "C" all is doing nicely. As a stopper to stop all threads i Used ManualResetEvent after when (j>30) ManualResetEven.Set() i check a condition
 
if (_stopper.WaitOne(0, false)) { break; }
 
to stop my while loop and update status to null for the remaining records of each thread whose status is "Q".On stopping i update status with "Q" to null on all records of 10 threads, the problem is that one of my thread's record is not updating status to null from "Q"?
 
Here is my Code:
  1. public void MainProcess()   
  2. {   
  3.    int totalThread = 10;   
  4.    Thread[] ThreadNum = new Thread[totalThread];   
  5.    DataTable dt = new DataTable();   
  6.    int j = 1;   
  7.    while (true)   
  8.    {   
  9.       if (_stopper.WaitOne(0, false)){   
  10.          lov.LogWriter("MainProcess stopper. |" + dt.Rows[0]["que_batch"].ToString());   
  11.          string var = sp_Queue_upd(dt.Rows[0]["que_batch"].ToString(), "Q");   
  12.          lov.LogWriter("MainProcess stopper sp_Queue_upd " + var + " stopper call, current number is " + que_BatchNumberIndex + " Batch No. :" + que_BatchNumber + ": mainthread End of While");//+ (string)argArray.GetValue(1));   
  13.          break;   
  14.       }   
  15.       if (j > 30) {  
  16.        _stopper.Set();  
  17.        lov.LogWriter("MainProcess stopper.Set");   
  18.       }   
  19.       if (threadCount.ThreadCount < totalThread && threadCount.ThreadCount >= 0) {   
  20.          if (ThreadNum[threadCount.ThreadCount] == null){  
  21.            dt = sp_get_entry_spool();   
  22.            if (dt.Rows.Count <= 0) {   
  23.                Thread.Sleep(1000);   
  24.                continue;   
  25.             }   
  26.              object[] objArr = { dt, threadCount.ThreadCount.ToString(), dt.Rows[0]["que_batch"].ToString() };  
  27.             ThreadNum[threadCount.ThreadCount] = new Thread(new ParameterizedThreadStart(Process));   
  28.             ThreadNum[threadCount.ThreadCount].Name = dt.Rows[0]["que_batch"].ToString();   
  29.             ThreadNum[threadCount.ThreadCount].IsBackground = true;   
  30.             ThreadNum[threadCount.ThreadCount].SetApartmentState(ApartmentState.MTA);   
  31.             ThreadNum[threadCount.ThreadCount].Start(objArr);   
  32.             lov.LogWriter("thread Start: Batch No. :" + dt.Rows[0]["que_batch"].ToString() + "|Actual Batch :" + j + "|ThreadCount" + threadCount.ThreadCount.ToString());  
  33.          }                                                                          
  34.          else if (!ThreadNum[threadCount.ThreadCount].IsAlive) {                        
  35.             dt = sp_get_entry_spool();   
  36.             if (dt.Rows.Count <= 0) {                                                 
  37.                Thread.Sleep(1000);   
  38.                continue;   
  39.             }   
  40.             object[] objArr = new object[3] { dt, threadCount.ThreadCount.ToString(), dt.Rows[0]["que_batch"].ToString() };   
  41.             ThreadNum[threadCount.ThreadCount] = new Thread(new ParameterizedThreadStart(Process));   
  42.             ThreadNum[threadCount.ThreadCount].Name = dt.Rows[0]["que_batch"].ToString();  
  43.             ThreadNum[threadCount.ThreadCount].IsBackground = true;   
  44.             ThreadNum[threadCount.ThreadCount].SetApartmentState(ApartmentState.MTA);   
  45.             ThreadNum[threadCount.ThreadCount].Start(objArr);                     
  46.             lov.LogWriter("thread Start: Batch No. :" + dt.Rows[0]["que_batch"].ToString() + "|Actual Batch :" + j + "|ThreadCount" + threadCount.ThreadCount.ToString());   
  47.             j++;                                                                       
  48.          }   
  49.          threadCount.ThreadCount++;   
  50.       }   
  51.       else if (threadCount.ThreadCount >= totalThread) {   
  52.           threadCount.ThreadCount = 0;   
  53.       }   
  54.    }   
  55.    if (statusQueue) {   
  56.       string var = sp_Queue_upd(que_BatchNumber, "Q");                                
  57.       lov.LogWriter("Back to MainProcess" + var + " stopper call, current number is " + que_BatchNumberIndex + " Batch No. :" + que_BatchNumber + ": mainthread End of While" );//+ (string)argArray.GetValue(1));  
  58.   }                                                                             
  59. }   
  60. public void Process(object ob)   
  61. {                                                                        
  62.    Array argArray = new object[3];                                                   
  63.    argArray = (Array)ob;                                                               
  64.    DataTable Data = (DataTable)argArray.GetValue(0);   
  65.    string var = "";                                                           
  66.    try {                                                                               
  67.       Core_Incor objCore = null;                                                         
  68.       int i = 0;                                                        
  69.       for (i = 0; i < Data.Rows.Count; i++)   
  70.       {   
  71.          #region Loop var = "";   
  72.          try {   
  73.             #region Loop   
  74.             if (_stopper.WaitOne(0, false)) {   
  75.                var = sp_Queue_upd(Data.Rows[i]["que_batch"].ToString(), "Q");   
  76.                lov.LogWriter("sp_Queue_upd " + var + " stopper call, current number is " + i + " Batch No. :" + Data.Rows[i]["que_batch"].ToString() + ":" + (string)argArray.GetValue(1));   
  77.                break;   
  78.             }   
  79.             if (Data.Rows[i]["mesg_type"].ToString() == "CQ") {   
  80.                // mark status to "C" }   
  81.             else if (Data.Rows[i]["mesg_type"].ToString() == "IN")   
  82.                { // mark status to "C" }   
  83.             else if (Data.Rows[i]["mesg_type"].ToString() == "CH")   
  84.                { // mark status to "C" }   
  85.             else if (Data.Rows[i]["mesg_type"].ToString() == "IF")   
  86.                { // mark status to "C" }   
  87.             else if (Data.Rows[i]["mesg_type"].ToString() == "IB")   
  88.                { // mark status to "C" }   
  89.             else if (Data.Rows[i]["mesg_type"].ToString() == "IB")   
  90.                { // mark status to "C" }   
  91.             else if (Data.Rows[i]["mesg_type"].ToString() == "BA")   
  92.                { // mark status to "C" }                                          
  93.             else if (Data.Rows[i]["mesg_type"].ToString() == "RT")   
  94.                { // mark status to "C" }   
  95.             else if (Data.Rows[i]["mesg_type"].ToString() == "CA" || Data.Rows[i]["mesg_type"].ToString() == "UB")   
  96.                { // mark status to "C" }   
  97.          }   
  98.          catch (Exception ex)   
  99.          {   
  100.             InsertReponce("1", ex.Message, Data.Rows[i]["rowid"].ToString());   
  101.          }   
  102.          #endregion   
  103.        }   
  104.     }   
  105.     catch (Exception ex){               
  106.        var = "1;MainException:FE Process :" + ex.Message;                        
  107.        lov.LogWriter(var);   
  108.     }   
  109. }