Posted on 八月 30, 2019
核算对账核心
清算对账系统
支付公司提供的所有金融服务是建立在银行资金体系之上的,支付公司账务系统内账户的资金都与其在银行的存款资金一一对应,为了保证真实的资金账户和虚拟账户的资金转换正确,支付公司必须及时与银行进行各类业务的资金核对,所有资金核对都依赖于银行的系统。
1. 资金流入与银行的对账
从银行流入的资金是由银行侧控制资金结转清算与对账时间,即每日客户通过银行向支付机构充值的资金是由银行实时通知支付机构充值指令的发生,银行在每日晚间经过汇总后向支付机构的银行收款账户入账,同时提供入账清算文件。支付机构获取该文件后,与业务数据进行核对。
对账结果若相符,则没有问题。若出现对账结果不符,很有可能是系统或者业务在某些环节发生问题,存在两种情况:
-
银行充值明细多,支付机构充值明细少;即银行向支付机构入账资金多于支付机构业务发生情况,一般采取临时挂账处理,查出原因后再具体解决;
-
银行充值明细少,支付充值明细多;即银行向支付机构入账资金少于支付机构业务发生情况,则可能对支付机构产生资金损失,一般采取临时挂账处理,查出原因后再具体解决。
2. 资金流出与银行的对账
从支付公司流出的资金是由银行侧控制资金结转清算与对账时间,即每日客户向支付机构申请提现的资金是在每日支付公司批量向银行发起提现请求时,从支付公司银行存款账户扣转,但扣转的结果一般需要一段时间才能从银行侧得到反馈。
当银行侧提供扣转成功和失败的清算文件,支付公司获取这些文件进行明细核对,对于提现失败的申请,由支付公司后台发起直接将资金回充入客户账户,不在对账中心进行对账处理。
什么是对账?
什么是资金对账?
在会计上的概念:指为了保证账簿记录的正确性而进行的有关账项的核对工作,做到账证相符、账账相符、账实相符。
在支付机构的概念:资金对账,在对账中心进行,将系统保存的账务流水与银行返回的清算流水和清算文件进行对账,核对系统账务数据与银行清算数据的一致性,保证支付机构各备付金银行账户每日的预计发生额与实际发生额一致。即核对银行实际清算资金如充值、充退、提现等业务的银行处理结果是否一致。
对账中心的作用?
是主要处理对账的系统模块,主要业务是清算对账。对账中心部署于工作平台,分别接受会计系统和清算系统的数据输入进行对账处理。
对账中心最主要的职责是勾兑银行清算流水与支付系统入账流水,用以检查反映银存实际账户的余额变化与支付系统内部户余额变化是否平衡。对于已经核对无误的银行清算流水和支付系统入账流水,分别进入相应的历史银行流水库和历史入账流水库。
对于勾兑结果中银行清算流水多于系统入账流水的,而操作人员不明确资金的来源,需要根据所设定的分类规则将暂不明确的资金进行挂账处理。而后我们认为该部分资金已经系统入账,可以入历史流水库。
因为此时的银存实际账户的余额增加,与之对应的是支付系统内部户余额也增加了,比如: T 日的挂账可能会在 T+N 日后进行销账确认,而后续的销账行为是对明细流水的业务分流处理,我们不应将 T+N 日的销账所产生的账务流水作为入账流水,不再需要到对账中心体现。
对账内容和数据来源
第一步
入账流水和清算流水进行核对,目的是保证对每一笔订单银行的处理结果和我们系统的业务处理结果一致。
第二步
对账汇总确认单和银行对账单进行核对,目的是通过轧余额等方式,核对各类业务的银行处理结果是否与银行实际清算给我们的资金一致。
1. 对账业务流程
对账业务流程图
2. 对账中心主要功能
银行发生资金变动的入账流水,包括:充值、提现、提现失败、充退、充退失败、退票、购汇、信贷放款、信贷还款、还贷失败一系列业务引起的账务变动信息。这些业务流水在账务系统入账后,会计核心接收到登记分录请求并处理完毕,发送该流水至对账中心,对账中心对于某个业务的反向资金变动所产生入账流水也同步至对账中心。
这样做的目的是让待清算与入账流水在日切点保持等额,比如:提现 T 日会员申请提现,生成文件后会员账户提现金额转入待清算提现款项,与此同时发送流水到对账中心,此时待清算与对账中心入账流水是保持平衡的。
而 T+1 日银行处理失败,系统会回充带清算提现款,如果此时不发送失败流水到对账中心与其对应的流水进行购销的话,则入账流水就会不变,和带清算提现不平衡。
Tips:
-
完整的日结流程支持:银行清算流水入库 → 流水对账 → 流水分类 → 汇总确认单 → 自动挂账 → 销账确认 → 操作员轧账 → 总账日结以及登记银行余额等。
-
完善的报表模型输出:如按入账日期 / 银行日期 / 清算日期等的统计报表、银行账户余额报表、未达款项报表等;
-
提供日切服务:在经确认后进入历史清算流水的,同步汇总该部分数据进入历史流水汇总表中保存,所以在日切时,只需直接将该汇总数据再次汇总即可。
-
外围业务功能支持:提供对于银行账户( 独立于对账中心之外,不参与处理逻辑 )的管理功能,支持与内部户的映射管理,用以满足部分报表需求;提供基于账务核心所提供的业务代码查询功能;提供对于财务系统的交互支持,包括与财务科目的对应关系管理、通知流水数据等;提供用以校验订单与清算流水匹配状态的错账核实功能。
对账流程图
对账中心功能模块分析
获取资金对账数据:
-
获取方式:需要进行清算流水导入的文件一般通过人工在页面上导入,另外有些业务的流水文件通过系统自动匹配或者定时获取的方式得到。
-
清算流水导入渠道非常多:需要统一各清算流水导入功能到一个页面入口, 同时引入清算通道对账的逻辑,将清算流水导入过程需要映射清算通道( 包含新增清算流水等 )。
-
各类业务的清算流水文件的解析和导入逻辑不一样。
清算流水对账:
对账逻辑:
一对一对账:入账流水只可能存在一条,银行入账流水也仅存在一条,然后一对一去对。目前按照一对一对账的业务涉及到:网银 B2C 充值、网银 B2B 充值、VISA、网汇E、卡通充值、正常提现、认证提现、银企互联提现、卡通提现、卖家信贷、信用卡还款提现、COD、网点支付。
对账标准:清算通道 + 订单号 + 金额 Or 银行名称 + 业务代码 + 订单号 + 金额。
满足一对一的业务如下:
-
提现成功:500401
-
认证提现:500402
-
还贷:500404
-
卡通提现:500403
-
个人网银充值:400301
-
企业网银充值:400302
-
VISA:400314
-
网汇e:400315
-
卡通充值:400304
-
贷款:400307
-
银企互联提现:500405
-
信用卡还款提现:500407
-
后台强制提现500406
-
COD
-
网点支付
Tips:
-
清算流水有,入账流水也有,金额一致,对账成功,清算流水、对账流水打上 ‘对账成功’ 标志,记录对账日期为系统当日;
-
清算流水有,入账流水也有,金额不一致,清算流水记金额不等,记录对账日期为系统当日,入账流水不变;
-
清算流水有,入账流水该订单号没有,清算流水打上 ‘银行多帐’ 的标志,记录对账日期为系统当日;
-
清算流水有,入账流水没有该业务代码初始状态的,清算流水打上 ‘银行多帐’ 的标志,记录对账日期为系统当日;
-
清算流水有,入账流水没有该银行的初始状态的记录,清算流水打上 ‘银行多帐’ 的标志,记录对账日期为系统当日;
-
对账之前需要判断 1 对 1 的流水中是否有重复,有重复的返回失败不进行后续对账。
多对多对账:入账流水里相同订单号,相同清算渠道,相同,金额相同业务代码的流水存在对条(比如充退);而银行清算流水也可能是存在多条的情况的,这种情况下是多对多去对账的,遵循先到的先对的原则。
对账标准:清算通道 + 订单号 + 金额 Or 银行名称 + 业务代码 + 订单号 + 金额。
满足多对多的业务如下:
-
充退:410401
-
银企互联充退:410402
-
Motopay 充退:410403
Tips:
-
清算流水有,入账流水有,满足对账标准,则对账成功,清算流水、对账流水打上 ‘对账成功’ 标志,记录对账日期为系统当日;
-
清算流水有,入账流水有,金额不等,清算流水打上 ‘金额不等’ 的标志,记录对账日期为系统当日,入账流水不变;
-
清算流水有,入账流水没有该订单号,清算流水打上‘银行多帐’ 的标志,记录对账日期为系统当日;
-
清算流水有,入账流水没有初始状态 410401 的流水,清算流水打上 ‘银行多帐’ 的标志,记录对账日期为系统当日;
-
清算流水有,入账流水没有这个银行初始状态 410401 的流水,清算流水打上 ‘银行多帐’ 的标志,记录对账日期为系统当日;
-
清算流水有,入账流水没有初始状态的 ‘410401’ 的流水,清算流水打上 ‘银行多帐’ 的标志,记录对账日期为系统当日。
一对多对账:入账流水有多条,和银行的一条去对账。
对账标准:清算通道 + 订单号 + 金额 Or 银行名称 + 业务代码 + 订单号 + 金额
涉及的业务:境外收单
-
境外收单购汇扣款(520101)
-
购汇益回补购汇账户(400319)
Tips:
-
入账流水存在 2 条 520101 的,清算流水存在 1 条 520101 的,将入账流水的 2 条加和后的金额和清算流水的进行对账,一致的为对账成功;
-
入账流水存在 1 条 520101 的,1 条 400319 的,那么要将 2 条金额之差和清算流水的 1 条 520101 的进行对账。不会出现 3 条 520101 或者 2 条 400319 的情况。
内部流水购销 :内部流水购销是针对那种有入账流水表来说的,比如提现业务,提现文件生成就会扣款此时会同步一笔入账流水;而回导失败之后又会回充,此时也会同步一笔反向的业务流水,这两条流水不用再去和银行的资金流水进行对账,直接在入账流水这边进行购销即可。
需要购销的业务如下:
-
购销的规则:非已勾销状态下,同一银行同一订单号,金额一致而业务代码相反的正向流水进行勾销,而且一条反向流水只勾销一条;
-
日终轧账时候的购销:日终轧账的时候会对未购销的流水再次进行购销确保当天的流水都购销完全。(这个情况是为了解决反向流水先于正向流水先出现的逻辑错误情况而加的并提示流水号);
-
如果业务代码相反,而金额不等的情况,就无法进行购销,这种情况原来是作为违反逻辑规则来进行标记的。这部分数据进行查明之后,可以修改流水之后,在日终轧账的时候从新进行购销;
-
购销分 ‘一对一’(提现)和 ‘多对多’ (充退)的购销;
-
勾销的流水不入历史库。
对账汇总确认单:
显示对账结果,选择后续的相应的处理界面,复核员在此模块进行流水的确认,还有的功能就是对于已经对账处理的银行清算流水与系统入账流水进行后续业务分流推进。
-
对于大部分对账成功的数据,可以将这些流水确认,确认后该部分流水将进入历史清算流水,只有进入历史清算流水的才认为银行与支付系统数据对平。对账成功的银行清算流水、账务入账流水全部进入各自历史清算流水表保存,相应的银行清算流水与入账流水同步删除。
-
金额不等的流水,可能是银行清算流水文件有误,也可能是错账,采取的做法是操作员手工修改金额,在银行清算流水管理里操作或直接删除,将其推进到下一轮的对账处理环节,对账是可以重复对的,只要满足初始状态要求的流水即可。
-
多账部分,由于各种已明确/未明确的原因,银行已经实际清算给支付公司了,但支付公司的账务系统并没有入账,需要提供一个流水分类和分类汇总挂账的操作入口。系统默认多账的流水给出的菜单是流水分类,金额不等的流水给出的流水管理页面。区别在于,在点流水分类时,只允许修改业务类型、银行日期,而且改完后不改变多账状态;流水管理,修改状态后,流水的对账状态将变成初始状态,需要进行重新对账。
-
流水进入历史库的校验规则。
举例说明:A 操作员导入了一笔流水,系统对账成功,银行清算流水与账务入账流水状态都被置为成功状态;B 操作员再次导入该相同流水,由于入账流水处于成功状态的可以继续对账,所以再次对账成功。系统在对账环节认为正常,但在入历史清算流水时必须做每组流水数据的平衡检查:必须是清算流水总额与入账流水总额匹配才可以进历史数据。
对于上述的场景,如复核员针对 B 操作员的对账成功流水确认后,可以正常进入历史流水,对账批次号认最后操作的批次。而继续对 A 操作员的成功对账流水确认,则系统将认为是不合法的入历史流水行为。或者不对特定操作员的汇总确认,系统必须检验出所存在的重复银行清算流水,并将对应的明细数据显示复核员。此时可将该重复对账的清算流水删除即可。
流水分类和分类汇总挂账:
-
对于多账的流水对账中心提供一个单独的处理入口,首先在此处进行分类,然后进行汇总挂账处理。操作人员可以根据多账流水的未确认类型修改成对应的业务代码,允许修改成业务类型为 7 的可挂账的类型(其他业务代码不允许挂账);流水可以重复分类,系统不做控制;但已申请挂账的除外,分类完毕的多账流水可直接提供分类汇总挂账操作。
-
系统根据既定的业务代码判断是待查收入或待查支出挂账,页面跳转至相关凭证登记页面,此处业务规则同待查收支挂账;另一部分比如退票,因为不经过对账环节,所以需要直接在清算流水查询里面新增,业务代码 700106 未确认退票,不需要经过对账,直接去汇总确认单里将流水挂账。允许挂账的业务代码如下 :700101 其他应付款,700102 其他应收款,700103 银行错账,700104 未确认结汇款,700105 线下汇款,700106 未确认退票,700107 未确认收款,700108 未确认支出款。
-
提交凭证登记申请相关校验:流水此刻状态是否是 ‘多帐’ ,申请提交人是否是当时的对账人,凭证登记成功,清算流水记录凭证号,流水状态改为 ‘已挂账’ ;
-
汇总挂账的反交易同步入账流水,金额是负值,系统在清算流水里在做一条数据,自动对账。
清算/入账/历史流水管理:
清算流水管理
对于所有的清算流水,都可以在此模块下进行查询、修改、删除,同时也可以新增流水。
-
查询功能,银行名称、业务代码、银行日期不能为空,业务代码和银行可以多选。
-
修改功能,处于未对账(初始)、金额不等、多账、对账成功状态的流水可以进行单笔、批量修改操作,这里的批量修改动作与流水分类功能类似,只是该功能入口不仅支持对批量流水的业务代码、银行日期更新,也支持其他要素信息批量修改,作为提供给操作员在未对账前批量修改已知流水的手段之一;对已挂账的流水,不允许更新。
-
流水删除:处于未对账(初始)、金额不等、多账、对账成功状态的流水可以进行单笔、批量删除操作,已挂账状态不允许删除。
入账流水管理
入账流水管理功能提供对账务入账流水的查询以及下载功能;为了杜绝对入账流水的人为变更操作,禁止支持对入账流水进行修改或删除处理。
-
查询时银行名称、业务代码、入账日期不能为空,业务系统流水号:针对某类业务的业务流水号,如充值业务就是充值流水号;账务流水号:账务处理的流水号,即账务操作记录数据的序号,在账户明细查询里可以获取。
-
对于提现类失败回充的流水,在操作员进行对账动作后,系统自动进行购销,在入账流水查询时,将对账状态选为已购销可以查到,这些不进入历史库,根据数据量,系统定时清理这些数据。
历史流水管理
分为历史入账流水和历史清算流水,查询功能在同一页面,查询时可以一起查询,也可以单独查询。历史流水只供查询和下载,不允许进行修改和删除操作。选择历史银行流水和历史清算流水(清算日期的时间间隔不能超过 7 天)进行查询,历史入账流水中无银行日期,历史银行流水中无入账日期。
银行余额录入功能:
在该模块下,实现对每个银行账户的实际余额录入,以此来和内部账户余额进行匹配校验。分为银行余额登记,银行余额导入(复核),以及银行余额查询功能。
银行余额登记:选择对应日期以及银行账户录入后,保存,此时去银行余额查询是看不到录入余额的,需要复核导入核对完毕后才能看到,保证核对的有效性。登记后系统记录一个临时余额。系统每天凌晨 1:15 的时候会跑批,取上一日已复核过的余额自动带入下日临时余额,如果不登记的话,就取上日余额。
银行余额导入:标准格式是一个填写银行余额的 excle 模板,导入后的核对逻辑,对复核员导入的数据进行检测该帐户是否有临时余额。根据银行账号、银行日期、银行余额等条件查询银行余额表。
判断 1 :如果查询出结果为空那么将返回给用户:“这个银行帐号找不到对应的银行临时余额!”。
判断 2 :如果查询结果大余一条,将返回给用户:“这个银行帐号的一天有多个银行临时余额,不正常!”
判断 3 :检测该银行帐号对应的银行余额是否相等。如果不相等将返回给用户:“临时余额和实际余额不相等,核对失败!”当余额核对成功后,将复核员导入的余额写入该对应帐户的实际余额,并修改余额状态为已复核。实际余额更新成功后,将删除当天日期的临时余额。
银行余额查询:对操作员登记余额的信息,以及复核员复核过的余额进行查询,查询条件:银行名称、银行帐户、帐户状态、银行日期、余额状态。帐户状态:分为正常、废除、和销户三个状态,默认为正常状态。余额状态:分为未录入(N)、已录入(A)、已复核(Y)三个状态,默认为全部。
内部账户登账:
业务简述
针对日常结算工作中非银行待查类的内部账户进行登记,如:清算款、利息、手续费等;该业务登记不产生后续业务登记行为,即不具有作为初始凭证号的使用功能。对于批量销帐类业务处理中,多会员转帐失败的,导致过渡户上有剩余金额情况的,可通过此处进行内部户登账,将过渡户(负债)和银存(资产)余额同时减少,再通过待查收入挂帐实现平衡。
业务校验规则
-
必须登记的是一借一贷帐户信息;
-
借贷方帐户不能一致;
-
金额必须大于 0 ;
-
不允许任意一方是销帐帐户。
是否同步对帐中心流水:不需要同步对帐中心流水。
待查收入挂账:
业务简述
应用于结算操作员针对日常结算工作中的银行待查收入进行登记,所产生的业务凭证号作为后续销帐业务的原业务凭证号,并且作为所有该登记所引发的后续业务登记的初始凭证号。其中待查收入挂帐业务凭证的借方(银存帐户)所对应的银行名称作为后续销帐业务的银行名称,包括差额重新挂帐部分再销帐的业务凭证,都递沿该银行名称。
业务校验规则:
-
必须登记的是一借一贷帐户信息;
-
借贷方帐户不能一致;
-
金额必须大于 0 ;
-
借方帐户必须是银存帐户;
-
贷方帐户必须是销帐帐户。
是否同步对帐中心流水:不需要同步对帐中心流水。
待查收入确认:
业务简述
针对银行待查收入登账的挂帐,可以通过本模块进行销帐。此处采取销帐确认方式进行处理,需要选择相应的待查收入挂帐业务凭证进行销帐业务登记。该业务登记可能产生后续业务登记行为,如差额销帐情况下,系统会自动做不足额部分的重新挂帐并复核通过,所产生的挂帐凭证作为后续销帐凭证的销帐卡片号。
业务校验规则:
-
所销的原待查收入挂帐凭证必须合法;
-
所销的原待查收入挂帐凭证必须已复核通过,处理完毕;
-
所销的原待查收入挂帐凭证不能已被销帐;
-
销帐总额(贷方发生额之和不得大于原待查收入挂帐凭证发生额)并大于 0 ;
-
贷方必须是内部户。
待查支出挂账:
业务简述:
针对日常结算工作中的银行待查支出进行登记,所产生的业务凭证号作为后续销帐业务的原业务凭证号,并且作为所有该登记所引发的后续业务登记的初始凭证号。其中待查收入挂帐业务凭证的贷方( 银存帐户)所对应的银行名称作为后续销帐业务的银行名称,包括差额重新挂帐部分再销帐的业务凭证,都递沿该银行名称。
业务校验规则:
-
必须登记的是一借一贷帐户信息;
-
借贷方帐户不能一致;
-
金额必须大于 0 ;
-
贷方帐户必须是银存帐户;
-
借方帐户必须是销帐帐户 。
是否同步对帐中心流水:不需要同步对帐中心流水。
待查支出确认:
业务简述
针对银行待查支出登账的挂帐,可以通过本模块进行销帐。此处采取销帐确认方式进行处理,需要选择相应的待查支出挂帐业务凭证进行销帐业务登记。该业务登记可能产生后续业务登记行为,如差额销帐情况下系统会自动做不足额部分的重新挂帐并复核通过,所产生的挂帐凭证作为后续销帐凭证的销帐卡片号。
业务校验规则:
-
所销的原待查支出挂帐凭证必须合法;
-
所销的原待查支出挂帐凭证必须已复核通过,处理完毕;
-
所销的原待查支出挂帐凭证不能已被销帐;
-
销帐总额(借方发生额之和不得大于原待查收入挂帐凭证发生额)并大于0 ;
-
借方必须是内部户。
意外数据恢复逻辑1. 意外数据恢复逻辑
重复支付:对同一内部订单号进行了二次或二次以上的支付。
支付失败,金额不等:买家实际支付的金额与交易金额不等。一般产生的原因是,买家在支付时,产生了掉单,卖家随后修改了交易价格。 在进行网银对账的时候,即会出现订单金额和交易金额不等的情况,且是一笔掉单。2、3 两类情况只发生在支付上。
支付成功,金额不等(这一类异常,偶尔有发生):商户订单状态为成功,后台订单状态也为成功,并且交易状态是买家已付款,等待卖家发货。
(金额不等,支付成功,是因为会员对一个交易进行支付,但由于网络或银行系统等原因支付公司未接收到银行扣款信息,支付侧交易状态未予以变更,后卖家对该交易修改了价格,买家又对该修改过的价格进行了支付。但该支付成功的信息仍然没有被支付侧接收到,该交易状态仍未变更,后支付侧后台人员先对后面的那笔意外数据进行了恢复,后再对前面那笔意外数据恢复,就会出现这种“金额不等,支付成功”的数据)
2. 对帐及异常恢复逻辑
以商户成功订单为准:
用商户上的成功订单与后台的订单来核对:
-
若商户订单为成功,后台为初始或者失败,则更改后台状态为成功;
-
若后台为成功,商户成功订单中无该订单(时间差),则不更改后台状态;
-
若后台为初始或失败,商户成功订单中无该订单,则不更改后台状态。
不重复恢复:
T+1 日恢复T 日的订单,并且在 T+1 日后不再下载 T 日的订单进行二次或多次恢复。(为考虑会员感受,T 日下班前恢复 T 日 0 点到下班时点的订单)
时间性差异:
由于各家银行系统日切点均不同,并且大多不会在每日的 24 点(或早或晚),所以下载到的 T 日的订单流水与后台T日( 0:00-24:00 )的订单流水并不能全部对应上。将商户订单流水与后台订单流水核对,会出现商户有,后台无;商户无,后台有;商户有,后台有三种情况。对于 1、2 两种情况为我们所说的时间性差异。
商户数据与后台数据的关系为:
商户数据-(商户有,后台无)+(商户无,后台有)= 后台数据
Updated on 一月 17, 2019
CompletableFuture 案例
private static ThreadFactory namedTreadFactory = new ThreadFactoryBuilder().setNameFormat("lottery-pool-%d").build(); private static ExecutorService doLotteryPool = new ThreadPoolExecutor(1, 5, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(2), namedTreadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); public static void main(String[] args) { Long start = System.currentTimeMillis(); // 结果集 List<String> list = new ArrayList<>(); List<Integer> taskList = Arrays.asList(2, 1, 3,2, 1, 3,2, 1, 3,2, 1, 3,2, 1, 3,2, 1, 3,2, 1, 3); // 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取 List<CompletableFuture<String>> futures = taskList.stream() .map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), doLotteryPool) //当计算完成的时候请执行某个function .thenApply(h -> thenApply(h) ) //计算结果完成时的处理 .whenComplete((s, e) -> { System.out.println("任务" + s + "完成!result=" + s + ",异常 e=" + e + "," + new Date()); list.add(s); }) .exceptionally(e -> { //e.printStackTrace(); System.out.println("出现异常"+e); //返回一个默认值 return "0"; }) ).collect(Collectors.toList()); // 封装后无返回值,必须自己whenComplete()获取 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join(); System.out.println("list=" + list + ",耗时=" + (System.currentTimeMillis() - start)); // 输出结果; //获取值 System.out.println("----------等待结果返回-------------"); String res = futures.stream() .map(f -> { try { return f.thenApply(String::valueOf).get(); } catch (Exception e) { e.printStackTrace(); } return ""; }).collect(Collectors.joining(" , ", "", "")); System.out.println("res = " + res); } private static String thenApply(Integer h) { System.out.println("当计算完成的时候请执行某个function"); return Integer.toString(h); } public static Integer calc(Integer i) { try { if (i == 1) { Thread.sleep(3000);//任务1耗时3秒 } else if (i == 5) { Thread.sleep(5000);//任务5耗时5秒 } else { Thread.sleep(1000);//其它任务耗时1秒 } System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date()); } catch (InterruptedException e) { e.printStackTrace(); } if(i==3){ int num=1/0; } return i; }
Posted on 一月 17, 2019
CompletableFuture 详解
一、Future模式
Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
Future接口可以构建异步应用,是多线程开发中常见的设计模式。
当我们需要调用一个函数方法时。如果这个函数执行很慢,那么我们就要进行等待。但有时候,我们可能并不急着要结果。
因此,我们可以让被调用者立即返回,让他在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获取需要的数据。
用法如下:
public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Future<String> future=executor.submit(new Callable<String>() { @Override public String call() throws Exception { String result=new Random().nextLong()+""; Thread.sleep(2000); return result; } }); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
Future以及相关使用方法提供了异步执行任务的能力,但对于结果的获取却是不方便,只能通过阻塞或轮询的方式得到任务结果。阻塞的方式与我们理解的异步编程其实是相违背的,而轮询又会耗无谓的CPU资源。而且还不能及时得到计算结果.
Future 接口的局限性
了解了Future的使用,这里就要谈谈Future的局限性。Future很难直接表述多个Future 结果之间的依赖性,开发中,我们经常需要达成以下目的:
将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果)
等待 Future 集合中的所有任务都完成。
仅等待 Future 集合中最快结束的任务完成,并返回它的结果。
CompletableFuture的作用
CompletableFuture是Java8的一个新加的类,它在原来的Future类上,结合Java8的函数式编程,扩展了一系列强大的功能.
CompletableFuture 的方法比较多;
Either
Either 表示的是两个CompletableFuture,当其中任意一个CompletableFuture计算完成的时候就会执行。
方法名 |
描述 |
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) |
当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。 |
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) |
当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。使用ForkJoinPool |
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) |
当任意一个CompletableFuture完成的时候,action这个消费者就会被执行。使用指定的线程池 |
例子:
public static void main(String[] args) { Random random = new Random(); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "from future1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "from future2"; }); CompletableFuture<Void> future = future1.acceptEither(future2, str -> System.out.println("The future is " + str)); try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
执行结果:The future is from future1 或者 The future is from future2。
因为future1和future2,执行的顺序是随机的。
applyToEither 方法:
applyToEither 跟 acceptEither 类似。
方法名 |
描述 |
applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) |
当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。 |
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) |
当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。使用ForkJoinPool |
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor) |
当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。使用指定的线程池 |
例子省略。。。。。
allOf 方法
方法名 |
描述 |
allOf(CompletableFuture<?>… cfs) |
在所有Future对象完成后结束,并返回一个future。 |
allOf()方法所返回的CompletableFuture,并不能组合前面多个CompletableFuture的计算结果。于是我们借助Java 8的Stream来组合多个future的结果。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "tony");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "cafei");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "aaron");
CompletableFuture.allOf(future1, future2, future3) .thenApply(v -> Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" "))) .thenAccept(System.out::print);
|
执行结果:
tony cafei aaron
3.7.2 anyOf
方法名 |
描述 |
anyOf(CompletableFuture<?>… cfs) |
在任何一个Future对象结束后结束,并返回一个future。 |
Random rand = new Random(); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(rand.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "from future1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(rand.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "from future2"; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(rand.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "from future3"; }); CompletableFuture<Object> future = CompletableFuture.anyOf(future1,future2,future3); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
使用anyOf()时,只要某一个future完成,就结束了。所以执行结果可能是"from future1"、"from future2"、"from future3"中的任意一个。
anyOf 和 acceptEither、applyToEither的区别在于,后两者只能使用在两个future中,而anyOf可以使用在多个future中。
异步方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
supplyAsync 返回Future有结果,而runAsync返回CompletableFuture<Void>。
Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池。
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> { |
在这段代码中,runAsync 是异步执行的 ,通过 Thread.currentThread().isDaemon() 打印的结果就可以知道是Daemon线程异步执行的。
同步执行方法:
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public CompletableFuture<Void> thenAccept(Consumer<? super T> action); public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public CompletableFuture<Void> thenRun(Runnable action); CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApply(s -> { System.out.println(Thread.currentThread().isDaemon() + "_"+ s); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return s.toUpperCase(); }); System.out.println("done=" + cf.isDone()); TimeUnit.SECONDS.sleep(4); System.out.println("done=" + cf.isDone()); // 一直等待成功,然后返回结果 System.out.println(cf.join());
thenApply
当前阶段正常完成以后执行,而且当前阶段的执行的结果会作为下一阶段的输入参数。thenApplyAsync默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。
thenApply相当于回调函数(callback)
thenAccept与thenRun
可以看到,thenAccept和thenRun都是无返回值的。如果说thenApply是不停的输入输出的进行生产,那么thenAccept和thenRun就是在进行消耗。它们是整个计算的最后两个阶段。
同样是执行指定的动作,同样是消耗,二者也有区别:
thenAccept接收上一阶段的输出作为本阶段的输入,同步执行;
thenRun根本不关心前一阶段的输出,根本不不关心前一阶段的计算结果,因为它不需要输入参数
CompletableFuture异常处理
exceptionally
方法名 |
描述 |
exceptionally(Function<Throwable,? extends T> fn) |
只有当CompletableFuture抛出异常的时候,才会触发这个exceptionally的计算,调用function计算值。 |
CompletableFuture.supplyAsync(() -> "hello world") .thenApply(s -> { s = null; int length = s.length(); return length; }).thenAccept(i -> System.out.println(i)) .exceptionally(t -> { System.out.println("Unexpected error:" + t); return null; });
执行结果:
Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException
对上面的代码稍微做了一下修改,修复了空指针的异常。
CompletableFuture.supplyAsync(() -> "hello world") .thenApply(s -> { // s = null; int length = s.length(); return length; }).thenAccept(i -> System.out.println(i)) .exceptionally(t -> { System.out.println("Unexpected error:" + t); return null; });
执行结果:
11
whenComplete
whenComplete 在上一篇文章其实已经介绍过了,在这里跟exceptionally的作用差不多,可以捕获任意阶段的异常。如果没有异常的话,就执行action。
CompletableFuture.supplyAsync(() -> "hello world") .thenApply(s -> { s = null; int length = s.length(); return length; }).thenAccept(i -> System.out.println(i)) .whenComplete((result, throwable) -> { if (throwable != null) { System.out.println("Unexpected error:"+throwable); } else { System.out.println(result); } });
执行结果:
Unexpected error:java.util.concurrent.CompletionException: java.lang.NullPointerException
跟whenComplete相似的方法是handle,handle的用法在上一篇文章中也已经介绍过。
方法名 |
描述 |
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) |
在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。 |
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) |
在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。使用ForkJoinPool。 |
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) |
在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型。使用指定的线程池。 |
thenCompose可以用于组合多个CompletableFuture,将前一个结果作为下一个计算的参数,它们之间存在着先后顺序。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
执行结果:
Hello World
下面的例子展示了多次调用thenCompose()
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "100")) .thenCompose(s -> CompletableFuture.supplyAsync(() -> Double.parseDouble(s))); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
执行结果:
100100.0
组合
方法名 |
描述 |
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) |
当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。 |
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) |
当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。使用ForkJoinPool。 |
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) |
当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。使用指定的线程池。 |
现在有CompletableFuture<T>、CompletableFuture<U>和一个函数(T,U)->V,thenCompose就是将CompletableFuture<T>和CompletableFuture<U>变为CompletableFuture<V>。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100"); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100); CompletableFuture<Double> future = future1.thenCombine(future2, (s, i) -> Double.parseDouble(s + i)); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
执行结果:
100100.0
使用thenCombine()之后future1、future2之间是并行执行的,最后再将结果汇总。这一点跟thenCompose()不同。
thenAcceptBoth跟thenCombine类似,但是返回CompletableFuture<Void>类型。
方法名 |
描述 |
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) |
当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。 |
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) |
当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。使用ForkJoinPool。 |
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) |
当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。使用指定的线程池。 |
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100"); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100); CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i))); try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
执行结果:
100100.0
3.5 计算结果完成时的处理
当CompletableFuture完成计算结果后,我们可能需要对结果进行一些处理。
3.5.1 执行特定的Action
方法名 |
描述 |
whenComplete(BiConsumer<? super T,? super Throwable> action) |
当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。 |
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) |
当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。使用ForkJoinPool。 |
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) |
当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。使用指定的线程池。 |
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s->s+" World")
.thenApply(s->s+ "\nThis is CompletableFuture demo")
.thenApply(String::toLowerCase)
.whenComplete((result, throwable) -> System.out.println(result));
执行结果:
hello world
this is completablefuture demo
3.5.2 执行完Action可以做转换
方法名 |
描述 |
handle(BiFunction<? super T, Throwable, ? extends U> fn) |
当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn |
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) |
当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn,使用ForkJoinPool。 |
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) |
当CompletableFuture完成计算结果或者抛出异常的时候,执行提供的fn,使用指定的线程池。 |
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
.thenApply(s->s+"100")
.handle((s, t) -> s != null ? Double.parseDouble(s) : 0);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
执行结果:
100100.0
在这里,handle()的参数是BiFunction,apply()方法返回R,相当于转换的操作。
@FunctionalInterface
public interface BiFunction<T, U, R> {
/**
* Applies this function to the given arguments.
*
* @param t the first function argument
* @param u the second function argument
* @return the function result
*/
R apply(T t, U u);
/**
* Returns a composed function that first applies this function to
* its input, and then applies the {@code after} function to the result.
* If evaluation of either function throws an exception, it is relayed to
* the caller of the composed function.
*
* @param <V> the type of output of the {@code after} function, and of the
* composed function
* @param after the function to apply after this function is applied
* @return a composed function that first applies this function and then
* applies the {@code after} function
* @throws NullPointerException if after is null
*/
default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (T t, U u) -> after.apply(apply(t, u));
}
}
而whenComplete()的参数是BiConsumer,accept()方法返回void。
@FunctionalInterface
public interface BiConsumer<T, U> {
/**
* Performs this operation on the given arguments.
*
* @param t the first input argument
* @param u the second input argument
*/
void accept(T t, U u);
/**
* Returns a composed {@code BiConsumer} that performs, in sequence, this
* operation followed by the {@code after} operation. If performing either
* operation throws an exception, it is relayed to the caller of the
* composed operation. If performing this operation throws an exception,
* the {@code after} operation will not be performed.
*
* @param after the operation to perform after this operation
* @return a composed {@code BiConsumer} that performs in sequence this
* operation followed by the {@code after} operation
* @throws NullPointerException if {@code after} is null
*/
default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {
Objects.requireNonNull(after);
return (l, r) -> {
accept(l, r);
after.accept(l, r);
};
}
}
所以,handle()相当于whenComplete()+转换。
3.5.3 纯消费(执行Action)
方法名 |
描述 |
thenAccept(Consumer<? super T> action) |
当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值 |
thenAcceptAsync(Consumer<? super T> action) |
当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值,使用ForkJoinPool。 |
thenAcceptAsync(Consumer<? super T> action, Executor executor) |
当CompletableFuture完成计算结果,只对结果执行Action,而不返回新的计算值 |
thenAccept()是只会对计算结果进行消费而不会返回任何结果的方法。
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s->s+" World")
.thenApply(s->s+ "\nThis is CompletableFuture demo")
.thenApply(String::toLowerCase)
.thenAccept(System.out::print);
执行结果:
hello world
this is completablefuture demo
Posted on 十二月 24, 2018
基于千万数据的在线表结构修改
在系统正常运作一定时间后,随着市场、产品汪的需求不断变更,比较大的一些表结构面临不得不增加字段的方式来扩充满足业务需求;并且是不能停止服务器。在 MySQL 在体量上了千万、数据的时候,Alter Table 的操作,可以让你等一天,而且在高峰期执行这种 SQL ,会造成表被锁,增加字段的操作需要很长的时间 ,让你的数据库也承担着压力。
方案: pt-online-schema-change 工具
https://segmentfault.com/a/1190000014924677
方案二:土方法 rename;
1. 创建目的表结构的空表,A_new (新加的字段也有了);
2. 在A表上创建触发器,包括增、删、改触发器;这样在copy的过程中,有其他的操作的话可以同步进行处理;
3. 通过insert…select…limit N 语句分片拷贝数据到目的表 (这个可以通过一个定时器Job)去做这些事情。
4. Copy完成后,将A_new表rename到A表。
触发器例子:
增加触发触发器;
DELIMITER $$
USE `demo_1`$$
DROP TRIGGER /*!50032 IF EXISTS */ `user_insert`$$
CREATE /*!50017 DEFINER = 'root'@'localhost' */ TRIGGER `user_insert` AFTER INSERT ON `t_student_0` FOR EACH ROW BEGIN — 插入目标表 INSERT INTO `demo_1`.`t_student_1` VALUES (new.id, new.student_id,new.name,new.age); END; $$ DELIMITER ; |
修改触发触发器:
DELIMITER $$
USE `demo_1`$$
DROP TRIGGER /*!50032 IF EXISTS */ `user_update`$$
CREATE /*!50017 DEFINER = 'root'@'localhost' */ TRIGGER `user_update` AFTER UPDATE ON `t_student_0` FOR EACH ROW BEGIN UPDATE `demo_1`.`t_student_1` SET student_id=new.student_id,NAME=new.name, age=new.age WHERE id=old.id; END; $$
DELIMITER ; |
删除触发触发器:
DELIMITER $$
USE `demo_1`$$
DROP TRIGGER /*!50032 IF EXISTS */ `user_delete`$$
CREATE /*!50017 DEFINER = 'root'@'localhost' */ TRIGGER `user_delete` AFTER DELETE ON `t_student_0` FOR EACH ROW BEGIN DELETE FROM `demo_1`.`t_student_1` WHERE id=old.id; END; $$
DELIMITER ; |
Posted on 十二月 19, 2018
基于binlog 增量的数据解析服务
什么是binlog
binlog是mysql的一种二进制日志文件,用来记录数据的变化。mysql使用binlog进行主从复制,如图:
客户端向master的mysql sever写入数据
-
当数据发生变化时,master将变更的数据记录写入到二进制文件中,即binlog。
-
slave订阅了master的binlog,所以会通过一个I/O THREAD与master的DUMP THREAD进行通信,同步binlog
-
I/O THREAD读取到binlog后会吸入到relay log中,准备重放。
-
slave会通过SQL THREAD读取relay log,重放数据的改动并执行相应的改动。
这里有几点需要注意:
-
主从复制不是强一致性,只能保证最终一致
-
master配合binlog复制会影响性能,所以尽量不要在master上挂太多的slave,如果对时间要求不高,可以在slave上挂slave
2.binlog的业务应用
上面介绍了mysql中应用binlog的场景,而我们的业务可以伪装成master的slave节点,感知数据的变化,这就给了我们很多的业务运用空间。
2.1 数据异构
经常有这样一个场景:
原来业务是一个很单一的系统,所以表也在一起。随着业务的发展,系统开始拆分,总有一些表是各个业务都关注的表,但是对相关的字段的运用场景不同,所以这样一份元数据怎样更好的为各个系统服务就成了问题。当然,多写或者读写分离可以从物理节点上减少对数据服务器的压力,但是对业务并没有做到足够的支持,因为这些表都是一样的。因此我们可以通过binlog进行数据异构。
如图所示,订单系统生成订单后,通过binlog可以解析生成用户维度的订单信息供用户中心查询、商户维度订单表供运营管理,以及搜索系统的搜索数据,提供全文搜索功能。
这样,我们就通过原始的订单数据异构到三个系统中,提供了丰富的数据访问功能。不仅从节点上降低了数据服务器的压力,数据表现形式也更贴近自己的服务,减少不必要的字段冗余。
2.2 缓存数据的补充
对于高并发的系统,数据库往往是系统性能的瓶颈,毕竟IO响应速度是远远小于电子的运算速度的。因此,很多查询类服务都会在CPU与数据库之间加上一层缓存。即现从缓存获取,命中后直接返回,否则从DB中获取并存入缓存后返回。而如果原始数据变化了但缓存尚未超时,则缓存中的数据就是过时的数据了。当数据有变更的时候主动修改缓存数据。
当客户端更改了数据之后,中间件系统通过binlog获得数据变更,并同步到缓存中。这样就保证了缓存中数据有效性,减少了对数据库的调用,从而提高整体性能。
2.3 基于数据的任务分发
有这样一个场景:
很多系统依赖同一块重要数据,当这些数据发生变化的时候,需要调用其他相关系统的通知接口同步数据变化,或者mq消息告知变化并等待其主动同步。这两种情况都对原始系统造成了侵入,原始系统改一块数据,并不想做这么多其他的事情。所以这时候可以通过binlog进行任务分发。
当原始业务系统修改数据后,不需要进行其他的业务关联。由调度系统读取binlog进行相应的任务分发、消息发送以及同步其他业务状态。这样可以将其他业务与原始业务系统解耦,并从数据的角度将所有管理功能放在了同一个调度系统中,责任清晰。
2.4 可以用于数据平滑迁移:
https://www.w3cschool.cn/architectroad/architectroad-data-smooth-migration.html
2.5 数据抽取:
https://blog.csdn.net/u010670689/article/details/81066807
mysql bin-log相关比较好的开源项目
https://blog.csdn.net/everlasting_188/article/details/53304530
binlog的业务实践:
1. 首先需要开启mysql bin_log功能;
启动成功之后,我们可以登陆查看我们的配置是否起作用
show variables like '%log_bin%'
本次解析只考虑insert、update、delete三种事件类型
采用OpenReplicator解析MySQL binlog
Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。
Open Replicator项目地址:https://github.com/whitesock/open-replicator
binlog事件分析结构图
在阅读下面的内容时,首先需要对binlog有一定的了解,可以 参考《MySQL Binlog解析》。
这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:
DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):
{
"eventId": 1,
"databaseName": "canal_test",
"tableName": "`company`",
"eventType": 2,
"timestamp": 1477033198000,
"timestampReceipt": 1477033248780,
"binlogName": "mysql-bin.000006",
"position": 353,
"nextPostion": 468,
"serverId": 2,
"before": null,
"after": null,
"isDdl": true,
"sql": "DROP TABLE `company` /* generated by server */"
}
DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):
{
"eventId": 0,
"databaseName": "canal_test",
"tableName": "person",
"eventType": 24,
"timestamp": 1477030734000,
"timestampReceipt": 1477032161988,
"binlogName": "mysql-bin.000006",
"position": 242,
"nextPostion": 326,
"serverId": 2,
"before": {
"id": "3",
"sex": "f",
"address": "shanghai",
"age": "23",
"name": "zzh3"
},
"after": {
"id": "3",
"sex": "m",
"address": "shanghai",
"age": "23",
"name": "zzh3"
},
"isDdl": false,
"sql": null
}
相关的类文件如下:
OpenReplicatorTest:
public class OpenReplicatorTest {
private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);
private static final String host = "192.168.56.101";
private static final int port = 3306;
private static final String user = "root";
private static final String password = "123456";
public static void main(String[] args) throws IOException {
OpenReplicator or = new OpenReplicator ();
or.setUser(user);
or.setPassword(password);
or.setHost(host);
or.setPort(port);
MysqlConnection.setConnection(host, port, user, password);
// or.setServerId(MysqlConnection.getServerId());
//配置里的serverId是open-replicator(作为一个slave)的id,不是master的serverId
BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();
or.setBinlogFileName(bms.getBinlogName());
or.setBinlogPosition(4);
or.setBinlogEventListener(new NotificationListener());
try {
or.start();
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
Thread thread = new Thread(new PrintLogEvent());
thread.start();
}
public static class PrintLogEvent implements Runnable{
@Override
public void run() {
while(true){
if(CDCEventManager.queue.isEmpty() == false)
{
LogEvent ce = CDCEventManager.queue.pollFirst();
String prettyStr1 = JSON.toJSONString(ce);
System.out.println(prettyStr1);
}
else{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
LogEvent:
public class LogEvent implements Serializable {
/**
* 只针对delete、insert、update事件
*/
private static final long serialVersionUID = 5503152746318421290L;
private long eventId = 0;//事件唯一标识
private String databaseName = null;
private String tableName = null;
private int eventType = 0;//事件类型
private long timestamp = 0;//事件发生的时间戳[MySQL服务器的时间]
private long timestampReceipt = 0;//Open-replicator接收到的时间戳[CDC执行的时间戳]
private long position = 0;
private long nextPostion = 0;
private long serverId = 0;
private Map<String, String> before = null;
private Map<String, String> after = null;
private Boolean isDdl = null;
private String sql = null;
private static AtomicLong uuid = new AtomicLong(0);
public LogEvent() {}
public LogEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName) {
this.init(are);
this.databaseName = databaseName;
this.tableName = tableName;
}
private void init(final BinlogEventV4 be) {
this.eventId = uuid.getAndAdd(1);
BinlogEventV4Header header = be.getHeader();
this.timestamp = header.getTimestamp();
this.eventType = header.getEventType();
this.serverId = header.getServerId();
this.timestampReceipt = header.getTimestampOfReceipt();
this.position = header.getPosition();
this.nextPostion = header.getNextPosition();
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("{ eventId:").append(eventId);
builder.append(",databaseName:").append(databaseName);
builder.append(",tableName:").append(tableName);
builder.append(",eventType:").append(eventType);
builder.append(",timestamp:").append(timestamp);
builder.append(",timestampReceipt:").append(timestampReceipt);
builder.append(",position:").append(position);
builder.append(",nextPostion:").append(nextPostion);
builder.append(",serverId:").append(serverId);
builder.append(",isDdl:").append(isDdl);
builder.append(",sql:").append(sql);
builder.append(",before:").append(before);
builder.append(",after:").append(after).append("}");
return builder.toString();
}
public long getEventId() {
return eventId;
}
public void setEventId(long eventId) {
this.eventId = eventId;
}
public String getDatabaseName() {
return databaseName;
}
public void setDatabaseName(String databaseName) {
this.databaseName = databaseName;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public int getEventType() {
return eventType;
}
public void setEventType(int eventType) {
this.eventType = eventType;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public long getTimestampReceipt() {
return timestampReceipt;
}
public void setTimestampReceipt(long timestampReceipt) {
this.timestampReceipt = timestampReceipt;
}
public long getPosition() {
return position;
}
public void setPosition(long position) {
this.position = position;
}
public long getNextPostion() {
return nextPostion;
}
public void setNextPostion(long nextPostion) {
this.nextPostion = nextPostion;
}
public long getServerId() {
return serverId;
}
public void setServerId(long serverId) {
this.serverId = serverId;
}
public Map<String, String> getBefore() {
return before;
}
public void setBefore(Map<String, String> before) {
this.before = before;
}
public Map<String, String> getAfter() {
return after;
}
public void setAfter(Map<String, String> after) {
this.after = after;
}
public Boolean getDdl() {
return isDdl;
}
public void setDdl(Boolean ddl) {
isDdl = ddl;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
}
public class CDCEventManager {
public static final ConcurrentLinkedDeque<LogEvent> queue = new ConcurrentLinkedDeque<LogEvent>();
}
MysqlConnection:
public class MysqlConnection {
private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
private static Connection conn;
private static String host;
private static int port;
private static String user;
private static String password;
public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg) {
try {
if (conn == null || conn.isClosed()) {
Class.forName("com.mysql.jdbc.Driver");
host = hostArg;
port = portArg;
user = userArg;
password = passwordArg;
conn = DriverManager.getConnection("jdbc:mysql://" + host + ":" + port + "/", user, password);
logger.info("connected to mysql:{} : {}", user, password);
}
} catch (ClassNotFoundException e) {
logger.error(e.getMessage(), e);
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
public static Connection getConnection() {
try {
if (conn == null || conn.isClosed()) {
setConnection(host, port, user, password);
}
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
return conn;
}
/**
* 获取Column信息
*
* @return
*/
public static Map<String, List<ColumnInfo>> getColumns() {
Map<String, List<ColumnInfo>> cols = new HashMap<>();
Connection conn = getConnection();
try {
DatabaseMetaData metaData = conn.getMetaData();
ResultSet r = metaData.getCatalogs();
String tableType[] = {"TABLE"};
while (r.next()) {
String databaseName = r.getString("TABLE_CAT");
ResultSet result = metaData.getTables(databaseName, null, null, tableType);
while (result.next()) {
String tableName = result.getString("TABLE_NAME");
// System.out.println(result.getInt("TABLE_ID"));
String key = databaseName + "." + tableName;
ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);
cols.put(key, new ArrayList<ColumnInfo>());
while (colSet.next()) {
ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),
colSet.getString("TYPE_NAME"));
cols.get(key).add(columnInfo);
}
}
}
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
return cols;
}
/**
* 参考 mysql> show binary logs +------------------+-----------+ | Log_name | File_size |
* +------------------+-----------+ | mysql-bin.000001 | 126 | | mysql-bin.000002 | 126 | |
* mysql-bin.000003 | 6819 | | mysql-bin.000004 | 1868 | +------------------+-----------+
*/
public static List<BinlogInfo> getBinlogInfo() {
List<BinlogInfo> binlogList = new ArrayList<>();
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try {
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show binary logs");
while (resultSet.next()) {
BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"), resultSet.getLong("File_size"));
binlogList.add(binlogInfo);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
if (resultSet != null) { resultSet.close(); }
if (statement != null) { statement.close(); }
if (conn != null) { conn.close(); }
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
return binlogList;
}
/**
* 参考: mysql> show master status; +------------------+----------+--------------+------------------+ | File |
* Position | Binlog_Do_DB | Binlog_Ignore_DB | +------------------+----------+--------------+------------------+ |
* mysql-bin.000004 | 1868 | | |
* +------------------+----------+--------------+------------------+
*
* @return
*/
public static BinlogMasterStatus getBinlogMasterStatus() {
BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try {
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show master status");
while (resultSet.next()) {
binlogMasterStatus.setBinlogName(resultSet.getString("File"));
binlogMasterStatus.setPosition(resultSet.getLong("Position"));
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
if (resultSet != null) { resultSet.close(); }
if (statement != null) { statement.close(); }
if (conn != null) { conn.close(); }
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
return binlogMasterStatus;
}
/**
* 获取open-replicator所连接的mysql服务器的serverid信息
*
* @return
*/
public static int getServerId() {
int serverId = 6789;
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try {
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show variables like 'server_id'");
while (resultSet.next()) {
serverId = resultSet.getInt("Value");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
if (resultSet != null) { resultSet.close(); }
if (statement != null) { statement.close(); }
if (conn != null) { conn.close(); }
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
return serverId;
}
}
NotificationListener:
public class NotificationListener implements BinlogEventListener {
private static Logger logger = LoggerFactory.getLogger(NotificationListener.class);
@Override
public void onEvents(BinlogEventV4 event) {
if (event == null) {
logger.error("binlog event is null");
return;
}
int eventType = event.getHeader().getEventType();
System.out.println("eventType---->"+ MySqlEventTypeIdToString.getInstance().get(eventType));
switch (eventType) {
case MySQLConstants.FORMAT_DESCRIPTION_EVENT: {
logger.trace("FORMAT_DESCRIPTION_EVENT");
break;
}
case MySQLConstants.TABLE_MAP_EVENT:
//每次ROW_EVENT前都伴随一个TABLE_MAP_EVENT事件,保存一些表信息,如tableId, tableName, databaseName, 而ROW_EVENT只有tableId
{
TableMapEvent tme = (TableMapEvent)event;
TableInfoKeeper.saveTableIdMap(tme);
logger.trace("TABLE_MAP_EVENT:tableId:{}", tme.getTableId());
break;
}
case MySQLConstants.DELETE_ROWS_EVENT: {
DeleteRowsEvent dre = (DeleteRowsEvent)event;
long tableId = dre.getTableId();
logger.trace("DELETE_ROW_EVENT:tableId:{}", tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Row> rows = dre.getRows();
for (Row row : rows) {
List<Column> before = row.getColumns();
Map<String, String> beforeMap = getMap(before, databaseName, tableName);
if (beforeMap != null && beforeMap.size() > 0) {
LogEvent cdcEvent = new LogEvent(dre, databaseName, tableName);
cdcEvent.setDdl(false);
cdcEvent.setSql(null);
cdcEvent.setBefore(beforeMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}", cdcEvent);
}
}
break;
}
case MySQLConstants.UPDATE_ROWS_EVENT:
{
UpdateRowsEvent upe = (UpdateRowsEvent)event;
long tableId = upe.getTableId();
logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Pair<Row>> rows = upe.getRows();
for(Pair<Row> p:rows){
List<Column> colsBefore = p.getBefore().getColumns();
List<Column> colsAfter = p.getAfter().getColumns();
Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);
Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);
if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){
LogEvent cdcEvent = new LogEvent(upe,databaseName,tableName);
cdcEvent.setDdl(false);
cdcEvent.setSql(null);
cdcEvent.setBefore(beforeMap);
cdcEvent.setAfter(afterMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
}
}
break;
}
case MySQLConstants.UPDATE_ROWS_EVENT_V2:
{
UpdateRowsEventV2 upe = (UpdateRowsEventV2)event;
long tableId = upe.getTableId();
logger.info("UPDATE_ROWS_EVENT_V2:tableId:{}",tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Pair<Row>> rows = upe.getRows();
for(Pair<Row> p:rows){
List<Column> colsBefore = p.getBefore().getColumns();
List<Column> colsAfter = p.getAfter().getColumns();
Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);
Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);
if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){
LogEvent cdcEvent = new LogEvent(upe,databaseName,tableName);
cdcEvent.setDdl(false);
cdcEvent.setSql(null);
cdcEvent.setBefore(beforeMap);
cdcEvent.setAfter(afterMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
}
}
break;
}
case MySQLConstants.WRITE_ROWS_EVENT :
{
WriteRowsEventV2 wre = (WriteRowsEventV2)event;
long tableId = wre.getTableId();
logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Row> rows = wre.getRows();
for(Row row: rows){
List<Column> after = row.getColumns();
Map<String,String> afterMap = getMap(after,databaseName,tableName);
if(afterMap!=null && afterMap.size()>0){
LogEvent cdcEvent = new LogEvent(wre,databaseName,tableName);
cdcEvent.setDdl(false);
cdcEvent.setSql(null);
cdcEvent.setAfter(afterMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
}
}
break;
}
case MySQLConstants.WRITE_ROWS_EVENT_V2:
{
WriteRowsEventV2 wre = (WriteRowsEventV2)event;
long tableId = wre.getTableId();
logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Row> rows = wre.getRows();
for(Row row: rows){
List<Column> after = row.getColumns();
Map<String,String> afterMap = getMap(after,databaseName,tableName);
if(afterMap!=null && afterMap.size()>0){
LogEvent cdcEvent = new LogEvent(wre,databaseName,tableName);
cdcEvent.setDdl(false);
cdcEvent.setSql(null);
cdcEvent.setAfter(afterMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
}
}
break;
}
case MySQLConstants.QUERY_EVENT:
{
QueryEvent qe = (QueryEvent)event;
TableInfo tableInfo = createTableInfo(qe);
if(tableInfo == null){
break;
}
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);
LogEvent cdcEvent = new LogEvent(qe,databaseName,tableName);
cdcEvent.setDdl(true);
cdcEvent.setSql(qe.getSql().toString());
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
break;
}
case MySQLConstants.XID_EVENT:{
XidEvent xe = (XidEvent)event;
logger.trace("XID_EVENT: xid:{}",xe.getXid());
break;
}
default:
{
logger.trace("DEFAULT:{}",eventType);
break;
}
}
}
/**
* ROW_EVENT中是没有Column信息的,需要通过MysqlConnection(下面会讲到)的方式读取列名信息,
* 然后跟取回的List<Column>进行映射。
*
* @param cols
* @param databaseName
* @param tableName
* @return
*/
private Map<String,String> getMap(List<Column> cols, String databaseName, String tableName){
Map<String,String> map = new HashMap<>();
if(cols == null || cols.size()==0){
return null;
}
String fullName = databaseName+"."+tableName;
List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
if(columnInfoList == null)
return null;
if(columnInfoList.size() != cols.size()){
TableInfoKeeper.refreshColumnsMap();
if(columnInfoList.size() != cols.size())
{
logger.warn("columnInfoList.size is not equal to cols.");
return null;
}
}
for(int i=0;i<columnInfoList.size(); i++){
if(cols.get(i).getValue()==null)
map.put(columnInfoList.get(i).getName(),"");
else
map.put(columnInfoList.get(i).getName(), cols.get(i).toString());
}
return map;
}
/**
* 从sql中提取Table信息,因为QUERY_EVENT是对应DATABASE这一级别的,不像ROW_EVENT是对应TABLE这一级别的,
* 所以需要通过从sql中提取TABLE信息,封装到TableInfo对象中
*
* @param qe
* @return
*/
private TableInfo createTableInfo(QueryEvent qe){
String sql = qe.getSql().toString().toLowerCase();
if("begin".equals(sql)){
return null;
}
TableInfo ti = new TableInfo();
String databaseName = qe.getDatabaseName().toString();
String tableName = null;
if(checkFlag(sql,"table")){
tableName = getTableName(sql,"table");
} else if(checkFlag(sql,"truncate")){
tableName = getTableName(sql,"truncate");
} else{
logger.warn("can not find table name from sql:{}",sql);
return null;
}
ti.setDatabaseName(databaseName);
ti.setTableName(tableName);
ti.setFullName(databaseName+"."+tableName);
return ti;
}
private boolean checkFlag(String sql, String flag){
String[] ss = sql.split(" ");
for(String s:ss){
if(s.equals(flag)){
return true;
}
}
return false;
}
private String getTableName(String sql, String flag){
String[] ss = sql.split("\\.");
String tName = null;
if (ss.length > 1) {
String[] strs = ss[1].split(" ");
tName = strs[0];
} else {
String[] strs = sql.split(" ");
boolean start = false;
for (String s : strs) {
if (s.indexOf(flag) >= 0) {
start = true;
continue;
}
if (start && !s.isEmpty()) {
tName = s;
break;
}
}
}
tName.replaceAll("`", "").replaceAll(";", "");
//del "("[create table person(....]
int index = tName.indexOf('(');
if(index>0){
tName = tName.substring(0, index);
}
return tName;
}
}
TableInfo:
public class TableInfo {
private String databaseName;
private String tableName;
private String fullName;
// 省略Getter和Setter
public String getDatabaseName() {
return databaseName;
}
public void setDatabaseName(String databaseName) {
this.databaseName = databaseName;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
TableInfo tableInfo = (TableInfo)o;
return Objects.equals(databaseName, tableInfo.databaseName) &&
Objects.equals(tableName, tableInfo.tableName) &&
Objects.equals(fullName, tableInfo.fullName);
}
@Override
public int hashCode() {
return Objects.hash(databaseName, tableName, fullName);
}
}
TableInfoKeeper:
public class TableInfoKeeper {
private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);
private static Map<Long, TableInfo> tabledIdMap = new ConcurrentHashMap<>();
private static Map<String, List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();
static {
columnsMap = MysqlConnection.getColumns();
}
public static void saveTableIdMap(TableMapEvent tme) {
long tableId = tme.getTableId();
tabledIdMap.remove(tableId);
TableInfo table = new TableInfo();
table.setDatabaseName(tme.getDatabaseName().toString());
table.setTableName(tme.getTableName().toString());
table.setFullName(tme.getDatabaseName() + "." + tme.getTableName());
tabledIdMap.put(tableId, table);
}
public static synchronized void refreshColumnsMap() {
Map<String, List<ColumnInfo>> map = MysqlConnection.getColumns();
if (map.size() > 0) {
// logger.warn("refresh and clear cols.");
columnsMap = map;
// logger.warn("refresh and switch cols:{}",map);
} else {
logger.error("refresh columnsMap error.");
}
}
public static TableInfo getTableInfo(long tableId) {
return tabledIdMap.get(tableId);
}
public static List<ColumnInfo> getColumns(String fullName) {
return columnsMap.get(fullName);
}
}
Posted on 十二月 18, 2018
MySQL主从配置-Docker
MySQL单机启动mysql
docker run --name mysqlserver -e MYSQL_ROOT_PASSWORD=123456 -d -i -p 3306:3306 mysql:5.7
进入终端:
docker exec -it 2a7a85124400 /bin/bash mysql -h 127.0.0.1 -u root -p
宿主机目录结构:
主从配置:
Master和Slaver 配置文件
Master: my.cnf
[mysqld] server_id = 1 log-bin= mysql-bin read-only=0 binlog-do-db=order_demo replicate-ignore-db=mysql replicate-ignore-db=sys replicate-ignore-db=information_schema replicate-ignore-db=performance_schema character-set-server=utf8 !includedir /etc/mysql/conf.d/ !includedir /etc/mysql/mysql.conf.d/
Slaver: my.cnf
[mysqld] server_id = 2 log-bin= mysql-bin read-only=1 replicate-do-db=order_demo replicate-ignore-db=mysql replicate-ignore-db=sys replicate-ignore-db=information_schema replicate-ignore-db=performance_schema character-set-server=utf8 !includedir /etc/mysql/conf.d/ !includedir /etc/mysql/mysql.conf.d/
说明: log-bin :需要启用二进制日志 server_id : 用于标识不同的数据库服务器,而且唯一
binlog-do-db : 需要记录到二进制日志的数据库 binlog-ignore-db : 忽略记录二进制日志的数据库 auto-increment-offset :该服务器自增列的初始值 auto-increment-increment :该服务器自增列增量
replicate-do-db :指定复制的数据库 replicate-ignore-db :不复制的数据库 relay_log :从库的中继日志,主库日志写到中继日志,中继日志再重做到从库 log-slave-updates :该从库是否写入二进制日志,如果需要成为多主则可启用。只读可以不需要
如果为多主的话注意设置 auto-increment-offset 和 auto-increment-increment 如上面为双主的设置: 服务器 152 自增列显示为:1,3,5,7,……(offset=1,increment=2) 服务器 153 自增列显示为:2,4,6,8,……(offset=2,increment=2)
1)read_only=1只读模式,不会影响slave同步复制的功能,所以在MySQL slave库中设定了read_only=1后,通过 show slave status\G ,命令查看salve状态,可以看到salve仍然会读取master上的日志,并且在slave库中应用日志,保证主从数据库同步一致;
2)read_only=1只读模式,可以限定普通用户进行数据修改的操作,但不会限定具有super权限的用户的数据修改操作;在MySQL中设置read_only=1后,普通的应用用户进行insert、update、delete等会产生数据变化的DML操作时,都会报出数据库处于只读模式不能发生数据变化的错误,但具有super权限的用户,例如在本地或远程通过root用户登录到数据库,还是可以进行数据变化的DML操作;
2、启动创建主从容器
//创建并启动主从容器;
//master
docker run –name mastermysql -d -p 3307:3306 -e MYSQL_ROOT_PASSWORD=123456 -v /opt/docker/mysql/master/data:/var/lib/mysql -v /opt/docker/mysql/master/conf/my.cnf:/etc/mysql/my.cnf mysql:5.7
//slave
docker run –name slavermysql -d -p 3308:3306 -e MYSQL_ROOT_PASSWORD=123456 -v /opt/docker/mysql/slaver/data:/var/lib/mysql -v /opt/docker/mysql/slaver/conf/my.cnf:/etc/mysql/my.cnf mysql:5.7
这里为了方便查看数据,把Docker的端口都与本机进行了映射,对应的本地master/data文件夹和slaver/data文件夹下也能看到同步的数据库文件
Master和Slaver设置;
//进入master容器
docker exec -it mastermysql bash
//启动mysql命令,刚在创建窗口时我们把密码设置为:anech
mysql -u root -p
//创建一个用户来同步数据
GRANT REPLICATION SLAVE ON *.* to 'backup'@'%' identified by '123456';
//这里表示创建一个slaver同步账号backup,允许访问的IP地址为%,%表示通配符
//例如:192.168.0.%表示192.168.0.0-192.168.0.255的slaver都可以用backup用户登陆到master上
//查看状态,记住File、Position的值,在Slaver中将用到
show master status;
slaver容器:
//进入slaver容器
docker exec -it slavermysql bash
//启动mysql命令,刚在创建窗口时我们把密码设置为:anech
mysql -u root -p
//设置主库链接
change master to master_host='172.17.0.2',master_user='backup',master_password='123456',master_log_file='mysql-bin.000001',master_log_pos=0,master_port=3306;
//启动从库同步
start slave;
//查看状态
show slave status\G;
表示配置成功;
说明:
master_host:主库地址
master_user:主库创建的同步账号
master_password:主库创建的同步密码
master_log_file:主库产生的日志
master_log_pos:主库日志记录偏移量
master_port:主库使用的端口,默认为3306
测试主从是否成功,是否同步!
在master创建数据内容,看slave 是否同步过去,
create database order_demo;
use order_demo;
create table userinfo(username varchar(50),age int);
insert into userinfo values('Tom',18);
select * from userinfo;
java 读写分离操作:
https://www.cnblogs.com/fengwenzhee/p/7193218.html?utm_source=itdadao&utm_medium=referral
https://www.cnblogs.com/xiaoit/p/4599914.html
Posted on 十二月 3, 2018
log4j2配置文件log4j2.xml详解
配置实例:
<!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL --> <!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出--> <!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数--> <configuration status="WARN" monitorInterval="30"> <!—配置参数 --> <Properties> <Property name="log_dir">logs</Property> <Property name="PATTERN_LAYOUT">%d{yyyy-MM-dd HH:mm:ss} %-5level %class{36} %L %M - %msg%xEx%n</Property> </Properties> <Appenders> <!-- Console日志: 线上删除console, 把此段日志配置删除即可--> <Console name="STDOUT"> <!--输出日志的格式--> <PatternLayout pattern="${PATTERN_LAYOUT}"/> </Console> <!--文件会打印出所有信息,这个log每次运行程序会自动清空,由append属性决定,这个也挺有用的,适合临时测试用--> <File name="log" fileName="log/test.log" append="false"> <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/> </File> <!-- INFO 级别日志 --> <!-- 这个会打印出所有的info及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档--> <RollingFile name="INFO" fileName="${log_dir}/info/info.log" filePattern="${log_dir}/info/info-%d{yyyyMMdd}-%i.log.gz"> <PatternLayout pattern="${PATTERN_LAYOUT}"/> <Filters> <!--如果是error级别拒绝--> <ThresholdFilter level="error" onMatch="DENY" onMismatch="NEUTRAL"/> <ThresholdFilter level="warn" onMatch="DENY" onMismatch="NEUTRAL"/> <!--如果是 debug\info 输出--> </Filters> <Policies> <TimeBasedTriggeringPolicy/> <!--单个文件大小--> <SizeBasedTriggeringPolicy size="500MB"/> </Policies> <!--保存日志文件个数--> <DefaultRolloverStrategy max="10"/> </RollingFile> <!--error级别日志输出--> <RollingFile name="ERROR" fileName="${log_dir}/info/error.log" filePattern="${log_dir}/info/error-%d{yyyyMMdd}-%i.log.gz"> <PatternLayout pattern="${PATTERN_LAYOUT}"/> <Filters> <!--如果是 error 输出--> <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/> </Filters> <Policies> <TimeBasedTriggeringPolicy/> <SizeBasedTriggeringPolicy size="500MB"/> </Policies> <DefaultRolloverStrategy max="10"/> </RollingFile> </Appenders> <Loggers> <!-- Console日志: 线上删除console, 把此段日志配置删除即可--> <Root level="debug"> <AppenderRef ref="STDOUT"/> </Root> <AsyncLogger name="jws.event.rec" level="info" additivity="false"> <AppenderRef ref="eventRecRolling"/> </AsyncLogger> </Loggers> </Configuration>
(1).根节点Configuration有两个属性:status和monitorinterval,有两个子节点:Appenders和Loggers(表明可以定义多个Appender和Logger).
status 属性,这个属性表示log4j2本身的日志信息打印级别。如果把status改为TRACE再执行测试代码,可以看到控制台中打印了一些log4j加载插件、组装logger等调试信息。
monitorinterval用于指定log4j自动重新配置的监测间隔时间,单位是s,最小是5s.
日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出。对于Loggers中level的定义同样适用。
上面配置了两种日志打印的方式,打印的等级是info。
(2).Appenders节点,常见的有三种子节点:Console、RollingFile、File.
Console节点用来定义输出到控制台的Appender.
name:指定Appender的名字.
target:SYSTEM_OUT 或 SYSTEM_ERR,一般只设置默认:SYSTEM_OUT.
PatternLayout:输出格式,不设置默认为:%m%n.
File节点用来定义输出到指定位置的文件的Appender.
name:指定Appender的名字.
fileName:指定输出日志的目的文件带全路径的文件名.
PatternLayout:输出格式,不设置默认为:%m%n.
RollingFile节点用来定义超过指定大小自动删除旧的创建新的的Appender.
name:指定Appender的名字.
fileName:指定输出日志的目的文件带全路径的文件名.
PatternLayout:输出格式,不设置默认为:%m%n.
filePattern:指定新建日志文件的名称格式.
Policies:指定滚动日志的策略,就是什么时候进行新建日志文件输出日志.
TimeBasedTriggeringPolicy:Policies子节点,基于时间的滚动策略,interval属性用来指定多久滚动一次,默认是1 hour。modulate=true用来调整时间:比如现在是早上3am,interval是4,那么第一次滚动是在4am,接着是8am,12am…而不是7am.
SizeBasedTriggeringPolicy:Policies子节点,基于指定文件大小的滚动策略,size属性用来定义每个日志文件的大小.
DefaultRolloverStrategy:用来指定同一个文件夹下最多有几个日志文件时开始删除最旧的,创建新的(通过max属性)。
(3).Loggers节点,常见的有两种:Root和Logger.
Root节点用来指定项目的根日志,如果没有单独指定Logger,那么就会默认使用该Root日志输出
level:日志输出级别,共有8个级别,按照从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF.
AppenderRef:Root的子节点,用来指定该日志输出到哪个Appender.
Logger节点用来单独指定日志的形式,比如要为指定包下的class指定不同的日志级别等。
level:日志输出级别,共有8个级别,按照从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF.
name:用来指定该Logger所适用的类或者类所在的包全路径,继承自Root节点.
AppenderRef:Logger的子节点,用来指定该日志输出到哪个Appender,如果没有指定,就会默认继承自Root.如果指定了,那么会在指定的这个Appender和Root的Appender中都会输出,此时我们可以设置Logger的additivity="false"只在自定义的Appender中进行输出。
(4).关于日志level.
共有8个级别,按照从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF.
All:最低等级的,用于打开所有日志记录.
Trace:是追踪,就是程序推进以下,你就可以写个trace输出,所以trace应该会特别多,不过没关系,我们可以设置最低日志级别不让他输出.
Debug:指出细粒度信息事件对调试应用程序是非常有帮助的.
Info:消息在粗粒度级别上突出强调应用程序的运行过程.
Warn:输出警告及warn以下级别的日志.
Error:输出错误信息日志.
Fatal:输出每个严重的错误事件将会导致应用程序的退出的日志.
OFF:最高等级的,用于关闭所有日志记录.
程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。
Updated on 十一月 23, 2018
java 日志框架详解-干货
一、 日志的重要性
对于我们开发人员来说,日志记录往往不被重视。在生产环境中,日志是查找问题来源的重要依据。日志可记录程序运行时产生的错误信息、状态信息、调试信息和执行时间信息等多种多样的信息。可以在程序运行出现错误时,快速地定位潜在的问题源。目前常用的日志系统有java.util.logging、commons logging、slf4j、log4j1.x、logback、log4j2.x 等若干种。
二、 Java常用日志框架历史
1996年早期,欧洲安全电子市场项目组决定编写它自己的程序跟踪API(Tracing API)。经过不断的完善,这个API终于成为一个十分受欢迎的Java日志软件包,即log4j。后来log4j成为Apache基金会项目中的一员。
期间log4j近乎成了Java社区的日志标准。据说Apache基金会还曾经建议Sun引入log4j到java的标准库中,但Sun拒绝了。
2002年Java1.4发布,Sun推出了自己的日志库jul(java util logging),其实现基本模仿了log4j的实现。在JUL出来以前,log4j就已经成为一项成熟的技术,使得log4j在选择上占据了一定的优势。
接着,Apache推出了jakarta commons logging,jcl只是定义了一套日志接口(其内部也提供一个simple log的简单实现),支持运行时动态加载日志组件的实现,也就是说,在你的应用代码里,只需调用commons logging的接口,底层实现可以是log4j,也可以是java util logging。
后来(2006年),Ceki Gülcü不适应Apache的工作方式,离开了Apache。然后先后创建了slf4j(日志门面接口,类似于commons logging)和logback(slf4j的实现)两个项目,并回瑞典创建了QOS公司,QOS官网上是这样描述logback的:The Generic,Reliable Fast&Flexible Logging Framework(一个通用,可靠,快速且灵活的日志框架)。
现今,Java日志领域被划分为两大阵营:commons logging阵营和slf4j阵营。
commons logging在Apache大树的笼罩下,有很大的用户基数。但有证据表明,形式正在发生变化。2013年底有人分析了GitHub上30000个项目,统计出了最流行的100个Libraries,可以看出slf4j的发展趋势更好。如下图1所示。
图1
Apache眼看有被logback反超的势头,于2012年7月重写了log4j 1.x,成立了新的项目log4j2.x。log4j2在各个方面都与logback非常相似。
Java的logger世界
Commons logging
Apache的commons项目,一个很薄的logging抽象层,制定了使用log的相关接口和规范,可以由不同的logging implementations,[http://commons.apache.org/proper/commons-logging/]
SLF4J
Simple Logging Facade for Java, 也是一个logging抽象层,底层logging框架可以是(e.g. java.util.logging, logback, log4j),是Commons logging的替代物, [http://www.slf4j.org/]
jcl-over-slf4j
提供Commons-logging向slf4j迁移用的bridge, 可参考 [http://www.slf4j.org/legacy.html]
slf4j-log4j
前面说了,SLF4J是个facade,log4j是其实现的一种框架,抽象成接口,具体的绿叶可以是Log4j/Log4j2/LockBack,当使用log4j时,需要此Jar包作为桥接,可参考 [http://www.slf4j.org/legacy.html]
log4j
这个不用多说,大家使用最多的是1.x版本, [http://logging.apache.org/log4j/1.2/], 不过好像2.x也出来了,据说采用了异步机制,性能有很大提升 [http://logging.apache.org/log4j/2.x/]
logback
原生实现了SLF4J API,所以不需要中间的bridge,号称是log4j终结者,下一代logging框架,性能比log4j有很大提升,推荐使用,况且现在我们已经在使用SLF4j,所以切换过去应该是很方便的事。[http://logback.qos.ch/]
总的来说:slf4j与commons-logging只是一个日志门面,实际还是要依赖真正的日志库log4j,虽然slf4j和commons-loggins自带了日志库,但是毕竟log4j才是最强大的。
至于Logback是由log4j创始人设计的另一个开源日志组件,是用来取代log4j,
取代的理由自行百度;
https://blog.csdn.net/zbajie001/article/details/79596109
jul日志
这个是SUN公司自带的日志输出框架,本来Log4j有建议过加入SUN JDK框架,但是SUN不要人家,后台就出了这个框架,但是比较XX,所以很少人使用;
@Test public void test() throws IOException { Logger logger = Logger.getLogger(""); logger.info("Hola JDK!"); }
这就是jul日志。
commons-logging日志
common logging本身不是log,你可以把它看做是一个日志的接口,而log4j就是日志的实现,它自身只是实现了简单的日志实现类.
使用很简单:
commons-logging的使用非常简单。首先,需要在pom.xml文件中添加依赖:
<dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> </dependency>
声明测试代码:
public class commons_loggingDemo { Log log= LogFactory.getLog(commons_loggingDemo.class); @Test public void test() throws IOException { log.debug("Debug info."); log.info("Info info"); log.warn("Warn info"); log.error("Error info"); log.fatal("Fatal info"); } }
接下来,在classpath下定义配置文件:commons-logging.properties:
#指定日志对象: org.apache.commons.logging.Log = org.apache.commons.logging.impl.Jdk14Logger #指定日志工厂: org.apache.commons.logging.LogFactory = org.apache.commons.logging.impl.LogFactoryImpl
如果只单纯的依赖了commons-logging,那么默认使用的日志对象就是Jdk14Logger,默认使用的日志工厂就是LogFactoryImpl
commons-logging + Log4j使用:
去掉commons-logging.properties 配置文件:
因为commons-logging
1) 首先在classpath下寻找自己的配置文件commons-logging.properties,如果找到,则使用其中定义的Log实现类;
2) 如果找不到commons-logging.properties文件,则在查找是否已定义系统环境变量org.apache.commons.logging.Log,找到则使用其定义的Log实现类;
3) 否则,查看classpath中是否有Log4j的包,如果发现,则自动使用Log4j作为日志实现类;
4) 否则,使用JDK自身的日志实现类(JDK1.4以后才有日志实现类);
5) 否则,使用commons-logging自己提供的一个简单的日志实现类SimpleLog;
需要在pom.xml文件中添加依赖:
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
接下来,在classpath下定义配置文件:log4j.properties
# Logger root # \u6ce8\u610f\uff1a\u7ebf\u4e0a\u7cfb\u7edf\uff0c\u9700\u628aconsole\u5220\u9664 log4j.rootLogger=INFO ,console log4j.logger.org.springframework=info,console # \u6253\u5370\u5230Console\u7684\u65e5\u5fd7\uff0c\u6ce8\u610f\uff1a\u7ebf\u4e0a\u7cfb\u7edf\u9700\u8981\u5c06\u8be5\u6bb5\u65e5\u5fd7\u914d\u7f6e\u5220\u9664 #### First appender writes to console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%-4p,%t,%d{MM-dd HH:mm:ss.SSS},%c{2}.%M:%L - %m%n
Log4j日志框架
log4j是Apache的一个开放源代码的项目,通过使用log4j,我们可以控制日志信息输送的目的地, 日志的输出格式, 日志信息的级别,可以通过一个配置文件来灵活地进行配置,而不需要修改应用的代码
如果在我们系统中单独使用log4j的话,我们只需要引入log4j的核心包就可以了
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
public class Log4jTest { private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Log4jTest.class); public static void main(String[] args) { logger.info("hello word"); } }
在系统的src目录下添加依赖的配置文件:
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
Log4j 的代码结构:
Log4j启动过程:
1.找到对应的配置文件,虽然log4j支持多种配置文件,在平时使用中多数使用log4j.xml格式,该文件位置应该放在classpath下面,在log4j启动时候会去clasPath下面找寻log4j.xml。
2.解析xml,根据配置生成对应名字的logger以及绑定该logger应用的appender。
log4j日志输出关键类图
logger组件:logger的父类category完成基本所有的log功能,其中实现的接口appenderAttachable用于存储与该logger绑定的appender。logger主要用于管理日志level,确定是否需要打出日志,即调用appender。其中每个category中都会有自己父亲的引用,当additivity参数为true的时候(默认为true),则会在自己appender输出信息后,调用父category去输出日志。
appender组件:appender是具体message输出的组件,管理信息输出的位置和格式。
Log4j获取特定名字logger:
org.apache.log4j.Logger log4j = or.apache.log4h.Logger.getLogger(Name.class)
logger内部的存储是由Hierarchy来完成,在启动过程中会初始化所有log4j.xml中定义的logger。如果logger中不存在该名字的logger则会新生成一个新的logger,由于该名字的logger不存在配置文件中,所以会根据名字规则寻找其父亲logger,如果找不到则会以rootLogger为父亲。所以如果没有配置该类名对应的logger则会调用其父亲logger来输出日志。
Log4j 的详细配置:
#Access log log4j.appender.A=org.apache.log4j.DailyRollingFileAppender log4j.additivity.A = false log4j.appender.A.File=${catalina.base}/logs/access.log log4j.appender.A.layout=org.apache.log4j.PatternLayout log4j.appender.A.layout.ConversionPattern=%-4p,%t,%d{MM-dd HH:mm:ss.SSS},%c{2}.%M:%L – %m%n log4j.appender.A.DatePattern='_' yyyy-MM-dd log4j.appender.A.append=true log4j.appender.A.ImmediateFlush=true log4j.appender.A.Threshold = INFO |
https://blog.csdn.net/earthchinagl/article/details/70256527
高级配置:
https://www.cnblogs.com/dengjiahai/p/4608946.html
https://www.cnblogs.com/leefreeman/p/3610459.html
使用坑:
在高并发时候会有性能问题:
http://zl378837964.iteye.com/blog/2373591
解决的方式:
1.规避:
1)排查代码是否写入了大量日志,删除非必要日志
2)简化日志序列,尽量不出现日志嵌套(日志B打印调用了日志D)
2.解决:
1)使用log4j异步写AsyncAppender
2)升级到log4j 2 –> 建议
3)使用logback替换log4j –> 建议
4)补丁:使用可重入锁替换synchronized
slf4j日志
一个新的日志框架横空出世了:slf4j
, 这个框架由log4j的作者开发并且后台出了一个性能更加好的框架logback;
哪有人会问?为什么还需要slf4j了???
https://www.oschina.net/translate/why-use-sl4j-over-log4j-for-logging
slf4j是一个日志统一的框架,主要是为了接入不同日志系统建立的统一包装层的框架。其他具体实现日志的系统只需要实现slf4j的一些特定规则则可以接入slf4j使用。
上图中可以看出,slf4j对外提供了统一的api,这由slf4j-api.jar包提供。另外如果需要接入log4j,则需要在api和具体框架中加入一个适配层,实现slf4j和log4j接口的适配,这个由包slf4j-log4j12实现。
有几个包需要区别一下:
log4j-slf4j-impl 是 slf4j 和 logj4 2 的 Binding,而 slf4j-log4j12 是 slf4j 和 log4j 1.2 的 Binding,jcl-over-slf4j 是common-logggin 适配 slf4j。
log4j-over-slf4j 是把Log4j适配到slf4j,比如有些系统使用logback+slef4j,有一个第三方jar依赖于log4j打印日志,就需要这个包了
log4j-slf4j-impl 是用于log4j2与slf4j 的桥接用的;
slf4j + Log4j
使用过程:
private org.slf4j.Logger log= LoggerFactory.getLogger(SlfloggingDemo.class);
@Test
public void test() throws IOException {
log.debug("Debug info.");
log.info("Info info");
log.warn("Warn info");
log.error("Error info");
}
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
slf4j
底层实现方式:
1.slf4j的loggerfactory在getlogger()的过程中会检查slf4j-LoggerFactory是否初始化没有,其中performInitialization调用会去适配具体的日志框架。(在slf4j-api层)
2.slf4j-LoggerFactory未初始化,则会调用当前classloader去寻找“org/slf4j/impl/StaticLoggerBinder.class”类,并创建该类。所以需要适配slf4j的日志框架都需要实现该类。(在slf4j-api层)
3. 调用对应的staticLoggerBinder会初始化log4j。并生成新的log4j12-log4jloggerFactory。该类在返回logger的时候会将log4j返回的logger在装饰一层即log4jLoggerAdapter,用于适配slf4j的logger接口。(在slf4j-log4j12)
注意:当有多个日志框架的时候,在找寻“org/slf4j/impl/StaticLoggerBinder.class”会出现多个,这时候加载具体那个StaticLoggerBinder.class则会由jvm来决定,从而加载了对应的日志框架。如果不想出现这个问题,则应该保证classPath下只有一个该类。
有些系统你残留了一些另外的日志框架比如apache commons-logging ( 简称 jcl)的接口, 则当引入这个二方包的时候,由于原本自己并不支持jcl接口,或者想将jcl接口最后输入的日志系统为log4j。则需要引入jcl的桥接工具 jcl-over-slf4j,该包功能代码很少,核心为将jcl的接口调用适配成slf4j接口的调用。这样即可让jcl接口的二方库和自己共用一个日志框架。
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.13</version>
<scope>runtime</scope>
</dependency>
总结:在使用日志系统中,通常在工作的系统上,主要即为log4j作为具体的日志系统的实现,然后将slf4j作为日志系统的抽象层,这样使得应用和具体的日志系统解耦。
LogBack日志
LogBack和Log4j都是开源日记工具库,LogBack是Log4j的改良版本,比Log4j拥有更多的特性,同时也带来很大性能提升。LogBack官方建议配合Slf4j使用,这样可以灵活地替换底层日志框架。 为了优化log4j,以及更大性能的提升,Apache基金会已经着手开发了log4j 2.0
LogBack被分为3个组件,logback-core, logback-classic 和 logback-access。
logback-core:提供了LogBack的核心功能,是另外两个组件的基础。
logback-classic:实现了Slf4j的API,所以当想配合Slf4j使用时,需要将logback-classic加入classpath。
logback-access:是为了集成Servlet环境而准备的,可提供HTTP-access的日志接口。
需要在pom.xml文件中添加依赖:
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.logback-extensions</groupId>
<artifactId>logback-ext-spring</artifactId>
<version>0.1.2</version>
</dependency>
接下来,在classpath下定义配置文件:logback.xml
<?xml version="1.0" encoding="UTF-8"?>
|
https://blog.csdn.net/zzzgd_666/article/details/80458444
logback 使用的坑:
1.packagingData="false" 当此属性设置为true时,logback可以包含它输出的堆栈跟踪行的每一行的打包数据,很影响性能,建议线上不能开启
2.配置中有个<discardingThreshold>0</discardingThreshold>
这个很表示不丢日志,并且底层是通过
//class : ch.qos.logback.core.AsyncAppenderBase 的 append方法 BlockingQueue<E> blockingQueue; @Override protected void append(E eventObject) { if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) { return; } preprocess(eventObject); put(eventObject); } private void put(E eventObject) { try { blockingQueue.put(eventObject); } catch (InterruptedException e) { } }
从代码中可以看出来虽然是异步的,但是把日志塞进队列中用的是put方法.是会block的,直到队列空出位置来,所以在配置上可以把队列配置大一掉;建议还是升级采用异步方式
采用log4j2 所以就出现 log4j2 这个异步框架了。
Log4j 与 logback的性能对比:
https://my.oschina.net/OutOfMemory/blog/789267
Log4j2.0日志
Log4j2.0基于LMAX Disruptor的异步日志在多线程环境下性能会远远优于Log4j 1.x和logback(官方数据是10倍以上)。我想日后logback 也会优化成异步模式的,具体看官方公告。。。。
https://www.jianshu.com/p/570b406bddcd
具体搭建:
需要在pom.xml文件中添加依赖:
<!--log4j-2模式-->
|
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="ERROR"> <Properties> <Property name="baseDir">D:\eclipse-workspace\logs</Property> <Property name="filename">D:\eclipse-workspace\logs/info.log</Property> <Property name="filenameError">D:\eclipse-workspace\logs/error.log</Property> </Properties> <Appenders> <Console name="STDOUT"> <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %l - %msg%n"/> </Console> <RollingFile name="RollingFile" fileName="${filename}" filePattern="${baseDir}/${date:yyyy-MM}/info-%d{yyyy-MM-dd-HH-mm}.log.gz"> <PatternLayout pattern="%d %-5level [%t]%l - %msg%n"/> <Policies> <SizeBasedTriggeringPolicy size="200 MB"/> <TimeBasedTriggeringPolicy interval="10" modulate="true"/> </Policies> <ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="ACCEPT"/> <!--自动删除超过120天的日志压缩文件--> <DefaultRolloverStrategy> <Delete basePath="${baseDir}" maxDepth="2"> <IfFileName glob="*/info-*.log.gz"/> <IfLastModified age="20d"/> </Delete> </DefaultRolloverStrategy> </RollingFile> <!--错误日志入文件--> <RollingFile name="RollingFileError" fileName="${filenameError}" filePattern="${baseDir}/${date:yyyy-MM}/error-%d{yyyy-MM-dd-HH}.log"> <PatternLayout pattern="%d %-5level [%t]%l - %msg%n"/> <Policies> <SizeBasedTriggeringPolicy size="200 MB"/> <TimeBasedTriggeringPolicy interval="24" modulate="true"/> </Policies> <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/> <!--自动删除超过120天的日志压缩文件--> <DefaultRolloverStrategy> <Delete basePath="${baseDir}" maxDepth="2"> <IfFileName glob="*/error-*.log"/> <IfLastModified age="30d"/> </Delete> </DefaultRolloverStrategy> </RollingFile> </Appenders> <Loggers> <!--采用异步输出日志--> <AsyncLogger name="com.fulihui.sharding.jdbc" level="debug" additivity="false"> <!--写入info级别--> <AppenderRef ref="RollingFile" /> <!--写入error级别--> <AppenderRef ref="RollingFileError" level="error"/> <AppenderRef ref="STDOUT"/> </AsyncLogger> <!--采用异步输出日志--> <AsyncRoot level="debug"> <AppenderRef ref="STDOUT"/> </AsyncRoot> </Loggers> </Configuration>
配置详情:
https://blog.csdn.net/u013269532/article/details/53186526
https://blog.csdn.net/scherrer/article/details/73744392
logback log4j log4j2 性能实测
https://blog.souche.com/logback-log4j-log4j2shi-ce/
Updated on 十月 8, 2018
快捷键 Eclipse VS Idea
分类 |
功能点 |
Eclipse快捷键 |
IDEA快捷键 |
搜索 |
搜索文本 |
Ctrl + F |
Ctrl + F Ctrl + R 查找替换 Alt + P/A 逐个/全部替换 Alt + F3 查找当前选中词 |
继续搜索 |
Ctrl + K 向前 Ctrl + Shift + K 向后 |
F3 Shift + F3 |
|
搜索方法 |
Ctrl + O |
Ctrl + F12 |
|
显示类的所有防范 | Alt + 7 | ||
搜索类 |
Ctrl + Shift + T |
Ctrl + N |
|
搜索所有文件 | Ctrl + Shift + R | ||
搜索文件 |
Ctrl + Shift + T |
Ctrl + Shift + N 这两个都支持简单的正则表达式,还支持直接按大写字母的缩略,例如: 查找JsonTranscoder,只需要输入JT |
|
搜索所有引用处 |
Ctrl + Alt + H |
Alt + F7 |
|
搜索所有文本出现的位置 |
Ctrl + H |
Ctrl + Shift + F |
|
编辑 |
自动代码补全 |
Alt + / |
Ctrl + J |
自动代码生成 |
Alt + Insert |
||
快速修复错误 |
Ctrl + 1 |
Alt + Enter |
|
删除当前行 |
Ctrl + D |
Ctrl + X |
|
复制到下一行 |
Ctrl + D |
||
注释/取消注释 |
Ctrl + / |
Ctrl + / |
|
选中当前字 |
Ctrl + W
|
||
补全当前行 |
Ctrl + Shift + Enter 神器,补全当前行,最常用的场景时补全当前行后的;号,并将光标定位到下一行 |
||
调出最近复制的N份内容 |
Ctrl + Shift + V |
||
查看最近编辑的文件 |
Ctrl + E |
||
对比最近修改 |
Alt + Shift + C |
||
格式化代码 |
Ctrl + Shift + F |
Ctrl + Alt + L |
|
整理import |
Ctrl + Shift + O |
Ctrl + Alt + O |
|
跳转 |
显示方法层次 |
Ctrl + Shift + H |
|
显示类、方法说明 |
F2 |
Ctrl + Q |
|
跳到方法定义处 |
Ctrl + B |
||
跳到方法实现处 |
Ctrl + Alt + B |
||
跳到上/下一方法 |
Alt + Up/Down |
||
上/下一查看处 |
Alt + <- Alt + -> |
Ctrl + Alt + Up/Down |
|
跳到指定行 |
Ctrl + L |
Ctrl + G |
|
重构 |
改名 |
Alt + Shift + R |
Shift + F6 |
其他常用 |
Ctrl + F6 修改方法签名 Ctrl + Shift + F6 修改参数的类型 Ctrl + Shift + V引入局部变量 Ctrl + Shift + P 引入参数 Ctrl + Shift + F 引入类变量 Ctrl + Shift + M 引入方法 Ctrl + Shift + C 引入常量 |
||
运行 |
启动调试 |
Alt + Shift + F9 |
|
启动运行 |
Alt + Shift + F10 |
||
单步进入 |
F5 |
F7 |
|
单步跳过 |
F6 |
F8 |
|
跳过 |
F8 |
F9 |
|
执行选中语句 |
Alt + F8 |
||
窗口 |
调出界面 |
Ctrl + Alt + S调出Settings界面 Ctrl + Alt + Shift + S调出项目Setting界面 |
|
关闭界面 |
Ctrl + F4 或 ESC |
||
打开窗口 |
Alt + 窗口编号(例如项目窗口编号是1) |
||
最大化窗口 |
Ctrl + M |
Ctrl + Shift + F12 |
|
隐藏窗口 |
Shift + ESC |
||
关闭当前文件 |
Ctrl + F4 |
||
垂直分屏 |
Ctrl + | (自定义的) |
||
调整窗口位置 |
Ctrl + M 将当前光标处显示到屏幕中央 |
||
切换窗口 |
Ctrl + Tab |
Posted on 九月 17, 2018
使用Nginx+Lua实现Web项目的灰度发布
需求:
领导对时间要求紧迫、研发对现有系统摸不透、做到数据的兼容性,基于这样的要求就必须做到系统上线采用灰度的方式,指定忠实用户进行线上测试、选取有特征的群体进行线上测试和基于流量切换的方式进行线上测试等。
常规的部署做法:
常规的部署方式是采用Nginx的 upstream 配置来简单的实现新旧机器的切换, 在开发过程中,开发完成,完成测试阶段,修复bug后都要重启后台服务,测试又在测试,每次重启都要一两分钟,平凡的重启,测试不干了;所以想到就是部署两台服务器;用nginx upstream 模块实现 无感知部署,发现一个bug,修复;直接部署不会打断测试;常用的是一台部署完毕之后部署另外一台机器;
灰度发布概述:
灰度发布,简单来说,就是根据各种条件,让一部分用户使用旧版本,另一部分用户使用新版本。
灰度发布是指在黑与白之间,能够平滑过渡的一种发布方式。AB test就是一种灰度发布方式,让一部分用户继续用A,一部分用户开始用B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面 来。灰度发布可以保证整体系统的稳定,在初始灰度的时候就可以发现、调整问题,以保证其影响度。
灰度部署还可以根据设定的规则将请求路由到我们的灰度版本(灰度机器)上来。比如对于API来说,一般有如下几个需求:特定用户(比如测试帐号)、 特定的App(比如测试app或者合作App)、特定的模块、接口(只有某些接口需要灰度,这种一般是API Container的修改,拿一些不是很重要的API做灰度测试)、特定的机器(某些请求IP转发到灰度机)等。
本章只是简单的简述灰度部署的实现思路:
这里我们所做的灰度发布稍作改变:用1-2台机器作为B,B测试成功再部署A。用于WEB系统新代码的测试发布,让一部分(IP)用户访问新版本,一部分用户仍然访问正常版本,原理如图:
执行过程:
1、当用户请求到达前端web(代理)服务器Openresty,内嵌的lua模块解析Nginx配置文件中的lua脚本代码;
2、Lua获取客户端IP地址,去查询Redis中是否有该键值,如果有返回值执行@clien2,否则执行@client1。
3、Location @client2把请求转发给预发布服务器,location @client1把请求转发给生产服务器,服务器返回结果,整个过程完成。
Lua 的好处并不至于这个,可以使用LUA语言是实现一些业务上的负载,比如热点分离; 热点数据的自动降级机制;
Lua教程: https://www.runoob.com/lua/lua-tutorial.html
案例实现:
1. 安装部署OpenResty:
OpenResty由Nginx核心加很多第三方模块组成,默认集成了Lua开发环境,使得Nginx可以作为一个Web Server使用。
借助于Nginx的事件驱动模型和非阻塞IO,可以实现高性能的Web应用程序。
而且OpenResty提供了大量组件如Mysql、Redis、Memcached等等,使在Nginx上开发Web应用更方便更简单。
1、部署第一个nginx,作为应用层nginx(192.168.1.104那个机器上)
1、 创建目录/usr/servers,以后我们把所有软件安装在此目录
mkdir -p /usr/servers
cd /usr/servers/
2、 安装依赖(我的环境是centos,可以使用如下命令安装,其他的可以参考openresty安装步骤)
yum install -y readline-devel pcre-devel openssl-devel gcc
3、 下载ngx_openresty-xxx.tar.gz并解压(ngx_openresty-xxx/bundle目录里存nginx核心和很多第三方模块,比如有我们需要的Lua和LuaJIT。)
wget https://openresty.org/download/ngx_openresty-1.9.7.1.tar.gz
tar xvf ngx_openresty-1.9.7.1.tar.gz
cd ngx_openresty-1.9.7.1
2. 安装LuaJIT
cd bundle/LuaJIT-2.1-20151219/
make clean && make && make install
ln -sf luajit-2.1.0-alpha /usr/local/bin/luajit
下载ngx_cache_purge模块,该模块用于清理nginx缓存
root@user:/usr/servers/ngx_openresty-1.9.7.1/bundle# wget https://github.com/FRiCKLE/ngx_cache_purge/archive/2.3.tar.gz
root@user:/usr/servers/ngx_openresty-1.9.7.1/bundle# tar -xvf 2.3.tar.gz
下载nginx_upstream_check_module模块,该模块用于ustream健康检查
root@user:/usr/servers/ngx_openresty-1.9.7.1/bundle#
wget https://github.com/yaoweibin/nginx_upstream_check_module/archive/v0.3.0.tar.gz
root@user:/usr/servers/ngx_openresty-1.9.7.1/bundle# tar -xvf v0.3.0.tar.gz
安装ngx_openresty
root@user:/usr/servers/ngx_openresty-1.9.7.1/bundle# cd ..
root@user:/usr/servers/ngx_openresty-1.9.7.1# ./configure –prefix=/usr/servers –with-http_realip_module –with-pcre –with-luajit –add-module=./bundle/ngx_cache_purge-2.3/ –add-module=./bundle/nginx_upstream_check_module-0.3.0/ -j2
root@user:/usr/servers/ngx_openresty-1.9.7.1# make && make install
参数说明:
–with*** 安装一些内置/集成的模块
–with-http_realip_module 取用户真实ip模块
-with-pcre Perl兼容的达式模块
–with-luajit 集成luajit模块
–add-module 添加自定义的第三方模块,如此次的ngx_che_purge
到/usr/servers目录下用ll命令查看,会发现多出来了如下目录,说明安装成功
root@user:/usr/servers/ngx_openresty-1.9.7.1# cd /usr/servers/
root@user:/usr/servers# ll
说明:
/usr/servers/luajit :luajit环境,luajit类似于java的jit,即即时编译,lua是一种解释语言,通过luajit可以即时编译lua代码到机器代码,得到很好的性能;
/usr/servers/lualib:要使用的lua库,里边提供了一些默认的lua库,如redis,json库等,也可以把一些自己开发的或第三方的放在这;
/usr/servers/nginx :安装的nginx,通过/usr/servers/nginx/sbin/nginx -V 查看nginx版本和安装的模块
启动nginx
root@user:/usr/servers# /usr/servers/nginx/sbin/nginx
检测配置是否正确(需要先切换到root用户):
root@user:/usr/servers# /usr/servers/nginx/sbin/nginx -t
重启nginx:
root@user:/usr/servers# /usr/servers/nginx/sbin/nginx -s reload
LUA环境测试:
1、 为了方便开发我们在/usr/servers/nginx/conf目录下创建一个lua.conf
root@user:/home/user# cd /usr/servers/nginx/conf
root@user:/usr/servers/nginx/conf# vim lua.conf
server {
listen 80;
server_name _;
#HelloWorld
location /lua {
default_type 'text/html';
content_by_lua 'ngx.say("hello world")';
}
}
编辑nginx.conf配置文件
vim /usr/servers/nginx/conf/nginx.conf
在http部分添加如下配置
lua_package_path "/usr/servers/lualib/?.lua;;"; #lua 模块
lua_package_cpath "/usr/servers/lualib/?.so;;"; #c模块
include lua.conf; #单独lua配置
重启nginx
/usr/servers/nginx/sbin/nginx -s reload
访问如http://192.168.1.104/lua(自己的机器根据实际情况换ip),可以看到如下内容
hello world
说明配置成功。
灰度部署测试:
采用redis 方式;比如 192.168.0.101这个IP采用的是访问服务器的项目资源,其他IP是访问旧项目的资源;进行测试完毕并且完成之后,可以切换正式环境;
默认情况下lua_code_cache 是开启的,即缓存lua代码,即每次lua代码变更必须reload nginx才生效,如果在开发阶段可以通过lua_code_cache off;关闭缓存,这样调试时每次修改lua代码不需要reload nginx;但是正式环境一定记得开启缓存
开启后reload nginx会看到如下报警
nginx: [alert] lua_code_cache is off; this will hurt performance ******;
配置文件:
Nginx.conf:
#user nobody; worker_processes 1; #error_log logs/error.log; #error_log logs/error.log notice; #error_log logs/error.log info; #pid logs/nginx.pid; events { worker_connections 1024; } http { include mime.types; default_type application/octet-stream; lua_package_path "/usr/servers/lualib/?.lua;;"; #lua 模块 lua_package_cpath "/usr/servers/lualib/?.so;;"; #c模块 log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; access_log logs/access.log main; sendfile on; #tcp_nopush on; #keepalive_timeout 0; keepalive_timeout 65; #gzip on; proxy_buffering off; proxy_set_header Host $host; proxy_set_header X-real-ip $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; upstream productServer { server 127.0.0.1:8080 weight=1 max_fails=3 fail_timeout=100s; #模拟生产服务器 } upstream preServer { server 127.0.0.1:8081 weight=1 max_fails=3 fail_timeout=100s; #模拟预发布服务器 } server { listen 80; server_name localhost; #charset koi8-r; #access_log logs/host.access.log main; location / { default_type 'text/html'; lua_code_cache off; content_by_lua_file /usr/servers/nginx/conf/huidu.lua; } #error_page 404 /404.html; # redirect server error pages to the static page /50x.html # error_page 500 502 503 504 /50x.html; location = /50x.html { root html; } location @productServer{ proxy_pass http://productServer; } location @preServer{ proxy_pass http://preServer; } } }
huidu.lua:
local redis = require "resty.redis" local cache = redis.new() cache:set_timeout(60000) local ok, err = cache.connect(cache, '127.0.0.1', 6379) if not ok then ngx.say("failed to connect:", err) return end --local red, err = cache:auth("foobared") --if not red then --ngx.say("failed to authenticate: ", err) --return --end local local_ip = ngx.req.get_headers()["X-Real-IP"] if local_ip == nil then local_ip = ngx.req.get_headers()["x_forwarded_for"] end if local_ip == nil then local_ip = ngx.var.remote_addr end --ngx.say("local_ip is : ", local_ip) local intercept = cache:get(local_ip) if intercept == local_ip then ngx.exec("@preServer") return end ngx.exec("@productServer") local ok, err = cache:close() if not ok then ngx.say("failed to close:", err) return end
上面的代码是简单的IP相等,可以采用IP段形式;
上面的例子测试:
额外参考资料: