gh-135852: Remove out of tree pywin32 dependency for NTEventLogHandler (GH-137860)

Add RegisterEventSource(), DeregisterEventSource(), ReportEvent()
and a number of EVENTLOG_* constants to _winapi.
This commit is contained in:
AN Long
2025-12-31 19:50:50 +09:00
committed by GitHub
parent 96ab379dca
commit 3c4429f65a
6 changed files with 358 additions and 41 deletions
+52 -39
View File
@@ -1129,7 +1129,7 @@ class NTEventLogHandler(logging.Handler):
"""
A handler class which sends events to the NT Event Log. Adds a
registry entry for the specified application name. If no dllname is
provided, win32service.pyd (which contains some basic message
provided and pywin32 installed, win32service.pyd (which contains some basic message
placeholders) is used. Note that use of these placeholders will make
your event logs big, as the entire message source is held in the log.
If you want slimmer logs, you have to pass in the name of your own DLL
@@ -1137,38 +1137,46 @@ class NTEventLogHandler(logging.Handler):
"""
def __init__(self, appname, dllname=None, logtype="Application"):
logging.Handler.__init__(self)
try:
import win32evtlogutil, win32evtlog
self.appname = appname
self._welu = win32evtlogutil
if not dllname:
dllname = os.path.split(self._welu.__file__)
import _winapi
self._winapi = _winapi
self.appname = appname
if not dllname:
# backward compatibility
try:
import win32evtlogutil
dllname = os.path.split(win32evtlogutil.__file__)
dllname = os.path.split(dllname[0])
dllname = os.path.join(dllname[0], r'win32service.pyd')
self.dllname = dllname
self.logtype = logtype
# Administrative privileges are required to add a source to the registry.
# This may not be available for a user that just wants to add to an
# existing source - handle this specific case.
try:
self._welu.AddSourceToRegistry(appname, dllname, logtype)
except Exception as e:
# This will probably be a pywintypes.error. Only raise if it's not
# an "access denied" error, else let it pass
if getattr(e, 'winerror', None) != 5: # not access denied
raise
self.deftype = win32evtlog.EVENTLOG_ERROR_TYPE
self.typemap = {
logging.DEBUG : win32evtlog.EVENTLOG_INFORMATION_TYPE,
logging.INFO : win32evtlog.EVENTLOG_INFORMATION_TYPE,
logging.WARNING : win32evtlog.EVENTLOG_WARNING_TYPE,
logging.ERROR : win32evtlog.EVENTLOG_ERROR_TYPE,
logging.CRITICAL: win32evtlog.EVENTLOG_ERROR_TYPE,
}
except ImportError:
print("The Python Win32 extensions for NT (service, event "\
"logging) appear not to be available.")
self._welu = None
except ImportError:
pass
self.dllname = dllname
self.logtype = logtype
# Administrative privileges are required to add a source to the registry.
# This may not be available for a user that just wants to add to an
# existing source - handle this specific case.
try:
self._add_source_to_registry(appname, dllname, logtype)
except PermissionError:
pass
self.deftype = _winapi.EVENTLOG_ERROR_TYPE
self.typemap = {
logging.DEBUG: _winapi.EVENTLOG_INFORMATION_TYPE,
logging.INFO: _winapi.EVENTLOG_INFORMATION_TYPE,
logging.WARNING: _winapi.EVENTLOG_WARNING_TYPE,
logging.ERROR: _winapi.EVENTLOG_ERROR_TYPE,
logging.CRITICAL: _winapi.EVENTLOG_ERROR_TYPE,
}
@staticmethod
def _add_source_to_registry(appname, dllname, logtype):
import winreg
key_path = f"SYSTEM\\CurrentControlSet\\Services\\EventLog\\{logtype}\\{appname}"
with winreg.CreateKey(winreg.HKEY_LOCAL_MACHINE, key_path) as key:
if dllname:
winreg.SetValueEx(key, "EventMessageFile", 0, winreg.REG_EXPAND_SZ, dllname)
winreg.SetValueEx(key, "TypesSupported", 0, winreg.REG_DWORD, 7) # All types are supported
def getMessageID(self, record):
"""
@@ -1209,15 +1217,20 @@ class NTEventLogHandler(logging.Handler):
Determine the message ID, event category and event type. Then
log the message in the NT event log.
"""
if self._welu:
try:
id = self.getMessageID(record)
cat = self.getEventCategory(record)
type = self.getEventType(record)
msg = self.format(record)
# Get a handle to the event log
handle = self._winapi.RegisterEventSource(None, self.appname)
try:
id = self.getMessageID(record)
cat = self.getEventCategory(record)
type = self.getEventType(record)
msg = self.format(record)
self._welu.ReportEvent(self.appname, id, cat, type, [msg])
except Exception:
self.handleError(record)
self._winapi.ReportEvent(handle, type, cat, id, msg)
finally:
self._winapi.DeregisterEventSource(handle)
except Exception:
self.handleError(record)
def close(self):
"""
+12 -1
View File
@@ -7248,8 +7248,8 @@ for when, exp in (('S', 1),
setattr(TimedRotatingFileHandlerTest, name, test_compute_rollover)
@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
class NTEventLogHandlerTest(BaseTest):
@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
def test_basic(self):
logtype = 'Application'
elh = win32evtlog.OpenEventLog(None, logtype)
@@ -7283,6 +7283,17 @@ class NTEventLogHandlerTest(BaseTest):
msg = 'Record not found in event log, went back %d records' % GO_BACK
self.assertTrue(found, msg=msg)
@unittest.skipUnless(sys.platform == "win32", "Windows required for this test")
def test_without_pywin32(self):
h = logging.handlers.NTEventLogHandler('python_test')
self.addCleanup(h.close)
# Verify that the handler uses _winapi module
self.assertIsNotNone(h._winapi, "_winapi module should be available")
r = logging.makeLogRecord({'msg': 'Hello!'})
h.emit(r)
class MiscTestCase(unittest.TestCase):
def test__all__(self):
+36
View File
@@ -1,5 +1,6 @@
# Test the Windows-only _winapi module
import errno
import os
import pathlib
import re
@@ -156,3 +157,38 @@ class WinAPITests(unittest.TestCase):
pipe2.write(b'testdata')
pipe2.flush()
self.assertEqual((b'testdata', 8), _winapi.PeekNamedPipe(pipe, 8)[:2])
def test_event_source_registration(self):
source_name = "PythonTestEventSource"
handle = _winapi.RegisterEventSource(None, source_name)
self.addCleanup(_winapi.DeregisterEventSource, handle)
self.assertNotEqual(handle, _winapi.INVALID_HANDLE_VALUE)
with self.assertRaises(OSError) as cm:
_winapi.RegisterEventSource(None, "")
self.assertEqual(cm.exception.errno, errno.EINVAL)
with self.assertRaises(OSError) as cm:
_winapi.DeregisterEventSource(_winapi.INVALID_HANDLE_VALUE)
self.assertEqual(cm.exception.errno, errno.EBADF)
def test_report_event(self):
source_name = "PythonTestEventSource"
handle = _winapi.RegisterEventSource(None, source_name)
self.assertNotEqual(handle, _winapi.INVALID_HANDLE_VALUE)
self.addCleanup(_winapi.DeregisterEventSource, handle)
_winapi.ReportEvent(handle, _winapi.EVENTLOG_SUCCESS, 1, 1002,
"Test message 1")
with self.assertRaises(TypeError):
_winapi.ReportEvent(handle, _winapi.EVENTLOG_SUCCESS, 1, 1002, 42)
with self.assertRaises(TypeError):
_winapi.ReportEvent(handle, _winapi.EVENTLOG_SUCCESS, 1, 1002, None)
with self.assertRaises(ValueError):
_winapi.ReportEvent(handle, _winapi.EVENTLOG_SUCCESS, 1, 1002,
"Test message \0 with embedded null character")
@@ -0,0 +1,4 @@
Add :func:`!_winapi.RegisterEventSource`,
:func:`!_winapi.DeregisterEventSource` and :func:`!_winapi.ReportEvent`.
Using these functions in :class:`~logging.handlers.NTEventLogHandler`
to replace :mod:`!pywin32`.
+106
View File
@@ -2982,6 +2982,103 @@ _winapi_CopyFile2_impl(PyObject *module, LPCWSTR existing_file_name,
Py_RETURN_NONE;
}
/*[clinic input]
_winapi.RegisterEventSource -> HANDLE
unc_server_name: LPCWSTR(accept={str, NoneType})
The UNC name of the server on which the event source should be registered.
If None, registers the event source on the local computer.
source_name: LPCWSTR
The name of the event source to register.
/
Retrieves a registered handle to the specified event log.
[clinic start generated code]*/
static HANDLE
_winapi_RegisterEventSource_impl(PyObject *module, LPCWSTR unc_server_name,
LPCWSTR source_name)
/*[clinic end generated code: output=e376c8950a89ae8f input=9d01059ac2156d0c]*/
{
HANDLE handle;
Py_BEGIN_ALLOW_THREADS
handle = RegisterEventSourceW(unc_server_name, source_name);
Py_END_ALLOW_THREADS
if (handle == NULL) {
PyErr_SetFromWindowsErr(0);
return INVALID_HANDLE_VALUE;
}
return handle;
}
/*[clinic input]
_winapi.DeregisterEventSource
handle: HANDLE
The handle to the event log to be deregistered.
/
Closes the specified event log.
[clinic start generated code]*/
static PyObject *
_winapi_DeregisterEventSource_impl(PyObject *module, HANDLE handle)
/*[clinic end generated code: output=7387ff34c7358bce input=947593cf67641f16]*/
{
BOOL success;
Py_BEGIN_ALLOW_THREADS
success = DeregisterEventSource(handle);
Py_END_ALLOW_THREADS
if (!success) {
return PyErr_SetFromWindowsErr(0);
}
Py_RETURN_NONE;
}
/*[clinic input]
_winapi.ReportEvent
handle: HANDLE
The handle to the event log.
type: unsigned_short(bitwise=False)
The type of event being reported.
category: unsigned_short(bitwise=False)
The event category.
event_id: unsigned_int(bitwise=False)
The event identifier.
string: LPCWSTR
A string to be inserted into the event message.
/
Writes an entry at the end of the specified event log.
[clinic start generated code]*/
static PyObject *
_winapi_ReportEvent_impl(PyObject *module, HANDLE handle,
unsigned short type, unsigned short category,
unsigned int event_id, LPCWSTR string)
/*[clinic end generated code: output=4281230b70a2470a input=8fb3385b8e7a6d3d]*/
{
BOOL success;
Py_BEGIN_ALLOW_THREADS
success = ReportEventW(handle, type, category, event_id, NULL, 1, 0,
&string, NULL);
Py_END_ALLOW_THREADS
if (!success) {
return PyErr_SetFromWindowsErr(0);
}
Py_RETURN_NONE;
}
static PyMethodDef winapi_functions[] = {
_WINAPI_CLOSEHANDLE_METHODDEF
@@ -2994,6 +3091,7 @@ static PyMethodDef winapi_functions[] = {
_WINAPI_CREATEPIPE_METHODDEF
_WINAPI_CREATEPROCESS_METHODDEF
_WINAPI_CREATEJUNCTION_METHODDEF
_WINAPI_DEREGISTEREVENTSOURCE_METHODDEF
_WINAPI_DUPLICATEHANDLE_METHODDEF
_WINAPI_EXITPROCESS_METHODDEF
_WINAPI_GETCURRENTPROCESS_METHODDEF
@@ -3010,6 +3108,8 @@ static PyMethodDef winapi_functions[] = {
_WINAPI_OPENMUTEXW_METHODDEF
_WINAPI_OPENPROCESS_METHODDEF
_WINAPI_PEEKNAMEDPIPE_METHODDEF
_WINAPI_REGISTEREVENTSOURCE_METHODDEF
_WINAPI_REPORTEVENT_METHODDEF
_WINAPI_LCMAPSTRINGEX_METHODDEF
_WINAPI_READFILE_METHODDEF
_WINAPI_RELEASEMUTEX_METHODDEF
@@ -3082,6 +3182,12 @@ static int winapi_exec(PyObject *m)
WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED);
WINAPI_CONSTANT(F_DWORD, ERROR_PRIVILEGE_NOT_HELD);
WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
WINAPI_CONSTANT(F_DWORD, EVENTLOG_SUCCESS);
WINAPI_CONSTANT(F_DWORD, EVENTLOG_AUDIT_FAILURE);
WINAPI_CONSTANT(F_DWORD, EVENTLOG_AUDIT_SUCCESS);
WINAPI_CONSTANT(F_DWORD, EVENTLOG_ERROR_TYPE);
WINAPI_CONSTANT(F_DWORD, EVENTLOG_INFORMATION_TYPE);
WINAPI_CONSTANT(F_DWORD, EVENTLOG_WARNING_TYPE);
WINAPI_CONSTANT(F_DWORD, FILE_FLAG_FIRST_PIPE_INSTANCE);
WINAPI_CONSTANT(F_DWORD, FILE_FLAG_OVERLAPPED);
WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_READ);
+148 -1
View File
@@ -2184,7 +2184,154 @@ exit:
return return_value;
}
PyDoc_STRVAR(_winapi_RegisterEventSource__doc__,
"RegisterEventSource($module, unc_server_name, source_name, /)\n"
"--\n"
"\n"
"Retrieves a registered handle to the specified event log.\n"
"\n"
" unc_server_name\n"
" The UNC name of the server on which the event source should be registered.\n"
" If None, registers the event source on the local computer.\n"
" source_name\n"
" The name of the event source to register.");
#define _WINAPI_REGISTEREVENTSOURCE_METHODDEF \
{"RegisterEventSource", _PyCFunction_CAST(_winapi_RegisterEventSource), METH_FASTCALL, _winapi_RegisterEventSource__doc__},
static HANDLE
_winapi_RegisterEventSource_impl(PyObject *module, LPCWSTR unc_server_name,
LPCWSTR source_name);
static PyObject *
_winapi_RegisterEventSource(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
{
PyObject *return_value = NULL;
LPCWSTR unc_server_name = NULL;
LPCWSTR source_name = NULL;
HANDLE _return_value;
if (!_PyArg_CheckPositional("RegisterEventSource", nargs, 2, 2)) {
goto exit;
}
if (args[0] == Py_None) {
unc_server_name = NULL;
}
else if (PyUnicode_Check(args[0])) {
unc_server_name = PyUnicode_AsWideCharString(args[0], NULL);
if (unc_server_name == NULL) {
goto exit;
}
}
else {
_PyArg_BadArgument("RegisterEventSource", "argument 1", "str or None", args[0]);
goto exit;
}
if (!PyUnicode_Check(args[1])) {
_PyArg_BadArgument("RegisterEventSource", "argument 2", "str", args[1]);
goto exit;
}
source_name = PyUnicode_AsWideCharString(args[1], NULL);
if (source_name == NULL) {
goto exit;
}
_return_value = _winapi_RegisterEventSource_impl(module, unc_server_name, source_name);
if ((_return_value == INVALID_HANDLE_VALUE) && PyErr_Occurred()) {
goto exit;
}
if (_return_value == NULL) {
Py_RETURN_NONE;
}
return_value = HANDLE_TO_PYNUM(_return_value);
exit:
/* Cleanup for unc_server_name */
PyMem_Free((void *)unc_server_name);
/* Cleanup for source_name */
PyMem_Free((void *)source_name);
return return_value;
}
PyDoc_STRVAR(_winapi_DeregisterEventSource__doc__,
"DeregisterEventSource($module, handle, /)\n"
"--\n"
"\n"
"Closes the specified event log.\n"
"\n"
" handle\n"
" The handle to the event log to be deregistered.");
#define _WINAPI_DEREGISTEREVENTSOURCE_METHODDEF \
{"DeregisterEventSource", (PyCFunction)_winapi_DeregisterEventSource, METH_O, _winapi_DeregisterEventSource__doc__},
static PyObject *
_winapi_DeregisterEventSource_impl(PyObject *module, HANDLE handle);
static PyObject *
_winapi_DeregisterEventSource(PyObject *module, PyObject *arg)
{
PyObject *return_value = NULL;
HANDLE handle;
if (!PyArg_Parse(arg, "" F_HANDLE ":DeregisterEventSource", &handle)) {
goto exit;
}
return_value = _winapi_DeregisterEventSource_impl(module, handle);
exit:
return return_value;
}
PyDoc_STRVAR(_winapi_ReportEvent__doc__,
"ReportEvent($module, handle, type, category, event_id, string, /)\n"
"--\n"
"\n"
"Writes an entry at the end of the specified event log.\n"
"\n"
" handle\n"
" The handle to the event log.\n"
" type\n"
" The type of event being reported.\n"
" category\n"
" The event category.\n"
" event_id\n"
" The event identifier.\n"
" string\n"
" A string to be inserted into the event message.");
#define _WINAPI_REPORTEVENT_METHODDEF \
{"ReportEvent", _PyCFunction_CAST(_winapi_ReportEvent), METH_FASTCALL, _winapi_ReportEvent__doc__},
static PyObject *
_winapi_ReportEvent_impl(PyObject *module, HANDLE handle,
unsigned short type, unsigned short category,
unsigned int event_id, LPCWSTR string);
static PyObject *
_winapi_ReportEvent(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
{
PyObject *return_value = NULL;
HANDLE handle;
unsigned short type;
unsigned short category;
unsigned int event_id;
LPCWSTR string = NULL;
if (!_PyArg_ParseStack(args, nargs, "" F_HANDLE "O&O&O&O&:ReportEvent",
&handle, _PyLong_UnsignedShort_Converter, &type, _PyLong_UnsignedShort_Converter, &category, _PyLong_UnsignedInt_Converter, &event_id, _PyUnicode_WideCharString_Converter, &string)) {
goto exit;
}
return_value = _winapi_ReportEvent_impl(module, handle, type, category, event_id, string);
exit:
/* Cleanup for string */
PyMem_Free((void *)string);
return return_value;
}
#ifndef _WINAPI_GETSHORTPATHNAME_METHODDEF
#define _WINAPI_GETSHORTPATHNAME_METHODDEF
#endif /* !defined(_WINAPI_GETSHORTPATHNAME_METHODDEF) */
/*[clinic end generated code: output=4581fd481c3c6293 input=a9049054013a1b77]*/
/*[clinic end generated code: output=4ab94eaee93a0a90 input=a9049054013a1b77]*/