Jak wyodrębnić adres IP, który występuje 10 razy w ciągu jednej sekundy?Python: jak analizować i sprawdzać czas?
W następującym przypadku:
241.7118.197.10
28.252.8
Jak wyodrębnić adres IP, który występuje 10 razy w ciągu jednej sekundy?Python: jak analizować i sprawdzać czas?
W następującym przypadku:
241.7118.197.10
28.252.8
Można by zbierać dane do dict
gdy IP jest kluczem, a wartość zawiera znaczniki czasu dla danego adresu IP. Wtedy za każdym razem, gdy dodawany jest znacznik czasu można sprawdzić, czy dany IP ma trzy znaczniki czasu w ciągu sekundy:
from datetime import datetime, timedelta
from collections import defaultdict, deque
import re
THRESHOLD = timedelta(seconds=1)
COUNT = 3
res = set()
d = defaultdict(deque)
with open('test.txt') as f:
for line in f:
# Capture IP and timestamp
m = re.match(r'(\S*)[^\[]*\[(\S*)', line)
ip, dt = m.groups()
# Parse timestamp
dt = datetime.strptime(dt, '%d/%b/%Y:%H:%M:%S:%f')
# Remove timestamps from deque if they are older than threshold
que = d[ip]
while que and (dt - que[0]) > THRESHOLD:
que.popleft()
# Add timestamp, update result if there's 3 or more items
que.append(dt)
if len(que) >= COUNT:
res.add(ip)
print(res)
Wynik:
{'28.252.89.140'}
Przede odczytuje plik dziennika zawierający wiersz po wierszu dziennika. Dla każdej linii używane jest wyrażenie regularne do przechwytywania danych w dwóch grupach: IP i znacznik czasu. Następnie strptime
służy do analizy czasu.
Pierwsza grupa (\S*)
rejestruje wszystko oprócz spacji. Następnie [^\[]*
przechwytuje wszystko oprócz [
i \[
przechwytuje ostatni znak przed datą. Ostatecznie (\S*)
jest ponownie używany do przechwytywania wszystkiego do następnych białych znaków. Zobacz example on regex101.
Po uzyskaniu adresu IP i czasu są one dodawane do defaultdict
, gdzie kluczem jest IP, a wartością jest deque
znaczników czasowych. Przed dodaniem nowego znacznika czasu stare są usuwane, jeśli są starsze niż THRESHOLD
. Zakłada to, że linie dziennika są już posortowane według czasu. Po dodaniu długość jest sprawdzana i jeśli istnieje COUNT
lub więcej pozycji w IP kolejki jest dodawana do zestawu wyników.
Pierwszym krokiem byłoby do analizowania danych, można to zrobić z tym:
data = [(ip, datetime.strptime(time, '%d/%b/%Y:%H:%M:%S:%f')) for (ip, time) in re.findall("((?:[0-9]{1,3}\.){3}[0-9]{1,3}).+?\[(.+?) -", text)]
gdzie tekst wejściowy to text
.
Spowoduje to wyświetlenie listy z krotką dla każdego wpisu. Pierwszym elementem krotki będzie adres IP, a po nim data.
Następnym krokiem jest sprawdzenie, które z nich stało w 1 przedziale sek i mają ten sam IP:
print set([a[0] for a in data for b in data for c in data if (datetime.timedelta(seconds=0)<a[1]-b[1]<datetime.timedelta(seconds=1)) and (datetime.timedelta(seconds=0)<a[1]-c[1]<datetime.timedelta(seconds=1)) and (datetime.timedelta(seconds=0)<b[1]-c[1]<datetime.timedelta(seconds=1))])
wyjściowy:
set(['28.252.89.140'])
, więc czytasz cały plik? – Maria
Który plik? Podałeś tylko ten tekst wejściowy. Jeśli ten tekst znajduje się w pliku, powinieneś go najpierw przeczytać. –
Używając tych dwóch linii potrzebujesz dwóch modułów: 're' i' datetime' –
, więc każdy 'dic [ip_addr]' zawiera kolejkę? – Maria
Tak, 'deque' znaczników czasu, z których najstarsze elementy mogą być usuwane za każdym razem, gdy dodawany jest nowy znacznik czasu. – niemmi
czy można wyjaśnić "r" (^ [^ \ s] *) [^ \ [] * \ [([^ \ s \]] *): (\ d +) '' – Maria