当前位置: 首页 > news >正文

建设简易电子商务网站流程vip解析网站怎么做

建设简易电子商务网站流程,vip解析网站怎么做,自己怎么制作网页游戏,做网站图片为什么不清晰说明 在上篇《TiProxy 尝鲜》 中做了一些实验#xff0c;比如加减tidb节点后tiproxy可以做到自动负载均衡#xff0c;如果遇到会话有未提交的事务则等待事务结束才迁移。 本次主要研究这样的功能在tiproxy中是如何实现的#xff0c;本次分享内容主要为以下几部分#xff…说明 在上篇《TiProxy 尝鲜》 中做了一些实验比如加减tidb节点后tiproxy可以做到自动负载均衡如果遇到会话有未提交的事务则等待事务结束才迁移。 本次主要研究这样的功能在tiproxy中是如何实现的本次分享内容主要为以下几部分 tiproxy是怎么发现tidb tiproxy是在tidb节点间自动负载均衡的逻辑 在自动负载均衡时tiproxy是怎么做到优雅的session迁移、session上下文恢复 tiproxy在自动负载均衡期间遇到处于未提交事务的session是怎么等待结束的 Tiproxy 介绍 tiproxy 在 2022年12月2日被operator支持 相关的设计文档可以从官方 README  和 goole doc  中查看 这个有个重要特性需要说明下 tiproxy组件不会保存账号的密码因为这是不安全的行为所以当进行会话迁移的时候使用的是 session token 认证方式(下文会提到这种方式的实现原理)。 声明 目前tiproxy还处于实验阶段、功能还在持续开发中本文讲述的内容跟日后GA版本可能存在差异届时请各位看官留意。 另外本人能力有限在阅读源码中难免有理解不到位的地方如有发现欢迎在评论区指正感谢。 开始发车 原理分析 1、tiproxy是怎么发现tidb 获取tidb拓扑最核心、简化后的代码如下其实就是使用etcdCli.Get获取信息 // 从 etcd 获取 tidb 拓扑 路径 /topology/tidb/ip:port/info /topology/tidb/ip:port/ttl func (is *InfoSyncer) GetTiDBTopology(ctx context.Context) (map[string]*TiDBInfo, error) {res, err : is.etcdCli.Get(ctx, tidbinfo.TopologyInformationPath, clientv3.WithPrefix())infos : make(map[string]*TiDBInfo, len(res.Kvs)/2)for _, kv : range res.Kvs {var ttl, addr stringvar topology *tidbinfo.TopologyInfokey : hack.String(kv.Key)switch {case strings.HasSuffix(key, ttlSuffix):addr key[len(tidbinfo.TopologyInformationPath)1 : len(key)-len(ttlSuffix)-1]ttl hack.String(kv.Value)case strings.HasSuffix(key, infoSuffix):addr key[len(tidbinfo.TopologyInformationPath)1 : len(key)-len(infoSuffix)-1]json.Unmarshal(kv.Value, topology)default:continue}info : infos[addr]if len(ttl) 0 {info.TTL hack.String(kv.Value)} else {info.TopologyInfo topology}}return infos, nil } 这个函数是怎么被tiproxy用起来的呢 其实在每个proxy启动时后都会开启一个BackendObserver协程这个协程会做三件事 func (bo *BackendObserver) observe(ctx context.Context) {for ctx.Err() nil {// 获取backendInfo, err : bo.fetcher.GetBackendList(ctx)// 检查bhMap : bo.checkHealth(ctx, backendInfo)// 通知bo.notifyIfChanged(bhMap)select {case -time.After(bo.healthCheckConfig.Interval): // 间隔3秒case -bo.refreshChan:case -ctx.Done():return}} } 第一步获取 从etcd获取tidb拓扑代码见上 第二步检查 判断获取到tidb节点是否可以连通、访问给每个节点设置StatusHealthy或者StatusCannotConnect状态 func (bo *BackendObserver) checkHealth(ctx context.Context, backends map[string]*BackendInfo) map[string]*backendHealth {curBackendHealth : make(map[string]*backendHealth, len(backends))for addr, info : range backends {bh : backendHealth{status: StatusHealthy,}curBackendHealth[addr] bh// http 服务检查if info ! nil len(info.IP) 0 {schema : httphttpCli : *bo.httpClihttpCli.Timeout bo.healthCheckConfig.DialTimeouturl : fmt.Sprintf(%s://%s:%d%s, schema, info.IP, info.StatusPort, statusPathSuffix)resp, err : httpCli.Get(url)if err ! nil {bh.status StatusCannotConnectbh.pingErr errors.Wrapf(err, connect status port failed)continue}}// tcp 服务检查conn, err : net.DialTimeout(tcp, addr, bo.healthCheckConfig.DialTimeout)if err ! nil {bh.status StatusCannotConnectbh.pingErr errors.Wrapf(err, connect sql port failed)} }return curBackendHealth } 第三步通知 将检查后的 backends 列表跟内存中缓存的 backends 进行比较将变动的 updatedBackends 进行通知 // notifyIfChanged 根据最新的 tidb 拓扑 bhMap 与之前的 tidb 拓扑 bo.curBackendInfo 进行比较 // - 在 bo.curBackendInfo 中但是不在 bhMap 中说明 tidb 节点失联需要记录下 // - 在 bo.curBackendInfo 中也在 bhMap 中但是最新的状态不是 StatusHealthy也需要记录下 // - 在 bhMap 中但是不在 bo.curBackendInfo 中说明是新增 tidb 节点需要记录下 func (bo *BackendObserver) notifyIfChanged(bhMap map[string]*backendHealth) {updatedBackends : make(map[string]*backendHealth)for addr, lastHealth : range bo.curBackendInfo {if lastHealth.status StatusHealthy {if newHealth, ok : bhMap[addr]; !ok {updatedBackends[addr] backendHealth{status: StatusCannotConnect,pingErr: errors.New(removed from backend list),}updateBackendStatusMetrics(addr, lastHealth.status, StatusCannotConnect)} else if newHealth.status ! StatusHealthy {updatedBackends[addr] newHealthupdateBackendStatusMetrics(addr, lastHealth.status, newHealth.status)}}}for addr, newHealth : range bhMap {if newHealth.status StatusHealthy {lastHealth, ok : bo.curBackendInfo[addr]if !ok {lastHealth backendHealth{status: StatusCannotConnect,}}if lastHealth.status ! StatusHealthy {updatedBackends[addr] newHealthupdateBackendStatusMetrics(addr, lastHealth.status, newHealth.status)} else if lastHealth.serverVersion ! newHealth.serverVersion {// Not possible here: the backend finishes upgrading between two health checks.updatedBackends[addr] newHealth}}}// Notify it even when the updatedBackends is empty, in order to clear the last error.bo.eventReceiver.OnBackendChanged(updatedBackends, nil)bo.curBackendInfo bhMap } 通过上面的步骤就获取到了变动的backends将这些变动从 BackendObserver 模块同步给 ScoreBasedRouter 模块。 2、tiproxy是在tidb节点间自动负载均衡的逻辑 此处自动负载的语义是将哪个 backend 的哪个 connect 迁移到哪个 backend 上。这就要解决 backend 挑选和 connect 挑选问题。 这个问题的解决办法是在 ScoreBasedRouter 模块完成。这个模块有3个 func 和上述解释相关 type ScoreBasedRouter struct {sync.Mutex// A list of *backendWrapper. The backends are in descending order of scores.backends *glist.List[*backendWrapper]// ... }// 被 BackendObserver 调用传来的 backends 会合并到 ScoreBasedRouter::backends 中 func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]*backendHealth, err error) {}// 通过比较 backend 分数方式调整 ScoreBasedRouter::backends 中的位置 func (router *ScoreBasedRouter) adjustBackendList(be *glist.Element[*backendWrapper]) {}// 协程方式运行做负载均衡处理 func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) {} OnBackendChanged 是暴露给 BackendObserver 模块的一个接口 用来同步从 etcd 发现的 tidb 信息这个逻辑不复杂详细可自行阅读源码。这个方法是问题一种提到的“通知”接收处。 adjustBackendList 本质就是调整 item 在双向链表中的位置这个也不复杂。 下面重点说下 rebalanceLoop 的逻辑这里涉及到将哪个 backend 的哪个 connect 迁移到哪个 backend 上的问题。 // rebalanceLoop 计算间隔是 10 ms每次最多处理 10 个连接(防止后端出现抖动) // - backends 的变化是通过 OnBackendChanged 修改的连接平衡是 rebalanceLoop 函数做的两者为了保证并发使用了 sync.Mutex func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) {for {router.rebalance(rebalanceConnsPerLoop)select {case -ctx.Done():returncase -time.After(rebalanceInterval):}} }// rebalance func (router *ScoreBasedRouter) rebalance(maxNum int) {curTime : time.Now()router.Lock()defer router.Unlock()for i : 0; i maxNum; i {var busiestEle *glist.Element[*backendWrapper]for be : router.backends.Front(); be ! nil; be be.Next() {backend : be.Valueif backend.connList.Len() 0 {busiestEle bebreak}}if busiestEle nil {break}busiestBackend : busiestEle.ValueidlestEle : router.backends.Back()idlestBackend : idlestEle.Valueif float64(busiestBackend.score())/float64(idlestBackend.score()1) rebalanceMaxScoreRatio {break}var ce *glist.Element[*connWrapper]for ele : busiestBackend.connList.Front(); ele ! nil; ele ele.Next() {conn : ele.Valueswitch conn.phase {case phaseRedirectNotify:continuecase phaseRedirectFail:if conn.lastRedirect.Add(redirectFailMinInterval).After(curTime) {continue}}ce elebreak}if ce nil {break}conn : ce.ValuebusiestBackend.connScore--router.adjustBackendList(busiestEle)idlestBackend.connScorerouter.adjustBackendList(idlestEle)conn.phase phaseRedirectNotifyconn.lastRedirect curTimeconn.Redirect(idlestBackend.addr)} } rebalance 的逻辑 从前往后访问 backends list找到 busiestBackend 在 backends list 最后找到 idlestBackend 比较两者 score 如果差距在 20% 以内就不用处理了 否则在 busiestBackend 中取出一个 conn 给 idlestBackend 取出的逻辑很简单就是从前到后遍历当前 backend 的 connList 因为session迁移要保证事务完成所以迁移不是立刻执行的这就得加个 phase 来跟进 处于 phaseRedirectNotify 阶段的不要再取出 处于 phaseRedirectFail 但还没到超时时间的也不要取出 其他状态的 conn 可以被取出 因为有 conn 变动所以要调整下 busiestBackend 和 idlestBackend 在 backends list 中的位置 最后通过 channel 通知 BackendConnManager 做去session迁移此时 conn 状态是 phaseRedirectNotify 给每个backend的打分逻辑如下分数越大说明负载越大 func (b *backendWrapper) score() int {return b.status.ToScore() b.connScore }// var statusScores map[BackendStatus]int{ // StatusHealthy: 0, // StatusCannotConnect: 10000000, // StatusMemoryHigh: 5000, // StatusRunSlow: 5000, // StatusSchemaOutdated: 10000000, // }// connScore connList.Len() incoming connections - outgoing connections. 3、在自动负载均衡时tiproxy是怎么做到优雅的session迁移、session上下文恢复 这个问题可以继续细分 迁移消息接收 ScoreBasedRouter 模块计算出哪个 conn 从哪个 backend 迁移到哪个 backend 后怎么通知给对应的 conn 迁移任务执行 conn 接收到消息后要进行session迁移那么如何解决迁移期间 client 可能存在访问的问题 因为tiproxy没有保存密码那么基于session token的验证方式是怎么实现的 新的tidb节点登录成功后session上下问题信息是怎么恢复的 以上的问题都可以在 BackendConnManager 模块找到答案 type BackendConnManager struct {// processLock makes redirecting and command processing exclusive.processLock sync.MutexclientIO *pnet.PacketIObackendIO atomic.Pointer[pnet.PacketIO]authenticator *Authenticator } func (mgr *BackendConnManager) Redirect(newAddr string) bool {} func (mgr *BackendConnManager) processSignals(ctx context.Context) {} func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {} func (mgr *BackendConnManager) querySessionStates(backendIO *pnet.PacketIO) (sessionStates, sessionToken string, err error) {} func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (err error) {} 迁移消息接收 在前文的 rebalance 方法最后有行这样的逻辑 conn.Redirect(idlestBackend.addr) 这就是 ScoreBasedRouter 的通知给对应 conn 的地方。 这里调用的是 BackendConnManager::Redirect 具体执行逻辑 - 将目标 backend 存储到 redirectInfo - 给 signalReceived channel 发 signalTypeRedirect 消息 func (mgr *BackendConnManager) Redirect(newAddr string) bool {// NOTE: BackendConnManager may be closing concurrently because of no lock.switch mgr.closeStatus.Load() {case statusNotifyClose, statusClosing, statusClosed:return false}mgr.redirectInfo.Store(signalRedirect{newAddr: newAddr})// Generally, it wont wait because the caller wont send another signal before the previous one finishes.mgr.signalReceived - signalTypeRedirectreturn true } 该消息被 BackendConnManager::processSignals 协程接收 func (mgr *BackendConnManager) processSignals(ctx context.Context) {for {select {case s : -mgr.signalReceived:// Redirect the session immediately just in case the session is finishedTxn.mgr.processLock.Lock()switch s {case signalTypeGracefulClose:mgr.tryGracefulClose(ctx)case signalTypeRedirect: // mgr.tryRedirect(ctx) }mgr.processLock.Unlock()case rs : -mgr.redirectResCh:mgr.notifyRedirectResult(ctx, rs)case -mgr.checkBackendTicker.C:mgr.checkBackendActive()case -ctx.Done():return}} } 这里补充下 processSignals 是怎么来的。正常情况下client每发起一个连接proxy就会起两个协程 连接、转发 tcp 消息协程 连接SQLServer::Run 方法启动也就是每连接每协程的意思。 转发ClientConnection 模块调用 BackendConnManager::ExecuteCmd 实现消息转发 监听和执行 redirect 任务协程 BackendConnManager 模块启动 processSignals 协程处理 所以上文监听 signalTypeRedirect 消息的 processSignals 协程在连接建立时就启动了当收到消息后执行 tryRedirect 方法尝试执行迁移。 迁移任务执行 tryRedirect 处理逻辑比较复杂我们选取核心流程进行简述 func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {// 获取目标 backendsignal : mgr.redirectInfo.Load()// 处于事务中先不做迁移if !mgr.cmdProcessor.finishedTxn() {return}// 组装执行结果rs : redirectResult{from: mgr.ServerAddr(),to: signal.newAddr,}defer func() {// 不论执行成功与否都清空 redirectInfo 并将 rs 结果发到 redirectResCh redirectResCh 的处理逻辑还是在 processSignals 中处理mgr.redirectInfo.Store(nil)mgr.redirectResCh - rs}()// 从源 backend 获取 sessionStates, sessionTokenbackendIO : mgr.backendIO.Load()sessionStates, sessionToken, rs.err : mgr.querySessionStates(backendIO)// 跟目标 backend 建立tcp连接cn, rs.err : net.DialTimeout(tcp, rs.to, DialTimeout)// 将 conn 包裹为 PacketIOnewBackendIO : pnet.NewPacketIO(cn, mgr.logger, pnet.WithRemoteAddr(rs.to, cn.RemoteAddr()), pnet.WithWrapError(ErrBackendConn))// 使用 session token方式跟目标 backend 进行鉴握手鉴权mgr.authenticator.handshakeSecondTime(mgr.logger, mgr.clientIO, newBackendIO, mgr.backendTLS, sessionToken)// 登录目标 backend 进行鉴权rs.err mgr.initSessionStates(newBackendIO, sessionStates)// 将新的 PacketIO 存储到 BackendConnManager 的成员变量中后续再有请求都是用此变量mgr.backendIO.Store(newBackendIO) } 上面展示了 session token 的认证方式和上下文恢复的逻辑对应 querySessionStates 、handshakeSecondTime 、initSessionStates 三个方法 querySessionStates: tiproxy 在 tidb a 上执行 SHOW SESSION_STATES 获取到 session_token session_state handshakeSecondTime: tiproxy 使用 session_token 认证方式登录到 tidb b initSessionStates: tiproxy 登录成功后执行 SET SESSION_STATES %s 设置 tidb b 的 session_state 补充 tiproxy 使用的 session token 的方式可以理解为 tidb 丰富了 mysql 协议在 client 登录 server 的时候除了账号密码这种mysql_native_password方式还支持了账号token方式。 使用 session token 认证方式要求整个tidb集群证书是一样的这样tidb a签名tidb b才可以验签通过。 为了方式迁移期间client还有新的会话在执行 tryRedirect 前后使用 sync.Mutex 进行保护 func (mgr *BackendConnManager) processSignals(ctx context.Context) {for {// ...mgr.processLock.Lock()switch s {case signalTypeRedirect:mgr.tryRedirect(ctx)}mgr.processLock.Unlock()// ...}} }func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (err error) {// ...mgr.processLock.Lock()defer mgr.processLock.Unlock()// ...waitingRedirect : mgr.redirectInfo.Load() ! nil// ...if waitingRedirect {mgr.tryRedirect(ctx)}// ... } 4、tiproxy在自动负载均衡期间遇到处于未提交事务的session是怎么等待结束的 对于 tryRedirect 方法有两个地方被调用即前文提到的 BackendConnManager::processSignals 和 BackendConnManager::ExecuteCmd BackendConnManager::processSignals 只有在收到channe消息后立即出发一次如果有未完成的事务就不再执行了。 所以为了保证迁移任务可继续在 BackendConnManager::ExecuteCmd 中每次执行完 executeCmd 后尝试迁移这样就能保证事务结束后立刻迁移。 func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (err error) {// ...waitingRedirect : mgr.redirectInfo.Load() ! nil// ...holdRequest, err mgr.cmdProcessor.executeCmd(request, mgr.clientIO, mgr.backendIO.Load(), waitingRedirect)// ...if mgr.cmdProcessor.finishedTxn() {if waitingRedirect {mgr.tryRedirect(ctx)}// ...}// ... } 判断事务是否结束的 finishedTxn 方法逻辑解析 client 的请求类型、解析 backend 的响应状态综合判断事务是否完成此逻辑过于硬核等以后研究明白后再分享吧。 有兴趣的读者可以分析下这段逻辑 func (cp *CmdProcessor) finishedTxn() bool {if cp.serverStatus(StatusInTrans|StatusQuit) 0 {return false}// If any result of the prepared statements is not fetched, we should wait.return !cp.hasPendingPreparedStmts() }func (cp *CmdProcessor) updatePrepStmtStatus(request []byte, serverStatus uint16) {var (stmtID intprepStmtStatus uint32)cmd : pnet.Command(request[0])switch cmd {case pnet.ComStmtSendLongData, pnet.ComStmtExecute, pnet.ComStmtFetch, pnet.ComStmtReset, pnet.ComStmtClose:stmtID int(binary.LittleEndian.Uint32(request[1:5]))case pnet.ComResetConnection, pnet.ComChangeUser:cp.preparedStmtStatus make(map[int]uint32)returndefault:return}switch cmd {case pnet.ComStmtSendLongData:prepStmtStatus StatusPrepareWaitExecutecase pnet.ComStmtExecute:if serverStatusmysql.ServerStatusCursorExists 0 {prepStmtStatus StatusPrepareWaitFetch}case pnet.ComStmtFetch:if serverStatusmysql.ServerStatusLastRowSend 0 {prepStmtStatus StatusPrepareWaitFetch}}if prepStmtStatus 0 {cp.preparedStmtStatus[stmtID] prepStmtStatus} else {delete(cp.preparedStmtStatus, stmtID)} } 总结 本文从4个疑惑入手阅读了下tiproxy的代码实现都找到了对应的处理逻辑。 对比于tidb、tikv、pd等组件代码tiproxy实简单很多推荐大家学习下。 彩蛋 在梳理上面4个问题的时理清思路后绘制了如下的内部交互图有兴趣的可以自己研究下下篇文章我们将对其进行说明。
http://icebutterfly214.com/news/56640/

相关文章:

  • 2025-11-21 XQQ NOIP Round 1 hetao1733837的record
  • Gephi怎样优化MySQL数据的展示效果
  • Fisrt Blog
  • STM32中断、NVIC、EXTI
  • 题解:AT_agc028_e [AGC028E] High Elements
  • Luogu P10778 BZOJ3569 DZY Loves Chinese II 题解 [ 紫 ] [ Xor Hashing ] [ 线性基 ] [ DFS 树 ]
  • .NET+AI | MEAI | Function Calling 基础(3)
  • NCHU-23207335-面向对象程序设计-BLOG-1
  • 开发智联笔记项目时所遇问题(4)
  • 卡码网94: bellman_ford算法
  • day11-Dify智能体-发布-工作流
  • 2025年热门的成都打印机行业内知名租赁公司排行榜
  • 深入解析:深度学习——Logistic回归中的梯度下降法
  • 详细介绍:压缩与缓存调优实战指南:从0到1根治性能瓶颈(四)
  • 完整教程:【人工智能】神经网络的优化器optimizer(四):Adam自适应动量优化器
  • 让 Maven 能找到本地 JAR 而无需把它上传到公共仓库:
  • Windows Server 2019 中文版、英文版下载 (2025 年 11 月更新)
  • 2025年包头家政服务标杆企业最新推荐:信达家政,保洁|保姆|开荒|月嫂|护工一体化服务新标准
  • markdown的初级使用教程
  • 浙江初中生数学提分老师怎么选?适配杭州、宁波、绍兴、温州学生
  • 2025 年 11 月羊绒衫厂家推荐排行榜,女式羊绒衫,男士羊绒衫,小香风羊绒衫,精选优质羊绒衫公司推荐
  • 【重磅】2025两院院士增选揭晓:144位顶尖专家入选,AI、芯片与量子技术成最大赢家!
  • 留学中介排名TOP10服务体验评选,这家学生最满意
  • 2025年度少儿编程教育机构权威评测:科学体系与竞赛成果双轨制分析
  • 2025年电磁采暖炉品牌厂商权威推荐榜单:电磁感应采暖炉/节能电磁采暖炉/高频电磁采暖炉设备源头厂家精选
  • 2025年双P淋膜防漏杯定制厂家权威推荐榜单:隔热瓦楞杯/双层套筒纸杯/斜口薯条杯源头厂家精选
  • 江苏抗17级台风抗风卷帘门厂家排名怎么选
  • 江苏抗17级台风抗风卷帘门厂家排名前十哪家靠谱
  • 上海有哪些AI企业值得投资?行业发展与企业实力解析
  • Luogu P6893 [ICPC 2014 WF] Buffed Buffet 题解