您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15229|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    4 l  ^( O% I* s5 E# u
  2. require_once __DIR__ . '/Workerman/Autoloader.php';% U9 H: W2 }% }  m4 B- {

  3. & l4 A+ V6 d. r6 D- M
  4. $worker = new Worker();
    9 t! O) a- k7 Y+ S% q
  5. // 4个进程6 G6 T8 \/ g: W& q4 D, g
  6. $worker->count = 4;
    ! \9 Q# _$ k4 k) S
  7. // 每个进程启动后在当前进程新增一个Worker监听( W. o2 n$ m# U( [/ B' u
  8. $worker->onWorkerStart = function($worker); Z8 B: J, j7 ~; B3 h# b: P5 b! L
  9. {
    " v' k; k6 \7 C' i
  10.     /**
    : r4 U$ a8 B. a. l; o! f$ h4 }
  11.      * 4个进程启动的时候都创建2016端口的Worker' L0 d9 O/ y" E# f; A
  12.      * 当执行到worker->listen()时会报Address already in use错误
    6 T* T. X1 d0 x+ H% v7 x$ r/ C
  13.      * 如果worker->count=1则不会报错7 u8 b' a0 J. y- B( i
  14.      */
    $ y2 d3 u& d7 Q9 U  \5 R, P% r1 `
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    6 k3 Y! {6 i+ [- P
  16.     $inner_worker->onMessage = 'on_message';
    % _. @) c) R' g) T5 v3 M
  17.     // 执行监听。这里会报Address already in use错误5 m0 Y6 }7 V  l6 {
  18.     $inner_worker->listen();( x/ K: e' b; e5 b
  19. };, ]" B: n" N6 n1 i. U; f2 k

  20. * C) B5 E+ q: D0 F  o5 z
  21. $worker->onMessage = 'on_message';# d, V" x- |- U: r. X
  22. $ [, ?7 c& h' s8 F( F
  23. function on_message($connection, $data)( y: {7 b/ Q* m' @6 _6 ~' s
  24. {
    $ ?1 Z  o  _5 o2 ^6 z' f* @
  25.     $connection->send("hello\n");
    5 j. ~  I# ?, z5 J3 m+ |9 h
  26. }
    3 D( e: k3 J: Q/ |
  27. 0 F0 {8 `0 S/ g9 B* t7 Z
  28. // 运行worker
    , @$ @- S. ]. V; [# y9 \
  29. Worker::runAll();5 O% R/ e1 X2 G; \$ W3 u& S
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:( T5 y. E0 I. y
  31. + [- n, S% f. T8 A8 E" Y
  32. use Workerman\Worker;
    / N9 [3 ^5 [/ x2 {! Y( B
  33. require_once './Workerman/Autoloader.php';5 B0 i* b: Y+ [$ u1 s! u( `7 {" k; E1 ^

  34. 1 j7 p$ m! [* |1 |* V0 N
  35. $worker = new Worker('text://0.0.0.0:2015');' E9 Q  l3 R$ A0 Z/ E0 @+ @  E4 w2 m
  36. // 4个进程
    & [- `: m. m5 R
  37. $worker->count = 4;& K$ U: O: Q5 @! u% `
  38. // 每个进程启动后在当前进程新增一个Worker监听
    2 O1 O( Z/ x4 y8 x6 ?3 j7 N: |
  39. $worker->onWorkerStart = function($worker)7 g) F/ e: F# O5 C, n% U
  40. {
    7 [- g) @6 i# M
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    4 E+ W/ j, f6 i. r/ H4 O
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    + c. `- q) b9 w& i  K. s1 l5 i
  43.     $inner_worker->reusePort = true;
    . y3 |: }% N6 q" u1 D4 a
  44.     $inner_worker->onMessage = 'on_message';
    # ]; z; S8 Z. H$ b5 Q( ^  O/ v3 C
  45.     // 执行监听。正常监听不会报错
    % \6 u7 D* B& c1 y4 |/ b( x
  46.     $inner_worker->listen();
    ; X/ H. x, H& |
  47. };
    1 t/ v9 l2 p. J0 J: I( E, p

  48. 1 y. C, ^. |8 e
  49. $worker->onMessage = 'on_message';
    - d1 b$ J  D5 T' I* h$ Q

  50. , X4 l5 |& k( D
  51. function on_message($connection, $data), h$ V, Z6 s$ l/ g, U
  52. {
    & a* n6 z% p! u% }$ R. f8 x
  53.     $connection->send("hello\n");0 R9 z  g9 M# d6 o
  54. }
    3 ?. t' \: f! l( ^" V# R* @6 A3 J
  55. ! R( `7 r! k. _1 x* L  d8 W  B( R
  56. // 运行worker! d' O7 R: k; U$ }# ^8 M7 _
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    8 I5 C) K) f. \+ ]
  2. use Workerman\Worker;
    - z( a' R9 Z+ P: I
  3. require_once './Workerman/Autoloader.php';
    . T# [! j; ^- b" i" v. ^* d2 h
  4. // 初始化一个worker容器,监听1234端口* D8 D; f6 [8 l0 p. E
  5. $worker = new Worker('websocket://0.0.0.0:1234');8 `" d5 Z. B7 T# m
  6. 6 V: V1 c4 o) w
  7. /*
    ' H! b5 l+ ~! o* J, y2 x
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    4 {5 K8 \  |! E
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true). L3 g4 m1 l2 D2 ~& F+ o
  10. */# v8 p) |; q2 P  v, }" t  q- j
  11. $worker->count = 1;6 |4 Y+ y4 d: U7 n3 K! D
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口( `. M( N, v/ T: j* u
  13. $worker->onWorkerStart = function($worker)2 |# M! ?+ Z. v( Y: v- `) S: }+ [4 c
  14. {0 v. g! c3 j& `6 K) X: H
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    ' o* r/ b$ w! o& E
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');+ P2 M$ \) i3 K4 f+ Q
  17.     $inner_text_worker->onMessage = function($connection, $buffer): Q- ?7 u/ D" n
  18.     {
    6 R( z1 |: \5 m+ d* A/ |5 k
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    1 a+ f: |6 d4 I( |, a8 k" q* b
  20.         $data = json_decode($buffer, true);, i$ U9 i  i5 }5 Z9 e$ V
  21.         $uid = $data['uid'];
    ' I! c! w7 }( Q; U, n
  22.         // 通过workerman,向uid的页面推送数据
    5 j& u+ Q1 ]  W/ G
  23.         $ret = sendMessageByUid($uid, $buffer);1 p6 F$ ]9 O5 ^
  24.         // 返回推送结果6 S' c0 t' @. g
  25.         $connection->send($ret ? 'ok' : 'fail');' E/ W3 R' Z" y: I# A
  26.     };0 K) J8 f1 L# C2 Z) Q
  27.     // ## 执行监听 ##" y( a4 y6 n) m$ T3 }3 q; y
  28.     $inner_text_worker->listen();
    ( D; j5 o9 o$ r9 [1 \. g
  29. };, U5 z( E; S* v3 X% G6 _2 q1 Q4 u
  30. // 新增加一个属性,用来保存uid到connection的映射" r% |4 z- n$ j% @* ]2 I. l2 c
  31. $worker->uidConnections = array();! f& M6 @1 D2 w% B7 G4 p
  32. // 当有客户端发来消息时执行的回调函数
    ; Q: ~: I% m) _$ Q# V( F
  33. $worker->onMessage = function($connection, $data)6 g0 q2 {/ W% V3 M$ ?
  34. {" k2 E' [9 B1 N
  35.     global $worker;
    ! e% `$ u: b' y- g
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    9 B1 H9 T5 G. l4 E3 D3 A
  37.     if(!isset($connection->uid))
    + f: e" o7 H5 w  R' E! b$ g2 d1 _
  38.     {
    8 R6 A) a9 S0 G# y$ W
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    / J! y7 B+ F' y6 S
  40.        $connection->uid = $data;
    * ~5 K4 P( _6 G6 @$ P
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,4 P# K# ^; r1 j6 t& i( v8 B& r
  42.         * 实现针对特定uid推送数据
    2 S1 ]' }! ?. \/ w+ {# X( }
  43.         */7 l8 z4 j4 S) J9 u3 z
  44.        $worker->uidConnections[$connection->uid] = $connection;+ T' u7 ?* J9 W
  45.        return;
    # T/ ^2 A$ H( e! w1 ?  H# R( f
  46.     }
    . y) P* \2 }: y
  47. };: {' P0 x/ I& d3 p" h4 m
  48. 6 w# E; q. j2 x( ^0 Z' Z6 K" x
  49. // 当有客户端连接断开时, k( r8 Q1 ~7 K1 s& f& `+ |; e
  50. $worker->onClose = function($connection)8 z; ?1 \3 _8 Y3 g
  51. {
    ! |: f2 V5 \$ d6 v+ g; n' e; y
  52.     global $worker;  G; C! ]$ v* o  O6 g
  53.     if(isset($connection->uid))
    $ I& H* n8 V& g1 m6 P
  54.     {- ^* b& h) ~! s; I3 M4 Z+ r
  55.         // 连接断开时删除映射
    ) Q6 [8 x4 |' o, ^. i
  56.         unset($worker->uidConnections[$connection->uid]);" j3 Y$ Z6 F; W- }8 @5 k) Q
  57.     }
    2 q7 n) N0 R( j
  58. };6 w" `. h9 [) `  ]3 X" V' ^
  59. 9 V7 w! N  q) U1 P
  60. // 向所有验证的用户推送数据) z/ l& d- r5 r. W9 t
  61. function broadcast($message)& _$ H2 V0 P9 P  K; H, A
  62. {8 \% U6 e4 a6 y' E' B& E7 K
  63.    global $worker;
    , T3 C# E$ A' C
  64.    foreach($worker->uidConnections as $connection)" \' t: M2 j3 n) N6 O# y. u
  65.    {# ?7 N8 {8 W! W
  66.         $connection->send($message);
    7 M6 G' ?' `6 b3 E. p& D+ t& E
  67.    }0 Y# U$ i5 n/ q7 Y6 p( S' G8 k
  68. }
    9 [9 M0 [5 |# X  w& I8 W

  69. 2 R0 d) A9 @5 Y4 O" h+ ]- a8 s+ {: z
  70. // 针对uid推送数据
    8 F0 a# \+ y7 y, W! I' B5 j
  71. function sendMessageByUid($uid, $message): L0 i! J, T+ V6 `
  72. {& D3 L' g: G/ ]% x4 N1 Y9 N3 a. P
  73.     global $worker;  x. V' C! Z9 m! f- x
  74.     if(isset($worker->uidConnections[$uid])), G# M6 [/ w; r8 Y
  75.     {/ N4 F' e& ], V" \8 ]7 o3 y" v/ u
  76.         $connection = $worker->uidConnections[$uid];
    , k9 C, @! \9 f! L$ n% H# h4 A) O
  77.         $connection->send($message);
    0 q+ P9 G1 `/ r) G0 H1 r8 n. i
  78.         return true;
    2 X4 _* j5 r+ U/ R8 X
  79.     }: J  V! e4 e' F" s
  80.     return false;
    3 c* B9 ?5 |# `& a/ w" t9 i+ u
  81. }
    0 w7 t: H" w2 j3 f9 c" ^0 w
  82. + W  M6 j" L# d1 I
  83. // 运行所有的worker
    ! B) T7 r2 Z2 ^! C$ ^- q, j
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    , s6 ]% F0 s1 C# A- G
  2. ws.onopen = function(){
    2 o, X- g- Q3 T, r' ?
  3.     var uid = 'uid1';  P( P# n) ^, \! V
  4.     ws.send(uid);
    # c6 d" o  o! h/ v; A8 y
  5. };
    . c) `: T1 g! o% B+ |/ w
  6. ws.onmessage = function(e){
    % R$ ?' J9 Y( [3 ~/ l) p2 w
  7.     alert(e.data);
    8 I: v# |3 z  O3 m4 e1 W/ {
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口) X* n  E/ n( D; C6 w1 x
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    , |0 x6 D! k( F/ T( T* T# [- s& W' w7 O. w8 Q
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    ) `' `7 T6 g# ?$ a
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');( D* @2 V  C0 ~' ]* `$ x3 n
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符% t. E2 p) X" ^
  6. fwrite($client, json_encode($data)."\n");1 O/ j5 B# E( @1 i" q
  7. // 读取推送结果& _' E* U0 L( x" d' v
  8. echo fread($client, 8192);
复制代码

  K9 a5 h2 U) Y+ y, g- Q
8 e5 s  _( D; B# V8 v; B. p
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-6-20 04:18 , Processed in 0.057340 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!