diff --git a/ydb/apps/ydbd/exports.symlist b/ydb/apps/ydbd/exports.symlist new file mode 100644 index 000000000000..87f9162078e9 --- /dev/null +++ b/ydb/apps/ydbd/exports.symlist @@ -0,0 +1,45 @@ +{ +global: + +__free_hook; +__libc_calloc; +__libc_free; +__libc_malloc; +__libc_memalign; +__libc_pvalloc; +__libc_realloc; +__libc_valloc; +__malloc_hook; +__memalign_hook; +__realloc_hook; +aligned_alloc; +calloc; +cfree; +free; +mallinfo; +malloc; +malloc_info; +malloc_stats; +malloc_trim; +malloc_usable_size; +mallopt; +memalign; +posix_memalign; +pthread_equal; +pvalloc; +realloc; +valloc; + +UdfAllocate; +UdfFree; +UdfAllocateWithSize; +UdfFreeWithSize; +UdfTerminate; +UdfRegisterObject; +UdfUnregisterObject; +UdfArrowAllocate; +UdfArrowReallocate; +UdfArrowFree; + +local: *; +}; diff --git a/ydb/apps/ydbd/ya.make b/ydb/apps/ydbd/ya.make index ea242c4b01e9..952309e78ae7 100644 --- a/ydb/apps/ydbd/ya.make +++ b/ydb/apps/ydbd/ya.make @@ -1,7 +1,14 @@ PROGRAM(ydbd) IF (NOT SANITIZER_TYPE) # for some reasons some tests with asan are failed, see comment in CPPCOM-32 - NO_EXPORT_DYNAMIC_SYMBOLS() + # Disabling export of dynamic symbols allows to significantly reduce size of the stripped binary, + # however, to be able to use dynamic UDFs (the --udfs-dir flag of ydbd server), + # required explicit export of symbols from yql/essentials/public/udf/service/exception_policy/udf_service.cpp + IF (OS_LINUX) + EXPORTS_SCRIPT(ydb/apps/ydbd/exports.symlist) + ELSE() + NO_EXPORT_DYNAMIC_SYMBOLS() + ENDIF() ENDIF() IF (OS_LINUX) diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index 6b123ba48879..014b8886fc41 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -357,7 +357,7 @@ struct TEvGetScriptPhysicalGraphResponse : public TEventLocal&& physicalGraph, i64 generation, NYql::TIssues issues) : Status(status) , PhysicalGraph(std::move(physicalGraph)) , Generation(generation) @@ -365,7 +365,7 @@ struct TEvGetScriptPhysicalGraphResponse : public TEventLocal PhysicalGraph; i64 Generation = 0; NYql::TIssues Issues; }; diff --git a/ydb/core/kqp/finalize_script_service/kqp_check_script_lease_actor.cpp b/ydb/core/kqp/finalize_script_service/kqp_check_script_lease_actor.cpp index c8d1e6c043b3..580d8ffe71f4 100644 --- a/ydb/core/kqp/finalize_script_service/kqp_check_script_lease_actor.cpp +++ b/ydb/core/kqp/finalize_script_service/kqp_check_script_lease_actor.cpp @@ -20,9 +20,10 @@ class TScriptExecutionLeaseCheckActor : public TActorBootstrapped counters) + TScriptExecutionLeaseCheckActor(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration startupTimeout, TIntrusivePtr counters) : QueryServiceConfig(queryServiceConfig) , Counters(counters) + , StartupTimeout(startupTimeout) {} void Bootstrap() { @@ -30,7 +31,7 @@ class TScriptExecutionLeaseCheckActor : public TActorBootstrapped(EWakeup::ScheduleRefreshScriptExecutions))); } STRICT_STFUNC(MainState, @@ -64,6 +65,7 @@ class TScriptExecutionLeaseCheckActor : public TActorBootstrappedGet()->AssignedNodes.size(); RefreshLeasePeriod = std::max(nodesCount, static_cast(1)) * CHECK_PERIOD; + HasNodesInfo = true; LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, LogPrefix() << "Handle discover tenant nodes result, number of nodes #" << nodesCount << ", new RefreshLeasePeriod: " << RefreshLeasePeriod); } @@ -92,6 +94,11 @@ class TScriptExecutionLeaseCheckActor : public TActorBootstrapped(EWakeup::ScheduleRefreshScriptExecutions))); + if (!HasNodesInfo) { + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, LogPrefix() << "Skip ScheduleRefreshScriptExecutions, node info is not arrived"); + return; + } + if (!WaitRefreshScriptExecutions) { WaitRefreshScriptExecutions = true; @@ -110,17 +117,19 @@ class TScriptExecutionLeaseCheckActor : public TActorBootstrapped Counters; + const TDuration StartupTimeout; TDuration RefreshLeasePeriod = CHECK_PERIOD; bool WaitRefreshNodes = false; bool WaitRefreshScriptExecutions = false; + bool HasNodesInfo = false; }; } // anonymous namespace -IActor* CreateScriptExecutionLeaseCheckActor(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr counters) { - return new TScriptExecutionLeaseCheckActor(queryServiceConfig, counters); +IActor* CreateScriptExecutionLeaseCheckActor(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration startupTimeout, TIntrusivePtr counters) { + return new TScriptExecutionLeaseCheckActor(queryServiceConfig, startupTimeout, counters); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/finalize_script_service/kqp_check_script_lease_actor.h b/ydb/core/kqp/finalize_script_service/kqp_check_script_lease_actor.h index 86f15b788b67..5ab5bfa9ab00 100644 --- a/ydb/core/kqp/finalize_script_service/kqp_check_script_lease_actor.h +++ b/ydb/core/kqp/finalize_script_service/kqp_check_script_lease_actor.h @@ -9,6 +9,6 @@ namespace NKikimrConfig { namespace NKikimr::NKqp { -NActors::IActor* CreateScriptExecutionLeaseCheckActor(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr counters); +NActors::IActor* CreateScriptExecutionLeaseCheckActor(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TDuration startupTimeout, TIntrusivePtr counters); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp index 61183a49853d..a52690fb4e63 100644 --- a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp +++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp @@ -21,9 +21,11 @@ class TKqpFinalizeScriptService : public TActorBootstrapped s3ActorsFactory, - bool enableBackgroundLeaseChecks) + bool enableBackgroundLeaseChecks, + TDuration leaseCheckStartupTimeout) : QueryServiceConfig(queryServiceConfig) , EnableBackgroundLeaseChecks(enableBackgroundLeaseChecks) + , LeaseCheckStartupTimeout(leaseCheckStartupTimeout) , FederatedQuerySetupFactory(federatedQuerySetupFactory) , S3ActorsFactory(std::move(s3ActorsFactory)) {} @@ -58,7 +60,7 @@ class TKqpFinalizeScriptService : public TActorBootstrapped Counters; IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory; @@ -206,8 +209,9 @@ class TKqpFinalizeScriptService : public TActorBootstrapped s3ActorsFactory, - bool enableBackgroundLeaseChecks) { - return new TKqpFinalizeScriptService(queryServiceConfig, std::move(federatedQuerySetupFactory), std::move(s3ActorsFactory), enableBackgroundLeaseChecks); + bool enableBackgroundLeaseChecks, + TDuration leaseCheckStartupTimeout) { + return new TKqpFinalizeScriptService(queryServiceConfig, std::move(federatedQuerySetupFactory), std::move(s3ActorsFactory), enableBackgroundLeaseChecks, leaseCheckStartupTimeout); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h index 944401d531dc..fc137c4104ee 100644 --- a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h +++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h @@ -13,6 +13,7 @@ namespace NKikimr::NKqp { IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory, std::shared_ptr s3ActorsFactory, - bool enableBackgroundLeaseChecks = true); + bool enableBackgroundLeaseChecks = true, + TDuration leaseCheckStartupTimeout = TDuration::Seconds(15)); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index bf2a0c1340ac..bfa7d4f846ac 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -2821,6 +2821,8 @@ class TResetScriptExecutionRetriesQueryActor : public TQueryBase { DECLARE $execution_id AS Text; SELECT + operation_status, + finalization_status, retry_state FROM `.metadata/script_executions` WHERE database = $database AND execution_id = $execution_id AND @@ -2854,6 +2856,10 @@ class TResetScriptExecutionRetriesQueryActor : public TQueryBase { // Script execution info if (NYdb::TResultSetParser result(ResultSets[0]); result.TryNextRow()) { + if (!result.ColumnParser("operation_status").GetOptionalInt32() || result.ColumnParser("finalization_status").GetOptionalInt32()) { + return Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Can not reset retray state then operation is running or not finalized"); + } + if (const auto& serializedRetryState = result.ColumnParser("retry_state").GetOptionalJsonDocument()) { NJson::TJsonValue value; if (!NJson::ReadJsonTree(*serializedRetryState, &value)) { @@ -2869,7 +2875,11 @@ class TResetScriptExecutionRetriesQueryActor : public TQueryBase { // Lease info if (NYdb::TResultSetParser result(ResultSets[1]); result.TryNextRow()) { if (const auto leaseState = result.ColumnParser("lease_state").GetOptionalInt32()) { - DropLease = static_cast(*leaseState) == ELeaseState::WaitRetry; + if (static_cast(*leaseState) != ELeaseState::WaitRetry) { + return Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Can not reset retry state then operation is running or not finalized"); + } + + DropLease = true; } } @@ -2938,8 +2948,24 @@ class TResetScriptExecutionRetriesQueryActor : public TQueryBase { class TCancelScriptExecutionOperationActor : public TActorBootstrapped { using TBase = TActorBootstrapped; + using TRetryPolicy = IRetryPolicy<>; + + enum class EState { + GetScriptExecutionOperationStatus, + ResetRetryPolicy, + WaitQueryExecution, + CancelScriptExecution, + WaitCancelScriptExecution, + }; public: + // Cancellation pipeline: + // 1. Get script execution operation status and: + // - Finalize script execution if lease expired or there is not finished finalization + // 2. If script execution is running now, send cancel request to run script actor [race with script execution finalization and retries] + // 3. If script execution has retry state - reset it and also ensure that script execution is stopped + // 4. If error occurred - retry from step 1, otherwise reply SUCCESS + TCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr counters) : Request(std::move(ev)) , QueryServiceConfig(queryServiceConfig) @@ -2947,8 +2973,11 @@ class TCancelScriptExecutionOperationActor : public TActorBootstrappedGet()->UserSID; - if (const auto& error = CheckScriptExecutionAccess(userSID)) { + Become(&TCancelScriptExecutionOperationActor::StateFunc); + KQP_PROXY_LOG_D("Bootstrap"); + + UserSID = Request->Get()->UserSID; + if (const auto& error = CheckScriptExecutionAccess(UserSID)) { return Reply(Ydb::StatusIds::UNAUTHORIZED, error); } @@ -2959,11 +2988,7 @@ class TCancelScriptExecutionOperationActor : public TActorBootstrappedGet()->Database, ExecutionId, QueryServiceConfig, Counters, false, 0, userSID)); - KQP_PROXY_LOG_D("Bootstrap. Start TCheckLeaseStatusActor " << checkerId); - - Become(&TCancelScriptExecutionOperationActor::StateFunc); + ContinueCancel(); } STRICT_STFUNC(StateFunc, @@ -2973,117 +2998,237 @@ class TCancelScriptExecutionOperationActor : public TActorBootstrappedGet()->Status; status != Ydb::StatusIds::SUCCESS) { const auto& issues = ev->Get()->Issues; - KQP_PROXY_LOG_W("Reset retry state " << ev->Sender << " failed " << status << ", issues: " << issues.ToOneLineString()); - - Reply(status, AddRootIssue("Reset retry state failed", issues, true)); - return; + KQP_PROXY_LOG_W("Check lease " << ev->Sender << " failed " << status << ", issues: " << issues.ToOneLineString()); + return Reply(status, AddRootIssue("Fetch script execution info failed", issues)); } - const auto& checkerId = Register(new TCheckLeaseStatusActor(SelfId(), Request->Get()->Database, ExecutionId, QueryServiceConfig, Counters, false)); - KQP_PROXY_LOG_D("Reset retry state " << ev->Sender << " success, start TCheckLeaseStatusActor " << checkerId); - } - - void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) { - if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { - RunScriptActor = ev->Get()->RunScriptActorId; - const auto& operationStatus = ev->Get()->OperationStatus; - KQP_PROXY_LOG_D("Check lease " << ev->Sender << " success" << (operationStatus ? ", operation status: " + Ydb::StatusIds::StatusCode_Name(*operationStatus) : "") << ", CancelSent: " << CancelSent); + RunScriptActor = ev->Get()->RunScriptActorId; + HasRetryPolicy = ev->Get()->HasRetryPolicy; + const auto& operationStatus = ev->Get()->OperationStatus; + KQP_PROXY_LOG_D("Check lease " << ev->Sender << " success, operation status: " << (operationStatus ? Ydb::StatusIds::StatusCode_Name(*operationStatus) : "") << ", has retries: " << HasRetryPolicy); - if (ev->Get()->HasRetryPolicy && !RetryStateDropped) { - // Request maybe waiting for retry, reset retry state first - const auto& resetActorId = Register(new TResetScriptExecutionRetriesQueryActor::TRetry(SelfId(), Request->Get()->Database, ExecutionId)); - KQP_PROXY_LOG_D("Start TResetRetryStateRetryActor " << resetActorId); - } else if (operationStatus) { - Reply(Ydb::StatusIds::PRECONDITION_FAILED, "Script execution operation is already finished"); - } else { - if (CancelSent) { // We have not found the actor, but after it status of the operation is not defined, something strage happened. - Reply(Ydb::StatusIds::INTERNAL_ERROR, "Failed to cancel script execution operation after undelivered event"); - } else { - SendCancelToRunScriptActor(); // The race: operation is still working, but it can finish before it receives cancel signal. Try to cancel first and then maybe check its status. - } - } + if (!operationStatus) { + // Cancel running query first, to prevent TLI on retry policy reset + State = EState::CancelScriptExecution; + } else if (HasRetryPolicy) { + // Reset retry policy to prevent further query retries + State = EState::ResetRetryPolicy; } else { - KQP_PROXY_LOG_W("Check lease " << ev->Sender << " failed " << ev->Get()->Status << ", issues: " << ev->Get()->Issues.ToOneLineString()); - Reply(ev->Get()->Status, std::move(ev->Get()->Issues)); + return Reply(Ydb::StatusIds::PRECONDITION_FAILED, "Script execution operation is already finished"); } - } - void SendCancelToRunScriptActor() { - KQP_PROXY_LOG_D("Send cancel request to RunScriptActor"); - ui64 flags = IEventHandle::FlagTrackDelivery; - if (RunScriptActor.NodeId() != SelfId().NodeId()) { - flags |= IEventHandle::FlagSubscribeOnSession; - SubscribedOnSession = RunScriptActor.NodeId(); - } - Send(RunScriptActor, new TEvKqp::TEvCancelScriptExecutionRequest(), flags); - CancelSent = true; + ContinueCancel(); } void Handle(TEvKqp::TEvCancelScriptExecutionResponse::TPtr& ev) { + if (ev->Cookie != CancellationCookie) { + return; + } + NYql::TIssues issues; NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); - const auto status = ev->Get()->Record.GetStatus(); + if (status != Ydb::StatusIds::SUCCESS && status != Ydb::StatusIds::PRECONDITION_FAILED) { + KQP_PROXY_LOG_W("Script execution cancel failed " << status << " from RunScriptActor: " << ev->Sender << ", issues: " << issues.ToOneLineString()); + return Reply(status, std::move(issues)); + } + KQP_PROXY_LOG_D("Got cancel response " << status << " from RunScriptActor: " << ev->Sender << ", issues: " << issues.ToOneLineString()); - Reply(status, std::move(issues)); + if (HasRetryPolicy) { + // Double check and reset retry policy to prevent further query retries + State = EState::ResetRetryPolicy; + ContinueCancel(); + } else { + Reply(status, std::move(issues)); + } + } + + void Handle(TEvResetScriptExecutionRetriesResponse::TPtr& ev) { + const auto status = ev->Get()->Status; + const auto& issues = ev->Get()->Issues; + if (status != Ydb::StatusIds::SUCCESS && status != Ydb::StatusIds::PRECONDITION_FAILED && status != Ydb::StatusIds::ABORTED) { + KQP_PROXY_LOG_W("Reset retry state " << ev->Sender << " failed " << status << ", issues: " << issues.ToOneLineString()); + Reply(status, AddRootIssue("Reset retry state failed", issues)); + return; + } + + KQP_PROXY_LOG_D("Reset retry state " << ev->Sender << " finished " << status << ", issues: " << issues.ToOneLineString()); + + if (status == Ydb::StatusIds::SUCCESS) { + return Reply(Ydb::StatusIds::SUCCESS); + } + + State = EState::GetScriptExecutionOperationStatus; + if (const NYql::TIssues resultIssues = AddRootIssue("Failed to cancel script execution, query execution is under retry", issues); !ScheduleRetry(resultIssues)) { + // Race with script execution retry + Reply(status, resultIssues); + } } void Handle(TEvents::TEvUndelivered::TPtr& ev) { - if (ev->Get()->Reason == TEvents::TEvUndelivered::ReasonActorUnknown) { // The actor probably had finished before our cancel message arrived. - const auto& checkerId = Register(new TCheckLeaseStatusActor(SelfId(), Request->Get()->Database, ExecutionId, QueryServiceConfig, Counters, false)); // Check if the operation has finished. - KQP_PROXY_LOG_I("Got delivery problem to RunScriptActor: " << ev->Sender << ", maybe already finished, start lease check " << checkerId); + const auto reason = ev->Get()->Reason; + KQP_PROXY_LOG_W("Delivery failed " << reason << " to RunScriptActor: " << ev->Sender); + + if (State != EState::WaitCancelScriptExecution || ev->Sender != RunScriptActor) { + return; + } + + if (reason == TEvents::TEvUndelivered::ReasonActorUnknown) { + // The actor probably had finished before our cancel message arrived + State = EState::GetScriptExecutionOperationStatus; } else { - KQP_PROXY_LOG_W("Delivery failed " << ev->Get()->Reason << " to RunScriptActor: " << ev->Sender); - Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver cancel request to destination"); + State = EState::CancelScriptExecution; + } + + if (const TString message(TStringBuilder() << "Failed to deliver cancel request to destination (delivery problem, reason: " << reason << ")"); !ScheduleRetry(message)) { + Reply(Ydb::StatusIds::UNAVAILABLE, message); } } void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { - KQP_PROXY_LOG_W("Delivery failed to RunScriptActor, node " << ev->Get()->NodeId << " disconnected"); - Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver cancel request to destination"); + const auto nodeId = ev->Get()->NodeId; + KQP_PROXY_LOG_W("Delivery failed to RunScriptActor, node " << nodeId << " disconnected"); + + if (State != EState::WaitCancelScriptExecution || !RunScriptActor || nodeId != RunScriptActor.NodeId()) { + return; + } + + State = EState::CancelScriptExecution; + if (const TString message("Failed to deliver cancel request to destination (node disconnected)"); !ScheduleRetry(message)) { + Reply(Ydb::StatusIds::UNAVAILABLE, message); + } } void PassAway() override { + ResetSessionSubscribe(); + TBase::PassAway(); + } + +private: + void ResetSessionSubscribe() { if (SubscribedOnSession) { Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe()); + SubscribedOnSession = Nothing(); } - TBase::PassAway(); } -private: + void ContinueCancel() { + WaitRetry = false; + KQP_PROXY_LOG_D("Continue cancel, State: " << static_cast(State)); + + switch (State) { + case EState::GetScriptExecutionOperationStatus: { + State = EState::WaitQueryExecution; + const auto& checkerId = Register(new TCheckLeaseStatusActor(SelfId(), Request->Get()->Database, ExecutionId, QueryServiceConfig, Counters, /* canRetry */ false, 0, UserSID)); + KQP_PROXY_LOG_D("Start TCheckLeaseStatusActor " << checkerId); + break; + } + case EState::ResetRetryPolicy: { + State = EState::WaitQueryExecution; + const auto& resetActorId = Register(new TResetScriptExecutionRetriesQueryActor::TRetry(SelfId(), Request->Get()->Database, ExecutionId)); + KQP_PROXY_LOG_D("Start TResetRetryStateRetryActor " << resetActorId); + break; + } + case EState::CancelScriptExecution: { + State = EState::WaitCancelScriptExecution; + SendCancelToRunScriptActor(); + break; + } + case EState::WaitQueryExecution: + case EState::WaitCancelScriptExecution: { + break; + } + } + } + + void SendCancelToRunScriptActor() { + CancellationCookie++; + KQP_PROXY_LOG_D("Send cancel request to RunScriptActor, CancellationCookie: " << CancellationCookie); + ResetSessionSubscribe(); + + ui64 flags = IEventHandle::FlagTrackDelivery; + if (RunScriptActor.NodeId() != SelfId().NodeId()) { + flags |= IEventHandle::FlagSubscribeOnSession; + SubscribedOnSession = RunScriptActor.NodeId(); + } + + Send(RunScriptActor, new TEvKqp::TEvCancelScriptExecutionRequest(), flags, CancellationCookie); + } + TString LogPrefix() const { return TStringBuilder() << "[TCancelScriptExecutionOperationActor] OwnerId: " << Request->Sender << " ActorId: " << SelfId() << " Database: " << Request->Get()->Database << " ExecutionId: " << ExecutionId << " RunScriptActor: " << RunScriptActor << ". "; } - void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { - KQP_PROXY_LOG_D("Reply " << status << ", issues: " << issues.ToOneLineString()); - Send(Request->Sender, new TEvCancelScriptExecutionOperationResponse(status, std::move(issues))); + bool ScheduleRetry(const NYql::TIssues& issues) { + if (WaitRetry) { + return true; + } + + if (!RetryState) { + RetryState = TRetryPolicy::GetExponentialBackoffPolicy( + []() { + return ERetryErrorClass::ShortRetry; + }, + TDuration::MilliSeconds(100), + TDuration::MilliSeconds(100), + TDuration::Seconds(1), + 10 + )->CreateRetryState(); + } + + if (const auto delay = RetryState->GetNextRetryDelay()) { + KQP_PROXY_LOG_W("Schedule retry for error: " << issues.ToOneLineString() << " in " << *delay); + Issues.AddIssues(std::move(issues)); + Schedule(*delay, new TEvents::TEvWakeup()); + WaitRetry = true; + return true; + } + + return false; + } + + bool ScheduleRetry(const TString& message) { + return ScheduleRetry({NYql::TIssue(message)}); + } + + void Reply(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues = {}) { + Issues.AddIssues(std::move(issues)); + + if (status == Ydb::StatusIds::SUCCESS) { + KQP_PROXY_LOG_D("Reply success, issues: " << Issues.ToOneLineString()); + } else { + KQP_PROXY_LOG_W("Reply failed, status: " << status << ", issues: " << Issues.ToOneLineString()); + } + + Send(Request->Sender, new TEvCancelScriptExecutionOperationResponse(status, std::move(Issues))); PassAway(); } void Reply(Ydb::StatusIds::StatusCode status, const TString& message) { - NYql::TIssues issues; - issues.AddIssue(message); - Reply(status, std::move(issues)); + Reply(status, {NYql::TIssue(message)}); } private: const TEvCancelScriptExecutionOperation::TPtr Request; const NKikimrConfig::TQueryServiceConfig QueryServiceConfig; const TIntrusivePtr Counters; + std::optional UserSID; TString ExecutionId; TActorId RunScriptActor; + bool HasRetryPolicy = false; + ui64 CancellationCookie = 0; TMaybe SubscribedOnSession; - bool RetryStateDropped = false; - bool CancelSent = false; + TRetryPolicy::IRetryState::TPtr RetryState; + EState State = EState::GetScriptExecutionOperationStatus; + bool WaitRetry = false; + NYql::TIssues Issues; }; class TSaveScriptExecutionResultMetaQuery : public TQueryBase { @@ -3840,7 +3985,7 @@ class TSaveScriptFinalStatusActor : public TQueryBase { script_sinks, script_secret_names, retry_state, - graph_compressed + graph_compressed IS NOT NULL AS has_graph FROM `.metadata/script_executions` WHERE database = $database AND execution_id = $execution_id AND (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL); @@ -3971,12 +4116,9 @@ class TSaveScriptFinalStatusActor : public TQueryBase { OperationTtl = GetDuration(meta.GetOperationTtl()); LeaseDuration = GetDuration(meta.GetLeaseDuration()); - if (meta.GetSaveQueryPhysicalGraph()) { + if (meta.GetSaveQueryPhysicalGraph() && !result.ColumnParser("has_graph").GetBool()) { // Disable retries if state not saved - const auto& graph = result.ColumnParser("graph_compressed").GetOptionalString(); - if (!graph) { - RetryState.ClearRetryPolicyMapping(); - } + RetryState.ClearRetryPolicyMapping(); } } @@ -4857,12 +4999,13 @@ class TGetScriptExecutionPhysicalGraphActor : public TQueryBase { const TCompressor compressor(*compressionMethod); const auto& graph = compressor.Decompress(*graphCompressed); - - if (!PhysicalGraph.ParseFromString(graph)) { + NKikimrKqp::TQueryPhysicalGraph graphProto; + if (!graphProto.ParseFromString(graph)) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Query physical graph is corrupted"); return; } + PhysicalGraph = std::move(graphProto); Finish(); } @@ -4873,7 +5016,7 @@ class TGetScriptExecutionPhysicalGraphActor : public TQueryBase { private: const TString Database; const TString ExecutionId; - NKikimrKqp::TQueryPhysicalGraph PhysicalGraph; + std::optional PhysicalGraph; i64 LeaseGeneration = 0; }; diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp index 3fd79d8af513..097a6aa56559 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp @@ -672,7 +672,7 @@ Y_UNIT_TEST_SUITE(ScriptExecutionsTest) { // Wait background finalization Sleep(TestLeaseDuration); - ydb.GetRuntime()->Register(CreateKqpFinalizeScriptService({}, nullptr, nullptr, true)); + ydb.GetRuntime()->Register(CreateKqpFinalizeScriptService({}, nullptr, nullptr, true, TDuration::Zero())); ydb.WaitOperationStatus(executionId, Ydb::StatusIds::UNAVAILABLE); ydb.CheckLeaseExistence(executionId, false, Ydb::StatusIds::UNAVAILABLE); diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index 4f94640cdd74..6f18828f5153 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -248,7 +248,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped { ev->SetDisableDefaultTimeout(DisableDefaultTimeout); ev->SetUserRequestContext(UserRequestContext); if (PhysicalGraph) { - ev->SetQueryPhysicalGraph(std::move(*PhysicalGraph)); + ev->SetQueryPhysicalGraph(*PhysicalGraph); } if (ev->Record.GetRequest().GetCollectStats() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) { ev->SetProgressStatsPeriod(ProgressStatsPeriod ? ProgressStatsPeriod : TDuration::MilliSeconds(QueryServiceConfig.GetProgressStatsPeriodMs())); @@ -325,7 +325,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped { void TerminateActorExecution(Ydb::StatusIds::StatusCode replyStatus, const NYql::TIssues& replyIssues) { LOG_I("Script execution finalized, cancel response status: " << replyStatus << ", issues: " << replyIssues.ToOneLineString()); for (auto& req : CancelRequests) { - Send(req->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(replyStatus, replyIssues)); + Send(req->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(replyStatus, replyIssues), 0, req->Cookie); } PassAway(); } @@ -574,7 +574,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped { } void Handle(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) { - LOG_T("Got script progress from " << ev->Sender); + const bool isExecuting = IsExecuting(); + LOG_T("Got script progress from " << ev->Sender << ", isExecuting: " << isExecuting); + + if (!isExecuting) { + return; + } + const auto& record = ev->Get()->Record; QueryPlan = record.GetQueryPlan(); QueryAst = record.GetQueryAst(); @@ -726,13 +732,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped { CancelRequests.emplace_front(std::move(ev)); break; case ERunState::Cancelled: - Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already cancelled")); + Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already cancelled"), 0, ev->Cookie); break; case ERunState::Finishing: CancelRequests.emplace_front(std::move(ev)); break; case ERunState::Finished: - Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already finished")); + Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already finished"), 0, ev->Cookie); break; } } diff --git a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp index b9b183635646..8bf124b7b259 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp @@ -135,11 +135,6 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture { .UseLocalCheckpointsInStreamingQueries = true, }); - if (GetTestParam("DEFAULT_LOG", "enabled") == "enabled") { - auto& runtime = *Kikimr->GetTestServer().GetRuntime(); - runtime.SetLogPriority(NKikimrServices::STREAMS_STORAGE_SERVICE, NLog::PRI_DEBUG); - runtime.SetLogPriority(NKikimrServices::STREAMS_CHECKPOINT_COORDINATOR, NLog::PRI_DEBUG); - } Kikimr->GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableSchemaSecrets(UseSchemaSecrets()); Kikimr->GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableStreamingQueries(true); } @@ -577,7 +572,10 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture { UNIT_ASSERT_C(graph, "Empty graph response"); UNIT_ASSERT_VALUES_EQUAL_C(graph->Get()->Status, Ydb::StatusIds::SUCCESS, graph->Get()->Issues.ToOneLineString()); - return graph->Get()->PhysicalGraph; + const auto& graphProto = graph->Get()->PhysicalGraph; + UNIT_ASSERT(graphProto); + + return *graphProto; } void CheckScriptExecutionsCount(ui64 expectedExecutionsCount, ui64 expectedLeasesCount) { @@ -907,6 +905,66 @@ class TStreamingSysViewTestFixture : public TStreamingTestFixture { TString OutputTopic; }; +class TTestTopicLoader : public TActorBootstrapped { +public: + TTestTopicLoader(const TString& endpoint, const TString& database, const TString& topic, NThreading::TFuture feature) + : Client(TDriver(TDriverConfig() + .SetEndpoint(endpoint) + .SetDatabase(database) + )) + , WriteSession(Client.CreateWriteSession(NTopic::TWriteSessionSettings().Path(topic))) + , Feature(feature) + , Message(1_KB, 'x') + {} + + STRICT_STFUNC(StateFunc, + sFunc(TEvents::TEvWakeup, WriteMessages) + ) + + void Bootstrap() { + Become(&TTestTopicLoader::StateFunc); + Schedule(Timeout, new TEvents::TEvWakeup()); + WriteMessages(); + } + + void WriteMessages() { + if (Feature.HasValue() || Timeout <= TInstant::Now()) { + PassAway(); + WriteSession->Close(TDuration::Zero()); + return; + } + + const auto event = WriteSession->GetEvent(); + if (!event) { + WriteSession->WaitEvent().Subscribe([actorSystem = ActorContext().ActorSystem(), selfId = SelfId()](const auto&) { + actorSystem->Send(selfId, new TEvents::TEvWakeup()); + }); + return; + } + + if (std::holds_alternative(*event)) { + const auto& status = std::get(*event); + UNIT_FAIL(status.GetStatus() << ", issues: " << status.GetIssues().ToOneLineString()); + } + + if (std::holds_alternative(*event)) { + WriteSession->Write( + std::move(std::get(*event).ContinuationToken), + Message + ); + } + + Schedule(TDuration::Zero(), new TEvents::TEvWakeup()); + } + +private: + NTopic::TTopicClient Client; + const std::shared_ptr WriteSession; + const NThreading::TFuture Feature; + const TString Message; + const TInstant Timeout = TInstant::Now() + TDuration::Seconds(60); +}; + } // anonymous namespace Y_UNIT_TEST_SUITE(KqpFederatedQueryDatastreams) { @@ -2715,6 +2773,88 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { testNoAccess(); } + Y_UNIT_TEST_F(OffsetsRecoveryAfterManualAndInternalRetry, TStreamingTestFixture) { + constexpr char inputTopicName[] = "offsetsRecoveryAfterManualAndInternalRetry,InputTopic"; + constexpr char outputTopicName[] = "offsetsRecoveryAfterManualAndInternalRetry,OutputTopic"; + CreateTopic(inputTopicName); + CreateTopic(outputTopicName); + + constexpr char pqSourceName[] = "sourceName"; + CreatePqSource(pqSourceName); + + constexpr char consumerName[] = "unknownConsumer"; + constexpr char queryName[] = "streamingQuery"; + ExecQuery(fmt::format(R"( + CREATE STREAMING QUERY `{query_name}` AS + DO BEGIN + PRAGMA pq.Consumer = "{consumer_name}"; + INSERT INTO `{pq_source}`.`{output_topic}` + SELECT * FROM `{pq_source}`.`{input_topic}` + END DO;)", + "query_name"_a = queryName, + "pq_source"_a = pqSourceName, + "input_topic"_a = inputTopicName, + "output_topic"_a = outputTopicName, + "consumer_name"_a = consumerName + )); + + WaitFor(TDuration::Seconds(10), "Wait fail", [&](TString& error) { + const auto& result = ExecQuery("SELECT Issues FROM `.sys/streaming_queries`"); + UNIT_ASSERT_VALUES_EQUAL(result.size(), 1); + + TString issues; + CheckScriptResult(result[0], 1, 1, [&](TResultSetParser& resultSet) { + issues = resultSet.ColumnParser("Issues").GetOptionalUtf8().value_or(""); + }); + + error = TStringBuilder() << "Query issues: " << issues; + return issues.Contains("no read rule provided for consumer 'unknownConsumer' in topic"); + }); + + ExecExternalQuery(fmt::format(R"( + ALTER TOPIC `{input_topic}` ADD CONSUMER `{consumer_name}`;)", + "input_topic"_a = inputTopicName, + "consumer_name"_a = consumerName + )); + + WaitFor(TDuration::Seconds(10), "Wait fail", [&](TString& error) { + const auto& result = ExecQuery("SELECT Status FROM `.sys/streaming_queries`"); + UNIT_ASSERT_VALUES_EQUAL(result.size(), 1); + + TString status; + CheckScriptResult(result[0], 1, 1, [&](TResultSetParser& resultSet) { + status = *resultSet.ColumnParser("Status").GetOptionalUtf8(); + }); + + error = TStringBuilder() << "Query status: " << status; + return status == "RUNNING"; + }); + + Sleep(TDuration::Seconds(1)); + WriteTopicMessage(inputTopicName, R"({"key": "key1", "value": "value1"})"); + ReadTopicMessage(outputTopicName, R"({"key": "key1", "value": "value1"})"); + Sleep(TDuration::Seconds(1)); + + ExecQuery(fmt::format(R"( + ALTER STREAMING QUERY `{query_name}` SET ( + RUN = FALSE + );)", + "query_name"_a = queryName + )); + + const auto disposition = TInstant::Now(); + WriteTopicMessage(inputTopicName, R"({"key": "key2", "value": "value2"})"); + + ExecQuery(fmt::format(R"( + ALTER STREAMING QUERY `{query_name}` SET ( + RUN = TRUE + );)", + "query_name"_a = queryName + )); + + ReadTopicMessage(outputTopicName, R"({"key": "key2", "value": "value2"})", disposition); + } + Y_UNIT_TEST_F(OffsetsAndStateRecoveryOnInternalRetry, TStreamingTestFixture) { QueryClientSettings = TClientSettings(); @@ -3394,7 +3534,7 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { CreatePqSource(pqSourceName); for (const bool rowTables : {true, false}) { - const auto inputTopicName = TStringBuilder() << "writingInLocalYdbWithLimitInputTopicName" << rowTables; + const auto inputTopicName = TStringBuilder() << "writingInLocalYdbTablesWithProjection" << rowTables; CreateTopic(inputTopicName); const auto ydbTable = TStringBuilder() << "tableSink" << rowTables; @@ -3444,6 +3584,72 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { CheckScriptExecutionsCount(0, 0); } } + + Y_UNIT_TEST_F(DropStreamingQueryUnderLoad, TStreamingTestFixture) { + LogSettings.Freeze = true; + SetupAppConfig().MutableQueryServiceConfig()->SetProgressStatsPeriodMs(1); + + constexpr char inputTopicName[] = "inputTopic"; + constexpr char outputTopicName[] = "outputTopic"; + constexpr char pqSourceName[] = "pqSource"; + ExecQuery(fmt::format(R"( + CREATE TOPIC `{input_topic}` WITH ( + min_active_partitions = 100, + partition_count_limit = 100 + ); + CREATE TOPIC `{output_topic}` WITH ( + min_active_partitions = 100, + partition_count_limit = 100 + ); + CREATE EXTERNAL DATA SOURCE `{pq_source}` WITH ( + SOURCE_TYPE = "Ydb", + LOCATION = "{pq_location}", + DATABASE_NAME = "{pq_database_name}", + AUTH_METHOD = "NONE" + );)", + "input_topic"_a = inputTopicName, + "output_topic"_a = outputTopicName, + "pq_source"_a = pqSourceName, + "pq_location"_a = GetKikimrRunner()->GetEndpoint(), + "pq_database_name"_a = "/Root" + )); + + const auto queryName = "streamingQuery"; + ExecQuery(fmt::format(R"( + CREATE STREAMING QUERY `{query_name}` AS + DO BEGIN + PRAGMA ydb.MaxTasksPerStage = "100"; + PRAGMA ydb.OverridePlanner = @@ [ + {{ "tx": 0, "stage": 0, "tasks": 100 }} + ] @@; + INSERT INTO `{pq_source}`.`{output_topic}` + SELECT * FROM `{pq_source}`.`{input_topic}` + END DO;)", + "query_name"_a = queryName, + "pq_source"_a = pqSourceName, + "input_topic"_a = inputTopicName, + "output_topic"_a = outputTopicName + )); + + auto promise = NThreading::NewPromise(); + Y_DEFER { + promise.SetValue(); + }; + + for (ui32 i = 0; i < 10; ++i) { + GetRuntime().Register(new TTestTopicLoader(GetKikimrRunner()->GetEndpoint(), "/Root", inputTopicName, promise.GetFuture())); + } + + Sleep(TDuration::Seconds(2)); + CheckScriptExecutionsCount(1, 1); + + ExecQuery(fmt::format(R"( + DROP STREAMING QUERY `{query_name}`;)", + "query_name"_a = queryName + )); + + CheckScriptExecutionsCount(0, 0); + } } Y_UNIT_TEST_SUITE(KqpStreamingQueriesSysView) { diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index e4ff215b7bac..d2f3d6ecf9a4 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -1385,7 +1385,8 @@ namespace Tests { Settings->AppConfig->GetQueryServiceConfig(), federatedQuerySetupFactory, Settings->S3ActorsFactory, - Settings->EnableScriptExecutionBackgroundChecks + Settings->EnableScriptExecutionBackgroundChecks, + TDuration::Zero() ); TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx, userPoolId); Runtime->RegisterService(NKqp::MakeKqpFinalizeScriptServiceId(Runtime->GetNodeId(nodeIdx)), scriptFinalizeServiceId, nodeIdx); diff --git a/ydb/tests/fq/streaming/common.py b/ydb/tests/fq/streaming/common.py new file mode 100644 index 000000000000..540210bcc6cc --- /dev/null +++ b/ydb/tests/fq/streaming/common.py @@ -0,0 +1,64 @@ +import os +import logging + +from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator +from ydb.tests.library.harness.kikimr_runner import KiKiMR +from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase + +import yatest.common +import ydb + +logger = logging.getLogger(__name__) + + +class YdbClient: + def __init__(self, endpoint: str, database: str): + driver_config = ydb.DriverConfig(endpoint, database, auth_token="root@builtin") + self.driver = ydb.Driver(driver_config) + self.session_pool = ydb.QuerySessionPool(self.driver) + + def stop(self): + self.session_pool.stop() + self.driver.stop() + + def wait_connection(self, timeout: int = 5): + self.driver.wait(timeout, fail_fast=True) + + def query(self, statement: str): + return self.session_pool.execute_with_retries(statement) + + def query_async(self, statement: str): + return self.session_pool.execute_with_retries_async(statement) + + +class Kikimr: + def __init__(self, config: KikimrConfigGenerator, timeout_seconds: int = 240): + ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY")) + logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8")) + + self.cluster = KiKiMR(config) + self.cluster.start(timeout_seconds=timeout_seconds) + + first_node = list(self.cluster.nodes.values())[0] + self.ydb_client = YdbClient( + database=f"/{config.domain_name}", + endpoint=f"grpc://{first_node.host}:{first_node.port}" + ) + self.ydb_client.wait_connection() + + def stop(self): + self.ydb_client.stop() + self.cluster.stop() + + +class StreamingTestBase(TestYdsBase): + def create_source(self, kikimr: Kikimr, source_name: str, shared: bool = False): + kikimr.ydb_client.query(f""" + CREATE EXTERNAL DATA SOURCE `{source_name}` WITH ( + SOURCE_TYPE = "Ydb", + LOCATION = "{os.getenv("YDB_ENDPOINT")}", + DATABASE_NAME = "{os.getenv("YDB_DATABASE")}", + SHARED_READING = "{shared}", + AUTH_METHOD = "NONE" + ); + """) diff --git a/ydb/tests/fq/streaming/conftest.py b/ydb/tests/fq/streaming/conftest.py index 38cca5269026..3c8dafcdf590 100644 --- a/ydb/tests/fq/streaming/conftest.py +++ b/ydb/tests/fq/streaming/conftest.py @@ -1,45 +1,16 @@ import os -import logging +import random +import string +from ydb.tests.fq.streaming.common import Kikimr from ydb.tests.library.common.types import Erasure -from ydb.tests.library.harness.kikimr_runner import KiKiMR from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator -import ydb -import yatest.common import pytest -logger = logging.getLogger(__name__) - - -class YdbClient: - def __init__(self, endpoint: str, database: str): - driver_config = ydb.DriverConfig(endpoint, database, auth_token='root@builtin') - self.driver = ydb.Driver(driver_config) - self.session_pool = ydb.QuerySessionPool(self.driver) - - def stop(self): - self.session_pool.stop() - self.driver.stop() - - def wait_connection(self, timeout=5): - self.driver.wait(timeout, fail_fast=True) - - def query(self, statement): - return self.session_pool.execute_with_retries(statement) - - def query_async(self, statement): - return self.session_pool.execute_with_retries_async(statement) - @pytest.fixture(scope="module") def kikimr(request): - - class Kikimr: - def __init__(self, client, cluster): - self.YdbClient = client - self.Cluster = cluster - def get_ydb_config(): config = KikimrConfigGenerator( erasure=Erasure.MIRROR_3_DC, @@ -48,7 +19,18 @@ def get_ydb_config(): "enable_streaming_queries": True, "enable_streaming_queries_counters": True }, - query_service_config={"available_external_data_sources": ["Ydb"]}, + query_service_config={ + "available_external_data_sources": ["ObjectStorage", "Ydb", "YdbTopics"], + "enable_match_recognize": True, + "streaming_queries": { + "external_storage": { + "database_connection": { + "endpoint": os.getenv("YDB_ENDPOINT"), + "database": os.getenv("YDB_DATABASE") + } + } + } + }, table_service_config={}, default_clusteradmin="root@builtin", use_in_memory_pdisks=False @@ -56,33 +38,21 @@ def get_ydb_config(): config.yaml_config["log_config"]["default_level"] = 8 - query_service_config = config.yaml_config.setdefault("query_service_config", {}) - query_service_config["available_external_data_sources"] = ["ObjectStorage", "Ydb", "YdbTopics"] - query_service_config["enable_match_recognize"] = True - - database_connection = query_service_config.setdefault("streaming_queries", {}).setdefault("external_storage", {}).setdefault("database_connection", {}) - database_connection["endpoint"] = os.getenv("YDB_ENDPOINT") - database_connection["database"] = os.getenv("YDB_DATABASE") - return config - ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY")) - logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8")) - os.environ["YDB_TEST_DEFAULT_CHECKPOINTING_PERIOD_MS"] = "200" os.environ["YDB_TEST_LEASE_DURATION_SEC"] = "5" - config = get_ydb_config() - cluster = KiKiMR(config) - cluster.start() + kikimr = Kikimr(get_ydb_config()) + yield kikimr + kikimr.stop() + + +@pytest.fixture +def entity_name(request): + suffix = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - node = cluster.nodes[1] - ydb_client = YdbClient( - database=f"/{config.domain_name}", - endpoint=f"grpc://{node.host}:{node.port}" - ) - ydb_client.wait_connection() + def entity_name_wrapper(name: str) -> str: + return f"{name}_{suffix}" - yield Kikimr(ydb_client, cluster) - ydb_client.stop() - cluster.stop() + return entity_name_wrapper diff --git a/ydb/tests/fq/streaming/test_streaming.py b/ydb/tests/fq/streaming/test_streaming.py index 5e4d78241fa4..a1ad4e9f76a9 100644 --- a/ydb/tests/fq/streaming/test_streaming.py +++ b/ydb/tests/fq/streaming/test_streaming.py @@ -1,31 +1,19 @@ import logging -import os import time import random import string -from ydb.tests.tools.fq_runner.kikimr_runner import plain_or_under_sanitizer_wrapper - -from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase +from ydb.tests.fq.streaming.common import StreamingTestBase from ydb.tests.tools.fq_runner.kikimr_metrics import load_metrics +from ydb.tests.tools.fq_runner.kikimr_runner import plain_or_under_sanitizer_wrapper from ydb.tests.tools.datastreams_helpers.control_plane import create_read_rule logger = logging.getLogger(__name__) -class TestStreamingInYdb(TestYdsBase): - - def create_source(self, kikimr, sourceName, shared=False): - kikimr.YdbClient.query(f""" - CREATE EXTERNAL DATA SOURCE `{sourceName}` WITH ( - SOURCE_TYPE="Ydb", - LOCATION="{os.getenv("YDB_ENDPOINT")}", - DATABASE_NAME="{os.getenv("YDB_DATABASE")}", - SHARED_READING="{shared}", - AUTH_METHOD="NONE");""") - +class TestStreamingInYdb(StreamingTestBase): def monitoring_endpoint(self, kikimr, node_id=None): - node = kikimr.Cluster.nodes[node_id] + node = kikimr.cluster.nodes[node_id] return f"http://localhost:{node.mon_port}" def get_sensors(self, kikimr, node_id, counters): @@ -35,7 +23,7 @@ def get_sensors(self, kikimr, node_id, counters): def get_checkpoint_coordinator_metric(self, kikimr, path, metric_name, expect_counters_exist=False): sum = 0 found = False - for node_id in kikimr.Cluster.nodes: + for node_id in kikimr.cluster.nodes: sensor = self.get_sensors(kikimr, node_id, "kqp").find_sensor( { "path": path, @@ -69,38 +57,38 @@ def get_actor_count(self, kikimr, node_id, activity): {"activity": activity, "sensor": "ActorsAliveByActivity", "execpool": "User"}) return result if result is not None else 0 - def test_read_topic(self, kikimr): - sourceName = "test_read_topic" + ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - self.init_topics(sourceName, create_output=False) + def test_read_topic(self, kikimr, entity_name): + source_name = entity_name("test_read_topic") + self.init_topics(source_name, create_output=False) - self.create_source(kikimr, sourceName, False) - sql = f"""SELECT time FROM {sourceName}.`{self.input_topic}` + self.create_source(kikimr, source_name) + sql = f"""SELECT time FROM {source_name}.`{self.input_topic}` WITH ( FORMAT="json_each_row", SCHEMA=(time String NOT NULL)) LIMIT 1""" - future = kikimr.YdbClient.query_async(sql) + future = kikimr.ydb_client.query_async(sql) time.sleep(1) data = ['{"time": "lunch time"}'] self.write_stream(data) result_sets = future.result() assert result_sets[0].rows[0]['time'] == b'lunch time' - def test_read_topic_shared_reading_limit(self, kikimr): - sourceName = "test_read_topic_shared_reading_limit" + ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - self.init_topics(sourceName, create_output=False, partitions_count=10) + def test_read_topic_shared_reading_limit(self, kikimr, entity_name): + source_name = entity_name("test_read_topic_shared_reading_limit") + self.init_topics(source_name, create_output=False, partitions_count=10) - self.create_source(kikimr, sourceName, True) - sql = f"""SELECT time FROM {sourceName}.`{self.input_topic}` + self.create_source(kikimr, source_name, True) + sql = f"""SELECT time FROM {source_name}.`{self.input_topic}` WITH ( FORMAT="json_each_row", SCHEMA=(time String NOT NULL)) WHERE time like "%lunch%" LIMIT 1""" - future1 = kikimr.YdbClient.query_async(sql) - future2 = kikimr.YdbClient.query_async(sql) + future1 = kikimr.ydb_client.query_async(sql) + future2 = kikimr.ydb_client.query_async(sql) time.sleep(3) data = ['{"time": "lunch time"}'] self.write_stream(data) @@ -109,10 +97,10 @@ def test_read_topic_shared_reading_limit(self, kikimr): assert result_sets1[0].rows[0]['time'] == b'lunch time' assert result_sets2[0].rows[0]['time'] == b'lunch time' - def test_restart_query(self, kikimr): - sourceName = "test_restart_query" + ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - self.init_topics(sourceName, partitions_count=10) - self.create_source(kikimr, sourceName, False) + def test_restart_query(self, kikimr, entity_name): + source_name = entity_name("test_restart_query") + self.init_topics(source_name, partitions_count=10) + self.create_source(kikimr, source_name, False) name = "test_restart_query" sql = R''' @@ -127,7 +115,7 @@ def test_restart_query(self, kikimr): END DO;''' path = f"/Root/{name}" - kikimr.YdbClient.query(sql.format(query_name=name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=name, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) self.wait_completed_checkpoints(kikimr, path) data = ['{"time": "lunch time"}'] @@ -137,22 +125,22 @@ def test_restart_query(self, kikimr): assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data self.wait_completed_checkpoints(kikimr, path) - kikimr.YdbClient.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);") + kikimr.ydb_client.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);") time.sleep(0.5) data = ['{"time": "next lunch time"}'] expected_data = ['next lunch time'] self.write_stream(data) - kikimr.YdbClient.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = TRUE);") + kikimr.ydb_client.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = TRUE);") assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data - kikimr.YdbClient.query(f"DROP STREAMING QUERY `{name}`;") + kikimr.ydb_client.query(f"DROP STREAMING QUERY `{name}`;") - def test_read_topic_shared_reading_insert_to_topic(self, kikimr): - sourceName = "source3_" + ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - self.init_topics(sourceName, partitions_count=10) - self.create_source(kikimr, sourceName, True) + def test_read_topic_shared_reading_insert_to_topic(self, kikimr, entity_name): + source_name = entity_name("source3_") + self.init_topics(source_name, partitions_count=10) + self.create_source(kikimr, source_name, True) sql = R''' CREATE STREAMING QUERY `{query_name}` AS @@ -167,8 +155,8 @@ def test_read_topic_shared_reading_insert_to_topic(self, kikimr): query_name1 = "test_read_topic_shared_reading_insert_to_topic1" query_name2 = "test_read_topic_shared_reading_insert_to_topic2" - kikimr.YdbClient.query(sql.format(query_name=query_name1, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) - kikimr.YdbClient.query(sql.format(query_name=query_name2, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=query_name1, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=query_name2, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) path1 = f"/Root/{query_name1}" self.wait_completed_checkpoints(kikimr, path1) @@ -179,8 +167,8 @@ def test_read_topic_shared_reading_insert_to_topic(self, kikimr): self.wait_completed_checkpoints(kikimr, path1) sql = R'''ALTER STREAMING QUERY `{query_name}` SET (RUN = FALSE);''' - kikimr.YdbClient.query(sql.format(query_name=query_name1)) - kikimr.YdbClient.query(sql.format(query_name=query_name2)) + kikimr.ydb_client.query(sql.format(query_name=query_name1)) + kikimr.ydb_client.query(sql.format(query_name=query_name2)) time.sleep(1) @@ -189,18 +177,18 @@ def test_read_topic_shared_reading_insert_to_topic(self, kikimr): self.write_stream(data) sql = R'''ALTER STREAMING QUERY `{query_name}` SET (RUN = TRUE);''' - kikimr.YdbClient.query(sql.format(query_name=query_name1)) - kikimr.YdbClient.query(sql.format(query_name=query_name2)) + kikimr.ydb_client.query(sql.format(query_name=query_name1)) + kikimr.ydb_client.query(sql.format(query_name=query_name2)) assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data sql = R'''DROP STREAMING QUERY `{query_name}`;''' - kikimr.YdbClient.query(sql.format(query_name=query_name1)) - kikimr.YdbClient.query(sql.format(query_name=query_name2)) + kikimr.ydb_client.query(sql.format(query_name=query_name1)) + kikimr.ydb_client.query(sql.format(query_name=query_name2)) - def test_read_topic_shared_reading_restart_nodes(self, kikimr): - sourceName = "source_" + ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - self.init_topics(sourceName, partitions_count=1) - self.create_source(kikimr, sourceName, True) + def test_read_topic_shared_reading_restart_nodes(self, kikimr, entity_name): + source_name = entity_name("source_") + self.init_topics(source_name, partitions_count=1) + self.create_source(kikimr, source_name, True) sql = R''' CREATE STREAMING QUERY `{query_name}` AS @@ -214,7 +202,7 @@ def test_read_topic_shared_reading_restart_nodes(self, kikimr): END DO;''' query_name = "test_read_topic_shared_reading_restart_nodes" - kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=query_name, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) path = f"/Root/{query_name}" self.wait_completed_checkpoints(kikimr, path) @@ -224,13 +212,13 @@ def test_read_topic_shared_reading_restart_nodes(self, kikimr): self.wait_completed_checkpoints(kikimr, path) restart_node_id = None - for node_id in kikimr.Cluster.nodes: + for node_id in kikimr.cluster.nodes: count = self.get_actor_count(kikimr, node_id, "DQ_PQ_READ_ACTOR") if count: restart_node_id = node_id - logging.debug(f"Restart node {restart_node_id}") - node = kikimr.Cluster.nodes[restart_node_id] + logger.debug(f"Restart node {restart_node_id}") + node = kikimr.cluster.nodes[restart_node_id] node.stop() node.start() @@ -239,10 +227,10 @@ def test_read_topic_shared_reading_restart_nodes(self, kikimr): assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data self.wait_completed_checkpoints(kikimr, path) - def test_read_topic_restore_state(self, kikimr): - sourceName = "source4_" + ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - self.init_topics(sourceName, partitions_count=1) - self.create_source(kikimr, sourceName, True) + def test_read_topic_restore_state(self, kikimr, entity_name): + source_name = entity_name("source4_") + self.init_topics(source_name, partitions_count=1) + self.create_source(kikimr, source_name, True) sql = R''' CREATE STREAMING QUERY `{query_name}` AS DO BEGIN @@ -271,7 +259,7 @@ def test_read_topic_restore_state(self, kikimr): END DO;''' query_name = "test_read_topic_restore_state" - kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=query_name, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) path = f"/Root/{query_name}" self.wait_completed_checkpoints(kikimr, path) @@ -285,13 +273,13 @@ def test_read_topic_restore_state(self, kikimr): self.wait_completed_checkpoints(kikimr, path) restart_node_id = None - for node_id in kikimr.Cluster.nodes: + for node_id in kikimr.cluster.nodes: count = self.get_actor_count(kikimr, node_id, "DQ_PQ_READ_ACTOR") if count: restart_node_id = node_id - logging.debug(f"Restart node {restart_node_id}") - node = kikimr.Cluster.nodes[restart_node_id] + logger.debug(f"Restart node {restart_node_id}") + node = kikimr.cluster.nodes[restart_node_id] node.stop() node.start() @@ -300,10 +288,10 @@ def test_read_topic_restore_state(self, kikimr): expected_data = ['{"a_time":null,"b_time":1696849942500001,"c_time":1696849943000001}'] assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data - def test_json_errors(self, kikimr): - sourceName = "test_json_errors" + ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - self.init_topics(sourceName, partitions_count=10) - self.create_source(kikimr, sourceName, True) + def test_json_errors(self, kikimr, entity_name): + source_name = entity_name("test_json_errors") + self.init_topics(source_name, partitions_count=10) + self.create_source(kikimr, source_name, True) name = "test_json_errors" sql = R''' @@ -318,7 +306,7 @@ def test_json_errors(self, kikimr): END DO;''' path = f"/Root/{name}" - kikimr.YdbClient.query(sql.format(query_name=name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=name, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) self.wait_completed_checkpoints(kikimr, path) data = [ @@ -331,10 +319,10 @@ def test_json_errors(self, kikimr): expected = ['hello1', 'hello2'] assert self.read_stream(len(expected), topic_path=self.output_topic) == expected - def test_restart_query_by_rescaling(self, kikimr): - sourceName = 'source' + ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - self.init_topics(sourceName, partitions_count=10) - self.create_source(kikimr, sourceName, True) + def test_restart_query_by_rescaling(self, kikimr, entity_name): + source_name = entity_name('source') + self.init_topics(source_name, partitions_count=10) + self.create_source(kikimr, source_name, True) name = "test_restart_query_by_rescaling" sql = R''' @@ -352,7 +340,7 @@ def test_restart_query_by_rescaling(self, kikimr): END DO;''' path = f"/Root/{name}" - kikimr.YdbClient.query(sql.format(query_name=name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=name, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) self.wait_completed_checkpoints(kikimr, path) message_count = 20 @@ -361,8 +349,8 @@ def test_restart_query_by_rescaling(self, kikimr): assert self.read_stream(message_count, topic_path=self.output_topic) == ["time to do it" for i in range(message_count)] self.wait_completed_checkpoints(kikimr, path) - logging.debug(f"stopping query {name}") - kikimr.YdbClient.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);") + logger.debug(f"stopping query {name}") + kikimr.ydb_client.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);") sql = R'''ALTER STREAMING QUERY `{query_name}` SET ( RUN = TRUE, @@ -380,19 +368,19 @@ def test_restart_query_by_rescaling(self, kikimr): INSERT INTO `{source_name}`.`{output_topic}` SELECT time FROM $in; END DO;''' - kikimr.YdbClient.query(sql.format(query_name=name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=name, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) message = '{"time": "time to lunch"}' for i in range(message_count): self.write_stream([message], topic_path=None, partition_key=(''.join(random.choices(string.digits, k=8)))) assert self.read_stream(message_count, topic_path=self.output_topic) == ["time to lunch" for i in range(message_count)] - kikimr.YdbClient.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);") + kikimr.ydb_client.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);") def test_pragma(self, kikimr): - sourceName = "test_pragma" - self.init_topics(sourceName, partitions_count=10) - self.create_source(kikimr, sourceName) + source_name = "test_pragma" + self.init_topics(source_name, partitions_count=10) + self.create_source(kikimr, source_name) create_read_rule(self.input_topic, self.consumer_name) query_name = "test_pragma1" @@ -409,17 +397,17 @@ def test_pragma(self, kikimr): INSERT INTO {source_name}.`{output_topic}` SELECT time FROM $in; END DO;''' - kikimr.YdbClient.query(sql.format(query_name=query_name, consumer_name=self.consumer_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=query_name, consumer_name=self.consumer_name, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) self.write_stream(['{"time": "lunch time"}']) assert self.read_stream(1, topic_path=self.output_topic) == ['lunch time'] - kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") + kikimr.ydb_client.query(f"DROP STREAMING QUERY `{query_name}`") def test_types(self, kikimr): - sourceName = "test_types" - self.init_topics(sourceName, partitions_count=1) + source_name = "test_types" + self.init_topics(source_name, partitions_count=1) - self.create_source(kikimr, sourceName) + self.create_source(kikimr, source_name) query_name = "test_types1" @@ -434,10 +422,10 @@ def test_type(self, kikimr, type, input, expected_output): INSERT INTO {source_name}.`{output_topic}` SELECT CAST(field_name as String) FROM $in; END DO;''' - kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, type_name=type, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=query_name, source_name=source_name, type_name=type, input_topic=self.input_topic, output_topic=self.output_topic)) self.write_stream([f"{{\"field_name\": {input}}}"]) assert self.read_stream(1, topic_path=self.output_topic) == [expected_output] - kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") + kikimr.ydb_client.query(f"DROP STREAMING QUERY `{query_name}`") test_type(self, kikimr, type="String", input='"lunch time"', expected_output='lunch time') test_type(self, kikimr, type="Utf8", input='"Relativitätstheorie"', expected_output='Relativitätstheorie') @@ -452,10 +440,10 @@ def test_type(self, kikimr, type, input, expected_output): # test_type(self, kikimr, type="Json", input='{"name": "value"}', expected_output='{"name": "value"}') # test_type(self, kikimr, type="JsonDocument", input='{"name": "value"}', expected_output='lunch time') - def test_raw_format(self, kikimr): - sourceName = "test_restart_query" + ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - self.init_topics(sourceName, partitions_count=10) - self.create_source(kikimr, sourceName, False) + def test_raw_format(self, kikimr, entity_name): + source_name = entity_name("test_restart_query") + self.init_topics(source_name, partitions_count=10) + self.create_source(kikimr, source_name, False) query_name = "test_raw_format_string" sql = R''' @@ -469,7 +457,7 @@ def test_raw_format(self, kikimr): INSERT INTO {source_name}.`{output_topic}` SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $parsed; END DO;''' path = f"/Root/{query_name}" - kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=query_name, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) self.wait_completed_checkpoints(kikimr, path) data = ['{"time": "2020-01-01T13:00:00.000000Z", "value": "lunch time"}'] @@ -477,7 +465,7 @@ def test_raw_format(self, kikimr): self.write_stream(data) assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data - kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") + kikimr.ydb_client.query(f"DROP STREAMING QUERY `{query_name}`") query_name = "test_raw_format_default" sql = R''' @@ -488,7 +476,7 @@ def test_raw_format(self, kikimr): INSERT INTO {source_name}.`{output_topic}` SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $parsed; END DO;''' path = f"/Root/{query_name}" - kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=query_name, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) self.wait_completed_checkpoints(kikimr, path) data = ['{"time": "2020-01-01T13:00:00.000000Z", "value": "lunch time"}'] @@ -496,7 +484,7 @@ def test_raw_format(self, kikimr): self.write_stream(data) assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data - kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") + kikimr.ydb_client.query(f"DROP STREAMING QUERY `{query_name}`") query_name = "test_raw_format_json" sql = R''' @@ -510,7 +498,7 @@ def test_raw_format(self, kikimr): INSERT INTO {source_name}.`{output_topic}` SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $parsed; END DO;''' path = f"/Root/{query_name}" - kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic)) + kikimr.ydb_client.query(sql.format(query_name=query_name, source_name=source_name, input_topic=self.input_topic, output_topic=self.output_topic)) self.wait_completed_checkpoints(kikimr, path) data = ['{"time": "2020-01-01T13:00:00.000000Z", "value": "lunch time"}'] @@ -519,4 +507,4 @@ def test_raw_format(self, kikimr): assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data - kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`") + kikimr.ydb_client.query(f"DROP STREAMING QUERY `{query_name}`") diff --git a/ydb/tests/fq/streaming/test_udfs.py b/ydb/tests/fq/streaming/test_udfs.py new file mode 100644 index 000000000000..b48e58763a07 --- /dev/null +++ b/ydb/tests/fq/streaming/test_udfs.py @@ -0,0 +1,60 @@ +import time + +from ydb.tests.fq.streaming.common import Kikimr, StreamingTestBase +from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator + +import pytest +import yatest.common + + +@pytest.fixture(scope="module") +def kikimr_udfs(request): + config = KikimrConfigGenerator( + extra_feature_flags={"enable_external_data_sources": True}, + query_service_config={"available_external_data_sources": ["Ydb", "YdbTopics"]}, + table_service_config={}, + default_clusteradmin="root@builtin", + use_in_memory_pdisks=True, + udfs_path=yatest.common.build_path("yql/essentials/udfs/common/python/python3_small") + ) + config.yaml_config["log_config"]["default_level"] = 8 + + kikimr = Kikimr(config, timeout_seconds=30) + yield kikimr + kikimr.stop() + + +class TestUdfsUsage(StreamingTestBase): + def test_dynamic_udf(self, kikimr_udfs, entity_name): + source_name = entity_name("test_udfs") + self.init_topics(source_name, create_output=False) + self.create_source(kikimr_udfs, source_name) + + future = kikimr_udfs.ydb_client.query_async(f""" + $script = @@#py +import urllib.parse + +def get_all_cgi_params(url): + params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query) + return {{k: v[0] for k, v in params.items()}} +@@; + + $callable = Python3::get_all_cgi_params( + Callable<(Utf8?)->Dict>, + $script + ); + + SELECT $callable(url)["name"] AS name FROM `{source_name}`.`{self.input_topic}` + WITH ( + FORMAT = "json_each_row", + SCHEMA ( + url Utf8 NOT NULL + ) + ) + LIMIT 1 + """) + + time.sleep(1) + self.write_stream(['{"url": "http://kuibyshevsky.sam.sudrf.ru/modules.php?name=information"}']) + + assert future.result()[0].rows[0]["name"] == "information" diff --git a/ydb/tests/fq/streaming/ya.make b/ydb/tests/fq/streaming/ya.make index 360fc8db4b6a..235604684f12 100644 --- a/ydb/tests/fq/streaming/ya.make +++ b/ydb/tests/fq/streaming/ya.make @@ -6,11 +6,24 @@ TEST_SRCS( test_streaming.py ) +IF (OS_LINUX) + TEST_SRCS( + test_udfs.py + ) +ENDIF() + PY_SRCS( + common.py conftest.py ) -SIZE(MEDIUM) +IF (SANITIZER_TYPE) + SIZE(LARGE) + INCLUDE(${ARCADIA_ROOT}/ydb/tests/large.inc) + REQUIREMENTS(ram:20) +ELSE() + SIZE(MEDIUM) +ENDIF() PEERDIR( ydb/tests/library @@ -25,6 +38,7 @@ PEERDIR( DEPENDS( ydb/apps/ydb ydb/tests/tools/pq_read + yql/essentials/udfs/common/python/python3_small ) END() diff --git a/ydb/tests/library/harness/kikimr_runner.py b/ydb/tests/library/harness/kikimr_runner.py index 3b3d9b221ead..532d6ddd408c 100644 --- a/ydb/tests/library/harness/kikimr_runner.py +++ b/ydb/tests/library/harness/kikimr_runner.py @@ -470,14 +470,14 @@ def __call_ydb_cli(self, cmd, token=None, check_exit_code=True, use_certs=False, )) raise - def start(self): + def start(self, timeout_seconds=240): """ Safely starts kikimr instance. Do not override this method. """ try: logger.debug("Working directory: " + self.__tmpdir) - self.__run() + self.__run(timeout_seconds=timeout_seconds) return self except Exception: @@ -515,7 +515,7 @@ def _bootstrap_cluster(self, self_assembly_uuid="test-cluster", timeout=30, inte time.sleep(interval) raise last_exception - def __run(self): + def __run(self, timeout_seconds=240): self.prepare() for node_id in self.__configurator.all_node_ids(): @@ -530,7 +530,7 @@ def __run(self): bs_needed = ('blob_storage_config' in self.__configurator.yaml_config) or self.__configurator.use_self_management if bs_needed: - self.__wait_for_bs_controller_to_start() + self.__wait_for_bs_controller_to_start(timeout_seconds=timeout_seconds) if not self.__configurator.use_self_management: self.__add_bs_box() @@ -859,13 +859,12 @@ def add_storage_pool(self, name=None, kind="rot", pdisk_user_kind=0, erasure=Non self._bs_config_invoke(request) return name - def __wait_for_bs_controller_to_start(self): + def __wait_for_bs_controller_to_start(self, timeout_seconds=240): monitors = [node.monitor for node in self.nodes.values()] def predicate(): return blobstorage_controller_has_started_on_some_node(monitors) - timeout_seconds = 240 bs_controller_started = wait_for( predicate=predicate, timeout_seconds=timeout_seconds, step_seconds=1.0, multiply=1.3 )