....................................../////.===Shadow-Here===./////................................................ > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < ------------------------------------------------------------------------------------------------------------------- /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// RIFF¤ WEBPVP8 ˜ ðÑ *ôô>‘HŸK¥¤"§£±¨àð enü¹%½_F‘åè¿2ºQú³íªú`N¿­3ÿƒügµJžaÿ¯ÿ°~¼ÎùnúîÞÖô•òíôÁÉß®Sm¥Ü/ ‡ó˜f£Ùà<˜„xëJ¢Ù€SO3x<ªÔ©4¿+ç¶A`q@Ì“Úñè™ÍÿJÌ´ª-˜ÆtÊÛL]Ïq*‘Ý”ì#ŸÌÏãY]@ê`¿ /ªfkØB4·®£ó z—Üw¥Pxù–ÞLШKÇN¾AkÙTf½è'‰g gÆv›Øuh~ a˜Z— ïj*á¥t d£“uÒ ¨`K˜¹ßþ]b>˜]_ÏÔ6W—è2r4x•íÖ…"ƒÖNîä!¦å Ú}ýxGøÌ —@ ;ÆÚŠ=ɾ1ý8lªË¥ô ^yf®Œ¢u&2©nÙÇ›ñÂñŒ³ aPo['½»øFùà­+4ê“$!lövlüÞ=;N®3ð‚õ›DÉKòÞ>ÄÍ ¥ˆuߤ#ˆ$6ù™¥îЇy’ÍB¼ çxÛ;X"WL£R÷͝*ó-¶Zu}º.s¸sšXqù–DþÿvªhüïwyŸ ¯é³lÀ:KCûÄ£Ëá\…­ ~—ýóî ¼ûûÜTÓüÇy…ŽÆvc»¾×U ñ¸žþоP÷¦ó:Ò¨¨5;Ð#&#ÖúñläÿÁœ GxÉ­/ñ‡áQðìYÉtÒw޼GÔ´zàÒò ð*ëzƒ•4~H]Ø‹f ñÓÈñ`NåWçs'ÆÏW^ø¹!XžµmQ5ÃËoLœÎ: ÞËÍ¥J ù…î èo£ßPÎñ¶ž8.Œ]ʵ~5›ÙË-ù*8ÙÖß±~ ©¹rÓê‚j¶d¸{^Q'˜±Crß ÚH—#¥¥QlÀ×ëã‡DÜ«èî þ&Çæžî;ŽÏºò6ÒLÃXy&ZŒ'j‚¢Ù€IßÚù+–MGi‰*jE€‘JcÜ ÓÌ EÏÚj]o˜ Þr <¾U ûŪæÍ/šÝH¥˜b”¼ ÁñßX GP›ï2›4WŠÏà×£…íÓk†¦H·ÅíMh–*nó÷à]ÁjCº€b7<ب‹¨5車bp2:Á[UªM„QŒçiNMa#<5›áËó¸HýÊ"…×Éw¹¦ì2º–x<›»a±¸3Weü®FÝ⑱ö–î–³|LPÈ~çð~Çå‡|º kD¢µÏàÆAI %1À% ¹Ò – ”ϝS¦‰4&¶£°à Öý”û_Ò Áw°A«Å€?mÇÛgHÉ/8)á¾ÛìáöŽP í¨PŸNÙµº¦‡§Ùš"ÿ«>+ªÕ`Ê÷‡‚ß Õû˜þãÇ-PÍ.¾XV‘€ dÜ"þ4¹ ±Oú‘©t¥¦FªÄÃÄ•b‚znýu½—#cDs˜ÃiÑOˆñ×QO=*IAÊ,¶ŽZƒ;‡wøXè%EÐk:F±Ú” .Ѽ+Áu&Ç`."pÈÉw o&¿dE6‘’EqTuK@Ì¥ã™À(Êk(h‰,H}RÀIXÛš3µ1©_OqÚÒJAñ$ÊÙÜ;D3çŒ[þùœh¬Ã³™ö6ç†NY".Ú‰ï[ªŸŒ '²Ð öø_¨ÂÉ9ué¶³ÒŠõTàîMØ#û¯gN‡bÙ놚X„ö …ÉeüÌ^J ‹€.œ$Æ)βÄeæW#óüßĺŸ€ ÀzwV 9oä»f4V*uB «Ë†¹ì¯žR霓æHXa=&“I4K;¯ç‹h×·"UŠ~<•╪Vêª&ÍSÃÆÅ?ÔqÎ*mTM ˜›µwêd#[C¡©§‘D<©àb†–ÁœøvH/,í:¯( ²£|4-„Æövv„Yͼ™^Á$ˆ„¢Û[6yB.åH*V¨æ?$=˜Ñ€•ñ·­(VlŸ‘ nÀt8W÷´Bûba?q9ú¶Xƒl«ÿ\ù¶’þòUÐj/õ¢Ìµ³g$ƒÎR!¸»|Oߍë’BhîÚÑ¢ñåŒJ„®„£2Ð3•ô02Nt…!£Í]Ïc½Qÿ?ˆ<&ÃA¾Ú,JˆijÌ#5yz„‰Î|ÊŽ5QÏ:‹ÐaóVÔxW—CpeÏzÐïíçôÿÅ_[hãsÐ_/ŽTÝ?BîˆííV$<¿i>²F¬_Eß¿ †bÊŒº­ÿ®Z H“C}”¬,Mp ý/Bá£w>˜YV°aƒúh+cŠ- r/[%|üUMHäQ°X»|û/@|°¥Ð !BÔ Ç¢Ä©š+Õì D«7ìN¶ŽðÔ " ƶ’ÖçtA‰Û×}{tþz­¾GÍ›k¹OEJR$ Â׃ «ëÁ"oÉôž$oUK(Ä)Ãz³Ê-‹êN[Ò3Œñbï8P 4ƒ×q¢bo|?<ÛX¬òÄͰL–±›(™ûG?ýË©ÚÄ–ÂDØÐ_Ç¡ô ¾–ÄÏø ×e8Ë©$ÄF¹Å‹ì[©óìl:F¾f´‹‹Xì²ï®\¬ôùƒ ÿat¥óèÒùHß0äe‚;ü×h:ÆWðHž=Ã8骣"kœ'Y?³}Tûè€>?0l›e1Lòñ„aæKÆw…hÖŠùW…ÈÆÄ0ši·›[pcwËþñiêíY/~-Á5˜!¿†A›™Mÿþ(±“t@â“ö2­´TG5yé]çå僳 .·ÍïçÝ7UÚ±Ð/Nè»,_Ï ùdj7\ï Wì4›„»c¸àešg#ÒÊ⥭áØo5‘?ÌdÝô¯ ¹kzsƒ=´#ëÉK›Ø´±-¥eW?‡çßtòTã…$Ý+qÿ±ƒ÷_3Ô¥í÷:æ–ž<·Ö‡‰Å¢ š‡%Ô—utÌÈìðžgÖÀz²À—ï÷Óîäõ{K'´È÷³yaÏÁjƒô}ž§®æÊydÕÈë5¯èˆõvÕ©ã*çD„ “z„Ó‡^^xÂ3M§A´JG‚öï 3W'ˆ.OvXè¡ÊÕª?5º7†˜(˜Ç¶#çê’¶!ÌdZK§æ 0fãaN]òY³RV ™î$®K2R¨`W!1Ôó\;Ý ýB%qæK•&ÓÈe9È0êI±žeŸß -ú@žQr¦ ö4»M¼Áè¹µmw 9 EÆE_°2ó„ŸXKWÁ×Hóì^´²GѝF©óäR†¦‰ç"V»eØ<3ùd3ÿÚ¤Žú“Gi" —‘_ÙËÎ~Üö¯¥½Î»üŸEÚŽåmÞþí ;ÞólËΦMzA"Âf(´òá;Éï(/7½ûñÌ­cïÕçлþÝz¾-ÍvÑ“pH­–ðÓj$¸Äû¤‚‘ãUBË-n“2åPkS5&‹Â|+g^œ®Ì͆d!OïäîU«c;{Û!ÅŽ«ëZ9Ókóˆ]¯ƒ›né `ÇÒ+tÆš (ØKá¾—=3œ®•vuMñg²\ï Ec€ 05±d™‡×iÇ×›UúvÌ¢£Èþ¡ÕØô¶ßÎA"ß±#Ö²ˆÊŸ¦*Ä~ij|àø.-¼'»Ú¥£h ofº¦‡VsR=N½„Î v˜Z*SÌ{=jÑB‹tê…;’HžH¯8–îDù8ñ¢|Q•bÛçš–‹m³“ê¨ åÏ^m¬Žãþ©ïêO‡½6] µÆ„Ooòü ²x}N¦Ë3ïé¿»€›HA˜m%çÞ/¿í7Fø“‹léUk)É°Œµ8Q8›:ÀŠeT*šõ~ôڝG6 ¢}`ùH­–”¡k ‰P1>š†®9z11!X wKfmÁ¦xÑ,N1Q”–æB¶M…ÒÃv6SMˆhU¬ÊPŽï‘öj=·CŒ¯u¹ƒVIЃsx4’ömÛýcå¡¶7ßŠß 57^\wÒÐÆ k§h,Œý î«q^R½3]J¸ÇðN ‚çU¬ôº^Áì} ³f©Õœ§ˆã:FÄÈ‚é(€™?àýÓüè1Gô£¼éj‚OÅñ  #>×—ßtà 0G¥Åa뀐kßhc™À_ÉñÞ#±)GD" YîäË-ÿÙ̪ ¹™a¯´¢E\ÝÒö‚;™„ë]_ p8‰o¡ñ+^÷ 3‘'dT4œŽ ðVë½° :¬víÑ«£tßÚS-3¶“þ2 †üüʨòrš¹M{É_¤`Û¨0ìjœøJ‡:÷ÃáZ˜†@GP&œÑDGÏs¡þ¦þDGú‘1Yá9Ôþ¼ ûø…§÷8&–ÜÑnÄ_m®^üÆ`;ÉVÁJ£?â€-ßê}suÍ2sõA NÌúA磸‘îÿÚ»ƒìö·á¿±tÑÐ"Tÿü˜[@/äj¬€uüªìù¥Ý˜á8Ý´sõj 8@rˆð äþZÇD®ÿUÏ2ùôõrBzÆÏÞž>Ì™xœ“ wiÎ×7_… ¸ \#€MɁV¶¥üÕÿPÔ9Z‡ø§É8#H:ƒ5ÀÝå9ÍIŒ5åKÙŠ÷qÄ>1AÈøžj"µÂд/ªnÀ qªã}"iŸBå˜ÓÛŽ¦…&ݧ;G@—³b¯“•"´4í¨ôM¨åñC‹ïùÉó¯ÓsSH2Ý@ßáM‡ˆKÀªÛUeø/4\gnm¥‹ŸŒ qÄ b9ÞwÒNÏ_4Ég³ú=܆‚´ •â¥õeíþkjz>éÚyU«Íӝ݃6"8/ø{=Ô¢»G¥ äUw°W«,ô—¿ãㆅү¢³xŠUû™yŒ (øSópÐ 9\åTâ»—*oG$/×ÍT†Y¿1¤Þ¢_‡ ¼ „±ÍçèSaÓ 3ÛMÁBkxs‰’R/¡¤ˆÙçª(*õ„üXÌ´ƒ E§´¬EF"Ù”R/ÐNyÆÂ^°?™6¡œïJ·±$§?º>ÖüœcNÌù¯G ‹ñ2ЁBB„^·úìaz¨k:#¨Æ¨8LÎõލ£^§S&cŒÐU€ü(‡F±Š¼&P>8ÙÁ ‰ p5?0ÊÆƒZl¸aô š¼¡}gÿ¶zÆC²¹¬ÎÖG*HB¡O<º2#ñŒAƒ–¡B˜´É$¥›É:FÀÔx¾u?XÜÏÓvN©RS{2ʈãk9rmP¼Qq̳ è¼ÐFׄ^¡Öì fE“F4A…!ì/…¦Lƒ… … $%´¾yã@CI¬ á—3PþBÏNÿ<ý°4Ü ËÃ#ØÍ~âW«rEñw‹eùMMHß²`¬Öó½íf³:‹k˜¯÷}Z!ã¿<¥,\#öµÀ¯aÒNÆIé,Ћ–lŽ#Àæ9ÀÒS·I’½-Ïp Äz¤Š Â* ­íÄ9­< h>׍3ZkËU¹§˜ŒŠ±f­’¤º³Q ÏB?‹#µíÃ¥®@(Gs«†vI¥Mµ‹Á©e~2ú³ÁP4ìÕi‚²Ê^ö@-DþÓàlÜOÍ]n"µã:žpsŽ¢:! Aõ.ç~ÓBûH÷JCÌ]õVƒd «ú´QÙEA–¯¯Œ!.ˆˆëQ±ù œ·Ì!Õâ )ùL„ÅÀlÚè5@B…o´Æ¸XÓ&Û…O«˜”_#‡ƒ„ûÈt!¤ÁÏ›ÎÝŠ?c9 â\>lÓÁVÄÑ™£eØY]:fÝ–—ù+p{™ðè û³”g±OƒÚSù£áÁÊ„ä,ï7š²G ÕÌBk)~ÑiCµ|h#u¤¶îK¨² #²vݯGãeÖ϶ú…¾múÀ¶þÔñ‚Š9'^($¤§ò “š½{éúp÷J›ušS¹áªCÂubÃH9™D™/ZöØÁ‡¦ÝÙŸ·kð*_”.C‹{áXó€‡c¡c€§/šò/&éš÷,àéJþ‰X›fµ“C¨œ®r¬"kL‰Â_q…Z–.ÉL~O µ›zn‚¹À¦Öª7\àHµšÖ %»ÇníV[¥*Õ;ƒ#½¾HK-ÖIÊdÏEÚ#=o÷Óò³´Š: Ç?{¾+9›–‘OEáU·S€˜j"ÄaÜ ŒÛWt› á–c#a»pÔZÞdŽtWê=9éöÊ¢µ~ ë ;Öe‡Œ®:bî3±ýê¢wà¼îpêñ¹¾4 zc¾ðÖÿzdêŒÑÒŝÀ‰s6¤í³ÎÙB¿OZ”+F¤á‡3@Ñëäg©·Ž ˆèª<ù@É{&S„œÕúÀA)‰h:YÀ5^ÂÓŒ°õäU\ ùËÍû#²?Xe¬tu‰^zÒÔãë¼ÛWtEtû …‚g¶Úüâî*moGè¨7%u!]PhÏd™Ý%Îx: VÒ¦ôÊD3ÀŽKÛËãvÆî…N¯ä>Eró–ð`5 Œ%u5XkñÌ*NU%¶áœÊ:Qÿú»“úzyÏ6å-၇¾ ´ ÒÊ]y žO‘w2Äøæ…H’²f±ÎÇ.ª|¥'gîV•Ü .̘¯€šòü¤U~Ù†*¢!?ò wý,}´°ÔÞnïoKq5µb!áÓ3"vAßH¡³¡·G(ÐÎ0Îò¼MG!/ài®@—¬04*`…«é8ªøøló“ˆÊ”èù¤…ßÊoÿé'ËuÌÖ5×È¡§ˆˆfŽë9}hìâ_!!¯  B&Ëö¶‰ÀAÙNVŸ Wh›¸®XÑJì¨ú“¿÷3uj²˜¨ÍÎìë±aúŠÝå¯ð*Ó¨ôJ“yºØ)m°WýOè68†ŸÏ2—‰Ïüꪫٚ¥‹l1 ø ÏÄFjêµvÌbü¦èÝx:X±¢H=MÐß—,ˆÉÇ´(9ú¾^ÅÚ4¿m‡$âX‘å%(AlZo@½¨UOÌÕ”1ø¸jÎÀÃÃ_ µ‘Ü.œº¦Ut: Æï’!=¯uwû#,“pþÇúŒø(é@?³ü¥‘Mo §—s@Œ#)§ŒùkL}NOÆêA›¸~r½¼ÙA—HJ«eˆÖ´*¡ÓpÌŸö.m<-"³ûÈ$¬_6­åf£ïÚâj1y§ÕJ½@dÞÁr&Í\Z%D£Íñ·AZ Û³øüd/ªAi†/Й~  ‡âĮҮÏh§°b—›Û«mJžòG'[ÈYýŒ¦9psl ýÁ ®±f¦x,‰½tN ‚Xª9 ÙÖH.«Lo0×?͹m¡å†Ѽ+›2ƒF ±Ê8 7Hցϓ²Æ–m9…òŸï]Â1äN†VLâCˆU .ÿ‰Ts +ÅÎx(%¦u]6AF Š ØF鈄‘ |¢¶c±soŒ/t[a¾–û:s·`i햍ê›ËchÈ…8ßÀUÜewŒðNOƒõD%q#éû\9¤x¹&UE×G¥ Í—™$ð E6-‡¼!ýpãÔM˜ Âsìe¯ñµK¢Ç¡ùôléœ4Ö£”À Š®Ðc ^¨À}ÙËŸ§›ºê{ÊuÉC ×Sr€¤’fÉ*j!úÓ’Gsùìoîßîn%ò· àc Wp÷$¨˜)û»H ×8ŽÒ€Zj¤3ÀÙºY'Ql¦py{-6íÔCeiØp‘‡XÊîÆUߢ܂ž£Xé¼Y8þ©ëgñß}é.ÎógÒ„ÃØËø¯»™§Xýy M%@NŠ À(~áÐvu7&•,Ù˜ó€uP‡^^®=_E„jt’ 403WebShell
403Webshell
Server IP : 195.3.193.30  /  Your IP : 216.73.216.125
Web Server : Apache
System : Linux server3 5.10.0-35-amd64 #1 SMP Debian 5.10.237-1 (2025-05-19) x86_64
User : web032 ( 1035)
PHP Version : 7.3.33
Disable Function : show_source, highlight_file, apache_child_terminate, apache_get_modules, apache_note, apache_setenv, virtual, dl, disk_total_space, posix_getpwnam, posix_getpwuid, posix_mkfifo, posix_mknod, posix_setpgid, posix_setsid, posix_setuid, posix_uname, proc_nice, openlog, syslog, pfsockopen
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/fail2ban/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/fail2ban/tests/samplestestcase.py
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :

# This file is part of Fail2Ban.
#
# Fail2Ban is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Fail2Ban is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Fail2Ban developers

__copyright__ = "Copyright (c) 2013 Steven Hiscocks"
__license__ = "GPL"

import datetime
import fileinput
import inspect
import json
import os
import re
import sys
import time
import unittest
from ..server.failregex import Regex
from ..server.filter import Filter, FileContainer
from ..client.filterreader import FilterReader
from .utils import setUpMyTime, tearDownMyTime, TEST_NOW, CONFIG_DIR

# test-time in UTC as string in isoformat (2005-08-14T10:00:00):
TEST_NOW_STR = datetime.datetime.utcfromtimestamp(TEST_NOW).isoformat()

TEST_CONFIG_DIR = os.path.join(os.path.dirname(__file__), "config")
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")

# regexp to test greedy catch-all should be not-greedy:
RE_HOST = Regex._resolveHostTag('<HOST>')
RE_WRONG_GREED = re.compile(r'\.[+\*](?!\?)[^\$\^]*' + re.escape(RE_HOST) + r'.*(?:\.[+\*].*|[^\$])$')


class FilterSamplesRegex(unittest.TestCase):

	def setUp(self):
		"""Call before every test case."""
		super(FilterSamplesRegex, self).setUp()
		self._filters = dict()
		self._filterTests = None
		setUpMyTime()

	def tearDown(self):
		"""Call after every test case."""
		super(FilterSamplesRegex, self).tearDown()
		tearDownMyTime()

	def testFiltersPresent(self):
		"""Check to ensure some tests exist"""
		self.assertTrue(
			len([test for test in inspect.getmembers(self)
				if test[0].startswith('testSampleRegexs')])
			>= 10,
			"Expected more FilterSampleRegexs tests")

	def testReWrongGreedyCatchAll(self):
		"""Tests regexp RE_WRONG_GREED is intact (positive/negative)"""
		self.assertTrue(
			RE_WRONG_GREED.search('greedy .* test' + RE_HOST + ' test not hard-anchored'))
		self.assertTrue(
			RE_WRONG_GREED.search('greedy .+ test' + RE_HOST + ' test vary .* anchored$'))
		self.assertFalse(
			RE_WRONG_GREED.search('greedy .* test' + RE_HOST + ' test no catch-all, hard-anchored$'))
		self.assertFalse(
			RE_WRONG_GREED.search('non-greedy .*? test' + RE_HOST + ' test not hard-anchored'))
		self.assertFalse(
			RE_WRONG_GREED.search('non-greedy .+? test' + RE_HOST + ' test vary catch-all .* anchored$'))


	def _readFilter(self, fltName, name, basedir, opts=None):
		# Check filter with this option combination was already used:
		flt = self._filters.get(fltName)
		if flt:
			return flt
		# First time:
		flt = Filter(None)
		flt.returnRawHost = True
		flt.checkAllRegex = True
		flt.checkFindTime = False
		flt.active = True
		# Read filter:
		if opts is None: opts = dict()
		opts = opts.copy()
		filterConf = FilterReader(name, "jail", opts,
			basedir=basedir, share_config=unittest.F2B.share_config)
		self.assertEqual(filterConf.getFile(), name)
		self.assertEqual(filterConf.getJailName(), "jail")
		filterConf.read()
		filterConf.getOptions({})

		for opt in filterConf.convert():
			if opt[0] == 'multi-set':
				optval = opt[3]
			elif opt[0] == 'set':
				optval = [opt[3]]
			else: # pragma: no cover - unexpected
				self.fail('Unexpected config-token %r in stream' % (opt,))
			for optval in optval:
				if opt[2] == "prefregex":
					flt.prefRegex = optval
				elif opt[2] == "addfailregex":
					flt.addFailRegex(optval)
				elif opt[2] == "addignoreregex":
					flt.addIgnoreRegex(optval)
				elif opt[2] == "maxlines":
					flt.setMaxLines(optval)
				elif opt[2] == "datepattern":
					flt.setDatePattern(optval)

		# test regexp contains greedy catch-all before <HOST>, that is
		# not hard-anchored at end or has not precise sub expression after <HOST>:
		regexList = flt.getFailRegex()
		for fr in regexList:
			if RE_WRONG_GREED.search(fr): # pragma: no cover
				raise AssertionError("Following regexp of \"%s\" contains greedy catch-all before <HOST>, "
					"that is not hard-anchored at end or has not precise sub expression after <HOST>:\n%s" %
					(fltName, str(fr).replace(RE_HOST, '<HOST>')))
		# Cache within used filter combinations and return:
		flt = [flt, set()]
		self._filters[fltName] = flt
		return flt

	@staticmethod
	def _filterOptions(opts):
				return dict((k, v) for k, v in opts.items() if not k.startswith('test.'))
		
def testSampleRegexsFactory(name, basedir):
	def testFilter(self):

		self.assertTrue(
			os.path.isfile(os.path.join(TEST_FILES_DIR, "logs", name)),
			"No sample log file available for '%s' filter" % name)
		
		filenames = [name]
		regexsUsedRe = set()

		# process each test-file (note: array filenames can grow during processing):
		commonOpts = {}
		faildata = {}
		i = 0
		while i < len(filenames):
			filename = filenames[i]; i += 1;
			logFile = fileinput.FileInput(os.path.join(TEST_FILES_DIR, "logs",
				filename), mode='rb')

			ignoreBlock = False
			for line in logFile:
				line = FileContainer.decode_line(logFile.filename(), 'UTF-8', line)
				jsonREMatch = re.match("^#+ ?(failJSON|(?:file|filter)Options|addFILE):(.+)$", line)
				if jsonREMatch:
					try:
						faildata = json.loads(jsonREMatch.group(2))
						# fileOptions - dict in JSON to control common test-file filter options:
						if jsonREMatch.group(1) == 'fileOptions':
							commonOpts = faildata
							continue
						# filterOptions - dict in JSON to control filter options (e. g. mode, etc.):
						if jsonREMatch.group(1) == 'filterOptions':
							# following lines with another filter options:
							self._filterTests = []
							ignoreBlock = False
							for faildata in (faildata if isinstance(faildata, list) else [faildata]):
								if commonOpts: # merge with common file options:
									opts = commonOpts.copy()
									opts.update(faildata)
								else:
									opts = faildata
								# unique filter name (using options combination):
								self.assertTrue(isinstance(opts, dict))
								if opts.get('test.condition'):
									ignoreBlock = not eval(opts.get('test.condition'))
								if not ignoreBlock:
									fltOpts = self._filterOptions(opts)
									fltName = opts.get('test.filter-name')
									if not fltName: fltName = str(fltOpts) if fltOpts else ''
									fltName = name + fltName
									# read it:
									flt = self._readFilter(fltName, name, basedir, opts=fltOpts)
									self._filterTests.append((fltName, flt, opts))
							continue
						# addFILE - filename to "include" test-files should be additionally parsed:
						if jsonREMatch.group(1) == 'addFILE':
							filenames.append(faildata)
							continue
						# failJSON - faildata contains info of the failure to check it.
					except ValueError as e: # pragma: no cover - we've valid json's
						raise ValueError("%s: %s:%i" %
							(e, logFile.filename(), logFile.filelineno()))
					line = next(logFile)
					line = FileContainer.decode_line(logFile.filename(), 'UTF-8', line)
				elif ignoreBlock or line.startswith("#") or not line.strip():
					continue
				else: # pragma: no cover - normally unreachable
					faildata = {}
				if ignoreBlock: continue

				# if filter options was not yet specified:
				if not self._filterTests:
					fltName = name
					flt = self._readFilter(fltName, name, basedir, opts=None)
					self._filterTests = [(fltName, flt, {})]

				line = line.rstrip('\r\n')
				# process line using several filter options (if specified in the test-file):
				for fltName, flt, opts in self._filterTests:
					# Bypass if constraint (as expression) is not valid:
					if faildata.get('constraint') and not eval(faildata['constraint']):
						continue
					flt, regexsUsedIdx = flt
					regexList = flt.getFailRegex()
					failregex = -1
					try:
						fail = {}
						# for logtype "journal" we don't need parse timestamp (simulate real systemd-backend handling):
						if opts.get('logtype') != 'journal':
							ret = flt.processLine(line)
						else: # simulate journal processing, time is known from journal (formatJournalEntry):
							if opts.get('test.prefix-line'): # journal backends creates common prefix-line:
								line = opts.get('test.prefix-line') + line
							ret = flt.processLine(('', TEST_NOW_STR, line), TEST_NOW)
						if ret:
							# filter matched only (in checkAllRegex mode it could return 'nofail' too):
							found = []
							for ret in ret:
								failregex, fid, fail2banTime, fail = ret
								# bypass pending and nofail:
								if fid is None or fail.get('nofail'):
									regexsUsedIdx.add(failregex)
									regexsUsedRe.add(regexList[failregex])
									continue
								found.append(ret)
							ret = found

						if not ret:
							# Check line is flagged as none match
							self.assertFalse(faildata.get('match', False),
								"Line not matched when should have")
							continue

						# Check line is flagged to match
						self.assertTrue(faildata.get('match', False), 
							"Line matched when shouldn't have")
						self.assertEqual(len(ret), 1,
							"Multiple regexs matched %r" % ([x[0] for x in ret]))

						for ret in ret:
							failregex, fid, fail2banTime, fail = ret
							# Verify match captures (at least fid/host) and timestamp as expected
							for k, v in faildata.items():
								if k not in ("time", "match", "desc", "constraint"):
									fv = fail.get(k, None)
									if fv is None:
										# Fallback for backwards compatibility (previously no fid, was host only):
										if k == "host":
											fv = fid
										# special case for attempts counter:
										if k == "attempts":
											fv = len(fail.get('matches', {}))
									# compare sorted (if set)
									if isinstance(fv, (set, list, dict)):
										self.assertSortedEqual(fv, v)
										continue
									self.assertEqual(fv, v)

							t = faildata.get("time", None)
							if t is not None:
								try:
									jsonTimeLocal =	datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S")
								except ValueError:
									jsonTimeLocal =	datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S.%f")
								jsonTime = time.mktime(jsonTimeLocal.timetuple())
								jsonTime += jsonTimeLocal.microsecond / 1000000.0
								self.assertEqual(fail2banTime, jsonTime,
									"UTC Time  mismatch %s (%s) != %s (%s)  (diff %.3f seconds)" % 
									(fail2banTime, time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(fail2banTime)),
									jsonTime, time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(jsonTime)),
									fail2banTime - jsonTime) )

							regexsUsedIdx.add(failregex)
							regexsUsedRe.add(regexList[failregex])
					except AssertionError as e: # pragma: no cover
						import pprint
						raise AssertionError("%s: %s on: %s:%i, line:\n  %s\nregex (%s):\n  %s\n"
							"faildata: %s\nfail: %s" % (
								fltName, e, logFile.filename(), logFile.filelineno(), 
								line, failregex, regexList[failregex] if failregex != -1 else None,
								'\n'.join(pprint.pformat(faildata).splitlines()),
								'\n'.join(pprint.pformat(fail).splitlines())))

		# check missing samples for regex using each filter-options combination:
		for fltName, flt in self._filters.items():
			flt, regexsUsedIdx = flt
			regexList = flt.getFailRegex()
			for failRegexIndex, failRegex in enumerate(regexList):
				self.assertTrue(
					failRegexIndex in regexsUsedIdx or failRegex in regexsUsedRe,
					"%s: Regex has no samples: %i: %r" %
						(fltName, failRegexIndex, failRegex))

	return testFilter

for basedir_, filter_ in (
	(CONFIG_DIR, lambda x: not x.endswith('common.conf') and x.endswith('.conf')),
	(TEST_CONFIG_DIR, lambda x: x.startswith('zzz-') and x.endswith('.conf')),
):
	for filter_ in filter(filter_,
						  os.listdir(os.path.join(basedir_, "filter.d"))):
		filterName = filter_.rpartition(".")[0]
		if not filterName.startswith('.'):
			setattr(
				FilterSamplesRegex,
				"testSampleRegexs%s" % filterName.upper(),
				testSampleRegexsFactory(filterName, basedir_))

Youez - 2016 - github.com/yon3zu
LinuXploit