mysql的thread_running数量分析

技术mysql的thread_running数量分析本篇内容主要讲解“mysql的thread_running数量分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“mysql

本文主要讲解“mysql的thread_running Quantity Analysis”,感兴趣的朋友不妨看看。本文介绍的方法简单、快速、实用。让边肖带你学习“mysql线程运行量分析”!

线程池的原理:

服务器级别的解析已完成;

该层不是为每个连接创建单独的线程,而是创建多组驻留线程来接收和执行客户端连接发送的查询。

判断运行读取的层数,达到阈值直接报错或休眠。

状态变量记录当前并发执行stmt/command的次数,执行前加1,执行后减1;

突然飙升的原因:

客户端连接爆炸;

系统性能瓶颈,如CPU、IO或Memswap

sql中的异常;

会表现出吊死人的错觉。

执行,并引入两个阈值,低水位线和高水位线,以及变量threads _ running _ CTL _ mode(选择或全部);

之前,检查thread_running,

如果达到high_watermark阈值,将直接拒绝执行并返回错误:mysql。

服务器太忙

如果在低和高之间,休眠5 ms,然后重试,等待100ms,然后执行。

对已启动的交易和超级用户没有限制。

控制查询类型:SELECTS/ALL,默认为SELECTed,表示只影响select语句。

源代码见注释1。

优化为基于先进先出的秒等待/信号(实现8个先进先出);

高水位限流(此点保持不变);

低水位优化;其他解决方案:mariadb开发线程池,percona在上面实现优先级队列;

优点:思路和线程池一样,只是代码更简单(不到1000行);而且增加了对特定查询的过滤;

代码见注释2。

添加thread_active记录并发线程数,位于MySQL _ execute _ command中(SQL解析后),查询解析前判断水位高;

仅计算选择/DML,而保留提交/回滚。

>

采用FIFO,当thread_active >=
thread_running_low_watermark时进程进入FIFO等待,其他线程执行完sql后唤醒FIFO

内,同时引入threads_running_wait_timeout控制线程在FIFO最大等待时间,超时则直接报错返回。

引入8FIFO,降低了进出FIFO的锁竞争,线程采用RR分配到不同fifo,每个队列限制并发运行线程为threads_running_low_watermark/8

,开始执行query[解析后进行低水位判断,若通过则执行],执行当前sql完毕后,thread可能发起新query,则重复[]过程。

:进入FIFO排队最长时间,等待超时后sql被拒,默认100,单位为毫秒ms

当前并发SELECT/INSERT/UPDATE/DELETE执行的线程数目;

:当前进入到FIFO中等待的线程数目;

未打补丁版本,设置innodb_thread_concurrency=0

未打补丁版本,innodb_thread_concurrency=32

低水位限流补丁版本(活跃线程数不超过64

1

http://www.gpfeng.com/wp-content/uploads/2013/09/threads_running_control.txt

+static my_bool thread_running_control(THD *thd, ulong tr)
+{
+  int slept_cnt= 0;
+  ulong tr_low, tr_high;
+  DBUG_ENTER("thread_running_control");
+  
+  /* 
+    Super user/slave thread will not be affected at any time,
+    transactions that have already started will continue.
+  */
+  if ( thd->security_ctx->master_access & SUPER_ACL|| --对于super权限的用户和已经开启的事务不做限制
+      thd->in_active_multi_stmt_transaction() ||
+      thd->slave_thread)  
+    DBUG_RETURN(FALSE);
+
+  /* 
+    To promise that tr_low will never be greater than tr_high, 
+    as values may be changed between these two statements.
+    eg. 
+        (low, high) = (200, 500)
+        1. read low = 200
+        2. other sessions: set low = 20; set high = 80
+        3. read high = 80
+    Don't take a lock here to avoid lock contention.
+  */
+  do 
+  {
+    tr_low= thread_running_low_watermark;
+    tr_high= thread_running_high_watermark;
+
+  } while (tr_low > tr_high);
+
+check_buzy:

+  /* tr_high is promised to be non-zero.*/ 
+  if ((tr_low == 0 && tr < tr_high) || (tr_low != 0 && tr < tr_low))
+    DBUG_RETURN(FALSE);
+  
+  if (tr >= tr_high)
+  { 
+    int can_reject= 1;
+
+    /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */
+    if (thread_running_ctl_mode == 0)
+    {
+      int query_is_select= 0;
+      if (thd->query_length() >= 8)
+      {
+        char *p= thd->query();  --读取query text的前6个字符,以判断是否为select
+        if (my_toupper(system_charset_info, p[0]) == 'S' &&
+            my_toupper(system_charset_info, p[1]) == 'E' &&
+            my_toupper(system_charset_info, p[2]) == 'L' &&
+            my_toupper(system_charset_info, p[3]) == 'E' &&
+            my_toupper(system_charset_info, p[4]) == 'C' &&
+            my_toupper(system_charset_info, p[5]) == 'T')
+
+          query_is_select= 1;
+      }
+
+      if (!query_is_select)
+        can_reject= 0;
+    }
+
+    if (can_reject)
+    {
+      inc_thread_rejected();
+      DBUG_RETURN(TRUE);
+    }
+    else
+      DBUG_RETURN(FALSE);
+  }
+    
+  if (tr_low != 0 && tr >= tr_low)
+  {
+    /* 
+      If total slept time exceed 100ms and thread running does not
+      reach high watermark, let it in.
+    */
+    if (slept_cnt >= 20)
+      DBUG_RETURN(FALSE);
+    
+    dec_thread_running()
+    
+    /* wait for 5ms. */
+    my_sleep(5000UL); 
+
+    slept_cnt++;
+    tr= inc_thread_running() - 1;
+    
+    goto check_buzy;
+  }
+
+  DBUG_RETURN(FALSE);
+}
+
+/**
   Perform one connection-level (COM_XXXX) command.
   @param command         type of command to perform
@@ -1016,7 +1126,8 @@
   thd->set_query_id(get_query_id());
   if (!(server_command_flags[command] & CF_SKIP_QUERY_ID))
     next_query_id();
-  inc_thread_running();
+  /* remember old value of thread_running for *thread_running_control*. */
+  int32 tr= inc_thread_running() - 1;
   if (!(server_command_flags[command] & CF_SKIP_QUESTIONS))
     statistic_increment(thd->status_var.questions, &LOCK_status);

@@ -1129,6 +1240,13 @@
   {
     if (alloc_query(thd, packet, packet_length))
       break;                                 // fatal error is set
+
+    if (thread_running_control(thd, (ulong)tr))
+    {
+      my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0));
+      break;
+    }
+
     MYSQL_QUERY_START(thd->query(), thd->thread_id, (char *) (thd->db ? thd->db : ""),  &thd->security_ctx->priv_user[0])

注2 
http://www.gpfeng.com/wp-content/uploads/2014/01/tr-control.diff_.txt  
+/**
   Perform one connection-level (COM_XXXX) command.
 
   @param command         type of command to perform
@@ -1177,7 +1401,7 @@
     command= COM_SHUTDOWN;
   }
   thd->set_query_id(next_query_id());
-  inc_thread_running();
+  int32 tr= inc_thread_running();
 
   if (!(server_command_flags[command] & CF_SKIP_QUESTIONS))
     statistic_increment(thd->status_var.questions, &LOCK_status);
@@ -1209,6 +1433,15 @@
     goto done;
   }
 
+  if (command == COM_QUERY && alloc_query(thd, packet, packet_length))
+    goto endof_case;                 // fatal error is set
+
+  if (thread_running_control_high(thd, tr))
+  {
+    my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0));
+    goto endof_case;
+  }
+
   switch (command) {
   case COM_INIT_DB:
   {
@@ -1311,8 +1544,6 @@
   }
   case COM_QUERY:
   {
-    if (alloc_query(thd, packet, packet_length))
-      break;                                 // fatal error is set
     MYSQL_QUERY_START(thd->query(), thd->thread_id,
                       (char *) (thd->db ? thd->db : ""),
                       &thd->security_ctx->priv_user[0],
@@ -1751,6 +1982,7 @@
     my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0));
     break;
   }
+endof_case:
 
 done:
   DBUG_ASSERT(thd->derived_tables == NULL &&
@@ -2502,12 +2734,37 @@
   Opt_trace_array trace_command_steps(&thd->opt_trace, "steps");
 
   DBUG_ASSERT(thd->transaction.stmt.cannot_safely_rollback() == FALSE);
+  bool count_active= false;
 
   if (need_traffic_control(thd, lex->sql_command))
   {
     thd->killed = THD::KILL_QUERY;
     goto error;
   }
+
+  switch (lex->sql_command) {
+
+  case SQLCOM_SELECT:
+  case SQLCOM_UPDATE:
+  case SQLCOM_UPDATE_MULTI:
+  case SQLCOM_DELETE:
+  case SQLCOM_DELETE_MULTI:
+  case SQLCOM_INSERT:
+  case SQLCOM_INSERT_SELECT:
+  case SQLCOM_REPLACE:
+  case SQLCOM_REPLACE_SELECT:
+    count_active= true;
+    break;
+  default:
+    break;
+  }
+
+  if (count_active && thread_running_control_low_enter(thd))
+  {
+    my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, myf(0));
+    goto error;
+  }
+
   status_var_increment(thd->status_var.com_stat[lex->sql_command]);
 
   switch (gtid_pre_statement_checks(thd))
@@ -4990,6 +5247,9 @@
 
 finish:
 
+  if (count_active)
+    thread_running_control_low_exit(thd);
+
   DBUG_ASSERT(!thd->in_active_multi_stmt_transaction() ||
                thd->in_multi_stmt_transaction_mode());
 
 

+static my_bool thread_running_control_high(THD *thd, int32 tr)
+{
+  int32 tr_high;
+  DBUG_ENTER("thread_running_control_high");
+
+  tr_high= (int32)thread_running_high_watermark;
+
+  /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */
+  if ((!tr_high || tr <= tr_high) ||
+      thd->transaction.is_active() ||
+      thd->get_command() != COM_QUERY ||
+      thd->security_ctx->master_access & SUPER_ACL ||
+      thd->slave_thread)
+    DBUG_RETURN(FALSE);
+
+  const char *query= thd->query();
+  uint32 len= thd->query_length();
+
+  if ((!has_prefix(query, len, "SELECT", 6) && thread_running_ctl_mode == 0) || --不再是逐个字符判断
+      has_prefix(query, len, "COMMIT", 6) ||
+      has_prefix(query, len, "ROLLBACK", 8))
+    DBUG_RETURN(FALSE);
+
+  /* confirm again*/
+  if (tr > tr_high && get_thread_running() > tr_high)
+  {
+    __sync_add_and_fetch(&thread_rejected, 1);
+    DBUG_RETURN(TRUE);
+  }
+
+  DBUG_RETURN(FALSE);
+}
+
 

+static my_bool thread_running_control_low_enter(THD *thd)
+{
+  int res= 0;
+  int32 tr_low;
+  my_bool ret= FALSE;
+  my_bool slept= FALSE;
+  struct timespec timeout;
+  Thread_conc_queue *queue;
+  DBUG_ENTER("thread_running_control_low_enter");
+
+  /* update global status */
+  __sync_add_and_fetch(&thread_active, 1);
+
+  tr_low= (int32)queue_tr_low_watermark;
+  queue= thread_conc_queues + thd->query_id % N_THREAD_CONC_QUEUE;
+
+  queue->lock();--问1:在进行低水位判断前,先锁定FIFO,避免低水位验证失败时无法获取FIFO锁进而不能放入FIFO;
+
+retry:
+
+  if ((!tr_low || queue->thread_active < tr_low) ||
+      (thd->lex->sql_command != SQLCOM_SELECT && thread_running_ctl_mode == 0) ||
+      (!slept && (thd->transaction.is_active() ||
+        thd->security_ctx->master_access & SUPER_ACL || thd->slave_thread)))
+  {
+    queue->thread_active++; --判断是否满足进入FIFO条件,如不满足则立即更新thread_active++,解锁queue并退出;
+    queue->unlock();
+    DBUG_RETURN(ret);
+  }
+
+  if (!slept)
+  {
+    queue->unlock();
+
+    /* sleep for 500 us */
+    my_sleep(500);
+    slept= TRUE;
+    queue->lock();
+
+    goto retry;
+  }
+
+  /* get a free wait-slot */
+  Thread_wait_slot *slot= queue->pop_free();
+
+  /* can't find a free wait slot, must let the query enter */
+  if (!slot)-- 当FIFO都满了,即无法把当前线程放入,则必须放行让该sql正常执行
+  {
+    queue->thread_active++;
+    queue->unlock();
+    DBUG_RETURN(ret);
+  }
+
+  slot->signaled= false;
+  slot->wait_ended= false;
+
+  /* put slot into waiting queue. */
+  queue->push_back_wait(slot);
+  queue->thread_wait++;
+
+  queue->unlock();
+
+  /* update global status */
+  thd_proc_info(thd, "waiting in server fifo");
+  __sync_sub_and_fetch(&thread_active, 1);
+  __sync_add_and_fetch(&thread_wait, 1);
+
+  /* cond-wait for at most thread_running_wait_timeout(ms). */
+  set_timespec_nsec(timeout, thread_running_wait_timeout_ns);
+
+  mysql_mutex_lock(&slot->mutex);
+  while (!slot->signaled)
+  {
+    res= mysql_cond_timedwait(&slot->cond, &slot->mutex, &timeout);
+    /* no need to signal if cond-wait timedout */
+    slot->signaled= true;
+  }
+  mysql_mutex_unlock(&slot->mutex);
+
+  queue->lock();
+  queue->thread_wait--;
+  queue->thread_active++;
+
+  /* remove slot from waiting queue. */
+  queue->remove_wait(slot);
+  /* put slot into the free queue for reuse. */
+  queue->push_back_free(slot);
+
+  queue->unlock();
+
+  /* update global status */
+  __sync_sub_and_fetch(&thread_wait, 1);
+  __sync_add_and_fetch(&thread_active, 1);
+  thd_proc_info(thd, 0);
+
+  if (res == ETIMEDOUT || res == ETIME)
+  {
+    ret= TRUE; // indicate that query is rejected.
+    __sync_add_and_fetch(&thread_rejected, 1);
+  }
+
+  DBUG_RETURN(ret);
+}

到此,相信大家对“mysql的thread_running数量分析”有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/104198.html

(0)

相关推荐

  • CF1394C Boboniu and String

    技术CF1394C Boboniu and String CF1394C Boboniu and String题解
    好妙的一道题。
    将每个串都抽象成二维平面上的一个点 \((x_i,y_i)\),\(x

    礼包 2021年10月27日
  • 如何配置Git

    技术如何配置Git这篇文章主要介绍了如何配置Git,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。配置Git1.配置git 用户名和邮箱git config

    攻略 2021年11月21日
  • 信号量学习

    技术信号量学习 信号量学习(一)概念信号量是一个整数,这个整数允许多进程或多线程同步他们的操作。这个整数必须大于等于0.
    linux库函数中,类似有名映射、匿名映射,有名管道、无名管道,信号量也有有名信

    礼包 2021年11月26日
  • 使用SqlBulkCopy时为注意Sqlserver表中使用缺省值的列的示例分析

    技术使用SqlBulkCopy时为注意Sqlserver表中使用缺省值的列的示例分析本篇文章给大家分享的是有关使用SqlBulkCopy时为注意Sqlserver表中使用缺省值的列的示例分析,小编觉得挺实用的,因此分享给

    攻略 2021年12月4日
  • 忽视数据中心物理基础设施的现代化将产生的问题有哪些

    技术忽视数据中心物理基础设施的现代化将产生的问题有哪些这篇文章主要讲解了“忽视数据中心物理基础设施的现代化将产生的问题有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“

    攻略 2021年10月22日
  • uvm常见断言方法(uvm里面start的参数什么意义)

    技术如何浅析UVM概念中的topdown phase本篇文章给大家分享的是有关如何浅析UVM概念中的topdown phase,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小

    攻略 2021年12月18日